Module qimview.utils.thread_pool

Expand source code
# example from https://www.learnpyqt.com/tutorials/multithreading-pyqt-applications-qthreadpool/

from .qt_imports import *

import traceback, sys

class WorkerSignals(QtCore.QObject):
    '''
    Defines the signals available from a running worker thread.
    Supported signals are:
    finished
        No data
    error
        tuple (exctype, value, traceback.format_exc() )
    result
        object data returned from processing, anything
    progress
        int indicating % progress
    '''
    finished = Signal()
    error = Signal(tuple)
    result = Signal(object)
    progress = Signal(int)


class Worker(QtCore.QRunnable):
    '''
    Worker thread
    Inherits from QRunnable to handler worker thread setup, signals and wrap-up.
    :param callback: The function callback to run on this worker thread. Supplied args and
                     kwargs will be passed through to the runner.
    :type callback: function
    :param args: Arguments to pass to the callback function
    :param kwargs: Keywords to pass to the callback function

    '''

    def __init__(self, fn, *args, **kwargs):
        super(Worker, self).__init__()

        # Store constructor arguments (re-used for processing)
        self.fn = fn
        self.args = args
        self.kwargs = kwargs
        self.signals = WorkerSignals()
        self.result_cb = None

        # Add the callback to our kwargs
        self.kwargs['progress_callback'] = self.signals.progress

    def set_progress_callback(self, cb):
        if cb is not None:
            self.signals.progress.connect(cb)

    def set_finished_callback(self, cb):
        if cb is not None:
            self.signals.finished.connect(cb)

    def set_result_callback(self, cb):
        # self.signals.result.connect(cb)
        if cb is not None:
            self.result_cb = cb

    @Slot()
    def run(self):
        """
        Initialise the runner function with passed args, kwargs.
        """
        # Retrieve args/kwargs here; and fire processing using them
        try:
            result = self.fn(*self.args, **self.kwargs)
        except:
            traceback.print_exc()
            exctype, value = sys.exc_info()[:2]
            self.signals.error.emit((exctype, value, traceback.format_exc()))
        else:
            # print(" ***** emit result signal ")
            if self.result_cb is not None:
                self.result_cb(result)  # Return the result of the processing
        finally:
            self.signals.finished.emit()  # Done


class ThreadPool(QtCore.QThreadPool):
    def __init(self):
        super(ThreadPool, self).__init__()
        print("Multithreading with maximum %d threads" % self.threadpool.maxThreadCount())

    def set_worker(self, work_function, *args, **kwargs):
        self.worker = Worker(work_function,  *args, **kwargs)  # Any other args, kwargs are passed to the run function

    def set_worker_callbacks(self, progress_cb=None, finished_cb=None, result_cb=None):
        self.worker.set_progress_callback(progress_cb)
        self.worker.set_finished_callback(finished_cb)
        self.worker.set_result_callback(result_cb)

    def start_worker(self):
        self.start(self.worker)

Classes

class ThreadPool (*args, **kwargs)

QThreadPool(self, parent: Optional[PySide6.QtCore.QObject] = None) -> None

init(self, parent: Optional[PySide6.QtCore.QObject] = None) -> None

Initialize self. See help(type(self)) for accurate signature.

Expand source code
class ThreadPool(QtCore.QThreadPool):
    def __init(self):
        super(ThreadPool, self).__init__()
        print("Multithreading with maximum %d threads" % self.threadpool.maxThreadCount())

    def set_worker(self, work_function, *args, **kwargs):
        self.worker = Worker(work_function,  *args, **kwargs)  # Any other args, kwargs are passed to the run function

    def set_worker_callbacks(self, progress_cb=None, finished_cb=None, result_cb=None):
        self.worker.set_progress_callback(progress_cb)
        self.worker.set_finished_callback(finished_cb)
        self.worker.set_result_callback(result_cb)

    def start_worker(self):
        self.start(self.worker)

Ancestors

  • PySide6.QtCore.QThreadPool
  • PySide6.QtCore.QObject
  • Shiboken.Object

Class variables

var staticMetaObject

Methods

def set_worker(self, work_function, *args, **kwargs)
Expand source code
def set_worker(self, work_function, *args, **kwargs):
    self.worker = Worker(work_function,  *args, **kwargs)  # Any other args, kwargs are passed to the run function
def set_worker_callbacks(self, progress_cb=None, finished_cb=None, result_cb=None)
Expand source code
def set_worker_callbacks(self, progress_cb=None, finished_cb=None, result_cb=None):
    self.worker.set_progress_callback(progress_cb)
    self.worker.set_finished_callback(finished_cb)
    self.worker.set_result_callback(result_cb)
def start_worker(self)
Expand source code
def start_worker(self):
    self.start(self.worker)
class Worker (fn, *args, **kwargs)

Worker thread Inherits from QRunnable to handler worker thread setup, signals and wrap-up. :param callback: The function callback to run on this worker thread. Supplied args and kwargs will be passed through to the runner. :type callback: function :param args: Arguments to pass to the callback function :param kwargs: Keywords to pass to the callback function

init(self) -> None

Initialize self. See help(type(self)) for accurate signature.

Expand source code
class Worker(QtCore.QRunnable):
    '''
    Worker thread
    Inherits from QRunnable to handler worker thread setup, signals and wrap-up.
    :param callback: The function callback to run on this worker thread. Supplied args and
                     kwargs will be passed through to the runner.
    :type callback: function
    :param args: Arguments to pass to the callback function
    :param kwargs: Keywords to pass to the callback function

    '''

    def __init__(self, fn, *args, **kwargs):
        super(Worker, self).__init__()

        # Store constructor arguments (re-used for processing)
        self.fn = fn
        self.args = args
        self.kwargs = kwargs
        self.signals = WorkerSignals()
        self.result_cb = None

        # Add the callback to our kwargs
        self.kwargs['progress_callback'] = self.signals.progress

    def set_progress_callback(self, cb):
        if cb is not None:
            self.signals.progress.connect(cb)

    def set_finished_callback(self, cb):
        if cb is not None:
            self.signals.finished.connect(cb)

    def set_result_callback(self, cb):
        # self.signals.result.connect(cb)
        if cb is not None:
            self.result_cb = cb

    @Slot()
    def run(self):
        """
        Initialise the runner function with passed args, kwargs.
        """
        # Retrieve args/kwargs here; and fire processing using them
        try:
            result = self.fn(*self.args, **self.kwargs)
        except:
            traceback.print_exc()
            exctype, value = sys.exc_info()[:2]
            self.signals.error.emit((exctype, value, traceback.format_exc()))
        else:
            # print(" ***** emit result signal ")
            if self.result_cb is not None:
                self.result_cb(result)  # Return the result of the processing
        finally:
            self.signals.finished.emit()  # Done

Ancestors

  • PySide6.QtCore.QRunnable
  • Shiboken.Object

Methods

def run(self)

Initialise the runner function with passed args, kwargs.

Expand source code
@Slot()
def run(self):
    """
    Initialise the runner function with passed args, kwargs.
    """
    # Retrieve args/kwargs here; and fire processing using them
    try:
        result = self.fn(*self.args, **self.kwargs)
    except:
        traceback.print_exc()
        exctype, value = sys.exc_info()[:2]
        self.signals.error.emit((exctype, value, traceback.format_exc()))
    else:
        # print(" ***** emit result signal ")
        if self.result_cb is not None:
            self.result_cb(result)  # Return the result of the processing
    finally:
        self.signals.finished.emit()  # Done
def set_finished_callback(self, cb)
Expand source code
def set_finished_callback(self, cb):
    if cb is not None:
        self.signals.finished.connect(cb)
def set_progress_callback(self, cb)
Expand source code
def set_progress_callback(self, cb):
    if cb is not None:
        self.signals.progress.connect(cb)
def set_result_callback(self, cb)
Expand source code
def set_result_callback(self, cb):
    # self.signals.result.connect(cb)
    if cb is not None:
        self.result_cb = cb
class WorkerSignals (*args, **kwargs)

Defines the signals available from a running worker thread. Supported signals are: finished No data error tuple (exctype, value, traceback.format_exc() ) result object data returned from processing, anything progress int indicating % progress

init(self, parent: Optional[PySide6.QtCore.QObject] = None) -> None

Initialize self. See help(type(self)) for accurate signature.

Expand source code
class WorkerSignals(QtCore.QObject):
    '''
    Defines the signals available from a running worker thread.
    Supported signals are:
    finished
        No data
    error
        tuple (exctype, value, traceback.format_exc() )
    result
        object data returned from processing, anything
    progress
        int indicating % progress
    '''
    finished = Signal()
    error = Signal(tuple)
    result = Signal(object)
    progress = Signal(int)

Ancestors

  • PySide6.QtCore.QObject
  • Shiboken.Object

Class variables

var staticMetaObject

Methods

def error(...)
def finished(...)
def progress(...)
def result(...)