# Computational model
Here, we review the computational model of redun and the semantics of workflow execution. These details help define what constitutes correct behavior when implementing redun.
## The redun Expression Graph language
redun uses expression graphs to represent workflows. We can describe the structure of those graphs using the following formal grammar:
```
# An expression, e, can be one of these forms:
e = TaskExpression(task_name, args=(e, ...), kwargs={arg_key: e, ...}, options)
  | SchedulerExpression(task_name, args=(e, ...), kwargs={arg_key: e, ...}, options)
  | SimpleExpression(op_name, args=(e, ...), kwargs={arg_key: e, ...})
  | nested_value
  | concrete_value

# Nested values are container types that redun recurses into to evaluate:
nested_value = [e, ...]
             | {e: e, ...}
             | (e, ...)
             | {e, ...}
             | named_tuple_cls(e, ...)
             | dataclass_cls(e, ...)

options = {key: e, ...}
    where key is an option name

named_tuple_cls = User-defined NamedTuple
dataclass_cls = User-defined dataclass
task_name = Task name string
arg_key = Task argument name string

op_name = "add"
        | "mult"
        | "div"
        | ...  # name of any supported Python operator (e.g. `__getitem__`, `__call__`)

concrete_value = Any other Python value (e.g. 10, "hello", pd.DataFrame(...))
```
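To make the grammar concrete, here is a minimal sketch of how the expression forms could be modeled as plain Python dataclasses. These are illustrative stand-ins, not redun's actual classes:

```python
from dataclasses import dataclass, field
from typing import Any, Dict, Tuple

@dataclass
class TaskExpression:
    """A call to a task, with possibly-unevaluated arguments."""
    task_name: str
    args: Tuple[Any, ...] = ()
    kwargs: Dict[str, Any] = field(default_factory=dict)
    options: Dict[str, Any] = field(default_factory=dict)

@dataclass
class SimpleExpression:
    """A deferred Python operation, such as add or __getitem__."""
    op_name: str
    args: Tuple[Any, ...] = ()
    kwargs: Dict[str, Any] = field(default_factory=dict)

# A workflow call like add(mult(2, 3), 4) becomes a nested expression graph,
# where arguments may themselves be expressions or concrete values:
expr = TaskExpression("add", args=(TaskExpression("mult", args=(2, 3)), 4))
```

Note how the graph structure falls directly out of the grammar: sub-expressions appear wherever `e` appears in a production.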
## Evaluation process
Workflow execution is carried out by recursively evaluating Expression Graphs using the following Graph Reduction rules:
```
eval(expr, parent_job) = Evaluate an expression expr within Job parent_job
exec(func, args, kwargs, options) = Execute the function func with arguments
                                    (args and kwargs) and execution configuration options

# Main expression evaluations:
eval(TaskExpression(task_name, args, kwargs, expr_options), p) => eval(exec(
        func=f,
        args=eval(args, p),
        kwargs=eval(default_args(t, args, kwargs), options_job) | eval(kwargs, p),
        options=j.exec_options
    ), j)
    where f = function associated with task t with name task_name
          j = Job(expression=TaskExpression(...), task=t, parent_job=p)
          options_job = Job(parent_job=p, context=j.context)
          p = parent job of the TaskExpression, or None if this is the root Expression
          expr_options = option overrides specific to this TaskExpression
          default_args(t, args, kwargs) = default arguments of task t that are not
                                          specified by args and kwargs
          j.exec_options, j.context = see next section

eval(SchedulerExpression(task_name, args, kwargs, options), p) => eval(f(scheduler, p, s, *args, **kwargs), p)
    where scheduler = redun scheduler
          s = SchedulerExpression(task_name, args, kwargs, options)
          p = parent job of the SchedulerExpression
          options = configuration to customize the execution environment of f

eval(SimpleExpression(op_name, args, kwargs), p) => op(*eval(args, p), **eval(kwargs, p))
    where op = the operation associated with operation name op_name, such as
               `add`, `sub`, `__call__`, `__getitem__`, etc.
          p = parent job of the SimpleExpression

eval(ValueExpression(concrete_value), p) => concrete_value
    where p = parent job of the ValueExpression

# Evaluation of nested values:
eval([a, b, ...], p) => [eval(a, p), eval(b, p), ...]
eval({a: b, c: d, ...}, p) => {eval(a, p): eval(b, p), eval(c, p): eval(d, p), ...}
eval((a, b, ...), p) => (eval(a, p), eval(b, p), ...)
eval({a, b, ...}, p) => {eval(a, p), eval(b, p), ...}
eval(named_tuple_cls(a, b, ...), p) => named_tuple_cls(eval(a, p), eval(b, p), ...)
eval(dataclass_cls(a, b, ...), p) => dataclass_cls(eval(a, p), eval(b, p), ...)

# Concrete values evaluate to themselves:
eval(concrete_value, p) => concrete_value
```
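The reduction rules for simple expressions, nested values, and concrete values can be sketched as a small recursive evaluator. This is a hypothetical toy model, not redun's scheduler: Jobs, TaskExpressions, and SchedulerExpressions are omitted, and `OPS` is an illustrative operator table:

```python
import operator

class SimpleExpression:
    def __init__(self, op_name, args=(), kwargs=None):
        self.op_name = op_name
        self.args = args
        self.kwargs = kwargs or {}

# Map operation names to Python operators (illustrative subset).
OPS = {"add": operator.add, "mult": operator.mul, "__getitem__": operator.getitem}

def eval_expr(expr, parent_job=None):
    # eval(SimpleExpression(op_name, args, kwargs), p) => op(*eval(args, p), **eval(kwargs, p))
    if isinstance(expr, SimpleExpression):
        op = OPS[expr.op_name]
        return op(*eval_expr(expr.args, parent_job), **eval_expr(expr.kwargs, parent_job))
    # Nested values: recurse into container types.
    if isinstance(expr, list):
        return [eval_expr(e, parent_job) for e in expr]
    if isinstance(expr, tuple):
        return tuple(eval_expr(e, parent_job) for e in expr)
    if isinstance(expr, dict):
        return {eval_expr(k, parent_job): eval_expr(v, parent_job)
                for k, v in expr.items()}
    if isinstance(expr, set):
        return {eval_expr(e, parent_job) for e in expr}
    # Concrete values evaluate to themselves.
    return expr

# A nested value containing expressions reduces recursively:
result = eval_expr([SimpleExpression("add", (1, 2)), SimpleExpression("mult", (3, 4))])
# → [3, 12]
```

The key property to notice is that evaluation is driven purely by the shape of the value: containers are recursed into, expressions are reduced, and anything else is already in normal form.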
Noteworthy design choices:

- We only create a new Job `j` for evaluating TaskExpressions. We do not create new Jobs for evaluating the other expression types. This design choice was made to record provenance at an appropriate level of detail.
- We propagate the parent jobs through the evaluation in order to facilitate CallGraph recording and to pass environment-like data down to child Jobs.
- When evaluating a SchedulerExpression, we pass the positional arguments `args` and keyword arguments `kwargs` unevaluated into `f`. This allows `f` to customize how to evaluate the arguments, if at all.
- Default arguments of a task can be expressions themselves (see `eval(default_args(t, args, kwargs), options_job)`) and they are evaluated prior to being passed to the underlying function `f`. This allows chaining extra logic just before task execution:

  ```python
  @task
  def my_task(x, y=another_task()):
      # Both x and y will be concrete at this point.
      ...
  ```
## Task option evaluation and overriding
Task options allow customization of Job execution (e.g. memory, CPU, executor, caching, etc.). These options can be defined in a number of places. In general, more locally defined options override more globally defined ones. Here, we define the specific option-overriding semantics:
```
# Final options used when executing a Job:
job.exec_options = job.executor_options  # Options defined in the Job's Executor config in `redun.ini`
                 | job.eval_options

# Evaluated options for the Job. This is needed since job.options might contain expressions.
job.eval_options = eval(job.options, job.parent_job)

# Job options prior to considering executor options.
job.options = job.task.options        # Options defined in the @task() decorator
            | job.inherited_options   # Exported options from the parent Job
            | job.expression.options  # Options defined at call-time using my_task.options(opt=X)(arg1, arg2)
            | scheduler_options       # Options defined by the Scheduler or CLI (e.g. `redun run --no-cache`)

# Options that inherit from the parent Job, if it exists.
job.inherited_options = {
    k: v for k, v in job.parent_job.options.items()
    if k in job.parent_job.exported_options
} if job.parent_job exists else {}

# Jobs export options defined by the union of parent job, task, and expression.
# Here we determine the option names to export.
job.exported_options = (job.parent_job.exported_options if job.parent_job exists else set())
                       U job.expression.exported_options
                       U job.task.exported_options

# Job context inherits from the parent Job with overrides from the Expression.
job.context = job.parent_job.context | job.eval_options["_context_override"]  if job.parent_job exists
            = config_context | scheduler_context                              otherwise

config_context = Context defined in the `redun.ini` configuration file
scheduler_context = Context defined on the CLI (`--context x=10`) or Scheduler (`Scheduler.run(expr, context={"x": 10})`)
```
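The `|` above is ordinary dict union, where keys on the right take precedence. A minimal sketch of the merge with plain dicts (the option values are made up for illustration; redun's real Job machinery is omitted):

```python
# Each layer of options, from the base layer (left) to the strongest override (right).
task_options = {"memory": 4, "cache": True}       # @task() decorator
inherited_options = {"executor": "batch"}         # exported from the parent Job
expression_options = {"memory": 8}                # my_task.options(memory=8)(...)
scheduler_options = {"cache": False}              # e.g. `redun run --no-cache`

# Python dict union: keys on the right override keys on the left.
job_options = task_options | inherited_options | expression_options | scheduler_options

# The Executor config from redun.ini is the base layer for the final exec options.
executor_options = {"memory": 2, "vcpus": 1}
exec_options = executor_options | job_options
# → {"memory": 8, "vcpus": 1, "cache": False, "executor": "batch"}
```

Walking the merge: `memory=4` from the decorator is overridden to `8` by the call-time expression options, while `vcpus=1` survives from the executor config because no more-local layer sets it.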
Noteworthy design choices:

- Options can be expressions as well and are evaluated prior to their use (see `eval(job.options, job.parent_job)`). This allows further run-time configuration:

  ```python
  @task(memory=get_memory_task())
  def my_task():
      ...
  ```

- Context is passed from parent to child Jobs and is modified by expression options (`update_context()` specifically performs this modification):

  ```python
  @task(memory=get_context("memory"))
  def my_task(x, y=get_context("y")):
      pass

  result = my_task.update_context(memory=2, y=10)(9)
  ```

- In order for `get_context("y")` in the default argument to pick up the modification to `y` made with `update_context()`, we need to evaluate the options using a special parent job `options_job` that uses the context from job `j`.
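The context rule can likewise be sketched with plain dicts, where a child Job's context is its parent's context overridden by the `_context_override` entry from the evaluated expression options. This is a hypothetical simplification; `child_context` is not a redun function:

```python
def child_context(parent_context, eval_options):
    # job.context = job.parent_job.context | job.eval_options["_context_override"]
    return parent_context | eval_options.get("_context_override", {})

parent = {"memory": 1, "y": 0}

# my_task.update_context(memory=2, y=10)(9) records its overrides
# in the expression options, which flow into eval_options:
ctx = child_context(parent, {"_context_override": {"memory": 2, "y": 10}})
# → {"memory": 2, "y": 10}
```

Because the override sits on the right of the dict union, child Jobs see the updated values while unrelated context keys are inherited from the parent unchanged.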