forked from ietf-tools/datatracker
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathutils.py
More file actions
201 lines (161 loc) · 7.09 KB
/
utils.py
File metadata and controls
201 lines (161 loc) · 7.09 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
# Copyright The IETF Trust 2016-2023, All Rights Reserved
# -*- coding: utf-8 -*-
import re
from django.db.models import Q
from django.conf import settings
import debug # pyflakes:ignore
from ietf.community.models import CommunityList, EmailSubscription, SearchRule
from ietf.doc.models import Document, State
from ietf.group.models import Role
from ietf.person.models import Person
from ietf.ietfauth.utils import has_role
from ietf.utils.mail import send_mail
def states_of_significant_change():
return State.objects.filter(used=True).filter(
Q(type="draft-stream-ietf", slug__in=['adopt-wg', 'wg-lc', 'writeupw', 'parked', 'dead']) |
Q(type="draft-iesg", slug__in=['pub-req', 'lc', 'iesg-eva', 'rfcqueue']) |
Q(type="draft-stream-iab", slug__in=['active', 'review-c', 'rfc-edit']) |
Q(type="draft-stream-irtf", slug__in=['active', 'rg-lc', 'irsg-w', 'iesg-rev', 'rfc-edit', 'iesghold']) |
Q(type="draft-stream-ise", slug__in=['receive', 'ise-rev', 'iesg-rev', 'rfc-edit', 'iesghold']) |
Q(type="draft", slug__in=['rfc', 'dead'])
)
def can_manage_community_list(user, clist):
if not user or not user.is_authenticated:
return False
if clist.person:
return user == clist.person.user
elif clist.group:
if has_role(user, 'Secretariat'):
return True
if clist.group.type_id in ['area', 'wg', 'rg', 'ag', 'rag', 'program', ]:
return Role.objects.filter(name__slug__in=clist.group.features.groupman_roles, person__user=user, group=clist.group).exists()
return False
def reset_name_contains_index_for_rule(rule):
if not rule.rule_type == "name_contains":
return
rule.name_contains_index.set(Document.objects.filter(name__regex=rule.text))
def update_name_contains_indexes_with_new_doc(doc):
for r in SearchRule.objects.filter(rule_type="name_contains"):
# in theory we could use the database to do this query, but
# Django doesn't support a reversed regex operator, and regexp
# support needs backend-specific code so custom SQL is a bit
# cumbersome too
if re.search(r.text, doc.name) and not doc in r.name_contains_index.all():
r.name_contains_index.add(doc)
def docs_matching_community_list_rule(rule):
docs = Document.objects.all()
if rule.rule_type.endswith("_rfc"):
docs = docs.filter(type_id="rfc") # rule.state is ignored for RFCs
else:
docs = docs.filter(type_id="draft", states=rule.state)
if rule.rule_type in ['group', 'area', 'group_rfc', 'area_rfc']:
return docs.filter(Q(group=rule.group_id) | Q(group__parent=rule.group_id))
elif rule.rule_type in ['group_exp']:
return docs.filter(group=rule.group_id)
elif rule.rule_type.startswith("state_"):
return docs
elif rule.rule_type in ["author", "author_rfc"]:
return docs.filter(documentauthor__person=rule.person)
elif rule.rule_type == "ad":
return docs.filter(ad=rule.person)
elif rule.rule_type == "shepherd":
return docs.filter(shepherd__person=rule.person)
elif rule.rule_type == "name_contains":
return docs.filter(searchrule=rule)
raise NotImplementedError
def community_list_rules_matching_doc(doc):
rules = SearchRule.objects.none()
if doc.type_id not in ["draft", "rfc"]:
return rules # none
states = list(doc.states.values_list("pk", flat=True))
# group and area rules
if doc.group_id:
groups = [doc.group_id]
if doc.group.parent_id:
groups.append(doc.group.parent_id)
rules_to_add = SearchRule.objects.filter(group__in=groups)
if doc.type_id == "rfc":
rules_to_add = rules_to_add.filter(rule_type__in=["group_rfc", "area_rfc"])
else:
rules_to_add = rules_to_add.filter(
rule_type__in=["group", "area", "group_exp"],
state__in=states,
)
rules |= rules_to_add
# state rules (only relevant for I-Ds)
if doc.type_id == "draft":
rules |= SearchRule.objects.filter(
rule_type__in=[
"state_iab",
"state_iana",
"state_iesg",
"state_irtf",
"state_ise",
"state_rfceditor",
"state_ietf",
],
state__in=states,
)
# author rules
if doc.type_id == "rfc":
rules |= SearchRule.objects.filter(
rule_type="author_rfc",
person__in=list(Person.objects.filter(documentauthor__document=doc)),
)
else:
rules |= SearchRule.objects.filter(
rule_type="author",
state__in=states,
person__in=list(Person.objects.filter(documentauthor__document=doc)),
)
# Other draft-only rules rules
if doc.type_id == "draft":
if doc.ad_id:
rules |= SearchRule.objects.filter(
rule_type="ad",
state__in=states,
person=doc.ad_id,
)
if doc.shepherd_id:
rules |= SearchRule.objects.filter(
rule_type="shepherd",
state__in=states,
person__email=doc.shepherd_id,
)
rules |= SearchRule.objects.filter(
rule_type="name_contains",
state__in=states,
name_contains_index=doc, # search our materialized index to avoid full scan
)
return rules
def docs_tracked_by_community_list(clist):
if clist.pk is None:
return Document.objects.none()
# in theory, we could use an OR query, but databases seem to have
# trouble with OR queries and complicated joins so do the OR'ing
# manually
doc_ids = set()
for doc in clist.added_docs.all():
doc_ids.add(doc.pk)
doc_ids.update(rfc.pk for rfc in doc.related_that_doc("became_rfc"))
for rule in clist.searchrule_set.all():
doc_ids = doc_ids | set(docs_matching_community_list_rule(rule).values_list("pk", flat=True))
return Document.objects.filter(pk__in=doc_ids)
def community_lists_tracking_doc(doc):
return CommunityList.objects.filter(Q(added_docs=doc) | Q(searchrule__in=community_list_rules_matching_doc(doc)))
def notify_event_to_subscribers(event):
try:
significant = event.type == "changed_state" and event.state_id in [s.pk for s in states_of_significant_change()]
except AttributeError:
significant = False
subscriptions = EmailSubscription.objects.filter(community_list__in=community_lists_tracking_doc(event.doc)).distinct()
if not significant:
subscriptions = subscriptions.filter(notify_on="all")
for sub in subscriptions.select_related("community_list", "email"):
clist = sub.community_list
subject = '%s notification: Changes to %s' % (clist.long_name(), event.doc.name)
send_mail(None, sub.email.address, settings.DEFAULT_FROM_EMAIL, subject, 'community/notification_email.txt',
context = {
'event': event,
'clist': clist,
})