async_channel/lib.rs
1//! An async multi-producer multi-consumer channel, where each message can be received by only
2//! one of all existing consumers.
3//!
4//! There are two kinds of channels:
5//!
6//! 1. [Bounded][`bounded()`] channel with limited capacity.
7//! 2. [Unbounded][`unbounded()`] channel with unlimited capacity.
8//!
9//! A channel has the [`Sender`] and [`Receiver`] side. Both sides are cloneable and can be shared
10//! among multiple threads.
11//!
12//! When all [`Sender`]s or all [`Receiver`]s are dropped, the channel becomes closed. When a
13//! channel is closed, no more messages can be sent, but remaining messages can still be received.
14//!
15//! The channel can also be closed manually by calling [`Sender::close()`] or
16//! [`Receiver::close()`].
17//!
18//! # Examples
19//!
20//! ```
21//! # futures_lite::future::block_on(async {
22//! let (s, r) = async_channel::unbounded();
23//!
24//! assert_eq!(s.send("Hello").await, Ok(()));
25//! assert_eq!(r.recv().await, Ok("Hello"));
26//! # });
27//! ```
28
29#![forbid(unsafe_code)]
30#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)]
31
32use std::error;
33use std::fmt;
34use std::future::Future;
35use std::pin::Pin;
36use std::process;
37use std::sync::atomic::{AtomicUsize, Ordering};
38use std::sync::Arc;
39use std::task::{Context, Poll};
40use std::usize;
41
42use concurrent_queue::{ConcurrentQueue, PopError, PushError};
43use event_listener::{Event, EventListener};
44use futures_core::stream::Stream;
45
46struct Channel<T> {
47 /// Inner message queue.
48 queue: ConcurrentQueue<T>,
49
50 /// Send operations waiting while the channel is full.
51 send_ops: Event,
52
53 /// Receive operations waiting while the channel is empty and not closed.
54 recv_ops: Event,
55
56 /// Stream operations while the channel is empty and not closed.
57 stream_ops: Event,
58
59 /// The number of currently active `Sender`s.
60 sender_count: AtomicUsize,
61
62 /// The number of currently active `Receivers`s.
63 receiver_count: AtomicUsize,
64}
65
66impl<T> Channel<T> {
67 /// Closes the channel and notifies all blocked operations.
68 ///
69 /// Returns `true` if this call has closed the channel and it was not closed already.
70 fn close(&self) -> bool {
71 if self.queue.close() {
72 // Notify all send operations.
73 self.send_ops.notify(usize::MAX);
74
75 // Notify all receive and stream operations.
76 self.recv_ops.notify(usize::MAX);
77 self.stream_ops.notify(usize::MAX);
78
79 true
80 } else {
81 false
82 }
83 }
84}
85
86/// Creates a bounded channel.
87///
88/// The created channel has space to hold at most `cap` messages at a time.
89///
90/// # Panics
91///
92/// Capacity must be a positive number. If `cap` is zero, this function will panic.
93///
94/// # Examples
95///
96/// ```
97/// # futures_lite::future::block_on(async {
98/// use async_channel::{bounded, TryRecvError, TrySendError};
99///
100/// let (s, r) = bounded(1);
101///
102/// assert_eq!(s.send(10).await, Ok(()));
103/// assert_eq!(s.try_send(20), Err(TrySendError::Full(20)));
104///
105/// assert_eq!(r.recv().await, Ok(10));
106/// assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
107/// # });
108/// ```
109pub fn bounded<T>(cap: usize) -> (Sender<T>, Receiver<T>) {
110 assert!(cap > 0, "capacity cannot be zero");
111
112 let channel = Arc::new(Channel {
113 queue: ConcurrentQueue::bounded(cap),
114 send_ops: Event::new(),
115 recv_ops: Event::new(),
116 stream_ops: Event::new(),
117 sender_count: AtomicUsize::new(1),
118 receiver_count: AtomicUsize::new(1),
119 });
120
121 let s = Sender {
122 channel: channel.clone(),
123 };
124 let r = Receiver {
125 channel,
126 listener: None,
127 };
128 (s, r)
129}
130
131/// Creates an unbounded channel.
132///
133/// The created channel can hold an unlimited number of messages.
134///
135/// # Examples
136///
137/// ```
138/// # futures_lite::future::block_on(async {
139/// use async_channel::{unbounded, TryRecvError};
140///
141/// let (s, r) = unbounded();
142///
143/// assert_eq!(s.send(10).await, Ok(()));
144/// assert_eq!(s.send(20).await, Ok(()));
145///
146/// assert_eq!(r.recv().await, Ok(10));
147/// assert_eq!(r.recv().await, Ok(20));
148/// assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
149/// # });
150/// ```
151pub fn unbounded<T>() -> (Sender<T>, Receiver<T>) {
152 let channel = Arc::new(Channel {
153 queue: ConcurrentQueue::unbounded(),
154 send_ops: Event::new(),
155 recv_ops: Event::new(),
156 stream_ops: Event::new(),
157 sender_count: AtomicUsize::new(1),
158 receiver_count: AtomicUsize::new(1),
159 });
160
161 let s = Sender {
162 channel: channel.clone(),
163 };
164 let r = Receiver {
165 channel,
166 listener: None,
167 };
168 (s, r)
169}
170
171/// The sending side of a channel.
172///
173/// Senders can be cloned and shared among threads. When all senders associated with a channel are
174/// dropped, the channel becomes closed.
175///
176/// The channel can also be closed manually by calling [`Sender::close()`].
177pub struct Sender<T> {
178 /// Inner channel state.
179 channel: Arc<Channel<T>>,
180}
181
182impl<T> Sender<T> {
183 /// Attempts to send a message into the channel.
184 ///
185 /// If the channel is full or closed, this method returns an error.
186 ///
187 /// # Examples
188 ///
189 /// ```
190 /// use async_channel::{bounded, TrySendError};
191 ///
192 /// let (s, r) = bounded(1);
193 ///
194 /// assert_eq!(s.try_send(1), Ok(()));
195 /// assert_eq!(s.try_send(2), Err(TrySendError::Full(2)));
196 ///
197 /// drop(r);
198 /// assert_eq!(s.try_send(3), Err(TrySendError::Closed(3)));
199 /// ```
200 pub fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
201 match self.channel.queue.push(msg) {
202 Ok(()) => {
203 // Notify a blocked receive operation. If the notified operation gets canceled,
204 // it will notify another blocked receive operation.
205 self.channel.recv_ops.notify_additional(1);
206
207 // Notify all blocked streams.
208 self.channel.stream_ops.notify(usize::MAX);
209
210 Ok(())
211 }
212 Err(PushError::Full(msg)) => Err(TrySendError::Full(msg)),
213 Err(PushError::Closed(msg)) => Err(TrySendError::Closed(msg)),
214 }
215 }
216
217 /// Sends a message into the channel.
218 ///
219 /// If the channel is full, this method waits until there is space for a message.
220 ///
221 /// If the channel is closed, this method returns an error.
222 ///
223 /// # Examples
224 ///
225 /// ```
226 /// # futures_lite::future::block_on(async {
227 /// use async_channel::{unbounded, SendError};
228 ///
229 /// let (s, r) = unbounded();
230 ///
231 /// assert_eq!(s.send(1).await, Ok(()));
232 /// drop(r);
233 /// assert_eq!(s.send(2).await, Err(SendError(2)));
234 /// # });
235 /// ```
236 pub fn send(&self, msg: T) -> Send<'_, T> {
237 Send {
238 sender: self,
239 listener: None,
240 msg: Some(msg),
241 }
242 }
243
244 /// Sends a message into this channel using the blocking strategy.
245 ///
246 /// If the channel is full, this method will block until there is room.
247 /// If the channel is closed, this method returns an error.
248 ///
249 /// # Blocking
250 ///
251 /// Rather than using asynchronous waiting, like the [`send`](Self::send) method,
252 /// this method will block the current thread until the message is sent.
253 ///
254 /// This method should not be used in an asynchronous context. It is intended
255 /// to be used such that a channel can be used in both asynchronous and synchronous contexts.
256 /// Calling this method in an asynchronous context may result in deadlocks.
257 ///
258 /// # Examples
259 ///
260 /// ```
261 /// use async_channel::{unbounded, SendError};
262 ///
263 /// let (s, r) = unbounded();
264 ///
265 /// assert_eq!(s.send_blocking(1), Ok(()));
266 /// drop(r);
267 /// assert_eq!(s.send_blocking(2), Err(SendError(2)));
268 /// ```
269 pub fn send_blocking(&self, msg: T) -> Result<(), SendError<T>> {
270 self.send(msg).wait()
271 }
272
273 /// Closes the channel.
274 ///
275 /// Returns `true` if this call has closed the channel and it was not closed already.
276 ///
277 /// The remaining messages can still be received.
278 ///
279 /// # Examples
280 ///
281 /// ```
282 /// # futures_lite::future::block_on(async {
283 /// use async_channel::{unbounded, RecvError};
284 ///
285 /// let (s, r) = unbounded();
286 /// assert_eq!(s.send(1).await, Ok(()));
287 /// assert!(s.close());
288 ///
289 /// assert_eq!(r.recv().await, Ok(1));
290 /// assert_eq!(r.recv().await, Err(RecvError));
291 /// # });
292 /// ```
293 pub fn close(&self) -> bool {
294 self.channel.close()
295 }
296
297 /// Returns `true` if the channel is closed.
298 ///
299 /// # Examples
300 ///
301 /// ```
302 /// # futures_lite::future::block_on(async {
303 /// use async_channel::{unbounded, RecvError};
304 ///
305 /// let (s, r) = unbounded::<()>();
306 /// assert!(!s.is_closed());
307 ///
308 /// drop(r);
309 /// assert!(s.is_closed());
310 /// # });
311 /// ```
312 pub fn is_closed(&self) -> bool {
313 self.channel.queue.is_closed()
314 }
315
316 /// Returns `true` if the channel is empty.
317 ///
318 /// # Examples
319 ///
320 /// ```
321 /// # futures_lite::future::block_on(async {
322 /// use async_channel::unbounded;
323 ///
324 /// let (s, r) = unbounded();
325 ///
326 /// assert!(s.is_empty());
327 /// s.send(1).await;
328 /// assert!(!s.is_empty());
329 /// # });
330 /// ```
331 pub fn is_empty(&self) -> bool {
332 self.channel.queue.is_empty()
333 }
334
335 /// Returns `true` if the channel is full.
336 ///
337 /// Unbounded channels are never full.
338 ///
339 /// # Examples
340 ///
341 /// ```
342 /// # futures_lite::future::block_on(async {
343 /// use async_channel::bounded;
344 ///
345 /// let (s, r) = bounded(1);
346 ///
347 /// assert!(!s.is_full());
348 /// s.send(1).await;
349 /// assert!(s.is_full());
350 /// # });
351 /// ```
352 pub fn is_full(&self) -> bool {
353 self.channel.queue.is_full()
354 }
355
356 /// Returns the number of messages in the channel.
357 ///
358 /// # Examples
359 ///
360 /// ```
361 /// # futures_lite::future::block_on(async {
362 /// use async_channel::unbounded;
363 ///
364 /// let (s, r) = unbounded();
365 /// assert_eq!(s.len(), 0);
366 ///
367 /// s.send(1).await;
368 /// s.send(2).await;
369 /// assert_eq!(s.len(), 2);
370 /// # });
371 /// ```
372 pub fn len(&self) -> usize {
373 self.channel.queue.len()
374 }
375
376 /// Returns the channel capacity if it's bounded.
377 ///
378 /// # Examples
379 ///
380 /// ```
381 /// use async_channel::{bounded, unbounded};
382 ///
383 /// let (s, r) = bounded::<i32>(5);
384 /// assert_eq!(s.capacity(), Some(5));
385 ///
386 /// let (s, r) = unbounded::<i32>();
387 /// assert_eq!(s.capacity(), None);
388 /// ```
389 pub fn capacity(&self) -> Option<usize> {
390 self.channel.queue.capacity()
391 }
392
393 /// Returns the number of receivers for the channel.
394 ///
395 /// # Examples
396 ///
397 /// ```
398 /// # futures_lite::future::block_on(async {
399 /// use async_channel::unbounded;
400 ///
401 /// let (s, r) = unbounded::<()>();
402 /// assert_eq!(s.receiver_count(), 1);
403 ///
404 /// let r2 = r.clone();
405 /// assert_eq!(s.receiver_count(), 2);
406 /// # });
407 /// ```
408 pub fn receiver_count(&self) -> usize {
409 self.channel.receiver_count.load(Ordering::SeqCst)
410 }
411
412 /// Returns the number of senders for the channel.
413 ///
414 /// # Examples
415 ///
416 /// ```
417 /// # futures_lite::future::block_on(async {
418 /// use async_channel::unbounded;
419 ///
420 /// let (s, r) = unbounded::<()>();
421 /// assert_eq!(s.sender_count(), 1);
422 ///
423 /// let s2 = s.clone();
424 /// assert_eq!(s.sender_count(), 2);
425 /// # });
426 /// ```
427 pub fn sender_count(&self) -> usize {
428 self.channel.sender_count.load(Ordering::SeqCst)
429 }
430
431 /// Downgrade the sender to a weak reference.
432 pub fn downgrade(&self) -> WeakSender<T> {
433 WeakSender {
434 channel: self.channel.clone(),
435 }
436 }
437}
438
439impl<T> Drop for Sender<T> {
440 fn drop(&mut self) {
441 // Decrement the sender count and close the channel if it drops down to zero.
442 if self.channel.sender_count.fetch_sub(1, Ordering::AcqRel) == 1 {
443 self.channel.close();
444 }
445 }
446}
447
448impl<T> fmt::Debug for Sender<T> {
449 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
450 write!(f, "Sender {{ .. }}")
451 }
452}
453
454impl<T> Clone for Sender<T> {
455 fn clone(&self) -> Sender<T> {
456 let count = self.channel.sender_count.fetch_add(1, Ordering::Relaxed);
457
458 // Make sure the count never overflows, even if lots of sender clones are leaked.
459 if count > usize::MAX / 2 {
460 process::abort();
461 }
462
463 Sender {
464 channel: self.channel.clone(),
465 }
466 }
467}
468
469/// The receiving side of a channel.
470///
471/// Receivers can be cloned and shared among threads. When all receivers associated with a channel
472/// are dropped, the channel becomes closed.
473///
474/// The channel can also be closed manually by calling [`Receiver::close()`].
475///
476/// Receivers implement the [`Stream`] trait.
477pub struct Receiver<T> {
478 /// Inner channel state.
479 channel: Arc<Channel<T>>,
480
481 /// Listens for a send or close event to unblock this stream.
482 listener: Option<EventListener>,
483}
484
485impl<T> Receiver<T> {
486 /// Attempts to receive a message from the channel.
487 ///
488 /// If the channel is empty, or empty and closed, this method returns an error.
489 ///
490 /// # Examples
491 ///
492 /// ```
493 /// # futures_lite::future::block_on(async {
494 /// use async_channel::{unbounded, TryRecvError};
495 ///
496 /// let (s, r) = unbounded();
497 /// assert_eq!(s.send(1).await, Ok(()));
498 ///
499 /// assert_eq!(r.try_recv(), Ok(1));
500 /// assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
501 ///
502 /// drop(s);
503 /// assert_eq!(r.try_recv(), Err(TryRecvError::Closed));
504 /// # });
505 /// ```
506 pub fn try_recv(&self) -> Result<T, TryRecvError> {
507 match self.channel.queue.pop() {
508 Ok(msg) => {
509 // Notify a blocked send operation. If the notified operation gets canceled, it
510 // will notify another blocked send operation.
511 self.channel.send_ops.notify_additional(1);
512
513 Ok(msg)
514 }
515 Err(PopError::Empty) => Err(TryRecvError::Empty),
516 Err(PopError::Closed) => Err(TryRecvError::Closed),
517 }
518 }
519
520 /// Receives a message from the channel.
521 ///
522 /// If the channel is empty, this method waits until there is a message.
523 ///
524 /// If the channel is closed, this method receives a message or returns an error if there are
525 /// no more messages.
526 ///
527 /// # Examples
528 ///
529 /// ```
530 /// # futures_lite::future::block_on(async {
531 /// use async_channel::{unbounded, RecvError};
532 ///
533 /// let (s, r) = unbounded();
534 ///
535 /// assert_eq!(s.send(1).await, Ok(()));
536 /// drop(s);
537 ///
538 /// assert_eq!(r.recv().await, Ok(1));
539 /// assert_eq!(r.recv().await, Err(RecvError));
540 /// # });
541 /// ```
542 pub fn recv(&self) -> Recv<'_, T> {
543 Recv {
544 receiver: self,
545 listener: None,
546 }
547 }
548
549 /// Receives a message from the channel using the blocking strategy.
550 ///
551 /// If the channel is empty, this method waits until there is a message.
552 /// If the channel is closed, this method receives a message or returns an error if there are
553 /// no more messages.
554 ///
555 /// # Blocking
556 ///
557 /// Rather than using asynchronous waiting, like the [`recv`](Self::recv) method,
558 /// this method will block the current thread until the message is sent.
559 ///
560 /// This method should not be used in an asynchronous context. It is intended
561 /// to be used such that a channel can be used in both asynchronous and synchronous contexts.
562 /// Calling this method in an asynchronous context may result in deadlocks.
563 ///
564 /// # Examples
565 ///
566 /// ```
567 /// use async_channel::{unbounded, RecvError};
568 ///
569 /// let (s, r) = unbounded();
570 ///
571 /// assert_eq!(s.send_blocking(1), Ok(()));
572 /// drop(s);
573 ///
574 /// assert_eq!(r.recv_blocking(), Ok(1));
575 /// assert_eq!(r.recv_blocking(), Err(RecvError));
576 /// ```
577 pub fn recv_blocking(&self) -> Result<T, RecvError> {
578 self.recv().wait()
579 }
580
581 /// Closes the channel.
582 ///
583 /// Returns `true` if this call has closed the channel and it was not closed already.
584 ///
585 /// The remaining messages can still be received.
586 ///
587 /// # Examples
588 ///
589 /// ```
590 /// # futures_lite::future::block_on(async {
591 /// use async_channel::{unbounded, RecvError};
592 ///
593 /// let (s, r) = unbounded();
594 /// assert_eq!(s.send(1).await, Ok(()));
595 ///
596 /// assert!(r.close());
597 /// assert_eq!(r.recv().await, Ok(1));
598 /// assert_eq!(r.recv().await, Err(RecvError));
599 /// # });
600 /// ```
601 pub fn close(&self) -> bool {
602 self.channel.close()
603 }
604
605 /// Returns `true` if the channel is closed.
606 ///
607 /// # Examples
608 ///
609 /// ```
610 /// # futures_lite::future::block_on(async {
611 /// use async_channel::{unbounded, RecvError};
612 ///
613 /// let (s, r) = unbounded::<()>();
614 /// assert!(!r.is_closed());
615 ///
616 /// drop(s);
617 /// assert!(r.is_closed());
618 /// # });
619 /// ```
620 pub fn is_closed(&self) -> bool {
621 self.channel.queue.is_closed()
622 }
623
624 /// Returns `true` if the channel is empty.
625 ///
626 /// # Examples
627 ///
628 /// ```
629 /// # futures_lite::future::block_on(async {
630 /// use async_channel::unbounded;
631 ///
632 /// let (s, r) = unbounded();
633 ///
634 /// assert!(s.is_empty());
635 /// s.send(1).await;
636 /// assert!(!s.is_empty());
637 /// # });
638 /// ```
639 pub fn is_empty(&self) -> bool {
640 self.channel.queue.is_empty()
641 }
642
643 /// Returns `true` if the channel is full.
644 ///
645 /// Unbounded channels are never full.
646 ///
647 /// # Examples
648 ///
649 /// ```
650 /// # futures_lite::future::block_on(async {
651 /// use async_channel::bounded;
652 ///
653 /// let (s, r) = bounded(1);
654 ///
655 /// assert!(!r.is_full());
656 /// s.send(1).await;
657 /// assert!(r.is_full());
658 /// # });
659 /// ```
660 pub fn is_full(&self) -> bool {
661 self.channel.queue.is_full()
662 }
663
664 /// Returns the number of messages in the channel.
665 ///
666 /// # Examples
667 ///
668 /// ```
669 /// # futures_lite::future::block_on(async {
670 /// use async_channel::unbounded;
671 ///
672 /// let (s, r) = unbounded();
673 /// assert_eq!(r.len(), 0);
674 ///
675 /// s.send(1).await;
676 /// s.send(2).await;
677 /// assert_eq!(r.len(), 2);
678 /// # });
679 /// ```
680 pub fn len(&self) -> usize {
681 self.channel.queue.len()
682 }
683
684 /// Returns the channel capacity if it's bounded.
685 ///
686 /// # Examples
687 ///
688 /// ```
689 /// use async_channel::{bounded, unbounded};
690 ///
691 /// let (s, r) = bounded::<i32>(5);
692 /// assert_eq!(r.capacity(), Some(5));
693 ///
694 /// let (s, r) = unbounded::<i32>();
695 /// assert_eq!(r.capacity(), None);
696 /// ```
697 pub fn capacity(&self) -> Option<usize> {
698 self.channel.queue.capacity()
699 }
700
701 /// Returns the number of receivers for the channel.
702 ///
703 /// # Examples
704 ///
705 /// ```
706 /// # futures_lite::future::block_on(async {
707 /// use async_channel::unbounded;
708 ///
709 /// let (s, r) = unbounded::<()>();
710 /// assert_eq!(r.receiver_count(), 1);
711 ///
712 /// let r2 = r.clone();
713 /// assert_eq!(r.receiver_count(), 2);
714 /// # });
715 /// ```
716 pub fn receiver_count(&self) -> usize {
717 self.channel.receiver_count.load(Ordering::SeqCst)
718 }
719
720 /// Returns the number of senders for the channel.
721 ///
722 /// # Examples
723 ///
724 /// ```
725 /// # futures_lite::future::block_on(async {
726 /// use async_channel::unbounded;
727 ///
728 /// let (s, r) = unbounded::<()>();
729 /// assert_eq!(r.sender_count(), 1);
730 ///
731 /// let s2 = s.clone();
732 /// assert_eq!(r.sender_count(), 2);
733 /// # });
734 /// ```
735 pub fn sender_count(&self) -> usize {
736 self.channel.sender_count.load(Ordering::SeqCst)
737 }
738
739 /// Downgrade the receiver to a weak reference.
740 pub fn downgrade(&self) -> WeakReceiver<T> {
741 WeakReceiver {
742 channel: self.channel.clone(),
743 }
744 }
745}
746
747impl<T> Drop for Receiver<T> {
748 fn drop(&mut self) {
749 // Decrement the receiver count and close the channel if it drops down to zero.
750 if self.channel.receiver_count.fetch_sub(1, Ordering::AcqRel) == 1 {
751 self.channel.close();
752 }
753 }
754}
755
756impl<T> fmt::Debug for Receiver<T> {
757 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
758 write!(f, "Receiver {{ .. }}")
759 }
760}
761
762impl<T> Clone for Receiver<T> {
763 fn clone(&self) -> Receiver<T> {
764 let count = self.channel.receiver_count.fetch_add(1, Ordering::Relaxed);
765
766 // Make sure the count never overflows, even if lots of receiver clones are leaked.
767 if count > usize::MAX / 2 {
768 process::abort();
769 }
770
771 Receiver {
772 channel: self.channel.clone(),
773 listener: None,
774 }
775 }
776}
777
778impl<T> Stream for Receiver<T> {
779 type Item = T;
780
781 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
782 loop {
783 // If this stream is listening for events, first wait for a notification.
784 if let Some(listener) = self.listener.as_mut() {
785 futures_core::ready!(Pin::new(listener).poll(cx));
786 self.listener = None;
787 }
788
789 loop {
790 // Attempt to receive a message.
791 match self.try_recv() {
792 Ok(msg) => {
793 // The stream is not blocked on an event - drop the listener.
794 self.listener = None;
795 return Poll::Ready(Some(msg));
796 }
797 Err(TryRecvError::Closed) => {
798 // The stream is not blocked on an event - drop the listener.
799 self.listener = None;
800 return Poll::Ready(None);
801 }
802 Err(TryRecvError::Empty) => {}
803 }
804
805 // Receiving failed - now start listening for notifications or wait for one.
806 match self.listener.as_mut() {
807 None => {
808 // Create a listener and try sending the message again.
809 self.listener = Some(self.channel.stream_ops.listen());
810 }
811 Some(_) => {
812 // Go back to the outer loop to poll the listener.
813 break;
814 }
815 }
816 }
817 }
818 }
819}
820
821impl<T> futures_core::stream::FusedStream for Receiver<T> {
822 fn is_terminated(&self) -> bool {
823 self.channel.queue.is_closed() && self.channel.queue.is_empty()
824 }
825}
826
827/// A [`Sender`] that prevents the channel from not being closed.
828///
829/// This is created through the [`Sender::downgrade`] method. In order to use it, it needs
830/// to be upgraded into a [`Sender`] through the `upgrade` method.
831pub struct WeakSender<T> {
832 channel: Arc<Channel<T>>,
833}
834
835impl<T> WeakSender<T> {
836 /// Upgrade the [`WeakSender`] into a [`Sender`].
837 pub fn upgrade(&self) -> Option<Sender<T>> {
838 if self.channel.queue.is_closed() {
839 None
840 } else {
841 match self.channel.sender_count.fetch_update(
842 Ordering::Relaxed,
843 Ordering::Relaxed,
844 |count| if count == 0 { None } else { Some(count + 1) },
845 ) {
846 Err(_) => None,
847 Ok(new_value) if new_value > usize::MAX / 2 => {
848 // Make sure the count never overflows, even if lots of sender clones are leaked.
849 process::abort();
850 }
851 Ok(_) => Some(Sender {
852 channel: self.channel.clone(),
853 }),
854 }
855 }
856 }
857}
858
859impl<T> Clone for WeakSender<T> {
860 fn clone(&self) -> Self {
861 WeakSender {
862 channel: self.channel.clone(),
863 }
864 }
865}
866
867impl<T> fmt::Debug for WeakSender<T> {
868 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
869 write!(f, "WeakSender {{ .. }}")
870 }
871}
872
873/// A [`Receiver`] that prevents the channel from not being closed.
874///
875/// This is created through the [`Receiver::downgrade`] method. In order to use it, it needs
876/// to be upgraded into a [`Receiver`] through the `upgrade` method.
877pub struct WeakReceiver<T> {
878 channel: Arc<Channel<T>>,
879}
880
881impl<T> WeakReceiver<T> {
882 /// Upgrade the [`WeakReceiver`] into a [`Receiver`].
883 pub fn upgrade(&self) -> Option<Receiver<T>> {
884 if self.channel.queue.is_closed() {
885 None
886 } else {
887 match self.channel.receiver_count.fetch_update(
888 Ordering::Relaxed,
889 Ordering::Relaxed,
890 |count| if count == 0 { None } else { Some(count + 1) },
891 ) {
892 Err(_) => None,
893 Ok(new_value) if new_value > usize::MAX / 2 => {
894 // Make sure the count never overflows, even if lots of receiver clones are leaked.
895 process::abort();
896 }
897 Ok(_) => Some(Receiver {
898 channel: self.channel.clone(),
899 listener: None,
900 }),
901 }
902 }
903 }
904}
905
906impl<T> Clone for WeakReceiver<T> {
907 fn clone(&self) -> Self {
908 WeakReceiver {
909 channel: self.channel.clone(),
910 }
911 }
912}
913
914impl<T> fmt::Debug for WeakReceiver<T> {
915 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
916 write!(f, "WeakReceiver {{ .. }}")
917 }
918}
919
920/// An error returned from [`Sender::send()`].
921///
922/// Received because the channel is closed.
923#[derive(PartialEq, Eq, Clone, Copy)]
924pub struct SendError<T>(pub T);
925
926impl<T> SendError<T> {
927 /// Unwraps the message that couldn't be sent.
928 pub fn into_inner(self) -> T {
929 self.0
930 }
931}
932
933impl<T> error::Error for SendError<T> {}
934
935impl<T> fmt::Debug for SendError<T> {
936 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
937 write!(f, "SendError(..)")
938 }
939}
940
941impl<T> fmt::Display for SendError<T> {
942 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
943 write!(f, "sending into a closed channel")
944 }
945}
946
947/// An error returned from [`Sender::try_send()`].
948#[derive(PartialEq, Eq, Clone, Copy)]
949pub enum TrySendError<T> {
950 /// The channel is full but not closed.
951 Full(T),
952
953 /// The channel is closed.
954 Closed(T),
955}
956
957impl<T> TrySendError<T> {
958 /// Unwraps the message that couldn't be sent.
959 pub fn into_inner(self) -> T {
960 match self {
961 TrySendError::Full(t) => t,
962 TrySendError::Closed(t) => t,
963 }
964 }
965
966 /// Returns `true` if the channel is full but not closed.
967 pub fn is_full(&self) -> bool {
968 match self {
969 TrySendError::Full(_) => true,
970 TrySendError::Closed(_) => false,
971 }
972 }
973
974 /// Returns `true` if the channel is closed.
975 pub fn is_closed(&self) -> bool {
976 match self {
977 TrySendError::Full(_) => false,
978 TrySendError::Closed(_) => true,
979 }
980 }
981}
982
983impl<T> error::Error for TrySendError<T> {}
984
985impl<T> fmt::Debug for TrySendError<T> {
986 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
987 match *self {
988 TrySendError::Full(..) => write!(f, "Full(..)"),
989 TrySendError::Closed(..) => write!(f, "Closed(..)"),
990 }
991 }
992}
993
994impl<T> fmt::Display for TrySendError<T> {
995 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
996 match *self {
997 TrySendError::Full(..) => write!(f, "sending into a full channel"),
998 TrySendError::Closed(..) => write!(f, "sending into a closed channel"),
999 }
1000 }
1001}
1002
1003/// An error returned from [`Receiver::recv()`].
1004///
1005/// Received because the channel is empty and closed.
1006#[derive(PartialEq, Eq, Clone, Copy, Debug)]
1007pub struct RecvError;
1008
1009impl error::Error for RecvError {}
1010
1011impl fmt::Display for RecvError {
1012 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1013 write!(f, "receiving from an empty and closed channel")
1014 }
1015}
1016
1017/// An error returned from [`Receiver::try_recv()`].
1018#[derive(PartialEq, Eq, Clone, Copy, Debug)]
1019pub enum TryRecvError {
1020 /// The channel is empty but not closed.
1021 Empty,
1022
1023 /// The channel is empty and closed.
1024 Closed,
1025}
1026
1027impl TryRecvError {
1028 /// Returns `true` if the channel is empty but not closed.
1029 pub fn is_empty(&self) -> bool {
1030 match self {
1031 TryRecvError::Empty => true,
1032 TryRecvError::Closed => false,
1033 }
1034 }
1035
1036 /// Returns `true` if the channel is empty and closed.
1037 pub fn is_closed(&self) -> bool {
1038 match self {
1039 TryRecvError::Empty => false,
1040 TryRecvError::Closed => true,
1041 }
1042 }
1043}
1044
1045impl error::Error for TryRecvError {}
1046
1047impl fmt::Display for TryRecvError {
1048 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1049 match *self {
1050 TryRecvError::Empty => write!(f, "receiving from an empty channel"),
1051 TryRecvError::Closed => write!(f, "receiving from an empty and closed channel"),
1052 }
1053 }
1054}
1055
1056/// A future returned by [`Sender::send()`].
1057#[derive(Debug)]
1058#[must_use = "futures do nothing unless you `.await` or poll them"]
1059pub struct Send<'a, T> {
1060 sender: &'a Sender<T>,
1061 listener: Option<EventListener>,
1062 msg: Option<T>,
1063}
1064
1065impl<'a, T> Send<'a, T> {
1066 /// Run this future with the given `Strategy`.
1067 fn run_with_strategy<S: Strategy>(
1068 &mut self,
1069 cx: &mut S::Context,
1070 ) -> Poll<Result<(), SendError<T>>> {
1071 loop {
1072 let msg = self.msg.take().unwrap();
1073 // Attempt to send a message.
1074 match self.sender.try_send(msg) {
1075 Ok(()) => return Poll::Ready(Ok(())),
1076 Err(TrySendError::Closed(msg)) => return Poll::Ready(Err(SendError(msg))),
1077 Err(TrySendError::Full(m)) => self.msg = Some(m),
1078 }
1079
1080 // Sending failed - now start listening for notifications or wait for one.
1081 match self.listener.take() {
1082 None => {
1083 // Start listening and then try sending again.
1084 self.listener = Some(self.sender.channel.send_ops.listen());
1085 }
1086 Some(l) => {
1087 // Poll using the given strategy
1088 if let Err(l) = S::poll(l, cx) {
1089 self.listener = Some(l);
1090 return Poll::Pending;
1091 }
1092 }
1093 }
1094 }
1095 }
1096
1097 /// Run using the blocking strategy.
1098 fn wait(mut self) -> Result<(), SendError<T>> {
1099 match self.run_with_strategy::<Blocking>(&mut ()) {
1100 Poll::Ready(res) => res,
1101 Poll::Pending => unreachable!(),
1102 }
1103 }
1104}
1105
1106impl<'a, T> Unpin for Send<'a, T> {}
1107
1108impl<'a, T> Future for Send<'a, T> {
1109 type Output = Result<(), SendError<T>>;
1110
1111 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1112 self.run_with_strategy::<NonBlocking<'_>>(cx)
1113 }
1114}
1115
1116/// A future returned by [`Receiver::recv()`].
1117#[derive(Debug)]
1118#[must_use = "futures do nothing unless you `.await` or poll them"]
1119pub struct Recv<'a, T> {
1120 receiver: &'a Receiver<T>,
1121 listener: Option<EventListener>,
1122}
1123
1124impl<'a, T> Unpin for Recv<'a, T> {}
1125
1126impl<'a, T> Recv<'a, T> {
1127 /// Run this future with the given `Strategy`.
1128 fn run_with_strategy<S: Strategy>(
1129 &mut self,
1130 cx: &mut S::Context,
1131 ) -> Poll<Result<T, RecvError>> {
1132 loop {
1133 // Attempt to receive a message.
1134 match self.receiver.try_recv() {
1135 Ok(msg) => return Poll::Ready(Ok(msg)),
1136 Err(TryRecvError::Closed) => return Poll::Ready(Err(RecvError)),
1137 Err(TryRecvError::Empty) => {}
1138 }
1139
1140 // Receiving failed - now start listening for notifications or wait for one.
1141 match self.listener.take() {
1142 None => {
1143 // Start listening and then try receiving again.
1144 self.listener = Some(self.receiver.channel.recv_ops.listen());
1145 }
1146 Some(l) => {
1147 // Poll using the given strategy.
1148 if let Err(l) = S::poll(l, cx) {
1149 self.listener = Some(l);
1150 return Poll::Pending;
1151 }
1152 }
1153 }
1154 }
1155 }
1156
1157 /// Run with the blocking strategy.
1158 fn wait(mut self) -> Result<T, RecvError> {
1159 match self.run_with_strategy::<Blocking>(&mut ()) {
1160 Poll::Ready(res) => res,
1161 Poll::Pending => unreachable!(),
1162 }
1163 }
1164}
1165
1166impl<'a, T> Future for Recv<'a, T> {
1167 type Output = Result<T, RecvError>;
1168
1169 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1170 self.run_with_strategy::<NonBlocking<'_>>(cx)
1171 }
1172}
1173
1174/// A strategy used to poll an `EventListener`.
1175trait Strategy {
1176 /// Context needed to be provided to the `poll` method.
1177 type Context;
1178
1179 /// Polls the given `EventListener`.
1180 ///
1181 /// Returns the `EventListener` back if it was not completed; otherwise,
1182 /// returns `Ok(())`.
1183 fn poll(evl: EventListener, cx: &mut Self::Context) -> Result<(), EventListener>;
1184}
1185
1186/// Non-blocking strategy for use in asynchronous code.
1187struct NonBlocking<'a>(&'a mut ());
1188
1189impl<'a> Strategy for NonBlocking<'a> {
1190 type Context = Context<'a>;
1191
1192 fn poll(mut evl: EventListener, cx: &mut Context<'a>) -> Result<(), EventListener> {
1193 match Pin::new(&mut evl).poll(cx) {
1194 Poll::Ready(()) => Ok(()),
1195 Poll::Pending => Err(evl),
1196 }
1197 }
1198}
1199
1200/// Blocking strategy for use in synchronous code.
1201struct Blocking;
1202
1203impl Strategy for Blocking {
1204 type Context = ();
1205
1206 fn poll(evl: EventListener, _cx: &mut ()) -> Result<(), EventListener> {
1207 evl.wait();
1208 Ok(())
1209 }
1210}