hyper_util/client/legacy/
pool.rs

1#![allow(dead_code)]
2
3use std::collections::{HashMap, HashSet, VecDeque};
4use std::convert::Infallible;
5use std::error::Error as StdError;
6use std::fmt::{self, Debug};
7use std::future::Future;
8use std::hash::Hash;
9use std::ops::{Deref, DerefMut};
10use std::pin::Pin;
11use std::sync::{Arc, Mutex, Weak};
12use std::task::{self, Poll};
13
14use std::time::{Duration, Instant};
15
16use futures_channel::oneshot;
17use futures_core::ready;
18use tracing::{debug, trace};
19
20use hyper::rt::Timer as _;
21
22use crate::common::{exec, exec::Exec, timer::Timer};
23
/// A handle to a keyed pool of idle values of type `T`.
///
/// Cloning the handle clones the inner `Option<Arc<..>>`, so every clone of
/// an enabled pool refers to the same shared state.
// FIXME: allow() required due to `impl Trait` leaking types to this lint
#[allow(missing_debug_implementations)]
pub struct Pool<T, K: Key> {
    // If the pool is disabled, this is None.
    inner: Option<Arc<Mutex<PoolInner<T, K>>>>,
}
30
// Before using a pooled connection, make sure the sender is not dead.
//
// This is a trait to allow the `client::pool::tests` to work for `i32`.
//
// See https://github.com/hyperium/hyper/issues/1429
/// A value that can be stored in, and checked out of, a `Pool`.
pub trait Poolable: Unpin + Send + Sized + 'static {
    /// Reports whether the value is still usable.
    ///
    /// The pool discards idle entries, and refuses to re-pool dropped
    /// `Pooled` values, for which this returns `false`.
    fn is_open(&self) -> bool;
    /// Reserve this connection.
    ///
    /// Allows for HTTP/2 to return a shared reservation.
    fn reserve(self) -> Reservation<Self>;
    /// Reports whether the value may be handed to several users at once.
    ///
    /// A shareable value is not added to the idle set when its key already
    /// has idle entries, and reused shareable values carry no reference back
    /// to the pool.
    fn can_share(&self) -> bool;
}
44
/// Bounds required of a pool key. Keys index the pool's `connecting`, `idle`
/// and `waiters` collections.
pub trait Key: Eq + Hash + Clone + Debug + Unpin + Send + 'static {}

// Blanket impl: every type that meets the bounds is automatically a `Key`.
impl<T> Key for T where T: Eq + Hash + Clone + Debug + Unpin + Send + 'static {}
48
/// A marker to identify what version a pooled connection is.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
#[allow(dead_code)]
pub enum Ver {
    /// Anything that is not `Http2`; `Pool::connecting` takes no per-key
    /// connecting lock for this variant.
    Auto,
    /// HTTP/2; `Pool::connecting` allows only one in-flight connect per key
    /// when the pool is enabled.
    Http2,
}
56
/// When checking out a pooled connection, it might be that the connection
/// only supports a single reservation, or it might be usable for many.
///
/// Specifically, HTTP/1 requires a unique reservation, but HTTP/2 can be
/// used for multiple requests.
// FIXME: allow() required due to `impl Trait` leaking types to this lint
#[allow(missing_debug_implementations)]
pub enum Reservation<T> {
    /// This connection could be used multiple times, the first one will be
    /// reinserted into the `idle` pool, and the second will be given to
    /// the `Checkout`.
    ///
    /// Field order is therefore `(value kept by the pool, value handed out)`.
    #[cfg(feature = "http2")]
    Shared(T, T),
    /// This connection requires unique access. It will be returned after
    /// use is complete.
    Unique(T),
}
74
// Simple type alias in case the key type needs to be adjusted.
// pub type Key = (http::uri::Scheme, http::uri::Authority); //Arc<String>;

/// The shared, mutex-guarded state behind an enabled `Pool`.
struct PoolInner<T, K: Eq + Hash> {
    // A flag that a connection is being established, and the connection
    // should be shared. This prevents making multiple HTTP/2 connections
    // to the same host.
    connecting: HashSet<K>,
    // These are internal Conns sitting in the event loop in the KeepAlive
    // state, waiting to receive a new Request to send on the socket.
    idle: HashMap<K, Vec<Idle<T>>>,
    // Upper bound on the length of each per-key `idle` list; `put` drops the
    // value instead of pooling it once the list has reached this size.
    max_idle_per_host: usize,
    // These are outstanding Checkouts that are waiting for a socket to be
    // able to send a Request on. This is used when "racing" for a new
    // connection.
    //
    // The Client starts 2 tasks, 1 to connect a new socket, and 1 to wait
    // for the Pool to receive an idle Conn. When a Conn becomes idle,
    // this list is checked for any parked Checkouts, and tries to notify
    // them that the Conn could be used instead of waiting for a brand new
    // connection.
    waiters: HashMap<K, VecDeque<oneshot::Sender<T>>>,
    // A oneshot channel is used to allow the interval to be notified when
    // the Pool completely drops. That way, the interval can cancel immediately.
    idle_interval_ref: Option<oneshot::Sender<Infallible>>,
    // Executor used to spawn the idle-eviction task.
    exec: Exec,
    // Optional time source; also required for the idle-eviction task to be
    // spawned at all.
    timer: Option<Timer>,
    // Idle timeout taken from `Config::idle_timeout`; `None` means idle
    // entries never expire.
    timeout: Option<Duration>,
}
104
// This is because `Weak::new()` *allocates* space for `T`, even if it
// doesn't need it!
// NOTE(review): the current std documentation describes `Weak::new` as
// non-allocating, so this rationale may be outdated — confirm before
// relying on it.
/// An optional weak reference; `None` stands for "nothing to refer back to".
struct WeakOpt<T>(Option<Weak<T>>);
108
/// Settings used to construct a `Pool`.
#[derive(Clone, Copy, Debug)]
pub struct Config {
    /// How long a value may sit idle before it is considered expired;
    /// `None` disables expiry.
    pub idle_timeout: Option<Duration>,
    /// Maximum number of idle values kept per key. Zero turns pooling off.
    pub max_idle_per_host: usize,
}

impl Config {
    /// Returns `true` when pooling is switched on, i.e. when the per-host
    /// idle limit is non-zero.
    pub fn is_enabled(&self) -> bool {
        self.max_idle_per_host != 0
    }
}
120
121impl<T, K: Key> Pool<T, K> {
122    pub fn new<E, M>(config: Config, executor: E, timer: Option<M>) -> Pool<T, K>
123    where
124        E: hyper::rt::Executor<exec::BoxSendFuture> + Send + Sync + Clone + 'static,
125        M: hyper::rt::Timer + Send + Sync + Clone + 'static,
126    {
127        let exec = Exec::new(executor);
128        let timer = timer.map(|t| Timer::new(t));
129        let inner = if config.is_enabled() {
130            Some(Arc::new(Mutex::new(PoolInner {
131                connecting: HashSet::new(),
132                idle: HashMap::new(),
133                idle_interval_ref: None,
134                max_idle_per_host: config.max_idle_per_host,
135                waiters: HashMap::new(),
136                exec,
137                timer,
138                timeout: config.idle_timeout,
139            })))
140        } else {
141            None
142        };
143
144        Pool { inner }
145    }
146
147    pub(crate) fn is_enabled(&self) -> bool {
148        self.inner.is_some()
149    }
150
151    #[cfg(test)]
152    pub(super) fn no_timer(&self) {
153        // Prevent an actual interval from being created for this pool...
154        {
155            let mut inner = self.inner.as_ref().unwrap().lock().unwrap();
156            assert!(inner.idle_interval_ref.is_none(), "timer already spawned");
157            let (tx, _) = oneshot::channel();
158            inner.idle_interval_ref = Some(tx);
159        }
160    }
161}
162
163impl<T: Poolable, K: Key> Pool<T, K> {
164    /// Returns a `Checkout` which is a future that resolves if an idle
165    /// connection becomes available.
166    pub fn checkout(&self, key: K) -> Checkout<T, K> {
167        Checkout {
168            key,
169            pool: self.clone(),
170            waiter: None,
171        }
172    }
173
174    /// Ensure that there is only ever 1 connecting task for HTTP/2
175    /// connections. This does nothing for HTTP/1.
176    pub fn connecting(&self, key: &K, ver: Ver) -> Option<Connecting<T, K>> {
177        if ver == Ver::Http2 {
178            if let Some(ref enabled) = self.inner {
179                let mut inner = enabled.lock().unwrap();
180                return if inner.connecting.insert(key.clone()) {
181                    let connecting = Connecting {
182                        key: key.clone(),
183                        pool: WeakOpt::downgrade(enabled),
184                    };
185                    Some(connecting)
186                } else {
187                    trace!("HTTP/2 connecting already in progress for {:?}", key);
188                    None
189                };
190            }
191        }
192
193        // else
194        Some(Connecting {
195            key: key.clone(),
196            // in HTTP/1's case, there is never a lock, so we don't
197            // need to do anything in Drop.
198            pool: WeakOpt::none(),
199        })
200    }
201
202    #[cfg(test)]
203    fn locked(&self) -> std::sync::MutexGuard<'_, PoolInner<T, K>> {
204        self.inner.as_ref().expect("enabled").lock().expect("lock")
205    }
206
207    /* Used in client/tests.rs...
208    #[cfg(test)]
209    pub(super) fn h1_key(&self, s: &str) -> Key {
210        Arc::new(s.to_string())
211    }
212
213    #[cfg(test)]
214    pub(super) fn idle_count(&self, key: &Key) -> usize {
215        self
216            .locked()
217            .idle
218            .get(key)
219            .map(|list| list.len())
220            .unwrap_or(0)
221    }
222    */
223
224    pub fn pooled(
225        &self,
226        #[cfg_attr(not(feature = "http2"), allow(unused_mut))] mut connecting: Connecting<T, K>,
227        value: T,
228    ) -> Pooled<T, K> {
229        let (value, pool_ref) = if let Some(ref enabled) = self.inner {
230            match value.reserve() {
231                #[cfg(feature = "http2")]
232                Reservation::Shared(to_insert, to_return) => {
233                    let mut inner = enabled.lock().unwrap();
234                    inner.put(connecting.key.clone(), to_insert, enabled);
235                    // Do this here instead of Drop for Connecting because we
236                    // already have a lock, no need to lock the mutex twice.
237                    inner.connected(&connecting.key);
238                    // prevent the Drop of Connecting from repeating inner.connected()
239                    connecting.pool = WeakOpt::none();
240
241                    // Shared reservations don't need a reference to the pool,
242                    // since the pool always keeps a copy.
243                    (to_return, WeakOpt::none())
244                }
245                Reservation::Unique(value) => {
246                    // Unique reservations must take a reference to the pool
247                    // since they hope to reinsert once the reservation is
248                    // completed
249                    (value, WeakOpt::downgrade(enabled))
250                }
251            }
252        } else {
253            // If pool is not enabled, skip all the things...
254
255            // The Connecting should have had no pool ref
256            debug_assert!(connecting.pool.upgrade().is_none());
257
258            (value, WeakOpt::none())
259        };
260        Pooled {
261            key: connecting.key.clone(),
262            is_reused: false,
263            pool: pool_ref,
264            value: Some(value),
265        }
266    }
267
268    fn reuse(&self, key: &K, value: T) -> Pooled<T, K> {
269        debug!("reuse idle connection for {:?}", key);
270        // TODO: unhack this
271        // In Pool::pooled(), which is used for inserting brand new connections,
272        // there's some code that adjusts the pool reference taken depending
273        // on if the Reservation can be shared or is unique. By the time
274        // reuse() is called, the reservation has already been made, and
275        // we just have the final value, without knowledge of if this is
276        // unique or shared. So, the hack is to just assume Ver::Http2 means
277        // shared... :(
278        let mut pool_ref = WeakOpt::none();
279        if !value.can_share() {
280            if let Some(ref enabled) = self.inner {
281                pool_ref = WeakOpt::downgrade(enabled);
282            }
283        }
284
285        Pooled {
286            is_reused: true,
287            key: key.clone(),
288            pool: pool_ref,
289            value: Some(value),
290        }
291    }
292}
293
/// Pop off this list, looking for a usable connection that hasn't expired.
struct IdlePopper<'a, T, K> {
    // Key the list belongs to; only used in trace messages.
    key: &'a K,
    // The per-key idle list; entries are popped from its end.
    list: &'a mut Vec<Idle<T>>,
}
299
300impl<'a, T: Poolable + 'a, K: Debug> IdlePopper<'a, T, K> {
301    fn pop(self, expiration: &Expiration, now: Instant) -> Option<Idle<T>> {
302        while let Some(entry) = self.list.pop() {
303            // If the connection has been closed, or is older than our idle
304            // timeout, simply drop it and keep looking...
305            if !entry.value.is_open() {
306                trace!("removing closed connection for {:?}", self.key);
307                continue;
308            }
309            // TODO: Actually, since the `idle` list is pushed to the end always,
310            // that would imply that if *this* entry is expired, then anything
311            // "earlier" in the list would *have* to be expired also... Right?
312            //
313            // In that case, we could just break out of the loop and drop the
314            // whole list...
315            if expiration.expires(entry.idle_at, now) {
316                trace!("removing expired connection for {:?}", self.key);
317                continue;
318            }
319
320            let value = match entry.value.reserve() {
321                #[cfg(feature = "http2")]
322                Reservation::Shared(to_reinsert, to_checkout) => {
323                    self.list.push(Idle {
324                        idle_at: now,
325                        value: to_reinsert,
326                    });
327                    to_checkout
328                }
329                Reservation::Unique(unique) => unique,
330            };
331
332            return Some(Idle {
333                idle_at: entry.idle_at,
334                value,
335            });
336        }
337
338        None
339    }
340}
341
342impl<T: Poolable, K: Key> PoolInner<T, K> {
343    fn now(&self) -> Instant {
344        self.timer
345            .as_ref()
346            .map_or_else(|| Instant::now(), |t| t.now())
347    }
348
    /// Offers `value` to the pool under `key`.
    ///
    /// Parked waiters for the key are served first. If the value is still
    /// available afterwards it is appended to the key's idle list, unless
    /// that list is already at `max_idle_per_host`, in which case the value
    /// is dropped. `__pool_ref` is only used to spawn the idle interval
    /// after a value was actually pooled.
    fn put(&mut self, key: K, value: T, __pool_ref: &Arc<Mutex<PoolInner<T, K>>>) {
        // A shareable value is redundant when the key already has idle entries.
        if value.can_share() && self.idle.contains_key(&key) {
            trace!("put; existing idle HTTP/2 connection for {:?}", key);
            return;
        }
        trace!("put; add idle connection for {:?}", key);
        let mut remove_waiters = false;
        // `Some` while we still own something to hand out or to pool.
        let mut value = Some(value);
        if let Some(waiters) = self.waiters.get_mut(&key) {
            while let Some(tx) = waiters.pop_front() {
                if !tx.is_canceled() {
                    let reserved = value.take().expect("value already sent");
                    let reserved = match reserved.reserve() {
                        #[cfg(feature = "http2")]
                        Reservation::Shared(to_keep, to_send) => {
                            // Keep one copy so further waiters can be served.
                            value = Some(to_keep);
                            to_send
                        }
                        Reservation::Unique(uniq) => uniq,
                    };
                    match tx.send(reserved) {
                        Ok(()) => {
                            // Nothing left to give away: stop. Otherwise
                            // move on to the next waiter.
                            if value.is_none() {
                                break;
                            } else {
                                continue;
                            }
                        }
                        Err(e) => {
                            // The send failed and gave the value back; keep
                            // it for the next waiter or for pooling.
                            value = Some(e);
                        }
                    }
                }

                trace!("put; removing canceled waiter for {:?}", key);
            }
            remove_waiters = waiters.is_empty();
        }
        // Done outside the `get_mut` borrow above.
        if remove_waiters {
            self.waiters.remove(&key);
        }

        match value {
            Some(value) => {
                // borrow-check scope...
                {
                    let now = self.now();
                    let idle_list = self.idle.entry(key.clone()).or_default();
                    if self.max_idle_per_host <= idle_list.len() {
                        trace!("max idle per host for {:?}, dropping connection", key);
                        return;
                    }

                    debug!("pooling idle connection for {:?}", key);
                    idle_list.push(Idle {
                        value,
                        idle_at: now,
                    });
                }

                // Returns early if an interval is already registered.
                self.spawn_idle_interval(__pool_ref);
            }
            None => trace!("put; found waiter for {:?}", key),
        }
    }
414
415    /// A `Connecting` task is complete. Not necessarily successfully,
416    /// but the lock is going away, so clean up.
417    fn connected(&mut self, key: &K) {
418        let existed = self.connecting.remove(key);
419        debug_assert!(existed, "Connecting dropped, key not in pool.connecting");
420        // cancel any waiters. if there are any, it's because
421        // this Connecting task didn't complete successfully.
422        // those waiters would never receive a connection.
423        self.waiters.remove(key);
424    }
425
426    fn spawn_idle_interval(&mut self, pool_ref: &Arc<Mutex<PoolInner<T, K>>>) {
427        if self.idle_interval_ref.is_some() {
428            return;
429        }
430        let dur = if let Some(dur) = self.timeout {
431            dur
432        } else {
433            return;
434        };
435        if dur == Duration::ZERO {
436            return;
437        }
438        let timer = if let Some(timer) = self.timer.clone() {
439            timer
440        } else {
441            return;
442        };
443
444        // While someone might want a shorter duration, and it will be respected
445        // at checkout time, there's no need to wake up and proactively evict
446        // faster than this.
447        const MIN_CHECK: Duration = Duration::from_millis(90);
448
449        let dur = dur.max(MIN_CHECK);
450
451        let (tx, rx) = oneshot::channel();
452        self.idle_interval_ref = Some(tx);
453
454        let interval = IdleTask {
455            timer: timer.clone(),
456            duration: dur,
457            pool: WeakOpt::downgrade(pool_ref),
458            pool_drop_notifier: rx,
459        };
460
461        self.exec.execute(interval.run());
462    }
463}
464
465impl<T, K: Eq + Hash> PoolInner<T, K> {
466    /// Any `FutureResponse`s that were created will have made a `Checkout`,
467    /// and possibly inserted into the pool that it is waiting for an idle
468    /// connection. If a user ever dropped that future, we need to clean out
469    /// those parked senders.
470    fn clean_waiters(&mut self, key: &K) {
471        let mut remove_waiters = false;
472        if let Some(waiters) = self.waiters.get_mut(key) {
473            waiters.retain(|tx| !tx.is_canceled());
474            remove_waiters = waiters.is_empty();
475        }
476        if remove_waiters {
477            self.waiters.remove(key);
478        }
479    }
480}
481
482impl<T: Poolable, K: Key> PoolInner<T, K> {
483    /// This should *only* be called by the IdleTask
484    fn clear_expired(&mut self) {
485        let dur = self.timeout.expect("interval assumes timeout");
486
487        let now = self.now();
488        //self.last_idle_check_at = now;
489
490        self.idle.retain(|key, values| {
491            values.retain(|entry| {
492                if !entry.value.is_open() {
493                    trace!("idle interval evicting closed for {:?}", key);
494                    return false;
495                }
496
497                // Avoid `Instant::sub` to avoid issues like rust-lang/rust#86470.
498                if now.saturating_duration_since(entry.idle_at) > dur {
499                    trace!("idle interval evicting expired for {:?}", key);
500                    return false;
501                }
502
503                // Otherwise, keep this value...
504                true
505            });
506
507            // returning false evicts this key/val
508            !values.is_empty()
509        });
510    }
511}
512
513impl<T, K: Key> Clone for Pool<T, K> {
514    fn clone(&self) -> Pool<T, K> {
515        Pool {
516            inner: self.inner.clone(),
517        }
518    }
519}
520
/// A wrapped poolable value that tries to reinsert to the Pool on Drop.
// Note: The bounds `T: Poolable` is needed for the Drop impl.
pub struct Pooled<T: Poolable, K: Key> {
    // `Some` for the whole life of the handle; taken out in `Drop`.
    value: Option<T>,
    // `true` when the value came from `Pool::reuse`, `false` from `Pool::pooled`.
    is_reused: bool,
    // Key under which the value is offered back to the pool.
    key: K,
    // Weak reference used to offer the value back on drop; empty when the
    // handle has no pool to return to.
    pool: WeakOpt<Mutex<PoolInner<T, K>>>,
}
529
530impl<T: Poolable, K: Key> Pooled<T, K> {
531    pub fn is_reused(&self) -> bool {
532        self.is_reused
533    }
534
535    pub fn is_pool_enabled(&self) -> bool {
536        self.pool.0.is_some()
537    }
538
539    fn as_ref(&self) -> &T {
540        self.value.as_ref().expect("not dropped")
541    }
542
543    fn as_mut(&mut self) -> &mut T {
544        self.value.as_mut().expect("not dropped")
545    }
546}
547
548impl<T: Poolable, K: Key> Deref for Pooled<T, K> {
549    type Target = T;
550    fn deref(&self) -> &T {
551        self.as_ref()
552    }
553}
554
555impl<T: Poolable, K: Key> DerefMut for Pooled<T, K> {
556    fn deref_mut(&mut self) -> &mut T {
557        self.as_mut()
558    }
559}
560
561impl<T: Poolable, K: Key> Drop for Pooled<T, K> {
562    fn drop(&mut self) {
563        if let Some(value) = self.value.take() {
564            if !value.is_open() {
565                // If we *already* know the connection is done here,
566                // it shouldn't be re-inserted back into the pool.
567                return;
568            }
569
570            if let Some(pool) = self.pool.upgrade() {
571                if let Ok(mut inner) = pool.lock() {
572                    inner.put(self.key.clone(), value, &pool);
573                }
574            } else if !value.can_share() {
575                trace!("pool dropped, dropping pooled ({:?})", self.key);
576            }
577            // Ver::Http2 is already in the Pool (or dead), so we wouldn't
578            // have an actual reference to the Pool.
579        }
580    }
581}
582
583impl<T: Poolable, K: Key> fmt::Debug for Pooled<T, K> {
584    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
585        f.debug_struct("Pooled").field("key", &self.key).finish()
586    }
587}
588
/// An idle value together with the time it was put into the pool.
struct Idle<T> {
    // Compared against the idle timeout to decide whether the entry expired.
    idle_at: Instant,
    value: T,
}
593
/// Future returned by `Pool::checkout`; resolves to a `Pooled` value for
/// `key` or to an `Error`.
// FIXME: allow() required due to `impl Trait` leaking types to this lint
#[allow(missing_debug_implementations)]
pub struct Checkout<T, K: Key> {
    key: K,
    pool: Pool<T, K>,
    // Receiving half of the oneshot parked in the pool's `waiters`; `None`
    // until the first poll finds no idle value.
    waiter: Option<oneshot::Receiver<T>>,
}
601
/// Errors a `Checkout` can resolve to.
#[derive(Debug)]
#[non_exhaustive]
pub enum Error {
    /// The pool is disabled, so there is nothing to check out.
    PoolDisabled,
    /// The sender parked for this checkout was dropped without delivering
    /// a value.
    CheckoutNoLongerWanted,
    /// A value was delivered to the checkout, but it was no longer open.
    CheckedOutClosedValue,
}
609
610impl Error {
611    pub(super) fn is_canceled(&self) -> bool {
612        matches!(self, Error::CheckedOutClosedValue)
613    }
614}
615
616impl fmt::Display for Error {
617    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
618        f.write_str(match self {
619            Error::PoolDisabled => "pool is disabled",
620            Error::CheckedOutClosedValue => "checked out connection was closed",
621            Error::CheckoutNoLongerWanted => "request was canceled",
622        })
623    }
624}
625
// No `source()` override: these errors wrap no underlying cause.
impl StdError for Error {}
627
628impl<T: Poolable, K: Key> Checkout<T, K> {
629    fn poll_waiter(
630        &mut self,
631        cx: &mut task::Context<'_>,
632    ) -> Poll<Option<Result<Pooled<T, K>, Error>>> {
633        if let Some(mut rx) = self.waiter.take() {
634            match Pin::new(&mut rx).poll(cx) {
635                Poll::Ready(Ok(value)) => {
636                    if value.is_open() {
637                        Poll::Ready(Some(Ok(self.pool.reuse(&self.key, value))))
638                    } else {
639                        Poll::Ready(Some(Err(Error::CheckedOutClosedValue)))
640                    }
641                }
642                Poll::Pending => {
643                    self.waiter = Some(rx);
644                    Poll::Pending
645                }
646                Poll::Ready(Err(_canceled)) => {
647                    Poll::Ready(Some(Err(Error::CheckoutNoLongerWanted)))
648                }
649            }
650        } else {
651            Poll::Ready(None)
652        }
653    }
654
655    fn checkout(&mut self, cx: &mut task::Context<'_>) -> Option<Pooled<T, K>> {
656        let entry = {
657            let mut inner = self.pool.inner.as_ref()?.lock().unwrap();
658            let expiration = Expiration::new(inner.timeout);
659            let now = inner.now();
660            let maybe_entry = inner.idle.get_mut(&self.key).and_then(|list| {
661                trace!("take? {:?}: expiration = {:?}", self.key, expiration.0);
662                // A block to end the mutable borrow on list,
663                // so the map below can check is_empty()
664                {
665                    let popper = IdlePopper {
666                        key: &self.key,
667                        list,
668                    };
669                    popper.pop(&expiration, now)
670                }
671                .map(|e| (e, list.is_empty()))
672            });
673
674            let (entry, empty) = if let Some((e, empty)) = maybe_entry {
675                (Some(e), empty)
676            } else {
677                // No entry found means nuke the list for sure.
678                (None, true)
679            };
680            if empty {
681                //TODO: This could be done with the HashMap::entry API instead.
682                inner.idle.remove(&self.key);
683            }
684
685            if entry.is_none() && self.waiter.is_none() {
686                let (tx, mut rx) = oneshot::channel();
687                trace!("checkout waiting for idle connection: {:?}", self.key);
688                inner
689                    .waiters
690                    .entry(self.key.clone())
691                    .or_insert_with(VecDeque::new)
692                    .push_back(tx);
693
694                // register the waker with this oneshot
695                assert!(Pin::new(&mut rx).poll(cx).is_pending());
696                self.waiter = Some(rx);
697            }
698
699            entry
700        };
701
702        entry.map(|e| self.pool.reuse(&self.key, e.value))
703    }
704}
705
706impl<T: Poolable, K: Key> Future for Checkout<T, K> {
707    type Output = Result<Pooled<T, K>, Error>;
708
709    fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
710        if let Some(pooled) = ready!(self.poll_waiter(cx)?) {
711            return Poll::Ready(Ok(pooled));
712        }
713
714        if let Some(pooled) = self.checkout(cx) {
715            Poll::Ready(Ok(pooled))
716        } else if !self.pool.is_enabled() {
717            Poll::Ready(Err(Error::PoolDisabled))
718        } else {
719            // There's a new waiter, already registered in self.checkout()
720            debug_assert!(self.waiter.is_some());
721            Poll::Pending
722        }
723    }
724}
725
726impl<T, K: Key> Drop for Checkout<T, K> {
727    fn drop(&mut self) {
728        if self.waiter.take().is_some() {
729            trace!("checkout dropped for {:?}", self.key);
730            if let Some(Ok(mut inner)) = self.pool.inner.as_ref().map(|i| i.lock()) {
731                inner.clean_waiters(&self.key);
732            }
733        }
734    }
735}
736
/// Guard handed out by `Pool::connecting` for a connect attempt to `key`.
///
/// When it points back at a pool, dropping it removes the key from the
/// pool's `connecting` set.
// FIXME: allow() required due to `impl Trait` leaking types to this lint
#[allow(missing_debug_implementations)]
pub struct Connecting<T: Poolable, K: Key> {
    key: K,
    // Empty unless the pool's connecting lock was taken for `key`.
    pool: WeakOpt<Mutex<PoolInner<T, K>>>,
}
743
744impl<T: Poolable, K: Key> Connecting<T, K> {
745    pub fn alpn_h2(self, pool: &Pool<T, K>) -> Option<Self> {
746        debug_assert!(
747            self.pool.0.is_none(),
748            "Connecting::alpn_h2 but already Http2"
749        );
750
751        pool.connecting(&self.key, Ver::Http2)
752    }
753}
754
755impl<T: Poolable, K: Key> Drop for Connecting<T, K> {
756    fn drop(&mut self) {
757        if let Some(pool) = self.pool.upgrade() {
758            // No need to panic on drop, that could abort!
759            if let Ok(mut inner) = pool.lock() {
760                inner.connected(&self.key);
761            }
762        }
763    }
764}
765
/// An optional idle timeout; `None` means entries never expire.
struct Expiration(Option<Duration>);

impl Expiration {
    /// Wraps the configured idle timeout.
    fn new(dur: Option<Duration>) -> Expiration {
        Expiration(dur)
    }

    /// Returns `true` when strictly more than the timeout has passed between
    /// `instant` and `now`. Always `false` when no timeout is set.
    fn expires(&self, instant: Instant, now: Instant) -> bool {
        // Avoid `Instant::elapsed` to avoid issues like rust-lang/rust#86470.
        self.0
            .map_or(false, |timeout| now.saturating_duration_since(instant) > timeout)
    }
}
781
/// State of the background task that periodically evicts expired idle
/// entries from the pool.
struct IdleTask<T, K: Key> {
    // Provides the sleep between eviction passes and the current time.
    timer: Timer,
    // Time between two eviction passes.
    duration: Duration,
    // Weak, so the task itself does not keep the pool alive.
    pool: WeakOpt<Mutex<PoolInner<T, K>>>,
    // This allows the IdleTask to be notified as soon as the entire
    // Pool is fully dropped, and shutdown. This channel is never sent on,
    // but Err(Canceled) will be received when the Pool is dropped.
    pool_drop_notifier: oneshot::Receiver<Infallible>,
}
791
792impl<T: Poolable + 'static, K: Key> IdleTask<T, K> {
793    async fn run(self) {
794        use futures_util::future;
795
796        let mut sleep = self.timer.sleep_until(self.timer.now() + self.duration);
797        let mut on_pool_drop = self.pool_drop_notifier;
798        loop {
799            match future::select(&mut on_pool_drop, &mut sleep).await {
800                future::Either::Left(_) => {
801                    // pool dropped, bah-bye
802                    break;
803                }
804                future::Either::Right(((), _)) => {
805                    if let Some(inner) = self.pool.upgrade() {
806                        if let Ok(mut inner) = inner.lock() {
807                            trace!("idle interval checking for expired");
808                            inner.clear_expired();
809                        }
810                    }
811
812                    let deadline = self.timer.now() + self.duration;
813                    self.timer.reset(&mut sleep, deadline);
814                }
815            }
816        }
817
818        trace!("pool closed, canceling idle interval");
819        return;
820    }
821}
822
823impl<T> WeakOpt<T> {
824    fn none() -> Self {
825        WeakOpt(None)
826    }
827
828    fn downgrade(arc: &Arc<T>) -> Self {
829        WeakOpt(Some(Arc::downgrade(arc)))
830    }
831
832    fn upgrade(&self) -> Option<Arc<T>> {
833        self.0.as_ref().and_then(Weak::upgrade)
834    }
835}
836
837#[cfg(test)]
838mod tests {
839    use std::fmt::Debug;
840    use std::future::Future;
841    use std::hash::Hash;
842    use std::pin::Pin;
843    use std::task::{self, Poll};
844    use std::time::Duration;
845
846    use super::{Connecting, Key, Pool, Poolable, Reservation, WeakOpt};
847    use crate::rt::{TokioExecutor, TokioTimer};
848
849    use crate::common::timer;
850
    /// Pool key used by these tests: a URI scheme plus an authority.
    #[derive(Clone, Debug, PartialEq, Eq, Hash)]
    struct KeyImpl(http::uri::Scheme, http::uri::Authority);
853
    // Tuple alternative to `KeyImpl` with the same two components.
    type KeyTuple = (http::uri::Scheme, http::uri::Authority);
855
    /// Test unique reservations.
    ///
    /// Wraps any payload and behaves as an always-open, non-shareable
    /// `Poolable`.
    #[derive(Debug, PartialEq, Eq)]
    struct Uniq<T>(T);
859
    impl<T: Send + 'static + Unpin> Poolable for Uniq<T> {
        // Never reports itself as closed.
        fn is_open(&self) -> bool {
            true
        }

        // Always a unique reservation: the value itself is handed out.
        fn reserve(self) -> Reservation<Self> {
            Reservation::Unique(self)
        }

        // Never shareable, so reused values keep a reference to the pool.
        fn can_share(&self) -> bool {
            false
        }
    }
873
874    fn c<T: Poolable, K: Key>(key: K) -> Connecting<T, K> {
875        Connecting {
876            key,
877            pool: WeakOpt::none(),
878        }
879    }
880
881    fn host_key(s: &str) -> KeyImpl {
882        KeyImpl(http::uri::Scheme::HTTP, s.parse().expect("host key"))
883    }
884
885    fn pool_no_timer<T, K: Key>() -> Pool<T, K> {
886        pool_max_idle_no_timer(usize::MAX)
887    }
888
889    fn pool_max_idle_no_timer<T, K: Key>(max_idle: usize) -> Pool<T, K> {
890        let pool = Pool::new(
891            super::Config {
892                idle_timeout: Some(Duration::from_millis(100)),
893                max_idle_per_host: max_idle,
894            },
895            TokioExecutor::new(),
896            Option::<timer::Timer>::None,
897        );
898        pool.no_timer();
899        pool
900    }
901
902    #[tokio::test]
903    async fn test_pool_checkout_smoke() {
904        let pool = pool_no_timer();
905        let key = host_key("foo");
906        let pooled = pool.pooled(c(key.clone()), Uniq(41));
907
908        drop(pooled);
909
910        match pool.checkout(key).await {
911            Ok(pooled) => assert_eq!(*pooled, Uniq(41)),
912            Err(_) => panic!("not ready"),
913        };
914    }
915
916    /// Helper to check if the future is ready after polling once.
917    struct PollOnce<'a, F>(&'a mut F);
918
919    impl<F, T, U> Future for PollOnce<'_, F>
920    where
921        F: Future<Output = Result<T, U>> + Unpin,
922    {
923        type Output = Option<()>;
924
925        fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
926            match Pin::new(&mut self.0).poll(cx) {
927                Poll::Ready(Ok(_)) => Poll::Ready(Some(())),
928                Poll::Ready(Err(_)) => Poll::Ready(Some(())),
929                Poll::Pending => Poll::Ready(None),
930            }
931        }
932    }
933
934    #[tokio::test]
935    async fn test_pool_checkout_returns_none_if_expired() {
936        let pool = pool_no_timer();
937        let key = host_key("foo");
938        let pooled = pool.pooled(c(key.clone()), Uniq(41));
939
940        drop(pooled);
941        tokio::time::sleep(pool.locked().timeout.unwrap()).await;
942        let mut checkout = pool.checkout(key);
943        let poll_once = PollOnce(&mut checkout);
944        let is_not_ready = poll_once.await.is_none();
945        assert!(is_not_ready);
946    }
947
948    #[tokio::test]
949    async fn test_pool_checkout_removes_expired() {
950        let pool = pool_no_timer();
951        let key = host_key("foo");
952
953        pool.pooled(c(key.clone()), Uniq(41));
954        pool.pooled(c(key.clone()), Uniq(5));
955        pool.pooled(c(key.clone()), Uniq(99));
956
957        assert_eq!(
958            pool.locked().idle.get(&key).map(|entries| entries.len()),
959            Some(3)
960        );
961        tokio::time::sleep(pool.locked().timeout.unwrap()).await;
962
963        let mut checkout = pool.checkout(key.clone());
964        let poll_once = PollOnce(&mut checkout);
965        // checkout.await should clean out the expired
966        poll_once.await;
967        assert!(!pool.locked().idle.contains_key(&key));
968    }
969
970    #[test]
971    fn test_pool_max_idle_per_host() {
972        let pool = pool_max_idle_no_timer(2);
973        let key = host_key("foo");
974
975        pool.pooled(c(key.clone()), Uniq(41));
976        pool.pooled(c(key.clone()), Uniq(5));
977        pool.pooled(c(key.clone()), Uniq(99));
978
979        // pooled and dropped 3, max_idle should only allow 2
980        assert_eq!(
981            pool.locked().idle.get(&key).map(|entries| entries.len()),
982            Some(2)
983        );
984    }
985
986    #[tokio::test]
987    async fn test_pool_timer_removes_expired_realtime() {
988        test_pool_timer_removes_expired_inner().await
989    }
990
991    #[tokio::test(start_paused = true)]
992    async fn test_pool_timer_removes_expired_faketime() {
993        test_pool_timer_removes_expired_inner().await
994    }
995
996    async fn test_pool_timer_removes_expired_inner() {
997        let pool = Pool::new(
998            super::Config {
999                idle_timeout: Some(Duration::from_millis(10)),
1000                max_idle_per_host: usize::MAX,
1001            },
1002            TokioExecutor::new(),
1003            Some(TokioTimer::new()),
1004        );
1005
1006        let key = host_key("foo");
1007
1008        pool.pooled(c(key.clone()), Uniq(41));
1009        pool.pooled(c(key.clone()), Uniq(5));
1010        pool.pooled(c(key.clone()), Uniq(99));
1011
1012        assert_eq!(
1013            pool.locked().idle.get(&key).map(|entries| entries.len()),
1014            Some(3)
1015        );
1016
1017        // Let the timer tick passed the expiration...
1018        tokio::time::sleep(Duration::from_millis(30)).await;
1019
1020        // But minimum interval is higher, so nothing should have been reaped
1021        assert_eq!(
1022            pool.locked().idle.get(&key).map(|entries| entries.len()),
1023            Some(3)
1024        );
1025
1026        // Now wait passed the minimum interval more
1027        tokio::time::sleep(Duration::from_millis(70)).await;
1028        // Yield in case other task hasn't been able to run :shrug:
1029        tokio::task::yield_now().await;
1030
1031        assert!(!pool.locked().idle.contains_key(&key));
1032    }
1033
1034    #[tokio::test]
1035    async fn test_pool_checkout_task_unparked() {
1036        use futures_util::future::join;
1037        use futures_util::FutureExt;
1038
1039        let pool = pool_no_timer();
1040        let key = host_key("foo");
1041        let pooled = pool.pooled(c(key.clone()), Uniq(41));
1042
1043        let checkout = join(pool.checkout(key), async {
1044            // the checkout future will park first,
1045            // and then this lazy future will be polled, which will insert
1046            // the pooled back into the pool
1047            //
1048            // this test makes sure that doing so will unpark the checkout
1049            drop(pooled);
1050        })
1051        .map(|(entry, _)| entry);
1052
1053        assert_eq!(*checkout.await.unwrap(), Uniq(41));
1054    }
1055
1056    #[tokio::test]
1057    async fn test_pool_checkout_drop_cleans_up_waiters() {
1058        let pool = pool_no_timer::<Uniq<i32>, KeyImpl>();
1059        let key = host_key("foo");
1060
1061        let mut checkout1 = pool.checkout(key.clone());
1062        let mut checkout2 = pool.checkout(key.clone());
1063
1064        let poll_once1 = PollOnce(&mut checkout1);
1065        let poll_once2 = PollOnce(&mut checkout2);
1066
1067        // first poll needed to get into Pool's parked
1068        poll_once1.await;
1069        assert_eq!(pool.locked().waiters.get(&key).unwrap().len(), 1);
1070        poll_once2.await;
1071        assert_eq!(pool.locked().waiters.get(&key).unwrap().len(), 2);
1072
1073        // on drop, clean up Pool
1074        drop(checkout1);
1075        assert_eq!(pool.locked().waiters.get(&key).unwrap().len(), 1);
1076
1077        drop(checkout2);
1078        assert!(!pool.locked().waiters.contains_key(&key));
1079    }
1080
1081    #[derive(Debug)]
1082    struct CanClose {
1083        #[allow(unused)]
1084        val: i32,
1085        closed: bool,
1086    }
1087
1088    impl Poolable for CanClose {
1089        fn is_open(&self) -> bool {
1090            !self.closed
1091        }
1092
1093        fn reserve(self) -> Reservation<Self> {
1094            Reservation::Unique(self)
1095        }
1096
1097        fn can_share(&self) -> bool {
1098            false
1099        }
1100    }
1101
1102    #[test]
1103    fn pooled_drop_if_closed_doesnt_reinsert() {
1104        let pool = pool_no_timer();
1105        let key = host_key("foo");
1106        pool.pooled(
1107            c(key.clone()),
1108            CanClose {
1109                val: 57,
1110                closed: true,
1111            },
1112        );
1113
1114        assert!(!pool.locked().idle.contains_key(&key));
1115    }
1116}