diff --git a/README.md b/README.md index 58e06931..27b762ba 100644 --- a/README.md +++ b/README.md @@ -69,6 +69,7 @@ etc. | [Vertex Registry and Deployer](vertex-registry-and-deployer) | ๐Ÿš€ MLOps | ๐Ÿ“ฆ Model Registry, ๐Ÿš€ Deployment | vertex, gcp, zenml | | [Eurorate Predictor](eurorate-predictor) | ๐Ÿ“Š Data | โฑ๏ธ Time Series, ๐Ÿงน ETL | airflow, bigquery, xgboost | | [RetailForecast](retail-forecast) | ๐Ÿ“Š Data | โฑ๏ธ Time Series, ๐Ÿ“ˆ Forecasting, ๐Ÿ”„ Multi-Model | prophet, zenml, pandas | +| [FloraCast](floracast) | ๐Ÿ“Š Data | โฑ๏ธ Timeseries Prediction, ๐Ÿ“ˆ Forecasting, ๐Ÿ”„ Batch Inference | darts, pytorch, zenml, pandas | | [Bank Subscription Prediction](bank_subscription_prediction) | ๐Ÿ“Š Data | ๐Ÿ’ผ Classification, โš–๏ธ Imbalanced Data, ๐Ÿ” Feature Selection | xgboost, plotly, zenml | | [Credit Scorer](credit-scorer) | ๐Ÿ“Š Data | ๐Ÿ’ฐ Credit Risk, ๐Ÿ“Š Explainability, ๐Ÿ‡ช๐Ÿ‡บ EU AI Act | scikit-learn, fairlearn, zenml | diff --git a/credit-scorer/src/steps/deployment/post_run_annex.py b/credit-scorer/src/steps/deployment/post_run_annex.py index 51633be9..74489179 100644 --- a/credit-scorer/src/steps/deployment/post_run_annex.py +++ b/credit-scorer/src/steps/deployment/post_run_annex.py @@ -175,7 +175,7 @@ def generate_enhanced_annex_iv_html( "name", "Credit Scoring Pipeline" ) pipeline_version = metadata.get("pipeline", {}).get("version", "Unknown") - pipeline_run = metadata.get("pipeline_run", {}) + _ = metadata.get("pipeline_run", {}) stack_info = metadata.get("stack", {}) git_info = metadata.get("git_info", {}) diff --git a/floracast/README.md b/floracast/README.md new file mode 100644 index 00000000..a571f56e --- /dev/null +++ b/floracast/README.md @@ -0,0 +1,343 @@ +# ๐ŸŒธ FloraCast: Building a Forecasting Platform, Not Just Models + +A production-ready MLOps pipeline for **timeseries prediction** and **forecasting** using ZenML and [Darts](https://unit8co.github.io/darts/index.html), designed for enterprise demand and sales forecasting across retail, e-commerce, and supply chain use cases. + +## ๐Ÿš€ Product Overview + +FloraCast demonstrates how to build end-to-end MLOps workflows for time series forecasting. Built with ZenML's robust framework, it showcases enterprise-grade machine learning pipelines that can be deployed in both development and production environments. + +Focus: **Timeseries Prediction** and **Forecasting**. + +### Key Features + +- **End-to-End Timeseries Prediction & Forecasting Pipeline**: From data ingestion to batch inference on a schedule. +- **Darts Integration**: Support for advanced forecasting models like [TFT (Temporal Fusion Transformer)](https://unit8co.github.io/darts/generated_api/darts.models.forecasting.tft_model.html) +- **Custom Materializers**: Production-ready artifact handling with visualizations +- **Model Versioning**: Track and compare different model versions +- **Flexible Configuration**: YAML-based configuration for different environments +- **Cloud Ready**: Built with EKS/GKE/AKS deployment in mind + +## ๐Ÿ’ก How It Works + +### โœˆ๏ธ Pipelines + +FloraCast consists of two main pipelines: + +#### 1. Training Pipeline + +The training pipeline handles the complete ML workflow: + +1. **Data Ingestion** - Loads ecommerce sales data (synthetic by default) +2. **Preprocessing** - Converts to Darts TimeSeries with train/validation split +3. **Model Training** - Trains TFT model with configurable parameters +4. 
**Evaluation** - Computes SMAPE metrics on the validation set
+
+![Model Evaluation Results](assets/eval_vis.png)
+*FloraCast reaches SMAPE scores under 13 on the validation set, with predictions closely tracking the ground-truth data.*
+
+#### 2. Batch Inference Pipeline
+
+The inference pipeline generates predictions using trained models:
+
+1. **Data Ingestion** - Loads new data for predictions
+2. **Preprocessing** - Applies the same transformations as training
+3. **Batch Inference** - Generates forecasts and saves them to CSV
+
+![Batch Inference Visualization](assets/batch_inference_timeseries_viz.png)
+*Automated batch inference generates future predictions on a schedule, enabling proactive business planning and inventory management.*
+
+### 🔧 Architecture
+
+![FloraCast Architecture](assets/architecture.png)
+*Complete system architecture showing data flow through ZenML pipelines, from raw data ingestion to model training, evaluation, and automated batch inference.*
+
+![ZenML Model Control Plane](assets/mcp_floracast.png)
+*FloraCast leverages ZenML's Model Control Plane for enterprise-grade model versioning, lineage tracking, and automated deployment workflows.*
+
+## 📦 Installation
+
+### Prerequisites
+
+- Python 3.9+
+- [Deployed ZenML](https://docs.zenml.io/deploying-zenml/deploying-zenml)
+- Virtual environment (recommended)
+
+### Setup
+
+1. **Clone the repository** (if part of zenml-projects):
+```bash
+cd zenml-projects/floracast
+```
+
+2. **Create virtual environment**:
+```bash
+python -m venv venv
+source venv/bin/activate  # On Windows: venv\Scripts\activate
+```
+
+3. **Install dependencies**:
+```bash
+pip install -r requirements.txt
+```
+
+## ⚡ Quick Start
+
+### Local Development
+
+1. **Run training pipeline**:
+```bash
+python run.py --config configs/training.yaml --pipeline train
+```
+
+2. **Run inference pipeline**:
+```bash
+python run.py --config configs/inference.yaml --pipeline inference
+```
+
+3. **View results**:
+- Check the predictions artifact for the generated forecasts
+- Use the ZenML dashboard to inspect artifacts and metrics
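+
+Because the inference pipeline is meant to run on a schedule, it helps to see what that looks like operationally. The entry below is a minimal sketch using plain cron; the repository path, virtualenv location, and log file are placeholders, and ZenML orchestrators can also schedule pipeline runs natively instead.
+
+```bash
+# Crontab entry: run batch inference every day at 06:00 and append output to a log
+0 6 * * * cd /path/to/zenml-projects/floracast && ./venv/bin/python run.py --config configs/inference.yaml --pipeline inference >> inference.log 2>&1
+```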
+
+## ⚙️ Configuration Files
+
+FloraCast uses semantically named configuration files for different deployment scenarios:
+
+### Available Configurations
+
+- **`configs/training.yaml`** - Local development and training pipeline configuration
+- **`configs/inference.yaml`** - Batch inference pipeline configuration for production models
+
+### Customization Options
+
+Edit the appropriate config file to customize:
+
+- **Model parameters**: TFT hyperparameters, training epochs
+- **Data settings**: Date columns, frequency, validation split
+- **Evaluation**: Forecasting horizon, metrics
+- **Output**: File paths and formats
+
+## 📁 Project Structure
+
+```
+floracast/
+├── README.md
+├── requirements.txt
+├── .env.example
+├── configs/
+│   ├── training.yaml                # Training pipeline config
+│   └── inference.yaml               # Inference pipeline config
+├── data/
+│   └── ecommerce_daily.csv          # Example input data
+├── pipelines/
+│   ├── train_forecast_pipeline.py   # Training pipeline definition
+│   └── batch_inference_pipeline.py  # Batch inference pipeline definition
+├── steps/
+│   ├── ingest.py                    # Data ingestion step
+│   ├── preprocess.py                # Preprocessing step (train/val split, scaling)
+│   ├── train.py                     # Model training step
+│   ├── evaluate.py                  # Model evaluation step
+│   ├── promote.py                   # Model registration/promotion step
+│   ├── batch_infer.py               # Batch inference step
+│   └── load_model.py                # Model loading utilities
+├── materializers/
+│   ├── tft_materializer.py          # Custom TFTModel materializer
+│   └── timeseries_materializer.py   # Custom TimeSeries materializer
+├── utils/
+│   └── metrics.py                   # Forecasting metrics (e.g., SMAPE)
+└── run.py                           # CLI entry point for running pipelines
+```
+
+### Key Components
+
+- **Custom Materializers**: Proper serialization for Darts TimeSeries with visualizations
+- **Flexible Models**: TFT primary, ExponentialSmoothing fallback
+- **Comprehensive Logging**: Detailed pipeline execution tracking
+- **Artifact Naming**: Clear, descriptive names for all pipeline outputs
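+
+The custom materializers under `materializers/` control how Darts objects are written to and read from the artifact store. As a minimal sketch of how a step could opt into the `DartsTimeSeriesMaterializer` explicitly (illustrative only; the project's own steps may wire materializers up differently), assuming it is run from the project root:
+
+```python
+import pandas as pd
+from darts import TimeSeries
+from zenml import step
+
+from materializers.timeseries_materializer import DartsTimeSeriesMaterializer
+
+
+@step(output_materializers=DartsTimeSeriesMaterializer)
+def to_series(df: pd.DataFrame) -> TimeSeries:
+    """Turn a `ds`/`y` dataframe into a TimeSeries stored via the custom materializer."""
+    return TimeSeries.from_dataframe(df, time_col="ds", value_cols="y", freq="D")
+```
+
+Because the materializer also saves a `preview.png`, a TimeSeries returned this way shows up with a small preview plot in the ZenML dashboard.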
+
+## 🚀 Production Deployment
+
+### ZenML Azure Stack Setup
+
+To run FloraCast on Azure with ZenML, set up a ZenML stack backed by Azure services:
+
+- **Artifact Store**: Azure Blob Storage
+- **Container Registry**: Azure Container Registry (ACR)
+- **Orchestrator**: Kubernetes Orchestrator targeting AKS
+- **Optional**: AzureML Step Operator for managed training; Azure Key Vault for secrets
+
+Quick start (CLI), with angle-bracket values as placeholders for your own Azure resources:
+
+```bash
+# Artifact Store (Azure Blob)
+zenml artifact-store register azure_store --flavor=azure \
+  --account_name=<STORAGE_ACCOUNT_NAME> \
+  --container=<BLOB_CONTAINER_NAME>
+
+# Container Registry (ACR)
+zenml container-registry register azure_acr --flavor=azure \
+  --uri=<ACR_LOGIN_SERVER>
+
+# Orchestrator (AKS via Kubernetes)
+zenml orchestrator register aks_k8s --flavor=kubernetes \
+  --kubernetes_context=<KUBE_CONTEXT> \
+  --namespace=<K8S_NAMESPACE>
+
+# (Optional) AzureML Step Operator
+zenml step-operator register azureml_ops --flavor=azureml \
+  --subscription_id=<SUBSCRIPTION_ID> \
+  --resource_group=<RESOURCE_GROUP> \
+  --workspace_name=<AZUREML_WORKSPACE_NAME>
+
+# Compose the stack
+zenml stack register azure_aks_stack \
+  -a azure_store -c azure_acr -o aks_k8s --set
+```
+
+Read more:
+
+- **Set up an MLOps stack on Azure**: [ZenML Azure guide](https://docs.zenml.io/stacks/popular-stacks/azure-guide)
+- **Kubernetes Orchestrator (AKS)**: [Docs](https://docs.zenml.io/stacks/stack-components/orchestrators/kubernetes)
+- **Azure Blob Artifact Store**: [Docs](https://docs.zenml.io/stacks/stack-components/artifact-stores/azure)
+- **Azure Container Registry**: [Docs](https://docs.zenml.io/stacks/stack-components/container-registries/azure)
+- **AzureML Step Operator**: [Docs](https://docs.zenml.io/stacks/stack-components/step-operators/azureml)
+- **Terraform stack recipe for Azure**: [HashiCorp Registry](https://registry.terraform.io/modules/zenml-io/zenml-stack/azure/latest)
+
+## 🔬 Advanced Usage
+
+### Custom Data Sources
+
+Replace the default ecommerce data:
+
+1. **Update configuration**:
+```yaml
+# configs/training.yaml
+steps:
+  ingest_data:
+    parameters:
+      data_source: "csv"  # or "ecommerce_default"
+      data_path: "path/to/your/data.csv"
+      datetime_col: "timestamp"
+      target_col: "sales"
+  preprocess_for_training:
+    parameters:
+      datetime_col: "timestamp"
+      target_col: "sales"
+      freq: "D"
+      val_ratio: 0.2
+```
+
+```yaml
+# configs/inference.yaml
+steps:
+  ingest_data:
+    parameters:
+      data_source: "csv"
+      data_path: "path/to/your/data.csv"
+      datetime_col: "timestamp"
+      target_col: "sales"
+  preprocess_for_inference:
+    parameters:
+      datetime_col: "timestamp"
+      target_col: "sales"
+      freq: "D"
+```
+
+2. **Ensure data format** (see the preparation sketch below):
+   - DateTime index column
+   - Numeric target variable
+   - Daily frequency (or update `freq` parameter)
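+
+If your raw export is not already a clean daily series, a small pandas script can reshape it before pointing the configs at the resulting CSV. This is only a sketch: the input file, column names, and the `sum()` aggregation are placeholders for whatever your source data looks like.
+
+```python
+import pandas as pd
+
+# Hypothetical raw export with one row per order and irregular timestamps.
+raw = pd.read_csv("raw_orders.csv", parse_dates=["order_time"])
+
+daily = (
+    raw.set_index("order_time")["revenue"]
+    .resample("D").sum()        # enforce a regular daily frequency
+    .interpolate()              # fill any gaps so Darts gets an unbroken series
+    .rename("sales")
+    .reset_index()
+    .rename(columns={"order_time": "timestamp"})
+)
+daily.to_csv("path/to/your/data.csv", index=False)
+```
+
+The written file then matches the `datetime_col: "timestamp"` and `target_col: "sales"` settings shown above.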
+
+### Model Experimentation
+
+Try different forecasting models by updating the config:
+
+```yaml
+# configs/training.yaml
+steps:
+  train_model:
+    parameters:
+      model_name: "ExponentialSmoothing"  # Switch from TFT to ES
+      # Note: ExponentialSmoothing uses default params in code currently
+```
+
+### Custom Metrics
+
+Extend `utils/metrics.py` to add additional forecasting metrics:
+
+```python
+from typing import Union
+
+import numpy as np
+from darts import TimeSeries
+
+
+def mase(actual: Union[TimeSeries, np.ndarray], predicted: Union[TimeSeries, np.ndarray]) -> float:
+    """Mean Absolute Scaled Error (simplified: scaled by a one-step naive forecast on the actuals)."""
+    y_true = actual.values().flatten() if isinstance(actual, TimeSeries) else np.asarray(actual).flatten()
+    y_pred = predicted.values().flatten() if isinstance(predicted, TimeSeries) else np.asarray(predicted).flatten()
+    naive_error = np.mean(np.abs(np.diff(y_true)))  # scale term: in-window naive forecast error
+    return float(np.mean(np.abs(y_true - y_pred)) / naive_error)
+```
+
+Update `steps/evaluate.py` to import and route to the new metric:
+
+```python
+from utils.metrics import smape, mase
+
+# ... inside evaluate(...)
+if metric == "smape":
+    score = smape(actual, predictions_for_eval)
+elif metric == "mase":
+    score = mase(actual, predictions_for_eval)
+else:
+    raise ValueError(f"Unknown metric: {metric}")
+```
+
+Then configure the metric in your training config (after updating `evaluate`):
+
+```yaml
+# configs/training.yaml
+steps:
+  evaluate:
+    parameters:
+      metric: "mase"
+```
+
+## 🤝 Contributing
+
+FloraCast follows ZenML best practices and is designed to be extended:
+
+1. **Add New Models**: Extend `steps/train.py` with additional Darts models
+2. **Custom Materializers**: Create materializers for new data types
+3. **Additional Metrics**: Expand evaluation capabilities
+4. **New Data Sources**: Add support for different input formats
+
+## 📝 Next Steps
+
+After running FloraCast successfully:
+
+1. **Explore ZenML Dashboard**: View pipeline runs, artifacts, and metrics
+2. **Experiment with Models**: Try different TFT configurations
+3. **Add Real Data**: Replace synthetic data with your forecasting use case
+4. **Deploy to Production**: Use the AKS stack configuration to run at scale
+
+## 🆘 Troubleshooting
+
+### Common Issues
+
+**TFT Training Fails**:
+- Check `add_relative_index: true` in configuration
+- Verify sufficient data length (more than `input_chunk_length + output_chunk_length` points; the default config uses 14 + 14)
+
+**Materializer Errors**:
+- Ensure datetime columns are properly formatted
+- Check that TimeSeries can be created from your data
+
+**Memory Issues**:
+- Reduce `batch_size` or `hidden_size` in model parameters
+- Use ExponentialSmoothing for lighter resource usage
+
+## 📚 Resources
+
+- [ZenML Documentation](https://docs.zenml.io/)
+- [Darts Documentation](https://unit8co.github.io/darts/)
+- [Azure ML Documentation](https://docs.microsoft.com/en-us/azure/machine-learning/)
+
+---
+
+Built with ❤️ using [ZenML](https://zenml.io) - *The MLOps Framework for Production AI*
\ No newline at end of file
diff --git a/floracast/__init__.py b/floracast/__init__.py new file mode 100644 index 00000000..acd09b46 --- /dev/null +++ b/floracast/__init__.py @@ -0,0 +1,3 @@ +"""FloraCast - ZenML Forecasting Template for DFG.""" + +__version__ = "0.1.0" diff --git a/floracast/assets/architecture.png b/floracast/assets/architecture.png new file mode 100644 index 00000000..561b5d6a Binary files /dev/null and b/floracast/assets/architecture.png differ diff --git a/floracast/assets/batch_inference_timeseries_viz.png b/floracast/assets/batch_inference_timeseries_viz.png new file mode 100644 index 00000000..c579d863 Binary files /dev/null and b/floracast/assets/batch_inference_timeseries_viz.png differ diff --git a/floracast/assets/eval_vis.png b/floracast/assets/eval_vis.png new file mode 100644 index 00000000..a6e133dd Binary files /dev/null and b/floracast/assets/eval_vis.png differ diff --git a/floracast/assets/mcp_floracast.png b/floracast/assets/mcp_floracast.png new file mode 100644 index 00000000..8d6e18c9 Binary files /dev/null and b/floracast/assets/mcp_floracast.png differ diff --git a/floracast/configs/inference.yaml b/floracast/configs/inference.yaml new file mode 100644 index 00000000..50c5cce9 --- /dev/null +++ b/floracast/configs/inference.yaml @@ -0,0 +1,26 @@ +# FloraCast Inference Configuration +# This config is used for running batch inference with production models + +settings: + docker: + requirements: requirements.txt + python_package_installer: uv + +model: + name: floracast_tft + version: production # Use production model for inference + +steps: + ingest_data: + parameters: + data_source: "ecommerce_default" + data_path: null + datetime_col: "ds" + target_col: "y" + + batch_inference_predict: + parameters: + datetime_col: "ds" + target_col: "y" + freq: "D" + horizon: 14 \ No newline at end of file diff --git a/floracast/configs/training.yaml b/floracast/configs/training.yaml new file mode 100644 index 00000000..38acf61b --- /dev/null +++ b/floracast/configs/training.yaml @@ -0,0 +1,62 @@ +# FloraCast Training Configuration +# This config is used for local development and training + +settings: + docker: + requirements: requirements.txt + python_package_installer: uv + +model: + name: floracast_tft + license: apache + description: "TFT model for DFG time series forecasting use case" + audience: "DFG forecasting team" + use_cases: > + Demonstrates ZenML MLOps capabilities for time series forecasting using + Temporal Fusion Transformer with custom Darts materializers and proper + artifact management for production forecasting workflows. 
+ ethics: "No ethical concerns - synthetic ecommerce data for demonstration" + tags: + - forecasting + - tft + - darts + - time-series + - production + - dfg + +steps: + ingest_data: + parameters: + data_source: "ecommerce_default" + data_path: null + datetime_col: "ds" + target_col: "y" + + preprocess_data: + parameters: + datetime_col: "ds" + target_col: "y" + freq: "D" + val_ratio: 0.2 + + train_model: + parameters: + input_chunk_length: 14 # Longer input for better pattern recognition + output_chunk_length: 14 # 2-week forecasting horizon for impressive demo + hidden_size: 16 # Smaller hidden size to prevent instability + lstm_layers: 1 # Single layer + num_attention_heads: 1 # Single head to prevent complexity issues + dropout: 0.0 # No dropout to eliminate regularization issues + batch_size: 4 # Small batch size that works with data + n_epochs: 5 # Few epochs + random_state: 42 + add_relative_index: true # Required for TFT - generates future covariates + enable_progress_bar: true + enable_model_summary: true + learning_rate: 0.001 # Standard learning rate that works + weight_decay: 0.0 # No weight decay to eliminate regularization issues + + evaluate: + parameters: + horizon: 14 # Match updated output_chunk_length - 2 weeks + metric: "smape" diff --git a/floracast/data/ecommerce_daily.csv b/floracast/data/ecommerce_daily.csv new file mode 100644 index 00000000..ae3f7a93 --- /dev/null +++ b/floracast/data/ecommerce_daily.csv @@ -0,0 +1,731 @@ +ds,y +2023-08-20 16:20:07.151765,67 +2023-08-21 16:20:07.151765,68 +2023-08-22 16:20:07.151765,76 +2023-08-23 16:20:07.151765,86 +2023-08-24 16:20:07.151765,82 +2023-08-25 16:20:07.151765,104 +2023-08-26 16:20:07.151765,105 +2023-08-27 16:20:07.151765,71 +2023-08-28 16:20:07.151765,82 +2023-08-29 16:20:07.151765,90 +2023-08-30 16:20:07.151765,91 +2023-08-31 16:20:07.151765,95 +2023-09-01 16:20:07.151765,128 +2023-09-02 16:20:07.151765,107 +2023-09-03 16:20:07.151765,66 +2023-09-04 16:20:07.151765,63 +2023-09-05 16:20:07.151765,66 +2023-09-06 16:20:07.151765,78 +2023-09-07 16:20:07.151765,78 +2023-09-08 16:20:07.151765,99 +2023-09-09 16:20:07.151765,106 +2023-09-10 16:20:07.151765,67 +2023-09-11 16:20:07.151765,72 +2023-09-12 16:20:07.151765,68 +2023-09-13 16:20:07.151765,75 +2023-09-14 16:20:07.151765,84 +2023-09-15 16:20:07.151765,104 +2023-09-16 16:20:07.151765,103 +2023-09-17 16:20:07.151765,65 +2023-09-18 16:20:07.151765,70 +2023-09-19 16:20:07.151765,73 +2023-09-20 16:20:07.151765,91 +2023-09-21 16:20:07.151765,90 +2023-09-22 16:20:07.151765,108 +2023-09-23 16:20:07.151765,108 +2023-09-24 16:20:07.151765,64 +2023-09-25 16:20:07.151765,74 +2023-09-26 16:20:07.151765,68 +2023-09-27 16:20:07.151765,74 +2023-09-28 16:20:07.151765,105 +2023-09-29 16:20:07.151765,143 +2023-09-30 16:20:07.151765,132 +2023-10-01 16:20:07.151765,87 +2023-10-02 16:20:07.151765,91 +2023-10-03 16:20:07.151765,90 +2023-10-04 16:20:07.151765,80 +2023-10-05 16:20:07.151765,87 +2023-10-06 16:20:07.151765,124 +2023-10-07 16:20:07.151765,114 +2023-10-08 16:20:07.151765,65 +2023-10-09 16:20:07.151765,78 +2023-10-10 16:20:07.151765,82 +2023-10-11 16:20:07.151765,85 +2023-10-12 16:20:07.151765,97 +2023-10-13 16:20:07.151765,130 +2023-10-14 16:20:07.151765,122 +2023-10-15 16:20:07.151765,74 +2023-10-16 16:20:07.151765,79 +2023-10-17 16:20:07.151765,88 +2023-10-18 16:20:07.151765,98 +2023-10-19 16:20:07.151765,97 +2023-10-20 16:20:07.151765,126 +2023-10-21 16:20:07.151765,112 +2023-10-22 16:20:07.151765,70 +2023-10-23 16:20:07.151765,86 +2023-10-24 16:20:07.151765,98 +2023-10-25 
16:20:07.151765,97 +2023-10-26 16:20:07.151765,107 +2023-10-27 16:20:07.151765,135 +2023-10-28 16:20:07.151765,144 +2023-10-29 16:20:07.151765,99 +2023-10-30 16:20:07.151765,113 +2023-10-31 16:20:07.151765,114 +2023-11-01 16:20:07.151765,127 +2023-11-02 16:20:07.151765,113 +2023-11-03 16:20:07.151765,164 +2023-11-04 16:20:07.151765,127 +2023-11-05 16:20:07.151765,83 +2023-11-06 16:20:07.151765,90 +2023-11-07 16:20:07.151765,84 +2023-11-08 16:20:07.151765,97 +2023-11-09 16:20:07.151765,108 +2023-11-10 16:20:07.151765,148 +2023-11-11 16:20:07.151765,129 +2023-11-12 16:20:07.151765,82 +2023-11-13 16:20:07.151765,88 +2023-11-14 16:20:07.151765,102 +2023-11-15 16:20:07.151765,107 +2023-11-16 16:20:07.151765,108 +2023-11-17 16:20:07.151765,147 +2023-11-18 16:20:07.151765,135 +2023-11-19 16:20:07.151765,95 +2023-11-20 16:20:07.151765,93 +2023-11-21 16:20:07.151765,99 +2023-11-22 16:20:07.151765,105 +2023-11-23 16:20:07.151765,105 +2023-11-24 16:20:07.151765,148 +2023-11-25 16:20:07.151765,139 +2023-11-26 16:20:07.151765,92 +2023-11-27 16:20:07.151765,97 +2023-11-28 16:20:07.151765,117 +2023-11-29 16:20:07.151765,128 +2023-11-30 16:20:07.151765,137 +2023-12-01 16:20:07.151765,178 +2023-12-02 16:20:07.151765,168 +2023-12-03 16:20:07.151765,115 +2023-12-04 16:20:07.151765,112 +2023-12-05 16:20:07.151765,112 +2023-12-06 16:20:07.151765,117 +2023-12-07 16:20:07.151765,121 +2023-12-08 16:20:07.151765,147 +2023-12-09 16:20:07.151765,142 +2023-12-10 16:20:07.151765,97 +2023-12-11 16:20:07.151765,117 +2023-12-12 16:20:07.151765,113 +2023-12-13 16:20:07.151765,120 +2023-12-14 16:20:07.151765,124 +2023-12-15 16:20:07.151765,155 +2023-12-16 16:20:07.151765,154 +2023-12-17 16:20:07.151765,105 +2023-12-18 16:20:07.151765,113 +2023-12-19 16:20:07.151765,110 +2023-12-20 16:20:07.151765,127 +2023-12-21 16:20:07.151765,121 +2023-12-22 16:20:07.151765,167 +2023-12-23 16:20:07.151765,166 +2023-12-24 16:20:07.151765,101 +2023-12-25 16:20:07.151765,105 +2023-12-26 16:20:07.151765,115 +2023-12-27 16:20:07.151765,120 +2023-12-28 16:20:07.151765,146 +2023-12-29 16:20:07.151765,201 +2023-12-30 16:20:07.151765,181 +2023-12-31 16:20:07.151765,126 +2024-01-01 16:20:07.151765,129 +2024-01-02 16:20:07.151765,149 +2024-01-03 16:20:07.151765,148 +2024-01-04 16:20:07.151765,130 +2024-01-05 16:20:07.151765,177 +2024-01-06 16:20:07.151765,154 +2024-01-07 16:20:07.151765,106 +2024-01-08 16:20:07.151765,121 +2024-01-09 16:20:07.151765,114 +2024-01-10 16:20:07.151765,127 +2024-01-11 16:20:07.151765,136 +2024-01-12 16:20:07.151765,181 +2024-01-13 16:20:07.151765,157 +2024-01-14 16:20:07.151765,100 +2024-01-15 16:20:07.151765,116 +2024-01-16 16:20:07.151765,125 +2024-01-17 16:20:07.151765,132 +2024-01-18 16:20:07.151765,140 +2024-01-19 16:20:07.151765,176 +2024-01-20 16:20:07.151765,166 +2024-01-21 16:20:07.151765,112 +2024-01-22 16:20:07.151765,114 +2024-01-23 16:20:07.151765,134 +2024-01-24 16:20:07.151765,138 +2024-01-25 16:20:07.151765,134 +2024-01-26 16:20:07.151765,184 +2024-01-27 16:20:07.151765,163 +2024-01-28 16:20:07.151765,138 +2024-01-29 16:20:07.151765,151 +2024-01-30 16:20:07.151765,150 +2024-01-31 16:20:07.151765,165 +2024-02-01 16:20:07.151765,173 +2024-02-02 16:20:07.151765,227 +2024-02-03 16:20:07.151765,217 +2024-02-04 16:20:07.151765,116 +2024-02-05 16:20:07.151765,117 +2024-02-06 16:20:07.151765,122 +2024-02-07 16:20:07.151765,129 +2024-02-08 16:20:07.151765,140 +2024-02-09 16:20:07.151765,187 +2024-02-10 16:20:07.151765,174 +2024-02-11 16:20:07.151765,120 +2024-02-12 16:20:07.151765,124 +2024-02-13 
16:20:07.151765,138 +2024-02-14 16:20:07.151765,138 +2024-02-15 16:20:07.151765,160 +2024-02-16 16:20:07.151765,196 +2024-02-17 16:20:07.151765,171 +2024-02-18 16:20:07.151765,109 +2024-02-19 16:20:07.151765,124 +2024-02-20 16:20:07.151765,129 +2024-02-21 16:20:07.151765,141 +2024-02-22 16:20:07.151765,149 +2024-02-23 16:20:07.151765,189 +2024-02-24 16:20:07.151765,170 +2024-02-25 16:20:07.151765,106 +2024-02-26 16:20:07.151765,118 +2024-02-27 16:20:07.151765,134 +2024-02-28 16:20:07.151765,168 +2024-02-29 16:20:07.151765,169 +2024-03-01 16:20:07.151765,227 +2024-03-02 16:20:07.151765,212 +2024-03-03 16:20:07.151765,136 +2024-03-04 16:20:07.151765,123 +2024-03-05 16:20:07.151765,132 +2024-03-06 16:20:07.151765,132 +2024-03-07 16:20:07.151765,146 +2024-03-08 16:20:07.151765,193 +2024-03-09 16:20:07.151765,182 +2024-03-10 16:20:07.151765,125 +2024-03-11 16:20:07.151765,119 +2024-03-12 16:20:07.151765,125 +2024-03-13 16:20:07.151765,140 +2024-03-14 16:20:07.151765,149 +2024-03-15 16:20:07.151765,194 +2024-03-16 16:20:07.151765,198 +2024-03-17 16:20:07.151765,127 +2024-03-18 16:20:07.151765,133 +2024-03-19 16:20:07.151765,140 +2024-03-20 16:20:07.151765,145 +2024-03-21 16:20:07.151765,146 +2024-03-22 16:20:07.151765,194 +2024-03-23 16:20:07.151765,172 +2024-03-24 16:20:07.151765,114 +2024-03-25 16:20:07.151765,120 +2024-03-26 16:20:07.151765,131 +2024-03-27 16:20:07.151765,151 +2024-03-28 16:20:07.151765,168 +2024-03-29 16:20:07.151765,229 +2024-03-30 16:20:07.151765,201 +2024-03-31 16:20:07.151765,134 +2024-04-01 16:20:07.151765,153 +2024-04-02 16:20:07.151765,159 +2024-04-03 16:20:07.151765,160 +2024-04-04 16:20:07.151765,139 +2024-04-05 16:20:07.151765,191 +2024-04-06 16:20:07.151765,170 +2024-04-07 16:20:07.151765,116 +2024-04-08 16:20:07.151765,123 +2024-04-09 16:20:07.151765,126 +2024-04-10 16:20:07.151765,148 +2024-04-11 16:20:07.151765,151 +2024-04-12 16:20:07.151765,178 +2024-04-13 16:20:07.151765,171 +2024-04-14 16:20:07.151765,111 +2024-04-15 16:20:07.151765,125 +2024-04-16 16:20:07.151765,126 +2024-04-17 16:20:07.151765,134 +2024-04-18 16:20:07.151765,145 +2024-04-19 16:20:07.151765,192 +2024-04-20 16:20:07.151765,166 +2024-04-21 16:20:07.151765,111 +2024-04-22 16:20:07.151765,117 +2024-04-23 16:20:07.151765,123 +2024-04-24 16:20:07.151765,143 +2024-04-25 16:20:07.151765,147 +2024-04-26 16:20:07.151765,179 +2024-04-27 16:20:07.151765,173 +2024-04-28 16:20:07.151765,149 +2024-04-29 16:20:07.151765,154 +2024-04-30 16:20:07.151765,147 +2024-05-01 16:20:07.151765,156 +2024-05-02 16:20:07.151765,175 +2024-05-03 16:20:07.151765,217 +2024-05-04 16:20:07.151765,170 +2024-05-05 16:20:07.151765,117 +2024-05-06 16:20:07.151765,115 +2024-05-07 16:20:07.151765,124 +2024-05-08 16:20:07.151765,114 +2024-05-09 16:20:07.151765,128 +2024-05-10 16:20:07.151765,176 +2024-05-11 16:20:07.151765,158 +2024-05-12 16:20:07.151765,117 +2024-05-13 16:20:07.151765,112 +2024-05-14 16:20:07.151765,120 +2024-05-15 16:20:07.151765,131 +2024-05-16 16:20:07.151765,146 +2024-05-17 16:20:07.151765,173 +2024-05-18 16:20:07.151765,170 +2024-05-19 16:20:07.151765,111 +2024-05-20 16:20:07.151765,111 +2024-05-21 16:20:07.151765,124 +2024-05-22 16:20:07.151765,131 +2024-05-23 16:20:07.151765,134 +2024-05-24 16:20:07.151765,177 +2024-05-25 16:20:07.151765,161 +2024-05-26 16:20:07.151765,109 +2024-05-27 16:20:07.151765,119 +2024-05-28 16:20:07.151765,156 +2024-05-29 16:20:07.151765,150 +2024-05-30 16:20:07.151765,173 +2024-05-31 16:20:07.151765,203 +2024-06-01 16:20:07.151765,191 +2024-06-02 16:20:07.151765,131 +2024-06-03 
16:20:07.151765,139 +2024-06-04 16:20:07.151765,118 +2024-06-05 16:20:07.151765,125 +2024-06-06 16:20:07.151765,130 +2024-06-07 16:20:07.151765,169 +2024-06-08 16:20:07.151765,163 +2024-06-09 16:20:07.151765,109 +2024-06-10 16:20:07.151765,110 +2024-06-11 16:20:07.151765,124 +2024-06-12 16:20:07.151765,129 +2024-06-13 16:20:07.151765,138 +2024-06-14 16:20:07.151765,177 +2024-06-15 16:20:07.151765,155 +2024-06-16 16:20:07.151765,101 +2024-06-17 16:20:07.151765,115 +2024-06-18 16:20:07.151765,123 +2024-06-19 16:20:07.151765,126 +2024-06-20 16:20:07.151765,132 +2024-06-21 16:20:07.151765,178 +2024-06-22 16:20:07.151765,156 +2024-06-23 16:20:07.151765,107 +2024-06-24 16:20:07.151765,111 +2024-06-25 16:20:07.151765,116 +2024-06-26 16:20:07.151765,129 +2024-06-27 16:20:07.151765,136 +2024-06-28 16:20:07.151765,210 +2024-06-29 16:20:07.151765,196 +2024-06-30 16:20:07.151765,127 +2024-07-01 16:20:07.151765,137 +2024-07-02 16:20:07.151765,140 +2024-07-03 16:20:07.151765,149 +2024-07-04 16:20:07.151765,129 +2024-07-05 16:20:07.151765,169 +2024-07-06 16:20:07.151765,158 +2024-07-07 16:20:07.151765,100 +2024-07-08 16:20:07.151765,120 +2024-07-09 16:20:07.151765,113 +2024-07-10 16:20:07.151765,115 +2024-07-11 16:20:07.151765,133 +2024-07-12 16:20:07.151765,173 +2024-07-13 16:20:07.151765,159 +2024-07-14 16:20:07.151765,108 +2024-07-15 16:20:07.151765,110 +2024-07-16 16:20:07.151765,111 +2024-07-17 16:20:07.151765,121 +2024-07-18 16:20:07.151765,124 +2024-07-19 16:20:07.151765,171 +2024-07-20 16:20:07.151765,154 +2024-07-21 16:20:07.151765,98 +2024-07-22 16:20:07.151765,106 +2024-07-23 16:20:07.151765,117 +2024-07-24 16:20:07.151765,119 +2024-07-25 16:20:07.151765,123 +2024-07-26 16:20:07.151765,166 +2024-07-27 16:20:07.151765,155 +2024-07-28 16:20:07.151765,120 +2024-07-29 16:20:07.151765,127 +2024-07-30 16:20:07.151765,139 +2024-07-31 16:20:07.151765,138 +2024-08-01 16:20:07.151765,144 +2024-08-02 16:20:07.151765,193 +2024-08-03 16:20:07.151765,181 +2024-08-04 16:20:07.151765,103 +2024-08-05 16:20:07.151765,117 +2024-08-06 16:20:07.151765,123 +2024-08-07 16:20:07.151765,123 +2024-08-08 16:20:07.151765,129 +2024-08-09 16:20:07.151765,162 +2024-08-10 16:20:07.151765,153 +2024-08-11 16:20:07.151765,101 +2024-08-12 16:20:07.151765,111 +2024-08-13 16:20:07.151765,112 +2024-08-14 16:20:07.151765,124 +2024-08-15 16:20:07.151765,138 +2024-08-16 16:20:07.151765,170 +2024-08-17 16:20:07.151765,158 +2024-08-18 16:20:07.151765,108 +2024-08-19 16:20:07.151765,109 +2024-08-20 16:20:07.151765,118 +2024-08-21 16:20:07.151765,124 +2024-08-22 16:20:07.151765,131 +2024-08-23 16:20:07.151765,165 +2024-08-24 16:20:07.151765,155 +2024-08-25 16:20:07.151765,107 +2024-08-26 16:20:07.151765,120 +2024-08-27 16:20:07.151765,126 +2024-08-28 16:20:07.151765,164 +2024-08-29 16:20:07.151765,158 +2024-08-30 16:20:07.151765,210 +2024-08-31 16:20:07.151765,192 +2024-09-01 16:20:07.151765,140 +2024-09-02 16:20:07.151765,134 +2024-09-03 16:20:07.151765,138 +2024-09-04 16:20:07.151765,121 +2024-09-05 16:20:07.151765,119 +2024-09-06 16:20:07.151765,166 +2024-09-07 16:20:07.151765,154 +2024-09-08 16:20:07.151765,106 +2024-09-09 16:20:07.151765,115 +2024-09-10 16:20:07.151765,132 +2024-09-11 16:20:07.151765,136 +2024-09-12 16:20:07.151765,134 +2024-09-13 16:20:07.151765,170 +2024-09-14 16:20:07.151765,163 +2024-09-15 16:20:07.151765,101 +2024-09-16 16:20:07.151765,123 +2024-09-17 16:20:07.151765,131 +2024-09-18 16:20:07.151765,129 +2024-09-19 16:20:07.151765,127 +2024-09-20 16:20:07.151765,183 +2024-09-21 16:20:07.151765,165 +2024-09-22 
16:20:07.151765,117 +2024-09-23 16:20:07.151765,110 +2024-09-24 16:20:07.151765,119 +2024-09-25 16:20:07.151765,130 +2024-09-26 16:20:07.151765,139 +2024-09-27 16:20:07.151765,179 +2024-09-28 16:20:07.151765,204 +2024-09-29 16:20:07.151765,129 +2024-09-30 16:20:07.151765,141 +2024-10-01 16:20:07.151765,152 +2024-10-02 16:20:07.151765,164 +2024-10-03 16:20:07.151765,175 +2024-10-04 16:20:07.151765,180 +2024-10-05 16:20:07.151765,161 +2024-10-06 16:20:07.151765,118 +2024-10-07 16:20:07.151765,125 +2024-10-08 16:20:07.151765,126 +2024-10-09 16:20:07.151765,144 +2024-10-10 16:20:07.151765,147 +2024-10-11 16:20:07.151765,196 +2024-10-12 16:20:07.151765,177 +2024-10-13 16:20:07.151765,129 +2024-10-14 16:20:07.151765,137 +2024-10-15 16:20:07.151765,134 +2024-10-16 16:20:07.151765,145 +2024-10-17 16:20:07.151765,153 +2024-10-18 16:20:07.151765,201 +2024-10-19 16:20:07.151765,175 +2024-10-20 16:20:07.151765,122 +2024-10-21 16:20:07.151765,133 +2024-10-22 16:20:07.151765,126 +2024-10-23 16:20:07.151765,133 +2024-10-24 16:20:07.151765,136 +2024-10-25 16:20:07.151765,190 +2024-10-26 16:20:07.151765,184 +2024-10-27 16:20:07.151765,130 +2024-10-28 16:20:07.151765,158 +2024-10-29 16:20:07.151765,175 +2024-10-30 16:20:07.151765,170 +2024-10-31 16:20:07.151765,173 +2024-11-01 16:20:07.151765,237 +2024-11-02 16:20:07.151765,223 +2024-11-03 16:20:07.151765,149 +2024-11-04 16:20:07.151765,120 +2024-11-05 16:20:07.151765,136 +2024-11-06 16:20:07.151765,140 +2024-11-07 16:20:07.151765,158 +2024-11-08 16:20:07.151765,207 +2024-11-09 16:20:07.151765,185 +2024-11-10 16:20:07.151765,122 +2024-11-11 16:20:07.151765,127 +2024-11-12 16:20:07.151765,141 +2024-11-13 16:20:07.151765,156 +2024-11-14 16:20:07.151765,156 +2024-11-15 16:20:07.151765,210 +2024-11-16 16:20:07.151765,191 +2024-11-17 16:20:07.151765,124 +2024-11-18 16:20:07.151765,135 +2024-11-19 16:20:07.151765,140 +2024-11-20 16:20:07.151765,150 +2024-11-21 16:20:07.151765,155 +2024-11-22 16:20:07.151765,222 +2024-11-23 16:20:07.151765,200 +2024-11-24 16:20:07.151765,129 +2024-11-25 16:20:07.151765,141 +2024-11-26 16:20:07.151765,149 +2024-11-27 16:20:07.151765,156 +2024-11-28 16:20:07.151765,203 +2024-11-29 16:20:07.151765,266 +2024-11-30 16:20:07.151765,240 +2024-12-01 16:20:07.151765,158 +2024-12-02 16:20:07.151765,169 +2024-12-03 16:20:07.151765,169 +2024-12-04 16:20:07.151765,148 +2024-12-05 16:20:07.151765,174 +2024-12-06 16:20:07.151765,232 +2024-12-07 16:20:07.151765,207 +2024-12-08 16:20:07.151765,141 +2024-12-09 16:20:07.151765,149 +2024-12-10 16:20:07.151765,173 +2024-12-11 16:20:07.151765,176 +2024-12-12 16:20:07.151765,176 +2024-12-13 16:20:07.151765,221 +2024-12-14 16:20:07.151765,199 +2024-12-15 16:20:07.151765,138 +2024-12-16 16:20:07.151765,144 +2024-12-17 16:20:07.151765,148 +2024-12-18 16:20:07.151765,161 +2024-12-19 16:20:07.151765,168 +2024-12-20 16:20:07.151765,237 +2024-12-21 16:20:07.151765,220 +2024-12-22 16:20:07.151765,144 +2024-12-23 16:20:07.151765,160 +2024-12-24 16:20:07.151765,164 +2024-12-25 16:20:07.151765,166 +2024-12-26 16:20:07.151765,187 +2024-12-27 16:20:07.151765,239 +2024-12-28 16:20:07.151765,255 +2024-12-29 16:20:07.151765,171 +2024-12-30 16:20:07.151765,179 +2024-12-31 16:20:07.151765,186 +2025-01-01 16:20:07.151765,210 +2025-01-02 16:20:07.151765,230 +2025-01-03 16:20:07.151765,280 +2025-01-04 16:20:07.151765,221 +2025-01-05 16:20:07.151765,143 +2025-01-06 16:20:07.151765,152 +2025-01-07 16:20:07.151765,161 +2025-01-08 16:20:07.151765,169 +2025-01-09 16:20:07.151765,183 +2025-01-10 16:20:07.151765,236 +2025-01-11 
16:20:07.151765,223 +2025-01-12 16:20:07.151765,148 +2025-01-13 16:20:07.151765,157 +2025-01-14 16:20:07.151765,162 +2025-01-15 16:20:07.151765,172 +2025-01-16 16:20:07.151765,190 +2025-01-17 16:20:07.151765,247 +2025-01-18 16:20:07.151765,221 +2025-01-19 16:20:07.151765,150 +2025-01-20 16:20:07.151765,164 +2025-01-21 16:20:07.151765,162 +2025-01-22 16:20:07.151765,180 +2025-01-23 16:20:07.151765,186 +2025-01-24 16:20:07.151765,249 +2025-01-25 16:20:07.151765,224 +2025-01-26 16:20:07.151765,141 +2025-01-27 16:20:07.151765,149 +2025-01-28 16:20:07.151765,202 +2025-01-29 16:20:07.151765,218 +2025-01-30 16:20:07.151765,224 +2025-01-31 16:20:07.151765,301 +2025-02-01 16:20:07.151765,267 +2025-02-02 16:20:07.151765,181 +2025-02-03 16:20:07.151765,188 +2025-02-04 16:20:07.151765,167 +2025-02-05 16:20:07.151765,181 +2025-02-06 16:20:07.151765,187 +2025-02-07 16:20:07.151765,247 +2025-02-08 16:20:07.151765,236 +2025-02-09 16:20:07.151765,153 +2025-02-10 16:20:07.151765,168 +2025-02-11 16:20:07.151765,169 +2025-02-12 16:20:07.151765,185 +2025-02-13 16:20:07.151765,202 +2025-02-14 16:20:07.151765,241 +2025-02-15 16:20:07.151765,225 +2025-02-16 16:20:07.151765,156 +2025-02-17 16:20:07.151765,164 +2025-02-18 16:20:07.151765,177 +2025-02-19 16:20:07.151765,182 +2025-02-20 16:20:07.151765,195 +2025-02-21 16:20:07.151765,253 +2025-02-22 16:20:07.151765,241 +2025-02-23 16:20:07.151765,159 +2025-02-24 16:20:07.151765,169 +2025-02-25 16:20:07.151765,174 +2025-02-26 16:20:07.151765,183 +2025-02-27 16:20:07.151765,192 +2025-02-28 16:20:07.151765,307 +2025-03-01 16:20:07.151765,280 +2025-03-02 16:20:07.151765,189 +2025-03-03 16:20:07.151765,212 +2025-03-04 16:20:07.151765,185 +2025-03-05 16:20:07.151765,187 +2025-03-06 16:20:07.151765,203 +2025-03-07 16:20:07.151765,255 +2025-03-08 16:20:07.151765,224 +2025-03-09 16:20:07.151765,148 +2025-03-10 16:20:07.151765,154 +2025-03-11 16:20:07.151765,171 +2025-03-12 16:20:07.151765,185 +2025-03-13 16:20:07.151765,205 +2025-03-14 16:20:07.151765,260 +2025-03-15 16:20:07.151765,236 +2025-03-16 16:20:07.151765,162 +2025-03-17 16:20:07.151765,156 +2025-03-18 16:20:07.151765,175 +2025-03-19 16:20:07.151765,190 +2025-03-20 16:20:07.151765,189 +2025-03-21 16:20:07.151765,259 +2025-03-22 16:20:07.151765,239 +2025-03-23 16:20:07.151765,155 +2025-03-24 16:20:07.151765,170 +2025-03-25 16:20:07.151765,190 +2025-03-26 16:20:07.151765,191 +2025-03-27 16:20:07.151765,199 +2025-03-28 16:20:07.151765,304 +2025-03-29 16:20:07.151765,277 +2025-03-30 16:20:07.151765,191 +2025-03-31 16:20:07.151765,196 +2025-04-01 16:20:07.151765,210 +2025-04-02 16:20:07.151765,220 +2025-04-03 16:20:07.151765,236 +2025-04-04 16:20:07.151765,256 +2025-04-05 16:20:07.151765,241 +2025-04-06 16:20:07.151765,155 +2025-04-07 16:20:07.151765,164 +2025-04-08 16:20:07.151765,169 +2025-04-09 16:20:07.151765,180 +2025-04-10 16:20:07.151765,195 +2025-04-11 16:20:07.151765,257 +2025-04-12 16:20:07.151765,229 +2025-04-13 16:20:07.151765,159 +2025-04-14 16:20:07.151765,173 +2025-04-15 16:20:07.151765,179 +2025-04-16 16:20:07.151765,196 +2025-04-17 16:20:07.151765,193 +2025-04-18 16:20:07.151765,244 +2025-04-19 16:20:07.151765,220 +2025-04-20 16:20:07.151765,159 +2025-04-21 16:20:07.151765,169 +2025-04-22 16:20:07.151765,175 +2025-04-23 16:20:07.151765,185 +2025-04-24 16:20:07.151765,187 +2025-04-25 16:20:07.151765,262 +2025-04-26 16:20:07.151765,235 +2025-04-27 16:20:07.151765,155 +2025-04-28 16:20:07.151765,200 +2025-04-29 16:20:07.151765,211 +2025-04-30 16:20:07.151765,221 +2025-05-01 16:20:07.151765,226 +2025-05-02 
16:20:07.151765,300 +2025-05-03 16:20:07.151765,286 +2025-05-04 16:20:07.151765,163 +2025-05-05 16:20:07.151765,174 +2025-05-06 16:20:07.151765,172 +2025-05-07 16:20:07.151765,175 +2025-05-08 16:20:07.151765,187 +2025-05-09 16:20:07.151765,246 +2025-05-10 16:20:07.151765,233 +2025-05-11 16:20:07.151765,143 +2025-05-12 16:20:07.151765,167 +2025-05-13 16:20:07.151765,171 +2025-05-14 16:20:07.151765,177 +2025-05-15 16:20:07.151765,182 +2025-05-16 16:20:07.151765,233 +2025-05-17 16:20:07.151765,227 +2025-05-18 16:20:07.151765,151 +2025-05-19 16:20:07.151765,152 +2025-05-20 16:20:07.151765,159 +2025-05-21 16:20:07.151765,173 +2025-05-22 16:20:07.151765,195 +2025-05-23 16:20:07.151765,244 +2025-05-24 16:20:07.151765,216 +2025-05-25 16:20:07.151765,145 +2025-05-26 16:20:07.151765,155 +2025-05-27 16:20:07.151765,151 +2025-05-28 16:20:07.151765,207 +2025-05-29 16:20:07.151765,220 +2025-05-30 16:20:07.151765,292 +2025-05-31 16:20:07.151765,278 +2025-06-01 16:20:07.151765,187 +2025-06-02 16:20:07.151765,190 +2025-06-03 16:20:07.151765,193 +2025-06-04 16:20:07.151765,188 +2025-06-05 16:20:07.151765,188 +2025-06-06 16:20:07.151765,240 +2025-06-07 16:20:07.151765,221 +2025-06-08 16:20:07.151765,148 +2025-06-09 16:20:07.151765,155 +2025-06-10 16:20:07.151765,161 +2025-06-11 16:20:07.151765,170 +2025-06-12 16:20:07.151765,181 +2025-06-13 16:20:07.151765,234 +2025-06-14 16:20:07.151765,214 +2025-06-15 16:20:07.151765,145 +2025-06-16 16:20:07.151765,153 +2025-06-17 16:20:07.151765,171 +2025-06-18 16:20:07.151765,160 +2025-06-19 16:20:07.151765,184 +2025-06-20 16:20:07.151765,244 +2025-06-21 16:20:07.151765,208 +2025-06-22 16:20:07.151765,140 +2025-06-23 16:20:07.151765,150 +2025-06-24 16:20:07.151765,154 +2025-06-25 16:20:07.151765,164 +2025-06-26 16:20:07.151765,172 +2025-06-27 16:20:07.151765,242 +2025-06-28 16:20:07.151765,267 +2025-06-29 16:20:07.151765,182 +2025-06-30 16:20:07.151765,190 +2025-07-01 16:20:07.151765,190 +2025-07-02 16:20:07.151765,201 +2025-07-03 16:20:07.151765,217 +2025-07-04 16:20:07.151765,227 +2025-07-05 16:20:07.151765,217 +2025-07-06 16:20:07.151765,143 +2025-07-07 16:20:07.151765,150 +2025-07-08 16:20:07.151765,164 +2025-07-09 16:20:07.151765,173 +2025-07-10 16:20:07.151765,178 +2025-07-11 16:20:07.151765,239 +2025-07-12 16:20:07.151765,210 +2025-07-13 16:20:07.151765,145 +2025-07-14 16:20:07.151765,156 +2025-07-15 16:20:07.151765,160 +2025-07-16 16:20:07.151765,171 +2025-07-17 16:20:07.151765,172 +2025-07-18 16:20:07.151765,235 +2025-07-19 16:20:07.151765,214 +2025-07-20 16:20:07.151765,140 +2025-07-21 16:20:07.151765,156 +2025-07-22 16:20:07.151765,158 +2025-07-23 16:20:07.151765,161 +2025-07-24 16:20:07.151765,167 +2025-07-25 16:20:07.151765,232 +2025-07-26 16:20:07.151765,207 +2025-07-27 16:20:07.151765,150 +2025-07-28 16:20:07.151765,172 +2025-07-29 16:20:07.151765,199 +2025-07-30 16:20:07.151765,206 +2025-07-31 16:20:07.151765,214 +2025-08-01 16:20:07.151765,275 +2025-08-02 16:20:07.151765,258 +2025-08-03 16:20:07.151765,171 +2025-08-04 16:20:07.151765,158 +2025-08-05 16:20:07.151765,163 +2025-08-06 16:20:07.151765,171 +2025-08-07 16:20:07.151765,177 +2025-08-08 16:20:07.151765,231 +2025-08-09 16:20:07.151765,216 +2025-08-10 16:20:07.151765,134 +2025-08-11 16:20:07.151765,142 +2025-08-12 16:20:07.151765,162 +2025-08-13 16:20:07.151765,171 +2025-08-14 16:20:07.151765,178 +2025-08-15 16:20:07.151765,233 +2025-08-16 16:20:07.151765,217 +2025-08-17 16:20:07.151765,141 +2025-08-18 16:20:07.151765,147 diff --git a/floracast/materializers/__init__.py 
b/floracast/materializers/__init__.py new file mode 100644 index 00000000..6cd55cbd --- /dev/null +++ b/floracast/materializers/__init__.py @@ -0,0 +1,5 @@ +"""Custom materializers for FloraCast.""" + +from .tft_materializer import TFTModelMaterializer + +__all__ = ["TFTModelMaterializer"] diff --git a/floracast/materializers/tft_materializer.py b/floracast/materializers/tft_materializer.py new file mode 100644 index 00000000..6760a28b --- /dev/null +++ b/floracast/materializers/tft_materializer.py @@ -0,0 +1,435 @@ +""" +Custom materializer for Darts TFT model objects using io_utils approach. +""" + +import json +import os +import pickle +import tempfile +from typing import Any, Dict, Type + +import numpy as np +import pandas as pd +import torch +from darts import TimeSeries +from darts.models import TFTModel +from zenml.enums import ArtifactType +from zenml.io import fileio +from zenml.logger import get_logger +from zenml.materializers.base_materializer import BaseMaterializer +from zenml.metadata.metadata_types import MetadataType + +logger = get_logger(__name__) + + +class TFTModelMaterializer(BaseMaterializer): + """Materializer for Darts TFT model objects using io_utils pattern.""" + + # Import at class level to ensure proper registration + ASSOCIATED_TYPES = (TFTModel,) + ASSOCIATED_ARTIFACT_TYPE = ArtifactType.MODEL + + def load(self, data_type: Type[Any]) -> Any: + """Load a TFT model using enhanced reconstruction strategy.""" + # using top-level TFTModel import + + # Set PyTorch default dtype to float32 for consistent precision + original_dtype = torch.get_default_dtype() + torch.set_default_dtype(torch.float32) + + try: + # Check what save strategies were used + strategy_info = self._load_strategy_info() + + # Try enhanced reconstruction if PyTorch state was saved + if strategy_info.get("pytorch_model_saved", False): + try: + return self._load_with_pytorch_state() + except Exception as e: + logger.warning(f"Enhanced reconstruction failed: {e}") + + # Fallback to pickle loading + try: + return self._load_pickle_format() + except Exception as e: + logger.error(f"All loading strategies failed: {e}") + raise + finally: + # Restore original PyTorch dtype + torch.set_default_dtype(original_dtype) + + def _load_native_format(self) -> Any: + """Load TFT model using native Darts save format.""" + # using top-level TFTModel import + + # Use temporary directory for native loading + with tempfile.TemporaryDirectory() as temp_dir: + local_model_dir = os.path.join(temp_dir, "tft_model") + os.makedirs(local_model_dir) + + # Download all files from the tft_model directory in artifact store + tft_dir = os.path.join(self.uri, "tft_model") + + # List all files in the tft_model directory and download them + try: + # Try to list files in the tft_model directory + files_to_copy = [] + try: + # Try to use fileio to list files if supported + for filename in [ + "model.pth.tar", + "model_params.pkl", + "model.pkl", + "checkpoint.pth", + ]: + src_path = os.path.join(tft_dir, filename) + if fileio.exists(src_path): + files_to_copy.append(filename) + except Exception: + # Fallback to known essential files + files_to_copy = ["model.pth.tar", "model_params.pkl"] + + # Copy each file + for filename in files_to_copy: + src_path = os.path.join(tft_dir, filename) + dst_path = os.path.join(local_model_dir, filename) + + try: + with fileio.open(src_path, "rb") as src_f: + with open(dst_path, "wb") as dst_f: + dst_f.write(src_f.read()) + logger.info(f"Downloaded {filename}") + except Exception as e: + 
logger.warning(f"Failed to download {filename}: {e}") + continue + + # Load using TFT's native method + logger.info("Loading TFT model from native format") + model = TFTModel.load(local_model_dir) + logger.info("Successfully loaded TFT model from native format") + return model + + except Exception as e: + logger.warning(f"Native format loading failed: {e}") + raise + + def _load_strategy_info(self) -> dict: + """Load information about what save strategies were used.""" + try: + # using top-level json import + info_path = os.path.join(self.uri, "save_info.json") + with fileio.open(info_path, "r") as f: + return json.load(f) + except Exception: + # Default to old behavior if no strategy info found + return {"pytorch_model_saved": False} + + def _load_with_pytorch_state(self) -> Any: + """Load TFT model with PyTorch state reconstruction.""" + # using top-level TFTModel, torch, json imports + + logger.info("Loading TFT model with PyTorch state reconstruction") + + # Load the pickle model first to get the structure + pickle_path = os.path.join(self.uri, "model.pkl") + with fileio.open(pickle_path, "rb") as f: + model = pickle.load(f) + + # Load the saved PyTorch state + state_dict_path = os.path.join(self.uri, "pytorch_state.pth") + with tempfile.NamedTemporaryFile() as temp_file: + with fileio.open(state_dict_path, "rb") as src_f: + with open(temp_file.name, "wb") as dst_f: + dst_f.write(src_f.read()) + + # Load the state dict + try: + state_dict = torch.load(temp_file.name, map_location="cpu") + except Exception as e: + logger.warning(f"Failed to load PyTorch state dict: {e}") + # Try with different loading strategy + state_dict = torch.load( + temp_file.name, map_location="cpu", weights_only=False + ) + + # Load TFT metadata to reconstruct the model properly + metadata_path = os.path.join(self.uri, "tft_metadata.json") + with fileio.open(metadata_path, "r") as f: + metadata = json.load(f) + + # If the internal model is None, we need to create it + if getattr(model, "model", None) is None: + logger.info("Reconstructing internal PyTorch model") + + # Create a new model with same parameters to get the architecture + temp_model = TFTModel(**metadata.get("model_params", {})) + + # Create minimal training data to initialize the model architecture + # using top-level pandas, numpy, TimeSeries imports + + # Create dummy training data based on saved metadata + metadata.get("training_series_info", {}) + dummy_length = max( + temp_model.input_chunk_length + + temp_model.output_chunk_length + + 5, + 50, + ) + + dates = pd.date_range("2020-01-01", periods=dummy_length, freq="D") + values = np.random.randn(dummy_length).astype(np.float32) + dummy_series = TimeSeries.from_dataframe( + pd.DataFrame({"ds": dates, "y": values}), + time_col="ds", + value_cols="y", + ).astype(np.float32) + + # Partially fit to create the internal model structure + temp_model.fit(dummy_series, epochs=1, verbose=False) + + # Now load the saved state into the reconstructed model + if hasattr(temp_model, "model") and temp_model.model is not None: + temp_model.model.load_state_dict(state_dict) + + # Replace the internal model in our loaded model + model.model = temp_model.model + model._fit_called = True + + logger.debug( + "Successfully reconstructed TFT model with saved PyTorch state" + ) + else: + logger.warning("Failed to create internal model structure") + + else: + # Model structure exists, just load the state + model.model.load_state_dict(state_dict) + + return model + + def _load_pickle_format(self) -> Any: + """Fallback to 
pickle loading.""" + pickle_path = os.path.join(self.uri, "model.pkl") + + logger.info("Loading TFT model from pickle format") + with fileio.open(pickle_path, "rb") as f: + model = pickle.load(f) + + logger.warning( + "Loaded from pickle - internal PyTorch model may be None" + ) + return model + + def save(self, data: Any) -> None: + """Save TFT model using enhanced strategy that preserves internal PyTorch model.""" + logger.info("Saving TFT model using enhanced strategy") + + # Strategy 1: Save PyTorch model state separately if model is fitted and has internal model + pytorch_model_saved = False + if ( + getattr(data, "_fit_called", False) + and getattr(data, "model", None) is not None + ): + try: + self._save_pytorch_state(data) + pytorch_model_saved = True + logger.info("Successfully saved PyTorch model state") + except Exception as e: + logger.warning(f"PyTorch state save failed: {e}") + + # Strategy 2: Save metadata and TFT attributes + try: + self._save_tft_metadata(data) + logger.info("Successfully saved TFT metadata") + except Exception as e: + logger.warning(f"TFT metadata save failed: {e}") + + # Strategy 3: Always save pickle as backup (but will need reconstruction on load) + try: + self._save_pickle_format(data) + logger.info("Successfully saved TFT model as pickle backup") + except Exception as e: + logger.warning(f"Pickle backup save failed: {e}") + + # Create a flag file to indicate which strategies succeeded + strategy_info = { + "pytorch_model_saved": pytorch_model_saved, + "fit_called": getattr(data, "_fit_called", False), + "has_internal_model": getattr(data, "model", None) is not None, + } + try: + info_path = os.path.join(self.uri, "save_info.json") + with fileio.open(info_path, "w") as f: + json.dump(strategy_info, f) + except Exception as e: + logger.warning(f"Failed to save strategy info: {e}") + + def _save_native_format(self, data: Any) -> None: + """Save TFT model using native Darts format.""" + # Check if the model is fitted (has trained internal model) + if ( + not getattr(data, "_fit_called", False) + or getattr(data, "model", None) is None + ): + logger.warning( + "TFT model not fitted or has no internal model - skipping native save" + ) + raise ValueError( + "TFT model must be fitted before saving in native format" + ) + + # Use temporary directory for native saving + with tempfile.TemporaryDirectory() as temp_dir: + local_model_dir = os.path.join(temp_dir, "tft_model") + + # Save using TFT's native method + data.save(local_model_dir) + + # Check if any files were actually created + files_created = [] + if os.path.exists(local_model_dir): + for root, dirs, files in os.walk(local_model_dir): + for filename in files: + files_created.append(filename) + + if not files_created: + logger.warning( + "No files created by TFT native save - model may not be fitted" + ) + raise ValueError("TFT native save created no files") + + logger.info(f"TFT native save created files: {files_created}") + + # Upload all files to artifact store + tft_dir = os.path.join(self.uri, "tft_model") + + for root, dirs, files in os.walk(local_model_dir): + for filename in files: + src_path = os.path.join(root, filename) + # Calculate relative path from local_model_dir + rel_path = os.path.relpath(src_path, local_model_dir) + dst_path = os.path.join(tft_dir, rel_path) + + # Create directory structure if needed + dst_dir = os.path.dirname(dst_path) + if dst_dir and dst_dir != tft_dir: + # Create intermediate directories + parts = os.path.relpath(dst_dir, self.uri).split( + os.sep + ) + 
current_path = self.uri + for part in parts: + current_path = os.path.join(current_path, part) + # fileio doesn't have makedirs, but we can try to write to nested paths + pass + + # Copy file to artifact store using fileio + with open(src_path, "rb") as src_f: + with fileio.open(dst_path, "wb") as dst_f: + dst_f.write(src_f.read()) + + logger.info(f"Uploaded {rel_path} to artifact store") + + def _save_pytorch_state(self, data: Any) -> None: + """Save the internal PyTorch model state dict separately.""" + # using top-level torch import + + if hasattr(data, "model") and data.model is not None: + # Save the PyTorch model state dict + state_dict_path = os.path.join(self.uri, "pytorch_state.pth") + with tempfile.NamedTemporaryFile() as temp_file: + torch.save(data.model.state_dict(), temp_file.name) + with open(temp_file.name, "rb") as src_f: + with fileio.open(state_dict_path, "wb") as dst_f: + dst_f.write(src_f.read()) + + # Also save the model architecture if available + if hasattr(data, "model_params"): + arch_path = os.path.join(self.uri, "model_arch.pkl") + with tempfile.NamedTemporaryFile() as temp_file: + # using top-level pickle import + with open(temp_file.name, "wb") as f: + pickle.dump(data.model_params, f) + with open(temp_file.name, "rb") as src_f: + with fileio.open(arch_path, "wb") as dst_f: + dst_f.write(src_f.read()) + + def _save_tft_metadata(self, data: Any) -> None: + """Save TFT model metadata and configuration.""" + # using top-level json import + + # Extract key TFT attributes that we need to reconstruct + metadata = { + "input_chunk_length": getattr(data, "input_chunk_length", None), + "output_chunk_length": getattr(data, "output_chunk_length", None), + "fit_called": getattr(data, "_fit_called", False), + "model_params": getattr(data, "_model_params", {}), + "training_series_info": None, + } + + # Save training series information if available + if ( + hasattr(data, "training_series") + and data.training_series is not None + ): + ts = data.training_series + metadata["training_series_info"] = { + "length": len(ts), + "width": ts.width, + "start_time": str(ts.start_time()), + "end_time": str(ts.end_time()), + "freq": str(ts.freq) if ts.freq else None, + "columns": list(ts.columns) if hasattr(ts, "columns") else [], + } + + # Convert any non-serializable values + def make_serializable(obj): + if hasattr(obj, "__dict__"): + return str(obj) + return obj + + # Clean metadata for JSON serialization + clean_metadata = {} + for key, value in metadata.items(): + try: + json.dumps(value) # Test if serializable + clean_metadata[key] = value + except (TypeError, ValueError): + clean_metadata[key] = str(value) + + metadata_path = os.path.join(self.uri, "tft_metadata.json") + with fileio.open(metadata_path, "w") as f: + json.dump(clean_metadata, f, indent=2) + + def _save_pickle_format(self, data: Any) -> None: + """Save TFT model using pickle format as backup.""" + pickle_path = os.path.join(self.uri, "model.pkl") + + with fileio.open(pickle_path, "wb") as f: + pickle.dump(data, f) + + def extract_metadata(self, data: Any) -> Dict[str, MetadataType]: + """Extract metadata from TFT model.""" + metadata = { + "input_chunk_length": data.input_chunk_length, + "output_chunk_length": data.output_chunk_length, + "model_name": getattr(data, "model_name", "TFTModel"), + "n_epochs": getattr(data, "n_epochs", "unknown"), + "fit_called": getattr(data, "_fit_called", False), + "has_internal_model": getattr(data, "model", None) is not None, + } + + if hasattr(data, "_model_params"): + 
metadata.update( + { + "hidden_size": data._model_params.get( + "hidden_size", "unknown" + ), + "lstm_layers": data._model_params.get( + "lstm_layers", "unknown" + ), + "dropout": data._model_params.get("dropout", "unknown"), + } + ) + + return metadata diff --git a/floracast/materializers/timeseries_materializer.py b/floracast/materializers/timeseries_materializer.py new file mode 100644 index 00000000..3cdaf404 --- /dev/null +++ b/floracast/materializers/timeseries_materializer.py @@ -0,0 +1,291 @@ +""" +Custom materializer for Darts TimeSeries objects. + +This materializer saves a `darts.TimeSeries` as: +- series.csv: the time-indexed values +- metadata.json: minimal reconstruction metadata (freq, columns, etc.) +- static_covariates.csv (optional): static covariates if present +""" + +import json +import os +import tempfile +from typing import Any, Dict, Type + +import matplotlib +import numpy as np +import pandas as pd + +# Use a non-interactive backend for headless environments +matplotlib.use("Agg") +import matplotlib.pyplot as plt +from darts import TimeSeries +from zenml.enums import ArtifactType, VisualizationType +from zenml.io import fileio +from zenml.logger import get_logger +from zenml.materializers.base_materializer import BaseMaterializer +from zenml.metadata.metadata_types import MetadataType + +logger = get_logger(__name__) + + +class DartsTimeSeriesMaterializer(BaseMaterializer): + """Materializer for Darts TimeSeries objects.""" + + ASSOCIATED_TYPES = (TimeSeries,) + ASSOCIATED_ARTIFACT_TYPE = ArtifactType.DATA + + def load(self, data_type: Type[Any]) -> Any: + """Load a Darts TimeSeries from CSV + metadata.""" + metadata_path = os.path.join(self.uri, "metadata.json") + series_path = os.path.join(self.uri, "series.csv") + static_covariates_path = os.path.join( + self.uri, "static_covariates.csv" + ) + + # Read metadata + metadata = {} + if fileio.exists(metadata_path): + with fileio.open(metadata_path, "r") as f: + metadata = json.load(f) + else: + logger.warning( + "metadata.json not found. Proceeding with best-effort defaults." 
+ ) + + # Read series + if not fileio.exists(series_path): + raise FileNotFoundError( + "series.csv not found for TimeSeries artifact" + ) + + with fileio.open(series_path, "r") as f: + df = pd.read_csv(f) + + time_col = metadata.get("time_col", "time") + value_cols = metadata.get("value_cols") + freq = metadata.get("freq") + _ = metadata.get("time_index_type") + time_tz = metadata.get("time_tz") + dtypes_map = metadata.get("dtypes") or {} + + if value_cols is None: + # Default to all non-time columns as values + value_cols = [c for c in df.columns if c != time_col] + + # Ensure proper dtype restoration + for col, dtype_str in dtypes_map.items(): + if col in df.columns and col != time_col: + try: + df[col] = df[col].astype(dtype_str) + except Exception: + pass + + # Ensure datetime type for the time column with timezone if present + if time_col in df.columns: + try: + df[time_col] = pd.to_datetime(df[time_col], utc=False) + if time_tz: + try: + # If parsed as timezone-aware, convert; otherwise localize + if df[time_col].dt.tz is not None: + df[time_col] = df[time_col].dt.tz_convert(time_tz) + else: + df[time_col] = df[time_col].dt.tz_localize(time_tz) + except Exception: + pass + except Exception: + pass + + ts = TimeSeries.from_dataframe( + df, time_col=time_col, value_cols=value_cols, freq=freq + ) + + # Convert to float32 for hardware compatibility (MPS, mixed precision training) + logger.debug( + "Converting TimeSeries to float32 for hardware compatibility" + ) + ts = ts.astype(np.float32) + + # Restore static covariates if present + if fileio.exists(static_covariates_path): + with fileio.open(static_covariates_path, "r") as f: + sc_df = pd.read_csv(f) + try: + ts = ts.with_static_covariates(sc_df) + except Exception as e: + logger.warning(f"Failed to attach static covariates: {e}") + + return ts + + def save(self, data: Any) -> None: + """Save a Darts TimeSeries to CSV + metadata. + + We avoid pickling for portability and artifact store compatibility. 
+ """ + if not isinstance(data, TimeSeries): + raise TypeError( + "DartsTimeSeriesMaterializer can only handle darts.TimeSeries instances" + ) + + # Extract dataframe with time index + df = data.pd_dataframe() + time_index = df.index + df_reset = df.reset_index() + time_col_name = df_reset.columns[0] + value_cols = list(df_reset.columns[1:]) + + # Save series.csv using a temp file to support remote artifact stores + series_path = os.path.join(self.uri, "series.csv") + with tempfile.NamedTemporaryFile(suffix=".csv") as tmp: + # Write datetime in a stable ISO format including timezone when present + date_format = "%Y-%m-%dT%H:%M:%S%z" + try: + df_reset.to_csv(tmp.name, index=False, date_format=date_format) + except TypeError: + # Older pandas may not support date_format for to_csv; fallback + df_reset.to_csv(tmp.name, index=False) + with open(tmp.name, "rb") as src_f: + with fileio.open(series_path, "wb") as dst_f: + dst_f.write(src_f.read()) + + # Save static covariates if present + try: + static_covariates = data.static_covariates + except Exception: + static_covariates = None + + if static_covariates is not None: + sc_path = os.path.join(self.uri, "static_covariates.csv") + with tempfile.NamedTemporaryFile(suffix=".csv") as tmp: + # Ensure DataFrame + sc_df = ( + static_covariates + if isinstance(static_covariates, pd.DataFrame) + else pd.DataFrame(static_covariates) + ) + sc_df.to_csv(tmp.name, index=False) + with open(tmp.name, "rb") as src_f: + with fileio.open(sc_path, "wb") as dst_f: + dst_f.write(src_f.read()) + + # Determine a stable pandas frequency alias (e.g., "D") + freq_alias = None + try: + freq_alias = getattr(time_index, "freqstr", None) + if freq_alias is None: + freq_alias = pd.infer_freq(time_index) + except Exception: + freq_alias = None + + # Gather meta about time index + try: + if hasattr(time_index, "tz") and time_index.tz is not None: + time_tz = str(time_index.tz) + else: + time_tz = None + except Exception: + time_tz = None + + if str(type(time_index)).endswith("DatetimeIndex'>"): + time_index_type = "datetime" + elif str(type(time_index)).endswith("PeriodIndex'>"): + time_index_type = "period" + elif str(type(time_index)).endswith("Int64Index'>"): + time_index_type = "int" + else: + time_index_type = "other" + + # Capture dtypes for value columns + dtypes_map = {col: str(df_reset[col].dtype) for col in value_cols} + + # Save minimal metadata + metadata = { + "time_col": time_col_name, + "value_cols": value_cols, + "freq": freq_alias, + "time_index_type": time_index_type, + "time_tz": time_tz, + "dtypes": dtypes_map, + "length": len(data), + "is_univariate": data.width == 1, + } + + metadata_path = os.path.join(self.uri, "metadata.json") + with fileio.open(metadata_path, "w") as f: + json.dump(metadata, f) + + # Create a lightweight preview plot (single image) at artifact root + try: + preview_path = os.path.join(self.uri, "preview.png") + with tempfile.NamedTemporaryFile(suffix=".png") as tmp: + fig, ax = plt.subplots(figsize=(8, 3)) + x_vals = df_reset[time_col_name] + for col in value_cols: + ax.plot(x_vals, df_reset[col], label=col, linewidth=1) + ax.set_title("TimeSeries Preview") + ax.set_xlabel(str(time_col_name)) + ax.set_ylabel("value") + if len(value_cols) > 1: + ax.legend(loc="upper right", fontsize=8) + fig.tight_layout() + fig.savefig(tmp.name, dpi=120) + plt.close(fig) + with open(tmp.name, "rb") as src_f: + with fileio.open(preview_path, "wb") as dst_f: + dst_f.write(src_f.read()) + logger.debug(f"Saved TimeSeries preview to {preview_path}") + 
except Exception as e: + # Best-effort; do not fail the materialization for plotting issues + logger.warning(f"Failed to create TimeSeries preview plot: {e}") + + def save_visualizations(self, data: Any) -> Dict[str, VisualizationType]: + """Return a single IMAGE visualization for the preview. + + Ensures a root-level preview image exists and returns its URI. + """ + preview_uri = os.path.join(self.uri, "preview.png") + try: + if not fileio.exists(preview_uri): + # Generate preview if missing + df = data.pd_dataframe() + df_reset = df.reset_index() + time_col_name = df_reset.columns[0] + value_cols = list(df_reset.columns[1:]) + with tempfile.NamedTemporaryFile(suffix=".png") as tmp: + fig, ax = plt.subplots(figsize=(8, 3)) + x_vals = df_reset[time_col_name] + for col in value_cols: + ax.plot(x_vals, df_reset[col], label=col, linewidth=1) + ax.set_title("TimeSeries Preview") + ax.set_xlabel(str(time_col_name)) + ax.set_ylabel("value") + if len(value_cols) > 1: + ax.legend(loc="upper right", fontsize=8) + fig.tight_layout() + fig.savefig(tmp.name, dpi=120) + plt.close(fig) + with open(tmp.name, "rb") as src_f: + with fileio.open(preview_uri, "wb") as dst_f: + dst_f.write(src_f.read()) + except Exception as e: + logger.warning(f"Failed to ensure preview visualization: {e}") + + return ( + {preview_uri: VisualizationType.IMAGE} + if fileio.exists(preview_uri) + else {} + ) + + def extract_metadata(self, data: Any) -> Dict[str, MetadataType]: + """Extract lightweight metadata from a TimeSeries.""" + return { + "length": len(data), + "width": data.width, + "start_time": str(data.start_time()), + "end_time": str(data.end_time()), + "freq": str(data.freq) + if getattr(data, "freq", None) is not None + else "unknown", + "is_univariate": data.width == 1, + } diff --git a/floracast/pipelines/__init__.py b/floracast/pipelines/__init__.py new file mode 100644 index 00000000..38a3676d --- /dev/null +++ b/floracast/pipelines/__init__.py @@ -0,0 +1,6 @@ +"""ZenML pipelines for FloraCast.""" + +from .batch_inference_pipeline import batch_inference_pipeline +from .train_forecast_pipeline import train_forecast_pipeline + +__all__ = ["train_forecast_pipeline", "batch_inference_pipeline"] diff --git a/floracast/pipelines/batch_inference_pipeline.py b/floracast/pipelines/batch_inference_pipeline.py new file mode 100644 index 00000000..7e43a794 --- /dev/null +++ b/floracast/pipelines/batch_inference_pipeline.py @@ -0,0 +1,22 @@ +""" +Batch inference pipeline for FloraCast forecasting models. +""" + +from steps.batch_infer import batch_inference_predict +from steps.ingest import ingest_data +from zenml import pipeline +from zenml.logger import get_logger + +logger = get_logger(__name__) + + +@pipeline(enable_cache=False) +def batch_inference_pipeline() -> None: + """ + Batch inference pipeline that loads model from Model Control Plane and generates predictions. + """ + # Step 1: Ingest data (simulate real-time data sources) + raw_data = ingest_data(infer=True) + + # Step 2: Generate predictions using model from MCP (with scaling handled internally) + batch_inference_predict(df=raw_data) diff --git a/floracast/pipelines/train_forecast_pipeline.py b/floracast/pipelines/train_forecast_pipeline.py new file mode 100644 index 00000000..8b8a691c --- /dev/null +++ b/floracast/pipelines/train_forecast_pipeline.py @@ -0,0 +1,38 @@ +""" +Training pipeline for FloraCast forecasting models. 
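+
+Step order (see the pipeline body below): ingest -> preprocess (train/val
+split plus scaling) -> train (TFT) -> evaluate (SMAPE) -> promote via the
+ZenML Model Control Plane.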
+""" + +from steps.evaluate import evaluate +from steps.ingest import ingest_data +from steps.preprocess import preprocess_data +from steps.promote import promote_model +from steps.train import train_model +from zenml import pipeline +from zenml.logger import get_logger + +logger = get_logger(__name__) + + +@pipeline(enable_cache=True) +def train_forecast_pipeline() -> None: + """ + Training pipeline that ingests data, preprocesses it, trains a model, + evaluates it, and promotes it via ZenML Model Control Plane. + """ + # Step 1: Ingest data + raw_data = ingest_data() + + # Step 2: Preprocess data into Darts TimeSeries with train/val split + train_series, val_series, _ = preprocess_data(df=raw_data) + + # Step 3: Train the forecasting model + trained_model = train_model(train_series=train_series) + + # Step 4: Evaluate the model + evaluation_results = evaluate( + model=trained_model, train_series=train_series, val_series=val_series + ) + score = evaluation_results[0] + + # Step 5: Register model and promote if better + _ = promote_model(current_score=score) diff --git a/floracast/requirements.txt b/floracast/requirements.txt new file mode 100644 index 00000000..12040d17 --- /dev/null +++ b/floracast/requirements.txt @@ -0,0 +1,22 @@ +# ZenML and Core MLOps +zenml>=0.84.2 + +# Forecasting and ML +u8darts[torch]>=0.29.0,<0.32.0 +torch>=2.2.0,<2.5.0 +scikit-learn>=1.4.0,<1.6.0 + +# Data Processing +pandas>=2.1.0,<2.3.0 +numpy>=1.24.0,<1.27.0 + +# Configuration and Environment +pyyaml>=6.0,<7.0 +python-dotenv>=1.0.1,<2.0.0 + +# Azure Integration (optional for AKS deployment) +azure-identity>=1.16.0,<2.0.0 +azure-storage-blob>=12.20.0,<13.0.0 + +# Utilities +matplotlib>=3.7.0,<4.0.0 \ No newline at end of file diff --git a/floracast/run.py b/floracast/run.py new file mode 100644 index 00000000..c5121c2c --- /dev/null +++ b/floracast/run.py @@ -0,0 +1,69 @@ +""" +Entry point for running FloraCast pipelines using ZenML e2e pattern. +""" + +from pathlib import Path + +import click +from pipelines import batch_inference_pipeline, train_forecast_pipeline +from zenml.logger import get_logger + +logger = get_logger(__name__) + + +@click.command() +@click.option( + "--config", + "-c", + type=click.Path(exists=True, dir_okay=False), + default=None, + required=False, + help=( + "Path to configuration YAML file. If not provided, selects a sensible default " + "based on the chosen pipeline (training.yaml for train, inference.yaml for inference)." + ), +) +@click.option( + "--pipeline", + "-p", + type=click.Choice(["train", "inference", "both"]), + default="train", + help="Pipeline to run", +) +def main(config: str | None, pipeline: str): + """Run FloraCast forecasting pipelines using ZenML with_options pattern. 
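+
+    Example invocation relying on the per-pipeline defaults (illustrative):
+
+        python run.py --pipeline both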
+ + When --config is omitted, a default is chosen per pipeline: + - train: floracast/configs/training.yaml + - inference: floracast/configs/inference.yaml + - both: training uses training.yaml, inference uses inference.yaml + """ + + project_root = Path(__file__).parent + default_training_config = project_root / "configs" / "training.yaml" + default_inference_config = project_root / "configs" / "inference.yaml" + + try: + if pipeline in ["train", "both"]: + chosen_config = config or str(default_training_config) + logger.info( + f"Starting training pipeline with config: {chosen_config}" + ) + train_forecast_pipeline.with_options(config_path=chosen_config)() + logger.info("Training pipeline completed successfully!") + + if pipeline in ["inference", "both"]: + chosen_config = config or str(default_inference_config) + logger.info( + f"Starting batch inference pipeline with config: {chosen_config}" + ) + batch_inference_pipeline.with_options(config_path=chosen_config)() + logger.info("Batch inference pipeline completed successfully!") + + except Exception as e: + logger.error(f"Pipeline execution failed: {e}") + raise + + +if __name__ == "__main__": + main() diff --git a/floracast/steps/__init__.py b/floracast/steps/__init__.py new file mode 100644 index 00000000..fc7aa5b5 --- /dev/null +++ b/floracast/steps/__init__.py @@ -0,0 +1,17 @@ +"""ZenML pipeline steps for FloraCast.""" + +from .batch_infer import batch_inference_predict +from .evaluate import evaluate +from .ingest import ingest_data +from .preprocess import preprocess_data +from .promote import promote_model +from .train import train_model + +__all__ = [ + "ingest_data", + "preprocess_data", + "train_model", + "evaluate", + "promote_model", + "batch_inference_predict", +] diff --git a/floracast/steps/batch_infer.py b/floracast/steps/batch_infer.py new file mode 100644 index 00000000..b461fb41 --- /dev/null +++ b/floracast/steps/batch_infer.py @@ -0,0 +1,177 @@ +"""Batch inference step for FloraCast using ZenML Model Control Plane.""" + +from typing import Annotated, Tuple + +import numpy as np +import pandas as pd +from darts import TimeSeries +from materializers.timeseries_materializer import DartsTimeSeriesMaterializer +from utils.prediction import iterative_predict +from zenml import get_step_context, log_metadata, step +from zenml.client import Client +from zenml.logger import get_logger + +logger = get_logger(__name__) + + +@step( + output_materializers={ + "prediction_series": DartsTimeSeriesMaterializer, + } +) +def batch_inference_predict( + df: pd.DataFrame, + datetime_col: str = "ds", + target_col: str = "y", + freq: str = "D", + horizon: int = 14, +) -> Tuple[ + Annotated[pd.DataFrame, "predictions"], + Annotated[TimeSeries, "prediction_series"], +]: + """Perform batch inference using the trained model from Model Control Plane. 
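+
+    The returned DataFrame has one row per forecasted step, for example
+    (values illustrative):
+
+        ds          yhat
+        2024-07-01  153.2
+        2024-07-02  149.8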
+ + Args: + df: Raw DataFrame with datetime and target columns + datetime_col: Name of datetime column + target_col: Name of target column + freq: Frequency string for time series + horizon: Number of time steps to forecast + + Returns: + DataFrame containing forecast results with columns ['ds', 'yhat'] + TimeSeries containing the forecast results + """ + logger.info(f"Performing batch inference with horizon: {horizon}") + + try: + # Convert DataFrame to TimeSeries and cast to float32 for consistency + logger.info("Converting DataFrame to TimeSeries") + series = TimeSeries.from_dataframe( + df, time_col=datetime_col, value_cols=target_col, freq=freq + ).astype(np.float32) + + logger.info(f"Created TimeSeries with {len(series)} points") + logger.info( + f"Series range: {series.start_time()} to {series.end_time()}" + ) + + # Get the model from Model Control Plane + context = get_step_context() + if not context.model: + raise ValueError( + "No model found in step context. Make sure to run training first or specify model version in config." + ) + + logger.info( + f"Using model: {context.model.name}, version: {context.model.version}" + ) + + # Try to get the trained model artifact + try: + model_artifact = context.model.get_artifact("trained_model") + if model_artifact is None: + raise ValueError( + "trained_model artifact not found in model version" + ) + except Exception as e: + logger.error(f"Failed to get trained_model artifact: {e}") + # List all available artifacts for debugging + try: + logger.info("Available artifacts in model version:") + # Use the correct method to get model version artifacts + client = Client() + model_version = client.get_model_version( + model_name_or_id=context.model.name, + model_version_name_or_number_or_id=context.model.version, + ) + artifacts = model_version.model_artifacts + for artifact in artifacts: + logger.info(f" - {artifact.name}") + except Exception as list_error: + logger.warning(f"Could not list artifacts: {list_error}") + raise + + # Load the trained model + trained_model = model_artifact.load() + logger.info( + f"Loaded model from Model Control Plane: {type(trained_model).__name__}" + ) + + # Load the fitted scaler artifact + fitted_scaler = None + try: + scaler_artifact = context.model.get_artifact("fitted_scaler") + if scaler_artifact is None: + raise ValueError( + "fitted_scaler artifact not found in model version" + ) + fitted_scaler = scaler_artifact.load() + logger.info("Loaded fitted scaler artifact from model version") + + # Apply scaling to the input series + logger.info("Applying scaling to input series for inference") + series = fitted_scaler.transform(series) + logger.info("Scaling applied successfully") + except Exception as scaler_error: + logger.error(f"Failed to load or apply scaler: {scaler_error}") + logger.warning( + "Proceeding without scaling - predictions may be incorrect!" 
+ ) + # Continue without scaling for backward compatibility + + # Generate predictions using improved multi-step approach + predictions = iterative_predict(trained_model, series, horizon) + + # Inverse transform predictions back to original scale + if fitted_scaler is not None: + try: + logger.info( + "Inverse transforming predictions back to original scale" + ) + predictions = fitted_scaler.inverse_transform(predictions) + logger.info("Inverse transformation applied successfully") + except Exception as inverse_error: + logger.error( + f"Failed to inverse transform predictions: {inverse_error}" + ) + logger.warning("Predictions remain in scaled format!") + else: + logger.warning( + "No scaler available - predictions remain in original format" + ) + + # Convert to DataFrame + pred_df = predictions.pd_dataframe().reset_index() + pred_df.columns = ["ds", "yhat"] # Standard naming + + logger.info(f"Created predictions DataFrame with {len(pred_df)} rows") + logger.info( + f"Forecast period: {pred_df['ds'].min()} to {pred_df['ds'].max()}" + ) + logger.info( + f"Prediction stats: mean={pred_df['yhat'].mean():.2f}, std={pred_df['yhat'].std():.2f}" + ) + + # Log metadata to ZenML for observability + log_metadata( + { + "horizon": horizon, + "num_predictions": len(pred_df), + "forecast_start": str(pred_df["ds"].min()), + "forecast_end": str(pred_df["ds"].max()), + "prediction_mean": float(pred_df["yhat"].mean()), + "prediction_std": float(pred_df["yhat"].std()), + "prediction_min": float(pred_df["yhat"].min()), + "prediction_max": float(pred_df["yhat"].max()), + "model_type": type(trained_model).__name__, + "model_name": context.model.name, + "model_version": context.model.version, + } + ) + + return pred_df, predictions + + except Exception as e: + logger.error(f"Batch inference failed: {str(e)}") + raise diff --git a/floracast/steps/evaluate.py b/floracast/steps/evaluate.py new file mode 100644 index 00000000..908fdca5 --- /dev/null +++ b/floracast/steps/evaluate.py @@ -0,0 +1,329 @@ +""" +Model evaluation step for FloraCast. +""" + +import base64 +import tempfile +from typing import Annotated, Tuple + +import matplotlib +import pandas as pd + +matplotlib.use("Agg") +import matplotlib.pyplot as plt +from darts import TimeSeries +from utils.metrics import smape +from utils.prediction import iterative_predict +from zenml import log_metadata, step +from zenml.logger import get_logger +from zenml.types import HTMLString + +logger = get_logger(__name__) + + +def create_evaluation_visualization( + train_series: TimeSeries, + val_series: TimeSeries, + predictions: TimeSeries, + actual: TimeSeries, + score: float, + metric: str, +) -> HTMLString: + """ + Create an HTML visualization of the evaluation results. 
+ + Args: + train_series: Training time series data + val_series: Validation time series data + predictions: Model predictions + actual: Actual validation values used for evaluation + score: Evaluation score + metric: Metric name + + Returns: + HTMLString with embedded plot visualization + """ + try: + # Create the plot with modern styling + plt.style.use("default") # Reset to clean style + fig, ax = plt.subplots(figsize=(14, 8)) + fig.patch.set_facecolor("white") + + # Convert to pandas for easier plotting + train_df = train_series.pd_dataframe() + val_df = val_series.pd_dataframe() + pred_df = predictions.pd_dataframe() + + # Define modern color palette + colors = { + "train": "#E8F4FD", # Very light blue + "val": "#2E86AB", # Modern blue + "pred": "#F18F01", # Vibrant orange + "highlight": "#FFE66D", # Soft yellow + } + + # Focus zoom: show last 3 months of training + all validation + prediction period + zoom_start = pred_df.index.min() - pd.Timedelta(days=90) + + # Filter data for zoom + train_zoom = ( + train_df[train_df.index >= zoom_start] + if len(train_df[train_df.index >= zoom_start]) > 0 + else train_df.tail(90) + ) + + # Plot training data (minimal context) + ax.plot( + train_zoom.index, + train_zoom.iloc[:, 0], + label="Training Data (Last 90 days)", + color="#7FB3D3", # More solid blue instead of very light + alpha=0.8, # More opaque + linewidth=2, # Slightly thicker + ) + + # Plot validation data + ax.plot( + val_df.index, + val_df.iloc[:, 0], + label="Ground Truth", + color=colors["val"], + alpha=0.9, + linewidth=3, + zorder=3, + ) + + # Plot predictions with modern style + ax.plot( + pred_df.index, + pred_df.iloc[:, 0], + label="AI Predictions", + color=colors["pred"], + alpha=0.95, + linewidth=4, + linestyle="-", + zorder=4, + marker="o", + markersize=3, + markeredgewidth=0, + ) + + # Add subtle shaded region + ax.axvspan( + pred_df.index.min(), + pred_df.index.max(), + alpha=0.08, + color=colors["highlight"], + zorder=1, + ) + + # Focus the x-axis on the interesting period + ax.set_xlim(zoom_start, val_df.index.max()) + + # Modern title styling + performance_text = ( + "Excellent" + if score < 20 + else "Good" + if score < 40 + else "Needs Improvement" + ) + + ax.set_title( + f"FloraCast AI Forecasting Model\\n{metric.upper()}: {score:.2f} ({performance_text})", + fontsize=18, + fontweight="600", + color="#2C3E50", + pad=20, + ) + + # Modern axis styling + ax.set_xlabel("Date", fontsize=14, color="#34495E", fontweight="500") + ax.set_ylabel( + "Normalized Value", fontsize=14, color="#34495E", fontweight="500" + ) + + # Clean legend + legend = ax.legend( + fontsize=12, + frameon=True, + fancybox=True, + shadow=True, + framealpha=0.95, + edgecolor="none", + ) + legend.get_frame().set_facecolor("#FAFAFA") + + # Modern grid + ax.grid(True, alpha=0.2, linestyle="-", linewidth=0.5, color="#BDC3C7") + ax.set_facecolor("#FEFEFE") + + # Clean up axes + ax.spines["top"].set_visible(False) + ax.spines["right"].set_visible(False) + ax.spines["left"].set_color("#BDC3C7") + ax.spines["bottom"].set_color("#BDC3C7") + + # Better tick formatting + plt.xticks(rotation=45, ha="right", fontsize=11, color="#34495E") + plt.yticks(fontsize=11, color="#34495E") + + plt.tight_layout(pad=2.0) + + # Save plot to base64 string + with tempfile.NamedTemporaryFile(suffix=".png") as tmp: + fig.savefig(tmp.name, dpi=150, bbox_inches="tight") + plt.close(fig) + + with open(tmp.name, "rb") as f: + img_data = f.read() + img_base64 = base64.b64encode(img_data).decode("utf-8") + + # Create HTML with embedded 
image
+        html_content = f"""
+        <html>
+        <head>
+            <title>Model Evaluation Results</title>
+        </head>
+        <body>
+            <h1>FloraCast Model Evaluation</h1>
+            <h2>Evaluation Metrics</h2>
+            <p>Metric: {metric.upper()}</p>
+            <p>Score: {score:.4f}</p>
+            <p>Prediction Horizon: {len(predictions)} time steps</p>
+            <p>Training Data Points: {len(train_series)}</p>
+            <p>Validation Data Points: {len(val_series)}</p>
+            <h2>Time Series Visualization</h2>
+            <img src="data:image/png;base64,{img_base64}" alt="Evaluation Plot" style="max-width: 100%;">
+        </body>
+        </html>
+        """
+
+        return HTMLString(html_content)
+
+    except Exception as e:
+        logger.error(f"Failed to create visualization: {str(e)}")
+        # Return simple HTML with error message
+        error_html = f"""
+        <html>
+        <body>
+            <h2>Evaluation Results</h2>
+            <p>Metric: {metric.upper()}</p>
+            <p>Score: {score:.4f}</p>
+            <p>Visualization failed to generate: {str(e)}</p>
+        </body>
+        </html>
+ + + """ + return HTMLString(error_html) + + +@step(enable_cache=False) +def evaluate( + model: object, + train_series: TimeSeries, + val_series: TimeSeries, + horizon: int = 7, + metric: str = "smape", +) -> Tuple[ + Annotated[float, "evaluation_score"], + Annotated[HTMLString, "evaluation_visualization"], +]: + """ + Evaluate the trained model on validation data. + + Args: + model: Trained forecasting model + train_series: Training time series + val_series: Validation time series + horizon: Forecasting horizon + metric: Evaluation metric name + + Returns: + Evaluation metric score (lower is better for SMAPE) + """ + + logger.info(f"Evaluating with horizon: {horizon}, metric: {metric}") + + try: + # Generate predictions using TFT model + # TFT requires the series parameter to generate predictions + logger.info(f"Generating predictions for horizon: {horizon}") + + # For TFT models, we need to provide the series parameter + if hasattr(model, "predict"): + # Generate predictions using iterative multi-step approach for longer horizons + # This is better than single-shot prediction for long horizons + # Respect the requested horizon but do not exceed validation length + n_predict = min(len(val_series), horizon) + + # Use utility function for prediction + predictions = iterative_predict(model, train_series, n_predict) + + # Truncate validation series to match prediction length + actual = val_series[: len(predictions)] + + # Use original predictions for evaluation (no artificial perturbation) + predictions_for_eval = predictions + + # Calculate metric on predictions + if metric == "smape": + score = smape(actual, predictions_for_eval) + else: + raise ValueError(f"Unknown metric: {metric}") + + logger.info( + f"Evaluation {metric}: {score:.4f} over {len(predictions)} days" + ) + + # Log metadata to ZenML for observability + log_metadata( + { + "evaluation_metric": metric, + "score": float(score), + "horizon": horizon, + "num_predictions": len(predictions), + "actual_length": len(actual), + "model_type": type(model).__name__, + } + ) + + # Create visualization + visualization_html = create_evaluation_visualization( + train_series, val_series, predictions, actual, score, metric + ) + + return float(score), visualization_html + else: + logger.error("Model does not have predict method") + empty_viz = HTMLString( + "

<div>Model evaluation failed - no predict method available</div>"
+            )
+            return 9999.0, empty_viz
+
+    except Exception as e:
+        logger.error(f"Evaluation failed: {str(e)}")
+        logger.info("This might be due to TFT model prediction requirements")
+        # Return a high penalty score for failed evaluation
+        empty_viz = HTMLString(
+            "<div>Evaluation failed due to exception</div>
" + ) + return 9999.0, empty_viz diff --git a/floracast/steps/ingest.py b/floracast/steps/ingest.py new file mode 100644 index 00000000..a9cc1fd4 --- /dev/null +++ b/floracast/steps/ingest.py @@ -0,0 +1,170 @@ +""" +Data ingestion step for FloraCast. +""" + +import os +from datetime import datetime, timedelta +from typing import Annotated, Optional + +import numpy as np +import pandas as pd +from zenml import step +from zenml.logger import get_logger + +logger = get_logger(__name__) + + +def generate_ecommerce_data(num_days: int = 730) -> pd.DataFrame: + """ + Generate synthetic ecommerce daily sales data with realistic patterns. + + Args: + num_days: Number of days to generate + + Returns: + DataFrame with columns 'ds' (date) and 'y' (sales) + """ + logger.info(f"Generating synthetic ecommerce data for {num_days} days") + + # Create date range + start_date = datetime.now() - timedelta(days=num_days) + dates = pd.date_range(start=start_date, periods=num_days, freq="D") + + # Base trend with multiple components + base_trend = np.linspace(100, 200, num_days) + + # Add yearly seasonality (higher in winter/holidays) + yearly_cycle = 20 * np.sin( + 2 * np.pi * np.arange(num_days) / 365.25 - np.pi / 2 + ) + + # Weekly seasonality (higher sales on weekends) + weekly_pattern = np.array( + [0.8, 0.85, 0.9, 0.95, 1.0, 1.3, 1.2] + ) # Mon to Sun + weekly_seasonality = np.tile(weekly_pattern, (num_days // 7) + 1)[ + :num_days + ] + + # Monthly cycle (end of month sales bumps) + days_in_month = pd.date_range( + start=start_date, periods=num_days, freq="D" + ).day + monthly_boost = np.where( + (days_in_month >= 28) | (days_in_month <= 3), 1.2, 1.0 + ) + + # Add some autocorrelated noise for realism + np.random.seed(42) + noise = np.random.normal(0, 8, num_days) + # Add some persistence to noise + for i in range(1, num_days): + noise[i] = 0.3 * noise[i - 1] + 0.7 * noise[i] + + # Combine all components + sales = ( + base_trend + yearly_cycle + ) * weekly_seasonality * monthly_boost + noise + sales = np.maximum(sales, 20) # Ensure positive values + sales = sales.astype(int) + + return pd.DataFrame({"ds": dates, "y": sales}) + + +@step +def ingest_data( + data_source: str = "ecommerce_default", + data_path: Optional[str] = None, + datetime_col: str = "ds", + target_col: str = "y", + infer: bool = False, +) -> Annotated[pd.DataFrame, "raw_data"]: + """ + Ingest data based on configuration parameters. 
+ + Args: + data_source: Type of data source ("ecommerce_default" or "csv") + data_path: Path to CSV file (when data_source is "csv") + datetime_col: Name of datetime column + target_col: Name of target column + infer: If True, simulate real-time data ingestion for inference + + Returns: + DataFrame with datetime and target columns + """ + if data_source == "ecommerce_default": + # Generate or load default ecommerce data + csv_file_path = "data/ecommerce_daily.csv" + + if not os.path.exists(csv_file_path): + logger.info( + "Default ecommerce data not found, generating new data" + ) + os.makedirs("data", exist_ok=True) + df = generate_ecommerce_data() + df.to_csv(csv_file_path, index=False) + logger.info( + f"Generated and saved ecommerce data to {csv_file_path}" + ) + else: + if infer: + logger.info( + f"๐Ÿ”„ INFERENCE MODE: Simulating real-time data ingestion from {csv_file_path}" + ) + logger.info( + "๐Ÿ“ก In production, this would connect to real-time data sources like:" + ) + logger.info(" - Database queries for latest sales data") + logger.info(" - API calls to fetch recent transactions") + logger.info(" - Stream processing from Kafka/Kinesis") + logger.info(" - Data lake queries for updated metrics") + logger.info( + "๐Ÿ“Š For demo purposes, loading same data as training to show inference workflow" + ) + else: + logger.info( + f"Loading existing ecommerce data from {csv_file_path}" + ) + df = pd.read_csv(csv_file_path, parse_dates=["ds"]) + + elif data_source == "csv": + # Load from specified CSV path + if not data_path or not os.path.exists(data_path): + raise FileNotFoundError(f"CSV file not found: {data_path}") + + logger.info(f"Loading data from CSV: {data_path}") + df = pd.read_csv(data_path) + + # Parse datetime column + if datetime_col in df.columns: + df[datetime_col] = pd.to_datetime(df[datetime_col]) + else: + raise ValueError( + f"Datetime column '{datetime_col}' not found in CSV" + ) + + else: + raise ValueError(f"Unknown data source: {data_source}") + + # Validate required columns + if datetime_col not in df.columns: + raise ValueError(f"Datetime column '{datetime_col}' not found in data") + if target_col not in df.columns: + raise ValueError(f"Target column '{target_col}' not found in data") + + # Ensure proper data types + df[datetime_col] = pd.to_datetime(df[datetime_col]) + df[target_col] = pd.to_numeric(df[target_col]) + + # Sort by datetime + df = df.sort_values(datetime_col).reset_index(drop=True) + + logger.info(f"Ingested {len(df)} rows of data") + logger.info( + f"Date range: {df[datetime_col].min()} to {df[datetime_col].max()}" + ) + logger.info( + f"Target stats: mean={df[target_col].mean():.2f}, std={df[target_col].std():.2f}" + ) + + return df diff --git a/floracast/steps/preprocess.py b/floracast/steps/preprocess.py new file mode 100644 index 00000000..10e2fe30 --- /dev/null +++ b/floracast/steps/preprocess.py @@ -0,0 +1,159 @@ +"""Data preprocessing steps for FloraCast.""" + +from typing import Annotated, Tuple + +import numpy as np +import pandas as pd +from darts import TimeSeries +from darts.dataprocessing.transformers import Scaler +from materializers.timeseries_materializer import DartsTimeSeriesMaterializer +from zenml import step +from zenml.logger import get_logger + +logger = get_logger(__name__) + + +@step( + output_materializers={ + "train_series": DartsTimeSeriesMaterializer, + "val_series": DartsTimeSeriesMaterializer, + } +) +def preprocess_data( + df: pd.DataFrame, + datetime_col: str = "ds", + target_col: str = "y", + freq: str = "D", + 
val_ratio: float = 0.2, +) -> Tuple[ + Annotated[TimeSeries, "train_series"], + Annotated[TimeSeries, "val_series"], + Annotated[Scaler, "fitted_scaler"], +]: + """Preprocess data for training - splits into train/val sets. + + Args: + df: Raw DataFrame with datetime and target columns + datetime_col: Name of datetime column + target_col: Name of target column + freq: Frequency string for time series + val_ratio: Ratio of data to use for validation + + Returns: + Tuple of (train_series, val_series, fitted_scaler) + """ + logger.info( + "๐ŸŽฏ TRAINING MODE: Converting to Darts TimeSeries and splitting data" + ) + logger.info(f"Converting to TimeSeries with frequency: {freq}") + + # Create Darts TimeSeries (keeping it simple - just target variable) + series = TimeSeries.from_dataframe( + df, time_col=datetime_col, value_cols=target_col, freq=freq + ) + + logger.info(f"Created TimeSeries with {len(series)} points") + logger.info(f"Series range: {series.start_time()} to {series.end_time()}") + + # Split into train and validation BEFORE scaling + split_point = int(len(series) * (1 - val_ratio)) + train_series_raw = series[:split_point] + val_series_raw = series[split_point:] + + # Apply normalization using training data statistics + scaler = Scaler() + logger.info( + "Fitting scaler on training data and applying to both train/val" + ) + + # Fit scaler only on training data to prevent data leakage + train_series = scaler.fit_transform(train_series_raw) + val_series = scaler.transform(val_series_raw) + + # Cast to float32 for hardware compatibility (MPS, mixed precision training) + logger.info("Converting TimeSeries to float32 for hardware compatibility") + train_series = train_series.astype(np.float32) + val_series = val_series.astype(np.float32) + + # Check for NaN/inf values in scaled data + train_values = train_series.pd_dataframe().values + val_values = val_series.pd_dataframe().values + + train_nan_count = np.isnan(train_values).sum() + train_inf_count = np.isinf(train_values).sum() + val_nan_count = np.isnan(val_values).sum() + val_inf_count = np.isinf(val_values).sum() + + logger.info( + f"Data quality check - Train NaN: {train_nan_count}, Train Inf: {train_inf_count}" + ) + logger.info( + f"Data quality check - Val NaN: {val_nan_count}, Val Inf: {val_inf_count}" + ) + + # Check for extreme values that could cause numerical instability + train_min, train_max = train_values.min(), train_values.max() + val_min, val_max = val_values.min(), val_values.max() + logger.info( + f"Value ranges - Train: [{train_min:.6f}, {train_max:.6f}], Val: [{val_min:.6f}, {val_max:.6f}]" + ) + + # Flag potentially problematic values + needs_cleaning = ( + train_nan_count > 0 + or train_inf_count > 0 + or val_nan_count > 0 + or val_inf_count > 0 + or abs(train_min) > 1e6 + or abs(train_max) > 1e6 + or abs(val_min) > 1e6 + or abs(val_max) > 1e6 + ) + + if needs_cleaning: + logger.warning( + "Found potentially problematic values in scaled data - cleaning..." 
+ ) + + # Replace NaN/Inf and clip extreme values + train_df = train_series.pd_dataframe() + val_df = val_series.pd_dataframe() + + # Handle NaN/Inf + train_df = train_df.replace([np.inf, -np.inf], np.nan) + val_df = val_df.replace([np.inf, -np.inf], np.nan) + + train_df = train_df.fillna(0.0) + val_df = val_df.fillna(0.0) + + # Clip extreme values to reasonable range + train_df = train_df.clip(-10.0, 10.0) + val_df = val_df.clip(-10.0, 10.0) + + train_series = TimeSeries.from_dataframe(train_df).astype(np.float32) + val_series = TimeSeries.from_dataframe(val_df).astype(np.float32) + + logger.info( + "Cleaned data - replaced NaN/Inf and clipped to [-10, 10] range" + ) + else: + logger.info("Data quality check passed - no problematic values found") + + # Return fitted scaler as artifact for inference use + logger.info("Returning fitted scaler as artifact for inference use") + + # Log scaling statistics + train_mean = train_series_raw.pd_dataframe().iloc[:, 0].mean() + train_std = train_series_raw.pd_dataframe().iloc[:, 0].std() + logger.info( + f"Scaling stats - Mean: {train_mean:.2f}, Std: {train_std:.2f}" + ) + + logger.info( + f"Train series: {len(train_series)} points ({train_series.start_time()} to {train_series.end_time()})" + ) + logger.info( + f"Validation series: {len(val_series)} points ({val_series.start_time()} to {val_series.end_time()})" + ) + + return train_series, val_series, scaler diff --git a/floracast/steps/promote.py b/floracast/steps/promote.py new file mode 100644 index 00000000..85fa0d78 --- /dev/null +++ b/floracast/steps/promote.py @@ -0,0 +1,129 @@ +""" +Model promotion step using ZenML Model Control Plane. +""" + +from typing import Annotated, Optional + +from zenml import get_step_context, log_metadata, step +from zenml.client import Client +from zenml.enums import ModelStages +from zenml.logger import get_logger + +logger = get_logger(__name__) + + +@step +def promote_model( + current_score: float, + target_stage: str = ModelStages.PRODUCTION, + metric: str = "smape", +) -> Annotated[str, "promotion_status"]: + """ + Promote current model version to a target stage if it outperforms the + model currently in that stage, using artifacts/metadata from the model + context. Also logs decision metadata. + + Args: + target_stage: The stage to promote to (default: production) + metric: The evaluation metric to compare (default: smape) + + Returns: + Status message about the promotion decision. + """ + + client = Client() + + try: + # Use the model from the current step context (pattern from batch_infer) + context_model = get_step_context().model + if not context_model: + raise ValueError( + "No model found in step context. Ensure training associated a model with this pipeline." + ) + + logger.info( + f"Evaluating promotion for model: {context_model.name} (version: {context_model.version})" + ) + + # Use the score passed from the evaluation step + current_score = float(current_score) + + # (Removed optional artifact_uri/model_class collection as unnecessary) + + # Fetch production version score to compare with + prod_score: Optional[float] = None + try: + prod_model_version = client.get_model_version( + model_name_or_id=context_model.name, + model_version_name_or_number_or_id=target_stage, + ) + prod_score_artifact = prod_model_version.get_artifact( + "evaluation_score" + ) + if prod_score_artifact is not None: + prod_score = float(prod_score_artifact.load()) + else: + logger.info( + f"`{target_stage}` model has no evaluation_score artifact; promotion will be skipped." 
+ ) + except (RuntimeError, KeyError): + logger.info( + f"No existing `{target_stage}` model version found. Current version will be promoted by default." + ) + + # Decide promotion + promote = False + reason = "" + if prod_score is None: + promote = True + reason = f"No {target_stage} model to compare against" + else: + if metric.lower() in {"smape", "mae", "rmse", "mse"}: + # Lower is better + promote = current_score < prod_score + reason = "lower_is_better" + else: + # Higher is better for other metrics by convention + promote = current_score > prod_score + reason = "higher_is_better" + + if promote: + context_model.set_stage(stage=target_stage, force=True) + status = f"Promoted model '{context_model.name}' v{context_model.version} to '{target_stage}'." + logger.info(status) + decision = "promoted" + else: + status = ( + f"Skipped promotion to '{target_stage}': current score={current_score:.6f} " + f"not better than {target_stage} score={prod_score:.6f}." + ) + logger.info(status) + decision = "skipped" + + # Log decision and comparison metadata + log_metadata( + { + "decision": decision, + "reason": reason, + "target_stage": str(target_stage), + "metric": metric, + "current_model_name": context_model.name, + "current_model_version": str(context_model.version), + "current_score": float(current_score), + "production_score": None + if prod_score is None + else float(prod_score), + } + ) + + return status + + except Exception as e: + error_status = f"Promotion step failed: {str(e)}" + logger.error(error_status) + # Best-effort metadata for failure diagnostics + try: + log_metadata({"decision": "error", "error": str(e)}) + except Exception: + pass + return error_status diff --git a/floracast/steps/train.py b/floracast/steps/train.py new file mode 100644 index 00000000..1ab8cbba --- /dev/null +++ b/floracast/steps/train.py @@ -0,0 +1,127 @@ +""" +Model training step for FloraCast. +""" + +from typing import Annotated + +import torch +from darts import TimeSeries +from darts.models import TFTModel +from materializers.tft_materializer import ( + TFTModelMaterializer, +) # Import for explicit usage +from zenml import step +from zenml.logger import get_logger + +logger = get_logger(__name__) + + +@step(output_materializers={"trained_model": TFTModelMaterializer}) +def train_model( + train_series: TimeSeries, + input_chunk_length: int = 30, + output_chunk_length: int = 7, + hidden_size: int = 32, + lstm_layers: int = 1, + num_attention_heads: int = 2, + dropout: float = 0.1, + batch_size: int = 16, + n_epochs: int = 20, + random_state: int = 42, + add_relative_index: bool = True, + enable_progress_bar: bool = False, + enable_model_summary: bool = False, + learning_rate: float = 1e-3, + weight_decay: float = 1e-5, +) -> Annotated[TFTModel, "trained_model"]: + """Train a TFT forecasting model. 
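+
+    Training is pinned to the CPU with 32-bit precision, gradient clipping by
+    norm, and Xavier weight initialization for numerical stability (see the
+    trainer kwargs and init_weights below).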
+ + Args: + train_series: Training time series + input_chunk_length: Number of time steps to use as input + output_chunk_length: Number of time steps to predict + hidden_size: Size of hidden layers + lstm_layers: Number of LSTM layers + num_attention_heads: Number of attention heads + dropout: Dropout rate + batch_size: Training batch size + n_epochs: Number of training epochs + random_state: Random seed + add_relative_index: Whether to add relative index + enable_progress_bar: Whether to show progress bar + enable_model_summary: Whether to show model summary + learning_rate: Learning rate for optimizer + weight_decay: Weight decay for regularization + + Returns: + Trained TFT model + """ + # Build TFT model parameters + model_params = { + "input_chunk_length": input_chunk_length, + "output_chunk_length": output_chunk_length, + "hidden_size": hidden_size, + "lstm_layers": lstm_layers, + "num_attention_heads": num_attention_heads, + "dropout": dropout, + "batch_size": batch_size, + "n_epochs": n_epochs, + "random_state": random_state, + "add_relative_index": add_relative_index, + "optimizer_kwargs": { + "lr": learning_rate, + "weight_decay": weight_decay, + }, + "pl_trainer_kwargs": { + "enable_progress_bar": enable_progress_bar, + "enable_model_summary": enable_model_summary, + "precision": "32-true", # Use 32-bit precision for better hardware compatibility + "gradient_clip_val": 1.0, # Standard gradient clipping + "gradient_clip_algorithm": "norm", # Clip by norm + "detect_anomaly": True, # Detect NaN/inf in loss + "max_epochs": n_epochs, + "check_val_every_n_epoch": 1, # Validate every epoch + "accelerator": "cpu", # Force CPU to avoid MPS issues + }, + } + + logger.info(f"Training TFT model with params: {model_params}") + + # Initialize TFT model + model = TFTModel(**model_params) + + # Initialize model weights with Xavier/Glorot initialization for stability + def init_weights(m): + if isinstance(m, torch.nn.Linear): + torch.nn.init.xavier_uniform_(m.weight) + if m.bias is not None: + torch.nn.init.zeros_(m.bias) + elif isinstance(m, torch.nn.LSTM): + for name, param in m.named_parameters(): + if "weight" in name: + torch.nn.init.xavier_uniform_(param) + elif "bias" in name: + torch.nn.init.zeros_(param) + + # Apply weight initialization + if hasattr(model, "model") and model.model is not None: + model.model.apply(init_weights) + logger.info("Applied Xavier weight initialization to model") + + logger.info(f"Starting TFT training with {len(train_series)} data points") + + # Train the TFT model + # Get basic stats from the pandas dataframe + df_stats = train_series.pd_dataframe().iloc[:, 0] + logger.info( + f"Train series stats: min={df_stats.min():.2f}, max={df_stats.max():.2f}, mean={df_stats.mean():.2f}" + ) + + model.fit(train_series) + logger.info("TFT model training completed successfully") + + # Test prediction to verify model works + test_pred = model.predict(n=1, series=train_series) + logger.info(f"Test prediction: {test_pred.pd_dataframe().iloc[0, 0]:.2f}") + + return model diff --git a/floracast/utils/__init__.py b/floracast/utils/__init__.py new file mode 100644 index 00000000..dac7e5a0 --- /dev/null +++ b/floracast/utils/__init__.py @@ -0,0 +1,5 @@ +"""Utility functions for FloraCast.""" + +from .metrics import smape + +__all__ = ["smape"] diff --git a/floracast/utils/metrics.py b/floracast/utils/metrics.py new file mode 100644 index 00000000..6d0f5aef --- /dev/null +++ b/floracast/utils/metrics.py @@ -0,0 +1,47 @@ +""" +Utility functions for computing forecasting metrics. 
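+
+SMAPE, as implemented below:
+
+    SMAPE = 100 * mean(|actual - predicted| / ((|actual| + |predicted|) / 2))
+
+Terms with a zero denominator contribute an error of 0.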
+""" + +from typing import Union + +import numpy as np +from darts import TimeSeries + + +def smape( + actual: Union[TimeSeries, np.ndarray], + predicted: Union[TimeSeries, np.ndarray], +) -> float: + """ + Calculate Symmetric Mean Absolute Percentage Error (SMAPE). + + Args: + actual: Actual time series values + predicted: Predicted time series values + + Returns: + SMAPE value (lower is better) + """ + # Convert to numpy arrays if TimeSeries + if isinstance(actual, TimeSeries): + actual = actual.values().flatten() + if isinstance(predicted, TimeSeries): + predicted = predicted.values().flatten() + + # Ensure same length + min_len = min(len(actual), len(predicted)) + actual = actual[:min_len] + predicted = predicted[:min_len] + + # Calculate SMAPE with zero-denominator guard + denominator = (np.abs(actual) + np.abs(predicted)) / 2.0 + errors = np.zeros_like(denominator, dtype=float) + non_zero_mask = denominator != 0 + if np.any(non_zero_mask): + errors[non_zero_mask] = ( + np.abs(actual[non_zero_mask] - predicted[non_zero_mask]) + / denominator[non_zero_mask] + ) + smape_value = np.mean(errors) * 100.0 + + return float(smape_value) diff --git a/floracast/utils/prediction.py b/floracast/utils/prediction.py new file mode 100644 index 00000000..eecf45b1 --- /dev/null +++ b/floracast/utils/prediction.py @@ -0,0 +1,71 @@ +"""Utility functions for model prediction.""" + +from darts import TimeSeries +from zenml.logger import get_logger + +logger = get_logger(__name__) + + +def iterative_predict( + model, + series: TimeSeries, + horizon: int, + model_output_chunk_length: int = None, +) -> TimeSeries: + """Generate predictions using iterative multi-step approach for better long-term accuracy. + + Args: + model: Trained forecasting model + series: Input time series data for context + horizon: Total number of time steps to forecast + model_output_chunk_length: Model's output chunk length. 
If None, tries to get from model + + Returns: + TimeSeries containing all predictions + """ + logger.info(f"Using iterative multi-step prediction for horizon={horizon}") + + # Try to get output_chunk_length from model if not provided + if model_output_chunk_length is None: + if hasattr(model, "output_chunk_length"): + model_output_chunk_length = model.output_chunk_length + else: + logger.warning( + "Could not determine model output_chunk_length, defaulting to 7" + ) + model_output_chunk_length = 7 + + logger.info( + f"Using model output_chunk_length: {model_output_chunk_length}" + ) + + # Use multiple prediction steps for better long-term accuracy + predictions_list = [] + context_series = series + + # Predict in chunks of output_chunk_length + remaining_steps = horizon + while remaining_steps > 0: + chunk_size = min(model_output_chunk_length, remaining_steps) + + # Generate prediction chunk + chunk_pred = model.predict(n=chunk_size, series=context_series) + + predictions_list.append(chunk_pred) + + # Extend context with the prediction for next iteration + context_series = context_series.concatenate(chunk_pred) + remaining_steps -= chunk_size + + # Combine all predictions + if len(predictions_list) == 1: + predictions = predictions_list[0] + else: + predictions = predictions_list[0] + for pred_chunk in predictions_list[1:]: + predictions = predictions.concatenate(pred_chunk) + + logger.info( + f"Generated {len(predictions)} predictions using multi-step approach" + ) + return predictions diff --git a/magic-photobooth/frontend.py b/magic-photobooth/frontend.py index be2d7d9a..e47ed529 100644 --- a/magic-photobooth/frontend.py +++ b/magic-photobooth/frontend.py @@ -179,9 +179,7 @@ def inference_mode(): return # Model selection - value used in later operations - selected_model = st.selectbox( - "Choose a trained model", st.session_state.trained_models - ) + _ = st.selectbox("Choose a trained model", st.session_state.trained_models) selected_prompt = st.selectbox("Choose a prompt", paris_prompts) custom_prompt = st.text_input("Or enter your own prompt")