// tonic/transport/channel/service/connection.rs

1use super::{AddOrigin, Reconnect, SharedExec, UserAgent};
2use crate::{
3    body::Body,
4    transport::{channel::BoxFuture, service::GrpcTimeout, Endpoint},
5};
6use http::{Request, Response, Uri};
7use hyper::rt;
8use hyper::{client::conn::http2::Builder, rt::Executor};
9use hyper_util::rt::TokioTimer;
10use std::{
11    fmt,
12    task::{Context, Poll},
13};
14use tower::load::Load;
15use tower::{
16    layer::Layer,
17    limit::{concurrency::ConcurrencyLimitLayer, rate::RateLimitLayer},
18    util::BoxService,
19    ServiceBuilder, ServiceExt,
20};
21use tower_service::Service;
22
23pub(crate) struct Connection {
24    inner: BoxService<Request<Body>, Response<Body>, crate::BoxError>,
25}
26
27impl Connection {
28    fn new<C>(connector: C, endpoint: Endpoint, is_lazy: bool) -> Self
29    where
30        C: Service<Uri> + Send + 'static,
31        C::Error: Into<crate::BoxError> + Send,
32        C::Future: Send,
33        C::Response: rt::Read + rt::Write + Unpin + Send + 'static,
34    {
35        let mut settings: Builder<SharedExec> = Builder::new(endpoint.executor.clone())
36            .initial_stream_window_size(endpoint.init_stream_window_size)
37            .initial_connection_window_size(endpoint.init_connection_window_size)
38            .keep_alive_interval(endpoint.http2_keep_alive_interval)
39            .timer(TokioTimer::new())
40            .clone();
41
42        if let Some(val) = endpoint.http2_keep_alive_timeout {
43            settings.keep_alive_timeout(val);
44        }
45
46        if let Some(val) = endpoint.http2_keep_alive_while_idle {
47            settings.keep_alive_while_idle(val);
48        }
49
50        if let Some(val) = endpoint.http2_adaptive_window {
51            settings.adaptive_window(val);
52        }
53
54        if let Some(val) = endpoint.http2_max_header_list_size {
55            settings.max_header_list_size(val);
56        }
57
58        let stack = ServiceBuilder::new()
59            .layer_fn(|s| {
60                let origin = endpoint.origin.as_ref().unwrap_or(endpoint.uri()).clone();
61
62                AddOrigin::new(s, origin)
63            })
64            .layer_fn(|s| UserAgent::new(s, endpoint.user_agent.clone()))
65            .layer_fn(|s| GrpcTimeout::new(s, endpoint.timeout))
66            .option_layer(endpoint.concurrency_limit.map(ConcurrencyLimitLayer::new))
67            .option_layer(endpoint.rate_limit.map(|(l, d)| RateLimitLayer::new(l, d)))
68            .into_inner();
69
70        let make_service =
71            MakeSendRequestService::new(connector, endpoint.executor.clone(), settings);
72
73        let conn = Reconnect::new(make_service, endpoint.uri().clone(), is_lazy);
74
75        Self {
76            inner: BoxService::new(stack.layer(conn)),
77        }
78    }
79
80    pub(crate) async fn connect<C>(
81        connector: C,
82        endpoint: Endpoint,
83    ) -> Result<Self, crate::BoxError>
84    where
85        C: Service<Uri> + Send + 'static,
86        C::Error: Into<crate::BoxError> + Send,
87        C::Future: Unpin + Send,
88        C::Response: rt::Read + rt::Write + Unpin + Send + 'static,
89    {
90        Self::new(connector, endpoint, false).ready_oneshot().await
91    }
92
93    pub(crate) fn lazy<C>(connector: C, endpoint: Endpoint) -> Self
94    where
95        C: Service<Uri> + Send + 'static,
96        C::Error: Into<crate::BoxError> + Send,
97        C::Future: Send,
98        C::Response: rt::Read + rt::Write + Unpin + Send + 'static,
99    {
100        Self::new(connector, endpoint, true)
101    }
102}
103
104impl Service<Request<Body>> for Connection {
105    type Response = Response<Body>;
106    type Error = crate::BoxError;
107    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
108
109    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
110        Service::poll_ready(&mut self.inner, cx).map_err(Into::into)
111    }
112
113    fn call(&mut self, req: Request<Body>) -> Self::Future {
114        self.inner.call(req)
115    }
116}
117
118impl Load for Connection {
119    type Metric = usize;
120
121    fn load(&self) -> Self::Metric {
122        0
123    }
124}
125
126impl fmt::Debug for Connection {
127    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
128        f.debug_struct("Connection").finish()
129    }
130}
131
132struct SendRequest {
133    inner: hyper::client::conn::http2::SendRequest<Body>,
134}
135
136impl From<hyper::client::conn::http2::SendRequest<Body>> for SendRequest {
137    fn from(inner: hyper::client::conn::http2::SendRequest<Body>) -> Self {
138        Self { inner }
139    }
140}
141
142impl tower::Service<Request<Body>> for SendRequest {
143    type Response = Response<Body>;
144    type Error = crate::BoxError;
145    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
146
147    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
148        self.inner.poll_ready(cx).map_err(Into::into)
149    }
150
151    fn call(&mut self, req: Request<Body>) -> Self::Future {
152        let fut = self.inner.send_request(req);
153
154        Box::pin(async move { fut.await.map_err(Into::into).map(|res| res.map(Body::new)) })
155    }
156}
157
158struct MakeSendRequestService<C> {
159    connector: C,
160    executor: SharedExec,
161    settings: Builder<SharedExec>,
162}
163
164impl<C> MakeSendRequestService<C> {
165    fn new(connector: C, executor: SharedExec, settings: Builder<SharedExec>) -> Self {
166        Self {
167            connector,
168            executor,
169            settings,
170        }
171    }
172}
173
174impl<C> tower::Service<Uri> for MakeSendRequestService<C>
175where
176    C: Service<Uri> + Send + 'static,
177    C::Error: Into<crate::BoxError> + Send,
178    C::Future: Send,
179    C::Response: rt::Read + rt::Write + Unpin + Send,
180{
181    type Response = SendRequest;
182    type Error = crate::BoxError;
183    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
184
185    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
186        self.connector.poll_ready(cx).map_err(Into::into)
187    }
188
189    fn call(&mut self, req: Uri) -> Self::Future {
190        let fut = self.connector.call(req);
191        let builder = self.settings.clone();
192        let executor = self.executor.clone();
193
194        Box::pin(async move {
195            let io = fut.await.map_err(Into::into)?;
196            let (send_request, conn) = builder.handshake(io).await?;
197
198            Executor::<BoxFuture<'static, ()>>::execute(
199                &executor,
200                Box::pin(async move {
201                    if let Err(e) = conn.await {
202                        tracing::debug!("connection task error: {:?}", e);
203                    }
204                }) as _,
205            );
206
207            Ok(SendRequest::from(send_request))
208        })
209    }
210}