Computational model#

Here, we review the computational model of redun and the semantics of workflow execution. These details help to define what is correct behavior when implementing redun.

The redun Expression Graph language#

redun uses expression graphs to represent workflows. We can describe the structure of those graphs using the following formal grammar:

# An expression, e, can be one of these forms:
e = TaskExpression(task_name, args=(e, ...), kwargs={arg_key: e, ...}, options)
  | SchedulerExpression(task_name, args=(e, ...), kwargs={arg_key: e, ...}, options)
  | SimpleExpression(op_name, (e, ...), {e, ...})
  | nested_value
  | concrete_value

# Nested values are container types that redun recurses into to evaluate:
nested_value = [e, ...]
             | {e: e, ...}
             | (e, ...)
             | {e, ...}
             | named_tuple_cls(e, ...)
             | dataclass_cls(e, ...)

options = {key: e, ...}
  where key is an option name

named_tuple_cls = User defined NamedTuple
dataclass_cls = User defined dataclass

task_name = Task name string
arg_key = Task argument name string

op_name = "add"
        | "mult"
        | "div"
        | ... name of any supported Python operator (e.g. `__getitem__`, `__call__`)

concrete_value = Any other python value (e.g. 10, "hello", pd.DataFrame(...))

Evaluation process#

Workflow execution is carried out by recursively evaluating Expression Graphs using the following Graph Reduction rules:

eval(expr, parent_job) =
  Evaluate an expression expr within Job parent_job

exec(func, args, kwargs, options) =
  Execute the function func with arguments (args and kwargs) and execution configuration options

# Main expression evaluations:
eval(TaskExpression(task_name, args, kwargs, expr_options), p) => eval(exec(
    func=f,
    args=eval(args, p),
    kwargs=eval(default_args(t, args, kwargs), options_job) | eval(kwargs, p),
    options=j.exec_options
  ), j)
  where f = function associated with task t with name task_name
        j = Job(expression=TaskExpression(...), task=t, parent_job=p)
        options_job = Job(parent_job=p, context=j.context)
        p = parent job of the TaskExpression or None if this the root Expression
        expr_options = option overrides specific to this TaskExpression
        default_args(t, args, kwargs) = default arguments of task t that are not specified
          by args and kwargs
        j.exec_options, j.context = see next section

eval(SchedulerExpression(task_name, args, kwargs, options), p) => eval(f(scheduler, p, s, *args, **kwargs), p)
  where scheduler = redun scheduler
        s = SchedulerExpression(task_name, args, kwargs, options)
        p = parent job of the SchedulerExpression
        options = configuration to customize the execution environment of f

eval(SimpleExpression(op_name, args, kwargs), p) = op(*args, **kwargs)
  where op = an operation associated with operation name `op_name`, such as
             `add`, `sub`, `__call__`, `__getitem__`, etc.
        p = parent job of the SimpleExpression

eval(ValueExpression(concrete_value), p) => concrete_value
   where p = parent job of the ValueExpression

# Evaluation of nested values:
eval([a, b, ...], p) => [eval(a, p), eval(b, p), ...]
eval({a: b, c: d, ...}, p) => {eval(a, p): eval(b, p), eval(c, p): eval(d, p), ...}
eval((a, b, ...), p) => (eval(a, p), eval(b, p), ...)
eval({a, b, ...}, p) => {eval(a, p), eval(b, p), ...}
eval(named_tuple_cls(a, b, ...), p) => named_tuple_cls(eval(a, p), eval(b, p), ...)
eval(dataclass_cls(a, b, ...), p) => dataclass_cls(eval(a, p), eval(b, p), ...)

# Concrete values evaluate to themselves:
eval(concrete_value, p) => concrete_value

Noteworthy design choices:

  • We only create new Job j for evaluating TaskExpressions. We do not create new Jobs for evaluating the other expression types. This design choice was made to record provenance at an appropriate level of detail.

  • We propagate the parent jobs through the evaluation in order to facilitate CallGraph recording and to pass down environment like data to child Jobs.

  • When evaluating a SchedulerExpression, we pass the positional arguments args and keywords arguments kwargs unevaluated into f. This allows f to customize how to evaluate the arguments, if at all.

  • Default arguments of a task can be expressions themselves (see eval(default_args(t, args, kwargs), options_job)) and they are evaluated prior to passing to the underlying function f. This allows chaining extra logic just before task execution

    @task
    def my_task(x, y=another_task()):
        # Both x and y will be concrete at this point.
        # ...
    

Task option evaluation and overriding#

Task options allow customization of Job execution (e.g. memory, cpu, executor, caching, etc). These options can be defined in a number of places. In general, more local options definitions override more globally defined options. Here, we define the specific options overriding semantics:

# Final options used when executing a Job:
job.exec_options = job.executor_options  # Options defined in the Job's Executor config in `redun.ini`
                 | job.eval_options      

# Evaluated options for the Job. This is needed since job.options might contain expressions.
job.eval_options = eval(job.options, job.parent_job)

# Job options prior to considering executor options.
job.options =
    job.task.options        # Options defined in the @task() decorator
  | job.inherited_options   # Exported options from the parent Job.
  | job.expression.options  # Options defined at call-time using my_task.options(opt=X)(arg1, arg2)
  | scheduler_options       # Options defined by the Scheduler or CLI (e.g. `redun run --no-cache`)

# Options that inherit from the parent Job, if it exists.
job.inherited_options = {
  k: v for k, v in job.parent_job.options.items()
  if k in job.parent_job.exported_options
} if job.parent_job exists else {}

# Jobs export options defined by the union of parent job, task, and expression.
# Here we determine the option names to export.
job.exported_options = (job.parent_job.exported_options if job.parent_job exists else set())
                     U job.expression.exported_options
                     U job.task.exported_options

# Job context inherits from parent Job with overrides from Expression.
job.context =
  job.parent_job.context | job.eval_options["_context_override"]; if job.parent_job exists
  config_context | scheduler_context                            ; otherwise

config_context = Context defined in the `redun.ini` configuration file.
scheduler_context = Context defined on the CLI (`--context x=10`) or Scheduler (`Scheduler.run(expr, context={"x": 10})`)

Noteworthy design choices:

  • Options can be expressions as well and are evaluated prior to their use (see eval(job.options, job.parent_job)). This allows further run-time configuration:

    @task(memory=get_memory_task())
    def my_task():
        # ...
    
  • Context is passed from parent to child Jobs and is modified by expression options (update_context() specifically performs this modification).

    @task(memory=get_context("memory"))
    def my_task(x, y=get_context("y")):
        # pass
    
    result = my_task.update_context(memory=2, y=10)(9)
    
    • In order for get_context("y") in the default argument to pick up the modification to y with update_context(), we need to evaluate the options using a special parent job options_job that uses the context from job.