use crate::cmd::Cmd;
use crate::connection::{
check_connection_setup, connection_setup_pipeline, AuthResult, ConnectionSetupComponents,
RedisConnectionInfo,
};
use crate::types::{RedisFuture, RedisResult, Value};
use crate::PushInfo;
use ::tokio::io::{AsyncRead, AsyncWrite};
use futures_util::Future;
use std::net::SocketAddr;
#[cfg(unix)]
use std::path::Path;
use std::pin::Pin;
#[cfg(feature = "async-std-comp")]
#[cfg_attr(docsrs, doc(cfg(feature = "async-std-comp")))]
pub mod async_std;
#[cfg(any(feature = "tls-rustls", feature = "tls-native-tls"))]
use crate::connection::TlsConnParams;
#[cfg(feature = "tokio-comp")]
#[cfg_attr(docsrs, doc(cfg(feature = "tokio-comp")))]
pub mod tokio;
mod pubsub;
pub use pubsub::{PubSub, PubSubSink, PubSubStream};
/// Crate-internal abstraction over an async runtime's stream type.
///
/// An implementor is itself the connected stream: every `connect_*` constructor returns
/// `Self`, and `Self` must be an [`AsyncStream`] that is `Send + Sync + 'static`.
pub(crate) trait RedisRuntime: AsyncStream + Send + Sync + Sized + 'static {
/// Connects to `socket_addr` over TCP, configuring the socket from `tcp_settings`.
async fn connect_tcp(
socket_addr: SocketAddr,
tcp_settings: &crate::io::tcp::TcpSettings,
) -> RedisResult<Self>;
/// Connects to `socket_addr` over TCP and wraps the stream in TLS for `hostname`.
///
/// Only available when one of the TLS features is enabled.
// NOTE(review): `insecure` presumably relaxes certificate verification, and a `None`
// `tls_params` presumably selects the backend's defaults - confirm against the implementors.
#[cfg(any(feature = "tls-native-tls", feature = "tls-rustls"))]
async fn connect_tcp_tls(
hostname: &str,
socket_addr: SocketAddr,
insecure: bool,
tls_params: &Option<TlsConnParams>,
tcp_settings: &crate::io::tcp::TcpSettings,
) -> RedisResult<Self>;
/// Connects to the Unix domain socket at `path` (Unix targets only).
#[cfg(unix)]
async fn connect_unix(path: &Path) -> RedisResult<Self>;
/// Hands the future `f` to the runtime for execution and returns the `TaskHandle`
/// produced for it.
fn spawn(f: impl Future<Output = ()> + Send + 'static) -> TaskHandle;
/// Moves the stream into a pinned heap allocation and erases its concrete type behind
/// `dyn AsyncStream + Send + Sync`.
fn boxed(self) -> Pin<Box<dyn AsyncStream + Send + Sync>> {
Box::pin(self)
}
}
/// Marker trait for anything that is both [`AsyncRead`] and [`AsyncWrite`].
///
/// It exists so the two bounds can be named together, e.g. in `dyn AsyncStream`.
pub trait AsyncStream: AsyncRead + AsyncWrite {}

/// Blanket implementation: every type that is both readable and writable asynchronously
/// is an [`AsyncStream`].
impl<S: AsyncRead + AsyncWrite> AsyncStream for S {}
/// Async abstraction over objects that can execute Redis commands.
pub trait ConnectionLike {
/// Sends the single command `cmd` and resolves to the `Value` produced for it.
fn req_packed_command<'a>(&'a mut self, cmd: &'a Cmd) -> RedisFuture<'a, Value>;
/// Sends the commands of the pipeline `cmd` and resolves to a list of reply values.
///
/// Hidden from the public documentation.
// NOTE(review): `offset` and `count` presumably select which replies are returned
// (skip `offset`, then take `count`); the setup code in this module passes
// `0` and `pipeline.len()` - confirm the exact contract against the implementors.
#[doc(hidden)]
fn req_packed_commands<'a>(
&'a mut self,
cmd: &'a crate::Pipeline,
offset: usize,
count: usize,
) -> RedisFuture<'a, Vec<Value>>;
/// Returns the database index associated with this connection.
// NOTE(review): presumably the value configured at connect time rather than one
// re-queried from the server - confirm against the implementors.
fn get_db(&self) -> i64;
}
/// Runs a connection-setup pipeline on `rv` and interprets the replies.
///
/// An empty pipeline needs no round trip and is reported as `AuthResult::Succeeded`.
/// Otherwise every command of the pipeline is sent in one request and the replies are
/// handed, together with `instructions`, to `check_connection_setup`, whose verdict is
/// returned unchanged. A failure of the request itself is returned as the error.
async fn execute_connection_pipeline(
    rv: &mut impl ConnectionLike,
    (pipeline, instructions): (crate::Pipeline, ConnectionSetupComponents),
) -> RedisResult<AuthResult> {
    if pipeline.is_empty() {
        return Ok(AuthResult::Succeeded);
    }

    // Request the replies of the whole pipeline: start at the first command, take all.
    let command_count = pipeline.len();
    rv.req_packed_commands(&pipeline, 0, command_count)
        .await
        .and_then(|replies| check_connection_setup(replies, instructions))
}
/// Performs the initial setup exchange on a freshly opened connection.
///
/// The setup pipeline is first built with the flag set to `true`. If the server's replies
/// are judged as `AuthResult::ShouldRetryWithoutUsername`, the pipeline is rebuilt with the
/// flag set to `false` and executed once more; the verdict of that second attempt is not
/// inspected further, only its error (if any) is propagated.
// NOTE(review): the boolean passed to `connection_setup_pipeline` presumably controls
// whether the username is included in the authentication command - confirm there.
async fn setup_connection(
    connection_info: &RedisConnectionInfo,
    con: &mut impl ConnectionLike,
    #[cfg(feature = "cache-aio")] cache_config: Option<crate::caching::CacheConfig>,
) -> RedisResult<()> {
    let first_attempt = connection_setup_pipeline(
        connection_info,
        true,
        #[cfg(feature = "cache-aio")]
        cache_config,
    );
    let first_outcome = execute_connection_pipeline(con, first_attempt).await?;

    if first_outcome == AuthResult::ShouldRetryWithoutUsername {
        let second_attempt = connection_setup_pipeline(
            connection_info,
            false,
            #[cfg(feature = "cache-aio")]
            cache_config,
        );
        execute_connection_pipeline(con, second_attempt).await?;
    }

    Ok(())
}
mod connection;
pub use connection::*;
mod multiplexed_connection;
pub use multiplexed_connection::*;
#[cfg(feature = "connection-manager")]
mod connection_manager;
#[cfg(feature = "connection-manager")]
#[cfg_attr(docsrs, doc(cfg(feature = "connection-manager")))]
pub use connection_manager::*;
mod runtime;
pub(super) use runtime::*;
// Guard for operations that only work over RESP3: when `$protocol` equals
// `ProtocolVersion::RESP2`, the enclosing function returns early with an
// `ErrorKind::InvalidClientConfig` error.
//
// Requirements on the call site, all following from the expansion below:
// - the enclosing function must be able to `return Err(RedisError)`;
// - `RedisError` is named unqualified, so it must be in scope where the macro is used;
// - the expansion contains a `use crate::types::ProtocolVersion;` item, which lands in
//   the caller's block.
macro_rules! check_resp3 {
// Form with the default error message.
($protocol: expr) => {
use crate::types::ProtocolVersion;
if $protocol == ProtocolVersion::RESP2 {
return Err(RedisError::from((
crate::ErrorKind::InvalidClientConfig,
"RESP3 is required for this command",
)));
}
};
// Form with a caller-supplied error message.
($protocol: expr, $message: expr) => {
use crate::types::ProtocolVersion;
if $protocol == ProtocolVersion::RESP2 {
return Err(RedisError::from((
crate::ErrorKind::InvalidClientConfig,
$message,
)));
}
};
}
// Makes the macro importable by path from elsewhere in the crate.
pub(crate) use check_resp3;
/// Error returned by [`AsyncPushSender::send`] when a push message could not be handed
/// over to its receiver.
///
/// The type carries no payload: every sender implementation in this module discards the
/// underlying channel or callback error and reports only that delivery failed.
///
/// `Debug` is derived so that a `Result<(), SendError>` can be used with `unwrap`,
/// `expect`, and `{:?}` formatting.
#[derive(Debug)]
pub struct SendError;
/// Sink for push messages (`PushInfo`) emitted by an async connection.
///
/// Implementors must be `Send + Sync + 'static`. This module implements the trait for
/// several channel senders, for closures, and for `Arc`-wrapped senders.
pub trait AsyncPushSender: Send + Sync + 'static {
/// Delivers `info` to the receiving side, or returns [`SendError`] if that fails.
///
/// The method is synchronous and takes `&self`.
// NOTE(review): since this is not `async`, implementations presumably must not block
// for long - confirm against the code that calls `send`.
fn send(&self, info: PushInfo) -> Result<(), SendError>;
}
/// Forwards push messages into an unbounded tokio mpsc channel.
impl AsyncPushSender for ::tokio::sync::mpsc::UnboundedSender<PushInfo> {
    /// Sends `info` through the channel; any channel error is reduced to [`SendError`].
    fn send(&self, info: PushInfo) -> Result<(), SendError> {
        // Name the channel's own inherent `send` explicitly so it cannot be mistaken
        // for a recursive call to this trait method.
        ::tokio::sync::mpsc::UnboundedSender::send(self, info).map_err(|_| SendError)
    }
}
/// Forwards push messages into a tokio broadcast channel.
impl AsyncPushSender for ::tokio::sync::broadcast::Sender<PushInfo> {
    /// Broadcasts `info`; the success value of the channel's `send` is discarded and any
    /// channel error is reduced to [`SendError`].
    fn send(&self, info: PushInfo) -> Result<(), SendError> {
        // Name the channel's own inherent `send` explicitly so it cannot be mistaken
        // for a recursive call to this trait method.
        ::tokio::sync::broadcast::Sender::send(self, info)
            .map(|_| ())
            .map_err(|_| SendError)
    }
}
/// Lets any suitable closure or function act as a push sender.
///
/// The callable receives each `PushInfo`; whatever error value it returns is dropped and
/// reported as [`SendError`].
impl<T, Func> AsyncPushSender for Func
where
    Func: Fn(PushInfo) -> Result<(), T> + Send + Sync + 'static,
{
    /// Invokes the callable with `info` and maps its error, if any, to [`SendError`].
    fn send(&self, info: PushInfo) -> Result<(), SendError> {
        self(info).map_err(|_| SendError)
    }
}
/// Forwards push messages into a standard-library mpsc channel.
impl AsyncPushSender for std::sync::mpsc::Sender<PushInfo> {
    /// Sends `info` through the channel; any channel error is reduced to [`SendError`].
    fn send(&self, info: PushInfo) -> Result<(), SendError> {
        // Name the channel's own inherent `send` explicitly so it cannot be mistaken
        // for a recursive call to this trait method.
        std::sync::mpsc::Sender::send(self, info).map_err(|_| SendError)
    }
}
/// Allows a push sender to be shared behind an `Arc`: the call is delegated to the
/// wrapped sender.
impl<T: AsyncPushSender> AsyncPushSender for std::sync::Arc<T> {
    /// Delegates to the wrapped sender's `send` and returns its result unchanged.
    fn send(&self, info: PushInfo) -> Result<(), SendError> {
        // `&**self` goes from `&Arc<T>` to `&T`.
        <T as AsyncPushSender>::send(&**self, info)
    }
}