hyper/proto/h2/
client.rs

1use std::{
2    convert::Infallible,
3    future::Future,
4    marker::PhantomData,
5    pin::Pin,
6    task::{Context, Poll},
7    time::Duration,
8};
9
10use crate::rt::{Read, Write};
11use bytes::Bytes;
12use futures_channel::mpsc::{Receiver, Sender};
13use futures_channel::{mpsc, oneshot};
14use futures_core::{ready, FusedFuture, FusedStream, Stream};
15use h2::client::{Builder, Connection, SendRequest};
16use h2::SendStream;
17use http::{Method, StatusCode};
18use pin_project_lite::pin_project;
19
20use super::ping::{Ponger, Recorder};
21use super::{ping, PipeToSendStream, SendBuf};
22use crate::body::{Body, Incoming as IncomingBody};
23use crate::client::dispatch::{Callback, SendWhen, TrySendError};
24use crate::common::either::Either;
25use crate::common::io::Compat;
26use crate::common::time::Time;
27use crate::ext::Protocol;
28use crate::headers;
29use crate::proto::Dispatched;
30use crate::rt::bounds::{Http2ClientConnExec, Http2UpgradedExec};
31use crate::upgrade::Upgraded;
32use crate::{Request, Response};
33use h2::client::ResponseFuture;
34
35type ClientRx<B> = crate::client::dispatch::Receiver<Request<B>, Response<IncomingBody>>;
36
37///// An mpsc channel is used to help notify the `Connection` task when *all*
38///// other handles to it have been dropped, so that it can shutdown.
39type ConnDropRef = mpsc::Sender<Infallible>;
40
41///// A oneshot channel watches the `Connection` task, and when it completes,
42///// the "dispatch" task will be notified and can shutdown sooner.
43type ConnEof = oneshot::Receiver<Infallible>;
44
45// Our defaults are chosen for the "majority" case, which usually are not
46// resource constrained, and so the spec default of 64kb can be too limiting
47// for performance.
48const DEFAULT_CONN_WINDOW: u32 = 1024 * 1024 * 5; // 5mb
49const DEFAULT_STREAM_WINDOW: u32 = 1024 * 1024 * 2; // 2mb
50const DEFAULT_MAX_FRAME_SIZE: u32 = 1024 * 16; // 16kb
51const DEFAULT_MAX_SEND_BUF_SIZE: usize = 1024 * 1024; // 1mb
52const DEFAULT_MAX_HEADER_LIST_SIZE: u32 = 1024 * 16; // 16kb
53
54// The maximum number of concurrent streams that the client is allowed to open
55// before it receives the initial SETTINGS frame from the server.
56// This default value is derived from what the HTTP/2 spec recommends as the
57// minimum value that endpoints advertise to their peers. It means that using
58// this value will minimize the chance of the failure where the local endpoint
59// attempts to open too many streams and gets rejected by the remote peer with
60// the `REFUSED_STREAM` error.
61const DEFAULT_INITIAL_MAX_SEND_STREAMS: usize = 100;
62
63#[derive(Clone, Debug)]
64pub(crate) struct Config {
65    pub(crate) adaptive_window: bool,
66    pub(crate) initial_conn_window_size: u32,
67    pub(crate) initial_stream_window_size: u32,
68    pub(crate) initial_max_send_streams: usize,
69    pub(crate) max_frame_size: Option<u32>,
70    pub(crate) max_header_list_size: u32,
71    pub(crate) keep_alive_interval: Option<Duration>,
72    pub(crate) keep_alive_timeout: Duration,
73    pub(crate) keep_alive_while_idle: bool,
74    pub(crate) max_concurrent_reset_streams: Option<usize>,
75    pub(crate) max_send_buffer_size: usize,
76    pub(crate) max_pending_accept_reset_streams: Option<usize>,
77    pub(crate) header_table_size: Option<u32>,
78    pub(crate) max_concurrent_streams: Option<u32>,
79}
80
81impl Default for Config {
82    fn default() -> Config {
83        Config {
84            adaptive_window: false,
85            initial_conn_window_size: DEFAULT_CONN_WINDOW,
86            initial_stream_window_size: DEFAULT_STREAM_WINDOW,
87            initial_max_send_streams: DEFAULT_INITIAL_MAX_SEND_STREAMS,
88            max_frame_size: Some(DEFAULT_MAX_FRAME_SIZE),
89            max_header_list_size: DEFAULT_MAX_HEADER_LIST_SIZE,
90            keep_alive_interval: None,
91            keep_alive_timeout: Duration::from_secs(20),
92            keep_alive_while_idle: false,
93            max_concurrent_reset_streams: None,
94            max_send_buffer_size: DEFAULT_MAX_SEND_BUF_SIZE,
95            max_pending_accept_reset_streams: None,
96            header_table_size: None,
97            max_concurrent_streams: None,
98        }
99    }
100}
101
102fn new_builder(config: &Config) -> Builder {
103    let mut builder = Builder::default();
104    builder
105        .initial_max_send_streams(config.initial_max_send_streams)
106        .initial_window_size(config.initial_stream_window_size)
107        .initial_connection_window_size(config.initial_conn_window_size)
108        .max_header_list_size(config.max_header_list_size)
109        .max_send_buffer_size(config.max_send_buffer_size)
110        .enable_push(false);
111    if let Some(max) = config.max_frame_size {
112        builder.max_frame_size(max);
113    }
114    if let Some(max) = config.max_concurrent_reset_streams {
115        builder.max_concurrent_reset_streams(max);
116    }
117    if let Some(max) = config.max_pending_accept_reset_streams {
118        builder.max_pending_accept_reset_streams(max);
119    }
120    if let Some(size) = config.header_table_size {
121        builder.header_table_size(size);
122    }
123    if let Some(max) = config.max_concurrent_streams {
124        builder.max_concurrent_streams(max);
125    }
126    builder
127}
128
129fn new_ping_config(config: &Config) -> ping::Config {
130    ping::Config {
131        bdp_initial_window: if config.adaptive_window {
132            Some(config.initial_stream_window_size)
133        } else {
134            None
135        },
136        keep_alive_interval: config.keep_alive_interval,
137        keep_alive_timeout: config.keep_alive_timeout,
138        keep_alive_while_idle: config.keep_alive_while_idle,
139    }
140}
141
142pub(crate) async fn handshake<T, B, E>(
143    io: T,
144    req_rx: ClientRx<B>,
145    config: &Config,
146    mut exec: E,
147    timer: Time,
148) -> crate::Result<ClientTask<B, E, T>>
149where
150    T: Read + Write + Unpin,
151    B: Body + 'static,
152    B::Data: Send + 'static,
153    E: Http2ClientConnExec<B, T> + Clone + Unpin,
154    B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
155{
156    let (h2_tx, mut conn) = new_builder(config)
157        .handshake::<_, SendBuf<B::Data>>(Compat::new(io))
158        .await
159        .map_err(crate::Error::new_h2)?;
160
161    // An mpsc channel is used entirely to detect when the
162    // 'Client' has been dropped. This is to get around a bug
163    // in h2 where dropping all SendRequests won't notify a
164    // parked Connection.
165    let (conn_drop_ref, conn_drop_rx) = mpsc::channel(1);
166    let (cancel_tx, conn_eof) = oneshot::channel();
167
168    let ping_config = new_ping_config(config);
169
170    let (conn, ping) = if ping_config.is_enabled() {
171        let pp = conn.ping_pong().expect("conn.ping_pong");
172        let (recorder, ponger) = ping::channel(pp, ping_config, timer);
173
174        let conn: Conn<_, B> = Conn::new(ponger, conn);
175        (Either::left(conn), recorder)
176    } else {
177        (Either::right(conn), ping::disabled())
178    };
179    let conn: ConnMapErr<T, B> = ConnMapErr {
180        conn,
181        is_terminated: false,
182    };
183
184    exec.execute_h2_future(H2ClientFuture::Task {
185        task: ConnTask::new(conn, conn_drop_rx, cancel_tx),
186    });
187
188    Ok(ClientTask {
189        ping,
190        conn_drop_ref,
191        conn_eof,
192        executor: exec,
193        h2_tx,
194        req_rx,
195        fut_ctx: None,
196        marker: PhantomData,
197    })
198}
199
200pin_project! {
201    struct Conn<T, B>
202    where
203        B: Body,
204    {
205        #[pin]
206        ponger: Ponger,
207        #[pin]
208        conn: Connection<Compat<T>, SendBuf<<B as Body>::Data>>,
209    }
210}
211
212impl<T, B> Conn<T, B>
213where
214    B: Body,
215    T: Read + Write + Unpin,
216{
217    fn new(ponger: Ponger, conn: Connection<Compat<T>, SendBuf<<B as Body>::Data>>) -> Self {
218        Conn { ponger, conn }
219    }
220}
221
222impl<T, B> Future for Conn<T, B>
223where
224    B: Body,
225    T: Read + Write + Unpin,
226{
227    type Output = Result<(), h2::Error>;
228
229    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
230        let mut this = self.project();
231        match this.ponger.poll(cx) {
232            Poll::Ready(ping::Ponged::SizeUpdate(wnd)) => {
233                this.conn.set_target_window_size(wnd);
234                this.conn.set_initial_window_size(wnd)?;
235            }
236            Poll::Ready(ping::Ponged::KeepAliveTimedOut) => {
237                debug!("connection keep-alive timed out");
238                return Poll::Ready(Ok(()));
239            }
240            Poll::Pending => {}
241        }
242
243        Pin::new(&mut this.conn).poll(cx)
244    }
245}
246
247pin_project! {
248    struct ConnMapErr<T, B>
249    where
250        B: Body,
251        T: Read,
252        T: Write,
253        T: Unpin,
254    {
255        #[pin]
256        conn: Either<Conn<T, B>, Connection<Compat<T>, SendBuf<<B as Body>::Data>>>,
257        #[pin]
258        is_terminated: bool,
259    }
260}
261
262impl<T, B> Future for ConnMapErr<T, B>
263where
264    B: Body,
265    T: Read + Write + Unpin,
266{
267    type Output = Result<(), ()>;
268
269    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
270        let mut this = self.project();
271
272        if *this.is_terminated {
273            return Poll::Pending;
274        }
275        let polled = this.conn.poll(cx);
276        if polled.is_ready() {
277            *this.is_terminated = true;
278        }
279        polled.map_err(|_e| {
280            debug!(error = %_e, "connection error");
281        })
282    }
283}
284
285impl<T, B> FusedFuture for ConnMapErr<T, B>
286where
287    B: Body,
288    T: Read + Write + Unpin,
289{
290    fn is_terminated(&self) -> bool {
291        self.is_terminated
292    }
293}
294
295pin_project! {
296    pub struct ConnTask<T, B>
297    where
298        B: Body,
299        T: Read,
300        T: Write,
301        T: Unpin,
302    {
303        #[pin]
304        drop_rx: Receiver<Infallible>,
305        #[pin]
306        cancel_tx: Option<oneshot::Sender<Infallible>>,
307        #[pin]
308        conn: ConnMapErr<T, B>,
309    }
310}
311
312impl<T, B> ConnTask<T, B>
313where
314    B: Body,
315    T: Read + Write + Unpin,
316{
317    fn new(
318        conn: ConnMapErr<T, B>,
319        drop_rx: Receiver<Infallible>,
320        cancel_tx: oneshot::Sender<Infallible>,
321    ) -> Self {
322        Self {
323            drop_rx,
324            cancel_tx: Some(cancel_tx),
325            conn,
326        }
327    }
328}
329
330impl<T, B> Future for ConnTask<T, B>
331where
332    B: Body,
333    T: Read + Write + Unpin,
334{
335    type Output = ();
336
337    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
338        let mut this = self.project();
339
340        if !this.conn.is_terminated() && Pin::new(&mut this.conn).poll(cx).is_ready() {
341            // ok or err, the `conn` has finished.
342            return Poll::Ready(());
343        }
344
345        if !this.drop_rx.is_terminated() && Pin::new(&mut this.drop_rx).poll_next(cx).is_ready() {
346            // mpsc has been dropped, hopefully polling
347            // the connection some more should start shutdown
348            // and then close.
349            trace!("send_request dropped, starting conn shutdown");
350            drop(this.cancel_tx.take().expect("ConnTask Future polled twice"));
351        }
352
353        Poll::Pending
354    }
355}
356
357pin_project! {
358    #[project = H2ClientFutureProject]
359    pub enum H2ClientFuture<B, T, E>
360    where
361        B: http_body::Body,
362        B: 'static,
363        B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
364        T: Read,
365        T: Write,
366        T: Unpin,
367    {
368        Pipe {
369            #[pin]
370            pipe: PipeMap<B>,
371        },
372        Send {
373            #[pin]
374            send_when: SendWhen<B, E>,
375        },
376        Task {
377            #[pin]
378            task: ConnTask<T, B>,
379        },
380    }
381}
382
383impl<B, T, E> Future for H2ClientFuture<B, T, E>
384where
385    B: http_body::Body + 'static,
386    B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
387    T: Read + Write + Unpin,
388    E: Http2UpgradedExec<B::Data>,
389{
390    type Output = ();
391
392    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> std::task::Poll<Self::Output> {
393        let this = self.project();
394
395        match this {
396            H2ClientFutureProject::Pipe { pipe } => pipe.poll(cx),
397            H2ClientFutureProject::Send { send_when } => send_when.poll(cx),
398            H2ClientFutureProject::Task { task } => task.poll(cx),
399        }
400    }
401}
402
403struct FutCtx<B>
404where
405    B: Body,
406{
407    is_connect: bool,
408    eos: bool,
409    fut: ResponseFuture,
410    body_tx: SendStream<SendBuf<B::Data>>,
411    body: B,
412    cb: Callback<Request<B>, Response<IncomingBody>>,
413}
414
415impl<B: Body> Unpin for FutCtx<B> {}
416
417pub(crate) struct ClientTask<B, E, T>
418where
419    B: Body,
420    E: Unpin,
421{
422    ping: ping::Recorder,
423    conn_drop_ref: ConnDropRef,
424    conn_eof: ConnEof,
425    executor: E,
426    h2_tx: SendRequest<SendBuf<B::Data>>,
427    req_rx: ClientRx<B>,
428    fut_ctx: Option<FutCtx<B>>,
429    marker: PhantomData<T>,
430}
431
432impl<B, E, T> ClientTask<B, E, T>
433where
434    B: Body + 'static,
435    E: Http2ClientConnExec<B, T> + Unpin,
436    B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
437    T: Read + Write + Unpin,
438{
439    pub(crate) fn is_extended_connect_protocol_enabled(&self) -> bool {
440        self.h2_tx.is_extended_connect_protocol_enabled()
441    }
442}
443
444pin_project! {
445    pub struct PipeMap<S>
446    where
447        S: Body,
448    {
449        #[pin]
450        pipe: PipeToSendStream<S>,
451        #[pin]
452        conn_drop_ref: Option<Sender<Infallible>>,
453        #[pin]
454        ping: Option<Recorder>,
455    }
456}
457
458impl<B> Future for PipeMap<B>
459where
460    B: http_body::Body,
461    B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
462{
463    type Output = ();
464
465    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> std::task::Poll<Self::Output> {
466        let mut this = self.project();
467
468        match Pin::new(&mut this.pipe).poll(cx) {
469            Poll::Ready(result) => {
470                if let Err(_e) = result {
471                    debug!("client request body error: {}", _e);
472                }
473                drop(this.conn_drop_ref.take().expect("Future polled twice"));
474                drop(this.ping.take().expect("Future polled twice"));
475                return Poll::Ready(());
476            }
477            Poll::Pending => (),
478        };
479        Poll::Pending
480    }
481}
482
483impl<B, E, T> ClientTask<B, E, T>
484where
485    B: Body + 'static + Unpin,
486    B::Data: Send,
487    E: Http2ClientConnExec<B, T> + Clone + Unpin,
488    B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
489    T: Read + Write + Unpin,
490{
491    fn poll_pipe(&mut self, f: FutCtx<B>, cx: &mut Context<'_>) {
492        let ping = self.ping.clone();
493
494        let send_stream = if !f.is_connect {
495            if !f.eos {
496                let mut pipe = PipeToSendStream::new(f.body, f.body_tx);
497
498                // eagerly see if the body pipe is ready and
499                // can thus skip allocating in the executor
500                match Pin::new(&mut pipe).poll(cx) {
501                    Poll::Ready(_) => (),
502                    Poll::Pending => {
503                        let conn_drop_ref = self.conn_drop_ref.clone();
504                        // keep the ping recorder's knowledge of an
505                        // "open stream" alive while this body is
506                        // still sending...
507                        let ping = ping.clone();
508
509                        let pipe = PipeMap {
510                            pipe,
511                            conn_drop_ref: Some(conn_drop_ref),
512                            ping: Some(ping),
513                        };
514                        // Clear send task
515                        self.executor
516                            .execute_h2_future(H2ClientFuture::Pipe { pipe });
517                    }
518                }
519            }
520
521            None
522        } else {
523            Some(f.body_tx)
524        };
525
526        self.executor.execute_h2_future(H2ClientFuture::Send {
527            send_when: SendWhen {
528                when: ResponseFutMap {
529                    fut: f.fut,
530                    ping: Some(ping),
531                    send_stream: Some(send_stream),
532                    exec: self.executor.clone(),
533                },
534                call_back: Some(f.cb),
535            },
536        });
537    }
538}
539
540pin_project! {
541    pub(crate) struct ResponseFutMap<B, E>
542    where
543        B: Body,
544        B: 'static,
545    {
546        #[pin]
547        fut: ResponseFuture,
548        ping: Option<Recorder>,
549        #[pin]
550        send_stream: Option<Option<SendStream<SendBuf<<B as Body>::Data>>>>,
551        exec: E,
552    }
553}
554
555impl<B, E> Future for ResponseFutMap<B, E>
556where
557    B: Body + 'static,
558    E: Http2UpgradedExec<B::Data>,
559{
560    type Output = Result<Response<crate::body::Incoming>, (crate::Error, Option<Request<B>>)>;
561
562    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
563        let mut this = self.as_mut().project();
564
565        let result = ready!(this.fut.poll(cx));
566
567        let ping = this.ping.take().expect("Future polled twice");
568        let send_stream = this.send_stream.take().expect("Future polled twice");
569
570        match result {
571            Ok(res) => {
572                // record that we got the response headers
573                ping.record_non_data();
574
575                let content_length = headers::content_length_parse_all(res.headers());
576                if let (Some(mut send_stream), StatusCode::OK) = (send_stream, res.status()) {
577                    if content_length.map_or(false, |len| len != 0) {
578                        warn!("h2 connect response with non-zero body not supported");
579
580                        send_stream.send_reset(h2::Reason::INTERNAL_ERROR);
581                        return Poll::Ready(Err((
582                            crate::Error::new_h2(h2::Reason::INTERNAL_ERROR.into()),
583                            None::<Request<B>>,
584                        )));
585                    }
586                    let (parts, recv_stream) = res.into_parts();
587                    let mut res = Response::from_parts(parts, IncomingBody::empty());
588
589                    let (pending, on_upgrade) = crate::upgrade::pending();
590
591                    let (h2_up, up_task) = super::upgrade::pair(send_stream, recv_stream, ping);
592                    self.exec.execute_upgrade(up_task);
593                    let upgraded = Upgraded::new(h2_up, Bytes::new());
594
595                    pending.fulfill(upgraded);
596                    res.extensions_mut().insert(on_upgrade);
597
598                    Poll::Ready(Ok(res))
599                } else {
600                    let res = res.map(|stream| {
601                        let ping = ping.for_stream(&stream);
602                        IncomingBody::h2(stream, content_length.into(), ping)
603                    });
604                    Poll::Ready(Ok(res))
605                }
606            }
607            Err(err) => {
608                ping.ensure_not_timed_out().map_err(|e| (e, None))?;
609
610                debug!("client response error: {}", err);
611                Poll::Ready(Err((crate::Error::new_h2(err), None::<Request<B>>)))
612            }
613        }
614    }
615}
616
617impl<B, E, T> Future for ClientTask<B, E, T>
618where
619    B: Body + 'static + Unpin,
620    B::Data: Send,
621    B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
622    E: Http2ClientConnExec<B, T> + Clone + Unpin,
623    T: Read + Write + Unpin,
624{
625    type Output = crate::Result<Dispatched>;
626
627    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
628        loop {
629            match ready!(self.h2_tx.poll_ready(cx)) {
630                Ok(()) => (),
631                Err(err) => {
632                    self.ping.ensure_not_timed_out()?;
633                    return if err.reason() == Some(::h2::Reason::NO_ERROR) {
634                        trace!("connection gracefully shutdown");
635                        Poll::Ready(Ok(Dispatched::Shutdown))
636                    } else {
637                        Poll::Ready(Err(crate::Error::new_h2(err)))
638                    };
639                }
640            };
641
642            // If we were waiting on pending open
643            // continue where we left off.
644            if let Some(f) = self.fut_ctx.take() {
645                self.poll_pipe(f, cx);
646                continue;
647            }
648
649            match self.req_rx.poll_recv(cx) {
650                Poll::Ready(Some((req, cb))) => {
651                    // check that future hasn't been canceled already
652                    if cb.is_canceled() {
653                        trace!("request callback is canceled");
654                        continue;
655                    }
656                    let (head, body) = req.into_parts();
657                    let mut req = ::http::Request::from_parts(head, ());
658                    super::strip_connection_headers(req.headers_mut(), true);
659                    if let Some(len) = body.size_hint().exact() {
660                        if len != 0 || headers::method_has_defined_payload_semantics(req.method()) {
661                            headers::set_content_length_if_missing(req.headers_mut(), len);
662                        }
663                    }
664
665                    let is_connect = req.method() == Method::CONNECT;
666                    let eos = body.is_end_stream();
667
668                    if is_connect
669                        && headers::content_length_parse_all(req.headers())
670                            .map_or(false, |len| len != 0)
671                    {
672                        debug!("h2 connect request with non-zero body not supported");
673                        cb.send(Err(TrySendError {
674                            error: crate::Error::new_user_invalid_connect(),
675                            message: None,
676                        }));
677                        continue;
678                    }
679
680                    if let Some(protocol) = req.extensions_mut().remove::<Protocol>() {
681                        req.extensions_mut().insert(protocol.into_inner());
682                    }
683
684                    let (fut, body_tx) = match self.h2_tx.send_request(req, !is_connect && eos) {
685                        Ok(ok) => ok,
686                        Err(err) => {
687                            debug!("client send request error: {}", err);
688                            cb.send(Err(TrySendError {
689                                error: crate::Error::new_h2(err),
690                                message: None,
691                            }));
692                            continue;
693                        }
694                    };
695
696                    let f = FutCtx {
697                        is_connect,
698                        eos,
699                        fut,
700                        body_tx,
701                        body,
702                        cb,
703                    };
704
705                    // Check poll_ready() again.
706                    // If the call to send_request() resulted in the new stream being pending open
707                    // we have to wait for the open to complete before accepting new requests.
708                    match self.h2_tx.poll_ready(cx) {
709                        Poll::Pending => {
710                            // Save Context
711                            self.fut_ctx = Some(f);
712                            return Poll::Pending;
713                        }
714                        Poll::Ready(Ok(())) => (),
715                        Poll::Ready(Err(err)) => {
716                            f.cb.send(Err(TrySendError {
717                                error: crate::Error::new_h2(err),
718                                message: None,
719                            }));
720                            continue;
721                        }
722                    }
723                    self.poll_pipe(f, cx);
724                    continue;
725                }
726
727                Poll::Ready(None) => {
728                    trace!("client::dispatch::Sender dropped");
729                    return Poll::Ready(Ok(Dispatched::Shutdown));
730                }
731
732                Poll::Pending => match ready!(Pin::new(&mut self.conn_eof).poll(cx)) {
733                    // As of Rust 1.82, this pattern is no longer needed, and emits a warning.
734                    // But we cannot remove it as long as MSRV is less than that.
735                    #[allow(unused)]
736                    Ok(never) => match never {},
737                    Err(_conn_is_eof) => {
738                        trace!("connection task is closed, closing dispatch task");
739                        return Poll::Ready(Ok(Dispatched::Shutdown));
740                    }
741                },
742            }
743        }
744    }
745}