forked from ietf-tools/datatracker
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmodels.py
More file actions
118 lines (98 loc) · 3.91 KB
/
models.py
File metadata and controls
118 lines (98 loc) · 3.91 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
# Copyright The IETF Trust 2025, All Rights Reserved
import json
from functools import partial
from hashlib import sha384
from django.db import models, transaction
from django.utils import timezone
from .apps import get_blobdb
from .replication import replication_enabled
from .tasks import pybob_the_blob_replicator_task
class BlobQuerySet(models.QuerySet):
    """Restricted QuerySet for Blob objects.

    Blob change notifications are emitted from save() / delete() on the model
    instance. Any bulk operation that skips those methods would silently leave
    watchers out of date, so every such entry point is explicitly disabled.
    """

    def delete(self):
        # Bulk delete bypasses Blob.delete() and its change event.
        raise NotImplementedError("Only deleting individual Blobs is supported")

    def bulk_create(self, *args, **kwargs):
        # Bulk create bypasses Blob.save() and its change event.
        raise NotImplementedError("Only creating individual Blobs is supported")

    def update(self, *args, **kwargs):
        # Disallowed for the same reason; note that update_or_create() is fine
        # because it goes through save().
        raise NotImplementedError("Updating BlobQuerySets is not supported")

    def bulk_update(self, *args, **kwargs):
        # Bulk update bypasses Blob.save() and its change event.
        raise NotImplementedError("Updating Blobs in bulk is not supported")
class Blob(models.Model):
    """A named binary object stored in a (bucket, name) namespace.

    Saving or deleting a Blob emits a replication event (when replication is
    enabled for its bucket) after the enclosing transaction commits.
    """

    objects = BlobQuerySet.as_manager()

    # Identity within a bucket; (bucket, name) is unique (see Meta).
    name = models.CharField(max_length=1024, help_text="Name of the blob")
    bucket = models.CharField(
        max_length=1024, help_text="Name of the bucket containing this blob"
    )
    modified = models.DateTimeField(
        default=timezone.now, help_text="Last modification time of the blob"
    )
    content = models.BinaryField(help_text="Content of the blob")
    # Computed automatically in save(); not user-editable.
    checksum = models.CharField(
        max_length=96, help_text="SHA-384 digest of the content", editable=False
    )
    mtime = models.DateTimeField(
        default=None,
        blank=True,
        null=True,
        help_text="mtime associated with the blob as a filesystem object",
    )
    content_type = models.CharField(
        max_length=1024,
        blank=True,
        help_text="content-type header value for the blob contents",
    )

    class Meta:
        constraints = [
            models.UniqueConstraint(
                fields=["bucket", "name"], name="unique_name_per_bucket"
            ),
        ]

    def save(self, **kwargs):
        """Persist the blob, refreshing its checksum and notifying watchers."""
        db = get_blobdb()
        with transaction.atomic(using=db):
            # Digest is for integrity only, not security — hence
            # usedforsecurity=False (also keeps FIPS-restricted builds happy).
            self.checksum = sha384(self.content, usedforsecurity=False).hexdigest()
            super().save(**kwargs)
            self._emit_blob_change_event(using=db)

    def delete(self, **kwargs):
        """Delete the blob and notify watchers; returns Django's delete() result."""
        db = get_blobdb()
        with transaction.atomic(using=db):
            result = super().delete(**kwargs)
            self._emit_blob_change_event(using=db)
        return result

    def _emit_blob_change_event(self, using=None):
        """Queue a replication notification, deferred until commit."""
        if not replication_enabled(self.bucket):
            return
        # Serialize the identifying fields now so the callback does not depend
        # on this instance's state at commit time.
        payload = json.dumps({"name": self.name, "bucket": self.bucket})
        # For now, fire a celery task we've arranged to guarantee in-order
        # processing. Later becomes pushing an event onto a queue to a
        # dedicated worker.
        transaction.on_commit(
            partial(pybob_the_blob_replicator_task.delay, payload),
            using=using,
        )
class ResolvedMaterial(models.Model):
    """Maps a (name, meeting) pair to the bucket/blob holding its content."""

    # A Document name can be 255 characters; allow this name to be a bit longer.
    name = models.CharField(max_length=300, help_text="Name to resolve")
    meeting_number = models.CharField(
        max_length=64, help_text="Meeting material is related to"
    )
    bucket = models.CharField(max_length=255, help_text="Resolved bucket name")
    blob = models.CharField(max_length=300, help_text="Resolved blob name")

    class Meta:
        constraints = [
            models.UniqueConstraint(
                fields=["name", "meeting_number"], name="unique_name_per_meeting"
            )
        ]

    def __str__(self):
        # e.g. "agenda-x@123 -> bucket:blob-name"
        return "{}@{} -> {}:{}".format(
            self.name, self.meeting_number, self.bucket, self.blob
        )