Module qimview.cache
Expand source code
from .imagecache import ImageCache
from .filecache import FileCache
__all__ = ['ImageCache', 'FileCache' ]
Sub-modules
qimview.cache.basecacheqimview.cache.filecacheqimview.cache.imagecache
Classes
class FileCache-
Save output bytes from read() function into a cache indexed by the filename inherits from BaseCache, with id as string: input filename bytes: output from binary read() mtime: modification time as float from osp.getmtime(filename) If a file is in the cache but its modification time on disk is more recent, we can enable an automatic reload
Args
BaseCache:_type_- description
Expand source code
class FileCache(BaseCache[str,bytes,float]): """ Save output bytes from read() function into a cache indexed by the filename inherits from BaseCache, with id as string: input filename bytes: output from binary read() mtime: modification time as float from osp.getmtime(filename) If a file is in the cache but its modification time on disk is more recent, we can enable an automatic reload Args: BaseCache (_type_): _description_ """ def __init__(self): BaseCache.__init__(self, "FileCache") total_memory : float = psutil.virtual_memory().total / self.cache_unit # let use 5% of total memory by default self.max_cache_size : int = int(total_memory * 0.05) self.last_progress = 0 self.verbose : bool = False def has_file(self, filename): # is it too slow filename = os.path.abspath(filename) return filename in self.cache_list def get_file(self, filename: str, check_size: bool = True) -> Tuple[Optional[bytes], bool]: """_summary_ Args: filename (str): file to read, is supposed to exist on disk, if not an exception will be raised by Python standard methods check_size (bool, optional): _description_. Defaults to True. Returns: Tuple[Optional[bytes], bool]: first elt is the data read second is a boolean saying if it comes or not from the cache """ # print(f'get_file {filename}') start = get_time() # Get absolute normalized path filename = os.path.abspath(filename) # Get file last modification time mtime = os.path.getmtime(filename) # print(f"image cache get_image({filename})") file_data = self.search(filename) if file_data is not None: if file_data[2]>=mtime: # print(f'get_file {filename} found end') return file_data[1], True else: # Remove element from cache? print("Removing outdated cache data from file {filename}") self.cache.remove(file_data) self.cache_list.remove(filename) try: # Read file as binary data self._print_log(" FileCache::get_file() before read() {0:0.3f} sec.".format(get_time() - start)) with open(filename, 'rb') as f: file_data = f.read() self._print_log(" FileCache::get_file() after read() {0:0.3f} sec.".format(get_time() - start)) self.append(filename, file_data, extra=mtime, check_size=check_size) self._print_log(" FileCache::get_file() after append took {0:0.3f} sec.".format(get_time() - start)) except Exception as e: print("Failed to load image {0}: {1}".format(filename, e)) return None, False else: # print(f'get_file {filename} read end') return file_data, False def thread_add_files(self, filenames, progress_callback = None): """ :param filename: :param show_timing: :return: pair image_data, boolean (True is coming from cache) """ nb_files = len(filenames) nb_new_files = 0 for n, f in enumerate(filenames): if nb_new_files>80: break if f is not None and not self.has_file(f): data, flag = self.get_file(f, check_size=False) # slow down to simulate network drive # sleep(.500) if not flag: nb_new_files += 1 if progress_callback is not None: progress_callback.emit(int(n*100/nb_files+0.5)) # if n%10==0: # self.check_size_limit() def file_added(self, filename): pass # print(f" *** Added image {os.path.basename(filename)} to cache finished") def add_result(self, r): # print(f"adding {r} to results ") pass def show_progress(self, val): if val!=self.last_progress: print(f"add files done {val} %") self.check_size_limit() self.last_progress = val def on_finished(self): self.check_size_limit() def add_files(self, filenames): self.thread_pool.clear() start = get_time() self.add_results = [] # print(f" start worker with image {f}") # This part may be causing issues use_threads = True if use_threads: self.thread_pool.set_worker(self.thread_add_files, filenames) self.thread_pool.set_worker_callbacks(finished_cb=self.on_finished) self.thread_pool.start_worker() else: self.thread_add_files(filenames, progress_cb=self.show_progress) self._print_log(f" FileCache.add_files() {self.add_results} took {int((get_time()-start)*1000+0.5)} ms;")Ancestors
- BaseCache
- typing.Generic
Methods
def add_files(self, filenames)-
Expand source code
def add_files(self, filenames): self.thread_pool.clear() start = get_time() self.add_results = [] # print(f" start worker with image {f}") # This part may be causing issues use_threads = True if use_threads: self.thread_pool.set_worker(self.thread_add_files, filenames) self.thread_pool.set_worker_callbacks(finished_cb=self.on_finished) self.thread_pool.start_worker() else: self.thread_add_files(filenames, progress_cb=self.show_progress) self._print_log(f" FileCache.add_files() {self.add_results} took {int((get_time()-start)*1000+0.5)} ms;") def add_result(self, r)-
Expand source code
def add_result(self, r): # print(f"adding {r} to results ") pass def file_added(self, filename)-
Expand source code
def file_added(self, filename): pass # print(f" *** Added image {os.path.basename(filename)} to cache finished") def get_file(self, filename: str, check_size: bool = True) ‑> Tuple[Optional[bytes], bool]-
summary
Args
filename:str- file to read, is supposed to exist on disk, if not an exception will be raised by Python standard methods
check_size:bool, optional- description. Defaults to True.
Returns
Tuple[Optional[bytes], bool]- first elt is the data read second is a boolean saying if it comes or not from the cache
Expand source code
def get_file(self, filename: str, check_size: bool = True) -> Tuple[Optional[bytes], bool]: """_summary_ Args: filename (str): file to read, is supposed to exist on disk, if not an exception will be raised by Python standard methods check_size (bool, optional): _description_. Defaults to True. Returns: Tuple[Optional[bytes], bool]: first elt is the data read second is a boolean saying if it comes or not from the cache """ # print(f'get_file {filename}') start = get_time() # Get absolute normalized path filename = os.path.abspath(filename) # Get file last modification time mtime = os.path.getmtime(filename) # print(f"image cache get_image({filename})") file_data = self.search(filename) if file_data is not None: if file_data[2]>=mtime: # print(f'get_file {filename} found end') return file_data[1], True else: # Remove element from cache? print("Removing outdated cache data from file {filename}") self.cache.remove(file_data) self.cache_list.remove(filename) try: # Read file as binary data self._print_log(" FileCache::get_file() before read() {0:0.3f} sec.".format(get_time() - start)) with open(filename, 'rb') as f: file_data = f.read() self._print_log(" FileCache::get_file() after read() {0:0.3f} sec.".format(get_time() - start)) self.append(filename, file_data, extra=mtime, check_size=check_size) self._print_log(" FileCache::get_file() after append took {0:0.3f} sec.".format(get_time() - start)) except Exception as e: print("Failed to load image {0}: {1}".format(filename, e)) return None, False else: # print(f'get_file {filename} read end') return file_data, False def has_file(self, filename)-
Expand source code
def has_file(self, filename): # is it too slow filename = os.path.abspath(filename) return filename in self.cache_list def on_finished(self)-
Expand source code
def on_finished(self): self.check_size_limit() def show_progress(self, val)-
Expand source code
def show_progress(self, val): if val!=self.last_progress: print(f"add files done {val} %") self.check_size_limit() self.last_progress = val def thread_add_files(self, filenames, progress_callback=None)-
:param filename: :param show_timing: :return: pair image_data, boolean (True is coming from cache)
Expand source code
def thread_add_files(self, filenames, progress_callback = None): """ :param filename: :param show_timing: :return: pair image_data, boolean (True is coming from cache) """ nb_files = len(filenames) nb_new_files = 0 for n, f in enumerate(filenames): if nb_new_files>80: break if f is not None and not self.has_file(f): data, flag = self.get_file(f, check_size=False) # slow down to simulate network drive # sleep(.500) if not flag: nb_new_files += 1 if progress_callback is not None: progress_callback.emit(int(n*100/nb_files+0.5)) # if n%10==0: # self.check_size_limit()
Inherited members
class ImageCache-
Save output bytes from read() function into a cache indexed by the filename inherits from BaseCache, with id as string: input filename ViewerImage: image object mtime: modification time as float from osp.getmtime(filename) If a file is in the cache but its modification time on disk is more recent, we can enable an automatic reload
Args
BaseCache:_type_- description
Expand source code
class ImageCache(BaseCache[str, ViewerImage, float]): """ Save output bytes from read() function into a cache indexed by the filename inherits from BaseCache, with id as string: input filename ViewerImage: image object mtime: modification time as float from osp.getmtime(filename) If a file is in the cache but its modification time on disk is more recent, we can enable an automatic reload Args: BaseCache (_type_): _description_ """ def __init__(self): BaseCache.__init__(self, "ImageCache") # let use 25% of total memory total_memory = psutil.virtual_memory().total / self.cache_unit self.max_cache_size = int(total_memory * 0.25) self.verbose : bool = False def has_image(self, filename): # is it too slow filename = os.path.abspath(filename) return filename in self.cache_list def get_image(self, filename, read_size='full', verbose=False, use_RGB=True, image_transform=None, check_size=True): """ :param filename: :param show_timing: :return: pair image_data, boolean (True is coming from cache) """ start = get_time() # Get absolute normalized path filename = os.path.abspath(filename) # print(f"image cache get_image({filename})") image_data = self.search(filename) mtime = os.path.getmtime(filename) if image_data is not None: if image_data[2] >= mtime: return image_data[1], True else: # Remove outdated cache element self.cache.remove(image_data) self.cache_list.remove(filename) image = gb_image_reader.read(filename, None, read_size, use_RGB=use_RGB, verbose=verbose, check_filecache_size=check_size) if image is not None: if image_transform is not None: image = image_transform(image) self.append(filename, image, extra=mtime, check_size=check_size) self._print_log(" get_image after read_image took {0:0.3f} sec.".format(get_time() - start)) else: print(f"Failed to load image {filename}") return None, False return image, False def add_image(self, filename, read_size='full', verbose=False, use_RGB=True, image_transform=None, progress_callback = None): """ :param filename: :param show_timing: :return: pair image_data, boolean (True is coming from cache) """ data, flag = self.get_image(filename, read_size, verbose, use_RGB, image_transform, check_size=False) return data is not None def image_added(self, filename): pass # print(f" *** Added image {os.path.basename(filename)} to cache finished") def add_result(self, r): # print(f"adding {r} to results ") self.add_results.append(r) def add_images(self, filenames, read_size='full', verbose=False, use_RGB=True, image_transform=None): start = get_time() self.add_results = [] num_workers = 0 for f in filenames: if f is not None and not self.has_image(f): # print(f" start worker with image {f}") self.thread_pool.set_worker(self.add_image, f, read_size, verbose, use_RGB, image_transform) self.thread_pool.set_worker_callbacks(finished_cb=lambda: self.image_added(f), result_cb=self.add_result) self.thread_pool.start_worker() num_workers += 1 wait_time = 0 while len(self.add_results) != num_workers and wait_time < 2: # wait 5 ms before checking again #sleep(0.002) self.thread_pool.waitForDone(2) wait_time += 0.002 self._print_log(f" ImageCache.add_images() wait_time {wait_time} took {int((get_time()-start)*1000+0.5)} ms;") if num_workers>0: self.thread_pool.clear() # There could be unfinished thread here? # It seems that the progress bar must be updated outside the threads self.check_size_limit(update_progress=True) if gb_image_reader.file_cache is not None: gb_image_reader.file_cache.check_size_limit(update_progress=True) assert len(self.add_results) == num_workers, "Workers left" self._print_log(f" ImageCache.add_images() {num_workers}, {self.add_results} took {int((get_time()-start)*1000+0.5)} ms;")Ancestors
- BaseCache
- typing.Generic
Methods
def add_image(self, filename, read_size='full', verbose=False, use_RGB=True, image_transform=None, progress_callback=None)-
:param filename: :param show_timing: :return: pair image_data, boolean (True is coming from cache)
Expand source code
def add_image(self, filename, read_size='full', verbose=False, use_RGB=True, image_transform=None, progress_callback = None): """ :param filename: :param show_timing: :return: pair image_data, boolean (True is coming from cache) """ data, flag = self.get_image(filename, read_size, verbose, use_RGB, image_transform, check_size=False) return data is not None def add_images(self, filenames, read_size='full', verbose=False, use_RGB=True, image_transform=None)-
Expand source code
def add_images(self, filenames, read_size='full', verbose=False, use_RGB=True, image_transform=None): start = get_time() self.add_results = [] num_workers = 0 for f in filenames: if f is not None and not self.has_image(f): # print(f" start worker with image {f}") self.thread_pool.set_worker(self.add_image, f, read_size, verbose, use_RGB, image_transform) self.thread_pool.set_worker_callbacks(finished_cb=lambda: self.image_added(f), result_cb=self.add_result) self.thread_pool.start_worker() num_workers += 1 wait_time = 0 while len(self.add_results) != num_workers and wait_time < 2: # wait 5 ms before checking again #sleep(0.002) self.thread_pool.waitForDone(2) wait_time += 0.002 self._print_log(f" ImageCache.add_images() wait_time {wait_time} took {int((get_time()-start)*1000+0.5)} ms;") if num_workers>0: self.thread_pool.clear() # There could be unfinished thread here? # It seems that the progress bar must be updated outside the threads self.check_size_limit(update_progress=True) if gb_image_reader.file_cache is not None: gb_image_reader.file_cache.check_size_limit(update_progress=True) assert len(self.add_results) == num_workers, "Workers left" self._print_log(f" ImageCache.add_images() {num_workers}, {self.add_results} took {int((get_time()-start)*1000+0.5)} ms;") def add_result(self, r)-
Expand source code
def add_result(self, r): # print(f"adding {r} to results ") self.add_results.append(r) def get_image(self, filename, read_size='full', verbose=False, use_RGB=True, image_transform=None, check_size=True)-
:param filename: :param show_timing: :return: pair image_data, boolean (True is coming from cache)
Expand source code
def get_image(self, filename, read_size='full', verbose=False, use_RGB=True, image_transform=None, check_size=True): """ :param filename: :param show_timing: :return: pair image_data, boolean (True is coming from cache) """ start = get_time() # Get absolute normalized path filename = os.path.abspath(filename) # print(f"image cache get_image({filename})") image_data = self.search(filename) mtime = os.path.getmtime(filename) if image_data is not None: if image_data[2] >= mtime: return image_data[1], True else: # Remove outdated cache element self.cache.remove(image_data) self.cache_list.remove(filename) image = gb_image_reader.read(filename, None, read_size, use_RGB=use_RGB, verbose=verbose, check_filecache_size=check_size) if image is not None: if image_transform is not None: image = image_transform(image) self.append(filename, image, extra=mtime, check_size=check_size) self._print_log(" get_image after read_image took {0:0.3f} sec.".format(get_time() - start)) else: print(f"Failed to load image {filename}") return None, False return image, False def has_image(self, filename)-
Expand source code
def has_image(self, filename): # is it too slow filename = os.path.abspath(filename) return filename in self.cache_list def image_added(self, filename)-
Expand source code
def image_added(self, filename): pass # print(f" *** Added image {os.path.basename(filename)} to cache finished")
Inherited members