Struct async_nats::ConnectOptions
pub struct ConnectOptions { /* private fields */ }
Connect options. Used to connect with NATS when custom config is needed.
§Examples
let client = async_nats::ConnectOptions::new()
    .require_tls(true)
    .ping_interval(std::time::Duration::from_secs(10))
    .connect("demo.nats.io")
    .await?;
Implementations§
impl ConnectOptions
pub fn new() -> ConnectOptions
Creates a builder for customizing the NATS connection.
§Examples
let client = async_nats::ConnectOptions::new()
    .require_tls(true)
    .ping_interval(std::time::Duration::from_secs(10))
    .connect("demo.nats.io")
    .await?;
pub async fn connect<A: ToServerAddrs>(self, addrs: A) -> Result<Client, ConnectError>
Connect to the NATS Server leveraging all passed options.
§Examples
let nc = async_nats::ConnectOptions::new()
    .require_tls(true)
    .connect("demo.nats.io")
    .await?;
§Pass multiple URLs.
use async_nats::ServerAddr;
let client = async_nats::connect(vec![
    "demo.nats.io".parse::<ServerAddr>()?,
    "other.nats.io".parse::<ServerAddr>()?,
])
.await?;
pub fn with_auth_callback<F, Fut>(callback: F) -> Self
Creates a builder with a custom auth callback to be used when authenticating against the NATS Server. Requires an asynchronous function that accepts a nonce and returns Auth. It overrides all other auth methods.
§Example
async_nats::ConnectOptions::with_auth_callback(move |_| async move {
    let mut auth = async_nats::Auth::new();
    auth.username = Some("derek".to_string());
    auth.password = Some("s3cr3t".to_string());
    Ok(auth)
})
.connect("demo.nats.io")
.await?;
pub fn with_token(token: String) -> Self
Authenticate against NATS Server with the provided token.
§Examples
let nc = async_nats::ConnectOptions::with_token("t0k3n!".into())
    .connect("demo.nats.io")
    .await?;
pub fn token(self, token: String) -> Self
Use a builder to specify a token, to be used when authenticating against the NATS Server. This can be used as a way to mix authentication methods.
§Examples
let nc = async_nats::ConnectOptions::new()
    .token("t0k3n!".into())
    .connect("demo.nats.io")
    .await?;
pub fn with_user_and_password(user: String, pass: String) -> Self
Authenticate against NATS Server with the provided username and password.
§Examples
let nc = async_nats::ConnectOptions::with_user_and_password("derek".into(), "s3cr3t!".into())
    .connect("demo.nats.io")
    .await?;
pub fn user_and_password(self, user: String, pass: String) -> Self
Use a builder to specify a username and password, to be used when authenticating against the NATS Server. This can be used as a way to mix authentication methods.
§Examples
let nc = async_nats::ConnectOptions::new()
    .user_and_password("derek".into(), "s3cr3t!".into())
    .connect("demo.nats.io")
    .await?;
pub fn with_nkey(seed: String) -> Self
Authenticate with an NKey. Requires an NKey Seed secret.
§Example
let seed = "SUANQDPB2RUOE4ETUA26CNX7FUKE5ZZKFCQIIW63OX225F2CO7UEXTM7ZY";
let nc = async_nats::ConnectOptions::with_nkey(seed.into())
    .connect("localhost")
    .await?;
pub fn nkey(self, seed: String) -> Self
Use a builder to specify an NKey, to be used when authenticating against the NATS Server. Requires an NKey Seed Secret. This can be used as a way to mix authentication methods.
§Example
let seed = "SUANQDPB2RUOE4ETUA26CNX7FUKE5ZZKFCQIIW63OX225F2CO7UEXTM7ZY";
let nc = async_nats::ConnectOptions::new()
    .nkey(seed.into())
    .connect("localhost")
    .await?;
pub fn with_jwt<F, Fut>(jwt: String, sign_cb: F) -> Self
Authenticate with a JWT. Requires an asynchronous function that signs the server nonce.
§Example
let seed = "SUANQDPB2RUOE4ETUA26CNX7FUKE5ZZKFCQIIW63OX225F2CO7UEXTM7ZY";
let key_pair = std::sync::Arc::new(nkeys::KeyPair::from_seed(seed).unwrap());
// load jwt from creds file or other secure source
async fn load_jwt() -> std::io::Result<String> {
    todo!();
}
let jwt = load_jwt().await?;
let nc = async_nats::ConnectOptions::with_jwt(jwt, move |nonce| {
    let key_pair = key_pair.clone();
    async move { key_pair.sign(&nonce).map_err(async_nats::AuthError::new) }
})
.connect("localhost")
.await?;
pub fn jwt<F, Fut>(self, jwt: String, sign_cb: F) -> Self
Use a builder to specify a JWT, to be used when authenticating against the NATS Server. Requires an asynchronous function to sign the server nonce. This can be used as a way to mix authentication methods.
§Example
let seed = "SUANQDPB2RUOE4ETUA26CNX7FUKE5ZZKFCQIIW63OX225F2CO7UEXTM7ZY";
let key_pair = std::sync::Arc::new(nkeys::KeyPair::from_seed(seed).unwrap());
// load jwt from creds file or other secure source
async fn load_jwt() -> std::io::Result<String> {
    todo!();
}
let jwt = load_jwt().await?;
let nc = async_nats::ConnectOptions::new()
    .jwt(jwt, move |nonce| {
        let key_pair = key_pair.clone();
        async move { key_pair.sign(&nonce).map_err(async_nats::AuthError::new) }
    })
    .connect("localhost")
    .await?;
pub async fn with_credentials_file(path: impl AsRef<Path>) -> Result<Self>
Authenticate with NATS using a .creds file. Opens the provided file, loads its credentials, and uses them to authenticate.
§Example
let nc = async_nats::ConnectOptions::with_credentials_file("path/to/my.creds")
    .await?
    .connect("connect.ngs.global")
    .await?;
pub async fn credentials_file(self, path: impl AsRef<Path>) -> Result<Self>
Use a builder to specify a credentials file, to be used when authenticating against the NATS Server. This will open the credentials file and load its credentials. This can be used as a way to mix authentication methods.
§Example
let nc = async_nats::ConnectOptions::new()
    .credentials_file("path/to/my.creds")
    .await?
    .connect("connect.ngs.global")
    .await?;
pub fn with_credentials(creds: &str) -> Result<Self>
Authenticate with NATS using a credentials string in the creds file format.
§Example
let creds = "-----BEGIN NATS USER JWT-----
eyJ0eXAiOiJqd3QiLCJhbGciOiJlZDI1NTE5...
------END NATS USER JWT------
************************* IMPORTANT *************************
NKEY Seed printed below can be used to sign and prove identity.
NKEYs are sensitive and should be treated as secrets.
-----BEGIN USER NKEY SEED-----
SUAIO3FHUX5PNV2LQIIP7TZ3N4L7TX3W53MQGEIVYFIGA635OZCKEYHFLM
------END USER NKEY SEED------
";
let nc = async_nats::ConnectOptions::with_credentials(creds)
    .expect("failed to parse static creds")
    .connect("connect.ngs.global")
    .await?;
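To make the creds file format used above more concrete, here is a minimal sketch of how the JWT and the NKey seed could be pulled out of such a string using only the standard library. The helper name parse_creds is hypothetical and is not part of async_nats; the crate's own parser may be stricter or behave differently.

```rust
/// Hypothetical helper (not part of async_nats): returns `(jwt, seed)` from
/// text in the creds file format, or `None` if two sections are not found.
fn parse_creds(creds: &str) -> Option<(String, String)> {
    let mut sections = Vec::new();
    let mut lines = creds.lines().map(str::trim);
    while let Some(line) = lines.next() {
        // A "-----BEGIN ...-----" marker is followed by the section payload.
        if line.starts_with("-----BEGIN") {
            let payload = lines.by_ref().find(|l| !l.is_empty())?;
            // An END marker right after BEGIN means the section is empty.
            if payload.starts_with("-----") {
                return None;
            }
            sections.push(payload.to_string());
        }
    }
    // Assumption: the first section is the user JWT, the second the NKey seed.
    match sections.as_slice() {
        [jwt, seed] => Some((jwt.clone(), seed.clone())),
        _ => None,
    }
}
```

Treat the seed returned by a helper like this as a secret, exactly as the notice inside the creds text advises.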
pub fn credentials(self, creds: &str) -> Result<Self>
Use a builder to specify a credentials string, to be used when authenticating against the NATS Server. The string should be in the credentials file format. This can be used as a way to mix authentication methods.
§Example
let creds = "-----BEGIN NATS USER JWT-----
eyJ0eXAiOiJqd3QiLCJhbGciOiJlZDI1NTE5...
------END NATS USER JWT------
************************* IMPORTANT *************************
NKEY Seed printed below can be used to sign and prove identity.
NKEYs are sensitive and should be treated as secrets.
-----BEGIN USER NKEY SEED-----
SUAIO3FHUX5PNV2LQIIP7TZ3N4L7TX3W53MQGEIVYFIGA635OZCKEYHFLM
------END USER NKEY SEED------
";
let nc = async_nats::ConnectOptions::new()
    .credentials(creds)
    .expect("failed to parse static creds")
    .connect("connect.ngs.global")
    .await?;
pub fn add_root_certificates(self, path: PathBuf) -> ConnectOptions
Loads root certificates by providing the path to them.
§Examples
let nc = async_nats::ConnectOptions::new()
    .add_root_certificates("mycerts.pem".into())
    .connect("demo.nats.io")
    .await?;
pub fn add_client_certificate(self, cert: PathBuf, key: PathBuf) -> ConnectOptions
Loads a client certificate and its key by providing the paths to them.
§Examples
let nc = async_nats::ConnectOptions::new()
    .add_client_certificate("cert.pem".into(), "key.pem".into())
    .connect("demo.nats.io")
    .await?;
pub fn require_tls(self, is_required: bool) -> ConnectOptions
Enables or disables the TLS requirement. If require_tls(true) is set and a TLS connection cannot be established, connecting returns an error.
§Examples
let nc = async_nats::ConnectOptions::new()
    .require_tls(true)
    .connect("demo.nats.io")
    .await?;
pub fn tls_first(self) -> ConnectOptions
Changes how the TLS connection is established. If tls_first is set, the client will try to establish TLS before getting info from the server. This requires the server to enable the handshake_first option in its config.
pub fn ping_interval(self, ping_interval: Duration) -> ConnectOptions
Sets how often the Client sends PING messages to the server.
§Examples
async_nats::ConnectOptions::new()
    .ping_interval(std::time::Duration::from_secs(24))
    .connect("demo.nats.io")
    .await?;
pub fn no_echo(self) -> ConnectOptions
Sets the no_echo option, which disables delivery of messages that were published from the same connection.
§Examples
async_nats::ConnectOptions::new()
    .no_echo()
    .connect("demo.nats.io")
    .await?;
pub fn subscription_capacity(self, capacity: usize) -> ConnectOptions
Sets the capacity for Subscribers. Exceeding it will trigger the slow consumer error callback and drop messages. The default is a buffer of 65536 messages.
§Examples
async_nats::ConnectOptions::new()
    .subscription_capacity(1024)
    .connect("demo.nats.io")
    .await?;
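The drop-on-full behaviour described for subscription_capacity can be illustrated with a bounded channel from the standard library. This is only a sketch of the general idea: async_nats manages its subscriber buffers internally, and try_deliver and count_dropped are hypothetical names introduced for this example.

```rust
use std::sync::mpsc::{sync_channel, SyncSender, TrySendError};

/// Hypothetical helper: tries to enqueue a message without blocking and
/// reports whether it was accepted.
fn try_deliver(tx: &SyncSender<String>, msg: &str) -> bool {
    match tx.try_send(msg.to_string()) {
        Ok(()) => true,
        // The buffer is full, so the message is dropped (the slow consumer case).
        Err(TrySendError::Full(_)) => false,
        // The receiver is gone, so there is nobody left to deliver to.
        Err(TrySendError::Disconnected(_)) => false,
    }
}

/// Sends `count` messages into a buffer of `capacity` that nobody drains and
/// returns how many of them were dropped.
fn count_dropped(capacity: usize, count: usize) -> usize {
    // `_rx` stays alive for the whole function so the channel is not closed.
    let (tx, _rx) = sync_channel::<String>(capacity);
    (0..count)
        .filter(|i| !try_deliver(&tx, &format!("msg-{i}")))
        .count()
}
```

With a capacity of 2 and five messages sent to a subscriber that never reads, the last three are dropped. A larger capacity tolerates longer stalls at the cost of more buffered memory.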
pub fn connection_timeout(self, timeout: Duration) -> ConnectOptions
Sets a timeout for the underlying TcpStream connection to avoid hangs and deadlocks. Default is set to 5 seconds.
§Examples
async_nats::ConnectOptions::new()
    .connection_timeout(tokio::time::Duration::from_secs(5))
    .connect("demo.nats.io")
    .await?;
pub fn request_timeout(self, timeout: Option<Duration>) -> ConnectOptions
Sets a timeout for Client::request. Default value is set to 10 seconds.
§Examples
async_nats::ConnectOptions::new()
    .request_timeout(Some(std::time::Duration::from_secs(3)))
    .connect("demo.nats.io")
    .await?;
pub fn event_callback<F, Fut>(self, cb: F) -> ConnectOptions
Registers an asynchronous callback for connection events, such as disconnects, reconnects, and errors received over the wire from the server.
§Examples
Because the callback is a closure that returns a future rather than an async closure, the examples below show the patterns to use.
§Basic
If you don’t need to move anything into the closure, a simple signature can be used:
async_nats::ConnectOptions::new()
    .event_callback(|event| async move {
        println!("event occurred: {}", event);
    })
    .connect("demo.nats.io")
    .await?;
§Listening to specific event kind
async_nats::ConnectOptions::new()
    .event_callback(|event| async move {
        match event {
            async_nats::Event::Disconnected => println!("disconnected"),
            async_nats::Event::Connected => println!("reconnected"),
            async_nats::Event::ClientError(err) => println!("client error occurred: {}", err),
            other => println!("other event happened: {}", other),
        }
    })
    .connect("demo.nats.io")
    .await?;
§Advanced
If you need to move something into the closure, here’s an example of how to do that:
let (tx, mut _rx) = tokio::sync::mpsc::channel(1);
async_nats::ConnectOptions::new()
    .event_callback(move |event| {
        let tx = tx.clone();
        async move {
            tx.send(event).await.unwrap();
        }
    })
    .connect("demo.nats.io")
    .await?;
pub fn reconnect_delay_callback<F>(self, cb: F) -> ConnectOptions
Registers a callback for a custom reconnect delay handler that can be used to define a backoff duration strategy.
§Examples
async_nats::ConnectOptions::new()
    .reconnect_delay_callback(|attempts| {
        println!("no of attempts: {attempts}");
        std::time::Duration::from_millis(std::cmp::min((attempts * 100) as u64, 8000))
    })
    .connect("demo.nats.io")
    .await?;
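The example above grows the delay linearly with the attempt count. A capped exponential strategy is another common choice, and the following is a minimal sketch of one. The function name capped_exponential_backoff, the 100 ms base, and the 8 s cap are assumptions made for illustration, and it assumes the callback receives the attempt count as a usize.

```rust
use std::time::Duration;

/// Hypothetical helper: doubles the delay on each attempt, starting at
/// 100 ms and never exceeding 8 s.
fn capped_exponential_backoff(attempts: usize) -> Duration {
    const BASE_MS: u64 = 100;
    const MAX_MS: u64 = 8_000;
    // Clamp the exponent so the shift cannot overflow on large attempt counts.
    let exponent = attempts.min(16) as u32;
    let delay_ms = BASE_MS.saturating_mul(1u64 << exponent).min(MAX_MS);
    Duration::from_millis(delay_ms)
}
```

A function with this shape could be passed directly as `.reconnect_delay_callback(capped_exponential_backoff)`, or wrapped in a closure that adds jitter.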
pub fn client_capacity(self, capacity: usize) -> ConnectOptions
By default, the Client dispatches operations over an internal channel with a capacity of 128. This option overrides that capacity.
§Examples
async_nats::ConnectOptions::new()
    .client_capacity(256)
    .connect("demo.nats.io")
    .await?;
pub fn custom_inbox_prefix<T: ToString>(self, prefix: T) -> ConnectOptions
Sets a custom inbox prefix to use instead of the default _INBOX.
§Examples
async_nats::ConnectOptions::new()
    .custom_inbox_prefix("CUSTOM")
    .connect("demo.nats.io")
    .await?;
pub fn name<T: ToString>(self, name: T) -> ConnectOptions
Sets the name for the client.
§Examples
async_nats::ConnectOptions::new()
    .name("rust-service")
    .connect("demo.nats.io")
    .await?;
pub fn retry_on_initial_connect(self) -> ConnectOptions
By default, ConnectOptions::connect will return an error if the connection to the server cannot be established. Setting retry_on_initial_connect makes the client establish the connection in the background.
pub fn max_reconnects<T: Into<Option<usize>>>(self, max_reconnects: T) -> ConnectOptions
Specifies the number of consecutive reconnect attempts the client will make before giving up. This is useful for preventing zombie services from endlessly reaching the servers, but it can also be a footgun that surprises users who do not expect the client to give up entirely.
Pass None or 0 for no limit.
§Examples
async_nats::ConnectOptions::new()
    .max_reconnects(None)
    .connect("demo.nats.io")
    .await?;
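The `T: Into<Option<usize>>` bound is what lets callers pass either a plain number or an Option. The sketch below shows how such an argument could be normalized so that both None and 0 mean "no limit", as described above. The helper normalize_max_reconnects is hypothetical and only illustrates the bound; it is not how async_nats is necessarily implemented.

```rust
/// Hypothetical helper: accepts the same argument shapes as `max_reconnects`
/// and maps both `None` and `0` to `None`, meaning "no limit".
fn normalize_max_reconnects<T: Into<Option<usize>>>(max_reconnects: T) -> Option<usize> {
    // A `usize` converts into `Some(n)`; an `Option<usize>` passes through.
    let limit: Option<usize> = max_reconnects.into();
    limit.filter(|&n| n > 0)
}
```

Under this sketch, `normalize_max_reconnects(5usize)` yields a limit of five attempts, while passing `0usize` or `None::<usize>` yields no limit.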
pub fn ignore_discovered_servers(self) -> ConnectOptions
By default, a server may advertise other servers in the cluster known to it. By setting this option, the client will ignore the advertised servers. This may be useful if the client may not be able to reach them.
pub fn retain_servers_order(self) -> ConnectOptions
By default, the client picks a random server to connect to. This option disables that behavior, forcing the client to always respect the order in which the server addresses were passed.
pub fn tls_client_config(self, config: ClientConfig) -> ConnectOptions
Allows passing a custom rustls TLS config.
§Examples
let mut root_store = async_nats::rustls::RootCertStore::empty();
root_store.add_parsable_certificates(rustls_native_certs::load_native_certs()?);
let tls_client = async_nats::rustls::ClientConfig::builder()
    .with_root_certificates(root_store)
    .with_no_client_auth();
let client = async_nats::ConnectOptions::new()
    .require_tls(true)
    .tls_client_config(tls_client)
    .connect("tls://demo.nats.io")
    .await?;
pub fn read_buffer_capacity(self, size: u16) -> ConnectOptions
Sets the initial capacity of the read buffer, which is used to gather partial protocol messages.
§Examples
async_nats::ConnectOptions::new()
    .read_buffer_capacity(65535)
    .connect("demo.nats.io")
    .await?;