-
Notifications
You must be signed in to change notification settings - Fork 14
/
umail.py
executable file
·107 lines (92 loc) · 3.66 KB
/
umail.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
# uMail (MicroMail) for MicroPython
# Copyright (c) 2018 Shawwwn <shawwwn1@gmail.com>
# License: MIT
import socket
# Timeout passed to socket.settimeout() before connecting to the server.
DEFAULT_TIMEOUT = 10 # sec
# Client identity sent as the argument of the EHLO command.
LOCAL_DOMAIN = '127.0.0.1'
# SMTP command verbs used when building command lines.
CMD_EHLO = 'EHLO'
CMD_STARTTLS = 'STARTTLS'
CMD_AUTH = 'AUTH'
CMD_MAIL = 'MAIL'
# Authentication mechanism names looked up in the server's AUTH capability line.
AUTH_PLAIN = 'PLAIN'
AUTH_LOGIN = 'LOGIN'
class SMTP:
    """Minimal SMTP client for MicroPython.

    Typical use: construct (connects, greets and optionally logs in), call
    ``to()`` to open a message, ``write()`` / ``send()`` to transmit the body,
    then ``quit()``.

    Protocol failures are reported with ``AssertionError`` (or ``Exception``
    when no supported auth mechanism is offered).
    """

    def _read_reply(self):
        """Read one complete SMTP reply from the socket.

        A reply may span several lines; every line but the last has a '-'
        directly after the three-digit code.

        Returns:
            (code, lines): the status code of the final line as an int and
            the list of decoded, stripped text lines.
        """
        sock = self._sock
        lines = []
        more = True
        while more:
            code = sock.read(3)
            # '-' marks a continuation line; any other separator ends the reply.
            more = sock.read(1) == b'-'
            lines.append(sock.readline().strip().decode())
        return int(code), lines

    @staticmethod
    def _b64(text):
        """Return the base64 encoding of ``text`` as a str without newline."""
        try:
            from ubinascii import b2a_base64
        except ImportError:
            from binascii import b2a_base64
        if isinstance(text, str):
            text = text.encode()
        # b2a_base64 appends a trailing newline; drop it.
        return b2a_base64(text)[:-1].decode()

    def cmd(self, cmd_str):
        """Send one command line and return its reply as (code, lines)."""
        self._sock.write('%s\r\n' % cmd_str)
        return self._read_reply()

    def __init__(self, host, port, ssl=False, username=None, password=None):
        """Connect to an SMTP server and perform the EHLO handshake.

        Args:
            host: server host name or address.
            port: server port.
            ssl: when true, wrap the connection in TLS immediately; when
                false, upgrade with STARTTLS if the server advertises it.
            username: account name; also the default sender for ``to()``.
            password: account password. Login is attempted only when both
                username and password are given.

        Raises:
            AssertionError: the server returned an unexpected status code.
        """
        # Import under another name: binding the module to `ssl` would
        # overwrite the `ssl` flag and make every connection look like TLS.
        try:
            import ssl as tls
        except ImportError:
            import ussl as tls
        self.username = username
        addr = socket.getaddrinfo(host, port)[0][-1]
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self._sock = sock
        try:
            sock.settimeout(DEFAULT_TIMEOUT)
            sock.connect(addr)
            if ssl:
                sock = tls.wrap_socket(sock)
                self._sock = sock
            # The greeting may be multi-line; consume all of it so later
            # replies stay aligned with their commands.
            code, resp = self._read_reply()
            assert code == 220, 'cant connect to server %d, %s' % (code, resp)
            code, resp = self.cmd(CMD_EHLO + ' ' + LOCAL_DOMAIN)
            assert code == 250, '%d' % code
            if not ssl and any(line.upper() == CMD_STARTTLS for line in resp):
                code, resp = self.cmd(CMD_STARTTLS)
                assert code == 220, 'start tls failed %d, %s' % (code, resp)
                self._sock = tls.wrap_socket(sock)
            if username and password:
                self.login(username, password)
        except Exception:
            # Do not leak the socket when the handshake fails.
            self._sock.close()
            raise

    def login(self, username, password):
        """Authenticate using AUTH PLAIN or AUTH LOGIN.

        Re-issues EHLO to obtain the server's capability list, then uses
        PLAIN if offered, otherwise LOGIN.

        Returns:
            (code, lines) of the final authentication reply.

        Raises:
            AssertionError: EHLO failed, no AUTH capability was advertised,
                or the credentials were rejected.
            Exception: none of the advertised mechanisms is supported.
        """
        self.username = username
        code, resp = self.cmd(CMD_EHLO + ' ' + LOCAL_DOMAIN)
        assert code == 250, '%d, %s' % (code, resp)

        auths = None
        for feature in resp:
            if feature[:4].upper() == CMD_AUTH:
                # Handles both "AUTH PLAIN LOGIN" and "AUTH=PLAIN LOGIN".
                auths = feature[4:].strip('=').upper().split()
        assert auths is not None, "no auth method"

        if AUTH_PLAIN in auths:
            cren = self._b64("\0%s\0%s" % (username, password))
            code, resp = self.cmd('%s %s %s' % (CMD_AUTH, AUTH_PLAIN, cren))
        elif AUTH_LOGIN in auths:
            code, resp = self.cmd("%s %s %s" % (CMD_AUTH, AUTH_LOGIN, self._b64(username)))
            assert code == 334, 'wrong username %d, %s' % (code, resp)
            code, resp = self.cmd(self._b64(password))
        else:
            raise Exception("auth(%s) not supported " % ', '.join(auths))

        # 503 is accepted alongside 235 (success), as in the original check.
        assert code == 235 or code == 503, 'auth error %d, %s' % (code, resp)
        return code, resp

    def to(self, addrs, mail_from=None):
        """Start a message: send MAIL FROM, RCPT TO for each address, DATA.

        Args:
            addrs: one recipient address (str) or a list of addresses.
            mail_from: envelope sender; defaults to the login username.

        Returns:
            (code, lines) of the reply to DATA.

        Raises:
            AssertionError: the sender was refused, every recipient was
                refused, or DATA was refused.
        """
        mail_from = self.username if mail_from is None else mail_from
        # No space is allowed after the colon in MAIL FROM: / RCPT TO:.
        code, resp = self.cmd('MAIL FROM:<%s>' % mail_from)
        assert code == 250, 'sender refused %d, %s' % (code, resp)

        if isinstance(addrs, str):
            addrs = [addrs]
        refused = 0
        for addr in addrs:
            code, resp = self.cmd('RCPT TO:<%s>' % addr)
            if code != 250 and code != 251:
                print('%s refused, %s' % (addr, resp))
                refused += 1
        # Continue as long as at least one recipient was accepted.
        assert refused != len(addrs), 'recipient refused, %d, %s' % (code, resp)

        code, resp = self.cmd('DATA')
        assert code == 354, 'data refused, %d, %s' % (code, resp)
        return code, resp

    def write(self, content):
        """Write raw message content (headers/body) to the server."""
        self._sock.write(content)

    def send(self, content=''):
        """Write optional final content and terminate the message.

        Returns:
            (code, text): status code and reply text; the lines of a
            multi-line reply are joined with single spaces.
        """
        if content:
            self.write(content)
        # "<CRLF>.<CRLF>" is the end-of-data marker.
        self._sock.write('\r\n.\r\n')
        code, lines = self._read_reply()
        return code, ' '.join(lines)

    def quit(self):
        """Send QUIT and close the socket, even if QUIT itself fails."""
        try:
            self.cmd("QUIT")
        finally:
            self._sock.close()