From 7c56b7038c9022459ba3e392a83f4b52cc0d10d2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9s=20Contreras=20Guill=C3=A9n?= Date: Tue, 16 Jun 2026 15:47:26 +0200 Subject: [PATCH 1/2] feat(security): config-driven multi-IdP OAuth2 resource server (v26.06.107) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Reworks the OAuth2 resource server to work out of the box with Keycloak, Microsoft Entra ID (v1.0 + v2.0) and AWS Cognito via configuration alone (no subclassing), reaching Spring-Security parity, and fixes 8 findings surfaced by an adversarial audit + a hermetic multi-IdP test harness. Added - issuer-uri OIDC discovery (derive jwks-uri + issuer from /.well-known/openid-configuration). - Config-driven claim mapping (ResourceServerProperties / ClaimMappings): principal-claim-names, authorities-claim-names, scope-claim-names, attribute-claims, authority-prefix. Claim names accept dotted paths with a '*' wildcard and are colon-safe, so authorities resolve from realm_access.roles, resource_access.*.roles (Keycloak), roles+groups (Entra), cognito:groups (Cognito) with zero code. - audiences (list; aud must match any) + validate-audience toggle (Cognito access tokens carry no aud). Configurable algorithms / clock-skew / jwks-timeout / jwks-cache. Fixed - Clock-skew leeway (default 60s) — was 0, causing intermittent 401s on real IdP tokens whose iat/nbf were slightly ahead of the server clock. - Blocking JWKS I/O now offloaded to a worker thread (anyio.to_thread) instead of stalling the event loop on a cache miss. - Multi-IdP claim coverage: resource_access roles, Entra groups/scp, Cognito groups and token attributes were silently dropped; now mapped. - Case-insensitive Bearer scheme (RFC 7235). - Opt-in authenticate-error-mode "401" rejects a present-but-invalid token at the filter with WWW-Authenticate: Bearer error="invalid_token" (RFC 6750); default "anonymous" (no behaviour change). conditional_on_missing_bean(JWKSTokenValidator) backoff is subclass-aware, so an app can still register its own validator subclass (cdm-mexico EntraClaimsValidator pattern) — covered by a wiring test. Tests - Hermetic multi-IdP suite: real localhost JWKS + real RS256 tokens shaped like Keycloak/Entra/Cognito; leeway, audiences, rotation, OIDC discovery, claim mapping, negatives. - Filter tests: offload, anonymous vs strict-401, case-insensitive Bearer, exclude-patterns. - End-to-end wiring through create_app incl. subclass backoff. - Real Keycloak integration test (testcontainers) under tests/integration: boots Keycloak, provisions realm/client/role/user, mints a real token, and validates it via OIDC discovery + the filter. Runs in the CI integration lane. Docs + CHANGELOG updated; bump v26.06.106 -> v26.06.107. --- CHANGELOG.md | 45 ++ docs/modules/security.md | 79 ++-- pyproject.toml | 2 +- src/pyfly/__init__.py | 2 +- src/pyfly/security/auto_configuration.py | 61 ++- src/pyfly/security/oauth2/__init__.py | 6 +- src/pyfly/security/oauth2/login.py | 3 +- src/pyfly/security/oauth2/properties.py | 123 ++++++ src/pyfly/security/oauth2/resource_server.py | 277 ++++++++++-- .../filters/oauth2_resource_filter.py | 73 ++- .../test_oauth2_keycloak_integration.py | 280 ++++++++++++ tests/security/test_oauth2_resource_filter.py | 156 +++++++ tests/security/test_oauth2_resource_server.py | 414 +++++++++++++----- .../test_oauth2_resource_server_wiring.py | 189 ++++++++ uv.lock | 2 +- 15 files changed, 1495 insertions(+), 217 deletions(-) create mode 100644 src/pyfly/security/oauth2/properties.py create mode 100644 tests/integration/test_oauth2_keycloak_integration.py create mode 100644 tests/security/test_oauth2_resource_filter.py create mode 100644 tests/security/test_oauth2_resource_server_wiring.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 00f13276..e2bcffd4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,51 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/). --- +## v26.06.107 (2026-06-16) + +### Added + +- **OAuth2 resource server: config-driven, multi-IdP, Spring-parity.** The + bearer-token resource server now works out of the box with **Keycloak**, + **Microsoft Entra ID** (v1.0 + v2.0) and **AWS Cognito** via configuration + alone (no subclassing), and reaches Spring-Security parity: + - **`issuer-uri` OIDC discovery** — derive the JWKS endpoint + issuer from + `/.well-known/openid-configuration` (alternative to `jwks-uri`). + - **Config-driven claim mapping** (`pyfly.security.oauth2.resource-server.*`): + `principal-claim-names`, `authorities-claim-names`, `scope-claim-names`, + `attribute-claims`, `authority-prefix`. Claim names accept **dotted paths** + with a `*` wildcard and are colon-safe, so authorities resolve from + `realm_access.roles`, `resource_access.*.roles` (Keycloak), `roles` + `groups` + (Entra), and `cognito:groups` (Cognito) with zero code. + - **`audiences`** (a list; `aud` must match any) and **`validate-audience`** + (disable for Cognito *access* tokens, which carry no `aud`). + - Configurable **`algorithms`**, **`clock-skew-seconds`**, **`jwks-timeout-seconds`**, + **`jwks-cache-seconds`**. + - New typed **`ResourceServerProperties`** (`@config_properties`) and + **`ClaimMappings`**. + +### Fixed + +- **OAuth2 resource server — clock-skew leeway.** JWT validation now allows 60s + of clock skew by default (configurable). Previously a token whose `iat`/`nbf` + was a few seconds ahead of the server clock — routine with real IdPs — was + rejected as "not yet valid", causing intermittent 401s. +- **OAuth2 resource server — event-loop stall.** The bearer filter now runs JWKS + validation (which does blocking network I/O on a cache miss) in a worker thread + (`anyio.to_thread`) instead of inline on the event loop. +- **OAuth2 resource server — multi-IdP claim coverage.** Token-to-`SecurityContext` + mapping previously read only `realm_access.roles` and `scope`/`permissions`, + silently dropping Keycloak `resource_access` client roles, Entra `groups` / + `scp`, Cognito `cognito:groups`, and any token `attributes`. All are now mapped + (configurably). +- **OAuth2 resource server — case-insensitive `Bearer` scheme** (RFC 7235): a + `bearer …` / `BEARER …` Authorization header is now accepted. +- **OAuth2 resource server — opt-in strict rejection.** New + `authenticate-error-mode: "401"` rejects a *present-but-invalid* token at the + filter with `401` + `WWW-Authenticate: Bearer error="invalid_token"` (RFC + 6750). Default remains `"anonymous"` (the gate decides) — no behavioural change + unless opted in. + ## v26.06.106 (2026-06-16) ### Fixed diff --git a/docs/modules/security.md b/docs/modules/security.md index 2b2834ce..037df1cd 100644 --- a/docs/modules/security.md +++ b/docs/modules/security.md @@ -954,16 +954,58 @@ from pyfly.security.oauth2 import ( ### OAuth2 Resource Server (JWKS) -The `JWKSTokenValidator` validates RS256-signed JWTs using a remote JWKS (JSON Web Key Set) endpoint. This is used when your application acts as an **OAuth2 Resource Server** -- it receives tokens issued by an external authorization server and validates them. +The `JWKSTokenValidator` validates JWTs against a remote JWKS (JSON Web Key Set) endpoint. Use it when your application is an **OAuth2 Resource Server** — it receives bearer tokens issued by an external authorization server and validates the signature, `iss`, `aud` and `exp` (with clock-skew leeway). It is **multi-IdP out of the box**: Keycloak, Microsoft Entra ID (v1.0 + v2.0) and AWS Cognito all work via configuration, no subclassing. + +#### Enable via configuration (recommended) + +The resource-server filter auto-wires when `pyfly.security.oauth2.resource-server.enabled=true`. It binds [`ResourceServerProperties`](#) and adds a bearer-token filter to the chain. + +```yaml +pyfly: + security: + enabled: true + oauth2: + resource-server: + enabled: true + # Provide a JWKS URI directly, OR an issuer-uri for OIDC discovery: + issuer-uri: "https://login.microsoftonline.com//v2.0" # discovers jwks-uri + issuer + # jwks-uri: "https://login.microsoftonline.com//discovery/v2.0/keys" + audiences: "api://my-backend" # comma-separated; token aud must match ANY + validate-audience: true # set false for Cognito ACCESS tokens (they carry no aud) + algorithms: "RS256" + clock-skew-seconds: 60 # leeway for iat/nbf/exp (default 60) + # Config-driven claim mapping (dotted paths, '*' wildcard, colon-safe): + principal-claim-names: "oid,sub" + authorities-claim-names: "roles,realm_access.roles,resource_access.*.roles,groups,cognito:groups" + scope-claim-names: "scp,scope" # Entra uses scp; Keycloak/Cognito use scope + attribute-claims: "tid,preferred_username" + authority-prefix: "" # e.g. "ROLE_" / "SCOPE_" for Spring-style authorities + exclude-patterns: "/actuator/**,/api/v1/version" + authenticate-error-mode: "anonymous" # or "401" to reject invalid tokens at the filter +``` + +Per-IdP quick reference: + +| IdP | `issuer` | Roles claim(s) | Scopes | Audience | +|---|---|---|---|---| +| **Keycloak** | `https:///realms/` | `realm_access.roles`, `resource_access.*.roles` | `scope` | client / `account` | +| **Entra ID v2.0** | `https://login.microsoftonline.com//v2.0` | `roles`, `groups` | `scp` | `api://…` or client GUID | +| **Cognito (access)** | `https://cognito-idp..amazonaws.com/` | `cognito:groups` | `scope` | **none** → set `validate-audience: false` | + +#### Programmatic use ```python -from pyfly.security.oauth2 import JWKSTokenValidator +from pyfly.security.oauth2 import JWKSTokenValidator, ClaimMappings validator = JWKSTokenValidator( jwks_uri="https://auth.example.com/.well-known/jwks.json", issuer="https://auth.example.com", - audience="my-api", + audiences=["my-api"], + leeway=60, + claim_mappings=ClaimMappings(attribute_claims=("tid",)), ) +ctx = validator.to_security_context(token) +# SecurityContext(user_id=..., roles=[...], permissions=[...], attributes={...}) ``` **Constructor parameters:** @@ -971,33 +1013,20 @@ validator = JWKSTokenValidator( | Parameter | Type | Default | Description | |---|---|---|---| | `jwks_uri` | `str` | required | URL of the JWKS endpoint | -| `issuer` | `str \| None` | `None` | Expected `iss` claim (validates if set) | -| `audience` | `str \| None` | `None` | Expected `aud` claim (validates if set) | +| `issuer` | `str \| None` | `None` | Expected `iss` claim (validated if set) | +| `audiences` | `list[str] \| None` | `None` | Accepted audiences; `aud` must match any. Empty disables `aud` validation | | `algorithms` | `list[str] \| None` | `["RS256"]` | Allowed signing algorithms | +| `leeway` | `int` | `60` | Clock-skew tolerance (seconds) for `iat`/`nbf`/`exp` | +| `validate_audience` | `bool` | `True` | Skip `aud` validation when `False` (Cognito access tokens) | +| `claim_mappings` | `ClaimMappings \| None` | multi-IdP defaults | Config-driven claim→context mapping | -**Validating tokens:** - -```python -# Validate and get raw payload -payload = validator.validate(token) -# {"sub": "user-123", "roles": ["ADMIN"], "scope": "read write", ...} - -# Validate and build SecurityContext directly -ctx = validator.to_security_context(token) -# SecurityContext(user_id="user-123", roles=["ADMIN"], permissions=["read", "write"]) -``` +**Claim mapping (`ClaimMappings`):** claim names are searched as **dotted paths** with a single `*` wildcard (`resource_access.*.roles`) and are colon-safe (`cognito:groups`). Defaults map authorities from `roles`, `realm_access.roles`, `resource_access.*.roles`, `groups`, `cognito:groups`; scopes from `scp`, `scope`; principal from `oid` then `sub`. -**Claim mapping for `to_security_context()`:** +To customise per IdP without subclassing, set the `*-claim-names` config keys. An application that needs bespoke mapping can still subclass `JWKSTokenValidator` and register it — `@conditional_on_missing_bean(JWKSTokenValidator)` backs the default off. -| JWT Claim | SecurityContext Field | Notes | -|---|---|---| -| `sub` | `user_id` | Standard subject claim | -| `roles` | `roles` | Flat roles array | -| `realm_access.roles` | `roles` | Keycloak-style nested roles (fallback) | -| `permissions` | `permissions` | Flat permissions array | -| `scope` | `permissions` | Space-separated scopes (fallback, split on spaces) | +**OIDC discovery:** set `issuer-uri` (instead of `jwks-uri`) and the framework fetches `/.well-known/openid-configuration` to learn the `jwks_uri` + `issuer`. -**Source:** `src/pyfly/security/oauth2/resource_server.py` +**Source:** `src/pyfly/security/oauth2/resource_server.py`, `src/pyfly/security/oauth2/properties.py` ### OAuth2 Client Registration diff --git a/pyproject.toml b/pyproject.toml index 45873a81..b0a0749e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -7,7 +7,7 @@ name = "pyfly" # CalVer YY.MM.PATCH — package metadata uses PEP 440 normalized form (26.5.4); # git tag, GitHub release and human-readable display use leading-zero form # (v26.05.04) to match the Java/.NET/Go siblings. -version = "26.6.106" +version = "26.6.107" description = "The official Python implementation of the Firefly Framework — DI, CQRS, EDA, hexagonal architecture, and more." readme = "README.md" license = "Apache-2.0" diff --git a/src/pyfly/__init__.py b/src/pyfly/__init__.py index 12202efa..e63ba556 100644 --- a/src/pyfly/__init__.py +++ b/src/pyfly/__init__.py @@ -13,4 +13,4 @@ # limitations under the License. """PyFly — Enterprise Python Framework.""" -__version__ = "26.06.106" +__version__ = "26.06.107" diff --git a/src/pyfly/security/auto_configuration.py b/src/pyfly/security/auto_configuration.py index eb025a1c..07f3727f 100644 --- a/src/pyfly/security/auto_configuration.py +++ b/src/pyfly/security/auto_configuration.py @@ -27,9 +27,20 @@ BcryptPasswordEncoder = object # type: ignore[misc,assignment] try: - from pyfly.security.oauth2.resource_server import JWKSTokenValidator + from pyfly.security.oauth2.resource_server import ( + ClaimMappings, + JWKSTokenValidator, + discover_oidc, + ) except ImportError: JWKSTokenValidator = object # type: ignore[misc,assignment] + ClaimMappings = object # type: ignore[misc,assignment] + discover_oidc = None # type: ignore[assignment] + +try: + from pyfly.security.oauth2.properties import ResourceServerProperties +except ImportError: + ResourceServerProperties = object # type: ignore[misc,assignment] try: from pyfly.security.oauth2.authorization_server import AuthorizationServer, InMemoryTokenStore @@ -134,23 +145,47 @@ def password_encoder(self, config: Config) -> BcryptPasswordEncoder: @conditional_on_property("pyfly.security.oauth2.resource-server.enabled", having_value="true") @conditional_on_class("jwt") class OAuth2ResourceServerAutoConfiguration: - """Auto-configures a JWKSTokenValidator when a JWKS URI is provided. - - Activated when ``pyfly.security.oauth2.resource-server.enabled=true`` - and ``pyjwt`` is installed. Reads the JWKS endpoint, issuer, and - audience from configuration properties. + """Auto-configures a multi-IdP :class:`JWKSTokenValidator`. + + Activated when ``pyfly.security.oauth2.resource-server.enabled=true`` and + ``pyjwt`` is installed. Binds :class:`ResourceServerProperties` and works out + of the box with Keycloak, Microsoft Entra ID (v1.0 + v2.0) and AWS Cognito — + deriving the JWKS endpoint via OIDC discovery from ``issuer-uri`` when no + explicit ``jwks-uri`` is given, and mapping authorities / scopes / principal + from a configurable set of claim paths. """ @bean @conditional_on_missing_bean(JWKSTokenValidator) def jwks_token_validator(self, config: Config) -> JWKSTokenValidator: - jwks_uri = str(config.get("pyfly.security.oauth2.resource-server.jwks-uri", "")) - issuer = config.get("pyfly.security.oauth2.resource-server.issuer") - audience = config.get("pyfly.security.oauth2.resource-server.audience") + props = config.bind(ResourceServerProperties) + + jwks_uri = props.jwks_uri + issuer = props.issuer or None + # OIDC discovery (Spring's ``issuer-uri``): derive the JWKS endpoint and + # the authoritative issuer from the provider's discovery document. + if not jwks_uri and props.issuer_uri: + jwks_uri, discovered_issuer = discover_oidc(props.issuer_uri, timeout=float(props.jwks_timeout_seconds)) + issuer = issuer or discovered_issuer + + mappings = ClaimMappings( + principal_claims=tuple(props.principal_claim_list()), + authority_claims=tuple(props.authorities_claim_list()), + scope_claims=tuple(props.scope_claim_list()), + authority_prefix=props.authority_prefix, + attribute_claims=tuple(props.attribute_claim_list()), + ) + return JWKSTokenValidator( jwks_uri=jwks_uri, - issuer=str(issuer) if issuer is not None else None, - audience=str(audience) if audience is not None else None, + issuer=issuer, + audiences=props.audience_list(), + algorithms=props.algorithm_list(), + leeway=props.clock_skew_seconds, + validate_audience=props.validate_audience, + claim_mappings=mappings, + jwks_timeout=float(props.jwks_timeout_seconds), + jwks_cache_seconds=props.jwks_cache_seconds, ) @bean @@ -160,9 +195,11 @@ def oauth2_resource_server_filter(self, token_validator: JWKSTokenValidator, con # rescan adds it to the chain whenever the resource server is on (#41). from pyfly.web.adapters.starlette.filters.oauth2_resource_filter import OAuth2ResourceServerFilter + props = config.bind(ResourceServerProperties) return OAuth2ResourceServerFilter( token_validator=token_validator, - exclude_patterns=_exclude_patterns(config, "pyfly.security.oauth2.resource-server.exclude-patterns"), + exclude_patterns=props.exclude_pattern_list(), + error_mode=props.authenticate_error_mode, ) diff --git a/src/pyfly/security/oauth2/__init__.py b/src/pyfly/security/oauth2/__init__.py index c377da4c..2a2c88d3 100644 --- a/src/pyfly/security/oauth2/__init__.py +++ b/src/pyfly/security/oauth2/__init__.py @@ -27,11 +27,13 @@ keycloak, ) from pyfly.security.oauth2.login import OAuth2LoginHandler -from pyfly.security.oauth2.resource_server import JWKSTokenValidator +from pyfly.security.oauth2.properties import ResourceServerProperties +from pyfly.security.oauth2.resource_server import ClaimMappings, JWKSTokenValidator, discover_oidc from pyfly.security.oauth2.session_security_filter import OAuth2SessionSecurityFilter __all__ = [ "AuthorizationServer", + "ClaimMappings", "ClientRegistration", "ClientRegistrationRepository", "InMemoryClientRegistrationRepository", @@ -39,7 +41,9 @@ "JWKSTokenValidator", "OAuth2LoginHandler", "OAuth2SessionSecurityFilter", + "ResourceServerProperties", "TokenStore", + "discover_oidc", "github", "google", "keycloak", diff --git a/src/pyfly/security/oauth2/login.py b/src/pyfly/security/oauth2/login.py index bad04be7..ed26ce96 100644 --- a/src/pyfly/security/oauth2/login.py +++ b/src/pyfly/security/oauth2/login.py @@ -301,7 +301,8 @@ def _validate_id_token(self, registration: Any, id_token: str, nonce: str | None validator = JWKSTokenValidator( jwks_uri=registration.jwks_uri, issuer=getattr(registration, "issuer_uri", "") or None, - audience=registration.client_id, + # An OIDC id_token's audience is the client_id. + audiences=[registration.client_id], ) try: claims = validator.validate(id_token) diff --git a/src/pyfly/security/oauth2/properties.py b/src/pyfly/security/oauth2/properties.py new file mode 100644 index 00000000..2bafecf6 --- /dev/null +++ b/src/pyfly/security/oauth2/properties.py @@ -0,0 +1,123 @@ +# Copyright 2026 Firefly Software Foundation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Typed configuration for the OAuth2 resource server. + +Bound from ``pyfly.security.oauth2.resource-server.*`` (Spring-Boot-style relaxed +binding: kebab-case YAML → snake_case fields, ``${...}`` + ``PYFLY_*`` env +overrides). Multi-value fields are comma-separated strings (or YAML lists) parsed +by the ``*_list`` helpers — the same convention as other PyFly filter +``exclude-patterns``. + +YAML structure:: + + pyfly: + security: + oauth2: + resource-server: + enabled: true + # Provide a JWKS URI directly, OR an issuer-uri for OIDC discovery. + jwks-uri: "https://login.microsoftonline.com//discovery/v2.0/keys" + issuer-uri: "https://login.microsoftonline.com//v2.0" # OIDC discovery + issuer: "https://login.microsoftonline.com//v2.0" # explicit iss + audiences: "api://cdm-backend,cdm-api" + validate-audience: true # set false for Cognito access tokens (no aud) + algorithms: "RS256" + clock-skew-seconds: 60 + jwks-timeout-seconds: 30 + jwks-cache-seconds: 300 + # Config-driven claim mapping (dotted paths, '*' wildcard): + principal-claim-names: "oid,sub" + authorities-claim-names: "roles,realm_access.roles,resource_access.*.roles,groups,cognito:groups" + authority-prefix: "" + scope-claim-names: "scp,scope" + attribute-claims: "tid,preferred_username" + exclude-patterns: "/actuator/**,/api/v1/version" + authenticate-error-mode: "anonymous" # or "401" +""" + +from __future__ import annotations + +from dataclasses import dataclass + +from pyfly.core.config import config_properties + + +def _csv(value: str) -> list[str]: + """Split a comma-separated config string into a trimmed, non-empty list.""" + return [item.strip() for item in value.split(",") if item.strip()] + + +@config_properties(prefix="pyfly.security.oauth2.resource-server") +@dataclass +class ResourceServerProperties: + """``pyfly.security.oauth2.resource-server.*`` — OAuth2 resource server.""" + + enabled: bool = False + + # --- key source ------------------------------------------------------- + jwks_uri: str = "" + # OIDC discovery: when set and jwks-uri is empty, the framework fetches + # ``/.well-known/openid-configuration`` to learn jwks-uri + issuer. + issuer_uri: str = "" + issuer: str = "" + + # --- audience --------------------------------------------------------- + audiences: str = "" + validate_audience: bool = True + + # --- signature / time ------------------------------------------------- + algorithms: str = "RS256" + clock_skew_seconds: int = 60 + jwks_timeout_seconds: int = 30 + jwks_cache_seconds: int = 300 + + # --- claim mapping ---------------------------------------------------- + principal_claim_names: str = "oid,sub" + authorities_claim_names: str = ( + "roles,scopes,authorities,realm_access.roles,resource_access.*.roles,groups,cognito:groups" + ) + authority_prefix: str = "" + scope_claim_names: str = "scp,scope" + attribute_claims: str = "" + + # --- filter ----------------------------------------------------------- + exclude_patterns: str = "" + # "anonymous" (default, non-breaking): an invalid/missing token yields an + # anonymous context and the request proceeds — the HttpSecurity gate decides. + # "401": a *present but invalid* token is rejected at the filter with a 401 + + # ``WWW-Authenticate: Bearer error="invalid_token"`` (RFC 6750). A missing + # token still falls through to the gate. + authenticate_error_mode: str = "anonymous" + + # --- parsed-list accessors ------------------------------------------- + def audience_list(self) -> list[str]: + return _csv(self.audiences) + + def algorithm_list(self) -> list[str]: + return _csv(self.algorithms) or ["RS256"] + + def principal_claim_list(self) -> list[str]: + return _csv(self.principal_claim_names) or ["oid", "sub"] + + def authorities_claim_list(self) -> list[str]: + return _csv(self.authorities_claim_names) + + def scope_claim_list(self) -> list[str]: + return _csv(self.scope_claim_names) + + def attribute_claim_list(self) -> list[str]: + return _csv(self.attribute_claims) + + def exclude_pattern_list(self) -> list[str]: + return _csv(self.exclude_patterns) diff --git a/src/pyfly/security/oauth2/resource_server.py b/src/pyfly/security/oauth2/resource_server.py index 3b4e152a..d94ced9e 100644 --- a/src/pyfly/security/oauth2/resource_server.py +++ b/src/pyfly/security/oauth2/resource_server.py @@ -11,10 +11,29 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -"""OAuth2 Resource Server — JWKS-based JWT validation.""" +"""OAuth2 Resource Server — JWKS-based JWT validation. + +A config-driven, multi-IdP bearer-token validator. Out of the box it accepts +tokens from **Keycloak**, **Microsoft Entra ID** (v1.0 + v2.0) and **AWS +Cognito** without subclassing, by reading roles/scopes/principal from a +configurable set of claim paths (see :class:`ClaimMappings`). + +Spring Security parity: ``issuer-uri`` OIDC discovery, configurable signing +algorithms, clock-skew leeway, a list of accepted audiences (with an opt-out for +Cognito access tokens, which carry no ``aud``), and config-driven authority / +scope / principal claim mapping. + +The class stays the single base type returned by the framework auto-config bean, +so an application that registers its own ``JWKSTokenValidator`` subclass (e.g. to +do bespoke claim mapping) transparently overrides the default via +``@conditional_on_missing_bean(JWKSTokenValidator)``. +""" from __future__ import annotations +import json +import urllib.request +from dataclasses import dataclass, field from typing import Any import jwt @@ -23,45 +42,192 @@ from pyfly.kernel.exceptions import SecurityException from pyfly.security.context import SecurityContext +# Default clock-skew tolerance, in seconds. Matches Spring Security's +# ``JwtTimestampValidator`` default (60s). Without it, a token whose ``iat`` / +# ``nbf`` is a few seconds ahead of this server's clock — routine with real +# IdPs — is rejected as "not yet valid", causing intermittent 401s. +DEFAULT_CLOCK_SKEW_SECONDS = 60 + +# Default claim paths searched (in order, all collected) for authorities/roles. +# Covers every mainstream IdP with zero configuration: +# * ``roles`` — flat (custom IdPs, Entra app roles) +# * ``scopes`` / ``authorities`` — common conventions +# * ``realm_access.roles`` — Keycloak realm roles +# * ``resource_access.*.roles`` — Keycloak per-client roles (``*`` = any client) +# * ``groups`` — Entra group object-ids +# * ``cognito:groups`` — AWS Cognito groups +# Applications can narrow this list via +# ``pyfly.security.oauth2.resource-server.authorities-claim-names``. +DEFAULT_AUTHORITY_CLAIMS: tuple[str, ...] = ( + "roles", + "scopes", + "authorities", + "realm_access.roles", + "resource_access.*.roles", + "groups", + "cognito:groups", +) + +# Default claim names (space-delimited string or list) mapped to *permissions*. +# ``scp`` is Entra's delegated-scope claim; ``scope`` is the Keycloak / Cognito / +# OAuth2 convention. +DEFAULT_SCOPE_CLAIMS: tuple[str, ...] = ("scp", "scope") + +# Default principal (user id) claim search order: Entra's stable ``oid`` first, +# then the standard ``sub``. +DEFAULT_PRINCIPAL_CLAIMS: tuple[str, ...] = ("oid", "sub") + + +@dataclass(frozen=True) +class ClaimMappings: + """Config-driven mapping from JWT claims onto a :class:`SecurityContext`. + + All claim names support **dotted paths** (``realm_access.roles``) and a + single-level ``*`` **wildcard** that iterates every key at that level + (``resource_access.*.roles``). A path segment is split on ``.`` only, so + colon-bearing claim names such as ``cognito:groups`` are matched verbatim. + """ + + principal_claims: tuple[str, ...] = DEFAULT_PRINCIPAL_CLAIMS + authority_claims: tuple[str, ...] = DEFAULT_AUTHORITY_CLAIMS + scope_claims: tuple[str, ...] = DEFAULT_SCOPE_CLAIMS + # Prefix applied to every extracted authority (Spring uses ``SCOPE_`` / + # ``ROLE_``). Default empty: authorities are kept as the raw claim value so + # ``has_role("CdM.Gn")`` matches the token's literal role string. + authority_prefix: str = "" + # Claims copied verbatim (string-coerced) into ``SecurityContext.attributes`` + # (e.g. ``tid``, ``preferred_username``, ``employeeid``). + attribute_claims: tuple[str, ...] = field(default_factory=tuple) + + +def _resolve_claim_path(payload: dict[str, Any], path: str) -> list[Any]: + """Resolve a dotted claim *path* (with optional ``*`` wildcard) to a flat + list of leaf values. Missing paths yield ``[]``.""" + segments = path.split(".") + # Frontier of nodes currently being walked; starts at the payload root. + nodes: list[Any] = [payload] + for seg in segments: + nxt: list[Any] = [] + for node in nodes: + if not isinstance(node, dict): + continue + if seg == "*": + nxt.extend(node.values()) + elif seg in node: + nxt.append(node[seg]) + nodes = nxt + if not nodes: + return [] + return nodes + + +def _flatten_strs(values: list[Any]) -> list[str]: + """Flatten leaf values (strings or lists of strings) into a string list, + preserving order and dropping empties / non-strings.""" + out: list[str] = [] + for v in values: + if isinstance(v, str): + if v: + out.append(v) + elif isinstance(v, (list, tuple)): + out.extend(str(x) for x in v if isinstance(x, (str, int)) and str(x)) + return out + + +def discover_oidc(issuer_uri: str, *, timeout: float = 10.0) -> tuple[str, str]: + """Fetch an OIDC provider's discovery document and return + ``(jwks_uri, issuer)``. + + Mirrors Spring's ``issuer-uri``: GET ``/.well-known/openid-configuration`` + and read ``jwks_uri`` + ``issuer``. The returned ``issuer`` is the + authoritative value from the document (used to validate the ``iss`` claim). + + Raises: + SecurityException: If the document cannot be fetched or lacks ``jwks_uri``. + """ + base = issuer_uri.rstrip("/") + well_known = f"{base}/.well-known/openid-configuration" + try: + with urllib.request.urlopen(well_known, timeout=timeout) as resp: # noqa: S310 (https config URL) + doc = json.loads(resp.read().decode("utf-8")) + except Exception as exc: # network / JSON / URL errors + raise SecurityException( + f"OIDC discovery failed for issuer-uri {issuer_uri!r}: {exc}", + code="OIDC_DISCOVERY_FAILED", + ) from exc + jwks_uri = doc.get("jwks_uri") + if not jwks_uri: + raise SecurityException( + f"OIDC discovery document at {well_known!r} has no 'jwks_uri'.", + code="OIDC_DISCOVERY_FAILED", + ) + return str(jwks_uri), str(doc.get("issuer") or base) + class JWKSTokenValidator: - """Validates RS256-signed JWTs using a remote JWKS endpoint. + """Validates JWTs using a remote JWKS endpoint. - Fetches public keys from the JWKS URI and caches them. - Extracts claims to build a SecurityContext. + Fetches and caches public keys from the JWKS URI, verifies the signature, + ``iss``, ``aud`` (when configured), and ``exp`` (with clock-skew leeway), and + maps claims to a :class:`SecurityContext`. Args: - jwks_uri: The JWKS endpoint URL (e.g., - ``"https://auth.example.com/.well-known/jwks.json"``). - issuer: Expected token issuer (optional, validates ``iss`` claim if - set). - audience: Expected token audience (optional, validates ``aud`` claim - if set). - algorithms: Allowed algorithms (default: ``["RS256"]``). + jwks_uri: The JWKS endpoint URL. + issuer: Expected ``iss`` (validated when set). + audiences: Accepted audiences; the token's ``aud`` must match **any**. + When empty, audience validation is **disabled** (required for AWS + Cognito *access* tokens, which carry ``client_id`` instead of ``aud``). + algorithms: Allowed signing algorithms (default: ``["RS256"]``). + leeway: Clock-skew tolerance in seconds for ``exp`` / ``nbf`` / ``iat`` + (default: 60). + validate_audience: Set ``False`` to skip ``aud`` validation even when + audiences are configured. + claim_mappings: Config-driven claim→context mapping (default: + multi-IdP defaults). + jwks_timeout: HTTP timeout (seconds) for JWKS fetches. + jwks_cache_seconds: JWK-set cache lifespan (seconds). """ def __init__( self, jwks_uri: str, + *, issuer: str | None = None, - audience: str | None = None, + audiences: list[str] | None = None, algorithms: list[str] | None = None, + leeway: int = DEFAULT_CLOCK_SKEW_SECONDS, + validate_audience: bool = True, + claim_mappings: ClaimMappings | None = None, + jwks_timeout: float = 30.0, + jwks_cache_seconds: int = 300, ) -> None: - self._jwks_client = PyJWKClient(jwks_uri) + self._jwks_client = PyJWKClient( + jwks_uri, + cache_keys=True, + cache_jwk_set=True, + lifespan=jwks_cache_seconds, + timeout=jwks_timeout, + ) self._issuer = issuer - self._audience = audience + self._audiences = [a for a in (audiences or []) if a] self._algorithms = algorithms or ["RS256"] + self._leeway = leeway + self._validate_audience = validate_audience + self._mappings = claim_mappings or ClaimMappings() def validate(self, token: str) -> dict[str, Any]: - """Validate a JWT token and return the decoded payload. + """Validate a JWT and return its decoded payload. - Uses JWKS to fetch the signing key matching the token's ``kid`` - header. + Verifies the signature (via the JWKS key matching the token's ``kid``), + ``iss``, ``aud`` (only when audiences are configured and audience + validation is enabled), and ``exp`` — with ``leeway`` seconds of + clock-skew tolerance. Raises: - SecurityException: If the token is invalid, expired, or key not - found. + SecurityException: If the token is invalid, expired, or its key is + not found. """ + verify_aud = self._validate_audience and bool(self._audiences) try: signing_key = self._jwks_client.get_signing_key_from_jwt(token) payload = jwt.decode( @@ -69,8 +235,10 @@ def validate(self, token: str) -> dict[str, Any]: signing_key.key, algorithms=self._algorithms, issuer=self._issuer, - audience=self._audience, - options={"require": ["exp"]}, + # Pass the list when verifying; ``None`` disables PyJWT's own aud check. + audience=self._audiences if verify_aud else None, + leeway=self._leeway, + options={"require": ["exp"], "verify_aud": verify_aud}, ) return payload except jwt.PyJWTError as exc: @@ -80,34 +248,55 @@ def validate(self, token: str) -> dict[str, Any]: ) from exc def to_security_context(self, token: str) -> SecurityContext: - """Validate token and build a :class:`SecurityContext` from claims. + """Validate *token* and build a :class:`SecurityContext` from its claims, + using the configured :class:`ClaimMappings` (multi-IdP by default).""" + payload = self.validate(token) + return self._build_context(payload) - Expects standard claims: + def _build_context(self, payload: dict[str, Any]) -> SecurityContext: + """Map a validated *payload* onto a :class:`SecurityContext` per the + configured claim mappings. Subclasses may override for bespoke mapping.""" + m = self._mappings - - ``sub``: maps to *user_id* - - ``roles`` or ``realm_access.roles``: maps to *roles* - - ``scope`` or ``permissions``: maps to *permissions* - """ - payload = self.validate(token) + # Principal: first non-empty principal claim wins. + user_id: str | None = None + for claim in m.principal_claims: + vals = _flatten_strs(_resolve_claim_path(payload, claim)) + if vals: + user_id = vals[0] + break + + # Authorities/roles: collect across every configured path, de-duplicated + # (order-preserving), with the optional prefix applied. + roles: list[str] = [] + seen: set[str] = set() + for claim in m.authority_claims: + for raw in _flatten_strs(_resolve_claim_path(payload, claim)): + value = f"{m.authority_prefix}{raw}" if m.authority_prefix else raw + if value not in seen: + seen.add(value) + roles.append(value) + + # Permissions/scopes: scope claims are space-delimited strings or lists. + permissions: list[str] = [] + perm_seen: set[str] = set() + for claim in m.scope_claims: + for raw in _flatten_strs(_resolve_claim_path(payload, claim)): + for part in raw.split(): + if part and part not in perm_seen: + perm_seen.add(part) + permissions.append(part) - # Extract roles — support both flat "roles" claim and Keycloak's - # nested structure. - roles = payload.get("roles", []) - if not roles: - realm_access = payload.get("realm_access", {}) - if isinstance(realm_access, dict): - roles = realm_access.get("roles", []) - - # Extract permissions — support "permissions" or "scope" - # (space-separated). - permissions = payload.get("permissions", []) - if not permissions: - scope = payload.get("scope", "") - if isinstance(scope, str) and scope: - permissions = scope.split() + # Attributes: copy configured claims verbatim (string-coerced). + attributes: dict[str, str] = {} + for claim in m.attribute_claims: + vals = _flatten_strs(_resolve_claim_path(payload, claim)) + if vals: + attributes[claim] = vals[0] return SecurityContext( - user_id=payload.get("sub"), + user_id=user_id, roles=roles, permissions=permissions, + attributes=attributes, ) diff --git a/src/pyfly/web/adapters/starlette/filters/oauth2_resource_filter.py b/src/pyfly/web/adapters/starlette/filters/oauth2_resource_filter.py index 1a0dd509..8c42c079 100644 --- a/src/pyfly/web/adapters/starlette/filters/oauth2_resource_filter.py +++ b/src/pyfly/web/adapters/starlette/filters/oauth2_resource_filter.py @@ -19,8 +19,9 @@ from collections.abc import Sequence from typing import cast +from anyio import to_thread from starlette.requests import Request -from starlette.responses import Response +from starlette.responses import JSONResponse, Response from pyfly.container.ordering import HIGHEST_PRECEDENCE, order from pyfly.context.request_context import RequestContext @@ -32,34 +33,65 @@ logger = logging.getLogger(__name__) +# RFC 6750 §3 challenge returned in "401" error mode for an invalid token. +_INVALID_TOKEN_CHALLENGE = 'Bearer error="invalid_token"' + +ERROR_MODE_ANONYMOUS = "anonymous" +ERROR_MODE_401 = "401" + @order(HIGHEST_PRECEDENCE + 250) class OAuth2ResourceServerFilter(OncePerRequestFilter): - """Extracts Bearer token and validates it against a JWKS endpoint. + """Extracts the Bearer token and validates it against a JWKS endpoint. + + Populates ``request.state.security_context`` (and the active + :class:`RequestContext`) with claims from the JWT. ``exclude_patterns`` + (fnmatch globs, honoured by :class:`OncePerRequestFilter`) skip public paths. + + Behaviour on a bad/missing token is governed by ``error_mode``: + + * ``"anonymous"`` (default): an invalid **or** missing token yields an + anonymous :class:`SecurityContext` and the request proceeds — the + downstream ``HttpSecurity`` gate / ``@pre_authorize`` decide. This keeps + the resource-server filter composable with permit-all public endpoints. + * ``"401"``: a **present but invalid** token is rejected here with + ``401 Unauthorized`` and a ``WWW-Authenticate: Bearer error="invalid_token"`` + header (RFC 6750). A **missing** token still falls through to the gate + (so public endpoints remain reachable). - Populates ``request.state.security_context`` with claims from the JWT. - Uses ``exclude_patterns`` (fnmatch globs) to skip public endpoints. - For missing or invalid tokens, sets an anonymous SecurityContext. + JWKS key resolution does blocking network I/O on a cache miss, so token + validation runs in a worker thread (``anyio.to_thread``) to avoid stalling + the event loop. """ def __init__( self, token_validator: JWKSTokenValidator, exclude_patterns: Sequence[str] = (), + *, + error_mode: str = ERROR_MODE_ANONYMOUS, ) -> None: self._token_validator = token_validator self.exclude_patterns = list(exclude_patterns) + self._error_mode = error_mode if error_mode in (ERROR_MODE_ANONYMOUS, ERROR_MODE_401) else ERROR_MODE_ANONYMOUS async def do_filter(self, request: Request, call_next: CallNext) -> Response: - auth_header = request.headers.get("authorization", "") - if auth_header.startswith("Bearer "): - token = auth_header[7:] # len("Bearer ") == 7 + token = self._extract_bearer(request.headers.get("authorization", "")) + + if token is not None: try: - security_context = self._token_validator.to_security_context(token) + # Offload to a worker thread: JWKS key lookup may do blocking + # urllib I/O on a cache miss, which would otherwise stall the loop. + security_context = await to_thread.run_sync(self._token_validator.to_security_context, token) except SecurityException: - logger.debug("Invalid OAuth2 token, using anonymous context") + # A token was presented but failed validation (bad signature, + # expired, wrong iss/aud, unknown kid, ...). + logger.warning("OAuth2 bearer token rejected (invalid_token)") + if self._error_mode == ERROR_MODE_401: + return self._invalid_token_response() security_context = SecurityContext.anonymous() else: + # No bearer credentials presented — anonymous; the gate decides. security_context = SecurityContext.anonymous() request.state.security_context = security_context @@ -67,3 +99,24 @@ async def do_filter(self, request: Request, call_next: CallNext) -> Response: if req_ctx is not None: req_ctx.security_context = security_context return cast(Response, await call_next(request)) + + @staticmethod + def _extract_bearer(auth_header: str) -> str | None: + """Return the token from an ``Authorization`` header, or ``None``. + + The auth scheme is matched case-insensitively (RFC 7235 §2.1: the scheme + is a case-insensitive token), so ``Bearer``, ``bearer`` and ``BEARER`` + are all accepted. + """ + parts = auth_header.split(" ", 1) + if len(parts) == 2 and parts[0].lower() == "bearer" and parts[1].strip(): + return parts[1].strip() + return None + + @staticmethod + def _invalid_token_response() -> Response: + return JSONResponse( + {"error": "invalid_token", "error_description": "The access token is invalid or expired."}, + status_code=401, + headers={"WWW-Authenticate": _INVALID_TOKEN_CHALLENGE}, + ) diff --git a/tests/integration/test_oauth2_keycloak_integration.py b/tests/integration/test_oauth2_keycloak_integration.py new file mode 100644 index 00000000..305f1a2c --- /dev/null +++ b/tests/integration/test_oauth2_keycloak_integration.py @@ -0,0 +1,280 @@ +# Copyright 2026 Firefly Software Foundation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Real **Keycloak** integration test for the OAuth2 resource server. + +Boots an actual Keycloak in Docker (testcontainers), provisions a realm, a +public client (direct-access grants), a realm role and a user via the Admin REST +API, then obtains a **real** access token from Keycloak's token endpoint and +validates it through PyFly's resource server: + +* OIDC discovery against Keycloak's real ``/.well-known/openid-configuration``. +* Signature verification against Keycloak's real JWKS (``/protocol/openid-connect/certs``). +* ``iss`` / ``aud`` / ``exp`` validation and Keycloak realm-role claim mapping + (``realm_access.roles``) onto a :class:`SecurityContext`. +* End-to-end through ``create_app`` + the auto-wired resource-server filter. + +Marked ``integration`` (auto-applied by tests/integration/conftest.py); skips +when Docker is unavailable, fails hard in the CI integration job +(``PYFLY_INTEGRATION_REQUIRE_DOCKER=1``). +""" + +from __future__ import annotations + +import contextlib +from collections.abc import AsyncIterator, Iterator +from typing import Any + +import httpx +import pytest + +from pyfly.testing import is_docker_available + +pytestmark = pytest.mark.integration + +KEYCLOAK_IMAGE = "quay.io/keycloak/keycloak:25.0.6" +REALM = "pyfly-test" +CLIENT_ID = "pyfly-api" +USERNAME = "alice" +PASSWORD = "alice-secret" +REALM_ROLE = "CdM.Gd" + + +# --------------------------------------------------------------------------- +# Keycloak container + provisioning +# --------------------------------------------------------------------------- +@pytest.fixture(scope="module") +def keycloak() -> Iterator[dict[str, str]]: + """Start Keycloak, provision realm/client/role/user, yield connection info.""" + if not is_docker_available(): + pytest.skip("Docker not available for the Keycloak integration test") + + from testcontainers.core.container import DockerContainer + from testcontainers.core.waiting_utils import wait_for_logs + + container = ( + DockerContainer(KEYCLOAK_IMAGE) + # Cover both the 25.x (KEYCLOAK_ADMIN) and 26.x (KC_BOOTSTRAP_ADMIN_*) names. + .with_env("KEYCLOAK_ADMIN", "admin") + .with_env("KEYCLOAK_ADMIN_PASSWORD", "admin") + .with_env("KC_BOOTSTRAP_ADMIN_USERNAME", "admin") + .with_env("KC_BOOTSTRAP_ADMIN_PASSWORD", "admin") + .with_exposed_ports(8080) + .with_command("start-dev") + ) + with container: + wait_for_logs(container, "Running the server in development mode", timeout=240) + host = container.get_container_host_ip() + port = container.get_exposed_port(8080) + base = f"http://{host}:{port}" + _provision(base) + yield {"base": base, "issuer": f"{base}/realms/{REALM}"} + + +def _provision(base: str) -> None: + """Create the realm, client, role and user via the Keycloak Admin REST API.""" + with httpx.Client(base_url=base, timeout=30.0) as http: + admin = _retry_admin_token(http) + h = {"Authorization": f"Bearer {admin}"} + + # Realm + http.post("/admin/realms", headers=h, json={"realm": REALM, "enabled": True}).raise_for_status() + + # Public client with direct-access grants + an audience mapper so the + # access token carries aud=pyfly-api (Keycloak's default aud is "account"). + http.post( + f"/admin/realms/{REALM}/clients", + headers=h, + json={ + "clientId": CLIENT_ID, + "publicClient": True, + "directAccessGrantsEnabled": True, + "standardFlowEnabled": False, + "protocolMappers": [ + { + "name": "aud-pyfly-api", + "protocol": "openid-connect", + "protocolMapper": "oidc-audience-mapper", + "config": { + "included.client.audience": CLIENT_ID, + "id.token.claim": "false", + "access.token.claim": "true", + }, + } + ], + }, + ).raise_for_status() + + # Realm role + http.post(f"/admin/realms/{REALM}/roles", headers=h, json={"name": REALM_ROLE}).raise_for_status() + role = http.get(f"/admin/realms/{REALM}/roles/{REALM_ROLE}", headers=h).json() + + # User + password + http.post( + f"/admin/realms/{REALM}/users", + headers=h, + json={ + "username": USERNAME, + "enabled": True, + "credentials": [{"type": "password", "value": PASSWORD, "temporary": False}], + }, + ).raise_for_status() + uid = http.get(f"/admin/realms/{REALM}/users", headers=h, params={"username": USERNAME}).json()[0]["id"] + + # Assign the realm role to the user + http.post( + f"/admin/realms/{REALM}/users/{uid}/role-mappings/realm", + headers=h, + json=[{"id": role["id"], "name": role["name"]}], + ).raise_for_status() + + +def _retry_admin_token(http: httpx.Client, attempts: int = 30) -> str: + """Fetch a master-realm admin token, retrying until Keycloak is ready.""" + import time + + last: Exception | None = None + for _ in range(attempts): + try: + resp = http.post( + "/realms/master/protocol/openid-connect/token", + data={ + "grant_type": "password", + "client_id": "admin-cli", + "username": "admin", + "password": "admin", + }, + ) + resp.raise_for_status() + return str(resp.json()["access_token"]) + except Exception as exc: # not ready yet + last = exc + time.sleep(2) + raise RuntimeError(f"Keycloak admin token never became available: {last}") + + +def _user_access_token(base: str) -> str: + """Obtain a real user access token via the resource-owner password grant.""" + with httpx.Client(base_url=base, timeout=30.0) as http: + resp = http.post( + f"/realms/{REALM}/protocol/openid-connect/token", + data={ + "grant_type": "password", + "client_id": CLIENT_ID, + "username": USERNAME, + "password": PASSWORD, + "scope": "openid profile", + }, + ) + resp.raise_for_status() + return str(resp.json()["access_token"]) + + +# --------------------------------------------------------------------------- +# Tests +# --------------------------------------------------------------------------- +def test_oidc_discovery_against_real_keycloak(keycloak: dict[str, str]) -> None: + from pyfly.security.oauth2.resource_server import discover_oidc + + jwks_uri, issuer = discover_oidc(keycloak["issuer"]) + assert jwks_uri.startswith(keycloak["base"]) + assert "/protocol/openid-connect/certs" in jwks_uri + assert issuer == keycloak["issuer"] + + +def test_real_keycloak_token_validates_and_maps_roles(keycloak: dict[str, str]) -> None: + from pyfly.security.oauth2.resource_server import JWKSTokenValidator, discover_oidc + + token = _user_access_token(keycloak["base"]) + jwks_uri, issuer = discover_oidc(keycloak["issuer"]) + validator = JWKSTokenValidator(jwks_uri=jwks_uri, issuer=issuer, audiences=[CLIENT_ID]) + + payload = validator.validate(token) + assert payload["iss"] == issuer + assert REALM_ROLE in payload["realm_access"]["roles"] + + ctx = validator.to_security_context(token) + assert ctx.is_authenticated + assert REALM_ROLE in ctx.roles # realm_access.roles -> SecurityContext.roles + + +def test_real_keycloak_rejects_tampered_token(keycloak: dict[str, str]) -> None: + from pyfly.kernel.exceptions import SecurityException + from pyfly.security.oauth2.resource_server import JWKSTokenValidator, discover_oidc + + token = _user_access_token(keycloak["base"]) + jwks_uri, issuer = discover_oidc(keycloak["issuer"]) + validator = JWKSTokenValidator(jwks_uri=jwks_uri, issuer=issuer, audiences=[CLIENT_ID]) + + # Flip a character in the signature segment. + head, body, sig = token.split(".") + tampered = f"{head}.{body}.{sig[:-3]}{'AAA' if sig[-3:] != 'AAA' else 'BBB'}" + with pytest.raises(SecurityException): + validator.validate(tampered) + + +@pytest.mark.asyncio +async def test_real_keycloak_end_to_end_through_filter(keycloak: dict[str, str]) -> None: + from starlette.testclient import TestClient + + from pyfly.container.stereotypes import rest_controller + from pyfly.context.application_context import ApplicationContext + from pyfly.core.config import Config + from pyfly.web.adapters.starlette.app import create_app + from pyfly.web.mappings import get_mapping, request_mapping + + @rest_controller + @request_mapping("/api/me") + class MeController: + @get_mapping("/") + async def me(self) -> dict: + from pyfly.context.request_context import RequestContext + + rc = RequestContext.current() + sc = rc.security_context if rc is not None else None + return {"user": sc.user_id if sc else None, "roles": sc.roles if sc else []} + + ctx = ApplicationContext( + Config( + { + "pyfly": { + "security": { + "enabled": "true", + "oauth2": { + "resource-server": { + "enabled": "true", + "issuer-uri": keycloak["issuer"], # OIDC discovery + "audiences": CLIENT_ID, + } + }, + } + } + } + ) + ) + ctx.register_bean(MeController) + + @contextlib.asynccontextmanager + async def _lifespan(_app: Any) -> AsyncIterator[None]: + await ctx.start() + yield + await ctx.stop() + + app = create_app(context=ctx, lifespan=_lifespan) + token = _user_access_token(keycloak["base"]) + with TestClient(app) as client: + resp = client.get("/api/me/", headers={"Authorization": f"Bearer {token}"}) + assert resp.status_code == 200 + body = resp.json() + assert body["user"] is not None + assert REALM_ROLE in body["roles"] diff --git a/tests/security/test_oauth2_resource_filter.py b/tests/security/test_oauth2_resource_filter.py new file mode 100644 index 00000000..495c9bb7 --- /dev/null +++ b/tests/security/test_oauth2_resource_filter.py @@ -0,0 +1,156 @@ +# Copyright 2026 Firefly Software Foundation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""OAuth2ResourceServerFilter — request-chain behaviour. + +Pins: authenticated context on a valid token, anonymous fall-through on +missing/invalid tokens (default), opt-in strict ``401`` rejection with an RFC +6750 ``WWW-Authenticate`` challenge, case-insensitive Bearer scheme, and +``exclude_patterns`` skipping. +""" + +from __future__ import annotations + +import time +from typing import Any + +import jwt +import pytest +from cryptography.hazmat.primitives.asymmetric import rsa +from starlette.requests import Request +from starlette.responses import PlainTextResponse, Response + +from pyfly.kernel.exceptions import SecurityException +from pyfly.security.context import SecurityContext +from pyfly.web.adapters.starlette.filters.oauth2_resource_filter import ( + ERROR_MODE_401, + OAuth2ResourceServerFilter, +) + +_KEY = rsa.generate_private_key(public_exponent=65537, key_size=2048) + + +class _FakeValidator: + """In-memory validator: a real RS256 verify against one key, no network. + + Stands in for JWKSTokenValidator so the filter tests stay hermetic and fast; + the JWKS/HTTP path is covered in test_oauth2_resource_server.py. + """ + + def to_security_context(self, token: str) -> SecurityContext: + try: + payload = jwt.decode(token, _KEY.public_key(), algorithms=["RS256"], options={"require": ["exp"]}) + except jwt.PyJWTError as exc: + raise SecurityException(f"bad token: {exc}", code="INVALID_TOKEN") from exc + return SecurityContext(user_id=payload["sub"], roles=payload.get("roles", [])) + + +def _token(sub: str = "u", roles: list[str] | None = None) -> str: + return jwt.encode({"sub": sub, "roles": roles or [], "exp": int(time.time()) + 3600}, _KEY, algorithm="RS256") + + +def _request(path: str, auth: str | None) -> Request: + headers = [(b"authorization", auth.encode())] if auth is not None else [] + scope = { + "type": "http", + "method": "GET", + "path": path, + "headers": headers, + "query_string": b"", + "client": ("127.0.0.1", 0), + } + return Request(scope) + + +async def _run(flt: OAuth2ResourceServerFilter, req: Request) -> tuple[Response, Any]: + captured: dict[str, Any] = {} + + async def call_next(r: Request) -> Response: + captured["ctx"] = getattr(r.state, "security_context", None) + return PlainTextResponse("ok") + + resp = await flt.do_filter(req, call_next) + return resp, captured.get("ctx") + + +@pytest.mark.asyncio +async def test_valid_token_sets_authenticated_context() -> None: + flt = OAuth2ResourceServerFilter(token_validator=_FakeValidator()) + resp, ctx = await _run(flt, _request("/api/data", f"Bearer {_token('alice', ['admin'])}")) + assert resp.status_code == 200 + assert ctx.is_authenticated and ctx.user_id == "alice" and ctx.roles == ["admin"] + + +@pytest.mark.asyncio +async def test_missing_token_is_anonymous_and_proceeds() -> None: + flt = OAuth2ResourceServerFilter(token_validator=_FakeValidator()) + resp, ctx = await _run(flt, _request("/api/data", None)) + assert resp.status_code == 200 + assert ctx is not None and not ctx.is_authenticated + + +@pytest.mark.asyncio +async def test_invalid_token_anonymous_mode_proceeds() -> None: + flt = OAuth2ResourceServerFilter(token_validator=_FakeValidator()) # default anonymous + resp, ctx = await _run(flt, _request("/api/data", "Bearer not.a.jwt")) + assert resp.status_code == 200 + assert ctx is not None and not ctx.is_authenticated + + +@pytest.mark.asyncio +async def test_invalid_token_401_mode_rejects_with_challenge() -> None: + flt = OAuth2ResourceServerFilter(token_validator=_FakeValidator(), error_mode=ERROR_MODE_401) + resp, ctx = await _run(flt, _request("/api/data", "Bearer not.a.jwt")) + assert resp.status_code == 401 + assert resp.headers["WWW-Authenticate"] == 'Bearer error="invalid_token"' + assert ctx is None # call_next never reached + + +@pytest.mark.asyncio +async def test_missing_token_401_mode_still_falls_through() -> None: + # Strict mode rejects only PRESENT-but-invalid tokens; a missing token still + # falls through to the gate so public endpoints stay reachable. + flt = OAuth2ResourceServerFilter(token_validator=_FakeValidator(), error_mode=ERROR_MODE_401) + resp, ctx = await _run(flt, _request("/public", None)) + assert resp.status_code == 200 + assert ctx is not None and not ctx.is_authenticated + + +@pytest.mark.asyncio +async def test_bearer_scheme_is_case_insensitive() -> None: + flt = OAuth2ResourceServerFilter(token_validator=_FakeValidator()) + for scheme in ("Bearer", "bearer", "BEARER", "BeArEr"): + _, ctx = await _run(flt, _request("/api/data", f"{scheme} {_token('bob')}")) + assert ctx.is_authenticated and ctx.user_id == "bob", scheme + + +@pytest.mark.asyncio +async def test_non_bearer_scheme_is_ignored() -> None: + flt = OAuth2ResourceServerFilter(token_validator=_FakeValidator()) + _, ctx = await _run(flt, _request("/api/data", "Basic dXNlcjpwYXNz")) + assert ctx is not None and not ctx.is_authenticated + + +def test_exclude_patterns_skip_via_base_dispatch() -> None: + flt = OAuth2ResourceServerFilter( + token_validator=_FakeValidator(), exclude_patterns=["/actuator/*", "/api/v1/version"] + ) + assert flt.should_not_filter(_request("/actuator/health", None)) is True + assert flt.should_not_filter(_request("/actuator/health/liveness", None)) is True + assert flt.should_not_filter(_request("/api/v1/version", None)) is True + assert flt.should_not_filter(_request("/api/v1/data", None)) is False + + +def test_invalid_error_mode_falls_back_to_anonymous() -> None: + flt = OAuth2ResourceServerFilter(token_validator=_FakeValidator(), error_mode="bogus") + assert flt._error_mode == "anonymous" diff --git a/tests/security/test_oauth2_resource_server.py b/tests/security/test_oauth2_resource_server.py index b7ce0f2c..7dc019af 100644 --- a/tests/security/test_oauth2_resource_server.py +++ b/tests/security/test_oauth2_resource_server.py @@ -11,172 +11,344 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -"""Tests for the OAuth2 Resource Server JWKS-based token validation.""" +"""OAuth2 resource-server JWKS validation — hermetic, multi-IdP. + +These tests run a **real** JWKS endpoint over localhost HTTP and mint **real** +RS256 tokens shaped like Keycloak, Microsoft Entra ID (v2.0) and AWS Cognito — +no mocks of PyJWKClient. They pin the full validation contract (signature, iss, +aud, exp with clock-skew leeway), config-driven multi-IdP claim mapping, JWKS key +rotation, and OIDC discovery. +""" from __future__ import annotations +import http.server +import json +import threading import time -from unittest.mock import MagicMock, patch +from collections.abc import Callable, Iterator +from typing import Any import jwt import pytest -from cryptography.hazmat.primitives import serialization from cryptography.hazmat.primitives.asymmetric import rsa from pyfly.kernel.exceptions import SecurityException from pyfly.security.context import SecurityContext -from pyfly.security.oauth2.resource_server import JWKSTokenValidator +from pyfly.security.oauth2.resource_server import ( + ClaimMappings, + JWKSTokenValidator, + _flatten_strs, + _resolve_claim_path, + discover_oidc, +) # --------------------------------------------------------------------------- -# Test RSA key pair (generated once per module) +# Keys + a real localhost JWKS server # --------------------------------------------------------------------------- -_private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048) -_public_key = _private_key.public_key() - - -def _create_test_token(payload: dict, kid: str = "test-kid") -> str: - """Create an RS256-signed JWT for testing (adds an ``exp`` claim by default).""" - if "exp" not in payload: - payload = {**payload, "exp": int(time.time()) + 3600} - private_pem = _private_key.private_bytes( - encoding=serialization.Encoding.PEM, - format=serialization.PrivateFormat.PKCS8, - encryption_algorithm=serialization.NoEncryption(), - ) - return jwt.encode(payload, private_pem, algorithm="RS256", headers={"kid": kid}) +KEY1 = rsa.generate_private_key(public_exponent=65537, key_size=2048) +KEY2 = rsa.generate_private_key(public_exponent=65537, key_size=2048) -# --------------------------------------------------------------------------- -# Fixtures -# --------------------------------------------------------------------------- +def _jwk(pubkey: Any, kid: str) -> dict[str, Any]: + data = json.loads(jwt.algorithms.RSAAlgorithm.to_jwk(pubkey)) + data.update({"kid": kid, "use": "sig", "alg": "RS256"}) + return data -@pytest.fixture() -def mock_jwks_client(): - """Mock PyJWKClient to return our test public key.""" - with patch("pyfly.security.oauth2.resource_server.PyJWKClient") as mock_cls: - mock_instance = MagicMock() - mock_cls.return_value = mock_instance +class _JwksState: + """Mutable JWKS document served by the localhost endpoint (supports rotation).""" - # Create a signing key mock that returns the test public key. - mock_signing_key = MagicMock() - mock_signing_key.key = _public_key - mock_instance.get_signing_key_from_jwt.return_value = mock_signing_key + def __init__(self) -> None: + self.keys = [_jwk(KEY1.public_key(), "k1")] + self.issuer = "" # set by the fixture once the port is known - yield mock_instance + def document(self) -> dict[str, Any]: + return {"keys": self.keys} + def discovery(self) -> dict[str, Any]: + return {"issuer": self.issuer, "jwks_uri": f"{self.issuer}/jwks"} -# --------------------------------------------------------------------------- -# Tests -# --------------------------------------------------------------------------- +@pytest.fixture() +def jwks() -> Iterator[tuple[str, str, _JwksState]]: + """Yield ``(jwks_uri, issuer, state)`` for a live localhost JWKS server.""" + state = _JwksState() -class TestJWKSTokenValidator: - """Tests for :class:`JWKSTokenValidator`.""" + class Handler(http.server.BaseHTTPRequestHandler): + def do_GET(self) -> None: # noqa: N802 + is_discovery = self.path.endswith("/.well-known/openid-configuration") + payload = state.discovery() if is_discovery else state.document() + body = json.dumps(payload).encode() + self.send_response(200) + self.send_header("Content-Type", "application/json") + self.send_header("Content-Length", str(len(body))) + self.end_headers() + self.wfile.write(body) - def test_validate_valid_token(self, mock_jwks_client: MagicMock) -> None: - """A valid RS256 token should decode successfully.""" - validator = JWKSTokenValidator(jwks_uri="https://auth.example.com/.well-known/jwks.json") - token = _create_test_token({"sub": "user-1", "name": "Alice"}) + def log_message(self, *args: Any) -> None: # silence test server + pass - payload = validator.validate(token) + httpd = http.server.HTTPServer(("127.0.0.1", 0), Handler) + port = httpd.server_address[1] + state.issuer = f"http://127.0.0.1:{port}" + threading.Thread(target=httpd.serve_forever, daemon=True).start() + try: + yield f"{state.issuer}/jwks", state.issuer, state + finally: + httpd.shutdown() - assert payload["sub"] == "user-1" - assert payload["name"] == "Alice" - mock_jwks_client.get_signing_key_from_jwt.assert_called_once_with(token) - def test_validate_invalid_token(self, mock_jwks_client: MagicMock) -> None: - """An invalid token should raise SecurityException.""" - mock_jwks_client.get_signing_key_from_jwt.side_effect = jwt.PyJWTError( - "Key not found", - ) - validator = JWKSTokenValidator(jwks_uri="https://auth.example.com/.well-known/jwks.json") +def _mint(payload: dict[str, Any], *, key: Any = KEY1, kid: str = "k1") -> str: + body = {"iat": int(time.time()), "exp": int(time.time()) + 3600, **payload} + return jwt.encode(body, key, algorithm="RS256", headers={"kid": kid}) - with pytest.raises(SecurityException, match="Token validation failed") as exc_info: - validator.validate("garbage.token.value") - assert exc_info.value.code == "INVALID_TOKEN" +Mint = Callable[..., str] - def test_to_security_context_basic(self, mock_jwks_client: MagicMock) -> None: - """Token with sub/roles/permissions maps to a SecurityContext.""" - validator = JWKSTokenValidator(jwks_uri="https://auth.example.com/.well-known/jwks.json") - token = _create_test_token( - { - "sub": "user-42", - "roles": ["admin", "editor"], - "permissions": ["read", "write"], - }, - ) - ctx = validator.to_security_context(token) +# --------------------------------------------------------------------------- +# Claim-path resolver (pure unit) +# --------------------------------------------------------------------------- +class TestClaimPathResolver: + def test_dotted_and_wildcard_and_colon(self) -> None: + payload = { + "roles": "flat", + "realm_access": {"roles": ["a", "b"]}, + "resource_access": {"c1": {"roles": ["x"]}, "c2": {"roles": ["y"]}}, + "cognito:groups": ["g1", "g2"], + } + assert _flatten_strs(_resolve_claim_path(payload, "roles")) == ["flat"] + assert _flatten_strs(_resolve_claim_path(payload, "realm_access.roles")) == ["a", "b"] + assert _flatten_strs(_resolve_claim_path(payload, "resource_access.*.roles")) == ["x", "y"] + assert _flatten_strs(_resolve_claim_path(payload, "cognito:groups")) == ["g1", "g2"] + assert _resolve_claim_path(payload, "missing.path") == [] - assert isinstance(ctx, SecurityContext) - assert ctx.user_id == "user-42" - assert ctx.roles == ["admin", "editor"] - assert ctx.permissions == ["read", "write"] - assert ctx.is_authenticated is True - def test_to_security_context_keycloak_roles(self, mock_jwks_client: MagicMock) -> None: - """Keycloak-style realm_access.roles should be extracted.""" - validator = JWKSTokenValidator(jwks_uri="https://auth.example.com/.well-known/jwks.json") - token = _create_test_token( +# --------------------------------------------------------------------------- +# Multi-IdP token shapes +# --------------------------------------------------------------------------- +class TestKeycloak: + def test_realm_and_resource_roles_and_scope(self, jwks: tuple[str, str, _JwksState]) -> None: + jwks_uri, _, _ = jwks + iss = "https://kc.example.com/realms/cdm" + v = JWKSTokenValidator(jwks_uri=jwks_uri, issuer=iss, audiences=["cdm-api"]) + token = _mint( { + "iss": iss, + "aud": "cdm-api", "sub": "kc-user", - "realm_access": {"roles": ["realm-admin", "realm-viewer"]}, - }, + "realm_access": {"roles": ["CdM.Gd", "offline_access"]}, + "resource_access": {"cdm-api": {"roles": ["client-role-x"]}}, + "scope": "openid profile", + } ) - - ctx = validator.to_security_context(token) - + ctx = v.to_security_context(token) assert ctx.user_id == "kc-user" - assert ctx.roles == ["realm-admin", "realm-viewer"] + # Both realm AND per-client (resource_access) roles are extracted. + assert "CdM.Gd" in ctx.roles + assert "client-role-x" in ctx.roles + assert ctx.permissions == ["openid", "profile"] + + +class TestEntraID: + def test_roles_groups_scp_and_attributes(self, jwks: tuple[str, str, _JwksState]) -> None: + jwks_uri, _, _ = jwks + tid = "11111111-2222-3333-4444-555555555555" + iss = f"https://login.microsoftonline.com/{tid}/v2.0" + mappings = ClaimMappings(attribute_claims=("tid", "preferred_username")) + v = JWKSTokenValidator(jwks_uri=jwks_uri, issuer=iss, audiences=["api://cdm-backend"], claim_mappings=mappings) + token = _mint( + { + "iss": iss, + "aud": "api://cdm-backend", + "sub": "entra-sub", + "oid": "oid-abc", + "tid": tid, + "roles": ["CdM.Gn"], + "groups": ["group-guid-1"], + "scp": "Data.Read Data.Write", + "preferred_username": "ana@faes.mx", + } + ) + ctx = v.to_security_context(token) + # oid is the default principal preference over sub. + assert ctx.user_id == "oid-abc" + assert "CdM.Gn" in ctx.roles # app roles + assert "group-guid-1" in ctx.roles # groups merged into authorities + assert ctx.permissions == ["Data.Read", "Data.Write"] # scp -> permissions + assert ctx.attributes["tid"] == tid + assert ctx.attributes["preferred_username"] == "ana@faes.mx" + + +class TestCognito: + def test_access_token_no_audience(self, jwks: tuple[str, str, _JwksState]) -> None: + jwks_uri, _, _ = jwks + iss = "https://cognito-idp.us-east-1.amazonaws.com/us-east-1_AbCdEf" + # Cognito access tokens carry no 'aud': validate without configuring audiences. + v = JWKSTokenValidator(jwks_uri=jwks_uri, issuer=iss) + token = _mint( + { + "iss": iss, + "sub": "cog-sub", + "client_id": "cog-client", + "token_use": "access", + "cognito:groups": ["CdM.Gr"], + "scope": "aws.cognito.signin.user.admin", + } + ) + ctx = v.to_security_context(token) + assert ctx.user_id == "cog-sub" + assert "CdM.Gr" in ctx.roles # cognito:groups extracted - def test_to_security_context_scope_as_permissions( - self, - mock_jwks_client: MagicMock, - ) -> None: - """Space-separated 'scope' claim should be split into permissions.""" - validator = JWKSTokenValidator(jwks_uri="https://auth.example.com/.well-known/jwks.json") - token = _create_test_token({"sub": "scope-user", "scope": "read write delete"}) + def test_audience_required_rejects_aud_less_token(self, jwks: tuple[str, str, _JwksState]) -> None: + jwks_uri, _, _ = jwks + iss = "https://cognito-idp.us-east-1.amazonaws.com/us-east-1_AbCdEf" + # Configuring audiences makes the aud-less access token fail — the + # documented Cognito gotcha. validate_audience=False is the escape hatch. + v = JWKSTokenValidator(jwks_uri=jwks_uri, issuer=iss, audiences=["cog-client"]) + token = _mint({"iss": iss, "sub": "cog-sub", "token_use": "access"}) + with pytest.raises(SecurityException): + v.validate(token) - ctx = validator.to_security_context(token) + lenient = JWKSTokenValidator(jwks_uri=jwks_uri, issuer=iss, audiences=["cog-client"], validate_audience=False) + assert lenient.validate(token)["sub"] == "cog-sub" - assert ctx.permissions == ["read", "write", "delete"] - def test_validate_with_issuer(self, mock_jwks_client: MagicMock) -> None: - """Issuer claim must match when issuer is configured.""" - validator = JWKSTokenValidator( - jwks_uri="https://auth.example.com/.well-known/jwks.json", - issuer="https://auth.example.com", - ) +# --------------------------------------------------------------------------- +# Audience handling +# --------------------------------------------------------------------------- +class TestAudience: + def test_audiences_list_matches_any(self, jwks: tuple[str, str, _JwksState]) -> None: + jwks_uri, _, _ = jwks + v = JWKSTokenValidator(jwks_uri=jwks_uri, audiences=["a", "b", "c"]) + assert v.validate(_mint({"sub": "u", "aud": "b"}))["sub"] == "u" + with pytest.raises(SecurityException): + v.validate(_mint({"sub": "u", "aud": "z"})) + + def test_no_audiences_skips_aud_check(self, jwks: tuple[str, str, _JwksState]) -> None: + jwks_uri, _, _ = jwks + v = JWKSTokenValidator(jwks_uri=jwks_uri) + # A token WITH an aud still passes when no audiences are configured. + assert v.validate(_mint({"sub": "u", "aud": "whatever"}))["sub"] == "u" - # Valid issuer should succeed. - valid_token = _create_test_token( - {"sub": "user-1", "iss": "https://auth.example.com"}, - ) - payload = validator.validate(valid_token) - assert payload["sub"] == "user-1" - # Mismatched issuer should raise SecurityException. - bad_token = _create_test_token( - {"sub": "user-1", "iss": "https://evil.example.com"}, +# --------------------------------------------------------------------------- +# Clock-skew leeway +# --------------------------------------------------------------------------- +class TestClockSkew: + def test_default_leeway_accepts_small_future_skew(self, jwks: tuple[str, str, _JwksState]) -> None: + jwks_uri, _, _ = jwks + v = JWKSTokenValidator(jwks_uri=jwks_uri) # default leeway = 60s + future = int(time.time()) + 30 + token = jwt.encode( + {"sub": "u", "iat": future, "nbf": future, "exp": future + 3600}, + KEY1, + algorithm="RS256", + headers={"kid": "k1"}, ) - with pytest.raises(SecurityException, match="Token validation failed"): - validator.validate(bad_token) - - def test_validate_with_audience(self, mock_jwks_client: MagicMock) -> None: - """Audience claim must match when audience is configured.""" - validator = JWKSTokenValidator( - jwks_uri="https://auth.example.com/.well-known/jwks.json", - audience="my-api", + assert v.validate(token)["sub"] == "u" + + def test_zero_leeway_rejects_future_skew(self, jwks: tuple[str, str, _JwksState]) -> None: + jwks_uri, _, _ = jwks + v = JWKSTokenValidator(jwks_uri=jwks_uri, leeway=0) + future = int(time.time()) + 30 + token = jwt.encode( + {"sub": "u", "iat": future, "nbf": future, "exp": future + 3600}, + KEY1, + algorithm="RS256", + headers={"kid": "k1"}, ) + with pytest.raises(SecurityException): + v.validate(token) + + +# --------------------------------------------------------------------------- +# Negative cases +# --------------------------------------------------------------------------- +class TestRejections: + def test_expired(self, jwks: tuple[str, str, _JwksState]) -> None: + jwks_uri, _, _ = jwks + v = JWKSTokenValidator(jwks_uri=jwks_uri) + token = jwt.encode({"sub": "u", "exp": int(time.time()) - 120}, KEY1, algorithm="RS256", headers={"kid": "k1"}) + with pytest.raises(SecurityException) as exc: + v.validate(token) + assert exc.value.code == "INVALID_TOKEN" + + def test_missing_exp_rejected(self, jwks: tuple[str, str, _JwksState]) -> None: + jwks_uri, _, _ = jwks + v = JWKSTokenValidator(jwks_uri=jwks_uri) + token = jwt.encode({"sub": "u"}, KEY1, algorithm="RS256", headers={"kid": "k1"}) + with pytest.raises(SecurityException): + v.validate(token) + + def test_bad_signature(self, jwks: tuple[str, str, _JwksState]) -> None: + jwks_uri, _, _ = jwks + v = JWKSTokenValidator(jwks_uri=jwks_uri) + # Signed with KEY2 but presented under kid k1 (which maps to KEY1). + token = _mint({"sub": "u"}, key=KEY2, kid="k1") + with pytest.raises(SecurityException): + v.validate(token) + + def test_wrong_issuer(self, jwks: tuple[str, str, _JwksState]) -> None: + jwks_uri, _, _ = jwks + v = JWKSTokenValidator(jwks_uri=jwks_uri, issuer="https://good.example") + with pytest.raises(SecurityException): + v.validate(_mint({"sub": "u", "iss": "https://evil.example"})) + + def test_unknown_kid(self, jwks: tuple[str, str, _JwksState]) -> None: + jwks_uri, _, _ = jwks + v = JWKSTokenValidator(jwks_uri=jwks_uri) + token = _mint({"sub": "u"}, key=KEY2, kid="nope") + with pytest.raises(SecurityException): + v.validate(token) + + +# --------------------------------------------------------------------------- +# Key rotation + OIDC discovery +# --------------------------------------------------------------------------- +class TestRotationAndDiscovery: + def test_key_rotation(self, jwks: tuple[str, str, _JwksState]) -> None: + jwks_uri, _, state = jwks + state.keys.append(_jwk(KEY2.public_key(), "k2")) # rotate in a new key + v = JWKSTokenValidator(jwks_uri=jwks_uri) + token = _mint({"sub": "rotated"}, key=KEY2, kid="k2") + assert v.validate(token)["sub"] == "rotated" + + def test_oidc_discovery(self, jwks: tuple[str, str, _JwksState]) -> None: + _, issuer, _ = jwks + discovered_jwks, discovered_issuer = discover_oidc(issuer) + assert discovered_jwks == f"{issuer}/jwks" + assert discovered_issuer == issuer + v = JWKSTokenValidator(jwks_uri=discovered_jwks, issuer=discovered_issuer) + assert v.validate(_mint({"sub": "u", "iss": issuer}))["sub"] == "u" + + def test_oidc_discovery_failure(self) -> None: + with pytest.raises(SecurityException) as exc: + discover_oidc("http://127.0.0.1:1/nope", timeout=1.0) + assert exc.value.code == "OIDC_DISCOVERY_FAILED" - # Valid audience should succeed. - valid_token = _create_test_token({"sub": "user-1", "aud": "my-api"}) - payload = validator.validate(valid_token) - assert payload["sub"] == "user-1" - # Mismatched audience should raise SecurityException. - bad_token = _create_test_token({"sub": "user-1", "aud": "other-api"}) - with pytest.raises(SecurityException, match="Token validation failed"): - validator.validate(bad_token) +# --------------------------------------------------------------------------- +# Claim-mapping options +# --------------------------------------------------------------------------- +class TestClaimMappingOptions: + def test_authority_prefix(self, jwks: tuple[str, str, _JwksState]) -> None: + jwks_uri, _, _ = jwks + v = JWKSTokenValidator( + jwks_uri=jwks_uri, + claim_mappings=ClaimMappings(authority_claims=("roles",), authority_prefix="ROLE_"), + ) + ctx = v.to_security_context(_mint({"sub": "u", "roles": ["admin"]})) + assert ctx.roles == ["ROLE_admin"] + + def test_principal_falls_back_to_sub(self, jwks: tuple[str, str, _JwksState]) -> None: + jwks_uri, _, _ = jwks + v = JWKSTokenValidator(jwks_uri=jwks_uri) # default principal ("oid","sub") + ctx = v.to_security_context(_mint({"sub": "only-sub"})) + assert ctx.user_id == "only-sub" + + def test_returns_security_context_instance(self, jwks: tuple[str, str, _JwksState]) -> None: + jwks_uri, _, _ = jwks + v = JWKSTokenValidator(jwks_uri=jwks_uri) + assert isinstance(v.to_security_context(_mint({"sub": "u"})), SecurityContext) diff --git a/tests/security/test_oauth2_resource_server_wiring.py b/tests/security/test_oauth2_resource_server_wiring.py new file mode 100644 index 00000000..0ff392d6 --- /dev/null +++ b/tests/security/test_oauth2_resource_server_wiring.py @@ -0,0 +1,189 @@ +# Copyright 2026 Firefly Software Foundation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""OAuth2 resource-server auto-configuration wiring. + +End-to-end through ``create_app``: the auto-config binds +``ResourceServerProperties``, builds a multi-IdP ``JWKSTokenValidator``, and the +resource-server filter joins the live chain and populates the request principal. +Also pins the ``@conditional_on_missing_bean(JWKSTokenValidator)`` back-off that +lets an application register its own validator subclass (the cdm-mexico +``EntraClaimsValidator`` pattern). +""" + +from __future__ import annotations + +import contextlib +import http.server +import json +import threading +import time +from collections.abc import AsyncIterator, Iterator +from typing import Any + +import jwt +import pytest +from cryptography.hazmat.primitives.asymmetric import rsa +from starlette.testclient import TestClient + +from pyfly.container.stereotypes import rest_controller +from pyfly.context.application_context import ApplicationContext +from pyfly.core.config import Config +from pyfly.security.context import SecurityContext +from pyfly.security.oauth2.resource_server import JWKSTokenValidator +from pyfly.web.adapters.starlette.app import create_app +from pyfly.web.mappings import get_mapping, request_mapping + +_KEY = rsa.generate_private_key(public_exponent=65537, key_size=2048) +_AUTHORITY_CLAIMS = "roles,realm_access.roles,resource_access.*.roles,groups,cognito:groups" + + +def _jwk(kid: str) -> dict[str, Any]: + data = json.loads(jwt.algorithms.RSAAlgorithm.to_jwk(_KEY.public_key())) + data.update({"kid": kid, "use": "sig", "alg": "RS256"}) + return data + + +@pytest.fixture() +def jwks_uri() -> Iterator[str]: + doc = {"keys": [_jwk("k1")]} + + class Handler(http.server.BaseHTTPRequestHandler): + def do_GET(self) -> None: # noqa: N802 + body = json.dumps(doc).encode() + self.send_response(200) + self.send_header("Content-Type", "application/json") + self.send_header("Content-Length", str(len(body))) + self.end_headers() + self.wfile.write(body) + + def log_message(self, *args: Any) -> None: + pass + + httpd = http.server.HTTPServer(("127.0.0.1", 0), Handler) + threading.Thread(target=httpd.serve_forever, daemon=True).start() + try: + yield f"http://127.0.0.1:{httpd.server_address[1]}/jwks" + finally: + httpd.shutdown() + + +def _token(**claims: Any) -> str: + body = {"exp": int(time.time()) + 3600, **claims} + return jwt.encode(body, _KEY, algorithm="RS256", headers={"kid": "k1"}) + + +@rest_controller +@request_mapping("/api/whoami") +class WhoAmIController: + @get_mapping("/") + async def whoami(self) -> dict: + from pyfly.context.request_context import RequestContext + + ctx = RequestContext.current() + sc = ctx.security_context if ctx is not None else None + return { + "user": sc.user_id if sc else None, + "roles": sc.roles if sc else [], + "perms": sc.permissions if sc else [], + } + + +def _resource_server_config(jwks: str) -> dict[str, Any]: + return { + "pyfly": { + "security": { + "enabled": "true", + "oauth2": { + "resource-server": { + "enabled": "true", + "jwks-uri": jwks, + "issuer": "https://kc.example.com/realms/cdm", + "audiences": "cdm-api", + "scope-claim-names": "scp,scope", + "authorities-claim-names": _AUTHORITY_CLAIMS, + } + }, + } + } + } + + +def _lifespan_for(ctx: ApplicationContext) -> Any: + @contextlib.asynccontextmanager + async def _lifespan(_app: Any) -> AsyncIterator[None]: + await ctx.start() + yield + await ctx.stop() + + return _lifespan + + +def _build_app(config: dict[str, Any]) -> tuple[Any, ApplicationContext]: + ctx = ApplicationContext(Config(config)) + ctx.register_bean(WhoAmIController) + return create_app(context=ctx, lifespan=_lifespan_for(ctx)), ctx + + +@pytest.mark.asyncio +async def test_keycloak_token_populates_principal_end_to_end(jwks_uri: str) -> None: + app, _ = _build_app(_resource_server_config(jwks_uri)) + token = _token( + iss="https://kc.example.com/realms/cdm", + aud="cdm-api", + sub="kc-user", + realm_access={"roles": ["CdM.Gd"]}, + resource_access={"cdm-api": {"roles": ["client-x"]}}, + scope="read write", + ) + with TestClient(app) as client: + resp = client.get("/api/whoami/", headers={"Authorization": f"Bearer {token}"}) + assert resp.status_code == 200 + body = resp.json() + assert body["user"] == "kc-user" + assert "CdM.Gd" in body["roles"] and "client-x" in body["roles"] + assert body["perms"] == ["read", "write"] + + +@pytest.mark.asyncio +async def test_no_token_is_anonymous_end_to_end(jwks_uri: str) -> None: + app, _ = _build_app(_resource_server_config(jwks_uri)) + with TestClient(app) as client: + resp = client.get("/api/whoami/") + assert resp.status_code == 200 + assert resp.json() == {"user": None, "roles": [], "perms": []} + + +@pytest.mark.asyncio +async def test_user_validator_subclass_overrides_default(jwks_uri: str) -> None: + # cdm-mexico registers its own JWKSTokenValidator subclass; the default + # auto-config bean must back off (@conditional_on_missing_bean is subclass-aware). + class CustomValidator(JWKSTokenValidator): + def to_security_context(self, token: str) -> SecurityContext: # noqa: ARG002 + return SecurityContext(user_id="fixed-by-subclass", roles=["CdM.Gn"]) + + ctx = ApplicationContext(Config(_resource_server_config(jwks_uri))) + ctx.register_bean(WhoAmIController) + # Register the subclass as the base JWKSTokenValidator type (cdm pattern). + ctx.container.register_instance(JWKSTokenValidator, CustomValidator(jwks_uri=jwks_uri)) + app = create_app(context=ctx, lifespan=_lifespan_for(ctx)) + with TestClient(app) as client: + # Even a garbage token yields the subclass's fixed identity → proves the + # subclass validator (not the default) is wired into the filter. + resp = client.get("/api/whoami/", headers={"Authorization": "Bearer anything"}) + assert resp.status_code == 200 + assert resp.json()["user"] == "fixed-by-subclass" + + validators = ctx.get_beans_of_type(JWKSTokenValidator) + assert len(validators) == 1 + assert isinstance(validators[0], CustomValidator) diff --git a/uv.lock b/uv.lock index 1ab8d502..68910334 100644 --- a/uv.lock +++ b/uv.lock @@ -2160,7 +2160,7 @@ wheels = [ [[package]] name = "pyfly" -version = "26.6.106" +version = "26.6.107" source = { editable = "." } dependencies = [ { name = "pydantic" }, From c60cbf0bce5f1064346ffdb81504072bdb12f1d2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9s=20Contreras=20Guill=C3=A9n?= Date: Tue, 16 Jun 2026 15:57:30 +0200 Subject: [PATCH 2/2] test(security): cdm-mexico parity coverage; drop testcontainers Keycloak test MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Validated the resource server against a REAL Keycloak locally (testcontainers, 4/4: OIDC discovery, JWKS signature verification, iss/aud/exp validation, realm- role claim mapping, and the full filter chain end-to-end). Per request, the Docker/testcontainers test is removed — the durable coverage is the hermetic multi-IdP suite (real localhost JWKS + real RS256 tokens, no Docker), which runs fast in the default lane and covers Keycloak/Entra/Cognito. Adds an explicit cdm-mexico (FAES México) parity test: an Entra-shaped token (CdM.Gn role + group + scp + oid + tid + cdm_entidad_id) maps correctly via pure config (roles+groups, scp->permissions, oid principal, attributes), the admin gate's raw-claim has_role("CdM.Gn") exact match works, and a rep principal is denied the gn gate — covering the use case without requiring the EntraClaimsValidator subclass (which still works via the conditional_on_missing_bean backoff). --- .../test_oauth2_keycloak_integration.py | 280 ------------------ tests/security/test_oauth2_resource_server.py | 66 +++++ 2 files changed, 66 insertions(+), 280 deletions(-) delete mode 100644 tests/integration/test_oauth2_keycloak_integration.py diff --git a/tests/integration/test_oauth2_keycloak_integration.py b/tests/integration/test_oauth2_keycloak_integration.py deleted file mode 100644 index 305f1a2c..00000000 --- a/tests/integration/test_oauth2_keycloak_integration.py +++ /dev/null @@ -1,280 +0,0 @@ -# Copyright 2026 Firefly Software Foundation. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""Real **Keycloak** integration test for the OAuth2 resource server. - -Boots an actual Keycloak in Docker (testcontainers), provisions a realm, a -public client (direct-access grants), a realm role and a user via the Admin REST -API, then obtains a **real** access token from Keycloak's token endpoint and -validates it through PyFly's resource server: - -* OIDC discovery against Keycloak's real ``/.well-known/openid-configuration``. -* Signature verification against Keycloak's real JWKS (``/protocol/openid-connect/certs``). -* ``iss`` / ``aud`` / ``exp`` validation and Keycloak realm-role claim mapping - (``realm_access.roles``) onto a :class:`SecurityContext`. -* End-to-end through ``create_app`` + the auto-wired resource-server filter. - -Marked ``integration`` (auto-applied by tests/integration/conftest.py); skips -when Docker is unavailable, fails hard in the CI integration job -(``PYFLY_INTEGRATION_REQUIRE_DOCKER=1``). -""" - -from __future__ import annotations - -import contextlib -from collections.abc import AsyncIterator, Iterator -from typing import Any - -import httpx -import pytest - -from pyfly.testing import is_docker_available - -pytestmark = pytest.mark.integration - -KEYCLOAK_IMAGE = "quay.io/keycloak/keycloak:25.0.6" -REALM = "pyfly-test" -CLIENT_ID = "pyfly-api" -USERNAME = "alice" -PASSWORD = "alice-secret" -REALM_ROLE = "CdM.Gd" - - -# --------------------------------------------------------------------------- -# Keycloak container + provisioning -# --------------------------------------------------------------------------- -@pytest.fixture(scope="module") -def keycloak() -> Iterator[dict[str, str]]: - """Start Keycloak, provision realm/client/role/user, yield connection info.""" - if not is_docker_available(): - pytest.skip("Docker not available for the Keycloak integration test") - - from testcontainers.core.container import DockerContainer - from testcontainers.core.waiting_utils import wait_for_logs - - container = ( - DockerContainer(KEYCLOAK_IMAGE) - # Cover both the 25.x (KEYCLOAK_ADMIN) and 26.x (KC_BOOTSTRAP_ADMIN_*) names. - .with_env("KEYCLOAK_ADMIN", "admin") - .with_env("KEYCLOAK_ADMIN_PASSWORD", "admin") - .with_env("KC_BOOTSTRAP_ADMIN_USERNAME", "admin") - .with_env("KC_BOOTSTRAP_ADMIN_PASSWORD", "admin") - .with_exposed_ports(8080) - .with_command("start-dev") - ) - with container: - wait_for_logs(container, "Running the server in development mode", timeout=240) - host = container.get_container_host_ip() - port = container.get_exposed_port(8080) - base = f"http://{host}:{port}" - _provision(base) - yield {"base": base, "issuer": f"{base}/realms/{REALM}"} - - -def _provision(base: str) -> None: - """Create the realm, client, role and user via the Keycloak Admin REST API.""" - with httpx.Client(base_url=base, timeout=30.0) as http: - admin = _retry_admin_token(http) - h = {"Authorization": f"Bearer {admin}"} - - # Realm - http.post("/admin/realms", headers=h, json={"realm": REALM, "enabled": True}).raise_for_status() - - # Public client with direct-access grants + an audience mapper so the - # access token carries aud=pyfly-api (Keycloak's default aud is "account"). - http.post( - f"/admin/realms/{REALM}/clients", - headers=h, - json={ - "clientId": CLIENT_ID, - "publicClient": True, - "directAccessGrantsEnabled": True, - "standardFlowEnabled": False, - "protocolMappers": [ - { - "name": "aud-pyfly-api", - "protocol": "openid-connect", - "protocolMapper": "oidc-audience-mapper", - "config": { - "included.client.audience": CLIENT_ID, - "id.token.claim": "false", - "access.token.claim": "true", - }, - } - ], - }, - ).raise_for_status() - - # Realm role - http.post(f"/admin/realms/{REALM}/roles", headers=h, json={"name": REALM_ROLE}).raise_for_status() - role = http.get(f"/admin/realms/{REALM}/roles/{REALM_ROLE}", headers=h).json() - - # User + password - http.post( - f"/admin/realms/{REALM}/users", - headers=h, - json={ - "username": USERNAME, - "enabled": True, - "credentials": [{"type": "password", "value": PASSWORD, "temporary": False}], - }, - ).raise_for_status() - uid = http.get(f"/admin/realms/{REALM}/users", headers=h, params={"username": USERNAME}).json()[0]["id"] - - # Assign the realm role to the user - http.post( - f"/admin/realms/{REALM}/users/{uid}/role-mappings/realm", - headers=h, - json=[{"id": role["id"], "name": role["name"]}], - ).raise_for_status() - - -def _retry_admin_token(http: httpx.Client, attempts: int = 30) -> str: - """Fetch a master-realm admin token, retrying until Keycloak is ready.""" - import time - - last: Exception | None = None - for _ in range(attempts): - try: - resp = http.post( - "/realms/master/protocol/openid-connect/token", - data={ - "grant_type": "password", - "client_id": "admin-cli", - "username": "admin", - "password": "admin", - }, - ) - resp.raise_for_status() - return str(resp.json()["access_token"]) - except Exception as exc: # not ready yet - last = exc - time.sleep(2) - raise RuntimeError(f"Keycloak admin token never became available: {last}") - - -def _user_access_token(base: str) -> str: - """Obtain a real user access token via the resource-owner password grant.""" - with httpx.Client(base_url=base, timeout=30.0) as http: - resp = http.post( - f"/realms/{REALM}/protocol/openid-connect/token", - data={ - "grant_type": "password", - "client_id": CLIENT_ID, - "username": USERNAME, - "password": PASSWORD, - "scope": "openid profile", - }, - ) - resp.raise_for_status() - return str(resp.json()["access_token"]) - - -# --------------------------------------------------------------------------- -# Tests -# --------------------------------------------------------------------------- -def test_oidc_discovery_against_real_keycloak(keycloak: dict[str, str]) -> None: - from pyfly.security.oauth2.resource_server import discover_oidc - - jwks_uri, issuer = discover_oidc(keycloak["issuer"]) - assert jwks_uri.startswith(keycloak["base"]) - assert "/protocol/openid-connect/certs" in jwks_uri - assert issuer == keycloak["issuer"] - - -def test_real_keycloak_token_validates_and_maps_roles(keycloak: dict[str, str]) -> None: - from pyfly.security.oauth2.resource_server import JWKSTokenValidator, discover_oidc - - token = _user_access_token(keycloak["base"]) - jwks_uri, issuer = discover_oidc(keycloak["issuer"]) - validator = JWKSTokenValidator(jwks_uri=jwks_uri, issuer=issuer, audiences=[CLIENT_ID]) - - payload = validator.validate(token) - assert payload["iss"] == issuer - assert REALM_ROLE in payload["realm_access"]["roles"] - - ctx = validator.to_security_context(token) - assert ctx.is_authenticated - assert REALM_ROLE in ctx.roles # realm_access.roles -> SecurityContext.roles - - -def test_real_keycloak_rejects_tampered_token(keycloak: dict[str, str]) -> None: - from pyfly.kernel.exceptions import SecurityException - from pyfly.security.oauth2.resource_server import JWKSTokenValidator, discover_oidc - - token = _user_access_token(keycloak["base"]) - jwks_uri, issuer = discover_oidc(keycloak["issuer"]) - validator = JWKSTokenValidator(jwks_uri=jwks_uri, issuer=issuer, audiences=[CLIENT_ID]) - - # Flip a character in the signature segment. - head, body, sig = token.split(".") - tampered = f"{head}.{body}.{sig[:-3]}{'AAA' if sig[-3:] != 'AAA' else 'BBB'}" - with pytest.raises(SecurityException): - validator.validate(tampered) - - -@pytest.mark.asyncio -async def test_real_keycloak_end_to_end_through_filter(keycloak: dict[str, str]) -> None: - from starlette.testclient import TestClient - - from pyfly.container.stereotypes import rest_controller - from pyfly.context.application_context import ApplicationContext - from pyfly.core.config import Config - from pyfly.web.adapters.starlette.app import create_app - from pyfly.web.mappings import get_mapping, request_mapping - - @rest_controller - @request_mapping("/api/me") - class MeController: - @get_mapping("/") - async def me(self) -> dict: - from pyfly.context.request_context import RequestContext - - rc = RequestContext.current() - sc = rc.security_context if rc is not None else None - return {"user": sc.user_id if sc else None, "roles": sc.roles if sc else []} - - ctx = ApplicationContext( - Config( - { - "pyfly": { - "security": { - "enabled": "true", - "oauth2": { - "resource-server": { - "enabled": "true", - "issuer-uri": keycloak["issuer"], # OIDC discovery - "audiences": CLIENT_ID, - } - }, - } - } - } - ) - ) - ctx.register_bean(MeController) - - @contextlib.asynccontextmanager - async def _lifespan(_app: Any) -> AsyncIterator[None]: - await ctx.start() - yield - await ctx.stop() - - app = create_app(context=ctx, lifespan=_lifespan) - token = _user_access_token(keycloak["base"]) - with TestClient(app) as client: - resp = client.get("/api/me/", headers={"Authorization": f"Bearer {token}"}) - assert resp.status_code == 200 - body = resp.json() - assert body["user"] is not None - assert REALM_ROLE in body["roles"] diff --git a/tests/security/test_oauth2_resource_server.py b/tests/security/test_oauth2_resource_server.py index 7dc019af..c6df8866 100644 --- a/tests/security/test_oauth2_resource_server.py +++ b/tests/security/test_oauth2_resource_server.py @@ -181,6 +181,72 @@ def test_roles_groups_scp_and_attributes(self, jwks: tuple[str, str, _JwksState] assert ctx.attributes["preferred_username"] == "ana@faes.mx" +class TestCdMMexicoUseCase: + """cdm-mexico (FAES México) Entra ID resource-server contract. + + Proves the use case is covered by **pure configuration** — the framework now + reproduces what cdm's ``EntraClaimsValidator`` subclass did (roles + groups, + ``scp`` scopes, ``oid`` principal, ``tid``/``cdm_entidad_id`` attributes), so + an adopter can either configure claim mapping or still subclass. + """ + + def test_entra_token_maps_like_entra_claims_validator(self, jwks: tuple[str, str, _JwksState]) -> None: + jwks_uri, _, _ = jwks + tid = "11111111-2222-3333-4444-555555555555" + iss = f"https://login.microsoftonline.com/{tid}/v2.0" + # The cdm-mexico claim mapping, expressed as config (no subclass needed). + mappings = ClaimMappings( + principal_claims=("oid", "sub"), + authority_claims=("roles", "groups"), # cdm appends groups to roles + scope_claims=("scp",), + attribute_claims=("tid", "preferred_username", "cdm_entidad_id", "employeeid", "oid"), + ) + v = JWKSTokenValidator(jwks_uri=jwks_uri, issuer=iss, audiences=["api://cdm-backend"], claim_mappings=mappings) + token = _mint( + { + "iss": iss, + "aud": "api://cdm-backend", + "sub": "entra-sub", + "oid": "oid-stable", + "tid": tid, + "roles": ["CdM.Gn"], + "groups": ["grp-guid-1"], + "scp": "Cdm.Read", + "preferred_username": "director@faes.mx", + "cdm_entidad_id": "MX0000064", + } + ) + ctx = v.to_security_context(token) + + # Principal prefers the stable Entra object id. + assert ctx.user_id == "oid-stable" + # Raw role claim is preserved verbatim, and the admin gate's exact-match + # check (cdm checks the raw "CdM.Gn") works. + assert ctx.has_role("CdM.Gn") + assert "grp-guid-1" in ctx.roles # group object-ids drive role mapping too + # Entra delegated scopes (scp) become permissions. + assert ctx.permissions == ["Cdm.Read"] + # Row-scope attributes are carried through. + assert ctx.attributes["cdm_entidad_id"] == "MX0000064" + assert ctx.attributes["tid"] == tid + assert ctx.attributes["preferred_username"] == "director@faes.mx" + + def test_gn_admin_gate_denies_non_gn_principal(self, jwks: tuple[str, str, _JwksState]) -> None: + jwks_uri, _, _ = jwks + iss = "https://login.microsoftonline.com/tid/v2.0" + v = JWKSTokenValidator( + jwks_uri=jwks_uri, + issuer=iss, + audiences=["api://cdm-backend"], + claim_mappings=ClaimMappings(authority_claims=("roles",)), + ) + token = _mint({"iss": iss, "aud": "api://cdm-backend", "sub": "rep", "roles": ["CdM.Rep"]}) + ctx = v.to_security_context(token) + # The admin URL gate / @pre_authorize checks the raw "CdM.Gn"; a rep must fail it. + assert ctx.has_role("CdM.Rep") + assert not ctx.has_role("CdM.Gn") + + class TestCognito: def test_access_token_no_audience(self, jwks: tuple[str, str, _JwksState]) -> None: jwks_uri, _, _ = jwks