Skip to content

security

Security and Request Handling UtilitiesLink

This module provides decorators and utility functions to enhance the security of Flask-based Web APIs. It offers mechanisms for rate limiting to prevent abuse and utilities for extracting JSON Web Tokens (JWT) from HTTP headers to facilitate authentication.

Key FeaturesLink

  • Rate Limiting: A thread-safe decorator to restrict the number of requests from a specific IP address within a rolling time window.
  • JWT Extraction: Functions to seamlessly extract Bearer tokens from the Authorization header and inject them into function arguments.
  • Flask Integration: Designed to work directly with Flask's request context and response objects, returning standard HTTP status codes (e.g., 429 Too Many Requests).

Usage ExamplesLink

The following example demonstrates how to protect a Flask-RESTful resource using the security decorators:

from flask import Flask
from flask_restful import Api, Resource
from security import rate_limit, add_jwt_from_header

app = Flask(__name__)
api = Api(app)

class SecureResource(Resource):
    @rate_limit(max_requests=10, window_seconds=60)
    @add_jwt_from_header
    def get(self, **kwargs):
        # The 'token' is automatically extracted and passed in kwargs
        token = kwargs.get('token')
        return {"message": "Access granted", "token_found": bool(token)}

api.add_resource(SecureResource, '/secure')

if __name__ == '__main__':
    app.run()

add_jwt_from_header Link

add_jwt_from_header(f)

Retrieve the JSON Web Token from the header and add it to keyword arguments, if any.

This enables automatic token definition from the request header.

Source code in plantdb/server/core/security.py
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
def add_jwt_from_header(f: Callable) -> Callable:
    """Retrieve the JSON Web Token from the header and add it to keyword arguments, if any.

    This enables automatic token definition from the request header.
    """

    @wraps(f)
    def decorated_function(*args, **kwargs):
        # Get JSON Web Token from the request header
        jwt_token = jwt_from_header(request)
        if jwt_token:
            kwargs['token'] = jwt_token
        return f(*args, **kwargs)

    return decorated_function

jwt_from_header Link

jwt_from_header(request)

Extract the JWT from the Authorizationrequest header.

Parameters:

Name Type Description Default

request Link

An HTTP request exposing a headers mapping.

required

Returns:

Type Description
str | None

The extracted JWT or None.

Notes
  • The function expects the header to be in the form Bearer <token>.
  • The function performs a simple string replacement and does not perform any verification.
Source code in plantdb/server/core/security.py
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
def jwt_from_header(request) -> str | None:
    """Extract the JWT from the ``Authorization``request header.

    Parameters
    ----------
    request: requests.models.Request
        An HTTP request exposing a ``headers`` mapping.

    Returns
    -------
    str | None
        The extracted JWT or ``None``.

    Notes
    -----
    * The function expects the header to be in the form ``Bearer <token>``.
    * The function performs a simple string replacement and does **not** perform any verification.
    """
    auth = request.headers.get('Authorization')
    if not auth:
        return None
    return auth.replace('Bearer ', '')

rate_limit Link

rate_limit(max_requests=5, window_seconds=60)

Limits the number of requests a client can make within a specified time window.

This function is a decorator that enforces rate limiting based on the maximum number of allowed requests (max_requests) and the time window size in seconds (window_seconds). It tracks incoming requests from clients using their IP addresses and ensures that they do not exceed the specified limit within the time window. If the limit is exceeded, it returns an HTTP 429 response.

Parameters:

Name Type Description Default

max_requests Link

int

The maximum number of requests permitted within the time window (default is 5).

5

window_seconds Link

int

The duration of the rate-limiting window, in seconds (default is 60 seconds).

60

Returns:

Name Type Description
decorator Callable

A decorator that can wrap any function or endpoint to enforce rate limiting.

Raises:

Type Description
HTTPException

If the rate limit is exceeded, it returns an HTTP 429 ("Too Many Requests") response to the client.

Notes

This implementation uses a thread lock to ensure thread safety when handling requests, making it suitable for multithreaded environments. The requests data structure is a defaultdict that maps client IPs to a list of their request timestamps. Old requests outside the rate-limiting window are removed to maintain efficient memory usage.

Source code in plantdb/server/core/security.py
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
def rate_limit(max_requests=5, window_seconds=60):
    """Limits the number of requests a client can make within a specified time window.

    This function is a decorator that enforces rate limiting based on the maximum
    number of allowed requests (`max_requests`) and the time window size in seconds
    (`window_seconds`). It tracks incoming requests from clients using their IP
    addresses and ensures that they do not exceed the specified limit within the
    time window. If the limit is exceeded, it returns an HTTP ``429`` response.

    Parameters
    ----------
    max_requests : int, optional
        The maximum number of requests permitted within the time window (default is 5).
    window_seconds : int, optional
        The duration of the rate-limiting window, in seconds (default is 60 seconds).

    Returns
    -------
    decorator : Callable
        A decorator that can wrap any function or endpoint to enforce rate limiting.

    Raises
    ------
    http.client.HTTPException
        If the rate limit is exceeded, it returns an HTTP 429 ("Too Many Requests")
        response to the client.

    Notes
    -----
    This implementation uses a thread lock to ensure thread safety when handling
    requests, making it suitable for multithreaded environments. The requests
    data structure is a `defaultdict` that maps client IPs to a list of their
    request timestamps. Old requests outside the rate-limiting window are removed
    to maintain efficient memory usage.
    """
    requests = defaultdict(list)
    lock = threading.Lock()

    def decorator(f):
        @wraps(f)
        def wrapped(*args, **kwargs):
            client_ip = request.remote_addr
            current_time = time.time()

            with lock:
                # Remove old requests outside the window
                requests[client_ip] = [req_time for req_time in requests[client_ip]
                                       if current_time - req_time < window_seconds]

                # Check if the rate limit is exceeded
                if len(requests[client_ip]) >= max_requests:
                    return Response(
                        "Rate limit exceeded. Please try again later.",
                        status=429
                    )

                # Add current request
                requests[client_ip].append(current_time)

            return f(*args, **kwargs)

        return wrapped

    return decorator

sanitize_ids Link

sanitize_ids(*param_names)

Decorator that sanitizes the given identifier parameters using sanitize_name.

If sanitization fails, logs a warning and returns a 400 response.

Source code in plantdb/server/core/security.py
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
def sanitize_ids(*param_names):
    """Decorator that sanitizes the given identifier parameters using ``sanitize_name``.

    If sanitization fails, logs a warning and returns a 400 response.
    """

    def decorator(func):
        @wraps(func)
        def wrapper(self, *args, **kwargs):
            # Bind the incoming args/kwargs to the function signature so we can
            # address parameters by name regardless of how they were passed.
            bound = inspect.signature(func).bind(self, *args, **kwargs)
            bound.apply_defaults()

            for name in param_names:
                if name in bound.arguments:
                    original = bound.arguments[name]
                    try:
                        bound.arguments[name] = sanitize_name(original)
                    except ValueError as e:
                        # Consistent logging / response pattern
                        self.logger.warning(f"Invalid {name} format: {original}")
                        return {'message': str(e)}, 400

            # Call the original method with the (now sanitized) arguments
            return func(**bound.arguments)

        return wrapper

    return decorator

sanitize_name Link

sanitize_name(name)

Sanitizes and validates the provided name.

The function ensures that the input string adheres to predefined naming rules by:

  • stripping leading/trailing spaces,
  • isolating the last segment after splitting by slashes,
  • validating the name against an alphanumeric pattern with optional underscores (_), dashes (-), or periods (.).

Parameters:

Name Type Description Default

name Link

str

The name to sanitize and validate.

required

Returns:

Type Description
str

A sanitized name that conforms to the rules.

Raises:

Type Description
ValueError

If the provided name contains invalid characters or does not meet the naming rules.

Source code in plantdb/server/core/security.py
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
def sanitize_name(name):
    """Sanitizes and validates the provided name.

    The function ensures that the input string adheres to predefined naming rules by:

    - stripping leading/trailing spaces,
    - isolating the last segment after splitting by slashes,
    - validating the name against an alphanumeric pattern
      with optional underscores (`_`), dashes (`-`), or periods (`.`).

    Parameters
    ----------
    name : str
        The name to sanitize and validate.

    Returns
    -------
    str
        A sanitized name that conforms to the rules.

    Raises
    ------
    ValueError
        If the provided name contains invalid characters or does not meet the naming rules.
    """
    import re
    sanitized_name = name.strip()  # Remove leading/trailing spaces
    sanitized_name = sanitized_name.split('/')[-1]  # isolate the last segment after splitting by slashes
    # Validate against an alphanumeric pattern with optional underscores, dashes, or periods
    if not re.match(r"^[a-zA-Z0-9_.-]+$", sanitized_name):
        raise ValueError(
            f"Invalid name: '{name}'. Names must be alphanumeric and can include underscores, dashes, or periods.")
    return sanitized_name

use_guest_as_default Link

use_guest_as_default(f)

Injects a guest user when no authentication token is provided.

This enables methods that expect an authenticated user to operate transparently with a default guest context.

Source code in plantdb/server/core/security.py
187
188
189
190
191
192
193
194
195
196
197
198
199
def use_guest_as_default(f: Callable) -> Callable:
    """Injects a guest user when no authentication token is provided.

    This enables methods that expect an authenticated user to operate transparently with a default guest context.
    """

    @wraps(f)
    def decorated_function(self, *args, **kwargs):
        if not kwargs.get('token'):
            kwargs["current_user"] = self.db.get_guest_user()
        return f(self, *args, **kwargs)

    return decorated_function