redun package#

Subpackages#

Submodules#

redun.bcoding module#

From:

https://github.com/flying-sheep/bcode/blob/master/bcoding.py

The main change made here is to explicitly safeguard against encoding bools, as they derive from int. Formatting and linting changes were also applied.

Original License:

Copyright (c) 2013 Phil Schaf

Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the “Software”), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

bencode/decode library.

Bencoding is used in BitTorrent files.

Use the exposed functions to encode/decode them.

redun.bcoding.assert_btype(byte, typ)#
redun.bcoding.bdecode(f_or_data)#

bdecodes data by looking up the type byte and dispatching to the respective decoding function, which in turn returns the decoded object.

The parameter can be a file opened in bytes mode, bytes, or a string (the last of which will be decoded).

redun.bcoding.bencode(data, f=None)#

Writes a serializable data piece to f. The order of type tests is non-arbitrary, as strings and mappings are themselves iterable.

If f is None, it writes to a byte buffer and returns a bytestring.
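
A minimal round-trip sketch (the values are illustrative; per the upstream bcoding library, UTF-8 byte strings decode back to str):

from redun.bcoding import bdecode, bencode

# Encode to a bytestring (f is None), then decode back.
data = {"announce": "http://tracker.example.com", "length": 1024}
encoded = bencode(data)
assert bdecode(encoded) == data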

redun.bcoding.main(args=None)#

Decodes bencoded files to Python syntax (like JSON, but with bytes support).

redun.cli module#

class redun.cli.ArgFormatter(prog, indent_increment=2, max_help_position=24, width=None)#

Bases: ArgumentDefaultsHelpFormatter, RawDescriptionHelpFormatter

An argument formatter that shows default values and does not reflow description strings.

class redun.cli.RedunClient(stdout: ~typing.IO = <_io.TextIOWrapper name='<stdout>' mode='w' encoding='utf-8'>, stderr: ~typing.IO = <_io.TextIOWrapper name='<stderr>' mode='w' encoding='utf-8'>)#

Bases: object

Command-line (CLI) client for interacting with redun.

STATUS_WIDTH = 6#
aws_kill_jobs_command(args: Namespace, extra_args: List[str], argv: List[str]) None#

Kill AWS Batch jobs.

aws_list_jobs_command(args: Namespace, extra_args: List[str], argv: List[str]) None#

List AWS Batch jobs.

aws_logs_command(args: Namespace, extra_args: List[str], argv: List[str]) None#

Fetch AWS Batch job logs.

check_version(required_version: str) None#

Enforce a required redun version.

console_command(args: Namespace, extra_args: List[str], argv: List[str]) None#

Performs the console command (CLI GUI).

db_downgrade_command(args: Namespace, extra_args: List[str], argv: List[str]) None#
db_info_command(args: Namespace, extra_args: List[str], argv: List[str]) None#

Display information about redun repo database.

db_migrate_command(direction: str, args: Namespace, extra_args: List[str], argv: List[str]) None#

Migrate redun repo database to a new version.

db_upgrade_command(args: Namespace, extra_args: List[str], argv: List[str]) None#
db_versions_command(args: Namespace, extra_args: List[str], argv: List[str]) None#

Display all available redun repo database versions.

display(*messages: Any, pretty: bool = False, indent: int = 0, newline: bool = True) None#

Write text to standard output.

display_doc_tags(tags: List[Tag], indent: int = 0) None#

Display doc tags.

execute(argv: List[str] | None = None) Any#

Execute a command from the command line.

export_command(args: Namespace, extra_args: List[str], argv: List[str]) None#

Export records from redun database.

fs_command(args: Namespace, extra_args: List[str], argv: List[str]) None#

Give information on redun-supported filesystems.

fs_copy_command(args: Namespace, extra_args: List[str], argv: List[str]) None#

Copy files between redun-supported filesystems.

get_backend(args: Namespace) RedunBackendDb#
get_command_parser() ArgumentParser#

Returns the command line parser.

get_record_ids(prefix_ids: Sequence[str]) List[str]#

Expand prefix record ids to full record ids.

get_scheduler(args: Namespace, migrate: bool = False, migrate_if_local: bool = False) Scheduler#
get_session(args: Namespace) Session#
help_command(args: Namespace, extra_args: List[str], argv: List[str]) None#

Show help information.

import_command(args: Namespace, extra_args: List[str], argv: List[str]) None#

Import records into redun database.

infer_file_path(path: str) Base | None#

Try to infer if path matches any File.

Returns a query iterating over Files and relevant Jobs.

infer_id(id: str, include_files: bool = True, required: bool = False) Any#

Try to infer the record based on an id prefix.

init_command(args: Namespace, extra_args: List[str], argv: List[str]) None#

Initialize redun project directory.

launch_command(args: Namespace, extra_args: List[str], argv: List[str]) None#

Run a workflow within an Executor (e.g. docker or batch).

log_call_node(call_node: CallNode, indent: int = 0, show_children: bool = True, show_parents: bool = True, show_result: bool = True, show_task: bool = True, show_dataflow: bool = True, detail: bool = True) None#

Display a CallNode.

log_command(args: Namespace, extra_args: List[str], argv: List[str]) None#

Performs the log command.

This is the main command for querying the CallGraph.

log_dataflow(value: Value, value_name: str = 'value', indent: int = 0)#

Display the upstream dataflow of a Value.

log_execution(execution: Execution, show_jobs: bool = True, indent: int = 0, detail: bool = True) None#

Display an Execution.

log_file(file: File, kind: str | None = None, indent: int = 0) None#

Display a File.

log_files(query: CallGraphQuery, indent: int = 0) None#

Display File-specific query results.

log_job(job: Job, indent: int = 0, show_children: bool = False, detail: bool = True) None#

Display a Job.

log_record(record: Any, detail: bool = True, indent: int = 0, format: str = 'text') None#

Display one record or a list of records.

log_task(task: Task | Task, show_source: bool = True, show_job: bool = True, indent: int = 0, detail: bool = True) None#

Display a Task.

log_traceback(job: Job, indent: int = 0, detail: bool = True) None#

Display a Job traceback.

log_value(value: Value, indent: int = 0, detail: bool = True) None#

Display a Value.

oneshot_command(args: Namespace, extra_args: List[str], argv: List[str]) Any#

Evaluate a single task.

pull_command(args, extra_args, argv)#

Pull records into another redun database.

push_command(args: Namespace, extra_args: List[str], argv: List[str]) None#

Push records into a redun repository.

repl_command(args: Namespace, extra_args: List[str], argv: List[str]) None#

Get a read-eval-print-loop for querying the history.

repo_add_command(args: Namespace, extra_args: List[str], argv: List[str]) None#
repo_list_command(args: Namespace, extra_args: List[str], argv: List[str]) None#
repo_remove_command(args: Namespace, extra_args: List[str], argv: List[str]) None#
resolve_context_updates(context_file: str, context_updates: List[str]) Dict[str, Any]#

Create a dict with updates to the context from command line arguments.

run_command(args: Namespace, extra_args: List[str], argv: List[str]) Any#

Performs the run command.

server_command(args: Namespace, extra_args: List[str], argv: List[str]) None#

Run redun local web server UI.

start_pager() None#

Redirect output to a pager if stdout is a tty.

stop_pager() None#

Stop sending output to a pager.

tag_add_command(args: Namespace, extra_args: List[str], argv: List[str]) None#

Add new tags to entities.

tag_list_command(args: Namespace, extra_args: List[str], argv: List[str]) None#

Search tags.

tag_rm_command(args: Namespace, extra_args: List[str], argv: List[str]) None#

Delete tags.

tag_update_command(args: Namespace, extra_args: List[str], argv: List[str]) None#

Update entity tags.

viz_command(args: Namespace, extra_args: List[str], argv: List[str]) None#

Performs the visualization command.

exception redun.cli.RedunClientError#

Bases: Exception

redun.cli.add_value_arg_parser(parser: ArgumentParser, arg_name: str, anno: Any, default: Any) Action#

Add argument parser inferred from a parameter type.

redun.cli.arg_name2cli_name(arg_name: str) str#

Convert a snake_case argument into a --kabob-case argument.

redun.cli.check_version(version: str, version_spec: str) bool#

Returns True if version satisfies version specification.

redun.cli.find_config_dir(cwd: str | None = None) str | None#

Search up directories from current working directory to find config dir.

redun.cli.format_arguments(args: List[Argument]) str#

Display CallNode arguments.

For example, if args has 2 positional and 1 keyword argument, we would display that as:

‘prog’, 10, extra_file=File(path=prog.c, hash=763bc10f)

redun.cli.format_id(id: str, detail: bool = False, prefix=8) str#

Display a record id.

redun.cli.format_setup_help(parser: ArgumentParser) Iterator[str]#

Yields lines of help text for setup argument parser.

redun.cli.format_tags(tags: List[Tag], max_length: int = 50) str#

Format a set of tags.

redun.cli.format_timedelta(duration: timedelta) str#

Format timedelta as a string.

redun.cli.get_abs_path(path: str) str#

Returns the absolute path of the input string, which can be an S3 path.

redun.cli.get_anno_origin(anno: Any) Any | None#

Returns the origin of an annotation.

Origin is Python’s term for the main type of a generic type. For example, the origin of List[int] is list and its argument is int. This function abstracts away changes that have occurred in the Python API since 3.6.

https://docs.python.org/3/library/typing.html#typing.get_origin

redun.cli.get_config_dir(config_dir: str | None = None) str#

Get the redun config dir.

We use the following precedence:

  • command line (config_dir)

  • environment variable

  • search filesystem (parent directories) for config dir

  • assume .redun in current working directory

Note that the returned config dir may be relative.

redun.cli.get_config_path(config_dir: str | None = None) str#
redun.cli.get_default_execution_tags(user: str | None = None, project: str | None = None, doc: str | None = None, exclude_users: List[str] = []) List[Tuple[str, Any]]#

Get default tags for the Execution.

redun.cli.get_gcp_user() str | None#

Returns the user account for any active GCP logins, otherwise None.

redun.cli.get_setup_parser(setup_func: Callable) Tuple[ArgumentParser, Dict[str, str]]#

Returns an ArgumentParser for setup arguments.

redun.cli.get_task_arg_parser(task: Task, include_defaults: bool) Tuple[ArgumentParser, Dict[str, str]]#

Returns a CLI parser for a redun Task.

Parameters:
  • task (BaseTask) – The task to generate a parser for.

  • include_defaults (bool) – If true, set defaults for the parser based on the task defaults. If false, do not. This can be useful for determining if a particular argument was actually set by the user.

redun.cli.get_user_setup_func(config: Config) Callable[[...], Scheduler]#

Returns scheduler setup function based on user config.

redun.cli.get_username() str#

Returns the current redun user.

redun.cli.import_script(filename_or_module: str, add_cwd: bool = True) module#

Import a python script as a module.

Parameters:
  • filename_or_module (str) – This argument can be a filepath to a python script (e.g. path/to/script.py) or a dot-delimited module (e.g. lib.workflows.workflow).

  • add_cwd (bool) – If True, add the current working directory to the python import paths (sys.path).

redun.cli.infer_project_name(task: Task) str | None#

Infer a project name from the top-level task.

redun.cli.is_config_local_db(config: Config) bool#

Returns True if config uses a local sqlite db.

redun.cli.is_port_in_use(hostname: str, port: int | str) bool#

Check if a TCP/IP port on hostname is in use.

redun.cli.is_python_filename(name: str) bool#

Returns True if string looks like a python filename.

redun.cli.make_parse_arg_func(arg_anno: Any) Callable[[str], Any]#

Returns parser for argument annotation.

redun.cli.parse_func_path(path: str) Tuple[str, str]#

Parses a function path ‘file_or_module::func’.

Parameters:

path (str) – path should have the format: ‘file_or_module::func’, where file_or_module is a filename or python module and func is a function name.

Returns:

A tuple of file_or_module and function name.

Return type:

Tuple[str, str]
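
For example, both file paths and module paths split the same way:

from redun.cli import parse_func_path

assert parse_func_path("workflow.py::main") == ("workflow.py", "main")
assert parse_func_path("lib.workflows.workflow::run") == ("lib.workflows.workflow", "run")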

redun.cli.parse_setup_args(setup_func: Callable, setup_args: List[str] | None) Dict[str, Any]#

Parse setup arguments into keyword arguments.

redun.cli.parse_version(version: str) Tuple#

Parse a version number into a tuple of ints.

redun.cli.run_command(argv: List[str]) str | None#

Run a command and return its output.

redun.cli.setup_backend_db(config_dir: str | None = None, repo: str = 'default') RedunBackendDb#

Setup RedunBackendDb from config directory.

redun.cli.setup_config(config_dir: str | None = None, db_uri: str | None = None, repo: str = 'default', initialize=True) Config#

Setup config file.

redun.cli.setup_repo_config(config: Config, repo: str) Config#

Uses configuration from another repository specified in local config.

redun.cli.setup_scheduler(config_dir: str | None = None, setup_args: List[str] | None = None, repo: str = 'default', migrate: bool | None = None, migrate_if_local: bool = False) Scheduler#

Setup Scheduler from config directory.

redun.cli.with_pager(client: RedunClient, args: Namespace) Iterator[None]#

Context manager for running a pager (e.g. less) for output.

redun.config module#

class redun.config.Config(config_dict: dict | None = None, configdir: str | None = None)#

Bases: object

Extends ConfigParser to support nested sections.

property configdir: str#
get(key: str, default: Any | None = None) Any#
get_config_dict(replace_config_dir=None) Dict[str, Dict]#

Return a python dict that can be used to reconstruct the Config object

config2 = Config(config_dict=config1.get_config_dict()) should result in identical Configs config1 and config2.

Note: The structure of the returned Dict is essentially a two-level dict corresponding to INI file structure. Top-level key is a dot-separated section name. Top-level value is a single-level dict containing key/values for a single section.

Parameters:

replace_config_dir – if not None, replaces any variables containing the machine-local config_dir (as obtained by redun.cli.get_config_dir()) with replace_config_dir. Typical values that are replaced are backend::config_dir, db_uri, repos.default::config_dir.

Returns:

A copy of this object’s config in two-level format, optionally with values containing the local config directory replaced.
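
A minimal round-trip sketch (the section name and value are illustrative):

from redun.config import Config

config1 = Config(config_dict={"backend": {"db_uri": "sqlite:///redun.db"}})
config2 = Config(config_dict=config1.get_config_dict())
assert config1.get_config_dict() == config2.get_config_dict()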

items()#
keys() Iterable[str]#
read_dict(config_dict: dict) None#
read_path(filename: str) None#
read_string(string: str) None#
class redun.config.RedunConfigParser(*args, config: Config, **kwargs)#

Bases: ConfigParser

optionxform(optionstr)#
class redun.config.RedunExtendedInterpolation#

Bases: ExtendedInterpolation

When performing variable interpolation, fall back to environment variables.

For example, if the environment variable ROLE is defined, we can reference it in the redun.ini file as follows:
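
(A minimal sketch, assuming Config wires this interpolation into its parser; the section and value are illustrative.)

import os

from redun.config import Config

# Assumption: undefined ${...} variables fall back to the environment.
os.environ["ROLE"] = "arn:aws:iam::123456789012:role/my-role"

config = Config()
config.read_string("""
[executors.batch]
role = ${ROLE}
""")
# The resolved section should now contain the role from the environment.
print(config.get_config_dict())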

before_get(parser, section, option, value, defaults)#
redun.config.create_config_section(config_dict: dict | None = None) SectionProxy#

Create a default section.

redun.context module#

(scheduler_task)redun.context.get_context(var_path: str, default: Any | None = None) T#

Returns a value from the current context.

Parameters:
  • var_path (str) – A dot-separated path for a variable to fetch from the current context.

  • default (Optional[Any]) – A default value to return if the context variable is not defined. None by default.
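
A minimal workflow sketch; the variable path project.name and the default are illustrative:

from redun import task
from redun.context import get_context

@task()
def main() -> str:
    # Lazily fetch "project.name" from the current context,
    # returning "unnamed" if it is not defined.
    return get_context("project.name", default="unnamed")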

redun.context.get_context_value(context: dict, var_path: str, default: T) T#

Returns a value from a context dict using a dot-separated path (e.g. ‘foo.bar.baz’).

Parameters:
  • context (dict) – A redun context dict.

  • var_path (str) – A dot-separated path for a variable to fetch from the current context.

  • default (T) – A default value to return if the context variable is not defined.
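
For example:

from redun.context import get_context_value

context = {"foo": {"bar": {"baz": 42}}}
assert get_context_value(context, "foo.bar.baz", default=0) == 42
assert get_context_value(context, "foo.missing", default=0) == 0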

redun.db_utils module#

redun.db_utils.filter_in(query: Query, column: Column, values: Iterable, chunk: int = 100) Iterable[Any]#

Perform an IN-filter on a sqlalchemy query with an iterable of values.

Returns an iterable of results.

redun.db_utils.get_or_create(session: Session, Model: ModelType, filter: Dict[str, Any], update: Dict[str, Any] | None = None) Tuple[ModelType, bool]#

Get or create a row for a sqlalchemy Model.
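
A usage sketch with an in-memory SQLite database; the User model is hypothetical, and the flush behavior on creation is an assumption:

from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import Session, declarative_base

from redun.db_utils import get_or_create

Base = declarative_base()

class User(Base):  # hypothetical model for illustration
    __tablename__ = "users"
    id = Column(Integer, primary_key=True)
    name = Column(String, unique=True)
    email = Column(String)

engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine) as session:
    user, created = get_or_create(
        session,
        User,
        filter={"name": "alice"},
        update={"email": "alice@example.com"},
    )
    print(created)  # expected: True on the first call, False afterwards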

redun.db_utils.query_filter_in(query: Query, column: Column, values: Iterable, chunk: int = 100) Iterable[Query]#

Perform an IN-filter on a sqlalchemy query with an iterable of values.

Returns an iterable of queries with IN-filter applied.

redun.expression module#

class redun.expression.ApplyExpression(args: Tuple, kwargs: dict)#

Bases: Expression[Result]

Lazy expression for applying a function or task to arguments.

type_name: str | None = 'redun.expression.ApplyExpression'#
class redun.expression.Expression#

Bases: Value, Generic[Result]

Base class for lazy expressions.

Lazy expressions are used to defer compute until the redun scheduler is ready to execute it.

get_hash(data: bytes | None = None) str#

Returns a hash for the value. Users may override to provide custom behavior.

Most basic objects are covered by the default approach of hashing the binary pickle serialization.

type_name: str | None = 'redun.expression.Expression'#
class redun.expression.QuotedExpression(expr: Result)#

Bases: Generic[Result]

A quoted expression that will not evaluate until forced.

eval() Result#

Evaluate the quoted expression.

class redun.expression.SchedulerExpression(task_name: str, args: Tuple, kwargs: dict, task_options: dict | None = None, length: int | None = None)#

Bases: TaskExpression[Result]

Lazy expression that is evaluated within the scheduler for redun-specific operations.

type_name: str | None = 'redun.expression.SchedulerExpression'#
class redun.expression.SimpleExpression(func_name: str, args: Tuple = (), kwargs: dict = {})#

Bases: ApplyExpression[Result]

Lazy expression for a simple computation (e.g. getattr, getitem, call).

type_name: str | None = 'redun.expression.SimpleExpression'#
class redun.expression.TaskExpression(task_name: str, args: Tuple, kwargs: dict, task_options: dict | None = None, length: int | None = None)#

Bases: ApplyExpression[Result]

Lazy expression for applying a task to arguments.

is_valid() bool#

Returns True if the value may be used. For ordinary values, this will typically always be true. For state-tracking values, the deserialized cache value may no longer be valid. If this method returns False, the object will typically be discarded.

This method may be called repeatedly on the same object.

For example, if the value was a File reference and the corresponding file on the filesystem has since been removed/altered, the File reference is no longer valid.

type_name: str | None = 'redun.expression.TaskExpression'#
class redun.expression.ValueExpression(value: Result)#

Bases: Expression[Result]

Lifts a concrete value into an Expression type.

type_name: str | None = 'redun.expression.ValueExpression'#
redun.expression.derive_expression(orig_expr: Expression, derived_value: Expression | Any) Expression#

Record derived_value as downstream of orig_expression.

redun.expression.format_arguments(args, kwargs) str#

Format Task arguments.

redun.expression.get_lazy_operation(name: str) Callable | None#

Retrieves a lazy operation by its registered name.

redun.expression.lazy_operation(method: str | None = None, name: str | None = None) Callable[[Callable], Callable]#

Function decorator to declare lazy operations on Expression object.

Parameters:
  • method – Name of method registered to Expression class. Defaults to None.

  • name – Registered name of operation by which it will be retrieved. Defaults to None.

redun.expression.quote(expr: Result) QuotedExpression[Result]#

Quote an Expression so that it does not evaluate.
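
For example, quote(my_task(x)) would keep a task call unevaluated until explicitly forced; a minimal sketch with a plain value:

from redun.expression import quote

quoted = quote(3)          # a QuotedExpression[int]
assert quoted.eval() == 3  # eval() forces the quoted expression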

redun.federated_tasks module#

exception redun.federated_tasks.InvocationException#

Bases: Exception

Custom exception that is raised when there is an error encountered invoking an AWS lambda.

(scheduler_task)redun.federated_tasks.federated_task(entrypoint: str, *task_args, **task_kwargs) T#

Execute a task that has been indirectly specified in the scheduler config file by providing a federated_task and its executor. This allows us to invoke code we do not have locally.

Since the code isn’t visible, the cache_scope for the subrun is always set to CSE.

Parameters:
  • entrypoint (str) – The name of the federated_task section in the config that identifies the task to perform.

  • task_args (Optional[List[Any]]) – Positional arguments for the task

  • task_kwargs (Any) – Keyword arguments for the task

(scheduler_task)redun.federated_tasks.lambda_federated_task()#

Submit a task to an AWS Lambda function by packaging the inputs and sending them as the payload with instructions.

This allows a workflow to trigger execution of a remote workflow, to be executed by another service.

This is fire-and-forget; you do not get back the results. We do not implement a way to wait for results, because it’s not currently possible to monitor the backend database and gracefully determine whether an execution has finished. Specifically, executions look “errored” until they succeed, so we would have to rely on timeouts to determine if an error actually happened. Or, we would need to create another side-channel to record progress, which we did not want to undertake.

Parameters:
  • config_name (str) – The name of the config to use; the lambda defines how this name is chosen, not redun.

  • entrypoint (str) – The name of the entrypoint into the above config.

  • url (str) – The name of the AWS Lambda function to invoke

  • scratch_prefix (str) – The path where the packaged inputs can be written. This is dictated by the lambda, hence is a user input.

  • dryrun (bool) – If false, actually invoke the lambda. If true, skip it.

  • Other args are intended for the task itself.

Returns:

  • str – The execution ID we asked the proxy lambda to use

  • Dict – The data package.

redun.federated_tasks.launch_federated_task(federated_config_path: str, entrypoint: str, task_args: Tuple | None = None, task_kwargs: Dict | None = None, input_path: str | None = None, execution_id: str | None = None) str#

Launch the described federated task. Among other purposes, this is designed to make it easy to implement a REST server that can process the messages from rest_federated_task.

As with the launch CLI verb, we briefly start a scheduler to help with sending off the work, but immediately shut it down.

Inputs may be provided in explicit form via task_args and task_kwargs, or pre-packaged, but not both.

Parameters:
  • federated_config_path (str) – The path to the federated config file to use.

  • entrypoint (str) – The name of the entrypoint into the above config.

  • task_args (Optional[Tuple]) – Arguments to package and pass to the task.

  • task_kwargs (Optional[Dict]) – Arguments to package and pass to the task.

  • input_path (Optional[str]) – The path to already-packaged inputs, which must be suitable for use with the redun launch flag –input. This allows a server to forward inputs created by another application. This file is not opened, because we don’t assume we have the code to unpickle the objects inside.

  • execution_id (Optional[str]) – If provided, use this execution id. Otherwise, one will be generated for you.

Returns:

The execution id we asked the redun invocation to use.

Return type:

str
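
A call sketch; the config path, entrypoint, and arguments are illustrative:

from redun.federated_tasks import launch_federated_task

execution_id = launch_federated_task(
    "path/to/federated_config",  # illustrative config path
    "my_entrypoint",
    task_args=(10,),
    task_kwargs={"name": "example"},
)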

(scheduler_task)redun.federated_tasks.rest_federated_task()#

Submit a task to a server over REST, by packaging the inputs and POSTing a data blob with instructions.

This allows a workflow to trigger execution of a remote workflow, to be executed by another service.

This is fire-and-forget; you do not get back the results. We do not implement a way to wait for results, because it’s not currently possible to monitor the backend database and gracefully determine whether an execution has finished. Specifically, executions look “errored” until they succeed, so we would have to rely on timeouts to determine if an error actually happened. Or, we would need to create another side-channel to record progress, which we did not want to undertake.

Parameters:
  • config_name (str) – The name of the config to use; the server defines how this name is chosen, not redun.

  • entrypoint (str) – The name of the entrypoint into the above config.

  • url (str) – The URL to POST the result to.

  • scratch_prefix (str) – The path where the packaged inputs can be written. This is dictated by the server, hence is a user input.

  • dryrun (bool) – If false, actually perform the POST. If true, skip it.

  • Other args are intended for the task itself.

Returns:

  • str – The execution ID we asked the proxy to use

  • Dict – The data package.

redun.file module#

class redun.file.AzureBlobFileSystem#

Bases: FsspecFileSystem

property az_credential#
copy(src_path: str, dest_path: str) None#

Copy a file from src_path to dest_path.

exists(path: str) bool#

Returns True if path exists on filesystem.

filesize(path: str) int#

Returns file size of path in bytes.

static get_account_name_from_path(path: str) str#
get_fs_for_path(path: str)#
get_hash(path: str) str#

Return a hash for the file at path.

glob(pattern: str) List[str]#

Returns filenames matching pattern.

isdir(path: str) bool#

Returns True if path is a directory.

isfile(path: str) bool#

Returns True if path is a file.

mkdir(path: str) None#

Creates the directory in the filesystem.

name: str = 'az'#
remove(path: str) None#

Delete a path from the filesystem.

rmdir(path: str, recursive: bool = False) None#

Removes a directory from the filesystem. If recursive, removes all contents of the directory. Otherwise, raises OSError on non-empty directories

class redun.file.ContentDir(path: str)#

Bases: Dir

classes = <redun.file.ContentFileClasses object>#
type_basename = 'ContentDir'#
type_name: str | None = 'redun.ContentDir'#
class redun.file.ContentFile(path: str)#

Bases: File

Content-based file hashing.

classes = <redun.file.ContentFileClasses object>#
type_basename = 'ContentFile'#
type_name: str | None = 'redun.ContentFile'#
class redun.file.ContentFileClasses#

Bases: FileClasses

A grouping of related ContentFile classes.

class redun.file.ContentFileSet(pattern: str)#

Bases: FileSet

classes = <redun.file.ContentFileClasses object>#
type_basename = 'ContentFileSet'#
type_name: str | None = 'redun.ContentFileSet'#
class redun.file.ContentStagingDir(local: T | str, remote: T | str)#

Bases: Staging[Dir]

classes = <redun.file.ContentFileClasses object>#
type_basename = 'ContentStagingDir'#
type_name: str | None = 'redun.ContentStagingDir'#
class redun.file.ContentStagingFile(local: File | str, remote: File | str)#

Bases: StagingFile

classes = <redun.file.ContentFileClasses object>#
type_basename = 'ContentStagingFile'#
type_name: str | None = 'redun.ContentStagingFile'#
class redun.file.Dir(path: str)#

Bases: FileSet

classes = <redun.file.FileClasses object>#
copy_to(dest_dir: Dir, skip_if_exists: bool = False) Dir#
exists() bool#
file(rel_path: str) File#
mkdir() None#
rel_path(path: str) str#
rmdir(recursive: bool = False) None#
shell_copy_to(dest_path: str, as_mount: bool = False) str#

Returns a shell command for copying the directory to a destination path.

Parameters:
  • dest_path (str) – Destination path to copy to.

  • as_mount (bool) – Treat src/dest as mounted.

stage(local: str | None = None) StagingDir#
type_basename = 'Dir'#
type_name: str | None = 'redun.Dir'#
class redun.file.FTPFileSystem#

Bases: FsspecFileSystem

FileSystem methods for FTP.

name: str = 'ftp'#
class redun.file.File(path: str)#

Bases: Value

Class for assisting file IO in redun tasks.

File objects are hashed based on their contents and abstract over storage backends such as local disk or cloud object storage.
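
A minimal task sketch; the paths are illustrative:

from redun import File, task

@task()
def uppercase(src: File) -> File:
    out = File("out.txt")  # illustrative output path
    with src.open("r") as infile, out.open("w") as outfile:
        outfile.write(infile.read().upper())
    return out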

basename() str#
classes = <redun.file.FileClasses object>#
copy_to(dest_file: File, skip_if_exists: bool = False) File#
dirname() str#
exists() bool#
get_hash(data: bytes | None = None) str#

Returns a hash for the value. Users may override to provide custom behavior.

Most basic objects are covered by the default approach of hashing the binary pickle serialization.

property hash: str#
is_valid() bool#

Returns True if the value may be used. For ordinary values, this will typically always be true. For state-tracking values, the deserialized cache value may no longer be valid. If this method returns False, the object will typically be discarded.

This method may be called repeatedly on the same object.

For example, if the value was a File reference and the corresponding file on the filesystem has since been removed/altered, the File reference is no longer valid.

isdir() bool#
isfile() bool#
open(mode: str = 'r', encoding: str | None = None, **kwargs: Any) IO#

Open a file stream.

Parameters:
  • mode (str) – Stream mode for reading or writing (‘r’, ‘w’, ‘b’, ‘a’).

  • encoding (str) – Text encoding (e.g. ‘utf-8’) to use when read or writing.

  • **kwargs – Additional arguments for the underlying file stream. They are Filesystem-specific.

read(mode: str = 'r', encoding: str | None = None) str | bytes#
readlines(mode: str = 'r') List[str | bytes]#
remove() None#
shell_copy_to(dest_path: str | None, as_mount: bool = False) str#

Returns a shell command for copying the file to a destination path.

Parameters:
  • dest_path (Optional[str]) – Destination path to copy to. If None, use stdout.

  • as_mount (Optional[str]) – Copy files from mounted directories.

size() int#
stage(local: str | None = None) StagingFile#
touch(time: Tuple[int, int] | Tuple[float, float] | None = None) None#
type_basename = 'File'#
type_name: str | None = 'redun.File'#
update_hash() None#
write(data: str | bytes, mode: str = 'w', encoding: str | None = None) None#
class redun.file.FileClasses#

Bases: object

A grouping of related File classes.

Dir: Type[Dir]#
File: Type[File]#
FileSet: Type[FileSet]#
StagingDir: Type[StagingDir]#
StagingFile: Type[StagingFile]#
class redun.file.FileSet(pattern: str)#

Bases: Value

classes = <redun.file.FileClasses object>#
files() List[File]#
get_hash(data: bytes | None = None) str#

Returns a hash for the value. Users may override to provide custom behavior.

Most basic objects are covered by the default approach of hashing the binary pickle serialization.

property hash: str#
is_valid() bool#

Returns True if the value may be used. For ordinary values, this will typically always be true. For state-tracking values, the deserialized cache value may no longer be valid. If this method returns False, the object will typically be discarded.

This method may be called repeatedly on the same object.

For example, if the value was a File reference and the corresponding file on the filesystem has since been removed/altered, the File reference is no longer valid.

iter_subvalues() Iterator[Value]#

Iterates through the FileSet’s subvalues (Files).

type_basename = 'FileSet'#
type_name: str | None = 'redun.FileSet'#
update_hash() None#
class redun.file.FileSystem#

Bases: ABC

Base class for filesystem access.

copy(src_path: str, dest_path: str) None#

Copy a file from src_path to dest_path.

exists(path: str) bool#

Returns True if path exists on filesystem.

filesize(path: str) int#

Returns file size of path in bytes.

get_hash(path: str) str#

Return a hash for the file at path.

glob(pattern: str) List[str]#

Returns filenames matching pattern.

isdir(path: str) bool#

Returns True if path is a directory.

isfile(path: str) bool#

Returns True if path is a file.

mkdir(path: str) None#

Creates the directory in the filesystem.

name: str = 'base'#
open(path: str, mode: str, encoding: str | None = None, **kwargs: Any) IO#

Open a file stream from the filesystem.

Parameters:
  • path (str) – Url or path of file to open.

  • mode (str) – Stream mode for reading or writing (‘r’, ‘w’, ‘b’, ‘a’).

  • encoding (str) – Text encoding (e.g. ‘utf-8’) to use when read or writing.

  • **kwargs – Additional arguments for the underlying file stream. They are Filesystem-specific.

remove(path: str) None#

Delete a path from the filesystem.

rmdir(path: str, recursive: bool = False) None#

Removes a directory from the filesystem. If recursive, removes all contents of the directory. Otherwise, raises OSError on non-empty directories

shell_copy(src_path: str | None, dest_path: str | None, recursive: bool = False, as_mount: bool = False) str#

Returns a shell command for performing a file copy.

Parameters:
  • src_path (Optional[str]) – Source path to copy from. If None, use stdin.

  • dest_path (Optional[str]) – Destination path to copy to. If None, use stdout.

  • recursive (bool) – If True, copy a directory tree of files.

  • as_mount (bool) – Treat src/dest as mounted.

touch(path: str, time: Tuple[int, int] | Tuple[float, float] | None = None) None#

Create the path on the filesystem with timestamp.

class redun.file.FsspecFileSystem#

Bases: FileSystem

copy(src_path: str, dest_path: str) None#

Copy a file from src_path to dest_path.

exists(path: str) bool#

Returns True if path exists on filesystem.

filesize(path: str) int#

Returns file size of path in bytes.

property fs#
get_hash(path: str) str#

Return a hash for the file at path.

glob(pattern: str) List[str]#

Returns filenames matching pattern.

isdir(path: str) bool#

Returns True if path is a directory.

isfile(path: str) bool#

Returns True if path is a file.

mkdir(path: str) None#

Creates the directory in the filesystem.

name: str = 'fsspec'#
remove(path: str) None#

Delete a path from the filesystem.

rmdir(path: str, recursive: bool = False) None#

Removes a directory from the filesystem. If recursive, removes all contents of the directory. Otherwise, raises OSError on non-empty directories

shell_copy(src_path: str | None, dest_path: str | None, recursive: bool = False, as_mount: bool = False) str#

Returns a shell command for performing a file copy.

Parameters:
  • src_path (Optional[str]) – Source path to copy from. If None, use stdin.

  • dest_path (Optional[str]) – Destination path to copy to. If None, use stdout.

  • recursive (bool) – If True, copy a directory tree of files.

  • as_mount (bool) – Treat src/dest as mounted.

class redun.file.GSFileSystem#

Bases: FsspecFileSystem

FileSystem methods for Google Cloud Storage.

glob(pattern: str) List[str]#

Returns filenames matching pattern.

name: str = 'gs'#
shell_copy(src_path: str | None, dest_path: str | None, recursive: bool = False, as_mount: bool = False) str#

Returns a shell command for performing a file copy.

Parameters:
  • src_path (Optional[str]) – Source path to copy from. If None, use stdin.

  • dest_path (Optional[str]) – Destination path to copy to. If None, use stdout.

  • recursive (bool) – If True, copy a directory tree of files.

  • as_mount (bool) – Treat src/dest as mounted.

class redun.file.HTTPFileSystem#

Bases: FsspecFileSystem

FileSystem methods for HTTP URLs.

name: str = 'http'#
class redun.file.HTTPSFileSystem#

Bases: FsspecFileSystem

FileSystem methods for HTTPS URLs.

name: str = 'https'#
class redun.file.IDir(path: str)#

Bases: Dir

classes = <redun.file.IFileClasses object>#
type_basename = 'IDir'#
type_name: str | None = 'redun.IDir'#
class redun.file.IFile(path: str)#

Bases: File

Immutable file.

This class should be used for files that are write once and then immutable.

classes = <redun.file.IFileClasses object>#
is_valid() bool#

Returns True if the value may be used. For ordinary values, this will typically always be true. For state-tracking values, the deserialized cache value may no longer be valid. If this method returns False, the object will typically be discarded.

This method may be called repeatedly on the same object.

For example, if the value was a File reference and the corresponding file on the filesystem has since been removed/altered, the File reference is no longer valid.

type_basename = 'IFile'#
type_name: str | None = 'redun.IFile'#
class redun.file.IFileClasses#

Bases: FileClasses

A grouping of related IFile classes.

class redun.file.IFileSet(pattern: str)#

Bases: FileSet

classes = <redun.file.IFileClasses object>#
is_valid() bool#

Returns True if the value may be used. For ordinary values, this will typically always be true. For state-tracking values, the deserialized cache value may no longer be valid. If this method returns False, the object will typically be discarded.

This method may be called repeatedly on the same object.

For example, if the value was a File reference and the corresponding file on the filesystem has since been removed/altered, the File reference is no longer valid.

type_basename = 'IFileSet'#
type_name: str | None = 'redun.IFileSet'#
class redun.file.IStagingDir(local: T | str, remote: T | str)#

Bases: Staging[Dir]

classes = <redun.file.IFileClasses object>#
type_basename = 'IStagingDir'#
type_name: str | None = 'redun.IStagingDir'#
class redun.file.IStagingFile(local: File | str, remote: File | str)#

Bases: StagingFile

classes = <redun.file.IFileClasses object>#
type_basename = 'IStagingFile'#
type_name: str | None = 'redun.IStagingFile'#
class redun.file.LocalFileSystem#

Bases: FileSystem

FileSystem methods for a local POSIX filesystem.

copy(src_path: str, dest_path: str) None#

Copy a file from src_path to dest_path.

exists(path: str) bool#

Returns True if path exists on filesystem.

filesize(path: str) int#

Returns file size of path in bytes.

get_hash(path: str) str#

Return a hash for the file at path.

glob(pattern: str) List[str]#

Returns filenames matching pattern.

isdir(path: str) bool#

Returns True if path is a directory.

isfile(path: str) bool#

Returns True if path is a file.

mkdir(path: str) None#

Creates the directory in the filesystem.

name: str = 'local'#
remove(path: str) None#

Delete a path from the filesystem.

rmdir(path: str, recursive: bool = False) None#

Removes a directory from the filesystem. If recursive, removes all contents of the directory. Otherwise, raises OSError on non-empty directories

shell_copy(src_path: str | None, dest_path: str | None, recursive: bool = False, as_mount: bool = False) str#

Returns a shell command for performing a file copy.

Parameters:
  • src_path (Optional[str]) – Source path to copy from. If None, use stdin.

  • dest_path (Optional[str]) – Destination path to copy to. If None, use stdout.

  • recursive (bool) – If True, copy a directory tree of files.

  • as_mount (bool) – Treat src/dest as mounted.

touch(path: str, time: Tuple[int, int] | Tuple[float, float] | None = None) None#

Create the path on the filesystem with timestamp.

exception redun.file.RedunFileNotFoundError(path: str, *args)#

Bases: FileNotFoundError

Redun-specific FileNotFoundError.

exception redun.file.RedunOSError(path: str, *args)#

Bases: OSError

Redun-specific OSError.

exception redun.file.RedunPermissionError(path: str, *args)#

Bases: PermissionError

Redun-specific PermissionError.

class redun.file.S3FileSystem#

Bases: FileSystem

FileSystem methods for AWS S3.

copy(src_path: str, dest_path: str) None#

Copy a file from src_path to dest_path.

exists(path: str) bool#

Returns True if path exists in filesystem.

filesize(path: str) int#

Returns file size of path in bytes.

get_hash(path: str) str#

Return a hash for the file at path.

glob(pattern: str) List[str]#

Returns filenames matching pattern.

isdir(path: str) bool#

Returns True if path is a directory.

isfile(path: str) bool#

Returns True if path is a file.

mkdir(path: str) None#

Creates the directory in the filesystem.

name: str = 's3'#
remove(path: str) None#

Delete a path from the filesystem.

rmdir(path: str, recursive: bool = False) None#

Removes a directory from the filesystem. If recursive, removes all contents of the directory. Otherwise, raises OSError on non-empty directories

property s3: S3FileSystem#
property s3_raw: Any#
shell_copy(src_path: str | None, dest_path: str | None, recursive: bool = False, as_mount: bool = False) str#

Returns a shell command for performing a file copy.

Parameters:
  • src_path (Optional[str]) – Source path to copy from. If None, use stdin.

  • dest_path (Optional[str]) – Destination path to copy to. If None, use stdout.

  • recursive (bool) – If True, copy a directory tree of files.

  • as_mount (bool) – Treat src/dest as mounted.

class redun.file.ShardedS3Dataset(path: str, format: str = 'parquet', recurse: bool = True)#

Bases: Value

A sharded dataset on S3. “Sharded” means a collection of files that, when concatenated, comprise the complete dataset. Several formats are supported, but parquet is the best tested with redun to date, due to the quality of its integration with AWS services and because it allows reading only portions of the dataset.

The hash of the ShardedS3Dataset is just the hash of the sorted list of files in the dataset. So changing which files are included (such as with recurse=True), adding or removing files, or performing a dataset write operation (which creates new shards) causes the hash of the dataset to change. It does not, however, detect individual shards being altered by other code.
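
A usage sketch; the bucket and prefix are illustrative, and S3 access is required:

from redun.file import ShardedS3Dataset

dataset = ShardedS3Dataset("s3://my-bucket/dataset/", format="parquet")
df = dataset.load_pandas(max_shards=10)  # load at most 10 shards as one DataFrame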

property filenames: List[str]#
property format: str#
classmethod from_data(dataset: pandas.DataFrame | pyspark.sql.DataFrame, output_path: str, format: str = 'parquet', partition_keys: List[str] = [], catalog_database: str = 'default', catalog_table: str | None = None, format_options: Dict[str, Any] = {}) ShardedS3Dataset#

Helper function to create a ShardedS3Dataset from an existing DataFrame-like object.

Parameters:
  • dataset (Union[pandas.DataFrame, pyspark.sql.DataFrame]) – Dataset to save

  • output_path (str) – Path on S3 to which data will be saved as multiple files of format format.

  • format (str) – Format to save the data in. Supported formats are: [“avro”, “csv”, “ion”, “grokLog”, “json”, “orc”, “parquet”, “xml”] Defaults to parquet.

  • partition_keys (List[str]) – Dataset keys to partition on. Each key will be a subdirectory in self.path containing data for each value of that key. For example, partitioning on the column ‘K’ will make subdirectories ‘K=1’, ‘K=2’, ‘K=3’, etc.

  • catalog_database (str) – Datacatalog name to write to, if creating a table in the Data Catalog. Defaults to ‘default’

  • catalog_table (Optional[str]) – If present, written data will be available in AWS Data Catalog / Glue / Athena with the indicated table name.

  • format_options (Dict[str, Any]) – Additional options for the data loader. Documented here: https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-etl-format.html

classmethod from_files(files: List[File], format: str | None = None, allow_additional_files: bool = False) ShardedS3Dataset#

Helper function to create a ShardedS3Dataset from a list of redun Files. This can be helpful for provenance tracking from methods that return multiple files, where the “root directory” is wanted for later use.

If a dataset cannot be created that contains all the files, such as being in different S3 prefixes, a ValueError is raised. If files do not all have the same format, a ValueError will be raised.

Parameters:
  • files (List[File]) – Files that should be included in the dataset.

  • format (Optional[str]) – If specified, files are considered to be in this format.

  • allow_additional_files (bool) – If True, allows the resulting ShardedS3Dataset to contain files not in files. If False, a ValueError will be raised if a ShardedS3Dataset cannot be constructed.

Raises:

ValueError – If a dataset could not be created from the given files.

get_hash(data: bytes | None = None) str#

Returns a hash for the value. Users may override to provide custom behavior.

Most basic objects are covered by the default approach of hashing the binary pickle serialization.

property hash: str#
is_valid() bool#

Returns True if the value may be used. For ordinary values, this will typically always be true. For state-tracking values, the deserialized cache value may no longer be valid. If this method returns False, the object will typically be discarded.

This method may be called repeatedly on the same object.

For example, if the value was a File reference and the corresponding file on the filesystem has since been removed/altered, the File reference is no longer valid.

iter_subvalues() Iterator[Value]#

Iterates through the Value’s subvalues.

load_pandas(max_shards: int = -1) pandas.DataFrame#

Loads the ShardedS3Dataset as a Pandas DataFrame.

Parameters:

max_shards (int) – Maximum number of shards to load. If -1, will load all of them.

Returns:

All shards loaded into a single DataFrame.

Return type:

pandas.DataFrame

load_pandas_shards(max_shards: int = -1) List[pandas.DataFrame]#

Loads the ShardedS3Dataset as a list of Pandas DataFrames. This is deterministic and will load the shards in the same order every time.

Parameters:

max_shards (int) – Maximum number of shards to load. If -1 (default), will load all shards.

Returns:

Loaded shards, one DataFrame per entry in the list.

Return type:

List[pandas.DataFrame]

load_spark(validate: bool = False, format_options: Dict[str, Any] = {}) pyspark.sql.DataFrame#

Loads the ShardedS3Dataset as a Spark DataFrame. Must be running in a Spark context.

Returns:

The loaded dataset.

Return type:

pyspark.sql.DataFrame

property path: str#
postprocess(postprocess_args) ShardedS3Dataset#

Post process a value resulting from a Task, and return the postprocessed value.

This is a scheduler lifecycle method and is not typically invoked by users. This method may be called repeatedly on a particular instance, for example, if it is returned recursively.

Parameters:

postprocess_args (dict) – Extra data provided by the scheduler. Implementations that are not part of redun should discard this argument.

purge_spark(remove_older_than: int = 1, manifest_file_path: str | None = None) None#

Recursively removes all files older than remove_older_than hours. Defaults to 1 hour. Optionally writes removed files to manifest_file_path/Success.csv.

property recurse: bool#
save_spark(dataset: pandas.DataFrame | pyspark.sql.DataFrame, partition_keys: List[str] = [], catalog_database: str = 'default', catalog_table: str | None = None, format_options: Dict[str, Any] = {}) None#

Writes a pandas or spark DataFrame to the given path in the given format, optionally partitioning on dataset keys. Must be done from a spark environment.

Parameters:
  • dataset (Union[pandas.DataFrame, pyspark.sql.DataFrame]) – Dataset to save

  • partition_keys (List[str]) – Dataset keys to partition on. Each key will be a subdirectory in self.path containing data for each value of that key. For example, partitioning on the column ‘K’ will make subdirectories ‘K=1’, ‘K=2’, ‘K=3’, etc.

  • catalog_database (str) – Datacatalog name to write to, if creating a table in the Data Catalog. Defaults to ‘default’

  • catalog_table (Optional[str]) – If present, written data will be available in AWS Data Catalog / Glue / Athena with the indicated table name.

  • format_options (Dict[str, Any]) – Additional options for the data loader. Documented here: https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-etl-format.html

type_name: str | None = 'redun.ShardedS3Dataset'#
update_hash() None#
class redun.file.Staging(local: T | str, remote: T | str)#

Bases: Value, Generic[T]

classmethod parse_arg(raw_type: type, arg: str) Any#

Parse a command line argument in a new Value.

render_stage(as_mount: bool = False) str#
render_unstage(as_mount: bool = False) str#
stage() T#
type_name: str | None = 'redun.file.Staging'#
unstage() T#
class redun.file.StagingDir(local: Dir | str, remote: Dir | str)#

Bases: Staging[Dir]

classes = <redun.file.FileClasses object>#
get_hash(data: bytes | None = None) str#

Returns a hash for the value. Users may override to provide custom behavior.

Most basic objects are covered by the default approach of hashing the binary pickle serialization.

render_stage(as_mount: bool = False) str#

Returns a shell command for staging a directory.

render_unstage(as_mount: bool = False) str#

Returns a shell command for unstaging a directory.

stage() Dir#
type_basename = 'StagingDir'#
type_name: str | None = 'redun.StagingDir'#
unstage() Dir#
class redun.file.StagingFile(local: File | str, remote: File | str)#

Bases: Staging[File]

classes = <redun.file.FileClasses object>#
get_hash(data: bytes | None = None) str#

Returns a hash for the value. Users may override to provide custom behavior.

Most basic objects are covered by the default approach of hashing the binary pickle serialization.

render_stage(as_mount: bool = False) str#

Returns a shell command for staging a file.

render_unstage(as_mount: bool = False) str#

Returns a shell command for unstaging a file.

stage() File#
type_basename = 'StagingFile'#
type_name: str | None = 'redun.StagingFile'#
unstage() File#
redun.file.copy_file(src_path: str | None, dest_path: str | None, recursive: bool = False) None#

Copy a file. Files can be from different filesystems.

Parameters:
  • src_path (Optional[str]) – Source path to copy from. If None, use stdin.

  • dest_path (Optional[str]) – Destination path to copy to. If None, use stdout.

  • recursive (bool) – If True, copy a directory tree of files.

redun.file.get_filesystem(proto: str | None = None, url: str | None = None) FileSystem#

Returns the corresponding FileSystem for a given url or protocol.

redun.file.get_filesystem_class(proto: str | None = None, url: str | None = None) Type[FileSystem]#

Returns the corresponding FileSystem class for a given url or protocol.

redun.file.get_proto(url: str | None = None) str#

Returns the protocol for a url.

For example, the protocol for ‘http://example.com’ is ‘http’. Local paths ‘/path/to/my/file’ have the protocol ‘local’.

redun.file.glob_file(pattern: str) List[str]#
redun.file.list_filesystems() List[Type[FileSystem]]#

Returns list of supported filesystems.

redun.file.open_file(url: str, mode: str = 'r', encoding: str | None = None, **kwargs: Any) IO#

Open a file stream.

Parameters:
  • url (str) – Url or path of file to open.

  • mode (str) – Stream mode for reading or writing (‘r’, ‘w’, ‘b’, ‘a’).

  • encoding (str) – Text encoding (e.g. ‘utf-8’) to use when read or writing.

  • **kwargs – Additional arguments for the underlying file stream. They are Filesystem-specific.

redun.file.register_filesystem(cls: Type[FileSystem]) Type[FileSystem]#

redun.functools module#

(task)redun.functools.apply_func(func: Callable[..., T], *args: Any, **kwargs: Any) T#

Apply a Python function to possibly lazy arguments.

The function func must be defined at the module-level.

lazy_list = a_task_returning_a_list()
lazy_length = apply_func(len)(lazy_list)
redun.functools.as_task(func: Callable[[...], T]) Task[Callable[[...], T]]#

Transform a plain Python function into a redun Task.

The function func must be defined at the module-level.

assert as_task(max)(10, 20, 30, 25) == 30
redun.functools.compose(*tasks: Task) Task#

Compose Tasks together.
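
A sketch, assuming compose applies tasks right to left like mathematical composition:

from redun import task
from redun.functools import compose

@task()
def double(x: int) -> int:
    return 2 * x

@task()
def inc(x: int) -> int:
    return x + 1

# Assuming right-to-left application: double_inc(x) == double(inc(x)).
double_inc = compose(double, inc)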

(task)redun.functools.compose_apply(tasks: Sequence[redun.task.Task], *args: Any, **kwargs: Any) Any#

Helper function for applying a composition of Tasks to arguments.

(task)redun.functools.const(x: T, _: Any) T#

Returns the first argument unchanged (constant) and discards the second.

This is useful for forcing execution of the second argument, but discarding its value from downstream parts of the workflow.

redun.functools.delay(x: T) Task[Callable[[], T]]#

Delay the evaluation of a value x.

The name delay() is inspired by other programming languages:

  • http://web.mit.edu/scheme_v9.2/doc/mit-scheme-ref/Promises.html

  • http://people.cs.aau.dk/~normark/prog3-03/html/notes/eval-order_themes-delay-stream-section.html

  • https://docs.racket-lang.org/reference/Delayed_Evaluation.html

(task)redun.functools.eval_(code: str, *args: Any, **kwargs: Any) Any#

Evaluate code with kwargs as local variables.

If a positional argument is given, its value is assigned to local variables defined by pos_args.

This can be useful for small manipulations of lazy values.

records = task1()
names = eval_("[record.name for record in records]", records=records)
result = task2(names)
(task)redun.functools.flat_map(a_task: redun.task.Task[Callable[..., List[T]]], values: List) List[T]#

Apply a task a_task on a sequence of values and flatten the result.

(task)redun.functools.flatten(list_of_lists: Sequence[Sequence[T]]) List[T]#

Flatten a list of lists into a flat list.

redun.functools.force(x: Task[Callable[[], T]]) T#

Force the evaluation of a delayed evaluation.

The name force() is inspired by other programming languages:

  • http://web.mit.edu/scheme_v9.2/doc/mit-scheme-ref/Promises.html

  • http://people.cs.aau.dk/~normark/prog3-03/html/notes/eval-order_themes-delay-stream-section.html

  • https://docs.racket-lang.org/reference/Delayed_Evaluation.html

(task)redun.functools.identity(x: T) T#

Returns its input argument.

(scheduler_task)redun.functools.map_(a_task: redun.task.Task, values: Sequence[Any]) T#

Map a task to a list of values, similar to map(f, xs).
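
A sketch:

from redun import task
from redun.functools import map_

@task()
def double(x: int) -> int:
    return 2 * x

@task()
def main() -> list:
    return map_(double, [1, 2, 3])  # lazily evaluates to [2, 4, 6]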

(scheduler_task)redun.functools.seq(exprs: Sequence[Any]) T#

Evaluate the expressions serially.

The name seq() is inspired by Haskell: https://wiki.haskell.org/Seq

redun.functools.starmap(a_task: Task[Callable[[...], T]], kwargs: List[Dict] | Expression[List[Dict]] = []) List[T]#

Map a task to a list of keyword arguments.

(task)redun.functools.zip_(*lists: List[T]) List[Tuple[T, ...]]#

Zips two or more lists into a list of tuples.

This is a task equivalent of zip().

redun.glue module#

Helper functions for glue jobs.

Glue-related imports are kept isolated, so this module can be imported on people’s local machines without issue; the functions that need a Glue context only run once that context is defined.

redun.glue.clean_datacatalog_table(database: str, table: str, remove_older_than: int = 1) None#

Removes records and files from a datacatalog table older than remove_older_than hours. Underlying data on S3 will be deleted, too.

redun.glue.get_glue_context() GlueContext#

Returns the current glue context.

redun.glue.get_num_workers() int#

Returns the number of workers in the current Spark context.

redun.glue.get_spark_context() SparkContext#

Returns the current spark context.

redun.glue.get_spark_session() SparkSession#
redun.glue.load_datacatalog_spark_dataset(database: str, table_name: str, hashfield: str | None = None) DataFrame#

Loads a dataset from the Data Catalog.

redun.glue.setup_glue_job(job_name: str, job_args: List[str]) GlueJob#
redun.glue.sql_query(dataset: DataFrame, query: str, dataset_alias: str = 'dataset') DataFrame#

Runs a SQL-style query on a Spark DataFrame.

Parameters:
  • dataset (pyspark.sql.DataFrame) – The dataset to query.

  • query (str) – SQL query string.

  • dataset_alias (str) – Name for dataset in SQL context. Defaults to “dataset”. The SQL query should reference the alias like “SELECT * FROM {dataset_alias}”
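
A sketch; this requires a Spark runtime, and people_df is an assumed pyspark.sql.DataFrame:

from redun.glue import sql_query

adults = sql_query(
    people_df,  # assumed to exist in the Spark session
    "SELECT name FROM people WHERE age >= 18",
    dataset_alias="people",
)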

redun.glue.udf(wrapped_func: Callable = None, return_type: DataType | None = None) Callable#

Creates a Spark user-defined function. Wraps pyspark.sql.functions.udf so the Spark context is only needed at runtime rather than at redun task expression time.
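
A decorator sketch, assuming the wrapped_func=None default enables parameterized decorator use:

from pyspark.sql.types import IntegerType

from redun.glue import udf

@udf(return_type=IntegerType())
def str_length(s: str) -> int:
    return len(s)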

redun.handle module#

class redun.handle.Handle(name: str | None = None, *args, namespace: str | None = None, **kwargs)#

Bases: Value

A Value that opaquely accumulates state as it passes through Tasks. This subclass of Value has deep support from the scheduler and its backend, in order to provide a foundation for encapsulating interactions with a deterministic, stateful, external system, such as a database.

This model of state supports being initialized, then opaquely transformed as Tasks are applied. The hash and serialization data are designed to be opaque to the actual data held within the handle. At initialization time, only the constructor data is captured for serialization and hashing. Upon transformation (that is, when a Handle is returned by a Task), the hash is replaced with the hash identifying the Task invocation, including both code and inputs.

See the design documents for more discussion of the principles behind the design: docs/source/design.md.

In general use, users should not alter the serialization methods.
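For example, the database example from the design docs looks roughly like this (a sketch; the sqlite3 connection details are illustrative):

import sqlite3

from redun.handle import Handle

class DbHandle(Handle):
    def __init__(self, name: str, db_file: str):
        self.instance = sqlite3.connect(db_file)

# Tasks can then accept and return DbHandle values, and redun will track
# the handle's state transitions through the call graph.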

class HandleInfo(name: str, args: Tuple, kwargs: dict, class_name: str, namespace: str | None = None, call_hash: str = '', hash: str | None = None, key: str = '')#

Bases: object

The fullname is an identification key for the handle. It is used globally across the redun backend to identify other potential instances of this Handle. However, hashes are used, as usual, to perform equality-like tests.

apply_call(handle: Handle, call_hash: str) Handle#
clone(handle: Handle) Handle#
fork(handle: Handle, key: str) Handle#
get_hash() str#

Returns hash of the handle.

Implementation note: the Handle state model requires that the computation proceeds differently for new Handles and ones that are computed by other tasks.

get_state() dict#

Returns serializable state dict.

update_hash() None#
apply_call(call_hash: str) Handle#

Returns a new Handle derived from this one assuming passage through a call with call_hash.

fork(key: str) Handle#

Forks the handle into a second one for use in parallel tasks. A key must be provided to differentiate the fork from the original (although the original may have a key).

get_hash(data: bytes | None = None) str#

Returns a hash of the handle.

is_valid() bool#

Returns True if handle is still valid (i.e., has not been rolled back).

postprocess(postprocess_args: dict) Handle#

Applies the call_hash to the handle as it returns from a task.

preprocess(preprocess_args: dict) Handle#

Forks a handle as it passes into a task.

type_name: str | None = 'redun.Handle'#
redun.handle.get_fullname(namespace: str | None, name: str) str#

Constructs a fullname from a namespace and a name.

redun.handle.get_handle_class(handle_class_name: str) Type[Handle]#

Returns a Handle class from the TypeRegistry.

redun.hashing module#

class redun.hashing.Hash(length=40)#

Bases: object

A convenience class for creating hashes.

hexdigest() str#
update(data)#
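A minimal sketch of direct use (assuming, as the signature suggests, that length controls the hex digest length):

h = Hash()
h.update(b"hello world")
digest = h.hexdigest()  # hex digest string (40 characters with the default length)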
redun.hashing.hash_arguments(type_registry: TypeRegistry, args: Sequence, kwargs: dict)#

Hash the arguments for a Task call.

redun.hashing.hash_bytes(bytes: bytes) str#

Hash a byte sequence.

redun.hashing.hash_call_node(task_hash: str, args_hash: str, result_hash: str, child_call_hashes: List[str]) str#

Calculates the call_hash for a CallNode.

redun.hashing.hash_eval(type_registry: TypeRegistry, task_hash: str, args: Sequence, kwargs: dict) Tuple[str, str]#

Hash Task evaluation and arguments.

redun.hashing.hash_kwargs(type_registry: TypeRegistry, kwargs: Dict[str, Any]) Dict[str, str]#

Hash a dict of keyword arguments.

redun.hashing.hash_positional_args(type_registry: TypeRegistry, args: Sequence) List[str]#

Hash a list of arguments.

redun.hashing.hash_stream(stream: IO, block_size: int = 1024) str#

Hash a stream of bytes.

redun.hashing.hash_struct(struct: Any) str#

Hash a structure by canonically serializing it with bencode.

redun.hashing.hash_tag(entity_id: str, key: str, value: Any, parents: List[str]) str#

Hash a CallGraph Tag.

redun.hashing.hash_tag_bytes(tag: str, bytes: bytes) str#

Hash a tag followed by a byte sequence.

redun.hashing.hash_text(text: str) str#

Returns the hash for a string.

redun.job_array module#

class redun.job_array.JobArrayer(submit_jobs: Callable[[List[Job]], None], on_error: Callable[[Exception], None], submit_interval: float, stale_time: float, min_array_size: int, max_array_size: int = 10000)#

Bases: object

A ‘sink’ for submitted jobs that detects when jobs can be submitted as an array job. Eligible jobs will have the same task, task options, and resource requirements, but can differ in the args and kwargs passed to the task.

The method relies on the “staleness” of submitted jobs in the ‘sink’. Eligible jobs are grouped into a pool (really a dict). A group that hasn’t had new jobs added for stale_time seconds will be submitted.

Parameters:
  • submit_jobs (Callable[[List[Job]], None]) – Callback called when a group of jobs are ready to be submitted as an array.

  • on_error (Callable[[Exception], None]) – Callback called when there is a top-level error.

  • submit_interval (float) – How frequently the monitor thread will check for submittable stale jobs. Should be less than stale_time, ideally.

  • stale_time (float) – Job groupings that haven’t had new jobs added for this many seconds will be submitted.

  • min_array_size (int) – Minimum number of jobs in a group to be submitted as an array job instead of individual jobs. Can be anywhere from 2 to MAX_ARRAY_SIZE-1.

  • max_array_size (int) – Maximum number of jobs that can be submitted as an array job. Must be in (min_array_size, MAX_ARRAY_SIZE].
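
A construction sketch (the two callbacks are hypothetical stand-ins):

def submit_array(jobs):
    ...  # submit the grouped jobs as a single array job

def report_error(error):
    ...

arrayer = JobArrayer(
    submit_jobs=submit_array,
    on_error=report_error,
    submit_interval=1.0,
    stale_time=3.0,
    min_array_size=5,
)
arrayer.start()
# arrayer.add_job(job) is then called as jobs arrive; groups that go
# stale are flushed through submit_array.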

add_job(job: Job) None#

Adds a new job.

get_stale_descrs() List[JobDescription]#

Returns descriptions for job groups that haven’t had new jobs added recently (i.e., are stale) and are ready to be submitted.

start() None#
stop() None#
submit_pending_jobs(descr: JobDescription) None#
class redun.job_array.JobDescription(job: Job)#

Bases: object

redun.job_array.get_job_array_index(env: dict = os.environ) int | None#

Finds any Job Array Index environment variables and returns the index.

redun.logging module#

redun.namespace module#

redun.namespace.compute_namespace(obj: Any, namespace: str | None = None)#

Compute the namespace for the provided object.

Precedence:

  • An explicitly provided namespace (note: an empty string is a valid explicit value)

  • A redun_namespace variable in the same module as the provided object

  • The current namespace, configured with set_current_namespace

WARNING: This computation interacts with the global “current namespace” (see above), so results may be context dependent.
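
For example, a module-level redun_namespace variable takes effect when no explicit namespace is given (a sketch; the namespace string is illustrative):

redun_namespace = "acme.pipeline"

@task
def my_task():
    ...

# my_task.fullname == "acme.pipeline.my_task"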

redun.namespace.get_current_namespace()#

Returns the current task namespace during import.

redun.namespace.namespace(_namespace=None)#

Set the current task namespace.

Use None to unset the current namespace.

redun.promise module#

class redun.promise.Promise(func: Callable | None = None)#

Bases: Generic[T]

A light-weight single-thread implementation of Promise.

We specifically are careful to remove references as soon as possible to enable garbage collection.

A Promise can only be resolved or rejected once. Callbacks will be called at most once. Callbacks can be added both before and after the resolution.
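
A minimal sketch of the callback flow:

p = Promise()
p.then(lambda result: print("got", result))
p.do_resolve(10)  # the registered callback fires, printing "got 10"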

classmethod all(subpromises: Sequence[Promise[T]]) Promise[List[T]]#

Return a promise that waits for all subpromises to resolve.

catch(rejector: Callable[[Exception], S]) Promise[S]#

Register an error callback.

do_reject(error: Exception) Exception#

Reject the promise with an error.

do_resolve(result: T) T#

Resolve the promise to a result.

property error: Exception#

Returns final error.

then(resolver: Callable[[T], S] | None = None, rejector: Callable[[Exception], S] | None = None) Promise[S]#

Register callbacks to the promise.

property value: T#

Returns final result or error.

redun.promise.wait_promises(subpromises: List[Promise[T]]) Promise[List[Promise[T]]]#

Wait for all promises to finish (either fulfill or reject).

redun.pytest module#

redun.scheduler module#

exception redun.scheduler.DryRunResult#

Bases: Exception

class redun.scheduler.ErrorValue(error: Exception, traceback: Traceback | None = None)#

Bases: Value

Value for wrapping Exceptions raised by Task.

get_hash(data: bytes | None = None) str#

Returns a hash for the value. Users may override to provide custom behavior.

Most basic objects are covered by the default approach of hashing the binary pickle serialization.

type_name: str | None = 'redun.ErrorValue'#
class redun.scheduler.Execution(id: str | None = None, context: dict | None = None)#

Bases: object

An Execution tracks the execution of a workflow of redun.task.Task objects.

add_job(job: Job) None#
class redun.scheduler.Frame(filename: str, lineno: int, name: str, locals: Dict[str, Any], lookup_line: bool = True, line: str | None = None, job: Job | None = None)#

Bases: FrameSummary, Value

Frame of a Traceback for Job failure.

filename#
lineno#
locals#
name#
type_name: str | None = 'redun.Frame'#
class redun.scheduler.Job(task: Task, expr: TaskExpression, id: str | None = None, parent_job: Job | None = None, execution: Execution | None = None, options: dict | None = None)#

Bases: object

A Job tracks the execution of a redun.task.Task through its various stages.

STATUSES = ['PENDING', 'RUNNING', 'FAILED', 'CACHED', 'DONE', 'TOTAL']#
add_parent(parent_job: Job) None#

Maintains the Job tree by connecting the job with a parent job.

clear()#

Free execution state from Job.

collapse(other_job: Job) None#

Collapse this Job into other_job.

This method is used when equivalent Jobs are detected, and we want to perform Common Subexpression Elimination (CSE).

get_context() dict#

Returns the context variables for the Job.

get_limits() Dict[str, int]#

Returns resource limits required for this job to run.

get_option(key: str, default: Any | None = None, as_type: Type | None = None) Any#

Returns a task option associated with a Job.

Precedence is given to task options defined at call-time (e.g. task.options(option=foo)(arg1, arg2)) over task definition-time (e.g. @task(option=foo)).

get_options() dict#

Returns task options for this job.

Precedence is given to task options defined at call-time (e.g. task.options(option=foo)(arg1, arg2)) over task definition-time (e.g. @task(option=foo)).

get_raw_option(key: str, default: Any | None = None, as_type: Type | None = None) Any#

Returns an unevaluated task option associated with a Job.

Precedence is given to task options defined at call-time (e.g. task.options(option=foo)(arg1, arg2)) over task definition-time (e.g. @task(option=foo)).

get_raw_options() dict#

Returns the unevaluated task options for this job.

Precedence is given to task options defined at call-time (e.g. task.options(option=foo)(arg1, arg2)) over task definition-time (e.g. @task(option=foo)).

reject(error: Any) None#

Rejects a Job with an error exception.

resolve(result: Any) None#

Resolves a Job with a final concrete value, result.

property status: str#
class redun.scheduler.JobEnv(job: Job, context: dict)#

Bases: Job

Job environment.

Customize a Job with a specific context dict.

get_context() dict#

Returns the context variables for the Job.

exception redun.scheduler.NoCurrentScheduler#

Bases: Exception

class redun.scheduler.Scheduler(config: Config | None = None, backend: RedunBackend | None = None, executor: Executor | None = None, logger: Any | None = None, use_task_traceback: bool = True, job_status_interval: int | None = None, migrate: bool | None = None)#

Bases: object

Scheduler for evaluating redun tasks.

A thread may only have a single running scheduler at a time, and a scheduler can perform exactly one execution at a time. While running, the scheduler will register itself into a thread-local variable, see get_current_scheduler and set_current_scheduler.

A scheduler may be reused for multiple executions, and makes every effort to be stateless between executions. That is, the scheduler clears its internal state before starting a new execution, providing isolation from any others we may have performed.

Although the scheduler collaborates with executors that may use multiple threads during execution, the scheduler logic relies upon being executed from a single thread. Therefore, the main lifecycle methods used by executors defer back to the scheduler thread. The scheduler is implemented around an event loop and asynchronous callbacks, allowing it to coordinate many in-flight jobs in parallel.

Scheduler tasks are generally considered “friends” of the scheduler and may need to access some private methods.
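
A minimal sketch of direct use (most users instead go through the redun CLI):

from redun import Scheduler, task

@task
def add(a: int, b: int) -> int:
    return a + b

scheduler = Scheduler()
scheduler.load()  # initialize the backend (see load() below)
result = scheduler.run(add(1, 2))  # blocks until the result (3) is ready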

add_executor(executor: Executor) None#

Add executor to scheduler.

add_job_tags(job: Job, tags: List[Tuple[str, Any]]) None#

Callback for adding job tags during job execution.

clear()#

Release resources

done_job(job: Job, result: Any, job_tags: List[Tuple[str, Any]] = []) None#

Mark a Job as successfully done with a result.

A primary Executor lifecycle method, hence is thread safe.

evaluate(expr: Any, parent_job: Job | None = None) Promise#

Begin an asynchronous evaluation of an expression (concrete value or Expression). Assumes this scheduler is currently running.

This method is not a typical entry point for users; however, it is often used by SchedulerTasks to trigger further computations.

Returns a Promise that will resolve when the evaluation is complete.

extend_run(expr: Expression[Result] | Result, parent_job_id: str, dryrun: bool = False, cache: bool = True, context: dict = {}) dict#

Extend an existing scheduler execution (run) to evaluate a Task or Expression.

This is an alternative to the run method, and acts as a primary user entry point.

get_job_status_report() List[str]#
property is_running: bool#
load(migrate: bool | None = None) None#
log(*messages: Any, indent: int = 0, multiline: bool = False, level: int = 20) None#
log_job_statuses() None#

Display Job statuses.

reject_job(job: Job | None, error: Any, error_traceback: Traceback | None = None, job_tags: List[Tuple[str, Any]] = []) None#

Reject a Job that has failed with an error.

A primary Executor lifecycle method, hence is thread safe.

run(expr: Expression[Result], exec_argv: List[str] | None = None, dryrun: bool = False, cache: bool = True, tags: Iterable[Tuple[str, Any]] = (), context: dict = {}, execution_id: str | None = None) Result#
run(expr: Result, exec_argv: List[str] | None = None, dryrun: bool = False, cache: bool = True, tags: Iterable[Tuple[str, Any]] = (), context: dict = {}, execution_id: str | None = None) Result

Run the scheduler to evaluate an Expression expr.

This is the primary user entry point to the scheduler. The scheduler will register itself as this thread’s running scheduler for the duration of the execution. This function blocks, running the event loop until the result is ready and can be returned.

The scheduler can only run one execution at a time.

set_cache(eval_hash: str, task_hash: str, args_hash: str, value: Any) None#

Set the cache for an evaluation (eval_hash) with result value.

This method should only be used by the scheduler or SchedulerTasks.

exception redun.scheduler.SchedulerError#

Bases: Exception

class redun.scheduler.Traceback(error: Any, frames: List[FrameSummary], logs: List[str] | None = None)#

Bases: Value

Traceback for Job failure.

format() Iterator[str]#

Iterates through lines displaying the Traceback.

classmethod from_error(error: Exception, trim_frames=True) Traceback#

Returns a new Traceback derived from an Exception.

Parameters:
  • error (Exception) – Exception object from which to derive a Traceback.

  • trim_frames (bool) – If True, do not include redun scheduler related frames in Traceback.

classmethod trim_frames(frames: List[FrameSummary]) List[FrameSummary]#
type_name: str | None = 'redun.Traceback'#
(task)redun.scheduler._subrun_root_task(expr: redun.expression.QuotedExpression, config: Dict[str, Dict], config_dir: str | None, load_modules: List[str], run_config: Dict[str, Any]) Any#

Launches a sub-scheduler and runs the provided expression by first “unwrapping” it. The evaluated result is returned within a dict alongside other sub-scheduler-related state to the caller.

This task is limited to execution cache scope because it is difficult for the caller to be reactive to the details of the subrun. When retrieving a cache value for this task, the parent scheduler only sees the final values, not the whole call tree, so it can only perform shallow checking. Selecting execution scope means that validity checking is not required. We also set shallow checking as a reminder of how it behaves.

In the case that there is no execution-scope cache hit, then the sub-scheduler has to be started. However, the sub-scheduler is free to perform any cache logic it desires. It has the complete context to be fully reactive, or not, as configured. So, if there is a backend-scope cache hit available, the sub-scheduler may return almost instantly.

The parent scheduler may or may not have access to the code, which also makes full reactivity difficult. It is a mild assumption that the code does not change during an execution.

There is one particular case we have not implemented, where the parent scheduler could bypass the need to start the sub-scheduler. If 1) the user only wants shallow validity checking and 2) the parent scheduler has access to all the code (to compute task hashes), then the parent scheduler could correctly identify a backend-scope cache hit using information stored from the prior call node. This is left to future work.

Parameters:
  • expr (QuotedExpression) – QuotedExpression to be run by sub-scheduler.

  • config – A two-level python dict to configure the sub-scheduler. (Will be used to initialize a Config object via the config_dict kwarg.) Used if a config_dir is not provided.

  • config_dir (Optional[str]) – If supplied, the config at this path is loaded and used instead of the config argument.

  • load_modules – List of modules that must be imported before starting the sub-scheduler. Before launching the sub-scheduler, the modules that define the user tasks must be imported.

  • run_config – Sub-scheduler run() kwargs. These are typically the local-scheduler run() kwargs that should also apply to the sub-scheduler such as dryrun and cache settings.

Returns:

Returns a dict result from running the sub-scheduler on expr. The dict contains extra information pertaining to the sub-scheduler as follows:

{
    'result': sub-scheduler evaluation of expr, present if successful,
    'error': exception raised by the sub-scheduler, present if an error was encountered,
    'dryrun': present if a dryrun was requested and the workflow could not complete,
    'job_id': Job id of the root Job in the sub-scheduler, present if parent_job_id was given in run_config,
    'call_hash': call_hash of the root CallNode in the sub-scheduler, present if parent_job_id was given in run_config,
    'status': sub-scheduler final job status poll (list of str),
    'config': sub-scheduler config dict (useful for confirming exactly which config settings were used),
    'run_config': a dict of run configuration used by the sub-scheduler,
}

If parent_job_id is not present in run_config, only results are returned and errors are raised.

Return type:

Dict[str, Any]

(scheduler_task)redun.scheduler.apply_tags(value: Any, tags: List[Tuple[str, Any]] = [], job_tags: List[Tuple[str, Any]] = [], execution_tags: List[Tuple[str, Any]] = []) T#

Apply tags to a value, job, or execution.

Returns the original value.

@task
def task1():
    x = 10
    # Apply tags on the value 10.
    return apply_tags(x, [("env", "prod"), ("project", "acme")])

@task
def task2():
    x = 10
    # Apply tags on the current job.
    return apply_tags(x, job_tags=[("env", "prod"), ("project", "acme")])

@task
def task3():
    x = 10
    # Apply tags on the current execution.
    return apply_tags(x, execution_tags=[("env", "prod"), ("project", "acme")])
(scheduler_task)redun.scheduler.catch(expr: Any, *catch_args: Any) T#

Catch exceptions error of class error_class from expr and evaluate recover(error).

The following example catches the ZeroDivisionError raised by divider(0) and returns 0.0 instead.

@task()
def divider(denom):
    return 1.0 / denom

@task()
def recover(error):
    return 0.0

@task()
def main():
    return catch(
        divider(0),
        ZeroDivisionError,
        recover
    )

This is equivalent to the regular python code:

def divider(denom):
    return 1.0 / denom

def main():
    try:
        return divider(0)
    except ZeroDivisionError as error:
        return 0.0

catch() can also handle multiple Exception classes as well as multiple recover expressions.

@task()
def main():
    return catch(
        task1(),
        (ZeroDivisionError, ValueError),
        recover1,
        KeyError,
        recover2,
    )

which is equivalent to the regular python code:

try:
    return task1()
except (ZeroDivisionError, ValueError) as error:
    return recover1(error)
except KeyError as error:
    return recover2(error)

Implementation/behavior note: Usually, we don’t cache errors, since this is rarely productive. Also recall that we typically cache each round of evaluation separately, allowing the scheduler to retrace the call graph and detect if any subtasks have changed (i.e. hash change) or if any Values in the subworkflow are now invalid (e.g. File hash has changed). To correctly cache a caught expression, we need to cache both the fact that expr resolved to an Exception, then that the recover_expr produced a non-error result. However, we only want to cache the exception if it is successfully handled, so if the recovery re-raises the exception or issues another error, the error should not be cached.

In order to create this behavior, we have to implement custom caching behavior that delays caching the error until we see that it has been successfully handled.

Parameters:
  • expr (Expression) – Main expression to evaluate.

  • error_recover_pairs – A list of alternating error_class and recover Tasks.

(scheduler_task)redun.scheduler.catch_all(exprs: T, error_class: NoneType | Exception | Tuple[Exception, ...] = None, recover: redun.task.Task[Callable[..., S]] | None = None) T#

Catch all exceptions that occur in the nested value exprs.

This task works similarly to catch, except it takes multiple expressions within a nested value (e.g. a list, set, dict, etc). Any exceptions raised by the expressions are caught and processed only after all expressions succeed or fail. For example, this can be useful in large fanouts where the user wants to avoid one small error stopping the whole workflow immediately. Consider the following code:

@task
def divider(denom):
    return 1.0 / denom

@task
def recover(values):
    nerrors = sum(1 for value in values if isinstance(value, Exception))
    raise Exception(f"{nerrors} error(s) occurred.")

@task
def main():
    return catch_all([divider(1), divider(0), divider(2)], ZeroDivisionError, recover)

Each of the expressions in the list [divider(1), divider(0), divider(2)] will be allowed to finish before the exceptions are processed by recover. If there are no exceptions, the evaluated list is returned. If there are any exceptions, the list is passed to recover, where it will contain each successful result or Exception. recover then has the opportunity to process all errors together. In the example above, the total number of errors is reraised.

Parameters:
  • exprs – A nested value (list, set, dict, tuple, NamedTuple) of expressions to evaluate.

  • error_class (Union[Exception, Tuple[Exception, ...]]) – An Exception or Exceptions to catch.

  • recover (Task) – A task to call if any errors occur. It will be called with the nested value containing both successful results and errors.

(scheduler_task)redun.scheduler.cond(cond_expr: Any, then_expr: Any, *rest: Any) T#

Conditionally execute expressions, i.e. a lazy if-statement.

@task()
def is_even(x):
    return x % 2 == 0

@task()
def task1():
    # ...

@task()
def task2():
    # ...

@task()
def main(x: int):
    return cond(
        is_even(x),
        task1(),
        task2()
    )
redun.scheduler.format_arg(arg_name: str, value: Any, max_length: int = 200) str#

Format a Task argument into a string.

redun.scheduler.format_job_statuses(job_status_counts: Dict[str, Dict[str, int]], timestamp: datetime | None = None) Iterator[str]#

Format job status table (Dict[task_name, Dict[status, count]]).

redun.scheduler.format_task_call(task: Task, args: Tuple, kwargs: dict) str#

Format a Task call into a string.

my_task(arg1=10, my_file=File('path/to/file.txt'))

redun.scheduler.get_arg_defaults(task: Task, args: Tuple, kwargs: dict) dict#

Get default arguments from Task signature.

redun.scheduler.get_backend_from_config(backend_config: SectionProxy | None = None) RedunBackend#

Parses a redun.backends.base.RedunBackend from a config object.

redun.scheduler.get_current_scheduler(required=True) Scheduler | None#

Returns the currently running Scheduler for this thread.

redun.scheduler.get_ignore_warnings_from_config(scheduler_config: dict | SectionProxy | Config) Set[str]#

Parses ignore warnings from config.

redun.scheduler.get_limits_from_config(limits_config: dict | SectionProxy | Config | None = None) Dict[str, int]#

Parses resource limits from a config object.

redun.scheduler.is_debugger_active() bool#

Returns True if debugger REPL is currently active.

(scheduler_task)redun.scheduler.merge_handles(handles_expr: List[redun.handle.Handle]) T#

Merge multiple handles into one.

redun.scheduler.needs_root_task(task_registry: TaskRegistry, expr: Any) bool#

Returns True if expression expr needs to be wrapped in a root task.

(task)redun.scheduler.root_task(expr: redun.expression.QuotedExpression[Result]) Result#

Default task used for a root job in an Execution.

redun.scheduler.set_arg_defaults(task: Task, args: Tuple, kwargs: dict) Tuple[Tuple, dict]#

Set default arguments from Task signature.

redun.scheduler.set_current_scheduler(scheduler: Scheduler | None) None#

Sets the current running Scheduler for this thread.

redun.scheduler.settrace_patch(tracefunc: Any) None#

Monkey patch for recording whether debugger REPL is active or not.

sys.settrace() is called by debuggers such as pdb whenever the debugger REPL is activated. We use a monkey patch in order to record tracing process-wide. Relying on sys.gettrace() is not enough because it is thread-specific.

(scheduler_task)redun.scheduler.subrun(expr: Any, executor: str, config: Dict[str, Any] | None = None, config_dir: str | None = None, new_execution: bool = False, load_modules: List[str] | None = None, **task_options: dict) T#

Evaluates an expression expr in a sub-scheduler running within Executor executor.

executor and optional task_options are used to configure the special redun task (_subrun_root_task) that starts the sub-scheduler. For example, you can configure the task with a batch executor to run the sub-scheduler on AWS Batch.

config: To ease configuration management of the sub-scheduler, you can either pass a config dict which contains configuration that would otherwise require a redun.ini file in the sub-scheduler environment, or you can provide a config_dir to be loaded when the subrun starts. If you do not pass either, the local scheduler’s config will be forwarded to the sub-scheduler (replacing the local config_dir with “.”). In practice, the sub-scheduler’s config_dir should be less important as you probably want to log both local and sub-scheduler call graphs to a common database. You can also obtain a copy of the local scheduler’s config and customize it as needed. Instantiate the scheduler directly instead of calling redun run. Then access its config via scheduler.py::get_scheduler_config_dict().

Note on code packaging: The user is responsible for ensuring that the chosen Executor for invoking the sub-scheduler copies over all user-task scripts. E.g. the local scheduler may be launched on local tasks defined in workflow1.py but subrun(executor="batch") is invoked on taskX defined in workflow2.py. In this case, the user must ensure workflow2.py is copied to the batch node by placing it within the same directory tree as workflow1.py.

The cache behavior of subrun is customized. Since recursive reductions are supposed to happen inside the subrun, we can’t accept a single reduction. If the user wants full validity checking, that requires using single reductions, which means we have to actually start the sub-scheduler and let it handle the caching logic. In contrast, if a CSE or ultimate reduction is available, then we won’t need to start the sub-scheduler.

Most scheduler tasks don’t accept the cache options cache_scope and check_valid, but subrun does, so we can forward them to the underlying Task that implements the subrun. Note that we also set the default check_valid value to be CacheCheckValid.SHALLOW, unlike its usual value.

Parameters:
  • expr (Any) – Expression to be run by sub-scheduler.

  • executor (str) – Executor name for the special redun task that launches the sub-scheduler (_subrun_root_task). E.g. batch to launch the sub-scheduler in AWS Batch. Note that this is a config key in the local scheduler’s config.

  • config (dict) – Optional sub-scheduler config dict. Must be a two-level dict that can be used to initialize a Config object (see Config.get_config_dict()). If None or empty, the local Scheduler’s config will be passed to the sub-scheduler and any values with local config_dir will be replaced with “.”. Do not include database credentials as they will be logged as clear text in the call graph.

  • config_dir (str) – Optional path to load a config from. Must be available in the execution context of the subrun.

  • new_execution (bool) – If True, record the provenance of the evaluation of expr as a new Execution, otherwise extend the current Execution with new Jobs.

  • load_modules (Optional[List[str]]) – If provided, an explicit list of the modules to load to prepare the subrun. If not supplied, the load_module for every task in the task registry will be included.

  • **task_options (Any) – Task options for _subrun_root_task. E.g. when running the sub-scheduler via Batch executor, you can pass ECS configuration (e.g. memory, vcpus, batch_tags).

Returns:

Returns a dict result of evaluating the _subrun_root_task

Return type:

Dict[str, Any]

(task)redun.scheduler.throw(error: Exception) None#

Raises an exception.

This task is useful in cases where raising needs to be done lazily.
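
For example, paired with cond() (a sketch; is_negative and process are hypothetical tasks):

@task
def main(x: int):
    # The error is raised lazily, only if the cond branch is taken.
    return cond(
        is_negative(x),
        throw(ValueError("x must be non-negative")),
        process(x),
    )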

redun.scheduler_config module#

redun.scheduler_config.get_abs_config_dir(config_dir: str, cwd: str | None = None) str#

Returns absolute path to config_dir.

If config_dir is a relative path, it is assumed to be relative to cwd. cwd defaults to the current working directory.

redun.scheduler_config.get_abs_db_uri(db_uri: str, config_dir: str, cwd: str | None = None) str#

Returns DB_URI with absolute path.

If db_uri is a relative path, it is assumed to be relative to config_dir. config_dir itself may be relative to the current working directory (cwd).

redun.scheduler_config.get_abs_url(uri: str, base: str) str#

Returns URI with absolute path.

redun.scheduler_config.postprocess_config(config: Config, config_dir: str) Config#

Postprocess config.

Parameters:
  • config (Config) – The body of the config to update. Mutated by this operation.

  • config_dir (str) – The location the config came from, for the purposes of recording into the config.

redun.scripting module#

exception redun.scripting.ScriptError(stderr: bytes)#

Bases: Exception

Error raised when user script returns failure (non-zero exit code).

(task)redun.scripting._script(command: str, inputs: Any, outputs: Any, task_options: dict = {}, temp_path: str | None = None) Any#

Internal task for executing a script.

This task correctly implements reactivity to changing inputs and outputs. script_task() alone is unable to implement such reactivity because its only argument is a shell script string and its output is the stdout. Thus, the ultimate input and output files of the script are accessed outside the usual redun detection mechanisms (task arguments and return values).

To achieve the correct reactivity, script_task() is special-cased in the Scheduler to not use caching, in order to force it to always execute when called. Additionally, _script() is configured with check_valid=”shallow” to skip execution of its child tasks, script_task() and postprocess_script(), if its previous outputs are still valid (i.e. not altered or deleted).

redun.scripting.exec_script(command: str) bytes#

Run a script as a subprocess.

redun.scripting.get_command_eof(command: str, eof_prefix: str = 'EOF') str#

Determine a safe end-of-file keyword to use for a given command to wrap.

redun.scripting.get_task_command(task: Task, args: Tuple, kwargs: dict) str#

Get command from a script task.

redun.scripting.get_wrapped_command(command: str, eof_prefix: str = 'EOF') str#

Returns a shell script for executing a script written in any language.

Consider command written in python:

'''
#!/usr/bin/env python

print('Hello, World')
'''

In order to turn this into a regular sh shell script, we need to write this command to a temporary file, mark the file as executable, execute the file, and remove the temporary file.

(task)redun.scripting.postprocess_script(result: Any, outputs: Any, temp_path: str | None = None) Any#

Postprocess the results of a script task.

redun.scripting.prepare_command(command: str, default_shell='#!/usr/bin/env bash\nset -exo pipefail') str#

Prepare a command string for execution by removing surrounding blank lines and dedenting.

Also, if an interpreter is not specified, the default shell is added as the interpreter.

redun.scripting.script(command: str | ~typing.List, inputs: ~typing.Any = [], outputs: ~typing.Any = <object object>, tempdir: bool = False, as_mount: bool = False, **task_options: ~typing.Any) Any#

Execute a shell script as a redun task with file staging.

See the docs for a full explanation:

https://insitro.github.io/redun/design.html#file-staging

Parameters:
  • command (Union[str, List]) – Command string or argv list to execute.

  • inputs (Any) – Collection of FileStaging objects used to stage cloud input files to local files.

  • outputs (Any) – Collection of FileStaging objects used to unstage local output files back to cloud storage.

  • tempdir (bool) – If True, run the command within a temporary directory.

  • as_mount (bool) – If True, make use of cloud storage mounting (if available) to stage files.

  • **task_options (Any) – Options to configure the Executor, such as vcpus=2 or memory=3.

Returns:

A result the same shape as outputs but with all FileStaging objects converted to their corresponding remote Files.

Return type:

Any
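
A sketch of staging a cloud input and unstaging an output (the paths and the bwa command are illustrative assumptions):

from redun import File, script, task

@task
def align_reads(fastq: File) -> File:
    return script(
        "bwa mem ref.fa reads.fastq > output.bam",
        # Stage the cloud input to a local file before the command runs.
        inputs=[fastq.stage("reads.fastq")],
        # Unstage the local output back to cloud storage afterwards.
        outputs=File("s3://my-bucket/output.bam").stage("output.bam"),
    )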

(task)redun.scripting.script_task(command: str) str#

Execute a shell script as redun Task.

redun.sphinx module#

Sphinx documentation plugin used to document redun tasks.

Introduction#

Usage#

Add the extension to your docs/conf.py configuration module:

extensions = (...,
              'redun.sphinx')

If you’d like to change the prefix for tasks in reference documentation then you can change the redun_task_prefix configuration value:

redun_task_prefix = '(task)'  # < default

With the extension installed, autodoc will automatically find task-decorated objects (e.g. when using the automodule directive) and generate the correct documentation (as well as add a (task) prefix), and you can also refer to the tasks using :task:`proj.tasks.add` syntax.

Alternatively, use .. autotask:: to manually document a task.

class redun.sphinx.SchedulerTaskDirective(name, arguments, options, content, lineno, content_offset, block_text, state, state_machine)#

Bases: PyFunction

Sphinx scheduler_task directive.

get_signature_prefix(sig: str) List[Node]#

May return a prefix to put before the object name in the signature.

class redun.sphinx.SchedulerTaskDocumenter(directive: DocumenterBridge, name: str, indent: str = '')#

Bases: TaskDocumenter

Document redun scheduler_task definitions.

classmethod can_document_member(member, membername, isattr, parent) bool#

Called to see if a member can be documented by this Documenter.

format_args(**kwargs: Any) str#

Returns a string documenting the task’s parameters.

member_order = 11#

order if autodoc_member_order is set to ‘groupwise’

objtype = 'scheduler_task'#

name by which the directive is called (auto…) and the default generated directive name

class redun.sphinx.TaskDirective(name, arguments, options, content, lineno, content_offset, block_text, state, state_machine)#

Bases: PyFunction

Sphinx task directive.

get_signature_prefix(sig: str) List[Node]#

May return a prefix to put before the object name in the signature.

class redun.sphinx.TaskDocumenter(directive: DocumenterBridge, name: str, indent: str = '')#

Bases: FunctionDocumenter

Document redun task definitions.

classmethod can_document_member(member, membername, isattr, parent) bool#

Called to see if a member can be documented by this Documenter.

check_module() bool#

Check if self.object is really defined in the module given by self.modname.

document_members(all_members=False)#

Generate reST for member documentation.

If all_members is True, document all members, else those given by self.options.members.

format_args(**kwargs: Any) str#

Returns a string documenting the task’s parameters.

get_doc(ignore: int | None = None) List[List[str]]#

Returns the formatted docstring for the task.

member_order = 11#

order if autodoc_member_order is set to ‘groupwise’

objtype = 'task'#

name by which the directive is called (auto…) and the default generated directive name

redun.sphinx.autodoc_skip_member_handler(app, what, name, obj, skip, options) bool | None#

Handler for autodoc-skip-member event.

redun.sphinx.setup(app)#

Setup Sphinx extension.

redun.tags module#

redun.tags.format_tag_key_value(key: str, value: Any, max_length: int = 50) str#

Format a tag key value pair.

redun.tags.format_tag_value(value: Any) str#

Format a tag value.

redun.tags.parse_tag_key_value(key_value: str, value_required=True) Tuple[str, Any]#

Parse a tag key=value pair from cli arguments.

redun.tags.parse_tag_value(value_str: str) Any#

Parse a tag value from the cli arguments.

redun.tags.str2literal(value_str: str) bool | None#

Parse a JSON literal.

redun.task module#

class redun.task.CacheCheckValid(value)#

Bases: Enum

Types of validity checking for cache hits.

FULL = 'full'#
SHALLOW = 'shallow'#
class redun.task.CacheResult(value)#

Bases: Enum

Types of cache hits and misses.

CSE = 'CSE'#
MISS = 'MISS'#
SINGLE = 'SINGLE'#
ULTIMATE = 'ULTIMATE'#
class redun.task.CacheScope(value)#

Bases: Enum

Scopes from which cached results may be reused: none, the current execution (CSE), or the full backend.

BACKEND = 'BACKEND'#
CSE = 'CSE'#
NONE = 'NONE'#
class redun.task.PartialTask(task: Task[Func2], args: tuple, kwargs: dict)#

Bases: Task[Func], Generic[Func, Func2]

A Task with only some arguments partially applied.

The type of this class is parameterized by Func and Func2, where Func2 is the type of the original function and Func is the type of the partially applied function. They should match on their return types.

is_valid() bool#

Returns True if the Task Value is still valid (task hash matches registry).

Tasks are first-class Values in redun. They can be cached and fetched in future executions. When fetching a Task from the cache, the cached hash might no longer exist in the code base (registered tasks).

options(**task_options_update: Any) Task[Func]#

Returns a new Task with task_option overrides.

partial(*args, **kwargs) PartialTask[Callable[[...], Result], Callable[[...], Result]]#

Partially apply some arguments to the Task.

type_name: str | None = 'redun.PartialTask'#
class redun.task.SchedulerTask(func: Callable, name: str | None = None, namespace: str | None = None, version: str | None = None, compat: List[str] | None = None, script: bool = False, task_options_base: dict | None = None, task_options_override: dict | None = None, hash_includes: list | None = None, source: str | None = None)#

Bases: Task[Func]

A Task that executes within the scheduler to allow custom evaluation.

type_name: str | None = 'redun.task.SchedulerTask'#
class redun.task.Task(func: Callable, name: str | None = None, namespace: str | None = None, version: str | None = None, compat: List[str] | None = None, script: bool = False, task_options_base: dict | None = None, task_options_override: dict | None = None, hash_includes: list | None = None, source: str | None = None)#

Bases: Value, Generic[Func]

A redun Task.

Tasks are the basic unit of execution in redun. Tasks are often defined using the redun.task.task() decorator:

@task()
def my_task(x: int) -> int:
    return x + 1

Similar to pickling of functions, Tasks specify the work to execute by reference, not by value. This is important for serialization and caching, since a task is always reattached to the latest implementation and task options just-in-time.

Task options may give direction to the scheduler about the execution environment, but must not alter the result of the function, since changing options will not trigger cache invalidation and we are allowed to return cached results with differing options. These can be provided at two times: 1) at module-load time by decorator options, 2) at run-time by creating task expressions. The former are not serialized, so they are attached to the latest value in the source code. The latter must be serialized, since they are not available except at run-time.

Tasks are routinely serialized and deserialized and are cached based on their serialization. This relies on the serialized format being as minimal as possible, while providing the needed reference to the code to run.

If needed, extra hashable data may be provided to trigger re-evaluation/cache-invalidation.

T = ~T#
property fullname: str#

Returns the fullname of a Task: ‘{namespace}.{name}’

get_hash(data: bytes | None = None) str#

Returns the Task hash.

get_task_option(option_name: str) Any | None#
get_task_option(option_name: str, default: T) T

Fetch the requested option, preferring run-time updates over options from task construction. Like the dictionary get method, returns the default rather than raising a KeyError on missing keys.

get_task_options() dict#

Merge and return the task options.

has_task_option(option_name: str) bool#

Return True if the task has an option with name option_name.

is_valid() bool#

Returns True if the Task Value is still valid (task hash matches registry).

Tasks are first-class Values in redun. They can be cached and fetched in future executions. When fetching a Task from the cache, the cached hash might no longer exist in the code base (registered tasks).

property load_module: str#
property nout: int | None#

Determines nout from task options and return type.

The precedence is:

  • the task option

  • the function return type

options(**task_options_update: Any) Task[Func]#

Returns a new Task with task_option overrides.

partial(*args, **kwargs) PartialTask[Callable[[...], Result], Callable[[...], Result]]#

Partially apply some arguments to the Task.
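
For example (a sketch):

@task
def add(a: int, b: int) -> int:
    return a + b

inc = add.partial(b=1)

@task
def main():
    return inc(10)  # evaluates to 11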

recompute_hash()#
property signature: Signature#

Signature of the function wrapped by the task.

type_name: str | None = 'redun.Task'#
update_context(context: dict = {}, **kwargs: Any) Task[Func]#

Update the context variables for the task.

class redun.task.TaskRegistry#

Bases: object

A registry of currently registered Tasks. The @task() decorator registers tasks to the current registry.

add(task: Task) None#
get(task_name: str | None = None, hash: str | None = None) Task | None#
rename(old_name: str, new_namespace: str, new_name: str) Task#
redun.task.get_task_registry()#

Returns the global task registry.

redun.task.get_tuple_type_length(tuple_type: Any) int | None#

Returns the length of a tuple type if inferable.

redun.task.hash_args_eval(type_registry: TypeRegistry, task: Task, args: tuple, kwargs: dict) Tuple[str, str]#

Compute eval_hash and args_hash for a task call, filtering out config args before hashing.

redun.task.scheduler_task(name: str | None = None, namespace: str | None = None, version: str = '1', **task_options_base: Any) Callable[[Callable[[...], Promise[Result]]], SchedulerTask[Callable[[...], Result]]]#

Decorator to register a function as a scheduler task.

Unlike usual tasks, scheduler tasks are lower-level tasks that are evaluated within the Scheduler and allow defining custom evaluation semantics. For example, one can implement cond(), seq() and catch() using scheduler tasks.

When evaluated, scheduler tasks are called with a reference to the Scheduler, the parent Job, and the full SchedulerExpression as their first three arguments. The remaining arguments are the same as those passed from the user; however, they are not evaluated and may contain Expressions. It is the responsibility of the scheduler task to explicitly evaluate arguments by using Scheduler.evaluate() as needed. Overall, this allows the scheduler task to implement custom evaluation semantics. Lastly, the scheduler task must return a Promise that resolves to the result of the task.

This concept corresponds to fexpr in Lisp: https://en.wikipedia.org/wiki/Fexpr

For example, one could implement a lazy if-statement called cond using this scheduler task:

@scheduler_task()
def cond(scheduler: Scheduler, parent_job: Job, scheduler_expr: SchedulerExpression,
         pred_expr: Any, then_expr: Any, else_expr: Any) -> Promise:
    def then(pred):
        if pred:
            return scheduler.evaluate(then_expr, parent_job=parent_job)
        else:
            return scheduler.evaluate(else_expr, parent_job=parent_job)

    return scheduler.evaluate(pred_expr, parent_job=parent_job).then(then)

Once defined, the new cond expression can be used like this:

@task()
def main():
    result = task1()
    return cond(result, task2(), task3())
redun.task.split_task_fullname(task_fullname: str) Tuple[str, str]#

Split a Task fullname into a namespace and name.

redun.task.task(func: Func) Task[Func]#
redun.task.task(*, name: str | None = None, namespace: str | None = None, version: str | None = None, compat: List[str] | None = None, script: bool = False, hash_includes: list | None = None, source: str | None = None, **task_options_base: Any) Callable[[Func], Task[Func]]

Decorator to register a function as a redun Task.

Parameters:
  • func (Optional[Func]) – A python function to register as a redun Task. If not given, a parameterized decorator is returned.

  • name (Optional[str]) – Name of task (Default: infer from function func.__name__)

  • namespace (Optional[str]) – Namespace of task (Default: Infer from context if possible, else None. See Task._compute_namespace)

  • version (Optional[str]) – Optional manual versioning for a task (Default: source code of task is hashed).

  • compat (Optional[List[str]]) – Optional redun version compatibility. Not currently implemented.

  • script (bool) – If True, this is a script-style task which returns a shell script string.

  • hash_includes (Optional[list]) – If provided, extra data that should be hashed. That is, extra data that should be considered as part of cache invalidation. This list may be reordered without impacting the computation. Each list item must be hashable by redun.value.TypeRegistry.get_hash.

  • source (Optional[str]) – If provided, task.source will be set to this string. It is the caller’s responsibility to ensure that source matches the provided func for proper hashing.

  • **task_options_base (Any) –

    Additional options for configuring a task or specifying behaviors of tasks. Since these are provided at task construction time (this is typically at Python module-load time), they are the “base” set. Example keys:

    load_module (Optional[str])

    The module to load to import this task. (Default: infer from func.__module__)

    wrapped_task (Optional[Task])

    If present, a reference to the task wrapped by this one.

redun.task.undefined_task(fullname: str, *args: Any, **kwargs: Any) None#

Default function used for a deserialized Task with unknown definition.

redun.task.wraps_task(wrapper_name: str | None = None, wrapper_hash_includes: list = [], **wrapper_task_options_base: Any) Callable[[Callable[[Task[Func]], Func]], Callable[[Func], Task[Func]]]#

A helper for creating new decorators that can be used like @task, that allow us to wrap the task in another one. Conceptually inspired by @functools.wraps, which makes it easier to create decorators that enclose functions.

Specifically, this helps us create a decorator that accepts a task and wraps it. The task passed in is moved into an inner namespace, hiding it. Then a new task is created from the wrapper implementation that assumes its identity; the wrapper is given access to both the run-time arguments and the hidden task definition. Since Tasks may be wrapped repeatedly, the hiding step is recursive. The load_module for all of the layers is dictated by the innermost concrete Task object.

A simple usage example is a new decorator @doubled_task, which simply runs the original task and multiplies by two.

def doubled_task() -> Callable[[Func], Task[Func]]:

    # The name of this inner function is used to create the nested namespace,
    # so idiomatically, use the same name as the decorator with a leading underscore.
    @wraps_task()
    def _doubled_task(inner_task: Task) -> Callable[[Task[Func]], Func]:

        # The behavior when the task is run. Note we have both the task and the
        # runtime args.
        def do_doubling(*task_args, **task_kwargs) -> Any:
            return 2 * inner_task.func(*task_args, **task_kwargs)

        return do_doubling
    return _doubled_task

# The simplest use is to wrap a task that is already created
@doubled_task()
@task()
def value_task1(x: int):
    return 1 + x

# We can skip the inner decorator and the task will get created implicitly
# Use the explicit form if you need to pass arguments to task creation.
@doubled_task()
def value_task2(x: int):
    return 1 + x

# We can keep going
@doubled_task()
@doubled_task()
def value_task3(x: int):
    return 1 + x

There is an additional subtlety if the wrapper itself accepts arguments. These must be passed along to the wrapper so they are visible to the scheduler. Needing to do this manually is the cost of the extra powers we have.

# An example of arguments consumed by the wrapper
def wrapper_with_args(wrapper_arg: int) -> Callable[[Func], Task[Func]]:

    # WARNING: Be sure to pass the extra data for hashing so it participates in the cache
    # evaluation
    @wraps_task(wrapper_hash_includes=[wrapper_arg])
    def _wrapper_with_args(inner_task: Task) -> Callable[[Task[Func]], Func]:

        def do_wrapper_with_args(*task_args, **task_kwargs) -> Any:
            return wrapper_arg * inner_task.func(*task_args, **task_kwargs)

        return do_wrapper_with_args
    return _wrapper_with_args
Parameters:
  • wrapper_name (Optional[str]) – The name of the wrapper, which is used to create the inner namespace. (Default: infer from the wrapper wrapper_func.__name__)

  • wrapper_task_options_base (Any) – Additional options for the wrapper task.

  • wrapper_hash_includes (Optional[list]) – If provided, extra data that should be hashed. That is, extra data that should be considered as part of cache invalidation. This list may be reordered without impacting the computation. Each list item must be hashable by redun.value.TypeRegistry.get_hash

redun.tools module#

(task)redun.tools.copy_dir(src_dir: redun.file.Dir, dest_path: str, skip_if_exists: bool = False, copy_options: Dict = {}) redun.file.Dir#

Copy a Dir to a new path.

File copies will be done in parallel.

(task)redun.tools.copy_file(src_file: redun.file.File, dest_path: str, skip_if_exists: bool = False) redun.file.File#

Copy a File to a new path.

This task can help in parallelizing file copies and providing data provenance.

(task)redun.tools.copy_files(src_files: List[redun.file.File], dest_paths: List[str], skip_if_exists: bool = False, copy_options: Dict = {}) List[redun.file.File]#

Copy multiple Files to new paths.

File copies will be done in parallel.

(task)redun.tools.debug_task(value: T) T#

Launch pdb debugger during one point of the redun workflow.

For example, to inspect the lazy value x in the workflow below we can pass it through debug_task():

x = task1()
x = debug_task(x)
y = task2(x)
(task)redun.tools.render_template(out_path: str, template: redun.file.File, context: dict, template_dir: str | None = None) redun.file.File#

Render a jinja2 template.
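
A minimal usage sketch (template path and context are hypothetical):

from redun import task
from redun.file import File
from redun.tools import render_template

@task()
def make_report(stats: dict) -> File:
    # Renders templates/report.html.j2 with the given context and writes
    # the result to report.html.
    return render_template(
        "report.html",
        File("templates/report.html.j2"),
        context={"stats": stats},
    )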

redun.utils module#

class redun.utils.CacheArgs(cache_key, args, kwargs)#

Bases: object

Arguments for a cached function.

class redun.utils.Comparable(*args, **kwargs)#

Bases: Protocol

Protocol for annotating comparable types.

class redun.utils.MultiMap(items: Iterable[Tuple[Key, Value] | List] = ())#

Bases: Generic[Key, Value]

An associative array with repeated keys.

add(key: Key, value: Value) None#
as_dict() Dict[Key, List[Value]]#
get(key: ~redun.utils.Key, default: ~typing.Any = <object object>) List[Value]#
has_item(key: Key, value: Value) bool#
items() Iterator[Tuple[Key, Value]]#
keys() Iterator[Key]#
values() Iterator[Value]#
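
A brief usage sketch, following the signatures above:

from redun.utils import MultiMap

mm = MultiMap([("a", 1), ("b", 2)])
mm.add("a", 3)  # repeated keys are kept, not overwritten

assert mm.get("a") == [1, 3]
assert mm.has_item("a", 3)
assert mm.as_dict() == {"a": [1, 3], "b": [2]}
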
class redun.utils.PreviewClass(*args, **kwargs)#

Bases: ABC

Generic class to use for unknown classes in a pickle

class redun.utils.PreviewUnpicklable(error)#

Bases: object

class redun.utils.PreviewingUnpickler(*args, **kwargs)#

Bases: Unpickler

A specialized unpickler that allows previewing of class attributes for unknown classes.

find_class(module: str, name: str) Type#

Return an object from a specified module.

If necessary, the module will be imported. Subclasses may override this method (e.g. to restrict unpickling of arbitrary classes and functions).

This method is called whenever a class or a function object is needed. Both arguments passed are str objects.

redun.utils.add_import_path(path: str) None#

Add a path to the python import paths.

redun.utils.assert_never(value: NoReturn) NoReturn#

Helper method for exhaustive type checking.

https://hakibenita.com/python-mypy-exhaustive-checking#type-narrowing-in-mypy
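
A sketch of the exhaustive-checking pattern from the linked article:

from typing import Literal

from redun.utils import assert_never

def describe(kind: Literal["file", "dir"]) -> str:
    if kind == "file":
        return "a file"
    elif kind == "dir":
        return "a directory"
    else:
        # mypy narrows `kind` to NoReturn here; adding a new Literal member
        # without handling it makes this call a type error.
        assert_never(kind)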

redun.utils.clear_import_paths() None#

Clear all extra python import paths.

redun.utils.format_table(table: List[List], justs: str, min_width: int = 0) Iterator[str]#

Format table with justified columns.

redun.utils.get_func_source(func: Callable) str#

Return the source code for a function.

redun.utils.get_import_paths() List[str]#

Returns extra import paths that have been added.

redun.utils.iter_nested_value(value: Any) Iterable[Any]#

Iterate through the leaf values of a nested value.

redun.utils.iter_nested_value_children(value: Any) Iterable[Tuple[bool, Any]]#

Helper function that iterates through the children of a possibly nested value.

Yields: (is_leaf: Bool, value: Any)

redun.utils.json_cache_key(args: tuple, kwargs: dict) str#

Returns a cache key for arguments that are JSON compatible.

redun.utils.json_dumps(value: Any) str#

Convert JSON-like values into a normalized string.

Keys are sorted and no whitespace is used around delimiters.
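
For example, key order and spacing in the input do not affect the output:

from redun.utils import json_dumps

# Both spellings normalize to the same string.
assert json_dumps({"b": 1, "a": [1, 2]}) == '{"a":[1,2],"b":1}'
assert json_dumps({"a": [1, 2], "b": 1}) == '{"a":[1,2],"b":1}'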

redun.utils.lru_cache_custom(maxsize: int, cache_key=<function <lambda>>)#

LRU cache with custom cache key.
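
A sketch pairing this with json_cache_key above; it assumes the cache_key callable is invoked as cache_key(args, kwargs), matching json_cache_key's signature:

from redun.utils import json_cache_key, lru_cache_custom

# Cache results keyed by the normalized JSON form of the arguments
# (assumes all arguments are JSON compatible).
@lru_cache_custom(maxsize=128, cache_key=json_cache_key)
def summarize(config: dict) -> str:
    return ",".join(sorted(config))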

redun.utils.map_nested_value(func: Callable, value: Any) Any#

Map a func to the leaves of a nested value.
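
A short sketch of the nested-value helpers on lists and tuples:

from redun.utils import iter_nested_value, map_nested_value

value = [1, (2, [3, 4])]
assert set(iter_nested_value(value)) == {1, 2, 3, 4}
assert map_nested_value(lambda x: x * 10, value) == [10, (20, [30, 40])]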

redun.utils.merge_dicts(dicts: List[T]) T#

Merge a list of (nested) dicts into a single dict.

assert merge_dicts([
    {"a": 1, "b": 2},
    {"b": 3, "c": 4},
]) == {"a": 1, "b": 3, "c": 4}

assert merge_dicts([
    {"a": {"a1": 1}},
    {"a": {"a2": 2}},
]) == {"a": {"a1": 1, "a2": 2}}


redun.utils.pickle_dump(obj: Any, file: IO) None#

Official pickling method for redun.

This is used to standardize the protocol used for serialization.

redun.utils.pickle_dumps(obj: Any) bytes#

Official pickling method for redun.

This is used to standardize the protocol used for serialization.

redun.utils.pickle_loads(data: bytes) Any#

Official unpickling method for redun.

This is used to allow pickle previews when a preview context is active.

redun.utils.pickle_preview(data: bytes, raise_error: bool = False) Any#

Loads a pickle file, falling back to previewing objects as needed.

For example, if the class is unknown, will return a PreviewClass subclass that can be used to peek at attributes.

redun.utils.str2bool(text: str) bool#

Parse a string into a bool.

redun.utils.trim_string(text: str, max_length: int = 200, ellipsis: str = '...') str#

If text is longer than max_length then return a trimmed string.

redun.utils.with_pickle_preview(raise_error: bool = False) Iterator[None]#

Enable pickle preview within a context.

We keep a count of how many nested contexts are active.
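
A minimal sketch of its use as a context manager:

from redun.utils import pickle_dumps, pickle_loads, with_pickle_preview

data = pickle_dumps({"x": 1})

# Within the context, unpickling falls back to preview stubs for any
# classes that can no longer be imported.
with with_pickle_preview():
    obj = pickle_loads(data)

assert obj == {"x": 1}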

redun.value module#

class redun.value.Bool(instance: Any)#

Bases: ProxyValue

Augment builtins.bool to support argument parsing.

classmethod parse_arg(raw_type: Type, arg: str) Any#

Parse a command line argument into a new Value.

type#

alias of bool

type_name: str | None = 'builtins.bool'#
class redun.value.DatetimeType(instance: Any)#

Bases: ProxyValue

Augment datetime.datetime to support argument parsing.

classmethod parse_arg(raw_type: Type, arg: str) datetime#

Parse a command line argument into a new Value.

type#

alias of datetime

type_name: str | None = 'datetime.datetime'#
class redun.value.EnumType(instance: Any)#

Bases: ProxyValue

Augment enum.Enum to support argument parsing with choices.

classmethod parse_arg(raw_type: Type, arg: str) Any#

Parse a command line argument into a new Value.

type#

alias of Enum

type_name: str | None = 'enum.Enum'#
class redun.value.FileCache(instance: Any)#

Bases: ProxyValue

Baseclass for caching values to Files.

This baseclass can be used to control where the serialized data of potentially large values is stored.

from redun import task
from redun.value import FileCache

# Example of a user-specific data type.
class Data:
    def __init__(self, data):
        self.data = data

# Subclassing from FileCache and setting `type = Data` will register
# with redun that File-caching should be used for any Value of type
# `Data`.
class DataType(FileCache):
    type = Data

    # This path specifies where the files containing serialized
    # values should be stored. This variable accepts all paths
    # acceptable to File (e.g. local, s3, etc).
    base_path = "tmp"

@task()
def task1() -> Data:
    # This value will be serialized to a File instead of the default
    # redun database.
    return Data("my data")
base_path = '.'#
classmethod deserialize(raw_type: type, filename: bytes) Any#

Returns a deserialization of bytes data into a new Value.

serialize() bytes#

Serializes the Value into a byte sequence.

type_name: str | None = 'redun.value.FileCache'#
class redun.value.Function(instance: Callable)#

Bases: ProxyValue

Value class to allow redun to hash and cache plain Python functions.

classmethod deserialize(raw_type: Any, data: bytes) Any#

Returns a deserialization of bytes data into a new Value.

property fullname: str#

Return a fullname for a function including its module.

get_hash(data: bytes | None = None) str#

Returns a hash for the value.

is_valid() bool#

Value is valid to use from the cache if we can reassociate the function.

classmethod parse_arg(raw_type: Type, arg: str) Any#

Parses function by fully qualified name from command-line.

serialize() bytes#

Serializes the Value into a byte sequence.

type#

alias of function

type_name: str | None = 'builtins.function'#
exception redun.value.InvalidValueError#

Bases: Exception

class redun.value.MetaValue(name, bases, dct, **kwargs)#

Bases: type

Registers Value classes as they are defined.

class redun.value.ProxyValue(instance: Any)#

Bases: Value

Class for overriding Value behavior (hashing, serialization) for raw python types.

get_hash(data: bytes | None = None) str#

Returns a hash for the value.

iter_subvalues() Iterator[Value]#

Iterates through the Value’s subvalues.

postprocess(postprocess_args) Value#

Post process a value resulting from a Task.

preprocess(preprocess_args) Value#

Preprocess a value before passing to a Task.

proxy = True#
serialize() bytes#

Serializes the Value into a byte sequence.

type: Any = None#
type_name: str | None = 'redun.value.ProxyValue'#
class redun.value.Set(instance: Any)#

Bases: ProxyValue

Augment builtins.set to support stable hashing.

get_hash(data: bytes | None = None) str#

Returns a hash for the value.

type#

alias of set

type_name: str | None = 'builtins.set'#
exception redun.value.TypeError#

Bases: Exception

class redun.value.TypeRegistry#

Bases: object

A registry for redun types and a dispatch for value methods.

deserialize(type_name: str, data: bytes) Any#

Returns a deserialization of bytes data into a new Value.

get_hash(value: Any, data: bytes | None = None) str#

Hashes a value according to the hash method precedence.

get_serialization_format(value: Any) str#

Returns mimetype of serialization.

get_type(raw_type: Type, use_default: bool = True) Type[Value] | None#

Return a registered Value class for a raw python type.

get_type_name(raw_type: Type) str#

Returns the type name for a python type.

get_value(raw_value: Any) Value#

Return a Value instance for a raw python type.

is_valid(value: Any) bool#

Returns True if the value is still good to use since being cached.

For example, if the value was a File reference and the corresponding file on the filesystem has since been removed/altered, the File reference is no longer valid.

is_valid_nested(nested_value: Any) bool#

Returns True if nested value is still good to use since being cached.

iter_subvalues(raw_value: Any) Iterator[Value]#

Iterates through the Value’s subvalues.

parse_arg(raw_type: Type, arg: str) Any#

Parse a command-line argument of type raw_type.

parse_type_name(type_name: str) Type#

Parse a type name into a python type.

postprocess(raw_value: Any, postprocess_args: dict) Any#

Post process a value resulting from a Task.

preprocess(raw_value: Any, preprocess_args: dict) Any#

Preprocess a value before passing to a Task.

register(value_type: Type[Value]) None#

Register a Value class with the type system.

serialize(value: Any) bytes#

Serializes the Value into a byte sequence.

class redun.value.Value(*args, **kwargs)#

Bases: object

The base class for tracking inputs and outputs in redun, encapsulating both value and state, in a hashable and serializable form.

This class underpins the implementation of memoization of redun Tasks. Hashing is used as the sole, unique identifier for a value; there is no fallback to deep equality testing. Hence, we can hash Task inputs to identify repeated invocations and supply the cached result instead. Serialization makes it feasible to cache the outputs of tasks.

In addition to handling ordinary values, i.e., data or objects represented in memory, complex workflows often require reasoning about state that may be complex or external, such as a database or filesystem. Redun accomplishes reasoning over this state by converting it into value-like semantics, after which redun can proceed without differentiating between values and state. Specifically, this class provides that abstraction. Therefore, this class offers a more complex lifecycle than would be required for ordinary values.

The default hashing and serialization methods are designed for simple object values. Child classes may customize these as needed; see below for the API details.

We introduce two atypical lifecycle steps that are needed for more complex cases, especially those involving values that wrap state: validity checking and pre/post processing. The scheduler will arrange for pre- and post-processing hooks to be called on Value objects around their use in Task implementations. This allows users to implement some more complex concepts, such as lazy initialization or to temporarily allocate resources.

See docs/source/design.md for more information about “Validity” of values, which is an important concept for this class.

classmethod deserialize(raw_type: type, data: bytes) Any#

Returns a deserialization of bytes data into a new Value.

get_hash(data: bytes | None = None) str#

Returns a hash for the value. Users may override to provide custom behavior.

Most basic objects are covered by the default approach of hashing the binary pickle serialization.

get_serialization_format() str#

Returns mimetype of serialization.

is_valid() bool#

Returns True if the value may be used. For ordinary values, this will typically always be true. For state-tracking values, the deserialized cache value may no longer be valid. If this method returns False, the object will typically be discarded.

This method may be called repeatedly on the same object.

For example, if the value was a File reference and the corresponding file on the filesystem has since been removed/altered, the File reference is no longer valid.

iter_subvalues() Iterator[Value]#

Iterates through the Value’s subvalues.

classmethod parse_arg(raw_type: type, arg: str) Any#

Parse a command line argument into a new Value.

postprocess(postprocess_args: dict) Value#

Post process a value resulting from a Task, and return the postprocessed value.

This is a scheduler lifecycle method and is not typically invoked by users. This method may be called repeatedly on a particular instance, for example, if it is returned recursively.

Parameters:

postprocess_args (dict) – Extra data provided by the scheduler. Implementations that are not part of redun should discard this argument.

preprocess(preprocess_args: dict) Value#

Preprocess a value before passing it to a Task, and return the preprocessed value.

This is a scheduler lifecycle method and is not typically invoked by users. This method may be called repeatedly on a particular instance, for example, if a single Value is passed to multiple Tasks.

Parameters:

preprocess_args (dict) – Extra data provided by the scheduler. Implementations that are not part of redun should discard this argument.

serialize() bytes#

Serializes the Value into a byte sequence.

type_name: str | None = 'redun.value.Value'#
redun.value.get_raw_type_name(raw_type: Any) str#
redun.value.get_type_registry() TypeRegistry#

Return the global singleton instance of TypeRegistry.
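
A sketch of querying the registry directly; the type-name strings follow the type_name attributes listed above:

from redun.value import get_type_registry

registry = get_type_registry()

# Dispatch Value methods for raw python types and values.
assert registry.get_type_name(bool) == "builtins.bool"
set_hash = registry.get_hash({1, 2, 3})  # stable hashing via the Set proxy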

redun.value.is_unknown_function(func: Callable) bool#

Returns True if the function was unknown when trying to reimport.

redun.value.make_unknown_function(func_name: str) Callable#

Returns a stub function in place of a function that could not be reimported.

redun.version module#

redun.visualize module#

Module contents#

class redun.Dir(path: str)#

Bases: FileSet

classes = <redun.file.FileClasses object>#
copy_to(dest_dir: Dir, skip_if_exists: bool = False) Dir#
exists() bool#
file(rel_path: str) File#
filesystem: FileSystem#
mkdir() None#
rel_path(path: str) str#
rmdir(recursive: bool = False) None#
shell_copy_to(dest_path: str, as_mount: bool = False) str#

Returns a shell command for copying the directory to a destination path.

Parameters:
  • dest_path (str) – Destination path to copy to.

  • as_mount (bool) – Treat src/dest as mounted.

stage(local: str | None = None) StagingDir#
type_basename = 'Dir'#
type_name: str | None = 'redun.Dir'#
class redun.File(path: str)#

Bases: Value

Class for assisting file IO in redun tasks.

File objects are hashed based on their contents and abstract over storage backends such as local disk or cloud object storage.

basename() str#
classes = <redun.file.FileClasses object>#
copy_to(dest_file: File, skip_if_exists: bool = False) File#
dirname() str#
exists() bool#
filesystem: FileSystem#
get_hash(data: bytes | None = None) str#

Returns a hash for the value. Users may override to provide custom behavior.

Most basic objects are covered by the default approach of hashing the binary pickle serialization.

property hash: str#
is_valid() bool#

Returns True if the value may be used. For ordinary values, this will typically always be true. For state-tracking values, the deserialized cache value may no longer be valid. If this method returns False, the object will typically be discarded.

This method may be called repeatedly on the same object.

For example, if the value was a File reference and the corresponding file on the filesystem has since been removed/altered, the File reference is no longer valid.

isdir() bool#
isfile() bool#
open(mode: str = 'r', encoding: str | None = None, **kwargs: Any) IO#

Open a file stream.

Parameters:
  • mode (str) – Stream mode for reading or writing (‘r’, ‘w’, ‘b’, ‘a’).

  • encoding (str) – Text encoding (e.g. 'utf-8') to use when reading or writing.

  • **kwargs – Additional arguments for the underlying file stream. They are Filesystem-specific.

path: str#
read(mode: str = 'r', encoding: str | None = None) str | bytes#
readlines(mode: str = 'r') List[str | bytes]#
remove() None#
shell_copy_to(dest_path: str | None, as_mount: bool = False) str#

Returns a shell command for copying the file to a destination path.

Parameters:
  • dest_path (Optional[str]) – Destination path to copy to. If None, use stdout.

  • as_mount (bool) – Copy files from mounted directories.

size() int#
stage(local: str | None = None) StagingFile#
stream: IO | None#
touch(time: Tuple[int, int] | Tuple[float, float] | None = None) None#
type_basename = 'File'#
type_name: str | None = 'redun.File'#
update_hash() None#
write(data: str | bytes, mode: str = 'w', encoding: str | None = None) None#
class redun.Handle(name: str | None = None, *args, namespace: str | None = None, **kwargs)#

Bases: Value

A Value that opaquely accumulates state as it passes through Tasks. This subclass of Value has deep support from the scheduler and its backend, in order to provide a foundation for encapsulating interactions with a deterministic, stateful, external system, such as a database.

This model of state supports being initialized, then opaque transformations as Tasks are applied. The hash and serialization data are designed to be opaque to the actual data held within the handle. At initialization time, only the constructor data is captured for serialization and hashing. Upon transformation (that is, when a Handle is returned by a Task), the hash is replaced with the hash identifying the Task invocation, including both code and inputs.

See the design documents for more discussion of the principles behind the design: docs/source/design.md.

In general use, users should not alter the serialization methods.

class HandleInfo(name: str, args: Tuple, kwargs: dict, class_name: str, namespace: str | None = None, call_hash: str = '', hash: str | None = None, key: str = '')#

Bases: object

The fullname is an identification key for the handle. It is used globally across the redun backend to identify other potential instances of this Handle. However, hashes are used, as usual, to perform equality-like tests.

apply_call(handle: Handle, call_hash: str) Handle#
clone(handle: Handle) Handle#
fork(handle: Handle, key: str) Handle#
get_hash() str#

Returns hash of the handle.

Implementation note: the Handle state model requires that the computation proceeds differently for new Handles and ones that are computed by other tasks.

get_state() dict#

Returns serializable state dict.

update_hash() None#
apply_call(call_hash: str) Handle#

Returns a new Handle derived from this one assuming passage through a call with call_hash.

fork(key: str) Handle#

Forks the handle into a second one for use in parallel tasks. A key must be provided to differentiate the fork from the original (although the original may have a key).

get_hash(data: bytes | None = None) str#

Returns a hash of the handle.

is_valid() bool#

Returns True if handle is still valid (i.e., has not been rolled back).

postprocess(postprocess_args: dict) Handle#

Applies the call_hash to the handle as it returns from a task.

preprocess(preprocess_args: dict) Handle#

Forks a handle as it passes into a task.

type_name: str | None = 'redun.Handle'#
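
A sketch of a stateful handle in the style of docs/source/design.md, wrapping a sqlite connection (the schema and attribute layout are illustrative):

import sqlite3

from redun import Handle, task

class DbHandle(Handle):
    def __init__(self, name: str, db_file: str):
        self.db_file = db_file
        self.instance = sqlite3.connect(db_file)

@task()
def setup(conn: DbHandle) -> DbHandle:
    conn.instance.execute("CREATE TABLE IF NOT EXISTS events (msg TEXT)")
    return conn  # returning the handle records the transformation
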
class redun.PartialTask(task: Task[Func2], args: tuple, kwargs: dict)#

Bases: Task[Func], Generic[Func, Func2]

A Task with only some arguments partially applied.

The type of this class is parameterized by Func and Func2, where Func2 is the type of the original function and Func is the type of the partially applied function. They should match on their return types.

is_valid() bool#

Returns True if the Task Value is still valid (task hash matches registry).

Tasks are first-class Values in redun. They can be cached and fetched in future executions. When fetching a Task from the cache, the cached hash might no longer exist in the code base (registered tasks).

options(**task_options_update: Any) Task[Func]#

Returns a new Task with task_option overrides.

partial(*args, **kwargs) PartialTask[Callable[[...], Result], Callable[[...], Result]]#

Partially apply some arguments to the Task.

type_name: str | None = 'redun.PartialTask'#
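
A brief sketch of how a PartialTask typically arises:

from redun import task

@task()
def add(a: int, b: int) -> int:
    return a + b

@task()
def main() -> int:
    inc = add.partial(1)  # a PartialTask with `a` bound to 1
    return inc(41)        # evaluates to 42 when run by the scheduler
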
class redun.Scheduler(config: Config | None = None, backend: RedunBackend | None = None, executor: Executor | None = None, logger: Any | None = None, use_task_traceback: bool = True, job_status_interval: int | None = None, migrate: bool | None = None)#

Bases: object

Scheduler for evaluating redun tasks.

A thread may only have a single running scheduler at a time, and a scheduler can perform exactly one execution at a time. While running, the scheduler will register itself into a thread-local variable, see get_current_scheduler and set_current_scheduler.

A scheduler may be reused for multiple executions, and makes every effort to be stateless between executions. That is, the scheduler clears its internal state before starting a new execution, providing isolation from any previous executions.

Although the scheduler collaborates with executors that may use multiple threads during execution, the scheduler logic relies upon being executed from a single thread. Therefore, the main lifecycle methods used by executors defer back to the scheduler thread. The scheduler is implemented around an event loop and asynchronous callbacks, allowing it to coordinate many in-flight jobs in parallel.

Scheduler tasks are generally considered “friends” of the scheduler and may need to access some private methods.

add_executor(executor: Executor) None#

Add executor to scheduler.

add_job_tags(job: Job, tags: List[Tuple[str, Any]]) None#

Callback for adding job tags during job execution.

clear()#

Release resources.

done_job(job: Job, result: Any, job_tags: List[Tuple[str, Any]] = []) None#

Mark a Job as successfully done with a result.

A primary Executor lifecycle method, hence it is thread safe.

evaluate(expr: Any, parent_job: Job | None = None) Promise#

Begin an asynchronous evaluation of an expression (concrete value or Expression). Assumes this scheduler is currently running.

This method is not a typical entry point for users; however, it is often used by SchedulerTasks to trigger further computations.

Returns a Promise that will resolve when the evaluation is complete.

extend_run(expr: Expression[Result] | Result, parent_job_id: str, dryrun: bool = False, cache: bool = True, context: dict = {}) dict#

Extend an existing scheduler execution (run) to evaluate a Task or Expression.

This is an alternative to the run method, and acts as a primary user entry point.

get_job_status_report() List[str]#
property is_running: bool#
load(migrate: bool | None = None) None#
log(*messages: Any, indent: int = 0, multiline: bool = False, level: int = 20) None#
log_job_statuses() None#

Display Job statuses.

reject_job(job: Job | None, error: Any, error_traceback: Traceback | None = None, job_tags: List[Tuple[str, Any]] = []) None#

Reject a Job that has failed with an error.

A primary Executor lifecycle method, hence it is thread safe.

run(expr: Expression[Result], exec_argv: List[str] | None = None, dryrun: bool = False, cache: bool = True, tags: Iterable[Tuple[str, Any]] = (), context: dict = {}, execution_id: str | None = None) Result#
run(expr: Result, exec_argv: List[str] | None = None, dryrun: bool = False, cache: bool = True, tags: Iterable[Tuple[str, Any]] = (), context: dict = {}, execution_id: str | None = None) Result

Run the scheduler to evaluate an Expression expr.

This is the primary user entry point to the scheduler. The scheduler will register itself as this thread’s running scheduler for the duration of the execution. This function blocks, running the event loop until the result is ready and can be returned.

The scheduler can only run one execution at a time.
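
A minimal sketch of this entry point, using an in-process scheduler with default configuration:

from redun import Scheduler, task

@task()
def add(a: int, b: int) -> int:
    return a + b

scheduler = Scheduler()
assert scheduler.run(add(1, 2)) == 3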

set_cache(eval_hash: str, task_hash: str, args_hash: str, value: Any) None#

Set the cache for an evaluation (eval_hash) with result value.

This method should only be used by the scheduler or SchedulerTasks.

class redun.ShardedS3Dataset(path: str, format: str = 'parquet', recurse: bool = True)#

Bases: Value

A sharded dataset on S3. “Sharded” means a collection of files that when concatenated comprise the complete dataset. Several formats are supported but parquet is the best tested with redun to date due to the quality of its integration with AWS services and because it allows reading of only portions of the dataset.

The hash of the ShardedS3Dataset is just the hash of the sorted list of files in the dataset. So changing which files are included (such as with recurse=True), adding or removing files, or performing a dataset write operation (which creates new shards) all cause the hash of the dataset to change. However, this does not detect individual shards being altered by other code.

property filenames: List[str]#
filesystem: FileSystem#
property format: str#
classmethod from_data(dataset: pandas.DataFrame | pyspark.sql.DataFrame, output_path: str, format: str = 'parquet', partition_keys: List[str] = [], catalog_database: str = 'default', catalog_table: str | None = None, format_options: Dict[str, Any] = {}) ShardedS3Dataset#

Helper function to create a ShardedS3Dataset from an existing DataFrame-like object.

Parameters:
  • dataset (Union[pandas.DataFrame, pyspark.sql.DataFrame]) – Dataset to save

  • output_path (str) – Path on S3 to which data will be saved as multiple files of format format.

  • format (str) – Format to save the data in. Supported formats are: [“avro”, “csv”, “ion”, “grokLog”, “json”, “orc”, “parquet”, “xml”] Defaults to parquet.

  • partition_keys (List[str]) – Dataset keys to partition on. Each key will be a subdirectory in self.path containing data for each value of that key. For example, partitioning on the column 'K' will make subdirectories 'K=1', 'K=2', 'K=3', etc.

  • catalog_database (str) – Data Catalog database name to write to, if creating a table in the Data Catalog. Defaults to 'default'.

  • catalog_table (Optional[str]) – If present, written data will be available in AWS Data Catalog / Glue / Athena with the indicated table name.

  • format_options (Dict[str, Any]) – Additional options for the data loader. Documented here: https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-etl-format.html

classmethod from_files(files: List[File], format: str | None = None, allow_additional_files: bool = False) ShardedS3Dataset#

Helper function to create a ShardedS3Dataset from a list of redun Files. This can be helpful for provenance tracking from methods that return multiple files, when the "root directory" is wanted for later use.

If a dataset containing all the files cannot be created, such as when they are in different S3 prefixes, a ValueError is raised. Likewise, if the files do not all have the same format, a ValueError is raised.

Parameters:
  • files (List[File]) – Files that should be included in the dataset.

  • format (Optional[str]) – If specified, files are considered to be in this format.

  • allow_additional_files (bool) – If True, allows the resulting ShardedS3Dataset to contain files not in files. If False, a ValueError will be raised if a ShardedS3Dataset cannot be constructed.

Raises:

ValueError – If a dataset could not be created from the given files.

get_hash(data: bytes | None = None) str#

Returns a hash for the value. Users may override to provide custom behavior.

Most basic objects are covered by the default approach of hashing the binary pickle serialization.

property hash: str#
is_valid() bool#

Returns True if the value may be used. For ordinary values, this will typically always be true. For state-tracking values, the deserialized cache value may no longer be valid. If this method returns False, the object will typically be discarded.

This method may be called repeatedly on the same object.

For example, if the value was a File reference and the corresponding file on the filesystem has since been removed/altered, the File reference is no longer valid.

iter_subvalues() Iterator[Value]#

Iterates through the Value’s subvalues.

load_pandas(max_shards: int = -1) pandas.DataFrame#

Loads the ShardedS3Dataset as a Pandas DataFrame.

Parameters:

max_shards (int) – Maximum number of shards to load. If -1, will load all of them.

Returns:

All shards loaded and concatenated into a single DataFrame.

Return type:

pandas.DataFrame

load_pandas_shards(max_shards: int = -1) List[pandas.DataFrame]#

Loads the ShardedS3Dataset as a list of Pandas DataFrames. This is deterministic and will load the shards in the same order every time.

Parameters:

max_shards (int) – Maximum number of shards to load. If -1 (default), will load all shards.

Returns:

Loaded shards, one per entry in the list.

Return type:

List[pandas.DataFrame]

load_spark(validate: bool = False, format_options: Dict[str, Any] = {}) pyspark.sql.DataFrame#

Loads the ShardedS3Dataset as a Spark DataFrame. Must be running in a Spark context.

Returns:

The loaded dataset.

Return type:

pyspark.sql.DataFrame

property path: str#
postprocess(postprocess_args) ShardedS3Dataset#

Post process a value resulting from a Task, and return the postprocessed value.

This is a scheduler lifecycle method and is not typically invoked by users. This method may be called repeatedly on a particular instance, for example, if it is returned recursively.

Parameters:

postprocess_args (dict) – Extra data provided by the scheduler. Implementations that are not part of redun should discard this argument.

purge_spark(remove_older_than: int = 1, manifest_file_path: str | None = None) None#

Recursively removes all files older than remove_older_than hours. Defaults to 1 hour. Optionally writes removed files to manifest_file_path/Success.csv.

property recurse: bool#
save_spark(dataset: pandas.DataFrame | pyspark.sql.DataFrame, partition_keys: List[str] = [], catalog_database: str = 'default', catalog_table: str | None = None, format_options: Dict[str, Any] = {}) None#

Writes a pandas or spark DataFrame to the given path in the given format, optionally partitioning on dataset keys. Must be done from a spark environment.

Parameters:
  • dataset (Union[pandas.DataFrame, pyspark.sql.DataFrame]) – Dataset to save

  • partition_keys (List[str]) – Dataset keys to partition on. Each key will be a subdirectory in self.path containing data for each value of that key. For example, partitioning on the column 'K' will make subdirectories 'K=1', 'K=2', 'K=3', etc.

  • catalog_database (str) – Data Catalog database name to write to, if creating a table in the Data Catalog. Defaults to 'default'.

  • catalog_table (Optional[str]) – If present, written data will be available in AWS Data Catalog / Glue / Athena with the indicated table name.

  • format_options (Dict[str, Any]) – Additional options for the data loader. Documented here: https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-etl-format.html

type_name: str | None = 'redun.ShardedS3Dataset'#
update_hash() None#
class redun.Task(func: Callable, name: str | None = None, namespace: str | None = None, version: str | None = None, compat: List[str] | None = None, script: bool = False, task_options_base: dict | None = None, task_options_override: dict | None = None, hash_includes: list | None = None, source: str | None = None)#

Bases: Value, Generic[Func]

A redun Task.

Tasks are the basic unit of execution in redun. Tasks are often defined using the redun.task.task() decorator:

@task()
def my_task(x: int) -> int:
    return x + 1

Similar to pickling of functions, Tasks specify the work to execute by reference, not by value. This is important for serialization and caching, since a task is always reattached to the latest implementation and task options just-in-time.

Task options may give direction to the scheduler about the execution environment, but must not alter the result of the function, since changing options will not trigger cache invalidation and we are allowed to return cached results with differing options. These can be provided at two times: 1) at module-load time by decorator options, 2) at run-time by creating task expressions. The former are not serialized, so they are attached to the latest value in the source code. The latter must be serialized, since they are only available at run-time.

Tasks are routinely serialized and deserialized, and are cached based on their serialization. This relies on the serialized format being as minimal as possible, while providing the needed reference to the code to run.

If needed, extra hashable data may be provided to trigger re-evaluation/cache-invalidation.

T = ~T#
property fullname: str#

Returns the fullname of a Task: '{namespace}.{name}'

get_hash(data: bytes | None = None) str#

Returns the Task hash.

get_task_option(option_name: str) Any | None#
get_task_option(option_name: str, default: T) T

Fetch the requested option, preferring run-time updates over options from task construction. Like the dictionary get method, returns the default instead of raising a KeyError on missing keys.

get_task_options() dict#

Merge and return the task options.

has_task_option(option_name: str) bool#

Return True if the task has an option with name option_name.

is_valid() bool#

Returns True if the Task Value is still valid (task hash matches registry).

Tasks are first-class Values in redun. They can be cached and fetched in future executions. When fetching a Task from the cache, the cached hash might no longer exist in the code base (registered tasks).

property load_module: str#
property nout: int | None#

Determines nout from task options and return type.

The precedence is:
  • the task option

  • the function return type

options(**task_options_update: Any) Task[Func]#

Returns a new Task with task_option overrides.

partial(*args, **kwargs) PartialTask[Callable[[...], Result], Callable[[...], Result]]#

Partially apply some arguments to the Task.

recompute_hash()#
property signature: Signature#

Signature of the function wrapped by the task.

type_name: str | None = 'redun.Task'#
update_context(context: dict = {}, **kwargs: Any) Task[Func]#

Update the context variables for the task.

(scheduler_task)redun.apply_tags(value: Any, tags: List[Tuple[str, Any]] = [], job_tags: List[Tuple[str, Any]] = [], execution_tags: List[Tuple[str, Any]] = []) T#

Apply tags to a value, job, or execution.

Returns the original value.

from redun import apply_tags, task

@task
def task1():
    x = 10
    # Apply tags on the value 10.
    return apply_tags(x, [("env", "prod"), ("project", "acme")])

@task
def task2():
    x = 10
    # Apply tags on the current job.
    return apply_tags(x, job_tags=[("env", "prod"), ("project", "acme")])

@task
def task3():
    x = 10
    # Apply tags on the current execution.
    return apply_tags(x, execution_tags=[("env", "prod"), ("project", "acme")])
(scheduler_task)redun.catch(expr: Any, *catch_args: Any) T#

Catch exceptions of class error_class raised by expr and evaluate recover(error).

The following example catches the ZeroDivisionError raised by divider(0) and returns 0.0 instead.

from redun import catch, task

@task()
def divider(denom):
    return 1.0 / denom

@task()
def recover(error):
    return 0.0

@task()
def main():
    return catch(
        divider(0),
        ZeroDivisionError,
        recover
    )

This is equivalent to the regular python code:

def divider(denom):
    return 1.0 / denom

def main():
    try:
        return divider(0)
    except ZeroDivisionError as error:
        return 0.0

catch() can also handle multiple Exception classes as well as multiple recover expressions.

@task()
def main():
    return catch(
        task1(),
        (ZeroDivisionError, ValueError),
        recover1,
        KeyError,
        recover2,
    )

which is equivalent to the regular python code:

try:
    return task1()
except (ZeroDivisionError, ValueError) as error:
    return recover1(error)
except KeyError as error:
    return recover2(error)

Implementation/behavior note: Usually, we don't cache errors, since this is rarely productive. Also recall that we typically cache each round of evaluation separately, allowing the scheduler to retrace the call graph and detect if any subtasks have changed (i.e. hash change) or if any Values in the subworkflow are now invalid (e.g. File hash has changed). To correctly cache a caught expression, we need to cache both the fact that expr resolved to an Exception, and then that recover_expr produced a non-error result. However, we only want to cache the exception if it is successfully handled, so if the recovery re-raises the exception or raises another error, the error should not be cached.

To achieve this, we implement custom caching logic that delays caching the error until we see that it has been successfully handled.

Parameters:
  • expr (Expression) – Main expression to evaluate.

  • error_recover_pairs – A list of alternating error_class and recover Tasks.

(scheduler_task)redun.cond(cond_expr: Any, then_expr: Any, *rest: Any) T#

Conditionally execute expressions, i.e. a lazy if-statement.

from redun import cond, task

@task()
def is_even(x):
    return x % 2 == 0

@task()
def task1():
    ...

@task()
def task2():
    ...

@task()
def main(x: int):
    return cond(
        is_even(x),
        task1(),
        task2()
    )
(scheduler_task)redun.get_context(var_path: str, default: Any | None = None) T#

Returns a value from the current context.

Parameters:
  • var_path (str) – A dot-separated path for a variable to fetch from the current context.

  • default (Optional[Any]) – A default value to return if the context variable is not defined. None by default.
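
A sketch of reading a context variable inside a workflow (the variable path is hypothetical):

from redun import get_context, task

@task()
def greet(name: str) -> str:
    return "hello, " + name

@task()
def main():
    # Falls back to "anonymous" when profile.name is absent from the context.
    return greet(get_context("profile.name", default="anonymous"))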

redun.get_current_namespace()#

Returns the current task namespace during import.

redun.get_current_scheduler(required=True) Scheduler | None#

Returns the currently running Scheduler for this thread.

redun.get_task_registry()#

Returns the global task registry.

(scheduler_task)redun.merge_handles(handles_expr: List[redun.handle.Handle]) T#

Merge multiple handles into one.

redun.namespace(_namespace=None)#

Set the current task namespace.

Use None to unset the current namespace.

redun.script(command: str | ~typing.List, inputs: ~typing.Any = [], outputs: ~typing.Any = <object object>, tempdir: bool = False, as_mount: bool = False, **task_options: ~typing.Any) Any#

Execute a shell script as a redun task with file staging.

See the docs for a full explanation:

https://insitro.github.io/redun/design.html#file-staging

Parameters:
  • command (Union[str, List]) – Command string or argv list to execute.

  • inputs (Any) – Collection of FileStaging objects used to stage cloud input files to local files.

  • outputs (Any) – Collection of FileStaging objects used to unstage local output files back to cloud storage.

  • tempdir (bool) – If True, run the command within a temporary directory.

  • as_mount (bool) – If True, make use of cloud storage mounting (if available) to stage files.

  • **task_options (Any) – Options to configure the Executor, such as vcpus=2 or memory=3.

Returns:

A result the same shape as outputs but with all FileStaging objects converted to their corresponding remote Files.

Return type:

Any
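
A sketch of file staging with script(), following the pattern in the linked design docs (paths are hypothetical):

from redun import File, script, task

@task()
def compress(src: File) -> File:
    out = File(src.path + ".gz")
    return script(
        "gzip -c input.txt > output.gz",
        inputs=[src.stage("input.txt")],
        outputs=out.stage("output.gz"),
    )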

redun.set_current_scheduler(scheduler: Scheduler | None) None#

Sets the current running Scheduler for this thread.

redun.task(func: Func | None = None, *, name: str | None = None, namespace: str | None = None, version: str | None = None, compat: List[str] | None = None, script: bool = False, hash_includes: list | None = None, source: str | None = None, **task_options_base: Any) Task[Func] | Callable[[Func], Task[Func]]#

Decorator to register a function as a redun Task.

Parameters:
  • func (Optional[Func]) – A python function to register as a redun Task. If not given, a parameterized decorator is returned.

  • name (Optional[str]) – Name of task (Default: infer from function func.__name__)

  • namespace (Optional[str]) – Namespace of task (Default: Infer from context if possible, else None. See Task._compute_namespace)

  • version (Optional[str]) – Optional manual versioning for a task (Default: source code of task is hashed).

  • compat (Optional[List[str]]) – Optional redun version compatibility. Not currently implemented.

  • script (bool) – If True, this is a script-style task which returns a shell script string.

  • hash_includes (Optional[list]) – If provided, extra data that should be hashed. That is, extra data that should be considered as part of cache invalidation. This list may be reordered without impacting the computation. Each list item must be hashable by redun.value.TypeRegistry.get_hash.

  • source (Optional[str]) – If provided, task.source will be set to this string. It is the caller’s responsibility to ensure that source matches the provided func for proper hashing.

  • **task_options_base (Any) –

    Additional options for configuring a task or specifying behaviors of tasks. Since these are provided at task construction time (this is typically at Python module-load time), they are the “base” set. Example keys:

    load_moduleOptional[str]

    The module to load to import this task. (Default: infer from func.__module__)

    wrapped_taskOptional[Task]

    If present, a reference to the task wrapped by this one.

(task)redun.throw(error: Exception) None#

Raises an exception.

This task is useful in cases where raising needs to be done lazily.
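
For example, throw() can supply the failure branch of a lazy conditional. A sketch:

from redun import cond, task, throw

@task()
def is_valid(x: int) -> bool:
    return x >= 0

@task()
def process(x: int) -> int:
    return x * 2

@task()
def main(x: int):
    # The ValueError is raised only if the condition evaluates to False.
    return cond(is_valid(x), process(x), throw(ValueError("negative input")))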