-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathtweetHook.py
65 lines (53 loc) · 2.36 KB
/
tweetHook.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
import requests,json, tweepy
from random import randint
import os
from boto.s3.connection import S3Connection
import time
# Twitter credentials are read from the process environment at import time;
# a missing variable raises KeyError before any network call is made.
(
    CONSUMER_KEY,
    CONSUMER_SECRET,
    ACCESS_TOKEN,
    ACCESS_TOKEN_SECRET,
) = (
    os.environ['consumer_key'],
    os.environ['consumer_secret'],
    os.environ['access_token'],
    os.environ['access_token_secret'],
)

# Module-level tweepy client shared by the functions below (used to send DMs).
auth = tweepy.OAuthHandler(CONSUMER_KEY, CONSUMER_SECRET)
auth.set_access_token(ACCESS_TOKEN, ACCESS_TOKEN_SECRET)
api = tweepy.API(auth)
def create_url_tweets():
    """Return the Twitter API v2 ``tweets/search/stream`` URL.

    The query string requests the extra ``tweet.fields`` listed below so
    that each streamed tweet carries them in its payload.
    """
    endpoint = "https://api.twitter.com/2/tweets/search/stream"
    requested_fields = (
        "in_reply_to_user_id",
        "author_id",
        "referenced_tweets",
        "conversation_id",
        "public_metrics",
        "created_at",
        "source",
    )
    return endpoint + "?tweet.fields=" + ",".join(requested_fields)
def create_headers(bearer_token):
    """Build the HTTP headers dict carrying ``bearer_token``.

    Args:
        bearer_token: Token placed after the ``Bearer `` prefix.

    Returns:
        A dict with a single ``Authorization`` entry.
    """
    return {"Authorization": f"Bearer {bearer_token}"}
def _parent_tweet_dm(payload):
    """Extract the DM recipient and parent-tweet link from one stream payload.

    Args:
        payload: Decoded JSON object for one streamed tweet.

    Returns:
        ``(author_id, link)`` where ``link`` points at the first referenced
        tweet, or ``None`` when the payload lacks the fields needed to build
        it (missing ``data``, ``referenced_tweets`` or ``in_reply_to_user_id``).
    """
    try:
        data = payload["data"]
        author_id = data["author_id"]
        parent_tweet_id = data["referenced_tweets"][0]["id"]
        parent_author = data["in_reply_to_user_id"]
        link = "https://twitter.com/" + parent_author + "/status/" + parent_tweet_id
    except (KeyError, IndexError, TypeError):
        # Payload is not shaped like a reply with a referenced tweet.
        return None
    return author_id, link


def connect_to_endpoint_tweet(url, headers):
    """Open the tweet stream and DM each tweet's author a link to its parent.

    For every non-empty line of the streamed response, the JSON payload is
    printed, and a direct message containing the URL of the first referenced
    tweet is sent to the tweet's author through the module-level ``api``.
    Payloads that are not valid JSON or lack the required fields are logged
    and skipped so that one bad line does not end the stream.

    Args:
        url: Streaming endpoint URL.
        headers: HTTP headers to send with the request.

    Returns:
        None. On a non-200 status the error is printed and the function
        returns after a 2 second pause; otherwise it returns when the stream
        ends or when processing fails with an (logged) exception.

    Raises:
        Whatever ``requests.request`` raises if the connection cannot be
        established. ``KeyboardInterrupt``/``SystemExit`` are no longer
        swallowed while streaming.
    """
    response = requests.request("GET", url, headers=headers, stream=True)
    try:
        print('twitter response: ', response.status_code)
        if response.status_code != 200:
            print(f"Request returned an error code: {response.status_code} and text: {response.text}")
            print(f"header: {response.headers}")
            time.sleep(2)
            return

        for response_line in response.iter_lines():
            if not response_line:
                # Empty lines carry no tweet (presumably keep-alive newlines).
                continue

            try:
                json_response = json.loads(response_line)
            except ValueError as e:
                print("ThreaderBot:Skipping undecodable stream line, {}".format(e))
                continue
            print(json.dumps(json_response, indent=4, sort_keys=True))

            target = _parent_tweet_dm(json_response)
            if target is None:
                print("ThreaderBot:Skipping payload without parent tweet information")
                continue
            authID, message = target

            try:
                dm = api.send_direct_message(recipient_id=authID, text=message)
                print(dm.message_create['message_data']['text'])
            # NOTE(review): tweepy 4.x renamed TweepError to TweepyException —
            # confirm against the tweepy version this project pins.
            except tweepy.TweepError as e:
                print("ThreaderBot:Error in sending '{}' as dm response, {}".format(message, e))
    except Exception as e:
        # Best-effort: the caller reconnects, so log the failure instead of
        # silently discarding it. Exception (not bare except) lets Ctrl-C through.
        print("ThreaderBot:Stream processing stopped, {!r}".format(e))
    finally:
        # Release the streaming connection; with stream=True it is otherwise
        # held open until the response is fully consumed or garbage collected.
        response.close()
def main():
    """Run the bot: connect to the tweet stream and reconnect forever.

    Reads the bearer token from the ``brToken`` environment variable, builds
    the stream URL and headers once, then repeatedly calls
    ``connect_to_endpoint_tweet``. Each time that call returns (stream ended
    or failed), the loop waits briefly before reconnecting so the endpoint is
    not hit in a tight loop.

    Raises:
        KeyError: If ``brToken`` is not set in the environment.
    """
    bearer_token = os.environ['brToken']
    url = create_url_tweets()
    headers = create_headers(bearer_token)
    reconnect_delay_seconds = 1
    while True:
        connect_to_endpoint_tweet(url, headers)
        # Pause between connection attempts to avoid hammering the API.
        time.sleep(reconnect_delay_seconds)


if __name__ == "__main__":
    main()