1#![allow(dead_code)]
2
3use std::collections::{HashMap, HashSet, VecDeque};
4use std::convert::Infallible;
5use std::error::Error as StdError;
6use std::fmt::{self, Debug};
7use std::future::Future;
8use std::hash::Hash;
9use std::ops::{Deref, DerefMut};
10use std::pin::Pin;
11use std::sync::{Arc, Mutex, Weak};
12use std::task::{self, Poll};
13
14use std::time::{Duration, Instant};
15
16use futures_channel::oneshot;
17use futures_core::ready;
18use tracing::{debug, trace};
19
20use hyper::rt::Timer as _;
21
22use crate::common::{exec, exec::Exec, timer::Timer};
23
24#[allow(missing_debug_implementations)]
26pub struct Pool<T, K: Key> {
27 inner: Option<Arc<Mutex<PoolInner<T, K>>>>,
29}
30
31pub trait Poolable: Unpin + Send + Sized + 'static {
37 fn is_open(&self) -> bool;
38 fn reserve(self) -> Reservation<Self>;
42 fn can_share(&self) -> bool;
43}
44
45pub trait Key: Eq + Hash + Clone + Debug + Unpin + Send + 'static {}
46
47impl<T> Key for T where T: Eq + Hash + Clone + Debug + Unpin + Send + 'static {}
48
49#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
51#[allow(dead_code)]
52pub enum Ver {
53 Auto,
54 Http2,
55}
56
57#[allow(missing_debug_implementations)]
64pub enum Reservation<T> {
65 #[cfg(feature = "http2")]
69 Shared(T, T),
70 Unique(T),
73}
74
75struct PoolInner<T, K: Eq + Hash> {
79 connecting: HashSet<K>,
83 idle: HashMap<K, Vec<Idle<T>>>,
86 max_idle_per_host: usize,
87 waiters: HashMap<K, VecDeque<oneshot::Sender<T>>>,
97 idle_interval_ref: Option<oneshot::Sender<Infallible>>,
100 exec: Exec,
101 timer: Option<Timer>,
102 timeout: Option<Duration>,
103}
104
105struct WeakOpt<T>(Option<Weak<T>>);
108
109#[derive(Clone, Copy, Debug)]
110pub struct Config {
111 pub idle_timeout: Option<Duration>,
112 pub max_idle_per_host: usize,
113}
114
115impl Config {
116 pub fn is_enabled(&self) -> bool {
117 self.max_idle_per_host > 0
118 }
119}
120
121impl<T, K: Key> Pool<T, K> {
122 pub fn new<E, M>(config: Config, executor: E, timer: Option<M>) -> Pool<T, K>
123 where
124 E: hyper::rt::Executor<exec::BoxSendFuture> + Send + Sync + Clone + 'static,
125 M: hyper::rt::Timer + Send + Sync + Clone + 'static,
126 {
127 let exec = Exec::new(executor);
128 let timer = timer.map(|t| Timer::new(t));
129 let inner = if config.is_enabled() {
130 Some(Arc::new(Mutex::new(PoolInner {
131 connecting: HashSet::new(),
132 idle: HashMap::new(),
133 idle_interval_ref: None,
134 max_idle_per_host: config.max_idle_per_host,
135 waiters: HashMap::new(),
136 exec,
137 timer,
138 timeout: config.idle_timeout,
139 })))
140 } else {
141 None
142 };
143
144 Pool { inner }
145 }
146
147 pub(crate) fn is_enabled(&self) -> bool {
148 self.inner.is_some()
149 }
150
151 #[cfg(test)]
152 pub(super) fn no_timer(&self) {
153 {
155 let mut inner = self.inner.as_ref().unwrap().lock().unwrap();
156 assert!(inner.idle_interval_ref.is_none(), "timer already spawned");
157 let (tx, _) = oneshot::channel();
158 inner.idle_interval_ref = Some(tx);
159 }
160 }
161}
162
163impl<T: Poolable, K: Key> Pool<T, K> {
164 pub fn checkout(&self, key: K) -> Checkout<T, K> {
167 Checkout {
168 key,
169 pool: self.clone(),
170 waiter: None,
171 }
172 }
173
174 pub fn connecting(&self, key: &K, ver: Ver) -> Option<Connecting<T, K>> {
177 if ver == Ver::Http2 {
178 if let Some(ref enabled) = self.inner {
179 let mut inner = enabled.lock().unwrap();
180 return if inner.connecting.insert(key.clone()) {
181 let connecting = Connecting {
182 key: key.clone(),
183 pool: WeakOpt::downgrade(enabled),
184 };
185 Some(connecting)
186 } else {
187 trace!("HTTP/2 connecting already in progress for {:?}", key);
188 None
189 };
190 }
191 }
192
193 Some(Connecting {
195 key: key.clone(),
196 pool: WeakOpt::none(),
199 })
200 }
201
202 #[cfg(test)]
203 fn locked(&self) -> std::sync::MutexGuard<'_, PoolInner<T, K>> {
204 self.inner.as_ref().expect("enabled").lock().expect("lock")
205 }
206
207 pub fn pooled(
225 &self,
226 #[cfg_attr(not(feature = "http2"), allow(unused_mut))] mut connecting: Connecting<T, K>,
227 value: T,
228 ) -> Pooled<T, K> {
229 let (value, pool_ref) = if let Some(ref enabled) = self.inner {
230 match value.reserve() {
231 #[cfg(feature = "http2")]
232 Reservation::Shared(to_insert, to_return) => {
233 let mut inner = enabled.lock().unwrap();
234 inner.put(connecting.key.clone(), to_insert, enabled);
235 inner.connected(&connecting.key);
238 connecting.pool = WeakOpt::none();
240
241 (to_return, WeakOpt::none())
244 }
245 Reservation::Unique(value) => {
246 (value, WeakOpt::downgrade(enabled))
250 }
251 }
252 } else {
253 debug_assert!(connecting.pool.upgrade().is_none());
257
258 (value, WeakOpt::none())
259 };
260 Pooled {
261 key: connecting.key.clone(),
262 is_reused: false,
263 pool: pool_ref,
264 value: Some(value),
265 }
266 }
267
268 fn reuse(&self, key: &K, value: T) -> Pooled<T, K> {
269 debug!("reuse idle connection for {:?}", key);
270 let mut pool_ref = WeakOpt::none();
279 if !value.can_share() {
280 if let Some(ref enabled) = self.inner {
281 pool_ref = WeakOpt::downgrade(enabled);
282 }
283 }
284
285 Pooled {
286 is_reused: true,
287 key: key.clone(),
288 pool: pool_ref,
289 value: Some(value),
290 }
291 }
292}
293
294struct IdlePopper<'a, T, K> {
296 key: &'a K,
297 list: &'a mut Vec<Idle<T>>,
298}
299
300impl<'a, T: Poolable + 'a, K: Debug> IdlePopper<'a, T, K> {
301 fn pop(self, expiration: &Expiration, now: Instant) -> Option<Idle<T>> {
302 while let Some(entry) = self.list.pop() {
303 if !entry.value.is_open() {
306 trace!("removing closed connection for {:?}", self.key);
307 continue;
308 }
309 if expiration.expires(entry.idle_at, now) {
316 trace!("removing expired connection for {:?}", self.key);
317 continue;
318 }
319
320 let value = match entry.value.reserve() {
321 #[cfg(feature = "http2")]
322 Reservation::Shared(to_reinsert, to_checkout) => {
323 self.list.push(Idle {
324 idle_at: now,
325 value: to_reinsert,
326 });
327 to_checkout
328 }
329 Reservation::Unique(unique) => unique,
330 };
331
332 return Some(Idle {
333 idle_at: entry.idle_at,
334 value,
335 });
336 }
337
338 None
339 }
340}
341
342impl<T: Poolable, K: Key> PoolInner<T, K> {
343 fn now(&self) -> Instant {
344 self.timer
345 .as_ref()
346 .map_or_else(|| Instant::now(), |t| t.now())
347 }
348
349 fn put(&mut self, key: K, value: T, __pool_ref: &Arc<Mutex<PoolInner<T, K>>>) {
350 if value.can_share() && self.idle.contains_key(&key) {
351 trace!("put; existing idle HTTP/2 connection for {:?}", key);
352 return;
353 }
354 trace!("put; add idle connection for {:?}", key);
355 let mut remove_waiters = false;
356 let mut value = Some(value);
357 if let Some(waiters) = self.waiters.get_mut(&key) {
358 while let Some(tx) = waiters.pop_front() {
359 if !tx.is_canceled() {
360 let reserved = value.take().expect("value already sent");
361 let reserved = match reserved.reserve() {
362 #[cfg(feature = "http2")]
363 Reservation::Shared(to_keep, to_send) => {
364 value = Some(to_keep);
365 to_send
366 }
367 Reservation::Unique(uniq) => uniq,
368 };
369 match tx.send(reserved) {
370 Ok(()) => {
371 if value.is_none() {
372 break;
373 } else {
374 continue;
375 }
376 }
377 Err(e) => {
378 value = Some(e);
379 }
380 }
381 }
382
383 trace!("put; removing canceled waiter for {:?}", key);
384 }
385 remove_waiters = waiters.is_empty();
386 }
387 if remove_waiters {
388 self.waiters.remove(&key);
389 }
390
391 match value {
392 Some(value) => {
393 {
395 let now = self.now();
396 let idle_list = self.idle.entry(key.clone()).or_default();
397 if self.max_idle_per_host <= idle_list.len() {
398 trace!("max idle per host for {:?}, dropping connection", key);
399 return;
400 }
401
402 debug!("pooling idle connection for {:?}", key);
403 idle_list.push(Idle {
404 value,
405 idle_at: now,
406 });
407 }
408
409 self.spawn_idle_interval(__pool_ref);
410 }
411 None => trace!("put; found waiter for {:?}", key),
412 }
413 }
414
415 fn connected(&mut self, key: &K) {
418 let existed = self.connecting.remove(key);
419 debug_assert!(existed, "Connecting dropped, key not in pool.connecting");
420 self.waiters.remove(key);
424 }
425
426 fn spawn_idle_interval(&mut self, pool_ref: &Arc<Mutex<PoolInner<T, K>>>) {
427 if self.idle_interval_ref.is_some() {
428 return;
429 }
430 let dur = if let Some(dur) = self.timeout {
431 dur
432 } else {
433 return;
434 };
435 if dur == Duration::ZERO {
436 return;
437 }
438 let timer = if let Some(timer) = self.timer.clone() {
439 timer
440 } else {
441 return;
442 };
443
444 const MIN_CHECK: Duration = Duration::from_millis(90);
448
449 let dur = dur.max(MIN_CHECK);
450
451 let (tx, rx) = oneshot::channel();
452 self.idle_interval_ref = Some(tx);
453
454 let interval = IdleTask {
455 timer: timer.clone(),
456 duration: dur,
457 pool: WeakOpt::downgrade(pool_ref),
458 pool_drop_notifier: rx,
459 };
460
461 self.exec.execute(interval.run());
462 }
463}
464
465impl<T, K: Eq + Hash> PoolInner<T, K> {
466 fn clean_waiters(&mut self, key: &K) {
471 let mut remove_waiters = false;
472 if let Some(waiters) = self.waiters.get_mut(key) {
473 waiters.retain(|tx| !tx.is_canceled());
474 remove_waiters = waiters.is_empty();
475 }
476 if remove_waiters {
477 self.waiters.remove(key);
478 }
479 }
480}
481
482impl<T: Poolable, K: Key> PoolInner<T, K> {
483 fn clear_expired(&mut self) {
485 let dur = self.timeout.expect("interval assumes timeout");
486
487 let now = self.now();
488 self.idle.retain(|key, values| {
491 values.retain(|entry| {
492 if !entry.value.is_open() {
493 trace!("idle interval evicting closed for {:?}", key);
494 return false;
495 }
496
497 if now.saturating_duration_since(entry.idle_at) > dur {
499 trace!("idle interval evicting expired for {:?}", key);
500 return false;
501 }
502
503 true
505 });
506
507 !values.is_empty()
509 });
510 }
511}
512
513impl<T, K: Key> Clone for Pool<T, K> {
514 fn clone(&self) -> Pool<T, K> {
515 Pool {
516 inner: self.inner.clone(),
517 }
518 }
519}
520
521pub struct Pooled<T: Poolable, K: Key> {
524 value: Option<T>,
525 is_reused: bool,
526 key: K,
527 pool: WeakOpt<Mutex<PoolInner<T, K>>>,
528}
529
530impl<T: Poolable, K: Key> Pooled<T, K> {
531 pub fn is_reused(&self) -> bool {
532 self.is_reused
533 }
534
535 pub fn is_pool_enabled(&self) -> bool {
536 self.pool.0.is_some()
537 }
538
539 fn as_ref(&self) -> &T {
540 self.value.as_ref().expect("not dropped")
541 }
542
543 fn as_mut(&mut self) -> &mut T {
544 self.value.as_mut().expect("not dropped")
545 }
546}
547
548impl<T: Poolable, K: Key> Deref for Pooled<T, K> {
549 type Target = T;
550 fn deref(&self) -> &T {
551 self.as_ref()
552 }
553}
554
555impl<T: Poolable, K: Key> DerefMut for Pooled<T, K> {
556 fn deref_mut(&mut self) -> &mut T {
557 self.as_mut()
558 }
559}
560
561impl<T: Poolable, K: Key> Drop for Pooled<T, K> {
562 fn drop(&mut self) {
563 if let Some(value) = self.value.take() {
564 if !value.is_open() {
565 return;
568 }
569
570 if let Some(pool) = self.pool.upgrade() {
571 if let Ok(mut inner) = pool.lock() {
572 inner.put(self.key.clone(), value, &pool);
573 }
574 } else if !value.can_share() {
575 trace!("pool dropped, dropping pooled ({:?})", self.key);
576 }
577 }
580 }
581}
582
583impl<T: Poolable, K: Key> fmt::Debug for Pooled<T, K> {
584 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
585 f.debug_struct("Pooled").field("key", &self.key).finish()
586 }
587}
588
589struct Idle<T> {
590 idle_at: Instant,
591 value: T,
592}
593
594#[allow(missing_debug_implementations)]
596pub struct Checkout<T, K: Key> {
597 key: K,
598 pool: Pool<T, K>,
599 waiter: Option<oneshot::Receiver<T>>,
600}
601
602#[derive(Debug)]
603#[non_exhaustive]
604pub enum Error {
605 PoolDisabled,
606 CheckoutNoLongerWanted,
607 CheckedOutClosedValue,
608}
609
610impl Error {
611 pub(super) fn is_canceled(&self) -> bool {
612 matches!(self, Error::CheckedOutClosedValue)
613 }
614}
615
616impl fmt::Display for Error {
617 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
618 f.write_str(match self {
619 Error::PoolDisabled => "pool is disabled",
620 Error::CheckedOutClosedValue => "checked out connection was closed",
621 Error::CheckoutNoLongerWanted => "request was canceled",
622 })
623 }
624}
625
626impl StdError for Error {}
627
628impl<T: Poolable, K: Key> Checkout<T, K> {
629 fn poll_waiter(
630 &mut self,
631 cx: &mut task::Context<'_>,
632 ) -> Poll<Option<Result<Pooled<T, K>, Error>>> {
633 if let Some(mut rx) = self.waiter.take() {
634 match Pin::new(&mut rx).poll(cx) {
635 Poll::Ready(Ok(value)) => {
636 if value.is_open() {
637 Poll::Ready(Some(Ok(self.pool.reuse(&self.key, value))))
638 } else {
639 Poll::Ready(Some(Err(Error::CheckedOutClosedValue)))
640 }
641 }
642 Poll::Pending => {
643 self.waiter = Some(rx);
644 Poll::Pending
645 }
646 Poll::Ready(Err(_canceled)) => {
647 Poll::Ready(Some(Err(Error::CheckoutNoLongerWanted)))
648 }
649 }
650 } else {
651 Poll::Ready(None)
652 }
653 }
654
655 fn checkout(&mut self, cx: &mut task::Context<'_>) -> Option<Pooled<T, K>> {
656 let entry = {
657 let mut inner = self.pool.inner.as_ref()?.lock().unwrap();
658 let expiration = Expiration::new(inner.timeout);
659 let now = inner.now();
660 let maybe_entry = inner.idle.get_mut(&self.key).and_then(|list| {
661 trace!("take? {:?}: expiration = {:?}", self.key, expiration.0);
662 {
665 let popper = IdlePopper {
666 key: &self.key,
667 list,
668 };
669 popper.pop(&expiration, now)
670 }
671 .map(|e| (e, list.is_empty()))
672 });
673
674 let (entry, empty) = if let Some((e, empty)) = maybe_entry {
675 (Some(e), empty)
676 } else {
677 (None, true)
679 };
680 if empty {
681 inner.idle.remove(&self.key);
683 }
684
685 if entry.is_none() && self.waiter.is_none() {
686 let (tx, mut rx) = oneshot::channel();
687 trace!("checkout waiting for idle connection: {:?}", self.key);
688 inner
689 .waiters
690 .entry(self.key.clone())
691 .or_insert_with(VecDeque::new)
692 .push_back(tx);
693
694 assert!(Pin::new(&mut rx).poll(cx).is_pending());
696 self.waiter = Some(rx);
697 }
698
699 entry
700 };
701
702 entry.map(|e| self.pool.reuse(&self.key, e.value))
703 }
704}
705
706impl<T: Poolable, K: Key> Future for Checkout<T, K> {
707 type Output = Result<Pooled<T, K>, Error>;
708
709 fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
710 if let Some(pooled) = ready!(self.poll_waiter(cx)?) {
711 return Poll::Ready(Ok(pooled));
712 }
713
714 if let Some(pooled) = self.checkout(cx) {
715 Poll::Ready(Ok(pooled))
716 } else if !self.pool.is_enabled() {
717 Poll::Ready(Err(Error::PoolDisabled))
718 } else {
719 debug_assert!(self.waiter.is_some());
721 Poll::Pending
722 }
723 }
724}
725
726impl<T, K: Key> Drop for Checkout<T, K> {
727 fn drop(&mut self) {
728 if self.waiter.take().is_some() {
729 trace!("checkout dropped for {:?}", self.key);
730 if let Some(Ok(mut inner)) = self.pool.inner.as_ref().map(|i| i.lock()) {
731 inner.clean_waiters(&self.key);
732 }
733 }
734 }
735}
736
737#[allow(missing_debug_implementations)]
739pub struct Connecting<T: Poolable, K: Key> {
740 key: K,
741 pool: WeakOpt<Mutex<PoolInner<T, K>>>,
742}
743
744impl<T: Poolable, K: Key> Connecting<T, K> {
745 pub fn alpn_h2(self, pool: &Pool<T, K>) -> Option<Self> {
746 debug_assert!(
747 self.pool.0.is_none(),
748 "Connecting::alpn_h2 but already Http2"
749 );
750
751 pool.connecting(&self.key, Ver::Http2)
752 }
753}
754
755impl<T: Poolable, K: Key> Drop for Connecting<T, K> {
756 fn drop(&mut self) {
757 if let Some(pool) = self.pool.upgrade() {
758 if let Ok(mut inner) = pool.lock() {
760 inner.connected(&self.key);
761 }
762 }
763 }
764}
765
766struct Expiration(Option<Duration>);
767
768impl Expiration {
769 fn new(dur: Option<Duration>) -> Expiration {
770 Expiration(dur)
771 }
772
773 fn expires(&self, instant: Instant, now: Instant) -> bool {
774 match self.0 {
775 Some(timeout) => now.saturating_duration_since(instant) > timeout,
777 None => false,
778 }
779 }
780}
781
782struct IdleTask<T, K: Key> {
783 timer: Timer,
784 duration: Duration,
785 pool: WeakOpt<Mutex<PoolInner<T, K>>>,
786 pool_drop_notifier: oneshot::Receiver<Infallible>,
790}
791
792impl<T: Poolable + 'static, K: Key> IdleTask<T, K> {
793 async fn run(self) {
794 use futures_util::future;
795
796 let mut sleep = self.timer.sleep_until(self.timer.now() + self.duration);
797 let mut on_pool_drop = self.pool_drop_notifier;
798 loop {
799 match future::select(&mut on_pool_drop, &mut sleep).await {
800 future::Either::Left(_) => {
801 break;
803 }
804 future::Either::Right(((), _)) => {
805 if let Some(inner) = self.pool.upgrade() {
806 if let Ok(mut inner) = inner.lock() {
807 trace!("idle interval checking for expired");
808 inner.clear_expired();
809 }
810 }
811
812 let deadline = self.timer.now() + self.duration;
813 self.timer.reset(&mut sleep, deadline);
814 }
815 }
816 }
817
818 trace!("pool closed, canceling idle interval");
819 return;
820 }
821}
822
823impl<T> WeakOpt<T> {
824 fn none() -> Self {
825 WeakOpt(None)
826 }
827
828 fn downgrade(arc: &Arc<T>) -> Self {
829 WeakOpt(Some(Arc::downgrade(arc)))
830 }
831
832 fn upgrade(&self) -> Option<Arc<T>> {
833 self.0.as_ref().and_then(Weak::upgrade)
834 }
835}
836
837#[cfg(test)]
838mod tests {
839 use std::fmt::Debug;
840 use std::future::Future;
841 use std::hash::Hash;
842 use std::pin::Pin;
843 use std::task::{self, Poll};
844 use std::time::Duration;
845
846 use super::{Connecting, Key, Pool, Poolable, Reservation, WeakOpt};
847 use crate::rt::{TokioExecutor, TokioTimer};
848
849 use crate::common::timer;
850
851 #[derive(Clone, Debug, PartialEq, Eq, Hash)]
852 struct KeyImpl(http::uri::Scheme, http::uri::Authority);
853
854 type KeyTuple = (http::uri::Scheme, http::uri::Authority);
855
856 #[derive(Debug, PartialEq, Eq)]
858 struct Uniq<T>(T);
859
860 impl<T: Send + 'static + Unpin> Poolable for Uniq<T> {
861 fn is_open(&self) -> bool {
862 true
863 }
864
865 fn reserve(self) -> Reservation<Self> {
866 Reservation::Unique(self)
867 }
868
869 fn can_share(&self) -> bool {
870 false
871 }
872 }
873
874 fn c<T: Poolable, K: Key>(key: K) -> Connecting<T, K> {
875 Connecting {
876 key,
877 pool: WeakOpt::none(),
878 }
879 }
880
881 fn host_key(s: &str) -> KeyImpl {
882 KeyImpl(http::uri::Scheme::HTTP, s.parse().expect("host key"))
883 }
884
885 fn pool_no_timer<T, K: Key>() -> Pool<T, K> {
886 pool_max_idle_no_timer(usize::MAX)
887 }
888
889 fn pool_max_idle_no_timer<T, K: Key>(max_idle: usize) -> Pool<T, K> {
890 let pool = Pool::new(
891 super::Config {
892 idle_timeout: Some(Duration::from_millis(100)),
893 max_idle_per_host: max_idle,
894 },
895 TokioExecutor::new(),
896 Option::<timer::Timer>::None,
897 );
898 pool.no_timer();
899 pool
900 }
901
902 #[tokio::test]
903 async fn test_pool_checkout_smoke() {
904 let pool = pool_no_timer();
905 let key = host_key("foo");
906 let pooled = pool.pooled(c(key.clone()), Uniq(41));
907
908 drop(pooled);
909
910 match pool.checkout(key).await {
911 Ok(pooled) => assert_eq!(*pooled, Uniq(41)),
912 Err(_) => panic!("not ready"),
913 };
914 }
915
916 struct PollOnce<'a, F>(&'a mut F);
918
919 impl<F, T, U> Future for PollOnce<'_, F>
920 where
921 F: Future<Output = Result<T, U>> + Unpin,
922 {
923 type Output = Option<()>;
924
925 fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
926 match Pin::new(&mut self.0).poll(cx) {
927 Poll::Ready(Ok(_)) => Poll::Ready(Some(())),
928 Poll::Ready(Err(_)) => Poll::Ready(Some(())),
929 Poll::Pending => Poll::Ready(None),
930 }
931 }
932 }
933
934 #[tokio::test]
935 async fn test_pool_checkout_returns_none_if_expired() {
936 let pool = pool_no_timer();
937 let key = host_key("foo");
938 let pooled = pool.pooled(c(key.clone()), Uniq(41));
939
940 drop(pooled);
941 tokio::time::sleep(pool.locked().timeout.unwrap()).await;
942 let mut checkout = pool.checkout(key);
943 let poll_once = PollOnce(&mut checkout);
944 let is_not_ready = poll_once.await.is_none();
945 assert!(is_not_ready);
946 }
947
948 #[tokio::test]
949 async fn test_pool_checkout_removes_expired() {
950 let pool = pool_no_timer();
951 let key = host_key("foo");
952
953 pool.pooled(c(key.clone()), Uniq(41));
954 pool.pooled(c(key.clone()), Uniq(5));
955 pool.pooled(c(key.clone()), Uniq(99));
956
957 assert_eq!(
958 pool.locked().idle.get(&key).map(|entries| entries.len()),
959 Some(3)
960 );
961 tokio::time::sleep(pool.locked().timeout.unwrap()).await;
962
963 let mut checkout = pool.checkout(key.clone());
964 let poll_once = PollOnce(&mut checkout);
965 poll_once.await;
967 assert!(!pool.locked().idle.contains_key(&key));
968 }
969
970 #[test]
971 fn test_pool_max_idle_per_host() {
972 let pool = pool_max_idle_no_timer(2);
973 let key = host_key("foo");
974
975 pool.pooled(c(key.clone()), Uniq(41));
976 pool.pooled(c(key.clone()), Uniq(5));
977 pool.pooled(c(key.clone()), Uniq(99));
978
979 assert_eq!(
981 pool.locked().idle.get(&key).map(|entries| entries.len()),
982 Some(2)
983 );
984 }
985
986 #[tokio::test]
987 async fn test_pool_timer_removes_expired_realtime() {
988 test_pool_timer_removes_expired_inner().await
989 }
990
991 #[tokio::test(start_paused = true)]
992 async fn test_pool_timer_removes_expired_faketime() {
993 test_pool_timer_removes_expired_inner().await
994 }
995
996 async fn test_pool_timer_removes_expired_inner() {
997 let pool = Pool::new(
998 super::Config {
999 idle_timeout: Some(Duration::from_millis(10)),
1000 max_idle_per_host: usize::MAX,
1001 },
1002 TokioExecutor::new(),
1003 Some(TokioTimer::new()),
1004 );
1005
1006 let key = host_key("foo");
1007
1008 pool.pooled(c(key.clone()), Uniq(41));
1009 pool.pooled(c(key.clone()), Uniq(5));
1010 pool.pooled(c(key.clone()), Uniq(99));
1011
1012 assert_eq!(
1013 pool.locked().idle.get(&key).map(|entries| entries.len()),
1014 Some(3)
1015 );
1016
1017 tokio::time::sleep(Duration::from_millis(30)).await;
1019
1020 assert_eq!(
1022 pool.locked().idle.get(&key).map(|entries| entries.len()),
1023 Some(3)
1024 );
1025
1026 tokio::time::sleep(Duration::from_millis(70)).await;
1028 tokio::task::yield_now().await;
1030
1031 assert!(!pool.locked().idle.contains_key(&key));
1032 }
1033
1034 #[tokio::test]
1035 async fn test_pool_checkout_task_unparked() {
1036 use futures_util::future::join;
1037 use futures_util::FutureExt;
1038
1039 let pool = pool_no_timer();
1040 let key = host_key("foo");
1041 let pooled = pool.pooled(c(key.clone()), Uniq(41));
1042
1043 let checkout = join(pool.checkout(key), async {
1044 drop(pooled);
1050 })
1051 .map(|(entry, _)| entry);
1052
1053 assert_eq!(*checkout.await.unwrap(), Uniq(41));
1054 }
1055
1056 #[tokio::test]
1057 async fn test_pool_checkout_drop_cleans_up_waiters() {
1058 let pool = pool_no_timer::<Uniq<i32>, KeyImpl>();
1059 let key = host_key("foo");
1060
1061 let mut checkout1 = pool.checkout(key.clone());
1062 let mut checkout2 = pool.checkout(key.clone());
1063
1064 let poll_once1 = PollOnce(&mut checkout1);
1065 let poll_once2 = PollOnce(&mut checkout2);
1066
1067 poll_once1.await;
1069 assert_eq!(pool.locked().waiters.get(&key).unwrap().len(), 1);
1070 poll_once2.await;
1071 assert_eq!(pool.locked().waiters.get(&key).unwrap().len(), 2);
1072
1073 drop(checkout1);
1075 assert_eq!(pool.locked().waiters.get(&key).unwrap().len(), 1);
1076
1077 drop(checkout2);
1078 assert!(!pool.locked().waiters.contains_key(&key));
1079 }
1080
1081 #[derive(Debug)]
1082 struct CanClose {
1083 #[allow(unused)]
1084 val: i32,
1085 closed: bool,
1086 }
1087
1088 impl Poolable for CanClose {
1089 fn is_open(&self) -> bool {
1090 !self.closed
1091 }
1092
1093 fn reserve(self) -> Reservation<Self> {
1094 Reservation::Unique(self)
1095 }
1096
1097 fn can_share(&self) -> bool {
1098 false
1099 }
1100 }
1101
1102 #[test]
1103 fn pooled_drop_if_closed_doesnt_reinsert() {
1104 let pool = pool_no_timer();
1105 let key = host_key("foo");
1106 pool.pooled(
1107 c(key.clone()),
1108 CanClose {
1109 val: 57,
1110 closed: true,
1111 },
1112 );
1113
1114 assert!(!pool.locked().idle.contains_key(&key));
1115 }
1116}