Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 107 additions & 18 deletions spp_dci_server/models/subscription.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,14 @@ class DCISubscription(models.Model):
string="Filter Expression",
help="Optional DCI expression to filter which events trigger notifications",
)
filter_type = fields.Char(
string="Filter Type",
help=(
"How to interpret filter_expression (DCI query_type: idtype-value, "
"expression, predicate). Required to evaluate the filter at notify time; "
"an unparseable/unknown filter is treated as non-matching (fail closed)."
),
)

# Subscription State
active = fields.Boolean(
Expand Down Expand Up @@ -195,28 +203,101 @@ def check_expired_subscriptions(self):
_logger.info("Deactivated %d expired subscriptions", len(expired))
return True

def notify_event(self, event_type: str, records: list, reg_type: str):
"""Queue notifications for matching subscriptions.
@api.model
def _matching_subscriptions(self, event_type: str, reg_type: str):
"""Active, unexpired subscriptions for an event/registry type."""
return self.search(
[
("event_type", "=", event_type),
("reg_type", "=", reg_type),
("active", "=", True),
("state", "=", "active"),
"|",
("expires", "=", False),
("expires", ">", fields.Datetime.now()),
]
)

Called when a registry event occurs (registration, update, delete).
def _consent_allows_partner(self, partner_id: int) -> bool:
"""Whether this subscription's sender may receive this registrant's data.

Args:
event_type: Type of event (registration, update, delete)
records: List of affected record data dicts
reg_type: Registry type (SOCIAL_REGISTRY, DR, etc.)
Uses the proper consent primitive (legal-basis bypass, or per-registrant
consent via the API consent service). Fail-closed when no sender.
"""
# Find matching active subscriptions
domain = [
("event_type", "=", event_type),
("reg_type", "=", reg_type),
("active", "=", True),
("state", "=", "active"),
"|",
("expires", "=", False),
("expires", ">", fields.Datetime.now()),
self.ensure_one()
from ..services.consent_adapter import DCIConsentAdapter

adapter = DCIConsentAdapter(self.env, self.sender_id)
if adapter.has_legal_basis_bypass():
return True
return adapter.can_access_registrant(partner_id)

def _filter_matching_partners(self, partner_ids: list) -> list:
"""Subset of partner_ids matching this subscription's filter_expression.

Base policy: no filter -> all; a filter that this (generic) layer cannot
evaluate -> none (fail closed). Registry modules override this to
evaluate their filter shape in a single batched query (avoiding an
O(partners x subscriptions) per-record query pattern).
"""
self.ensure_one()
if not self.filter_expression:
return list(partner_ids or [])
return []

def _partner_matches_filter(self, partner_id: int) -> bool:
"""Whether a single partner matches this subscription's filter."""
self.ensure_one()
return bool(self._filter_matching_partners([partner_id]))

def _eligible_partner_ids(self, partner_ids: list) -> list:
"""Subset of partner_ids this subscription is allowed to be told about.

Applies consent per record, then the filter in a single batch.
"""
self.ensure_one()
if not partner_ids:
return []
consented = [pid for pid in partner_ids if self._consent_allows_partner(pid)]
return self._filter_matching_partners(consented)

def _build_notification_records(self, partner_ids: list) -> list:
"""Build the DCI record payloads for partner_ids (registry-specific).

Base returns nothing; registry server modules (e.g. spp_dci_server_social)
override this to materialise records with this subscription's sender scope.
"""
self.ensure_one()
return []

def _delete_records_for(self, delete_payloads: list) -> list:
"""Identifier payloads this subscription is eligible to receive on delete.

Eligibility is precomputed at unlink time (while the record still exists)
and carried per payload as ``eligible_subscription_ids``.
"""
self.ensure_one()
return [
{"identifiers": p.get("identifiers", [])}
for p in (delete_payloads or [])
if self.id in (p.get("eligible_subscription_ids") or [])
]
subscriptions = self.search(domain)

def notify_event(self, event_type: str, partner_ids: list, reg_type: str, delete_payloads: list = None):
"""Queue notifications for matching subscriptions, scoped per subscriber.

Each matching subscription only receives records its sender is permitted
to see (consent/legal-basis) and that match its filter_expression; the
payload is built with that subscription's sender context.

Args:
event_type: Event type (registration, update, delete)
partner_ids: Affected res.partner ids (create/update). Ignored for delete.
reg_type: Registry type (SOCIAL_REGISTRY, DR, ...)
delete_payloads: For delete, per-record identifier payloads carrying
precomputed ``eligible_subscription_ids`` (records are gone by now).
"""
subscriptions = self._matching_subscriptions(event_type, reg_type)
_logger.info(
"Found %d subscriptions for event %s/%s",
len(subscriptions),
Expand All @@ -225,7 +306,15 @@ def notify_event(self, event_type: str, records: list, reg_type: str):
)

for sub in subscriptions:
# Queue notification job for each subscription
if event_type == "delete":
records = sub._delete_records_for(delete_payloads or [])
else:
eligible = sub._eligible_partner_ids(partner_ids or [])
records = sub._build_notification_records(eligible) if eligible else []

if not records:
continue

sub.with_delay(
channel="dci",
timeout=60,
Expand Down
1 change: 1 addition & 0 deletions spp_dci_server/routers/async_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,7 @@ async def subscribe(
"event_type": event_type,
"reg_type": reg_type,
"filter_expression": filter_expression,
"filter_type": req_item.subscribe_criteria.filter_type,
"original_message_id": envelope.header.message_id,
"original_transaction_id": sub_request.transaction_id,
"state": "pending",
Expand Down
78 changes: 72 additions & 6 deletions spp_dci_server/tests/test_subscription.py
Original file line number Diff line number Diff line change
Expand Up @@ -269,15 +269,21 @@ def test_notify_event_finds_matching_subscriptions(self):
}
)

# Patch with_delay at the model class level to avoid read-only attribute error
with patch.object(type(self.Subscription), "with_delay") as mock_delay:
mock_job = MagicMock()
mock_delay.return_value = mock_job
# notify_event now scopes per subscription: it queues a job only when the
# subscription's sender is eligible AND records can be built. The generic
# layer builds no records (registry modules do), so stub those hooks to
# verify the matching/queueing orchestration.
with (
patch.object(type(self.Subscription), "with_delay") as mock_delay,
patch.object(type(self.Subscription), "_eligible_partner_ids", return_value=[1]),
patch.object(type(self.Subscription), "_build_notification_records", return_value=[{"id": "rec-001"}]),
):
mock_delay.return_value = MagicMock()

# Trigger notification
self.Subscription.notify_event(
event_type="registration",
records=[{"id": "rec-001", "name": "Test"}],
partner_ids=[1],
reg_type="SOCIAL_REGISTRY",
)

Expand All @@ -302,13 +308,73 @@ def test_notify_event_ignores_inactive(self):
# Trigger notification
self.Subscription.notify_event(
event_type="registration",
records=[{"id": "rec-001"}],
partner_ids=[1],
reg_type="SOCIAL_REGISTRY",
)

# Should not have been called for inactive subscription
mock_delay.assert_not_called()

def _make_sub(self, **vals):
base = {
"sender_id": self.test_sender.id,
"event_type": "update",
"reg_type": "SOCIAL_REGISTRY",
"state": "active",
}
base.update(vals)
return self.Subscription.create(base)

def test_filter_matching_partners_base_policy(self):
"""Generic layer: no filter -> all; an (unparseable here) filter -> none."""
self.assertEqual(self._make_sub()._filter_matching_partners([1, 2]), [1, 2])
# A present filter cannot be evaluated by the generic layer -> fail closed.
self.assertEqual(self._make_sub(filter_expression="{}")._filter_matching_partners([1, 2]), [])

def test_partner_matches_filter_wrapper(self):
self.assertTrue(self._make_sub()._partner_matches_filter(1))
self.assertFalse(self._make_sub(filter_expression="{}")._partner_matches_filter(1))

def test_consent_allows_partner_with_legal_basis_bypass(self):
self.test_sender.write({"legal_basis": "legal_obligation"})
# Bypass short-circuits before any per-registrant consent lookup.
self.assertTrue(self._make_sub()._consent_allows_partner(999999))

def test_eligible_partner_ids_consent_then_filter(self):
self.test_sender.write({"legal_basis": "legal_obligation"})
self.assertEqual(self._make_sub()._eligible_partner_ids([1, 2, 3]), [1, 2, 3])
# Present-but-unevaluable filter drops everything even with consent.
self.assertEqual(self._make_sub(filter_expression="{}")._eligible_partner_ids([1, 2, 3]), [])

def test_delete_records_for_scopes_by_eligibility(self):
sub = self._make_sub(event_type="delete")
payloads = [
{"identifiers": [{"identifier_value": "A"}], "eligible_subscription_ids": [sub.id]},
{"identifiers": [{"identifier_value": "B"}], "eligible_subscription_ids": [sub.id + 999]},
]
self.assertEqual(sub._delete_records_for(payloads), [{"identifiers": [{"identifier_value": "A"}]}])

def test_notify_event_delete_scoped_to_eligible(self):
sub = self._make_sub(event_type="delete")
with patch.object(type(self.Subscription), "with_delay") as mock_delay:
mock_delay.return_value = MagicMock()
self.Subscription.notify_event(
"delete",
[],
"SOCIAL_REGISTRY",
delete_payloads=[{"identifiers": [], "eligible_subscription_ids": [sub.id]}],
)
mock_delay.assert_called()

with patch.object(type(self.Subscription), "with_delay") as mock_delay_none:
self.Subscription.notify_event(
"delete",
[],
"SOCIAL_REGISTRY",
delete_payloads=[{"identifiers": [], "eligible_subscription_ids": [sub.id + 999]}],
)
mock_delay_none.assert_not_called()

def test_build_notification_structure(self):
"""Test notification envelope structure."""
sub = self.Subscription.create(
Expand Down
1 change: 1 addition & 0 deletions spp_dci_server_social/models/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
# Part of OpenSPP. See LICENSE file for full copyright and licensing details.

from . import fastapi_endpoint_social
from . import dci_subscription_social
from . import res_partner_dci_notify
115 changes: 115 additions & 0 deletions spp_dci_server_social/models/dci_subscription_social.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
# Part of OpenSPP. See LICENSE file for full copyright and licensing details.
"""Social Registry scoping for DCI event subscriptions.

Implements the registry-specific hooks the generic subscription model uses to
deliver notifications per subscriber: evaluating the subscription's stored
filter against a registrant, and building the DCI record payload with the
subscription's sender context (so consent/sender-scoped serialization applies).
"""

import json
import logging
from types import SimpleNamespace

from odoo import models

_logger = logging.getLogger(__name__)

_SOCIAL = "SOCIAL_REGISTRY"


class DCISubscriptionSocial(models.Model):
_inherit = "spp.dci.subscription"

def _filter_matching_partners(self, partner_ids):
"""Evaluate the subscription's filter_expression against registrants in batch.

No filter -> all. Otherwise the stored filter (with its filter_type
discriminator) is compiled to a domain via the Social Registry search
service and intersected with partner_ids in a single query. Any
parse/eval failure is treated as matching NOTHING (fail closed) so an
unparseable filter never widens delivery.
"""
self.ensure_one()
if self.reg_type != _SOCIAL:
return super()._filter_matching_partners(partner_ids)
if not self.filter_expression:
return list(partner_ids or [])
if not partner_ids:
return []

try:
raw_filter = json.loads(self.filter_expression)
except Exception as e:
_logger.warning(
"Subscription %s: unparseable filter, dropping records (fail-closed): %s",
self.subscription_code,
e,
)
return []

# The SPDCI "match all" wildcard means no real filter -> deliver all
# (consent still gates non-bypass senders).
if self._is_match_all_filter(raw_filter):
return list(partner_ids)

# A real filter needs its discriminator to be interpreted correctly.
# Missing/unknown filter_type must NOT be guessed: defaulting to
# "expression" silently collapses an idtype-value filter to "all
# registrants" (over-delivery). Fail closed instead.
query_type = (self.filter_type or "").strip().lower()
if query_type not in ("idtype-value", "expression", "predicate"):
_logger.warning(
"Subscription %s: filter present but filter_type is %r; dropping records (fail-closed)",
self.subscription_code,
self.filter_type,
)
return []

try:
from ..services.search_service import DCISocialSearchService

criteria = SimpleNamespace(query_type=query_type, query=raw_filter)
service = DCISocialSearchService(self.env, self.sender_id)
domain = service._build_domain(criteria)
# sudo: matching the sender's declared filter against the registry;
# consent/authorization is enforced separately by _consent_allows_partner.
# nosemgrep: odoo-sudo-on-sensitive-models, odoo-sudo-without-context
Partner = self.env["res.partner"].sudo()
return Partner.search([("id", "in", list(partner_ids))] + domain).ids
except Exception as e:
_logger.warning(
"Subscription %s: could not evaluate filter, dropping records (fail-closed): %s",
self.subscription_code,
e,
)
return []

@staticmethod
def _is_match_all_filter(raw_filter):
"""SPDCI clients use {"type": "*", "value": "*"} to mean 'all events'."""
return isinstance(raw_filter, dict) and raw_filter.get("type") == "*" and raw_filter.get("value") == "*"

def _build_notification_records(self, partner_ids):
"""Build DCI records for partner_ids using this subscription's sender.

Called only for already-eligible partners (consent + filter checked by
the generic layer). Builds with sender context so any sender-scoped
serialization applies.
"""
self.ensure_one()
if self.reg_type != _SOCIAL:
return super()._build_notification_records(partner_ids)

from ..services.search_service import DCISocialSearchService

service = DCISocialSearchService(self.env, self.sender_id)
partners = self.env["res.partner"].browse(partner_ids).exists()
records = []
for partner in partners:
try:
dci_record = service._to_dci_group(partner) if partner.is_group else service._to_dci_person(partner)
records.append(dci_record.model_dump(mode="json", by_alias=True, exclude_none=True))
except Exception as e:
_logger.warning("Failed to convert partner %d to DCI format: %s", partner.id, e)
return records
Loading
Loading