1use portable_atomic::{AtomicU64, Ordering};
2use rustc_hash::FxHashSet;
3use std::{
4 cmp,
5 collections::VecDeque,
6 path::Path,
7 pin::Pin,
8 sync::Arc,
9 task::{Context, Poll},
10};
11use tokio::{
12 fs,
13 io::{self, AsyncRead as Read, AsyncReadExt},
14 sync::Mutex,
15};
16use tokio_stream::*;
17
18use crate::{
19 entry::{EntryFields, EntryIo},
20 error::TarError,
21 other, Entry, GnuExtSparseHeader, GnuSparseHeader, Header,
22};
23use crate::{header::BLOCK_SIZE, pax::pax_extensions};
24
/// A handle to a tar archive that is read from an asynchronous reader.
///
/// All state lives behind an [`Arc`], so the handle itself is only a
/// reference-counted pointer to the shared reader, position and options.
#[derive(Debug)]
pub struct Archive<R: Read + Unpin> {
    /// Shared reader, read position and unpacking options.
    inner: Arc<ArchiveInner<R>>,
}
32
33impl<R: Read + Unpin> Clone for Archive<R> {
34 fn clone(&self) -> Self {
35 Archive {
36 inner: self.inner.clone(),
37 }
38 }
39}
40
/// State shared by every handle onto one archive.
#[derive(Debug)]
pub struct ArchiveInner<R> {
    /// Running count of bytes read from `obj` through the archive's reader
    /// implementation.
    pos: AtomicU64,
    /// Option copied into each entry produced from this archive.
    unpack_xattrs: bool,
    /// Option copied into each entry produced from this archive.
    preserve_permissions: bool,
    /// Option copied into each entry produced from this archive.
    preserve_mtime: bool,
    /// Option copied into each entry produced from this archive.
    allow_external_symlinks: bool,
    /// Option copied into each entry produced from this archive.
    overwrite: bool,
    /// When `false`, an all-zero header block ends iteration; when `true`,
    /// such blocks are skipped and reading continues.
    ignore_zeros: bool,
    /// The underlying reader, locked for the duration of each read.
    obj: Mutex<R>,
}
52
/// Collects the reader and the unpacking options used to construct an
/// `Archive`.
pub struct ArchiveBuilder<R: Read + Unpin> {
    /// The reader the archive will be read from.
    obj: R,
    /// Option forwarded to the built archive.
    unpack_xattrs: bool,
    /// Option forwarded to the built archive.
    preserve_permissions: bool,
    /// Option forwarded to the built archive.
    preserve_mtime: bool,
    /// Option forwarded to the built archive.
    allow_external_symlinks: bool,
    /// Option forwarded to the built archive.
    overwrite: bool,
    /// Option forwarded to the built archive.
    ignore_zeros: bool,
}
63
64impl<R: Read + Unpin> ArchiveBuilder<R> {
65 pub fn new(obj: R) -> Self {
67 ArchiveBuilder {
68 unpack_xattrs: false,
69 preserve_permissions: false,
70 preserve_mtime: true,
71 allow_external_symlinks: true,
72 overwrite: true,
73 ignore_zeros: false,
74 obj,
75 }
76 }
77
78 pub fn set_unpack_xattrs(mut self, unpack_xattrs: bool) -> Self {
86 self.unpack_xattrs = unpack_xattrs;
87 self
88 }
89
90 pub fn set_preserve_permissions(mut self, preserve: bool) -> Self {
96 self.preserve_permissions = preserve;
97 self
98 }
99
100 pub fn set_overwrite(mut self, overwrite: bool) -> Self {
102 self.overwrite = overwrite;
103 self
104 }
105
106 pub fn set_preserve_mtime(mut self, preserve: bool) -> Self {
111 self.preserve_mtime = preserve;
112 self
113 }
114
115 pub fn set_ignore_zeros(mut self, ignore_zeros: bool) -> Self {
120 self.ignore_zeros = ignore_zeros;
121 self
122 }
123
124 pub fn set_allow_external_symlinks(mut self, allow_external_symlinks: bool) -> Self {
130 self.allow_external_symlinks = allow_external_symlinks;
131 self
132 }
133
134 pub fn build(self) -> Archive<R> {
136 let Self {
137 unpack_xattrs,
138 preserve_permissions,
139 preserve_mtime,
140 allow_external_symlinks,
141 overwrite,
142 ignore_zeros,
143 obj,
144 } = self;
145
146 Archive {
147 inner: Arc::new(ArchiveInner {
148 unpack_xattrs,
149 preserve_permissions,
150 preserve_mtime,
151 allow_external_symlinks,
152 overwrite,
153 ignore_zeros,
154 obj: Mutex::new(obj),
155 pos: 0.into(),
156 }),
157 }
158 }
159}
160
161impl<R: Read + Unpin> Archive<R> {
162 pub fn new(obj: R) -> Archive<R> {
164 Archive {
165 inner: Arc::new(ArchiveInner {
166 unpack_xattrs: false,
167 preserve_permissions: false,
168 preserve_mtime: true,
169 allow_external_symlinks: true,
170 overwrite: true,
171 ignore_zeros: false,
172 obj: Mutex::new(obj),
173 pos: 0.into(),
174 }),
175 }
176 }
177
178 pub fn into_inner(self) -> Result<R, Self> {
180 let Self { inner } = self;
181
182 match Arc::try_unwrap(inner) {
183 Ok(inner) => Ok(inner.obj.into_inner()),
184 Err(inner) => Err(Self { inner }),
185 }
186 }
187
188 pub fn entries(&mut self) -> io::Result<Entries<R>> {
195 if self.inner.pos.load(Ordering::SeqCst) != 0 {
196 return Err(other(
197 "cannot call entries unless archive is at \
198 position 0",
199 ));
200 }
201
202 Ok(Entries {
203 archive: self.clone(),
204 pending: None,
205 current: (0, None, 0, None, None),
206 gnu_longlink: (false, None),
207 gnu_longname: (false, None),
208 pax_extensions: (false, None),
209 })
210 }
211
212 pub fn entries_raw(&mut self) -> io::Result<RawEntries<R>> {
219 if self.inner.pos.load(Ordering::SeqCst) != 0 {
220 return Err(other(
221 "cannot call entries_raw unless archive is at \
222 position 0",
223 ));
224 }
225
226 Ok(RawEntries {
227 archive: self.clone(),
228 current: (0, None, 0),
229 })
230 }
231
232 pub async fn unpack<P: AsRef<Path>>(&mut self, dst: P) -> io::Result<()> {
256 let mut entries = self.entries()?;
257 let mut pinned = Pin::new(&mut entries);
258 let dst = dst.as_ref();
259
260 if fs::symlink_metadata(dst).await.is_err() {
261 fs::create_dir_all(&dst)
262 .await
263 .map_err(|e| TarError::new(format!("failed to create `{}`", dst.display()), e))?;
264 }
265
266 let dst = fs::canonicalize(dst).await?;
272
273 let mut targets = FxHashSet::default();
275
276 let mut directories = Vec::new();
280 while let Some(entry) = pinned.next().await {
281 let mut file = entry.map_err(|e| TarError::new("failed to iterate over archive", e))?;
282 if file.header().entry_type() == crate::EntryType::Directory {
283 directories.push(file);
284 } else {
285 file.unpack_in_raw(&dst, &mut targets).await?;
286 }
287 }
288
289 directories.sort_by(|a, b| b.path_bytes().cmp(&a.path_bytes()));
297 for mut dir in directories {
298 dir.unpack_in_raw(&dst, &mut targets).await?;
299 }
300
301 Ok(())
302 }
303}
304
/// Stream over the entries of an archive that folds GNU long-name, GNU
/// long-link and local PAX extension entries into the member that follows
/// them.
pub struct Entries<R: Read + Unpin> {
    /// Handle onto the archive being read.
    archive: Archive<R>,
    /// Raw-reader state, in order: offset of the next header, header
    /// currently being read, number of header bytes read so far, GNU extended
    /// sparse header currently being read, and the PAX extension data handed
    /// to the raw reader.
    current: (
        u64,
        Option<Header>,
        usize,
        Option<GnuExtSparseHeader>,
        Option<Vec<u8>>,
    ),
    /// Metadata entry whose body was only partially read when the previous
    /// poll returned `Pending`.
    pending: Option<Entry<Archive<R>>>,
    /// `(complete, bytes)` of a GNU long-name entry awaiting its member.
    gnu_longname: (bool, Option<Vec<u8>>),
    /// `(complete, bytes)` of a GNU long-link entry awaiting its member.
    gnu_longlink: (bool, Option<Vec<u8>>),
    /// `(complete, bytes)` of a local PAX extensions entry awaiting its member.
    pax_extensions: (bool, Option<Vec<u8>>),
}
336
/// Unwraps a `Poll<Option<Result<T, E>>>` inside a function that itself
/// returns `Poll<Option<Result<_, E>>>`: yields `T`, and returns early from
/// the enclosing function on `Pending`, end of stream, or error.
macro_rules! ready_opt_err {
    ($val:expr) => {
        match $val {
            Poll::Pending => return Poll::Pending,
            Poll::Ready(None) => return Poll::Ready(None),
            Poll::Ready(Some(Err(e))) => return Poll::Ready(Some(Err(e))),
            Poll::Ready(Some(Ok(value))) => value,
        }
    };
}
346
/// Unwraps a `Poll<Result<T, E>>` inside a function that returns
/// `Poll<Option<Result<_, E>>>`: yields `T`, and returns early from the
/// enclosing function on `Pending` or error.
macro_rules! ready_err {
    ($val:expr) => {
        match $val {
            Poll::Pending => return Poll::Pending,
            Poll::Ready(Err(e)) => return Poll::Ready(Some(Err(e))),
            Poll::Ready(Ok(value)) => value,
        }
    };
}
355
356impl<R: Read + Unpin> Stream for Entries<R> {
357 type Item = io::Result<Entry<Archive<R>>>;
358
359 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
360 loop {
361 let archive = self.archive.clone();
362
363 let entry = if let Some(entry) = self.pending.take() {
364 entry
365 } else {
366 let (next, current_header, current_header_pos, _, pax_extensions) =
367 &mut self.current;
368 ready_opt_err!(poll_next_raw(
369 archive,
370 next,
371 current_header,
372 current_header_pos,
373 cx,
374 pax_extensions.as_deref(),
375 ))
376 };
377
378 let is_recognized_header =
379 entry.header().as_gnu().is_some() || entry.header().as_ustar().is_some();
380
381 if is_recognized_header && entry.header().entry_type().is_gnu_longname() {
382 if self.gnu_longname.0 {
383 return Poll::Ready(Some(Err(other(
384 "two long name entries describing \
385 the same member",
386 ))));
387 }
388
389 let mut ef = EntryFields::from(entry);
390 let cursor = self.gnu_longname.1.get_or_insert_with(|| {
391 let cap = cmp::min(ef.size, 128 * 1024);
392 Vec::with_capacity(cap as usize)
393 });
394 if let Poll::Ready(result) = Pin::new(&mut ef).poll_read_all(cx, cursor) {
395 if let Err(err) = result {
396 return Poll::Ready(Some(Err(err)));
397 }
398 } else {
399 self.pending = Some(ef.into_entry());
400 return Poll::Pending;
401 }
402
403 self.gnu_longname.0 = true;
404 continue;
405 }
406
407 if is_recognized_header && entry.header().entry_type().is_gnu_longlink() {
408 if self.gnu_longlink.0 {
409 return Poll::Ready(Some(Err(other(
410 "two long name entries describing \
411 the same member",
412 ))));
413 }
414
415 let mut ef = EntryFields::from(entry);
416 let cursor = self.gnu_longlink.1.get_or_insert_with(|| {
417 let cap = cmp::min(ef.size, 128 * 1024);
418 Vec::with_capacity(cap as usize)
419 });
420 if let Poll::Ready(result) = Pin::new(&mut ef).poll_read_all(cx, cursor) {
421 if let Err(err) = result {
422 return Poll::Ready(Some(Err(err)));
423 }
424 } else {
425 self.pending = Some(ef.into_entry());
426 return Poll::Pending;
427 }
428
429 self.gnu_longlink.0 = true;
430 continue;
431 }
432
433 if is_recognized_header && entry.header().entry_type().is_pax_local_extensions() {
434 if self.pax_extensions.0 {
435 return Poll::Ready(Some(Err(other(
436 "two pax extensions entries describing \
437 the same member",
438 ))));
439 }
440
441 let mut ef = EntryFields::from(entry);
442 let cursor = self.pax_extensions.1.get_or_insert_with(|| {
443 let cap = cmp::min(ef.size, 128 * 1024);
444 Vec::with_capacity(cap as usize)
445 });
446 if let Poll::Ready(result) = Pin::new(&mut ef).poll_read_all(cx, cursor) {
447 if let Err(err) = result {
448 return Poll::Ready(Some(Err(err)));
449 }
450 } else {
451 self.pending = Some(ef.into_entry());
452 return Poll::Pending;
453 }
454
455 self.pax_extensions.0 = true;
456 self.current.4 = self.pax_extensions.1.clone();
457 continue;
458 }
459
460 let mut fields = EntryFields::from(entry);
461 if self.gnu_longname.0 {
462 fields.long_pathname = self.gnu_longname.1.take();
463 self.gnu_longname.0 = false;
464 }
465 if self.gnu_longlink.0 {
466 fields.long_linkname = self.gnu_longlink.1.take();
467 self.gnu_longlink.0 = false;
468 }
469 if self.pax_extensions.0 {
470 fields.pax_extensions = self.pax_extensions.1.take();
471 self.pax_extensions.0 = false;
472 }
473
474 let archive = self.archive.clone();
475 let (next, _, current_pos, current_ext, _pax_extensions) = &mut self.current;
476
477 ready_err!(poll_parse_sparse_header(
478 archive,
479 next,
480 current_ext,
481 current_pos,
482 &mut fields,
483 cx,
484 ));
485
486 return Poll::Ready(Some(Ok(fields.into_entry())));
487 }
488 }
489}
490
/// Stream over the raw entries of an archive; every header found in the
/// archive is yielded as its own entry.
pub struct RawEntries<R: Read + Unpin> {
    /// Handle onto the archive being read.
    archive: Archive<R>,
    /// Raw-reader state, in order: offset of the next header, header
    /// currently being read, and number of header bytes read so far.
    current: (u64, Option<Header>, usize),
}
496
497impl<R: Read + Unpin> Stream for RawEntries<R> {
498 type Item = io::Result<Entry<Archive<R>>>;
499
500 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
501 let archive = self.archive.clone();
502 let (next, current_header, current_header_pos) = &mut self.current;
503 poll_next_raw(archive, next, current_header, current_header_pos, cx, None)
504 }
505}
506
507fn poll_next_raw<R: Read + Unpin>(
508 mut archive: Archive<R>,
509 next: &mut u64,
510 current_header: &mut Option<Header>,
511 current_header_pos: &mut usize,
512 cx: &mut Context<'_>,
513 pax_extensions_data: Option<&[u8]>,
514) -> Poll<Option<io::Result<Entry<Archive<R>>>>> {
515 let mut header_pos = *next;
516
517 loop {
518 if current_header.is_none() {
520 let delta = *next - archive.inner.pos.load(Ordering::SeqCst);
521 match futures_core::ready!(poll_skip(&mut archive, cx, delta)) {
522 Ok(_) => {}
523 Err(err) => return Poll::Ready(Some(Err(err))),
524 }
525
526 *current_header = Some(Header::new_old());
527 *current_header_pos = 0;
528 }
529
530 let header = current_header.as_mut().unwrap();
531
532 match futures_core::ready!(poll_try_read_all(
534 &mut archive,
535 cx,
536 header.as_mut_bytes(),
537 current_header_pos,
538 )) {
539 Ok(true) => {}
540 Ok(false) => return Poll::Ready(None),
541 Err(err) => return Poll::Ready(Some(Err(err))),
542 }
543
544 if !header.as_bytes().iter().all(|i| *i == 0) {
548 *next += BLOCK_SIZE;
549 break;
550 }
551
552 if !archive.inner.ignore_zeros {
553 return Poll::Ready(None);
554 }
555
556 *next += BLOCK_SIZE;
557 header_pos = *next;
558 }
559
560 let header = current_header.as_mut().unwrap();
561
562 let sum = header.as_bytes()[..148]
564 .iter()
565 .chain(&header.as_bytes()[156..])
566 .fold(0, |a, b| a + (*b as u32))
567 + 8 * 32;
568 let cksum = header.cksum()?;
569 if sum != cksum {
570 return Poll::Ready(Some(Err(other("archive header checksum mismatch"))));
571 }
572
573 let file_pos = *next;
574
575 let mut header = current_header.take().unwrap();
576
577 let mut size = header.entry_size()?;
579
580 if let Some(pax) = pax_extensions_data.map(pax_extensions) {
583 for extension in pax {
584 let extension = extension?;
585
586 let Ok(key) = extension.key() else {
589 continue;
590 };
591
592 match key {
593 "size" => {
594 let size_str = extension
595 .value()
596 .map_err(|_e| other("failed to parse pax size as string"))?;
597 size = size_str
598 .parse::<u64>()
599 .map_err(|_e| other("failed to parse pax size"))?;
600 }
601
602 "uid" => {
603 let uid_str = extension
604 .value()
605 .map_err(|_e| other("failed to parse pax uid as string"))?;
606 header.set_uid(
607 uid_str
608 .parse::<u64>()
609 .map_err(|_e| other("failed to parse pax uid"))?,
610 );
611 }
612
613 "gid" => {
614 let gid_str = extension
615 .value()
616 .map_err(|_e| other("failed to parse pax gid as string"))?;
617 header.set_gid(
618 gid_str
619 .parse::<u64>()
620 .map_err(|_e| other("failed to parse pax gid"))?,
621 );
622 }
623
624 _ => {
625 continue;
626 }
627 }
628 }
629 }
630
631 let mut data = VecDeque::with_capacity(1);
632 data.push_back(EntryIo::Data(archive.clone().take(size)));
633
634 let ret = EntryFields {
635 size,
636 header_pos,
637 file_pos,
638 data,
639 header,
640 long_pathname: None,
641 long_linkname: None,
642 pax_extensions: None,
643 unpack_xattrs: archive.inner.unpack_xattrs,
644 preserve_permissions: archive.inner.preserve_permissions,
645 preserve_mtime: archive.inner.preserve_mtime,
646 overwrite: archive.inner.overwrite,
647 allow_external_symlinks: archive.inner.allow_external_symlinks,
648 read_state: None,
649 };
650
651 let size = size
654 .checked_add(BLOCK_SIZE - 1)
655 .ok_or_else(|| other("size overflow"))?;
656 *next = next
657 .checked_add(size & !(BLOCK_SIZE - 1))
658 .ok_or_else(|| other("size overflow"))?;
659
660 Poll::Ready(Some(Ok(ret.into_entry())))
661}
662
663fn poll_parse_sparse_header<R: Read + Unpin>(
664 mut archive: Archive<R>,
665 next: &mut u64,
666 current_ext: &mut Option<GnuExtSparseHeader>,
667 current_ext_pos: &mut usize,
668 entry: &mut EntryFields<Archive<R>>,
669 cx: &mut Context<'_>,
670) -> Poll<io::Result<()>> {
671 if !entry.header.entry_type().is_gnu_sparse() {
672 return Poll::Ready(Ok(()));
673 }
674
675 let gnu = match entry.header.as_gnu() {
676 Some(gnu) => gnu,
677 None => return Poll::Ready(Err(other("sparse entry type listed but not GNU header"))),
678 };
679
680 entry.data.truncate(0);
700
701 let mut cur = 0;
702 let mut remaining = entry.size;
703 {
704 let data = &mut entry.data;
705 let reader = archive.clone();
706 let size = entry.size;
707 let mut add_block = |block: &GnuSparseHeader| -> io::Result<_> {
708 if block.is_empty() {
709 return Ok(());
710 }
711 let off = block.offset()?;
712 let len = block.length()?;
713
714 if len != 0 && (size - remaining) % BLOCK_SIZE != 0 {
715 return Err(other(
716 "previous block in sparse file was not \
717 aligned to 512-byte boundary",
718 ));
719 } else if off < cur {
720 return Err(other(
721 "out of order or overlapping sparse \
722 blocks",
723 ));
724 } else if cur < off {
725 let block = io::repeat(0).take(off - cur);
726 data.push_back(EntryIo::Pad(block));
727 }
728 cur = off
729 .checked_add(len)
730 .ok_or_else(|| other("more bytes listed in sparse file than u64 can hold"))?;
731 remaining = remaining.checked_sub(len).ok_or_else(|| {
732 other(
733 "sparse file consumed more data than the header \
734 listed",
735 )
736 })?;
737 data.push_back(EntryIo::Data(reader.clone().take(len)));
738 Ok(())
739 };
740 for block in gnu.sparse.iter() {
741 add_block(block)?
742 }
743 if gnu.is_extended() {
744 let started_header = current_ext.is_some();
745 if !started_header {
746 let mut ext = GnuExtSparseHeader::new();
747 ext.isextended[0] = 1;
748 *current_ext = Some(ext);
749 *current_ext_pos = 0;
750 }
751
752 let ext = current_ext.as_mut().unwrap();
753 while ext.is_extended() {
754 match futures_core::ready!(poll_try_read_all(
755 &mut archive,
756 cx,
757 ext.as_mut_bytes(),
758 current_ext_pos,
759 )) {
760 Ok(true) => {}
761 Ok(false) => return Poll::Ready(Err(other("failed to read extension"))),
762 Err(err) => return Poll::Ready(Err(err)),
763 }
764
765 *next += BLOCK_SIZE;
766 for block in ext.sparse.iter() {
767 add_block(block)?;
768 }
769 }
770 }
771 }
772 if cur != gnu.real_size()? {
773 return Poll::Ready(Err(other(
774 "mismatch in sparse file chunks and \
775 size in header",
776 )));
777 }
778 entry.size = cur;
779 if remaining > 0 {
780 return Poll::Ready(Err(other(
781 "mismatch in sparse file chunks and \
782 entry size in header",
783 )));
784 }
785
786 Poll::Ready(Ok(()))
787}
788
789impl<R: Read + Unpin> Read for Archive<R> {
790 fn poll_read(
791 self: Pin<&mut Self>,
792 cx: &mut Context<'_>,
793 into: &mut io::ReadBuf<'_>,
794 ) -> Poll<io::Result<()>> {
795 let mut r = if let Ok(v) = self.inner.obj.try_lock() {
796 v
797 } else {
798 return Poll::Pending;
799 };
800
801 let res = futures_core::ready!(Pin::new(&mut *r).poll_read(cx, into));
802 match res {
803 Ok(()) => {
804 self.inner
805 .pos
806 .fetch_add(into.filled().len() as u64, Ordering::SeqCst);
807 Poll::Ready(Ok(()))
808 }
809 Err(err) => Poll::Ready(Err(err)),
810 }
811 }
812}
813
814fn poll_try_read_all<R: Read + Unpin>(
819 mut source: R,
820 cx: &mut Context<'_>,
821 buf: &mut [u8],
822 pos: &mut usize,
823) -> Poll<io::Result<bool>> {
824 while *pos < buf.len() {
825 let mut read_buf = io::ReadBuf::new(&mut buf[*pos..]);
826 match futures_core::ready!(Pin::new(&mut source).poll_read(cx, &mut read_buf)) {
827 Ok(()) if read_buf.filled().is_empty() => {
828 if *pos == 0 {
829 return Poll::Ready(Ok(false));
830 }
831
832 return Poll::Ready(Err(other("failed to read entire block")));
833 }
834 Ok(()) => *pos += read_buf.filled().len(),
835 Err(err) => return Poll::Ready(Err(err)),
836 }
837 }
838
839 *pos = 0;
840 Poll::Ready(Ok(true))
841}
842
843fn poll_skip<R: Read + Unpin>(
845 mut source: R,
846 cx: &mut Context<'_>,
847 mut amt: u64,
848) -> Poll<io::Result<()>> {
849 let mut buf = [0u8; 4096 * 8];
850 while amt > 0 {
851 let n = cmp::min(amt, buf.len() as u64);
852 let mut read_buf = io::ReadBuf::new(&mut buf[..n as usize]);
853 match futures_core::ready!(Pin::new(&mut source).poll_read(cx, &mut read_buf)) {
854 Ok(()) if read_buf.filled().is_empty() => {
855 return Poll::Ready(Err(other("unexpected EOF during skip")));
856 }
857 Ok(()) => {
858 amt -= read_buf.filled().len() as u64;
859 }
860 Err(err) => return Poll::Ready(Err(err)),
861 }
862 }
863
864 Poll::Ready(Ok(()))
865}