-
Notifications
You must be signed in to change notification settings - Fork 11
Expand file tree
/
Copy pathfirst_example.py
More file actions
122 lines (100 loc) · 3.64 KB
/
first_example.py
File metadata and controls
122 lines (100 loc) · 3.64 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.
import os
import sys
import ctypes
from dataclasses import dataclass
JRTC_APP_PATH = os.environ.get("JRTC_APP_PATH")
if JRTC_APP_PATH is None:
raise ValueError("JRTC_APP_PATH not set")
sys.path.append(f"{JRTC_APP_PATH}")
import jrtc_app
from jrtc_app import *
generated_data = sys.modules.get("generated_data")
simple_input = sys.modules.get("simple_input")
from simple_input import simple_input
from jrtc_bindings import (
struct_jrtc_router_data_entry,
)
##########################################################################
# Define the state variables for the application
@dataclass
class AppStateVars:
app: JrtcApp
agg_cnt: int
received_counter: int
##########################################################################
def app_handler(
timeout: bool,
stream_idx: int,
data_entry: struct_jrtc_router_data_entry,
state: AppStateVars,
):
GENERATOR_OUT_STREAM_IDX = 0
SIMPLE_INPUT_IN_STREAM_IDX = 1
if timeout:
# Timeout processing (not implemented in this example)
return
if stream_idx == GENERATOR_OUT_STREAM_IDX:
state.received_counter += 1
data_ptr = ctypes.cast(data_entry.data, ctypes.POINTER(ctypes.c_char))
raw_data = ctypes.string_at(data_ptr, ctypes.sizeof(ctypes.c_char) * 4)
value = int.from_bytes(raw_data, byteorder="little")
state.agg_cnt += value # For now, just increment as an example
if state.received_counter % 5 == 0 and state.received_counter > 0:
aggregate_counter = simple_input()
aggregate_counter.aggregate_counter = state.agg_cnt
data_to_send = bytes(aggregate_counter)
# send the data
res = jrtc_app_router_channel_send_input_msg(
state.app, SIMPLE_INPUT_IN_STREAM_IDX, data_to_send, len(data_to_send)
)
assert res == 0, "Failed to send aggregate counter to input map"
print(
f"FirstExample: Aggregate counter so far is: {state.agg_cnt}",
flush=True,
)
##########################################################################
# Main function to start the app (converted from jrtc_start_app)
def jrtc_start_app(capsule):
print("Starting FirstExample app...", flush=True)
streams = [
# GENERATOR_OUT_STREAM_IDX
JrtcStreamCfg_t(
JrtcStreamIdCfg_t(
JRTC_ROUTER_REQ_DEST_ANY,
JRTC_ROUTER_REQ_DEVICE_ID_ANY,
b"FirstExample://jbpf_agent/data_generator_codeletset/codelet",
b"ringbuf",
),
True, # is_rx
None, # No AppChannelCfg
),
# SIMPLE_INPUT_IN_STREAM_IDX
JrtcStreamCfg_t(
JrtcStreamIdCfg_t(
JRTC_ROUTER_REQ_DEST_NONE,
1,
b"FirstExample://jbpf_agent/simple_input_codeletset/codelet",
b"input_map",
),
False, # is_rx
None, # No AppChannelCfg
),
]
app_cfg = JrtcAppCfg_t(
b"FirstExample", # context
100, # q_size
len(streams), # num_streams
(JrtcStreamCfg_t * len(streams))(*streams), # streams
10.0, # initialization_timeout_secs
0.1, # sleep_timeout_secs
1.0, # inactivity_timeout_secs
)
# Initialize the app
state = AppStateVars(agg_cnt=0, received_counter=0, app=None)
state.app = jrtc_app_create(capsule, app_cfg, app_handler, state, log_level="INFO")
# run the app - This is blocking until the app exists
jrtc_app_run(state.app)
# clean up app resources
jrtc_app_destroy(state.app)