redun.executors package

Submodules

redun.executors.alias module

class redun.executors.alias.AliasExecutor(name: str, scheduler: Scheduler | None = None, config: SectionProxy | None = None, target: Executor | None = None)

Bases: Executor

A simple executor that lazily defers to another executor by name. This is useful when tasks are defined with differing executors, allowing each to be configured separately, but sometimes need to share an underlying executor; sharing can be important for enforcing per-executor resource limits.

Unfortunately, this implementation is not able to check the validity of the link at init time.
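
A usage sketch: a task can reference the alias while the real executor is chosen in config. This assumes a redun config that defines an executor named "alias_exec" of type alias whose target is another configured executor; the names are illustrative.

    from redun import task

    # "alias_exec" is assumed to be defined in the redun config as an alias
    # executor pointing at another configured executor.
    @task(executor="alias_exec")
    def step(x: int) -> int:
        return x + 1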

start() None
stop() None
submit(job: Job) None

Execute the provided job.

Implementations must provide results back to the scheduler by either calling done_job or reject_job.

submit_script(job: Job) None

Execute the provided script job.

Implementations must provide results back to the scheduler by either calling done_job or reject_job.

redun.executors.aws_batch module

class redun.executors.aws_batch.AWSBatchExecutor(name: str, scheduler: Scheduler | None = None, config: SectionProxy | None = None)

Bases: Executor

A redun Executor for running jobs on AWS Batch.

gather_inflight_jobs() None
get_array_child_jobs(job_id: str, statuses: List[str] = ['SUBMITTED', 'PENDING', 'RUNNABLE', 'STARTING', 'RUNNING']) List[Dict[str, Any]]
get_jobs(statuses: List[str] | None = None) Iterator[dict]

Returns AWS Batch Job statuses from the AWS API.

kill_jobs(job_ids: Iterable[str], reason: str = 'Terminated by user') Iterator[dict]

Kill AWS Batch Jobs.

scratch_root() str
set_scheduler(scheduler: Scheduler) None
stop() None

Stop Executor and monitoring thread.

submit(job: Job) None

Submit Job to executor.

submit_script(job: Job) None

Submit Job for script task to executor.

exception redun.executors.aws_batch.AWSBatchJobTimeoutError

Bases: Exception

Custom exception to raise when AWS Batch Jobs are killed due to timeout.

redun.executors.aws_batch.aws_describe_jobs(job_ids: List[str], chunk_size: int = 100, aws_region: str = 'us-west-2') Iterator[dict]

Returns AWS Batch Job descriptions from the AWS API.

redun.executors.aws_batch.batch_submit(batch_job_args: dict, queue: str, image: str, job_def_name: str | None = None, job_def_suffix: str = '-jd', job_name: str = 'batch-job', array_size: int = 0, memory: int = 4, vcpus: int = 1, gpus: int = 0, num_nodes: int | None = None, shared_memory: int | None = None, retries: int = 1, role: str | None = None, job_def_extra: dict | None = None, aws_region: str = 'us-west-2', privileged: bool = False, autocreate_job_def: bool = True, timeout: int | None = None, batch_tags: Dict[str, str] | None = None, propagate_tags: bool = True, share_id: str | None = None, scheduling_priority_override: str | None = None) Dict[str, Any]

Actually perform job submission to AWS batch. Create or retrieve the job definition, then use it to submit the job.

Parameters:
  • batch_job_args (dict) – These are passed as kwargs to the submit_job API. Generally, it must configure the commands to be run by setting node or container overrides. This function will modify the provided dictionary, at a minimum applying overrides for the resource requirements stated by other arguments to this function.
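
A minimal sketch of a batch_job_args value for a single-node job, following the AWS Batch submit_job API; the command is illustrative, and batch_submit will apply resource overrides on top of it.

    # Illustrative kwargs for submit_job; batch_submit modifies this dict to
    # apply resource requirement overrides.
    batch_job_args = {
        "containerOverrides": {
            "command": ["echo", "hello"],
        },
    }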

redun.executors.aws_batch.create_job_override_command(command: List[str], command_worker: List[str] | None = None, num_nodes: int | None = None) Dict[str, Any]

Format the command into the form needed for the AWS Batch submit_job API.

Parameters:
  • command (List[str]) – A list of tokens comprising the command.

  • command_worker (Optional[List[str]]) – List of command tokens for worker nodes, if present.

  • num_nodes (Optional[int]) – If None, this is a single-node job. If not None, a multi-node job.

Returns a dictionary to be passed to submit_job as kwargs.
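
For instance (a sketch with an illustrative command):

    from redun.executors.aws_batch import create_job_override_command

    # num_nodes is None, so this is formatted as a single-node job override.
    overrides = create_job_override_command(command=["echo", "hello"])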

redun.executors.aws_batch.ecr_push(repo_name: str, registry: str | None = None) None
redun.executors.aws_batch.equiv_job_def(job_def1: dict, job_def2: dict) bool

Returns True if two job definitions are equivalent.

Limit equality to the keys of job_def1.

redun.executors.aws_batch.format_log_stream_event(event: dict) str

Format a logStream event as a line.

redun.executors.aws_batch.get_batch_job_name(prefix: str, job_hash: str, array: bool = False) str

Return an AWS Batch Job name built from a prefix and a job hash.

redun.executors.aws_batch.get_batch_job_options(job_options: dict) dict

Returns AWS Batch-specific job options from general job options.

redun.executors.aws_batch.get_default_registry() str

Returns the default ECR registry.

redun.executors.aws_batch.get_hash_from_job_name(job_name: str) str | None

Returns the job/task eval_hash that corresponds with a particular job name on Batch.

redun.executors.aws_batch.get_job_def_revision(job_def_name: str) int

Returns the job definition revision from a job definition name.

redun.executors.aws_batch.get_job_definition(job_def_name: str, aws_region: str = 'us-west-2', batch_client: Session = None) dict

Returns a job definition with the supplied name, or an empty dict if no matches are found.

If more than one revision is returned from the API, the returned job definition will be the most recent active revision.

redun.executors.aws_batch.get_job_details(image: str, command: List[str] | None = None, memory: int = 4, vcpus: int = 1, num_nodes: int | None = None, shared_memory: int | None = None, role: str | None = None, aws_region: str = 'us-west-2', privileged: bool = False) dict

Returns a JSON-compatible dict that can be used for creating a job definition.

redun.executors.aws_batch.get_job_log_stream(job: dict | None, aws_region: str) str | None

Extract the log stream from a JobDetail status dictionary. For non-multi-node jobs (i.e., single-node and array jobs), this is simply a field in the detail dictionary. But for multi-node jobs, this requires another query to get the log stream for the main node.

redun.executors.aws_batch.get_or_create_job_definition(job_def_name: str, image: str, command: List[str] | None = None, memory: int = 4, vcpus: int = 1, num_nodes: int | None = None, shared_memory: int | None = None, role: str | None = None, aws_region: str = 'us-west-2', privileged: bool = False, job_def_extra: dict | None = None) dict

Returns a job definition with the specified requirements. Although the resource requirements provided are used when creating a job, they are specifically excluded from creating new job definitions.

Either an existing active job definition is used or a new one is created.

Parameters:
  • num_nodes (Optional[int]) – If present, create a multi-node batch job.

redun.executors.aws_batch.get_or_create_repo(repo_name: str) dict

Get or create an ECR repository.

redun.executors.aws_batch.is_array_job_name(job_name: str) bool
redun.executors.aws_batch.iter_batch_job_log_lines(job_id: str, aws_region: str = 'us-west-2', log_group_name: str = '/aws/batch/job', reverse: bool = False, required: bool = True) Iterator[str]

Iterate through the log lines of an AWS Batch job.

redun.executors.aws_batch.iter_batch_job_logs(job_id: str, aws_region: str = 'us-west-2', log_group_name: str = '/aws/batch/job', limit: int | None = None, reverse: bool = False, required: bool = True) Iterator[dict]

Iterate through the log events of an AWS Batch job.

redun.executors.aws_batch.iter_batch_job_status(job_ids: List[str], pending_truncate: int = 10, aws_region: str = 'us-west-2') Iterator[dict]

Yields AWS Batch jobs statuses.

If pending_truncate is used (> 0), we rely on AWS Batch’s behavior of running jobs approximately in order. This allows us to truncate the polling of jobs once we see a sufficient number of pending jobs.

Parameters:
  • job_ids (List[str]) – Batch job ids that should be in order of submission.

  • pending_truncate (int) – After seeing pending_truncate number of pending jobs, assume the rest are pending. Use a negative int to disable this optimization.

  • aws_region (str) – AWS region that jobs are running in.
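
A usage sketch with hypothetical job ids, assuming each yielded dict follows the AWS Batch job detail schema:

    from redun.executors.aws_batch import iter_batch_job_status

    # Job ids must be in submission order; polling is truncated once ten
    # pending jobs have been seen.
    for status in iter_batch_job_status(
        ["job-id-1", "job-id-2"], pending_truncate=10, aws_region="us-west-2"
    ):
        print(status["jobId"], status["status"])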

redun.executors.aws_batch.make_job_def_name(image_name: str, job_def_suffix: str = '-jd') str

Autogenerate a job definition name from an image name.

redun.executors.aws_batch.parse_job_error(s3_scratch_prefix: str, job: Job, batch_job_metadata: dict | None = None) Tuple[Exception, Traceback]

Parse task error from s3 scratch path.

redun.executors.aws_batch.parse_job_logs(batch_job_id: str, max_lines: int = 1000, required: bool = True, aws_region: str = 'us-west-2') Iterator[str]

Iterates through the most recent CloudWatch logs of an AWS Batch Job.

redun.executors.aws_batch.parse_nullable_json(text: str | None) Any
redun.executors.aws_batch.submit_command(image: str, queue: str, s3_scratch_prefix: str, job: Job, command: str, job_options: dict = {}, aws_region: str = 'us-west-2') dict

Submit a shell command to AWS Batch.

redun.executors.aws_batch.submit_task(image: str, queue: str, s3_scratch_prefix: str, job: Job, a_task: Task, args: Tuple = (), kwargs: Dict[str, Any] = {}, job_options: dict = {}, array_uuid: str | None = None, array_size: int = 0, code_file: File | None = None, aws_region: str = 'us-west-2') Dict[str, Any]

Submit a redun Task to AWS Batch.

redun.executors.aws_glue module

exception redun.executors.aws_glue.AWSGlueError

Bases: Exception

class redun.executors.aws_glue.AWSGlueExecutor(name: str, scheduler: Scheduler | None = None, config=None)

Bases: Executor

A redun Executor for running jobs on AWS Glue.

gather_inflight_jobs() None
get_jobs(statuses: List[str] | None = None) Iterator[dict]

Gets all job runs with the given statuses.

get_or_create_job_definition() None

Get or create the default Glue job.

scratch_root() str
stop() None
submit(job: Job) None

Submit job to executor.

submit_pending_job(job: Job) str | None

Returns the job run id if job submission was successful, otherwise None.

exception redun.executors.aws_glue.AWSGlueJobStoppedError

Bases: Exception

exception redun.executors.aws_glue.AWSGlueJobTimeoutError

Bases: Exception

Custom exception to raise when AWS Glue jobs are killed due to timeout.

redun.executors.aws_glue.get_default_glue_service_role(account_num: str | None = None, aws_region: str = 'us-west-2') str

Returns the default Glue service role for the current account.

redun.executors.aws_glue.get_glue_oneshot_scratch_file(s3_scratch_prefix: str, code_hash: str) str

Returns s3 scratch path for a code package tar file.

redun.executors.aws_glue.get_job_insight_traceback(job_id: str, log_group_name: str, aws_region: str = 'us-west-2', max_results=200) List[str]

Gets the traceback from AWS’ Glue Job insights.

redun.executors.aws_glue.get_or_create_glue_job_definition(script_location: str, role: str, glue_version: str, temp_dir: str, extra_py_files: str, spark_history_dir: str, enable_metrics: bool = False, additional_python_modules: List[str] = ['alembic>=1.4', 'mako', 'promise', 's3fs>=2021.11.1', 'sqlalchemy>=1.4.0,<2.1'], aws_region: str = 'us-west-2') str

Gets or creates an AWS Glue Job.

Parameters:
  • script_location (str) – S3 path of script that runs Spark job.

  • role (str) – ARN of IAM role to associate with Glue job.

  • glue_version (str) – Glue version to use for the job.

  • temp_dir (str) – S3 path of scratch directory associated with job data and code.

  • extra_py_files (str) – Comma separated S3 file paths for additional Python code needed for job.

  • spark_history_dir (str) – S3 path where Spark event logs are stored.

  • enable_metrics (bool) – Whether to enable observability and profiling via Cloudwatch.

  • additional_python_modules (List[str]) – Python modules to be installed with pip before job start.

  • aws_region (str) – AWS region in which to run the Glue job.

Returns:

Unique Glue job definition name.

Return type:

str

redun.executors.aws_glue.get_redun_lib_files() Iterator[str]

Iterates through the files of the redun library.

redun.executors.aws_glue.get_redun_lib_scratch_file(s3_scratch_prefix: str, lib_hash: str) str

Returns s3 scratch path for a code package tar file.

redun.executors.aws_glue.get_spark_history_dir(s3_scratch_prefix: str) str

Returns s3 scratch path for Spark UI monitoring files.

redun.executors.aws_glue.glue_describe_jobs(job_ids: List[str], glue_job_name: str, aws_region: str = 'us-west-2') Iterator[Dict[str, Any]]
redun.executors.aws_glue.package_redun_lib(s3_scratch_prefix: str) File

Package redun lib to S3.

redun.executors.aws_glue.parse_job_error(s3_scratch_prefix: str, job: Job, glue_job_metadata: dict | None = None) Tuple[Exception, Traceback]

Parse job error from s3 scratch path.

redun.executors.aws_glue.submit_glue_job(job: Job, a_task: Task, s3_scratch_prefix: str, glue_job_name: str, redun_zip_location: str, code_file: File, job_options: dict = {}, aws_region: str = 'us-west-2') Dict[str, Any]

Submits a redun task to AWS Glue.

Parameters:
  • job (Job) – The redun.scheduler.Job that is in the process of resolving the Task a_task.

  • a_task (Task) – The redun.task.Task to be run by the job.

  • s3_scratch_prefix (str) – Prefix for S3 url where job files, including additional Python and data files, will reside.

  • glue_job_name (str) – Name of Glue job definition to run.

  • redun_zip_location (str) – S3 path of zipped redun source code.

  • code_file (File) – Code to be executed as a redun workflow. Path within redun.file.File should be an S3 url.

  • job_options (Dict[str, Any]) –

    Option map to configure job initialization and execution.

    Required fields:

      worker_type: str
      timeout: int
      workers: int

    Optional fields:

      additional_libs: List[str]
      extra_py_files: List[str]
      extra_files: List[str]

    An illustrative job_options value appears after this entry.

  • aws_region (str) – AWS region in which to run the Glue job.

Returns:

Response contains JobRunId field.

Return type:

Dict[str, Any]

Raises:

ValueError – If worker_type is not a valid predefined Glue worker type string.
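
A minimal sketch of a job_options value satisfying the required fields above; all values and the S3 path are assumptions.

    # worker_type must be a valid predefined Glue worker type, per the
    # ValueError described above.
    job_options = {
        "worker_type": "G.1X",
        "timeout": 2880,  # assumed to be minutes
        "workers": 10,
        "extra_files": ["s3://my-bucket/lookup.csv"],  # hypothetical path
    }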

redun.executors.aws_utils module

class redun.executors.aws_utils.JobStatus(all, pending, inflight, success, failure, stopped, timeout)

Bases: NamedTuple

all: List[str]

Alias for field number 0

failure: List[str]

Alias for field number 4

inflight: List[str]

Alias for field number 2

pending: List[str]

Alias for field number 1

stopped: List[str]

Alias for field number 5

success: List[str]

Alias for field number 3

timeout: List[str]

Alias for field number 6

redun.executors.aws_utils.copy_to_s3(file_path: str, s3_scratch_dir: str) str

Copies a file to the S3 scratch directory if it is not already on S3. Returns the path to the file on S3.
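
For example (paths are illustrative):

    from redun.executors.aws_utils import copy_to_s3

    # The local file is uploaded into the scratch directory; per the
    # description above, a path already on S3 is returned as-is.
    s3_path = copy_to_s3("results.txt", "s3://my-bucket/scratch")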

redun.executors.aws_utils.get_aws_client(service: str, aws_region: str = 'us-west-2') Session

Get an AWS Client with caching.

redun.executors.aws_utils.get_aws_env_vars() Dict[str, str]

Determines the current AWS credentials.

redun.executors.aws_utils.get_aws_user(aws_region: str = 'us-west-2') str

Returns the current AWS user.

redun.executors.aws_utils.get_default_region() str

Returns the default AWS region.

redun.executors.aws_utils.get_simple_aws_user(aws_region: str = 'us-west-2') str

Returns the current AWS user simplified.

The full AWS identity has one of the following formats:

arn:aws:sts::ACCOUNT_ID:USER_NAME
arn:aws:sts::ACCOUNT_ID:assumed-role/ROLE_NAME/USER_NAME

This function will return USER_NAME as a simplification.
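
A minimal sketch of the simplification described above (an illustration, not necessarily the library's exact logic):

    # Take the final "/"-delimited component of an assumed-role ARN,
    # otherwise the final ":"-delimited component.
    arn = "arn:aws:sts::123456789012:assumed-role/MyRole/alice"
    user_name = arn.split("/")[-1] if "/" in arn else arn.rsplit(":", 1)[-1]
    assert user_name == "alice"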

redun.executors.aws_utils.is_ec2_instance() bool

Returns True if this process is running on an EC2 instance.

We use the presence of a link-local address as a sign we are on an EC2 instance.
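
A sketch of such a check, assuming the standard EC2 link-local metadata address; this illustrates the idea rather than reproducing the library's implementation:

    import socket

    def looks_like_ec2() -> bool:
        # 169.254.169.254 is the link-local EC2 instance metadata address.
        try:
            socket.create_connection(("169.254.169.254", 80), timeout=0.2).close()
            return True
        except OSError:
            return False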

redun.executors.aws_utils.iter_log_stream(log_group_name: str, log_stream: str, limit: int | None = None, reverse: bool = False, required: bool = True, aws_region: str = 'us-west-2') Iterator[dict]

Iterate through the events of a log stream.

redun.executors.aws_utils.set_boto_config(config: Dict[str, Any]) None

Set the boto3 config. Resets the cache of clients as a side effect, in case any were made that did not use this config.
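
For example, assuming the dict keys follow botocore.config.Config parameters (an assumption):

    from redun.executors.aws_utils import set_boto_config

    # Raise the client connection pool size; cached clients are reset, per
    # the description above.
    set_boto_config({"max_pool_connections": 50})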

redun.executors.base module

class redun.executors.base.Executor(name: str, scheduler: Scheduler | None = None, config=None)

Bases: object

Note that most Executors should register themselves with the function register_executor, below.

log(*messages: Any, **kwargs) None

Display log message through Scheduler.

scratch_root() str
set_scheduler(scheduler: Scheduler) None
start() None
stop() None
submit(job: Job) None

Execute the provided job.

Implementations must provide results back to the scheduler by either calling done_job or reject_job.

submit_script(job: Job) None

Execute the provided script job.

Implementations must provide results back to the scheduler by either calling done_job or reject_job.

exception redun.executors.base.ExecutorError

Bases: Exception

redun.executors.base.get_executor_class(executor_name: str, required: bool = True) Type[Executor] | None

Get an Executor by name from the executor registry.

Parameters:
  • executor_name (str) – Name of executor class to retrieve.

  • required (bool) – If True, raises error if executor is not registered. If False, None is returned for unknown executor name.

redun.executors.base.get_executor_from_config(executors_config: dict, executor_name: str) Executor

Create and return the executor by name. Raise an error if it is not present.

redun.executors.base.get_executors_from_config(executors_config: dict) Iterator[Executor]

Instantiate executors defined in an executors config section.

redun.executors.base.load_task_module(module_name: str, task_name: str) None

Helper method that Executors may call to load a task’s module code.

Presently, the main benefit is that this method provides better error handling.

Parameters:
  • module_name – Task module name.

  • task_name – Task name

redun.executors.base.register_executor(executor_name: str) Callable

Register an Executor class to be used by the scheduler.

Note that registered classes are responsible for ensuring their modules get loaded, so this decorator is actually run. For example, this can be done by adding them to the redun module __init__.py.
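
A minimal sketch of registering a custom Executor; the executor name is illustrative, and the _scheduler attribute with done_job/reject_job calls is an assumption based on the submit contract described above.

    from redun.executors.base import Executor, register_executor

    @register_executor("inline")  # "inline" is an illustrative name
    class InlineExecutor(Executor):
        def submit(self, job) -> None:
            # Hand a placeholder result (or an error) back to the scheduler.
            try:
                self._scheduler.done_job(job, None)
            except Exception as error:
                self._scheduler.reject_job(job, error)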

redun.executors.code_packaging module

redun.executors.code_packaging.create_tar(tar_path: str, file_paths: Iterable[str], arcname_prefix: str | None = None) File

Create a tar file from local file paths.

Parameters:
  • arcname_prefix (Optional[str]) – Prefix to add to each file path in the tar file.

redun.executors.code_packaging.create_zip(zip_path: str, base_path: str, file_paths: Iterable[str], arcname_prefix: str | None = None) File

Create a zip file from local file paths.

Parameters:
  • arcname_prefix (Optional[str]) – Prefix to add to each file path in the zip file.

redun.executors.code_packaging.extract_tar(tar_file: File, dest_dir: str = '.') None

Extract a tar file to local paths.

redun.executors.code_packaging.find_code_files(basedir: str = '.', includes: List[str] | None = None, excludes: List[str] | None = None) Iterable[str]

Find all the workflow code files consistent with the include/exclude patterns.
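
For example, assuming glob-style patterns (an assumption):

    from redun.executors.code_packaging import find_code_files

    # Collect workflow .py files while skipping tests; patterns are
    # illustrative.
    files = find_code_files(basedir=".", includes=["**/*.py"], excludes=["tests/**"])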

redun.executors.code_packaging.package_code(scratch_prefix: str, code_package: dict = {}, use_zip: bool = False, basename: str | None = None, arcname_prefix: str | None = None) File

Package code to scratch directory.

Parameters:
  • basename (Optional[str]) – If provided, uses this string as the basename instead of the calculated tarball hash.

  • arcname_prefix (Optional[str]) – Optional suffix to append to the tarball basename.

redun.executors.code_packaging.parse_code_package_config(config) dict | bool

Parse the code package options from an AWSBatchExecutor config.

redun.executors.command module

redun.executors.command.get_oneshot_command(scratch_prefix: str, job: Job, a_task: Task, args: Tuple = (), kwargs: Dict[str, Any] = {}, job_options: dict = {}, code_file: File | None = None, array_uuid: str | None = None, input_path: str | None = None, output_path: str | None = None, error_path: str | None = None) List[str]

Returns a redun oneshot command for a Job.

redun.executors.command.get_script_task_command(scratch_prefix: str, job: Job, command: str, exit_command: str = '', as_mount: bool = False) List[str]

Returns a shell script to run a script task.

redun.executors.docker module

exception redun.executors.docker.DockerError

Bases: Exception

class redun.executors.docker.DockerExecutor(name: str, scheduler: Scheduler | None = None, config: SectionProxy | None = None)

Bases: Executor

A redun Executor for running jobs on local Docker containers.

scratch_root() str
set_scheduler(scheduler: Scheduler) None
stop() None

Stop Executor and monitoring thread.

submit(job: Job) None

Submit Job to executor.

submit_script(job: Job) None

Submit Job for script task to executor.

redun.executors.docker.get_docker_executor_config(config: SectionProxy) SectionProxy

Returns a config for DockerExecutor.

redun.executors.docker.get_docker_job_options(job_options: dict, scratch_path: str) dict

Returns Docker-specific job options from general job options.

Adds the scratch_path as a volume mount.

redun.executors.docker.iter_job_status(scratch_prefix: str, job_id2job: Dict[str, Job]) Iterator[dict]

Returns local Docker jobs grouped by their status.

redun.executors.docker.run_docker(command: List[str], image: str, volumes: Iterable[Tuple[str, str]] = [], interactive: bool = True, cleanup: bool = False, memory: int = 4, vcpus: int = 1, gpus: int = 0, shared_memory: int | None = None, include_aws_env: bool = False) str

Run a Docker container locally.

Parameters:
  • command (List[str]) – A shell command to run within the docker container (e.g. [“ls”, “-la”]).

  • image (str) – A Docker image.

  • volumes (Iterable[Tuple[str, str]]) – A list of (‘host’, ‘container’) path pairs for volume mounting.

  • interactive (bool) – If True, the Docker container is run in interactive mode.

  • cleanup (bool) – If True, remove the container after execution.

  • memory (int) – Number of GB of memory to reserve for the container.

  • vcpus (int) – Number of CPUs to reserve for the container.

  • gpus (int) – Number of GPUs to reserve for the container.

  • shared_memory (Optional[int]) – Number of GB of shared memory to reserve for the container.

  • include_aws_env (bool) – If True, forward AWS environment variables to the container.
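
An illustrative call (requires a local Docker daemon; the image and volume mount are assumptions):

    from redun.executors.docker import run_docker

    # Returns a str per the signature, treated here as a run identifier.
    run_id = run_docker(
        ["echo", "hello"],
        image="ubuntu:22.04",
        volumes=[("/tmp/scratch", "/scratch")],
        memory=1,
        vcpus=1,
        cleanup=True,
    )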

redun.executors.docker.submit_command(image: str, scratch_prefix: str, job: Job, command: str, job_options: dict = {}, include_aws_env: bool = False) dict

Submit a shell command to Docker.

redun.executors.docker.submit_task(image: str, scratch_prefix: str, job: Job, a_task: Task, args: Tuple = (), kwargs: Dict[str, Any] = {}, job_options: dict = {}, code_file: File | None = None, include_aws_env: bool = False) Dict[str, Any]

Submit a redun Task to Docker.

redun.executors.gcp_batch module

redun.executors.gcp_utils module

redun.executors.k8s module

redun.executors.k8s_utils module

redun.executors.launch module

redun.executors.launch.launch_script(config: Config, script_command: List[str], executor: Executor | None = None, executor_name: str | None = None, task_options: dict | None = None, execution_id: str | None = None) None

Submit the provided script command to the executor, then exit.

Use a local scheduler with the default config. This means we won’t record the entry point, but we have no intention of being around long enough to record the results, so there’s not much point.

WARNING: This won’t actually work on all executor types, such as the local ones. To work, the executor needs to be “fire and forget” for submit_script.

Parameters:
  • config (Config) – Config object containing executor information.

  • script_command (List[str]) – The script command to run on the executor.

  • executor (Optional[Executor]) – Optional Executor on which to run the script. Must be set if executor_name is not set.

  • executor_name (Optional[str]) – Name of the executor in the config. Must be set if executor is not set.

  • task_options (Optional[dict]) – Task options to pass to the script task.

  • execution_id (Optional[str]) – If provided, use this execution id. This is only relevant for tagging.

Return type:

None
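
A usage sketch, assuming Config accepts an in-memory dict of sections and that an AWS Batch executor section with these keys is valid; all names and values are illustrative.

    from redun.config import Config
    from redun.executors.launch import launch_script

    # Fire-and-forget: submit the script to the "batch" executor, then return.
    config = Config({
        "executors.batch": {
            "type": "aws_batch",
            "image": "my-account.dkr.ecr.us-west-2.amazonaws.com/redun:latest",
            "queue": "my-queue",
            "s3_scratch": "s3://my-bucket/scratch/",
        }
    })
    launch_script(config, ["echo", "hello"], executor_name="batch")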

redun.executors.local module

class redun.executors.local.LocalExecutor(name: str, scheduler: Scheduler | None = None, config=None, mode: str = 'thread')

Bases: Executor

A redun Executor for running jobs locally using a thread or process pool.

DEFAULT_START_METHOD = 'forkserver'
MODES = ['thread', 'process']
START_METHODS = ['fork', 'spawn', 'forkserver']
scratch_root() str
stop() None

Stop Executor pools.

submit(job: Job) None

Execute the provided job.

Implementations must provide results back to the scheduler by either calling done_job or reject_job.

submit_script(job: Job) None

Execute the provided script job.

Implementations must provide results back to the scheduler by either calling done_job or reject_job.

redun.executors.local.exec_script_task(mode: str, module_name: str, task_fullname: str, args: Tuple, kwargs: dict) bytes

Execute a script task from the task registry.

redun.executors.local.exec_task(mode: str, module_name: str, task_fullname: str, args: Tuple, kwargs: dict) Any

Execute a task in the new process.

redun.executors.scratch module

Utilities for working with executor scratch directories.

exception redun.executors.scratch.ExceptionNotFoundError

Bases: ScratchError

Error when serialized exception is not found in scratch directory.

exception redun.executors.scratch.ScratchError

Bases: Exception

Error when reading data from scratch directory.

redun.executors.scratch.get_array_scratch_file(scratch_prefix: str, job_array_id: str, filename: str) str

Returns a scratch path for a file related to an array of jobs.

redun.executors.scratch.get_code_scratch_file(scratch_prefix: str, tar_hash: str, use_zip: bool = False) str

Returns scratch path for a code package tar file.

redun.executors.scratch.get_execution_scratch_file(scratch_prefix: str, execution_id: str, filename: str) str

Returns a scratch path for sending data to and from a remote execution.

redun.executors.scratch.get_job_scratch_dir(scratch_prefix: str, job: Job) str

Returns scratch directory for a redun Job.

redun.executors.scratch.get_job_scratch_file(scratch_prefix: str, job: Job, filename: str) str

Returns scratch path for a file related to a redun Job.

redun.executors.scratch.parse_job_error(scratch_prefix: str, job: Job) Tuple[Exception, Traceback]

Returns job error from scratch directory.

redun.executors.scratch.parse_job_result(scratch_prefix: str, job: Job, is_valid_value: Callable[[Any], bool] | None = None) Tuple[Any, bool]

Returns job output from scratch directory.

Returns a tuple of (result, exists).

Module contents