wasmtime_internal_cache/
worker.rs

1//! Background worker that watches over the cache.
2//!
3//! It cleans up old cache, updates statistics and optimizes the cache.
4//! We allow losing some messages (it doesn't hurt) and some races,
//! but we guarantee eventual consistency and fault tolerance.
6//! Background tasks can be CPU intensive, but the worker thread has low priority.
7
8#![cfg_attr(
9    not(test),
10    expect(
11        clippy::useless_conversion,
12        reason = "cfg(test) and cfg(not(test)) have a different definition \
13                  of `SystemTime`, so conversions below are needed in \
14                  one mode but not the other, just ignore the lint in this \
15                  module in not(test) mode where the conversion isn't required",
16    )
17)]
18
19use super::{CacheConfig, fs_write_atomic};
20use log::{debug, info, trace, warn};
21use serde_derive::{Deserialize, Serialize};
22use std::cmp;
23use std::collections::HashMap;
24use std::ffi::OsStr;
25use std::fmt;
26use std::fs;
27use std::path::{Path, PathBuf};
28use std::sync::mpsc::{Receiver, SyncSender, sync_channel};
29#[cfg(test)]
30use std::sync::{Arc, Condvar, Mutex};
31use std::thread;
32use std::time::Duration;
33#[cfg(not(test))]
34use std::time::SystemTime;
35#[cfg(test)]
36use tests::system_time_stub::SystemTimeStub as SystemTime;
37
/// Handle through which the cache notifies the background worker thread.
///
/// Events travel over a bounded channel (see `start_new`); cloning the handle
/// clones the sending half of that channel.
#[derive(Clone)]
pub(super) struct Worker {
    /// Sending half of the event channel consumed by `WorkerThread::run`.
    sender: SyncSender<CacheEvent>,
    /// Test-only counters shared with the worker thread, paired with the
    /// condition variable the worker signals after each handled event.
    #[cfg(test)]
    stats: Arc<(Mutex<WorkerStats>, Condvar)>,
}
44
/// State owned by the background thread spawned in `Worker::start_new`.
struct WorkerThread {
    /// Receiving half of the event channel; the thread loops over it in `run`.
    receiver: Receiver<CacheEvent>,
    /// Private copy of the cache configuration (cloned when the thread is created).
    cache_config: CacheConfig,
    /// Test-only counters shared with the `Worker` handle.
    #[cfg(test)]
    stats: Arc<(Mutex<WorkerStats>, Condvar)>,
}
51
/// Test-only bookkeeping of how many events were sent, dropped and handled.
#[cfg(test)]
#[derive(Default)]
struct WorkerStats {
    /// Events that could not be queued (`try_send` returned an error).
    dropped: u32,
    /// Events successfully queued for the worker thread.
    sent: u32,
    /// Events the worker thread has finished processing.
    handled: u32,
}
59
/// Message sent from a `Worker` handle to the worker thread.
///
/// Each variant carries the path of the cache file the event refers to.
#[derive(Debug, Clone)]
enum CacheEvent {
    /// Dispatched to `WorkerThread::handle_on_cache_get`.
    OnCacheGet(PathBuf),
    /// Dispatched to `WorkerThread::handle_on_cache_update`.
    OnCacheUpdate(PathBuf),
}
65
66impl Worker {
67    pub(super) fn start_new(cache_config: &CacheConfig) -> Self {
68        let queue_size = match cache_config.worker_event_queue_size() {
69            num if num <= usize::max_value() as u64 => num as usize,
70            _ => usize::max_value(),
71        };
72        let (tx, rx) = sync_channel(queue_size);
73
74        #[cfg(test)]
75        let stats = Arc::new((Mutex::new(WorkerStats::default()), Condvar::new()));
76
77        let worker_thread = WorkerThread {
78            receiver: rx,
79            cache_config: cache_config.clone(),
80            #[cfg(test)]
81            stats: stats.clone(),
82        };
83
84        // when self is dropped, sender will be dropped, what will cause the channel
85        // to hang, and the worker thread to exit -- it happens in the tests
86        // non-tests binary has only a static worker, so Rust doesn't drop it
87        thread::spawn(move || worker_thread.run());
88
89        Self {
90            sender: tx,
91            #[cfg(test)]
92            stats,
93        }
94    }
95
96    pub(super) fn on_cache_get_async(&self, path: impl AsRef<Path>) {
97        let event = CacheEvent::OnCacheGet(path.as_ref().to_path_buf());
98        self.send_cache_event(event);
99    }
100
101    pub(super) fn on_cache_update_async(&self, path: impl AsRef<Path>) {
102        let event = CacheEvent::OnCacheUpdate(path.as_ref().to_path_buf());
103        self.send_cache_event(event);
104    }
105
106    #[inline]
107    fn send_cache_event(&self, event: CacheEvent) {
108        let sent_event = self.sender.try_send(event.clone());
109
110        if let Err(ref err) = sent_event {
111            info!(
112                "Failed to send asynchronously message to worker thread, \
113                 event: {event:?}, error: {err}"
114            );
115        }
116
117        #[cfg(test)]
118        {
119            let mut stats = self
120                .stats
121                .0
122                .lock()
123                .expect("Failed to acquire worker stats lock");
124
125            if sent_event.is_ok() {
126                stats.sent += 1;
127            } else {
128                stats.dropped += 1;
129            }
130        }
131    }
132
133    #[cfg(test)]
134    pub(super) fn events_dropped(&self) -> u32 {
135        let stats = self
136            .stats
137            .0
138            .lock()
139            .expect("Failed to acquire worker stats lock");
140        stats.dropped
141    }
142
143    #[cfg(test)]
144    pub(super) fn wait_for_all_events_handled(&self) {
145        let (stats, condvar) = &*self.stats;
146        let mut stats = stats.lock().expect("Failed to acquire worker stats lock");
147        while stats.handled != stats.sent {
148            stats = condvar
149                .wait(stats)
150                .expect("Failed to reacquire worker stats lock");
151        }
152    }
153}
154
155impl fmt::Debug for Worker {
156    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
157        f.debug_struct("Worker").finish()
158    }
159}
160
/// Contents of the `.stats` file kept next to a cached module
/// (read by `read_stats_file`, written by `write_stats_file`).
#[derive(Serialize, Deserialize)]
struct ModuleCacheStatistics {
    /// Usage counter; incremented by the worker on every cache-get event.
    pub usages: u64,
    /// Compression level recorded for the cache file; stored under the key
    /// `optimized-compression` in the serialized form.
    #[serde(rename = "optimized-compression")]
    pub compression_level: i32,
}
167
168impl ModuleCacheStatistics {
169    fn default(cache_config: &CacheConfig) -> Self {
170        Self {
171            usages: 0,
172            compression_level: cache_config.baseline_compression_level(),
173        }
174    }
175}
176
/// One item found while listing the cache directory
/// (see `WorkerThread::list_cache_contents`).
enum CacheEntry {
    /// A module cache file the worker knows how to account for.
    Recognized {
        /// Path of the module cache file.
        path: PathBuf,
        /// Modification time used to order entries by age during cleanup
        /// (taken from the stats file when available, else from the module file).
        mtime: SystemTime,
        /// Size of the module cache file in bytes.
        size: u64,
    },
    /// Anything else found in the cache directory; always deleted by cleanup.
    Unrecognized {
        /// Path of the file or directory.
        path: PathBuf,
        /// Whether `path` is to be removed as a directory rather than a file.
        is_dir: bool,
    },
}
188
// Unwraps a `Result`, or logs a warning and runs a fallback statement.
//
// - `$result`: the `Result` to unwrap; on `Ok(val)` the macro evaluates to `val`.
// - `$cont`: statement executed on `Err` after logging; callers pass
//   something that leaves the current flow (`return`, `continue`, or a block
//   ending in one of them).
// - `$err_msg`: message prefix for the warning.
// - `$path`: value with a `.display()` method, included in the warning.
macro_rules! unwrap_or_warn {
    ($result:expr, $cont:stmt, $err_msg:expr, $path:expr) => {
        match $result {
            Ok(val) => val,
            Err(err) => {
                warn!("{}, path: {}, msg: {}", $err_msg, $path.display(), err);
                $cont
            }
        }
    };
}
200
201impl WorkerThread {
202    fn run(self) {
203        debug!("Cache worker thread started.");
204
205        Self::lower_thread_priority();
206
207        #[cfg(test)]
208        let (stats, condvar) = &*self.stats;
209
210        for event in self.receiver.iter() {
211            match event {
212                CacheEvent::OnCacheGet(path) => self.handle_on_cache_get(path),
213                CacheEvent::OnCacheUpdate(path) => self.handle_on_cache_update(path),
214            }
215
216            #[cfg(test)]
217            {
218                let mut stats = stats.lock().expect("Failed to acquire worker stats lock");
219                stats.handled += 1;
220                condvar.notify_all();
221            }
222        }
223    }
224
    /// Fuchsia variant: does not change the priority, only logs a warning
    /// saying so.
    #[cfg(target_os = "fuchsia")]
    fn lower_thread_priority() {
        // TODO This needs to use Fuchsia thread profiles
        // https://fuchsia.dev/fuchsia-src/reference/kernel_objects/profile
        warn!(
            "Lowering thread priority on Fuchsia is currently a noop. It might affect application performance."
        );
    }
233
234    #[cfg(target_os = "windows")]
235    fn lower_thread_priority() {
236        use windows_sys::Win32::System::Threading::*;
237
238        // https://docs.microsoft.com/en-us/windows/win32/api/processthreadsapi/nf-processthreadsapi-setthreadpriority
239        // https://docs.microsoft.com/en-us/windows/win32/procthread/scheduling-priorities
240
241        if unsafe { SetThreadPriority(GetCurrentThread(), THREAD_MODE_BACKGROUND_BEGIN) } == 0 {
242            warn!(
243                "Failed to lower worker thread priority. It might affect application performance."
244            );
245        }
246    }
247
248    #[cfg(not(any(target_os = "windows", target_os = "fuchsia")))]
249    fn lower_thread_priority() {
250        // http://man7.org/linux/man-pages/man7/sched.7.html
251
252        const NICE_DELTA_FOR_BACKGROUND_TASKS: i32 = 3;
253
254        match rustix::process::nice(NICE_DELTA_FOR_BACKGROUND_TASKS) {
255            Ok(current_nice) => {
256                debug!("New nice value of worker thread: {current_nice}");
257            }
258            Err(err) => {
259                warn!(
260                    "Failed to lower worker thread priority ({err:?}). It might affect application performance."
261                );
262            }
263        };
264    }
265
266    /// Increases the usage counter and recompresses the file
267    /// if the usage counter reached configurable threshold.
268    fn handle_on_cache_get(&self, path: PathBuf) {
269        trace!("handle_on_cache_get() for path: {}", path.display());
270
271        // construct .stats file path
272        let filename = path.file_name().unwrap().to_str().unwrap();
273        let stats_path = path.with_file_name(format!("{filename}.stats"));
274
275        // load .stats file (default if none or error)
276        let mut stats = read_stats_file(stats_path.as_ref())
277            .unwrap_or_else(|| ModuleCacheStatistics::default(&self.cache_config));
278
279        // step 1: update the usage counter & write to the disk
280        //         it's racy, but it's fine (the counter will be just smaller,
281        //         sometimes will retrigger recompression)
282        stats.usages += 1;
283        if !write_stats_file(stats_path.as_ref(), &stats) {
284            return;
285        }
286
287        // step 2: recompress if there's a need
288        let opt_compr_lvl = self.cache_config.optimized_compression_level();
289        if stats.compression_level >= opt_compr_lvl
290            || stats.usages
291                < self
292                    .cache_config
293                    .optimized_compression_usage_counter_threshold()
294        {
295            return;
296        }
297
298        let lock_path = if let Some(p) = acquire_task_fs_lock(
299            path.as_ref(),
300            self.cache_config.optimizing_compression_task_timeout(),
301            self.cache_config
302                .allowed_clock_drift_for_files_from_future(),
303        ) {
304            p
305        } else {
306            return;
307        };
308
309        trace!("Trying to recompress file: {}", path.display());
310
311        // recompress, write to other file, rename (it's atomic file content exchange)
312        // and update the stats file
313        let compressed_cache_bytes = unwrap_or_warn!(
314            fs::read(&path),
315            return,
316            "Failed to read old cache file",
317            path
318        );
319
320        let cache_bytes = unwrap_or_warn!(
321            zstd::decode_all(&compressed_cache_bytes[..]),
322            return,
323            "Failed to decompress cached code",
324            path
325        );
326
327        let recompressed_cache_bytes = unwrap_or_warn!(
328            zstd::encode_all(&cache_bytes[..], opt_compr_lvl),
329            return,
330            "Failed to compress cached code",
331            path
332        );
333
334        unwrap_or_warn!(
335            fs::write(&lock_path, &recompressed_cache_bytes),
336            return,
337            "Failed to write recompressed cache",
338            lock_path
339        );
340
341        unwrap_or_warn!(
342            fs::rename(&lock_path, &path),
343            {
344                if let Err(error) = fs::remove_file(&lock_path) {
345                    warn!(
346                        "Failed to clean up (remove) recompressed cache, path {}, err: {}",
347                        lock_path.display(),
348                        error
349                    );
350                }
351
352                return;
353            },
354            "Failed to rename recompressed cache",
355            lock_path
356        );
357
358        // update stats file (reload it! recompression can take some time)
359        if let Some(mut new_stats) = read_stats_file(stats_path.as_ref()) {
360            if new_stats.compression_level >= opt_compr_lvl {
361                // Rare race:
362                //    two instances with different opt_compr_lvl: we don't know in which order they updated
363                //    the cache file and the stats file (they are not updated together atomically)
364                // Possible solution is to use directories per cache entry, but it complicates the system
365                // and is not worth it.
366                debug!(
367                    "DETECTED task did more than once (or race with new file): \
368                     recompression of {}. Note: if optimized compression level setting \
369                     has changed in the meantine, the stats file might contain \
370                     inconsistent compression level due to race.",
371                    path.display()
372                );
373            } else {
374                new_stats.compression_level = opt_compr_lvl;
375                let _ = write_stats_file(stats_path.as_ref(), &new_stats);
376            }
377
378            if new_stats.usages < stats.usages {
379                debug!(
380                    "DETECTED lower usage count (new file or race with counter \
381                     increasing): file {}",
382                    path.display()
383                );
384            }
385        } else {
386            debug!(
387                "Can't read stats file again to update compression level (it might got \
388                 cleaned up): file {}",
389                stats_path.display()
390            );
391        }
392
393        trace!("Task finished: recompress file: {}", path.display());
394    }
395
396    fn directory(&self) -> &PathBuf {
397        self.cache_config
398            .directory()
399            .expect("CacheConfig should be validated before being passed to a WorkerThread")
400    }
401
    /// Handles a cache-update event for the module cache file at `path`.
    ///
    /// Step 1 writes a fresh `.stats` file for the entry (one usage, baseline
    /// compression level). Step 2 runs the cache cleanup task, but only if the
    /// `.cleanup` task lock can be acquired: the cache is listed, sorted from
    /// youngest to oldest, and everything past the computed cut point is
    /// deleted. Unrecognized entries are always deleted.
    ///
    /// Panics if `path` has no file name or the name is not valid UTF-8.
    fn handle_on_cache_update(&self, path: PathBuf) {
        trace!("handle_on_cache_update() for path: {}", path.display());

        // ---------------------- step 1: create .stats file

        // construct .stats file path
        let filename = path
            .file_name()
            .expect("Expected valid cache file name")
            .to_str()
            .expect("Expected valid cache file name");
        let stats_path = path.with_file_name(format!("{filename}.stats"));

        // create and write stats file
        // (best effort: the result of the write is deliberately not checked)
        let mut stats = ModuleCacheStatistics::default(&self.cache_config);
        stats.usages += 1;
        write_stats_file(&stats_path, &stats);

        // ---------------------- step 2: perform cleanup task if needed

        // acquire lock for cleanup task
        // Lock is a proof of recent cleanup task, so we don't want to delete them.
        // Expired locks will be deleted by the cleanup task.
        let cleanup_file = self.directory().join(".cleanup"); // some non existing marker file
        if acquire_task_fs_lock(
            &cleanup_file,
            self.cache_config.cleanup_interval(),
            self.cache_config
                .allowed_clock_drift_for_files_from_future(),
        )
        .is_none()
        {
            return;
        }

        trace!("Trying to clean up cache");

        let mut cache_index = self.list_cache_contents();
        // mtimes later than this instant are considered to come "from the future"
        let future_tolerance = SystemTime::now()
            .checked_add(
                self.cache_config
                    .allowed_clock_drift_for_files_from_future(),
            )
            .expect("Brace your cache, the next Big Bang is coming (time overflow)");
        cache_index.sort_unstable_by(|lhs, rhs| {
            // sort by age
            use CacheEntry::*;
            match (lhs, rhs) {
                (Recognized { mtime: lhs_mt, .. }, Recognized { mtime: rhs_mt, .. }) => {
                    match (*lhs_mt > future_tolerance, *rhs_mt > future_tolerance) {
                        // later == younger
                        (false, false) => rhs_mt.cmp(lhs_mt),
                        // files from far future are treated as oldest recognized files
                        // we want to delete them, so the cache keeps track of recent files
                        // however, we don't delete them unconditionally,
                        // because .stats file can be overwritten with a meaningful mtime
                        (true, false) => cmp::Ordering::Greater,
                        (false, true) => cmp::Ordering::Less,
                        (true, true) => cmp::Ordering::Equal,
                    }
                }
                // unrecognized is kind of infinity
                (Recognized { .. }, Unrecognized { .. }) => cmp::Ordering::Less,
                (Unrecognized { .. }, Recognized { .. }) => cmp::Ordering::Greater,
                (Unrecognized { .. }, Unrecognized { .. }) => cmp::Ordering::Equal,
            }
        });

        // find "cut" boundary:
        // - remove unrecognized files anyway,
        // - remove some cache files if some quota has been exceeded
        let mut total_size = 0u64;
        let mut start_delete_idx = None;
        let mut start_delete_idx_if_deleting_recognized_items: Option<usize> = None;

        let total_size_limit = self.cache_config.files_total_size_soft_limit();
        let file_count_limit = self.cache_config.file_count_soft_limit();
        // Reduced limits (a percentage of the soft limits) that decide how much
        // is kept once deletion of recognized entries has been triggered.
        // Note: the `unwrap`s panic if the multiplication overflows `u64`.
        let tsl_if_deleting = total_size_limit
            .checked_mul(
                self.cache_config
                    .files_total_size_limit_percent_if_deleting() as u64,
            )
            .unwrap()
            / 100;
        let fcl_if_deleting = file_count_limit
            .checked_mul(self.cache_config.file_count_limit_percent_if_deleting() as u64)
            .unwrap()
            / 100;

        for (idx, item) in cache_index.iter().enumerate() {
            // Unrecognized entries are sorted last: the first one found marks
            // the start of the range to delete.
            let size = if let CacheEntry::Recognized { size, .. } = item {
                size
            } else {
                start_delete_idx = Some(idx);
                break;
            };

            total_size += size;
            // remember the first entry that exceeds the reduced limits
            if start_delete_idx_if_deleting_recognized_items.is_none()
                && (total_size > tsl_if_deleting || (idx + 1) as u64 > fcl_if_deleting)
            {
                start_delete_idx_if_deleting_recognized_items = Some(idx);
            }

            // a soft limit is exceeded: delete from the remembered entry onwards
            if total_size > total_size_limit || (idx + 1) as u64 > file_count_limit {
                start_delete_idx = start_delete_idx_if_deleting_recognized_items;
                break;
            }
        }

        if let Some(idx) = start_delete_idx {
            // delete everything from the cut point to the end; failures are only logged
            for item in &cache_index[idx..] {
                let (result, path, entity) = match item {
                    CacheEntry::Recognized { path, .. }
                    | CacheEntry::Unrecognized {
                        path,
                        is_dir: false,
                    } => (fs::remove_file(path), path, "file"),
                    CacheEntry::Unrecognized { path, is_dir: true } => {
                        (fs::remove_dir_all(path), path, "directory")
                    }
                };
                if let Err(err) = result {
                    warn!(
                        "Failed to remove {} during cleanup, path: {}, err: {}",
                        entity,
                        path.display(),
                        err
                    );
                }
            }
        }

        trace!("Task finished: clean up cache");
    }
537
    // Be fault tolerant: list as much as you can, and ignore the rest
    //
    // Walks the cache directory and returns one `CacheEntry` per item found.
    // Directory levels 0 and 1 are descended into; at level 2, files are
    // treated as module cache files, their `.stats` companions, or task locks.
    // Everything that does not fit this layout is reported as `Unrecognized`.
    fn list_cache_contents(&self) -> Vec<CacheEntry> {
        // Recursive helper: appends the entries found under `dir_path`
        // (which sits at nesting depth `level`) to `vec`.
        fn enter_dir(
            vec: &mut Vec<CacheEntry>,
            dir_path: &Path,
            level: u8,
            cache_config: &CacheConfig,
        ) {
            // Pushes an `Unrecognized` entry for a file or a directory.
            macro_rules! add_unrecognized {
                (file: $path:expr) => {
                    add_unrecognized!(false, $path)
                };
                (dir: $path:expr) => {
                    add_unrecognized!(true, $path)
                };
                ($is_dir:expr, $path:expr) => {
                    vec.push(CacheEntry::Unrecognized {
                        path: $path.to_path_buf(),
                        is_dir: $is_dir,
                    })
                };
            }
            // Pushes `Unrecognized` entries for all listed paths, then runs `$cont`.
            macro_rules! add_unrecognized_and {
                ([ $( $ty:ident: $path:expr ),* ], $cont:stmt) => {{
                    $( add_unrecognized!($ty: $path); )*
                        $cont
                }};
            }

            // `unwrap_or_warn!` with the directory level added to the message;
            // the reported path defaults to `dir_path`.
            macro_rules! unwrap_or {
                ($result:expr, $cont:stmt, $err_msg:expr) => {
                    unwrap_or!($result, $cont, $err_msg, dir_path)
                };
                ($result:expr, $cont:stmt, $err_msg:expr, $path:expr) => {
                    unwrap_or_warn!(
                        $result,
                        $cont,
                        format!("{}, level: {}", $err_msg, level),
                        $path
                    )
                };
            }

            // If we fail to list a directory, something bad is happening anyway
            // (something touches our cache or we have disk failure)
            // Try to delete it, so we can stay within soft limits of the cache size.
            // This comment applies later in this function, too.
            let it = unwrap_or!(
                fs::read_dir(dir_path),
                add_unrecognized_and!([dir: dir_path], return),
                "Failed to list cache directory, deleting it"
            );

            // module and stats files found at level 2, keyed by path
            let mut cache_files = HashMap::new();
            for entry in it {
                // read_dir() returns an iterator over results - in case some of them are errors
                // we don't know their names, so we can't delete them. We don't want to delete
                // the whole directory with good entries too, so we just ignore the erroneous entries.
                let entry = unwrap_or!(
                    entry,
                    continue,
                    "Failed to read a cache dir entry (NOT deleting it, it still occupies space)"
                );
                let path = entry.path();
                match (level, path.is_dir()) {
                    // directories at levels 0 and 1 are descended into
                    (0..=1, true) => enter_dir(vec, &path, level + 1, cache_config),
                    // plain files at levels 0 and 1 are unexpected, except for
                    // a still-active cleanup lock in the cache root
                    (0..=1, false) => {
                        if level == 0
                            && path.file_stem() == Some(OsStr::new(".cleanup"))
                                && path.extension().is_some()
                                // assume it's cleanup lock
                                && !is_fs_lock_expired(
                                    Some(&entry),
                                    &path,
                                    cache_config.cleanup_interval(),
                                    cache_config.allowed_clock_drift_for_files_from_future(),
                                )
                        {
                            continue; // skip active lock
                        }
                        add_unrecognized!(file: path);
                    }
                    (2, false) => {
                        match path.extension().and_then(OsStr::to_str) {
                            // mod or stats file
                            None | Some("stats") => {
                                cache_files.insert(path, entry);
                            }

                            Some(ext) => {
                                // check if valid lock
                                let recognized = ext.starts_with("wip-")
                                    && !is_fs_lock_expired(
                                        Some(&entry),
                                        &path,
                                        cache_config.optimizing_compression_task_timeout(),
                                        cache_config.allowed_clock_drift_for_files_from_future(),
                                    );

                                if !recognized {
                                    add_unrecognized!(file: path);
                                }
                            }
                        }
                    }
                    // anything else (e.g. a directory at level 2) is unrecognized
                    (_, is_dir) => add_unrecognized!(is_dir, path),
                }
            }

            // associate module with its stats & handle them
            // assumption: just mods and stats
            for (path, entry) in cache_files.iter() {
                let path_buf: PathBuf;
                // `mod_` / `stats_` are the (path, dir entry) pairs of the module
                // file and of its stats file, if present; `is_mod` tells which of
                // the two the current map key is.
                let (mod_, stats_, is_mod) = match path.extension() {
                    Some(_) => {
                        path_buf = path.with_extension("");
                        (
                            cache_files.get(&path_buf).map(|v| (&path_buf, v)),
                            Some((path, entry)),
                            false,
                        )
                    }
                    None => {
                        path_buf = path.with_extension("stats");
                        (
                            Some((path, entry)),
                            cache_files.get(&path_buf).map(|v| (&path_buf, v)),
                            true,
                        )
                    }
                };

                // construct a cache entry
                match (mod_, stats_, is_mod) {
                    // module with stats: size comes from the module file, mtime
                    // from the stats file (falling back to the module's mtime)
                    (Some((mod_path, mod_entry)), Some((stats_path, stats_entry)), true) => {
                        let mod_metadata = unwrap_or!(
                            mod_entry.metadata(),
                            add_unrecognized_and!([file: stats_path, file: mod_path], continue),
                            "Failed to get metadata, deleting BOTH module cache and stats files",
                            mod_path
                        );
                        let stats_mtime = unwrap_or!(
                            stats_entry.metadata().and_then(|m| m.modified()),
                            add_unrecognized_and!(
                                [file: stats_path],
                                unwrap_or!(
                                    mod_metadata.modified(),
                                    add_unrecognized_and!(
                                        [file: stats_path, file: mod_path],
                                        continue
                                    ),
                                    "Failed to get mtime, deleting BOTH module cache and stats \
                                     files",
                                    mod_path
                                )
                            ),
                            "Failed to get metadata/mtime, deleting the file",
                            stats_path
                        );
                        // .into() called for the SystemTimeStub if cfg(test)
                        vec.push(CacheEntry::Recognized {
                            path: mod_path.to_path_buf(),
                            mtime: stats_mtime.into(),
                            size: mod_metadata.len(),
                        })
                    }
                    (Some(_), Some(_), false) => (), // was or will be handled by previous branch
                    // module without a stats file: use the module's own mtime
                    (Some((mod_path, mod_entry)), None, _) => {
                        let (mod_metadata, mod_mtime) = unwrap_or!(
                            mod_entry
                                .metadata()
                                .and_then(|md| md.modified().map(|mt| (md, mt))),
                            add_unrecognized_and!([file: mod_path], continue),
                            "Failed to get metadata/mtime, deleting the file",
                            mod_path
                        );
                        // .into() called for the SystemTimeStub if cfg(test)
                        vec.push(CacheEntry::Recognized {
                            path: mod_path.to_path_buf(),
                            mtime: mod_mtime.into(),
                            size: mod_metadata.len(),
                        })
                    }
                    // stats file without a module: schedule it for deletion
                    (None, Some((stats_path, _stats_entry)), _) => {
                        debug!("Found orphaned stats file: {}", stats_path.display());
                        add_unrecognized!(file: stats_path);
                    }
                    _ => unreachable!(),
                }
            }
        }

        let mut vec = Vec::new();
        enter_dir(&mut vec, self.directory(), 0, &self.cache_config);
        vec
    }
734}
735
736fn read_stats_file(path: &Path) -> Option<ModuleCacheStatistics> {
737    fs::read_to_string(path)
738        .map_err(|err| {
739            trace!(
740                "Failed to read stats file, path: {}, err: {}",
741                path.display(),
742                err
743            )
744        })
745        .and_then(|contents| {
746            toml::from_str::<ModuleCacheStatistics>(&contents).map_err(|err| {
747                trace!(
748                    "Failed to parse stats file, path: {}, err: {}",
749                    path.display(),
750                    err,
751                )
752            })
753        })
754        .ok()
755}
756
757fn write_stats_file(path: &Path, stats: &ModuleCacheStatistics) -> bool {
758    toml::to_string_pretty(&stats)
759        .map_err(|err| {
760            warn!(
761                "Failed to serialize stats file, path: {}, err: {}",
762                path.display(),
763                err
764            )
765        })
766        .and_then(|serialized| {
767            fs_write_atomic(path, "stats", serialized.as_bytes()).map_err(|_| ())
768        })
769        .is_ok()
770}
771
772/// Tries to acquire a lock for specific task.
773///
774/// Returns Some(path) to the lock if succeeds. The task path must not
775/// contain any extension and have file stem.
776///
777/// To release a lock you need either manually rename or remove it,
778/// or wait until it expires and cleanup task removes it.
779///
780/// Note: this function is racy. Main idea is: be fault tolerant and
781///       never block some task. The price is that we rarely do some task
782///       more than once.
783fn acquire_task_fs_lock(
784    task_path: &Path,
785    timeout: Duration,
786    allowed_future_drift: Duration,
787) -> Option<PathBuf> {
788    assert!(task_path.extension().is_none());
789    assert!(task_path.file_stem().is_some());
790
791    // list directory
792    let dir_path = task_path.parent()?;
793    let it = fs::read_dir(dir_path)
794        .map_err(|err| {
795            warn!(
796                "Failed to list cache directory, path: {}, err: {}",
797                dir_path.display(),
798                err
799            )
800        })
801        .ok()?;
802
803    // look for existing locks
804    for entry in it {
805        let entry = entry
806            .map_err(|err| {
807                warn!(
808                    "Failed to list cache directory, path: {}, err: {}",
809                    dir_path.display(),
810                    err
811                )
812            })
813            .ok()?;
814
815        let path = entry.path();
816        if path.is_dir() || path.file_stem() != task_path.file_stem() {
817            continue;
818        }
819
820        // check extension and mtime
821        match path.extension() {
822            None => continue,
823            Some(ext) => {
824                if let Some(ext_str) = ext.to_str() {
825                    // if it's None, i.e. not valid UTF-8 string, then that's not our lock for sure
826                    if ext_str.starts_with("wip-")
827                        && !is_fs_lock_expired(Some(&entry), &path, timeout, allowed_future_drift)
828                    {
829                        return None;
830                    }
831                }
832            }
833        }
834    }
835
836    // create the lock
837    let lock_path = task_path.with_extension(format!("wip-{}", std::process::id()));
838    let _file = fs::OpenOptions::new()
839        .create_new(true)
840        .write(true)
841        .open(&lock_path)
842        .map_err(|err| {
843            warn!(
844                "Failed to create lock file (note: it shouldn't exists): path: {}, err: {}",
845                lock_path.display(),
846                err
847            )
848        })
849        .ok()?;
850
851    Some(lock_path)
852}
853
854// we have either both, or just path; dir entry is desirable since on some platforms we can get
855// metadata without extra syscalls
856// furthermore: it's better to get a path if we have it instead of allocating a new one from the dir entry
857fn is_fs_lock_expired(
858    entry: Option<&fs::DirEntry>,
859    path: &PathBuf,
860    threshold: Duration,
861    allowed_future_drift: Duration,
862) -> bool {
863    let mtime = match entry
864        .map_or_else(|| path.metadata(), |e| e.metadata())
865        .and_then(|metadata| metadata.modified())
866    {
867        Ok(mt) => mt,
868        Err(err) => {
869            warn!(
870                "Failed to get metadata/mtime, treating as an expired lock, path: {}, err: {}",
871                path.display(),
872                err
873            );
874            return true; // can't read mtime, treat as expired, so this task will not be starved
875        }
876    };
877
878    // DON'T use: mtime.elapsed() -- we must call SystemTime directly for the tests to be deterministic
879    match SystemTime::now().duration_since(mtime) {
880        Ok(elapsed) => elapsed >= threshold,
881        Err(err) => {
882            trace!(
883                "Found mtime in the future, treating as a not expired lock, path: {}, err: {}",
884                path.display(),
885                err
886            );
887            // the lock is expired if the time is too far in the future
888            // it is fine to have network share and not synchronized clocks,
889            // but it's not good when user changes time in their system clock
890            err.duration() > allowed_future_drift
891        }
892    }
893}
894
895#[cfg(test)]
896mod tests;