1use std::error::Error as StdError;
2use std::future::Future;
3use std::pin::Pin;
4use std::task::{Context, Poll};
5use std::time::Duration;
6
7use bytes::Bytes;
8use futures_core::ready;
9use h2::server::{Connection, Handshake, SendResponse};
10use h2::{Reason, RecvStream};
11use http::{Method, Request};
12use pin_project_lite::pin_project;
13
14use super::{ping, PipeToSendStream, SendBuf};
15use crate::body::{Body, Incoming as IncomingBody};
16use crate::common::date;
17use crate::common::io::Compat;
18use crate::common::time::Time;
19use crate::ext::Protocol;
20use crate::headers;
21use crate::proto::h2::ping::Recorder;
22use crate::proto::Dispatched;
23use crate::rt::bounds::{Http2ServerConnExec, Http2UpgradedExec};
24use crate::rt::{Read, Write};
25use crate::service::HttpService;
26
27use crate::upgrade::{OnUpgrade, Pending, Upgraded};
28use crate::Response;
29
30const DEFAULT_CONN_WINDOW: u32 = 1024 * 1024; const DEFAULT_STREAM_WINDOW: u32 = 1024 * 1024; const DEFAULT_MAX_FRAME_SIZE: u32 = 1024 * 16; const DEFAULT_MAX_SEND_BUF_SIZE: usize = 1024 * 400; const DEFAULT_SETTINGS_MAX_HEADER_LIST_SIZE: u32 = 1024 * 16; const DEFAULT_MAX_LOCAL_ERROR_RESET_STREAMS: usize = 1024;
42
43#[derive(Clone, Debug)]
44pub(crate) struct Config {
45 pub(crate) adaptive_window: bool,
46 pub(crate) initial_conn_window_size: u32,
47 pub(crate) initial_stream_window_size: u32,
48 pub(crate) max_frame_size: u32,
49 pub(crate) enable_connect_protocol: bool,
50 pub(crate) max_concurrent_streams: Option<u32>,
51 pub(crate) max_pending_accept_reset_streams: Option<usize>,
52 pub(crate) max_local_error_reset_streams: Option<usize>,
53 pub(crate) keep_alive_interval: Option<Duration>,
54 pub(crate) keep_alive_timeout: Duration,
55 pub(crate) max_send_buffer_size: usize,
56 pub(crate) max_header_list_size: u32,
57 pub(crate) date_header: bool,
58}
59
60impl Default for Config {
61 fn default() -> Config {
62 Config {
63 adaptive_window: false,
64 initial_conn_window_size: DEFAULT_CONN_WINDOW,
65 initial_stream_window_size: DEFAULT_STREAM_WINDOW,
66 max_frame_size: DEFAULT_MAX_FRAME_SIZE,
67 enable_connect_protocol: false,
68 max_concurrent_streams: Some(200),
69 max_pending_accept_reset_streams: None,
70 max_local_error_reset_streams: Some(DEFAULT_MAX_LOCAL_ERROR_RESET_STREAMS),
71 keep_alive_interval: None,
72 keep_alive_timeout: Duration::from_secs(20),
73 max_send_buffer_size: DEFAULT_MAX_SEND_BUF_SIZE,
74 max_header_list_size: DEFAULT_SETTINGS_MAX_HEADER_LIST_SIZE,
75 date_header: true,
76 }
77 }
78}
79
80pin_project! {
81 pub(crate) struct Server<T, S, B, E>
82 where
83 S: HttpService<IncomingBody>,
84 B: Body,
85 {
86 exec: E,
87 timer: Time,
88 service: S,
89 state: State<T, B>,
90 date_header: bool,
91 close_pending: bool
92 }
93}
94
95enum State<T, B>
96where
97 B: Body,
98{
99 Handshaking {
100 ping_config: ping::Config,
101 hs: Handshake<Compat<T>, SendBuf<B::Data>>,
102 },
103 Serving(Serving<T, B>),
104}
105
106struct Serving<T, B>
107where
108 B: Body,
109{
110 ping: Option<(ping::Recorder, ping::Ponger)>,
111 conn: Connection<Compat<T>, SendBuf<B::Data>>,
112 closing: Option<crate::Error>,
113 date_header: bool,
114}
115
116impl<T, S, B, E> Server<T, S, B, E>
117where
118 T: Read + Write + Unpin,
119 S: HttpService<IncomingBody, ResBody = B>,
120 S::Error: Into<Box<dyn StdError + Send + Sync>>,
121 B: Body + 'static,
122 E: Http2ServerConnExec<S::Future, B>,
123{
124 pub(crate) fn new(
125 io: T,
126 service: S,
127 config: &Config,
128 exec: E,
129 timer: Time,
130 ) -> Server<T, S, B, E> {
131 let mut builder = h2::server::Builder::default();
132 builder
133 .initial_window_size(config.initial_stream_window_size)
134 .initial_connection_window_size(config.initial_conn_window_size)
135 .max_frame_size(config.max_frame_size)
136 .max_header_list_size(config.max_header_list_size)
137 .max_local_error_reset_streams(config.max_local_error_reset_streams)
138 .max_send_buffer_size(config.max_send_buffer_size);
139 if let Some(max) = config.max_concurrent_streams {
140 builder.max_concurrent_streams(max);
141 }
142 if let Some(max) = config.max_pending_accept_reset_streams {
143 builder.max_pending_accept_reset_streams(max);
144 }
145 if config.enable_connect_protocol {
146 builder.enable_connect_protocol();
147 }
148 let handshake = builder.handshake(Compat::new(io));
149
150 let bdp = if config.adaptive_window {
151 Some(config.initial_stream_window_size)
152 } else {
153 None
154 };
155
156 let ping_config = ping::Config {
157 bdp_initial_window: bdp,
158 keep_alive_interval: config.keep_alive_interval,
159 keep_alive_timeout: config.keep_alive_timeout,
160 keep_alive_while_idle: true,
163 };
164
165 Server {
166 exec,
167 timer,
168 state: State::Handshaking {
169 ping_config,
170 hs: handshake,
171 },
172 service,
173 date_header: config.date_header,
174 close_pending: false,
175 }
176 }
177
178 pub(crate) fn graceful_shutdown(&mut self) {
179 trace!("graceful_shutdown");
180 match self.state {
181 State::Handshaking { .. } => {
182 self.close_pending = true;
183 }
184 State::Serving(ref mut srv) => {
185 if srv.closing.is_none() {
186 srv.conn.graceful_shutdown();
187 }
188 }
189 }
190 }
191}
192
193impl<T, S, B, E> Future for Server<T, S, B, E>
194where
195 T: Read + Write + Unpin,
196 S: HttpService<IncomingBody, ResBody = B>,
197 S::Error: Into<Box<dyn StdError + Send + Sync>>,
198 B: Body + 'static,
199 E: Http2ServerConnExec<S::Future, B>,
200{
201 type Output = crate::Result<Dispatched>;
202
203 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
204 let me = &mut *self;
205 loop {
206 let next = match me.state {
207 State::Handshaking {
208 ref mut hs,
209 ref ping_config,
210 } => {
211 let mut conn = ready!(Pin::new(hs).poll(cx).map_err(crate::Error::new_h2))?;
212 let ping = if ping_config.is_enabled() {
213 let pp = conn.ping_pong().expect("conn.ping_pong");
214 Some(ping::channel(pp, ping_config.clone(), me.timer.clone()))
215 } else {
216 None
217 };
218 State::Serving(Serving {
219 ping,
220 conn,
221 closing: None,
222 date_header: me.date_header,
223 })
224 }
225 State::Serving(ref mut srv) => {
226 if me.close_pending && srv.closing.is_none() {
228 srv.conn.graceful_shutdown();
229 }
230 ready!(srv.poll_server(cx, &mut me.service, &mut me.exec))?;
231 return Poll::Ready(Ok(Dispatched::Shutdown));
232 }
233 };
234 me.state = next;
235 }
236 }
237}
238
239impl<T, B> Serving<T, B>
240where
241 T: Read + Write + Unpin,
242 B: Body + 'static,
243{
244 fn poll_server<S, E>(
245 &mut self,
246 cx: &mut Context<'_>,
247 service: &mut S,
248 exec: &mut E,
249 ) -> Poll<crate::Result<()>>
250 where
251 S: HttpService<IncomingBody, ResBody = B>,
252 S::Error: Into<Box<dyn StdError + Send + Sync>>,
253 E: Http2ServerConnExec<S::Future, B>,
254 {
255 if self.closing.is_none() {
256 loop {
257 self.poll_ping(cx);
258
259 match ready!(self.conn.poll_accept(cx)) {
260 Some(Ok((req, mut respond))) => {
261 trace!("incoming request");
262 let content_length = headers::content_length_parse_all(req.headers());
263 let ping = self
264 .ping
265 .as_ref()
266 .map(|ping| ping.0.clone())
267 .unwrap_or_else(ping::disabled);
268
269 ping.record_non_data();
271
272 let is_connect = req.method() == Method::CONNECT;
273 let (mut parts, stream) = req.into_parts();
274 let (mut req, connect_parts) = if !is_connect {
275 (
276 Request::from_parts(
277 parts,
278 IncomingBody::h2(stream, content_length.into(), ping),
279 ),
280 None,
281 )
282 } else {
283 if content_length.map_or(false, |len| len != 0) {
284 warn!("h2 connect request with non-zero body not supported");
285 respond.send_reset(h2::Reason::INTERNAL_ERROR);
286 return Poll::Ready(Ok(()));
287 }
288 let (pending, upgrade) = crate::upgrade::pending();
289 debug_assert!(parts.extensions.get::<OnUpgrade>().is_none());
290 parts.extensions.insert(upgrade);
291 (
292 Request::from_parts(parts, IncomingBody::empty()),
293 Some(ConnectParts {
294 pending,
295 ping,
296 recv_stream: stream,
297 }),
298 )
299 };
300
301 if let Some(protocol) = req.extensions_mut().remove::<h2::ext::Protocol>() {
302 req.extensions_mut().insert(Protocol::from_inner(protocol));
303 }
304
305 let fut = H2Stream::new(
306 service.call(req),
307 connect_parts,
308 respond,
309 self.date_header,
310 exec.clone(),
311 );
312
313 exec.execute_h2stream(fut);
314 }
315 Some(Err(e)) => {
316 return Poll::Ready(Err(crate::Error::new_h2(e)));
317 }
318 None => {
319 if let Some((ref ping, _)) = self.ping {
321 ping.ensure_not_timed_out()?;
322 }
323
324 trace!("incoming connection complete");
325 return Poll::Ready(Ok(()));
326 }
327 }
328 }
329 }
330
331 debug_assert!(
332 self.closing.is_some(),
333 "poll_server broke loop without closing"
334 );
335
336 ready!(self.conn.poll_closed(cx).map_err(crate::Error::new_h2))?;
337
338 Poll::Ready(Err(self.closing.take().expect("polled after error")))
339 }
340
341 fn poll_ping(&mut self, cx: &mut Context<'_>) {
342 if let Some((_, ref mut estimator)) = self.ping {
343 match estimator.poll(cx) {
344 Poll::Ready(ping::Ponged::SizeUpdate(wnd)) => {
345 self.conn.set_target_window_size(wnd);
346 let _ = self.conn.set_initial_window_size(wnd);
347 }
348 Poll::Ready(ping::Ponged::KeepAliveTimedOut) => {
349 debug!("keep-alive timed out, closing connection");
350 self.conn.abrupt_shutdown(h2::Reason::NO_ERROR);
351 }
352 Poll::Pending => {}
353 }
354 }
355 }
356}
357
358pin_project! {
359 #[allow(missing_debug_implementations)]
360 pub struct H2Stream<F, B, E>
361 where
362 B: Body,
363 {
364 reply: SendResponse<SendBuf<B::Data>>,
365 #[pin]
366 state: H2StreamState<F, B>,
367 date_header: bool,
368 exec: E,
369 }
370}
371
372pin_project! {
373 #[project = H2StreamStateProj]
374 enum H2StreamState<F, B>
375 where
376 B: Body,
377 {
378 Service {
379 #[pin]
380 fut: F,
381 connect_parts: Option<ConnectParts>,
382 },
383 Body {
384 #[pin]
385 pipe: PipeToSendStream<B>,
386 },
387 }
388}
389
390struct ConnectParts {
391 pending: Pending,
392 ping: Recorder,
393 recv_stream: RecvStream,
394}
395
396impl<F, B, E> H2Stream<F, B, E>
397where
398 B: Body,
399{
400 fn new(
401 fut: F,
402 connect_parts: Option<ConnectParts>,
403 respond: SendResponse<SendBuf<B::Data>>,
404 date_header: bool,
405 exec: E,
406 ) -> H2Stream<F, B, E> {
407 H2Stream {
408 reply: respond,
409 state: H2StreamState::Service { fut, connect_parts },
410 date_header,
411 exec,
412 }
413 }
414}
415
416macro_rules! reply {
417 ($me:expr, $res:expr, $eos:expr) => {{
418 match $me.reply.send_response($res, $eos) {
419 Ok(tx) => tx,
420 Err(e) => {
421 debug!("send response error: {}", e);
422 $me.reply.send_reset(Reason::INTERNAL_ERROR);
423 return Poll::Ready(Err(crate::Error::new_h2(e)));
424 }
425 }
426 }};
427}
428
429impl<F, B, Ex, E> H2Stream<F, B, Ex>
430where
431 F: Future<Output = Result<Response<B>, E>>,
432 B: Body,
433 B::Data: 'static,
434 B::Error: Into<Box<dyn StdError + Send + Sync>>,
435 Ex: Http2UpgradedExec<B::Data>,
436 E: Into<Box<dyn StdError + Send + Sync>>,
437{
438 fn poll2(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
439 let mut me = self.as_mut().project();
440 loop {
441 let next = match me.state.as_mut().project() {
442 H2StreamStateProj::Service {
443 fut: h,
444 connect_parts,
445 } => {
446 let res = match h.poll(cx) {
447 Poll::Ready(Ok(r)) => r,
448 Poll::Pending => {
449 if let Poll::Ready(reason) =
452 me.reply.poll_reset(cx).map_err(crate::Error::new_h2)?
453 {
454 debug!("stream received RST_STREAM: {:?}", reason);
455 return Poll::Ready(Err(crate::Error::new_h2(reason.into())));
456 }
457 return Poll::Pending;
458 }
459 Poll::Ready(Err(e)) => {
460 let err = crate::Error::new_user_service(e);
461 warn!("http2 service errored: {}", err);
462 me.reply.send_reset(err.h2_reason());
463 return Poll::Ready(Err(err));
464 }
465 };
466
467 let (head, body) = res.into_parts();
468 let mut res = ::http::Response::from_parts(head, ());
469 super::strip_connection_headers(res.headers_mut(), false);
470
471 if *me.date_header {
473 res.headers_mut()
474 .entry(::http::header::DATE)
475 .or_insert_with(date::update_and_header_value);
476 }
477
478 if let Some(connect_parts) = connect_parts.take() {
479 if res.status().is_success() {
480 if headers::content_length_parse_all(res.headers())
481 .map_or(false, |len| len != 0)
482 {
483 warn!("h2 successful response to CONNECT request with body not supported");
484 me.reply.send_reset(h2::Reason::INTERNAL_ERROR);
485 return Poll::Ready(Err(crate::Error::new_user_header()));
486 }
487 if res
488 .headers_mut()
489 .remove(::http::header::CONTENT_LENGTH)
490 .is_some()
491 {
492 warn!("successful response to CONNECT request disallows content-length header");
493 }
494 let send_stream = reply!(me, res, false);
495 let (h2_up, up_task) = super::upgrade::pair(
496 send_stream,
497 connect_parts.recv_stream,
498 connect_parts.ping,
499 );
500 connect_parts
501 .pending
502 .fulfill(Upgraded::new(h2_up, Bytes::new()));
503 self.exec.execute_upgrade(up_task);
504 return Poll::Ready(Ok(()));
505 }
506 }
507
508 if !body.is_end_stream() {
509 if let Some(len) = body.size_hint().exact() {
511 headers::set_content_length_if_missing(res.headers_mut(), len);
512 }
513
514 let body_tx = reply!(me, res, false);
515 H2StreamState::Body {
516 pipe: PipeToSendStream::new(body, body_tx),
517 }
518 } else {
519 reply!(me, res, true);
520 return Poll::Ready(Ok(()));
521 }
522 }
523 H2StreamStateProj::Body { pipe } => {
524 return pipe.poll(cx);
525 }
526 };
527 me.state.set(next);
528 }
529 }
530}
531
532impl<F, B, Ex, E> Future for H2Stream<F, B, Ex>
533where
534 F: Future<Output = Result<Response<B>, E>>,
535 B: Body,
536 B::Data: 'static,
537 B::Error: Into<Box<dyn StdError + Send + Sync>>,
538 Ex: Http2UpgradedExec<B::Data>,
539 E: Into<Box<dyn StdError + Send + Sync>>,
540{
541 type Output = ();
542
543 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
544 self.poll2(cx).map(|res| {
545 if let Err(_e) = res {
546 debug!("stream error: {}", _e);
547 }
548 })
549 }
550}