actix_actor_pool/
lib.rs

1use std::sync::{
2    Arc,
3    atomic::{AtomicUsize, Ordering},
4};
5
6use actix::{Addr, Context, Handler, MailboxError, Message, Supervised};
7
8pub struct Pool<A: actix::Actor> {
9    pub(crate) workers: Vec<Addr<A>>,
10    pub(crate) current: Arc<AtomicUsize>,
11}
12
13impl<A> Pool<A>
14where
15    A: actix::Actor<Context = Context<A>> + Supervised,
16{
17    pub fn new<F: 'static + Clone + Fn() -> A>(size: usize, init_fn: F) -> Self {
18        Self {
19            workers: (0..size)
20                .map(|_| {
21                    let init_fn = init_fn.clone();
22                    actix::Supervisor::start(move |_| init_fn())
23                })
24                .collect(),
25            current: Arc::new(AtomicUsize::new(0)),
26        }
27    }
28
29    fn next_worker(&self) -> Addr<A> {
30        let current = self.current.fetch_add(1, Ordering::SeqCst);
31        let index = current % self.workers.len();
32        self.workers[index].clone()
33    }
34
35    pub fn do_send<M>(&self, msg: M)
36    where
37        A: Handler<M>,
38        M: Message + Send + 'static,
39        M::Result: Send,
40    {
41        let actor = self.next_worker();
42        actor.do_send(msg);
43    }
44
45    pub async fn send<M>(&self, msg: M) -> Result<M::Result, MailboxError>
46    where
47        A: Handler<M>,
48        M: Message + Send + 'static,
49        M::Result: Send,
50    {
51        let actor = self.next_worker();
52        actor.send(msg).await
53    }
54}
55
#[cfg(test)]
mod tests {
    use actix::{Actor, ActorContext, Message, Supervised};

    use crate::Pool;

    /// Message answered with a formatted string containing the actor name and payload.
    #[derive(Debug, Message)]
    #[rtype(result = "String")]
    struct TestMessage(usize);

    /// Message whose handler stops the receiving actor's context.
    #[derive(Debug, Message)]
    #[rtype(result = "()")]
    struct FailMessage;

    /// Worker actor used by the test; identified by a UUID v4 string.
    struct TestActor {
        pub name: String,
    }

    impl Default for TestActor {
        fn default() -> Self {
            let name = uuid::Uuid::new_v4().to_string();
            Self { name }
        }
    }

    impl Actor for TestActor {
        type Context = actix::Context<Self>;
    }

    impl Supervised for TestActor {
        fn restarting(&mut self, _ctx: &mut <Self as Actor>::Context) {
            println!("Restarting actor: {}", self.name);
        }
    }

    impl actix::Handler<FailMessage> for TestActor {
        type Result = ();

        fn handle(&mut self, _msg: FailMessage, ctx: &mut Self::Context) -> Self::Result {
            ctx.stop();
        }
    }

    impl actix::Handler<TestMessage> for TestActor {
        type Result = String;

        fn handle(&mut self, msg: TestMessage, _ctx: &mut Self::Context) -> Self::Result {
            format!("Handle test message in test actor: {}-{}", self.name, msg.0)
        }
    }

    /// Alternates `FailMessage` (even rounds) and `TestMessage` (odd rounds)
    /// across a 15-worker pool and checks that every `TestMessage` is answered.
    #[test]
    fn test_pool() {
        let sys = actix::System::new();

        sys.block_on(async {
            let pool = Pool::new(15, TestActor::default);

            for round in 0..250 {
                if round % 2 == 0 {
                    // The reply to a stop request is deliberately ignored.
                    let _ = pool.send(FailMessage).await;
                    continue;
                }

                let s = pool.send(TestMessage(round)).await;
                println!("{s:?}");
                assert!(s.is_ok());
            }

            actix::System::current().stop();
        });
    }
}
124}