Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 19 additions & 1 deletion ietf/blobdb/admin.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
# Copyright The IETF Trust 2025, All Rights Reserved
# Copyright The IETF Trust 2025-2026, All Rights Reserved
from django.contrib import admin
from django.db.models import QuerySet
from django.db.models.functions import Length
from rangefilter.filters import DateRangeQuickSelectListFilterBuilder

from .apps import get_blobdb
from .models import Blob, ResolvedMaterial
from .utils import queue_for_replication


@admin.register(Blob)
Expand All @@ -17,6 +20,7 @@ class BlobAdmin(admin.ModelAdmin):
]
search_fields = ["name"]
list_display_links = ["name"]
actions = ["replicate_blob"]

def get_queryset(self, request):
return (
Expand All @@ -30,6 +34,20 @@ def object_size(self, instance):
"""Get the size of the object"""
return instance.object_size # annotation added in get_queryset()

@admin.action(description="Replicate blobs")
def replicate_blob(self, request, queryset: QuerySet[Blob]):
    """Admin action: queue each selected Blob for replication.

    Reports the number of blobs queued back to the admin user.
    """
    queued = 0
    for blob in queryset.all():
        if not isinstance(blob, Blob):
            continue
        queue_for_replication(
            bucket=blob.bucket, name=blob.name, using=get_blobdb()
        )
        queued += 1
    self.message_user(
        request,
        f"Queued replication of a total of {queued} Blob(s)",
    )


@admin.register(ResolvedMaterial)
class ResolvedMaterialAdmin(admin.ModelAdmin):
Expand Down
27 changes: 4 additions & 23 deletions ietf/blobdb/models.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,11 @@
# Copyright The IETF Trust 2025, All Rights Reserved
import json
from functools import partial
# Copyright The IETF Trust 2025-2026, All Rights Reserved
from hashlib import sha384

from django.db import models, transaction
from django.utils import timezone

from .apps import get_blobdb
from .replication import replication_enabled
from .tasks import pybob_the_blob_replicator_task
from .utils import queue_for_replication


class BlobQuerySet(models.QuerySet):
Expand Down Expand Up @@ -81,24 +78,8 @@ def delete(self, **kwargs):
self._emit_blob_change_event(using=db)
return retval

def _emit_blob_change_event(self, using=None):
if not replication_enabled(self.bucket):
return

# For now, fire a celery task we've arranged to guarantee in-order processing.
# Later becomes pushing an event onto a queue to a dedicated worker.
transaction.on_commit(
partial(
pybob_the_blob_replicator_task.delay,
json.dumps(
{
"name": self.name,
"bucket": self.bucket,
}
)
),
using=using,
)
def _emit_blob_change_event(self, using: str | None = None):
    """Queue this blob for replication after the change is committed.

    :param using: database alias whose transaction commit should trigger the
        replication task (passed through to queue_for_replication)
    """
    # PEP 8: annotated parameter defaults take spaces around "=".
    queue_for_replication(self.bucket, self.name, using=using)


class ResolvedMaterial(models.Model):
Expand Down
12 changes: 11 additions & 1 deletion ietf/blobdb/storage.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright The IETF Trust 2025, All Rights Reserved
# Copyright The IETF Trust 2025-2026, All Rights Reserved
from typing import Optional

from django.core.exceptions import SuspiciousFileOperation
Expand All @@ -10,6 +10,7 @@

from ietf.utils.storage import MetadataFile
from .models import Blob
from .utils import queue_for_replication


class BlobFile(MetadataFile):
Expand Down Expand Up @@ -94,3 +95,12 @@ def get_available_name(self, name, max_length=None):
f"asked to store the name '{name[:5]}...{name[-5:]} of length {len(name)}"
)
return name # overwrite is permitted

def force_replication(self, name: str):
    """Queue the blob called ``name`` in this storage's bucket for replication.

    Caution: replication also propagates deletions, so forcing replication of
    a name that does not exist in blobdb will remove that object from R2 if
    it exists there!
    """
    target_bucket = self.bucket_name
    queue_for_replication(name=name, bucket=target_bucket)
32 changes: 32 additions & 0 deletions ietf/blobdb/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# Copyright The IETF Trust 2026, All Rights Reserved
import json
from functools import partial

from django.db import transaction

from ietf.blobdb.replication import replication_enabled
from ietf.blobdb.tasks import pybob_the_blob_replicator_task


def queue_for_replication(bucket: str, name: str, using: str | None = None):
    """Queue a blob for replication

    This is private to the blobdb app. Do not call it directly from other apps.

    Does nothing when replication is not enabled for ``bucket``.

    :param bucket: bucket containing the blob
    :param name: name of the blob within its bucket
    :param using: database alias whose transaction commit triggers the task
    """
    if not replication_enabled(bucket):
        return

    # For now, fire a celery task we've arranged to guarantee in-order processing.
    # Later becomes pushing an event onto a queue to a dedicated worker.
    transaction.on_commit(
        partial(
            pybob_the_blob_replicator_task.delay,
            json.dumps({"name": name, "bucket": bucket}),
        ),
        using=using,
    )
40 changes: 38 additions & 2 deletions ietf/doc/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from django.contrib import admin
from django.db import models
from django import forms
from django.db.models import QuerySet
from rangefilter.filters import DateRangeQuickSelectListFilterBuilder

from .models import (StateType, State, RelatedDocument, DocumentAuthor, Document, RelatedDocHistory,
Expand All @@ -18,6 +19,9 @@

from ietf.utils.admin import SaferTabularInline
from ietf.utils.validators import validate_external_resource_value
from .storage_utils import force_replication
from .utils import replicate_stored_objects_for_document


class StateTypeAdmin(admin.ModelAdmin):
list_display = ["slug", "label"]
Expand Down Expand Up @@ -73,14 +77,17 @@ class DocumentAuthorAdmin(admin.ModelAdmin):
search_fields = ['document__name', 'person__name', 'email__address', 'affiliation', 'country']
raw_id_fields = ["document", "person", "email"]
admin.site.register(DocumentAuthor, DocumentAuthorAdmin)




class DocumentAdmin(admin.ModelAdmin):
list_display = ['name', 'rev', 'group', 'pages', 'intended_std_level', 'author_list', 'time']
search_fields = ['name']
list_filter = ['type']
raw_id_fields = ['group', 'shepherd', 'ad']
inlines = [DocAuthorInline, DocActionHolderInline, RelatedDocumentInline, AdditionalUrlInLine]
form = DocumentForm
actions = ["replicate_stored_objects"]

def save_model(self, request, obj, form, change):
e = DocEvent.objects.create(
Expand All @@ -95,6 +102,22 @@ def save_model(self, request, obj, form, change):
def state(self, instance):
return self.get_state()

@admin.action(description="Replicate related blobs")
def replicate_stored_objects(self, request, queryset: QuerySet[Document]):
    """Admin action: queue replication of all StoredObjects for the selected Documents.

    Reports both the number of Documents processed and the total number of
    StoredObjects queued.
    """
    doc_count = 0
    stored_obj_count = 0
    for doc in queryset.all():
        if isinstance(doc, Document):
            # Count inside the guard so the reported Document count matches
            # what was actually processed (consistent with the other
            # replication admin actions).
            doc_count += 1
            stored_obj_count += replicate_stored_objects_for_document(doc)
    self.message_user(
        request,
        (
            f"Queued replication of a total of {stored_obj_count} StoredObject(s) "
            f"for {doc_count} Document(s)"
        )
    )

admin.site.register(Document, DocumentAdmin)

class DocHistoryAdmin(admin.ModelAdmin):
Expand Down Expand Up @@ -232,11 +255,24 @@ class StoredObjectAdmin(admin.ModelAdmin):
]
search_fields = ['name', 'doc_name', 'doc_rev']
list_display_links = ['name']
actions = ["replicate_stored_object"]

@admin.display(boolean=True, description="Deleted?", ordering="deleted")
def is_deleted(self, instance):
    """Boolean admin column: True when the instance has a deletion timestamp."""
    deleted_at = instance.deleted
    return deleted_at is not None


@admin.action(description="Replicate related blobs")
def replicate_stored_object(self, request, queryset: QuerySet[StoredObject]):
    """Admin action: queue replication of each selected StoredObject's blob.

    Reports the number of StoredObjects queued back to the admin user.
    """
    queued = 0
    for obj in queryset.all():
        if not isinstance(obj, StoredObject):
            continue
        force_replication(kind=obj.store, name=obj.name)
        queued += 1
    self.message_user(
        request,
        f"Queued replication of a total of {queued} StoredObject(s)",
    )


admin.site.register(StoredObject, StoredObjectAdmin)

Expand Down
9 changes: 9 additions & 0 deletions ietf/doc/storage_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,3 +192,12 @@ def retrieve_str(kind: str, name: str) -> str:
log(f"Blobstore Error: Failed to read string from {kind}:{name}: {repr(err)}")
raise
return content


def force_replication(kind: str, name: str):
    """Force replication of the named object in the given kind of storage.

    No-op unless blob storage is enabled and the storage backing ``kind`` is a
    BlobdbStorage. Be careful: per BlobdbStorage.force_replication, replication
    includes replicating deletions, so forcing replication of a name absent
    from blobdb can delete it from the replica.

    :param kind: storage kind, used to look up the backing storage
    :param name: name of the object to replicate
    """
    if not settings.ENABLE_BLOBSTORAGE:
        return
    storage = _get_storage(kind)
    # NOTE(review): import is deferred to the function body, presumably to
    # avoid a circular import at module load time — confirm before moving.
    from ietf.blobdb.storage import BlobdbStorage
    if isinstance(storage, BlobdbStorage):
        storage.force_replication(name)
25 changes: 24 additions & 1 deletion ietf/doc/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,15 @@
DocHistoryAuthor,
Document,
DocumentAuthor,
EditedRfcAuthorsDocEvent,
RfcAuthor,
State, EditedRfcAuthorsDocEvent,
State,
StoredObject,
)
from ietf.doc.models import RelatedDocument, RelatedDocHistory, BallotType, DocReminder
from ietf.doc.models import DocEvent, ConsensusDocEvent, BallotDocEvent, IRSGBallotDocEvent, NewRevisionDocEvent, StateDocEvent
from ietf.doc.models import TelechatDocEvent, DocumentActionHolder, EditedAuthorsDocEvent, BallotPositionDocEvent
from ietf.doc.storage_utils import force_replication
from ietf.name.models import DocReminderTypeName, DocRelationshipName
from ietf.group.models import Role, Group, GroupFeatures
from ietf.ietfauth.utils import has_role, is_authorized_in_doc_stream, is_individual_draft_author, is_bofreq_editor
Expand Down Expand Up @@ -1713,3 +1716,23 @@ def update_or_create_draft_bibxml_file(doc, rev):

def ensure_draft_bibxml_path_exists():
(Path(settings.BIBXML_BASE_PATH) / "bibxml-ids").mkdir(exist_ok=True)


def replicate_stored_objects_for_document(doc: Document) -> int:
    """Sync all StoredObjects associated with doc to the replica blob store.

    Returns the number of StoredObjects queued for replication. Queuing does
    not guarantee replication actually happens — that depends on replication
    being enabled and the storages being BlobdbStorage instances, etc.
    """
    # n.b., StoredObjects have a nullable doc_rev field, but Documents do not.
    # Until / unless we straighten that out, treat "" and None equivalently
    # when matching rev.
    rev_matches = StoredObject.objects.filter(doc_rev=doc.rev)
    if doc.rev == "":
        rev_matches = rev_matches | StoredObject.objects.filter(doc_rev__isnull=True)
    queued = 0
    for stored_object in rev_matches.filter(doc_name=doc.name):
        force_replication(kind=stored_object.store, name=stored_object.name)
        queued += 1
    return queued
Loading