Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 55 additions & 0 deletions spp_dci_server_social/services/search_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
"""DCI Social Registry Search Service - Maps Odoo partners to DCI schemas."""

import logging
import re
import uuid
from datetime import UTC, datetime
from typing import Any
Expand Down Expand Up @@ -34,6 +35,37 @@
"social",
}

# DCI CEL metrics that expose registry-derived facts which are not safe for
# unauthorised external predicate filtering. DCI Social Registry search
# predicates are a caller-supplied oracle (including total_count), so a boolean
# metric like r.dci.crvs.is_alive == false discloses the value one query at a
# time even though it never appears in the response schema. Deny these
# sensitive metrics before compiling the expression.
#
# Scope: disability (r.dci.dr.*) and vital/civil-status (r.dci.crvs.*)
# accessors -- the private-data tier this guard protects. Lower-risk Social
# Registry / inter-registry metrics (r.dci.sr.*, r.dci.ibr.*) are intentionally
# left filterable.
#
# Keep this list local instead of importing spp_dci_indicators: that addon is
# optional and is not a dependency of the DCI Social Registry server.
_DCI_PREDICATE_DENIED_METRICS = frozenset(
(
# Disability registry (functional severity + disability flags)
"r.dci.dr.severity",
"r.dci.dr.has_disability",
"r.dci.dr.assessed",
"r.dci.dr.vision_severe",
"r.dci.dr.hearing_severe",
"r.dci.dr.mobility_severe",
# CRVS (vital events / civil status)
"r.dci.crvs.has_event",
"r.dci.crvs.is_alive",
"r.dci.crvs.birth_verified",
"r.dci.crvs.is_married",
)
)


class DCISocialSearchService:
"""Service for DCI Social Registry search operations.
Expand Down Expand Up @@ -366,6 +398,8 @@ def _parse_predicate(self, predicate) -> list:
if not expression or not expression.strip():
return []

self._validate_external_predicate_expression(expression)

# Use CEL service to compile expression to domain
cel_service = self.env["spp.cel.service"]

Expand All @@ -387,6 +421,27 @@ def _parse_predicate(self, predicate) -> list:

return result.get("domain", [])

def _validate_external_predicate_expression(self, expression: str) -> None:
"""Reject sensitive DCI metrics in sender-supplied predicates.

DCI predicate searches return counts and pageable matches, so allowing
callers to filter on raw DCI indicator cache values can disclose the
value by repeated queries even when the value is not present in the
response schema. Internal CEL use can still compile these metrics; this
guard only applies to external Social Registry predicate search.
"""
for accessor in _DCI_PREDICATE_DENIED_METRICS:
# Match the accessor as a dotted member path in any spelling that
# resolves to the metric: bare (r.dci.dr.has_disability), called
# (r.dci.dr.severity('Vision')), or quoted inside metric('...').
# CEL ignores whitespace around member-selection dots, so allow it
# (`r . dci . dr . severity`). The boundaries reject a denied name
# that is only a prefix/substring of a longer identifier
# (r.dci.dr.severity_score, my_r.dci..., r.dci.crvs.is_alive_x).
accessor_pattern = r"\s*\.\s*".join(map(re.escape, accessor.split(".")))
if re.search(rf"(?<![\w.]){accessor_pattern}(?![\w.])", expression):
raise ValueError(_("Predicate searches cannot filter on sensitive DCI metric '%s'.") % accessor)
Comment thread
gonzalesedwin1123 marked this conversation as resolved.

def _parse_expression(self, expression) -> list:
"""
Parse DCI expression into Odoo domain.
Expand Down
40 changes: 40 additions & 0 deletions spp_dci_server_social/tests/test_search_service_internals.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,46 @@ def test_parse_predicate_compile_failure_raises(self):
self.service._parse_predicate("r.broken ==")
self.assertIn("bad syntax", str(ctx.exception))

def test_parse_predicate_rejects_sensitive_dci_method_metrics(self):
blocked = [
# Parameterized methods (accessor-call + metric() forms, incl. spaced dots)
"r.dci.dr.severity('Vision') >= 3",
"r . dci . dr . severity('Vision') >= 3",
"r.dci.crvs.has_event('death') == true",
"metric('r.dci.dr.severity', me, arg='Vision') >= 3",
'metric("r.dci.crvs.has_event", me, arg="death") == true',
# Non-parameterized disability flags (boolean oracles)
"r.dci.dr.has_disability == true",
"r.dci.dr.vision_severe == true",
"metric('r.dci.dr.mobility_severe', me) == true",
# Non-parameterized CRVS vital/civil status (boolean oracles)
"r.dci.crvs.is_alive == false",
"r.dci.crvs.is_married == true",
"metric('r.dci.crvs.birth_verified', me) == true",
]
Comment thread
gonzalesedwin1123 marked this conversation as resolved.
for expression in blocked:
# Through _parse_predicate so the test also pins that the guard is
# wired in ahead of CEL compilation (it raises before the compiler).
with self.assertRaises(ValueError) as ctx:
self.service._parse_predicate(expression)
self.assertIn("sensitive DCI metric", str(ctx.exception))

def test_validate_external_predicate_allows_benign_and_lower_risk_metrics(self):
# The guard must not over-match: benign registry predicates, identifiers
# that merely contain a denied name as a substring, and the intentionally
# allowed lower-risk SR/IBR metrics all pass validation untouched.
allowed = [
"r.gender == 'female'",
"age_years(r.birthdate) >= 18",
"r.dci.dr.severity_score >= 3", # not a call, different accessor
"my_r.dci.dr.severity('Vision') >= 3", # prefixed identifier
"r.dci.sr.household_size >= 5",
"r.dci.ibr.has_duplicate == true",
]
for expression in allowed:
# Should not raise.
self.service._validate_external_predicate_expression(expression)

# --- _to_dci_member ------------------------------------------------------

def test_to_dci_member_with_identifier_and_demographics(self):
Expand Down
Loading