Step

Base class

class tango.step.Step(step_name=None, cache_results=None, step_format=None, step_config=None, step_unique_id_override=None, step_resources=None, **kwargs)

This class defines one step in your experiment. To write your own step, derive from this class and overwrite the run() method. The run() method must have parameters with type hints.

Step.__init__() takes all the arguments we want to run the step with. They get passed to run() (almost) as they are. If the arguments are other instances of Step, those will be replaced with the step’s results before calling run(). Further, there are six special parameters:

Parameters
  • step_name (Optional[str], default: None) – contains an optional human-readable name for the step. This name is used for error messages and the like, and has no consequence on the actual computation.

  • cache_results (Optional[bool], default: None) – specifies whether the results of this step should be cached. If this is False, the step is recomputed every time it is needed. If this is not set at all, and CACHEABLE is True, we cache if the step is marked as DETERMINISTIC, and we don’t cache otherwise.

  • step_format (Optional[Format], default: None) – gives you a way to override the step’s default format (which is given in FORMAT).

  • step_config (Optional[Dict[str, Any]], default: None) – is the original raw part of the experiment config corresponding to this step. This can be accessed via the config property within each step’s run() method.

  • step_unique_id_override (Optional[str], default: None) – overrides the construction of the step’s unique id using the hash of inputs.

  • step_resources (Optional[StepResources], default: None) – gives you a way to set the minimum compute resources required to run this step. Certain executors require this information.

Important

Overriding the unique id means that the step will always map to this value, regardless of the inputs, and therefore, the step cache will only hold a single copy of the step’s output (from the last execution). Thus, in most cases, this should not be used when constructing steps. We include this option for the case when the executor creates subprocesses, which also need to access the same Step object.
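To illustrate the argument handling described above, here is a minimal stand-alone sketch. It does not import Tango: MiniStep, Tokenize, and CountTokens are hypothetical stand-ins that only model how arguments which are themselves steps get replaced with their results before run() is called. Caching, workspaces, and the special parameters are deliberately left out.

```python
from typing import Any, Dict, List


class MiniStep:
    """Hypothetical stand-in for a step; not Tango's implementation."""

    def __init__(self, **kwargs: Any) -> None:
        # Keep the arguments as given; they may contain other MiniStep objects.
        self.kwargs: Dict[str, Any] = kwargs

    def run(self, **kwargs: Any) -> Any:
        raise NotImplementedError

    def result(self) -> Any:
        # Replace every argument that is a step with that step's result,
        # then pass the resolved arguments on to run().
        resolved = {
            name: value.result() if isinstance(value, MiniStep) else value
            for name, value in self.kwargs.items()
        }
        return self.run(**resolved)


class Tokenize(MiniStep):
    def run(self, text: str) -> List[str]:
        return text.split()


class CountTokens(MiniStep):
    def run(self, tokens: List[str]) -> int:
        return len(tokens)


tokens = Tokenize(text="a b c")
count = CountTokens(tokens=tokens)  # pass the step itself, not its result
print(count.result())
```

Note that CountTokens.run() is written against a plain list of tokens. It never sees the Tokenize object, which is the point of the substitution.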

__eq__(other)

Determines whether this step is equal to another step. Two steps with the same unique ID are considered identical.

__hash__()

A step’s hash is just its unique ID.

__init__(step_name=None, cache_results=None, step_format=None, step_config=None, step_unique_id_override=None, step_resources=None, **kwargs)

__str__()

Return str(self).

det_hash_object()

Returns a tuple of VERSION and this instance itself.

Return type

Any

ensure_result(workspace=None)

This makes sure that the result of this step is in the cache. It does not return the result.

Return type

None

classmethod massage_kwargs(kwargs)

Override this method in your step if you want to change the step’s arguments before they are passed to the run() method.

This can be useful if you want to normalize arguments that are passed to your step. For example, you might not care about the case of a string that’s passed in. You can lowercase the string in this method, and the step will function as if it had been created with a lowercase string from the start. This way you can make sure that the step’s unique ID does not change when the case of the input changes.

Note

When the input to a step is another step, this method will see the step in the input, not the other step’s result.

Warning

This is an advanced feature of Tango that you won’t need most of the time.

By default, this method does nothing and just returns its input unchanged.

Parameters

kwargs (Dict[str, Any]) – The original kwargs that were passed to the step during construction.

Return type

Dict[str, Any]

Returns

New kwargs that will be passed to the step’s run() method.
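The lowercase example above can be sketched without Tango as follows. The function below has the same shape as massage_kwargs() (a dict in, a dict out) but is a free function. The `language` argument and the toy_hash() helper are assumptions made for this illustration, and toy_hash() is only a stand-in for whatever deterministic hash the library computes over the inputs.

```python
import hashlib
import json
from typing import Any, Dict


def massage_kwargs(kwargs: Dict[str, Any]) -> Dict[str, Any]:
    """Lowercase the hypothetical `language` argument; leave the rest alone."""
    massaged = dict(kwargs)  # do not modify the caller's dict
    if isinstance(massaged.get("language"), str):
        massaged["language"] = massaged["language"].lower()
    return massaged


def toy_hash(kwargs: Dict[str, Any]) -> str:
    """Stand-in for a deterministic hash of the inputs (not Tango's hash)."""
    payload = json.dumps(kwargs, sort_keys=True).encode("utf-8")
    return hashlib.sha256(payload).hexdigest()[:16]


upper = massage_kwargs({"language": "EN", "limit": 10})
lower = massage_kwargs({"language": "en", "limit": 10})

# After normalization both calls carry identical arguments,
# so anything derived from them (such as a hash) is identical too.
print(upper == lower, toy_hash(upper) == toy_hash(lower))
```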

result(workspace=None, needed_by=None)

Returns the result of this step. If the results are cached, it returns those. Otherwise it runs the step and returns the result from there.

If necessary, this method will first produce the results of all steps it depends on.

Return type

TypeVar(T)

abstract run(**kwargs)

Execute the step’s action.

This method needs to be implemented when creating a Step subclass, but it shouldn’t be called directly. Instead, call result().

Return type

TypeVar(T)

CACHEABLE: Optional[bool] = None

This provides a direct way to turn off caching. For example, a step that reads a HuggingFace dataset doesn’t need to be cached, because HuggingFace datasets already have their own caching mechanism. But it’s still a deterministic step, and all following steps are allowed to cache. If it is None, the step figures out by itself whether it should be cacheable or not.

DETERMINISTIC: bool = True

This describes whether this step can be relied upon to produce the same results every time when given the same inputs. If this is False, you can still cache the output of the step, but the results might be unexpected. Tango will print a warning in this case.
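The interaction between cache_results, CACHEABLE, and DETERMINISTIC can be summarized in a small decision function. This is one reading of the prose in this section, not Tango's code. The name resolve_cache_flag is hypothetical, and the sketch assumes that CACHEABLE = None behaves like CACHEABLE = True when cache_results is not set. It does not model how the library reacts when caching is explicitly requested for a step that is not cacheable.

```python
from typing import Optional


def resolve_cache_flag(
    cache_results: Optional[bool],
    cacheable: Optional[bool],
    deterministic: bool,
) -> bool:
    """Decide whether a step's result is cached (illustrative sketch only)."""
    if cache_results is not None:
        # An explicit setting is taken as given in this sketch.
        return cache_results
    if cacheable is False:
        # CACHEABLE = False is the direct way to turn caching off.
        return False
    # Otherwise cache only steps that are marked as deterministic.
    return deterministic


print(resolve_cache_flag(None, True, True))    # deterministic, cacheable
print(resolve_cache_flag(None, True, False))   # non-deterministic
print(resolve_cache_flag(None, False, True))   # caching turned off on the class
print(resolve_cache_flag(False, True, True))   # caching turned off per instance
```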

FORMAT: Format = <tango.format.DillFormat object>

This specifies the format the results of this step will be serialized in. See the documentation for Format for details.

SKIP_ID_ARGUMENTS: Set[str] = {}

If your run() method takes some arguments that don’t affect the results, list them here. Arguments listed here will not be used to calculate this step’s unique ID, and thus changing those arguments does not invalidate the cache.

For example, you might use this for the batch size in an inference step, where you only care about the model output, not about how many outputs you can produce at the same time.
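The effect of skipping arguments can be sketched as follows. The id_hash() helper is a hypothetical stand-in for the library's input hash, and the `model` and `batch_size` arguments are made up for this example. The only point is that arguments named in the skip set never reach the hash.

```python
import hashlib
import json
from typing import Any, Dict, Set

# Arguments that do not influence the result (assumed for this example).
SKIP_ID_ARGUMENTS: Set[str] = {"batch_size"}


def id_hash(kwargs: Dict[str, Any], skip: Set[str]) -> str:
    """Hash only the arguments that are not listed in `skip`."""
    relevant = {k: v for k, v in kwargs.items() if k not in skip}
    payload = json.dumps(relevant, sort_keys=True).encode("utf-8")
    return hashlib.sha256(payload).hexdigest()[:16]


small = id_hash({"model": "m1", "batch_size": 8}, SKIP_ID_ARGUMENTS)
large = id_hash({"model": "m1", "batch_size": 64}, SKIP_ID_ARGUMENTS)
other = id_hash({"model": "m2", "batch_size": 8}, SKIP_ID_ARGUMENTS)

# Changing the batch size keeps the hash; changing the model does not.
print(small == large, small == other)
```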

VERSION: Optional[str] = None

This is optional, but recommended. Specifying a version gives you a way to tell Tango that a step has changed during development, and should now be recomputed. This doesn’t invalidate the old results, so when you revert your code, the old cache entries will stick around and be picked up.

property config: Dict[str, Any]

The configuration parameters that were used to construct the step. This can be empty if the step was not constructed from a configuration file.

Return type

Dict[str, Any]

property dependencies: Set[Step]

Returns a set of steps that this step depends on. This does not return recursive dependencies.

Return type

Set[Step]

property logger: Logger

A logging.Logger that can be used within the run() method.

Return type

Logger

property recursive_dependencies: Set[Step]

Returns a set of steps that this step depends on. This returns recursive dependencies.

Return type

Set[Step]
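The difference between dependencies and recursive_dependencies is the difference between direct edges and the transitive closure of a dependency graph. The sketch below models this with plain strings instead of Step objects. The step names and the `direct` mapping are hypothetical.

```python
from typing import Dict, Set

# Hypothetical direct dependencies, keyed by step name.
direct: Dict[str, Set[str]] = {
    "evaluate": {"train", "load_test"},
    "train": {"load_train"},
    "load_train": set(),
    "load_test": set(),
}


def recursive_dependencies(step: str) -> Set[str]:
    """Collect everything reachable from `step` by following direct edges."""
    seen: Set[str] = set()
    stack = list(direct[step])
    while stack:
        current = stack.pop()
        if current not in seen:
            seen.add(current)
            stack.extend(direct[current])
    return seen


print(sorted(direct["evaluate"]))                  # direct dependencies only
print(sorted(recursive_dependencies("evaluate")))  # includes load_train
```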

property resources: StepResources

Defines the minimum compute resources required to run this step. Certain executors require this information in order to allocate resources for each step.

You can set this with the step_resources argument to Step, or you can override this property to define the required resources automatically.

Return type

StepResources

property unique_id: str

Returns the unique ID for this step.

Unique IDs are of the shape $class_name-$version-$hash, where the hash is the hash of the inputs for deterministic steps, and a random string of characters for non-deterministic ones.

Return type

str
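The shape $class_name-$version-$hash can be illustrated with a short sketch. The function make_unique_id() and its hashing scheme are assumptions for this example and will not reproduce the IDs that Tango generates. The sketch also always expects a version string and does not model steps without a VERSION.

```python
import hashlib
import json
import secrets
from typing import Any, Dict


def make_unique_id(
    class_name: str,
    version: str,
    kwargs: Dict[str, Any],
    deterministic: bool = True,
) -> str:
    """Build an ID of the shape class_name-version-hash (illustrative only)."""
    if deterministic:
        # Deterministic steps: the suffix is a hash of the inputs.
        payload = json.dumps(kwargs, sort_keys=True).encode("utf-8")
        suffix = hashlib.sha256(payload).hexdigest()[:16]
    else:
        # Non-deterministic steps: the suffix is a random string.
        suffix = secrets.token_hex(8)
    return f"{class_name}-{version}-{suffix}"


first = make_unique_id("TrainStep", "001", {"lr": 0.1})
again = make_unique_id("TrainStep", "001", {"lr": 0.1})
bumped = make_unique_id("TrainStep", "002", {"lr": 0.1})
print(first == again, first == bumped)
```

Because the version is part of the ID, bumping it yields a new ID while the entry stored under the old ID remains untouched, which matches the behavior described for VERSION.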

property work_dir: Path

The working directory that a step can use while its run() method runs.

This is a convenience property for you to call inside your run() method.

This directory stays around across restarts. You cannot assume that it is empty when your step runs, but you can use it to store information that helps you restart a step if it got killed half-way through the last time it ran.

Return type

Path
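A sketch of the restart pattern this directory enables is shown below. It uses a temporary directory in place of a real work_dir so that it runs without Tango. The process_items() function and the done.txt file name are hypothetical. Inside a real step, the directory would come from the work_dir property.

```python
import tempfile
from pathlib import Path
from typing import List


def process_items(items: List[str], work_dir: Path) -> List[str]:
    """Process items in order, recording progress so a restart can resume."""
    progress_file = work_dir / "done.txt"
    # The directory may not be empty: pick up whatever an earlier run finished.
    done = progress_file.read_text().split() if progress_file.exists() else []
    for item in items[len(done):]:
        # ... the expensive work on `item` would go here ...
        done.append(item)
        progress_file.write_text("\n".join(done))
    return done


with tempfile.TemporaryDirectory() as tmp:
    work_dir = Path(tmp)
    # Simulate an earlier run that was killed after two items.
    (work_dir / "done.txt").write_text("a\nb")
    result = process_items(["a", "b", "c", "d"], work_dir)

print(result)
```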

property workspace: Workspace

The Workspace being used.

This is a convenience property for you to call inside your run() method.

Return type

Workspace

class tango.step.WithUnresolvedSteps(function, *args, **kwargs)

This is a helper class for some scenarios where steps depend on other steps.

Let’s say we have two steps, ConsumeDataStep and ProduceDataStep. The easiest way to make ConsumeDataStep depend on ProduceDataStep is to specify Produce as one of the arguments to the step. This works when Consume takes the output of Produce directly, or if it takes it inside a standard Python container, like a list, set, or dictionary.

But what if the output of ProduceDataStep needs to be added to a complex, custom data structure? WithUnresolvedSteps takes care of this scenario.

For example, this works without any help:

class ProduceDataStep(Step[MyDataClass]):
    def run(self, ...) -> MyDataClass:
        ...
        return MyDataClass(...)

class ConsumeDataStep(Step):
    def run(self, input_data: MyDataClass):
        ...

produce = ProduceDataStep()
consume = ConsumeDataStep(input_data = produce)

This scenario needs help:

import time
from dataclasses import dataclass

@dataclass
class DataWithTimestamp:
    data: MyDataClass
    timestamp: float

class ProduceDataStep(Step[MyDataClass]):
    def run(self, ...) -> MyDataClass:
        ...
        return MyDataClass(...)

class ConsumeDataStep(Step):
    def run(self, input_data: DataWithTimestamp):
        ...

produce = ProduceDataStep()
consume = ConsumeDataStep(
    input_data = DataWithTimestamp(produce, time.time())
)

That does not work, because DataWithTimestamp needs an object of type MyDataClass, but we’re giving it an object of type Step[MyDataClass]. Instead, we change the last line to this:

consume = ConsumeDataStep(
    input_data = WithUnresolvedSteps(
        DataWithTimestamp, produce, time.time()
    )
)

WithUnresolvedSteps will delay calling the constructor of DataWithTimestamp until the run() method runs. Tango will make sure that the results from the produce step are available at that time, and will replace the step in the arguments with the step’s results.

Parameters
  • function – The function to call after resolving steps to their results.

  • args – The args to pass to the function. These may contain steps, which will be resolved before the function is called.

  • kwargs – The kwargs to pass to the function. These may contain steps, which will be resolved before the function is called.
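The idea of delaying a constructor call until its inputs are available can be sketched without Tango. In the block below, FakeStep and Deferred are hypothetical stand-ins: FakeStep plays the role of a step whose result is known, and Deferred stores a function with its arguments and only calls it in construct(). Unlike the class documented here, this sketch resolves only top-level arguments and takes no workspace.

```python
import time
from dataclasses import dataclass
from typing import Any, Callable, List


class FakeStep:
    """Hypothetical stand-in for a step with a known result."""

    def __init__(self, value: Any) -> None:
        self._value = value

    def result(self) -> Any:
        return self._value


class Deferred:
    """Stores a function and its arguments; calls it only in construct()."""

    def __init__(self, function: Callable[..., Any], *args: Any, **kwargs: Any) -> None:
        self.function = function
        self.args = args
        self.kwargs = kwargs

    def construct(self) -> Any:
        # Swap each step for its result, then call the stored function.
        args = [a.result() if isinstance(a, FakeStep) else a for a in self.args]
        kwargs = {
            k: v.result() if isinstance(v, FakeStep) else v
            for k, v in self.kwargs.items()
        }
        return self.function(*args, **kwargs)


@dataclass
class DataWithTimestamp:
    data: List[int]
    timestamp: float


produce = FakeStep([1, 2, 3])
deferred = Deferred(DataWithTimestamp, produce, time.time())
built = deferred.construct()  # the dataclass is only created here
print(built.data)
```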

construct(workspace)

Replaces all steps in the args that are stored in this object, and calls the function with those args.

Parameters

workspace (Workspace) – The Workspace in which to resolve all the steps.

Returns

The result of calling the function.

det_hash_object()

Return an object to use for deterministic hashing instead of self.

Return type

Any

classmethod with_resolved_steps(o, workspace)

Recursively goes through a Python object and replaces all instances of Step with the results of that step.

Parameters
  • o (Any) – The Python object to go through

  • workspace (Workspace) – The workspace in which to resolve all steps

Returns

A new object that’s a copy of the original object, with all instances of Step replaced with the results of the step.
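A recursive replacement of this kind can be sketched as follows. FakeStep is again a hypothetical stand-in for a step with a known result. The function with_resolved() is a simplified model that covers lists, tuples, sets, and dictionaries, and it omits the workspace argument. The container types the real method supports may differ.

```python
from typing import Any


class FakeStep:
    """Hypothetical stand-in for a step with a known result."""

    def __init__(self, value: Any) -> None:
        self._value = value

    def result(self) -> Any:
        return self._value


def with_resolved(o: Any) -> Any:
    """Return a copy of `o` in which every FakeStep is replaced by its result."""
    if isinstance(o, FakeStep):
        return o.result()
    if isinstance(o, list):
        return [with_resolved(x) for x in o]
    if isinstance(o, tuple):
        return tuple(with_resolved(x) for x in o)
    if isinstance(o, set):
        return {with_resolved(x) for x in o}
    if isinstance(o, dict):
        return {k: with_resolved(v) for k, v in o.items()}
    return o  # anything else is passed through unchanged


nested = {"a": [FakeStep(1), 2], "b": (FakeStep("x"),), "c": 3}
resolved = with_resolved(nested)
print(resolved)
```

The original object is left untouched: nested["a"][0] is still a FakeStep after the call, because every container is rebuilt rather than modified in place.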

class tango.step.StepResources(cpu_count=None, gpu_count=None, memory=None, shared_memory=None)

StepResources describes the minimum external hardware requirements that must be available for a step to run.

cpu_count: Optional[float] = None

Minimum number of logical CPU cores. It may be fractional.

Examples: 4, 0.5.

gpu_count: Optional[float] = None

Minimum number of GPUs. It must be non-negative.

memory: Optional[str] = None

Minimum available system memory as a number with unit suffix.

Examples: 2.5GiB, 1024m.

shared_memory: Optional[str] = None

Size of /dev/shm as a number with unit suffix.

Examples: 2.5GiB, 1024m.
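The memory and shared_memory fields are plain strings, so an executor has to interpret them. The sketch below shows one way such strings could be turned into bytes. The parse_memory() helper is hypothetical, and the unit table is an assumption: it treats every suffix as a binary multiple (so "m" means 1024 * 1024 bytes), which may not match how a given executor reads these values.

```python
import re

# Assumed unit table: all suffixes are read as binary multiples.
_UNITS = {
    "b": 1,
    "k": 1024, "kib": 1024,
    "m": 1024 ** 2, "mib": 1024 ** 2,
    "g": 1024 ** 3, "gib": 1024 ** 3,
}

_PATTERN = re.compile(r"\s*(\d+(?:\.\d+)?)\s*([A-Za-z]+)\s*")


def parse_memory(text: str) -> int:
    """Convert a string such as '2.5GiB' into a number of bytes."""
    match = _PATTERN.fullmatch(text)
    if match is None:
        raise ValueError(f"not a number with a unit suffix: {text!r}")
    number, unit = match.groups()
    factor = _UNITS.get(unit.lower())
    if factor is None:
        raise ValueError(f"unknown unit suffix: {unit!r}")
    return int(float(number) * factor)


print(parse_memory("2.5GiB"))
print(parse_memory("1024m"))
```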

Implementations

Built-in Step implementations that are not tied to any particular integration.

class tango.steps.DatasetCombineStep(step_name=None, cache_results=None, step_format=None, step_config=None, step_unique_id_override=None, step_resources=None, **kwargs)

This step combines multiple DatasetDict objects into one.

Tip

Registered as a Step under the name “dataset_combine”.

Examples

input1 = DatasetDict({
    "train": list(range(10)),
    "dev": list(range(10, 15)),
})
input2 = DatasetDict({
    "train": list(range(15, 25)),
    "val": list(range(25, 30)),
})
combined = DatasetCombineStep(inputs=[input1, input2])
combined_dataset = combined.result()

run(inputs, shuffle=False, random_seed=1532637578)

Combines multiple datasets into one. This is done lazily, so all operations are fast.

If a split is present in more than one input dataset, the output dataset will have a split that’s the concatenation of the input splits.

Parameters
  • inputs (List[DatasetDict]) – The list of input datasets that will be combined.

  • shuffle (bool, default: False) – Whether to shuffle the combined datasets. If you don’t do this, the new splits will contain first all the instances from one dataset, and then all the instances from another dataset.

  • random_seed (int, default: 1532637578) – Random seed, affects shuffling

Return type

DatasetDict

Returns

Returns a new dataset that is the combination of the input datasets.
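The combination rule (splits with the same name are concatenated, optionally followed by a seeded shuffle) can be modeled with plain dictionaries of lists. The combine() function below is a hypothetical, eager stand-in. It does not use DatasetDict and is not lazy, so it only illustrates the resulting contents.

```python
import random
from typing import Any, Dict, List

Splits = Dict[str, List[Any]]


def combine(inputs: List[Splits], shuffle: bool = False, random_seed: int = 1532637578) -> Splits:
    """Concatenate splits of the same name across all input datasets."""
    combined: Splits = {}
    for dataset in inputs:
        for split, instances in dataset.items():
            # A fresh list is created per split, so inputs are not modified.
            combined.setdefault(split, []).extend(instances)
    if shuffle:
        rng = random.Random(random_seed)
        for instances in combined.values():
            rng.shuffle(instances)
    return combined


input1 = {"train": list(range(10)), "dev": list(range(10, 15))}
input2 = {"train": list(range(15, 25)), "val": list(range(25, 30))}
combined = combine([input1, input2])
print({split: len(instances) for split, instances in combined.items()})
```

Without shuffling, the combined "train" split holds all instances from input1 followed by all instances from input2, which is the ordering the shuffle parameter is meant to break up.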

CACHEABLE: Optional[bool] = False

This provides a direct way to turn off caching. For example, a step that reads a HuggingFace dataset doesn’t need to be cached, because HuggingFace datasets already have their own caching mechanism. But it’s still a deterministic step, and all following steps are allowed to cache. If it is None, the step figures out by itself whether it should be cacheable or not.

DETERMINISTIC: bool = True

This describes whether this step can be relied upon to produce the same results every time when given the same inputs. If this is False, you can still cache the output of the step, but the results might be unexpected. Tango will print a warning in this case.

VERSION: Optional[str] = '001'

This is optional, but recommended. Specifying a version gives you a way to tell Tango that a step has changed during development, and should now be recomputed. This doesn’t invalidate the old results, so when you revert your code, the old cache entries will stick around and be picked up.

class tango.steps.DatasetRemixStep(step_name=None, cache_results=None, step_format=None, step_config=None, step_unique_id_override=None, step_resources=None, **kwargs)

This step can remix splits in a DatasetDict into new splits.

Tip

Registered as a Step under the name “dataset_remix”.

Examples

input = DatasetDict({
    "train": list(range(10)),
    "dev": list(range(10, 15)),
})
new_splits = {
    "all": "train + dev",
    "crossval_train": "train[0:5] + train[7:]",
    "crossval_test": "train[5:7]",
}
remix_step = DatasetRemixStep(input=input, new_splits=new_splits)
remixed_dataset = remix_step.result()

run(input, new_splits, keep_old_splits=True, shuffle_before=False, shuffle_after=False, random_seed=1532637578)

Remixes and shuffles a dataset. This is done lazily, so all operations are fast.

Parameters
  • input (DatasetDict) – The input dataset that will be remixed.

  • new_splits (Dict[str, str]) –

    Specifies the new splits that the output dataset should have. Keys are the name of the new splits. Values refer to the original splits. You can refer to original splits in the following ways:

    • Mention the original split name to copy it to a new name.

    • Mention the original split name with Python’s slicing syntax to select part of the original split’s instances. For example, "train[:1000]" selects the first 1000 instances from the "train" split.

    • "instances + instances" concatenates the instances into one split.

    You can combine these possibilities.

  • keep_old_splits (bool, default: True) – Whether to keep the splits from the input dataset in addition to the new ones given by new_splits.

  • shuffle_before (bool, default: False) –

    Whether to shuffle the input splits before creating the new ones.

    If you need shuffled instances and you’re not sure the input is properly shuffled, use this.

  • shuffle_after (bool, default: False) –

    Whether to shuffle the splits after the new ones have been created.

    If you need shuffled instances and you’re slicing or concatenating splits, use this.

    If you want to be on the safe side, shuffle both before and after. Shuffling is a cheap operation.

  • random_seed (int, default: 1532637578) – Random seed, affects shuffling

Return type

DatasetDict

Returns

Returns a new dataset that is appropriately remixed.
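The split expressions accepted by new_splits can be illustrated with a small interpreter over plain lists. The select() and remix() functions below are hypothetical and are not the parser Tango uses. They handle only the three forms listed above (a split name, a name with a start:stop slice, and "+" concatenation) and leave out shuffling.

```python
import re
from typing import Any, Dict, List

Splits = Dict[str, List[Any]]

# A split name, optionally followed by a [start:stop] slice.
_PART = re.compile(r"^(\w+)(?:\[(-?\d*):(-?\d*)\])?$")


def select(splits: Splits, spec: str) -> List[Any]:
    """Evaluate an expression such as 'train[0:5] + train[7:]'."""
    result: List[Any] = []
    for part in spec.split("+"):
        match = _PART.match(part.strip())
        if match is None:
            raise ValueError(f"cannot parse {part!r}")
        name, start, stop = match.groups()
        # Missing or empty bounds become None, as in Python slicing.
        lo = int(start) if start else None
        hi = int(stop) if stop else None
        result.extend(splits[name][lo:hi])
    return result


def remix(splits: Splits, new_splits: Dict[str, str], keep_old_splits: bool = True) -> Splits:
    output: Splits = dict(splits) if keep_old_splits else {}
    for name, spec in new_splits.items():
        output[name] = select(splits, spec)
    return output


data = {"train": list(range(10)), "dev": list(range(10, 15))}
remixed = remix(data, {
    "all": "train + dev",
    "crossval_train": "train[0:5] + train[7:]",
    "crossval_test": "train[5:7]",
})
print(remixed["crossval_train"], remixed["crossval_test"])
```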

CACHEABLE: Optional[bool] = False

This provides a direct way to turn off caching. For example, a step that reads a HuggingFace dataset doesn’t need to be cached, because HuggingFace datasets already have their own caching mechanism. But it’s still a deterministic step, and all following steps are allowed to cache. If it is None, the step figures out by itself whether it should be cacheable or not.

DETERMINISTIC: bool = True

This describes whether this step can be relied upon to produce the same results every time when given the same inputs. If this is False, you can still cache the output of the step, but the results might be unexpected. Tango will print a warning in this case.

VERSION: Optional[str] = '001'

This is optional, but recommended. Specifying a version gives you a way to tell Tango that a step has changed during development, and should now be recomputed. This doesn’t invalidate the old results, so when you revert your code, the old cache entries will stick around and be picked up.

class tango.steps.PrintStep(step_name=None, cache_results=None, step_format=None, step_config=None, step_unique_id_override=None, step_resources=None, **kwargs)

This step just logs or prints its input and also returns what it prints.

run(input)

Print out the input.

Return type

str

CACHEABLE: Optional[bool] = False

This provides a direct way to turn off caching. For example, a step that reads a HuggingFace dataset doesn’t need to be cached, because HuggingFace datasets already have their own caching mechanism. But it’s still a deterministic step, and all following steps are allowed to cache. If it is None, the step figures out by itself whether it should be cacheable or not.

DETERMINISTIC: bool = True

This describes whether this step can be relied upon to produce the same results every time when given the same inputs. If this is False, you can still cache the output of the step, but the results might be unexpected. Tango will print a warning in this case.