scan

plantimager.controller.scanner.scan

Defines the Scan class, which runs the scanning process with the grbl CNC, and the DataUploader class, which uploads the captured data to PlantDB.

DataUploader

DataUploader(db_client, queue_size)

Uploads scan data to a PlantDB instance from a pool of worker threads.

This class manages asynchronous uploads of image data to a PlantDB database using a thread pool. It caps the number of pending upload jobs at queue_size and makes upload() block when that cap is reached, which applies backpressure to the caller.

Attributes:

Name Type Description
db_client PlantDBClient

Client for communicating with the PlantDB database

jobs set[Future]

Set of active upload job futures

pool ThreadPoolExecutor

Thread pool for executing upload tasks

queue_size int

Maximum number of pending upload jobs (queued or running) before upload() blocks

Notes
  • Uses ThreadPoolExecutor with 4 worker threads for parallel uploads
  • Automatically shuts down the thread pool when the object is garbage collected
  • Blocks new uploads when the queue is full until a slot becomes available
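
The backpressure behavior described in these notes can be sketched with the standard library alone. This is a minimal illustration under stated assumptions, not the DataUploader implementation: `BoundedSubmitter` and `fake_upload` are hypothetical names, and the lock around the job set is an addition of this sketch.

```python
import threading
import time
from concurrent.futures import FIRST_COMPLETED, Future, ThreadPoolExecutor, wait


class BoundedSubmitter:
    """Hypothetical helper: allow at most `queue_size` unfinished jobs."""

    def __init__(self, queue_size: int, max_workers: int = 4):
        self.queue_size = queue_size
        self.pool = ThreadPoolExecutor(max_workers=max_workers)
        self.jobs: set[Future] = set()
        self._lock = threading.Lock()  # guards `jobs` against worker-thread callbacks

    def _discard(self, fut: Future) -> None:
        with self._lock:
            self.jobs.discard(fut)

    def submit(self, fn, *args) -> Future:
        while True:
            with self._lock:
                # Snapshot of the jobs that are still queued or running
                pending = {f for f in self.jobs if not f.done()}
            if len(pending) < self.queue_size:
                break
            # Block the caller until at least one pending job finishes
            wait(pending, return_when=FIRST_COMPLETED)
        fut = self.pool.submit(fn, *args)
        with self._lock:
            self.jobs.add(fut)
        fut.add_done_callback(self._discard)  # stop tracking the job once done
        return fut


def fake_upload(i: int) -> int:
    time.sleep(0.01)  # stand-in for a network upload
    return i


submitter = BoundedSubmitter(queue_size=2, max_workers=2)
futures = [submitter.submit(fake_upload, i) for i in range(6)]
results = sorted(f.result() for f in futures)
submitter.pool.shutdown(wait=True)
```

The sketch assumes a single submitting thread. With several submitters, the size check and the submission would need to happen under the same lock.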

Examples:

>>> import numpy as np
>>> from plantdb.client.plantdb_client import PlantDBClient
>>> from plantimager.controller.scanner.scan import DataUploader
>>> from plantimager.controller.scanner.hal import DataItem
>>> client = PlantDBClient("http://localhost:5000")
>>> uploader = DataUploader(client, queue_size=10)
>>> # Generate random RGB data (values from 0-255)
>>> rgb_data = np.random.randint(0, 256, (200, 150, 3), dtype=np.uint8)
>>> metadata = {'description': 'Random RGB test image', 'author': 'John Doe'}
>>> data_item = DataItem(rgb_data, metadata)
>>> uploader.upload("scan_001", "images", data_item)

Initialize the DataUploader with a database client and queue size.

Parameters:

Name Type Description Default
db_client PlantDBClient

Client for communicating with the PlantDB database

required
queue_size int

Maximum number of pending upload jobs (queued or running) before upload() blocks

required
Source code in plantimager/controller/scanner/scan.py
def __init__(self, db_client: PlantDBClient, queue_size: int):
    """Initialize the DataUploader with a database client and queue size.

    Parameters
    ----------
    db_client : plantdb.client.plantdb_client.PlantDBClient
        Client for communicating with the PlantDB database
    queue_size : int
        Maximum number of pending upload jobs (queued or running) before upload() blocks
    """
    self.db_client = db_client
    self.jobs: set[Future] = set()  # Track active upload jobs
    self.pool = ThreadPoolExecutor(max_workers=4, thread_name_prefix=__name__)
    self.queue_size = queue_size
    # Ensure thread pool is properly shut down when object is garbage collected
    finalize(self, self.pool.shutdown, wait=True, cancel_futures=True)

upload

upload(scan_id, fileset, data)

Upload data file to specified fileset of scan_id in a plantdb instance.

This method queues an upload job to the thread pool. If the queue is full, it blocks until a slot becomes available.

Parameters:

Name Type Description Default
scan_id str

Identifier of the scan in the database

required
fileset str

Identifier of the fileset within the scan

required
data DataItem

Data item containing the image and metadata to upload

required
Notes

This method may block indefinitely if the upload queue is full and no upload jobs are completing.
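
One way to guard against the indefinite wait mentioned in this note is to pass a timeout to `concurrent.futures.wait` and treat an empty "done" set as a stall. The sketch below is an illustration only; `wait_for_slot` is a hypothetical helper and is not part of DataUploader.

```python
import threading
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait


def wait_for_slot(jobs, timeout_s: float) -> None:
    """Hypothetical helper: block until one job finishes, or raise on timeout."""
    done, _not_done = wait(set(jobs), timeout=timeout_s, return_when=FIRST_COMPLETED)
    if not done:
        raise TimeoutError(f"no upload finished within {timeout_s} s")


release = threading.Event()
with ThreadPoolExecutor(max_workers=1) as pool:
    # A job that does not finish until `release` is set, simulating a stalled upload
    stuck = pool.submit(release.wait)
    try:
        wait_for_slot({stuck}, timeout_s=0.05)
        timed_out = False
    except TimeoutError:
        timed_out = True
    finally:
        release.set()  # let the worker exit so the pool can shut down
```

Whether a stalled upload should abort the scan or only be logged is a design decision this sketch leaves open.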

Source code in plantimager/controller/scanner/scan.py
def upload(self, scan_id: str, fileset: str, data: DataItem):
    """Upload data file to specified fileset of scan_id in a plantdb instance.

    This method queues an upload job to the thread pool. If the queue is full,
    it blocks until a slot becomes available.

    Parameters
    ----------
    scan_id : str
        Identifier of the scan in the database
    fileset : str
        Identifier of the fileset within the scan
    data : DataItem
        Data item containing the image and metadata to upload

    Notes
    -----
    This method may block indefinitely if the upload queue is full and
    no upload jobs are completing.
    """
    # Wait if number of jobs submitted is greater than queue_size
    if len(self.jobs) >= self.queue_size:
        cf_wait(self.jobs, return_when=FIRST_COMPLETED)  # blocking

    # Submit the upload job to the thread pool
    future_: Future = self.pool.submit(self._upload, scan_id, fileset, data)
    self.jobs.add(future_)  # Track the job
    future_.add_done_callback(self.jobs.remove)  # Remove job when done

Scan

Scan(cnc, db_client, cameras, path, scan_id, config, parent=None)

Bases: QObject

Runs a scan: moves the scanner along a path, captures images from one or more cameras at each position, and uploads the results to a remote database.

This class integrates hardware (a CNC controller and cameras) with software components to automate the scanning workflow. It also tracks progress and manages scan metadata.

Attributes:

Name Type Description
progressChanged Signal(int)

Signal emitted when the scan progress changes.

maxProgressChanged Signal(int)

Signal emitted when the maximum progress value changes.

cnc AbstractCNC

The CNC controller for managing scanner movements.

db_url str

URL of the database to upload scan data.

db_client PlantDBClient

Client used for database operations.

uploader DataUploader

Uploads data items to the database.

cameras list of PiCameraComm

List of camera objects used for capturing images.

path Path

Path object representing the set of positions for the scan.

scan_id str

Unique identifier for the scan.

fileset str

Name of the fileset for storing images.

_progress int

Tracks the current scan progress.

_max_progress int

Maximum possible progress value, derived from the length of the scan path.

config dict

Configuration for the scan, including metadata and hardware settings.

dataset_metadata Any

Metadata related to the biological or scanned object.

hw_metadata Any

Metadata related to the hardware used.

_start_time int or None

Start time of the scan in UNIX timestamp format, or None if not started.

_stop_time int or None

Stop time of the scan in UNIX timestamp format, or None if not completed.

Notes
  • This class is designed for integration with QML and emits progress signals.
  • Attributes such as _progress and _max_progress track scanning operations.
  • The scan is configured through config, which must contain a "Metadata" entry with "object" and "hardware" keys, plus one entry per camera name (cameras without an entry are skipped during the scan).
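
The config fields that the source code on this page reads can be checked up front. The following is a minimal sketch: `check_scan_config` is a hypothetical helper, the metadata values are placeholders, and the offset keys (x, y, z, pan, tilt) are an assumption based on the Pose fields.

```python
from typing import Any


def check_scan_config(config: dict[str, Any], camera_names: list[str]) -> list[str]:
    """Return a list of problems; an empty list means the config looks usable."""
    problems = []
    metadata = config.get("Metadata")
    if not isinstance(metadata, dict):
        problems.append('missing "Metadata" section')
    else:
        for key in ("object", "hardware"):
            if key not in metadata:
                problems.append(f'missing Metadata["{key}"]')
    for name in camera_names:
        if name not in config:
            problems.append(f"camera {name!r} has no entry (it would be skipped)")
        elif "offset" not in config[name]:
            problems.append(f"camera {name!r} has no offset")
    return problems


# Placeholder values; only the overall shape matters here
config = {
    "Metadata": {"object": {"species": "example"}, "hardware": {"name": "example"}},
    "cam1": {"offset": {"x": 0, "y": 0, "z": 0, "pan": 0, "tilt": 0}},
}
ok_problems = check_scan_config(config, ["cam1"])
empty_problems = check_scan_config({}, ["cam1"])
```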
Source code in plantimager/controller/scanner/scan.py
def __init__(self, cnc: AbstractCNC, db_client: PlantDBClient, cameras: list[PiCameraComm], path: Path, scan_id: str,
             config: dict[str, Any], parent=None):
    super().__init__(parent)
    self.cnc = cnc
    self.db_client = db_client
    self.uploader = DataUploader(self.db_client, queue_size=DB_UPLOADER_QUEUE_SIZE)
    self.cameras = cameras
    self.path = path
    self.scan_id = scan_id
    self.fileset = "images"
    self._progress = 0
    self._max_progress = len(path)
    self.config = config
    # Store metadata for the scan
    self.dataset_metadata = config["Metadata"]["object"]  # Biological metadata
    self.hw_metadata = config["Metadata"]["hardware"]  # Hardware metadata

    # time
    self._start_time: int | None = None
    self._stop_time: int | None = None

get_position

get_position()

Get the current position of the scanner.

Returns:

Type Description
Pose

Current position as a 5D pose (x, y, z, pan, tilt)

Notes

The Z and tilt values are always 0 because the scanner has only three degrees of freedom (X, Y, and pan rotation). The third axis reported by the CNC is returned as pan.
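
The axis mapping can be illustrated with a stand-in Pose type. The dataclass below is an assumption made for this sketch (the real Pose class is defined elsewhere in plantimager), and `pose_from_cnc` is a hypothetical name.

```python
from dataclasses import dataclass


@dataclass
class Pose:
    """Stand-in for the real Pose class; the field names are assumed."""
    x: float = 0.0
    y: float = 0.0
    z: float = 0.0
    pan: float = 0.0
    tilt: float = 0.0


def pose_from_cnc(x: float, y: float, z: float) -> Pose:
    # The third CNC axis drives the pan rotation, so it maps to `pan`;
    # the pose's own z and tilt stay at 0.
    return Pose(x, y, 0, pan=z, tilt=0)


pose = pose_from_cnc(100.0, 50.0, 45.0)
```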

Source code in plantimager/controller/scanner/scan.py
def get_position(self) -> Pose:
    """Get the current position of the scanner.

    Returns
    -------
    Pose
        Current position as a 5D pose (x, y, z, pan, tilt)

    Notes
    -----
    The Z and tilt values are always set to 0 as the scanner only
    supports 3D movement (X, Y, and pan rotation).
    """
    # Get raw position from CNC controller
    x, y, z = self.cnc.get_position()
    # Convert to Pose object (z is pan, tilt is always 0)
    pose = Pose(x, y, 0, pan=z, tilt=0)
    return pose

get_target_pose

get_target_pose(x)

Calculate the target pose from a path element.

This method creates a target pose by combining the current position with the specified values from the path element. For any attribute not specified in the path element, the current position value is used.

Parameters:

Name Type Description Default
x PathElement

Path element containing the desired position attributes

required

Returns:

Type Description
Pose

The calculated target pose

Notes

If a coordinate in the path element is None, the current position value for that coordinate is used instead.
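
The fallback rule can be sketched with two stand-in dataclasses. Both `Pose` and `PathElement` below are assumptions made for illustration, and `merge_pose` is a hypothetical name. Because the check is `is None`, an explicit 0 in the path element is kept rather than replaced.

```python
from dataclasses import dataclass, fields
from typing import Optional


@dataclass
class Pose:
    """Stand-in for the real Pose class; the field names are assumed."""
    x: float = 0.0
    y: float = 0.0
    z: float = 0.0
    pan: float = 0.0
    tilt: float = 0.0


@dataclass
class PathElement:
    """Stand-in path element: None means 'keep the current value'."""
    x: Optional[float] = None
    y: Optional[float] = None
    z: Optional[float] = None
    pan: Optional[float] = None
    tilt: Optional[float] = None


def merge_pose(current: Pose, element: PathElement) -> Pose:
    values = {}
    for f in fields(Pose):
        requested = getattr(element, f.name)
        # Use the current value only when the element leaves the field unset
        values[f.name] = getattr(current, f.name) if requested is None else requested
    return Pose(**values)


current = Pose(x=10, y=20, z=0, pan=90, tilt=0)
target = merge_pose(current, PathElement(x=30, pan=0))
```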

Source code in plantimager/controller/scanner/scan.py
def get_target_pose(self, x: PathElement) -> Pose:
    """Calculate the target pose from a path element.

    This method creates a target pose by combining the current position
    with the specified values from the path element. For any attribute
    not specified in the path element, the current position value is used.

    Parameters
    ----------
    x : PathElement
        Path element containing the desired position attributes

    Returns
    -------
    Pose
        The calculated target pose

    Notes
    -----
    If a coordinate in the path element is None, the current position
    value for that coordinate is used instead.
    """
    # Get current position
    pos = self.get_position()
    # Create new pose
    target_pose = Pose()

    # For each attribute (x, y, z, pan, tilt)
    for attr in pos.attributes():
        if getattr(x, attr) is None:
            # If path element doesn't specify this attribute, use current position
            setattr(target_pose, attr, getattr(pos, attr))
        else:
            # Otherwise use the value from the path element
            setattr(target_pose, attr, getattr(x, attr))

    return target_pose

grab

grab(idx, metadata, camera)

Capture an image from a camera and upload it to the database.

This method captures an image from the specified camera, adds metadata, creates a DataItem, and uploads it to the database.

Parameters:

Name Type Description Default
idx int

Identifier for the data item to create

required
metadata dict

Dictionary of metadata to associate with the image

required
camera PiCameraComm

Camera object to use for capturing the image

required

Returns:

Type Description
DataItem

The created data item containing the image and metadata

Notes

The method performs these steps:

  1. Capture image from camera
  2. Update metadata with image information
  3. Create a DataItem
  4. Upload the data to the database
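
The four steps can be traced with stand-ins for the hardware and the database. In this sketch `FakeCamera`, the dictionary used in place of a DataItem, and the `upload` callable are all hypothetical; only the order of operations mirrors the description above.

```python
from concurrent.futures import Future


class FakeCamera:
    """Hypothetical camera whose getImage returns an already-completed Future."""

    def getImage(self, lores: bool = False) -> Future:
        fut: Future = Future()
        fut.set_result((b"\x00" * 12, {"format": "jpg", "width": 2, "height": 2}))
        return fut


def grab(idx: int, metadata: dict, camera: FakeCamera, upload) -> dict:
    # 1. Capture: wait for the camera's future to resolve
    buffer, info = camera.getImage(lores=False).result()
    # 2. Merge the image information into the metadata (mutates the caller's dict)
    metadata.update(info)
    # 3. Build the item (a plain dict stands in for DataItem)
    item = {"idx": idx, "buffer": buffer, "ext": info["format"], "metadata": metadata}
    # 4. Hand the item to the uploader
    upload(item)
    return item


uploaded = []
meta = {"camera_name": "cam1"}
item = grab(1, meta, FakeCamera(), uploaded.append)
```

Note that step 2 updates the metadata dictionary in place, so the caller's dictionary also gains the image information.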

Examples:

>>> metadata = {"camera_name": "cam1", "approximate_pose": [100, 100, 0, 45, 0]}
>>> data_item = scan.grab(1, metadata, camera)
Source code in plantimager/controller/scanner/scan.py
def grab(self, idx: int, metadata: dict, camera: PiCameraComm) -> DataItem:
    """Capture an image from a camera and upload it to the database.

    This method captures an image from the specified camera, adds metadata,
    creates a DataItem, and uploads it to the database.

    Parameters
    ----------
    idx : int
        Identifier for the data item to create
    metadata : dict
        Dictionary of metadata to associate with the image
    camera : PiCameraComm
        Camera object to use for capturing the image

    Returns
    -------
    DataItem
        The created data item containing the image and metadata

    Notes
    -----
    The method performs these steps:
    1. Capture image from camera
    2. Update metadata with image information
    3. Create a DataItem
    4. Upload the data to the database

    Examples
    --------
    >>> metadata = {"camera_name": "cam1", "approximate_pose": [100, 100, 0, 45, 0]}
    >>> data_item = scan.grab(1, metadata, camera)
    """
    # Capture image from camera
    image_future = camera.getImage(lores=False)
    buffer, buffer_info = image_future.result()  # Wait for image capture to complete

    # Update metadata with image information from camera
    metadata.update(buffer_info)

    # Create data item with image and metadata
    data = DataItem(idx, buffer, image_ext=buffer_info["format"], metadata=metadata)

    # Upload data to database
    self.uploader.upload(scan_id=self.scan_id, fileset=self.fileset, data=data)

    return data

max_progress

max_progress()

Get the maximum scan progress value.

Returns:

Type Description
int

Maximum progress value

Notes

This property is exposed to QML and notifies via maxProgressChanged signal.

Source code in plantimager/controller/scanner/scan.py
@Property(int, notify=maxProgressChanged)
def max_progress(self) -> int:
    """Get the maximum scan progress value.

    Returns
    -------
    int
        Maximum progress value

    Notes
    -----
    This property is exposed to QML and notifies via maxProgressChanged signal.
    """
    return self._max_progress

progress

progress()

Get the current scan progress.

Returns:

Type Description
int

Current progress value

Notes

This property is exposed to QML and notifies via progressChanged signal.

Source code in plantimager/controller/scanner/scan.py
@Property(int, notify=progressChanged)
def progress(self) -> int:
    """Get the current scan progress.

    Returns
    -------
    int
        Current progress value

    Notes
    -----
    This property is exposed to QML and notifies via progressChanged signal.
    """
    return self._progress

scan

scan()

Execute the complete scanning process.

This method performs a full scan by:

  1. Validating that all required components are available
  2. Creating the scan and fileset in the database
  3. Following the scan path and capturing images at each position
  4. Uploading all captured images with metadata

Raises:

Type Description
RuntimeError

If the config, path, database client, uploader, scan id, or camera list is missing or empty

Notes
  • Validates all prerequisites before starting
  • Creates scan and fileset in the database
  • Follows the scan path, moving the CNC to each position
  • Captures images from all cameras at each position
  • Uploads images with position and camera metadata
  • Uses a thread pool for parallel image capture
  • Updates progress throughout the scan
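
The per-position capture pattern (submit one job per camera, then wait for all of them before moving on) can be sketched as follows. The `capture` function, the positions, and the camera names are placeholders for illustration; no hardware or database is involved.

```python
from concurrent.futures import ALL_COMPLETED, ThreadPoolExecutor, wait


def capture(shot_id: int, camera_name: str) -> str:
    """Placeholder for an image capture; returns a label instead of an image."""
    return f"{camera_name}-{shot_id:03d}"


positions = [(0, 0), (10, 0), (10, 10)]
cameras = ["cam1", "cam2"]
captured = []

with ThreadPoolExecutor(max_workers=4) as executor:
    for shot_id, position in enumerate(positions):
        # (the arm would move to `position` here)
        jobs = [executor.submit(capture, shot_id, name) for name in cameras]
        # Do not move on until every camera has finished at this position
        wait(jobs, return_when=ALL_COMPLETED)
        # result() also re-raises any exception raised inside a capture job
        captured.extend(job.result() for job in jobs)
```

Calling `result()` on each job is a choice made in this sketch: waiting alone does not surface exceptions raised inside the jobs.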
Source code in plantimager/controller/scanner/scan.py
def scan(self) -> None:
    """Execute the complete scanning process.

    This method performs a full scan by:
    1. Validating that all required components are available
    2. Creating the scan and fileset in the database
    3. Following the scan path and capturing images at each position
    4. Uploading all captured images with metadata

    Raises
    ------
    RuntimeError
        If any required component is missing

    Notes
    -----
    - Validates all prerequisites before starting
    - Creates scan and fileset in the database
    - Follows the scan path, moving the CNC to each position
    - Captures images from all cameras at each position
    - Uploads images with position and camera metadata
    - Uses a thread pool for parallel image capture
    - Updates progress throughout the scan

    """
    # Validate all required components are available
    if not self.config: raise RuntimeError("Config not set for scan")
    if not self.path: raise RuntimeError("Path not set for scan")
    if not self.db_client: raise RuntimeError("DB client not set for scan")
    if not self.uploader: raise RuntimeError("Uploader not set for scan")
    if not self.scan_id: raise RuntimeError("Scan id not set for scan")
    if not self.cameras: raise RuntimeError("No Cameras connected")

    # Update metadata if using dummy CNC
    if isinstance(self.cnc, DummyCNC):
        self.hw_metadata["name"] = "DummyCNC"

    self._start_time = time.time()
    time_info = {
        "timezone": TZ,
        "start_time": datetime.fromtimestamp(self._start_time).astimezone().isoformat(timespec="seconds"),
        "stop_time": None
    }
    self.config.update(time_info)

    # Create the scan on the remote database
    try:
        # Register the scan, passing the full config (with time info) as its metadata
        self.db_client.create_scan(self.scan_id, metadata=self.config)
    except RequestException as e:
        logger.error(f"{e}")
    except ValueError as e:
        logger.error(f"{e}")
        logger.error(f"Scan {self.scan_id} already exists in plantdb")

    # Create the image fileset on the remote database
    try:
        self.db_client.create_fileset(self.fileset, self.scan_id)
    except RequestException as e:
        logger.error(f"{e}")
    except ValueError as e:
        logger.error(f"{e}")
        logger.error(f"Fileset {self.fileset} already exists for scan {self.scan_id}")

    # Execute the scan using a thread pool for parallel image capture
    with ThreadPoolExecutor(max_workers=4) as executor:
        shot_id = 0  # Initialize shot counter
        self._progress = 0  # Reset progress

        # Follow each point in the scan path
        for x in self.path:
            # Update progress
            self._progress += 1
            self.progressChanged.emit(self._progress)

            # Calculate and move to the target position
            pose = self.get_target_pose(x)
            self.set_position(pose)

            # Get actual arm position after movement
            arm_pose = self.get_position()
            jobs = []  # List to track image capture jobs

            # Capture images from each camera
            for camera in self.cameras:
                name: str = camera.name
                # Skip cameras not in config
                if name not in self.config:
                    logger.debug(f"Camera {name} not in config, skipping")
                    continue

                # Get camera parameters and calculate camera position
                camera_param = self.config[name]
                camera_offset = Pose(**camera_param["offset"])
                camera_pose = arm_pose + camera_offset  # Apply offset to arm position

                # Prepare metadata for this shot
                metadata = {
                    **camera_param,  # Include all camera parameters
                    "camera_name": name,
                    "approximate_pose": [camera_pose.x, camera_pose.y, camera_pose.z, camera_pose.pan,
                                         camera_pose.tilt],  # Camera position
                    "shot_id": shot_id,  # Unique ID for this shot
                }

                # Submit image capture job to thread pool
                jobs.append(executor.submit(self.grab, shot_id, metadata, camera))

            shot_id += 1
            # Wait for all image captures to complete before moving to next position
            cf_wait(jobs, return_when=ALL_COMPLETED)

    # Move the arm back close to origin
    #self.cnc.moveto(10, 10,-10)
    time.sleep(1)
    #self.cnc.home()
    self.cnc.moveto(20, 20, 45)

    self._stop_time = time.time()
    self.db_client.update_scan_metadata(self.scan_id, {
        "stop_time": datetime.fromtimestamp(self._stop_time).astimezone().isoformat(timespec="seconds"),
    })
    logger.info("Scan completed")  # Log completion

set_position

set_position(pose)

Set the position of the scanner from a 5D Pose.

Parameters:

Name Type Description Default
pose Pose

Target position as a 5D pose (x, y, z, pan, tilt)

required
Notes

Only X, Y, and pan values are used; Z and tilt are ignored.

Examples:

>>> pose = Pose(100, 100, 0, pan=45, tilt=0)
>>> scan.set_position(pose)
Source code in plantimager/controller/scanner/scan.py
def set_position(self, pose: Pose) -> None:
    """Set the position of the scanner from a 5D Pose.

    Parameters
    ----------
    pose : Pose
        Target position as a 5D pose (x, y, z, pan, tilt)

    Notes
    -----
    Only X, Y, and pan values are used; Z and tilt are ignored.

    Examples
    --------
    >>> pose = Pose(100, 100, 0, pan=45, tilt=0)
    >>> scan.set_position(pose)
    """
    logger.info(f"Moving arm to {pose}")
    # Move CNC to the specified position (only x, y, and pan are used)
    self.cnc.moveto(pose.x, pose.y, pose.pan)
    time.sleep(0.1)  # Wait for movement to complete as grbl returns a bit early