threading/
channel.rs

1pub struct Channel<R, W> {
2    receiver: std::sync::mpsc::Receiver<R>,
3    sender: std::sync::mpsc::Sender<W>,
4}
5
6impl<R, W> Channel<R, W> {
7    // i don't rly like how Read and Write have no meaning here (as you have a Sender<R> and a Receiver<W>)
8    // But having the function outside is bad too
9    /// Creates a new pair of inter-connected channels
10    pub fn new_pair() -> (Channel<W, R>, Channel<R, W>) {
11        let (sender1, receiver1) = std::sync::mpsc::channel::<R>();
12        let (sender2, receiver2) = std::sync::mpsc::channel::<W>();
13
14        let c1 = Channel {
15            sender: sender1,
16            receiver: receiver2,
17        };
18        let c2 = Channel {
19            sender: sender2,
20            receiver: receiver1,
21        };
22        (c1, c2)
23    }
24}
25
26impl<R: std::cmp::PartialEq, W: std::cmp::PartialEq> Channel<R, W> {
27    /// Waits for a specific message of type R to be received.
28    ///
29    /// # Parameters
30    /// - `waited_message`: The message to wait for.
31    ///
32    /// # Returns
33    /// - `Ok(())` if the waited message is received.
34    /// - An error of type `std::sync::mpsc::RecvError` if the receiving fails.
35    pub fn wait_for(&self, waited_message: R) -> Result<(), std::sync::mpsc::RecvError> {
36        loop {
37            let message = self.receiver.recv()?;
38            if message == waited_message {
39                break;
40            }
41        }
42        Ok(())
43    }
44
45    /// Waits for a specific message of type R to be received or times out after a specified duration.
46    ///
47    /// # Parameters
48    /// - `waited_message`: The message to wait for.
49    /// - `timeout`: The duration to wait before timing out.
50    ///
51    /// # Returns
52    /// - `Ok(())` if the waited message is received before the timeout.
53    /// - An error of type `std::sync::mpsc::RecvTimeoutError::Timeout` if the timeout occurs.
54    /// - An error of type `std::sync::mpsc::RecvTimeoutError` if the receiving fails.
55    pub fn wait_for_or_timeout(
56        &self,
57        waited_message: R,
58        timeout: std::time::Duration,
59    ) -> Result<(), std::sync::mpsc::RecvTimeoutError> {
60        let start_time = std::time::Instant::now();
61
62        let internal_timeout = timeout / 100;
63        while start_time.elapsed() < timeout {
64            // we map the internal_timeout to be very small to be able to quit as soon as the timeout is done
65            // + having a dynamic internal_timeout is adding to the consistency
66            match self.recv_timeout(internal_timeout) {
67                Ok(message) => {
68                    if message == waited_message {
69                        return Ok(());
70                    }
71                }
72                Err(err) => return Err(err),
73            }
74        }
75        Err(std::sync::mpsc::RecvTimeoutError::Timeout)
76    }
77
78    /// Short hand for [std::sync::mpsc::Sender::send]
79    pub fn send(&self, t: W) -> Result<(), std::sync::mpsc::SendError<W>> {
80        self.sender.send(t)
81    }
82
83    /// Short hand for [std::sync::mpsc::Receiver::iter]
84    pub fn iter(&self) -> std::sync::mpsc::Iter<'_, R> {
85        self.receiver.iter()
86    }
87
88    /// Short hand for [std::sync::mpsc::Receiver::try_iter]
89    pub fn try_iter(&self) -> std::sync::mpsc::TryIter<'_, R> {
90        self.receiver.try_iter()
91    }
92
93    /// Short hand for [std::sync::mpsc::Receiver::recv]
94    pub fn recv(&self) -> Result<R, std::sync::mpsc::RecvError> {
95        self.receiver.recv()
96    }
97
98    /// Short hand for [std::sync::mpsc::Receiver::try_recv]
99    pub fn try_recv(&self) -> Result<R, std::sync::mpsc::TryRecvError> {
100        self.receiver.try_recv()
101    }
102
103    /// Short hand for [std::sync::mpsc::Receiver::recv_timeout]
104    pub fn recv_timeout(
105        &self,
106        timeout: std::time::Duration,
107    ) -> Result<R, std::sync::mpsc::RecvTimeoutError> {
108        self.receiver.recv_timeout(timeout)
109    }
110}