hyper/proto/h2/
server.rs

1use std::error::Error as StdError;
2use std::future::Future;
3use std::pin::Pin;
4use std::task::{Context, Poll};
5use std::time::Duration;
6
7use bytes::Bytes;
8use futures_core::ready;
9use h2::server::{Connection, Handshake, SendResponse};
10use h2::{Reason, RecvStream};
11use http::{Method, Request};
12use pin_project_lite::pin_project;
13
14use super::{ping, PipeToSendStream, SendBuf};
15use crate::body::{Body, Incoming as IncomingBody};
16use crate::common::date;
17use crate::common::io::Compat;
18use crate::common::time::Time;
19use crate::ext::Protocol;
20use crate::headers;
21use crate::proto::h2::ping::Recorder;
22use crate::proto::Dispatched;
23use crate::rt::bounds::{Http2ServerConnExec, Http2UpgradedExec};
24use crate::rt::{Read, Write};
25use crate::service::HttpService;
26
27use crate::upgrade::{OnUpgrade, Pending, Upgraded};
28use crate::Response;
29
// These defaults target the "majority" deployment, which is usually not
// resource constrained; there, the spec default of 64kb can be too limiting
// for performance.
//
// That said, a server typically has several clients connected at once, so
// it is more likely than a client to end up using a lot of resources.
const DEFAULT_CONN_WINDOW: u32 = 1 << 20; // 1mb
const DEFAULT_STREAM_WINDOW: u32 = 1 << 20; // 1mb
const DEFAULT_MAX_FRAME_SIZE: u32 = 16 * 1024; // 16kb
const DEFAULT_MAX_SEND_BUF_SIZE: usize = 400 * 1024; // 400kb
const DEFAULT_SETTINGS_MAX_HEADER_LIST_SIZE: u32 = 16 * 1024; // 16kb
const DEFAULT_MAX_LOCAL_ERROR_RESET_STREAMS: usize = 1 << 10; // 1024 streams
42
/// Settings consumed by `Server::new` when it configures the h2 builder and
/// the ping machinery for one connection.
#[derive(Debug, Clone)]
pub(crate) struct Config {
    /// When `true`, `initial_stream_window_size` is also handed to the ping
    /// configuration as its initial BDP window; when `false` no BDP window
    /// is configured.
    pub(crate) adaptive_window: bool,
    /// Passed to the h2 builder's `initial_connection_window_size`.
    pub(crate) initial_conn_window_size: u32,
    /// Passed to the h2 builder's `initial_window_size`.
    pub(crate) initial_stream_window_size: u32,
    /// Passed to the h2 builder's `max_frame_size`.
    pub(crate) max_frame_size: u32,
    /// When `true`, `enable_connect_protocol` is called on the h2 builder.
    pub(crate) enable_connect_protocol: bool,
    /// Passed to the h2 builder's `max_concurrent_streams` when `Some`;
    /// otherwise the builder's own setting is left untouched.
    pub(crate) max_concurrent_streams: Option<u32>,
    /// Passed to the h2 builder's `max_pending_accept_reset_streams` when
    /// `Some`; otherwise the builder's own setting is left untouched.
    pub(crate) max_pending_accept_reset_streams: Option<usize>,
    /// Always passed (as an `Option`) to the h2 builder's
    /// `max_local_error_reset_streams`.
    pub(crate) max_local_error_reset_streams: Option<usize>,
    /// Forwarded to the ping configuration as its keep-alive interval.
    pub(crate) keep_alive_interval: Option<Duration>,
    /// Forwarded to the ping configuration as its keep-alive timeout.
    pub(crate) keep_alive_timeout: Duration,
    /// Passed to the h2 builder's `max_send_buffer_size`.
    pub(crate) max_send_buffer_size: usize,
    /// Passed to the h2 builder's `max_header_list_size`.
    pub(crate) max_header_list_size: u32,
    /// When `true`, responses get a `Date` header unless one is already set.
    pub(crate) date_header: bool,
}
59
60impl Default for Config {
61    fn default() -> Config {
62        Config {
63            adaptive_window: false,
64            initial_conn_window_size: DEFAULT_CONN_WINDOW,
65            initial_stream_window_size: DEFAULT_STREAM_WINDOW,
66            max_frame_size: DEFAULT_MAX_FRAME_SIZE,
67            enable_connect_protocol: false,
68            max_concurrent_streams: Some(200),
69            max_pending_accept_reset_streams: None,
70            max_local_error_reset_streams: Some(DEFAULT_MAX_LOCAL_ERROR_RESET_STREAMS),
71            keep_alive_interval: None,
72            keep_alive_timeout: Duration::from_secs(20),
73            max_send_buffer_size: DEFAULT_MAX_SEND_BUF_SIZE,
74            max_header_list_size: DEFAULT_SETTINGS_MAX_HEADER_LIST_SIZE,
75            date_header: true,
76        }
77    }
78}
79
80pin_project! {
81    pub(crate) struct Server<T, S, B, E>
82    where
83        S: HttpService<IncomingBody>,
84        B: Body,
85    {
86        exec: E,
87        timer: Time,
88        service: S,
89        state: State<T, B>,
90        date_header: bool,
91        close_pending: bool
92    }
93}
94
95enum State<T, B>
96where
97    B: Body,
98{
99    Handshaking {
100        ping_config: ping::Config,
101        hs: Handshake<Compat<T>, SendBuf<B::Data>>,
102    },
103    Serving(Serving<T, B>),
104}
105
106struct Serving<T, B>
107where
108    B: Body,
109{
110    ping: Option<(ping::Recorder, ping::Ponger)>,
111    conn: Connection<Compat<T>, SendBuf<B::Data>>,
112    closing: Option<crate::Error>,
113    date_header: bool,
114}
115
116impl<T, S, B, E> Server<T, S, B, E>
117where
118    T: Read + Write + Unpin,
119    S: HttpService<IncomingBody, ResBody = B>,
120    S::Error: Into<Box<dyn StdError + Send + Sync>>,
121    B: Body + 'static,
122    E: Http2ServerConnExec<S::Future, B>,
123{
124    pub(crate) fn new(
125        io: T,
126        service: S,
127        config: &Config,
128        exec: E,
129        timer: Time,
130    ) -> Server<T, S, B, E> {
131        let mut builder = h2::server::Builder::default();
132        builder
133            .initial_window_size(config.initial_stream_window_size)
134            .initial_connection_window_size(config.initial_conn_window_size)
135            .max_frame_size(config.max_frame_size)
136            .max_header_list_size(config.max_header_list_size)
137            .max_local_error_reset_streams(config.max_local_error_reset_streams)
138            .max_send_buffer_size(config.max_send_buffer_size);
139        if let Some(max) = config.max_concurrent_streams {
140            builder.max_concurrent_streams(max);
141        }
142        if let Some(max) = config.max_pending_accept_reset_streams {
143            builder.max_pending_accept_reset_streams(max);
144        }
145        if config.enable_connect_protocol {
146            builder.enable_connect_protocol();
147        }
148        let handshake = builder.handshake(Compat::new(io));
149
150        let bdp = if config.adaptive_window {
151            Some(config.initial_stream_window_size)
152        } else {
153            None
154        };
155
156        let ping_config = ping::Config {
157            bdp_initial_window: bdp,
158            keep_alive_interval: config.keep_alive_interval,
159            keep_alive_timeout: config.keep_alive_timeout,
160            // If keep-alive is enabled for servers, always enabled while
161            // idle, so it can more aggressively close dead connections.
162            keep_alive_while_idle: true,
163        };
164
165        Server {
166            exec,
167            timer,
168            state: State::Handshaking {
169                ping_config,
170                hs: handshake,
171            },
172            service,
173            date_header: config.date_header,
174            close_pending: false,
175        }
176    }
177
178    pub(crate) fn graceful_shutdown(&mut self) {
179        trace!("graceful_shutdown");
180        match self.state {
181            State::Handshaking { .. } => {
182                self.close_pending = true;
183            }
184            State::Serving(ref mut srv) => {
185                if srv.closing.is_none() {
186                    srv.conn.graceful_shutdown();
187                }
188            }
189        }
190    }
191}
192
193impl<T, S, B, E> Future for Server<T, S, B, E>
194where
195    T: Read + Write + Unpin,
196    S: HttpService<IncomingBody, ResBody = B>,
197    S::Error: Into<Box<dyn StdError + Send + Sync>>,
198    B: Body + 'static,
199    E: Http2ServerConnExec<S::Future, B>,
200{
201    type Output = crate::Result<Dispatched>;
202
203    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
204        let me = &mut *self;
205        loop {
206            let next = match me.state {
207                State::Handshaking {
208                    ref mut hs,
209                    ref ping_config,
210                } => {
211                    let mut conn = ready!(Pin::new(hs).poll(cx).map_err(crate::Error::new_h2))?;
212                    let ping = if ping_config.is_enabled() {
213                        let pp = conn.ping_pong().expect("conn.ping_pong");
214                        Some(ping::channel(pp, ping_config.clone(), me.timer.clone()))
215                    } else {
216                        None
217                    };
218                    State::Serving(Serving {
219                        ping,
220                        conn,
221                        closing: None,
222                        date_header: me.date_header,
223                    })
224                }
225                State::Serving(ref mut srv) => {
226                    // graceful_shutdown was called before handshaking finished,
227                    if me.close_pending && srv.closing.is_none() {
228                        srv.conn.graceful_shutdown();
229                    }
230                    ready!(srv.poll_server(cx, &mut me.service, &mut me.exec))?;
231                    return Poll::Ready(Ok(Dispatched::Shutdown));
232                }
233            };
234            me.state = next;
235        }
236    }
237}
238
239impl<T, B> Serving<T, B>
240where
241    T: Read + Write + Unpin,
242    B: Body + 'static,
243{
244    fn poll_server<S, E>(
245        &mut self,
246        cx: &mut Context<'_>,
247        service: &mut S,
248        exec: &mut E,
249    ) -> Poll<crate::Result<()>>
250    where
251        S: HttpService<IncomingBody, ResBody = B>,
252        S::Error: Into<Box<dyn StdError + Send + Sync>>,
253        E: Http2ServerConnExec<S::Future, B>,
254    {
255        if self.closing.is_none() {
256            loop {
257                self.poll_ping(cx);
258
259                match ready!(self.conn.poll_accept(cx)) {
260                    Some(Ok((req, mut respond))) => {
261                        trace!("incoming request");
262                        let content_length = headers::content_length_parse_all(req.headers());
263                        let ping = self
264                            .ping
265                            .as_ref()
266                            .map(|ping| ping.0.clone())
267                            .unwrap_or_else(ping::disabled);
268
269                        // Record the headers received
270                        ping.record_non_data();
271
272                        let is_connect = req.method() == Method::CONNECT;
273                        let (mut parts, stream) = req.into_parts();
274                        let (mut req, connect_parts) = if !is_connect {
275                            (
276                                Request::from_parts(
277                                    parts,
278                                    IncomingBody::h2(stream, content_length.into(), ping),
279                                ),
280                                None,
281                            )
282                        } else {
283                            if content_length.map_or(false, |len| len != 0) {
284                                warn!("h2 connect request with non-zero body not supported");
285                                respond.send_reset(h2::Reason::INTERNAL_ERROR);
286                                return Poll::Ready(Ok(()));
287                            }
288                            let (pending, upgrade) = crate::upgrade::pending();
289                            debug_assert!(parts.extensions.get::<OnUpgrade>().is_none());
290                            parts.extensions.insert(upgrade);
291                            (
292                                Request::from_parts(parts, IncomingBody::empty()),
293                                Some(ConnectParts {
294                                    pending,
295                                    ping,
296                                    recv_stream: stream,
297                                }),
298                            )
299                        };
300
301                        if let Some(protocol) = req.extensions_mut().remove::<h2::ext::Protocol>() {
302                            req.extensions_mut().insert(Protocol::from_inner(protocol));
303                        }
304
305                        let fut = H2Stream::new(
306                            service.call(req),
307                            connect_parts,
308                            respond,
309                            self.date_header,
310                            exec.clone(),
311                        );
312
313                        exec.execute_h2stream(fut);
314                    }
315                    Some(Err(e)) => {
316                        return Poll::Ready(Err(crate::Error::new_h2(e)));
317                    }
318                    None => {
319                        // no more incoming streams...
320                        if let Some((ref ping, _)) = self.ping {
321                            ping.ensure_not_timed_out()?;
322                        }
323
324                        trace!("incoming connection complete");
325                        return Poll::Ready(Ok(()));
326                    }
327                }
328            }
329        }
330
331        debug_assert!(
332            self.closing.is_some(),
333            "poll_server broke loop without closing"
334        );
335
336        ready!(self.conn.poll_closed(cx).map_err(crate::Error::new_h2))?;
337
338        Poll::Ready(Err(self.closing.take().expect("polled after error")))
339    }
340
341    fn poll_ping(&mut self, cx: &mut Context<'_>) {
342        if let Some((_, ref mut estimator)) = self.ping {
343            match estimator.poll(cx) {
344                Poll::Ready(ping::Ponged::SizeUpdate(wnd)) => {
345                    self.conn.set_target_window_size(wnd);
346                    let _ = self.conn.set_initial_window_size(wnd);
347                }
348                Poll::Ready(ping::Ponged::KeepAliveTimedOut) => {
349                    debug!("keep-alive timed out, closing connection");
350                    self.conn.abrupt_shutdown(h2::Reason::NO_ERROR);
351                }
352                Poll::Pending => {}
353            }
354        }
355    }
356}
357
358pin_project! {
359    #[allow(missing_debug_implementations)]
360    pub struct H2Stream<F, B, E>
361    where
362        B: Body,
363    {
364        reply: SendResponse<SendBuf<B::Data>>,
365        #[pin]
366        state: H2StreamState<F, B>,
367        date_header: bool,
368        exec: E,
369    }
370}
371
372pin_project! {
373    #[project = H2StreamStateProj]
374    enum H2StreamState<F, B>
375    where
376        B: Body,
377    {
378        Service {
379            #[pin]
380            fut: F,
381            connect_parts: Option<ConnectParts>,
382        },
383        Body {
384            #[pin]
385            pipe: PipeToSendStream<B>,
386        },
387    }
388}
389
390struct ConnectParts {
391    pending: Pending,
392    ping: Recorder,
393    recv_stream: RecvStream,
394}
395
396impl<F, B, E> H2Stream<F, B, E>
397where
398    B: Body,
399{
400    fn new(
401        fut: F,
402        connect_parts: Option<ConnectParts>,
403        respond: SendResponse<SendBuf<B::Data>>,
404        date_header: bool,
405        exec: E,
406    ) -> H2Stream<F, B, E> {
407        H2Stream {
408            reply: respond,
409            state: H2StreamState::Service { fut, connect_parts },
410            date_header,
411            exec,
412        }
413    }
414}
415
// Sends the response head `$res` on `$me.reply` and evaluates to the
// resulting send stream. If sending fails, the stream is reset with
// `INTERNAL_ERROR` and the *enclosing function* returns the error wrapped
// in `Poll::Ready(Err(..))`.
macro_rules! reply {
    ($me:expr, $res:expr, $eos:expr) => {{
        let sent = $me.reply.send_response($res, $eos);
        match sent {
            Err(send_err) => {
                debug!("send response error: {}", send_err);
                $me.reply.send_reset(Reason::INTERNAL_ERROR);
                return Poll::Ready(Err(crate::Error::new_h2(send_err)));
            }
            Ok(tx) => tx,
        }
    }};
}
428
429impl<F, B, Ex, E> H2Stream<F, B, Ex>
430where
431    F: Future<Output = Result<Response<B>, E>>,
432    B: Body,
433    B::Data: 'static,
434    B::Error: Into<Box<dyn StdError + Send + Sync>>,
435    Ex: Http2UpgradedExec<B::Data>,
436    E: Into<Box<dyn StdError + Send + Sync>>,
437{
438    fn poll2(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
439        let mut me = self.as_mut().project();
440        loop {
441            let next = match me.state.as_mut().project() {
442                H2StreamStateProj::Service {
443                    fut: h,
444                    connect_parts,
445                } => {
446                    let res = match h.poll(cx) {
447                        Poll::Ready(Ok(r)) => r,
448                        Poll::Pending => {
449                            // Response is not yet ready, so we want to check if the client has sent a
450                            // RST_STREAM frame which would cancel the current request.
451                            if let Poll::Ready(reason) =
452                                me.reply.poll_reset(cx).map_err(crate::Error::new_h2)?
453                            {
454                                debug!("stream received RST_STREAM: {:?}", reason);
455                                return Poll::Ready(Err(crate::Error::new_h2(reason.into())));
456                            }
457                            return Poll::Pending;
458                        }
459                        Poll::Ready(Err(e)) => {
460                            let err = crate::Error::new_user_service(e);
461                            warn!("http2 service errored: {}", err);
462                            me.reply.send_reset(err.h2_reason());
463                            return Poll::Ready(Err(err));
464                        }
465                    };
466
467                    let (head, body) = res.into_parts();
468                    let mut res = ::http::Response::from_parts(head, ());
469                    super::strip_connection_headers(res.headers_mut(), false);
470
471                    // set Date header if it isn't already set if instructed
472                    if *me.date_header {
473                        res.headers_mut()
474                            .entry(::http::header::DATE)
475                            .or_insert_with(date::update_and_header_value);
476                    }
477
478                    if let Some(connect_parts) = connect_parts.take() {
479                        if res.status().is_success() {
480                            if headers::content_length_parse_all(res.headers())
481                                .map_or(false, |len| len != 0)
482                            {
483                                warn!("h2 successful response to CONNECT request with body not supported");
484                                me.reply.send_reset(h2::Reason::INTERNAL_ERROR);
485                                return Poll::Ready(Err(crate::Error::new_user_header()));
486                            }
487                            if res
488                                .headers_mut()
489                                .remove(::http::header::CONTENT_LENGTH)
490                                .is_some()
491                            {
492                                warn!("successful response to CONNECT request disallows content-length header");
493                            }
494                            let send_stream = reply!(me, res, false);
495                            let (h2_up, up_task) = super::upgrade::pair(
496                                send_stream,
497                                connect_parts.recv_stream,
498                                connect_parts.ping,
499                            );
500                            connect_parts
501                                .pending
502                                .fulfill(Upgraded::new(h2_up, Bytes::new()));
503                            self.exec.execute_upgrade(up_task);
504                            return Poll::Ready(Ok(()));
505                        }
506                    }
507
508                    if !body.is_end_stream() {
509                        // automatically set Content-Length from body...
510                        if let Some(len) = body.size_hint().exact() {
511                            headers::set_content_length_if_missing(res.headers_mut(), len);
512                        }
513
514                        let body_tx = reply!(me, res, false);
515                        H2StreamState::Body {
516                            pipe: PipeToSendStream::new(body, body_tx),
517                        }
518                    } else {
519                        reply!(me, res, true);
520                        return Poll::Ready(Ok(()));
521                    }
522                }
523                H2StreamStateProj::Body { pipe } => {
524                    return pipe.poll(cx);
525                }
526            };
527            me.state.set(next);
528        }
529    }
530}
531
532impl<F, B, Ex, E> Future for H2Stream<F, B, Ex>
533where
534    F: Future<Output = Result<Response<B>, E>>,
535    B: Body,
536    B::Data: 'static,
537    B::Error: Into<Box<dyn StdError + Send + Sync>>,
538    Ex: Http2UpgradedExec<B::Data>,
539    E: Into<Box<dyn StdError + Send + Sync>>,
540{
541    type Output = ();
542
543    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
544        self.poll2(cx).map(|res| {
545            if let Err(_e) = res {
546                debug!("stream error: {}", _e);
547            }
548        })
549    }
550}