-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathexport.py
126 lines (110 loc) · 3.76 KB
/
export.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
import os, importlib, csv
import database
from cmd import Cmd
import numpy as np
class Exporter(Cmd):
prompt = "> "
loaded = {} # prefix: db
variables = []
def do_available(self, line):
"""List all available databases.
Usage: available
"""
for root, dir, files in os.walk("."):
if '__init__.py' in files:
print root[2:]
def do_load(self, module):
"""Load an available database.
Usage: available [DB name]
"""
print "Loading " + module + "..."
try:
db = importlib.import_module(module + '.main').load()
self.loaded[module] = db
print self.loaded.keys()
except Exception as ex:
print "Failed to load " + module
print ex
def do_describe(self, variable):
"""Describe a variable.
Usage: describe [variable]
"""
try:
# Look for this variable
db = self.get_database()
print db.describe_variable(variable)
print "Years: " + str(db.get_years(variable))
except:
print "Could not find variable " + variable
def do_list(self, line):
"""List the available variables across all loaded databases.
Usage: list
"""
db = self.get_database()
if db is None:
print "None."
else:
print db.get_variables()
def do_add(self, variable):
"""Add a variable to the export list.
Usage: add [variable]
"""
try:
# Look for this variable
db = self.get_database()
print db.describe_variable(variable)
print "Years: " + str(db.get_years(variable))
self.variables.append(variable)
except:
print "Could not find variable " + variable
def do_export(self, filepath):
"""Export all variables on the export list to a file.
Usage: export [file path]
"""
# Collect all data
data = {}
db = self.get_database()
for variable in self.variables:
years = db.get_years(variable)
if years is None:
data[variable] = np.array(db.get_data(variable, None))
else:
for year in years:
data[variable + ':' + str(year)] = np.array(db.get_data(variable, year))
try:
with open(os.path.expanduser(filepath), 'w') as fp:
writer = csv.writer(fp)
writer.writerow(['fips'] + data.keys())
for ii in range(len(db.get_fips())):
row = [db.get_fips()[ii]] + [data[key][ii] for key in data.keys()]
writer.writerow(row)
except Exception as ex:
print "Error: " + str(ex)
def do_import(self, filepath):
"""Import a database with a FIPS column.
Usage: import [file path]
"""
db = database.CSVDatabase.smart_import(filepath)
self.loaded[filepath] = db
print "Loaded."
def do_script(self, filepath):
try:
with open(os.path.expanduser(filepath), 'r') as fp:
for line in fp:
self.onecmd(line)
except Exception as ex:
print "Error: " + str(ex)
def get_database(self):
if not self.loaded:
return None
if self.loaded and len(self.loaded) == 1:
return self.loaded.values()[0]
return database.CombinedDatabase(self.loaded.values(), self.loaded.keys(), '.')
def do_bye(self, line):
"""Close the interface.
Usage: bye
"""
return True
print "Welcome to the County Data Exporter."
print "For help, type `help`.\n"
Exporter().cmdloop()