Skip to content

deviceregistry

plantimager.commons.deviceregistry Link

DeviceRegistry Link

DeviceRegistry(context, addr='*', port='5555')

Bases: Thread

A thread-based device registry that manages the registration and unregistration of devices.

This class provides a thread-based mechanism to handle device registration and unregistration events. It communicates using ZeroMQ sockets and fires appropriate callbacks when devices are added or removed.

Attributes:

Name Type Description
addr str

Address on which the registry will bind.

port str

Port on which the registry will listen for messages.

context Context

ZeroMQ context used for setting up the communication socket.

devices dict of str to tuple of (str, str)

Dictionary mapping device names to a tuple of (device_type, address).

_new_device_callbacks list of Callable

List of callback functions to execute when a new device is added.

_device_removed_callbacks list of Callable

List of callback functions to execute when a device is removed.

_stop bool

Control flag to indicate whether the registry thread should stop.

_callback_events_to_process dict of str (Literal["added", "removed"]) to list of tuple

Dictionary containing lists of registered or removed devices to process callbacks for.

Source code in plantimager/commons/deviceregistry.py
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
def __init__(self, context: zmq.Context, addr: str = "*", port: str = "5555"):
    """
    Prepare an idle registry; this constructor only stores state and opens no socket.

    Parameters
    ----------
    context : zmq.Context
        ZeroMQ context kept for creating the communication socket later on.
    addr : str, optional
        Address on which the registry will bind. Defaults to ``"*"``.
    port : str, optional
        Port on which the registry will listen. Defaults to ``"5555"``.
    """
    super().__init__()

    # Connection settings.
    self.context: zmq.Context = context
    self.addr: str = addr
    self.port: str = port

    # Known devices, indexed two ways.
    self.device_names: dict[str, str] = {}  # uuid --> name
    self.devices: dict[str, tuple[str, str]] = {}  # name --> (device_type, addr)

    # Listeners, plus the events waiting to be delivered to them.
    self._device_removed_callbacks: list[Callable] = []
    self._new_device_callbacks: list[Callable] = []
    self._callback_events_to_process: dict[Literal["added", "removed"], list[tuple[str, str, str]]] = {
        "added": [],
        "removed": [],
    }

    # Stop flag for the thread loop.
    # NOTE(review): on some CPython versions (<= 3.12) `threading.Thread` defines a private
    # `_stop()` method used while joining; this instance attribute shadows it -- confirm that
    # `join()` still works on the supported interpreters, or rename the flag everywhere.
    self._stop = False

add_device_removed_callback Link

add_device_removed_callback(callback)

Register a callback function to be executed when a device is removed.

Source code in plantimager/commons/deviceregistry.py
227
228
229
230
231
def add_device_removed_callback(self, callback: Callable):
    """
    Subscribe `callback` to device-removal events.

    Parameters
    ----------
    callback : Callable
        Appended to the end of the list of removal callbacks.
    """
    removal_listeners = self._device_removed_callbacks
    removal_listeners.append(callback)

add_new_device_callback Link

add_new_device_callback(callback)

Register a callback function to be executed when a new device is registered.

Source code in plantimager/commons/deviceregistry.py
221
222
223
224
225
def add_new_device_callback(self, callback: Callable):
    """
    Subscribe `callback` to device-registration events.

    Parameters
    ----------
    callback : Callable
        Appended to the end of the list of new-device callbacks.
    """
    registration_listeners = self._new_device_callbacks
    registration_listeners.append(callback)

get_devices Link

get_devices()

Returns the registered devices.

Returns:

Type Description
dict[str, tuple[str, str]]

A dictionary where the keys are device names as strings and the values are tuples containing the type and address of the device as strings.

Source code in plantimager/commons/deviceregistry.py
233
234
235
236
237
238
239
240
241
242
243
def get_devices(self) -> dict[str, tuple[str, str]]:
    """
    Returns the registered devices.

    Returns
    -------
    dict[str, tuple[str, str]]
        A dictionary where the keys are device names and the values are
        ``(device_type, addr)`` tuples. This is the registry's own ``devices``
        dictionary, not a copy, so changes made to it are visible to the registry.
    """
    return self.devices

run Link

run()

Runs the main loop for the registry service, handling device registration, unregistration, and device-related event callbacks. The service listens for messages using ZeroMQ (zmq) and processes the events accordingly.

Source code in plantimager/commons/deviceregistry.py
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
def run(self):
    """
    Runs the main loop for the registry service, handling device registration, unregistration, and
    device-related event callbacks. The service listens for messages using ZeroMQ (zmq) and processes
    the events accordingly.
    """
    with self.context.socket(zmq.REP) as socket:
        socket.bind(f"tcp://{self.addr}:{self.port}")
        logger.info(f"Starting registry on {self.addr}:{self.port}")
        while not self._stop:
            # in order for the thread to stop properly, it must not block indefinitely while waiting for
            # a message, hence polling for 100 ms and continue-ing if timeout
            if socket.poll(100) == 0:
                continue
            message = socket.recv_json()
            event_type: str = message["event"]
            payload: dict = message["payload"]
            logger.debug(f"Received event: {event_type}, {payload}")
            match event_type:
                case EventType.REGISTER:
                    device_type = payload["device_type"]
                    addr = payload["addr"]
                    proposed_name = payload["name"]
                    overwrite = payload["overwrite"] if "overwrite" in payload else False
                    name, uuid = self._handle_register(device_type, addr, proposed_name, overwrite=overwrite)
                    socket.send_json({
                        "event": EventType.REGISTER_ACK,
                        "payload": {
                            "name": name,
                            "uuid": str(uuid),
                        }
                    })
                case EventType.UNREGISTER:
                    uuid = payload["uuid"]
                    if uuid in self.device_names:
                        result = self._remove_device(uuid)
                    else:
                        result = False
                        logger.warning(f"Device of id {uuid} not found in registry")
                    socket.send_json({
                        "event": EventType.ACK,
                        "payload": {
                            "req_event": EventType.UNREGISTER,
                            "success": result,
                        }
                    })

                case _:
                    logger.warning(f"Unknown event type: {event_type}")
                    socket.send_json({
                        "event": EventType.UNSUPPORTED,
                        "payload": {
                            "req_event": event_type,
                        }
                    })

            # Fire callbacks for removed and added devices after responding back
            for device_type, addr, name in self._callback_events_to_process["removed"]:
                for callback in self._device_removed_callbacks:
                    callback(device_type, addr, name)
            self._callback_events_to_process["removed"].clear()
            for device_type, addr, name in self._callback_events_to_process["added"]:
                for callback in self._new_device_callbacks:
                    callback(device_type, addr, name)
            self._callback_events_to_process["added"].clear()
    logger.info(f"Registry stopped on {self.addr}:{self.port}")

stop Link

stop()

Stop the registry thread.

Source code in plantimager/commons/deviceregistry.py
108
109
110
111
112
def stop(self):
    """
    Request the registry thread to stop.

    Both callback lists are replaced with fresh empty lists (the previous list objects
    are left untouched) before the stop flag is raised.
    """
    self._device_removed_callbacks, self._new_device_callbacks = [], []
    self._stop = True

register_device Link

register_device(context, device_type, addr, name, registry_url, overwrite=False)

Register device of type device_type and of address addr to registry at registry_url. Proposes name to registry.

Returns accepted device name and the uuid of the device if successful, otherwise an empty string. The uuid must be kept to unregister the device later.

Source code in plantimager/commons/deviceregistry.py
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
def register_device(context: zmq.Context, device_type: str, addr: str, name: str, registry_url: str,
                    overwrite: bool = False, *, timeout_ms: int | None = None) -> tuple[str, str] | str:
    """
    Register device of type `device_type` and of address `addr` to registry at `registry_url`.
    Proposes name to registry.

    Parameters
    ----------
    context : zmq.Context
        ZeroMQ context used to create the temporary ``REQ`` socket.
    device_type : str
        Type of the device, sent as-is to the registry.
    addr : str
        Address of the device, sent as-is to the registry.
    name : str
        Name proposed to the registry; the registry replies with the name it accepted.
    registry_url : str
        URL of the registry to connect to.
    overwrite : bool, optional
        Forwarded to the registry in the ``overwrite`` field. Defaults to ``False``.
    timeout_ms : int or None, optional
        Maximum time to wait for the reply, in milliseconds. The default ``None`` waits
        indefinitely, which is the historical behaviour.

    Returns
    -------
    tuple[str, str] or str
        ``(registered_name, uuid)`` when the registry answers with
        ``EventType.REGISTER_ACK``; an empty string when it answers with anything else
        or when `timeout_ms` elapses. The uuid must be kept to unregister the device later.
    """
    with context.socket(zmq.REQ) as socket:
        socket.connect(registry_url)
        socket.send_json({
            "event": EventType.REGISTER,
            "payload": {
                "device_type": device_type,
                "addr": addr,
                "name": name,
                "overwrite": overwrite,
            }
        })
        # poll(None) blocks until a reply arrives, exactly like a bare recv_json() would.
        if socket.poll(timeout_ms, flags=zmq.POLLIN) == 0:
            logger.debug(f"No answer from registry at address {registry_url}. Closing")
            # Drop the undelivered request so it cannot hold up context termination.
            socket.close(linger=0)
            return ""
        reply = socket.recv_json()
    # The socket is closed by the `with` block; no explicit close() is needed here.
    if reply["event"] == EventType.REGISTER_ACK:
        payload = reply["payload"]
        return payload["name"], payload["uuid"]
    return ""

unregister_device Link

unregister_device(context, uuid, registry_addr)

Unregister the device of the given uuid from the registry at registry_addr. Returns True if the device was unregistered successfully.

Source code in plantimager/commons/deviceregistry.py
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
def unregister_device(context: zmq.Context, uuid: str, registry_addr: str) -> bool:
    """
    Unregister the device of the given uuid from the registry at `registry_addr`.

    Parameters
    ----------
    context : zmq.Context
        ZeroMQ context used to create the temporary ``REQ`` socket.
    uuid : str
        Identifier received when the device was registered.
    registry_addr : str
        URL of the registry to connect to.

    Returns
    -------
    bool
        ``True`` if the registry acknowledged the request and reported success.
        ``False`` if no reply arrived within 5 seconds, if the reply is not an
        ``EventType.ACK``, or if the ACK carries ``"success": False`` (the registry
        sends that when the uuid is unknown).
    """
    with context.socket(zmq.REQ) as socket:
        socket: zmq.Socket
        socket.connect(registry_addr)
        socket.send_json({
            "event": EventType.UNREGISTER,
            "payload": {
                "uuid": uuid,
            }
        })
        if socket.poll(5000, flags=zmq.POLLIN) == 0:
            logger.debug(f"No answer from registry at address {registry_addr}. Closing")
            # Drop the undelivered request so it cannot hold up context termination.
            socket.close(linger=0)
            return False
        reply = socket.recv_json()
    # The socket is closed by the `with` block; no explicit close() is needed here.
    if reply["event"] != EventType.ACK:
        return False
    # An ACK only confirms the request was processed; the outcome is in `success`.
    # A missing field is treated as success to stay compatible with replies that omit it.
    return bool(reply.get("payload", {}).get("success", True))