Step#
Base class#
- class tango.step.Step(step_name=None, cache_results=None, step_format=None, step_config=None, step_unique_id_override=None, step_resources=None, step_metadata=None, step_extra_dependencies=None, **kwargs)[source]#
This class defines one step in your experiment. To write your own step, derive from this class and override the run() method. The run() method must have parameters with type hints.
Step.__init__() takes all the arguments we want to run the step with. They get passed to run() (almost) as they are. If the arguments are other instances of Step, those will be replaced with the step's results before calling run(). Further, there are several special parameters:
- Parameters:
step_name (Optional[str], default: None) – contains an optional human-readable name for the step. This name is used for error messages and the like, and has no consequence on the actual computation.
cache_results (Optional[bool], default: None) – specifies whether the results of this step should be cached. If this is False, the step is recomputed every time it is needed. If this is not set at all, and CACHEABLE is True, we cache if the step is marked as DETERMINISTIC, and we don't cache otherwise.
step_format (Optional[Format], default: None) – gives you a way to override the step's default format (which is given in FORMAT).
step_config (Union[Dict[str, Any], Params, None], default: None) – is the original raw part of the experiment config corresponding to this step. This can be accessed via the config property within each step's run() method.
step_unique_id_override (Optional[str], default: None) – overrides the construction of the step's unique id using the hash of inputs.
step_resources (Optional[StepResources], default: None) – gives you a way to set the minimum compute resources required to run this step. Certain executors require this information.
step_metadata (Optional[Dict[str, Any]], default: None) – use this to specify additional metadata for your step. This is added to the METADATA class variable to form the self.metadata attribute. Values in step_metadata take precedence over METADATA.
step_extra_dependencies (Optional[Iterable[Step]], default: None) – use this to force a dependency on other steps. Normally dependencies between steps are determined by the inputs and outputs of the steps, but you can use this parameter to force that other steps run before this step even if this step doesn't explicitly depend on the outputs of those steps.
Important
Overriding the unique id means that the step will always map to this value, regardless of the inputs, and therefore, the step cache will only hold a single copy of the step's output (from the last execution). Thus, in most cases, this should not be used when constructing steps. We include this option for the case when the executor creates subprocesses, which also need to access the same Step object.
- __eq__(other)[source]#
Determines whether this step is equal to another step. Two steps with the same unique ID are considered identical.
- __init__(step_name=None, cache_results=None, step_format=None, step_config=None, step_unique_id_override=None, step_resources=None, step_metadata=None, step_extra_dependencies=None, **kwargs)[source]#
- ensure_result(workspace=None)[source]#
This makes sure that the result of this step is in the cache. It does not return the result.
- Return type:
- classmethod massage_kwargs(kwargs)[source]#
Override this method in your step if you want to change the step's arguments before they are passed to the run() method.
This can be useful if you want to normalize arguments that are passed to your step. For example, you might not care about the case of a string that's passed in. You can lowercase the string in this method, and the step will function as if it had been created with a lowercase string from the start. This way you can make sure that the step's unique ID does not change when the case of the input changes.
Note
When the input to a step is another step, this method will see the step in the input, not the other step's result.
Warning
This is an advanced feature of Tango that you won't need most of the time.
By default, this method does nothing and just returns its input unchanged.
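The normalization idea can be illustrated without Tango itself. The following is a minimal stand-alone sketch, not Tango's implementation: `massage_kwargs` is a plain function here rather than a classmethod, `sketch_id` is a hypothetical helper that hashes keyword arguments, and the `text` argument is invented for the example.

```python
import hashlib
import json
from typing import Any, Dict


def massage_kwargs(kwargs: Dict[str, Any]) -> Dict[str, Any]:
    """Lowercase the hypothetical `text` argument so that case is ignored."""
    massaged = dict(kwargs)
    if isinstance(massaged.get("text"), str):
        massaged["text"] = massaged["text"].lower()
    return massaged


def sketch_id(kwargs: Dict[str, Any]) -> str:
    """Hash the massaged arguments (a stand-in for a real unique ID)."""
    payload = json.dumps(massage_kwargs(kwargs), sort_keys=True)
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()[:16]


# Inputs that differ only in case map to the same ID.
same = sketch_id({"text": "Hello World"}) == sketch_id({"text": "hello world"})
# Inputs that differ in content still map to different IDs.
different = sketch_id({"text": "hello"}) != sketch_id({"text": "goodbye"})
```

Because the arguments are normalized before they are hashed, a cache keyed on the ID would treat both spellings as the same step.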
- result(workspace=None, needed_by=None)[source]#
Returns the result of this step. If the results are cached, it returns those. Otherwise it runs the step and returns the result from there.
If necessary, this method will first produce the results of all steps it depends on.
- Return type:
TypeVar(T)
- abstract run(**kwargs)[source]#
Execute the step's action.
This method needs to be implemented when creating a Step subclass, but it shouldn't be called directly. Instead, call result().
- Return type:
TypeVar(T)
- CACHEABLE: Optional[bool] = None#
This provides a direct way to turn off caching. For example, a step that reads a HuggingFace dataset doesn't need to be cached, because HuggingFace datasets already have their own caching mechanism. But it's still a deterministic step, and all following steps are allowed to cache. If it is None, the step figures out by itself whether it should be cacheable or not.
- DETERMINISTIC: bool = True#
This describes whether this step can be relied upon to produce the same results every time when given the same inputs. If this is False, you can still cache the output of the step, but the results might be unexpected. Tango will print a warning in this case.
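How the cache_results argument, CACHEABLE, and DETERMINISTIC interact can be sketched as a small decision function. This is only one reading of the prose above, written as a hypothetical helper named `should_cache`; the real resolution inside Tango may differ, for instance in how it warns about or rejects contradictory settings.

```python
from typing import Optional


def should_cache(
    cache_results: Optional[bool],
    cacheable: Optional[bool],
    deterministic: bool,
) -> bool:
    """Decide whether to cache, following the documented defaults."""
    if cache_results is not None:
        # An explicit per-instance choice wins in this sketch.
        return cache_results
    if cacheable is False:
        # The class-level switch turns caching off.
        return False
    # Nothing explicit was requested: cache only deterministic steps.
    return deterministic


default_deterministic = should_cache(None, None, True)
default_nondeterministic = should_cache(None, None, False)
explicitly_disabled = should_cache(False, True, True)
class_disabled = should_cache(None, False, True)
```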
- FORMAT: Format = <tango.format.DillFormat object>#
This specifies the format the results of this step will be serialized in. See the documentation for Format for details.
- SKIP_DEFAULT_ARGUMENTS: Dict[str, Any] = {}#
Sometimes, you want to add another argument to your run() method, but you don't want to invalidate the cache when this new argument is set to its default value. If that is the case, add the argument to this dictionary with the default value that should be ignored.
- SKIP_ID_ARGUMENTS: Set[str] = {}#
If your run() method takes some arguments that don't affect the results, list them here. Arguments listed here will not be used to calculate this step's unique ID, and thus changing those arguments does not invalidate the cache.
For example, you might use this for the batch size in an inference step, where you only care about the model output, not about how many outputs you can produce at the same time.
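The effect of SKIP_ID_ARGUMENTS and SKIP_DEFAULT_ARGUMENTS can be pictured as a filter applied to the arguments before they are hashed. The sketch below is an illustration under that assumption, not Tango's code; `id_relevant_kwargs` and the argument names `model`, `batch_size`, and `temperature` are hypothetical.

```python
from typing import Any, Dict, Set


def id_relevant_kwargs(
    kwargs: Dict[str, Any],
    skip_id_arguments: Set[str],
    skip_default_arguments: Dict[str, Any],
) -> Dict[str, Any]:
    """Return only the arguments that should influence the unique ID."""
    relevant: Dict[str, Any] = {}
    for name, value in kwargs.items():
        if name in skip_id_arguments:
            # Never part of the ID, whatever the value.
            continue
        if name in skip_default_arguments and value == skip_default_arguments[name]:
            # Ignored only while it is still at the listed default value.
            continue
        relevant[name] = value
    return relevant


# The batch size does not change the ID-relevant arguments.
small_batch = id_relevant_kwargs({"model": "m1", "batch_size": 8}, {"batch_size"}, {})
large_batch = id_relevant_kwargs({"model": "m1", "batch_size": 64}, {"batch_size"}, {})

# A newly added argument is ignored at its default, but counted otherwise.
at_default = id_relevant_kwargs({"model": "m1", "temperature": 1.0}, set(), {"temperature": 1.0})
changed = id_relevant_kwargs({"model": "m1", "temperature": 0.5}, set(), {"temperature": 1.0})
```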
- VERSION: Optional[str] = None#
This is optional, but recommended. Specifying a version gives you a way to tell Tango that a step has changed during development, and should now be recomputed. This doesn't invalidate the old results, so when you revert your code, the old cache entries will stick around and be picked up.
- property config: Dict[str, Any]#
The configuration parameters that were used to construct the step. This can be empty if the step was not constructed from a configuration file.
- property dependencies: Set[Step]#
Returns a set of steps that this step depends on. This does not return recursive dependencies.
- property logger: Logger#
A logging.Logger that can be used within the run() method.
- property recursive_dependencies: Set[Step]#
Returns a set of steps that this step depends on. This returns recursive dependencies.
- property resources: StepResources#
Defines the minimum compute resources required to run this step. Certain executors require this information in order to allocate resources for each step.
You can set this with the step_resources argument to Step or you can override this method to automatically define the required resources.
- property unique_id: str#
Returns the unique ID for this step.
Unique IDs are of the shape $class_name-$version-$hash, where the hash is the hash of the inputs for deterministic steps, and a random string of characters for non-deterministic ones.
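The shape of such an ID can be sketched as follows. This is a stand-alone illustration, not Tango's hashing scheme: `sketch_unique_id` is a hypothetical helper, the use of SHA-256 over a JSON dump and the 16-character suffix are assumptions, and so is leaving out the version segment when no version is set.

```python
import hashlib
import json
import secrets
from typing import Any, Dict, Optional


def sketch_unique_id(
    class_name: str,
    version: Optional[str],
    kwargs: Dict[str, Any],
    deterministic: bool = True,
) -> str:
    """Build an ID of the shape class_name-version-hash."""
    if deterministic:
        # Same inputs give the same suffix, regardless of key order.
        payload = json.dumps(kwargs, sort_keys=True).encode("utf-8")
        suffix = hashlib.sha256(payload).hexdigest()[:16]
    else:
        # Non-deterministic steps get a fresh random suffix every time.
        suffix = secrets.token_hex(8)
    parts = [class_name]
    if version is not None:
        parts.append(version)
    parts.append(suffix)
    return "-".join(parts)


first = sketch_unique_id("AddStep", "001", {"a": 1, "b": 2})
reordered = sketch_unique_id("AddStep", "001", {"b": 2, "a": 1})
new_version = sketch_unique_id("AddStep", "002", {"a": 1, "b": 2})
random_one = sketch_unique_id("SampleStep", None, {}, deterministic=False)
random_two = sketch_unique_id("SampleStep", None, {}, deterministic=False)
```

Bumping the version changes the ID while leaving the old one intact, which matches the description of VERSION: old cache entries remain available if the code is reverted.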
- property work_dir: Path#
The working directory that a step can use while its run() method runs.
This is a convenience property for you to call inside your run() method.
This directory stays around across restarts. You cannot assume that it is empty when your step runs, but you can use it to store information that helps you restart a step if it got killed half-way through the last time it ran.
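A persistent working directory makes resumable steps possible. The sketch below shows the pattern with plain Python and a temporary directory standing in for the step's working directory; `process_items` and the `progress.json` file name are hypothetical and not part of Tango.

```python
import json
import tempfile
from pathlib import Path
from typing import List


def process_items(items: List[int], work_dir: Path) -> List[int]:
    """Square each item, saving progress so an interrupted run can resume."""
    progress_file = work_dir / "progress.json"
    # The directory may already hold state from an earlier, interrupted run.
    results: List[int] = (
        json.loads(progress_file.read_text()) if progress_file.exists() else []
    )
    # Skip the items that were already processed last time.
    for item in items[len(results):]:
        results.append(item * item)
        progress_file.write_text(json.dumps(results))
    return results


with tempfile.TemporaryDirectory() as tmp:
    work_dir = Path(tmp)
    # Simulate a previous run that was killed after two items.
    (work_dir / "progress.json").write_text(json.dumps([1, 4]))
    resumed = process_items([1, 2, 3, 4], work_dir)
```

Inside a real step, the same logic would use the work_dir property in place of the temporary directory.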
- tango.step.step(name=None, *, exist_ok=False, bind=False, deterministic=True, cacheable=None, version=None, format=<tango.format.DillFormat object>, skip_id_arguments=None, metadata=None)[source]#
A decorator to create a Step from a function.
- Parameters:
name (Optional[str], default: None) – A name to register the step under. By default the name of the function is used.
exist_ok (bool, default: False) – If True, overwrites any existing step registered under the same name. Else, throws an error if a step is already registered under name.
bind (bool, default: False) – If True, the first argument passed to the step function will be the underlying Step instance, i.e. the function will be called as an instance method. In this case you must name the first argument "self" or you will get a ConfigurationError when instantiating the class.
See the Step class for an explanation of the other parameters.
Example
    from tango import step

    @step(version="001")
    def add(a: int, b: int) -> int:
        return a + b

    @step(bind=True)
    def bound_step(self) -> None:
        assert self.work_dir.is_dir()
- class tango.step.WithUnresolvedSteps(function, *args, **kwargs)[source]#
This is a helper class for some scenarios where steps depend on other steps.
Let's say we have two steps, ConsumeDataStep and ProduceDataStep. The easiest way to make ConsumeDataStep depend on ProduceDataStep is to specify ProduceDataStep as one of the arguments to the step. This works when ConsumeDataStep takes the output of ProduceDataStep directly, or if it takes it inside a standard Python container, like a list, set, or dictionary.
But what if the output of ProduceDataStep needs to be added to a complex, custom data structure? WithUnresolvedSteps takes care of this scenario.
For example, this works without any help:
    class ProduceDataStep(Step[MyDataClass]):
        def run(self, ...) -> MyDataClass:
            ...
            return MyDataClass(...)

    class ConsumeDataStep(Step):
        def run(self, input_data: MyDataClass):
            ...

    produce = ProduceDataStep()
    consume = ConsumeDataStep(input_data=produce)
This scenario needs help:
    @dataclass
    class DataWithTimestamp:
        data: MyDataClass
        timestamp: float

    class ProduceDataStep(Step[MyDataClass]):
        def run(self, ...) -> MyDataClass:
            ...
            return MyDataClass(...)

    class ConsumeDataStep(Step):
        def run(self, input_data: DataWithTimestamp):
            ...

    produce = ProduceDataStep()
    consume = ConsumeDataStep(
        input_data=DataWithTimestamp(produce, time.time())
    )
That does not work, because DataWithTimestamp needs an object of type MyDataClass, but we're giving it an object of type Step[MyDataClass]. Instead, we change the last line to this:
    consume = ConsumeDataStep(
        input_data=WithUnresolvedSteps(
            DataWithTimestamp, produce, time.time()
        )
    )
WithUnresolvedSteps will delay calling the constructor of DataWithTimestamp until the run() method runs. Tango will make sure that the results from the produce step are available at that time, and replaces the step in the arguments with the step's results.
- Parameters:
function – The function to call after resolving steps to their results.
args – The args to pass to the function. These may contain steps, which will be resolved before the function is called.
kwargs – The kwargs to pass to the function. These may contain steps, which will be resolved before the function is called.
- construct(workspace)[source]#
Replaces all steps in the args that are stored in this object, and calls the function with those args.
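The deferred-construction idea behind this class can be sketched in plain Python. The code below is an illustration, not Tango's implementation: `FakeStep` and `Deferred` are hypothetical stand-ins, `construct()` takes no workspace here, and only top-level arguments are resolved, whereas the real class may also look inside containers.

```python
import time
from dataclasses import dataclass
from typing import Any, Callable


class FakeStep:
    """Hypothetical stand-in for a step whose result is already known."""

    def __init__(self, value: Any) -> None:
        self._value = value

    def result(self) -> Any:
        return self._value


class Deferred:
    """Remember a call now, perform it once the steps can be resolved."""

    def __init__(self, function: Callable[..., Any], *args: Any, **kwargs: Any) -> None:
        self.function = function
        self.args = args
        self.kwargs = kwargs

    def construct(self) -> Any:
        def resolve(obj: Any) -> Any:
            # Replace a step with its result; leave everything else alone.
            return obj.result() if isinstance(obj, FakeStep) else obj

        args = [resolve(arg) for arg in self.args]
        kwargs = {key: resolve(value) for key, value in self.kwargs.items()}
        return self.function(*args, **kwargs)


@dataclass
class DataWithTimestamp:
    data: str
    timestamp: float


# The constructor is not called here; the step is stored unresolved.
deferred = Deferred(DataWithTimestamp, FakeStep("payload"), time.time())
# Only now is the step replaced by its result and the constructor called.
resolved = deferred.construct()
```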
- det_hash_object()[source]#
Return an object to use for deterministic hashing instead of self.
- Return type:
- class tango.step.StepResources(machine=None, cpu_count=None, gpu_count=None, gpu_type=None, memory=None, shared_memory=None)[source]#
StepResources describes the minimum external hardware requirements that must be available for a step to run.
- cpu_count: Optional[float] = None#
Minimum number of logical CPU cores. It may be fractional.
Examples: 4, 0.5.
- gpu_type: Optional[str] = None#
The type of GPU that the step requires.
The exact string you should use to define a GPU type depends on the executor. With the Beaker executor, for example, you should use the same strings you see in the Beaker UI, such as "NVIDIA A100-SXM-80GB".
- machine: Optional[str] = None#
This is an executor-dependent option.
With the Beaker executor, for example, you can set this to "local" to force the executor to run the step locally instead of on Beaker.
- memory: Optional[str] = None#
Minimum available system memory as a number with unit suffix.
Examples: 2.5GiB, 1024m.
- shared_memory: Optional[str] = None#
Size of /dev/shm as a number with unit suffix.
Examples: 2.5GiB, 1024m.
Implementations#
Built-in Step implementations that are not tied to any particular integration.
- class tango.steps.DatasetCombineStep(step_name=None, cache_results=None, step_format=None, step_config=None, step_unique_id_override=None, step_resources=None, step_metadata=None, step_extra_dependencies=None, **kwargs)[source]#
This step combines multiple DatasetDicts into one.
Tip
Registered as a Step under the name "dataset_combine".
Examples
    input1 = DatasetDict({
        "train": list(range(10)),
        "dev": list(range(10, 15)),
    })
    input2 = DatasetDict({
        "train": list(range(15, 25)),
        "val": list(range(25, 30)),
    })
    combined = DatasetCombineStep(inputs=[input1, input2])
    combined_dataset = combined.result()
- run(inputs, shuffle=False, random_seed=1532637578)[source]#
Combines multiple datasets into one. This is done lazily, so all operations are fast.
If a split is present in more than one input dataset, the output dataset will have a split that's the concatenation of the input splits.
- Parameters:
inputs (List[DatasetDict]) – The list of input datasets that will be combined.
shuffle (bool, default: False) – Whether to shuffle the combined datasets. If you don't do this, the new splits will contain first all the instances from one dataset, and then all the instances from another dataset.
random_seed (int, default: 1532637578) – Random seed, affects shuffling
- Return type:
- Returns:
Returns a new dataset that is the combination of the input datasets.
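The combination rule can be sketched with plain dictionaries of lists. This is an illustration of the described behavior only: `combine_splits` is a hypothetical helper, it works eagerly rather than lazily, and its shuffling will not reproduce the order that the real step produces for a given seed.

```python
import random
from typing import Dict, List, Sequence


def combine_splits(
    inputs: Sequence[Dict[str, List[int]]],
    shuffle: bool = False,
    random_seed: int = 1532637578,
) -> Dict[str, List[int]]:
    """Concatenate splits that share a name across the input datasets."""
    combined: Dict[str, List[int]] = {}
    for dataset in inputs:
        for split_name, instances in dataset.items():
            # A fresh list per split keeps the inputs unmodified.
            combined.setdefault(split_name, []).extend(instances)
    if shuffle:
        rng = random.Random(random_seed)
        for instances in combined.values():
            rng.shuffle(instances)
    return combined


input1 = {"train": list(range(10)), "dev": list(range(10, 15))}
input2 = {"train": list(range(15, 25)), "val": list(range(25, 30))}
combined = combine_splits([input1, input2])
shuffled = combine_splits([input1, input2], shuffle=True)
```

Without shuffling, the combined "train" split holds all instances of the first dataset followed by all instances of the second, which is the ordering the shuffle parameter is meant to break up.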
- CACHEABLE: Optional[bool] = False#
This provides a direct way to turn off caching. For example, a step that reads a HuggingFace dataset doesn't need to be cached, because HuggingFace datasets already have their own caching mechanism. But it's still a deterministic step, and all following steps are allowed to cache. If it is None, the step figures out by itself whether it should be cacheable or not.
- DETERMINISTIC: bool = True#
This describes whether this step can be relied upon to produce the same results every time when given the same inputs. If this is False, you can still cache the output of the step, but the results might be unexpected. Tango will print a warning in this case.
- VERSION: Optional[str] = '001'#
This is optional, but recommended. Specifying a version gives you a way to tell Tango that a step has changed during development, and should now be recomputed. This doesn't invalidate the old results, so when you revert your code, the old cache entries will stick around and be picked up.
- class tango.steps.DatasetRemixStep(step_name=None, cache_results=None, step_format=None, step_config=None, step_unique_id_override=None, step_resources=None, step_metadata=None, step_extra_dependencies=None, **kwargs)[source]#
This step can remix splits in a DatasetDict into new splits.
Tip
Registered as a Step under the name "dataset_remix".
Examples
    input = DatasetDict({
        "train": list(range(10)),
        "dev": list(range(10, 15)),
    })
    new_splits = {
        "all": "train + dev",
        "crossval_train": "train[0:5] + train[7:]",
        "crossval_test": "train[5:7]",
    }
    remix_step = DatasetRemixStep(input=input, new_splits=new_splits)
    remixed_dataset = remix_step.result()
- run(input, new_splits, keep_old_splits=True, shuffle_before=False, shuffle_after=False, random_seed=1532637578)[source]#
Remixes and shuffles a dataset. This is done lazily, so all operations are fast.
- Parameters:
input (DatasetDict) – The input dataset that will be remixed.
new_splits (Dict[str, str]) – Specifies the new splits that the output dataset should have. Keys are the names of the new splits. Values refer to the original splits. You can refer to original splits in the following ways:
- Mention the original split name to copy it to a new name.
- Mention the original split name with Python's slicing syntax to select part of the original split's instances. For example, "train[:1000]" selects the first 1000 instances from the "train" split.
- "instances + instances" concatenates the instances into one split.
You can combine these possibilities.
keep_old_splits (bool, default: True) – Whether to keep the splits from the input dataset in addition to the new ones given by new_splits.
shuffle_before (bool, default: False) – Whether to shuffle the input splits before creating the new ones. If you need shuffled instances and you're not sure the input is properly shuffled, use this.
shuffle_after (bool, default: False) – Whether to shuffle the input splits after creating the new ones. If you need shuffled instances and you're slicing or concatenating splits, use this.
If you want to be on the safe side, shuffle both before and after. Shuffling is a cheap operation.
random_seed (int, default: 1532637578) – Random seed, affects shuffling
- Return type:
- Returns:
Returns a new dataset that is appropriately remixed.
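The split expressions accepted by new_splits can be read as a tiny language of names, slices, and "+". The sketch below evaluates that language on plain lists; it is an illustration rather than Tango's parser, `evaluate_split` is a hypothetical helper, and it only understands `name`, `name[start:stop]`, and sums of those.

```python
import re
from typing import Dict, List

# A term is a split name, optionally followed by [start:stop].
_TERM = re.compile(r"^(\w+)(?:\[(-?\d+)?:(-?\d+)?\])?$")


def evaluate_split(expression: str, splits: Dict[str, List[int]]) -> List[int]:
    """Evaluate an expression such as "train[0:5] + train[7:]"."""
    result: List[int] = []
    for term in expression.split("+"):
        match = _TERM.match(term.strip())
        if match is None:
            raise ValueError(f"cannot parse split expression term: {term!r}")
        name, start, stop = match.groups()
        start_index = int(start) if start is not None else None
        stop_index = int(stop) if stop is not None else None
        # Slicing with (None, None) copies the whole split.
        result.extend(splits[name][start_index:stop_index])
    return result


splits = {"train": list(range(10)), "dev": list(range(10, 15))}
new_splits = {
    "all": "train + dev",
    "crossval_train": "train[0:5] + train[7:]",
    "crossval_test": "train[5:7]",
}
remixed = {name: evaluate_split(expr, splits) for name, expr in new_splits.items()}
```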
- CACHEABLE: Optional[bool] = False#
This provides a direct way to turn off caching. For example, a step that reads a HuggingFace dataset doesn't need to be cached, because HuggingFace datasets already have their own caching mechanism. But it's still a deterministic step, and all following steps are allowed to cache. If it is None, the step figures out by itself whether it should be cacheable or not.
- DETERMINISTIC: bool = True#
This describes whether this step can be relied upon to produce the same results every time when given the same inputs. If this is False, you can still cache the output of the step, but the results might be unexpected. Tango will print a warning in this case.
- VERSION: Optional[str] = '001'#
This is optional, but recommended. Specifying a version gives you a way to tell Tango that a step has changed during development, and should now be recomputed. This doesn't invalidate the old results, so when you revert your code, the old cache entries will stick around and be picked up.
- class tango.steps.PrintStep(step_name=None, cache_results=None, step_format=None, step_config=None, step_unique_id_override=None, step_resources=None, step_metadata=None, step_extra_dependencies=None, **kwargs)[source]#
This step just logs or prints its input and also returns what it prints.
- CACHEABLE: Optional[bool] = False#
This provides a direct way to turn off caching. For example, a step that reads a HuggingFace dataset doesn't need to be cached, because HuggingFace datasets already have their own caching mechanism. But it's still a deterministic step, and all following steps are allowed to cache. If it is None, the step figures out by itself whether it should be cacheable or not.
- class tango.steps.ShellStep(step_name=None, cache_results=None, step_format=None, step_config=None, step_unique_id_override=None, step_resources=None, step_metadata=None, step_extra_dependencies=None, **kwargs)[source]#
This step runs a shell command, and returns the standard output as a string.
Tip
Registered as a Step under the name "shell_step".
- Parameters:
shell_command – The shell command to run.
output_path – The step makes no assumptions about the command being run. If your command produces some output, you can optionally specify the output path for recording the output location, and optionally validating it. See the validate_output argument for this.
validate_output – If an expected output_path has been specified, you can choose to validate that the step produced the correct output. By default, it will just check if the output_path exists, but you can pass any other validating function. For example, if your command is a script generating a model output, you can check if the model weights can be loaded.
kwargs – Other kwargs to be passed to subprocess.run(). If you need to take advantage of environment variables, set shell = True.
- run(shell_command, output_path=None, validate_output=<tango.common.registrable.make_registrable.<locals>.function_wrapper.<locals>.WrapperFunc object>, **kwargs)[source]#
Execute the step's action.
This method needs to be implemented when creating a Step subclass, but it shouldn't be called directly. Instead, call result().
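The behavior described for the shell step's parameters can be approximated with the standard library alone. The following is a rough sketch, not the ShellStep source: `run_shell` is a hypothetical function, raising RuntimeError on failed validation is an assumption, and the `echo` commands assume a shell where `echo`, `>`, and `&&` behave in the usual way.

```python
import os
import subprocess
import tempfile
from typing import Any, Callable, Optional


def run_shell(
    shell_command: str,
    output_path: Optional[str] = None,
    validate_output: Callable[[str], bool] = os.path.exists,
    **kwargs: Any,
) -> str:
    """Run a command, optionally validate its output file, return stdout."""
    completed = subprocess.run(
        shell_command, capture_output=True, text=True, check=True, **kwargs
    )
    # By default, validation only checks that the expected path exists.
    if output_path is not None and not validate_output(output_path):
        raise RuntimeError(f"output at {output_path!r} failed validation")
    return completed.stdout


with tempfile.TemporaryDirectory() as tmp:
    target = os.path.join(tmp, "out.txt")
    # The command writes a file and prints to standard output.
    stdout = run_shell(f'echo hello > "{target}" && echo done', output_path=target, shell=True)

    # A command that never creates the expected file is rejected.
    missing = os.path.join(tmp, "missing.txt")
    try:
        run_shell("echo hi", output_path=missing, shell=True)
        rejected = False
    except RuntimeError:
        rejected = True
```

Passing a different validate_output callable, for example one that tries to load the produced file, gives a stricter check than the default existence test.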