wasmcloud_runtime/component/
keyvalue.rs1use super::{new_store, Ctx, Handler, Instance, ReplacedInstanceTarget};
2
3use crate::capability::keyvalue::{atomics, batch, store};
4use crate::capability::wrpc;
5
6use anyhow::Context;
7use bytes::Bytes;
8use std::sync::Arc;
9use tracing::{debug, instrument, trace};
10use wasmtime::component::Resource;
11
12type Result<T, E = store::Error> = core::result::Result<T, E>;
13
/// Host bindings generated by `wasmtime::component::bindgen!` for the `watcher` world.
///
/// The generated items (for example `WatcherPre`) are used by the watcher handler
/// in this file to instantiate a component and call its `on_set` / `on_delete` exports.
pub mod keyvalue_watcher_bindings {
    wasmtime::component::bindgen!({
        // Name of the WIT world to generate bindings for.
        world: "watcher",
        // Defaults applied to every imported function: async, trappable, traced.
        imports: { default: async | trappable | tracing },
        // Same defaults for every exported function.
        exports: { default: async | trappable | tracing },
        with: {
            // Point `wasi:keyvalue/store` at the existing capability module so the
            // watcher shares its types (e.g. `store::Bucket`) with the rest of the runtime.
            "wasi:keyvalue/store" : crate::capability::keyvalue::store,
        }
    });
}
24
25impl From<wrpc::wrpc::keyvalue::store::Error> for store::Error {
26 fn from(value: wrpc::wrpc::keyvalue::store::Error) -> Self {
27 match value {
28 wrpc::wrpc::keyvalue::store::Error::NoSuchStore => Self::NoSuchStore,
29 wrpc::wrpc::keyvalue::store::Error::AccessDenied => Self::AccessDenied,
30 wrpc::wrpc::keyvalue::store::Error::Other(other) => Self::Other(other),
31 }
32 }
33}
34
35impl<H> atomics::Host for Ctx<H>
36where
37 H: Handler,
38{
39 #[instrument(level = "debug", skip_all)]
40 async fn increment(
41 &mut self,
42 bucket: Resource<store::Bucket>,
43 key: String,
44 delta: u64,
45 ) -> anyhow::Result<Result<u64>> {
46 self.attach_parent_context();
47 let bucket = self.table.get(&bucket).context("failed to get bucket")?;
48 match wrpc::wrpc::keyvalue::atomics::increment(
49 &self.handler,
50 Some(ReplacedInstanceTarget::KeyvalueAtomics),
51 bucket,
52 &key,
53 delta,
54 )
55 .await?
56 {
57 Ok(n) => Ok(Ok(n)),
58 Err(err) => Ok(Err(err.into())),
59 }
60 }
61}
62
63impl<H> store::Host for Ctx<H>
64where
65 H: Handler,
66{
67 #[instrument]
68 async fn open(&mut self, name: String) -> anyhow::Result<Result<Resource<store::Bucket>>> {
69 self.attach_parent_context();
70 let bucket = self
71 .table
72 .push(Arc::from(name))
73 .context("failed to open bucket")?;
74 Ok(Ok(bucket))
75 }
76}
77
78impl<H> batch::Host for Ctx<H>
79where
80 H: Handler,
81{
82 #[instrument(skip_all, fields(num_keys = keys.len()))]
83 async fn get_many(
84 &mut self,
85 bucket: Resource<store::Bucket>,
86 keys: Vec<String>,
87 ) -> anyhow::Result<Result<Vec<Option<(String, Vec<u8>)>>>> {
88 self.attach_parent_context();
89 let bucket = self.table.get(&bucket).context("failed to get bucket")?;
90 let keys = keys.iter().map(String::as_str).collect::<Vec<_>>();
93
94 match wrpc::wrpc::keyvalue::batch::get_many(
95 &self.handler,
96 Some(ReplacedInstanceTarget::KeyvalueBatch),
97 bucket,
98 &keys,
99 )
100 .await?
101 {
102 Ok(res) => Ok(Ok(res
103 .into_iter()
104 .map(|opt| opt.map(|(k, v)| (k, Vec::from(v))))
105 .collect())),
106 Err(err) => Err(err.into()),
107 }
108 }
109
110 #[instrument(skip_all, fields(num_entries = entries.len()))]
111 async fn set_many(
112 &mut self,
113 bucket: Resource<store::Bucket>,
114 entries: Vec<(String, Vec<u8>)>,
115 ) -> anyhow::Result<Result<()>> {
116 self.attach_parent_context();
117 let bucket = self.table.get(&bucket).context("failed to get bucket")?;
118 let entries = entries
119 .into_iter()
120 .map(|(k, v)| (k, Bytes::from(v)))
121 .collect::<Vec<_>>();
122 let massaged = entries
123 .iter()
124 .map(|(k, v)| (k.as_str(), v))
125 .collect::<Vec<_>>();
126 match wrpc::wrpc::keyvalue::batch::set_many(
127 &self.handler,
128 Some(ReplacedInstanceTarget::KeyvalueBatch),
129 bucket,
130 &massaged,
131 )
132 .await?
133 {
134 Ok(()) => Ok(Ok(())),
135 Err(err) => Err(err.into()),
136 }
137 }
138
139 #[instrument(skip_all, fields(num_keys = keys.len()))]
140 async fn delete_many(
141 &mut self,
142 bucket: Resource<store::Bucket>,
143 keys: Vec<String>,
144 ) -> anyhow::Result<Result<()>> {
145 self.attach_parent_context();
146 let bucket = self.table.get(&bucket).context("failed to get bucket")?;
147 let keys = keys.iter().map(String::as_str).collect::<Vec<_>>();
148 match wrpc::wrpc::keyvalue::batch::delete_many(
149 &self.handler,
150 Some(ReplacedInstanceTarget::KeyvalueBatch),
151 bucket,
152 &keys,
153 )
154 .await?
155 {
156 Ok(()) => Ok(Ok(())),
157 Err(err) => Err(err.into()),
158 }
159 }
160}
161
162impl<H> store::HostBucket for Ctx<H>
163where
164 H: Handler,
165{
166 #[instrument]
167 async fn get(
168 &mut self,
169 bucket: Resource<store::Bucket>,
170 key: String,
171 ) -> anyhow::Result<Result<Option<Vec<u8>>>> {
172 self.attach_parent_context();
173 let bucket = self.table.get(&bucket).context("failed to get bucket")?;
174 match wrpc::wrpc::keyvalue::store::get(
175 &self.handler,
176 Some(ReplacedInstanceTarget::KeyvalueStore),
177 bucket,
178 &key,
179 )
180 .await?
181 {
182 Ok(buf) => Ok(Ok(buf.map(Into::into))),
183 Err(err) => Ok(Err(err.into())),
184 }
185 }
186
187 #[instrument]
188 async fn set(
189 &mut self,
190 bucket: Resource<store::Bucket>,
191 key: String,
192 outgoing_value: Vec<u8>,
193 ) -> anyhow::Result<Result<()>> {
194 self.attach_parent_context();
195 let bucket = self.table.get(&bucket).context("failed to get bucket")?;
196 match wrpc::wrpc::keyvalue::store::set(
197 &self.handler,
198 Some(ReplacedInstanceTarget::KeyvalueStore),
199 bucket,
200 &key,
201 &Bytes::from(outgoing_value),
202 )
203 .await?
204 {
205 Ok(()) => Ok(Ok(())),
206 Err(err) => Err(err.into()),
207 }
208 }
209
210 #[instrument]
211 async fn delete(
212 &mut self,
213 bucket: Resource<store::Bucket>,
214 key: String,
215 ) -> anyhow::Result<Result<()>> {
216 self.attach_parent_context();
217 let bucket = self.table.get(&bucket).context("failed to get bucket")?;
218 match wrpc::wrpc::keyvalue::store::delete(
219 &self.handler,
220 Some(ReplacedInstanceTarget::KeyvalueStore),
221 bucket,
222 &key,
223 )
224 .await?
225 {
226 Ok(()) => Ok(Ok(())),
227 Err(err) => Err(err.into()),
228 }
229 }
230
231 #[instrument]
232 async fn exists(
233 &mut self,
234 bucket: Resource<store::Bucket>,
235 key: String,
236 ) -> anyhow::Result<Result<bool>> {
237 self.attach_parent_context();
238 let bucket = self.table.get(&bucket).context("failed to get bucket")?;
239 match wrpc::wrpc::keyvalue::store::exists(
240 &self.handler,
241 Some(ReplacedInstanceTarget::KeyvalueStore),
242 bucket,
243 &key,
244 )
245 .await?
246 {
247 Ok(ok) => Ok(Ok(ok)),
248 Err(err) => Err(err.into()),
249 }
250 }
251
252 #[instrument]
253 async fn list_keys(
254 &mut self,
255 bucket: Resource<store::Bucket>,
256 cursor: Option<u64>,
257 ) -> anyhow::Result<Result<store::KeyResponse>> {
258 self.attach_parent_context();
259 let bucket = self.table.get(&bucket).context("failed to get bucket")?;
260 match wrpc::wrpc::keyvalue::store::list_keys(
261 &self.handler,
262 Some(ReplacedInstanceTarget::KeyvalueStore),
263 bucket,
264 cursor,
265 )
266 .await?
267 {
268 Ok(wrpc::wrpc::keyvalue::store::KeyResponse { keys, cursor }) => {
269 Ok(Ok(store::KeyResponse { keys, cursor }))
270 }
271 Err(err) => Err(err.into()),
272 }
273 }
274
275 #[instrument]
276 async fn drop(&mut self, bucket: Resource<store::Bucket>) -> anyhow::Result<()> {
277 self.attach_parent_context();
278 self.table
279 .delete(bucket)
280 .context("failed to delete bucket")?;
281 Ok(())
282 }
283}
284
285impl<H, C> wrpc::exports::wrpc::keyvalue::watcher::Handler<C> for Instance<H, C>
286where
287 H: Handler,
288 C: Send,
289{
290 #[instrument(level = "info", skip_all)]
291 async fn on_set(
292 &self,
293 _cx: C,
294 bucket: String,
295 key: String,
296 value: bytes::Bytes,
297 ) -> anyhow::Result<(), anyhow::Error> {
298 let mut store = new_store(&self.engine, self.handler.clone(), self.max_execution_time);
299 let pre = keyvalue_watcher_bindings::WatcherPre::new(self.pre.clone())
300 .context("failed to pre-instantiate `wasi:keyvalue/watcher`")?;
301 trace!("instantiating `wasi:keyvalue/watcher`");
302 let bindings = pre
303 .instantiate_async(&mut store)
304 .await
305 .context("failed to instantiate `wasi:keyvalue/watcher.on_set`")?;
306 let bucket_repr: u32 = bucket.parse().context("failed to parse bucket as u32")?;
307 let new_bucket = Resource::new_own(bucket_repr);
308 debug!("invoking `wasi:keyvalue/watcher.on_set`");
309 bindings
310 .wasi_keyvalue_watcher()
311 .call_on_set(&mut store, new_bucket, &key, &value)
312 .await
313 .context("failed to call `wasi:keyvalue/watcher.on_set`")?;
314 Ok(())
315 }
316
317 #[instrument(level = "info", skip_all)]
318 async fn on_delete(
319 &self,
320 _cx: C,
321 bucket: String,
322 key: String,
323 ) -> anyhow::Result<(), anyhow::Error> {
324 let mut store = new_store(&self.engine, self.handler.clone(), self.max_execution_time);
325 let pre = keyvalue_watcher_bindings::WatcherPre::new(self.pre.clone())
326 .context("failed to pre-instantiate `wasi:keyvalue/watcher`")?;
327 trace!("instantiating `wasi:keyvalue/watcher`");
328 let bindings = pre
329 .instantiate_async(&mut store)
330 .await
331 .context("failed to instantiate `wasi:keyvalue/watcher.on_delete`")?;
332 let bucket_repr: u32 = bucket.parse().context("failed to parse bucket as u32")?;
333 let new_bucket = Resource::new_own(bucket_repr);
334 debug!("invoking `wasi:keyvalue/watcher.on_delete`");
335 bindings
336 .wasi_keyvalue_watcher()
337 .call_on_delete(&mut store, new_bucket, &key)
338 .await
339 .context("failed to call `wasi:keyvalue/watcher.on_delete`")?;
340 Ok(())
341 }
342}