# Source code for tango.step_caches.local_step_cache

import collections
import logging
import os
import shutil
import warnings
import weakref
from pathlib import Path
from typing import Any, MutableMapping, Optional, OrderedDict, Union, cast

from tango.common.aliases import PathOrStr
from tango.common.params import Params
from tango.step import Step
from tango.step_cache import CacheMetadata, StepCache
from tango.step_info import StepInfo

logger = logging.getLogger(__name__)


@StepCache.register("local")
class LocalStepCache(StepCache):
    """
    This is a :class:`.StepCache` that stores its results on disk, in the location given in ``dir``.

    Every cached step gets a directory under ``dir`` with that step's :attr:`~tango.step.Step.unique_id`.
    In that directory we store the results themselves in some format according to the step's
    :attr:`~tango.step.Step.FORMAT`, and we also write a ``cache-metadata.json`` file that
    stores the :class:`.CacheMetadata`.

    The presence of ``cache-metadata.json`` signifies that the cache entry is complete and
    has been written successfully.

    .. tip::
        Registered as :class:`.StepCache` under the name "local".
    """

    # Maximum number of results kept alive by strong reference in the in-memory LRU cache.
    LRU_CACHE_MAX_SIZE = 8
    # Marker file written last; its presence means a cache entry is complete.
    METADATA_FILE_NAME = "cache-metadata.json"

    def __init__(self, dir: PathOrStr):
        """
        :param dir: Root directory of the cache. Created (with parents) if it does not exist.
        """
        self.dir = Path(dir)
        self.dir.mkdir(parents=True, exist_ok=True)

        # We keep an in-memory cache as well so we don't have to de-serialize stuff
        # we happen to have in memory already.
        self.weak_cache: MutableMapping[str, Any]
        # Not all Python objects can be referenced weakly, and even if they can they
        # might get removed too quickly, so we also keep an LRU cache.
        self.strong_cache: OrderedDict[str, Any]
        self._init_mem_caches()

    def _init_mem_caches(self):
        """(Re)create empty weak and strong in-memory caches."""
        self.weak_cache = weakref.WeakValueDictionary()
        self.strong_cache = collections.OrderedDict()

    def __getstate__(self):
        """
        We override `__getstate__()` to customize how instances of this class are pickled
        since we don't want to persist values in the weak and strong in-memory caches
        during pickling. And `WeakValueDictionary` can't be pickled anyway.
        """
        return {"dir": self.dir}

    def __setstate__(self, state):
        """Restore pickled attributes and start with fresh, empty in-memory caches."""
        for k, v in state.items():
            setattr(self, k, v)
        self._init_mem_caches()

    def _add_to_cache(self, key: str, o: Any) -> None:
        """
        Remember ``o`` under ``key`` in the in-memory caches.

        Iterators are never cached. The strong cache is trimmed to
        ``LRU_CACHE_MAX_SIZE`` entries, evicting the least recently used first.
        """
        if hasattr(o, "__next__"):
            # We never cache iterators, because they are mutable, storing their current position.
            return

        self.strong_cache[key] = o
        self.strong_cache.move_to_end(key)
        # The front of the OrderedDict holds the least recently used entry.
        while len(self.strong_cache) > self.LRU_CACHE_MAX_SIZE:
            self.strong_cache.popitem(last=False)

        try:
            self.weak_cache[key] = o
        except TypeError:
            pass  # Many native Python objects cannot be referenced weakly, and they throw TypeError when you try

    def _get_from_cache(self, key: str) -> Optional[Any]:
        """
        Look ``key`` up in the strong cache, then the weak cache.

        Returns ``None`` when the key is in neither. A strong-cache hit is marked
        as most recently used.
        """
        result = self.strong_cache.get(key)
        if result is not None:
            self.strong_cache.move_to_end(key)
            return result
        try:
            return self.weak_cache[key]
        except KeyError:
            return None

    def _remove_from_cache(self, key: str) -> None:
        """Drop ``key`` from both in-memory caches; a missing key is not an error."""
        self.strong_cache.pop(key, None)
        # pop() with a default avoids a check-then-delete race: weak entries can vanish
        # at any moment when their referent is garbage-collected.
        self.weak_cache.pop(key, None)

    def _metadata_path(self, step_or_unique_id: Union[Step, StepInfo, str]) -> Path:
        """Path of the metadata marker file for the given step or unique id."""
        return self.step_dir(step_or_unique_id) / self.METADATA_FILE_NAME

    def __contains__(self, step: object) -> bool:
        """
        Return ``True`` if ``step`` is cacheable and has a result in memory or a
        completed entry (metadata file) on disk.
        """
        if (isinstance(step, Step) and step.cache_results) or (
            isinstance(step, StepInfo) and step.cacheable
        ):
            key = step.unique_id
            if key in self.strong_cache:
                return True
            if key in self.weak_cache:
                return True
            return self._metadata_path(
                cast(Union[Step, StepInfo], step)  # cast is for mypy :/
            ).exists()
        else:
            return False

    def __getitem__(self, step: Union[Step, StepInfo]) -> Any:
        """
        Return the cached result for ``step``.

        Memory is checked first; otherwise the result is read from disk using the
        format recorded in its metadata, and then stored in memory.

        :raises KeyError: If the step has no cached result.
        """
        key = step.unique_id
        result = self._get_from_cache(key)
        if result is None:
            if step not in self:
                raise KeyError(step)
            metadata = CacheMetadata.from_params(Params.from_file(self._metadata_path(step)))
            result = metadata.format.read(self.step_dir(step))
            self._add_to_cache(key, result)
        return result

    def __setitem__(self, step: Step, value: Any) -> None:
        """
        Write ``value`` as the cached result of ``step``.

        The results are written first. The metadata file is then written to a
        temporary name and renamed into place, so it only appears once everything
        else succeeded.

        Uncacheable steps only trigger a ``UserWarning``.

        :raises ValueError: If a completed entry for ``step`` already exists.
        """
        if not step.cache_results:
            warnings.warn(
                f"Tried to cache step '{step.name}' despite being marked as uncacheable",
                UserWarning,
            )
            return

        location = self.step_dir(step)
        location.mkdir(parents=True, exist_ok=True)

        metadata_location = self._metadata_path(step)
        if metadata_location.exists():
            raise ValueError(f"{metadata_location} already exists! Will not overwrite.")
        temp_metadata_location = metadata_location.with_suffix(".temp")

        try:
            step.format.write(value, location)
            metadata = CacheMetadata(step=step.unique_id, format=step.format)
            metadata.to_params().to_file(temp_metadata_location)
            # The rename is what commits the entry.
            temp_metadata_location.rename(metadata_location)
        except BaseException:
            # Clean up on any failure (including KeyboardInterrupt), then re-raise.
            try:
                temp_metadata_location.unlink()
            except FileNotFoundError:
                pass
            raise

        # Only populate the in-memory cache once the entry is committed on disk.
        # Otherwise a failed write would leave a phantom entry that __contains__
        # and __getitem__ report as present.
        self._add_to_cache(step.unique_id, value)

    def __delitem__(self, step: Union[Step, StepInfo]) -> None:
        """
        Remove the cached result of ``step`` from memory and from disk.

        :raises OSError: If the step's cache folder does not exist, or it cannot be removed.
        """
        # Drop the in-memory copy first so a deleted entry is never served from
        # memory, even if its folder turns out to be missing.
        self._remove_from_cache(step.unique_id)
        location = self.dir / str(step.unique_id)
        try:
            shutil.rmtree(location)
        except FileNotFoundError as err:
            raise OSError(
                f"Step cache folder for '{step.unique_id}' not found. Cannot be deleted."
            ) from err

    def __len__(self) -> int:
        """Number of completed entries on disk (directories holding a metadata file)."""
        return sum(1 for _ in self.dir.glob(f"*/{self.METADATA_FILE_NAME}"))

    def step_dir(self, step_or_unique_id: Union[Step, StepInfo, str]) -> Path:
        """Returns the directory that contains the results of the step.

        You can use this even for a step that's not cached yet. In that case it will return the
        directory where the results will be written.

        :raises RuntimeError: If given a ``Step``/``StepInfo`` that is not cacheable.
        """
        if isinstance(step_or_unique_id, (Step, StepInfo)):
            cacheable = (
                step_or_unique_id.cache_results
                if isinstance(step_or_unique_id, Step)
                else step_or_unique_id.cacheable
            )
            if not cacheable:
                class_name = (
                    step_or_unique_id.class_name
                    if isinstance(step_or_unique_id, Step)
                    else step_or_unique_id.step_class_name
                )
                raise RuntimeError(
                    f"Uncacheable steps (like '{class_name}') don't have step directories."
                )
            unique_id = step_or_unique_id.unique_id
        else:
            unique_id = step_or_unique_id
        return self.dir / unique_id