Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions examples/src/async_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let conf: Config = Config::with_yaml_defaults("staging/config.yml".to_string())?;
let service_name = "video";
// Create the rpc call struct
let mut rpc_client = RpcClient::new(conf);
let mut rpc_client = RpcClient::new(conf).await;
rpc_client.register_service(service_name).await?;
rpc_client.start().await?;
let tempo: time::Duration = time::Duration::from_secs(2);
Expand All @@ -30,7 +30,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
}
let duration = start.elapsed() - tempo;
println!("Time elapsed in expensive_function() is: {:?}", duration);
rpc_client.unregister_service("video")?;
rpc_client.unregister_service("video").await?;
rpc_client.close().await?;
Ok(())
}
4 changes: 2 additions & 2 deletions examples/src/error_sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let conf: Config = Config::with_yaml_defaults("staging/config.yml".to_string())?;
let service_name = "video";
// Create the rpc call struct
let mut rpc_client = RpcClient::new(conf);
let mut rpc_client = RpcClient::new(conf).await;
rpc_client.register_service(service_name).await?;
rpc_client.start().await?;
// Send the payload
Expand Down Expand Up @@ -69,7 +69,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
}
let duration = start.elapsed() - tempo;
println!("Time elapsed in expensive_function() is: {:?}", duration);
rpc_client.unregister_service("video")?;
rpc_client.unregister_service("video").await?;
rpc_client.close().await?;
Ok(())
}
2 changes: 1 addition & 1 deletion examples/src/simple_fib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let conf: Config = Config::with_yaml_defaults("staging/config.yml".to_string())?;
let service_name = "video";
// Create the rpc call struct
let mut rpc_client = RpcClient::new(conf);
let mut rpc_client = RpcClient::new(conf).await;
rpc_client.register_service(service_name).await?;
rpc_client.start().await?;
let file = File::open("examples/data_set.json")?;
Expand Down
4 changes: 2 additions & 2 deletions examples/src/simple_sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let conf: Config = Config::with_yaml_defaults("staging/config.yml".to_string())?;
let service_name = "video";
// Create the rpc call struct
let mut rpc_client = RpcClient::new(conf);
let mut rpc_client = RpcClient::new(conf).await;
rpc_client.register_service(service_name).await?;
rpc_client.start().await?;
// Send the payload
Expand Down Expand Up @@ -63,7 +63,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
}
let duration = start.elapsed() - tempo;
println!("Time elapsed in expensive_function() is: {:?}", duration);
rpc_client.unregister_service("video")?;
rpc_client.unregister_service("video").await?;
rpc_client.close().await?;
Ok(())
}
2 changes: 1 addition & 1 deletion girolle/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ bench = false

[dependencies]
lapin = "2.5.3"
futures = "0.3.31"

girolle_macro = { path = "../girolle_macro", version = "1.8" }
tokio-executor-trait = "2.1.3"
tokio-reactor-trait = "1.1.0"
Expand Down
4 changes: 2 additions & 2 deletions girolle/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@
//!
//! #[tokio::main]
//! async fn main() {
//! let mut rpc_client = RpcClient::new(Config::default());
//! rpc_client.register_service("video");
//! let mut rpc_client = RpcClient::new(Config::default()).await;
//! rpc_client.register_service("video").await;
//!
//! let result = rpc_client.send("video", "hello", Payload::new().arg("Girolle")).unwrap();
//! }
Expand Down
61 changes: 33 additions & 28 deletions girolle/src/rpc_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::error::GirolleError;
use crate::payload::{Payload, PayloadResult};
use crate::queue::{create_message_channel, create_service_channel, get_connection};
use crate::types::GirolleResult;
use futures::executor;

use lapin::{
message::DeliveryResult,
options::*,
Expand Down Expand Up @@ -35,7 +35,7 @@ use uuid::Uuid;
///
/// #[tokio::main]
/// async fn main() {
/// let rpc_client = RpcClient::new(Config::default());
/// let rpc_client = RpcClient::new(Config::default()).await;
/// }
pub struct RpcClient {
conf: Config,
Expand Down Expand Up @@ -68,20 +68,21 @@ impl RpcClient {
///
/// #[tokio::main]
/// async fn main() {
/// let target_service = RpcClient::new(Config::default());
/// let target_service = RpcClient::new(Config::default()).await;
/// }
pub fn new(conf: Config) -> Self {
pub async fn new(conf: Config) -> Self {
let identifier = Uuid::new_v4();
let conn = executor::block_on(get_connection(conf.AMQP_URI(), conf.heartbeat()))
let conn = get_connection(conf.AMQP_URI(), conf.heartbeat()).await
.expect("Can't init connection");
let reply_queue_name = format!("rpc.listener-{}", identifier);
let reply_channel = executor::block_on(create_message_channel(
let reply_channel = create_message_channel(
&conn,
&reply_queue_name,
conf.prefetch_count(),
&identifier,
conf.rpc_exchange(),
))
)
.await
.expect("Can't create reply channel");
Self {
conf,
Expand All @@ -107,7 +108,7 @@ impl RpcClient {
///
/// #[tokio::main]
/// async fn main() {
/// let mut rpc_client = RpcClient::new(Config::default());
/// let mut rpc_client = RpcClient::new(Config::default()).await;
/// rpc_client.start().await.expect("call");
/// }
pub async fn start(&mut self) -> GirolleResult<()> {
Expand Down Expand Up @@ -170,7 +171,7 @@ impl RpcClient {
///
/// #[tokio::main]
/// async fn main() {
/// let rpc_client = RpcClient::new(Config::default());
/// let rpc_client = RpcClient::new(Config::default()).await;
/// let identifier = rpc_client.get_identifier();
/// }
pub fn get_identifier(&self) -> String {
Expand Down Expand Up @@ -199,7 +200,7 @@ impl RpcClient {
/// #[tokio::main]
/// async fn main() {
/// let conf = Config::with_yaml_defaults("config.yml".to_string()).unwrap();
/// let mut rpc_client = RpcClient::new(conf);
/// let mut rpc_client = RpcClient::new(conf).await;
/// rpc_client.register_service("video").await.expect("call");
/// let method_name = "hello";
/// let consumer = rpc_client.call_async("video", method_name, Payload::new().arg("John Doe"));
Expand Down Expand Up @@ -281,7 +282,7 @@ impl RpcClient {
/// #[tokio::main]
/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
/// let conf = Config::with_yaml_defaults("config.yml".to_string()).unwrap();
/// let mut rpc_client = RpcClient::new(conf);
/// let mut rpc_client = RpcClient::new(conf).await;
/// rpc_client.register_service("video").await.expect("call");
/// let method_name = "hello";
/// let rpc_event = rpc_client.call_async("video", method_name, Payload::new().arg("John Doe"))?;
Expand Down Expand Up @@ -339,7 +340,7 @@ impl RpcClient {
/// #[tokio::main]
/// async fn main() {
/// let conf = Config::with_yaml_defaults("config.yml".to_string()).unwrap();
/// let mut rpc_client = RpcClient::new(conf);
/// let mut rpc_client = RpcClient::new(conf).await;
/// rpc_client.register_service("video").await.expect("call");
/// let method_name = "hello";
/// let result = rpc_client.send("video", method_name, Payload::new().arg("John Doe")).expect("call");
Expand Down Expand Up @@ -371,9 +372,11 @@ impl RpcClient {
/// ```rust,no_run
/// use girolle::prelude::*;
///
///
/// let rpc_client = RpcClient::new(Config::default());
/// let conf = rpc_client.get_config();
/// #[tokio::main]
/// async fn main() {
/// let rpc_client = RpcClient::new(Config::default()).await;
/// let conf = rpc_client.get_config();
/// }
/// ```
pub fn get_config(&self) -> &Config {
&self.conf
Expand All @@ -393,10 +396,12 @@ impl RpcClient {
/// ```rust,no_run
/// use girolle::prelude::*;
///
///
/// let mut rpc_client = RpcClient::new(Config::default());
/// let conf = Config::default();
/// rpc_client.set_config(conf);
/// #[tokio::main]
/// async fn main() {
/// let mut rpc_client = RpcClient::new(Config::default()).await;
/// let conf = Config::default();
/// rpc_client.set_config(conf);
/// }
/// ```
pub fn set_config(&mut self, config: Config) -> std::result::Result<(), std::string::String> {
self.conf = config;
Expand All @@ -423,7 +428,7 @@ impl RpcClient {
///
/// #[tokio::main]
/// async fn main() {
/// let mut rpc_client = RpcClient::new(Config::default());
/// let mut rpc_client = RpcClient::new(Config::default()).await;
/// rpc_client.register_service("video").await.expect("call");
/// }
pub async fn register_service(&mut self, service_name: &str) -> Result<(), lapin::Error> {
Expand Down Expand Up @@ -459,13 +464,13 @@ impl RpcClient {
///
/// #[tokio::main]
/// async fn main() {
/// let mut rpc_client = RpcClient::new(Config::default());
/// let mut rpc_client = RpcClient::new(Config::default()).await;
/// rpc_client.register_service("video").await.expect("call");
/// rpc_client.unregister_service("video").expect("call");
/// rpc_client.unregister_service("video").await.expect("call");
/// }
pub fn unregister_service(&mut self, service_name: &str) -> Result<(), lapin::Error> {
pub async fn unregister_service(&mut self, service_name: &str) -> Result<(), lapin::Error> {
let target_service = self.services.get(service_name).unwrap();
target_service.close()?;
target_service.close().await?;
self.services.remove(service_name);
Ok(())
}
Expand All @@ -482,7 +487,7 @@ impl RpcClient {
///
/// #[tokio::main]
/// async fn main() {
/// let rpc_client = RpcClient::new(Config::default());
/// let rpc_client = RpcClient::new(Config::default()).await;
/// rpc_client.close().await.expect("close");
/// }
pub async fn close(&self) -> Result<(), lapin::Error> {
Expand All @@ -506,7 +511,7 @@ impl RpcClient {
///
/// #[tokio::main]
/// async fn main() {
/// let mut rpc_client = RpcClient::new(Config::default());
/// let mut rpc_client = RpcClient::new(Config::default()).await;
/// rpc_client.register_service("video").await.expect("call");
/// }
struct TargetService {
Expand All @@ -517,8 +522,8 @@ impl TargetService {
Self { channel }
}
#[allow(dead_code)]
fn close(&self) -> Result<(), lapin::Error> {
executor::block_on(self.channel.close(200, "Goodbye"))?;
async fn close(&self) -> Result<(), lapin::Error> {
self.channel.close(200, "Goodbye").await?;
Ok(())
}
#[allow(dead_code)]
Expand Down
Loading