wasmtime/runtime/vm/
parking_spot.rs

//! Implements thread wait and notify primitives on top of `std::sync`
//! primitives.
//!
//! This is a simplified version of the `parking_lot_core` crate.
//!
//! There are two main operations that can be performed:
//!
//! - *Parking* refers to suspending the current thread while simultaneously
//!   enqueuing it on a queue keyed by some address.
//! - *Unparking* refers to dequeuing a thread from a queue keyed by some
//!   address and resuming it.
11
12#![deny(missing_docs)]
13
14use crate::prelude::*;
15use crate::runtime::vm::{SendSyncPtr, WaitResult};
16use std::collections::BTreeMap;
17use std::ptr::NonNull;
18use std::sync::Mutex;
19use std::sync::atomic::{AtomicU32, AtomicU64, Ordering::SeqCst};
20use std::thread::{self, Thread};
21use std::time::{Duration, Instant};
22
23#[derive(Default, Debug)]
24struct Spot {
25    head: Option<SendSyncPtr<WaiterInner>>,
26    tail: Option<SendSyncPtr<WaiterInner>>,
27}
28
29/// The thread global `ParkingSpot`.
30#[derive(Default, Debug)]
31pub struct ParkingSpot {
32    inner: Mutex<BTreeMap<u64, Spot>>,
33}
34
35#[derive(Default)]
36pub struct Waiter {
37    inner: Option<Box<WaiterInner>>,
38}
39
40struct WaiterInner {
41    // NB: this field may be read concurrently, but is only written under the
42    // lock of a `ParkingSpot`.
43    thread: Thread,
44
45    // NB: these fields are only modified/read under the lock of a
46    // `ParkingSpot`.
47    notified: bool,
48    next: Option<SendSyncPtr<WaiterInner>>,
49    prev: Option<SendSyncPtr<WaiterInner>>,
50}
51
52impl ParkingSpot {
53    /// Atomically validates if `atomic == expected` and, if so, blocks the
54    /// current thread.
55    ///
56    /// This method will first check to see if `atomic == expected` using a
57    /// `SeqCst` load ordering. If the values are not equal then the method
58    /// immediately returns with `WaitResult::Mismatch`. Otherwise the thread
59    /// will be blocked and can only be woken up with `notify` on the same
60    /// address. Note that the check-and-block operation is atomic with respect
61    /// to `notify`.
62    ///
63    /// The optional `deadline` specified can indicate a point in time after
64    /// which this thread will be unblocked. If this thread is not notified and
65    /// `deadline` is reached then `WaitResult::TimedOut` is returned. If
66    /// `deadline` is `None` then this thread will block forever waiting for
67    /// `notify`.
68    ///
69    /// The `waiter` argument is metadata used by this structure to block
70    /// the current thread.
71    ///
72    /// This method will not spuriously wake up one blocked.
73    pub fn wait32(
74        &self,
75        atomic: &AtomicU32,
76        expected: u32,
77        deadline: impl Into<Option<Instant>>,
78        waiter: &mut Waiter,
79    ) -> WaitResult {
80        self.wait(
81            atomic.as_ptr() as u64,
82            || atomic.load(SeqCst) == expected,
83            deadline.into(),
84            waiter,
85        )
86    }
87
88    /// Same as `wait32`, but for 64-bit values.
89    pub fn wait64(
90        &self,
91        atomic: &AtomicU64,
92        expected: u64,
93        deadline: impl Into<Option<Instant>>,
94        waiter: &mut Waiter,
95    ) -> WaitResult {
96        self.wait(
97            atomic.as_ptr() as u64,
98            || atomic.load(SeqCst) == expected,
99            deadline.into(),
100            waiter,
101        )
102    }
103
104    fn wait(
105        &self,
106        key: u64,
107        validate: impl FnOnce() -> bool,
108        deadline: Option<Instant>,
109        waiter: &mut Waiter,
110    ) -> WaitResult {
111        let mut inner = self
112            .inner
113            .lock()
114            .expect("failed to lock inner parking table");
115
116        // This is the "atomic" part of the `validate` check which ensure that
117        // the memory location still indicates that we're allowed to block.
118        if !validate() {
119            return WaitResult::Mismatch;
120        }
121
122        // Lazily initialize the `waiter` node if it hasn't been already, and
123        // additionally ensure it's not accidentally in some other queue.
124        let waiter = waiter.inner.get_or_insert_with(|| {
125            Box::new(WaiterInner {
126                next: None,
127                prev: None,
128                notified: false,
129                thread: thread::current(),
130            })
131        });
132        assert!(waiter.next.is_none());
133        assert!(waiter.prev.is_none());
134
135        // Clear the `notified` flag if it was previously notified and
136        // configure the thread to wakeup as our own.
137        waiter.notified = false;
138        waiter.thread = thread::current();
139
140        let ptr = SendSyncPtr::new(NonNull::from(&mut **waiter));
141        let spot = inner.entry(key).or_insert_with(Spot::default);
142        unsafe {
143            // Enqueue our `waiter` in the internal queue for this spot.
144            spot.push(ptr);
145
146            // Wait for a notification to arrive. This is done through
147            // `std::thread::park_timeout` by dropping the lock that is held.
148            // This loop is somewhat similar to a condition variable.
149            //
150            // If no timeout was given then the maximum duration is effectively
151            // infinite (500 billion years), otherwise the timeout is
152            // calculated relative to the `deadline` specified.
153            //
154            // To handle spurious wakeups if the thread wakes up but a
155            // notification wasn't received then the thread goes back to sleep.
156            let timed_out = loop {
157                let timeout = match deadline {
158                    Some(deadline) => {
159                        let now = Instant::now();
160                        if deadline <= now {
161                            break true;
162                        } else {
163                            deadline - now
164                        }
165                    }
166                    None => Duration::MAX,
167                };
168
169                drop(inner);
170                thread::park_timeout(timeout);
171                inner = self.inner.lock().unwrap();
172
173                if ptr.as_ref().notified {
174                    break false;
175                }
176            };
177
178            if timed_out {
179                // If this thread timed out then it is still present in the
180                // waiter queue, so remove it.
181                inner.get_mut(&key).unwrap().remove(ptr);
182                WaitResult::TimedOut
183            } else {
184                // If this node was notified then we should not be in a queue
185                // at this point.
186                assert!(ptr.as_ref().next.is_none());
187                assert!(ptr.as_ref().prev.is_none());
188                WaitResult::Ok
189            }
190        }
191    }
192
193    /// Notify at most `n` threads that are blocked on the given address.
194    ///
195    /// Returns the number of threads that were actually unparked.
196    pub fn notify<T>(&self, addr: &T, n: u32) -> u32 {
197        if n == 0 {
198            return 0;
199        }
200        let mut unparked = 0;
201
202        // It's known here that `n > 0` so dequeue items until `unparked`
203        // equals `n` or the queue runs out. Each thread dequeued is signaled
204        // that it's been notified and then woken up.
205        self.with_lot(addr, |spot| unsafe {
206            while let Some(mut head) = spot.pop() {
207                let head = head.as_mut();
208                assert!(head.next.is_none());
209                head.notified = true;
210                head.thread.unpark();
211                unparked += 1;
212                if unparked == n {
213                    break;
214                }
215            }
216        });
217
218        unparked
219    }
220
221    fn with_lot<T, F: FnMut(&mut Spot)>(&self, addr: &T, mut f: F) {
222        let key = addr as *const _ as u64;
223        let mut inner = self
224            .inner
225            .lock()
226            .expect("failed to lock inner parking table");
227        if let Some(spot) = inner.get_mut(&key) {
228            f(spot);
229        }
230    }
231}
232
233impl Waiter {
234    pub const fn new() -> Waiter {
235        Waiter { inner: None }
236    }
237}
238
239impl Spot {
240    /// Adds `waiter` to the queue at the end.
241    ///
242    /// # Unsafety
243    ///
244    /// This method is `unsafe` as it can only be invoked under the parking
245    /// spot's mutex. Additionally `waiter` must be a valid pointer not already
246    /// in any other queue and additionally only exclusively used by this queue
247    /// now.
248    unsafe fn push(&mut self, mut waiter: SendSyncPtr<WaiterInner>) {
249        unsafe {
250            assert!(waiter.as_ref().next.is_none());
251            assert!(waiter.as_ref().prev.is_none());
252
253            waiter.as_mut().prev = self.tail;
254            match self.tail {
255                Some(mut tail) => tail.as_mut().next = Some(waiter),
256                None => self.head = Some(waiter),
257            }
258            self.tail = Some(waiter);
259        }
260    }
261
262    /// Removes `waiter` from the queue.
263    ///
264    /// # Unsafety
265    ///
266    /// This method is `unsafe` as it can only be invoked under the parking
267    /// spot's mutex. Additionally `waiter` must be a valid pointer in this
268    /// queue.
269    unsafe fn remove(&mut self, mut waiter: SendSyncPtr<WaiterInner>) {
270        unsafe {
271            let w = waiter.as_mut();
272            match w.prev {
273                Some(mut prev) => prev.as_mut().next = w.next,
274                None => self.head = w.next,
275            }
276            match w.next {
277                Some(mut next) => next.as_mut().prev = w.prev,
278                None => self.tail = w.prev,
279            }
280            w.prev = None;
281            w.next = None;
282        }
283    }
284
285    /// Pops the head of the queue from this linked list to wake up a waiter.
286    ///
287    /// # Unsafety
288    ///
289    /// This method is `unsafe` as it can only be invoked under the parking
290    /// spot's mutex.
291    unsafe fn pop(&mut self) -> Option<SendSyncPtr<WaiterInner>> {
292        let ret = self.head?;
293        unsafe {
294            self.remove(ret);
295        }
296        Some(ret)
297    }
298
299    #[cfg(test)]
300    fn num_parked(&self) -> u32 {
301        let mut ret = 0;
302        let mut cur = self.head;
303        while let Some(next) = cur {
304            ret += 1;
305            cur = unsafe { next.as_ref().next };
306        }
307        ret
308    }
309}
310
311#[cfg(test)]
312mod tests {
313    use super::{ParkingSpot, Waiter};
314    use crate::prelude::*;
315    use std::sync::atomic::{AtomicU64, Ordering};
316    use std::thread;
317    use std::time::{Duration, Instant};
318
319    #[test]
320    fn atomic_wait_notify() {
321        let parking_spot = ParkingSpot::default();
322        let atomic = AtomicU64::new(0);
323
324        let wait_until_value = |val: u64, waiter: &mut Waiter| loop {
325            let cur = atomic.load(Ordering::SeqCst);
326            if cur == val {
327                break;
328            } else {
329                parking_spot.wait64(&atomic, cur, None, waiter);
330            }
331        };
332
333        thread::scope(|s| {
334            let thread1 = s.spawn(|| {
335                let mut waiter = Waiter::default();
336                atomic.store(1, Ordering::SeqCst);
337                parking_spot.notify(&atomic, u32::MAX);
338                parking_spot.wait64(&atomic, 1, None, &mut waiter);
339            });
340
341            let thread2 = s.spawn(|| {
342                let mut waiter = Waiter::default();
343                wait_until_value(1, &mut waiter);
344                atomic.store(2, Ordering::SeqCst);
345                parking_spot.notify(&atomic, u32::MAX);
346                parking_spot.wait64(&atomic, 2, None, &mut waiter);
347            });
348
349            let thread3 = s.spawn(|| {
350                let mut waiter = Waiter::default();
351                wait_until_value(2, &mut waiter);
352                atomic.store(3, Ordering::SeqCst);
353                parking_spot.notify(&atomic, u32::MAX);
354                parking_spot.wait64(&atomic, 3, None, &mut waiter);
355            });
356
357            let mut waiter = Waiter::default();
358            wait_until_value(3, &mut waiter);
359            atomic.store(4, Ordering::SeqCst);
360            parking_spot.notify(&atomic, u32::MAX);
361
362            thread1.join().unwrap();
363            thread2.join().unwrap();
364            thread3.join().unwrap();
365        });
366    }
367
368    mod parking_lot {
369        // This is a modified version of the parking_lot_core tests,
370        // which are licensed under the MIT and Apache 2.0 licenses.
371        use super::*;
372        use std::sync::Arc;
373        use std::sync::atomic::AtomicU32;
374
375        macro_rules! test {
376            ( $( $name:ident(
377                repeats: $repeats:expr,
378                latches: $latches:expr,
379                delay: $delay:expr,
380                threads: $threads:expr,
381                single_unparks: $single_unparks:expr);
382            )* ) => {
383                $(
384                #[test]
385                #[cfg_attr(miri, ignore)]
386                fn $name() {
387                    if std::env::var("WASMTIME_TEST_NO_HOG_MEMORY").is_ok() {
388                        return;
389                    }
390                    let delay = Duration::from_micros($delay);
391                    for _ in 0..$repeats {
392                        run_parking_test($latches, delay, $threads, $single_unparks);
393                    }
394                })*
395            };
396        }
397
398        test! {
399            unpark_all_one_fast(
400                repeats: 10000, latches: 1, delay: 0, threads: 1, single_unparks: 0
401            );
402            unpark_all_hundred_fast(
403                repeats: 100, latches: 1, delay: 0, threads: 100, single_unparks: 0
404            );
405            unpark_one_one_fast(
406                repeats: 1000, latches: 1, delay: 0, threads: 1, single_unparks: 1
407            );
408            unpark_one_hundred_fast(
409                repeats: 20, latches: 1, delay: 0, threads: 100, single_unparks: 100
410            );
411            unpark_one_fifty_then_fifty_all_fast(
412                repeats: 50, latches: 1, delay: 0, threads: 100, single_unparks: 50
413            );
414            unpark_all_one(
415                repeats: 100, latches: 1, delay: 10000, threads: 1, single_unparks: 0
416            );
417            unpark_all_hundred(
418                repeats: 100, latches: 1, delay: 10000, threads: 100, single_unparks: 0
419            );
420            unpark_one_one(
421                repeats: 10, latches: 1, delay: 10000, threads: 1, single_unparks: 1
422            );
423            unpark_one_fifty(
424                repeats: 1, latches: 1, delay: 10000, threads: 50, single_unparks: 50
425            );
426            unpark_one_fifty_then_fifty_all(
427                repeats: 2, latches: 1, delay: 10000, threads: 100, single_unparks: 50
428            );
429            hundred_unpark_all_one_fast(
430                repeats: 100, latches: 100, delay: 0, threads: 1, single_unparks: 0
431            );
432            hundred_unpark_all_one(
433                repeats: 1, latches: 100, delay: 10000, threads: 1, single_unparks: 0
434            );
435        }
436
437        fn run_parking_test(
438            num_latches: usize,
439            delay: Duration,
440            num_threads: u32,
441            num_single_unparks: u32,
442        ) {
443            let spot = ParkingSpot::default();
444
445            thread::scope(|s| {
446                let mut tests = Vec::with_capacity(num_latches);
447
448                for _ in 0..num_latches {
449                    let test = Arc::new(SingleLatchTest::new(num_threads, &spot));
450                    let mut threads = Vec::with_capacity(num_threads as _);
451                    for _ in 0..num_threads {
452                        let test = test.clone();
453                        threads.push(s.spawn(move || test.run()));
454                    }
455                    tests.push((test, threads));
456                }
457
458                for unpark_index in 0..num_single_unparks {
459                    thread::sleep(delay);
460                    for (test, _) in &tests {
461                        test.unpark_one(unpark_index);
462                    }
463                }
464
465                for (test, threads) in tests {
466                    test.finish(num_single_unparks);
467                    for thread in threads {
468                        thread.join().expect("Test thread panic");
469                    }
470                }
471            });
472        }
473
474        struct SingleLatchTest<'a> {
475            semaphore: AtomicU32,
476            num_awake: AtomicU32,
477            /// Total number of threads participating in this test.
478            num_threads: u32,
479            spot: &'a ParkingSpot,
480        }
481
482        impl<'a> SingleLatchTest<'a> {
483            pub fn new(num_threads: u32, spot: &'a ParkingSpot) -> Self {
484                Self {
485                    // This implements a fair (FIFO) semaphore, and it starts out unavailable.
486                    semaphore: AtomicU32::new(0),
487                    num_awake: AtomicU32::new(0),
488                    num_threads,
489                    spot,
490                }
491            }
492
493            pub fn run(&self) {
494                // Get one slot from the semaphore
495                self.down();
496
497                self.num_awake.fetch_add(1, Ordering::SeqCst);
498            }
499
500            pub fn unpark_one(&self, _single_unpark_index: u32) {
501                let num_awake_before_up = self.num_awake.load(Ordering::SeqCst);
502
503                self.up();
504
505                // Wait for a parked thread to wake up and update num_awake + last_awoken.
506                while self.num_awake.load(Ordering::SeqCst) != num_awake_before_up + 1 {
507                    thread::yield_now();
508                }
509            }
510
511            pub fn finish(&self, num_single_unparks: u32) {
512                // The amount of threads not unparked via unpark_one
513                let mut num_threads_left =
514                    self.num_threads.checked_sub(num_single_unparks).unwrap();
515
516                // Wake remaining threads up with unpark_all. Has to be in a loop, because there might
517                // still be threads that has not yet parked.
518                while num_threads_left > 0 {
519                    let mut num_waiting_on_address = 0;
520                    self.spot.with_lot(&self.semaphore, |thread_data| {
521                        num_waiting_on_address = thread_data.num_parked();
522                    });
523                    assert!(num_waiting_on_address <= num_threads_left);
524
525                    let num_awake_before_unpark = self.num_awake.load(Ordering::SeqCst);
526
527                    let num_unparked = self.spot.notify(&self.semaphore, u32::MAX);
528                    assert!(num_unparked >= num_waiting_on_address);
529                    assert!(num_unparked <= num_threads_left);
530
531                    // Wait for all unparked threads to wake up and update num_awake + last_awoken.
532                    while self.num_awake.load(Ordering::SeqCst)
533                        != num_awake_before_unpark + num_unparked
534                    {
535                        thread::yield_now();
536                    }
537
538                    num_threads_left = num_threads_left.checked_sub(num_unparked).unwrap();
539                }
540                // By now, all threads should have been woken up
541                assert_eq!(self.num_awake.load(Ordering::SeqCst), self.num_threads);
542
543                // Make sure no thread is parked on our semaphore address
544                let mut num_waiting_on_address = 0;
545                self.spot.with_lot(&self.semaphore, |thread_data| {
546                    num_waiting_on_address = thread_data.num_parked();
547                });
548                assert_eq!(num_waiting_on_address, 0);
549            }
550
551            pub fn down(&self) {
552                let mut old_semaphore_value = self.semaphore.fetch_sub(1, Ordering::SeqCst);
553
554                if (old_semaphore_value as i32) > 0 {
555                    // We acquired the semaphore. Done.
556                    return;
557                }
558
559                // Force this thread to go to sleep.
560                let mut waiter = Waiter::new();
561                loop {
562                    match self
563                        .spot
564                        .wait32(&self.semaphore, old_semaphore_value, None, &mut waiter)
565                    {
566                        crate::runtime::vm::WaitResult::Mismatch => {}
567                        _ => break,
568                    }
569                    old_semaphore_value = self.semaphore.load(Ordering::SeqCst);
570                }
571            }
572
573            pub fn up(&self) {
574                let old_semaphore_value = self.semaphore.fetch_add(1, Ordering::SeqCst) as i32;
575
576                // Check if anyone was waiting on the semaphore. If they were, then pass ownership to them.
577                if old_semaphore_value < 0 {
578                    // We need to continue until we have actually unparked someone. It might be that
579                    // the thread we want to pass ownership to has decremented the semaphore counter,
580                    // but not yet parked.
581                    loop {
582                        match self.spot.notify(&self.semaphore, 1) {
583                            1 => break,
584                            0 => (),
585                            i => panic!("Should not wake up {i} threads"),
586                        }
587                    }
588                }
589            }
590        }
591    }
592
593    #[test]
594    fn wait_with_timeout() {
595        let parking_spot = ParkingSpot::default();
596        let atomic = AtomicU64::new(0);
597
598        thread::scope(|s| {
599            const N: u64 = 5;
600            const M: u64 = if cfg!(miri) { 10 } else { 1000 };
601
602            let thread = s.spawn(|| {
603                let mut waiter = Waiter::new();
604                loop {
605                    let cur = atomic.load(Ordering::SeqCst);
606                    if cur == N * M {
607                        break;
608                    }
609                    let timeout = Instant::now() + Duration::from_millis(1);
610                    parking_spot.wait64(&atomic, cur, Some(timeout), &mut waiter);
611                }
612            });
613
614            let mut threads = vec![thread];
615            for _ in 0..N {
616                threads.push(s.spawn(|| {
617                    for _ in 0..M {
618                        atomic.fetch_add(1, Ordering::SeqCst);
619                        parking_spot.notify(&atomic, 1);
620                    }
621                }));
622            }
623
624            for thread in threads {
625                thread.join().unwrap();
626            }
627        });
628    }
629}