wasmtime/runtime/fiber.rs
1use crate::prelude::*;
2use crate::store::{AsStoreOpaque, Executor, StoreId, StoreOpaque};
3use crate::vm::mpk::{self, ProtectionMask};
4use crate::vm::{AlwaysMut, AsyncWasmCallState};
5use crate::{Engine, StoreContextMut};
6use anyhow::{Result, anyhow};
7use core::mem;
8use core::ops::Range;
9use core::pin::Pin;
10use core::ptr::{self, NonNull};
11use core::task::{Context, Poll};
12use wasmtime_fiber::{Fiber, FiberStack, Suspend};
13
14type WasmtimeResume = Result<NonNull<Context<'static>>>;
15type WasmtimeYield = StoreFiberYield;
16type WasmtimeComplete = Result<()>;
17type WasmtimeSuspend = Suspend<WasmtimeResume, WasmtimeYield, WasmtimeComplete>;
18type WasmtimeFiber<'a> = Fiber<'a, WasmtimeResume, WasmtimeYield, WasmtimeComplete>;
19
20/// State related to asynchronous computations stored within a `Store<T>`.
21///
22/// This structure resides inside of a `Store<T>` and is used to manage the
23/// various pieces of state associated with asynchronous computations. Chiefly
24/// this manages the `WasmtimeSuspend` pointer as well as `&mut Context<'_>`
25/// when polling futures. This serves as storage to use these pointers across a
26/// WebAssembly function boundary, for example, where the values cannot
27/// otherwise be explicitly threaded through.
28pub(crate) struct AsyncState {
29 /// The `Suspend` for the current fiber (or null if no such fiber is
30 /// running).
31 ///
32 /// This pointer is provided by the `wasmtime_fiber` crate when a fiber
33 /// first starts, but this pointer is unable to be carried through
34 /// WebAssembly frames for example. This serves as an alternative storage
35 /// location for the pointer provided by `wasmtime_fiber` within a fiber's
36 /// execution.
37 ///
38 /// This pointer is null when a fiber is not executing, but it is also null
39 /// when a `BlockingContext` is created. Note that when a fiber is suspended
40 /// it's always through a `BlockingContext` so this field is null whenever a
41 /// fiber is suspended as well. Fiber resumption will save the prior value
42 /// in a store and then set it to null, where suspension will then restore
43 /// what was previously in the store.
44 current_suspend: Option<NonNull<WasmtimeSuspend>>,
45
46 /// The `Context` pointer last provided in `Future for FiberFuture`.
47 ///
48 /// Like `current_suspend` above this is an example of a piece of context
49 /// which needs to be carried over a WebAssembly function frame which
50 /// otherwise doesn't take this as a parameter. This differs from
51 /// `current_suspend` though in that it is provided as part of a `Future`
52 /// poll operation but is "gone" after that poll operation completes. That
53 /// means that while `current_suspend` is the same for the lifetime of a
54 /// future this field is always changing.
55 ///
56 /// Like `current_suspend` though this is null either when a fiber isn't
57 /// running or when a `BlockingContext` is created (in which case this is
58 /// "take"en). That means that this is null on suspension/resumption of a
59 /// fiber.
60 ///
61 /// The value for this pointer is threaded directly through the
62 /// `WasmtimeResume` type which is how a pointer flows into this field from
63 /// a future-related poll call. This means that the `BlockingContext`
64 /// creation may take one value of a pointer here but restore another. That
65 /// would represent suspending in one call to `Future::poll` and then
66 /// resuming some time later in a different call to `Future::poll`.
67 ///
68 /// # Safety
69 ///
70 /// Note that this is a pretty unsafe field for two reasons. One is that
71 /// it's a raw pointer to a `Context` provided ephemerally to some call to
72 /// `Future::poll` on the stack. Another reason is that the lifetime
73 /// parameter of `Context` is unsafely changed to `'static` here which is
74 /// not correct. The ephemeral nature of this pointer is managed through the
75 /// take-style operations in `BlockingContext` and the `'static` lifetime is
76 /// handled by ensuring the signatures that work with `BlockingContext` all
77 /// use constrained anonymous lifetimes that are guaranteed to be shorter
78 /// than the original `Context` lifetime.
79 current_future_cx: Option<NonNull<Context<'static>>>,
80
81 /// The last fiber stack that was in use by the store.
82 ///
83 /// We use this to cache and reuse stacks as a performance optimization.
84 // TODO: With stack switching and the Component Model Async ABI, there may
85 // be multiple concurrent fibers in play; consider caching more than one
86 // stack at a time and making the number tunable via `Config`.
87 last_fiber_stack: Option<wasmtime_fiber::FiberStack>,
88}
89
90// SAFETY: it's known that `std::task::Context` is neither `Send` nor `Sync`,
91// but despite this the storage here is purely temporary in getting these
92// pointers across function frames. The actual types are not sent across threads
93// as when a store isn't polling anything the pointer values are all set to
94// `None`. Thus if a store is being sent across threads that's done because no
95// fibers are active, and once fibers are active everything will stick within
96// the same thread.
97unsafe impl Send for AsyncState {}
98unsafe impl Sync for AsyncState {}
99
100impl Default for AsyncState {
101 fn default() -> Self {
102 Self {
103 current_suspend: None,
104 current_future_cx: None,
105 last_fiber_stack: None,
106 }
107 }
108}
109
110impl AsyncState {
111 pub(crate) fn last_fiber_stack(&mut self) -> &mut Option<wasmtime_fiber::FiberStack> {
112 &mut self.last_fiber_stack
113 }
114}
115
116/// A helper structure used to block a fiber.
117///
118/// This is acquired via either `StoreContextMut::with_blocking` or
119/// `StoreOpaque::with_blocking`. This structure represents the "taken" state of
120/// pointers from a store's `AsyncState`, then modeling them as safe pointers.
121///
122/// Note that the lifetimes here are carefully controlled in instances of this
123/// structure through the construction of the `with` function.
124pub(crate) struct BlockingContext<'a, 'b> {
125 /// Pointer to `wasmtime_fiber::Suspend` which was supplied when a fiber
126 /// first started.
127 ///
128 /// When a `BlockingContext` is first created this pointer is "taken" from
129 /// the store (the store is null'd out) and then the raw pointer previously
130 /// in the store is unsafely transformed to this safe pointer. This
131 /// represents how a `BlockingContext` temporarily has access to this
132 /// suspend but when the `BlockingContext` goes away this'll make its way
133 /// back into the store.
134 suspend: &'a mut WasmtimeSuspend,
135
136 /// Pointer to the future `Context` that this fiber is being polled with.
137 ///
138 /// Similar to `suspend` above this is taken from a store when a
139 /// `BlockingContext` is created and it's restored when the
140 /// `BlockingContext` goes away. Note though that unlike `suspend`, as
141 /// alluded to in the documentation on `AsyncState`, this value changes over
142 /// time as calls to poll are made. This field becomes `None` during a
143 /// suspension because that means that the context is released and no longer
144 /// available. Upon resumption the context here is *optionally* provided.
145 /// Cancellation is a case where it isn't passed back and a re-poll is a
146 /// case where it's passed back.
147 future_cx: Option<&'a mut Context<'b>>,
148}
149
150impl<'a, 'b> BlockingContext<'a, 'b> {
151 /// Method to go from a `store` provided (which internally contains a
152 /// `StoreOpaque`) to a `BlockingContext`.
153 ///
154 /// This function will "take" context from `store`'s `AsyncState` field. It
155 /// will then construct a `BlockingContext` and yield it to the closure `f`
156 /// provided. The closure can then block on futures, suspend, etc.
157 ///
158 /// Upon return of the closure `f` the state from `BlockingContext` is
159 /// restored within the store. The return value of `f` is the return value
160 /// of this function.
161 ///
162 /// Note that the `store` must be provided to this function as an argument
163 /// to originally acquire state from `AsyncState`. This store is then
164 /// supplied back to the closure `f` provided here so the store can be used
165 /// to construct an asynchronous or blocking computation which the
166 /// `BlockingContext` tries to block on.
167 ///
168 /// # Safety
169 ///
170 /// This method is safe to call at any time, but it's worth noting that the
171 /// safety of this function relies on the signature of this function.
172 /// Notably the lifetime parameters of `BlockingContext` in the `f` closure
173 /// here must be anonymous. That ensures that the `BlockingContext` that
174 /// callers get access to cannot be persisted outside of that closure call
175 /// and everything is scoped to just the closure `f` provided with nothing
176 /// escaping.
177 fn with<S, R>(store: &mut S, f: impl FnOnce(&mut S, &mut BlockingContext<'_, '_>) -> R) -> R
178 where
179 S: AsStoreOpaque,
180 {
181 let opaque = store.as_store_opaque();
182
183 let state = opaque.fiber_async_state_mut();
184
185 // SAFETY: this is taking pointers from `AsyncState` and then unsafely
186 // turning them into safe references. Lifetime-wise this should be safe
187 // because the inferred lifetimes for all these pointers is constrained
188 // by the signature of `f` provided here. That ensures that everything
189 // is scoped purely to the closure `f` and nothing should be persisted
190 // outside of this function call. This, for example, ensures that the
191 // `Context<'static>` doesn't leak out, it's only with an anonymous
192 // lifetime that's forcibly shorter.
193 //
194 // Provenance-wise this should be safe as if these fields in the store
195 // are non-null then the pointers are provided up-the-stack on this
196 // fiber and for this fiber. The "take" pattern here ensures that if
197 // this `BlockingContext` context acquires the pointers then there are
198 // no other instances of these pointers in use anywhere else.
199 let future_cx = unsafe { Some(state.current_future_cx.take().unwrap().as_mut()) };
200 let suspend = unsafe { state.current_suspend.take().unwrap().as_mut() };
201
202 let mut reset = ResetBlockingContext {
203 store,
204 cx: BlockingContext { future_cx, suspend },
205 };
206 return f(&mut reset.store, &mut reset.cx);
207
208 struct ResetBlockingContext<'a, 'b, S: AsStoreOpaque> {
209 store: &'a mut S,
210 cx: BlockingContext<'a, 'b>,
211 }
212
213 impl<S: AsStoreOpaque> Drop for ResetBlockingContext<'_, '_, S> {
214 fn drop(&mut self) {
215 let store = self.store.as_store_opaque();
216 let state = store.fiber_async_state_mut();
217
218 debug_assert!(state.current_future_cx.is_none());
219 debug_assert!(state.current_suspend.is_none());
220 state.current_suspend = Some(NonNull::from(&mut *self.cx.suspend));
221
222 if let Some(cx) = &mut self.cx.future_cx {
223 // SAFETY: while this is changing the lifetime to `'static`
224 // it should never be used while it's `'static` given this
225 // `BlockingContext` abstraction.
226 state.current_future_cx =
227 Some(NonNull::from(unsafe { change_context_lifetime(cx) }));
228 }
229 }
230 }
231 }
232
233 /// Blocks on the asynchronous computation represented by `future` and
234 /// produces the result here, in-line.
235 ///
236 /// This function is designed to only work when it's currently executing on
237 /// a native fiber. This fiber provides the ability for us to handle the
238 /// future's `Pending` state as "jump back to whomever called the fiber in
239 /// an asynchronous fashion and propagate `Pending`". This tight coupling
240 /// with `on_fiber` below is what powers the asynchronicity of calling wasm.
241 ///
242 /// This function takes a `future` and will (appear to) synchronously wait
243 /// on the result. While this function is executing it will fiber switch
244 /// to-and-from the original frame calling `on_fiber` which should be a
245 /// guarantee due to how async stores are configured.
246 ///
247 /// The return value here is either the output of the future `T`, or a trap
248 /// which represents that the asynchronous computation was cancelled. It is
249 /// not recommended to catch the trap and try to keep executing wasm, so
250 /// we've tried to liberally document this.
251 ///
252 /// Note that this function suspends (if needed) with
253 /// `StoreFiberYield::KeepStore`, indicating that the store must not be used
254 /// (and that no other fibers may be resumed) until this fiber resumes.
255 /// Therefore, it is not appropriate for use in e.g. guest calls to
256 /// async-lowered imports implemented as host functions, since it will
257 /// prevent any other tasks from being run. Use `Instance::suspend` to
258 /// suspend and release the store to allow other tasks to run before this
259 /// fiber is resumed.
260 ///
261 /// # Return Value
262 ///
263 /// A return value of `Ok(value)` means that the future completed with
264 /// `value`. A return value of `Err(e)` means that the fiber and its future
265 /// have been cancelled and the fiber needs to exit and complete ASAP.
266 ///
267 /// # Safety
268 ///
269 /// This function is safe to call at any time but relies on a trait bound
270 /// that is manually placed here the compiler does not otherwise require.
271 /// Notably the `Send` bound on the future provided here is not required
272 /// insofar as things compile without that. The purpose of this, however, is
273 /// to make the `unsafe impl Send for StoreFiber` more safe. The `future`
274 /// here is state that is stored on the stack during the suspension of this
275 /// fiber and is otherwise not visible to the compiler. By having a `Send`
276 /// bound here it ensures that the future doesn't have things like `Rc` or
277 /// similar pointing into thread locals which would not be sound if this
278 /// fiber crosses threads.
279 pub(crate) fn block_on<F>(&mut self, future: F) -> Result<F::Output>
280 where
281 F: Future + Send,
282 {
283 let mut future = core::pin::pin!(future);
284 loop {
285 match future.as_mut().poll(self.future_cx.as_mut().unwrap()) {
286 Poll::Ready(v) => break Ok(v),
287 Poll::Pending => self.suspend(StoreFiberYield::KeepStore)?,
288 }
289 }
290 }
291
292 /// Suspend this fiber with `yield_` as the reason.
293 ///
294 /// This function will suspend the current fiber and only return after the
295 /// fiber has resumed. This function return `Ok(())` if the fiber was
296 /// resumed to be completed, and `Err(e)` indicates that the fiber has been
297 /// cancelled and needs to exit/complete ASAP.
298 pub(crate) fn suspend(&mut self, yield_: StoreFiberYield) -> Result<()> {
299 // Over a suspension point we're guaranteed that the `Context` provided
300 // here is no longer valid, so discard it. If we're supposed to be able
301 // to poll afterwards this will be given back as part of the resume
302 // value given back.
303 self.future_cx.take();
304
305 let mut new_future_cx: NonNull<Context<'static>> = self.suspend.suspend(yield_)?;
306
307 // SAFETY: this function is unsafe as we're doing "funky" things to the
308 // `new_future_cx` we have been given. The safety here relies on the
309 // fact that the lifetimes of `BlockingContext` are all "smaller" than
310 // the original `Context` itself, and that should be guaranteed through
311 // the exclusive constructor of this type `BlockingContext::with`.
312 unsafe {
313 self.future_cx = Some(change_context_lifetime(new_future_cx.as_mut()));
314 }
315 Ok(())
316 }
317}
318
319impl<T> StoreContextMut<'_, T> {
320 /// Blocks on the future computed by `f`.
321 ///
322 /// # Panics
323 ///
324 /// Panics if this is invoked outside the context of a fiber.
325 pub(crate) fn block_on<R>(
326 self,
327 f: impl FnOnce(StoreContextMut<'_, T>) -> Pin<Box<dyn Future<Output = R> + Send + '_>>,
328 ) -> Result<R> {
329 self.with_blocking(|store, cx| cx.block_on(f(store).as_mut()))
330 }
331
332 /// Creates a `BlockingContext` suitable for blocking on futures or
333 /// suspending the current fiber.
334 ///
335 /// # Panics
336 ///
337 /// Panics if this is invoked outside the context of a fiber.
338 pub(crate) fn with_blocking<R>(
339 self,
340 f: impl FnOnce(StoreContextMut<'_, T>, &mut BlockingContext<'_, '_>) -> R,
341 ) -> R {
342 BlockingContext::with(self.0, |store, cx| f(StoreContextMut(store), cx))
343 }
344}
345
346impl StoreOpaque {
347 /// Creates a `BlockingContext` suitable for blocking on futures or
348 /// suspending the current fiber.
349 ///
350 /// # Panics
351 ///
352 /// Panics if this is invoked outside the context of a fiber.
353 pub(crate) fn with_blocking<R>(
354 &mut self,
355 f: impl FnOnce(&mut Self, &mut BlockingContext<'_, '_>) -> R,
356 ) -> R {
357 BlockingContext::with(self, |store, cx| f(store, cx))
358 }
359
360 /// Returns whether `block_on` will succeed or panic.
361 #[cfg(feature = "call-hook")]
362 pub(crate) fn can_block(&mut self) -> bool {
363 self.fiber_async_state_mut().current_future_cx.is_some()
364 }
365}
366
367/// Indicates whether or not a fiber needs to retain exclusive access to its
368/// store across a suspend/resume interval.
369pub(crate) enum StoreFiberYield {
370 /// Indicates the fiber needs to retain exclusive access, meaning the store
371 /// should not be used outside of the fiber until after the fiber either
372 /// suspends with `ReleaseStore` or resolves.
373 KeepStore,
374 /// Indicates the fiber does _not_ need exclusive access across the
375 /// suspend/resume interval, meaning the store may be used as needed until
376 /// the fiber is resumed.
377 #[cfg(feature = "component-model-async")]
378 ReleaseStore,
379}
380
381pub(crate) struct StoreFiber<'a> {
382 /// The raw `wasmtime_fiber::Fiber`.
383 ///
384 /// Note that using `StoreFiberYield` as the `Yield` type parameter allows
385 /// the fiber to indicate whether it needs exclusive access to the store
386 /// across suspend points (in which case it will pass `KeepStore` when
387 /// suspending , meaning the store must not be used at all until the fiber
388 /// is resumed again) or whether it is giving up exclusive access (in which
389 /// case it will pass `ReleaseStore` when yielding, meaning exclusive access
390 /// may be given to another fiber that runs concurrently.
391 ///
392 /// Note also that every `StoreFiber` is implicitly granted exclusive access
393 /// to the store when it is resumed.
394 fiber: Option<AlwaysMut<RawFiber<'a>>>,
395 /// See `FiberResumeState`
396 state: Option<AlwaysMut<FiberResumeState>>,
397 /// The Wasmtime `Engine` to which this fiber belongs.
398 engine: Engine,
399 /// The id of the store with which this fiber was created.
400 ///
401 /// Any attempt to resume a fiber with a different store than the one with
402 /// which it was created will panic.
403 id: StoreId,
404}
405
406struct RawFiber<'a>(WasmtimeFiber<'a>);
407
408impl<'a> StoreFiber<'a> {
409 /// Convenience method to peel off some layers of abstraction around the raw
410 /// `wasmtime_fiber::Fiber`.
411 fn fiber(&mut self) -> Option<&mut WasmtimeFiber<'a>> {
412 Some(&mut self.fiber.as_mut()?.get_mut().0)
413 }
414
415 /// Convenience method take the internal fiber and consume it, yielding its
416 /// original stack.
417 fn take_fiber_stack(&mut self) -> Option<FiberStack> {
418 self.fiber.take().map(|f| f.into_inner().0.into_stack())
419 }
420
421 pub(crate) fn dispose(&mut self, store: &mut StoreOpaque) {
422 if let Some(fiber) = self.fiber() {
423 if !fiber.done() {
424 let result = resume_fiber(store, self, Err(anyhow!("future dropped")));
425 debug_assert!(result.is_ok());
426 }
427 }
428 }
429}
430
431// Note that this implementation will panic if the fiber is in-progress, which
432// will abort the process if there is already a panic being unwound. That
433// should only happen if we failed to call `StoreFiber::dispose` on the
434// in-progress fiber prior to dropping it, which indicates a bug in this crate
435// which must be fixed.
436impl Drop for StoreFiber<'_> {
437 fn drop(&mut self) {
438 if self.fiber.is_none() {
439 return;
440 }
441
442 assert!(
443 self.fiber().unwrap().done(),
444 "attempted to drop in-progress fiber without first calling `StoreFiber::dispose`"
445 );
446
447 self.state.take().unwrap().into_inner().dispose();
448
449 unsafe {
450 let stack = self.take_fiber_stack().unwrap();
451 self.engine.allocator().deallocate_fiber_stack(stack);
452 }
453 }
454}
455
456// This is surely the most dangerous `unsafe impl Send` in the entire
457// crate. There are two members in `StoreFiber` which cause it to not be
458// `Send`. One is `suspend` and is entirely uninteresting. This is just used to
459// manage `Suspend` when resuming, and requires raw pointers to get it to happen
460// easily. Nothing too weird about the `Send`-ness, values aren't actually
461// crossing threads.
462//
463// The really interesting piece is `fiber`. Now the "fiber" here is actual
464// honest-to-god Rust code which we're moving around. What we're doing is the
465// equivalent of moving our thread's stack to another OS thread. Turns out we,
466// in general, have no idea what's on the stack and would generally have no way
467// to verify that this is actually safe to do!
468//
469// Thankfully, though, Wasmtime has the power. Without being glib it's actually
470// worth examining what's on the stack. It's unfortunately not super-local to
471// this function itself. Our closure to `Fiber::new` runs `func`, which is given
472// to us from the outside. Thankfully, though, we have tight control over
473// this. Usage of `on_fiber` or `Instance::resume_fiber` is typically done
474// *just* before entering WebAssembly itself, so we'll have a few stack frames
475// of Rust code (all in Wasmtime itself) before we enter wasm.
476//
477// Once we've entered wasm, well then we have a whole bunch of wasm frames on
478// the stack. We've got this nifty thing called Cranelift, though, which allows
479// us to also have complete control over everything on the stack!
480//
481// Finally, when wasm switches back to the fiber's starting pointer (this future
482// we're returning) then it means wasm has reentered Rust. Suspension can only
483// happen via either `block_on` or `Instance::suspend`. This, conveniently, also
484// happens entirely in Wasmtime controlled code!
485//
486// There's an extremely important point that should be called out here.
487// User-provided futures **are not on the stack** during suspension points. This
488// is extremely crucial because we in general cannot reason about Send/Sync for
489// stack-local variables since rustc doesn't analyze them at all. With our
490// construction, though, we are guaranteed that Wasmtime owns all stack frames
491// between the stack of a fiber and when the fiber suspends (and it could move
492// across threads). At this time the only user-provided piece of data on the
493// stack is the future itself given to us. Lo-and-behold as you might notice the
494// future is required to be `Send`!
495//
496// What this all boils down to is that we, as the authors of Wasmtime, need to
497// be extremely careful that on the async fiber stack we only store Send
498// things. For example we can't start using `Rc` willy nilly by accident and
499// leave a copy in TLS somewhere. (similarly we have to be ready for TLS to
500// change while we're executing wasm code between suspension points).
501//
502// While somewhat onerous it shouldn't be too too hard (the TLS bit is the
503// hardest bit so far). This does mean, though, that no user should ever have to
504// worry about the `Send`-ness of Wasmtime. If rustc says it's ok, then it's ok.
505//
506// With all that in mind we unsafely assert here that Wasmtime is correct. We
507// declare the fiber as only containing Send data on its stack, despite not
508// knowing for sure at compile time that this is correct. That's what `unsafe`
509// in Rust is all about, though, right?
510unsafe impl Send for RawFiber<'_> {}
511
512/// State of the world when a fiber last suspended.
513///
514/// This structure represents global state that a fiber clobbers during its
515/// execution. For example TLS variables are updated, system resources like MPK
516/// masks are updated, etc. The purpose of this structure is to track all of
517/// this state and appropriately save/restore it around fiber suspension points.
518struct FiberResumeState {
519 /// Saved list of `CallThreadState` activations that are stored on a fiber
520 /// stack.
521 ///
522 /// This is a linked list that references stack-stored nodes on the fiber
523 /// stack that is currently suspended. The `AsyncWasmCallState` type
524 /// documents this more thoroughly but the general gist is that when we this
525 /// fiber is resumed this linked list needs to be pushed on to the current
526 /// thread's linked list of activations.
527 tls: crate::runtime::vm::AsyncWasmCallState,
528
529 /// Saved MPK protection mask, if enabled.
530 ///
531 /// When MPK is enabled then executing WebAssembly will modify the
532 /// processor's current mask of addressable protection keys. This means that
533 /// our current state may get clobbered when a fiber suspends. To ensure
534 /// that this function preserves context it will, when MPK is enabled, save
535 /// the current mask when this function is called and then restore the mask
536 /// when the function returns (aka the fiber suspends).
537 mpk: Option<ProtectionMask>,
538
539 /// The current wasm stack limit, if in use.
540 ///
541 /// This field stores the old of `VMStoreContext::stack_limit` that this
542 /// fiber should be using during its execution. This is saved/restored when
543 /// a fiber is suspended/resumed to ensure that when there are multiple
544 /// fibers within the store they all maintain an appropriate fiber-relative
545 /// stack limit.
546 stack_limit: usize,
547
548 /// The executor (e.g. the Pulley interpreter state) belonging to this
549 /// fiber.
550 ///
551 /// This is swapped with `StoreOpaque::executor` whenever this fiber is
552 /// resumed, suspended, or resolved.
553 executor: Executor,
554}
555
556impl FiberResumeState {
557 unsafe fn replace(
558 self,
559 store: &mut StoreOpaque,
560 fiber: &mut StoreFiber<'_>,
561 ) -> PriorFiberResumeState {
562 let tls = unsafe { self.tls.push() };
563 let mpk = swap_mpk_states(self.mpk);
564 let async_guard_range = fiber
565 .fiber()
566 .unwrap()
567 .stack()
568 .guard_range()
569 .unwrap_or(ptr::null_mut()..ptr::null_mut());
570 let mut executor = self.executor;
571 store.swap_executor(&mut executor);
572 PriorFiberResumeState {
573 tls,
574 mpk,
575 executor,
576 stack_limit: store.replace_stack_limit(self.stack_limit),
577 async_guard_range: store.replace_async_guard_range(async_guard_range),
578
579 // The current suspend/future_cx are always null upon resumption, so
580 // insert null. Save the old values through to get preserved across
581 // this resume/suspend.
582 current_suspend: store.replace_current_suspend(None),
583 current_future_cx: store.replace_current_future_cx(None),
584 }
585 }
586
587 fn dispose(self) {
588 self.tls.assert_null();
589 }
590}
591
592impl StoreOpaque {
593 /// Helper function to swap the `stack_limit` field in the `VMStoreContext`
594 /// within this store.
595 fn replace_stack_limit(&mut self, stack_limit: usize) -> usize {
596 mem::replace(
597 &mut self.vm_store_context_mut().stack_limit.get_mut(),
598 stack_limit,
599 )
600 }
601
602 /// Helper function to swap the `async_guard_range` field in the `VMStoreContext`
603 /// within this store.
604 fn replace_async_guard_range(&mut self, range: Range<*mut u8>) -> Range<*mut u8> {
605 mem::replace(&mut self.vm_store_context_mut().async_guard_range, range)
606 }
607
608 fn replace_current_suspend(
609 &mut self,
610 ptr: Option<NonNull<WasmtimeSuspend>>,
611 ) -> Option<NonNull<WasmtimeSuspend>> {
612 mem::replace(&mut self.fiber_async_state_mut().current_suspend, ptr)
613 }
614
615 fn replace_current_future_cx(
616 &mut self,
617 ptr: Option<NonNull<Context<'static>>>,
618 ) -> Option<NonNull<Context<'static>>> {
619 mem::replace(&mut self.fiber_async_state_mut().current_future_cx, ptr)
620 }
621}
622
623struct PriorFiberResumeState {
624 tls: crate::runtime::vm::PreviousAsyncWasmCallState,
625 mpk: Option<ProtectionMask>,
626 stack_limit: usize,
627 async_guard_range: Range<*mut u8>,
628 current_suspend: Option<NonNull<WasmtimeSuspend>>,
629 current_future_cx: Option<NonNull<Context<'static>>>,
630 executor: Executor,
631}
632
633impl PriorFiberResumeState {
634 unsafe fn replace(self, store: &mut StoreOpaque) -> FiberResumeState {
635 let tls = unsafe { self.tls.restore() };
636 let mpk = swap_mpk_states(self.mpk);
637 // No need to save `_my_guard` since we can re-infer it from the fiber
638 // that this state is attached to.
639 let _my_guard = store.replace_async_guard_range(self.async_guard_range);
640
641 // Restore the previous values of current_{suspend,future_cx} but we
642 // should be guaranteed that the prior values are null, so double-check
643 // that here.
644 let prev = store.replace_current_suspend(self.current_suspend);
645 assert!(prev.is_none());
646 let prev = store.replace_current_future_cx(self.current_future_cx);
647 assert!(prev.is_none());
648
649 let mut executor = self.executor;
650 store.swap_executor(&mut executor);
651
652 FiberResumeState {
653 tls,
654 mpk,
655 executor,
656 stack_limit: store.replace_stack_limit(self.stack_limit),
657 }
658 }
659}
660
661fn swap_mpk_states(mask: Option<ProtectionMask>) -> Option<ProtectionMask> {
662 mask.map(|mask| {
663 let current = mpk::current_mask();
664 mpk::allow(mask);
665 current
666 })
667}
668
669/// Resume the specified fiber, granting it exclusive access to the store with
670/// which it was created.
671///
672/// This will return `Ok(result)` if the fiber resolved, where `result` is the
673/// returned value; it will return `Err(yield_)` if the fiber suspended, where
674/// `yield_` indicates whether it released access to the store or not. See
675/// `StoreFiber::fiber` for details.
676fn resume_fiber<'a>(
677 store: &mut StoreOpaque,
678 fiber: &mut StoreFiber<'a>,
679 result: WasmtimeResume,
680) -> Result<WasmtimeComplete, StoreFiberYield> {
681 assert_eq!(store.id(), fiber.id);
682
683 struct Restore<'a, 'b> {
684 store: &'b mut StoreOpaque,
685 fiber: &'b mut StoreFiber<'a>,
686 state: Option<PriorFiberResumeState>,
687 }
688
689 impl Drop for Restore<'_, '_> {
690 fn drop(&mut self) {
691 self.fiber.state =
692 Some(unsafe { self.state.take().unwrap().replace(self.store).into() });
693 }
694 }
695 let result = unsafe {
696 let prev = fiber
697 .state
698 .take()
699 .unwrap()
700 .into_inner()
701 .replace(store, fiber);
702 let restore = Restore {
703 store,
704 fiber,
705 state: Some(prev),
706 };
707 restore.fiber.fiber().unwrap().resume(result)
708 };
709
710 match &result {
711 // The fiber has finished, so recycle its stack by disposing of the
712 // underlying fiber itself.
713 Ok(_) => {
714 if let Some(stack) = fiber.take_fiber_stack() {
715 store.deallocate_fiber_stack(stack);
716 }
717 }
718
719 // The fiber has not yet finished, so it stays as-is.
720 Err(_) => {
721 // If `Err` is returned that means the fiber suspended, so we
722 // propagate that here.
723 //
724 // An additional safety check is performed when leaving this
725 // function to help bolster the guarantees of `unsafe impl Send`
726 // above. Notably this future may get re-polled on a different
727 // thread. Wasmtime's thread-local state points to the stack,
728 // however, meaning that it would be incorrect to leave a pointer in
729 // TLS when this function returns. This function performs a runtime
730 // assert to verify that this is the case, notably that the one TLS
731 // pointer Wasmtime uses is not pointing anywhere within the
732 // stack. If it is then that's a bug indicating that TLS management
733 // in Wasmtime is incorrect.
734 if let Some(range) = fiber.fiber().unwrap().stack().range() {
735 AsyncWasmCallState::assert_current_state_not_in_range(range);
736 }
737 }
738 }
739
740 result
741}
742
743/// Create a new `StoreFiber` which runs the specified closure.
744///
745/// # Safety
746///
747/// The returned `StoreFiber<'a>` structure is unconditionally `Send` but the
748/// send-ness is actually a function of `S`. When `S` is statically known to be
749/// `Send` then use the safe [`make_fiber`] function.
750pub(crate) unsafe fn make_fiber_unchecked<'a, S>(
751 store: &mut S,
752 fun: impl FnOnce(&mut S) -> Result<()> + Send + Sync + 'a,
753) -> Result<StoreFiber<'a>>
754where
755 S: AsStoreOpaque + ?Sized + 'a,
756{
757 let opaque = store.as_store_opaque();
758 let engine = opaque.engine().clone();
759 let executor = Executor::new(&engine);
760 let id = opaque.id();
761 let stack = opaque.allocate_fiber_stack()?;
762 let track_pkey_context_switch = opaque.has_pkey();
763 let store = &raw mut *store;
764 let fiber = Fiber::new(stack, move |result: WasmtimeResume, suspend| {
765 let future_cx = match result {
766 Ok(cx) => cx,
767 // Cancelled before we started? Just return.
768 Err(_) => return Ok(()),
769 };
770
771 // SAFETY: This fiber will only be resumed using `resume_fiber`, which
772 // takes a `&mut StoreOpaque` parameter and has given us exclusive
773 // access to the store until we exit or yield it back to the resumer.
774 let store_ref = unsafe { &mut *store };
775
776 // It should be a guarantee that the store has null pointers here upon
777 // starting a fiber, so now's the time to fill in the pointers now that
778 // the fiber is running and `future_cx` and `suspend` are both in scope.
779 // Note that these pointers are removed when this function returns as
780 // that's when they fall out of scope.
781 let async_state = store_ref.as_store_opaque().fiber_async_state_mut();
782 assert!(async_state.current_suspend.is_none());
783 assert!(async_state.current_future_cx.is_none());
784 async_state.current_suspend = Some(NonNull::from(suspend));
785 async_state.current_future_cx = Some(future_cx);
786
787 struct ResetCurrentPointersToNull<'a, S>(&'a mut S)
788 where
789 S: AsStoreOpaque + ?Sized;
790
791 impl<S> Drop for ResetCurrentPointersToNull<'_, S>
792 where
793 S: AsStoreOpaque + ?Sized,
794 {
795 fn drop(&mut self) {
796 let state = self.0.as_store_opaque().fiber_async_state_mut();
797
798 // Double-check that the current suspension isn't null (it
799 // should be what's in this closure). Note though that we
800 // can't check `current_future_cx` because it may either be
801 // here or not be here depending on whether this was
802 // cancelled or not.
803 debug_assert!(state.current_suspend.is_some());
804
805 state.current_suspend = None;
806 state.current_future_cx = None;
807 }
808 }
809 let reset = ResetCurrentPointersToNull(store_ref);
810
811 fun(reset.0)
812 })?;
813 Ok(StoreFiber {
814 state: Some(
815 FiberResumeState {
816 tls: crate::runtime::vm::AsyncWasmCallState::new(),
817 mpk: if track_pkey_context_switch {
818 Some(ProtectionMask::all())
819 } else {
820 None
821 },
822 stack_limit: usize::MAX,
823 executor,
824 }
825 .into(),
826 ),
827 engine,
828 id,
829 fiber: Some(RawFiber(fiber).into()),
830 })
831}
832
833/// Safe wrapper around [`make_fiber_unchecked`] which requires that `S` is
834/// `Send`.
835#[cfg(feature = "component-model-async")]
836pub(crate) fn make_fiber<'a, S>(
837 store: &mut S,
838 fun: impl FnOnce(&mut S) -> Result<()> + Send + Sync + 'a,
839) -> Result<StoreFiber<'a>>
840where
841 S: AsStoreOpaque + Send + ?Sized + 'a,
842{
843 unsafe { make_fiber_unchecked(store, fun) }
844}
845
846/// Run the specified function on a newly-created fiber and `.await` its
847/// completion.
848pub(crate) async fn on_fiber<S, R>(
849 store: &mut S,
850 func: impl FnOnce(&mut S) -> R + Send + Sync,
851) -> Result<R>
852where
853 S: AsStoreOpaque + ?Sized,
854 R: Send + Sync,
855{
856 let opaque = store.as_store_opaque();
857 let config = opaque.engine().config();
858 debug_assert!(opaque.async_support());
859 debug_assert!(config.async_stack_size > 0);
860
861 let mut result = None;
862
863 // SAFETY: the `StoreFiber` returned by `make_fiber_unchecked` is `Send`
864 // despite we not actually knowing here whether `S` is `Send` or not. That
865 // is safe here, however, because this function is already conditionally
866 // `Send` based on `S`. Additionally `fiber` doesn't escape this function,
867 // so the future-of-this-function is still correctly `Send`-vs-not.
868 let fiber = unsafe {
869 make_fiber_unchecked(store, |store| {
870 result = Some(func(store));
871 Ok(())
872 })?
873 };
874
875 {
876 let fiber = FiberFuture {
877 store: store.as_store_opaque(),
878 fiber: Some(fiber),
879 #[cfg(feature = "component-model-async")]
880 on_release: OnRelease::ReturnPending,
881 }
882 .await
883 .unwrap();
884
885 debug_assert!(fiber.is_none());
886 }
887
888 Ok(result.unwrap())
889}
890
891/// Run the specified fiber until it either suspends with
892/// `StoreFiberYield::ReleaseStore` or resolves.
893///
894/// This will return `Some` if the fiber suspends with
895/// `StoreFiberYield::ReleaseStore` or else `None` if it resolves.
896#[cfg(feature = "component-model-async")]
897pub(crate) async fn resolve_or_release<'a>(
898 store: &mut StoreOpaque,
899 fiber: StoreFiber<'a>,
900) -> Result<Option<StoreFiber<'a>>> {
901 FiberFuture {
902 store,
903 fiber: Some(fiber),
904 on_release: OnRelease::ReturnReady,
905 }
906 .await
907}
908
909/// Tells a `FiberFuture` what to do if `poll_fiber` returns
910/// `Err(StoreFiberYield::ReleaseStore)`.
911#[cfg(feature = "component-model-async")]
912enum OnRelease {
913 /// Return `Poll::Pending` from `FiberFuture::poll`
914 ReturnPending,
915 /// Return `Poll::Ready` from `FiberFuture::poll`, handing ownership of the
916 /// `StoreFiber` to the caller.
917 ReturnReady,
918}
919
920/// A `Future` implementation for running a `StoreFiber` to completion, giving
921/// it exclusive access to its store until it resolves.
922struct FiberFuture<'a, 'b> {
923 store: &'a mut StoreOpaque,
924 fiber: Option<StoreFiber<'b>>,
925 #[cfg(feature = "component-model-async")]
926 on_release: OnRelease,
927}
928
929impl<'b> Future for FiberFuture<'_, 'b> {
930 type Output = Result<Option<StoreFiber<'b>>>;
931
932 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
933 let me = self.get_mut();
934
935 // SAFETY: We need to carry over this `cx` into our fiber's runtime for
936 // when it tries to poll sub-futures that are created. Doing this must
937 // be done unsafely, however, since `cx` is only alive for this one
938 // singular function call. Here we do a `transmute` to extend the
939 // lifetime of `Context` so it can be stored in our `Store`, and then we
940 // replace the current polling context with this one.
941 //
942 // The safety of this extension relies on never actually using
943 // `Context<'static>` with `'static` actually there, which should be
944 // satisfied by the users of this in the `BlockingContext` structure
945 // where the lifetime parameters there are always more constrained than
946 // they are here.
947 let cx: &mut Context<'static> = unsafe { change_context_lifetime(cx) };
948 let cx = NonNull::from(cx);
949
950 match resume_fiber(me.store, me.fiber.as_mut().unwrap(), Ok(cx)) {
951 Ok(Ok(())) => Poll::Ready(Ok(None)),
952 Ok(Err(e)) => Poll::Ready(Err(e)),
953 Err(StoreFiberYield::KeepStore) => Poll::Pending,
954 #[cfg(feature = "component-model-async")]
955 Err(StoreFiberYield::ReleaseStore) => match &me.on_release {
956 OnRelease::ReturnPending => Poll::Pending,
957 OnRelease::ReturnReady => Poll::Ready(Ok(me.fiber.take())),
958 },
959 }
960 }
961}
962
963impl Drop for FiberFuture<'_, '_> {
964 fn drop(&mut self) {
965 if let Some(fiber) = &mut self.fiber {
966 fiber.dispose(self.store);
967 }
968 }
969}
970
971/// Changes the lifetime `'l` in `Context<'l>` to something else.
972///
973/// # Safety
974///
975/// Not a safe operation. Requires external knowledge about how the pointer is
976/// being used to determine whether it's actually safe or not. See docs on
977/// callers of this function. The purpose of this is to scope the `transmute` to
978/// as small an operation as possible.
979unsafe fn change_context_lifetime<'a, 'b>(cx: &'a mut Context<'_>) -> &'a mut Context<'b> {
980 // SAFETY: See the function documentation, this is not safe in general.
981 unsafe { mem::transmute::<&mut Context<'_>, &mut Context<'b>>(cx) }
982}