tonic/transport/channel/service/
reconnect.rs

1use pin_project::pin_project;
2use std::fmt;
3use std::{
4    future::Future,
5    pin::Pin,
6    task::{Context, Poll},
7};
8use tower::make::MakeService;
9use tower_service::Service;
10use tracing::trace;
11
12pub(crate) struct Reconnect<M, Target>
13where
14    M: Service<Target>,
15    M::Error: Into<crate::BoxError>,
16{
17    mk_service: M,
18    state: State<M::Future, M::Response>,
19    target: Target,
20    error: Option<crate::BoxError>,
21    has_been_connected: bool,
22    is_lazy: bool,
23}
24
25#[derive(Debug)]
26enum State<F, S> {
27    Idle,
28    Connecting(F),
29    Connected(S),
30}
31
32impl<M, Target> Reconnect<M, Target>
33where
34    M: Service<Target>,
35    M::Error: Into<crate::BoxError>,
36{
37    pub(crate) fn new(mk_service: M, target: Target, is_lazy: bool) -> Self {
38        Reconnect {
39            mk_service,
40            state: State::Idle,
41            target,
42            error: None,
43            has_been_connected: false,
44            is_lazy,
45        }
46    }
47}
48
49impl<M, Target, S, Request> Service<Request> for Reconnect<M, Target>
50where
51    M: Service<Target, Response = S>,
52    S: Service<Request>,
53    M::Future: Unpin,
54    crate::BoxError: From<M::Error> + From<S::Error>,
55    Target: Clone,
56    <M as tower_service::Service<Target>>::Error: Into<crate::BoxError>,
57{
58    type Response = S::Response;
59    type Error = crate::BoxError;
60    type Future = ResponseFuture<S::Future>;
61
62    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
63        let mut state;
64
65        if self.error.is_some() {
66            return Poll::Ready(Ok(()));
67        }
68
69        loop {
70            match self.state {
71                State::Idle => {
72                    trace!("poll_ready; idle");
73                    match self.mk_service.poll_ready(cx) {
74                        Poll::Ready(r) => r?,
75                        Poll::Pending => {
76                            trace!("poll_ready; MakeService not ready");
77                            return Poll::Pending;
78                        }
79                    }
80
81                    let fut = self.mk_service.make_service(self.target.clone());
82                    self.state = State::Connecting(fut);
83                    continue;
84                }
85                State::Connecting(ref mut f) => {
86                    trace!("poll_ready; connecting");
87                    match Pin::new(f).poll(cx) {
88                        Poll::Ready(Ok(service)) => {
89                            state = State::Connected(service);
90                        }
91                        Poll::Pending => {
92                            trace!("poll_ready; not ready");
93                            return Poll::Pending;
94                        }
95                        Poll::Ready(Err(e)) => {
96                            trace!("poll_ready; error");
97
98                            state = State::Idle;
99
100                            if !(self.has_been_connected || self.is_lazy) {
101                                return Poll::Ready(Err(e.into()));
102                            } else {
103                                let error = e.into();
104                                tracing::debug!("reconnect::poll_ready: {:?}", error);
105                                self.error = Some(error);
106                                break;
107                            }
108                        }
109                    }
110                }
111                State::Connected(ref mut inner) => {
112                    trace!("poll_ready; connected");
113
114                    self.has_been_connected = true;
115
116                    match inner.poll_ready(cx) {
117                        Poll::Ready(Ok(())) => {
118                            trace!("poll_ready; ready");
119                            return Poll::Ready(Ok(()));
120                        }
121                        Poll::Pending => {
122                            trace!("poll_ready; not ready");
123                            return Poll::Pending;
124                        }
125                        Poll::Ready(Err(_)) => {
126                            trace!("poll_ready; error");
127                            state = State::Idle;
128                        }
129                    }
130                }
131            }
132
133            self.state = state;
134        }
135
136        self.state = state;
137        Poll::Ready(Ok(()))
138    }
139
140    fn call(&mut self, request: Request) -> Self::Future {
141        tracing::trace!("Reconnect::call");
142        if let Some(error) = self.error.take() {
143            tracing::debug!("error: {}", error);
144            return ResponseFuture::error(error);
145        }
146
147        let State::Connected(service) = &mut self.state else {
148            panic!("service not ready; poll_ready must be called first");
149        };
150
151        let fut = service.call(request);
152        ResponseFuture::new(fut)
153    }
154}
155
156impl<M, Target> fmt::Debug for Reconnect<M, Target>
157where
158    M: Service<Target> + fmt::Debug,
159    M::Future: fmt::Debug,
160    M::Response: fmt::Debug,
161    Target: fmt::Debug,
162    <M as tower_service::Service<Target>>::Error: Into<crate::BoxError>,
163{
164    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
165        fmt.debug_struct("Reconnect")
166            .field("mk_service", &self.mk_service)
167            .field("state", &self.state)
168            .field("target", &self.target)
169            .finish()
170    }
171}
172
173/// Future that resolves to the response or failure to connect.
174#[pin_project]
175#[derive(Debug)]
176pub(crate) struct ResponseFuture<F> {
177    #[pin]
178    inner: Inner<F>,
179}
180
181#[pin_project(project = InnerProj)]
182#[derive(Debug)]
183enum Inner<F> {
184    Future(#[pin] F),
185    Error(Option<crate::BoxError>),
186}
187
188impl<F> ResponseFuture<F> {
189    pub(crate) fn new(inner: F) -> Self {
190        ResponseFuture {
191            inner: Inner::Future(inner),
192        }
193    }
194
195    pub(crate) fn error(error: crate::BoxError) -> Self {
196        ResponseFuture {
197            inner: Inner::Error(Some(error)),
198        }
199    }
200}
201
202impl<F, T, E> Future for ResponseFuture<F>
203where
204    F: Future<Output = Result<T, E>>,
205    E: Into<crate::BoxError>,
206{
207    type Output = Result<T, crate::BoxError>;
208
209    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
210        //self.project().inner.poll(cx).map_err(Into::into)
211        let me = self.project();
212        match me.inner.project() {
213            InnerProj::Future(fut) => fut.poll(cx).map_err(Into::into),
214            InnerProj::Error(e) => {
215                let e = e.take().expect("Polled after ready.");
216                Poll::Ready(Err(e))
217            }
218        }
219    }
220}