Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 54 additions & 14 deletions ietf/utils/mail.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@
from email.header import Header, decode_header
from email import message_from_bytes, message_from_string
from email import charset as Charset
from typing import Optional

from django.conf import settings
from django.contrib import messages
from django.core.exceptions import ImproperlyConfigured, ValidationError
from django.core.validators import validate_email
from django.http import HttpRequest
from django.template.loader import render_to_string
from django.template import Context,RequestContext
from django.utils import timezone
Expand Down Expand Up @@ -64,6 +66,18 @@ def add_headers(msg):
msg['From'] = settings.DEFAULT_FROM_EMAIL
return msg


def decode_header_value(value: str) -> str:
"""Decode a header value

Easier-to-use wrapper around email.message.decode_header()
"""
return "".join(
part.decode(charset if charset else "utf-8") if isinstance(part, bytes) else part
for part, charset in decode_header(value)
)


class SMTPSomeRefusedRecipients(smtplib.SMTPException):

def __init__(self, message, original_msg, refusals):
Expand Down Expand Up @@ -251,8 +265,7 @@ def parseaddr(addr):

"""

addr = ''.join( [ ( s.decode(m) if m else s.decode()) if isinstance(s, bytes) else s for (s,m) in decode_header(addr) ] )
name, addr = simple_parseaddr(addr)
name, addr = simple_parseaddr(decode_header_value(addr))
return name, addr

def excludeaddrs(addrlist, exlist):
Expand Down Expand Up @@ -330,18 +343,45 @@ def condition_message(to, frm, subject, msg, cc, extra):
msg['Message-ID'] = make_msgid()


def show_that_mail_was_sent(request,leadline,msg,bcc):
if request and request.user:
from ietf.ietfauth.utils import has_role
if has_role(request.user,['Area Director','Secretariat','IANA','RFC Editor','ISE','IAD','IRTF Chair','WG Chair','RG Chair','WG Secretary','RG Secretary']):
info = "%s at %s %s\n" % (leadline,timezone.now().strftime("%Y-%m-%d %H:%M:%S"),settings.TIME_ZONE)
info += "Subject: %s\n" % force_str(msg.get('Subject','[no subject]'))
info += "To: %s\n" % msg.get('To','[no to]')
if msg.get('Cc'):
info += "Cc: %s\n" % msg.get('Cc')
if bcc:
info += "Bcc: %s\n" % bcc
messages.info(request,info,extra_tags='preformatted',fail_silently=True)
def show_that_mail_was_sent(request: HttpRequest, leadline: str, msg: Message, bcc: Optional[str]):
if request and request.user:
from ietf.ietfauth.utils import has_role

if has_role(
request.user,
[
"Area Director",
"Secretariat",
"IANA",
"RFC Editor",
"ISE",
"IAD",
"IRTF Chair",
"WG Chair",
"RG Chair",
"WG Secretary",
"RG Secretary",
],
):
subject = decode_header_value(msg.get("Subject", "[no subject]"))
_to = decode_header_value(msg.get("To", "[no to]"))
info_lines = [
f"{leadline} at {timezone.now():%Y-%m-%d %H:%M:%S %Z}",
f"Subject: {subject}",
f"To: {_to}",
]
cc = msg.get("Cc", None)
if cc:
info_lines.append(f"Cc: {decode_header_value(cc)}")
if bcc:
info_lines.append(f"Bcc: {decode_header_value(bcc)}")
messages.info(
request,
"\n".join(info_lines),
extra_tags="preformatted",
fail_silently=True,
)


def save_as_message(request, msg, bcc):
by = ((request and request.user and not request.user.is_anonymous and request.user.person)
Expand Down
143 changes: 141 additions & 2 deletions ietf/utils/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,11 @@
import shutil
import types

from mock import patch
from mock import call, patch
from pyquery import PyQuery
from typing import Dict, List # pyflakes:ignore

from email.message import Message
from email.mime.image import MIMEImage
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
Expand All @@ -32,6 +33,7 @@
from django.template.defaulttags import URLNode
from django.template.loader import get_template, render_to_string
from django.templatetags.static import StaticNode
from django.test import RequestFactory
from django.urls import reverse as urlreverse

import debug # pyflakes:ignore
Expand All @@ -42,7 +44,15 @@
from ietf.utils.draft import PlaintextDraft, getmeta
from ietf.utils.fields import SearchableField
from ietf.utils.log import unreachable, assertion
from ietf.utils.mail import send_mail_preformatted, send_mail_text, send_mail_mime, outbox, get_payload_text
from ietf.utils.mail import (
send_mail_preformatted,
send_mail_text,
send_mail_mime,
outbox,
get_payload_text,
decode_header_value,
show_that_mail_was_sent,
)
from ietf.utils.test_runner import get_template_paths, set_coverage_checking
from ietf.utils.test_utils import TestCase, unicontent
from ietf.utils.text import parse_unicode
Expand Down Expand Up @@ -109,6 +119,135 @@ def test_send_mail_preformatted(self):
recv = outbox[-1]
self.assertEqual(recv['Fuzz'], 'bucket, monger')


class MailUtilsTests(TestCase):
def test_decode_header_value(self):
self.assertEqual(
decode_header_value("cake"),
"cake",
"decodes simple string value",
)
self.assertEqual(
decode_header_value("=?utf-8?b?8J+Ogg==?="),
"\U0001f382",
"decodes single utf-8-encoded part",
)
self.assertEqual(
decode_header_value("=?utf-8?b?8J+Ogg==?= = =?macintosh?b?jYxrjg==?="),
"\U0001f382 = çåké",
"decodes a value with non-utf-8 encodings",
)

# Patch in a side_effect so we can distinguish values that came from decode_header_value.
@patch("ietf.utils.mail.decode_header_value", side_effect=lambda s: f"decoded-{s}")
@patch("ietf.utils.mail.messages")
def test_show_that_mail_was_sent(self, mock_messages, mock_decode_header_value):
request = RequestFactory().get("/some/path")
request.user = object() # just needs to exist
msg = Message()
msg["To"] = "to-value"
msg["Subject"] = "subject-value"
msg["Cc"] = "cc-value"
with patch("ietf.ietfauth.utils.has_role", return_value=True):
show_that_mail_was_sent(request, "mail was sent", msg, "bcc-value")
self.assertCountEqual(
mock_decode_header_value.call_args_list,
[call("to-value"), call("subject-value"), call("cc-value"), call("bcc-value")],
)
self.assertEqual(mock_messages.info.call_args[0][0], request)
self.assertIn("mail was sent", mock_messages.info.call_args[0][1])
self.assertIn("decoded-subject-value", mock_messages.info.call_args[0][1])
self.assertIn("decoded-to-value", mock_messages.info.call_args[0][1])
self.assertIn("decoded-cc-value", mock_messages.info.call_args[0][1])
self.assertIn("decoded-bcc-value", mock_messages.info.call_args[0][1])
mock_messages.reset_mock()
mock_decode_header_value.reset_mock()

# no bcc
with patch("ietf.ietfauth.utils.has_role", return_value=True):
show_that_mail_was_sent(request, "mail was sent", msg, None)
self.assertCountEqual(
mock_decode_header_value.call_args_list,
[call("to-value"), call("subject-value"), call("cc-value")],
)
self.assertEqual(mock_messages.info.call_args[0][0], request)
self.assertIn("mail was sent", mock_messages.info.call_args[0][1])
self.assertIn("decoded-subject-value", mock_messages.info.call_args[0][1])
self.assertIn("decoded-to-value", mock_messages.info.call_args[0][1])
self.assertIn("decoded-cc-value", mock_messages.info.call_args[0][1])
# Note: here and below - when using assertNotIn(), leaving off the "decoded-" prefix
# proves that neither the original value nor the decoded value appear.
self.assertNotIn("bcc-value", mock_messages.info.call_args[0][1])
mock_messages.reset_mock()
mock_decode_header_value.reset_mock()

# no cc
del msg["Cc"]
with patch("ietf.ietfauth.utils.has_role", return_value=True):
show_that_mail_was_sent(request, "mail was sent", msg, None)
self.assertCountEqual(
mock_decode_header_value.call_args_list,
[call("to-value"), call("subject-value")],
)
self.assertEqual(mock_messages.info.call_args[0][0], request)
self.assertIn("mail was sent", mock_messages.info.call_args[0][1])
self.assertIn("decoded-subject-value", mock_messages.info.call_args[0][1])
self.assertIn("decoded-to-value", mock_messages.info.call_args[0][1])
self.assertNotIn("cc-value", mock_messages.info.call_args[0][1])
self.assertNotIn("bcc-value", mock_messages.info.call_args[0][1])
mock_messages.reset_mock()
mock_decode_header_value.reset_mock()

# no to
del msg["To"]
with patch("ietf.ietfauth.utils.has_role", return_value=True):
show_that_mail_was_sent(request, "mail was sent", msg, None)
self.assertCountEqual(
mock_decode_header_value.call_args_list,
[call("[no to]"), call("subject-value")],
)
self.assertEqual(mock_messages.info.call_args[0][0], request)
self.assertIn("mail was sent", mock_messages.info.call_args[0][1])
self.assertIn("decoded-subject-value", mock_messages.info.call_args[0][1])
self.assertIn("decoded-[no to]", mock_messages.info.call_args[0][1])
self.assertNotIn("to-value", mock_messages.info.call_args[0][1])
self.assertNotIn("cc-value", mock_messages.info.call_args[0][1])
self.assertNotIn("bcc-value", mock_messages.info.call_args[0][1])
mock_messages.reset_mock()
mock_decode_header_value.reset_mock()

# no subject
del msg["Subject"]
with patch("ietf.ietfauth.utils.has_role", return_value=True):
show_that_mail_was_sent(request, "mail was sent", msg, None)
self.assertCountEqual(
mock_decode_header_value.call_args_list,
[call("[no to]"), call("[no subject]")],
)
self.assertEqual(mock_messages.info.call_args[0][0], request)
self.assertIn("mail was sent", mock_messages.info.call_args[0][1])
self.assertIn("decoded-[no subject]", mock_messages.info.call_args[0][1])
self.assertNotIn("subject-value", mock_messages.info.call_args[0][1])
self.assertIn("decoded-[no to]", mock_messages.info.call_args[0][1])
self.assertNotIn("to-value", mock_messages.info.call_args[0][1])
self.assertNotIn("cc-value", mock_messages.info.call_args[0][1])
self.assertNotIn("bcc-value", mock_messages.info.call_args[0][1])
mock_messages.reset_mock()
mock_decode_header_value.reset_mock()

# user does not have role
with patch("ietf.ietfauth.utils.has_role", return_value=False):
show_that_mail_was_sent(request, "mail was sent", msg, None)
self.assertFalse(mock_messages.called)

# no user
request.user = None
with patch("ietf.ietfauth.utils.has_role", return_value=True) as mock_has_role:
show_that_mail_was_sent(request, "mail was sent", msg, None)
self.assertFalse(mock_messages.called)
self.assertFalse(mock_has_role.called)


class TestSMTPServer(TestCase):

def test_address_rejected(self):
Expand Down