-
Notifications
You must be signed in to change notification settings - Fork 1
/
getbookmarks.py
executable file
·209 lines (164 loc) · 7.05 KB
/
getbookmarks.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
import base64
import json
import random
import sys
import json
from urllib.parse import quote
import psycopg2
import requests
import configparser
def initialize_database():
    """Connect to PostgreSQL and make sure the schema exists.

    Connection settings (``host``, ``dbname``, ``user``, ``password``) are
    read from the ``[postgresql]`` section of ``db_config.ini``.  The
    ``tweets``, ``categories`` and ``tweet_categories`` tables are created if
    they do not exist yet, and the DDL is committed before returning.

    Returns:
        The open psycopg2 connection.  The caller is responsible for
        closing it.

    Raises:
        FileNotFoundError: If ``db_config.ini`` could not be read.
        KeyError: If the ``[postgresql]`` section or one of its keys is
            missing from the configuration file.
    """
    # ConfigParser.read() silently skips files it cannot open and returns the
    # list of files it actually parsed; check it so a missing file fails with
    # a clear message instead of an opaque KeyError('postgresql') below.
    config = configparser.ConfigParser()
    if not config.read('db_config.ini'):
        raise FileNotFoundError("could not read configuration file 'db_config.ini'")
    settings = config['postgresql']

    # Connect to the PostgreSQL server
    conn = psycopg2.connect(
        host=settings['host'],
        dbname=settings['dbname'],
        user=settings['user'],
        password=settings['password'],
    )
    try:
        # The cursor context manager closes the cursor on exit; it does not
        # commit, so the explicit commit below is still required.
        with conn.cursor() as c:
            # Create tables if they don't yet exist.  The statement text is
            # kept flush-left so the SQL sent to the server is unchanged.
            c.execute('''
CREATE TABLE IF NOT EXISTS tweets (
rest_id VARCHAR(20) PRIMARY KEY,
sort_index VARCHAR(20) NOT NULL,
screen_name TEXT NOT NULL,
created_at TIMESTAMP NOT NULL,
fetched_at TIMESTAMP NOT NULL DEFAULT NOW(),
full_text TEXT NOT NULL,
bookmarked BOOLEAN NOT NULL DEFAULT False,
liked BOOLEAN NOT NULL DEFAULT False,
important BOOLEAN NOT NULL DEFAULT False,
archived BOOLEAN NOT NULL DEFAULT False,
source_json JSONB NOT NULL
)
''')
            c.execute('''
CREATE TABLE IF NOT EXISTS categories (
id SERIAL PRIMARY KEY,
name TEXT NOT NULL UNIQUE
)
''')
            c.execute('''
CREATE TABLE IF NOT EXISTS tweet_categories (
tweet_id VARCHAR(20),
category_id INTEGER,
PRIMARY KEY (tweet_id, category_id),
FOREIGN KEY (tweet_id) REFERENCES tweets(rest_id),
FOREIGN KEY (category_id) REFERENCES categories(id)
)
''')
        conn.commit()
    except Exception:
        # Do not leak the connection when schema creation fails.
        conn.close()
        raise
    print("database initialized")
    return conn
def save_tweets_to_database(conn, items: list[tuple[str, str, str, str, str, str, str, str]]):
    """Upsert parsed tweets, stopping at the first one that is already stored.

    Items are processed in order.  For each one, the ``tweets`` table is
    checked for a row with the same ``rest_id`` whose ``bookmarked`` and
    ``liked`` flags differ and whose ``source_json`` is set.  If such a row
    exists, processing stops.  Otherwise the item is inserted (or, on a
    ``rest_id`` conflict, its ``bookmarked``, ``liked`` and ``source_json``
    columns are updated) and committed immediately.

    Args:
        conn: An open DB-API connection (psycopg2 in this script).
        items: Rows in the column order ``(rest_id, sort_index, screen_name,
            created_at, bookmarked, liked, full_text, source_json)``.

    Returns:
        ``True`` if ``items`` is empty or an already-stored record was
        reached (the caller treats this as "up to date"); ``False`` if every
        item was written.

    Raises:
        ValueError: If an item's ``rest_id`` is not an integer string.
    """
    # Check if we have data
    if not items:
        return True

    with conn.cursor() as c:
        for item in items:
            # int() validates the id and normalises it for the lookup below.
            rest_id = int(item[0])
            # Placeholders must never be quoted inside the SQL text: the
            # driver does the quoting.  rest_id is a VARCHAR column, so the
            # value is bound as a string.
            c.execute(
                "SELECT COUNT(rest_id) FROM tweets WHERE rest_id = %s AND ((bookmarked != liked) AND source_json IS NOT NULL)",
                (str(rest_id),))
            row = c.fetchone()
            if row[0] > 0:
                print(f"found existing bookmarked record for rest_id {rest_id}")
                return True
            # New data (or a row that did not match the check above): upsert it
            c.execute(
                "INSERT INTO tweets (rest_id, sort_index, screen_name, created_at, bookmarked, liked, full_text, source_json) VALUES (%s, %s, %s, %s, %s, %s, %s, %s) ON CONFLICT (rest_id) DO UPDATE SET bookmarked = EXCLUDED.bookmarked, liked = EXCLUDED.liked, source_json = EXCLUDED.source_json",
                item)
            conn.commit()
            print(f"added rest_id {rest_id}")
    return False
def fetch_data(url, headers, timeout=30):
    """GET ``url`` with the given request headers and return the parsed JSON body.

    Args:
        url: The URL to request.
        headers: Mapping of HTTP request headers to send.
        timeout: Seconds to wait for the server before giving up.  Without a
            timeout ``requests`` can block indefinitely on a stalled
            connection.

    Returns:
        The response body decoded from JSON.

    Raises:
        requests.HTTPError: If the server answers with a 4xx/5xx status.
        requests.Timeout: If the server does not respond within ``timeout``.
    """
    response = requests.get(url, headers=headers, timeout=timeout)
    # Fail here with the HTTP status rather than later with a confusing
    # KeyError when an error payload lacks the expected timeline keys.
    response.raise_for_status()
    return response.json()
def parse_entries(entries) -> tuple[list[tuple[str, str, str, str, str, str, str, str]], str | None]:
parsed_data = []
next_cursor = None
for entry in entries:
if 'content' in entry and 'itemContent' in entry['content']:
result = entry['content']['itemContent']['tweet_results']['result']
if result['__typename'] == 'TweetWithVisibilityResults':
result = result['tweet']
rest_id = result['rest_id']
sort_index = entry['sortIndex']
screen_name = result['core']['user_results']['result']['legacy']['screen_name']
created_at = result['legacy']['created_at']
full_text = (
result['note_tweet']['note_tweet_results']['result']['text'] if 'note_tweet' in result
else result['legacy']['full_text']
)
bookmarked = result['legacy']['bookmarked']
liked = result['legacy']['favorited']
entry_json = json.dumps(entry)
parsed_data.append((rest_id, sort_index, screen_name, created_at, bookmarked, liked, full_text, entry_json))
elif 'content' in entry and 'cursorType' in entry['content'] and entry['content']['cursorType'] == "Bottom":
next_cursor = entry['content']['value']
return parsed_data, next_cursor
def construct_next_url(initial_url, cursor):
    """Return ``initial_url`` with a URL-encoded ``cursor`` field inserted.

    The encoded text ``cursor":"<cursor>==","`` is placed in front of every
    occurrence of ``includePromotedContent`` in the URL.  ``cursor`` is used
    as given and is followed by an encoded ``==`` (``%3D%3D``).
    """
    anchor = 'includePromotedContent'
    cursor_field = 'cursor%22%3A%22' + cursor + '%3D%3D%22%2C%22'
    return initial_url.replace(anchor, cursor_field + anchor)
def random_transaction_id() -> str:
    """Return a fresh random transaction id string.

    Seventy characters are drawn (with replacement) from the ASCII letters
    and digits, and the result is URL-safe base64 encoded.
    """
    alphabet = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789'
    picked = random.choices(alphabet, k=70)
    raw = ''.join(picked).encode()
    encoded = base64.urlsafe_b64encode(raw)
    return encoded.decode()
def extract_likes(json_data):
    """Return the ``entries`` of the first instruction in a likes response.

    The entries are read from
    ``data.user.result.timeline_v2.timeline.instructions[0].entries``.
    """
    timeline = json_data['data']['user']['result']['timeline_v2']['timeline']
    first_instruction = timeline['instructions'][0]
    return first_instruction['entries']
def extract_bookmarks(json_data):
    """Return the ``entries`` of the first instruction in a bookmarks response.

    The entries are read from
    ``data.bookmark_timeline_v2.timeline.instructions[0].entries``.
    """
    timeline = json_data['data']['bookmark_timeline_v2']['timeline']
    first_instruction = timeline['instructions'][0]
    return first_instruction['entries']
def _parse_fetch_command(fetch_command):
    """Split a ``fetch("<url>", {...})`` command into its URL and options dict.

    Surrounding whitespace and a trailing ``;`` are tolerated, so both
    ``fetch(...)`` and ``fetch(...);`` are accepted.

    Raises:
        ValueError: If the text is not a ``fetch(<url>, <options>)`` call or
            the options are not valid JSON.
    """
    call = fetch_command.strip().rstrip(';').rstrip()
    prefix = 'fetch('
    if not (call.startswith(prefix) and call.endswith(')')):
        raise ValueError("fetch command must look like fetch(\"<url>\", {...})")
    # Strip off the outer fetch( ... ) and split into URL and options
    inner = call[len(prefix):-1]
    url, separator, options = inner.partition(',')
    if not separator:
        raise ValueError("fetch command is missing its options argument")
    return url.strip().strip('"'), json.loads(options.strip())


def fetch_until_done(fetch_command, extractor):
    """Page through a timeline and store every tweet until caught up.

    The request described by ``fetch_command`` is issued repeatedly, each
    time with a fresh ``x-client-transaction-id`` header.  Each page is
    parsed and written to the database through the module-level
    ``connection``.  Paging stops when ``save_tweets_to_database`` reports
    that the stored data is up to date, or when a page carries no bottom
    cursor.

    Args:
        fetch_command: Text of a ``fetch("<url>", {...})`` call whose options
            are JSON containing a ``headers`` object.
        extractor: Callable that takes the decoded JSON response and returns
            the list of timeline entries.

    Raises:
        ValueError: If ``fetch_command`` cannot be parsed.
    """
    original_url, options = _parse_fetch_command(fetch_command)
    request_headers = options['headers']
    next_url = original_url

    while True:
        # Update the transaction id for every request
        request_headers['x-client-transaction-id'] = random_transaction_id()

        # Fetch the data
        json_data = fetch_data(next_url, request_headers)
        entries = extractor(json_data)
        print(f"got {len(entries)} entries")

        # Parse the data
        data, next_cursor = parse_entries(entries)

        # Save to database
        up_to_date = save_tweets_to_database(connection, data)
        if up_to_date:
            print("up to date")
            break
        if not next_cursor:
            print("end of list reached")
            break

        # Construct the next URL.  The '=' characters are removed here because
        # construct_next_url() appends an encoded '==' after the cursor.
        print(f"next cursor is {next_cursor}")
        next_cursor_encoded = quote(next_cursor.replace('=', ''))
        next_url = construct_next_url(original_url, next_cursor_encoded)
if __name__ == '__main__':
    # Script entry point: every command-line argument is one fetch(...)
    # command, dispatched by whether its text mentions Bookmarks or Likes.
    if len(sys.argv) < 2:
        print("Usage: python3 getbookmarks.py 'fetch(...)' [, 'fetch(...)', ...]")
        sys.exit(1)

    # Initialize the database.  `connection` is deliberately a module-level
    # name: fetch_until_done() reads it as a global when saving tweets.
    connection = initialize_database()
    try:
        for arg in sys.argv[1:]:
            if 'Bookmarks' in arg:
                print("fetching Bookmarks")
                fetch_until_done(arg, extract_bookmarks)
                print("Bookmarks done")
            elif 'Likes' in arg:
                print("fetching Likes")
                fetch_until_done(arg, extract_likes)
                print("Likes done")
            else:
                print(f"unexpected fetch command: {arg}")
    finally:
        # Close the connection even when one of the fetches raises.
        connection.close()