Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
279 changes: 100 additions & 179 deletions Cargo.lock

Large diffs are not rendered by default.

3 changes: 1 addition & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,7 @@ form-data-builder = "1.0.1"
url_encoded_data = "0.6.1"
thiserror = "1.0.69"
async-recursion = "1.0.4"
rhai = { version = "1", features = ["serde", "sync"] }
rhai-rand = "0.1.6"
pyo3 = { version = "0.21.2", features = ["auto-initialize", "serde"] }
futures = "0.3.31"
parking_lot = "0.12.3"
chrono = "0.4.39"
Expand Down
12 changes: 6 additions & 6 deletions flow.json
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@
}
},
{
"RunRhaiCode": {
"code": "let user_id = http_response[\"data\"].id;"
"RunPythonCode": {
"code": "user_id = http_response['data']['id']"
}
},
{
"RunRhaiCode": {
"code": "print(\"Picked user_id: \" + user_id);"
"RunPythonCode": {
"code": "print(f'Picked user_id: {user_id}')"
}
},
{
Expand All @@ -47,8 +47,8 @@
}
},
{
"RunRhaiCode": {
"code": "let data = http_response.data;\nprint(data.first_name + \" \" + data.last_name);"
"RunPythonCode": {
"code": "data = http_response['data']\nprint(f\"{data['first_name']} {data['last_name']}\")"
}
}
]
Expand Down
4 changes: 2 additions & 2 deletions src/flow/function.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use serde::{Deserialize, Serialize};

use crate::functions::{http_request, load_gen, rhai_code, sleep};
use crate::functions::{http_request, load_gen, python_code, sleep};

#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct Flow {
Expand All @@ -12,7 +12,7 @@ pub enum Function {
HttpRequest(http_request::HttpRequestParam),
Sleep(sleep::SleepParam),
LoadGen(load_gen::LoadGenParam),
RunRhaiCode(rhai_code::RhaiCodeParam),
RunPythonCode(python_code::PythonCodeParam),
// Pick random item from li
// Append item to ilst
}
34 changes: 10 additions & 24 deletions src/functions/http_request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use isahc::{config::RedirectPolicy, prelude::*, Request};
use isahc::{AsyncBody, AsyncReadResponseExt, HttpClient};

use form_data_builder::FormData;
use rhai::Dynamic;
use serde_json::{json, Value as JsonValue};
use tokio::sync::oneshot;
use url_encoded_data::UrlEncodedData;

Expand All @@ -17,7 +17,7 @@ use crate::kv_store::commands::{Command, Sender};

use super::result::*;

async fn set_local_value(local_kv_tx: &Sender, key: &str, value: Dynamic) -> Result<()> {
async fn set_local_value(local_kv_tx: &Sender, key: &str, value: JsonValue) -> Result<()> {
let (resp_tx, resp_rx) = oneshot::channel();
local_kv_tx
.send(Command::Set {
Expand All @@ -31,11 +31,12 @@ async fn set_local_value(local_kv_tx: &Sender, key: &str, value: Dynamic) -> Res
}

async fn append_metric(global_kv_tx: &Sender, metric: HttpMetric) -> Result<()> {
let metric_value = serde_json::to_value(metric)?;
let (resp_tx, resp_rx) = oneshot::channel();
global_kv_tx
.send(Command::Append {
key: "load_gen_metrics".into(),
value: Dynamic::from(metric),
value: metric_value,
resp: resp_tx,
})
.await?;
Expand Down Expand Up @@ -67,25 +68,15 @@ async fn record_http_error(
global_kv_tx: &Sender,
local_kv_tx: &Sender,
) -> Result<()> {
set_local_value(
local_kv_tx,
"http_response",
Dynamic::from(error_message.clone()),
)
.await?;
set_local_value(local_kv_tx, "http_response", json!(error_message.clone())).await?;
set_local_value(
local_kv_tx,
"http_status_code",
Dynamic::from_int(status_code.unwrap_or(0)),
json!(status_code.unwrap_or(0)),
)
.await?;
let headers_json = headers_json.unwrap_or_else(|| "{}".to_string());
set_local_value(
local_kv_tx,
"http_response_headers",
Dynamic::from(headers_json),
)
.await?;
set_local_value(local_kv_tx, "http_response_headers", json!(headers_json)).await?;

if should_collect_metrics {
let metric = HttpMetric {
Expand Down Expand Up @@ -368,21 +359,16 @@ pub async fn make_request(
}
};

set_local_value(&local_kv_tx, "http_response", Dynamic::from(body.clone())).await?;
set_local_value(&local_kv_tx, "http_response", json!(body.clone())).await?;
set_local_value(
&local_kv_tx,
"http_status_code",
Dynamic::from_int(response.status().as_u16() as i64),
json!(response.status().as_u16() as i64),
)
.await?;

let headers_json = headers_to_json(response.headers())?;
set_local_value(
&local_kv_tx,
"http_response_headers",
Dynamic::from(headers_json),
)
.await?;
set_local_value(&local_kv_tx, "http_response_headers", json!(headers_json)).await?;

// Collect metrics if the key is set.
if should_collect_metrics {
Expand Down
81 changes: 41 additions & 40 deletions src/functions/load_gen.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
use std::path::PathBuf;
use std::{collections::BTreeMap, path::PathBuf};

use rhai::Array;
use serde::{Deserialize, Serialize};
use serde_json::{json, Value as JsonValue};

use crate::{
flow::Function,
functions::{http_request::HttpMetric, run::run_functions},
kv_store::commands::{Command, Sender, Value},
functions::{http_request::HttpMetric, python_code, run::run_functions},
kv_store::commands::{Command, Sender},
};

use tokio::{
Expand All @@ -28,35 +28,29 @@ pub struct LoadGenParam {
functions_to_execute: Vec<Function>,
}

fn max(a: i64, b: i64) -> i64 {
if a > b {
a
} else {
b
}
}

fn min(a: i64, b: i64) -> i64 {
if a < b {
a
} else {
b
}
}

fn eval_task_count(
interpreter: &python_code::PythonInterpreter,
expression: &str,
tick: i64,
) -> std::result::Result<i64, Box<dyn std::error::Error + Send + Sync>> {
let mut engine = rhai::Engine::new();
engine.register_fn("max", max);
engine.register_fn("min", min);

let mut scope = rhai::Scope::new();
scope.push_constant("TICK", tick);

let result = engine.eval_expression_with_scope(&mut scope, expression)?;
Ok(result)
let mut context = BTreeMap::new();
context.insert("TICK".to_string(), json!(tick));

let result = python_code::eval_expression_with_context(interpreter, expression, &context)?;
match result {
JsonValue::Number(num) => {
if let Some(value) = num.as_i64() {
Ok(value)
} else if let Some(value) = num.as_u64() {
Ok(value as i64)
} else {
Err(format!("expression '{expression}' produced non-integer value").into())
}
}
other => {
Err(format!("expression '{expression}' produced non-numeric value: {other}").into())
}
}
}

pub async fn load_gen(param: LoadGenParam, kv_tx: Sender) -> FunctionResult {
Expand All @@ -65,10 +59,12 @@ pub async fn load_gen(param: LoadGenParam, kv_tx: Sender) -> FunctionResult {
config_display.functions_to_execute = Vec::new();
println!("{:?}", config_display);

let metrics: Array = Vec::new();
let eval_interpreter = python_code::PythonInterpreter::new()?;

let metrics = json!([]);
let (resp_tx, resp_rx) = oneshot::channel();
kv_tx
.send(Command::SetArray {
.send(Command::Set {
key: "load_gen_metrics".into(),
value: metrics,
resp: resp_tx,
Expand Down Expand Up @@ -98,7 +94,7 @@ pub async fn load_gen(param: LoadGenParam, kv_tx: Sender) -> FunctionResult {
param.timeout,
)));

let spawn_rate = eval_task_count(&param.spawn_rate, tick)?.max(1) as u64;
let spawn_rate = eval_task_count(&eval_interpreter, &param.spawn_rate, tick)?.max(1) as u64;
if (i + 1) % spawn_rate == 0 {
sleep(Duration::from_secs(1)).await;
tick += 1;
Expand Down Expand Up @@ -144,12 +140,12 @@ pub async fn load_gen(param: LoadGenParam, kv_tx: Sender) -> FunctionResult {
.await?;
let metrics = resp_rx.await??;

if let Value::Array(mut metrics) = metrics {
if let Some(JsonValue::Array(metrics)) = metrics {
println!("Collected metrics array size: {:?}", metrics.len());
let metrics: Vec<HttpMetric> = metrics
.iter_mut()
.map(|x| x.take().cast::<HttpMetric>())
.collect();
.into_iter()
.map(serde_json::from_value)
.collect::<std::result::Result<_, _>>()?;

let json_str = serde_json::to_string(&metrics)?;

Expand All @@ -160,10 +156,15 @@ pub async fn load_gen(param: LoadGenParam, kv_tx: Sender) -> FunctionResult {
resp: resp_tx,
})
.await?;
let metrics_output_path = match resp_rx.await?? {
Value::Dynamic(value) => value.clone_cast::<PathBuf>(),
Value::Array(_) => unreachable!(),
};
let metrics_output_value = resp_rx.await??;
let metrics_output_path: PathBuf = metrics_output_value
.and_then(|value| serde_json::from_value(value).ok())
.ok_or_else(|| {
std::io::Error::new(
std::io::ErrorKind::Other,
"metrics_output_path is missing or invalid",
)
})?;

println!("Saving collected metrics to: {:?}", metrics_output_path);
std::fs::write(metrics_output_path, json_str)?;
Expand Down
2 changes: 1 addition & 1 deletion src/functions/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
pub mod http_request;
pub mod load_gen;
pub mod python_code;
pub mod result;
pub mod rhai_code;
pub mod run;
pub mod sleep;
Loading