@admin.action(description="Replicate blobs")
def replicate_blob(self, request, queryset: QuerySet[Blob]):
    """Admin action: hand every selected Blob to queue_for_replication().

    After processing the selection, a message with the number of blobs handled
    is shown to the admin user.

    Note that queue_for_replication() returns without queuing anything when
    replication is not enabled for a blob's bucket, so the reported total is
    the number of blobs submitted, not necessarily the number of replication
    tasks dispatched.

    :param request: the current admin request, used only for the user message
    :param queryset: the Blobs selected in the admin changelist
    """
    # The value passed as `using` does not depend on the individual blob, so
    # look it up once instead of once per selected row.
    # NOTE(review): assumes get_blobdb() is stable for the duration of the
    # request - confirm against ietf.blobdb.apps.
    blobdb = get_blobdb()
    blob_count = 0
    for blob in queryset.all():
        # isinstance() guard kept from the original; it narrows the type for
        # static checkers and skips anything unexpected in the queryset.
        if isinstance(blob, Blob):
            queue_for_replication(bucket=blob.bucket, name=blob.name, using=blobdb)
            blob_count += 1
    self.message_user(
        request,
        f"Queued replication of a total of {blob_count} Blob(s)",
    )
def _emit_blob_change_event(self, using: str | None=None):
    """Signal that this blob changed by queuing it for replication.

    The blob is identified by its bucket and name; `using` is forwarded
    unchanged to queue_for_replication().
    """
    queue_for_replication(bucket=self.bucket, name=self.name, using=using)
def queue_for_replication(bucket: str, name: str, using: str | None=None):
    """Queue a blob for replication

    Does nothing when replication is not enabled for `bucket`. Otherwise the
    replicator task is registered via transaction.on_commit() with a JSON
    payload naming the blob, and `using` is passed through to on_commit().

    This is private to the blobdb app. Do not call it directly from other apps.
    """
    if replication_enabled(bucket):
        # Serialize the payload now; only the task dispatch is deferred.
        payload = json.dumps(
            {
                "name": name,
                "bucket": bucket,
            }
        )
        # For now, fire a celery task we've arranged to guarantee in-order
        # processing. Later becomes pushing an event onto a queue to a
        # dedicated worker.
        dispatch = partial(pybob_the_blob_replicator_task.delay, payload)
        transaction.on_commit(dispatch, using=using)
@admin.action(description="Replicate related blobs")
def replicate_stored_objects(self, request, queryset: QuerySet[Document]):
    """Admin action: request replication for the selected Documents' StoredObjects.

    Each selected Document is passed to replicate_stored_objects_for_document()
    and the per-document counts it returns are summed. The admin user is then
    told how many StoredObjects were handled for how many Documents.
    """
    documents = list(queryset.all())
    # Every selected row counts as a document, even if it is skipped below.
    doc_count = len(documents)
    stored_obj_count = sum(
        replicate_stored_objects_for_document(doc)
        for doc in documents
        if isinstance(doc, Document)
    )
    summary = (
        f"Queued replication of a total of {stored_obj_count} StoredObject(s) "
        f"for {doc_count} Document(s)"
    )
    self.message_user(request, summary)
def replicate_stored_objects_for_document(doc: Document) -> int:
    """Request replication of the StoredObjects for doc's current revision

    StoredObjects are selected when their doc_name equals doc.name and their
    doc_rev matches doc.rev. force_replication() is called for each one.

    Returns the number of StoredObjects handed to force_replication(). Whether
    each is actually replicated depends on whether replication is enabled, the
    storage is a BlobdbStorage instance, etc.
    """
    for_this_doc = StoredObject.objects.filter(doc_name=doc.name)
    selected = for_this_doc.filter(doc_rev=doc.rev)
    # n.b., StoredObjects have a nullable doc_rev field, but Documents do not.
    # Until / unless we straighten that out, treat "" and None equivalently
    # when matching rev.
    if doc.rev == "":
        selected = selected | for_this_doc.filter(doc_rev__isnull=True)
    count = 0
    for count, stored_object in enumerate(selected, start=1):
        force_replication(kind=stored_object.store, name=stored_object.name)
    return count