RPC

plantimager.commons.RPC

Remote Procedure Call (RPC) Framework.

This module provides a lightweight RPC framework built on top of ZeroMQ. It supports method execution (both JSON-serializable and binary buffers), property proxying, and a publish-subscribe signaling mechanism.

Classes:

Name Description
NoResult

Represents an operation failure with error details.

RPCSignal

Lightweight publish-subscribe signal implementation.

RPCProperty

RPC-enabled property descriptor for proxying attribute access.

RPCClient

Abstract client class for connecting to and interacting with an RPC server.

RPCServer

Server class that exposes methods, properties, and signals over the network.

NoResult

NoResult(error, traceback)

Represents an outcome with no result, typically used to indicate an operation failure along with associated error details.

This class encapsulates the error message and traceback information to provide a structured representation of operation failures, useful for logging or debugging purposes.

Instances of this class are falsy.

Attributes:

Name Type Description
error str

A string containing the error message describing the nature of the failure.

traceback str

A string containing the traceback or detailed information about where and why the failure occurred.

Source code in plantimager/commons/RPC.py
def __init__(self, error: str, traceback: str):
    self.error = error
    self.traceback = traceback
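
A falsy failure object lets callers test an outcome with a plain `if`. The sketch below is a minimal stand-in, not the module's class: `NoResultSketch`, its `__bool__` method, and `describe` are assumptions made for illustration, since only `__init__` is shown in this section.

```python
class NoResultSketch:
    """Hypothetical stand-in for NoResult: carries error details and is falsy."""

    def __init__(self, error: str, traceback: str):
        self.error = error
        self.traceback = traceback

    def __bool__(self) -> bool:
        # Assumption: falsiness is implemented by returning False here.
        return False


def describe(outcome):
    """Report a failure object differently from a regular (possibly falsy) value."""
    # 0, "" and [] are falsy too, so check the type as well as the truth value.
    if not outcome and isinstance(outcome, NoResultSketch):
        return f"failed: {outcome.error}"
    return f"ok: {outcome}"


failure = NoResultSketch("boom", "Traceback (most recent call last): ...")
```

Because legitimate results such as `0` or an empty list are also falsy, the type check keeps them from being mistaken for failures.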

RPCClient

RPCClient(context, url, timeout=1000)

Abstract base class for RPC clients.

This class sets up a ZeroMQ REQ socket to communicate with an RPCServer. To use this, create a subclass that inherits from both a target interface and this class, and decorate it with @RPCClient.register_interface.

Attributes:

Name Type Description
context Context

The ZeroMQ context used for creating sockets and managing connections.

url str

The full URL (e.g., tcp://127.0.0.1:5555) of the target RPC server.

socket Socket

A REQ socket connected to url used for all RPC communication.

_json_methods dict[str, int | None]

Mapping of JSON‑serialisable remote method names to their timeout values (in milliseconds). Populated from the server inventory.

_buffer_methods dict[str, int | None]

Mapping of buffer‑based remote method names to their timeout values. Populated from the server inventory.

_signals dict[str, RPCSignal]

Signals discovered from the server inventory; keys are signal names and values are the corresponding RPCSignal instances bound to this client.

_properties list

List of property names exposed by the server.

name str

The designated name of the connected RPC server (provided by the server during inventory exchange).

own_address str

The local IP address of the client (retrieved from the server inventory).

peer_address str

The IP address of the connected RPC server (retrieved from the server inventory).

_signal_receiver RPCSignalReceiver | None

Background thread that receives and dispatches signals from the server. None if the server does not expose any signals.

Initialize a new RPCClient.

Parameters:

Name Type Description Default
context Context

The ZeroMQ context used for networking.

required
url str

The URL of the target RPC server to connect to.

required
timeout int

Time in milliseconds to wait for the server's reply during the inventory exchange and the signal handling initialization.

1000
Source code in plantimager/commons/RPC.py
def __init__(self, context: zmq.Context, url: str, timeout=1000):
    """
    Initialize a new RPCClient.

    Parameters
    ----------
    context : zmq.Context
        The ZeroMQ context used for networking.
    url : str
        The URL of the target RPC server to connect to.
    """
    super().__init__()
    self.context: zmq.Context = context
    self.url: str = url
    self.socket: zmq.Socket = context.socket(zmq.REQ)
    self.socket.connect(self.url)

    # replacing class attribute signals with instance signals
    for base in self.__class__.__bases__:
        for key, value in base.__dict__.items():
            if isinstance(value, RPCSignal):
                setattr(self, key, copy.deepcopy(value))

    # Getting RPCServer inventory
    logger.debug("--> Getting Inventory")
    self.socket.send_json({
        "event": RPCEvents.GET_INVENTORY
    })
    if self.socket.poll(timeout=timeout, flags=zmq.POLLIN) == 0:
        logger.error("Failed to get inventory. Server did not respond.")
        raise TimeoutError("Failed to get inventory.")
    reply = self.socket.recv_json()
    logger.debug(f"Got inv: {reply}", )
    self._json_methods: dict[str, int | None] = reply["json_methods"]
    self._buffer_methods: dict[str, int | None] = reply["buffer_methods"]
    self._signals = {sig: getattr(self, sig) for sig in reply["signals"] if hasattr(self, sig)}
    self._properties: list = reply["properties"]
    self.name: str = reply["name"]
    logger.debug("<-- Inventory Received")

    # If signals initiate
    self._signal_receiver = None
    if self._signals:
        logger.info("Initializing signal handling")
        logger.debug("--> Initializing Signals Handling")
        self.socket.send_json({"event": RPCEvents.INIT_SIGNALS_HANDLING})

        if self.socket.poll(timeout=timeout, flags=zmq.POLLIN) == 0:
            logger.error("Failed to initialize signals. Server did not respond.")
            raise TimeoutError("Failed to initialize signals.")
        reply = self.socket.recv_json()
        if not reply["success"]:
            raise RuntimeError("INIT_SIGNALS_HANDLING failed")

        signal_port = reply["signal_port"]
        proto, addr, _ = url_parser.search(self.url).groups()
        self._signal_receiver = RPCSignalReceiver(
            context=self.context,
            url=f"{proto}://{addr}:{signal_port}",
            signals=self._signals
        )
        self._signal_receiver.daemon = True
        self._signal_receiver.start()
        logger.info("<-- Successfully initialized signal handling")

    def _finalizer(sock, receiver):
        sock.close()
        if receiver:
            receiver.stop()
            receiver.join(2)
        if os.getenv("PI_LOG_FINALIZE") : logger.info("RPCClient finalized")

    finalize(self, _finalizer, self.socket, self._signal_receiver)
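
The constructor derives the signal endpoint by reusing the protocol and address of the server URL and swapping in the port returned by the server. A minimal sketch of that step follows; `URL_PATTERN` and `signal_url` are hypothetical names, and the regular expression is an assumption because the module's `url_parser` is not shown in this section.

```python
import re

# Hypothetical pattern: protocol, host and port of a URL such as tcp://127.0.0.1:5555.
URL_PATTERN = re.compile(r"(\w+)://([^:/]+):(\d+)")


def signal_url(server_url: str, signal_port: int) -> str:
    """Build the signal endpoint from the server URL and the advertised signal port."""
    match = URL_PATTERN.search(server_url)
    if match is None:
        raise ValueError(f"Cannot parse URL: {server_url!r}")
    proto, addr, _ = match.groups()  # the request port is discarded
    return f"{proto}://{addr}:{signal_port}"
```

For example, `signal_url("tcp://127.0.0.1:5555", 6001)` yields a URL on the same host with port 6001.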

execute

execute(method_name, params)

Execute a remote method on the RPC server.

Parameters:

Name Type Description Default
method_name str

The name of the method to execute.

required
params dict

A dictionary containing the args and kwargs for the method.

required

Returns:

Type Description
tuple

A 2-tuple (success, result). If success is True, result contains the return value of the method. If False, result is a tuple containing the (error_message, traceback).

Raises:

Type Description
TimeoutError

If the server does not respond within the configured timeout.

Source code in plantimager/commons/RPC.py
def execute(self, method_name: str, params: dict) -> tuple[bool, object]:
    """
    Execute a remote method on the RPC server.

    Parameters
    ----------
    method_name : str
        The name of the method to execute.
    params : dict
        A dictionary containing the `args` and `kwargs` for the method.

    Returns
    -------
    tuple
        A 2-tuple `(success, result)`. If `success` is True, `result`
        contains the return value of the method. If False, `result`
        is a tuple containing the `(error_message, traceback)`.

    Raises
    ------
    TimeoutError
        If the server does not respond within the configured timeout.
    """
    package = {
        "event": RPCEvents.METHOD_CALL,
        "method": method_name,
        "params": params,
    }
    if self.socket.poll(timeout=1000, flags=zmq.POLLOUT) == 0:
        logger.error(f"Proxy of {self._interface} at {self.url} did not respond")
        raise TimeoutError(f"Proxy of {self._interface} at {self.url} did not respond")
    logger.debug(f"Executing {package}")
    self.socket.send_json(package, flags=zmq.NOBLOCK)

    if method_name in self._json_methods:
        if self.socket.poll(timeout=self._json_methods[method_name], flags=zmq.POLLIN) == 0:
            logger.error(f"Proxy of {self._interface} at {self.url} did not respond")
            raise TimeoutError(f"Proxy of {self._interface} at {self.url} did not respond")
        reply = self.socket.recv_json()
        if reply["success"]:
            return True, reply["result"]
        else:
            return False, (reply["error"], reply["traceback"])
    elif method_name in self._buffer_methods:
        if self.socket.poll(timeout=self._buffer_methods[method_name], flags=zmq.POLLIN) == 0:
            logger.error(f"Proxy of {self._interface} at {self.url} did not respond")
            raise TimeoutError(f"Proxy of {self._interface} at {self.url} did not respond")
        reply_frames: list[zmq.Frame] = self.socket.recv_multipart(copy=False)
        buffer_info = json.loads(reply_frames[0].bytes)
        if "error" in buffer_info:
            return False, (buffer_info["error"], buffer_info["traceback"])
        else:
            return True, (reply_frames[1].buffer, buffer_info)
    return False, (Warning(f"Unknown method {method_name}"), "")
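
Callers of `execute` have to branch on the `(success, result)` tuple. One possible way to turn that convention into ordinary exceptions is sketched below; `RemoteCallError` and `unwrap` are hypothetical helpers and are not part of the module.

```python
class RemoteCallError(RuntimeError):
    """Hypothetical exception carrying the remote error message and traceback."""

    def __init__(self, error, remote_traceback):
        super().__init__(error)
        self.remote_traceback = remote_traceback


def unwrap(outcome):
    """Return the result of a (success, result) tuple or raise on failure."""
    success, result = outcome
    if success:
        return result
    # On failure, result is the pair (error_message, traceback).
    error, remote_traceback = result
    raise RemoteCallError(str(error), remote_traceback)


value = unwrap((True, 42))
try:
    unwrap((False, ("division by zero", "remote traceback text")))
except RemoteCallError as exc:
    message = str(exc)
    trace = exc.remote_traceback
```

The remote traceback is kept as an attribute so it can be logged next to the local one.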

register_interface classmethod

register_interface(interface)

Class decorator to bind an interface to an RPCClient subclass.

This decorator inspects the provided interface and dynamically proxies its methods and properties so that calls are forwarded over the network to the RPC server.

Parameters:

Name Type Description Default
interface type

The interface class defining the methods and properties to proxy.

required

Returns:

Type Description
callable

A class decorator that applies the proxy logic.

Raises:

Type Description
RuntimeError

If the decorated class does not inherit from both interface and RPCClient.

Source code in plantimager/commons/RPC.py
@classmethod
def register_interface(cls, interface: type):
    """
    Class decorator to bind an interface to an RPCClient subclass.

    This decorator inspects the provided `interface` and dynamically
    proxies its methods and properties so that calls are forwarded
    over the network to the RPC server.

    Parameters
    ----------
    interface : type
        The interface class defining the methods and properties to proxy.

    Returns
    -------
    callable
        A class decorator that applies the proxy logic.

    Raises
    ------
    RuntimeError
        If the decorated class does not inherit from both `interface`
        and `RPCClient`.
    """

    def _decorator(target_cls: type):
        if interface not in target_cls.__bases__ or cls not in target_cls.__bases__:
            raise RuntimeError(f"{target_cls} must inherit from {interface} and {cls}.")
        for key, val in interface.__dict__.items():
            if inspect.isfunction(val) and not (key.startswith("__") and key.endswith("__")):
                logger.debug(f"registering method {key} in {target_cls}")
                func = decorate(val, cls._method_proxy)
                func.__isabstractmethod__ = False  # Counts as an actual implementation
                setattr(target_cls, key, func)
            elif isinstance(val, RPCSignal):
                pass  # nothing to do at this stage
            elif isinstance(val, RPCProperty):
                fget = wraps(val.fget)(partial(cls._property_getter_proxy, property_name=key))
                fset = wraps(val.fset)(partial(cls._property_setter_proxy, property_name=key))
                prop = RPCProperty(fget=fget, fset=fset, fdel=val.fdel, doc=val.__doc__, notify=val._notifier)
                setattr(target_cls, key, prop)
        # abstract methods have been implemented
        target_cls.__abstractmethods__ = frozenset()
        target_cls._interface = interface.__name__
        return target_cls

    return _decorator
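
The class-decorator pattern used by `register_interface` can be illustrated without any networking. In the sketch below every name (`ProxyBase`, `_forward`, `_make_proxy`, `Camera`, `CameraProxy`) is hypothetical: calls are recorded in a list instead of being sent to a server, and `functools.wraps` stands in for the `decorate` helper used in the source.

```python
import inspect
from functools import wraps


class ProxyBase:
    """Hypothetical stand-in for RPCClient: records calls instead of sending them."""

    def __init__(self):
        self.sent = []

    def _forward(self, method_name, args, kwargs):
        self.sent.append((method_name, args, kwargs))
        return f"forwarded {method_name}"

    @staticmethod
    def _make_proxy(name, func):
        @wraps(func)  # keep the interface method's name and docstring
        def proxy(self, *args, **kwargs):
            return self._forward(name, args, kwargs)
        return proxy

    @classmethod
    def register_interface(cls, interface):
        def _decorator(target_cls):
            bases = target_cls.__bases__
            if interface not in bases or cls not in bases:
                raise RuntimeError(f"{target_cls} must inherit from {interface} and {cls}.")
            for key, val in interface.__dict__.items():
                is_dunder = key.startswith("__") and key.endswith("__")
                if inspect.isfunction(val) and not is_dunder:
                    setattr(target_cls, key, cls._make_proxy(key, val))
            return target_cls
        return _decorator


class Camera:
    """Hypothetical interface."""

    def grab(self, exposure):
        raise NotImplementedError


@ProxyBase.register_interface(Camera)
class CameraProxy(Camera, ProxyBase):
    pass


cam = CameraProxy()
result = cam.grab(10)
```

The proxy class body stays empty: every public method of the interface is replaced by a forwarding wrapper when the decorator runs.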

stop_server

stop_server()

Request the connected RPC server to shut down.

Sends a non-blocking STOP_SERVER event to the remote server. If the server responds, it logs the reply.

Source code in plantimager/commons/RPC.py
def stop_server(self):
    """
    Request the connected RPC server to shut down.

    Sends a non-blocking `STOP_SERVER` event to the remote server.
    If the server responds, it logs the reply.
    """
    logger.info(f"Stopping server {self.url}")
    if self.socket.poll(timeout=1000, flags=zmq.POLLOUT) == 0:
        logger.info(f"Server {self.url} could not be joined (might already be dead)")
        return
    self.socket.send_json({
        "event": RPCEvents.STOP_SERVER
    }, flags=zmq.NOBLOCK)
    if self.socket.poll(timeout=1000, flags=zmq.POLLIN) == 0:
        logger.info(f"Server {self.url} did not respond (might already be dead)")
        return
    reply = self.socket.recv_json()
    logger.debug(f"Got stop reply {reply}")

RPCEvents

Bases: StrEnum

Enumeration of valid RPC event types used for communication between clients and servers.

RPCProperty

RPCProperty(fget=None, fset=None, fdel=None, doc=None, notify=None, auto_notify=False)

Bases: property

RPC-enabled property descriptor.

This subclass of property adds optional notification support via an RPCSignal. When a property created with RPCProperty is assigned a new value, the supplied notify signal (if provided) can be emitted to inform remote listeners of the change. The class is typically used as a decorator on a getter function; the optional notify argument is stored on the resulting property object and can be accessed by custom setter implementations. Unless auto_notify is True, the setter must explicitly emit the notify signal when a change occurs.

Attributes:

Name Type Description
_notifier RPCSignal or None

Signal that will be emitted when the property's value changes. It is initialized from the notify argument of __init__ and may be None when no notification is required.

_auto_notify bool

When True, _notifier is emitted automatically whenever the setter is called and the value actually changes.

Initialize the property with optional RPC notification.

This subclass of property adds support for emitting an RPC signal when the property value changes. If auto_notify is True the signal provided via notify is emitted automatically after a successful set operation; otherwise the caller must trigger the notification manually.

Parameters:

Name Type Description Default
fget callable

Getter function that receives the instance and returns the attribute value.

None
fset callable

Setter function that receives the instance and the value to assign.

None
fdel callable

Deleter function that receives the instance and removes the attribute.

None
doc str

Documentation string for the property.

None
notify RPCSignal

RPC signal emitted when the property value is changed.

None
auto_notify bool

When True, automatically emit notify after a successful set.

False
See Also

property Built‑in property class that this class extends.

Source code in plantimager/commons/RPC.py
def __init__(self, fget=None, fset=None, fdel=None, doc=None, notify: RPCSignal = None, auto_notify=False):
    """
    Initialize the property with optional RPC notification.

    This subclass of ``property`` adds support for emitting an RPC signal
    when the property value changes. If ``auto_notify`` is ``True`` the
    signal provided via ``notify`` is emitted automatically after a successful
    set operation; otherwise the caller must trigger the notification
    manually.

    Parameters
    ----------
    fget : callable, optional
        Getter function that receives the instance and returns the attribute
        value.
    fset : callable, optional
        Setter function that receives the instance and the value to assign.
    fdel : callable, optional
        Deleter function that receives the instance and removes the attribute.
    doc : str, optional
        Documentation string for the property.
    notify : RPCSignal, optional
        RPC signal emitted when the property value is changed.
    auto_notify : bool, default: ``False``
        When ``True``, automatically emit ``notify`` after a successful set.

    See Also
    --------
    property
        Built‑in ``property`` class that this class extends.
    """
    super().__init__(fget=fget, fset=fset, fdel=fdel, doc=doc)
    self._notifier: RPCSignal | None = notify
    self._auto_notify: bool = auto_notify

__set__

__set__(obj, value)

Set the property on obj and emit the _notifier signal if the observable value actually changes.

The logic is:

1. Retrieve the old value via the getter (if any).
2. Call the original setter.
3. Retrieve the new value via the getter.
4. If old != new and a notifier exists, emit the signal with the new value.

Steps 1, 3 and 4 only run when auto_notify is True; otherwise only the original setter is called.

Source code in plantimager/commons/RPC.py
def __set__(self, obj, value):
    """
    Set the property on *obj* and emit the ``_notifier`` signal if the
    observable value actually changes.

    The logic is:
    1. Retrieve the old value via the getter (if any).
    2. Call the original setter.
    3. Retrieve the new value via the getter.
    4. If ``old != new`` and a notifier exists, emit the signal with the
       new value.
    """
    # Step 1 - capture the old value (may be None if no getter)
    old_val = None
    if self._auto_notify and self.fget is not None:
        try:
            old_val = self.fget(obj)
        except Exception:
            # If the getter fails we simply ignore the old value;
            # the change‑detection will fall back to always emit.
            old_val = object()  # unique sentinel

    # Step 2 - invoke the original setter (if any)
    super().__set__(obj, value)

    # Step 3 - capture the new value via the getter (if any)
    new_val = None
    if self._auto_notify and self.fget is not None:
        try:
            new_val = self.fget(obj)
        except Exception:
            new_val = object()  # unique sentinel

    # Step 4 - emit if changed and a notifier is present
    if self._auto_notify and self._notifier is not None and old_val != new_val:
        try:
            self._notifier.emit(new_val)
        except Exception as exc:
            # We never want a notification failure to break the setter.
            logger.error(f"Failed to emit RPCProperty change signal: {exc}")
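
The change-detection logic can be tried in isolation. The sketch below is a simplified stand-in, assuming a getter is always present and leaving out the error handling; `ListSignal`, `NotifyingProperty`, and `Lamp` are hypothetical names, and the signal only appends emitted values to a list.

```python
class ListSignal:
    """Hypothetical stand-in for RPCSignal: collects emitted values in a list."""

    def __init__(self):
        self.emitted = []

    def emit(self, value):
        self.emitted.append(value)


class NotifyingProperty(property):
    """Property that emits a signal when the observable value changes."""

    def __init__(self, fget=None, fset=None, fdel=None, doc=None, notify=None):
        super().__init__(fget, fset, fdel, doc)
        self._notifier = notify

    def __set__(self, obj, value):
        old = self.fget(obj)          # value before the setter runs
        super().__set__(obj, value)   # delegate to the original setter
        new = self.fget(obj)          # value as seen through the getter afterwards
        if self._notifier is not None and old != new:
            self._notifier.emit(new)


changed = ListSignal()


class Lamp:
    def __init__(self):
        self._level = 0

    def _get_level(self):
        return self._level

    def _set_level(self, value):
        self._level = max(0, min(100, value))  # clamp to 0..100

    level = NotifyingProperty(_get_level, _set_level, notify=changed)


lamp = Lamp()
lamp.level = 50   # changes 0 -> 50, emits 50
lamp.level = 50   # no change, nothing emitted
lamp.level = 500  # clamped to 100, emits 100
```

Reading the value back through the getter means the emitted value is what the setter actually stored (100 here), not the raw value that was assigned.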

RPCServer

RPCServer(context, url, alive_timeout=60)

RPCServer to use in combination with RPCClient.

The server holds the concrete implementation of an interface that is made available on the network. To create an RPCServer, define a class that inherits from both an interface and RPCServer. Callable methods must be decorated using RPCServer.register_method_buffer or RPCServer.register_method_json. The server can also send signals defined with the RPCSignal class and proxy properties defined with RPCProperty.

Attributes:

Name Type Description
context Context

ZMQ context used to make the various sockets necessary for communicating with the client.

url str

The base URL where the RPCServer is opened. The url should include the port in the form tcp://<ip>:<port>.

alive_timeout int

Timeout (in seconds) used for alive checks against the device registry. After three consecutive failed checks the server is considered dead and stops serving.

uuid str | None

Unique identifier provided by the registry. None until the server is registered.

name str

Name of the RPCServer assigned by the device registry after successful registration.

registry_addr str

URL of the device registry to which the server is registered. Empty string if the server has not been registered.

peer_addr str | None

IP address of the connected client once a peer discovery request has been processed.

_unreachable_counter int

Counter of consecutive failed alive checks against the registry. When the count reaches 3 the server is considered dead and will stop serving.

_json_methods dict[str, Callable]

Mapping of method names to the underlying callables that are exposed as JSON‑based RPC procedures.

_buffer_methods dict[str, Callable]

Mapping of method names to the underlying callables that are exposed as buffer‑based RPC procedures.

_rpc_properties dict[str, RPCProperty]

RPC properties discovered on the class.

_signals dict[str, RPCSignal]

Signal objects discovered on the class.

_socket Socket[REP]

REP socket bound to url (or to a randomly chosen port when url does not specify one) that receives incoming RPC requests.

port int

The TCP port selected by _bind_socket. If the url does not contain an explicit port, a random one in the range 10000-12000 is chosen.

_signal_socket Socket[REQ] | None

Socket used for sending signal notifications to the client.

_stop bool

Flag controlling the termination of serve_forever.

_dead bool

Indicates whether the server has already been shut down.

Initialize a new RPCServer.

Parameters:

Name Type Description Default
context Context

ZMQ context used to make the various sockets necessary for communicating with the client.

required
url str

URL where the RPCServer should listen. It should be of the form "tcp://<ip>", where ip is the IP address of one of the local network interfaces and must be accessible from the client.

required
alive_timeout int

Time in seconds after which the device_registry will consider this service dead or unreachable.

60
Source code in plantimager/commons/RPC.py
def __init__(self, context: zmq.Context, url: str, alive_timeout: int = 60):
    """
    Initialize a new RPCServer.

    Parameters
    ----------
    context: zmq.Context
        ZMQ context used to make the various sockets necessary for communicating with the client.
    url: str
        Url where the RPCServer should listen. It should be of the form "tcp://<ip>" where ip is one
        of the local network interface ip which must be accessible from the client.
    alive_timeout: int, optional
        Time in seconds after which the device_registry will consider this service dead or unreachable.
    """
    super().__init__()

    self._type = None
    self.context: zmq.Context = context
    self.url: str = url
    self.alive_timeout = alive_timeout
    self.uuid: str | None = None
    self.name = ""
    self.registry_addr = ""
    self.peer_addr: str | None = None
    self._unreachable_counter = 0

    # Containers for RPC members
    self._json_methods: dict[str, Callable] = {}
    self._buffer_methods: dict[str, Callable] = {}
    self._rpc_properties: dict[str, property] = {}
    self._signals: dict[str, RPCSignal] = {}

    # Initialize internals
    self._initialize_rpc_members()
    self._socket, self.port = self._bind_socket(url)
    self._signal_socket: zmq.Socket[zmq.REQ] | None = None
    self._stop = False

    self._setup_lifecycle_cleanup()
    self._dead = False

register_method_buffer staticmethod

register_method_buffer(timeout=10000)

Registers this method as a remotely callable procedure that transmits its output as a buffer-like object together with a buffer_info dictionary.

The method may take any input that can be serialized via JSON and must return a 2-tuple: (memoryview or bytes, dict).

Parameters:

Name Type Description Default
timeout int | None

Specifies how much time in ms a client is expected to wait for the method to finish. If None, waits indefinitely.

10000

Returns:

Name Type Description
decorator Callable[Callable[..., tuple[memoryview | bytes, dict]], Callable[..., tuple[memoryview | bytes, dict]]]
Source code in plantimager/commons/RPC.py
@staticmethod
def register_method_buffer(timeout: int | None = 10000):
    """
    Registers this method as remote callable procedure which will transmit its output as a buffer-like object
    as well as a buffer_info dictionary.

    `method` may take any input which will be serialized via json and must output a 2-tuple:
    (memoryview or bytes, dict).


    Parameters
    ----------
    timeout: int | None
        Specifies how much time in ms a client is expected to wait for the method to finish. If None, waits indefinitely.

    Returns
    -------
    decorator: Callable[Callable[..., tuple[memoryview|bytes, dict]], Callable[..., tuple[memoryview|bytes, dict]]]

    """
    if inspect.isfunction(timeout):
        timeout._is_buffer_method = True
        timeout._timeout = 10000
        return timeout

    def _decorator(method: Callable[..., tuple[memoryview | bytes, dict]]):
        method._is_buffer_method = True
        method._timeout = timeout
        return method

    return _decorator
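
A buffer method returns raw bytes plus a dictionary describing them, and the client reads the description from the first reply frame and the payload from the second. The sketch below imitates that layout with the standard library only; `read_samples`, `to_frames`, `from_frames`, and the keys of the info dictionary are hypothetical and chosen for illustration.

```python
import json
import struct


def read_samples():
    """Hypothetical buffer method: returns (bytes, buffer_info)."""
    samples = [1.5, -2.0, 3.25]
    payload = struct.pack(f"<{len(samples)}d", *samples)  # little-endian float64
    info = {"dtype": "float64", "count": len(samples), "byteorder": "little"}
    return payload, info


def to_frames(payload, info):
    """Lay the result out as two frames: JSON description, then raw payload."""
    return [json.dumps(info).encode("utf-8"), bytes(payload)]


def from_frames(frames):
    """Decode the two frames back into a list of floats and the info dict."""
    info = json.loads(frames[0])
    values = struct.unpack(f"<{info['count']}d", frames[1])
    return list(values), info


frames = to_frames(*read_samples())
values, info = from_frames(frames)
```

Keeping the payload out of the JSON frame avoids encoding large binary data as text; the info dictionary carries whatever the receiver needs to interpret the bytes.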

register_method_json staticmethod

register_method_json(timeout=10000)

Registers this method as a remotely callable procedure that transmits its output via JSON.

It is advised to only send basic types and containers as output (int, float, str, bool, list, tuple, dict, ...). Arguments are also serialized via JSON.

Parameters:

Name Type Description Default
timeout int | None

Specifies how much time in ms a client is expected to wait for the method to finish. If None, waits indefinitely.

10000

Returns:

Type Description
Callable[Callable[..., Any], Callable[..., Any]]
Source code in plantimager/commons/RPC.py
@staticmethod
def register_method_json(timeout: int | None = 10000):
    """
    Registers this method as remote callable procedure which will transmit its output via json.

    It is advised to only send basic types and containers as output (int, float, str, bool, list, tuple, dict, ...)
    Arguments are also serialized via json.

    Parameters
    ----------
    timeout: int | None
        Specifies how much time in ms a client is expected to wait for the method to finish. If None, waits indefinitely.

    Returns
    -------
    Callable[Callable[..., Any], Callable[..., Any]]

    """
    if inspect.isfunction(timeout):
        timeout._is_json_method = True
        timeout._timeout = 10000
        return timeout

    def _decorator(method: Callable[..., Any]):
        method._is_json_method = True
        method._timeout = timeout
        return method

    return _decorator
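
The `inspect.isfunction(timeout)` check is what allows the decorator to be applied both with and without parentheses. The standalone sketch below reproduces that pattern; `register_method` and the decorated functions are hypothetical names.

```python
import inspect


def register_method(timeout=10000):
    """Mark a function as remotely callable, with or without arguments."""
    if inspect.isfunction(timeout):
        # Bare usage (@register_method): `timeout` is the decorated function itself.
        func = timeout
        func._is_json_method = True
        func._timeout = 10000  # fall back to the default timeout
        return func

    def _decorator(method):
        method._is_json_method = True
        method._timeout = timeout
        return method

    return _decorator


@register_method
def ping():
    return "pong"


@register_method(timeout=500)
def quick():
    return 1


@register_method(timeout=None)
def unbounded():
    return 2
```

In the bare form the default of 10000 ms applies, while `timeout=None` marks a method for which the client waits indefinitely.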

register_to_registry

register_to_registry(type_, name, registry_url, overwrite=True)

Register this RPCServer to the registry at registry_url as a device of type type_ and name name.

Note: The name may not be accepted as is and may be modified by the registry to avoid duplicates. This method returns the accepted name of this device.

Parameters:

Name Type Description Default
type_ str

Name of the device type.

required
name str

Proposed name of the device.

required
registry_url str

URL of the device registry. Must have the form "tcp://<ip>:<port>" if the registry uses TCP.

required
overwrite bool

Whether this device should take precedence for the use of this name and overwrite other conflicting devices.

True

Returns:

Type Description
str

Name of this device as accepted by the registry.

Source code in plantimager/commons/RPC.py
def register_to_registry(self, type_: str, name: str, registry_url: str, overwrite=True) -> str:
    """
    Register this RPCServer to the registry at `registry_url` as a device of type `type_` and name `name`.

    Note: The name may not be accepted as is and may be modified by the registry to avoid duplicates. This method
    returns the accepted name of this device.

    Parameters
    ----------
    type_: str
        Name of the device type.
    name: str
        Proposed name of the device.
    registry_url: str
        Url of the device registry. Must have the form "tcp://<ip>:<port>" if the registry uses tcp
    overwrite: bool, optional
        Whether or not this device should take precedence for the use of this name and overwrite other
        conflicting devices.

    Returns
    -------
    str
        Name of this device as accepted by the registry.
    """
    logger.debug(f"Register device {name} of type {type_} to {registry_url}")
    self._type = type_
    self.name, self.uuid = register_device(
        self.context, type_,
        f"{self.url}:{self.port}",
        name, registry_url,
        overwrite=overwrite,
    )
    if not self.name:
        logger.warning(f"Failed to register device {name} of type {type_} as {registry_url}")
    self.registry_addr = registry_url if self.name else ""

    # Update cleanup state so finalizer knows what to unregister
    self._cleanup_state["uuid"] = self.uuid
    self._cleanup_state["registry_addr"] = self.registry_addr

    logger.info(f"Successfully registered device {name} of type {type_} to {registry_url}")

    return self.name

serve_forever

serve_forever()

Serve the RPC server indefinitely, handling incoming requests until a stop signal is received.

The method enters a loop that repeatedly notifies the watchdog, checks the server's liveness, waits for a request, and dispatches the request based on its event field. Supported events include peer discovery, inventory retrieval, signal handling initialization, method invocation, property access, and server shutdown. Upon receiving a STOP_SERVER event the loop terminates, sockets are closed, and a log entry is written.

If the server is registered with a registry and the alive check fails, the loop exits.

Returns:

Type Description
None

The server runs until it is stopped; no value is returned.

Notes
  • The private attribute _stop controls the loop termination. It is set to True only when a STOP_SERVER request is processed.
  • _socket and _signal_socket are closed during cleanup; if either attribute is None the corresponding close call is skipped.
  • notify_watchdog and _alive_check are called on every iteration to maintain server health monitoring.
  • The method assumes that _wait_for_request returns a mapping with an event key; a falsy return value causes the loop to continue without processing.
See Also

RPCEvents: Enum defining the possible request events.

_handle_find_peer_address, _handle_get_inventory, _handle_init_signal_handling, _handle_method_call, _handle_property_get, _handle_property_set: Private helper methods that implement the handling logic for each event type.
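
The request loop can be approximated without sockets by feeding requests from a queue. The sketch below is a simplified model under stated assumptions: `serve` and `call` are hypothetical functions, the event strings are placeholder wire values (the actual values of RPCEvents are not shown in this section), and watchdog and alive checks are left out. The reply shapes mirror what RPCClient.execute reads (`success`, `result`, `error`, `traceback`).

```python
import traceback
from collections import deque


def call(methods, request):
    """Run the requested method and wrap the outcome in a reply dictionary."""
    try:
        params = request.get("params", {})
        func = methods[request["method"]]
        result = func(*params.get("args", []), **params.get("kwargs", {}))
        return {"success": True, "result": result}
    except Exception as exc:
        return {"success": False, "error": str(exc), "traceback": traceback.format_exc()}


def serve(requests, methods):
    """Dispatch queued requests by their `event` field until a stop request."""
    queue = deque(requests)
    replies = []
    stop = False
    while not stop and queue:
        request = queue.popleft()
        if not request:  # nothing received this round, keep looping
            continue
        event = request["event"]
        if event == "GET_INVENTORY":
            replies.append({"json_methods": sorted(methods)})
        elif event == "METHOD_CALL":
            replies.append(call(methods, request))
        elif event == "STOP_SERVER":
            replies.append({"success": True})
            stop = True
        else:
            replies.append({"success": False, "error": f"Unknown event {event}"})
    return replies


replies = serve(
    [
        {"event": "GET_INVENTORY"},
        None,
        {"event": "METHOD_CALL", "method": "add", "params": {"args": [1, 2], "kwargs": {}}},
        {"event": "STOP_SERVER"},
        {"event": "GET_INVENTORY"},  # never processed: the loop has stopped
    ],
    {"add": lambda a, b: a + b},
)
```

Errors raised by a method are converted into a failure reply rather than propagated, so one bad call does not end the loop.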

Source code in plantimager/commons/RPC.py
def serve_forever(self):
    """
    Serve the RPC server indefinitely, handling incoming requests until a stop signal is
    received.

    The method enters a loop that repeatedly notifies the watchdog, checks the server's
    liveness, waits for a request, and dispatches the request based on its ``event`` field.
    Supported events include peer discovery, inventory retrieval, signal handling
    initialization, method invocation, property access, and server shutdown. Upon receiving
    a ``STOP_SERVER`` event the loop terminates, sockets are closed, and a log entry is
    written.

    If the server is registered with a registry and the liveness check
    (``_alive_check``) fails, the loop exits.

    Returns
    -------
    None
        The server runs until it is stopped; no value is returned.

    Notes
    -----
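    * The private attribute ``_stop`` controls the loop termination. It is set to
      ``True`` when a ``STOP_SERVER`` request is processed or when ``stop_server`` is
      called. The loop also exits early if ``_alive_check`` fails or an unexpected
      exception is raised while handling a request.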
    * ``_socket`` and ``_signal_socket`` are closed during cleanup; if either attribute is
      ``None`` the corresponding ``close`` call is skipped.
    * ``notify_watchdog`` and ``_alive_check`` are called on every iteration to maintain
      server health monitoring.
    * The method assumes that ``_wait_for_request`` returns a mapping with an ``event``
      key; a falsy return value causes the loop to continue without processing.

    See Also
    --------
    RPCEvents : Enum defining the possible request events.
    _handle_find_peer_address, _handle_get_inventory, _handle_init_signal_handling,
    _handle_method_call, _handle_property_get, _handle_property_set :
        Private helper methods that implement the handling logic for each event type.
    """
    if self._dead:
        logger.error("RPCServer is in dead state. A new instance must be created.")
        raise RuntimeError("RPCServer is in dead state. A new instance must be created.")
    self._stop = False
    while not self._stop:
        notify_watchdog()
        if not self._alive_check():
            break
        request = self._wait_for_request()
        if not request:
            continue
        try:
            logger.debug(f"Received request: {request['event']}")
            t0 = time.monotonic()
            match request["event"]:
                case RPCEvents.GET_INVENTORY:
                    reply = self._handle_get_inventory()
                    self._send_reply(reply)
                case RPCEvents.INIT_SIGNALS_HANDLING:
                    reply = self._handle_init_signal_handling(request)
                    self._send_reply(reply)
                case RPCEvents.METHOD_CALL:
                    reply, use_multipart = self._handle_method_call(request)
                    self._send_reply(reply, use_multipart)
                case RPCEvents.PROPERTY_GET:
                    reply = self._handle_property_get(request)
                    self._send_reply(reply)
                case RPCEvents.PROPERTY_SET:
                    reply = self._handle_property_set(request)
                    self._send_reply(reply)
                case RPCEvents.STOP_SERVER:
                    self._socket.send_json({"success": True})
                    self._stop = True
            logger.debug(f"Event {request['event']} treated in {time.monotonic() - t0}s")
        except Exception as exc:
            logger.error(f"Unexpected error while handling request: {exc}")
            # If we have a request, we can at least try to answer with a generic error.
            if isinstance(request, dict) and "event" in request:
                self._send_reply(self._make_error_reply(exc, "serve_forever"))
            break  # abort the loop - we are in an inconsistent state

    # cleanup
    if self._socket:
        self._socket.close()
    if self._signal_socket:
        self._signal_socket.close()
    self._dead = True
    logger.info("Server stopped")

stop_server

stop_server()

Stop the server and unregister the device if it has been registered.

The method sets the internal stop flag, optionally calls :func:unregister_device to remove the device from a remote registry, clears the identifying attributes (name, uuid, registry_addr), and resets the corresponding entries in _cleanup_state to None. This prevents a second unregister attempt during cleanup.

Raises:

Type Description
Exception

Propagates any exception raised by :func:unregister_device, e.g. network errors or authentication failures.

Notes
  • The device is only unregistered when all three attributes self.name, self.registry_addr and self.uuid evaluate to True. If any of them is falsy, the function simply sets the stop flag and returns.
  • self._cleanup_state is updated after a successful unregister to avoid duplicate cleanup actions later in the object's lifecycle.
See Also

unregister_device : Function that removes a device from the registry.
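
The guard against a second unregister attempt can be shown in isolation. In this minimal sketch, Device is a hypothetical class and unregister_device is a local stand-in that only records its calls; the real function takes a ZeroMQ context and contacts the registry.

```python
calls = []


def unregister_device(uuid, registry_addr):
    """Hypothetical stand-in that records calls instead of contacting a registry."""
    calls.append((uuid, registry_addr))


class Device:
    def __init__(self, name, uuid, registry_addr):
        self.name, self.uuid, self.registry_addr = name, uuid, registry_addr
        self._stop = False
        self._cleanup_state = {"uuid": uuid, "registry_addr": registry_addr}

    def stop_server(self):
        self._stop = True
        if self.name and self.registry_addr and self.uuid:
            unregister_device(self.uuid, self.registry_addr)
            self.name = self.uuid = self.registry_addr = ""
            # Clear the shared state so a later finalizer has nothing left to unregister.
            self._cleanup_state["uuid"] = None
            self._cleanup_state["registry_addr"] = None


device = Device("camera", "1234", "tcp://127.0.0.1:5555")
device.stop_server()
device.stop_server()  # identifiers are now empty, so the unregister is skipped
print(len(calls))  # 1
```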

Source code in plantimager/commons/RPC.py
def stop_server(self):
    """
    Stop the server and unregister the device if it has been registered.

    The method sets the internal stop flag, optionally calls
    :func:`unregister_device` to remove the device from a remote registry,
    clears the identifying attributes (``name``, ``uuid``, ``registry_addr``),
    and resets the corresponding entries in ``_cleanup_state`` to ``None``.
    This prevents a second unregister attempt during cleanup.

    Raises
    ------
    Exception
        Propagates any exception raised by :func:`unregister_device`, e.g.
        network errors or authentication failures.

    Notes
    -----
    * The device is only unregistered when all three attributes
      ``self.name``, ``self.registry_addr`` and ``self.uuid`` evaluate to
      ``True``. If any of them is falsy, the function simply sets the
      stop flag and returns.
    * ``self._cleanup_state`` is updated after a successful unregister to
      avoid duplicate cleanup actions later in the object's lifecycle.

    See Also
    --------
    unregister_device : Function that removes a device from the registry.
    """
    self._stop = True
    if self.name and self.registry_addr and self.uuid:
        unregister_device(self.context, self.uuid, self.registry_addr)
        self.name = ""
        self.uuid = ""
        self.registry_addr = ""

        # Clear cleanup state to avoid double unregister
        self._cleanup_state["uuid"] = None
        self._cleanup_state["registry_addr"] = None

        logger.info("Device unregistered successfully")

RPCSignal

RPCSignal(*arg_types)

Lightweight publish‑subscribe signal implementation.

This class stores either strong or weak references to callables and invokes them with supplied arguments.

Instances are created with a variable number of type specifications that describe the expected arguments for the signal. These specifications are stored unchanged in the :attr:arg_types attribute. :meth:emit passes its arguments through :meth:validate_args with coerce=True, so a wrong argument count raises an error and the values are coerced to the declared types before the connected callables are invoked.

Connections are added via :meth:connect and removed via :meth:disconnect. When the signal is emitted with :meth:emit, each stored connection is called in the order it was added. Weak references that have been garbage‑collected are silently ignored.

Parameters:

Name Type Description Default
*arg_types tuple

Positional argument type specifications supplied at construction time. They are stored as given and used by validate_args to check or coerce emitted arguments.

()

Attributes:

Name Type Description
arg_types tuple

The positional argument type specifications supplied to __init__. Callers may inspect this attribute for documentation or validation purposes.

connections list

A mutable list of connected callables or weak references. Each entry is either a callable object or a :class:weakref.WeakMethod. The list is modified by :meth:connect and :meth:disconnect. Callables are invoked in the order they were added when :meth:emit is called.

See Also

weakref.WeakMethod : Standard library class used to store weak references to bound methods.

Examples:

>>> from plantimager.commons.RPC import RPCSignal
>>> def listener(x, y): print(f"Received: {x}, {y}")
>>> signal = RPCSignal(int, str)
>>> signal.connect(listener)
>>> signal.emit(10, "hello")
Received: 10, hello
>>> signal.disconnect(listener)

Initialize a new RPCSignal instance.

Parameters:

Name Type Description Default
*arg_types tuple

Positional argument type specifications. The values are stored in :attr:arg_types and are later used by :meth:validate_args.

()
Notes

arg_types can be any hashable objects (e.g., int, str, numpy.ndarray) that the user wishes to document as the expected argument types for the signal.

Source code in plantimager/commons/RPC.py
def __init__(self, *arg_types):
    """
    Initialize a new ``RPCSignal`` instance.

    Parameters
    ----------
    *arg_types : tuple
        Positional argument type specifications. The values are stored in
        :attr:`arg_types` and are later used by :meth:`validate_args`.

    Notes
    -----
    ``arg_types`` can be any hashable objects (e.g., ``int``, ``str``,
    ``numpy.ndarray``) that the user wishes to document as the expected
    argument types for the signal.
    """
    self.arg_types = arg_types
    self.connections: list[Callable | WeakMethod] = []

connect

connect(conn)

Connect a callable (or weak reference) to the signal.

Parameters:

Name Type Description Default
conn Callable or WeakMethod

The function, bound method, or weak reference that should be called when the signal is emitted.

required

Raises:

Type Description
TypeError

If conn is neither a callable nor a :class:weakref.WeakMethod.

Notes

The same conn is added only once; duplicate connections are ignored.
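
The duplicate check is a plain membership test on the connections list, so it relies on equality. The standalone sketch below uses only the standard library (Listener is a hypothetical class) to show that two separately created WeakMethod objects for the same bound method compare equal while their owner is alive, which is why the second one is filtered out.

```python
import weakref


class Listener:
    def on_event(self, x):
        print(f"Received: {x}")


listener = Listener()
connections = []

# Each access to listener.on_event builds a new bound-method object, and each
# WeakMethod below is a distinct object, yet they compare equal while the
# listener is alive, so the membership test filters out the second one.
for conn in (weakref.WeakMethod(listener.on_event), weakref.WeakMethod(listener.on_event)):
    if conn not in connections:
        connections.append(conn)

print(len(connections))  # 1
```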

Examples:

>>> from plantimager.commons.RPC import RPCSignal
>>> def listener(x, y): print(f"Received: {x}, {y}")
>>> signal = RPCSignal(int, str)
>>> signal.connect(listener)
>>> signal.emit(10, "hello")
Received: 10, hello
Source code in plantimager/commons/RPC.py
def connect(self, conn: Callable | WeakMethod):
    """
    Connect a callable (or weak reference) to the signal.

    Parameters
    ----------
    conn : Callable or weakref.WeakMethod
        The function, bound method, or weak reference that should be called
        when the signal is emitted.

    Raises
    ------
    TypeError
        If ``conn`` is neither a callable nor a :class:`weakref.WeakMethod`.

    Notes
    -----
    The same ``conn`` is added only once; duplicate connections are ignored.

    Examples
    --------
    >>> from plantimager.commons.RPC import RPCSignal
    >>> def listener(x, y): print(f"Received: {x}, {y}")
    >>> signal = RPCSignal(int, str)
    >>> signal.connect(listener)
    >>> signal.emit(10, "hello")
    Received: 10, hello
    """
    if not isinstance(conn, (weakref.WeakMethod, Callable)):
        raise TypeError("Expected callable or weakref.WeakMethod, got {}".format(type(conn)))
    if conn not in self.connections:
        self.connections.append(conn)

disconnect

disconnect(conn=None)

Remove a previously connected callable or clear all connections.

Parameters:

Name Type Description Default
conn Callable

The specific callable or weak reference to remove. If omitted (or None), all connections are cleared.

None

Raises:

Type Description
ValueError

If conn is provided but is not present in :attr:connections.

Notes

After a successful call, the target is no longer invoked by future :meth:emit calls.

Examples:

>>> from plantimager.commons.RPC import RPCSignal
>>> def listener(x, y): print(f"Received: {x}, {y}")
>>> signal = RPCSignal(int, str)
>>> signal.connect(listener)
>>> signal.emit(10, "hello")
Received: 10, hello
>>> signal.disconnect(listener)
>>> signal.emit(10, "hello")
Source code in plantimager/commons/RPC.py
def disconnect(self, conn: Callable = None):
    """
    Remove a previously connected callable or clear all connections.

    Parameters
    ----------
    conn : Callable, optional
        The specific callable or weak reference to remove. If omitted (or
        ``None``), *all* connections are cleared.

    Raises
    ------
    ValueError
        If ``conn`` is provided but is not present in :attr:`connections`.

    Notes
    -----
    After a successful call, the target is no longer invoked by future
    :meth:`emit` calls.

    Examples
    --------
    >>> from plantimager.commons.RPC import RPCSignal
    >>> def listener(x, y): print(f"Received: {x}, {y}")
    >>> signal = RPCSignal(int, str)
    >>> signal.connect(listener)
    >>> signal.emit(10, "hello")
    Received: 10, hello
    >>> signal.disconnect(listener)
    >>> signal.emit(10, "hello")
    """
    if conn:
        self.connections.remove(conn)
    else:
        self.connections.clear()

emit

emit(*args)

Emit the signal, invoking all connected callables.

Parameters:

Name Type Description Default
*args tuple

Positional arguments that will be forwarded to each connected callable. They are first passed through :meth:validate_args with coerce=True, so their number must match :attr:arg_types and each value is coerced to its declared type.

()
Notes
  • Weak references stored as :class:weakref.WeakMethod are dereferenced before the call; if the underlying object has been garbage‑collected, the entry is simply skipped.
  • Any exception raised by a connected callable propagates to the caller of :meth:emit. The method does not catch or suppress errors.
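
The handling of weak references can be sketched with the standard library alone. The emit function below is a simplified stand-in for the dispatch part of RPCSignal.emit (the argument validation step is omitted), and Listener is a hypothetical class used only for illustration.

```python
import gc
import weakref


class Listener:
    def __init__(self, log):
        self.log = log

    def on_event(self, value):
        self.log.append(value)


def emit(connections, *args):
    """Simplified stand-in for the dispatch part of RPCSignal.emit."""
    for conn in connections:
        if isinstance(conn, weakref.WeakMethod):
            func = conn()  # None once the owner has been collected
            if func is not None:
                func(*args)
        else:
            conn(*args)


log = []
listener = Listener(log)
connections = [weakref.WeakMethod(listener.on_event), log.append]

emit(connections, 1)   # both the weak method and the plain callable run
del listener
gc.collect()           # make sure the owner is really gone
emit(connections, 2)   # only the plain callable runs now
print(log)  # [1, 1, 2]
```

Note that the dead entry stays in the list; it is skipped on each emit rather than removed.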

Examples:

>>> from plantimager.commons.RPC import RPCSignal
>>> def listener(x, y): print(f"Received: {x}, {y}")
>>> signal = RPCSignal(int, str)
>>> signal.connect(listener)
>>> signal.emit(10, "hello")
Received: 10, hello
Source code in plantimager/commons/RPC.py
def emit(self, *args):
    """
    Emit the signal, invoking all connected callables.

    Parameters
    ----------
    *args : tuple
        Positional arguments that will be forwarded to each connected
        callable. They are first passed through :meth:`validate_args` with
        ``coerce=True``, so their number must match :attr:`arg_types` and
        each value is coerced to its declared type.

    Notes
    -----
    - Weak references stored as :class:`weakref.WeakMethod` are dereferenced
      before the call; if the underlying object has been garbage‑collected,
      the entry is simply skipped.
    - Any exception raised by a connected callable propagates to the caller
      of :meth:`emit`. The method does **not** catch or suppress errors.

    Examples
    --------
    >>> from plantimager.commons.RPC import RPCSignal
    >>> def listener(x, y): print(f"Received: {x}, {y}")
    >>> signal = RPCSignal(int, str)
    >>> signal.connect(listener)
    >>> signal.emit(10, "hello")
    Received: 10, hello
    """
    args = self.validate_args(*args, coerce=True)
    for conn in self.connections:
        if isinstance(conn, WeakMethod) and (func := conn()):
            # The connection is a WeakMethod and its method is still alive
            func(*args)
        elif not isinstance(conn, weakref.WeakMethod):
            # Not a weak reference: call the callable directly
            conn(*args)

validate_args

validate_args(*args, coerce=False)

Validates and coerces input arguments based on expected types.

This method validates the provided args against the expected types specified in self.arg_types. If coerce is set to True, it attempts to coerce the arguments to the expected types when a mismatch occurs. If validation or coercion fails, the method raises appropriate exceptions.

Parameters:

Name Type Description Default
*args

Positional arguments to be validated against self.arg_types. The number of arguments must match the number of expected types.

()
coerce bool

If True, attempts to coerce arguments to the expected type in case of a mismatch. Defaults to False.

False

Returns:

Type Description
tuple

A tuple of validated (or coerced) arguments in the same order as the input.

Raises:

Type Description
RuntimeError

Raised if the number of provided arguments does not match the number of expected types in self.arg_types.

TypeError

Raised if an argument type mismatch occurs and coerce is False, or if coercion fails when coerce is True.

See Also

coerce_to_generic : Function used to coerce arguments to generic types. is_instance_of_generic : Function to check whether an argument matches an expected type.
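
The validation flow can be approximated as follows. This is a simplified sketch, not the library code: a plain isinstance check and a constructor call are assumed stand-ins for is_instance_of_generic and coerce_to_generic, whose actual behaviour (for example with generic types such as list[int]) is not shown here.

```python
def validate_args(arg_types, *args, coerce=False):
    """Simplified sketch; isinstance and constructor calls replace the generic-aware helpers."""
    if len(args) != len(arg_types):
        raise RuntimeError(f"Expected {len(arg_types)} arguments, got {len(args)}.")
    checked = []
    for i, (arg, arg_type) in enumerate(zip(args, arg_types)):
        if coerce:
            try:
                checked.append(arg_type(arg))  # stand-in for coerce_to_generic
            except (TypeError, ValueError) as exc:
                raise TypeError(f"Argument {i} cannot be coerced to {arg_type}.") from exc
        elif isinstance(arg, arg_type):  # stand-in for is_instance_of_generic
            checked.append(arg)
        else:
            raise TypeError(f"Argument {i} of type {type(arg)} is not an instance of {arg_type}.")
    return tuple(checked)


print(validate_args((int, str), 10, "hello"))           # (10, 'hello')
print(validate_args((int, str), "10", 5, coerce=True))  # (10, '5')

try:
    validate_args((int, str), "10", 5)  # no coercion: the mismatch is an error
except TypeError as exc:
    print(f"TypeError: {exc}")
```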

Source code in plantimager/commons/RPC.py
def validate_args(self, *args, coerce=False) -> tuple:
    """
    Validates and coerces input arguments based on expected types.

    This method validates the provided `args` against the expected types specified
    in `self.arg_types`. If `coerce` is set to `True`, it attempts to coerce the
    arguments to the expected types when a mismatch occurs. If validation or coercion
    fails, the method raises appropriate exceptions.

    Parameters
    ----------
    *args
        Positional arguments to be validated against `self.arg_types`. The number
        of arguments must match the number of expected types.
    coerce : bool, optional
        If `True`, attempts to coerce arguments to the expected type in case of a
        mismatch. Defaults to `False`.

    Returns
    -------
    tuple
        A tuple of validated (or coerced) arguments in the same order as the input.

    Raises
    ------
    RuntimeError
        Raised if the number of provided arguments does not match the number of
        expected types in `self.arg_types`.
    TypeError
        Raised if an argument type mismatch occurs and `coerce` is `False`, or if
        coercion fails when `coerce` is `True`.

    See Also
    --------
    coerce_to_generic : Function used to coerce arguments to generic types.
    is_instance_of_generic : Function to check whether an argument matches an expected type.
    """
    if len(args) != len(self.arg_types):
        logger.error(f"Expected {len(self.arg_types)} arguments, got {len(args)}.")
        raise RuntimeError(f"Expected {len(self.arg_types)} arguments, got {len(args)}.")
    new_args = []
    # Validates and coerces each argument; errors on type mismatch
    for i, (arg, arg_type) in enumerate(zip(args, self.arg_types)):
        if coerce:
            new_args.append(coerce_to_generic(arg, arg_type))
        elif is_instance_of_generic(arg, arg_type):
            new_args.append(arg)
        else:
            logger.error(f"Argument {i} of type {type(arg)} is not an instance of {arg_type}.")
            raise TypeError(f"Argument {i} of type {type(arg)} is not an instance of {arg_type}.")
    return tuple(new_args)

RPCSignalReceiver

RPCSignalReceiver(context, url, signals)

Bases: Thread

Background thread that listens for and dispatches RPC signals from the server.

This thread connects a ZeroMQ SUB socket to the server's PUB endpoint and continuously polls for incoming signal events. When a signal is received, it emits the corresponding proxy signal on the client side.

Attributes:

Name Type Description
context Context

The ZeroMQ context used to create the socket.

url str

The URL of the server's PUB endpoint that the receiver socket connects to (e.g., "tcp://127.0.0.1:5555").

signals dict[str, RPCSignal]

A dictionary mapping signal names to their corresponding RPCSignal instances on the client.

socket Socket

A ZeroMQ SUB socket that receives broadcasted signal events from the server.

_stop_flag bool

A flag used by run() to know when to exit the loop.

Initialize a new RPCSignalReceiver.

Parameters:

Name Type Description Default
context Context

The ZeroMQ context used to create the socket.

required
url str

The base URL of the server's PUB endpoint (e.g. "tcp://127.0.0.1").

required
signals dict[str, RPCSignal]

Dictionary mapping signal names to the corresponding RPCSignal instances on the client.

required
Notes

The underlying socket is a SUB socket that subscribes to all topics (socket.setsockopt_string(zmq.SUBSCRIBE, "")). It is available through the socket attribute if custom behaviour is required.

Raises:

Type Description
TypeError

If context is not a :class:zmq.Context instance, url is not a string, or signals is not a dict with string keys.

Source code in plantimager/commons/RPC.py
def __init__(self, context: zmq.Context, url: str, signals: dict[str, RPCSignal]):
    """
    Initialize a new `RPCSignalReceiver`.

    Parameters
    ----------
    context : zmq.Context
        The ZeroMQ context used to create the socket.
    url : str
        The base URL of the server's ``PUB`` endpoint (e.g. ``"tcp://127.0.0.1"``).
    signals : dict[str, RPCSignal]
        Dictionary mapping signal names to the corresponding
        `RPCSignal` instances on the client.

    Notes
    -----
    The underlying socket is a ``SUB`` socket that subscribes to *all*
    topics (``socket.setsockopt_string(zmq.SUBSCRIBE, "")``). It is
    available through the ``socket`` attribute if custom behaviour is
    required.

    Raises
    ------
    TypeError
        If ``context`` is not a :class:`zmq.Context` instance, ``url`` is not
        a string, or ``signals`` is not a ``dict`` with string keys.
    """
    super().__init__(name="RPCSignalReceiver")
    self.context = context
    self.url = url  # now the server's PUB endpoint
    self._stop_flag = False
    self.signals = signals
    self.socket: zmq.Socket = context.socket(zmq.SUB)
    self.socket.connect(url)  # connect to the server's PUB endpoint
    self.socket.setsockopt_string(zmq.SUBSCRIBE, "")  # subscribe to all

run

run()

Continuously processes incoming socket requests to emit signals.

This method runs an event loop that polls a ZeroMQ SUB socket for incoming JSON messages. Each message is expected to contain an event key together with the signal name (signal) and its arguments (args). If the event is RPCEvents.EMIT_SIGNAL, the corresponding signal is emitted; any other event is logged as an error and skipped. The loop runs until the _stop_flag is set to True.

Notes
  • The socket is polled with a 100 ms timeout, so a call to stop takes effect within roughly that interval.
  • No reply is sent to the server, and the socket is closed when the loop exits.

Raises:

Type Description
KeyError

If the incoming request JSON does not contain the required keys.

ZMQError

If there is an issue receiving or sending data via the ZeroMQ socket.

RuntimeError

If an error occurs while emitting a signal.

Examples:

>>> import zmq
>>> from threading import Thread
>>> from plantimager.commons.RPC import RPCSignal
>>> from plantimager.commons.RPC import RPCSignalReceiver
>>> ctx = zmq.Context()
>>> signals = {"update": RPCSignal()}
>>> receiver = RPCSignalReceiver(ctx, "tcp://127.0.0.1:5555", signals)
>>> receiver.start()               # start the background thread
>>> # ... elsewhere, the server publishes a signal ...
>>> receiver.stop()                # request graceful shutdown
>>> receiver.join()                # wait for the thread to finish
Source code in plantimager/commons/RPC.py
def run(self):
    """
    Continuously processes incoming socket requests to emit signals.

    This method runs an event loop that polls a ZeroMQ ``SUB`` socket for incoming
    JSON messages. Each message is expected to contain an ``event`` key together
    with the signal name (``signal``) and its arguments (``args``). If the event is
    `RPCEvents.EMIT_SIGNAL`, the corresponding signal is emitted; any other event
    is logged as an error and skipped. The loop runs until the `_stop_flag` is set
    to `True`.

    Notes
    -----
    - The socket is polled with a 100 ms timeout, so a call to `stop` takes effect
      within roughly that interval.
    - No reply is sent to the server, and the socket is closed when the loop exits.

    Raises
    ------
    KeyError
        If the incoming request JSON does not contain the required keys.
    zmq.ZMQError
        If there is an issue receiving or sending data via the ZeroMQ socket.
    RuntimeError
        If an error occurs while emitting a signal.

    Examples
    --------
    >>> import zmq
    >>> from threading import Thread
    >>> from plantimager.commons.RPC import RPCSignal
    >>> from plantimager.commons.RPC import RPCSignalReceiver
    >>> ctx = zmq.Context()
    >>> signals = {"update": RPCSignal()}
    >>> receiver = RPCSignalReceiver(ctx, "tcp://127.0.0.1:5555", signals)
    >>> receiver.start()               # start the background thread
    >>> # ... elsewhere, the server publishes a signal ...
    >>> receiver.stop()                # request graceful shutdown
    >>> receiver.join()                # wait for the thread to finish
    """
    try:
        while not self._stop_flag:
            if self.socket.poll(100, zmq.POLLIN) == 0:
                continue
            request = self.socket.recv_json()
            if request["event"] != RPCEvents.EMIT_SIGNAL:
                logger.error(f"Expected event {RPCEvents.EMIT_SIGNAL}, got {request['event']} instead.")
                continue
            signal = request["signal"]
            args = request["args"]
            logger.debug(f"Emitting signal {signal} with args {args}")
            self.signals[signal].emit(*args)
    finally:
        self.socket.close()
        del self.socket
    logger.debug(f"Stopping signal receiver {self}")

stop

stop()

Signal the receiver thread to stop polling and shut down.
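
The stop request is cooperative: the flag is only noticed when the polling call returns. The following sketch reproduces that pattern with the standard library, assuming a queue with a timeout as a stand-in for socket.poll; PollingWorker and inbox are hypothetical names.

```python
import queue
import threading


class PollingWorker(threading.Thread):
    """Hypothetical worker; a queue with a timeout stands in for socket.poll()."""

    def __init__(self, inbox):
        super().__init__(name="PollingWorker")
        self.inbox = inbox
        self.received = []
        self._stop_flag = False

    def run(self):
        while not self._stop_flag:
            try:
                message = self.inbox.get(timeout=0.1)  # wake up regularly
            except queue.Empty:
                continue  # nothing arrived; re-check the stop flag
            self.received.append(message)
            self.inbox.task_done()

    def stop(self):
        self._stop_flag = True


inbox = queue.Queue()
worker = PollingWorker(inbox)
worker.start()
inbox.put("update")
inbox.join()              # wait until the worker has handled the message
worker.stop()             # request shutdown; takes effect after the current poll
worker.join(timeout=1.0)
print(worker.is_alive(), worker.received)
```

The flag is named _stop_flag rather than _stop, presumably to avoid clashing with internals of threading.Thread.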

Source code in plantimager/commons/RPC.py
def stop(self):
    """
    Signal the receiver thread to stop polling and shut down.
    """
    self._stop_flag = True