import email.policy
from email import errors
from email._header_value_parser import (
    BareQuotedString,
    Terminal,
    ValueTerminal,
    _fold_as_ew,
    _fold_mime_parameters,
    _steal_trailing_WSP_if_exists,
    quote_string,
)
from email.headerregistry import Address, BaseHeader, UniqueAddressHeader
from email.message import EmailMessage
from email.parser import BytesParser

#######################################################################
# Setup: a policy that triggers folding of long headers
#######################################################################

MAX_LINE_LEN = 72
GOOD_SMTP_POLICY = email.policy.default.clone(
    linesep='\r\n',
    max_line_length=MAX_LINE_LEN,
)

#######################################################################
# Illustrate the problem at a high level: serialization of address
# header and subsequent parsing do not achieve the same semantics.
#######################################################################

# NOTE(review): the '[email protected]' literals in this section look like
# residue of an e-mail obfuscation filter rather than the addresses the
# script was written with -- TODO confirm against the original script
# before relying on the assertions below.
# The display name is padded past MAX_LINE_LEN so that serializing the
# header has to fold it.
display_name = r'[email protected] ' + 'a' * MAX_LINE_LEN
addr_spec = '[email protected]'
address = Address(display_name=display_name, addr_spec=addr_spec)

message = EmailMessage(policy=GOOD_SMTP_POLICY)
message['From'] = Address(display_name=display_name, addr_spec=addr_spec)

# Trigger folding (via as_string()), then parse it back in.
msg_string = message.as_string()
msg_bytes = msg_string.encode('utf-8')
roundtrip_parser = BytesParser(policy=GOOD_SMTP_POLICY)
msg_deserialized = roundtrip_parser.parsebytes(msg_bytes)

# Verify badness: the parsed header must NOT match what was serialized.
from_hdr = msg_deserialized['From']
assert from_hdr != str(address)
assert len(from_hdr.addresses) == 1
reparsed = from_hdr.addresses[0]
assert reparsed.display_name != display_name
assert reparsed.addr_spec != addr_spec
assert reparsed.addr_spec == '[email protected]'  # Definitely wrong.

#######################################################################
# Illustrate the problem at a low level: folding of address produces
# an unstructured header folding which does not respect mailbox
# structure.
####################################################################### cls = UniqueAddressHeader _UniqueAddressHeader = type('_' + cls.__name__, (cls, BaseHeader), {}) hdr = _UniqueAddressHeader('From', address) assert len(hdr.addresses) == 1 assert hdr.addresses[0].display_name == display_name assert hdr.addresses[0].addr_spec == addr_spec bad_folded = hdr.fold(policy=GOOD_SMTP_POLICY) assert bad_folded == ('From: [email protected]\r\n' ' aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa\r\n' ' \r\n') ######################################################################### # Fix the problem: respect use of quotes in _refold_parse_tree ######################################################################### def _refold_parse_tree(parse_tree, *, policy): """Return string of contents of parse_tree folded according to RFC rules. """ # max_line_length 0/None means no limit, ie: infinitely long. maxlen = policy.max_line_length or float("+inf") encoding = 'utf-8' if policy.utf8 else 'us-ascii' lines = [''] last_ew = None wrap_as_ew_blocked = 0 want_encoding = False end_ew_not_allowed = Terminal('', 'wrap_as_ew_blocked') parts = list(parse_tree) while parts: part = parts.pop(0) if part is end_ew_not_allowed: wrap_as_ew_blocked -= 1 continue tstr = str(part) try: tstr.encode(encoding) charset = encoding except UnicodeEncodeError: if any(isinstance(x, errors.UndecodableBytesDefect) for x in part.all_defects): charset = 'unknown-8bit' else: # If policy.utf8 is false this should really be taken from a # 'charset' property on the policy. charset = 'utf-8' want_encoding = True if part.token_type == 'mime-parameters': # Mime parameter folding (using RFC2231) is extra special. 
# NOTE(review): the physical line above and the one below hold what was
# evidently a multi-line script: the low-level illustration of the bad fold,
# the replacement _refold_parse_tree(), and the final round-trip check.
# With the line breaks gone, everything after the first '#' on each physical
# line is a comment, so none of these statements can run as written.
# NOTE(review): on the line below, the text between "if len(tstr)" and
# "\r\n')" is missing. Presumably everything from a '<' up to a later '>'
# was stripped during extraction, taking the remainder of
# _refold_parse_tree() (including the quoting change announced in the
# "Fix the problem" banner) with it -- TODO restore from the original script.
# The '[email protected]' literals look like obfuscation residue as well --
# TODO confirm the real addresses.
_fold_mime_parameters(part, lines, maxlen, encoding) continue if want_encoding and not wrap_as_ew_blocked: if not part.as_ew_allowed: want_encoding = False last_ew = None if part.syntactic_break: encoded_part = part.fold(policy=policy)[:-1] # strip nl if policy.linesep not in encoded_part: # It fits on a single line if len(encoded_part) > maxlen - len(lines[-1]): # But not on this one, so start a new one. newline = _steal_trailing_WSP_if_exists(lines) # XXX what if encoded_part has no leading FWS? lines.append(newline) lines[-1] += encoded_part continue # Either this is not a major syntactic break, so we don't # want it on a line by itself even if it fits, or it # doesn't fit on a line by itself. Either way, fall through # to unpacking the subparts and wrapping them. if not hasattr(part, 'encode'): # It's not a Terminal, do each piece individually. parts = list(part) + parts else: # It's a terminal, wrap it as an encoded word, possibly # combining it with previously encoded words if allowed. last_ew = _fold_as_ew(tstr, lines, maxlen, last_ew, part.ew_combine_allowed, charset) want_encoding = False continue if len(tstr) \r\n') ####################################################################### # Verify fix at high level: serialize, deserialize, assert equal. ####################################################################### message = EmailMessage(policy=GOOD_SMTP_POLICY) message['From'] = address # Fold with new refold, read it in again. msg_string = message.as_string() msg_bytes = msg_string.encode('utf-8') msg_deserialized = BytesParser(policy=GOOD_SMTP_POLICY).parsebytes(msg_bytes) from_hdr = msg_deserialized['From'] assert from_hdr == str(address) assert len(from_hdr.addresses) == 1 assert from_hdr.addresses[0].display_name == display_name assert from_hdr.addresses[0].addr_spec == addr_spec