-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathjson_to_csv.py
97 lines (92 loc) · 3.99 KB
/
json_to_csv.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
import csv
import datetime
import sys
import json
from collections import OrderedDict
###########################################################
# json_to_csv.py
#
# Purpose:
# convert line-delimited tweet JSON (read from stdin) to CSV rows
# (written to stdout), keeping only a reduced set of columns
#
# Example:
# cat coronavirus_us.json | python json_to_csv.py >> coronavirus_us.csv
#
# Author:
# Qiushi Bai (baiqiushi@gmail.com)
###########################################################
# Output layout of the CSV: each entry maps a column name to the SQL-style
# type name used to decide how its value is normalised before writing.
# The insertion order below is the column order of every emitted row.
schema = OrderedDict([
    # attributes of the tweet itself
    ('id', 'bigint'),
    ('created_at', 'datetime'),
    ('text', 'varchar(500)'),
    ('in_reply_to_status_id', 'bigint'),
    ('in_reply_to_user_id', 'bigint'),
    ('favorite_count', 'int'),
    ('retweet_count', 'int'),
    ('lang', 'varchar(10)'),
    ('retweeted', 'boolean'),
    ('hashtags', 'varchar(500)'),
    ('user_mentions', 'varchar(500)'),
    # attributes of the authoring user (prefixed with "user_")
    ('user_id', 'bigint'),
    ('user_name', 'varchar(500)'),
    ('user_screen_name', 'varchar(500)'),
    ('user_location', 'varchar(500)'),
    ('user_created_at', 'datetime'),
    ('user_description', 'varchar(500)'),
    ('user_followers_count', 'int'),
    ('user_friends_count', 'int'),
    ('user_listed_count', 'int'),
    ('user_favourites_count', 'int'),
    ('user_statuses_count', 'int'),
    # geographic attributes
    ('stateID', 'int'),
    ('stateName', 'varchar(100)'),
    ('countyID', 'int'),
    ('countyName', 'varchar(100)'),
    ('cityID', 'int'),
    ('cityName', 'varchar(100)'),
    ('country', 'varchar(100)'),
    ('bounding_box', 'varchar(500)'),
])
# Timestamp layout used by the tweet JSON, e.g. "Sun Mar 01 12:00:00 +0000 2020".
TWITTER_TIME_FORMAT = '%a %b %d %H:%M:%S %z %Y'

# Columns grouped by the sub-object of the tweet JSON they are read from.
# Built once here instead of once per column per tweet.
PLACE_COLUMNS = frozenset(["bounding_box", "country"])
GEO_TAG_COLUMNS = frozenset(["stateID", "stateName", "countyID", "countyName",
                             "cityID", "cityName"])
USER_COLUMNS = frozenset(["user_id", "user_name", "user_screen_name",
                          "user_location", "user_created_at",
                          "user_description", "user_followers_count",
                          "user_friends_count", "user_listed_count",
                          "user_favourites_count", "user_statuses_count"])
ENTITY_COLUMNS = frozenset(["hashtags", "user_mentions"])


def parse_twitter_time(raw):
    """Parse a tweet timestamp string into a timezone-aware datetime.

    A missing or empty timestamp is returned unchanged (None or '') instead
    of raising, so one incomplete record cannot abort the whole conversion.
    """
    if not raw:
        return raw
    return datetime.datetime.strptime(raw, TWITTER_TIME_FORMAT)


def extract_value(tweet_json, column_name):
    """Return the raw value of *column_name* from a parsed tweet.

    Flattens the nested tweet structure: the column is looked up in the
    sub-object it belongs to ("place", "geo_tag", "user" or "entities") when
    that sub-object is present, and at the top level of the tweet otherwise.
    Returns None when the attribute cannot be found.
    """
    # bounding_box and country are inside the "place" sub-object
    place = tweet_json.get("place")
    if column_name in PLACE_COLUMNS and place:
        return place.get(column_name)

    # state/county/city attributes are inside the "geo_tag" sub-object
    # generated by TwitterGeoTagger
    geo_tag = tweet_json.get("geo_tag")
    if column_name in GEO_TAG_COLUMNS and geo_tag:
        return geo_tag.get(column_name)

    # user_* columns are inside the "user" sub-object, stored there without
    # the "user_" prefix
    user = tweet_json.get("user")
    if column_name in USER_COLUMNS and user:
        raw = user.get(column_name[len("user_"):])
        if column_name == 'user_created_at':
            # datetime column: parse it, tolerating a missing value
            return parse_twitter_time(raw)
        return raw

    # created_at is parsed so that it is written as "YYYY-MM-DD HH:MM:SS+HH:MM"
    if column_name == 'created_at':
        return parse_twitter_time(tweet_json.get(column_name))

    # hashtags and user_mentions are both inside the "entities" sub-object;
    # fall back to the top level when "entities" does not carry the key
    entities = tweet_json.get("entities")
    if column_name in ENTITY_COLUMNS and entities and column_name in entities:
        return entities[column_name]

    return tweet_json.get(column_name)


def coerce_value(value, column_type):
    """Normalise *value* according to the schema type of its column.

    Missing integers become -1, booleans become 1 (only for a literal True)
    or 0, and every other value is passed through untouched.
    """
    if column_type in ("bigint", "int") and value is None:
        return -1
    if column_type == "boolean":
        return 1 if value is True else 0
    return value


def tweet_to_row(tweet_json, columns):
    """Convert one parsed tweet into a list of CSV cell values.

    *columns* maps column name -> column type; the returned list follows its
    iteration order.
    """
    return [coerce_value(extract_value(tweet_json, name), column_type)
            for name, column_type in columns.items()]


if __name__ == '__main__':
    # print out converted csv to stdout, one row per input line
    csv_writer = csv.writer(sys.stdout)
    for line in sys.stdin:
        # tolerate blank lines (e.g. a trailing newline in the input)
        if not line.strip():
            continue
        csv_writer.writerow(tweet_to_row(json.loads(line), schema))