
controller_proxy

plantimager.webui.controller_proxy

Controller Proxy Module

This module provides a bridge between the local controller device functionality and remote RPC communication, allowing applications to interact with controller hardware through a network connection.

Key Features
  • Singleton pattern implementation ensuring only one controller proxy exists
  • Transparent proxying of controller device methods to remote systems
  • ZeroMQ-based RPC communication for reliable client-server interactions
  • Consistent interface matching the local controller device API
Usage Examples
>>> import zmq
>>> from plantimager.webui.controller_proxy import RPCController
>>>
>>> # Initialize the controller proxy
>>> context = zmq.Context()
>>> controller = RPCController(context, "tcp://localhost:14567")
>>>
>>> # Access the singleton instance elsewhere in your code
>>> same_controller = RPCController.instance()
>>>
>>> # Use controller methods as if they were local
>>> controller.some_method()  # This will be executed on the remote system

RPCController

RPCController(context, url)

Bases: ControllerDevice, RPCClient

Proxy of controller and RPC server.

A singleton class that serves as a proxy between a controller device and an RPC server. Only one instance of this class can exist at a time, and it can be accessed through the instance class method.

Parameters:

Name     Type     Description                                   Default
context  Context  The ZeroMQ context to use for communication.  required
url      str      The URL to connect to for RPC communication.  required

Attributes:

Name       Type                   Description
_instance  RPCController or None  Class variable that holds the single instance of the class.

Notes

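This class implements the Singleton design pattern. The first instantiation creates a new instance in __new__, and __init__ stores it in the _instance class variable. Subsequent instantiations return the existing instance. Because Python still calls __init__ on the object returned by __new__, RPCClient.__init__ runs again with the arguments of the latest call.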

The class inherits from both ControllerDevice and RPCClient to provide controller functionality over an RPC connection.
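
The singleton behaviour described in these notes can be reproduced without ZeroMQ. The following is a minimal sketch, not the library's code: `Proxy` and its `url` attribute are hypothetical stand-ins for RPCController. It illustrates that `__new__` hands back the stored instance while `__init__` still runs on every instantiation, so the arguments of the latest call win.

```python
class Proxy:
    """Hypothetical stand-in for RPCController (no networking involved)."""

    _instance = None  # holds the single instance once __init__ has run

    def __new__(cls, url):
        # Allocate a fresh object only when no instance is stored yet.
        if cls._instance is None:
            return super().__new__(cls)
        return cls._instance

    def __init__(self, url):
        # Python calls __init__ on whatever __new__ returns (when it is an
        # instance of cls), so this body runs on every Proxy(...) call.
        self.url = url
        type(self)._instance = self


first = Proxy("tcp://localhost:14567")
second = Proxy("tcp://localhost:9999")

print(first is second)  # True: both names refer to the same object
print(first.url)        # tcp://localhost:9999: overwritten by the second call
```

If re-initialization is undesirable, one option is to pass the same arguments on every call, or to use the instance class method after the first construction.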

See Also

plantimager.commons.controller_device.ControllerDevice : Base class for controller device functionality.
plantimager.commons.RPC.RPCClient : Base class for RPC client functionality.

Examples:

>>> import zmq
>>> from plantimager.webui.controller_proxy import RPCController
>>> context = zmq.Context()
>>> controller = RPCController(context, "tcp://localhost:14567")
>>> # This returns the same instance
>>> controller2 = RPCController(context, "tcp://localhost:14567")
>>> controller is controller2
True
>>> # Get the singleton instance
>>> RPCController.instance() is controller
True

Initialize the RPCController.

Source code in plantimager/webui/controller_proxy.py
def __init__(self, context: zmq.Context, url: str):
    """Initialize the RPCController.

    Parameters
    ----------
    context : zmq.Context
        The ZeroMQ context to use for communication.
    url : str
        The URL to connect to for RPC communication.
    """
    RPCClient.__init__(self, context, url, timeout=120_000)
    self.__class__._instance = self

__new__

__new__(context, url)

Create a new instance or return the existing instance.

Parameters:

Name     Type     Description                                   Default
context  Context  The ZeroMQ context to use for communication.  required
url      str      The URL to connect to for RPC communication.  required

Returns:

Type           Description
RPCController  The singleton instance of this class.

Source code in plantimager/webui/controller_proxy.py
def __new__(cls, context: zmq.Context, url: str):
    """Create a new instance or return the existing instance.

    Parameters
    ----------
    context : zmq.Context
        The ZeroMQ context to use for communication.
    url : str
        The URL to connect to for RPC communication.

    Returns
    -------
    plantimager.webui.controller_proxy.RPCController
        The singleton instance of this class.
    """
    if cls._instance is None:
        instance = super(RPCController, cls).__new__(cls)
        return instance
    return cls._instance

camera_names abstractmethod

camera_names()

Return the list of camera names.

Source code in plantimager/commons/controller_device.py
@RPCProperty(notify=cameraNamesChanged)
@abstractmethod
def camera_names(self) -> list[str]:
    """Return the list of camera names."""
    pass

execute

execute(method_name, params)

Execute a remote method on the RPC server.

Parameters:

Name         Type  Description                                                  Default
method_name  str   The name of the method to execute.                           required
params       dict  A dictionary containing the args and kwargs for the method.  required

Returns:

Type   Description
tuple  A 2-tuple (success, result). If success is True, result contains the return value of the method. If False, result is a tuple containing the (error_message, traceback).

Raises:

Type          Description
TimeoutError  If the server does not respond within the configured timeout.

Source code in plantimager/commons/RPC.py
def execute(self, method_name: str, params: dict) -> tuple[bool, object]:
    """
    Execute a remote method on the RPC server.

    Parameters
    ----------
    method_name : str
        The name of the method to execute.
    params : dict
        A dictionary containing the `args` and `kwargs` for the method.

    Returns
    -------
    tuple
        A 2-tuple `(success, result)`. If `success` is True, `result`
        contains the return value of the method. If False, `result`
        is a tuple containing the `(error_message, traceback)`.

    Raises
    ------
    TimeoutError
        If the server does not respond within the configured timeout.
    """
    package = {
        "event": RPCEvents.METHOD_CALL,
        "method": method_name,
        "params": params,
    }
    if self.socket.poll(timeout=1000, flags=zmq.POLLOUT) == 0:
        logger.error(f"Proxy of {self._interface} at {self.url} did not respond")
        raise TimeoutError(f"Proxy of {self._interface} at {self.url} did not respond")
    logger.debug(f"Executing {package}")
    self.socket.send_json(package, flags=zmq.NOBLOCK)

    if method_name in self._json_methods:
        if self.socket.poll(timeout=self._json_methods[method_name], flags=zmq.POLLIN) == 0:
            logger.error(f"Proxy of {self._interface} at {self.url} did not respond")
            raise TimeoutError(f"Proxy of {self._interface} at {self.url} did not respond")
        reply = self.socket.recv_json()
        if reply["success"]:
            return True, reply["result"]
        else:
            return False, (reply["error"], reply["traceback"])
    elif method_name in self._buffer_methods:
        if self.socket.poll(timeout=self._buffer_methods[method_name], flags=zmq.POLLIN) == 0:
            logger.error(f"Proxy of {self._interface} at {self.url} did not respond")
            raise TimeoutError(f"Proxy of {self._interface} at {self.url} did not respond")
        reply_frames: list[zmq.Frame] = self.socket.recv_multipart(copy=False)
        buffer_info = json.loads(reply_frames[0].bytes)
        if "error" in buffer_info:
            return False, (buffer_info["error"], buffer_info["traceback"])
        else:
            return True, (reply_frames[1].buffer, buffer_info)
    return False, (Warning(f"Unknown method {method_name}"), "")
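
A caller usually has to branch on the success flag returned by execute. The helper below is a minimal sketch of one way to do that. It relies only on the documented (success, result) shape; `unwrap` and the two sample replies are hypothetical and not part of plantimager.

```python
def unwrap(reply):
    """Return the payload of an execute()-style reply, or raise on failure.

    `reply` is assumed to follow the documented shape: (True, value) on
    success, (False, (error_message, traceback)) on failure.
    """
    success, result = reply
    if success:
        return result
    error, traceback_text = result
    raise RuntimeError(f"Remote call failed: {error}\n{traceback_text}")


# Made-up replies, for illustration only.
ok_reply = (True, ["cam0", "cam1"])
failed_reply = (False, ("ValueError: bad config", "Traceback (most recent call last): ..."))

print(unwrap(ok_reply))
try:
    unwrap(failed_reply)
except RuntimeError as exc:
    print(f"caught: {exc}")
```

A TimeoutError raised by execute is a separate path: it signals that no reply arrived at all, so it would be handled around the call rather than inside a helper like this one.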

instance classmethod

instance()

Get the singleton instance of the RPCController.

Returns:

Type           Description
RPCController  The singleton instance of this class.

Raises:

Type          Description
RuntimeError  If the controller proxy has not been initialized yet.

Source code in plantimager/webui/controller_proxy.py
@classmethod
def instance(cls) -> "RPCController":
    """Get the singleton instance of the RPCController.

    Returns
    -------
    plantimager.webui.controller_proxy.RPCController
        The singleton instance of this class.

    Raises
    ------
    RuntimeError
        If the controller proxy has not been initialized yet.
    """
    if cls._instance is None:
        raise RuntimeError("Controller proxy not initialized.")
    return cls._instance
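
Code that may run before the proxy has been constructed needs to handle the RuntimeError raised by instance. The following is a small sketch under stated assumptions: `Proxy` is a hypothetical stand-in that copies only the accessor logic shown above, and `current_proxy_or_none` is an illustrative helper, not a plantimager function.

```python
class Proxy:
    """Hypothetical stand-in reproducing only the instance() accessor."""

    _instance = None

    def __init__(self):
        # Register the object as the current instance.
        type(self)._instance = self

    @classmethod
    def instance(cls):
        if cls._instance is None:
            raise RuntimeError("Controller proxy not initialized.")
        return cls._instance


def current_proxy_or_none():
    """Return the current proxy, or None when none has been created yet."""
    try:
        return Proxy.instance()
    except RuntimeError:
        return None


before = current_proxy_or_none()  # nothing constructed yet
proxy = Proxy()
after = current_proxy_or_none()   # now returns the constructed object

print(before)
print(after is proxy)
```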

max_progress abstractmethod

max_progress()

Return the maximum progress of the scan.

Source code in plantimager/commons/controller_device.py
@RPCProperty(notify=maxProgressChanged)
@abstractmethod
def max_progress(self) -> int:
    """Return the maximum progress of the scan."""
    pass

progress abstractmethod

progress()

Return the current progress of the scan.

Source code in plantimager/commons/controller_device.py
@RPCProperty(notify=progressChanged)
@abstractmethod
def progress(self) -> int:
    """Return the current progress of the scan."""
    pass

ready_to_scan abstractmethod

ready_to_scan()

Return whether the controller is ready to start a scan.

Source code in plantimager/commons/controller_device.py
@RPCProperty(notify=readyToScanChanged)
@abstractmethod
def ready_to_scan(self) -> bool:
    """Return whether the controller is ready to start a scan."""
    pass

register_interface classmethod

register_interface(interface)

Class decorator to bind an interface to an RPCClient subclass.

This decorator inspects the provided interface and dynamically proxies its methods and properties so that calls are forwarded over the network to the RPC server.

Parameters:

Name       Type  Description                                                        Default
interface  type  The interface class defining the methods and properties to proxy.  required

Returns:

Type      Description
callable  A class decorator that applies the proxy logic.

Raises:

Type          Description
RuntimeError  If the decorated class does not list both interface and RPCClient among its direct base classes.

Source code in plantimager/commons/RPC.py
@classmethod
def register_interface(cls, interface: type):
    """
    Class decorator to bind an interface to an RPCClient subclass.

    This decorator inspects the provided `interface` and dynamically
    proxies its methods and properties so that calls are forwarded
    over the network to the RPC server.

    Parameters
    ----------
    interface : type
        The interface class defining the methods and properties to proxy.

    Returns
    -------
    callable
        A class decorator that applies the proxy logic.

    Raises
    ------
    RuntimeError
        If the decorated class does not inherit from both `interface`
        and `RPCClient`.
    """
    def _decorator(target_cls: type):
        if interface not in target_cls.__bases__ or cls not in target_cls.__bases__:
            raise RuntimeError(f"{target_cls} must inherit from {interface} and {cls}.")
        for key, val in interface.__dict__.items():
            if inspect.isfunction(val) and not (key.startswith("__") and key.endswith("__")):
                logger.debug(f"registering method {key} in {target_cls}")
                func = decorate(val, cls._method_proxy)
                func.__isabstractmethod__ = False  # Counts as an actual implementation
                setattr(target_cls, key, func)
            elif isinstance(val, RPCSignal):
                pass  # nothing to do at this stage
            elif isinstance(val, RPCProperty):
                fget = wraps(val.fget)(partial(cls._property_getter_proxy, property_name=key))
                fset = wraps(val.fset)(partial(cls._property_setter_proxy, property_name=key))
                prop = RPCProperty(fget=fget, fset=fset, fdel=val.fdel, doc=val.__doc__, notify=val._notifier)
                setattr(target_cls, key, prop)
        # abstract methods have been implemented
        target_cls.__abstractmethods__ = frozenset()
        target_cls._interface = interface.__name__
        return target_cls
    return _decorator
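
The decorator mechanism above can be illustrated with a reduced, network-free sketch. It is not the library's implementation: `Client`, `Device`, `DeviceProxy` and `_make_forwarder` are hypothetical names, properties and signals are left out, and the `{"args": ..., "kwargs": ...}` layout of `params` is an assumption based on the execute docstring. The stand-in `execute` records calls instead of sending them.

```python
import inspect
from abc import ABC, abstractmethod


def _make_forwarder(name):
    """Build a method that forwards its arguments to self.execute()."""
    def forwarder(self, *args, **kwargs):
        # Assumed params layout: positional and keyword arguments side by side.
        return self.execute(name, {"args": list(args), "kwargs": kwargs})
    forwarder.__name__ = name
    return forwarder


class Client:
    """Hypothetical stand-in for RPCClient: records calls, sends nothing."""

    def __init__(self):
        self.calls = []

    def execute(self, method_name, params):
        self.calls.append((method_name, params))
        return True, None

    @classmethod
    def register_interface(cls, interface):
        def _decorator(target_cls):
            # Both classes must be direct bases of the decorated class.
            if interface not in target_cls.__bases__ or cls not in target_cls.__bases__:
                raise RuntimeError(f"{target_cls} must inherit from {interface} and {cls}.")
            for key, val in interface.__dict__.items():
                is_dunder = key.startswith("__") and key.endswith("__")
                if inspect.isfunction(val) and not is_dunder:
                    setattr(target_cls, key, _make_forwarder(key))
            # Every abstract method now has a forwarding implementation.
            target_cls.__abstractmethods__ = frozenset()
            return target_cls
        return _decorator


class Device(ABC):
    """Hypothetical interface with two abstract methods."""

    @abstractmethod
    def run_scan(self): ...

    @abstractmethod
    def set_dataset_name(self, name: str): ...


@Client.register_interface(Device)
class DeviceProxy(Device, Client):
    pass


proxy = DeviceProxy()  # instantiable because the abstract set was cleared
proxy.set_dataset_name("demo")
proxy.run_scan()
print(proxy.calls)
```

Clearing `__abstractmethods__` is what lets the decorated class be instantiated even though it defines no methods of its own. Without that step, `DeviceProxy()` would raise a TypeError.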

run_scan abstractmethod

run_scan()

Start the scan.

Source code in plantimager/commons/controller_device.py
@abstractmethod
def run_scan(self):
    """Start the scan."""
    pass

set_api_token abstractmethod

set_api_token(token)

Set the session token to use for authenticated requests.

Source code in plantimager/commons/controller_device.py
@abstractmethod
def set_api_token(self, token: str):
    """Set the session token to use for authenticated requests."""
    pass

set_config abstractmethod

set_config(config)

Send a configuration dictionary to the controller.

Source code in plantimager/commons/controller_device.py
@abstractmethod
def set_config(self, config: dict):
    """Send a configuration dictionary to the controller."""
    pass

set_dataset_name abstractmethod

set_dataset_name(name)

Set the name of the dataset to be created.

Source code in plantimager/commons/controller_device.py
@abstractmethod
def set_dataset_name(self, name: str):
    """Set the name of the dataset to be created."""
    pass

set_db_url abstractmethod

set_db_url(url)

Set the database URL for the controller.

Source code in plantimager/commons/controller_device.py
@abstractmethod
def set_db_url(self, url: str):
    """Set the database URL for the controller."""
    pass

stop_server

stop_server()

Request the connected RPC server to shut down.

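Sends a non-blocking STOP_SERVER event to the remote server and waits up to one second for a reply. If the server responds, the reply is logged; otherwise the method returns silently, since the server may already be down.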

Source code in plantimager/commons/RPC.py
def stop_server(self):
    """
    Request the connected RPC server to shut down.

    Sends a non-blocking `STOP_SERVER` event to the remote server.
    If the server responds, it logs the reply.
    """
    logger.info(f"Stopping server {self.url}")
    if self.socket.poll(timeout=1000, flags=zmq.POLLOUT) == 0:
        logger.info(f"Server {self.url} could not be joined (might already be dead)")
        return
    self.socket.send_json({
        "event": RPCEvents.STOP_SERVER
    }, flags=zmq.NOBLOCK)
    if self.socket.poll(timeout=1000, flags=zmq.POLLIN) == 0:
        logger.info(f"Server {self.url} did not respond (might already be dead)")
        return
    reply = self.socket.recv_json()
    logger.debug(f"Got stop reply {reply}")