A proof-of-concept platform for managing gigabytes of time series data with reproducible computation graphs (DAGs), visualization capabilities, and a Python SDK.
- Time Series Data Storage: Store and manage large time series datasets using MinIO (S3-compatible) and DuckDB
- Computation Graphs: Build reproducible workflows as DAGs with operation lineage tracking
- Pre-built Operations: FFT, filtering (low-pass, high-pass, band-pass), unit conversion
- Custom Operations: Extend the platform with custom Python functions
- Visualization: Web-based DAG visualization and side-by-side data plots
- Python SDK: High-level client for interacting with the platform without calling the REST API directly
- Containerized: Everything runs in Docker containers for easy deployment
┌─────────────────────────────────────────────────────────────┐
│                          Web Client                         │
│           (DAG Viz + Plot Viz + Workflow Builder)            │
└─────────────────────────────────────────────────────────────┘
                               │
                               ▼
┌─────────────────────────────────────────────────────────────┐
│                        FastAPI Backend                       │
│               (REST API for workflows & data)                │
└─────────────────────────────────────────────────────────────┘
                               │
             ┌─────────────────┼─────────────────┐
             ▼                 ▼                 ▼
      ┌──────────────┐  ┌──────────────┐  ┌──────────────┐
      │    MinIO     │  │  PostgreSQL  │  │    DuckDB    │
      │  (Raw Data)  │  │  (Metadata)  │  │ (Analytics)  │
      └──────────────┘  └──────────────┘  └──────────────┘
Components:
- MinIO: S3-compatible object storage for raw CSV files
- PostgreSQL: Metadata storage (workflows, nodes, datasets, lineage)
- DuckDB: In-process analytical database with an Ibis interface (a query sketch follows this list)
- Prefect: Workflow orchestration and task management
- FastAPI: REST API backend
- Web Client: Simple HTML/JS interface with Plotly and D3.js
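For context, this is the kind of per-channel aggregation the DuckDB layer handles. The sketch below uses the duckdb Python package and plain SQL for brevity (the platform's data layer goes through Ibis), and assumes a CSV in the timestamp/channel_id/value format described later in this README:
import duckdb

# In-process connection; no server to run. read_csv_auto infers the schema
# of a timestamp/channel_id/value file like the sample dataset.
con = duckdb.connect()
stats = con.execute("""
    SELECT channel_id,
           count(*)   AS samples,
           avg(value) AS mean_value,
           min(value) AS min_value,
           max(value) AS max_value
    FROM read_csv_auto('data/sample_timeseries.csv')
    GROUP BY channel_id
    ORDER BY channel_id
""").df()
print(stats)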
- Docker and Docker Compose
- Python 3.10+ (for local development)
- 2GB+ free disk space
# Build and start all services
docker-compose up --build -d
# Check service status
docker-compose ps
Services will be available at:
- API: http://localhost:8000
- API Docs: http://localhost:8000/docs
- Web Client: http://localhost:3000
- MinIO Console: http://localhost:9001
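A quick way to confirm the services respond before moving on; this assumes the requests package is installed (any HTTP client works):
import requests

# Ping the documented endpoints; a 200 means the service is up.
for name, url in {
    "API docs": "http://localhost:8000/docs",
    "Web client": "http://localhost:3000",
    "MinIO console": "http://localhost:9001",
}.items():
    print(name, requests.get(url, timeout=5).status_code)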
# Generate sample time series dataset
python scripts/generate_sample_data.py
This creates data/sample_timeseries.csv with:
- 10 channels
- 100 seconds at 1kHz (1,000,000 total samples)
- Mix of sine waves, noise, and transient events
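Roughly what the generator produces, as a sketch; see scripts/generate_sample_data.py for the actual logic (the frequencies, noise level, and spike placement here are illustrative assumptions):
import numpy as np
import pandas as pd

fs, seconds, channels = 1000, 100, 10            # 1 kHz, 100 s, 10 channels
t = np.arange(seconds * fs) / fs

frames = []
for ch in range(channels):
    signal = np.sin(2 * np.pi * (5 + ch) * t)            # per-channel sine wave
    signal += 0.2 * np.random.randn(t.size)              # background noise
    signal[np.random.randint(t.size, size=5)] += 5.0     # a few transient spikes
    frames.append(pd.DataFrame({
        "timestamp": pd.Timestamp("2024-01-01") + pd.to_timedelta(t, unit="s"),
        "channel_id": ch,
        "value": signal,
    }))

pd.concat(frames).to_csv("data/sample_timeseries.csv", index=False)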
# Install dependencies locally (for SDK)
pip install -e .
# Run example workflow
python examples/basic_workflow.py
This example:
- Uploads the sample dataset
- Creates a workflow with filter → FFT operations
- Executes the workflow
- Retrieves and displays results
from sdk.client import TimeSeriesClient
# Initialize client
client = TimeSeriesClient("http://localhost:8000")
# Upload dataset
dataset_id = client.upload_dataset(
    "data/sample_timeseries.csv",
    name="My Data"
)
# Create workflow
workflow_id = client.create_workflow("Signal Processing")
# Add operations
filter_node = client.add_operation(
    workflow_id,
    operation_type="filter",
    config={
        "filter_type": "lowpass",
        "cutoff": 50,
        "order": 4
    },
    dataset_id=dataset_id
)
fft_node = client.add_operation(
    workflow_id,
    operation_type="fft",
    config={"normalize": True}
)
# Connect operations
client.connect_nodes(workflow_id, filter_node, fft_node)
# Execute workflow
result = client.execute_workflow(workflow_id)
# Get results
fft_output = client.get_node_output(fft_node)
plot_data = client.get_node_plot(fft_node, channel_id=0)
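As a follow-up, the plot data can also be rendered locally. This assumes get_node_plot returns x/y arrays (check the actual response shape at http://localhost:8000/docs) and that matplotlib is installed:
import matplotlib.pyplot as plt

# Assumed shape: {"x": [...frequencies...], "y": [...magnitudes...]}
plt.plot(plot_data["x"], plot_data["y"])
plt.xlabel("Frequency (Hz)")
plt.ylabel("Magnitude")
plt.title("FFT of channel 0 (filtered)")
plt.show()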
- Open http://localhost:3000
- Datasets Tab: Upload CSV files
- Workflows Tab: Create and execute workflows
- Visualization Tab: View DAG and plots
Access the interactive API documentation at http://localhost:8000/docs
Example API calls:
# Upload dataset
curl -X POST "http://localhost:8000/api/datasets/upload" \
  -F "file=@data/sample_timeseries.csv" \
  -F "name=Test Data"
# Create workflow
curl -X POST "http://localhost:8000/api/workflows/" \
  -H "Content-Type: application/json" \
  -d '{"name": "My Workflow"}'
# Get workflow DAG
curl "http://localhost:8000/api/workflows/{workflow_id}/dag"
Converts time-domain signals to frequency domain.
{
  "operation_type": "fft",
  "config": {
    "window": "hann",     # Optional: hann, hamming, blackman
    "normalize": true     # Normalize magnitude
  }
}
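For intuition, windowing and normalization along these lines is what the fft operation computes conceptually. This numpy sketch is illustrative only; in particular, the exact normalization the platform applies is an assumption:
import numpy as np

def fft_magnitude(values, sample_rate, window="hann", normalize=True):
    windows = {"hann": np.hanning, "hamming": np.hamming, "blackman": np.blackman}
    win = windows[window](len(values))
    spectrum = np.fft.rfft(values * win)                 # one-sided FFT of the windowed signal
    freqs = np.fft.rfftfreq(len(values), d=1.0 / sample_rate)
    magnitude = np.abs(spectrum)
    if normalize:
        magnitude = magnitude / magnitude.max()          # assumed meaning: scale peak to 1.0
    return freqs, magnitude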
Apply digital filters to time series.
{
  "operation_type": "filter",
  "config": {
    "filter_type": "lowpass",   # lowpass, highpass, bandpass
    "cutoff": 50,               # Cutoff frequency (Hz)
    "order": 4                  # Filter order
  }
}
For bandpass filters:
{
  "filter_type": "bandpass",
  "cutoff": [10, 100],   # [low, high] frequencies
  "order": 4
}
Convert units of time series values.
{
  "operation_type": "unit_conversion",
  "config": {
    "conversion": "celsius_to_fahrenheit"
    # Supported: celsius_to_fahrenheit, meters_to_feet,
    # mps_to_mph, pa_to_psi, mv_to_v, scale, offset, etc.
  }
}
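These configs are passed as the config argument of add_operation in the Python SDK (see the example above). A sketch chaining a bandpass filter into a unit conversion, using only config keys documented in this section:
from sdk.client import TimeSeriesClient

client = TimeSeriesClient("http://localhost:8000")
workflow_id = client.create_workflow("Bandpass + Unit Conversion")

bandpass = client.add_operation(
    workflow_id,
    operation_type="filter",
    config={"filter_type": "bandpass", "cutoff": [10, 100], "order": 4},
    dataset_id=dataset_id,   # a dataset previously uploaded via upload_dataset
)
to_volts = client.add_operation(
    workflow_id,
    operation_type="unit_conversion",
    config={"conversion": "mv_to_v"},
)
client.connect_nodes(workflow_id, bandpass, to_volts)
result = client.execute_workflow(workflow_id)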
Extend the platform with custom Python functions using the Prefect API (see examples).
Input CSV files should have the following structure:
timestamp,channel_id,value
2024-01-01 00:00:00.000,0,1.234
2024-01-01 00:00:00.001,0,1.245
2024-01-01 00:00:00.002,0,1.256
...
- timestamp: ISO format timestamp
- channel_id: Integer channel identifier
- value: Numeric value
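A minimal pre-upload check (not part of the platform) that a file matches this layout:
import pandas as pd

df = pd.read_csv("data/sample_timeseries.csv", parse_dates=["timestamp"])
assert list(df.columns) == ["timestamp", "channel_id", "value"]
assert pd.api.types.is_datetime64_any_dtype(df["timestamp"])
assert pd.api.types.is_integer_dtype(df["channel_id"])
assert pd.api.types.is_numeric_dtype(df["value"])
print(f"OK: {len(df):,} rows, {df['channel_id'].nunique()} channels")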
.
├── src/
│   ├── api/              # FastAPI application
│   ├── data_layer/       # DuckDB and MinIO clients
│   ├── operations/       # Time series operations
│   ├── workflow/         # Prefect workflow executor
│   ├── models.py         # SQLAlchemy models
│   ├── database.py       # Database connection
│   └── config.py         # Configuration
├── sdk/                  # Python SDK
├── web/                  # Web client
├── db/                   # Database schemas
├── scripts/              # Utility scripts
├── examples/             # Example workflows
├── tests/                # Test suite
├── docker-compose.yml    # Container orchestration
└── README.md
# Install dev dependencies
pip install -e .[dev]
# Run tests
pytest tests/
# Run specific test
pytest tests/test_operations.py -v
- Create a new operation class in src/operations/:
import pandas as pd

from src.operations.base import Operation

class MyOperation(Operation):
    @property
    def operation_type(self) -> str:
        return "my_operation"

    def _validate_config(self):
        # Validate configuration; raise on missing or invalid keys
        pass

    def execute(self, data: pd.DataFrame) -> pd.DataFrame:
        # Transform the data; this toy example doubles every value
        transformed_data = data.copy()
        transformed_data["value"] = transformed_data["value"] * 2
        return transformed_data
- Register in src/operations/__init__.py:
OPERATIONS = {
    'my_operation': MyOperation,
    # ... existing operations
}
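Once registered, the operation should be usable from the SDK by its operation_type string, like the built-in ones. The empty config below is a placeholder assumption for whatever _validate_config expects:
node = client.add_operation(
    workflow_id,
    operation_type="my_operation",
    config={},               # placeholder; supply whatever _validate_config expects
    dataset_id=dataset_id,
)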
# Stop all services
docker-compose down
# Stop and remove volumes (deletes all data)
docker-compose down -v
# Check logs
docker-compose logs api
docker-compose logs postgres
docker-compose logs minio
# Restart specific service
docker-compose restart api
# Ensure PostgreSQL is healthy
docker-compose ps postgres
# Reinitialize database
docker-compose down -v
docker-compose up -d
If ports 8000, 3000, 5432, or 9000 are in use:
- Edit docker-compose.yml
- Change port mappings (e.g., 8001:8000)
- Update API_BASE in web/public/app.js
This is a POC focused on demonstrating features, not production performance:
- DuckDB runs in-process (single file)
- No connection pooling optimization
- Minimal caching
- Synchronous workflow execution
For production use, consider:
- Distributed DuckDB or ClickHouse
- Redis caching layer
- Async workflow execution
- Horizontal scaling with Kubernetes
MIT License - This is a proof of concept for demonstration purposes.
After testing the POC, consider:
- Authentication: Add user authentication and authorization
- Scheduling: Add scheduled workflow execution
- Monitoring: Add metrics and logging (Prometheus, Grafana)
- Scalability: Move to distributed architecture
- UI Enhancement: Build full-featured React application
- Data Validation: Add schema validation and data quality checks
- Versioning: Track dataset and operation versions
For issues or questions:
- Check logs: docker-compose logs -f
- Review API docs: http://localhost:8000/docs
- Inspect database: Connect to PostgreSQL on port 5432
Built with FastAPI, DuckDB, Prefect, and ❤️