Merged
73 changes: 26 additions & 47 deletions README.md
@@ -61,71 +61,50 @@ The webserver provides an intuitive interface with four main pages:

## Archive Data

The database can be initialized with archive CML data using two methods:
On `docker compose up`, the `archive_generator` service automatically generates
a 1-day archive at 10-second resolution from the 3-month OpenMRG NetCDF file,
and the `archive_loader` service bulk-loads it into the database.

### Method 1: CSV Files (Default, Fast)

Pre-generated CSV files included in the repository:
**Defaults** (overridable via environment variables):
- **728 CML sublinks** (364 unique CML IDs) covering Berlin area
- **~1.5M data rows** at 5-minute intervals over 7 days
- **Gzip-compressed** (~7.6 MB total, included in repo)
- **Loads in ~3 seconds** via PostgreSQL COPY

Files are located in `/database/archive_data/` and loaded automatically on first database startup.

### Method 2: Load from NetCDF (For Larger/Higher Resolution Archives)
- **~6.3M data rows** at 10-second intervals over 1 day
- Generates in ~15 s, loads in ~15 s

Load data directly from the full 3-month NetCDF archive with configurable time range:
**NetCDF source file** (`openMRG_cmls_20150827_3months.nc`, ~193 MB) is
gitignored. If not present in `parser/example_data/`, it is downloaded
automatically at startup via `NETCDF_FILE_URL`.

#### Default: 7 Days at 10-Second Resolution (~44M rows, ~5 minutes)
### Configuring the archive

```sh
# Rebuild parser if needed
docker compose build parser

# Start database
docker compose up -d database

# Load last 7 days from NetCDF
docker compose run --rm -e DB_HOST=database parser python /app/parser/parse_netcdf_archive.py
# Longer archive or different resolution via environment variables:
ARCHIVE_DAYS=7 ARCHIVE_INTERVAL_SECONDS=60 docker compose up -d
```

#### Custom Time Range
| Variable | Default | Description |
|---|---|---|
| `ARCHIVE_DAYS` | `1` | Days of history to generate |
| `ARCHIVE_INTERVAL_SECONDS` | `10` | Time step in seconds |
| `NETCDF_FILE_URL` | KIT download link | URL to fetch the NetCDF file if absent |

Use `ARCHIVE_MAX_DAYS` to control how much data to load:
### Reloading archive data

```sh
# Load last 14 days (~88M rows, ~10 minutes)
docker compose run --rm -e DB_HOST=database -e ARCHIVE_MAX_DAYS=14 parser python /app/parser/parse_netcdf_archive.py

# Load full 3 months (~579M rows, ~1 hour)
docker compose run --rm -e DB_HOST=database -e ARCHIVE_MAX_DAYS=0 parser python /app/parser/parse_netcdf_archive.py
docker compose down -v # Remove volumes
docker compose up -d # Regenerate and reload from scratch
```

**Note**: Set `ARCHIVE_MAX_DAYS=0` to disable the time limit and load the entire dataset. Larger datasets require more database memory (at least 4 GB of RAM is recommended for the full 3-month archive).

**Features**:
- Auto-downloads 3-month NetCDF file (~209 MB) on first run
- **10-second resolution** (vs 5-minute for CSV method)
- **Automatic timestamp shifting** - data ends at current time
- **Progress reporting** with batch-by-batch status (~155K rows/sec)
- PostgreSQL COPY for maximum performance
- Configurable time window to balance demo realism vs load time

The NetCDF file is downloaded to `parser/example_data/openMRG_cmls_20150827_3months.nc` and gitignored.
### Loading a larger archive directly from NetCDF

### Managing Archive Data
For a full 3-month archive at native 10-second resolution (~579M rows):

To regenerate CSV archive data:
```sh
python mno_data_source_simulator/generate_archive.py
docker compose run --rm -e DB_HOST=database parser \
python /app/parser/parse_netcdf_archive.py
```

To reload archive data (either method):
```sh
docker compose down -v # Remove volumes
docker compose up -d # Restart with fresh database
```
Use `ARCHIVE_MAX_DAYS` to limit the time window (default: 7 days,
`0` = no limit). The full 3-month dataset requires at least 4 GB of RAM.
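The `ARCHIVE_MAX_DAYS` windowing can be sketched as follows (an illustrative helper, not the parser's actual code — the real loader applies the cutoff while streaming from NetCDF):

```python
from datetime import datetime, timedelta

def limit_time_window(times: list[datetime], max_days: int) -> list[datetime]:
    """Keep only the last max_days of timestamps; 0 disables the limit.

    The cutoff is measured back from the newest timestamp in the data,
    so the retained window always ends at the end of the archive.
    """
    if max_days <= 0:
        return times  # no limit: load the entire dataset
    cutoff = max(times) - timedelta(days=max_days)
    return [t for t in times if t >= cutoff]
```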

## Storage Backend

11 changes: 7 additions & 4 deletions docker-compose.yml
@@ -1,24 +1,27 @@
services:
archive_generator:
build: ./mno_data_source_simulator
command: python generate_archive.py --days ${ARCHIVE_DAYS:-30} --interval-seconds ${ARCHIVE_INTERVAL_SECONDS:-300}
command: python generate_archive.py --days ${ARCHIVE_DAYS:-1} --interval-seconds ${ARCHIVE_INTERVAL_SECONDS:-10}
environment:
- ARCHIVE_OUTPUT_DIR=/archive_output
- NETCDF_FILE=/app/example_data/openMRG_cmls_20150827_12hours.nc
- NETCDF_FILE=/app/example_data/openMRG_cmls_20150827_3months.nc
- NETCDF_FILE_URL=https://bwsyncandshare.kit.edu/s/jSAFftGXcJjQbSJ/download
volumes:
- ./parser/example_data:/app/example_data:ro
- ./parser/example_data:/app/example_data # writable so the file can be downloaded
- archive_data:/archive_output

mno_simulator:
build: ./mno_data_source_simulator
depends_on:
- sftp_receiver
volumes:
- ./parser/example_data:/app/example_data:ro
- ./parser/example_data:/app/example_data # writable so the file can be downloaded
- mno_data_to_upload:/app/data_to_upload
- mno_data_uploaded:/app/data_uploaded
- ./ssh_keys:/app/ssh_keys:ro
environment:
- NETCDF_FILE=/app/example_data/openMRG_cmls_20150827_3months.nc
- NETCDF_FILE_URL=https://bwsyncandshare.kit.edu/s/jSAFftGXcJjQbSJ/download
- SFTP_HOST=sftp_receiver
- SFTP_PORT=22
- SFTP_USERNAME=cml_user
17 changes: 13 additions & 4 deletions mno_data_source_simulator/README.md
@@ -39,21 +39,30 @@ python main.py

## Configuration

Edit `config.yml`:
Edit `config.yml` for local/standalone use, or set environment variables for Docker deployment:

```yaml
data_source:
loop_duration_seconds: 3600 # How fast to replay historical data
netcdf_file: "/app/example_data/openMRG_cmls_20150827_3months.nc"
loop_duration_seconds: 3600 # Lookback window for real-time replay

generator:
generation_frequency_seconds: 60 # How often to generate files
generation_frequency_seconds: 10 # How often to generate files

sftp:
enabled: true
upload_frequency_seconds: 60 # How often to upload
upload_frequency_seconds: 10 # How often to upload
private_key_path: "/path/to/ssh/key" # Recommended
known_hosts_path: "/path/to/known_hosts" # For host verification
```

**Key environment variables** (override config.yml in Docker):

| Variable | Description |
|---|---|
| `NETCDF_FILE` | Path to the NetCDF source file |
| `NETCDF_FILE_URL` | URL to download the file if not already present |

### Authentication

**SSH Key Authentication (Recommended):**
7 changes: 5 additions & 2 deletions mno_data_source_simulator/config.yml
@@ -2,8 +2,11 @@

# Data source configuration
data_source:
# Path to the NetCDF file with CML data
netcdf_file: "/app/example_data/openMRG_cmls_20150827_12hours.nc"
# Path to the NetCDF file with CML data.
# The 3-month / 10-second-resolution file is used for production; it is
# downloaded automatically at startup if NETCDF_FILE_URL is set.
# Override via the NETCDF_FILE environment variable.
netcdf_file: "/app/example_data/openMRG_cmls_20150827_3months.nc"
# Loop duration in seconds (1 hour = 3600 seconds)
loop_duration_seconds: 3600

52 changes: 51 additions & 1 deletion mno_data_source_simulator/data_generator.py
@@ -5,6 +5,8 @@
by altering timestamps and looping through the existing data.
"""

import urllib.request
import urllib.error
import xarray as xr
import pandas as pd
import numpy as np
@@ -15,6 +17,49 @@
logger = logging.getLogger(__name__)


def ensure_netcdf_file(path: Path, url: str | None) -> None:
"""Download the NetCDF file from *url* if *path* does not exist yet.

Downloads via a temp file so an interrupted transfer never leaves a
truncated file behind. Does nothing if the file already exists or if no
URL is provided.
"""
if path.exists():
logger.info(f"NetCDF file found: {path}")
return
if not url:
return # caller's existence check will log the error
logger.info(f"NetCDF file not found at {path}")
logger.info(f"Downloading from: {url}")
path.parent.mkdir(parents=True, exist_ok=True)
tmp_path = path.with_suffix(".nc.download")
try:
with urllib.request.urlopen(url) as response, open(tmp_path, "wb") as out:
total_raw = response.headers.get("Content-Length")
total = int(total_raw) if total_raw else None
downloaded = 0
block_size = 8 * 1024 * 1024 # 8 MB chunks
while True:
block = response.read(block_size)
if not block:
break
out.write(block)
downloaded += len(block)
if total:
pct = downloaded / total * 100
logger.info(
f" {pct:.0f}% ({downloaded / 1e6:.0f} / {total / 1e6:.0f} MB)"
)
else:
logger.info(f" {downloaded / 1e6:.0f} MB downloaded")
tmp_path.rename(path)
logger.info(f"Download complete: {path} ({path.stat().st_size / 1e6:.1f} MB)")
except Exception as exc:
tmp_path.unlink(missing_ok=True)
logger.error(f"Download failed: {exc}")
raise


class CMLDataGenerator:
"""Generate fake real-time CML data from historical NetCDF files."""

@@ -100,7 +145,12 @@ def _get_netcdf_index_for_timestamp(self, timestamp: pd.Timestamp) -> int:
).total_seconds()

if original_duration > 0:
time_fraction = loop_position / self.loop_duration_seconds
# Cycle through the source data at its native pace rather than
# stretching/compressing it to fill loop_duration_seconds. This
# avoids long plateaus of identical values followed by sudden jumps
# when the archive period is much longer than the source file.
position_in_original = loop_position % original_duration
time_fraction = position_in_original / original_duration
original_index = int(time_fraction * (len(self.original_time_points) - 1))
else:
original_index = 0
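The wrap-around mapping introduced above can be sketched in isolation (the function name and arguments are illustrative, not the class's actual interface):

```python
def netcdf_index_for(loop_position: float, original_duration: float,
                     n_points: int) -> int:
    """Map a position in the replay loop to an index into the source data.

    Positions past the end of the source wrap around (modulo), so the
    source replays at its native pace instead of being stretched to fill
    the loop duration, avoiding long plateaus of repeated values.
    """
    if original_duration <= 0:
        return 0
    position_in_original = loop_position % original_duration
    time_fraction = position_in_original / original_duration
    return int(time_fraction * (n_points - 1))
```

For example, with a 1-hour source file (`original_duration=3600`) and 361 time points, a loop position of 5400 s wraps to 1800 s and maps to index 180.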