threading/channel.rs
1pub struct Channel<R, W> {
2 receiver: std::sync::mpsc::Receiver<R>,
3 sender: std::sync::mpsc::Sender<W>,
4}
5
6impl<R, W> Channel<R, W> {
7 // i don't rly like how Read and Write have no meaning here (as you have a Sender<R> and a Receiver<W>)
8 // But having the function outside is bad too
9 /// Creates a new pair of inter-connected channels
10 pub fn new_pair() -> (Channel<W, R>, Channel<R, W>) {
11 let (sender1, receiver1) = std::sync::mpsc::channel::<R>();
12 let (sender2, receiver2) = std::sync::mpsc::channel::<W>();
13
14 let c1 = Channel {
15 sender: sender1,
16 receiver: receiver2,
17 };
18 let c2 = Channel {
19 sender: sender2,
20 receiver: receiver1,
21 };
22 (c1, c2)
23 }
24}
25
26impl<R: std::cmp::PartialEq, W: std::cmp::PartialEq> Channel<R, W> {
27 /// Waits for a specific message of type R to be received.
28 ///
29 /// # Parameters
30 /// - `waited_message`: The message to wait for.
31 ///
32 /// # Returns
33 /// - `Ok(())` if the waited message is received.
34 /// - An error of type `std::sync::mpsc::RecvError` if the receiving fails.
35 pub fn wait_for(&self, waited_message: R) -> Result<(), std::sync::mpsc::RecvError> {
36 loop {
37 let message = self.receiver.recv()?;
38 if message == waited_message {
39 break;
40 }
41 }
42 Ok(())
43 }
44
45 /// Waits for a specific message of type R to be received or times out after a specified duration.
46 ///
47 /// # Parameters
48 /// - `waited_message`: The message to wait for.
49 /// - `timeout`: The duration to wait before timing out.
50 ///
51 /// # Returns
52 /// - `Ok(())` if the waited message is received before the timeout.
53 /// - An error of type `std::sync::mpsc::RecvTimeoutError::Timeout` if the timeout occurs.
54 /// - An error of type `std::sync::mpsc::RecvTimeoutError` if the receiving fails.
55 pub fn wait_for_or_timeout(
56 &self,
57 waited_message: R,
58 timeout: std::time::Duration,
59 ) -> Result<(), std::sync::mpsc::RecvTimeoutError> {
60 let start_time = std::time::Instant::now();
61
62 let internal_timeout = timeout / 100;
63 while start_time.elapsed() < timeout {
64 // we map the internal_timeout to be very small to be able to quit as soon as the timeout is done
65 // + having a dynamic internal_timeout is adding to the consistency
66 match self.recv_timeout(internal_timeout) {
67 Ok(message) => {
68 if message == waited_message {
69 return Ok(());
70 }
71 }
72 Err(err) => return Err(err),
73 }
74 }
75 Err(std::sync::mpsc::RecvTimeoutError::Timeout)
76 }
77
78 /// Short hand for [std::sync::mpsc::Sender::send]
79 pub fn send(&self, t: W) -> Result<(), std::sync::mpsc::SendError<W>> {
80 self.sender.send(t)
81 }
82
83 /// Short hand for [std::sync::mpsc::Receiver::iter]
84 pub fn iter(&self) -> std::sync::mpsc::Iter<'_, R> {
85 self.receiver.iter()
86 }
87
88 /// Short hand for [std::sync::mpsc::Receiver::try_iter]
89 pub fn try_iter(&self) -> std::sync::mpsc::TryIter<'_, R> {
90 self.receiver.try_iter()
91 }
92
93 /// Short hand for [std::sync::mpsc::Receiver::recv]
94 pub fn recv(&self) -> Result<R, std::sync::mpsc::RecvError> {
95 self.receiver.recv()
96 }
97
98 /// Short hand for [std::sync::mpsc::Receiver::try_recv]
99 pub fn try_recv(&self) -> Result<R, std::sync::mpsc::TryRecvError> {
100 self.receiver.try_recv()
101 }
102
103 /// Short hand for [std::sync::mpsc::Receiver::recv_timeout]
104 pub fn recv_timeout(
105 &self,
106 timeout: std::time::Duration,
107 ) -> Result<R, std::sync::mpsc::RecvTimeoutError> {
108 self.receiver.recv_timeout(timeout)
109 }
110}