Skip to content

RPC

plantimager.commons.RPC Link

Remote Procedure Call (RPC) Framework.

This module provides a lightweight RPC framework built on top of ZeroMQ. It supports method execution (both JSON-serializable and binary buffers), property proxying, and a publish-subscribe signaling mechanism.

Classes:

Name Description
NoResult

Represents an operation failure with error details.

RPCSignal

Lightweight publish-subscribe signal implementation.

RPCProperty

RPC-enabled property descriptor for proxying attribute access.

RPCClient

Abstract client class for connecting to and interacting with an RPC server.

RPCServer

Server class that exposes methods, properties, and signals over the network.

NoResult Link

NoResult(error, traceback)

Represents an outcome with no result, typically used to indicate an operation failure along with associated error details.

This class encapsulates the error message and traceback information to provide a structured representation of operation failures, useful for logging or debugging purposes.

This class is Falsey

Attributes:

Name Type Description
error str

A string containing the error message describing the nature of the failure.

traceback str

A string containing the traceback or detailed information about where and why the failure occurred.

Source code in plantimager/commons/RPC.py
85
86
87
def __init__(self, error: str, traceback: str):
    self.error = error
    self.traceback = traceback

RPCClient Link

RPCClient(context, url, timeout=1000)

Abstract base class for RPC clients.

This class sets up a ZeroMQ REQ socket to communicate with an RPCServer. To use this, create a subclass that inherits from both a target interface and this class, and decorate it with @RPCClient.register_interface.

Parameters:

Name Type Description Default
context Context

The ZeroMQ context used for networking.

required
url str

The URL of the target RPC server to connect to.

required

Attributes:

Name Type Description
own_address str

The local IP address of the client.

peer_address str

The IP address of the connected RPC server.

name str

The designated name of the connected RPC server.

Source code in plantimager/commons/RPC.py
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
def __init__(self, context: zmq.Context, url: str, timeout=1000):
    super().__init__()
    self.context: zmq.Context = context
    self.url: str = url
    self.socket: zmq.Socket = context.socket(zmq.REQ)
    self.socket.connect(self.url)

    # replacing class attribute signals with instance signals
    for base in self.__class__.__bases__:
        for key, value in base.__dict__.items():
            if isinstance(value, RPCSignal):
                setattr(self, key, copy.deepcopy(value))

    # Getting RPCServer inventory
    logger.debug("--> Getting Inventory")
    self.socket.send_json({
        "event": RPCEvents.GET_INVENTORY
    })
    if self.socket.poll(timeout=timeout, flags=zmq.POLLIN) == 0:
        logger.error("Failed to get inventory. Server did not respond.")
        raise TimeoutError("Failed to get inventory.")
    reply = self.socket.recv_json()
    logger.debug(f"Got inv: {reply}",)
    self._json_methods: dict[str, int|None] = reply["json_methods"]
    self._buffer_methods: dict[str, int|None] = reply["buffer_methods"]
    self._signals = {sig: getattr(self, sig) for sig in reply["signals"] if hasattr(self, sig)}
    self._properties: list = reply["properties"]
    self.name: str = reply["name"]
    logger.debug("<-- Inventory Received")

    # If signals initiate
    self._signal_receiver = None
    if self._signals:
        logger.info("Initializing signal handling")
        logger.debug("--> Initializing Signals Handling")
        self.socket.send_json({"event": RPCEvents.INIT_SIGNALS_HANDLING})

        if self.socket.poll(timeout=timeout, flags=zmq.POLLIN) == 0:
            logger.error("Failed to initialize signals. Server did not respond.")
            raise TimeoutError("Failed to initialize signals.")
        reply = self.socket.recv_json()
        if not reply["success"]:
            raise RuntimeError("INIT_SIGNALS_HANDLING failed")

        signal_port = reply["signal_port"]
        proto, addr, _ = url_parser.search(self.url).groups()
        self._signal_receiver = RPCSignalReceiver(
            context=self.context,
            url=f"{proto}://{addr}:{signal_port}",
            signals=self._signals
        )
        self._signal_receiver.daemon = True
        self._signal_receiver.start()
        logger.info("<-- Successfully initialized signal handling")

    def _finalizer(sock, receiver):
        sock.close()
        if receiver:
            receiver.stop()
            receiver.join(2)
        logger.debug("Client finalized")

    finalize(self, _finalizer, self.socket, self._signal_receiver)

execute Link

execute(method_name, params)

Execute a remote method on the RPC server.

Parameters:

Name Type Description Default
method_name str

The name of the method to execute.

required
params dict

A dictionary containing the args and kwargs for the method.

required

Returns:

Type Description
tuple

A 2-tuple (success, result). If success is True, result contains the return value of the method. If False, result is a tuple containing the (error_message, traceback).

Raises:

Type Description
TimeoutError

If the server does not respond within the configured timeout.

Source code in plantimager/commons/RPC.py
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
def execute(self, method_name: str, params: dict) -> tuple[bool, object]:
    """
    Execute a remote method on the RPC server.

    Parameters
    ----------
    method_name : str
        The name of the method to execute.
    params : dict
        A dictionary containing the `args` and `kwargs` for the method.

    Returns
    -------
    tuple
        A 2-tuple `(success, result)`. If `success` is True, `result`
        contains the return value of the method. If False, `result`
        is a tuple containing the `(error_message, traceback)`.

    Raises
    ------
    TimeoutError
        If the server does not respond within the configured timeout.
    """
    package = {
        "event": RPCEvents.METHOD_CALL,
        "method": method_name,
        "params": params,
    }
    if self.socket.poll(timeout=1000, flags=zmq.POLLOUT) == 0:
        logger.error(f"Proxy of {self._interface} at {self.url} did not respond")
        raise TimeoutError(f"Proxy of {self._interface} at {self.url} did not respond")
    logger.debug(f"Executing {package}")
    self.socket.send_json(package, flags=zmq.NOBLOCK)

    if method_name in self._json_methods:
        if self.socket.poll(timeout=self._json_methods[method_name], flags=zmq.POLLIN) == 0:
            logger.error(f"Proxy of {self._interface} at {self.url} did not respond")
            raise TimeoutError(f"Proxy of {self._interface} at {self.url} did not respond")
        reply =  self.socket.recv_json()
        if reply["success"]:
            return True, reply["result"]
        else:
            return False, (reply["error"], reply["traceback"])
    elif method_name in self._buffer_methods:
        if self.socket.poll(timeout=self._buffer_methods[method_name], flags=zmq.POLLIN) == 0:
            logger.error(f"Proxy of {self._interface} at {self.url} did not respond")
            raise TimeoutError(f"Proxy of {self._interface} at {self.url} did not respond")
        reply_frames: list[zmq.Frame] = self.socket.recv_multipart(copy=False)
        buffer_info = json.loads(reply_frames[0].bytes)
        if "error" in buffer_info:
            return False, (buffer_info["error"], buffer_info["traceback"])
        else:
            return True, (reply_frames[1].buffer, buffer_info)
    return False, (Warning(f"Unknown method {method_name}"), "")

register_interface classmethod Link

register_interface(interface)

Class decorator to bind an interface to an RPCClient subclass.

This decorator inspects the provided interface and dynamically proxies its methods and properties so that calls are forwarded over the network to the RPC server.

Parameters:

Name Type Description Default
interface type

The interface class defining the methods and properties to proxy.

required

Returns:

Type Description
callable

A class decorator that applies the proxy logic.

Raises:

Type Description
RuntimeError

If the decorated class does not inherit from both interface and RPCClient.

Source code in plantimager/commons/RPC.py
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
@classmethod
def register_interface(cls, interface: type):
    """
    Class decorator to bind an interface to an RPCClient subclass.

    This decorator inspects the provided `interface` and dynamically
    proxies its methods and properties so that calls are forwarded
    over the network to the RPC server.

    Parameters
    ----------
    interface : type
        The interface class defining the methods and properties to proxy.

    Returns
    -------
    callable
        A class decorator that applies the proxy logic.

    Raises
    ------
    RuntimeError
        If the decorated class does not inherit from both `interface`
        and `RPCClient`.
    """
    def _decorator(target_cls: type):
        if interface not in target_cls.__bases__ or cls not in target_cls.__bases__:
            raise RuntimeError(f"{target_cls} must inherit from {interface} and {cls}.")
        for key, val in interface.__dict__.items():
            if inspect.isfunction(val) and not (key.startswith("__") and key.endswith("__")):
                logger.debug(f"registering method {key} in {target_cls}")
                func = decorate(val, cls._method_proxy)
                func.__isabstractmethod__ = False  # Counts as en actual implementation
                setattr(target_cls, key, func)
            elif isinstance(val, RPCSignal):
                pass # nothing to do at this stage
            elif isinstance(val, RPCProperty):
                fget = wraps(val.fget)(partial(cls._property_getter_proxy, property_name=key))
                fset = wraps(val.fset)(partial(cls._property_setter_proxy, property_name=key))
                prop = RPCProperty(fget=fget, fset=fset, fdel=val.fdel, doc=val.__doc__, notify=val._notifier)
                setattr(target_cls, key, prop)
        # abstract methods have been implemented
        target_cls.__abstractmethods__ = frozenset()
        target_cls._interface = interface.__name__
        return target_cls
    return _decorator

stop_server Link

stop_server()

Request the connected RPC server to shut down.

Sends a non-blocking STOP_SERVER event to the remote server. If the server responds, it logs the reply.

Source code in plantimager/commons/RPC.py
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
def stop_server(self):
    """
    Request the connected RPC server to shut down.

    Sends a non-blocking `STOP_SERVER` event to the remote server.
    If the server responds, it logs the reply.
    """
    logger.info(f"Stopping server {self.url}")
    if self.socket.poll(timeout=1000, flags=zmq.POLLOUT) == 0:
        logger.info(f"Server {self.url} could not be joined (might already be dead)")
        return
    self.socket.send_json({
        "event": RPCEvents.STOP_SERVER
    }, flags=zmq.NOBLOCK)
    if self.socket.poll(timeout=1000, flags=zmq.POLLIN) == 0:
        logger.info(f"Server {self.url} did not respond (might already be dead)")
        return
    reply = self.socket.recv_json()
    logger.debug(f"Got stop reply {reply}")

RPCEvents Link

Bases: StrEnum

Enumeration of valid RPC event types used for communication between clients and servers.

RPCProperty Link

RPCProperty(fget=None, fset=None, fdel=None, doc=None, notify=None, auto_notify=False)

Bases: property

RPC-enabled property descriptor.

This subclass of :class:property adds optional notification support via an :class:RPCSignal. When a property created with RPCProperty is assigned a new value, the supplied notify signal (if provided) can be emitted to inform remote listeners of the change. The class is typically used as a decorator on a getter function; the optional notify argument is stored on the resulting property object and can be accessed by custom setter implementations. The setter must explicitly emit the ''notify'' signal if a change occurred.

Attributes:

Name Type Description
_notifier RPCSignal or None

Signal that will be emitted when the property's value changes. It is initialized from the notify argument of __init__ and may be None when no notification is required.

_auto_notify bool

When True, when the setter of the property is called and the value is modified, the _notifier is emitted.

Initialize the property with optional RPC notification.

This subclass of property adds support for emitting an RPC signal when the property value changes. If auto_notify is True the signal provided via notify is emitted automatically after a successful set operation; otherwise the caller must trigger the notification manually.

Parameters:

Name Type Description Default
fget callable

Getter function that receives the instance and returns the attribute value.

None
fset callable

Setter function that receives the instance and the value to assign.

None
fdel callable

Deleter function that receives the instance and removes the attribute.

None
doc str

Documentation string for the property.

None
notify RPCSignal

RPC signal emitted when the property value is changed.

None
auto_notify bool

When True, automatically emit notify after a successful set.

``False``
See Also

property Built‑in property class that this class extends.

Source code in plantimager/commons/RPC.py
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
def __init__(self, fget=None, fset=None, fdel=None, doc=None, notify: RPCSignal = None, auto_notify=False):
    """
    Initialize the property with optional RPC notification.

    This subclass of ``property`` adds support for emitting an RPC signal
    when the property value changes.  If ``auto_notify`` is ``True`` the
    signal provided via ``notify`` is emitted automatically after a successful
    set operation; otherwise the caller must trigger the notification
    manually.

    Parameters
    ----------
    fget : callable, optional
        Getter function that receives the instance and returns the attribute
        value.
    fset : callable, optional
        Setter function that receives the instance and the value to assign.
    fdel : callable, optional
        Deleter function that receives the instance and removes the attribute.
    doc : str, optional
        Documentation string for the property.
    notify : RPCSignal, optional
        RPC signal emitted when the property value is changed.
    auto_notify : bool, default: ``False``
        When ``True``, automatically emit ``notify`` after a successful set.

    See Also
    --------
    property
        Built‑in ``property`` class that this class extends.
    """
    super().__init__(fget=fget, fset=fset, fdel=fdel, doc=doc)
    self._notifier: RPCSignal | None = notify
    self._auto_notify: bool = auto_notify

__set__ Link

__set__(obj, value)

Set the property on obj and emit the _notifier signal if the observable value actually changes.

The logic is: 1. Retrieve the old value via the getter (if any). 2. Call the original setter. 3. Retrieve the new value via the getter. 4. If old != new and a notifier exists, emit the signal with the new value.

Source code in plantimager/commons/RPC.py
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
def __set__(self, obj, value):
    """
    Set the property on *obj* and emit the ``_notifier`` signal if the
    observable value actually changes.

    The logic is:
    1. Retrieve the old value via the getter (if any).
    2. Call the original setter.
    3. Retrieve the new value via the getter.
    4. If ``old != new`` and a notifier exists, emit the signal with the
       new value.
    """
    # Step 1 – capture the old value (may be None if no getter)
    old_val = None
    if self._auto_notify and self.fget is not None:
        try:
            old_val = self.fget(obj)
        except Exception:
            # If the getter fails we simply ignore the old value;
            # the change‑detection will fall back to always emit.
            old_val = object()  # unique sentinel

    # Step 2 – invoke the original setter (if any)
    super().__set__(obj, value)

    # Step 3 – capture the new value via the getter (if any)
    new_val = None
    if self._auto_notify and self.fget is not None:
        try:
            new_val = self.fget(obj)
        except Exception:
            new_val = object()  # unique sentinel

    # Step 4 – emit if changed and a notifier is present
    if self._auto_notify and self._notifier is not None and old_val != new_val:
        try:
            self._notifier.emit(new_val)
        except Exception as exc:
            # We never want a notification failure to break the setter.
            logger.error(f"Failed to emit RPCProperty change signal: {exc}")

RPCServer Link

RPCServer(context, url, alive_timeout=60)

RPCServer to use in combination with RPCClient. The server holds the concrete implementation of an interface that is made available on the network. To create an RPCServer create a class inheriting from an interface and RPCServer. Callable methods must be decorated using RPCServer.register_method_buffer or RPCServer.register_method_json. The server is also capable of sending signals defined by the RPCSignal class and proxying properties defined with RPCProperty.

Parameters:

Name Type Description Default
context Context

ZMQ context used to make the various sockets necessary for communicating with the client.

required
url str

Url where the RPCServer should listen. It should be of the form "tcp://" where ip is one of the local network interface ip which must be accessible from the client.

required
alive_timeout int

Time in seconds after which the device_registry will consider this service dead or unreachable.

60

Attributes:

Name Type Description
context Context

ZMQ context used to make the various sockets necessary for communicating with the client.

url str

Url where the RPCServer is opened. The url should include the port in the form tcp://<ip>:<port>.

name str

Name of the RPCServer as given by the deviceregistry once registered

uuid str

Unique identifier of this RPCServer given by the registry once registered..

peer_addr str

Ip address of the client once connected.

Source code in plantimager/commons/RPC.py
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
def __init__(self, context: zmq.Context, url: str, alive_timeout: int = 60):
    """
    RPCServer to use in combination with RPCClient.
    The server holds the concrete implementation of an interface that is made available on the network.
    To create an RPCServer create a class inheriting from an interface and RPCServer. Callable
    methods must be decorated using `RPCServer.register_method_buffer` or `RPCServer.register_method_json`.
    The server is also capable of sending signals defined by the RPCSignal class and proxying properties
    defined with RPCProperty.

    Parameters
    ----------
    context: zmq.Context
        ZMQ context used to make the various sockets necessary for communicating with the client.
    url: str
        Url where the RPCServer should listen. It should be of the form "tcp://<ip>" where ip is one
        of the local network interface ip which must be accessible from the client.
    alive_timeout: int, optional
        Time in seconds after which the device_registry will consider this service dead or unreachable.

    Attributes
    ----------
    context: zmq.Context
        ZMQ context used to make the various sockets necessary for communicating with the client.
    url: str
        Url where the RPCServer is opened. The url should include the port in the form
        ``tcp://<ip>:<port>``.
    name: str
        Name of the RPCServer as given by the deviceregistry once registered
    uuid: str
        Unique identifier of this RPCServer given by the registry once registered..
    peer_addr: str
        Ip address of the client once connected.

    """
    super().__init__()

    self._type = None
    self.context: zmq.Context = context
    self.url: str = url
    self.alive_timeout = alive_timeout
    self.uuid: str | None = None
    self.name = ""
    self.registry_addr = ""
    self.peer_addr: str | None = None
    self._unreachable_counter = 0

    # Containers for RPC members
    self._json_methods: dict[str, Callable] = {}
    self._buffer_methods: dict[str, Callable] = {}
    self._rpc_properties: dict[str, property] = {}
    self._signals: dict[str, RPCSignal] = {}

    # Initialize internals
    self._initialize_rpc_members()
    self._socket, self.port = self._bind_socket(url)
    self._signal_socket: zmq.Socket[zmq.REQ] | None = None
    self._stop = False

    self._setup_lifecycle_cleanup()
    self._dead = False

register_method_buffer staticmethod Link

register_method_buffer(timeout=10000)

Registers this method as remote callable procedure which will transmit its output as a buffer-like object as well as a buffer_info dictionary.

method may take any input which will be serialized via json and must output a 2-tuple: (memoryview or bytes, dict).

Parameters:

Name Type Description Default
timeout int | None

Specifies how much time in ms a client is expected to wait for the method to finish. If None, waits indefinitely.

10000

Returns:

Name Type Description
decorator Callable[Callable[..., tuple[memoryview | bytes, dict]], Callable[..., tuple[memoryview | bytes, dict]]]
Source code in plantimager/commons/RPC.py
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
@staticmethod
def register_method_buffer(timeout: int | None = 10000):
    """
    Registers this method as remote callable procedure which will transmit its output as a buffer-like object
    as well as a buffer_info dictionary.

    `method` may take any input which will be serialized via json and must output a 2-tuple:
    (memoryview or bytes, dict).


    Parameters
    ----------
    timeout: int | None
        Specifies how much time in ms a client is expected to wait for the method to finish. If None, waits indefinitely.

    Returns
    -------
    decorator: Callable[Callable[..., tuple[memoryview|bytes, dict]], Callable[..., tuple[memoryview|bytes, dict]]]

    """
    if inspect.isfunction(timeout):
        timeout._is_buffer_method = True
        timeout._timeout = 10000
        return timeout
    def _decorator(method: Callable[..., tuple[memoryview|bytes, dict]]):
        method._is_buffer_method = True
        method._timeout = timeout
        return method
    return _decorator

register_method_json staticmethod Link

register_method_json(timeout=10000)

Registers this method as remote callable procedure which will transmit its output via json. It is advised to only send basic types and containers as output (int, float, str, bool, list, tuple, dict, ...) Arguments are also serialized via json.

Parameters:

Name Type Description Default
timeout int | None

Specifies how much time in ms a client is expected to wait for the method to finish. If None, waits indefinitely.

10000

Returns:

Name Type Description
decorator Callable[Callable[..., Any], Callable[..., Any]]
Source code in plantimager/commons/RPC.py
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
@staticmethod
def register_method_json(timeout: int | None = 10000):
    """
    Registers this method as remote callable procedure which will transmit its output via json.
    It is advised to only send basic types and containers as output (int, float, str, bool, list, tuple, dict, ...)
    Arguments are also serialized via json.

    Parameters
    ----------
    timeout: int | None
        Specifies how much time in ms a client is expected to wait for the method to finish. If None, waits indefinitely.

    Returns
    -------
    decorator: Callable[Callable[..., Any], Callable[..., Any]]

    """
    if inspect.isfunction(timeout):
        timeout._is_json_method = True
        timeout._timeout = 10000
        return timeout
    def _decorator(method: Callable[..., Any]):
        method._is_json_method = True
        method._timeout = timeout
        return method
    return _decorator

register_to_registry Link

register_to_registry(type_, name, registry_url, overwrite=True)

Register this RPCServer to the registry at registry_address as a device of type type_ and name name.

Note: The name not be accepted as is and may be modified by the registry to avoid duplicate. This method returns the accepted name of this device.

Parameters:

Name Type Description Default
type_ str

Name of the device type.

required
name str

Proposed name of the device.

required
registry_url str

Url of the device registry. Must have the form "tcp://:" if the registry uses tcp

required
overwrite

Wether or not this device should take preference for the use of this name and overwrite other conflicting devices.

True

Returns:

Name Type Description
accepted_name str

Name of this device as accepted by the registry.

Source code in plantimager/commons/RPC.py
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
def register_to_registry(self, type_: str, name: str, registry_url: str, overwrite=True) -> str:
    """
    Register this RPCServer to the registry at `registry_address` as a device of type `type_` and name `name`.

    Note: The name not be accepted as is and may be modified by the registry to avoid duplicate. This method
    returns the accepted name of this device.

    Parameters
    ----------
    type_: str
        Name of the device type.
    name: str
        Proposed name of the device.
    registry_url: str
        Url of the device registry. Must have the form "tcp://<ip>:<port>" if the registry uses tcp
    overwrite: bool, optional
        Wether or not this device should take preference for the use of this name and overwrite other
        conflicting devices.

    Returns
    -------
    accepted_name: str
        Name of this device as accepted by the registry.

    """
    logger.debug(f"Register device {name} of type {type_} to {registry_url}")
    self._type = type_
    self.name, self.uuid = register_device(
        self.context, type_,
        f"{self.url}:{self.port}",
        name, registry_url,
        overwrite=overwrite,
    )
    if not self.name:
        logger.warning(f"Failed to register device {name} of type {type_} as {registry_url}")
    self.registry_addr = registry_url if self.name else ""

    # Update cleanup state so finalizer knows what to unregister
    self._cleanup_state["uuid"] = self.uuid
    self._cleanup_state["registry_addr"] = self.registry_addr

    logger.info(f"Successfully registered device {name} of type {type_} to {registry_url}")

    return self.name

serve_forever Link

serve_forever()

Serve the RPC server indefinitely, handling incoming requests until a stop signal is received.

The method enters a loop that repeatedly notifies the watchdog, checks the server's liveness, waits for a request, and dispatches the request based on its event field. Supported events include peer discovery, inventory retrieval, signal handling initialization, method invocation, property access, and server shutdown. Upon receiving a STOP_SERVER event the loop terminates, sockets are closed, and a log entry is written.

If registered to the registry and the check_alive fails, exits the loop

Returns:

Type Description
None

The server runs until it is stopped; no value is returned.

Notes
  • The private attribute _stop controls the loop termination. It is set to True only when a STOP_SERVER request is processed.
  • _socket and _signal_socket are closed during cleanup; if either attribute is None the corresponding close call is skipped.
  • notify_watchdog and _alive_check are called on every iteration to maintain server health monitoring.
  • The method assumes that _wait_for_request returns a mapping with an event key; a falsy return value causes the loop to continue without processing.
See Also

RPCEvents : Enum defining the possible request events. _handle_find_peer_address, _handle_get_inventory, _handle_init_signal_handling, _handle_method_call, _handle_property_get, _handle_property_set : Private helper methods that implement the handling logic for each event type.

Source code in plantimager/commons/RPC.py
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
def serve_forever(self):
    """
    Serve the RPC server indefinitely, handling incoming requests until a stop signal is
    received.

    The method enters a loop that repeatedly notifies the watchdog, checks the server's
    liveness, waits for a request, and dispatches the request based on its ``event`` field.
    Supported events include peer discovery, inventory retrieval, signal handling
    initialization, method invocation, property access, and server shutdown.  Upon receiving
    a ``STOP_SERVER`` event the loop terminates, sockets are closed, and a log entry is
    written.

    If registered to the registry and the check_alive fails, exits the loop

    Returns
    -------
    None
        The server runs until it is stopped; no value is returned.

    Notes
    -----
    * The private attribute ``_stop`` controls the loop termination.  It is set to
      ``True`` only when a ``STOP_SERVER`` request is processed.
    * ``_socket`` and ``_signal_socket`` are closed during cleanup; if either attribute is
      ``None`` the corresponding ``close`` call is skipped.
    * ``notify_watchdog`` and ``_alive_check`` are called on every iteration to maintain
      server health monitoring.
    * The method assumes that ``_wait_for_request`` returns a mapping with an ``event``
      key; a falsy return value causes the loop to continue without processing.

    See Also
    --------
    RPCEvents : Enum defining the possible request events.
    _handle_find_peer_address, _handle_get_inventory, _handle_init_signal_handling,
    _handle_method_call, _handle_property_get, _handle_property_set :
        Private helper methods that implement the handling logic for each event type.
    """
    if self._dead:
        logger.error("RPCServer is in dead state. A new instance must be created.")
        raise RuntimeError("RPCServer is in dead state. A new instance must be created.")
    self._stop = False
    while not self._stop:
        notify_watchdog()
        if not self._alive_check():
            break
        request = self._wait_for_request()
        if not request:
            continue
        try:
            logger.debug(f"Received request: {request['event']}")
            t0 = time.monotonic()
            match request["event"]:
                case RPCEvents.GET_INVENTORY:
                    reply = self._handle_get_inventory()
                    self._send_reply(reply)
                case RPCEvents.INIT_SIGNALS_HANDLING:
                    reply = self._handle_init_signal_handling(request)
                    self._send_reply(reply)
                case RPCEvents.METHOD_CALL:
                    reply, use_multipart = self._handle_method_call(request)
                    self._send_reply(reply, use_multipart)
                case RPCEvents.PROPERTY_GET:
                    reply = self._handle_property_get(request)
                    self._send_reply(reply)
                case RPCEvents.PROPERTY_SET:
                    reply = self._handle_property_set(request)
                    self._send_reply(reply)
                case RPCEvents.STOP_SERVER:
                    self._socket.send_json({"success": True})
                    self._stop = True
            logger.debug(f"Event {request['event']} treated in {time.monotonic() - t0}s")
        except Exception as exc:
            logger.error(f"Unexpected error while handling request: {exc}")
            # If we have a request, we can at least try to answer with a generic error.
            if isinstance(request, dict) and "event" in request:
                self._send_reply(self._make_error_reply(exc, "serve_forever"))
            break   # abort the loop – we are in an inconsistent state

    # cleanup
    if self._socket:
        self._socket.close()
    if self._signal_socket:
        self._signal_socket.close()
    self._dead = True
    logger.info("Server stopped")

stop_server Link

stop_server()

Stop the server and unregister the device if it has been registered.

The method sets the internal stop flag, optionally calls :func:unregister_device to remove the device from a remote registry, clears the identifying attributes (name, uuid, registry_addr), and resets the corresponding entries in _cleanup_state to None. This prevents a second unregister attempt during cleanup.

Raises:

Type Description
Exception

Propagates any exception raised by :func:unregister_device, e.g. network errors or authentication failures.

Notes
  • The device is only unregistered when all three attributes self.name, self.registry_addr and self.uuid evaluate to True. If any of them is falsy, the function simply sets the stop flag and returns.
  • self._cleanup_state is updated after a successful unregister to avoid duplicate cleanup actions later in the object's lifecycle.
See Also

unregister_device : Function that removes a device from the registry.

Source code in plantimager/commons/RPC.py
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
def stop_server(self):
    """
    Stop the server and unregister the device if it has been registered.

    The method sets the internal stop flag, optionally calls
    :func:`unregister_device` to remove the device from a remote registry,
    clears the identifying attributes (``name``, ``uuid``, ``registry_addr``),
    and resets the corresponding entries in ``_cleanup_state`` to ``None``.
    This prevents a second unregister attempt during cleanup.

    Raises
    ------
    Exception
        Propagates any exception raised by :func:`unregister_device`, e.g.
        network errors or authentication failures.

    Notes
    -----
    * The device is only unregistered when all three attributes
      ``self.name``, ``self.registry_addr`` and ``self.uuid`` evaluate to
      ``True``.  If any of them is falsy, the function simply sets the
      stop flag and returns.
    * ``self._cleanup_state`` is updated after a successful unregister to
      avoid duplicate cleanup actions later in the object's lifecycle.

    See Also
    --------
    unregister_device : Function that removes a device from the registry.
    """
    self._stop = True
    if self.name and self.registry_addr and self.uuid:
        unregister_device(self.context, self.uuid, self.registry_addr)
        self.name = ""
        self.uuid = ""
        self.registry_addr = ""

        # Clear cleanup state to avoid double unregister
        self._cleanup_state["uuid"] = None
        self._cleanup_state["registry_addr"] = None

        logger.info("Device unregistered successfully")

RPCSignal Link

RPCSignal(*arg_types)
RPCSignal

Lightweight publish‑subscribe signal implementation that stores either strong or weak references to callables and invokes them with supplied arguments.

Instances are created with a variable number of type specifications that describe the expected arguments for the signal. These specifications are stored unchanged in the :attr:args attribute; they are not validated at runtime but may be used by callers for documentation or static checking.

Connections are added via :meth:connect and removed via :meth:disconnect. When the signal is emitted with :meth:emit, each stored connection is called in the order it was added. Weak references that have been garbage‑collected are silently ignored.

Parameters:

Name Type Description Default
*arg_types tuple

Positional argument type specifications supplied at construction time. The contents are opaque to the implementation – they are kept only for external introspection.

()

Attributes:

Name Type Description
arg_types tuple

The positional argument type specifications supplied to __init__. Callers may inspect this attribute for documentation or validation purposes.

connections list

Mutable list of connected callables or weak references. Each entry is either a callable object or a :class:weakref.WeakMethod. The list is modified by :meth:connect and :meth:disconnect. Callables are invoked in the order they were added when :meth:emit is called.

See Also

weakref.WeakMethod : Standard library class used to store weak references to bound methods.

Initialize a new RPCSignal instance.

Parameters:

Name Type Description Default
*arg_types tuple

Positional argument type specifications. The values are stored in :attr:args but are otherwise not interpreted by the signal.

()
Notes

arg_types can be any hashable objects (e.g., int, str, numpy.ndarray) that the user wishes to document as the expected argument types for the signal.

Source code in plantimager/commons/RPC.py
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
def __init__(self, *arg_types):
    """
    Initialize a new ``RPCSignal`` instance.

    Parameters
    ----------
    *arg_types : tuple
        Positional argument type specifications.  The values are stored in
        :attr:`args` but are otherwise not interpreted by the signal.

    Notes
    -----
    ``arg_types`` can be any hashable objects (e.g., ``int``, ``str``,
    ``numpy.ndarray``) that the user wishes to document as the expected
    argument types for the signal.
    """
    self.arg_types = arg_types
    self.connections = []
    self.args = arg_types
    self.connections: list[Callable | WeakMethod] = []

connect Link

connect(conn)

Connect a callable (or weak reference) to the signal.

Parameters:

Name Type Description Default
conn Callable or WeakMethod

The function, bound method, or weak reference that should be called when the signal is emitted.

required

Raises:

Type Description
TypeError

If conn is neither a callable nor a :class:weakref.WeakMethod.

Notes

The same conn is added only once; duplicate connections are ignored.

Source code in plantimager/commons/RPC.py
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
def connect(self, conn: Callable | WeakMethod):
    """Connect a callable (or weak reference) to the signal.

    Parameters
    ----------
    conn : Callable or weakref.WeakMethod
        The function, bound method, or weak reference that should be called
        when the signal is emitted.

    Raises
    ------
    TypeError
        If ``conn`` is neither a callable nor a :class:`weakref.WeakMethod`.

    Notes
    -----
    The same ``conn`` is added only once; duplicate connections are ignored.
    """
    if not isinstance(conn, (weakref.WeakMethod, Callable)):
        raise TypeError("Expected callable or weakref.WeakMethod, got {}".format(type(conn)))
    if conn not in self.connections:
        self.connections.append(conn)

disconnect Link

disconnect(conn=None)

Remove a previously connected callable or clear all connections.

Parameters:

Name Type Description Default
conn Callable

The specific callable or weak reference to remove. If omitted (or None), all connections are cleared.

None

Raises:

Type Description
ValueError

If conn is provided but is not present in :attr:connections.

Notes

After a successful call, the target is no longer invoked by future :meth:emit calls.

Source code in plantimager/commons/RPC.py
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
def disconnect(self, conn: Callable=None):
    """Remove a previously connected callable or clear all connections.

    Parameters
    ----------
    conn : Callable, optional
        The specific callable or weak reference to remove.  If omitted (or
        ``None``), *all* connections are cleared.

    Raises
    ------
    ValueError
        If ``conn`` is provided but is not present in :attr:`connections`.

    Notes
    -----
    After a successful call, the target is no longer invoked by future
    :meth:`emit` calls.
    """
    if conn:
        self.connections.remove(conn)
    else:
        self.connections.clear()

emit Link

emit(*args)

Emit the signal, invoking all connected callables.

Parameters:

Name Type Description Default
*args tuple

Positional arguments that will be forwarded to each connected callable. The number and type of arguments should match the specifications stored in :attr:args, but this is not enforced.

()
Notes
  • Weak references stored as :class:weakref.WeakMethod are dereferenced before the call; if the underlying object has been garbage‑collected, the entry is simply skipped.
  • Any exception raised by a connected callable propagates to the caller of :meth:emit. The method does not catch or suppress errors.
Source code in plantimager/commons/RPC.py
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
def emit(self, *args):
    """Emit the signal, invoking all connected callables.

    Parameters
    ----------
    *args : tuple
        Positional arguments that will be forwarded to each connected
        callable.  The number and type of arguments should match the
        specifications stored in :attr:`args`, but this is not enforced.

    Notes
    -----
    - Weak references stored as :class:`weakref.WeakMethod` are dereferenced
      before the call; if the underlying object has been garbage‑collected,
      the entry is simply skipped.
    - Any exception raised by a connected callable propagates to the caller
      of :meth:`emit`.  The method does **not** catch or suppress errors.
    """
    args = self.validate_args(*args, coerce=True)
    for conn in self.connections:
        if isinstance(conn, WeakMethod) and (func:=conn()):
            # If the onnection is a WeakMethod and the method still lives
            func(*args)
        elif not isinstance(conn, weakref.WeakMethod):
            # If this is not a weak method (ergo an actual method)
            conn(*args)

validate_args Link

validate_args(*args, coerce=False)

Validates and coerces input arguments based on expected types.

This method validates the provided args against the expected types specified in self.arg_types. If coerce is set to True, it attempts to coerce the arguments to the expected types when a mismatch occurs. If validation or coercion fails, the method raises appropriate exceptions.

Parameters:

Name Type Description Default
*args

Positional arguments to be validated against self.arg_types. The number of arguments must match the number of expected types.

()
coerce bool

If True, attempts to coerce arguments to the expected type in case of a mismatch. Defaults to False.

False

Returns:

Type Description
tuple

A tuple of validated (or coerced) arguments in the same order as the input.

Raises:

Type Description
RuntimeError

Raised if the number of provided arguments does not match the number of expected types in self.arg_types.

TypeError

Raised if an argument type mismatch occurs and coerce is False, or if coercion fails when coerce is True.

See Also

coerce_to_generic : Function used to coerce arguments to generic types. is_instance_of_generic : Function to check whether an argument matches an expected type.

Source code in plantimager/commons/RPC.py
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
def validate_args(self, *args, coerce=False) -> tuple:
    """
    Validates and coerces input arguments based on expected types.

    This method validates the provided `args` against the expected types specified
    in `self.arg_types`. If `coerce` is set to `True`, it attempts to coerce the
    arguments to the expected types when a mismatch occurs. If validation or coercion
    fails, the method raises appropriate exceptions.

    Parameters
    ----------
    *args
        Positional arguments to be validated against `self.arg_types`. The number
        of arguments must match the number of expected types.
    coerce : bool, optional
        If `True`, attempts to coerce arguments to the expected type in case of a
        mismatch. Defaults to `False`.

    Returns
    -------
    tuple
        A tuple of validated (or coerced) arguments in the same order as the input.

    Raises
    ------
    RuntimeError
        Raised if the number of provided arguments does not match the number of
        expected types in `self.arg_types`.
    TypeError
        Raised if an argument type mismatch occurs and `coerce` is `False`, or if
        coercion fails when `coerce` is `True`.

    See Also
    --------
    coerce_to_generic : Function used to coerce arguments to generic types.
    is_instance_of_generic : Function to check whether an argument matches an expected type.
    """
    if len(args) != len(self.arg_types):
        logger.error(f"Expected {len(self.arg_types)} arguments, got {len(args)}.")
        raise RuntimeError(f"Expected {len(self.arg_types)} arguments, got {len(args)}.")
    new_args = []
    # Validates and coerces each argument; errors on type mismatch
    for i, (arg, arg_type) in enumerate(zip(args, self.arg_types)):
        if coerce:
            new_args.append(coerce_to_generic(arg, arg_type))
        elif is_instance_of_generic(arg, arg_type):
            new_args.append(arg)
        else:
            logger.error(f"Argument {i} of type {type(arg)} is not an instance of {arg_type}.")
            raise TypeError(f"Argument {i} of type {type(arg)} is not an instance of {arg_type}.")
    return tuple(new_args)

RPCSignalReceiver Link

RPCSignalReceiver(context, url, signals)

Bases: Thread

Background thread that listens for and dispatches RPC signals from the server.

This thread binds a ZeroMQ REP socket to a random port and continuously polls for incoming signal events. When a signal is received, it emits the corresponding proxy signal on the client side.

Parameters:

Name Type Description Default
context Context

The ZeroMQ context used to create the socket.

required
url str

The base URL to bind the receiver socket (e.g., "tcp://127.0.0.1").

required
signals dict of str to RPCSignal

A dictionary mapping signal names to their corresponding RPCSignal instances on the client.

required

Attributes:

Name Type Description
port int

The randomly selected port number this receiver is bound to.

Source code in plantimager/commons/RPC.py
407
408
409
410
411
412
413
414
415
def __init__(self, context: zmq.Context, url: str, signals: dict[str, RPCSignal]):
    super().__init__(name="RPCSignalReceiver")
    self.context = context
    self.url = url  # now the server's PUB endpoint
    self._stop_flag = False
    self.signals = signals
    self.socket: zmq.Socket = context.socket(zmq.SUB)
    self.socket.connect(url)
    self.socket.setsockopt_string(zmq.SUBSCRIBE, "")  # subscribe to all

run Link

run()

Continuously processes incoming socket requests to emit signals.

This method runs an event loop that listens for incoming JSON requests via a ZeroMQ socket. Requests are expected to contain an event key and associated data such as signals and arguments. If a request matches the expected event type (RPCEvents.EMIT_SIGNAL), a corresponding signal is emitted. The loop runs until the _stop_flag is set to True.

Notes
  • If the "blocking" flag in the request is True, the signal emission is performed before sending back a success response. Otherwise, the success response is sent immediately, and the signal emission happens afterward.

Raises:

Type Description
KeyError

If the incoming request JSON does not contain the required keys.

ZMQError

If there is an issue receiving or sending data via the ZeroMQ socket.

RuntimeError

If an error occurs while emitting a signal.

Source code in plantimager/commons/RPC.py
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
def run(self):
    """
    Continuously processes incoming socket requests to emit signals.

    This method runs an event loop that listens for incoming JSON requests via
    a ZeroMQ socket. Requests are expected to contain an event key and
    associated data such as signals and arguments. If a request matches the
    expected event type (`RPCEvents.EMIT_SIGNAL`), a corresponding signal
    is emitted. The loop runs until the `_stop_flag` is set to `True`.

    Notes
    -----
    - If the `"blocking"` flag in the request is `True`, the signal emission is
      performed before sending back a success response. Otherwise, the success
      response is sent immediately, and the signal emission happens afterward.

    Raises
    ------
    KeyError
        If the incoming request JSON does not contain the required keys.
    zmq.ZMQError
        If there is an issue receiving or sending data via the ZeroMQ socket.
    RuntimeError
        If an error occurs while emitting a signal.

    """
    try:
        while not self._stop_flag:
            if self.socket.poll(100, zmq.POLLIN) == 0:
                continue
            request = self.socket.recv_json()
            if request["event"] != RPCEvents.EMIT_SIGNAL:
                logger.error(f"Expected event {RPCEvents.EMIT_SIGNAL}, got {request['event']} instead.")
                continue
            signal = request["signal"]
            args = request["args"]
            logger.debug(f"Emitting signal {signal} with args {args}")
            self.signals[signal].emit(*args)
    finally:
        self.socket.close()
        del self.socket
    logger.debug(f"Stopping signal receiver {self}")

stop Link

stop()

Signal the receiver thread to stop polling and shut down.

Source code in plantimager/commons/RPC.py
460
461
462
463
464
def stop(self):
    """
    Signal the receiver thread to stop polling and shut down.
    """
    self._stop_flag = True