Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ There are three types of data available via ORD.
5. Public Meteogate API available via [meteo gateway](https://api.meteogate.eu/ord/edr)
6. [ORD S3 24h cache](https://s3.waw3-1.cloudferro.com/openradar-24h/)
7. [ORD S3 Archive](https://s3.waw3-1.cloudferro.com/openradar-archive/) TBD
8. [Python real-time download script example](examples/ord_dl_example/)



## Contacts
Expand Down
35 changes: 35 additions & 0 deletions examples/ord_dl_example/ORD_Download.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# Open Radar Data Download Example

This script connects to the MQTT_BROKER and subscribes to the TOPIC. Then it downloads ODIM files from the S3 bucket.

## Installation
1. Create python virtual environment and activate

```bash
python3 -m venv /path_to/ord-venv
source /path_to/ord-venv/bin/activate
```

2. Install requirements
```bash
pip install --upgrade pip
pip install -r ./requirements.txt
```

3. Set environment variables (optional)

| Name | Description | Default value |
|---------------------|------------------------------------|-----------------------|
| ODIM_DL_DIR | Download directory for ODIM files | ./odim_files |
| MQTT_BROKER | MQTT Broker address | radar.meteogate.eu |
| MQTT_PORT | MQTT Port | 1883 |
| S3_BUCKET_NAME | S3 bucket name | openradar-24h |
| S3_ENDPOINT_URL | S3 endpoint url | https://s3.waw3-1.cloudferro.com/ |
| ORD_TOPIC | Topic to subscribe | # |


## Usage
```bash
[ord-venv] python3 ord_dl_example.py
```

162 changes: 162 additions & 0 deletions examples/ord_dl_example/ord_dl_example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
#
# Open Radar Data Example
#
# istvans@met.no
#
# This script connects to the MQTT_BROKER and subscribes to the TOPIC
# Then it downloads ODIM files when
# the link starts with S3_ENDPOINT_URL + S3_BUCKET_NAME.


import boto3
import json
import os
import paho.mqtt.client as mqtt
import time
from botocore import UNSIGNED
from botocore.client import Config

cnt_ok = 0
cnt_fail = 0

# Check old files after dl_max downloads
# Don't delete old files: dl_max = 0
dl_max = 300
# Delete files older than dl_min minutes
dl_min = 2
dl_count = 0
dl_dir = os.getenv("ODIM_DL_DIR", "./odim_files")
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe dl_dir is a constant value, right? According to variable naming conventions, it would make sense to make the name all caps.

os.makedirs(dl_dir, exist_ok=True)

# ########################## ENV VALUES ####################################

MQTT_BROKER = os.getenv("MQTT_BROKER", "radar.meteogate.eu")
MQTT_PORT = int(os.getenv("MQTT_PORT", "1883"))

# Examples: eu.eumetnet no.met nl.knmi
TOPIC = os.getenv("ORD_TOPIC","#") # all
S3_BUCKET_NAME = os.getenv("S3_BUCKET_NAME", "openradar-24h")
S3_ENDPOINT_URL = os.getenv("S3_ENDPOINT_URL", "https://s3.waw3-1.cloudferro.com/")

# ########################## S3 BUCKET #####################################

if S3_ENDPOINT_URL[-1] != "/":
S3_ENDPOINT_URL += "/"
print(S3_ENDPOINT_URL)
s3_url = S3_ENDPOINT_URL + S3_BUCKET_NAME
s3_url_len = len(s3_url)

s3_client = boto3.client(
"s3",
endpoint_url=S3_ENDPOINT_URL,
config=Config(signature_version=UNSIGNED)
)

prefix = ""
# Check S3 bucket
try:
response = s3_client.list_objects_v2(Bucket=S3_BUCKET_NAME, Prefix=prefix)
# for obj in response.get('Contents', []):
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you remove the commented out lines of code? I think that as an example it needs to be as clean as possible.

# print(f"Object Key: {obj['Key']}")

except Exception as e:
print(f"Error listing objects: {e}")
exit(1)


# ####################### DELETE OLD FILES #################################

def delete_old_files(directory, mins):
    """Delete regular files in *directory* last modified over *mins* minutes ago.

    Only direct children of *directory* are considered; sub-directories are
    left untouched. A file that cannot be inspected or removed (for example
    because it disappeared after the directory was listed) is reported on
    stdout and skipped, so one bad entry never aborts the clean-up.

    Args:
        directory: Path of the directory to clean up.
        mins: Age threshold in minutes, compared with each file's
            modification time.

    Returns:
        The number of files that were deleted.
    """
    del_cnt = 0
    cur_time = time.time()
    cutoff_time = cur_time - mins * 60

    print("Deleting files, at {0}".format(time.ctime(cur_time)))

    for filename in os.listdir(directory):
        file_path = os.path.join(directory, filename)

        # Skip anything that is not a regular file (e.g. sub-directories)
        if not os.path.isfile(file_path):
            continue

        try:
            # The stat call is inside the try as well: the file may vanish
            # between os.listdir() and here.
            if os.path.getmtime(file_path) < cutoff_time:
                os.remove(file_path)
                del_cnt += 1
        except OSError as e:
            print(f"Error deleting file {file_path}: {e}")

    print(f"Deleted {del_cnt} files.")
    return del_cnt


# ######################## MQTT BROKER ####################################

# Define the callback when the client connects to the broker
def on_connect(client, userdata, flags, rc):
    """Handle the broker's connection result and subscribe on success.

    A return code of 0 means the connection was accepted; any other value
    is reported and no subscription is made.
    """
    if rc != 0:
        print(f"Failed to connect, return code {rc}")
        return

    print("Connected to the broker!")
    # TOPIC selects which messages are received; replace with your topic
    client.subscribe(TOPIC)


# Define the callback when a message is received on the subscribed topic
def on_message(client, userdata, msg):
global dl_count
global dl_max
global dl_min
global s3_client
global cnt_ok
global cnt_fail
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The global keyword is typically considered bad practice, even though I do see why you used it here.

I would, however, remove the entire delete_old_files logic and leave that up to the user. Again, since this is supposed to be an example, I would make it as concise as possible.

If you delete the entire delete_old_files logic, you would not need to have these global keywords here anymore, as you would not need to be keeping a state any longer.


ord_msg = json.loads(msg.payload.decode())
for link in ord_msg["links"]:
if link["href"][:s3_url_len] == s3_url:
# print("URL: {0}".format(link["href"]))
dl_file = link["href"]
delim = "/"
last_delim = dl_file.rfind(delim)
ingest_file = dl_dir + "/" + dl_file[last_delim+1:]

if os.path.exists(ingest_file):
# print(f"File already downloaded, skip: {ingest_file}")
break

dl_key = dl_file[s3_url_len+1:]

try:
print("Downloading: {0}".format(dl_key), end="")
s3_client.download_file(S3_BUCKET_NAME, dl_key, ingest_file)
print("\tOK")
dl_count += 1

except Exception as e:
print(f"Error downloading file: {e}")

if dl_max > 0 and dl_count >= dl_max:
dl_count = 0
# delete old files, if older 2 minutes
delete_old_files(dl_dir, dl_min)
break


# Create an MQTT client instance.
# paho-mqtt >= 2.0 requires an explicit callback API version as the first
# argument; VERSION1 keeps the (client, userdata, flags, rc) on_connect
# signature used by this script. paho-mqtt 1.x has no CallbackAPIVersion
# and its constructor is called without it.
callback_api_version = getattr(mqtt, "CallbackAPIVersion", None)
if callback_api_version is None:
    client = mqtt.Client()
else:
    client = mqtt.Client(callback_api_version.VERSION1)

# Attach the callbacks
client.on_connect = on_connect
client.on_message = on_message

# Connect to the MQTT broker
broker = MQTT_BROKER
port = MQTT_PORT

client.connect(broker, port)

# Start the network loop to process incoming and outgoing messages;
# this call blocks until the client disconnects.
client.loop_forever()
3 changes: 3 additions & 0 deletions examples/ord_dl_example/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
boto3
paho-mqtt