-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathemlv
executable file
·307 lines (254 loc) · 10.5 KB
/
emlv
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
#!/usr/bin/env python2
# -*- coding: utf-8 -*-
import os
import sys
import traceback
import locale
import gettext
from gemlv.sysutils import warnx
try:
import better_exchook
except ImportError:
pass
else:
better_exchook.install()
import email
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.mime.base import MIMEBase
from email.mime.message import MIMEMessage
from gemlv.contenttypestring import ContentTypeString
import gemlv.email
import re
from tempfile import mkstemp
from tempfile import mkdtemp
from mimetypes import guess_extension
import magic
import argparse
import time
try:
import setproctitle
except ImportError:
warnx("No module named setproctitle. Degrading.")
class setproctitle(object):
@classmethod
def setproctitle(self, *x, **kv):
pass
import xdg.BaseDirectory
from gemlv.utils import AddressLine
from gemlv.utils import getaddresslines
from gemlv.utils import decode_mimetext
from gemlv.utils import decode_readably
from gemlv.mimetext import MimeDecoded
from gemlv.mimetext import MimeEncoded
import gemlv.addressbook
from gemlv.constants import *
import gemlv.mime as mime
from gemlv.textutils import human_size
from gemlv.textutils import wildcardmatch
from gemlv.pythonutils import SubstractableList
def output_write(s):
if isinstance(s, unicode): s = s.encode('utf-8')
output.write(str(s))
def td_esc_cb(m):
return repr(m.group(0)).replace('\'', '')
def td_esc(s):
return re.sub(r'[\t\n\r]', td_esc_cb, s)
def print_mime_part_td_header():
output_write("INDEX MIME_TYPE SIZE FILENAME DESCRIPTION\n")
def list_mime_parts(eml, input_file_index):
for path, eml in walk_mime_parts(eml, [input_file_index]):
size = human_size(eml.size_decoded_approx)
output_write('\t'.join(['.'.join(map(str, path)), eml.content_type, size, td_esc(eml.filename), td_esc(eml.header['Content-Description'].decoded)])+'\n')
def display_headers(eml, extra_headers=[]):
headers = CommonUserInterestedHeaders
headers.extend(SubstractableList(extra_headers) - CommonUserInterestedHeaders)
for hname_patt in headers:
for header in eml.headers.items:
if wildcardmatch(header.raw_name, hname_patt):
hval = header.value.decoded.strip().replace('\n', '\n\t')
output_write(header.raw_name+': '+hval+'\n')
class DecodeIfTTY(object):
def __init__(self, output, decoder):
self.output = output
self.decoder = decoder
def decode(self, data):
if self.output.isatty():
return self.decoder(data)
else:
return data
def display(eml):
if eml.is_multipart():
if eml.content_type == MIMETYPE_EMAIL:
output_write(eml.payload_encoded)
else:
output_write(eml.as_string())
else:
decoder = DecodeIfTTY(output, lambda data: decode_readably(data, eml))
output_write(decoder.decode(eml.payload_decoded))
def file_or_stdio(filepath, openmode, stdio):
if filepath == '-':
return stdio
else:
return open(filepath, openmode)
def load_email(filepath):
return gemlv.email.Email(email.message_from_file(file_or_stdio(filepath, 'r', sys.stdin)))
def walk_mime_parts(eml, multipart_path=[]):
yield (multipart_path, eml)
for part_idx, part in enumerate(eml.parts):
for x in walk_mime_parts(part, multipart_path+[part_idx]):
yield x
def find_first_part(multipart, content_type=None):
for multipart_path, sub_part in walk_mime_parts(multipart):
if sub_part.content_type == content_type:
return multipart_path
return None
def safe_filename(s):
# replace directory separator to unicode U+2044 FRACTION SLASH
s = s.replace(os.path.sep, u'⁄')
return s
default_filter_prefix = '~/.local/bin/emlv-filter/'
argparser = argparse.ArgumentParser(formatter_class=argparse.RawDescriptionHelpFormatter)
argparser.description = """Display RFC-822 email file(s) headers, content, and attachments, list MIME parts, extract attachments from it.
Without --list or --extract option, i.e. read-mode, display the first text/plain part as content (or text/html if no plain text part found).
It searches executables named <MAIN> and <MAIN>_<SUB> in """+default_filter_prefix+""" to filter content with MIME type <MAIN>/<SUB> through them.
You can disable this by "--body_filter=-" parameter."""
argp_mxg1 = argparser.add_mutually_exclusive_group()
argp_mxg1.add_argument('--list', '-l', action='store_true', help="List all MIME parts in email files given in FILE.")
argp_mxg1.add_argument('--extract', '-x', action='append', metavar='INDEX', help="Extract MIME part specified by INDEX to stdout. See --list to get the indices of each part of a MIME-multipart file. This option may be repeated.")
argp_mxg1.add_argument('--save', '-s', action='append', metavar='INDEX', help="Save MIME part specified by INDEX to file. The output file name is either the attachment's name if it's given, or \"attached-email-INDEX.eml\" if the attachment is itself an Email, or \"attachment-INDEX.dat\" otherwise. This option may be repeated.")
argparser.add_argument('--plaintext', action='store_true', help="Always take the first text/plain part as content. Default is to fall back to text/html if no text/plain found.")
argparser.add_argument('--html', action='store_true', help="Take the first text/html part as content, not text/plain. Option --body-filter is also recommended here.")
argparser.add_argument('--header', '-H', action='append', default=[], metavar='HEADER', help="Extra headers to show in read-mode besides "+', '.join(CommonUserInterestedHeaders)+". Wildcards are supported.")
argparser.add_argument('--header-filter', '-fh', metavar='COMMAND', help="Filter the headers through this command.")
argparser.add_argument('--body-filter', '-fb', metavar='COMMAND', help=("Filter the message body through this command."))
argparser.add_argument('--attachments-filter', '-fa', metavar='COMMAND', help="Filter the table of attachments through this command (in read-mode).")
argparser.add_argument('FILE', nargs='*', help="Process the given FILEs as raw RFC-822 emails. Read STDIN if FILE is not given.")
option = argparser.parse_args(args=sys.argv[1:])
if option.header_filter == '-': option.header_filter = ''
if option.body_filter == '-': option.body_filter = ''
if option.attachments_filter == '-': option.attachments_filter = ''
if option.save and option.body_filter:
raise Exception("Filter options can not be combined with --save.")
if not option.list and not option.extract and not option.save:
# "read primary part" mode
read_mode = True
else:
read_mode = False
if option.header_filter or option.attachments_filter:
raise Exception("Filter options (header, attachments) are meaningless here.")
primary_content_type = MIMETYPE_TEXT
fallback_to_html = True
if option.html:
primary_content_type = MIMETYPE_HTML
if option.plaintext:
fallback_to_html = False
if not option.FILE:
option.FILE.append('/dev/stdin')
if read_mode:
option.extract = []
for input_file_index, filename in enumerate(option.FILE):
option.extract.append('%d.primary' % input_file_index)
output = sys.stdout
if option.list:
print_mime_part_td_header()
for idx, filename in enumerate(option.FILE):
eml = load_email(filename)
list_mime_parts(eml, idx)
else:
filter_status_codes = []
if option.save:
option.extract = option.save
for idx in option.extract:
indices = idx.split('.')
input_file_index = int(indices[0])
eml = load_email(option.FILE[input_file_index])
part = eml
mime_part_indices = []
mime_part_to_display_is_found = False
if len(indices) == 2 and indices[1] == 'primary':
mime_part_indices = find_first_part(part, content_type = primary_content_type)
if mime_part_indices is not None:
mime_part_to_display_is_found = True
else:
if fallback_to_html and primary_content_type != MIMETYPE_HTML:
mime_part_indices = find_first_part(part, content_type = MIMETYPE_HTML)
if mime_part_indices is not None:
mime_part_to_display_is_found = True
else:
mime_part_indices = indices[1:]
mime_part_to_display_is_found = True
if read_mode:
if option.header_filter:
output.flush()
output = os.popen(option.header_filter, 'w')
display_headers(eml, extra_headers=option.header)
if output != sys.stdout:
output.flush()
filter_status = output.close()
filter_status_codes.append(filter_status)
output = sys.stdout
output_write("\n")
output.flush()
if read_mode and not mime_part_to_display_is_found:
sys.stderr.write("Not found any MIME part to display.\n")
else:
for part_idx in map(int, mime_part_indices):
part = part.parts[part_idx]
if option.save:
attachment_filename = part.filename
output_filename = safe_filename(attachment_filename)
if not output_filename:
if part.content_type == MIMETYPE_EMAIL:
output_filename = 'attached-email-'+idx.replace('.', '-')+'.eml'
else:
output_filename = 'attachment-'+idx.replace('.', '-')+'.dat'
output = file_or_stdio(output_filename, 'w', sys.stdout)
if part.is_multipart():
if part.content_type == MIMETYPE_EMAIL:
output.write(part.payload_encoded)
else:
output.write(part.as_string())
else:
output.write(part.payload_decoded)
output.flush()
output.close()
sys.stderr.write("emlv: saved: "+output_filename+"\n")
output = sys.stdout
else:
if option.body_filter:
output = os.popen(option.body_filter, 'w')
elif read_mode and option.body_filter != '':
for exename in part.content_type.main, part.content_type.replace('/', '_'):
exepath = os.path.expanduser(os.path.join(default_filter_prefix, exename))
if os.path.exists(exepath):
os.environ['CONTENT_TYPE_CHARSET'] = str(part.header[HDR_CT].param['charset'].decoded)
output = os.popen(exepath, 'w')
break
display(part)
if output != sys.stdout:
output.flush()
filter_status = output.close()
filter_status_codes.append(filter_status)
output = sys.stdout
if read_mode:
output_write('\n') # in case of body did not end with newline
output_write('\n') # separate content from table of attachments
if option.attachments_filter:
output.flush()
output = os.popen(option.attachments_filter, 'w')
print_mime_part_td_header()
list_mime_parts(eml, input_file_index)
if output != sys.stdout:
output.flush()
filter_status = output.close()
filter_status_codes.append(filter_status)
output = sys.stdout
output.flush()
filter_status_codes = [code for code in filter_status_codes if code is not None]
if filter_status_codes:
highest_status_code = max(filter_status_codes)
if os.WIFSIGNALED(highest_status_code):
sys.exit(128 + os.WTERMSIG(highest_status_code))
else:
sys.exit(os.WEXITSTATUS(highest_status_code))