Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ include = [
]

[workspace.dependencies]
prosa-utils = { version = "0.4.2", path = "prosa_utils" }
prosa-macros = { version = "0.4.1", path = "prosa_macros" }
prosa-utils = { version = "0.4.3", path = "prosa_utils" }
prosa-macros = { version = "0.4.2", path = "prosa_macros" }
thiserror = "2"
aquamarine = "0.6"
bytes = "1"
Expand Down
5 changes: 4 additions & 1 deletion cargo-prosa/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "cargo-prosa"
version = "0.4.1"
version = "0.4.2"
authors.workspace = true
description = "ProSA utility to package and deliver a builded ProSA"
homepage.workspace = true
Expand All @@ -13,6 +13,9 @@ include.workspace = true
name = "cargo-prosa"
path = "src/main.rs"

[lints.clippy]
unwrap_used = "deny"

[dependencies]
aquamarine.workspace = true
clap = "4"
Expand Down
5 changes: 3 additions & 2 deletions cargo-prosa/assets/build.rs.j2
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ fn write_run_rs(out_dir: &OsString, desc: &Desc, metadata: &HashMap<&str, Metada
writeln!(f, "{{ '}}' }}")?;

writeln!(f, "\n/// Method to run all configured processors, return the number of processors runned")?;
writeln!(f, "fn run_processors(bus: prosa::core::main::Main<{{ '{}' }}>, settings: &RunSettings) {{ '{{' }}", desc.prosa.tvf)?;
writeln!(f, "fn run_processors(bus: prosa::core::main::Main<{{ '{}' }}>, settings: &RunSettings) -> Result<(), std::io::Error> {{ '{{' }}", desc.prosa.tvf)?;

let mut proc_id = 0u32;
if let Some(processors) = &desc.proc {{ '{' }}
Expand All @@ -131,10 +131,11 @@ fn write_run_rs(out_dir: &OsString, desc: &Desc, metadata: &HashMap<&str, Metada
writeln!(f, " let proc = {{ '{}' }}::<{{ '{}' }}>::create_raw({{ '{}' }}, \"{{ '{}' }}\".to_string(), bus.clone());", processor.proc, desc.prosa.tvf, proc_id, processor.get_name())?;
{{ '}' }}

writeln!(f, " prosa::core::proc::Proc::<{{ '{}' }}>::run(proc);", processor.adaptor)?;
writeln!(f, " prosa::core::proc::Proc::<{{ '{}' }}>::run(proc)?;", processor.adaptor)?;
{{ '}' }}
{{ '}' }}

writeln!(f, " Ok(())")?;
writeln!(f, "{{ '}}' }}")?;
writeln!(f, "\n/// Number of configured processor")?;
writeln!(f, "#[allow(dead_code)]")?;
Expand Down
2 changes: 1 addition & 1 deletion cargo-prosa/assets/main.rs.j2
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ async fn prosa_main(matches: clap::ArgMatches) -> Result<(), Box<dyn std::error:
let main_task = main.run();

// Run all processors
run_processors(bus, &prosa_settings);
run_processors(bus, &prosa_settings)?;

// Wait on main task
main_task.await;
Expand Down
11 changes: 6 additions & 5 deletions cargo-prosa/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ mod tests {
use super::*;

#[test]
fn prosa_desc_toml() {
fn prosa_desc_toml() -> io::Result<()> {
let mut prosa_desc = Desc::default();
prosa_desc.add_proc(ProcDesc::new(
"proc".into(),
Expand All @@ -249,14 +249,15 @@ proc_name = \"proc\"
proc = \"crate::proc\"
adaptor = \"crate::adaptor\"
";
assert_eq!(prosa_toml, toml::to_string(&prosa_desc).unwrap());
assert_eq!(Ok(prosa_toml.to_string()), toml::to_string(&prosa_desc));

// FIXME use environment variable when they will be available for unit tests
let toml_path_file = Path::new("/tmp/test_prosa_desc.toml");
let mut toml_file = fs::File::create(toml_path_file).unwrap();
toml_file.write_all(prosa_toml.as_bytes()).unwrap();
let mut toml_file = fs::File::create(toml_path_file)?;
toml_file.write_all(prosa_toml.as_bytes())?;

let prosa_desc_from_file = Desc::read(toml_path_file).unwrap();
let prosa_desc_from_file = Desc::read(toml_path_file)?;
assert_eq!(prosa_desc, prosa_desc_from_file);
Ok(())
}
}
6 changes: 4 additions & 2 deletions cargo-prosa/src/cargo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -495,8 +495,10 @@ impl CargoMetadata {
if cargo_metadata.status.success() {
let mut metadata: CargoMetadata =
serde_json::from_slice(cargo_metadata.stdout.as_slice())?;
if metadata.packages.len() == 1 {
Ok(metadata.packages.pop().unwrap())
if let Some(package) = metadata.packages.pop()
&& metadata.packages.is_empty()
{
Ok(package)
} else {
Err(io::Error::new(
io::ErrorKind::InvalidData,
Expand Down
12 changes: 6 additions & 6 deletions cargo-prosa/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -347,8 +347,8 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
// Create the new Rust project
let cargo_new = std::process::Command::new("cargo").args(args).output()?;

io::stdout().write_all(&cargo_new.stdout).unwrap();
io::stderr().write_all(&cargo_new.stderr).unwrap();
io::stdout().write_all(&cargo_new.stdout)?;
io::stderr().write_all(&cargo_new.stderr)?;

if cargo_new.status.success() {
init_prosa(path, &j2_context)?;
Expand All @@ -363,8 +363,8 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
args.push("--name");
args.push(name);
j2_context.insert("name", name);
} else if let Some(name) = path.file_name() {
j2_context.insert("name", &tera::Value::String(name.to_str().unwrap().into()));
} else if let Some(name) = path.file_name().and_then(|n| n.to_str()) {
j2_context.insert("name", &tera::Value::String(name.into()));
}

j2_context.insert("deb_pkg", &matches.get_flag("deb"));
Expand All @@ -376,8 +376,8 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
// Init the Rust project
let cargo_init = std::process::Command::new("cargo").args(args).output()?;

io::stdout().write_all(&cargo_init.stdout).unwrap();
io::stderr().write_all(&cargo_init.stderr).unwrap();
io::stdout().write_all(&cargo_init.stdout)?;
io::stderr().write_all(&cargo_init.stderr)?;

if cargo_init.status.success() {
init_prosa(path_name, &j2_context)?;
Expand Down
18 changes: 10 additions & 8 deletions cargo-prosa/src/package/container.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,14 @@ impl ContainerFile {

impl fmt::Display for ContainerFile {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let img_name = format!(
"{}:{}",
self.ctx.get("name").unwrap().as_str().unwrap(),
self.ctx.get("version").unwrap().as_str().unwrap()
);
let img_name = if let Some(name) = self.ctx.get("name").and_then(|v| v.as_str())
&& let Some(version) = self.ctx.get("version").and_then(|v| v.as_str())
{
format!("{name}:{version}")
} else {
"prosa:1.0.0".to_string()
};

writeln!(f, "To build your container, use the command:")?;
if self.is_docker {
write!(f, " `docker build")?;
Expand All @@ -116,9 +119,8 @@ impl fmt::Display for ContainerFile {
} else if self.path.is_some() {
writeln!(
f,
" `podman build -f {} -t {} .`",
self.get_path().display(),
img_name
" `podman build -f {} -t {img_name} .`",
self.get_path().display()
)
} else {
writeln!(f, " `podman build -t {img_name} .`")
Expand Down
4 changes: 2 additions & 2 deletions cargo-prosa/src/package/install.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,8 +185,8 @@ impl InstanceInstall {
let cargo_build = std::process::Command::new("cargo")
.args(build_args)
.output()?;
io::stdout().write_all(&cargo_build.stdout).unwrap();
io::stderr().write_all(&cargo_build.stderr).unwrap();
io::stdout().write_all(&cargo_build.stdout)?;
io::stderr().write_all(&cargo_build.stderr)?;

if !cargo_build.status.success() {
return Err(io::Error::new(
Expand Down
15 changes: 12 additions & 3 deletions cargo-prosa/tests/cargo-prosa.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,11 @@ fn replace_prosa_dependencies(prosa_path: &PathBuf) {
if let Some(opt) = build_opt {
cmd.args(opt);
}
cmd.args(["--path", test_prosa_dep.to_str().unwrap(), prosa_dep]);
cmd.args([
"--path",
test_prosa_dep.to_str().expect("PathBuf should be string"),
prosa_dep,
]);
cmd.assert().success();
}
}
Expand Down Expand Up @@ -175,7 +179,12 @@ Package prosa-utils\[[0-9].[0-9].[0-9]\] \(ProSA utils\)
assert!(!containerfile_path.exists() && !dockerfile_path.exists());
let mut cmd = cargo_prosa_command();
cmd.current_dir(&prosa_path);
cmd.args(["container", containerfile_path.to_str().unwrap()]);
cmd.args([
"container",
containerfile_path
.to_str()
.expect("PathBuf should be string"),
]);
cmd.assert().success().stdout(predicate::str::is_match(
r"To build your container, use the command:
`podman build -f .*/dummy-test-prosa/Containerfile -t dummy-test-prosa:0\.1\.0 \.`",
Expand All @@ -187,7 +196,7 @@ Package prosa-utils\[[0-9].[0-9].[0-9]\] \(ProSA utils\)
"container",
"--docker",
"-b rust-latest",
dockerfile_path.to_str().unwrap(),
dockerfile_path.to_str().expect("PathBuf should be string"),
]);
cmd.assert().success().stdout(predicate::str::is_match(
r"To build your container, use the command:
Expand Down
5 changes: 4 additions & 1 deletion prosa/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "prosa"
version = "0.4.2"
version = "0.4.3"
authors.workspace = true
description = "ProSA core"
homepage.workspace = true
Expand Down Expand Up @@ -39,6 +39,9 @@ proc = "stub::proc::StubProc"
settings = "stub::proc::StubSettings"
adaptor = ["stub::adaptor::StubParotAdaptor"]

[lints.clippy]
unwrap_used = "deny"

[dependencies]
prosa-utils = { workspace = true, features = ["msg", "config", "config-observability"] }
prosa-macros.workspace = true
Expand Down
13 changes: 6 additions & 7 deletions prosa/examples/proc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,13 +78,13 @@ where
let stub_service_name = String::from("STUB_TEST");
if let Some(service) = self.service.get_proc_service(&stub_service_name) {
debug!("The service is find: {:?}", service);
service.proc_queue.send(InternalMsg::Request(RequestMsg::new(stub_service_name, tvf.clone(), self.proc.get_service_queue()))).await.unwrap();
let _ = service.proc_queue.send(InternalMsg::Request(RequestMsg::new(stub_service_name, tvf.clone(), self.proc.get_service_queue()))).await;
}

let proc_service_name = String::from("PROC_TEST");
if let Some(service) = self.service.get_proc_service(&proc_service_name) {
debug!("The service is find: {:?}", service);
service.proc_queue.send(InternalMsg::Request(RequestMsg::new(proc_service_name, tvf, self.proc.get_service_queue()))).await.unwrap();
let _ = service.proc_queue.send(InternalMsg::Request(RequestMsg::new(proc_service_name, tvf, self.proc.get_service_queue()))).await;
}
},
Some(msg) = pending_msgs.pull(), if !pending_msgs.is_empty() => {
Expand Down Expand Up @@ -116,8 +116,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let config = Config::builder()
.add_source(config::File::with_name("examples/my_prosa_settings.yml"))
.add_source(config::Environment::with_prefix("PROSA"))
.build()
.unwrap();
.build()?;

let my_settings = config.try_deserialize::<MySettings>()?;
println!("My ProSA settings: {my_settings:?}");
Expand All @@ -139,18 +138,18 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
bus.clone(),
stub_settings,
);
Proc::<StubParotAdaptor>::run(stub_proc);
Proc::<StubParotAdaptor>::run(stub_proc)?;

// Launch the test processor
let proc = MyProcClass::<SimpleStringTvf>::create_raw(2, String::from("proc_1"), bus.clone());
Proc::<MyAdaptor>::run(proc);
Proc::<MyAdaptor>::run(proc)?;

// Wait before launch the second processor
std::thread::sleep(time::Duration::from_secs(2));

// Launch the second test processor
let proc2 = MyProcClass::<SimpleStringTvf>::create_raw(3, String::from("proc_2"), bus.clone());
Proc::<MyAdaptor>::run(proc2);
Proc::<MyAdaptor>::run(proc2)?;

// Wait on main task
main.run().await;
Expand Down
8 changes: 5 additions & 3 deletions prosa/src/core/msg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -243,10 +243,12 @@ where
.send(InternalMsg::Response(ResponseMsg::from_request(self, resp)))
.map_err(|e| {
e.map(|i| {
if let InternalMsg::Response(mut resp) = i {
resp.take_data().unwrap()
if let InternalMsg::Response(mut resp) = i
&& let Some(data) = resp.take_data()
{
data
} else {
panic!("Expected InternalMsg::Response")
panic!("Expected InternalMsg::Response with data")
}
})
})
Expand Down
23 changes: 14 additions & 9 deletions prosa/src/core/proc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ use glob::glob;
use log::{error, info, warn};
use std::borrow::Cow;
use std::fmt::Debug;
use std::io;
use std::time::Duration;
use tokio::sync::mpsc;
use tokio::time::sleep;
Expand Down Expand Up @@ -191,8 +192,12 @@ pub trait ProcSettings {
Config::builder()
.add_source(
glob(config_path)
.unwrap()
.map(|path| File::from(path.unwrap()))
.map_err(|e| {
ConfigError::Message(format!(
"Wrong adaptor config path pattern `{config_path}`: `{e}`"
))
})?
.filter_map(|path| path.ok().map(File::from))
.collect::<Vec<_>>(),
)
.build()?
Expand Down Expand Up @@ -586,7 +591,7 @@ where
/// Proc::<A>::run(proc);
/// }
/// ```
fn run(mut self)
fn run(mut self) -> Result<(), io::Error>
where
Self: Sized + 'static + std::marker::Send,
{
Expand All @@ -606,12 +611,11 @@ where
.enable_all()
.thread_name(self.name())
.build()
.unwrap()
.expect("Tokio single Runtime can't be build")
.block_on(async {
proc_run!(self);
})
})
.unwrap();
})?;
}
// Start a Tokio runtime on multiple threads
n => {
Expand All @@ -623,14 +627,15 @@ where
.enable_all()
.thread_name(self.name())
.build()
.unwrap()
.expect("Tokio Runtime can't be build")
.block_on(async {
proc_run!(self);
})
})
.unwrap();
})?;
}
}

Ok(())
}
}

Expand Down
10 changes: 6 additions & 4 deletions prosa/src/event/pending.rs
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,7 @@ mod tests {
},
InternalMsg::Service(table) => {
if let Some(service) = table.get_proc_service("TEST") {
service.proc_queue.send(InternalMsg::Request(RequestMsg::new(String::from("TEST"), Default::default(), self.proc.get_service_queue().clone()))).await.unwrap();
service.proc_queue.send(InternalMsg::Request(RequestMsg::new(String::from("TEST"), Default::default(), self.proc.get_service_queue().clone()))).await.expect("Internal msg should be send");
}
},
_ => return Err(BusError::ProcComm(self.get_proc_id(), 0, String::from("Wrong message"))),
Expand Down Expand Up @@ -423,7 +423,7 @@ mod tests {
if let Some(service) = table.get_proc_service("TEST") {
let mut msg: SimpleStringTvf = Default::default();
msg.put_string(1, "good");
service.proc_queue.send(InternalMsg::Request(RequestMsg::new(String::from("TEST"), msg, self.proc.get_service_queue().clone()))).await.unwrap();
service.proc_queue.send(InternalMsg::Request(RequestMsg::new(String::from("TEST"), msg, self.proc.get_service_queue().clone()))).await.expect("Internal msg should be send");
}
},
_ => return Err(BusError::ProcComm(self.get_proc_id(), 0, String::from("Wrong message"))),
Expand Down Expand Up @@ -494,7 +494,9 @@ mod tests {
.await
);

bus.stop("ProSA unit test end".into()).await.unwrap();
main_task.await.unwrap();
bus.stop("ProSA unit test end".into())
.await
.expect("ProSA should stop");
main_task.await.expect("Main task should end correctly");
}
}
Loading
Loading