pub type PullConsumer = Consumer<Config>;
Aliased Type§
struct PullConsumer { /* private fields */ }
Implementations
Source§impl Consumer<Config>
impl Consumer<Config>
Sourcepub async fn messages(&self) -> Result<Stream, StreamError>
pub async fn messages(&self) -> Result<Stream, StreamError>
Returns a stream of messages for Pull Consumer.
§Example
use futures::StreamExt;
use futures::TryStreamExt;
let client = async_nats::connect("localhost:4222").await?;
let jetstream = async_nats::jetstream::new(client);
let stream = jetstream
.get_or_create_stream(async_nats::jetstream::stream::Config {
name: "events".to_string(),
max_messages: 10_000,
..Default::default()
})
.await?;
jetstream.publish("events", "data".into()).await?;
let consumer = stream
.get_or_create_consumer(
"consumer",
async_nats::jetstream::consumer::pull::Config {
durable_name: Some("consumer".to_string()),
..Default::default()
},
)
.await?;
let mut messages = consumer.messages().await?.take(100);
while let Some(Ok(message)) = messages.next().await {
println!("got message {:?}", message);
message.ack().await?;
}
Ok(())
Sourcepub fn stream(&self) -> StreamBuilder<'_>
pub fn stream(&self) -> StreamBuilder<'_>
Enables customization of Stream by setting timeouts, heartbeats, maximum number of messages or bytes buffered.
§Examples
use async_nats::jetstream::consumer::PullConsumer;
use futures::StreamExt;
let client = async_nats::connect("localhost:4222").await?;
let jetstream = async_nats::jetstream::new(client);
let consumer: PullConsumer = jetstream
.get_stream("events")
.await?
.get_consumer("pull")
.await?;
let mut messages = consumer
.stream()
.max_messages_per_batch(100)
.max_bytes_per_batch(1024)
.messages()
.await?;
while let Some(message) = messages.next().await {
let message = message?;
println!("message: {:?}", message);
message.ack().await?;
}
Sourcepub fn fetch(&self) -> FetchBuilder<'_>
pub fn fetch(&self) -> FetchBuilder<'_>
Returns a batch of specified number of messages, or if there are less messages on the Stream than requested, returns all available messages.
§Example
use futures::StreamExt;
use futures::TryStreamExt;
let client = async_nats::connect("localhost:4222").await?;
let jetstream = async_nats::jetstream::new(client);
let stream = jetstream
.get_or_create_stream(async_nats::jetstream::stream::Config {
name: "events".to_string(),
max_messages: 10_000,
..Default::default()
})
.await?;
jetstream.publish("events", "data".into()).await?;
let consumer = stream
.get_or_create_consumer(
"consumer",
async_nats::jetstream::consumer::pull::Config {
durable_name: Some("consumer".to_string()),
..Default::default()
},
)
.await?;
for _ in 0..100 {
jetstream.publish("events", "data".into()).await?;
}
let mut messages = consumer.fetch().max_messages(200).messages().await?;
// will finish after 100 messages, as that is the number of messages available on the
// stream.
while let Some(Ok(message)) = messages.next().await {
println!("got message {:?}", message);
message.ack().await?;
}
Ok(())
Sourcepub fn batch(&self) -> BatchBuilder<'_>
pub fn batch(&self) -> BatchBuilder<'_>
Returns a batch of specified number of messages unless timeout happens first.
§Example
use futures::StreamExt;
use futures::TryStreamExt;
let client = async_nats::connect("localhost:4222").await?;
let jetstream = async_nats::jetstream::new(client);
let stream = jetstream
.get_or_create_stream(async_nats::jetstream::stream::Config {
name: "events".to_string(),
max_messages: 10_000,
..Default::default()
})
.await?;
jetstream.publish("events", "data".into()).await?;
let consumer = stream
.get_or_create_consumer(
"consumer",
async_nats::jetstream::consumer::pull::Config {
durable_name: Some("consumer".to_string()),
..Default::default()
},
)
.await?;
let mut messages = consumer.batch().max_messages(100).messages().await?;
while let Some(Ok(message)) = messages.next().await {
println!("got message {:?}", message);
message.ack().await?;
}
Ok(())
Sourcepub fn sequence(&self, batch: usize) -> Result<Sequence, BatchError>
pub fn sequence(&self, batch: usize) -> Result<Sequence, BatchError>
Returns a sequence of Batches allowing for iterating over batches, and then over messages in those batches.
§Example
use futures::StreamExt;
use futures::TryStreamExt;
let client = async_nats::connect("localhost:4222").await?;
let jetstream = async_nats::jetstream::new(client);
let stream = jetstream
.get_or_create_stream(async_nats::jetstream::stream::Config {
name: "events".to_string(),
max_messages: 10_000,
..Default::default()
})
.await?;
jetstream.publish("events", "data".into()).await?;
let consumer = stream
.get_or_create_consumer(
"consumer",
async_nats::jetstream::consumer::pull::Config {
durable_name: Some("consumer".to_string()),
..Default::default()
},
)
.await?;
let mut iter = consumer.sequence(50).unwrap().take(10);
while let Ok(Some(mut batch)) = iter.try_next().await {
while let Ok(Some(message)) = batch.try_next().await {
println!("message received: {:?}", message);
}
}
Ok(())
Source§impl<T: IntoConsumerConfig> Consumer<T>
impl<T: IntoConsumerConfig> Consumer<T>
Source§impl<T: IntoConsumerConfig> Consumer<T>
impl<T: IntoConsumerConfig> Consumer<T>
Sourcepub async fn info(&mut self) -> Result<&Info, RequestError>
pub async fn info(&mut self) -> Result<&Info, RequestError>
Retrieves info
about Consumer from the server, updates the cached info
inside
Consumer and returns it.
§Examples
use async_nats::jetstream::consumer::PullConsumer;
let client = async_nats::connect("localhost:4222").await?;
let jetstream = async_nats::jetstream::new(client);
let mut consumer: PullConsumer = jetstream
.get_stream("events")
.await?
.get_consumer("pull")
.await?;
let info = consumer.info().await?;
Sourcepub fn cached_info(&self) -> &Info
pub fn cached_info(&self) -> &Info
Returns cached Info for the Consumer. Cache is either from initial creation/retrieval of the Consumer or last call to Info.
§Examples
use async_nats::jetstream::consumer::PullConsumer;
let client = async_nats::connect("localhost:4222").await?;
let jetstream = async_nats::jetstream::new(client);
let consumer: PullConsumer = jetstream
.get_stream("events")
.await?
.get_consumer("pull")
.await?;
let info = consumer.cached_info();