import email.policy
from email import errors
from email._header_value_parser import Terminal, _fold_mime_parameters, _steal_trailing_WSP_if_exists, _fold_as_ew, \
BareQuotedString, ValueTerminal, quote_string
from email.headerregistry import Address, UniqueAddressHeader, BaseHeader
from email.message import EmailMessage
from email.parser import BytesParser
#######################################################################
# Setup: a policy that triggers folding of long headers
#######################################################################
# Line-length limit handed to the policy below; the test data further down
# is sized relative to this value.
MAX_LINE_LEN = 72
# Derive the policy from the default one, overriding only the line
# separator and the maximum line length.
GOOD_SMTP_POLICY = email.policy.default.clone(
    max_line_length=MAX_LINE_LEN,
    linesep='\r\n',
)
#######################################################################
# Illustrate the problem at a high level: serialization of address
# header and subsequent parsing do not achieve the same semantics.
#######################################################################
# NOTE(review): the text '[email protected]' in the literals below (display
# name, addr_spec and the final assertion) looks like an e-mail
# obfuscation placeholder rather than the values the author wrote --
# TODO restore the original strings before relying on these assertions.
# The display name is padded with MAX_LINE_LEN 'a' characters, so on its
# own it is already longer than the policy's max_line_length.
display_name = r'[email protected] ' + 'a' * MAX_LINE_LEN
addr_spec = '[email protected]'
address = Address(display_name=display_name, addr_spec=addr_spec)
message = EmailMessage(policy=GOOD_SMTP_POLICY)
# The header is set from a second Address built from the same arguments
# as `address`.
message['From'] = Address(display_name=display_name, addr_spec=addr_spec)
# Trigger folding (via as_string()), then parse it back in.
msg_string = message.as_string()
msg_bytes = msg_string.encode('utf-8')
msg_deserialized = BytesParser(policy=GOOD_SMTP_POLICY).parsebytes(msg_bytes)
# Verify badness: every assertion below documents the faulty round trip,
# i.e. the script expects the parsed header to differ from what was set.
from_hdr = msg_deserialized['From']
# The parsed header text no longer equals the original address string.
assert from_hdr != str(address)
# Still exactly one address, but neither of its components survived.
assert len(from_hdr.addresses) == 1
assert from_hdr.addresses[0].display_name != display_name
assert from_hdr.addresses[0].addr_spec != addr_spec
assert from_hdr.addresses[0].addr_spec == '[email protected]' # Definitely wrong.
#######################################################################
# Illustrate the problem at a low level: folding the address header
# yields an unstructured-style fold that does not respect the mailbox
# structure.
#######################################################################
cls = UniqueAddressHeader
# Create a concrete header class whose bases are the specialised header
# class and BaseHeader, named after the former with a leading underscore.
# NOTE(review): presumably this mirrors how email.headerregistry builds
# its header classes internally -- confirm.
_UniqueAddressHeader = type('_' + cls.__name__, (cls, BaseHeader), {})
hdr = _UniqueAddressHeader('From', address)
# Before folding, the header exposes exactly the address it was built from.
assert len(hdr.addresses) == 1
assert hdr.addresses[0].display_name == display_name
assert hdr.addresses[0].addr_spec == addr_spec
# Fold the header object directly, without going through EmailMessage.
bad_folded = hdr.fold(policy=GOOD_SMTP_POLICY)
# NOTE(review): the expected text below contains the '[email protected]'
# placeholder and its last line is only ' \r\n'; it looks as if part of
# the original literal was stripped -- TODO confirm against real output.
assert bad_folded == ('From: [email protected]\r\n'
' aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa\r\n'
' \r\n')
#########################################################################
# Fix the problem: respect use of quotes in _refold_parse_tree
#########################################################################
# NOTE(review): the indentation of this function body is missing in this
# listing, so the code is not runnable as shown; the block structure must
# be restored before use.
def _refold_parse_tree(parse_tree, *, policy):
"""Return string of contents of parse_tree folded according to RFC rules.
"""
# parse_tree is iterated to obtain its parts; policy supplies
# max_line_length, utf8 and linesep, all of which are read below.
# max_line_length 0/None means no limit, ie: infinitely long.
maxlen = policy.max_line_length or float("+inf")
# Target encoding used to decide whether a part needs an encoded word.
encoding = 'utf-8' if policy.utf8 else 'us-ascii'
# Output lines built so far; lines[-1] is the line currently being filled.
lines = ['']
# State returned by the previous _fold_as_ew call and handed to the next
# one; reset to None when a part does not allow encoded words.
last_ew = None
# Counter that suppresses encoded-word wrapping while non-zero; it is
# decremented whenever the end_ew_not_allowed sentinel is popped.
# NOTE(review): nothing in the visible part increments it or queues the
# sentinel -- presumably that happens in the part that is cut off.
wrap_as_ew_blocked = 0
want_encoding = False
# Sentinel object recognised by identity in the loop below.
end_ew_not_allowed = Terminal('', 'wrap_as_ew_blocked')
# Work queue of parts still to emit; sub-parts are pushed back at the front.
parts = list(parse_tree)
while parts:
part = parts.pop(0)
if part is end_ew_not_allowed:
wrap_as_ew_blocked -= 1
continue
tstr = str(part)
# Probe whether the part's text fits the target encoding; if it does not,
# pick a charset and remember that an encoded word is wanted.
try:
tstr.encode(encoding)
charset = encoding
except UnicodeEncodeError:
if any(isinstance(x, errors.UndecodableBytesDefect)
for x in part.all_defects):
charset = 'unknown-8bit'
else:
# If policy.utf8 is false this should really be taken from a
# 'charset' property on the policy.
charset = 'utf-8'
want_encoding = True
if part.token_type == 'mime-parameters':
# Mime parameter folding (using RFC2231) is extra special.
_fold_mime_parameters(part, lines, maxlen, encoding)
continue
# Encoded-word path: only taken when encoding is wanted and not blocked.
if want_encoding and not wrap_as_ew_blocked:
if not part.as_ew_allowed:
# This part may not be wrapped as an encoded word: clear the request
# and the running encoded-word state, then try the part's own fold().
want_encoding = False
last_ew = None
if part.syntactic_break:
encoded_part = part.fold(policy=policy)[:-1] # strip nl
if policy.linesep not in encoded_part:
# It fits on a single line
if len(encoded_part) > maxlen - len(lines[-1]):
# But not on this one, so start a new one.
newline = _steal_trailing_WSP_if_exists(lines)
# XXX what if encoded_part has no leading FWS?
lines.append(newline)
lines[-1] += encoded_part
continue
# Either this is not a major syntactic break, so we don't
# want it on a line by itself even if it fits, or it
# doesn't fit on a line by itself. Either way, fall through
# to unpacking the subparts and wrapping them.
if not hasattr(part, 'encode'):
# It's not a Terminal, do each piece individually.
parts = list(part) + parts
else:
# It's a terminal, wrap it as an encoded word, possibly
# combining it with previously encoded words if allowed.
last_ew = _fold_as_ew(tstr, lines, maxlen, last_ew,
part.ew_combine_allowed, charset)
want_encoding = False
continue
# NOTE(review): the function listing breaks off here; the statement that
# follows in the file is only a fragment, so the remaining folding logic
# (including the quote handling this function is meant to add) and the
# return statement are not visible.
if len(tstr) \r\n')
#######################################################################
# Verify fix at high level: serialize, deserialize, assert equal.
#######################################################################
# NOTE(review): nothing visible in this listing installs the
# _refold_parse_tree defined above into the email package; presumably
# that step was in the text that is missing from the file -- TODO confirm,
# otherwise the stock folding is what gets exercised here.
message = EmailMessage(policy=GOOD_SMTP_POLICY)
message['From'] = address
# Fold with new refold, read it in again.
msg_string = message.as_string()
msg_bytes = msg_string.encode('utf-8')
msg_deserialized = BytesParser(policy=GOOD_SMTP_POLICY).parsebytes(msg_bytes)
from_hdr = msg_deserialized['From']
# With the fix in place the round trip is expected to be lossless: the
# header text and both address components must match the originals.
assert from_hdr == str(address)
assert len(from_hdr.addresses) == 1
assert from_hdr.addresses[0].display_name == display_name
assert from_hdr.addresses[0].addr_spec == addr_spec