forked from adamlaska/datatracker
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy path0041_create_liaison_contact_roles.py
More file actions
158 lines (136 loc) · 6.59 KB
/
0041_create_liaison_contact_roles.py
File metadata and controls
158 lines (136 loc) · 6.59 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
# Generated by Django 2.2.17 on 2020-12-09 06:59
from django.db import migrations
from ietf.person.name import plain_name
from ietf.utils.mail import parseaddr
def find_or_create_email(email_model, person_model, formatted_email, group):
"""Look up an email address or create if needed
Also creates a Person if the email does not have one. Created Email will have
the origin field set to the origin parameter to this method.
"""
name, address = parseaddr(formatted_email)
if not address:
raise ValueError('Could not parse email "%s"' % formatted_email)
email, _ = email_model.objects.get_or_create(
address=address,
defaults=dict(origin='liaison contact: ' + group.acronym)
)
if not email.person:
person = person_model.objects.create(name=name if name else address)
email.person = person
email.save()
# Display an alert if the formatted address sent from the Role will differ
# from what was in the original contacts list
if not email.person.plain and email.person.name == email.address:
recreated_contact_email = email.address
else:
person_plain = email.person.plain if email.person.plain else plain_name(email.person.name)
recreated_contact_email = "%s <%s>" % (person_plain, email.address)
if recreated_contact_email != formatted_email:
print('>> Note: address "%s" is now "%s" (%s)' % (
formatted_email,
recreated_contact_email,
group.acronym,
))
return email
def forward(apps, schema_editor):
"""Perform forward migration
Creates liaison_contact and liaison_cc_contact Roles corresponding to existing
LiaisonStatementGroupContact instances.
"""
Group = apps.get_model('group', 'Group')
Role = apps.get_model('group', 'Role')
Email = apps.get_model('person', 'Email')
Person = apps.get_model('person', 'Person')
RoleName = apps.get_model('name', 'RoleName')
contact_role_name = RoleName.objects.get(slug='liaison_contact')
cc_contact_role_name = RoleName.objects.get(slug='liaison_cc_contact')
print()
LiaisonStatementGroupContacts = apps.get_model('liaisons', 'LiaisonStatementGroupContacts')
for lsgc in LiaisonStatementGroupContacts.objects.all():
group = lsgc.group
for contact_email in lsgc.contacts.split(','):
if contact_email:
email = find_or_create_email(Email, Person,
contact_email.strip(),
group)
Role.objects.create(
group=group,
name=contact_role_name,
person=email.person,
email=email,
)
for contact_email in lsgc.cc_contacts.split(','):
if contact_email:
email = find_or_create_email(Email, Person,
contact_email.strip(),
group)
Role.objects.create(
group=group,
name=cc_contact_role_name,
person=email.person,
email=email,
)
# Now validate that we got them all. As much as possible, use independent code
# to avoid replicating any bugs from the original migration.
for group in Group.objects.all():
lsgc = LiaisonStatementGroupContacts.objects.filter(group_id=group.pk).first()
if not lsgc:
if group.role_set.filter(name__in=[contact_role_name, cc_contact_role_name]).exists():
raise ValueError('%s group has contact roles after migration but had no LiaisonStatementGroupContacts' % (
group.acronym,
))
else:
contacts = group.role_set.filter(name=contact_role_name)
num_lsgc_contacts = len(lsgc.contacts.split(',')) if lsgc.contacts else 0
if len(contacts) != num_lsgc_contacts:
raise ValueError(
'%s group has %d contact(s) but only %d address(es) in its LiaisonStatementGroupContacts (contact addresses = "%s", LSGC.contacts="%s")' % (
group.acronym, len(contacts), num_lsgc_contacts,
'","'.join([c.email.address for c in contacts]),
lsgc.contacts,
)
)
for contact in contacts:
email = contact.email.address
if email.lower() not in lsgc.contacts.lower():
raise ValueError(
'%s group has "%s" contact but not found in LiaisonStatementGroupContacts.contacts = "%s"' % (
group.acronym, email, lsgc.contacts,
)
)
cc_contacts = group.role_set.filter(name=cc_contact_role_name)
num_lsgc_cc_contacts = len(lsgc.cc_contacts.split(',')) if lsgc.cc_contacts else 0
if len(cc_contacts) != num_lsgc_cc_contacts:
raise ValueError(
'%s group has %d CC contact(s) but %d address(es) in its LiaisonStatementGroupContacts (cc_contact addresses = "%s", LSGC.cc_contacts="%s")' % (
group.acronym, len(cc_contacts), num_lsgc_cc_contacts,
'","'.join([c.email.address for c in cc_contacts]),
lsgc.cc_contacts,
)
)
for cc_contact in cc_contacts:
email = cc_contact.email.address
if email.lower() not in lsgc.cc_contacts.lower():
raise ValueError(
'%s group has "%s" CC contact but not found in LiaisonStatementGroupContacts.cc_contacts = "%s"' % (
group.acronym, email, lsgc.cc_contacts,
)
)
def reverse(apps, schema_editor):
"""Perform reverse migration
Removes liaison_contact and liaison_cc_contact Roles. The forward migration creates missing
Email and Person instances, but these are not removed because it's difficult to do this
safely and correctly.
"""
Role = apps.get_model('group', 'Role')
Role.objects.filter(
name_id__in=['liaison_contact', 'liaison_cc_contact']
).delete()
class Migration(migrations.Migration):
dependencies = [
('group', '0040_lengthen_used_roles_fields'),
('name', '0022_add_liaison_contact_rolenames'),
]
operations = [
migrations.RunPython(forward, reverse),
]