-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathbasic.rs
More file actions
154 lines (131 loc) · 5.39 KB
/
basic.rs
File metadata and controls
154 lines (131 loc) · 5.39 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
// Copyright 2022 Jeff Kim <hiking90@gmail.com>
// SPDX-License-Identifier: Apache-2.0
use anyhow::Result;
use rsactor::{message_handlers, Actor, ActorRef, ActorWeak};
use tokio::time::{interval, Duration};
use tracing::info;
// Message types
struct Increment; // Message to increment the actor's counter
struct Decrement; // Message to decrement the actor's counter
// Define the actor struct
struct MyActor {
count: u32, // Internal state of the actor
start_up: std::time::Instant, // Optional field to track the start time
tick_300ms: tokio::time::Interval, // Interval for 300ms ticks
tick_1s: tokio::time::Interval, // Interval for 1s ticks
}
// Implement the Actor trait for MyActor
impl Actor for MyActor {
type Args = Self;
type Error = anyhow::Error; // Define the error type for actor operations
// Called when the actor is started
async fn on_start(args: Self::Args, _actor_ref: &ActorRef<Self>) -> Result<Self, Self::Error> {
info!("MyActor started. Initial count: {}.", args.count);
Ok(args)
}
async fn on_run(&mut self, _actor_ref: &ActorWeak<Self>) -> Result<bool, Self::Error> {
// Use the tokio::select! macro to handle the first completed asynchronous operation among several.
tokio::select! {
// Executes when the 300ms interval timer ticks.
_ = self.tick_300ms.tick() => {
println!("300ms tick. Elapsed: {:?}",
self.start_up.elapsed()); // Print the current count
}
// Executes when the 1s interval timer ticks. (Currently no specific action)
_ = self.tick_1s.tick() => {
println!("1s tick. Elapsed: {:?} ",
self.start_up.elapsed()); // Print the current count
}
}
Ok(true) // Continue calling on_run
}
}
// A dummy message type for demonstration
struct DummyMessage;
// Message handling using the #[message_handlers] macro with #[handler] attributes
#[message_handlers]
impl MyActor {
#[handler]
async fn handle_increment(&mut self, _msg: Increment, _: &ActorRef<Self>) -> u32 {
self.count += 1;
println!("MyActor handled Increment. Count is now {}.", self.count);
self.count
}
#[handler]
async fn handle_decrement(&mut self, _msg: Decrement, _: &ActorRef<Self>) -> u32 {
self.count -= 1;
println!("MyActor handled Decrement. Count is now {}.", self.count);
self.count
}
#[handler]
async fn handle_dummy_message(&mut self, _msg: DummyMessage, _: &ActorRef<Self>) -> u32 {
println!("MyActor handled DummyMessage. Count is now {}.", self.count);
self.count
}
}
#[tokio::main]
async fn main() -> Result<()> {
tracing_subscriber::fmt()
.with_max_level(tracing::Level::DEBUG)
.with_target(false)
.init();
println!("Spawning MyActor...");
let my_actor = MyActor {
count: 100,
start_up: std::time::Instant::now(),
tick_300ms: interval(Duration::from_millis(300)),
tick_1s: interval(Duration::from_secs(1)),
};
// Spawn the actor. This returns an ActorRef for sending messages
// and a JoinHandle to await the actor's completion.
let (actor_ref, join_handle) = rsactor::spawn::<MyActor>(my_actor); // MODIFIED: use system.spawn and await
tokio::time::sleep(Duration::from_millis(700)).await;
println!("Sending Increment message...");
// Send an Increment message and await the reply using `ask`.
let count_after_inc: u32 = actor_ref.ask(Increment).await?;
println!("Reply after Increment: {count_after_inc}");
println!("Sending Decrement message...");
// Send a Decrement message and await the reply.
let count_after_dec: u32 = actor_ref.ask(Decrement).await?;
println!("Reply after Decrement: {count_after_dec}");
println!("Sending Increment message again...");
// Send another Increment message.
let count_after_inc_2: u32 = actor_ref.ask(Increment).await?;
println!("Reply after Increment again: {count_after_inc_2}");
tokio::time::sleep(Duration::from_millis(700)).await;
// Signal the actor to stop gracefully.
// The actor will process any remaining messages in its mailbox before stopping.
println!("Sending StopGracefully message to actor.",);
actor_ref.stop().await?; // Corrected method name
// Wait for the actor's task to complete.
// `join_handle.await` returns a Result containing a tuple:
// - The actor instance (allowing access to its final state).
// - The ActorResult indicating completion state and returned actor.
println!("Waiting for actor to stop...");
let result = join_handle.await?;
// Successfully retrieved the actor result.
match result {
rsactor::ActorResult::Completed { actor, killed } => {
println!(
"Actor stopped. Final count: {}. Killed: {}",
actor.count, killed
);
}
rsactor::ActorResult::Failed {
actor,
error,
phase,
killed,
} => {
println!(
"Actor stop failed: {}. Phase: {}, Killed: {}. Final count: {}",
error,
phase,
killed,
actor.as_ref().map(|a| a.count).unwrap_or(0)
);
}
}
println!("Main function finished.");
Ok(())
}