async_channel/
lib.rs

1//! An async multi-producer multi-consumer channel, where each message can be received by only
2//! one of all existing consumers.
3//!
4//! There are two kinds of channels:
5//!
6//! 1. [Bounded][`bounded()`] channel with limited capacity.
7//! 2. [Unbounded][`unbounded()`] channel with unlimited capacity.
8//!
9//! A channel has the [`Sender`] and [`Receiver`] side. Both sides are cloneable and can be shared
10//! among multiple threads.
11//!
12//! When all [`Sender`]s or all [`Receiver`]s are dropped, the channel becomes closed. When a
13//! channel is closed, no more messages can be sent, but remaining messages can still be received.
14//!
15//! The channel can also be closed manually by calling [`Sender::close()`] or
16//! [`Receiver::close()`].
17//!
18//! # Examples
19//!
20//! ```
21//! # futures_lite::future::block_on(async {
22//! let (s, r) = async_channel::unbounded();
23//!
24//! assert_eq!(s.send("Hello").await, Ok(()));
25//! assert_eq!(r.recv().await, Ok("Hello"));
26//! # });
27//! ```
28
29#![forbid(unsafe_code)]
30#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)]
31
32use std::error;
33use std::fmt;
34use std::future::Future;
35use std::pin::Pin;
36use std::process;
37use std::sync::atomic::{AtomicUsize, Ordering};
38use std::sync::Arc;
39use std::task::{Context, Poll};
40use std::usize;
41
42use concurrent_queue::{ConcurrentQueue, PopError, PushError};
43use event_listener::{Event, EventListener};
44use futures_core::stream::Stream;
45
46struct Channel<T> {
47    /// Inner message queue.
48    queue: ConcurrentQueue<T>,
49
50    /// Send operations waiting while the channel is full.
51    send_ops: Event,
52
53    /// Receive operations waiting while the channel is empty and not closed.
54    recv_ops: Event,
55
56    /// Stream operations while the channel is empty and not closed.
57    stream_ops: Event,
58
59    /// The number of currently active `Sender`s.
60    sender_count: AtomicUsize,
61
62    /// The number of currently active `Receivers`s.
63    receiver_count: AtomicUsize,
64}
65
66impl<T> Channel<T> {
67    /// Closes the channel and notifies all blocked operations.
68    ///
69    /// Returns `true` if this call has closed the channel and it was not closed already.
70    fn close(&self) -> bool {
71        if self.queue.close() {
72            // Notify all send operations.
73            self.send_ops.notify(usize::MAX);
74
75            // Notify all receive and stream operations.
76            self.recv_ops.notify(usize::MAX);
77            self.stream_ops.notify(usize::MAX);
78
79            true
80        } else {
81            false
82        }
83    }
84}
85
86/// Creates a bounded channel.
87///
88/// The created channel has space to hold at most `cap` messages at a time.
89///
90/// # Panics
91///
92/// Capacity must be a positive number. If `cap` is zero, this function will panic.
93///
94/// # Examples
95///
96/// ```
97/// # futures_lite::future::block_on(async {
98/// use async_channel::{bounded, TryRecvError, TrySendError};
99///
100/// let (s, r) = bounded(1);
101///
102/// assert_eq!(s.send(10).await, Ok(()));
103/// assert_eq!(s.try_send(20), Err(TrySendError::Full(20)));
104///
105/// assert_eq!(r.recv().await, Ok(10));
106/// assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
107/// # });
108/// ```
109pub fn bounded<T>(cap: usize) -> (Sender<T>, Receiver<T>) {
110    assert!(cap > 0, "capacity cannot be zero");
111
112    let channel = Arc::new(Channel {
113        queue: ConcurrentQueue::bounded(cap),
114        send_ops: Event::new(),
115        recv_ops: Event::new(),
116        stream_ops: Event::new(),
117        sender_count: AtomicUsize::new(1),
118        receiver_count: AtomicUsize::new(1),
119    });
120
121    let s = Sender {
122        channel: channel.clone(),
123    };
124    let r = Receiver {
125        channel,
126        listener: None,
127    };
128    (s, r)
129}
130
131/// Creates an unbounded channel.
132///
133/// The created channel can hold an unlimited number of messages.
134///
135/// # Examples
136///
137/// ```
138/// # futures_lite::future::block_on(async {
139/// use async_channel::{unbounded, TryRecvError};
140///
141/// let (s, r) = unbounded();
142///
143/// assert_eq!(s.send(10).await, Ok(()));
144/// assert_eq!(s.send(20).await, Ok(()));
145///
146/// assert_eq!(r.recv().await, Ok(10));
147/// assert_eq!(r.recv().await, Ok(20));
148/// assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
149/// # });
150/// ```
151pub fn unbounded<T>() -> (Sender<T>, Receiver<T>) {
152    let channel = Arc::new(Channel {
153        queue: ConcurrentQueue::unbounded(),
154        send_ops: Event::new(),
155        recv_ops: Event::new(),
156        stream_ops: Event::new(),
157        sender_count: AtomicUsize::new(1),
158        receiver_count: AtomicUsize::new(1),
159    });
160
161    let s = Sender {
162        channel: channel.clone(),
163    };
164    let r = Receiver {
165        channel,
166        listener: None,
167    };
168    (s, r)
169}
170
171/// The sending side of a channel.
172///
173/// Senders can be cloned and shared among threads. When all senders associated with a channel are
174/// dropped, the channel becomes closed.
175///
176/// The channel can also be closed manually by calling [`Sender::close()`].
177pub struct Sender<T> {
178    /// Inner channel state.
179    channel: Arc<Channel<T>>,
180}
181
182impl<T> Sender<T> {
183    /// Attempts to send a message into the channel.
184    ///
185    /// If the channel is full or closed, this method returns an error.
186    ///
187    /// # Examples
188    ///
189    /// ```
190    /// use async_channel::{bounded, TrySendError};
191    ///
192    /// let (s, r) = bounded(1);
193    ///
194    /// assert_eq!(s.try_send(1), Ok(()));
195    /// assert_eq!(s.try_send(2), Err(TrySendError::Full(2)));
196    ///
197    /// drop(r);
198    /// assert_eq!(s.try_send(3), Err(TrySendError::Closed(3)));
199    /// ```
200    pub fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
201        match self.channel.queue.push(msg) {
202            Ok(()) => {
203                // Notify a blocked receive operation. If the notified operation gets canceled,
204                // it will notify another blocked receive operation.
205                self.channel.recv_ops.notify_additional(1);
206
207                // Notify all blocked streams.
208                self.channel.stream_ops.notify(usize::MAX);
209
210                Ok(())
211            }
212            Err(PushError::Full(msg)) => Err(TrySendError::Full(msg)),
213            Err(PushError::Closed(msg)) => Err(TrySendError::Closed(msg)),
214        }
215    }
216
217    /// Sends a message into the channel.
218    ///
219    /// If the channel is full, this method waits until there is space for a message.
220    ///
221    /// If the channel is closed, this method returns an error.
222    ///
223    /// # Examples
224    ///
225    /// ```
226    /// # futures_lite::future::block_on(async {
227    /// use async_channel::{unbounded, SendError};
228    ///
229    /// let (s, r) = unbounded();
230    ///
231    /// assert_eq!(s.send(1).await, Ok(()));
232    /// drop(r);
233    /// assert_eq!(s.send(2).await, Err(SendError(2)));
234    /// # });
235    /// ```
236    pub fn send(&self, msg: T) -> Send<'_, T> {
237        Send {
238            sender: self,
239            listener: None,
240            msg: Some(msg),
241        }
242    }
243
244    /// Sends a message into this channel using the blocking strategy.
245    ///
246    /// If the channel is full, this method will block until there is room.
247    /// If the channel is closed, this method returns an error.
248    ///
249    /// # Blocking
250    ///
251    /// Rather than using asynchronous waiting, like the [`send`](Self::send) method,
252    /// this method will block the current thread until the message is sent.
253    ///
254    /// This method should not be used in an asynchronous context. It is intended
255    /// to be used such that a channel can be used in both asynchronous and synchronous contexts.
256    /// Calling this method in an asynchronous context may result in deadlocks.
257    ///
258    /// # Examples
259    ///
260    /// ```
261    /// use async_channel::{unbounded, SendError};
262    ///
263    /// let (s, r) = unbounded();
264    ///
265    /// assert_eq!(s.send_blocking(1), Ok(()));
266    /// drop(r);
267    /// assert_eq!(s.send_blocking(2), Err(SendError(2)));
268    /// ```
269    pub fn send_blocking(&self, msg: T) -> Result<(), SendError<T>> {
270        self.send(msg).wait()
271    }
272
273    /// Closes the channel.
274    ///
275    /// Returns `true` if this call has closed the channel and it was not closed already.
276    ///
277    /// The remaining messages can still be received.
278    ///
279    /// # Examples
280    ///
281    /// ```
282    /// # futures_lite::future::block_on(async {
283    /// use async_channel::{unbounded, RecvError};
284    ///
285    /// let (s, r) = unbounded();
286    /// assert_eq!(s.send(1).await, Ok(()));
287    /// assert!(s.close());
288    ///
289    /// assert_eq!(r.recv().await, Ok(1));
290    /// assert_eq!(r.recv().await, Err(RecvError));
291    /// # });
292    /// ```
293    pub fn close(&self) -> bool {
294        self.channel.close()
295    }
296
297    /// Returns `true` if the channel is closed.
298    ///
299    /// # Examples
300    ///
301    /// ```
302    /// # futures_lite::future::block_on(async {
303    /// use async_channel::{unbounded, RecvError};
304    ///
305    /// let (s, r) = unbounded::<()>();
306    /// assert!(!s.is_closed());
307    ///
308    /// drop(r);
309    /// assert!(s.is_closed());
310    /// # });
311    /// ```
312    pub fn is_closed(&self) -> bool {
313        self.channel.queue.is_closed()
314    }
315
316    /// Returns `true` if the channel is empty.
317    ///
318    /// # Examples
319    ///
320    /// ```
321    /// # futures_lite::future::block_on(async {
322    /// use async_channel::unbounded;
323    ///
324    /// let (s, r) = unbounded();
325    ///
326    /// assert!(s.is_empty());
327    /// s.send(1).await;
328    /// assert!(!s.is_empty());
329    /// # });
330    /// ```
331    pub fn is_empty(&self) -> bool {
332        self.channel.queue.is_empty()
333    }
334
335    /// Returns `true` if the channel is full.
336    ///
337    /// Unbounded channels are never full.
338    ///
339    /// # Examples
340    ///
341    /// ```
342    /// # futures_lite::future::block_on(async {
343    /// use async_channel::bounded;
344    ///
345    /// let (s, r) = bounded(1);
346    ///
347    /// assert!(!s.is_full());
348    /// s.send(1).await;
349    /// assert!(s.is_full());
350    /// # });
351    /// ```
352    pub fn is_full(&self) -> bool {
353        self.channel.queue.is_full()
354    }
355
356    /// Returns the number of messages in the channel.
357    ///
358    /// # Examples
359    ///
360    /// ```
361    /// # futures_lite::future::block_on(async {
362    /// use async_channel::unbounded;
363    ///
364    /// let (s, r) = unbounded();
365    /// assert_eq!(s.len(), 0);
366    ///
367    /// s.send(1).await;
368    /// s.send(2).await;
369    /// assert_eq!(s.len(), 2);
370    /// # });
371    /// ```
372    pub fn len(&self) -> usize {
373        self.channel.queue.len()
374    }
375
376    /// Returns the channel capacity if it's bounded.
377    ///
378    /// # Examples
379    ///
380    /// ```
381    /// use async_channel::{bounded, unbounded};
382    ///
383    /// let (s, r) = bounded::<i32>(5);
384    /// assert_eq!(s.capacity(), Some(5));
385    ///
386    /// let (s, r) = unbounded::<i32>();
387    /// assert_eq!(s.capacity(), None);
388    /// ```
389    pub fn capacity(&self) -> Option<usize> {
390        self.channel.queue.capacity()
391    }
392
393    /// Returns the number of receivers for the channel.
394    ///
395    /// # Examples
396    ///
397    /// ```
398    /// # futures_lite::future::block_on(async {
399    /// use async_channel::unbounded;
400    ///
401    /// let (s, r) = unbounded::<()>();
402    /// assert_eq!(s.receiver_count(), 1);
403    ///
404    /// let r2 = r.clone();
405    /// assert_eq!(s.receiver_count(), 2);
406    /// # });
407    /// ```
408    pub fn receiver_count(&self) -> usize {
409        self.channel.receiver_count.load(Ordering::SeqCst)
410    }
411
412    /// Returns the number of senders for the channel.
413    ///
414    /// # Examples
415    ///
416    /// ```
417    /// # futures_lite::future::block_on(async {
418    /// use async_channel::unbounded;
419    ///
420    /// let (s, r) = unbounded::<()>();
421    /// assert_eq!(s.sender_count(), 1);
422    ///
423    /// let s2 = s.clone();
424    /// assert_eq!(s.sender_count(), 2);
425    /// # });
426    /// ```
427    pub fn sender_count(&self) -> usize {
428        self.channel.sender_count.load(Ordering::SeqCst)
429    }
430
431    /// Downgrade the sender to a weak reference.
432    pub fn downgrade(&self) -> WeakSender<T> {
433        WeakSender {
434            channel: self.channel.clone(),
435        }
436    }
437}
438
439impl<T> Drop for Sender<T> {
440    fn drop(&mut self) {
441        // Decrement the sender count and close the channel if it drops down to zero.
442        if self.channel.sender_count.fetch_sub(1, Ordering::AcqRel) == 1 {
443            self.channel.close();
444        }
445    }
446}
447
448impl<T> fmt::Debug for Sender<T> {
449    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
450        write!(f, "Sender {{ .. }}")
451    }
452}
453
454impl<T> Clone for Sender<T> {
455    fn clone(&self) -> Sender<T> {
456        let count = self.channel.sender_count.fetch_add(1, Ordering::Relaxed);
457
458        // Make sure the count never overflows, even if lots of sender clones are leaked.
459        if count > usize::MAX / 2 {
460            process::abort();
461        }
462
463        Sender {
464            channel: self.channel.clone(),
465        }
466    }
467}
468
469/// The receiving side of a channel.
470///
471/// Receivers can be cloned and shared among threads. When all receivers associated with a channel
472/// are dropped, the channel becomes closed.
473///
474/// The channel can also be closed manually by calling [`Receiver::close()`].
475///
476/// Receivers implement the [`Stream`] trait.
477pub struct Receiver<T> {
478    /// Inner channel state.
479    channel: Arc<Channel<T>>,
480
481    /// Listens for a send or close event to unblock this stream.
482    listener: Option<EventListener>,
483}
484
485impl<T> Receiver<T> {
486    /// Attempts to receive a message from the channel.
487    ///
488    /// If the channel is empty, or empty and closed, this method returns an error.
489    ///
490    /// # Examples
491    ///
492    /// ```
493    /// # futures_lite::future::block_on(async {
494    /// use async_channel::{unbounded, TryRecvError};
495    ///
496    /// let (s, r) = unbounded();
497    /// assert_eq!(s.send(1).await, Ok(()));
498    ///
499    /// assert_eq!(r.try_recv(), Ok(1));
500    /// assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
501    ///
502    /// drop(s);
503    /// assert_eq!(r.try_recv(), Err(TryRecvError::Closed));
504    /// # });
505    /// ```
506    pub fn try_recv(&self) -> Result<T, TryRecvError> {
507        match self.channel.queue.pop() {
508            Ok(msg) => {
509                // Notify a blocked send operation. If the notified operation gets canceled, it
510                // will notify another blocked send operation.
511                self.channel.send_ops.notify_additional(1);
512
513                Ok(msg)
514            }
515            Err(PopError::Empty) => Err(TryRecvError::Empty),
516            Err(PopError::Closed) => Err(TryRecvError::Closed),
517        }
518    }
519
520    /// Receives a message from the channel.
521    ///
522    /// If the channel is empty, this method waits until there is a message.
523    ///
524    /// If the channel is closed, this method receives a message or returns an error if there are
525    /// no more messages.
526    ///
527    /// # Examples
528    ///
529    /// ```
530    /// # futures_lite::future::block_on(async {
531    /// use async_channel::{unbounded, RecvError};
532    ///
533    /// let (s, r) = unbounded();
534    ///
535    /// assert_eq!(s.send(1).await, Ok(()));
536    /// drop(s);
537    ///
538    /// assert_eq!(r.recv().await, Ok(1));
539    /// assert_eq!(r.recv().await, Err(RecvError));
540    /// # });
541    /// ```
542    pub fn recv(&self) -> Recv<'_, T> {
543        Recv {
544            receiver: self,
545            listener: None,
546        }
547    }
548
549    /// Receives a message from the channel using the blocking strategy.
550    ///
551    /// If the channel is empty, this method waits until there is a message.
552    /// If the channel is closed, this method receives a message or returns an error if there are
553    /// no more messages.
554    ///
555    /// # Blocking
556    ///
557    /// Rather than using asynchronous waiting, like the [`recv`](Self::recv) method,
558    /// this method will block the current thread until the message is sent.
559    ///
560    /// This method should not be used in an asynchronous context. It is intended
561    /// to be used such that a channel can be used in both asynchronous and synchronous contexts.
562    /// Calling this method in an asynchronous context may result in deadlocks.
563    ///
564    /// # Examples
565    ///
566    /// ```
567    /// use async_channel::{unbounded, RecvError};
568    ///
569    /// let (s, r) = unbounded();
570    ///
571    /// assert_eq!(s.send_blocking(1), Ok(()));
572    /// drop(s);
573    ///
574    /// assert_eq!(r.recv_blocking(), Ok(1));
575    /// assert_eq!(r.recv_blocking(), Err(RecvError));
576    /// ```
577    pub fn recv_blocking(&self) -> Result<T, RecvError> {
578        self.recv().wait()
579    }
580
581    /// Closes the channel.
582    ///
583    /// Returns `true` if this call has closed the channel and it was not closed already.
584    ///
585    /// The remaining messages can still be received.
586    ///
587    /// # Examples
588    ///
589    /// ```
590    /// # futures_lite::future::block_on(async {
591    /// use async_channel::{unbounded, RecvError};
592    ///
593    /// let (s, r) = unbounded();
594    /// assert_eq!(s.send(1).await, Ok(()));
595    ///
596    /// assert!(r.close());
597    /// assert_eq!(r.recv().await, Ok(1));
598    /// assert_eq!(r.recv().await, Err(RecvError));
599    /// # });
600    /// ```
601    pub fn close(&self) -> bool {
602        self.channel.close()
603    }
604
605    /// Returns `true` if the channel is closed.
606    ///
607    /// # Examples
608    ///
609    /// ```
610    /// # futures_lite::future::block_on(async {
611    /// use async_channel::{unbounded, RecvError};
612    ///
613    /// let (s, r) = unbounded::<()>();
614    /// assert!(!r.is_closed());
615    ///
616    /// drop(s);
617    /// assert!(r.is_closed());
618    /// # });
619    /// ```
620    pub fn is_closed(&self) -> bool {
621        self.channel.queue.is_closed()
622    }
623
624    /// Returns `true` if the channel is empty.
625    ///
626    /// # Examples
627    ///
628    /// ```
629    /// # futures_lite::future::block_on(async {
630    /// use async_channel::unbounded;
631    ///
632    /// let (s, r) = unbounded();
633    ///
634    /// assert!(s.is_empty());
635    /// s.send(1).await;
636    /// assert!(!s.is_empty());
637    /// # });
638    /// ```
639    pub fn is_empty(&self) -> bool {
640        self.channel.queue.is_empty()
641    }
642
643    /// Returns `true` if the channel is full.
644    ///
645    /// Unbounded channels are never full.
646    ///
647    /// # Examples
648    ///
649    /// ```
650    /// # futures_lite::future::block_on(async {
651    /// use async_channel::bounded;
652    ///
653    /// let (s, r) = bounded(1);
654    ///
655    /// assert!(!r.is_full());
656    /// s.send(1).await;
657    /// assert!(r.is_full());
658    /// # });
659    /// ```
660    pub fn is_full(&self) -> bool {
661        self.channel.queue.is_full()
662    }
663
664    /// Returns the number of messages in the channel.
665    ///
666    /// # Examples
667    ///
668    /// ```
669    /// # futures_lite::future::block_on(async {
670    /// use async_channel::unbounded;
671    ///
672    /// let (s, r) = unbounded();
673    /// assert_eq!(r.len(), 0);
674    ///
675    /// s.send(1).await;
676    /// s.send(2).await;
677    /// assert_eq!(r.len(), 2);
678    /// # });
679    /// ```
680    pub fn len(&self) -> usize {
681        self.channel.queue.len()
682    }
683
684    /// Returns the channel capacity if it's bounded.
685    ///
686    /// # Examples
687    ///
688    /// ```
689    /// use async_channel::{bounded, unbounded};
690    ///
691    /// let (s, r) = bounded::<i32>(5);
692    /// assert_eq!(r.capacity(), Some(5));
693    ///
694    /// let (s, r) = unbounded::<i32>();
695    /// assert_eq!(r.capacity(), None);
696    /// ```
697    pub fn capacity(&self) -> Option<usize> {
698        self.channel.queue.capacity()
699    }
700
701    /// Returns the number of receivers for the channel.
702    ///
703    /// # Examples
704    ///
705    /// ```
706    /// # futures_lite::future::block_on(async {
707    /// use async_channel::unbounded;
708    ///
709    /// let (s, r) = unbounded::<()>();
710    /// assert_eq!(r.receiver_count(), 1);
711    ///
712    /// let r2 = r.clone();
713    /// assert_eq!(r.receiver_count(), 2);
714    /// # });
715    /// ```
716    pub fn receiver_count(&self) -> usize {
717        self.channel.receiver_count.load(Ordering::SeqCst)
718    }
719
720    /// Returns the number of senders for the channel.
721    ///
722    /// # Examples
723    ///
724    /// ```
725    /// # futures_lite::future::block_on(async {
726    /// use async_channel::unbounded;
727    ///
728    /// let (s, r) = unbounded::<()>();
729    /// assert_eq!(r.sender_count(), 1);
730    ///
731    /// let s2 = s.clone();
732    /// assert_eq!(r.sender_count(), 2);
733    /// # });
734    /// ```
735    pub fn sender_count(&self) -> usize {
736        self.channel.sender_count.load(Ordering::SeqCst)
737    }
738
739    /// Downgrade the receiver to a weak reference.
740    pub fn downgrade(&self) -> WeakReceiver<T> {
741        WeakReceiver {
742            channel: self.channel.clone(),
743        }
744    }
745}
746
747impl<T> Drop for Receiver<T> {
748    fn drop(&mut self) {
749        // Decrement the receiver count and close the channel if it drops down to zero.
750        if self.channel.receiver_count.fetch_sub(1, Ordering::AcqRel) == 1 {
751            self.channel.close();
752        }
753    }
754}
755
756impl<T> fmt::Debug for Receiver<T> {
757    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
758        write!(f, "Receiver {{ .. }}")
759    }
760}
761
762impl<T> Clone for Receiver<T> {
763    fn clone(&self) -> Receiver<T> {
764        let count = self.channel.receiver_count.fetch_add(1, Ordering::Relaxed);
765
766        // Make sure the count never overflows, even if lots of receiver clones are leaked.
767        if count > usize::MAX / 2 {
768            process::abort();
769        }
770
771        Receiver {
772            channel: self.channel.clone(),
773            listener: None,
774        }
775    }
776}
777
778impl<T> Stream for Receiver<T> {
779    type Item = T;
780
781    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
782        loop {
783            // If this stream is listening for events, first wait for a notification.
784            if let Some(listener) = self.listener.as_mut() {
785                futures_core::ready!(Pin::new(listener).poll(cx));
786                self.listener = None;
787            }
788
789            loop {
790                // Attempt to receive a message.
791                match self.try_recv() {
792                    Ok(msg) => {
793                        // The stream is not blocked on an event - drop the listener.
794                        self.listener = None;
795                        return Poll::Ready(Some(msg));
796                    }
797                    Err(TryRecvError::Closed) => {
798                        // The stream is not blocked on an event - drop the listener.
799                        self.listener = None;
800                        return Poll::Ready(None);
801                    }
802                    Err(TryRecvError::Empty) => {}
803                }
804
805                // Receiving failed - now start listening for notifications or wait for one.
806                match self.listener.as_mut() {
807                    None => {
808                        // Create a listener and try sending the message again.
809                        self.listener = Some(self.channel.stream_ops.listen());
810                    }
811                    Some(_) => {
812                        // Go back to the outer loop to poll the listener.
813                        break;
814                    }
815                }
816            }
817        }
818    }
819}
820
821impl<T> futures_core::stream::FusedStream for Receiver<T> {
822    fn is_terminated(&self) -> bool {
823        self.channel.queue.is_closed() && self.channel.queue.is_empty()
824    }
825}
826
827/// A [`Sender`] that prevents the channel from not being closed.
828///
829/// This is created through the [`Sender::downgrade`] method. In order to use it, it needs
830/// to be upgraded into a [`Sender`] through the `upgrade` method.
831pub struct WeakSender<T> {
832    channel: Arc<Channel<T>>,
833}
834
835impl<T> WeakSender<T> {
836    /// Upgrade the [`WeakSender`] into a [`Sender`].
837    pub fn upgrade(&self) -> Option<Sender<T>> {
838        if self.channel.queue.is_closed() {
839            None
840        } else {
841            match self.channel.sender_count.fetch_update(
842                Ordering::Relaxed,
843                Ordering::Relaxed,
844                |count| if count == 0 { None } else { Some(count + 1) },
845            ) {
846                Err(_) => None,
847                Ok(new_value) if new_value > usize::MAX / 2 => {
848                    // Make sure the count never overflows, even if lots of sender clones are leaked.
849                    process::abort();
850                }
851                Ok(_) => Some(Sender {
852                    channel: self.channel.clone(),
853                }),
854            }
855        }
856    }
857}
858
859impl<T> Clone for WeakSender<T> {
860    fn clone(&self) -> Self {
861        WeakSender {
862            channel: self.channel.clone(),
863        }
864    }
865}
866
867impl<T> fmt::Debug for WeakSender<T> {
868    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
869        write!(f, "WeakSender {{ .. }}")
870    }
871}
872
873/// A [`Receiver`] that prevents the channel from not being closed.
874///
875/// This is created through the [`Receiver::downgrade`] method. In order to use it, it needs
876/// to be upgraded into a [`Receiver`] through the `upgrade` method.
877pub struct WeakReceiver<T> {
878    channel: Arc<Channel<T>>,
879}
880
881impl<T> WeakReceiver<T> {
882    /// Upgrade the [`WeakReceiver`] into a [`Receiver`].
883    pub fn upgrade(&self) -> Option<Receiver<T>> {
884        if self.channel.queue.is_closed() {
885            None
886        } else {
887            match self.channel.receiver_count.fetch_update(
888                Ordering::Relaxed,
889                Ordering::Relaxed,
890                |count| if count == 0 { None } else { Some(count + 1) },
891            ) {
892                Err(_) => None,
893                Ok(new_value) if new_value > usize::MAX / 2 => {
894                    // Make sure the count never overflows, even if lots of receiver clones are leaked.
895                    process::abort();
896                }
897                Ok(_) => Some(Receiver {
898                    channel: self.channel.clone(),
899                    listener: None,
900                }),
901            }
902        }
903    }
904}
905
906impl<T> Clone for WeakReceiver<T> {
907    fn clone(&self) -> Self {
908        WeakReceiver {
909            channel: self.channel.clone(),
910        }
911    }
912}
913
914impl<T> fmt::Debug for WeakReceiver<T> {
915    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
916        write!(f, "WeakReceiver {{ .. }}")
917    }
918}
919
920/// An error returned from [`Sender::send()`].
921///
922/// Received because the channel is closed.
923#[derive(PartialEq, Eq, Clone, Copy)]
924pub struct SendError<T>(pub T);
925
926impl<T> SendError<T> {
927    /// Unwraps the message that couldn't be sent.
928    pub fn into_inner(self) -> T {
929        self.0
930    }
931}
932
933impl<T> error::Error for SendError<T> {}
934
935impl<T> fmt::Debug for SendError<T> {
936    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
937        write!(f, "SendError(..)")
938    }
939}
940
941impl<T> fmt::Display for SendError<T> {
942    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
943        write!(f, "sending into a closed channel")
944    }
945}
946
947/// An error returned from [`Sender::try_send()`].
948#[derive(PartialEq, Eq, Clone, Copy)]
949pub enum TrySendError<T> {
950    /// The channel is full but not closed.
951    Full(T),
952
953    /// The channel is closed.
954    Closed(T),
955}
956
957impl<T> TrySendError<T> {
958    /// Unwraps the message that couldn't be sent.
959    pub fn into_inner(self) -> T {
960        match self {
961            TrySendError::Full(t) => t,
962            TrySendError::Closed(t) => t,
963        }
964    }
965
966    /// Returns `true` if the channel is full but not closed.
967    pub fn is_full(&self) -> bool {
968        match self {
969            TrySendError::Full(_) => true,
970            TrySendError::Closed(_) => false,
971        }
972    }
973
974    /// Returns `true` if the channel is closed.
975    pub fn is_closed(&self) -> bool {
976        match self {
977            TrySendError::Full(_) => false,
978            TrySendError::Closed(_) => true,
979        }
980    }
981}
982
983impl<T> error::Error for TrySendError<T> {}
984
985impl<T> fmt::Debug for TrySendError<T> {
986    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
987        match *self {
988            TrySendError::Full(..) => write!(f, "Full(..)"),
989            TrySendError::Closed(..) => write!(f, "Closed(..)"),
990        }
991    }
992}
993
994impl<T> fmt::Display for TrySendError<T> {
995    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
996        match *self {
997            TrySendError::Full(..) => write!(f, "sending into a full channel"),
998            TrySendError::Closed(..) => write!(f, "sending into a closed channel"),
999        }
1000    }
1001}
1002
1003/// An error returned from [`Receiver::recv()`].
1004///
1005/// Received because the channel is empty and closed.
1006#[derive(PartialEq, Eq, Clone, Copy, Debug)]
1007pub struct RecvError;
1008
1009impl error::Error for RecvError {}
1010
1011impl fmt::Display for RecvError {
1012    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1013        write!(f, "receiving from an empty and closed channel")
1014    }
1015}
1016
1017/// An error returned from [`Receiver::try_recv()`].
1018#[derive(PartialEq, Eq, Clone, Copy, Debug)]
1019pub enum TryRecvError {
1020    /// The channel is empty but not closed.
1021    Empty,
1022
1023    /// The channel is empty and closed.
1024    Closed,
1025}
1026
1027impl TryRecvError {
1028    /// Returns `true` if the channel is empty but not closed.
1029    pub fn is_empty(&self) -> bool {
1030        match self {
1031            TryRecvError::Empty => true,
1032            TryRecvError::Closed => false,
1033        }
1034    }
1035
1036    /// Returns `true` if the channel is empty and closed.
1037    pub fn is_closed(&self) -> bool {
1038        match self {
1039            TryRecvError::Empty => false,
1040            TryRecvError::Closed => true,
1041        }
1042    }
1043}
1044
1045impl error::Error for TryRecvError {}
1046
1047impl fmt::Display for TryRecvError {
1048    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1049        match *self {
1050            TryRecvError::Empty => write!(f, "receiving from an empty channel"),
1051            TryRecvError::Closed => write!(f, "receiving from an empty and closed channel"),
1052        }
1053    }
1054}
1055
1056/// A future returned by [`Sender::send()`].
1057#[derive(Debug)]
1058#[must_use = "futures do nothing unless you `.await` or poll them"]
1059pub struct Send<'a, T> {
1060    sender: &'a Sender<T>,
1061    listener: Option<EventListener>,
1062    msg: Option<T>,
1063}
1064
1065impl<'a, T> Send<'a, T> {
1066    /// Run this future with the given `Strategy`.
1067    fn run_with_strategy<S: Strategy>(
1068        &mut self,
1069        cx: &mut S::Context,
1070    ) -> Poll<Result<(), SendError<T>>> {
1071        loop {
1072            let msg = self.msg.take().unwrap();
1073            // Attempt to send a message.
1074            match self.sender.try_send(msg) {
1075                Ok(()) => return Poll::Ready(Ok(())),
1076                Err(TrySendError::Closed(msg)) => return Poll::Ready(Err(SendError(msg))),
1077                Err(TrySendError::Full(m)) => self.msg = Some(m),
1078            }
1079
1080            // Sending failed - now start listening for notifications or wait for one.
1081            match self.listener.take() {
1082                None => {
1083                    // Start listening and then try sending again.
1084                    self.listener = Some(self.sender.channel.send_ops.listen());
1085                }
1086                Some(l) => {
1087                    // Poll using the given strategy
1088                    if let Err(l) = S::poll(l, cx) {
1089                        self.listener = Some(l);
1090                        return Poll::Pending;
1091                    }
1092                }
1093            }
1094        }
1095    }
1096
1097    /// Run using the blocking strategy.
1098    fn wait(mut self) -> Result<(), SendError<T>> {
1099        match self.run_with_strategy::<Blocking>(&mut ()) {
1100            Poll::Ready(res) => res,
1101            Poll::Pending => unreachable!(),
1102        }
1103    }
1104}
1105
1106impl<'a, T> Unpin for Send<'a, T> {}
1107
1108impl<'a, T> Future for Send<'a, T> {
1109    type Output = Result<(), SendError<T>>;
1110
1111    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1112        self.run_with_strategy::<NonBlocking<'_>>(cx)
1113    }
1114}
1115
1116/// A future returned by [`Receiver::recv()`].
1117#[derive(Debug)]
1118#[must_use = "futures do nothing unless you `.await` or poll them"]
1119pub struct Recv<'a, T> {
1120    receiver: &'a Receiver<T>,
1121    listener: Option<EventListener>,
1122}
1123
1124impl<'a, T> Unpin for Recv<'a, T> {}
1125
1126impl<'a, T> Recv<'a, T> {
1127    /// Run this future with the given `Strategy`.
1128    fn run_with_strategy<S: Strategy>(
1129        &mut self,
1130        cx: &mut S::Context,
1131    ) -> Poll<Result<T, RecvError>> {
1132        loop {
1133            // Attempt to receive a message.
1134            match self.receiver.try_recv() {
1135                Ok(msg) => return Poll::Ready(Ok(msg)),
1136                Err(TryRecvError::Closed) => return Poll::Ready(Err(RecvError)),
1137                Err(TryRecvError::Empty) => {}
1138            }
1139
1140            // Receiving failed - now start listening for notifications or wait for one.
1141            match self.listener.take() {
1142                None => {
1143                    // Start listening and then try receiving again.
1144                    self.listener = Some(self.receiver.channel.recv_ops.listen());
1145                }
1146                Some(l) => {
1147                    // Poll using the given strategy.
1148                    if let Err(l) = S::poll(l, cx) {
1149                        self.listener = Some(l);
1150                        return Poll::Pending;
1151                    }
1152                }
1153            }
1154        }
1155    }
1156
1157    /// Run with the blocking strategy.
1158    fn wait(mut self) -> Result<T, RecvError> {
1159        match self.run_with_strategy::<Blocking>(&mut ()) {
1160            Poll::Ready(res) => res,
1161            Poll::Pending => unreachable!(),
1162        }
1163    }
1164}
1165
1166impl<'a, T> Future for Recv<'a, T> {
1167    type Output = Result<T, RecvError>;
1168
1169    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1170        self.run_with_strategy::<NonBlocking<'_>>(cx)
1171    }
1172}
1173
1174/// A strategy used to poll an `EventListener`.
1175trait Strategy {
1176    /// Context needed to be provided to the `poll` method.
1177    type Context;
1178
1179    /// Polls the given `EventListener`.
1180    ///
1181    /// Returns the `EventListener` back if it was not completed; otherwise,
1182    /// returns `Ok(())`.
1183    fn poll(evl: EventListener, cx: &mut Self::Context) -> Result<(), EventListener>;
1184}
1185
1186/// Non-blocking strategy for use in asynchronous code.
1187struct NonBlocking<'a>(&'a mut ());
1188
1189impl<'a> Strategy for NonBlocking<'a> {
1190    type Context = Context<'a>;
1191
1192    fn poll(mut evl: EventListener, cx: &mut Context<'a>) -> Result<(), EventListener> {
1193        match Pin::new(&mut evl).poll(cx) {
1194            Poll::Ready(()) => Ok(()),
1195            Poll::Pending => Err(evl),
1196        }
1197    }
1198}
1199
1200/// Blocking strategy for use in synchronous code.
1201struct Blocking;
1202
1203impl Strategy for Blocking {
1204    type Context = ();
1205
1206    fn poll(evl: EventListener, _cx: &mut ()) -> Result<(), EventListener> {
1207        evl.wait();
1208        Ok(())
1209    }
1210}