# Source code for tango.integrations.beaker.executor

import json
import logging
import os
import threading
import time
import uuid
import warnings
from abc import abstractmethod
from typing import Dict, List, NamedTuple, Optional, Sequence, Set, Tuple, Union

from beaker import (
    Beaker,
    DataMount,
    Dataset,
    DatasetConflict,
    DatasetNotFound,
    Digest,
    EnvVar,
    Experiment,
    ExperimentNotFound,
    ExperimentSpec,
    JobFailedError,
    JobTimeoutError,
    NodeResources,
    Priority,
    TaskResources,
    TaskSpec,
    TaskStoppedError,
)
from git import Git, GitCommandError, InvalidGitRepositoryError, Repo

from tango.common.exceptions import (
    CancellationError,
    ConfigurationError,
    ExecutorError,
    RunCancelled,
)
from tango.common.logging import cli_logger, log_exception
from tango.common.registrable import Registrable
from tango.executor import ExecutionMetadata, Executor, ExecutorOutput
from tango.step import Step
from tango.step_graph import StepGraph
from tango.step_info import GitMetadata
from tango.version import VERSION
from tango.workspace import Workspace

from .common import Constants, get_client

logger = logging.getLogger(__name__)


class StepFailedError(ExecutorError):
    """
    Raised when the Beaker job running a step fails.

    Carries the URL of the failed Beaker experiment so callers can surface a
    link to the logs (e.g. as the ``logs_location`` of the step's metadata).
    """

    def __init__(self, msg: str, experiment_url: str):
        # Keep a reference to the experiment page for error reporting.
        self.experiment_url = experiment_url
        super().__init__(msg)

class ResourceAssignmentError(ExecutorError):
    """
    Raised when a scheduler can't find enough free resources at the moment to run a step.
    """
class UnrecoverableResourceAssignmentError(ExecutorError):
    """
    An unrecoverable version of :class:`ResourceAssignmentError`. Raising this from a
    :class:`BeakerScheduler` will cause the executor to fail.
    """
class ResourceAssignment(NamedTuple):
    """
    Resources assigned to a step.
    """

    cluster: Union[str, List[str]]
    """
    The cluster(s) to use to execute the step.
    """

    resources: TaskResources
    """
    The compute resources on the cluster to allocate for execution of the step.
    """

    priority: Union[str, Priority]
    """
    The priority to execute the step with.
    """
class BeakerScheduler(Registrable):
    """
    A :class:`BeakerScheduler` is responsible for determining which resources and
    priority to assign to the execution of a step.
    """

    default_implementation = "simple"
    """
    The default implementation is :class:`SimpleBeakerScheduler`.
    """

    def __init__(self):
        # The Beaker client is assigned by the executor after construction
        # (via the ``beaker`` property setter below).
        self._beaker: Optional[Beaker] = None

    @property
    def beaker(self) -> Beaker:
        """
        The :class:`~beaker.Beaker` client for the scheduler to use.

        :raises ValueError: If the client hasn't been assigned yet.
        """
        if self._beaker is None:
            # Fixed grammar in the error message ("has not be" -> "has not been").
            raise ValueError("'beaker' client has not been assigned to scheduler yet!")
        return self._beaker

    @beaker.setter
    def beaker(self, beaker: Beaker) -> None:
        self._beaker = beaker

    @abstractmethod
    def schedule(self, step: Step) -> ResourceAssignment:
        """
        Determine the :class:`ResourceAssignment` for a step.

        :raises ResourceAssignmentError:
            If the scheduler can't find enough free resources at the moment to run the step.
        """
        raise NotImplementedError()
@BeakerScheduler.register("simple")
class SimpleBeakerScheduler(BeakerScheduler):
    """
    The :class:`SimpleBeakerScheduler` just searches the given clusters for one with
    enough resources to match what's specified by the step's required resources.

    :param clusters: The Beaker clusters to choose from. Must be non-empty.
    :param priority: The priority to assign to every scheduled step.

    :raises ConfigurationError: If ``clusters`` is empty.
    """

    def __init__(self, clusters: List[str], priority: Union[str, Priority]):
        super().__init__()
        self.clusters = clusters
        self.priority = priority
        # Lazily-populated cache of per-cluster node resource limits.
        self._node_resources: Optional[Dict[str, List[NodeResources]]] = None
        if not self.clusters:
            raise ConfigurationError("At least one cluster is required in 'clusters'")

    @property
    def node_resources(self) -> Dict[str, List[NodeResources]]:
        """
        The resource limits of every node on each configured cluster, fetched from
        Beaker once and cached for the lifetime of the scheduler.
        """
        if self._node_resources is None:
            self._node_resources = {
                cluster: [node.limits for node in self.beaker.cluster.nodes(cluster)]
                for cluster in self.clusters
            }
        return self._node_resources

    def schedule(self, step: Step) -> ResourceAssignment:
        """
        Build a :class:`ResourceAssignment` for ``step`` from its declared
        :class:`~tango.step.StepResources`.

        :raises UnrecoverableResourceAssignmentError:
            If the step requires a GPU type that no configured cluster provides
            on all of its nodes.
        """
        step_resources = step.resources
        task_resources = TaskResources(
            cpu_count=step_resources.cpu_count,
            gpu_count=step_resources.gpu_count,
            memory=step_resources.memory,
            shared_memory=step_resources.shared_memory,
        )

        clusters = self.clusters
        if step_resources.gpu_type is not None:
            # Restrict to clusters where every node has the requested GPU type.
            # (Idiom fix: pass a generator to all() instead of building a list.)
            clusters = [
                cluster
                for cluster, nodes in self.node_resources.items()
                if all(node.gpu_type == step_resources.gpu_type for node in nodes)
            ]
            if not clusters:
                raise UnrecoverableResourceAssignmentError(
                    f"Could not find cluster with nodes that have GPU type '{step_resources.gpu_type}'"
                )

        return ResourceAssignment(
            cluster=clusters, resources=task_resources, priority=self.priority
        )
@Executor.register("beaker")
class BeakerExecutor(Executor):
    """
    This is a :class:`~tango.executor.Executor` that runs steps on `Beaker`_.
    Each step is run as its own Beaker experiment.

    .. tip::
        Registered as an :class:`~tango.executor.Executor` under the name "beaker".

    .. important::
        The :class:`BeakerExecutor` requires that you run Tango within a GitHub repository
        and you push all of your changes prior to each ``tango run`` call. It also requires
        that you have a `GitHub personal access token <https://github.com/settings/tokens/new>`_
        with at least the "repo" scope set to the environment variable ``GITHUB_TOKEN``
        (you can also set it using the ``github_token`` parameter, see below).

        This is because :class:`BeakerExecutor` has to be able to clone your code from Beaker.

    .. important::
        The :class:`BeakerExecutor` will try to recreate your Python environment on Beaker
        every time a step is run, so it's important that you specify all of your dependencies
        in a PIP ``requirements.txt`` file, ``setup.py`` file, or a conda ``environment.yml``
        file. Alternatively you could provide the ``install_cmd`` argument.

    .. important::
        The :class:`BeakerExecutor` takes no responsibility for saving the results of steps
        that it runs on Beaker. That's the job of your workspace. So make sure your using the
        right type of workspace or your results will be lost.

        For example, any "remote" workspace (like the :class:`BeakerWorkspace`) would work,
        or in some cases you could use a :class:`~tango.workspaces.LocalWorkspace` on an NFS drive.

    .. important::
        If you're running a step that requires special hardware, e.g. a GPU, you should
        specify that in the ``step_resources`` parameter to the step, or by overriding
        the step's :meth:`.resources() <tango.step.Step.resources>` property method.

    :param workspace: The :class:`~tango.workspace.Workspace` to use.
    :param clusters: A list of Beaker clusters that the executor may use to run steps.
        If ``scheduler`` is specified, this argument is ignored.
    :param include_package: A list of Python packages to import before running steps.
    :param beaker_workspace: The name or ID of the Beaker workspace to use.
    :param github_token: You can use this parameter to set a GitHub personal access token
        instead of using the ``GITHUB_TOKEN`` environment variable.
    :param google_token: You can use this parameter to set a Google Cloud token instead of
        using the ``GOOGLE_TOKEN`` environment variable.
    :param beaker_image: The name or ID of a Beaker image to use for running steps on Beaker.
        The image must come with bash and `conda <https://docs.conda.io/en/latest/index.html>`_
        installed (Miniconda is okay). This is mutually exclusive with the ``docker_image``
        parameter. If neither ``beaker_image`` nor ``docker_image`` is specified, the
        :data:`DEFAULT_BEAKER_IMAGE` will be used.
    :param docker_image: The name of a publicly-available Docker image to use for running
        steps on Beaker. The image must come with bash and
        `conda <https://docs.conda.io/en/latest/index.html>`_ installed (Miniconda is okay).
        This is mutually exclusive with the ``beaker_image`` parameter.
    :param datasets: External data sources to mount into the Beaker job for each step.
        You could use this to mount an NFS drive, for example.
    :param env_vars: Environment variables to set in the Beaker job for each step.
    :param venv_name: The name of the conda virtual environment to use or create on the image.
        If you're using your own image that already has a conda environment you want to use,
        you should set this variable to the name of that environment.
        You can also set this to "base" to use the base environment.
    :param parallelism: Control the maximum number of steps run in parallel on Beaker.
    :param install_cmd: Override the command used to install your code and its dependencies
        in each Beaker job. For example, you could set ``install_cmd="pip install .[dev]"``.
    :param priority: The default task priority to assign to jobs ran on Beaker.
        If ``scheduler`` is specified, this argument is ignored.
    :param scheduler: A :class:`BeakerScheduler` to use for assigning resources to steps.
        If not specified the :class:`SimpleBeakerScheduler` is used with the given
        ``clusters`` and ``priority``.
    :param allow_dirty: By default, the Beaker Executor requires that your git working
        directory has no uncommitted changes. If you set this to ``True``, we skip this check.
    :param kwargs: Additional keyword arguments passed to
        :meth:`Beaker.from_env() <beaker.Beaker.from_env()>`.

    .. attention::
        Certain parameters should not be included in the
        :data:`~tango.settings.TangoGlobalSettings.executor` part of your ``tango.yml`` file,
        namely ``workspace`` and ``include_package``. Instead use the top-level
        :data:`~tango.settings.TangoGlobalSettings.workspace` and
        :data:`~tango.settings.TangoGlobalSettings.include_package` fields, respectively.

    :examples:

    **Minimal tango.yaml file**

    You can use this executor by specifying it in your ``tango.yml`` settings file:

    .. code:: yaml

        executor:
          type: beaker
          beaker_workspace: ai2/my-workspace
          clusters:
            - ai2/general-cirrascale

    **Using GPUs**

    If you have a step that requires a GPU, there are two things you need to do:

    1. First, you'll need to ensure that the :class:`BeakerExecutor` can install your
       dependencies the right way to support the GPU hardware. There are usually two ways
       to do this: use a Docker image that comes with a proper installation of your
       hardware-specific dependencies (e.g. PyTorch), or add a conda ``environment.yml``
       file to your project that specifies the proper version of those dependencies.

       If you go with first option you don't necessarily need to build your own Docker image.
       If PyTorch is the only hardware-specific dependency you have, you could just use one
       of AI2's pre-built PyTorch images. Just add these lines to your ``tango.yml`` file:

       .. code:: diff

            executor:
              type: beaker
              beaker_workspace: ai2/my-workspace
           +  docker_image: ghcr.io/allenai/pytorch:1.12.0-cuda11.3-python3.9
           +  venv_name: base
              clusters:
                - ai2/general-cirrascale

       The ``venv_name: base`` line tells the :class:`BeakerExecutor` to use the existing
       conda environment called "base" on the image instead of creating a new one.

       Alternatively, you could use the :data:`default image <DEFAULT_BEAKER_IMAGE>` and
       just add a conda ``environment.yml`` file to the root of your project that looks
       like this:

       .. code:: yaml

           name: torch-env
           channels:
             - pytorch
           dependencies:
             - python=3.9
             - cudatoolkit=11.3
             - numpy
             - pytorch
             - ...

    2. And second, you'll need to specify the GPUs required by each step in the config
       for that step under the :class:`step_resources <tango.step.StepResources>`
       parameter. For example,

       .. code:: json

           "steps": {
               "train": {
                   "type": "torch::train",
                   "step_resources": {
                       "gpu_count": 1
                   }
               }
           }
    """

    DEFAULT_BEAKER_IMAGE: str = "ai2/conda"
    """
    The default image. Used if neither ``beaker_image`` nor ``docker_image`` are set.
    """

    # NFS drive prefix used to detect a LocalWorkspace that Beaker jobs can also reach.
    DEFAULT_NFS_DRIVE = "/net/nfs.cirrascale"

    # Minimum seconds between "waiting on resources" warnings (5 minutes).
    RESOURCE_ASSIGNMENT_WARNING_INTERVAL = 60 * 5

    def __init__(
        self,
        workspace: Workspace,
        clusters: Optional[List[str]] = None,
        include_package: Optional[Sequence[str]] = None,
        beaker_workspace: Optional[str] = None,
        github_token: Optional[str] = None,
        google_token: Optional[str] = None,
        beaker_image: Optional[str] = None,
        docker_image: Optional[str] = None,
        datasets: Optional[List[DataMount]] = None,
        env_vars: Optional[List[EnvVar]] = None,
        venv_name: Optional[str] = None,
        parallelism: Optional[int] = None,
        install_cmd: Optional[str] = None,
        priority: Optional[Union[str, Priority]] = None,
        allow_dirty: bool = False,
        scheduler: Optional[BeakerScheduler] = None,
        **kwargs,
    ):
        # Pre-validate arguments.
        if beaker_image is None and docker_image is None:
            beaker_image = self.DEFAULT_BEAKER_IMAGE
        elif (beaker_image is None) == (docker_image is None):
            # Both were given (the both-None case was handled above).
            raise ConfigurationError(
                "Either 'beaker_image' or 'docker_image' must be specified for BeakerExecutor, but not both."
            )

        # Imported here to avoid a circular import at module load time.
        from tango.workspaces import LocalWorkspace, MemoryWorkspace

        if isinstance(workspace, MemoryWorkspace):
            raise ConfigurationError(
                "You cannot use the `MemoryWorkspace` with the `BeakerExecutor`! "
                "Please specify a different workspace."
            )
        elif isinstance(workspace, LocalWorkspace):
            if str(workspace.dir).startswith(self.DEFAULT_NFS_DRIVE):
                # Mount the NFS drive if not mount already.
                # NOTE(review): this appends to the caller's `datasets` list when one
                # was provided — confirm that mutation is acceptable.
                datasets = datasets or []
                if not datasets or not any(
                    [
                        dm.source.host_path is not None
                        and dm.source.host_path.startswith(self.DEFAULT_NFS_DRIVE)
                        for dm in datasets
                    ]
                ):
                    nfs_mount = DataMount.new(
                        self.DEFAULT_NFS_DRIVE, host_path=self.DEFAULT_NFS_DRIVE
                    )
                    datasets.append(nfs_mount)
            else:
                warnings.warn(
                    "It appears that you're using a `LocalWorkspace` on a directory that is not an NFS drive. "
                    "If the `BeakerExecutor` cannot access this directory from Beaker, your results will be lost.",
                    UserWarning,
                )

        super().__init__(workspace, include_package=include_package, parallelism=parallelism)
        self.max_thread_workers = self.parallelism or min(32, (os.cpu_count() or 1) + 4)
        self.beaker = get_client(beaker_workspace=beaker_workspace, **kwargs)
        self.beaker_image = beaker_image
        self.docker_image = docker_image
        self.datasets = datasets
        self.env_vars = env_vars
        self.venv_name = venv_name
        self.install_cmd = install_cmd
        self.allow_dirty = allow_dirty

        # Resolve the scheduler: either the one given, or a SimpleBeakerScheduler
        # built from 'clusters'/'priority'.
        self.scheduler: BeakerScheduler
        if scheduler is None:
            if clusters is None:
                raise ConfigurationError(
                    "Either 'scheduler' or 'clusters' argument to BeakerExecutor is required"
                )
            self.scheduler = SimpleBeakerScheduler(clusters, priority=priority or Priority.normal)
        else:
            if clusters is not None:
                warnings.warn(
                    "The 'clusters' parameter will be ignored since you specified a 'scheduler'",
                    UserWarning,
                )
            if priority is not None:
                warnings.warn(
                    "The 'priority' parameter will be ignored since you specified a 'scheduler'",
                    UserWarning,
                )
            self.scheduler = scheduler
        self.scheduler.beaker = self.beaker

        self._is_cancelled = threading.Event()
        self._logged_git_info = False
        self._last_resource_assignment_warning: Optional[float] = None
        # Count of currently-outstanding Beaker jobs; used to scale poll intervals.
        self._jobs = 0

        try:
            self.github_token: str = github_token or os.environ["GITHUB_TOKEN"]
        except KeyError:
            raise ConfigurationError(
                "A GitHub personal access token with the 'repo' scope is required. "
                "This can be set with the 'github_token' argument to the BeakerExecutor, "
                "or as the environment variable 'GITHUB_TOKEN'."
            )

        self.google_token = google_token or os.environ.get("GOOGLE_TOKEN")
        # Check if google auth credentials are in the default location
        if self.google_token is None and os.path.exists(Constants.DEFAULT_GOOGLE_CREDENTIALS_FILE):
            self.google_token = Constants.DEFAULT_GOOGLE_CREDENTIALS_FILE
        # If credentials are provided in the form of a file path, load the credentials
        # so that they can be used in beaker. Do this only if required, i.e., only if GSWorkspace
        # is being used.
        if self.google_token is not None and self.google_token.endswith(".json"):
            from tango.integrations.gs import GSWorkspace

            if isinstance(workspace, GSWorkspace):
                with open(self.google_token) as f:
                    self.google_token = f.read()
        if self.google_token is None:
            self.google_token = "default"

        # Ensure entrypoint dataset exists.
        self._ensure_entrypoint_dataset()

        # Get repo info and make sure we're in a GitHub-hosted repository.
        git = GitMetadata.check_for_repo()
        if (
            git is None
            or git.commit is None
            or git.remote is None
            or "github.com" not in git.remote
        ):
            raise ExecutorError(
                f"Missing git data. "
                f"BeakerExecutor requires a git repository with a GitHub remote."
            )
        self._github_account, self._github_repo = self._parse_git_remote(git.remote)
        self._git_commit = git.commit

    def check_repo_state(self):
        """
        Verify the local git repository is clean and fully pushed, unless
        ``allow_dirty`` was set.

        :raises ExecutorError: If there are uncommitted or unpushed changes,
            or if the current directory is not a valid git repository.
        """
        if not self.allow_dirty:
            # Make sure repository is clean, if we're in one.
            try:
                # Check for uncommitted changes.
                repo = Repo(".")
                if repo.is_dirty():
                    raise ExecutorError(
                        "You have uncommitted changes! Commit your changes or use the 'allow_dirty' option."
                    )

                # Check for un-pushed commits.
                remote_name = repo.remote().name
                git = Git(".")
                if git.log([f"{remote_name}..HEAD", "--not", "--remotes", "--oneline"]):
                    raise ExecutorError(
                        "You have unpushed changes! Push your changes or use the 'allow_dirty' option."
                    )
            except InvalidGitRepositoryError:
                raise ExecutorError(
                    "It appears you're not in a valid git repository. "
                    "The Beaker executor requires a git repository."
                )
            except GitCommandError:
                # Best-effort: if the log command itself fails, don't block the run.
                pass

    def execute_step_graph(
        self, step_graph: StepGraph, run_name: Optional[str] = None
    ) -> ExecutorOutput:
        """
        Execute a :class:`~tango.step_graph.StepGraph`, submitting each runnable step
        as a Beaker experiment from a thread pool, and return an
        :class:`~tango.executor.ExecutorOutput` summarizing what succeeded, failed,
        or was never run.
        """
        import concurrent.futures

        self.check_repo_state()

        self._is_cancelled.clear()

        # These will hold the final results which we'll update along the way.
        successful: Dict[str, ExecutionMetadata] = {}
        failed: Dict[str, ExecutionMetadata] = {}
        not_run: Dict[str, ExecutionMetadata] = {}

        # Keeps track of steps that are next up to run on Beaker.
        steps_to_run: Set[str] = set()
        # These are steps that have been submitted to Beaker but haven't completed yet.
        submitted_steps: Set[str] = set()
        # Futures for tracking the Beaker runs for each step.
        step_futures: List[concurrent.futures.Future] = []

        uncacheable_leaf_steps = step_graph.uncacheable_leaf_steps()

        # These are all of the steps that still need to be run at some point.
        steps_left_to_run = uncacheable_leaf_steps | {
            step for step in step_graph.values() if step.cache_results
        }

        def update_steps_to_run():
            # Refresh `steps_to_run` based on which dependencies have completed,
            # and move steps with failed dependencies into `not_run`.
            nonlocal steps_to_run, not_run
            for step_name, step in step_graph.items():
                if (
                    step_name in submitted_steps
                    or step_name in successful
                    or step_name in failed
                    or step_name in not_run
                ):
                    # Make sure step is no longer in queue.
                    steps_to_run.discard(step_name)  # This does NOT raise KeyError if not found
                else:
                    # Check dependencies.
                    for dependency in step.dependencies:
                        if dependency.name not in successful and dependency.cache_results:
                            if dependency.name in failed or dependency.name in not_run:
                                # A dependency failed or can't be run, so this step can't be run.
                                not_run[step_name] = ExecutionMetadata()
                                steps_to_run.discard(step_name)
                                steps_left_to_run.discard(step)
                            break
                    else:
                        # Dependencies are OK, so we can run this step now.
                        if step.cache_results or step in uncacheable_leaf_steps:
                            steps_to_run.add(step_name)

        def make_future_done_callback(step_name: str):
            # Build a per-step callback that records the outcome of the Beaker run.
            def future_done_callback(future: concurrent.futures.Future):
                nonlocal successful, failed, steps_left_to_run
                self._jobs = max(0, self._jobs - 1)
                step = step_graph[step_name]
                try:
                    exc = future.exception()
                    if exc is None:
                        successful[step_name] = ExecutionMetadata(
                            result_location=None
                            if not step.cache_results
                            else self.workspace.step_info(step).result_location,
                            logs_location=future.result(),
                        )
                        steps_left_to_run.discard(step)
                    elif isinstance(exc, ResourceAssignmentError):
                        # Not enough resources available yet: put the step back in
                        # the queue so it gets resubmitted later.
                        submitted_steps.discard(step_name)
                        self._emit_resource_assignment_warning()
                    elif isinstance(exc, StepFailedError):
                        failed[step_name] = ExecutionMetadata(logs_location=exc.experiment_url)
                        steps_left_to_run.discard(step)
                    elif isinstance(exc, (ExecutorError, CancellationError)):
                        failed[step_name] = ExecutionMetadata()
                        steps_left_to_run.discard(step)
                    else:
                        log_exception(exc, logger)
                        failed[step_name] = ExecutionMetadata()
                        steps_left_to_run.discard(step)
                except concurrent.futures.TimeoutError as exc:
                    log_exception(exc, logger)
                    failed[step_name] = ExecutionMetadata()
                    steps_left_to_run.discard(step)

            return future_done_callback

        last_progress_update = time.monotonic()

        def log_progress():
            # Periodically (at most every 2 minutes) report what we're waiting on.
            nonlocal last_progress_update
            now = time.monotonic()
            if now - last_progress_update >= 60 * 2:
                last_progress_update = now
                waiting_for = [
                    step_name
                    for step_name in submitted_steps
                    if step_name not in failed and step_name not in successful
                ]
                if len(waiting_for) > 5:
                    logger.info(
                        "Waiting for %d steps...",
                        len(waiting_for),
                    )
                elif len(waiting_for) > 1:
                    logger.info(
                        "Waiting for %d steps (%s)...",
                        len(waiting_for),
                        "'" + "', '".join(waiting_for) + "'",
                    )
                elif len(waiting_for) == 1:
                    logger.info("Waiting for 1 step ('%s')...", list(waiting_for)[0])

                still_to_run = [
                    step.name for step in steps_left_to_run if step.name not in submitted_steps
                ]
                if len(still_to_run) > 5:
                    logger.info(
                        "Still waiting to submit %d more steps...",
                        len(still_to_run),
                    )
                elif len(still_to_run) > 1:
                    logger.info(
                        "Still waiting to submit %d more steps (%s)...",
                        len(still_to_run),
                        "'" + "', '".join(still_to_run) + "'",
                    )
                elif len(still_to_run) == 1:
                    logger.info("Still waiting to submit 1 more step ('%s')...", still_to_run[0])

        update_steps_to_run()
        try:
            with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_thread_workers) as pool:
                while steps_left_to_run:
                    # Submit steps left to run.
                    for step_name in steps_to_run:
                        future = pool.submit(
                            self._execute_sub_graph_for_step, step_graph, step_name, True
                        )
                        future.add_done_callback(make_future_done_callback(step_name))
                        self._jobs += 1
                        step_futures.append(future)
                        submitted_steps.add(step_name)

                    if step_futures:
                        # Wait for something to happen.
                        _, not_done = concurrent.futures.wait(
                            step_futures,
                            return_when=concurrent.futures.FIRST_COMPLETED,
                            timeout=2.0,
                        )
                        # Update the list of running futures.
                        step_futures.clear()
                        step_futures = list(not_done)
                    else:
                        time.sleep(2.0)

                    # Update the step queue.
                    update_steps_to_run()
                    log_progress()
        except (KeyboardInterrupt, CancellationError):
            if step_futures:
                cli_logger.warning("Received interrupt, canceling steps...")
                self._is_cancelled.set()
                concurrent.futures.wait(step_futures)
            raise
        finally:
            self._is_cancelled.clear()

        # NOTE: The 'done callback' added to each future is executed in a thread,
        # and so might not complete before the last 'update_steps_to_run()' is called
        # in the loop above. Therefore we have to call 'update_steps_to_run()'
        # one last time here to ensure the 'not_run' set is up-to-date.
        update_steps_to_run()

        return ExecutorOutput(successful=successful, failed=failed, not_run=not_run)

    def _emit_resource_assignment_warning(self):
        # Throttled warning: emitted at most once per
        # RESOURCE_ASSIGNMENT_WARNING_INTERVAL seconds.
        if self._last_resource_assignment_warning is None or (
            time.monotonic() - self._last_resource_assignment_warning
            > self.RESOURCE_ASSIGNMENT_WARNING_INTERVAL
        ):
            self._last_resource_assignment_warning = time.monotonic()
            logger.warning(
                "Some steps can't be run yet - waiting on more Beaker resources "
                "to become available..."
            )

    def _check_if_cancelled(self):
        # Raise RunCancelled if another thread has requested cancellation.
        if self._is_cancelled.is_set():
            raise RunCancelled

    def _execute_sub_graph_for_step(
        self,
        step_graph: StepGraph,
        step_name: str,
        in_thread: bool = False,
    ) -> Optional[str]:
        """
        Run a single step (and its required sub-graph) as a Beaker experiment and
        block until it completes. Returns the experiment URL, or ``None`` if the
        step was found in the cache or ran locally.
        """
        if not in_thread:
            self._is_cancelled.clear()
        else:
            self._check_if_cancelled()

        step = step_graph[step_name]

        if step.cache_results and step in self.workspace.step_cache:
            cli_logger.info(
                '[green]\N{check mark} Found output for step [bold]"%s"[/] in cache...[/]',
                step_name,
            )
            return None

        if step.resources.machine == "local":
            # Step explicitly requested local execution: run it here, not on Beaker.
            if step.cache_results:
                step.ensure_result(self.workspace)
            else:
                result = step.result(self.workspace)
                if hasattr(result, "__next__"):
                    from collections import deque

                    # Exhaust the iterator so the step actually runs to completion.
                    deque(result, maxlen=0)
            return None

        experiment: Optional[Experiment] = None
        experiment_url: Optional[str] = None
        ephemeral_datasets: List[Dataset] = []

        # Try to find any existing experiments for this step that are still running.
        if step.cache_results:
            for exp in self.beaker.workspace.experiments(
                match=f"{Constants.STEP_EXPERIMENT_PREFIX}{step.unique_id}-"
            ):
                self._check_if_cancelled()
                try:
                    latest_job = self.beaker.experiment.latest_job(exp)
                except (ValueError, ExperimentNotFound):
                    continue
                if latest_job is not None and not latest_job.is_done:
                    experiment = exp
                    experiment_url = self.beaker.experiment.url(exp)
                    cli_logger.info(
                        "[blue]\N{black rightwards arrow} Found existing Beaker experiment [b]%s[/] for "
                        'step [b]"%s"[/] that is still running...[/]',
                        experiment_url,
                        step_name,
                    )
                    break

        # Otherwise we submit a new experiment...
        if experiment is None:
            # Initialize experiment and task spec.
            experiment_name, spec, ephemeral_datasets = self._build_experiment_spec(
                step_graph, step_name
            )
            self._check_if_cancelled()

            step.log_starting()

            # Create experiment.
            experiment = self.beaker.experiment.create(experiment_name, spec)
            experiment_url = self.beaker.experiment.url(experiment)
            cli_logger.info(
                '[blue]\N{black rightwards arrow} Submitted Beaker experiment [b]%s[/] for step [b]"%s"[/]...[/]',
                experiment_url,
                step_name,
            )

        assert experiment is not None
        assert experiment_url is not None

        # Follow the experiment until it completes.
        try:
            while True:
                # Scale the poll interval with the number of outstanding jobs
                # (capped at 60s) to avoid hammering the Beaker API.
                poll_interval = min(60, 5 * min(self._jobs, self.max_thread_workers))
                try:
                    self._check_if_cancelled()
                    self.beaker.experiment.wait_for(
                        experiment,
                        strict=True,
                        quiet=True,
                        timeout=poll_interval + 2,
                        poll_interval=poll_interval,
                    )
                    break
                except JobTimeoutError:
                    time.sleep(poll_interval)
                    continue
        except (JobFailedError, TaskStoppedError):
            cli_logger.error(
                '[red]\N{ballot x} Step [b]"%s"[/] failed. You can check the logs at [b]%s[/][/]',
                step_name,
                experiment_url,
            )
            raise StepFailedError(
                f'Beaker job for step "{step_name}" failed. '
                f"You can check the logs at {experiment_url}",
                experiment_url,
            )
        except (KeyboardInterrupt, CancellationError):
            cli_logger.warning(
                'Stopping Beaker experiment [cyan]%s[/] for step [b]"%s"[/] (%s)',
                experiment_url,
                step_name,
                step.unique_id,
            )
            self.beaker.experiment.stop(experiment)
            raise
        else:
            step.log_finished()
        finally:
            # Remove ephemeral datasets.
            result_dataset = self.beaker.experiment.results(experiment)
            if result_dataset is not None:
                ephemeral_datasets.append(result_dataset)
            for dataset in ephemeral_datasets:
                try:
                    self.beaker.dataset.delete(dataset)
                except DatasetNotFound:
                    pass

        return experiment_url

    @staticmethod
    def _parse_git_remote(url: str) -> Tuple[str, str]:
        """
        Parse a git remote URL into a GitHub (account, repo) pair.
        """
        # Handles both https://github.com/acct/repo(.git) and git@github.com:acct/repo(.git).
        account, repo = (
            url.split("https://github.com/")[-1]
            .split("git@github.com:")[-1]
            .split(".git")[0]
            .split("/")
        )
        return account, repo

    def _ensure_entrypoint_dataset(self) -> Dataset:
        """
        Ensure a Beaker dataset containing the executor's entrypoint script exists
        (creating it if needed) and verify its checksum before returning it.
        """
        import hashlib
        from importlib.resources import read_binary

        import tango.integrations.beaker

        workspace_id = self.beaker.workspace.get().id

        # Get hash of the local entrypoint source file.
        sha256_hash = hashlib.sha256()
        contents = read_binary(tango.integrations.beaker, Constants.ENTRYPOINT_FILENAME)
        sha256_hash.update(contents)

        # The dataset name embeds the content hash so a changed entrypoint gets a new dataset.
        entrypoint_dataset_name = (
            f"{Constants.ENTRYPOINT_DATASET_PREFIX}{workspace_id}-{sha256_hash.hexdigest()[:6]}"
        )
        tmp_entrypoint_dataset_name = (
            f"{Constants.ENTRYPOINT_DATASET_PREFIX}{str(uuid.uuid4())}-tmp"
        )

        # Ensure entrypoint dataset exists.
        entrypoint_dataset: Dataset
        try:
            entrypoint_dataset = self.beaker.dataset.get(entrypoint_dataset_name)
        except DatasetNotFound:
            # Create it.
            logger.debug(f"Creating entrypoint dataset '{entrypoint_dataset_name}'")
            try:
                # Upload under a temporary unique name, then rename into place, so
                # concurrent processes can't observe a half-written dataset.
                tmp_entrypoint_dataset = self.beaker.dataset.create(
                    tmp_entrypoint_dataset_name, quiet=True, commit=False
                )
                self.beaker.dataset.upload(
                    tmp_entrypoint_dataset, contents, Constants.ENTRYPOINT_FILENAME, quiet=True
                )
                self.beaker.dataset.commit(tmp_entrypoint_dataset)
                entrypoint_dataset = self.beaker.dataset.rename(
                    tmp_entrypoint_dataset, entrypoint_dataset_name
                )
            except DatasetConflict:  # could be in a race with another `tango` process.
                time.sleep(1.0)
                entrypoint_dataset = self.beaker.dataset.get(entrypoint_dataset_name)

        # Verify contents.
        err_msg = (
            f"Checksum failed for entrypoint dataset {self.beaker.dataset.url(entrypoint_dataset)}\n"
            f"This could be a bug, or it could mean someone has tampered with the dataset.\n"
            f"If you're sure no one has tampered with it, you can delete the dataset from "
            f"the Beaker dashboard and try again."
        )
        file_info = self.beaker.dataset.file_info(entrypoint_dataset, Constants.ENTRYPOINT_FILENAME)
        if file_info.digest is not None and file_info.digest != Digest.from_decoded(
            sha256_hash.digest(), "SHA256"
        ):
            raise ExecutorError(err_msg)

        return entrypoint_dataset

    def _ensure_step_graph_dataset(self, step_graph: StepGraph) -> Dataset:
        """
        Upload the (sub) step graph config as a new Beaker dataset for a job to consume.
        """
        step_graph_dataset_name = f"{Constants.STEP_GRAPH_ARTIFACT_PREFIX}{str(uuid.uuid4())}"
        try:
            dataset = self.beaker.dataset.create(step_graph_dataset_name, quiet=True, commit=False)
            self.beaker.dataset.upload(
                dataset,
                json.dumps({"steps": step_graph.to_config(include_unique_id=True)}).encode(),
                Constants.STEP_GRAPH_FILENAME,
                quiet=True,
            )
            self.beaker.dataset.commit(dataset)
        except DatasetConflict:  # could be in a race with another `tango` process.
            time.sleep(1.0)
            dataset = self.beaker.dataset.get(step_graph_dataset_name)
        return dataset

    def _build_experiment_spec(
        self, step_graph: StepGraph, step_name: str
    ) -> Tuple[str, ExperimentSpec, List[Dataset]]:
        """
        Build the Beaker experiment name, :class:`~beaker.ExperimentSpec`, and the list
        of ephemeral datasets (to clean up afterwards) for running ``step_name``.
        """
        from tango.common.logging import TANGO_LOG_LEVEL

        step = step_graph[step_name]
        sub_graph = step_graph.sub_graph(step_name)
        step_info = self.workspace.step_info(step)

        experiment_name = (
            f"{Constants.STEP_EXPERIMENT_PREFIX}{step.unique_id}-{str(uuid.uuid4())[:8]}"
        )

        github_account, github_repo, git_ref = (
            self._github_account,
            self._github_repo,
            self._git_commit,
        )

        if not self._logged_git_info:
            self._logged_git_info = True
            cli_logger.info(
                "[blue]Using source code from "
                "[b]https://github.com/%s/%s/commit/%s[/] to run steps on Beaker[/]",
                github_account,
                github_repo,
                git_ref,
            )

        # Get cluster, resources, and priority to use.
        clusters, task_resources, priority = self.scheduler.schedule(step)
        self._check_if_cancelled()

        # Ensure dataset with the entrypoint script exists and get it.
        entrypoint_dataset = self._ensure_entrypoint_dataset()
        self._check_if_cancelled()

        # Create dataset for step graph.
        step_graph_dataset = self._ensure_step_graph_dataset(sub_graph)
        self._check_if_cancelled()

        # Write the GitHub token secret.
        self.beaker.secret.write(Constants.GITHUB_TOKEN_SECRET_NAME, self.github_token)
        self._check_if_cancelled()

        # Write the Beaker token secret.
        self.beaker.secret.write(Constants.BEAKER_TOKEN_SECRET_NAME, self.beaker.config.user_token)
        self._check_if_cancelled()

        # Write the Google Cloud token secret.
        if self.google_token is not None:
            self.beaker.secret.write(Constants.GOOGLE_TOKEN_SECRET_NAME, self.google_token)
        self._check_if_cancelled()

        # Build Tango command to run.
        command = [
            "tango",
            "--log-level",
            "debug",
            "--called-by-executor",
            "beaker-executor-run",
            Constants.INPUT_DIR + "/" + Constants.STEP_GRAPH_FILENAME,
            step.name,
            self.workspace.url,
        ]
        if self.include_package is not None:
            for package in self.include_package:
                command += ["-i", package, "--log-level", TANGO_LOG_LEVEL or "debug"]
        self._check_if_cancelled()

        # Ignore the patch version.
        # E.g. '3.9.7' -> '3.9'
        python_version = step_info.environment.python
        python_version = python_version[: python_version.find(".", python_version.find(".") + 1)]

        # Build task spec.
        task_spec = (
            TaskSpec.new(
                step.unique_id,
                beaker_image=self.beaker_image,
                docker_image=self.docker_image,
                result_path=Constants.RESULTS_DIR,
                command=["bash", Constants.ENTRYPOINT_DIR + "/" + Constants.ENTRYPOINT_FILENAME],
                arguments=command,
                resources=task_resources,
                datasets=self.datasets,
                env_vars=self.env_vars,
                priority=priority,
            )
            .with_constraint(cluster=[clusters] if isinstance(clusters, str) else clusters)
            .with_env_var(name="TANGO_VERSION", value=VERSION)
            .with_env_var(name="GITHUB_TOKEN", secret=Constants.GITHUB_TOKEN_SECRET_NAME)
            .with_env_var(name="BEAKER_TOKEN", secret=Constants.BEAKER_TOKEN_SECRET_NAME)
            .with_env_var(name="GOOGLE_TOKEN", secret=Constants.GOOGLE_TOKEN_SECRET_NAME)
            .with_env_var(name="GITHUB_REPO", value=f"{github_account}/{github_repo}")
            .with_env_var(name="GIT_REF", value=git_ref)
            .with_env_var(name="PYTHON_VERSION", value=python_version)
            .with_env_var(name="BEAKER_EXPERIMENT_NAME", value=experiment_name)
            .with_dataset(Constants.ENTRYPOINT_DIR, beaker=entrypoint_dataset.id)
            .with_dataset(Constants.INPUT_DIR, beaker=step_graph_dataset.id)
        )

        if self.venv_name is not None:
            task_spec = task_spec.with_env_var(name="VENV_NAME", value=self.venv_name)

        if self.install_cmd is not None:
            task_spec = task_spec.with_env_var(name="INSTALL_CMD", value=self.install_cmd)

        return (
            experiment_name,
            ExperimentSpec(
                tasks=[task_spec], description=f'Tango step "{step_name}" ({step.unique_id})'
            ),
            [step_graph_dataset],
        )