Workspace#

Base class#

class tango.workspace.Workspace[source]#

A workspace is a place for Tango to put the results of steps, intermediate results, and various other pieces of metadata. If you don’t want to worry about all that, do nothing and Tango will use the default LocalWorkspace that puts everything into a directory of your choosing.

If you want to do fancy things like store results in the cloud, share state across machines, etc., this is your integration point.

If you got here solely because you want to share results between machines, consider that LocalWorkspace works fine on an NFS drive.

capture_logs_for_run(name)[source]#

Should return a context manager that can be used to capture the logs for a run.

By default, this doesn’t do anything.

Return type:

ContextManager[None]

Examples

The LocalWorkspace implementation uses this method to capture logs to a file in the workspace’s directory using the file_handler() context manager, similar to this:

from tango.common.logging import file_handler
from tango.workspace import Workspace

class MyLocalWorkspace(Workspace):
    def capture_logs_for_run(self, name: str):
        return file_handler("/path/to/workspace/" + name + ".log")
abstract classmethod from_parsed_url(parsed_url)[source]#

Subclasses should override this so that can be initialized from a URL.

Parameters:

parsed_url (ParseResult) – The parsed URL object.

Return type:

Workspace

classmethod from_url(url)[source]#

Initialize a Workspace from a workspace URL or path, e.g. local:///tmp/workspace would give you a LocalWorkspace in the directory /tmp/workspace.

For LocalWorkspace, you can also just pass in a plain path, e.g. /tmp/workspace. :rtype: Workspace

Tip

Registered as a workspace constructor under the name “from_url”.

num_registered_runs(*, match=None)[source]#

Get the number of registered runs.

Parameters:

match (Optional[str], default: None) – Only count runs with a name matching this string.

Return type:

int

num_steps(*, match=None, state=None)[source]#

Get the total number of registered steps.

Parameters:
  • match (Optional[str], default: None) – Only count steps with a unique ID matching this string.

  • state (Optional[StepState], default: None) – Only count steps that are in the given state.

Return type:

int

abstract register_run(targets, name=None)[source]#

Register a run in the workspace. A run is a set of target steps that a user wants to execute.

Parameters:
  • targets (Iterable[Step]) – The steps that the user wants to execute. This could come from a StepGraph.

  • name (Optional[str], default: None) – A name for the run. Runs must have unique names. If not given, this method invents a name and returns it.

Return type:

Run

Returns:

The run object

abstract registered_run(name)[source]#

Returns the run with the given name

Return type:

Run

Returns:

A Run object representing the named run

Raises:

KeyError – If there is no run with the given name.

abstract registered_runs()[source]#

Returns all runs in the workspace

Return type:

Dict[str, Run]

Returns:

A dictionary mapping run names to Run objects

abstract remove_step(step_unique_id)[source]#

Removes cached step using the given unique step id :raises KeyError: If there is no step with the given name.

search_registered_runs(*, sort_by=None, sort_descending=True, match=None, start=0, stop=None)[source]#

Search through registered runs in the workspace.

This method is primarily meant to be used to implement a UI, and workspaces don’t necessarily need to implement all sort_by or filter operations. They should only implement those that can be done efficiently.

Note

The data type returned in the list here is RunInfo, which contains a subset of the data in the Run type.

Parameters:
  • sort_by (Optional[RunSort], default: None) – The field to sort the results by.

  • sort_descending (bool, default: True) – Sort the results in descending order of the sort_by field.

  • match (Optional[str], default: None) – Only return results with a name matching this string.

  • start (int, default: 0) – Start from a certain index in the results.

  • stop (Optional[int], default: None) – Stop at a certain index in the results.

Raises:

NotImplementedError – If a workspace doesn’t support an efficient implementation for the given sorting/filtering criteria.

Return type:

List[RunInfo]

search_step_info(*, sort_by=None, sort_descending=True, match=None, state=None, start=0, stop=None)[source]#

Search through steps in the workspace.

This method is primarily meant to be used to implement a UI, and workspaces don’t necessarily need to implement all sort_by or filter operations. They should only implement those that can be done efficiently.

Parameters:
  • sort_by (Optional[StepInfoSort], default: None) – The field to sort the results by.

  • sort_descending (bool, default: True) – Sort the results in descending order of the sort_by field.

  • match (Optional[str], default: None) – Only return steps with a unique ID matching this string.

  • state (Optional[StepState], default: None) – Only return steps that are in the given state.

  • start (int, default: 0) – Start from a certain index in the results.

  • stop (Optional[int], default: None) – Stop at a certain index in the results.

Raises:

NotImplementedError – If a workspace doesn’t support an efficient implementation for the given sorting/filtering criteria.

Return type:

List[StepInfo]

abstract step_failed(step, e)[source]#

The Step class calls this when a step failed.

Parameters:
Raises:

StepStateError – If the step is in an unexpected state (e.g. RUNNING).

Return type:

None

abstract step_finished(step, result)[source]#

The Step class calls this when a step finished running.

Parameters:

step (Step) – The step that finished.

Raises:

StepStateError – If the step is in an unexpected state (e.g. RUNNING).

Return type:

TypeVar(T)

This method is given the result of the step’s Step.run() method. It is expected to return that result. This gives it the opportunity to make changes to the result if necessary. For example, if the Step.run() method returns an iterator, that iterator would be consumed when it’s written to the cache. So this method can handle the situation and return something other than the now-consumed iterator.

abstract step_info(step_or_unique_id)[source]#

Returns a StepInfo for a given step.

Raises:

KeyError – If the corresponding step info cannot be found or created. This should never happen if you pass a Step object to this method since a StepInfo can always be created from a Step.

Return type:

StepInfo

step_result(step_name)[source]#

Get the result of a step from the latest run with a step by that name.

Raises:

KeyError – If there is no run with the given step.

Return type:

Any

step_result_for_run(run_name, step_name)[source]#

Get the result of a step from a run.

Raises:

KeyError – If there is no run or step with the given name.

Return type:

Any

abstract step_starting(step)[source]#

The Step class calls this when a step is about to start running.

Parameters:

step (Step) – The step that is about to start.

Raises:

StepStateError – If the step is in an unexpected state (e.g. RUNNING).

Return type:

None

work_dir(step)[source]#

Steps that can be restarted (like a training job that gets interrupted half-way through) must save their state somewhere. A StepCache can help by providing a suitable location in this method.

By default, the step dir is a temporary directory that gets cleaned up after every run. This effectively disables restartability of steps.

Return type:

Path

abstract property step_cache: StepCache#

A StepCache to store step results in

abstract property url: str#

Get a URL for the workspace that can be used to instantiate the same workspace using from_url().

Implementations#

class tango.workspaces.LocalWorkspace(dir)[source]#

This is a Workspace that keeps all its data in a local directory. This works great for single-machine jobs, or for multiple machines in a cluster if they can all access the same NFS drive.

Parameters:

dir (Union[str, PathLike]) – The directory to store all the data in

The directory will have three subdirectories, cache/ for the step cache, runs/ for the runs, and latest/ for the results of the latest run. For the format of the cache/ directory, refer to LocalStepCache. The runs/ directory will contain one subdirectory for each registered run. Each one of those contains a symlink from the name of the step to the results directory in the step cache. Note that LocalWorkspace creates these symlinks even for steps that have not finished yet. You can tell the difference because either the symlink points to a directory that doesn’t exist, or it points to a directory in the step cache that doesn’t contain results.

Tip

Registered as a Workspace under the name “local”.

You can also instantiate this workspace from a URL with the scheme local://. For example, Workspace.from_url("local:///tmp/workspace") gives you a LocalWorkspace in the directory /tmp/workspace.

class tango.workspaces.MemoryWorkspace[source]#

This is a workspace that keeps all its data in memory. This is useful for debugging or for quick jobs, but of course you don’t get any caching across restarts.

Tip

Registered as a Workspace under the name “memory”.

Metadata#

class tango.workspace.Run(name, steps, start_date)[source]#

Stores information about a single Tango run.

name: str#

The name of the run

start_date: datetime#

The time at which the run was registered in the workspace.

steps: Dict[str, StepInfo]#

A mapping from step names to StepInfo, for all the target steps in the run.

This only contains the targets of a run. Usually, that means it contains all named steps. Un-named dependencies (or dependencies that are not targets) are not contained in steps.

class tango.workspace.RunInfo(name, steps=None, start_date=None)[source]#

Stores partial data about a run. This is the type that you get back from Workspace.search_registered_runs(). The data here is a subset of the data in the Run type because not all workspaces can fetch all of the data in the Run type efficiently.

name: str#

The name of the run.

start_date: Optional[datetime] = None#

The time at which the run was registered in the workspace.

steps: Optional[Dict[str, str]] = None#

The steps within the run. An optional mapping of step name to step unique ID.

Miscellaneous#

class tango.workspace.RunSort(value)[source]#

An enumeration.

class tango.workspace.StepInfoSort(value)[source]#

An enumeration.