session

Session Management

Provides a flexible session management system supporting both standard and JWT-based sessions. It handles session creation, validation, expiration, concurrency limits, and refresh logic, making it suitable for web applications and database connections.

Key Features
  • Centralized session store with expiration tracking
  • Support for plain tokens and JSON Web Tokens (JWT) with standard claims
  • Concurrency control (max concurrent sessions)
  • Automatic cleanup of expired sessions
  • Session refresh mechanism to extend validity
Usage Examples

>>> from plantdb.commons.auth.session import JWTSessionManager
>>> manager = JWTSessionManager(session_timeout=1800, secret_key='my_secret')
>>> access_token, refresh_token = manager.create_session('alice')
>>> user_info = manager.validate_session(access_token)
>>> print(user_info)
{'username': 'alice', 'issued_at': 1769011058, 'expires_at': 1769012858, 'jti': 'HVaAR4XHmIJgCKbZMDqmwg', 'issuer': 'plantdb-api', 'audience': 'plantdb-client', 'type': 'access'}
>>> new_access_token, new_refresh_token = manager.refresh_session(refresh_token)

AccessTokenNotFoundError

Bases: SessionValidationError

Raised when an access token isn’t present in the active‑session store.

InvalidTokenProcessingError

Bases: SessionValidationError

Raised for unexpected errors while processing a token (e.g. decoding issues).

JWTSessionManager

Bases: SessionManager

Manage JWT-based user sessions with configurable timeouts and concurrency limits.

This session manager extends SessionManager by issuing JSON Web Tokens (JWT) for authentication. It handles three token types:

  • An access token is short‑lived and authorizes API calls.
  • A refresh token is long‑lived and can be exchanged for a new access token when the original expires.
  • An API token is long‑lived and carries dataset‑specific rights in a custom datasets claim.

The manager tracks active access tokens to enforce a maximum number of concurrent sessions per application instance. Tokens are signed with a secret key that is either supplied by the caller or generated automatically. All tokens conform to RFC 7519 and contain the standard registered claims (iss, sub, aud, exp, iat, jti) plus a custom type claim that identifies the token as 'access', 'api' or 'refresh'.
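The token layout described above can be illustrated with the standard library alone. This is a minimal sketch, not the library's implementation (the listings on this page rely on `jwt.decode`): `make_token` and `read_claims` are hypothetical helpers, and the issuer and audience strings are copied from the usage example output.

```python
import base64
import hashlib
import hmac
import json
import secrets
import time


def b64url(data: bytes) -> str:
    """Base64url-encode without padding, as used by JWTs."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def make_token(username: str, token_type: str, lifetime: int, key: bytes) -> str:
    """Build an HS512-signed JWT with the registered claims plus a 'type' claim."""
    now = int(time.time())
    header = {"alg": "HS512", "typ": "JWT"}
    claims = {
        "iss": "plantdb-api",              # issuer
        "sub": username,                   # subject (the username)
        "aud": "plantdb-client",           # audience
        "iat": now,                        # issued at
        "exp": now + lifetime,             # expiration time
        "jti": secrets.token_urlsafe(16),  # unique token identifier
        "type": token_type,                # custom claim: 'access', 'api' or 'refresh'
    }
    signing_input = ".".join(
        b64url(json.dumps(part, separators=(",", ":")).encode()) for part in (header, claims)
    )
    signature = hmac.new(key, signing_input.encode("ascii"), hashlib.sha512).digest()
    return f"{signing_input}.{b64url(signature)}"


def read_claims(token: str, key: bytes) -> dict:
    """Verify the signature and return the claims (expiry checks are omitted here)."""
    signing_input, _, signature = token.rpartition(".")
    expected = b64url(hmac.new(key, signing_input.encode("ascii"), hashlib.sha512).digest())
    if not hmac.compare_digest(signature, expected):
        raise ValueError("bad signature")
    payload = signing_input.split(".")[1]
    padded = payload + "=" * (-len(payload) % 4)  # restore the stripped padding
    return json.loads(base64.urlsafe_b64decode(padded))


key = secrets.token_bytes(64)
access = make_token("alice", "access", 900, key)
claims = read_claims(access, key)
print(claims["sub"], claims["type"], claims["exp"] - claims["iat"])  # alice access 900
```

A token verified with a different key fails in `read_claims`, which is the same effect the secret_key attribute describes for API tokens after a restart with a freshly generated key.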

Attributes:

Name Type Description
sessions Dict[str, dict]

A dictionary storing active sessions. Each key is a session ID (the jti of the access token), and each value is a dictionary containing:

  • 'username' (str): the user associated with this session.
  • 'created_at' (datetime): when the session was created.
  • 'last_accessed' (datetime): the last time the session was accessed.
  • 'expires_at' (datetime): the expiry time of the session.

session_timeout int

Duration in seconds after which a session expires. The default value (900) corresponds to 15 minutes.

max_concurrent_sessions int

The maximum number of concurrent sessions to allow.

logger Logger

The logger to use for this session manager.

refresh_timeout int

Lifetime of a refresh token in seconds. The default value (86400) corresponds to 24 hours.

secret_key str | bytes | None

Secret used for HS512 signing of JWTs. If None is passed to the constructor, a random 64‑byte key is generated. Such a key does not survive a restart: the new instance gets a different secret key, so all API tokens previously persisted to the token file become unverifiable.

refresh_tokens dict

Mapping from refresh token identifier (jti) to a dictionary containing username, creation and expiration timestamps, token type and the associated access token identifier. Used to validate and rotate refresh tokens.

_lock RLock

A reentrant lock that guards changes to the self.sessions and self.refresh_tokens dictionaries for thread safety.

api_token_dir str or Path

Directory where the api_token.txt file will be stored.

Manage user sessions with timeout.

Parameters:

Name Type Description Default

session_timeout int

The duration, in seconds, for which the access token is valid. Defaults to 900 seconds (15 minutes).

900

refresh_timeout int

The duration, in seconds, for which the refresh token is valid. Defaults to 86400 seconds (24 hours).

86400

max_concurrent_sessions int

The maximum number of concurrent sessions to allow. Defaults to 10.

10

secret_key str | bytes | None

Secret used for HS512 signing of JWTs.

  • If a bytes object is supplied, it must be ≥ 64 bytes.
  • If a str (pass‑phrase) is supplied, it is stretched with Argon2 to produce a 64‑byte key.
  • If None (default), a fresh random 64‑byte key is generated. Such a key does not survive a restart: the new instance gets a different secret key, so all API tokens previously persisted to the token file become unverifiable.

None

leeway int

Allowed leeway, in seconds, after a token's expiration date, to accommodate clock skew. Set it to 0 so that a token is considered expired as soon as its exp claim passes. Defaults to 2.

2

api_token_dir str or Path

Directory where the api_token.txt file will be stored. Defaults to the system temporary directory.

gettempdir()
Source code in plantdb/commons/auth/session.py
def __init__(self, session_timeout: int = 900, refresh_timeout: int = 86400, max_concurrent_sessions: int = 10,
             secret_key: Union[str, bytes] = None, leeway: int = 2, api_token_dir=gettempdir()):
    """Manage user sessions with timeout.

    Parameters
    ----------
    session_timeout : int, optional
        The duration for which the access token should be valid in seconds.
        Defaults to ``900`` seconds (15 minutes).
    refresh_timeout : int, optional
        The duration for which the refresh token should be valid in seconds.
        Defaults to ``86400`` seconds (24 hours).
    max_concurrent_sessions : int, optional
        The maximum number of concurrent sessions to allow.
        Defaults to ``10``.
    secret_key : str | bytes | None
        Secret used for HS512 signing of JWTs.
        - If a ``bytes`` object is supplied, it must be ≥ 64 bytes.
        - If a ``str`` (pass‑phrase) is supplied, it will be stretched with Argon2 to produce a 64‑byte key.
        - If ``None`` (default) a fresh random 64‑byte key is generated.
          This will break the API tokens persistence across restarts as, after a restart, the new instance
          gets a different secret key, so all previously persisted API tokens in the file become unverifiable.
    leeway : int, optional
        Allowed leeway, in seconds, after a token's expiration date, to accommodate clock skew.
        Set it to `0` so that a token is considered expired as soon as its exp claim passes.
        Defaults to ``2``.
    api_token_dir : str or pathlib.Path, optional
        Directory where the ``api_token.txt`` file will be stored.
        Defaults to the system temporary directory.
    """
    super().__init__(session_timeout, max_concurrent_sessions)
    self.refresh_timeout = refresh_timeout
    self.leeway = leeway
    self.refresh_tokens = {}  # Track valid refresh tokens (jti -> session_info)
    self._lock = RLock()  # guards the `self.sessions` and `self.refresh_tokens` dicts for thread‑safe changes

    self.secret_key = _init_secret_key(secret_key)
    self.api_token_file = Path(api_token_dir) / 'api_token.txt'
    self._api_tokens: dict[str, str] = {}  # jti -> ISO expiry string
    self._safe_secret_init(secret_key)

    # Load any existing API tokens from disk
    self._load_api_tokens()

    # ---- Start the daily clean‑up thread ----
    self._start_daily_cleanup_thread(hour=3, minute=0, tz=timezone.utc)
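The three secret_key cases documented above can be sketched as follows. This is an illustration under stated assumptions, not the library's `_init_secret_key`: the hypothetical `normalize_secret` helper uses PBKDF2‑HMAC‑SHA512 from the standard library as a stand‑in for Argon2, and the fixed salt is for demonstration only.

```python
import hashlib
import secrets
from typing import Optional, Union


def normalize_secret(secret_key: Optional[Union[str, bytes]],
                     salt: bytes = b"demo-salt-not-for-production") -> bytes:
    """Return a 64-byte signing key following the documented rules (hypothetical helper)."""
    if secret_key is None:
        # Fresh random key: tokens signed with it cannot be verified after a restart.
        return secrets.token_bytes(64)
    if isinstance(secret_key, bytes):
        if len(secret_key) < 64:
            raise ValueError("a bytes secret must be at least 64 bytes long")
        return secret_key
    # Pass-phrase: stretch it to 64 bytes.
    # Assumption: PBKDF2 stands in for Argon2, which is not in the standard library.
    return hashlib.pbkdf2_hmac("sha512", secret_key.encode("utf-8"), salt, 100_000, dklen=64)


stretched = normalize_secret("correct horse battery staple")
print(len(stretched), len(normalize_secret(None)))  # 64 64
```

Supplying the same pass‑phrase (or the same bytes key) on every start yields the same signing key, which is what keeps persisted API tokens verifiable across restarts.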

cleanup_expired_sessions

cleanup_expired_sessions()

Remove expired sessions from tracking.

Source code in plantdb/commons/auth/session.py
def cleanup_expired_sessions(self) -> None:
    """Remove expired sessions from tracking."""
    current_time = datetime.now(timezone.utc)

    with self._lock:
        expired_access = [
            jti for jti, session in self.sessions.items()
            if current_time > session['expires_at']
        ]
        for jti in expired_access:
            del self.sessions[jti]

        expired_refresh = [
            jti for jti, session in self.refresh_tokens.items()
            if current_time > session['expires_at']
        ]
        for jti in expired_refresh:
            del self.refresh_tokens[jti]

    return
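The listing collects the expired keys first and deletes them in a second pass, because deleting from a dictionary while iterating over it raises a RuntimeError. A self‑contained sketch of the same pattern, with a hypothetical `purge_expired` helper and made‑up session entries:

```python
from datetime import datetime, timedelta, timezone
from threading import RLock

lock = RLock()
now = datetime.now(timezone.utc)
sessions = {
    "live": {"username": "alice", "expires_at": now + timedelta(minutes=15)},
    "stale": {"username": "bob", "expires_at": now - timedelta(seconds=1)},
}


def purge_expired(store: dict, at: datetime) -> list:
    """Delete entries whose 'expires_at' is in the past and return their keys."""
    with lock:
        # First pass: collect keys, so the dict is not mutated while iterating.
        expired = [jti for jti, entry in store.items() if at > entry["expires_at"]]
        # Second pass: delete.
        for jti in expired:
            del store[jti]
    return expired


removed = purge_expired(sessions, now)
print(removed, sorted(sessions))  # ['stale'] ['live']
```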

create_api_token

create_api_token(username, token_exp=3600, datasets=None)

Generate a new API token for a user.

This method creates a time‑limited API token associated with the specified username. If token_exp is None, the token expires one hour after creation. An optional datasets argument can be provided to restrict the token's access scope. The identifier and expiry of the generated token are persisted via the internal storage mechanism. If the expiration date is invalid or in the past, an error is logged and an empty string is returned.

Parameters:

Name Type Description Default

username str

Identifier of the user for whom the token is being created.

required

token_exp

Expiration of the token. A string is parsed as an ISO‑formatted expiration date, an integer is an expiration interval in seconds, and a datetime is used as given. None selects the default one‑hour lifetime.

3600

datasets

Optional collection of dataset identifiers that the token should be allowed to access.

None

Returns:

Type Description
str

The newly created API token, or an empty string if the expiration date is invalid or in the past.

Notes

The token includes a unique identifier generated with secrets and is timestamped using UTC. The internal helper methods handle token assembly and storage, ensuring consistency across calls.

Examples:

>>> from plantdb.commons.auth.session import JWTSessionManager
>>> from plantdb.commons.auth.models import Permission
>>> manager = JWTSessionManager()
>>> api_token = manager.create_api_token('batman', datasets={'joker': [Permission.DELETE]})
>>> print(manager.api_token_file)
>>> with open(manager.api_token_file, 'rb') as f: print(f.read())
Source code in plantdb/commons/auth/session.py
def create_api_token(self, username: str, token_exp=3600, datasets=None) -> str:
    """Generate a new API token for a user.

    This method creates a time‑limited API token associated with the specified ``username``.
    If ``token_exp`` is ``None`` the token will expire one hour from the moment of creation.
    An optional list of ``datasets`` can be provided to restrict the token's access scope.
    The generated token is persisted via the internal storage mechanism.

    Parameters
    ----------
    username
        Identifier of the user for whom the token is being created.
    token_exp
        If a string, it should be an ISO-formatted expiration date.
        If an integer, it acts as an expiration interval in seconds.
        A ``datetime`` is also accepted.
        ``None`` selects the default one‑hour lifetime.
    datasets
        Optional collection of dataset identifiers that the token should be allowed to access.

    Returns
    -------
    str
        The newly created API token.

    Notes
    -----
    The token includes a unique identifier generated with ``secrets`` and is timestamped using UTC.
    The internal helper methods handle token assembly and storage, ensuring consistency across calls.

    Examples
    --------
    >>> from plantdb.commons.auth.session import JWTSessionManager
    >>> from plantdb.commons.auth.models import Permission
    >>> manager = JWTSessionManager()
    >>> api_token = manager.create_api_token('batman', datasets={'joker': [Permission.DELETE]})
    >>> print(manager.api_token_file)
    >>> with open(manager.api_token_file, 'rb') as f: print(f.read())
    """
    now = datetime.now(timezone.utc)
    # Set a default token expiration date
    if token_exp is None:
        token_exp = 3600

    # Convert integer or string representations
    if isinstance(token_exp, int):
        token_exp = now + timedelta(seconds=token_exp)
    elif isinstance(token_exp, str):
        token_exp = datetime.fromisoformat(token_exp)

    # Check the validity of the expiration date
    if isinstance(token_exp, datetime):
        if token_exp <= now:
            self.logger.error(f"Token expiration date ({token_exp}) is in the past.")
            return ""
    else:
        self.logger.error(f"Token expiration date ({token_exp}) is not valid.")
        return ""

    token_jti = secrets.token_urlsafe(16)
    # Create the token:
    api_token = self._create_token(username, token_jti, token_exp, now, "api", datasets=datasets)

    # Register the token by jti:
    self._dump_api_token(token_jti, token_exp)

    return api_token
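The expiry handling at the top of this method can be isolated into a small function. The sketch below mirrors the int / str / datetime branches of the listing; `resolve_expiry` is a hypothetical name, and treating timezone‑naive values as UTC is an assumption added here, since comparing a naive datetime with the timezone‑aware `now` would otherwise raise a TypeError.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional, Union


def resolve_expiry(token_exp: Union[int, str, datetime, None], now: datetime) -> Optional[datetime]:
    """Turn an interval, ISO string or datetime into an aware expiry, or None if unusable."""
    if token_exp is None:
        token_exp = 3600  # default lifetime: one hour
    if isinstance(token_exp, int):
        token_exp = now + timedelta(seconds=token_exp)
    elif isinstance(token_exp, str):
        token_exp = datetime.fromisoformat(token_exp)
    if not isinstance(token_exp, datetime):
        return None  # unsupported type
    if token_exp.tzinfo is None:
        # Assumption: interpret naive values as UTC so the comparison below is valid.
        token_exp = token_exp.replace(tzinfo=timezone.utc)
    return token_exp if token_exp > now else None


now = datetime(2030, 1, 1, tzinfo=timezone.utc)
print(resolve_expiry(60, now))                           # 2030-01-01 00:01:00+00:00
print(resolve_expiry("2030-01-02T00:00:00", now))        # 2030-01-02 00:00:00+00:00
print(resolve_expiry("2029-12-31T00:00:00+00:00", now))  # None
```

Where the listing returns an empty string for an unusable expiry, the sketch returns None; callers of the real method should therefore check for an empty token rather than rely on an exception.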

create_session

create_session(username)

Create a new session for a user.

Each call issues a new access token and a new refresh token and registers both for tracking. If the maximum number of concurrent sessions has been reached, or token creation fails, no session is created and None is returned.

Parameters:

Name Type Description Default

username str

The unique identifier of the user for whom to create a session.

required

Returns:

Type Description
Tuple[str, str] or None

A tuple containing (access_token, refresh_token) if successful, None otherwise.

Notes

Creates JSON Web Tokens following RFC 7519 with the registered claims:

  • iss (issuer): identifies the token issuer
  • sub (subject): the username of the authenticated user
  • aud (audience): intended audience for the token
  • exp (expiration time): token expiration timestamp
  • iat (issued at): token creation timestamp
  • jti (JWT ID): unique identifier for the token, generated using secrets.token_urlsafe

Source code in plantdb/commons/auth/session.py
def create_session(self, username: str) -> Union[Tuple[str, str], None]:
    """Create a new session for a user.

    Each call issues a new access token and a new refresh token and registers both for tracking.
    If the concurrent‑session limit is reached or token creation fails, ``None`` is returned.

    Parameters
    ----------
    username : str
        The unique identifier of the user for whom to create a session.

    Returns
    -------
    Tuple[str, str] or None
        A tuple containing (access_token, refresh_token) if successful, ``None`` otherwise.

    Notes
    -----
    Creates JSON Web Tokens following RFC 7519 standards with registered claims:
    - iss (issuer): Identifies the token issuer
    - sub (subject): The username of the authenticated user
    - aud (audience): Intended audience for the token
    - exp (expiration time): Token expiration timestamp
    - iat (issued at): Token creation timestamp
    - jti (JWT ID): Unique identifier for the token generated using `secrets.token_urlsafe`.
    """
    if self.n_active_sessions() >= self.max_concurrent_sessions:
        self.logger.warning(
            f"Too many users currently active, reached max concurrent sessions limit ({self.max_concurrent_sessions})")
        return None

    # Create an access token
    now = datetime.now(timezone.utc)
    access_exp = now + timedelta(seconds=self.session_timeout)
    access_jti = secrets.token_urlsafe(16)

    # Create a refresh token
    refresh_exp = now + timedelta(seconds=self.refresh_timeout)
    refresh_jti = secrets.token_urlsafe(16)

    try:
        # Generate JSON Web Tokens
        access_token = self._create_token(username, access_jti, access_exp, now, token_type='access')
        refresh_token = self._create_token(username, refresh_jti, refresh_exp, now, token_type='refresh')
    except Exception as e:
        self.logger.error(f"Failed to create JSON Web Tokens for {username}: {e}")
        return None

    with self._lock:
        # Track access session for concurrent‑limit enforcement
        self.sessions[access_jti] = {
            'username': username,
            'created_at': now,
            'last_accessed': now,
            'expires_at': access_exp,
            'type': 'access'
        }

        # Track refresh token
        self.refresh_tokens[refresh_jti] = {
            'username': username,
            'created_at': now,
            'expires_at': refresh_exp,
            'type': 'refresh',
            'access_jti': access_jti
        }

    self.logger.debug(f"Created session for '{username}'")
    return access_token, refresh_token
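The bookkeeping in the listing can be reduced to a toy model that tracks only identifiers. In this sketch `open_session` and `MAX_SESSIONS` are hypothetical, and no JWT is signed; it only shows that the limit counts tracked sessions rather than distinct users, and that the refresh entry remembers its access jti.

```python
import secrets
from datetime import datetime, timedelta, timezone
from typing import Optional, Tuple

MAX_SESSIONS = 2  # stand-in for max_concurrent_sessions
sessions: dict = {}
refresh_tokens: dict = {}


def open_session(username: str, session_timeout: int = 900,
                 refresh_timeout: int = 86400) -> Optional[Tuple[str, str]]:
    """Register a new access/refresh pair, or return None when the limit is reached."""
    if len(sessions) >= MAX_SESSIONS:
        return None
    now = datetime.now(timezone.utc)
    access_jti = secrets.token_urlsafe(16)
    refresh_jti = secrets.token_urlsafe(16)
    sessions[access_jti] = {
        "username": username,
        "expires_at": now + timedelta(seconds=session_timeout),
    }
    refresh_tokens[refresh_jti] = {
        "username": username,
        "expires_at": now + timedelta(seconds=refresh_timeout),
        "access_jti": access_jti,  # link used later for invalidation and rotation
    }
    return access_jti, refresh_jti


# The same user logging in twice consumes two slots; the third request is refused.
results = [open_session(user) for user in ("alice", "alice", "bob")]
print([r is not None for r in results])  # [True, True, False]
```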

has_logged_user

has_logged_user()

Check if there is at least one active (logged‑in) user.

This method first cleans up any expired sessions and then determines whether the internal sessions dictionary contains any entries.

Returns:

Type Description
bool

True if at least one user has an active session, False otherwise.

Source code in plantdb/commons/auth/session.py
def has_logged_user(self) -> bool:
    """Check if there is at least one active (logged‑in) user.

    This method first cleans up any expired sessions and then determines
    whether the internal ``sessions`` dictionary contains any entries.

    Returns
    -------
    bool
        ``True`` if at least one user has an active session, ``False`` otherwise.
    """
    self.cleanup_expired_sessions()
    return len(self.sessions) > 0

invalidate_api_token

invalidate_api_token(token)

Revoke an API token, removing it from memory and the persistent file.

Parameters:

Name Type Description Default

token str

The full JWT string of the API token to revoke.

required

Returns:

Type Description
bool

True if the token was found and removed, False otherwise.

Source code in plantdb/commons/auth/session.py
def invalidate_api_token(self, token: str) -> bool:
    """Revoke an API token, removing it from memory and the persistent file.

    Parameters
    ----------
    token : str
        The full JWT string of the API token to revoke.

    Returns
    -------
    bool
        ``True`` if the token was found and removed, ``False`` otherwise.
    """
    try:
        payload = self._payload_from_token(token)
    except jwt.ExpiredSignatureError:
        # Token is expired, but we still want to clean it up from storage.
        # Decode again with the expiry check disabled (the signature is still verified) to extract the jti.
        payload = jwt.decode(
            token,
            self.secret_key,
            algorithms=['HS512'],
            audience='plantdb-client',
            issuer='plantdb-api',
            options={"verify_exp": False},
            leeway=self.leeway
        )
    except (jwt.InvalidTokenError, Exception) as e:
        self.logger.error(f"Failed to decode API token for invalidation: {e}")
        return False

    if payload.get('type') != 'api':
        self.logger.warning("Token is not an API token, use invalidate_session() instead")
        return False

    jti = payload.get('jti')
    if not jti:
        return False

    with self._lock:
        if jti in self._api_tokens:
            del self._api_tokens[jti]
            self._save_api_tokens()
            self.logger.debug(f"Invalidated API token jti={jti}")
            return True

    self.logger.warning(f"API token jti={jti} not found in active tokens")
    return False
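Revocation amounts to removing one jti from the persisted store and writing the store back. The sketch below shows that round trip with a JSON file; the on‑disk format of api_token.txt is not shown on this page, so the JSON layout and the `save_tokens`, `load_tokens` and `revoke` helpers are assumptions made for illustration.

```python
import json
import tempfile
from datetime import datetime, timedelta, timezone
from pathlib import Path


def save_tokens(path: Path, tokens: dict) -> None:
    """Persist the jti -> ISO expiry mapping (assumed JSON layout)."""
    path.write_text(json.dumps(tokens))


def load_tokens(path: Path) -> dict:
    """Load the mapping, or return an empty one if the file does not exist yet."""
    return json.loads(path.read_text()) if path.exists() else {}


def revoke(path: Path, jti: str) -> bool:
    """Remove one jti from the store; return True only if it was present."""
    tokens = load_tokens(path)
    if jti not in tokens:
        return False
    del tokens[jti]
    save_tokens(path, tokens)
    return True


with tempfile.TemporaryDirectory() as tmp:
    store = Path(tmp) / "api_token.txt"
    expiry = (datetime.now(timezone.utc) + timedelta(hours=1)).isoformat()
    save_tokens(store, {"abc123": expiry})
    first = revoke(store, "abc123")   # present, so it is removed
    second = revoke(store, "abc123")  # already gone
    remaining = load_tokens(store)

print(first, second, remaining)  # True False {}
```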

invalidate_session

invalidate_session(token=None, jti=None)

Invalidate a session by removing it from tracking.

Parameters:

Name Type Description Default

token str

JSON Web Token to invalidate.

None

jti str

Token ID to invalidate directly

None

Returns:

Type Description
bool

True if the specified session was found and removed, False otherwise.

str or None

The username corresponding to the invalidated JSON Web Token, or None if no session was removed.

Source code in plantdb/commons/auth/session.py
def invalidate_session(self, token: str = None, jti: str = None) -> Tuple[bool, str | None]:
    """Invalidate a session by removing it from tracking.

    Parameters
    ----------
    token : str, optional
        JSON Web Token to invalidate
    jti : str, optional
        Token ID to invalidate directly

    Returns
    -------
    bool
        `True` if the specified session was found and removed, `False` otherwise.
    str
        The username corresponding to the invalidated JSON Web Token
    """
    if token:
        try:
            payload = self._payload_from_token(token)
            jti = payload.get('jti')
            token_type = payload.get('type')
        except jwt.PyJWTError as e:
            self.logger.error(f"Failed to decode token for invalidation: {e}")
            return False, None
        except KeyError as e:
            self.logger.error(f"Failed to access payload key: {e}")
            return False, None
    else:
        # If jti is provided, we need to know its type or check both
        token_type = None

    with self._lock:
        if token_type == 'access' or token_type is None:
            if jti and jti in self.sessions:
                username = self.sessions[jti]['username']
                del self.sessions[jti]
                # Also invalidate the linked refresh token if any
                refresh_jtis = [rj for rj, rs in self.refresh_tokens.items() if rs.get('access_jti') == jti]
                for rj in refresh_jtis:
                    del self.refresh_tokens[rj]
                return True, username

        if token_type == 'refresh' or token_type is None:
            if jti and jti in self.refresh_tokens:
                username = self.refresh_tokens[jti]['username']
                # Optionally invalidate the linked access token?
                # Usually we just invalidate the refresh token.
                del self.refresh_tokens[jti]
                return True, username

    return False, None
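Invalidating an access session also removes every refresh token that points at it through its access_jti entry. A minimal sketch of that cascade, with a hypothetical `drop_access` helper and hand‑written store contents:

```python
from typing import Optional, Tuple

sessions = {"a1": {"username": "alice"}}
refresh_tokens = {
    "r1": {"username": "alice", "access_jti": "a1"},
    "r2": {"username": "bob", "access_jti": "b9"},
}


def drop_access(jti: str) -> Tuple[bool, Optional[str]]:
    """Remove an access session and any refresh token linked to it."""
    if jti not in sessions:
        return False, None
    username = sessions.pop(jti)["username"]
    # Collect the linked refresh jtis first, then delete them.
    linked = [rj for rj, entry in refresh_tokens.items() if entry.get("access_jti") == jti]
    for rj in linked:
        del refresh_tokens[rj]
    return True, username


first = drop_access("a1")
second = drop_access("a1")
print(first, sorted(refresh_tokens), second)  # (True, 'alice') ['r2'] (False, None)
```

The reverse direction is not cascaded in the listing: removing a refresh token leaves its access session in place until it expires or is invalidated separately.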

n_active_sessions

n_active_sessions()

Returns the number of active sessions.

Cleans up expired sessions before counting and returns the number of remaining active sessions in the collection.

Returns:

Type Description
int

The number of currently active sessions.

See Also

cleanup_expired_sessions : Cleans up the expired sessions in the collection.

Source code in plantdb/commons/auth/session.py
def n_active_sessions(self) -> int:
    """Returns the number of active sessions.

    Cleans up expired sessions before counting and returns the number
    of remaining active sessions in the collection.

    Returns
    -------
    int
        The number of currently active sessions.

    See Also
    --------
    cleanup_expired_sessions : Cleans up the expired sessions in the collection.
    """
    self.cleanup_expired_sessions()
    return len(self.sessions)

refresh_session

refresh_session(refresh_token)

Refresh a session using a valid refresh token.

Parameters:

Name Type Description Default

refresh_token str

The refresh token to use.

required

Returns:

Type Description
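Tuple[str, str] or None

A tuple containing (new_access_token, new_refresh_token) if successful. A revoked or malformed refresh token raises SessionValidationError during validation, and None is returned if the new session cannot be created.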

Source code in plantdb/commons/auth/session.py
def refresh_session(self, refresh_token: str) -> Tuple[str, str]:
    """Refresh a session using a valid refresh token.

    Parameters
    ----------
    refresh_token : str
        The refresh token to use.

    Returns
    -------
    Tuple[str, str]
        A tuple containing (new_access_token, new_refresh_token) if successful.
    """
    # Validate the refresh token - will raise if the refresh token is revoked or malformed
    session_data = self.validate_session(refresh_token)

    username = session_data['username']
    old_refresh_jti = session_data['jti']
    old_access_jti = self.refresh_tokens[old_refresh_jti].get('access_jti')
    # Invalidate old tokens (Rotation)
    self.invalidate_session(jti=old_refresh_jti)
    if old_access_jti:
        self.invalidate_session(jti=old_access_jti)

    # Create a new session (new access + new refresh)
    return self.create_session(username)
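The rotation performed here means a refresh token is single‑use: once exchanged, both it and its access session are gone. The sketch below reproduces that behaviour on plain dictionaries; `issue` and `rotate` are hypothetical helpers, and a KeyError stands in for the validation error that the real method raises for an unknown refresh token.

```python
import secrets
from typing import Tuple

sessions: dict = {}
refresh_tokens: dict = {}


def issue(username: str) -> Tuple[str, str]:
    """Register a new access/refresh pair and return both identifiers."""
    access_jti = secrets.token_urlsafe(16)
    refresh_jti = secrets.token_urlsafe(16)
    sessions[access_jti] = {"username": username}
    refresh_tokens[refresh_jti] = {"username": username, "access_jti": access_jti}
    return access_jti, refresh_jti


def rotate(refresh_jti: str) -> Tuple[str, str]:
    """Exchange a refresh token for a new pair, invalidating the old pair."""
    entry = refresh_tokens.pop(refresh_jti, None)
    if entry is None:
        raise KeyError("unknown or already used refresh token")
    sessions.pop(entry["access_jti"], None)  # drop the old access session too
    return issue(entry["username"])


old_access, old_refresh = issue("alice")
new_access, new_refresh = rotate(old_refresh)
print(old_access in sessions, new_access in sessions, len(sessions))  # False True 1

try:
    rotate(old_refresh)  # replaying the old refresh token fails
except KeyError:
    print("old refresh token rejected")
```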

session_token

session_token(username)

Retrieve the active session token, if any, for a given username.

This method cleans up any expired sessions first and then searches the internal sessions attribute dictionary for a session belonging to the supplied username.

Parameters:

Name Type Description Default

username str

The username whose session ID is requested.

required

Returns:

Type Description
Optional[str]

The session ID associated with username if an active session exists; otherwise, None.

Source code in plantdb/commons/auth/session.py
def session_token(self, username) -> Optional[str]:
    """Retrieve the active session token, if any, for a given username.

    This method cleans up any expired sessions first and then searches the internal
    ``sessions`` attribute dictionary for a session belonging to the supplied username.

    Parameters
    ----------
    username : str
        The username whose session ID is requested.

    Returns
    -------
    Optional[str]
        The session ID associated with `username` if an active session exists; otherwise, ``None``.
    """
    self.cleanup_expired_sessions()
    if username:
        for session_id, session in self.sessions.items():
            if session['username'] == username:
                return session_id
    return None

session_username

session_username(token)

Extract username from JSON Web Token.

Parameters:

Name Type Description Default

token str

Current JSON Web Token.

required

Returns:

Type Description
str or None

The corresponding username if the token is valid, None otherwise.

Source code in plantdb/commons/auth/session.py
def session_username(self, token: str) -> Optional[str]:
    """Extract username from JSON Web Token.

    Parameters
    ----------
    token : str
        Current JSON Web Token.

    Returns
    -------
    str or None
        The corresponding username if the token is valid.
    """
    try:
        session_data = self.validate_session(token)
    except SessionValidationError as e:
        self.logger.warning(f"Provided session does not exist: {e}")
        return None
    return session_data['username']

validate_session

validate_session(token)

Validate a JSON Web Token and return user information.

Parameters:

Name Type Description Default

token str

The JSON Web Token to validate.

required

Returns:

Type Description
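dict

User information if the token is valid. Invalid, expired or untracked tokens raise SessionValidationError (or one of its subclasses) instead of returning None. The dictionary contains: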

  • username: The authenticated user
  • issued_at: When the token was issued
  • expires_at: When the token expires
  • jti: Unique token identifier
  • issuer: Token issuer
  • audience: Token audience
  • type: Token type
Source code in plantdb/commons/auth/session.py
def validate_session(self, token: str) -> Optional[Dict[str, Any]]:
    """Validate a JSON Web Token and return user information.

    Parameters
    ----------
    token : str
        The JSON Web Token to validate.

    Returns
    -------
    dict
        User information if valid. Invalid, expired or untracked tokens raise ``SessionValidationError``.

        - username: The authenticated user
        - issued_at: When the token was issued
        - expires_at: When the token expires
        - jti: Unique token identifier
        - issuer: Token issuer
        - audience: Token audience
        - type: Token type
    """
    # Decode and verify JSON Web Token with proper validation
    try:
        payload = self._payload_from_token(token)
    except jwt.ExpiredSignatureError as e:
        self.logger.error("JSON Web Token expired")
        raise SessionValidationError(e) from e
    except jwt.InvalidAudienceError as e:
        self.logger.error("JSON Web Token has invalid audience")
        raise SessionValidationError(e) from e
    except jwt.InvalidIssuerError as e:
        self.logger.error("JSON Web Token has invalid issuer")
        raise SessionValidationError(e) from e
    except jwt.InvalidTokenError as e:
        self.logger.error(f"Invalid JSON Web Token: {e}")
        raise SessionValidationError(e) from e
    except Exception as e:
        self.logger.error(f"Error validating JSON Web Token: {e}")
        raise InvalidTokenProcessingError(e) from e

    jti = payload.get('jti')
    token_type = payload.get('type')

    # Verify it's in our tracking list
    if token_type == TokenType.ACCESS:
        if jti not in self.sessions:
            self.logger.error("Access token not found in active sessions")
            raise AccessTokenNotFoundError(f"Access token jti={jti} not found")
        # Update last accessed time
        with self._lock:
            self.sessions[jti]['last_accessed'] = datetime.now(timezone.utc)
    elif token_type == TokenType.REFRESH:
        if jti not in self.refresh_tokens:
            self.logger.error("Refresh token not found in active refresh tokens")
            raise RefreshTokenNotFoundError(f"Refresh token jti={jti} not found")
    elif token_type == TokenType.API:
        # Check active API tokens in the file
        if not self._is_api_token_active(jti):
            self.logger.error("API token not found in token store or has expired")
            raise SessionValidationError(f"API token jti={jti} is not active")
    else:
        raise WrongTokenType(
            f"Unsupported token_type '{token_type}'. "
            f"Supported values are {[t.value for t in TokenType]}."
        )

    payload_dict = {
        'username': payload['sub'],  # subject is the username
        'issued_at': payload['iat'],  # issued at timestamp
        'expires_at': payload['exp'],  # expiration timestamp
        'jti': jti,  # JWT ID
        'issuer': payload['iss'],  # issuer
        'audience': payload['aud'],  # audience
        'type': payload.get('type')  # type of token, 'access', 'api' or 'refresh'
    }
    # Update the payload with the dataset permissions
    if 'datasets' in payload:
        payload_dict['datasets'] = parse_dataset_perm(payload['datasets'])
    return payload_dict
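Since the listing signals failures by raising, callers need a try/except rather than a None check, with the more specific exception caught first. The sketch below uses stand‑in exception classes that mirror the documented hierarchy (AccessTokenNotFoundError and InvalidTokenProcessingError derive from SessionValidationError); `fake_validate` and `describe` are hypothetical and only imitate the outcomes of a real validation.

```python
class SessionValidationError(Exception):
    """Stand-in for the library's base validation error."""


class AccessTokenNotFoundError(SessionValidationError):
    """Stand-in: the access token is not in the active-session store."""


class InvalidTokenProcessingError(SessionValidationError):
    """Stand-in: unexpected error while processing a token."""


def fake_validate(token: str) -> dict:
    """Imitate validation outcomes for three made-up tokens."""
    if token == "good":
        return {"username": "alice", "type": "access"}
    if token == "revoked":
        raise AccessTokenNotFoundError("Access token not found")
    raise SessionValidationError("Signature has expired")


def describe(token: str) -> str:
    """Map validation outcomes to a short status string."""
    try:
        info = fake_validate(token)
    except AccessTokenNotFoundError:
        # Most specific handler first: the session was revoked or never existed.
        return "unknown session"
    except SessionValidationError as exc:
        # Catches the base class and every other subclass.
        return f"rejected: {exc}"
    return f"ok: {info['username']}"


print(describe("good"), "|", describe("revoked"), "|", describe("expired"))
# ok: alice | unknown session | rejected: Signature has expired
```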

NoAuthSessionManager

NoAuthSessionManager(session_timeout=3600)

Bases: SessionManager

A session manager for testing where every request is considered authenticated as the built‑in admin user.

  • The admin token is created once at construction and reused.
  • validate_session always succeeds (refreshes the token only when it has expired).
  • Suitable for use in plantdb.commons.fsdb.core.FSDB tests and docstring examples where security is irrelevant.

Examples:

>>> from plantdb.commons.auth.session import NoAuthSessionManager
>>> noauth_sm = NoAuthSessionManager()
>>> token = noauth_sm.admin_token()
>>> noauth_sm.validate_session(token)['username']
'admin'
>>> # Any request that expects a session manager can now receive `noauth_sm` without worrying about authentication.
>>> from plantdb.commons.test_database import setup_test_database
>>> from plantdb.commons.fsdb.core import FSDB
>>> db_path = setup_test_database('real_plant')
>>> db = FSDB(db_path, session_manager=NoAuthSessionManager())
>>> db.connect()
>>> scan = db.get_scan('real_plant')  # no need to log in to access the dataset
>>> scan.set_metadata('test', "No authentication required to write stuff!")
>>> print(scan.get_metadata('test'))
No authentication required to write stuff!
Source code in plantdb/commons/auth/session.py
def __init__(self, session_timeout: int = 3600) -> None:
    # Abort if environment explicitly disables NoAuth mode.
    if os.environ.get('ROMI_DB_NOAUTH', '1') == '0':
        raise RuntimeError('Unable to use NoAuthSessionManager, forbidden by environment variable!')
    # Initialize base SessionManager with single‑session limit.
    super().__init__(session_timeout=session_timeout, max_concurrent_sessions=1)
    self._admin_username = "admin"  # Fixed admin username.
    # Create the admin session eagerly.
    self._admin_token = super().create_session(self._admin_username)
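
Values read from `os.environ` are always strings, so a flag such as `ROMI_DB_NOAUTH` has to be compared with a string rather than an integer. A minimal sketch of such a check follows. The helper name `noauth_allowed` is hypothetical, and treating only the value `"0"` as "disabled" is an assumption about the intended convention.

```python
import os


def noauth_allowed(environ=os.environ) -> bool:
    """Return False only when ROMI_DB_NOAUTH is explicitly set to "0".

    Hypothetical helper: the variable name comes from the source above,
    the interpretation of its values is an assumption.
    """
    # Environment values are strings, so compare against "0", not the integer 0.
    return environ.get("ROMI_DB_NOAUTH", "1").strip() != "0"


# Plain dictionaries stand in for the process environment here.
disabled = noauth_allowed({"ROMI_DB_NOAUTH": "0"})
unset = noauth_allowed({})
```

Passing the mapping as a parameter keeps the check easy to test without touching the real process environment.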

admin_token Link

admin_token()

Return the (static) admin token.

Source code in plantdb/commons/auth/session.py
def admin_token(self) -> str | None:
    """Return the (static) admin token."""
    return self._admin_token

cleanup_expired_sessions Link

cleanup_expired_sessions()

Remove expired sessions from the session dictionary.

This method iterates through all stored sessions and deletes any that have an expiration time earlier than the current time.

Notes

This function modifies the self.sessions dictionary in-place.

Source code in plantdb/commons/auth/session.py
def cleanup_expired_sessions(self) -> None:
    """Remove expired sessions from the session dictionary.

    This method iterates through all stored sessions and deletes any that have
    an expiration time earlier than the current time.

    Notes
    -----
    This function modifies the `self.sessions` dictionary in-place.
    """
    current_time = datetime.now(timezone.utc)
    expired_sessions = [
        sid for sid, session in self.sessions.items()
        if current_time > session['expires_at']
    ]
    for sid in expired_sessions:
        del self.sessions[sid]
    return

create_session Link

create_session(username=None)

Ignore the supplied username and always return the pre‑created admin token.

Source code in plantdb/commons/auth/session.py
def create_session(self, username: str | None = None) -> str | None:
    """Ignore the supplied `username` and always return the pre‑created admin token."""
    return self._admin_token

has_logged_user Link

has_logged_user()

Check if there is at least one active (logged‑in) user.

This method first cleans up any expired sessions and then determines whether the internal sessions dictionary contains any entries.

Returns:

Type Description
bool

True if at least one user has an active session, False otherwise.

Source code in plantdb/commons/auth/session.py
def has_logged_user(self) -> bool:
    """Check if there is at least one active (logged‑in) user.

    This method first cleans up any expired sessions and then determines
    whether the internal ``sessions`` dictionary contains any entries.

    Returns
    -------
    bool
        ``True`` if at least one user has an active session, ``False`` otherwise.
    """
    self.cleanup_expired_sessions()
    return len(self.sessions) > 0

invalidate_session Link

invalidate_session(session_id)

Remove the given session identifier from the active sessions.

Parameters:

Name Type Description Default

session_id Link

str

The unique identifier of the session to be removed.

required

Returns:

Type Description
bool

True if the specified session was found and removed, False otherwise.

str | None

The username corresponding to the invalidated session

Notes

The session ID is removed from the internal session dictionary. If the session does not exist, this method has no effect.

Source code in plantdb/commons/auth/session.py
def invalidate_session(self, session_id: str) -> Tuple[bool, str | None]:
    """Remove the given session identifier from the active sessions.

    Parameters
    ----------
    session_id : str
        The unique identifier of the session to be removed.

    Returns
    -------
    bool
        ``True`` if the specified session was found and removed, ``False`` otherwise.
    str | None
        The username corresponding to the invalidated session

    Notes
    -----
    The session ID is removed from the internal session dictionary.
    If the session does not exist, this method has no effect.
    """
    if session_id in self.sessions:
        username = self.sessions[session_id]['username']
        del self.sessions[session_id]
        return True, username

    return False, None

n_active_sessions Link

n_active_sessions()

Returns the number of active sessions.

Cleans up expired sessions before counting and returns the number of remaining active sessions in the collection.

Returns:

Type Description
int

The number of currently active sessions.

See Also

cleanup_expired_sessions : Cleans up the expired sessions in the collection.

Source code in plantdb/commons/auth/session.py
def n_active_sessions(self) -> int:
    """Returns the number of active sessions.

    Cleans up expired sessions before counting and returns the number
    of remaining active sessions in the collection.

    Returns
    -------
    int
        The number of currently active sessions.

    See Also
    --------
    cleanup_expired_sessions : Cleans up the expired sessions in the collection.
    """
    self.cleanup_expired_sessions()
    return len(self.sessions)

refresh_session Link

refresh_session(session_id)

Refresh a session if it's still valid.

Parameters:

Name Type Description Default

session_id Link

str

Current session token

required

Returns:

Type Description
str or None

New session token if refresh is successful

Source code in plantdb/commons/auth/session.py
def refresh_session(self, session_id: str) -> Optional[str]:
    """Refresh a session if it's still valid.

    Parameters
    ----------
    session_id : str
        Current session token

    Returns
    -------
    str or None
        New session token if refresh is successful
    """
    session_data = self.validate_session(session_id)
    if not session_data:
        return None

    # Invalidate old session
    self.invalidate_session(session_id)

    # Create a new session
    username = session_data['username']
    return self.create_session(username)

session_token Link

session_token(username)

Retrieve the active session token, if any, for a given username.

This method cleans up any expired sessions first and then searches the internal sessions attribute dictionary for a session belonging to the supplied username.

Parameters:

Name Type Description Default

username Link

str

The username whose session ID is requested.

required

Returns:

Type Description
Optional[str]

The session ID associated with username if an active session exists; otherwise, None.

Source code in plantdb/commons/auth/session.py
def session_token(self, username) -> Optional[str]:
    """Retrieve the active session token, if any, for a given username.

    This method cleans up any expired sessions first and then searches the internal
    ``sessions`` attribute dictionary for a session belonging to the supplied username.

    Parameters
    ----------
    username : str
        The username whose session ID is requested.

    Returns
    -------
    Optional[str]
        The session ID associated with `username` if an active session exists; otherwise, ``None``.
    """
    self.cleanup_expired_sessions()
    if username:
        for session_id, session in self.sessions.items():
            if session['username'] == username:
                return session_id
    return None

session_username Link

session_username(session_id)

Always return admin for any session identifier.

Source code in plantdb/commons/auth/session.py
def session_username(self, session_id: str) -> str | None:
    """Always return ``admin`` for any session identifier."""
    # We bypass the parent implementation because it would look up the token in ``self.sessions``
    return self._admin_username

validate_session Link

validate_session(session_id)

Validate the stored admin token.

If it has expired, recreate it and return the fresh session information.

Source code in plantdb/commons/auth/session.py
def validate_session(self, session_id: str) -> dict | None:
    """
    Validate the stored admin token.

    If it has expired, recreate it and return the fresh session information.
    """
    # First try the existing token; ``super().validate_session`` also updates ``last_accessed``.
    session = super().validate_session(self._admin_token)
    if session is None:
        # Token is expired, so we generate a new one.
        self._admin_token = super().create_session(self._admin_username)
        session = super().validate_session(self._admin_token)
    return session

RefreshTokenNotFoundError Link

Bases: SessionValidationError

Raised when a refresh token isn’t present in the active‑refresh‑store.

SessionManager Link

SessionManager(session_timeout=3600, max_concurrent_sessions=10)

Manages user sessions with expiration and validation.

This class provides methods to create, validate, invalidate, and cleanup expired sessions. Each session is associated with a unique identifier (session_id) and has an expiry time based on the session timeout duration specified during initialization.

Attributes:

Name Type Description
sessions Dict[str, dict]

A dictionary storing active sessions. Each key is a session ID, and each value is a dictionary containing:

  • 'username' (str): The user associated with this session.
  • 'created_at' (datetime): When the session was created.
  • 'last_accessed' (datetime): Last time the session was accessed.
  • 'expires_at' (datetime): Expiry time of the session.

session_timeout int

Duration in seconds after which a session expires.

max_concurrent_sessions int

The maximum number of concurrent sessions to allow.

logger Logger

The logger to use for this session manager.

Manage user sessions with timeout.

Parameters:

Name Type Description Default

session_timeout Link

int

The duration for which the session should be valid in seconds. A session that exceeds this duration will be considered expired and removed. Defaults to 3600 seconds.

3600

max_concurrent_sessions Link

int

The maximum number of concurrent sessions to allow. Defaults to 10.

10
Source code in plantdb/commons/auth/session.py
def __init__(self, session_timeout: int = 3600, max_concurrent_sessions: int = 10):
    """Manage user sessions with timeout.

    Parameters
    ----------
    session_timeout : int, optional
        The duration for which the session should be valid in seconds.
        A session that exceeds this duration will be considered expired and removed.
        Defaults to ``3600`` seconds.
    max_concurrent_sessions : int, optional
        The maximum number of concurrent sessions to allow.
        Defaults to ``10``.
    """
    self.sessions: Dict[str, dict] = {}
    self.session_timeout = session_timeout
    self.max_concurrent_sessions = max_concurrent_sessions

    self.logger = get_logger(__class__.__name__)

cleanup_expired_sessions Link

cleanup_expired_sessions()

Remove expired sessions from the session dictionary.

This method iterates through all stored sessions and deletes any that have an expiration time earlier than the current time.

Notes

This function modifies the self.sessions dictionary in-place.

Source code in plantdb/commons/auth/session.py
def cleanup_expired_sessions(self) -> None:
    """Remove expired sessions from the session dictionary.

    This method iterates through all stored sessions and deletes any that have
    an expiration time earlier than the current time.

    Notes
    -----
    This function modifies the `self.sessions` dictionary in-place.
    """
    current_time = datetime.now(timezone.utc)
    expired_sessions = [
        sid for sid, session in self.sessions.items()
        if current_time > session['expires_at']
    ]
    for sid in expired_sessions:
        del self.sessions[sid]
    return
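
The cleanup logic above collects the expired session IDs first and deletes them in a second pass, because removing keys from a dictionary while iterating over it raises a `RuntimeError`. The sketch below reproduces that two-pass pattern on a plain dictionary. The function name `drop_expired` and the injectable `now` argument are hypothetical additions that make the behaviour deterministic.

```python
from datetime import datetime, timedelta, timezone


def drop_expired(sessions: dict, now: datetime) -> list:
    """Delete expired entries from `sessions` in place and return their IDs."""
    # First pass: collect the IDs, so the dict is not mutated while iterating.
    expired = [sid for sid, s in sessions.items() if now > s["expires_at"]]
    # Second pass: delete the collected IDs.
    for sid in expired:
        del sessions[sid]
    return expired


now = datetime(2025, 1, 1, 12, 0, tzinfo=timezone.utc)
sessions = {
    "old": {"username": "alice", "expires_at": now - timedelta(minutes=5)},
    "live": {"username": "bob", "expires_at": now + timedelta(minutes=5)},
}
removed = drop_expired(sessions, now)
```

After the call, `removed` holds the ID of the expired entry and only the live session remains in `sessions`.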

create_session Link

create_session(username)

Create a new session for a user.

Creates a new session and returns its ID. If the maximum number of concurrent sessions has been reached, no session is created and None is returned.

Parameters:

Name Type Description Default

username Link

str

The unique identifier of the user for whom to create a session.

required

Returns:

Type Description
str | None

The ID of the created session, or None if the maximum number of concurrent sessions has been reached.

Notes

The session ID is a token generated using secrets.token_urlsafe. The session data includes the user ID, creation timestamp, last accessed timestamp, and expiration timestamp.

Source code in plantdb/commons/auth/session.py
def create_session(self, username: str) -> str | None:
    """Create a new session for a user.

    Creates a new session and returns its ID. If the maximum number of
    concurrent sessions has been reached, no session is created and ``None`` is returned.

    Parameters
    ----------
    username : str
        The unique identifier of the user for whom to create a session.

    Returns
    -------
    str | None
        The ID of the created session, or ``None`` if the maximum number of concurrent sessions has been reached.

    Notes
    -----
    The session ID is a token generated using `secrets.token_urlsafe`.
    The session data includes the user ID, creation timestamp, last accessed timestamp, and expiration timestamp.
    """
    if self.n_active_sessions() >= self.max_concurrent_sessions:
        self.logger.warning(
            f"Reached max concurrent sessions limit ({self.max_concurrent_sessions})")
        return None

    now = datetime.now(timezone.utc)
    exp_time = now + timedelta(seconds=self.session_timeout)
    # Create a session token
    session_token = secrets.token_urlsafe(32)

    self.sessions[session_token] = {
        'username': username,
        'created_at': now,
        'last_accessed': now,
        'expires_at': exp_time
    }
    return session_token
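
The source above refuses to create a session once the concurrency limit is reached and otherwise stores a fresh random token. The following standalone sketch mirrors that flow on a plain dictionary. `open_session` and its parameters are hypothetical names, and the sketch skips the expired-session cleanup that the real method performs before counting.

```python
import secrets
from datetime import datetime, timedelta, timezone


def open_session(sessions: dict, username: str, timeout: int = 3600, limit: int = 10):
    """Store a new session for `username`, or return None when `limit` is reached."""
    if len(sessions) >= limit:
        return None
    now = datetime.now(timezone.utc)
    token = secrets.token_urlsafe(32)  # URL-safe random token from 32 random bytes
    sessions[token] = {
        "username": username,
        "created_at": now,
        "last_accessed": now,
        "expires_at": now + timedelta(seconds=timeout),
    }
    return token


store = {}
first = open_session(store, "alice", limit=1)
second = open_session(store, "bob", limit=1)  # the limit is already reached
```

Callers need to handle the `None` result explicitly, since a rejected request does not raise an exception.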

has_logged_user Link

has_logged_user()

Check if there is at least one active (logged‑in) user.

This method first cleans up any expired sessions and then determines whether the internal sessions dictionary contains any entries.

Returns:

Type Description
bool

True if at least one user has an active session, False otherwise.

Source code in plantdb/commons/auth/session.py
def has_logged_user(self) -> bool:
    """Check if there is at least one active (logged‑in) user.

    This method first cleans up any expired sessions and then determines
    whether the internal ``sessions`` dictionary contains any entries.

    Returns
    -------
    bool
        ``True`` if at least one user has an active session, ``False`` otherwise.
    """
    self.cleanup_expired_sessions()
    return len(self.sessions) > 0

invalidate_session Link

invalidate_session(session_id)

Remove the given session identifier from the active sessions.

Parameters:

Name Type Description Default

session_id Link

str

The unique identifier of the session to be removed.

required

Returns:

Type Description
bool

True if the specified session was found and removed, False otherwise.

str | None

The username corresponding to the invalidated session

Notes

The session ID is removed from the internal session dictionary. If the session does not exist, this method has no effect.

Source code in plantdb/commons/auth/session.py
def invalidate_session(self, session_id: str) -> Tuple[bool, str | None]:
    """Remove the given session identifier from the active sessions.

    Parameters
    ----------
    session_id : str
        The unique identifier of the session to be removed.

    Returns
    -------
    bool
        ``True`` if the specified session was found and removed, ``False`` otherwise.
    str | None
        The username corresponding to the invalidated session

    Notes
    -----
    The session ID is removed from the internal session dictionary.
    If the session does not exist, this method has no effect.
    """
    if session_id in self.sessions:
        username = self.sessions[session_id]['username']
        del self.sessions[session_id]
        return True, username

    return False, None
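
Because the method returns a `(found, username)` pair, callers usually unpack the result. The sketch below shows the same contract on a plain dictionary. `drop_session` is a hypothetical helper, not plantdb code.

```python
def drop_session(sessions: dict, session_id: str):
    """Remove `session_id` and return (True, username), or (False, None) if absent."""
    entry = sessions.pop(session_id, None)
    if entry is None:
        return False, None
    return True, entry["username"]


sessions = {"tok-1": {"username": "alice"}}
removed, username = drop_session(sessions, "tok-1")
removed_again, nobody = drop_session(sessions, "tok-1")  # already gone
```

The second call illustrates that invalidating an unknown session is harmless: it reports `False` and leaves the store untouched.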

n_active_sessions Link

n_active_sessions()

Returns the number of active sessions.

Cleans up expired sessions before counting and returns the number of remaining active sessions in the collection.

Returns:

Type Description
int

The number of currently active sessions.

See Also

cleanup_expired_sessions : Cleans up the expired sessions in the collection.

Source code in plantdb/commons/auth/session.py
def n_active_sessions(self) -> int:
    """Returns the number of active sessions.

    Cleans up expired sessions before counting and returns the number
    of remaining active sessions in the collection.

    Returns
    -------
    int
        The number of currently active sessions.

    See Also
    --------
    cleanup_expired_sessions : Cleans up the expired sessions in the collection.
    """
    self.cleanup_expired_sessions()
    return len(self.sessions)

refresh_session Link

refresh_session(session_id)

Refresh a session if it's still valid.

Parameters:

Name Type Description Default

session_id Link

str

Current session token

required

Returns:

Type Description
str or None

New session token if refresh is successful

Source code in plantdb/commons/auth/session.py
def refresh_session(self, session_id: str) -> Optional[str]:
    """Refresh a session if it's still valid.

    Parameters
    ----------
    session_id : str
        Current session token

    Returns
    -------
    str or None
        New session token if refresh is successful
    """
    session_data = self.validate_session(session_id)
    if not session_data:
        return None

    # Invalidate old session
    self.invalidate_session(session_id)

    # Create a new session
    username = session_data['username']
    return self.create_session(username)
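
Refreshing is a token rotation: the old token is validated, removed, and replaced by a new one for the same user. The order matters when the concurrency limit is tight, because the old session has to be dropped before the new one can fit. The class below is a simplified, hypothetical stand-in (`TinySessions`) that demonstrates this with a limit of one session.

```python
import secrets
from datetime import datetime, timedelta, timezone


class TinySessions:
    """Hypothetical in-memory store mirroring the create/validate/refresh flow."""

    def __init__(self, timeout: int = 3600, limit: int = 1):
        self.sessions = {}
        self.timeout = timeout
        self.limit = limit

    def create(self, username):
        if len(self.sessions) >= self.limit:
            return None
        now = datetime.now(timezone.utc)
        token = secrets.token_urlsafe(32)
        self.sessions[token] = {
            "username": username,
            "expires_at": now + timedelta(seconds=self.timeout),
        }
        return token

    def validate(self, token):
        session = self.sessions.get(token)
        if session is None or datetime.now(timezone.utc) > session["expires_at"]:
            self.sessions.pop(token, None)  # drop unknown or expired tokens
            return None
        return session

    def refresh(self, token):
        session = self.validate(token)
        if session is None:
            return None
        # Drop the old token first so the new one fits under the limit.
        del self.sessions[token]
        return self.create(session["username"])


store = TinySessions(limit=1)
old = store.create("alice")
new = store.refresh(old)
```

After the refresh, the old token no longer validates and the new token belongs to the same user.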

session_token Link

session_token(username)

Retrieve the active session token, if any, for a given username.

This method cleans up any expired sessions first and then searches the internal sessions attribute dictionary for a session belonging to the supplied username.

Parameters:

Name Type Description Default

username Link

str

The username whose session ID is requested.

required

Returns:

Type Description
Optional[str]

The session ID associated with username if an active session exists; otherwise, None.

Source code in plantdb/commons/auth/session.py
def session_token(self, username) -> Optional[str]:
    """Retrieve the active session token, if any, for a given username.

    This method cleans up any expired sessions first and then searches the internal
    ``sessions`` attribute dictionary for a session belonging to the supplied username.

    Parameters
    ----------
    username : str
        The username whose session ID is requested.

    Returns
    -------
    Optional[str]
        The session ID associated with `username` if an active session exists; otherwise, ``None``.
    """
    self.cleanup_expired_sessions()
    if username:
        for session_id, session in self.sessions.items():
            if session['username'] == username:
                return session_id
    return None
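
The lookup above is a linear scan that returns the first session whose username matches. A compact sketch of the same reverse lookup follows. `token_for` is a hypothetical name, and the sketch omits the cleanup step that the real method runs first.

```python
def token_for(sessions: dict, username):
    """Return the first session ID stored for `username`, or None."""
    if not username:
        return None
    # next() with a default avoids StopIteration when nothing matches.
    return next(
        (sid for sid, s in sessions.items() if s["username"] == username),
        None,
    )


sessions = {
    "tok-a": {"username": "alice"},
    "tok-b": {"username": "bob"},
}
bob_token = token_for(sessions, "bob")
missing = token_for(sessions, "carol")
```

Since dictionaries preserve insertion order, a user with several sessions gets the oldest stored one back.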

session_username Link

session_username(session_id)

Retrieve the username associated with a given session ID.

The method validates the supplied session ID by delegating to validate_session. If the session is active, the username stored in the session data is returned; otherwise None is returned.

Parameters:

Name Type Description Default

session_id Link

str

The unique identifier for the session to query.

required

Returns:

Type Description
Optional[str]

The username linked to the session, or None if the session is not found or is invalid.

Source code in plantdb/commons/auth/session.py
def session_username(self, session_id: str) -> Optional[str]:
    """Retrieve the username associated with a given session ID.

    The method validates the supplied session ID by delegating to `validate_session`.
    If the session is active, the username stored in the session data is returned;
    otherwise ``None`` is returned.

    Parameters
    ----------
    session_id : str
        The unique identifier for the session to query.

    Returns
    -------
    Optional[str]
        The username linked to the session, or ``None`` if the
        session is not found or is invalid.
    """
    session_data = self.validate_session(session_id)
    return session_data['username'] if session_data else None

validate_session Link

validate_session(session_id)

Validate a given session by checking its existence and expiration status.

Parameters:

Name Type Description Default

session_id Link

str

The unique identifier of the session to be validated.

required

Returns:

Type Description
dict | None

A dictionary with user information if valid, None if invalid/expired.

  • username: The authenticated user
  • created_at: When the session was created
  • last_accessed: When the session was last validated
  • expires_at: When the session expires
Notes

The validate_session method updates the session's last accessed time upon successful validation.

Source code in plantdb/commons/auth/session.py
def validate_session(self, session_id: str) -> dict | None:
    """Validate a given session by checking its existence and expiration status.

    Parameters
    ----------
    session_id : str
        The unique identifier of the session to be validated.

    Returns
    -------
    dict | None
        A dictionary with user information if valid, ``None`` if invalid/expired.

          - username: The authenticated user
          - created_at: When the session was created
          - last_accessed: When the session was last validated
          - expires_at: When the session expires

    Notes
    -----
    The `validate_session` method updates the session's last accessed time upon successful validation.
    """
    if session_id not in self.sessions:
        self.logger.warning("Provided session does not exist!")
        return None

    session = self.sessions[session_id]
    now = datetime.now(timezone.utc)
    if now > session['expires_at']:
        username = session['username']
        self.logger.warning(f"The session for user '{username}' has expired. Please log back in!")
        _, _ = self.invalidate_session(session_id)
        return None

    # Update last accessed time
    session['last_accessed'] = now
    return session
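
Validation has three outcomes: unknown ID, expired session (which is also removed), and valid session (whose `last_accessed` is updated). The sketch below reproduces those outcomes with an injected clock so the result is deterministic. `check_session` and the explicit `now` argument are hypothetical.

```python
from datetime import datetime, timedelta, timezone


def check_session(sessions: dict, session_id: str, now: datetime):
    """Return the stored session dict if valid, otherwise None."""
    session = sessions.get(session_id)
    if session is None:
        return None  # unknown session ID
    if now > session["expires_at"]:
        del sessions[session_id]  # expired sessions are removed on sight
        return None
    session["last_accessed"] = now  # record the successful validation
    return session


t0 = datetime(2025, 1, 1, 12, 0, tzinfo=timezone.utc)
sessions = {
    "tok": {
        "username": "alice",
        "created_at": t0,
        "last_accessed": t0,
        "expires_at": t0 + timedelta(hours=1),
    }
}
fresh = check_session(sessions, "tok", t0 + timedelta(minutes=30))
stale = check_session(sessions, "tok", t0 + timedelta(hours=2))
```

As in the source above, the returned dictionary is the stored object itself rather than a copy, so changes made by the caller are visible in the store.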

SessionValidationError Link

Bases: Exception

Base class for all session‑validation‑related errors.

SingleSessionManager Link

SingleSessionManager(session_timeout=3600, **kwargs)

Bases: SessionManager

Generate a single-session manager for handling database connections.

The SingleSessionManager class is designed to manage a single active database session at any given time. It inherits from the base SessionManager class and overrides its initialization to ensure only one concurrent session is allowed, even if the base class allows more.

Parameters:

Name Type Description Default

session_timeout Link

int

The timeout duration for each database session in seconds. If not specified, defaults to 3600 (1 hour).

3600

Attributes:

Name Type Description
sessions Dict[str, dict]

A dictionary storing active sessions. Each key is a session ID, and each value is a dictionary containing:

  • 'username' (str): The user associated with this session.
  • 'created_at' (datetime): When the session was created.
  • 'last_accessed' (datetime): Last time the session was accessed.
  • 'expires_at' (datetime): Expiry time of the session.

session_timeout int

The configured timeout duration for sessions.

max_concurrent_sessions int

Always set to 1 to ensure only one concurrent session is allowed.

logger Logger

The logger to use for this session manager.

Examples:

>>> from plantdb.commons.auth.session import SingleSessionManager
>>> # Initialize the session manager
>>> manager = SingleSessionManager()
>>> # Create a new session with the username 'test'
>>> session_token = manager.create_session('test')
>>> # Attempt to create another session with the username 'test2'
>>> _ = manager.create_session('test2')
WARNING  [SessionManager] Reached max concurrent sessions limit (1)
>>> # Validate the session and get its info
>>> session = manager.validate_session(session_token)
>>> print(session['expires_at'])  # Print the expiration date
>>> # Refresh the session using the existing session token
>>> new_session_token = manager.refresh_session(session_token)
>>> # Validate the session and get its info
>>> session = manager.validate_session(new_session_token)
>>> print(session['expires_at'])  # Print the expiration date of the refreshed session
Notes

The SingleSessionManager enforces a single-session policy: while one session is active, any further call to create_session logs a warning and returns None instead of creating a second session.

See Also

session_manager.SessionManager : Base class for managing database sessions.

Source code in plantdb/commons/auth/session.py
def __init__(self, session_timeout: int = 3600, **kwargs) -> None:
    super().__init__(session_timeout=session_timeout, max_concurrent_sessions=1)
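
The constructor accepts arbitrary keyword arguments but always passes a limit of one to the base class, so a caller cannot raise the limit by accident. The sketch below illustrates that pattern with two small hypothetical classes, `BaseManager` and `OneAtATime`, which stand in for the real ones.

```python
class BaseManager:
    """Hypothetical base class holding the two configuration values."""

    def __init__(self, session_timeout: int = 3600, max_concurrent_sessions: int = 10):
        self.session_timeout = session_timeout
        self.max_concurrent_sessions = max_concurrent_sessions


class OneAtATime(BaseManager):
    """Hypothetical subclass that pins the concurrency limit to one."""

    def __init__(self, session_timeout: int = 3600, **kwargs):
        # Extra keyword arguments are accepted but deliberately ignored.
        super().__init__(session_timeout=session_timeout, max_concurrent_sessions=1)


manager = OneAtATime(session_timeout=60, max_concurrent_sessions=5)
```

Here the requested limit of five is discarded and `manager.max_concurrent_sessions` stays at one, while the timeout is forwarded unchanged.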

cleanup_expired_sessions Link

cleanup_expired_sessions()

Remove expired sessions from the session dictionary.

This method iterates through all stored sessions and deletes any that have an expiration time earlier than the current time.

Notes

This function modifies the self.sessions dictionary in-place.

Source code in plantdb/commons/auth/session.py
def cleanup_expired_sessions(self) -> None:
    """Remove expired sessions from the session dictionary.

    This method iterates through all stored sessions and deletes any that have
    an expiration time earlier than the current time.

    Notes
    -----
    This function modifies the `self.sessions` dictionary in-place.
    """
    current_time = datetime.now(timezone.utc)
    expired_sessions = [
        sid for sid, session in self.sessions.items()
        if current_time > session['expires_at']
    ]
    for sid in expired_sessions:
        del self.sessions[sid]
    return

create_session Link

create_session(username)

Create a new session for a user.

Creates a new session and returns its ID. If the maximum number of concurrent sessions has been reached, no session is created and None is returned.

Parameters:

Name Type Description Default

username Link

str

The unique identifier of the user for whom to create a session.

required

Returns:

Type Description
str | None

The ID of the created session, or None if the maximum number of concurrent sessions has been reached.

Notes

The session ID is a token generated using secrets.token_urlsafe. The session data includes the user ID, creation timestamp, last accessed timestamp, and expiration timestamp.

Source code in plantdb/commons/auth/session.py
def create_session(self, username: str) -> str | None:
    """Create a new session for a user.

    Creates a new session and returns its ID. If the maximum number of
    concurrent sessions has been reached, no session is created and ``None`` is returned.

    Parameters
    ----------
    username : str
        The unique identifier of the user for whom to create a session.

    Returns
    -------
    str | None
        The ID of the created session, or ``None`` if the maximum number of concurrent sessions has been reached.

    Notes
    -----
    The session ID is a token generated using `secrets.token_urlsafe`.
    The session data includes the user ID, creation timestamp, last accessed timestamp, and expiration timestamp.
    """
    if self.n_active_sessions() >= self.max_concurrent_sessions:
        self.logger.warning(
            f"Reached max concurrent sessions limit ({self.max_concurrent_sessions})")
        return None

    now = datetime.now(timezone.utc)
    exp_time = now + timedelta(seconds=self.session_timeout)
    # Create a session token
    session_token = secrets.token_urlsafe(32)

    self.sessions[session_token] = {
        'username': username,
        'created_at': now,
        'last_accessed': now,
        'expires_at': exp_time
    }
    return session_token

has_logged_user Link

has_logged_user()

Check if there is at least one active (logged‑in) user.

This method first cleans up any expired sessions and then determines whether the internal sessions dictionary contains any entries.

Returns:

Type Description
bool

True if at least one user has an active session, False otherwise.

Source code in plantdb/commons/auth/session.py
def has_logged_user(self) -> bool:
    """Check if there is at least one active (logged‑in) user.

    This method first cleans up any expired sessions and then determines
    whether the internal ``sessions`` dictionary contains any entries.

    Returns
    -------
    bool
        ``True`` if at least one user has an active session, ``False`` otherwise.
    """
    self.cleanup_expired_sessions()
    return len(self.sessions) > 0

invalidate_session Link

invalidate_session(session_id)

Remove the given session identifier from the active sessions.

Parameters:

Name Type Description Default

session_id Link

str

The unique identifier of the session to be removed.

required

Returns:

Type Description
bool

True if the specified session was found and removed, False otherwise.

str | None

The username corresponding to the invalidated session

Notes

The session ID is removed from the internal session dictionary. If the session does not exist, this method has no effect.

Source code in plantdb/commons/auth/session.py
def invalidate_session(self, session_id: str) -> Tuple[bool, str | None]:
    """Remove the given session identifier from the active sessions.

    Parameters
    ----------
    session_id : str
        The unique identifier of the session to be removed.

    Returns
    -------
    bool
        ``True`` if the specified session was found and removed, ``False`` otherwise.
    str | None
        The username corresponding to the invalidated session, or ``None`` if no such session exists.

    Notes
    -----
    The session ID is removed from the internal session dictionary.
    If the session does not exist, this method has no effect.
    """
    if session_id in self.sessions:
        username = self.sessions[session_id]['username']
        del self.sessions[session_id]
        return True, username

    return False, None
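
Because the method returns a 2-tuple, the result should be unpacked rather than tested directly. A minimal sketch of the pitfall, assuming a stand-in `sessions` dictionary and a hypothetical free function `invalidate` that mirrors the method above:

```python
from typing import Dict, Optional, Tuple

# Stand-in for the manager's internal ``sessions`` dictionary (illustrative data).
sessions: Dict[str, dict] = {"tok-1": {"username": "alice"}}


def invalidate(session_id: str) -> Tuple[bool, Optional[str]]:
    """Mirror of ``invalidate_session`` operating on the stand-in dictionary."""
    if session_id in sessions:
        username = sessions.pop(session_id)["username"]
        return True, username
    return False, None


# Unpack the tuple: a non-empty tuple such as ``(False, None)`` is always truthy,
# so ``if invalidate(...):`` would succeed even when nothing was removed.
removed, username = invalidate("tok-1")
removed_again, nobody = invalidate("tok-1")
```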

n_active_sessions

n_active_sessions()

Returns the number of active sessions.

Cleans up expired sessions before counting and returns the number of remaining active sessions in the collection.

Returns:

int
    The number of currently active sessions.

See Also

cleanup_expired_sessions : Cleans up the expired sessions in the collection.

Source code in plantdb/commons/auth/session.py
def n_active_sessions(self) -> int:
    """Returns the number of active sessions.

    Cleans up expired sessions before counting and returns the number
    of remaining active sessions in the collection.

    Returns
    -------
    int
        The number of currently active sessions.

    See Also
    --------
    cleanup_expired_sessions : Cleans up the expired sessions in the collection.
    """
    self.cleanup_expired_sessions()
    return len(self.sessions)
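
The prune-then-count pattern used here (and in `has_logged_user`) can be sketched as follows. The body of `cleanup_expired_sessions` is not shown in this section, so `cleanup_expired` below is an assumption about its behaviour, and `n_active` is a hypothetical stand-in for the method:

```python
from datetime import datetime, timedelta, timezone
from typing import Dict

now = datetime.now(timezone.utc)
# Illustrative data: one live session and one that expired a minute ago.
sessions: Dict[str, dict] = {
    "live": {"username": "alice", "expires_at": now + timedelta(minutes=30)},
    "stale": {"username": "bob", "expires_at": now - timedelta(minutes=1)},
}


def cleanup_expired(store: Dict[str, dict]) -> None:
    """Assumed behaviour of ``cleanup_expired_sessions``: drop entries past ``expires_at``."""
    current = datetime.now(timezone.utc)
    # Collect the keys first: deleting from a dict while iterating over it raises RuntimeError.
    expired = [sid for sid, data in store.items() if current > data["expires_at"]]
    for sid in expired:
        del store[sid]


def n_active(store: Dict[str, dict]) -> int:
    """Prune, then count, as ``n_active_sessions`` does."""
    cleanup_expired(store)
    return len(store)


count = n_active(sessions)
```

Counting therefore has a side effect: expired entries are gone from the store after the call.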

refresh_session

refresh_session(session_id)

Refresh a session if it's still valid.

Parameters:

session_id : str, required
    Current session token.

Returns:

str or None
    New session token if the refresh succeeds, None if the current session is missing or expired.

Source code in plantdb/commons/auth/session.py
def refresh_session(self, session_id: str) -> Optional[str]:
    """Refresh a session if it's still valid.

    Parameters
    ----------
    session_id : str
        Current session token

    Returns
    -------
    str or None
        New session token if the refresh succeeds, ``None`` if the current
        session is missing or expired.
    """
    session_data = self.validate_session(session_id)
    if not session_data:
        return None

    # Invalidate old session
    self.invalidate_session(session_id)

    # Create a new session
    username = session_data['username']
    return self.create_session(username)
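
Refreshing is a token rotation: the old token stops working and a different one is issued for the same user. A self-contained sketch of that flow, using a hypothetical `MiniSessionStore` class as a reduced stand-in (it is not the plantdb class and omits logging and concurrency limits):

```python
import secrets
from datetime import datetime, timedelta, timezone
from typing import Dict, Optional


class MiniSessionStore:
    """Hypothetical, reduced stand-in for the session manager."""

    def __init__(self, session_timeout: int = 1800) -> None:
        self.session_timeout = session_timeout
        self.sessions: Dict[str, dict] = {}

    def create_session(self, username: str) -> str:
        now = datetime.now(timezone.utc)
        token = secrets.token_urlsafe(32)
        self.sessions[token] = {
            "username": username,
            "expires_at": now + timedelta(seconds=self.session_timeout),
        }
        return token

    def validate_session(self, token: str) -> Optional[dict]:
        session = self.sessions.get(token)
        if session is None or datetime.now(timezone.utc) > session["expires_at"]:
            # Unknown or expired: make sure no stale entry is left behind.
            self.sessions.pop(token, None)
            return None
        return session

    def refresh_session(self, token: str) -> Optional[str]:
        session = self.validate_session(token)
        if session is None:
            return None
        # Rotate: drop the old token, then issue a fresh one for the same user.
        del self.sessions[token]
        return self.create_session(session["username"])


store = MiniSessionStore()
old_token = store.create_session("alice")
new_token = store.refresh_session(old_token)
```

After the call, only `new_token` validates; a second refresh attempt with `old_token` returns `None`.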

session_token

session_token(username)

Retrieve the active session token, if any, for a given username.

This method cleans up any expired sessions first and then searches the internal sessions attribute dictionary for a session belonging to the supplied username.

Parameters:

username : str, required
    The username whose session ID is requested.

Returns:

Optional[str]
    The session ID associated with username if an active session exists; otherwise, None.

Source code in plantdb/commons/auth/session.py
def session_token(self, username: str) -> Optional[str]:
    """Retrieve the active session token, if any, for a given username.

    This method cleans up any expired sessions first and then searches the internal
    ``sessions`` attribute dictionary for a session belonging to the supplied username.

    Parameters
    ----------
    username : str
        The username whose session ID is requested.

    Returns
    -------
    Optional[str]
        The session ID associated with `username` if an active session exists; otherwise, ``None``.
    """
    self.cleanup_expired_sessions()
    if username:
        for session_id, session in self.sessions.items():
            if session['username'] == username:
                return session_id
    return None
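
The lookup is a linear scan that stops at the first match, so a user holding several sessions gets back the earliest-inserted token. A small sketch of that behaviour, assuming a stand-in `sessions` dictionary and a hypothetical `find_token` function (expiry cleanup omitted):

```python
from typing import Dict, Optional

# Illustrative stand-in for ``sessions``; two tokens belong to the same user.
sessions: Dict[str, dict] = {
    "tok-a": {"username": "alice"},
    "tok-b": {"username": "bob"},
    "tok-c": {"username": "alice"},
}


def find_token(username: str) -> Optional[str]:
    """Linear scan equivalent to the loop in ``session_token``."""
    if not username:
        # An empty or missing username never matches.
        return None
    return next(
        (sid for sid, data in sessions.items() if data["username"] == username),
        None,
    )


first_for_alice = find_token("alice")  # dicts preserve insertion order
```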

session_username

session_username(session_id)

Retrieve the username associated with a given session ID.

The method validates the supplied session ID by delegating to validate_session. If the session is active, the username stored in the session data is returned; otherwise None is returned.

Parameters:

session_id : str, required
    The unique identifier for the session to query.

Returns:

Optional[str]
    The username linked to the session, or None if the session is not found or is invalid.

Source code in plantdb/commons/auth/session.py
def session_username(self, session_id: str) -> Optional[str]:
    """Retrieve the username associated with a given session ID.

    The method validates the supplied session ID by delegating to `validate_session`.
    If the session is active, the username stored in the session data is returned;
    otherwise ``None`` is returned.

    Parameters
    ----------
    session_id : str
        The unique identifier for the session to query.

    Returns
    -------
    Optional[str]
        The username linked to the session, or ``None`` if the
        session is not found or is invalid.
    """
    session_data = self.validate_session(session_id)
    return session_data['username'] if session_data else None

validate_session

validate_session(session_id)

Validate a given session by checking its existence and expiration status.

Parameters:

session_id : str, required
    The unique identifier of the session to be validated.

Returns:

dict | None
    A dictionary with user information if valid, None if invalid/expired.

      • username: The authenticated user
      • created_at: When the session was created
      • last_accessed: When the session was last validated
      • expires_at: When the session expires

Notes

The validate_session method updates the session's last accessed time upon successful validation.

Source code in plantdb/commons/auth/session.py
def validate_session(self, session_id: str) -> dict | None:
    """Validate a given session by checking its existence and expiration status.

    Parameters
    ----------
    session_id : str
        The unique identifier of the session to be validated.

    Returns
    -------
    dict | None
        A dictionary with user information if valid, ``None`` if invalid/expired.

          - username: The authenticated user
          - created_at: When the session was created
          - last_accessed: When the session was last validated
          - expires_at: When the session expires

    Notes
    -----
    The `validate_session` method updates the session's last accessed time upon successful validation.
    """
    if session_id not in self.sessions:
        self.logger.warning("Provided session does not exist!")
        return None

    session = self.sessions[session_id]
    now = datetime.now(timezone.utc)
    if now > session['expires_at']:
        username = session['username']
        self.logger.warning(f"The session for user '{username}' has expired. Please log back in!")
        _, _ = self.invalidate_session(session_id)
        return None

    # Update last accessed time
    session['last_accessed'] = now
    return session
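
The expiry test is a strict comparison between two timezone-aware datetimes. A minimal sketch of that comparison in isolation, with a hypothetical `is_expired` helper and fixed illustrative timestamps so the boundary case is explicit:

```python
from datetime import datetime, timedelta, timezone


def is_expired(session: dict, now: datetime) -> bool:
    """Same comparison as in ``validate_session``: strictly after ``expires_at``."""
    return now > session["expires_at"]


created = datetime(2025, 1, 1, 12, 0, tzinfo=timezone.utc)
session = {"username": "alice", "expires_at": created + timedelta(seconds=1800)}

# At the exact expiry instant the session still counts as valid.
still_valid = not is_expired(session, created + timedelta(seconds=1800))
expired = is_expired(session, created + timedelta(seconds=1801))

# Comparing a naive datetime with an aware one raises TypeError,
# so every timestamp in the store needs to carry a timezone.
try:
    is_expired(session, datetime(2025, 1, 1, 12, 0))
    mixed_ok = True
except TypeError:
    mixed_ok = False
```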

TokenType

Bases: Enum

Canonical token types used throughout the API.

__eq__

__eq__(other)

Equality operator.

Parameters:

other : Any, required
    Object to compare with the token type instance. If a string is provided, it is interpreted as a token type name and converted to a TokenType instance for comparison.

Returns:

bool
    True if other represents the same token type as self, otherwise False.

Notes

The method safely handles objects that do not expose a value attribute. This ensures that comparisons with unrelated types do not raise unexpected exceptions.

Examples:

>>> from plantdb.commons.auth.session import TokenType
>>> tt = TokenType.ACCESS
>>> tt == TokenType.ACCESS  # direct comparison with TokenType
True
>>> tt == 'access'  # comparison with string
True
>>> tt == 'Access'  # comparison with string is case-insensitive
True
>>> tt == TokenType.API  # direct comparison with TokenType
False
>>> tt == 'api'  # comparison with string
False

Source code in plantdb/commons/auth/session.py
def __eq__(self, other: Any) -> bool:
    """Equality operator.

    Parameters
    ----------
    other : Any
        Object to compare with the token type instance.
        If a string is provided, it is interpreted as a token type name and converted to a
        ``TokenType`` instance for comparison.

    Returns
    -------
    bool
        ``True`` if ``other`` represents the same token type as ``self``, otherwise ``False``.

    Notes
    -----
    The method safely handles objects that do not expose a ``value`` attribute.
    This ensures that comparisons with unrelated types do not raise unexpected exceptions.

    Examples
    --------
    >>> from plantdb.commons.auth.session import TokenType
    >>> tt = TokenType.ACCESS
    >>> tt == TokenType.ACCESS  # direct comparison with TokenType
    True
    >>> tt == 'access'  # comparison with string
    True
    >>> tt == 'Access'  # comparison with string is case-insensitive
    True
    >>> tt == TokenType.API  # direct comparison with TokenType
    False
    >>> tt == 'api'  # comparison with string
    False
    """
    if isinstance(other, str):
        try:
            return TokenType.from_string(other) == self
        except ValueError:
            return False
    else:
        try:
            return other.value == self.value
        except AttributeError:
            return False
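
The comparison logic can be tried without plantdb installed. The sketch below uses a hypothetical `DemoTokenType` enum; its `from_string` is an assumption (case-insensitive lookup by value that raises `ValueError` for unknown names), since the real `TokenType.from_string` is not shown in this section:

```python
from enum import Enum
from typing import Any


class DemoTokenType(Enum):
    """Hypothetical stand-in for ``TokenType``."""

    ACCESS = "access"
    API = "api"
    REFRESH = "refresh"

    @classmethod
    def from_string(cls, name: str) -> "DemoTokenType":
        # Assumed behaviour: case-insensitive lookup by value, ValueError if unknown.
        return cls(name.lower())

    def __eq__(self, other: Any) -> bool:
        if isinstance(other, str):
            try:
                return DemoTokenType.from_string(other) is self
            except ValueError:
                return False
        # Objects without a ``value`` attribute simply compare unequal.
        return getattr(other, "value", None) == self.value

    def __hash__(self) -> int:
        # A class that defines __eq__ without __hash__ becomes unhashable;
        # this sketch restores hashing so members can still be dict keys.
        return hash(self.name)


matches_string = DemoTokenType.ACCESS == "Access"
matches_other_member = DemoTokenType.ACCESS == DemoTokenType.API
matches_unrelated = DemoTokenType.ACCESS == 42
```

Whether the plantdb class also defines `__hash__` is not visible here; the sketch only shows why it matters once `__eq__` is overridden.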