-
Notifications
You must be signed in to change notification settings - Fork 15
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
brick example added but not tested fully
- Loading branch information
Showing
6 changed files
with
422 additions
and
1,298 deletions.
There are no files selected for viewing
This file was deleted.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,92 @@ | ||
import sqlite3 | ||
import pandas as pd | ||
|
||
# Step 1: Connect to SQLite database (or create it) | ||
conn = sqlite3.connect("brick_timeseries.db") | ||
cursor = conn.cursor() | ||
|
||
# Step 2: Create tables for timeseries data and metadata | ||
cursor.execute( | ||
""" | ||
CREATE TABLE IF NOT EXISTS TimeseriesData ( | ||
id INTEGER PRIMARY KEY AUTOINCREMENT, | ||
sensor_name TEXT NOT NULL, | ||
timestamp TEXT NOT NULL, | ||
value REAL NOT NULL, | ||
fc1_flag INTEGER DEFAULT 0 -- Add this line to store fault condition 1 flags | ||
) | ||
""" | ||
) | ||
|
||
cursor.execute( | ||
""" | ||
CREATE TABLE IF NOT EXISTS TimeseriesReference ( | ||
id INTEGER PRIMARY KEY AUTOINCREMENT, | ||
timeseries_id TEXT NOT NULL, | ||
stored_at TEXT NOT NULL | ||
) | ||
""" | ||
) | ||
|
||
cursor.execute( | ||
""" | ||
CREATE TABLE IF NOT EXISTS DatabaseStorage ( | ||
id INTEGER PRIMARY KEY AUTOINCREMENT, | ||
label TEXT NOT NULL, | ||
connstring TEXT NOT NULL | ||
) | ||
""" | ||
) | ||
|
||
# Step 3: Insert database metadata (SQLite reference) | ||
cursor.execute( | ||
""" | ||
INSERT INTO DatabaseStorage (label, connstring) | ||
VALUES | ||
('SQLite Timeseries Storage', 'sqlite:///brick_timeseries.db') | ||
""" | ||
) | ||
|
||
# Step 4: Load the CSV data | ||
csv_file = r"C:\Users\bbartling\Documents\WPCRC_Master.csv" | ||
df = pd.read_csv(csv_file) | ||
print("df.columns", df.columns) | ||
|
||
print("Starting step 5") | ||
|
||
# Step 5: Insert CSV data into the TimeseriesData table | ||
for column in df.columns: | ||
for index, row in df.iterrows(): | ||
cursor.execute( | ||
""" | ||
INSERT INTO TimeseriesData (sensor_name, timestamp, value) | ||
VALUES (?, ?, ?) | ||
""", | ||
(column, index, row[column]), | ||
) | ||
print(f"Doing {column} in step 5") | ||
|
||
conn.commit() | ||
|
||
print("Starting step 6") | ||
|
||
# Step 6: Insert timeseries references based on sensor names | ||
for column in df.columns: | ||
cursor.execute( | ||
""" | ||
INSERT INTO TimeseriesReference (timeseries_id, stored_at) | ||
VALUES (?, ?) | ||
""", | ||
(column, "SQLite Timeseries Storage"), | ||
) | ||
|
||
print(f"Doing {column} in step 6") | ||
|
||
conn.commit() | ||
|
||
print("Step 6 is done") | ||
|
||
# Close the connection | ||
conn.close() | ||
|
||
print("SQLite database created and populated with CSV data.") |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,82 @@ | ||
import sqlite3 | ||
from rdflib import Graph, Literal, Namespace, RDF, URIRef | ||
from rdflib.namespace import RDFS, XSD | ||
|
||
# Step 1: Set up RDF graph | ||
g = Graph() | ||
brick = Namespace("https://brickschema.org/schema/Brick#") | ||
unit = Namespace("http://qudt.org/vocab/unit/") | ||
ref = Namespace("https://brickschema.org/schema/Reference#") | ||
g.bind("brick", brick) | ||
g.bind("unit", unit) | ||
g.bind("ref", ref) | ||
|
||
# Step 2: Connect to SQLite database | ||
conn = sqlite3.connect("brick_timeseries.db") | ||
cursor = conn.cursor() | ||
|
||
# Step 3: Retrieve timeseries metadata from SQLite database | ||
cursor.execute("SELECT timeseries_id, stored_at FROM TimeseriesReference") | ||
timeseries_refs = cursor.fetchall() | ||
|
||
# Define the database URI | ||
database_uri = URIRef("http://example.org/database") | ||
g.add((database_uri, RDF.type, ref.Database)) | ||
g.add( | ||
( | ||
database_uri, | ||
RDFS.label, | ||
Literal("SQLite Timeseries Storage", datatype=XSD.string), | ||
) | ||
) | ||
g.add( | ||
( | ||
database_uri, | ||
URIRef("http://example.org/connstring"), | ||
Literal("sqlite:///brick_timeseries.db", datatype=XSD.string), | ||
) | ||
) | ||
|
||
# Step 4: Build RDF model based on the timeseries references | ||
for timeseries_id, stored_at in timeseries_refs: | ||
sensor_uri = URIRef(f"http://example.org/{timeseries_id.replace(' ', '_')}") | ||
|
||
# Adjust sensor type and unit based on sensor name | ||
if "SaTempSP" in timeseries_id or "SaStatic" in timeseries_id: | ||
if "SPt" in timeseries_id or "SPt" in timeseries_id: # Adjust setpoint type | ||
g.add((sensor_uri, RDF.type, brick.Supply_Air_Static_Pressure_Setpoint)) | ||
g.add((sensor_uri, brick.hasUnit, unit.Inch_Water_Column)) | ||
else: | ||
g.add((sensor_uri, RDF.type, brick.Supply_Air_Static_Pressure_Sensor)) | ||
g.add((sensor_uri, brick.hasUnit, unit.Inch_Water_Column)) | ||
elif "Sa_FanSpeed" in timeseries_id: | ||
g.add((sensor_uri, RDF.type, brick.Supply_Fan_VFD_Speed_Sensor)) | ||
g.add((sensor_uri, brick.hasUnit, unit.Percent)) | ||
else: | ||
# Default case (adjust as needed) | ||
g.add((sensor_uri, RDF.type, brick.Temperature_Sensor)) | ||
g.add( | ||
(sensor_uri, brick.hasUnit, unit.DEG_F) | ||
) # Assuming degrees Fahrenheit, adjust if needed | ||
|
||
timeseries_ref_uri = URIRef( | ||
f"http://example.org/timeseries_{timeseries_id.replace(' ', '_')}" | ||
) | ||
g.add((timeseries_ref_uri, RDF.type, ref.TimeseriesReference)) | ||
g.add( | ||
( | ||
timeseries_ref_uri, | ||
ref.hasTimeseriesId, | ||
Literal(timeseries_id, datatype=XSD.string), | ||
) | ||
) | ||
g.add((timeseries_ref_uri, ref.storedAt, database_uri)) | ||
g.add((sensor_uri, ref.hasExternalReference, timeseries_ref_uri)) | ||
|
||
# Step 5: Serialize the graph to Turtle format | ||
g.serialize("brick_model_with_timeseries.ttl", format="turtle") | ||
|
||
# Close the connection | ||
conn.close() | ||
|
||
print("RDF model created and saved to 'brick_model_with_timeseries.ttl'.") |
174 changes: 174 additions & 0 deletions
174
examples/brick_model_and_sqlite/3_run_query_fc1_brick.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,174 @@ | ||
import sqlite3 | ||
import pandas as pd | ||
from rdflib import Graph, Namespace | ||
|
||
from open_fdd.air_handling_unit.faults.fault_condition_one import FaultConditionOne | ||
|
||
PERCENTAGE_COLS_TO_CONVERT = [ | ||
"Supply_Fan_VFD_Speed_Sensor", # BRICK formatted column name | ||
] | ||
|
||
# Minimal config dict just for fc1 | ||
config_dict = { | ||
"INDEX_COL_NAME": "timestamp", | ||
"DUCT_STATIC_COL": "Supply_Air_Static_Pressure_Sensor", | ||
"DUCT_STATIC_SETPOINT_COL": "Supply_Air_Static_Pressure_Setpoint", | ||
"SUPPLY_VFD_SPEED_COL": "Supply_Fan_VFD_Speed_Sensor", | ||
"VFD_SPEED_PERCENT_ERR_THRES": 0.05, | ||
"VFD_SPEED_PERCENT_MAX": 0.99, | ||
"DUCT_STATIC_INCHES_ERR_THRES": 0.1, | ||
"TROUBLESHOOT_MODE": False, | ||
"ROLLING_WINDOW_SIZE": 10, | ||
} | ||
|
||
|
||
def load_rdf_graph(file_path): | ||
print("Loading RDF graph...") | ||
g = Graph() | ||
g.parse(file_path, format="turtle") | ||
return g | ||
|
||
|
||
def run_sparql_query(graph): | ||
print("Running SPARQL query...") | ||
query = """ | ||
PREFIX brick: <https://brickschema.org/schema/Brick#> | ||
PREFIX ref: <https://brickschema.org/schema/Reference#> | ||
SELECT ?sensor ?sensorType WHERE { | ||
?sensor a ?sensorType . | ||
FILTER (?sensorType IN (brick:Supply_Air_Static_Pressure_Sensor, brick:Supply_Air_Static_Pressure_Setpoint, brick:Supply_Fan_VFD_Speed_Sensor)) | ||
} | ||
""" | ||
return graph.query(query) | ||
|
||
|
||
def extract_sensor_data(query_result): | ||
print("SPARQL query completed. Checking results...") | ||
sensor_data = {} | ||
for row in query_result: | ||
sensor_type = str(row.sensorType).split("#")[-1] | ||
sensor_data[sensor_type] = row.sensor | ||
print(f"Found sensor: {sensor_type} -> {row.sensor}") | ||
return sensor_data | ||
|
||
|
||
def retrieve_timeseries_data(sensor_data, conn): | ||
dfs = [] | ||
for sensor_type, sensor_uri in sensor_data.items(): | ||
sensor_id = sensor_uri.split("/")[-1] | ||
print(f"Querying SQLite for sensor: {sensor_id} of type: {sensor_type}") | ||
sql_query = """ | ||
SELECT timestamp, value | ||
FROM TimeseriesData | ||
WHERE sensor_name = ? | ||
""" | ||
df_sensor = pd.read_sql_query(sql_query, conn, params=(sensor_id,)) | ||
|
||
if df_sensor.empty: | ||
print( | ||
f"No data found for sensor: {sensor_type} with sensor_id: {sensor_id}" | ||
) | ||
else: | ||
print( | ||
f"Data found for sensor: {sensor_type}, number of records: {len(df_sensor)}" | ||
) | ||
df_sensor = df_sensor.rename(columns={"value": sensor_type}) | ||
dfs.append(df_sensor) | ||
|
||
return dfs | ||
|
||
|
||
def combine_dataframes(dfs): | ||
if not dfs: | ||
print("No data found for any sensors.") | ||
return None | ||
|
||
print("Combining DataFrames...") | ||
df_combined = dfs[0].set_index("timestamp") | ||
for df in dfs[1:]: | ||
df_combined = pd.merge( | ||
df_combined, df.set_index("timestamp"), left_index=True, right_index=True | ||
) | ||
|
||
print("The df is combined successfully.") | ||
return df_combined | ||
|
||
|
||
def convert_floats(df, columns): | ||
# This data has floats between 0.0 and 100.0 | ||
# so we need to convert to 0.0 and 1.0 ranges | ||
for column in columns: | ||
df[column] = df[column] / 100.0 | ||
|
||
print(df.head()) | ||
return df | ||
|
||
|
||
def run_fault_one(config_dict, df): | ||
fc1 = FaultConditionOne(config_dict) | ||
df = fc1.apply(df) | ||
print(f"Total faults detected: {df['fc1_flag'].sum()}") | ||
return df | ||
|
||
|
||
def update_fault_flags_in_db(df, conn): | ||
cursor = conn.cursor() | ||
|
||
update_data = [ | ||
(int(row["fc1_flag"]), index, "Supply_Fan_VFD_Speed_Sensor") | ||
for index, row in df.iterrows() | ||
] | ||
|
||
cursor.executemany( | ||
""" | ||
UPDATE TimeseriesData | ||
SET fc1_flag = ? | ||
WHERE timestamp = ? AND sensor_name = ? | ||
""", | ||
update_data, | ||
) | ||
|
||
conn.commit() | ||
print("Database updated with fault flags.") | ||
|
||
|
||
def main(): | ||
# Step 1: Load the RDF graph from the Turtle file | ||
g = load_rdf_graph("brick_model_with_timeseries.ttl") | ||
|
||
# Step 2: Run SPARQL query to find sensors | ||
rdf_result = run_sparql_query(g) | ||
|
||
# Step 3: Extract sensor data from SPARQL query result | ||
sensor_data = extract_sensor_data(rdf_result) | ||
|
||
# Step 4: Connect to SQLite database | ||
print("Connecting to SQLite database...") | ||
conn = sqlite3.connect("brick_timeseries.db") | ||
|
||
# Step 5: Retrieve timeseries data from the database | ||
dfs = retrieve_timeseries_data(sensor_data, conn) | ||
|
||
# Step 6: Combine the retrieved dataframes | ||
df_combined = combine_dataframes(dfs) | ||
print(df_combined.columns) | ||
|
||
if df_combined is not None: | ||
# Step 7: Convert analog outputs to floats | ||
df_combined = convert_floats(df_combined, PERCENTAGE_COLS_TO_CONVERT) | ||
|
||
# Step 8: Run fault condition one | ||
df_combined = run_fault_one(config_dict, df_combined) | ||
|
||
# Step 9: Write the fault flags back to the database | ||
update_fault_flags_in_db(df_combined, conn) | ||
|
||
print("columns: \n", df_combined.columns) | ||
|
||
# Close the database connection | ||
conn.close() | ||
|
||
|
||
if __name__ == "__main__": | ||
main() |
Oops, something went wrong.