sync

plantdb.client.sync

Synchronization Utility for PlantDB Databases

This module synchronizes File System Databases (FSDB) in the PlantDB project, transferring data between local and remote database instances.

Key Features

  • Support for local and remote database synchronization
  • Selective file transfer based on modification time and size checks
  • Automatic locking and unlocking of source and target databases
  • Recursive directory synchronization
  • SSH-based remote file transfer using SFTP
  • HTTP(S) REST API synchronization with archive transfer
  • Support for FSDB instances, local paths, and various remote protocols
  • Error handling and validation of database paths
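The modification time and size checks listed above can be pictured with a small sketch. This is not the module's implementation: `needs_transfer` and `sync_tree` are hypothetical helpers, and the rule used here (copy when the target is missing, differs in size, or is older than the source) is an assumption about what such a check might look like.

```python
import shutil
import tempfile
from pathlib import Path


def needs_transfer(src: Path, dst: Path) -> bool:
    """Hypothetical check: True if `dst` is missing, differs in size, or is older."""
    if not dst.exists():
        return True
    src_stat, dst_stat = src.stat(), dst.stat()
    if src_stat.st_size != dst_stat.st_size:
        return True
    return src_stat.st_mtime > dst_stat.st_mtime


def sync_tree(src_root: Path, dst_root: Path) -> list:
    """Recursively copy files that need a transfer; return their relative paths."""
    copied = []
    for src in sorted(src_root.rglob("*")):
        if not src.is_file():
            continue
        rel = src.relative_to(src_root)
        dst = dst_root / rel
        if needs_transfer(src, dst):
            dst.parent.mkdir(parents=True, exist_ok=True)
            shutil.copy2(src, dst)  # copy2 also copies the modification time
            copied.append(rel.as_posix())
    return copied
```

Because `shutil.copy2` preserves the modification time, a second call on an unchanged tree copies nothing.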

Usage Examples

Local Path to Local Path

Create two test databases, a source containing one dataset and an empty target, then sync them.

>>> from plantdb.client.sync import FSDBSync
>>> from plantdb.commons.test_database import test_database
>>> # Create a test source database
>>> db_source = test_database()
>>> db_source.connect()
>>> print(db_source.list_scans())  # list scans in the source database
['real_plant_analyzed']
>>> db_source._unlock_db()  # Unlock the database (remove lock file)
>>> # Create a test target database
>>> db_target = test_database(dataset=None)
>>> db_target.connect()
>>> print(db_target.list_scans())  # verify that target database is empty
[]
>>> db_target._unlock_db()  # Unlock the database (remove lock file)
>>> # Sync target database with source
>>> db_sync = FSDBSync(db_source.path(), db_target.path())
>>> db_sync.sync()
>>> # List directories in the target database
>>> print([i for i in db_target.path().iterdir()])
>>> # Reload target database to ensure that the new scans are available
>>> db_target.reload()
>>> print(db_target.list_scans())  # verify that target database contains 1 new scan
['real_plant_analyzed']
>>> db_source.disconnect()  # Remove the test database
>>> db_target.disconnect()  # Remove the test database

FSDB Instance to Local Path

>>> from plantdb.client.sync import FSDBSync
>>> from plantdb.commons.test_database import test_database
>>> # Create a test source database
>>> db_source = test_database()
>>> # Create a test target database
>>> db_target = test_database(dataset=None)
>>> # FSDB instance to local path
>>> db_sync = FSDBSync(db_source, db_target.path())
>>> db_sync.sync()
>>> # Connect to the target and list scans
>>> db_target.connect()
>>> print(db_target.list_scans())  # verify that target now has the 'real_plant_analyzed' dataset
['real_plant_analyzed']
>>> db_target.disconnect()
>>> db_source.disconnect()

Local Path to HTTP REST API

>>> from plantdb.client.sync import FSDBSync
>>> from plantdb.server.test_rest_api import TestRestApiServer
>>> from plantdb.client.rest_api import list_scan_names
>>> from plantdb.commons.test_database import test_database
>>> # Create a test source database with all 5 test datasets
>>> db_source = test_database("all")
>>> # Create a test target database
>>> db_target = TestRestApiServer(test=True, empty=True)
>>> db_target.start()
Test REST API server started at http://127.0.0.1:5000
>>> server_cfg = db_target.get_server_config()
>>> # Use REST API to list scans and verify target DB is empty
>>> scans_list = list_scan_names(**server_cfg)
>>> print(scans_list)
[]
>>> # Asynchronously sync the target database with the source
>>> db_sync = FSDBSync(db_source.path(), db_target.get_base_url())
>>> thread = db_sync.sync(thread=True)  # Returns immediately
>>> print(f"Database synchronization in progress: {db_sync.is_synchronizing()}")
>>> # Monitor progress
>>> import time
>>> while db_sync.is_synchronizing():
...     progress = db_sync.get_sync_progress()
...     print(f"Progress: {progress:.1f}%")
...     time.sleep(0.3)
>>> # Wait for completion
>>> db_sync.wait_for_sync()
>>> # Check for errors
>>> if db_sync.get_sync_error():
...     print(f"Sync failed: {db_sync.get_sync_error()}")
... else:
...     print("Sync completed successfully")
>>> # Use REST API endpoint to refresh scans
>>> from plantdb.client.rest_api import refresh
>>> refresh(**server_cfg)
>>> # Use REST API to list scans and verify target DB contains the new scans
>>> scans_list = list_scan_names(**server_cfg)
>>> print(scans_list)
['arabidopsis000', 'real_plant', 'real_plant_analyzed', 'virtual_plant', 'virtual_plant_analyzed']
>>> db_target.stop()

SSH to Local Path with Credentials

>>> db_sync = FSDBSync("ssh://server.example.com:/data/sourcedb", "/local/target/db")
>>> db_sync.set_ssh_credentials("server.example.com", "username", "password")
>>> db_sync.sync()

FSDBSync

FSDBSync(source, target)

Class for synchronizing two FSDB databases, with support for multiple protocols.

It supports synchronization between different types of database sources and targets:

  • FSDB instances or local paths: uses file system operations
  • HTTP(S) URLs: uses the REST API with archive transfer
  • SSH URLs: uses the SFTP protocol

It checks for the validity of both source and target and locks the databases during sync.
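To make the three kinds of specification concrete, here is a minimal sketch of how a string or path could be classified. The helper `classify_spec` is hypothetical and is not the library's `_parse_database_spec`, whose code is not shown on this page. Only the `"type"` key and its `"ssh"` value appear in the source; the other keys and values are assumptions, and FSDB instances are left out for brevity.

```python
from pathlib import Path


def classify_spec(spec) -> dict:
    """Hypothetical classifier for str and Path specifications only."""
    if isinstance(spec, Path):
        return {"type": "local", "path": spec}
    text = str(spec)
    if text.startswith(("http://", "https://")):
        return {"type": "http", "url": text}
    if text.startswith("ssh://"):
        # Split "host:/path" at the first ":" that follows the scheme
        host, _, path = text[len("ssh://"):].partition(":")
        return {"type": "ssh", "host": host, "path": path}
    # Anything else is treated as a local file system path
    return {"type": "local", "path": Path(text)}
```

With this sketch, `classify_spec("ssh://server.example.com:/data/sourcedb")` yields the host `server.example.com` and the path `/data/sourcedb`.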

Attributes:

source_str : str or FSDB or Path
    Source database specification
target_str : str or FSDB or Path
    Target database specification
source : dict
    Source database description
target : dict
    Target database description
ssh_clients : dict
    Dictionary of SSH clients, keyed by host name.
ssh_credentials : dict
    Dictionary of SSH credentials, keyed by host name.
synchronizing : bool
    Flag indicating whether a sync operation is currently in progress.
sync_progress : float
    Progress of the sync operation as a percentage.

Examples:

>>> from plantdb.client.sync import FSDBSync
>>> from plantdb.commons.test_database import test_database
>>> # Example: Create two test databases, a source with a dataset and a target without dataset, then sync them.
>>> # Create a test source database
>>> db_source = test_database()
>>> db_source.connect()
>>> print(db_source.list_scans())  # list scans in the source database
['real_plant_analyzed']
>>> db_source._unlock_db()  # Unlock the database (remove lock file)
>>> # Create a test target database
>>> db_target = test_database(dataset=None)
>>> db_target.connect()
>>> print(db_target.list_scans())  # verify that target database is empty
[]
>>> db_target._unlock_db()  # Unlock the database (remove lock file)
>>> # Sync target database with source
>>> db_sync = FSDBSync(db_source.path(), db_target.path())
>>> db_sync.sync()
>>> # List directories in the target database
>>> print([i for i in db_target.path().iterdir()])
>>> # Reload target database to ensure that the new scans are available
>>> db_target.reload()
>>> print(db_target.list_scans())  # verify that target database contains 1 new scan
['real_plant_analyzed']
>>> db_source.disconnect()  # Remove the test database
>>> db_target.disconnect()  # Remove the test database

Class constructor.

Parameters:

source : str or Path or FSDB, required
    Source database specification:
      • FSDB instance or local path for file system databases
      • HTTP(S) URL for REST API access
      • SSH URL (ssh://server.example.com:/path/to/db) for SFTP access
target : str or Path or FSDB, required
    Target database specification (same format as source)

Source code in plantdb/client/sync.py
def __init__(self, source, target):
    """Class constructor.

    Parameters
    ----------
    source : str or pathlib.Path or FSDB
        Source database specification:
        - FSDB instance or local path for file system databases
        - HTTP(S) URL for REST API access
        - SSH URL (ssh://server.example.com:/path/to/db) for SFTP access
    target : str or pathlib.Path or FSDB
        Target database specification (same format as source)
    """
    self.source_str = source
    self.target_str = target
    self.source = _parse_database_spec(source)
    self.target = _parse_database_spec(target)
    self.ssh_clients = {}  # Store SSH connections
    self.ssh_credentials = {}  # Store SSH credentials
    self.sync_progress = 0.
    self._sync_thread = None
    self._sync_error = None

__del__

__del__()

Ensure unlocking on object destruction.

Source code in plantdb/client/sync.py
def __del__(self):
    """Ensure unlocking on object destruction."""
    try:
        self.unlock()
    except Exception:
        # Unlocking is best-effort here; never raise from a finalizer
        return

get_sync_error

get_sync_error()

Get any error that occurred during threaded synchronization.

Returns:

Exception or None
    The exception that occurred during sync, or None if no error.

Source code in plantdb/client/sync.py
def get_sync_error(self):
    """Get any error that occurred during threaded synchronization.

    Returns
    -------
    Exception or None
        The exception that occurred during sync, or `None` if no error.
    """
    return self._sync_error

get_sync_progress

get_sync_progress()

Get the current synchronization progress.

Returns:

float
    Progress as a percentage (0.0 to 100.0).

Source code in plantdb/client/sync.py
def get_sync_progress(self):
    """Get the current synchronization progress.

    Returns
    -------
    float
        Progress as a percentage (0.0 to 100.0).
    """
    return self.sync_progress

is_synchronizing

is_synchronizing()

Check if synchronization is currently in progress.

Returns:

bool
    True if synchronization is in progress, False otherwise.

Source code in plantdb/client/sync.py
def is_synchronizing(self):
    """Check if synchronization is currently in progress.

    Returns
    -------
    bool
        `True` if synchronization is in progress, `False` otherwise.
    """
    return self._sync_thread is not None

lock

lock()

Lock both source and target databases prior to sync.

Source code in plantdb/client/sync.py
def lock(self):
    """Lock both source and target databases prior to sync."""
    for db in [self.source, self.target]:
        if db["type"] == "ssh":
            self._lock_remote(db)

set_ssh_credentials

set_ssh_credentials(host, username, password=None)

Set SSH credentials for a specific host.

Parameters:

host : str, required
    The hostname for SSH connection
username : str, required
    SSH username
password : str, default None
    SSH password. If not provided, will be prompted during connection.

Source code in plantdb/client/sync.py
def set_ssh_credentials(self, host, username, password=None):
    """Set SSH credentials for a specific host.

    Parameters
    ----------
    host : str
        The hostname for SSH connection
    username : str
        SSH username
    password : str, optional
        SSH password. If not provided, will be prompted during connection.
    """
    self.ssh_credentials[host] = {
        'username': username,
        'password': password
    }
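The docstring says a missing password is prompted for at connection time. The connection code is not shown on this page, so the following is only a sketch of how such a fallback could work with the standard `getpass` module. `resolve_password` and its injectable `prompt` argument are hypothetical names introduced for illustration.

```python
import getpass


def resolve_password(credentials: dict, host: str, prompt=getpass.getpass) -> str:
    """Hypothetical helper: return the stored password, or ask for one."""
    entry = credentials.get(host, {})
    password = entry.get("password")
    if password is None:
        # No stored password: fall back to an interactive prompt
        password = prompt(f"Password for {entry.get('username', '')}@{host}: ")
    return password
```

Passing the prompt function as an argument keeps the helper usable in non-interactive contexts such as tests.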

sync

sync(source_scans=None, thread=False)

Sync the two DBs using the appropriate strategy based on database types.

Parameters:

source_scans : list of str, default None
    List of scan IDs to synchronize. If None, synchronizes all scans.
thread : bool, default False
    If True, run synchronization in a separate thread. Default is False. When running in a thread, use is_synchronizing() to check status and get_sync_progress() to track progress.

Returns:

Thread or None
    If thread=True, returns the Thread object. Otherwise, returns None.

Source code in plantdb/client/sync.py
def sync(self, source_scans=None, thread=False):
    """Sync the two DBs using the appropriate strategy based on database types.

    Parameters
    ----------
    source_scans : list of str, optional
        List of scan IDs to synchronize. If `None`, synchronizes all scans.
    thread : bool, optional
        If `True`, run synchronization in a separate thread. Default is `False`.
        When running in a thread, use `is_synchronizing()` to check status
        and `get_sync_progress()` to track progress.

    Returns
    -------
    threading.Thread or None
        If `thread=True`, returns the `Thread` object. Otherwise, returns `None`.
    """
    if thread:
        if self.is_synchronizing():
            raise RuntimeError("Synchronization is already in progress")

        self._sync_thread = threading.Thread(target=self._sync_worker, args=(source_scans,))
        self._sync_thread.daemon = True
        self._sync_thread.start()
        return self._sync_thread
    else:
        self._sync_worker(source_scans)

unlock

unlock()

Unlock both source and target databases after sync.

Source code in plantdb/client/sync.py
def unlock(self):
    """Unlock both source and target databases after sync."""
    for db in [self.source, self.target]:
        if db["type"] == "ssh":
            self._unlock_remote(db)
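The usage examples mention that unlocking a database removes a lock file. A lock and unlock pair of this kind is often wrapped in a context manager so that the lock is released even when the work in between fails. The sketch below shows that idea for a local directory only. The `locked` helper and the lock file name `lock` are hypothetical and are not taken from the library.

```python
import tempfile
from contextlib import contextmanager
from pathlib import Path


@contextmanager
def locked(db_path: Path, lock_name: str = "lock"):
    """Hypothetical lock: create a lock file, remove it on exit."""
    lock_file = db_path / lock_name
    # Mode "x" raises FileExistsError if the lock file is already present
    with open(lock_file, "x"):
        pass
    try:
        yield lock_file
    finally:
        lock_file.unlink()  # always release the lock, even after an error
```

A second attempt to lock the same directory fails with `FileExistsError` until the first lock is released.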

wait_for_sync

wait_for_sync(timeout=None)

Wait for the synchronization thread to complete.

Parameters:

timeout : float, default None
    Maximum time to wait in seconds. If None, wait indefinitely.

Returns:

bool
    True if the thread is completed, False if timeout was reached.

Source code in plantdb/client/sync.py
def wait_for_sync(self, timeout=None):
    """Wait for the synchronization thread to complete.

    Parameters
    ----------
    timeout : float, optional
        Maximum time to wait in seconds. If `None`, wait indefinitely.

    Returns
    -------
    bool
        `True` if the thread is completed, `False` if timeout was reached.
    """
    if self._sync_thread is not None:
        self._sync_thread.join(timeout)
        return not self._sync_thread.is_alive()
    return True
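`threading.Thread.join()` always returns `None`, which is why the method checks `is_alive()` afterwards to tell a timeout from a normal completion. The standalone sketch below reproduces that logic with a plain thread. `wait_for` is a hypothetical helper, and the worker simply blocks on an event.

```python
import threading


def wait_for(thread, timeout=None) -> bool:
    """Join with an optional timeout; True only if the thread has finished."""
    thread.join(timeout)
    return not thread.is_alive()


done = threading.Event()
worker = threading.Thread(target=done.wait, daemon=True)
worker.start()

first = wait_for(worker, timeout=0.05)  # worker is still blocked on the event
done.set()  # let the worker finish
second = wait_for(worker)  # no timeout: returns once the worker has ended
```

Here `first` is `False` because the timeout elapsed while the worker was still waiting, and `second` is `True`.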

config_from_url

config_from_url(url)

Parse URL into a configuration dictionary.

Examples:

>>> from plantdb.client.sync import config_from_url
>>> config = config_from_url("http://localhost:5014/api/")
>>> print(config)
{'protocol': 'http', 'host': 'localhost', 'port': 5014, 'prefix': '/api/', 'ssl': False}
Source code in plantdb/client/sync.py
def config_from_url(url):
    """Parse URL into a configuration dictionary.

    Examples
    --------
    >>> from plantdb.client.sync import config_from_url
    >>> config = config_from_url("http://localhost:5014/api/")
    >>> print(config)
    {'protocol': 'http', 'host': 'localhost', 'port': 5014, 'prefix': '/api/', 'ssl': False}
    """
    parsed_url = urllib3.util.parse_url(url)
    config = {}
    config["protocol"] = parsed_url.scheme.lower()
    config["host"] = parsed_url.host
    config["port"] = parsed_url.port
    config["prefix"] = parsed_url.path
    # Derive the flag from the parsed scheme, not from a substring match on the URL
    config["ssl"] = config["protocol"] == "https"
    return config
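For readers without `urllib3`, a comparable dictionary can be built with the standard library. The sketch below swaps in `urllib.parse.urlsplit` for `urllib3.util.parse_url`. `config_from_url_stdlib` is a hypothetical name, and the two parsers may differ on edge cases such as URLs without a path.

```python
from urllib.parse import urlsplit


def config_from_url_stdlib(url: str) -> dict:
    """Hypothetical stdlib variant of the URL-to-config parsing."""
    parts = urlsplit(url)
    scheme = (parts.scheme or "").lower()
    return {
        "protocol": scheme,
        "host": parts.hostname,
        "port": parts.port,  # None when the URL has no explicit port
        "prefix": parts.path,
        "ssl": scheme == "https",  # decided by the scheme alone
    }
```

Note that the port is not inferred from the scheme: an `https://` URL without an explicit port yields `None` for `"port"`.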