wasmcloud_runtime/component/
keyvalue.rs

1use super::{new_store, Ctx, Handler, Instance, ReplacedInstanceTarget};
2
3use crate::capability::keyvalue::{atomics, batch, store};
4use crate::capability::wrpc;
5
6use anyhow::Context;
7use bytes::Bytes;
8use std::sync::Arc;
9use tracing::{debug, instrument, trace};
10use wasmtime::component::Resource;
11
12type Result<T, E = store::Error> = core::result::Result<T, E>;
13
/// Bindings generated by `wasmtime::component::bindgen!` for the `watcher` world.
pub mod keyvalue_watcher_bindings {
    wasmtime::component::bindgen!({
        world: "watcher",
        // Default flags applied to every generated import and export.
        imports: { default: async | trappable | tracing },
        exports: { default: async | trappable | tracing },
        with: {
            // Map the `wasi:keyvalue/store` interface onto the module this crate already
            // exposes at `crate::capability::keyvalue::store`.
            "wasi:keyvalue/store" : crate::capability::keyvalue::store,
        }
    });
}
24
25impl From<wrpc::wrpc::keyvalue::store::Error> for store::Error {
26    fn from(value: wrpc::wrpc::keyvalue::store::Error) -> Self {
27        match value {
28            wrpc::wrpc::keyvalue::store::Error::NoSuchStore => Self::NoSuchStore,
29            wrpc::wrpc::keyvalue::store::Error::AccessDenied => Self::AccessDenied,
30            wrpc::wrpc::keyvalue::store::Error::Other(other) => Self::Other(other),
31        }
32    }
33}
34
35impl<H> atomics::Host for Ctx<H>
36where
37    H: Handler,
38{
39    #[instrument(level = "debug", skip_all)]
40    async fn increment(
41        &mut self,
42        bucket: Resource<store::Bucket>,
43        key: String,
44        delta: u64,
45    ) -> anyhow::Result<Result<u64>> {
46        self.attach_parent_context();
47        let bucket = self.table.get(&bucket).context("failed to get bucket")?;
48        match wrpc::wrpc::keyvalue::atomics::increment(
49            &self.handler,
50            Some(ReplacedInstanceTarget::KeyvalueAtomics),
51            bucket,
52            &key,
53            delta,
54        )
55        .await?
56        {
57            Ok(n) => Ok(Ok(n)),
58            Err(err) => Ok(Err(err.into())),
59        }
60    }
61}
62
63impl<H> store::Host for Ctx<H>
64where
65    H: Handler,
66{
67    #[instrument]
68    async fn open(&mut self, name: String) -> anyhow::Result<Result<Resource<store::Bucket>>> {
69        self.attach_parent_context();
70        let bucket = self
71            .table
72            .push(Arc::from(name))
73            .context("failed to open bucket")?;
74        Ok(Ok(bucket))
75    }
76}
77
78impl<H> batch::Host for Ctx<H>
79where
80    H: Handler,
81{
82    #[instrument(skip_all, fields(num_keys = keys.len()))]
83    async fn get_many(
84        &mut self,
85        bucket: Resource<store::Bucket>,
86        keys: Vec<String>,
87    ) -> anyhow::Result<Result<Vec<Option<(String, Vec<u8>)>>>> {
88        self.attach_parent_context();
89        let bucket = self.table.get(&bucket).context("failed to get bucket")?;
90        // NOTE(thomastaylor312): I don't like allocating a new vec, but I need borrowed strings to
91        // have the right type
92        let keys = keys.iter().map(String::as_str).collect::<Vec<_>>();
93
94        match wrpc::wrpc::keyvalue::batch::get_many(
95            &self.handler,
96            Some(ReplacedInstanceTarget::KeyvalueBatch),
97            bucket,
98            &keys,
99        )
100        .await?
101        {
102            Ok(res) => Ok(Ok(res
103                .into_iter()
104                .map(|opt| opt.map(|(k, v)| (k, Vec::from(v))))
105                .collect())),
106            Err(err) => Err(err.into()),
107        }
108    }
109
110    #[instrument(skip_all, fields(num_entries = entries.len()))]
111    async fn set_many(
112        &mut self,
113        bucket: Resource<store::Bucket>,
114        entries: Vec<(String, Vec<u8>)>,
115    ) -> anyhow::Result<Result<()>> {
116        self.attach_parent_context();
117        let bucket = self.table.get(&bucket).context("failed to get bucket")?;
118        let entries = entries
119            .into_iter()
120            .map(|(k, v)| (k, Bytes::from(v)))
121            .collect::<Vec<_>>();
122        let massaged = entries
123            .iter()
124            .map(|(k, v)| (k.as_str(), v))
125            .collect::<Vec<_>>();
126        match wrpc::wrpc::keyvalue::batch::set_many(
127            &self.handler,
128            Some(ReplacedInstanceTarget::KeyvalueBatch),
129            bucket,
130            &massaged,
131        )
132        .await?
133        {
134            Ok(()) => Ok(Ok(())),
135            Err(err) => Err(err.into()),
136        }
137    }
138
139    #[instrument(skip_all, fields(num_keys = keys.len()))]
140    async fn delete_many(
141        &mut self,
142        bucket: Resource<store::Bucket>,
143        keys: Vec<String>,
144    ) -> anyhow::Result<Result<()>> {
145        self.attach_parent_context();
146        let bucket = self.table.get(&bucket).context("failed to get bucket")?;
147        let keys = keys.iter().map(String::as_str).collect::<Vec<_>>();
148        match wrpc::wrpc::keyvalue::batch::delete_many(
149            &self.handler,
150            Some(ReplacedInstanceTarget::KeyvalueBatch),
151            bucket,
152            &keys,
153        )
154        .await?
155        {
156            Ok(()) => Ok(Ok(())),
157            Err(err) => Err(err.into()),
158        }
159    }
160}
161
162impl<H> store::HostBucket for Ctx<H>
163where
164    H: Handler,
165{
166    #[instrument]
167    async fn get(
168        &mut self,
169        bucket: Resource<store::Bucket>,
170        key: String,
171    ) -> anyhow::Result<Result<Option<Vec<u8>>>> {
172        self.attach_parent_context();
173        let bucket = self.table.get(&bucket).context("failed to get bucket")?;
174        match wrpc::wrpc::keyvalue::store::get(
175            &self.handler,
176            Some(ReplacedInstanceTarget::KeyvalueStore),
177            bucket,
178            &key,
179        )
180        .await?
181        {
182            Ok(buf) => Ok(Ok(buf.map(Into::into))),
183            Err(err) => Ok(Err(err.into())),
184        }
185    }
186
187    #[instrument]
188    async fn set(
189        &mut self,
190        bucket: Resource<store::Bucket>,
191        key: String,
192        outgoing_value: Vec<u8>,
193    ) -> anyhow::Result<Result<()>> {
194        self.attach_parent_context();
195        let bucket = self.table.get(&bucket).context("failed to get bucket")?;
196        match wrpc::wrpc::keyvalue::store::set(
197            &self.handler,
198            Some(ReplacedInstanceTarget::KeyvalueStore),
199            bucket,
200            &key,
201            &Bytes::from(outgoing_value),
202        )
203        .await?
204        {
205            Ok(()) => Ok(Ok(())),
206            Err(err) => Err(err.into()),
207        }
208    }
209
210    #[instrument]
211    async fn delete(
212        &mut self,
213        bucket: Resource<store::Bucket>,
214        key: String,
215    ) -> anyhow::Result<Result<()>> {
216        self.attach_parent_context();
217        let bucket = self.table.get(&bucket).context("failed to get bucket")?;
218        match wrpc::wrpc::keyvalue::store::delete(
219            &self.handler,
220            Some(ReplacedInstanceTarget::KeyvalueStore),
221            bucket,
222            &key,
223        )
224        .await?
225        {
226            Ok(()) => Ok(Ok(())),
227            Err(err) => Err(err.into()),
228        }
229    }
230
231    #[instrument]
232    async fn exists(
233        &mut self,
234        bucket: Resource<store::Bucket>,
235        key: String,
236    ) -> anyhow::Result<Result<bool>> {
237        self.attach_parent_context();
238        let bucket = self.table.get(&bucket).context("failed to get bucket")?;
239        match wrpc::wrpc::keyvalue::store::exists(
240            &self.handler,
241            Some(ReplacedInstanceTarget::KeyvalueStore),
242            bucket,
243            &key,
244        )
245        .await?
246        {
247            Ok(ok) => Ok(Ok(ok)),
248            Err(err) => Err(err.into()),
249        }
250    }
251
252    #[instrument]
253    async fn list_keys(
254        &mut self,
255        bucket: Resource<store::Bucket>,
256        cursor: Option<u64>,
257    ) -> anyhow::Result<Result<store::KeyResponse>> {
258        self.attach_parent_context();
259        let bucket = self.table.get(&bucket).context("failed to get bucket")?;
260        match wrpc::wrpc::keyvalue::store::list_keys(
261            &self.handler,
262            Some(ReplacedInstanceTarget::KeyvalueStore),
263            bucket,
264            cursor,
265        )
266        .await?
267        {
268            Ok(wrpc::wrpc::keyvalue::store::KeyResponse { keys, cursor }) => {
269                Ok(Ok(store::KeyResponse { keys, cursor }))
270            }
271            Err(err) => Err(err.into()),
272        }
273    }
274
275    #[instrument]
276    async fn drop(&mut self, bucket: Resource<store::Bucket>) -> anyhow::Result<()> {
277        self.attach_parent_context();
278        self.table
279            .delete(bucket)
280            .context("failed to delete bucket")?;
281        Ok(())
282    }
283}
284
285impl<H, C> wrpc::exports::wrpc::keyvalue::watcher::Handler<C> for Instance<H, C>
286where
287    H: Handler,
288    C: Send,
289{
290    #[instrument(level = "info", skip_all)]
291    async fn on_set(
292        &self,
293        _cx: C,
294        bucket: String,
295        key: String,
296        value: bytes::Bytes,
297    ) -> anyhow::Result<(), anyhow::Error> {
298        let mut store = new_store(&self.engine, self.handler.clone(), self.max_execution_time);
299        let pre = keyvalue_watcher_bindings::WatcherPre::new(self.pre.clone())
300            .context("failed to pre-instantiate `wasi:keyvalue/watcher`")?;
301        trace!("instantiating `wasi:keyvalue/watcher`");
302        let bindings = pre
303            .instantiate_async(&mut store)
304            .await
305            .context("failed to instantiate `wasi:keyvalue/watcher.on_set`")?;
306        let bucket_repr: u32 = bucket.parse().context("failed to parse bucket as u32")?;
307        let new_bucket = Resource::new_own(bucket_repr);
308        debug!("invoking `wasi:keyvalue/watcher.on_set`");
309        bindings
310            .wasi_keyvalue_watcher()
311            .call_on_set(&mut store, new_bucket, &key, &value)
312            .await
313            .context("failed to call `wasi:keyvalue/watcher.on_set`")?;
314        Ok(())
315    }
316
317    #[instrument(level = "info", skip_all)]
318    async fn on_delete(
319        &self,
320        _cx: C,
321        bucket: String,
322        key: String,
323    ) -> anyhow::Result<(), anyhow::Error> {
324        let mut store = new_store(&self.engine, self.handler.clone(), self.max_execution_time);
325        let pre = keyvalue_watcher_bindings::WatcherPre::new(self.pre.clone())
326            .context("failed to pre-instantiate `wasi:keyvalue/watcher`")?;
327        trace!("instantiating `wasi:keyvalue/watcher`");
328        let bindings = pre
329            .instantiate_async(&mut store)
330            .await
331            .context("failed to instantiate `wasi:keyvalue/watcher.on_delete`")?;
332        let bucket_repr: u32 = bucket.parse().context("failed to parse bucket as u32")?;
333        let new_bucket = Resource::new_own(bucket_repr);
334        debug!("invoking `wasi:keyvalue/watcher.on_delete`");
335        bindings
336            .wasi_keyvalue_watcher()
337            .call_on_delete(&mut store, new_bucket, &key)
338            .await
339            .context("failed to call `wasi:keyvalue/watcher.on_delete`")?;
340        Ok(())
341    }
342}