tonic/transport/channel/service/
reconnect.rs1use pin_project::pin_project;
2use std::fmt;
3use std::{
4 future::Future,
5 pin::Pin,
6 task::{Context, Poll},
7};
8use tower::make::MakeService;
9use tower_service::Service;
10use tracing::trace;
11
12pub(crate) struct Reconnect<M, Target>
13where
14 M: Service<Target>,
15 M::Error: Into<crate::BoxError>,
16{
17 mk_service: M,
18 state: State<M::Future, M::Response>,
19 target: Target,
20 error: Option<crate::BoxError>,
21 has_been_connected: bool,
22 is_lazy: bool,
23}
24
25#[derive(Debug)]
26enum State<F, S> {
27 Idle,
28 Connecting(F),
29 Connected(S),
30}
31
32impl<M, Target> Reconnect<M, Target>
33where
34 M: Service<Target>,
35 M::Error: Into<crate::BoxError>,
36{
37 pub(crate) fn new(mk_service: M, target: Target, is_lazy: bool) -> Self {
38 Reconnect {
39 mk_service,
40 state: State::Idle,
41 target,
42 error: None,
43 has_been_connected: false,
44 is_lazy,
45 }
46 }
47}
48
49impl<M, Target, S, Request> Service<Request> for Reconnect<M, Target>
50where
51 M: Service<Target, Response = S>,
52 S: Service<Request>,
53 M::Future: Unpin,
54 crate::BoxError: From<M::Error> + From<S::Error>,
55 Target: Clone,
56 <M as tower_service::Service<Target>>::Error: Into<crate::BoxError>,
57{
58 type Response = S::Response;
59 type Error = crate::BoxError;
60 type Future = ResponseFuture<S::Future>;
61
62 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
63 let mut state;
64
65 if self.error.is_some() {
66 return Poll::Ready(Ok(()));
67 }
68
69 loop {
70 match self.state {
71 State::Idle => {
72 trace!("poll_ready; idle");
73 match self.mk_service.poll_ready(cx) {
74 Poll::Ready(r) => r?,
75 Poll::Pending => {
76 trace!("poll_ready; MakeService not ready");
77 return Poll::Pending;
78 }
79 }
80
81 let fut = self.mk_service.make_service(self.target.clone());
82 self.state = State::Connecting(fut);
83 continue;
84 }
85 State::Connecting(ref mut f) => {
86 trace!("poll_ready; connecting");
87 match Pin::new(f).poll(cx) {
88 Poll::Ready(Ok(service)) => {
89 state = State::Connected(service);
90 }
91 Poll::Pending => {
92 trace!("poll_ready; not ready");
93 return Poll::Pending;
94 }
95 Poll::Ready(Err(e)) => {
96 trace!("poll_ready; error");
97
98 state = State::Idle;
99
100 if !(self.has_been_connected || self.is_lazy) {
101 return Poll::Ready(Err(e.into()));
102 } else {
103 let error = e.into();
104 tracing::debug!("reconnect::poll_ready: {:?}", error);
105 self.error = Some(error);
106 break;
107 }
108 }
109 }
110 }
111 State::Connected(ref mut inner) => {
112 trace!("poll_ready; connected");
113
114 self.has_been_connected = true;
115
116 match inner.poll_ready(cx) {
117 Poll::Ready(Ok(())) => {
118 trace!("poll_ready; ready");
119 return Poll::Ready(Ok(()));
120 }
121 Poll::Pending => {
122 trace!("poll_ready; not ready");
123 return Poll::Pending;
124 }
125 Poll::Ready(Err(_)) => {
126 trace!("poll_ready; error");
127 state = State::Idle;
128 }
129 }
130 }
131 }
132
133 self.state = state;
134 }
135
136 self.state = state;
137 Poll::Ready(Ok(()))
138 }
139
140 fn call(&mut self, request: Request) -> Self::Future {
141 tracing::trace!("Reconnect::call");
142 if let Some(error) = self.error.take() {
143 tracing::debug!("error: {}", error);
144 return ResponseFuture::error(error);
145 }
146
147 let State::Connected(service) = &mut self.state else {
148 panic!("service not ready; poll_ready must be called first");
149 };
150
151 let fut = service.call(request);
152 ResponseFuture::new(fut)
153 }
154}
155
156impl<M, Target> fmt::Debug for Reconnect<M, Target>
157where
158 M: Service<Target> + fmt::Debug,
159 M::Future: fmt::Debug,
160 M::Response: fmt::Debug,
161 Target: fmt::Debug,
162 <M as tower_service::Service<Target>>::Error: Into<crate::BoxError>,
163{
164 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
165 fmt.debug_struct("Reconnect")
166 .field("mk_service", &self.mk_service)
167 .field("state", &self.state)
168 .field("target", &self.target)
169 .finish()
170 }
171}
172
173#[pin_project]
175#[derive(Debug)]
176pub(crate) struct ResponseFuture<F> {
177 #[pin]
178 inner: Inner<F>,
179}
180
181#[pin_project(project = InnerProj)]
182#[derive(Debug)]
183enum Inner<F> {
184 Future(#[pin] F),
185 Error(Option<crate::BoxError>),
186}
187
188impl<F> ResponseFuture<F> {
189 pub(crate) fn new(inner: F) -> Self {
190 ResponseFuture {
191 inner: Inner::Future(inner),
192 }
193 }
194
195 pub(crate) fn error(error: crate::BoxError) -> Self {
196 ResponseFuture {
197 inner: Inner::Error(Some(error)),
198 }
199 }
200}
201
202impl<F, T, E> Future for ResponseFuture<F>
203where
204 F: Future<Output = Result<T, E>>,
205 E: Into<crate::BoxError>,
206{
207 type Output = Result<T, crate::BoxError>;
208
209 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
210 let me = self.project();
212 match me.inner.project() {
213 InnerProj::Future(fut) => fut.poll(cx).map_err(Into::into),
214 InnerProj::Error(e) => {
215 let e = e.take().expect("Polled after ready.");
216 Poll::Ready(Err(e))
217 }
218 }
219 }
220}