Skip to content

PiCameraComm

plantimager.controller.camera.PiCameraComm Link

PiCameraComm Link

PiCameraComm(context, url, parent=None)

Bases: QObject

Object that will handle communication with the picamera. Meant to live in a separate thread.

Serves as a bridge between Qt and the picamera.

After init use moveToThread() to change the execution context.

Source code in plantimager/controller/camera/PiCameraComm.py
34
35
36
37
38
39
40
41
42
def __init__(self, context: zmq.Context, url: str, parent: QObject = None):
    QObject.__init__(self, parent)
    self.camera = PiCameraProxy(context, url)
    self._thread_pool = ThreadPoolExecutor(max_workers=1, thread_name_prefix=f"{self.__class__.__name__}Thread")
    self.camera.modeChanged.connect(lambda *args: self.modeChanged.emit(*args))
    self.camera.videoUrlChanged.connect(lambda *args: self.videoUrlChanged.emit(*args))
    self.camera.rotationChanged.connect(lambda *args: self.rotationChanged.emit(*args))
    self.camera.resolutionChanged.connect(lambda res: self.resolutionChanged.emit(*res))
    finalize(self, self._finalize)

getImage Link

getImage()

Submit call to camera.get_image() and returns a future representing the pending result. When camera.get_image() returns and the result is available, the signal imageReady is emitted.

Returns:

Name Type Description
future Future[tuple[memoryview, dict]]
Source code in plantimager/controller/camera/PiCameraComm.py
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
@Slot()
def getImage(self) -> Future[tuple[memoryview, dict]]:
    """
    Submit call to camera.get_image() and returns a future representing the pending result.
    When camera.get_image() returns and the result is available, the signal imageReady is emitted.

    Returns
    -------
    future : Future[tuple[memoryview, dict]]

    """
    def _callback(ft_: Future):
        if ft_.cancelled(): return
        res = ft_.result()
        if res:
            buffer, buffer_info = res
            self.imageReady.emit(buffer, buffer_info)
    ft = self._thread_pool.submit(self.camera.get_image)
    ft.add_done_callback(_callback)
    return ft