-
Notifications
You must be signed in to change notification settings - Fork 2
/
CensusEcon.py
173 lines (115 loc) · 13.7 KB
/
CensusEcon.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
#Load Dependencies
#%AddDeps com.databricks spark-csv_2.11 1.5.0
#//%AddDeps org.vegas-viz vegas-spark_2.11 0.3.11
#%AddDeps org.vegas-viz vegas_2.11 0.3.11
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import trim
import cdsw
#Initialize Spark Session
# Local-mode session using all available cores; the external shuffle service
# is enabled via config.  Fix: appName was misspelled "CenusEcon" (this is the
# name shown in the Spark UI / history server), and a duplicate typo'd
# "#initalize" comment is removed.
spark = SparkSession.builder \
    .appName("CensusEcon") \
    .master("local[*]") \
    .config('spark.shuffle.service.enabled', "True") \
    .getOrCreate()
# Bare expression: echoes the Spark version in a notebook/CDSW session.
spark.version
#Load Data Files
# Read the 2012 Commodity Flow Survey public-use microdata file (CSV with a
# header row), letting Spark infer column types from the data.
df_WholeSetRaw = (
    spark.read
    .format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load("/tmp/cfs_2012_pumf_csv.txt")
)
# Register the raw data as a temp view so later spark.sql() queries can use it.
df_WholeSetRaw.createOrReplaceTempView("CensusECON")
# Print the inferred schema for a sanity check.
df_WholeSetRaw.printSchema()
#Load Helper Files and Build Tables
def _load_lookup_csv(path):
    """Read a header-ed CSV lookup table from *path* with schema inference.

    Extracted to remove five byte-identical reader pipelines.
    """
    return spark.read.format("csv").option("header", "true").option("inferSchema", "true").load(path)

#CFS Area
df_Table_CFSArea = _load_lookup_csv("/tmp/CFS_2012_table_CFSArea.csv")
#Mode of Transportation
df_Table_ModeofTrans = _load_lookup_csv("/tmp/CFS_2012_table_ModeofTrans.csv")
#NAICS - North American Industry Classification System Codes
df_Table_NAICS = _load_lookup_csv("/tmp/CFS_2012_table_NAICS.csv")
#SCTG - Standard Classification of Transported Goods Codes
df_Table_SCTG = _load_lookup_csv("/tmp/CFS_2012_table_SCTG.csv")
#State Codes
df_Table_StateCodes = _load_lookup_csv("/tmp/CFS_2012_table_StateCodes.csv")
#Create Table from DataFrames - register each lookup as a temp view for SQL joins
df_Table_CFSArea.createOrReplaceTempView("CensusECON_CFSArea")
df_Table_ModeofTrans.createOrReplaceTempView("CensusECON_MoT")
df_Table_NAICS.createOrReplaceTempView("CensusECON_NAICS")
df_Table_SCTG.createOrReplaceTempView("CensusECON_SCTG")
df_Table_StateCodes.createOrReplaceTempView("CensusECON_StateCodes")
#Merge Datasets
# Join the shipment fact table to every lookup view (CFS area twice for
# origin/destination, state codes twice for origin/destination).
# Fixes vs. the original query:
#  - OrigState now comes from alias G (joined on A.orig_state = G.code); it
#    previously reused H (the destination-state join), so OrigState and
#    DestState both showed the destination's state name.
#  - A.quarter was selected twice, producing a duplicate output column.
#  - "CSF" alias typos corrected to "CFS".
mergedatasets = spark.sql(
    "select A.shipmt_id, A.orig_state, G.state as OrigState, A.orig_ma, "
    "A.orig_cfs_area, B.description as OrigCFSArea, A.dest_state, "
    "H.state as DestState, A.dest_ma, A.dest_cfs_area, C.description as DestCFSArea, "
    "A.naics, D.description as NAICSDesc, A.quarter, A.sctg, E.description as SCTGDesc, "
    "A.mode, F.description as ModeDesc, A.shipmt_value, A.shipmt_wght, "
    "A.shipmt_dist_gc, A.shipmt_dist_routed, A.temp_cntl_yn, A.export_yn, "
    "A.export_cntry, A.hazmat, A.wgt_factor "
    "from censusecon as A, censusecon_cfsarea as B, censusecon_cfsarea as C, "
    "censusecon_naics as D, censusecon_sctg as E, censusecon_mot as F, "
    "censusecon_statecodes as G, censusecon_statecodes as H "
    "where A.orig_cfs_area = B.CFSArea and A.dest_cfs_area = C.CFSArea "
    "and A.naics = D.naics and A.sctg = E.sctg and A.mode = F.modecode "
    "and A.orig_state = G.code and A.dest_state = H.code")
#PDFmerged = mergedatasets.toPandas()
#Table B1 - Shipments by Mode of Transportation (by numeric mode code)
# Each B1 query reports weighted totals: shipment value, tonnage (shipmt_wght
# appears to be pounds, /2000 -> tons -- TODO confirm against the CFS PUF
# documentation), and the weighted average routed miles per shipment.
# Fix: the output alias "AvgerageMilesPerShip" is corrected to
# "AverageMilesPerShip" in every query below.
B1ShipModeTran = spark.sql("select mode, count(shipmt_id) as count, format_number( sum(shipmt_value*wgt_factor), 0) as TotalValue, format_number( sum( wgt_factor*(shipmt_wght/2000)), 0) as TotalTonnage, format_number( (sum( wgt_factor * shipmt_dist_routed)) / sum(wgt_factor), 0) as AverageMilesPerShip from CensusEcon group by mode order by mode")
B1ShipModeTran.show()
#Table B1 - Transportation Mode Count (same measures keyed by mode description)
B1TranModeCount = spark.sql("select F.description as ModeDesc, count(A.shipmt_id) as count, format_number( sum(A.shipmt_value*A.wgt_factor), 0) as TotalValue, format_number( sum( A.wgt_factor*(A.shipmt_wght/2000)), 0) as TotalTonnage, format_number( (sum( A.wgt_factor * A.shipmt_dist_routed)) / sum(A.wgt_factor), 0) as AverageMilesPerShip from CensusEcon as A, censusecon_mot as F where A.mode = F.modecode group by F.description order by F.description")
B1TranModeCount.show()
#Table B1 - Transportation Mode by Tonnage (TotalTonnage left unformatted)
B1TranModeTonnage = spark.sql("select F.description as ModeDesc, format_number( sum(A.shipmt_value*A.wgt_factor), 0) as TotalValue, sum( A.wgt_factor*(A.shipmt_wght/2000)) as TotalTonnage, format_number( (sum( A.wgt_factor * A.shipmt_dist_routed)) / sum(A.wgt_factor), 0) as AverageMilesPerShip from CensusEcon as A, censusecon_mot as F where A.mode = F.modecode group by F.description order by F.description")
B1TranModeTonnage.show()
#Table B1 - Average Miles Per Shipment by Mode of Transportation
B1AvgMilePerShip = spark.sql("select F.description as ModeDesc, count(A.shipmt_id) as count, format_number( sum(A.shipmt_value*A.wgt_factor), 0) as TotalValue, format_number( sum( A.wgt_factor*(A.shipmt_wght/2000)), 0) as TotalTonnage, format_number( (sum( A.wgt_factor * A.shipmt_dist_routed)) / sum(A.wgt_factor), 0) as AverageMilesPerShip from CensusEcon as A, censusecon_mot as F where A.mode = F.modecode group by F.description order by F.description")
B1AvgMilePerShip.show()
#Table B1 - Transportation Mode by Shipment Value (TotalValue/Tonnage unformatted)
# NOTE(review): these "by Tonnage"/"by Value" variants still order by
# F.description, not by the metric -- possibly intentional; confirm.
B1TranModeByShipValue = spark.sql("select F.description as ModeDesc, sum(A.shipmt_value*A.wgt_factor) as TotalValue, sum( A.wgt_factor*(A.shipmt_wght/2000)) as TotalTonnage, format_number( (sum( A.wgt_factor * A.shipmt_dist_routed)) / sum(A.wgt_factor), 0) as AverageMilesPerShip from CensusEcon as A, censusecon_mot as F where A.mode = F.modecode group by F.description order by F.description")
B1TranModeByShipValue.show()
#Table B2 - Shipments by Classification of Goods (by SCTG code)
# Weighted value, tonnage and average routed miles per SCTG commodity group.
# Fix: the output alias "AvgerageMilesPerShip" is corrected to
# "AverageMilesPerShip" in every query below.
B2ShipByClass = spark.sql("select sctg, count(shipmt_id) as count, format_number( sum(shipmt_value*wgt_factor), 0) as TotalValue, format_number(sum( wgt_factor*(shipmt_wght/2000)),0) as TotalTonnage, format_number((sum( wgt_factor * shipmt_dist_routed)) / sum(wgt_factor),0) as AverageMilesPerShip from CensusEcon as A group by sctg order by sctg")
B2ShipByClass.show()
#Table B2 - Shipments by Classification of Goods by Count (SCTG descriptions, ordered by count)
B2ShipByClassCount = spark.sql("select E.description as SCTGDesc, count(A.shipmt_id) as count, sum(A.shipmt_value*A.wgt_factor) as TotalValue, sum( A.wgt_factor*(A.shipmt_wght/2000)) as TotalTonnage, (sum( A.wgt_factor * A.shipmt_dist_routed)) / sum(A.wgt_factor) as AverageMilesPerShip from CensusEcon as A, censusecon_sctg as E where A.sctg = E.sctg group by E.description order by count")
B2ShipByClassCount.show()
#Table B2 - Shipments by Classification of Goods by Value (ordered by TotalValue)
B2ShipByClassValue = spark.sql("select E.description as SCTGDesc, count(A.shipmt_id) as count, sum(A.shipmt_value*A.wgt_factor) as TotalValue, sum( A.wgt_factor*(A.shipmt_wght/2000)) as TotalTonnage, (sum( A.wgt_factor * A.shipmt_dist_routed)) / sum(A.wgt_factor) as AverageMilesPerShip from CensusEcon as A, censusecon_sctg as E where A.sctg = E.sctg group by E.description order by TotalValue")
B2ShipByClassValue.show()
#Table B2 - Shipments by Classification of Goods by Tonnage (ordered by TotalTonnage)
B2ShipByClassTonnage = spark.sql("select E.description as SCTGDesc, count(A.shipmt_id) as count, sum(A.shipmt_value*A.wgt_factor) as TotalValue, sum( A.wgt_factor*(A.shipmt_wght/2000)) as TotalTonnage, (sum( A.wgt_factor * A.shipmt_dist_routed)) / sum(A.wgt_factor) as AverageMilesPerShip from CensusEcon as A, censusecon_sctg as E where A.sctg = E.sctg group by E.description order by totaltonnage")
B2ShipByClassTonnage.show()
#Table B2 - Shipments by Classification of Goods by Average Miles Per Shipment
B2ShipByClassAvgMilePerShip = spark.sql("select E.description as SCTGDesc, count(A.shipmt_id) as count, sum(A.shipmt_value*A.wgt_factor) as TotalValue, sum( A.wgt_factor*(A.shipmt_wght/2000)) as TotalTonnage, (sum( A.wgt_factor * A.shipmt_dist_routed)) / sum(A.wgt_factor) as AverageMilesPerShip from CensusEcon as A, censusecon_sctg as E where A.sctg = E.sctg group by E.description order by E.description")
B2ShipByClassAvgMilePerShip.show()
#Table B3 - Shipments by State (base table, by numeric origin-state code)
# Weighted value, tonnage and average routed miles per origin/destination state.
# Fix: the output alias "AvgerageMilesPerShip" is corrected to
# "AverageMilesPerShip" in every query below.
B3Base = spark.sql("select orig_state, count(shipmt_id) as count, format_number(sum(shipmt_value*wgt_factor),0) as TotalValue, format_number(sum( wgt_factor*(shipmt_wght/2000)),0) as TotalTonnage, format_number((sum( wgt_factor * shipmt_dist_routed)) / sum(wgt_factor),0) as AverageMilesPerShip from CensusEcon group by orig_state order by orig_state")
B3Base.show()
#Table B3 - Shipment Count by State of Origin (state names, ordered by count)
B3ShipCountStateOrigin = spark.sql("select B.state, A.orig_state, count(A.shipmt_id) as count, sum(A.shipmt_value*A.wgt_factor) as TotalValue, sum(A.wgt_factor*(A.shipmt_wght/2000)) as TotalTonnage, (sum(A.wgt_factor * A.shipmt_dist_routed)) / sum(A.wgt_factor) as AverageMilesPerShip from censusecon as A, censusecon_statecodes as B where A.orig_state = B.code group by orig_state, B.state order by count(A.shipmt_id)")
B3ShipCountStateOrigin.show()
#Table B3 - Shipment Value by State of Origin (ordered by TotalValue)
B3ShipValueStateOrigin = spark.sql("select B.state, A.orig_state, count(A.shipmt_id) as count, sum(A.shipmt_value*A.wgt_factor) as TotalValue, sum( A.wgt_factor*(A.shipmt_wght/2000)) as TotalTonnage, (sum(A.wgt_factor * A.shipmt_dist_routed)) / sum(A.wgt_factor) as AverageMilesPerShip from censusecon as A, censusecon_statecodes as B where A.orig_state = B.code group by orig_state, B.state order by TotalValue")
B3ShipValueStateOrigin.show()
#Table B3 - Shipment Count By State of Destination (ordered by count)
B3ShipCountStateDest = spark.sql("select B.state, A.dest_state, count(A.shipmt_id) as count, sum(A.shipmt_value*A.wgt_factor) as TotalValue, sum(A.wgt_factor*(A.shipmt_wght/2000)) as TotalTonnage, (sum(A.wgt_factor * A.shipmt_dist_routed)) / sum(A.wgt_factor) as AverageMilesPerShip from censusecon as A, censusecon_statecodes as B where A.dest_state = B.code group by dest_state, B.state order by count(A.shipmt_id)")
B3ShipCountStateDest.show()
#Table B3 - Shipment Value by State of Destination (ordered by TotalValue)
B3ShipValueStateDest = spark.sql("select B.state, A.dest_state, count(A.shipmt_id) as count, sum(A.shipmt_value*A.wgt_factor) as TotalValue, sum( A.wgt_factor*(A.shipmt_wght/2000)) as TotalTonnage, (sum(A.wgt_factor * A.shipmt_dist_routed)) / sum(A.wgt_factor) as AverageMilesPerShip from censusecon as A, censusecon_statecodes as B where A.dest_state = B.code group by dest_state, B.state order by TotalValue")
B3ShipValueStateDest.show()
#Table C1 - Variance by Mode
# The exp(a + b*ln(count) + c*ln(count)^2) expressions appear to be the CFS
# published coefficient-of-variation regression formulas (log base 2.71828
# approximates the natural log) -- TODO confirm coefficients against the CFS
# 2012 PUF documentation.  SQL is unchanged; only the variable names for
# tables C2 and C3 are fixed (they were labelled C1, with "SCTG" transposed
# as "STCG").
C1VarianceByMode = spark.sql(" select mode, format_number(exp( 3.844 + 0.039 * log(2.71828, count(shipmt_id) ) - 0.020 * pow( log(2.71828,count(shipmt_id)), 2)), 2) as Value, format_number(exp( 3.761 + 0.076 * log(2.71828, count(shipmt_id) ) - 0.019 * pow( log(2.71828,count(shipmt_id)), 2)), 2) as WGHT, format_number(exp( 4.092 - 0.015 * log(2.71828, count(shipmt_id) ) - 0.012 * pow( log(2.71828,count(shipmt_id)), 2)), 2) as TonMiles, format_number(exp( 5.168 - 0.084 * log(2.71828, count(shipmt_id) ) - 0.376 * log(2.71828, sum(wgt_factor * shipmt_dist_routed) / sum(wgt_factor))), 2) as AvgMilesShipped from CensusEcon group by mode order by mode")
C1VarianceByMode.show()
#Table C2 - Variance by SCTG
C2VarianceBySCTG = spark.sql("select sctg, format_number(exp( 3.844 + 0.039 * log(2.71828, count(shipmt_id) ) - 0.020 * pow( log(2.71828,count(shipmt_id)), 2)), 2) as Value, format_number(exp( 3.761 + 0.076 * log(2.71828, count(shipmt_id) ) - 0.019 * pow( log(2.71828,count(shipmt_id)), 2)), 2) as WGHT, format_number(exp( 4.092 - 0.015 * log(2.71828, count(shipmt_id) ) - 0.012 * pow( log(2.71828,count(shipmt_id)), 2)), 2) as TonMiles, format_number(exp( 5.168 - 0.084 * log(2.71828, count(shipmt_id) ) - 0.376 * log(2.71828, sum(wgt_factor * shipmt_dist_routed) / sum(wgt_factor))), 2) as AvgMilesShipped from CensusEcon group by sctg order by sctg")
C2VarianceBySCTG.show()
#Table C3 - Variance by State of Origin
C3VarianceByOrigState = spark.sql("select orig_state, format_number(exp( 3.844 + 0.039 * log(2.71828, count(shipmt_id) ) - 0.020 * pow( log(2.71828,count(shipmt_id)), 2)), 2) as Value, format_number(exp( 3.761 + 0.076 * log(2.71828, count(shipmt_id) ) - 0.019 * pow( log(2.71828,count(shipmt_id)), 2 )), 2) as WGHT, format_number(exp( 4.092 - 0.015 * log(2.71828, count(shipmt_id) ) - 0.012 * pow( log(2.71828,count(shipmt_id)), 2 )), 2) as TonMiles, format_number(exp( 5.168 - 0.084 * log(2.71828, count(shipmt_id) ) - 0.376 * log(2.71828, sum(wgt_factor * shipmt_dist_routed) / sum(wgt_factor))), 2) as AvgMilesShipped from CensusEcon group by orig_state order by orig_state")
C3VarianceByOrigState.show()
#Shipments by Hazards - distribution of the hazmat flag across all shipments
HazardsShips = spark.sql("select hazmat, count(hazmat) from CensusEcon group by hazmat order by count(hazmat)")
HazardsShips.show()
#Hazards / Non-Hazards Shipments by Quarter
HazardsShipsByQtr = spark.sql("select quarter, hazmat, count(quarter) from CensusEcon group by quarter, hazmat order by count(quarter)")
HazardsShipsByQtr.show()
#Hazmat Shipments By State of Origin (hazmat != 'N' excludes non-hazardous)
HazardsShipsByStateOrigin = spark.sql("select B.state, hazmat, count(hazmat) from CensusEcon as A, CensusEcon_Statecodes as B where A.orig_state = B.code and A.hazmat != 'N' group by B.state, A.hazmat order by count(hazmat)")
HazardsShipsByStateOrigin.show()
#Hazmat Shipments by State of Destination
HazardsShipsByStateDest = spark.sql(" select B.state, hazmat, count(hazmat) from CensusEcon as A, CensusEcon_Statecodes as B where A.dest_state = B.code and A.hazmat != 'N' group by B.state, A.hazmat order by count(hazmat)")
HazardsShipsByStateDest.show()
#States of Origin Shipment Counts by International Destinations
# Fix: the two export queries below filter on export_yn = 'Y' (exports), not
# on hazmat, so the misleading "Hazards" variable prefix is renamed "Export".
ExportShipsCountIntOrigin = spark.sql("select B.state, A.export_cntry, count(A.export_cntry) as count from censusecon as A, censusecon_statecodes as B where export_yn = 'Y' and A.orig_state = B.code group by B.state, A.export_cntry order by count")
ExportShipsCountIntOrigin.show()
#Shipment Count by International Destination
ExportShipsCountIntDest = spark.sql("select A.export_cntry, count(A.export_cntry) as count from censusecon as A where export_yn = 'Y' group by export_cntry order by count")
ExportShipsCountIntDest.show()