1use std::sync::{
2 Arc,
3 atomic::{AtomicUsize, Ordering},
4};
5
6use actix::{Addr, Context, Handler, MailboxError, Message, Supervised};
7
8pub struct Pool<A: actix::Actor> {
9 pub(crate) workers: Vec<Addr<A>>,
10 pub(crate) current: Arc<AtomicUsize>,
11}
12
13impl<A> Pool<A>
14where
15 A: actix::Actor<Context = Context<A>> + Supervised,
16{
17 pub fn new<F: 'static + Clone + Fn() -> A>(size: usize, init_fn: F) -> Self {
18 Self {
19 workers: (0..size)
20 .map(|_| {
21 let init_fn = init_fn.clone();
22 actix::Supervisor::start(move |_| init_fn())
23 })
24 .collect(),
25 current: Arc::new(AtomicUsize::new(0)),
26 }
27 }
28
29 fn next_worker(&self) -> Addr<A> {
30 let current = self.current.fetch_add(1, Ordering::SeqCst);
31 let index = current % self.workers.len();
32 self.workers[index].clone()
33 }
34
35 pub fn do_send<M>(&self, msg: M)
36 where
37 A: Handler<M>,
38 M: Message + Send + 'static,
39 M::Result: Send,
40 {
41 let actor = self.next_worker();
42 actor.do_send(msg);
43 }
44
45 pub async fn send<M>(&self, msg: M) -> Result<M::Result, MailboxError>
46 where
47 A: Handler<M>,
48 M: Message + Send + 'static,
49 M::Result: Send,
50 {
51 let actor = self.next_worker();
52 actor.send(msg).await
53 }
54}
55
56#[cfg(test)]
57mod tests {
58 use actix::{Actor, ActorContext, Message, Supervised};
59
60 use crate::Pool;
61
62 struct TestActor {
63 pub name: String,
64 }
65
66 impl Actor for TestActor {
67 type Context = actix::Context<Self>;
68 }
69 impl Default for TestActor {
70 fn default() -> Self {
71 TestActor {
72 name: uuid::Uuid::new_v4().to_string(),
73 }
74 }
75 }
76 #[derive(Debug, Message)]
77 #[rtype(result = "String")]
78 struct TestMessage(usize);
79
80 #[derive(Debug, Message)]
81 #[rtype(result = "()")]
82 struct FailMessage;
83
84 impl actix::Handler<TestMessage> for TestActor {
85 type Result = String;
86 fn handle(&mut self, msg: TestMessage, _ctx: &mut Self::Context) -> Self::Result {
87 format!("Handle test message in test actor: {}-{}", self.name, msg.0)
88 }
89 }
90
91 impl actix::Handler<FailMessage> for TestActor {
92 type Result = ();
93 fn handle(&mut self, _msg: FailMessage, ctx: &mut Self::Context) -> Self::Result {
94 ctx.stop();
95 }
96 }
97
98 impl Supervised for TestActor {
99 fn restarting(&mut self, _ctx: &mut <Self as Actor>::Context) {
100 println!("Restarting actor: {}", self.name);
101 }
102 }
103
104 #[test]
105 fn test_pool() {
106 let sys = actix::System::new();
107
108 sys.block_on(async {
109 let pool = Pool::new(15, || TestActor::default());
110
111 for i in 0..250 {
112 if i % 2 != 0 {
113 let s = pool.send(TestMessage(i)).await;
114 println!("{s:?}");
115 assert!(s.is_ok())
116 } else {
117 let _ = pool.send(FailMessage).await;
118 }
119 }
120
121 actix::System::current().stop();
122 });
123 }
124}