async_nats::jetstream::consumer

Type Alias PullConsumer

Source
pub type PullConsumer = Consumer<Config>;

Aliased Type§

struct PullConsumer { /* private fields */ }

Implementations

Source§

impl Consumer<Config>

Source

pub async fn messages(&self) -> Result<Stream, StreamError>

Returns a stream of messages for Pull Consumer.

§Example
use futures::StreamExt;
use futures::TryStreamExt;

let client = async_nats::connect("localhost:4222").await?;
let jetstream = async_nats::jetstream::new(client);

let stream = jetstream
    .get_or_create_stream(async_nats::jetstream::stream::Config {
        name: "events".to_string(),
        max_messages: 10_000,
        ..Default::default()
    })
    .await?;

jetstream.publish("events", "data".into()).await?;

let consumer = stream
    .get_or_create_consumer(
        "consumer",
        async_nats::jetstream::consumer::pull::Config {
            durable_name: Some("consumer".to_string()),
            ..Default::default()
        },
    )
    .await?;

let mut messages = consumer.messages().await?.take(100);
while let Some(Ok(message)) = messages.next().await {
    println!("got message {:?}", message);
    message.ack().await?;
}
Ok(())
Source

pub fn stream(&self) -> StreamBuilder<'_>

Enables customization of Stream by setting timeouts, heartbeats, maximum number of messages or bytes buffered.

§Examples
use async_nats::jetstream::consumer::PullConsumer;
use futures::StreamExt;
let client = async_nats::connect("localhost:4222").await?;
let jetstream = async_nats::jetstream::new(client);

let consumer: PullConsumer = jetstream
    .get_stream("events")
    .await?
    .get_consumer("pull")
    .await?;

let mut messages = consumer
    .stream()
    .max_messages_per_batch(100)
    .max_bytes_per_batch(1024)
    .messages()
    .await?;

while let Some(message) = messages.next().await {
    let message = message?;
    println!("message: {:?}", message);
    message.ack().await?;
}
Source

pub fn fetch(&self) -> FetchBuilder<'_>

Returns a batch of specified number of messages, or if there are less messages on the Stream than requested, returns all available messages.

§Example
use futures::StreamExt;
use futures::TryStreamExt;

let client = async_nats::connect("localhost:4222").await?;
let jetstream = async_nats::jetstream::new(client);

let stream = jetstream
    .get_or_create_stream(async_nats::jetstream::stream::Config {
        name: "events".to_string(),
        max_messages: 10_000,
        ..Default::default()
    })
    .await?;

jetstream.publish("events", "data".into()).await?;

let consumer = stream
    .get_or_create_consumer(
        "consumer",
        async_nats::jetstream::consumer::pull::Config {
            durable_name: Some("consumer".to_string()),
            ..Default::default()
        },
    )
    .await?;

for _ in 0..100 {
    jetstream.publish("events", "data".into()).await?;
}

let mut messages = consumer.fetch().max_messages(200).messages().await?;
// will finish after 100 messages, as that is the number of messages available on the
// stream.
while let Some(Ok(message)) = messages.next().await {
    println!("got message {:?}", message);
    message.ack().await?;
}
Ok(())
Source

pub fn batch(&self) -> BatchBuilder<'_>

Returns a batch of specified number of messages unless timeout happens first.

§Example
use futures::StreamExt;
use futures::TryStreamExt;

let client = async_nats::connect("localhost:4222").await?;
let jetstream = async_nats::jetstream::new(client);

let stream = jetstream
    .get_or_create_stream(async_nats::jetstream::stream::Config {
        name: "events".to_string(),
        max_messages: 10_000,
        ..Default::default()
    })
    .await?;

jetstream.publish("events", "data".into()).await?;

let consumer = stream
    .get_or_create_consumer(
        "consumer",
        async_nats::jetstream::consumer::pull::Config {
            durable_name: Some("consumer".to_string()),
            ..Default::default()
        },
    )
    .await?;

let mut messages = consumer.batch().max_messages(100).messages().await?;
while let Some(Ok(message)) = messages.next().await {
    println!("got message {:?}", message);
    message.ack().await?;
}
Ok(())
Source

pub fn sequence(&self, batch: usize) -> Result<Sequence, BatchError>

Returns a sequence of Batches allowing for iterating over batches, and then over messages in those batches.

§Example
use futures::StreamExt;
use futures::TryStreamExt;

let client = async_nats::connect("localhost:4222").await?;
let jetstream = async_nats::jetstream::new(client);

let stream = jetstream
    .get_or_create_stream(async_nats::jetstream::stream::Config {
        name: "events".to_string(),
        max_messages: 10_000,
        ..Default::default()
    })
    .await?;

jetstream.publish("events", "data".into()).await?;

let consumer = stream
    .get_or_create_consumer(
        "consumer",
        async_nats::jetstream::consumer::pull::Config {
            durable_name: Some("consumer".to_string()),
            ..Default::default()
        },
    )
    .await?;

let mut iter = consumer.sequence(50).unwrap().take(10);
while let Ok(Some(mut batch)) = iter.try_next().await {
    while let Ok(Some(message)) = batch.try_next().await {
        println!("message received: {:?}", message);
    }
}
Ok(())
Source§

impl<T: IntoConsumerConfig> Consumer<T>

Source

pub fn new(config: T, info: Info, context: Context) -> Self

Source§

impl<T: IntoConsumerConfig> Consumer<T>

Source

pub async fn info(&mut self) -> Result<&Info, RequestError>

Retrieves info about Consumer from the server, updates the cached info inside Consumer and returns it.

§Examples
use async_nats::jetstream::consumer::PullConsumer;
let client = async_nats::connect("localhost:4222").await?;
let jetstream = async_nats::jetstream::new(client);

let mut consumer: PullConsumer = jetstream
    .get_stream("events")
    .await?
    .get_consumer("pull")
    .await?;

let info = consumer.info().await?;
Source

pub fn cached_info(&self) -> &Info

Returns cached Info for the Consumer. Cache is either from initial creation/retrieval of the Consumer or last call to Info.

§Examples
use async_nats::jetstream::consumer::PullConsumer;
let client = async_nats::connect("localhost:4222").await?;
let jetstream = async_nats::jetstream::new(client);

let consumer: PullConsumer = jetstream
    .get_stream("events")
    .await?
    .get_consumer("pull")
    .await?;

let info = consumer.cached_info();

Trait Implementations

Source§

impl<T: Clone + IntoConsumerConfig> Clone for Consumer<T>

Source§

fn clone(&self) -> Consumer<T>

Returns a copy of the value. Read more
1.0.0 · Source§

fn clone_from(&mut self, source: &Self)

Performs copy-assignment from source. Read more
Source§

impl<T: Debug + IntoConsumerConfig> Debug for Consumer<T>

Source§

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter. Read more