1#[cfg(not(target_arch = "wasm32"))]
2use std::time::Instant;
3use std::{
4 collections::VecDeque,
5 fmt,
6 future::Future,
7 marker::PhantomData,
8 sync::{
9 atomic::{AtomicUsize, Ordering},
10 Arc, Mutex, Weak,
11 },
12 time::Duration,
13};
14
15use deadpool_runtime::Runtime;
16use tokio::sync::{Semaphore, TryAcquireError};
17
18use crate::{
19 managed::{
20 dropguard::DropGuard, hooks::Hooks, object::ObjectInner, Manager, Metrics, Object,
21 PoolBuilder, PoolConfig, PoolError, QueueMode, TimeoutType, Timeouts,
22 },
23 Status,
24};
25
26pub struct Pool<M: Manager, W: From<Object<M>> = Object<M>> {
31 pub(crate) inner: Arc<PoolInner<M>>,
32 pub(crate) _wrapper: PhantomData<fn() -> W>,
33}
34
35impl<M, W> fmt::Debug for Pool<M, W>
37where
38 M: fmt::Debug + Manager,
39 M::Type: fmt::Debug,
40 W: From<Object<M>>,
41{
42 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
43 f.debug_struct("Pool")
44 .field("inner", &self.inner)
45 .field("wrapper", &self._wrapper)
46 .finish()
47 }
48}
49
50impl<M: Manager, W: From<Object<M>>> Clone for Pool<M, W> {
51 fn clone(&self) -> Self {
52 Self {
53 inner: self.inner.clone(),
54 _wrapper: PhantomData,
55 }
56 }
57}
58
59impl<M: Manager, W: From<Object<M>>> Pool<M, W> {
60 pub fn builder(manager: M) -> PoolBuilder<M, W> {
64 PoolBuilder::new(manager)
65 }
66
67 pub(crate) fn from_builder(builder: PoolBuilder<M, W>) -> Self {
68 Self {
69 inner: Arc::new(PoolInner {
70 manager: builder.manager,
71 next_id: AtomicUsize::new(0),
72 slots: Mutex::new(Slots {
73 vec: VecDeque::with_capacity(builder.config.max_size),
74 size: 0,
75 max_size: builder.config.max_size,
76 }),
77 users: AtomicUsize::new(0),
78 semaphore: Semaphore::new(builder.config.max_size),
79 config: builder.config,
80 hooks: builder.hooks,
81 runtime: builder.runtime,
82 }),
83 _wrapper: PhantomData,
84 }
85 }
86
87 pub async fn get(&self) -> Result<W, PoolError<M::Error>> {
94 self.timeout_get(&self.timeouts()).await
95 }
96
97 pub async fn timeout_get(&self, timeouts: &Timeouts) -> Result<W, PoolError<M::Error>> {
104 let _ = self.inner.users.fetch_add(1, Ordering::Relaxed);
105 let users_guard = DropGuard(|| {
106 let _ = self.inner.users.fetch_sub(1, Ordering::Relaxed);
107 });
108
109 let non_blocking = match timeouts.wait {
110 Some(t) => t.as_nanos() == 0,
111 None => false,
112 };
113
114 let permit = if non_blocking {
115 self.inner.semaphore.try_acquire().map_err(|e| match e {
116 TryAcquireError::Closed => PoolError::Closed,
117 TryAcquireError::NoPermits => PoolError::Timeout(TimeoutType::Wait),
118 })?
119 } else {
120 apply_timeout(
121 self.inner.runtime,
122 TimeoutType::Wait,
123 timeouts.wait,
124 async {
125 self.inner
126 .semaphore
127 .acquire()
128 .await
129 .map_err(|_| PoolError::Closed)
130 },
131 )
132 .await?
133 };
134
135 let inner_obj = loop {
136 let inner_obj = match self.inner.config.queue_mode {
137 QueueMode::Fifo => self.inner.slots.lock().unwrap().vec.pop_front(),
138 QueueMode::Lifo => self.inner.slots.lock().unwrap().vec.pop_back(),
139 };
140 let inner_obj = if let Some(inner_obj) = inner_obj {
141 self.try_recycle(timeouts, inner_obj).await?
142 } else {
143 self.try_create(timeouts).await?
144 };
145 if let Some(inner_obj) = inner_obj {
146 break inner_obj;
147 }
148 };
149
150 users_guard.disarm();
151 permit.forget();
152
153 Ok(Object {
154 inner: Some(inner_obj),
155 pool: self.weak(),
156 }
157 .into())
158 }
159
160 #[inline]
161 async fn try_recycle(
162 &self,
163 timeouts: &Timeouts,
164 inner_obj: ObjectInner<M>,
165 ) -> Result<Option<ObjectInner<M>>, PoolError<M::Error>> {
166 let mut unready_obj = UnreadyObject {
167 inner: Some(inner_obj),
168 pool: &self.inner,
169 };
170 let inner = unready_obj.inner();
171
172 if let Err(_e) = self.inner.hooks.pre_recycle.apply(inner).await {
174 return Ok(None);
176 }
177
178 if apply_timeout(
179 self.inner.runtime,
180 TimeoutType::Recycle,
181 timeouts.recycle,
182 self.inner.manager.recycle(&mut inner.obj, &inner.metrics),
183 )
184 .await
185 .is_err()
186 {
187 return Ok(None);
188 }
189
190 if let Err(_e) = self.inner.hooks.post_recycle.apply(inner).await {
192 return Ok(None);
194 }
195
196 inner.metrics.recycle_count += 1;
197 #[cfg(not(target_arch = "wasm32"))]
198 {
199 inner.metrics.recycled = Some(Instant::now());
200 }
201
202 Ok(Some(unready_obj.ready()))
203 }
204
205 #[inline]
206 async fn try_create(
207 &self,
208 timeouts: &Timeouts,
209 ) -> Result<Option<ObjectInner<M>>, PoolError<M::Error>> {
210 let mut unready_obj = UnreadyObject {
211 inner: Some(ObjectInner {
212 obj: apply_timeout(
213 self.inner.runtime,
214 TimeoutType::Create,
215 timeouts.create,
216 self.inner.manager.create(),
217 )
218 .await?,
219 id: self.inner.next_id.fetch_add(1, Ordering::Relaxed),
220 metrics: Metrics::default(),
221 }),
222 pool: &self.inner,
223 };
224
225 self.inner.slots.lock().unwrap().size += 1;
226
227 if let Err(e) = self
229 .inner
230 .hooks
231 .post_create
232 .apply(unready_obj.inner())
233 .await
234 {
235 return Err(PoolError::PostCreateHook(e));
236 }
237
238 Ok(Some(unready_obj.ready()))
239 }
240
241 pub fn resize(&self, max_size: usize) {
249 if self.inner.semaphore.is_closed() {
250 return;
251 }
252 let mut slots = self.inner.slots.lock().unwrap();
253 let old_max_size = slots.max_size;
254 slots.max_size = max_size;
255 if max_size < old_max_size {
257 while slots.size > slots.max_size {
258 if let Ok(permit) = self.inner.semaphore.try_acquire() {
259 permit.forget();
260 if slots.vec.pop_front().is_some() {
261 slots.size -= 1;
262 }
263 } else {
264 break;
265 }
266 }
267 let mut vec = VecDeque::with_capacity(max_size);
269 for obj in slots.vec.drain(..) {
270 vec.push_back(obj);
271 }
272 slots.vec = vec;
273 }
274 if max_size > old_max_size {
276 let additional = slots.max_size - old_max_size;
277 slots.vec.reserve_exact(additional);
278 self.inner.semaphore.add_permits(additional);
279 }
280 }
281
282 pub fn retain(
306 &self,
307 mut predicate: impl FnMut(&M::Type, Metrics) -> bool,
308 ) -> RetainResult<M::Type> {
309 let mut removed = Vec::with_capacity(self.status().size);
310 let mut guard = self.inner.slots.lock().unwrap();
311 let mut i = 0;
312 while i < guard.vec.len() {
315 let obj = &mut guard.vec[i];
316 if predicate(&mut obj.obj, obj.metrics) {
317 i += 1;
318 } else {
319 let mut obj = guard.vec.remove(i).unwrap();
320 self.manager().detach(&mut obj.obj);
321 removed.push(obj.obj);
322 }
323 }
324 guard.size -= removed.len();
325 RetainResult {
326 retained: i,
327 removed,
328 }
329 }
330
331 pub fn timeouts(&self) -> Timeouts {
333 self.inner.config.timeouts
334 }
335
336 pub fn close(&self) {
343 self.resize(0);
344 self.inner.semaphore.close();
345 }
346
347 pub fn is_closed(&self) -> bool {
349 self.inner.semaphore.is_closed()
350 }
351
352 #[must_use]
354 pub fn status(&self) -> Status {
355 let slots = self.inner.slots.lock().unwrap();
356 let users = self.inner.users.load(Ordering::Relaxed);
357 let (available, waiting) = if users < slots.size {
358 (slots.size - users, 0)
359 } else {
360 (0, users - slots.size)
361 };
362 Status {
363 max_size: slots.max_size,
364 size: slots.size,
365 available,
366 waiting,
367 }
368 }
369
370 #[must_use]
372 pub fn manager(&self) -> &M {
373 &self.inner.manager
374 }
375
376 pub fn weak(&self) -> WeakPool<M> {
378 WeakPool {
379 inner: Arc::downgrade(&self.inner),
380 _wrapper: PhantomData,
381 }
382 }
383}
384
385#[derive(Debug)]
397pub struct WeakPool<M: Manager, W: From<Object<M>> = Object<M>> {
398 inner: Weak<PoolInner<M>>,
399 _wrapper: PhantomData<fn() -> W>,
400}
401
402impl<M: Manager, W: From<Object<M>>> WeakPool<M, W> {
403 pub fn upgrade(&self) -> Option<Pool<M, W>> {
408 Some(Pool {
409 inner: self.inner.upgrade()?,
410 _wrapper: PhantomData,
411 })
412 }
413}
414
415pub(crate) struct PoolInner<M: Manager> {
416 manager: M,
417 next_id: AtomicUsize,
418 slots: Mutex<Slots<ObjectInner<M>>>,
419 users: AtomicUsize,
422 semaphore: Semaphore,
423 config: PoolConfig,
424 runtime: Option<Runtime>,
425 hooks: Hooks<M>,
426}
427
428#[derive(Debug)]
429struct Slots<T> {
430 vec: VecDeque<T>,
431 size: usize,
432 max_size: usize,
433}
434
435impl<M> fmt::Debug for PoolInner<M>
437where
438 M: fmt::Debug + Manager,
439 M::Type: fmt::Debug,
440{
441 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
442 f.debug_struct("PoolInner")
443 .field("manager", &self.manager)
444 .field("slots", &self.slots)
445 .field("used", &self.users)
446 .field("semaphore", &self.semaphore)
447 .field("config", &self.config)
448 .field("runtime", &self.runtime)
449 .field("hooks", &self.hooks)
450 .finish()
451 }
452}
453
454impl<M: Manager> PoolInner<M> {
455 pub(crate) fn return_object(&self, mut inner: ObjectInner<M>) {
456 let _ = self.users.fetch_sub(1, Ordering::Relaxed);
457 let mut slots = self.slots.lock().unwrap();
458 if slots.size <= slots.max_size {
459 slots.vec.push_back(inner);
460 drop(slots);
461 self.semaphore.add_permits(1);
462 } else {
463 slots.size -= 1;
464 drop(slots);
465 self.manager.detach(&mut inner.obj);
466 }
467 }
468 pub(crate) fn detach_object(&self, obj: &mut M::Type) {
469 let _ = self.users.fetch_sub(1, Ordering::Relaxed);
470 let mut slots = self.slots.lock().unwrap();
471 let add_permits = slots.size <= slots.max_size;
472 slots.size -= 1;
473 drop(slots);
474 if add_permits {
475 self.semaphore.add_permits(1);
476 }
477 self.manager.detach(obj);
478 }
479}
480
481struct UnreadyObject<'a, M: Manager> {
482 inner: Option<ObjectInner<M>>,
483 pool: &'a PoolInner<M>,
484}
485
486impl<M: Manager> UnreadyObject<'_, M> {
487 fn ready(mut self) -> ObjectInner<M> {
488 self.inner.take().unwrap()
489 }
490 fn inner(&mut self) -> &mut ObjectInner<M> {
491 self.inner.as_mut().unwrap()
492 }
493}
494
495impl<M: Manager> Drop for UnreadyObject<'_, M> {
496 fn drop(&mut self) {
497 if let Some(mut inner) = self.inner.take() {
498 self.pool.slots.lock().unwrap().size -= 1;
499 self.pool.manager.detach(&mut inner.obj);
500 }
501 }
502}
503
504async fn apply_timeout<O, E>(
505 runtime: Option<Runtime>,
506 timeout_type: TimeoutType,
507 duration: Option<Duration>,
508 future: impl Future<Output = Result<O, impl Into<PoolError<E>>>>,
509) -> Result<O, PoolError<E>> {
510 match (runtime, duration) {
511 (_, None) => future.await.map_err(Into::into),
512 (Some(runtime), Some(duration)) => runtime
513 .timeout(duration, future)
514 .await
515 .ok_or(PoolError::Timeout(timeout_type))?
516 .map_err(Into::into),
517 (None, Some(_)) => Err(PoolError::NoRuntimeSpecified),
518 }
519}
520
521#[derive(Debug)]
522pub struct RetainResult<T> {
524 pub retained: usize,
526 pub removed: Vec<T>,
528}
529
530impl<T> Default for RetainResult<T> {
531 fn default() -> Self {
532 Self {
533 retained: Default::default(),
534 removed: Default::default(),
535 }
536 }
537}