# -*- test-case-name: twisted.mail.test.test_imap -*-
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.
"""
Test cases for L{twisted.mail.imap4}.
"""
from __future__ import annotations
import base64
import codecs
import functools
import locale
import os
import uuid
from collections import OrderedDict
from io import BytesIO
from itertools import chain
from typing import Optional, Type
from unittest import skipIf
from zope.interface import implementer
from zope.interface.verify import verifyClass, verifyObject
from twisted.cred.checkers import InMemoryUsernamePasswordDatabaseDontUse
from twisted.cred.credentials import (
CramMD5Credentials,
IUsernameHashedPassword,
IUsernamePassword,
)
from twisted.cred.error import UnauthorizedLogin
from twisted.cred.portal import IRealm, Portal
from twisted.internet import defer, error, interfaces, reactor
from twisted.internet.defer import Deferred
from twisted.internet.task import Clock
from twisted.internet.testing import StringTransport, StringTransportWithDisconnection
from twisted.mail import imap4
from twisted.mail.imap4 import MessageSet
from twisted.mail.interfaces import (
IChallengeResponse,
IClientAuthentication,
ICloseableMailboxIMAP,
)
from twisted.protocols import loopback
from twisted.python import failure, log, util
from twisted.python.compat import iterbytes, nativeString, networkString
from twisted.trial.unittest import SynchronousTestCase, TestCase
try:
from twisted.test.ssl_helpers import ClientTLSContext, ServerTLSContext
except ImportError:
ClientTLSContext = None # type: ignore[assignment,misc]
ServerTLSContext = None # type: ignore[assignment,misc]
def strip(f):
    """
    Wrap a zero-argument callable so that it can be invoked with a single
    argument which is ignored.

    @param f: A callable which takes no arguments.

    @return: A callable accepting one positional argument (which it
        discards) and returning whatever calling C{f} with no arguments
        returns.
    """

    # C{f} is bound as a default argument, exactly as the wrapper's
    # signature has always exposed it.
    def discardResult(result, f=f):
        return f()

    return discardResult
class IMAP4UTF7Tests(TestCase):
    """
    Tests for the I{imap4-utf-7} text codec.

    @cvar tests: C{[text, encoded]} pairs, where C{encoded} is the
        I{imap4-utf-7} byte string expected for C{text}.  Used by
        L{test_encode} and L{test_decode}.
    """

    tests = [
        ["Hello world", b"Hello world"],
        ["Hello & world", b"Hello &- world"],
        ["Hello\xffworld", b"Hello&AP8-world"],
        ["\xff\xfe\xfd\xfc", b"&AP8A,gD9APw-"],
        [
            "~peter/mail/\u65e5\u672c\u8a9e/\u53f0\u5317",
            b"~peter/mail/&ZeVnLIqe-/&U,BTFw-",
        ],  # example from RFC 2060
    ]

    def test_encodeWithErrors(self):
        """
        Passing an explicit error policy to C{str.encode} together with the
        I{imap4-utf-7} codec gives the same bytes as omitting the policy.
        """
        message = "Hello world"
        withPolicy = message.encode("imap4-utf-7", "strict")
        withoutPolicy = message.encode("imap4-utf-7")
        self.assertEqual(withPolicy, withoutPolicy)

    def test_decodeWithErrors(self):
        """
        Like L{test_encodeWithErrors}, but for C{bytes.decode}.
        """
        encoded = b"Hello world"
        withPolicy = encoded.decode("imap4-utf-7", "strict")
        withoutPolicy = encoded.decode("imap4-utf-7")
        self.assertEqual(withPolicy, withoutPolicy)

    def test_encodeAmpersand(self):
        """
        Text containing ampersands (C{&}) can be encoded to bytes with the
        I{imap4-utf-7} codec.
        """
        encoded = "&Hello&\N{VULGAR FRACTION ONE HALF}&".encode("imap4-utf-7")
        self.assertEqual(encoded, b"&-Hello&-&AL0-&-")

    def test_decodeWithoutFinalASCIIShift(self):
        """
        An I{imap4-utf-7} byte string which never shifts back to ASCII
        (that is, it has no closing C{-}) can still be decoded.
        """
        decoded = b"&AL0".decode("imap4-utf-7")
        self.assertEqual(decoded, "\N{VULGAR FRACTION ONE HALF}")

    def test_getreader(self):
        """
        The class returned by C{codecs.getreader('imap4-utf-7')} decodes
        I{imap4-utf-7} bytes read from the stream it wraps.
        """
        readerClass = codecs.getreader("imap4-utf-7")
        stream = readerClass(BytesIO(b"Hello&AP8-world"))
        self.assertEqual(stream.read(), "Hello\xffworld")

    def test_getwriter(self):
        """
        The class returned by C{codecs.getwriter('imap4-utf-7')} encodes
        text written to it as I{imap4-utf-7} bytes on the stream it wraps.
        """
        sink = BytesIO()
        writerClass = codecs.getwriter("imap4-utf-7")
        stream = writerClass(sink)
        stream.write("Hello\xffworld")
        self.assertEqual(sink.getvalue(), b"Hello&AP8-world")

    def test_encode(self):
        """
        Each text sample in L{tests} encodes with the I{imap4-utf-7} codec
        to the byte string it is paired with.
        """
        for text, expected in self.tests:
            self.assertEqual(text.encode("imap4-utf-7"), expected)

    def test_decode(self):
        """
        Each byte string in L{tests} decodes with the I{imap4-utf-7} codec
        to the text it is paired with.
        """
        for expected, encoded in self.tests:
            self.assertEqual(expected, encoded.decode("imap4-utf-7"))

    def test_printableSingletons(self):
        """
        Every printable ASCII character other than C{&} encodes to, and
        decodes from, its own single byte; C{&} is represented as C{&-}.
        """
        ampersand = ord("&")
        belowAmpersand = range(0x20, ampersand)
        aboveAmpersand = range(ampersand + 1, 0x7F)
        # All printables represent themselves
        for codePoint in chain(belowAmpersand, aboveAmpersand):
            character = chr(codePoint)
            asciiByte = character.encode()
            self.assertEqual(asciiByte, character.encode("imap4-utf-7"))
            self.assertEqual(character, asciiByte.decode("imap4-utf-7"))
        self.assertEqual("&".encode("imap4-utf-7"), b"&-")
        self.assertEqual(b"&-".decode("imap4-utf-7"), "&")
class BufferingConsumer:
    """
    A consumer used by tests: it records every chunk written to it and,
    after each write, asks the registered producer for more data.

    @ivar buffer: The chunks passed to L{write}, in the order received.

    @ivar consumer: The currently registered producer, or L{None} when no
        producer is registered.  Despite the attribute's name, this is the
        object whose C{resumeProducing} method gets called.
    """

    def __init__(self):
        self.buffer = []
        # Start with no producer registered so that write() can safely be
        # called before registerProducer() instead of raising
        # AttributeError.
        self.consumer = None

    def write(self, bytes):
        """
        Record a chunk and, if a producer is registered, ask it to resume.

        @param bytes: The chunk to append to L{buffer}.
        """
        self.buffer.append(bytes)
        if self.consumer:
            self.consumer.resumeProducing()

    def registerProducer(self, consumer, streaming):
        """
        Remember the producer and immediately ask it to start producing.

        @param consumer: The producer to register; it must provide a
            C{resumeProducing} method.

        @param streaming: Accepted for interface compatibility; not used.
        """
        self.consumer = consumer
        self.consumer.resumeProducing()

    def unregisterProducer(self):
        """
        Forget the registered producer; later writes are only buffered.
        """
        self.consumer = None
class MessageProducerTests(SynchronousTestCase):
def testSinglePart(self):
    """
    Producing a single-part message writes a length prefix, the headers
    (in insertion order, with capitalized names), a blank line and then
    the body to the consumer, and the result of C{beginProducing} fires
    with the producer itself.
    """
    # The expected output below announces a 119-byte literal.  The header
    # lines and the blank separator line account for 95 of those bytes, so
    # the body has to be 24 bytes long: both spaces after "text." matter.
    body = b"This is body text.  Rar."
    headers = OrderedDict()
    headers["from"] = "sender@host"
    headers["to"] = "recipient@domain"
    headers["subject"] = "booga booga boo"
    headers["content-type"] = "text/plain"

    msg = FakeyMessage(headers, (), None, body, 123, None)

    c = BufferingConsumer()
    p = imap4.MessageProducer(msg)
    d = p.beginProducing(c)

    def cbProduced(result):
        # The producer hands itself back once it has finished.
        self.assertIdentical(result, p)
        self.assertEqual(
            b"".join(c.buffer),
            b"{119}\r\n"
            b"From: sender@host\r\n"
            b"To: recipient@domain\r\n"
            b"Subject: booga booga boo\r\n"
            b"Content-Type: text/plain\r\n"
            b"\r\n" + body,
        )

    return d.addCallback(cbProduced)
def testSingleMultiPart(self):
outerBody = b""
innerBody = b"Contained body message text. Squarge."
headers = OrderedDict()
headers["from"] = "sender@host"
headers["to"] = "recipient@domain"
headers["subject"] = "booga booga boo"
headers["content-type"] = 'multipart/alternative; boundary="xyz"'
innerHeaders = OrderedDict()
innerHeaders["subject"] = "this is subject text"
innerHeaders["content-type"] = "text/plain"
msg = FakeyMessage(
headers,
(),
None,
outerBody,
123,
[FakeyMessage(innerHeaders, (), None, innerBody, None, None)],
)
c = BufferingConsumer()
p = imap4.MessageProducer(msg)
d = p.beginProducing(c)
def cbProduced(result):
self.failUnlessIdentical(result, p)
self.assertEqual(
b"".join(c.buffer),
b"{239}\r\n"
b"From: sender@host\r\n"
b"To: recipient@domain\r\n"
b"Subject: booga booga boo\r\n"
b'Content-Type: multipart/alternative; boundary="xyz"\r\n'
b"\r\n"
b"\r\n"
b"--xyz\r\n"
b"Subject: this is subject text\r\n"