pub type OrderedPullConsumer = Consumer<OrderedConfig>;
Aliased Type§
struct OrderedPullConsumer { /* private fields */ }
Implementations
Source§impl Consumer<OrderedConfig>
impl Consumer<OrderedConfig>
Sourcepub async fn messages(self) -> Result<Ordered, StreamError>
pub async fn messages(self) -> Result<Ordered, StreamError>
Returns a stream of messages for the Ordered Pull Consumer.
An ordered consumer uses a single-replica ephemeral consumer, no matter the replication factor of the Stream. It does not use acks; instead, it tracks sequences and recreates itself whenever it sees a mismatch.
§Example
use futures::StreamExt;
use futures::TryStreamExt;
let client = async_nats::connect("localhost:4222").await?;
let jetstream = async_nats::jetstream::new(client);
let stream = jetstream
.get_or_create_stream(async_nats::jetstream::stream::Config {
name: "events".to_string(),
max_messages: 10_000,
..Default::default()
})
.await?;
jetstream.publish("events", "data".into()).await?;
let consumer = stream
.get_or_create_consumer(
"consumer",
async_nats::jetstream::consumer::pull::OrderedConfig {
name: Some("consumer".to_string()),
..Default::default()
},
)
.await?;
let mut messages = consumer.messages().await?.take(100);
while let Some(Ok(message)) = messages.next().await {
println!("got message {:?}", message);
}
Ok(())
Source§impl<T: IntoConsumerConfig> Consumer<T>
impl<T: IntoConsumerConfig> Consumer<T>
Source§impl<T: IntoConsumerConfig> Consumer<T>
impl<T: IntoConsumerConfig> Consumer<T>
Sourcepub async fn info(&mut self) -> Result<&Info, RequestError>
pub async fn info(&mut self) -> Result<&Info, RequestError>
Retrieves info about the Consumer from the server, updates the cached info inside the Consumer, and returns it.
§Examples
use async_nats::jetstream::consumer::PullConsumer;
let client = async_nats::connect("localhost:4222").await?;
let jetstream = async_nats::jetstream::new(client);
let mut consumer: PullConsumer = jetstream
.get_stream("events")
.await?
.get_consumer("pull")
.await?;
let info = consumer.info().await?;
Sourcepub fn cached_info(&self) -> &Info
pub fn cached_info(&self) -> &Info
Returns the cached Info for the Consumer. The cache comes either from the initial creation/retrieval of the Consumer or from the last call to the info method.
§Examples
use async_nats::jetstream::consumer::PullConsumer;
let client = async_nats::connect("localhost:4222").await?;
let jetstream = async_nats::jetstream::new(client);
let consumer: PullConsumer = jetstream
.get_stream("events")
.await?
.get_consumer("pull")
.await?;
let info = consumer.cached_info();