reqwest/
connect.rs

1#[cfg(feature = "__tls")]
2use http::header::HeaderValue;
3#[cfg(feature = "__tls")]
4use http::uri::Scheme;
5use http::Uri;
6use hyper::rt::{Read, ReadBufCursor, Write};
7use hyper_util::client::legacy::connect::{Connected, Connection};
8#[cfg(any(feature = "socks", feature = "__tls", unix))]
9use hyper_util::rt::TokioIo;
10#[cfg(feature = "default-tls")]
11use native_tls_crate::{TlsConnector, TlsConnectorBuilder};
12use pin_project_lite::pin_project;
13use tower::util::{BoxCloneSyncServiceLayer, MapRequestLayer};
14use tower::{timeout::TimeoutLayer, util::BoxCloneSyncService, ServiceBuilder};
15use tower_service::Service;
16
17use std::future::Future;
18use std::io::{self, IoSlice};
19use std::net::IpAddr;
20use std::pin::Pin;
21use std::sync::Arc;
22use std::task::{Context, Poll};
23use std::time::Duration;
24
25#[cfg(feature = "default-tls")]
26use self::native_tls_conn::NativeTlsConn;
27#[cfg(feature = "__rustls")]
28use self::rustls_tls_conn::RustlsTlsConn;
29use crate::dns::DynResolver;
30use crate::error::{cast_to_internal_error, BoxError};
31use crate::proxy::{Intercepted, Matcher as ProxyMatcher};
32use sealed::{Conn, Unnameable};
33
34pub(crate) type HttpConnector = hyper_util::client::legacy::connect::HttpConnector<DynResolver>;
35
36#[derive(Clone)]
37pub(crate) enum Connector {
38    // base service, with or without an embedded timeout
39    Simple(ConnectorService),
40    // at least one custom layer along with maybe an outer timeout layer
41    // from `builder.connect_timeout()`
42    WithLayers(BoxCloneSyncService<Unnameable, Conn, BoxError>),
43}
44
45impl Service<Uri> for Connector {
46    type Response = Conn;
47    type Error = BoxError;
48    type Future = Connecting;
49
50    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
51        match self {
52            Connector::Simple(service) => service.poll_ready(cx),
53            Connector::WithLayers(service) => service.poll_ready(cx),
54        }
55    }
56
57    fn call(&mut self, dst: Uri) -> Self::Future {
58        match self {
59            Connector::Simple(service) => service.call(dst),
60            Connector::WithLayers(service) => service.call(Unnameable(dst)),
61        }
62    }
63}
64
65pub(crate) type BoxedConnectorService = BoxCloneSyncService<Unnameable, Conn, BoxError>;
66
67pub(crate) type BoxedConnectorLayer =
68    BoxCloneSyncServiceLayer<BoxedConnectorService, Unnameable, Conn, BoxError>;
69
70pub(crate) struct ConnectorBuilder {
71    inner: Inner,
72    proxies: Arc<Vec<ProxyMatcher>>,
73    verbose: verbose::Wrapper,
74    timeout: Option<Duration>,
75    #[cfg(feature = "__tls")]
76    nodelay: bool,
77    #[cfg(feature = "__tls")]
78    tls_info: bool,
79    #[cfg(feature = "__tls")]
80    user_agent: Option<HeaderValue>,
81    #[cfg(feature = "socks")]
82    resolver: Option<DynResolver>,
83    #[cfg(unix)]
84    unix_socket: Option<Arc<std::path::Path>>,
85}
86
87impl ConnectorBuilder {
88    pub(crate) fn build(self, layers: Vec<BoxedConnectorLayer>) -> Connector
89where {
90        // construct the inner tower service
91        let mut base_service = ConnectorService {
92            inner: self.inner,
93            proxies: self.proxies,
94            verbose: self.verbose,
95            #[cfg(feature = "__tls")]
96            nodelay: self.nodelay,
97            #[cfg(feature = "__tls")]
98            tls_info: self.tls_info,
99            #[cfg(feature = "__tls")]
100            user_agent: self.user_agent,
101            simple_timeout: None,
102            #[cfg(feature = "socks")]
103            resolver: self.resolver.unwrap_or_else(DynResolver::gai),
104            #[cfg(unix)]
105            unix_socket: self.unix_socket,
106        };
107
108        #[cfg(unix)]
109        if base_service.unix_socket.is_some() && !base_service.proxies.is_empty() {
110            base_service.proxies = Default::default();
111            log::trace!("unix_socket() set, proxies are ignored");
112        }
113
114        if layers.is_empty() {
115            // we have no user-provided layers, only use concrete types
116            base_service.simple_timeout = self.timeout;
117            return Connector::Simple(base_service);
118        }
119
120        // otherwise we have user provided layers
121        // so we need type erasure all the way through
122        // as well as mapping the unnameable type of the layers back to Uri for the inner service
123        let unnameable_service = ServiceBuilder::new()
124            .layer(MapRequestLayer::new(|request: Unnameable| request.0))
125            .service(base_service);
126        let mut service = BoxCloneSyncService::new(unnameable_service);
127
128        for layer in layers {
129            service = ServiceBuilder::new().layer(layer).service(service);
130        }
131
132        // now we handle the concrete stuff - any `connect_timeout`,
133        // plus a final map_err layer we can use to cast default tower layer
134        // errors to internal errors
135        match self.timeout {
136            Some(timeout) => {
137                let service = ServiceBuilder::new()
138                    .layer(TimeoutLayer::new(timeout))
139                    .service(service);
140                let service = ServiceBuilder::new()
141                    .map_err(|error: BoxError| cast_to_internal_error(error))
142                    .service(service);
143                let service = BoxCloneSyncService::new(service);
144
145                Connector::WithLayers(service)
146            }
147            None => {
148                // no timeout, but still map err
149                // no named timeout layer but we still map errors since
150                // we might have user-provided timeout layer
151                let service = ServiceBuilder::new().service(service);
152                let service = ServiceBuilder::new()
153                    .map_err(|error: BoxError| cast_to_internal_error(error))
154                    .service(service);
155                let service = BoxCloneSyncService::new(service);
156                Connector::WithLayers(service)
157            }
158        }
159    }
160
161    #[cfg(not(feature = "__tls"))]
162    pub(crate) fn new<T>(
163        mut http: HttpConnector,
164        proxies: Arc<Vec<ProxyMatcher>>,
165        local_addr: T,
166        #[cfg(any(
167            target_os = "android",
168            target_os = "fuchsia",
169            target_os = "illumos",
170            target_os = "ios",
171            target_os = "linux",
172            target_os = "macos",
173            target_os = "solaris",
174            target_os = "tvos",
175            target_os = "visionos",
176            target_os = "watchos",
177        ))]
178        interface: Option<&str>,
179        nodelay: bool,
180    ) -> ConnectorBuilder
181    where
182        T: Into<Option<IpAddr>>,
183    {
184        http.set_local_address(local_addr.into());
185        #[cfg(any(
186            target_os = "android",
187            target_os = "fuchsia",
188            target_os = "illumos",
189            target_os = "ios",
190            target_os = "linux",
191            target_os = "macos",
192            target_os = "solaris",
193            target_os = "tvos",
194            target_os = "visionos",
195            target_os = "watchos",
196        ))]
197        if let Some(interface) = interface {
198            http.set_interface(interface.to_owned());
199        }
200        http.set_nodelay(nodelay);
201
202        ConnectorBuilder {
203            inner: Inner::Http(http),
204            proxies,
205            verbose: verbose::OFF,
206            timeout: None,
207            #[cfg(feature = "socks")]
208            resolver: None,
209            #[cfg(unix)]
210            unix_socket: None,
211        }
212    }
213
214    #[cfg(feature = "default-tls")]
215    pub(crate) fn new_default_tls<T>(
216        http: HttpConnector,
217        tls: TlsConnectorBuilder,
218        proxies: Arc<Vec<ProxyMatcher>>,
219        user_agent: Option<HeaderValue>,
220        local_addr: T,
221        #[cfg(any(
222            target_os = "android",
223            target_os = "fuchsia",
224            target_os = "illumos",
225            target_os = "ios",
226            target_os = "linux",
227            target_os = "macos",
228            target_os = "solaris",
229            target_os = "tvos",
230            target_os = "visionos",
231            target_os = "watchos",
232        ))]
233        interface: Option<&str>,
234        nodelay: bool,
235        tls_info: bool,
236    ) -> crate::Result<ConnectorBuilder>
237    where
238        T: Into<Option<IpAddr>>,
239    {
240        let tls = tls.build().map_err(crate::error::builder)?;
241        Ok(Self::from_built_default_tls(
242            http,
243            tls,
244            proxies,
245            user_agent,
246            local_addr,
247            #[cfg(any(
248                target_os = "android",
249                target_os = "fuchsia",
250                target_os = "illumos",
251                target_os = "ios",
252                target_os = "linux",
253                target_os = "macos",
254                target_os = "solaris",
255                target_os = "tvos",
256                target_os = "visionos",
257                target_os = "watchos",
258            ))]
259            interface,
260            nodelay,
261            tls_info,
262        ))
263    }
264
265    #[cfg(feature = "default-tls")]
266    pub(crate) fn from_built_default_tls<T>(
267        mut http: HttpConnector,
268        tls: TlsConnector,
269        proxies: Arc<Vec<ProxyMatcher>>,
270        user_agent: Option<HeaderValue>,
271        local_addr: T,
272        #[cfg(any(
273            target_os = "android",
274            target_os = "fuchsia",
275            target_os = "illumos",
276            target_os = "ios",
277            target_os = "linux",
278            target_os = "macos",
279            target_os = "solaris",
280            target_os = "tvos",
281            target_os = "visionos",
282            target_os = "watchos",
283        ))]
284        interface: Option<&str>,
285        nodelay: bool,
286        tls_info: bool,
287    ) -> ConnectorBuilder
288    where
289        T: Into<Option<IpAddr>>,
290    {
291        http.set_local_address(local_addr.into());
292        #[cfg(any(
293            target_os = "android",
294            target_os = "fuchsia",
295            target_os = "illumos",
296            target_os = "ios",
297            target_os = "linux",
298            target_os = "macos",
299            target_os = "solaris",
300            target_os = "tvos",
301            target_os = "visionos",
302            target_os = "watchos",
303        ))]
304        if let Some(interface) = interface {
305            http.set_interface(interface);
306        }
307        http.set_nodelay(nodelay);
308        http.enforce_http(false);
309
310        ConnectorBuilder {
311            inner: Inner::DefaultTls(http, tls),
312            proxies,
313            verbose: verbose::OFF,
314            nodelay,
315            tls_info,
316            user_agent,
317            timeout: None,
318            #[cfg(feature = "socks")]
319            resolver: None,
320            #[cfg(unix)]
321            unix_socket: None,
322        }
323    }
324
325    #[cfg(feature = "__rustls")]
326    pub(crate) fn new_rustls_tls<T>(
327        mut http: HttpConnector,
328        tls: rustls::ClientConfig,
329        proxies: Arc<Vec<ProxyMatcher>>,
330        user_agent: Option<HeaderValue>,
331        local_addr: T,
332        #[cfg(any(
333            target_os = "android",
334            target_os = "fuchsia",
335            target_os = "illumos",
336            target_os = "ios",
337            target_os = "linux",
338            target_os = "macos",
339            target_os = "solaris",
340            target_os = "tvos",
341            target_os = "visionos",
342            target_os = "watchos",
343        ))]
344        interface: Option<&str>,
345        nodelay: bool,
346        tls_info: bool,
347    ) -> ConnectorBuilder
348    where
349        T: Into<Option<IpAddr>>,
350    {
351        http.set_local_address(local_addr.into());
352        #[cfg(any(
353            target_os = "android",
354            target_os = "fuchsia",
355            target_os = "illumos",
356            target_os = "ios",
357            target_os = "linux",
358            target_os = "macos",
359            target_os = "solaris",
360            target_os = "tvos",
361            target_os = "visionos",
362            target_os = "watchos",
363        ))]
364        if let Some(interface) = interface {
365            http.set_interface(interface.to_owned());
366        }
367        http.set_nodelay(nodelay);
368        http.enforce_http(false);
369
370        let (tls, tls_proxy) = if proxies.is_empty() {
371            let tls = Arc::new(tls);
372            (tls.clone(), tls)
373        } else {
374            let mut tls_proxy = tls.clone();
375            tls_proxy.alpn_protocols.clear();
376            (Arc::new(tls), Arc::new(tls_proxy))
377        };
378
379        ConnectorBuilder {
380            inner: Inner::RustlsTls {
381                http,
382                tls,
383                tls_proxy,
384            },
385            proxies,
386            verbose: verbose::OFF,
387            nodelay,
388            tls_info,
389            user_agent,
390            timeout: None,
391            #[cfg(feature = "socks")]
392            resolver: None,
393            #[cfg(unix)]
394            unix_socket: None,
395        }
396    }
397
398    pub(crate) fn set_timeout(&mut self, timeout: Option<Duration>) {
399        self.timeout = timeout;
400    }
401
402    pub(crate) fn set_verbose(&mut self, enabled: bool) {
403        self.verbose.0 = enabled;
404    }
405
406    pub(crate) fn set_keepalive(&mut self, dur: Option<Duration>) {
407        match &mut self.inner {
408            #[cfg(feature = "default-tls")]
409            Inner::DefaultTls(http, _tls) => http.set_keepalive(dur),
410            #[cfg(feature = "__rustls")]
411            Inner::RustlsTls { http, .. } => http.set_keepalive(dur),
412            #[cfg(not(feature = "__tls"))]
413            Inner::Http(http) => http.set_keepalive(dur),
414        }
415    }
416
417    pub(crate) fn set_keepalive_interval(&mut self, dur: Option<Duration>) {
418        match &mut self.inner {
419            #[cfg(feature = "default-tls")]
420            Inner::DefaultTls(http, _tls) => http.set_keepalive_interval(dur),
421            #[cfg(feature = "__rustls")]
422            Inner::RustlsTls { http, .. } => http.set_keepalive_interval(dur),
423            #[cfg(not(feature = "__tls"))]
424            Inner::Http(http) => http.set_keepalive_interval(dur),
425        }
426    }
427
428    pub(crate) fn set_keepalive_retries(&mut self, retries: Option<u32>) {
429        match &mut self.inner {
430            #[cfg(feature = "default-tls")]
431            Inner::DefaultTls(http, _tls) => http.set_keepalive_retries(retries),
432            #[cfg(feature = "__rustls")]
433            Inner::RustlsTls { http, .. } => http.set_keepalive_retries(retries),
434            #[cfg(not(feature = "__tls"))]
435            Inner::Http(http) => http.set_keepalive_retries(retries),
436        }
437    }
438
439    #[cfg(feature = "socks")]
440    pub(crate) fn set_socks_resolver(&mut self, resolver: DynResolver) {
441        self.resolver = Some(resolver);
442    }
443
444    #[cfg(any(target_os = "android", target_os = "fuchsia", target_os = "linux"))]
445    pub(crate) fn set_tcp_user_timeout(&mut self, dur: Option<Duration>) {
446        match &mut self.inner {
447            #[cfg(feature = "default-tls")]
448            Inner::DefaultTls(http, _tls) => http.set_tcp_user_timeout(dur),
449            #[cfg(feature = "__rustls")]
450            Inner::RustlsTls { http, .. } => http.set_tcp_user_timeout(dur),
451            #[cfg(not(feature = "__tls"))]
452            Inner::Http(http) => http.set_tcp_user_timeout(dur),
453        }
454    }
455
456    #[cfg(unix)]
457    pub(crate) fn set_unix_socket(&mut self, path: Option<Arc<std::path::Path>>) {
458        self.unix_socket = path;
459    }
460}
461
462#[allow(missing_debug_implementations)]
463#[derive(Clone)]
464pub(crate) struct ConnectorService {
465    inner: Inner,
466    proxies: Arc<Vec<ProxyMatcher>>,
467    verbose: verbose::Wrapper,
468    /// When there is a single timeout layer and no other layers,
469    /// we embed it directly inside our base Service::call().
470    /// This lets us avoid an extra `Box::pin` indirection layer
471    /// since `tokio::time::Timeout` is `Unpin`
472    simple_timeout: Option<Duration>,
473    #[cfg(feature = "__tls")]
474    nodelay: bool,
475    #[cfg(feature = "__tls")]
476    tls_info: bool,
477    #[cfg(feature = "__tls")]
478    user_agent: Option<HeaderValue>,
479    #[cfg(feature = "socks")]
480    resolver: DynResolver,
481    /// If set, this always takes priority over TCP.
482    #[cfg(unix)]
483    unix_socket: Option<Arc<std::path::Path>>,
484}
485
486#[derive(Clone)]
487enum Inner {
488    #[cfg(not(feature = "__tls"))]
489    Http(HttpConnector),
490    #[cfg(feature = "default-tls")]
491    DefaultTls(HttpConnector, TlsConnector),
492    #[cfg(feature = "__rustls")]
493    RustlsTls {
494        http: HttpConnector,
495        tls: Arc<rustls::ClientConfig>,
496        tls_proxy: Arc<rustls::ClientConfig>,
497    },
498}
499
500impl Inner {
501    #[cfg(feature = "socks")]
502    fn get_http_connector(&mut self) -> &mut crate::connect::HttpConnector {
503        match self {
504            #[cfg(feature = "default-tls")]
505            Inner::DefaultTls(http, _) => http,
506            #[cfg(feature = "__rustls")]
507            Inner::RustlsTls { http, .. } => http,
508            #[cfg(not(feature = "__tls"))]
509            Inner::Http(http) => http,
510        }
511    }
512}
513
514impl ConnectorService {
515    #[cfg(feature = "socks")]
516    async fn connect_socks(mut self, dst: Uri, proxy: Intercepted) -> Result<Conn, BoxError> {
517        let dns = match proxy.uri().scheme_str() {
518            Some("socks4") | Some("socks5") => socks::DnsResolve::Local,
519            Some("socks4a") | Some("socks5h") => socks::DnsResolve::Proxy,
520            _ => {
521                unreachable!("connect_socks is only called for socks proxies");
522            }
523        };
524
525        match &mut self.inner {
526            #[cfg(feature = "default-tls")]
527            Inner::DefaultTls(http, tls) => {
528                if dst.scheme() == Some(&Scheme::HTTPS) {
529                    let host = dst.host().ok_or("no host in url")?.to_string();
530                    let conn = socks::connect(proxy, dst, dns, &self.resolver, http).await?;
531                    let conn = TokioIo::new(conn);
532                    let conn = TokioIo::new(conn);
533                    let tls_connector = tokio_native_tls::TlsConnector::from(tls.clone());
534                    let io = tls_connector.connect(&host, conn).await?;
535                    let io = TokioIo::new(io);
536                    return Ok(Conn {
537                        inner: self.verbose.wrap(NativeTlsConn { inner: io }),
538                        is_proxy: false,
539                        tls_info: self.tls_info,
540                    });
541                }
542            }
543            #[cfg(feature = "__rustls")]
544            Inner::RustlsTls { http, tls, .. } => {
545                if dst.scheme() == Some(&Scheme::HTTPS) {
546                    use std::convert::TryFrom;
547                    use tokio_rustls::TlsConnector as RustlsConnector;
548
549                    let tls = tls.clone();
550                    let host = dst.host().ok_or("no host in url")?.to_string();
551                    let conn = socks::connect(proxy, dst, dns, &self.resolver, http).await?;
552                    let conn = TokioIo::new(conn);
553                    let conn = TokioIo::new(conn);
554                    let server_name =
555                        rustls_pki_types::ServerName::try_from(host.as_str().to_owned())
556                            .map_err(|_| "Invalid Server Name")?;
557                    let io = RustlsConnector::from(tls)
558                        .connect(server_name, conn)
559                        .await?;
560                    let io = TokioIo::new(io);
561                    return Ok(Conn {
562                        inner: self.verbose.wrap(RustlsTlsConn { inner: io }),
563                        is_proxy: false,
564                        tls_info: false,
565                    });
566                }
567            }
568            #[cfg(not(feature = "__tls"))]
569            Inner::Http(http) => {
570                let conn = socks::connect(proxy, dst, dns, &self.resolver, http).await?;
571                return Ok(Conn {
572                    inner: self.verbose.wrap(TokioIo::new(conn)),
573                    is_proxy: false,
574                    tls_info: false,
575                });
576            }
577        }
578
579        let resolver = &self.resolver;
580        let http = self.inner.get_http_connector();
581        socks::connect(proxy, dst, dns, resolver, http)
582            .await
583            .map(|tcp| Conn {
584                inner: self.verbose.wrap(TokioIo::new(tcp)),
585                is_proxy: false,
586                tls_info: false,
587            })
588            .map_err(Into::into)
589    }
590
591    async fn connect_with_maybe_proxy(self, dst: Uri, is_proxy: bool) -> Result<Conn, BoxError> {
592        match self.inner {
593            #[cfg(not(feature = "__tls"))]
594            Inner::Http(mut http) => {
595                let io = http.call(dst).await?;
596                Ok(Conn {
597                    inner: self.verbose.wrap(io),
598                    is_proxy,
599                    tls_info: false,
600                })
601            }
602            #[cfg(feature = "default-tls")]
603            Inner::DefaultTls(http, tls) => {
604                let mut http = http.clone();
605
606                // Disable Nagle's algorithm for TLS handshake
607                //
608                // https://www.openssl.org/docs/man1.1.1/man3/SSL_connect.html#NOTES
609                if !self.nodelay && (dst.scheme() == Some(&Scheme::HTTPS)) {
610                    http.set_nodelay(true);
611                }
612
613                let tls_connector = tokio_native_tls::TlsConnector::from(tls.clone());
614                let mut http = hyper_tls::HttpsConnector::from((http, tls_connector));
615                let io = http.call(dst).await?;
616
617                if let hyper_tls::MaybeHttpsStream::Https(stream) = io {
618                    if !self.nodelay {
619                        stream
620                            .inner()
621                            .get_ref()
622                            .get_ref()
623                            .get_ref()
624                            .inner()
625                            .inner()
626                            .set_nodelay(false)?;
627                    }
628                    Ok(Conn {
629                        inner: self.verbose.wrap(NativeTlsConn { inner: stream }),
630                        is_proxy,
631                        tls_info: self.tls_info,
632                    })
633                } else {
634                    Ok(Conn {
635                        inner: self.verbose.wrap(io),
636                        is_proxy,
637                        tls_info: false,
638                    })
639                }
640            }
641            #[cfg(feature = "__rustls")]
642            Inner::RustlsTls { http, tls, .. } => {
643                let mut http = http.clone();
644
645                // Disable Nagle's algorithm for TLS handshake
646                //
647                // https://www.openssl.org/docs/man1.1.1/man3/SSL_connect.html#NOTES
648                if !self.nodelay && (dst.scheme() == Some(&Scheme::HTTPS)) {
649                    http.set_nodelay(true);
650                }
651
652                let mut http = hyper_rustls::HttpsConnector::from((http, tls.clone()));
653                let io = http.call(dst).await?;
654
655                if let hyper_rustls::MaybeHttpsStream::Https(stream) = io {
656                    if !self.nodelay {
657                        let (io, _) = stream.inner().get_ref();
658                        io.inner().inner().set_nodelay(false)?;
659                    }
660                    Ok(Conn {
661                        inner: self.verbose.wrap(RustlsTlsConn { inner: stream }),
662                        is_proxy,
663                        tls_info: self.tls_info,
664                    })
665                } else {
666                    Ok(Conn {
667                        inner: self.verbose.wrap(io),
668                        is_proxy,
669                        tls_info: false,
670                    })
671                }
672            }
673        }
674    }
675
676    /// Connect over Unix Domain Socket (or Windows?).
677    #[cfg(unix)]
678    async fn connect_local_transport(self, dst: Uri) -> Result<Conn, BoxError> {
679        let path = self
680            .unix_socket
681            .as_ref()
682            .expect("connect local must have socket path")
683            .clone();
684        let svc = tower::service_fn(move |_| {
685            let fut = tokio::net::UnixStream::connect(path.clone());
686            async move {
687                let io = fut.await?;
688                Ok::<_, std::io::Error>(TokioIo::new(io))
689            }
690        });
691        let is_proxy = false;
692        match self.inner {
693            #[cfg(not(feature = "__tls"))]
694            Inner::Http(..) => {
695                let mut svc = svc;
696                let io = svc.call(dst).await?;
697                Ok(Conn {
698                    inner: self.verbose.wrap(io),
699                    is_proxy,
700                    tls_info: false,
701                })
702            }
703            #[cfg(feature = "default-tls")]
704            Inner::DefaultTls(_, tls) => {
705                let tls_connector = tokio_native_tls::TlsConnector::from(tls.clone());
706                let mut http = hyper_tls::HttpsConnector::from((svc, tls_connector));
707                let io = http.call(dst).await?;
708
709                if let hyper_tls::MaybeHttpsStream::Https(stream) = io {
710                    Ok(Conn {
711                        inner: self.verbose.wrap(NativeTlsConn { inner: stream }),
712                        is_proxy,
713                        tls_info: self.tls_info,
714                    })
715                } else {
716                    Ok(Conn {
717                        inner: self.verbose.wrap(io),
718                        is_proxy,
719                        tls_info: false,
720                    })
721                }
722            }
723            #[cfg(feature = "__rustls")]
724            Inner::RustlsTls { tls, .. } => {
725                let mut http = hyper_rustls::HttpsConnector::from((svc, tls.clone()));
726                let io = http.call(dst).await?;
727
728                if let hyper_rustls::MaybeHttpsStream::Https(stream) = io {
729                    Ok(Conn {
730                        inner: self.verbose.wrap(RustlsTlsConn { inner: stream }),
731                        is_proxy,
732                        tls_info: self.tls_info,
733                    })
734                } else {
735                    Ok(Conn {
736                        inner: self.verbose.wrap(io),
737                        is_proxy,
738                        tls_info: false,
739                    })
740                }
741            }
742        }
743    }
744
745    async fn connect_via_proxy(self, dst: Uri, proxy: Intercepted) -> Result<Conn, BoxError> {
746        log::debug!("proxy({proxy:?}) intercepts '{dst:?}'");
747
748        #[cfg(feature = "socks")]
749        match proxy.uri().scheme_str().ok_or("proxy scheme expected")? {
750            "socks4" | "socks4a" | "socks5" | "socks5h" => {
751                return self.connect_socks(dst, proxy).await
752            }
753            _ => (),
754        }
755
756        let proxy_dst = proxy.uri().clone();
757        #[cfg(feature = "__tls")]
758        let auth = proxy.basic_auth().cloned();
759
760        #[cfg(feature = "__tls")]
761        let misc = proxy.custom_headers().clone();
762
763        match &self.inner {
764            #[cfg(feature = "default-tls")]
765            Inner::DefaultTls(http, tls) => {
766                if dst.scheme() == Some(&Scheme::HTTPS) {
767                    log::trace!("tunneling HTTPS over proxy");
768                    let tls_connector = tokio_native_tls::TlsConnector::from(tls.clone());
769                    let inner =
770                        hyper_tls::HttpsConnector::from((http.clone(), tls_connector.clone()));
771                    // TODO: we could cache constructing this
772                    let mut tunnel =
773                        hyper_util::client::legacy::connect::proxy::Tunnel::new(proxy_dst, inner);
774                    if let Some(auth) = auth {
775                        tunnel = tunnel.with_auth(auth);
776                    }
777                    if let Some(ua) = self.user_agent {
778                        let mut headers = http::HeaderMap::new();
779                        headers.insert(http::header::USER_AGENT, ua);
780                        tunnel = tunnel.with_headers(headers);
781                    }
782                    // Note that custom headers may override the user agent header.
783                    if let Some(custom_headers) = misc {
784                        tunnel = tunnel.with_headers(custom_headers.clone());
785                    }
786                    // We don't wrap this again in an HttpsConnector since that uses Maybe,
787                    // and we know this is definitely HTTPS.
788                    let tunneled = tunnel.call(dst.clone()).await?;
789                    let tls_connector = tokio_native_tls::TlsConnector::from(tls.clone());
790                    let io = tls_connector
791                        .connect(dst.host().ok_or("no host in url")?, TokioIo::new(tunneled))
792                        .await?;
793                    return Ok(Conn {
794                        inner: self.verbose.wrap(NativeTlsConn {
795                            inner: TokioIo::new(io),
796                        }),
797                        is_proxy: false,
798                        tls_info: false,
799                    });
800                }
801            }
802            #[cfg(feature = "__rustls")]
803            Inner::RustlsTls {
804                http,
805                tls,
806                tls_proxy,
807            } => {
808                if dst.scheme() == Some(&Scheme::HTTPS) {
809                    use rustls_pki_types::ServerName;
810                    use std::convert::TryFrom;
811                    use tokio_rustls::TlsConnector as RustlsConnector;
812
813                    log::trace!("tunneling HTTPS over proxy");
814                    let http = http.clone();
815                    let inner = hyper_rustls::HttpsConnector::from((http, tls_proxy.clone()));
816                    // TODO: we could cache constructing this
817                    let mut tunnel =
818                        hyper_util::client::legacy::connect::proxy::Tunnel::new(proxy_dst, inner);
819                    if let Some(auth) = auth {
820                        tunnel = tunnel.with_auth(auth);
821                    }
822                    if let Some(custom_headers) = misc {
823                        tunnel = tunnel.with_headers(custom_headers.clone());
824                    }
825                    if let Some(ua) = self.user_agent {
826                        let mut headers = http::HeaderMap::new();
827                        headers.insert(http::header::USER_AGENT, ua);
828                        tunnel = tunnel.with_headers(headers);
829                    }
830                    // We don't wrap this again in an HttpsConnector since that uses Maybe,
831                    // and we know this is definitely HTTPS.
832                    let tunneled = tunnel.call(dst.clone()).await?;
833                    let host = dst.host().ok_or("no host in url")?.to_string();
834                    let server_name = ServerName::try_from(host.as_str().to_owned())
835                        .map_err(|_| "Invalid Server Name")?;
836                    let io = RustlsConnector::from(tls.clone())
837                        .connect(server_name, TokioIo::new(tunneled))
838                        .await?;
839
840                    return Ok(Conn {
841                        inner: self.verbose.wrap(RustlsTlsConn {
842                            inner: TokioIo::new(io),
843                        }),
844                        is_proxy: false,
845                        tls_info: false,
846                    });
847                }
848            }
849            #[cfg(not(feature = "__tls"))]
850            Inner::Http(_) => (),
851        }
852
853        self.connect_with_maybe_proxy(proxy_dst, true).await
854    }
855}
856
857async fn with_timeout<T, F>(f: F, timeout: Option<Duration>) -> Result<T, BoxError>
858where
859    F: Future<Output = Result<T, BoxError>>,
860{
861    if let Some(to) = timeout {
862        match tokio::time::timeout(to, f).await {
863            Err(_elapsed) => Err(Box::new(crate::error::TimedOut) as BoxError),
864            Ok(Ok(try_res)) => Ok(try_res),
865            Ok(Err(e)) => Err(e),
866        }
867    } else {
868        f.await
869    }
870}
871
872impl Service<Uri> for ConnectorService {
873    type Response = Conn;
874    type Error = BoxError;
875    type Future = Connecting;
876
877    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
878        Poll::Ready(Ok(()))
879    }
880
881    fn call(&mut self, dst: Uri) -> Self::Future {
882        log::debug!("starting new connection: {dst:?}");
883        let timeout = self.simple_timeout;
884
885        // Local transports (UDS) skip proxies
886        #[cfg(unix)]
887        if self.unix_socket.is_some() {
888            return Box::pin(with_timeout(
889                self.clone().connect_local_transport(dst),
890                timeout,
891            ));
892        }
893
894        for prox in self.proxies.iter() {
895            if let Some(intercepted) = prox.intercept(&dst) {
896                return Box::pin(with_timeout(
897                    self.clone().connect_via_proxy(dst, intercepted),
898                    timeout,
899                ));
900            }
901        }
902
903        Box::pin(with_timeout(
904            self.clone().connect_with_maybe_proxy(dst, false),
905            timeout,
906        ))
907    }
908}
909
910#[cfg(feature = "__tls")]
911trait TlsInfoFactory {
912    fn tls_info(&self) -> Option<crate::tls::TlsInfo>;
913}
914
915#[cfg(feature = "__tls")]
916impl<T: TlsInfoFactory> TlsInfoFactory for TokioIo<T> {
917    fn tls_info(&self) -> Option<crate::tls::TlsInfo> {
918        self.inner().tls_info()
919    }
920}
921
922// ===== TcpStream =====
923
924#[cfg(feature = "__tls")]
925impl TlsInfoFactory for tokio::net::TcpStream {
926    fn tls_info(&self) -> Option<crate::tls::TlsInfo> {
927        None
928    }
929}
930
931#[cfg(feature = "default-tls")]
932impl TlsInfoFactory for tokio_native_tls::TlsStream<TokioIo<TokioIo<tokio::net::TcpStream>>> {
933    fn tls_info(&self) -> Option<crate::tls::TlsInfo> {
934        let peer_certificate = self
935            .get_ref()
936            .peer_certificate()
937            .ok()
938            .flatten()
939            .and_then(|c| c.to_der().ok());
940        Some(crate::tls::TlsInfo { peer_certificate })
941    }
942}
943
944#[cfg(feature = "default-tls")]
945impl TlsInfoFactory
946    for tokio_native_tls::TlsStream<
947        TokioIo<hyper_tls::MaybeHttpsStream<TokioIo<tokio::net::TcpStream>>>,
948    >
949{
950    fn tls_info(&self) -> Option<crate::tls::TlsInfo> {
951        let peer_certificate = self
952            .get_ref()
953            .peer_certificate()
954            .ok()
955            .flatten()
956            .and_then(|c| c.to_der().ok());
957        Some(crate::tls::TlsInfo { peer_certificate })
958    }
959}
960
961#[cfg(feature = "default-tls")]
962impl TlsInfoFactory for hyper_tls::MaybeHttpsStream<TokioIo<tokio::net::TcpStream>> {
963    fn tls_info(&self) -> Option<crate::tls::TlsInfo> {
964        match self {
965            hyper_tls::MaybeHttpsStream::Https(tls) => tls.tls_info(),
966            hyper_tls::MaybeHttpsStream::Http(_) => None,
967        }
968    }
969}
970
971#[cfg(feature = "__rustls")]
972impl TlsInfoFactory for tokio_rustls::client::TlsStream<TokioIo<TokioIo<tokio::net::TcpStream>>> {
973    fn tls_info(&self) -> Option<crate::tls::TlsInfo> {
974        let peer_certificate = self
975            .get_ref()
976            .1
977            .peer_certificates()
978            .and_then(|certs| certs.first())
979            .map(|c| c.to_vec());
980        Some(crate::tls::TlsInfo { peer_certificate })
981    }
982}
983
984#[cfg(feature = "__rustls")]
985impl TlsInfoFactory
986    for tokio_rustls::client::TlsStream<
987        TokioIo<hyper_rustls::MaybeHttpsStream<TokioIo<tokio::net::TcpStream>>>,
988    >
989{
990    fn tls_info(&self) -> Option<crate::tls::TlsInfo> {
991        let peer_certificate = self
992            .get_ref()
993            .1
994            .peer_certificates()
995            .and_then(|certs| certs.first())
996            .map(|c| c.to_vec());
997        Some(crate::tls::TlsInfo { peer_certificate })
998    }
999}
1000
1001#[cfg(feature = "__rustls")]
1002impl TlsInfoFactory for hyper_rustls::MaybeHttpsStream<TokioIo<tokio::net::TcpStream>> {
1003    fn tls_info(&self) -> Option<crate::tls::TlsInfo> {
1004        match self {
1005            hyper_rustls::MaybeHttpsStream::Https(tls) => tls.tls_info(),
1006            hyper_rustls::MaybeHttpsStream::Http(_) => None,
1007        }
1008    }
1009}
1010
1011// ===== UnixStream =====
1012
1013#[cfg(feature = "__tls")]
1014#[cfg(unix)]
1015impl TlsInfoFactory for tokio::net::UnixStream {
1016    fn tls_info(&self) -> Option<crate::tls::TlsInfo> {
1017        None
1018    }
1019}
1020
1021#[cfg(feature = "default-tls")]
1022#[cfg(unix)]
1023impl TlsInfoFactory for tokio_native_tls::TlsStream<TokioIo<TokioIo<tokio::net::UnixStream>>> {
1024    fn tls_info(&self) -> Option<crate::tls::TlsInfo> {
1025        let peer_certificate = self
1026            .get_ref()
1027            .peer_certificate()
1028            .ok()
1029            .flatten()
1030            .and_then(|c| c.to_der().ok());
1031        Some(crate::tls::TlsInfo { peer_certificate })
1032    }
1033}
1034
1035#[cfg(feature = "default-tls")]
1036#[cfg(unix)]
1037impl TlsInfoFactory
1038    for tokio_native_tls::TlsStream<
1039        TokioIo<hyper_tls::MaybeHttpsStream<TokioIo<tokio::net::UnixStream>>>,
1040    >
1041{
1042    fn tls_info(&self) -> Option<crate::tls::TlsInfo> {
1043        let peer_certificate = self
1044            .get_ref()
1045            .peer_certificate()
1046            .ok()
1047            .flatten()
1048            .and_then(|c| c.to_der().ok());
1049        Some(crate::tls::TlsInfo { peer_certificate })
1050    }
1051}
1052
1053#[cfg(feature = "default-tls")]
1054#[cfg(unix)]
1055impl TlsInfoFactory for hyper_tls::MaybeHttpsStream<TokioIo<tokio::net::UnixStream>> {
1056    fn tls_info(&self) -> Option<crate::tls::TlsInfo> {
1057        match self {
1058            hyper_tls::MaybeHttpsStream::Https(tls) => tls.tls_info(),
1059            hyper_tls::MaybeHttpsStream::Http(_) => None,
1060        }
1061    }
1062}
1063
1064#[cfg(feature = "__rustls")]
1065#[cfg(unix)]
1066impl TlsInfoFactory for tokio_rustls::client::TlsStream<TokioIo<TokioIo<tokio::net::UnixStream>>> {
1067    fn tls_info(&self) -> Option<crate::tls::TlsInfo> {
1068        let peer_certificate = self
1069            .get_ref()
1070            .1
1071            .peer_certificates()
1072            .and_then(|certs| certs.first())
1073            .map(|c| c.to_vec());
1074        Some(crate::tls::TlsInfo { peer_certificate })
1075    }
1076}
1077
1078#[cfg(feature = "__rustls")]
1079#[cfg(unix)]
1080impl TlsInfoFactory
1081    for tokio_rustls::client::TlsStream<
1082        TokioIo<hyper_rustls::MaybeHttpsStream<TokioIo<tokio::net::UnixStream>>>,
1083    >
1084{
1085    fn tls_info(&self) -> Option<crate::tls::TlsInfo> {
1086        let peer_certificate = self
1087            .get_ref()
1088            .1
1089            .peer_certificates()
1090            .and_then(|certs| certs.first())
1091            .map(|c| c.to_vec());
1092        Some(crate::tls::TlsInfo { peer_certificate })
1093    }
1094}
1095
1096#[cfg(feature = "__rustls")]
1097#[cfg(unix)]
1098impl TlsInfoFactory for hyper_rustls::MaybeHttpsStream<TokioIo<tokio::net::UnixStream>> {
1099    fn tls_info(&self) -> Option<crate::tls::TlsInfo> {
1100        match self {
1101            hyper_rustls::MaybeHttpsStream::Https(tls) => tls.tls_info(),
1102            hyper_rustls::MaybeHttpsStream::Http(_) => None,
1103        }
1104    }
1105}
1106
1107pub(crate) trait AsyncConn:
1108    Read + Write + Connection + Send + Sync + Unpin + 'static
1109{
1110}
1111
1112impl<T: Read + Write + Connection + Send + Sync + Unpin + 'static> AsyncConn for T {}
1113
1114#[cfg(feature = "__tls")]
1115trait AsyncConnWithInfo: AsyncConn + TlsInfoFactory {}
1116#[cfg(not(feature = "__tls"))]
1117trait AsyncConnWithInfo: AsyncConn {}
1118
1119#[cfg(feature = "__tls")]
1120impl<T: AsyncConn + TlsInfoFactory> AsyncConnWithInfo for T {}
1121#[cfg(not(feature = "__tls"))]
1122impl<T: AsyncConn> AsyncConnWithInfo for T {}
1123
1124type BoxConn = Box<dyn AsyncConnWithInfo>;
1125
1126pub(crate) mod sealed {
1127    use super::*;
1128    #[derive(Debug)]
1129    pub struct Unnameable(pub(super) Uri);
1130
1131    pin_project! {
1132        /// Note: the `is_proxy` member means *is plain text HTTP proxy*.
1133        /// This tells hyper whether the URI should be written in
1134        /// * origin-form (`GET /just/a/path HTTP/1.1`), when `is_proxy == false`, or
1135        /// * absolute-form (`GET http://foo.bar/and/a/path HTTP/1.1`), otherwise.
1136        #[allow(missing_debug_implementations)]
1137        pub struct Conn {
1138            #[pin]
1139            pub(super)inner: BoxConn,
1140            pub(super) is_proxy: bool,
1141            // Only needed for __tls, but #[cfg()] on fields breaks pin_project!
1142            pub(super) tls_info: bool,
1143        }
1144    }
1145
1146    impl Connection for Conn {
1147        fn connected(&self) -> Connected {
1148            let connected = self.inner.connected().proxy(self.is_proxy);
1149            #[cfg(feature = "__tls")]
1150            if self.tls_info {
1151                if let Some(tls_info) = self.inner.tls_info() {
1152                    connected.extra(tls_info)
1153                } else {
1154                    connected
1155                }
1156            } else {
1157                connected
1158            }
1159            #[cfg(not(feature = "__tls"))]
1160            connected
1161        }
1162    }
1163
1164    impl Read for Conn {
1165        fn poll_read(
1166            self: Pin<&mut Self>,
1167            cx: &mut Context,
1168            buf: ReadBufCursor<'_>,
1169        ) -> Poll<io::Result<()>> {
1170            let this = self.project();
1171            Read::poll_read(this.inner, cx, buf)
1172        }
1173    }
1174
1175    impl Write for Conn {
1176        fn poll_write(
1177            self: Pin<&mut Self>,
1178            cx: &mut Context,
1179            buf: &[u8],
1180        ) -> Poll<Result<usize, io::Error>> {
1181            let this = self.project();
1182            Write::poll_write(this.inner, cx, buf)
1183        }
1184
1185        fn poll_write_vectored(
1186            self: Pin<&mut Self>,
1187            cx: &mut Context<'_>,
1188            bufs: &[IoSlice<'_>],
1189        ) -> Poll<Result<usize, io::Error>> {
1190            let this = self.project();
1191            Write::poll_write_vectored(this.inner, cx, bufs)
1192        }
1193
1194        fn is_write_vectored(&self) -> bool {
1195            self.inner.is_write_vectored()
1196        }
1197
1198        fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), io::Error>> {
1199            let this = self.project();
1200            Write::poll_flush(this.inner, cx)
1201        }
1202
1203        fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), io::Error>> {
1204            let this = self.project();
1205            Write::poll_shutdown(this.inner, cx)
1206        }
1207    }
1208}
1209
1210// Some sealed things for UDS
1211#[cfg(unix)]
1212pub(crate) mod uds {
1213    use std::path::Path;
1214
1215    /// A provider for Unix Domain Socket paths.
1216    ///
1217    /// This trait is sealed. This allows us expand the support in the future
1218    /// by controlling who can implement the trait.
1219    ///
1220    /// It's available in the docs to see what type may be passed in.
1221    #[cfg(unix)]
1222    pub trait UnixSocketProvider {
1223        #[doc(hidden)]
1224        fn reqwest_uds_path(&self, _: Internal) -> &Path;
1225    }
1226
1227    #[allow(missing_debug_implementations)]
1228    pub struct Internal;
1229
1230    macro_rules! as_path {
1231        ($($t:ty,)+) => {
1232            $(
1233                impl UnixSocketProvider for $t {
1234                    #[doc(hidden)]
1235                    fn reqwest_uds_path(&self, _: Internal) -> &Path {
1236                        self.as_ref()
1237                    }
1238                }
1239            )+
1240        }
1241    }
1242
1243    as_path![
1244        String,
1245        &'_ str,
1246        &'_ Path,
1247        std::path::PathBuf,
1248        std::sync::Arc<Path>,
1249    ];
1250}
1251
1252pub(crate) type Connecting = Pin<Box<dyn Future<Output = Result<Conn, BoxError>> + Send>>;
1253
1254#[cfg(feature = "default-tls")]
1255mod native_tls_conn {
1256    use super::TlsInfoFactory;
1257    use hyper::rt::{Read, ReadBufCursor, Write};
1258    use hyper_tls::MaybeHttpsStream;
1259    use hyper_util::client::legacy::connect::{Connected, Connection};
1260    use hyper_util::rt::TokioIo;
1261    use pin_project_lite::pin_project;
1262    use std::{
1263        io::{self, IoSlice},
1264        pin::Pin,
1265        task::{Context, Poll},
1266    };
1267    use tokio::io::{AsyncRead, AsyncWrite};
1268    use tokio::net::TcpStream;
1269    use tokio_native_tls::TlsStream;
1270
1271    pin_project! {
1272        pub(super) struct NativeTlsConn<T> {
1273            #[pin] pub(super) inner: TokioIo<TlsStream<T>>,
1274        }
1275    }
1276
1277    impl Connection for NativeTlsConn<TokioIo<TokioIo<TcpStream>>> {
1278        fn connected(&self) -> Connected {
1279            let connected = self
1280                .inner
1281                .inner()
1282                .get_ref()
1283                .get_ref()
1284                .get_ref()
1285                .inner()
1286                .connected();
1287            #[cfg(feature = "native-tls-alpn")]
1288            match self.inner.inner().get_ref().negotiated_alpn().ok() {
1289                Some(Some(alpn_protocol)) if alpn_protocol == b"h2" => connected.negotiated_h2(),
1290                _ => connected,
1291            }
1292            #[cfg(not(feature = "native-tls-alpn"))]
1293            connected
1294        }
1295    }
1296
1297    impl Connection for NativeTlsConn<TokioIo<MaybeHttpsStream<TokioIo<TcpStream>>>> {
1298        fn connected(&self) -> Connected {
1299            let connected = self
1300                .inner
1301                .inner()
1302                .get_ref()
1303                .get_ref()
1304                .get_ref()
1305                .inner()
1306                .connected();
1307            #[cfg(feature = "native-tls-alpn")]
1308            match self.inner.inner().get_ref().negotiated_alpn().ok() {
1309                Some(Some(alpn_protocol)) if alpn_protocol == b"h2" => connected.negotiated_h2(),
1310                _ => connected,
1311            }
1312            #[cfg(not(feature = "native-tls-alpn"))]
1313            connected
1314        }
1315    }
1316
1317    #[cfg(unix)]
1318    impl Connection for NativeTlsConn<TokioIo<TokioIo<tokio::net::UnixStream>>> {
1319        fn connected(&self) -> Connected {
1320            let connected = Connected::new();
1321            #[cfg(feature = "native-tls-alpn")]
1322            match self.inner.inner().get_ref().negotiated_alpn().ok() {
1323                Some(Some(alpn_protocol)) if alpn_protocol == b"h2" => connected.negotiated_h2(),
1324                _ => connected,
1325            }
1326            #[cfg(not(feature = "native-tls-alpn"))]
1327            connected
1328        }
1329    }
1330
1331    #[cfg(unix)]
1332    impl Connection for NativeTlsConn<TokioIo<MaybeHttpsStream<TokioIo<tokio::net::UnixStream>>>> {
1333        fn connected(&self) -> Connected {
1334            let connected = Connected::new();
1335            #[cfg(feature = "native-tls-alpn")]
1336            match self.inner.inner().get_ref().negotiated_alpn().ok() {
1337                Some(Some(alpn_protocol)) if alpn_protocol == b"h2" => connected.negotiated_h2(),
1338                _ => connected,
1339            }
1340            #[cfg(not(feature = "native-tls-alpn"))]
1341            connected
1342        }
1343    }
1344
1345    impl<T: AsyncRead + AsyncWrite + Unpin> Read for NativeTlsConn<T> {
1346        fn poll_read(
1347            self: Pin<&mut Self>,
1348            cx: &mut Context,
1349            buf: ReadBufCursor<'_>,
1350        ) -> Poll<tokio::io::Result<()>> {
1351            let this = self.project();
1352            Read::poll_read(this.inner, cx, buf)
1353        }
1354    }
1355
1356    impl<T: AsyncRead + AsyncWrite + Unpin> Write for NativeTlsConn<T> {
1357        fn poll_write(
1358            self: Pin<&mut Self>,
1359            cx: &mut Context,
1360            buf: &[u8],
1361        ) -> Poll<Result<usize, tokio::io::Error>> {
1362            let this = self.project();
1363            Write::poll_write(this.inner, cx, buf)
1364        }
1365
1366        fn poll_write_vectored(
1367            self: Pin<&mut Self>,
1368            cx: &mut Context<'_>,
1369            bufs: &[IoSlice<'_>],
1370        ) -> Poll<Result<usize, io::Error>> {
1371            let this = self.project();
1372            Write::poll_write_vectored(this.inner, cx, bufs)
1373        }
1374
1375        fn is_write_vectored(&self) -> bool {
1376            self.inner.is_write_vectored()
1377        }
1378
1379        fn poll_flush(
1380            self: Pin<&mut Self>,
1381            cx: &mut Context,
1382        ) -> Poll<Result<(), tokio::io::Error>> {
1383            let this = self.project();
1384            Write::poll_flush(this.inner, cx)
1385        }
1386
1387        fn poll_shutdown(
1388            self: Pin<&mut Self>,
1389            cx: &mut Context,
1390        ) -> Poll<Result<(), tokio::io::Error>> {
1391            let this = self.project();
1392            Write::poll_shutdown(this.inner, cx)
1393        }
1394    }
1395
1396    impl<T> TlsInfoFactory for NativeTlsConn<T>
1397    where
1398        TokioIo<TlsStream<T>>: TlsInfoFactory,
1399    {
1400        fn tls_info(&self) -> Option<crate::tls::TlsInfo> {
1401            self.inner.tls_info()
1402        }
1403    }
1404}
1405
1406#[cfg(feature = "__rustls")]
1407mod rustls_tls_conn {
1408    use super::TlsInfoFactory;
1409    use hyper::rt::{Read, ReadBufCursor, Write};
1410    use hyper_rustls::MaybeHttpsStream;
1411    use hyper_util::client::legacy::connect::{Connected, Connection};
1412    use hyper_util::rt::TokioIo;
1413    use pin_project_lite::pin_project;
1414    use std::{
1415        io::{self, IoSlice},
1416        pin::Pin,
1417        task::{Context, Poll},
1418    };
1419    use tokio::io::{AsyncRead, AsyncWrite};
1420    use tokio::net::TcpStream;
1421    use tokio_rustls::client::TlsStream;
1422
1423    pin_project! {
1424        pub(super) struct RustlsTlsConn<T> {
1425            #[pin] pub(super) inner: TokioIo<TlsStream<T>>,
1426        }
1427    }
1428
1429    impl Connection for RustlsTlsConn<TokioIo<TokioIo<TcpStream>>> {
1430        fn connected(&self) -> Connected {
1431            if self.inner.inner().get_ref().1.alpn_protocol() == Some(b"h2") {
1432                self.inner
1433                    .inner()
1434                    .get_ref()
1435                    .0
1436                    .inner()
1437                    .connected()
1438                    .negotiated_h2()
1439            } else {
1440                self.inner.inner().get_ref().0.inner().connected()
1441            }
1442        }
1443    }
1444    impl Connection for RustlsTlsConn<TokioIo<MaybeHttpsStream<TokioIo<TcpStream>>>> {
1445        fn connected(&self) -> Connected {
1446            if self.inner.inner().get_ref().1.alpn_protocol() == Some(b"h2") {
1447                self.inner
1448                    .inner()
1449                    .get_ref()
1450                    .0
1451                    .inner()
1452                    .connected()
1453                    .negotiated_h2()
1454            } else {
1455                self.inner.inner().get_ref().0.inner().connected()
1456            }
1457        }
1458    }
1459
1460    #[cfg(unix)]
1461    impl Connection for RustlsTlsConn<TokioIo<TokioIo<tokio::net::UnixStream>>> {
1462        fn connected(&self) -> Connected {
1463            if self.inner.inner().get_ref().1.alpn_protocol() == Some(b"h2") {
1464                self.inner
1465                    .inner()
1466                    .get_ref()
1467                    .0
1468                    .inner()
1469                    .connected()
1470                    .negotiated_h2()
1471            } else {
1472                self.inner.inner().get_ref().0.inner().connected()
1473            }
1474        }
1475    }
1476
1477    #[cfg(unix)]
1478    impl Connection for RustlsTlsConn<TokioIo<MaybeHttpsStream<TokioIo<tokio::net::UnixStream>>>> {
1479        fn connected(&self) -> Connected {
1480            if self.inner.inner().get_ref().1.alpn_protocol() == Some(b"h2") {
1481                self.inner
1482                    .inner()
1483                    .get_ref()
1484                    .0
1485                    .inner()
1486                    .connected()
1487                    .negotiated_h2()
1488            } else {
1489                self.inner.inner().get_ref().0.inner().connected()
1490            }
1491        }
1492    }
1493
1494    impl<T: AsyncRead + AsyncWrite + Unpin> Read for RustlsTlsConn<T> {
1495        fn poll_read(
1496            self: Pin<&mut Self>,
1497            cx: &mut Context,
1498            buf: ReadBufCursor<'_>,
1499        ) -> Poll<tokio::io::Result<()>> {
1500            let this = self.project();
1501            Read::poll_read(this.inner, cx, buf)
1502        }
1503    }
1504
1505    impl<T: AsyncRead + AsyncWrite + Unpin> Write for RustlsTlsConn<T> {
1506        fn poll_write(
1507            self: Pin<&mut Self>,
1508            cx: &mut Context,
1509            buf: &[u8],
1510        ) -> Poll<Result<usize, tokio::io::Error>> {
1511            let this = self.project();
1512            Write::poll_write(this.inner, cx, buf)
1513        }
1514
1515        fn poll_write_vectored(
1516            self: Pin<&mut Self>,
1517            cx: &mut Context<'_>,
1518            bufs: &[IoSlice<'_>],
1519        ) -> Poll<Result<usize, io::Error>> {
1520            let this = self.project();
1521            Write::poll_write_vectored(this.inner, cx, bufs)
1522        }
1523
1524        fn is_write_vectored(&self) -> bool {
1525            self.inner.is_write_vectored()
1526        }
1527
1528        fn poll_flush(
1529            self: Pin<&mut Self>,
1530            cx: &mut Context,
1531        ) -> Poll<Result<(), tokio::io::Error>> {
1532            let this = self.project();
1533            Write::poll_flush(this.inner, cx)
1534        }
1535
1536        fn poll_shutdown(
1537            self: Pin<&mut Self>,
1538            cx: &mut Context,
1539        ) -> Poll<Result<(), tokio::io::Error>> {
1540            let this = self.project();
1541            Write::poll_shutdown(this.inner, cx)
1542        }
1543    }
1544    impl<T> TlsInfoFactory for RustlsTlsConn<T>
1545    where
1546        TokioIo<TlsStream<T>>: TlsInfoFactory,
1547    {
1548        fn tls_info(&self) -> Option<crate::tls::TlsInfo> {
1549            self.inner.tls_info()
1550        }
1551    }
1552}
1553
1554#[cfg(feature = "socks")]
1555mod socks {
1556    use tower_service::Service;
1557
1558    use http::uri::Scheme;
1559    use http::Uri;
1560    use hyper_util::client::legacy::connect::proxy::{SocksV4, SocksV5};
1561    use tokio::net::TcpStream;
1562
1563    use super::BoxError;
1564    use crate::proxy::Intercepted;
1565
1566    pub(super) enum DnsResolve {
1567        Local,
1568        Proxy,
1569    }
1570
1571    #[derive(Debug)]
1572    pub(super) enum SocksProxyError {
1573        SocksNoHostInUrl,
1574        SocksLocalResolve(BoxError),
1575        SocksConnect(BoxError),
1576    }
1577
1578    pub(super) async fn connect(
1579        proxy: Intercepted,
1580        dst: Uri,
1581        dns_mode: DnsResolve,
1582        resolver: &crate::dns::DynResolver,
1583        http_connector: &mut crate::connect::HttpConnector,
1584    ) -> Result<TcpStream, SocksProxyError> {
1585        let https = dst.scheme() == Some(&Scheme::HTTPS);
1586        let original_host = dst.host().ok_or(SocksProxyError::SocksNoHostInUrl)?;
1587        let mut host = original_host.to_owned();
1588        let port = match dst.port() {
1589            Some(p) => p.as_u16(),
1590            None if https => 443u16,
1591            _ => 80u16,
1592        };
1593
1594        if let DnsResolve::Local = dns_mode {
1595            let maybe_new_target = resolver
1596                .http_resolve(&dst)
1597                .await
1598                .map_err(SocksProxyError::SocksLocalResolve)?
1599                .next();
1600            if let Some(new_target) = maybe_new_target {
1601                log::trace!("socks local dns resolved {new_target:?}");
1602                // If the resolved IP is IPv6, wrap it in brackets for URI formatting
1603                let ip = new_target.ip();
1604                if ip.is_ipv6() {
1605                    host = format!("[{}]", ip);
1606                } else {
1607                    host = ip.to_string();
1608                }
1609            }
1610        }
1611
1612        let proxy_uri = proxy.uri().clone();
1613        // Build a Uri for the destination
1614        let dst_uri = format!(
1615            "{}://{}:{}",
1616            if https { "https" } else { "http" },
1617            host,
1618            port
1619        )
1620        .parse::<Uri>()
1621        .map_err(|e| SocksProxyError::SocksConnect(e.into()))?;
1622
1623        // TODO: can `Scheme::from_static()` be const fn, compare with a SOCKS5 constant?
1624        match proxy.uri().scheme_str() {
1625            Some("socks4") | Some("socks4a") => {
1626                let mut svc = SocksV4::new(proxy_uri, http_connector);
1627                let stream = Service::call(&mut svc, dst_uri)
1628                    .await
1629                    .map_err(|e| SocksProxyError::SocksConnect(e.into()))?;
1630                Ok(stream.into_inner())
1631            }
1632            Some("socks5") | Some("socks5h") => {
1633                let mut svc = if let Some((username, password)) = proxy.raw_auth() {
1634                    SocksV5::new(proxy_uri, http_connector)
1635                        .with_auth(username.to_string(), password.to_string())
1636                } else {
1637                    SocksV5::new(proxy_uri, http_connector)
1638                };
1639                let stream = Service::call(&mut svc, dst_uri)
1640                    .await
1641                    .map_err(|e| SocksProxyError::SocksConnect(e.into()))?;
1642                Ok(stream.into_inner())
1643            }
1644            _ => unreachable!(),
1645        }
1646    }
1647
1648    impl std::fmt::Display for SocksProxyError {
1649        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1650            match self {
1651                Self::SocksNoHostInUrl => f.write_str("socks proxy destination has no host"),
1652                Self::SocksLocalResolve(_) => f.write_str("error resolving for socks proxy"),
1653                Self::SocksConnect(_) => f.write_str("error connecting to socks proxy"),
1654            }
1655        }
1656    }
1657
1658    impl std::error::Error for SocksProxyError {
1659        fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
1660            match self {
1661                Self::SocksNoHostInUrl => None,
1662                Self::SocksLocalResolve(ref e) => Some(&**e),
1663                Self::SocksConnect(ref e) => Some(&**e),
1664            }
1665        }
1666    }
1667}
1668
1669mod verbose {
1670    use crate::util::Escape;
1671    use hyper::rt::{Read, ReadBufCursor, Write};
1672    use hyper_util::client::legacy::connect::{Connected, Connection};
1673    use std::cmp::min;
1674    use std::fmt;
1675    use std::io::{self, IoSlice};
1676    use std::pin::Pin;
1677    use std::task::{Context, Poll};
1678
1679    pub(super) const OFF: Wrapper = Wrapper(false);
1680
1681    #[derive(Clone, Copy)]
1682    pub(super) struct Wrapper(pub(super) bool);
1683
1684    impl Wrapper {
1685        pub(super) fn wrap<T: super::AsyncConnWithInfo>(&self, conn: T) -> super::BoxConn {
1686            if self.0 && log::log_enabled!(log::Level::Trace) {
1687                Box::new(Verbose {
1688                    // truncate is fine
1689                    id: crate::util::fast_random() as u32,
1690                    inner: conn,
1691                })
1692            } else {
1693                Box::new(conn)
1694            }
1695        }
1696    }
1697
1698    struct Verbose<T> {
1699        id: u32,
1700        inner: T,
1701    }
1702
1703    impl<T: Connection + Read + Write + Unpin> Connection for Verbose<T> {
1704        fn connected(&self) -> Connected {
1705            self.inner.connected()
1706        }
1707    }
1708
1709    impl<T: Read + Write + Unpin> Read for Verbose<T> {
1710        fn poll_read(
1711            mut self: Pin<&mut Self>,
1712            cx: &mut Context,
1713            mut buf: ReadBufCursor<'_>,
1714        ) -> Poll<std::io::Result<()>> {
1715            // TODO: This _does_ forget the `init` len, so it could result in
1716            // re-initializing twice. Needs upstream support, perhaps.
1717            // SAFETY: Passing to a ReadBuf will never de-initialize any bytes.
1718            let mut vbuf = hyper::rt::ReadBuf::uninit(unsafe { buf.as_mut() });
1719            match Pin::new(&mut self.inner).poll_read(cx, vbuf.unfilled()) {
1720                Poll::Ready(Ok(())) => {
1721                    log::trace!("{:08x} read: {:?}", self.id, Escape::new(vbuf.filled()));
1722                    let len = vbuf.filled().len();
1723                    // SAFETY: The two cursors were for the same buffer. What was
1724                    // filled in one is safe in the other.
1725                    unsafe {
1726                        buf.advance(len);
1727                    }
1728                    Poll::Ready(Ok(()))
1729                }
1730                Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
1731                Poll::Pending => Poll::Pending,
1732            }
1733        }
1734    }
1735
1736    impl<T: Read + Write + Unpin> Write for Verbose<T> {
1737        fn poll_write(
1738            mut self: Pin<&mut Self>,
1739            cx: &mut Context,
1740            buf: &[u8],
1741        ) -> Poll<Result<usize, std::io::Error>> {
1742            match Pin::new(&mut self.inner).poll_write(cx, buf) {
1743                Poll::Ready(Ok(n)) => {
1744                    log::trace!("{:08x} write: {:?}", self.id, Escape::new(&buf[..n]));
1745                    Poll::Ready(Ok(n))
1746                }
1747                Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
1748                Poll::Pending => Poll::Pending,
1749            }
1750        }
1751
1752        fn poll_write_vectored(
1753            mut self: Pin<&mut Self>,
1754            cx: &mut Context<'_>,
1755            bufs: &[IoSlice<'_>],
1756        ) -> Poll<Result<usize, io::Error>> {
1757            match Pin::new(&mut self.inner).poll_write_vectored(cx, bufs) {
1758                Poll::Ready(Ok(nwritten)) => {
1759                    log::trace!(
1760                        "{:08x} write (vectored): {:?}",
1761                        self.id,
1762                        Vectored { bufs, nwritten }
1763                    );
1764                    Poll::Ready(Ok(nwritten))
1765                }
1766                Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
1767                Poll::Pending => Poll::Pending,
1768            }
1769        }
1770
1771        fn is_write_vectored(&self) -> bool {
1772            self.inner.is_write_vectored()
1773        }
1774
1775        fn poll_flush(
1776            mut self: Pin<&mut Self>,
1777            cx: &mut Context,
1778        ) -> Poll<Result<(), std::io::Error>> {
1779            Pin::new(&mut self.inner).poll_flush(cx)
1780        }
1781
1782        fn poll_shutdown(
1783            mut self: Pin<&mut Self>,
1784            cx: &mut Context,
1785        ) -> Poll<Result<(), std::io::Error>> {
1786            Pin::new(&mut self.inner).poll_shutdown(cx)
1787        }
1788    }
1789
1790    #[cfg(feature = "__tls")]
1791    impl<T: super::TlsInfoFactory> super::TlsInfoFactory for Verbose<T> {
1792        fn tls_info(&self) -> Option<crate::tls::TlsInfo> {
1793            self.inner.tls_info()
1794        }
1795    }
1796
1797    struct Vectored<'a, 'b> {
1798        bufs: &'a [IoSlice<'b>],
1799        nwritten: usize,
1800    }
1801
1802    impl fmt::Debug for Vectored<'_, '_> {
1803        fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1804            let mut left = self.nwritten;
1805            for buf in self.bufs.iter() {
1806                if left == 0 {
1807                    break;
1808                }
1809                let n = min(left, buf.len());
1810                Escape::new(&buf[..n]).fmt(f)?;
1811                left -= n;
1812            }
1813            Ok(())
1814        }
1815    }
1816}