oci_client/
client.rs

1//! OCI distribution client for fetching oci images from an OCI compliant remote store
2use std::collections::{BTreeMap, HashMap};
3use std::convert::TryFrom;
4use std::hash::Hash;
5use std::sync::Arc;
6use std::time::Duration;
7
8use futures_util::future;
9use futures_util::stream::{self, BoxStream, StreamExt, TryStreamExt};
10use http::header::RANGE;
11use http::{HeaderValue, StatusCode};
12use http_auth::{parser::ChallengeParser, ChallengeRef};
13use olpc_cjson::CanonicalFormatter;
14use reqwest::header::HeaderMap;
15use reqwest::{NoProxy, Proxy, RequestBuilder, Response, Url};
16use serde::Deserialize;
17use serde::Serialize;
18use tokio::io::{AsyncWrite, AsyncWriteExt};
19use tokio::sync::RwLock;
20use tracing::{debug, trace, warn};
21
22pub use crate::blob::*;
23use crate::config::ConfigFile;
24use crate::digest::{digest_header_value, validate_digest, Digest, Digester};
25use crate::errors::*;
26use crate::manifest::{
27    ImageIndexEntry, OciDescriptor, OciImageIndex, OciImageManifest, OciManifest, Versioned,
28    IMAGE_CONFIG_MEDIA_TYPE, IMAGE_LAYER_GZIP_MEDIA_TYPE, IMAGE_LAYER_MEDIA_TYPE,
29    IMAGE_MANIFEST_LIST_MEDIA_TYPE, IMAGE_MANIFEST_MEDIA_TYPE, OCI_IMAGE_INDEX_MEDIA_TYPE,
30    OCI_IMAGE_MEDIA_TYPE,
31};
32use crate::secrets::RegistryAuth;
33use crate::secrets::*;
34use crate::sha256_digest;
35use crate::token_cache::{RegistryOperation, RegistryToken, RegistryTokenType, TokenCache};
36use crate::Reference;
37
/// Manifest media types this client advertises (via `apply_accept`) when it
/// requests a manifest from a registry.
const MIME_TYPES_DISTRIBUTION_MANIFEST: &[&str] = &[
    IMAGE_MANIFEST_MEDIA_TYPE,
    IMAGE_MANIFEST_LIST_MEDIA_TYPE,
    OCI_IMAGE_MEDIA_TYPE,
    OCI_IMAGE_INDEX_MEDIA_TYPE,
];

/// Value every `Client` constructor in this file assigns to `push_chunk_size`
/// (4096 * 1024 = 4 MiB, presumably counted in bytes).
const PUSH_CHUNK_MAX_SIZE: usize = 4096 * 1024;

/// Default value for `ClientConfig::max_concurrent_upload`
pub const DEFAULT_MAX_CONCURRENT_UPLOAD: usize = 16;

/// Default value for `ClientConfig::max_concurrent_download`
pub const DEFAULT_MAX_CONCURRENT_DOWNLOAD: usize = 16;

/// Default value for `ClientConfig::default_token_expiration_secs`
pub const DEFAULT_TOKEN_EXPIRATION_SECS: usize = 60;

/// `<crate name>/<crate version>`, assembled at compile time from Cargo metadata.
static DEFAULT_USER_AGENT: &str = concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION"));
57
/// The data for an image or module.
///
/// This is the value returned by [`Client::pull`], which always fills both
/// `digest` and `manifest` with `Some`.
#[derive(Clone)]
pub struct ImageData {
    /// The layers of the image or module.
    pub layers: Vec<ImageLayer>,
    /// The digest of the image or module, when known.
    pub digest: Option<String>,
    /// The Configuration object of the image or module.
    pub config: Config,
    /// The manifest of the image or module, when known.
    pub manifest: Option<OciImageManifest>,
}
70
/// The data returned by an OCI registry after a successful push
/// operation is completed.
///
/// Both fields are plain strings, so the type derives the usual value traits
/// to make it loggable, clonable and comparable by callers.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct PushResponse {
    /// Pullable url for the config
    pub config_url: String,
    /// Pullable url for the manifest
    pub manifest_url: String,
}
79
/// The data returned by a successful tags/list request.
///
/// Deserialized from the JSON body of the response in [`Client::list_tags`].
#[derive(Deserialize, Debug)]
pub struct TagResponse {
    /// Repository name
    pub name: String,
    /// List of existing tags
    pub tags: Vec<String>,
}
88
/// Layer descriptor required to pull a layer.
///
/// Holds only borrowed data; values are produced through
/// [`AsLayerDescriptor::as_layer_descriptor`].
pub struct LayerDescriptor<'a> {
    /// The digest of the layer
    pub digest: &'a str,
    /// Optional list of additional URIs to pull the layer from
    pub urls: &'a Option<Vec<String>>,
}
96
/// A trait for viewing a value as a borrowed [`LayerDescriptor`].
///
/// Implemented in this file for `&str` (digest only), `&OciDescriptor`,
/// `&LayerDescriptor` and, through a blanket impl, references to any implementor.
pub trait AsLayerDescriptor {
    /// Borrow this value as a [`LayerDescriptor`]; the result lives no longer than `self`.
    fn as_layer_descriptor(&self) -> LayerDescriptor<'_>;
}
102
103impl<T: AsLayerDescriptor> AsLayerDescriptor for &T {
104    fn as_layer_descriptor(&self) -> LayerDescriptor<'_> {
105        (*self).as_layer_descriptor()
106    }
107}
108
109impl AsLayerDescriptor for &str {
110    fn as_layer_descriptor(&self) -> LayerDescriptor<'_> {
111        LayerDescriptor {
112            digest: self,
113            urls: &None,
114        }
115    }
116}
117
118impl AsLayerDescriptor for &OciDescriptor {
119    fn as_layer_descriptor(&self) -> LayerDescriptor<'_> {
120        LayerDescriptor {
121            digest: &self.digest,
122            urls: &self.urls,
123        }
124    }
125}
126
127impl AsLayerDescriptor for &LayerDescriptor<'_> {
128    fn as_layer_descriptor(&self) -> LayerDescriptor<'_> {
129        LayerDescriptor {
130            digest: self.digest,
131            urls: self.urls,
132        }
133    }
134}
135
/// The data and media type for an image layer.
///
/// The whole layer content is held in memory in `data`.
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub struct ImageLayer {
    /// The data of this layer
    pub data: Vec<u8>,
    /// The media type of this layer
    pub media_type: String,
    /// This OPTIONAL property contains arbitrary metadata for this descriptor.
    /// This OPTIONAL property MUST use the [annotation rules](https://github.com/opencontainers/image-spec/blob/main/annotations.md#rules)
    pub annotations: Option<BTreeMap<String, String>>,
}
147
148impl ImageLayer {
149    /// Constructs a new ImageLayer struct with provided data and media type
150    pub fn new(
151        data: Vec<u8>,
152        media_type: String,
153        annotations: Option<BTreeMap<String, String>>,
154    ) -> Self {
155        ImageLayer {
156            data,
157            media_type,
158            annotations,
159        }
160    }
161
162    /// Constructs a new ImageLayer struct with provided data and
163    /// media type application/vnd.oci.image.layer.v1.tar
164    pub fn oci_v1(data: Vec<u8>, annotations: Option<BTreeMap<String, String>>) -> Self {
165        Self::new(data, IMAGE_LAYER_MEDIA_TYPE.to_string(), annotations)
166    }
167    /// Constructs a new ImageLayer struct with provided data and
168    /// media type application/vnd.oci.image.layer.v1.tar+gzip
169    pub fn oci_v1_gzip(data: Vec<u8>, annotations: Option<BTreeMap<String, String>>) -> Self {
170        Self::new(data, IMAGE_LAYER_GZIP_MEDIA_TYPE.to_string(), annotations)
171    }
172
173    /// Helper function to compute the sha256 digest of an image layer
174    pub fn sha256_digest(&self) -> String {
175        sha256_digest(&self.data)
176    }
177}
178
/// The data and media type for a configuration object.
///
/// `data` holds the serialized configuration; see [`Config::oci_v1_from_config_file`]
/// for building it from a [`ConfigFile`] and the `TryFrom<Config>` impl for the reverse.
#[derive(Clone)]
pub struct Config {
    /// The data of this config object
    pub data: Vec<u8>,
    /// The media type of this object
    pub media_type: String,
    /// This OPTIONAL property contains arbitrary metadata for this descriptor.
    /// This OPTIONAL property MUST use the [annotation rules](https://github.com/opencontainers/image-spec/blob/main/annotations.md#rules)
    pub annotations: Option<BTreeMap<String, String>>,
}
190
191impl Config {
192    /// Constructs a new Config struct with provided data and media type
193    pub fn new(
194        data: Vec<u8>,
195        media_type: String,
196        annotations: Option<BTreeMap<String, String>>,
197    ) -> Self {
198        Config {
199            data,
200            media_type,
201            annotations,
202        }
203    }
204
205    /// Constructs a new Config struct with provided data and
206    /// media type application/vnd.oci.image.config.v1+json
207    pub fn oci_v1(data: Vec<u8>, annotations: Option<BTreeMap<String, String>>) -> Self {
208        Self::new(data, IMAGE_CONFIG_MEDIA_TYPE.to_string(), annotations)
209    }
210
211    /// Construct a new Config struct with provided [`ConfigFile`] and
212    /// media type `application/vnd.oci.image.config.v1+json`
213    pub fn oci_v1_from_config_file(
214        config_file: ConfigFile,
215        annotations: Option<BTreeMap<String, String>>,
216    ) -> Result<Self> {
217        let data = serde_json::to_vec(&config_file)?;
218        Ok(Self::new(
219            data,
220            IMAGE_CONFIG_MEDIA_TYPE.to_string(),
221            annotations,
222        ))
223    }
224
225    /// Helper function to compute the sha256 digest of this config object
226    pub fn sha256_digest(&self) -> String {
227        sha256_digest(&self.data)
228    }
229}
230
231impl TryFrom<Config> for ConfigFile {
232    type Error = crate::errors::OciDistributionError;
233
234    fn try_from(config: Config) -> Result<Self> {
235        let config = String::from_utf8(config.data)
236            .map_err(|e| OciDistributionError::ConfigConversionError(e.to_string()))?;
237        let config_file: ConfigFile = serde_json::from_str(&config)
238            .map_err(|e| OciDistributionError::ConfigConversionError(e.to_string()))?;
239        Ok(config_file)
240    }
241}
242
/// The OCI client connects to an OCI registry and fetches OCI images.
///
/// An OCI registry is a container registry that adheres to the OCI Distribution
/// specification. DockerHub is one example, as are ACR and GCR. This client
/// provides a native Rust implementation for pulling OCI images.
///
/// Some OCI registries support completely anonymous access. But most require
/// at least an OAuth2 handshake. Typically, you will want to create a new
/// client, and then run the `auth()` method, which will attempt to get
/// a read-only bearer token. From there, pulling images can be done with
/// the `pull_*` functions.
///
/// For true anonymous access, you can skip `auth()`. This is not recommended
/// unless you are sure that the remote registry does not require OAuth2.
///
/// Cloning a `Client` shares the configuration and the stored registry
/// credentials with the original, since both are held behind `Arc`.
#[derive(Clone)]
pub struct Client {
    // Configuration this client was built from.
    config: Arc<ClientConfig>,
    // Registry -> RegistryAuth
    auth_store: Arc<RwLock<HashMap<String, RegistryAuth>>>,
    // Tokens obtained from `_auth`, looked up by reference and operation.
    tokens: TokenCache,
    // HTTP client used for every request to the registry.
    client: reqwest::Client,
    // Initialised to `PUSH_CHUNK_MAX_SIZE` by every constructor in this file;
    // presumably the chunk size used by chunked blob pushes.
    push_chunk_size: usize,
}
266
267impl Default for Client {
268    fn default() -> Self {
269        Self {
270            config: Arc::default(),
271            auth_store: Arc::default(),
272            tokens: TokenCache::new(DEFAULT_TOKEN_EXPIRATION_SECS),
273            client: reqwest::Client::default(),
274            push_chunk_size: PUSH_CHUNK_MAX_SIZE,
275        }
276    }
277}
278
/// A source that can provide a `ClientConfig`.
///
/// If you are using this crate in your own application, you can implement this
/// trait on your configuration type so that it can be passed to `Client::from_source`.
pub trait ClientConfigSource {
    /// Provides the `ClientConfig` that `Client::from_source` builds the client from.
    fn client_config(&self) -> ClientConfig;
}
286
287impl TryFrom<ClientConfig> for Client {
288    type Error = OciDistributionError;
289
290    fn try_from(config: ClientConfig) -> std::result::Result<Self, Self::Error> {
291        #[allow(unused_mut)]
292        let mut client_builder = reqwest::Client::builder();
293        #[cfg(not(target_arch = "wasm32"))]
294        let mut client_builder =
295            client_builder.danger_accept_invalid_certs(config.accept_invalid_certificates);
296
297        client_builder = match () {
298            #[cfg(all(feature = "native-tls", not(target_arch = "wasm32")))]
299            () => client_builder.danger_accept_invalid_hostnames(config.accept_invalid_hostnames),
300            #[cfg(any(not(feature = "native-tls"), target_arch = "wasm32"))]
301            () => client_builder,
302        };
303
304        #[cfg(not(target_arch = "wasm32"))]
305        for c in &config.extra_root_certificates {
306            let cert = match c.encoding {
307                CertificateEncoding::Der => reqwest::Certificate::from_der(c.data.as_slice())?,
308                CertificateEncoding::Pem => reqwest::Certificate::from_pem(c.data.as_slice())?,
309            };
310            client_builder = client_builder.add_root_certificate(cert);
311        }
312
313        if let Some(timeout) = config.read_timeout {
314            client_builder = client_builder.read_timeout(timeout);
315        }
316        if let Some(timeout) = config.connect_timeout {
317            client_builder = client_builder.connect_timeout(timeout);
318        }
319
320        client_builder = client_builder.user_agent(config.user_agent);
321
322        if let Some(proxy_addr) = &config.https_proxy {
323            let no_proxy = config
324                .no_proxy
325                .as_ref()
326                .and_then(|no_proxy| NoProxy::from_string(no_proxy));
327            let proxy = Proxy::https(proxy_addr)?.no_proxy(no_proxy);
328            client_builder = client_builder.proxy(proxy);
329        }
330
331        if let Some(proxy_addr) = &config.http_proxy {
332            let no_proxy = config
333                .no_proxy
334                .as_ref()
335                .and_then(|no_proxy| NoProxy::from_string(no_proxy));
336            let proxy = Proxy::http(proxy_addr)?.no_proxy(no_proxy);
337            client_builder = client_builder.proxy(proxy);
338        }
339
340        let default_token_expiration_secs = config.default_token_expiration_secs;
341        Ok(Self {
342            config: Arc::new(config),
343            tokens: TokenCache::new(default_token_expiration_secs),
344            client: client_builder.build()?,
345            push_chunk_size: PUSH_CHUNK_MAX_SIZE,
346            ..Default::default()
347        })
348    }
349}
350
351impl Client {
352    /// Create a new client with the supplied config
353    pub fn new(config: ClientConfig) -> Self {
354        let default_token_expiration_secs = config.default_token_expiration_secs;
355        Client::try_from(config).unwrap_or_else(|err| {
356            warn!("Cannot create OCI client from config: {:?}", err);
357            warn!("Creating client with default configuration");
358            Self {
359                tokens: TokenCache::new(default_token_expiration_secs),
360                push_chunk_size: PUSH_CHUNK_MAX_SIZE,
361                ..Default::default()
362            }
363        })
364    }
365
366    /// Create a new client with the supplied config
367    pub fn from_source(config_source: &impl ClientConfigSource) -> Self {
368        Self::new(config_source.client_config())
369    }
370
371    async fn store_auth(&self, registry: &str, auth: RegistryAuth) {
372        self.auth_store
373            .write()
374            .await
375            .insert(registry.to_string(), auth);
376    }
377
378    async fn is_stored_auth(&self, registry: &str) -> bool {
379        self.auth_store.read().await.contains_key(registry)
380    }
381
382    /// Store the authentication information for this registry if it's not already stored in the client.
383    ///
384    /// Most of the time, you don't need to call this method directly. It's called by other
385    /// methods (where you have to provide the authentication information as parameter).
386    ///
387    /// But if you want to pull/push a blob without calling any of the other methods first, which would
388    /// store the authentication information, you can call this method to store the authentication
389    /// information manually.
390    pub async fn store_auth_if_needed(&self, registry: &str, auth: &RegistryAuth) {
391        if !self.is_stored_auth(registry).await {
392            self.store_auth(registry, auth.clone()).await;
393        }
394    }
395
396    /// Checks if we got a token, if we don't - create it and store it in cache.
397    async fn get_auth_token(
398        &self,
399        reference: &Reference,
400        op: RegistryOperation,
401    ) -> Option<RegistryTokenType> {
402        let registry = reference.resolve_registry();
403        let auth = self.auth_store.read().await.get(registry)?.clone();
404        match self.tokens.get(reference, op).await {
405            Some(token) => Some(token),
406            None => {
407                let token = self._auth(reference, &auth, op).await.ok()??;
408                self.tokens.insert(reference, op, token.clone()).await;
409                Some(token)
410            }
411        }
412    }
413
414    /// Fetches the available Tags for the given Reference
415    ///
416    /// The client will check if it's already been authenticated and if
417    /// not will attempt to do.
418    pub async fn list_tags(
419        &self,
420        image: &Reference,
421        auth: &RegistryAuth,
422        n: Option<usize>,
423        last: Option<&str>,
424    ) -> Result<TagResponse> {
425        let op = RegistryOperation::Pull;
426        let url = self.to_list_tags_url(image);
427
428        self.store_auth_if_needed(image.resolve_registry(), auth)
429            .await;
430
431        let request = self.client.get(&url);
432        let request = if let Some(num) = n {
433            request.query(&[("n", num)])
434        } else {
435            request
436        };
437        let request = if let Some(l) = last {
438            request.query(&[("last", l)])
439        } else {
440            request
441        };
442        let request = RequestBuilderWrapper {
443            client: self,
444            request_builder: request,
445        };
446        let res = request
447            .apply_auth(image, op)
448            .await?
449            .into_request_builder()
450            .send()
451            .await?;
452        let status = res.status();
453        let body = res.bytes().await?;
454
455        validate_registry_response(status, &body, &url)?;
456
457        Ok(serde_json::from_str(std::str::from_utf8(&body)?)?)
458    }
459
460    /// Pull an image and return the bytes
461    ///
462    /// The client will check if it's already been authenticated and if
463    /// not will attempt to do.
464    pub async fn pull(
465        &self,
466        image: &Reference,
467        auth: &RegistryAuth,
468        accepted_media_types: Vec<&str>,
469    ) -> Result<ImageData> {
470        debug!("Pulling image: {:?}", image);
471        self.store_auth_if_needed(image.resolve_registry(), auth)
472            .await;
473
474        let (manifest, digest, config) = self._pull_manifest_and_config(image).await?;
475
476        self.validate_layers(&manifest, accepted_media_types)
477            .await?;
478
479        let layers = stream::iter(&manifest.layers)
480            .map(|layer| {
481                // This avoids moving `self` which is &Self
482                // into the async block. We only want to capture
483                // as &Self
484                let this = &self;
485                async move {
486                    let mut out: Vec<u8> = Vec::new();
487                    debug!("Pulling image layer");
488                    this.pull_blob(image, layer, &mut out).await?;
489                    Ok::<_, OciDistributionError>(ImageLayer::new(
490                        out,
491                        layer.media_type.clone(),
492                        layer.annotations.clone(),
493                    ))
494                }
495            })
496            .boxed() // Workaround to rustc issue https://github.com/rust-lang/rust/issues/104382
497            .buffer_unordered(self.config.max_concurrent_download)
498            .try_collect()
499            .await?;
500
501        Ok(ImageData {
502            layers,
503            manifest: Some(manifest),
504            config,
505            digest: Some(digest),
506        })
507    }
508
509    /// Push an image and return the uploaded URL of the image
510    ///
511    /// The client will check if it's already been authenticated and if
512    /// not will attempt to do.
513    ///
514    /// If a manifest is not provided, the client will attempt to generate
515    /// it from the provided image and config data.
516    ///
517    /// Returns pullable URL for the image
518    pub async fn push(
519        &self,
520        image_ref: &Reference,
521        layers: &[ImageLayer],
522        config: Config,
523        auth: &RegistryAuth,
524        manifest: Option<OciImageManifest>,
525    ) -> Result<PushResponse> {
526        debug!("Pushing image: {:?}", image_ref);
527        self.store_auth_if_needed(image_ref.resolve_registry(), auth)
528            .await;
529
530        let manifest: OciImageManifest = match manifest {
531            Some(m) => m,
532            None => OciImageManifest::build(layers, &config, None),
533        };
534
535        // Upload layers
536        stream::iter(layers)
537            .map(|layer| {
538                // This avoids moving `self` which is &Self
539                // into the async block. We only want to capture
540                // as &Self
541                let this = &self;
542                async move {
543                    let digest = layer.sha256_digest();
544                    this.push_blob(image_ref, &layer.data, &digest).await?;
545                    Result::Ok(())
546                }
547            })
548            .boxed() // Workaround to rustc issue https://github.com/rust-lang/rust/issues/104382
549            .buffer_unordered(self.config.max_concurrent_upload)
550            .try_for_each(future::ok)
551            .await?;
552
553        let config_url = self
554            .push_blob(image_ref, &config.data, &manifest.config.digest)
555            .await?;
556        let manifest_url = self.push_manifest(image_ref, &manifest.into()).await?;
557
558        Ok(PushResponse {
559            config_url,
560            manifest_url,
561        })
562    }
563
564    /// Pushes a blob to the registry
565    pub async fn push_blob(
566        &self,
567        image_ref: &Reference,
568        data: &[u8],
569        digest: &str,
570    ) -> Result<String> {
571        if self.config.use_monolithic_push {
572            return self.push_blob_monolithically(image_ref, data, digest).await;
573        }
574
575        match self.push_blob_chunked(image_ref, data, digest).await {
576            Ok(url) => Ok(url),
577            Err(OciDistributionError::SpecViolationError(violation)) => {
578                warn!(?violation, "Registry is not respecting the OCI Distribution Specification when doing chunked push operations");
579                warn!("Attempting monolithic push");
580                self.push_blob_monolithically(image_ref, data, digest).await
581            }
582            Err(e) => Err(e),
583        }
584    }
585
586    /// Pushes a blob to the registry as a monolith
587    ///
588    /// Returns the pullable location of the blob
589    async fn push_blob_monolithically(
590        &self,
591        image: &Reference,
592        blob_data: &[u8],
593        blob_digest: &str,
594    ) -> Result<String> {
595        let location = self.begin_push_monolithical_session(image).await?;
596        self.push_monolithically(&location, image, blob_data, blob_digest)
597            .await
598    }
599
600    /// Pushes a blob to the registry as a series of chunks
601    ///
602    /// Returns the pullable location of the blob
603    async fn push_blob_chunked(
604        &self,
605        image: &Reference,
606        blob_data: &[u8],
607        blob_digest: &str,
608    ) -> Result<String> {
609        let mut location = self.begin_push_chunked_session(image).await?;
610        let mut start: usize = 0;
611        loop {
612            (location, start) = self.push_chunk(&location, image, blob_data, start).await?;
613            if start >= blob_data.len() {
614                break;
615            }
616        }
617        self.end_push_chunked_session(&location, image, blob_digest)
618            .await
619    }
620
621    /// Perform an OAuth v2 auth request if necessary.
622    ///
623    /// This performs authorization and then stores the token internally to be used
624    /// on other requests.
625    pub async fn auth(
626        &self,
627        image: &Reference,
628        authentication: &RegistryAuth,
629        operation: RegistryOperation,
630    ) -> Result<Option<String>> {
631        self.store_auth_if_needed(image.resolve_registry(), authentication)
632            .await;
633        // preserve old caching behavior
634        match self._auth(image, authentication, operation).await {
635            Ok(Some(RegistryTokenType::Bearer(token))) => {
636                self.tokens
637                    .insert(image, operation, RegistryTokenType::Bearer(token.clone()))
638                    .await;
639                Ok(Some(token.token().to_string()))
640            }
641            Ok(Some(RegistryTokenType::Basic(username, password))) => {
642                self.tokens
643                    .insert(
644                        image,
645                        operation,
646                        RegistryTokenType::Basic(username, password),
647                    )
648                    .await;
649                Ok(None)
650            }
651            Ok(None) => Ok(None),
652            Err(e) => Err(e),
653        }
654    }
655
656    /// Internal auth that retrieves token.
657    async fn _auth(
658        &self,
659        image: &Reference,
660        authentication: &RegistryAuth,
661        operation: RegistryOperation,
662    ) -> Result<Option<RegistryTokenType>> {
663        debug!("Authorizing for image: {:?}", image);
664        // The version request will tell us where to go.
665        let url = format!(
666            "{}://{}/v2/",
667            self.config.protocol.scheme_for(image.resolve_registry()),
668            image.resolve_registry()
669        );
670        debug!(?url);
671
672        if let RegistryAuth::Bearer(token) = authentication {
673            return Ok(Some(RegistryTokenType::Bearer(RegistryToken::Token {
674                token: token.clone(),
675            })));
676        }
677
678        let res = self.client.get(&url).send().await?;
679        let dist_hdr = match res.headers().get(reqwest::header::WWW_AUTHENTICATE) {
680            Some(h) => h,
681            None => return Ok(None),
682        };
683
684        let challenge = match BearerChallenge::try_from(dist_hdr) {
685            Ok(c) => c,
686            Err(e) => {
687                debug!(error = ?e, "Falling back to HTTP Basic Auth");
688                if let RegistryAuth::Basic(username, password) = authentication {
689                    return Ok(Some(RegistryTokenType::Basic(
690                        username.to_string(),
691                        password.to_string(),
692                    )));
693                }
694                return Ok(None);
695            }
696        };
697
698        // Allow for either push or pull authentication
699        let scope = match operation {
700            RegistryOperation::Pull => format!("repository:{}:pull", image.repository()),
701            RegistryOperation::Push => format!("repository:{}:pull,push", image.repository()),
702        };
703
704        let realm = challenge.realm.as_ref();
705        let service = challenge.service.as_ref();
706        let mut query = vec![("scope", &scope)];
707
708        if let Some(s) = service {
709            query.push(("service", s))
710        }
711
712        // TODO: At some point in the future, we should support sending a secret to the
713        // server for auth. This particular workflow is for read-only public auth.
714        debug!(?realm, ?service, ?scope, "Making authentication call");
715
716        let auth_res = self
717            .client
718            .get(realm)
719            .query(&query)
720            .apply_authentication(authentication)
721            .send()
722            .await?;
723
724        match auth_res.status() {
725            reqwest::StatusCode::OK => {
726                let text = auth_res.text().await?;
727                debug!("Received response from auth request: {}", text);
728                let token: RegistryToken = serde_json::from_str(&text)
729                    .map_err(|e| OciDistributionError::RegistryTokenDecodeError(e.to_string()))?;
730                debug!("Successfully authorized for image '{:?}'", image);
731                Ok(Some(RegistryTokenType::Bearer(token)))
732            }
733            _ => {
734                let reason = auth_res.text().await?;
735                debug!("Failed to authenticate for image '{:?}': {}", image, reason);
736                Err(OciDistributionError::AuthenticationFailure(reason))
737            }
738        }
739    }
740
741    /// Fetch a manifest's digest from the remote OCI Distribution service.
742    ///
743    /// If the connection has already gone through authentication, this will
744    /// use the bearer token. Otherwise, this will attempt an anonymous pull.
745    ///
746    /// Will first attempt to read the `Docker-Content-Digest` header using a
747    /// HEAD request. If this header is not present, will make a second GET
748    /// request and return the SHA256 of the response body.
749    pub async fn fetch_manifest_digest(
750        &self,
751        image: &Reference,
752        auth: &RegistryAuth,
753    ) -> Result<String> {
754        self.store_auth_if_needed(image.resolve_registry(), auth)
755            .await;
756
757        let url = self.to_v2_manifest_url(image);
758        debug!("HEAD image manifest from {}", url);
759        let res = RequestBuilderWrapper::from_client(self, |client| client.head(&url))
760            .apply_accept(MIME_TYPES_DISTRIBUTION_MANIFEST)?
761            .apply_auth(image, RegistryOperation::Pull)
762            .await?
763            .into_request_builder()
764            .send()
765            .await?;
766
767        if let Some(digest) = digest_header_value(res.headers().clone())? {
768            let status = res.status();
769            let body = res.bytes().await?;
770            validate_registry_response(status, &body, &url)?;
771
772            // If the reference has a digest and the digest header has a matching algorithm, compare
773            // them and return an error if they don't match.
774            if let Some(img_digest) = image.digest() {
775                let header_digest = Digest::new(&digest)?;
776                let image_digest = Digest::new(img_digest)?;
777                if header_digest.algorithm == image_digest.algorithm
778                    && header_digest != image_digest
779                {
780                    return Err(DigestError::VerificationError {
781                        expected: img_digest.to_string(),
782                        actual: digest,
783                    }
784                    .into());
785                }
786            }
787
788            Ok(digest)
789        } else {
790            debug!("GET image manifest from {}", url);
791            let res = RequestBuilderWrapper::from_client(self, |client| client.get(&url))
792                .apply_accept(MIME_TYPES_DISTRIBUTION_MANIFEST)?
793                .apply_auth(image, RegistryOperation::Pull)
794                .await?
795                .into_request_builder()
796                .send()
797                .await?;
798            let status = res.status();
799            trace!(headers = ?res.headers(), "Got Headers");
800            let headers = res.headers().clone();
801            let body = res.bytes().await?;
802            validate_registry_response(status, &body, &url)?;
803
804            validate_digest(&body, digest_header_value(headers)?, image.digest())
805                .map_err(OciDistributionError::from)
806        }
807    }
808
809    async fn validate_layers(
810        &self,
811        manifest: &OciImageManifest,
812        accepted_media_types: Vec<&str>,
813    ) -> Result<()> {
814        if manifest.layers.is_empty() {
815            return Err(OciDistributionError::PullNoLayersError);
816        }
817
818        for layer in &manifest.layers {
819            if !accepted_media_types.iter().any(|i| i.eq(&layer.media_type)) {
820                return Err(OciDistributionError::IncompatibleLayerMediaTypeError(
821                    layer.media_type.clone(),
822                ));
823            }
824        }
825
826        Ok(())
827    }
828
829    /// Pull a manifest from the remote OCI Distribution service.
830    ///
831    /// The client will check if it's already been authenticated and if
832    /// not will attempt to do.
833    ///
834    /// A Tuple is returned containing the [OciImageManifest](crate::manifest::OciImageManifest)
835    /// and the manifest content digest hash.
836    ///
837    /// If a multi-platform Image Index manifest is encountered, a platform-specific
838    /// Image manifest will be selected using the client's default platform resolution.
839    pub async fn pull_image_manifest(
840        &self,
841        image: &Reference,
842        auth: &RegistryAuth,
843    ) -> Result<(OciImageManifest, String)> {
844        self.store_auth_if_needed(image.resolve_registry(), auth)
845            .await;
846
847        self._pull_image_manifest(image).await
848    }
849
850    /// Pull a manifest from the remote OCI Distribution service without parsing it.
851    ///
852    /// The client will check if it's already been authenticated and if
853    /// not will attempt to do.
854    ///
855    /// A Tuple is returned containing raw byte representation of the manifest
856    /// and the manifest content digest.
857    pub async fn pull_manifest_raw(
858        &self,
859        image: &Reference,
860        auth: &RegistryAuth,
861        accepted_media_types: &[&str],
862    ) -> Result<(Vec<u8>, String)> {
863        self.store_auth_if_needed(image.resolve_registry(), auth)
864            .await;
865
866        self._pull_manifest_raw(image, accepted_media_types).await
867    }
868
869    /// Pull a manifest from the remote OCI Distribution service.
870    ///
871    /// The client will check if it's already been authenticated and if
872    /// not will attempt to do.
873    ///
874    /// A Tuple is returned containing the [Manifest](crate::manifest::OciImageManifest)
875    /// and the manifest content digest hash.
876    pub async fn pull_manifest(
877        &self,
878        image: &Reference,
879        auth: &RegistryAuth,
880    ) -> Result<(OciManifest, String)> {
881        self.store_auth_if_needed(image.resolve_registry(), auth)
882            .await;
883
884        self._pull_manifest(image).await
885    }
886
887    /// Pull an image manifest from the remote OCI Distribution service.
888    ///
889    /// If the connection has already gone through authentication, this will
890    /// use the bearer token. Otherwise, this will attempt an anonymous pull.
891    ///
892    /// If a multi-platform Image Index manifest is encountered, a platform-specific
893    /// Image manifest will be selected using the client's default platform resolution.
894    async fn _pull_image_manifest(&self, image: &Reference) -> Result<(OciImageManifest, String)> {
895        let (manifest, digest) = self._pull_manifest(image).await?;
896        match manifest {
897            OciManifest::Image(image_manifest) => Ok((image_manifest, digest)),
898            OciManifest::ImageIndex(image_index_manifest) => {
899                debug!("Inspecting Image Index Manifest");
900                let digest = if let Some(resolver) = &self.config.platform_resolver {
901                    resolver(&image_index_manifest.manifests)
902                } else {
903                    return Err(OciDistributionError::ImageIndexParsingNoPlatformResolverError);
904                };
905
906                match digest {
907                    Some(digest) => {
908                        debug!("Selected manifest entry with digest: {}", digest);
909                        let manifest_entry_reference = image.clone_with_digest(digest.clone());
910                        self._pull_manifest(&manifest_entry_reference)
911                            .await
912                            .and_then(|(manifest, _digest)| match manifest {
913                                OciManifest::Image(manifest) => Ok((manifest, digest)),
914                                OciManifest::ImageIndex(_) => {
915                                    Err(OciDistributionError::ImageManifestNotFoundError(
916                                        "received Image Index manifest instead".to_string(),
917                                    ))
918                                }
919                            })
920                    }
921                    None => Err(OciDistributionError::ImageManifestNotFoundError(
922                        "no entry found in image index manifest matching client's default platform"
923                            .to_string(),
924                    )),
925                }
926            }
927        }
928    }
929
930    /// Pull a manifest from the remote OCI Distribution service without parsing it.
931    ///
932    /// If the connection has already gone through authentication, this will
933    /// use the bearer token. Otherwise, this will attempt an anonymous pull.
934    async fn _pull_manifest_raw(
935        &self,
936        image: &Reference,
937        accepted_media_types: &[&str],
938    ) -> Result<(Vec<u8>, String)> {
939        let url = self.to_v2_manifest_url(image);
940        debug!("Pulling image manifest from {}", url);
941
942        let res = RequestBuilderWrapper::from_client(self, |client| client.get(&url))
943            .apply_accept(accepted_media_types)?
944            .apply_auth(image, RegistryOperation::Pull)
945            .await?
946            .into_request_builder()
947            .send()
948            .await?;
949        let status = res.status();
950        let headers = res.headers().clone();
951        let body = res.bytes().await?;
952
953        validate_registry_response(status, &body, &url)?;
954
955        let digest_header = digest_header_value(headers)?;
956        let digest = validate_digest(&body, digest_header, image.digest())?;
957
958        Ok((body.to_vec(), digest))
959    }
960
961    /// Pull a manifest from the remote OCI Distribution service.
962    ///
963    /// If the connection has already gone through authentication, this will
964    /// use the bearer token. Otherwise, this will attempt an anonymous pull.
965    async fn _pull_manifest(&self, image: &Reference) -> Result<(OciManifest, String)> {
966        let (body, digest) = self
967            ._pull_manifest_raw(image, MIME_TYPES_DISTRIBUTION_MANIFEST)
968            .await?;
969
970        self.validate_image_manifest(&body).await?;
971
972        debug!("Parsing response as Manifest");
973        let manifest = serde_json::from_slice(&body)
974            .map_err(|e| OciDistributionError::ManifestParsingError(e.to_string()))?;
975        Ok((manifest, digest))
976    }
977
978    async fn validate_image_manifest(&self, body: &[u8]) -> Result<()> {
979        let versioned: Versioned = serde_json::from_slice(body)
980            .map_err(|e| OciDistributionError::VersionedParsingError(e.to_string()))?;
981        debug!(?versioned, "validating manifest");
982        if versioned.schema_version != 2 {
983            return Err(OciDistributionError::UnsupportedSchemaVersionError(
984                versioned.schema_version,
985            ));
986        }
987        if let Some(media_type) = versioned.media_type {
988            if media_type != IMAGE_MANIFEST_MEDIA_TYPE
989                && media_type != OCI_IMAGE_MEDIA_TYPE
990                && media_type != IMAGE_MANIFEST_LIST_MEDIA_TYPE
991                && media_type != OCI_IMAGE_INDEX_MEDIA_TYPE
992            {
993                return Err(OciDistributionError::UnsupportedMediaTypeError(media_type));
994            }
995        }
996
997        Ok(())
998    }
999
1000    /// Pull a manifest and its config from the remote OCI Distribution service.
1001    ///
1002    /// The client will check if it's already been authenticated and if
1003    /// not will attempt to do.
1004    ///
1005    /// A Tuple is returned containing the [OciImageManifest](crate::manifest::OciImageManifest),
1006    /// the manifest content digest hash and the contents of the manifests config layer
1007    /// as a String.
1008    pub async fn pull_manifest_and_config(
1009        &self,
1010        image: &Reference,
1011        auth: &RegistryAuth,
1012    ) -> Result<(OciImageManifest, String, String)> {
1013        self.store_auth_if_needed(image.resolve_registry(), auth)
1014            .await;
1015
1016        self._pull_manifest_and_config(image)
1017            .await
1018            .and_then(|(manifest, digest, config)| {
1019                Ok((
1020                    manifest,
1021                    digest,
1022                    String::from_utf8(config.data).map_err(|e| {
1023                        OciDistributionError::GenericError(Some(format!(
1024                            "Cannot not UTF8 compliant: {}",
1025                            e
1026                        )))
1027                    })?,
1028                ))
1029            })
1030    }
1031
1032    async fn _pull_manifest_and_config(
1033        &self,
1034        image: &Reference,
1035    ) -> Result<(OciImageManifest, String, Config)> {
1036        let (manifest, digest) = self._pull_image_manifest(image).await?;
1037
1038        let mut out: Vec<u8> = Vec::new();
1039        debug!("Pulling config layer");
1040        self.pull_blob(image, &manifest.config, &mut out).await?;
1041        let media_type = manifest.config.media_type.clone();
1042        let annotations = manifest.annotations.clone();
1043        Ok((manifest, digest, Config::new(out, media_type, annotations)))
1044    }
1045
1046    /// Push a manifest list to an OCI registry.
1047    ///
1048    /// This pushes a manifest list to an OCI registry.
1049    pub async fn push_manifest_list(
1050        &self,
1051        reference: &Reference,
1052        auth: &RegistryAuth,
1053        manifest: OciImageIndex,
1054    ) -> Result<String> {
1055        self.store_auth_if_needed(reference.resolve_registry(), auth)
1056            .await;
1057        self.push_manifest(reference, &OciManifest::ImageIndex(manifest))
1058            .await
1059    }
1060
1061    /// Pull a single layer from an OCI registry.
1062    ///
1063    /// This pulls the layer for a particular image that is identified by the given layer
1064    /// descriptor. The layer descriptor can be anything that can be referenced as a layer
1065    /// descriptor. The image reference is used to find the repository and the registry, but it is
1066    /// not used to verify that the digest is a layer inside of the image. (The manifest is used for
1067    /// that.)
1068    pub async fn pull_blob<T: AsyncWrite + Unpin>(
1069        &self,
1070        image: &Reference,
1071        layer: impl AsLayerDescriptor,
1072        mut out: T,
1073    ) -> Result<()> {
1074        let response = self.pull_blob_response(image, &layer, None, None).await?;
1075
1076        let mut maybe_header_digester = digest_header_value(response.headers().clone())?
1077            .map(|digest| Digester::new(&digest).map(|d| (d, digest)))
1078            .transpose()?;
1079
1080        // With a blob pull, we need to use the digest from the layer and not the image
1081        let layer_digest = layer.as_layer_descriptor().digest.to_string();
1082        let mut layer_digester = Digester::new(&layer_digest)?;
1083
1084        let mut stream = response.error_for_status()?.bytes_stream();
1085
1086        while let Some(bytes) = stream.next().await {
1087            let bytes = bytes?;
1088            if let Some((ref mut digester, _)) = maybe_header_digester.as_mut() {
1089                digester.update(&bytes);
1090            }
1091            layer_digester.update(&bytes);
1092            out.write_all(&bytes).await?;
1093        }
1094
1095        if let Some((mut digester, expected)) = maybe_header_digester.take() {
1096            let digest = digester.finalize();
1097
1098            if digest != expected {
1099                return Err(DigestError::VerificationError {
1100                    expected,
1101                    actual: digest,
1102                }
1103                .into());
1104            }
1105        }
1106
1107        let digest = layer_digester.finalize();
1108        if digest != layer_digest {
1109            return Err(DigestError::VerificationError {
1110                expected: layer_digest,
1111                actual: digest,
1112            }
1113            .into());
1114        }
1115
1116        Ok(())
1117    }
1118
1119    /// Stream a single layer from an OCI registry.
1120    ///
1121    /// This is a streaming version of [`Client::pull_blob`]. Returns [`SizedStream`], which
1122    /// implements [`Stream`](futures_util::Stream) or can be used directly to get the content
1123    /// length of the response
1124    ///
1125    /// # Example
1126    /// ```rust
1127    /// use std::future::Future;
1128    /// use std::io::Error;
1129    ///
1130    /// use futures_util::TryStreamExt;
1131    /// use oci_client::{Client, Reference};
1132    /// use oci_client::client::ClientConfig;
1133    /// use oci_client::manifest::OciDescriptor;
1134    ///
1135    /// async {
1136    ///   let client = Client::new(Default::default());
1137    ///   let imgRef: Reference = "busybox:latest".parse().unwrap();
1138    ///   let desc = OciDescriptor { digest: "sha256:deadbeef".to_owned(), ..Default::default() };
1139    ///   let mut stream = client.pull_blob_stream(&imgRef, &desc).await.unwrap();
1140    ///   // Check the optional content length
1141    ///   let content_length = stream.content_length.unwrap_or_default();
1142    ///   // Use as a stream
1143    ///   stream.try_next().await.unwrap().unwrap();
1144    ///   // Use the underlying stream
1145    ///   let mut stream = stream.stream;
1146    /// };
1147    /// ```
1148    pub async fn pull_blob_stream(
1149        &self,
1150        image: &Reference,
1151        layer: impl AsLayerDescriptor,
1152    ) -> Result<SizedStream> {
1153        stream_from_response(
1154            self.pull_blob_response(image, &layer, None, None).await?,
1155            layer,
1156            true,
1157        )
1158    }
1159
1160    /// Stream a single layer from an OCI registry starting with a byte offset. This can be used to
1161    /// continue downloading a layer after a network error. Please note that when doing a partial
1162    /// download (meaning it returns the [`BlobResponse::Partial`] variant), the layer digest is not
1163    /// verified as all the bytes are not available. The returned blob response will contain the
1164    /// header from the request digest, if it was set, that can be used (in addition to the digest
1165    /// from the layer) to verify the blob once all the bytes have been downloaded. Failure to do
1166    /// this means your content will not be verified.
1167    ///
1168    /// Returns [`BlobResponse`] which indicates if the response was a full or partial response.
1169    pub async fn pull_blob_stream_partial(
1170        &self,
1171        image: &Reference,
1172        layer: impl AsLayerDescriptor,
1173        offset: u64,
1174        length: Option<u64>,
1175    ) -> Result<BlobResponse> {
1176        let response = self
1177            .pull_blob_response(image, &layer, Some(offset), length)
1178            .await?;
1179
1180        let status = response.status();
1181        match status {
1182            StatusCode::OK => Ok(BlobResponse::Full(stream_from_response(
1183                response, &layer, true,
1184            )?)),
1185            StatusCode::PARTIAL_CONTENT => Ok(BlobResponse::Partial(stream_from_response(
1186                response, &layer, false,
1187            )?)),
1188            _ => Err(OciDistributionError::ServerError {
1189                code: status.as_u16(),
1190                url: response.url().to_string(),
1191                message: response.text().await?,
1192            }),
1193        }
1194    }
1195
1196    /// Pull a single layer from an OCI registry.
1197    async fn pull_blob_response(
1198        &self,
1199        image: &Reference,
1200        layer: impl AsLayerDescriptor,
1201        offset: Option<u64>,
1202        length: Option<u64>,
1203    ) -> Result<Response> {
1204        let layer = layer.as_layer_descriptor();
1205        let url = self.to_v2_blob_url(image, layer.digest);
1206
1207        let mut request = RequestBuilderWrapper::from_client(self, |client| client.get(&url))
1208            .apply_accept(MIME_TYPES_DISTRIBUTION_MANIFEST)?
1209            .apply_auth(image, RegistryOperation::Pull)
1210            .await?
1211            .into_request_builder();
1212        if let (Some(off), Some(len)) = (offset, length) {
1213            let end = (off + len).saturating_sub(1);
1214            request = request.header(
1215                RANGE,
1216                HeaderValue::from_str(&format!("bytes={off}-{end}")).unwrap(),
1217            );
1218        } else if let Some(offset) = offset {
1219            request = request.header(
1220                RANGE,
1221                HeaderValue::from_str(&format!("bytes={offset}-")).unwrap(),
1222            );
1223        }
1224        let mut response = request.send().await?;
1225
1226        if let Some(urls) = &layer.urls {
1227            for url in urls {
1228                if response.error_for_status_ref().is_ok() {
1229                    break;
1230                }
1231
1232                let url = Url::parse(url)
1233                    .map_err(|e| OciDistributionError::UrlParseError(e.to_string()))?;
1234
1235                if url.scheme() == "http" || url.scheme() == "https" {
1236                    // NOTE: we must not authenticate on additional URLs as those
1237                    // can be abused to leak credentials or tokens.  Please
1238                    // refer to CVE-2020-15157 for more information.
1239                    request =
1240                        RequestBuilderWrapper::from_client(self, |client| client.get(url.clone()))
1241                            .apply_accept(MIME_TYPES_DISTRIBUTION_MANIFEST)?
1242                            .into_request_builder();
1243                    if let Some(offset) = offset {
1244                        request = request.header(
1245                            RANGE,
1246                            HeaderValue::from_str(&format!("bytes={offset}-")).unwrap(),
1247                        );
1248                    }
1249                    response = request.send().await?
1250                }
1251            }
1252        }
1253
1254        Ok(response)
1255    }
1256
1257    /// Begins a session to push an image to registry in a monolithical way
1258    ///
1259    /// Returns URL with session UUID
1260    async fn begin_push_monolithical_session(&self, image: &Reference) -> Result<String> {
1261        let url = &self.to_v2_blob_upload_url(image);
1262        debug!(?url, "begin_push_monolithical_session");
1263        let res = RequestBuilderWrapper::from_client(self, |client| client.post(url))
1264            .apply_auth(image, RegistryOperation::Push)
1265            .await?
1266            .into_request_builder()
1267            // We set "Content-Length" to 0 here even though the OCI Distribution
1268            // spec does not strictly require that. In practice we have seen that
1269            // certain registries require "Content-Length" to be present for all
1270            // types of push sessions.
1271            .header("Content-Length", 0)
1272            .send()
1273            .await?;
1274
1275        // OCI spec requires the status code be 202 Accepted to successfully begin the push process
1276        self.extract_location_header(image, res, &reqwest::StatusCode::ACCEPTED)
1277            .await
1278    }
1279
1280    /// Begins a session to push an image to registry as a series of chunks
1281    ///
1282    /// Returns URL with session UUID
1283    async fn begin_push_chunked_session(&self, image: &Reference) -> Result<String> {
1284        let url = &self.to_v2_blob_upload_url(image);
1285        debug!(?url, "begin_push_session");
1286        let res = RequestBuilderWrapper::from_client(self, |client| client.post(url))
1287            .apply_auth(image, RegistryOperation::Push)
1288            .await?
1289            .into_request_builder()
1290            .header("Content-Length", 0)
1291            .send()
1292            .await?;
1293
1294        // OCI spec requires the status code be 202 Accepted to successfully begin the push process
1295        self.extract_location_header(image, res, &reqwest::StatusCode::ACCEPTED)
1296            .await
1297    }
1298
1299    /// Closes the chunked push session
1300    ///
1301    /// Returns the pullable URL for the image
1302    async fn end_push_chunked_session(
1303        &self,
1304        location: &str,
1305        image: &Reference,
1306        digest: &str,
1307    ) -> Result<String> {
1308        let url = Url::parse_with_params(location, &[("digest", digest)])
1309            .map_err(|e| OciDistributionError::GenericError(Some(e.to_string())))?;
1310        let res = RequestBuilderWrapper::from_client(self, |client| client.put(url.clone()))
1311            .apply_auth(image, RegistryOperation::Push)
1312            .await?
1313            .into_request_builder()
1314            .header("Content-Length", 0)
1315            .send()
1316            .await?;
1317        self.extract_location_header(image, res, &reqwest::StatusCode::CREATED)
1318            .await
1319    }
1320
1321    /// Pushes a layer to a registry as a monolithical blob.
1322    ///
1323    /// Returns the URL location for the next layer
1324    async fn push_monolithically(
1325        &self,
1326        location: &str,
1327        image: &Reference,
1328        layer: &[u8],
1329        blob_digest: &str,
1330    ) -> Result<String> {
1331        let mut url = Url::parse(location).unwrap();
1332        url.query_pairs_mut().append_pair("digest", blob_digest);
1333        let url = url.to_string();
1334
1335        debug!(size = layer.len(), location = ?url, "Pushing monolithically");
1336        if layer.is_empty() {
1337            return Err(OciDistributionError::PushNoDataError);
1338        };
1339        let mut headers = HeaderMap::new();
1340        headers.insert(
1341            "Content-Length",
1342            format!("{}", layer.len()).parse().unwrap(),
1343        );
1344        headers.insert("Content-Type", "application/octet-stream".parse().unwrap());
1345
1346        let res = RequestBuilderWrapper::from_client(self, |client| client.put(&url))
1347            .apply_auth(image, RegistryOperation::Push)
1348            .await?
1349            .into_request_builder()
1350            .headers(headers)
1351            .body(layer.to_vec())
1352            .send()
1353            .await?;
1354
1355        // Returns location
1356        self.extract_location_header(image, res, &reqwest::StatusCode::CREATED)
1357            .await
1358    }
1359
1360    /// Pushes a single chunk of a blob to a registry,
1361    /// as part of a chunked blob upload.
1362    ///
1363    /// Returns the URL location for the next chunk
1364    async fn push_chunk(
1365        &self,
1366        location: &str,
1367        image: &Reference,
1368        blob_data: &[u8],
1369        start_byte: usize,
1370    ) -> Result<(String, usize)> {
1371        if blob_data.is_empty() {
1372            return Err(OciDistributionError::PushNoDataError);
1373        };
1374        let end_byte = if (start_byte + self.push_chunk_size) < blob_data.len() {
1375            start_byte + self.push_chunk_size - 1
1376        } else {
1377            blob_data.len() - 1
1378        };
1379        let body = blob_data[start_byte..end_byte + 1].to_vec();
1380        let mut headers = HeaderMap::new();
1381        headers.insert(
1382            "Content-Range",
1383            format!("{}-{}", start_byte, end_byte).parse().unwrap(),
1384        );
1385        headers.insert("Content-Length", format!("{}", body.len()).parse().unwrap());
1386        headers.insert("Content-Type", "application/octet-stream".parse().unwrap());
1387
1388        debug!(
1389            ?start_byte,
1390            ?end_byte,
1391            blob_data_len = blob_data.len(),
1392            body_len = body.len(),
1393            ?location,
1394            ?headers,
1395            "Pushing chunk"
1396        );
1397
1398        let res = RequestBuilderWrapper::from_client(self, |client| client.patch(location))
1399            .apply_auth(image, RegistryOperation::Push)
1400            .await?
1401            .into_request_builder()
1402            .headers(headers)
1403            .body(body)
1404            .send()
1405            .await?;
1406
1407        // Returns location for next chunk and the start byte for the next range
1408        Ok((
1409            self.extract_location_header(image, res, &reqwest::StatusCode::ACCEPTED)
1410                .await?,
1411            end_byte + 1,
1412        ))
1413    }
1414
1415    /// Mounts a blob to the provided reference, from the given source
1416    pub async fn mount_blob(
1417        &self,
1418        image: &Reference,
1419        source: &Reference,
1420        digest: &str,
1421    ) -> Result<()> {
1422        let base_url = self.to_v2_blob_upload_url(image);
1423        let url = Url::parse_with_params(
1424            &base_url,
1425            &[("mount", digest), ("from", source.repository())],
1426        )
1427        .map_err(|e| OciDistributionError::UrlParseError(e.to_string()))?;
1428
1429        let res = RequestBuilderWrapper::from_client(self, |client| client.post(url.clone()))
1430            .apply_auth(image, RegistryOperation::Push)
1431            .await?
1432            .into_request_builder()
1433            .send()
1434            .await?;
1435
1436        self.extract_location_header(image, res, &reqwest::StatusCode::CREATED)
1437            .await?;
1438
1439        Ok(())
1440    }
1441
1442    /// Pushes the manifest for a specified image
1443    ///
1444    /// Returns pullable manifest URL
1445    pub async fn push_manifest(&self, image: &Reference, manifest: &OciManifest) -> Result<String> {
1446        let mut headers = HeaderMap::new();
1447        let content_type = manifest.content_type();
1448        headers.insert("Content-Type", content_type.parse().unwrap());
1449
1450        // Serialize the manifest with a canonical json formatter, as described at
1451        // https://github.com/opencontainers/image-spec/blob/main/considerations.md#json
1452        let mut body = Vec::new();
1453        let mut ser = serde_json::Serializer::with_formatter(&mut body, CanonicalFormatter::new());
1454        manifest.serialize(&mut ser).unwrap();
1455
1456        self.push_manifest_raw(image, body, manifest.content_type().parse().unwrap())
1457            .await
1458    }
1459
1460    /// Pushes the manifest, provided as raw bytes, for a specified image
1461    ///
1462    /// Returns pullable manifest url
1463    pub async fn push_manifest_raw(
1464        &self,
1465        image: &Reference,
1466        body: Vec<u8>,
1467        content_type: HeaderValue,
1468    ) -> Result<String> {
1469        let url = self.to_v2_manifest_url(image);
1470        debug!(?url, ?content_type, "push manifest");
1471
1472        let mut headers = HeaderMap::new();
1473        headers.insert("Content-Type", content_type);
1474
1475        // Calculate the digest of the manifest, this is useful
1476        // if the remote registry is violating the OCI Distribution Specification.
1477        // See below for more details.
1478        let manifest_hash = sha256_digest(&body);
1479
1480        let res = RequestBuilderWrapper::from_client(self, |client| client.put(url.clone()))
1481            .apply_auth(image, RegistryOperation::Push)
1482            .await?
1483            .into_request_builder()
1484            .headers(headers)
1485            .body(body)
1486            .send()
1487            .await?;
1488
1489        let ret = self
1490            .extract_location_header(image, res, &reqwest::StatusCode::CREATED)
1491            .await;
1492
1493        if matches!(ret, Err(OciDistributionError::RegistryNoLocationError)) {
1494            // The registry is violating the OCI Distribution Spec, BUT the OCI
1495            // image/artifact has been uploaded successfully.
1496            // The `Location` header contains the sha256 digest of the manifest,
1497            // we can reuse the value we calculated before.
1498            // The workaround is there because repositories such as
1499            // AWS ECR are violating this aspect of the spec. This at least let the
1500            // oci-distribution users interact with these registries.
1501            warn!("Registry is not respecting the OCI Distribution Specification: it didn't return the Location of the uploaded Manifest inside of the response headers. Working around this issue...");
1502
1503            let url_base = url
1504                .strip_suffix(image.tag().unwrap_or("latest"))
1505                .expect("The manifest URL always ends with the image tag suffix");
1506            let url_by_digest = format!("{}{}", url_base, manifest_hash);
1507
1508            return Ok(url_by_digest);
1509        }
1510
1511        ret
1512    }
1513
1514    /// Pulls the referrers for the given image filtering by the optionally provided artifact type.
1515    pub async fn pull_referrers(
1516        &self,
1517        image: &Reference,
1518        artifact_type: Option<&str>,
1519    ) -> Result<OciImageIndex> {
1520        let url = self.to_v2_referrers_url(image, artifact_type)?;
1521        debug!("Pulling referrers from {}", url);
1522
1523        let res = RequestBuilderWrapper::from_client(self, |client| client.get(&url))
1524            .apply_accept(MIME_TYPES_DISTRIBUTION_MANIFEST)?
1525            .apply_auth(image, RegistryOperation::Pull)
1526            .await?
1527            .into_request_builder()
1528            .send()
1529            .await?;
1530        let status = res.status();
1531        let body = res.bytes().await?;
1532
1533        validate_registry_response(status, &body, &url)?;
1534        let manifest = serde_json::from_slice(&body)
1535            .map_err(|e| OciDistributionError::ManifestParsingError(e.to_string()))?;
1536
1537        Ok(manifest)
1538    }
1539
1540    async fn extract_location_header(
1541        &self,
1542        image: &Reference,
1543        res: reqwest::Response,
1544        expected_status: &reqwest::StatusCode,
1545    ) -> Result<String> {
1546        debug!(expected_status_code=?expected_status.as_u16(),
1547            status_code=?res.status().as_u16(),
1548            "extract location header");
1549        if res.status().eq(expected_status) {
1550            let location_header = res.headers().get("Location");
1551            debug!(location=?location_header, "Location header");
1552            match location_header {
1553                None => Err(OciDistributionError::RegistryNoLocationError),
1554                Some(lh) => self.location_header_to_url(image, lh),
1555            }
1556        } else if res.status().is_success() && expected_status.is_success() {
1557            Err(OciDistributionError::SpecViolationError(format!(
1558                "Expected HTTP Status {}, got {} instead",
1559                expected_status,
1560                res.status(),
1561            )))
1562        } else {
1563            let url = res.url().to_string();
1564            let code = res.status().as_u16();
1565            let message = res.text().await?;
1566            Err(OciDistributionError::ServerError { url, code, message })
1567        }
1568    }
1569
1570    /// Helper function to convert location header to URL
1571    ///
1572    /// Location may be absolute (containing the protocol and/or hostname), or relative (containing just the URL path)
1573    /// Returns a properly formatted absolute URL
1574    fn location_header_to_url(
1575        &self,
1576        image: &Reference,
1577        location_header: &reqwest::header::HeaderValue,
1578    ) -> Result<String> {
1579        let lh = location_header.to_str()?;
1580        if lh.starts_with("/") {
1581            let registry = image.resolve_registry();
1582            Ok(format!(
1583                "{scheme}://{registry}{lh}",
1584                scheme = self.config.protocol.scheme_for(registry)
1585            ))
1586        } else {
1587            Ok(lh.to_string())
1588        }
1589    }
1590
1591    /// Convert a Reference to a v2 manifest URL.
1592    fn to_v2_manifest_url(&self, reference: &Reference) -> String {
1593        let registry = reference.resolve_registry();
1594        format!(
1595            "{scheme}://{registry}/v2/{repository}/manifests/{reference}{ns}",
1596            scheme = self.config.protocol.scheme_for(registry),
1597            repository = reference.repository(),
1598            reference = if let Some(digest) = reference.digest() {
1599                digest
1600            } else {
1601                reference.tag().unwrap_or("latest")
1602            },
1603            ns = reference
1604                .namespace()
1605                .map(|ns| format!("?ns={ns}"))
1606                .unwrap_or_default(),
1607        )
1608    }
1609
1610    /// Convert a Reference to a v2 blob (layer) URL.
1611    fn to_v2_blob_url(&self, reference: &Reference, digest: &str) -> String {
1612        let registry = reference.resolve_registry();
1613        format!(
1614            "{scheme}://{registry}/v2/{repository}/blobs/{digest}{ns}",
1615            scheme = self.config.protocol.scheme_for(registry),
1616            repository = reference.repository(),
1617            ns = reference
1618                .namespace()
1619                .map(|ns| format!("?ns={ns}"))
1620                .unwrap_or_default(),
1621        )
1622    }
1623
1624    /// Convert a Reference to a v2 blob upload URL.
1625    fn to_v2_blob_upload_url(&self, reference: &Reference) -> String {
1626        self.to_v2_blob_url(reference, "uploads/")
1627    }
1628
1629    fn to_list_tags_url(&self, reference: &Reference) -> String {
1630        let registry = reference.resolve_registry();
1631        format!(
1632            "{scheme}://{registry}/v2/{repository}/tags/list{ns}",
1633            scheme = self.config.protocol.scheme_for(registry),
1634            repository = reference.repository(),
1635            ns = reference
1636                .namespace()
1637                .map(|ns| format!("?ns={ns}"))
1638                .unwrap_or_default(),
1639        )
1640    }
1641
1642    /// Convert a Reference to a v2 manifest URL.
1643    fn to_v2_referrers_url(
1644        &self,
1645        reference: &Reference,
1646        artifact_type: Option<&str>,
1647    ) -> Result<String> {
1648        let registry = reference.resolve_registry();
1649        Ok(format!(
1650            "{scheme}://{registry}/v2/{repository}/referrers/{reference}{at}",
1651            scheme = self.config.protocol.scheme_for(registry),
1652            repository = reference.repository(),
1653            reference = if let Some(digest) = reference.digest() {
1654                digest
1655            } else {
1656                return Err(OciDistributionError::GenericError(Some(
1657                    "Getting referrers for a tag is not supported".into(),
1658                )));
1659            },
1660            at = artifact_type
1661                .map(|at| format!("?artifactType={at}"))
1662                .unwrap_or_default(),
1663        ))
1664    }
1665}
1666
1667/// The OCI spec technically does not allow any codes but 200, 500, 401, and 404.
1668/// Obviously, HTTP servers are going to send other codes. This tries to catch the
1669/// obvious ones (200, 4XX, 5XX). Anything else is just treated as an error.
1670fn validate_registry_response(status: reqwest::StatusCode, body: &[u8], url: &str) -> Result<()> {
1671    match status {
1672        reqwest::StatusCode::OK => Ok(()),
1673        reqwest::StatusCode::UNAUTHORIZED => Err(OciDistributionError::UnauthorizedError {
1674            url: url.to_string(),
1675        }),
1676        s if s.is_success() => Err(OciDistributionError::SpecViolationError(format!(
1677            "Expected HTTP Status {}, got {} instead",
1678            reqwest::StatusCode::OK,
1679            status,
1680        ))),
1681        s if s.is_client_error() => {
1682            match serde_json::from_slice::<OciEnvelope>(body) {
1683                // According to the OCI spec, we should see an error in the message body.
1684                Ok(envelope) => Err(OciDistributionError::RegistryError {
1685                    envelope,
1686                    url: url.to_string(),
1687                }),
1688                // Fall back to a plain server error if the body isn't a valid `OciEnvelope`
1689                Err(_) => Err(OciDistributionError::ServerError {
1690                    code: s.as_u16(),
1691                    url: url.to_string(),
1692                    message: String::from_utf8_lossy(body).to_string(),
1693                }),
1694            }
1695        }
1696        s => {
1697            let text = std::str::from_utf8(body)?;
1698
1699            Err(OciDistributionError::ServerError {
1700                code: s.as_u16(),
1701                url: url.to_string(),
1702                message: text.to_string(),
1703            })
1704        }
1705    }
1706}
1707
1708/// Converts a response into a stream
1709fn stream_from_response(
1710    response: Response,
1711    layer: impl AsLayerDescriptor,
1712    verify: bool,
1713) -> Result<SizedStream> {
1714    let content_length = response.content_length();
1715    let headers = response.headers().clone();
1716    let stream = response
1717        .error_for_status()?
1718        .bytes_stream()
1719        .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e));
1720
1721    let expected_layer_digest = layer.as_layer_descriptor().digest.to_string();
1722    let layer_digester = Digester::new(&expected_layer_digest)?;
1723    let header_digester_and_digest = match digest_header_value(headers)? {
1724        // If the digests match, we don't need to do both digesters
1725        Some(digest) if digest == expected_layer_digest => None,
1726        Some(digest) => Some((Digester::new(&digest)?, digest)),
1727        None => None,
1728    };
1729    let header_digest = header_digester_and_digest
1730        .as_ref()
1731        .map(|(_, digest)| digest.to_owned());
1732    let stream: BoxStream<'static, std::result::Result<bytes::Bytes, std::io::Error>> = if verify {
1733        Box::pin(VerifyingStream::new(
1734            Box::pin(stream),
1735            layer_digester,
1736            expected_layer_digest,
1737            header_digester_and_digest,
1738        ))
1739    } else {
1740        Box::pin(stream)
1741    };
1742    Ok(SizedStream {
1743        content_length,
1744        digest_header_value: header_digest,
1745        stream,
1746    })
1747}
1748
/// The request builder wrapper allows to be instantiated from a
/// `Client` and allows composable operations on the request builder,
/// to produce a `RequestBuilder` object that can be executed.
///
/// Each composable operation returns a new wrapper around a cloned builder,
/// leaving the wrapper it was called on untouched.
struct RequestBuilderWrapper<'a> {
    /// The client this request belongs to; consulted for authentication tokens.
    client: &'a Client,
    /// The request under construction.
    request_builder: RequestBuilder,
}
1756
1757// RequestBuilderWrapper type management
1758impl<'a> RequestBuilderWrapper<'a> {
1759    /// Create a `RequestBuilderWrapper` from a `Client` instance, by
1760    /// instantiating the internal `RequestBuilder` with the provided
1761    /// function `f`.
1762    fn from_client(
1763        client: &'a Client,
1764        f: impl Fn(&reqwest::Client) -> RequestBuilder,
1765    ) -> RequestBuilderWrapper<'a> {
1766        let request_builder = f(&client.client);
1767        RequestBuilderWrapper {
1768            client,
1769            request_builder,
1770        }
1771    }
1772
1773    // Produces a final `RequestBuilder` out of this `RequestBuilderWrapper`
1774    fn into_request_builder(self) -> RequestBuilder {
1775        self.request_builder
1776    }
1777}
1778
1779// Composable functions applicable to a `RequestBuilderWrapper`
1780impl<'a> RequestBuilderWrapper<'a> {
1781    fn apply_accept(&self, accept: &[&str]) -> Result<RequestBuilderWrapper> {
1782        let request_builder = self
1783            .request_builder
1784            .try_clone()
1785            .ok_or_else(|| {
1786                OciDistributionError::GenericError(Some(
1787                    "could not clone request builder".to_string(),
1788                ))
1789            })?
1790            .header("Accept", Vec::from(accept).join(", "));
1791
1792        Ok(RequestBuilderWrapper {
1793            client: self.client,
1794            request_builder,
1795        })
1796    }
1797
1798    /// Updates request as necessary for authentication.
1799    ///
1800    /// If the struct has Some(bearer), this will insert the bearer token in an
1801    /// Authorization header. It will also set the Accept header, which must
1802    /// be set on all OCI Registry requests. If the struct has HTTP Basic Auth
1803    /// credentials, these will be configured.
1804    async fn apply_auth(
1805        &self,
1806        image: &Reference,
1807        op: RegistryOperation,
1808    ) -> Result<RequestBuilderWrapper> {
1809        let mut headers = HeaderMap::new();
1810
1811        if let Some(token) = self.client.get_auth_token(image, op).await {
1812            match token {
1813                RegistryTokenType::Bearer(token) => {
1814                    debug!("Using bearer token authentication.");
1815                    headers.insert("Authorization", token.bearer_token().parse().unwrap());
1816                }
1817                RegistryTokenType::Basic(username, password) => {
1818                    debug!("Using HTTP basic authentication.");
1819                    return Ok(RequestBuilderWrapper {
1820                        client: self.client,
1821                        request_builder: self
1822                            .request_builder
1823                            .try_clone()
1824                            .ok_or_else(|| {
1825                                OciDistributionError::GenericError(Some(
1826                                    "could not clone request builder".to_string(),
1827                                ))
1828                            })?
1829                            .headers(headers)
1830                            .basic_auth(username.to_string(), Some(password.to_string())),
1831                    });
1832                }
1833            }
1834        }
1835        Ok(RequestBuilderWrapper {
1836            client: self.client,
1837            request_builder: self
1838                .request_builder
1839                .try_clone()
1840                .ok_or_else(|| {
1841                    OciDistributionError::GenericError(Some(
1842                        "could not clone request builder".to_string(),
1843                    ))
1844                })?
1845                .headers(headers),
1846        })
1847    }
1848}
1849
/// The encoding of the certificate
#[derive(Debug, Clone)]
pub enum CertificateEncoding {
    /// The certificate bytes are DER encoded.
    Der,
    /// The certificate bytes are PEM encoded.
    Pem,
}
1858
/// An X.509 certificate, stored as raw bytes together with their encoding.
#[derive(Debug, Clone)]
pub struct Certificate {
    /// Which encoding is used by the certificate
    pub encoding: CertificateEncoding,

    /// Actual certificate bytes, to be interpreted according to `encoding`
    pub data: Vec<u8>,
}
1868
/// A client configuration
///
/// Every field has a default, see the [`Default`] implementation; use struct
/// update syntax (`..Default::default()`) to override only some of them.
pub struct ClientConfig {
    /// Which protocol the client should use. Defaults to
    /// [`ClientProtocol::Https`].
    pub protocol: ClientProtocol,

    /// Accept invalid hostnames. Defaults to false.
    ///
    /// Only available when the `native-tls` feature is enabled.
    #[cfg(feature = "native-tls")]
    pub accept_invalid_hostnames: bool,

    /// Accept invalid certificates. Defaults to false.
    pub accept_invalid_certificates: bool,

    /// Use monolithic push for pushing blobs. Defaults to false.
    pub use_monolithic_push: bool,

    /// A list of extra root certificates to trust. This can be used to connect
    /// to servers using self-signed certificates. Defaults to an empty list.
    pub extra_root_certificates: Vec<Certificate>,

    /// A function that defines the client's behaviour if an Image Index Manifest
    /// (i.e. Manifest List) is encountered when pulling an image.
    /// Defaults to [current_platform_resolver](self::current_platform_resolver),
    /// which attempts to choose an image matching the running OS and Arch.
    ///
    /// If set to None, an error is raised if an Image Index manifest is received
    /// during an image pull.
    pub platform_resolver: Option<Box<PlatformResolverFn>>,

    /// Maximum number of concurrent uploads to perform during a `push`
    /// operation.
    ///
    /// This defaults to [`DEFAULT_MAX_CONCURRENT_UPLOAD`].
    pub max_concurrent_upload: usize,

    /// Maximum number of concurrent downloads to perform during a `pull`
    /// operation.
    ///
    /// This defaults to [`DEFAULT_MAX_CONCURRENT_DOWNLOAD`].
    pub max_concurrent_download: usize,

    /// Default token expiration in seconds, to use when the token claim
    /// doesn't provide a value.
    ///
    /// This defaults to [`DEFAULT_TOKEN_EXPIRATION_SECS`].
    pub default_token_expiration_secs: usize,

    /// Enables a read timeout for the client.
    ///
    /// See [`reqwest::ClientBuilder::read_timeout`] for more information.
    /// This defaults to `None`.
    pub read_timeout: Option<Duration>,

    /// Set a timeout for the connect phase for the client.
    ///
    /// See [`reqwest::ClientBuilder::connect_timeout`] for more information.
    /// This defaults to `None`.
    pub connect_timeout: Option<Duration>,

    /// Set the `User-Agent` used by the client.
    ///
    /// This defaults to [`DEFAULT_USER_AGENT`].
    pub user_agent: &'static str,

    /// Set the `HTTPS PROXY` used by the client.
    ///
    /// This defaults to `None`.
    pub https_proxy: Option<String>,

    /// Set the `HTTP PROXY` used by the client.
    ///
    /// This defaults to `None`.
    pub http_proxy: Option<String>,

    /// Set the `NO PROXY` used by the client.
    ///
    /// This defaults to `None`.
    pub no_proxy: Option<String>,
}
1945
1946impl Default for ClientConfig {
1947    fn default() -> Self {
1948        Self {
1949            protocol: ClientProtocol::default(),
1950            #[cfg(feature = "native-tls")]
1951            accept_invalid_hostnames: false,
1952            accept_invalid_certificates: false,
1953            use_monolithic_push: false,
1954            extra_root_certificates: Vec::new(),
1955            platform_resolver: Some(Box::new(current_platform_resolver)),
1956            max_concurrent_upload: DEFAULT_MAX_CONCURRENT_UPLOAD,
1957            max_concurrent_download: DEFAULT_MAX_CONCURRENT_DOWNLOAD,
1958            default_token_expiration_secs: DEFAULT_TOKEN_EXPIRATION_SECS,
1959            read_timeout: None,
1960            connect_timeout: None,
1961            user_agent: DEFAULT_USER_AGENT,
1962            https_proxy: None,
1963            http_proxy: None,
1964            no_proxy: None,
1965        }
1966    }
1967}
1968
// Signature shared by the platform resolvers: given the entries of an image
// index, optionally pick one and return a `String` for it (the resolvers in
// this file return the chosen entry's digest).
//
// Be explicit about the traits supported by this type. This is needed to use
// the Client behind a dynamic reference.
// Something similar to what is described here: https://users.rust-lang.org/t/how-to-send-function-closure-to-another-thread/43549
type PlatformResolverFn = dyn Fn(&[ImageIndexEntry]) -> Option<String> + Send + Sync;
1973
1974/// A platform resolver that chooses the first linux/amd64 variant, if present
1975pub fn linux_amd64_resolver(manifests: &[ImageIndexEntry]) -> Option<String> {
1976    manifests
1977        .iter()
1978        .find(|entry| {
1979            entry.platform.as_ref().map_or(false, |platform| {
1980                platform.os == "linux" && platform.architecture == "amd64"
1981            })
1982        })
1983        .map(|entry| entry.digest.clone())
1984}
1985
1986/// A platform resolver that chooses the first windows/amd64 variant, if present
1987pub fn windows_amd64_resolver(manifests: &[ImageIndexEntry]) -> Option<String> {
1988    manifests
1989        .iter()
1990        .find(|entry| {
1991            entry.platform.as_ref().map_or(false, |platform| {
1992                platform.os == "windows" && platform.architecture == "amd64"
1993            })
1994        })
1995        .map(|entry| entry.digest.clone())
1996}
1997
const MACOS: &str = "macos";
const DARWIN: &str = "darwin";

/// Returns the Go-style (`GOOS`) name of the operating system this binary was
/// built for.
///
/// Rust and Go agree on most names; the one difference handled here is that
/// Rust says `macos` where Go says `darwin`.
/// - Rust: https://doc.rust-lang.org/std/env/consts/constant.OS.html
/// - Go: https://golang.org/doc/install/source#environment
fn go_os() -> &'static str {
    let rust_os = std::env::consts::OS;
    if rust_os == MACOS {
        DARWIN
    } else {
        rust_os
    }
}
2010
const X86_64: &str = "x86_64";
const AMD64: &str = "amd64";
const X86: &str = "x86";
const I386: &str = "386";
const ARM64: &str = "arm64";
const AARCH64: &str = "aarch64";
const POWERPC64: &str = "powerpc64";
const PPC64: &str = "ppc64";
const PPC64LE: &str = "ppc64le";

/// Returns the Go-style (`GOARCH`) name of the CPU architecture this binary
/// was built for, which is the vocabulary image indexes use for
/// `platform.architecture`.
///
/// Architectures whose Rust and Go names already agree are passed through
/// unchanged.
/// - Rust: https://doc.rust-lang.org/std/env/consts/constant.ARCH.html
/// - Go: https://golang.org/doc/install/source#environment
fn go_arch() -> &'static str {
    match std::env::consts::ARCH {
        X86_64 => AMD64,
        // Go calls 32-bit x86 `386`; there is no `amd` architecture.
        X86 => I386,
        AARCH64 => ARM64,
        // Rust reports `powerpc64` for both byte orders, Go distinguishes them.
        POWERPC64 if cfg!(target_endian = "big") => PPC64,
        POWERPC64 => PPC64LE,
        other => other,
    }
}
2032
2033/// A platform resolver that chooses the first variant matching the running OS/Arch, if present.
2034/// Doesn't currently handle platform.variants.
2035pub fn current_platform_resolver(manifests: &[ImageIndexEntry]) -> Option<String> {
2036    manifests
2037        .iter()
2038        .find(|entry| {
2039            entry.platform.as_ref().map_or(false, |platform| {
2040                platform.os == go_os() && platform.architecture == go_arch()
2041            })
2042        })
2043        .map(|entry| entry.digest.clone())
2044}
2045
/// The protocol that the client should use to connect
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub enum ClientProtocol {
    /// Use plain HTTP for every registry.
    Http,
    /// Use HTTPS for every registry. This is the default.
    #[default]
    Https,
    /// Use HTTPS for every registry except the listed ones, which are
    /// contacted over plain HTTP. Entries are compared for exact equality
    /// with the registry string.
    HttpsExcept(Vec<String>),
}

impl ClientProtocol {
    /// Returns the URL scheme (`"http"` or `"https"`) to use for `registry`.
    fn scheme_for(&self, registry: &str) -> &str {
        match self {
            ClientProtocol::Https => "https",
            ClientProtocol::Http => "http",
            ClientProtocol::HttpsExcept(exceptions) => {
                // Compare by reference; no need to allocate a `String` per lookup.
                let is_exception = exceptions
                    .iter()
                    .any(|exception| exception.as_str() == registry);
                if is_exception {
                    "http"
                } else {
                    "https"
                }
            }
        }
    }
}
2073
/// The parameters of a `Bearer` authentication challenge that this client
/// keeps, as extracted by the `TryFrom` implementations below.
#[derive(Clone, Debug)]
struct BearerChallenge {
    /// Value of the challenge's `realm` parameter; a challenge without it is
    /// rejected during conversion.
    pub realm: Box<str>,
    /// Value of the challenge's `service` parameter, when present.
    pub service: Option<String>,
}
2079
2080impl TryFrom<&HeaderValue> for BearerChallenge {
2081    type Error = String;
2082
2083    fn try_from(value: &HeaderValue) -> std::result::Result<Self, Self::Error> {
2084        let parser = ChallengeParser::new(
2085            value
2086                .to_str()
2087                .map_err(|e| format!("cannot convert header value to string: {:?}", e))?,
2088        );
2089        parser
2090            .filter_map(|parser_res| {
2091                if let Ok(chalenge_ref) = parser_res {
2092                    let bearer_challenge = BearerChallenge::try_from(&chalenge_ref);
2093                    bearer_challenge.ok()
2094                } else {
2095                    None
2096                }
2097            })
2098            .next()
2099            .ok_or_else(|| "Cannot find Bearer challenge".to_string())
2100    }
2101}
2102
2103impl TryFrom<&ChallengeRef<'_>> for BearerChallenge {
2104    type Error = String;
2105
2106    fn try_from(value: &ChallengeRef<'_>) -> std::result::Result<Self, Self::Error> {
2107        if !value.scheme.eq_ignore_ascii_case("Bearer") {
2108            return Err(format!(
2109                "BearerChallenge doesn't support challenge scheme {:?}",
2110                value.scheme
2111            ));
2112        }
2113        let mut realm = None;
2114        let mut service = None;
2115        for (k, v) in &value.params {
2116            if k.eq_ignore_ascii_case("realm") {
2117                realm = Some(v.to_unescaped());
2118            }
2119
2120            if k.eq_ignore_ascii_case("service") {
2121                service = Some(v.to_unescaped());
2122            }
2123        }
2124
2125        let realm = realm.ok_or("missing required parameter realm")?;
2126
2127        Ok(BearerChallenge {
2128            realm: realm.into_boxed_str(),
2129            service,
2130        })
2131    }
2132}
2133
2134#[cfg(test)]
2135mod test {
2136    use super::*;
2137    use std::convert::TryFrom;
2138    use std::fs;
2139    use std::path;
2140    use std::result::Result;
2141
2142    use rstest::rstest;
2143    use sha2::Digest as _;
2144    use tempfile::TempDir;
2145    use tokio::io::AsyncReadExt;
2146    use tokio_util::io::StreamReader;
2147
2148    use crate::manifest::{self, IMAGE_DOCKER_LAYER_GZIP_MEDIA_TYPE};
2149
2150    #[cfg(feature = "test-registry")]
2151    use testcontainers::{
2152        core::{Mount, WaitFor},
2153        runners::AsyncRunner,
2154        ContainerRequest, GenericImage, ImageExt,
2155    };
2156
2157    const HELLO_IMAGE_NO_TAG: &str = "webassembly.azurecr.io/hello-wasm";
2158    const HELLO_IMAGE_TAG: &str = "webassembly.azurecr.io/hello-wasm:v1";
2159    const HELLO_IMAGE_DIGEST: &str = "webassembly.azurecr.io/hello-wasm@sha256:51d9b231d5129e3ffc267c9d455c49d789bf3167b611a07ab6e4b3304c96b0e7";
2160    const HELLO_IMAGE_TAG_AND_DIGEST: &str = "webassembly.azurecr.io/hello-wasm:v1@sha256:51d9b231d5129e3ffc267c9d455c49d789bf3167b611a07ab6e4b3304c96b0e7";
2161    const TEST_IMAGES: &[&str] = &[
2162        // TODO(jlegrone): this image cannot be pulled currently because no `latest`
2163        //                 tag exists on the image repository. Re-enable this image
2164        //                 in tests once `latest` is published.
2165        // HELLO_IMAGE_NO_TAG,
2166        HELLO_IMAGE_TAG,
2167        HELLO_IMAGE_DIGEST,
2168        HELLO_IMAGE_TAG_AND_DIGEST,
2169    ];
2170    const GHCR_IO_IMAGE: &str = "ghcr.io/krustlet/oci-distribution/hello-wasm:v1";
2171    const DOCKER_IO_IMAGE: &str = "docker.io/library/hello-world@sha256:37a0b92b08d4919615c3ee023f7ddb068d12b8387475d64c622ac30f45c29c51";
2172    const HTPASSWD: &str = "testuser:$2y$05$8/q2bfRcX74EuxGf0qOcSuhWDQJXrgWiy6Fi73/JM2tKC66qSrLve";
2173    const HTPASSWD_USERNAME: &str = "testuser";
2174    const HTPASSWD_PASSWORD: &str = "testpassword";
2175
2176    #[test]
2177    fn test_apply_accept() -> anyhow::Result<()> {
2178        assert_eq!(
2179            RequestBuilderWrapper::from_client(&Client::default(), |client| client
2180                .get("https://example.com/some/module.wasm"))
2181            .apply_accept(&["*/*"])?
2182            .into_request_builder()
2183            .build()?
2184            .headers()["Accept"],
2185            "*/*"
2186        );
2187
2188        assert_eq!(
2189            RequestBuilderWrapper::from_client(&Client::default(), |client| client
2190                .get("https://example.com/some/module.wasm"))
2191            .apply_accept(MIME_TYPES_DISTRIBUTION_MANIFEST)?
2192            .into_request_builder()
2193            .build()?
2194            .headers()["Accept"],
2195            MIME_TYPES_DISTRIBUTION_MANIFEST.join(", ")
2196        );
2197
2198        Ok(())
2199    }
2200
2201    #[tokio::test]
2202    async fn test_apply_auth_no_token() -> anyhow::Result<()> {
2203        assert!(
2204            !RequestBuilderWrapper::from_client(&Client::default(), |client| client
2205                .get("https://example.com/some/module.wasm"))
2206            .apply_auth(
2207                &Reference::try_from(HELLO_IMAGE_TAG)?,
2208                RegistryOperation::Pull
2209            )
2210            .await?
2211            .into_request_builder()
2212            .build()?
2213            .headers()
2214            .contains_key("Authorization")
2215        );
2216
2217        Ok(())
2218    }
2219
2220    #[tokio::test]
2221    async fn test_apply_auth_bearer_token() -> anyhow::Result<()> {
2222        use hmac::{Hmac, Mac};
2223        use jwt::SignWithKey;
2224        use sha2::Sha256;
2225        let client = Client::default();
2226        let header = jwt::header::Header {
2227            algorithm: jwt::algorithm::AlgorithmType::Hs256,
2228            key_id: None,
2229            type_: None,
2230            content_type: None,
2231        };
2232        let claims: jwt::claims::Claims = Default::default();
2233        let key: Hmac<Sha256> = Hmac::new_from_slice(b"some-secret").unwrap();
2234        let token = jwt::Token::new(header, claims)
2235            .sign_with_key(&key)?
2236            .as_str()
2237            .to_string();
2238
2239        // we have to have it in the stored auth so we'll get to the token cache check.
2240        client
2241            .store_auth(
2242                Reference::try_from(HELLO_IMAGE_TAG)?.resolve_registry(),
2243                RegistryAuth::Anonymous,
2244            )
2245            .await;
2246
2247        client
2248            .tokens
2249            .insert(
2250                &Reference::try_from(HELLO_IMAGE_TAG)?,
2251                RegistryOperation::Pull,
2252                RegistryTokenType::Bearer(RegistryToken::Token {
2253                    token: token.clone(),
2254                }),
2255            )
2256            .await;
2257        assert_eq!(
2258            RequestBuilderWrapper::from_client(&client, |client| client
2259                .get("https://example.com/some/module.wasm"))
2260            .apply_auth(
2261                &Reference::try_from(HELLO_IMAGE_TAG)?,
2262                RegistryOperation::Pull
2263            )
2264            .await?
2265            .into_request_builder()
2266            .build()?
2267            .headers()["Authorization"],
2268            format!("Bearer {}", &token)
2269        );
2270
2271        Ok(())
2272    }
2273
2274    #[test]
2275    fn test_to_v2_blob_url() {
2276        let mut image = Reference::try_from(HELLO_IMAGE_TAG).expect("failed to parse reference");
2277        let c = Client::default();
2278
2279        assert_eq!(
2280            c.to_v2_blob_url(&image, "sha256:deadbeef"),
2281            "https://webassembly.azurecr.io/v2/hello-wasm/blobs/sha256:deadbeef"
2282        );
2283
2284        image.set_mirror_registry("docker.mirror.io".to_owned());
2285        assert_eq!(
2286            c.to_v2_blob_url(&image, "sha256:deadbeef"),
2287            "https://docker.mirror.io/v2/hello-wasm/blobs/sha256:deadbeef?ns=webassembly.azurecr.io"
2288        );
2289    }
2290
2291    #[rstest(image, expected_uri, expected_mirror_uri,
2292        case(HELLO_IMAGE_NO_TAG, "https://webassembly.azurecr.io/v2/hello-wasm/manifests/latest", "https://docker.mirror.io/v2/hello-wasm/manifests/latest?ns=webassembly.azurecr.io"), // TODO: confirm this is the right translation when no tag
2293        case(HELLO_IMAGE_TAG, "https://webassembly.azurecr.io/v2/hello-wasm/manifests/v1", "https://docker.mirror.io/v2/hello-wasm/manifests/v1?ns=webassembly.azurecr.io"),
2294        case(HELLO_IMAGE_DIGEST, "https://webassembly.azurecr.io/v2/hello-wasm/manifests/sha256:51d9b231d5129e3ffc267c9d455c49d789bf3167b611a07ab6e4b3304c96b0e7", "https://docker.mirror.io/v2/hello-wasm/manifests/sha256:51d9b231d5129e3ffc267c9d455c49d789bf3167b611a07ab6e4b3304c96b0e7?ns=webassembly.azurecr.io"),
2295        case(HELLO_IMAGE_TAG_AND_DIGEST, "https://webassembly.azurecr.io/v2/hello-wasm/manifests/sha256:51d9b231d5129e3ffc267c9d455c49d789bf3167b611a07ab6e4b3304c96b0e7", "https://docker.mirror.io/v2/hello-wasm/manifests/sha256:51d9b231d5129e3ffc267c9d455c49d789bf3167b611a07ab6e4b3304c96b0e7?ns=webassembly.azurecr.io"),
2296    )]
2297    fn test_to_v2_manifest(image: &str, expected_uri: &str, expected_mirror_uri: &str) {
2298        let mut reference = Reference::try_from(image).expect("failed to parse reference");
2299        let c = Client::default();
2300        assert_eq!(c.to_v2_manifest_url(&reference), expected_uri);
2301
2302        reference.set_mirror_registry("docker.mirror.io".to_owned());
2303        assert_eq!(c.to_v2_manifest_url(&reference), expected_mirror_uri);
2304    }
2305
2306    #[test]
2307    fn test_to_v2_blob_upload_url() {
2308        let image = Reference::try_from(HELLO_IMAGE_TAG).expect("failed to parse reference");
2309        let blob_url = Client::default().to_v2_blob_upload_url(&image);
2310
2311        assert_eq!(
2312            blob_url,
2313            "https://webassembly.azurecr.io/v2/hello-wasm/blobs/uploads/"
2314        )
2315    }
2316
2317    #[test]
2318    fn test_to_list_tags_url() {
2319        let mut image = Reference::try_from(HELLO_IMAGE_TAG).expect("failed to parse reference");
2320        let c = Client::default();
2321
2322        assert_eq!(
2323            c.to_list_tags_url(&image),
2324            "https://webassembly.azurecr.io/v2/hello-wasm/tags/list"
2325        );
2326
2327        image.set_mirror_registry("docker.mirror.io".to_owned());
2328        assert_eq!(
2329            c.to_list_tags_url(&image),
2330            "https://docker.mirror.io/v2/hello-wasm/tags/list?ns=webassembly.azurecr.io"
2331        );
2332    }
2333
2334    #[test]
2335    fn manifest_url_generation_respects_http_protocol() {
2336        let c = Client::new(ClientConfig {
2337            protocol: ClientProtocol::Http,
2338            ..Default::default()
2339        });
2340        let reference = Reference::try_from("webassembly.azurecr.io/hello:v1".to_owned())
2341            .expect("Could not parse reference");
2342        assert_eq!(
2343            "http://webassembly.azurecr.io/v2/hello/manifests/v1",
2344            c.to_v2_manifest_url(&reference)
2345        );
2346    }
2347
2348    #[test]
2349    fn blob_url_generation_respects_http_protocol() {
2350        let c = Client::new(ClientConfig {
2351            protocol: ClientProtocol::Http,
2352            ..Default::default()
2353        });
2354        let reference = Reference::try_from("webassembly.azurecr.io/hello@sha256:ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff".to_owned())
2355            .expect("Could not parse reference");
2356        assert_eq!(
2357            "http://webassembly.azurecr.io/v2/hello/blobs/sha256:ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff",
2358            c.to_v2_blob_url(&reference, reference.digest().unwrap())
2359        );
2360    }
2361
2362    #[test]
2363    fn manifest_url_generation_uses_https_if_not_on_exception_list() {
2364        let insecure_registries = vec!["localhost".to_owned(), "oci.registry.local".to_owned()];
2365        let protocol = ClientProtocol::HttpsExcept(insecure_registries);
2366        let c = Client::new(ClientConfig {
2367            protocol,
2368            ..Default::default()
2369        });
2370        let reference = Reference::try_from("webassembly.azurecr.io/hello:v1".to_owned())
2371            .expect("Could not parse reference");
2372        assert_eq!(
2373            "https://webassembly.azurecr.io/v2/hello/manifests/v1",
2374            c.to_v2_manifest_url(&reference)
2375        );
2376    }
2377
2378    #[test]
2379    fn manifest_url_generation_uses_http_if_on_exception_list() {
2380        let insecure_registries = vec!["localhost".to_owned(), "oci.registry.local".to_owned()];
2381        let protocol = ClientProtocol::HttpsExcept(insecure_registries);
2382        let c = Client::new(ClientConfig {
2383            protocol,
2384            ..Default::default()
2385        });
2386        let reference = Reference::try_from("oci.registry.local/hello:v1".to_owned())
2387            .expect("Could not parse reference");
2388        assert_eq!(
2389            "http://oci.registry.local/v2/hello/manifests/v1",
2390            c.to_v2_manifest_url(&reference)
2391        );
2392    }
2393
2394    #[test]
2395    fn blob_url_generation_uses_https_if_not_on_exception_list() {
2396        let insecure_registries = vec!["localhost".to_owned(), "oci.registry.local".to_owned()];
2397        let protocol = ClientProtocol::HttpsExcept(insecure_registries);
2398        let c = Client::new(ClientConfig {
2399            protocol,
2400            ..Default::default()
2401        });
2402        let reference = Reference::try_from("webassembly.azurecr.io/hello@sha256:ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff".to_owned())
2403            .expect("Could not parse reference");
2404        assert_eq!(
2405            "https://webassembly.azurecr.io/v2/hello/blobs/sha256:ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff",
2406            c.to_v2_blob_url(&reference, reference.digest().unwrap())
2407        );
2408    }
2409
2410    #[test]
2411    fn blob_url_generation_uses_http_if_on_exception_list() {
2412        let insecure_registries = vec!["localhost".to_owned(), "oci.registry.local".to_owned()];
2413        let protocol = ClientProtocol::HttpsExcept(insecure_registries);
2414        let c = Client::new(ClientConfig {
2415            protocol,
2416            ..Default::default()
2417        });
2418        let reference = Reference::try_from("oci.registry.local/hello@sha256:ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff".to_owned())
2419            .expect("Could not parse reference");
2420        assert_eq!(
2421            "http://oci.registry.local/v2/hello/blobs/sha256:ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff",
2422            c.to_v2_blob_url(&reference, reference.digest().unwrap())
2423        );
2424    }
2425
2426    #[test]
2427    fn can_generate_valid_digest() {
2428        let bytes = b"hellobytes";
2429        let hash = sha256_digest(bytes);
2430
2431        let combination = vec![b"hello".to_vec(), b"bytes".to_vec()];
2432        let combination_hash =
2433            sha256_digest(&combination.into_iter().flatten().collect::<Vec<u8>>());
2434
2435        assert_eq!(
2436            hash,
2437            "sha256:fdbd95aafcbc814a2600fcc54c1e1706f52d2f9bf45cf53254f25bcd7599ce99"
2438        );
2439        assert_eq!(
2440            combination_hash,
2441            "sha256:fdbd95aafcbc814a2600fcc54c1e1706f52d2f9bf45cf53254f25bcd7599ce99"
2442        );
2443    }
2444
2445    #[test]
2446    fn test_registry_token_deserialize() {
2447        // 'token' field, standalone
2448        let text = r#"{"token": "abc"}"#;
2449        let res: Result<RegistryToken, serde_json::Error> = serde_json::from_str(text);
2450        assert!(res.is_ok());
2451        let rt = res.unwrap();
2452        assert_eq!(rt.token(), "abc");
2453
2454        // 'access_token' field, standalone
2455        let text = r#"{"access_token": "xyz"}"#;
2456        let res: Result<RegistryToken, serde_json::Error> = serde_json::from_str(text);
2457        assert!(res.is_ok());
2458        let rt = res.unwrap();
2459        assert_eq!(rt.token(), "xyz");
2460
2461        // both 'token' and 'access_token' fields, 'token' field takes precedence
2462        let text = r#"{"access_token": "xyz", "token": "abc"}"#;
2463        let res: Result<RegistryToken, serde_json::Error> = serde_json::from_str(text);
2464        assert!(res.is_ok());
2465        let rt = res.unwrap();
2466        assert_eq!(rt.token(), "abc");
2467
2468        // both 'token' and 'access_token' fields, 'token' field takes precedence (reverse order)
2469        let text = r#"{"token": "abc", "access_token": "xyz"}"#;
2470        let res: Result<RegistryToken, serde_json::Error> = serde_json::from_str(text);
2471        assert!(res.is_ok());
2472        let rt = res.unwrap();
2473        assert_eq!(rt.token(), "abc");
2474
2475        // non-string fields do not break parsing
2476        let text = r#"{"aaa": 300, "access_token": "xyz", "token": "abc", "zzz": 600}"#;
2477        let res: Result<RegistryToken, serde_json::Error> = serde_json::from_str(text);
2478        assert!(res.is_ok());
2479
2480        // Note: tokens should always be strings. The next two tests ensure that if one field
2481        // is invalid (integer), then parse can still succeed if the other field is a string.
2482        //
2483        // numeric 'access_token' field, but string 'token' field does not in parse error
2484        let text = r#"{"access_token": 300, "token": "abc"}"#;
2485        let res: Result<RegistryToken, serde_json::Error> = serde_json::from_str(text);
2486        assert!(res.is_ok());
2487        let rt = res.unwrap();
2488        assert_eq!(rt.token(), "abc");
2489
2490        // numeric 'token' field, but string 'accesss_token' field does not in parse error
2491        let text = r#"{"access_token": "xyz", "token": 300}"#;
2492        let res: Result<RegistryToken, serde_json::Error> = serde_json::from_str(text);
2493        assert!(res.is_ok());
2494        let rt = res.unwrap();
2495        assert_eq!(rt.token(), "xyz");
2496
2497        // numeric 'token' field results in parse error
2498        let text = r#"{"token": 300}"#;
2499        let res: Result<RegistryToken, serde_json::Error> = serde_json::from_str(text);
2500        assert!(res.is_err());
2501
2502        // numeric 'access_token' field results in parse error
2503        let text = r#"{"access_token": 300}"#;
2504        let res: Result<RegistryToken, serde_json::Error> = serde_json::from_str(text);
2505        assert!(res.is_err());
2506
2507        // object 'token' field results in parse error
2508        let text = r#"{"token": {"some": "thing"}}"#;
2509        let res: Result<RegistryToken, serde_json::Error> = serde_json::from_str(text);
2510        assert!(res.is_err());
2511
2512        // object 'access_token' field results in parse error
2513        let text = r#"{"access_token": {"some": "thing"}}"#;
2514        let res: Result<RegistryToken, serde_json::Error> = serde_json::from_str(text);
2515        assert!(res.is_err());
2516
2517        // missing fields results in parse error
2518        let text = r#"{"some": "thing"}"#;
2519        let res: Result<RegistryToken, serde_json::Error> = serde_json::from_str(text);
2520        assert!(res.is_err());
2521
2522        // bad JSON results in parse error
2523        let text = r#"{"token": "abc""#;
2524        let res: Result<RegistryToken, serde_json::Error> = serde_json::from_str(text);
2525        assert!(res.is_err());
2526
2527        // worse JSON results in parse error
2528        let text = r#"_ _ _ kjbwef??98{9898 }} }}"#;
2529        let res: Result<RegistryToken, serde_json::Error> = serde_json::from_str(text);
2530        assert!(res.is_err());
2531    }
2532
2533    fn check_auth_token(token: &str) {
2534        // We test that the token is longer than a minimal hash.
2535        assert!(token.len() > 64);
2536    }
2537
2538    #[tokio::test]
2539    async fn test_auth() {
2540        for &image in TEST_IMAGES {
2541            let reference = Reference::try_from(image).expect("failed to parse reference");
2542            let c = Client::default();
2543            let token = c
2544                .auth(
2545                    &reference,
2546                    &RegistryAuth::Anonymous,
2547                    RegistryOperation::Pull,
2548                )
2549                .await
2550                .expect("result from auth request");
2551
2552            assert!(token.is_some());
2553            check_auth_token(token.unwrap().as_ref());
2554
2555            let tok = c
2556                .tokens
2557                .get(&reference, RegistryOperation::Pull)
2558                .await
2559                .expect("token is available");
2560            // We test that the token is longer than a minimal hash.
2561            if let RegistryTokenType::Bearer(tok) = tok {
2562                check_auth_token(tok.token());
2563            } else {
2564                panic!("Unexpeted Basic Auth Token");
2565            }
2566        }
2567    }
2568
2569    #[cfg(feature = "test-registry")]
2570    #[tokio::test]
2571    async fn test_list_tags() {
2572        let test_container = registry_image_edge()
2573            .start()
2574            .await
2575            .expect("Failed to start registry container");
2576        let port = test_container
2577            .get_host_port_ipv4(5000)
2578            .await
2579            .expect("Failed to get port");
2580        let auth =
2581            RegistryAuth::Basic(HTPASSWD_USERNAME.to_string(), HTPASSWD_PASSWORD.to_string());
2582
2583        let client = Client::new(ClientConfig {
2584            protocol: ClientProtocol::HttpsExcept(vec![format!("localhost:{}", port)]),
2585            ..Default::default()
2586        });
2587
2588        let image: Reference = HELLO_IMAGE_TAG_AND_DIGEST.parse().unwrap();
2589        client
2590            .auth(&image, &RegistryAuth::Anonymous, RegistryOperation::Pull)
2591            .await
2592            .expect("cannot authenticate against registry for pull operation");
2593
2594        let (manifest, _digest) = client
2595            ._pull_image_manifest(&image)
2596            .await
2597            .expect("failed to pull manifest");
2598
2599        let image_data = client
2600            .pull(&image, &auth, vec![manifest::WASM_LAYER_MEDIA_TYPE])
2601            .await
2602            .expect("failed to pull image");
2603
2604        for i in 0..=3 {
2605            let push_image: Reference = format!("localhost:{}/hello-wasm:1.0.{}", port, i)
2606                .parse()
2607                .unwrap();
2608            client
2609                .auth(&push_image, &auth, RegistryOperation::Push)
2610                .await
2611                .expect("authenticated");
2612            client
2613                .push(
2614                    &push_image,
2615                    &image_data.layers,
2616                    image_data.config.clone(),
2617                    &auth,
2618                    Some(manifest.clone()),
2619                )
2620                .await
2621                .expect("Failed to push Image");
2622        }
2623
2624        let image: Reference = format!("localhost:{}/hello-wasm:1.0.1", port)
2625            .parse()
2626            .unwrap();
2627        let response = client
2628            .list_tags(&image, &RegistryAuth::Anonymous, Some(2), Some("1.0.1"))
2629            .await
2630            .expect("Cannot list Tags");
2631        assert_eq!(response.tags, vec!["1.0.2", "1.0.3"])
2632    }
2633
2634    #[tokio::test]
2635    async fn test_pull_manifest_private() {
2636        for &image in TEST_IMAGES {
2637            let reference = Reference::try_from(image).expect("failed to parse reference");
2638            // Currently, pull_manifest does not perform Authz, so this will fail.
2639            let c = Client::default();
2640            c._pull_image_manifest(&reference)
2641                .await
2642                .expect_err("pull manifest should fail");
2643
2644            // But this should pass
2645            let c = Client::default();
2646            c.auth(
2647                &reference,
2648                &RegistryAuth::Anonymous,
2649                RegistryOperation::Pull,
2650            )
2651            .await
2652            .expect("authenticated");
2653            let (manifest, _) = c
2654                ._pull_image_manifest(&reference)
2655                .await
2656                .expect("pull manifest should not fail");
2657
2658            // The test on the manifest checks all fields. This is just a brief sanity check.
2659            assert_eq!(manifest.schema_version, 2);
2660            assert!(!manifest.layers.is_empty());
2661        }
2662    }
2663
2664    #[tokio::test]
2665    async fn test_pull_manifest_public() {
2666        for &image in TEST_IMAGES {
2667            let reference = Reference::try_from(image).expect("failed to parse reference");
2668            let c = Client::default();
2669            let (manifest, _) = c
2670                .pull_image_manifest(&reference, &RegistryAuth::Anonymous)
2671                .await
2672                .expect("pull manifest should not fail");
2673
2674            // The test on the manifest checks all fields. This is just a brief sanity check.
2675            assert_eq!(manifest.schema_version, 2);
2676            assert!(!manifest.layers.is_empty());
2677        }
2678    }
2679
2680    #[tokio::test]
2681    async fn pull_manifest_and_config_public() {
2682        for &image in TEST_IMAGES {
2683            let reference = Reference::try_from(image).expect("failed to parse reference");
2684            let c = Client::default();
2685            let (manifest, _, config) = c
2686                .pull_manifest_and_config(&reference, &RegistryAuth::Anonymous)
2687                .await
2688                .expect("pull manifest and config should not fail");
2689
2690            // The test on the manifest checks all fields. This is just a brief sanity check.
2691            assert_eq!(manifest.schema_version, 2);
2692            assert!(!manifest.layers.is_empty());
2693            assert!(!config.is_empty());
2694        }
2695    }
2696
2697    #[tokio::test]
2698    async fn test_fetch_digest() {
2699        let c = Client::default();
2700
2701        for &image in TEST_IMAGES {
2702            let reference = Reference::try_from(image).expect("failed to parse reference");
2703            c.fetch_manifest_digest(&reference, &RegistryAuth::Anonymous)
2704                .await
2705                .expect("pull manifest should not fail");
2706
2707            // This should pass
2708            let reference = Reference::try_from(image).expect("failed to parse reference");
2709            let c = Client::default();
2710            c.auth(
2711                &reference,
2712                &RegistryAuth::Anonymous,
2713                RegistryOperation::Pull,
2714            )
2715            .await
2716            .expect("authenticated");
2717            let digest = c
2718                .fetch_manifest_digest(&reference, &RegistryAuth::Anonymous)
2719                .await
2720                .expect("pull manifest should not fail");
2721
2722            assert_eq!(
2723                digest,
2724                "sha256:51d9b231d5129e3ffc267c9d455c49d789bf3167b611a07ab6e4b3304c96b0e7"
2725            );
2726        }
2727    }
2728
2729    #[tokio::test]
2730    async fn test_pull_blob() {
2731        let c = Client::default();
2732
2733        for &image in TEST_IMAGES {
2734            let reference = Reference::try_from(image).expect("failed to parse reference");
2735            c.auth(
2736                &reference,
2737                &RegistryAuth::Anonymous,
2738                RegistryOperation::Pull,
2739            )
2740            .await
2741            .expect("authenticated");
2742            let (manifest, _) = c
2743                ._pull_image_manifest(&reference)
2744                .await
2745                .expect("failed to pull manifest");
2746
2747            // Pull one specific layer
2748            let mut file: Vec<u8> = Vec::new();
2749            let layer0 = &manifest.layers[0];
2750
2751            // This call likes to flake, so we try it at least 5 times
2752            let mut last_error = None;
2753            for i in 1..6 {
2754                if let Err(e) = c.pull_blob(&reference, layer0, &mut file).await {
2755                    println!(
2756                        "Got error on pull_blob call attempt {}. Will retry in 1s: {:?}",
2757                        i, e
2758                    );
2759                    last_error.replace(e);
2760                    tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
2761                } else {
2762                    last_error = None;
2763                    break;
2764                }
2765            }
2766
2767            if let Some(e) = last_error {
2768                panic!("Unable to pull layer: {:?}", e);
2769            }
2770
2771            // The manifest says how many bytes we should expect.
2772            assert_eq!(file.len(), layer0.size as usize);
2773        }
2774    }
2775
2776    #[tokio::test]
2777    async fn test_pull_blob_stream() {
2778        let c = Client::default();
2779
2780        for &image in TEST_IMAGES {
2781            let reference = Reference::try_from(image).expect("failed to parse reference");
2782            c.auth(
2783                &reference,
2784                &RegistryAuth::Anonymous,
2785                RegistryOperation::Pull,
2786            )
2787            .await
2788            .expect("authenticated");
2789            let (manifest, _) = c
2790                ._pull_image_manifest(&reference)
2791                .await
2792                .expect("failed to pull manifest");
2793
2794            // Pull one specific layer
2795            let mut file: Vec<u8> = Vec::new();
2796            let layer0 = &manifest.layers[0];
2797
2798            let layer_stream = c
2799                .pull_blob_stream(&reference, layer0)
2800                .await
2801                .expect("failed to pull blob stream");
2802
2803            assert_eq!(layer_stream.content_length, Some(layer0.size as u64));
2804            AsyncReadExt::read_to_end(&mut StreamReader::new(layer_stream.stream), &mut file)
2805                .await
2806                .unwrap();
2807
2808            // The manifest says how many bytes we should expect.
2809            assert_eq!(file.len(), layer0.size as usize);
2810        }
2811    }
2812
2813    #[tokio::test]
2814    async fn test_pull_blob_stream_partial() {
2815        let c = Client::default();
2816
2817        for &image in TEST_IMAGES {
2818            let reference = Reference::try_from(image).expect("failed to parse reference");
2819            c.auth(
2820                &reference,
2821                &RegistryAuth::Anonymous,
2822                RegistryOperation::Pull,
2823            )
2824            .await
2825            .expect("authenticated");
2826            let (manifest, _) = c
2827                ._pull_image_manifest(&reference)
2828                .await
2829                .expect("failed to pull manifest");
2830
2831            // Pull part of one specific layer
2832            let mut partial_file: Vec<u8> = Vec::new();
2833            let layer0 = &manifest.layers[0];
2834            let (offset, length) = (10, 6);
2835
2836            let partial_response = c
2837                .pull_blob_stream_partial(&reference, layer0, offset, Some(length))
2838                .await
2839                .expect("failed to pull blob stream");
2840            let full_response = c
2841                .pull_blob_stream_partial(&reference, layer0, 0, Some(layer0.size as u64))
2842                .await
2843                .expect("failed to pull blob stream");
2844
2845            let layer_stream_partial = match partial_response {
2846                BlobResponse::Full(_stream) => panic!("expected partial response"),
2847                BlobResponse::Partial(stream) => stream,
2848            };
2849            assert_eq!(layer_stream_partial.content_length, Some(length));
2850            AsyncReadExt::read_to_end(
2851                &mut StreamReader::new(layer_stream_partial.stream),
2852                &mut partial_file,
2853            )
2854            .await
2855            .unwrap();
2856
2857            // Also pull the full layer into a separate file to compare with the partial.
2858            let mut full_file: Vec<u8> = Vec::new();
2859            let layer_stream_full = match full_response {
2860                BlobResponse::Full(_stream) => panic!("expected partial response"),
2861                BlobResponse::Partial(stream) => stream,
2862            };
2863            assert_eq!(layer_stream_full.content_length, Some(layer0.size as u64));
2864            AsyncReadExt::read_to_end(
2865                &mut StreamReader::new(layer_stream_full.stream),
2866                &mut full_file,
2867            )
2868            .await
2869            .unwrap();
2870
2871            // The partial read length says how many bytes we should expect.
2872            assert_eq!(partial_file.len(), length as usize);
2873            // The manifest says how many bytes we should expect on a full read.
2874            assert_eq!(full_file.len(), layer0.size as usize);
2875            // Check that the partial read retrieved the correct bytes.
2876            let end: usize = (offset + length) as usize;
2877            assert_eq!(partial_file, full_file[offset as usize..end]);
2878        }
2879    }
2880
2881    #[tokio::test]
2882    async fn test_pull() {
2883        for &image in TEST_IMAGES {
2884            let reference = Reference::try_from(image).expect("failed to parse reference");
2885
2886            // This call likes to flake, so we try it at least 5 times
2887            let mut last_error = None;
2888            let mut image_data = None;
2889            for i in 1..6 {
2890                match Client::default()
2891                    .pull(
2892                        &reference,
2893                        &RegistryAuth::Anonymous,
2894                        vec![manifest::WASM_LAYER_MEDIA_TYPE],
2895                    )
2896                    .await
2897                {
2898                    Ok(data) => {
2899                        image_data = Some(data);
2900                        last_error = None;
2901                        break;
2902                    }
2903                    Err(e) => {
2904                        println!(
2905                            "Got error on pull call attempt {}. Will retry in 1s: {:?}",
2906                            i, e
2907                        );
2908                        last_error.replace(e);
2909                        tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
2910                    }
2911                }
2912            }
2913
2914            if let Some(e) = last_error {
2915                panic!("Unable to pull layer: {:?}", e);
2916            }
2917
2918            assert!(image_data.is_some());
2919            let image_data = image_data.unwrap();
2920            assert!(!image_data.layers.is_empty());
2921            assert!(image_data.digest.is_some());
2922        }
2923    }
2924
2925    /// Attempting to pull an image without any layer validation should fail.
2926    #[tokio::test]
2927    async fn test_pull_without_layer_validation() {
2928        for &image in TEST_IMAGES {
2929            let reference = Reference::try_from(image).expect("failed to parse reference");
2930            assert!(Client::default()
2931                .pull(&reference, &RegistryAuth::Anonymous, vec![],)
2932                .await
2933                .is_err());
2934        }
2935    }
2936
2937    /// Attempting to pull an image with the wrong list of layer validations should fail.
2938    #[tokio::test]
2939    async fn test_pull_wrong_layer_validation() {
2940        for &image in TEST_IMAGES {
2941            let reference = Reference::try_from(image).expect("failed to parse reference");
2942            assert!(Client::default()
2943                .pull(&reference, &RegistryAuth::Anonymous, vec!["text/plain"],)
2944                .await
2945                .is_err());
2946        }
2947    }
2948
2949    // This is the latest build of distribution/distribution from the `main` branch
2950    // Until distribution v3 is relased, this is the only way to have this fix
2951    // https://github.com/distribution/distribution/pull/3143
2952    //
2953    // We require this fix only when testing the capability to list tags
2954    #[cfg(feature = "test-registry")]
2955    fn registry_image_edge() -> GenericImage {
2956        GenericImage::new("distribution/distribution", "edge")
2957            .with_wait_for(WaitFor::message_on_stderr("listening on "))
2958    }
2959
2960    #[cfg(feature = "test-registry")]
2961    fn registry_image() -> GenericImage {
2962        GenericImage::new("docker.io/library/registry", "2")
2963            .with_wait_for(WaitFor::message_on_stderr("listening on "))
2964    }
2965
2966    #[cfg(feature = "test-registry")]
2967    fn registry_image_basic_auth(auth_path: &str) -> ContainerRequest<GenericImage> {
2968        GenericImage::new("docker.io/library/registry", "2")
2969            .with_wait_for(WaitFor::message_on_stderr("listening on "))
2970            .with_env_var("REGISTRY_AUTH", "htpasswd")
2971            .with_env_var("REGISTRY_AUTH_HTPASSWD_REALM", "Registry Realm")
2972            .with_env_var("REGISTRY_AUTH_HTPASSWD_PATH", "/auth/htpasswd")
2973            .with_mount(Mount::bind_mount(auth_path, "/auth"))
2974    }
2975
2976    #[tokio::test]
2977    #[cfg(feature = "test-registry")]
2978    async fn can_push_chunk() {
2979        let test_container = registry_image()
2980            .start()
2981            .await
2982            .expect("Failed to start registry container");
2983        let port = test_container
2984            .get_host_port_ipv4(5000)
2985            .await
2986            .expect("Failed to get port");
2987
2988        let c = Client::new(ClientConfig {
2989            protocol: ClientProtocol::Http,
2990            ..Default::default()
2991        });
2992        let url = format!("localhost:{}/hello-wasm:v1", port);
2993        let image: Reference = url.parse().unwrap();
2994
2995        c.auth(&image, &RegistryAuth::Anonymous, RegistryOperation::Push)
2996            .await
2997            .expect("result from auth request");
2998
2999        let location = c
3000            .begin_push_chunked_session(&image)
3001            .await
3002            .expect("failed to begin push session");
3003
3004        let image_data: Vec<Vec<u8>> = vec![b"iamawebassemblymodule".to_vec()];
3005
3006        let (next_location, next_byte) = c
3007            .push_chunk(&location, &image, &image_data[0], 0)
3008            .await
3009            .expect("failed to push layer");
3010
3011        // Location should include original URL with at session ID appended
3012        assert!(next_location.len() >= url.len() + "6987887f-0196-45ee-91a1-2dfad901bea0".len());
3013        assert_eq!(
3014            next_byte,
3015            "iamawebassemblymodule".to_string().into_bytes().len()
3016        );
3017
3018        let layer_location = c
3019            .end_push_chunked_session(&next_location, &image, &sha256_digest(&image_data[0]))
3020            .await
3021            .expect("failed to end push session");
3022
3023        assert_eq!(layer_location, format!("http://localhost:{}/v2/hello-wasm/blobs/sha256:6165c4ad43c0803798b6f2e49d6348c915d52c999a5f890846cee77ea65d230b", port));
3024    }
3025
3026    #[tokio::test]
3027    #[cfg(feature = "test-registry")]
3028    async fn can_push_multiple_chunks() {
3029        let test_container = registry_image()
3030            .start()
3031            .await
3032            .expect("Failed to start registry container");
3033        let port = test_container
3034            .get_host_port_ipv4(5000)
3035            .await
3036            .expect("Failed to get port");
3037
3038        let mut c = Client::new(ClientConfig {
3039            protocol: ClientProtocol::Http,
3040            ..Default::default()
3041        });
3042        // set a super small chunk size - done to force multiple pushes
3043        c.push_chunk_size = 3;
3044        let url = format!("localhost:{}/hello-wasm:v1", port);
3045        let image: Reference = url.parse().unwrap();
3046
3047        c.auth(&image, &RegistryAuth::Anonymous, RegistryOperation::Push)
3048            .await
3049            .expect("result from auth request");
3050
3051        let image_data: Vec<u8> =
3052            b"i am a big webassembly mode that needs chunked uploads".to_vec();
3053        let image_digest = sha256_digest(&image_data);
3054
3055        let location = c
3056            .push_blob_chunked(&image, &image_data, &image_digest)
3057            .await
3058            .expect("failed to begin push session");
3059
3060        assert_eq!(
3061            location,
3062            format!(
3063                "http://localhost:{}/v2/hello-wasm/blobs/{}",
3064                port, image_digest
3065            )
3066        );
3067    }
3068
3069    #[tokio::test]
3070    #[cfg(feature = "test-registry")]
3071    async fn test_image_roundtrip_anon_auth() {
3072        let test_container = registry_image()
3073            .start()
3074            .await
3075            .expect("Failed to start registry container");
3076
3077        test_image_roundtrip(&RegistryAuth::Anonymous, &test_container).await;
3078    }
3079
3080    #[tokio::test]
3081    #[cfg(feature = "test-registry")]
3082    async fn test_image_roundtrip_basic_auth() {
3083        let auth_dir = TempDir::new().expect("cannot create tmp directory");
3084        let htpasswd_path = path::Path::join(auth_dir.path(), "htpasswd");
3085        fs::write(htpasswd_path, HTPASSWD).expect("cannot write htpasswd file");
3086
3087        let image = registry_image_basic_auth(
3088            auth_dir
3089                .path()
3090                .to_str()
3091                .expect("cannot convert htpasswd_path to string"),
3092        );
3093        let test_container = image.start().await.expect("cannot registry container");
3094
3095        let auth =
3096            RegistryAuth::Basic(HTPASSWD_USERNAME.to_string(), HTPASSWD_PASSWORD.to_string());
3097
3098        test_image_roundtrip(&auth, &test_container).await;
3099    }
3100
/// Round-trips the image named by `HELLO_IMAGE_TAG_AND_DIGEST` through the test registry.
///
/// The image is pulled anonymously from its origin, pushed to
/// `localhost:<port>/hello-wasm:v1` on `test_container` using `registry_auth`, and pulled
/// back again. The layer bytes and the media type, schema version and config digest of the
/// manifest must survive unchanged.
///
/// `registry_auth` is only ever presented to the local test registry.
///
/// # Panics
///
/// Panics (and thereby fails the calling test) as soon as any registry operation fails or
/// any of the comparisons does not hold.
#[cfg(feature = "test-registry")]
async fn test_image_roundtrip(
    registry_auth: &RegistryAuth,
    test_container: &testcontainers::ContainerAsync<GenericImage>,
) {
    // The result is deliberately ignored: initialisation fails when a global subscriber
    // has already been installed, which is harmless here.
    let _ = tracing_subscriber::fmt::try_init();
    let port = test_container
        .get_host_port_ipv4(5000)
        .await
        .expect("Failed to get port");

    // The local test registry is listed as the only exception to HTTPS.
    let c = Client::new(ClientConfig {
        protocol: ClientProtocol::HttpsExcept(vec![format!("localhost:{}", port)]),
        ..Default::default()
    });

    // pulling webassembly.azurecr.io/hello-wasm:v1
    let image: Reference = HELLO_IMAGE_TAG_AND_DIGEST.parse().unwrap();
    c.auth(&image, &RegistryAuth::Anonymous, RegistryOperation::Pull)
        .await
        .expect("cannot authenticate against registry for pull operation");

    let (manifest, _digest) = c
        ._pull_image_manifest(&image)
        .await
        .expect("failed to pull manifest");

    // The source image lives on the remote registry and was authenticated anonymously
    // above; the credentials in `registry_auth` belong to the local test registry and
    // must not be handed to a third party.
    let image_data = c
        .pull(
            &image,
            &RegistryAuth::Anonymous,
            vec![manifest::WASM_LAYER_MEDIA_TYPE],
        )
        .await
        .expect("failed to pull image");

    let push_image: Reference = format!("localhost:{}/hello-wasm:v1", port).parse().unwrap();
    c.auth(&push_image, registry_auth, RegistryOperation::Push)
        .await
        .expect("cannot authenticate against registry for push operation");

    c.push(
        &push_image,
        &image_data.layers,
        image_data.config.clone(),
        registry_auth,
        Some(manifest.clone()),
    )
    .await
    .expect("failed to push image");

    let pulled_image_data = c
        .pull(
            &push_image,
            registry_auth,
            vec![manifest::WASM_LAYER_MEDIA_TYPE],
        )
        .await
        .expect("failed to pull pushed image");

    let (pulled_manifest, _digest) = c
        ._pull_image_manifest(&push_image)
        .await
        .expect("failed to pull pushed image manifest");

    // `assert_eq!` reports the actual layer count on failure; the counts are checked
    // before indexing so a wrong count does not surface as an out-of-bounds panic.
    assert_eq!(image_data.layers.len(), 1);
    assert_eq!(pulled_image_data.layers.len(), 1);
    assert_eq!(
        image_data.layers[0].data.len(),
        pulled_image_data.layers[0].data.len()
    );
    assert_eq!(image_data.layers[0].data, pulled_image_data.layers[0].data);

    assert_eq!(manifest.media_type, pulled_manifest.media_type);
    assert_eq!(manifest.schema_version, pulled_manifest.schema_version);
    assert_eq!(manifest.config.digest, pulled_manifest.config.digest);
}
3174
3175    #[tokio::test]
3176    async fn test_raw_manifest_digest() {
3177        let _ = tracing_subscriber::fmt::try_init();
3178
3179        let c = Client::default();
3180
3181        // pulling webassembly.azurecr.io/hello-wasm:v1@sha256:51d9b231d5129e3ffc267c9d455c49d789bf3167b611a07ab6e4b3304c96b0e7
3182        let image: Reference = HELLO_IMAGE_TAG_AND_DIGEST.parse().unwrap();
3183        c.auth(&image, &RegistryAuth::Anonymous, RegistryOperation::Pull)
3184            .await
3185            .expect("cannot authenticate against registry for pull operation");
3186
3187        let (manifest, _) = c
3188            .pull_manifest_raw(
3189                &image,
3190                &RegistryAuth::Anonymous,
3191                MIME_TYPES_DISTRIBUTION_MANIFEST,
3192            )
3193            .await
3194            .expect("failed to pull manifest");
3195
3196        // Compute the digest of the returned manifest text.
3197        let digest = sha2::Sha256::digest(manifest);
3198        let hex = format!("sha256:{:x}", digest);
3199
3200        // Validate that the computed digest and the digest in the pulled reference match.
3201        assert_eq!(image.digest().unwrap(), hex);
3202    }
3203
/// Pushes a blob into one repository of the test registry, mounts it into a second
/// repository and checks that it can be pulled from there with identical content.
#[tokio::test]
#[cfg(feature = "test-registry")]
async fn test_mount() {
    // Start the registry container; the binding keeps it alive for the whole test.
    let registry = registry_image()
        .start()
        .await
        .expect("Failed to start registry");
    let port = registry
        .get_host_port_ipv4(5000)
        .await
        .expect("Failed to get port");

    let client = Client::new(ClientConfig {
        protocol: ClientProtocol::HttpsExcept(vec![format!("localhost:{}", port)]),
        ..Default::default()
    });

    // Source side: a dummy layer that is pushed to `layer-repository`.
    let layer_data = vec![1u8, 2, 3, 4];
    let layer = OciDescriptor {
        digest: sha256_digest(&layer_data),
        ..Default::default()
    };
    let source_repository: Reference = format!("localhost:{}/layer-repository", port)
        .parse()
        .unwrap();
    client
        .push_blob(&source_repository, layer_data.as_slice(), &layer.digest)
        .await
        .expect("Failed to push");

    // Target side: mount the very same blob at `image-repository`.
    let target_repository: Reference = format!("localhost:{}/image-repository", port)
        .parse()
        .unwrap();
    client
        .mount_blob(&target_repository, &source_repository, &layer.digest)
        .await
        .expect("Failed to mount");

    // Read the blob back through the target repository.
    let mut pulled = Vec::new();
    client
        .pull_blob(&target_repository, &layer, &mut pulled)
        .await
        .expect("Failed to pull");

    assert_eq!(layer_data, pulled);
}
3251
3252    #[tokio::test]
3253    async fn test_platform_resolution() {
3254        // test that we get an error when we pull a manifest list
3255        let reference = Reference::try_from(DOCKER_IO_IMAGE).expect("failed to parse reference");
3256        let mut c = Client::new(ClientConfig {
3257            platform_resolver: None,
3258            ..Default::default()
3259        });
3260        let err = c
3261            .pull_image_manifest(&reference, &RegistryAuth::Anonymous)
3262            .await
3263            .unwrap_err();
3264        assert_eq!(
3265            format!("{}", err),
3266            "Received Image Index/Manifest List, but platform_resolver was not defined on the client config. Consider setting platform_resolver"
3267        );
3268
3269        c = Client::new(ClientConfig {
3270            platform_resolver: Some(Box::new(linux_amd64_resolver)),
3271            ..Default::default()
3272        });
3273        let (_manifest, digest) = c
3274            .pull_image_manifest(&reference, &RegistryAuth::Anonymous)
3275            .await
3276            .expect("Couldn't pull manifest");
3277        assert_eq!(
3278            digest,
3279            "sha256:f54a58bc1aac5ea1a25d796ae155dc228b3f0e11d046ae276b39c4bf2f13d8c4"
3280        );
3281    }
3282
3283    #[tokio::test]
3284    async fn test_pull_ghcr_io() {
3285        let reference = Reference::try_from(GHCR_IO_IMAGE).expect("failed to parse reference");
3286        let c = Client::default();
3287        let (manifest, _manifest_str) = c
3288            .pull_image_manifest(&reference, &RegistryAuth::Anonymous)
3289            .await
3290            .unwrap();
3291        assert_eq!(manifest.config.media_type, manifest::WASM_CONFIG_MEDIA_TYPE);
3292    }
3293
3294    #[tokio::test]
3295    #[ignore]
3296    async fn test_roundtrip_multiple_layers() {
3297        let _ = tracing_subscriber::fmt::try_init();
3298        let c = Client::new(ClientConfig {
3299            protocol: ClientProtocol::HttpsExcept(vec!["oci.registry.local".to_string()]),
3300            ..Default::default()
3301        });
3302        let src_image = Reference::try_from("registry:2.7.1").expect("failed to parse reference");
3303        let dest_image = Reference::try_from("oci.registry.local/registry:roundtrip-test")
3304            .expect("failed to parse reference");
3305
3306        let image = c
3307            .pull(
3308                &src_image,
3309                &RegistryAuth::Anonymous,
3310                vec![IMAGE_DOCKER_LAYER_GZIP_MEDIA_TYPE],
3311            )
3312            .await
3313            .expect("Failed to pull manifest");
3314        assert!(image.layers.len() > 1);
3315
3316        let ImageData {
3317            layers,
3318            config,
3319            manifest,
3320            ..
3321        } = image;
3322        c.push(
3323            &dest_image,
3324            &layers,
3325            config,
3326            &RegistryAuth::Anonymous,
3327            manifest,
3328        )
3329        .await
3330        .expect("Failed to pull manifest");
3331
3332        c.pull_image_manifest(&dest_image, &RegistryAuth::Anonymous)
3333            .await
3334            .expect("Failed to pull manifest");
3335    }
3336
3337    #[tokio::test]
3338    async fn test_hashable_image_layer() {
3339        use itertools::Itertools;
3340
3341        // First two should be identical; others differ
3342        let image_layers = Vec::from([
3343            ImageLayer {
3344                data: Vec::from([0, 1, 2, 3]),
3345                media_type: "media_type".to_owned(),
3346                annotations: Some(BTreeMap::from([
3347                    ("0".to_owned(), "1".to_owned()),
3348                    ("2".to_owned(), "3".to_owned()),
3349                ])),
3350            },
3351            ImageLayer {
3352                data: Vec::from([0, 1, 2, 3]),
3353                media_type: "media_type".to_owned(),
3354                annotations: Some(BTreeMap::from([
3355                    ("2".to_owned(), "3".to_owned()),
3356                    ("0".to_owned(), "1".to_owned()),
3357                ])),
3358            },
3359            ImageLayer {
3360                data: Vec::from([0, 1, 2, 3]),
3361                media_type: "different_media_type".to_owned(),
3362                annotations: Some(BTreeMap::from([
3363                    ("0".to_owned(), "1".to_owned()),
3364                    ("2".to_owned(), "3".to_owned()),
3365                ])),
3366            },
3367            ImageLayer {
3368                data: Vec::from([0, 1, 2]),
3369                media_type: "media_type".to_owned(),
3370                annotations: Some(BTreeMap::from([
3371                    ("0".to_owned(), "1".to_owned()),
3372                    ("2".to_owned(), "3".to_owned()),
3373                ])),
3374            },
3375            ImageLayer {
3376                data: Vec::from([0, 1, 2, 3]),
3377                media_type: "media_type".to_owned(),
3378                annotations: Some(BTreeMap::from([
3379                    ("1".to_owned(), "0".to_owned()),
3380                    ("2".to_owned(), "3".to_owned()),
3381                ])),
3382            },
3383        ]);
3384
3385        assert_eq!(
3386            &image_layers[0], &image_layers[1],
3387            "image_layers[0] should equal image_layers[1]"
3388        );
3389        assert_ne!(
3390            &image_layers[0], &image_layers[2],
3391            "image_layers[0] should not equal image_layers[2]"
3392        );
3393        assert_ne!(
3394            &image_layers[0], &image_layers[3],
3395            "image_layers[0] should not equal image_layers[3]"
3396        );
3397        assert_ne!(
3398            &image_layers[0], &image_layers[4],
3399            "image_layers[0] should not equal image_layers[4]"
3400        );
3401        assert_ne!(
3402            &image_layers[2], &image_layers[3],
3403            "image_layers[2] should not equal image_layers[3]"
3404        );
3405        assert_ne!(
3406            &image_layers[2], &image_layers[4],
3407            "image_layers[2] should not equal image_layers[4]"
3408        );
3409        assert_ne!(
3410            &image_layers[3], &image_layers[4],
3411            "image_layers[3] should not equal image_layers[4]"
3412        );
3413
3414        let deduped: Vec<ImageLayer> = image_layers.clone().into_iter().unique().collect();
3415        assert_eq!(
3416            image_layers.len() - 1,
3417            deduped.len(),
3418            "after deduplication, there should be one less image layer"
3419        );
3420    }
3421}