redun package¶
Subpackages¶
- redun.backends package
- Subpackages
- Submodules
- redun.backends.base module
RedunBackend
RedunBackend.advance_handle()
RedunBackend.check_cache()
RedunBackend.delete_tags()
RedunBackend.explain_cache_miss()
RedunBackend.get_job()
RedunBackend.get_records()
RedunBackend.get_tags()
RedunBackend.get_value()
RedunBackend.is_valid_handle()
RedunBackend.iter_record_ids()
RedunBackend.load()
RedunBackend.put_records()
RedunBackend.record_call_node()
RedunBackend.record_call_node_context()
RedunBackend.record_execution()
RedunBackend.record_job_end()
RedunBackend.record_job_start()
RedunBackend.record_tags()
RedunBackend.record_updated_time()
RedunBackend.record_value()
RedunBackend.rollback_handle()
RedunBackend.set_eval_cache()
RedunBackend.update_tags()
TagEntity
- redun.backends.value_store module
- Module contents
- redun.console package
- Submodules
- redun.console.app module
- redun.console.parser module
- redun.console.screens module
ExecutionScreen
ExecutionScreen.BINDINGS
ExecutionScreen.action_back()
ExecutionScreen.action_focus()
ExecutionScreen.action_next()
ExecutionScreen.action_prev()
ExecutionScreen.action_refresh()
ExecutionScreen.action_repl()
ExecutionScreen.action_unfocus()
ExecutionScreen.can_focus
ExecutionScreen.can_focus_children
ExecutionScreen.compose()
ExecutionScreen.get_parser()
ExecutionScreen.get_path()
ExecutionScreen.jobs
ExecutionScreen.load_jobs()
ExecutionScreen.on_job_list_selected()
ExecutionScreen.on_table_selected()
ExecutionScreen.path_pattern
ExecutionScreen.watch_args()
ExecutionsNamespaceScreen
ExecutionsNamespaceScreen.BINDINGS
ExecutionsNamespaceScreen.action_repl()
ExecutionsNamespaceScreen.action_sort()
ExecutionsNamespaceScreen.can_focus
ExecutionsNamespaceScreen.can_focus_children
ExecutionsNamespaceScreen.compose()
ExecutionsNamespaceScreen.get_parser()
ExecutionsNamespaceScreen.get_path()
ExecutionsNamespaceScreen.load_namespaces()
ExecutionsNamespaceScreen.on_mount()
ExecutionsNamespaceScreen.on_table_selected()
ExecutionsNamespaceScreen.path_pattern
ExecutionsNamespaceScreen.watch_args()
ExecutionsScreen
ExecutionsScreen.BINDINGS
ExecutionsScreen.action_back()
ExecutionsScreen.action_next()
ExecutionsScreen.action_prev()
ExecutionsScreen.action_refresh()
ExecutionsScreen.action_repl()
ExecutionsScreen.can_focus
ExecutionsScreen.can_focus_children
ExecutionsScreen.compose()
ExecutionsScreen.get_parser()
ExecutionsScreen.get_path()
ExecutionsScreen.load_executions()
ExecutionsScreen.notify_args()
ExecutionsScreen.on_execution_list_selected()
ExecutionsScreen.on_screen_resume()
ExecutionsScreen.path_pattern
ExecutionsScreen.watch_args()
FilesScreen
FilesScreen.BINDINGS
FilesScreen.action_next()
FilesScreen.action_prev()
FilesScreen.action_repl()
FilesScreen.can_focus
FilesScreen.can_focus_children
FilesScreen.compose()
FilesScreen.get_parser()
FilesScreen.get_path()
FilesScreen.load_files()
FilesScreen.on_table_selected()
FilesScreen.path_pattern
FilesScreen.watch_args()
FilterScreen
HelpScreen
JobScreen
JobScreen.BINDINGS
JobScreen.action_children()
JobScreen.action_click_exec()
JobScreen.action_click_parent_job()
JobScreen.action_click_task()
JobScreen.action_click_value()
JobScreen.action_parent()
JobScreen.action_repl()
JobScreen.can_focus
JobScreen.can_focus_children
JobScreen.compose()
JobScreen.get_path()
JobScreen.path_pattern
RedunScreen
RedunScreen.action_back()
RedunScreen.action_filter()
RedunScreen.can_focus
RedunScreen.can_focus_children
RedunScreen.compose()
RedunScreen.get_options()
RedunScreen.get_parser()
RedunScreen.get_path()
RedunScreen.notify_args()
RedunScreen.on_mount()
RedunScreen.parse()
RedunScreen.path_pattern
RedunScreen.watch_args()
ReplScreen
ReplScreen.BINDINGS
ReplScreen.action_clear()
ReplScreen.can_focus
ReplScreen.can_focus_children
ReplScreen.compose()
ReplScreen.get_path()
ReplScreen.on_command_input_complete()
ReplScreen.on_input_submitted()
ReplScreen.on_mount()
ReplScreen.on_write()
ReplScreen.path_pattern
ReplScreen.update()
ReplScreen.write_intro()
SearchScreen
SearchScreen.BINDINGS
SearchScreen.action_next()
SearchScreen.action_prev()
SearchScreen.action_repl()
SearchScreen.can_focus
SearchScreen.can_focus_children
SearchScreen.compose()
SearchScreen.get_parser()
SearchScreen.get_path()
SearchScreen.load_results()
SearchScreen.on_input_submitted()
SearchScreen.on_table_selected()
SearchScreen.page
SearchScreen.path_pattern
SearchScreen.results
SearchScreen.watch_args()
SearchScreen.watch_page()
SearchScreen.watch_results()
TaskScreen
TaskVersionsScreen
TaskVersionsScreen.BINDINGS
TaskVersionsScreen.action_repl()
TaskVersionsScreen.can_focus
TaskVersionsScreen.can_focus_children
TaskVersionsScreen.compose()
TaskVersionsScreen.get_path()
TaskVersionsScreen.load_tasks()
TaskVersionsScreen.on_table_selected()
TaskVersionsScreen.path_pattern
TaskVersionsScreen.watch_args()
TasksScreen
ValueScreen
ValueScreen.BINDINGS
ValueScreen.action_click_value()
ValueScreen.action_downstream_jobs()
ValueScreen.action_repl()
ValueScreen.action_upstream()
ValueScreen.action_upstream_jobs()
ValueScreen.can_focus
ValueScreen.can_focus_children
ValueScreen.compose()
ValueScreen.get_path()
ValueScreen.path_pattern
tree_sort_jobs()
- redun.console.upstream_screen module
UpstreamDataflowScreen
UpstreamDataflowScreen.BINDINGS
UpstreamDataflowScreen.action_click_hash()
UpstreamDataflowScreen.action_click_value()
UpstreamDataflowScreen.action_repl()
UpstreamDataflowScreen.can_focus
UpstreamDataflowScreen.can_focus_children
UpstreamDataflowScreen.compose()
UpstreamDataflowScreen.get_path()
UpstreamDataflowScreen.path_pattern
display_call_node()
display_dataflow()
display_hash()
display_link()
display_node()
display_section()
display_value()
- redun.console.utils module
- redun.console.widgets module
- Module contents
- redun.executors package
- Submodules
- redun.executors.alias module
- redun.executors.aws_batch module
AWSBatchExecutor
AWSBatchJobTimeoutError
aws_describe_jobs()
batch_submit()
create_job_override_command()
ecr_push()
equiv_job_def()
format_log_stream_event()
get_batch_job_name()
get_batch_job_options()
get_default_registry()
get_hash_from_job_name()
get_job_def_revision()
get_job_definition()
get_job_details()
get_job_log_stream()
get_or_create_job_definition()
get_or_create_repo()
is_array_job_name()
iter_batch_job_log_lines()
iter_batch_job_logs()
iter_batch_job_status()
make_job_def_name()
parse_job_error()
parse_job_logs()
parse_nullable_json()
submit_command()
submit_task()
- redun.executors.aws_glue module
AWSGlueError
AWSGlueExecutor
AWSGlueJobStoppedError
AWSGlueJobTimeoutError
get_default_glue_service_role()
get_glue_oneshot_scratch_file()
get_job_insight_traceback()
get_or_create_glue_job_definition()
get_redun_lib_files()
get_redun_lib_scratch_file()
get_spark_history_dir()
glue_describe_jobs()
package_redun_lib()
parse_job_error()
submit_glue_job()
- redun.executors.aws_utils module
- redun.executors.base module
- redun.executors.code_packaging module
- redun.executors.command module
- redun.executors.docker module
- redun.executors.gcp_batch module
- redun.executors.gcp_utils module
- redun.executors.k8s module
- redun.executors.k8s_utils module
- redun.executors.launch module
- redun.executors.local module
- redun.executors.scratch module
- Module contents
Submodules¶
redun.azure_utils module¶
Helper functions for Azure integration.
- redun.azure_utils.get_az_credential()¶
Retrieve a credential object and validate that it has permissions for storage operations (this does not validate permissions for a specific account, only that a token can be obtained from storage.azure.com).
- The order of getting credentials is:
DefaultAzureCredential. By default, it uses AZURE_CLIENT_ID to look up managed identity info.
AzureMLOnBehalfOfCredential which is available only in AML compute (the presence of AZUREML_WORKSPACE_ID env variable is used to determine if we run in AML compute).
ManagedIdentityCredential if DEFAULT_IDENTITY_CLIENT_ID env variable is present. This is the last option in case we run a task in a compute cluster with default assigned identity and we want to override it.
- redun.azure_utils.is_azure_managed_node() bool ¶
Checks whether the current compute is backed by an Azure VM. Returns False for local devices or VMs hosted on other clouds (e.g. EC2). https://learn.microsoft.com/en-us/azure/virtual-machines/instance-metadata-service?tabs=linux
redun.bcoding module¶
Main change made here is to explicitly safeguard against encoding bools as they derive from int. Formatting and linting changes were also applied.
Original License:
Copyright (c) 2013 Phil Schaf
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the “Software”), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
bencode/decode library.
bencoding is used in bittorrent files
use the exposed functions to encode/decode them.
- redun.bcoding.assert_btype(byte, typ)¶
- redun.bcoding.bdecode(f_or_data)¶
bdecodes data by looking up the type byte, and using it to look up the respective decoding function, which in turn is used to return the decoded object
The parameter can be a file opened in bytes mode, bytes or a string (the last of which will be decoded)
- redun.bcoding.bencode(data, f=None)¶
Writes a serializable data piece to f. The order of tests is nonarbitrary, as strings and mappings are iterable.
If f is None, it writes to a byte buffer and returns a bytestring
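A minimal sketch of the encoding side, assuming the standard bencoding rules (integers as i<n>e, byte strings as <len>:<bytes>, lists as l...e, dictionaries as d...e with sorted keys). bencode_sketch is a hypothetical name, not the redun.bcoding implementation; it mainly shows why the bool check has to come before the int check.

```python
def bencode_sketch(data) -> bytes:
    """Encode ints, str/bytes, lists and dicts following bencoding rules (illustrative only)."""
    # bool is a subclass of int, so it must be rejected before the int branch.
    if isinstance(data, bool):
        raise TypeError("bool values are not bencodable")
    if isinstance(data, int):
        return b"i%de" % data
    # Strings are tested before generic iterables because they are iterable too.
    if isinstance(data, str):
        data = data.encode("utf-8")
    if isinstance(data, bytes):
        return b"%d:%s" % (len(data), data)
    if isinstance(data, dict):
        # Normalize keys to bytes and emit them in sorted order.
        items = sorted(
            ((k.encode("utf-8") if isinstance(k, str) else k, v) for k, v in data.items()),
            key=lambda item: item[0],
        )
        body = b"".join(bencode_sketch(k) + bencode_sketch(v) for k, v in items)
        return b"d" + body + b"e"
    if isinstance(data, (list, tuple)):
        return b"l" + b"".join(bencode_sketch(item) for item in data) + b"e"
    raise TypeError(f"cannot bencode {type(data).__name__}")
```

Without the explicit bool guard, True would silently be written as the integer 1.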
- redun.bcoding.main(args=None)¶
Decodes bencoded files to python syntax (like JSON, but with bytes support)
redun.cli module¶
- class redun.cli.ArgFormatter(prog, indent_increment=2, max_help_position=24, width=None)¶
Bases:
ArgumentDefaultsHelpFormatter, RawDescriptionHelpFormatter
An argument formatter that shows default values and does not reflow description strings.
- class redun.cli.RedunClient(stdout: ~typing.IO = <_io.TextIOWrapper name='<stdout>' mode='w' encoding='utf-8'>, stderr: ~typing.IO = <_io.TextIOWrapper name='<stderr>' mode='w' encoding='utf-8'>)¶
Bases:
object
Command-line (CLI) client for interacting with redun.
- STATUS_WIDTH = 6¶
- aws_kill_jobs_command(args: Namespace, extra_args: List[str], argv: List[str]) None ¶
Kill AWS Batch jobs.
- aws_list_jobs_command(args: Namespace, extra_args: List[str], argv: List[str]) None ¶
List AWS Batch jobs.
- aws_logs_command(args: Namespace, extra_args: List[str], argv: List[str]) None ¶
Fetch AWS Batch job logs.
- check_version(required_version: str) None ¶
Enforce a required redun version.
- console_command(args: Namespace, extra_args: List[str], argv: List[str]) None ¶
Performs the console command (CLI GUI).
- db_downgrade_command(args: Namespace, extra_args: List[str], argv: List[str]) None ¶
- db_info_command(args: Namespace, extra_args: List[str], argv: List[str]) None ¶
Display information about redun repo database.
- db_migrate_command(direction: str, args: Namespace, extra_args: List[str], argv: List[str]) None ¶
Migrate redun repo database to a new version.
- db_upgrade_command(args: Namespace, extra_args: List[str], argv: List[str]) None ¶
- db_versions_command(args: Namespace, extra_args: List[str], argv: List[str]) None ¶
Display all available redun repo database versions.
- display(*messages: Any, pretty: bool = False, indent: int = 0, newline: bool = True) None ¶
Write text to standard output.
- execute(argv: List[str] | None = None) Any ¶
Execute a command from the command line.
- export_command(args: Namespace, extra_args: List[str], argv: List[str]) None ¶
Export records from redun database.
- fs_command(args: Namespace, extra_args: List[str], argv: List[str]) None ¶
Give information on redun-supported filesystems.
- fs_copy_command(args: Namespace, extra_args: List[str], argv: List[str]) None ¶
Copy files between redun-supported filesystems.
- get_backend(args: Namespace) RedunBackendDb ¶
- get_command_parser() ArgumentParser ¶
Returns the command line parser.
- get_record_ids(prefix_ids: Sequence[str]) List[str] ¶
Expand prefix record ids to full record ids.
- get_session(args: Namespace) Session ¶
- help_command(args: Namespace, extra_args: List[str], argv: List[str]) None ¶
Show help information.
- import_command(args: Namespace, extra_args: List[str], argv: List[str]) None ¶
Import records into redun database.
- infer_file_path(path: str) Base | None ¶
Try to infer if path matches any File.
Returns a query iterating over Files and relevant Jobs.
- infer_id(id: str, include_files: bool = True, required: bool = False) Any ¶
Try to infer the record based on an id prefix.
- init_command(args: Namespace, extra_args: List[str], argv: List[str]) None ¶
Initialize redun project directory.
- launch_command(args: Namespace, extra_args: List[str], argv: List[str]) None ¶
Run a workflow within an Executor (e.g. docker or batch).
- log_call_node(call_node: CallNode, indent: int = 0, show_children: bool = True, show_parents: bool = True, show_result: bool = True, show_task: bool = True, show_dataflow: bool = True, detail: bool = True) None ¶
Display a CallNode.
- log_command(args: Namespace, extra_args: List[str], argv: List[str]) None ¶
Performs the log command.
This is the main command for querying the CallGraph.
- log_dataflow(value: Value, value_name: str = 'value', indent: int = 0)¶
Display the upstream dataflow of a Value.
- log_execution(execution: Execution, show_jobs: bool = True, indent: int = 0, detail: bool = True) None ¶
Display an Execution.
- log_files(query: CallGraphQuery, indent: int = 0) None ¶
Display File-specific query results.
- log_job(job: Job, indent: int = 0, show_children: bool = False, detail: bool = True) None ¶
Display a Job.
- log_record(record: Any, detail: bool = True, indent: int = 0, format: str = 'text') None ¶
Display one record or a list of records.
- log_task(task: Task | Task, show_source: bool = True, show_job: bool = True, indent: int = 0, detail: bool = True) None ¶
Display a Task.
- oneshot_command(args: Namespace, extra_args: List[str], argv: List[str]) Any ¶
Evaluate a single task.
- pull_command(args, extra_args, argv)¶
Pull records into another redun database.
- push_command(args: Namespace, extra_args: List[str], argv: List[str]) None ¶
Push records into a redun repository.
- repl_command(args: Namespace, extra_args: List[str], argv: List[str]) None ¶
Get a read-eval-print-loop for querying the history.
- repo_add_command(args: Namespace, extra_args: List[str], argv: List[str]) None ¶
- repo_list_command(args: Namespace, extra_args: List[str], argv: List[str]) None ¶
- repo_remove_command(args: Namespace, extra_args: List[str], argv: List[str]) None ¶
- resolve_context_updates(context_file: str, context_updates: List[str]) Dict[str, Any] ¶
Create a dict with updates to the context from command line arguments.
- run_command(args: Namespace, extra_args: List[str], argv: List[str]) Any ¶
Performs the run command.
- server_command(args: Namespace, extra_args: List[str], argv: List[str]) None ¶
Run redun local web server UI.
- start_pager() None ¶
Redirect output to a pager if stdout is a tty.
- stop_pager() None ¶
Stop sending output to a pager.
- tag_add_command(args: Namespace, extra_args: List[str], argv: List[str]) None ¶
Add new tags to entities.
- tag_list_command(args: Namespace, extra_args: List[str], argv: List[str]) None ¶
Search tags.
- tag_rm_command(args: Namespace, extra_args: List[str], argv: List[str]) None ¶
Delete tags.
- tag_update_command(args: Namespace, extra_args: List[str], argv: List[str]) None ¶
Update entity tags.
- viz_command(args: Namespace, extra_args: List[str], argv: List[str]) None ¶
Performs the visualization command.
- exception redun.cli.RedunClientError¶
Bases:
Exception
- redun.cli.add_value_arg_parser(parser: ArgumentParser, arg_name: str, anno: Any, default: Any) Action ¶
Add argument parser inferred from a parameter type.
- redun.cli.arg_name2cli_name(arg_name: str) str ¶
Convert a snake_case argument into a --kebab-case argument.
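As a rough illustration of this conversion (snake_to_cli_name is a hypothetical helper, and the real function may handle edge cases differently):

```python
def snake_to_cli_name(arg_name: str) -> str:
    """Turn a Python identifier such as 'max_retries' into '--max-retries'."""
    return "--" + arg_name.replace("_", "-")
```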
- redun.cli.check_version(version: str, version_spec: str) bool ¶
Returns True if version satisfies version specification.
- redun.cli.find_config_dir(cwd: str | None = None) str | None ¶
Search up directories from current working directory to find config dir.
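The upward search can be sketched with pathlib. find_up and the directory name passed to it are assumptions made for illustration; the sketch does not claim to match how redun names or validates its config directory.

```python
from pathlib import Path
from typing import Optional


def find_up(dirname: str, cwd: Optional[str] = None) -> Optional[str]:
    """Return the first directory called `dirname` found in cwd or any of its parents."""
    start = Path(cwd) if cwd is not None else Path.cwd()
    start = start.resolve()
    # Check the starting directory first, then each ancestor up to the root.
    for parent in [start, *start.parents]:
        candidate = parent / dirname
        if candidate.is_dir():
            return str(candidate)
    return None
```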
- redun.cli.format_arguments(args: List[Argument]) str ¶
Display CallNode arguments.
For example, if args has 2 positional and 1 keyword argument, we would display that as:
'prog', 10, extra_file=File(path=prog.c, hash=763bc10f)
- redun.cli.format_id(id: str, detail: bool = False, prefix=8) str ¶
Display a record id.
- redun.cli.format_setup_help(parser: ArgumentParser) Iterator[str] ¶
Yields lines of help text for setup argument parser.
- redun.cli.get_abs_path(path: str) str ¶
Returns absolute path of input string, which can be an s3 path.
- redun.cli.get_anno_origin(anno: Any) Any | None ¶
Returns the origin of an annotation.
Origin is Python’s term for the main type of a generic type. For example, the origin of List[int] is list and its argument is int. This function abstracts away changes that have occurred in the Python API since 3.6.
https://docs.python.org/3/library/typing.html#typing.get_origin
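On Python 3.8 and later, the standard library exposes this directly through typing.get_origin and typing.get_args. The describe helper below is hypothetical and only packages the two calls together:

```python
from typing import Any, Dict, List, Optional, Tuple, get_args, get_origin


def describe(anno: Any) -> Tuple[Optional[Any], tuple]:
    """Return (origin, type arguments) for an annotation; origin is None for plain types."""
    return get_origin(anno), get_args(anno)


list_origin, list_args = describe(List[int])
dict_origin, dict_args = describe(Dict[str, int])
plain_origin, plain_args = describe(int)
```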
- redun.cli.get_config_dir(config_dir: str | None = None) str ¶
Get the redun config dir.
We use the following precedence: (1) command line (config_dir), (2) environment variable, (3) search of the filesystem (parent directories) for a config dir, (4) .redun in the current working directory.
Note that the returned config dir may be relative.
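The precedence described above can be sketched as a chain of fallbacks. Everything named here is an assumption for illustration: resolve_config_dir is hypothetical, the environment variable name EXAMPLE_CONFIG_DIR is a placeholder (the source does not name the real variable), and the search step is passed in as a callable.

```python
from typing import Callable, Mapping, Optional


def resolve_config_dir(
    cli_value: Optional[str],
    environ: Mapping[str, str],
    search: Callable[[], Optional[str]],
    env_var: str = "EXAMPLE_CONFIG_DIR",  # placeholder name, not redun's
    fallback: str = ".redun",
) -> str:
    """Pick a config dir using: CLI value, env variable, filesystem search, default."""
    if cli_value:
        return cli_value
    env_value = environ.get(env_var)
    if env_value:
        return env_value
    found = search()
    if found:
        return found
    # The fallback is relative, which is why the result may be a relative path.
    return fallback
```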
- redun.cli.get_config_path(config_dir: str | None = None) str ¶
- redun.cli.get_default_execution_tags(user: str | None = None, project: str | None = None, doc: str | None = None, exclude_users: List[str] = []) List[Tuple[str, Any]] ¶
Get default tags for the Execution.
- redun.cli.get_gcp_user() str | None ¶
Returns the user account for any active GCP logins, otherwise None.
- redun.cli.get_setup_parser(setup_func: Callable) Tuple[ArgumentParser, Dict[str, str]] ¶
Returns an ArgumentParser for setup arguments.
- redun.cli.get_task_arg_parser(task: Task, include_defaults: bool) Tuple[ArgumentParser, Dict[str, str]] ¶
Returns a CLI parser for a redun Task.
- Parameters:
task (BaseTask) – The task to generate a parser for.
include_defaults (bool) – If true, set defaults for the parser based on the task defaults. If false, do not. This can be useful for determining if a particular argument was actually set by the user.
- redun.cli.get_user_setup_func(config: Config) Callable[[...], Scheduler] ¶
Returns scheduler setup function based on user config.
- redun.cli.get_username() str ¶
Returns the current redun user.
- redun.cli.import_script(filename_or_module: str, add_cwd: bool = True) ModuleType ¶
Import a python script as a module.
- Parameters:
filename_or_module (str) – This argument can be a filepath to a python script (e.g. path/to/script.py) or a dot-delimited module (e.g. lib.workflows.workflow).
add_cwd (bool) – If True, add the current working directory to the python import paths (sys.path).
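A sketch of how such an import could be done with importlib, assuming that a name ending in .py is treated as a file path and anything else as a module name. import_script_sketch is a hypothetical stand-in and the real function may decide differently.

```python
import importlib
import importlib.util
import os
import sys
from types import ModuleType


def import_script_sketch(filename_or_module: str, add_cwd: bool = True) -> ModuleType:
    """Import either a script path (path/to/script.py) or a dotted module name."""
    if add_cwd and os.getcwd() not in sys.path:
        # Make modules next to the working directory importable.
        sys.path.insert(0, os.getcwd())

    if filename_or_module.endswith(".py"):
        module_name = os.path.splitext(os.path.basename(filename_or_module))[0]
        spec = importlib.util.spec_from_file_location(module_name, filename_or_module)
        if spec is None or spec.loader is None:
            raise ImportError(f"cannot load {filename_or_module}")
        module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(module)
        return module

    return importlib.import_module(filename_or_module)
```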
- redun.cli.is_port_in_use(hostname: str, port: int | str) bool ¶
Check if TCP/IP port on hostname is in use
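One common way to perform such a check is to attempt a TCP connection and see whether it succeeds. The sketch below assumes IPv4 and a one-second timeout; port_in_use is a hypothetical name and the library may use a different technique.

```python
import socket


def port_in_use(hostname: str, port: int) -> bool:
    """Return True if something accepts TCP connections on hostname:port."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.settimeout(1.0)
        # connect_ex returns 0 on success instead of raising an exception.
        return sock.connect_ex((hostname, int(port))) == 0
```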
- redun.cli.is_python_filename(name: str) bool ¶
Returns True if string looks like a python filename.
- redun.cli.make_parse_arg_func(arg_anno: Any) Callable[[str], Any] ¶
Returns parser for argument annotation.
- redun.cli.parse_func_path(path: str) Tuple[str, str] ¶
Parses a function path ‘file_or_module::func’.
- Parameters:
path (str) – path should have the format: ‘file_or_module::func’, where file_or_module is a filename or python module and func is a function name.
- Returns:
A tuple of file_or_module and function name.
- Return type:
Tuple[str, str]
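The 'file_or_module::func' format can be split with str.partition. split_func_path is a hypothetical helper, and raising ValueError on malformed input is an assumption rather than documented behavior.

```python
from typing import Tuple


def split_func_path(path: str) -> Tuple[str, str]:
    """Split 'file_or_module::func' into (file_or_module, func)."""
    file_or_module, separator, func = path.partition("::")
    if not separator or not file_or_module or not func:
        raise ValueError(f"expected 'file_or_module::func', got {path!r}")
    return file_or_module, func
```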
- redun.cli.parse_setup_args(setup_func: Callable, setup_args: List[str] | None) Dict[str, Any] ¶
Parse setup arguments into keyword arguments.
- redun.cli.parse_version(version: str) Tuple ¶
Parse a version number into a tuple of ints.
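Parsing into a tuple of ints matters because tuples compare numerically, component by component, while raw strings compare character by character. A minimal sketch, assuming purely numeric dot-separated versions (version_tuple is a hypothetical name):

```python
from typing import Tuple


def version_tuple(version: str) -> Tuple[int, ...]:
    """Convert '0.10.2' into (0, 10, 2) so versions compare numerically."""
    return tuple(int(part) for part in version.split("."))


newer = version_tuple("0.10.2")
older = version_tuple("0.9.9")
```

Here newer > older holds, whereas the string comparison "0.10.2" > "0.9.9" would be False.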
- redun.cli.run_command(argv: List[str]) str | None ¶
Run a command and return its output.
- redun.cli.setup_backend_db(config_dir: str | None = None, repo: str = 'default') RedunBackendDb ¶
Setup RedunBackendDb from config directory.
- redun.cli.setup_config(config_dir: str | None = None, db_uri: str | None = None, repo: str = 'default', initialize=True) Config ¶
Setup config file.
- redun.cli.setup_repo_config(config: Config, repo: str) Config ¶
Uses configuration from another repository specified in local config.
- redun.cli.setup_scheduler(config_dir: str | None = None, setup_args: List[str] | None = None, repo: str = 'default', migrate: bool | None = None, migrate_if_local: bool = False) Scheduler ¶
Setup Scheduler from config directory.
- redun.cli.with_pager(client: RedunClient, args: Namespace) Iterator[None] ¶
Context manager for running a pager (e.g. less) for output.
redun.config module¶
- class redun.config.Config(config_dict: dict | None = None, configdir: str | None = None)¶
Bases:
object
Extends ConfigParser to support nested sections.
- property configdir: str¶
- get(key: str, default: Any | None = None) Any ¶
- get_config_dict(replace_config_dir=None) Dict[str, Dict] ¶
Return a python dict that can be used to reconstruct the Config object
config2 = Config(config_dict=config1.get_config_dict()) should result in identical Configs config1 and config2.
Note: The structure of the returned Dict is essentially a two-level dict corresponding to INI file structure. Top-level key is a dot-separated section name. Top-level value is a single-level dict containing key/values for a single section.
- Parameters:
replace_config_dir – if not None, replaces any variables containing the machine-local config_dir (as obtained by redun.cli.get_config_dir()) with replace_config_dir. Typical values that are replaced are backend::config_dir, db_uri, repos.default::config_dir.
- Returns:
A copy of this object’s config in two-level format, optionally with values containing the local config directory replaced.
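To make the two-level shape concrete, here is a sketch built on the standard configparser module rather than on Config itself. The INI text and its values are illustrative, and to_two_level_dict is a hypothetical helper.

```python
import configparser
from typing import Dict

INI_TEXT = """
[backend]
db_uri = sqlite:///redun.db

[executors.batch]
type = aws_batch
"""


def to_two_level_dict(ini_text: str) -> Dict[str, Dict[str, str]]:
    """Map each dot-separated section name to a flat dict of its key/value pairs."""
    parser = configparser.ConfigParser(interpolation=None)
    parser.read_string(ini_text)
    return {section: dict(parser[section]) for section in parser.sections()}


config_dict = to_two_level_dict(INI_TEXT)
```

The nested section executors.batch stays a single top-level key; nesting is expressed only through the dotted name.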
- items()¶
- keys() Iterable[str] ¶
- read_dict(config_dict: dict) None ¶
- read_path(filename: str) None ¶
- read_string(string: str) None ¶
- class redun.config.RedunConfigParser(*args, config: Config, **kwargs)¶
Bases:
ConfigParser
- optionxform(optionstr)¶
- class redun.config.RedunExtendedInterpolation¶
Bases:
ExtendedInterpolation
When performing variable interpolation fallback to environment variables.
For example, if the environment variable ROLE is defined, we can reference it in the redun.ini file as follows:
- before_get(parser, section, option, value, defaults)¶
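The environment-variable fallback can be approximated with the standard library alone. This is a sketch of the technique, not the class's actual source: EnvFallbackInterpolation, read_job_role, the section name and the job_role option are all hypothetical.

```python
import configparser
import os
from collections import ChainMap


class EnvFallbackInterpolation(configparser.ExtendedInterpolation):
    """Resolve ${NAME} from os.environ when no option of that name exists."""

    def before_get(self, parser, section, option, value, defaults):
        # Options are consulted first; the environment is only a fallback.
        lookup = ChainMap(defaults, os.environ)
        return super().before_get(parser, section, option, value, lookup)


def read_job_role(ini_text: str) -> str:
    parser = configparser.ConfigParser(interpolation=EnvFallbackInterpolation())
    # Keep option names case-sensitive so ${ROLE} is not lowercased to "role".
    parser.optionxform = str
    parser.read_string(ini_text)
    return parser.get("executors.batch", "job_role")


EXAMPLE_INI = """
[executors.batch]
job_role = ${ROLE}-batch
"""
```

With ROLE=analyst in the environment, read_job_role(EXAMPLE_INI) yields the interpolated string. The optionxform override is needed in this sketch because ConfigParser lowercases option names by default.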
- redun.config.create_config_section(config_dict: dict | None = None) SectionProxy ¶
Create a default section.
redun.context module¶
- (scheduler_task)redun.context.get_context(var_path: str, default: Any | None = None) T ¶
Returns a value from the current context.
- Parameters:
var_path (str) – A dot-separated path for a variable to fetch from the current context.
default (Optional[Any]) – A default value to return if the context variable is not defined. None by default.
- redun.context.get_context_value(context: dict, var_path: str, default: T) T ¶
Returns a value from a context dict using a dot-separated path (e.g. ‘foo.bar.baz’).
- Parameters:
context (dict) – A redun context dict.
var_path (str) – A dot-separated path for a variable to fetch from the current context.
default (T) – A default value to return if the context variable is not defined.
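A dot-separated lookup of this kind can be sketched as a walk through nested dicts. lookup_path is a hypothetical helper, and returning the default when an intermediate value is not a dict is an assumption.

```python
from typing import Any


def lookup_path(context: dict, var_path: str, default: Any = None) -> Any:
    """Follow a dot-separated path such as 'foo.bar.baz' through nested dicts."""
    current: Any = context
    for key in var_path.split("."):
        if not isinstance(current, dict) or key not in current:
            return default
        current = current[key]
    return current


example_context = {"foo": {"bar": {"baz": 10}}}
```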
redun.db_utils module¶
- redun.db_utils.filter_in(query: Query, column: Column, values: Iterable, chunk: int = 100) Iterable[Any] ¶
Perform an IN-filter on a sqlalchemy query with an iterable of values.
Returns an iterable of results.
- redun.db_utils.get_or_create(session: Session, Model: ModelType, filter: Dict[str, Any], update: Dict[str, Any] | None = None) Tuple[ModelType, bool] ¶
Get or create a row for a sqlalchemy Model.
- redun.db_utils.query_filter_in(query: Query, column: Column, values: Iterable, chunk: int = 100) Iterable[Query] ¶
Perform an IN-filter on a sqlalchemy query with an iterable of values.
Returns an iterable of queries with IN-filter applied.
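The chunk parameter suggests that the values are split into batches, presumably so that no single IN clause grows without bound. The sketch below shows that idea with the standard sqlite3 module instead of sqlalchemy; iter_chunks, select_in_chunks and the item table are hypothetical.

```python
import sqlite3
from itertools import islice
from typing import Iterable, Iterator, List, Tuple


def iter_chunks(values: Iterable, chunk: int = 100) -> Iterator[List]:
    """Yield successive lists of at most `chunk` values."""
    iterator = iter(values)
    while True:
        batch = list(islice(iterator, chunk))
        if not batch:
            return
        yield batch


def select_in_chunks(
    conn: sqlite3.Connection, ids: Iterable[int], chunk: int = 100
) -> Iterator[Tuple]:
    """Run one IN-query per batch of ids and stream the combined rows."""
    for batch in iter_chunks(ids, chunk):
        placeholders = ",".join("?" * len(batch))
        query = f"SELECT id, name FROM item WHERE id IN ({placeholders})"
        yield from conn.execute(query, batch)
```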
redun.expression module¶
- class redun.expression.ApplyExpression(args: Tuple, kwargs: dict)¶
Bases:
Expression[Result]
Lazy expression for applying a function or task to arguments.
- type_name: str | None = 'redun.expression.ApplyExpression'¶
- class redun.expression.Expression¶
Bases:
Value, Generic[Result]
Base class for lazy expressions.
Lazy expressions are used to defer compute until the redun scheduler is ready to execute it.
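The idea of deferring compute can be illustrated with a toy class that records a call and runs it only on request. LazyCall is hypothetical and far simpler than the Expression classes documented here; it involves no scheduler, caching, or hashing.

```python
from typing import Any, Callable


class LazyCall:
    """Record a function call now and run it only when evaluate() is called."""

    def __init__(self, func: Callable, *args: Any, **kwargs: Any) -> None:
        self.func = func
        self.args = args
        self.kwargs = kwargs

    def evaluate(self) -> Any:
        # Nested lazy arguments are evaluated first, then the call itself.
        args = [a.evaluate() if isinstance(a, LazyCall) else a for a in self.args]
        kwargs = {
            k: v.evaluate() if isinstance(v, LazyCall) else v for k, v in self.kwargs.items()
        }
        return self.func(*args, **kwargs)


calls = []


def add(a: int, b: int) -> int:
    calls.append((a, b))
    return a + b


expr = LazyCall(add, LazyCall(add, 1, 2), b=10)
```

Building expr runs nothing; the two additions happen only when expr.evaluate() is called.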
- get_hash(data: bytes | None = None) str ¶
Returns a hash for the value. Users may override to provide custom behavior.
Most basic objects are covered by the default approach of hashing the binary pickle serialization.
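Hashing the pickle serialization can be sketched as follows. The choice of SHA-1 and the default pickle protocol are assumptions made for this example; the source does not state which digest or protocol is used, and pickle_hash is a hypothetical name.

```python
import hashlib
import pickle
from typing import Any


def pickle_hash(value: Any) -> str:
    """Hash a value by digesting its binary pickle serialization."""
    return hashlib.sha1(pickle.dumps(value)).hexdigest()


first = pickle_hash((1, "a"))
second = pickle_hash((1, "a"))
different = pickle_hash((2, "a"))
```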
- type_name: str | None = 'redun.expression.Expression'¶
- class redun.expression.QuotedExpression(expr: Result)¶
Bases:
Generic[Result]
A quoted expression that will not evaluate until forced.
- eval() Result ¶
Evaluate the quoted expression.
- class redun.expression.SchedulerExpression(task_name: str, args: Tuple, kwargs: dict, task_options: dict | None = None, length: int | None = None)¶
Bases:
TaskExpression[Result]
Lazy expression that is evaluated within the scheduler for redun-specific operations.
- type_name: str | None = 'redun.expression.SchedulerExpression'¶
- class redun.expression.SimpleExpression(func_name: str, args: Tuple = (), kwargs: dict = {})¶
Bases:
ApplyExpression[Result]
Lazy expression for a simple computation (e.g. getattr, getitem, call).
- type_name: str | None = 'redun.expression.SimpleExpression'¶
- class redun.expression.TaskExpression(task_name: str, args: Tuple, kwargs: dict, task_options: dict | None = None, length: int | None = None)¶
Bases:
ApplyExpression[Result]
Lazy expression for applying a task to arguments.
- is_valid() bool ¶
Returns True if the value may be used. For ordinary values, this will typically always be true. For state-tracking values, the deserialized cache value may no longer be valid. If this method returns False, the object will typically be discarded.
This method may be called repeatedly on the same object.
For example, if the value was a File reference and the corresponding file on the filesystem has since been removed/altered, the File reference is no longer valid.
- type_name: str | None = 'redun.expression.TaskExpression'¶
- class redun.expression.ValueExpression(value: Result)¶
Bases:
Expression[Result]
Lifts a concrete value into an Expression type.
- type_name: str | None = 'redun.expression.ValueExpression'¶
- redun.expression.derive_expression(orig_expr: Expression, derived_value: Expression | Any) Expression ¶
Record derived_value as downstream of orig_expr.
- redun.expression.format_arguments(args, kwargs) str ¶
Format Task arguments.
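One plausible rendering puts positional arguments first, followed by key=value pairs. The sketch assumes repr() is used for each value, which may not match the library's output; format_call_arguments is a hypothetical name.

```python
from typing import Any, Mapping, Sequence


def format_call_arguments(args: Sequence[Any], kwargs: Mapping[str, Any]) -> str:
    """Render positional then keyword arguments as a comma-separated string."""
    parts = [repr(arg) for arg in args]
    parts.extend(f"{key}={value!r}" for key, value in kwargs.items())
    return ", ".join(parts)


formatted = format_call_arguments(("prog", 10), {"extra_file": "prog.c"})
```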
- redun.expression.get_lazy_operation(name: str) Callable | None ¶
Retrieves a lazy operation by its registered name.
- redun.expression.lazy_operation(method: str | None = None, name: str | None = None) Callable[[Callable], Callable] ¶
Function decorator to declare lazy operations on Expression object.
- Parameters:
method – Name of method registered to Expression class. Defaults to None.
name – Registered name of operation by which it will be retrieved. Defaults to None.
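The registration half of this pattern can be sketched as a decorator that stores functions in a dict keyed by name, paired with a lookup function. register_operation, get_operation and the registry are hypothetical, and the sketch leaves out the method parameter that attaches an operation to the Expression class.

```python
from typing import Callable, Dict, Optional

_OPERATIONS: Dict[str, Callable] = {}


def register_operation(name: Optional[str] = None) -> Callable[[Callable], Callable]:
    """Decorator that registers a function under `name` (default: its own name)."""

    def decorator(func: Callable) -> Callable:
        _OPERATIONS[name or func.__name__] = func
        return func

    return decorator


def get_operation(name: str) -> Optional[Callable]:
    """Return the registered function, or None if the name is unknown."""
    return _OPERATIONS.get(name)


@register_operation(name="add")
def _add(left: int, right: int) -> int:
    return left + right
```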
- redun.expression.quote(expr: Result) QuotedExpression[Result] ¶
Quote an Expression so that it does not evaluate.
redun.federated_tasks module¶
- exception redun.federated_tasks.InvocationException¶
Bases:
Exception
Custom exception that is raised when there is an error encountered invoking an AWS lambda.
- (scheduler_task)redun.federated_tasks.federated_task(entrypoint: str, *task_args, **task_kwargs) T ¶
Execute a task that has been indirectly specified in the scheduler config file by providing a federated_task and its executor. This allows us to invoke code we do not have locally.
Since the code isn’t visible, the cache_scope for the subrun is always set to CSE.
- Parameters:
entrypoint (str) – The name of the federated_task section in the config that identifies the task to perform.
task_args (Optional[List[Any]]) – Positional arguments for the task
task_kwargs (Any) – Keyword arguments for the task
- (scheduler_task)redun.federated_tasks.lambda_federated_task()¶
Submit a task to an AWS Lambda Function, by packaging the inputs and sending as the payload with instructions.
This allows a workflow to trigger execution of a remote workflow, to be executed by another service.
This is fire-and-forget: you do not get back the results. We do not implement a way to wait for results, because it’s not currently possible to monitor the backend database and gracefully determine whether an execution has finished. Specifically, executions look “errored” until they succeed, so we would have to rely on timeouts to determine if an error actually happened. Or, we would need to create another side-channel to record progress, which we did not want to undertake.
- Parameters:
config_name (str) – The name of the config to use; the lambda defines how this name is chosen, not redun.
entrypoint (str) – The name of the entrypoint into the above config.
url (str) – The name of the AWS Lambda function to invoke
scratch_prefix (str) – The path where the packaged inputs can be written. This is dictated by the lambda, hence is a user input.
dryrun (bool) – If false, actually invoke the lambda. If true, skip it.
Other args are intended for the task itself.
- Returns:
str – The execution ID we asked the proxy lambda to use
Dict – The data package.
- redun.federated_tasks.launch_federated_task(federated_config_path: str, entrypoint: str, task_args: Tuple | None = None, task_kwargs: Dict | None = None, input_path: str | None = None, execution_id: str | None = None) str ¶
Launch the described federated task. Among other purposes, this is designed to make it easy to implement a REST server that can process the messages from rest_federated_task.
As with the launch CLI verb, we briefly start a scheduler to help with sending off the work, but immediately shut it down.
Inputs may be provided in explicit form via task_args and task_kwargs, or pre-packaged, but not both.
- Parameters:
federated_config_path (str) – The path to the federated config file to use.
entrypoint (str) – The name of the entrypoint into the above config.
task_args (Optional[Tuple]) – Arguments to package and pass to the task.
task_kwargs (Optional[Dict]) – Arguments to package and pass to the task.
input_path (Optional[str]) – The path to already-packaged inputs, which must be suitable for use with the redun launch flag --input. This allows a server to forward inputs created by another application. This file is not opened, because we don’t assume we have the code to unpickle the objects inside.
execution_id (Optional[str]) – If provided, use this execution id. Otherwise, one will be generated for you.
- Returns:
The execution id we asked the redun invocation to use.
- Return type:
str
- (scheduler_task)redun.federated_tasks.rest_federated_task()¶
Submit a task to a server over REST, by packaging the inputs and POSTing a data blob with instructions.
This allows a workflow to trigger execution of a remote workflow, to be executed by another service.
This is fire-and-forget: you do not get back the results. We do not implement a way to wait for results, because it’s not currently possible to monitor the backend database and gracefully determine whether an execution has finished. Specifically, executions look “errored” until they succeed, so we would have to rely on timeouts to determine if an error actually happened. Alternatively, we would need to create another side-channel to record progress, which we did not want to undertake.
- Parameters:
config_name (str) – The name of the config to use; the server defines how this name is chosen, not redun.
entrypoint (str) – The name of the entrypoint into the above config.
url (str) – The URL to POST the result to.
scratch_prefix (str) – The path where the packaged inputs can be written. This is dictated by the server, hence is a user input.
dryrun (bool) – If false, actually perform the POST. If true, skip it.
(Other args are intended for the task itself.)
- Returns:
str – The execution ID we asked the proxy to use
Dict – The data package.
redun.file module¶
- class redun.file.AzureBlobFileSystem¶
Bases:
FsspecFileSystem
- property az_credential¶
- copy(src_path: str, dest_path: str) None ¶
Copy a file from src_path to dest_path.
- exists(path: str) bool ¶
Returns True if path exists on filesystem.
- filesize(path: str) int ¶
Returns file size of path in bytes.
- static get_account_name_from_path(path: str) str ¶
- get_fs_for_path(path: str)¶
- get_hash(path: str) str ¶
Return a hash for the file at path.
- glob(pattern: str) List[str] ¶
Returns filenames matching pattern.
- isdir(path: str) bool ¶
Returns True if path is a directory.
- isfile(path: str) bool ¶
Returns True if path is a file.
- mkdir(path: str) None ¶
Creates the directory in the filesystem.
- name: str = 'az'¶
- remove(path: str) None ¶
Delete a path from the filesystem.
- rmdir(path: str, recursive: bool = False) None ¶
Removes a directory from the filesystem. If recursive, removes all contents of the directory. Otherwise, raises OSError on non-empty directories.
- class redun.file.ContentDir(path: str)¶
Bases:
Dir
- classes = <redun.file.ContentFileClasses object>¶
- type_basename = 'ContentDir'¶
- type_name: str | None = 'redun.ContentDir'¶
- class redun.file.ContentFile(path: str)¶
Bases:
File
Content-based file hashing.
- classes = <redun.file.ContentFileClasses object>¶
- type_basename = 'ContentFile'¶
- type_name: str | None = 'redun.ContentFile'¶
- class redun.file.ContentFileClasses¶
Bases:
FileClasses
A grouping of related ContentFile classes.
- class redun.file.ContentFileSet(pattern: str)¶
Bases:
FileSet
- classes = <redun.file.ContentFileClasses object>¶
- type_basename = 'ContentFileSet'¶
- type_name: str | None = 'redun.ContentFileSet'¶
- class redun.file.ContentStagingDir(local: Dir | str, remote: Dir | str)¶
Bases:
StagingDir
- classes = <redun.file.ContentFileClasses object>¶
- type_basename = 'ContentStagingDir'¶
- type_name: str | None = 'redun.ContentStagingDir'¶
- class redun.file.ContentStagingFile(local: File | str, remote: File | str)¶
Bases:
StagingFile
- classes = <redun.file.ContentFileClasses object>¶
- type_basename = 'ContentStagingFile'¶
- type_name: str | None = 'redun.ContentStagingFile'¶
- class redun.file.Dir(path: str)¶
Bases:
FileSet
- classes = <redun.file.FileClasses object>¶
- exists() bool ¶
- mkdir() None ¶
- rel_path(path: str) str ¶
- rmdir(recursive: bool = False) None ¶
- shell_copy_to(dest_path: str, as_mount: bool = False) str ¶
Returns a shell command for copying the directory to a destination path.
- Parameters:
dest_path (str) – Destination path to copy to.
as_mount (bool) – Treat src/dest as mounted.
- stage(local: str | None = None) StagingDir ¶
- type_basename = 'Dir'¶
- type_name: str | None = 'redun.Dir'¶
- class redun.file.FTPFileSystem¶
Bases:
FsspecFileSystem
FileSystem methods for FTP.
- name: str = 'ftp'¶
- class redun.file.File(path: str)¶
Bases:
Value
Class for assisting file IO in redun tasks.
File objects are hashed based on their contents and abstract over storage backends such as local disk or cloud object storage.
- basename() str ¶
- classes = <redun.file.FileClasses object>¶
- dirname() str ¶
- exists() bool ¶
- get_hash(data: bytes | None = None) str ¶
Returns a hash for the value. Users may override to provide custom behavior.
Most basic objects are covered by the default approach of hashing the binary pickle serialization.
- property hash: str¶
- is_valid() bool ¶
Returns True if the value may be used. For ordinary values, this will typically always be true. For state-tracking values, the deserialized cache value may no longer be valid. If this method returns False, the object will typically be discarded.
This method may be called repeatedly on the same object.
For example, if the value was a File reference and the corresponding file on the filesystem has since been removed/altered, the File reference is no longer valid.
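The validity check described above can be pictured with a toy reference that remembers a hash at creation time and compares it later. This is a sketch under stated assumptions, not redun's code: `FileRef` and `content_hash` are hypothetical names, and SHA-256 over the file bytes stands in for whatever hash the real File records.

```python
import hashlib
import os
import tempfile


def content_hash(path: str) -> str:
    """Hash a file's bytes; a stand-in for the hash a File would record."""
    with open(path, "rb") as infile:
        return hashlib.sha256(infile.read()).hexdigest()


class FileRef:
    """Hypothetical toy reference that remembers the hash seen at creation."""

    def __init__(self, path: str):
        self.path = path
        self.hash = content_hash(path)

    def is_valid(self) -> bool:
        # Invalid if the file was removed or its contents changed.
        return os.path.exists(self.path) and content_hash(self.path) == self.hash


with tempfile.TemporaryDirectory() as tmpdir:
    path = os.path.join(tmpdir, "data.txt")
    with open(path, "w") as out:
        out.write("hello")
    ref = FileRef(path)
    valid_before = ref.is_valid()

    with open(path, "w") as out:
        out.write("changed")
    valid_after_edit = ref.is_valid()

    os.remove(path)
    valid_after_remove = ref.is_valid()
```

A cached result holding such a reference would be discarded once `is_valid()` returns False, which is what forces the producing task to run again.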
- isdir() bool ¶
- isfile() bool ¶
- open(mode: str = 'r', encoding: str | None = None, **kwargs: Any) IO ¶
Open a file stream.
- Parameters:
mode (str) – Stream mode for reading or writing (‘r’, ‘w’, ‘b’, ‘a’).
encoding (str) – Text encoding (e.g. ‘utf-8’) to use when reading or writing.
**kwargs – Additional arguments for the underlying file stream. They are Filesystem-specific.
- read(mode: str = 'r', encoding: str | None = None) str | bytes ¶
- readlines(mode: str = 'r') List[str | bytes] ¶
- remove() None ¶
- shell_copy_to(dest_path: str | None, as_mount: bool = False) str ¶
Returns a shell command for copying the file to a destination path.
- Parameters:
dest_path (Optional[str]) – Destination path to copy to. If None, use stdout.
as_mount (bool) – Copy files from mounted directories.
- size() int ¶
- stage(local: str | None = None) StagingFile ¶
- touch(time: Tuple[int, int] | Tuple[float, float] | None = None) None ¶
- type_basename = 'File'¶
- type_name: str | None = 'redun.File'¶
- update_hash() None ¶
- write(data: str | bytes, mode: str = 'w', encoding: str | None = None) None ¶
- class redun.file.FileClasses¶
Bases:
object
A grouping of related File classes.
- StagingDir: Type[StagingDir]¶
- StagingFile: Type[StagingFile]¶
- class redun.file.FileSet(pattern: str)¶
Bases:
Value
- classes = <redun.file.FileClasses object>¶
- get_hash(data: bytes | None = None) str ¶
Returns a hash for the value. Users may override to provide custom behavior.
Most basic objects are covered by the default approach of hashing the binary pickle serialization.
- property hash: str¶
- is_valid() bool ¶
Returns True if the value may be used. For ordinary values, this will typically always be true. For state-tracking values, the deserialized cache value may no longer be valid. If this method returns False, the object will typically be discarded.
This method may be called repeatedly on the same object.
For example, if the value was a File reference and the corresponding file on the filesystem has since been removed/altered, the File reference is no longer valid.
- type_basename = 'FileSet'¶
- type_name: str | None = 'redun.FileSet'¶
- update_hash() None ¶
- class redun.file.FileSystem¶
Bases:
ABC
Base class for filesystem access.
- copy(src_path: str, dest_path: str) None ¶
Copy a file from src_path to dest_path.
- abstract exists(path: str) bool ¶
Returns True if path exists on filesystem.
- abstract filesize(path: str) int ¶
Returns file size of path in bytes.
- abstract get_hash(path: str) str ¶
Return a hash for the file at path.
- abstract glob(pattern: str) List[str] ¶
Returns filenames matching pattern.
- abstract isdir(path: str) bool ¶
Returns True if path is a directory.
- abstract isfile(path: str) bool ¶
Returns True if path is a file.
- abstract mkdir(path: str) None ¶
Creates the directory in the filesystem.
- name: str = 'base'¶
- open(path: str, mode: str, encoding: str | None = None, **kwargs: Any) IO ¶
Open a file stream from the filesystem.
- Parameters:
path (str) – Url or path of file to open.
mode (str) – Stream mode for reading or writing (‘r’, ‘w’, ‘b’, ‘a’).
encoding (str) – Text encoding (e.g. ‘utf-8’) to use when reading or writing.
**kwargs – Additional arguments for the underlying file stream. They are Filesystem-specific.
- abstract remove(path: str) None ¶
Delete a path from the filesystem.
- abstract rmdir(path: str, recursive: bool = False) None ¶
Removes a directory from the filesystem. If recursive, removes all contents of the directory. Otherwise, raises OSError on non-empty directories.
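The rmdir contract (recursive removal versus an OSError on a non-empty directory) maps directly onto the standard library for a local disk. The following is a minimal sketch of that contract only; `rmdir_sketch` is a hypothetical name and is not how any redun FileSystem subclass is necessarily implemented.

```python
import os
import shutil
import tempfile


def rmdir_sketch(path: str, recursive: bool = False) -> None:
    """Local-disk sketch of the documented rmdir contract."""
    if recursive:
        shutil.rmtree(path)  # Removes the directory and all of its contents.
    else:
        os.rmdir(path)  # Raises OSError if the directory is not empty.


with tempfile.TemporaryDirectory() as tmpdir:
    target = os.path.join(tmpdir, "work")
    os.mkdir(target)
    with open(os.path.join(target, "file.txt"), "w") as out:
        out.write("x")

    try:
        rmdir_sketch(target)
        raised = False
    except OSError:
        raised = True

    rmdir_sketch(target, recursive=True)
    removed = not os.path.exists(target)
```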
- shell_copy(src_path: str | None, dest_path: str | None, recursive: bool = False, as_mount: bool = False) str ¶
Returns a shell command for performing a file copy.
- Parameters:
src_path (Optional[str]) – Source path to copy from. If None, use stdin.
dest_path (Optional[str]) – Destination path to copy to. If None, use stdout.
recursive (bool) – If True, copy a directory tree of files.
as_mount (bool) – Treat src/dest as mounted.
- touch(path: str, time: Tuple[int, int] | Tuple[float, float] | None = None) None ¶
Create the path on the filesystem with timestamp.
- class redun.file.FsspecFileSystem¶
Bases:
FileSystem
- copy(src_path: str, dest_path: str) None ¶
Copy a file from src_path to dest_path.
- exists(path: str) bool ¶
Returns True if path exists on filesystem.
- filesize(path: str) int ¶
Returns file size of path in bytes.
- property fs¶
- get_hash(path: str) str ¶
Return a hash for the file at path.
- glob(pattern: str) List[str] ¶
Returns filenames matching pattern.
- isdir(path: str) bool ¶
Returns True if path is a directory.
- isfile(path: str) bool ¶
Returns True if path is a file.
- mkdir(path: str) None ¶
Creates the directory in the filesystem.
- name: str = 'fsspec'¶
- remove(path: str) None ¶
Delete a path from the filesystem.
- rmdir(path: str, recursive: bool = False) None ¶
Removes a directory from the filesystem. If recursive, removes all contents of the directory. Otherwise, raises OSError on non-empty directories.
- shell_copy(src_path: str | None, dest_path: str | None, recursive: bool = False, as_mount: bool = False) str ¶
Returns a shell command for performing a file copy.
- Parameters:
src_path (Optional[str]) – Source path to copy from. If None, use stdin.
dest_path (Optional[str]) – Destination path to copy to. If None, use stdout.
recursive (bool) – If True, copy a directory tree of files.
as_mount (bool) – Treat src/dest as mounted.
- class redun.file.GSFileSystem¶
Bases:
FsspecFileSystem
FileSystem methods for Google Cloud Storage.
- glob(pattern: str) List[str] ¶
Returns filenames matching pattern.
- name: str = 'gs'¶
- shell_copy(src_path: str | None, dest_path: str | None, recursive: bool = False, as_mount: bool = False) str ¶
Returns a shell command for performing a file copy.
- Parameters:
src_path (Optional[str]) – Source path to copy from. If None, use stdin.
dest_path (Optional[str]) – Destination path to copy to. If None, use stdout.
recursive (bool) – If True, copy a directory tree of files.
as_mount (bool) – Treat src/dest as mounted.
- class redun.file.HTTPFileSystem¶
Bases:
FsspecFileSystem
FileSystem methods for HTTP URLs.
- name: str = 'http'¶
- class redun.file.HTTPSFileSystem¶
Bases:
FsspecFileSystem
FileSystem methods for HTTPS URLs.
- name: str = 'https'¶
- class redun.file.IDir(path: str)¶
Bases:
Dir
- classes = <redun.file.IFileClasses object>¶
- type_basename = 'IDir'¶
- type_name: str | None = 'redun.IDir'¶
- class redun.file.IFile(path: str)¶
Bases:
File
Immutable file.
This class should be used for files that are write once and then immutable.
- classes = <redun.file.IFileClasses object>¶
- is_valid() bool ¶
Returns True if the value may be used. For ordinary values, this will typically always be true. For state-tracking values, the deserialized cache value may no longer be valid. If this method returns False, the object will typically be discarded.
This method may be called repeatedly on the same object.
For example, if the value was a File reference and the corresponding file on the filesystem has since been removed/altered, the File reference is no longer valid.
- type_basename = 'IFile'¶
- type_name: str | None = 'redun.IFile'¶
- class redun.file.IFileClasses¶
Bases:
FileClasses
A grouping of related IFile classes.
- class redun.file.IFileSet(pattern: str)¶
Bases:
FileSet
- classes = <redun.file.IFileClasses object>¶
- is_valid() bool ¶
Returns True if the value may be used. For ordinary values, this will typically always be true. For state-tracking values, the deserialized cache value may no longer be valid. If this method returns False, the object will typically be discarded.
This method may be called repeatedly on the same object.
For example, if the value was a File reference and the corresponding file on the filesystem has since been removed/altered, the File reference is no longer valid.
- type_basename = 'IFileSet'¶
- type_name: str | None = 'redun.IFileSet'¶
- class redun.file.IStagingDir(local: Dir | str, remote: Dir | str)¶
Bases:
StagingDir
- classes = <redun.file.IFileClasses object>¶
- type_basename = 'IStagingDir'¶
- type_name: str | None = 'redun.IStagingDir'¶
- class redun.file.IStagingFile(local: File | str, remote: File | str)¶
Bases:
StagingFile
- classes = <redun.file.IFileClasses object>¶
- type_basename = 'IStagingFile'¶
- type_name: str | None = 'redun.IStagingFile'¶
- class redun.file.LocalFileSystem¶
Bases:
FileSystem
FileSystem methods for a local POSIX filesystem.
- copy(src_path: str, dest_path: str) None ¶
Copy a file from src_path to dest_path.
- exists(path: str) bool ¶
Returns True if path exists on filesystem.
- filesize(path: str) int ¶
Returns file size of path in bytes.
- get_hash(path: str) str ¶
Return a hash for the file at path.
- glob(pattern: str) List[str] ¶
Returns filenames matching pattern.
- isdir(path: str) bool ¶
Returns True if path is a directory.
- isfile(path: str) bool ¶
Returns True if path is a file.
- mkdir(path: str) None ¶
Creates the directory in the filesystem.
- name: str = 'local'¶
- remove(path: str) None ¶
Delete a path from the filesystem.
- rmdir(path: str, recursive: bool = False) None ¶
Removes a directory from the filesystem. If recursive, removes all contents of the directory. Otherwise, raises OSError on non-empty directories.
- shell_copy(src_path: str | None, dest_path: str | None, recursive: bool = False, as_mount: bool = False) str ¶
Returns a shell command for performing a file copy.
- Parameters:
src_path (Optional[str]) – Source path to copy from. If None, use stdin.
dest_path (Optional[str]) – Destination path to copy to. If None, use stdout.
recursive (bool) – If True, copy a directory tree of files.
as_mount (bool) – Treat src/dest as mounted.
- touch(path: str, time: Tuple[int, int] | Tuple[float, float] | None = None) None ¶
Create the path on the filesystem with timestamp.
- exception redun.file.RedunFileNotFoundError(path: str, *args)¶
Bases:
FileNotFoundError
Redun-specific FileNotFoundError.
- exception redun.file.RedunOSError(path: str, *args)¶
Bases:
OSError
Redun-specific OSError.
- exception redun.file.RedunPermissionError(path: str, *args)¶
Bases:
PermissionError
Redun-specific PermissionError.
- class redun.file.S3FileSystem¶
Bases:
FileSystem
FileSystem methods for AWS S3.
- copy(src_path: str, dest_path: str) None ¶
Copy a file from src_path to dest_path.
- exists(path: str) bool ¶
Returns True if path exists in filesystem.
- filesize(path: str) int ¶
Returns file size of path in bytes.
- static get_bucket_and_key(path: str) Tuple[str, str] ¶
Returns the bucket and key from an S3 path, useful for boto calls.
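Splitting an S3 path into bucket and key can be sketched with the standard URL parser. This is an illustration of the idea, not the library's code: `bucket_and_key` is a hypothetical name, and raising ValueError for non-S3 paths is an assumption made here.

```python
from typing import Tuple
from urllib.parse import urlparse


def bucket_and_key(path: str) -> Tuple[str, str]:
    """Split 's3://bucket/some/key' into ('bucket', 'some/key')."""
    parts = urlparse(path)
    if parts.scheme != "s3":
        # Assumption for this sketch: reject anything that is not an S3 URL.
        raise ValueError(f"Not an S3 path: {path}")
    # The bucket is the network location; the key is the path without its leading slash.
    return parts.netloc, parts.path.lstrip("/")
```

The resulting pair is in the form that boto-style calls usually take as separate bucket and key arguments.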
- get_hash(path: str) str ¶
Return a hash for the file at path.
- glob(pattern: str) List[str] ¶
Returns filenames matching pattern.
- isdir(path: str) bool ¶
Returns True if path is a directory.
- isfile(path: str) bool ¶
Returns True if path is a file.
- mkdir(path: str) None ¶
Creates the directory in the filesystem.
- name: str = 's3'¶
- remove(path: str) None ¶
Delete a path from the filesystem.
- rmdir(path: str, recursive: bool = False) None ¶
Removes a directory from the filesystem. If recursive, removes all contents of the directory. Otherwise, raises OSError on non-empty directories.
- property s3: S3FileSystem¶
- property s3_raw: Any¶
- shell_copy(src_path: str | None, dest_path: str | None, recursive: bool = False, as_mount: bool = False) str ¶
Returns a shell command for performing a file copy.
- Parameters:
src_path (Optional[str]) – Source path to copy from. If None, use stdin.
dest_path (Optional[str]) – Destination path to copy to. If None, use stdout.
recursive (bool) – If True, copy a directory tree of files.
as_mount (bool) – Treat src/dest as mounted.
- class redun.file.ShardedS3Dataset(path: str, format: str = 'parquet', recurse: bool = True)¶
Bases:
Value
A sharded dataset on S3. “Sharded” means a collection of files that when concatenated comprise the complete dataset. Several formats are supported but parquet is the best tested with redun to date due to the quality of its integration with AWS services and because it allows reading of only portions of the dataset.
The hash of the ShardedS3Dataset is just the hash of the sorted list of files in the dataset. So changing the files included (such as with recurse=True), adding or removing files, or doing some kind of dataset write operation (which creates new shards) causes the hash of the dataset to change. However, this does not detect individual shards being altered by other code.
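A hash over the sorted file list behaves as follows. This is a minimal sketch of the idea rather than the class's actual hashing code: `dataset_hash`, the use of SHA-256, and the example shard names are all assumptions.

```python
import hashlib
from typing import Iterable


def dataset_hash(filenames: Iterable[str]) -> str:
    """Hash the sorted file list only; file contents are never read."""
    digest = hashlib.sha256()
    for name in sorted(filenames):
        digest.update(name.encode("utf-8"))
        digest.update(b"\0")  # Separator so ["ab", "c"] differs from ["a", "bc"].
    return digest.hexdigest()


shards = ["s3://bucket/data/part-1.parquet", "s3://bucket/data/part-0.parquet"]
same_files_reordered = list(reversed(shards))
with_new_shard = shards + ["s3://bucket/data/part-2.parquet"]
```

Listing order does not matter and adding a shard changes the hash, but rewriting the bytes of an existing shard in place leaves the hash unchanged, because only names are hashed.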
- property filenames: List[str]¶
- property format: str¶
- classmethod from_data(dataset: pandas.DataFrame | pyspark.sql.DataFrame, output_path: str, format: str = 'parquet', partition_keys: List[str] = [], catalog_database: str = 'default', catalog_table: str | None = None, format_options: Dict[str, Any] = {}) ShardedS3Dataset ¶
Helper function to create a ShardedS3Dataset from an existing DataFrame-like object.
- Parameters:
dataset (Union[pandas.DataFrame, pyspark.sql.DataFrame]) – Dataset to save
output_path (str) – Path on S3 to which data will be saved as multiple files of format format.
format (str) – Format to save the data in. Supported formats are: [“avro”, “csv”, “ion”, “grokLog”, “json”, “orc”, “parquet”, “xml”] Defaults to parquet.
partition_keys (List[str]) – Dataset keys to partition on. Each key will be a subdirectory in self.path containing data for each value of that key. For example, partitioning on the column ‘K’ will make subdirectories ‘K=1’, ‘K=2’, ‘K=3’, etc.
catalog_database (str) – Datacatalog name to write to, if creating a table in the Data Catalog. Defaults to ‘default’
catalog_table (Optional[str]) – If present, written data will be available in AWS Data Catalog / Glue / Athena with the indicated table name.
format_options (Dict[str, Any]) – Additional options for the data loader. Documented here: https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-etl-format.html
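The ‘K=1’, ‘K=2’ subdirectory layout produced by partition_keys can be sketched as a simple path computation. The helper below is hypothetical and only illustrates the naming scheme; the actual writing is done by Spark/Glue, not by code like this.

```python
from typing import Any, Dict, List


def partition_dir(base_path: str, row: Dict[str, Any], partition_keys: List[str]) -> str:
    """Return the 'key=value' subdirectory a row would fall under."""
    parts = [f"{key}={row[key]}" for key in partition_keys]
    return "/".join([base_path.rstrip("/")] + parts)


row = {"K": 2, "sample": "a", "value": 1.5}
one_level = partition_dir("s3://bucket/data", row, ["K"])
two_levels = partition_dir("s3://bucket/data", row, ["K", "sample"])
```

Each additional partition key adds one more directory level, in the order the keys are listed.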
- classmethod from_files(files: List[File], format: str | None = None, allow_additional_files: bool = False) ShardedS3Dataset ¶
Helper function to create a ShardedS3Dataset from a list of redun Files. This can be helpful for provenance tracking from methods that return multiple files and the “root directory” is wanted for later use.
If a dataset cannot be created that contains all the files, for example because they are in different S3 prefixes, a ValueError is raised. If the files do not all have the same format, a ValueError is also raised.
- Parameters:
files (List[File]) – Files that should be included in the dataset.
format (Optional[str]) – If specified, files are considered to be in this format.
allow_additional_files (bool) – If True, allows the resulting ShardedS3Dataset to contain files not in files. If False, a ValueError will be raised if a ShardedS3Dataset cannot be constructed.
- Raises:
ValueError – If a dataset could not be created from the given files.
- get_hash(data: bytes | None = None) str ¶
Returns a hash for the value. Users may override to provide custom behavior.
Most basic objects are covered by the default approach of hashing the binary pickle serialization.
- property hash: str¶
- is_valid() bool ¶
Returns True if the value may be used. For ordinary values, this will typically always be true. For state-tracking values, the deserialized cache value may no longer be valid. If this method returns False, the object will typically be discarded.
This method may be called repeatedly on the same object.
For example, if the value was a File reference and the corresponding file on the filesystem has since been removed/altered, the File reference is no longer valid.
- load_pandas(max_shards: int = -1) pandas.DataFrame ¶
Loads the ShardedS3Dataset as a Pandas DataFrame.
- Parameters:
max_shards (int) – Maximum number of shards to load. If -1, will load all of them.
- Returns:
All
- Return type:
pandas.DataFrame
- load_pandas_shards(max_shards: int = -1) List[pandas.DataFrame] ¶
Loads the ShardedS3Dataset as a list of Pandas DataFrames. This is deterministic and will load the shards in the same order every time.
- Parameters:
max_shards (int) – Maximum number of shards to load. If -1 (default), will load all shards.
- Returns:
Loaded shards, one per entry in list. Shards
- Return type:
List[pandas.DataFrame]
- load_spark(validate: bool = False, format_options: Dict[str, Any] = {}) pyspark.sql.DataFrame ¶
Loads the ShardedS3Dataset as a Spark DataFrame. Must be running in a Spark context.
- Parameters:
validate (bool) – If True, will check that dataset has at least 1 row. Requires a count operation that can take a few minutes, so set to False for performance.
format_options (Dict[str, Any]) – Additional options for the data loader. Documented here: https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-etl-format.html
- Returns:
The loaded dataset.
- Return type:
pyspark.sql.DataFrame
- property path: str¶
- postprocess(postprocess_args) ShardedS3Dataset ¶
Post process a value resulting from a Task, and return the postprocessed value.
This is a scheduler lifecycle method and is not typically invoked by users. This method may be called repeatedly on a particular instance, for example, if it is returned recursively.
- Parameters:
postprocess_args (dict) – Extra data provided by the scheduler. Implementations that are not part of redun should discard this argument.
- purge_spark(remove_older_than: int = 1, manifest_file_path: str | None = None) None ¶
Recursively removes all files older than remove_older_than hours. Defaults to 1 hour. Optionally writes removed files to manifest_file_path/Success.csv
- property recurse: bool¶
- save_spark(dataset: pandas.DataFrame | pyspark.sql.DataFrame, partition_keys: List[str] = [], catalog_database: str = 'default', catalog_table: str | None = None, format_options: Dict[str, Any] = {}) None ¶
Writes a pandas or spark DataFrame to the given path in the given format, optionally partitioning on dataset keys. Must be done from a spark environment.
- Parameters:
dataset (Union[pandas.DataFrame, pyspark.sql.DataFrame]) – Dataset to save
partition_keys (List[str]) – Dataset keys to partition on. Each key will be a subdirectory in self.path containing data for each value of that key. For example, partitioning on the column ‘K’ will make subdirectories ‘K=1’, ‘K=2’, ‘K=3’, etc.
catalog_database (str) – Datacatalog name to write to, if creating a table in the Data Catalog. Defaults to ‘default’
catalog_table (Optional[str]) – If present, written data will be available in AWS Data Catalog / Glue / Athena with the indicated table name.
format_options (Dict[str, Any]) – Additional options for the data loader. Documented here: https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-etl-format.html
- type_name: str | None = 'redun.ShardedS3Dataset'¶
- update_hash() None ¶
- class redun.file.Staging(local: T | str, remote: T | str)¶
Bases:
Value, Generic[T]
- classmethod parse_arg(raw_type: type, arg: str) Any ¶
Parse a command line argument into a new Value.
- abstract render_stage(as_mount: bool = False) str ¶
- abstract render_unstage(as_mount: bool = False) str ¶
- abstract stage() T ¶
- type_name: str | None = 'redun.file.Staging'¶
- abstract unstage() T ¶
- class redun.file.StagingDir(local: Dir | str, remote: Dir | str)¶
- classes = <redun.file.FileClasses object>¶
- get_hash(data: bytes | None = None) str ¶
Returns a hash for the value. Users may override to provide custom behavior.
Most basic objects are covered by the default approach of hashing the binary pickle serialization.
- render_stage(as_mount: bool = False) str ¶
Returns a shell command for staging a directory.
- render_unstage(as_mount: bool = False) str ¶
Returns a shell command for unstaging a directory.
- type_basename = 'StagingDir'¶
- type_name: str | None = 'redun.StagingDir'¶
- class redun.file.StagingFile(local: File | str, remote: File | str)¶
- classes = <redun.file.FileClasses object>¶
- get_hash(data: bytes | None = None) str ¶
Returns a hash for the value. Users may override to provide custom behavior.
Most basic objects are covered by the default approach of hashing the binary pickle serialization.
- render_stage(as_mount: bool = False) str ¶
Returns a shell command for staging a file.
- render_unstage(as_mount: bool = False) str ¶
Returns a shell command for unstaging a file.
- type_basename = 'StagingFile'¶
- type_name: str | None = 'redun.StagingFile'¶
- redun.file.copy_file(src_path: str | None, dest_path: str | None, recursive: bool = False) None ¶
Copy a file. Files can be from different filesystems.
- Parameters:
src_path (Optional[str]) – Source path to copy from. If None, use stdin.
dest_path (Optional[str]) – Destination path to copy to. If None, use stdout.
recursive (bool) – If True, copy a directory tree of files.
- redun.file.get_filesystem(proto: str | None = None, url: str | None = None) FileSystem ¶
Returns the corresponding FileSystem for a given url or protocol.
- redun.file.get_filesystem_class(proto: str | None = None, url: str | None = None) Type[FileSystem] ¶
Returns the corresponding FileSystem class for a given url or protocol.
- redun.file.get_proto(url: str | None = None) str ¶
Returns the protocol for a url.
For example the protocol for ‘http://example.com’ is ‘http’. Local paths ‘/path/to/my/file’ have the protocol ‘local’.
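The protocol rule above can be approximated with the standard URL parser. This is a sketch of the behavior described, not the function's source; `proto_of` is a hypothetical name.

```python
from urllib.parse import urlparse


def proto_of(url: str) -> str:
    """Return the URL scheme, or 'local' for plain filesystem paths."""
    scheme = urlparse(url).scheme
    return scheme if scheme else "local"
```

One caveat of this simplified version: a Windows-style path such as `C:\data` would be parsed as having scheme `c`, so it only suits POSIX-style local paths.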
- redun.file.glob_file(pattern: str) List[str] ¶
- redun.file.list_filesystems() List[Type[FileSystem]] ¶
Returns list of supported filesystems.
- redun.file.open_file(url: str, mode: str = 'r', encoding: str | None = None, **kwargs: Any) IO ¶
Open a file stream.
- Parameters:
url (str) – Url or path of file to open.
mode (str) – Stream mode for reading or writing (‘r’, ‘w’, ‘b’, ‘a’).
encoding (str) – Text encoding (e.g. ‘utf-8’) to use when reading or writing.
**kwargs – Additional arguments for the underlying file stream. They are Filesystem-specific.
- redun.file.register_filesystem(cls: Type[FileSystem]) Type[FileSystem] ¶
redun.functools module¶
- (task)redun.functools.apply_func(func: Callable[..., T], *args: Any, **kwargs: Any) T ¶
Apply a Python function to possibly lazy arguments.
The function func must be defined at the module-level.
lazy_list = a_task_returning_a_list()
lazy_length = apply_func(len, lazy_list)
- redun.functools.as_task(func: Callable[[...], T]) PartialTask ¶
Transform a plain Python function into a redun Task.
The function func must be defined at the module-level.
assert as_task(max)(10, 20, 30, 25) == 30
- (task)redun.functools.compose_apply(tasks: Sequence[redun.task.Task], *args: Any, **kwargs: Any) Any ¶
Helper function for applying a composition of Tasks to arguments.
- (task)redun.functools.const(x: T, _: Any) T ¶
Returns the first argument unchanged (constant) and discards the second.
This is useful for forcing execution of the second argument, but discarding its value from downstream parts of the workflow.
- redun.functools.delay(x: T) PartialTask ¶
Delay the evaluation of a value x.
The name delay() is inspired by other programming languages:
- http://web.mit.edu/scheme_v9.2/doc/mit-scheme-ref/Promises.html
- http://people.cs.aau.dk/~normark/prog3-03/html/notes/eval-order_themes-delay-stream-section.html
- https://docs.racket-lang.org/reference/Delayed_Evaluation.html
- (task)redun.functools.eval_(code: str, *args: Any, **kwargs: Any) Any ¶
Evaluate code with kwargs as local variables.
If a positional argument is given, its value is assigned to local variables defined by pos_args.
This can be useful for small manipulations of lazy values.
records = task1()
names = eval_("[record.name for record in records]", records=records)
result = task2(names)
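Outside of a workflow, the core idea (evaluate a code string with keyword arguments visible as variables) can be sketched with the built-in eval. This is an illustration only: `eval_sketch` is a hypothetical name, it does not model pos_args or lazy evaluation, and like any use of eval it should only see trusted code strings.

```python
from types import SimpleNamespace
from typing import Any


def eval_sketch(code: str, **kwargs: Any) -> Any:
    """Evaluate an expression with kwargs visible as variables."""
    # kwargs are passed as the evaluation namespace so that names are also
    # visible inside comprehensions within the expression.
    return eval(code, dict(kwargs))


records = [SimpleNamespace(name="a"), SimpleNamespace(name="b")]
names = eval_sketch("[record.name for record in records]", records=records)
```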
- (task)redun.functools.flat_map(a_task: redun.task.Task[Callable[..., List[T]]], values: List) List[T] ¶
Apply a task a_task on a sequence of values and flatten the result.
- (task)redun.functools.flatten(list_of_lists: Sequence[Sequence[T]]) List[T] ¶
Flatten a list of lists into a flat list.
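For intuition, the plain-Python semantics of flatten and flat_map on already-evaluated values look like this. The `_plain` helpers are hypothetical stand-ins; the real functions are tasks that operate on lazy values inside the scheduler.

```python
from itertools import chain
from typing import Callable, List, Sequence, TypeVar

T = TypeVar("T")


def flatten_plain(list_of_lists: Sequence[Sequence[T]]) -> List[T]:
    """Concatenate the inner sequences into one list."""
    return list(chain.from_iterable(list_of_lists))


def flat_map_plain(func: Callable[..., List[T]], values: Sequence) -> List[T]:
    """Apply func to each value, then flatten the per-value lists."""
    return flatten_plain([func(value) for value in values])


flat = flatten_plain([[1, 2], [3], []])
repeated = flat_map_plain(lambda n: [n] * n, [1, 2, 3])
```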
- redun.functools.force(x: Task[Callable[[], T]]) T ¶
Force the evaluation of a delayed evaluation.
The name force() is inspired by other programming languages:
- http://web.mit.edu/scheme_v9.2/doc/mit-scheme-ref/Promises.html
- http://people.cs.aau.dk/~normark/prog3-03/html/notes/eval-order_themes-delay-stream-section.html
- https://docs.racket-lang.org/reference/Delayed_Evaluation.html
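The delay/force pairing in the languages referenced above is usually modeled as a promise: a computation that is wrapped up, run on first force, and cached afterwards. The class below sketches that general idea in plain Python; it is not how redun implements delay() or force(), and `Promise` and `expensive` are hypothetical names.

```python
from typing import Callable, Generic, List, Optional, TypeVar

T = TypeVar("T")


class Promise(Generic[T]):
    """Scheme-style promise: compute once on first force, then cache."""

    def __init__(self, thunk: Callable[[], T]):
        self._thunk: Optional[Callable[[], T]] = thunk
        self._value: Optional[T] = None

    def force(self) -> T:
        if self._thunk is not None:
            self._value = self._thunk()  # Run the delayed computation.
            self._thunk = None  # Drop the thunk so it cannot run again.
        return self._value  # type: ignore[return-value]


calls: List[str] = []


def expensive() -> int:
    calls.append("ran")
    return 42


promise = Promise(expensive)
ran_before_force = list(calls)  # Nothing has run yet.
first = promise.force()
second = promise.force()
```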
- (task)redun.functools.identity(x: T) T ¶
Returns its input argument.
- (scheduler_task)redun.functools.map_(a_task: redun.task.Task, values: Sequence[Any]) T ¶
Map a task to a list of values, similar to map(f, xs).
- (scheduler_task)redun.functools.seq(exprs: Sequence[Any]) T ¶
Evaluate the expressions serially.
The name seq() is inspired by Haskell:
- https://wiki.haskell.org/Seq
- redun.functools.starmap(a_task: Task[Callable[[...], T]], kwargs: List[Dict] | Expression[List[Dict]] = []) List[T] ¶
Map a task to a list of keyword arguments.
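On plain values, mapping over a list of keyword-argument dictionaries amounts to the comprehension below. This is a sketch of the semantics only; `starmap_plain` and `area` are hypothetical, and the real starmap works with tasks and possibly lazy argument lists.

```python
from typing import Any, Callable, Dict, List, TypeVar

T = TypeVar("T")


def starmap_plain(func: Callable[..., T], kwargs_list: List[Dict[str, Any]]) -> List[T]:
    """Call func once per dict, unpacking each dict as keyword arguments."""
    return [func(**kwargs) for kwargs in kwargs_list]


def area(width: int, height: int) -> int:
    return width * height


areas = starmap_plain(area, [{"width": 2, "height": 3}, {"width": 4, "height": 5}])
```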
- (task)redun.functools.zip_(*lists: List[T]) List[Tuple[T, ...]] ¶
Zips two or more lists into a list of tuples.
This is a task equivalent of zip().
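For reference, the built-in behavior that zip_ mirrors looks like this on ordinary lists. `zip_plain` is a hypothetical helper; the task version additionally accepts lazy lists.

```python
from typing import List, Tuple, TypeVar

T = TypeVar("T")


def zip_plain(*lists: List[T]) -> List[Tuple[T, ...]]:
    """Pair up the i-th elements of each list using the built-in zip()."""
    return list(zip(*lists))


pairs = zip_plain([1, 2, 3], [10, 20, 30])
triples = zip_plain([1, 2], [3, 4], [5, 6])
```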
redun.glue module¶
Helper functions for glue jobs.
Glue-related imports are kept isolated, so this can be imported on people’s local machines without issue, and then they call the functions that will run when the context is defined.
- redun.glue.clean_datacatalog_table(database: str, table: str, remove_older_than: int = 1) None ¶
Removes records and files from a datacatalog table older than remove_older_than hours. Underlying data on S3 will be deleted, too.
- redun.glue.get_glue_context() GlueContext ¶
Returns the current glue context.
- redun.glue.get_num_workers() int ¶
Returns the number of workers in the current Spark context.
- redun.glue.get_spark_context() SparkContext ¶
Returns the current spark context.
- redun.glue.get_spark_session() SparkSession ¶
- redun.glue.load_datacatalog_spark_dataset(database: str, table_name: str, hashfield: str | None = None) DataFrame ¶
Loads a dataset from DataCatalog.
- redun.glue.setup_glue_job(job_name: str, job_args: List[str]) GlueJob ¶
- redun.glue.sql_query(dataset: DataFrame, query: str, dataset_alias: str = 'dataset') DataFrame ¶
Runs a SQL-style query on a Spark DataFrame.
- Parameters:
dataset (pyspark.sql.DataFrame) – The dataset to query.
query (str) – SQL query string.
dataset_alias (str) – Name for dataset in SQL context. Defaults to “dataset”. The SQL query should reference the alias like “SELECT * FROM {dataset_alias}”
- redun.glue.udf(wrapped_func: Callable | None = None, return_type: DataType | None = None) Callable ¶
Creates a spark user defined function. Wraps pyspark.sql.functions.udf so spark context is only needed at runtime rather than redun task expression time.
redun.handle module¶
- class redun.handle.Handle(name: str | None = None, *args, namespace: str | None = None, **kwargs)¶
Bases:
Value
A Value that opaquely accumulates state as it passes through Tasks. This subclass of Value has deep support from the scheduler and its backend, in order to provide a foundation for encapsulating interactions with a deterministic, stateful, external system, such as a database.
This model of state supports being initialized, then opaque transformations as Tasks are applied. The hash data and serialization data are designed to be opaque to the actual data held within the handle. At initialization time, only the constructor data is captured for serialization and hashing. Upon transformation (that is, when a Handle is returned by a Task) the hash is replaced with the hash identifying the Task invocation, including both code and inputs.
See the design documents for more discussion of the principles behind the design: docs/source/design.md.
In general use, users should not alter the serialization methods.
- class HandleInfo(name: str, args: Tuple, kwargs: dict, class_name: str, namespace: str | None = None, call_hash: str = '', hash: str | None = None, key: str = '')¶
Bases:
object
The fullname is an identification key for the handle. It is used globally across the redun backend to identify other potential instances of this Handle. However, hashes are used, as usual, to perform equality-like tests.
- get_hash() str ¶
Returns hash of the handle.
Implementation note: the Handle state model requires that the computation proceeds differently for new Handles and ones that are computed by other tasks.
- get_state() dict ¶
Returns serializable state dict.
- update_hash() None ¶
- apply_call(call_hash: str) Handle ¶
Returns a new Handle derived from this one assuming passage through a call with call_hash.
- fork(key: str) Handle ¶
Forks the handle into a second one for use in parallel tasks. A key must be provided to differentiate the fork from the original (although the original may have a key).
- get_hash(data: bytes | None = None) str ¶
Returns a hash of the handle.
- is_valid() bool ¶
Returns True if handle is still valid (i.e., has not been rolled back).
- postprocess(postprocess_args: dict) Handle ¶
Applies the call_hash to the handle as it returns from a task.
- type_name: str | None = 'redun.Handle'¶
- redun.handle.get_fullname(namespace: str | None, name: str) str ¶
Constructs a fullname from a namespace and a name.
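The hashing rules described for Handle can be pictured with a small stand-alone model. This is only a sketch, not redun's implementation: ToyHandle, the _digest helper, the strings that get hashed, and the "namespace.name" fullname format are all assumptions made for illustration. Only the behaviour comes from the description above: constructor data determines the initial hash, passing through a call replaces it with a hash derived from call_hash, and forks are told apart by their key.

```python
import hashlib
from typing import Optional


def _digest(*parts: str) -> str:
    """Hypothetical helper: hash a sequence of strings into a hex digest."""
    h = hashlib.sha1()
    for part in parts:
        h.update(part.encode("utf8"))
        h.update(b"\0")  # separator so ("ab", "c") differs from ("a", "bc")
    return h.hexdigest()


class ToyHandle:
    """Toy model of the Handle state machine (not the real redun class)."""

    def __init__(self, name: str, *args, namespace: Optional[str] = None):
        # Assumption: a fullname is "namespace.name", or just name without a namespace.
        self.fullname = f"{namespace}.{name}" if namespace else name
        self.args = args
        self.key = ""
        # A new handle is hashed from its constructor data only.
        self.hash = _digest("init", self.fullname, repr(args))

    def _derive(self, new_hash: str, key: str) -> "ToyHandle":
        child = ToyHandle.__new__(ToyHandle)
        child.fullname, child.args = self.fullname, self.args
        child.key, child.hash = key, new_hash
        return child

    def apply_call(self, call_hash: str) -> "ToyHandle":
        # After passing through a task, the hash identifies the call.
        new_hash = _digest("call", self.fullname, call_hash, self.key)
        return self._derive(new_hash, self.key)

    def fork(self, key: str) -> "ToyHandle":
        # A fork is distinguished from the original by its key.
        return self._derive(_digest("fork", self.hash, key), key)


conn = ToyHandle("conn", "db.example", namespace="acme")
same = ToyHandle("conn", "db.example", namespace="acme")
after = conn.apply_call("abc123")
left, right = conn.fork("left"), conn.fork("right")
```

Two handles built from the same constructor data hash identically, while `after`, `left`, and `right` each carry a different hash, which is what lets a backend tell the states apart without looking inside the handle.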
redun.hashing module¶
- class redun.hashing.Hash(length=40)¶
Bases:
object
A convenience class for creating hashes.
- hexdigest() str ¶
- update(data)¶
- redun.hashing.hash_arguments(type_registry: TypeRegistry, args: Sequence, kwargs: dict)¶
Hash the arguments for a Task call.
- redun.hashing.hash_bytes(bytes: bytes) str ¶
Hash a byte sequence.
- redun.hashing.hash_call_node(task_hash: str, args_hash: str, result_hash: str, child_call_hashes: List[str]) str ¶
Calculates the call_hash for a CallNode.
- redun.hashing.hash_eval(type_registry: TypeRegistry, task_hash: str, args: Sequence, kwargs: dict) Tuple[str, str] ¶
Hash Task evaluation and arguments.
- redun.hashing.hash_kwargs(type_registry: TypeRegistry, kwargs: Dict[str, Any]) Dict[str, str] ¶
Hash a dict of keyword arguments.
- redun.hashing.hash_positional_args(type_registry: TypeRegistry, args: Sequence) List[str] ¶
Hash a list of arguments.
- redun.hashing.hash_stream(stream: IO, block_size: int = 1024) str ¶
Hash a stream of bytes.
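As an illustration of what the block_size parameter implies, the sketch below hashes a stream incrementally, one block at a time, and checks that the result matches hashing the whole payload at once. The function name toy_hash_stream is hypothetical, and the choice of SHA-512 truncated to 40 hex characters is an assumption (the 40 mirrors the default length of the Hash class); the digest used by redun may differ.

```python
import hashlib
import io
from typing import IO


def toy_hash_stream(stream: IO[bytes], block_size: int = 1024, length: int = 40) -> str:
    """Hash a binary stream block by block (illustrative stand-in for hash_stream)."""
    h = hashlib.sha512()  # assumption: the real digest algorithm may differ
    while True:
        block = stream.read(block_size)
        if not block:
            break
        h.update(block)
    return h.hexdigest()[:length]


payload = b"redun" * 1000
streamed = toy_hash_stream(io.BytesIO(payload), block_size=64)
whole = hashlib.sha512(payload).hexdigest()[:40]
```

Because the digest is updated incrementally, memory use stays bounded by block_size regardless of how large the stream is.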
- redun.hashing.hash_struct(struct: Any) str ¶
Hash a structure by using canonical serialization using bencode.
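To show why a canonical serialization matters for hashing, here is a minimal bencode encoder written from the bencode format itself (integers as i...e, byte strings as length:bytes, lists as l...e, dicts as d...e with sorted keys). It is a sketch, not the encoder redun uses; toy_hash_struct and its truncated SHA-512 digest are assumptions.

```python
import hashlib
from typing import Any


def bencode(value: Any) -> bytes:
    """Minimal bencode encoder for ints, str/bytes, lists, and dicts."""
    if isinstance(value, bool):
        raise TypeError("bencode has no boolean type")
    if isinstance(value, int):
        return b"i%de" % value
    if isinstance(value, str):
        value = value.encode("utf8")
    if isinstance(value, bytes):
        return b"%d:%s" % (len(value), value)
    if isinstance(value, (list, tuple)):
        return b"l" + b"".join(bencode(item) for item in value) + b"e"
    if isinstance(value, dict):
        items = [(k.encode("utf8") if isinstance(k, str) else k, v) for k, v in value.items()]
        # Keys are emitted in sorted order, which makes the encoding canonical.
        items.sort(key=lambda kv: kv[0])
        return b"d" + b"".join(bencode(k) + bencode(v) for k, v in items) + b"e"
    raise TypeError(f"cannot bencode {type(value).__name__}")


def toy_hash_struct(struct: Any) -> str:
    # Assumption: truncated SHA-512; only "serialize canonically, then hash" matters here.
    return hashlib.sha512(bencode(struct)).hexdigest()[:40]


encoded = bencode({"b": 1, "a": [1, "x"]})
hash_ab = toy_hash_struct({"a": 1, "b": 2})
hash_ba = toy_hash_struct({"b": 2, "a": 1})
```

Since dict keys are sorted before encoding, two dicts with the same contents produce the same bytes, and therefore the same hash, whatever their insertion order.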
- redun.hashing.hash_tag(entity_id: str, key: str, value: Any, parents: List[str]) str ¶
Hash a CallGraph Tag.
- redun.hashing.hash_tag_bytes(tag: str, bytes: bytes) str ¶
Hash a tag followed by a byte sequence.
- redun.hashing.hash_text(text: str) str ¶
Returns the hash for a string.
redun.job_array module¶
- class redun.job_array.JobArrayer(submit_jobs: Callable[[List[Job]], None], on_error: Callable[[Exception], None], submit_interval: float, stale_time: float, min_array_size: int, max_array_size: int = 10000)¶
Bases:
object
A ‘sink’ for submitted jobs that detects when jobs can be submitted as an array job. Eligible jobs will have the same task, task options, and resource requirements, but can differ in the args and kwargs passed to the task.
The method uses “staleness” of submitted jobs in the ‘sink’. Eligible jobs are grouped in the pool (really a dict). Grouped jobs that haven’t had new ones added to the group for stale_time sec will be submitted.
- Parameters:
submit_jobs (Callable[[List[Job]], None]) – Callback called when a group of jobs are ready to be submitted as an array.
on_error (Callable[[Exception], None]) – Callback called when there is a top-level error.
submit_interval (float) – How frequently the monitor thread will check for submittable stale jobs. Should be less than stale_time, ideally.
stale_time (float) – Job groupings that haven’t had new jobs added for this many seconds will be submitted.
min_array_size (int) – Minimum number of jobs in a group to be submitted as an array job instead of individual jobs. Can be anywhere from 2 to MAX_ARRAY_SIZE-1.
max_array_size (int) – Maximum number of jobs that can be submitted as an array job. Must be in (min_array_size, MAX_ARRAY_SIZE].
- get_stale_descrs() List[JobDescription] ¶
Submits jobs that haven’t been touched in a while
- start() None ¶
- stop() None ¶
- submit_pending_jobs(descr: JobDescription) None ¶
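The staleness rule described for JobArrayer can be modelled in a few lines. The sketch below is a toy, not the real class: ToyArrayer, its method names, the string job labels, and the explicit now argument (used instead of a monitor thread so the behaviour is deterministic) are all assumptions. In redun the grouping key would be derived from the task, task options, and resource requirements.

```python
from collections import defaultdict
from typing import Dict, Hashable, List, Tuple


class ToyArrayer:
    """Toy model of staleness-based grouping (not the real JobArrayer)."""

    def __init__(self, stale_time: float, min_array_size: int):
        self.stale_time = stale_time
        self.min_array_size = min_array_size
        self.pending: Dict[Hashable, List[str]] = defaultdict(list)
        self.last_added: Dict[Hashable, float] = {}

    def add_job(self, group_key: Hashable, job: str, now: float) -> None:
        # Adding a job refreshes the group's timestamp, so it is no longer stale.
        self.pending[group_key].append(job)
        self.last_added[group_key] = now

    def pop_stale(self, now: float) -> List[Tuple[str, List[str]]]:
        """Remove and return groups untouched for at least stale_time seconds."""
        submissions: List[Tuple[str, List[str]]] = []
        stale_keys = [k for k, t in self.last_added.items() if now - t >= self.stale_time]
        for key in stale_keys:
            jobs = self.pending.pop(key)
            del self.last_added[key]
            if len(jobs) >= self.min_array_size:
                submissions.append(("array", jobs))
            else:
                # Too few jobs to be worth an array: submit them one at a time.
                submissions.extend(("single", [job]) for job in jobs)
        return submissions


arrayer = ToyArrayer(stale_time=3.0, min_array_size=2)
arrayer.add_job("align", "align-1", now=0.0)
arrayer.add_job("align", "align-2", now=1.0)
arrayer.add_job("report", "report-1", now=1.0)
early = arrayer.pop_stale(now=2.0)  # nothing is stale yet
late = arrayer.pop_stale(now=4.5)   # both groups have aged past stale_time
```

This also suggests why submit_interval should be shorter than stale_time: the check only runs at polling time, so a long polling interval delays submission of groups that have already gone stale.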
- redun.job_array.get_job_array_index(env: dict = os.environ) int | None ¶
Finds any Job Array Index environment variables and returns the index.
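A lookup of this kind might look like the sketch below. The function name find_array_index and the tuple of variable names are assumptions: AWS_BATCH_JOB_ARRAY_INDEX and JOB_COMPLETION_INDEX are variables set by AWS Batch array jobs and Kubernetes indexed Jobs respectively, but the exact set that redun inspects may differ. Accepting the environment as an argument, as the signature above does, makes the function easy to exercise with a plain dict.

```python
import os
from typing import Mapping, Optional

# Assumption: example index variables set by batch systems; the list
# checked by redun may differ.
INDEX_VARIABLES = ("AWS_BATCH_JOB_ARRAY_INDEX", "JOB_COMPLETION_INDEX")


def find_array_index(env: Mapping[str, str] = os.environ) -> Optional[int]:
    """Return the first array index found in env, or None if there is none."""
    for name in INDEX_VARIABLES:
        value = env.get(name)
        if value is not None:
            return int(value)
    return None


inside_array = find_array_index({"AWS_BATCH_JOB_ARRAY_INDEX": "7"})
outside_array = find_array_index({})
```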
redun.logging module¶
redun.namespace module¶
- redun.namespace.compute_namespace(obj: Any, namespace: str | None = None)¶
Compute the namespace for the provided object.
Precedence, from highest to lowest: (1) an explicit namespace, if provided (note: an empty string is a valid explicit value); (2) a redun_namespace variable in the same module as obj; (3) the current namespace, configured with set_current_namespace.
WARNING: This computation interacts with the global “current namespace” (see above), so results may be context dependent.
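The precedence order and the context dependence mentioned in the warning can be sketched as follows. This is a simplified model rather than redun's code: toy_compute_namespace takes a module directly instead of locating the module of an arbitrary object, and the _current_namespace variable stands in for the global current namespace.

```python
from types import ModuleType
from typing import Optional

_current_namespace: Optional[str] = None  # stand-in for the global "current namespace"


def toy_compute_namespace(module: ModuleType, namespace: Optional[str] = None) -> Optional[str]:
    """Resolve a namespace using the precedence order described above."""
    if namespace is not None:
        # An explicit value wins, including the empty string.
        return namespace
    module_namespace = getattr(module, "redun_namespace", None)
    if module_namespace is not None:
        return module_namespace
    return _current_namespace


workflow = ModuleType("workflow")
workflow.redun_namespace = "acme.workflow"
plain = ModuleType("plain")

explicit_empty = toy_compute_namespace(workflow, "")
from_module = toy_compute_namespace(workflow)
no_namespace = toy_compute_namespace(plain)

# Changing the global state changes the answer for the same arguments.
_current_namespace = "session"
from_context = toy_compute_namespace(plain)
```

The last two calls pass identical arguments yet return different values, which is the context dependence the warning refers to.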
- redun.namespace.get_current_namespace()¶
Returns the current task namespace during import.
- redun.namespace.namespace(_namespace=None)¶
Set the current task namespace.
Use None to unset the current namespace.
redun.promise module¶
- class redun.promise.Promise(func: Callable | None = None)¶
Bases:
Generic[T]
A light-weight single-thread implementation of Promise.
We specifically are careful to remove references as soon as possible to enable garbage collection.
A Promise can only be resolved or rejected once. Callbacks will be called at most once. Callbacks can be added both before and after the resolution.
- classmethod all(subpromises: Sequence[Promise[T]]) Promise[List[T]] ¶
Return a promise that waits for all subpromises to resolve.
- do_reject(error: Exception) Exception ¶
Reject the promise with an error.
- do_resolve(result: T) T ¶
Resolve the promise to a result.
- property error: Exception¶
Returns final error.
- then(resolver: Callable[[T], Any] | None = None, rejector: Callable[[Exception], Any] | None = None) Promise[S] ¶
Register callbacks to the promise.
- property value: T¶
Returns final result or error.
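The guarantees stated for Promise (settle once, run each callback at most once, accept callbacks before or after resolution) can be seen in a deliberately tiny model. MiniPromise below is not redun.promise.Promise: it omits rejection and Promise.all, and silently ignoring a second do_resolve is an assumption about how "only once" is enforced.

```python
from typing import Any, Callable, List


class MiniPromise:
    """Tiny single-threaded promise (illustrative; not redun.promise.Promise)."""

    def __init__(self) -> None:
        self._state = "pending"
        self._value: Any = None
        self._callbacks: List[Callable[[Any], None]] = []

    def do_resolve(self, result: Any) -> None:
        if self._state != "pending":
            return  # a promise settles only once
        self._state, self._value = "resolved", result
        # Drop callback references as soon as possible to help garbage collection.
        callbacks, self._callbacks = self._callbacks, []
        for callback in callbacks:
            callback(result)

    def then(self, resolver: Callable[[Any], Any]) -> "MiniPromise":
        child = MiniPromise()

        def run(result: Any) -> None:
            child.do_resolve(resolver(result))

        if self._state == "resolved":
            run(self._value)  # registered after resolution: call immediately
        else:
            self._callbacks.append(run)  # registered before: call on resolve
        return child


seen: List[int] = []
promise = MiniPromise()
promise.then(lambda x: seen.append(x + 1))  # added before resolution
promise.do_resolve(10)
promise.do_resolve(99)                      # ignored: already resolved
promise.then(lambda x: seen.append(x * 2))  # added after resolution
```

Each callback fires exactly once with the first resolved value, regardless of whether it was registered before or after do_resolve.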
redun.pytest module¶
redun.scheduler module¶
- exception redun.scheduler.DryRunResult¶
Bases:
Exception
- class redun.scheduler.ErrorValue(error: Exception, traceback: Traceback | None = None)¶
Bases:
Value
Value for wrapping Exceptions raised by Task.
- get_hash(data: bytes | None = None) str ¶
Returns a hash for the value. Users may override to provide custom behavior.
Most basic objects are covered by the default approach of hashing the binary pickle serialization.
- type_name: str | None = 'redun.ErrorValue'¶
- class redun.scheduler.Execution(id: str | None = None, context: dict | None = None)¶
Bases:
object
An Execution tracks the execution of a workflow of redun.task.Task objects.
- class redun.scheduler.Frame(filename: str, lineno: int, name: str, locals: Dict[str, Any], lookup_line: bool = True, line: str | None = None, job: Job | None = None)¶
Bases:
FrameSummary, Value
Frame of a Traceback for Job failure.
- filename¶
- lineno¶
- locals¶
- name¶
- type_name: str | None = 'redun.Frame'¶
- class redun.scheduler.Job(task: Task, expr: TaskExpression, id: str | None = None, parent_job: Job | None = None, execution: Execution | None = None, options: dict | None = None)¶
Bases:
object
A Job tracks the execution of a redun.task.Task through its various stages.
- STATUSES = ['PENDING', 'RUNNING', 'FAILED', 'CACHED', 'DONE', 'TOTAL']¶
- clear()¶
Free execution state from Job.
- collapse(other_job: Job) None ¶
Collapse this Job into other_job.
This method is used when equivalent Jobs are detected, and we want to perform Common Subexpression Elimination (CSE).
- get_context() dict ¶
Returns the context variables for the Job.
- get_limits() Dict[str, int] ¶
Returns resource limits required for this job to run.
- get_option(key: str, default: Any | None = None, as_type: Type | None = None) Any ¶
Returns a task option associated with a Job.
Precedence is given to task options defined at call-time (e.g. task.options(option=foo)(arg1, arg2)) over task definition-time (e.g. @task(option=foo)).
- get_options() dict ¶
Returns task options for this job.
Precedence is given to task options defined at call-time (e.g. task.options(option=foo)(arg1, arg2)) over task definition-time (e.g. @task(option=foo)).
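The precedence rule amounts to a dictionary merge in which call-time values override definition-time ones. The sketch below is plain Python rather than redun code; merge_options is a hypothetical helper and the option names are only illustrative.

```python
from typing import Any, Dict


def merge_options(definition_options: Dict[str, Any], call_options: Dict[str, Any]) -> Dict[str, Any]:
    """Combine task options so that call-time values override definition-time ones."""
    return {**definition_options, **call_options}


# Roughly: @task(executor="batch", memory=4), later called via task.options(memory=16)(...)
definition_time = {"executor": "batch", "memory": 4}
call_time = {"memory": 16}
effective = merge_options(definition_time, call_time)
```

Options that are only set at definition time (executor here) are kept, while memory takes its call-time value.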
- get_raw_option(key: str, default: Any | None = None, as_type: Type | None = None) Any ¶
Returns an unevaluated task option associated with a Job.
Precedence is given to task options defined at call-time (e.g. task.options(option=foo)(arg1, arg2)) over task definition-time (e.g. @task(option=foo)).
- get_raw_options() dict ¶
Returns the unevaluated task options for this job.
Precedence is given to task options defined at call-time (e.g. task.options(option=foo)(arg1, arg2)) over task definition-time (e.g. @task(option=foo)).
- reject(error: Any) None ¶
Rejects a Job with an error exception.
- resolve(result: Any) None ¶
Resolves a Job with a final concrete value, result.
- property status: str¶
- class redun.scheduler.JobEnv(job: Job, context: dict)¶
Bases:
Job
Job environment.
Customize a Job with a specific context dict.
- get_context() dict ¶
Returns the context variables for the Job.
- exception redun.scheduler.NoCurrentScheduler¶
Bases:
Exception
- class redun.scheduler.Scheduler(config: Config | None = None, backend: RedunBackend | None = None, executor: Executor | None = None, logger: Any | None = None, use_task_traceback: bool = True, job_status_interval: int | None = None, migrate: bool | None = None)¶
Bases:
object
Scheduler for evaluating redun tasks.
A thread may only have a single running scheduler at a time, and a scheduler can perform exactly one execution at a time. While running, the scheduler will register itself into a thread-local variable, see get_current_scheduler and set_current_scheduler.
A scheduler may be reused for multiple executions, and makes every effort to be stateless between executions. That is, the scheduler clears its internal state before starting a new execution, providing isolation from any others we may have performed.
Although the scheduler collaborates with executors that may use multiple threads during execution, the scheduler logic relies upon being executed from a single thread. Therefore, the main lifecycle methods used by executors defer back to the scheduler thread. The scheduler is implemented around an event loop and asynchronous callbacks, allowing it to coordinate many in-flight jobs in parallel.
Scheduler tasks are generally considered “friends” of the scheduler and may need to access some private methods.
- add_job_tags(job: Job, tags: List[Tuple[str, Any]]) None ¶
Callback for adding job tags during job execution.
- clear()¶
Release resources
- done_job(job: Job, result: Any, job_tags: List[Tuple[str, Any]] = []) None ¶
Mark a Job as successfully done with a result.
A primary Executor lifecycle method, hence is thread safe.
- evaluate(expr: Any, parent_job: Job | None = None) Promise ¶
Begin an asynchronous evaluation of an expression (concrete value or Expression). Assumes this scheduler is currently running.
This method is not a typical entry point for users; however, it is often used by SchedulerTasks to trigger further computations.
Returns a Promise that will resolve when the evaluation is complete.
- extend_run(expr: Expression[Result] | Result, parent_job_id: str, dryrun: bool = False, cache: bool = True, context: dict = {}) dict ¶
Extend an existing scheduler execution (run) to evaluate a Task or Expression.
This is an alternative to the run method, and acts as a primary user entry point.
- get_job_status_report() List[str] ¶
- property is_running: bool¶
- load(migrate: bool | None = None) None ¶
- log(*messages: Any, indent: int = 0, multiline: bool = False, level: int = 20) None ¶
- log_job_statuses() None ¶
Display Job statuses.
- reject_job(job: Job | None, error: Any, error_traceback: Traceback | None = None, job_tags: List[Tuple[str, Any]] = []) None ¶
Reject a Job that has failed with an error.
A primary Executor lifecycle method, hence is thread safe.
- run(expr: Expression[Result], exec_argv: List[str] | None = None, dryrun: bool = False, cache: bool = True, tags: Iterable[Tuple[str, Any]] = (), context: dict = {}, execution_id: str | None = None) Result ¶
- run(expr: Result, exec_argv: List[str] | None = None, dryrun: bool = False, cache: bool = True, tags: Iterable[Tuple[str, Any]] = (), context: dict = {}, execution_id: str | None = None) Result
Run the scheduler to evaluate an Expression expr.
This is the primary user entry point to the scheduler. The scheduler will register itself as this thread’s running scheduler for the duration of the execution. This function blocks, running the event loop until the result is ready and can be returned.
The scheduler can only run one execution at a time.
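The pattern described for the Scheduler, in which executor threads call thread-safe lifecycle methods that defer the real work back to a single scheduler thread, can be sketched with a queue of callbacks. This is a generic illustration using only the standard library. The function names echo the lifecycle methods above, but nothing here is redun's implementation.

```python
import queue
import threading
from typing import Callable, List

events: "queue.Queue[Callable[[], None]]" = queue.Queue()
results: List[int] = []
handler_threads: List[str] = []
loop_thread = threading.current_thread().name


def done_job(result: int) -> None:
    """Thread-safe entry point: defer the real work to the loop thread."""

    def handle() -> None:
        handler_threads.append(threading.current_thread().name)
        results.append(result)

    events.put(handle)


def worker(x: int) -> None:
    # Runs on an executor thread; it never touches scheduler state directly.
    done_job(x * x)


threads = [threading.Thread(target=worker, args=(i,)) for i in range(4)]
for thread in threads:
    thread.start()

# The "scheduler" loop: every callback executes on this one thread.
while len(results) < 4:
    events.get()()

for thread in threads:
    thread.join()
```

Because only the loop thread mutates results, the state itself needs no locking. The queue is the single synchronization point between executors and the scheduler.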
- exception redun.scheduler.SchedulerError¶
Bases:
Exception
- class redun.scheduler.Traceback(error: Any, frames: List[FrameSummary], logs: List[str] | None = None)¶
Bases:
Value
Traceback for Job failure.
- classmethod from_error(error: Exception, trim_frames=True) Traceback ¶
Returns a new Traceback derived from an Exception.
- Parameters:
error (Exception) – Exception object from which to derive a Traceback.
trim_frames (bool) – If True, do not include redun scheduler related frames in Traceback.
- classmethod trim_frames(frames: List[FrameSummary]) List[FrameSummary] ¶
- type_name: str | None = 'redun.Traceback'¶
- (task)redun.scheduler._subrun_root_task(expr: redun.expression.QuotedExpression, config: Dict[str, Dict], config_dir: str | None, load_modules: List[str], run_config: Dict[str, Any]) Any ¶
Launches a sub-scheduler and runs the provided expression by first “unwrapping” it. The evaluated result is returned within a dict alongside other sub-scheduler-related state to the caller.
This task is limited to execution cache scope because it is difficult for the caller to be reactive to the details of the subrun. When retrieving a cache value for this task, the parent scheduler only sees the final values, not the whole call tree, so it can only perform shallow checking. Selecting execution scope means that validity checking is not required. We also set shallow checking as a reminder of how it behaves.
In the case that there is no execution-scope cache hit, then the sub-scheduler has to be started. However, the sub-scheduler is free to perform any cache logic it desires. It has the complete context to be fully reactive, or not, as configured. So, if there is a backend-scope cache hit available, the sub-scheduler may return almost instantly.
The parent scheduler may or may not have access to the code, which also makes full reactivity difficult. It is a mild assumption that the code does not change during an execution.
There is one particular case we have not implemented, where the parent scheduler could bypass the need to start the sub-scheduler. If 1) the user only wants shallow validity checking and 2) the parent scheduler has access to all the code (to compute task hashes), then the parent scheduler could correctly identify a backend-scope cache hit using information stored from the prior call node. This is left to future work.
- Parameters:
expr (QuotedExpression) – QuotedExpression to be run by sub-scheduler.
config – A two-level python dict to configure the sub-scheduler. (Will be used to initialize a Config object via the config_dict kwarg.) Used if a config_dir is not provided.
config_dir (Optional[str]) – If supplied, this config is loaded and used instead of the supplied config.
load_modules – List of modules that must be imported before starting the sub-scheduler. Before launching the sub-scheduler, the modules that define the user tasks must be imported.
run_config – Sub-scheduler run() kwargs. These are typically the local-scheduler run() kwargs that should also apply to the sub-scheduler such as dryrun and cache settings.
- Returns:
Returns a dict result from running the sub-scheduler on expr. The dict contains extra information pertaining to the sub-scheduler as follows:
{
    'result': sub-scheduler evaluation of expr, present if successful
    'error': exception raised by sub-scheduler, present if error was encountered
    'dryrun': present if dryrun was requested and workflow can’t complete
    'job_id': Job id of root Job in sub-scheduler, present if parent_job_id given in run_config
    'call_hash': call_hash of root CallNode in sub-scheduler, present if parent_job_id given in run_config
    'status': sub-scheduler final job status poll (list of str)
    'config': sub-scheduler config dict (useful for confirming exactly what config settings were used)
    'run_config': a dict of run configuration used by the sub-scheduler
}
If parent_job_id is present in run_config, only results are returned. Errors are raised.
- Return type:
Dict[str, Any]
- (scheduler_task)redun.scheduler.apply_tags(value: Any, tags: List[Tuple[str, Any]] = [], job_tags: List[Tuple[str, Any]] = [], execution_tags: List[Tuple[str, Any]] = []) T ¶
Apply tags to a value, job, or execution.
Returns the original value.
    from redun import task
    from redun.scheduler import apply_tags

    @task
    def task1():
        x = 10
        # Apply tags on the value 10.
        return apply_tags(x, [("env", "prod"), ("project", "acme")])

    @task
    def task2():
        x = 10
        # Apply tags on the current job.
        return apply_tags(x, job_tags=[("env", "prod"), ("project", "acme")])

    @task
    def task3():
        x = 10
        # Apply tags on the current execution.
        return apply_tags(x, execution_tags=[("env", "prod"), ("project", "acme")])
- (scheduler_task)redun.scheduler.catch(expr: Any, *catch_args: Any) T ¶
Catch any exception error of class error_class raised by expr and evaluate recover(error).
The following example catches the ZeroDivisionError raised by divider(0) and returns 0.0 instead.
    from redun import task
    from redun.scheduler import catch

    @task()
    def divider(denom):
        return 1.0 / denom

    @task()
    def recover(error):
        return 0.0

    @task()
    def main():
        return catch(
            divider(0),
            ZeroDivisionError,
            recover
        )
This is equivalent to the regular python code:
    def divider(denom):
        return 1.0 / denom

    def main():
        try:
            return divider(0)
        except ZeroDivisionError as error:
            return 0.0
catch() can also handle multiple Exception classes as well as multiple recover expressions.
    @task()
    def main():
        return catch(
            task1(),
            (ZeroDivisionError, ValueError),
            recover1,
            KeyError,
            recover2,
        )
which is equivalent to the regular python code:
    try:
        return task1()
    except (ZeroDivisionError, ValueError) as error:
        return recover1(error)
    except KeyError as error:
        return recover2(error)
Implementation/behavior note: Usually, we don’t cache errors, since this is rarely productive. Also recall that we typically cache each round of evaluation separately, allowing the scheduler to retrace the call graph and detect if any subtasks have changed (i.e. hash change) or if any Values in the subworkflow are now invalid (e.g. File hash has changed). To correctly cache a caught expression, we need to cache both the fact that expr resolved to an Exception, then that the recover_expr produced a non-error result. However, we only want to cache the exception if it is successfully handled, so if the recovery re-raises the exception or issues another error, the error should not be cached.
In order to create this behavior, we have to implement custom caching behavior that delays caching the error until we see that it has been successfully handled.
- Parameters:
expr (Expression) – Main expression to evaluate.
error_recover_pairs – A list of alternating error_class and recover Tasks.
- (scheduler_task)redun.scheduler.catch_all(exprs: T, error_class: NoneType | Exception | Tuple[Exception, ...] = None, recover: redun.task.Task[Callable[..., S]] | None = None) T ¶
Catch all exceptions that occur in the nested value exprs.
This task works similar to catch, except it takes multiple expressions within a nested value (e.g. a list, set, dict, etc). Any exceptions raised by the expressions are caught and processed only after all expressions succeed or fail. For example, this can be useful in large fanouts where the user wants to avoid one small error stopping the whole workflow immediately. Consider the following code:
    from redun import task
    from redun.scheduler import catch_all

    @task
    def divider(denom):
        return 1.0 / denom

    @task
    def recover(values):
        nerrors = sum(1 for value in values if isinstance(value, Exception))
        raise Exception(f"{nerrors} error(s) occurred.")

    @task
    def main():
        return catch_all([divider(1), divider(0), divider(2)], ZeroDivisionError, recover)
Each of the expressions in the list [divider(1), divider(0), divider(2)] will be allowed to finish before the exceptions are processed by recover. If there are no exceptions, the evaluated list is returned. If there are any exceptions, the list is passed to recover, where it will contain each successful result or Exception. recover then has the opportunity to process all errors together. In the example above, recover raises a new exception that reports the total number of errors.
- Parameters:
exprs – A nested value (list, set, dict, tuple, NamedTuple) of expressions to evaluate.
error_class (Union[Exception, Tuple[Exception, ...]]) – An Exception or Exceptions to catch.
recover (Task) – A task to call if any errors occur. It will be called with the nested value containing both successful results and errors.
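By analogy with the plain-Python equivalents given for catch, the behaviour of catch_all can be approximated in ordinary eager Python as below. This is only a sketch under simplifying assumptions: eager_catch_all is a hypothetical helper, it handles a flat list of zero-argument callables rather than a nested value of lazy expressions, it runs them sequentially, and its recover returns a message instead of raising.

```python
from typing import Any, Callable, List, Tuple, Type, Union


def eager_catch_all(
    thunks: List[Callable[[], Any]],
    error_class: Union[Type[Exception], Tuple[Type[Exception], ...]],
    recover: Callable[[List[Any]], Any],
) -> Any:
    """Run every thunk, collecting results and caught errors, then recover once."""
    values: List[Any] = []
    for thunk in thunks:
        try:
            values.append(thunk())
        except error_class as error:
            values.append(error)  # keep going instead of stopping at the first error
    if any(isinstance(value, Exception) for value in values):
        return recover(values)
    return values


def count_errors(values: List[Any]) -> str:
    nerrors = sum(1 for value in values if isinstance(value, Exception))
    return f"{nerrors} error(s) occurred."


ok = eager_catch_all([lambda: 1.0 / 1, lambda: 1.0 / 2], ZeroDivisionError, count_errors)
failed = eager_catch_all(
    [lambda: 1.0 / 1, lambda: 1.0 / 0, lambda: 1.0 / 2],
    ZeroDivisionError,
    count_errors,
)
```

The key point is that the third computation still runs even though the second one fails, and recover sees the complete list of results and errors at once.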
- (scheduler_task)redun.scheduler.cond(cond_expr: Any, then_expr: Any, *rest: Any) T ¶
Conditionally execute expressions, i.e. a lazy if-statement.
    from redun import task
    from redun.scheduler import cond

    @task()
    def is_even(x):
        return x % 2 == 0

    @task()
    def task1():
        # ...
        pass

    @task()
    def task2():
        # ...
        pass

    @task()
    def main(x: int):
        return cond(
            is_even(x),
            task1(),
            task2()
        )
- redun.scheduler.format_arg(arg_name: str, value: Any, max_length: int = 200) str ¶
Format a Task argument into a string.
- redun.scheduler.format_job_statuses(job_status_counts: Dict[str, Dict[str, int]], timestamp: datetime | None = None) Iterator[str] ¶
Format job status table (Dict[task_name, Dict[status, count]]).
- redun.scheduler.format_task_call(task: Task, args: Tuple, kwargs: dict) str ¶
Format a Task call into a string.
`my_task(arg1=10, my_file=File('path/to/file.txt'))`
- redun.scheduler.get_arg_defaults(task: Task, args: Tuple, kwargs: dict) dict ¶
Get default arguments from Task signature.
- redun.scheduler.get_backend_from_config(backend_config: SectionProxy | None = None) RedunBackend ¶
Parses a redun redun.backends.base.RedunBackend from a config object.
- redun.scheduler.get_current_scheduler(required=True) Scheduler | None ¶
Returns the currently running Scheduler for this thread.
- redun.scheduler.get_ignore_warnings_from_config(scheduler_config: dict | SectionProxy | Config) Set[str] ¶
Parses ignore warnings from config.
- redun.scheduler.get_limits_from_config(limits_config: dict | SectionProxy | Config | None = None) Dict[str, int] ¶
Parses resource limits from a config object.
- redun.scheduler.is_debugger_active() bool ¶
Returns True if debugger REPL is currently active.
- (scheduler_task)redun.scheduler.merge_handles(handles_expr: List[redun.handle.Handle]) T ¶
Merge multiple handles into one.
- redun.scheduler.needs_root_task(task_registry: TaskRegistry, expr: Any) bool ¶
Returns True if expression expr needs to be wrapped in a root task.
- (task)redun.scheduler.root_task(expr: redun.expression.QuotedExpression[Result]) Result ¶
Default task used for a root job in an Execution.
- redun.scheduler.set_arg_defaults(task: Task, args: Tuple, kwargs: dict) Tuple[Tuple, dict] ¶
Set default arguments from Task signature.
- redun.scheduler.set_current_scheduler(scheduler: Scheduler | None) None ¶
Sets the current running Scheduler for this thread.
- redun.scheduler.settrace_patch(tracefunc: Any) None ¶
Monkey patch for recording whether debugger REPL is active or not.
sys.settrace() is called by debuggers such as pdb whenever the debugger REPL is activated. We use a monkey patch in order to record tracing process-wide. Relying on sys.gettrace() is not enough because it is thread-specific.
- (scheduler_task)redun.scheduler.subrun(expr: Any, executor: str, config: Dict[str, Any] | None = None, config_dir: str | None = None, new_execution: bool = False, load_modules: List[str] | None = None, **task_options: dict) T ¶
Evaluates an expression expr in a sub-scheduler running within Executor executor.
executor and optional task_options are used to configure the special redun task ( _subrun_root_task) that starts the sub-scheduler. For example, you can configure the task with a batch executor to run the sub-scheduler on AWS Batch.
config: To ease configuration management of the sub-scheduler, you can either pass a config dict which contains configuration that would otherwise require a redun.ini file in the sub-scheduler environment, or you can provide a config_dir to be loaded when the subrun starts. If you do not pass either, the local scheduler’s config will be forwarded to the sub-scheduler (replacing the local config_dir with “.”). In practice, the sub-scheduler’s config_dir should be less important as you probably want to log both local and sub-scheduler call graphs to a common database. You can also obtain a copy of the local scheduler’s config and customize it as needed. Instantiate the scheduler directly instead of calling redun run. Then access its config via scheduler.py::get_scheduler_config_dict().
Note on code packaging: The user is responsible for ensuring that the chosen Executor for invoking the sub-scheduler copies over all user-task scripts. E.g. the local scheduler may be launched on local tasks defined in workflow1.py but subrun(executor="batch") is invoked on taskX defined in workflow2.py. In this case, the user must ensure workflow2.py is copied to the batch node by placing it within the same directory tree as workflow1.py.
The cache behavior of subrun is customized. Since recursive reductions are supposed to happen inside the subrun, we can’t accept a single reduction. If the user wants full validity checking, that requires using single reductions, which means we have to actually start the sub-scheduler and let it handle the caching logic. In contrast, if a CSE or ultimate reduction is available, then we won’t need to start the sub-scheduler.
Most scheduler tasks don’t accept the cache options cache_scope and check_valid, but subrun does, so we can forward them to the underlying Task that implements the subrun. Note that we also set the default check_valid value to be CacheCheckValid.SHALLOW, unlike its usual value.
- Parameters:
expr (Any) – Expression to be run by sub-scheduler.
executor (str) – Executor name for the special redun task that launches the sub-scheduler (_subrun_root_task). E.g. batch to launch the sub-scheduler in AWS Batch. Note that this is a config key in the local scheduler’s config.
config (dict) – Optional sub-scheduler config dict. Must be a two-level dict that can be used to initialize a Config object (see Config.get_config_dict()). If None or empty, the local Scheduler’s config will be passed to the sub-scheduler and any values with local config_dir will be replaced with “.”. Do not include database credentials as they will be logged as clear text in the call graph.
config_dir (str) – Optional path to load a config from. Must be available in the execution context of the subrun.
new_execution (bool) – If True, record the provenance of the evaluation of expr as a new Execution, otherwise extend the current Execution with new Jobs.
load_modules (Optional[List[str]]) – If provided, an explicit list of the modules to load to prepare the subrun. If not supplied, the load_module for every task in the task registry will be included.
**task_options (Any) – Task options for _subrun_root_task. E.g. when running the sub-scheduler via Batch executor, you can pass ECS configuration (e.g. memory, vcpus, batch_tags).
- Returns:
Returns a dict result of evaluating the _subrun_root_task
- Return type:
Dict[str, Any]
- (task)redun.scheduler.throw(error: Exception) None ¶
Raises an exception.
This task is useful in cases where raising needs to be done lazily.
redun.scheduler_config module¶
- redun.scheduler_config.get_abs_config_dir(config_dir: str, cwd: str | None = None) str ¶
Returns absolute path to config_dir.
If config_dir is a relative path, it is assumed to be relative to cwd. cwd defaults to the current working directory.
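The resolution rule above can be illustrated with a minimal sketch. The helper name abs_config_dir is hypothetical and this is not redun's implementation; it only assumes that a relative config_dir is joined onto cwd and that an absolute one is returned unchanged.

```python
import os
from typing import Optional


def abs_config_dir(config_dir: str, cwd: Optional[str] = None) -> str:
    """Hypothetical helper: resolve config_dir against cwd."""
    if cwd is None:
        # Default to the current working directory, as described above.
        cwd = os.getcwd()
    # os.path.join discards cwd when config_dir is already absolute.
    return os.path.normpath(os.path.join(cwd, config_dir))
```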
- redun.scheduler_config.get_abs_db_uri(db_uri: str, config_dir: str, cwd: str | None = None) str ¶
Returns DB_URI with absolute path.
If db_uri is a relative path, it is assumed to be relative to config_dir. config_dir itself may be relative to the current working directory (cwd).
- redun.scheduler_config.get_abs_url(uri: str, base: str) str ¶
Returns URI with absolute path.
redun.scripting module¶
- exception redun.scripting.ScriptError(stderr: bytes)¶
Bases:
Exception
Error raised when user script returns failure (non-zero exit code).
- (task)redun.scripting._script(command: str, inputs: Any, outputs: Any, task_options: dict = {}, temp_path: str | None = None) Any ¶
Internal task for executing a script.
This task correctly implements reactivity to changing inputs and outputs. script_task() alone is unable to implement such reactivity because its only argument is a shell script string and its output is the stdout. Thus, the ultimate input and output files of the script are accessed outside the usual redun detection mechanisms (task arguments and return values).
To achieve the correct reactivity, script_task() is special-cased in the Scheduler to not use caching, in order to force it to always execute when called. Additionally, _script() is configured with check_valid="shallow" to skip execution of its child tasks, script_task() and postprocess_script(), if its previous outputs are still valid (i.e. not altered or deleted).
- redun.scripting.exec_script(command: str) bytes ¶
Run a script as a subprocess.
- redun.scripting.get_command_eof(command: str, eof_prefix: str = 'EOF') str ¶
Determine a safe end-of-file keyword to use for a given command to wrap.
- redun.scripting.get_task_command(task: Task, args: Tuple, kwargs: dict) str ¶
Get command from a script task.
- redun.scripting.get_wrapped_command(command: str, eof_prefix: str = 'EOF') str ¶
Returns a shell script for executing a script written in any language.
Consider command written in python:
    '''
    #!/usr/bin/env python
    print('Hello, World')
    '''
In order to turn this into a regular sh shell script, we need to write this command to a temporary file, mark the file as executable, execute the file, and remove the temporary file.
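The steps above (write to a temporary file, mark it executable, run it, remove it) can be sketched as plain string building. This is an illustration only, not redun's actual output: the helper names choose_eof and wrap_command, the use of mktemp, and the numeric suffix scheme for the heredoc delimiter are all assumptions.

```python
def choose_eof(command: str, eof_prefix: str = "EOF") -> str:
    """Pick a heredoc delimiter that does not occur anywhere in command."""
    eof = eof_prefix
    suffix = 0
    while eof in command:
        # Assumed scheme: append an increasing number until unused.
        suffix += 1
        eof = f"{eof_prefix}{suffix}"
    return eof


def wrap_command(command: str, eof_prefix: str = "EOF") -> str:
    """Build an sh script that runs command from a temporary file."""
    eof = choose_eof(command, eof_prefix)
    return "\n".join(
        [
            "(",
            'cmd_path="$(mktemp)"',
            # Quoted delimiter: the shell does not expand the script body.
            f"cat > \"$cmd_path\" <<'{eof}'",
            command,
            eof,
            'chmod +x "$cmd_path"',
            '"$cmd_path"',
            "status=$?",
            'rm -f "$cmd_path"',
            'exit "$status"',
            ")",
        ]
    )
```

Choosing the delimiter from the command text is what makes the wrapper safe when the wrapped script itself contains the string EOF.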
- (task)redun.scripting.postprocess_script(result: Any, outputs: Any, temp_path: str | None = None) Any ¶
Postprocess the results of a script task.
- redun.scripting.prepare_command(command: str, default_shell='#!/usr/bin/env bash\nset -exo pipefail') str ¶
Prepare a command string for execution by removing surrounding blank lines and dedenting.
Also if an interpreter is not specified, add the default shell as interpreter.
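A minimal sketch of this kind of normalization, assuming that "an interpreter is specified" means the command starts with a #! line after cleanup. The helper name prepare is hypothetical; redun's own prepare_command() may differ in detail.

```python
import textwrap

DEFAULT_SHELL = "#!/usr/bin/env bash\nset -exo pipefail"


def prepare(command: str, default_shell: str = DEFAULT_SHELL) -> str:
    """Hypothetical helper: strip blank lines, dedent, add a default shell."""
    lines = command.split("\n")
    # Drop leading and trailing blank lines.
    while lines and not lines[0].strip():
        lines.pop(0)
    while lines and not lines[-1].strip():
        lines.pop()
    # Remove the indentation common to all remaining lines.
    body = textwrap.dedent("\n".join(lines))
    # Assumption: a shebang on the first line marks an explicit interpreter.
    if not body.startswith("#!"):
        body = default_shell + "\n" + body
    return body
```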
- redun.scripting.script(command: str | ~typing.List, inputs: ~typing.Any = [], outputs: ~typing.Any = <object object>, tempdir: bool = False, as_mount: bool = False, **task_options: ~typing.Any) Any ¶
Execute a shell script as a redun task with file staging.
- See the docs for a full explanation:
- Parameters:
command (Union[str, List]) – Command string or argv list to execute.
inputs (Any) – Collection of FileStaging objects used to stage cloud input files to local files.
outputs (Any) – Collection of FileStaging objects used to unstage local output files back to cloud storage.
tempdir (bool) – If True, run the command within a temporary directory.
as_mount (bool) – If True, make use of cloud storage mounting (if available) to stage files.
**task_options (Any) – Options to configure the Executor, such as vcpus=2 or memory=3.
- Returns:
A result the same shape as outputs but with all FileStaging objects converted to their corresponding remote Files.
- Return type:
Any
- (task)redun.scripting.script_task(command: str) str ¶
Execute a shell script as redun Task.
redun.sphinx module¶
Sphinx documentation plugin used to document redun tasks.
Introduction¶
Usage¶
Add the extension to your docs/conf.py
configuration module:
extensions = (...,
'redun.sphinx')
If you’d like to change the prefix for tasks in reference documentation
then you can change the redun_task_prefix
configuration value:
redun_task_prefix = '(task)' # < default
With the extension installed, autodoc will automatically find task-decorated objects (e.g. when using the automodule directive) and generate the correct documentation (as well as add a (task) prefix). You can also refer to the tasks using the :task:proj.tasks.add syntax.
Use .. autotask::
to alternatively manually document a task.
- class redun.sphinx.SchedulerTaskDirective(name, arguments, options, content, lineno, content_offset, block_text, state, state_machine)¶
Bases:
PyFunction
Sphinx scheduler_task directive.
- get_signature_prefix(sig: str) List[Node] ¶
May return a prefix to put before the object name in the signature.
- class redun.sphinx.SchedulerTaskDocumenter(directive: DocumenterBridge, name: str, indent: str = '')¶
Bases:
TaskDocumenter
Document redun scheduler_task definitions.
- classmethod can_document_member(member, membername, isattr, parent) bool ¶
Called to see if a member can be documented by this Documenter.
- format_args(**kwargs: Any) str ¶
Returns a string documenting the task’s parameters.
- member_order = 11¶
order if autodoc_member_order is set to ‘groupwise’
- objtype = 'scheduler_task'¶
name by which the directive is called (auto…) and the default generated directive name
- class redun.sphinx.TaskDirective(name, arguments, options, content, lineno, content_offset, block_text, state, state_machine)¶
Bases:
PyFunction
Sphinx task directive.
- get_signature_prefix(sig: str) List[Node] ¶
May return a prefix to put before the object name in the signature.
- class redun.sphinx.TaskDocumenter(directive: DocumenterBridge, name: str, indent: str = '')¶
Bases:
FunctionDocumenter
Document redun task definitions.
- classmethod can_document_member(member, membername, isattr, parent) bool ¶
Called to see if a member can be documented by this Documenter.
- check_module() bool ¶
Check if self.object is really defined in the module given by self.modname.
- document_members(all_members=False)¶
Generate reST for member documentation.
If all_members is True, document all members, else those given by self.options.members.
- format_args(**kwargs: Any) str ¶
Returns a string documenting the task’s parameters.
- get_doc(ignore: int | None = None) List[List[str]] ¶
Returns the formatted docstring for the task.
- member_order = 11¶
order if autodoc_member_order is set to ‘groupwise’
- objtype = 'task'¶
name by which the directive is called (auto…) and the default generated directive name
- redun.sphinx.autodoc_skip_member_handler(app, what, name, obj, skip, options) bool | None ¶
Handler for autodoc-skip-member event.
- redun.sphinx.setup(app)¶
Setup Sphinx extension.
redun.task module¶
- class redun.task.CacheCheckValid(value)¶
Bases:
Enum
Types of validity checking for cache hits.
- FULL = 'full'¶
- SHALLOW = 'shallow'¶
- class redun.task.CacheResult(value)¶
Bases:
Enum
Types of cache hits and misses.
- CSE = 'CSE'¶
- MISS = 'MISS'¶
- SINGLE = 'SINGLE'¶
- ULTIMATE = 'ULTIMATE'¶
- class redun.task.CacheScope(value)¶
Bases:
Enum
Scopes within which cached results may be reused.
- BACKEND = 'BACKEND'¶
- CSE = 'CSE'¶
- NONE = 'NONE'¶
- class redun.task.PartialTask(task: Task[Func2], args: tuple, kwargs: dict)¶
Bases:
Task[Func], Generic[Func, Func2]
A Task with only some arguments partially applied.
The type of this class is parameterized by Func and Func2, where Func2 is the type of the original function and Func is the type of partially applied function. They should match on their return types.
- get_task_option(option_name: str) Any | None ¶
- get_task_option(option_name: str, default: T) T
Fetch the requested option, preferring run-time updates over options from task construction. Like the dictionary get method, returns the default rather than raising a KeyError on missing keys.
- get_task_options() dict ¶
Merge and return the task options.
- has_task_option(option_name: str) bool ¶
Return true if the task has an option with name option_name
- is_valid() bool ¶
Returns True if the Task Value is still valid (task hash matches registry).
Tasks are first-class Values in redun. They can be cached and fetched in future executions. When fetching a Task from the cache, the cached hash might no longer exist in the code base (registered tasks).
- partial(*args, **kwargs) PartialTask[Callable[[...], Result], Callable[[...], Result]] ¶
Partially apply some arguments to the Task.
- type_name: str | None = 'redun.PartialTask'¶
- class redun.task.SchedulerTask(func: Callable, name: str | None = None, namespace: str | None = None, version: str | None = None, compat: List[str] | None = None, script: bool = False, task_options_base: dict | None = None, task_options_override: dict | None = None, hash_includes: list | None = None, source: str | None = None)¶
Bases:
Task[Func]
A Task that executes within the scheduler to allow custom evaluation.
- type_name: str | None = 'redun.task.SchedulerTask'¶
- class redun.task.Task(func: Callable, name: str | None = None, namespace: str | None = None, version: str | None = None, compat: List[str] | None = None, script: bool = False, task_options_base: dict | None = None, task_options_override: dict | None = None, hash_includes: list | None = None, source: str | None = None)¶
Bases:
Value, Generic[Func]
A redun Task.
Tasks are the basic unit of execution in redun. Tasks are often defined using the redun.task.task() decorator:
    @task()
    def my_task(x: int) -> int:
        return x + 1
Similar to pickling of functions, Tasks specify the work to execute by reference, not by value. This is important for serialization and caching, since a task is always reattached to the latest implementation and task options just-in-time.
Task options may give direction to the scheduler about the execution environment, but must not alter the result of the function, since changing options will not trigger cache invalidation and we are allowed to return cached results with differing options. These can be provided at two times: 1) at module-load time by decorator options, 2) at run-time by creating task expressions. The former are not serialized, so they are attached to the latest value in the source code. The latter must be serialized, since they are not available except at run-time.
Tasks are routinely serialized and deserialized and are cached based on their serialization. This relies on the serialized format being as minimal as possible, while providing the needed reference to the code to run.
If needed, extra hashable data may be provided to trigger re-evaluation/cache-invalidation.
- property fullname: str¶
Returns the fullname of a Task: ‘{namespace}.{name}’
- get_hash(data: bytes | None = None) str ¶
Returns the Task hash.
- get_task_option(option_name: str) Any | None ¶
- get_task_option(option_name: str, default: T) T
Fetch the requested option, preferring run-time updates over options from task construction. Like the dictionary get method, returns the default rather than raising a KeyError on missing keys.
- get_task_options() dict ¶
Merge and return the task options.
- has_task_option(option_name: str) bool ¶
Return true if the task has an option with name option_name
- is_valid() bool ¶
Returns True if the Task Value is still valid (task hash matches registry).
Tasks are first-class Values in redun. They can be cached and fetched in future executions. When fetching a Task from the cache, the cached hash might no longer exist in the code base (registered tasks).
- property load_module: str¶
- property nout: int | None¶
Determines nout from task options and return type.
The precedence is:
- the task option
- function return type
- partial(*args, **kwargs) PartialTask[Callable[[...], Result], Callable[[...], Result]] ¶
Partially apply some arguments to the Task.
- recompute_hash()¶
- property signature: Signature¶
Signature of the function wrapped by the task.
- type_name: str | None = 'redun.Task'¶
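The option lookup described for get_task_option(), get_task_options() and has_task_option() can be pictured with a small stand-in class. OptionHolder is hypothetical and is not the Task class; it assumes a shallow merge in which run-time (override) options take precedence over options given at construction (base).

```python
from typing import Any, Dict, Optional


class OptionHolder:
    """Hypothetical stand-in that mimics two layers of task options."""

    def __init__(
        self,
        base: Optional[Dict[str, Any]] = None,
        override: Optional[Dict[str, Any]] = None,
    ) -> None:
        self.base = dict(base or {})          # set at construction time
        self.override = dict(override or {})  # set at run-time

    def get_task_options(self) -> Dict[str, Any]:
        # Assumed shallow merge: later (override) keys win.
        return {**self.base, **self.override}

    def has_task_option(self, name: str) -> bool:
        return name in self.override or name in self.base

    def get_task_option(self, name: str, default: Any = None) -> Any:
        # Like dict.get: fall back to the default instead of raising KeyError.
        if name in self.override:
            return self.override[name]
        return self.base.get(name, default)
```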
- class redun.task.TaskRegistry¶
Bases:
object
A registry of currently registered Tasks. The @task() decorator registers tasks to the current registry.
- redun.task.get_task_registry()¶
Returns the global task registry.
- redun.task.get_tuple_type_length(tuple_type: Any) int | None ¶
Returns the length of a tuple type if inferable.
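As a rough illustration of what "inferable" can mean here, the sketch below inspects a typing annotation with the standard typing helpers. The function name tuple_type_length is hypothetical, and treating variadic tuples such as Tuple[int, ...] as non-inferable is an assumption.

```python
from typing import Any, Optional, get_args, get_origin


def tuple_type_length(tuple_type: Any) -> Optional[int]:
    """Hypothetical helper: length of a fixed-size tuple annotation."""
    if get_origin(tuple_type) is not tuple:
        # Not a parameterized tuple type at all.
        return None
    args = get_args(tuple_type)
    if not args or Ellipsis in args:
        # Bare Tuple or variadic Tuple[X, ...]: length is unknown.
        return None
    return len(args)
```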
- redun.task.hash_args_eval(type_registry: TypeRegistry, task: Task, args: tuple, kwargs: dict) Tuple[str, str] ¶
Compute eval_hash and args_hash for a task call, filtering out config args before hashing.
- redun.task.scheduler_task(name: str | None = None, namespace: str | None = None, version: str = '1', **task_options_base: Any) Callable[[Callable[[...], Promise[Result]]], SchedulerTask[Callable[[...], Result]]] ¶
Decorator to register a function as a scheduler task.
Unlike usual tasks, scheduler tasks are lower-level tasks that are evaluated within the Scheduler and allow defining custom evaluation semantics. For example, one can implement cond(), seq() and catch() using scheduler tasks.
When evaluated, scheduler tasks are called with a reference to the Scheduler, the parent Job, and the full SchedulerExpression as its first three arguments. Its remaining arguments are the same as those passed from the user; however, they are not evaluated and may contain Expressions. It is the responsibility of the scheduler task to explicitly evaluate arguments by using Scheduler.evaluate() as needed. Overall, this allows the scheduler task to implement custom evaluation semantics. Lastly, the scheduler task must return a Promise that resolves to the result of the task.
This concept corresponds to fexpr in Lisp: https://en.wikipedia.org/wiki/Fexpr
For example, one could implement a lazy if-statement called cond using this scheduler task:
    @scheduler_task()
    def cond(
        scheduler: Scheduler,
        parent_job: Job,
        scheduler_expr: SchedulerExpression,
        pred_expr: Any,
        then_expr: Any,
        else_expr: Any,
    ) -> Promise:
        def then(pred):
            if pred:
                return scheduler.evaluate(then_expr, parent_job=parent_job)
            else:
                return scheduler.evaluate(else_expr, parent_job=parent_job)

        return scheduler.evaluate(pred_expr, parent_job=parent_job).then(then)
Once defined, the new cond expression can be used like this:
    @task()
    def main():
        result = task1()
        return cond(result, task2(), task3())
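The fexpr idea, where arguments reach the operator unevaluated, can be shown without redun at all. The sketch below is only an analogy: it models unevaluated expressions as zero-argument callables, and the names lazy_cond and branch are hypothetical. It uses neither the Scheduler nor Promise.

```python
from typing import Callable, List, TypeVar

T = TypeVar("T")

# Records which branches were actually evaluated.
calls: List[str] = []


def lazy_cond(
    pred: Callable[[], bool],
    then: Callable[[], T],
    otherwise: Callable[[], T],
) -> T:
    """Evaluate the predicate first, then only the selected branch."""
    return then() if pred() else otherwise()


def branch(name: str) -> str:
    calls.append(name)
    return name


result = lazy_cond(lambda: True, lambda: branch("then"), lambda: branch("else"))
```

Only the chosen branch runs, which is the property an eagerly evaluated function call cannot provide.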
- redun.task.split_task_fullname(task_fullname: str) Tuple[str, str] ¶
Split a Task fullname into a namespace and name.
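A minimal sketch of such a split, assuming the name is the part after the last dot and that a fullname without a dot yields an empty namespace. Both assumptions, and the helper name split_fullname, are illustrative rather than taken from redun.

```python
from typing import Tuple


def split_fullname(task_fullname: str) -> Tuple[str, str]:
    """Hypothetical helper: split 'namespace.name' on the last dot."""
    if "." in task_fullname:
        namespace, name = task_fullname.rsplit(".", 1)
        return namespace, name
    # Assumption: no dot means there is no namespace.
    return "", task_fullname
```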
- redun.task.task(func: Func) Task[Func] ¶
- redun.task.task(*, name: str | None = None, namespace: str | None = None, version: str | None = None, compat: List[str] | None = None, script: bool = False, hash_includes: list | None = None, source: str | None = None, **task_options_base: Any) Callable[[Func], Task[Func]]
Decorator to register a function as a redun Task.
- Parameters:
func (Optional[Func]) – A python function to register as a redun Task. If not given, a parameterized decorator is returned.
name (Optional[str]) – Name of task (Default: infer from function func.__name__)
namespace (Optional[str]) – Namespace of task (Default: Infer from context if possible, else None. See Task._compute_namespace)
version (Optional[str]) – Optional manual versioning for a task (Default: source code of task is hashed).
compat (Optional[List[str]]) – Optional redun version compatibility. Not currently implemented.
script (bool) – If True, this is a script-style task which returns a shell script string.
hash_includes (Optional[list]) – If provided, extra data that should be hashed. That is, extra data that should be considered as part of cache invalidation. This list may be reordered without impacting the computation. Each list item must be hashable by redun.value.TypeRegistry.get_hash.
source (Optional[str]) – If provided, task.source will be set to this string. It is the caller’s responsibility to ensure that source matches the provided func for proper hashing.
**task_options_base (Any) –
Additional options for configuring a task or specifying behaviors of tasks. Since these are provided at task construction time (this is typically at Python module-load time), they are the “base” set. Example keys:
- load_module (Optional[str])
The module to load to import this task. (Default: infer from func.__module__)
- wrapped_task (Optional[Task])
If present, a reference to the task wrapped by this one.
- redun.task.undefined_task(fullname: str, *args: Any, **kwargs: Any) None ¶
Default function used for a deserialized Task with unknown definition.
- redun.task.wraps_task(wrapper_name: str | None = None, wrapper_hash_includes: list = [], use_wrapper_signature: bool = False, **wrapper_task_options_base: Any) Callable[[Callable[[Task[Func]], Func]], Callable[[Func], Task[Func]]] ¶
A helper for creating new decorators that can be used like @task, that allow us to wrap the task in another one. Conceptually inspired by @functools.wraps, which makes it easier to create decorators that enclose functions.
Specifically, this helps us create a decorator that accepts a task and wraps it. The task passed in is moved into an inner namespace, hiding it. Then a new task is created from the wrapper implementation that assumes its identity; the wrapper is given access to both the run-time arguments and the hidden task definition. Since Tasks may be wrapped repeatedly, the hiding step is recursive. The load_module for all of the layers is dictated by the innermost concrete Task object.
A simple usage example is a new decorator @doubled_task, which simply runs the original task and multiplies by two.
    def doubled_task() -> Callable[[Func], Task[Func]]:
        # The name of this inner function is used to create the nested namespace,
        # so idiomatically, use the same name as the decorator with a leading underscore.
        @wraps_task()
        def _doubled_task(inner_task: Task) -> Callable[[Task[Func]], Func]:
            # The behavior when the task is run. Note we have both the task and the
            # runtime args.
            def do_doubling(*task_args, **task_kwargs) -> Any:
                return 2 * inner_task.func(*task_args, **task_kwargs)

            return do_doubling

        return _doubled_task

    # The simplest use is to wrap a task that is already created
    @doubled_task()
    @task()
    def value_task1(x: int):
        return 1 + x

    # We can skip the inner decorator and the task will get created implicitly
    # Use the explicit form if you need to pass arguments to task creation.
    @doubled_task()
    def value_task2(x: int):
        return 1 + x

    # We can keep going
    @doubled_task()
    @doubled_task()
    def value_task3(x: int):
        return 1 + x
There is an additional subtlety if the wrapper itself accepts arguments. These must be passed along to the wrapper so they are visible to the scheduler. Needing to do this manually is the cost of the extra powers we have.
    # An example of arguments consumed by the wrapper
    def wrapper_with_args(wrapper_arg: int) -> Callable[[Func], Task[Func]]:
        # WARNING: Be sure to pass the extra data for hashing so it participates in the cache
        # evaluation
        @wraps_task(wrapper_hash_includes=[wrapper_arg])
        def _wrapper_with_args(inner_task: Task) -> Callable[[Task[Func]], Func]:
            def do_wrapper_with_args(*task_args, **task_kwargs) -> Any:
                return wrapper_arg * inner_task.func(*task_args, **task_kwargs)

            return do_wrapper_with_args

        return _wrapper_with_args
- Parameters:
wrapper_name (Optional[str]) – The name of the wrapper, which is used to create the inner namespace. (Default: infer from the wrapper wrapper_func.__name__)
wrapper_task_options_base (Any) – Additional options for the wrapper task.
wrapper_hash_includes (Optional[list]) – If provided, extra data that should be hashed. That is, extra data that should be considered as part of cache invalidation. This list may be reordered without impacting the computation. Each list item must be hashable by redun.value.TypeRegistry.get_hash
use_wrapper_signature (bool) – By default (False), the resulting task will keep the parameter signature of its inner most wrapped task. Set this to True, in order to use the parameter signature of the wrapper task. Signatures are used to automatically infer command line options, among other things.
redun.tools module¶
- (task)redun.tools.copy_dir(src_dir: redun.file.Dir, dest_path: str, skip_if_exists: bool = False, copy_options: Dict = {}) redun.file.Dir ¶
Copy a Dir to a new path.
File copies will be done in parallel.
- (task)redun.tools.copy_file(src_file: redun.file.File, dest_path: str, skip_if_exists: bool = False) redun.file.File ¶
Copy a File to a new path.
This task can help in parallelizing file copies and providing data provenance.
- (task)redun.tools.copy_files(src_files: List[redun.file.File], dest_paths: List[str], skip_if_exists: bool = False, copy_options: Dict = {}) List[redun.file.File] ¶
Copy multiple Files to new paths.
File copies will be done in parallel.
- (task)redun.tools.debug_task(value: T) T ¶
Launch pdb debugger during one point of the redun workflow.
For example, to inspect the lazy value x in the workflow below we can pass it through debug_task():
    x = task1()
    x = debug_task(x)
    y = task2(x)
- (task)redun.tools.render_template(out_path: str, template: redun.file.File, context: dict, template_dir: str | None = None) redun.file.File ¶
Render a jinja2 template.
redun.utils module¶
- class redun.utils.CacheArgs(cache_key, args, kwargs)¶
Bases:
object
Arguments for a cached function.
- class redun.utils.Comparable(*args, **kwargs)¶
Bases:
Protocol
Protocol for annotating comparable types.
- class redun.utils.MultiMap(items: Iterable[Tuple[Key, Value] | List] = ())¶
Bases:
Generic[Key, Value]
An associative array with repeated keys.
- add(key: Key, value: Value) None ¶
- as_dict() Dict[Key, List[Value]] ¶
- get(key: ~redun.utils.Key, default: ~typing.Any = <object object>) List[Value] ¶
- has_item(key: Key, value: Value) bool ¶
- items() Iterator[Tuple[Key, Value]] ¶
- keys() Iterator[Key] ¶
- values() Iterator[Value] ¶
- class redun.utils.PreviewClass(*args, **kwargs)¶
Bases:
ABC
Generic class to use for unknown classes in a pickle
- class redun.utils.PreviewUnpicklable(error)¶
Bases:
object
- class redun.utils.PreviewingUnpickler(*args, **kwargs)¶
Bases:
Unpickler
A specialized unpickler that allows previewing of class attributes for unknown classes.
- find_class(module: str, name: str) Type ¶
Return an object from a specified module.
If necessary, the module will be imported. Subclasses may override this method (e.g. to restrict unpickling of arbitrary classes and functions).
This method is called whenever a class or a function object is needed. Both arguments passed are str objects.
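The general technique of overriding find_class() to tolerate unknown classes can be sketched with the standard library alone. FallbackUnpickler, Placeholder and demo are hypothetical names and the placeholder behaviour is an assumption; this is not how PreviewingUnpickler is implemented, only an illustration of the hook it relies on.

```python
import io
import pickle
import sys
import types


class Placeholder:
    """Stand-in base class for classes that cannot be imported."""


class FallbackUnpickler(pickle.Unpickler):
    def find_class(self, module: str, name: str):
        try:
            return super().find_class(module, name)
        except (ImportError, AttributeError):
            # Unknown class: build a placeholder that records its origin.
            return type(name, (Placeholder,), {"_origin": f"{module}.{name}"})


def demo() -> object:
    # Register a throwaway module with one class, pickle an instance,
    # then remove the module so the class is no longer importable.
    mod = types.ModuleType("vanished_mod")
    cls = type("Widget", (), {"__module__": "vanished_mod"})
    mod.Widget = cls
    sys.modules["vanished_mod"] = mod
    obj = cls()
    obj.size = 3
    data = pickle.dumps(obj)
    del sys.modules["vanished_mod"]
    return FallbackUnpickler(io.BytesIO(data)).load()
```

The object returned by demo() is an instance of a generated Placeholder subclass whose attributes can still be inspected, even though the original class is gone.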
- redun.utils.add_import_path(path: str) None ¶
Add a path to the python import paths.
- redun.utils.assert_never(value: NoReturn) NoReturn ¶
Helper method for exhaustive type checking.
https://hakibenita.com/python-mypy-exhaustive-checking#type-narrowing-in-mypy
- redun.utils.clear_import_paths() None ¶
Clear all extra python import paths.
- redun.utils.format_table(table: List[List], justs: str, min_width: int = 0) Iterator[str] ¶
Format table with justified columns.
- redun.utils.format_timedelta(duration: timedelta) str ¶
Format timedelta as a string.
- redun.utils.format_timestamp(timestamp: datetime, tz: tzinfo | None = None) str ¶
Returns a timestamp as a local time string.
If tz is given, use it as the local timezone, otherwise use system local timezone.
- redun.utils.get_func_source(func: Callable) str ¶
Return the source code for a function.
- redun.utils.get_import_paths() List[str] ¶
Returns extra import paths that have been added.
- redun.utils.iter_nested_value(value: Any) Iterable[Any] ¶
Iterate through the leaf values of a nested value.
- redun.utils.iter_nested_value_children(value: Any) Iterable[Tuple[bool, Any]] ¶
Helper function that iterates through the children of a possibly nested value.
Yields: (is_leaf: Bool, value: Any)
- redun.utils.json_cache_key(args: tuple, kwargs: dict) str ¶
Returns a cache key for arguments that are JSON compatible.
- redun.utils.json_dumps(value: Any) str ¶
Convert JSON-like values into a normalized string.
Keys are sorted and no whitespace is used around delimiters.
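With the standard json module, this kind of normalization can be sketched as follows. The helper name normalized_dumps is hypothetical; whether redun passes exactly these arguments is an assumption.

```python
import json
from typing import Any


def normalized_dumps(value: Any) -> str:
    """Serialize with sorted keys and no whitespace around delimiters."""
    return json.dumps(value, sort_keys=True, separators=(",", ":"))
```

A normalized form like this gives equal strings for dicts that differ only in key order, which is what makes it usable as a cache key.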
- redun.utils.lru_cache_custom(maxsize: int, cache_key=<function <lambda>>)¶
LRU cache with custom cache key.
- redun.utils.map_nested_value(func: Callable, value: Any) Any ¶
Map a func to the leaves of a nested value.
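Iterating over and mapping the leaves of a nested value can be sketched recursively. The helper names iter_leaves and map_leaves are hypothetical, and the sketch assumes that only lists, tuples and dict values count as containers; redun's own functions may treat more types (or dict keys) as nested.

```python
from typing import Any, Callable, Iterator


def iter_leaves(value: Any) -> Iterator[Any]:
    """Yield every non-container value found inside value."""
    if isinstance(value, (list, tuple)):
        for item in value:
            yield from iter_leaves(item)
    elif isinstance(value, dict):
        for item in value.values():
            yield from iter_leaves(item)
    else:
        yield value


def map_leaves(func: Callable[[Any], Any], value: Any) -> Any:
    """Apply func to every leaf while keeping the container shape."""
    if isinstance(value, list):
        return [map_leaves(func, item) for item in value]
    if isinstance(value, tuple):
        return tuple(map_leaves(func, item) for item in value)
    if isinstance(value, dict):
        return {key: map_leaves(func, item) for key, item in value.items()}
    return func(value)
```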
- redun.utils.merge_dicts(dicts: List[T]) T ¶
Merge a list of (nested) dicts into a single dict.
    assert merge_dicts([
        {"a": 1, "b": 2},
        {"b": 3, "c": 4},
    ]) == {"a": 1, "b": 3, "c": 4}
    assert merge_dicts([
        {"a": {"a1": 1}},
        {"a": {"a2": 2}},
    ]) == {"a": {"a1": 1, "a2": 2}}
- redun.utils.pickle_dump(obj: Any, file: IO) None ¶
Official pickling method for redun.
This is used to standardize the protocol used for serialization.
- redun.utils.pickle_dumps(obj: Any) bytes ¶
Official pickling method for redun.
This is used to standardize the protocol used for serialization.
- redun.utils.pickle_loads(data: bytes) Any ¶
Official unpickling method for redun.
This is used to allow pickle previews when a preview context is active.
- redun.utils.pickle_preview(data: bytes, raise_error: bool = False) Any ¶
Loads a pickle file, falling back to previewing objects as needed.
For example, if the class is unknown, will return a PreviewClass subclass that can be used to peek at attributes.
- redun.utils.str2bool(text: str) bool ¶
Parse a string into a bool.
- redun.utils.trim_string(text: str, max_length: int = 200, ellipsis: str = '...') str ¶
If text is longer than max_length then return a trimmed string.
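A minimal sketch of trimming with an ellipsis. It assumes the ellipsis counts toward max_length, so the result is never longer than max_length; the documentation above does not state this, and the helper name trim is hypothetical.

```python
def trim(text: str, max_length: int = 200, ellipsis: str = "...") -> str:
    """Hypothetical helper: shorten text and mark the cut with an ellipsis."""
    if len(text) > max_length:
        # Assumption: leave room for the ellipsis inside max_length.
        return text[: max_length - len(ellipsis)] + ellipsis
    return text
```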
- redun.utils.utcnow() datetime ¶
Returns the current timestamp in UTC timezone.
- redun.utils.with_pickle_preview(raise_error: bool = False) Iterator[None] ¶
Enable pickle preview within a context.
We keep a count of how many nested contexts are active.
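Counting nested contexts, rather than storing a single flag, keeps the feature enabled until the outermost context exits. The sketch below shows that pattern with contextlib; the names preview_context and preview_enabled are hypothetical and the module-level counter is an assumption about how such state could be held.

```python
from contextlib import contextmanager
from typing import Iterator

# Number of currently active (possibly nested) contexts.
_active_contexts = 0


@contextmanager
def preview_context() -> Iterator[None]:
    global _active_contexts
    _active_contexts += 1
    try:
        yield
    finally:
        # Always decrement, even if the body raised.
        _active_contexts -= 1


def preview_enabled() -> bool:
    return _active_contexts > 0
```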
redun.value module¶
- class redun.value.Bool(instance: Any)¶
Bases:
ProxyValue
Augment builtins.bool to support argument parsing.
- classmethod parse_arg(raw_type: Type, arg: str) Any ¶
Parse a command line argument into a new Value.
- type¶
alias of
bool
- type_name: str | None = 'builtins.bool'¶
- class redun.value.DatetimeType(instance: Any)¶
Bases:
ProxyValue
Augment datetime.datetime to support argument parsing.
- classmethod parse_arg(raw_type: Type, arg: str) datetime ¶
Parse a command line argument into a new Value.
- type¶
alias of
datetime
- type_name: str | None = 'datetime.datetime'¶
- class redun.value.EnumType(instance: Any)¶
Bases:
ProxyValue
Augment enum.Enum to support argument parsing with choices.
- classmethod parse_arg(raw_type: Type, arg: str) Any ¶
Parse a command line argument into a new Value.
- type¶
alias of
Enum
- type_name: str | None = 'enum.Enum'¶
- class redun.value.FileCache(instance: Any)¶
Bases:
ProxyValue
Baseclass for caching values to Files.
This baseclass can be used to control where the serialized data of potentially large values is stored.
    # Example of user-specific data type.
    class Data:
        def __init__(self, data):
            self.data = data

    # Subclassing from FileCache and setting `type = Data` will register
    # with redun that File-caching should be used for any Value of type
    # `Data`.
    class DataType(FileCache):
        type = Data

        # This path specifies where the files containing serialized
        # values should be stored. This variable accepts all paths
        # acceptable to File (e.g. local, s3, etc).
        base_path = "tmp"

    @task()
    def task1() -> Data:
        # This value will be serialized to a File instead of the default
        # redun database.
        return Data("my data")
- base_path = '.'¶
- classmethod deserialize(raw_type: type, filename: bytes) Any ¶
Returns a deserialization of bytes data into a new Value.
- serialize() bytes ¶
Serializes the Value into a byte sequence.
- type_name: str | None = 'redun.value.FileCache'¶
- class redun.value.Function(instance: Callable)¶
Bases:
ProxyValue
Value class to allow redun to hash and cache plain Python functions.
- classmethod deserialize(raw_type: Any, data: bytes) Any ¶
Returns a deserialization of bytes data into a new Value.
- property fullname: str¶
Return a fullname for a function including its module.
- get_hash(data: bytes | None = None) str ¶
Returns a hash for the value.
- is_valid() bool ¶
Value is valid to use from the cache if we can reassociate the function.
- classmethod parse_arg(raw_type: Type, arg: str) Any ¶
Parses function by fully qualified name from command-line.
- serialize() bytes ¶
Serializes the Value into a byte sequence.
- type¶
alias of
LambdaType
- type_name: str | None = 'builtins.function'¶
- exception redun.value.InvalidValueError¶
Bases:
Exception
- class redun.value.MetaValue(name, bases, dct, **kwargs)¶
Bases:
type
Registers Value classes as they are defined.
- class redun.value.ProxyValue(instance: Any)¶
Bases:
Value
Class for overriding Value behavior (hashing, serialization) for raw python types.
- get_hash(data: bytes | None = None) str ¶
Returns a hash for the value.
- proxy = True¶
- serialize() bytes ¶
Serializes the Value into a byte sequence.
- type: Any = None¶
- type_name: str | None = 'redun.value.ProxyValue'¶
- class redun.value.Set(instance: Any)¶
Bases:
ProxyValue
Augment builtins.set to support stable hashing.
- get_hash(data: bytes | None = None) str ¶
Returns a hash for the value.
- type¶
alias of
set
- type_name: str | None = 'builtins.set'¶
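Stable hashing matters for sets because their iteration order is not guaranteed to be the same between runs. One common way to get an order-independent hash is to hash each element, sort the element hashes, and hash the result. The sketch below shows that technique with hashlib; the helper name stable_set_hash, the use of repr() and the choice of SHA-1 are assumptions, not redun's scheme.

```python
import hashlib
from typing import AbstractSet, Any


def stable_set_hash(items: AbstractSet[Any]) -> str:
    """Hash a set so the result does not depend on iteration order."""
    # Hash each element on its own (repr() is an illustrative choice).
    item_hashes = sorted(
        hashlib.sha1(repr(item).encode("utf-8")).hexdigest() for item in items
    )
    # Combine the sorted element hashes into one digest.
    digest = hashlib.sha1()
    for item_hash in item_hashes:
        digest.update(item_hash.encode("ascii"))
    return digest.hexdigest()
```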
- exception redun.value.TypeError¶
Bases:
Exception
- class redun.value.TypeRegistry¶
Bases:
object
A registry for redun types and a dispatch for value methods.
- deserialize(type_name: str, data: bytes) Any ¶
Returns a deserialization of bytes data into a new Value.
- get_hash(value: Any, data: bytes | None = None) str ¶
Hashes a value according to the hash method precedence.
- get_serialization_format(value: Any) str ¶
Returns mimetype of serialization.
- get_type(raw_type: Type, use_default: bool = True) Type[Value] | None ¶
Return a registered Value class for a raw python type.
- get_type_name(raw_type: Type) str ¶
Returns the type name for a python type.
- is_valid(value: Any) bool ¶
Returns True if the value is still good to use since being cached.
For example, if the value was a File reference and the corresponding file on the filesystem has since been removed/altered, the File reference is no longer valid.
- is_valid_nested(nested_value: Any) bool ¶
Returns True if nested value is still good to use since being cached.
- parse_arg(raw_type: Type, arg: str) Any ¶
Parse a command-line argument of type raw_type.
- parse_type_name(type_name: str) Type ¶
Parse a type name into a python type.
- postprocess(raw_value: Any, postprocess_args: dict) Any ¶
Post process a value resulting from a Task.
- preprocess(raw_value: Any, preprocess_args: dict) Any ¶
Preprocess a value before passing to a Task.
- serialize(value: Any) bytes ¶
Serializes the Value into a byte sequence.
- class redun.value.Value(*args, **kwargs)¶
Bases:
object
The base class for tracking inputs and outputs in redun, encapsulating both value and state, in a hashable and serializable form.
This class underpins the implementation of memoization of redun.Task objects. Hashing is used as the sole, unique identifier for a value; there is no fallback to deep equality testing. Hence, we can hash Task inputs to identify repeated invocations and supply the cached result instead. Serialization makes it feasible to cache outputs of tasks.
In addition to handling ordinary values, i.e., data or objects represented in memory, complex workflows often require reasoning about state that may be complex or external, such as a database or filesystem. Redun reasons over this state by giving it value-like semantics, after which redun can proceed without differentiating between values and state. This class provides that abstraction, and therefore has a more complex lifecycle than ordinary values would require.
The default hashing and serialization methods are designed for simple object values. Child classes may customize these as needed; see below for the API details.
We introduce two atypical lifecycle steps that are needed for more complex cases, especially those involving values that wrap state: validity checking and pre/post processing. The scheduler will arrange for pre- and post-processing hooks to be called on Value objects around their use in Task implementations. This allows users to implement more complex concepts, such as lazy initialization or temporary resource allocation.
See docs/source/design.md for more information about “Validity” of values, which is an important concept for this class.
- classmethod deserialize(raw_type: type, data: bytes) Any ¶
Returns a deserialization of bytes data into a new Value.
- get_hash(data: bytes | None = None) str ¶
Returns a hash for the value. Users may override to provide custom behavior.
Most basic objects are covered by the default approach of hashing the binary pickle serialization.
- get_serialization_format() str ¶
Returns mimetype of serialization.
- is_valid() bool ¶
Returns True if the value may be used. For ordinary values, this will typically always be true. For state-tracking values, the deserialized cache value may no longer be valid. If this method returns False, the object will typically be discarded.
This method may be called repeatedly on the same object.
For example, if the value was a File reference and the corresponding file on the filesystem has since been removed/altered, the File reference is no longer valid.
- classmethod parse_arg(raw_type: type, arg: str) Any ¶
Parses a command-line argument into a new Value.
- postprocess(postprocess_args: dict) Value ¶
Post process a value resulting from a Task, and return the postprocessed value.
This is a scheduler lifecycle method and is not typically invoked by users. This method may be called repeatedly on a particular instance, for example, if it is returned recursively.
- Parameters:
postprocess_args (dict) – Extra data provided by the scheduler. Implementations that are not part of redun should discard this argument.
- preprocess(preprocess_args: dict) Value ¶
Preprocess a value before passing it to a Task, and return the preprocessed value.
This is a scheduler lifecycle method and is not typically invoked by users. This method may be called repeatedly on a particular instance, for example, if a single Value is passed to multiple Tasks.
- Parameters:
preprocess_args (dict) – Extra data provided by the scheduler. Implementations that are not part of redun should discard this argument.
- serialize() bytes ¶
Serializes the Value into a byte sequence.
- type_name: str | None = 'redun.value.Value'¶
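The interplay of hashing, caching, and validity checking described for this class can be sketched without redun itself. Everything below is a hypothetical stand-in (`SketchValue`, `Text`, `FileRef`, `cached_call`, and the in-memory `cache` dict are assumptions for illustration); the real scheduler and backend are considerably more involved.

```python
import hashlib
import os
import pickle
import tempfile
from typing import Callable, Dict, List


class SketchValue:
    """Hypothetical stand-in for a Value: serializable, hashable, checkable."""

    def serialize(self) -> bytes:
        # Pickle the type name plus instance state.
        return pickle.dumps((type(self).__name__, sorted(vars(self).items())))

    def get_hash(self) -> str:
        # The hash of the serialization is the value's only identity.
        return hashlib.sha1(self.serialize()).hexdigest()

    def is_valid(self) -> bool:
        # Ordinary in-memory values are always usable.
        return True


class Text(SketchValue):
    def __init__(self, text: str):
        self.text = text


class FileRef(SketchValue):
    """State-tracking value: remembers a file's size and mtime."""

    def __init__(self, path: str):
        self.path = path
        stat = os.stat(path)
        self.fingerprint = (stat.st_size, stat.st_mtime_ns)

    def is_valid(self) -> bool:
        # Invalid if the file was removed or altered since it was recorded.
        try:
            stat = os.stat(self.path)
        except FileNotFoundError:
            return False
        return (stat.st_size, stat.st_mtime_ns) == self.fingerprint


cache: Dict[str, SketchValue] = {}


def cached_call(func: Callable, arg: SketchValue) -> SketchValue:
    """Reuse a cached result only if it exists and is still valid."""
    key = f"{func.__name__}:{arg.get_hash()}"
    result = cache.get(key)
    if result is not None and result.is_valid():
        return result
    result = func(arg)
    cache[key] = result
    return result


calls: List[str] = []
workdir = tempfile.mkdtemp()


def write_file(text: Text) -> FileRef:
    calls.append(text.text)
    path = os.path.join(workdir, "out.txt")
    with open(path, "w") as out:
        out.write(text.text)
    return FileRef(path)


first = cached_call(write_file, Text("hello"))
second = cached_call(write_file, Text("hello"))  # hit: same input hash
os.remove(first.path)
third = cached_call(write_file, Text("hello"))  # miss: cached FileRef invalid
```

Two separately constructed `Text("hello")` objects hit the same cache entry because only their hashes are compared, and deleting the output file forces re-execution even though the input hash is unchanged.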
- redun.value.get_raw_type_name(raw_type: Any) str ¶
- redun.value.get_type_registry() TypeRegistry ¶
Return the global singleton instance of TypeRegistry.
- redun.value.is_unknown_function(func: Callable) bool ¶
Returns True if the function was unknown when trying to reimport.
- redun.value.make_unknown_function(func_name: str) Callable ¶
Returns a stub function in place of a function that could not be reimported.
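One way to picture the stub mechanism is a loader that falls back to a tagged placeholder when a function's module or name can no longer be found. This is a sketch under assumptions: `load_function`, `make_stub`, `is_stub`, and the `unknown_function` marker attribute are hypothetical and do not describe redun's internals.

```python
import importlib
import os.path
from typing import Callable


def make_stub(func_name: str) -> Callable:
    """Build a placeholder that fails only when it is actually called."""

    def stub(*args, **kwargs):
        raise ValueError(f"Function '{func_name}' could not be reimported")

    stub.__name__ = func_name.rsplit(".", 1)[-1]
    stub.unknown_function = True  # hypothetical marker attribute
    return stub


def is_stub(func: Callable) -> bool:
    """True if func is a placeholder produced by make_stub."""
    return getattr(func, "unknown_function", False)


def load_function(fullname: str) -> Callable:
    """Import 'module.name', or return a stub if that is impossible."""
    module_name, _, name = fullname.rpartition(".")
    try:
        return getattr(importlib.import_module(module_name), name)
    except (ImportError, AttributeError, ValueError):
        return make_stub(fullname)


known = load_function("os.path.join")
unknown = load_function("no_such_module_xyz.some_func")
```

Deferring the failure to call time lets a cached record that references a missing function still be loaded and inspected, while the marker makes it possible to treat such a value as not valid for reuse.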
redun.version module¶
redun.visualize module¶
Module contents¶
- class redun.Dir(path: str)¶
Bases:
FileSet
- classes = <redun.file.FileClasses object>¶
- exists() bool ¶
- filesystem: FileSystem¶
- mkdir() None ¶
- rel_path(path: str) str ¶
- rmdir(recursive: bool = False) None ¶
- shell_copy_to(dest_path: str, as_mount: bool = False) str ¶
Returns a shell command for copying the directory to a destination path.
- Parameters:
dest_path (str) – Destination path to copy to.
as_mount (bool) – Treat src/dest as mounted.
- stage(local: str | None = None) StagingDir ¶
- type_basename = 'Dir'¶
- type_name: str | None = 'redun.Dir'¶
- class redun.File(path: str)¶
Bases:
Value
Class for assisting file IO in redun tasks.
File objects are hashed based on their contents and abstract over storage backends such as local disk or cloud object storage.
- basename() str ¶
- classes = <redun.file.FileClasses object>¶
- dirname() str ¶
- exists() bool ¶
- filesystem: FileSystem¶
- get_hash(data: bytes | None = None) str ¶
Returns a hash for the value. Users may override to provide custom behavior.
Most basic objects are covered by the default approach of hashing the binary pickle serialization.
- property hash: str¶
- is_valid() bool ¶
Returns True if the value may be used. For ordinary values, this will typically always be true. For state-tracking values, the deserialized cache value may no longer be valid. If this method returns False, the object will typically be discarded.
This method may be called repeatedly on the same object.
For example, if the value was a File reference and the corresponding file on the filesystem has since been removed/altered, the File reference is no longer valid.
- isdir() bool ¶
- isfile() bool ¶
- open(mode: str = 'r', encoding: str | None = None, **kwargs: Any) IO ¶
Open a file stream.