Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions ietf/api/views_rpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
from ietf.doc.storage_utils import remove_from_storage, store_file, exists_in_storage
from ietf.doc.tasks import signal_update_rfc_metadata_task
from ietf.person.models import Email, Person
from ietf.sync.tasks import create_rfc_index_task
from ietf.sync.tasks import create_rfc_index_txt_task, create_rfc_index_xml_task


class Conflict(APIException):
Expand Down Expand Up @@ -530,5 +530,6 @@ class RfcIndexView(APIView):
request=None,
)
def post(self, request):
    """Queue regeneration of the RFC index files and reply 202 Accepted.

    The work happens asynchronously; the response only acknowledges that the
    request was accepted, not that the indexes were rebuilt.
    """
    # Dispatch only the orchestrating task. The per-format generator tasks
    # merely park their output in the "shared_tmp" storage and return a blob
    # spec; create_rfc_index_task is what chains them into
    # save_rfc_indexes_task, which publishes the result and removes the
    # temporary blob. Calling the generators directly would leave unpublished
    # blobs behind and never update the published indexes.
    create_rfc_index_task.delay()
    return Response(status=202)
11 changes: 11 additions & 0 deletions ietf/blobdb/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,17 @@ def delete(self, name):
def exists(self, name):
    """Return whether this storage's queryset contains an entry called ``name``."""
    matching = self.get_queryset().filter(name=name)
    return matching.exists()

def save(self, name, content, max_length=None):
    """Save ``content`` under ``name``, making sure it is a MetadataFile first.

    Content that is already a MetadataFile is passed through untouched.
    Anything else is wrapped in one, carrying over the ``name``, ``mtime`` and
    ``content_type`` attributes when the original object has them (falling
    back to None, None and "" respectively).
    """
    if isinstance(content, MetadataFile):
        return super().save(name, content, max_length)

    wrapped = MetadataFile(
        content,
        name=getattr(content, "name", None),
        mtime=getattr(content, "mtime", None),
        content_type=getattr(content, "content_type", ""),
    )
    return super().save(name, wrapped, max_length)

def size(self, name):
sizes = (
self.get_queryset()
Expand Down
10 changes: 10 additions & 0 deletions ietf/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -839,6 +839,15 @@ def skip_unreadable_post(record):
]

# Other storages

# Shared scratch space accessible between containers - think about garbage collection
# before using it.
STORAGES["shared_tmp"] = {
"BACKEND": "ietf.doc.storage.StoredObjectBlobdbStorage",
"OPTIONS": {"bucket_name": "shared_tmp"}
}

# the RFC Editor ("red") bucket
STORAGES["red_bucket"] = {
"BACKEND": "django.core.files.storage.InMemoryStorage",
"OPTIONS": {"location": "red_bucket"},
Expand Down Expand Up @@ -1349,6 +1358,7 @@ def skip_unreadable_post(record):
CELERY_RESULT_BACKEND = 'django-cache' # use a Django cache for results
CELERY_CACHE_BACKEND = 'celery-results' # which Django cache to use
CELERY_RESULT_EXPIRES = datetime.timedelta(minutes=5) # how long are results valid? (Default is 1 day)
CELERY_RESULT_CHORD_RETRY_INTERVAL = 5.0 # seconds, default is 1.0
CELERY_TASK_IGNORE_RESULT = True # ignore results unless specifically enabled for a task
CELERY_TASK_ROUTES = {
"ietf.blobdb.tasks.pybob_the_blob_replicator_task": {"queue": "blobdb"}
Expand Down
9 changes: 4 additions & 5 deletions ietf/sync/rfcindex.py
Original file line number Diff line number Diff line change
Expand Up @@ -432,7 +432,7 @@ def add_rfc_xml_index_entries(rfc_index):
entries.append(entry)


def create_rfc_txt_index():
def create_rfc_txt_index() -> bytes:
"""Create text index of published documents"""
DATE_FMT = "%m/%d/%Y"
created_on = timezone.now().strftime(DATE_FMT)
Expand All @@ -443,9 +443,8 @@ def create_rfc_txt_index():
"created_on": created_on,
"rfcs": get_rfc_text_index_entries(),
},
)
save_to_red_bucket("rfc-index.txt", index)

).encode("utf-8")
return index

def create_rfc_xml_index():
"""Create XML index of published documents"""
Expand Down Expand Up @@ -480,4 +479,4 @@ def create_rfc_xml_index():
xml_declaration=True,
pretty_print=4,
)
save_to_red_bucket("rfc-index.xml", pretty_index)
return pretty_index
90 changes: 67 additions & 23 deletions ietf/sync/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,23 @@
from tempfile import NamedTemporaryFile
import requests

from celery import shared_task
from celery import shared_task, group

from django.conf import settings
from django.core.files.base import ContentFile
from django.core.files.storage import storages
from django.utils import timezone

from ietf.doc.models import DocEvent, RelatedDocument
from ietf.doc.tasks import rebuild_reference_relations_task
from ietf.sync import iana
from ietf.sync import rfceditor
from ietf.sync.rfceditor import MIN_QUEUE_RESULTS, parse_queue, update_drafts_from_queue
from ietf.sync.rfcindex import create_rfc_txt_index, create_rfc_xml_index
from ietf.sync.rfcindex import (
create_rfc_txt_index,
create_rfc_xml_index,
save_to_red_bucket,
)
from ietf.sync.utils import build_from_file_content, load_rfcs_into_blobdb, rsync_helper
from ietf.utils import log
from ietf.utils.timezone import date_today
Expand All @@ -27,13 +33,13 @@
@shared_task
def rfc_editor_index_update_task(full_index=False):
"""Update metadata from the RFC index

Default is to examine only changes in the past 365 days. Call with full_index=True to update
the full RFC index.

According to comments on the original script, a year's worth took about 20s on production as of
August 2022

The original rfc-editor-index-update script had a long-disabled provision for running the
rebuild_reference_relations scripts after the update. That has not been brought over
at all because it should be implemented as its own task if it is needed.
Expand All @@ -51,7 +57,7 @@ def rfc_editor_index_update_task(full_index=False):
timeout=30, # seconds
)
except requests.Timeout as exc:
log.log(f'GET request timed out retrieving RFC editor index: {exc}')
log.log(f"GET request timed out retrieving RFC editor index: {exc}")
return # failed
rfc_index_xml = response.text
index_data = rfceditor.parse_index(io.StringIO(rfc_index_xml))
Expand All @@ -61,9 +67,9 @@ def rfc_editor_index_update_task(full_index=False):
timeout=30, # seconds
)
except requests.Timeout as exc:
log.log(f'GET request timed out retrieving RFC editor errata: {exc}')
log.log(f"GET request timed out retrieving RFC editor errata: {exc}")
return # failed
errata_data = response.json()
errata_data = response.json()
if len(index_data) < rfceditor.MIN_INDEX_RESULTS:
log.log("Not enough index entries, only %s" % len(index_data))
return # failed
Expand Down Expand Up @@ -96,15 +102,15 @@ def rfc_editor_queue_updates_task():
drafts, warnings = parse_queue(io.StringIO(response.text))
for w in warnings:
log.log(f"Warning: {w}")

if len(drafts) < MIN_QUEUE_RESULTS:
log.log("Not enough results, only %s" % len(drafts))
return # failed

changed, warnings = update_drafts_from_queue(drafts)
for w in warnings:
log.log(f"Warning: {w}")

for c in changed:
log.log(f"Updated {c}")

Expand All @@ -120,9 +126,11 @@ def iana_changes_update_task():
MAX_INTERVAL_ACCEPTED_BY_IANA = datetime.timedelta(hours=23)

start = (
timezone.now()
- datetime.timedelta(hours=23)
+ datetime.timedelta(seconds=CLOCK_SKEW_COMPENSATION,)
timezone.now()
- datetime.timedelta(hours=23)
+ datetime.timedelta(
seconds=CLOCK_SKEW_COMPENSATION,
)
)
end = start + datetime.timedelta(hours=23)

Expand All @@ -133,7 +141,9 @@ def iana_changes_update_task():
# requests if necessary

text = iana.fetch_changes_json(
settings.IANA_SYNC_CHANGES_URL, t, min(end, t + MAX_INTERVAL_ACCEPTED_BY_IANA)
settings.IANA_SYNC_CHANGES_URL,
t,
min(end, t + MAX_INTERVAL_ACCEPTED_BY_IANA),
)
log.log(f"Retrieved the JSON: {text}")

Expand All @@ -159,9 +169,9 @@ def iana_protocols_update_task():
# "this needs to be the date where this tool is first deployed" in the original
# iana-protocols-updates script)"
rfc_must_published_later_than = datetime.datetime(
2012,
11,
26,
2012,
11,
26,
tzinfo=datetime.UTC,
)

Expand All @@ -171,17 +181,17 @@ def iana_protocols_update_task():
timeout=30,
)
except requests.Timeout as exc:
log.log(f'GET request timed out retrieving IANA protocols page: {exc}')
log.log(f"GET request timed out retrieving IANA protocols page: {exc}")
return

rfc_numbers = iana.parse_protocol_page(response.text)

def batched(l, n):
    """Split list l up in batches of max size n.

    Returns a generator of list slices; the last batch may hold fewer than
    n items, and an empty list yields no batches.

    For Python 3.12 or later, replace this with itertools.batched()
    (note that itertools.batched() yields tuples rather than lists).
    """
    return (l[i : i + n] for i in range(0, len(l), n))

for batch in batched(rfc_numbers, 100):
updated = iana.update_rfc_log_from_protocol_page(
Expand All @@ -192,6 +202,7 @@ def batched(l, n):
for d in updated:
log.log("Added history entry for %s" % d.display_name())


@shared_task
def fix_subseries_docevents_task():
"""Repairs DocEvents related to bugs around removing docs from subseries
Expand Down Expand Up @@ -233,6 +244,7 @@ def fix_subseries_docevents_task():
time=obsoleting_time
)


@shared_task
def rsync_rfcs_from_rfceditor_task(rfc_numbers: list[int]):
log.log(f"Rsyncing rfcs from rfc-editor: {rfc_numbers}")
Expand Down Expand Up @@ -277,6 +289,38 @@ def load_rfcs_into_blobdb_task(start: int, end: int):

@shared_task
def create_rfc_index_task():
    """Kick off regeneration and publication of the text and XML RFC indexes.

    Nothing is generated here: the two generator tasks run as a group and the
    list of their results is handed to save_rfc_indexes_task, which copies the
    generated files to the red bucket.
    """
    # This does not really need to be a task, but wrapping the canvas in one
    # makes it possible to trigger the whole chain with a single .delay().
    #
    # The index builders are deliberately NOT called here: they now return the
    # rendered bytes instead of saving them, so calling them in this task would
    # only duplicate the work done by the group below and discard the result.
    generate_indexes = group(
        create_rfc_index_txt_task.s(),
        create_rfc_index_xml_task.s(),
    )
    # N.b., when using results backends other than memcached, redis, or dynamodb,
    # the group/chord in this chain leads to repeated polling of the chord status.
    # Switching to one of those three would reduce overhead.
    (generate_indexes | save_rfc_indexes_task.s()).delay()


@shared_task
def save_rfc_indexes_task(blobspecs):
    """Publish generated index blobs to the red bucket and discard the temp copies.

    ``blobspecs`` is an iterable of dicts, each with a "blob_name" key (the
    name of the blob in the "shared_tmp" storage) and a "dest_name" key (the
    name passed to save_to_red_bucket).
    """
    for blobspec in blobspecs:
        tmp_name = blobspec["blob_name"]
        with storages["shared_tmp"].open(tmp_name, "rb") as blob:
            dest_name = blobspec["dest_name"]
            save_to_red_bucket(dest_name, blob.read())
        # The temporary blob is only removed once it has been copied.
        storages["shared_tmp"].delete(tmp_name)


@shared_task(ignore_result=False)
def create_rfc_index_txt_task() -> dict[str, str]:
    """Render the text RFC index and park it in the "shared_tmp" storage.

    Returns a dict with "dest_name" set to "rfc-index.txt" and "blob_name"
    set to the name returned by the storage's save() call. The task result is
    kept (ignore_result=False) so that it can be consumed by a follow-up task.
    """
    rendered = create_rfc_txt_index()
    scratch = storages["shared_tmp"]
    stored_as = scratch.save("unpublished-rfc-index.txt", ContentFile(rendered))
    return dict(dest_name="rfc-index.txt", blob_name=stored_as)


@shared_task(ignore_result=False)
def create_rfc_index_xml_task() -> dict[str, str]:
    """Render the XML RFC index and park it in the "shared_tmp" storage.

    Returns a dict with "dest_name" set to "rfc-index.xml" and "blob_name"
    set to the name returned by the storage's save() call. The task result is
    kept (ignore_result=False) so that it can be consumed by a follow-up task.
    """
    rendered = create_rfc_xml_index()
    scratch = storages["shared_tmp"]
    stored_as = scratch.save("unpublished-rfc-index.xml", ContentFile(rendered))
    return dict(dest_name="rfc-index.xml", blob_name=stored_as)
Loading