tokio_tar/
archive.rs

1use portable_atomic::{AtomicU64, Ordering};
2use rustc_hash::FxHashSet;
3use std::{
4    cmp,
5    collections::VecDeque,
6    path::Path,
7    pin::Pin,
8    sync::Arc,
9    task::{Context, Poll},
10};
11use tokio::{
12    fs,
13    io::{self, AsyncRead as Read, AsyncReadExt},
14    sync::Mutex,
15};
16use tokio_stream::*;
17
18use crate::{
19    entry::{EntryFields, EntryIo},
20    error::TarError,
21    other, Entry, GnuExtSparseHeader, GnuSparseHeader, Header,
22};
23use crate::{header::BLOCK_SIZE, pax::pax_extensions};
24
25/// A top-level representation of an archive file.
26///
27/// This archive can have an entry added to it and it can be iterated over.
28#[derive(Debug)]
29pub struct Archive<R: Read + Unpin> {
30    inner: Arc<ArchiveInner<R>>,
31}
32
33impl<R: Read + Unpin> Clone for Archive<R> {
34    fn clone(&self) -> Self {
35        Archive {
36            inner: self.inner.clone(),
37        }
38    }
39}
40
41#[derive(Debug)]
42pub struct ArchiveInner<R> {
43    pos: AtomicU64,
44    unpack_xattrs: bool,
45    preserve_permissions: bool,
46    preserve_mtime: bool,
47    allow_external_symlinks: bool,
48    overwrite: bool,
49    ignore_zeros: bool,
50    obj: Mutex<R>,
51}
52
53/// Configure the archive.
54pub struct ArchiveBuilder<R: Read + Unpin> {
55    obj: R,
56    unpack_xattrs: bool,
57    preserve_permissions: bool,
58    preserve_mtime: bool,
59    allow_external_symlinks: bool,
60    overwrite: bool,
61    ignore_zeros: bool,
62}
63
64impl<R: Read + Unpin> ArchiveBuilder<R> {
65    /// Create a new builder.
66    pub fn new(obj: R) -> Self {
67        ArchiveBuilder {
68            unpack_xattrs: false,
69            preserve_permissions: false,
70            preserve_mtime: true,
71            allow_external_symlinks: true,
72            overwrite: true,
73            ignore_zeros: false,
74            obj,
75        }
76    }
77
78    /// Indicate whether extended file attributes (xattrs on Unix) are preserved
79    /// when unpacking this archive.
80    ///
81    /// This flag is disabled by default and is currently only implemented on
82    /// Unix using xattr support. This may eventually be implemented for
83    /// Windows, however, if other archive implementations are found which do
84    /// this as well.
85    pub fn set_unpack_xattrs(mut self, unpack_xattrs: bool) -> Self {
86        self.unpack_xattrs = unpack_xattrs;
87        self
88    }
89
90    /// Indicate whether the permissions on files and directories are preserved
91    /// when unpacking this entry.
92    ///
93    /// This flag is disabled by default and is currently only implemented on
94    /// Unix.
95    pub fn set_preserve_permissions(mut self, preserve: bool) -> Self {
96        self.preserve_permissions = preserve;
97        self
98    }
99
100    /// Indicate whether files and symlinks should be overwritten on extraction.
101    pub fn set_overwrite(mut self, overwrite: bool) -> Self {
102        self.overwrite = overwrite;
103        self
104    }
105
106    /// Indicate whether access time information is preserved when unpacking
107    /// this entry.
108    ///
109    /// This flag is enabled by default.
110    pub fn set_preserve_mtime(mut self, preserve: bool) -> Self {
111        self.preserve_mtime = preserve;
112        self
113    }
114
115    /// Ignore zeroed headers, which would otherwise indicate to the archive that it has no more
116    /// entries.
117    ///
118    /// This can be used in case multiple tar archives have been concatenated together.
119    pub fn set_ignore_zeros(mut self, ignore_zeros: bool) -> Self {
120        self.ignore_zeros = ignore_zeros;
121        self
122    }
123
124    /// Indicate whether to deny symlinks that point outside the destination
125    /// directory when unpacking this entry. (Writing to locations outside the
126    /// destination directory is _always_ forbidden.)
127    ///
128    /// This flag is enabled by default.
129    pub fn set_allow_external_symlinks(mut self, allow_external_symlinks: bool) -> Self {
130        self.allow_external_symlinks = allow_external_symlinks;
131        self
132    }
133
134    /// Construct the archive, ready to accept inputs.
135    pub fn build(self) -> Archive<R> {
136        let Self {
137            unpack_xattrs,
138            preserve_permissions,
139            preserve_mtime,
140            allow_external_symlinks,
141            overwrite,
142            ignore_zeros,
143            obj,
144        } = self;
145
146        Archive {
147            inner: Arc::new(ArchiveInner {
148                unpack_xattrs,
149                preserve_permissions,
150                preserve_mtime,
151                allow_external_symlinks,
152                overwrite,
153                ignore_zeros,
154                obj: Mutex::new(obj),
155                pos: 0.into(),
156            }),
157        }
158    }
159}
160
161impl<R: Read + Unpin> Archive<R> {
162    /// Create a new archive with the underlying object as the reader.
163    pub fn new(obj: R) -> Archive<R> {
164        Archive {
165            inner: Arc::new(ArchiveInner {
166                unpack_xattrs: false,
167                preserve_permissions: false,
168                preserve_mtime: true,
169                allow_external_symlinks: true,
170                overwrite: true,
171                ignore_zeros: false,
172                obj: Mutex::new(obj),
173                pos: 0.into(),
174            }),
175        }
176    }
177
178    /// Unwrap this archive, returning the underlying object.
179    pub fn into_inner(self) -> Result<R, Self> {
180        let Self { inner } = self;
181
182        match Arc::try_unwrap(inner) {
183            Ok(inner) => Ok(inner.obj.into_inner()),
184            Err(inner) => Err(Self { inner }),
185        }
186    }
187
188    /// Construct an stream over the entries in this archive.
189    ///
190    /// Note that care must be taken to consider each entry within an archive in
191    /// sequence. If entries are processed out of sequence (from what the
192    /// stream returns), then the contents read for each entry may be
193    /// corrupted.
194    pub fn entries(&mut self) -> io::Result<Entries<R>> {
195        if self.inner.pos.load(Ordering::SeqCst) != 0 {
196            return Err(other(
197                "cannot call entries unless archive is at \
198                 position 0",
199            ));
200        }
201
202        Ok(Entries {
203            archive: self.clone(),
204            pending: None,
205            current: (0, None, 0, None, None),
206            gnu_longlink: (false, None),
207            gnu_longname: (false, None),
208            pax_extensions: (false, None),
209        })
210    }
211
212    /// Construct an stream over the raw entries in this archive.
213    ///
214    /// Note that care must be taken to consider each entry within an archive in
215    /// sequence. If entries are processed out of sequence (from what the
216    /// stream returns), then the contents read for each entry may be
217    /// corrupted.
218    pub fn entries_raw(&mut self) -> io::Result<RawEntries<R>> {
219        if self.inner.pos.load(Ordering::SeqCst) != 0 {
220            return Err(other(
221                "cannot call entries_raw unless archive is at \
222                 position 0",
223            ));
224        }
225
226        Ok(RawEntries {
227            archive: self.clone(),
228            current: (0, None, 0),
229        })
230    }
231
232    /// Unpacks the contents tarball into the specified `dst`.
233    ///
234    /// This function will iterate over the entire contents of this tarball,
235    /// extracting each file in turn to the location specified by the entry's
236    /// path name.
237    ///
238    /// This operation is relatively sensitive in that it will not write files
239    /// outside of the path specified by `dst`. Files in the archive which have
240    /// a '..' in their path are skipped during the unpacking process.
241    ///
242    /// # Examples
243    ///
244    /// ```no_run
245    /// # fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> { tokio::runtime::Runtime::new().unwrap().block_on(async {
246    /// #
247    /// use tokio::fs::File;
248    /// use tokio_tar::Archive;
249    ///
250    /// let mut ar = Archive::new(File::open("foo.tar").await?);
251    /// ar.unpack("foo").await?;
252    /// #
253    /// # Ok(()) }) }
254    /// ```
255    pub async fn unpack<P: AsRef<Path>>(&mut self, dst: P) -> io::Result<()> {
256        let mut entries = self.entries()?;
257        let mut pinned = Pin::new(&mut entries);
258        let dst = dst.as_ref();
259
260        if fs::symlink_metadata(dst).await.is_err() {
261            fs::create_dir_all(&dst)
262                .await
263                .map_err(|e| TarError::new(format!("failed to create `{}`", dst.display()), e))?;
264        }
265
266        // Canonicalizing the dst directory will prepend the path with '\\?\'
267        // on windows which will allow windows APIs to treat the path as an
268        // extended-length path with a 32,767 character limit. Otherwise all
269        // unpacked paths over 260 characters will fail on creation with a
270        // NotFound exception.
271        let dst = fs::canonicalize(dst).await?;
272
273        // Memoize filesystem calls to canonicalize paths.
274        let mut targets = FxHashSet::default();
275
276        // Delay any directory entries until the end (they will be created if needed by
277        // descendants), to ensure that directory permissions do not interfere with descendant
278        // extraction.
279        let mut directories = Vec::new();
280        while let Some(entry) = pinned.next().await {
281            let mut file = entry.map_err(|e| TarError::new("failed to iterate over archive", e))?;
282            if file.header().entry_type() == crate::EntryType::Directory {
283                directories.push(file);
284            } else {
285                file.unpack_in_raw(&dst, &mut targets).await?;
286            }
287        }
288
289        // Apply the directories.
290        //
291        // Note: the order of application is important to permissions. That is, we must traverse
292        // the filesystem graph in topological ordering or else we risk not being able to create
293        // child directories within those of more restrictive permissions. See [0] for details.
294        //
295        // [0]: <https://github.com/alexcrichton/tar-rs/issues/242>
296        directories.sort_by(|a, b| b.path_bytes().cmp(&a.path_bytes()));
297        for mut dir in directories {
298            dir.unpack_in_raw(&dst, &mut targets).await?;
299        }
300
301        Ok(())
302    }
303}
304
305/// Stream of `Entry`s.
306pub struct Entries<R: Read + Unpin> {
307    archive: Archive<R>,
308    current: (
309        u64,
310        Option<Header>,
311        usize,
312        Option<GnuExtSparseHeader>,
313        Option<Vec<u8>>,
314    ),
315    /// The [`Entry`] that is currently being processed.
316    pending: Option<Entry<Archive<R>>>,
317    /// GNU long name extension.
318    ///
319    /// The first element is a flag indicating whether the long name entry has been fully read.
320    /// The second element is the buffer containing the long name, or `None` if the long name entry
321    /// has not been encountered yet.
322    gnu_longname: (bool, Option<Vec<u8>>),
323    /// GNU long link extension.
324    ///
325    /// The first element is a flag indicating whether the long link entry has been fully read.
326    /// The second element is the buffer containing the long link, or `None` if the long link entry
327    /// has not been encountered yet.
328    gnu_longlink: (bool, Option<Vec<u8>>),
329    /// PAX extensions.
330    ///
331    /// The first element is a flag indicating whether the extension entry has been fully read.
332    /// The second element is the buffer containing the extension, or `None` if the extension entry
333    /// has not been encountered yet.
334    pax_extensions: (bool, Option<Vec<u8>>),
335}
336
/// Unwrap a `Poll<Option<Result<T, E>>>` inside a stream's `poll_next`.
///
/// Yields the `T` on `Ready(Some(Ok(_)))`. Otherwise it returns early from the
/// enclosing function: `Pending` on pending, `Ready(None)` at the end, or
/// `Ready(Some(Err(_)))` with the error.
macro_rules! ready_opt_err {
    ($val:expr) => {
        match $val {
            Poll::Pending => return Poll::Pending,
            Poll::Ready(None) => return Poll::Ready(None),
            Poll::Ready(Some(Err(err))) => return Poll::Ready(Some(Err(err))),
            Poll::Ready(Some(Ok(val))) => val,
        }
    };
}
346
/// Unwrap a `Poll<Result<T, E>>` inside a stream's `poll_next`.
///
/// Yields the `T` on `Ready(Ok(_))`. Otherwise it returns early from the
/// enclosing function: `Pending` on pending, or `Ready(Some(Err(_)))` with
/// the error.
macro_rules! ready_err {
    ($val:expr) => {
        match $val {
            Poll::Pending => return Poll::Pending,
            Poll::Ready(Err(err)) => return Poll::Ready(Some(Err(err))),
            Poll::Ready(Ok(val)) => val,
        }
    };
}
355
356impl<R: Read + Unpin> Stream for Entries<R> {
357    type Item = io::Result<Entry<Archive<R>>>;
358
359    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
360        loop {
361            let archive = self.archive.clone();
362
363            let entry = if let Some(entry) = self.pending.take() {
364                entry
365            } else {
366                let (next, current_header, current_header_pos, _, pax_extensions) =
367                    &mut self.current;
368                ready_opt_err!(poll_next_raw(
369                    archive,
370                    next,
371                    current_header,
372                    current_header_pos,
373                    cx,
374                    pax_extensions.as_deref(),
375                ))
376            };
377
378            let is_recognized_header =
379                entry.header().as_gnu().is_some() || entry.header().as_ustar().is_some();
380
381            if is_recognized_header && entry.header().entry_type().is_gnu_longname() {
382                if self.gnu_longname.0 {
383                    return Poll::Ready(Some(Err(other(
384                        "two long name entries describing \
385                         the same member",
386                    ))));
387                }
388
389                let mut ef = EntryFields::from(entry);
390                let cursor = self.gnu_longname.1.get_or_insert_with(|| {
391                    let cap = cmp::min(ef.size, 128 * 1024);
392                    Vec::with_capacity(cap as usize)
393                });
394                if let Poll::Ready(result) = Pin::new(&mut ef).poll_read_all(cx, cursor) {
395                    if let Err(err) = result {
396                        return Poll::Ready(Some(Err(err)));
397                    }
398                } else {
399                    self.pending = Some(ef.into_entry());
400                    return Poll::Pending;
401                }
402
403                self.gnu_longname.0 = true;
404                continue;
405            }
406
407            if is_recognized_header && entry.header().entry_type().is_gnu_longlink() {
408                if self.gnu_longlink.0 {
409                    return Poll::Ready(Some(Err(other(
410                        "two long name entries describing \
411                         the same member",
412                    ))));
413                }
414
415                let mut ef = EntryFields::from(entry);
416                let cursor = self.gnu_longlink.1.get_or_insert_with(|| {
417                    let cap = cmp::min(ef.size, 128 * 1024);
418                    Vec::with_capacity(cap as usize)
419                });
420                if let Poll::Ready(result) = Pin::new(&mut ef).poll_read_all(cx, cursor) {
421                    if let Err(err) = result {
422                        return Poll::Ready(Some(Err(err)));
423                    }
424                } else {
425                    self.pending = Some(ef.into_entry());
426                    return Poll::Pending;
427                }
428
429                self.gnu_longlink.0 = true;
430                continue;
431            }
432
433            if is_recognized_header && entry.header().entry_type().is_pax_local_extensions() {
434                if self.pax_extensions.0 {
435                    return Poll::Ready(Some(Err(other(
436                        "two pax extensions entries describing \
437                         the same member",
438                    ))));
439                }
440
441                let mut ef = EntryFields::from(entry);
442                let cursor = self.pax_extensions.1.get_or_insert_with(|| {
443                    let cap = cmp::min(ef.size, 128 * 1024);
444                    Vec::with_capacity(cap as usize)
445                });
446                if let Poll::Ready(result) = Pin::new(&mut ef).poll_read_all(cx, cursor) {
447                    if let Err(err) = result {
448                        return Poll::Ready(Some(Err(err)));
449                    }
450                } else {
451                    self.pending = Some(ef.into_entry());
452                    return Poll::Pending;
453                }
454
455                self.pax_extensions.0 = true;
456                self.current.4 = self.pax_extensions.1.clone();
457                continue;
458            }
459
460            let mut fields = EntryFields::from(entry);
461            if self.gnu_longname.0 {
462                fields.long_pathname = self.gnu_longname.1.take();
463                self.gnu_longname.0 = false;
464            }
465            if self.gnu_longlink.0 {
466                fields.long_linkname = self.gnu_longlink.1.take();
467                self.gnu_longlink.0 = false;
468            }
469            if self.pax_extensions.0 {
470                fields.pax_extensions = self.pax_extensions.1.take();
471                self.pax_extensions.0 = false;
472            }
473
474            let archive = self.archive.clone();
475            let (next, _, current_pos, current_ext, _pax_extensions) = &mut self.current;
476
477            ready_err!(poll_parse_sparse_header(
478                archive,
479                next,
480                current_ext,
481                current_pos,
482                &mut fields,
483                cx,
484            ));
485
486            return Poll::Ready(Some(Ok(fields.into_entry())));
487        }
488    }
489}
490
491/// Stream of raw `Entry`s.
492pub struct RawEntries<R: Read + Unpin> {
493    archive: Archive<R>,
494    current: (u64, Option<Header>, usize),
495}
496
497impl<R: Read + Unpin> Stream for RawEntries<R> {
498    type Item = io::Result<Entry<Archive<R>>>;
499
500    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
501        let archive = self.archive.clone();
502        let (next, current_header, current_header_pos) = &mut self.current;
503        poll_next_raw(archive, next, current_header, current_header_pos, cx, None)
504    }
505}
506
507fn poll_next_raw<R: Read + Unpin>(
508    mut archive: Archive<R>,
509    next: &mut u64,
510    current_header: &mut Option<Header>,
511    current_header_pos: &mut usize,
512    cx: &mut Context<'_>,
513    pax_extensions_data: Option<&[u8]>,
514) -> Poll<Option<io::Result<Entry<Archive<R>>>>> {
515    let mut header_pos = *next;
516
517    loop {
518        // Seek to the start of the next header in the archive
519        if current_header.is_none() {
520            let delta = *next - archive.inner.pos.load(Ordering::SeqCst);
521            match futures_core::ready!(poll_skip(&mut archive, cx, delta)) {
522                Ok(_) => {}
523                Err(err) => return Poll::Ready(Some(Err(err))),
524            }
525
526            *current_header = Some(Header::new_old());
527            *current_header_pos = 0;
528        }
529
530        let header = current_header.as_mut().unwrap();
531
532        // EOF is an indicator that we are at the end of the archive.
533        match futures_core::ready!(poll_try_read_all(
534            &mut archive,
535            cx,
536            header.as_mut_bytes(),
537            current_header_pos,
538        )) {
539            Ok(true) => {}
540            Ok(false) => return Poll::Ready(None),
541            Err(err) => return Poll::Ready(Some(Err(err))),
542        }
543
544        // If a header is not all zeros, we have another valid header.
545        // Otherwise, check if we are ignoring zeros and continue, or break as if this is the
546        // end of the archive.
547        if !header.as_bytes().iter().all(|i| *i == 0) {
548            *next += BLOCK_SIZE;
549            break;
550        }
551
552        if !archive.inner.ignore_zeros {
553            return Poll::Ready(None);
554        }
555
556        *next += BLOCK_SIZE;
557        header_pos = *next;
558    }
559
560    let header = current_header.as_mut().unwrap();
561
562    // Make sure the checksum is ok
563    let sum = header.as_bytes()[..148]
564        .iter()
565        .chain(&header.as_bytes()[156..])
566        .fold(0, |a, b| a + (*b as u32))
567        + 8 * 32;
568    let cksum = header.cksum()?;
569    if sum != cksum {
570        return Poll::Ready(Some(Err(other("archive header checksum mismatch"))));
571    }
572
573    let file_pos = *next;
574
575    let mut header = current_header.take().unwrap();
576
577    // note when pax extensions are available, the size from the header will be ignored
578    let mut size = header.entry_size()?;
579
580    // the size above will be overriden by the pax data if it has a size field.
581    // same for uid and gid, which will be overridden in the header itself.
582    if let Some(pax) = pax_extensions_data.map(pax_extensions) {
583        for extension in pax {
584            let extension = extension?;
585
586            // ignore keys that aren't parsable as a string at this stage.
587            // that isn't relevant to the size/uid/gid processing.
588            let Ok(key) = extension.key() else {
589                continue;
590            };
591
592            match key {
593                "size" => {
594                    let size_str = extension
595                        .value()
596                        .map_err(|_e| other("failed to parse pax size as string"))?;
597                    size = size_str
598                        .parse::<u64>()
599                        .map_err(|_e| other("failed to parse pax size"))?;
600                }
601
602                "uid" => {
603                    let uid_str = extension
604                        .value()
605                        .map_err(|_e| other("failed to parse pax uid as string"))?;
606                    header.set_uid(
607                        uid_str
608                            .parse::<u64>()
609                            .map_err(|_e| other("failed to parse pax uid"))?,
610                    );
611                }
612
613                "gid" => {
614                    let gid_str = extension
615                        .value()
616                        .map_err(|_e| other("failed to parse pax gid as string"))?;
617                    header.set_gid(
618                        gid_str
619                            .parse::<u64>()
620                            .map_err(|_e| other("failed to parse pax gid"))?,
621                    );
622                }
623
624                _ => {
625                    continue;
626                }
627            }
628        }
629    }
630
631    let mut data = VecDeque::with_capacity(1);
632    data.push_back(EntryIo::Data(archive.clone().take(size)));
633
634    let ret = EntryFields {
635        size,
636        header_pos,
637        file_pos,
638        data,
639        header,
640        long_pathname: None,
641        long_linkname: None,
642        pax_extensions: None,
643        unpack_xattrs: archive.inner.unpack_xattrs,
644        preserve_permissions: archive.inner.preserve_permissions,
645        preserve_mtime: archive.inner.preserve_mtime,
646        overwrite: archive.inner.overwrite,
647        allow_external_symlinks: archive.inner.allow_external_symlinks,
648        read_state: None,
649    };
650
651    // Store where the next entry is, rounding up by 512 bytes (the size of
652    // a header);
653    let size = size
654        .checked_add(BLOCK_SIZE - 1)
655        .ok_or_else(|| other("size overflow"))?;
656    *next = next
657        .checked_add(size & !(BLOCK_SIZE - 1))
658        .ok_or_else(|| other("size overflow"))?;
659
660    Poll::Ready(Some(Ok(ret.into_entry())))
661}
662
663fn poll_parse_sparse_header<R: Read + Unpin>(
664    mut archive: Archive<R>,
665    next: &mut u64,
666    current_ext: &mut Option<GnuExtSparseHeader>,
667    current_ext_pos: &mut usize,
668    entry: &mut EntryFields<Archive<R>>,
669    cx: &mut Context<'_>,
670) -> Poll<io::Result<()>> {
671    if !entry.header.entry_type().is_gnu_sparse() {
672        return Poll::Ready(Ok(()));
673    }
674
675    let gnu = match entry.header.as_gnu() {
676        Some(gnu) => gnu,
677        None => return Poll::Ready(Err(other("sparse entry type listed but not GNU header"))),
678    };
679
680    // Sparse files are represented internally as a list of blocks that are
681    // read. Blocks are either a bunch of 0's or they're data from the
682    // underlying archive.
683    //
684    // Blocks of a sparse file are described by the `GnuSparseHeader`
685    // structure, some of which are contained in `GnuHeader` but some of
686    // which may also be contained after the first header in further
687    // headers.
688    //
689    // We read off all the blocks here and use the `add_block` function to
690    // incrementally add them to the list of I/O block (in `entry.data`).
691    // The `add_block` function also validates that each chunk comes after
692    // the previous, we don't overrun the end of the file, and each block is
693    // aligned to a 512-byte boundary in the archive itself.
694    //
695    // At the end we verify that the sparse file size (`Header::size`) is
696    // the same as the current offset (described by the list of blocks) as
697    // well as the amount of data read equals the size of the entry
698    // (`Header::entry_size`).
699    entry.data.truncate(0);
700
701    let mut cur = 0;
702    let mut remaining = entry.size;
703    {
704        let data = &mut entry.data;
705        let reader = archive.clone();
706        let size = entry.size;
707        let mut add_block = |block: &GnuSparseHeader| -> io::Result<_> {
708            if block.is_empty() {
709                return Ok(());
710            }
711            let off = block.offset()?;
712            let len = block.length()?;
713
714            if len != 0 && (size - remaining) % BLOCK_SIZE != 0 {
715                return Err(other(
716                    "previous block in sparse file was not \
717                     aligned to 512-byte boundary",
718                ));
719            } else if off < cur {
720                return Err(other(
721                    "out of order or overlapping sparse \
722                     blocks",
723                ));
724            } else if cur < off {
725                let block = io::repeat(0).take(off - cur);
726                data.push_back(EntryIo::Pad(block));
727            }
728            cur = off
729                .checked_add(len)
730                .ok_or_else(|| other("more bytes listed in sparse file than u64 can hold"))?;
731            remaining = remaining.checked_sub(len).ok_or_else(|| {
732                other(
733                    "sparse file consumed more data than the header \
734                     listed",
735                )
736            })?;
737            data.push_back(EntryIo::Data(reader.clone().take(len)));
738            Ok(())
739        };
740        for block in gnu.sparse.iter() {
741            add_block(block)?
742        }
743        if gnu.is_extended() {
744            let started_header = current_ext.is_some();
745            if !started_header {
746                let mut ext = GnuExtSparseHeader::new();
747                ext.isextended[0] = 1;
748                *current_ext = Some(ext);
749                *current_ext_pos = 0;
750            }
751
752            let ext = current_ext.as_mut().unwrap();
753            while ext.is_extended() {
754                match futures_core::ready!(poll_try_read_all(
755                    &mut archive,
756                    cx,
757                    ext.as_mut_bytes(),
758                    current_ext_pos,
759                )) {
760                    Ok(true) => {}
761                    Ok(false) => return Poll::Ready(Err(other("failed to read extension"))),
762                    Err(err) => return Poll::Ready(Err(err)),
763                }
764
765                *next += BLOCK_SIZE;
766                for block in ext.sparse.iter() {
767                    add_block(block)?;
768                }
769            }
770        }
771    }
772    if cur != gnu.real_size()? {
773        return Poll::Ready(Err(other(
774            "mismatch in sparse file chunks and \
775             size in header",
776        )));
777    }
778    entry.size = cur;
779    if remaining > 0 {
780        return Poll::Ready(Err(other(
781            "mismatch in sparse file chunks and \
782             entry size in header",
783        )));
784    }
785
786    Poll::Ready(Ok(()))
787}
788
789impl<R: Read + Unpin> Read for Archive<R> {
790    fn poll_read(
791        self: Pin<&mut Self>,
792        cx: &mut Context<'_>,
793        into: &mut io::ReadBuf<'_>,
794    ) -> Poll<io::Result<()>> {
795        let mut r = if let Ok(v) = self.inner.obj.try_lock() {
796            v
797        } else {
798            return Poll::Pending;
799        };
800
801        let res = futures_core::ready!(Pin::new(&mut *r).poll_read(cx, into));
802        match res {
803            Ok(()) => {
804                self.inner
805                    .pos
806                    .fetch_add(into.filled().len() as u64, Ordering::SeqCst);
807                Poll::Ready(Ok(()))
808            }
809            Err(err) => Poll::Ready(Err(err)),
810        }
811    }
812}
813
814/// Try to fill the buffer from the reader.
815///
816/// If the reader reaches its end before filling the buffer at all, returns `false`.
817/// Otherwise returns `true`.
818fn poll_try_read_all<R: Read + Unpin>(
819    mut source: R,
820    cx: &mut Context<'_>,
821    buf: &mut [u8],
822    pos: &mut usize,
823) -> Poll<io::Result<bool>> {
824    while *pos < buf.len() {
825        let mut read_buf = io::ReadBuf::new(&mut buf[*pos..]);
826        match futures_core::ready!(Pin::new(&mut source).poll_read(cx, &mut read_buf)) {
827            Ok(()) if read_buf.filled().is_empty() => {
828                if *pos == 0 {
829                    return Poll::Ready(Ok(false));
830                }
831
832                return Poll::Ready(Err(other("failed to read entire block")));
833            }
834            Ok(()) => *pos += read_buf.filled().len(),
835            Err(err) => return Poll::Ready(Err(err)),
836        }
837    }
838
839    *pos = 0;
840    Poll::Ready(Ok(true))
841}
842
843/// Skip n bytes on the given source.
844fn poll_skip<R: Read + Unpin>(
845    mut source: R,
846    cx: &mut Context<'_>,
847    mut amt: u64,
848) -> Poll<io::Result<()>> {
849    let mut buf = [0u8; 4096 * 8];
850    while amt > 0 {
851        let n = cmp::min(amt, buf.len() as u64);
852        let mut read_buf = io::ReadBuf::new(&mut buf[..n as usize]);
853        match futures_core::ready!(Pin::new(&mut source).poll_read(cx, &mut read_buf)) {
854            Ok(()) if read_buf.filled().is_empty() => {
855                return Poll::Ready(Err(other("unexpected EOF during skip")));
856            }
857            Ok(()) => {
858                amt -= read_buf.filled().len() as u64;
859            }
860            Err(err) => return Poll::Ready(Err(err)),
861        }
862    }
863
864    Poll::Ready(Ok(()))
865}