async_nats::jetstream::consumer

Type Alias OrderedPullConsumer

Source
pub type OrderedPullConsumer = Consumer<OrderedConfig>;

Aliased Type§

struct OrderedPullConsumer { /* private fields */ }

Implementations

Source§

impl Consumer<OrderedConfig>

Source

pub async fn messages(self) -> Result<Ordered, StreamError>

Returns a stream of messages for Ordered Pull Consumer.

Ordered consumers use a single-replica ephemeral consumer, no matter the replication factor of the Stream. An ordered consumer does not use acks; instead, it tracks sequences and recreates itself whenever it sees a mismatch.

§Example
use futures::StreamExt;
use futures::TryStreamExt;

let client = async_nats::connect("localhost:4222").await?;
let jetstream = async_nats::jetstream::new(client);

let stream = jetstream
    .get_or_create_stream(async_nats::jetstream::stream::Config {
        name: "events".to_string(),
        max_messages: 10_000,
        ..Default::default()
    })
    .await?;

jetstream.publish("events", "data".into()).await?;

let consumer = stream
    .get_or_create_consumer(
        "consumer",
        async_nats::jetstream::consumer::pull::OrderedConfig {
            name: Some("consumer".to_string()),
            ..Default::default()
        },
    )
    .await?;

let mut messages = consumer.messages().await?.take(100);
while let Some(Ok(message)) = messages.next().await {
    println!("got message {:?}", message);
}
Ok(())
Source§

impl<T: IntoConsumerConfig> Consumer<T>

Source

pub fn new(config: T, info: Info, context: Context) -> Self

Source§

impl<T: IntoConsumerConfig> Consumer<T>

Source

pub async fn info(&mut self) -> Result<&Info, RequestError>

Retrieves info about Consumer from the server, updates the cached info inside Consumer and returns it.

§Examples
use async_nats::jetstream::consumer::PullConsumer;
let client = async_nats::connect("localhost:4222").await?;
let jetstream = async_nats::jetstream::new(client);

let mut consumer: PullConsumer = jetstream
    .get_stream("events")
    .await?
    .get_consumer("pull")
    .await?;

let info = consumer.info().await?;
Source

pub fn cached_info(&self) -> &Info

Returns the cached Info for the Consumer. The cache comes either from the initial creation/retrieval of the Consumer or from the last call to info.

§Examples
use async_nats::jetstream::consumer::PullConsumer;
let client = async_nats::connect("localhost:4222").await?;
let jetstream = async_nats::jetstream::new(client);

let consumer: PullConsumer = jetstream
    .get_stream("events")
    .await?
    .get_consumer("pull")
    .await?;

let info = consumer.cached_info();

Trait Implementations

Source§

impl<T: Clone + IntoConsumerConfig> Clone for Consumer<T>

Source§

fn clone(&self) -> Consumer<T>

Returns a copy of the value. Read more
1.0.0 · Source§

fn clone_from(&mut self, source: &Self)

Performs copy-assignment from source. Read more
Source§

impl<T: Debug + IntoConsumerConfig> Debug for Consumer<T>

Source§

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter. Read more