Skip to content
This repository has been archived by the owner on Mar 23, 2023. It is now read-only.

Commit

Permalink
Add mimetools module (#326)
Browse files Browse the repository at this point in the history
  • Loading branch information
trotterdylan authored Jun 10, 2017
1 parent 64014f0 commit 16b7b4a
Show file tree
Hide file tree
Showing 3 changed files with 306 additions and 0 deletions.
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ STDLIB_TESTS := \
test/test_dict \
test/test_list \
test/test_md5 \
test/test_mimetools \
test/test_operator \
test/test_quopri \
test/test_rfc822 \
Expand Down
250 changes: 250 additions & 0 deletions third_party/stdlib/mimetools.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,250 @@
"""Various tools used by MIME-reading or MIME-writing programs."""


import os
import sys
import tempfile
from warnings import filterwarnings, catch_warnings
with catch_warnings():
if sys.py3kwarning:
filterwarnings("ignore", ".*rfc822 has been removed", DeprecationWarning)
import rfc822

from warnings import warnpy3k
warnpy3k("in 3.x, mimetools has been removed in favor of the email package",
stacklevel=2)

__all__ = ["Message","choose_boundary","encode","decode","copyliteral",
"copybinary"]

class Message(rfc822.Message):
"""A derived class of rfc822.Message that knows about MIME headers and
contains some hooks for decoding encoded and multipart messages."""

def __init__(self, fp, seekable = 1):
rfc822.Message.__init__(self, fp, seekable)
self.encodingheader = \
self.getheader('content-transfer-encoding')
self.typeheader = \
self.getheader('content-type')
self.parsetype()
self.parseplist()

def parsetype(self):
str = self.typeheader
if str is None:
str = 'text/plain'
if ';' in str:
i = str.index(';')
self.plisttext = str[i:]
str = str[:i]
else:
self.plisttext = ''
fields = str.split('/')
for i in range(len(fields)):
fields[i] = fields[i].strip().lower()
self.type = '/'.join(fields)
self.maintype = fields[0]
self.subtype = '/'.join(fields[1:])

def parseplist(self):
str = self.plisttext
self.plist = []
while str[:1] == ';':
str = str[1:]
if ';' in str:
# XXX Should parse quotes!
end = str.index(';')
else:
end = len(str)
f = str[:end]
if '=' in f:
i = f.index('=')
f = f[:i].strip().lower() + \
'=' + f[i+1:].strip()
self.plist.append(f.strip())
str = str[end:]

def getplist(self):
return self.plist

def getparam(self, name):
name = name.lower() + '='
n = len(name)
for p in self.plist:
if p[:n] == name:
return rfc822.unquote(p[n:])
return None

def getparamnames(self):
result = []
for p in self.plist:
i = p.find('=')
if i >= 0:
result.append(p[:i].lower())
return result

def getencoding(self):
if self.encodingheader is None:
return '7bit'
return self.encodingheader.lower()

def gettype(self):
return self.type

def getmaintype(self):
return self.maintype

def getsubtype(self):
return self.subtype




# Utility functions
# -----------------

#try:
import thread
#except ImportError:
# import dummy_thread as thread
_counter_lock = thread.allocate_lock()
del thread

_counter = 0
def _get_next_counter():
global _counter
_counter_lock.acquire()
_counter += 1
result = _counter
_counter_lock.release()
return result

_prefix = None

#def choose_boundary():
# """Return a string usable as a multipart boundary.
#
# The string chosen is unique within a single program run, and
# incorporates the user id (if available), process id (if available),
# and current time. So it's very unlikely the returned string appears
# in message text, but there's no guarantee.
#
# The boundary contains dots so you have to quote it in the header."""
#
# global _prefix
# import time
# if _prefix is None:
# import socket
# try:
# hostid = socket.gethostbyname(socket.gethostname())
# except socket.gaierror:
# hostid = '127.0.0.1'
# try:
# uid = repr(os.getuid())
# except AttributeError:
# uid = '1'
# try:
# pid = repr(os.getpid())
# except AttributeError:
# pid = '1'
# _prefix = hostid + '.' + uid + '.' + pid
# return "%s.%.3f.%d" % (_prefix, time.time(), _get_next_counter())


# Subroutines for decoding some common content-transfer-types

def decode(input, output, encoding):
"""Decode common content-transfer-encodings (base64, quopri, uuencode)."""
if encoding == 'base64':
import base64
return base64.decode(input, output)
if encoding == 'quoted-printable':
import quopri
return quopri.decode(input, output)
if encoding in ('uuencode', 'x-uuencode', 'uue', 'x-uue'):
import uu
return uu.decode(input, output)
if encoding in ('7bit', '8bit'):
return output.write(input.read())
if encoding in decodetab:
pipethrough(input, decodetab[encoding], output)
else:
raise ValueError, \
'unknown Content-Transfer-Encoding: %s' % encoding

def encode(input, output, encoding):
"""Encode common content-transfer-encodings (base64, quopri, uuencode)."""
if encoding == 'base64':
import base64
return base64.encode(input, output)
if encoding == 'quoted-printable':
import quopri
return quopri.encode(input, output, 0)
if encoding in ('uuencode', 'x-uuencode', 'uue', 'x-uue'):
import uu
return uu.encode(input, output)
if encoding in ('7bit', '8bit'):
return output.write(input.read())
if encoding in encodetab:
pipethrough(input, encodetab[encoding], output)
else:
raise ValueError, \
'unknown Content-Transfer-Encoding: %s' % encoding

# The following is no longer used for standard encodings

# XXX This requires that uudecode and mmencode are in $PATH

uudecode_pipe = '''(
TEMP=/tmp/@uu.$$
sed "s%^begin [0-7][0-7]* .*%begin 600 $TEMP%" | uudecode
cat $TEMP
rm $TEMP
)'''

decodetab = {
'uuencode': uudecode_pipe,
'x-uuencode': uudecode_pipe,
'uue': uudecode_pipe,
'x-uue': uudecode_pipe,
'quoted-printable': 'mmencode -u -q',
'base64': 'mmencode -u -b',
}

encodetab = {
'x-uuencode': 'uuencode tempfile',
'uuencode': 'uuencode tempfile',
'x-uue': 'uuencode tempfile',
'uue': 'uuencode tempfile',
'quoted-printable': 'mmencode -q',
'base64': 'mmencode -b',
}

def pipeto(input, command):
pipe = os.popen(command, 'w')
copyliteral(input, pipe)
pipe.close()

def pipethrough(input, command, output):
(fd, tempname) = tempfile.mkstemp()
temp = os.fdopen(fd, 'w')
copyliteral(input, temp)
temp.close()
pipe = os.popen(command + ' <' + tempname, 'r')
copybinary(pipe, output)
pipe.close()
os.unlink(tempname)

def copyliteral(input, output):
while 1:
line = input.readline()
if not line: break
output.write(line)

def copybinary(input, output):
BUFSIZE = 8192
while 1:
line = input.read(BUFSIZE)
if not line: break
output.write(line)
55 changes: 55 additions & 0 deletions third_party/stdlib/test/test_mimetools.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
import unittest
from test import test_support

import string
import StringIO

#mimetools = test_support.import_module("mimetools", deprecated=True)
import mimetools

msgtext1 = mimetools.Message(StringIO.StringIO(
"""Content-Type: text/plain; charset=iso-8859-1; format=flowed
Content-Transfer-Encoding: 8bit
Foo!
"""))

class MimeToolsTest(unittest.TestCase):

def test_decodeencode(self):
start = string.ascii_letters + "=" + string.digits + "\n"
for enc in ['7bit','8bit','base64','quoted-printable',
'uuencode', 'x-uuencode', 'uue', 'x-uue']:
i = StringIO.StringIO(start)
o = StringIO.StringIO()
mimetools.encode(i, o, enc)
i = StringIO.StringIO(o.getvalue())
o = StringIO.StringIO()
mimetools.decode(i, o, enc)
self.assertEqual(o.getvalue(), start)

@unittest.expectedFailure
def test_boundary(self):
s = set([""])
for i in xrange(100):
nb = mimetools.choose_boundary()
self.assertNotIn(nb, s)
s.add(nb)

def test_message(self):
msg = mimetools.Message(StringIO.StringIO(msgtext1))
self.assertEqual(msg.gettype(), "text/plain")
self.assertEqual(msg.getmaintype(), "text")
self.assertEqual(msg.getsubtype(), "plain")
self.assertEqual(msg.getplist(), ["charset=iso-8859-1", "format=flowed"])
self.assertEqual(msg.getparamnames(), ["charset", "format"])
self.assertEqual(msg.getparam("charset"), "iso-8859-1")
self.assertEqual(msg.getparam("format"), "flowed")
self.assertEqual(msg.getparam("spam"), None)
self.assertEqual(msg.getencoding(), "8bit")

def test_main():
test_support.run_unittest(MimeToolsTest)

if __name__=="__main__":
test_main()

0 comments on commit 16b7b4a

Please sign in to comment.