RPC

plantimager.commons.RPC

NoResult

NoResult(error, traceback)

Represents an outcome with no result, typically used to indicate an operation failure along with associated error details.

This class encapsulates the error message and traceback information to provide a structured representation of operation failures, useful for logging or debugging purposes.

Instances of this class are falsy.

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| error | str | A string containing the error message describing the nature of the failure. |
| traceback | str | A string containing the traceback or detailed information about where and why the failure occurred. |

Source code in plantimager/commons/RPC.py
def __init__(self, error: str, traceback: str):
    self.error = error
    self.traceback = traceback
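
Because a `NoResult` is falsy, a caller can tell a failed call from a successful one without catching an exception. The sketch below uses a stand-in class with the same two attributes. The `__bool__` implementation, `call_remote` and `describe` are assumptions made for illustration and are not taken from the library.

```python
class NoResultSketch:
    """Stand-in mirroring the documented attributes of NoResult."""

    def __init__(self, error: str, traceback: str):
        self.error = error
        self.traceback = traceback

    def __bool__(self) -> bool:
        # Assumption: falsiness is obtained by returning False here.
        return False


def call_remote(fail: bool):
    """Hypothetical remote call returning a value or a failure marker."""
    if fail:
        return NoResultSketch("division by zero", "Traceback (most recent call last): ...")
    return {"value": 42}


def describe(result) -> str:
    # isinstance is used rather than `if not result` so that legitimate
    # falsy results (0, "", []) are not mistaken for failures.
    if isinstance(result, NoResultSketch):
        return f"failed: {result.error}"
    return f"ok: {result}"


print(describe(call_remote(fail=True)))
print(describe(call_remote(fail=False)))
```

A plain truthiness test is shorter, but it also treats results such as `0` or an empty list as failures, which is why the sketch checks the type explicitly.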

RPCClient

RPCClient(context, url)

Abstract base class for RPC clients. To use it, create a class that inherits from both the interface and RPCClient, and decorate the new class with @RPCClient.register_interface.

Source code in plantimager/commons/RPC.py
def __init__(self, context: zmq.Context, url: str):
    super().__init__()
    self.context: zmq.Context = context
    self.url: str = url
    self.socket: zmq.Socket = context.socket(zmq.REQ)
    self.socket.connect(self.url)

    # replacing class attribute signals with instance signals
    for base in self.__class__.__bases__:
        for key, value in base.__dict__.items():
            if isinstance(value, RPCSignal):
                setattr(self, key, copy.deepcopy(value))

    # Finding peer address (zmq abstract addresses so we use native sockets here)
    self.socket.send_json({"event": RPCEvents.FIND_PEER_ADDRESS})
    reply = self.socket.recv_json()
    if not reply["success"]:
        raise RuntimeError(f"FIND_PEER_ADDRESS failed")
    protocol, ip_addr, port = url_parser.match(url).groups()
    s = socket.create_connection((ip_addr, reply["port"]))
    self.own_address = s.getsockname()[0]
    self.peer_address = s.getpeername()[0]
    s.close()
    logger.debug(f"Client at address {self.own_address} connected to server at {self.peer_address}")

    # Getting RPCServer inventory
    self.socket.send_json({
        "event": RPCEvents.GET_INVENTORY
    })
    reply = self.socket.recv_json()
    logger.debug(f"Got inv: {reply}",)
    self._json_methods: dict[str, int|None] = reply["json_methods"]
    self._buffer_methods: dict[str, int|None] = reply["buffer_methods"]
    self._signals = {sig: getattr(self, sig) for sig in reply["signals"] if hasattr(self, sig)}
    self._properties: list = reply["properties"]
    self.name: str = reply["name"]

    # If signals initiate
    self._signal_receiver = None
    if self._signals:
        logger.info("Initializing signal handling")
        self._signal_receiver = RPCSignalReceiver(
            context=self.context, url=f"tcp://{self.own_address}", signals=self._signals
        )
        self._signal_receiver.daemon = True
        self._signal_receiver.start()
        signal_port = self._signal_receiver.port
        self.socket.send_json({"event": RPCEvents.INIT_SIGNALS_HANDLING, "address": self.own_address, "port": signal_port})
        reply = self.socket.recv_json()
        if not reply["success"]:
            self._signal_receiver.stop()
            self._signal_receiver.join(2)
            self._signal_receiver = None
            raise RuntimeError(f"INIT_SIGNALS_HANDLING failed")
        logger.info("Successfully initialized signal handling")

    def _finalizer():
        self.socket.close()
        if self._signal_receiver:
            self._signal_receiver.stop()
            self._signal_receiver.join(2)

    finalize(self, _finalizer)
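
The peer-address step in the constructor relies on a plain TCP connection, because native sockets expose both endpoints through `getsockname()` and `getpeername()`. The following sketch reproduces that exchange on the loopback interface within a single process. Binding to port 0 so that the operating system picks a free port is an assumption of this sketch; `serve_forever` draws a random port number instead.

```python
import socket

# Listening side (the server's role in the exchange).
listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
listener.bind(("127.0.0.1", 0))  # port 0: let the OS choose a free port
listener.listen(1)
port = listener.getsockname()[1]

# Connecting side (the client's role): learn both ends of the connection.
client = socket.create_connection(("127.0.0.1", port))
own_address = client.getsockname()[0]
peer_address = client.getpeername()[0]

# The listening side learns the client's address from accept().
conn, addr_info = listener.accept()
seen_by_listener = addr_info[0]

client.close()
conn.close()
listener.close()

print(own_address, peer_address, seen_by_listener)
```

On a real network the two addresses differ, and `own_address` is the interface address that the server can use to reach the client's signal receiver.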

RPCProperty

RPCProperty(fget=None, fset=None, fdel=None, doc=None, notify=None)

Bases: property

Declares a property for RPC usage. When the notify signal is emitted, the proxy on the RPCClient is updated. The signal passed as notify must be emitted in the setter whenever the property changes.

Source code in plantimager/commons/RPC.py
def __init__(self, fget=None, fset=None, fdel=None, doc=None, notify: RPCSignal = None):
    super().__init__(fget=fget, fset=fset, fdel=fdel, doc=doc)
    self._notifier = notify
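
As a rough illustration of the contract described above, the sketch below pairs a `property` subclass with a notification signal that the setter emits on change. `SignalSketch`, its `emit` method, `NotifyingProperty` and `Camera` are hypothetical stand-ins; only the constructor signature follows the listing.

```python
class SignalSketch:
    """Hypothetical minimal signal: stores callbacks and calls them on emit."""

    def __init__(self):
        self._slots = []

    def connect(self, slot):
        self._slots.append(slot)

    def emit(self, *args):
        for slot in list(self._slots):
            slot(*args)


class NotifyingProperty(property):
    """property subclass that remembers which signal announces changes."""

    def __init__(self, fget=None, fset=None, fdel=None, doc=None, notify=None):
        super().__init__(fget=fget, fset=fset, fdel=fdel, doc=doc)
        self._notifier = notify


class Camera:
    exposure_changed = SignalSketch()

    def __init__(self):
        self._exposure = 10

    def _get_exposure(self):
        return self._exposure

    def _set_exposure(self, value):
        # The setter is responsible for emitting the notify signal.
        if value != self._exposure:
            self._exposure = value
            self.exposure_changed.emit(value)

    exposure = NotifyingProperty(_get_exposure, _set_exposure, notify=exposure_changed)


seen = []
cam = Camera()
cam.exposure_changed.connect(seen.append)
cam.exposure = 25   # value changes: signal emitted
cam.exposure = 25   # same value: no emission
print(seen)
```

The property is built with an explicit constructor call rather than the `@x.setter` decorator, because `property.setter` creates a fresh instance of the subclass and the `notify` argument would not be carried over.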

RPCServer

RPCServer(context, url)

RPCServer to use in combination with RPCClient. The server holds the concrete implementation of an interface that is made available on the network. To create an RPCServer, define a class that inherits from both an interface and RPCServer. Callable methods must be decorated with RPCServer.register_method_buffer or RPCServer.register_method_json. The server can also send signals defined with the RPCSignal class and proxy properties defined with RPCProperty.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| context | Context | ZMQ context used to create the sockets needed to communicate with the client. | required |
| url | str | URL where the RPCServer should listen. It should be of the form "tcp://<ip>", where <ip> is the IP address of a local network interface that is accessible from the client. | required |

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| context | Context | ZMQ context used to create the sockets needed to communicate with the client. |
| url | str | URL where the RPCServer is opened. |
| port | int | Port where the RPCServer is opened. |
| name | str | Name of the RPCServer as given by the device registry once registered. |
| uuid | str | Unique identifier of this RPCServer, given by the registry once registered. |
| peer_addr | str | IP address of the client once connected. |

Source code in plantimager/commons/RPC.py
def __init__(self, context: zmq.Context, url: str):
    """
    RPCServer to use in combination with RPCClient.
    The server holds the concrete implementation of an interface that is made available on the network.
    To create an RPCServer create a class inheriting from an interface and RPCServer. Callable
    methods must be decorated using `RPCServer.register_method_buffer` or `RPCServer.register_method_json`.
    The server is also capable of sending signals defined by the RPCSignal class and proxying properties
    defined with RPCProperty.

    Parameters
    ----------
    context: zmq.Context
        ZMQ context used to make the various sockets necessary for communicating with the client.
    url: str
        Url where the RPCServer should listen. It should be of the form "tcp://<ip>" where ip is one
        of the local network interface ip which must be accessible from the client.

    Attributes
    ----------
    context: zmq.Context
        ZMQ context used to make the various sockets necessary for communicating with the client.
    url: str
        Url where the RPCServer is opened.
    port: int
        Port where the RPCServer is opened.
    name: str
        Name of the RPCServer as given by the device registry once registered.
    uuid: str
        Unique identifier of this RPCServer, given by the registry once registered.
    peer_addr: str
        Ip address of the client once connected.

    """
    super().__init__()

    self.uuid: None | str = None
    self._json_methods: dict[str, Callable] = dict()
    self._buffer_methods: dict[str, Callable] = dict()
    self._rpc_properties: dict[str, property] = dict()
    self._signals: dict[str, RPCSignal] = dict()

    # replacing class attribute signals from other bases by instance signals
    for base in self.__class__.__bases__:
        for key, value in base.__dict__.items():
            if isinstance(value, RPCSignal):
                setattr(self, key, copy.deepcopy(value))
                self._signals[key] = getattr(self, key)

    for key, val in self.__class__.__dict__.items():
        if inspect.isfunction(val) and hasattr(val, "_is_json_method") and val._is_json_method:
            self._json_methods[key] = val
        elif inspect.isfunction(val) and hasattr(val, "_is_buffer_method") and val._is_buffer_method:
            self._buffer_methods[key] = val
        elif isinstance(val, RPCProperty):
            self._rpc_properties[key] = val
        elif isinstance(val, RPCSignal):
            # replacing class attribute signal with instance specific copy
            setattr(self, key, copy.deepcopy(val))
            self._signals[key] = getattr(self, key)

    self.context: zmq.Context = context
    self.url: str = url
    self._socket: zmq.Socket[zmq.REP] = context.socket(zmq.REP)

    protocol, ip_addr, port = url_parser.match(url).groups()
    if port:
        self.port = int(port)
        self._socket.bind(url)
    else:
        self.port = self._socket.bind_to_random_port(url, 10000, 12000)
    logger.debug(f"RPCServer of type {type(self)} bound to {ip_addr} on port {self.port}")

    self.name = ""
    self.registry_addr = ""
    self._signal_socket: zmq.Socket[zmq.REQ] | None = None
    self.peer_addr: str | None = None

    self._stop = False

    finalize(self, self._finalize)
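
The constructor copies every class-level signal onto the instance, so that callbacks connected on one server object do not leak to another. The sketch below shows the effect with a stand-in signal class; `SignalSketch`, `Interface` and `Device` are hypothetical names, while the loop over `__bases__` follows the listing.

```python
import copy


class SignalSketch:
    """Hypothetical stand-in for a signal that stores connected callbacks."""

    def __init__(self):
        self.slots = []

    def connect(self, slot):
        self.slots.append(slot)


class Interface:
    moved = SignalSketch()  # declared once, at class level


class Device(Interface):
    def __init__(self):
        # Replace class-level signals from the direct bases by per-instance copies.
        for base in self.__class__.__bases__:
            for key, value in base.__dict__.items():
                if isinstance(value, SignalSketch):
                    setattr(self, key, copy.deepcopy(value))


a, b = Device(), Device()
a.moved.connect(print)

print(len(a.moved.slots), len(b.moved.slots), len(Interface.moved.slots))
```

Without the copy, `a.moved` and `b.moved` would both resolve to the single object stored on `Interface`, and a callback connected through one instance would fire for all of them. Only direct base classes are scanned, as in the listing.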

register_method_buffer staticmethod

register_method_buffer(timeout=10000)

Registers this method as a remotely callable procedure that transmits its output as a buffer-like object together with a buffer_info dictionary.

The method may take any input that can be serialized via JSON and must return a 2-tuple: (memoryview or bytes, dict).

Parameters:

Name Type Description Default
timeout int | None

Specifies how much time in ms a client is expected to wait for the method to finish. If None, waits indefinitely.

10000

Returns:

Name Type Description
decorator Callable[Callable[..., tuple[memoryview | bytes, dict]], Callable[..., tuple[memoryview | bytes, dict]]]
Source code in plantimager/commons/RPC.py
@staticmethod
def register_method_buffer(timeout: int | None = 10000):
    """
    Registers this method as remote callable procedure which will transmit its output as a buffer-like object
    as well as a buffer_info dictionary.

    `method` may take any input which will be serialized via json and must output a 2-tuple:
    (memoryview or bytes, dict).


    Parameters
    ----------
    timeout: int | None
        Specifies how much time in ms a client is expected to wait for the method to finish. If None, waits indefinitely.

    Returns
    -------
    decorator: Callable[Callable[..., tuple[memoryview|bytes, dict]], Callable[..., tuple[memoryview|bytes, dict]]]

    """
    if inspect.isfunction(timeout):
        timeout._is_buffer_method = True
        timeout._timeout = 10000
        return timeout
    def _decorator(method: Callable[..., tuple[memoryview|bytes, dict]]):
        method._is_buffer_method = True
        method._timeout = timeout
        return method
    return _decorator
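
The `inspect.isfunction(timeout)` branch lets the decorator be applied either bare or with arguments. The standalone sketch below reproduces that pattern outside the class and then collects the flagged methods in the same way the server constructor scans its class dictionary. `register_buffer`, `FakeCamera` and the keys of the info dictionaries are hypothetical.

```python
import inspect


def register_buffer(timeout=10000):
    """Standalone copy of the dual-use decorator pattern shown above."""
    if inspect.isfunction(timeout):
        # Used bare (@register_buffer): `timeout` is the decorated function.
        timeout._is_buffer_method = True
        timeout._timeout = 10000
        return timeout

    def _decorator(method):
        method._is_buffer_method = True
        method._timeout = timeout
        return method

    return _decorator


class FakeCamera:
    @register_buffer
    def grab(self):
        data = bytes([0, 1, 2, 3])
        # Returns the required 2-tuple: (buffer, info dictionary).
        return memoryview(data), {"dtype": "uint8", "shape": [2, 2]}

    @register_buffer(timeout=None)
    def long_exposure(self):
        return b"\x00" * 4, {"dtype": "uint8", "shape": [4]}


# Discover flagged methods and their timeouts from the class dictionary.
found = {
    name: func._timeout
    for name, func in FakeCamera.__dict__.items()
    if inspect.isfunction(func) and getattr(func, "_is_buffer_method", False)
}
print(found)
```

Since a bare application always records the default of 10000 ms, a different timeout (or `None` for no limit) requires the call form with an argument.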

register_method_json staticmethod

register_method_json(timeout=10000)

Registers this method as a remotely callable procedure that transmits its output via JSON. It is advisable to return only basic types and containers (int, float, str, bool, list, tuple, dict, ...). Arguments are also serialized via JSON.

Parameters:

Name Type Description Default
timeout int | None

Specifies how much time in ms a client is expected to wait for the method to finish. If None, waits indefinitely.

10000

Returns:

Name Type Description
decorator Callable[Callable[..., Any], Callable[..., Any]]
Source code in plantimager/commons/RPC.py
@staticmethod
def register_method_json(timeout: int | None = 10000):
    """
    Registers this method as remote callable procedure which will transmit its output via json.
    It is advised to only send basic types and containers as output (int, float, str, bool, list, tuple, dict, ...)
    Arguments are also serialized via json.

    Parameters
    ----------
    timeout: int | None
        Specifies how much time in ms a client is expected to wait for the method to finish. If None, waits indefinitely.

    Returns
    -------
    decorator: Callable[Callable[..., Any], Callable[..., Any]]

    """
    if inspect.isfunction(timeout):
        timeout._is_json_method = True
        timeout._timeout = 10000
        return timeout
    def _decorator(method: Callable[..., Any]):
        method._is_json_method = True
        method._timeout = timeout
        return method
    return _decorator
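
The advice to stick to basic types follows from how JSON serialization behaves. The short sketch below, which uses only the standard `json` module, shows what a value looks like after one encode/decode round trip; the `roundtrip` helper is a hypothetical name.

```python
import json


def roundtrip(value):
    """Encode to JSON text and decode again, as a value would be after transport."""
    return json.loads(json.dumps(value))


as_list = roundtrip((1, 2))          # tuples come back as lists
str_keys = roundtrip({1: "a"})       # integer keys come back as strings
nested = roundtrip({"ok": True, "values": [1.5, None]})

try:
    json.dumps({1, 2})               # sets are not JSON serializable
    set_error = None
except TypeError as exc:
    set_error = type(exc).__name__

print(as_list, str_keys, nested, set_error)
```

A method that returns a tuple therefore hands the client a list, and dictionaries keyed by integers arrive with string keys, so callers should not rely on those distinctions surviving the call.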

register_to_registry

register_to_registry(type_, name, registry_url)

Registers this RPCServer with the registry at registry_url as a device of type type_ and name name.

Note: The name may not be accepted as is; the registry may modify it to avoid duplicates. This method returns the accepted name of this device.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| type_ | str | Name of the device type. | required |
| name | str | Proposed name of the device. | required |
| registry_url | str | URL of the device registry. Must have the form "tcp://<ip>:<port>" if the registry uses TCP. | required |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| accepted_name | str | Name of this device as accepted by the registry. |

Source code in plantimager/commons/RPC.py
def register_to_registry(self, type_: str, name: str, registry_url: str) -> str:
    """
    Register this RPCServer with the registry at `registry_url` as a device of type `type_` and name `name`.

    Note: The name may not be accepted as is; the registry may modify it to avoid duplicates. This method
    returns the accepted name of this device.

    Parameters
    ----------
    type_: str
        Name of the device type.
    name: str
        Proposed name of the device.
    registry_url: str
        Url of the device registry. Must have the form "tcp://<ip>:<port>" if the registry uses tcp

    Returns
    -------
    accepted_name: str
        Name of this device as accepted by the registry.

    """
    logger.debug(f"Register device {name} of type {type_} to {registry_url}")
    self.name, self.uuid = register_device(
        self.context, type_,
        f"{self.url}:{self.port}",
        name, registry_url,
        overwrite=True,
    )
    self.registry_addr = registry_url if self.name else ""
    return self.name

serve_forever

serve_forever()

Starts serving requests for this RPCServer.

Source code in plantimager/commons/RPC.py
def serve_forever(self):
    """
    Starts serving requests for this RPCServer.

    Returns
    -------

    """
    self._stop = False
    while not self._stop:
        notify_watchdog()
        if self._socket.poll(100, zmq.POLLIN) == 0:
            continue
        request = self._socket.recv_json()
        match request["event"]:
            case RPCEvents.FIND_PEER_ADDRESS:
                logger.info("Finding peer address")
                s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                port = random.randint(49152, 65535)
                s.bind(("", port))  # may crash if unlucky
                s.listen(1)
                self._socket.send_json({"success": True, "port": port})
                c, addr_info = s.accept()
                self.peer_addr = addr_info[0]
                c.shutdown(socket.SHUT_RDWR)
                c.close()
                s.shutdown(socket.SHUT_RDWR)
                s.close()
                del s
                logger.info("Connected to peer at address: {}".format(self.peer_addr))
            case RPCEvents.GET_INVENTORY:
                logger.info("Sending inventory")
                response = {
                    "json_methods": {name: func._timeout for name, func in self._json_methods.items()},
                    "buffer_methods": {name: func._timeout for name, func in self._buffer_methods.items()},
                    "signals": list(self._signals.keys()),
                    "properties": list(self._rpc_properties.keys()),
                    "name": self.name,
                }
                logger.info(response)
                self._socket.send_json(response)
            case RPCEvents.INIT_SIGNALS_HANDLING:
                logger.info("Initializing signal handling")
                address, port = request["address"], request["port"]
                self._signal_socket = self.context.socket(zmq.REQ)
                self._signal_socket.connect(f"tcp://{address}:{port}")
                for sig_name, sig in self._signals.items():
                    sig.connect(partial(self._send_signal, sig_name))
                self._socket.send_json({"success": True})
                logger.info("Successfully initialized signal handling")
            case RPCEvents.METHOD_CALL:
                method: str = request["method"]
                params: dict[str, bytes] = request["params"]
                logger.info(f"Executing {method} with params {params}")
                if method in self._json_methods:
                    self._exec_json(self._json_methods[method], params)
                elif method in self._buffer_methods:
                    self._exec_buffer(self._buffer_methods[method], params)
                else:
                    logger.error(f"Method {method} not implemented")
                    self._socket.send_json({"success": False, "error": f"Method {method} not implemented"})
            case RPCEvents.PROPERTY_GET:
                prop_name: str = request["property"]
                logger.debug(f"Getting property {prop_name}")
                try:
                    val = getattr(self, prop_name)
                except Exception as e:
                    logger.error(f"Failed to execute 'get property' for property {prop_name}")
                    traceback_str = traceback.format_exc(limit=10)
                    print(traceback_str, file=sys.stderr)
                    self._socket.send_json({"success": False, "error": str(e), "traceback": traceback_str})
                else:
                    self._socket.send_json({
                        "success": True, "value": val,
                    })
            case RPCEvents.PROPERTY_SET:
                prop_name: str = request["property"]
                val = request["value"]
                logger.debug(f"Setting property {prop_name} to {val}")
                try:
                    setattr(self, prop_name, val)
                except Exception as e:
                    logger.error(f"Failed to execute 'set property' for property {prop_name}")
                    traceback_str = traceback.format_exc(limit=10)
                    print(traceback_str, file=sys.stderr)
                    self._socket.send_json({"success": False, "error": str(e), "traceback": traceback_str})
                else:
                    self._socket.send_json({
                        "success": True,
                    })
            case RPCEvents.STOP_SERVER:
                self._socket.send_json({"success": True})
                break
    logger.info("Server stopped")
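
The property branches of the loop always answer with a dictionary carrying a `success` flag, plus either a `value` or an `error` and `traceback`. The sketch below isolates that reply logic in a plain function with no sockets involved. The string event names, the `Motor` class and the reply for unknown events are assumptions of this sketch; the actual `RPCEvents` values are not shown in this section.

```python
import traceback


def handle_property_request(target, request: dict) -> dict:
    """Build the reply dictionary for a property get/set request."""
    try:
        if request["event"] == "PROPERTY_GET":
            return {"success": True, "value": getattr(target, request["property"])}
        if request["event"] == "PROPERTY_SET":
            setattr(target, request["property"], request["value"])
            return {"success": True}
        # Assumption: unknown events get an explicit failure reply.
        return {"success": False, "error": f"Unknown event {request['event']}"}
    except Exception as exc:
        return {
            "success": False,
            "error": str(exc),
            "traceback": traceback.format_exc(limit=10),
        }


class Motor:
    """Hypothetical device exposing one validated property."""

    def __init__(self):
        self._position = 0

    @property
    def position(self):
        return self._position

    @position.setter
    def position(self, value):
        if value < 0:
            raise ValueError("position must be non-negative")
        self._position = value


motor = Motor()
ok = handle_property_request(motor, {"event": "PROPERTY_SET", "property": "position", "value": 5})
got = handle_property_request(motor, {"event": "PROPERTY_GET", "property": "position"})
bad = handle_property_request(motor, {"event": "PROPERTY_SET", "property": "position", "value": -1})
print(ok, got, bad["error"])
```

Returning a reply for every request matters with a request/reply socket pair, since the requesting side waits for an answer before it can send again.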

RPCSignalReceiver

RPCSignalReceiver(context, url, signals)

Bases: Thread

A receiver thread used by RPCClient to listen for RPC signals sent by the server and to re-emit the same signals on the proxy.

Source code in plantimager/commons/RPC.py
def __init__(self, context: zmq.Context, url: str, signals: dict[str, RPCSignal]):
    super().__init__()
    self.context = context
    self.url = url
    self._stop = False
    self.signals = signals
    self.socket: zmq.Socket = context.socket(zmq.REP)
    self.port = self.socket.bind_to_random_port(url)
    finalize(self.socket, self.socket.close)
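
RPCClient starts the receiver as a daemon thread and later calls `stop()` followed by `join(2)`. The receiver's `run` and `stop` methods are not shown in this section, so the sketch below only illustrates one common way to build such a stoppable polling thread, using a `threading.Event`. `StoppableWorker` and its `ticks` counter are hypothetical.

```python
import threading
import time


class StoppableWorker(threading.Thread):
    """Polling thread that exits soon after stop() is called."""

    def __init__(self):
        super().__init__(daemon=True)
        self._stop_event = threading.Event()
        self.ticks = 0

    def run(self):
        while not self._stop_event.is_set():
            self.ticks += 1              # placeholder for "poll socket, handle message"
            self._stop_event.wait(0.01)  # short wait so stop() is noticed quickly

    def stop(self):
        self._stop_event.set()


worker = StoppableWorker()
worker.start()
time.sleep(0.05)
worker.stop()
worker.join(2)  # bounded wait, mirroring join(2) in the client
print(worker.is_alive(), worker.ticks >= 1)
```

Polling with a short timeout, as `serve_forever` does with `poll(100, ...)`, keeps the loop responsive to the stop request without busy-waiting.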