
auth

plantdb.commons.fsdb.auth

Group dataclass

Group(name, users, created_at, created_by, description=None)

Represents a group of users for sharing scan datasets.

Groups allow multiple users to collaborate on scan datasets. When a scan is shared with a group, every member of that group gets the CONTRIBUTOR role for that specific dataset.

Attributes:

Name Type Description
name str

The unique name of the group.

users Set[str]

A set of usernames that belong to this group.

description Optional[str]

An optional description of the group's purpose.

created_at datetime

The timestamp when the group was created.

created_by str

The username of the user who created the group.

Examples:

>>> from datetime import datetime, timezone
>>> from plantdb.commons.fsdb.auth import Group
>>> group = Group(
...     name="plant_researchers",
...     users={"alice", "bob"},
...     description="Plant research team",
...     created_at=datetime.now(timezone.utc),
...     created_by="alice"
... )
>>> print(group.name)
plant_researchers
>>> "alice" in group.users
True

add_user

add_user(username)

Add a user to the group.

Parameters:

Name Type Description Default
username str

The username to add to the group.

required

Returns:

Type Description
bool

True if the user was added (wasn't already in the group), False otherwise.

Source code in plantdb/commons/fsdb/auth.py
def add_user(self, username: str) -> bool:
    """
    Add a user to the group.

    Parameters
    ----------
    username : str
        The username to add to the group.

    Returns
    -------
    bool
        True if the user was added (wasn't already in the group), False otherwise.
    """
    if username in self.users:
        return False
    self.users.add(username)
    return True

has_user

has_user(username)

Check if a user is a member of the group.

Parameters:

Name Type Description Default
username str

The username to check.

required

Returns:

Type Description
bool

True if the user is a member of the group, False otherwise.

Source code in plantdb/commons/fsdb/auth.py
def has_user(self, username: str) -> bool:
    """
    Check if a user is a member of the group.

    Parameters
    ----------
    username : str
        The username to check.

    Returns
    -------
    bool
        True if the user is a member of the group, False otherwise.
    """
    return username in self.users

remove_user

remove_user(username)

Remove a user from the group.

Parameters:

Name Type Description Default
username str

The username to remove from the group.

required

Returns:

Type Description
bool

True if the user was removed (was in the group), False otherwise.

Source code in plantdb/commons/fsdb/auth.py
def remove_user(self, username: str) -> bool:
    """
    Remove a user from the group.

    Parameters
    ----------
    username : str
        The username to remove from the group.

    Returns
    -------
    bool
        True if the user was removed (was in the group), False otherwise.
    """
    if username not in self.users:
        return False
    self.users.remove(username)
    return True
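The three membership methods share one convention: the boolean return value reports whether the call changed anything. The sketch below reproduces that logic in a stand-in dataclass so it runs without plantdb installed; `GroupSketch` and its field defaults are assumptions made for illustration, not the library's definition.

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Optional, Set


@dataclass
class GroupSketch:
    """Hypothetical stand-in for Group, reduced to its membership logic."""
    name: str
    users: Set[str] = field(default_factory=set)
    created_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
    created_by: str = ""
    description: Optional[str] = None

    def add_user(self, username: str) -> bool:
        # False means "nothing changed": the user was already a member.
        if username in self.users:
            return False
        self.users.add(username)
        return True

    def remove_user(self, username: str) -> bool:
        # False means "nothing changed": the user was not a member.
        if username not in self.users:
            return False
        self.users.remove(username)
        return True

    def has_user(self, username: str) -> bool:
        return username in self.users


group = GroupSketch(name="plant_researchers", users={"alice"}, created_by="alice")
added_first = group.add_user("bob")       # bob is new, so the set changes
added_again = group.add_user("bob")       # second call is a no-op
removed = group.remove_user("bob")        # bob is removed
removed_again = group.remove_user("bob")  # nothing left to remove
```

Callers can use the return value to decide whether follow-up work, such as saving to disk, is needed.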

GroupManager

GroupManager(groups_file='groups.json')

Manages groups for the RBAC system.

This class handles the creation, modification, and persistence of user groups. Groups are stored in a JSON file and loaded/saved as needed.

Attributes:

Name Type Description
groups_file Path

Path to the JSON file where groups are stored.

groups Dict[str, Group]

Dictionary mapping group names to Group objects.

Initialize the GroupManager.

Parameters:

Name Type Description Default
groups_file str

Path to the JSON file for storing groups. Defaults to "groups.json".

'groups.json'
Source code in plantdb/commons/fsdb/auth.py
def __init__(self, groups_file: str = "groups.json"):
    """
    Initialize the GroupManager.

    Parameters
    ----------
    groups_file : str, optional
        Path to the JSON file for storing groups. Defaults to "groups.json".
    """
    self.groups_file = Path(groups_file)
    self.groups: Dict[str, Group] = {}
    self.logger = get_logger(__class__.__name__)
    self._load_groups()
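The on-disk layout of the groups file is not shown in this section. As a rough illustration of what JSON persistence for a group involves, the sketch below converts the two field types that JSON cannot store directly (a set and a datetime) and restores them on load. The record layout and the helper names `group_to_record` and `record_to_group` are assumptions, not the format plantdb actually writes.

```python
import json
from datetime import datetime, timezone


def group_to_record(name, users, created_at, created_by, description=None):
    """Turn group fields into JSON-friendly values (hypothetical layout)."""
    return {
        "name": name,
        "users": sorted(users),                # sets are not JSON serializable
        "created_at": created_at.isoformat(),  # datetimes are stored as ISO 8601 text
        "created_by": created_by,
        "description": description,
    }


def record_to_group(record):
    """Reverse of group_to_record: rebuild the set and the datetime."""
    restored = dict(record)
    restored["users"] = set(record["users"])
    restored["created_at"] = datetime.fromisoformat(record["created_at"])
    return restored


created = datetime(2024, 1, 1, tzinfo=timezone.utc)
record = group_to_record("plant_researchers", {"alice", "bob"}, created, "alice")
text = json.dumps({"plant_researchers": record}, indent=2)
restored = record_to_group(json.loads(text)["plant_researchers"])
```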

add_user_to_group

add_user_to_group(group_name, username)

Add a user to a group.

Parameters:

Name Type Description Default
group_name str

The name of the group.

required
username str

The username to add to the group.

required

Returns:

Type Description
bool

True if the user was added, False if the group doesn't exist or user was already in group.

Source code in plantdb/commons/fsdb/auth.py
def add_user_to_group(self, group_name: str, username: str) -> bool:
    """
    Add a user to a group.

    Parameters
    ----------
    group_name : str
        The name of the group.
    username : str
        The username to add to the group.

    Returns
    -------
    bool
        True if the user was added, False if the group doesn't exist or user was already in group.
    """
    group = self.get_group(group_name)
    if not group:
        return False

    result = group.add_user(username)
    if result:
        self._save_groups()
    return result

create_group

create_group(name, creator, users=None, description=None)

Create a new group.

Parameters:

Name Type Description Default
name str

The unique name for the group.

required
creator str

The username of the user creating the group.

required
users Optional[Set[str]]

Initial set of users to add to the group. Creator is automatically added.

None
description Optional[str]

Optional description of the group.

None

Returns:

Type Description
Group

The created group object.

Raises:

Type Description
ValueError

If a group with the same name already exists.

Source code in plantdb/commons/fsdb/auth.py
def create_group(self, name: str, creator: str, users: Optional[Set[str]] = None,
                 description: Optional[str] = None) -> Group:
    """
    Create a new group.

    Parameters
    ----------
    name : str
        The unique name for the group.
    creator : str
        The username of the user creating the group.
    users : Optional[Set[str]], optional
        Initial set of users to add to the group. Creator is automatically added.
    description : Optional[str], optional
        Optional description of the group.

    Returns
    -------
    Group
        The created group object.

    Raises
    ------
    ValueError
        If a group with the same name already exists.
    """
    if name in self.groups:
        raise ValueError(f"Group '{name}' already exists")

    # Initialize users set and ensure creator is included
    if users is None:
        users = set()
    users.add(creator)

    group = Group(
        name=name,
        users=users,
        description=description,
        created_at=datetime.now(),
        created_by=creator
    )

    self.groups[name] = group
    self._save_groups()
    return group
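Two behaviours of create_group are worth seeing in isolation: the creator always ends up in the member set, and a duplicate name raises ValueError. The sketch below uses a hypothetical in-memory stand-in (`InMemoryGroups`) that skips JSON persistence. Unlike the listing above, which adds the creator to the set it receives, this variant copies the input so the caller's set is left unchanged; that copy is a design choice of the sketch, not of the library.

```python
from typing import Dict, Optional, Set


class InMemoryGroups:
    """Hypothetical stand-in for GroupManager without file persistence."""

    def __init__(self) -> None:
        self.groups: Dict[str, Set[str]] = {}

    def create_group(self, name: str, creator: str,
                     users: Optional[Set[str]] = None) -> Set[str]:
        if name in self.groups:
            raise ValueError(f"Group '{name}' already exists")
        # Copy the input so the caller's set is not modified, then add the creator.
        members = set(users or ())
        members.add(creator)
        self.groups[name] = members
        return members


manager = InMemoryGroups()
initial = {"bob"}
members = manager.create_group("plant_researchers", creator="alice", users=initial)

try:
    manager.create_group("plant_researchers", creator="carol")
    duplicate_rejected = False
except ValueError:
    duplicate_rejected = True
```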

delete_group

delete_group(name)

Delete a group.

Parameters:

Name Type Description Default
name str

The name of the group to delete.

required

Returns:

Type Description
bool

True if the group was deleted, False if it didn't exist.

Source code in plantdb/commons/fsdb/auth.py
def delete_group(self, name: str) -> bool:
    """
    Delete a group.

    Parameters
    ----------
    name : str
        The name of the group to delete.

    Returns
    -------
    bool
        True if the group was deleted, False if it didn't exist.
    """
    if name not in self.groups:
        return False

    del self.groups[name]
    self._save_groups()
    return True

get_group

get_group(name)

Get a group by name.

Parameters:

Name Type Description Default
name str

The name of the group to retrieve.

required

Returns:

Type Description
Optional[Group]

The group object if it exists, None otherwise.

Source code in plantdb/commons/fsdb/auth.py
def get_group(self, name: str) -> Optional[Group]:
    """
    Get a group by name.

    Parameters
    ----------
    name : str
        The name of the group to retrieve.

    Returns
    -------
    Optional[Group]
        The group object if it exists, None otherwise.
    """
    return self.groups.get(name)

get_user_groups

get_user_groups(username)

Get all groups that a user belongs to.

Parameters:

Name Type Description Default
username str

The username to search for.

required

Returns:

Type Description
List[Group]

A list of Group objects that the user belongs to.

Source code in plantdb/commons/fsdb/auth.py
def get_user_groups(self, username: str) -> List[Group]:
    """
    Get all groups that a user belongs to.

    Parameters
    ----------
    username : str
        The username to search for.

    Returns
    -------
    List[Group]
        A list of Group objects that the user belongs to.
    """
    user_groups = []
    for group in self.groups.values():
        if group.has_user(username):
            user_groups.append(group)
    return user_groups
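The lookup is a linear scan over every group. A minimal sketch of the same idea on plain data follows; the `groups` dictionary and the `groups_of` helper are illustrative names, and the result is sorted only to make the output deterministic.

```python
from typing import Dict, List, Set

# Illustrative data: group name -> member usernames.
groups: Dict[str, Set[str]] = {
    "plant_researchers": {"alice", "bob"},
    "imaging": {"bob"},
    "admins": {"carol"},
}


def groups_of(username: str) -> List[str]:
    """Return the names of all groups that contain the user."""
    return sorted(name for name, members in groups.items() if username in members)


bob_groups = groups_of("bob")
dave_groups = groups_of("dave")  # unknown users simply belong to no group
```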

group_exists

group_exists(name)

Check if a group exists.

Parameters:

Name Type Description Default
name str

The name of the group to check.

required

Returns:

Type Description
bool

True if the group exists, False otherwise.

Source code in plantdb/commons/fsdb/auth.py
def group_exists(self, name: str) -> bool:
    """
    Check if a group exists.

    Parameters
    ----------
    name : str
        The name of the group to check.

    Returns
    -------
    bool
        True if the group exists, False otherwise.
    """
    return name in self.groups

list_groups

list_groups()

Get a list of all groups.

Returns:

Type Description
List[Group]

A list of all Group objects.

Source code in plantdb/commons/fsdb/auth.py
def list_groups(self) -> List[Group]:
    """
    Get a list of all groups.

    Returns
    -------
    List[Group]
        A list of all Group objects.
    """
    return list(self.groups.values())

remove_user_from_group

remove_user_from_group(group_name, username)

Remove a user from a group.

Parameters:

Name Type Description Default
group_name str

The name of the group.

required
username str

The username to remove from the group.

required

Returns:

Type Description
bool

True if the user was removed, False if the group doesn't exist or user wasn't in group.

Source code in plantdb/commons/fsdb/auth.py
def remove_user_from_group(self, group_name: str, username: str) -> bool:
    """
    Remove a user from a group.

    Parameters
    ----------
    group_name : str
        The name of the group.
    username : str
        The username to remove from the group.

    Returns
    -------
    bool
        True if the user was removed, False if the group doesn't exist or user wasn't in group.
    """
    group = self.get_group(group_name)
    if not group:
        return False

    result = group.remove_user(username)
    if result:
        self._save_groups()
    return result

JWTSessionManager

JWTSessionManager(session_timeout=3600, max_concurrent_sessions=10, secret_key=None)

Bases: SessionManager

Manages user sessions with expiration and validation.

This class provides methods to create, validate, invalidate, and cleanup expired sessions. Each session is associated with a unique identifier (session_id) and has an expiry time based on the session timeout duration specified during initialization.

Attributes:

Name Type Description
sessions Dict[str, dict]

A dictionary storing active sessions. Each key is a session ID, and each value is a dictionary containing:
- 'username': str - The user associated with this session.
- 'created_at': datetime - When the session was created.
- 'last_accessed': datetime - Last time the session was accessed.
- 'expires_at': datetime - Expiry time of the session.

session_timeout int

Duration in seconds after which a session expires.

max_concurrent_sessions int

The maximum number of concurrent sessions to allow.

secret_key str

The session manager secret key to use for authentication.

logger Logger

The logger to use for this session manager.

Manage user sessions with timeout.

Parameters:

Name Type Description Default
session_timeout int

The duration for which the session should be valid in seconds. A session that exceeds this duration will be considered expired and removed. Defaults to 3600 seconds.

3600
max_concurrent_sessions int

The maximum number of concurrent sessions to allow. Defaults to 10.

10
secret_key str

Secret key for JWT signing. If None, generates a random key.

None
Source code in plantdb/commons/fsdb/auth.py
def __init__(self, session_timeout: int = 3600, max_concurrent_sessions: int = 10, secret_key: str = None):
    """
    Manage user sessions with timeout.

    Parameters
    ----------
    session_timeout : int, optional
        The duration for which the session should be valid in seconds.
        A session that exceeds this duration will be considered expired and removed.
        Defaults to ``3600`` seconds.
    max_concurrent_sessions : int, optional
        The maximum number of concurrent sessions to allow.
        Defaults to ``10``.
    secret_key : str, optional
        Secret key for JWT signing. If None, generates a random key.
    """
    super().__init__(session_timeout, max_concurrent_sessions)
    self.secret_key = secret_key or secrets.token_urlsafe(32)

cleanup_expired_sessions

cleanup_expired_sessions()

Remove expired sessions from tracking.

Source code in plantdb/commons/fsdb/auth.py
def cleanup_expired_sessions(self) -> None:
    """Remove expired sessions from tracking."""
    current_time = datetime.now()
    expired_sessions = [
        jti for jti, session in self.sessions.items()
        if current_time > session['expires_at']
    ]
    for jti in expired_sessions:
        del self.sessions[jti]
    return
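The cleanup collects the expired keys first and deletes them in a second pass, because removing entries from a dictionary while iterating over it raises a RuntimeError in Python. The sketch below shows the same two-pass pattern on a plain dictionary with a fixed clock; the session IDs and the `cleanup_expired` helper are illustrative.

```python
from datetime import datetime, timedelta

now = datetime(2024, 1, 1, 12, 0, 0)  # fixed clock for a reproducible example
sessions = {
    "jti-old": {"username": "alice", "expires_at": now - timedelta(minutes=5)},
    "jti-live": {"username": "bob", "expires_at": now + timedelta(minutes=55)},
}


def cleanup_expired(sessions: dict, current_time: datetime) -> list:
    """Delete expired entries in place and return the removed session IDs."""
    # Pass 1: collect keys, so the dict is not mutated while being iterated.
    expired = [jti for jti, data in sessions.items()
               if current_time > data["expires_at"]]
    # Pass 2: delete the collected keys.
    for jti in expired:
        del sessions[jti]
    return expired


removed_ids = cleanup_expired(sessions, now)
```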

create_session

create_session(username)

Create a new session for a user.

If the user already has an active session, or if the maximum number of concurrent sessions has been reached, a warning is logged and None is returned. Otherwise, a new session is created and its JWT token is returned.

Parameters:

Name Type Description Default
username str

The unique identifier of the user for whom to create a session.

required

Returns:

Name Type Description
session_id Union[str, None]

The JWT token of the created session, or None if no session could be created.

Notes

Creates a JWT token following RFC 7519 standards with registered claims:
- iss (issuer): Identifies the token issuer
- sub (subject): The username of the authenticated user
- aud (audience): Intended audience for the token
- exp (expiration time): Token expiration timestamp
- iat (issued at): Token creation timestamp
- jti (JWT ID): Unique identifier for the token generated using secrets.token_urlsafe.

Source code in plantdb/commons/fsdb/auth.py
def create_session(self, username: str) -> Union[str, None]:
    """
    Create a new session for a user.

    If the user already has an active session, or if the maximum number of
    concurrent sessions has been reached, a warning is logged and ``None`` is returned.
    Otherwise, a new session is created and its JWT token is returned.

    Parameters
    ----------
    username : str
        The unique identifier of the user for whom to create a session.

    Returns
    -------
    session_id : Union[str, None]
        The JWT token of the created session, or ``None`` if no session could be created.

    Notes
    -----
    Creates a JWT token following RFC 7519 standards with registered claims:
    - iss (issuer): Identifies the token issuer
    - sub (subject): The username of the authenticated user
    - aud (audience): Intended audience for the token
    - exp (expiration time): Token expiration timestamp
    - iat (issued at): Token creation timestamp
    - jti (JWT ID): Unique identifier for the token generated using `secrets.token_urlsafe`.
    """
    if self._user_has_session(username):
        self.logger.warning(f"User '{username}' already has an active session!")
        return None

    if self.n_active_sessions() >= self.max_concurrent_sessions:
        self.logger.warning(
            f"Too many users currently active, reached max concurrent sessions limit ({self.max_concurrent_sessions})")
        return None

    # Create a JWT payload with registered claims
    now = datetime.now()
    exp_time = now + timedelta(seconds=self.session_timeout)
    jti = secrets.token_urlsafe(16)  # unique token ID for tracking

    try:
        # Generate JWT token
        jwt_token = self._create_token(username, jti, exp_time, now)
    except Exception as e:
        self.logger.error(f"Failed to create JWT token for {username}: {e}")
        return None

    # Track session for concurrent limit enforcement
    self.sessions[jti] = {
        'username': username,
        'created_at': now,
        'last_accessed': now,
        'expires_at': exp_time
    }
    self.logger.debug(f"Created JWT token for '{username}'")
    return jwt_token
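The `_create_token` helper is not shown in this section, so the following is only a sketch of what a token carrying the six registered claims looks like. It signs with HMAC-SHA256 using the standard library alone; the HS256 algorithm, the issuer and audience strings, and the helper names `b64url` and `sign_hs256` are assumptions, and the library may well delegate this step to a JWT package instead.

```python
import base64
import hashlib
import hmac
import json
import secrets
from datetime import datetime, timezone


def b64url(data: bytes) -> str:
    """Base64url-encode without padding, as JWTs require."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def sign_hs256(payload: dict, secret_key: str) -> str:
    """Build a compact header.payload.signature token (assumed algorithm: HS256)."""
    header = {"alg": "HS256", "typ": "JWT"}
    signing_input = ".".join(
        b64url(json.dumps(part, separators=(",", ":")).encode("utf-8"))
        for part in (header, payload)
    )
    signature = hmac.new(secret_key.encode("utf-8"),
                         signing_input.encode("ascii"),
                         hashlib.sha256).digest()
    return f"{signing_input}.{b64url(signature)}"


session_timeout = 3600
issued_at = int(datetime.now(timezone.utc).timestamp())
payload = {
    "iss": "example-issuer",             # hypothetical issuer name
    "sub": "alice",                      # the username
    "aud": "example-audience",           # hypothetical audience name
    "exp": issued_at + session_timeout,  # expiry as a Unix timestamp
    "iat": issued_at,
    "jti": secrets.token_urlsafe(16),    # unique ID used for session tracking
}
token = sign_hs256(payload, secret_key=secrets.token_urlsafe(32))
```

The `jti` claim is what ties the stateless token back to the entry in the `sessions` dictionary, which is how the concurrent-session limit can be enforced.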

invalidate_session

invalidate_session(jwt_token=None, jti=None)

Invalidate a session by removing it from tracking.

Parameters:

Name Type Description Default
jwt_token str

JWT token to invalidate

None
jti str

Token ID to invalidate directly

None

Returns:

Type Description
bool

True if the specified session was found and removed, False otherwise.

str

The username corresponding to the invalidated JWT token, or None if no session was removed.

Source code in plantdb/commons/fsdb/auth.py
def invalidate_session(self, jwt_token: str = None, jti: str = None) -> Tuple[bool, str | None]:
    """
    Invalidate a session by removing it from tracking.

    Parameters
    ----------
    jwt_token : str, optional
        JWT token to invalidate
    jti : str, optional
        Token ID to invalidate directly

    Returns
    -------
    bool
        `True` if the specified session was found and removed, `False` otherwise.
    str
        The username corresponding to the invalidated JWT token
    """
    if jwt_token:
        try:
            payload = self._payload_from_token(jwt_token)
            jti = payload.get('jti')
        except Exception:  # avoid a bare except, which would also swallow KeyboardInterrupt
            return False, None

    if jti and jti in self.sessions:
        username = self.sessions[jti]['username']
        del self.sessions[jti]
        return True, username

    return False, None
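The method returns a `(removed, username)` pair rather than a single value, so callers typically unpack it. A minimal sketch of that contract on a plain dictionary follows; the `invalidate` helper and the session ID are illustrative and leave out the token-decoding branch.

```python
from typing import Dict, Optional, Tuple

sessions: Dict[str, dict] = {"abc123": {"username": "alice"}}


def invalidate(jti: Optional[str]) -> Tuple[bool, Optional[str]]:
    """Remove a tracked session by ID and report who it belonged to."""
    if jti and jti in sessions:
        username = sessions.pop(jti)["username"]
        return True, username
    return False, None


first = invalidate("abc123")   # the session exists and is removed
second = invalidate("abc123")  # already gone, nothing to remove
removed, username = first      # typical unpacking at the call site
```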

n_active_sessions

n_active_sessions()

Returns the number of active sessions.

Cleans up expired sessions before counting and returns the number of remaining active sessions in the collection.

Returns:

Type Description
int

The number of currently active sessions.

See Also

cleanup_expired_sessions : Cleans up the expired sessions in the collection.

Source code in plantdb/commons/fsdb/auth.py
def n_active_sessions(self) -> int:
    """
    Returns the number of active sessions.

    Cleans up expired sessions before counting and returns the number
    of remaining active sessions in the collection.

    Returns
    -------
    int
        The number of currently active sessions.

    See Also
    --------
    cleanup_expired_sessions : Cleans up the expired sessions in the collection.
    """
    self.cleanup_expired_sessions()
    return len(self.sessions)

refresh_session

refresh_session(jwt_token)

Refresh a JWT token if it's still valid.

Parameters:

Name Type Description Default
jwt_token str

Current JWT token

required

Returns:

Type Description
str or None

New JWT token if the refresh succeeds, None otherwise.

Source code in plantdb/commons/fsdb/auth.py
def refresh_session(self, jwt_token: str) -> Optional[str]:
    """
    Refresh a JWT token if it's still valid.

    Parameters
    ----------
    jwt_token : str
        Current JWT token

    Returns
    -------
    str or None
        New JWT token if the refresh succeeds, ``None`` otherwise.
    """
    session_data = self.validate_session(jwt_token)
    if not session_data:
        return None

    # Invalidate old session
    old_jti = session_data['jti']
    self.invalidate_session(jti=old_jti)

    # Create new session
    username = session_data['username']
    return self.create_session(username)

session_username

session_username(jwt_token)

Extract username from JWT token.

Parameters:

Name Type Description Default
jwt_token str

JWT token

required

Returns:

Type Description
str or None

The username if the token is valid, None otherwise.

Source code in plantdb/commons/fsdb/auth.py
def session_username(self, jwt_token: str) -> Optional[str]:
    """
    Extract username from JWT token.

    Parameters
    ----------
    jwt_token : str
        JWT token

    Returns
    -------
    str or None
        The username if the token is valid, ``None`` otherwise.
    """
    session_data = self.validate_session(jwt_token)
    return session_data['username'] if session_data else None

validate_session

validate_session(jwt_token)

Validate a JWT token and return user information.

Parameters:

Name Type Description Default
jwt_token str

The JWT token to validate.

required

Returns:

Type Description
dict or None

User information if valid, None if invalid/expired. Returns dictionary with:
- username: The authenticated user
- issued_at: When the token was issued
- expires_at: When the token expires
- jti: Unique token identifier
- issuer: Token issuer
- audience: Token audience

Source code in plantdb/commons/fsdb/auth.py
def validate_session(self, jwt_token: str) -> Optional[Dict[str, Any]]:
    """
    Validate a JWT token and return user information.

    Parameters
    ----------
    jwt_token : str
        The JWT token to validate.

    Returns
    -------
    dict or None
        User information if valid, ``None`` if invalid/expired.
        Returns dictionary with:
        - username: The authenticated user
        - issued_at: When the token was issued
        - expires_at: When the token expires
        - jti: Unique token identifier
        - issuer: Token issuer
        - audience: Token audience
    """
    try:
        # Decode and verify JWT token with proper validation
        payload = self._payload_from_token(jwt_token)

    except jwt.ExpiredSignatureError:
        self.logger.error("JWT token expired")
        return None
    except jwt.InvalidAudienceError:
        self.logger.error("JWT token has invalid audience")
        return None
    except jwt.InvalidIssuerError:
        self.logger.error("JWT token has invalid issuer")
        return None
    except jwt.InvalidTokenError as e:
        self.logger.error(f"Invalid JWT token: {e}")
        return None
    except Exception as e:
        self.logger.error(f"Error validating JWT token: {e}")
        return None

    # Update last accessed time in session tracking
    jti = payload.get('jti')
    if jti and jti in self.sessions:
        self.sessions[jti]['last_accessed'] = datetime.now()

    return {
        'username': payload['sub'],  # subject is the username
        'issued_at': payload['iat'],  # issued at timestamp
        'expires_at': payload['exp'],  # expiration timestamp
        'jti': jti,  # JWT ID
        'issuer': payload['iss'],  # issuer
        'audience': payload['aud']  # audience
    }
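The checks hidden behind `_payload_from_token` (signature, expiry, issuer, audience) can be illustrated with the standard library. This is a sketch under stated assumptions rather than the library's implementation: it assumes HS256 signing, uses hypothetical issuer and audience strings, and the `encode` and `validate` helpers exist only for this example.

```python
import base64
import hashlib
import hmac
import json
import time
from typing import Any, Dict, Optional


def _b64url(data: bytes) -> str:
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def _b64url_decode(text: str) -> bytes:
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def _signature(signing_input: str, key: str) -> str:
    digest = hmac.new(key.encode("utf-8"), signing_input.encode("utf-8"),
                      hashlib.sha256).digest()
    return _b64url(digest)


def encode(payload: Dict[str, Any], key: str) -> str:
    header = {"alg": "HS256", "typ": "JWT"}
    signing_input = ".".join(_b64url(json.dumps(part).encode("utf-8"))
                             for part in (header, payload))
    return f"{signing_input}.{_signature(signing_input, key)}"


def validate(token: str, key: str, issuer: str, audience: str) -> Optional[Dict[str, Any]]:
    """Return session info for a valid token, or None if any check fails."""
    try:
        header_b64, payload_b64, signature = token.split(".")
        expected = _signature(f"{header_b64}.{payload_b64}", key)
        # Constant-time comparison avoids leaking signature bytes through timing.
        if not hmac.compare_digest(signature.encode("utf-8"), expected.encode("utf-8")):
            return None
        payload = json.loads(_b64url_decode(payload_b64))
    except ValueError:  # wrong segment count, bad base64, or bad JSON
        return None
    if payload.get("exp", 0) <= time.time():
        return None  # expired
    if payload.get("iss") != issuer or payload.get("aud") != audience:
        return None  # issued by or for someone else
    return {
        "username": payload["sub"],
        "issued_at": payload["iat"],
        "expires_at": payload["exp"],
        "jti": payload.get("jti"),
        "issuer": payload["iss"],
        "audience": payload["aud"],
    }


now = int(time.time())
claims = {"iss": "example-issuer", "sub": "alice", "aud": "example-audience",
          "iat": now, "exp": now + 3600, "jti": "abc123"}
good_token = encode(claims, "demo-key")
info = validate(good_token, "demo-key", "example-issuer", "example-audience")
```

Returning None for every failure keeps the caller's logic simple, at the cost of hiding the specific reason; the listing above compensates by logging each failure type separately.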

Permission

Bases: Enum

A class representing the different permission levels for user actions.

This enumeration defines various permissions that can be assigned to users, controlling their access to different functionalities within the system.

Attributes:

Name Type Description
READ str

Permission to read scan data and metadata.

WRITE str

Permission to write filesets, files, and associated metadata in existing scan datasets.

CREATE str

Permission to create new scan datasets.

DELETE str

Permission to delete scan datasets, filesets, and files.

MANAGE_USERS str

Permission to add and remove users.

MANAGE_GROUPS str

Permission to manage groups (create, delete, modify membership).

Examples:

>>> from plantdb.commons.fsdb.auth import Permission
>>> # Accessing a specific permission:
>>> print(Permission.READ)
Permission.READ
>>> # Iterating through all permissions:
>>> for perm in Permission:
...     print(f"{perm.name}: {perm.value}")
READ: read
WRITE: write
CREATE: create
DELETE: delete
MANAGE_USERS: manage_users
MANAGE_GROUPS: manage_groups
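The member names and values printed above are enough to reconstruct an equivalent enumeration. The sketch below does so under the hypothetical name `PermissionSketch`, which also shows lookup by value, a common need when permissions arrive as strings.

```python
from enum import Enum


class PermissionSketch(Enum):
    """Stand-in mirroring the names and values listed for Permission."""
    READ = "read"
    WRITE = "write"
    CREATE = "create"
    DELETE = "delete"
    MANAGE_USERS = "manage_users"
    MANAGE_GROUPS = "manage_groups"


by_value = PermissionSketch("manage_groups")   # look a member up by its string value
names = [perm.name for perm in PermissionSketch]  # iteration follows definition order
```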

RBACManager

RBACManager(users_file='users.json', groups_file='groups.json', max_login_attempts=3, lockout_duration=900)

Manage Role-Based Access Control (RBAC) for users and permissions.

This class provides methods to determine which permissions a user has, check if a user has a specific permission, and verify if a user can access or perform operations on scans based on their roles and permissions.

Attributes:

Name Type Description
GUEST_USERNAME str

The username for the default guest user account.

groups GroupManager

Manager for handling group operations.

Examples:

>>> from plantdb.commons.fsdb.auth import Permission, RBACManager
>>> from plantdb.commons.test_database import test_database
>>> db = test_database('all')
>>> db.connect()
>>> scan = db.get_scan('real_plant_analyzed')
>>> rbac = RBACManager()
>>> guest_user = rbac.create_guest_user('guest')
>>> can_read = rbac.can_access_scan(guest_user, scan.metadata, Permission.READ)

Initialize the RBACManager.

Parameters:

Name Type Description Default
users_file str

Path to the JSON file for storing users database. Default is 'users.json'.

'users.json'
groups_file str

Path to the JSON file for storing groups database. Defaults to "groups.json".

'groups.json'
max_login_attempts int

Maximum number of failed login attempts before account lockout. Defaults to 3.

3
lockout_duration int or timedelta

Account lockout duration in seconds. Defaults to 900 (15 minutes).

900
Source code in plantdb/commons/fsdb/auth.py
def __init__(self, users_file: str = 'users.json', groups_file: str = "groups.json", max_login_attempts=3,
             lockout_duration=900):
    """
    Initialize the RBACManager.

    Parameters
    ----------
    users_file : str, optional
        Path to the JSON file for storing users database. Default is 'users.json'.
    groups_file : str, optional
        Path to the JSON file for storing groups database. Defaults to "groups.json".
    max_login_attempts : int, optional
        Maximum number of failed login attempts before account lockout. Defaults to ``3``.
    lockout_duration : int or timedelta, optional
        Account lockout duration in seconds. Defaults to ``900`` (15 minutes).
    """
    self.logger = get_logger(__class__.__name__)
    self.users = UserManager(users_file, max_login_attempts, lockout_duration)
    self.groups = GroupManager(groups_file)

activate

activate(requesting_user, username)

Activate a user account - requires MANAGE_USERS permission

Source code in plantdb/commons/fsdb/auth.py
@requires_permission(Permission.MANAGE_USERS)
def activate(self, requesting_user: User, username: str) -> bool:
    """Activate a user account - requires MANAGE_USERS permission"""
    if not self.users.exists(username):
        self.logger.warning(f"Attempt to activate non-existent user: {username}")
        return False

    user = self.users.get_user(username)
    if not user.is_active:  # only inactive accounts need activating
        self.users.activate(user)
        self.logger.info(f"User {username} activated by {requesting_user.username}")
        return True
    else:
        self.logger.info(f"User {username} was already active")
    return True

add_user_to_group

add_user_to_group(user, group_name, username_to_add)

Add a user to a group if the requesting user has permission.

Parameters:

Name Type Description Default
user User

The user requesting to add a member.

required
group_name str

The name of the group.

required
username_to_add str

The username to add to the group.

required

Returns:

Type Description
bool

True if the user was added successfully, False if permission denied or operation failed.

Source code in plantdb/commons/fsdb/auth.py
def add_user_to_group(self, user: User, group_name: str, username_to_add: str) -> bool:
    """
    Add a user to a group if the requesting user has permission.

    Parameters
    ----------
    user : User
        The user requesting to add a member.
    group_name : str
        The name of the group.
    username_to_add : str
        The username to add to the group.

    Returns
    -------
    bool
        True if the user was added successfully, False if permission denied or operation failed.
    """
    if not self.can_add_to_group(user, group_name):
        return False

    return self.groups.add_user_to_group(group_name, username_to_add)

can_access_scan

can_access_scan(user, scan_metadata, operation)

Check if a user can perform a specific operation on a scan dataset.

This method implements the complete access control logic including:
- Owner-based access (owners get CONTRIBUTOR role for their scans)
- Group-based access (shared group members get CONTRIBUTOR role)
- Global role-based access (fallback to user's global role)
- Admin override (admins can do everything)

Parameters:

Name Type Description Default
user User

The user requesting access.

required
scan_metadata dict

The scan metadata containing 'owner' and optional 'sharing' fields.

required
operation Permission

The operation/permission being requested.

required

Returns:

Type Description
bool

True if the user can perform the operation, False otherwise.

Source code in plantdb/commons/fsdb/auth.py
def can_access_scan(self, user: User, scan_metadata: dict, operation: Permission) -> bool:
    """
    Check if a user can perform a specific operation on a scan dataset.

    This method implements the complete access control logic including:
    - Owner-based access (owners get CONTRIBUTOR role for their scans)
    - Group-based access (shared group members get CONTRIBUTOR role)
    - Global role-based access (fallback to user's global role)
    - Admin override (admins can do everything)

    Parameters
    ----------
    user : User
        The user requesting access.
    scan_metadata : dict
        The scan metadata containing 'owner' and optional 'sharing' fields.
    operation : Permission
        The operation/permission being requested.

    Returns
    -------
    bool
        ``True`` if the user can perform the operation, ``False`` otherwise.
    """
    scan_permissions = self.get_scan_permissions(user, scan_metadata)
    return operation in scan_permissions
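
Once the effective role is known, the check above reduces to set membership. The following is a minimal, self-contained sketch of that idea. The `Permission` members and the `ROLE_PERMISSIONS` table are stand-ins invented for illustration, not the actual plantdb definitions.

```python
from enum import Enum


class Permission(Enum):
    # Hypothetical subset of permissions, for illustration only.
    READ = "read"
    WRITE = "write"
    MANAGE_USERS = "manage_users"


# Assumed role-to-permission table; the real mapping lives in plantdb.
ROLE_PERMISSIONS = {
    "reader": {Permission.READ},
    "contributor": {Permission.READ, Permission.WRITE},
    "admin": set(Permission),
}


def can_access(effective_role: str, operation: Permission) -> bool:
    """Return True when the operation is in the role's permission set."""
    return operation in ROLE_PERMISSIONS[effective_role]
```

Under these assumptions a reader may read but not write, while an admin may perform every operation.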

can_access_scan_by_owner

can_access_scan_by_owner(user, scan_owner, operation)

Legacy method for checking scan access by owner name only.

This method provides backward compatibility but doesn't support group sharing. For full RBAC support, use can_access_scan() with full metadata.

Parameters:

Name Type Description Default
user User

The user requesting access.

required
scan_owner str

The username of the scan owner.

required
operation Permission

The operation/permission being requested.

required

Returns:

Type Description
bool

True if the user can perform the operation, False otherwise.

Source code in plantdb/commons/fsdb/auth.py
def can_access_scan_by_owner(self, user: User, scan_owner: str, operation: Permission) -> bool:
    """
    Legacy method for checking scan access by owner name only.

    This method provides backward compatibility but doesn't support group sharing.
    For full RBAC support, use can_access_scan() with full metadata.

    Parameters
    ----------
    user : User
        The user requesting access.
    scan_owner : str
        The username of the scan owner.
    operation : Permission
        The operation/permission being requested.

    Returns
    -------
    bool
        True if the user can perform the operation, False otherwise.
    """
    # Create minimal metadata with just owner
    scan_metadata = {'owner': scan_owner}
    return self.can_access_scan(user, scan_metadata, operation)

can_add_to_group

can_add_to_group(user, group_name)

Check if a user can add members to a specific group.

Users can add members to a group if:
1. They are an admin (MANAGE_GROUPS permission), OR
2. They are a member of the group

Parameters:

Name Type Description Default
user User

The user requesting to add members.

required
group_name str

The name of the group.

required

Returns:

Type Description
bool

True if the user can add members to the group, False otherwise.

Source code in plantdb/commons/fsdb/auth.py
def can_add_to_group(self, user: User, group_name: str) -> bool:
    """
    Check if a user can add members to a specific group.

    Users can add members to a group if:
    1. They are an admin (MANAGE_GROUPS permission), OR
    2. They are a member of the group

    Parameters
    ----------
    user : User
        The user requesting to add members.
    group_name : str
        The name of the group.

    Returns
    -------
    bool
        True if the user can add members to the group, False otherwise.
    """
    # Admins can manage any group
    if self.has_permission(user, Permission.MANAGE_GROUPS):
        return True

    # Group members can add users to their groups
    group = self.groups.get_group(group_name)
    return group is not None and group.has_user(user.username)
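
The two rules can be sketched in isolation as follows. This is an illustrative stand-in, not the plantdb implementation: the simplified `Group` class, the `has_manage_groups` flag, and the plain dictionary of groups are all assumptions made to keep the example self-contained.

```python
from dataclasses import dataclass, field
from typing import Dict, Set


@dataclass
class Group:
    # Simplified stand-in for the documented Group dataclass.
    name: str
    users: Set[str] = field(default_factory=set)

    def has_user(self, username: str) -> bool:
        return username in self.users


def can_add_to_group(username: str, has_manage_groups: bool,
                     groups: Dict[str, Group], group_name: str) -> bool:
    """Mirror the admin-or-member rule."""
    # Rule 1: holders of the group-management permission may manage any group.
    if has_manage_groups:
        return True
    # Rule 2: otherwise the requester must belong to an existing group.
    group = groups.get(group_name)
    return group is not None and group.has_user(username)


groups = {"plant_researchers": Group("plant_researchers", {"alice", "bob"})}
```

Because the permission check comes first, a user with the management permission gets `True` even for a group name that does not exist. In that case the subsequent add or remove operation is what reports the failure.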

can_create_group

can_create_group(user)

Check if a user can create groups.

Any user with CONTRIBUTOR role or higher can create groups.

Parameters:

Name Type Description Default
user User

The user to check.

required

Returns:

Type Description
bool

True if the user can create groups, False otherwise.

Source code in plantdb/commons/fsdb/auth.py
def can_create_group(self, user: User) -> bool:
    """
    Check if a user can create groups.

    Any user with CONTRIBUTOR role or higher can create groups.

    Parameters
    ----------
    user : User
        The user to check.

    Returns
    -------
    bool
        True if the user can create groups, False otherwise.
    """
    return self.has_permission(user, Permission.CREATE)

can_delete_group

can_delete_group(user)

Check if a user can delete a group.

Only users with the MANAGE_GROUPS permission can delete groups.

Parameters:

Name Type Description Default
user User

The user requesting to delete the group.

required

Returns:

Type Description
bool

True if the user can delete the group, False otherwise.

Source code in plantdb/commons/fsdb/auth.py
def can_delete_group(self, user: User) -> bool:
    """
    Check if a user can delete a group.

    Only users with the `MANAGE_GROUPS` permission can delete groups.

    Parameters
    ----------
    user : User
        The user requesting to delete the group.

    Returns
    -------
    bool
        ``True`` if the user can delete the group, ``False`` otherwise.
    """
    return self.has_permission(user, Permission.MANAGE_GROUPS)

can_manage_groups

can_manage_groups(user)

Check if a user can manage groups (create/delete groups).

Parameters:

Name Type Description Default
user User

The user to check.

required

Returns:

Type Description
bool

True if the user can manage groups, False otherwise.

Source code in plantdb/commons/fsdb/auth.py
def can_manage_groups(self, user: User) -> bool:
    """
    Check if a user can manage groups (create/delete groups).

    Parameters
    ----------
    user : User
        The user to check.

    Returns
    -------
    bool
        True if the user can manage groups, False otherwise.
    """
    return self.has_permission(user, Permission.MANAGE_GROUPS)

can_modify_scan_owner

can_modify_scan_owner(user, scan_metadata)

Check if a user can modify the 'owner' field of a scan.

Only admins can modify scan ownership.

Parameters:

Name Type Description Default
user User

The user requesting to modify ownership.

required
scan_metadata dict

The scan metadata.

required

Returns:

Type Description
bool

True if the user can modify the owner field, False otherwise.

Source code in plantdb/commons/fsdb/auth.py
def can_modify_scan_owner(self, user: User, scan_metadata: dict) -> bool:
    """
    Check if a user can modify the 'owner' field of a scan.

    Only admins can modify scan ownership.

    Parameters
    ----------
    user : User
        The user requesting to modify ownership.
    scan_metadata : dict
        The scan metadata.

    Returns
    -------
    bool
        True if the user can modify the owner field, False otherwise.
    """
    return self.has_permission(user, Permission.MANAGE_USERS)

can_modify_scan_sharing

can_modify_scan_sharing(user, scan_metadata)

Check if a user can modify the 'sharing' field of a scan.

Users can modify sharing if they have WRITE permission for the scan (i.e., they are the owner, in a shared group, or have global CONTRIBUTOR+ role).

Parameters:

Name Type Description Default
user User

The user requesting to modify sharing.

required
scan_metadata dict

The scan metadata.

required

Returns:

Type Description
bool

True if the user can modify the sharing field, False otherwise.

Source code in plantdb/commons/fsdb/auth.py
def can_modify_scan_sharing(self, user: User, scan_metadata: dict) -> bool:
    """
    Check if a user can modify the 'sharing' field of a scan.

    Users can modify sharing if they have WRITE permission for the scan
    (i.e., they are the owner, in a shared group, or have global CONTRIBUTOR+ role).

    Parameters
    ----------
    user : User
        The user requesting to modify sharing.
    scan_metadata : dict
        The scan metadata.

    Returns
    -------
    bool
        True if the user can modify the sharing field, False otherwise.
    """
    return self.can_access_scan(user, scan_metadata, Permission.WRITE)

create_group

create_group(user, name, users=None, description=None)

Create a new group if the user has permission.

Parameters:

Name Type Description Default
user User

The user creating the group.

required
name str

The unique name for the group.

required
users Optional[Set[str]]

Initial set of users to add to the group.

None
description Optional[str]

Optional description of the group.

None

Returns:

Type Description
Optional[Group]

The created group object if successful, None if permission denied.

Raises:

Type Description
ValueError

If a group with the same name already exists.

Source code in plantdb/commons/fsdb/auth.py
def create_group(self, user: User, name: str, users: Optional[Set[str]] = None,
                 description: Optional[str] = None) -> Optional[Group]:
    """
    Create a new group if the user has permission.

    Parameters
    ----------
    user : User
        The user creating the group.
    name : str
        The unique name for the group.
    users : Optional[Set[str]], optional
        Initial set of users to add to the group.
    description : Optional[str], optional
        Optional description of the group.

    Returns
    -------
    Optional[Group]
        The created group object if successful, None if permission denied.

    Raises
    ------
    ValueError
        If a group with the same name already exists.
    """
    if not self.can_create_group(user):
        return None

    return self.groups.create_group(name, user.username, users, description)

deactivate

deactivate(requesting_user, username)

Deactivate a user account - requires MANAGE_USERS permission

Source code in plantdb/commons/fsdb/auth.py
@requires_permission(Permission.MANAGE_USERS)
def deactivate(self, requesting_user: User, username: str) -> bool:
    """Deactivate a user account - requires MANAGE_USERS permission"""
    if not self.users.exists(username):
        self.logger.warning(f"Attempt to deactivate non-existent user: {username}")
        return False

    user = self.users.get_user(username)
    if user.is_active:
        self.users.deactivate(user)
        self.logger.info(f"User {username} deactivated by {requesting_user.username}")
    else:
        self.logger.info(f"User {username} was already inactive")
    return True

delete_group

delete_group(user, group_name)

Delete a group if the user has permission.

Parameters:

Name Type Description Default
user User

The user requesting to delete the group.

required
group_name str

The name of the group to delete.

required

Returns:

Type Description
bool

True if the group was deleted successfully, False if permission denied or group not found.

Source code in plantdb/commons/fsdb/auth.py
def delete_group(self, user: User, group_name: str) -> bool:
    """
    Delete a group if the user has permission.

    Parameters
    ----------
    user : User
        The user requesting to delete the group.
    group_name : str
        The name of the group to delete.

    Returns
    -------
    bool
        True if the group was deleted successfully, False if permission denied or group not found.
    """
    if not self.can_delete_group(user):
        return False

    return self.groups.delete_group(group_name)

ensure_scan_owner

ensure_scan_owner(scan_metadata)

Ensure a scan has an owner field, defaulting to guest if missing.

This method should be called when loading or creating scans to ensure the owner field is always present.

Parameters:

Name Type Description Default
scan_metadata dict

The scan metadata to check and potentially modify.

required

Returns:

Type Description
dict

The metadata with owner field guaranteed to be present.

Source code in plantdb/commons/fsdb/auth.py
def ensure_scan_owner(self, scan_metadata: dict) -> dict:
    """
    Ensure a scan has an owner field, defaulting to guest if missing.

    This method should be called when loading or creating scans to ensure
    the owner field is always present.

    Parameters
    ----------
    scan_metadata : dict
        The scan metadata to check and potentially modify.

    Returns
    -------
    dict
        The metadata with owner field guaranteed to be present.
    """
    if 'owner' not in scan_metadata:
        scan_metadata = scan_metadata.copy()
        scan_metadata['owner'] = self.users.GUEST_USERNAME
    return scan_metadata
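
The copy-before-modify behaviour is easy to miss, so here is a standalone sketch of the same logic. The module-level `GUEST_USERNAME` value of `"guest"` is an assumption made for the example. In plantdb the name is read from `self.users.GUEST_USERNAME`.

```python
GUEST_USERNAME = "guest"  # assumed value, for illustration only


def ensure_scan_owner(scan_metadata: dict) -> dict:
    """Return metadata that always carries an 'owner' key."""
    if "owner" not in scan_metadata:
        # Shallow copy: the caller's dictionary is left untouched.
        scan_metadata = scan_metadata.copy()
        scan_metadata["owner"] = GUEST_USERNAME
    return scan_metadata


original = {"sharing": ["plant_researchers"]}
completed = ensure_scan_owner(original)
already_owned = {"owner": "alice"}
unchanged = ensure_scan_owner(already_owned)
```

Two consequences follow from this logic. Metadata that already has an owner is returned as the very same object. When a copy is made it is shallow, so nested values such as the `sharing` list are still shared with the original dictionary.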

get_accessible_scans_for_user

get_accessible_scans_for_user(user, all_scan_metadata)

Filter scans to only include those the user has READ access to.

This method can be used to implement scan listing with proper access control.

Parameters:

Name Type Description Default
user User

The user requesting scan access.

required
all_scan_metadata Dict[str, dict]

Dictionary mapping scan IDs to their metadata.

required

Returns:

Type Description
Dict[str, dict]

Dictionary of scan IDs to metadata for scans the user can read.

Source code in plantdb/commons/fsdb/auth.py
def get_accessible_scans_for_user(self, user: User, all_scan_metadata: Dict[str, dict]) -> Dict[str, dict]:
    """
    Filter scans to only include those the user has READ access to.

    This method can be used to implement scan listing with proper access control.

    Parameters
    ----------
    user : User
        The user requesting scan access.
    all_scan_metadata : Dict[str, dict]
        Dictionary mapping scan IDs to their metadata.

    Returns
    -------
    Dict[str, dict]
        Dictionary of scan IDs to metadata for scans the user can read.
    """
    accessible_scans = {}

    for scan_id, metadata in all_scan_metadata.items():
        # Ensure owner field is present
        metadata = self.ensure_scan_owner(metadata)

        # Check if user has READ access to this scan
        if self.can_access_scan(user, metadata, Permission.READ):
            accessible_scans[scan_id] = metadata

    return accessible_scans
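
The filtering pattern can be tried without a database. In this sketch the access check is passed in as a plain callable. The `can_read` predicate, the `GUEST_USERNAME` value, and the sample scans are hypothetical and only serve to exercise the loop.

```python
from typing import Callable, Dict

GUEST_USERNAME = "guest"  # assumed value of the guest username


def filter_readable_scans(
    all_scan_metadata: Dict[str, dict],
    can_read: Callable[[dict], bool],
) -> Dict[str, dict]:
    """Keep only the scans for which ``can_read(metadata)`` is true."""
    accessible = {}
    for scan_id, metadata in all_scan_metadata.items():
        # Fill in a default owner without mutating the caller's dictionary.
        if "owner" not in metadata:
            metadata = {**metadata, "owner": GUEST_USERNAME}
        if can_read(metadata):
            accessible[scan_id] = metadata
    return accessible


scans = {
    "scan_a": {"owner": "alice"},
    "scan_b": {"owner": "bob"},
    "scan_c": {},  # no owner recorded
}
# Hypothetical rule: only scans owned by alice or by the guest user are readable.
visible = filter_readable_scans(
    scans, lambda m: m["owner"] in {"alice", GUEST_USERNAME}
)
```

As in the method above, the returned metadata is the version with the owner filled in, while the input dictionaries stay as they were.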

get_effective_role_for_scan

get_effective_role_for_scan(user, scan_metadata)

Get the effective role a user has for a specific scan dataset.

This method determines the user's role based on:
1. Dataset ownership (owner gets CONTRIBUTOR role)
2. Group sharing (shared group members get CONTRIBUTOR role)
3. User's global role (fallback)

Parameters:

Name Type Description Default
user User

The user to check.

required
scan_metadata dict

The scan metadata containing 'owner' and optional 'sharing' fields.

required

Returns:

Type Description
Role

The effective role for this specific scan dataset.

Source code in plantdb/commons/fsdb/auth.py
def get_effective_role_for_scan(self, user: User, scan_metadata: dict) -> Role:
    """
    Get the effective role a user has for a specific scan dataset.

    This method determines the user's role based on:
    1. Dataset ownership (owner gets CONTRIBUTOR role)
    2. Group sharing (shared group members get CONTRIBUTOR role)
    3. User's global role (fallback)

    Parameters
    ----------
    user : User
        The user to check.
    scan_metadata : dict
        The scan metadata containing 'owner' and optional 'sharing' fields.

    Returns
    -------
    Role
        The effective role for this specific scan dataset.
    """
    # Get the scan owner, default to guest if not specified
    scan_owner = scan_metadata.get('owner', self.users.GUEST_USERNAME)

    # If user is the owner, they get CONTRIBUTOR role (unless they're admin)
    if user.username == scan_owner:
        # Admins keep their admin privileges
        if Role.ADMIN in user.roles:
            return Role.ADMIN
        # Non-admin owners get CONTRIBUTOR privileges, whatever their global role
        return Role.CONTRIBUTOR

    # Check if user belongs to any shared groups
    shared_groups = scan_metadata.get('sharing', [])
    if shared_groups:
        user_groups = self.groups.get_user_groups(user.username)
        user_group_names = {group.name for group in user_groups}

        # If user belongs to any shared group, they get CONTRIBUTOR role for this scan
        if any(group_name in user_group_names for group_name in shared_groups):
            # Admins keep their admin privileges
            if Role.ADMIN in user.roles:
                return Role.ADMIN
            # Shared group members get CONTRIBUTOR privileges for this scan
            return Role.CONTRIBUTOR

    # Fall back to user's highest global role
    if Role.ADMIN in user.roles:
        return Role.ADMIN
    elif Role.CONTRIBUTOR in user.roles:
        return Role.CONTRIBUTOR
    else:
        return Role.READER
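
The resolution order can be condensed into a few lines. This is a behaviour-equivalent sketch under simplifying assumptions, not the library code: the `Role` enum is a stand-in, group membership is passed in as a set of names instead of being looked up, and the guest username is assumed to be `"guest"`.

```python
from enum import Enum
from typing import Set


class Role(Enum):
    # Stand-in for the plantdb Role enum; only the names are taken from the source.
    READER = "reader"
    CONTRIBUTOR = "contributor"
    ADMIN = "admin"


def effective_role(username: str, roles: Set[Role], user_group_names: Set[str],
                   scan_metadata: dict, guest_username: str = "guest") -> Role:
    """Resolve the role a user holds on one scan."""
    # Admins keep ADMIN in every branch, so this can be checked first.
    if Role.ADMIN in roles:
        return Role.ADMIN
    # Owners are promoted to CONTRIBUTOR for their own scans.
    if username == scan_metadata.get("owner", guest_username):
        return Role.CONTRIBUTOR
    # Members of any group listed under 'sharing' are promoted as well.
    if user_group_names & set(scan_metadata.get("sharing", [])):
        return Role.CONTRIBUTOR
    # Otherwise fall back to the highest global role.
    return Role.CONTRIBUTOR if Role.CONTRIBUTOR in roles else Role.READER
```

One consequence of the owner default: a scan without an `owner` field is treated as owned by the guest user, so the guest account resolves to CONTRIBUTOR on such scans.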

get_scan_permissions

get_scan_permissions(user, scan_metadata)

Get the set of permissions a user has for a specific scan dataset.

This method considers the user's effective role for the specific scan, taking into account ownership and group sharing.

Parameters:

Name Type Description Default
user User

The user to check permissions for.

required
scan_metadata dict

The scan metadata containing 'owner' and optional 'sharing' fields.

required

Returns:

Type Description
Set[Permission]

The set of permissions the user has for this specific scan.

Source code in plantdb/commons/fsdb/auth.py
def get_scan_permissions(self, user: User, scan_metadata: dict) -> Set[Permission]:
    """
    Get the set of permissions a user has for a specific scan dataset.

    This method considers the user's effective role for the specific scan,
    taking into account ownership and group sharing.

    Parameters
    ----------
    user : User
        The user to check permissions for.
    scan_metadata : dict
        The scan metadata containing 'owner' and optional 'sharing' fields.

    Returns
    -------
    Set[Permission]
        The set of permissions the user has for this specific scan.
    """
    effective_role = self.get_effective_role_for_scan(user, scan_metadata)
    return effective_role.permissions

get_user_groups

get_user_groups(username)

Get all groups that a user belongs to.

Parameters:

Name Type Description Default
username str

The username to search for.

required

Returns:

Type Description
List[Group]

A list of Group objects that the user belongs to.

Source code in plantdb/commons/fsdb/auth.py
def get_user_groups(self, username: str) -> List[Group]:
    """
    Get all groups that a user belongs to.

    Parameters
    ----------
    username : str
        The username to search for.

    Returns
    -------
    List[Group]
        A list of Group objects that the user belongs to.
    """
    return self.groups.get_user_groups(username)

get_user_permissions

get_user_permissions(user)

Get the set of permissions a user has based on their assigned roles.

This function returns a set containing all permissions directly assigned to the user as well as those inherited from any roles they are part of. The resulting set is a union of both direct and indirect permissions.

Parameters:

Name Type Description Default
user User

A User object representing the user whose permissions will be checked. This user must have attributes permissions and roles.

required

Returns:

Type Description
Set[Permission]

A set containing all permissions that the specified user has access to, including those inherited from roles.

Examples:

>>> from plantdb.commons.fsdb.auth import Permission
>>> from plantdb.commons.fsdb.auth import User
>>> role_admin = Role('admin')
>>> role_user = Role('user')
>>> permission_a = Permission()  # Mocking a specific permission
>>> permission_b = Permission()  # Mocking another specific permission
>>> user = User(permissions={permission_a}, roles={role_user})
>>> role_permissions = {Role('admin'): {permission_b}}
>>> user.get_user_permissions(user)
{<__main__.Permission object at 0x...>}
Notes

Role permissions are read from each role's permissions attribute, and direct permissions from user.permissions, which may be empty or None.

See Also

User : Represents a user with permissions and roles.
Permission : Represents a permission that can be assigned to users or roles.
Role : Represents a role with specific permissions.

Source code in plantdb/commons/fsdb/auth.py
def get_user_permissions(self, user: User) -> Set[Permission]:
    """
    Get the set of permissions a user has based on their assigned roles.

    This function returns a set containing all permissions directly assigned to
    the user as well as those inherited from any roles they are part of. The
    resulting set is a union of both direct and indirect permissions.

    Parameters
    ----------
    user : User
        A `User` object representing the user whose permissions will be checked.
        This user must have attributes `permissions` and `roles`.

    Returns
    -------
    Set[Permission]
        A set containing all permissions that the specified user has access to,
        including those inherited from roles.

    Examples
    --------
    >>> from plantdb.commons.fsdb.auth import Permission
    >>> from plantdb.commons.fsdb.auth import User
    >>> role_admin = Role('admin')
    >>> role_user = Role('user')
    >>> permission_a = Permission()  # Mocking a specific permission
    >>> permission_b = Permission()  # Mocking another specific permission
    >>> user = User(permissions={permission_a}, roles={role_user})
    >>> role_permissions = {Role('admin'): {permission_b}}
    >>> user.get_user_permissions(user)  # doctest: +SKIP
    {<__main__.Permission object at 0x...>}

    Notes
    -----
    Role permissions are read from each role's `permissions` attribute, and direct
    permissions from `user.permissions`, which may be empty or ``None``.

    See Also
    --------
    User : Represents a user with permissions and roles.
    Permission : Represents a permission that can be assigned to users or roles.
    Role : Represents a role with specific permissions.

    """
    permissions = set(user.permissions) if user.permissions else set()
    for role in user.roles:
        permissions.update(role.permissions)
    return permissions
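
The union can be reproduced with small stand-in types. Everything below is a sketch under stated assumptions: the `Permission` and `Role` enums and the `ROLE_PERMISSIONS` table are hypothetical, and only the `permissions` attribute on roles mirrors what the source above relies on.

```python
from enum import Enum
from typing import Iterable, Optional, Set


class Permission(Enum):
    # Hypothetical permission names, for illustration only.
    READ = "read"
    WRITE = "write"
    CREATE = "create"


class Role(Enum):
    READER = "reader"
    CONTRIBUTOR = "contributor"

    @property
    def permissions(self) -> Set[Permission]:
        # Assumed mapping, looked up when the property is accessed.
        return ROLE_PERMISSIONS[self]


ROLE_PERMISSIONS = {
    Role.READER: {Permission.READ},
    Role.CONTRIBUTOR: {Permission.READ, Permission.WRITE, Permission.CREATE},
}


def collect_permissions(direct: Optional[Iterable[Permission]],
                        roles: Iterable[Role]) -> Set[Permission]:
    """Union of direct permissions and those granted by each role."""
    # Copy into a fresh set so neither the user's nor the role's set is mutated.
    permissions = set(direct) if direct else set()
    for role in roles:
        permissions.update(role.permissions)
    return permissions
```

A user with no direct permissions and the READER role ends up with READ only. Adding a direct CREATE permission extends the set without touching the role table.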

get_user_scan_role_summary

get_user_scan_role_summary(user, scan_metadata)

Get a summary of the user's access to a specific scan.

This is useful for debugging and user interfaces to show access levels.

Parameters:

Name Type Description Default
user User

The user to analyze.

required
scan_metadata dict

The scan metadata.

required

Returns:

Type Description
dict

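A dictionary containing access information including:
- effective_role: The user's effective role for this scan
- permissions: List of permissions the user has
- access_reason: List of reasons why the user has this level of access
- is_owner: Whether the user is the scan owner
- shared_groups: The shared groups the user belongs to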

Source code in plantdb/commons/fsdb/auth.py
def get_user_scan_role_summary(self, user: User, scan_metadata: dict) -> dict:
    """
    Get a summary of the user's access to a specific scan.

    This is useful for debugging and user interfaces to show access levels.

    Parameters
    ----------
    user : User
        The user to analyze.
    scan_metadata : dict
        The scan metadata.

    Returns
    -------
    dict
        A dictionary containing access information including:
        - effective_role: The user's effective role for this scan
        - permissions: List of permissions the user has
        - access_reason: Why the user has this level of access
    """
    metadata = self.ensure_scan_owner(scan_metadata)
    effective_role = self.get_effective_role_for_scan(user, metadata)
    permissions = list(self.get_scan_permissions(user, metadata))

    # Determine access reason
    scan_owner = metadata.get('owner', self.users.GUEST_USERNAME)
    shared_groups = metadata.get('sharing', [])
    user_groups = {group.name for group in self.groups.get_user_groups(user.username)}

    access_reason = []

    if Role.ADMIN in user.roles:
        access_reason.append("admin_role")

    if user.username == scan_owner:
        access_reason.append("owner")

    shared_group_matches = [g for g in shared_groups if g in user_groups]
    if shared_group_matches:
        access_reason.append(f"group_member: {', '.join(shared_group_matches)}")

    if not access_reason:
        access_reason.append("global_role")

    return {
        'effective_role': effective_role.value,
        'permissions': [p.value for p in permissions],
        'access_reason': access_reason,
        'is_owner': user.username == scan_owner,
        'shared_groups': shared_group_matches if shared_groups else []
    }
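
The `access_reason` list is built by accumulating every applicable reason, not by picking the first match. The standalone sketch below isolates that part. The function name, the boolean `is_admin` flag, and the `"guest"` default are assumptions introduced for the example.

```python
from typing import List, Set


def access_reasons(username: str, is_admin: bool, user_groups: Set[str],
                   scan_metadata: dict, guest_username: str = "guest") -> List[str]:
    """Collect every reason that grants the user access to a scan."""
    reasons = []
    if is_admin:
        reasons.append("admin_role")
    if username == scan_metadata.get("owner", guest_username):
        reasons.append("owner")
    # Keep the order in which groups appear in the scan's 'sharing' list.
    matches = [g for g in scan_metadata.get("sharing", []) if g in user_groups]
    if matches:
        reasons.append(f"group_member: {', '.join(matches)}")
    # Only when nothing else applies does the global role explain the access.
    if not reasons:
        reasons.append("global_role")
    return reasons
```

An owner who also belongs to a shared group therefore gets two entries, which is useful when a user interface needs to explain overlapping grants.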

has_permission

has_permission(user, permission)

Check if a user has a specific permission.

This function determines whether the given user has the specified permission, including administrative privileges.

Parameters:

Name Type Description Default
user User

The user object to check for permissions.

required
permission Permission

The permission level or type to verify against the user's permissions.

required

Returns:

Type Description
bool

True if the user has the given permission, False otherwise.

Notes

The function checks for the specific permission in the user's permission set.

See Also

get_user_permissions : Retrieve the list of permissions a user has.

Source code in plantdb/commons/fsdb/auth.py
def has_permission(self, user: User, permission: Permission) -> bool:
    """
    Check if a user has a specific permission.

    This function determines whether the given user has the specified permission,
    including administrative privileges.

    Parameters
    ----------
    user : User
        The user object to check for permissions.
    permission : Permission
        The permission level or type to verify against the user's permissions.

    Returns
    -------
    bool
        `True` if the user has the given permission, `False` otherwise.

    Notes
    -----
    The function checks for the specific permission in the user's permission set.

    See Also
    --------
    get_user_permissions : Retrieve the list of permissions a user has.
    """
    user_permissions = self.get_user_permissions(user)
    return permission in user_permissions

is_guest_user

is_guest_user(user)

Check if the given user is the guest user.

Parameters:

Name Type Description Default
user User

The user object to check.

required

Returns:

Type Description
bool

True if the user is the guest user, False otherwise.

Source code in plantdb/commons/fsdb/auth.py
def is_guest_user(self, user: User) -> bool:
    """
    Check if the given user is the guest user.

    Parameters
    ----------
    user : User
        The user object to check.

    Returns
    -------
    bool
        True if the user is the guest user, False otherwise.
    """
    return user.username == self.users.GUEST_USERNAME

list_groups

list_groups(user)

List all groups if the user has permission.

Parameters:

Name Type Description Default
user User

The user requesting the group list.

required

Returns:

Type Description
Optional[List[Group]]

A list of all groups. As currently implemented, no permission check is performed, so the list is returned for any user and None is never returned.

Source code in plantdb/commons/fsdb/auth.py
def list_groups(self, user: User) -> Optional[List[Group]]:
    """
    List all groups if the user has permission.

    Parameters
    ----------
    user : User
        The user requesting the group list.

    Returns
    -------
    Optional[List[Group]]
        A list of all groups if the user has permission, None otherwise.
    """
    # For now, any authenticated user can list groups
    # This can be restricted later if needed
    return self.groups.list_groups()

remove_user_from_group

remove_user_from_group(user, group_name, username_to_remove)

Remove a user from a group if the requesting user has permission.

Parameters:

Name Type Description Default
user User

The user requesting to remove a member.

required
group_name str

The name of the group.

required
username_to_remove str

The username to remove from the group.

required

Returns:

Type Description
bool

True if the user was removed successfully, False if permission denied or operation failed.

Source code in plantdb/commons/fsdb/auth.py
def remove_user_from_group(self, user: User, group_name: str, username_to_remove: str) -> bool:
    """
    Remove a user from a group if the requesting user has permission.

    Parameters
    ----------
    user : User
        The user requesting to remove a member.
    group_name : str
        The name of the group.
    username_to_remove : str
        The username to remove from the group.

    Returns
    -------
    bool
        True if the user was removed successfully, False if permission denied or operation failed.
    """
    if not self.can_add_to_group(user, group_name):  # Same permission as adding
        return False

    return self.groups.remove_user_from_group(group_name, username_to_remove)

unlock

unlock(requesting_user, username)

Unlock a user account - requires MANAGE_USERS permission

Source code in plantdb/commons/fsdb/auth.py
@requires_permission(Permission.MANAGE_USERS)
def unlock(self, requesting_user: User, username: str) -> bool:
    """Unlock a user account - requires MANAGE_USERS permission"""
    if not self.users.exists(username):
        self.logger.warning(f"Attempt to unlock non-existent user: {username}")
        return False

    user = self.users.get_user(username)
    if user.locked_until:
        self.users.unlock_user(user)
        self.logger.info(f"User {username} unlocked by {requesting_user.username}")
    else:
        self.logger.info(f"User {username} was not locked")
    return True

validate_scan_metadata_access Link

validate_scan_metadata_access(user, old_metadata, new_metadata)

Validate that a user can make the proposed metadata changes.

This method checks if the user has permission to modify specific fields like 'owner' and 'sharing' based on the RBAC rules.

Parameters:

Name Type Description Default
user User

The user attempting to modify metadata.

required
old_metadata dict

The current scan metadata.

required
new_metadata dict

The proposed new metadata.

required

Returns:

Type Description
bool

True if all proposed changes are allowed, False otherwise.

Source code in plantdb/commons/fsdb/auth.py
def validate_scan_metadata_access(self, user: User, old_metadata: dict, new_metadata: dict) -> bool:
    """
    Validate that a user can make the proposed metadata changes.

    This method checks if the user has permission to modify specific fields
    like 'owner' and 'sharing' based on the RBAC rules.

    Parameters
    ----------
    user : User
        The user attempting to modify metadata.
    old_metadata : dict
        The current scan metadata.
    new_metadata : dict
        The proposed new metadata.

    Returns
    -------
    bool
        True if all proposed changes are allowed, False otherwise.
    """
    # Check if owner field is being modified
    old_owner = old_metadata.get('owner', self.users.GUEST_USERNAME)
    new_owner = new_metadata.get('owner', self.users.GUEST_USERNAME)

    if old_owner != new_owner:
        if not self.can_modify_scan_owner(user, old_metadata):
            return False

    # Check if sharing field is being modified
    old_sharing = old_metadata.get('sharing', [])
    new_sharing = new_metadata.get('sharing', [])

    if old_sharing != new_sharing:
        if not self.can_modify_scan_sharing(user, old_metadata):
            return False

    return True
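
The field comparison above can be reproduced on plain dictionaries. This is a minimal sketch, not the library's code: `changed_protected_fields` is a hypothetical helper, and `GUEST` is a placeholder for `users.GUEST_USERNAME`, whose real value is not shown on this page.

```python
from typing import List

# ASSUMPTION: placeholder for users.GUEST_USERNAME; the real value is not shown here.
GUEST = "guest"


def changed_protected_fields(old_metadata: dict, new_metadata: dict) -> List[str]:
    """Return the protected fields ('owner', 'sharing') whose values differ."""
    changed = []
    # A missing 'owner' is treated as the guest user on both sides.
    if old_metadata.get("owner", GUEST) != new_metadata.get("owner", GUEST):
        changed.append("owner")
    # A missing 'sharing' entry is treated as an empty list on both sides.
    if old_metadata.get("sharing", []) != new_metadata.get("sharing", []):
        changed.append("sharing")
    return changed


old = {"owner": "alice", "sharing": ["plant_researchers"]}
new = {"owner": "alice", "sharing": ["plant_researchers", "imaging_team"], "note": "re-scan"}
fields = changed_protected_fields(old, new)
print(fields)  # ['sharing']
```

Only 'owner' and 'sharing' are inspected, so the extra 'note' key does not trigger a permission check. Because 'sharing' is compared as a list, reordering the same group names also counts as a change.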

validate_sharing_groups Link

validate_sharing_groups(sharing_groups)

Validate that all groups in the sharing list exist.

Parameters:

Name Type Description Default
sharing_groups List[str]

List of group names to validate.

required

Returns:

Type Description
bool

True if all groups exist, False otherwise.

Source code in plantdb/commons/fsdb/auth.py
def validate_sharing_groups(self, sharing_groups: List[str]) -> bool:
    """
    Validate that all groups in the sharing list exist.

    Parameters
    ----------
    sharing_groups : List[str]
        List of group names to validate.

    Returns
    -------
    bool
        True if all groups exist, False otherwise.
    """
    for group_name in sharing_groups:
        if not self.groups.group_exists(group_name):
            return False
    return True
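
The same check can be written with `all()`. The sketch below is an illustration on a plain set of group names; `all_groups_exist` and `known_groups` are hypothetical stand-ins for the method and for `self.groups.group_exists`.

```python
from typing import Iterable, Set


def all_groups_exist(sharing_groups: Iterable[str], known_groups: Set[str]) -> bool:
    # all() stops at the first missing group, like the early return in the loop above.
    return all(name in known_groups for name in sharing_groups)


known = {"plant_researchers", "imaging_team"}
ok = all_groups_exist(["plant_researchers"], known)
missing = all_groups_exist(["plant_researchers", "unknown_group"], known)
empty = all_groups_exist([], known)
print(ok, missing, empty)  # True False True
```

An empty sharing list validates as True in both forms, since there is no group that could be missing.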

Role Link

Bases: Enum

A class representing the role of a user in a system.

The Role enum defines three types of roles that can be assigned to users:

* READER: Has read-only access.
* CONTRIBUTOR: Can modify content but not delete.
* ADMIN: Full control over all aspects of the system.

This enum helps in managing permissions and access levels efficiently within an application.

Attributes:

Name Type Description
READER str

Represents a user with read-only access.

CONTRIBUTOR str

Represents a user who can modify content but not delete it.

ADMIN str

Represents a user with full control over the system.

Examples:

>>> from plantdb.commons.fsdb.auth import Role
>>> Role.READER
<Role.READER: 'reader'>
>>> Role.CONTRIBUTOR
<Role.CONTRIBUTOR: 'contributor'>
>>> Role.ADMIN
<Role.ADMIN: 'admin'>
>>> Role.ADMIN.permissions
{<Permission.ADMIN_ALL: 'admin_all'>,
 <Permission.CREATE_SCAN: 'create_scan'>,
 <Permission.DELETE_SCAN: 'delete_scan'>,
 <Permission.MANAGE_USERS: 'manage_users'>,
 <Permission.READ_SCAN: 'read_scan'>,
 <Permission.WRITE_SCAN: 'write_scan'>}

permissions property Link

permissions

Get the set of permissions associated with this role.

Returns:

Type Description
Set[Permission]

A set containing all permissions granted to this role.
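
One way an Enum can expose a per-role permission set through a property is sketched below. `SketchRole` and `SketchPermission` are hypothetical stand-ins, and the READER and CONTRIBUTOR sets are assumptions made for illustration; only the ADMIN set mirrors the example output shown above.

```python
from enum import Enum
from typing import Set


class SketchPermission(Enum):
    READ_SCAN = "read_scan"
    WRITE_SCAN = "write_scan"
    CREATE_SCAN = "create_scan"
    DELETE_SCAN = "delete_scan"
    MANAGE_USERS = "manage_users"
    ADMIN_ALL = "admin_all"


class SketchRole(Enum):
    READER = "reader"
    CONTRIBUTOR = "contributor"
    ADMIN = "admin"

    @property
    def permissions(self) -> Set[SketchPermission]:
        # ASSUMPTION: the READER and CONTRIBUTOR sets are guesses for illustration;
        # only the ADMIN set follows the documented example output.
        mapping = {
            SketchRole.READER: {SketchPermission.READ_SCAN},
            SketchRole.CONTRIBUTOR: {
                SketchPermission.READ_SCAN,
                SketchPermission.WRITE_SCAN,
                SketchPermission.CREATE_SCAN,
            },
            SketchRole.ADMIN: set(SketchPermission),
        }
        return mapping[self]


can_manage = SketchPermission.MANAGE_USERS in SketchRole.ADMIN.permissions
print(can_manage)  # True
```

A permission check then reduces to a set membership test on the role's `permissions` value.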

SessionManager Link

SessionManager(session_timeout=3600, max_concurrent_sessions=10)

Manages user sessions with expiration and validation.

This class provides methods to create, validate, invalidate, and cleanup expired sessions. Each session is associated with a unique identifier (session_id) and has an expiry time based on the session timeout duration specified during initialization.

Attributes:

Name Type Description
sessions Dict[str, dict]

A dictionary storing active sessions. Each key is a session ID, and each value is a dictionary containing:

* 'username': str - The user associated with this session.
* 'created_at': datetime - When the session was created.
* 'last_accessed': datetime - Last time the session was accessed.
* 'expires_at': datetime - Expiry time of the session.

session_timeout int

Duration in seconds after which a session expires.

max_concurrent_sessions int

The maximum number of concurrent sessions to allow.

logger Logger

The logger to use for this session manager.

Manage user sessions with timeout.

Parameters:

Name Type Description Default
session_timeout int

The duration for which the session should be valid in seconds. A session that exceeds this duration will be considered expired and removed. Defaults to 3600 seconds.

3600
max_concurrent_sessions int

The maximum number of concurrent sessions to allow. Defaults to 10.

10
Source code in plantdb/commons/fsdb/auth.py
def __init__(self, session_timeout: int = 3600, max_concurrent_sessions: int = 10):  # 1 hour default
    """
    Manage user sessions with timeout.

    Parameters
    ----------
    session_timeout : int, optional
        The duration for which the session should be valid in seconds.
        A session that exceeds this duration will be considered expired and removed.
        Defaults to ``3600`` seconds.
    max_concurrent_sessions : int, optional
        The maximum number of concurrent sessions to allow.
        Defaults to ``10``.
    """
    self.sessions: Dict[str, dict] = {}
    self.session_timeout = session_timeout
    self.max_concurrent_sessions = max_concurrent_sessions

    self.logger = get_logger(__class__.__name__)

cleanup_expired_sessions Link

cleanup_expired_sessions()

Remove expired sessions from the session dictionary.

This method iterates through all stored sessions and deletes any that have an expiration time earlier than the current time.

Notes

This function modifies the self.sessions dictionary in-place.

Source code in plantdb/commons/fsdb/auth.py
def cleanup_expired_sessions(self) -> None:
    """
    Remove expired sessions from the session dictionary.

    This method iterates through all stored sessions and deletes any that have
    an expiration time earlier than the current time.

    Notes
    -----
    This function modifies the `self.sessions` dictionary in-place.
    """
    current_time = datetime.now()
    expired_sessions = [
        sid for sid, session in self.sessions.items()
        if current_time > session['expires_at']
    ]
    for sid in expired_sessions:
        del self.sessions[sid]
    return
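
The collect-then-delete pattern used above can be tried on a plain dictionary. In this sketch the tokens and usernames are made-up sample data, and only the 'expires_at' field is needed for the filter.

```python
from datetime import datetime, timedelta

now = datetime.now()
sessions = {
    "token-a": {"username": "alice", "expires_at": now - timedelta(seconds=1)},
    "token-b": {"username": "bob", "expires_at": now + timedelta(hours=1)},
}

# Collect the expired IDs first: deleting entries while iterating over
# the dictionary itself would raise a RuntimeError.
expired = [sid for sid, session in sessions.items()
           if datetime.now() > session["expires_at"]]
for sid in expired:
    del sessions[sid]

remaining = sorted(sessions)
print(remaining)  # ['token-b']
```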

create_session Link

create_session(username)

Create a new session for a user.

If the user already has an active session, or the maximum number of concurrent sessions has been reached, a warning is logged and None is returned. Otherwise, a new session is created and its ID is returned.

Parameters:

Name Type Description Default
username str

The unique identifier of the user for whom to create a session.

required

Returns:

Type Description
Union[str, None]

The ID of the newly created session, or None if no session was created.

Notes

The session ID is a token generated using secrets.token_urlsafe. The session data includes the username, creation timestamp, last accessed timestamp, and expiration timestamp.

Source code in plantdb/commons/fsdb/auth.py
def create_session(self, username: str) -> Union[str, None]:
    """
    Create a new session for a user.

    If the user already has an active session, or the maximum number of concurrent
    sessions has been reached, a warning is logged and ``None`` is returned.
    Otherwise, a new session is created and its ID is returned.

    Parameters
    ----------
    username : str
        The unique identifier of the user for whom to create a session.

    Returns
    -------
    Union[str, None]
        The ID of the newly created session, or ``None`` if no session was created.

    Notes
    -----
    The session ID is a token generated using `secrets.token_urlsafe`.
    The session data includes the username, creation timestamp, last accessed timestamp, and expiration timestamp.
    """
    if self._user_has_session(username):
        self.logger.warning(f"User '{username}' already has an active session!")
        return None

    if self.n_active_sessions() >= self.max_concurrent_sessions:
        self.logger.warning(
            f"Reached max concurrent sessions limit ({self.max_concurrent_sessions})")
        return None

    now = datetime.now()
    exp_time = now + timedelta(seconds=self.session_timeout)
    # Create a session token
    session_token = secrets.token_urlsafe(32)

    self.sessions[session_token] = {
        'username': username,
        'created_at': now,
        'last_accessed': now,
        'expires_at': exp_time
    }
    return session_token
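
The token and the session record built above can be reproduced with the standard library alone. This is a small sketch: the variable names `record` and `session_timeout` are local to the example, and the username is sample data.

```python
import secrets
from datetime import datetime, timedelta

session_timeout = 3600  # seconds, the documented default

now = datetime.now()
# 32 random bytes, encoded as URL-safe base64 text without padding.
token = secrets.token_urlsafe(32)
record = {
    "username": "alice",
    "created_at": now,
    "last_accessed": now,
    "expires_at": now + timedelta(seconds=session_timeout),
}

print(len(token))  # 43
print(record["expires_at"] - record["created_at"])  # 1:00:00
```

Since the token is the dictionary key, a lookup by token is the only way to reach a session record, which is why the methods above take `session_id` rather than a username.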

invalidate_session Link

invalidate_session(session_id)

Remove the given session identifier from the active sessions.

Parameters:

Name Type Description Default
session_id str

The unique identifier of the session to be removed.

required

Returns:

Type Description
bool

True if the specified session was found and removed, False otherwise.

str or None

The username corresponding to the invalidated session, or None if the session was not found.

Notes

The session ID is removed from the internal session dictionary. If the session does not exist, this method has no effect.

Source code in plantdb/commons/fsdb/auth.py
def invalidate_session(self, session_id: str) -> Tuple[bool, str | None]:
    """
    Remove the given session identifier from the active sessions.

    Parameters
    ----------
    session_id : str
        The unique identifier of the session to be removed.

    Returns
    -------
    bool
        `True` if the specified session was found and removed, `False` otherwise.
    str or None
        The username corresponding to the invalidated session, or ``None`` if the session was not found.

    Notes
    -----
    The session ID is removed from the internal session dictionary.
    If the session does not exist, this method has no effect.
    """
    if session_id in self.sessions:
        username = self.sessions[session_id]['username']
        del self.sessions[session_id]
        return True, username

    return False, None

n_active_sessions Link

n_active_sessions()

Returns the number of active sessions.

Cleans up expired sessions before counting and returns the number of remaining active sessions in the collection.

Returns:

Type Description
int

The number of currently active sessions.

See Also

cleanup_expired_sessions : Cleans up the expired sessions in the collection.

Source code in plantdb/commons/fsdb/auth.py
def n_active_sessions(self) -> int:
    """
    Returns the number of active sessions.

    Cleans up expired sessions before counting and returns the number
    of remaining active sessions in the collection.

    Returns
    -------
    int
        The number of currently active sessions.

    See Also
    --------
    cleanup_expired_sessions : Cleans up the expired sessions in the collection.
    """
    self.cleanup_expired_sessions()
    return len(self.sessions)

refresh_session Link

refresh_session(session_id)

Refresh a session if it's still valid.

Parameters:

Name Type Description Default
session_id str

Current session token

required

Returns:

Type Description
str or None

New session token if the refresh succeeded, None otherwise.

Source code in plantdb/commons/fsdb/auth.py
def refresh_session(self, session_id: str) -> Optional[str]:
    """
    Refresh a session if it's still valid.

    Parameters
    ----------
    session_id : str
        Current session token

    Returns
    -------
    str or None
        New session token if the refresh succeeded, ``None`` otherwise.
    """
    session_data = self.validate_session(session_id)
    if not session_data:
        return None

    # Invalidate old session
    self.invalidate_session(session_id)

    # Create new session
    username = session_data['username']
    return self.create_session(username)

session_username Link

session_username(session_id)

Return the username associated with a session.

Source code in plantdb/commons/fsdb/auth.py
def session_username(self, session_id: str) -> Optional[str]:
    """Return the username associated with a session."""
    session_data = self.validate_session(session_id)
    return session_data['username'] if session_data else None

validate_session Link

validate_session(session_id)

Validate a given session by checking its existence and expiration status.

Parameters:

Name Type Description Default
session_id str

The unique identifier of the session to be validated.

required

Returns:

Type Description
dict or None

A dictionary with user information if valid, None if invalid/expired. The dictionary contains:

* username: The authenticated user
* created_at: When the session was created
* last_accessed: When the session was last validated
* expires_at: When the session expires

Notes

The validate_session method updates the session's last accessed time upon successful validation.

Source code in plantdb/commons/fsdb/auth.py
def validate_session(self, session_id: str) -> Optional[dict]:
    """
    Validate a given session by checking its existence and expiration status.

    Parameters
    ----------
    session_id : str
        The unique identifier of the session to be validated.

    Returns
    -------
    dict or None
        A dictionary with user information if valid, ``None`` if invalid/expired.
        Returns dictionary with:
        - username: The authenticated user
        - created_at: When the session was created
        - last_accessed: When the session was last validated
        - expires_at: When the session expires

    Notes
    -----
    The `validate_session` method updates the session's last accessed time upon successful validation.
    """
    if session_id not in self.sessions:
        self.logger.warning("Provided session does not exist!")
        return None

    session = self.sessions[session_id]
    now = datetime.now()
    if now > session['expires_at']:
        username = session['username']
        self.logger.warning(f"The session for user '{username}' has expired. Please log back in!")
        self.invalidate_session(session_id)
        return None

    # Update last accessed time
    session['last_accessed'] = now
    return session
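
The session timestamps above come from `datetime.now()` without a timezone, so they are naive local times, while other examples on this page build timezone-aware values with `datetime.now(timezone.utc)`. The sketch below shows why the two kinds should not be mixed when comparing against 'expires_at'.

```python
from datetime import datetime, timezone

naive = datetime.now()              # as stored in the session record
aware = datetime.now(timezone.utc)  # as used in the Group and User examples

try:
    naive > aware
    mixed_comparison_works = True
except TypeError:
    # Python refuses to order offset-naive and offset-aware datetimes.
    mixed_comparison_works = False

print(mixed_comparison_works)  # False
```

Code that inspects a returned session dictionary is therefore safest comparing its timestamps against another naive `datetime.now()` value.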

SingleSessionManager Link

SingleSessionManager(session_timeout=3600, **kwargs)

Bases: SessionManager

A session manager that allows only one active session at a time.

The SingleSessionManager class is designed to manage a single active database session at any given time. It inherits from the base SessionManager class and overrides its initialization to ensure only one concurrent session is allowed, even if the base class allows more.

Parameters:

Name Type Description Default
session_timeout int

The timeout duration for each database session in seconds. If not specified, defaults to 3600 (1 hour).

3600

Attributes:

Name Type Description
sessions Dict[str, dict]

A dictionary storing active sessions. Each key is a session ID, and each value is a dictionary containing: - 'username': str - The user associated with this session. - 'created_at': datetime - When the session was created. - 'last_accessed': datetime - Last time the session was accessed. - 'expires_at': datetime - Expiry time of the session.

session_timeout int

The configured timeout duration for sessions.

max_concurrent_sessions int

Always set to 1 to ensure only one concurrent session is allowed.

logger Logger

The logger to use for this session manager.

Examples:

>>> from plantdb.commons.fsdb.auth import SingleSessionManager
>>> # Initialize the session manager
>>> manager = SingleSessionManager()
>>> # Create a new session with the username 'test'
>>> session_token = manager.create_session('test')
>>> # Attempt to create another session with the username 'test2'
>>> _ = manager.create_session('test2')
WARNING  [SessionManager] Reached max concurrent sessions limit (1)
>>> # Validate the session and get its info
>>> session = manager.validate_session(session_token)
>>> print(session['expires_at'])  # Print the expiration date
>>> # Refresh the session using the existing session token
>>> new_session_token = manager.refresh_session(session_token)
>>> # Validate the session and get its info
>>> session = manager.validate_session(new_session_token)
>>> print(session['expires_at'])  # Print the expiration date of the refreshed session

Notes

The SingleSessionManager enforces a single-session policy: while one session is active, any further create_session call logs a warning and returns None instead of raising an error.

See Also

plantdb.commons.fsdb.auth.SessionManager : Base class for managing user sessions.

Source code in plantdb/commons/fsdb/auth.py
def __init__(self, session_timeout: int = 3600, **kwargs) -> None:
    super().__init__(session_timeout=session_timeout, max_concurrent_sessions=1)
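
The effect of this constructor can be seen with two stand-in classes. `BaseManager` and `SingleManager` below are hypothetical and copy only the two `__init__` signatures shown on this page.

```python
class BaseManager:
    """Stand-in for SessionManager, keeping only the constructor arguments."""

    def __init__(self, session_timeout: int = 3600, max_concurrent_sessions: int = 10):
        self.session_timeout = session_timeout
        self.max_concurrent_sessions = max_concurrent_sessions


class SingleManager(BaseManager):
    """Stand-in for SingleSessionManager."""

    def __init__(self, session_timeout: int = 3600, **kwargs) -> None:
        # Extra keyword arguments are accepted but not forwarded,
        # so the limit is pinned to 1 whatever the caller passes.
        super().__init__(session_timeout=session_timeout, max_concurrent_sessions=1)


manager = SingleManager(session_timeout=60, max_concurrent_sessions=5)
print(manager.max_concurrent_sessions)  # 1
```

Accepting `**kwargs` keeps the subclass call-compatible with code written for the base class, at the cost of silently discarding a requested limit.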

The methods cleanup_expired_sessions, create_session, invalidate_session, n_active_sessions, refresh_session, session_username and validate_session are inherited unchanged from SessionManager; see their documentation above.

User dataclass Link

User(username, fullname, password_hash, roles, created_at, permissions=None, last_login=None, is_active=True, failed_attempts=0, last_failed_attempt=None, locked_until=None, password_last_change=None)

Represents a user account with its authentication, role, and permission data.

The User class represents a user entity in an application. It contains attributes related to user authentication, roles, permissions, and activity timestamps. The class is designed to encapsulate user data and provide methods for user management. Users can have multiple roles and permissions, which are stored as sets. The class also tracks the creation time and last login time of a user.

Attributes:

Name Type Description
username str

The unique username of the user.

fullname str

The full name of the user.

password_hash str

The hashed password of the user.

roles Set[Role]

A set containing roles assigned to the user.

created_at datetime

The timestamp when the user account was created.

permissions (Set[Permission], optional)

A set containing specific permissions granted to the user.

last_login (Optional[datetime], optional)

The timestamp of the last login. If not provided, defaults to None.

is_active (bool, optional)

Indicates if the user account is active. Defaults to True.

failed_attempts (int, optional)

Number of failed login attempts for the user. Defaults to 0.

locked_until (Optional[datetime], optional)

Timestamp until which the user is locked out due to multiple failed attempts. Defaults to None.

Notes

Ensure that sensitive data like password_hash is handled securely and not exposed in logs or error messages.

Examples:

>>> from datetime import datetime, timezone
>>> from plantdb.commons.fsdb.auth import Permission, Role, User
>>> user = User(
...     username="jdoe",
...     fullname="John Doe",
...     password_hash="hashed_password",
...     roles={Role.CONTRIBUTOR},
...     permissions={Permission.MANAGE_USERS},
...     created_at=datetime.now(timezone.utc),
... )
>>> print(user.username)
jdoe
>>> print(user.roles)  # get roles
{<Role.CONTRIBUTOR: 'contributor'>}
>>> print(user.permissions)  # get directly assigned permissions
{<Permission.MANAGE_USERS: 'manage_users'>}
>>> user.last_login = datetime.now(timezone.utc)  # record the time of the latest login

__eq__ Link

__eq__(other)

Compare two User objects for equality.

Two User objects are considered equal if all their attributes have the same values.

Parameters:

Name Type Description Default
other Any

The object to compare with.

required

Returns:

Type Description
bool

True if the objects are equal, False otherwise.

Source code in plantdb/commons/fsdb/auth.py
def __eq__(self, other):
    """
    Compare two User objects for equality.

    Two User objects are considered equal if all their attributes have the same values.

    Parameters
    ----------
    other : Any
        The object to compare with.

    Returns
    -------
    bool
        ``True`` if the objects are equal, ``False`` otherwise.
    """
    if not isinstance(other, User):
        return False

    return all(self.__dict__[attr] == other.__dict__[attr] for attr in self.__dict__)

from_dict classmethod Link

from_dict(data)

Create User object from dictionary (JSON deserialization).

Parameters:

Name Type Description Default
data dict

Dictionary containing user data.

required

Returns:

Type Description
User

User object created from the dictionary data.

Raises:

Type Description
ValueError

If required fields are missing or invalid.

KeyError

If required keys are missing from the dictionary.

Source code in plantdb/commons/fsdb/auth.py
@classmethod
def from_dict(cls, data: dict) -> 'User':
    """
    Create User object from dictionary (JSON deserialization).

    Parameters
    ----------
    data : dict
        Dictionary containing user data.

    Returns
    -------
    User
        User object created from the dictionary data.

    Raises
    ------
    ValueError
        If required fields are missing or invalid.
    KeyError
        If required keys are missing from the dictionary.
    """
    try:
        # Parse roles
        roles = set()
        for role_value in data['roles']:
            try:
                roles.add(Role(role_value))
            except ValueError:
                # Skip invalid roles for backward compatibility
                pass

        # Parse permissions
        permissions = set()
        if data.get('permissions'):
            for perm_value in data['permissions']:
                try:
                    permissions.add(Permission(perm_value))
                except ValueError:
                    # Skip invalid permissions for backward compatibility
                    pass

        def _datetime_convert(data):
            if isinstance(data, datetime):
                return data
            elif isinstance(data, str):
                # Assume string is in iso format
                return datetime.fromisoformat(data)
            else:
                return None

        return cls(
            username=data['username'],
            fullname=data['fullname'],
            password_hash=data['password_hash'],
            roles=roles,
            created_at=_datetime_convert(data['created_at']),
            permissions=permissions,
            last_login=_datetime_convert(data.get('last_login')),
            is_active=data.get('is_active', True),
            failed_attempts=data.get('failed_attempts', 0),
            last_failed_attempt=_datetime_convert(data.get('last_failed_attempt')),
            locked_until=_datetime_convert(data.get('locked_until')),
            password_last_change=_datetime_convert(data.get('password_last_change')),
        )
    except KeyError as e:
        raise KeyError(f"Missing required field in user data: {e}")
    except ValueError as e:
        raise ValueError(f"Invalid data format in user data: {e}")
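
The two parsing rules used by `from_dict` (unknown enum values are skipped, timestamps may be `datetime` objects or ISO 8601 strings) can be sketched on their own. The `Role` enum below is a hypothetical stand-in with assumed members, and `parse_roles` and `to_datetime` are illustrative helper names, not part of the library.

```python
from datetime import datetime
from enum import Enum


class Role(Enum):
    """Hypothetical stand-in; the real enum lives in plantdb.commons.fsdb.auth."""
    READER = "reader"
    CONTRIBUTOR = "contributor"


def parse_roles(values):
    """Keep recognised role values and silently drop unknown ones."""
    roles = set()
    for value in values:
        try:
            roles.add(Role(value))
        except ValueError:
            pass  # unknown value, skipped for backward compatibility
    return roles


def to_datetime(value):
    """Accept a datetime or an ISO 8601 string; anything else maps to None."""
    if isinstance(value, datetime):
        return value
    if isinstance(value, str):
        return datetime.fromisoformat(value)
    return None


roles = parse_roles(["reader", "superuser"])  # "superuser" is not a known role
created = to_datetime("2024-01-15T10:30:00+00:00")
missing = to_datetime(None)
```

A malformed timestamp string makes `datetime.fromisoformat` raise `ValueError`, which is the case the outer `except ValueError` branch of `from_dict` reports.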

from_json classmethod Link

from_json(json_str)

Create User object from JSON string.

Parameters:

Name Type Description Default
json_str str

JSON string containing user data.

required

Returns:

Type Description
User

User object created from the JSON data.

Raises:

Type Description
JSONDecodeError

If the JSON string is invalid.

ValueError

If required fields are missing or invalid.

Examples:

>>> import json
>>> from plantdb.commons.fsdb.auth import User
>>>
Source code in plantdb/commons/fsdb/auth.py
@classmethod
def from_json(cls, json_str: str) -> 'User':
    """
    Create User object from JSON string.

    Parameters
    ----------
    json_str : str
        JSON string containing user data.

    Returns
    -------
    User
        User object created from the JSON data.

    Raises
    ------
    json.JSONDecodeError
        If the JSON string is invalid.
    ValueError
        If required fields are missing or invalid.

    Examples
    --------
    >>> import json
    >>> from plantdb.commons.fsdb.auth import User
    >>>
    """
    try:
        data = json.loads(json_str)
        return cls.from_dict(data)
    except json.JSONDecodeError as e:
        raise json.JSONDecodeError(f"Invalid JSON format: {e}", json_str, e.pos)
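
The error handling can be exercised without a `User` class. This sketch uses a hypothetical `load_user_data` helper that only reproduces the re-raise pattern shown above; it also shows that `json.JSONDecodeError` is a subclass of `ValueError`, so a caller catching `ValueError` sees both failure modes.

```python
import json


def load_user_data(json_str):
    """Hypothetical helper mirroring the error handling of ``from_json``."""
    try:
        return json.loads(json_str)
    except json.JSONDecodeError as e:
        # Re-raise with a prefixed message, keeping the document and position.
        raise json.JSONDecodeError(f"Invalid JSON format: {e}", json_str, e.pos)


try:
    load_user_data('{"username": "jdoe",')  # truncated JSON document
except json.JSONDecodeError as e:
    error = e  # keep a reference; `e` is cleared when the except block ends

is_value_error = isinstance(error, ValueError)
```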

to_dict Link

to_dict()

Convert User object to dictionary for JSON serialization.

Returns:

Type Description
dict

Dictionary representation of the user object.

Source code in plantdb/commons/fsdb/auth.py
def to_dict(self) -> dict:
    """
    Convert User object to dictionary for JSON serialization.

    Returns
    -------
    dict
        Dictionary representation of the user object.
    """
    return {
        'username': self.username,
        'fullname': self.fullname,
        'password_hash': self.password_hash,
        'roles': [role.value for role in self.roles],
        'created_at': self.created_at.isoformat(),
        'permissions': [perm.value for perm in self.permissions] if self.permissions else None,
        'last_login': self.last_login.isoformat() if self.last_login else None,
        'is_active': self.is_active,
        'failed_attempts': self.failed_attempts,
        'last_failed_attempt': self.last_failed_attempt.isoformat() if self.last_failed_attempt else None,
        'locked_until': self.locked_until.isoformat() if self.locked_until else None,
        'password_last_change': self.password_last_change.isoformat() if self.password_last_change else self.created_at.isoformat(),
    }
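
The conversions in `to_dict` exist because neither sets, enum members, nor `datetime` objects are accepted by `json.dumps`. A minimal sketch, assuming a hypothetical `Role` enum and a plain dictionary instead of a `User`; the values are sorted here only to make the output deterministic, which `to_dict` itself does not do.

```python
import json
from datetime import datetime, timezone
from enum import Enum


class Role(Enum):
    """Hypothetical stand-in; the real enum lives in plantdb.commons.fsdb.auth."""
    READER = "reader"
    CONTRIBUTOR = "contributor"


record = {
    "roles": {Role.READER, Role.CONTRIBUTOR},
    "created_at": datetime(2024, 1, 15, 10, 30, tzinfo=timezone.utc),
}

# Passing the raw record to json.dumps fails with a TypeError.
try:
    json.dumps(record)
    raw_ok = True
except TypeError:
    raw_ok = False

# Enum members become their values, the set becomes a list, datetimes become ISO strings.
serializable = {
    "roles": sorted(role.value for role in record["roles"]),
    "created_at": record["created_at"].isoformat(),
}
encoded = json.dumps(serializable)
```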

to_json Link

to_json()

Convert User object to JSON string.

Returns:

Type Description
str

JSON string representation of the user object.

Source code in plantdb/commons/fsdb/auth.py
def to_json(self) -> str:
    """
    Convert User object to JSON string.

    Returns
    -------
    str
        JSON string representation of the user object.


    """
    return json.dumps(self.to_dict(), indent=2)

UserManager Link

UserManager(users_file='users.json', max_login_attempts=3, lockout_duration=900)

UserManager class for managing user data.

The UserManager class provides methods to create, load, save, and manage users. It uses a JSON file to persist user data. It includes functionalities like password hashing and verification, user account lockout management, and handling of guest accounts.

Attributes:

Name Type Description
users_file str

Path to the JSON file where user data is stored.

users Dict[str, User]

Dictionary containing all users, indexed by username.

GUEST_USERNAME str

Default username for guest accounts.

GUEST_PASSWORD str

Default password for guest accounts.

Examples:

>>> from pathlib import Path
>>> from tempfile import gettempdir
>>> from plantdb.commons.fsdb.auth import UserManager
>>> manager = UserManager(users_file=Path(gettempdir())/'users.json')
>>> manager.validate_user_password(manager.GUEST_USERNAME, manager.GUEST_PASSWORD)
True
>>> manager.validate(manager.GUEST_USERNAME, manager.GUEST_PASSWORD)
True
>>> manager.validate('gest', manager.GUEST_PASSWORD)
False
>>> manager.deactivate(manager.get_user(manager.GUEST_USERNAME))
>>> manager.validate(manager.GUEST_USERNAME, manager.GUEST_PASSWORD)
WARNING  [UserManager] Account guest is not active.
False

User manager constructor.

Parameters:

Name Type Description Default
users_file str or Path

Path to the JSON file where user data is stored. Defaults to users.json.

'users.json'
max_login_attempts int

Maximum number of failed login attempts before account lockout. Defaults to 3.

3
lockout_duration int or timedelta

Account lockout duration in seconds. Defaults to 900 (15 minutes).

900
Source code in plantdb/commons/fsdb/auth.py
def __init__(self, users_file: str | Path = 'users.json', max_login_attempts=3, lockout_duration=900):
    """User manager constructor.

    Parameters
    ----------
    users_file : str or pathlib.Path, optional
        Path to the JSON file where user data is stored. Defaults to ``users.json``.
    max_login_attempts : int, optional
        Maximum number of failed login attempts before account lockout. Defaults to ``3``.
    lockout_duration : int or timedelta, optional
        Account lockout duration in seconds. Defaults to ``900`` (15 minutes).
    """
    self.ADMIN_USERNAME = "admin"
    self.GUEST_USERNAME = "guest"
    self.GUEST_PASSWORD = "guest"
    self.users_file = Path(users_file)
    self.users: Dict[str, User] = {}
    self.logger = get_logger(__class__.__name__)
    self._load_users()
    self._ensure_guest_user()
    self._ensure_admin_user()

    self.max_login_attempts = max_login_attempts
    self.lockout_duration = timedelta(seconds=lockout_duration) if isinstance(lockout_duration,
                                                                              int) else lockout_duration
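
The handling of `lockout_duration` can be illustrated separately. `normalize_lockout` is a hypothetical helper name; its body follows the conditional expression at the end of the constructor.

```python
from datetime import timedelta


def normalize_lockout(lockout_duration):
    """Return a timedelta whether seconds (int) or a timedelta was given."""
    if isinstance(lockout_duration, int):
        return timedelta(seconds=lockout_duration)
    return lockout_duration


default = normalize_lockout(900)                    # the documented default
custom = normalize_lockout(timedelta(minutes=30))   # passed through unchanged
```

Only `int` values are converted. Any other type, including a `float` number of seconds, is stored as given, so passing a `timedelta` is the safer choice for non-integer durations.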

activate Link

activate(user)

Activates a user.

Parameters:

Name Type Description Default
user User

The user to activate.

required
Source code in plantdb/commons/fsdb/auth.py
def activate(self, user: User) -> None:
    """
    Activates a user.

    Parameters
    ----------
    user : plantdb.commons.fsdb.auth.User
        The user to activate.
    """
    # Verify if the User exists
    try:
        assert self.exists(user.username)
    except AssertionError:
        self.logger.error(f"User '{user.username}' does not exist!")
        return

    user.is_active = True
    self._save_users()
    return

create Link

create(username, fullname, password, roles=None)

Create a new user and store the user information in a file.

Parameters:

Name Type Description Default
username str

The username of the user to create. This will be converted to lowercase.

required
fullname str

The full name of the user to create.

required
password str

The password of the user to create.

required
roles Role or Set[Role]

The roles to associate with the user. If None (default), use the Role.READER role.

None

Examples:

>>> from plantdb.commons.fsdb.auth import Role, UserManager
>>> manager = UserManager()
>>> manager.create('batman', "Bruce Wayne", "joker", Role.ADMIN)
>>> manager.exists('batman')
True
Source code in plantdb/commons/fsdb/auth.py
def create(self, username: str, fullname: str, password: str, roles: Union[Role, Set[Role]] = None) -> None:
    """Create a new user and store the user information in a file.

    Parameters
    ----------
    username : str
        The username of the user to create.
        This will be converted to lowercase.
    fullname : str
        The full name of the user to create.
    password : str
        The password of the user to create.
    roles : Role or Set[Role], optional
        The roles to associate with the user.
        If ``None`` (default), use the `Role.READER` role.

    Examples
    --------
    >>> from plantdb.commons.fsdb.auth import Role, UserManager
    >>> manager = UserManager()
    >>> manager.create('batman', "Bruce Wayne", "joker", Role.ADMIN)
    >>> manager.exists('batman')
    True
    """
    username = username.lower()  # Convert the username to lowercase to maintain uniformity.
    timestamp = datetime.now()  # Get the current timestamp for tracking user creation time.

    # Verify if the login is available
    try:
        assert not self.exists(username)
    except AssertionError:
        self.logger.error(f"User '{username}' already exists!")
        return

    # Wrap a single role in a set:
    if isinstance(roles, Role):
        roles = {roles}

    # Add the new user's data to the `self.users` dictionary.
    self.users[username] = User(
        username=username,
        fullname=fullname,
        password_hash=self._hash_password(password),
        roles=roles or {Role.READER},
        created_at=timestamp,
        password_last_change=timestamp,
    )
    # Save all user data (including the newly created user) to 'users.json' file.
    self._save_users()
    self.logger.debug(f"Created user '{username}' with fullname '{fullname}'.")
    if not username == self.GUEST_USERNAME:
        self.logger.info(f"Welcome {fullname}, please log in...")
    return
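
How `create` resolves the `roles` argument can be sketched in a few lines. The `Role` enum is a hypothetical stand-in limited to the two members named in this section, and `normalize_roles` is an illustrative helper, not a library function.

```python
from enum import Enum


class Role(Enum):
    """Hypothetical stand-in; the real enum lives in plantdb.commons.fsdb.auth."""
    READER = "reader"
    ADMIN = "admin"


def normalize_roles(roles=None):
    """Wrap a single role in a set and fall back to READER when nothing is given."""
    if isinstance(roles, Role):
        roles = {roles}
    return roles or {Role.READER}


single = normalize_roles(Role.ADMIN)
default = normalize_roles()
empty = normalize_roles(set())
```

Because the fallback uses `or`, an empty set is treated like `None` and also yields the reader role.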

deactivate Link

deactivate(user)

Deactivates a user.

Parameters:

Name Type Description Default
user User

The user to deactivate.

required
Source code in plantdb/commons/fsdb/auth.py
def deactivate(self, user: User) -> None:
    """
    Deactivates a user.

    Parameters
    ----------
    user : plantdb.commons.fsdb.auth.User
        The user to deactivate.
    """
    # Verify if the User exists
    try:
        assert self.exists(user.username)
    except AssertionError:
        self.logger.error(f"User '{user.username}' does not exist!")
        return

    user.is_active = False
    self._save_users()
    return

exists Link

exists(username)

Check if a user exists.

Parameters:

Name Type Description Default
username str

The name of the user to check for existence.

required

Returns:

Type Description
bool

True if the user exists, otherwise False.

Notes

This function is case-sensitive. For case-insensitive checks, convert both the username and the keys to lower or upper case before comparison.

Source code in plantdb/commons/fsdb/auth.py
def exists(self, username: str) -> bool:
    """
    Check if a user exists.

    Parameters
    ----------
    username : str
        The name of the user to check for existence.

    Returns
    -------
    bool
        `True` if the user exists, otherwise `False`.

    Notes
    -----
    This function is case-sensitive. For case-insensitive checks, convert both
    the username and the keys to lower or upper case before comparison.
    """
    return username in self.users
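
The case-sensitivity note matters because `create` stores usernames in lowercase while `exists` compares the string as given. A minimal sketch, using a plain dictionary as a hypothetical stand-in for `UserManager.users`:

```python
# Keys are stored in lowercase, as `create` does before saving a user.
users = {"batman": "Bruce Wayne"}


def exists(username):
    """Same membership test as the method above, on the stand-in dictionary."""
    return username in users


exact = exists("batman")
mixed = exists("Batman")               # differs only by case, so not found
normalized = exists("Batman".lower())  # lowercasing first finds the entry
```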

get_user Link

get_user(username)

Retrieve a User object based on the provided username.

Parameters:

Name Type Description Default
username str

The unique identifier for the user to be retrieved.

required

Returns:

Type Description
Union[User, None]

An instance of the User class representing the requested user. If it does not exist, None is returned.

Source code in plantdb/commons/fsdb/auth.py
def get_user(self, username: str) -> Union[User, None]:
    """
    Retrieve a User object based on the provided username.

    Parameters
    ----------
    username : str
        The unique identifier for the user to be retrieved.

    Returns
    -------
    Union[User, None]
        An instance of the User class representing the requested user.
        If it does not exist, `None` is returned.
    """
    if not self.exists(username):
        self.logger.error(f"User '{username}' does not exist!")
        return None
    return self.users[username]

is_active Link

is_active(username)

Check whether a user account is active.

Parameters:

Name Type Description Default
username str

The name of the user account to check for activity status.

required

Returns:

Type Description
bool

True if the user account is active, False otherwise.

Source code in plantdb/commons/fsdb/auth.py
def is_active(self, username) -> bool:
    """
    Check whether a user account is active.

    Parameters
    ----------
    username : str
        The name of the user account to check for activity status.

    Returns
    -------
    bool
        ``True`` if the user account is active, ``False`` otherwise.
    """
    user = self.get_user(username)
    if not user.is_active:
        self.logger.warning(f"Account {username} is not active.")
    return user.is_active

is_locked_out Link

is_locked_out(username)

Check if an account is locked.

Parameters:

Name Type Description Default
username str

The username of the account to check.

required

Returns:

Type Description
bool

True if the account is locked, False otherwise.

Source code in plantdb/commons/fsdb/auth.py
def is_locked_out(self, username) -> bool:
    """
    Check if an account is locked.

    Parameters
    ----------
    username : str
        The username of the account to check.

    Returns
    -------
    bool
        ``True`` if the account is locked, ``False`` otherwise.
    """
    user = self.get_user(username)
    is_locked = user._is_locked_out()
    if is_locked:
        self.logger.info(f"Account {username} is locked, try logging in after {user.locked_until}.")
    return is_locked

unlock_user Link

unlock_user(user)

Unlock a specified user.

Parameters:

Name Type Description Default
user User

The user to unlock.

required
Source code in plantdb/commons/fsdb/auth.py
def unlock_user(self, user: User) -> None:
    """
    Unlock a specified user.

    Parameters
    ----------
    user : plantdb.commons.fsdb.auth.User
        The user to unlock.
    """
    user.locked_until = None
    self._save_users()
    return

update_password Link

update_password(username, password, new_password)

Update the password of an existing user.

Parameters:

Name Type Description Default
username str

The name of the user whose password is to be updated.

required
password str

The current password of the user to verify their identity.

required
new_password str

The new password to set for the user. If None, no change will occur.

required
Source code in plantdb/commons/fsdb/auth.py
def update_password(self, username: str, password: str, new_password: str) -> None:
    """
    Update the password of an existing user.

    Parameters
    ----------
    username : str
        The name of the user whose password is to be updated.
    password : str
        The current password of the user to verify their identity.
    new_password : str
        The new password to set for the user. If None, no change will occur.
    """
    # Verify if the login exists
    try:
        assert self.exists(username)
    except AssertionError:
        self.logger.error(f"User '{username}' does not exist!")
        return

    if not self.validate(username, password):
        return

    user = self.get_user(username)
    timestamp = datetime.now()  # Get the current timestamp for tracking the password change.
    if new_password:
        # Store the new hash in the `password_hash` field, which is the one serialized by `to_dict`.
        user.password_hash = self._hash_password(new_password)
        user.password_last_change = timestamp

    self._save_users()
    self.logger.info(f"Password updated for user '{username}'...")
    return

validate Link

validate(username, password)

Validate the user credentials.

Parameters:

Name Type Description Default
username str

The username provided by the user attempting to log in.

required
password str

The password provided by the user attempting to log in.

required

Returns:

Type Description
bool

True if the login attempt is successful, False otherwise.

Source code in plantdb/commons/fsdb/auth.py
def validate(self, username: str, password: str) -> bool:
    """Validate the user credentials.

    Parameters
    ----------
    username : str
        The username provided by the user attempting to log in.
    password : str
        The password provided by the user attempting to log in.

    Returns
    -------
    bool
        ``True`` if the login attempt is successful, ``False`` otherwise.
    """
    if not self.exists(username):
        return False
    if not self.is_active(username):
        return False
    if self.is_locked_out(username):
        return False

    user = self.get_user(username)
    if self.validate_user_password(username, password):
        # Reset failed attempts on successful login
        user.failed_attempts = 0
        user.last_login = datetime.now()
        self._save_users()
        return True
    else:
        # Record a failed attempt
        self._record_failed_attempt(username, self.max_login_attempts, self.lockout_duration)
        self.logger.error(f"Invalid credentials for user '{username}'")
        return False
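
The lockout flow driven by `max_login_attempts` and `lockout_duration` can be sketched as follows. The source of `_record_failed_attempt` is not shown in this section, so the counting and locking logic below is an assumption about its behaviour, and `Account` is a hypothetical stand-in for the lockout-related fields of `User`.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Optional


@dataclass
class Account:
    """Hypothetical stand-in for the lockout-related fields of User."""
    failed_attempts: int = 0
    locked_until: Optional[datetime] = None


def record_failed_attempt(account, max_attempts, lockout, now):
    """Assumed behaviour: count the failure and lock once the limit is reached."""
    account.failed_attempts += 1
    if account.failed_attempts >= max_attempts:
        account.locked_until = now + lockout


def is_locked_out(account, now):
    """An account is locked while the current time is before `locked_until`."""
    return account.locked_until is not None and now < account.locked_until


now = datetime(2024, 1, 15, 12, 0, 0)
account = Account()
for _ in range(3):  # three failures with the default limit of 3
    record_failed_attempt(account, max_attempts=3, lockout=timedelta(seconds=900), now=now)

locked_now = is_locked_out(account, now)
locked_later = is_locked_out(account, now + timedelta(minutes=16))
```

With the default settings, the third failed attempt locks the account for 15 minutes; a successful `validate` call resets `failed_attempts` to zero.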

validate_user_password Link

validate_user_password(username, password)

Validate a user's password.

This function checks if the provided plaintext password matches the hashed password stored for the given username.

Parameters:

Name Type Description Default
username str

The username of the user.

required
password str

The plain-text password to validate.

required

Returns:

Type Description
bool

True if the password is valid, False otherwise.

Source code in plantdb/commons/fsdb/auth.py
def validate_user_password(self, username: str, password: str) -> bool:
    """
    Validate a user's password.

    This function checks if the provided plaintext password matches the hashed password stored for the given username.

    Parameters
    ----------
    username : str
        The username of the user.
    password : str
        The plain-text password to validate.

    Returns
    -------
    bool
        ``True`` if the password is valid, ``False`` otherwise.

    """
    hash = self.get_user(username).password_hash
    try:
        # Verify password, raises exception if wrong.
        ph.verify(hash, password)
    except Exception as e:
        self.logger.error(f"Failed to verify password for {username}: {e}")
        return False
    else:
        return True
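
The hash-then-verify pattern can be illustrated with the standard library alone. This sketch deliberately swaps in PBKDF2 from `hashlib` and is not the hasher behind `ph` in the code above; `hash_password` and `verify_password` are hypothetical helper names.

```python
import hashlib
import hmac
import os


def hash_password(password, salt=None, iterations=100_000):
    """Derive a digest from the password; a fresh random salt is used if none is given."""
    salt = salt if salt is not None else os.urandom(16)
    digest = hashlib.pbkdf2_hmac("sha256", password.encode("utf-8"), salt, iterations)
    return salt, digest


def verify_password(password, salt, expected, iterations=100_000):
    """Recompute the digest with the stored salt and compare in constant time."""
    _, digest = hash_password(password, salt, iterations)
    return hmac.compare_digest(digest, expected)


salt, stored = hash_password("correct horse")
good = verify_password("correct horse", salt, stored)
bad = verify_password("wrong guess", salt, stored)
```

As in `validate_user_password`, only the derived value is stored and compared; the plain-text password is never persisted.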

requires_permission Link

requires_permission(required_permissions)

Decorator to check if the specified user has the required permission(s).

Parameters:

Name Type Description Default
required_permissions Union[Permission, List[Permission]]

A single permission or a list of permissions that the user must have to access the decorated method.

required
Source code in plantdb/commons/fsdb/auth.py
def requires_permission(required_permissions: Union[Permission, List[Permission]]):
    """
    Decorator to check if the specified user has the required permission(s).

    Parameters
    ----------
    required_permissions : Union[Permission, List[Permission]]
        A single permission or a list of permissions that the user must have to access the decorated method.
    """

    def decorator(func: Callable) -> Callable:
        @wraps(func)
        def wrapper(self, requesting_user: User, *args, **kwargs) -> Any:
            # Ensure required_permissions is always a list
            perms_to_check = (
                required_permissions if isinstance(required_permissions, list)
                else [required_permissions]
            )

            # Get the user's permissions (assumes the decorated method belongs to a class that provides a `get_user_permissions` method)
            user_permissions = self.get_user_permissions(requesting_user)

            # Check if the requesting user has all the required permissions
            has_permission = all(perm in user_permissions for perm in perms_to_check)

            if not has_permission:
                perm_names = [perm.name if hasattr(perm, 'name') else str(perm) for perm in perms_to_check]
                raise PermissionError(
                    f"User '{requesting_user.username}' does not have required permission(s): {', '.join(perm_names)}"
                )

            return func(self, requesting_user, *args, **kwargs)

        return wrapper

    return decorator
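
A usage sketch for the decorator follows. To keep it runnable on its own, it includes a condensed copy of the decorator and hypothetical stand-ins: a `Permission` enum (only `MANAGE_USERS` appears in this section, `READ` is assumed), a `Registry` host class, and plain strings in place of `User` objects.

```python
from enum import Enum
from functools import wraps


class Permission(Enum):
    """Hypothetical stand-in; the real enum lives in plantdb.commons.fsdb.auth."""
    READ = "read"
    MANAGE_USERS = "manage_users"


def requires_permission(required_permissions):
    """Condensed copy of the decorator above, so the sketch runs on its own."""
    def decorator(func):
        @wraps(func)
        def wrapper(self, requesting_user, *args, **kwargs):
            perms = required_permissions if isinstance(required_permissions, list) else [required_permissions]
            granted = self.get_user_permissions(requesting_user)
            if not all(perm in granted for perm in perms):
                names = ", ".join(perm.name for perm in perms)
                raise PermissionError(f"User '{requesting_user}' does not have required permission(s): {names}")
            return func(self, requesting_user, *args, **kwargs)
        return wrapper
    return decorator


class Registry:
    """Hypothetical host class; it must expose `get_user_permissions`."""
    def __init__(self, grants):
        self._grants = grants

    def get_user_permissions(self, user):
        return self._grants.get(user, set())

    @requires_permission(Permission.MANAGE_USERS)
    def delete_account(self, requesting_user, target):
        return f"{target} deleted by {requesting_user}"


registry = Registry({"alice": {Permission.MANAGE_USERS}, "bob": {Permission.READ}})
allowed = registry.delete_account("alice", "mallory")
try:
    registry.delete_account("bob", "mallory")
    denied = False
except PermissionError:
    denied = True
```

The requesting user must be the first positional argument after `self`, since the wrapper reads it from that position before delegating to the decorated method.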