Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added
- `CODEOWNERS` file and dependabot automation [#7](https://github.com/ie3-institute/simonaMarkovLoad/issues/7)
- Added full Markov-model pipeline [#10](https://github.com/ie3-institute/simonaMarkovLoad/issues/10)
- Added GMM feature to project [#12](https://github.com/ie3-institute/simonaMarkovLoad/issues/12)

### Changed
- Compute instantaneous kW from cumulative kWh via 15-minute differencing [#1](https://github.com/ie3-institute/simonaMarkovLoad/issues/1)
127 changes: 110 additions & 17 deletions src/main.py
Original file line number Diff line number Diff line change
@@ -1,31 +1,124 @@
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd


from src.markov.buckets import assign_buckets, bucket_id
from src.markov.gmm import fit_gmms, sample_value
from src.markov.transition_counts import build_transition_counts
from src.markov.transitions import build_transition_matrices
from src.markov.transitions import build_transition_matrices
from src.markov.postprocess import apply_dwell_time, two_point_smooth
from src.preprocessing.loader import load_timeseries

df = load_timeseries(normalize=True, discretize=True)
SIM_DAYS = 10
PER_DAY = 96

def _detect_value_col(df: pd.DataFrame) -> str:
for c in ["x", "value", "load", "power", "p_norm", "load_norm"]:
if c in df.columns and np.issubdtype(df[c].dtype, np.number):
return c
raise KeyError("numeric load column missing")


def _simulate_states(
probs: np.ndarray,
start_ts: pd.Timestamp,
start_state: int,
periods: int,
rng: np.random.Generator | None = None,
) -> np.ndarray:
rng = np.random.default_rng() if rng is None else rng

states = np.empty(periods, dtype=int)
s = start_state
for i, t in enumerate(pd.date_range(start_ts, periods=periods, freq="15min")):
b = bucket_id(t)
s = rng.choice(probs.shape[1], p=probs[b, s])
states[i] = s
return states


def _states_to_series(
states: np.ndarray,
gmms,
start_ts: pd.Timestamp,
rng: np.random.Generator | None = None,
) -> pd.Series:
rng = np.random.default_rng() if rng is None else rng
ts = pd.date_range(start_ts, periods=len(states), freq="15min")
xs = np.empty_like(states, dtype=float)
for i, (t, s) in enumerate(zip(ts, states)):
b = bucket_id(t)
xs[i] = sample_value(gmms, b, s, rng=rng)
return pd.Series(xs, index=ts, name="x_sim")



def _plot_simulation_diagnostics(df: pd.DataFrame,
sim: pd.Series,
value_col: str) -> None:
first_day = sim.iloc[:PER_DAY]
plt.figure(figsize=(10, 3))
plt.plot(first_day.index, first_day, marker=".")
plt.title("Simulated power – first day")
plt.ylabel("normalised load x")
plt.tight_layout()
plt.show()

plt.figure(figsize=(6, 4))
plt.hist(df[value_col], bins=50, alpha=0.6, density=True, label="original")
plt.hist(sim, bins=50, alpha=0.6, density=True, label="simulated")
plt.title("Original vs simulated load distribution")
plt.xlabel("normalised load x")
plt.ylabel("density")
plt.legend()
plt.tight_layout()
plt.show()

sim_hr = pd.DataFrame({"x_sim": sim, "hour": sim.index.hour})
plt.figure(figsize=(10, 4))
sim_hr.boxplot(column="x_sim", by="hour", grid=False)
plt.title("Simulated power by hour of day")
plt.xlabel("hour of day")
plt.ylabel("normalised load x")
plt.tight_layout()
plt.show()


def main() -> None:
df = load_timeseries(normalize=True, discretize=True)
if "bucket" not in df.columns:
df = assign_buckets(df)

val_col = _detect_value_col(df)

counts = build_transition_counts(df)
probs = build_transition_matrices(df)

gmms = fit_gmms(df, value_col=val_col, verbose=1,
heartbeat_seconds=60)

counts = build_transition_counts(df)
probs = build_transition_matrices(df, alpha=1.0)
periods = SIM_DAYS * PER_DAY
start_ts = df["timestamp"].min().normalize()
raw_states = _simulate_states(
probs,
start_ts=start_ts,
start_state=int(df["state"].iloc[0]),
periods=periods,
)

print("counts shape :", counts.shape)
print("probs shape :", probs.shape)
states = apply_dwell_time(raw_states, p_extend=0.7)

sim_series = _states_to_series(states, gmms, start_ts)
sim_series = two_point_smooth(sim_series)

active_buckets = np.where(counts.sum(axis=(1, 2)) > 0)[0]
bucket = int(active_buckets[0]) if active_buckets.size else 0
print(f"\nUsing bucket {bucket}")
real_kwh = df.set_index("timestamp")[val_col].resample("D").sum() / 4
sim_kwh = sim_series.resample("D").sum() / 4
scale = df[val_col].mean() / sim_series.mean()
sim_series *= scale

_plot_simulation_diagnostics(df, sim_series, val_col)

print("row sums :", probs[bucket].sum(axis=1))

plt.imshow(probs[bucket], aspect="auto")
plt.title(f"Bucket {bucket} – transition probabilities")
plt.xlabel("state t+1")
plt.ylabel("state t")
plt.colorbar()
plt.tight_layout()
plt.show()
if __name__ == "__main__":
main()
136 changes: 136 additions & 0 deletions src/markov/gmm.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
import math
from typing import List, Optional, Tuple

import joblib
import numpy as np
import pandas as pd
from sklearn.mixture import GaussianMixture

try:
from tqdm.auto import tqdm
except ImportError:
tqdm = None

from .buckets import NUM_BUCKETS
from .transition_counts import N_STATES

__all__ = ["GaussianBucketModels", "fit_gmms", "sample_value"]

GmmTuple = Tuple[np.ndarray, np.ndarray, np.ndarray]
GaussianBucketModels = List[List[Optional[GmmTuple]]]

_rng = np.random.default_rng()


def _fit_single(
x: np.ndarray,
*,
min_samples: int = 30,
k_candidates: Tuple[int, ...] = (1, 2, 3),
random_state: int | None = None,
) -> Optional[GmmTuple]:
if x.size == 0:
return None
if len(x) < min_samples:
mean = float(np.mean(x))
var = float(max(np.var(x), 1e-5))
return (
np.array([1.0], dtype=float),
np.array([mean], dtype=float),
np.array([var], dtype=float),
)
best_bic = math.inf
best_gmm: GaussianMixture | None = None
for k in k_candidates:
gmm = GaussianMixture(
n_components=k,
covariance_type="spherical",
n_init=1,
random_state=random_state,
).fit(x.reshape(-1, 1))
bic = gmm.bic(x.reshape(-1, 1))
if bic < best_bic:
best_bic = bic
best_gmm = gmm
return (
best_gmm.weights_,
best_gmm.means_.ravel(),
best_gmm.covariances_.ravel(),
)


def fit_gmms(
df: pd.DataFrame,
*,
value_col: str = "x",
bucket_col: str = "bucket",
state_col: str = "state",
min_samples: int = 5,
k_candidates: Tuple[int, ...] = (1, 2, 3),
n_jobs: int = -1,
random_state: int | None = None,
verbose: int = 0,
heartbeat_seconds: int | None = None,
) -> GaussianBucketModels:
if heartbeat_seconds:
import faulthandler

faulthandler.enable()
faulthandler.dump_traceback_later(heartbeat_seconds, repeat=True)

grouped = (
df[[bucket_col, state_col, value_col]]
.groupby([bucket_col, state_col])[value_col]
.apply(list)
.to_dict()
)

tasks = [
((b, s), grouped.get((b, s), []))
for b in range(NUM_BUCKETS)
for s in range(N_STATES)
]

iterable = (
tqdm(tasks, desc="Fitting GMMs", unit="model") if verbose and tqdm else tasks
)

def _worker(item):
(b, s), x_list = item
x = np.asarray(x_list, dtype=float)
return (
b,
s,
_fit_single(
x,
min_samples=min_samples,
k_candidates=k_candidates,
random_state=random_state,
),
)

results = joblib.Parallel(n_jobs=n_jobs, verbose=verbose)(
joblib.delayed(_worker)(task) for task in iterable
)

gmms: GaussianBucketModels = [
[None for _ in range(N_STATES)] for _ in range(NUM_BUCKETS)
]
for b, s, gmm_tuple in results:
gmms[b][s] = gmm_tuple
return gmms


def sample_value(
gmms: GaussianBucketModels,
bucket: int,
state: int,
rng: np.random.Generator | None = None,
) -> float:
gmm = gmms[bucket][state]
if gmm is None:
raise ValueError(f"No GMM trained for bucket {bucket}, state {state}")
weights, means, vars_ = gmm
rng = _rng if rng is None else rng
comp = rng.choice(len(weights), p=weights)
return float(rng.normal(means[comp], math.sqrt(vars_[comp])))
21 changes: 21 additions & 0 deletions src/markov/postprocess.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import numpy as np
import pandas as pd

def apply_dwell_time(states: np.ndarray, p_extend: float = 0.7) -> np.ndarray:
if states.size < 2:
return states

out = states.copy()
hold_next = False
for t in range(1, len(states)):
if hold_next:
out[t] = out[t-1]
hold_next = False
elif states[t] != states[t-1] and np.random.rand() < p_extend:
out[t] = out[t-1]
hold_next = True
return out


def two_point_smooth(series: pd.Series) -> pd.Series:
return 0.5 * (series.shift(1, fill_value=series.iloc[0]) + series)
5 changes: 0 additions & 5 deletions src/markov/transition_counts.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,6 @@ def build_transition_counts(
bucket_col: str = "bucket",
dtype=np.uint32,
) -> np.ndarray:
"""
Absolute transition counts:
C[b, i, j] = # of times state_t=i → state_{t+1}=j in bucket b
Shape = (2 304, 10, 10).
"""
df = df.sort_values("timestamp")

s_t = df[state_col].to_numpy(dtype=int)[:-1]
Expand Down
37 changes: 29 additions & 8 deletions src/markov/transitions.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,36 @@
import numpy as np
import pandas as pd
from ._core import _transition_counts

from src.config import CONFIG

from ._core import _transition_counts

alpha = float(CONFIG["model"]["laplace_alpha"])
DEFAULT_MIN_COUNT = 25
DEFAULT_UNIFORM_EPS = 0.25


def build_transition_matrices(
df: pd.DataFrame,
*,
dtype: np.dtype = np.float32,
min_count: int = DEFAULT_MIN_COUNT,
uniform_eps: float = DEFAULT_UNIFORM_EPS,
) -> np.ndarray:


counts: np.ndarray = _transition_counts(df, dtype=np.float64)
_, n_states, _ = counts.shape

row_sum = counts.sum(axis=2, keepdims=True)
mask_empty = (row_sum == 0)
mask_sparse = (row_sum < min_count) & ~mask_empty

if mask_empty.any():
counts[mask_empty.repeat(n_states, axis=2)] = 1.0

if mask_sparse.any():
counts[mask_sparse.repeat(n_states, axis=2)] += uniform_eps / n_states

row_sum = counts.sum(axis=2, keepdims=True)
probs = counts / row_sum

def build_transition_matrices(df: pd.DataFrame, *, dtype=np.float32) -> np.ndarray:
counts = _transition_counts(df, dtype=dtype)
counts += alpha
counts /= counts.sum(axis=2, keepdims=True)
return counts.astype(dtype)
return probs.astype(dtype)
Loading