-
Notifications
You must be signed in to change notification settings - Fork 3
/
weibo.py
347 lines (293 loc) · 14 KB
/
weibo.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
#!/usr/bin/env python
# -*- coding: utf-8 -*-
'''
file: weibo.py
author:darkbull(http://darkbull.net)
date: 2011-10-22
desc:
weibo api的python封装.(基于oauth1.0)
weibo 微博在线api文档参考:http://open.weibo.com/wiki/API文档
说明:
调用接口的参数名称,如果官方文档以":"开始,用"__"代替,例如::id 用 __id 代替
python版本要求:python2.6+,不支持python3.x
example:
api = OAuthApi('appkey', 'appsecret')
token = api.create_token()
url = token.get_auth_url()
import webbrowser
webbrowser.open(url)
code = raw_input(">>").strip()
token.set_verifier(code)
# api.statuses.update.post(token, status = u'测试数据3')
api.statuses.upload.post(token, status = u'好好学习,天天向上。', pic = '~/test.jpg')
'''
__version__ = '0.1b'
__author__ = 'darkbull(http://darkbull.net)'
import urllib
import binascii
import time
import random
import json
import httplib
import uuid
import mimetypes
import hmac
import hashlib
from os.path import getsize, isfile, basename
from urlparse import urlparse
hmac_sha1 = lambda key, val: binascii.b2a_base64(hmac.new(str(key), val, hashlib.sha1).digest())[:-1]
nonce = lambda: str(random.randint(1000000, 9999999))
tm = lambda: str(int(time.time()))
utf8 = lambda u: u.encode('utf-8')
urlencode = lambda p: urllib.quote_plus(p, safe = '~')
urldecode = lambda s: urllib.unquote(s)
class OAuthError(IOError):
pass
class WeiBoError(Exception):
pass
class DictObject(dict):
def __init__(self, d):
if isinstance(d, basestring):
d = json.loads(d)
dict.__init__(self, d)
def __getattr__(self, attr):
if attr in self:
ret = self[attr]
if type(ret) is dict:
return DictObject(ret)
elif type(ret) is list:
for idx, item in enumerate(ret):
if type(item) is dict:
ret[idx] = DictObject(item)
return ret
else:
raise AttributeError, attr
class OAuthToken(object):
def __init__(self, appkey, appsecret, oauth_token, oauth_token_secret, user_id = 0, original_data = '', callback = 'oob'):
"""
@param callback: 回调方式
"""
self.appkey = appkey # 该token对应的应用
self.appsecret = appsecret # 该token对应的应用的secret
# appsecret是可以重置的,重置之后,原有授权过的token将不能再使用。
self.oauth_token = oauth_token
self.oauth_token_secret = oauth_token_secret
self.user_id = user_id
self.original_data = original_data
self.callback = callback
@property
def verified(self):
"""Token是否已经授权过
"""
return True if self.user_id != 0 else False
def __str__(self):
return self.original_data
def get_auth_url(self, display = 'page'):
"""获取用户授权url
@param display: 参考http://open.weibo.com/wiki/Oauth/authorize
"""
_AUTHORIZE_URL = 'http://api.t.sina.com.cn/oauth/authorize?oauth_token={token}&display={display}'
url = _AUTHORIZE_URL.format(token = self.oauth_token, display = display)
if self.callback.lower().startswith('http'): # 参考 http://open.weibo.com/wiki/OAuth "小提示“: 建议始终加上oauth_callback参数
url += '&oauth_callback=' + urlencode(self.callback)
return url
_SIGNATURE_BASE_STRING2 = ('POST', 'http://api.t.sina.com.cn/oauth/access_token', 'oauth_consumer_key={app_key}&oauth_nonce={nonce}&oauth_signature_method=HMAC-SHA1&oauth_timestamp={timestamp}&oauth_token={token}&oauth_verifier={verifier}&oauth_version=1.0')
_ACCESS_TOKEN_URL = 'http://api.t.sina.com.cn/oauth/access_token?oauth_consumer_key={app_key}&oauth_nonce={nonce}&oauth_signature_method=HMAC-SHA1&oauth_timestamp={timestamp}&oauth_token={token}&oauth_verifier={verifier}&oauth_version=1.0&oauth_signature={signature}'
def set_verifier(self, verifier):
'''通过授权的request token换取access token
'''
if self.verified:
raise OAuthError, ('oauth error', 'token is already authorized.')
n, t = nonce(), tm()
http_method, uri, query = OAuthToken._SIGNATURE_BASE_STRING2
args = query.format(app_key = self.appkey, nonce = n, timestamp = t, token = self.oauth_token, verifier = verifier)
sig_base_str = '&'.join(urlencode(item) for item in (http_method, uri, args))
sig = hmac_sha1('%s&%s' % (self.appsecret, self.oauth_token_secret), sig_base_str) # 新浪的weibo签名验证只检查前x位
url = OAuthToken._ACCESS_TOKEN_URL.format(app_key = self.appkey, nonce = n, signature = urlencode(sig), timestamp = t, token = self.oauth_token, verifier = verifier)
try:
errcode, reason, html = _request(http_method, url)
# eg: html: oauth_token=a8caa50e0b6612778b3c849e27e398b2&oauth_token_secret=55729344a581f9acd24ad1551a7cc3cb&user_id=2617375872
except IOError as ex:
raise OAuthError(ex)
if errcode != 200:
raise OAuthError, (errcode, 'oauth error', reason)
self.oauth_token, self.oauth_token_secret, user_id = (item.split('=')[1] for item in html.split('&'))
self.user_id = int(user_id)
def to_header(self):
if not self.verified:
raise OAuthError, ('oauth error', 'unauthorized token.')
return {
'oauth_consumer_key': self.appkey,
'oauth_token': self.oauth_token,
'oauth_signature_method': 'HMAC-SHA1',
'oauth_timestamp': tm(),
'oauth_nonce': nonce(),
'oauth_version': '1.0',
}
def _request(http_method, url, query = None, timeout = 10, token = None):
'''向远程服务器发送一个http request
@param http_method: 请求方法
@param url: 网址
@param query: 提交的参数. dict: key: 表单域名称, value: 域值
@return: 元组(response status, reason, response html)
'''
scheme, netloc, path, params, args = urlparse(url)[:5]
if args:
path += '?' + args
conn = httplib.HTTPConnection(netloc, timeout = timeout) if scheme == 'http' else httplib.HTTPSConnection(netloc, timeout = timeout)
headers = {
'User-Agent': 'WeiBo-Python-Client; Created by darkbull(http://darkbull.net)',
'Host': netloc,
}
upload_pic = 'statuses/upload' in url
if token and query:
# 生成签名
# 如果有上传图片,只需签名"oauth_"开头的参数. Fuck, 在api文档里没有一点说明,浪费了我n多时间。fuck....
if upload_pic:
items = [(key, val) for key, val in query.items() if key.startswith('oauth_')]
else:
items = query.items()
items.sort()
t = '&'.join(('%s=%s' % (urlencode(key), urlencode(val)) for key, val in items))
sig_base_str = '%s&%s&%s' % (http_method, urlencode(url), urlencode(t))
sig = hmac_sha1('%s&%s' % (token.appsecret, token.oauth_token_secret), sig_base_str)
query['oauth_signature'] = sig
t = [ ]
auth_header = ['OAuth realm=""']
for key in query:
if key.startswith('oauth_'):
auth_header.append('%s="%s"' % (urlencode(key), urlencode(query[key])))
t.append(key)
for key in t:
del query[key]
headers['Authorization'] = ', '.join(auth_header)
if upload_pic: # 需要上传图片
assert http_method == 'POST'
assert 'pic' in query
pic_path = query.pop('pic')
if not isfile(pic_path):
raise WeiBoError(u'File "{0}" not exist' % pic_path)
if getsize(pic_path) > 1024 * 1024 * 5: # 新浪微博上传图片大小限制是5M.
raise WeiBoError('Size of file "{0}" must be less than 5M.' % pic_path)
boundary = '------' + str(uuid.uuid4())
body = [ ]
if query:
for field_name, val in query.items():
body.append('--' + boundary)
body.append('Content-Disposition: form-data; name="%s"' % field_name)
body.append('Content-Type: text/plain; charset=US-ASCII')
body.append('Content-Transfer-Encoding: 8bit')
body.append('')
if type(val) is unicode:
body.append(utf8(val))
else:
body.append(val)
mimetype = mimetypes.guess_type(pic_path)[0]
with open(pic_path, 'rb') as f:
data = f.read()
filename = basename(pic_path)
body.append('--' + boundary)
body.append('Content-Disposition: form-data; name="%s"; filename="%s"' % ('pic', filename))
if mimetype:
body.append('Content-Type: ' + mimetype)
body.append('Content-Transfer-Encoding: binary')
body.append('')
body.append(data)
body.append('--' + boundary + '--')
body.append('')
body = '\r\n'.join(body)
headers['Content-Type'] = 'multipart/form-data; boundary=' + boundary
headers['Content-Length'] = str(len(body))
headers['Connection'] = 'keep-alive'
else:
body = urllib.urlencode(query) if query else ''
if http_method == 'POST':
headers['Content-Type'] = 'application/x-www-form-urlencoded'
headers['Content-Length'] = str(len(body))
else:
if body:
if args:
path += '&' + body
else:
path += '?' + body
body = ''
conn.request(http_method, path, body = body, headers = headers)
resp = conn.getresponse()
result = (resp.status, resp.reason, resp.read())
conn.close()
return result
_URI_COMMON = 'http://api.t.sina.com.cn/'
def _call(http_method, uri, token, **kwargs):
if not uri.startswith('http'):
uri = _URI_COMMON + uri
if not uri.endswith('.json'):
uri = uri + '.json'
http_method = http_method.upper()
params = token.to_header()
for key, val in kwargs.items():
if key.startswith('__'): # 很恶心的参数,如::id, 这里用 __id代替
key = ':' + key[2:]
if type(key) is unicode:
key = utf8(key)
if type(val) is unicode:
val = utf8(val)
params[str(key)] = str(val)
try:
errcode, reason, html = _request(http_method, uri, params, token = token)
except IOError as ex:
raise WeiBoError(ex)
if errcode != 200:
try:
json_obj = json.loads(html)
raise WeiBoError(u'[error:%s occur when request "%s"]:%s' % (json_obj['error_code'], json_obj['request'], json_obj['error']))
except WeiBoError:
raise
except Exception:
raise WeiBoError('errcode: %d, reason: %s, html: %s' % (errcode, reason, html))
response = html.decode('utf-8') # utf-8 => unicode
json_obj = json.loads(response)
if type(json_obj) is dict and json_obj.get('error_code'):
# 错误具体信息查询: http://open.weibo.com/wiki/Error_code
raise WeiBoError(u'[error:%s occur when request "%s"]:%s' % (json_obj['error_code'], json_obj['request'], json_obj['error']))
return DictObject(json_obj)
class OAuthApi(object):
def __init__(self, appkey, appsecret):
self.appkey = appkey
self.appsecret = appsecret
self._attrs = [ ]
_SIGNATURE_BASE_STRING = ('GET', 'http://api.t.sina.com.cn/oauth/request_token', 'oauth_callback={callback}&oauth_consumer_key={app_key}&oauth_nonce={nonce}&oauth_signature_method=HMAC-SHA1&oauth_timestamp={timestamp}&oauth_version=1.0')
_REQUEST_TOKEN_URL = 'http://api.t.sina.com.cn/oauth/request_token?oauth_callback={callback}&oauth_consumer_key={app_key}&oauth_nonce={nonce}&oauth_signature={signature}&oauth_signature_method=HMAC-SHA1&oauth_timestamp={timestamp}&oauth_version=1.0'
def create_token(self, callback = 'oob'):
'''创建未授权的request token
@param callback: 回调地址. oob 表示使用PIN码(桌面应用)
'''
n, t, cbk = nonce(), tm(), urlencode(callback)
http_method, uri, query = OAuthApi._SIGNATURE_BASE_STRING
args = query.format(callback = cbk, app_key = self.appkey, nonce = n, timestamp = t)
sig_base_str = '&'.join(urlencode(item) for item in (http_method, uri, args))
sig = hmac_sha1(self.appsecret + '&', sig_base_str)
url = OAuthApi._REQUEST_TOKEN_URL.format(callback = cbk, app_key = self.appkey, nonce = n, signature = urlencode(sig), timestamp = t)
try:
errcode, reason, html = _request(http_method, url)
# eg: html: oauth_token=2f74a40e6419251641b4f28fa7aee417&oauth_token_secret=404dcbe420806af2acd4e1583035939d
except IOError as ex:
raise OAuthError(ex)
if errcode != 200:
raise OAuthError, (errcode, 'oauth error', reason)
token, secret = (item.split('=')[1] for item in html.split('&'))
token = OAuthToken(self.appkey, self.appsecret, token, secret, original_data = html, callback = callback)
return token
def __getattr__(self, attr):
self._attrs.append(attr)
return self
def __call__(self, token = None, **kwargs):
"""调用接口,如:api.statuses.public_timeline.get(token) # 以get方式提交请求
"""
http_method = self._attrs[-1]
api_uri = '/'.join(self._attrs[:-1])
self._attrs = [ ]
return _call(http_method, api_uri, token, **kwargs)
# 通过授权的token,可以直接通过 weibo.api.进行调用
api = OAuthApi('', '')
if __name__ == '__main__':
pass