Skip to content

PiCameraComm

plantimager.controller.camera.PiCameraComm Link

PiCameraComm Link

PiCameraComm(context, url, parent=None)

Bases: QObject

Object that will handle communication with the picamera. Meant to live in a separate thread.

Serves as a bridge between Qt and the picamera.

After init use moveToThread() to change the execution context.

Source code in plantimager/controller/camera/PiCameraComm.py
49
50
51
52
53
54
55
56
57
58
59
60
61
62
def __init__(self, context: zmq.Context, url: str, parent: QObject = None):
    """
    Initialise the communication object and register its cleanup finalizer.

    The object starts in the ``CameraStates.DISCONNECTED`` state, owns a
    single-worker thread pool and immediately calls
    ``self._attempt_connection()``.

    Parameters
    ----------
    context : zmq.Context
        Stored as ``self._context``.
    url : str
        Stored as ``self.url``.
    parent : QObject, optional
        Forwarded to ``QObject.__init__``.
    """
    def _release(executor, proxy):
        # Shut the pool down without blocking on work that is still pending.
        executor.shutdown(wait=False)
        if proxy:
            proxy.stop_server()

    QObject.__init__(self, parent)
    self._state = CameraStates.DISCONNECTED
    self._context = context
    self.url = url
    self._camera: PiCameraProxy | None = None
    worker_prefix = f"{self.__class__.__name__}Thread"
    self._thread_pool = ThreadPoolExecutor(max_workers=1, thread_name_prefix=worker_prefix)
    self._attempt_connection()

    # The finalizer is handed the pool and camera objects directly (not
    # ``self``), so it does not keep this instance alive.
    # NOTE(review): the value of ``self._camera`` is captured here, once. If
    # the camera proxy is assigned or replaced after this point, the
    # finalizer will not see it -- confirm against _attempt_connection.
    finalize(self, _release, self._thread_pool, self._camera)

getImage Link

getImage(lores=False)

Submits a call to camera.get_image() and returns a future representing the pending result. When camera.get_image() returns and the result is available, the signal imageReady is emitted.

Returns:

Type Description
future or None : Future[tuple[memoryview, dict]] or None

returns None when the camera is unavailable

Source code in plantimager/controller/camera/PiCameraComm.py
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
@Slot(bool, result=Future)
def getImage(self, lores=False) -> Future[tuple[memoryview, dict]] | None:
    """
    Request an image from the camera on the worker thread pool.

    When a camera is set and the state is ``CameraStates.CONNECTED``, the
    state is switched to ``CameraStates.WAITING`` and
    ``camera.get_image(lores=lores)`` is submitted to the thread pool. Once
    the call completes with a truthy result, the state is set back to
    ``CameraStates.CONNECTED`` and the signal ``imageReady`` is emitted with
    the two elements of that result.

    Parameters
    ----------
    lores : bool, optional
        Forwarded to ``camera.get_image`` as the ``lores`` keyword argument.

    Returns
    -------
    future or None : Future[tuple[memoryview, dict]] or None
        The future representing the pending result, or None when the camera
        is unavailable (no camera set, or state is not CONNECTED).

    """
    if not (self._camera and self.state == CameraStates.CONNECTED):
        return None

    def _on_done(done: Future):
        # Nothing to report for a cancelled request.
        if done.cancelled():
            return
        # NOTE(review): result() re-raises any exception raised by
        # get_image; in that case, and for a falsy result or a cancelled
        # future, this block does not move the state away from WAITING --
        # confirm that it is reset elsewhere.
        payload = done.result()
        if not payload:
            return
        frame, frame_info = payload
        self._set_state(CameraStates.CONNECTED)
        self.imageReady.emit(frame, frame_info)

    self._set_state(CameraStates.WAITING)
    pending = self._thread_pool.submit(self._camera.get_image, lores=lores)
    pending.add_done_callback(_on_done)
    return pending