redun.backends package

Subpackages

Submodules

redun.backends.base module

class redun.backends.base.RedunBackend(type_registry: TypeRegistry | None = None)

Bases: ABC

A Backend manages the CallGraph (provenance) and cache for a Scheduler.

This is the abstract backend interface. See db/__init__.py for an example of a concrete implementation that persists data in a database.

advance_handle(handles: List[Handle], child_handle: Handle)

Record parent-child relationships between Handles.

check_cache(task_hash: str, args_hash: str, eval_hash: str, execution_id: str, scheduler_task_hashes: Set[str], cache_scope: CacheScope, check_valid: CacheCheckValid, context_hash: str | None = None, allowed_cache_results: Set[CacheResult] | None = None) -> Tuple[Any, str | None, CacheResult]

Check cache for recorded values.

This method checks for several types of cached values when performing graph reduction on the Expression Graph.

  1. Common Subexpression Elimination (CSE): If there is a previously recorded Job in the same Execution (execution_id) that is equivalent (same eval_hash), its final result is replayed. This check is always performed first.

  2. Evaluation cache: When check_valid=CacheCheckValid.FULL, previous single reduction evaluations are consulted by their eval_hash. If a match is found, the resulting single reduction is replayed. This is the most common cache method.

  3. Call cache: When check_valid=CacheCheckValid.SHALLOW, previously recorded CallNodes that are still current are consulted by their eval_hash (i.e. task_hash and args_hash). If a match is found, its final result is replayed. A CallNode is considered current only if its Task and the Tasks of all its child CallNodes are current, meaning their task_hash values appear in the provided set, scheduler_task_hashes. This cache method is typically used as an optimization, because the validity of intermediate arguments and results within the call subtree does not need to be checked (hence the check is "shallow").

Parameters:
  • task_hash (str) – Hash of Task used in the call.

  • args_hash (str) – Hash of all arguments used in the call.

  • eval_hash (str) – A hash of the combination of the task_hash and args_hash.

  • execution_id (str) – ID of the current Execution. This is used to perform the CSE check.

  • scheduler_task_hashes (Set[str]) – The set of task hashes known to the scheduler, used for checking the Call cache.

  • cache_scope (CacheScope) – What scope of cache hits to try. CacheScope.CSE only allows CSE, and CacheScope.NONE disables all caching.

  • check_valid (CacheCheckValid) – If CacheCheckValid.FULL, perform the Evaluation cache check; if CacheCheckValid.SHALLOW, perform the Call cache check. See above for more details.

  • context_hash (Optional[str]) – If given, restrict cache results to results obtained with a particular context.

  • allowed_cache_results (Optional[Set[CacheResult]]) – If provided, further restrict the allowed types of results.

Returns:

(result, call_hash, cache_type): result is the cached result, or None if no result was found. call_hash is the hash of the CallNode used if the CSE or Call cache method is used. cache_type specifies which cache method was used (CSE, SINGLE, ULTIMATE), or MISS if no cached result is found.

Return type:

Tuple[Any, Optional[str], CacheResult]
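
The lookup order described for check_cache can be illustrated with a small in-memory model. This is only a sketch under stated assumptions, not redun's implementation: ToyCache, its dictionaries, and the shallow flag are hypothetical, the CacheResult enum here is a local stand-in, and the mapping of SINGLE to the Evaluation cache and ULTIMATE to the Call cache is inferred from the wording above ("single reduction" versus "final result"). Cache scope, context hashes, and allowed_cache_results are omitted.

```python
from enum import Enum
from typing import Any, Dict, Optional, Set, Tuple


class CacheResult(Enum):
    # Local stand-in mirroring the result names listed above.
    CSE = "CSE"
    SINGLE = "SINGLE"
    ULTIMATE = "ULTIMATE"
    MISS = "MISS"


class ToyCache:
    """Hypothetical in-memory model of the three lookups (not redun's code)."""

    def __init__(self) -> None:
        # (execution_id, eval_hash) -> (final result, call_hash) of Jobs already seen.
        self.jobs: Dict[Tuple[str, str], Tuple[Any, str]] = {}
        # eval_hash -> single-reduction result.
        self.evals: Dict[str, Any] = {}
        # eval_hash -> (final result, call_hash, task hashes used in the call subtree).
        self.call_nodes: Dict[str, Tuple[Any, str, Set[str]]] = {}

    def check_cache(
        self,
        eval_hash: str,
        execution_id: str,
        scheduler_task_hashes: Set[str],
        shallow: bool,
    ) -> Tuple[Any, Optional[str], CacheResult]:
        # 1. CSE: an equivalent Job in the same Execution is always checked first.
        hit = self.jobs.get((execution_id, eval_hash))
        if hit is not None:
            result, call_hash = hit
            return result, call_hash, CacheResult.CSE

        if not shallow:
            # 2. Evaluation cache: replay a previous single reduction.
            if eval_hash in self.evals:
                return self.evals[eval_hash], None, CacheResult.SINGLE
        else:
            # 3. Call cache: replay the final result only if every task in the
            #    recorded call subtree is still known to the scheduler.
            node = self.call_nodes.get(eval_hash)
            if node is not None:
                result, call_hash, subtree_task_hashes = node
                if subtree_task_hashes <= scheduler_task_hashes:
                    return result, call_hash, CacheResult.ULTIMATE

        return None, None, CacheResult.MISS
```

In this model a Call cache entry stops matching as soon as one task hash in its subtree is missing from scheduler_task_hashes, which is the "current" condition described above.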

delete_tags(entity_id: str, tags: Iterable[Tuple[str, Any]], keys: Iterable[str] = ()) -> List[Tuple[str, str, str, Any]]

Delete tags.

explain_cache_miss(task: BaseTask, args_hash: str) -> Dict[str, Any] | None

Determine the reason for a cache miss.

get_job(job_id: str) -> dict | None

Returns details for a Job.

get_records(ids: Iterable[str], sorted=True) -> Iterable[dict]

Returns serialized records for the given ids.

Parameters:
  • ids (Iterable[str]) – Iterable of record ids to fetch serialized records.

  • sorted (bool) – If True, return records in the same order as the ids (Default: True).

get_tags(entity_ids: List[str]) -> Dict[str, MultiMap[str, Any]]

Get the tags of an entity (Execution, Job, CallNode, Task, Value).

get_value(value_hash: str) -> Any

Returns a Value from the datastore using the value content address (value_hash).

Parameters:

value_hash (str) – Hash of Value to fetch from ValueStore.

Returns:

result, is_cached – Returns the result value and is_cached=True if the value is in the ValueStore, otherwise returns (None, False).

Return type:

Tuple[Any, bool]

is_valid_handle(handle: Handle) -> bool

A handle is valid if it is current or ancestral to the current handle.

iter_record_ids(root_ids: Iterable[str]) -> Iterator[str]

Iterate the record ids of descendants of root_ids.

load(migrate: bool | None = None) -> None

Load the backend for use.

Parameters:

migrate (Optional[bool]) – If None, defer to automigration config options. If True, perform migration after establishing database connection.

put_records(records: Iterable[dict]) -> int

Writes records to the database and returns number of new records written.

record_call_node(task_name: str, task_hash: str, args_hash: str, expr_args: Tuple[Tuple, dict], eval_args: Tuple[Tuple, dict], result_hash: str, child_call_hashes: List[str]) -> str

Record a completed CallNode.

Parameters:
  • task_name (str) – Fullname (with namespace) of task.

  • task_hash (str) – Hash of task.

  • args_hash (str) – Hash of all arguments of the call.

  • expr_args (Tuple[Tuple, dict]) – Original expressions for the task arguments. These expressions are used to record the full upstream dataflow.

  • eval_args (Tuple[Tuple, dict]) – The fully evaluated arguments of the task arguments.

  • result_hash (str) – Hash of the result value of the call.

  • child_call_hashes (List[str]) – call_hashes of any child task calls.

Returns:

The call_hash of the new CallNode.

Return type:

str

record_call_node_context(call_hash: str, context_hash: str | None, context: dict) -> str

Records a context dict for a CallNode as a Tag.

record_execution(exec_id, args: List[str]) -> None

Records an Execution to the backend.

Parameters:
  • exec_id (str) – The id of the execution.

  • args (List[str]) – Arguments used on the command line to start Execution.

Returns:

UUID of new Execution.

Return type:

str

record_job_end(job: Job, now: datetime | None = None, status: str | None = None) -> None

Records the end of a Job.

Creates the job if needed, in which case the job is recorded with start_time == end_time.

record_job_start(job: Job, now: datetime | None = None) -> Any

Records the start of a new Job.

record_tags(entity_type: TagEntity, entity_id: str, tags: Iterable[Tuple[str, Any]], parents: Iterable = (), update: bool = False, new: bool = False)

Record tags for an entity (Execution, Job, CallNode, Task, Value).

Parameters:
  • entity_type (TagEntity) – The type of the tagged entity (Execution, Job, etc).

  • entity_id (str) – The id of the tagged entity.

  • tags (Iterable[KeyValue]) – An iterable of key-value pairs to create as tags.

  • parents (Iterable[str]) – Ids of tags to be superseded by the new tags.

  • update (bool) – If True, automatically supersede any existing tags with keys matching those in tags. This also implies new=True.

  • new (bool) – If True, force tags to be current.

Returns:

[(tag_hash, entity_id, key, value)] – Returns a list of the created tags.

Return type:

List[Tuple[str, str, str, Any]]
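
The effect of update=True, superseding existing tags whose keys match the new ones, can be pictured with a toy model. This is a sketch under assumptions, not how redun stores tags: ToyTagStore is hypothetical, it tracks only current (key, value) pairs per entity, and it leaves out tag hashes, the parents argument, and the new flag.

```python
from typing import Any, Dict, Iterable, List, Tuple


class ToyTagStore:
    """Hypothetical in-memory model of current tags per entity (not redun's code)."""

    def __init__(self) -> None:
        # entity_id -> list of current (key, value) tags.
        self.current: Dict[str, List[Tuple[str, Any]]] = {}

    def record_tags(
        self,
        entity_id: str,
        tags: Iterable[Tuple[str, Any]],
        update: bool = False,
    ) -> List[Tuple[str, str, Any]]:
        new_tags = list(tags)
        existing = self.current.setdefault(entity_id, [])
        if update:
            # Supersede: drop current tags whose key matches any new tag's key.
            new_keys = {key for key, _ in new_tags}
            existing[:] = [(k, v) for k, v in existing if k not in new_keys]
        existing.extend(new_tags)
        # Return the created tags, here without a tag_hash.
        return [(entity_id, key, value) for key, value in new_tags]
```

Without update, this model keeps several values under the same key side by side, which is an assumption suggested by the MultiMap return type of get_tags.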

record_value(value: Any, data: bytes | None = None) -> str

Record a Value into the backend.

Parameters:
  • value (Any) – A value to record.

  • data (Optional[bytes]) – Byte stream to record. If not given, usual value serialization is used.

Returns:

value_hash of recorded value.

Return type:

str
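
Taken together, record_value and get_value behave like a content-addressed store: recording returns a hash, and that hash is later used to fetch the value. The sketch below is a minimal illustration under assumptions, not redun's implementation: ToyValueBackend is hypothetical, SHA-1 over pickled bytes is an arbitrary choice for the hash, and any caller-supplied data is assumed to be a pickle of value.

```python
import hashlib
import pickle
from typing import Any, Dict, Optional, Tuple


class ToyValueBackend:
    """Hypothetical in-memory stand-in; hashing and serialization are assumptions."""

    def __init__(self) -> None:
        # value_hash -> serialized bytes.
        self._blobs: Dict[str, bytes] = {}

    def record_value(self, value: Any, data: Optional[bytes] = None) -> str:
        # Use the caller's bytes if given, otherwise serialize the value here.
        if data is None:
            data = pickle.dumps(value)
        value_hash = hashlib.sha1(data).hexdigest()
        self._blobs[value_hash] = data
        return value_hash

    def get_value(self, value_hash: str) -> Tuple[Any, bool]:
        # Mirror the documented (result, is_cached) shape, with (None, False) on a miss.
        data = self._blobs.get(value_hash)
        if data is None:
            return None, False
        return pickle.loads(data), True
```

Because the key is derived from the content, recording the same value twice yields the same hash and stores it only once in this model.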

rollback_handle(handle: Handle) -> None

Rollback all descendant handles.

set_eval_cache(eval_hash: str, task_hash: str, args_hash: str, value: Any, value_hash: str | None = None) -> None

Sets a new value in the Evaluation cache.

Parameters:
  • eval_hash (str) – A hash of the combination of the task_hash and args_hash.

  • task_hash (str) – Hash of Task used in the call.

  • args_hash (str) – Hash of all arguments used in the call.

  • value (Any) – Value to record in cache.

  • value_hash (Optional[str]) – Hash of value to record in cache.

update_tags(entity_type: TagEntity, entity_id: str, old_keys: Iterable[str], new_tags: Iterable[Tuple[str, Any]]) -> List[Tuple[str, str, str, Any]]

Update tags.

class redun.backends.base.TagEntity(value)

Bases: Enum

Entity types for a CallGraph Tag.

CallNode = 'CallNode'
Execution = 'Execution'
Job = 'Job'
Null = 'Null'
Task = 'Task'
Value = 'Value'

redun.backends.value_store module

class redun.backends.value_store.ValueStore(root_path: str, use_subdir: bool = True)

Bases: object

Stores redun Values on filesystem (local or remote).

The value store can be configured within the [backend] section using the value_store_path and value_store_min_size variables.

Values in this store are organized similarly to git objects:

<value_store_path>/ab/cdef… for value with hash abcdef…
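
The git-style layout above can be sketched as a small path helper. This is an illustration only: value_path is a hypothetical function rather than part of ValueStore, and the flat layout used when use_subdir is False is an assumption.

```python
import posixpath


def value_path(root_path: str, value_hash: str, use_subdir: bool = True) -> str:
    """Hypothetical helper: build the storage path for a value hash."""
    if use_subdir:
        # The first two hash characters name the subdirectory, the rest the file.
        return posixpath.join(root_path, value_hash[:2], value_hash[2:])
    # Assumed flat layout when subdirectories are disabled.
    return posixpath.join(root_path, value_hash)
```

For example, value_path("/store", "abcdef") gives "/store/ab/cdef". Splitting on the first two characters keeps any single directory from accumulating every object.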

get(value_hash: str) -> Tuple[bytes, bool]

Retrieve Value data.

get_value_path(value_hash: str) -> str

Return the path for a value.

has(value_hash: str) -> bool

Return True if value is in store.

put(value_hash: str, data: bytes) -> None

Store Value data.

size(value_hash: str) -> int

Returns the size in bytes of a Value.

Module contents