-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathloader.py
98 lines (88 loc) · 3.6 KB
/
loader.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
from os import path
from datetime import datetime
import pymongo
# strptime() format for the timestamp columns of both the trip-data and the
# trip-fare CSV files, e.g. '2013-01-01 15:11:48'.
time_format = '%Y-%m-%d %H:%M:%S'
def _geojson_point(lng, lat):
    """Return a GeoJSON Point built from two coordinate strings, or None.

    GeoJSON (RFC 7946) and MongoDB's 2dsphere index both expect positions in
    ``[longitude, latitude]`` order. None is returned when either value cannot
    be parsed as a float (for example an empty CSV field).
    """
    try:
        return {"type": "Point", "coordinates": [float(lng), float(lat)]}
    except ValueError:
        return None


def parse_trip_data(fname, batch_size=4000):
    """Load a trip-data CSV file into the ``taxi.trips`` MongoDB collection.

    The first line of the file is skipped as a header. Every other non-blank
    line must contain exactly 14 comma-separated fields. Each line becomes one
    document with typed fields. ``pickup_loc`` / ``drop_loc`` GeoJSON points
    are added only when the corresponding coordinates parse as numbers.
    Documents are written with ``insert_many`` in batches of ``batch_size``.

    Args:
        fname: Path of the CSV file to read.
        batch_size: Maximum number of documents sent per ``insert_many`` call.

    Raises:
        ValueError: if ``batch_size`` is less than 1, or if a row has the
            wrong number of fields or an unparsable int/float/timestamp field.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    documents = []
    with pymongo.MongoClient() as client:
        db = client.taxi
        with open(fname, "r") as fd:
            fd.readline()  # skip the CSV header row
            for line in fd:
                line = line.strip()
                if not line:
                    continue  # tolerate blank lines, e.g. a trailing newline
                # store_and_fwd_flag is parsed but not stored.
                (medallion, hack_license, vendor_id, rate_code,
                 store_and_fwd_flag, pickup_time, drop_time, passenger_count,
                 trip_time_in_secs, trip_distance, pickup_lng, pickup_lat,
                 drop_lng, drop_lat) = line.split(',')
                doc = {
                    "medallion": medallion,
                    "license": hack_license,
                    "vendor": vendor_id,
                    "rate_code": int(rate_code),
                    "pickup_time": datetime.strptime(pickup_time, time_format),
                    "drop_time": datetime.strptime(drop_time, time_format),
                    "passengers": int(passenger_count),
                    "trip_time": int(trip_time_in_secs),
                    "distance": float(trip_distance),
                }
                # Location fields are optional: omit them when the CSV has
                # missing or non-numeric coordinates.
                pickup_loc = _geojson_point(pickup_lng, pickup_lat)
                if pickup_loc is not None:
                    doc["pickup_loc"] = pickup_loc
                drop_loc = _geojson_point(drop_lng, drop_lat)
                if drop_loc is not None:
                    doc["drop_loc"] = drop_loc
                documents.append(doc)
                if len(documents) >= batch_size:
                    db.trips.insert_many(documents)
                    documents = []
        # Flush the final partial batch; insert_many rejects an empty list.
        if documents:
            db.trips.insert_many(documents)
def parse_trip_fare(fname, batch_size=4000):
    """Load a trip-fare CSV file into the ``taxi.fares`` MongoDB collection.

    The first line of the file is skipped as a header. Every other non-blank
    line must contain exactly 11 comma-separated fields. Each line becomes one
    document: the identifiers and ``payment_type`` are kept as strings, the
    pickup time is parsed with ``time_format``, and all monetary columns are
    converted to float. Documents are written with ``insert_many`` in batches
    of ``batch_size``.

    Args:
        fname: Path of the CSV file to read.
        batch_size: Maximum number of documents sent per ``insert_many`` call.

    Raises:
        ValueError: if ``batch_size`` is less than 1, or if a row has the
            wrong number of fields or an unparsable float/timestamp field.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    documents = []
    with pymongo.MongoClient() as client:
        db = client.taxi
        with open(fname, "r") as fd:
            fd.readline()  # skip the CSV header row
            for line in fd:
                line = line.strip()
                if not line:
                    continue  # tolerate blank lines, e.g. a trailing newline
                (medallion, hack_license, vendor_id, pickup_time, payment_type,
                 fare_amount, surcharge, mta_tax, tip_amount, tolls_amount,
                 total_amount) = line.split(",")
                doc = {
                    "medallion": medallion,
                    "license": hack_license,
                    "vendor": vendor_id,
                    "pickup_time": datetime.strptime(pickup_time, time_format),
                    "payment_type": payment_type,
                    "fare_amount": float(fare_amount),
                    "surcharge": float(surcharge),
                    "mta_tax": float(mta_tax),
                    "tip_amount": float(tip_amount),
                    "tolls_amount": float(tolls_amount),
                    "total_amount": float(total_amount),
                }
                documents.append(doc)
                if len(documents) >= batch_size:
                    db.fares.insert_many(documents)
                    documents = []
        # Flush the final partial batch; insert_many rejects an empty list.
        if documents:
            db.fares.insert_many(documents)
# CSV inputs, looked up under the ``taxi`` directory relative to the current
# working directory when the script runs.
data_files = ['trip_data_small.csv']
fare_files = ['trip_fare_small.csv']

if __name__ == '__main__':
    # Drop any previous import so the collections are rebuilt from scratch.
    with pymongo.MongoClient() as client:
        print('Removing old database.')
        client.drop_database('taxi')
    print('Inserting new documents...')
    # Trip files are loaded before fare files, each with its own parser.
    loaders = ((data_files, parse_trip_data), (fare_files, parse_trip_fare))
    for file_names, load in loaders:
        for name in file_names:
            print('\tParsing %s' % name)
            load(path.join('taxi', name))
    print('Done.')