forked from ietf-tools/datatracker
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathstorage.py
More file actions
96 lines (80 loc) · 3.18 KB
/
storage.py
File metadata and controls
96 lines (80 loc) · 3.18 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
# Copyright The IETF Trust 2025, All Rights Reserved
from typing import Optional
from django.core.exceptions import SuspiciousFileOperation
from django.core.files.base import ContentFile
from django.core.files.storage import Storage
from django.db.models.functions import Length
from django.utils.deconstruct import deconstructible
from django.utils import timezone
from ietf.utils.storage import MetadataFile
from .models import Blob
class BlobFile(MetadataFile):
def __init__(self, content, name=None, mtime=None, content_type=""):
super().__init__(
file=ContentFile(content),
name=name,
mtime=mtime,
content_type=content_type,
)
@deconstructible
class BlobdbStorage(Storage):
def __init__(self, bucket_name: Optional[str]=None):
if bucket_name is None:
raise ValueError("BlobdbStorage bucket_name must be specified")
self.bucket_name = bucket_name
def get_queryset(self):
return Blob.objects.filter(bucket=self.bucket_name)
def delete(self, name):
blob = self.get_queryset().filter(name=name).first()
if blob is not None:
blob.delete()
def exists(self, name):
return self.get_queryset().filter(name=name).exists()
def size(self, name):
sizes = (
self.get_queryset()
.filter(name=name)
.annotate(object_size=Length("content"))
.values_list("object_size", flat=True)
)
if len(sizes) == 0:
raise FileNotFoundError(
f"No object '{name}' exists in bucket '{self.bucket_name}'"
)
return sizes[0] # unique constraint guarantees 0 or 1 entry
def _open(self, name, mode="rb"):
try:
blob = self.get_queryset().get(name=name)
except Blob.DoesNotExist:
raise FileNotFoundError(
f"No object '{name}' exists in bucket '{self.bucket_name}'"
)
return BlobFile(
content=blob.content,
name=blob.name,
mtime=blob.mtime or blob.modified, # fall back to modified time
content_type=blob.content_type,
)
def _save(self, name, content):
"""Perform the save operation
The storage API allows _save() to save to a different name than was requested. This method will
never do that, instead overwriting the existing blob.
"""
Blob.objects.update_or_create(
name=name,
bucket=self.bucket_name,
defaults={
"content": content.read(),
"modified": timezone.now(),
"mtime": getattr(content, "mtime", None),
"content_type": getattr(content, "content_type", ""),
},
)
return name
def get_available_name(self, name, max_length=None):
if max_length is not None and len(name) > max_length:
raise SuspiciousFileOperation(
f"BlobdbStorage only allows names up to {max_length}, but was"
f"asked to store the name '{name[:5]}...{name[-5:]} of length {len(name)}"
)
return name # overwrite is permitted