Struct async_nats::jetstream::kv::Store
source · pub struct Store {
pub name: String,
pub stream_name: String,
pub prefix: String,
pub put_prefix: Option<String>,
pub use_jetstream_prefix: bool,
pub stream: Stream,
}
Expand description
A struct used as a handle for the bucket.
Fields§
§name: String
The name of the Store.
stream_name: String
The name of the stream associated with the Store.
prefix: String
The prefix for keys in the Store.
put_prefix: Option<String>
The optional prefix to use when putting new key-value pairs.
use_jetstream_prefix: bool
Indicates whether to use the JetStream prefix.
stream: Stream
The stream associated with the Store.
Implementations§
source§impl Store
impl Store
sourcepub async fn status(&self) -> Result<Status, StatusError>
pub async fn status(&self) -> Result<Status, StatusError>
Queries the server and returns status from the server.
§Examples
let client = async_nats::connect("demo.nats.io:4222").await?;
let jetstream = async_nats::jetstream::new(client);
let kv = jetstream
.create_key_value(async_nats::jetstream::kv::Config {
bucket: "kv".to_string(),
history: 10,
..Default::default()
})
.await?;
let status = kv.status().await?;
println!("status: {:?}", status);
sourcepub async fn create<T: AsRef<str>>(
&self,
key: T,
value: Bytes,
) -> Result<u64, CreateError>
pub async fn create<T: AsRef<str>>( &self, key: T, value: Bytes, ) -> Result<u64, CreateError>
Create will add the key/value pair if it does not exist. If it does exist, it will return an error.
§Examples
let client = async_nats::connect("demo.nats.io:4222").await?;
let jetstream = async_nats::jetstream::new(client);
let kv = jetstream
.create_key_value(async_nats::jetstream::kv::Config {
bucket: "kv".to_string(),
history: 10,
..Default::default()
})
.await?;
let status = kv.create("key", "value".into()).await;
assert!(status.is_ok());
let status = kv.create("key", "value".into()).await;
assert!(status.is_err());
sourcepub async fn put<T: AsRef<str>>(
&self,
key: T,
value: Bytes,
) -> Result<u64, PutError>
pub async fn put<T: AsRef<str>>( &self, key: T, value: Bytes, ) -> Result<u64, PutError>
Puts new key value pair into the bucket. If key didn’t exist, it is created. If it did exist, a new value with a new version is added.
§Examples
let client = async_nats::connect("demo.nats.io:4222").await?;
let jetstream = async_nats::jetstream::new(client);
let kv = jetstream
.create_key_value(async_nats::jetstream::kv::Config {
bucket: "kv".to_string(),
history: 10,
..Default::default()
})
.await?;
let status = kv.put("key", "value".into()).await?;
sourcepub async fn entry<T: Into<String>>(
&self,
key: T,
) -> Result<Option<Entry>, EntryError>
pub async fn entry<T: Into<String>>( &self, key: T, ) -> Result<Option<Entry>, EntryError>
Retrieves the last Entry for a given key from a bucket.
§Examples
let client = async_nats::connect("demo.nats.io:4222").await?;
let jetstream = async_nats::jetstream::new(client);
let kv = jetstream
.create_key_value(async_nats::jetstream::kv::Config {
bucket: "kv".to_string(),
history: 10,
..Default::default()
})
.await?;
let status = kv.put("key", "value".into()).await?;
let entry = kv.entry("key").await?;
println!("entry: {:?}", entry);
sourcepub async fn entry_for_revision<T: Into<String>>(
&self,
key: T,
revision: u64,
) -> Result<Option<Entry>, EntryError>
pub async fn entry_for_revision<T: Into<String>>( &self, key: T, revision: u64, ) -> Result<Option<Entry>, EntryError>
Retrieves the Entry for a given key revision from a bucket.
§Examples
let client = async_nats::connect("demo.nats.io:4222").await?;
let jetstream = async_nats::jetstream::new(client);
let kv = jetstream
.create_key_value(async_nats::jetstream::kv::Config {
bucket: "kv".to_string(),
history: 10,
..Default::default()
})
.await?;
let status = kv.put("key", "value".into()).await?;
let status = kv.put("key", "value2".into()).await?;
let entry = kv.entry_for_revision("key", 2).await?;
println!("entry: {:?}", entry);
sourcepub async fn watch<T: AsRef<str>>(&self, key: T) -> Result<Watch, WatchError>
pub async fn watch<T: AsRef<str>>(&self, key: T) -> Result<Watch, WatchError>
Creates a futures::Stream over Entries a given key in the bucket, which yields values whenever there are changes for that key.
§Examples
use futures::StreamExt;
let client = async_nats::connect("demo.nats.io:4222").await?;
let jetstream = async_nats::jetstream::new(client);
let kv = jetstream
.create_key_value(async_nats::jetstream::kv::Config {
bucket: "kv".to_string(),
history: 10,
..Default::default()
})
.await?;
let mut entries = kv.watch("kv").await?;
while let Some(entry) = entries.next().await {
println!("entry: {:?}", entry);
}
sourcepub async fn watch_from_revision<T: AsRef<str>>(
&self,
key: T,
revision: u64,
) -> Result<Watch, WatchError>
pub async fn watch_from_revision<T: AsRef<str>>( &self, key: T, revision: u64, ) -> Result<Watch, WatchError>
Creates a futures::Stream over Entries a given key in the bucket, starting from provided revision. This is useful to resume watching over big KV buckets without a need to replay all the history.
§Examples
use futures::StreamExt;
let client = async_nats::connect("demo.nats.io:4222").await?;
let jetstream = async_nats::jetstream::new(client);
let kv = jetstream
.create_key_value(async_nats::jetstream::kv::Config {
bucket: "kv".to_string(),
history: 10,
..Default::default()
})
.await?;
let mut entries = kv.watch_from_revision("kv", 5).await?;
while let Some(entry) = entries.next().await {
println!("entry: {:?}", entry);
}
sourcepub async fn watch_with_history<T: AsRef<str>>(
&self,
key: T,
) -> Result<Watch, WatchError>
pub async fn watch_with_history<T: AsRef<str>>( &self, key: T, ) -> Result<Watch, WatchError>
Creates a futures::Stream over Entries a given key in the bucket, which yields values whenever there are changes for that key with as well as last value.
§Examples
use futures::StreamExt;
let client = async_nats::connect("demo.nats.io:4222").await?;
let jetstream = async_nats::jetstream::new(client);
let kv = jetstream
.create_key_value(async_nats::jetstream::kv::Config {
bucket: "kv".to_string(),
history: 10,
..Default::default()
})
.await?;
let mut entries = kv.watch_with_history("kv").await?;
while let Some(entry) = entries.next().await {
println!("entry: {:?}", entry);
}
sourcepub async fn watch_all(&self) -> Result<Watch, WatchError>
pub async fn watch_all(&self) -> Result<Watch, WatchError>
Creates a futures::Stream over Entries for all keys, which yields values whenever there are changes in the bucket.
§Examples
use futures::StreamExt;
let client = async_nats::connect("demo.nats.io:4222").await?;
let jetstream = async_nats::jetstream::new(client);
let kv = jetstream
.create_key_value(async_nats::jetstream::kv::Config {
bucket: "kv".to_string(),
history: 10,
..Default::default()
})
.await?;
let mut entries = kv.watch_all().await?;
while let Some(entry) = entries.next().await {
println!("entry: {:?}", entry);
}
sourcepub async fn watch_all_from_revision(
&self,
revision: u64,
) -> Result<Watch, WatchError>
pub async fn watch_all_from_revision( &self, revision: u64, ) -> Result<Watch, WatchError>
Creates a futures::Stream over Entries for all keys starting from a provider revision. This can be useful when resuming watching over a big bucket without the need to replay all the history.
§Examples
use futures::StreamExt;
let client = async_nats::connect("demo.nats.io:4222").await?;
let jetstream = async_nats::jetstream::new(client);
let kv = jetstream
.create_key_value(async_nats::jetstream::kv::Config {
bucket: "kv".to_string(),
history: 10,
..Default::default()
})
.await?;
let mut entries = kv.watch_all_from_revision(40).await?;
while let Some(entry) = entries.next().await {
println!("entry: {:?}", entry);
}
sourcepub async fn get<T: Into<String>>(
&self,
key: T,
) -> Result<Option<Bytes>, EntryError>
pub async fn get<T: Into<String>>( &self, key: T, ) -> Result<Option<Bytes>, EntryError>
Retrieves the Entry for a given key from a bucket.
§Examples
let client = async_nats::connect("demo.nats.io:4222").await?;
let jetstream = async_nats::jetstream::new(client);
let kv = jetstream
.create_key_value(async_nats::jetstream::kv::Config {
bucket: "kv".to_string(),
history: 10,
..Default::default()
})
.await?;
let value = kv.get("key").await?;
match value {
Some(bytes) => {
let value_str = std::str::from_utf8(&bytes)?;
println!("Value: {}", value_str);
}
None => {
println!("Key not found or value not set");
}
}
sourcepub async fn update<T: AsRef<str>>(
&self,
key: T,
value: Bytes,
revision: u64,
) -> Result<u64, UpdateError>
pub async fn update<T: AsRef<str>>( &self, key: T, value: Bytes, revision: u64, ) -> Result<u64, UpdateError>
Updates a value for a given key, but only if passed revision
is the last revision
in
the bucket.
§Examples
use futures::StreamExt;
let client = async_nats::connect("demo.nats.io:4222").await?;
let jetstream = async_nats::jetstream::new(client);
let kv = jetstream
.create_key_value(async_nats::jetstream::kv::Config {
bucket: "kv".to_string(),
history: 10,
..Default::default()
})
.await?;
let revision = kv.put("key", "value".into()).await?;
kv.update("key", "updated".into(), revision).await?;
sourcepub async fn delete<T: AsRef<str>>(&self, key: T) -> Result<(), DeleteError>
pub async fn delete<T: AsRef<str>>(&self, key: T) -> Result<(), DeleteError>
Deletes a given key. This is a non-destructive operation, which sets a DELETE
marker.
§Examples
use futures::StreamExt;
let client = async_nats::connect("demo.nats.io:4222").await?;
let jetstream = async_nats::jetstream::new(client);
let kv = jetstream
.create_key_value(async_nats::jetstream::kv::Config {
bucket: "kv".to_string(),
history: 10,
..Default::default()
})
.await?;
kv.put("key", "value".into()).await?;
kv.delete("key").await?;
sourcepub async fn delete_expect_revision<T: AsRef<str>>(
&self,
key: T,
revison: Option<u64>,
) -> Result<(), DeleteError>
pub async fn delete_expect_revision<T: AsRef<str>>( &self, key: T, revison: Option<u64>, ) -> Result<(), DeleteError>
Deletes a given key if the revision matches. This is a non-destructive operation, which
sets a DELETE
marker.
§Examples
use futures::StreamExt;
let client = async_nats::connect("demo.nats.io:4222").await?;
let jetstream = async_nats::jetstream::new(client);
let kv = jetstream
.create_key_value(async_nats::jetstream::kv::Config {
bucket: "kv".to_string(),
history: 10,
..Default::default()
})
.await?;
let revision = kv.put("key", "value".into()).await?;
kv.delete_expect_revision("key", Some(revision)).await?;
sourcepub async fn purge<T: AsRef<str>>(&self, key: T) -> Result<(), PurgeError>
pub async fn purge<T: AsRef<str>>(&self, key: T) -> Result<(), PurgeError>
Purges all the revisions of a entry destructively, leaving behind a single purge entry in-place.
§Examples
use futures::StreamExt;
let client = async_nats::connect("demo.nats.io:4222").await?;
let jetstream = async_nats::jetstream::new(client);
let kv = jetstream
.create_key_value(async_nats::jetstream::kv::Config {
bucket: "kv".to_string(),
history: 10,
..Default::default()
})
.await?;
kv.put("key", "value".into()).await?;
kv.put("key", "another".into()).await?;
kv.purge("key").await?;
sourcepub async fn purge_expect_revision<T: AsRef<str>>(
&self,
key: T,
revison: Option<u64>,
) -> Result<(), PurgeError>
pub async fn purge_expect_revision<T: AsRef<str>>( &self, key: T, revison: Option<u64>, ) -> Result<(), PurgeError>
Purges all the revisions of a entry destructively if the revision matches, leaving behind a single purge entry in-place.
§Examples
use futures::StreamExt;
let client = async_nats::connect("demo.nats.io:4222").await?;
let jetstream = async_nats::jetstream::new(client);
let kv = jetstream
.create_key_value(async_nats::jetstream::kv::Config {
bucket: "kv".to_string(),
history: 10,
..Default::default()
})
.await?;
kv.put("key", "value".into()).await?;
let revision = kv.put("key", "another".into()).await?;
kv.purge_expect_revision("key", Some(revision)).await?;
sourcepub async fn history<T: AsRef<str>>(
&self,
key: T,
) -> Result<History, HistoryError>
pub async fn history<T: AsRef<str>>( &self, key: T, ) -> Result<History, HistoryError>
Returns a futures::Stream that allows iterating over all Operations that happen for given key.
§Examples
use futures::StreamExt;
let client = async_nats::connect("demo.nats.io:4222").await?;
let jetstream = async_nats::jetstream::new(client);
let kv = jetstream
.create_key_value(async_nats::jetstream::kv::Config {
bucket: "kv".to_string(),
history: 10,
..Default::default()
})
.await?;
let mut entries = kv.history("kv").await?;
while let Some(entry) = entries.next().await {
println!("entry: {:?}", entry);
}
sourcepub async fn keys(&self) -> Result<Keys, HistoryError>
pub async fn keys(&self) -> Result<Keys, HistoryError>
Returns a futures::Stream that allows iterating over all keys in the bucket.
§Examples
Iterating over each each key individually
use futures::{StreamExt, TryStreamExt};
let client = async_nats::connect("demo.nats.io:4222").await?;
let jetstream = async_nats::jetstream::new(client);
let kv = jetstream
.create_key_value(async_nats::jetstream::kv::Config {
bucket: "kv".to_string(),
history: 10,
..Default::default()
})
.await?;
let mut keys = kv.keys().await?.boxed();
while let Some(key) = keys.try_next().await? {
println!("key: {:?}", key);
}
Collecting it into a vector of keys
use futures::TryStreamExt;
let client = async_nats::connect("demo.nats.io:4222").await?;
let jetstream = async_nats::jetstream::new(client);
let kv = jetstream
.create_key_value(async_nats::jetstream::kv::Config {
bucket: "kv".to_string(),
history: 10,
..Default::default()
})
.await?;
let keys = kv.keys().await?.try_collect::<Vec<String>>().await?;
println!("Keys: {:?}", keys);
Trait Implementations§
Auto Trait Implementations§
impl Freeze for Store
impl !RefUnwindSafe for Store
impl Send for Store
impl Sync for Store
impl Unpin for Store
impl !UnwindSafe for Store
Blanket Implementations§
source§impl<T> BorrowMut<T> for Twhere
T: ?Sized,
impl<T> BorrowMut<T> for Twhere
T: ?Sized,
source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
source§impl<T> CloneToUninit for Twhere
T: Clone,
impl<T> CloneToUninit for Twhere
T: Clone,
source§unsafe fn clone_to_uninit(&self, dst: *mut T)
unsafe fn clone_to_uninit(&self, dst: *mut T)
clone_to_uninit
)