# open_elevation_request.py
#
# Reads a coordinate grid (lon/lat) from a PostgreSQL "earthdata" table,
# queries the Open-Elevation API for elevations in batches, and writes the
# results back to the database as table "elevation".
## IMPORTS
import requests
import json
import time
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from dotenv import dotenv_values
from sqlalchemy import create_engine, text, inspect
#SIGN IN TO PostgreSQL
print ("\n--- DB SIGN IN: ---")
#load values from the .env file
print("reading .env files...")
config = dotenv_values()
# define variables for the login
pg_user = config['POSTGRES_USER']
pg_host = config['POSTGRES_HOST']
pg_port = config['POSTGRES_PORT']
pg_db = config['POSTGRES_DB']
pg_pass = config['POSTGRES_PASS']
# Schema is optional: .get() returns None instead of raising KeyError when
# POSTGRES_SCHEMA is absent (the original config['...'] lookup would have
# crashed before the "no default schema" fallback below could ever run).
pg_schema = config.get('POSTGRES_SCHEMA')
# Now building the URL with the values from the .env file
url = f'postgresql://{pg_user}:{pg_pass}@{pg_host}:{pg_port}/{pg_db}'
#create engine
engine = create_engine(url, echo=False)
# check if the connection is successfully established or not
inspector = inspect(engine)
schemas = inspector.get_schema_names()
if schemas:
    print(f"succesfully connected to: \t {pg_db}")
    # Join outside the f-string: re-using the f-string's own quote character
    # inside the expression is a SyntaxError on Python < 3.12.
    schema_names = ", ".join(schemas)
    print(f"available schemas: \t\t {schema_names}")
else:
    # Actually abort -- the original printed "aborting" but kept running.
    raise SystemExit('connection to database failed: aborting')
# set default schema defined in .env, defaulting to public
if pg_schema:
    print(f'setting default schema to: \t {pg_schema}')
else:
    print(f'no default schema set: \t defaulting to public')
    pg_schema = "public"
with engine.begin() as conn:
    # NOTE(review): SET search_path only affects this one pooled connection;
    # later connections from the pool may not inherit it, so downstream code
    # should pass schema= explicitly where possible.
    # possible to cascade with ,public; then it will search in my_schema
    # first and then in public
    result = conn.execute(text(f'SET search_path TO {pg_schema};'))
tables = inspector.get_table_names(schema=pg_schema)
print(f'tables in default schema: \t {", ".join(tables)}:')
#FETCH CORDINATES
print ("\n--- FETCHING Coordinate grid: ---")
# pick the first table whose name contains "earthdata"
earthdata_table = ""
for t in tables:
    if "earthdata" in t:
        earthdata_table = t
        break
if not earthdata_table:
    # Without this guard the SELECT below would run with an empty table
    # name ("SELECT lon,lat FROM ;") and fail with a confusing SQL error.
    raise SystemExit("no 'earthdata' table found in schema: aborting")
print(f"fetching from '{earthdata_table}'")
# read latitude and longitude into a DataFrame; use a separate variable so
# the table-name string is not shadowed by the DataFrame (original re-used
# `earthdata_table` for both).
coords = pd.read_sql(sql=text(f'SELECT lon,lat FROM {earthdata_table};'), con=engine)
print(f'retrieved {coords.shape[0]} pixels')
# parse data into API compatible format; zip over the columns instead of
# iterrows() -- same payload, much faster on large grids
data = {
    "locations": [
        {"latitude": lat, "longitude": lon}
        for lon, lat in zip(coords['lon'], coords['lat'])
    ]
}
print ("\n--- REQUESTING DATA FROM OPEN-ELEVATION: ---")
# Open-Elevation lookup endpoint; both request and response are JSON, so
# Accept and Content-Type carry the same media type.
url = "https://api.open-elevation.com/api/v1/lookup"
headers = {key: "application/json" for key in ("Accept", "Content-Type")}
# Function to make a POST request to the API
def make_request(batch, timeout=60):
    """POST one batch of locations to the Open-Elevation API.

    Parameters
    ----------
    batch : list[dict]
        Locations as ``{"latitude": ..., "longitude": ...}`` dicts.
    timeout : float, optional
        Seconds before the request is abandoned (default 60). The original
        call had no timeout and could hang the script indefinitely.

    Returns
    -------
    dict | None
        Parsed JSON response, or ``None`` when the request failed.
    """
    try:
        # json= serializes the payload for us (equivalent to the original
        # data=json.dumps(...)); headers are kept for the Accept header.
        response = requests.post(
            url,
            headers=headers,
            json={"locations": batch},
            timeout=timeout,
        )
        response.raise_for_status()  # Raise an error for bad responses (4xx or 5xx)
        return response.json()
    except requests.exceptions.RequestException as e:
        print(f"Request failed: {e}")
        return None
# Setup for progress bar
total_rows = len(data["locations"])
#total_rows = 200 # for DEBUGGING
bar_length = 30
batch_size = 100 # Adjust based on API limitations
sleep_time = 1 # Adjust dependent on API limitations
locations = data["locations"]
results = []
print(f"API url: \t\t\t{url}")
print(f"request batch size: \t\t{batch_size}")
print(f"sleep time between requests: \t{sleep_time}")
print("")
# Loop through batches with progress bar
for i in range(0, total_rows, batch_size):
    batch = locations[i:i + batch_size]
    result = make_request(batch)
    if result is not None and 'results' in result:
        results.extend(result['results'])
    else:
        print(f"Batch {i // batch_size + 1} failed, skipping...")
    # Update and display progress bar. Clamp at 1.0 so the final partial
    # batch cannot report more than 100% (i + batch_size can overshoot
    # total_rows).
    percent_complete = min(1.0, (i + batch_size) / total_rows)
    filled = int(bar_length * percent_complete)
    bar = '#' * filled + '-' * (bar_length - filled)
    print(f"Progress: |{bar}| {percent_complete * 100:.2f}%", end="\r")
    # Delay between batches to avoid rate-limiting
    time.sleep(sleep_time)
print("\nCompleted!")
print("\n--- UPLOADING TO DATABASE: ---\n")
# Guard: if every batch failed, `results` is empty and if_exists='replace'
# would wipe any existing elevation table with an empty one.
if results:
    # Pass schema= explicitly: the earlier SET search_path only applied to
    # one pooled connection, so relying on it here is unreliable.
    pd.DataFrame(results).to_sql(
        "elevation",
        con=engine,
        schema=pg_schema,
        if_exists='replace',
        index=False,
    )
    #engine.dispose()
    print("Upload successfull")
else:
    print("no results to upload: skipping")
## PRINT END OF SCRIPT STATEMENT
print("\n--- END ---")