tonic/transport/channel/service/
connection.rs1use super::{AddOrigin, Reconnect, SharedExec, UserAgent};
2use crate::{
3 body::Body,
4 transport::{channel::BoxFuture, service::GrpcTimeout, Endpoint},
5};
6use http::{Request, Response, Uri};
7use hyper::rt;
8use hyper::{client::conn::http2::Builder, rt::Executor};
9use hyper_util::rt::TokioTimer;
10use std::{
11 fmt,
12 task::{Context, Poll},
13};
14use tower::load::Load;
15use tower::{
16 layer::Layer,
17 limit::{concurrency::ConcurrencyLimitLayer, rate::RateLimitLayer},
18 util::BoxService,
19 ServiceBuilder, ServiceExt,
20};
21use tower_service::Service;
22
23pub(crate) struct Connection {
24 inner: BoxService<Request<Body>, Response<Body>, crate::BoxError>,
25}
26
27impl Connection {
28 fn new<C>(connector: C, endpoint: Endpoint, is_lazy: bool) -> Self
29 where
30 C: Service<Uri> + Send + 'static,
31 C::Error: Into<crate::BoxError> + Send,
32 C::Future: Send,
33 C::Response: rt::Read + rt::Write + Unpin + Send + 'static,
34 {
35 let mut settings: Builder<SharedExec> = Builder::new(endpoint.executor.clone())
36 .initial_stream_window_size(endpoint.init_stream_window_size)
37 .initial_connection_window_size(endpoint.init_connection_window_size)
38 .keep_alive_interval(endpoint.http2_keep_alive_interval)
39 .timer(TokioTimer::new())
40 .clone();
41
42 if let Some(val) = endpoint.http2_keep_alive_timeout {
43 settings.keep_alive_timeout(val);
44 }
45
46 if let Some(val) = endpoint.http2_keep_alive_while_idle {
47 settings.keep_alive_while_idle(val);
48 }
49
50 if let Some(val) = endpoint.http2_adaptive_window {
51 settings.adaptive_window(val);
52 }
53
54 if let Some(val) = endpoint.http2_max_header_list_size {
55 settings.max_header_list_size(val);
56 }
57
58 let stack = ServiceBuilder::new()
59 .layer_fn(|s| {
60 let origin = endpoint.origin.as_ref().unwrap_or(endpoint.uri()).clone();
61
62 AddOrigin::new(s, origin)
63 })
64 .layer_fn(|s| UserAgent::new(s, endpoint.user_agent.clone()))
65 .layer_fn(|s| GrpcTimeout::new(s, endpoint.timeout))
66 .option_layer(endpoint.concurrency_limit.map(ConcurrencyLimitLayer::new))
67 .option_layer(endpoint.rate_limit.map(|(l, d)| RateLimitLayer::new(l, d)))
68 .into_inner();
69
70 let make_service =
71 MakeSendRequestService::new(connector, endpoint.executor.clone(), settings);
72
73 let conn = Reconnect::new(make_service, endpoint.uri().clone(), is_lazy);
74
75 Self {
76 inner: BoxService::new(stack.layer(conn)),
77 }
78 }
79
80 pub(crate) async fn connect<C>(
81 connector: C,
82 endpoint: Endpoint,
83 ) -> Result<Self, crate::BoxError>
84 where
85 C: Service<Uri> + Send + 'static,
86 C::Error: Into<crate::BoxError> + Send,
87 C::Future: Unpin + Send,
88 C::Response: rt::Read + rt::Write + Unpin + Send + 'static,
89 {
90 Self::new(connector, endpoint, false).ready_oneshot().await
91 }
92
93 pub(crate) fn lazy<C>(connector: C, endpoint: Endpoint) -> Self
94 where
95 C: Service<Uri> + Send + 'static,
96 C::Error: Into<crate::BoxError> + Send,
97 C::Future: Send,
98 C::Response: rt::Read + rt::Write + Unpin + Send + 'static,
99 {
100 Self::new(connector, endpoint, true)
101 }
102}
103
104impl Service<Request<Body>> for Connection {
105 type Response = Response<Body>;
106 type Error = crate::BoxError;
107 type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
108
109 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
110 Service::poll_ready(&mut self.inner, cx).map_err(Into::into)
111 }
112
113 fn call(&mut self, req: Request<Body>) -> Self::Future {
114 self.inner.call(req)
115 }
116}
117
118impl Load for Connection {
119 type Metric = usize;
120
121 fn load(&self) -> Self::Metric {
122 0
123 }
124}
125
126impl fmt::Debug for Connection {
127 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
128 f.debug_struct("Connection").finish()
129 }
130}
131
132struct SendRequest {
133 inner: hyper::client::conn::http2::SendRequest<Body>,
134}
135
136impl From<hyper::client::conn::http2::SendRequest<Body>> for SendRequest {
137 fn from(inner: hyper::client::conn::http2::SendRequest<Body>) -> Self {
138 Self { inner }
139 }
140}
141
142impl tower::Service<Request<Body>> for SendRequest {
143 type Response = Response<Body>;
144 type Error = crate::BoxError;
145 type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
146
147 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
148 self.inner.poll_ready(cx).map_err(Into::into)
149 }
150
151 fn call(&mut self, req: Request<Body>) -> Self::Future {
152 let fut = self.inner.send_request(req);
153
154 Box::pin(async move { fut.await.map_err(Into::into).map(|res| res.map(Body::new)) })
155 }
156}
157
158struct MakeSendRequestService<C> {
159 connector: C,
160 executor: SharedExec,
161 settings: Builder<SharedExec>,
162}
163
164impl<C> MakeSendRequestService<C> {
165 fn new(connector: C, executor: SharedExec, settings: Builder<SharedExec>) -> Self {
166 Self {
167 connector,
168 executor,
169 settings,
170 }
171 }
172}
173
174impl<C> tower::Service<Uri> for MakeSendRequestService<C>
175where
176 C: Service<Uri> + Send + 'static,
177 C::Error: Into<crate::BoxError> + Send,
178 C::Future: Send,
179 C::Response: rt::Read + rt::Write + Unpin + Send,
180{
181 type Response = SendRequest;
182 type Error = crate::BoxError;
183 type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
184
185 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
186 self.connector.poll_ready(cx).map_err(Into::into)
187 }
188
189 fn call(&mut self, req: Uri) -> Self::Future {
190 let fut = self.connector.call(req);
191 let builder = self.settings.clone();
192 let executor = self.executor.clone();
193
194 Box::pin(async move {
195 let io = fut.await.map_err(Into::into)?;
196 let (send_request, conn) = builder.handshake(io).await?;
197
198 Executor::<BoxFuture<'static, ()>>::execute(
199 &executor,
200 Box::pin(async move {
201 if let Err(e) = conn.await {
202 tracing::debug!("connection task error: {:?}", e);
203 }
204 }) as _,
205 );
206
207 Ok(SendRequest::from(send_request))
208 })
209 }
210}