redun.backends package¶
Subpackages¶
- redun.backends.db package
- Submodules
- redun.backends.db.dataflow module
ArgumentValue
CallNodeValue
DataflowArg
DataflowAssign
DataflowEdge
DataflowRouting
DataflowSectionDOM
DataflowSectionKind
DataflowVars
display_call_node()
display_dataflow()
display_hash()
display_node()
display_section()
display_value()
get_callnode_arguments()
get_dataflow_call_node()
get_default_arg_name()
get_node_hash()
get_section_edge_type()
get_task_args()
is_internal_task()
iter_dataflow_sections()
iter_subsections()
iter_unique()
make_data_section_dom()
make_dataflow_dom()
make_section_dom()
make_var_name()
rewrite_call_node_merges()
serialize_dataflow()
serialize_section()
toposort_edges()
walk_dataflow()
walk_dataflow_argument_value()
walk_dataflow_callnode()
walk_dataflow_callnode_value()
walk_dataflow_node()
walk_dataflow_value()
- redun.backends.db.query module
CallGraphQuery
CallGraphQuery.ExecTag
CallGraphQuery.MODEL_CLASSES
CallGraphQuery.MODEL_NAMES
CallGraphQuery.MODEL_PKS
CallGraphQuery.all()
CallGraphQuery.build()
CallGraphQuery.clone()
CallGraphQuery.count()
CallGraphQuery.empty()
CallGraphQuery.filter_arguments()
CallGraphQuery.filter_execution_ids()
CallGraphQuery.filter_execution_statuses()
CallGraphQuery.filter_execution_tags()
CallGraphQuery.filter_file_paths()
CallGraphQuery.filter_ids()
CallGraphQuery.filter_job_statuses()
CallGraphQuery.filter_results()
CallGraphQuery.filter_tags()
CallGraphQuery.filter_task_hashes()
CallGraphQuery.filter_task_names()
CallGraphQuery.filter_types()
CallGraphQuery.filter_value_types()
CallGraphQuery.first()
CallGraphQuery.like_id()
CallGraphQuery.limit()
CallGraphQuery.one()
CallGraphQuery.one_or_none()
CallGraphQuery.order_by()
CallGraphQuery.page()
CallGraphQuery.select()
CallGraphQuery.subqueries
find_file()
infer_id()
infer_specialty_id()
parse_callgraph_query()
setup_query_parser()
- redun.backends.db.serializers module
- Module contents
Argument
ArgumentResult
CallEdge
CallNode
CallNode.arg_results
CallNode.args_display
CallNode.args_hash
CallNode.arguments
CallNode.call_hash
CallNode.call_hash_idx
CallNode.child_edges
CallNode.children
CallNode.downstream
CallNode.jobs
CallNode.parent_edges
CallNode.parents
CallNode.tags
CallNode.task
CallNode.task_hash
CallNode.task_name
CallNode.task_set
CallNode.timestamp
CallNode.value
CallNode.value_hash
CallNode.value_parsed
CallSubtreeTask
Column
DBVersionInfo
DateTimeUTC
Evaluation
Execution
File
Handle
HandleEdge
JSON
Job
PreviewValue
RedunBackendDb
RedunBackendDb.advance_handle()
RedunBackendDb.check_cache()
RedunBackendDb.clone()
RedunBackendDb.create_engine()
RedunBackendDb.delete_tags()
RedunBackendDb.explain_cache_miss()
RedunBackendDb.get_all_db_versions()
RedunBackendDb.get_call_cache()
RedunBackendDb.get_child_record_ids()
RedunBackendDb.get_db_version()
RedunBackendDb.get_db_version_required()
RedunBackendDb.get_eval_cache()
RedunBackendDb.get_job()
RedunBackendDb.get_records()
RedunBackendDb.get_tags()
RedunBackendDb.get_value()
RedunBackendDb.has_records()
RedunBackendDb.is_db_compatible()
RedunBackendDb.is_valid_handle()
RedunBackendDb.iter_record_ids()
RedunBackendDb.load()
RedunBackendDb.migrate()
RedunBackendDb.put_records()
RedunBackendDb.record_call_node()
RedunBackendDb.record_call_node_context()
RedunBackendDb.record_execution()
RedunBackendDb.record_job_end()
RedunBackendDb.record_job_start()
RedunBackendDb.record_tags()
RedunBackendDb.record_updated_time()
RedunBackendDb.record_value()
RedunBackendDb.rollback_handle()
RedunBackendDb.set_eval_cache()
RedunBackendDb.update_tags()
RedunDatabaseError
RedunMigration
RedunSession
RedunVersion
RedunVersionError
Subvalue
Tag
TagEdit
Task
Value
Value.arguments
Value.child_edges
Value.children
Value.evals
Value.file
Value.format
Value.handle
Value.in_value_store
Value.parent_edges
Value.parents
Value.preview
Value.results
Value.subfiles
Value.subhandles
Value.tags
Value.task
Value.type
Value.value
Value.value_hash
Value.value_hash_idx
Value.value_parsed
get_abs_path()
get_call_node_child_edges()
get_execution_child_edges()
get_job_child_edges()
get_tag_child_edges()
get_tag_entity_child_edges()
get_value_child_edges()
parse_db_version()
Submodules¶
redun.backends.base module¶
- class redun.backends.base.RedunBackend(type_registry: TypeRegistry | None = None)¶
Bases: ABC
A Backend manages the CallGraph (provenance) and cache for a Scheduler.
This is the abstract backend interface. See db/__init__.py for an example of a concrete implementation that persists data in a database.
- abstract advance_handle(handles: List[Handle], child_handle: Handle)¶
Record parent-child relationships between Handles.
- abstract check_cache(task_hash: str, args_hash: str, eval_hash: str, execution_id: str, scheduler_task_hashes: Set[str], cache_scope: CacheScope, check_valid: CacheCheckValid, context_hash: str | None = None, allowed_cache_results: Set[CacheResult] | None = None) Tuple[Any, str | None, CacheResult] ¶
Check cache for recorded values.
This method checks for several types of cached values when performing graph reduction on the Expression Graph.
Common Subexpression Elimination (CSE): If there is a previously recorded Job in the same Execution, execution_id, that is equivalent (same eval_hash), its final result is replayed. This check is always performed first.
Evaluation cache: When check_valid=CacheCheckValid.FULL, previous single reduction evaluations are consulted by their eval_hash. If a match is found, the resulting single reduction is replayed. This is the most common cache method.
Call cache: When check_valid=CacheCheckValid.SHALLOW, previously recorded CallNodes that are still current are consulted by their eval_hash (i.e. task_hash and args_hash). If a match is found, its final result is replayed. A CallNode is considered current only if its Task and all tasks for child CallNodes are current, as defined by having their task_hash in the provided set, scheduler_task_hashes. This cache method is typically used as an optimization, where the validity of the intermediate arguments and results within the call subtree does not need to be checked (hence checking is “shallow”).
- Parameters:
task_hash (str) – Hash of Task used in the call.
args_hash (str) – Hash of all arguments used in the call.
eval_hash (str) – A hash of the combination of the task_hash and args_hash.
execution_id (str) – ID of the current Execution. This is used to perform the CSE check.
scheduler_task_hashes (Set[str]) – The set of task hashes known to the scheduler, used for checking the Call cache.
cache_scope (CacheScope) – What scope of cache hits to try. CacheScope.CSE only allows CSE, and CacheScope.NONE disables all caching.
check_valid (CacheCheckValid) – If set to CacheCheckValid.FULL, perform the Evaluation cache check; if set to CacheCheckValid.SHALLOW, perform the Call cache check. See above for more details.
context_hash (Optional[str]) – If given, restrict cache results to results obtained with a particular context.
allowed_cache_results (Optional[Set[CacheResult]]) – If provided, further restrict the allowed types of results.
- Returns:
(result, call_hash, cache_type) – result is the cached result, or None if no result was found. call_hash is the hash of the CallNode used if CSE or Call cache methods are used. cache_type specifies which cache method was used (CSE, SINGLE, ULTIMATE) or MISS if no cached result is found.
- Return type:
Tuple[Any, Optional[str], CacheResult]
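The lookup order described above can be illustrated with a small in-memory sketch. This is not redun's implementation: ToyCache, its dictionaries, and the shallow flag are hypothetical stand-ins, and the mapping of the Evaluation cache to SINGLE and the Call cache to ULTIMATE is an assumption based on the "single reduction" and "final result" wording. Whether a shallow miss falls back to the Evaluation cache is not stated above; this sketch does not fall back.

```python
from enum import Enum
from typing import Any, Dict, Optional, Set, Tuple


class CacheResult(Enum):
    # Names taken from the return description above; values are illustrative.
    CSE = "CSE"
    SINGLE = "SINGLE"
    ULTIMATE = "ULTIMATE"
    MISS = "MISS"


class ToyCache:
    """Hypothetical in-memory stand-in for a backend cache."""

    def __init__(self) -> None:
        # (execution_id, eval_hash) -> (final result, call_hash)
        self.jobs: Dict[Tuple[str, str], Tuple[Any, str]] = {}
        # eval_hash -> result of a single reduction
        self.evals: Dict[str, Any] = {}
        # eval_hash -> (final result, call_hash, task hashes used in the call subtree)
        self.calls: Dict[str, Tuple[Any, str, Set[str]]] = {}

    def check(
        self,
        eval_hash: str,
        execution_id: str,
        scheduler_task_hashes: Set[str],
        shallow: bool,
    ) -> Tuple[Any, Optional[str], CacheResult]:
        # 1. CSE is always checked first: an equivalent Job in the same Execution.
        job = self.jobs.get((execution_id, eval_hash))
        if job is not None:
            result, call_hash = job
            return result, call_hash, CacheResult.CSE

        if shallow:
            # 2a. Call cache: the CallNode must be current, i.e. every task
            # hash in its subtree is known to the scheduler.
            call = self.calls.get(eval_hash)
            if call is not None and call[2] <= scheduler_task_hashes:
                return call[0], call[1], CacheResult.ULTIMATE
        elif eval_hash in self.evals:
            # 2b. Evaluation cache: replay a previous single reduction.
            return self.evals[eval_hash], None, CacheResult.SINGLE

        return None, None, CacheResult.MISS
```

In this sketch a Call cache entry is skipped as soon as one task hash in its subtree is unknown to the scheduler, which mirrors the definition of a current CallNode given above.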
- abstract delete_tags(entity_id: str, tags: Iterable[Tuple[str, Any]], keys: Iterable[str] = ()) List[Tuple[str, str, str, Any]] ¶
Delete tags.
- abstract explain_cache_miss(task: BaseTask, args_hash: str) Dict[str, Any] | None ¶
Determine the reason for a cache miss.
- abstract get_job(job_id: str) dict | None ¶
Returns details for a Job.
- abstract get_records(ids: Iterable[str], sorted=True) Iterable[dict] ¶
Returns serialized records for the given ids.
- Parameters:
ids (Iterable[str]) – Iterable of record ids to fetch serialized records.
sorted (bool) – If True, return records in the same order as the ids (Default: True).
- abstract get_tags(entity_ids: List[str]) Dict[str, MultiMap[str, Any]] ¶
Get the tags of the given entities (Execution, Job, CallNode, Task, Value).
- abstract get_value(value_hash: str) Any ¶
Returns a Value from the datastore using the value content address (value_hash).
- Parameters:
value_hash (str) – Hash of Value to fetch from ValueStore.
- Returns:
result, is_cached – Returns the result value and is_cached=True if the value is in the ValueStore, otherwise returns (None, False).
- Return type:
Tuple[Any, bool]
- abstract is_valid_handle(handle: Handle) bool ¶
A handle is valid if it is current or ancestral to the current handle.
- abstract iter_record_ids(root_ids: Iterable[str]) Iterator[str] ¶
Iterate the record ids of descendants of root_ids.
- abstract load(migrate: bool | None = None) None ¶
Load the backend for use.
- Parameters:
migrate (Optional[bool]) – If None, defer to automigration config options. If True, perform migration after establishing database connection.
- abstract put_records(records: Iterable[dict]) int ¶
Writes records to the database and returns number of new records written.
- abstract record_call_node(task_name: str, task_hash: str, args_hash: str, expr_args: Tuple[Tuple, dict], eval_args: Tuple[Tuple, dict], result_hash: str, child_call_hashes: List[str]) str ¶
Record a completed CallNode.
- Parameters:
task_name (str) – Fullname (with namespace) of task.
task_hash (str) – Hash of task.
args_hash (str) – Hash of all arguments of the call.
expr_args (Tuple[Tuple, dict]) – Original expressions for the task arguments. These expressions are used to record the full upstream dataflow.
eval_args (Tuple[Tuple, dict]) – The fully evaluated task arguments.
result_hash (str) – Hash of the result value of the call.
child_call_hashes (List[str]) – call_hashes of any child task calls.
- Returns:
The call_hash of the new CallNode.
- Return type:
str
- abstract record_call_node_context(call_hash: str, context_hash: str | None, context: dict) str ¶
Records a context dict for a CallNode as a Tag.
- abstract record_execution(exec_id, args: List[str]) None ¶
Records an Execution to the backend.
- Parameters:
exec_id (str) – The id of the execution.
args (List[str]) – Arguments used on the command line to start Execution.
- Returns:
UUID of new Execution.
- Return type:
str
- abstract record_job_end(job: Job, now: datetime | None = None, status: str | None = None) None ¶
Records the end of a Job.
Creates the job if needed, in which case the job is recorded with start_time == end_time.
- abstract record_job_start(job: Job, now: datetime | None = None) Any ¶
Records the start of a new Job.
- abstract record_tags(entity_type: TagEntity, entity_id: str, tags: Iterable[Tuple[str, Any]], parents: Iterable = (), update: bool = False, new: bool = False)¶
Record tags for an entity (Execution, Job, CallNode, Task, Value).
- Parameters:
entity_type (TagEntity) – The type of the tagged entity (Execution, Job, etc).
entity_id (str) – The id of the tagged entity.
tags (Iterable[KeyValue]) – An iterable of key-value pairs to create as tags.
parents (Iterable[str]) – Ids of tags to be superseded by the new tags.
update (bool) – If True, automatically supersede any existing tags with keys matching those in tags. This also implies new=True.
new (bool) – If True, force tags to be current.
- Returns:
[(tag_hash, entity_id, key, value)] – Returns a list of the created tags.
- Return type:
List[Tuple[str, str, str, Any]]
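The effect of update=True can be pictured with a toy tag store. This is only a sketch of the superseding behavior described in the parameter list, not redun's data model: ToyTagStore, its record layout, and the is_current flag are hypothetical, and tag hashes and the parents and new parameters are omitted.

```python
from typing import Any, Dict, Iterable, List, Tuple


class ToyTagStore:
    """Hypothetical in-memory tag store; illustrates superseding only."""

    def __init__(self) -> None:
        # Each record holds entity_id, key, value, and an is_current flag.
        self.records: List[Dict[str, Any]] = []

    def record_tags(
        self,
        entity_id: str,
        tags: Iterable[Tuple[str, Any]],
        update: bool = False,
    ) -> List[Tuple[str, str, Any]]:
        tags = list(tags)
        if update:
            # Supersede existing tags on this entity whose keys match the new tags.
            keys = {key for key, _ in tags}
            for record in self.records:
                if record["entity_id"] == entity_id and record["key"] in keys:
                    record["is_current"] = False

        created = []
        for key, value in tags:
            self.records.append(
                {"entity_id": entity_id, "key": key, "value": value, "is_current": True}
            )
            created.append((entity_id, key, value))
        return created

    def current_tags(self, entity_id: str) -> List[Tuple[str, Any]]:
        # Only tags that have not been superseded are reported.
        return [
            (r["key"], r["value"])
            for r in self.records
            if r["entity_id"] == entity_id and r["is_current"]
        ]
```

Without update, a second tag with the same key is simply added alongside the first; with update, the older tag stays in the history but is no longer current.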
- abstract record_updated_time() None ¶
Updates updated_time (heartbeat) timestamp for the current Execution.
- abstract record_value(value: Any, data: bytes | None = None) str ¶
Record a Value into the backend.
- Parameters:
value (Any) – A value to record.
data (Optional[bytes]) – Byte stream to record. If not given, usual value serialization is used.
- Returns:
value_hash of recorded value.
- Return type:
str
- abstract set_eval_cache(eval_hash: str, task_hash: str, args_hash: str, value: Any, value_hash: str | None = None) None ¶
Sets a new value in the Evaluation cache.
- Parameters:
eval_hash (str) – A hash of the combination of the task_hash and args_hash.
task_hash (str) – Hash of Task used in the call.
args_hash (str) – Hash of all arguments used in the call.
value (Any) – Value to record in cache.
value_hash (str) – Hash of value to record in cache.
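As a rough illustration of how eval_hash relates to task_hash and args_hash, the sketch below derives one hash from the other two and uses it as the cache key. The hashing scheme (SHA-1 over a delimited concatenation) and the names toy_eval_hash and toy_eval_cache are assumptions for illustration; the actual scheme used by redun is not described here.

```python
import hashlib
from typing import Any, Dict


def toy_eval_hash(task_hash: str, args_hash: str) -> str:
    # Assumption: combine the two hashes with a delimiter and hash the result.
    payload = f"{task_hash}:{args_hash}".encode("utf-8")
    return hashlib.sha1(payload).hexdigest()


# Hypothetical Evaluation cache keyed by eval_hash.
toy_eval_cache: Dict[str, Any] = {}


def toy_set_eval_cache(task_hash: str, args_hash: str, value: Any) -> str:
    # Store the value under the combined hash and return the key used.
    eval_hash = toy_eval_hash(task_hash, args_hash)
    toy_eval_cache[eval_hash] = value
    return eval_hash
```

Because the key depends on both inputs, changing either the task or its arguments yields a different eval_hash and therefore a cache miss.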
redun.backends.value_store module¶
- class redun.backends.value_store.ValueStore(root_path: str, use_subdir: bool = True)¶
Bases: object
Stores redun Values on filesystem (local or remote).
The value store can be configured within the [backend] section using the value_store_path and value_store_min_size variables.
Values in this store are organized similarly to git objects:
<value_store_path>/ab/cdef… for value with hash abcdef…
- get(value_hash: str) Tuple[bytes, bool] ¶
Retrieve Value data.
- get_value_path(value_hash: str) str ¶
Return the path for a value.
- has(value_hash: str) bool ¶
Return True if value is in store.
- put(value_hash: str, data: bytes) None ¶
Store Value data.
- size(value_hash: str) int ¶
Returns the size in bytes of a Value.
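The git-like layout described above (the first two hash characters as a subdirectory, the rest as the file name) can be sketched as follows. ToyValueStore and toy_value_path are hypothetical names, the behavior with use_subdir=False is an assumption, and treating the boolean returned by get as a "found" flag is also an assumption; only local paths are handled.

```python
import posixpath
import tempfile
from pathlib import Path
from typing import Tuple


def toy_value_path(root_path: str, value_hash: str, use_subdir: bool = True) -> str:
    # <root>/ab/cdef... for hash abcdef..., as in the layout described above.
    if use_subdir:
        return posixpath.join(root_path, value_hash[:2], value_hash[2:])
    # Assumption: without subdirectories the hash is used as a flat file name.
    return posixpath.join(root_path, value_hash)


class ToyValueStore:
    """Hypothetical local-only value store using the sharded layout."""

    def __init__(self, root_path: str) -> None:
        self.root_path = root_path

    def put(self, value_hash: str, data: bytes) -> None:
        path = Path(toy_value_path(self.root_path, value_hash))
        path.parent.mkdir(parents=True, exist_ok=True)
        path.write_bytes(data)

    def get(self, value_hash: str) -> Tuple[bytes, bool]:
        path = Path(toy_value_path(self.root_path, value_hash))
        if not path.exists():
            return b"", False
        return path.read_bytes(), True


# Usage: store one value in a temporary directory and read it back.
with tempfile.TemporaryDirectory() as root:
    store = ToyValueStore(root)
    store.put("abcdef", b"payload")
    found = store.get("abcdef")
    missing = store.get("123456")
```

Sharding by hash prefix keeps any single directory from accumulating every stored value, which is the same motivation behind git's object directory layout.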