1use std::collections::{BTreeMap, HashMap};
3use std::convert::TryFrom;
4use std::hash::Hash;
5use std::sync::Arc;
6use std::time::Duration;
7
8use futures_util::future;
9use futures_util::stream::{self, BoxStream, StreamExt, TryStreamExt};
10use http::header::RANGE;
11use http::{HeaderValue, StatusCode};
12use http_auth::{parser::ChallengeParser, ChallengeRef};
13use olpc_cjson::CanonicalFormatter;
14use reqwest::header::HeaderMap;
15use reqwest::{NoProxy, Proxy, RequestBuilder, Response, Url};
16use serde::Deserialize;
17use serde::Serialize;
18use tokio::io::{AsyncWrite, AsyncWriteExt};
19use tokio::sync::RwLock;
20use tracing::{debug, trace, warn};
21
22pub use crate::blob::*;
23use crate::config::ConfigFile;
24use crate::digest::{digest_header_value, validate_digest, Digest, Digester};
25use crate::errors::*;
26use crate::manifest::{
27 ImageIndexEntry, OciDescriptor, OciImageIndex, OciImageManifest, OciManifest, Versioned,
28 IMAGE_CONFIG_MEDIA_TYPE, IMAGE_LAYER_GZIP_MEDIA_TYPE, IMAGE_LAYER_MEDIA_TYPE,
29 IMAGE_MANIFEST_LIST_MEDIA_TYPE, IMAGE_MANIFEST_MEDIA_TYPE, OCI_IMAGE_INDEX_MEDIA_TYPE,
30 OCI_IMAGE_MEDIA_TYPE,
31};
32use crate::secrets::RegistryAuth;
33use crate::secrets::*;
34use crate::sha256_digest;
35use crate::token_cache::{RegistryOperation, RegistryToken, RegistryTokenType, TokenCache};
36use crate::Reference;
37
/// Manifest media types this client passes to `apply_accept` when it requests
/// manifests (and blobs) from a registry. The same four types are the ones
/// accepted by `Client::validate_image_manifest`.
const MIME_TYPES_DISTRIBUTION_MANIFEST: &[&str] = &[
    IMAGE_MANIFEST_MEDIA_TYPE,
    IMAGE_MANIFEST_LIST_MEDIA_TYPE,
    OCI_IMAGE_MEDIA_TYPE,
    OCI_IMAGE_INDEX_MEDIA_TYPE,
];

/// Chunk size, in bytes (4 MiB), that every `Client` constructor stores in
/// `Client::push_chunk_size`; it bounds the body of each chunked-push request.
const PUSH_CHUNK_MAX_SIZE: usize = 4096 * 1024;

/// Default upper bound for concurrent uploads.
// NOTE(review): presumably the default of `ClientConfig::max_concurrent_upload` — confirm.
pub const DEFAULT_MAX_CONCURRENT_UPLOAD: usize = 16;

/// Default upper bound for concurrent downloads.
// NOTE(review): presumably the default of `ClientConfig::max_concurrent_download` — confirm.
pub const DEFAULT_MAX_CONCURRENT_DOWNLOAD: usize = 16;

/// Value handed to `TokenCache::new` by `Client::default()`.
// NOTE(review): the unit is seconds according to the name — confirm against `TokenCache`.
pub const DEFAULT_TOKEN_EXPIRATION_SECS: usize = 60;

/// `<crate name>/<crate version>`, assembled at compile time from Cargo metadata.
static DEFAULT_USER_AGENT: &str = concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION"));
57
/// The contents of an image: its layers and configuration, plus the digest
/// and manifest when they are known.
#[derive(Clone)]
pub struct ImageData {
    /// The layers of the image.
    pub layers: Vec<ImageLayer>,
    /// The digest of the image, when known. `Client::pull` always fills it
    /// with the digest obtained together with the manifest.
    pub digest: Option<String>,
    /// The configuration blob of the image.
    pub config: Config,
    /// The manifest of the image, when known. `Client::pull` always fills it.
    pub manifest: Option<OciImageManifest>,
}
70
/// The locations reported by the registry after a successful `Client::push`.
pub struct PushResponse {
    /// Location returned by the registry for the pushed configuration blob.
    pub config_url: String,
    /// Location returned by the registry for the pushed manifest.
    pub manifest_url: String,
}
79
/// The JSON document a registry answers with when tags are listed
/// (see `Client::list_tags`).
#[derive(Deserialize, Debug)]
pub struct TagResponse {
    /// The `name` field of the response.
    // NOTE(review): presumably the repository name — confirm against the registry API.
    pub name: String,
    /// The tags contained in the response.
    pub tags: Vec<String>,
}
88
/// Borrowed view of the information needed to download a blob.
pub struct LayerDescriptor<'a> {
    /// Digest of the blob; used to build the blob URL and to verify the
    /// downloaded content.
    pub digest: &'a str,
    /// Optional alternative URLs. `Client::pull_blob_response` tries them, in
    /// order, when the request against the registry does not succeed.
    pub urls: &'a Option<Vec<String>>,
}
96
/// Conversion into a borrowed [`LayerDescriptor`], so that blob-pulling
/// functions can accept several ways of identifying a layer.
pub trait AsLayerDescriptor {
    /// Returns a [`LayerDescriptor`] borrowing from `self`.
    fn as_layer_descriptor(&self) -> LayerDescriptor<'_>;
}
102
103impl<T: AsLayerDescriptor> AsLayerDescriptor for &T {
104 fn as_layer_descriptor(&self) -> LayerDescriptor<'_> {
105 (*self).as_layer_descriptor()
106 }
107}
108
109impl AsLayerDescriptor for &str {
110 fn as_layer_descriptor(&self) -> LayerDescriptor<'_> {
111 LayerDescriptor {
112 digest: self,
113 urls: &None,
114 }
115 }
116}
117
118impl AsLayerDescriptor for &OciDescriptor {
119 fn as_layer_descriptor(&self) -> LayerDescriptor<'_> {
120 LayerDescriptor {
121 digest: &self.digest,
122 urls: &self.urls,
123 }
124 }
125}
126
127impl AsLayerDescriptor for &LayerDescriptor<'_> {
128 fn as_layer_descriptor(&self) -> LayerDescriptor<'_> {
129 LayerDescriptor {
130 digest: self.digest,
131 urls: self.urls,
132 }
133 }
134}
135
/// A single layer of an image: raw bytes plus the metadata describing them.
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub struct ImageLayer {
    /// The raw content of the layer.
    pub data: Vec<u8>,
    /// The media type of the layer.
    pub media_type: String,
    /// Optional annotations attached to the layer.
    pub annotations: Option<BTreeMap<String, String>>,
}
147
148impl ImageLayer {
149 pub fn new(
151 data: Vec<u8>,
152 media_type: String,
153 annotations: Option<BTreeMap<String, String>>,
154 ) -> Self {
155 ImageLayer {
156 data,
157 media_type,
158 annotations,
159 }
160 }
161
162 pub fn oci_v1(data: Vec<u8>, annotations: Option<BTreeMap<String, String>>) -> Self {
165 Self::new(data, IMAGE_LAYER_MEDIA_TYPE.to_string(), annotations)
166 }
167 pub fn oci_v1_gzip(data: Vec<u8>, annotations: Option<BTreeMap<String, String>>) -> Self {
170 Self::new(data, IMAGE_LAYER_GZIP_MEDIA_TYPE.to_string(), annotations)
171 }
172
173 pub fn sha256_digest(&self) -> String {
175 sha256_digest(&self.data)
176 }
177}
178
/// The configuration blob of an image, together with its metadata.
#[derive(Clone)]
pub struct Config {
    /// The raw content of the configuration.
    pub data: Vec<u8>,
    /// The media type of the configuration.
    pub media_type: String,
    /// Optional annotations attached to the configuration.
    pub annotations: Option<BTreeMap<String, String>>,
}
190
191impl Config {
192 pub fn new(
194 data: Vec<u8>,
195 media_type: String,
196 annotations: Option<BTreeMap<String, String>>,
197 ) -> Self {
198 Config {
199 data,
200 media_type,
201 annotations,
202 }
203 }
204
205 pub fn oci_v1(data: Vec<u8>, annotations: Option<BTreeMap<String, String>>) -> Self {
208 Self::new(data, IMAGE_CONFIG_MEDIA_TYPE.to_string(), annotations)
209 }
210
211 pub fn oci_v1_from_config_file(
214 config_file: ConfigFile,
215 annotations: Option<BTreeMap<String, String>>,
216 ) -> Result<Self> {
217 let data = serde_json::to_vec(&config_file)?;
218 Ok(Self::new(
219 data,
220 IMAGE_CONFIG_MEDIA_TYPE.to_string(),
221 annotations,
222 ))
223 }
224
225 pub fn sha256_digest(&self) -> String {
227 sha256_digest(&self.data)
228 }
229}
230
231impl TryFrom<Config> for ConfigFile {
232 type Error = crate::errors::OciDistributionError;
233
234 fn try_from(config: Config) -> Result<Self> {
235 let config = String::from_utf8(config.data)
236 .map_err(|e| OciDistributionError::ConfigConversionError(e.to_string()))?;
237 let config_file: ConfigFile = serde_json::from_str(&config)
238 .map_err(|e| OciDistributionError::ConfigConversionError(e.to_string()))?;
239 Ok(config_file)
240 }
241}
242
/// Client for talking to an OCI registry.
///
/// Cloning is cheap for the shared parts: the configuration and the stored
/// credentials live behind `Arc`s.
#[derive(Clone)]
pub struct Client {
    /// The configuration this client was built from.
    config: Arc<ClientConfig>,
    /// Credentials remembered per registry (key: registry name).
    auth_store: Arc<RwLock<HashMap<String, RegistryAuth>>>,
    /// Cache of the tokens obtained from registries.
    tokens: TokenCache,
    /// The underlying HTTP client.
    client: reqwest::Client,
    /// Maximum number of bytes sent by a single chunked-push request.
    push_chunk_size: usize,
}
266
267impl Default for Client {
268 fn default() -> Self {
269 Self {
270 config: Arc::default(),
271 auth_store: Arc::default(),
272 tokens: TokenCache::new(DEFAULT_TOKEN_EXPIRATION_SECS),
273 client: reqwest::Client::default(),
274 push_chunk_size: PUSH_CHUNK_MAX_SIZE,
275 }
276 }
277}
278
/// Implemented by types able to supply a [`ClientConfig`]; see
/// `Client::from_source`.
pub trait ClientConfigSource {
    /// Returns the configuration to build a client from.
    fn client_config(&self) -> ClientConfig;
}
286
287impl TryFrom<ClientConfig> for Client {
288 type Error = OciDistributionError;
289
290 fn try_from(config: ClientConfig) -> std::result::Result<Self, Self::Error> {
291 #[allow(unused_mut)]
292 let mut client_builder = reqwest::Client::builder();
293 #[cfg(not(target_arch = "wasm32"))]
294 let mut client_builder =
295 client_builder.danger_accept_invalid_certs(config.accept_invalid_certificates);
296
297 client_builder = match () {
298 #[cfg(all(feature = "native-tls", not(target_arch = "wasm32")))]
299 () => client_builder.danger_accept_invalid_hostnames(config.accept_invalid_hostnames),
300 #[cfg(any(not(feature = "native-tls"), target_arch = "wasm32"))]
301 () => client_builder,
302 };
303
304 #[cfg(not(target_arch = "wasm32"))]
305 for c in &config.extra_root_certificates {
306 let cert = match c.encoding {
307 CertificateEncoding::Der => reqwest::Certificate::from_der(c.data.as_slice())?,
308 CertificateEncoding::Pem => reqwest::Certificate::from_pem(c.data.as_slice())?,
309 };
310 client_builder = client_builder.add_root_certificate(cert);
311 }
312
313 if let Some(timeout) = config.read_timeout {
314 client_builder = client_builder.read_timeout(timeout);
315 }
316 if let Some(timeout) = config.connect_timeout {
317 client_builder = client_builder.connect_timeout(timeout);
318 }
319
320 client_builder = client_builder.user_agent(config.user_agent);
321
322 if let Some(proxy_addr) = &config.https_proxy {
323 let no_proxy = config
324 .no_proxy
325 .as_ref()
326 .and_then(|no_proxy| NoProxy::from_string(no_proxy));
327 let proxy = Proxy::https(proxy_addr)?.no_proxy(no_proxy);
328 client_builder = client_builder.proxy(proxy);
329 }
330
331 if let Some(proxy_addr) = &config.http_proxy {
332 let no_proxy = config
333 .no_proxy
334 .as_ref()
335 .and_then(|no_proxy| NoProxy::from_string(no_proxy));
336 let proxy = Proxy::http(proxy_addr)?.no_proxy(no_proxy);
337 client_builder = client_builder.proxy(proxy);
338 }
339
340 let default_token_expiration_secs = config.default_token_expiration_secs;
341 Ok(Self {
342 config: Arc::new(config),
343 tokens: TokenCache::new(default_token_expiration_secs),
344 client: client_builder.build()?,
345 push_chunk_size: PUSH_CHUNK_MAX_SIZE,
346 ..Default::default()
347 })
348 }
349}
350
351impl Client {
352 pub fn new(config: ClientConfig) -> Self {
354 let default_token_expiration_secs = config.default_token_expiration_secs;
355 Client::try_from(config).unwrap_or_else(|err| {
356 warn!("Cannot create OCI client from config: {:?}", err);
357 warn!("Creating client with default configuration");
358 Self {
359 tokens: TokenCache::new(default_token_expiration_secs),
360 push_chunk_size: PUSH_CHUNK_MAX_SIZE,
361 ..Default::default()
362 }
363 })
364 }
365
366 pub fn from_source(config_source: &impl ClientConfigSource) -> Self {
368 Self::new(config_source.client_config())
369 }
370
371 async fn store_auth(&self, registry: &str, auth: RegistryAuth) {
372 self.auth_store
373 .write()
374 .await
375 .insert(registry.to_string(), auth);
376 }
377
378 async fn is_stored_auth(&self, registry: &str) -> bool {
379 self.auth_store.read().await.contains_key(registry)
380 }
381
382 pub async fn store_auth_if_needed(&self, registry: &str, auth: &RegistryAuth) {
391 if !self.is_stored_auth(registry).await {
392 self.store_auth(registry, auth.clone()).await;
393 }
394 }
395
396 async fn get_auth_token(
398 &self,
399 reference: &Reference,
400 op: RegistryOperation,
401 ) -> Option<RegistryTokenType> {
402 let registry = reference.resolve_registry();
403 let auth = self.auth_store.read().await.get(registry)?.clone();
404 match self.tokens.get(reference, op).await {
405 Some(token) => Some(token),
406 None => {
407 let token = self._auth(reference, &auth, op).await.ok()??;
408 self.tokens.insert(reference, op, token.clone()).await;
409 Some(token)
410 }
411 }
412 }
413
414 pub async fn list_tags(
419 &self,
420 image: &Reference,
421 auth: &RegistryAuth,
422 n: Option<usize>,
423 last: Option<&str>,
424 ) -> Result<TagResponse> {
425 let op = RegistryOperation::Pull;
426 let url = self.to_list_tags_url(image);
427
428 self.store_auth_if_needed(image.resolve_registry(), auth)
429 .await;
430
431 let request = self.client.get(&url);
432 let request = if let Some(num) = n {
433 request.query(&[("n", num)])
434 } else {
435 request
436 };
437 let request = if let Some(l) = last {
438 request.query(&[("last", l)])
439 } else {
440 request
441 };
442 let request = RequestBuilderWrapper {
443 client: self,
444 request_builder: request,
445 };
446 let res = request
447 .apply_auth(image, op)
448 .await?
449 .into_request_builder()
450 .send()
451 .await?;
452 let status = res.status();
453 let body = res.bytes().await?;
454
455 validate_registry_response(status, &body, &url)?;
456
457 Ok(serde_json::from_str(std::str::from_utf8(&body)?)?)
458 }
459
460 pub async fn pull(
465 &self,
466 image: &Reference,
467 auth: &RegistryAuth,
468 accepted_media_types: Vec<&str>,
469 ) -> Result<ImageData> {
470 debug!("Pulling image: {:?}", image);
471 self.store_auth_if_needed(image.resolve_registry(), auth)
472 .await;
473
474 let (manifest, digest, config) = self._pull_manifest_and_config(image).await?;
475
476 self.validate_layers(&manifest, accepted_media_types)
477 .await?;
478
479 let layers = stream::iter(&manifest.layers)
480 .map(|layer| {
481 let this = &self;
485 async move {
486 let mut out: Vec<u8> = Vec::new();
487 debug!("Pulling image layer");
488 this.pull_blob(image, layer, &mut out).await?;
489 Ok::<_, OciDistributionError>(ImageLayer::new(
490 out,
491 layer.media_type.clone(),
492 layer.annotations.clone(),
493 ))
494 }
495 })
496 .boxed() .buffer_unordered(self.config.max_concurrent_download)
498 .try_collect()
499 .await?;
500
501 Ok(ImageData {
502 layers,
503 manifest: Some(manifest),
504 config,
505 digest: Some(digest),
506 })
507 }
508
509 pub async fn push(
519 &self,
520 image_ref: &Reference,
521 layers: &[ImageLayer],
522 config: Config,
523 auth: &RegistryAuth,
524 manifest: Option<OciImageManifest>,
525 ) -> Result<PushResponse> {
526 debug!("Pushing image: {:?}", image_ref);
527 self.store_auth_if_needed(image_ref.resolve_registry(), auth)
528 .await;
529
530 let manifest: OciImageManifest = match manifest {
531 Some(m) => m,
532 None => OciImageManifest::build(layers, &config, None),
533 };
534
535 stream::iter(layers)
537 .map(|layer| {
538 let this = &self;
542 async move {
543 let digest = layer.sha256_digest();
544 this.push_blob(image_ref, &layer.data, &digest).await?;
545 Result::Ok(())
546 }
547 })
548 .boxed() .buffer_unordered(self.config.max_concurrent_upload)
550 .try_for_each(future::ok)
551 .await?;
552
553 let config_url = self
554 .push_blob(image_ref, &config.data, &manifest.config.digest)
555 .await?;
556 let manifest_url = self.push_manifest(image_ref, &manifest.into()).await?;
557
558 Ok(PushResponse {
559 config_url,
560 manifest_url,
561 })
562 }
563
564 pub async fn push_blob(
566 &self,
567 image_ref: &Reference,
568 data: &[u8],
569 digest: &str,
570 ) -> Result<String> {
571 if self.config.use_monolithic_push {
572 return self.push_blob_monolithically(image_ref, data, digest).await;
573 }
574
575 match self.push_blob_chunked(image_ref, data, digest).await {
576 Ok(url) => Ok(url),
577 Err(OciDistributionError::SpecViolationError(violation)) => {
578 warn!(?violation, "Registry is not respecting the OCI Distribution Specification when doing chunked push operations");
579 warn!("Attempting monolithic push");
580 self.push_blob_monolithically(image_ref, data, digest).await
581 }
582 Err(e) => Err(e),
583 }
584 }
585
586 async fn push_blob_monolithically(
590 &self,
591 image: &Reference,
592 blob_data: &[u8],
593 blob_digest: &str,
594 ) -> Result<String> {
595 let location = self.begin_push_monolithical_session(image).await?;
596 self.push_monolithically(&location, image, blob_data, blob_digest)
597 .await
598 }
599
600 async fn push_blob_chunked(
604 &self,
605 image: &Reference,
606 blob_data: &[u8],
607 blob_digest: &str,
608 ) -> Result<String> {
609 let mut location = self.begin_push_chunked_session(image).await?;
610 let mut start: usize = 0;
611 loop {
612 (location, start) = self.push_chunk(&location, image, blob_data, start).await?;
613 if start >= blob_data.len() {
614 break;
615 }
616 }
617 self.end_push_chunked_session(&location, image, blob_digest)
618 .await
619 }
620
621 pub async fn auth(
626 &self,
627 image: &Reference,
628 authentication: &RegistryAuth,
629 operation: RegistryOperation,
630 ) -> Result<Option<String>> {
631 self.store_auth_if_needed(image.resolve_registry(), authentication)
632 .await;
633 match self._auth(image, authentication, operation).await {
635 Ok(Some(RegistryTokenType::Bearer(token))) => {
636 self.tokens
637 .insert(image, operation, RegistryTokenType::Bearer(token.clone()))
638 .await;
639 Ok(Some(token.token().to_string()))
640 }
641 Ok(Some(RegistryTokenType::Basic(username, password))) => {
642 self.tokens
643 .insert(
644 image,
645 operation,
646 RegistryTokenType::Basic(username, password),
647 )
648 .await;
649 Ok(None)
650 }
651 Ok(None) => Ok(None),
652 Err(e) => Err(e),
653 }
654 }
655
656 async fn _auth(
658 &self,
659 image: &Reference,
660 authentication: &RegistryAuth,
661 operation: RegistryOperation,
662 ) -> Result<Option<RegistryTokenType>> {
663 debug!("Authorizing for image: {:?}", image);
664 let url = format!(
666 "{}://{}/v2/",
667 self.config.protocol.scheme_for(image.resolve_registry()),
668 image.resolve_registry()
669 );
670 debug!(?url);
671
672 if let RegistryAuth::Bearer(token) = authentication {
673 return Ok(Some(RegistryTokenType::Bearer(RegistryToken::Token {
674 token: token.clone(),
675 })));
676 }
677
678 let res = self.client.get(&url).send().await?;
679 let dist_hdr = match res.headers().get(reqwest::header::WWW_AUTHENTICATE) {
680 Some(h) => h,
681 None => return Ok(None),
682 };
683
684 let challenge = match BearerChallenge::try_from(dist_hdr) {
685 Ok(c) => c,
686 Err(e) => {
687 debug!(error = ?e, "Falling back to HTTP Basic Auth");
688 if let RegistryAuth::Basic(username, password) = authentication {
689 return Ok(Some(RegistryTokenType::Basic(
690 username.to_string(),
691 password.to_string(),
692 )));
693 }
694 return Ok(None);
695 }
696 };
697
698 let scope = match operation {
700 RegistryOperation::Pull => format!("repository:{}:pull", image.repository()),
701 RegistryOperation::Push => format!("repository:{}:pull,push", image.repository()),
702 };
703
704 let realm = challenge.realm.as_ref();
705 let service = challenge.service.as_ref();
706 let mut query = vec![("scope", &scope)];
707
708 if let Some(s) = service {
709 query.push(("service", s))
710 }
711
712 debug!(?realm, ?service, ?scope, "Making authentication call");
715
716 let auth_res = self
717 .client
718 .get(realm)
719 .query(&query)
720 .apply_authentication(authentication)
721 .send()
722 .await?;
723
724 match auth_res.status() {
725 reqwest::StatusCode::OK => {
726 let text = auth_res.text().await?;
727 debug!("Received response from auth request: {}", text);
728 let token: RegistryToken = serde_json::from_str(&text)
729 .map_err(|e| OciDistributionError::RegistryTokenDecodeError(e.to_string()))?;
730 debug!("Successfully authorized for image '{:?}'", image);
731 Ok(Some(RegistryTokenType::Bearer(token)))
732 }
733 _ => {
734 let reason = auth_res.text().await?;
735 debug!("Failed to authenticate for image '{:?}': {}", image, reason);
736 Err(OciDistributionError::AuthenticationFailure(reason))
737 }
738 }
739 }
740
741 pub async fn fetch_manifest_digest(
750 &self,
751 image: &Reference,
752 auth: &RegistryAuth,
753 ) -> Result<String> {
754 self.store_auth_if_needed(image.resolve_registry(), auth)
755 .await;
756
757 let url = self.to_v2_manifest_url(image);
758 debug!("HEAD image manifest from {}", url);
759 let res = RequestBuilderWrapper::from_client(self, |client| client.head(&url))
760 .apply_accept(MIME_TYPES_DISTRIBUTION_MANIFEST)?
761 .apply_auth(image, RegistryOperation::Pull)
762 .await?
763 .into_request_builder()
764 .send()
765 .await?;
766
767 if let Some(digest) = digest_header_value(res.headers().clone())? {
768 let status = res.status();
769 let body = res.bytes().await?;
770 validate_registry_response(status, &body, &url)?;
771
772 if let Some(img_digest) = image.digest() {
775 let header_digest = Digest::new(&digest)?;
776 let image_digest = Digest::new(img_digest)?;
777 if header_digest.algorithm == image_digest.algorithm
778 && header_digest != image_digest
779 {
780 return Err(DigestError::VerificationError {
781 expected: img_digest.to_string(),
782 actual: digest,
783 }
784 .into());
785 }
786 }
787
788 Ok(digest)
789 } else {
790 debug!("GET image manifest from {}", url);
791 let res = RequestBuilderWrapper::from_client(self, |client| client.get(&url))
792 .apply_accept(MIME_TYPES_DISTRIBUTION_MANIFEST)?
793 .apply_auth(image, RegistryOperation::Pull)
794 .await?
795 .into_request_builder()
796 .send()
797 .await?;
798 let status = res.status();
799 trace!(headers = ?res.headers(), "Got Headers");
800 let headers = res.headers().clone();
801 let body = res.bytes().await?;
802 validate_registry_response(status, &body, &url)?;
803
804 validate_digest(&body, digest_header_value(headers)?, image.digest())
805 .map_err(OciDistributionError::from)
806 }
807 }
808
809 async fn validate_layers(
810 &self,
811 manifest: &OciImageManifest,
812 accepted_media_types: Vec<&str>,
813 ) -> Result<()> {
814 if manifest.layers.is_empty() {
815 return Err(OciDistributionError::PullNoLayersError);
816 }
817
818 for layer in &manifest.layers {
819 if !accepted_media_types.iter().any(|i| i.eq(&layer.media_type)) {
820 return Err(OciDistributionError::IncompatibleLayerMediaTypeError(
821 layer.media_type.clone(),
822 ));
823 }
824 }
825
826 Ok(())
827 }
828
829 pub async fn pull_image_manifest(
840 &self,
841 image: &Reference,
842 auth: &RegistryAuth,
843 ) -> Result<(OciImageManifest, String)> {
844 self.store_auth_if_needed(image.resolve_registry(), auth)
845 .await;
846
847 self._pull_image_manifest(image).await
848 }
849
850 pub async fn pull_manifest_raw(
858 &self,
859 image: &Reference,
860 auth: &RegistryAuth,
861 accepted_media_types: &[&str],
862 ) -> Result<(Vec<u8>, String)> {
863 self.store_auth_if_needed(image.resolve_registry(), auth)
864 .await;
865
866 self._pull_manifest_raw(image, accepted_media_types).await
867 }
868
869 pub async fn pull_manifest(
877 &self,
878 image: &Reference,
879 auth: &RegistryAuth,
880 ) -> Result<(OciManifest, String)> {
881 self.store_auth_if_needed(image.resolve_registry(), auth)
882 .await;
883
884 self._pull_manifest(image).await
885 }
886
887 async fn _pull_image_manifest(&self, image: &Reference) -> Result<(OciImageManifest, String)> {
895 let (manifest, digest) = self._pull_manifest(image).await?;
896 match manifest {
897 OciManifest::Image(image_manifest) => Ok((image_manifest, digest)),
898 OciManifest::ImageIndex(image_index_manifest) => {
899 debug!("Inspecting Image Index Manifest");
900 let digest = if let Some(resolver) = &self.config.platform_resolver {
901 resolver(&image_index_manifest.manifests)
902 } else {
903 return Err(OciDistributionError::ImageIndexParsingNoPlatformResolverError);
904 };
905
906 match digest {
907 Some(digest) => {
908 debug!("Selected manifest entry with digest: {}", digest);
909 let manifest_entry_reference = image.clone_with_digest(digest.clone());
910 self._pull_manifest(&manifest_entry_reference)
911 .await
912 .and_then(|(manifest, _digest)| match manifest {
913 OciManifest::Image(manifest) => Ok((manifest, digest)),
914 OciManifest::ImageIndex(_) => {
915 Err(OciDistributionError::ImageManifestNotFoundError(
916 "received Image Index manifest instead".to_string(),
917 ))
918 }
919 })
920 }
921 None => Err(OciDistributionError::ImageManifestNotFoundError(
922 "no entry found in image index manifest matching client's default platform"
923 .to_string(),
924 )),
925 }
926 }
927 }
928 }
929
930 async fn _pull_manifest_raw(
935 &self,
936 image: &Reference,
937 accepted_media_types: &[&str],
938 ) -> Result<(Vec<u8>, String)> {
939 let url = self.to_v2_manifest_url(image);
940 debug!("Pulling image manifest from {}", url);
941
942 let res = RequestBuilderWrapper::from_client(self, |client| client.get(&url))
943 .apply_accept(accepted_media_types)?
944 .apply_auth(image, RegistryOperation::Pull)
945 .await?
946 .into_request_builder()
947 .send()
948 .await?;
949 let status = res.status();
950 let headers = res.headers().clone();
951 let body = res.bytes().await?;
952
953 validate_registry_response(status, &body, &url)?;
954
955 let digest_header = digest_header_value(headers)?;
956 let digest = validate_digest(&body, digest_header, image.digest())?;
957
958 Ok((body.to_vec(), digest))
959 }
960
961 async fn _pull_manifest(&self, image: &Reference) -> Result<(OciManifest, String)> {
966 let (body, digest) = self
967 ._pull_manifest_raw(image, MIME_TYPES_DISTRIBUTION_MANIFEST)
968 .await?;
969
970 self.validate_image_manifest(&body).await?;
971
972 debug!("Parsing response as Manifest");
973 let manifest = serde_json::from_slice(&body)
974 .map_err(|e| OciDistributionError::ManifestParsingError(e.to_string()))?;
975 Ok((manifest, digest))
976 }
977
978 async fn validate_image_manifest(&self, body: &[u8]) -> Result<()> {
979 let versioned: Versioned = serde_json::from_slice(body)
980 .map_err(|e| OciDistributionError::VersionedParsingError(e.to_string()))?;
981 debug!(?versioned, "validating manifest");
982 if versioned.schema_version != 2 {
983 return Err(OciDistributionError::UnsupportedSchemaVersionError(
984 versioned.schema_version,
985 ));
986 }
987 if let Some(media_type) = versioned.media_type {
988 if media_type != IMAGE_MANIFEST_MEDIA_TYPE
989 && media_type != OCI_IMAGE_MEDIA_TYPE
990 && media_type != IMAGE_MANIFEST_LIST_MEDIA_TYPE
991 && media_type != OCI_IMAGE_INDEX_MEDIA_TYPE
992 {
993 return Err(OciDistributionError::UnsupportedMediaTypeError(media_type));
994 }
995 }
996
997 Ok(())
998 }
999
1000 pub async fn pull_manifest_and_config(
1009 &self,
1010 image: &Reference,
1011 auth: &RegistryAuth,
1012 ) -> Result<(OciImageManifest, String, String)> {
1013 self.store_auth_if_needed(image.resolve_registry(), auth)
1014 .await;
1015
1016 self._pull_manifest_and_config(image)
1017 .await
1018 .and_then(|(manifest, digest, config)| {
1019 Ok((
1020 manifest,
1021 digest,
1022 String::from_utf8(config.data).map_err(|e| {
1023 OciDistributionError::GenericError(Some(format!(
1024 "Cannot not UTF8 compliant: {}",
1025 e
1026 )))
1027 })?,
1028 ))
1029 })
1030 }
1031
1032 async fn _pull_manifest_and_config(
1033 &self,
1034 image: &Reference,
1035 ) -> Result<(OciImageManifest, String, Config)> {
1036 let (manifest, digest) = self._pull_image_manifest(image).await?;
1037
1038 let mut out: Vec<u8> = Vec::new();
1039 debug!("Pulling config layer");
1040 self.pull_blob(image, &manifest.config, &mut out).await?;
1041 let media_type = manifest.config.media_type.clone();
1042 let annotations = manifest.annotations.clone();
1043 Ok((manifest, digest, Config::new(out, media_type, annotations)))
1044 }
1045
1046 pub async fn push_manifest_list(
1050 &self,
1051 reference: &Reference,
1052 auth: &RegistryAuth,
1053 manifest: OciImageIndex,
1054 ) -> Result<String> {
1055 self.store_auth_if_needed(reference.resolve_registry(), auth)
1056 .await;
1057 self.push_manifest(reference, &OciManifest::ImageIndex(manifest))
1058 .await
1059 }
1060
1061 pub async fn pull_blob<T: AsyncWrite + Unpin>(
1069 &self,
1070 image: &Reference,
1071 layer: impl AsLayerDescriptor,
1072 mut out: T,
1073 ) -> Result<()> {
1074 let response = self.pull_blob_response(image, &layer, None, None).await?;
1075
1076 let mut maybe_header_digester = digest_header_value(response.headers().clone())?
1077 .map(|digest| Digester::new(&digest).map(|d| (d, digest)))
1078 .transpose()?;
1079
1080 let layer_digest = layer.as_layer_descriptor().digest.to_string();
1082 let mut layer_digester = Digester::new(&layer_digest)?;
1083
1084 let mut stream = response.error_for_status()?.bytes_stream();
1085
1086 while let Some(bytes) = stream.next().await {
1087 let bytes = bytes?;
1088 if let Some((ref mut digester, _)) = maybe_header_digester.as_mut() {
1089 digester.update(&bytes);
1090 }
1091 layer_digester.update(&bytes);
1092 out.write_all(&bytes).await?;
1093 }
1094
1095 if let Some((mut digester, expected)) = maybe_header_digester.take() {
1096 let digest = digester.finalize();
1097
1098 if digest != expected {
1099 return Err(DigestError::VerificationError {
1100 expected,
1101 actual: digest,
1102 }
1103 .into());
1104 }
1105 }
1106
1107 let digest = layer_digester.finalize();
1108 if digest != layer_digest {
1109 return Err(DigestError::VerificationError {
1110 expected: layer_digest,
1111 actual: digest,
1112 }
1113 .into());
1114 }
1115
1116 Ok(())
1117 }
1118
1119 pub async fn pull_blob_stream(
1149 &self,
1150 image: &Reference,
1151 layer: impl AsLayerDescriptor,
1152 ) -> Result<SizedStream> {
1153 stream_from_response(
1154 self.pull_blob_response(image, &layer, None, None).await?,
1155 layer,
1156 true,
1157 )
1158 }
1159
1160 pub async fn pull_blob_stream_partial(
1170 &self,
1171 image: &Reference,
1172 layer: impl AsLayerDescriptor,
1173 offset: u64,
1174 length: Option<u64>,
1175 ) -> Result<BlobResponse> {
1176 let response = self
1177 .pull_blob_response(image, &layer, Some(offset), length)
1178 .await?;
1179
1180 let status = response.status();
1181 match status {
1182 StatusCode::OK => Ok(BlobResponse::Full(stream_from_response(
1183 response, &layer, true,
1184 )?)),
1185 StatusCode::PARTIAL_CONTENT => Ok(BlobResponse::Partial(stream_from_response(
1186 response, &layer, false,
1187 )?)),
1188 _ => Err(OciDistributionError::ServerError {
1189 code: status.as_u16(),
1190 url: response.url().to_string(),
1191 message: response.text().await?,
1192 }),
1193 }
1194 }
1195
1196 async fn pull_blob_response(
1198 &self,
1199 image: &Reference,
1200 layer: impl AsLayerDescriptor,
1201 offset: Option<u64>,
1202 length: Option<u64>,
1203 ) -> Result<Response> {
1204 let layer = layer.as_layer_descriptor();
1205 let url = self.to_v2_blob_url(image, layer.digest);
1206
1207 let mut request = RequestBuilderWrapper::from_client(self, |client| client.get(&url))
1208 .apply_accept(MIME_TYPES_DISTRIBUTION_MANIFEST)?
1209 .apply_auth(image, RegistryOperation::Pull)
1210 .await?
1211 .into_request_builder();
1212 if let (Some(off), Some(len)) = (offset, length) {
1213 let end = (off + len).saturating_sub(1);
1214 request = request.header(
1215 RANGE,
1216 HeaderValue::from_str(&format!("bytes={off}-{end}")).unwrap(),
1217 );
1218 } else if let Some(offset) = offset {
1219 request = request.header(
1220 RANGE,
1221 HeaderValue::from_str(&format!("bytes={offset}-")).unwrap(),
1222 );
1223 }
1224 let mut response = request.send().await?;
1225
1226 if let Some(urls) = &layer.urls {
1227 for url in urls {
1228 if response.error_for_status_ref().is_ok() {
1229 break;
1230 }
1231
1232 let url = Url::parse(url)
1233 .map_err(|e| OciDistributionError::UrlParseError(e.to_string()))?;
1234
1235 if url.scheme() == "http" || url.scheme() == "https" {
1236 request =
1240 RequestBuilderWrapper::from_client(self, |client| client.get(url.clone()))
1241 .apply_accept(MIME_TYPES_DISTRIBUTION_MANIFEST)?
1242 .into_request_builder();
1243 if let Some(offset) = offset {
1244 request = request.header(
1245 RANGE,
1246 HeaderValue::from_str(&format!("bytes={offset}-")).unwrap(),
1247 );
1248 }
1249 response = request.send().await?
1250 }
1251 }
1252 }
1253
1254 Ok(response)
1255 }
1256
1257 async fn begin_push_monolithical_session(&self, image: &Reference) -> Result<String> {
1261 let url = &self.to_v2_blob_upload_url(image);
1262 debug!(?url, "begin_push_monolithical_session");
1263 let res = RequestBuilderWrapper::from_client(self, |client| client.post(url))
1264 .apply_auth(image, RegistryOperation::Push)
1265 .await?
1266 .into_request_builder()
1267 .header("Content-Length", 0)
1272 .send()
1273 .await?;
1274
1275 self.extract_location_header(image, res, &reqwest::StatusCode::ACCEPTED)
1277 .await
1278 }
1279
1280 async fn begin_push_chunked_session(&self, image: &Reference) -> Result<String> {
1284 let url = &self.to_v2_blob_upload_url(image);
1285 debug!(?url, "begin_push_session");
1286 let res = RequestBuilderWrapper::from_client(self, |client| client.post(url))
1287 .apply_auth(image, RegistryOperation::Push)
1288 .await?
1289 .into_request_builder()
1290 .header("Content-Length", 0)
1291 .send()
1292 .await?;
1293
1294 self.extract_location_header(image, res, &reqwest::StatusCode::ACCEPTED)
1296 .await
1297 }
1298
1299 async fn end_push_chunked_session(
1303 &self,
1304 location: &str,
1305 image: &Reference,
1306 digest: &str,
1307 ) -> Result<String> {
1308 let url = Url::parse_with_params(location, &[("digest", digest)])
1309 .map_err(|e| OciDistributionError::GenericError(Some(e.to_string())))?;
1310 let res = RequestBuilderWrapper::from_client(self, |client| client.put(url.clone()))
1311 .apply_auth(image, RegistryOperation::Push)
1312 .await?
1313 .into_request_builder()
1314 .header("Content-Length", 0)
1315 .send()
1316 .await?;
1317 self.extract_location_header(image, res, &reqwest::StatusCode::CREATED)
1318 .await
1319 }
1320
1321 async fn push_monolithically(
1325 &self,
1326 location: &str,
1327 image: &Reference,
1328 layer: &[u8],
1329 blob_digest: &str,
1330 ) -> Result<String> {
1331 let mut url = Url::parse(location).unwrap();
1332 url.query_pairs_mut().append_pair("digest", blob_digest);
1333 let url = url.to_string();
1334
1335 debug!(size = layer.len(), location = ?url, "Pushing monolithically");
1336 if layer.is_empty() {
1337 return Err(OciDistributionError::PushNoDataError);
1338 };
1339 let mut headers = HeaderMap::new();
1340 headers.insert(
1341 "Content-Length",
1342 format!("{}", layer.len()).parse().unwrap(),
1343 );
1344 headers.insert("Content-Type", "application/octet-stream".parse().unwrap());
1345
1346 let res = RequestBuilderWrapper::from_client(self, |client| client.put(&url))
1347 .apply_auth(image, RegistryOperation::Push)
1348 .await?
1349 .into_request_builder()
1350 .headers(headers)
1351 .body(layer.to_vec())
1352 .send()
1353 .await?;
1354
1355 self.extract_location_header(image, res, &reqwest::StatusCode::CREATED)
1357 .await
1358 }
1359
1360 async fn push_chunk(
1365 &self,
1366 location: &str,
1367 image: &Reference,
1368 blob_data: &[u8],
1369 start_byte: usize,
1370 ) -> Result<(String, usize)> {
1371 if blob_data.is_empty() {
1372 return Err(OciDistributionError::PushNoDataError);
1373 };
1374 let end_byte = if (start_byte + self.push_chunk_size) < blob_data.len() {
1375 start_byte + self.push_chunk_size - 1
1376 } else {
1377 blob_data.len() - 1
1378 };
1379 let body = blob_data[start_byte..end_byte + 1].to_vec();
1380 let mut headers = HeaderMap::new();
1381 headers.insert(
1382 "Content-Range",
1383 format!("{}-{}", start_byte, end_byte).parse().unwrap(),
1384 );
1385 headers.insert("Content-Length", format!("{}", body.len()).parse().unwrap());
1386 headers.insert("Content-Type", "application/octet-stream".parse().unwrap());
1387
1388 debug!(
1389 ?start_byte,
1390 ?end_byte,
1391 blob_data_len = blob_data.len(),
1392 body_len = body.len(),
1393 ?location,
1394 ?headers,
1395 "Pushing chunk"
1396 );
1397
1398 let res = RequestBuilderWrapper::from_client(self, |client| client.patch(location))
1399 .apply_auth(image, RegistryOperation::Push)
1400 .await?
1401 .into_request_builder()
1402 .headers(headers)
1403 .body(body)
1404 .send()
1405 .await?;
1406
1407 Ok((
1409 self.extract_location_header(image, res, &reqwest::StatusCode::ACCEPTED)
1410 .await?,
1411 end_byte + 1,
1412 ))
1413 }
1414
1415 pub async fn mount_blob(
1417 &self,
1418 image: &Reference,
1419 source: &Reference,
1420 digest: &str,
1421 ) -> Result<()> {
1422 let base_url = self.to_v2_blob_upload_url(image);
1423 let url = Url::parse_with_params(
1424 &base_url,
1425 &[("mount", digest), ("from", source.repository())],
1426 )
1427 .map_err(|e| OciDistributionError::UrlParseError(e.to_string()))?;
1428
1429 let res = RequestBuilderWrapper::from_client(self, |client| client.post(url.clone()))
1430 .apply_auth(image, RegistryOperation::Push)
1431 .await?
1432 .into_request_builder()
1433 .send()
1434 .await?;
1435
1436 self.extract_location_header(image, res, &reqwest::StatusCode::CREATED)
1437 .await?;
1438
1439 Ok(())
1440 }
1441
1442 pub async fn push_manifest(&self, image: &Reference, manifest: &OciManifest) -> Result<String> {
1446 let mut headers = HeaderMap::new();
1447 let content_type = manifest.content_type();
1448 headers.insert("Content-Type", content_type.parse().unwrap());
1449
1450 let mut body = Vec::new();
1453 let mut ser = serde_json::Serializer::with_formatter(&mut body, CanonicalFormatter::new());
1454 manifest.serialize(&mut ser).unwrap();
1455
1456 self.push_manifest_raw(image, body, manifest.content_type().parse().unwrap())
1457 .await
1458 }
1459
1460 pub async fn push_manifest_raw(
1464 &self,
1465 image: &Reference,
1466 body: Vec<u8>,
1467 content_type: HeaderValue,
1468 ) -> Result<String> {
1469 let url = self.to_v2_manifest_url(image);
1470 debug!(?url, ?content_type, "push manifest");
1471
1472 let mut headers = HeaderMap::new();
1473 headers.insert("Content-Type", content_type);
1474
1475 let manifest_hash = sha256_digest(&body);
1479
1480 let res = RequestBuilderWrapper::from_client(self, |client| client.put(url.clone()))
1481 .apply_auth(image, RegistryOperation::Push)
1482 .await?
1483 .into_request_builder()
1484 .headers(headers)
1485 .body(body)
1486 .send()
1487 .await?;
1488
1489 let ret = self
1490 .extract_location_header(image, res, &reqwest::StatusCode::CREATED)
1491 .await;
1492
1493 if matches!(ret, Err(OciDistributionError::RegistryNoLocationError)) {
1494 warn!("Registry is not respecting the OCI Distribution Specification: it didn't return the Location of the uploaded Manifest inside of the response headers. Working around this issue...");
1502
1503 let url_base = url
1504 .strip_suffix(image.tag().unwrap_or("latest"))
1505 .expect("The manifest URL always ends with the image tag suffix");
1506 let url_by_digest = format!("{}{}", url_base, manifest_hash);
1507
1508 return Ok(url_by_digest);
1509 }
1510
1511 ret
1512 }
1513
1514 pub async fn pull_referrers(
1516 &self,
1517 image: &Reference,
1518 artifact_type: Option<&str>,
1519 ) -> Result<OciImageIndex> {
1520 let url = self.to_v2_referrers_url(image, artifact_type)?;
1521 debug!("Pulling referrers from {}", url);
1522
1523 let res = RequestBuilderWrapper::from_client(self, |client| client.get(&url))
1524 .apply_accept(MIME_TYPES_DISTRIBUTION_MANIFEST)?
1525 .apply_auth(image, RegistryOperation::Pull)
1526 .await?
1527 .into_request_builder()
1528 .send()
1529 .await?;
1530 let status = res.status();
1531 let body = res.bytes().await?;
1532
1533 validate_registry_response(status, &body, &url)?;
1534 let manifest = serde_json::from_slice(&body)
1535 .map_err(|e| OciDistributionError::ManifestParsingError(e.to_string()))?;
1536
1537 Ok(manifest)
1538 }
1539
1540 async fn extract_location_header(
1541 &self,
1542 image: &Reference,
1543 res: reqwest::Response,
1544 expected_status: &reqwest::StatusCode,
1545 ) -> Result<String> {
1546 debug!(expected_status_code=?expected_status.as_u16(),
1547 status_code=?res.status().as_u16(),
1548 "extract location header");
1549 if res.status().eq(expected_status) {
1550 let location_header = res.headers().get("Location");
1551 debug!(location=?location_header, "Location header");
1552 match location_header {
1553 None => Err(OciDistributionError::RegistryNoLocationError),
1554 Some(lh) => self.location_header_to_url(image, lh),
1555 }
1556 } else if res.status().is_success() && expected_status.is_success() {
1557 Err(OciDistributionError::SpecViolationError(format!(
1558 "Expected HTTP Status {}, got {} instead",
1559 expected_status,
1560 res.status(),
1561 )))
1562 } else {
1563 let url = res.url().to_string();
1564 let code = res.status().as_u16();
1565 let message = res.text().await?;
1566 Err(OciDistributionError::ServerError { url, code, message })
1567 }
1568 }
1569
1570 fn location_header_to_url(
1575 &self,
1576 image: &Reference,
1577 location_header: &reqwest::header::HeaderValue,
1578 ) -> Result<String> {
1579 let lh = location_header.to_str()?;
1580 if lh.starts_with("/") {
1581 let registry = image.resolve_registry();
1582 Ok(format!(
1583 "{scheme}://{registry}{lh}",
1584 scheme = self.config.protocol.scheme_for(registry)
1585 ))
1586 } else {
1587 Ok(lh.to_string())
1588 }
1589 }
1590
1591 fn to_v2_manifest_url(&self, reference: &Reference) -> String {
1593 let registry = reference.resolve_registry();
1594 format!(
1595 "{scheme}://{registry}/v2/{repository}/manifests/{reference}{ns}",
1596 scheme = self.config.protocol.scheme_for(registry),
1597 repository = reference.repository(),
1598 reference = if let Some(digest) = reference.digest() {
1599 digest
1600 } else {
1601 reference.tag().unwrap_or("latest")
1602 },
1603 ns = reference
1604 .namespace()
1605 .map(|ns| format!("?ns={ns}"))
1606 .unwrap_or_default(),
1607 )
1608 }
1609
1610 fn to_v2_blob_url(&self, reference: &Reference, digest: &str) -> String {
1612 let registry = reference.resolve_registry();
1613 format!(
1614 "{scheme}://{registry}/v2/{repository}/blobs/{digest}{ns}",
1615 scheme = self.config.protocol.scheme_for(registry),
1616 repository = reference.repository(),
1617 ns = reference
1618 .namespace()
1619 .map(|ns| format!("?ns={ns}"))
1620 .unwrap_or_default(),
1621 )
1622 }
1623
1624 fn to_v2_blob_upload_url(&self, reference: &Reference) -> String {
1626 self.to_v2_blob_url(reference, "uploads/")
1627 }
1628
1629 fn to_list_tags_url(&self, reference: &Reference) -> String {
1630 let registry = reference.resolve_registry();
1631 format!(
1632 "{scheme}://{registry}/v2/{repository}/tags/list{ns}",
1633 scheme = self.config.protocol.scheme_for(registry),
1634 repository = reference.repository(),
1635 ns = reference
1636 .namespace()
1637 .map(|ns| format!("?ns={ns}"))
1638 .unwrap_or_default(),
1639 )
1640 }
1641
1642 fn to_v2_referrers_url(
1644 &self,
1645 reference: &Reference,
1646 artifact_type: Option<&str>,
1647 ) -> Result<String> {
1648 let registry = reference.resolve_registry();
1649 Ok(format!(
1650 "{scheme}://{registry}/v2/{repository}/referrers/{reference}{at}",
1651 scheme = self.config.protocol.scheme_for(registry),
1652 repository = reference.repository(),
1653 reference = if let Some(digest) = reference.digest() {
1654 digest
1655 } else {
1656 return Err(OciDistributionError::GenericError(Some(
1657 "Getting referrers for a tag is not supported".into(),
1658 )));
1659 },
1660 at = artifact_type
1661 .map(|at| format!("?artifactType={at}"))
1662 .unwrap_or_default(),
1663 ))
1664 }
1665}
1666
1667fn validate_registry_response(status: reqwest::StatusCode, body: &[u8], url: &str) -> Result<()> {
1671 match status {
1672 reqwest::StatusCode::OK => Ok(()),
1673 reqwest::StatusCode::UNAUTHORIZED => Err(OciDistributionError::UnauthorizedError {
1674 url: url.to_string(),
1675 }),
1676 s if s.is_success() => Err(OciDistributionError::SpecViolationError(format!(
1677 "Expected HTTP Status {}, got {} instead",
1678 reqwest::StatusCode::OK,
1679 status,
1680 ))),
1681 s if s.is_client_error() => {
1682 match serde_json::from_slice::<OciEnvelope>(body) {
1683 Ok(envelope) => Err(OciDistributionError::RegistryError {
1685 envelope,
1686 url: url.to_string(),
1687 }),
1688 Err(_) => Err(OciDistributionError::ServerError {
1690 code: s.as_u16(),
1691 url: url.to_string(),
1692 message: String::from_utf8_lossy(body).to_string(),
1693 }),
1694 }
1695 }
1696 s => {
1697 let text = std::str::from_utf8(body)?;
1698
1699 Err(OciDistributionError::ServerError {
1700 code: s.as_u16(),
1701 url: url.to_string(),
1702 message: text.to_string(),
1703 })
1704 }
1705 }
1706}
1707
1708fn stream_from_response(
1710 response: Response,
1711 layer: impl AsLayerDescriptor,
1712 verify: bool,
1713) -> Result<SizedStream> {
1714 let content_length = response.content_length();
1715 let headers = response.headers().clone();
1716 let stream = response
1717 .error_for_status()?
1718 .bytes_stream()
1719 .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e));
1720
1721 let expected_layer_digest = layer.as_layer_descriptor().digest.to_string();
1722 let layer_digester = Digester::new(&expected_layer_digest)?;
1723 let header_digester_and_digest = match digest_header_value(headers)? {
1724 Some(digest) if digest == expected_layer_digest => None,
1726 Some(digest) => Some((Digester::new(&digest)?, digest)),
1727 None => None,
1728 };
1729 let header_digest = header_digester_and_digest
1730 .as_ref()
1731 .map(|(_, digest)| digest.to_owned());
1732 let stream: BoxStream<'static, std::result::Result<bytes::Bytes, std::io::Error>> = if verify {
1733 Box::pin(VerifyingStream::new(
1734 Box::pin(stream),
1735 layer_digester,
1736 expected_layer_digest,
1737 header_digester_and_digest,
1738 ))
1739 } else {
1740 Box::pin(stream)
1741 };
1742 Ok(SizedStream {
1743 content_length,
1744 digest_header_value: header_digest,
1745 stream,
1746 })
1747}
1748
/// Pairs a `reqwest` request under construction with the [`Client`] it is built for.
///
/// The client reference is what lets the wrapper's helper methods look up authentication
/// tokens while decorating the request.
struct RequestBuilderWrapper<'a> {
    /// Client on whose behalf the request is built.
    client: &'a Client,
    /// The request being assembled.
    request_builder: RequestBuilder,
}
1756
1757impl<'a> RequestBuilderWrapper<'a> {
1759 fn from_client(
1763 client: &'a Client,
1764 f: impl Fn(&reqwest::Client) -> RequestBuilder,
1765 ) -> RequestBuilderWrapper<'a> {
1766 let request_builder = f(&client.client);
1767 RequestBuilderWrapper {
1768 client,
1769 request_builder,
1770 }
1771 }
1772
1773 fn into_request_builder(self) -> RequestBuilder {
1775 self.request_builder
1776 }
1777}
1778
1779impl<'a> RequestBuilderWrapper<'a> {
1781 fn apply_accept(&self, accept: &[&str]) -> Result<RequestBuilderWrapper> {
1782 let request_builder = self
1783 .request_builder
1784 .try_clone()
1785 .ok_or_else(|| {
1786 OciDistributionError::GenericError(Some(
1787 "could not clone request builder".to_string(),
1788 ))
1789 })?
1790 .header("Accept", Vec::from(accept).join(", "));
1791
1792 Ok(RequestBuilderWrapper {
1793 client: self.client,
1794 request_builder,
1795 })
1796 }
1797
1798 async fn apply_auth(
1805 &self,
1806 image: &Reference,
1807 op: RegistryOperation,
1808 ) -> Result<RequestBuilderWrapper> {
1809 let mut headers = HeaderMap::new();
1810
1811 if let Some(token) = self.client.get_auth_token(image, op).await {
1812 match token {
1813 RegistryTokenType::Bearer(token) => {
1814 debug!("Using bearer token authentication.");
1815 headers.insert("Authorization", token.bearer_token().parse().unwrap());
1816 }
1817 RegistryTokenType::Basic(username, password) => {
1818 debug!("Using HTTP basic authentication.");
1819 return Ok(RequestBuilderWrapper {
1820 client: self.client,
1821 request_builder: self
1822 .request_builder
1823 .try_clone()
1824 .ok_or_else(|| {
1825 OciDistributionError::GenericError(Some(
1826 "could not clone request builder".to_string(),
1827 ))
1828 })?
1829 .headers(headers)
1830 .basic_auth(username.to_string(), Some(password.to_string())),
1831 });
1832 }
1833 }
1834 }
1835 Ok(RequestBuilderWrapper {
1836 client: self.client,
1837 request_builder: self
1838 .request_builder
1839 .try_clone()
1840 .ok_or_else(|| {
1841 OciDistributionError::GenericError(Some(
1842 "could not clone request builder".to_string(),
1843 ))
1844 })?
1845 .headers(headers),
1846 })
1847 }
1848}
1849
/// Encoding of the bytes stored in a [`Certificate`].
#[derive(Debug, Clone)]
pub enum CertificateEncoding {
    // DER encoded certificate data.
    #[allow(missing_docs)]
    Der,
    // PEM encoded certificate data.
    #[allow(missing_docs)]
    Pem,
}
1858
/// A certificate together with the encoding of its bytes.
#[derive(Debug, Clone)]
pub struct Certificate {
    /// How `data` is encoded.
    pub encoding: CertificateEncoding,

    /// The raw certificate bytes, in the format named by `encoding`.
    pub data: Vec<u8>,
}
1868
/// Configuration of a [`Client`].
///
/// All fields have defaults (see the `Default` implementation), so a configuration is
/// usually written as `ClientConfig { field: value, ..Default::default() }`.
pub struct ClientConfig {
    /// Protocol (URL scheme) used when building registry URLs.
    pub protocol: ClientProtocol,

    /// Only available with the `native-tls` feature. Defaults to `false`.
    // NOTE(review): presumably disables TLS hostname verification — confirm where the
    // HTTP client is built.
    #[cfg(feature = "native-tls")]
    pub accept_invalid_hostnames: bool,

    /// Defaults to `false`.
    // NOTE(review): presumably makes the HTTP client accept invalid TLS certificates —
    // confirm where the HTTP client is built.
    pub accept_invalid_certificates: bool,

    /// Defaults to `false`.
    // NOTE(review): presumably selects a single-request (monolithic) blob upload over a
    // chunked one — confirm in the push implementation.
    pub use_monolithic_push: bool,

    /// Additional root certificates. Defaults to an empty list.
    // NOTE(review): presumably added to the HTTP client's trust store — confirm.
    pub extra_root_certificates: Vec<Certificate>,

    /// Function picking one entry out of an image index. Defaults to
    /// [`current_platform_resolver`].
    pub platform_resolver: Option<Box<PlatformResolverFn>>,

    /// Defaults to [`DEFAULT_MAX_CONCURRENT_UPLOAD`].
    // NOTE(review): presumably the upper bound of simultaneous uploads — confirm.
    pub max_concurrent_upload: usize,

    /// Defaults to [`DEFAULT_MAX_CONCURRENT_DOWNLOAD`].
    // NOTE(review): presumably the upper bound of simultaneous downloads — confirm.
    pub max_concurrent_download: usize,

    /// Defaults to [`DEFAULT_TOKEN_EXPIRATION_SECS`].
    // NOTE(review): presumably the token lifetime (seconds) assumed when the registry
    // does not state one — confirm in the token cache.
    pub default_token_expiration_secs: usize,

    /// Optional read timeout. Defaults to `None`.
    pub read_timeout: Option<Duration>,

    /// Optional connect timeout. Defaults to `None`.
    pub connect_timeout: Option<Duration>,

    /// User agent string. Defaults to `<crate name>/<crate version>`.
    pub user_agent: &'static str,

    /// Optional HTTPS proxy setting. Defaults to `None`.
    pub https_proxy: Option<String>,

    /// Optional HTTP proxy setting. Defaults to `None`.
    pub http_proxy: Option<String>,

    /// Optional proxy exclusion setting. Defaults to `None`.
    pub no_proxy: Option<String>,
}
1945
1946impl Default for ClientConfig {
1947 fn default() -> Self {
1948 Self {
1949 protocol: ClientProtocol::default(),
1950 #[cfg(feature = "native-tls")]
1951 accept_invalid_hostnames: false,
1952 accept_invalid_certificates: false,
1953 use_monolithic_push: false,
1954 extra_root_certificates: Vec::new(),
1955 platform_resolver: Some(Box::new(current_platform_resolver)),
1956 max_concurrent_upload: DEFAULT_MAX_CONCURRENT_UPLOAD,
1957 max_concurrent_download: DEFAULT_MAX_CONCURRENT_DOWNLOAD,
1958 default_token_expiration_secs: DEFAULT_TOKEN_EXPIRATION_SECS,
1959 read_timeout: None,
1960 connect_timeout: None,
1961 user_agent: DEFAULT_USER_AGENT,
1962 https_proxy: None,
1963 http_proxy: None,
1964 no_proxy: None,
1965 }
1966 }
1967}
1968
/// Signature of a platform resolver: given the entries of an image index, it returns the
/// digest of the entry to use, or `None` when no entry is suitable.
type PlatformResolverFn = dyn Fn(&[ImageIndexEntry]) -> Option<String> + Send + Sync;
1973
1974pub fn linux_amd64_resolver(manifests: &[ImageIndexEntry]) -> Option<String> {
1976 manifests
1977 .iter()
1978 .find(|entry| {
1979 entry.platform.as_ref().map_or(false, |platform| {
1980 platform.os == "linux" && platform.architecture == "amd64"
1981 })
1982 })
1983 .map(|entry| entry.digest.clone())
1984}
1985
1986pub fn windows_amd64_resolver(manifests: &[ImageIndexEntry]) -> Option<String> {
1988 manifests
1989 .iter()
1990 .find(|entry| {
1991 entry.platform.as_ref().map_or(false, |platform| {
1992 platform.os == "windows" && platform.architecture == "amd64"
1993 })
1994 })
1995 .map(|entry| entry.digest.clone())
1996}
1997
/// Operating system name as reported by Rust for macOS.
const MACOS: &str = "macos";
/// Operating system name used in place of [`MACOS`].
const DARWIN: &str = "darwin";

/// Returns the name of the operating system this binary was compiled for, with Rust's
/// `macos` translated to `darwin`; every other name is passed through unchanged.
fn go_os() -> &'static str {
    let rust_os = std::env::consts::OS;
    if rust_os == MACOS {
        DARWIN
    } else {
        rust_os
    }
}
2010
// Rust architecture names (`std::env::consts::ARCH`) ...
const X86_64: &str = "x86_64";
const X86: &str = "x86";
const AARCH64: &str = "aarch64";
const POWERPC64: &str = "powerpc64";

// ... and the Go `GOARCH` names that image indexes use for them.
const AMD64: &str = "amd64";
const I386: &str = "386";
const ARM64: &str = "arm64";
const PPC64: &str = "ppc64";
const PPC64LE: &str = "ppc64le";

// "amd" is not a GOARCH value; 32-bit x86 is "386". The constant is kept only so that any
// remaining reference elsewhere in the file keeps compiling.
#[allow(dead_code)]
const AMD: &str = "amd";

/// Returns the Go-style (`GOARCH`) name of the architecture this binary was compiled for.
///
/// Image indexes describe platforms with Go's names, so Rust's names are translated:
/// `x86_64` → `amd64`, `x86` → `386`, `aarch64` → `arm64`, and `powerpc64` → `ppc64le`
/// or `ppc64` depending on the target's endianness. Any other name is passed through.
fn go_arch() -> &'static str {
    match std::env::consts::ARCH {
        X86_64 => AMD64,
        X86 => I386,
        AARCH64 => ARM64,
        // Rust reports `powerpc64` for both endiannesses; Go distinguishes them.
        POWERPC64 => {
            if cfg!(target_endian = "little") {
                PPC64LE
            } else {
                PPC64
            }
        }
        other => other,
    }
}
2032
2033pub fn current_platform_resolver(manifests: &[ImageIndexEntry]) -> Option<String> {
2036 manifests
2037 .iter()
2038 .find(|entry| {
2039 entry.platform.as_ref().map_or(false, |platform| {
2040 platform.os == go_os() && platform.architecture == go_arch()
2041 })
2042 })
2043 .map(|entry| entry.digest.clone())
2044}
2045
/// Protocol (URL scheme) the client uses to talk to a registry.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub enum ClientProtocol {
    /// Always use plain `http`.
    Http,
    /// Always use `https`. This is the default.
    #[default]
    Https,
    /// Use `https`, except for the listed registries, which are reached over `http`.
    HttpsExcept(Vec<String>),
}

impl ClientProtocol {
    /// Returns the URL scheme (`"http"` or `"https"`) to use for `registry`.
    ///
    /// For [`ClientProtocol::HttpsExcept`] the registry name must match one of the
    /// exceptions exactly to be served over `http`.
    fn scheme_for(&self, registry: &str) -> &'static str {
        match self {
            ClientProtocol::Https => "https",
            ClientProtocol::Http => "http",
            ClientProtocol::HttpsExcept(exceptions) => {
                // Compare in place; no need to allocate a `String` for the lookup.
                if exceptions.iter().any(|exception| exception == registry) {
                    "http"
                } else {
                    "https"
                }
            }
        }
    }
}
2073
/// The parts of a `Bearer` authentication challenge this client uses.
#[derive(Clone, Debug)]
struct BearerChallenge {
    /// Value of the mandatory `realm` parameter.
    pub realm: Box<str>,
    /// Value of the optional `service` parameter.
    pub service: Option<String>,
}
2079
2080impl TryFrom<&HeaderValue> for BearerChallenge {
2081 type Error = String;
2082
2083 fn try_from(value: &HeaderValue) -> std::result::Result<Self, Self::Error> {
2084 let parser = ChallengeParser::new(
2085 value
2086 .to_str()
2087 .map_err(|e| format!("cannot convert header value to string: {:?}", e))?,
2088 );
2089 parser
2090 .filter_map(|parser_res| {
2091 if let Ok(chalenge_ref) = parser_res {
2092 let bearer_challenge = BearerChallenge::try_from(&chalenge_ref);
2093 bearer_challenge.ok()
2094 } else {
2095 None
2096 }
2097 })
2098 .next()
2099 .ok_or_else(|| "Cannot find Bearer challenge".to_string())
2100 }
2101}
2102
2103impl TryFrom<&ChallengeRef<'_>> for BearerChallenge {
2104 type Error = String;
2105
2106 fn try_from(value: &ChallengeRef<'_>) -> std::result::Result<Self, Self::Error> {
2107 if !value.scheme.eq_ignore_ascii_case("Bearer") {
2108 return Err(format!(
2109 "BearerChallenge doesn't support challenge scheme {:?}",
2110 value.scheme
2111 ));
2112 }
2113 let mut realm = None;
2114 let mut service = None;
2115 for (k, v) in &value.params {
2116 if k.eq_ignore_ascii_case("realm") {
2117 realm = Some(v.to_unescaped());
2118 }
2119
2120 if k.eq_ignore_ascii_case("service") {
2121 service = Some(v.to_unescaped());
2122 }
2123 }
2124
2125 let realm = realm.ok_or("missing required parameter realm")?;
2126
2127 Ok(BearerChallenge {
2128 realm: realm.into_boxed_str(),
2129 service,
2130 })
2131 }
2132}
2133
2134#[cfg(test)]
2135mod test {
2136 use super::*;
2137 use std::convert::TryFrom;
2138 use std::fs;
2139 use std::path;
2140 use std::result::Result;
2141
2142 use rstest::rstest;
2143 use sha2::Digest as _;
2144 use tempfile::TempDir;
2145 use tokio::io::AsyncReadExt;
2146 use tokio_util::io::StreamReader;
2147
2148 use crate::manifest::{self, IMAGE_DOCKER_LAYER_GZIP_MEDIA_TYPE};
2149
2150 #[cfg(feature = "test-registry")]
2151 use testcontainers::{
2152 core::{Mount, WaitFor},
2153 runners::AsyncRunner,
2154 ContainerRequest, GenericImage, ImageExt,
2155 };
2156
    // Image references used by the tests: without tag, with tag, with digest, and with
    // both tag and digest.
    const HELLO_IMAGE_NO_TAG: &str = "webassembly.azurecr.io/hello-wasm";
    const HELLO_IMAGE_TAG: &str = "webassembly.azurecr.io/hello-wasm:v1";
    const HELLO_IMAGE_DIGEST: &str = "webassembly.azurecr.io/hello-wasm@sha256:51d9b231d5129e3ffc267c9d455c49d789bf3167b611a07ab6e4b3304c96b0e7";
    const HELLO_IMAGE_TAG_AND_DIGEST: &str = "webassembly.azurecr.io/hello-wasm:v1@sha256:51d9b231d5129e3ffc267c9d455c49d789bf3167b611a07ab6e4b3304c96b0e7";
    // References iterated by tests that loop over several images; the untagged
    // reference is not part of this list.
    const TEST_IMAGES: &[&str] = &[
        HELLO_IMAGE_TAG,
        HELLO_IMAGE_DIGEST,
        HELLO_IMAGE_TAG_AND_DIGEST,
    ];
    // References hosted on other registries.
    const GHCR_IO_IMAGE: &str = "ghcr.io/krustlet/oci-distribution/hello-wasm:v1";
    const DOCKER_IO_IMAGE: &str = "docker.io/library/hello-world@sha256:37a0b92b08d4919615c3ee023f7ddb068d12b8387475d64c622ac30f45c29c51";
    // htpasswd entry and the matching username/password.
    // NOTE(review): presumably used to configure a local test registry — confirm in the
    // tests that use them.
    const HTPASSWD: &str = "testuser:$2y$05$8/q2bfRcX74EuxGf0qOcSuhWDQJXrgWiy6Fi73/JM2tKC66qSrLve";
    const HTPASSWD_USERNAME: &str = "testuser";
    const HTPASSWD_PASSWORD: &str = "testpassword";
2175
2176 #[test]
2177 fn test_apply_accept() -> anyhow::Result<()> {
2178 assert_eq!(
2179 RequestBuilderWrapper::from_client(&Client::default(), |client| client
2180 .get("https://example.com/some/module.wasm"))
2181 .apply_accept(&["*/*"])?
2182 .into_request_builder()
2183 .build()?
2184 .headers()["Accept"],
2185 "*/*"
2186 );
2187
2188 assert_eq!(
2189 RequestBuilderWrapper::from_client(&Client::default(), |client| client
2190 .get("https://example.com/some/module.wasm"))
2191 .apply_accept(MIME_TYPES_DISTRIBUTION_MANIFEST)?
2192 .into_request_builder()
2193 .build()?
2194 .headers()["Accept"],
2195 MIME_TYPES_DISTRIBUTION_MANIFEST.join(", ")
2196 );
2197
2198 Ok(())
2199 }
2200
2201 #[tokio::test]
2202 async fn test_apply_auth_no_token() -> anyhow::Result<()> {
2203 assert!(
2204 !RequestBuilderWrapper::from_client(&Client::default(), |client| client
2205 .get("https://example.com/some/module.wasm"))
2206 .apply_auth(
2207 &Reference::try_from(HELLO_IMAGE_TAG)?,
2208 RegistryOperation::Pull
2209 )
2210 .await?
2211 .into_request_builder()
2212 .build()?
2213 .headers()
2214 .contains_key("Authorization")
2215 );
2216
2217 Ok(())
2218 }
2219
2220 #[tokio::test]
2221 async fn test_apply_auth_bearer_token() -> anyhow::Result<()> {
2222 use hmac::{Hmac, Mac};
2223 use jwt::SignWithKey;
2224 use sha2::Sha256;
2225 let client = Client::default();
2226 let header = jwt::header::Header {
2227 algorithm: jwt::algorithm::AlgorithmType::Hs256,
2228 key_id: None,
2229 type_: None,
2230 content_type: None,
2231 };
2232 let claims: jwt::claims::Claims = Default::default();
2233 let key: Hmac<Sha256> = Hmac::new_from_slice(b"some-secret").unwrap();
2234 let token = jwt::Token::new(header, claims)
2235 .sign_with_key(&key)?
2236 .as_str()
2237 .to_string();
2238
2239 client
2241 .store_auth(
2242 Reference::try_from(HELLO_IMAGE_TAG)?.resolve_registry(),
2243 RegistryAuth::Anonymous,
2244 )
2245 .await;
2246
2247 client
2248 .tokens
2249 .insert(
2250 &Reference::try_from(HELLO_IMAGE_TAG)?,
2251 RegistryOperation::Pull,
2252 RegistryTokenType::Bearer(RegistryToken::Token {
2253 token: token.clone(),
2254 }),
2255 )
2256 .await;
2257 assert_eq!(
2258 RequestBuilderWrapper::from_client(&client, |client| client
2259 .get("https://example.com/some/module.wasm"))
2260 .apply_auth(
2261 &Reference::try_from(HELLO_IMAGE_TAG)?,
2262 RegistryOperation::Pull
2263 )
2264 .await?
2265 .into_request_builder()
2266 .build()?
2267 .headers()["Authorization"],
2268 format!("Bearer {}", &token)
2269 );
2270
2271 Ok(())
2272 }
2273
2274 #[test]
2275 fn test_to_v2_blob_url() {
2276 let mut image = Reference::try_from(HELLO_IMAGE_TAG).expect("failed to parse reference");
2277 let c = Client::default();
2278
2279 assert_eq!(
2280 c.to_v2_blob_url(&image, "sha256:deadbeef"),
2281 "https://webassembly.azurecr.io/v2/hello-wasm/blobs/sha256:deadbeef"
2282 );
2283
2284 image.set_mirror_registry("docker.mirror.io".to_owned());
2285 assert_eq!(
2286 c.to_v2_blob_url(&image, "sha256:deadbeef"),
2287 "https://docker.mirror.io/v2/hello-wasm/blobs/sha256:deadbeef?ns=webassembly.azurecr.io"
2288 );
2289 }
2290
2291 #[rstest(image, expected_uri, expected_mirror_uri,
2292 case(HELLO_IMAGE_NO_TAG, "https://webassembly.azurecr.io/v2/hello-wasm/manifests/latest", "https://docker.mirror.io/v2/hello-wasm/manifests/latest?ns=webassembly.azurecr.io"), case(HELLO_IMAGE_TAG, "https://webassembly.azurecr.io/v2/hello-wasm/manifests/v1", "https://docker.mirror.io/v2/hello-wasm/manifests/v1?ns=webassembly.azurecr.io"),
2294 case(HELLO_IMAGE_DIGEST, "https://webassembly.azurecr.io/v2/hello-wasm/manifests/sha256:51d9b231d5129e3ffc267c9d455c49d789bf3167b611a07ab6e4b3304c96b0e7", "https://docker.mirror.io/v2/hello-wasm/manifests/sha256:51d9b231d5129e3ffc267c9d455c49d789bf3167b611a07ab6e4b3304c96b0e7?ns=webassembly.azurecr.io"),
2295 case(HELLO_IMAGE_TAG_AND_DIGEST, "https://webassembly.azurecr.io/v2/hello-wasm/manifests/sha256:51d9b231d5129e3ffc267c9d455c49d789bf3167b611a07ab6e4b3304c96b0e7", "https://docker.mirror.io/v2/hello-wasm/manifests/sha256:51d9b231d5129e3ffc267c9d455c49d789bf3167b611a07ab6e4b3304c96b0e7?ns=webassembly.azurecr.io"),
2296 )]
2297 fn test_to_v2_manifest(image: &str, expected_uri: &str, expected_mirror_uri: &str) {
2298 let mut reference = Reference::try_from(image).expect("failed to parse reference");
2299 let c = Client::default();
2300 assert_eq!(c.to_v2_manifest_url(&reference), expected_uri);
2301
2302 reference.set_mirror_registry("docker.mirror.io".to_owned());
2303 assert_eq!(c.to_v2_manifest_url(&reference), expected_mirror_uri);
2304 }
2305
2306 #[test]
2307 fn test_to_v2_blob_upload_url() {
2308 let image = Reference::try_from(HELLO_IMAGE_TAG).expect("failed to parse reference");
2309 let blob_url = Client::default().to_v2_blob_upload_url(&image);
2310
2311 assert_eq!(
2312 blob_url,
2313 "https://webassembly.azurecr.io/v2/hello-wasm/blobs/uploads/"
2314 )
2315 }
2316
2317 #[test]
2318 fn test_to_list_tags_url() {
2319 let mut image = Reference::try_from(HELLO_IMAGE_TAG).expect("failed to parse reference");
2320 let c = Client::default();
2321
2322 assert_eq!(
2323 c.to_list_tags_url(&image),
2324 "https://webassembly.azurecr.io/v2/hello-wasm/tags/list"
2325 );
2326
2327 image.set_mirror_registry("docker.mirror.io".to_owned());
2328 assert_eq!(
2329 c.to_list_tags_url(&image),
2330 "https://docker.mirror.io/v2/hello-wasm/tags/list?ns=webassembly.azurecr.io"
2331 );
2332 }
2333
2334 #[test]
2335 fn manifest_url_generation_respects_http_protocol() {
2336 let c = Client::new(ClientConfig {
2337 protocol: ClientProtocol::Http,
2338 ..Default::default()
2339 });
2340 let reference = Reference::try_from("webassembly.azurecr.io/hello:v1".to_owned())
2341 .expect("Could not parse reference");
2342 assert_eq!(
2343 "http://webassembly.azurecr.io/v2/hello/manifests/v1",
2344 c.to_v2_manifest_url(&reference)
2345 );
2346 }
2347
2348 #[test]
2349 fn blob_url_generation_respects_http_protocol() {
2350 let c = Client::new(ClientConfig {
2351 protocol: ClientProtocol::Http,
2352 ..Default::default()
2353 });
2354 let reference = Reference::try_from("webassembly.azurecr.io/hello@sha256:ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff".to_owned())
2355 .expect("Could not parse reference");
2356 assert_eq!(
2357 "http://webassembly.azurecr.io/v2/hello/blobs/sha256:ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff",
2358 c.to_v2_blob_url(&reference, reference.digest().unwrap())
2359 );
2360 }
2361
2362 #[test]
2363 fn manifest_url_generation_uses_https_if_not_on_exception_list() {
2364 let insecure_registries = vec!["localhost".to_owned(), "oci.registry.local".to_owned()];
2365 let protocol = ClientProtocol::HttpsExcept(insecure_registries);
2366 let c = Client::new(ClientConfig {
2367 protocol,
2368 ..Default::default()
2369 });
2370 let reference = Reference::try_from("webassembly.azurecr.io/hello:v1".to_owned())
2371 .expect("Could not parse reference");
2372 assert_eq!(
2373 "https://webassembly.azurecr.io/v2/hello/manifests/v1",
2374 c.to_v2_manifest_url(&reference)
2375 );
2376 }
2377
2378 #[test]
2379 fn manifest_url_generation_uses_http_if_on_exception_list() {
2380 let insecure_registries = vec!["localhost".to_owned(), "oci.registry.local".to_owned()];
2381 let protocol = ClientProtocol::HttpsExcept(insecure_registries);
2382 let c = Client::new(ClientConfig {
2383 protocol,
2384 ..Default::default()
2385 });
2386 let reference = Reference::try_from("oci.registry.local/hello:v1".to_owned())
2387 .expect("Could not parse reference");
2388 assert_eq!(
2389 "http://oci.registry.local/v2/hello/manifests/v1",
2390 c.to_v2_manifest_url(&reference)
2391 );
2392 }
2393
2394 #[test]
2395 fn blob_url_generation_uses_https_if_not_on_exception_list() {
2396 let insecure_registries = vec!["localhost".to_owned(), "oci.registry.local".to_owned()];
2397 let protocol = ClientProtocol::HttpsExcept(insecure_registries);
2398 let c = Client::new(ClientConfig {
2399 protocol,
2400 ..Default::default()
2401 });
2402 let reference = Reference::try_from("webassembly.azurecr.io/hello@sha256:ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff".to_owned())
2403 .expect("Could not parse reference");
2404 assert_eq!(
2405 "https://webassembly.azurecr.io/v2/hello/blobs/sha256:ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff",
2406 c.to_v2_blob_url(&reference, reference.digest().unwrap())
2407 );
2408 }
2409
2410 #[test]
2411 fn blob_url_generation_uses_http_if_on_exception_list() {
2412 let insecure_registries = vec!["localhost".to_owned(), "oci.registry.local".to_owned()];
2413 let protocol = ClientProtocol::HttpsExcept(insecure_registries);
2414 let c = Client::new(ClientConfig {
2415 protocol,
2416 ..Default::default()
2417 });
2418 let reference = Reference::try_from("oci.registry.local/hello@sha256:ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff".to_owned())
2419 .expect("Could not parse reference");
2420 assert_eq!(
2421 "http://oci.registry.local/v2/hello/blobs/sha256:ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff",
2422 c.to_v2_blob_url(&reference, reference.digest().unwrap())
2423 );
2424 }
2425
2426 #[test]
2427 fn can_generate_valid_digest() {
2428 let bytes = b"hellobytes";
2429 let hash = sha256_digest(bytes);
2430
2431 let combination = vec![b"hello".to_vec(), b"bytes".to_vec()];
2432 let combination_hash =
2433 sha256_digest(&combination.into_iter().flatten().collect::<Vec<u8>>());
2434
2435 assert_eq!(
2436 hash,
2437 "sha256:fdbd95aafcbc814a2600fcc54c1e1706f52d2f9bf45cf53254f25bcd7599ce99"
2438 );
2439 assert_eq!(
2440 combination_hash,
2441 "sha256:fdbd95aafcbc814a2600fcc54c1e1706f52d2f9bf45cf53254f25bcd7599ce99"
2442 );
2443 }
2444
/// Exercises JSON deserialization of `RegistryToken`: which documents are accepted, which
/// field supplies the value returned by `token()`, and which documents are rejected.
#[test]
fn test_registry_token_deserialize() {
    // Single entry point so every case goes through the same deserialization call.
    fn parse(text: &str) -> Result<RegistryToken, serde_json::Error> {
        serde_json::from_str(text)
    }

    // Documents that must deserialize. `Some(value)` additionally pins what `token()` returns;
    // `None` only requires that deserialization succeeds.
    let accepted: [(&str, Option<&str>); 7] = [
        (r#"{"token": "abc"}"#, Some("abc")),
        (r#"{"access_token": "xyz"}"#, Some("xyz")),
        (r#"{"access_token": "xyz", "token": "abc"}"#, Some("abc")),
        (r#"{"token": "abc", "access_token": "xyz"}"#, Some("abc")),
        (
            r#"{"aaa": 300, "access_token": "xyz", "token": "abc", "zzz": 600}"#,
            None,
        ),
        (r#"{"access_token": 300, "token": "abc"}"#, Some("abc")),
        (r#"{"access_token": "xyz", "token": 300}"#, Some("xyz")),
    ];

    for (text, expected_token) in accepted.iter().copied() {
        let res = parse(text);
        assert!(res.is_ok());
        if let Some(expected_token) = expected_token {
            let rt = res.unwrap();
            assert_eq!(rt.token(), expected_token);
        }
    }

    // Documents that must be rejected: wrongly typed or missing token fields, truncated JSON,
    // and input that is not JSON at all.
    let rejected: [&str; 7] = [
        r#"{"token": 300}"#,
        r#"{"access_token": 300}"#,
        r#"{"token": {"some": "thing"}}"#,
        r#"{"access_token": {"some": "thing"}}"#,
        r#"{"some": "thing"}"#,
        r#"{"token": "abc""#,
        r#"_ _ _ kjbwef??98{9898 }} }}"#,
    ];

    for text in rejected.iter().copied() {
        let res = parse(text);
        assert!(res.is_err());
    }
}
2532
/// Sanity-checks an auth token returned by a registry.
///
/// The only property verified is that the token is longer than 64 bytes.
///
/// # Panics
///
/// Panics when `token` is 64 bytes long or shorter. The message reports the observed
/// length only; the token itself is deliberately not printed so that credentials do not
/// end up in test logs.
fn check_auth_token(token: &str) {
    assert!(
        token.len() > 64,
        "auth token is too short: expected more than 64 bytes, got {}",
        token.len()
    );
}
2537
2538 #[tokio::test]
2539 async fn test_auth() {
2540 for &image in TEST_IMAGES {
2541 let reference = Reference::try_from(image).expect("failed to parse reference");
2542 let c = Client::default();
2543 let token = c
2544 .auth(
2545 &reference,
2546 &RegistryAuth::Anonymous,
2547 RegistryOperation::Pull,
2548 )
2549 .await
2550 .expect("result from auth request");
2551
2552 assert!(token.is_some());
2553 check_auth_token(token.unwrap().as_ref());
2554
2555 let tok = c
2556 .tokens
2557 .get(&reference, RegistryOperation::Pull)
2558 .await
2559 .expect("token is available");
2560 if let RegistryTokenType::Bearer(tok) = tok {
2562 check_auth_token(tok.token());
2563 } else {
2564 panic!("Unexpeted Basic Auth Token");
2565 }
2566 }
2567 }
2568
/// Pushes the hello image under tags `1.0.0` through `1.0.3` into a throwaway registry and
/// checks that `list_tags` with a page size of 2, starting after `1.0.1`, returns
/// `1.0.2` and `1.0.3`.
#[cfg(feature = "test-registry")]
#[tokio::test]
async fn test_list_tags() {
    // The container handle is kept in a binding for the whole test.
    let registry = registry_image_edge()
        .start()
        .await
        .expect("Failed to start registry container");
    let port = registry
        .get_host_port_ipv4(5000)
        .await
        .expect("Failed to get port");

    let client = Client::new(ClientConfig {
        protocol: ClientProtocol::HttpsExcept(vec![format!("localhost:{}", port)]),
        ..Default::default()
    });
    let auth = RegistryAuth::Basic(HTPASSWD_USERNAME.to_string(), HTPASSWD_PASSWORD.to_string());

    // Fetch the source image (manifest first, then the full image data).
    let source: Reference = HELLO_IMAGE_TAG_AND_DIGEST.parse().unwrap();
    client
        .auth(&source, &RegistryAuth::Anonymous, RegistryOperation::Pull)
        .await
        .expect("cannot authenticate against registry for pull operation");
    let (manifest, _digest) = client
        ._pull_image_manifest(&source)
        .await
        .expect("failed to pull manifest");
    let image_data = client
        .pull(&source, &auth, vec![manifest::WASM_LAYER_MEDIA_TYPE])
        .await
        .expect("failed to pull image");

    // Publish the same content under four consecutive patch versions.
    for patch in 0..4 {
        let target: Reference = format!("localhost:{}/hello-wasm:1.0.{}", port, patch)
            .parse()
            .unwrap();
        client
            .auth(&target, &auth, RegistryOperation::Push)
            .await
            .expect("authenticated");
        client
            .push(
                &target,
                &image_data.layers,
                image_data.config.clone(),
                &auth,
                Some(manifest.clone()),
            )
            .await
            .expect("Failed to push Image");
    }

    let listed_from: Reference = format!("localhost:{}/hello-wasm:1.0.1", port)
        .parse()
        .unwrap();
    let response = client
        .list_tags(&listed_from, &RegistryAuth::Anonymous, Some(2), Some("1.0.1"))
        .await
        .expect("Cannot list Tags");
    assert_eq!(response.tags, vec!["1.0.2", "1.0.3"]);
}
2633
2634 #[tokio::test]
2635 async fn test_pull_manifest_private() {
2636 for &image in TEST_IMAGES {
2637 let reference = Reference::try_from(image).expect("failed to parse reference");
2638 let c = Client::default();
2640 c._pull_image_manifest(&reference)
2641 .await
2642 .expect_err("pull manifest should fail");
2643
2644 let c = Client::default();
2646 c.auth(
2647 &reference,
2648 &RegistryAuth::Anonymous,
2649 RegistryOperation::Pull,
2650 )
2651 .await
2652 .expect("authenticated");
2653 let (manifest, _) = c
2654 ._pull_image_manifest(&reference)
2655 .await
2656 .expect("pull manifest should not fail");
2657
2658 assert_eq!(manifest.schema_version, 2);
2660 assert!(!manifest.layers.is_empty());
2661 }
2662 }
2663
2664 #[tokio::test]
2665 async fn test_pull_manifest_public() {
2666 for &image in TEST_IMAGES {
2667 let reference = Reference::try_from(image).expect("failed to parse reference");
2668 let c = Client::default();
2669 let (manifest, _) = c
2670 .pull_image_manifest(&reference, &RegistryAuth::Anonymous)
2671 .await
2672 .expect("pull manifest should not fail");
2673
2674 assert_eq!(manifest.schema_version, 2);
2676 assert!(!manifest.layers.is_empty());
2677 }
2678 }
2679
2680 #[tokio::test]
2681 async fn pull_manifest_and_config_public() {
2682 for &image in TEST_IMAGES {
2683 let reference = Reference::try_from(image).expect("failed to parse reference");
2684 let c = Client::default();
2685 let (manifest, _, config) = c
2686 .pull_manifest_and_config(&reference, &RegistryAuth::Anonymous)
2687 .await
2688 .expect("pull manifest and config should not fail");
2689
2690 assert_eq!(manifest.schema_version, 2);
2692 assert!(!manifest.layers.is_empty());
2693 assert!(!config.is_empty());
2694 }
2695 }
2696
2697 #[tokio::test]
2698 async fn test_fetch_digest() {
2699 let c = Client::default();
2700
2701 for &image in TEST_IMAGES {
2702 let reference = Reference::try_from(image).expect("failed to parse reference");
2703 c.fetch_manifest_digest(&reference, &RegistryAuth::Anonymous)
2704 .await
2705 .expect("pull manifest should not fail");
2706
2707 let reference = Reference::try_from(image).expect("failed to parse reference");
2709 let c = Client::default();
2710 c.auth(
2711 &reference,
2712 &RegistryAuth::Anonymous,
2713 RegistryOperation::Pull,
2714 )
2715 .await
2716 .expect("authenticated");
2717 let digest = c
2718 .fetch_manifest_digest(&reference, &RegistryAuth::Anonymous)
2719 .await
2720 .expect("pull manifest should not fail");
2721
2722 assert_eq!(
2723 digest,
2724 "sha256:51d9b231d5129e3ffc267c9d455c49d789bf3167b611a07ab6e4b3304c96b0e7"
2725 );
2726 }
2727 }
2728
2729 #[tokio::test]
2730 async fn test_pull_blob() {
2731 let c = Client::default();
2732
2733 for &image in TEST_IMAGES {
2734 let reference = Reference::try_from(image).expect("failed to parse reference");
2735 c.auth(
2736 &reference,
2737 &RegistryAuth::Anonymous,
2738 RegistryOperation::Pull,
2739 )
2740 .await
2741 .expect("authenticated");
2742 let (manifest, _) = c
2743 ._pull_image_manifest(&reference)
2744 .await
2745 .expect("failed to pull manifest");
2746
2747 let mut file: Vec<u8> = Vec::new();
2749 let layer0 = &manifest.layers[0];
2750
2751 let mut last_error = None;
2753 for i in 1..6 {
2754 if let Err(e) = c.pull_blob(&reference, layer0, &mut file).await {
2755 println!(
2756 "Got error on pull_blob call attempt {}. Will retry in 1s: {:?}",
2757 i, e
2758 );
2759 last_error.replace(e);
2760 tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
2761 } else {
2762 last_error = None;
2763 break;
2764 }
2765 }
2766
2767 if let Some(e) = last_error {
2768 panic!("Unable to pull layer: {:?}", e);
2769 }
2770
2771 assert_eq!(file.len(), layer0.size as usize);
2773 }
2774 }
2775
2776 #[tokio::test]
2777 async fn test_pull_blob_stream() {
2778 let c = Client::default();
2779
2780 for &image in TEST_IMAGES {
2781 let reference = Reference::try_from(image).expect("failed to parse reference");
2782 c.auth(
2783 &reference,
2784 &RegistryAuth::Anonymous,
2785 RegistryOperation::Pull,
2786 )
2787 .await
2788 .expect("authenticated");
2789 let (manifest, _) = c
2790 ._pull_image_manifest(&reference)
2791 .await
2792 .expect("failed to pull manifest");
2793
2794 let mut file: Vec<u8> = Vec::new();
2796 let layer0 = &manifest.layers[0];
2797
2798 let layer_stream = c
2799 .pull_blob_stream(&reference, layer0)
2800 .await
2801 .expect("failed to pull blob stream");
2802
2803 assert_eq!(layer_stream.content_length, Some(layer0.size as u64));
2804 AsyncReadExt::read_to_end(&mut StreamReader::new(layer_stream.stream), &mut file)
2805 .await
2806 .unwrap();
2807
2808 assert_eq!(file.len(), layer0.size as usize);
2810 }
2811 }
2812
2813 #[tokio::test]
2814 async fn test_pull_blob_stream_partial() {
2815 let c = Client::default();
2816
2817 for &image in TEST_IMAGES {
2818 let reference = Reference::try_from(image).expect("failed to parse reference");
2819 c.auth(
2820 &reference,
2821 &RegistryAuth::Anonymous,
2822 RegistryOperation::Pull,
2823 )
2824 .await
2825 .expect("authenticated");
2826 let (manifest, _) = c
2827 ._pull_image_manifest(&reference)
2828 .await
2829 .expect("failed to pull manifest");
2830
2831 let mut partial_file: Vec<u8> = Vec::new();
2833 let layer0 = &manifest.layers[0];
2834 let (offset, length) = (10, 6);
2835
2836 let partial_response = c
2837 .pull_blob_stream_partial(&reference, layer0, offset, Some(length))
2838 .await
2839 .expect("failed to pull blob stream");
2840 let full_response = c
2841 .pull_blob_stream_partial(&reference, layer0, 0, Some(layer0.size as u64))
2842 .await
2843 .expect("failed to pull blob stream");
2844
2845 let layer_stream_partial = match partial_response {
2846 BlobResponse::Full(_stream) => panic!("expected partial response"),
2847 BlobResponse::Partial(stream) => stream,
2848 };
2849 assert_eq!(layer_stream_partial.content_length, Some(length));
2850 AsyncReadExt::read_to_end(
2851 &mut StreamReader::new(layer_stream_partial.stream),
2852 &mut partial_file,
2853 )
2854 .await
2855 .unwrap();
2856
2857 let mut full_file: Vec<u8> = Vec::new();
2859 let layer_stream_full = match full_response {
2860 BlobResponse::Full(_stream) => panic!("expected partial response"),
2861 BlobResponse::Partial(stream) => stream,
2862 };
2863 assert_eq!(layer_stream_full.content_length, Some(layer0.size as u64));
2864 AsyncReadExt::read_to_end(
2865 &mut StreamReader::new(layer_stream_full.stream),
2866 &mut full_file,
2867 )
2868 .await
2869 .unwrap();
2870
2871 assert_eq!(partial_file.len(), length as usize);
2873 assert_eq!(full_file.len(), layer0.size as usize);
2875 let end: usize = (offset + length) as usize;
2877 assert_eq!(partial_file, full_file[offset as usize..end]);
2878 }
2879 }
2880
2881 #[tokio::test]
2882 async fn test_pull() {
2883 for &image in TEST_IMAGES {
2884 let reference = Reference::try_from(image).expect("failed to parse reference");
2885
2886 let mut last_error = None;
2888 let mut image_data = None;
2889 for i in 1..6 {
2890 match Client::default()
2891 .pull(
2892 &reference,
2893 &RegistryAuth::Anonymous,
2894 vec![manifest::WASM_LAYER_MEDIA_TYPE],
2895 )
2896 .await
2897 {
2898 Ok(data) => {
2899 image_data = Some(data);
2900 last_error = None;
2901 break;
2902 }
2903 Err(e) => {
2904 println!(
2905 "Got error on pull call attempt {}. Will retry in 1s: {:?}",
2906 i, e
2907 );
2908 last_error.replace(e);
2909 tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
2910 }
2911 }
2912 }
2913
2914 if let Some(e) = last_error {
2915 panic!("Unable to pull layer: {:?}", e);
2916 }
2917
2918 assert!(image_data.is_some());
2919 let image_data = image_data.unwrap();
2920 assert!(!image_data.layers.is_empty());
2921 assert!(image_data.digest.is_some());
2922 }
2923 }
2924
2925 #[tokio::test]
2927 async fn test_pull_without_layer_validation() {
2928 for &image in TEST_IMAGES {
2929 let reference = Reference::try_from(image).expect("failed to parse reference");
2930 assert!(Client::default()
2931 .pull(&reference, &RegistryAuth::Anonymous, vec![],)
2932 .await
2933 .is_err());
2934 }
2935 }
2936
2937 #[tokio::test]
2939 async fn test_pull_wrong_layer_validation() {
2940 for &image in TEST_IMAGES {
2941 let reference = Reference::try_from(image).expect("failed to parse reference");
2942 assert!(Client::default()
2943 .pull(&reference, &RegistryAuth::Anonymous, vec!["text/plain"],)
2944 .await
2945 .is_err());
2946 }
2947 }
2948
/// Describes the `distribution/distribution` image at tag `edge`, considered ready once
/// `listening on ` shows up on the container's stderr.
#[cfg(feature = "test-registry")]
fn registry_image_edge() -> GenericImage {
    let ready_signal = WaitFor::message_on_stderr("listening on ");
    let image = GenericImage::new("distribution/distribution", "edge");
    image.with_wait_for(ready_signal)
}
2959
/// Describes the `docker.io/library/registry` image at tag `2`, considered ready once
/// `listening on ` shows up on the container's stderr.
#[cfg(feature = "test-registry")]
fn registry_image() -> GenericImage {
    let ready_signal = WaitFor::message_on_stderr("listening on ");
    let image = GenericImage::new("docker.io/library/registry", "2");
    image.with_wait_for(ready_signal)
}
2965
/// Describes a `docker.io/library/registry:2` container configured for htpasswd
/// authentication.
///
/// `auth_path` is a host directory that is bind-mounted at `/auth`; the registry is told,
/// via `REGISTRY_AUTH_HTPASSWD_PATH`, to read `/auth/htpasswd`.
#[cfg(feature = "test-registry")]
fn registry_image_basic_auth(auth_path: &str) -> ContainerRequest<GenericImage> {
    let ready_signal = WaitFor::message_on_stderr("listening on ");
    let base_image =
        GenericImage::new("docker.io/library/registry", "2").with_wait_for(ready_signal);

    // Host directory holding the htpasswd file, exposed inside the container as `/auth`.
    let auth_mount = Mount::bind_mount(auth_path, "/auth");

    base_image
        .with_env_var("REGISTRY_AUTH", "htpasswd")
        .with_env_var("REGISTRY_AUTH_HTPASSWD_REALM", "Registry Realm")
        .with_env_var("REGISTRY_AUTH_HTPASSWD_PATH", "/auth/htpasswd")
        .with_mount(auth_mount)
}
2975
/// Walks through a chunked upload session by hand against a throwaway registry: begin the
/// session, push a single chunk, end the session, and check the locations and byte offset
/// reported along the way.
#[tokio::test]
#[cfg(feature = "test-registry")]
async fn can_push_chunk() {
    // The container handle is kept in a binding for the whole test.
    let registry = registry_image()
        .start()
        .await
        .expect("Failed to start registry container");
    let port = registry
        .get_host_port_ipv4(5000)
        .await
        .expect("Failed to get port");

    let client = Client::new(ClientConfig {
        protocol: ClientProtocol::Http,
        ..Default::default()
    });
    let url = format!("localhost:{}/hello-wasm:v1", port);
    let image: Reference = url.parse().unwrap();

    client
        .auth(&image, &RegistryAuth::Anonymous, RegistryOperation::Push)
        .await
        .expect("result from auth request");

    let session_location = client
        .begin_push_chunked_session(&image)
        .await
        .expect("failed to begin push session");

    let payload: Vec<u8> = b"iamawebassemblymodule".to_vec();

    let (next_location, next_byte) = client
        .push_chunk(&session_location, &image, &payload, 0)
        .await
        .expect("failed to push layer");

    // The returned location must be at least as long as the image URL plus a UUID-sized
    // suffix (a sample UUID is used only for its length).
    let minimum_location_len = url.len() + "6987887f-0196-45ee-91a1-2dfad901bea0".len();
    assert!(next_location.len() >= minimum_location_len);
    // After one chunk the reported next byte must equal the payload length.
    assert_eq!(next_byte, payload.len());

    let layer_location = client
        .end_push_chunked_session(&next_location, &image, &sha256_digest(&payload))
        .await
        .expect("failed to end push session");

    let expected_location = format!("http://localhost:{}/v2/hello-wasm/blobs/sha256:6165c4ad43c0803798b6f2e49d6348c915d52c999a5f890846cee77ea65d230b", port);
    assert_eq!(layer_location, expected_location);
}
3025
/// Uploads a blob through `push_blob_chunked` with the client's chunk size lowered to 3
/// and checks the blob location returned by the registry.
#[tokio::test]
#[cfg(feature = "test-registry")]
async fn can_push_multiple_chunks() {
    // The container handle is kept in a binding for the whole test.
    let registry = registry_image()
        .start()
        .await
        .expect("Failed to start registry container");
    let port = registry
        .get_host_port_ipv4(5000)
        .await
        .expect("Failed to get port");

    let mut client = Client::new(ClientConfig {
        protocol: ClientProtocol::Http,
        ..Default::default()
    });
    // Chunk size far below the payload length used below.
    client.push_chunk_size = 3;

    let url = format!("localhost:{}/hello-wasm:v1", port);
    let image: Reference = url.parse().unwrap();

    client
        .auth(&image, &RegistryAuth::Anonymous, RegistryOperation::Push)
        .await
        .expect("result from auth request");

    let payload: Vec<u8> = b"i am a big webassembly mode that needs chunked uploads".to_vec();
    let payload_digest = sha256_digest(&payload);

    let location = client
        .push_blob_chunked(&image, &payload, &payload_digest)
        .await
        .expect("failed to begin push session");

    let expected_location = format!(
        "http://localhost:{}/v2/hello-wasm/blobs/{}",
        port, payload_digest
    );
    assert_eq!(location, expected_location);
}
3068
/// Runs the shared pull/push/pull roundtrip against a throwaway registry using anonymous
/// credentials.
#[tokio::test]
#[cfg(feature = "test-registry")]
async fn test_image_roundtrip_anon_auth() {
    let started = registry_image().start().await;
    // The container handle is kept in a binding until the roundtrip has finished.
    let registry = started.expect("Failed to start registry container");

    let credentials = RegistryAuth::Anonymous;
    test_image_roundtrip(&credentials, &registry).await;
}
3079
/// Runs the shared pull/push/pull roundtrip against a throwaway registry protected by
/// htpasswd basic authentication.
///
/// The htpasswd file is written into a temporary directory that is handed to
/// `registry_image_basic_auth`.
#[tokio::test]
#[cfg(feature = "test-registry")]
async fn test_image_roundtrip_basic_auth() {
    let auth_dir = TempDir::new().expect("cannot create tmp directory");
    let htpasswd_path = auth_dir.path().join("htpasswd");
    fs::write(htpasswd_path, HTPASSWD).expect("cannot write htpasswd file");

    // The whole directory (not just the file) is what gets handed to the image definition.
    let image = registry_image_basic_auth(
        auth_dir
            .path()
            .to_str()
            .expect("cannot convert auth directory path to string"),
    );
    let test_container = image
        .start()
        .await
        .expect("cannot start registry container");

    let auth =
        RegistryAuth::Basic(HTPASSWD_USERNAME.to_string(), HTPASSWD_PASSWORD.to_string());

    test_image_roundtrip(&auth, &test_container).await;
}
3100
/// Shared roundtrip scenario: pull the hello image, push it into the registry running in
/// `test_container`, pull it back and compare layers and manifest with the original.
///
/// * `registry_auth` - credentials used for pulling the image data and for everything
///   that touches the local registry.
/// * `test_container` - running registry container; its port 5000 is resolved to a host
///   port and addressed as `localhost:<port>` over plain HTTP.
///
/// # Panics
///
/// Panics (failing the calling test) when any registry operation fails or when the data
/// pulled back differs from what was pushed.
#[cfg(feature = "test-registry")]
async fn test_image_roundtrip(
    registry_auth: &RegistryAuth,
    test_container: &testcontainers::ContainerAsync<GenericImage>,
) {
    // Best effort: another test may already have installed the subscriber.
    let _ = tracing_subscriber::fmt::try_init();
    let port = test_container
        .get_host_port_ipv4(5000)
        .await
        .expect("Failed to get port");

    let c = Client::new(ClientConfig {
        protocol: ClientProtocol::HttpsExcept(vec![format!("localhost:{}", port)]),
        ..Default::default()
    });

    // Source image: authenticate anonymously, then fetch manifest and image data.
    let image: Reference = HELLO_IMAGE_TAG_AND_DIGEST.parse().unwrap();
    c.auth(&image, &RegistryAuth::Anonymous, RegistryOperation::Pull)
        .await
        .expect("cannot authenticate against registry for pull operation");

    let (manifest, _digest) = c
        ._pull_image_manifest(&image)
        .await
        .expect("failed to pull manifest");

    let image_data = c
        .pull(&image, registry_auth, vec![manifest::WASM_LAYER_MEDIA_TYPE])
        .await
        .expect("failed to pull image");

    // Push the image, unchanged, into the local registry.
    let push_image: Reference = format!("localhost:{}/hello-wasm:v1", port).parse().unwrap();
    c.auth(&push_image, registry_auth, RegistryOperation::Push)
        .await
        .expect("authenticated");

    c.push(
        &push_image,
        &image_data.layers,
        image_data.config.clone(),
        registry_auth,
        Some(manifest.clone()),
    )
    .await
    .expect("failed to push image");

    // Pull it back, both as image data and as a manifest.
    let pulled_image_data = c
        .pull(
            &push_image,
            registry_auth,
            vec![manifest::WASM_LAYER_MEDIA_TYPE],
        )
        .await
        .expect("failed to pull pushed image");

    let (pulled_manifest, _digest) = c
        ._pull_image_manifest(&push_image)
        .await
        .expect("failed to pull pushed image manifest");

    // `assert_eq!` rather than `assert!(a == b)` so a failure reports the actual counts.
    assert_eq!(image_data.layers.len(), 1);
    assert_eq!(pulled_image_data.layers.len(), 1);
    assert_eq!(
        image_data.layers[0].data.len(),
        pulled_image_data.layers[0].data.len()
    );
    assert_eq!(image_data.layers[0].data, pulled_image_data.layers[0].data);

    assert_eq!(manifest.media_type, pulled_manifest.media_type);
    assert_eq!(manifest.schema_version, pulled_manifest.schema_version);
    assert_eq!(manifest.config.digest, pulled_manifest.config.digest);
}
3174
3175 #[tokio::test]
3176 async fn test_raw_manifest_digest() {
3177 let _ = tracing_subscriber::fmt::try_init();
3178
3179 let c = Client::default();
3180
3181 let image: Reference = HELLO_IMAGE_TAG_AND_DIGEST.parse().unwrap();
3183 c.auth(&image, &RegistryAuth::Anonymous, RegistryOperation::Pull)
3184 .await
3185 .expect("cannot authenticate against registry for pull operation");
3186
3187 let (manifest, _) = c
3188 .pull_manifest_raw(
3189 &image,
3190 &RegistryAuth::Anonymous,
3191 MIME_TYPES_DISTRIBUTION_MANIFEST,
3192 )
3193 .await
3194 .expect("failed to pull manifest");
3195
3196 let digest = sha2::Sha256::digest(manifest);
3198 let hex = format!("sha256:{:x}", digest);
3199
3200 assert_eq!(image.digest().unwrap(), hex);
3202 }
3203
/// Pushes a blob into one repository of a throwaway registry, mounts it into a second
/// repository, and checks that pulling it from the second repository returns the
/// original bytes.
#[tokio::test]
#[cfg(feature = "test-registry")]
async fn test_mount() {
    // The container handle is kept in a binding for the whole test.
    let registry = registry_image()
        .start()
        .await
        .expect("Failed to start registry");
    let port = registry
        .get_host_port_ipv4(5000)
        .await
        .expect("Failed to get port");

    let client = Client::new(ClientConfig {
        protocol: ClientProtocol::HttpsExcept(vec![format!("localhost:{}", port)]),
        ..Default::default()
    });

    // Repository that receives the blob first.
    let source_repo: Reference = format!("localhost:{}/layer-repository", port)
        .parse()
        .unwrap();
    let layer_data = vec![1u8, 2, 3, 4];
    let layer = OciDescriptor {
        digest: sha256_digest(&layer_data),
        ..Default::default()
    };
    client
        .push_blob(&source_repo, &layer_data, &layer.digest)
        .await
        .expect("Failed to push");

    // Repository the blob is mounted into.
    let target_repo: Reference = format!("localhost:{}/image-repository", port)
        .parse()
        .unwrap();
    client
        .mount_blob(&target_repo, &source_repo, &layer.digest)
        .await
        .expect("Failed to mount");

    let mut pulled = Vec::new();
    client
        .pull_blob(&target_repo, &layer, &mut pulled)
        .await
        .expect("Failed to pull");

    assert_eq!(layer_data, pulled);
}
3251
3252 #[tokio::test]
3253 async fn test_platform_resolution() {
3254 let reference = Reference::try_from(DOCKER_IO_IMAGE).expect("failed to parse reference");
3256 let mut c = Client::new(ClientConfig {
3257 platform_resolver: None,
3258 ..Default::default()
3259 });
3260 let err = c
3261 .pull_image_manifest(&reference, &RegistryAuth::Anonymous)
3262 .await
3263 .unwrap_err();
3264 assert_eq!(
3265 format!("{}", err),
3266 "Received Image Index/Manifest List, but platform_resolver was not defined on the client config. Consider setting platform_resolver"
3267 );
3268
3269 c = Client::new(ClientConfig {
3270 platform_resolver: Some(Box::new(linux_amd64_resolver)),
3271 ..Default::default()
3272 });
3273 let (_manifest, digest) = c
3274 .pull_image_manifest(&reference, &RegistryAuth::Anonymous)
3275 .await
3276 .expect("Couldn't pull manifest");
3277 assert_eq!(
3278 digest,
3279 "sha256:f54a58bc1aac5ea1a25d796ae155dc228b3f0e11d046ae276b39c4bf2f13d8c4"
3280 );
3281 }
3282
3283 #[tokio::test]
3284 async fn test_pull_ghcr_io() {
3285 let reference = Reference::try_from(GHCR_IO_IMAGE).expect("failed to parse reference");
3286 let c = Client::default();
3287 let (manifest, _manifest_str) = c
3288 .pull_image_manifest(&reference, &RegistryAuth::Anonymous)
3289 .await
3290 .unwrap();
3291 assert_eq!(manifest.config.media_type, manifest::WASM_CONFIG_MEDIA_TYPE);
3292 }
3293
3294 #[tokio::test]
3295 #[ignore]
3296 async fn test_roundtrip_multiple_layers() {
3297 let _ = tracing_subscriber::fmt::try_init();
3298 let c = Client::new(ClientConfig {
3299 protocol: ClientProtocol::HttpsExcept(vec!["oci.registry.local".to_string()]),
3300 ..Default::default()
3301 });
3302 let src_image = Reference::try_from("registry:2.7.1").expect("failed to parse reference");
3303 let dest_image = Reference::try_from("oci.registry.local/registry:roundtrip-test")
3304 .expect("failed to parse reference");
3305
3306 let image = c
3307 .pull(
3308 &src_image,
3309 &RegistryAuth::Anonymous,
3310 vec![IMAGE_DOCKER_LAYER_GZIP_MEDIA_TYPE],
3311 )
3312 .await
3313 .expect("Failed to pull manifest");
3314 assert!(image.layers.len() > 1);
3315
3316 let ImageData {
3317 layers,
3318 config,
3319 manifest,
3320 ..
3321 } = image;
3322 c.push(
3323 &dest_image,
3324 &layers,
3325 config,
3326 &RegistryAuth::Anonymous,
3327 manifest,
3328 )
3329 .await
3330 .expect("Failed to pull manifest");
3331
3332 c.pull_image_manifest(&dest_image, &RegistryAuth::Anonymous)
3333 .await
3334 .expect("Failed to pull manifest");
3335 }
3336
3337 #[tokio::test]
3338 async fn test_hashable_image_layer() {
3339 use itertools::Itertools;
3340
3341 let image_layers = Vec::from([
3343 ImageLayer {
3344 data: Vec::from([0, 1, 2, 3]),
3345 media_type: "media_type".to_owned(),
3346 annotations: Some(BTreeMap::from([
3347 ("0".to_owned(), "1".to_owned()),
3348 ("2".to_owned(), "3".to_owned()),
3349 ])),
3350 },
3351 ImageLayer {
3352 data: Vec::from([0, 1, 2, 3]),
3353 media_type: "media_type".to_owned(),
3354 annotations: Some(BTreeMap::from([
3355 ("2".to_owned(), "3".to_owned()),
3356 ("0".to_owned(), "1".to_owned()),
3357 ])),
3358 },
3359 ImageLayer {
3360 data: Vec::from([0, 1, 2, 3]),
3361 media_type: "different_media_type".to_owned(),
3362 annotations: Some(BTreeMap::from([
3363 ("0".to_owned(), "1".to_owned()),
3364 ("2".to_owned(), "3".to_owned()),
3365 ])),
3366 },
3367 ImageLayer {
3368 data: Vec::from([0, 1, 2]),
3369 media_type: "media_type".to_owned(),
3370 annotations: Some(BTreeMap::from([
3371 ("0".to_owned(), "1".to_owned()),
3372 ("2".to_owned(), "3".to_owned()),
3373 ])),
3374 },
3375 ImageLayer {
3376 data: Vec::from([0, 1, 2, 3]),
3377 media_type: "media_type".to_owned(),
3378 annotations: Some(BTreeMap::from([
3379 ("1".to_owned(), "0".to_owned()),
3380 ("2".to_owned(), "3".to_owned()),
3381 ])),
3382 },
3383 ]);
3384
3385 assert_eq!(
3386 &image_layers[0], &image_layers[1],
3387 "image_layers[0] should equal image_layers[1]"
3388 );
3389 assert_ne!(
3390 &image_layers[0], &image_layers[2],
3391 "image_layers[0] should not equal image_layers[2]"
3392 );
3393 assert_ne!(
3394 &image_layers[0], &image_layers[3],
3395 "image_layers[0] should not equal image_layers[3]"
3396 );
3397 assert_ne!(
3398 &image_layers[0], &image_layers[4],
3399 "image_layers[0] should not equal image_layers[4]"
3400 );
3401 assert_ne!(
3402 &image_layers[2], &image_layers[3],
3403 "image_layers[2] should not equal image_layers[3]"
3404 );
3405 assert_ne!(
3406 &image_layers[2], &image_layers[4],
3407 "image_layers[2] should not equal image_layers[4]"
3408 );
3409 assert_ne!(
3410 &image_layers[3], &image_layers[4],
3411 "image_layers[3] should not equal image_layers[4]"
3412 );
3413
3414 let deduped: Vec<ImageLayer> = image_layers.clone().into_iter().unique().collect();
3415 assert_eq!(
3416 image_layers.len() - 1,
3417 deduped.len(),
3418 "after deduplication, there should be one less image layer"
3419 );
3420 }
3421}