-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathutils.py
196 lines (151 loc) · 8.48 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
import argparse
import datetime
import xml.etree.ElementTree as ET
from datetime import datetime, timedelta
from urllib.parse import urlencode

import pandas as pd
import requests
def xml_to_gen_data(xml_data) -> dict:
    """
    Parse ENTSO-E generation XML (A75 document) into a dict of DataFrames,
    one per PsrType.

    Parameters
    ----------
    xml_data : str
        Raw XML payload of a GL_MarketDocument.

    Returns
    -------
    dict
        Maps each PsrType code (e.g. 'B16') to a DataFrame with columns
        StartTime, EndTime, AreaID, UnitName, PsrType, quantity.
        Quantity values are kept as strings, exactly as they appear in the XML.
    """
    # XML namespace of the ENTSO-E generation/load document schema
    namespace = {'ns': 'urn:iec62325.351:tc57wg16:451-6:generationloaddocument:3:0'}
    root = ET.fromstring(xml_data)
    time_series_tags = root.findall('.//ns:TimeSeries', namespace)
    data = {"StartTime": [], "EndTime": [], "AreaID": [], "UnitName": [], "PsrType": [], "quantity": []}
    for ts in time_series_tags:
        # Optional metadata tags: fall back to None when absent.
        psr_type_tag = ts.find('ns:MktPSRType/ns:psrType', namespace)
        psr_type = psr_type_tag.text if psr_type_tag is not None else None
        area_id_tag = ts.find('ns:inBiddingZone_Domain.mRID', namespace)
        area_id = area_id_tag.text if area_id_tag is not None else None
        unit_name_tag = ts.find('ns:quantity_Measure_Unit.name', namespace)
        unit_name = unit_name_tag.text if unit_name_tag is not None else None
        time_period = ts.find('ns:Period', namespace)
        if time_period is None:
            continue
        period_start = time_period.find('ns:timeInterval/ns:start', namespace).text
        # Resolution is e.g. PT15M or PT60M -> number of minutes per Point
        resolution = time_period.find('ns:resolution', namespace).text
        resolution_minutes = int(resolution.replace('PT', '').replace('M', ''))
        # Hoist the (loop-invariant) period start parse out of the Point loop.
        period_start_dt = datetime.fromisoformat(period_start.replace('Z', '+00:00'))
        for point in time_period.findall('ns:Point', namespace):
            position = int(point.find('ns:position', namespace).text)
            quantity = point.find('ns:quantity', namespace).text
            # Position p (1-based) covers [start + (p-1)*res, start + p*res].
            # (The previous implementation was shifted one interval early:
            # position 1 started one resolution step BEFORE the period start.)
            start_time_interval = period_start_dt + timedelta(minutes=resolution_minutes * (position - 1))
            end_time_interval = start_time_interval + timedelta(minutes=resolution_minutes)
            # Emit compact UTC stamps like '2022-01-01T00:00Z'; strip the
            # '+00:00' offset first so the string does not carry both an
            # offset and a trailing 'Z'.
            data["StartTime"].append(start_time_interval.isoformat(timespec='minutes').replace('+00:00', '') + 'Z')
            data["EndTime"].append(end_time_interval.isoformat(timespec='minutes').replace('+00:00', '') + 'Z')
            data["AreaID"].append(area_id)
            data["UnitName"].append(unit_name)
            data["PsrType"].append(psr_type)
            data["quantity"].append(quantity)
    df = pd.DataFrame(data)
    # One DataFrame per distinct PsrType
    return {psr_type: df[df["PsrType"] == psr_type] for psr_type in df["PsrType"].unique()}
def xml_to_load_dataframe(xml_data) -> pd.DataFrame:
    """
    Parse ENTSO-E load XML (A65 document) into a pandas DataFrame.

    Parameters
    ----------
    xml_data : str
        Raw XML payload of a GL_MarketDocument.

    Returns
    -------
    pd.DataFrame
        Columns StartTime, EndTime, AreaID, UnitName, Load.  Load values
        are kept as strings, exactly as they appear in the XML.
    """
    # XML namespace of the ENTSO-E generation/load document schema
    namespace = {'ns': 'urn:iec62325.351:tc57wg16:451-6:generationloaddocument:3:0'}
    root = ET.fromstring(xml_data)
    data = []
    for time_series in root.findall('.//ns:TimeSeries', namespace):
        # Only the bidding-zone mRID and the unit name reach the output;
        # the other TimeSeries header tags (mRID, businessType, ...) were
        # previously parsed and discarded, so they are skipped here.
        domain_mrid = time_series.find('ns:outBiddingZone_Domain.mRID', namespace).text
        unit_name = time_series.find('ns:quantity_Measure_Unit.name', namespace).text
        for period in time_series.findall('ns:Period', namespace):
            start_time = period.find('ns:timeInterval/ns:start', namespace).text
            # Resolution is e.g. PT15M or PT60M -> number of minutes per Point
            resolution = period.find('ns:resolution', namespace).text
            resolution_minutes = int(resolution.replace('PT', '').replace('M', ''))
            # Hoist the (loop-invariant) period start parse out of the Point loop.
            period_start_dt = datetime.fromisoformat(start_time.replace('Z', '+00:00'))
            for point in period.findall('ns:Point', namespace):
                position = int(point.find('ns:position', namespace).text)
                quantity = point.find('ns:quantity', namespace).text
                # Position p (1-based) covers [start + (p-1)*res, start + p*res].
                # (The previous implementation was shifted one interval early:
                # position 1 started one resolution step BEFORE the period start.)
                start_time_interval = period_start_dt + timedelta(minutes=resolution_minutes * (position - 1))
                end_time_interval = start_time_interval + timedelta(minutes=resolution_minutes)
                # Compact UTC stamps like '2022-01-01T00:00Z'; strip the
                # '+00:00' offset so the string is not offset + 'Z' at once.
                data.append([
                    start_time_interval.isoformat(timespec='minutes').replace('+00:00', '') + 'Z',
                    end_time_interval.isoformat(timespec='minutes').replace('+00:00', '') + 'Z',
                    domain_mrid, unit_name, quantity,
                ])
    return pd.DataFrame(data, columns=['StartTime', 'EndTime', 'AreaID', 'UnitName', 'Load'])
def make_url(base_url, params):
    """
    Build a GET URL from *base_url* and a dict of query parameters.

    Values are percent-encoded via urllib.parse.urlencode; the previous
    plain string-join emitted invalid URLs whenever a value contained
    reserved characters ('&', '=', spaces, ...).
    """
    return f"{base_url}?{urlencode(params)}"
def perform_get_request(base_url, params):
    """
    Issue an HTTP GET against *base_url* with *params* as the query string.

    Returns the decoded body (str) on HTTP 200; on any other status it
    returns the raw response bytes so the caller can inspect the error
    payload.  NOTE(review): callers therefore get str or bytes depending
    on the outcome — confirm downstream parsing tolerates both.
    """
    response = requests.get(make_url(base_url, params))
    return response.text if response.status_code == 200 else response.content
def get_gen_data_from_entsoe(regions, periodStart='202201010000', periodEnd='202301010000', output_path='./data'):
    """
    Download generation data (document A75) from the ENTSO-E API for each
    region and save one CSV per (region, PsrType) under *output_path*.

    Parameters
    ----------
    regions : dict
        Maps region name -> ENTSO-E area (bidding-zone) code.
    periodStart, periodEnd : str
        Time range bounds in the format YYYYMMDDHHMM.
    output_path : str
        Directory where the CSV files are written.
    """
    # TODO: There is a period range limit of 1 day for this API. Process in 1 day chunks if needed
    # RESTful API endpoint
    url = 'https://web-api.tp.entsoe.eu/api'
    for region, area_code in regions.items():
        print(f'Fetching data for {region}...')
        # Fresh parameter dict per region (nothing shared between iterations).
        # NOTE(review): hard-coded API credential below — move to an env
        # var or config file.
        query = {
            'securityToken': '1d9cd4bd-f8aa-476c-8cc1-3442dc91506d',
            'documentType': 'A75',
            'processType': 'A16',
            'outBiddingZone_Domain': area_code,  # used for Load data
            'in_Domain': area_code,              # used for Generation data
            'periodStart': periodStart,          # YYYYMMDDHHMM
            'periodEnd': periodEnd,              # YYYYMMDDHHMM
        }
        # Response content is a string of XML data
        response_content = perform_get_request(url, query)
        # One DataFrame per PsrType; write each to its own CSV file.
        for psr_type, frame in xml_to_gen_data(response_content).items():
            frame.to_csv(f'{output_path}/gen_{region}_{psr_type}.csv', index=False)
    return
def get_load_data_from_entsoe(regions, periodStart='202201010000', periodEnd='202301010000', output_path='./data'):
    """
    Download load data (document A65) from the ENTSO-E API for each region
    and save one CSV per region under *output_path*.

    Parameters
    ----------
    regions : dict
        Maps region name -> ENTSO-E area (bidding-zone) code.
    periodStart, periodEnd : str
        Time range bounds in the format YYYYMMDDHHMM.
    output_path : str
        Directory where the CSV files are written.
    """
    # TODO: There is a period range limit of 1 year for this API. Process in 1 year chunks if needed
    # RESTful API endpoint
    url = 'https://web-api.tp.entsoe.eu/api'
    # Refer to https://transparency.entsoe.eu/content/static_content/Static%20content/web%20api/Guide.html#_documenttype
    for region, area_code in regions.items():
        print(f'Fetching data for {region}...')
        # Fresh parameter dict per region (nothing shared between iterations).
        # NOTE(review): hard-coded API credential below — move to an env
        # var or config file.
        query = {
            'securityToken': '1d9cd4bd-f8aa-476c-8cc1-3442dc91506d',
            'documentType': 'A65',
            'processType': 'A16',
            'outBiddingZone_Domain': area_code,  # used for Load data
            'periodStart': periodStart,          # YYYYMMDDHHMM
            'periodEnd': periodEnd,              # YYYYMMDDHHMM
        }
        # Response content is a string of XML data
        response_content = perform_get_request(url, query)
        df = xml_to_load_dataframe(response_content)
        df.to_csv(f'{output_path}/load_{region}.csv', index=False)
    return