diff --git a/examples/src/async_test.rs b/examples/src/async_test.rs index 2823ce3..af77b84 100644 --- a/examples/src/async_test.rs +++ b/examples/src/async_test.rs @@ -8,7 +8,7 @@ async fn main() -> Result<(), Box> { let conf: Config = Config::with_yaml_defaults("staging/config.yml".to_string())?; let service_name = "video"; // Create the rpc call struct - let mut rpc_client = RpcClient::new(conf); + let mut rpc_client = RpcClient::new(conf).await; rpc_client.register_service(service_name).await?; rpc_client.start().await?; let tempo: time::Duration = time::Duration::from_secs(2); @@ -30,7 +30,7 @@ async fn main() -> Result<(), Box> { } let duration = start.elapsed() - tempo; println!("Time elapsed in expensive_function() is: {:?}", duration); - rpc_client.unregister_service("video")?; + rpc_client.unregister_service("video").await?; rpc_client.close().await?; Ok(()) } diff --git a/examples/src/error_sender.rs b/examples/src/error_sender.rs index 3347370..baa6c67 100644 --- a/examples/src/error_sender.rs +++ b/examples/src/error_sender.rs @@ -8,7 +8,7 @@ async fn main() -> Result<(), Box> { let conf: Config = Config::with_yaml_defaults("staging/config.yml".to_string())?; let service_name = "video"; // Create the rpc call struct - let mut rpc_client = RpcClient::new(conf); + let mut rpc_client = RpcClient::new(conf).await; rpc_client.register_service(service_name).await?; rpc_client.start().await?; // Send the payload @@ -69,7 +69,7 @@ async fn main() -> Result<(), Box> { } let duration = start.elapsed() - tempo; println!("Time elapsed in expensive_function() is: {:?}", duration); - rpc_client.unregister_service("video")?; + rpc_client.unregister_service("video").await?; rpc_client.close().await?; Ok(()) } diff --git a/examples/src/simple_fib.rs b/examples/src/simple_fib.rs index 65ded04..e5de242 100644 --- a/examples/src/simple_fib.rs +++ b/examples/src/simple_fib.rs @@ -7,7 +7,7 @@ async fn main() -> Result<(), Box> { let conf: Config = Config::with_yaml_defaults("staging/config.yml".to_string())?; let service_name = "video"; // Create the rpc call struct - let mut rpc_client = RpcClient::new(conf); + let mut rpc_client = RpcClient::new(conf).await; rpc_client.register_service(service_name).await?; rpc_client.start().await?; let file = File::open("examples/data_set.json")?; diff --git a/examples/src/simple_sender.rs b/examples/src/simple_sender.rs index 082a652..0bc0a4c 100644 --- a/examples/src/simple_sender.rs +++ b/examples/src/simple_sender.rs @@ -8,7 +8,7 @@ async fn main() -> Result<(), Box> { let conf: Config = Config::with_yaml_defaults("staging/config.yml".to_string())?; let service_name = "video"; // Create the rpc call struct - let mut rpc_client = RpcClient::new(conf); + let mut rpc_client = RpcClient::new(conf).await; rpc_client.register_service(service_name).await?; rpc_client.start().await?; // Send the payload @@ -63,7 +63,7 @@ async fn main() -> Result<(), Box> { } let duration = start.elapsed() - tempo; println!("Time elapsed in expensive_function() is: {:?}", duration); - rpc_client.unregister_service("video")?; + rpc_client.unregister_service("video").await?; rpc_client.close().await?; Ok(()) } diff --git a/girolle/Cargo.toml b/girolle/Cargo.toml index 1222bc0..7591fbd 100644 --- a/girolle/Cargo.toml +++ b/girolle/Cargo.toml @@ -14,7 +14,7 @@ bench = false [dependencies] lapin = "2.5.3" -futures = "0.3.31" + girolle_macro = { path = "../girolle_macro", version = "1.8" } tokio-executor-trait = "2.1.3" tokio-reactor-trait = "1.1.0" diff --git a/girolle/src/lib.rs b/girolle/src/lib.rs index 8642b62..702919b 100644 --- a/girolle/src/lib.rs +++ b/girolle/src/lib.rs @@ -40,8 +40,8 @@ //! //! #[tokio::main] //! async fn main() { -//! let mut rpc_client = RpcClient::new(Config::default()); -//! rpc_client.register_service("video"); +//! let mut rpc_client = RpcClient::new(Config::default()).await; +//! rpc_client.register_service("video").await; //! //! let result = rpc_client.send("video", "hello", Payload::new().arg("Girolle")).unwrap(); //! } diff --git a/girolle/src/rpc_client.rs b/girolle/src/rpc_client.rs index e49b83e..f5db11e 100644 --- a/girolle/src/rpc_client.rs +++ b/girolle/src/rpc_client.rs @@ -3,7 +3,7 @@ use crate::error::GirolleError; use crate::payload::{Payload, PayloadResult}; use crate::queue::{create_message_channel, create_service_channel, get_connection}; use crate::types::GirolleResult; -use futures::executor; + use lapin::{ message::DeliveryResult, options::*, @@ -35,7 +35,7 @@ use uuid::Uuid; /// /// #[tokio::main] /// async fn main() { -/// let rpc_client = RpcClient::new(Config::default()); +/// let rpc_client = RpcClient::new(Config::default()).await; /// } pub struct RpcClient { conf: Config, @@ -68,20 +68,21 @@ impl RpcClient { /// /// #[tokio::main] /// async fn main() { - /// let target_service = RpcClient::new(Config::default()); + /// let target_service = RpcClient::new(Config::default()).await; /// } - pub fn new(conf: Config) -> Self { + pub async fn new(conf: Config) -> Self { let identifier = Uuid::new_v4(); - let conn = executor::block_on(get_connection(conf.AMQP_URI(), conf.heartbeat())) + let conn = get_connection(conf.AMQP_URI(), conf.heartbeat()).await .expect("Can't init connection"); let reply_queue_name = format!("rpc.listener-{}", identifier); - let reply_channel = executor::block_on(create_message_channel( + let reply_channel = create_message_channel( &conn, &reply_queue_name, conf.prefetch_count(), &identifier, conf.rpc_exchange(), - )) + ) + .await .expect("Can't create reply channel"); Self { conf, @@ -107,7 +108,7 @@ impl RpcClient { /// /// #[tokio::main] /// async fn main() { - /// let mut rpc_client = RpcClient::new(Config::default()); + /// let mut rpc_client = RpcClient::new(Config::default()).await; /// rpc_client.start().await.expect("call"); /// } pub async fn start(&mut self) -> GirolleResult<()> { @@ -170,7 +171,7 @@ impl RpcClient { /// /// #[tokio::main] /// async fn main() { - /// let rpc_client = RpcClient::new(Config::default()); + /// let rpc_client = RpcClient::new(Config::default()).await; /// let identifier = rpc_client.get_identifier(); /// } pub fn get_identifier(&self) -> String { @@ -199,7 +200,7 @@ impl RpcClient { /// #[tokio::main] /// async fn main() { /// let conf = Config::with_yaml_defaults("config.yml".to_string()).unwrap(); - /// let mut rpc_client = RpcClient::new(conf); + /// let mut rpc_client = RpcClient::new(conf).await; /// rpc_client.register_service("video").await.expect("call"); /// let method_name = "hello"; /// let consumer = rpc_client.call_async("video", method_name, Payload::new().arg("John Doe")); @@ -281,7 +282,7 @@ impl RpcClient { /// #[tokio::main] /// async fn main() -> Result<(), Box> { /// let conf = Config::with_yaml_defaults("config.yml".to_string()).unwrap(); - /// let mut rpc_client = RpcClient::new(conf); + /// let mut rpc_client = RpcClient::new(conf).await; /// rpc_client.register_service("video").await.expect("call"); /// let method_name = "hello"; /// let rpc_event = rpc_client.call_async("video", method_name, Payload::new().arg("John Doe"))?; @@ -339,7 +340,7 @@ impl RpcClient { /// #[tokio::main] /// async fn main() { /// let conf = Config::with_yaml_defaults("config.yml".to_string()).unwrap(); - /// let mut rpc_client = RpcClient::new(conf); + /// let mut rpc_client = RpcClient::new(conf).await; /// rpc_client.register_service("video").await.expect("call"); /// let method_name = "hello"; /// let result = rpc_client.send("video", method_name, Payload::new().arg("John Doe")).expect("call"); @@ -371,9 +372,11 @@ impl RpcClient { /// ```rust,no_run /// use girolle::prelude::*; /// - /// - /// let rpc_client = RpcClient::new(Config::default()); - /// let conf = rpc_client.get_config(); + /// #[tokio::main] + /// async fn main() { + /// let rpc_client = RpcClient::new(Config::default()).await; + /// let conf = rpc_client.get_config(); + /// } /// ``` pub fn get_config(&self) -> &Config { &self.conf @@ -393,10 +396,12 @@ impl RpcClient { /// ```rust,no_run /// use girolle::prelude::*; /// - /// - /// let mut rpc_client = RpcClient::new(Config::default()); - /// let conf = Config::default(); - /// rpc_client.set_config(conf); + /// #[tokio::main] + /// async fn main() { + /// let mut rpc_client = RpcClient::new(Config::default()).await; + /// let conf = Config::default(); + /// rpc_client.set_config(conf); + /// } /// ``` pub fn set_config(&mut self, config: Config) -> std::result::Result<(), std::string::String> { self.conf = config; @@ -423,7 +428,7 @@ impl RpcClient { /// /// #[tokio::main] /// async fn main() { - /// let mut rpc_client = RpcClient::new(Config::default()); + /// let mut rpc_client = RpcClient::new(Config::default()).await; /// rpc_client.register_service("video").await.expect("call"); /// } pub async fn register_service(&mut self, service_name: &str) -> Result<(), lapin::Error> { @@ -459,13 +464,13 @@ impl RpcClient { /// /// #[tokio::main] /// async fn main() { - /// let mut rpc_client = RpcClient::new(Config::default()); + /// let mut rpc_client = RpcClient::new(Config::default()).await; /// rpc_client.register_service("video").await.expect("call"); - /// rpc_client.unregister_service("video").expect("call"); + /// rpc_client.unregister_service("video").await.expect("call"); /// } - pub fn unregister_service(&mut self, service_name: &str) -> Result<(), lapin::Error> { + pub async fn unregister_service(&mut self, service_name: &str) -> Result<(), lapin::Error> { let target_service = self.services.get(service_name).unwrap(); - target_service.close()?; + target_service.close().await?; self.services.remove(service_name); Ok(()) } @@ -482,7 +487,7 @@ impl RpcClient { /// /// #[tokio::main] /// async fn main() { - /// let rpc_client = RpcClient::new(Config::default()); + /// let rpc_client = RpcClient::new(Config::default()).await; /// rpc_client.close().await.expect("close"); /// } pub async fn close(&self) -> Result<(), lapin::Error> { @@ -506,7 +511,7 @@ impl RpcClient { /// /// #[tokio::main] /// async fn main() { -/// let mut rpc_client = RpcClient::new(Config::default()); +/// let mut rpc_client = RpcClient::new(Config::default()).await; /// rpc_client.register_service("video").await.expect("call"); /// } struct TargetService { @@ -517,8 +522,8 @@ impl TargetService { Self { channel } } #[allow(dead_code)] - fn close(&self) -> Result<(), lapin::Error> { - executor::block_on(self.channel.close(200, "Goodbye"))?; + async fn close(&self) -> Result<(), lapin::Error> { + self.channel.close(200, "Goodbye").await?; Ok(()) } #[allow(dead_code)]