deadpool/managed/
pool.rs

1#[cfg(not(target_arch = "wasm32"))]
2use std::time::Instant;
3use std::{
4    collections::VecDeque,
5    fmt,
6    future::Future,
7    marker::PhantomData,
8    sync::{
9        atomic::{AtomicUsize, Ordering},
10        Arc, Mutex, Weak,
11    },
12    time::Duration,
13};
14
15use deadpool_runtime::Runtime;
16use tokio::sync::{Semaphore, TryAcquireError};
17
18use crate::{
19    managed::{
20        dropguard::DropGuard, hooks::Hooks, object::ObjectInner, Manager, Metrics, Object,
21        PoolBuilder, PoolConfig, PoolError, QueueMode, TimeoutType, Timeouts,
22    },
23    Status,
24};
25
/// Generic object and connection pool.
///
/// This struct can be cloned and transferred across thread boundaries and uses
/// reference counting for its internal state.
pub struct Pool<M: Manager, W: From<Object<M>> = Object<M>> {
    /// Shared, reference-counted pool state; every clone of a [`Pool`] points
    /// at the same `PoolInner`.
    pub(crate) inner: Arc<PoolInner<M>>,
    /// Zero-sized marker that records the wrapper type `W`; no `W` value is
    /// stored.
    pub(crate) _wrapper: PhantomData<fn() -> W>,
}
34
35// Implemented manually to avoid unnecessary trait bound on `W` type parameter.
36impl<M, W> fmt::Debug for Pool<M, W>
37where
38    M: fmt::Debug + Manager,
39    M::Type: fmt::Debug,
40    W: From<Object<M>>,
41{
42    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
43        f.debug_struct("Pool")
44            .field("inner", &self.inner)
45            .field("wrapper", &self._wrapper)
46            .finish()
47    }
48}
49
50impl<M: Manager, W: From<Object<M>>> Clone for Pool<M, W> {
51    fn clone(&self) -> Self {
52        Self {
53            inner: self.inner.clone(),
54            _wrapper: PhantomData,
55        }
56    }
57}
58
59impl<M: Manager, W: From<Object<M>>> Pool<M, W> {
    /// Instantiates a builder for a new [`Pool`] that will use the given
    /// `manager`.
    ///
    /// This is the only way to create a [`Pool`] instance.
    pub fn builder(manager: M) -> PoolBuilder<M, W> {
        PoolBuilder::new(manager)
    }
66
67    pub(crate) fn from_builder(builder: PoolBuilder<M, W>) -> Self {
68        Self {
69            inner: Arc::new(PoolInner {
70                manager: builder.manager,
71                next_id: AtomicUsize::new(0),
72                slots: Mutex::new(Slots {
73                    vec: VecDeque::with_capacity(builder.config.max_size),
74                    size: 0,
75                    max_size: builder.config.max_size,
76                }),
77                users: AtomicUsize::new(0),
78                semaphore: Semaphore::new(builder.config.max_size),
79                config: builder.config,
80                hooks: builder.hooks,
81                runtime: builder.runtime,
82            }),
83            _wrapper: PhantomData,
84        }
85    }
86
87    /// Retrieves an [`Object`] from this [`Pool`] or waits for one to
88    /// become available.
89    ///
90    /// # Errors
91    ///
92    /// See [`PoolError`] for details.
93    pub async fn get(&self) -> Result<W, PoolError<M::Error>> {
94        self.timeout_get(&self.timeouts()).await
95    }
96
97    /// Retrieves an [`Object`] from this [`Pool`] using a different `timeout`
98    /// than the configured one.
99    ///
100    /// # Errors
101    ///
102    /// See [`PoolError`] for details.
103    pub async fn timeout_get(&self, timeouts: &Timeouts) -> Result<W, PoolError<M::Error>> {
104        let _ = self.inner.users.fetch_add(1, Ordering::Relaxed);
105        let users_guard = DropGuard(|| {
106            let _ = self.inner.users.fetch_sub(1, Ordering::Relaxed);
107        });
108
109        let non_blocking = match timeouts.wait {
110            Some(t) => t.as_nanos() == 0,
111            None => false,
112        };
113
114        let permit = if non_blocking {
115            self.inner.semaphore.try_acquire().map_err(|e| match e {
116                TryAcquireError::Closed => PoolError::Closed,
117                TryAcquireError::NoPermits => PoolError::Timeout(TimeoutType::Wait),
118            })?
119        } else {
120            apply_timeout(
121                self.inner.runtime,
122                TimeoutType::Wait,
123                timeouts.wait,
124                async {
125                    self.inner
126                        .semaphore
127                        .acquire()
128                        .await
129                        .map_err(|_| PoolError::Closed)
130                },
131            )
132            .await?
133        };
134
135        let inner_obj = loop {
136            let inner_obj = match self.inner.config.queue_mode {
137                QueueMode::Fifo => self.inner.slots.lock().unwrap().vec.pop_front(),
138                QueueMode::Lifo => self.inner.slots.lock().unwrap().vec.pop_back(),
139            };
140            let inner_obj = if let Some(inner_obj) = inner_obj {
141                self.try_recycle(timeouts, inner_obj).await?
142            } else {
143                self.try_create(timeouts).await?
144            };
145            if let Some(inner_obj) = inner_obj {
146                break inner_obj;
147            }
148        };
149
150        users_guard.disarm();
151        permit.forget();
152
153        Ok(Object {
154            inner: Some(inner_obj),
155            pool: self.weak(),
156        }
157        .into())
158    }
159
160    #[inline]
161    async fn try_recycle(
162        &self,
163        timeouts: &Timeouts,
164        inner_obj: ObjectInner<M>,
165    ) -> Result<Option<ObjectInner<M>>, PoolError<M::Error>> {
166        let mut unready_obj = UnreadyObject {
167            inner: Some(inner_obj),
168            pool: &self.inner,
169        };
170        let inner = unready_obj.inner();
171
172        // Apply pre_recycle hooks
173        if let Err(_e) = self.inner.hooks.pre_recycle.apply(inner).await {
174            // TODO log pre_recycle error
175            return Ok(None);
176        }
177
178        if apply_timeout(
179            self.inner.runtime,
180            TimeoutType::Recycle,
181            timeouts.recycle,
182            self.inner.manager.recycle(&mut inner.obj, &inner.metrics),
183        )
184        .await
185        .is_err()
186        {
187            return Ok(None);
188        }
189
190        // Apply post_recycle hooks
191        if let Err(_e) = self.inner.hooks.post_recycle.apply(inner).await {
192            // TODO log post_recycle error
193            return Ok(None);
194        }
195
196        inner.metrics.recycle_count += 1;
197        #[cfg(not(target_arch = "wasm32"))]
198        {
199            inner.metrics.recycled = Some(Instant::now());
200        }
201
202        Ok(Some(unready_obj.ready()))
203    }
204
205    #[inline]
206    async fn try_create(
207        &self,
208        timeouts: &Timeouts,
209    ) -> Result<Option<ObjectInner<M>>, PoolError<M::Error>> {
210        let mut unready_obj = UnreadyObject {
211            inner: Some(ObjectInner {
212                obj: apply_timeout(
213                    self.inner.runtime,
214                    TimeoutType::Create,
215                    timeouts.create,
216                    self.inner.manager.create(),
217                )
218                .await?,
219                id: self.inner.next_id.fetch_add(1, Ordering::Relaxed),
220                metrics: Metrics::default(),
221            }),
222            pool: &self.inner,
223        };
224
225        self.inner.slots.lock().unwrap().size += 1;
226
227        // Apply post_create hooks
228        if let Err(e) = self
229            .inner
230            .hooks
231            .post_create
232            .apply(unready_obj.inner())
233            .await
234        {
235            return Err(PoolError::PostCreateHook(e));
236        }
237
238        Ok(Some(unready_obj.ready()))
239    }
240
241    /**
242     * Resize the pool. This change the `max_size` of the pool dropping
243     * excess objects and/or making space for new ones.
244     *
245     * If the pool is closed this method does nothing. The [`Pool::status`] method
246     * always reports a `max_size` of 0 for closed pools.
247     */
248    pub fn resize(&self, max_size: usize) {
249        if self.inner.semaphore.is_closed() {
250            return;
251        }
252        let mut slots = self.inner.slots.lock().unwrap();
253        let old_max_size = slots.max_size;
254        slots.max_size = max_size;
255        // shrink pool
256        if max_size < old_max_size {
257            while slots.size > slots.max_size {
258                if let Ok(permit) = self.inner.semaphore.try_acquire() {
259                    permit.forget();
260                    if slots.vec.pop_front().is_some() {
261                        slots.size -= 1;
262                    }
263                } else {
264                    break;
265                }
266            }
267            // Create a new VecDeque with a smaller capacity
268            let mut vec = VecDeque::with_capacity(max_size);
269            for obj in slots.vec.drain(..) {
270                vec.push_back(obj);
271            }
272            slots.vec = vec;
273        }
274        // grow pool
275        if max_size > old_max_size {
276            let additional = slots.max_size - old_max_size;
277            slots.vec.reserve_exact(additional);
278            self.inner.semaphore.add_permits(additional);
279        }
280    }
281
282    /// Retains only the objects specified by the given function.
283    ///
284    /// This function is typically used to remove objects from
285    /// the pool based on their current state or metrics.
286    ///
287    /// **Caution:** This function blocks the entire pool while
288    /// it is running. Therefore the given function should not
289    /// block.
290    ///
291    /// The following example starts a background task that
292    /// runs every 30 seconds and removes objects from the pool
293    /// that haven't been used for more than one minute.
294    ///
295    /// ```rust,ignore
296    /// let interval = Duration::from_secs(30);
297    /// let max_age = Duration::from_secs(60);
298    /// tokio::spawn(async move {
299    ///     loop {
300    ///         tokio::time::sleep(interval).await;
301    ///         pool.retain(|_, metrics| metrics.last_used() < max_age);
302    ///     }
303    /// });
304    /// ```
305    pub fn retain(
306        &self,
307        mut predicate: impl FnMut(&M::Type, Metrics) -> bool,
308    ) -> RetainResult<M::Type> {
309        let mut removed = Vec::with_capacity(self.status().size);
310        let mut guard = self.inner.slots.lock().unwrap();
311        let mut i = 0;
312        // This code can be simplified once `Vec::extract_if` lands in stable Rust.
313        // https://doc.rust-lang.org/std/vec/struct.Vec.html#method.extract_if
314        while i < guard.vec.len() {
315            let obj = &mut guard.vec[i];
316            if predicate(&mut obj.obj, obj.metrics) {
317                i += 1;
318            } else {
319                let mut obj = guard.vec.remove(i).unwrap();
320                self.manager().detach(&mut obj.obj);
321                removed.push(obj.obj);
322            }
323        }
324        guard.size -= removed.len();
325        RetainResult {
326            retained: i,
327            removed,
328        }
329    }
330
    /// Returns the timeout configuration of this [`Pool`].
    ///
    /// The value is a copy taken from the pool's configuration.
    pub fn timeouts(&self) -> Timeouts {
        self.inner.config.timeouts
    }
335
    /// Closes this [`Pool`].
    ///
    /// All current and future tasks waiting for [`Object`]s will return
    /// [`PoolError::Closed`] immediately.
    ///
    /// This operation resizes the pool to 0.
    pub fn close(&self) {
        // Resize first: `resize` returns early once the semaphore is closed.
        self.resize(0);
        self.inner.semaphore.close();
    }
346
    /// Indicates whether this [`Pool`] has been closed.
    ///
    /// The pool counts as closed once its internal semaphore is closed.
    pub fn is_closed(&self) -> bool {
        self.inner.semaphore.is_closed()
    }
351
352    /// Retrieves [`Status`] of this [`Pool`].
353    #[must_use]
354    pub fn status(&self) -> Status {
355        let slots = self.inner.slots.lock().unwrap();
356        let users = self.inner.users.load(Ordering::Relaxed);
357        let (available, waiting) = if users < slots.size {
358            (slots.size - users, 0)
359        } else {
360            (0, users - slots.size)
361        };
362        Status {
363            max_size: slots.max_size,
364            size: slots.size,
365            available,
366            waiting,
367        }
368    }
369
    /// Returns a reference to the [`Manager`] of this [`Pool`].
    #[must_use]
    pub fn manager(&self) -> &M {
        &self.inner.manager
    }
375
376    /// Returns a [`WeakPool<T>`] of this [`Pool`].
377    pub fn weak(&self) -> WeakPool<M> {
378        WeakPool {
379            inner: Arc::downgrade(&self.inner),
380            _wrapper: PhantomData,
381        }
382    }
383}
384
/// A weak reference to a [`Pool`], used to avoid keeping the pool alive.
///
/// `WeakPool` is analogous to [`std::sync::Weak`] for [`Pool`], and
/// is typically used in situations where you need a non-owning reference to a pool,
/// such as in background tasks, managers, or callbacks that should not extend
/// the lifetime of the pool.
///
/// This allows components to retain a reference to the pool while avoiding
/// reference cycles or prolonging its lifetime unnecessarily.
///
/// To access the pool, use [`WeakPool::upgrade`] to attempt to get a strong reference.
#[derive(Debug)]
pub struct WeakPool<M: Manager, W: From<Object<M>> = Object<M>> {
    /// Non-owning pointer to the shared pool state.
    inner: Weak<PoolInner<M>>,
    /// Zero-sized marker for the wrapper type `W` of the upgraded [`Pool`].
    _wrapper: PhantomData<fn() -> W>,
}
401
402impl<M: Manager, W: From<Object<M>>> WeakPool<M, W> {
403    /// Attempts to upgrade the `WeakPool` to a strong [`Pool<T>`] reference.
404    ///
405    /// If the pool has already been dropped (i.e., no strong references remain),
406    /// this returns `None`.
407    pub fn upgrade(&self) -> Option<Pool<M, W>> {
408        Some(Pool {
409            inner: self.inner.upgrade()?,
410            _wrapper: PhantomData,
411        })
412    }
413}
414
/// State shared by all [`Pool`] handles of one pool and referenced weakly by
/// [`WeakPool`].
pub(crate) struct PoolInner<M: Manager> {
    /// Manager used to create, recycle and detach objects.
    manager: M,
    /// Source of object IDs; incremented once for every created object.
    next_id: AtomicUsize,
    /// Idle objects together with the current and maximum object count.
    slots: Mutex<Slots<ObjectInner<M>>>,
    /// Number of [`Pool`] users. A user is either a future which is waiting for an [`Object`] or
    /// one with an [`Object`] which hasn't been returned, yet.
    users: AtomicUsize,
    /// Starts with `max_size` permits. A permit is consumed for every object
    /// handed out and added back when the object returns. The pool counts as
    /// closed once this semaphore is closed.
    semaphore: Semaphore,
    /// Configuration the pool was built with (timeouts, queue mode, ...).
    config: PoolConfig,
    /// Runtime used to enforce timeouts; `None` if none was specified.
    runtime: Option<Runtime>,
    /// Hooks run after creating and before/after recycling objects.
    hooks: Hooks<M>,
}
427
/// Object bookkeeping of a pool; kept behind the mutex in `PoolInner`.
#[derive(Debug)]
struct Slots<T> {
    /// Idle objects waiting to be handed out.
    vec: VecDeque<T>,
    /// Number of objects currently counted by the pool, idle or handed out.
    size: usize,
    /// Maximum number of objects the pool is configured for.
    max_size: usize,
}
434
435// Implemented manually to avoid unnecessary trait bound on the struct.
436impl<M> fmt::Debug for PoolInner<M>
437where
438    M: fmt::Debug + Manager,
439    M::Type: fmt::Debug,
440{
441    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
442        f.debug_struct("PoolInner")
443            .field("manager", &self.manager)
444            .field("slots", &self.slots)
445            .field("used", &self.users)
446            .field("semaphore", &self.semaphore)
447            .field("config", &self.config)
448            .field("runtime", &self.runtime)
449            .field("hooks", &self.hooks)
450            .finish()
451    }
452}
453
454impl<M: Manager> PoolInner<M> {
455    pub(crate) fn return_object(&self, mut inner: ObjectInner<M>) {
456        let _ = self.users.fetch_sub(1, Ordering::Relaxed);
457        let mut slots = self.slots.lock().unwrap();
458        if slots.size <= slots.max_size {
459            slots.vec.push_back(inner);
460            drop(slots);
461            self.semaphore.add_permits(1);
462        } else {
463            slots.size -= 1;
464            drop(slots);
465            self.manager.detach(&mut inner.obj);
466        }
467    }
468    pub(crate) fn detach_object(&self, obj: &mut M::Type) {
469        let _ = self.users.fetch_sub(1, Ordering::Relaxed);
470        let mut slots = self.slots.lock().unwrap();
471        let add_permits = slots.size <= slots.max_size;
472        slots.size -= 1;
473        drop(slots);
474        if add_permits {
475            self.semaphore.add_permits(1);
476        }
477        self.manager.detach(obj);
478    }
479}
480
/// Guard around an object that is being created or recycled and has not been
/// handed out yet. If the guard is dropped while it still holds the object,
/// the pool's `size` is decremented and the object is detached from the
/// manager.
struct UnreadyObject<'a, M: Manager> {
    /// The guarded object; `None` once it has been taken out.
    inner: Option<ObjectInner<M>>,
    /// Pool state whose `size` counter and manager are used on drop.
    pool: &'a PoolInner<M>,
}
485
impl<M: Manager> UnreadyObject<'_, M> {
    /// Declares the object ready and moves it out of the guard, so that the
    /// guard's `Drop` implementation no longer touches it.
    ///
    /// # Panics
    ///
    /// Panics if the object has already been taken.
    fn ready(mut self) -> ObjectInner<M> {
        self.inner.take().unwrap()
    }
    /// Returns mutable access to the guarded object.
    ///
    /// # Panics
    ///
    /// Panics if the object has already been taken.
    fn inner(&mut self) -> &mut ObjectInner<M> {
        self.inner.as_mut().unwrap()
    }
}
494
495impl<M: Manager> Drop for UnreadyObject<'_, M> {
496    fn drop(&mut self) {
497        if let Some(mut inner) = self.inner.take() {
498            self.pool.slots.lock().unwrap().size -= 1;
499            self.pool.manager.detach(&mut inner.obj);
500        }
501    }
502}
503
504async fn apply_timeout<O, E>(
505    runtime: Option<Runtime>,
506    timeout_type: TimeoutType,
507    duration: Option<Duration>,
508    future: impl Future<Output = Result<O, impl Into<PoolError<E>>>>,
509) -> Result<O, PoolError<E>> {
510    match (runtime, duration) {
511        (_, None) => future.await.map_err(Into::into),
512        (Some(runtime), Some(duration)) => runtime
513            .timeout(duration, future)
514            .await
515            .ok_or(PoolError::Timeout(timeout_type))?
516            .map_err(Into::into),
517        (None, Some(_)) => Err(PoolError::NoRuntimeSpecified),
518    }
519}
520
/// This is the result returned by [`Pool::retain`].
#[derive(Debug)]
pub struct RetainResult<T> {
    /// Number of retained objects
    pub retained: usize,
    /// Objects that were removed from the pool
    pub removed: Vec<T>,
}
529
530impl<T> Default for RetainResult<T> {
531    fn default() -> Self {
532        Self {
533            retained: Default::default(),
534            removed: Default::default(),
535        }
536    }
537}