1use std::{
2 convert::Infallible,
3 future::Future,
4 marker::PhantomData,
5 pin::Pin,
6 task::{Context, Poll},
7 time::Duration,
8};
9
10use crate::rt::{Read, Write};
11use bytes::Bytes;
12use futures_channel::mpsc::{Receiver, Sender};
13use futures_channel::{mpsc, oneshot};
14use futures_core::{ready, FusedFuture, FusedStream, Stream};
15use h2::client::{Builder, Connection, SendRequest};
16use h2::SendStream;
17use http::{Method, StatusCode};
18use pin_project_lite::pin_project;
19
20use super::ping::{Ponger, Recorder};
21use super::{ping, PipeToSendStream, SendBuf};
22use crate::body::{Body, Incoming as IncomingBody};
23use crate::client::dispatch::{Callback, SendWhen, TrySendError};
24use crate::common::either::Either;
25use crate::common::io::Compat;
26use crate::common::time::Time;
27use crate::ext::Protocol;
28use crate::headers;
29use crate::proto::Dispatched;
30use crate::rt::bounds::{Http2ClientConnExec, Http2UpgradedExec};
31use crate::upgrade::Upgraded;
32use crate::{Request, Response};
33use h2::client::ResponseFuture;
34
35type ClientRx<B> = crate::client::dispatch::Receiver<Request<B>, Response<IncomingBody>>;
36
37type ConnDropRef = mpsc::Sender<Infallible>;
40
41type ConnEof = oneshot::Receiver<Infallible>;
44
// Built-in defaults used by `Config::default()`.

/// Default initial connection-level flow-control window: 5 MiB.
const DEFAULT_CONN_WINDOW: u32 = 1024 * 1024 * 5;

/// Default initial per-stream flow-control window: 2 MiB.
const DEFAULT_STREAM_WINDOW: u32 = 1024 * 1024 * 2;

/// Default maximum frame size: 16 KiB.
const DEFAULT_MAX_FRAME_SIZE: u32 = 1024 * 16;

/// Default maximum size of the per-stream send buffer: 1 MiB.
const DEFAULT_MAX_SEND_BUF_SIZE: usize = 1024 * 1024;

/// Default maximum header list size: 16 KiB.
const DEFAULT_MAX_HEADER_LIST_SIZE: u32 = 1024 * 16;

/// Default value handed to the h2 builder's `initial_max_send_streams`.
const DEFAULT_INITIAL_MAX_SEND_STREAMS: usize = 100;
62
63#[derive(Clone, Debug)]
64pub(crate) struct Config {
65 pub(crate) adaptive_window: bool,
66 pub(crate) initial_conn_window_size: u32,
67 pub(crate) initial_stream_window_size: u32,
68 pub(crate) initial_max_send_streams: usize,
69 pub(crate) max_frame_size: Option<u32>,
70 pub(crate) max_header_list_size: u32,
71 pub(crate) keep_alive_interval: Option<Duration>,
72 pub(crate) keep_alive_timeout: Duration,
73 pub(crate) keep_alive_while_idle: bool,
74 pub(crate) max_concurrent_reset_streams: Option<usize>,
75 pub(crate) max_send_buffer_size: usize,
76 pub(crate) max_pending_accept_reset_streams: Option<usize>,
77 pub(crate) header_table_size: Option<u32>,
78 pub(crate) max_concurrent_streams: Option<u32>,
79}
80
81impl Default for Config {
82 fn default() -> Config {
83 Config {
84 adaptive_window: false,
85 initial_conn_window_size: DEFAULT_CONN_WINDOW,
86 initial_stream_window_size: DEFAULT_STREAM_WINDOW,
87 initial_max_send_streams: DEFAULT_INITIAL_MAX_SEND_STREAMS,
88 max_frame_size: Some(DEFAULT_MAX_FRAME_SIZE),
89 max_header_list_size: DEFAULT_MAX_HEADER_LIST_SIZE,
90 keep_alive_interval: None,
91 keep_alive_timeout: Duration::from_secs(20),
92 keep_alive_while_idle: false,
93 max_concurrent_reset_streams: None,
94 max_send_buffer_size: DEFAULT_MAX_SEND_BUF_SIZE,
95 max_pending_accept_reset_streams: None,
96 header_table_size: None,
97 max_concurrent_streams: None,
98 }
99 }
100}
101
102fn new_builder(config: &Config) -> Builder {
103 let mut builder = Builder::default();
104 builder
105 .initial_max_send_streams(config.initial_max_send_streams)
106 .initial_window_size(config.initial_stream_window_size)
107 .initial_connection_window_size(config.initial_conn_window_size)
108 .max_header_list_size(config.max_header_list_size)
109 .max_send_buffer_size(config.max_send_buffer_size)
110 .enable_push(false);
111 if let Some(max) = config.max_frame_size {
112 builder.max_frame_size(max);
113 }
114 if let Some(max) = config.max_concurrent_reset_streams {
115 builder.max_concurrent_reset_streams(max);
116 }
117 if let Some(max) = config.max_pending_accept_reset_streams {
118 builder.max_pending_accept_reset_streams(max);
119 }
120 if let Some(size) = config.header_table_size {
121 builder.header_table_size(size);
122 }
123 if let Some(max) = config.max_concurrent_streams {
124 builder.max_concurrent_streams(max);
125 }
126 builder
127}
128
129fn new_ping_config(config: &Config) -> ping::Config {
130 ping::Config {
131 bdp_initial_window: if config.adaptive_window {
132 Some(config.initial_stream_window_size)
133 } else {
134 None
135 },
136 keep_alive_interval: config.keep_alive_interval,
137 keep_alive_timeout: config.keep_alive_timeout,
138 keep_alive_while_idle: config.keep_alive_while_idle,
139 }
140}
141
142pub(crate) async fn handshake<T, B, E>(
143 io: T,
144 req_rx: ClientRx<B>,
145 config: &Config,
146 mut exec: E,
147 timer: Time,
148) -> crate::Result<ClientTask<B, E, T>>
149where
150 T: Read + Write + Unpin,
151 B: Body + 'static,
152 B::Data: Send + 'static,
153 E: Http2ClientConnExec<B, T> + Clone + Unpin,
154 B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
155{
156 let (h2_tx, mut conn) = new_builder(config)
157 .handshake::<_, SendBuf<B::Data>>(Compat::new(io))
158 .await
159 .map_err(crate::Error::new_h2)?;
160
161 let (conn_drop_ref, conn_drop_rx) = mpsc::channel(1);
166 let (cancel_tx, conn_eof) = oneshot::channel();
167
168 let ping_config = new_ping_config(config);
169
170 let (conn, ping) = if ping_config.is_enabled() {
171 let pp = conn.ping_pong().expect("conn.ping_pong");
172 let (recorder, ponger) = ping::channel(pp, ping_config, timer);
173
174 let conn: Conn<_, B> = Conn::new(ponger, conn);
175 (Either::left(conn), recorder)
176 } else {
177 (Either::right(conn), ping::disabled())
178 };
179 let conn: ConnMapErr<T, B> = ConnMapErr {
180 conn,
181 is_terminated: false,
182 };
183
184 exec.execute_h2_future(H2ClientFuture::Task {
185 task: ConnTask::new(conn, conn_drop_rx, cancel_tx),
186 });
187
188 Ok(ClientTask {
189 ping,
190 conn_drop_ref,
191 conn_eof,
192 executor: exec,
193 h2_tx,
194 req_rx,
195 fut_ctx: None,
196 marker: PhantomData,
197 })
198}
199
200pin_project! {
201 struct Conn<T, B>
202 where
203 B: Body,
204 {
205 #[pin]
206 ponger: Ponger,
207 #[pin]
208 conn: Connection<Compat<T>, SendBuf<<B as Body>::Data>>,
209 }
210}
211
212impl<T, B> Conn<T, B>
213where
214 B: Body,
215 T: Read + Write + Unpin,
216{
217 fn new(ponger: Ponger, conn: Connection<Compat<T>, SendBuf<<B as Body>::Data>>) -> Self {
218 Conn { ponger, conn }
219 }
220}
221
222impl<T, B> Future for Conn<T, B>
223where
224 B: Body,
225 T: Read + Write + Unpin,
226{
227 type Output = Result<(), h2::Error>;
228
229 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
230 let mut this = self.project();
231 match this.ponger.poll(cx) {
232 Poll::Ready(ping::Ponged::SizeUpdate(wnd)) => {
233 this.conn.set_target_window_size(wnd);
234 this.conn.set_initial_window_size(wnd)?;
235 }
236 Poll::Ready(ping::Ponged::KeepAliveTimedOut) => {
237 debug!("connection keep-alive timed out");
238 return Poll::Ready(Ok(()));
239 }
240 Poll::Pending => {}
241 }
242
243 Pin::new(&mut this.conn).poll(cx)
244 }
245}
246
247pin_project! {
248 struct ConnMapErr<T, B>
249 where
250 B: Body,
251 T: Read,
252 T: Write,
253 T: Unpin,
254 {
255 #[pin]
256 conn: Either<Conn<T, B>, Connection<Compat<T>, SendBuf<<B as Body>::Data>>>,
257 #[pin]
258 is_terminated: bool,
259 }
260}
261
262impl<T, B> Future for ConnMapErr<T, B>
263where
264 B: Body,
265 T: Read + Write + Unpin,
266{
267 type Output = Result<(), ()>;
268
269 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
270 let mut this = self.project();
271
272 if *this.is_terminated {
273 return Poll::Pending;
274 }
275 let polled = this.conn.poll(cx);
276 if polled.is_ready() {
277 *this.is_terminated = true;
278 }
279 polled.map_err(|_e| {
280 debug!(error = %_e, "connection error");
281 })
282 }
283}
284
285impl<T, B> FusedFuture for ConnMapErr<T, B>
286where
287 B: Body,
288 T: Read + Write + Unpin,
289{
290 fn is_terminated(&self) -> bool {
291 self.is_terminated
292 }
293}
294
295pin_project! {
296 pub struct ConnTask<T, B>
297 where
298 B: Body,
299 T: Read,
300 T: Write,
301 T: Unpin,
302 {
303 #[pin]
304 drop_rx: Receiver<Infallible>,
305 #[pin]
306 cancel_tx: Option<oneshot::Sender<Infallible>>,
307 #[pin]
308 conn: ConnMapErr<T, B>,
309 }
310}
311
312impl<T, B> ConnTask<T, B>
313where
314 B: Body,
315 T: Read + Write + Unpin,
316{
317 fn new(
318 conn: ConnMapErr<T, B>,
319 drop_rx: Receiver<Infallible>,
320 cancel_tx: oneshot::Sender<Infallible>,
321 ) -> Self {
322 Self {
323 drop_rx,
324 cancel_tx: Some(cancel_tx),
325 conn,
326 }
327 }
328}
329
330impl<T, B> Future for ConnTask<T, B>
331where
332 B: Body,
333 T: Read + Write + Unpin,
334{
335 type Output = ();
336
337 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
338 let mut this = self.project();
339
340 if !this.conn.is_terminated() && Pin::new(&mut this.conn).poll(cx).is_ready() {
341 return Poll::Ready(());
343 }
344
345 if !this.drop_rx.is_terminated() && Pin::new(&mut this.drop_rx).poll_next(cx).is_ready() {
346 trace!("send_request dropped, starting conn shutdown");
350 drop(this.cancel_tx.take().expect("ConnTask Future polled twice"));
351 }
352
353 Poll::Pending
354 }
355}
356
357pin_project! {
358 #[project = H2ClientFutureProject]
359 pub enum H2ClientFuture<B, T, E>
360 where
361 B: http_body::Body,
362 B: 'static,
363 B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
364 T: Read,
365 T: Write,
366 T: Unpin,
367 {
368 Pipe {
369 #[pin]
370 pipe: PipeMap<B>,
371 },
372 Send {
373 #[pin]
374 send_when: SendWhen<B, E>,
375 },
376 Task {
377 #[pin]
378 task: ConnTask<T, B>,
379 },
380 }
381}
382
383impl<B, T, E> Future for H2ClientFuture<B, T, E>
384where
385 B: http_body::Body + 'static,
386 B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
387 T: Read + Write + Unpin,
388 E: Http2UpgradedExec<B::Data>,
389{
390 type Output = ();
391
392 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> std::task::Poll<Self::Output> {
393 let this = self.project();
394
395 match this {
396 H2ClientFutureProject::Pipe { pipe } => pipe.poll(cx),
397 H2ClientFutureProject::Send { send_when } => send_when.poll(cx),
398 H2ClientFutureProject::Task { task } => task.poll(cx),
399 }
400 }
401}
402
403struct FutCtx<B>
404where
405 B: Body,
406{
407 is_connect: bool,
408 eos: bool,
409 fut: ResponseFuture,
410 body_tx: SendStream<SendBuf<B::Data>>,
411 body: B,
412 cb: Callback<Request<B>, Response<IncomingBody>>,
413}
414
415impl<B: Body> Unpin for FutCtx<B> {}
416
417pub(crate) struct ClientTask<B, E, T>
418where
419 B: Body,
420 E: Unpin,
421{
422 ping: ping::Recorder,
423 conn_drop_ref: ConnDropRef,
424 conn_eof: ConnEof,
425 executor: E,
426 h2_tx: SendRequest<SendBuf<B::Data>>,
427 req_rx: ClientRx<B>,
428 fut_ctx: Option<FutCtx<B>>,
429 marker: PhantomData<T>,
430}
431
432impl<B, E, T> ClientTask<B, E, T>
433where
434 B: Body + 'static,
435 E: Http2ClientConnExec<B, T> + Unpin,
436 B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
437 T: Read + Write + Unpin,
438{
439 pub(crate) fn is_extended_connect_protocol_enabled(&self) -> bool {
440 self.h2_tx.is_extended_connect_protocol_enabled()
441 }
442}
443
444pin_project! {
445 pub struct PipeMap<S>
446 where
447 S: Body,
448 {
449 #[pin]
450 pipe: PipeToSendStream<S>,
451 #[pin]
452 conn_drop_ref: Option<Sender<Infallible>>,
453 #[pin]
454 ping: Option<Recorder>,
455 }
456}
457
458impl<B> Future for PipeMap<B>
459where
460 B: http_body::Body,
461 B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
462{
463 type Output = ();
464
465 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> std::task::Poll<Self::Output> {
466 let mut this = self.project();
467
468 match Pin::new(&mut this.pipe).poll(cx) {
469 Poll::Ready(result) => {
470 if let Err(_e) = result {
471 debug!("client request body error: {}", _e);
472 }
473 drop(this.conn_drop_ref.take().expect("Future polled twice"));
474 drop(this.ping.take().expect("Future polled twice"));
475 return Poll::Ready(());
476 }
477 Poll::Pending => (),
478 };
479 Poll::Pending
480 }
481}
482
483impl<B, E, T> ClientTask<B, E, T>
484where
485 B: Body + 'static + Unpin,
486 B::Data: Send,
487 E: Http2ClientConnExec<B, T> + Clone + Unpin,
488 B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
489 T: Read + Write + Unpin,
490{
491 fn poll_pipe(&mut self, f: FutCtx<B>, cx: &mut Context<'_>) {
492 let ping = self.ping.clone();
493
494 let send_stream = if !f.is_connect {
495 if !f.eos {
496 let mut pipe = PipeToSendStream::new(f.body, f.body_tx);
497
498 match Pin::new(&mut pipe).poll(cx) {
501 Poll::Ready(_) => (),
502 Poll::Pending => {
503 let conn_drop_ref = self.conn_drop_ref.clone();
504 let ping = ping.clone();
508
509 let pipe = PipeMap {
510 pipe,
511 conn_drop_ref: Some(conn_drop_ref),
512 ping: Some(ping),
513 };
514 self.executor
516 .execute_h2_future(H2ClientFuture::Pipe { pipe });
517 }
518 }
519 }
520
521 None
522 } else {
523 Some(f.body_tx)
524 };
525
526 self.executor.execute_h2_future(H2ClientFuture::Send {
527 send_when: SendWhen {
528 when: ResponseFutMap {
529 fut: f.fut,
530 ping: Some(ping),
531 send_stream: Some(send_stream),
532 exec: self.executor.clone(),
533 },
534 call_back: Some(f.cb),
535 },
536 });
537 }
538}
539
540pin_project! {
541 pub(crate) struct ResponseFutMap<B, E>
542 where
543 B: Body,
544 B: 'static,
545 {
546 #[pin]
547 fut: ResponseFuture,
548 ping: Option<Recorder>,
549 #[pin]
550 send_stream: Option<Option<SendStream<SendBuf<<B as Body>::Data>>>>,
551 exec: E,
552 }
553}
554
555impl<B, E> Future for ResponseFutMap<B, E>
556where
557 B: Body + 'static,
558 E: Http2UpgradedExec<B::Data>,
559{
560 type Output = Result<Response<crate::body::Incoming>, (crate::Error, Option<Request<B>>)>;
561
562 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
563 let mut this = self.as_mut().project();
564
565 let result = ready!(this.fut.poll(cx));
566
567 let ping = this.ping.take().expect("Future polled twice");
568 let send_stream = this.send_stream.take().expect("Future polled twice");
569
570 match result {
571 Ok(res) => {
572 ping.record_non_data();
574
575 let content_length = headers::content_length_parse_all(res.headers());
576 if let (Some(mut send_stream), StatusCode::OK) = (send_stream, res.status()) {
577 if content_length.map_or(false, |len| len != 0) {
578 warn!("h2 connect response with non-zero body not supported");
579
580 send_stream.send_reset(h2::Reason::INTERNAL_ERROR);
581 return Poll::Ready(Err((
582 crate::Error::new_h2(h2::Reason::INTERNAL_ERROR.into()),
583 None::<Request<B>>,
584 )));
585 }
586 let (parts, recv_stream) = res.into_parts();
587 let mut res = Response::from_parts(parts, IncomingBody::empty());
588
589 let (pending, on_upgrade) = crate::upgrade::pending();
590
591 let (h2_up, up_task) = super::upgrade::pair(send_stream, recv_stream, ping);
592 self.exec.execute_upgrade(up_task);
593 let upgraded = Upgraded::new(h2_up, Bytes::new());
594
595 pending.fulfill(upgraded);
596 res.extensions_mut().insert(on_upgrade);
597
598 Poll::Ready(Ok(res))
599 } else {
600 let res = res.map(|stream| {
601 let ping = ping.for_stream(&stream);
602 IncomingBody::h2(stream, content_length.into(), ping)
603 });
604 Poll::Ready(Ok(res))
605 }
606 }
607 Err(err) => {
608 ping.ensure_not_timed_out().map_err(|e| (e, None))?;
609
610 debug!("client response error: {}", err);
611 Poll::Ready(Err((crate::Error::new_h2(err), None::<Request<B>>)))
612 }
613 }
614 }
615}
616
617impl<B, E, T> Future for ClientTask<B, E, T>
618where
619 B: Body + 'static + Unpin,
620 B::Data: Send,
621 B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
622 E: Http2ClientConnExec<B, T> + Clone + Unpin,
623 T: Read + Write + Unpin,
624{
625 type Output = crate::Result<Dispatched>;
626
627 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
628 loop {
629 match ready!(self.h2_tx.poll_ready(cx)) {
630 Ok(()) => (),
631 Err(err) => {
632 self.ping.ensure_not_timed_out()?;
633 return if err.reason() == Some(::h2::Reason::NO_ERROR) {
634 trace!("connection gracefully shutdown");
635 Poll::Ready(Ok(Dispatched::Shutdown))
636 } else {
637 Poll::Ready(Err(crate::Error::new_h2(err)))
638 };
639 }
640 };
641
642 if let Some(f) = self.fut_ctx.take() {
645 self.poll_pipe(f, cx);
646 continue;
647 }
648
649 match self.req_rx.poll_recv(cx) {
650 Poll::Ready(Some((req, cb))) => {
651 if cb.is_canceled() {
653 trace!("request callback is canceled");
654 continue;
655 }
656 let (head, body) = req.into_parts();
657 let mut req = ::http::Request::from_parts(head, ());
658 super::strip_connection_headers(req.headers_mut(), true);
659 if let Some(len) = body.size_hint().exact() {
660 if len != 0 || headers::method_has_defined_payload_semantics(req.method()) {
661 headers::set_content_length_if_missing(req.headers_mut(), len);
662 }
663 }
664
665 let is_connect = req.method() == Method::CONNECT;
666 let eos = body.is_end_stream();
667
668 if is_connect
669 && headers::content_length_parse_all(req.headers())
670 .map_or(false, |len| len != 0)
671 {
672 debug!("h2 connect request with non-zero body not supported");
673 cb.send(Err(TrySendError {
674 error: crate::Error::new_user_invalid_connect(),
675 message: None,
676 }));
677 continue;
678 }
679
680 if let Some(protocol) = req.extensions_mut().remove::<Protocol>() {
681 req.extensions_mut().insert(protocol.into_inner());
682 }
683
684 let (fut, body_tx) = match self.h2_tx.send_request(req, !is_connect && eos) {
685 Ok(ok) => ok,
686 Err(err) => {
687 debug!("client send request error: {}", err);
688 cb.send(Err(TrySendError {
689 error: crate::Error::new_h2(err),
690 message: None,
691 }));
692 continue;
693 }
694 };
695
696 let f = FutCtx {
697 is_connect,
698 eos,
699 fut,
700 body_tx,
701 body,
702 cb,
703 };
704
705 match self.h2_tx.poll_ready(cx) {
709 Poll::Pending => {
710 self.fut_ctx = Some(f);
712 return Poll::Pending;
713 }
714 Poll::Ready(Ok(())) => (),
715 Poll::Ready(Err(err)) => {
716 f.cb.send(Err(TrySendError {
717 error: crate::Error::new_h2(err),
718 message: None,
719 }));
720 continue;
721 }
722 }
723 self.poll_pipe(f, cx);
724 continue;
725 }
726
727 Poll::Ready(None) => {
728 trace!("client::dispatch::Sender dropped");
729 return Poll::Ready(Ok(Dispatched::Shutdown));
730 }
731
732 Poll::Pending => match ready!(Pin::new(&mut self.conn_eof).poll(cx)) {
733 #[allow(unused)]
736 Ok(never) => match never {},
737 Err(_conn_is_eof) => {
738 trace!("connection task is closed, closing dispatch task");
739 return Poll::Ready(Ok(Dispatched::Shutdown));
740 }
741 },
742 }
743 }
744 }
745}