redun.backends package

Subpackages

Submodules

redun.backends.base module

class redun.backends.base.RedunBackend(type_registry: TypeRegistry | None = None)

Bases: ABC

A Backend manages the CallGraph (provenance) and cache for a Scheduler.

This is the abstract backend interface. See db/__init__.py for an example of a concrete implementation that persists data in a database.

abstract advance_handle(handles: List[Handle], child_handle: Handle)

Record parent-child relationships between Handles.

abstract check_cache(task_hash: str, args_hash: str, eval_hash: str, execution_id: str, scheduler_task_hashes: Set[str], cache_scope: CacheScope, check_valid: CacheCheckValid, context_hash: str | None = None, allowed_cache_results: Set[CacheResult] | None = None) Tuple[Any, str | None, CacheResult]

Check cache for recorded values.

This method checks for several types of cached values when performing graph reduction on the Expression Graph.

  1. Common Subexpression Elimination (CSE): If there is a previously recorded Job in the same Execution (identified by execution_id) that is equivalent (same eval_hash), its final result is replayed. This check is always performed first.

  2. Evaluation cache: When check_valid=CacheCheckValid.FULL, previous single reduction evaluations are consulted by their eval_hash. If a match is found, the resulting single reduction is replayed. This is the most common cache method.

  3. Call cache: When check_valid=CacheCheckValid.SHALLOW, previous current CallNodes are consulted by their eval_hash (i.e. task_hash and args_hash). If a match is found, its final result is replayed. A CallNode is considered current only if its Task and all tasks for child CallNodes are current, as defined by having their task_hash in the provided set, scheduler_task_hashes. This cache method is typically used as an optimization, where the validity of all intermediate arguments and results within the call subtree does not need to be checked (hence checking is “shallow”).

Parameters:
  • task_hash (str) – Hash of Task used in the call.

  • args_hash (str) – Hash of all arguments used in the call.

  • eval_hash (str) – A hash of the combination of the task_hash and args_hash.

  • execution_id (str) – ID of the current Execution. This is used to perform the CSE check.

  • scheduler_task_hashes (Set[str]) – The set of task hashes known to the scheduler, used for checking the Call cache.

  • cache_scope (CacheScope) – What scope of cache hits to try. CacheScope.CSE only allows CSE, and CacheScope.NONE disables all caching.

  • check_valid (CacheCheckValid) – If set to CacheCheckValid.FULL, perform the Evaluation cache check; if set to CacheCheckValid.SHALLOW, perform the Call cache check. See above for more details.

  • context_hash (Optional[str]) – If given, restrict cache results to results obtained with a particular context.

  • allowed_cache_results (Optional[Set[CacheResult]]) – If provided, further restrict the allowed types of results.

Returns:

(result, call_hash, cache_type) – result is the cached result, or None if no result was found. call_hash is the hash of the CallNode used if CSE or Call cache methods are used. cache_type specifies which cache method was used (CSE, SINGLE, ULTIMATE) or MISS if no cached result is found.

Return type:

Tuple[Any, Optional[str], CacheResult]
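The lookup order described above can be sketched with a toy in-memory cache. This is only an illustration under stated assumptions, not redun's implementation: ToyCache and its three dictionaries are hypothetical, the enums are local stand-ins for the real classes, and the mapping of the Evaluation cache to SINGLE and the Call cache to ULTIMATE is inferred from the return description. The cache_scope, context_hash, and allowed_cache_results parameters are left out for brevity.

```python
from enum import Enum
from typing import Any, Dict, Optional, Set, Tuple


class CacheResult(Enum):
    # Local stand-in; member names taken from the return description above.
    CSE = "CSE"
    SINGLE = "SINGLE"
    ULTIMATE = "ULTIMATE"
    MISS = "MISS"


class CacheCheckValid(Enum):
    # Local stand-in for the two modes named in the parameter list.
    FULL = "FULL"
    SHALLOW = "SHALLOW"


class ToyCache:
    """Hypothetical in-memory stand-in used only to show the check order."""

    def __init__(self) -> None:
        # (execution_id, eval_hash) -> (final result, call_hash)
        self.cse: Dict[Tuple[str, str], Tuple[Any, str]] = {}
        # eval_hash -> single-reduction result
        self.eval_cache: Dict[str, Any] = {}
        # eval_hash -> (final result, call_hash, task hashes in the call subtree)
        self.call_nodes: Dict[str, Tuple[Any, str, Set[str]]] = {}

    def check_cache(
        self,
        eval_hash: str,
        execution_id: str,
        scheduler_task_hashes: Set[str],
        check_valid: CacheCheckValid,
    ) -> Tuple[Any, Optional[str], CacheResult]:
        # 1. CSE: an equivalent Job in the same Execution is always checked first.
        hit = self.cse.get((execution_id, eval_hash))
        if hit is not None:
            result, call_hash = hit
            return result, call_hash, CacheResult.CSE

        if check_valid is CacheCheckValid.FULL:
            # 2. Evaluation cache: replay a single reduction by eval_hash.
            if eval_hash in self.eval_cache:
                return self.eval_cache[eval_hash], None, CacheResult.SINGLE
        elif check_valid is CacheCheckValid.SHALLOW:
            # 3. Call cache: replay the final result only if every task in the
            #    call subtree is still known to the scheduler ("current").
            node = self.call_nodes.get(eval_hash)
            if node is not None:
                result, call_hash, subtree_task_hashes = node
                if subtree_task_hashes <= scheduler_task_hashes:
                    return result, call_hash, CacheResult.ULTIMATE

        return None, None, CacheResult.MISS
```

In this sketch a Call cache entry whose subtree references a task hash missing from scheduler_task_hashes is treated as a miss, which mirrors the "current" requirement in item 3.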

abstract delete_tags(entity_id: str, tags: Iterable[Tuple[str, Any]], keys: Iterable[str] = ()) List[Tuple[str, str, str, Any]]

Delete tags.

abstract explain_cache_miss(task: BaseTask, args_hash: str) Dict[str, Any] | None

Determine the reason for a cache miss.

abstract get_job(job_id: str) dict | None

Returns details for a Job.

abstract get_records(ids: Iterable[str], sorted=True) Iterable[dict]

Returns serialized records for the given ids.

Parameters:
  • ids (Iterable[str]) – Iterable of record ids whose serialized records should be fetched.

  • sorted (bool) – If True, return records in the same order as the ids (Default: True).
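The effect of the sorted flag can be illustrated with a small sketch. The function get_records_sketch and the dict-based store are hypothetical, and two behaviors are assumptions that the documentation above does not specify: unknown ids are skipped, and with sorted=False records come back in storage order.

```python
from typing import Dict, Iterable, Iterator


def get_records_sketch(
    store: Dict[str, dict], ids: Iterable[str], sorted: bool = True
) -> Iterator[dict]:
    """Yield records for ids from a hypothetical id -> record mapping."""
    wanted = list(ids)
    if sorted:
        # Follow the order of the requested ids.
        for record_id in wanted:
            if record_id in store:  # assumption: unknown ids are skipped
                yield store[record_id]
    else:
        # Assumption: without sorting, records arrive in storage order.
        wanted_set = set(wanted)
        for record_id, record in store.items():
            if record_id in wanted_set:
                yield record
```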

abstract get_tags(entity_ids: List[str]) Dict[str, MultiMap[str, Any]]

Get the tags of an entity (Execution, Job, CallNode, Task, Value).

abstract get_value(value_hash: str) Any

Returns a Value from the datastore using the value content address (value_hash).

Parameters:

value_hash (str) – Hash of Value to fetch from ValueStore.

Returns:

result, is_cached – Returns the result value and is_cached=True if the value is in the ValueStore, otherwise returns (None, False).

Return type:

Tuple[Any, bool]

abstract is_valid_handle(handle: Handle) bool

A handle is valid if it is current or ancestral to the current handle.

abstract iter_record_ids(root_ids: Iterable[str]) Iterator[str]

Iterate the record ids of descendants of root_ids.

abstract load(migrate: bool | None = None) None

Load the backend for use.

Parameters:

migrate (Optional[bool]) – If None, defer to automigration config options. If True, perform migration after establishing database connection.

abstract put_records(records: Iterable[dict]) int

Writes records to the database and returns number of new records written.

abstract record_call_node(task_name: str, task_hash: str, args_hash: str, expr_args: Tuple[Tuple, dict], eval_args: Tuple[Tuple, dict], result_hash: str, child_call_hashes: List[str]) str

Record a completed CallNode.

Parameters:
  • task_name (str) – Fullname (with namespace) of task.

  • task_hash (str) – Hash of task.

  • args_hash (str) – Hash of all arguments of the call.

  • expr_args (Tuple[Tuple, dict]) – Original expressions for the task arguments. These expressions are used to record the full upstream dataflow.

  • eval_args (Tuple[Tuple, dict]) – The fully evaluated arguments of the task arguments.

  • result_hash (str) – Hash of the result value of the call.

  • child_call_hashes (List[str]) – call_hashes of any child task calls.

Returns:

The call_hash of the new CallNode.

Return type:

str

abstract record_call_node_context(call_hash: str, context_hash: str | None, context: dict) str

Records a context dict for a CallNode as a Tag.

abstract record_execution(exec_id, args: List[str]) None

Records an Execution to the backend.

Parameters:
  • exec_id (str) – The id of the execution.

  • args (List[str]) – Arguments used on the command line to start Execution.

Returns:

UUID of new Execution.

Return type:

str

abstract record_job_end(job: Job, now: datetime | None = None, status: str | None = None) None

Records the end of a Job.

Creates the job if needed, in which case the job is recorded with start_time == end_time.

abstract record_job_start(job: Job, now: datetime | None = None) Any

Records the start of a new Job.

abstract record_tags(entity_type: TagEntity, entity_id: str, tags: Iterable[Tuple[str, Any]], parents: Iterable = (), update: bool = False, new: bool = False)

Record tags for an entity (Execution, Job, CallNode, Task, Value).

Parameters:
  • entity_type (TagEntity) – The type of the tagged entity (Execution, Job, etc).

  • entity_id (str) – The id of the tagged entity.

  • tags (Iterable[KeyValue]) – An iterable of key-value pairs to create as tags.

  • parents (Iterable[str]) – Ids of tags to be superseded by the new tags.

  • update (bool) – If True, automatically supersede any existing tags with keys matching those in tags. This also implies new=True.

  • new (bool) – If True, force tags to be current.

Returns:

[(tag_hash, entity_id, key, value)] – Returns a list of the created tags.

Return type:

List[Tuple[str, str, str, Any]]

abstract record_updated_time() None

Updates updated_time (heartbeat) timestamp for the current Execution.

abstract record_value(value: Any, data: bytes | None = None) str

Record a Value into the backend.

Parameters:
  • value (Any) – A value to record.

  • data (Optional[bytes]) – Byte stream to record. If not given, usual value serialization is used.

Returns:

value_hash of recorded value.

Return type:

str

abstract rollback_handle(handle: Handle) None

Rollback all descendant handles.

abstract set_eval_cache(eval_hash: str, task_hash: str, args_hash: str, value: Any, value_hash: str | None = None) None

Sets a new value in the Evaluation cache.

Parameters:
  • eval_hash (str) – A hash of the combination of the task_hash and args_hash.

  • task_hash (str) – Hash of Task used in the call.

  • args_hash (str) – Hash of all arguments used in the call.

  • value (Any) – Value to record in cache.

  • value_hash (str) – Hash of value to record in cache.
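As a rough illustration of how an eval_hash can key the Evaluation cache, the sketch below derives a hash from the (task_hash, args_hash) pair and stores a value under it. The helper toy_eval_hash, the use of SHA-1 over a JSON payload, and the dict-based cache are all assumptions made for this example; redun's actual hashing scheme may differ.

```python
import hashlib
import json
from typing import Any, Dict


def toy_eval_hash(task_hash: str, args_hash: str) -> str:
    """Hypothetical hash of the (task_hash, args_hash) combination."""
    payload = json.dumps(["Eval", task_hash, args_hash]).encode("utf8")
    return hashlib.sha1(payload).hexdigest()


# Hypothetical Evaluation cache: eval_hash -> cached value.
eval_cache: Dict[str, Any] = {}


def toy_set_eval_cache(task_hash: str, args_hash: str, value: Any) -> str:
    """Store value under the eval_hash derived from the two input hashes."""
    eval_hash = toy_eval_hash(task_hash, args_hash)
    eval_cache[eval_hash] = value
    return eval_hash
```

The property that matters here is that the same task applied to the same arguments always maps to the same key, while changing either hash produces a different key.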

abstract update_tags(entity_type: TagEntity, entity_id: str, old_keys: Iterable[str], new_tags: Iterable[Tuple[str, Any]]) List[Tuple[str, str, str, Any]]

Update tags.

class redun.backends.base.TagEntity(value)

Bases: Enum

Entity types for a CallGraph Tag.

CallNode = 'CallNode'
Execution = 'Execution'
Job = 'Job'
Null = 'Null'
Task = 'Task'
Value = 'Value'
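Because TagEntity is a standard Enum, members can be looked up by attribute, by value, or by name. The class below is a local reconstruction from the member list above, shown only so the example is self-contained; in practice the class would be imported from redun.backends.base.

```python
from enum import Enum


class TagEntity(Enum):
    # Local reconstruction of the members listed above.
    CallNode = "CallNode"
    Execution = "Execution"
    Job = "Job"
    Null = "Null"
    Task = "Task"
    Value = "Value"


# Three equivalent ways to obtain the same member.
by_attribute = TagEntity.Job
by_value = TagEntity("Job")
by_name = TagEntity["Job"]
```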

redun.backends.value_store module

class redun.backends.value_store.ValueStore(root_path: str, use_subdir: bool = True)

Bases: object

Stores redun Values on filesystem (local or remote).

The value store can be configured within the [backend] section using the value_store_path and value_store_min_size variables.

Values in this store are organized similarly to git objects:

<value_store_path>/ab/cdef… for a value with hash abcdef…
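The layout above can be sketched as a small path helper. This is an illustration only: value_path_sketch is a hypothetical function, and the behavior for use_subdir=False (placing the value directly under the root) is an assumption, since the documentation describes only the subdirectory layout. posixpath is used so that the same joining rule applies to local paths and to remote URLs.

```python
import posixpath


def value_path_sketch(root_path: str, value_hash: str, use_subdir: bool = True) -> str:
    """Return a git-object-style path for a value hash (hypothetical helper)."""
    if use_subdir:
        # The first two hex characters name the subdirectory, the rest the file.
        return posixpath.join(root_path, value_hash[:2], value_hash[2:])
    # Assumption: without subdirectories the full hash is the file name.
    return posixpath.join(root_path, value_hash)
```

Splitting on the first two characters keeps any single directory from accumulating every stored value, which is the same motivation behind git's object layout.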

get(value_hash: str) Tuple[bytes, bool]

Retrieve Value data.

get_value_path(value_hash: str) str

Return the path for a value.

has(value_hash: str) bool

Return True if value is in store.

put(value_hash: str, data: bytes) None

Store Value data.

size(value_hash: str) int

Returns the size in bytes of a Value.

Module contents