1#![deny(missing_docs)]
13
14use crate::prelude::*;
15use crate::runtime::vm::{SendSyncPtr, WaitResult};
16use std::collections::BTreeMap;
17use std::ptr::NonNull;
18use std::sync::Mutex;
19use std::sync::atomic::{AtomicU32, AtomicU64, Ordering::SeqCst};
20use std::thread::{self, Thread};
21use std::time::{Duration, Instant};
22
23#[derive(Default, Debug)]
24struct Spot {
25 head: Option<SendSyncPtr<WaiterInner>>,
26 tail: Option<SendSyncPtr<WaiterInner>>,
27}
28
29#[derive(Default, Debug)]
31pub struct ParkingSpot {
32 inner: Mutex<BTreeMap<u64, Spot>>,
33}
34
35#[derive(Default)]
36pub struct Waiter {
37 inner: Option<Box<WaiterInner>>,
38}
39
40struct WaiterInner {
41 thread: Thread,
44
45 notified: bool,
48 next: Option<SendSyncPtr<WaiterInner>>,
49 prev: Option<SendSyncPtr<WaiterInner>>,
50}
51
52impl ParkingSpot {
53 pub fn wait32(
74 &self,
75 atomic: &AtomicU32,
76 expected: u32,
77 deadline: impl Into<Option<Instant>>,
78 waiter: &mut Waiter,
79 ) -> WaitResult {
80 self.wait(
81 atomic.as_ptr() as u64,
82 || atomic.load(SeqCst) == expected,
83 deadline.into(),
84 waiter,
85 )
86 }
87
88 pub fn wait64(
90 &self,
91 atomic: &AtomicU64,
92 expected: u64,
93 deadline: impl Into<Option<Instant>>,
94 waiter: &mut Waiter,
95 ) -> WaitResult {
96 self.wait(
97 atomic.as_ptr() as u64,
98 || atomic.load(SeqCst) == expected,
99 deadline.into(),
100 waiter,
101 )
102 }
103
104 fn wait(
105 &self,
106 key: u64,
107 validate: impl FnOnce() -> bool,
108 deadline: Option<Instant>,
109 waiter: &mut Waiter,
110 ) -> WaitResult {
111 let mut inner = self
112 .inner
113 .lock()
114 .expect("failed to lock inner parking table");
115
116 if !validate() {
119 return WaitResult::Mismatch;
120 }
121
122 let waiter = waiter.inner.get_or_insert_with(|| {
125 Box::new(WaiterInner {
126 next: None,
127 prev: None,
128 notified: false,
129 thread: thread::current(),
130 })
131 });
132 assert!(waiter.next.is_none());
133 assert!(waiter.prev.is_none());
134
135 waiter.notified = false;
138 waiter.thread = thread::current();
139
140 let ptr = SendSyncPtr::new(NonNull::from(&mut **waiter));
141 let spot = inner.entry(key).or_insert_with(Spot::default);
142 unsafe {
143 spot.push(ptr);
145
146 let timed_out = loop {
157 let timeout = match deadline {
158 Some(deadline) => {
159 let now = Instant::now();
160 if deadline <= now {
161 break true;
162 } else {
163 deadline - now
164 }
165 }
166 None => Duration::MAX,
167 };
168
169 drop(inner);
170 thread::park_timeout(timeout);
171 inner = self.inner.lock().unwrap();
172
173 if ptr.as_ref().notified {
174 break false;
175 }
176 };
177
178 if timed_out {
179 inner.get_mut(&key).unwrap().remove(ptr);
182 WaitResult::TimedOut
183 } else {
184 assert!(ptr.as_ref().next.is_none());
187 assert!(ptr.as_ref().prev.is_none());
188 WaitResult::Ok
189 }
190 }
191 }
192
193 pub fn notify<T>(&self, addr: &T, n: u32) -> u32 {
197 if n == 0 {
198 return 0;
199 }
200 let mut unparked = 0;
201
202 self.with_lot(addr, |spot| unsafe {
206 while let Some(mut head) = spot.pop() {
207 let head = head.as_mut();
208 assert!(head.next.is_none());
209 head.notified = true;
210 head.thread.unpark();
211 unparked += 1;
212 if unparked == n {
213 break;
214 }
215 }
216 });
217
218 unparked
219 }
220
221 fn with_lot<T, F: FnMut(&mut Spot)>(&self, addr: &T, mut f: F) {
222 let key = addr as *const _ as u64;
223 let mut inner = self
224 .inner
225 .lock()
226 .expect("failed to lock inner parking table");
227 if let Some(spot) = inner.get_mut(&key) {
228 f(spot);
229 }
230 }
231}
232
233impl Waiter {
234 pub const fn new() -> Waiter {
235 Waiter { inner: None }
236 }
237}
238
239impl Spot {
240 unsafe fn push(&mut self, mut waiter: SendSyncPtr<WaiterInner>) {
249 unsafe {
250 assert!(waiter.as_ref().next.is_none());
251 assert!(waiter.as_ref().prev.is_none());
252
253 waiter.as_mut().prev = self.tail;
254 match self.tail {
255 Some(mut tail) => tail.as_mut().next = Some(waiter),
256 None => self.head = Some(waiter),
257 }
258 self.tail = Some(waiter);
259 }
260 }
261
262 unsafe fn remove(&mut self, mut waiter: SendSyncPtr<WaiterInner>) {
270 unsafe {
271 let w = waiter.as_mut();
272 match w.prev {
273 Some(mut prev) => prev.as_mut().next = w.next,
274 None => self.head = w.next,
275 }
276 match w.next {
277 Some(mut next) => next.as_mut().prev = w.prev,
278 None => self.tail = w.prev,
279 }
280 w.prev = None;
281 w.next = None;
282 }
283 }
284
285 unsafe fn pop(&mut self) -> Option<SendSyncPtr<WaiterInner>> {
292 let ret = self.head?;
293 unsafe {
294 self.remove(ret);
295 }
296 Some(ret)
297 }
298
299 #[cfg(test)]
300 fn num_parked(&self) -> u32 {
301 let mut ret = 0;
302 let mut cur = self.head;
303 while let Some(next) = cur {
304 ret += 1;
305 cur = unsafe { next.as_ref().next };
306 }
307 ret
308 }
309}
310
311#[cfg(test)]
312mod tests {
313 use super::{ParkingSpot, Waiter};
314 use crate::prelude::*;
315 use std::sync::atomic::{AtomicU64, Ordering};
316 use std::thread;
317 use std::time::{Duration, Instant};
318
319 #[test]
320 fn atomic_wait_notify() {
321 let parking_spot = ParkingSpot::default();
322 let atomic = AtomicU64::new(0);
323
324 let wait_until_value = |val: u64, waiter: &mut Waiter| loop {
325 let cur = atomic.load(Ordering::SeqCst);
326 if cur == val {
327 break;
328 } else {
329 parking_spot.wait64(&atomic, cur, None, waiter);
330 }
331 };
332
333 thread::scope(|s| {
334 let thread1 = s.spawn(|| {
335 let mut waiter = Waiter::default();
336 atomic.store(1, Ordering::SeqCst);
337 parking_spot.notify(&atomic, u32::MAX);
338 parking_spot.wait64(&atomic, 1, None, &mut waiter);
339 });
340
341 let thread2 = s.spawn(|| {
342 let mut waiter = Waiter::default();
343 wait_until_value(1, &mut waiter);
344 atomic.store(2, Ordering::SeqCst);
345 parking_spot.notify(&atomic, u32::MAX);
346 parking_spot.wait64(&atomic, 2, None, &mut waiter);
347 });
348
349 let thread3 = s.spawn(|| {
350 let mut waiter = Waiter::default();
351 wait_until_value(2, &mut waiter);
352 atomic.store(3, Ordering::SeqCst);
353 parking_spot.notify(&atomic, u32::MAX);
354 parking_spot.wait64(&atomic, 3, None, &mut waiter);
355 });
356
357 let mut waiter = Waiter::default();
358 wait_until_value(3, &mut waiter);
359 atomic.store(4, Ordering::SeqCst);
360 parking_spot.notify(&atomic, u32::MAX);
361
362 thread1.join().unwrap();
363 thread2.join().unwrap();
364 thread3.join().unwrap();
365 });
366 }
367
368 mod parking_lot {
369 use super::*;
372 use std::sync::Arc;
373 use std::sync::atomic::AtomicU32;
374
375 macro_rules! test {
376 ( $( $name:ident(
377 repeats: $repeats:expr,
378 latches: $latches:expr,
379 delay: $delay:expr,
380 threads: $threads:expr,
381 single_unparks: $single_unparks:expr);
382 )* ) => {
383 $(
384 #[test]
385 #[cfg_attr(miri, ignore)]
386 fn $name() {
387 if std::env::var("WASMTIME_TEST_NO_HOG_MEMORY").is_ok() {
388 return;
389 }
390 let delay = Duration::from_micros($delay);
391 for _ in 0..$repeats {
392 run_parking_test($latches, delay, $threads, $single_unparks);
393 }
394 })*
395 };
396 }
397
398 test! {
399 unpark_all_one_fast(
400 repeats: 10000, latches: 1, delay: 0, threads: 1, single_unparks: 0
401 );
402 unpark_all_hundred_fast(
403 repeats: 100, latches: 1, delay: 0, threads: 100, single_unparks: 0
404 );
405 unpark_one_one_fast(
406 repeats: 1000, latches: 1, delay: 0, threads: 1, single_unparks: 1
407 );
408 unpark_one_hundred_fast(
409 repeats: 20, latches: 1, delay: 0, threads: 100, single_unparks: 100
410 );
411 unpark_one_fifty_then_fifty_all_fast(
412 repeats: 50, latches: 1, delay: 0, threads: 100, single_unparks: 50
413 );
414 unpark_all_one(
415 repeats: 100, latches: 1, delay: 10000, threads: 1, single_unparks: 0
416 );
417 unpark_all_hundred(
418 repeats: 100, latches: 1, delay: 10000, threads: 100, single_unparks: 0
419 );
420 unpark_one_one(
421 repeats: 10, latches: 1, delay: 10000, threads: 1, single_unparks: 1
422 );
423 unpark_one_fifty(
424 repeats: 1, latches: 1, delay: 10000, threads: 50, single_unparks: 50
425 );
426 unpark_one_fifty_then_fifty_all(
427 repeats: 2, latches: 1, delay: 10000, threads: 100, single_unparks: 50
428 );
429 hundred_unpark_all_one_fast(
430 repeats: 100, latches: 100, delay: 0, threads: 1, single_unparks: 0
431 );
432 hundred_unpark_all_one(
433 repeats: 1, latches: 100, delay: 10000, threads: 1, single_unparks: 0
434 );
435 }
436
437 fn run_parking_test(
438 num_latches: usize,
439 delay: Duration,
440 num_threads: u32,
441 num_single_unparks: u32,
442 ) {
443 let spot = ParkingSpot::default();
444
445 thread::scope(|s| {
446 let mut tests = Vec::with_capacity(num_latches);
447
448 for _ in 0..num_latches {
449 let test = Arc::new(SingleLatchTest::new(num_threads, &spot));
450 let mut threads = Vec::with_capacity(num_threads as _);
451 for _ in 0..num_threads {
452 let test = test.clone();
453 threads.push(s.spawn(move || test.run()));
454 }
455 tests.push((test, threads));
456 }
457
458 for unpark_index in 0..num_single_unparks {
459 thread::sleep(delay);
460 for (test, _) in &tests {
461 test.unpark_one(unpark_index);
462 }
463 }
464
465 for (test, threads) in tests {
466 test.finish(num_single_unparks);
467 for thread in threads {
468 thread.join().expect("Test thread panic");
469 }
470 }
471 });
472 }
473
474 struct SingleLatchTest<'a> {
475 semaphore: AtomicU32,
476 num_awake: AtomicU32,
477 num_threads: u32,
479 spot: &'a ParkingSpot,
480 }
481
482 impl<'a> SingleLatchTest<'a> {
483 pub fn new(num_threads: u32, spot: &'a ParkingSpot) -> Self {
484 Self {
485 semaphore: AtomicU32::new(0),
487 num_awake: AtomicU32::new(0),
488 num_threads,
489 spot,
490 }
491 }
492
493 pub fn run(&self) {
494 self.down();
496
497 self.num_awake.fetch_add(1, Ordering::SeqCst);
498 }
499
500 pub fn unpark_one(&self, _single_unpark_index: u32) {
501 let num_awake_before_up = self.num_awake.load(Ordering::SeqCst);
502
503 self.up();
504
505 while self.num_awake.load(Ordering::SeqCst) != num_awake_before_up + 1 {
507 thread::yield_now();
508 }
509 }
510
511 pub fn finish(&self, num_single_unparks: u32) {
512 let mut num_threads_left =
514 self.num_threads.checked_sub(num_single_unparks).unwrap();
515
516 while num_threads_left > 0 {
519 let mut num_waiting_on_address = 0;
520 self.spot.with_lot(&self.semaphore, |thread_data| {
521 num_waiting_on_address = thread_data.num_parked();
522 });
523 assert!(num_waiting_on_address <= num_threads_left);
524
525 let num_awake_before_unpark = self.num_awake.load(Ordering::SeqCst);
526
527 let num_unparked = self.spot.notify(&self.semaphore, u32::MAX);
528 assert!(num_unparked >= num_waiting_on_address);
529 assert!(num_unparked <= num_threads_left);
530
531 while self.num_awake.load(Ordering::SeqCst)
533 != num_awake_before_unpark + num_unparked
534 {
535 thread::yield_now();
536 }
537
538 num_threads_left = num_threads_left.checked_sub(num_unparked).unwrap();
539 }
540 assert_eq!(self.num_awake.load(Ordering::SeqCst), self.num_threads);
542
543 let mut num_waiting_on_address = 0;
545 self.spot.with_lot(&self.semaphore, |thread_data| {
546 num_waiting_on_address = thread_data.num_parked();
547 });
548 assert_eq!(num_waiting_on_address, 0);
549 }
550
551 pub fn down(&self) {
552 let mut old_semaphore_value = self.semaphore.fetch_sub(1, Ordering::SeqCst);
553
554 if (old_semaphore_value as i32) > 0 {
555 return;
557 }
558
559 let mut waiter = Waiter::new();
561 loop {
562 match self
563 .spot
564 .wait32(&self.semaphore, old_semaphore_value, None, &mut waiter)
565 {
566 crate::runtime::vm::WaitResult::Mismatch => {}
567 _ => break,
568 }
569 old_semaphore_value = self.semaphore.load(Ordering::SeqCst);
570 }
571 }
572
573 pub fn up(&self) {
574 let old_semaphore_value = self.semaphore.fetch_add(1, Ordering::SeqCst) as i32;
575
576 if old_semaphore_value < 0 {
578 loop {
582 match self.spot.notify(&self.semaphore, 1) {
583 1 => break,
584 0 => (),
585 i => panic!("Should not wake up {i} threads"),
586 }
587 }
588 }
589 }
590 }
591 }
592
593 #[test]
594 fn wait_with_timeout() {
595 let parking_spot = ParkingSpot::default();
596 let atomic = AtomicU64::new(0);
597
598 thread::scope(|s| {
599 const N: u64 = 5;
600 const M: u64 = if cfg!(miri) { 10 } else { 1000 };
601
602 let thread = s.spawn(|| {
603 let mut waiter = Waiter::new();
604 loop {
605 let cur = atomic.load(Ordering::SeqCst);
606 if cur == N * M {
607 break;
608 }
609 let timeout = Instant::now() + Duration::from_millis(1);
610 parking_spot.wait64(&atomic, cur, Some(timeout), &mut waiter);
611 }
612 });
613
614 let mut threads = vec![thread];
615 for _ in 0..N {
616 threads.push(s.spawn(|| {
617 for _ in 0..M {
618 atomic.fetch_add(1, Ordering::SeqCst);
619 parking_spot.notify(&atomic, 1);
620 }
621 }));
622 }
623
624 for thread in threads {
625 thread.join().unwrap();
626 }
627 });
628 }
629}