Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -101,3 +101,5 @@ Pipfile

# Version file generated by setuptools-scm
morango/_version.py

graphify-out/
1 change: 1 addition & 0 deletions morango/api/serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,5 +178,6 @@ class Meta:
"profile",
"rmcb_list",
"_self_ref_fk",
"_self_ref_order",
)
read_only_fields = fields
1 change: 1 addition & 0 deletions morango/constants/capabilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@
ALLOW_CERTIFICATE_PUSHING = "ALLOW_CERTIFICATE_PUSHING"
ASYNC_OPERATIONS = "ASYNC_OPERATIONS"
FSIC_V2_FORMAT = "FSIC_V2_FORMAT"
SELF_REF_ORDER = "SELF_REF_ORDER"
24 changes: 24 additions & 0 deletions morango/migrations/0004_self_ref_order.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# Generated by Django 3.2.25 on 2026-04-22 10:53

import django.core.validators
from django.db import migrations, models


class Migration(migrations.Migration):

dependencies = [
('morango', '0003_store_deserialization_errors'),
]

operations = [
migrations.AddField(
model_name='buffer',
name='_self_ref_order',
field=models.IntegerField(blank=True, null=True, validators=[django.core.validators.MinValueValidator(0)]),
),
migrations.AddField(
model_name='store',
name='_self_ref_order',
field=models.IntegerField(blank=True, null=True, validators=[django.core.validators.MinValueValidator(0)]),
),
]
5 changes: 5 additions & 0 deletions morango/models/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from functools import reduce

from django.core import exceptions
from django.core.validators import MinValueValidator
from django.db import connection, models, router, transaction
from django.db.models import F, Func, Max, Q, TextField, Value, signals
from django.db.models.deletion import Collector
Expand Down Expand Up @@ -391,6 +392,9 @@ class AbstractStore(models.Model):
conflicting_serialized_data = models.TextField(blank=True)

_self_ref_fk = models.CharField(max_length=32, blank=True)
_self_ref_order = models.IntegerField(
blank=True, null=True, validators=[MinValueValidator(0)]
)

class Meta:
abstract = True
Expand Down Expand Up @@ -786,6 +790,7 @@ class SyncableModel(UUIDModelMixin):

_morango_internal_fields_not_to_serialize = ("_morango_dirty_bit",)
morango_model_dependencies = ()
morango_ordering = ()
morango_fields_not_to_serialize = ()
morango_profile = None

Expand Down
45 changes: 40 additions & 5 deletions morango/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@

import inspect
import sys
from collections import OrderedDict
from typing import Generator
from collections import OrderedDict, defaultdict
from typing import Generator, Optional

from django.db.models import QuerySet
from django.db.models import F, QuerySet
from django.db.models.fields.related import ForeignKey

from morango.constants import transfer_stages
Expand All @@ -17,7 +17,9 @@
ModelRegistryNotReady,
UnsupportedFieldType,
)
from morango.utils import SETTINGS, do_import
from morango.utils import SETTINGS, do_import, self_referential_fk

_UNSET = object()


def _get_foreign_key_classes(m):
Expand Down Expand Up @@ -57,6 +59,7 @@ def __init__(self):
self.profile_models = {}
self.ready = False
self.models_ready = {}
self.self_referential_fks = defaultdict(dict)
if hasattr(sys.modules[__name__], "syncable_models"):
raise RuntimeError("Master registry has already been initialized.")

Expand All @@ -79,13 +82,45 @@ def get_models(self, profile):
self.check_models_ready(profile)
return list(self.profile_models.get(profile, {}).values())

def get_self_referential_fk(self, model) -> Optional[str]:
"""
Cached helper for determining a syncable model's self-referential foreign key attribute name
:param model: The Morango syncable model
:type model: Type[MorangoSyncableModel]
"""
profile_self_ref_fks = self.self_referential_fks[model.morango_profile]
model_self_ref_fk = profile_self_ref_fks.get(model.morango_model_name, _UNSET)
if model_self_ref_fk is _UNSET:
model_self_ref_fk = self_referential_fk(model)
profile_self_ref_fks[model.morango_model_name] = model_self_ref_fk
return model_self_ref_fk

def get_model_querysets(self, profile) -> Generator[QuerySet, None, None]:
"""
Method for future enhancement to iterate over model's and their querysets in a fashion
(particularly, an order) that is aware of FK dependencies.
"""
for model in self.get_models(profile):
yield model.syncing_objects.all()
queryset = model.syncing_objects.all()
ordering = getattr(model, "morango_ordering", ())
if ordering:
queryset = queryset.order_by(*self._get_nulls_last_ordering(ordering))
yield queryset

@staticmethod
def _get_nulls_last_ordering(ordering):
normalized = []
for order_expr in ordering:
if isinstance(order_expr, str):
descending = order_expr.startswith("-")
field_name = order_expr[1:] if descending else order_expr
if descending:
normalized.append(F(field_name).desc(nulls_last=True))
else:
normalized.append(F(field_name).asc(nulls_last=True))
else:
normalized.append(order_expr)
return normalized

def _insert_model_in_dependency_order(self, model, profile):
# When we add models to be synced, we need to make sure
Expand Down
9 changes: 8 additions & 1 deletion morango/sync/backends/postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,7 @@ def _dequeuing_merge_conflict_buffer(self, cursor, current_id, transfersession_i
conflicting_serialized_data,
dirty_bit,
_self_ref_fk,
_self_ref_order,
deserialization_error,
deserialization_exception,
last_transfer_session_id
Expand All @@ -233,6 +234,7 @@ def _dequeuing_merge_conflict_buffer(self, cursor, current_id, transfersession_i
CASE buffer.hard_deleted WHEN TRUE THEN '' ELSE buffer.serialized || '\n' || store.conflicting_serialized_data END,
TRUE,
store._self_ref_fk,
store._self_ref_order,
NULL,
NULL,
'{transfer_session_id}'
Expand Down Expand Up @@ -320,7 +322,8 @@ def _dequeuing_insert_remaining_buffer(self, cursor, transfersession_id):
buffer.partition,
buffer.source_id,
buffer.conflicting_serialized_data,
buffer._self_ref_fk
buffer._self_ref_fk,
buffer._self_ref_order
FROM {buffer} as buffer
WHERE buffer.transfer_session_id = '{transfer_session_id}'
),
Expand All @@ -339,6 +342,7 @@ def _dequeuing_insert_remaining_buffer(self, cursor, transfersession_id):
conflicting_serialized_data,
dirty_bit,
_self_ref_fk,
_self_ref_order,
deserialization_error,
deserialization_exception,
last_transfer_session_id
Expand All @@ -355,6 +359,7 @@ def _dequeuing_insert_remaining_buffer(self, cursor, transfersession_id):
nv.conflicting_serialized_data,
TRUE,
nv._self_ref_fk,
nv._self_ref_order,
NULL,
NULL,
'{transfer_session_id}'
Expand All @@ -377,6 +382,7 @@ def _dequeuing_insert_remaining_buffer(self, cursor, transfersession_id):
conflicting_serialized_data,
dirty_bit,
_self_ref_fk,
_self_ref_order,
deserialization_error,
deserialization_exception,
last_transfer_session_id
Expand All @@ -395,6 +401,7 @@ def _dequeuing_insert_remaining_buffer(self, cursor, transfersession_id):
ut.conflicting_serialized_data,
TRUE,
ut._self_ref_fk,
ut._self_ref_order,
NULL,
NULL,
'{transfer_session_id}'
Expand Down
4 changes: 4 additions & 0 deletions morango/sync/backends/sqlite.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ def _dequeuing_merge_conflict_buffer(self, cursor, current_id, transfersession_i
conflicting_serialized_data,
dirty_bit,
_self_ref_fk,
_self_ref_order,
deserialization_error,
deserialization_exception,
last_transfer_session_id
Expand All @@ -175,6 +176,7 @@ def _dequeuing_merge_conflict_buffer(self, cursor, current_id, transfersession_i
),
1,
store._self_ref_fk,
store._self_ref_order,
NULL,
NULL,
'{transfer_session_id}'
Expand Down Expand Up @@ -244,6 +246,7 @@ def _dequeuing_insert_remaining_buffer(self, cursor, transfersession_id):
conflicting_serialized_data,
dirty_bit,
_self_ref_fk,
_self_ref_order,
deserialization_error,
deserialization_exception,
last_transfer_session_id
Expand All @@ -262,6 +265,7 @@ def _dequeuing_insert_remaining_buffer(self, cursor, transfersession_id):
buffer.conflicting_serialized_data,
1,
buffer._self_ref_fk,
buffer._self_ref_order,
NULL,
NULL,
'{transfer_session_id}'
Expand Down
53 changes: 47 additions & 6 deletions morango/sync/operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,14 @@
from django.db import transaction
from django.db.models import CharField
from django.db.models import F
from django.db.models import Exists
from django.db.models import OuterRef
from django.db.models import Q
from django.db.models import Subquery
from django.db.models import signals
from django.db.models import Value
from django.db.models.fields import BooleanField
from django.db.models.functions import NullIf
from django.db.models.functions import NullIf, Cast
from django.db.utils import IntegrityError
from django.db.utils import OperationalError
from django.utils import timezone
Expand All @@ -25,6 +28,7 @@
from morango.constants import transfer_statuses
from morango.constants.capabilities import ASYNC_OPERATIONS
from morango.constants.capabilities import FSIC_V2_FORMAT
from morango.constants.capabilities import SELF_REF_ORDER
from morango.errors import MorangoDirtyParent
from morango.errors import MorangoInvalidFSICPartition
from morango.errors import MorangoLimitExceeded
Expand Down Expand Up @@ -525,7 +529,7 @@ def _queue_into_buffer_v1(transfersession):
"""SELECT
id, serialized, deleted, last_saved_instance, last_saved_counter, hard_deleted, model_name, profile,
partition, source_id, conflicting_serialized_data,
CAST ('{transfer_session_id}' AS {transfer_session_id_type}), _self_ref_fk
CAST ('{transfer_session_id}' AS {transfer_session_id_type}), _self_ref_fk, _self_ref_order
FROM {store} WHERE {condition}
""".format(
transfer_session_id=transfersession.id,
Expand Down Expand Up @@ -556,7 +560,7 @@ def _queue_into_buffer_v1(transfersession):
"""INSERT INTO {outgoing_buffer}
(model_uuid, serialized, deleted, last_saved_instance, last_saved_counter,
hard_deleted, model_name, profile, partition, source_id, conflicting_serialized_data,
transfer_session_id, _self_ref_fk)
transfer_session_id, _self_ref_fk, _self_ref_order)
{select}
""".format(
outgoing_buffer=Buffer._meta.db_table,
Expand Down Expand Up @@ -674,7 +678,7 @@ def _queue_into_buffer_v2(transfersession, chunk_size=200):
"""SELECT
id, serialized, deleted, last_saved_instance, last_saved_counter, hard_deleted, model_name, profile,
partition, source_id, conflicting_serialized_data,
CAST ('{transfer_session_id}' AS {transfer_session_id_type}), _self_ref_fk
CAST ('{transfer_session_id}' AS {transfer_session_id_type}), _self_ref_fk, _self_ref_order
FROM {store} WHERE {condition}
""".format(
transfer_session_id=transfersession.id,
Expand Down Expand Up @@ -703,7 +707,7 @@ def _queue_into_buffer_v2(transfersession, chunk_size=200):
"""INSERT INTO {outgoing_buffer}
(model_uuid, serialized, deleted, last_saved_instance, last_saved_counter,
hard_deleted, model_name, profile, partition, source_id, conflicting_serialized_data,
transfer_session_id, _self_ref_fk)
transfer_session_id, _self_ref_fk, _self_ref_order)
{select}
""".format(
outgoing_buffer=Buffer._meta.db_table,
Expand All @@ -721,7 +725,41 @@ def _queue_into_buffer_v2(transfersession, chunk_size=200):
)


def _dequeue_into_store(transfer_session, fsic, v2_format=False):
def _update_legacy_self_ref_order_for_model(queryset):
# root nodes set the _self_ref_order to 0
queryset.filter(_self_ref_fk="").exclude(_self_ref_order=0).update(_self_ref_order=0)
# reset the _self_ref_order to None for all records that have a parent
queryset.exclude(_self_ref_fk="").exclude(_self_ref_order=None).update(
_self_ref_order=None
)

parent = Store.objects.filter(
id=Cast(OuterRef("_self_ref_fk"), UUIDField()),
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added the Cast here, as postgres complained about a mismatch in type.

_self_ref_order__isnull=False,
)
parent_order = parent.values("_self_ref_order")[:1]
pending = queryset.exclude(_self_ref_fk="").filter(_self_ref_order=None)

while pending.filter(Exists(parent)).update(_self_ref_order=Subquery(parent_order) + 1):
pass


def _update_legacy_self_ref_order(transfer_session):
profile = transfer_session.sync_session.profile
transferred_store_records = Store.objects.filter(
last_transfer_session_id=transfer_session.id,
profile=profile,
)

for Model in syncable_models.get_models(profile):
queryset = transferred_store_records.filter(model_name=Model.morango_model_name)
if self_referential_fk(Model):
_update_legacy_self_ref_order_for_model(queryset)
else:
queryset.exclude(_self_ref_order=None).update(_self_ref_order=None)


def _dequeue_into_store(transfer_session, fsic, v2_format=False, self_ref_order=True):
"""
Takes data from the buffers and merges into the store and record max counters.

Expand All @@ -745,6 +783,8 @@ def _dequeue_into_store(transfer_session, fsic, v2_format=False):
DBBackend._dequeuing_delete_mc_buffer(cursor, transfer_session.id)
DBBackend._dequeuing_insert_remaining_buffer(cursor, transfer_session.id)
DBBackend._dequeuing_insert_remaining_rmcb(cursor, transfer_session.id)
if not self_ref_order:
_update_legacy_self_ref_order(transfer_session)
DBBackend._dequeuing_delete_remaining_rmcb(cursor, transfer_session.id)
DBBackend._dequeuing_delete_remaining_buffer(cursor, transfer_session.id)

Expand Down Expand Up @@ -1083,6 +1123,7 @@ def handle(self, context):
context.transfer_session,
fsic,
v2_format=FSIC_V2_FORMAT in context.capabilities,
self_ref_order=SELF_REF_ORDER in context.capabilities,
)

return transfer_statuses.COMPLETED
Expand Down
Loading
Loading