diff --git a/spp_dci_server_social/services/search_service.py b/spp_dci_server_social/services/search_service.py index 456e63e2..0a7a22db 100644 --- a/spp_dci_server_social/services/search_service.py +++ b/spp_dci_server_social/services/search_service.py @@ -2,6 +2,7 @@ """DCI Social Registry Search Service - Maps Odoo partners to DCI schemas.""" import logging +import re import uuid from datetime import UTC, datetime from typing import Any @@ -34,6 +35,37 @@ "social", } +# DCI CEL metrics that expose registry-derived facts which are not safe for +# unauthorised external predicate filtering. DCI Social Registry search +# predicates are a caller-supplied oracle (including total_count), so a boolean +# metric like r.dci.crvs.is_alive == false discloses the value one query at a +# time even though it never appears in the response schema. Deny these +# sensitive metrics before compiling the expression. +# +# Scope: disability (r.dci.dr.*) and vital/civil-status (r.dci.crvs.*) +# accessors -- the private-data tier this guard protects. Lower-risk Social +# Registry / inter-registry metrics (r.dci.sr.*, r.dci.ibr.*) are intentionally +# left filterable. +# +# Keep this list local instead of importing spp_dci_indicators: that addon is +# optional and is not a dependency of the DCI Social Registry server. +_DCI_PREDICATE_DENIED_METRICS = frozenset( + ( + # Disability registry (functional severity + disability flags) + "r.dci.dr.severity", + "r.dci.dr.has_disability", + "r.dci.dr.assessed", + "r.dci.dr.vision_severe", + "r.dci.dr.hearing_severe", + "r.dci.dr.mobility_severe", + # CRVS (vital events / civil status) + "r.dci.crvs.has_event", + "r.dci.crvs.is_alive", + "r.dci.crvs.birth_verified", + "r.dci.crvs.is_married", + ) +) + class DCISocialSearchService: """Service for DCI Social Registry search operations. @@ -366,6 +398,8 @@ def _parse_predicate(self, predicate) -> list: if not expression or not expression.strip(): return [] + self._validate_external_predicate_expression(expression) + # Use CEL service to compile expression to domain cel_service = self.env["spp.cel.service"] @@ -387,6 +421,27 @@ def _parse_predicate(self, predicate) -> list: return result.get("domain", []) + def _validate_external_predicate_expression(self, expression: str) -> None: + """Reject sensitive DCI metrics in sender-supplied predicates. + + DCI predicate searches return counts and pageable matches, so allowing + callers to filter on raw DCI indicator cache values can disclose the + value by repeated queries even when the value is not present in the + response schema. Internal CEL use can still compile these metrics; this + guard only applies to external Social Registry predicate search. + """ + for accessor in _DCI_PREDICATE_DENIED_METRICS: + # Match the accessor as a dotted member path in any spelling that + # resolves to the metric: bare (r.dci.dr.has_disability), called + # (r.dci.dr.severity('Vision')), or quoted inside metric('...'). + # CEL ignores whitespace around member-selection dots, so allow it + # (`r . dci . dr . severity`). The boundaries reject a denied name + # that is only a prefix/substring of a longer identifier + # (r.dci.dr.severity_score, my_r.dci..., r.dci.crvs.is_alive_x). + accessor_pattern = r"\s*\.\s*".join(map(re.escape, accessor.split("."))) + if re.search(rf"(? list: """ Parse DCI expression into Odoo domain. diff --git a/spp_dci_server_social/tests/test_search_service_internals.py b/spp_dci_server_social/tests/test_search_service_internals.py index 3a2fa694..bf94d99c 100644 --- a/spp_dci_server_social/tests/test_search_service_internals.py +++ b/spp_dci_server_social/tests/test_search_service_internals.py @@ -84,6 +84,46 @@ def test_parse_predicate_compile_failure_raises(self): self.service._parse_predicate("r.broken ==") self.assertIn("bad syntax", str(ctx.exception)) + def test_parse_predicate_rejects_sensitive_dci_method_metrics(self): + blocked = [ + # Parameterized methods (accessor-call + metric() forms, incl. spaced dots) + "r.dci.dr.severity('Vision') >= 3", + "r . dci . dr . severity('Vision') >= 3", + "r.dci.crvs.has_event('death') == true", + "metric('r.dci.dr.severity', me, arg='Vision') >= 3", + 'metric("r.dci.crvs.has_event", me, arg="death") == true', + # Non-parameterized disability flags (boolean oracles) + "r.dci.dr.has_disability == true", + "r.dci.dr.vision_severe == true", + "metric('r.dci.dr.mobility_severe', me) == true", + # Non-parameterized CRVS vital/civil status (boolean oracles) + "r.dci.crvs.is_alive == false", + "r.dci.crvs.is_married == true", + "metric('r.dci.crvs.birth_verified', me) == true", + ] + for expression in blocked: + # Through _parse_predicate so the test also pins that the guard is + # wired in ahead of CEL compilation (it raises before the compiler). + with self.assertRaises(ValueError) as ctx: + self.service._parse_predicate(expression) + self.assertIn("sensitive DCI metric", str(ctx.exception)) + + def test_validate_external_predicate_allows_benign_and_lower_risk_metrics(self): + # The guard must not over-match: benign registry predicates, identifiers + # that merely contain a denied name as a substring, and the intentionally + # allowed lower-risk SR/IBR metrics all pass validation untouched. + allowed = [ + "r.gender == 'female'", + "age_years(r.birthdate) >= 18", + "r.dci.dr.severity_score >= 3", # not a call, different accessor + "my_r.dci.dr.severity('Vision') >= 3", # prefixed identifier + "r.dci.sr.household_size >= 5", + "r.dci.ibr.has_duplicate == true", + ] + for expression in allowed: + # Should not raise. + self.service._validate_external_predicate_expression(expression) + # --- _to_dci_member ------------------------------------------------------ def test_to_dci_member_with_identifier_and_demographics(self):