redun.executors package#

Submodules#

redun.executors.alias module#

class redun.executors.alias.AliasExecutor(name: str, scheduler: Scheduler | None = None, config: SectionProxy | None = None, target: Executor | None = None)#

Bases: Executor

A simple executor that lazily defers to another one by name. This can be useful when tasks are defined with differing executors, allowing them to be configured separately, but sometimes need to share the underlying executor; sharing can be important for enforcing per-executor resource limits.

Unfortunately, this implementation isn’t able to check the validity of the link at init time.
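For example, a configuration along these lines lets two executor names share one underlying executor (a minimal sketch; the section and executor names are hypothetical, and it assumes the executor type is registered as alias with the link specified via a target key):

    # .redun/redun.ini (hypothetical)
    [executors.batch]
    type = aws_batch
    queue = my-batch-queue

    [executors.batch_alias]
    type = alias
    target = batch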

start() None#
stop() None#
submit(job: Job) None#

Execute the provided job.

Implementations must provide results back to the scheduler by either calling done_job or reject_job.

submit_script(job: Job) None#

Execute the provided script job.

Implementations must provide results back to the scheduler by either calling done_job or reject_job.

redun.executors.aws_batch module#

class redun.executors.aws_batch.AWSBatchExecutor(name: str, scheduler: Scheduler | None = None, config: SectionProxy | None = None)#

Bases: Executor

A redun Executor for running jobs on AWS Batch.
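A typical configuration sketch (the section name, image, queue, and bucket are hypothetical placeholders; type, image, queue, and s3_scratch are the usual keys for this executor):

    [executors.batch]
    type = aws_batch
    image = ACCOUNT_ID.dkr.ecr.us-west-2.amazonaws.com/redun_example
    queue = my-batch-queue
    s3_scratch = s3://my-bucket/redun/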

gather_inflight_jobs() None#
get_array_child_jobs(job_id: str, statuses: List[str] = ['SUBMITTED', 'PENDING', 'RUNNABLE', 'STARTING', 'RUNNING']) List[Dict[str, Any]]#
get_jobs(statuses: List[str] | None = None) Iterator[dict]#

Returns AWS Batch Job statuses from the AWS API.

kill_jobs(job_ids: Iterable[str], reason: str = 'Terminated by user') Iterator[dict]#

Kill AWS Batch Jobs.

scratch_root() str#
set_scheduler(scheduler: Scheduler) None#
stop() None#

Stop Executor and monitoring thread.

submit(job: Job) None#

Submit Job to executor.

submit_script(job: Job) None#

Submit Job for script task to executor.

exception redun.executors.aws_batch.AWSBatchJobTimeoutError#

Bases: Exception

Custom exception to raise when AWS Batch Jobs are killed due to timeout.

redun.executors.aws_batch.aws_describe_jobs(job_ids: List[str], chunk_size: int = 100, aws_region: str = 'us-west-2') Iterator[dict]#

Returns AWS Batch Job descriptions from the AWS API.

redun.executors.aws_batch.batch_submit(batch_job_args: dict, queue: str, image: str, job_def_name: str | None = None, job_def_suffix: str = '-jd', job_name: str = 'batch-job', array_size: int = 0, memory: int = 4, vcpus: int = 1, gpus: int = 0, num_nodes: int | None = None, shared_memory: int | None = None, retries: int = 1, role: str | None = None, job_def_extra: dict | None = None, aws_region: str = 'us-west-2', privileged: bool = False, autocreate_job_def: bool = True, timeout: int | None = None, batch_tags: Dict[str, str] | None = None, propagate_tags: bool = True, share_id: str | None = None, scheduling_priority_override: str | None = None) Dict[str, Any]#

Actually perform job submission to AWS Batch. Create or retrieve the job definition, then use it to submit the job.

Parameters:
  • batch_job_args (dict) – These are passed as kwargs to the submit_job API. Generally, it must configure the commands to be run by setting node or container overrides. This function will modify the provided dictionary, at a minimum applying overrides for the resource requirements stated by other arguments to this function.
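For instance, a minimal batch_job_args for a single-node job might look like the following sketch (the command is hypothetical; containerOverrides is the standard submit_job kwarg for overriding a container command):

    # Hypothetical minimal batch_job_args; batch_submit will additionally
    # apply resource requirement overrides to this dict.
    batch_job_args = {
        "containerOverrides": {
            "command": ["echo", "hello"],
        },
    }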

redun.executors.aws_batch.create_job_override_command(command: List[str], command_worker: List[str] | None = None, num_nodes: int | None = None) Dict[str, Any]#

Format the command into the form needed for the AWS Batch submit_job API.

Parameters:
  • command (List[str]) – A list of tokens comprising the command.

  • command_worker (Optional[List[str]]) – List of command tokens for worker nodes, if present.

  • num_nodes (Optional[int]) – If None, this is a single-node job. If not None, a multi-node job.

Returns a dictionary to be passed to submit_job as kwargs.
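As an illustration (the output shape shown is an assumption based on the AWS Batch submit_job API, where single-node jobs take container overrides and multi-node jobs take node overrides):

    overrides = create_job_override_command(["echo", "hello"])
    # Assumed shape for a single-node job:
    # {"containerOverrides": {"command": ["echo", "hello"]}}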

redun.executors.aws_batch.ecr_push(repo_name: str, registry: str | None = None) None#
redun.executors.aws_batch.equiv_job_def(job_def1: dict, job_def2: dict) bool#

Returns True if two job definitions are equivalent.

Limit equality to the keys of job_def1.
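For example (illustrative values only):

    equiv_job_def({"memory": 4}, {"memory": 4, "vcpus": 1})  # True: only keys of job_def1 are compared
    equiv_job_def({"memory": 4}, {"memory": 8, "vcpus": 1})  # False: a shared key differs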

redun.executors.aws_batch.format_log_stream_event(event: dict) str#

Format a logStream event as a line.

redun.executors.aws_batch.get_batch_job_name(prefix: str, job_hash: str, array: bool = False) str#

Return an AWS Batch Job name derived from a prefix and a job hash.

redun.executors.aws_batch.get_batch_job_options(job_options: dict) dict#

Returns AWS Batch-specific job options from general job options.

redun.executors.aws_batch.get_default_registry() str#

Returns the default ECR registry.

redun.executors.aws_batch.get_hash_from_job_name(job_name: str) str | None#

Returns the job/task eval_hash that corresponds with a particular job name on Batch.

redun.executors.aws_batch.get_job_def_revision(job_def_name: str) int#

Returns the job definition revision from a job definition name.

redun.executors.aws_batch.get_job_definition(job_def_name: str, aws_region: str = 'us-west-2', batch_client: Session = None) dict#

Returns the job definition with the supplied name, or an empty dict if no matches are found.

The returned job definition will be the most recent active revision if more than one is returned from the API.

redun.executors.aws_batch.get_job_details(image: str, command: List[str] | None = None, memory: int = 4, vcpus: int = 1, num_nodes: int | None = None, shared_memory: int | None = None, role: str | None = None, aws_region: str = 'us-west-2', privileged: bool = False) dict#

Returns a dict of job details that can be used for creating a job definition.

redun.executors.aws_batch.get_job_log_stream(job: dict | None, aws_region: str) str | None#

Extract the log stream from a JobDetail status dictionary. For non-multi-node jobs (i.e., single-node and array jobs), this is simply a field in the detail dictionary. But for multi-node jobs, this requires another query to get the log stream for the main node.

redun.executors.aws_batch.get_or_create_job_definition(job_def_name: str, image: str, command: List[str] | None = None, memory: int = 4, vcpus: int = 1, num_nodes: int | None = None, shared_memory: int | None = None, role: str | None = None, aws_region: str = 'us-west-2', privileged: bool = False, job_def_extra: dict | None = None) dict#

Returns a job definition with the specified requirements. Although the resource requirements provided are used when creating a job, they are specifically excluded from creating new job definitions.

Either an existing active job definition is used or a new one is created.

Parameters:
  • num_nodes (Optional[int]) – If present, create a multi-node Batch job.

redun.executors.aws_batch.get_or_create_repo(repo_name: str) dict#

Get or create an ECR repository.

redun.executors.aws_batch.is_array_job_name(job_name: str) bool#
redun.executors.aws_batch.iter_batch_job_log_lines(job_id: str, aws_region: str = 'us-west-2', log_group_name: str = '/aws/batch/job', reverse: bool = False, required: bool = True) Iterator[str]#

Iterate through the log lines of an AWS Batch job.

redun.executors.aws_batch.iter_batch_job_logs(job_id: str, aws_region: str = 'us-west-2', log_group_name: str = '/aws/batch/job', limit: int | None = None, reverse: bool = False, required: bool = True) Iterator[dict]#

Iterate through the log events of an AWS Batch job.

redun.executors.aws_batch.iter_batch_job_status(job_ids: List[str], pending_truncate: int = 10, aws_region: str = 'us-west-2') Iterator[dict]#

Yields AWS Batch job statuses.

If pending_truncate is used (> 0), then we rely on AWS Batch’s behavior of running jobs approximately in order. This allows us to truncate the polling of jobs once we see a sufficient number of pending jobs. See the usage sketch after the parameter list.

Parameters:
  • job_ids (List[str]) – Batch job ids that should be in order of submission.

  • pending_truncate (int) – After seeing pending_truncate number of pending jobs, assume the rest are pending. Use a negative int to disable this optimization.

  • aws_region (str) – AWS region that jobs are running in.
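A usage sketch (jobId and status are standard fields in AWS Batch job descriptions):

    # job_ids should be ordered by submission time for pending_truncate to help.
    for job in iter_batch_job_status(job_ids, pending_truncate=10):
        print(job["jobId"], job["status"])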

redun.executors.aws_batch.make_job_def_name(image_name: str, job_def_suffix: str = '-jd') str#

Autogenerate a job definition name from an image name.

redun.executors.aws_batch.parse_job_error(s3_scratch_prefix: str, job: Job, batch_job_metadata: dict | None = None) Tuple[Exception, Traceback]#

Parse task error from s3 scratch path.

redun.executors.aws_batch.parse_job_logs(batch_job_id: str, max_lines: int = 1000, required: bool = True, aws_region: str = 'us-west-2') Iterator[str]#

Iterates through most recent CloudWatch logs of an AWS Batch Job.

redun.executors.aws_batch.parse_nullable_json(text: str | None) Any#
redun.executors.aws_batch.submit_command(image: str, queue: str, s3_scratch_prefix: str, job: Job, command: str, job_options: dict = {}, aws_region: str = 'us-west-2') dict#

Submit a shell command to AWS Batch.

redun.executors.aws_batch.submit_task(image: str, queue: str, s3_scratch_prefix: str, job: Job, a_task: Task, args: Tuple = (), kwargs: Dict[str, Any] = {}, job_options: dict = {}, array_uuid: str | None = None, array_size: int = 0, code_file: File | None = None, aws_region: str = 'us-west-2') Dict[str, Any]#

Submit a redun Task to AWS Batch.

redun.executors.aws_glue module#

exception redun.executors.aws_glue.AWSGlueError#

Bases: Exception

class redun.executors.aws_glue.AWSGlueExecutor(name: str, scheduler: Scheduler | None = None, config=None)#

Bases: Executor

A redun Executor for running jobs on AWS Glue.

gather_inflight_jobs() None#
get_jobs(statuses: List[str] | None = None) Iterator[dict]#

Gets all job runs with the given statuses.

get_or_create_job_definition() None#

Get or create the default Glue job.

scratch_root() str#
stop() None#
submit(job: Job) None#

Submit job to executor.

submit_pending_job(job: Job) str | None#

Returns the job run id if job submission was successful, otherwise None.

exception redun.executors.aws_glue.AWSGlueJobStoppedError#

Bases: Exception

exception redun.executors.aws_glue.AWSGlueJobTimeoutError#

Bases: Exception

Custom exception to raise when AWS Glue jobs are killed due to timeout.

redun.executors.aws_glue.get_default_glue_service_role(account_num: str | None = None, aws_region: str = 'us-west-2') str#

Returns the default Glue service role for the current account.

redun.executors.aws_glue.get_glue_oneshot_scratch_file(s3_scratch_prefix: str, code_hash: str) str#

Returns s3 scratch path for a code package tar file.

redun.executors.aws_glue.get_job_insight_traceback(job_id: str, log_group_name: str, aws_region: str = 'us-west-2', max_results=200) List[str]#

Gets the traceback from AWS’ Glue Job insights.

redun.executors.aws_glue.get_or_create_glue_job_definition(script_location: str, role: str, temp_dir: str, extra_py_files: str, spark_history_dir: str, additional_python_modules: List[str] = ['alembic>=1.4', 'mako', 'promise', 's3fs>=2021.11.1', 'sqlalchemy>=1.4.0,<2.1'], aws_region: str = 'us-west-2') str#

Gets or creates an AWS Glue Job.

Parameters:
  • script_location (str) – S3 path of script that runs Spark job.

  • role (str) – ARN of IAM role to associate with Glue job.

  • temp_dir (str) – S3 path of scratch directory associated with job data and code.

  • extra_py_files (str) – Comma separated S3 file paths for additional Python code needed for job.

  • spark_history_dir (str) – S3 path where Spark event logs are stored.

  • additional_python_modules (List[str]) – Python modules to be installed with pip before job start.

  • aws_region (str) – AWS region in which to run the Glue job.

Returns:

Unique Glue job definition name.

Return type:

str

redun.executors.aws_glue.get_redun_lib_files() Iterator[str]#

Iterates through the files of the redun library.

redun.executors.aws_glue.get_redun_lib_scratch_file(s3_scratch_prefix: str, lib_hash: str) str#

Returns s3 scratch path for a code package tar file.

redun.executors.aws_glue.get_spark_history_dir(s3_scratch_prefix: str) str#

Returns s3 scratch path for Spark UI monitoring files.

redun.executors.aws_glue.glue_describe_jobs(job_ids: List[str], glue_job_name: str, aws_region: str = 'us-west-2') Iterator[Dict[str, Any]]#
redun.executors.aws_glue.package_redun_lib(s3_scratch_prefix: str) File#

Package redun lib to S3.

redun.executors.aws_glue.parse_job_error(s3_scratch_prefix: str, job: Job, glue_job_metadata: dict | None = None) Tuple[Exception, Traceback]#

Parse job error from s3 scratch path.

redun.executors.aws_glue.submit_glue_job(job: Job, a_task: Task, s3_scratch_prefix: str, glue_job_name: str, redun_zip_location: str, code_file: File, job_options: dict = {}, aws_region: str = 'us-west-2') Dict[str, Any]#

Submits a redun task to AWS Glue.

Parameters:
  • job (Job) – The redun.scheduler.Job that is in the process of resolving the Task a_task.

  • a_task (Task) – The redun.task.Task to be run by the job.

  • s3_scratch_prefix (str) – Prefix for S3 url where job files, including additional Python and data files, will reside.

  • glue_job_name (str) – Name of Glue job definition to run.

  • redun_zip_location (str) – S3 path of zipped redun source code.

  • code_file (File) – Code to be executed as a redun workflow. Path within redun.file.File should be an S3 url.

  • job_options (Dict[str, Any]) –

    Option map to configure job initialization and execution.

    Required fields:

      worker_type: str
      timeout: int
      workers: int

    Optional fields:

      additional_libs: List[str]
      extra_py_files: List[str]
      extra_files: List[str]

  • aws_region (str) – AWS region in which to run the Glue job.

Returns:

Response contains JobRunId field.

Return type:

Dict[str, Any]

Raises:

ValueError – If worker_type is not a valid predefined Glue worker type string.

redun.executors.aws_utils module#

class redun.executors.aws_utils.JobStatus(all, pending, inflight, success, failure, stopped, timeout)#

Bases: NamedTuple

all: List[str]#

Alias for field number 0

failure: List[str]#

Alias for field number 4

inflight: List[str]#

Alias for field number 2

pending: List[str]#

Alias for field number 1

stopped: List[str]#

Alias for field number 5

success: List[str]#

Alias for field number 3

timeout: List[str]#

Alias for field number 6

redun.executors.aws_utils.copy_to_s3(file_path: str, s3_scratch_dir: str) str#

Copies a file to the S3 scratch directory if it is not already on S3. Returns the path to the file on S3.

redun.executors.aws_utils.get_aws_client(service: str, aws_region: str = 'us-west-2') Session#

Get an AWS Client with caching.

redun.executors.aws_utils.get_aws_env_vars() Dict[str, str]#

Determines the current AWS credentials.

redun.executors.aws_utils.get_aws_user(aws_region: str = 'us-west-2') str#

Returns the current AWS user.

redun.executors.aws_utils.get_default_region() str#

Returns the default AWS region.

redun.executors.aws_utils.get_simple_aws_user(aws_region: str = 'us-west-2') str#

Returns the current AWS user simplified.

The full AWS identity has one of the following formats:

arn:aws:sts::ACCOUNT_ID:USER_NAME
arn:aws:sts::ACCOUNT_ID:assumed-role/ROLE_NAME/USER_NAME

This function will return USER_NAME as a simplification.
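For example (hypothetical account id, role, and user):

    # 'arn:aws:sts::123456789012:assumed-role/DevRole/alice' -> 'alice'
    user = get_simple_aws_user(aws_region="us-west-2")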

redun.executors.aws_utils.is_ec2_instance() bool#

Returns True if this process is running on an EC2 instance.

We use the presence of a link-local address as a sign we are on an EC2 instance.

redun.executors.aws_utils.iter_log_stream(log_group_name: str, log_stream: str, limit: int | None = None, reverse: bool = False, required: bool = True, aws_region: str = 'us-west-2') Iterator[dict]#

Iterate through the events of a log stream.

redun.executors.base module#

class redun.executors.base.Executor(name: str, scheduler: Scheduler | None = None, config=None)#

Bases: object

Note that most Executors should register themselves with the register_executor decorator, below.

log(*messages: Any, **kwargs) None#

Display log message through Scheduler.

scratch_root() str#
set_scheduler(scheduler: Scheduler) None#
start() None#
stop() None#
submit(job: Job) None#

Execute the provided job.

Implementations must provide results back to the scheduler by either calling done_job or reject_job.

submit_script(job: Job) None#

Execute the provided script job.

Implementations must provide results back to the scheduler by either calling done_job or reject_job.

exception redun.executors.base.ExecutorError#

Bases: Exception

redun.executors.base.get_executor_class(executor_name: str, required: bool = True) Type[Executor] | None#

Get an Executor by name from the executor registry.

Parameters:
  • executor_name (str) – Name of executor class to retrieve.

  • required (bool) – If True, raises error if executor is not registered. If False, None is returned for unknown executor name.

redun.executors.base.get_executor_from_config(executors_config: dict, executor_name: str) Executor#

Create and return the executor by name. Raise an error if it is not present.

redun.executors.base.get_executors_from_config(executors_config: dict) Iterator[Executor]#

Instantiate executors defined in an executors config section.

redun.executors.base.load_task_module(module_name: str, task_name: str) None#

Helper method that Executors may call to load a task’s module code.

Presently, the main benefit is this method provides better error handling.

Parameters:
  • module_name – Task module name.

  • task_name – Task name

redun.executors.base.register_executor(executor_name: str) Callable#

Register an Executor class to be used by the scheduler.

Note that registered classes are responsible for ensuring their modules get loaded, so this decorator is actually run. For example, this can be done by adding them to the redun module __init__.py.
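A minimal registration sketch ("my_executor" is a hypothetical name):

    from redun.executors.base import Executor, register_executor

    @register_executor("my_executor")
    class MyExecutor(Executor):
        def submit(self, job) -> None:
            ...  # report results back via the scheduler's done_job / reject_job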

redun.executors.code_packaging module#

redun.executors.code_packaging.create_tar(tar_path: str, file_paths: Iterable[str], arcname_prefix: str | None = None) File#

Create a tar file from local file paths.

Parameters:
  • arcname_prefix (Optional[str]) – Prefix to add to each file path in the tar file.

redun.executors.code_packaging.create_zip(zip_path: str, base_path: str, file_paths: Iterable[str], arcname_prefix: str | None = None) File#

Create a zip file from local file paths.

Parameters:
  • arcname_prefix (Optional[str]) – Prefix to add to each file path in the zip file.

redun.executors.code_packaging.extract_tar(tar_file: File, dest_dir: str = '.') None#

Extract a tar file to local paths.

redun.executors.code_packaging.find_code_files(basedir: str = '.', includes: List[str] | None = None, excludes: List[str] | None = None) Iterable[str]#

Find all the workflow code files consistent with the include/exclude patterns.

redun.executors.code_packaging.package_code(scratch_prefix: str, code_package: dict = {}, use_zip: bool = False, basename: str | None = None, arcname_prefix: str | None = None) File#

Package code to scratch directory. :param basename: If provided, uses this string as the basename instead of the

calculated tarball hash.

Parameters:

arcname_prefix – Optional suffix to append to tarball basename.
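A hedged usage sketch, assuming the code_package dict carries "includes"/"excludes" glob patterns as produced by parse_code_package_config:

    code_file = package_code(
        "s3://my-bucket/redun/",  # hypothetical scratch prefix
        code_package={"includes": ["**/*.py"], "excludes": []},
    )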

redun.executors.code_packaging.parse_code_package_config(config) dict | bool#

Parse the code package options from a AWSBatchExecutor config.

redun.executors.command module#

redun.executors.command.get_oneshot_command(scratch_prefix: str, job: Job, a_task: Task, args: Tuple = (), kwargs: Dict[str, Any] = {}, job_options: dict = {}, code_file: File | None = None, array_uuid: str | None = None, input_path: str | None = None, output_path: str | None = None, error_path: str | None = None) List[str]#

Returns a redun oneshot command for a Job.

redun.executors.command.get_script_task_command(scratch_prefix: str, job: Job, command: str, exit_command: str = '', as_mount: bool = False) List[str]#

Returns a shell script to run a script task.

redun.executors.docker module#

exception redun.executors.docker.DockerError#

Bases: Exception

class redun.executors.docker.DockerExecutor(name: str, scheduler: Scheduler | None = None, config: SectionProxy | None = None)#

Bases: Executor

A redun Executor for running jobs on local Docker containers.

scratch_root() str#
set_scheduler(scheduler: Scheduler) None#
stop() None#

Stop Executor and monitoring thread.

submit(job: Job) None#

Submit Job to executor.

submit_script(job: Job) None#

Submit Job for script task to executor.

redun.executors.docker.get_docker_executor_config(config: SectionProxy) SectionProxy#

Returns a config for DockerExecutor.

redun.executors.docker.get_docker_job_options(job_options: dict, scratch_path: str) dict#

Returns Docker-specific job options from general job options.

Adds the scratch_path as a volume mount.

redun.executors.docker.iter_job_status(scratch_prefix: str, job_id2job: Dict[str, Job]) Iterator[dict]#

Returns local Docker jobs grouped by their status.

redun.executors.docker.run_docker(command: List[str], image: str, volumes: Iterable[Tuple[str, str]] = [], interactive: bool = True, cleanup: bool = False, memory: int = 4, vcpus: int = 1, gpus: int = 0, shared_memory: int | None = None, include_aws_env: bool = False) str#

Run a Docker container locally.

Parameters:
  • command (List[str]) – A shell command to run within the docker container (e.g. ["ls", "-la"]).

  • image (str) – A Docker image.

  • volumes (Iterable[Tuple[str, str]]) – A list of (‘host’, ‘container’) path pairs for volume mounting.

  • interactive (bool) – If True, the Docker container is run in interactive mode.

  • cleanup (bool) – If True, remove the container after execution.

  • memory (int) – Number of GB of memory to reserve for the container.

  • vcpus (int) – Number of CPUs to reserve for the container.

  • gpus (int) – Number of GPUs to reserve for the container.

  • shared_memory (Optional[int]) – Number of GB of shared memory to reserve for the container.

  • include_aws_env (bool) – If True, forward AWS environment variables to the container.
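A usage sketch (the image and paths are hypothetical; the returned string is the container id):

    container_id = run_docker(
        ["ls", "-la"],
        image="ubuntu:22.04",
        volumes=[("/tmp/scratch", "/scratch")],
        memory=2,
        vcpus=1,
    )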

redun.executors.docker.submit_command(image: str, scratch_prefix: str, job: Job, command: str, job_options: dict = {}, include_aws_env: bool = False) dict#

Submit a shell command to Docker.

redun.executors.docker.submit_task(image: str, scratch_prefix: str, job: Job, a_task: Task, args: Tuple = (), kwargs: Dict[str, Any] = {}, job_options: dict = {}, code_file: File | None = None, include_aws_env: bool = False) Dict[str, Any]#

Submit a redun Task to Docker.

redun.executors.gcp_batch module#

redun.executors.gcp_utils module#

redun.executors.k8s module#

redun.executors.k8s_utils module#

redun.executors.launch module#

redun.executors.launch.launch_script(config: Config, script_command: List[str], executor: Executor | None = None, executor_name: str | None = None, task_options: dict | None = None, execution_id: str | None = None) None#

Submit the provided script command to the executor, then exit.

Use a local scheduler with the default config. This means we won’t record the entry point, but we have no intention of being around long enough to record the results, so there’s not much point.

WARNING: This won’t actually work on all executor types, such as the local ones. To work, the executor needs to be “fire and forget” for submit_script.

Parameters:
  • config (Config) – Config object containing executor information.

  • script_command (List[str]) – The script command to run on the executor.

  • executor (Optional[Executor]) – Optional Executor on which to run the script. Must be set if executor_name is not set.

  • executor_name (Optional[str]) – Name of the executor in the config. Must be set if executor is not set.

  • task_options (Optional[dict]) – Task options to pass to the script task.

  • execution_id (Optional[str]) – If provided, use this execution id. This is only relevant for tagging.

Return type:

None
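A minimal sketch (the config contents and executor name are hypothetical, and it assumes redun.config.Config accepts a config dict keyed by section name):

    from redun.config import Config

    config = Config({
        "executors.batch": {
            "type": "aws_batch",
            "queue": "my-batch-queue",
            "s3_scratch": "s3://my-bucket/redun/",
        },
    })
    launch_script(config, ["python", "my_script.py"], executor_name="batch")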

redun.executors.local module#

class redun.executors.local.LocalExecutor(name: str, scheduler: Scheduler | None = None, config=None, mode: str = 'thread')#

Bases: Executor

A redun Executor for running jobs locally using a thread or process pool.
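A typical configuration sketch (the section name and values are hypothetical; mode and start_method correspond to the class constants below, and max_workers is assumed to be the pool-size option):

    [executors.default]
    type = local
    mode = thread
    max_workers = 20
    start_method = forkserver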

DEFAULT_START_METHOD = 'forkserver'#
MODES = ['thread', 'process']#
START_METHODS = ['fork', 'spawn', 'forkserver']#
scratch_root() str#
stop() None#

Stop Executor pools.

submit(job: Job) None#

Execute the provided job.

Implementations must provide results back to the scheduler by either calling done_job or reject_job.

submit_script(job: Job) None#

Execute the provided script job.

Implementations must provide results back to the scheduler by either calling done_job or reject_job.

redun.executors.local.exec_script_task(mode: str, module_name: str, task_fullname: str, args: Tuple, kwargs: dict) bytes#

Execute a script task from the task registry.

redun.executors.local.exec_task(mode: str, module_name: str, task_fullname: str, args: Tuple, kwargs: dict) Any#

Execute a task in the new process.

redun.executors.scratch module#

Utilities for working with executor scratch directories.

exception redun.executors.scratch.ExceptionNotFoundError#

Bases: ScratchError

Error when serialized exception is not found in scratch directory.

exception redun.executors.scratch.ScratchError#

Bases: Exception

Error when reading data from scratch directory.

redun.executors.scratch.get_array_scratch_file(scratch_prefix: str, job_array_id: str, filename: str) str#

Returns a scratch path for a file related to an array of jobs.

redun.executors.scratch.get_code_scratch_file(scratch_prefix: str, tar_hash: str, use_zip: bool = False) str#

Returns scratch path for a code package tar file.

redun.executors.scratch.get_execution_scratch_file(scratch_prefix: str, execution_id: str, filename: str) str#

Returns a scratch path for sending data to and from a remote execution.

redun.executors.scratch.get_job_scratch_dir(scratch_prefix: str, job: Job) str#

Returns scratch directory for a redun Job.

redun.executors.scratch.get_job_scratch_file(scratch_prefix: str, job: Job, filename: str) str#

Returns scratch path for a file related to a redun Job.

redun.executors.scratch.parse_job_error(scratch_prefix: str, job: Job) Tuple[Exception, Traceback]#

Returns job error from scratch directory.

redun.executors.scratch.parse_job_result(scratch_prefix: str, job: Job, is_valid_value: Callable[[Any], bool] | None = None) Tuple[Any, bool]#

Returns job output from scratch directory.

Returns a tuple of (result, exists).
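A hedged sketch of how an executor might combine the scratch helpers to report a finished job back to the scheduler (done_job / reject_job follow the Executor docstrings; exact signatures are assumed):

    result, exists = parse_job_result(scratch_prefix, job)
    if exists:
        scheduler.done_job(job, result)  # assumed signature
    else:
        error, error_traceback = parse_job_error(scratch_prefix, job)
        scheduler.reject_job(job, error, error_traceback)  # assumed signature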

Module contents#