use std::collections::{BTreeMap, HashMap};
use std::convert::TryFrom;
use std::hash::Hash;
use std::sync::Arc;
use std::time::Duration;
use futures_util::future;
use futures_util::stream::{self, BoxStream, StreamExt, TryStreamExt};
use http::header::RANGE;
use http::{HeaderValue, StatusCode};
use http_auth::{parser::ChallengeParser, ChallengeRef};
use olpc_cjson::CanonicalFormatter;
use reqwest::header::HeaderMap;
use reqwest::{NoProxy, Proxy, RequestBuilder, Response, Url};
use serde::Deserialize;
use serde::Serialize;
use tokio::io::{AsyncWrite, AsyncWriteExt};
use tokio::sync::RwLock;
use tracing::{debug, trace, warn};
pub use crate::blob::*;
use crate::config::ConfigFile;
use crate::digest::{digest_header_value, validate_digest, Digest, Digester};
use crate::errors::*;
use crate::manifest::{
ImageIndexEntry, OciDescriptor, OciImageIndex, OciImageManifest, OciManifest, Versioned,
IMAGE_CONFIG_MEDIA_TYPE, IMAGE_LAYER_GZIP_MEDIA_TYPE, IMAGE_LAYER_MEDIA_TYPE,
IMAGE_MANIFEST_LIST_MEDIA_TYPE, IMAGE_MANIFEST_MEDIA_TYPE, OCI_IMAGE_INDEX_MEDIA_TYPE,
OCI_IMAGE_MEDIA_TYPE,
};
use crate::secrets::RegistryAuth;
use crate::secrets::*;
use crate::sha256_digest;
use crate::token_cache::{RegistryOperation, RegistryToken, RegistryTokenType, TokenCache};
use crate::Reference;
/// Manifest and manifest-list/index media types handed to `apply_accept` when this client
/// requests manifests (and, in this file, blobs and referrers) from a registry.
const MIME_TYPES_DISTRIBUTION_MANIFEST: &[&str] = &[
IMAGE_MANIFEST_MEDIA_TYPE,
IMAGE_MANIFEST_LIST_MEDIA_TYPE,
OCI_IMAGE_MEDIA_TYPE,
OCI_IMAGE_INDEX_MEDIA_TYPE,
];
/// Chunk size in bytes (4 MiB) used to initialise `Client::push_chunk_size`.
const PUSH_CHUNK_MAX_SIZE: usize = 4096 * 1024;
/// Default upper bound on concurrent uploads.
// NOTE(review): presumably the default of `ClientConfig::max_concurrent_upload` — confirm.
pub const DEFAULT_MAX_CONCURRENT_UPLOAD: usize = 16;
/// Default upper bound on concurrent downloads.
// NOTE(review): presumably the default of `ClientConfig::max_concurrent_download` — confirm.
pub const DEFAULT_MAX_CONCURRENT_DOWNLOAD: usize = 16;
/// Value passed to `TokenCache::new` by `Client::default()`.
pub const DEFAULT_TOKEN_EXPIRATION_SECS: usize = 60;
/// `<crate name>/<crate version>`, assembled at compile time from Cargo's environment variables.
static DEFAULT_USER_AGENT: &str = concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION"));
/// Everything `Client::pull` returns for one image.
#[derive(Clone)]
pub struct ImageData {
/// The pulled layers (raw bytes plus media type and annotations).
pub layers: Vec<ImageLayer>,
/// Digest of the image manifest; `pull` always sets it to `Some`.
pub digest: Option<String>,
/// The image configuration blob.
pub config: Config,
/// The image manifest; `pull` always sets it to `Some`.
pub manifest: Option<OciImageManifest>,
}
/// URLs reported back by `Client::push`.
pub struct PushResponse {
/// URL returned when the config blob was pushed.
pub config_url: String,
/// URL returned when the manifest was pushed.
pub manifest_url: String,
}
/// JSON body deserialized by `Client::list_tags`.
#[derive(Deserialize, Debug)]
pub struct TagResponse {
/// Value of the `name` field in the response.
pub name: String,
/// Value of the `tags` field in the response.
pub tags: Vec<String>,
}
/// Borrowed view of what is needed to fetch a blob: its digest and optional alternative URLs.
pub struct LayerDescriptor<'a> {
/// Digest used to build the blob URL and to verify the downloaded content.
pub digest: &'a str,
/// Optional extra locations; `pull_blob_response` tries them when the registry request fails.
pub urls: &'a Option<Vec<String>>,
}
/// Conversion into a borrowed [`LayerDescriptor`], accepted by the blob-pulling methods.
pub trait AsLayerDescriptor {
/// Returns a descriptor borrowing from `self`.
fn as_layer_descriptor(&self) -> LayerDescriptor<'_>;
}
/// Forwards through one level of reference so `&T` is accepted wherever `T` is.
impl<T: AsLayerDescriptor> AsLayerDescriptor for &T {
    fn as_layer_descriptor(&self) -> LayerDescriptor<'_> {
        // `self` is `&&T`: peel one reference and dispatch to `T`'s own implementation.
        T::as_layer_descriptor(*self)
    }
}
/// A bare digest string is a descriptor without alternative URLs.
impl AsLayerDescriptor for &str {
    fn as_layer_descriptor(&self) -> LayerDescriptor<'_> {
        let digest: &str = self;
        LayerDescriptor { digest, urls: &None }
    }
}
/// Borrows the digest and URL list straight out of an OCI descriptor.
impl AsLayerDescriptor for &OciDescriptor {
    fn as_layer_descriptor(&self) -> LayerDescriptor<'_> {
        let descriptor: &OciDescriptor = self;
        let digest = &descriptor.digest;
        let urls = &descriptor.urls;
        LayerDescriptor { digest, urls }
    }
}
/// Re-borrows an existing descriptor; both fields are references and are simply copied.
impl AsLayerDescriptor for &LayerDescriptor<'_> {
    fn as_layer_descriptor(&self) -> LayerDescriptor<'_> {
        let LayerDescriptor { digest, urls } = **self;
        LayerDescriptor { digest, urls }
    }
}
/// A single image layer held fully in memory.
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub struct ImageLayer {
/// Raw layer bytes.
pub data: Vec<u8>,
/// Media type of the layer.
pub media_type: String,
/// Optional annotations attached to the layer.
pub annotations: Option<BTreeMap<String, String>>,
}
impl ImageLayer {
    /// Builds a layer from raw bytes, an explicit media type and optional annotations.
    pub fn new(
        data: Vec<u8>,
        media_type: String,
        annotations: Option<BTreeMap<String, String>>,
    ) -> Self {
        Self {
            data,
            media_type,
            annotations,
        }
    }

    /// Builds a layer tagged with `IMAGE_LAYER_MEDIA_TYPE`.
    pub fn oci_v1(data: Vec<u8>, annotations: Option<BTreeMap<String, String>>) -> Self {
        let media_type = IMAGE_LAYER_MEDIA_TYPE.to_owned();
        Self::new(data, media_type, annotations)
    }

    /// Builds a layer tagged with `IMAGE_LAYER_GZIP_MEDIA_TYPE`.
    pub fn oci_v1_gzip(data: Vec<u8>, annotations: Option<BTreeMap<String, String>>) -> Self {
        let media_type = IMAGE_LAYER_GZIP_MEDIA_TYPE.to_owned();
        Self::new(data, media_type, annotations)
    }

    /// Returns the crate-level `sha256_digest` of the layer bytes.
    pub fn sha256_digest(&self) -> String {
        let bytes = &self.data;
        sha256_digest(bytes)
    }
}
/// An image configuration blob held fully in memory.
#[derive(Clone)]
pub struct Config {
/// Raw configuration bytes.
pub data: Vec<u8>,
/// Media type of the configuration.
pub media_type: String,
/// Optional annotations associated with the configuration.
pub annotations: Option<BTreeMap<String, String>>,
}
impl Config {
    /// Builds a config from raw bytes, an explicit media type and optional annotations.
    pub fn new(
        data: Vec<u8>,
        media_type: String,
        annotations: Option<BTreeMap<String, String>>,
    ) -> Self {
        Self {
            data,
            media_type,
            annotations,
        }
    }

    /// Builds a config tagged with `IMAGE_CONFIG_MEDIA_TYPE`.
    pub fn oci_v1(data: Vec<u8>, annotations: Option<BTreeMap<String, String>>) -> Self {
        let media_type = IMAGE_CONFIG_MEDIA_TYPE.to_owned();
        Self::new(data, media_type, annotations)
    }

    /// Serialises `config_file` to JSON and wraps it as a config tagged with
    /// `IMAGE_CONFIG_MEDIA_TYPE`.
    ///
    /// # Errors
    /// Returns the converted `serde_json` error when serialisation fails.
    pub fn oci_v1_from_config_file(
        config_file: ConfigFile,
        annotations: Option<BTreeMap<String, String>>,
    ) -> Result<Self> {
        let json_bytes = serde_json::to_vec(&config_file)?;
        let media_type = IMAGE_CONFIG_MEDIA_TYPE.to_owned();
        Ok(Self::new(json_bytes, media_type, annotations))
    }

    /// Returns the crate-level `sha256_digest` of the config bytes.
    pub fn sha256_digest(&self) -> String {
        let bytes = &self.data;
        sha256_digest(bytes)
    }
}
/// Parses the raw bytes of a [`Config`] as a JSON `ConfigFile`.
///
/// Both failure modes (invalid UTF-8, invalid JSON) are reported as
/// `OciDistributionError::ConfigConversionError` carrying the underlying message.
impl TryFrom<Config> for ConfigFile {
    type Error = crate::errors::OciDistributionError;

    fn try_from(config: Config) -> Result<Self> {
        let text = match String::from_utf8(config.data) {
            Ok(text) => text,
            Err(e) => return Err(OciDistributionError::ConfigConversionError(e.to_string())),
        };
        match serde_json::from_str(&text) {
            Ok(parsed) => Ok(parsed),
            Err(e) => Err(OciDistributionError::ConfigConversionError(e.to_string())),
        }
    }
}
/// Registry client. Cloning is derived; the configuration and the credential store sit
/// behind `Arc`, so clones share them.
#[derive(Clone)]
pub struct Client {
// Settings this client was built from.
config: Arc<ClientConfig>,
// Credentials remembered per registry host, keyed by the resolved registry name.
auth_store: Arc<RwLock<HashMap<String, RegistryAuth>>>,
// Cache of tokens obtained per reference and operation.
tokens: TokenCache,
// Underlying HTTP client.
client: reqwest::Client,
// Number of bytes sent per request during a chunked blob push.
push_chunk_size: usize,
}
/// A client with default configuration, an empty credential store, a token cache using
/// `DEFAULT_TOKEN_EXPIRATION_SECS`, a default `reqwest::Client` and the maximum chunk size.
impl Default for Client {
    fn default() -> Self {
        let config = Arc::new(Default::default());
        let auth_store = Arc::new(Default::default());
        let tokens = TokenCache::new(DEFAULT_TOKEN_EXPIRATION_SECS);
        let client = reqwest::Client::default();
        Self {
            config,
            auth_store,
            tokens,
            client,
            push_chunk_size: PUSH_CHUNK_MAX_SIZE,
        }
    }
}
/// Something that can produce a [`ClientConfig`]; consumed by `Client::from_source`.
pub trait ClientConfigSource {
/// Returns the configuration to build a client from.
fn client_config(&self) -> ClientConfig;
}
/// Builds a [`Client`] whose HTTP client honours the TLS, timeout, user-agent and proxy
/// settings found in a [`ClientConfig`].
///
/// Fails when a root certificate cannot be parsed, the proxy address is invalid, or the
/// `reqwest` client cannot be built.
impl TryFrom<ClientConfig> for Client {
type Error = OciDistributionError;
fn try_from(config: ClientConfig) -> std::result::Result<Self, Self::Error> {
// The builder is shadowed below only on non-wasm targets, hence the allowed `mut`.
#[allow(unused_mut)]
let mut client_builder = reqwest::Client::builder();
#[cfg(not(target_arch = "wasm32"))]
let mut client_builder =
client_builder.danger_accept_invalid_certs(config.accept_invalid_certificates);
// Exactly one of the two cfg-gated arms exists in any build: hostname verification is
// only relaxed with the `native-tls` feature on non-wasm targets.
client_builder = match () {
#[cfg(all(feature = "native-tls", not(target_arch = "wasm32")))]
() => client_builder.danger_accept_invalid_hostnames(config.accept_invalid_hostnames),
#[cfg(any(not(feature = "native-tls"), target_arch = "wasm32"))]
() => client_builder,
};
// Extra trusted roots, decoded according to their declared encoding.
#[cfg(not(target_arch = "wasm32"))]
for c in &config.extra_root_certificates {
let cert = match c.encoding {
CertificateEncoding::Der => reqwest::Certificate::from_der(c.data.as_slice())?,
CertificateEncoding::Pem => reqwest::Certificate::from_pem(c.data.as_slice())?,
};
client_builder = client_builder.add_root_certificate(cert);
}
if let Some(timeout) = config.read_timeout {
client_builder = client_builder.read_timeout(timeout);
}
if let Some(timeout) = config.connect_timeout {
client_builder = client_builder.connect_timeout(timeout);
}
client_builder = client_builder.user_agent(config.user_agent);
// An HTTPS proxy is installed only when configured; `no_proxy` narrows where it applies.
if let Some(proxy_addr) = &config.https_proxy {
let no_proxy = config
.no_proxy
.as_ref()
.and_then(|no_proxy| NoProxy::from_string(no_proxy));
let proxy = Proxy::https(proxy_addr)?.no_proxy(no_proxy);
client_builder = client_builder.proxy(proxy);
}
// Read before `config` is moved into the `Arc` below.
let default_token_expiration_secs = config.default_token_expiration_secs;
Ok(Self {
config: Arc::new(config),
tokens: TokenCache::new(default_token_expiration_secs),
client: client_builder.build()?,
push_chunk_size: PUSH_CHUNK_MAX_SIZE,
..Default::default()
})
}
}
impl Client {
/// Creates a client from `config`.
///
/// This never fails: when the configuration cannot be turned into a client, the problem
/// is logged as a warning and a default client is returned, keeping only the configured
/// token expiration.
pub fn new(config: ClientConfig) -> Self {
    // Read before `config` is consumed by the conversion.
    let token_ttl_secs = config.default_token_expiration_secs;
    match Client::try_from(config) {
        Ok(client) => client,
        Err(err) => {
            warn!("Cannot create OCI client from config: {:?}", err);
            warn!("Creating client with default configuration");
            Self {
                tokens: TokenCache::new(token_ttl_secs),
                push_chunk_size: PUSH_CHUNK_MAX_SIZE,
                ..Default::default()
            }
        }
    }
}
/// Creates a client from whatever configuration `config_source` provides.
pub fn from_source(config_source: &impl ClientConfigSource) -> Self {
    let config = config_source.client_config();
    Self::new(config)
}
/// Records `auth` for `registry`, replacing any credentials stored before.
async fn store_auth(&self, registry: &str, auth: RegistryAuth) {
    let mut store = self.auth_store.write().await;
    store.insert(registry.to_string(), auth);
}
/// Reports whether credentials are already stored for `registry`.
async fn is_stored_auth(&self, registry: &str) -> bool {
    let store = self.auth_store.read().await;
    store.contains_key(registry)
}
/// Stores a clone of `auth` for `registry` unless credentials are already stored for it.
pub async fn store_auth_if_needed(&self, registry: &str, auth: &RegistryAuth) {
    let already_stored = self.is_stored_auth(registry).await;
    if already_stored {
        return;
    }
    self.store_auth(registry, auth.clone()).await;
}
/// Returns a token usable for `op` on `reference`.
///
/// Yields `None` when no credentials are stored for the registry, when authentication
/// fails, or when the registry issues no challenge. A cached token is preferred; a freshly
/// obtained one is added to the cache before being returned.
async fn get_auth_token(
    &self,
    reference: &Reference,
    op: RegistryOperation,
) -> Option<RegistryTokenType> {
    let registry = reference.resolve_registry();
    let auth = self.auth_store.read().await.get(registry)?.clone();
    if let Some(cached) = self.tokens.get(reference, op).await {
        return Some(cached);
    }
    // Authentication errors are intentionally folded into `None` here.
    let fresh = self._auth(reference, &auth, op).await.ok().flatten()?;
    self.tokens.insert(reference, op, fresh.clone()).await;
    Some(fresh)
}
/// Lists the tags of the repository `image` points at.
///
/// * `auth` - credentials remembered for the registry if none are stored yet.
/// * `n` - when set, sent as the `n` query parameter.
/// * `last` - when set, sent as the `last` query parameter.
///
/// # Errors
/// Fails on transport errors, on a response rejected by `validate_registry_response`,
/// or when the body is not valid UTF-8 JSON matching [`TagResponse`].
pub async fn list_tags(
    &self,
    image: &Reference,
    auth: &RegistryAuth,
    n: Option<usize>,
    last: Option<&str>,
) -> Result<TagResponse> {
    let op = RegistryOperation::Pull;
    let url = self.to_list_tags_url(image);
    self.store_auth_if_needed(image.resolve_registry(), auth)
        .await;

    let mut builder = self.client.get(&url);
    if let Some(num) = n {
        builder = builder.query(&[("n", num)]);
    }
    if let Some(l) = last {
        builder = builder.query(&[("last", l)]);
    }

    let wrapped = RequestBuilderWrapper {
        client: self,
        request_builder: builder,
    };
    let authorized = wrapped.apply_auth(image, op).await?;
    let res = authorized.into_request_builder().send().await?;

    let status = res.status();
    let body = res.bytes().await?;
    validate_registry_response(status, &body, &url)?;

    let text = std::str::from_utf8(&body)?;
    let tags = serde_json::from_str(text)?;
    Ok(tags)
}
/// Pulls an image: manifest, configuration and every layer.
///
/// * `auth` - credentials remembered for the registry if none are stored yet.
/// * `accepted_media_types` - every layer's media type must be one of these.
///
/// Layers are downloaded concurrently (at most `config.max_concurrent_download` at a
/// time) and are returned in the same order as `manifest.layers`.
///
/// # Errors
/// Fails when the manifest or config cannot be pulled, when the manifest has no layers
/// or a layer with a media type that is not accepted, or when any layer download fails.
pub async fn pull(
    &self,
    image: &Reference,
    auth: &RegistryAuth,
    accepted_media_types: Vec<&str>,
) -> Result<ImageData> {
    debug!("Pulling image: {:?}", image);
    self.store_auth_if_needed(image.resolve_registry(), auth)
        .await;
    let (manifest, digest, config) = self._pull_manifest_and_config(image).await?;
    self.validate_layers(&manifest, accepted_media_types)
        .await?;
    let layers = stream::iter(&manifest.layers)
        .map(|layer| {
            let this = &self;
            async move {
                let mut out: Vec<u8> = Vec::new();
                debug!("Pulling image layer");
                this.pull_blob(image, layer, &mut out).await?;
                Ok::<_, OciDistributionError>(ImageLayer::new(
                    out,
                    layer.media_type.clone(),
                    layer.annotations.clone(),
                ))
            }
        })
        .boxed()
        // `buffered` (unlike `buffer_unordered`) yields results in input order, so the
        // returned layers line up index-by-index with `manifest.layers`, while still
        // running up to `max_concurrent_download` downloads at once.
        .buffered(self.config.max_concurrent_download)
        .try_collect()
        .await?;
    Ok(ImageData {
        layers,
        manifest: Some(manifest),
        config,
        digest: Some(digest),
    })
}
/// Pushes an image: all layers, then the config blob, then the manifest.
///
/// * `auth` - credentials remembered for the registry if none are stored yet.
/// * `manifest` - manifest to push; when `None` one is built from `layers` and `config`.
///
/// Returns the URLs reported for the config blob and for the manifest.
pub async fn push(
&self,
image_ref: &Reference,
layers: &[ImageLayer],
config: Config,
auth: &RegistryAuth,
manifest: Option<OciImageManifest>,
) -> Result<PushResponse> {
debug!("Pushing image: {:?}", image_ref);
self.store_auth_if_needed(image_ref.resolve_registry(), auth)
.await;
let manifest: OciImageManifest = match manifest {
Some(m) => m,
None => OciImageManifest::build(layers, &config, None),
};
// Upload layers concurrently, bounded by `max_concurrent_upload`; the first failure
// aborts the push.
stream::iter(layers)
.map(|layer| {
let this = &self;
async move {
let digest = layer.sha256_digest();
this.push_blob(image_ref, &layer.data, &digest).await?;
Result::Ok(())
}
})
// NOTE(review): `.boxed()` looks like a workaround for a compiler inference issue — confirm.
.boxed() .buffer_unordered(self.config.max_concurrent_upload)
.try_for_each(future::ok)
.await?;
// The config blob is pushed under the digest recorded in the manifest.
let config_url = self
.push_blob(image_ref, &config.data, &manifest.config.digest)
.await?;
let manifest_url = self.push_manifest(image_ref, &manifest.into()).await?;
Ok(PushResponse {
config_url,
manifest_url,
})
}
/// Pushes a blob and returns the URL reported by the registry.
///
/// With `config.use_monolithic_push` set, the blob goes up in a single request.
/// Otherwise a chunked push is attempted first; if that fails with a
/// `SpecViolationError`, the blob is pushed again monolithically. Any other error is
/// returned as-is.
pub async fn push_blob(
    &self,
    image_ref: &Reference,
    data: &[u8],
    digest: &str,
) -> Result<String> {
    if self.config.use_monolithic_push {
        return self.push_blob_monolithically(image_ref, data, digest).await;
    }
    let violation = match self.push_blob_chunked(image_ref, data, digest).await {
        Err(OciDistributionError::SpecViolationError(violation)) => violation,
        outcome => return outcome,
    };
    warn!(?violation, "Registry is not respecting the OCI Distribution Specification when doing chunked push operations");
    warn!("Attempting monolithic push");
    self.push_blob_monolithically(image_ref, data, digest).await
}
/// Opens an upload session and sends the whole blob in one request.
async fn push_blob_monolithically(
    &self,
    image: &Reference,
    blob_data: &[u8],
    blob_digest: &str,
) -> Result<String> {
    let session_url = self.begin_push_monolithical_session(image).await?;
    let pushed = self
        .push_monolithically(&session_url, image, blob_data, blob_digest)
        .await;
    pushed
}
/// Opens an upload session, sends the blob chunk by chunk and closes the session.
///
/// At least one chunk request is always made; each one yields the location to use for
/// the next request and the offset of the next byte to send.
async fn push_blob_chunked(
    &self,
    image: &Reference,
    blob_data: &[u8],
    blob_digest: &str,
) -> Result<String> {
    let mut session_url = self.begin_push_chunked_session(image).await?;
    let mut offset: usize = 0;
    loop {
        let (next_url, next_offset) = self
            .push_chunk(&session_url, image, blob_data, offset)
            .await?;
        session_url = next_url;
        offset = next_offset;
        if offset >= blob_data.len() {
            break;
        }
    }
    self.end_push_chunked_session(&session_url, image, blob_digest)
        .await
}
/// Authenticates against the registry of `image` for `operation` and caches the result.
///
/// Returns `Ok(Some(token))` with the bearer token string when one was issued, and
/// `Ok(None)` when basic credentials are used or no authentication is required.
pub async fn auth(
    &self,
    image: &Reference,
    authentication: &RegistryAuth,
    operation: RegistryOperation,
) -> Result<Option<String>> {
    self.store_auth_if_needed(image.resolve_registry(), authentication)
        .await;
    let outcome = self._auth(image, authentication, operation).await?;
    match outcome {
        Some(RegistryTokenType::Bearer(token)) => {
            let cached = RegistryTokenType::Bearer(token.clone());
            self.tokens.insert(image, operation, cached).await;
            Ok(Some(token.token().to_string()))
        }
        Some(basic @ RegistryTokenType::Basic(..)) => {
            self.tokens.insert(image, operation, basic).await;
            Ok(None)
        }
        None => Ok(None),
    }
}
/// Performs the authentication handshake for `image` without touching the credential
/// store or the token cache.
///
/// Steps:
/// 1. `GET /v2/` on the registry; without a `WWW-Authenticate` header no authentication
///    is needed and `Ok(None)` is returned.
/// 2. If the header is not a bearer challenge, fall back to HTTP Basic: return the
///    supplied username/password when `authentication` is `Basic`, otherwise `Ok(None)`.
/// 3. Otherwise request a token from the challenge realm, with a scope derived from
///    `operation`, and return it as a bearer token.
///
/// # Errors
/// Transport errors, `RegistryTokenDecodeError` when the token response is not valid
/// JSON, and `AuthenticationFailure` (carrying the response body) on a non-200 reply.
async fn _auth(
    &self,
    image: &Reference,
    authentication: &RegistryAuth,
    operation: RegistryOperation,
) -> Result<Option<RegistryTokenType>> {
    debug!("Authorizing for image: {:?}", image);
    let url = format!(
        "{}://{}/v2/",
        self.config.protocol.scheme_for(image.resolve_registry()),
        image.resolve_registry()
    );
    debug!(?url);
    let res = self.client.get(&url).send().await?;
    let dist_hdr = match res.headers().get(reqwest::header::WWW_AUTHENTICATE) {
        Some(h) => h,
        None => return Ok(None),
    };
    let challenge = match BearerChallenge::try_from(dist_hdr) {
        Ok(c) => c,
        Err(e) => {
            debug!(error = ?e, "Falling back to HTTP Basic Auth");
            if let RegistryAuth::Basic(username, password) = authentication {
                return Ok(Some(RegistryTokenType::Basic(
                    username.to_string(),
                    password.to_string(),
                )));
            }
            return Ok(None);
        }
    };
    // A push token is requested with both pull and push rights.
    let scope = match operation {
        RegistryOperation::Pull => format!("repository:{}:pull", image.repository()),
        RegistryOperation::Push => format!("repository:{}:pull,push", image.repository()),
    };
    let realm = challenge.realm.as_ref();
    let service = challenge.service.as_ref();
    let mut query = vec![("scope", &scope)];
    if let Some(s) = service {
        query.push(("service", s))
    }
    debug!(?realm, ?service, ?scope, "Making authentication call");
    let auth_res = self
        .client
        .get(realm)
        .query(&query)
        .apply_authentication(authentication)
        .send()
        .await?;
    match auth_res.status() {
        reqwest::StatusCode::OK => {
            let text = auth_res.text().await?;
            // The body holds the issued token: never write it to the logs.
            debug!("Received response from auth request");
            let token: RegistryToken = serde_json::from_str(&text)
                .map_err(|e| OciDistributionError::RegistryTokenDecodeError(e.to_string()))?;
            debug!("Successfully authorized for image '{:?}'", image);
            Ok(Some(RegistryTokenType::Bearer(token)))
        }
        _ => {
            let reason = auth_res.text().await?;
            debug!("Failed to authenticate for image '{:?}': {}", image, reason);
            Err(OciDistributionError::AuthenticationFailure(reason))
        }
    }
}
/// Returns the digest of the manifest `image` points at.
///
/// A `HEAD` request is tried first. If its response carries a digest header, that value
/// is returned after the response has been validated and compared with the digest in
/// `image`, if any. Otherwise the manifest is fetched with `GET` and its digest is
/// obtained through `validate_digest`.
pub async fn fetch_manifest_digest(
&self,
image: &Reference,
auth: &RegistryAuth,
) -> Result<String> {
self.store_auth_if_needed(image.resolve_registry(), auth)
.await;
let url = self.to_v2_manifest_url(image);
debug!("HEAD image manifest from {}", url);
let res = RequestBuilderWrapper::from_client(self, |client| client.head(&url))
.apply_accept(MIME_TYPES_DISTRIBUTION_MANIFEST)?
.apply_auth(image, RegistryOperation::Pull)
.await?
.into_request_builder()
.send()
.await?;
// The digest header is inspected before the status: a HEAD response without the header
// (successful or not) falls through to the GET branch below.
if let Some(digest) = digest_header_value(res.headers().clone())? {
let status = res.status();
let body = res.bytes().await?;
validate_registry_response(status, &body, &url)?;
if let Some(img_digest) = image.digest() {
let header_digest = Digest::new(&digest)?;
let image_digest = Digest::new(img_digest)?;
// Digests are only compared when they use the same algorithm.
if header_digest.algorithm == image_digest.algorithm
&& header_digest != image_digest
{
return Err(DigestError::VerificationError {
expected: img_digest.to_string(),
actual: digest,
}
.into());
}
}
Ok(digest)
} else {
debug!("GET image manifest from {}", url);
let res = RequestBuilderWrapper::from_client(self, |client| client.get(&url))
.apply_accept(MIME_TYPES_DISTRIBUTION_MANIFEST)?
.apply_auth(image, RegistryOperation::Pull)
.await?
.into_request_builder()
.send()
.await?;
let status = res.status();
trace!(headers = ?res.headers(), "Got Headers");
// Headers are cloned because reading the body consumes the response.
let headers = res.headers().clone();
let body = res.bytes().await?;
validate_registry_response(status, &body, &url)?;
validate_digest(&body, digest_header_value(headers)?, image.digest())
.map_err(OciDistributionError::from)
}
}
/// Checks that `manifest` has at least one layer and that every layer's media type is
/// listed in `accepted_media_types`.
///
/// # Errors
/// `PullNoLayersError` for an empty layer list; `IncompatibleLayerMediaTypeError`
/// carrying the media type of the first layer that is not accepted.
async fn validate_layers(
    &self,
    manifest: &OciImageManifest,
    accepted_media_types: Vec<&str>,
) -> Result<()> {
    if manifest.layers.is_empty() {
        return Err(OciDistributionError::PullNoLayersError);
    }
    let first_rejected = manifest.layers.iter().find(|layer| {
        !accepted_media_types
            .iter()
            .any(|accepted| *accepted == layer.media_type)
    });
    match first_rejected {
        Some(layer) => Err(OciDistributionError::IncompatibleLayerMediaTypeError(
            layer.media_type.clone(),
        )),
        None => Ok(()),
    }
}
/// Pulls the image manifest for `image` together with its digest, resolving an image
/// index to a single image manifest when necessary.
///
/// `auth` is remembered for the registry if no credentials are stored yet.
pub async fn pull_image_manifest(
    &self,
    image: &Reference,
    auth: &RegistryAuth,
) -> Result<(OciImageManifest, String)> {
    let registry = image.resolve_registry();
    self.store_auth_if_needed(registry, auth).await;
    self._pull_image_manifest(image).await
}
/// Pulls the manifest for `image` as raw bytes, together with its digest.
///
/// `accepted_media_types` is forwarded to the request; `auth` is remembered for the
/// registry if no credentials are stored yet.
pub async fn pull_manifest_raw(
    &self,
    image: &Reference,
    auth: &RegistryAuth,
    accepted_media_types: &[&str],
) -> Result<(Vec<u8>, String)> {
    let registry = image.resolve_registry();
    self.store_auth_if_needed(registry, auth).await;
    self._pull_manifest_raw(image, accepted_media_types).await
}
/// Pulls the manifest for `image` (image manifest or image index) together with its
/// digest.
///
/// `auth` is remembered for the registry if no credentials are stored yet.
pub async fn pull_manifest(
    &self,
    image: &Reference,
    auth: &RegistryAuth,
) -> Result<(OciManifest, String)> {
    let registry = image.resolve_registry();
    self.store_auth_if_needed(registry, auth).await;
    self._pull_manifest(image).await
}
/// Pulls an image manifest, following an image index if that is what the registry
/// returns.
///
/// For an image index, `config.platform_resolver` picks the digest of the entry to use,
/// and the manifest stored under that digest is pulled. The digest returned in that case
/// is the one chosen by the resolver.
///
/// # Errors
/// * `ImageIndexParsingNoPlatformResolverError` - an index was returned but no resolver
///   is configured.
/// * `ImageManifestNotFoundError` - the resolver selected nothing, or the selected entry
///   is itself an image index.
async fn _pull_image_manifest(&self, image: &Reference) -> Result<(OciImageManifest, String)> {
    let (manifest, digest) = self._pull_manifest(image).await?;
    let index = match manifest {
        OciManifest::Image(image_manifest) => return Ok((image_manifest, digest)),
        OciManifest::ImageIndex(index) => index,
    };

    debug!("Inspecting Image Index Manifest");
    let resolver = match &self.config.platform_resolver {
        Some(resolver) => resolver,
        None => return Err(OciDistributionError::ImageIndexParsingNoPlatformResolverError),
    };
    let digest = match resolver(&index.manifests) {
        Some(selected) => selected,
        None => {
            return Err(OciDistributionError::ImageManifestNotFoundError(
                "no entry found in image index manifest matching client's default platform"
                    .to_string(),
            ))
        }
    };

    debug!("Selected manifest entry with digest: {}", digest);
    let entry_reference = image.clone_with_digest(digest.clone());
    let (entry_manifest, _entry_digest) = self._pull_manifest(&entry_reference).await?;
    match entry_manifest {
        OciManifest::Image(image_manifest) => Ok((image_manifest, digest)),
        OciManifest::ImageIndex(_) => Err(OciDistributionError::ImageManifestNotFoundError(
            "received Image Index manifest instead".to_string(),
        )),
    }
}
/// Fetches the manifest for `image` and returns its bytes plus the digest produced by
/// `validate_digest` (which is given the body, the digest header and the digest in
/// `image`, if any).
async fn _pull_manifest_raw(
    &self,
    image: &Reference,
    accepted_media_types: &[&str],
) -> Result<(Vec<u8>, String)> {
    let url = self.to_v2_manifest_url(image);
    debug!("Pulling image manifest from {}", url);

    let request = RequestBuilderWrapper::from_client(self, |client| client.get(&url))
        .apply_accept(accepted_media_types)?
        .apply_auth(image, RegistryOperation::Pull)
        .await?
        .into_request_builder();
    let response = request.send().await?;

    // Status and headers must be captured first: reading the body consumes the response.
    let status = response.status();
    let headers = response.headers().clone();
    let payload = response.bytes().await?;
    validate_registry_response(status, &payload, &url)?;

    let advertised_digest = digest_header_value(headers)?;
    let digest = validate_digest(&payload, advertised_digest, image.digest())?;
    Ok((payload.to_vec(), digest))
}
/// Fetches, validates and parses the manifest for `image`.
///
/// # Errors
/// Besides fetch and validation errors, `ManifestParsingError` when the body cannot be
/// deserialized as a manifest.
async fn _pull_manifest(&self, image: &Reference) -> Result<(OciManifest, String)> {
    let (raw, digest) = self
        ._pull_manifest_raw(image, MIME_TYPES_DISTRIBUTION_MANIFEST)
        .await?;
    self.validate_image_manifest(&raw).await?;
    debug!("Parsing response as Manifest");
    match serde_json::from_slice(&raw) {
        Ok(manifest) => Ok((manifest, digest)),
        Err(e) => Err(OciDistributionError::ManifestParsingError(e.to_string())),
    }
}
/// Checks the version envelope of a manifest body.
///
/// # Errors
/// * `VersionedParsingError` - the body does not deserialize as `Versioned`.
/// * `UnsupportedSchemaVersionError` - `schemaVersion` is not 2.
/// * `UnsupportedMediaTypeError` - a media type is present and is none of the four
///   manifest/index media types this client handles.
async fn validate_image_manifest(&self, body: &[u8]) -> Result<()> {
    let versioned: Versioned = serde_json::from_slice(body)
        .map_err(|e| OciDistributionError::VersionedParsingError(e.to_string()))?;
    debug!(?versioned, "validating manifest");

    if versioned.schema_version != 2 {
        return Err(OciDistributionError::UnsupportedSchemaVersionError(
            versioned.schema_version,
        ));
    }

    // A manifest without a media type is accepted as-is.
    let media_type = match versioned.media_type {
        Some(media_type) => media_type,
        None => return Ok(()),
    };
    let supported = media_type == IMAGE_MANIFEST_MEDIA_TYPE
        || media_type == OCI_IMAGE_MEDIA_TYPE
        || media_type == IMAGE_MANIFEST_LIST_MEDIA_TYPE
        || media_type == OCI_IMAGE_INDEX_MEDIA_TYPE;
    if supported {
        Ok(())
    } else {
        Err(OciDistributionError::UnsupportedMediaTypeError(media_type))
    }
}
/// Pulls the image manifest and its configuration.
///
/// Returns the manifest, the manifest digest and the configuration as a UTF-8 string.
/// `auth` is remembered for the registry if no credentials are stored yet.
///
/// # Errors
/// Any error from pulling the manifest or config, or `GenericError` when the
/// configuration bytes are not valid UTF-8.
pub async fn pull_manifest_and_config(
    &self,
    image: &Reference,
    auth: &RegistryAuth,
) -> Result<(OciImageManifest, String, String)> {
    self.store_auth_if_needed(image.resolve_registry(), auth)
        .await;
    let (manifest, digest, config) = self._pull_manifest_and_config(image).await?;
    let config = String::from_utf8(config.data).map_err(|e| {
        OciDistributionError::GenericError(Some(format!(
            "Config is not UTF8 compliant: {}",
            e
        )))
    })?;
    Ok((manifest, digest, config))
}
/// Pulls the image manifest, then downloads the config blob it references.
///
/// The returned [`Config`] carries the media type of the manifest's config descriptor
/// and a copy of the manifest's annotations.
async fn _pull_manifest_and_config(
    &self,
    image: &Reference,
) -> Result<(OciImageManifest, String, Config)> {
    let (manifest, digest) = self._pull_image_manifest(image).await?;

    let mut config_bytes: Vec<u8> = Vec::new();
    debug!("Pulling config layer");
    self.pull_blob(image, &manifest.config, &mut config_bytes)
        .await?;

    let config = Config::new(
        config_bytes,
        manifest.config.media_type.clone(),
        manifest.annotations.clone(),
    );
    Ok((manifest, digest, config))
}
/// Pushes an image index (manifest list) to `reference` and returns the manifest URL.
///
/// `auth` is remembered for the registry if no credentials are stored yet.
pub async fn push_manifest_list(
    &self,
    reference: &Reference,
    auth: &RegistryAuth,
    manifest: OciImageIndex,
) -> Result<String> {
    let registry = reference.resolve_registry();
    self.store_auth_if_needed(registry, auth).await;
    let index = OciManifest::ImageIndex(manifest);
    self.push_manifest(reference, &index).await
}
/// Downloads a blob into `out`, verifying its content while streaming.
///
/// The bytes are hashed as they arrive and checked against the digest in `layer` and,
/// when the response carries a digest header, against that digest as well.
///
/// # Errors
/// Transport or write errors, a non-success HTTP status, an unusable digest, or
/// `DigestError::VerificationError` on a mismatch. Data already written to `out` is not
/// rolled back on failure.
pub async fn pull_blob<T: AsyncWrite + Unpin>(
&self,
image: &Reference,
layer: impl AsLayerDescriptor,
mut out: T,
) -> Result<()> {
let response = self.pull_blob_response(image, &layer, None, None).await?;
// Optional second digester, keyed on the digest advertised in the response headers.
let mut maybe_header_digester = digest_header_value(response.headers().clone())?
.map(|digest| Digester::new(&digest).map(|d| (d, digest)))
.transpose()?;
let layer_digest = layer.as_layer_descriptor().digest.to_string();
let mut layer_digester = Digester::new(&layer_digest)?;
let mut stream = response.error_for_status()?.bytes_stream();
// Feed every chunk to the digester(s) and to the writer.
while let Some(bytes) = stream.next().await {
let bytes = bytes?;
if let Some((ref mut digester, _)) = maybe_header_digester.as_mut() {
digester.update(&bytes);
}
layer_digester.update(&bytes);
out.write_all(&bytes).await?;
}
// The header digest, when present, is checked first.
if let Some((mut digester, expected)) = maybe_header_digester.take() {
let digest = digester.finalize();
if digest != expected {
return Err(DigestError::VerificationError {
expected,
actual: digest,
}
.into());
}
}
let digest = layer_digester.finalize();
if digest != layer_digest {
return Err(DigestError::VerificationError {
expected: layer_digest,
actual: digest,
}
.into());
}
Ok(())
}
/// Requests a whole blob and returns it as a stream built by `stream_from_response`
/// (called with its third argument set to `true`).
pub async fn pull_blob_stream(
    &self,
    image: &Reference,
    layer: impl AsLayerDescriptor,
) -> Result<SizedStream> {
    let response = self.pull_blob_response(image, &layer, None, None).await?;
    stream_from_response(response, layer, true)
}
/// Requests part of a blob, starting at `offset` and spanning `length` bytes (to the end
/// when `length` is `None`).
///
/// Returns `BlobResponse::Full` when the server answers `200 OK` and
/// `BlobResponse::Partial` on `206 Partial Content`. Any other status becomes a
/// `ServerError` carrying the status code, URL and response body.
pub async fn pull_blob_stream_partial(
    &self,
    image: &Reference,
    layer: impl AsLayerDescriptor,
    offset: u64,
    length: Option<u64>,
) -> Result<BlobResponse> {
    let response = self
        .pull_blob_response(image, &layer, Some(offset), length)
        .await?;
    let status = response.status();

    if status == StatusCode::OK {
        let full = stream_from_response(response, &layer, true)?;
        return Ok(BlobResponse::Full(full));
    }
    if status == StatusCode::PARTIAL_CONTENT {
        let partial = stream_from_response(response, &layer, false)?;
        return Ok(BlobResponse::Partial(partial));
    }

    let code = status.as_u16();
    let url = response.url().to_string();
    let message = response.text().await?;
    Err(OciDistributionError::ServerError { code, url, message })
}
/// Sends the GET request for a blob and returns the raw response.
///
/// * `offset` / `length` - when `offset` is set, a `Range` header is sent:
///   `bytes=<offset>-<offset+length-1>` if `length` is also set, `bytes=<offset>-`
///   otherwise. `length` without `offset` is ignored.
///
/// The registry is asked first. If that response is not a success and the descriptor
/// lists alternative `urls`, each `http`/`https` URL is tried in turn (without registry
/// authentication, with the same `Range` header) until one succeeds. The last response
/// is returned even if it is an error; callers check the status.
///
/// # Errors
/// Transport errors, or `UrlParseError` when an alternative URL cannot be parsed.
async fn pull_blob_response(
    &self,
    image: &Reference,
    layer: impl AsLayerDescriptor,
    offset: Option<u64>,
    length: Option<u64>,
) -> Result<Response> {
    let layer = layer.as_layer_descriptor();
    let url = self.to_v2_blob_url(image, layer.digest);

    // Built once so the registry request and every fallback request ask for the same
    // byte range. Saturating arithmetic keeps huge offset/length pairs from overflowing.
    let range_header = offset.map(|start| {
        let spec = match length {
            Some(len) => {
                let end = start.saturating_add(len).saturating_sub(1);
                format!("bytes={start}-{end}")
            }
            None => format!("bytes={start}-"),
        };
        HeaderValue::from_str(&spec)
            .expect("a range made of ASCII digits is always a valid header value")
    });

    let mut request = RequestBuilderWrapper::from_client(self, |client| client.get(&url))
        .apply_accept(MIME_TYPES_DISTRIBUTION_MANIFEST)?
        .apply_auth(image, RegistryOperation::Pull)
        .await?
        .into_request_builder();
    if let Some(range) = &range_header {
        request = request.header(RANGE, range.clone());
    }
    let mut response = request.send().await?;

    if let Some(urls) = &layer.urls {
        for url in urls {
            if response.error_for_status_ref().is_ok() {
                break;
            }
            let url = Url::parse(url)
                .map_err(|e| OciDistributionError::UrlParseError(e.to_string()))?;
            // Only web URLs can be fetched; anything else is skipped.
            if url.scheme() == "http" || url.scheme() == "https" {
                let mut fallback =
                    RequestBuilderWrapper::from_client(self, |client| client.get(url.clone()))
                        .apply_accept(MIME_TYPES_DISTRIBUTION_MANIFEST)?
                        .into_request_builder();
                if let Some(range) = &range_header {
                    fallback = fallback.header(RANGE, range.clone());
                }
                response = fallback.send().await?
            }
        }
    }
    Ok(response)
}
/// Opens a blob upload session for a monolithic push.
///
/// Sends an empty `POST` to the upload endpoint and returns the session location taken
/// from the response, which is expected to be `202 Accepted`.
async fn begin_push_monolithical_session(&self, image: &Reference) -> Result<String> {
    let url = self.to_v2_blob_upload_url(image);
    debug!(?url, "begin_push_monolithical_session");
    let request = RequestBuilderWrapper::from_client(self, |client| client.post(&url))
        .apply_auth(image, RegistryOperation::Push)
        .await?
        .into_request_builder()
        .header("Content-Length", 0);
    let response = request.send().await?;
    let expected = reqwest::StatusCode::ACCEPTED;
    self.extract_location_header(image, response, &expected)
        .await
}
/// Opens a blob upload session for a chunked push.
///
/// Sends an empty `POST` to the upload endpoint and returns the session location taken
/// from the response, which is expected to be `202 Accepted`.
async fn begin_push_chunked_session(&self, image: &Reference) -> Result<String> {
    let url = self.to_v2_blob_upload_url(image);
    debug!(?url, "begin_push_session");
    let request = RequestBuilderWrapper::from_client(self, |client| client.post(&url))
        .apply_auth(image, RegistryOperation::Push)
        .await?
        .into_request_builder()
        .header("Content-Length", 0);
    let response = request.send().await?;
    let expected = reqwest::StatusCode::ACCEPTED;
    self.extract_location_header(image, response, &expected)
        .await
}
/// Closes a chunked upload session.
///
/// Sends an empty `PUT` to `location` with the blob `digest` added as a query parameter
/// and returns the location from the response, which is expected to be `201 Created`.
///
/// # Errors
/// `GenericError` when `location` is not a valid URL, plus request and response errors.
async fn end_push_chunked_session(
    &self,
    location: &str,
    image: &Reference,
    digest: &str,
) -> Result<String> {
    let params = [("digest", digest)];
    let url = match Url::parse_with_params(location, &params) {
        Ok(url) => url,
        Err(e) => return Err(OciDistributionError::GenericError(Some(e.to_string()))),
    };
    let request = RequestBuilderWrapper::from_client(self, |client| client.put(url.clone()))
        .apply_auth(image, RegistryOperation::Push)
        .await?
        .into_request_builder()
        .header("Content-Length", 0);
    let response = request.send().await?;
    let expected = reqwest::StatusCode::CREATED;
    self.extract_location_header(image, response, &expected)
        .await
}
/// Uploads a whole blob with a single `PUT` to an open upload session.
///
/// * `location` - session URL returned by the registry when the session was opened.
/// * `layer` - blob bytes; must not be empty.
/// * `blob_digest` - digest appended to the URL as the `digest` query parameter.
///
/// Returns the location from the response, which is expected to be `201 Created`.
///
/// # Errors
/// `UrlParseError` when `location` is not a valid URL, `PushNoDataError` for an empty
/// blob, plus request and response errors.
async fn push_monolithically(
    &self,
    location: &str,
    image: &Reference,
    layer: &[u8],
    blob_digest: &str,
) -> Result<String> {
    // `location` comes from the registry: a malformed value is an error, not a panic.
    let mut url = Url::parse(location)
        .map_err(|e| OciDistributionError::UrlParseError(e.to_string()))?;
    url.query_pairs_mut().append_pair("digest", blob_digest);
    let url = url.to_string();
    debug!(size = layer.len(), location = ?url, "Pushing monolithically");
    if layer.is_empty() {
        return Err(OciDistributionError::PushNoDataError);
    };
    let mut headers = HeaderMap::new();
    headers.insert(
        "Content-Length",
        format!("{}", layer.len())
            .parse()
            .expect("a decimal length is always a valid header value"),
    );
    headers.insert(
        "Content-Type",
        "application/octet-stream"
            .parse()
            .expect("a static ASCII media type is always a valid header value"),
    );
    let res = RequestBuilderWrapper::from_client(self, |client| client.put(&url))
        .apply_auth(image, RegistryOperation::Push)
        .await?
        .into_request_builder()
        .headers(headers)
        .body(layer.to_vec())
        .send()
        .await?;
    self.extract_location_header(image, res, &reqwest::StatusCode::CREATED)
        .await
}
/// Uploads one chunk of `blob_data`, starting at `start_byte`, with a `PATCH` to
/// `location`.
///
/// The chunk is at most `push_chunk_size` bytes long and is clamped to the end of the
/// blob. Returns the location from the response (expected to be `202 Accepted`) together
/// with the offset of the first byte that has not been sent yet.
///
/// # Errors
/// `PushNoDataError` for an empty blob, plus request and response errors.
// NOTE(review): `start_byte` is assumed to be within `blob_data`; a larger value would
// make the slice below panic — confirm all callers uphold this.
async fn push_chunk(
&self,
location: &str,
image: &Reference,
blob_data: &[u8],
start_byte: usize,
) -> Result<(String, usize)> {
if blob_data.is_empty() {
return Err(OciDistributionError::PushNoDataError);
};
// `end_byte` is inclusive: the last byte of this chunk.
let end_byte = if (start_byte + self.push_chunk_size) < blob_data.len() {
start_byte + self.push_chunk_size - 1
} else {
blob_data.len() - 1
};
let body = blob_data[start_byte..end_byte + 1].to_vec();
let mut headers = HeaderMap::new();
// The range is sent as `<start>-<end>`, both bounds inclusive.
headers.insert(
"Content-Range",
format!("{}-{}", start_byte, end_byte).parse().unwrap(),
);
headers.insert("Content-Length", format!("{}", body.len()).parse().unwrap());
headers.insert("Content-Type", "application/octet-stream".parse().unwrap());
debug!(
?start_byte,
?end_byte,
blob_data_len = blob_data.len(),
body_len = body.len(),
?location,
?headers,
"Pushing chunk"
);
let res = RequestBuilderWrapper::from_client(self, |client| client.patch(location))
.apply_auth(image, RegistryOperation::Push)
.await?
.into_request_builder()
.headers(headers)
.body(body)
.send()
.await?;
Ok((
self.extract_location_header(image, res, &reqwest::StatusCode::ACCEPTED)
.await?,
end_byte + 1,
))
}
/// Asks the registry to mount the blob `digest` from the repository of `source` into the
/// repository of `image`.
///
/// Sends a `POST` to the upload endpoint with `mount` and `from` query parameters and
/// expects `201 Created` with a `Location` header.
///
/// # Errors
/// `UrlParseError` when the upload URL cannot be built, plus request and response errors.
pub async fn mount_blob(
    &self,
    image: &Reference,
    source: &Reference,
    digest: &str,
) -> Result<()> {
    let upload_url = self.to_v2_blob_upload_url(image);
    let params = [("mount", digest), ("from", source.repository())];
    let url = Url::parse_with_params(&upload_url, &params)
        .map_err(|e| OciDistributionError::UrlParseError(e.to_string()))?;

    let request = RequestBuilderWrapper::from_client(self, |client| client.post(url.clone()))
        .apply_auth(image, RegistryOperation::Push)
        .await?
        .into_request_builder();
    let response = request.send().await?;

    // Only the status/header check matters here; the location itself is discarded.
    let expected = reqwest::StatusCode::CREATED;
    self.extract_location_header(image, response, &expected)
        .await
        .map(|_location| ())
}
pub async fn push_manifest(&self, image: &Reference, manifest: &OciManifest) -> Result<String> {
let mut headers = HeaderMap::new();
let content_type = manifest.content_type();
headers.insert("Content-Type", content_type.parse().unwrap());
let mut body = Vec::new();
let mut ser = serde_json::Serializer::with_formatter(&mut body, CanonicalFormatter::new());
manifest.serialize(&mut ser).unwrap();
self.push_manifest_raw(image, body, manifest.content_type().parse().unwrap())
.await
}
/// Pushes an already-serialised manifest to `image` with the given content type.
///
/// Returns the manifest URL from the `Location` header of the `201 Created` response.
/// Some registries omit that header; in that case the URL is rebuilt from the manifest
/// URL of `image`, addressed by the SHA-256 digest of `body`.
///
/// # Errors
/// Request errors and any response error other than a missing `Location` header.
pub async fn push_manifest_raw(
    &self,
    image: &Reference,
    body: Vec<u8>,
    content_type: HeaderValue,
) -> Result<String> {
    let url = self.to_v2_manifest_url(image);
    debug!(?url, ?content_type, "push manifest");
    let mut headers = HeaderMap::new();
    headers.insert("Content-Type", content_type);
    // Computed up front because `body` is moved into the request.
    let manifest_hash = sha256_digest(&body);
    let res = RequestBuilderWrapper::from_client(self, |client| client.put(url.clone()))
        .apply_auth(image, RegistryOperation::Push)
        .await?
        .into_request_builder()
        .headers(headers)
        .body(body)
        .send()
        .await?;
    let ret = self
        .extract_location_header(image, res, &reqwest::StatusCode::CREATED)
        .await;
    if matches!(ret, Err(OciDistributionError::RegistryNoLocationError)) {
        warn!("Registry is not respecting the OCI Distribution Specification: it didn't return the Location of the uploaded Manifest inside of the response headers. Working around this issue...");
        // Build the by-digest URL from a digest reference rather than by trimming the
        // tag off `url`: the manifest URL ends with a digest or a `?ns=` query for some
        // references, in which case trimming would fail.
        let by_digest = image.clone_with_digest(manifest_hash);
        return Ok(self.to_v2_manifest_url(&by_digest));
    }
    ret
}
/// Fetches the referrers of `image` as an image index, optionally restricted to
/// `artifact_type`.
///
/// # Errors
/// Errors building the referrers URL, request and response-validation errors, and
/// `ManifestParsingError` when the body is not a valid image index.
pub async fn pull_referrers(
    &self,
    image: &Reference,
    artifact_type: Option<&str>,
) -> Result<OciImageIndex> {
    let url = self.to_v2_referrers_url(image, artifact_type)?;
    debug!("Pulling referrers from {}", url);

    let request = RequestBuilderWrapper::from_client(self, |client| client.get(&url))
        .apply_accept(MIME_TYPES_DISTRIBUTION_MANIFEST)?
        .apply_auth(image, RegistryOperation::Pull)
        .await?
        .into_request_builder();
    let response = request.send().await?;

    let status = response.status();
    let payload = response.bytes().await?;
    validate_registry_response(status, &payload, &url)?;

    match serde_json::from_slice(&payload) {
        Ok(index) => Ok(index),
        Err(e) => Err(OciDistributionError::ManifestParsingError(e.to_string())),
    }
}
/// Turns a registry response into the URL carried by its `Location` header.
///
/// # Errors
/// * `RegistryNoLocationError` - the status is the expected one but the header is absent.
/// * `SpecViolationError` - both the actual and the expected status are successes, yet
///   they differ.
/// * `ServerError` - any other status; carries the URL, status code and response body.
async fn extract_location_header(
    &self,
    image: &Reference,
    res: reqwest::Response,
    expected_status: &reqwest::StatusCode,
) -> Result<String> {
    let status = res.status();
    debug!(expected_status_code=?expected_status.as_u16(),
        status_code=?status.as_u16(),
        "extract location header");

    if status == *expected_status {
        let location_header = res.headers().get("Location");
        debug!(location=?location_header, "Location header");
        return match location_header {
            Some(lh) => self.location_header_to_url(image, lh),
            None => Err(OciDistributionError::RegistryNoLocationError),
        };
    }

    if status.is_success() && expected_status.is_success() {
        return Err(OciDistributionError::SpecViolationError(format!(
            "Expected HTTP Status {}, got {} instead",
            expected_status, status,
        )));
    }

    let url = res.url().to_string();
    let code = status.as_u16();
    let message = res.text().await?;
    Err(OciDistributionError::ServerError { url, code, message })
}
/// Converts a `Location` header value into a URL string.
///
/// A value starting with `/` is treated as a path on the registry resolved
/// from `image` and is prefixed with the configured scheme and that registry.
/// Any other value is returned unchanged.
///
/// # Errors
///
/// Fails when the header value cannot be converted with `to_str`.
fn location_header_to_url(
    &self,
    image: &Reference,
    location_header: &reqwest::header::HeaderValue,
) -> Result<String> {
    let lh = location_header.to_str()?;
    if !lh.starts_with('/') {
        return Ok(lh.to_string());
    }

    let registry = image.resolve_registry();
    let scheme = self.config.protocol.scheme_for(registry);
    Ok(format!("{scheme}://{registry}{lh}"))
}
/// Builds the `/v2/<repository>/manifests/<reference>` URL for `reference`.
///
/// The last path segment is the digest when the reference has one, otherwise
/// the tag, falling back to `latest`. When the reference reports a namespace,
/// it is appended as a `?ns=` query parameter.
fn to_v2_manifest_url(&self, reference: &Reference) -> String {
    let registry = reference.resolve_registry();
    let scheme = self.config.protocol.scheme_for(registry);
    let repository = reference.repository();
    let target = match reference.digest() {
        Some(digest) => digest,
        None => reference.tag().unwrap_or("latest"),
    };
    let ns = match reference.namespace() {
        Some(ns) => format!("?ns={ns}"),
        None => String::new(),
    };
    // `reference` must be passed explicitly: an implicit capture would format
    // the `Reference` parameter itself instead of the digest/tag.
    format!(
        "{scheme}://{registry}/v2/{repository}/manifests/{reference}{ns}",
        reference = target,
    )
}
/// Builds the `/v2/<repository>/blobs/<digest>` URL for `reference`.
///
/// `digest` is inserted verbatim as the last path segment. When the reference
/// reports a namespace, it is appended as a `?ns=` query parameter.
fn to_v2_blob_url(&self, reference: &Reference, digest: &str) -> String {
    let registry = reference.resolve_registry();
    let scheme = self.config.protocol.scheme_for(registry);
    let repository = reference.repository();
    let ns = match reference.namespace() {
        Some(ns) => format!("?ns={ns}"),
        None => String::new(),
    };
    format!("{scheme}://{registry}/v2/{repository}/blobs/{digest}{ns}")
}
/// Builds the blob upload URL for `reference`: the blob URL with `uploads/`
/// in place of a digest.
fn to_v2_blob_upload_url(&self, reference: &Reference) -> String {
    let uploads_segment = "uploads/";
    self.to_v2_blob_url(reference, uploads_segment)
}
/// Builds the `/v2/<repository>/tags/list` URL for `reference`.
///
/// When the reference reports a namespace, it is appended as a `?ns=` query
/// parameter.
fn to_list_tags_url(&self, reference: &Reference) -> String {
    let registry = reference.resolve_registry();
    let scheme = self.config.protocol.scheme_for(registry);
    let repository = reference.repository();
    let ns = match reference.namespace() {
        Some(ns) => format!("?ns={ns}"),
        None => String::new(),
    };
    format!("{scheme}://{registry}/v2/{repository}/tags/list{ns}")
}
/// Builds the `/v2/<repository>/referrers/<digest>` URL for `reference`.
///
/// When `artifact_type` is `Some`, it is appended as the `artifactType` query
/// parameter. The value is percent-encoded so that characters with a special
/// meaning in query strings (notably `+`, which form-decoding turns into a
/// space, as well as `&`, `#` and `%`) reach the registry unchanged.
///
/// # Errors
///
/// Returns `GenericError` when `reference` has no digest: referrers can only
/// be requested by digest.
fn to_v2_referrers_url(
    &self,
    reference: &Reference,
    artifact_type: Option<&str>,
) -> Result<String> {
    // Percent-encodes every byte except RFC 3986 unreserved characters and
    // `/`. `/` is legal inside a query component and leaving it untouched
    // keeps ordinary media types such as `application/foo` readable.
    fn encode_query_value(value: &str) -> String {
        let mut encoded = String::with_capacity(value.len());
        for byte in value.bytes() {
            match byte {
                b'A'..=b'Z'
                | b'a'..=b'z'
                | b'0'..=b'9'
                | b'-'
                | b'.'
                | b'_'
                | b'~'
                | b'/' => encoded.push(char::from(byte)),
                _ => encoded.push_str(&format!("%{byte:02X}")),
            }
        }
        encoded
    }

    let digest = match reference.digest() {
        Some(digest) => digest,
        None => {
            return Err(OciDistributionError::GenericError(Some(
                "Getting referrers for a tag is not supported".into(),
            )))
        }
    };

    let registry = reference.resolve_registry();
    let scheme = self.config.protocol.scheme_for(registry);
    let repository = reference.repository();
    let at = artifact_type
        .map(|at| format!("?artifactType={}", encode_query_value(at)))
        .unwrap_or_default();

    Ok(format!(
        "{scheme}://{registry}/v2/{repository}/referrers/{digest}{at}"
    ))
}
}
/// Maps a registry response status (plus its body) onto `Ok(())` or an error.
///
/// * `200 OK`: success.
/// * `401 Unauthorized`: `UnauthorizedError` carrying `url`.
/// * any other 2xx: `SpecViolationError`.
/// * other 4xx: the body is parsed as an `OciEnvelope` and returned inside a
///   `RegistryError`.
/// * everything else: `ServerError` with the status code and body text.
///
/// # Errors
///
/// Besides the mapping above, fails when a non-2xx, non-401 body is not valid
/// UTF-8, or when a 4xx body is not a valid `OciEnvelope` JSON document.
fn validate_registry_response(status: reqwest::StatusCode, body: &[u8], url: &str) -> Result<()> {
    if status == reqwest::StatusCode::OK {
        return Ok(());
    }

    let url = url.to_string();
    if status == reqwest::StatusCode::UNAUTHORIZED {
        return Err(OciDistributionError::UnauthorizedError { url });
    }
    if status.is_success() {
        return Err(OciDistributionError::SpecViolationError(format!(
            "Expected HTTP Status {}, got {} instead",
            reqwest::StatusCode::OK,
            status,
        )));
    }

    // Both remaining cases need the body as text.
    let text = std::str::from_utf8(body)?;
    if status.is_client_error() {
        let envelope = serde_json::from_str::<OciEnvelope>(text)?;
        return Err(OciDistributionError::RegistryError { envelope, url });
    }

    Err(OciDistributionError::ServerError {
        code: status.as_u16(),
        url,
        message: text.to_string(),
    })
}
fn stream_from_response(
response: Response,
layer: impl AsLayerDescriptor,
verify: bool,
) -> Result<SizedStream> {
let content_length = response.content_length();
let headers = response.headers().clone();
let stream = response
.error_for_status()?
.bytes_stream()
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e));
let expected_layer_digest = layer.as_layer_descriptor().digest.to_string();
let layer_digester = Digester::new(&expected_layer_digest)?;
let header_digester_and_digest = match digest_header_value(headers)? {
Some(digest) if digest == expected_layer_digest => None,
Some(digest) => Some((Digester::new(&digest)?, digest)),
None => None,
};
let header_digest = header_digester_and_digest
.as_ref()
.map(|(_, digest)| digest.to_owned());
let stream: BoxStream<'static, std::result::Result<bytes::Bytes, std::io::Error>> = if verify {
Box::pin(VerifyingStream::new(
Box::pin(stream),
layer_digester,
expected_layer_digest,
header_digester_and_digest,
))
} else {
Box::pin(stream)
};
Ok(SizedStream {
content_length,
digest_header_value: header_digest,
stream,
})
}
/// Pairs a `reqwest` [`RequestBuilder`] with the [`Client`] it was created
/// from, so that helpers can decorate the request using the client's state.
struct RequestBuilderWrapper<'a> {
    /// The client the request was created from.
    client: &'a Client,
    /// The request being assembled.
    request_builder: RequestBuilder,
}
impl<'a> RequestBuilderWrapper<'a> {
    /// Creates a wrapper around the request produced by `f`.
    ///
    /// `f` receives the `reqwest::Client` held by `client` and is called
    /// exactly once, which is why `FnOnce` is a sufficient bound (every `Fn`
    /// closure still satisfies it).
    fn from_client(
        client: &'a Client,
        f: impl FnOnce(&reqwest::Client) -> RequestBuilder,
    ) -> RequestBuilderWrapper<'a> {
        RequestBuilderWrapper {
            request_builder: f(&client.client),
            client,
        }
    }

    /// Consumes the wrapper and returns the wrapped [`RequestBuilder`].
    fn into_request_builder(self) -> RequestBuilder {
        self.request_builder
    }
}
impl<'a> RequestBuilderWrapper<'a> {
fn apply_accept(&self, accept: &[&str]) -> Result<RequestBuilderWrapper> {
let request_builder = self
.request_builder
.try_clone()
.ok_or_else(|| {
OciDistributionError::GenericError(Some(
"could not clone request builder".to_string(),
))
})?
.header("Accept", Vec::from(accept).join(", "));
Ok(RequestBuilderWrapper {
client: self.client,
request_builder,
})
}
async fn apply_auth(
&self,
image: &Reference,
op: RegistryOperation,
) -> Result<RequestBuilderWrapper> {
let mut headers = HeaderMap::new();
if let Some(token) = self.client.get_auth_token(image, op).await {
match token {
RegistryTokenType::Bearer(token) => {
debug!("Using bearer token authentication.");
headers.insert("Authorization", token.bearer_token().parse().unwrap());
}
RegistryTokenType::Basic(username, password) => {
debug!("Using HTTP basic authentication.");
return Ok(RequestBuilderWrapper {
client: self.client,
request_builder: self
.request_builder
.try_clone()
.ok_or_else(|| {
OciDistributionError::GenericError(Some(
"could not clone request builder".to_string(),
))
})?
.headers(headers)
.basic_auth(username.to_string(), Some(password.to_string())),
});
}
}
}
Ok(RequestBuilderWrapper {
client: self.client,
request_builder: self
.request_builder
.try_clone()
.ok_or_else(|| {
OciDistributionError::GenericError(Some(
"could not clone request builder".to_string(),
))
})?
.headers(headers),
})
}
}
/// The encoding of the bytes stored in a [`Certificate`].
#[derive(Debug, Clone)]
pub enum CertificateEncoding {
    /// The certificate bytes are DER-encoded.
    #[allow(missing_docs)]
    Der,
    /// The certificate bytes are PEM-encoded.
    #[allow(missing_docs)]
    Pem,
}
/// A certificate: raw bytes plus the encoding they are stored in.
#[derive(Debug, Clone)]
pub struct Certificate {
    /// How `data` is encoded.
    pub encoding: CertificateEncoding,
    /// The raw certificate bytes.
    pub data: Vec<u8>,
}
/// Configuration of a registry client.
///
/// The `Default` implementation documents the default of every field; use
/// struct-update syntax (`..Default::default()`) to override only some.
pub struct ClientConfig {
    /// Protocol selection; its `scheme_for` decides between `http` and
    /// `https` when registry URLs are built. Defaults to
    /// `ClientProtocol::default()`.
    pub protocol: ClientProtocol,
    /// Only present with the `native-tls` feature. Defaults to `false`.
    // NOTE(review): presumably disables TLS hostname verification; confirm
    // where the HTTP client is built.
    #[cfg(feature = "native-tls")]
    pub accept_invalid_hostnames: bool,
    /// Defaults to `false`.
    // NOTE(review): presumably disables TLS certificate validation; confirm
    // where the HTTP client is built.
    pub accept_invalid_certificates: bool,
    /// Defaults to `false`.
    // NOTE(review): presumably selects a single-request blob upload instead
    // of a chunked one; confirm against the push implementation.
    pub use_monolithic_push: bool,
    /// Additional certificates. Defaults to an empty list.
    // NOTE(review): presumably added as trusted roots; confirm.
    pub extra_root_certificates: Vec<Certificate>,
    /// Optional function that picks one digest out of image index entries.
    /// Defaults to `current_platform_resolver`.
    pub platform_resolver: Option<Box<PlatformResolverFn>>,
    /// Defaults to `DEFAULT_MAX_CONCURRENT_UPLOAD`.
    pub max_concurrent_upload: usize,
    /// Defaults to `DEFAULT_MAX_CONCURRENT_DOWNLOAD`.
    pub max_concurrent_download: usize,
    /// Defaults to `DEFAULT_TOKEN_EXPIRATION_SECS`.
    pub default_token_expiration_secs: usize,
    /// Optional read timeout. Defaults to `None`.
    pub read_timeout: Option<Duration>,
    /// Optional connect timeout. Defaults to `None`.
    pub connect_timeout: Option<Duration>,
    /// Defaults to `DEFAULT_USER_AGENT`.
    pub user_agent: &'static str,
    /// Optional HTTPS proxy. Defaults to `None`.
    pub https_proxy: Option<String>,
    /// Optional proxy exclusion setting. Defaults to `None`.
    pub no_proxy: Option<String>,
}
impl Default for ClientConfig {
fn default() -> Self {
Self {
protocol: ClientProtocol::default(),
#[cfg(feature = "native-tls")]
accept_invalid_hostnames: false,
accept_invalid_certificates: false,
use_monolithic_push: false,
extra_root_certificates: Vec::new(),
platform_resolver: Some(Box::new(current_platform_resolver)),
max_concurrent_upload: DEFAULT_MAX_CONCURRENT_UPLOAD,
max_concurrent_download: DEFAULT_MAX_CONCURRENT_DOWNLOAD,
default_token_expiration_secs: DEFAULT_TOKEN_EXPIRATION_SECS,
read_timeout: None,
connect_timeout: None,
user_agent: DEFAULT_USER_AGENT,
https_proxy: None,
no_proxy: None,
}
}
}
/// Signature of a platform resolver: given the entries of an image index, it
/// returns the chosen digest, or `None` when no entry is suitable.
type PlatformResolverFn = dyn Fn(&[ImageIndexEntry]) -> Option<String> + Send + Sync;
/// Returns the digest of the first entry whose platform is `linux`/`amd64`.
///
/// Entries without platform information never match; `None` is returned when
/// no entry matches.
pub fn linux_amd64_resolver(manifests: &[ImageIndexEntry]) -> Option<String> {
    for entry in manifests {
        if let Some(platform) = entry.platform.as_ref() {
            if platform.os == "linux" && platform.architecture == "amd64" {
                return Some(entry.digest.clone());
            }
        }
    }
    None
}
/// Returns the digest of the first entry whose platform is `windows`/`amd64`.
///
/// Entries without platform information never match; `None` is returned when
/// no entry matches.
pub fn windows_amd64_resolver(manifests: &[ImageIndexEntry]) -> Option<String> {
    manifests.iter().find_map(|entry| {
        let platform = entry.platform.as_ref()?;
        let is_match = platform.os == "windows" && platform.architecture == "amd64";
        is_match.then(|| entry.digest.clone())
    })
}
const MACOS: &str = "macos";
const DARWIN: &str = "darwin";
/// Returns the operating system this code was compiled for, using `darwin`
/// in place of Rust's `macos`; every other name is passed through unchanged.
fn go_os() -> &'static str {
    let host_os = std::env::consts::OS;
    if host_os == MACOS {
        DARWIN
    } else {
        host_os
    }
}
const X86_64: &str = "x86_64";
const AMD64: &str = "amd64";
const X86: &str = "x86";
const GO_386: &str = "386";
const ARM64: &str = "arm64";
const AARCH64: &str = "aarch64";
const POWERPC64: &str = "powerpc64";
const PPC64: &str = "ppc64";
const PPC64LE: &str = "ppc64le";
/// Returns the architecture this code was compiled for, translated from
/// Rust's `std::env::consts::ARCH` names to the Go `GOARCH` names that image
/// indexes use.
///
/// * `x86_64` becomes `amd64`
/// * `x86` becomes `386` (Go has no `amd` architecture)
/// * `aarch64` becomes `arm64`
/// * `powerpc64` becomes `ppc64le` or `ppc64`, depending on target endianness
///   (Rust reports both as `powerpc64`)
///
/// Every other name is passed through unchanged.
fn go_arch() -> &'static str {
    match std::env::consts::ARCH {
        X86_64 => AMD64,
        X86 => GO_386,
        AARCH64 => ARM64,
        POWERPC64 if cfg!(target_endian = "little") => PPC64LE,
        POWERPC64 => PPC64,
        other => other,
    }
}
/// Returns the digest of the first entry whose platform matches the values
/// reported by `go_os` and `go_arch`.
///
/// Entries without platform information never match; `None` is returned when
/// no entry matches.
pub fn current_platform_resolver(manifests: &[ImageIndexEntry]) -> Option<String> {
    let (os, arch) = (go_os(), go_arch());
    manifests
        .iter()
        .filter(|entry| {
            matches!(
                &entry.platform,
                Some(platform) if platform.os == os && platform.architecture == arch
            )
        })
        .map(|entry| entry.digest.clone())
        .next()
}
/// The protocol used to talk to registries.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub enum ClientProtocol {
    /// Use `http` for every registry.
    Http,
    /// Use `https` for every registry. This is the default.
    #[default]
    Https,
    /// Use `https`, except for the listed registries, which use `http`.
    /// Entries are compared to the registry string for exact equality.
    HttpsExcept(Vec<String>),
}
impl ClientProtocol {
    /// Returns the URL scheme (`"http"` or `"https"`) to use for `registry`.
    fn scheme_for(&self, registry: &str) -> &str {
        match self {
            ClientProtocol::Https => "https",
            ClientProtocol::Http => "http",
            ClientProtocol::HttpsExcept(exceptions) => {
                // Compare in place instead of allocating a `String` per call.
                if exceptions.iter().any(|exception| exception == registry) {
                    "http"
                } else {
                    "https"
                }
            }
        }
    }
}
/// The parameters of a `Bearer` authentication challenge that this client
/// keeps: the mandatory `realm` and the optional `service`.
#[derive(Clone, Debug)]
struct BearerChallenge {
    /// Value of the challenge's `realm` parameter.
    pub realm: Box<str>,
    /// Value of the challenge's `service` parameter, when present.
    pub service: Option<String>,
}
impl TryFrom<&HeaderValue> for BearerChallenge {
    type Error = String;

    /// Extracts the first usable `Bearer` challenge from a header value.
    ///
    /// Challenges that fail to parse, or that cannot be converted into a
    /// `BearerChallenge`, are skipped.
    ///
    /// # Errors
    ///
    /// Fails when the header value cannot be converted with `to_str`, or when
    /// no challenge in it converts successfully.
    fn try_from(value: &HeaderValue) -> std::result::Result<Self, Self::Error> {
        let raw = value
            .to_str()
            .map_err(|e| format!("cannot convert header value to string: {:?}", e))?;

        let mut challenges = ChallengeParser::new(raw);
        challenges
            .find_map(|parsed| {
                parsed
                    .ok()
                    .and_then(|challenge| BearerChallenge::try_from(&challenge).ok())
            })
            .ok_or_else(|| "Cannot find Bearer challenge".to_string())
    }
}
impl TryFrom<&ChallengeRef<'_>> for BearerChallenge {
    type Error = String;

    /// Converts a parsed challenge into a `BearerChallenge`.
    ///
    /// Scheme and parameter names are compared ASCII case-insensitively. When
    /// a parameter occurs more than once, the last occurrence wins.
    ///
    /// # Errors
    ///
    /// Fails when the scheme is not `Bearer` or when `realm` is missing.
    fn try_from(value: &ChallengeRef<'_>) -> std::result::Result<Self, Self::Error> {
        if !value.scheme.eq_ignore_ascii_case("Bearer") {
            return Err(format!(
                "BearerChallenge doesn't support challenge scheme {:?}",
                value.scheme
            ));
        }

        let (mut realm, mut service) = (None, None);
        for (key, param) in &value.params {
            if key.eq_ignore_ascii_case("realm") {
                realm = Some(param.to_unescaped());
            } else if key.eq_ignore_ascii_case("service") {
                service = Some(param.to_unescaped());
            }
        }

        match realm {
            Some(realm) => Ok(BearerChallenge {
                realm: realm.into_boxed_str(),
                service,
            }),
            None => Err("missing required parameter realm".into()),
        }
    }
}
#[cfg(test)]
mod test {
use super::*;
use std::convert::TryFrom;
use std::fs;
use std::path;
use std::result::Result;
use rstest::rstest;
use sha2::Digest as _;
use tempfile::TempDir;
use tokio::io::AsyncReadExt;
use tokio_util::io::StreamReader;
use crate::manifest::{self, IMAGE_DOCKER_LAYER_GZIP_MEDIA_TYPE};
#[cfg(feature = "test-registry")]
use testcontainers::{
core::{Mount, WaitFor},
runners::AsyncRunner,
ContainerRequest, GenericImage, ImageExt,
};
const HELLO_IMAGE_NO_TAG: &str = "webassembly.azurecr.io/hello-wasm";
const HELLO_IMAGE_TAG: &str = "webassembly.azurecr.io/hello-wasm:v1";
const HELLO_IMAGE_DIGEST: &str = "webassembly.azurecr.io/hello-wasm@sha256:51d9b231d5129e3ffc267c9d455c49d789bf3167b611a07ab6e4b3304c96b0e7";
const HELLO_IMAGE_TAG_AND_DIGEST: &str = "webassembly.azurecr.io/hello-wasm:v1@sha256:51d9b231d5129e3ffc267c9d455c49d789bf3167b611a07ab6e4b3304c96b0e7";
const TEST_IMAGES: &[&str] = &[
HELLO_IMAGE_TAG,
HELLO_IMAGE_DIGEST,
HELLO_IMAGE_TAG_AND_DIGEST,
];
const GHCR_IO_IMAGE: &str = "ghcr.io/krustlet/oci-distribution/hello-wasm:v1";
const DOCKER_IO_IMAGE: &str = "docker.io/library/hello-world@sha256:37a0b92b08d4919615c3ee023f7ddb068d12b8387475d64c622ac30f45c29c51";
const HTPASSWD: &str = "testuser:$2y$05$8/q2bfRcX74EuxGf0qOcSuhWDQJXrgWiy6Fi73/JM2tKC66qSrLve";
const HTPASSWD_USERNAME: &str = "testuser";
const HTPASSWD_PASSWORD: &str = "testpassword";
/// `apply_accept` sets the `Accept` header to the given types joined by ", ".
#[test]
fn test_apply_accept() -> anyhow::Result<()> {
    let wildcard = RequestBuilderWrapper::from_client(&Client::default(), |client| {
        client.get("https://example.com/some/module.wasm")
    })
    .apply_accept(&["*/*"])?
    .into_request_builder()
    .build()?;
    assert_eq!(wildcard.headers()["Accept"], "*/*");

    let manifests = RequestBuilderWrapper::from_client(&Client::default(), |client| {
        client.get("https://example.com/some/module.wasm")
    })
    .apply_accept(MIME_TYPES_DISTRIBUTION_MANIFEST)?
    .into_request_builder()
    .build()?;
    assert_eq!(
        manifests.headers()["Accept"],
        MIME_TYPES_DISTRIBUTION_MANIFEST.join(", ")
    );

    Ok(())
}
/// Without a stored token, `apply_auth` must not add an `Authorization` header.
#[tokio::test]
async fn test_apply_auth_no_token() -> anyhow::Result<()> {
    let client = Client::default();
    let reference = Reference::try_from(HELLO_IMAGE_TAG)?;

    let request = RequestBuilderWrapper::from_client(&client, |client| {
        client.get("https://example.com/some/module.wasm")
    })
    .apply_auth(&reference, RegistryOperation::Pull)
    .await?
    .into_request_builder()
    .build()?;

    assert!(!request.headers().contains_key("Authorization"));
    Ok(())
}
/// With a stored bearer token, `apply_auth` sends it as `Bearer <token>`.
#[tokio::test]
async fn test_apply_auth_bearer_token() -> anyhow::Result<()> {
    use hmac::{Hmac, Mac};
    use jwt::SignWithKey;
    use sha2::Sha256;

    let client = Client::default();
    let reference = Reference::try_from(HELLO_IMAGE_TAG)?;

    // Sign an otherwise empty JWT so the stored token has a realistic shape.
    let header = jwt::header::Header {
        algorithm: jwt::algorithm::AlgorithmType::Hs256,
        key_id: None,
        type_: None,
        content_type: None,
    };
    let claims: jwt::claims::Claims = Default::default();
    let key: Hmac<Sha256> = Hmac::new_from_slice(b"some-secret").unwrap();
    let signed = jwt::Token::new(header, claims).sign_with_key(&key)?;
    let token = signed.as_str().to_string();

    client
        .store_auth(reference.resolve_registry(), RegistryAuth::Anonymous)
        .await;
    client
        .tokens
        .insert(
            &reference,
            RegistryOperation::Pull,
            RegistryTokenType::Bearer(RegistryToken::Token {
                token: token.clone(),
            }),
        )
        .await;

    let request = RequestBuilderWrapper::from_client(&client, |client| {
        client.get("https://example.com/some/module.wasm")
    })
    .apply_auth(&reference, RegistryOperation::Pull)
    .await?
    .into_request_builder()
    .build()?;

    assert_eq!(
        request.headers()["Authorization"],
        format!("Bearer {}", &token)
    );
    Ok(())
}
/// Blob URLs point at the registry, or at the mirror with a `?ns=` suffix.
#[test]
fn test_to_v2_blob_url() {
    let client = Client::default();
    let mut image = Reference::try_from(HELLO_IMAGE_TAG).expect("failed to parse reference");

    let direct = client.to_v2_blob_url(&image, "sha256:deadbeef");
    assert_eq!(
        direct,
        "https://webassembly.azurecr.io/v2/hello-wasm/blobs/sha256:deadbeef"
    );

    image.set_mirror_registry("docker.mirror.io".to_owned());
    let mirrored = client.to_v2_blob_url(&image, "sha256:deadbeef");
    assert_eq!(
        mirrored,
        "https://docker.mirror.io/v2/hello-wasm/blobs/sha256:deadbeef?ns=webassembly.azurecr.io"
    );
}
/// Manifest URLs prefer the digest over the tag and fall back to `latest`;
/// a mirror registry adds the `?ns=` suffix.
#[rstest(image, expected_uri, expected_mirror_uri,
    case(
        HELLO_IMAGE_NO_TAG,
        "https://webassembly.azurecr.io/v2/hello-wasm/manifests/latest",
        "https://docker.mirror.io/v2/hello-wasm/manifests/latest?ns=webassembly.azurecr.io"
    ),
    case(
        HELLO_IMAGE_TAG,
        "https://webassembly.azurecr.io/v2/hello-wasm/manifests/v1",
        "https://docker.mirror.io/v2/hello-wasm/manifests/v1?ns=webassembly.azurecr.io"
    ),
    case(
        HELLO_IMAGE_DIGEST,
        "https://webassembly.azurecr.io/v2/hello-wasm/manifests/sha256:51d9b231d5129e3ffc267c9d455c49d789bf3167b611a07ab6e4b3304c96b0e7",
        "https://docker.mirror.io/v2/hello-wasm/manifests/sha256:51d9b231d5129e3ffc267c9d455c49d789bf3167b611a07ab6e4b3304c96b0e7?ns=webassembly.azurecr.io"
    ),
    case(
        HELLO_IMAGE_TAG_AND_DIGEST,
        "https://webassembly.azurecr.io/v2/hello-wasm/manifests/sha256:51d9b231d5129e3ffc267c9d455c49d789bf3167b611a07ab6e4b3304c96b0e7",
        "https://docker.mirror.io/v2/hello-wasm/manifests/sha256:51d9b231d5129e3ffc267c9d455c49d789bf3167b611a07ab6e4b3304c96b0e7?ns=webassembly.azurecr.io"
    ),
)]
fn test_to_v2_manifest(image: &str, expected_uri: &str, expected_mirror_uri: &str) {
    let client = Client::default();
    let mut reference = Reference::try_from(image).expect("failed to parse reference");

    let direct = client.to_v2_manifest_url(&reference);
    assert_eq!(direct, expected_uri);

    reference.set_mirror_registry("docker.mirror.io".to_owned());
    let mirrored = client.to_v2_manifest_url(&reference);
    assert_eq!(mirrored, expected_mirror_uri);
}
/// The blob upload URL ends in `blobs/uploads/`.
#[test]
fn test_to_v2_blob_upload_url() {
    let client = Client::default();
    let image = Reference::try_from(HELLO_IMAGE_TAG).expect("failed to parse reference");
    assert_eq!(
        client.to_v2_blob_upload_url(&image),
        "https://webassembly.azurecr.io/v2/hello-wasm/blobs/uploads/"
    );
}
/// Tag-list URLs point at the registry, or at the mirror with a `?ns=` suffix.
#[test]
fn test_to_list_tags_url() {
    let client = Client::default();
    let mut image = Reference::try_from(HELLO_IMAGE_TAG).expect("failed to parse reference");

    let direct = client.to_list_tags_url(&image);
    assert_eq!(
        direct,
        "https://webassembly.azurecr.io/v2/hello-wasm/tags/list"
    );

    image.set_mirror_registry("docker.mirror.io".to_owned());
    let mirrored = client.to_list_tags_url(&image);
    assert_eq!(
        mirrored,
        "https://docker.mirror.io/v2/hello-wasm/tags/list?ns=webassembly.azurecr.io"
    );
}
/// `ClientProtocol::Http` makes manifest URLs use the `http` scheme.
#[test]
fn manifest_url_generation_respects_http_protocol() {
    let config = ClientConfig {
        protocol: ClientProtocol::Http,
        ..Default::default()
    };
    let client = Client::new(config);
    let reference = Reference::try_from("webassembly.azurecr.io/hello:v1".to_owned())
        .expect("Could not parse reference");

    let manifest_url = client.to_v2_manifest_url(&reference);
    assert_eq!(
        "http://webassembly.azurecr.io/v2/hello/manifests/v1",
        manifest_url
    );
}
/// `ClientProtocol::Http` makes blob URLs use the `http` scheme.
#[test]
fn blob_url_generation_respects_http_protocol() {
    let config = ClientConfig {
        protocol: ClientProtocol::Http,
        ..Default::default()
    };
    let client = Client::new(config);
    let reference = Reference::try_from("webassembly.azurecr.io/hello@sha256:ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff".to_owned())
        .expect("Could not parse reference");

    let blob_url = client.to_v2_blob_url(&reference, reference.digest().unwrap());
    assert_eq!(
        "http://webassembly.azurecr.io/v2/hello/blobs/sha256:ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff",
        blob_url
    );
}
/// A registry missing from the `HttpsExcept` list gets an `https` manifest URL.
#[test]
fn manifest_url_generation_uses_https_if_not_on_exception_list() {
    let insecure_registries = vec!["localhost".to_owned(), "oci.registry.local".to_owned()];
    let client = Client::new(ClientConfig {
        protocol: ClientProtocol::HttpsExcept(insecure_registries),
        ..Default::default()
    });
    let reference = Reference::try_from("webassembly.azurecr.io/hello:v1".to_owned())
        .expect("Could not parse reference");

    let manifest_url = client.to_v2_manifest_url(&reference);
    assert_eq!(
        "https://webassembly.azurecr.io/v2/hello/manifests/v1",
        manifest_url
    );
}
/// A registry on the `HttpsExcept` list gets an `http` manifest URL.
#[test]
fn manifest_url_generation_uses_http_if_on_exception_list() {
    let insecure_registries = vec!["localhost".to_owned(), "oci.registry.local".to_owned()];
    let client = Client::new(ClientConfig {
        protocol: ClientProtocol::HttpsExcept(insecure_registries),
        ..Default::default()
    });
    let reference = Reference::try_from("oci.registry.local/hello:v1".to_owned())
        .expect("Could not parse reference");

    let manifest_url = client.to_v2_manifest_url(&reference);
    assert_eq!(
        "http://oci.registry.local/v2/hello/manifests/v1",
        manifest_url
    );
}
/// A registry missing from the `HttpsExcept` list gets an `https` blob URL.
#[test]
fn blob_url_generation_uses_https_if_not_on_exception_list() {
    let insecure_registries = vec!["localhost".to_owned(), "oci.registry.local".to_owned()];
    let client = Client::new(ClientConfig {
        protocol: ClientProtocol::HttpsExcept(insecure_registries),
        ..Default::default()
    });
    let reference = Reference::try_from("webassembly.azurecr.io/hello@sha256:ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff".to_owned())
        .expect("Could not parse reference");

    let blob_url = client.to_v2_blob_url(&reference, reference.digest().unwrap());
    assert_eq!(
        "https://webassembly.azurecr.io/v2/hello/blobs/sha256:ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff",
        blob_url
    );
}
/// A registry on the `HttpsExcept` list gets an `http` blob URL.
#[test]
fn blob_url_generation_uses_http_if_on_exception_list() {
    let insecure_registries = vec!["localhost".to_owned(), "oci.registry.local".to_owned()];
    let client = Client::new(ClientConfig {
        protocol: ClientProtocol::HttpsExcept(insecure_registries),
        ..Default::default()
    });
    let reference = Reference::try_from("oci.registry.local/hello@sha256:ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff".to_owned())
        .expect("Could not parse reference");

    let blob_url = client.to_v2_blob_url(&reference, reference.digest().unwrap());
    assert_eq!(
        "http://oci.registry.local/v2/hello/blobs/sha256:ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff",
        blob_url
    );
}
/// `sha256_digest` yields the same digest for identical bytes, whether they
/// come from one literal or from two concatenated pieces.
#[test]
fn can_generate_valid_digest() {
    const EXPECTED: &str =
        "sha256:fdbd95aafcbc814a2600fcc54c1e1706f52d2f9bf45cf53254f25bcd7599ce99";

    let whole = b"hellobytes";
    let pieces: Vec<u8> = [&b"hello"[..], &b"bytes"[..]].concat();

    let hash = sha256_digest(whole);
    let combination_hash = sha256_digest(&pieces);

    assert_eq!(hash, EXPECTED);
    assert_eq!(combination_hash, EXPECTED);
}
/// `RegistryToken` deserializes from `token` and/or `access_token`, prefers
/// `token` when both are usable strings, and rejects everything else.
#[test]
fn test_registry_token_deserialize() {
    fn parse(text: &str) -> Result<RegistryToken, serde_json::Error> {
        serde_json::from_str(text)
    }

    // (input, expected `token()` value); `None` means "only check it parses".
    let accepted: &[(&str, Option<&str>)] = &[
        (r#"{"token": "abc"}"#, Some("abc")),
        (r#"{"access_token": "xyz"}"#, Some("xyz")),
        (r#"{"access_token": "xyz", "token": "abc"}"#, Some("abc")),
        (r#"{"token": "abc", "access_token": "xyz"}"#, Some("abc")),
        (
            r#"{"aaa": 300, "access_token": "xyz", "token": "abc", "zzz": 600}"#,
            None,
        ),
        (r#"{"access_token": 300, "token": "abc"}"#, Some("abc")),
        (r#"{"access_token": "xyz", "token": 300}"#, Some("xyz")),
    ];
    for &(text, expected) in accepted.iter() {
        let res = parse(text);
        assert!(res.is_ok());
        if let Some(expected) = expected {
            let rt = res.unwrap();
            assert_eq!(rt.token(), expected);
        }
    }

    let rejected: &[&str] = &[
        r#"{"token": 300}"#,
        r#"{"access_token": 300}"#,
        r#"{"token": {"some": "thing"}}"#,
        r#"{"access_token": {"some": "thing"}}"#,
        r#"{"some": "thing"}"#,
        r#"{"token": "abc""#,
        r#"_ _ _ kjbwef??98{9898 }} }}"#,
    ];
    for &text in rejected.iter() {
        assert!(parse(text).is_err());
    }
}
/// Asserts that `token` is longer than 64 bytes; used as a plausibility check
/// on tokens handed out by a registry.
fn check_auth_token(token: &str) {
    assert!(token.len() > 64);
}
/// Anonymous authentication returns a token and stores it as a bearer token
/// for the pull operation.
#[tokio::test]
async fn test_auth() {
    for &image in TEST_IMAGES {
        let reference = Reference::try_from(image).expect("failed to parse reference");
        let c = Client::default();
        let token = c
            .auth(
                &reference,
                &RegistryAuth::Anonymous,
                RegistryOperation::Pull,
            )
            .await
            .expect("result from auth request");

        assert!(token.is_some());
        check_auth_token(token.unwrap().as_ref());

        // The same token must now be available from the client's token store.
        let tok = c
            .tokens
            .get(&reference, RegistryOperation::Pull)
            .await
            .expect("token is available");
        if let RegistryTokenType::Bearer(tok) = tok {
            check_auth_token(tok.token());
        } else {
            panic!("Unexpected Basic Auth Token");
        }
    }
}
/// Pushes four tags to a local registry container and checks that listing
/// two tags after `1.0.1` returns `1.0.2` and `1.0.3`.
#[cfg(feature = "test-registry")]
#[tokio::test]
async fn test_list_tags() {
    // Keep the container bound to a local for the whole test.
    let registry = registry_image_edge()
        .start()
        .await
        .expect("Failed to start registry container");
    let port = registry
        .get_host_port_ipv4(5000)
        .await
        .expect("Failed to get port");

    let credentials =
        RegistryAuth::Basic(HTPASSWD_USERNAME.to_string(), HTPASSWD_PASSWORD.to_string());
    let config = ClientConfig {
        protocol: ClientProtocol::HttpsExcept(vec![format!("localhost:{}", port)]),
        ..Default::default()
    };
    let client = Client::new(config);

    // Fetch the source image that is re-pushed under several tags below.
    let source: Reference = HELLO_IMAGE_TAG_AND_DIGEST.parse().unwrap();
    client
        .auth(&source, &RegistryAuth::Anonymous, RegistryOperation::Pull)
        .await
        .expect("cannot authenticate against registry for pull operation");
    let (manifest, _digest) = client
        ._pull_image_manifest(&source)
        .await
        .expect("failed to pull manifest");
    let image_data = client
        .pull(&source, &credentials, vec![manifest::WASM_LAYER_MEDIA_TYPE])
        .await
        .expect("failed to pull image");

    for patch in 0..=3 {
        let target: Reference = format!("localhost:{}/hello-wasm:1.0.{}", port, patch)
            .parse()
            .unwrap();
        client
            .auth(&target, &credentials, RegistryOperation::Push)
            .await
            .expect("authenticated");
        client
            .push(
                &target,
                &image_data.layers,
                image_data.config.clone(),
                &credentials,
                Some(manifest.clone()),
            )
            .await
            .expect("Failed to push Image");
    }

    let listed: Reference = format!("localhost:{}/hello-wasm:1.0.1", port)
        .parse()
        .unwrap();
    let response = client
        .list_tags(&listed, &RegistryAuth::Anonymous, Some(2), Some("1.0.1"))
        .await
        .expect("Cannot list Tags");
    assert_eq!(response.tags, vec!["1.0.2", "1.0.3"]);
}
/// `_pull_image_manifest` fails before authentication and succeeds after it.
#[tokio::test]
async fn test_pull_manifest_private() {
    for &image in TEST_IMAGES {
        let reference = Reference::try_from(image).expect("failed to parse reference");

        // A fresh client has no token yet.
        let unauthenticated = Client::default();
        unauthenticated
            ._pull_image_manifest(&reference)
            .await
            .expect_err("pull manifest should fail");

        let authenticated = Client::default();
        authenticated
            .auth(
                &reference,
                &RegistryAuth::Anonymous,
                RegistryOperation::Pull,
            )
            .await
            .expect("authenticated");
        let (manifest, _) = authenticated
            ._pull_image_manifest(&reference)
            .await
            .expect("pull manifest should not fail");

        assert_eq!(manifest.schema_version, 2);
        assert!(!manifest.layers.is_empty());
    }
}
/// `pull_image_manifest` authenticates on its own and returns a schema-2
/// manifest with at least one layer.
#[tokio::test]
async fn test_pull_manifest_public() {
    for &image in TEST_IMAGES {
        let reference = Reference::try_from(image).expect("failed to parse reference");
        let client = Client::default();

        let pulled = client
            .pull_image_manifest(&reference, &RegistryAuth::Anonymous)
            .await;
        let (manifest, _) = pulled.expect("pull manifest should not fail");

        assert_eq!(manifest.schema_version, 2);
        assert!(!manifest.layers.is_empty());
    }
}
/// `pull_manifest_and_config` returns a schema-2 manifest with layers and a
/// non-empty config.
#[tokio::test]
async fn pull_manifest_and_config_public() {
    for &image in TEST_IMAGES {
        let reference = Reference::try_from(image).expect("failed to parse reference");
        let client = Client::default();

        let pulled = client
            .pull_manifest_and_config(&reference, &RegistryAuth::Anonymous)
            .await;
        let (manifest, _, config) = pulled.expect("pull manifest and config should not fail");

        assert_eq!(manifest.schema_version, 2);
        assert!(!manifest.layers.is_empty());
        assert!(!config.is_empty());
    }
}
/// `fetch_manifest_digest` works on a shared client and, after explicit
/// authentication on a fresh client, returns the known digest.
#[tokio::test]
async fn test_fetch_digest() {
    let shared_client = Client::default();

    for &image in TEST_IMAGES {
        let reference = Reference::try_from(image).expect("failed to parse reference");
        shared_client
            .fetch_manifest_digest(&reference, &RegistryAuth::Anonymous)
            .await
            .expect("pull manifest should not fail");

        // Repeat with a fresh, explicitly authenticated client.
        let reference = Reference::try_from(image).expect("failed to parse reference");
        let fresh_client = Client::default();
        fresh_client
            .auth(
                &reference,
                &RegistryAuth::Anonymous,
                RegistryOperation::Pull,
            )
            .await
            .expect("authenticated");
        let digest = fresh_client
            .fetch_manifest_digest(&reference, &RegistryAuth::Anonymous)
            .await
            .expect("pull manifest should not fail");

        assert_eq!(
            digest,
            "sha256:51d9b231d5129e3ffc267c9d455c49d789bf3167b611a07ab6e4b3304c96b0e7"
        );
    }
}
/// `pull_blob` writes exactly `layer.size` bytes for the first layer.
///
/// The pull is retried up to five times, one second apart, to ride out
/// transient registry errors.
#[tokio::test]
async fn test_pull_blob() {
    let c = Client::default();
    for &image in TEST_IMAGES {
        let reference = Reference::try_from(image).expect("failed to parse reference");
        c.auth(
            &reference,
            &RegistryAuth::Anonymous,
            RegistryOperation::Pull,
        )
        .await
        .expect("authenticated");
        let (manifest, _) = c
            ._pull_image_manifest(&reference)
            .await
            .expect("failed to pull manifest");

        let mut file: Vec<u8> = Vec::new();
        let layer0 = &manifest.layers[0];
        let mut last_error = None;
        for i in 1..6 {
            // A failed attempt may already have written part of the layer;
            // start from an empty buffer so the length check stays meaningful.
            file.clear();
            if let Err(e) = c.pull_blob(&reference, layer0, &mut file).await {
                println!(
                    "Got error on pull_blob call attempt {}. Will retry in 1s: {:?}",
                    i, e
                );
                last_error.replace(e);
                tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
            } else {
                last_error = None;
                break;
            }
        }
        if let Some(e) = last_error {
            panic!("Unable to pull layer: {:?}", e);
        }
        assert_eq!(file.len(), layer0.size as usize);
    }
}
/// `pull_blob_stream` reports the layer size as content length and streams
/// exactly that many bytes.
#[tokio::test]
async fn test_pull_blob_stream() {
    let client = Client::default();
    for &image in TEST_IMAGES {
        let reference = Reference::try_from(image).expect("failed to parse reference");
        client
            .auth(
                &reference,
                &RegistryAuth::Anonymous,
                RegistryOperation::Pull,
            )
            .await
            .expect("authenticated");
        let (manifest, _) = client
            ._pull_image_manifest(&reference)
            .await
            .expect("failed to pull manifest");

        let layer0 = &manifest.layers[0];
        let layer_stream = client
            .pull_blob_stream(&reference, layer0)
            .await
            .expect("failed to pull blob stream");
        assert_eq!(layer_stream.content_length, Some(layer0.size as u64));

        let mut downloaded: Vec<u8> = Vec::new();
        let mut reader = StreamReader::new(layer_stream.stream);
        AsyncReadExt::read_to_end(&mut reader, &mut downloaded)
            .await
            .unwrap();
        assert_eq!(downloaded.len(), layer0.size as usize);
    }
}
/// A ranged blob request returns a partial response whose bytes equal the
/// same range of a download that covers the whole layer.
#[tokio::test]
async fn test_pull_blob_stream_partial() {
    let client = Client::default();
    for &image in TEST_IMAGES {
        let reference = Reference::try_from(image).expect("failed to parse reference");
        client
            .auth(
                &reference,
                &RegistryAuth::Anonymous,
                RegistryOperation::Pull,
            )
            .await
            .expect("authenticated");
        let (manifest, _) = client
            ._pull_image_manifest(&reference)
            .await
            .expect("failed to pull manifest");

        let layer0 = &manifest.layers[0];
        let (offset, length) = (10, 6);

        // Issue both requests before reading either body.
        let partial_response = client
            .pull_blob_stream_partial(&reference, layer0, offset, Some(length))
            .await
            .expect("failed to pull blob stream");
        let full_response = client
            .pull_blob_stream_partial(&reference, layer0, 0, Some(layer0.size as u64))
            .await
            .expect("failed to pull blob stream");

        let layer_stream_partial = match partial_response {
            BlobResponse::Partial(stream) => stream,
            BlobResponse::Full(_stream) => panic!("expected partial response"),
        };
        assert_eq!(layer_stream_partial.content_length, Some(length));
        let mut partial_file: Vec<u8> = Vec::new();
        let mut partial_reader = StreamReader::new(layer_stream_partial.stream);
        AsyncReadExt::read_to_end(&mut partial_reader, &mut partial_file)
            .await
            .unwrap();

        let layer_stream_full = match full_response {
            BlobResponse::Partial(stream) => stream,
            BlobResponse::Full(_stream) => panic!("expected partial response"),
        };
        assert_eq!(layer_stream_full.content_length, Some(layer0.size as u64));
        let mut full_file: Vec<u8> = Vec::new();
        let mut full_reader = StreamReader::new(layer_stream_full.stream);
        AsyncReadExt::read_to_end(&mut full_reader, &mut full_file)
            .await
            .unwrap();

        assert_eq!(partial_file.len(), length as usize);
        assert_eq!(full_file.len(), layer0.size as usize);
        let end: usize = (offset + length) as usize;
        assert_eq!(partial_file, full_file[offset as usize..end]);
    }
}
/// `pull` returns image data with at least one layer and a digest.
///
/// Each image is attempted up to five times, one second apart; the test
/// panics with the last error when every attempt fails.
#[tokio::test]
async fn test_pull() {
    const MAX_ATTEMPTS: u32 = 5;

    for &image in TEST_IMAGES {
        let reference = Reference::try_from(image).expect("failed to parse reference");

        let mut attempt = 1;
        let image_data = loop {
            let outcome = Client::default()
                .pull(
                    &reference,
                    &RegistryAuth::Anonymous,
                    vec![manifest::WASM_LAYER_MEDIA_TYPE],
                )
                .await;
            match outcome {
                Ok(data) => break data,
                Err(e) => {
                    println!(
                        "Got error on pull call attempt {}. Will retry in 1s: {:?}",
                        attempt, e
                    );
                    tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
                    if attempt == MAX_ATTEMPTS {
                        panic!("Unable to pull layer: {:?}", e);
                    }
                    attempt += 1;
                }
            }
        };

        assert!(!image_data.layers.is_empty());
        assert!(image_data.digest.is_some());
    }
}
/// Asserts that pulling with an empty list of accepted layer media types
/// returns an error for every image in `TEST_IMAGES`.
#[tokio::test]
async fn test_pull_without_layer_validation() {
    for &image in TEST_IMAGES {
        let reference = Reference::try_from(image).expect("failed to parse reference");
        let client = Client::default();
        let outcome = client
            .pull(&reference, &RegistryAuth::Anonymous, vec![])
            .await;
        assert!(outcome.is_err());
    }
}
/// Asserts that pulling while accepting only `text/plain` layers returns an
/// error for every image in `TEST_IMAGES`.
#[tokio::test]
async fn test_pull_wrong_layer_validation() {
    for &image in TEST_IMAGES {
        let reference = Reference::try_from(image).expect("failed to parse reference");
        let client = Client::default();
        let accepted_media_types = vec!["text/plain"];
        let outcome = client
            .pull(&reference, &RegistryAuth::Anonymous, accepted_media_types)
            .await;
        assert!(outcome.is_err());
    }
}
/// Describes a `distribution/distribution:edge` container that counts as ready
/// once the text "listening on " appears on its stderr.
#[cfg(feature = "test-registry")]
fn registry_image_edge() -> GenericImage {
    let ready = WaitFor::message_on_stderr("listening on ");
    GenericImage::new("distribution/distribution", "edge").with_wait_for(ready)
}
/// Describes a `docker.io/library/registry:2` container that counts as ready
/// once the text "listening on " appears on its stderr.
#[cfg(feature = "test-registry")]
fn registry_image() -> GenericImage {
    let ready = WaitFor::message_on_stderr("listening on ");
    GenericImage::new("docker.io/library/registry", "2").with_wait_for(ready)
}
/// Describes the same registry container as `registry_image`, configured for
/// htpasswd authentication.
///
/// `auth_path` is a host directory bind-mounted at `/auth`; the registry is
/// told to read its credentials from `/auth/htpasswd`.
#[cfg(feature = "test-registry")]
fn registry_image_basic_auth(auth_path: &str) -> ContainerRequest<GenericImage> {
    // `registry_image()` supplies the identical base image and readiness check.
    registry_image()
        .with_env_var("REGISTRY_AUTH", "htpasswd")
        .with_env_var("REGISTRY_AUTH_HTPASSWD_REALM", "Registry Realm")
        .with_env_var("REGISTRY_AUTH_HTPASSWD_PATH", "/auth/htpasswd")
        .with_mount(Mount::bind_mount(auth_path, "/auth"))
}
/// Drives a chunked upload by hand against a local registry container:
/// begin a session, push a single chunk, then close the session.
///
/// Checks the byte offset returned after the chunk and the final blob URL.
#[tokio::test]
#[cfg(feature = "test-registry")]
async fn can_push_chunk() {
    let registry = registry_image()
        .start()
        .await
        .expect("Failed to start registry container");
    let host_port = registry
        .get_host_port_ipv4(5000)
        .await
        .expect("Failed to get port");

    let client_config = ClientConfig {
        protocol: ClientProtocol::Http,
        ..Default::default()
    };
    let client = Client::new(client_config);

    let url = format!("localhost:{}/hello-wasm:v1", host_port);
    let image: Reference = url.parse().unwrap();
    client
        .auth(&image, &RegistryAuth::Anonymous, RegistryOperation::Push)
        .await
        .expect("result from auth request");

    let session_location = client
        .begin_push_chunked_session(&image)
        .await
        .expect("failed to begin push session");

    let payload: Vec<u8> = b"iamawebassemblymodule".to_vec();
    let (next_location, next_byte) = client
        .push_chunk(&session_location, &image, &payload, 0)
        .await
        .expect("failed to push layer");

    // The location must be at least as long as the image URL plus a
    // UUID-sized string (the literal below only contributes its length).
    let min_location_len = url.len() + "6987887f-0196-45ee-91a1-2dfad901bea0".len();
    assert!(next_location.len() >= min_location_len);
    assert_eq!(next_byte, payload.len());

    let payload_digest = sha256_digest(&payload);
    let layer_location = client
        .end_push_chunked_session(&next_location, &image, &payload_digest)
        .await
        .expect("failed to end push session");
    assert_eq!(layer_location, format!("http://localhost:{}/v2/hello-wasm/blobs/sha256:6165c4ad43c0803798b6f2e49d6348c915d52c999a5f890846cee77ea65d230b", host_port));
}
/// Uploads a blob with `push_blob_chunked` against a local registry container
/// and checks that the returned location is the blob URL for its digest.
///
/// The client's `push_chunk_size` is set to 3, far below the payload length,
/// presumably so that the upload is split into many chunks.
#[tokio::test]
#[cfg(feature = "test-registry")]
async fn can_push_multiple_chunks() {
    let test_container = registry_image()
        .start()
        .await
        .expect("Failed to start registry container");
    let port = test_container
        .get_host_port_ipv4(5000)
        .await
        .expect("Failed to get port");
    let mut c = Client::new(ClientConfig {
        protocol: ClientProtocol::Http,
        ..Default::default()
    });
    c.push_chunk_size = 3;
    let url = format!("localhost:{}/hello-wasm:v1", port);
    let image: Reference = url.parse().unwrap();
    c.auth(&image, &RegistryAuth::Anonymous, RegistryOperation::Push)
        .await
        .expect("result from auth request");
    let image_data: Vec<u8> =
        b"i am a big webassembly mode that needs chunked uploads".to_vec();
    let image_digest = sha256_digest(&image_data);
    // The failure message names the whole chunked push: this call covers more
    // than just opening the upload session.
    let location = c
        .push_blob_chunked(&image, &image_data, &image_digest)
        .await
        .expect("failed to push blob in chunks");
    assert_eq!(
        location,
        format!(
            "http://localhost:{}/v2/hello-wasm/blobs/{}",
            port, image_digest
        )
    );
}
/// Runs the shared image round-trip scenario against an unauthenticated
/// local registry container.
#[tokio::test]
#[cfg(feature = "test-registry")]
async fn test_image_roundtrip_anon_auth() {
    let registry = registry_image()
        .start()
        .await
        .expect("Failed to start registry container");
    let auth = RegistryAuth::Anonymous;
    test_image_roundtrip(&auth, &registry).await;
}
/// Runs the shared image round-trip scenario against a local registry
/// container protected by htpasswd basic authentication.
///
/// The `HTPASSWD` contents are written to a temporary directory that is
/// bind-mounted into the container; the matching username and password are
/// passed to the round-trip as `RegistryAuth::Basic`.
#[tokio::test]
#[cfg(feature = "test-registry")]
async fn test_image_roundtrip_basic_auth() {
    // `auth_dir` stays bound until the end of the test, so the directory
    // outlives the container that mounts it.
    let auth_dir = TempDir::new().expect("cannot create tmp directory");
    let htpasswd_path = auth_dir.path().join("htpasswd");
    fs::write(htpasswd_path, HTPASSWD).expect("cannot write htpasswd file");
    let image = registry_image_basic_auth(
        auth_dir
            .path()
            .to_str()
            .expect("cannot convert auth directory path to string"),
    );
    let test_container = image
        .start()
        .await
        .expect("cannot start registry container");
    let auth =
        RegistryAuth::Basic(HTPASSWD_USERNAME.to_string(), HTPASSWD_PASSWORD.to_string());
    test_image_roundtrip(&auth, &test_container).await;
}
/// Shared round-trip scenario: pull `HELLO_IMAGE_TAG_AND_DIGEST`, push it to
/// the registry running in `test_container`, pull it back, and compare.
///
/// `registry_auth` is used for both pulls and for the push. The comparison
/// covers the single layer's bytes plus the manifest media type, schema
/// version and config digest.
#[cfg(feature = "test-registry")]
async fn test_image_roundtrip(
    registry_auth: &RegistryAuth,
    test_container: &testcontainers::ContainerAsync<GenericImage>,
) {
    let _ = tracing_subscriber::fmt::try_init();
    let port = test_container
        .get_host_port_ipv4(5000)
        .await
        .expect("Failed to get port");
    // Only the local test registry is exempted from HTTPS.
    let plain_http_hosts = vec![format!("localhost:{}", port)];
    let client = Client::new(ClientConfig {
        protocol: ClientProtocol::HttpsExcept(plain_http_hosts),
        ..Default::default()
    });

    // Fetch the source image and its manifest.
    let source: Reference = HELLO_IMAGE_TAG_AND_DIGEST.parse().unwrap();
    client
        .auth(&source, &RegistryAuth::Anonymous, RegistryOperation::Pull)
        .await
        .expect("cannot authenticate against registry for pull operation");
    let (source_manifest, _digest) = client
        ._pull_image_manifest(&source)
        .await
        .expect("failed to pull manifest");
    let source_data = client
        .pull(&source, registry_auth, vec![manifest::WASM_LAYER_MEDIA_TYPE])
        .await
        .expect("failed to pull image");

    // Push it to the local registry.
    let target: Reference = format!("localhost:{}/hello-wasm:v1", port).parse().unwrap();
    client
        .auth(&target, registry_auth, RegistryOperation::Push)
        .await
        .expect("authenticated");
    client
        .push(
            &target,
            &source_data.layers,
            source_data.config.clone(),
            registry_auth,
            Some(source_manifest.clone()),
        )
        .await
        .expect("failed to push image");

    // Pull the pushed copy back.
    let roundtrip_data = client
        .pull(
            &target,
            registry_auth,
            vec![manifest::WASM_LAYER_MEDIA_TYPE],
        )
        .await
        .expect("failed to pull pushed image");
    let (roundtrip_manifest, _digest) = client
        ._pull_image_manifest(&target)
        .await
        .expect("failed to pull pushed image manifest");

    assert_eq!(source_data.layers.len(), 1);
    assert_eq!(roundtrip_data.layers.len(), 1);
    let original_layer = &source_data.layers[0];
    let roundtrip_layer = &roundtrip_data.layers[0];
    assert_eq!(original_layer.data.len(), roundtrip_layer.data.len());
    assert_eq!(original_layer.data, roundtrip_layer.data);

    assert_eq!(source_manifest.media_type, roundtrip_manifest.media_type);
    assert_eq!(
        source_manifest.schema_version,
        roundtrip_manifest.schema_version
    );
    assert_eq!(
        source_manifest.config.digest,
        roundtrip_manifest.config.digest
    );
}
/// Pulls the raw manifest for `HELLO_IMAGE_TAG_AND_DIGEST` and checks that the
/// SHA-256 of the returned bytes equals the digest carried by the reference.
#[tokio::test]
async fn test_raw_manifest_digest() {
    let _ = tracing_subscriber::fmt::try_init();
    let client = Client::default();
    let image: Reference = HELLO_IMAGE_TAG_AND_DIGEST.parse().unwrap();
    client
        .auth(&image, &RegistryAuth::Anonymous, RegistryOperation::Pull)
        .await
        .expect("cannot authenticate against registry for pull operation");
    let (raw_manifest, _) = client
        .pull_manifest_raw(
            &image,
            &RegistryAuth::Anonymous,
            MIME_TYPES_DISTRIBUTION_MANIFEST,
        )
        .await
        .expect("failed to pull manifest");
    // Render the hash in the `sha256:<lowercase hex>` form used by references.
    let computed = format!("sha256:{:x}", sha2::Sha256::digest(raw_manifest));
    assert_eq!(image.digest().unwrap(), computed);
}
/// Pushes a four byte blob to `layer-repository`, mounts it into
/// `image-repository` on the same local registry, and checks that pulling it
/// from the second repository returns the original bytes.
#[tokio::test]
#[cfg(feature = "test-registry")]
async fn test_mount() {
    let registry = registry_image()
        .start()
        .await
        .expect("Failed to start registry");
    let port = registry
        .get_host_port_ipv4(5000)
        .await
        .expect("Failed to get port");
    let client = Client::new(ClientConfig {
        protocol: ClientProtocol::HttpsExcept(vec![format!("localhost:{}", port)]),
        ..Default::default()
    });

    let layer_reference: Reference = format!("localhost:{}/layer-repository", port)
        .parse()
        .unwrap();
    let image_reference: Reference = format!("localhost:{}/image-repository", port)
        .parse()
        .unwrap();

    let layer_data = vec![1u8, 2, 3, 4];
    let layer = OciDescriptor {
        digest: sha256_digest(&layer_data),
        ..Default::default()
    };

    // The literal holds the same bytes as `layer_data`.
    client
        .push_blob(&layer_reference, &[1, 2, 3, 4], &layer.digest)
        .await
        .expect("Failed to push");
    client
        .mount_blob(&image_reference, &layer_reference, &layer.digest)
        .await
        .expect("Failed to mount");

    let mut pulled = Vec::new();
    client
        .pull_blob(&image_reference, &layer, &mut pulled)
        .await
        .expect("Failed to pull");
    assert_eq!(layer_data, pulled);
}
/// Checks how `pull_image_manifest` handles `DOCKER_IO_IMAGE` with and without
/// a platform resolver.
///
/// Without a resolver the call must fail with the "platform_resolver was not
/// defined" error; with `linux_amd64_resolver` it must succeed and return the
/// pinned digest.
#[tokio::test]
async fn test_platform_resolution() {
    let reference = Reference::try_from(DOCKER_IO_IMAGE).expect("failed to parse reference");

    let client_without_resolver = Client::new(ClientConfig {
        platform_resolver: None,
        ..Default::default()
    });
    let err = client_without_resolver
        .pull_image_manifest(&reference, &RegistryAuth::Anonymous)
        .await
        .unwrap_err();
    assert_eq!(
        err.to_string(),
        "Received Image Index/Manifest List, but platform_resolver was not defined on the client config. Consider setting platform_resolver"
    );

    let client_with_resolver = Client::new(ClientConfig {
        platform_resolver: Some(Box::new(linux_amd64_resolver)),
        ..Default::default()
    });
    let (_manifest, digest) = client_with_resolver
        .pull_image_manifest(&reference, &RegistryAuth::Anonymous)
        .await
        .expect("Couldn't pull manifest");
    assert_eq!(
        digest,
        "sha256:f54a58bc1aac5ea1a25d796ae155dc228b3f0e11d046ae276b39c4bf2f13d8c4"
    );
}
/// Pulls the manifest of `GHCR_IO_IMAGE` anonymously and checks that its
/// config uses the WASM config media type.
#[tokio::test]
async fn test_pull_ghcr_io() {
    let reference = Reference::try_from(GHCR_IO_IMAGE).expect("failed to parse reference");
    let client = Client::default();
    let (image_manifest, _digest) = client
        .pull_image_manifest(&reference, &RegistryAuth::Anonymous)
        .await
        .unwrap();
    let config_media_type = &image_manifest.config.media_type;
    assert_eq!(*config_media_type, manifest::WASM_CONFIG_MEDIA_TYPE);
}
/// Pulls the multi-layer `registry:2.7.1` image, pushes it to
/// `oci.registry.local/registry:roundtrip-test`, and pulls the pushed manifest
/// back.
///
/// Marked `#[ignore]`, presumably because it needs a registry reachable at
/// `oci.registry.local` over plain HTTP. TODO confirm.
#[tokio::test]
#[ignore]
async fn test_roundtrip_multiple_layers() {
    let _ = tracing_subscriber::fmt::try_init();
    let c = Client::new(ClientConfig {
        protocol: ClientProtocol::HttpsExcept(vec!["oci.registry.local".to_string()]),
        ..Default::default()
    });
    let src_image = Reference::try_from("registry:2.7.1").expect("failed to parse reference");
    let dest_image = Reference::try_from("oci.registry.local/registry:roundtrip-test")
        .expect("failed to parse reference");
    // Each step below has its own failure message so a panic identifies the
    // step that actually failed.
    let image = c
        .pull(
            &src_image,
            &RegistryAuth::Anonymous,
            vec![IMAGE_DOCKER_LAYER_GZIP_MEDIA_TYPE],
        )
        .await
        .expect("Failed to pull image");
    assert!(image.layers.len() > 1);
    let ImageData {
        layers,
        config,
        manifest,
        ..
    } = image;
    c.push(
        &dest_image,
        &layers,
        config,
        &RegistryAuth::Anonymous,
        manifest,
    )
    .await
    .expect("Failed to push image");
    c.pull_image_manifest(&dest_image, &RegistryAuth::Anonymous)
        .await
        .expect("Failed to pull manifest");
}
/// Exercises equality and hashing of `ImageLayer`.
///
/// Layers 0 and 1 differ only in the order their annotations were inserted and
/// must compare equal; the other layers each differ in data, media type or
/// annotation content. De-duplicating with `Itertools::unique` must therefore
/// drop exactly one layer.
#[tokio::test]
async fn test_hashable_image_layer() {
    use itertools::Itertools;

    /// Builds a layer from borrowed parts, inserting annotations in the given order.
    fn layer(data: &[u8], media_type: &str, annotations: [(&str, &str); 2]) -> ImageLayer {
        ImageLayer {
            data: data.to_vec(),
            media_type: media_type.to_owned(),
            annotations: Some(
                annotations
                    .iter()
                    .map(|&(key, value)| (key.to_owned(), value.to_owned()))
                    .collect(),
            ),
        }
    }

    /// Index pairs whose layers are expected to differ.
    const DISTINCT_PAIRS: [(usize, usize); 6] = [(0, 2), (0, 3), (0, 4), (2, 3), (2, 4), (3, 4)];

    let image_layers = vec![
        layer(&[0, 1, 2, 3], "media_type", [("0", "1"), ("2", "3")]),
        // Same content as the first layer, annotations inserted in reverse order.
        layer(&[0, 1, 2, 3], "media_type", [("2", "3"), ("0", "1")]),
        layer(
            &[0, 1, 2, 3],
            "different_media_type",
            [("0", "1"), ("2", "3")],
        ),
        layer(&[0, 1, 2], "media_type", [("0", "1"), ("2", "3")]),
        layer(&[0, 1, 2, 3], "media_type", [("1", "0"), ("2", "3")]),
    ];

    assert_eq!(
        &image_layers[0], &image_layers[1],
        "image_layers[0] should equal image_layers[1]"
    );
    for &(a, b) in DISTINCT_PAIRS.iter() {
        assert_ne!(
            &image_layers[a], &image_layers[b],
            "image_layers[{}] should not equal image_layers[{}]",
            a, b
        );
    }

    let deduped: Vec<ImageLayer> = image_layers.iter().cloned().unique().collect();
    assert_eq!(
        image_layers.len() - 1,
        deduped.len(),
        "after deduplication, there should be one less image layer"
    );
}
}