Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions src/controller/jrtc.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,16 @@ using atomic_bool = std::atomic<bool>;
*/
#define MAX_APP_NAME_SIZE 16
#define MAX_APP_PARAMS 255
#define MAX_DEVICE_MAPPING 255
#define MAX_APP_MODULES 255

typedef pthread_t app_id_t;

typedef struct _app_param_key_value_pair
typedef struct _key_value_pair
{
char* key;
char* val;
} app_param_key_value_pair_t;
} key_value_pair_t;

/**
* @brief The jrtc_app_env struct
Expand Down Expand Up @@ -60,7 +61,8 @@ struct jrtc_app_env
atomic_bool app_exit;
jrtc_sched_config_t sched_config;
char* app_path;
app_param_key_value_pair_t params[MAX_APP_PARAMS];
key_value_pair_t params[MAX_APP_PARAMS];
key_value_pair_t device_mapping[MAX_DEVICE_MAPPING];
char* app_modules[MAX_APP_MODULES];
void* shared_python_state; // Pointer to shared Python state for multi-threaded apps
};
Expand Down
76 changes: 60 additions & 16 deletions src/controller/jrtc_int.c
Original file line number Diff line number Diff line change
Expand Up @@ -239,29 +239,58 @@ _jrtc_release_app_id(int app_id)
}

bool
_is_app_loaded(load_app_request_t load_req)
_is_app_loaded(load_app_request_t* load_req)
{
if (load_req.app_type == NULL) {
jrtc_logger(JRTC_INFO, "App type is NULL for app %s\n", load_req.app_name);
if ((load_req == NULL) || (load_req->app_type == NULL)) {
jrtc_logger(JRTC_INFO, "App type is NULL for app %s\n", load_req->app_name);
return false;
}
// if it is python app type
if ((load_req.app_type != NULL) && (strcmp(load_req.app_type, "python") == 0)) {
if ((load_req->app_type != NULL) && (strcmp(load_req->app_type, "python") == 0)) {
jrtc_logger(JRTC_DEBUG, "Checking if Python app %s is already loaded\n", load_req->app_name);
// check the app_envs[].params[0]
for (int i = 0; i < MAX_NUM_JRTC_APPS; i++) {
if (app_envs[i] != NULL && app_envs[i]->params[0].key != NULL &&
strcmp(app_envs[i]->params[0].key, load_req.params[0].key) == 0 && app_envs[i]->params[0].val != NULL &&
strcmp(app_envs[i]->params[0].val, load_req.params[0].val) == 0) {
return true;
if (load_req->params[0].key != NULL && load_req->params[0].val != NULL) {
for (int i = 0; i < MAX_NUM_JRTC_APPS; i++) {
if (app_envs[i] != NULL && app_envs[i]->params[0].key != NULL &&
strcmp(app_envs[i]->params[0].key, load_req->params[0].key) == 0 &&
app_envs[i]->params[0].val != NULL &&
strcmp(app_envs[i]->params[0].val, load_req->params[0].val) == 0) {
jrtc_logger(
JRTC_INFO,
"Python App %s with params %s:%s is already loaded\n",
load_req->app_name,
load_req->params[0].key,
load_req->params[0].val);
return true;
}
}
} else {
jrtc_logger(
JRTC_WARN,
"Python App %s has no params set, cannot check if it is already loaded\n",
load_req->app_name);
}
} else {
// check if the app loaded by app_path
for (int i = 0; i < MAX_NUM_JRTC_APPS; i++) {
if ((app_envs[i] != NULL) && (app_envs[i]->app_path != NULL) &&
(strcmp(app_envs[i]->app_path, load_req.app_path) == 0)) {
return true;
jrtc_logger(
JRTC_DEBUG,
"Checking if app %s with app_path %s is already loaded\n",
load_req->app_name,
load_req->app_path);
if (load_req->app_path != NULL) {
for (int i = 0; i < MAX_NUM_JRTC_APPS; i++) {
if (app_envs[i] != NULL && app_envs[i]->app_path != NULL &&
strcmp(app_envs[i]->app_path, load_req->app_path) == 0) {
jrtc_logger(
JRTC_INFO,
"App %s with app_path %s is already loaded\n",
load_req->app_name,
load_req->app_path);
return true;
}
}
} else {
jrtc_logger(
JRTC_WARN, "App %s has no app_path set, cannot check if it is already loaded\n", load_req->app_name);
}
}
return false;
Expand All @@ -280,7 +309,7 @@ load_app(load_app_request_t load_req)

// check if the app is already loaded
jrtc_logger(JRTC_DEBUG, "Checking if app %s is already loaded\n", load_req.app_name);
if (_is_app_loaded(load_req)) {
if (_is_app_loaded(&load_req)) {
jrtc_logger(
JRTC_WARN,
"App %s: %s is already loaded\n",
Expand Down Expand Up @@ -309,8 +338,10 @@ load_app(load_app_request_t load_req)

app_env->app_handle = app_handle;
app_env->app_exit = false;
app_env->app_path = strdup(load_req.app_path ? load_req.app_path : "unknown");
app_env->io_queue_size = load_req.ioq_size;
memset(app_env->params, 0, sizeof(app_env->params));
memset(app_env->device_mapping, 0, sizeof(app_env->device_mapping));

for (int i = 0; i < MAX_APP_PARAMS; i++) {
if (load_req.params[i].key != NULL) {
Expand All @@ -321,6 +352,15 @@ load_app(load_app_request_t load_req)
}
}

for (int i = 0; i < MAX_DEVICE_MAPPING; i++) {
if (load_req.device_mapping[i].key != NULL) {
app_env->device_mapping[i].key = strdup(load_req.device_mapping[i].key);
}
if (load_req.device_mapping[i].val != NULL) {
app_env->device_mapping[i].val = strdup(load_req.device_mapping[i].val);
}
}

for (int i = 0; i < MAX_APP_MODULES; i++) {
if (load_req.app_modules[i] != NULL) {
app_env->app_modules[i] = strdup(load_req.app_modules[i]);
Expand Down Expand Up @@ -377,6 +417,7 @@ unload_app(int app_id)
jrtc_router_deregister_app(env->dapp_ctx);
jrtc_logger(JRTC_INFO, "App %s shut down\n", env->app_name);
free(env->app_name);
free(env->app_path);
_jrtc_release_app_id(app_id);
return 0;
}
Expand Down Expand Up @@ -407,14 +448,16 @@ load_default_north_io_app()
return -1;
}

load_app_request_t load_req_north_io;
load_app_request_t load_req_north_io = {0};
load_req_north_io.app = north_io_data;
load_req_north_io.app_size = north_io_size;
load_req_north_io.ioq_size = 1000;

load_req_north_io.deadline_us = 0;
load_req_north_io.app_name = strdup("north_io_app");
load_req_north_io.app_path = strdup(north_io_app_name);
memset(load_req_north_io.params, 0, sizeof(load_req_north_io.params));
memset(load_req_north_io.device_mapping, 0, sizeof(load_req_north_io.device_mapping));
memset(load_req_north_io.app_modules, 0, sizeof(load_req_north_io.app_modules));

int res = load_app(load_req_north_io);
Expand All @@ -425,6 +468,7 @@ load_default_north_io_app()
}

free(load_req_north_io.app_name);
free(load_req_north_io.app_path);
free(north_io_app_name);
free(north_io_data);
return res;
Expand Down
8 changes: 8 additions & 0 deletions src/pythonapp_loader/jrtc_pythonapp_loader.c
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,14 @@ jrtc_start_app(void* args)
char* full_path = env_ctx->params[0].val;
printf_and_flush("Python App Full path: %s\n", full_path);

for (int i = 0; i < MAX_DEVICE_MAPPING; i++) {
if (env_ctx->device_mapping[i].key == NULL) {
break;
}
printf_and_flush(
"Device Mapping %d: %s = %s\n", i, env_ctx->device_mapping[i].key, env_ctx->device_mapping[i].val);
}

// Initialize Python interpreter once and create a new sub-interpreter per thread
pthread_mutex_lock(&shared_python_state->python_lock);
if (!shared_python_state->initialized) {
Expand Down
3 changes: 3 additions & 0 deletions src/rest_api_lib/openapi.json
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,9 @@
"app_params": {
"type": "object"
},
"device_mapping": {
"type": "object"
},
"app_modules": {
"type": "array",
"items": {
Expand Down
3 changes: 2 additions & 1 deletion src/rest_api_lib/src/jrtc_rest_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ typedef struct load_app_request
uint32_t ioq_size;
char* app_path;
char* app_type;
app_param_key_value_pair_t params[MAX_APP_PARAMS];
key_value_pair_t params[MAX_APP_PARAMS];
key_value_pair_t device_mapping[MAX_DEVICE_MAPPING];
char* app_modules[MAX_APP_MODULES];
} load_app_request_t;

Expand Down
23 changes: 19 additions & 4 deletions src/rest_api_lib/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use utoipa_swagger_ui::SwaggerUi;
use std::collections::HashMap;

#[repr(C)]
pub struct AppParamKeyValuePair {
pub struct KeyValuePair {
pub key: *mut i8,
pub val: *mut i8,
}
Expand All @@ -41,7 +41,8 @@ pub struct LoadAppRequest {
pub ioq_size: u32,
pub app_path: *mut c_char,
pub app_type: *mut c_char,
pub app_params: [AppParamKeyValuePair; 255], // Fixed-size array
pub app_params: [KeyValuePair; 255], // Fixed-size array
pub device_mapping: [KeyValuePair; 255], // Fixed-size array
pub app_modules: [*mut c_char; 255], // Fixed-size array
}

Expand Down Expand Up @@ -73,6 +74,7 @@ struct JrtcAppLoadRequest {
app_path: String,
app_type: String,
app_params: HashMap<String, String>,
device_mapping: HashMap<String, String>,
app_modules: Vec<String>,
}

Expand Down Expand Up @@ -184,6 +186,7 @@ async fn load_app(
let app_path = payload_cloned.app_path;
let app_type = payload_cloned.app_type;
let app_params = payload_cloned.app_params;
let device_mapping = payload_cloned.device_mapping;
let app_modules = payload_cloned.app_modules;

let c_app_name = match CString::new(app_name.clone()) {
Expand Down Expand Up @@ -228,7 +231,8 @@ async fn load_app(
}
};

let mut c_app_params: [AppParamKeyValuePair; 255] = unsafe { std::mem::zeroed() }; // Initialize
let mut c_app_params: [KeyValuePair; 255] = unsafe { std::mem::zeroed() }; // Initialize
let mut c_device_mapping: [KeyValuePair; 255] = unsafe { std::mem::zeroed() }; // Initialize

for (i, (key, value)) in app_params.iter().enumerate() {
if i >= 255 {
Expand All @@ -237,7 +241,17 @@ async fn load_app(
let c_key = CString::new(key.clone()).unwrap().into_raw();
let c_val = CString::new(value.clone()).unwrap().into_raw();

c_app_params[i] = AppParamKeyValuePair { key: c_key, val: c_val };
c_app_params[i] = KeyValuePair { key: c_key, val: c_val };
}

for (i, (key, value)) in device_mapping.iter().enumerate() {
if i >= 255 {
break; // Prevent overflow
}
let c_key = CString::new(key.clone()).unwrap().into_raw();
let c_val = CString::new(value.clone()).unwrap().into_raw();

c_device_mapping[i] = KeyValuePair { key: c_key, val: c_val };
}

let mut c_app_modules: [*mut c_char; 255] = unsafe { std::mem::zeroed() }; // Initialize
Expand All @@ -260,6 +274,7 @@ async fn load_app(
app_path: c_app_path.into_raw(),
app_type: c_app_type.into_raw(),
app_params: c_app_params,
device_mapping: c_device_mapping,
app_modules: c_app_modules,
};

Expand Down
17 changes: 15 additions & 2 deletions src/wrapper_apis/python/jrtc_wrapper_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,16 @@ def register_dll(path):
raise ImportError(f"Failed to load {path} {e}")
return None

class KeyValuePair(ctypes.Structure):
_fields_ = [
("key", ctypes.c_char_p),
("value", ctypes.c_char_p),
]

MAX_APP_PARAMS = 255
MAX_DEVICE_MAPPING = 255
MAX_APP_MODULES = 255

class JrtcAppEnv(ctypes.Structure):
_fields_ = [
("app_name", ctypes.c_char_p), # Define as c_char_p
Expand All @@ -33,8 +43,11 @@ class JrtcAppEnv(ctypes.Structure):
("io_queue_size", ctypes.c_uint),
("app_exit", ctypes.c_int),
("sched_config", jrtc_bindings.struct_jrtc_sched_config),
("app_params",ctypes.c_char_p * 255),
("app_modules", ctypes.c_char_p * 255),
("app_path", ctypes.c_char_p),
("params", KeyValuePair * MAX_APP_PARAMS),
("device_mapping", KeyValuePair * MAX_DEVICE_MAPPING),
("app_modules", ctypes.c_char_p * MAX_APP_MODULES),
("shared_python_state", ctypes.c_void_p),
]

def get_ctx_from_capsule(capsule):
Expand Down
1 change: 1 addition & 0 deletions test_apps/first_example_py/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ app:
path: ${JRTC_PATH}/test_apps/first_example_py/first_example.py
type: python
port: 3001
deadline_us: 12345
modules:
- ${JRTC_PATH}/test_apps/jbpf_codelets/data_generator/generated_data.py
- ${JRTC_PATH}/test_apps/jbpf_codelets/simple_input/simple_input.py
Expand Down
23 changes: 23 additions & 0 deletions test_apps/first_example_py/first_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
raise ValueError("JRTC_APP_PATH not set")
sys.path.append(f"{JRTC_APP_PATH}")

from jrtc_wrapper_utils import get_ctx_from_capsule
import jrtc_app
from jrtc_app import *

Expand Down Expand Up @@ -76,6 +77,28 @@ def app_handler(
# Main function to start the app (converted from jrtc_start_app)
def jrtc_start_app(capsule):
print("Starting FirstExample app...", flush=True)
env_ctx = get_ctx_from_capsule(capsule)
if not env_ctx:
raise ValueError("Failed to retrieve JrtcAppEnv from capsule")

## Extract necessary fields from env_ctx and make assertions
app_name = env_ctx.app_name.decode("utf-8")
io_queue_size = env_ctx.io_queue_size
app_path = env_ctx.app_path.decode("utf-8")
app_modules = env_ctx.app_modules
app_params = env_ctx.params
device_mapping = env_ctx.device_mapping

## assertion
assert app_name == "app1", f"Unexpected app name: {app_name}"
assert io_queue_size == 1000, f"Unexpected IO queue size: {io_queue_size}"
assert app_path.split("/")[-1] == "libjrtc_pythonapp_loader.so", f"Unexpected app path: {app_path}"
assert app_modules[0].decode("utf-8").split('/')[-1] == "generated_data.py", f"Unexpected first module: {app_modules[0]}"
assert app_modules[1].decode("utf-8").split('/')[-1] == "simple_input.py", f"Unexpected second module: {app_modules[1]}"
assert app_params[0].key.decode("utf-8") == "python", f"Unexpected app parameter key: {app_params[0].key}"
assert app_params[0].value.decode("utf-8").split('/')[-1] == "first_example.py", f"Unexpected app parameter value: {app_params[0].value}"
assert device_mapping[0].key.decode("utf-8") == "1", f"Unexpected device mapping key: {device_mapping[0].key}"
assert device_mapping[0].value.decode("utf-8") == "127.0.0.1:8080", f"Unexpected device mapping value: {device_mapping[0].value}"

streams = [
# GENERATOR_OUT_STREAM_IDX
Expand Down
3 changes: 2 additions & 1 deletion tools/jrtc-ctl/cmd/app/load/load.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ type runOptions struct {
sharedLibraryPath string
appType string
appParams map[string]interface{}
deviceMapping map[string]interface{}
appModules []string
}

Expand Down Expand Up @@ -116,7 +117,7 @@ func run(cmd *cobra.Command, opts *runOptions) error {
return err
}

req, err := jrtc.NewJrtcAppLoadRequest(opts.sharedLibraryPath, opts.appName, opts.ioqSize, opts.deadline, opts.period, opts.runtime, opts.appType, &opts.appModules, &opts.appParams)
req, err := jrtc.NewJrtcAppLoadRequest(opts.sharedLibraryPath, opts.appName, opts.ioqSize, opts.deadline, opts.period, opts.runtime, opts.appType, &opts.appModules, &opts.appParams, &opts.deviceMapping)
if err != nil {
return err
}
Expand Down
Loading