Skip to content

deviceregistry

plantimager.commons.deviceregistry Link

DeviceRegistry Link

DeviceRegistry(context, addr='*', port='5555')

Bases: Thread

A thread-based Device Registry for managing the registration and unregistration of devices.

This class provides a thread-based mechanism to handle device registration and unregistration events. It communicates using ZeroMQ sockets and fires appropriate callbacks when devices are added or removed.

Attributes:

Name Type Description
addr str

Address on which the registry will bind.

port str

Port on which the registry will listen for messages.

context Context

ZeroMQ context used for setting up the communication socket.

devices dict of str to tuple of (str, str)

Dictionary mapping device names to a tuple of (device_type, address).

_new_device_callbacks list of Callable

List of callback functions to execute when a new device is added.

_device_removed_callbacks list of Callable

List of callback functions to execute when a device is removed.

_stop_flag bool

Control flag to indicate whether the registry thread should stop.

_callback_events_to_process dict of str (Literal["added", "removed"]) to list of tuple

Dictionary containing lists of registered or removed devices to process callbacks for.

Source code in plantimager/commons/deviceregistry.py
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
def __init__(self, context: zmq.Context, addr: str = "*", port: str = "5555"):
    """
    Prepare the registry thread; no socket is created or bound here.

    Parameters
    ----------
    context
        ZeroMQ context kept on the instance as ``self.context``.
    addr
        Address kept on the instance as ``self.addr``.
    port
        Port kept on the instance as ``self.port``.
    """
    super().__init__()

    # Connection settings.
    self.context: zmq.Context = context
    self.addr: str = addr
    self.port: str = port

    # Registry state, all starting empty.
    self.devices: dict[str, tuple[str, str]] = {}  # name --> (device_type, addr)
    self.device_names: dict[str, str] = {}  # uuid --> name
    self.device_health_timeout: dict[str, int] = {}  # uuid --> expiration time

    # Callback bookkeeping: registered listeners and the events still waiting to be dispatched.
    self._new_device_callbacks: list[Callable] = []
    self._device_removed_callbacks: list[Callable] = []
    self._callback_events_to_process: dict[Literal["added", "removed"], list[tuple[str, str, str]]] = {
        "added": [],
        "removed": [],
    }

    # Thread control.
    self._lock = RLock()
    self._stop_flag = False

add_device_removed_callback Link

add_device_removed_callback(callback)

Register a callback function to be executed when a device is removed.

Source code in plantimager/commons/deviceregistry.py
348
349
350
351
352
def add_device_removed_callback(self, callback: Callable):
    """
    Subscribe ``callback`` to device-removal events.

    The callable is appended to ``self._device_removed_callbacks``.
    """
    removal_listeners = self._device_removed_callbacks
    removal_listeners.append(callback)

add_new_device_callback Link

add_new_device_callback(callback)

Register a callback function to be executed when a new device is registered.

Source code in plantimager/commons/deviceregistry.py
342
343
344
345
346
def add_new_device_callback(self, callback: Callable):
    """
    Subscribe ``callback`` to device-registration events.

    The callable is appended to ``self._new_device_callbacks``.
    """
    registration_listeners = self._new_device_callbacks
    registration_listeners.append(callback)

get_devices Link

get_devices()

Returns the registered devices.

Returns:

Type Description
dict[str, tuple[str, str]]

A dictionary where the keys are device names as strings and the values are tuples containing the type and address of the device as strings.

Source code in plantimager/commons/deviceregistry.py
354
355
356
357
358
359
360
361
362
363
364
365
def get_devices(self) -> dict[str, tuple[str, str]]:
    """
    Return a snapshot of the registered devices.

    Returns
    -------
    dict[str, tuple[str, str]]
        A new dictionary mapping each device name to its
        ``(device_type, addr)`` tuple. Modifying the returned dictionary
        does not affect the registry.
    """
    with self._lock:
        # Copy while holding the lock: handing out the internal dict itself would
        # let callers read or mutate it after the lock has been released.
        return dict(self.devices)

run Link

run()

Runs the main loop for the registry service, handling device registration, unregistration, and device-related event callbacks. The service listens for messages using ZeroMQ (zmq) and processes the events accordingly.

Source code in plantimager/commons/deviceregistry.py
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
def run(self):
    """
    Runs the main loop for the registry service, handling device registration, unregistration, and
    device-related event callbacks. The service listens for messages using ZeroMQ (zmq) and processes
    the events accordingly.
    """
    with self.context.socket(zmq.REP) as socket:
        socket.bind(f"tcp://{self.addr}:{self.port}")
        logger.info(f"Starting registry on {self.addr}:{self.port}")
        while not self._stop_flag:
            self._prune_unhealthy_devices()
            # Fire callbacks for removed and added devices after responding back
            for device_type, addr, name in self._callback_events_to_process["removed"]:
                for callback in self._device_removed_callbacks:
                    callback(device_type, addr, name)
                logger.debug(f"Processed REMOVE callback for {device_type}, {addr}, {name}")
            self._callback_events_to_process["removed"].clear()
            for device_type, addr, name in self._callback_events_to_process["added"]:
                for callback in self._new_device_callbacks:
                    callback(device_type, addr, name)
                logger.debug(f"Processed ADDED callback for {device_type}, {addr}, {name}")
            self._callback_events_to_process["added"].clear()

            # in order for the thread to stop properly, it must not block indefinitely while waiting for
            # a message, hence polling for 100 ms and continue-ing if timeout
            if socket.poll(100) == 0:
                continue
            message = socket.recv_json()
            event_type: str = message["event"]
            payload: dict = message["payload"]
            logger.debug(f"Received event: {event_type}, {payload}")
            match event_type:
                case EventType.REGISTER:
                    device_type = payload["device_type"]
                    addr = payload["addr"]
                    proposed_name = payload["name"]
                    overwrite = payload["overwrite"] if "overwrite" in payload else False
                    name, uuid = self._handle_register(device_type, addr, proposed_name, overwrite=overwrite)
                    socket.send_json({
                        "event": EventType.REGISTER_ACK,
                        "payload": {
                            "name": name,
                            "uuid": str(uuid),
                        }
                    })
                case EventType.UNREGISTER:
                    uuid = payload["uuid"]
                    if uuid in self.device_names:
                        result = self._remove_device_by_uuid(uuid)
                    else:
                        result = False
                        logger.warning(f"Device of id {uuid} not found in registry")
                    socket.send_json({
                        "event": EventType.ACK,
                        "payload": {
                            "req_event": EventType.UNREGISTER,
                            "success": result,
                        }
                    })

                case EventType.CHECK_ALIVE:
                    uuid = payload["uuid"]
                    timeout = payload.get("alive_timeout", ALIVE_TIMEOUT)
                    if uuid in self.device_health_timeout:
                        self.device_health_timeout[uuid] = int(time.time()) + timeout
                        socket.send_json({
                            "event": EventType.ACK,
                            "payload": {
                                "req_event": EventType.CHECK_ALIVE,
                                "success": True,
                            }
                        })
                    else:
                        socket.send_json({
                            "event": EventType.ACK,
                            "payload": {
                                "req_event": EventType.CHECK_ALIVE,
                                "success": False,
                            }
                        })
                case _:
                    logger.warning(f"Unknown event type: {event_type}")
                    socket.send_json({
                        "event": EventType.UNSUPPORTED,
                        "payload": {
                            "req_event": event_type,
                        }
                    })
    logger.info(f"Registry stopped on {self.addr}:{self.port}")

stop Link

stop()

Stop the registry thread.

Source code in plantimager/commons/deviceregistry.py
116
117
118
119
120
def stop(self):
    """
    Ask the registry thread to terminate.

    Both callback lists are replaced by fresh empty lists, then the stop flag
    read by the ``run`` loop is raised.
    """
    self._device_removed_callbacks, self._new_device_callbacks = [], []
    self._stop_flag = True

register_device Link

register_device(context, device_type, addr, name, registry_url, overwrite=False)

Register device of type device_type and of address addr to registry at registry_url. Proposes name to registry.

Returns the accepted device name and the uuid of the device if successful, otherwise a pair of empty strings. The uuid must be kept to unregister the device later.

Source code in plantimager/commons/deviceregistry.py
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
def register_device(
        context: zmq.Context, device_type: str, addr: str, name: str, registry_url: str, overwrite: bool=False
) -> tuple[str, str]:
    """
    Announce a device to the registry reachable at `registry_url`.

    A ``REQ`` socket is opened on `context`, a ``REGISTER`` request carrying
    `device_type`, `addr`, the proposed `name` and the `overwrite` flag is sent,
    and the function then waits for the reply (no poll timeout is applied to
    this wait).

    Returns
    -------
    tuple[str, str]
        ``(registered_name, uuid)`` taken from the ``REGISTER_ACK`` reply, or
        ``("", "")`` for any other reply. Keep the uuid: it is needed to
        unregister the device later.
    """
    request = {
        "event": EventType.REGISTER,
        "payload": {
            "device_type": device_type,
            "addr": addr,
            "name": name,
            "overwrite": overwrite,
        }
    }
    with context.socket(zmq.REQ) as socket, socket.connect(registry_url):
        socket.send_json(request)
        answer = socket.recv_json()
        kind, body = answer["event"], answer["payload"]
        return (body["name"], body["uuid"]) if kind == EventType.REGISTER_ACK else ("", "")

send_alive_check Link

send_alive_check(context, uuid, registry_url, alive_timeout=ALIVE_TIMEOUT)

Check the liveness of a service with the registry.

Send an EventType.CHECK_ALIVE request to the registry and wait for an EventType.ACK reply. The function opens a REQ socket on the provided ZeroMQ context, sends the payload containing uuid and alive_timeout, and returns AliveCheckState.OK only when a successful acknowledgement is received within the socket poll timeout (5s). The lack of a reply results in AliveCheckState.UNREACHABLE; any other reply results in AliveCheckState.UNKNOWN.

Parameters:

Name Type Description Default
context Context

ZeroMQ :class:zmq.Context used to create the REQ socket.

required
uuid str

Unique identifier of the service performing the alive check.

required
registry_url str

URL of the registry (e.g., tcp://127.0.0.1:5555) to which the request is sent.

required
alive_timeout int

Timeout (in seconds) that the service claims it will stay alive. The value is included in the request payload. Defaults to ALIVE_TIMEOUT.

ALIVE_TIMEOUT

Returns:

Type Description
str

'ok' if the alive check was successful; 'unreachable' if the check attempt timed out; 'unknown' if the registry does not know this service (it might have already been removed).

Raises:

Type Description
ZMQError

Propagated if ZeroMQ encounters an error while creating the socket, connecting, sending, or receiving messages.

Notes
  • The socket is created inside a with block, guaranteeing that it is closed when the block exits; no explicit socket.close() call is needed.
  • The function uses a fixed poll timeout of 5 seconds; this value is not configurable via the public API.
  • alive_timeout is only forwarded to the registry in the request payload and is not used by this function to enforce any timing constraints.
See Also

EventType – enumeration of supported event types.

Source code in plantimager/commons/deviceregistry.py
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
def send_alive_check(context: zmq.Context, uuid: str, registry_url: str, alive_timeout: int=ALIVE_TIMEOUT) \
        -> AliveCheckState:
    """Check the liveness of a service with the registry.

    Send an ``EventType.CHECK_ALIVE`` request to the registry and wait for an
    ``EventType.ACK`` reply.  The function opens a ``REQ`` socket on the provided
    ZeroMQ ``context``, sends the payload containing ``uuid`` and ``alive_timeout``,
    and waits up to 5 s for the reply.

    Parameters
    ----------
    context
        ZeroMQ :class:`zmq.Context` used to create the ``REQ`` socket.
    uuid
        Unique identifier of the service performing the alive check.
    registry_url
        URL of the registry (e.g., ``tcp://127.0.0.1:5555``) to which the request
        is sent.
    alive_timeout
        Timeout (in seconds) that the service claims it will stay alive.  The
        value is included in the request payload.  Defaults to ``ALIVE_TIMEOUT``.

    Returns
    -------
    AliveCheckState
        ``AliveCheckState.OK`` if the registry replied with an ``ACK`` whose
        payload has a truthy ``success`` field;
        ``AliveCheckState.UNREACHABLE`` if no reply arrived within the poll timeout;
        ``AliveCheckState.UNKNOWN`` for any other reply, e.g. when the registry
        does not know this service (it might have already been removed).

    Raises
    ------
    zmq.ZMQError
        Propagated if ZeroMQ encounters an error while creating the socket,
        connecting, sending, or receiving messages.

    Notes
    -----
    * The socket is created inside a ``with`` block, guaranteeing that it is
      closed when the block exits.
    * The function uses a fixed poll timeout of 5 seconds; this value is not
      configurable via the public API.
    * ``alive_timeout`` is only forwarded to the registry and is not used by
      this function to enforce any timing constraints.
    * On timeout the socket's linger period is set to zero before it is closed,
      so a request that is still queued is dropped rather than kept pending.

    See Also
    --------
    `EventType` – enumeration of supported event types.
    """
    with context.socket(zmq.REQ) as socket:
        socket: zmq.Socket
        socket.connect(registry_url)
        socket.send_json({
            "event": EventType.CHECK_ALIVE,
            "payload": {
                "uuid": uuid,
                "alive_timeout": alive_timeout,
            }
        })
        if socket.poll(5000, flags=zmq.POLLIN) == 0:
            logger.debug(f"No answer from registry at address {registry_url}.")
            # The check has already failed from the caller's point of view: drop any
            # request still queued so closing the socket does not wait on it and it
            # is not delivered later.
            socket.setsockopt(zmq.LINGER, 0)
            return AliveCheckState.UNREACHABLE
        reply = socket.recv_json()
        event_type = reply["event"]
        if event_type == EventType.ACK and reply.get("payload", {}).get("success", False):
            return AliveCheckState.OK
        return AliveCheckState.UNKNOWN

unregister_device Link

unregister_device(context, uuid, registry_addr)

Unregister the device of the given uuid from the registry at registry_addr. Returns True if the device was unregistered successfully.

Source code in plantimager/commons/deviceregistry.py
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
def unregister_device(context: zmq.Context, uuid: str, registry_addr: str) -> bool:
    """
    Unregister the device of the given uuid from the registry at `registry_addr`.

    A ``REQ`` socket is opened on `context`, an ``UNREGISTER`` request carrying
    `uuid` is sent, and the reply is awaited for up to 5 s.

    Returns
    -------
    bool
        True only if the registry answered with an ``ACK`` whose payload reports
        ``success``. False if no reply arrived in time, if the reply is not an
        ``ACK``, or if the registry reports that the removal did not succeed
        (e.g. the uuid is not known to it).
    """
    with context.socket(zmq.REQ) as socket:
        socket: zmq.Socket
        with socket.connect(registry_addr):
            socket.send_json({
                "event": EventType.UNREGISTER,
                "payload": {
                    "uuid": uuid,
                }
            })
            if socket.poll(5000, flags=zmq.POLLIN) == 0:
                logger.debug(f"No answer from registry at address {registry_addr}. Closing")
                # Failure is being reported to the caller: drop any request still
                # queued so it is not delivered later and closing does not wait on it.
                socket.setsockopt(zmq.LINGER, 0)
                return False
            reply = socket.recv_json()
            event_type = reply["event"]
            # The registry acknowledges every UNREGISTER request, including those for
            # unknown uuids, so the outcome is carried by the "success" field.
            succeeded = reply.get("payload", {}).get("success", False)
            return event_type == EventType.ACK and bool(succeeded)