diff --git a/Cargo.lock b/Cargo.lock index b521fd2..8b265a9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,20 +2,6 @@ # It is not intended for manual editing. version = 4 -[[package]] -name = "ahash" -version = "0.8.12" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5a15f179cd60c4584b8a8c596927aadc462e27f2ca70c04e0071964a73ba7a75" -dependencies = [ - "cfg-if", - "const-random", - "getrandom 0.3.4", - "once_cell", - "version_check", - "zerocopy", -] - [[package]] name = "aho-corasick" version = "1.1.3" @@ -211,7 +197,7 @@ version = "4.5.49" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2a0b5487afeab2deb2ff4e03a807ad1a03ac532ff5a2cee5d86884440c7f7671" dependencies = [ - "heck", + "heck 0.5.0", "proc-macro2", "quote", "syn", @@ -238,26 +224,6 @@ dependencies = [ "crossbeam-utils", ] -[[package]] -name = "const-random" -version = "0.1.18" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "87e00182fe74b066627d63b85fd550ac2998d4b0bd86bfed477a0ae4c7c71359" -dependencies = [ - "const-random-macro", -] - -[[package]] -name = "const-random-macro" -version = "0.1.16" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f9d839f2a20b0aee515dc581a6172f2321f96cab76c1a38a4c584a194955390e" -dependencies = [ - "getrandom 0.2.16", - "once_cell", - "tiny-keccak", -] - [[package]] name = "core-foundation-sys" version = "0.8.7" @@ -270,12 +236,6 @@ version = "0.8.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d0a5c400df2834b80a4c3327b3aad3a4c4cd4de0629063962b03235697506a28" -[[package]] -name = "crunchy" -version = "0.2.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "460fbee9c2c2f33933d720630a6a0bac33ba7053db5344fac858d4b8952d77d5" - [[package]] name = "curl" version = "0.4.49" @@ -494,24 +454,18 @@ dependencies = [ "wasi", ] -[[package]] -name = "getrandom" -version = "0.3.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "899def5c37c4fd7b2664648c28120ecec138e4d395b459e5ca34f9cce2dd77fd" -dependencies = [ - "cfg-if", - "libc", - "r-efi", - "wasip2", -] - [[package]] name = "hashbrown" version = "0.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5419bdc4f6a9207fbeba6d11b604d481addf78ecd10c11ad51e76c2f6482748d" +[[package]] +name = "heck" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "95505c38b4572b2d910cecb0281560f54b440a19336cbbcb27bf6ce6adc6f5a8" + [[package]] name = "heck" version = "0.5.0" @@ -676,6 +630,15 @@ dependencies = [ "hashbrown", ] +[[package]] +name = "indoc" +version = "2.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "79cf5c93f93228cf8efb3ba362535fb11199ac548a09ce117c9b1adc3030d706" +dependencies = [ + "rustversion", +] + [[package]] name = "instant" version = "0.1.13" @@ -797,9 +760,8 @@ dependencies = [ "futures", "isahc", "parking_lot", + "pyo3", "regex", - "rhai", - "rhai-rand", "serde", "serde_json", "thiserror", @@ -813,6 +775,15 @@ version = "2.7.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f52b00d39961fc5b2736ea853c9cc86238e165017a493d1d5c8eac6bdc4cc273" +[[package]] +name = "memoffset" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "488016bfae457b036d996092f6cb448677611ce4449e970ceaf42695203f218a" +dependencies = [ + "autocfg", +] + [[package]] name = "mime" version = "0.3.17" @@ -830,15 +801,6 @@ dependencies = [ "windows-sys 0.61.2", ] -[[package]] -name = "no-std-compat" -version = "0.4.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b93853da6d84c2e3c7d730d6473e8817692dd89be387eb01b94d7f108ecb5b8c" -dependencies = [ - "spin", -] - [[package]] name = "num-traits" version = "0.2.19" @@ -853,9 +815,6 @@ name = "once_cell" version = "1.21.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "42f5e15c9953c5e4ccceeb2e7382a716482c34515315f7b03532b8b4e8393d2d" -dependencies = [ - "portable-atomic", -] [[package]] name = "once_cell_polyfill" @@ -1014,19 +973,77 @@ dependencies = [ ] [[package]] -name = "quote" -version = "1.0.41" +name = "pyo3" +version = "0.21.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ce25767e7b499d1b604768e7cde645d14cc8584231ea6b295e9c9eb22c02e1d1" +checksum = "a5e00b96a521718e08e03b1a622f01c8a8deb50719335de3f60b3b3950f069d8" +dependencies = [ + "cfg-if", + "indoc", + "libc", + "memoffset", + "parking_lot", + "portable-atomic", + "pyo3-build-config", + "pyo3-ffi", + "pyo3-macros", + "serde", + "unindent", +] + +[[package]] +name = "pyo3-build-config" +version = "0.21.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7883df5835fafdad87c0d888b266c8ec0f4c9ca48a5bed6bbb592e8dedee1b50" +dependencies = [ + "once_cell", + "target-lexicon", +] + +[[package]] +name = "pyo3-ffi" +version = "0.21.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "01be5843dc60b916ab4dad1dca6d20b9b4e6ddc8e15f50c47fe6d85f1fb97403" +dependencies = [ + "libc", + "pyo3-build-config", +] + +[[package]] +name = "pyo3-macros" +version = "0.21.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77b34069fc0682e11b31dbd10321cbf94808394c56fd996796ce45217dfac53c" dependencies = [ "proc-macro2", + "pyo3-macros-backend", + "quote", + "syn", ] [[package]] -name = "r-efi" -version = "5.3.0" +name = "pyo3-macros-backend" +version = "0.21.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "69cdb34c158ceb288df11e18b4bd39de994f6657d83847bdffdbd7f346754b0f" +checksum = "08260721f32db5e1a5beae69a55553f56b99bd0e1c3e6e0a5e8851a9d0f5a85c" +dependencies = [ + "heck 0.4.1", + "proc-macro2", + "pyo3-build-config", + "quote", + "syn", +] + +[[package]] +name = "quote" +version = "1.0.41" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ce25767e7b499d1b604768e7cde645d14cc8584231ea6b295e9c9eb22c02e1d1" +dependencies = [ + "proc-macro2", +] [[package]] name = "rand" @@ -1055,7 +1072,7 @@ version = "0.6.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c" dependencies = [ - "getrandom 0.2.16", + "getrandom", ] [[package]] @@ -1096,48 +1113,6 @@ version = "0.8.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7a2d987857b319362043e95f5353c0535c1f58eec5336fdfcf626430af7def58" -[[package]] -name = "rhai" -version = "1.23.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "527390cc333a8d2cd8237890e15c36518c26f8b54c903d86fc59f42f08d25594" -dependencies = [ - "ahash", - "bitflags 2.10.0", - "instant", - "no-std-compat", - "num-traits", - "once_cell", - "rhai_codegen", - "serde", - "smallvec", - "smartstring", - "thin-vec", -] - -[[package]] -name = "rhai-rand" -version = "0.1.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4314e7e2a1f5d5de224ae3bc9ce2af4c0146d07d3c3aadf9840f08d80ef28800" -dependencies = [ - "rand", - "rhai", - "serde", - "serde_json", -] - -[[package]] -name = "rhai_codegen" -version = "3.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d4322a2a4e8cf30771dd9f27f7f37ca9ac8fe812dddd811096a98483080dabe6" -dependencies = [ - "proc-macro2", - "quote", - "syn", -] - [[package]] name = "rustversion" version = "1.0.22" @@ -1246,21 +1221,6 @@ name = "smallvec" version = "1.15.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "67b1b7a3b5fe4f1376887184045fcf45c69e92af734b7aaddc05fb777b6fbd03" -dependencies = [ - "serde", -] - -[[package]] -name = "smartstring" -version = "1.0.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3fb72c633efbaa2dd666986505016c32c3044395ceaf881518399d2f4127ee29" -dependencies = [ - "autocfg", - "serde", - "static_assertions", - "version_check", -] [[package]] name = "socket2" @@ -1272,24 +1232,12 @@ dependencies = [ "windows-sys 0.60.2", ] -[[package]] -name = "spin" -version = "0.5.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d" - [[package]] name = "stable_deref_trait" version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6ce2be8dc25455e1f91df71bfa12ad37d7af1092ae736f3a6cd0e37bc7810596" -[[package]] -name = "static_assertions" -version = "1.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" - [[package]] name = "strsim" version = "0.11.1" @@ -1319,13 +1267,10 @@ dependencies = [ ] [[package]] -name = "thin-vec" -version = "0.2.14" +name = "target-lexicon" +version = "0.12.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "144f754d318415ac792f9d69fc87abbbfc043ce2ef041c60f16ad828f638717d" -dependencies = [ - "serde", -] +checksum = "61c41af27dd6d1e27b1b16b489db798443478cef1f06a660c96db617ba5de3b1" [[package]] name = "thiserror" @@ -1347,15 +1292,6 @@ dependencies = [ "syn", ] -[[package]] -name = "tiny-keccak" -version = "2.0.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2c9d3793400a45f954c52e73d068316d76b6f4e36977e3fcebb13a2721e80237" -dependencies = [ - "crunchy", -] - [[package]] name = "tinystr" version = "0.8.1" @@ -1442,6 +1378,12 @@ version = "1.0.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "462eeb75aeb73aea900253ce739c8e18a67423fadf006037cd3ff27e82748a06" +[[package]] +name = "unindent" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7264e107f553ccae879d21fbea1d6724ac785e8c3bfc762137959b5802826ef3" + [[package]] name = "url" version = "2.5.7" @@ -1482,12 +1424,6 @@ version = "0.2.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" -[[package]] -name = "version_check" -version = "0.9.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a" - [[package]] name = "waker-fn" version = "1.2.0" @@ -1500,15 +1436,6 @@ version = "0.11.1+wasi-snapshot-preview1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ccf3ec651a847eb01de73ccad15eb7d99f80485de043efb2f370cd654f4ea44b" -[[package]] -name = "wasip2" -version = "1.0.1+wasi-0.2.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0562428422c63773dad2c345a1882263bbf4d65cf3f42e90921f787ef5ad58e7" -dependencies = [ - "wit-bindgen", -] - [[package]] name = "wasm-bindgen" version = "0.2.104" @@ -1849,12 +1776,6 @@ version = "0.53.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d6bbff5f0aada427a1e5a6da5f1f98158182f26556f345ac9e04d36d0ebed650" -[[package]] -name = "wit-bindgen" -version = "0.46.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f17a85883d4e6d00e8a97c586de764dabcc06133f7f1d55dce5cdc070ad7fe59" - [[package]] name = "writeable" version = "0.6.1" diff --git a/Cargo.toml b/Cargo.toml index f60f9ce..0ca271c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,8 +14,7 @@ form-data-builder = "1.0.1" url_encoded_data = "0.6.1" thiserror = "1.0.69" async-recursion = "1.0.4" -rhai = { version = "1", features = ["serde", "sync"] } -rhai-rand = "0.1.6" +pyo3 = { version = "0.21.2", features = ["auto-initialize", "serde"] } futures = "0.3.31" parking_lot = "0.12.3" chrono = "0.4.39" diff --git a/flow.json b/flow.json index 9d5baf6..befb866 100644 --- a/flow.json +++ b/flow.json @@ -22,13 +22,13 @@ } }, { - "RunRhaiCode": { - "code": "let user_id = http_response[\"data\"].id;" + "RunPythonCode": { + "code": "user_id = http_response['data']['id']" } }, { - "RunRhaiCode": { - "code": "print(\"Picked user_id: \" + user_id);" + "RunPythonCode": { + "code": "print(f'Picked user_id: {user_id}')" } }, { @@ -47,8 +47,8 @@ } }, { - "RunRhaiCode": { - "code": "let data = http_response.data;\nprint(data.first_name + \" \" + data.last_name);" + "RunPythonCode": { + "code": "data = http_response['data']\nprint(f\"{data['first_name']} {data['last_name']}\")" } } ] diff --git a/src/flow/function.rs b/src/flow/function.rs index 676bbb8..01168fe 100644 --- a/src/flow/function.rs +++ b/src/flow/function.rs @@ -1,6 +1,6 @@ use serde::{Deserialize, Serialize}; -use crate::functions::{http_request, load_gen, rhai_code, sleep}; +use crate::functions::{http_request, load_gen, python_code, sleep}; #[derive(Serialize, Deserialize, Debug, Clone)] pub struct Flow { @@ -12,7 +12,7 @@ pub enum Function { HttpRequest(http_request::HttpRequestParam), Sleep(sleep::SleepParam), LoadGen(load_gen::LoadGenParam), - RunRhaiCode(rhai_code::RhaiCodeParam), + RunPythonCode(python_code::PythonCodeParam), // Pick random item from li // Append item to ilst } diff --git a/src/functions/http_request.rs b/src/functions/http_request.rs index 8adbf1a..03b9876 100644 --- a/src/functions/http_request.rs +++ b/src/functions/http_request.rs @@ -7,7 +7,7 @@ use isahc::{config::RedirectPolicy, prelude::*, Request}; use isahc::{AsyncBody, AsyncReadResponseExt, HttpClient}; use form_data_builder::FormData; -use rhai::Dynamic; +use serde_json::{json, Value as JsonValue}; use tokio::sync::oneshot; use url_encoded_data::UrlEncodedData; @@ -17,7 +17,7 @@ use crate::kv_store::commands::{Command, Sender}; use super::result::*; -async fn set_local_value(local_kv_tx: &Sender, key: &str, value: Dynamic) -> Result<()> { +async fn set_local_value(local_kv_tx: &Sender, key: &str, value: JsonValue) -> Result<()> { let (resp_tx, resp_rx) = oneshot::channel(); local_kv_tx .send(Command::Set { @@ -31,11 +31,12 @@ async fn set_local_value(local_kv_tx: &Sender, key: &str, value: Dynamic) -> Res } async fn append_metric(global_kv_tx: &Sender, metric: HttpMetric) -> Result<()> { + let metric_value = serde_json::to_value(metric)?; let (resp_tx, resp_rx) = oneshot::channel(); global_kv_tx .send(Command::Append { key: "load_gen_metrics".into(), - value: Dynamic::from(metric), + value: metric_value, resp: resp_tx, }) .await?; @@ -67,25 +68,15 @@ async fn record_http_error( global_kv_tx: &Sender, local_kv_tx: &Sender, ) -> Result<()> { - set_local_value( - local_kv_tx, - "http_response", - Dynamic::from(error_message.clone()), - ) - .await?; + set_local_value(local_kv_tx, "http_response", json!(error_message.clone())).await?; set_local_value( local_kv_tx, "http_status_code", - Dynamic::from_int(status_code.unwrap_or(0)), + json!(status_code.unwrap_or(0)), ) .await?; let headers_json = headers_json.unwrap_or_else(|| "{}".to_string()); - set_local_value( - local_kv_tx, - "http_response_headers", - Dynamic::from(headers_json), - ) - .await?; + set_local_value(local_kv_tx, "http_response_headers", json!(headers_json)).await?; if should_collect_metrics { let metric = HttpMetric { @@ -368,21 +359,16 @@ pub async fn make_request( } }; - set_local_value(&local_kv_tx, "http_response", Dynamic::from(body.clone())).await?; + set_local_value(&local_kv_tx, "http_response", json!(body.clone())).await?; set_local_value( &local_kv_tx, "http_status_code", - Dynamic::from_int(response.status().as_u16() as i64), + json!(response.status().as_u16() as i64), ) .await?; let headers_json = headers_to_json(response.headers())?; - set_local_value( - &local_kv_tx, - "http_response_headers", - Dynamic::from(headers_json), - ) - .await?; + set_local_value(&local_kv_tx, "http_response_headers", json!(headers_json)).await?; // Collect metrics if the key is set. if should_collect_metrics { diff --git a/src/functions/load_gen.rs b/src/functions/load_gen.rs index 88c2cd7..ca75754 100644 --- a/src/functions/load_gen.rs +++ b/src/functions/load_gen.rs @@ -1,12 +1,12 @@ -use std::path::PathBuf; +use std::{collections::BTreeMap, path::PathBuf}; -use rhai::Array; use serde::{Deserialize, Serialize}; +use serde_json::{json, Value as JsonValue}; use crate::{ flow::Function, - functions::{http_request::HttpMetric, run::run_functions}, - kv_store::commands::{Command, Sender, Value}, + functions::{http_request::HttpMetric, python_code, run::run_functions}, + kv_store::commands::{Command, Sender}, }; use tokio::{ @@ -28,35 +28,29 @@ pub struct LoadGenParam { functions_to_execute: Vec, } -fn max(a: i64, b: i64) -> i64 { - if a > b { - a - } else { - b - } -} - -fn min(a: i64, b: i64) -> i64 { - if a < b { - a - } else { - b - } -} - fn eval_task_count( + interpreter: &python_code::PythonInterpreter, expression: &str, tick: i64, ) -> std::result::Result> { - let mut engine = rhai::Engine::new(); - engine.register_fn("max", max); - engine.register_fn("min", min); - - let mut scope = rhai::Scope::new(); - scope.push_constant("TICK", tick); - - let result = engine.eval_expression_with_scope(&mut scope, expression)?; - Ok(result) + let mut context = BTreeMap::new(); + context.insert("TICK".to_string(), json!(tick)); + + let result = python_code::eval_expression_with_context(interpreter, expression, &context)?; + match result { + JsonValue::Number(num) => { + if let Some(value) = num.as_i64() { + Ok(value) + } else if let Some(value) = num.as_u64() { + Ok(value as i64) + } else { + Err(format!("expression '{expression}' produced non-integer value").into()) + } + } + other => { + Err(format!("expression '{expression}' produced non-numeric value: {other}").into()) + } + } } pub async fn load_gen(param: LoadGenParam, kv_tx: Sender) -> FunctionResult { @@ -65,10 +59,12 @@ pub async fn load_gen(param: LoadGenParam, kv_tx: Sender) -> FunctionResult { config_display.functions_to_execute = Vec::new(); println!("{:?}", config_display); - let metrics: Array = Vec::new(); + let eval_interpreter = python_code::PythonInterpreter::new()?; + + let metrics = json!([]); let (resp_tx, resp_rx) = oneshot::channel(); kv_tx - .send(Command::SetArray { + .send(Command::Set { key: "load_gen_metrics".into(), value: metrics, resp: resp_tx, @@ -98,7 +94,7 @@ pub async fn load_gen(param: LoadGenParam, kv_tx: Sender) -> FunctionResult { param.timeout, ))); - let spawn_rate = eval_task_count(¶m.spawn_rate, tick)?.max(1) as u64; + let spawn_rate = eval_task_count(&eval_interpreter, ¶m.spawn_rate, tick)?.max(1) as u64; if (i + 1) % spawn_rate == 0 { sleep(Duration::from_secs(1)).await; tick += 1; @@ -144,12 +140,12 @@ pub async fn load_gen(param: LoadGenParam, kv_tx: Sender) -> FunctionResult { .await?; let metrics = resp_rx.await??; - if let Value::Array(mut metrics) = metrics { + if let Some(JsonValue::Array(metrics)) = metrics { println!("Collected metrics array size: {:?}", metrics.len()); let metrics: Vec = metrics - .iter_mut() - .map(|x| x.take().cast::()) - .collect(); + .into_iter() + .map(serde_json::from_value) + .collect::>()?; let json_str = serde_json::to_string(&metrics)?; @@ -160,10 +156,15 @@ pub async fn load_gen(param: LoadGenParam, kv_tx: Sender) -> FunctionResult { resp: resp_tx, }) .await?; - let metrics_output_path = match resp_rx.await?? { - Value::Dynamic(value) => value.clone_cast::(), - Value::Array(_) => unreachable!(), - }; + let metrics_output_value = resp_rx.await??; + let metrics_output_path: PathBuf = metrics_output_value + .and_then(|value| serde_json::from_value(value).ok()) + .ok_or_else(|| { + std::io::Error::new( + std::io::ErrorKind::Other, + "metrics_output_path is missing or invalid", + ) + })?; println!("Saving collected metrics to: {:?}", metrics_output_path); std::fs::write(metrics_output_path, json_str)?; diff --git a/src/functions/mod.rs b/src/functions/mod.rs index b7a11d5..d688495 100644 --- a/src/functions/mod.rs +++ b/src/functions/mod.rs @@ -1,6 +1,6 @@ pub mod http_request; pub mod load_gen; +pub mod python_code; pub mod result; -pub mod rhai_code; pub mod run; pub mod sleep; diff --git a/src/functions/python_code.rs b/src/functions/python_code.rs new file mode 100644 index 0000000..d90b2d3 --- /dev/null +++ b/src/functions/python_code.rs @@ -0,0 +1,241 @@ +use std::collections::BTreeMap; +use std::fmt; + +use pyo3::prelude::*; +use pyo3::types::{PyAny, PyDict}; +use pyo3::Bound; +use pyo3::PyErr; +use serde::{Deserialize, Serialize}; +use serde_json::Value as JsonValue; +use tokio::sync::oneshot; + +use crate::kv_store::commands::{Command, Sender}; + +use super::result::*; + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct PythonCodeParam { + pub code: String, +} + +#[derive(Debug)] +struct PythonError(String); + +impl fmt::Display for PythonError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", self.0) + } +} + +impl std::error::Error for PythonError {} + +fn python_error(message: String) -> Box { + Box::new(PythonError(message)) +} + +fn map_py_err(err: PyErr) -> Box { + python_error(err.to_string()) +} + +#[derive(Clone)] +pub struct PythonInterpreter { + globals: Py, +} + +impl PythonInterpreter { + pub fn new() -> Result { + Python::with_gil(|py| -> PyResult { + let globals = PyDict::new_bound(py); + let builtins = py.import_bound("builtins")?; + globals.set_item("__builtins__", builtins)?; + + Ok(Self { + globals: globals.unbind(), + }) + }) + .map_err(map_py_err) + } + + fn json_to_python(&self, py: Python<'_>, value: &JsonValue) -> Result> { + let json_str = serde_json::to_string(value) + .map_err(|err| python_error(format!("failed to serialize JSON for Python: {err}")))?; + let json_module = py.import_bound("json").map_err(map_py_err)?; + let loads = json_module.getattr("loads").map_err(map_py_err)?; + let loaded = loads.call1((json_str,)).map_err(map_py_err)?; + Ok(loaded.unbind()) + } + + fn python_to_json(&self, py: Python<'_>, value: &Bound<'_, PyAny>) -> Result { + let json_module = py.import_bound("json").map_err(map_py_err)?; + let dumps = json_module.getattr("dumps").map_err(map_py_err)?; + let json_str: String = dumps + .call1((value,)) + .map_err(map_py_err)? + .extract() + .map_err(map_py_err)?; + serde_json::from_str(&json_str).map_err(|err| { + python_error(format!("failed to deserialize Python value to JSON: {err}")) + }) + } + + fn sync_context( + &self, + py: Python<'_>, + globals: &Bound<'_, PyDict>, + context: &BTreeMap, + ) -> Result<()> { + for (key, value) in context { + let py_value = self.json_to_python(py, value)?; + globals.set_item(key, py_value).map_err(map_py_err)?; + } + Ok(()) + } + + fn collect_globals( + &self, + py: Python<'_>, + globals: &Bound<'_, PyDict>, + ) -> Result> { + let mut values = BTreeMap::new(); + + for (key, value) in globals.iter() { + let key_str = match key.extract::() { + Ok(name) => name, + Err(err) => { + eprintln!("Skipping Python scope entry with non-string key: {}", err); + continue; + } + }; + if key_str.starts_with("__") { + continue; + } + + match self.python_to_json(py, &value) { + Ok(json_value) => { + values.insert(key_str, json_value); + } + Err(err) => { + eprintln!( + "Skipping non-serializable Python variable '{}': {}", + key_str, err + ); + } + } + } + + Ok(values) + } + + pub fn run_script( + &self, + code: &str, + context: &BTreeMap, + ) -> Result> { + Python::with_gil(|py| -> Result> { + let globals = self.globals.bind(py); + self.sync_context(py, &globals, context)?; + py.run_bound(code, Some(&globals), Some(&globals)) + .map_err(map_py_err)?; + self.collect_globals(py, &globals) + }) + } + + pub fn eval_expression( + &self, + code: &str, + context: &BTreeMap, + ) -> Result { + Python::with_gil(|py| -> Result { + let globals = self.globals.bind(py); + self.sync_context(py, &globals, context)?; + let result = py + .eval_bound(code, Some(&globals), Some(&globals)) + .map_err(map_py_err)?; + self.python_to_json(py, &result) + }) + } +} + +async fn collect_variables(local_kv_tx: &Sender) -> Result> { + let (resp_tx, resp_rx) = oneshot::channel(); + local_kv_tx + .send(Command::ListKeys { resp: resp_tx }) + .await?; + let keys = resp_rx.await??; + + let mut variables = BTreeMap::new(); + for key in keys { + let (resp_tx, resp_rx) = oneshot::channel(); + local_kv_tx + .send(Command::Get { + key: key.clone(), + resp: resp_tx, + }) + .await?; + + if let Some(value) = resp_rx.await?? { + variables.insert(key, value); + } + } + + Ok(variables) +} + +async fn delete_variable(local_kv_tx: &Sender, key: &str) -> Result<()> { + let (resp_tx, resp_rx) = oneshot::channel(); + local_kv_tx + .send(Command::Delete { + key: key.to_owned(), + resp: resp_tx, + }) + .await?; + resp_rx.await??; + Ok(()) +} + +pub async fn run_python_code( + param: PythonCodeParam, + python: PythonInterpreter, + local_kv_tx: Sender, +) -> FunctionResult { + let variables = collect_variables(&local_kv_tx).await?; + let existing_keys: Vec = variables.keys().cloned().collect(); + let updated_values = python.run_script(¶m.code, &variables)?; + + for key in existing_keys { + if !updated_values.contains_key(&key) { + delete_variable(&local_kv_tx, &key).await?; + } + } + + for (key, value) in updated_values { + let (resp_tx, resp_rx) = oneshot::channel(); + local_kv_tx + .send(Command::Set { + key, + value, + resp: resp_tx, + }) + .await?; + resp_rx.await??; + } + + Ok(FunctionStatus::Passed) +} + +pub async fn eval_python_code( + code: &str, + python: PythonInterpreter, + local_kv_tx: Sender, +) -> Result { + let variables = collect_variables(&local_kv_tx).await?; + python.eval_expression(code, &variables) +} + +pub fn eval_expression_with_context( + python: &PythonInterpreter, + code: &str, + context: &BTreeMap, +) -> Result { + python.eval_expression(code, context) +} diff --git a/src/functions/rhai_code.rs b/src/functions/rhai_code.rs deleted file mode 100644 index 7aa3ab6..0000000 --- a/src/functions/rhai_code.rs +++ /dev/null @@ -1,109 +0,0 @@ -use rhai::packages::Package; -use rhai::Dynamic; -use rhai_rand::RandomPackage; -use serde::{Deserialize, Serialize}; -use tokio::sync::oneshot; - -use crate::kv_store::commands::{Command, Sender, Value}; - -use super::result::*; - -#[derive(Serialize, Deserialize, Debug, Clone)] -pub struct RhaiCodeParam { - code: String, -} - -fn max(a: i64, b: i64) -> i64 { - if a > b { - a - } else { - b - } -} - -fn min(a: i64, b: i64) -> i64 { - if a < b { - a - } else { - b - } -} - -/// Initializes the scope with all the variables from the kv store and registers -/// some necessary functions to the engine. -async fn init_engine_and_scope( - local_kv_tx: &tokio::sync::mpsc::Sender, -) -> std::result::Result< - (rhai::Engine, rhai::Scope<'static>), - Box, -> { - let mut engine = rhai::Engine::new(); - engine.register_fn("max", max); - engine.register_fn("min", min); - let (resp_tx, resp_rx) = oneshot::channel(); - local_kv_tx - .send(Command::ListKeys { resp: resp_tx }) - .await?; - let keys = resp_rx.await??; - let mut scope = rhai::Scope::new(); - for key in keys { - let (resp_tx, resp_rx) = oneshot::channel(); - local_kv_tx - .send(Command::Get { - key: key.clone(), - resp: resp_tx, - }) - .await?; - let value = resp_rx.await??; - let mut value = match value { - Value::Dynamic(val) => val, - Value::Array(val) => Dynamic::from_array(val), - }; - - // If it's a json string, we try to convert it to a rhai::Map, - // otherwise just store the plain string. - if value.is::() { - value = match engine.parse_json(value.take().cast::(), true) { - Ok(map) => Dynamic::from_map(map), - _ => value, - }; - } - scope.set_or_push(key, value); - } - let random = RandomPackage::new(); - random.register_into_engine(&mut engine); - Ok((engine, scope)) -} - -pub async fn run_rhai_code( - param: RhaiCodeParam, - _global_kv_tx: Sender, - local_kv_tx: Sender, -) -> FunctionResult { - let (engine, mut scope) = init_engine_and_scope(&local_kv_tx).await?; - - // Run the code. - engine.run_with_scope(&mut scope, ¶m.code)?; - - // Read all the variables and store/overwrite them in the store. - for (key, _is_constant, value) in scope.iter() { - let (resp_tx, resp_rx) = oneshot::channel(); - local_kv_tx - .send(Command::Set { - key: key.into(), - value, - resp: resp_tx, - }) - .await?; - resp_rx.await??; - } - - Ok(FunctionStatus::Passed) -} - -pub async fn eval_rhai_code(code: &str, local_kv_tx: Sender) -> Result { - let (engine, mut scope) = init_engine_and_scope(&local_kv_tx).await?; - - // Run the code - Ok(engine.eval_with_scope::(&mut scope, code)?) -} diff --git a/src/functions/run.rs b/src/functions/run.rs index fbce6e1..cdab9f2 100644 --- a/src/functions/run.rs +++ b/src/functions/run.rs @@ -4,7 +4,7 @@ use std::time::Duration; use regex::Regex; -use rhai::Dynamic; +use serde_json::Value as JsonValue; use tokio::time::Instant; use crate::flow::{Flow, Function}; @@ -12,8 +12,8 @@ use crate::kv_store::{commands::Sender, store::new as kv_store_new}; use super::http_request; use super::load_gen; +use super::python_code; use super::result::*; -use super::rhai_code; use super::sleep; pub async fn run_flow(flow: Flow, kv_tx: Sender) -> FunctionResult { @@ -34,8 +34,12 @@ pub async fn run_loadgen(functions: Vec, kv_tx: Sender) -> FunctionRes Ok(FunctionStatus::Passed) } -async fn interpolate_variables(input: &str, local_kv_tx: Sender) -> Result> { - let mut map: BTreeMap<&str, Dynamic> = BTreeMap::new(); +async fn interpolate_variables( + input: &str, + python: python_code::PythonInterpreter, + local_kv_tx: Sender, +) -> Result> { + let mut map: BTreeMap = BTreeMap::new(); let re = Regex::new(r"%\|(.+?)\|%").unwrap(); @@ -44,9 +48,9 @@ async fn interpolate_variables(input: &str, local_kv_tx: Sender) -> Result Result value.to_string(), + Some(value) => match value { + JsonValue::String(s) => s, + other => other.to_string(), + }, None => format!("NO_SUCH_VARIABLE:{key}"), } }); @@ -71,6 +78,7 @@ pub async fn run_functions( // scoping mechanisms with scope names that can be referred from inside // functions. Maybe a graph of scopes that child scopes can refer back to? let (local_kv_handle, local_kv_tx) = kv_store_new().await; + let python_interpreter = python_code::PythonInterpreter::new()?; let end_time = Instant::now() + Duration::from_secs(timeout); let mut final_status = FunctionStatus::Passed; @@ -84,13 +92,18 @@ pub async fn run_functions( let exec_result: FunctionResult = { let exec_local_kv = local_kv_tx.clone(); let exec_global_kv = global_kv_tx.clone(); + let exec_python = python_interpreter.clone(); async move { // 1. Convert the Function to a string. let function_str = serde_json::to_string(&function)?; // 2. Perform variable (string) interpolation and insert variable values. - let interpolated = - interpolate_variables(&function_str, exec_local_kv.clone()).await?; + let interpolated = interpolate_variables( + &function_str, + exec_python.clone(), + exec_local_kv.clone(), + ) + .await?; // 3. Convert the interpolated string back to a Function that can be executed. let executable_function: Function = serde_json::from_str(interpolated.as_ref())?; @@ -110,8 +123,8 @@ pub async fn run_functions( Function::Sleep(param) => { sleep::sleep(param, remaining_time, exec_global_kv).await } - Function::RunRhaiCode(param) => { - rhai_code::run_rhai_code(param, exec_global_kv, exec_local_kv).await + Function::RunPythonCode(param) => { + python_code::run_python_code(param, exec_python, exec_local_kv).await } Function::LoadGen(_) => panic!("load gen function cannot be nested"), } diff --git a/src/kv_store/commands.rs b/src/kv_store/commands.rs index 44de544..9704af8 100644 --- a/src/kv_store/commands.rs +++ b/src/kv_store/commands.rs @@ -1,4 +1,4 @@ -use rhai::{Array, Dynamic}; +use serde_json::Value as JsonValue; use tokio::sync::{mpsc, oneshot}; type Result = std::result::Result>; @@ -9,18 +9,12 @@ pub type Responder = oneshot::Sender>; pub type Sender = mpsc::Sender; -#[derive(Debug, Clone)] -pub enum Value { - Dynamic(Dynamic), - Array(Array), -} - #[allow(dead_code)] #[derive(Debug)] pub enum Command { Get { key: String, - resp: Responder, + resp: Responder>, }, Exists { key: String, @@ -28,12 +22,7 @@ pub enum Command { }, Set { key: String, - value: Dynamic, - resp: Responder<()>, - }, - SetArray { - key: String, - value: Array, + value: JsonValue, resp: Responder<()>, }, Delete { @@ -42,7 +31,7 @@ pub enum Command { }, Append { key: String, - value: Dynamic, + value: JsonValue, resp: Responder<()>, }, ListKeys { diff --git a/src/kv_store/store.rs b/src/kv_store/store.rs index 07901ea..18f6955 100644 --- a/src/kv_store/store.rs +++ b/src/kv_store/store.rs @@ -1,16 +1,16 @@ use std::collections::BTreeMap; -use rhai::Dynamic; +use serde_json::Value as JsonValue; use tokio::{sync::mpsc, task::JoinHandle}; -use crate::kv_store::commands::{Command, Sender, Value}; +use crate::kv_store::commands::{Command, Sender}; // 1. Create the receiver, transmitter // 2. Create the hashmap/btreemap to hold the data // 3. Return the receivers and transmitters struct KvStore { - data: BTreeMap, + data: BTreeMap, } #[allow(dead_code)] @@ -21,7 +21,7 @@ impl KvStore { } } - pub fn get(&self, key: impl ToString) -> Option<&Value> { + pub fn get(&self, key: impl ToString) -> Option<&JsonValue> { let key = key.to_string(); self.data.get(&key) } @@ -31,17 +31,17 @@ impl KvStore { self.data.contains_key(&key) } - pub fn set(&mut self, key: impl ToString, value: Value) -> Option { + pub fn set(&mut self, key: impl ToString, value: JsonValue) -> Option { let key = key.to_string(); self.data.insert(key, value) } - pub fn delete(&mut self, key: impl ToString) -> Option { + pub fn delete(&mut self, key: impl ToString) -> Option { let key = key.to_string(); self.data.remove(&key) } - pub fn append(&mut self, key: impl ToString, value: Dynamic) { + pub fn append(&mut self, key: impl ToString, value: JsonValue) { let key = key.to_string(); // self.data @@ -57,7 +57,7 @@ impl KvStore { None => return, }; - if let Value::Array(arr) = arr { + if let JsonValue::Array(arr) = arr { arr.push(value); } } @@ -84,13 +84,13 @@ pub async fn new() -> (JoinHandle<()>, Sender) { Command::Get { key, resp } => { let res = store.get(key); match res { - Some(val) => resp.send(Ok(val.clone())), - None => resp.send(Ok(Value::Dynamic(Dynamic::from(())))), + Some(val) => resp.send(Ok(Some(val.clone()))), + None => resp.send(Ok(None)), } .expect("setting values should never fail"); } Command::Set { key, value, resp } => { - store.set(key, Value::Dynamic(value)); + store.set(key, value); let _ = resp.send(empty_ok); } Command::Delete { key, resp } => { @@ -105,10 +105,6 @@ pub async fn new() -> (JoinHandle<()>, Sender) { let exists = store.exists(key); let _ = resp.send(Ok(exists)); } - Command::SetArray { key, value, resp } => { - store.set(key, Value::Array(value)); - let _ = resp.send(empty_ok); - } Command::ListKeys { resp } => { let keys = store.list_keys(); let _ = resp.send(Ok(keys)); diff --git a/src/main.rs b/src/main.rs index 4894444..cbf7ede 100644 --- a/src/main.rs +++ b/src/main.rs @@ -6,7 +6,7 @@ use std::path::PathBuf; use clap::Parser; use kv_store::commands::Command; -use rhai::Dynamic; +use serde_json::json; use tokio::sync::oneshot; use crate::flow::Flow; @@ -53,7 +53,7 @@ async fn main() -> Result<()> { kv_tx .send(Command::Set { key: "metrics_output_path".into(), - value: Dynamic::from(args.output_path), + value: json!(args.output_path), resp: resp_tx, }) .await?;