-
Notifications
You must be signed in to change notification settings - Fork 0
/
spatial_to_rdf.py
182 lines (133 loc) · 6.53 KB
/
spatial_to_rdf.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
import numpy as np
import polars as pl
import re
import os
from rdflib import Graph, URIRef, Literal, BNode
# from rdflib.namespace import RDF, XSD
from variables import RDF, RDFS, XSD, MFD, OBOE, GEO, KWG_ONT
import gzip
import pyarrow.parquet as pq
from TBox import create_tbox
def remove_suffix_numbering(x: str) -> str:
    """
    Strips a trailing run of underscores and digits from a name.
    Args:
        x (str): The name to clean, e.g. "ndvi_2020_01".
    Returns:
        str: The name without its trailing underscore/digit run (e.g. "ndvi"). Names without such a suffix are returned unchanged.
    """
    return re.sub(r'[_\d]+$', "", x)
def convert_s2_id_to_bit_id(s2_hex: str) -> str:
    """
    Turns a hexadecimal S2 ID into its binary-string form.
    Args:
        s2_hex (str): The S2 ID written in hexadecimal; it may be shorter than 16 digits.
    Returns:
        str: The S2 ID as a string of "0"/"1" characters, left-padded to at least 64 characters.
    """
    # Right-pad with "0" so that the hexadecimal string has 16 digits
    padded_hex = s2_hex.ljust(16, "0")
    numeric_id = int(padded_hex, 16)
    # Render the number in base 2, then left-pad with "0" up to 64 characters
    binary_digits = np.base_repr(numeric_id, base=2)
    return binary_digits.rjust(64, "0")
def get_raster_cell_id(r: tuple) -> str:
    """
    Builds the identifier of a raster cell from one row of data.
    The identifier consists of the first eight entries of the row, each truncated to an integer and joined by underscores.
    These entries are the cell's four corners (4 corners x 2 dimensions), in the order
    ll_easting, ll_northing, ul_easting, ul_northing, ur_easting, ur_northing, lr_easting, lr_northing.
    Args:
        r (tuple): A row whose first eight entries are the corner coordinates; any further entries are ignored.
    Returns:
        str: The raster cell ID, e.g. "1_2_3_4_5_6_7_8".
    """
    # Index each of the eight positions explicitly so that a row that is too short still raises IndexError
    corner_coordinates = (int(r[position]) for position in range(8))
    return "_".join(str(coordinate) for coordinate in corner_coordinates)
def get_literal(literal: str | int | float | bool) -> tuple[str | int | float | bool, str]:
    """
    Returns a tuple containing the literal value and its corresponding XSD data type.
    Parameters:
        literal (str, int, float, bool): The literal value to be processed.
    Returns:
        tuple: The unchanged literal value and the XSD data type selected for it.
        Values of any other type are reported on stdout and typed as XSD.string.
    """
    # bool has to be tested before int: bool is a subclass of int, so an int check
    # placed first would classify True/False as XSD.integer.
    if isinstance(literal, bool):
        object_type = XSD.boolean
    elif isinstance(literal, str):
        object_type = XSD.string
    elif isinstance(literal, int):
        object_type = XSD.integer
    elif isinstance(literal, float):
        # NOTE(review): confirm that floats serialise to a valid xsd:decimal lexical form
        # (xsd:decimal has no exponent notation) - xsd:double may be what is intended.
        object_type = XSD.decimal
    else:
        print(f"Instance unknown for literal: {literal}. Type: {type(literal)}.")
        object_type = XSD.string
    return literal, object_type
def raster_values_to_rdf(parquet_folder: str, save_file: str) -> None:
    """Converts the raster files, raster cells and raster cell values to RDF format.

    Every entry of the folder is read as a parquet file. In each row, columns 0-7 are used to build
    the raster cell ID and column 8 is used as the measured value. For every file one graph is built
    and appended to the output as N-Triples.

    Args:
        parquet_folder (str): Folder containing the parquet files. A trailing path separator is optional.
        save_file (str): Path of the gzip-compressed N-Triples file. It is opened in append mode, so
            content that is already in the file is kept.
    """
    # Predicates and classes are identical for every file and row, so they are built only once
    rdf_type = URIRef(RDF.type)
    has_member = URIRef(OBOE + "hasMember")
    has_measurement = URIRef(OBOE + "hasMeasurement")
    has_value = URIRef(OBOE + "hasValue")
    contains_measurements_of_type = URIRef(OBOE + "containsMeasurementsOfType")
    raster_file_class = URIRef(MFD + "RasterFile")
    raster_cell_class = URIRef(MFD + "RasterCell")
    measurement_class = URIRef(OBOE + "Measurement")
    measurement_type_class = URIRef(OBOE + "MeasurementType")
    # Open the file to write the triples
    with gzip.open(filename=save_file, mode="at", encoding="utf-8") as triple_file:
        # Iterate over the parquet files in the folder.
        # os.listdir returns entries in arbitrary order; sorting keeps the output reproducible.
        for file in sorted(os.listdir(parquet_folder)):
            # os.path.join also works when parquet_folder has no trailing separator
            df = pl.read_parquet(os.path.join(parquet_folder, file))
            G = Graph()  # Initialize an empty graph
            file_name = file.removesuffix(".parquet")
            # The observation name is the part before the first dot, without its trailing numbering
            file_observation = remove_suffix_numbering(file_name.split(".")[0])
            raster_file = URIRef(MFD + file_name)
            observation_type = URIRef(MFD + file_observation)  # TODO: Consider linking to an external ontology.
            # RasterFile Type RasterFile
            G.add((raster_file, rdf_type, raster_file_class))
            for r in df.iter_rows():  # r[0] is the value of the row in column 1, r[1] in column 2, etc.
                raster_cell_id = get_raster_cell_id(r)
                raster_cell = URIRef(MFD + raster_cell_id)
                measurement = URIRef(MFD + file_name + "_" + raster_cell_id)
                # RasterFile hasMember RasterCell
                G.add((raster_file, has_member, raster_cell))
                # RasterCell hasMeasurement Measurement
                G.add((raster_cell, has_measurement, measurement))
                # RasterCell Type RasterCell
                G.add((raster_cell, rdf_type, raster_cell_class))
                # Measurement hasValue literal
                lit, lit_type = get_literal(r[8])
                G.add((measurement, has_value, Literal(lit, datatype=lit_type)))
                # Measurement Type Measurement
                G.add((measurement, rdf_type, measurement_class))
                # Measurement ContainsMeasurementsOfType MeasurementType
                G.add((measurement, contains_measurements_of_type, observation_type))
            # MeasurementType Type MeasurementType - the same triple for every row of the file,
            # so it is added once, and only when the file actually has rows.
            if df.height > 0:
                G.add((observation_type, rdf_type, measurement_type_class))
            triple_file.write(G.serialize(format='nt'))
def raster_mappings_to_rdf(parquet_datasets_path_or_folder: str, save_file: str) -> None:
    """Converts the raster-S2 mappings to RDF format.

    The parquet dataset is loaded into a single table. In each row, columns 0-7 are used to build the
    raster cell ID and column 8 is iterated as a collection of hexadecimal S2 IDs. For every row one
    graph is built and appended to the output as N-Triples.

    Args:
        parquet_datasets_path_or_folder (str): Path or folder that is handed to pyarrow's ParquetDataset.
        save_file (str): Path of the gzip-compressed N-Triples file. It is opened in append mode, so
            content that is already in the file is kept.
    """
    # These terms are identical for every row, so they are built only once
    rdf_type = URIRef(RDF.type)
    s2_cell_class = URIRef(KWG_ONT + "S2Cell")
    covers = URIRef(GEO + "ehCovers")
    with gzip.open(filename=save_file, mode="at", encoding="utf-8") as triple_file:
        df = pq.ParquetDataset(parquet_datasets_path_or_folder).read().to_pandas()
        # itertuples yields plain tuples of the column values; unlike iterrows it does not
        # build a Series (with a possible dtype up-cast) for every row.
        for row in df.itertuples(index=False, name=None):
            G = Graph()  # Initialize an empty graph
            raster_cell = URIRef(MFD + get_raster_cell_id(row))
            for hex_id in row[8]:
                s2_cell = URIRef(KWG_ONT + convert_s2_id_to_bit_id(hex_id))
                # S2Cell Type KWG:S2Cell
                G.add((s2_cell, rdf_type, s2_cell_class))
                # RasterCell Covers S2Cell
                G.add((raster_cell, covers, s2_cell))
            triple_file.write(G.serialize(format='nt'))
if __name__ == "__main__":
    # Step 1: run create_tbox with its output file
    create_tbox(save_file="tbox.nt.gz")

    # Step 2: raster files, raster cells and their values
    raster_cells_folder = "/projects/mdm/S2Mappings/raster_cells/"
    raster_values_to_rdf(parquet_folder=raster_cells_folder, save_file="raster_values.nt.gz")

    # Step 3: raster cell to S2 cell mappings
    corner_mappings_folder = "/projects/mdm/S2Mappings/corner_mappings/"
    raster_mappings_to_rdf(
        parquet_datasets_path_or_folder=corner_mappings_folder,
        save_file="raster_s2_mappings.nt.gz",
    )