Step#

Base class#

class tango.step.Step(step_name=None, cache_results=None, step_format=None, step_config=None, step_unique_id_override=None, step_resources=None, step_metadata=None, step_extra_dependencies=None, **kwargs)[source]#

This class defines one step in your experiment. To write your own step, derive from this class and overwrite the run() method. The run() method must have parameters with type hints.

Step.__init__() takes all the arguments we want to run the step with. They get passed to run() (almost) as they are. If the arguments are other instances of Step, those will be replaced with the step’s results before calling run(). Further, there are four special parameters:

Parameters:
  • step_name (Optional[str], default: None) – contains an optional human-readable name for the step. This name is used for error messages and the like, and has no consequence on the actual computation.

  • cache_results (Optional[bool], default: None) – specifies whether the results of this step should be cached. If this is False, the step is recomputed every time it is needed. If this is not set at all, and CACHEABLE is True, we cache if the step is marked as DETERMINISTIC, and we don’t cache otherwise.

  • step_format (Optional[Format], default: None) – gives you a way to override the step’s default format (which is given in FORMAT).

  • step_config (Union[Dict[str, Any], Params, None], default: None) – is the original raw part of the experiment config corresponding to this step. This can be accessed via the config property within each step’s run() method.

  • step_unique_id_override (Optional[str], default: None) – overrides the construction of the step’s unique id using the hash of inputs.

  • step_resources (Optional[StepResources], default: None) – gives you a way to set the minimum compute resources required to run this step. Certain executors require this information.

  • step_metadata (Optional[Dict[str, Any]], default: None) – use this to specify additional metadata for your step. This is added to the METADATA class variable to form the self.metadata attribute. Values in step_metadata take precedence over METADATA.

  • step_extra_dependencies (Optional[Iterable[Step]], default: None) – use this to force a dependency on other steps. Normally dependencies between steps are determined by the inputs and outputs of the steps, but you can use this parameter to force that other steps run before this step even if this step doesn’t explicitly depend on the outputs of those steps.

Important

Overriding the unique id means that the step will always map to this value, regardless of the inputs, and therefore, the step cache will only hold a single copy of the step’s output (from the last execution). Thus, in most cases, this should not be used when constructing steps. We include this option for the case when the executor creates subprocesses, which also need to access the same Step object.

__eq__(other)[source]#

Determines whether this step is equal to another step. Two steps with the same unique ID are considered identical.

__hash__()[source]#

A step’s hash is just its unique ID.

__init__(step_name=None, cache_results=None, step_format=None, step_config=None, step_unique_id_override=None, step_resources=None, step_metadata=None, step_extra_dependencies=None, **kwargs)[source]#
__str__()[source]#

Return str(self).

det_hash_object()[source]#

Returns a tuple of VERSION and this instance itself.

Return type:

Any

ensure_result(workspace=None)[source]#

This makes sure that the result of this step is in the cache. It does not return the result.

Return type:

None

classmethod massage_kwargs(kwargs)[source]#

Override this method in your step if you want to change the step’s arguments before they are passed to the run() method.

This can be useful if you want to normalize arguments that are passed to your step. For example, you might not care about the case of a string that’s passed in. You can lowercase the string in this method, and the step will function as if it had been created with a lowercase string from the start. This way you can make sure that the step’s unique ID does not change when the case of the input changes.

Note

When the input to a step is another step, this method will see the step in the input, not the other step’s result.

Warning

This is an advanced feature of Tango that you won’t need most of the time.

By default, this method does nothing and just returns its input unchanged.

Parameters:

kwargs (Dict[str, Any]) – The original kwargs that were passed to the step during construction.

Return type:

Dict[str, Any]

Returns:

New kwargs that will be passed to the step’s run() method.

result(workspace=None, needed_by=None)[source]#

Returns the result of this step. If the results are cached, it returns those. Otherwise it runs the step and returns the result from there.

If necessary, this method will first produce the results of all steps it depends on.

Return type:

TypeVar(T)

abstract run(**kwargs)[source]#

Execute the step’s action.

This method needs to be implemented when creating a Step subclass, but it shouldn’t be called directly. Instead, call result().

Return type:

TypeVar(T)

CACHEABLE: Optional[bool] = None#

This provides a direct way to turn off caching. For example, a step that reads a HuggingFace dataset doesn’t need to be cached, because HuggingFace datasets already have their own caching mechanism. But it’s still a deterministic step, and all following steps are allowed to cache. If it is None, the step figures out by itself whether it should be cacheable or not.

DETERMINISTIC: bool = True#

This describes whether this step can be relied upon to produce the same results every time when given the same inputs. If this is False, you can still cache the output of the step, but the results might be unexpected. Tango will print a warning in this case.

FORMAT: Format = <tango.format.DillFormat object>#

This specifies the format the results of this step will be serialized in. See the documentation for Format for details.

METADATA: Dict[str, Any] = {}#

Arbitrary metadata about the step.

SKIP_DEFAULT_ARGUMENTS: Dict[str, Any] = {}#

Sometimes, you want to add another argument to your run() method, but you don’t want to invalidate the cache when this new argument is set to its default value. If that is the case, add the argument to this dictionary with the default value that should be ignored.

SKIP_ID_ARGUMENTS: Set[str] = {}#

If your run() method takes some arguments that don’t affect the results, list them here. Arguments listed here will not be used to calculate this step’s unique ID, and thus changing those arguments does not invalidate the cache.

For example, you might use this for the batch size in an inference step, where you only care about the model output, not about how many outputs you can produce at the same time.

VERSION: Optional[str] = None#

This is optional, but recommended. Specifying a version gives you a way to tell Tango that a step has changed during development, and should now be recomputed. This doesn’t invalidate the old results, so when you revert your code, the old cache entries will stick around and be picked up.

property config: Dict[str, Any]#

The configuration parameters that were used to construct the step. This can be empty if the step was not constructed from a configuration file.

property dependencies: Set[Step]#

Returns a set of steps that this step depends on. This does not return recursive dependencies.

property logger: Logger#

A logging.Logger that can be used within the run() method.

property recursive_dependencies: Set[Step]#

Returns a set of steps that this step depends on. This returns recursive dependencies.

property resources: StepResources#

Defines the minimum compute resources required to run this step. Certain executors require this information in order to allocate resources for each step.

You can set this with the step_resources argument to Step or you can override this method to automatically define the required resources.

property unique_id: str#

Returns the unique ID for this step.

Unique IDs are of the shape $class_name-$version-$hash, where the hash is the hash of the inputs for deterministic steps, and a random string of characters for non-deterministic ones.

property work_dir: Path#

The working directory that a step can use while its :meth:run() method runs.

This is a convenience property for you to call inside your run() method.

This directory stays around across restarts. You cannot assume that it is empty when your step runs, but you can use it to store information that helps you restart a step if it got killed half-way through the last time it ran.

property workspace: Workspace#

The Workspace being used.

This is a convenience property for you to call inside your run() method.

tango.step.step(name=None, *, exist_ok=False, bind=False, deterministic=True, cacheable=None, version=None, format=<tango.format.DillFormat object>, skip_id_arguments=None, metadata=None)[source]#

A decorator to create a Step from a function.

Parameters:
  • name (Optional[str], default: None) – A name to register the step under. By default the name of the function is used.

  • exist_ok (bool, default: False) – If True, overwrites any existing step registered under the same name. Else, throws an error if a step is already registered under name.

  • bind (bool, default: False) – If True, the first argument passed to the step function will be the underlying Step instance, i.e. the function will be called as an instance method. In this case you must name the first argument ‘self’ or you will get a ConfigurationError when instantiating the class.

See the Step class for an explanation of the other parameters.

Example

from tango import step

@step(version="001")
def add(a: int, b: int) -> int:
    return a + b

@step(bind=True)
def bound_step(self) -> None:
    assert self.work_dir.is_dir()
class tango.step.WithUnresolvedSteps(function, *args, **kwargs)[source]#

This is a helper class for some scenarios where steps depend on other steps.

Let’s say we have two steps, ConsumeDataStep and ProduceDataStep. The easiest way to make ConsumeDataStep depend on ProduceDataStep is to specify Produce as one of the arguments to the step. This works when Consume takes the output of Produce directly, or if it takes it inside standard Python container, like a list, set, or dictionary.

But what if the output of ConsumeDataStep needs to be added to a complex, custom data structure? WithUnresolvedSteps takes care of this scenario.

For example, this works without any help:

class ProduceDataStep(Step[MyDataClass]):
    def run(self, ...) -> MyDataClass
        ...
        return MyDataClass(...)

class ConsumeDataStep(Step):
    def run(self, input_data: MyDataClass):
        ...

produce = ProduceDataStep()
consume = ConsumeDataStep(input_data = produce)

This scenario needs help:

@dataclass
class DataWithTimestamp:
    data: MyDataClass
    timestamp: float

class ProduceDataStep(Step[MyDataClass]):
    def run(self, ...) -> MyDataClass
        ...
        return MyDataClass(...)

class ConsumeDataStep(Step):
    def run(self, input_data: DataWithTimestamp):
        ...

produce = ProduceDataStep()
consume = ConsumeDataStep(
    input_data = DataWithTimestamp(produce, time.now())
)

That does not work, because DataWithTimestamp needs an object of type MyDataClass, but we’re giving it an object of type Step[MyDataClass]. Instead, we change the last line to this:

consume = ConsumeDataStep(
    input_data = WithUnresolvedSteps(
        DataWithTimestamp, produce, time.now()
    )
)

WithUnresolvedSteps will delay calling the constructor of DataWithTimestamp until the run() method runs. Tango will make sure that the results from the produce step are available at that time, and replaces the step in the arguments with the step’s results.

Parameters:
  • function – The function to call after resolving steps to their results.

  • args – The args to pass to the function. These may contain steps, which will be resolved before the function is called.

  • kwargs – The kwargs to pass to the function. These may contain steps, which will be resolved before the function is called.

construct(workspace)[source]#

Replaces all steps in the args that are stored in this object, and calls the function with those args.

Parameters:

workspace (Workspace) – The Workspace in which to resolve all the steps.

Returns:

The result of calling the function.

det_hash_object()[source]#

Return an object to use for deterministic hashing instead of self.

Return type:

Any

classmethod with_resolved_steps(o, workspace)[source]#

Recursively goes through a Python object and replaces all instances of Step with the results of that step.

Parameters:
  • o (Any) – The Python object to go through

  • workspace (Workspace) – The workspace in which to resolve all steps

Returns:

A new object that’s a copy of the original object, with all instances of Step replaced with the results of the step.

class tango.step.StepResources(machine=None, cpu_count=None, gpu_count=None, gpu_type=None, memory=None, shared_memory=None)[source]#

TaskResources describe minimum external hardware requirements which must be available for a step to run.

cpu_count: Optional[float] = None#

Minimum number of logical CPU cores. It may be fractional.

Examples: 4, 0.5.

gpu_count: Optional[int] = None#

Minimum number of GPUs. It must be non-negative.

gpu_type: Optional[str] = None#

The type of GPU that the step requires.

The exact string you should use to define a GPU type depends on the executor. With the Beaker executor, for example, you should use the same strings you see in the Beaker UI, such as ‘NVIDIA A100-SXM-80GB’.

machine: Optional[str] = None#

This is an executor-dependent option.

With the Beaker executor, for example, you can set this to “local” to force the executor to run the step locally instead of on Beaker.

memory: Optional[str] = None#

Minimum available system memory as a number with unit suffix.

Examples: 2.5GiB, 1024m.

shared_memory: Optional[str] = None#

Size of /dev/shm as a number with unit suffix.

Examples: 2.5GiB, 1024m.

Implementations#

Built-in Step implementations that are not tied to any particular integration.

class tango.steps.DatasetCombineStep(step_name=None, cache_results=None, step_format=None, step_config=None, step_unique_id_override=None, step_resources=None, step_metadata=None, step_extra_dependencies=None, **kwargs)[source]#

This step combines multiple DatasetDict s into one.

Tip

Registered as a Step under the name “dataset_combine”.

Examples

input1 = DatasetDict({
    "train": list(range(10)),
    "dev": list(range(10, 15)),
})
input2 = DatasetDict({
    "train": list(range(15, 25)),
    "val": list(range(25, 30)),
})
combined = DatasetCombineStep(inputs=[input1, input2])
combined_dataset = combined.result()
run(inputs, shuffle=False, random_seed=1532637578)[source]#

Combines multiple datasets into one. This is done lazily, so all operations are fast.

If a split is present in more than one input dataset, the output dataset will have a split that’s the concatenation of the input splits.

Parameters:
  • inputs (List[DatasetDict]) – The list of input datasets that will be combined.

  • shuffle (bool, default: False) – Whether to shuffle the combined datasets. If you don’t do this, the new splits will contain first all the instances from one dataset, and then all the instances from another dataset.

  • random_seed (int, default: 1532637578) – Random seed, affects shuffling

Return type:

DatasetDict

Returns:

Returns a new dataset that is the combination of the input datasets.

CACHEABLE: Optional[bool] = False#

This provides a direct way to turn off caching. For example, a step that reads a HuggingFace dataset doesn’t need to be cached, because HuggingFace datasets already have their own caching mechanism. But it’s still a deterministic step, and all following steps are allowed to cache. If it is None, the step figures out by itself whether it should be cacheable or not.

DETERMINISTIC: bool = True#

This describes whether this step can be relied upon to produce the same results every time when given the same inputs. If this is False, you can still cache the output of the step, but the results might be unexpected. Tango will print a warning in this case.

VERSION: Optional[str] = '001'#

This is optional, but recommended. Specifying a version gives you a way to tell Tango that a step has changed during development, and should now be recomputed. This doesn’t invalidate the old results, so when you revert your code, the old cache entries will stick around and be picked up.

class tango.steps.DatasetRemixStep(step_name=None, cache_results=None, step_format=None, step_config=None, step_unique_id_override=None, step_resources=None, step_metadata=None, step_extra_dependencies=None, **kwargs)[source]#

This step can remix splits in a DatasetDict into new splits.

Tip

Registered as a Step under the name “dataset_remix”.

Examples

input = DatasetDict({
    "train": list(range(10)),
    "dev": list(range(10, 15)),
})
new_splits = {
    "all": "train + dev",
    "crossval_train": "train[0:5] + train[7:]",
    "crossval_test": "train[5:7]",
}
remix_step = DatasetRemixStep(input=input, new_splits=new_splits)
remixed_dataset = remix_step.result()
run(input, new_splits, keep_old_splits=True, shuffle_before=False, shuffle_after=False, random_seed=1532637578)[source]#

Remixes and shuffles a dataset. This is done lazily, so all operations are fast.

Parameters:
  • input (DatasetDict) – The input dataset that will be remixed.

  • new_splits (Dict[str, str]) –

    Specifies the new splits that the output dataset should have. Keys are the name of the new splits. Values refer to the original splits. You can refer to original splits in the following ways:

    • Mention the original split name to copy it to a new name.

    • Mention the original split name with Python’s slicing syntax to select part of the original split’s instances. For example, "train[:1000]" selects the first 1000 instances from the "train" split.

    • "instances + instances" concatenates the instances into one split.

    You can combine these possibilities.

  • keep_old_splits (bool, default: True) – Whether to keep the splits from the input dataset in addition to the new ones given by new_splits.

  • shuffle_before (bool, default: False) –

    Whether to shuffle the input splits before creating the new ones.

    If you need shuffled instances and you’re not sure the input is properly shuffled, use this.

  • shuffle_after (bool, default: False) –

    Whether to shuffle the input splits after creating the new ones.

    If you need shuffled instances and you’re slicing or concatenating splits, use this.

    If you want to be on the safe side, shuffle both before and after. Shuffling is a cheap operation.

  • random_seed (int, default: 1532637578) – Random seed, affects shuffling

Return type:

DatasetDict

Returns:

Returns a new dataset that is appropriately remixed.

CACHEABLE: Optional[bool] = False#

This provides a direct way to turn off caching. For example, a step that reads a HuggingFace dataset doesn’t need to be cached, because HuggingFace datasets already have their own caching mechanism. But it’s still a deterministic step, and all following steps are allowed to cache. If it is None, the step figures out by itself whether it should be cacheable or not.

DETERMINISTIC: bool = True#

This describes whether this step can be relied upon to produce the same results every time when given the same inputs. If this is False, you can still cache the output of the step, but the results might be unexpected. Tango will print a warning in this case.

VERSION: Optional[str] = '001'#

This is optional, but recommended. Specifying a version gives you a way to tell Tango that a step has changed during development, and should now be recomputed. This doesn’t invalidate the old results, so when you revert your code, the old cache entries will stick around and be picked up.

class tango.steps.PrintStep(step_name=None, cache_results=None, step_format=None, step_config=None, step_unique_id_override=None, step_resources=None, step_metadata=None, step_extra_dependencies=None, **kwargs)[source]#

This step just logs or prints its input and also returns what it prints.

run(input)[source]#

Print out the input.

Return type:

str

CACHEABLE: Optional[bool] = False#

This provides a direct way to turn off caching. For example, a step that reads a HuggingFace dataset doesn’t need to be cached, because HuggingFace datasets already have their own caching mechanism. But it’s still a deterministic step, and all following steps are allowed to cache. If it is None, the step figures out by itself whether it should be cacheable or not.

DETERMINISTIC: bool = True#

This describes whether this step can be relied upon to produce the same results every time when given the same inputs. If this is False, you can still cache the output of the step, but the results might be unexpected. Tango will print a warning in this case.

class tango.steps.ShellStep(step_name=None, cache_results=None, step_format=None, step_config=None, step_unique_id_override=None, step_resources=None, step_metadata=None, step_extra_dependencies=None, **kwargs)[source]#

This step runs a shell command, and returns the standard output as a string.

Tip

Registered as a Step under the name “shell_step”.

Parameters:
  • shell_command – The shell command to run.

  • output_path – The step makes no assumptions about the command being run. If your command produces some output, you can optionally specify the output path for recording the output location, and optionally validating it. See validate_output argument for this.

  • validate_output – If an expected output_path has been specified, you can choose to validate that the step produced the correct output. By default, it will just check if the output_path exists, but you can pass any other validating function. For example, if your command is a script generating a model output, you can check if the model weights can be loaded.

  • kwargs – Other kwargs to be passed to subprocess.run(). If you need to take advantage of environment variables, set shell = True.

run(shell_command, output_path=None, validate_output=<tango.common.registrable.make_registrable.<locals>.function_wrapper.<locals>.WrapperFunc object>, **kwargs)[source]#

Execute the step’s action.

This method needs to be implemented when creating a Step subclass, but it shouldn’t be called directly. Instead, call result().