redun.backends.db package¶
Submodules¶
redun.backends.db.dataflow module¶
Dataflow visualization.
An upstream dataflow visualization explains the derivation of a value. Take for example this dataflow visualization of the derivation of a VCF file from a bioinformatic analysis:
```
value = File(path=sample4.vcf, hash=********)

value <-- <****> call_variants(bam, ref_genome)
  bam        = <****> File(path=sample4.bam, hash=********)
  ref_genome = <****> File(path=ref_genome, hash=********)

bam <-- argument of <****> call_variants_all(bams, ref_genome)
    <-- <****> align_reads_all(fastqs, ref_genome)
    <-- <****> align_reads(fastq, ref_genome)
  fastq      = <****> File(path=sample4.fastq, hash=********)
  ref_genome = <****> File(path=ref_genome, hash=********)

fastq <-- argument of <****> align_reads_all(fastqs, ref_genome)
      <-- argument of <****> main(fastqs, ref_genome)
      <-- origin

ref_genome <-- argument of <****> align_reads_all(fastqs, ref_genome)
           <-- argument of <****> main(fastqs, ref_genome)
           <-- origin
```
Hash values are indicated by * above. For reference, here is what the workflow might have been:
```
from typing import List, cast

from redun import File, task


@task()
def align_reads(fastq: File, ref_genome: File) -> File:
    # Stand-in for a real aligner: writes a placeholder BAM next to the FASTQ.
    reads = cast(str, fastq.read())
    ref = cast(str, ref_genome.read())
    bam = File(fastq.path.replace("fastq", "bam"))
    bam.write("align({}, {})".format(reads, ref))
    return bam


@task()
def call_variants(bam: File, ref_genome: File) -> File:
    # Stand-in for a real variant caller: writes a placeholder VCF.
    align = cast(str, bam.read())
    ref = cast(str, ref_genome.read())
    vcf = File(bam.path.replace("bam", "vcf"))
    vcf.write("calls({}, {})".format(align, ref))
    return vcf


@task()
def align_reads_all(fastqs: List[File], ref_genome: File):
    bams = [align_reads(fastq, ref_genome) for fastq in fastqs]
    return bams


@task()
def call_variants_all(bams: List[File], ref_genome: File):
    vcfs = [call_variants(bam, ref_genome) for bam in bams]
    return vcfs


@task()
def main(fastqs: List[File], ref_genome: File):
    bams = align_reads_all(fastqs, ref_genome)
    vcfs = call_variants_all(bams, ref_genome)
    return vcfs
```
A dataflow visualization consists of a series of paragraphs called “dataflow sections” that describe how one of the values is derived. Here is the section for the bam value:
```
bam <-- argument of <****> call_variants_all(bams, ref_genome)
    <-- <****> align_reads_all(fastqs, ref_genome)
    <-- <****> align_reads(fastq, ref_genome_2)
  fastq      = <****> File(path=sample4.fastq, hash=********)
  ref_genome = <****> File(path=ref_genome, hash=********)
```
A section is made of three clauses: assignment, routing, and arguments.
The assignment clause indicates which CallNode produced this value:
```
bam <-- argument of <********> call_variants_all(bams, ref_genome)
```
Routing clauses, if present, describe a series of additional CallNodes that “route” the value, either by passing it as an argument from a parent CallNode to a child CallNode, or by returning it as a result from a child CallNode to a parent CallNode.
```
<-- result of <****> align_reads_all(fastqs, ref_genome)
<-- <****> align_reads(fastq, ref_genome_2)
```
Argument clauses define the value for each argument in the final CallNode.
```
fastq      = <****> File(path=sample4.fastq, hash=********)
ref_genome = <****> File(path=ref_genome, hash=********)
```
To build this visualization, the following strategy is used:
- Given a starting value (e.g. a VCF file in the example above), walk the CallGraph backwards (i.e. upstream) to determine relevant nodes. These are called DataflowNodes, which are connected by DataflowEdges.
- DataflowEdges are then grouped into sections.
- Each section is then reorganized into a DataflowSectionDOM. A DataflowDOM is the collection of DataflowSectionDOMs. The DOM (document object model) is an intermediate representation that can be rendered in multiple ways.
- Once a DataflowDOM is created, it can either be rendered into a textual format, or serialized into JSON for the web frontend.
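The first step of the strategy above, walking upstream and collecting edges, can be sketched on a toy graph. This is not redun's implementation: the `UPSTREAM` dictionary and the `walk_upstream` function are hypothetical stand-ins for the CallGraph and for `walk_dataflow`, and nodes are plain strings rather than DataflowNodes.

```python
from typing import Dict, Iterator, List, Optional, Tuple

# Hypothetical toy graph: each node maps to the nodes directly upstream of it.
# An empty list marks a node whose value comes from the origin (user input).
UPSTREAM: Dict[str, List[str]] = {
    "vcf": ["call_variants"],
    "call_variants": ["bam", "ref_genome"],
    "bam": ["align_reads"],
    "align_reads": ["fastq", "ref_genome"],
    "fastq": [],
    "ref_genome": [],
}

Edge = Tuple[str, Optional[str]]  # (src, dest); dest is None for the origin


def walk_upstream(start: str) -> Iterator[Edge]:
    """Yield each upstream edge once, walking backwards from `start`."""
    seen = set()
    stack = [start]
    while stack:
        node = stack.pop()
        if node in seen:
            continue
        seen.add(node)
        parents = UPSTREAM[node]
        if not parents:
            yield (node, None)
        for parent in parents:
            yield (node, parent)
            stack.append(parent)


edges = list(walk_upstream("vcf"))
for src, dest in edges:
    print(f"{src} <-- {dest if dest is not None else 'origin'}")
```

Because `ref_genome` is reachable through two tasks, the `seen` set is what keeps its origin edge from being emitted twice.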
- class redun.backends.db.dataflow.ArgumentValue(argument: Argument, value: Value)¶
Bases:
NamedTuple
A DataflowNode used for tracing one subvalue in an argument.
- class redun.backends.db.dataflow.CallNodeValue(call_node: CallNode, value: Value)¶
Bases:
NamedTuple
A DataflowNode used for tracing one subvalue of a CallNode result.
- class redun.backends.db.dataflow.DataflowArg(var_name: str, value: Value)¶
Bases:
NamedTuple
An argument clause in a Dataflow DOM.
- var_name: str¶
Alias for field number 0
- class redun.backends.db.dataflow.DataflowAssign(var_name: str, prefix: str, node_display: str, node: ArgumentValue | CallNodeValue | CallNode | Value | None)¶
Bases:
NamedTuple
The assignment clause in a Dataflow DOM.
- node: ArgumentValue | CallNodeValue | CallNode | Value | None¶
Alias for field number 3
- node_display: str¶
Alias for field number 2
- prefix: str¶
Alias for field number 1
- var_name: str¶
Alias for field number 0
- class redun.backends.db.dataflow.DataflowEdge(src: ArgumentValue | CallNodeValue | CallNode | Value, dest: ArgumentValue | CallNodeValue | CallNode | Value | None)¶
Bases:
NamedTuple
An edge in a Dataflow graph.
- dest: ArgumentValue | CallNodeValue | CallNode | Value | None¶
Alias for field number 1
- src: ArgumentValue | CallNodeValue | CallNode | Value¶
Alias for field number 0
- class redun.backends.db.dataflow.DataflowRouting(prefix: str, node_display: str, node: ArgumentValue | CallNodeValue | CallNode | Value | None)¶
Bases:
NamedTuple
A routing clause in a Dataflow DOM.
- node: ArgumentValue | CallNodeValue | CallNode | Value | None¶
Alias for field number 2
- node_display: str¶
Alias for field number 1
- prefix: str¶
Alias for field number 0
- class redun.backends.db.dataflow.DataflowSectionDOM(assign: DataflowAssign, routing: List[DataflowRouting], args: List[DataflowArg])¶
Bases:
NamedTuple
A section in Dataflow DOM.
- args: List[DataflowArg]¶
Alias for field number 2
- assign: DataflowAssign¶
Alias for field number 0
- routing: List[DataflowRouting]¶
Alias for field number 1
- class redun.backends.db.dataflow.DataflowSectionKind(value)¶
Bases:
Enum
Each dataflow section describes either a task call or a data manipulation.
- CALL = 'call'¶
- DATA = 'data'¶
- class redun.backends.db.dataflow.DataflowVars¶
Bases:
object
Manages variable names for nodes in a dataflow.
- new_var_name(node: ArgumentValue | CallNodeValue | CallNode | Value, base_var_name: str | None = None) Tuple[str, str | None] ¶
Get or create a new variable name for a DataflowNode.
- redun.backends.db.dataflow.display_call_node(call_node: CallNode, renames: Dict[str, str]) str ¶
Formats a CallNode to a string.
- redun.backends.db.dataflow.display_dataflow(dom: Iterable[DataflowSectionDOM]) Iterator[str] ¶
Yields lines for displaying a dataflow DOM.
- redun.backends.db.dataflow.display_hash(node: ArgumentValue | CallNodeValue | CallNode | Value | None) str ¶
Formats hash for a DataflowNode.
- redun.backends.db.dataflow.display_node(node: ArgumentValue | CallNodeValue | CallNode | Value | None, renames: Dict[str, str]) Tuple[str, str] ¶
Formats a dataflow node to a string.
- redun.backends.db.dataflow.display_section(dom: DataflowSectionDOM) Iterator[str] ¶
Yields lines for displaying a dataflow section DOM.
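To make the three clause types concrete, here is a minimal sketch of rendering one section as text. The NamedTuples below mirror the field names documented above, but they are local stand-ins (the `node` field is omitted and values are plain strings), and `render_section` is a hypothetical helper whose layout only approximates what `display_section` produces.

```python
from typing import Iterator, List, NamedTuple


class Assign(NamedTuple):
    var_name: str
    prefix: str
    node_display: str


class Routing(NamedTuple):
    prefix: str
    node_display: str


class Arg(NamedTuple):
    var_name: str
    value: str


class SectionDOM(NamedTuple):
    assign: Assign
    routing: List[Routing]
    args: List[Arg]


def render_section(dom: SectionDOM) -> Iterator[str]:
    """Yield text lines for one section: assignment, routing, then arguments."""

    def clause(prefix: str, node_display: str) -> str:
        # A prefix such as "argument of" is optional.
        return f"<-- {prefix} {node_display}" if prefix else f"<-- {node_display}"

    assign = dom.assign
    yield f"{assign.var_name} {clause(assign.prefix, assign.node_display)}"
    # Routing clauses are indented to line up under the assignment arrow.
    indent = " " * (len(assign.var_name) + 1)
    for routing in dom.routing:
        yield indent + clause(routing.prefix, routing.node_display)
    for arg in dom.args:
        yield f"  {arg.var_name} = {arg.value}"


section = SectionDOM(
    assign=Assign("bam", "argument of", "call_variants_all(bams, ref_genome)"),
    routing=[Routing("", "align_reads(fastq, ref_genome)")],
    args=[Arg("fastq", "File(path=sample4.fastq)")],
)
lines = list(render_section(section))
print("\n".join(lines))
```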
- redun.backends.db.dataflow.get_callnode_arguments(call_node: CallNode) List[Argument] ¶
Returns a CallNode’s arguments in sorted order.
- redun.backends.db.dataflow.get_dataflow_call_node(node: ArgumentValue | CallNodeValue | CallNode | Value | None) CallNode | None ¶
Returns the CallNode for a DataflowNode.
- redun.backends.db.dataflow.get_default_arg_name(pos: int) str ¶
Generates the default name for an argument.
- redun.backends.db.dataflow.get_node_hash(node: ArgumentValue | CallNodeValue | CallNode | Value | None) str | None ¶
Returns the hash for a DataflowNode, or None if it has none.
- redun.backends.db.dataflow.get_section_edge_type(edge: DataflowEdge) str ¶
Classifies a DataflowEdge.
- redun.backends.db.dataflow.get_task_args(task: Task) List[str] ¶
Returns list of argument names of a Task. Raises a SyntaxError if the task source code is not properly formatted.
- redun.backends.db.dataflow.is_internal_task(task: Task) bool ¶
Returns True if task is an internal redun task.
We skip such tasks in the dataflow to avoid clutter.
- redun.backends.db.dataflow.iter_dataflow_sections(dataflow_edges: Iterable[DataflowEdge]) Iterator[Tuple[DataflowSectionKind, List[DataflowEdge]]] ¶
Yields dataflow sections from an iterable of dataflow edges.
A dataflow section is a group of edges representing one ‘paragraph’ in a dataflow display.
    value <-- <1234abcd> call_node(arg1, arg2)
          <-- result of <2345abcd> call_node2(arg3, arg4)
      arg3 = <3456abcd> 'hello_world'
      arg4 = <4567abcd> File('foo.txt')
- redun.backends.db.dataflow.iter_subsections(section: List[DataflowEdge]) Iterator[List[DataflowEdge]] ¶
Determines if a section should be broken down into smaller subsections.
In real life dataflows, there are some cases where the dataflow merges such that the structure is a DAG, not just a tree. These merges represent a value that was passed to two or more different tasks and then their outputs eventually combine again, either into a single Value like a list or as arguments into a common task. For example, value is a merge node in the upstream dataflow of result.
    value = task0()
    output1 = task1(a=value)
    output2 = task2(b=value)
    result = task3(c=output1, d=output2)
The upstream dataflow graph of result is:
The function iter_dataflow_sections() will break this graph right after every CallNode --> ArgumentValue edge, resulting in three sections. One of those sections will have a “merge node”, CallNode(task0):

    ArgumentValue(task1, key=c)   ArgumentValue(task2, key=d)
      |                            |
      V                            |
    CallNodeValue(task0, value) <--/
      |
      V
    CallNode(task0)
      |
      V
    Origin
This function will further break this section into subsections like this:

Subsection 1:

    ArgumentValue(task1, key=c)
      |
      V
    CallNodeValue(task0, value)

Subsection 2:

    ArgumentValue(task2, key=d)
      |
      V
    CallNodeValue(task0, value)

Subsection 3:

    CallNodeValue(task0, value)
      |
      V
    CallNode(task0)
      |
      V
    Origin
Ultimately these three subsections get rendered as:

    c <-- c_2

    d <-- c_2

    c_2 <-- task()
        <-- origin
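The splitting described for iter_subsections can be illustrated with a small sketch. It is not redun's code: edges are plain `(src, dest)` string pairs, `split_at_merges` is a hypothetical helper, and it only handles simple shapes like the example, where chains converge on a node that has more than one incoming edge.

```python
from collections import Counter, defaultdict
from typing import Dict, List, Tuple

Edge = Tuple[str, str]  # (src, dest), pointing upstream


def split_at_merges(edges: List[Edge]) -> List[List[Edge]]:
    """Split a section into chains that end at, or start from, a merge node."""
    indegree = Counter(dest for _, dest in edges)
    merge_nodes = [node for node, count in indegree.items() if count > 1]
    outgoing: Dict[str, List[str]] = defaultdict(list)
    for src, dest in edges:
        outgoing[src].append(dest)

    # Chains start at nodes with no incoming edge, then at each merge node.
    starts = [src for src in outgoing if src not in indegree] + merge_nodes
    subsections = []
    for start in starts:
        for dest in outgoing.get(start, []):
            chain = [(start, dest)]
            node = dest
            # Follow the chain until it reaches a merge node, a branch, or the end.
            while node not in merge_nodes and len(outgoing.get(node, [])) == 1:
                next_node = outgoing[node][0]
                chain.append((node, next_node))
                node = next_node
            subsections.append(chain)
    return subsections


section = [
    ("ArgumentValue(task1, key=c)", "CallNodeValue(task0, value)"),
    ("ArgumentValue(task2, key=d)", "CallNodeValue(task0, value)"),
    ("CallNodeValue(task0, value)", "CallNode(task0)"),
    ("CallNode(task0)", "Origin"),
]
subsections = split_at_merges(section)
for number, chain in enumerate(subsections, start=1):
    print(f"Subsection {number}: {chain}")
```

On this input the sketch produces the same three subsections as listed above: one per argument chain, plus one from the shared node down to the origin.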
- redun.backends.db.dataflow.iter_unique(items: ~typing.Iterable[~redun.backends.db.dataflow.T], key: ~typing.Callable[[~redun.backends.db.dataflow.T], ~typing.Any] = <function <lambda>>) Iterator[T] ¶
Iterate through unique items.
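A minimal sketch of key-based de-duplication follows. It assumes that the first occurrence of each key is kept, that input order is preserved, and that keys are hashable; none of this is stated in the docstring above, and `iter_unique_sketch` is a hypothetical name.

```python
from typing import Any, Callable, Iterable, Iterator, TypeVar

T = TypeVar("T")


def iter_unique_sketch(
    items: Iterable[T], key: Callable[[T], Any] = lambda x: x
) -> Iterator[T]:
    """Yield items in order, skipping any whose key was already seen."""
    seen = set()
    for item in items:
        k = key(item)
        if k not in seen:
            seen.add(k)
            yield item


result = list(iter_unique_sketch(["a", "b", "a", "c", "b"]))
by_length = list(iter_unique_sketch(["aa", "bb", "c"], key=len))
print(result, by_length)
```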
- redun.backends.db.dataflow.make_data_section_dom(section: List[DataflowEdge], dataflow_vars: DataflowVars, new_varname: str = 'value') DataflowSectionDOM ¶
Returns a DOM for a data section.
A data section describes how one value was constructed from several subvalues. For example this dataflow section:
    parent_value <-- derives from
      subvalue1 = File(path='file1')
      subvalue2 = File(path='file2')

corresponds to this kind of code:

    subvalue1 = task1()
    subvalue2 = task2()
    parent_value = [subvalue1, subvalue2]
    task3(parent_value)
- redun.backends.db.dataflow.make_dataflow_dom(dataflow_edges: Iterable[DataflowEdge], new_varname: str = 'value') Iterable[DataflowSectionDOM] ¶
Yields dataflow section DOMs from an iterable of dataflow edges.
It also performs variable renaming to give every value a unique variable name.
- redun.backends.db.dataflow.make_section_dom(section: List[DataflowEdge], dataflow_vars: DataflowVars, new_varname: str = 'value') DataflowSectionDOM ¶
Returns DOM for a dataflow section.
- redun.backends.db.dataflow.make_var_name(var_name_base: str, name2var: Dict[str, ArgumentValue | CallNodeValue | CallNode | Value], suffix: int = 2) str ¶
Generate a new variable using a unique suffix (e.g. myvar_2).
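The suffixing scheme can be sketched as below. This is an illustration under assumptions: `make_var_name_sketch` is a hypothetical stand-in, and returning the base name unchanged when it is still free is a guess about behavior that the docstring does not spell out.

```python
from typing import Dict


def make_var_name_sketch(base: str, taken: Dict[str, object], suffix: int = 2) -> str:
    """Return `base` if unused, else the first free name of the form base_N."""
    if base not in taken:
        return base
    while f"{base}_{suffix}" in taken:
        suffix += 1
    return f"{base}_{suffix}"


taken: Dict[str, object] = {"ref_genome": object()}
first = make_var_name_sketch("ref_genome", taken)
taken[first] = object()
second = make_var_name_sketch("ref_genome", taken)
unused = make_var_name_sketch("bam", taken)
print(first, second, unused)
```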
- redun.backends.db.dataflow.rewrite_call_node_merges(edges: List[DataflowEdge]) List[DataflowEdge] ¶
Rewrites dataflow graphs to enforce one CallNodeValue per CallNode.
This function identifies CallNodes that have multiple parent CallNodeValues like this:

    ArgumentValue --> CallNodeValue --\
                                       V
                                       CallNode (merge_node)
                                       ^
    ArgumentValue --> CallNodeValue --/

and rewrites them to unify the CallNodeValues like this:

    ArgumentValue --\
                     V
                     CallNodeValue --> CallNode
                     ^
    ArgumentValue --/
- redun.backends.db.dataflow.serialize_dataflow(dom: Iterable[DataflowSectionDOM]) Iterator[dict] ¶
Serialize DataflowDOM to JSON.
- redun.backends.db.dataflow.serialize_section(dom: DataflowSectionDOM) dict ¶
Serialize a DataflowSectionDOM to JSON.
- redun.backends.db.dataflow.toposort_edges(edges: Iterable[DataflowEdge]) Iterator[DataflowEdge] ¶
Topologically sort DataflowEdges in depth-first order.
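One generic way to order edges topologically is Kahn's algorithm driven by a stack, sketched below on `(src, dest)` string pairs. It is offered as an illustration of the idea, not as the algorithm redun uses; `toposort_edges_sketch` and the example graph are hypothetical.

```python
from collections import defaultdict
from typing import Dict, Iterable, Iterator, List, Tuple

Edge = Tuple[str, str]  # (src, dest)


def toposort_edges_sketch(edges: Iterable[Edge]) -> Iterator[Edge]:
    """Yield edges so that every edge into a node precedes the edges out of it."""
    edges = list(edges)
    indegree: Dict[str, int] = defaultdict(int)
    outgoing: Dict[str, List[Edge]] = defaultdict(list)
    for src, dest in edges:
        indegree[src] += 0  # make sure every source node is registered
        indegree[dest] += 1
        outgoing[src].append((src, dest))

    # Start from nodes that no edge points to. Using a stack (LIFO) instead of
    # a queue makes the traversal follow one branch deeply before the next.
    stack = [node for node, degree in indegree.items() if degree == 0]
    while stack:
        node = stack.pop()
        for edge in outgoing[node]:
            yield edge
            indegree[edge[1]] -= 1
            if indegree[edge[1]] == 0:
                stack.append(edge[1])


example_edges = [
    ("value", "origin"),
    ("out1", "value"),
    ("out2", "value"),
    ("result", "out1"),
    ("result", "out2"),
]
order = list(toposort_edges_sketch(example_edges))
print(order)
```

The edge leaving `value` is emitted only after both edges arriving at `value`, which is the property a renderer needs to print a merged value once, after everything that refers to it.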
- redun.backends.db.dataflow.walk_dataflow(backend: RedunBackendDb, init_node: ArgumentValue | CallNodeValue | CallNode | Value) Iterator[DataflowEdge] ¶
Iterate through all the upstream dataflow edges of a ‘node’ in the CallGraph.
A ‘node’ can be a Value, CallNode, CallNodeValue, or an ArgumentValue.
- redun.backends.db.dataflow.walk_dataflow_argument_value(backend: RedunBackendDb, argument_value: ArgumentValue) Iterator[DataflowEdge] ¶
Iterates through the upstream dataflow edges of an ArgumentValue.
Edge types:
- ArgumentValue <-- CallNodeValue: Value came from the result of a CallNode.
- ArgumentValue <-- ArgumentValue: Value came from argument of parent CallNode, i.e. argument-to-argument routing.
- ArgumentValue <-- Origin: Value came directly from user or task.
- redun.backends.db.dataflow.walk_dataflow_callnode(backend: RedunBackendDb, call_node: CallNode) Iterator[DataflowEdge] ¶
Iterates through the upstream Arguments of a CallNode.
- redun.backends.db.dataflow.walk_dataflow_callnode_value(backend: RedunBackendDb, call_node_value: CallNodeValue) Iterator[DataflowEdge] ¶
Iterates through the upstream dataflow edges of a CallNodeValue.
The edges either go deeper to the child CallNodes or stay with this CallNode.
- redun.backends.db.dataflow.walk_dataflow_node(backend: RedunBackendDb, node: ArgumentValue | CallNodeValue | CallNode | Value) Iterator[DataflowEdge] ¶
Iterates through the upstream dataflow edges of any DataflowNode.
- redun.backends.db.dataflow.walk_dataflow_value(backend: RedunBackendDb, value: Value) Iterator[DataflowEdge] ¶
Iterates through the edges in the upstream dataflow graph of a Value.
redun.backends.db.query module¶
- class redun.backends.db.query.CallGraphQuery(session: Session, joins: Set[str] | None = None, execution_joins: List[Callable[[Query], Query]] | None = None, filters: List | None = None, order_by: str | None = None, filter_types: Set | None = None, executions: Query | None = None, jobs: Query | None = None, call_nodes: Query | None = None, tasks: Query | None = None, values: Query | None = None, value_subqueries: List[Query] | None = None)¶
Bases:
object
Query class for efficiently querying across all models in a CallGraph.
- ExecTag = <AliasedClass at 0x7f730b44b850; Tag>¶
- MODEL_CLASSES = {'CallNode': <class 'redun.backends.db.CallNode'>, 'Execution': <class 'redun.backends.db.Execution'>, 'Job': <class 'redun.backends.db.Job'>, 'Task': <class 'redun.backends.db.Task'>, 'Value': <class 'redun.backends.db.Value'>}¶
- MODEL_NAMES = ['Execution', 'Job', 'CallNode', 'Task', 'Value']¶
- MODEL_PKS = {<class 'redun.backends.db.CallNode'>: 'call_hash', <class 'redun.backends.db.Execution'>: 'id', <class 'redun.backends.db.Job'>: 'id', <class 'redun.backends.db.Task'>: 'hash', <class 'redun.backends.db.Value'>: 'value_hash'}¶
- all() Iterator[Base] ¶
Yields all records matching query.
- build()¶
Apply all joins and filters to subqueries.
- clone(**kwargs: Any) CallGraphQuery ¶
Returns a clone of the query with updates specified by kwargs.
- count() Iterator[Tuple[str, int]] ¶
Returns counts for each record type.
- empty() CallGraphQuery ¶
Returns an empty query.
- filter_arguments(value_hashes: List[str]) CallGraphQuery ¶
Filter jobs by argument values.
- filter_execution_ids(execution_ids: Iterable[str]) CallGraphQuery ¶
Filter query by Execution ids.
- filter_execution_statuses(execution_statuses: Iterable[str]) CallGraphQuery ¶
Filter by Execution status.
- filter_execution_tags(tags: Iterable[Tuple[str, Any]]) CallGraphQuery ¶
Filter by tag on executions.
- filter_file_paths(paths: Iterable[str]) CallGraphQuery ¶
Filter by File.path patterns.
paths can contain “*” to perform wildcard matching.
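As a rough illustration of “*” wildcard matching on paths, the sketch below treats “*” as “any run of characters” and matches everything else literally. How redun evaluates the pattern (for example in SQL, and whether “*” crosses “/”) is not stated here, so `path_matches` is a hypothetical helper that only demonstrates the semantics.

```python
import re


def path_matches(pattern: str, path: str) -> bool:
    """Return True if `path` matches `pattern`, where "*" matches any characters."""
    # Escape each literal piece, then join the pieces with ".*" for the wildcards.
    regex = ".*".join(re.escape(part) for part in pattern.split("*"))
    return re.fullmatch(regex, path, flags=re.DOTALL) is not None


print(path_matches("s3://bucket/*.vcf", "s3://bucket/sample4.vcf"))
print(path_matches("*.vcf", "sample4.bam"))
print(path_matches("data/*/sample*.bam", "data/run1/sample4.bam"))
```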
- filter_ids(_ids: Iterable[str]) CallGraphQuery ¶
Filter query by record ids.
- filter_job_statuses(job_statuses: Iterable[str]) CallGraphQuery ¶
Filter by Job status.
- filter_results(value_hashes: List[str]) CallGraphQuery ¶
Filter jobs by result values.
- filter_tags(tags: Iterable[Tuple[str, Any]]) CallGraphQuery ¶
Filter by tags.
- filter_task_hashes(task_hashes: Iterable[str]) CallGraphQuery ¶
Filter by Task hashes.
- filter_task_names(task_names: Iterable[str]) CallGraphQuery ¶
Filter by Task names.
- filter_types(types: Iterable[str]) CallGraphQuery ¶
Filter query by record type.
- filter_value_types(value_types: Iterable[str]) CallGraphQuery ¶
Filter query by Value types.
- first() Base | None ¶
Returns first record if it exists.
- like_id(id: str) CallGraphQuery ¶
Filter query by record id prefix id.
- limit(size) Iterator[Base] ¶
Yields at most size records from query.
- one() Base ¶
Returns exactly one record. Raises error if too few or too many.
- one_or_none() Base ¶
Returns exactly one record. Returns None if too few or too many.
- order_by(order_by: str) CallGraphQuery ¶
Order query.
- order_by: str
The only supported value is ‘time’.
- page(page: int, page_size: int) Iterator[Base] ¶
Yields a page-worth of results from query.
page is zero-indexed.
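Zero-indexed paging means page `p` starts at offset `p * page_size`. The sketch below shows that arithmetic on a plain iterable; `page_sketch` is a hypothetical helper and does not touch a database.

```python
from itertools import islice
from typing import Iterable, Iterator, TypeVar

T = TypeVar("T")


def page_sketch(records: Iterable[T], page: int, page_size: int) -> Iterator[T]:
    """Yield the records on a zero-indexed page."""
    start = page * page_size
    return islice(records, start, start + page_size)


first_page = list(page_sketch(range(10), 0, 3))
last_page = list(page_sketch(range(10), 3, 3))
past_end = list(page_sketch(range(10), 4, 3))
print(first_page, last_page, past_end)
```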
- select(*columns: Iterable[str], flat: bool = False) Iterator[Any] ¶
Select columns to return in query.
- property subqueries: Iterable[Tuple[str, Query]]¶
Iterates through all subqueries.
- Yields:
(type_name, subquery) (Tuple[str, Query])
- redun.backends.db.query.find_file(session: Session, path: str) Tuple[File, Job, str] | None ¶
Find a File by its path.
- Prefer instance of File as result from a Task. Amongst results, prefer the most recent.
- Otherwise, search for File as argument to a Task. Amongst arguments, prefer the most recent.
For both results and arguments, also search whether File was a Subvalue (e.g. a value within a list, dict, etc).
We prefer the most recent, since it has the best chance of still being valid.
- redun.backends.db.query.infer_id(session: Session, id: str, include_files: bool = True, required: bool = False) Any ¶
Try to infer the record based on an id prefix.
- redun.backends.db.query.infer_specialty_id(session: Session, id: str, include_files: bool = True) Any ¶
Try to infer the record based on a specialty id (e.g. file paths, the “-” shorthand).
- redun.backends.db.query.parse_callgraph_query(query: CallGraphQuery, args: Namespace) CallGraphQuery ¶
Parse an argparse.Namespace into a CallGraphQuery.
- redun.backends.db.query.setup_query_parser(parser: ArgumentParser) ArgumentParser ¶
Add actions to an ArgumentParser to support querying a CallGraph.
redun.backends.db.serializers module¶
- class redun.backends.db.serializers.CallNodeSerializer(db_version: DBVersionInfo | None = None)¶
Bases:
Serializer
- deserialize(spec: dict) List[Base] ¶
- pk_field: str = 'call_hash'¶
- serialize(call_node: Base) dict ¶
- serialize_query(query: Query) Iterator[dict] ¶
- class redun.backends.db.serializers.ExecutionSerializer(db_version: DBVersionInfo | None = None)¶
Bases:
Serializer
- deserialize(spec: dict) List[Base] ¶
- pk_field: str = 'id'¶
- serialize(execution: Base) dict ¶
- class redun.backends.db.serializers.JobSerializer(db_version: DBVersionInfo | None = None)¶
Bases:
Serializer
- deserialize(spec: dict) List[Base] ¶
- pk_field: str = 'id'¶
- serialize(job: Base) dict ¶
- serialize_query(query: Query) Iterator[dict] ¶
- class redun.backends.db.serializers.RecordSerializer(db_version: DBVersionInfo | None = None)¶
Bases:
Serializer
- deserialize(spec: dict) List[Base] ¶
- get_pk(spec: dict) str ¶
- get_subserializer(type: str) Serializer ¶
- serialize(obj: Base) dict ¶
- serialize_query(query: Query) Iterator[dict] ¶
- class redun.backends.db.serializers.Serializer(db_version: DBVersionInfo | None = None)¶
Bases:
ABC
Base class serializer for RedunBackendDb records.
- deserialize(spec: dict) List[Base] ¶
- get_pk(spec: dict) str ¶
- pk_field: str = ''¶
- serialize(row: Base) dict ¶
- serialize_query(query: Query) Iterator[dict] ¶
- class redun.backends.db.serializers.TagSerializer(db_version: DBVersionInfo | None = None)¶
Bases:
Serializer
- deserialize(spec: dict) List[Base] ¶
- pk_field: str = 'tag_hash'¶
- serialize(tag: Base) dict ¶
- serialize_query(query: Query) Iterator[dict] ¶
- class redun.backends.db.serializers.ValueSerializer(db_version: DBVersionInfo | None = None)¶
Bases:
Serializer
- deserialize(spec: dict) List[Base] ¶
- pk_field: str = 'value_hash'¶
- serialize(value: Base) dict ¶
- serialize_query(query: Query) Iterator[dict] ¶
- redun.backends.db.serializers.deserialize_timestamp(date_str: str | None) datetime | None ¶
Deserialize timestamp string to datetime value.
- redun.backends.db.serializers.serialize_timestamp(timestamp: datetime | None) str | None ¶
Serialize a datetime value to a timestamp string.
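A None-tolerant timestamp round trip can be sketched as follows. The string format redun uses is not given above; ISO 8601 is assumed here purely for illustration, and both function names are hypothetical.

```python
from datetime import datetime, timezone
from typing import Optional


def serialize_timestamp_sketch(timestamp: Optional[datetime]) -> Optional[str]:
    """Format a datetime as an ISO 8601 string, passing None through."""
    if timestamp is None:
        return None
    return timestamp.isoformat()


def deserialize_timestamp_sketch(date_str: Optional[str]) -> Optional[datetime]:
    """Parse an ISO 8601 string back into a datetime, passing None through."""
    if date_str is None:
        return None
    return datetime.fromisoformat(date_str)


ts = datetime(2021, 3, 4, 5, 6, 7, tzinfo=timezone.utc)
text = serialize_timestamp_sketch(ts)
restored = deserialize_timestamp_sketch(text)
print(text, restored == ts)
```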
Module contents¶
- class redun.backends.db.Argument(**kwargs)¶
Bases:
Base
Input value for a called Task.
- arg_hash¶
- arg_key¶
- arg_position¶
- arg_results¶
- call_hash¶
- call_node¶
- upstream¶
- value¶
- value_hash¶
- property value_parsed: Any | None¶
- class redun.backends.db.ArgumentResult(**kwargs)¶
Bases:
Base
Many-to-many relationship between results and Arguments.
- arg¶
- arg_hash¶
- result_call_hash¶
- result_call_node¶
- class redun.backends.db.CallEdge(**kwargs)¶
Bases:
Base
An edge in the CallGraph.
This is a many-to-many table for CallNode.
- call_order¶
- child_id¶
- child_node¶
- parent_id¶
- parent_node¶
- class redun.backends.db.CallNode(**kwargs)¶
Bases:
Base
A CallNode in the CallGraph.
- arg_results¶
- property args_display: str¶
- args_hash¶
- arguments¶
- call_hash¶
- call_hash_idx = Index('ix_call_node_call_hash_vpo', Column('call_hash', String(length=40), table=<call_node>, primary_key=True, nullable=False), unique=True)¶
- child_edges¶
- downstream¶
- jobs¶
- parent_edges¶
- tags¶
- task¶
- task_hash¶
- task_name¶
- task_set¶
- timestamp¶
- value¶
- value_hash¶
- property value_parsed: Any | None¶
- class redun.backends.db.Column(*args, **kwargs)¶
Bases:
Column
Use non-null Columns by default.
- inherit_cache: bool | None = True¶
Indicate if this HasCacheKey instance should make use of the cache key generation scheme used by its immediate superclass.

The attribute defaults to None, which indicates that a construct has not yet taken into account whether or not it's appropriate for it to participate in caching; this is functionally equivalent to setting the value to False, except that a warning is also emitted.

This flag can be set to True on a particular class, if the SQL that corresponds to the object does not change based on attributes which are local to this class, and not its superclass.

See also
compilerext_caching - General guidelines for setting the HasCacheKey.inherit_cache attribute for third-party or user-defined SQL constructs.
- class redun.backends.db.DBVersionInfo(migration_id: str, major: int, minor: int, description: str)¶
Bases:
NamedTuple
A redun repo database version and migration id.
- description: str¶
Alias for field number 3
- major: int¶
Alias for field number 1
- migration_id: str¶
Alias for field number 0
- minor: int¶
Alias for field number 2
- class redun.backends.db.DateTimeUTC(*args: Any, **kwargs: Any)¶
Bases:
TypeDecorator
- cache_ok: bool | None = True¶
Indicate if statements using this ExternalType are “safe to cache”.

The default value None will emit a warning and then not allow caching of a statement which includes this type. Set to False to disable statements using this type from being cached at all without a warning. When set to True, the object’s class and selected elements from its state will be used as part of the cache key. For example, using a TypeDecorator:

    class MyType(TypeDecorator):
        impl = String

        cache_ok = True

        def __init__(self, choices):
            self.choices = tuple(choices)
            self.internal_only = True

The cache key for the above type would be equivalent to:

    >>> MyType(["a", "b", "c"])._static_cache_key
    (<class '__main__.MyType'>, ('choices', ('a', 'b', 'c')))

The caching scheme will extract attributes from the type that correspond to the names of parameters in the __init__() method. Above, the “choices” attribute becomes part of the cache key but “internal_only” does not, because there is no parameter named “internal_only”.

The requirements for cacheable elements is that they are hashable and also that they indicate the same SQL rendered for expressions using this type every time for a given cache value.

To accommodate for datatypes that refer to unhashable structures such as dictionaries, sets and lists, these objects can be made “cacheable” by assigning hashable structures to the attributes whose names correspond with the names of the arguments. For example, a datatype which accepts a dictionary of lookup values may publish this as a sorted series of tuples. Given a previously un-cacheable type as:

    class LookupType(UserDefinedType):
        '''a custom type that accepts a dictionary as a parameter.

        this is the non-cacheable version, as "self.lookup" is not
        hashable.

        '''

        def __init__(self, lookup):
            self.lookup = lookup

        def get_col_spec(self, **kw):
            return "VARCHAR(255)"

        def bind_processor(self, dialect):
            # ... works with "self.lookup" ...

Where “lookup” is a dictionary. The type will not be able to generate a cache key:

    >>> type_ = LookupType({"a": 10, "b": 20})
    >>> type_._static_cache_key
    <stdin>:1: SAWarning: UserDefinedType LookupType({'a': 10, 'b': 20}) will not
    produce a cache key because the ``cache_ok`` flag is not set to True.
    Set this flag to True if this type object's state is safe to use
    in a cache key, or False to disable this warning.
    symbol('no_cache')

If we did set up such a cache key, it wouldn’t be usable. We would get a tuple structure that contains a dictionary inside of it, which cannot itself be used as a key in a “cache dictionary” such as SQLAlchemy’s statement cache, since Python dictionaries aren’t hashable:

    >>> # set cache_ok = True
    >>> type_.cache_ok = True

    >>> # this is the cache key it would generate
    >>> key = type_._static_cache_key
    >>> key
    (<class '__main__.LookupType'>, ('lookup', {'a': 10, 'b': 20}))

    >>> # however this key is not hashable, will fail when used with
    >>> # SQLAlchemy statement cache
    >>> some_cache = {key: "some sql value"}
    Traceback (most recent call last):
      File "<stdin>", line 1, in <module>
    TypeError: unhashable type: 'dict'

The type may be made cacheable by assigning a sorted tuple of tuples to the “.lookup” attribute:

    class LookupType(UserDefinedType):
        '''a custom type that accepts a dictionary as a parameter.

        The dictionary is stored both as itself in a private variable,
        and published in a public variable as a sorted tuple of tuples,
        which is hashable and will also return the same value for any
        two equivalent dictionaries. Note it assumes the keys and
        values of the dictionary are themselves hashable.

        '''

        cache_ok = True

        def __init__(self, lookup):
            self._lookup = lookup

            # assume keys/values of "lookup" are hashable; otherwise
            # they would also need to be converted in some way here
            self.lookup = tuple(
                (key, lookup[key]) for key in sorted(lookup)
            )

        def get_col_spec(self, **kw):
            return "VARCHAR(255)"

        def bind_processor(self, dialect):
            # ... works with "self._lookup" ...

Where above, the cache key for LookupType({"a": 10, "b": 20}) will be:

    >>> LookupType({"a": 10, "b": 20})._static_cache_key
    (<class '__main__.LookupType'>, ('lookup', (('a', 10), ('b', 20))))

Added in version 1.4.14: added the cache_ok flag to allow some configurability of caching for TypeDecorator classes.

Added in version 1.4.28: added the ExternalType mixin which generalizes the cache_ok flag to both the TypeDecorator and UserDefinedType classes.

See also
sql_caching
- impl: TypeEngine[Any] | Type[TypeEngine[Any]] = DateTime(timezone=True)¶
- process_bind_param(value: Any, dialect)¶
Receive a bound parameter value to be converted.
Custom subclasses of _types.TypeDecorator should override this method to provide custom behaviors for incoming data values. This method is called at statement execution time and is passed the literal Python data value which is to be associated with a bound parameter in the statement.

The operation could be anything desired to perform custom behavior, such as transforming or serializing data. This could also be used as a hook for validating logic.

- Parameters:
value – Data to operate upon, of any type expected by this method in the subclass. Can be None.
dialect – the Dialect in use.

See also
types_typedecorator
_types.TypeDecorator.process_result_value()
- process_result_value(value: Any, dialect)¶
Receive a result-row column value to be converted.
Custom subclasses of _types.TypeDecorator should override this method to provide custom behaviors for data values being received in result rows coming from the database. This method is called at result fetching time and is passed the literal Python data value that’s extracted from a database result row.

The operation could be anything desired to perform custom behavior, such as transforming or deserializing data.

- Parameters:
value – Data to operate upon, of any type expected by this method in the subclass. Can be None.
dialect – the Dialect in use.

See also
types_typedecorator
_types.TypeDecorator.process_bind_param()
- class redun.backends.db.Evaluation(**kwargs)¶
Bases:
Base
Cache table for evaluations.
eval_hash (hash of task_hash and args_hash) is the cache key, and value_hash is the cached value.
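The caching idea can be sketched as a dictionary keyed by a hash of the two inputs. The hash function (SHA-1), the separator, and the helper names below are assumptions made for illustration; only the relationship "eval_hash is derived from task_hash and args_hash, and maps to value_hash" comes from the description above.

```python
import hashlib
from typing import Callable, Dict


def eval_hash_sketch(task_hash: str, args_hash: str) -> str:
    """Derive a cache key from a task hash and an arguments hash."""
    payload = f"{task_hash}:{args_hash}".encode("utf-8")
    return hashlib.sha1(payload).hexdigest()


# Toy stand-in for the Evaluation table: eval_hash -> value_hash.
evaluations: Dict[str, str] = {}


def cached_value_hash(task_hash: str, args_hash: str, run: Callable[[], str]) -> str:
    """Return the cached value_hash, running the task only on a cache miss."""
    key = eval_hash_sketch(task_hash, args_hash)
    if key not in evaluations:
        evaluations[key] = run()
    return evaluations[key]


calls = []


def fake_task() -> str:
    calls.append(1)
    return "value-hash-1"


first = cached_value_hash("task-a", "args-1", fake_task)
second = cached_value_hash("task-a", "args-1", fake_task)
print(first, second, len(calls))
```

The second call reuses the stored value_hash, so the task body runs only once for the same task and arguments.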
- args_hash¶
- eval_hash¶
- task¶
- task_hash¶
- value¶
- value_hash¶
- property value_parsed: Any | None¶
- class redun.backends.db.Execution(*args, **kwargs)¶
Bases:
Base
- args¶
- calc_status(result_type: str | None) str ¶
Compute status from the result_type of the root Job.
- id¶
- id_idx = Index('ix_execution_id_vpo', Column('id', String(), table=<execution>, primary_key=True, nullable=False), unique=True)¶
- job¶
- job_id¶
- jobs¶
- property status: str¶
Returns the computed status of the Execution.
- property status_display: str¶
Return status suitable for display in tables.
- tags¶
- updated_time¶
- class redun.backends.db.File(**kwargs)¶
Bases:
Base
A File used as a Value by a Task.
- parent_values¶
- path¶
- value¶
- value_hash¶
- class redun.backends.db.Handle(**kwargs)¶
Bases:
Base
- children¶
- fullname¶
- hash¶
- is_valid¶
- key¶
- parent_values¶
- parents¶
- value¶
- value_hash¶
- class redun.backends.db.JSON(*args: Any, **kwargs: Any)¶
Bases:
TypeDecorator
This custom column type allows use of JSON across both sqlite and postgres.
In postgres, the column acts like a JSONB column. In sqlite, it acts like a string with normalization (e.g. sorted keys, etc) applied to the JSON serialization. This allows us to do exact matching indexing across both sqlite and postgres.
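The reason normalization enables exact matching can be shown with the standard `json` module. Sorted keys are mentioned above; the compact separators and the `normalize_json` name are assumptions for this sketch, not redun's actual settings.

```python
import json
from typing import Any


def normalize_json(value: Any) -> str:
    """Serialize JSON so that equal values always produce the same string."""
    return json.dumps(value, sort_keys=True, separators=(",", ":"))


a = normalize_json({"b": 1, "a": [1, 2]})
b = normalize_json({"a": [1, 2], "b": 1})
print(a, a == b)
```

Two dictionaries with the same content but different key order serialize to the same string, so a plain string comparison or index lookup behaves like a comparison of the JSON values.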
- cache_ok: bool | None = True¶
Indicate if statements using this
ExternalType
are “safe to cache”.The default value
None
will emit a warning and then not allow caching of a statement which includes this type. Set toFalse
to disable statements using this type from being cached at all without a warning. When set toTrue
, the object’s class and selected elements from its state will be used as part of the cache key. For example, using aTypeDecorator
:class MyType(TypeDecorator): impl = String cache_ok = True def __init__(self, choices): self.choices = tuple(choices) self.internal_only = True
The cache key for the above type would be equivalent to:
>>> MyType(["a", "b", "c"])._static_cache_key (<class '__main__.MyType'>, ('choices', ('a', 'b', 'c')))
The caching scheme will extract attributes from the type that correspond to the names of parameters in the
__init__()
method. Above, the “choices” attribute becomes part of the cache key but “internal_only” does not, because there is no parameter named “internal_only”.The requirements for cacheable elements is that they are hashable and also that they indicate the same SQL rendered for expressions using this type every time for a given cache value.
To accommodate for datatypes that refer to unhashable structures such as dictionaries, sets and lists, these objects can be made “cacheable” by assigning hashable structures to the attributes whose names correspond with the names of the arguments. For example, a datatype which accepts a dictionary of lookup values may publish this as a sorted series of tuples. Given a previously un-cacheable type as:
    class LookupType(UserDefinedType):
        '''a custom type that accepts a dictionary as a parameter.

        this is the non-cacheable version, as "self.lookup" is not
        hashable.

        '''

        def __init__(self, lookup):
            self.lookup = lookup

        def get_col_spec(self, **kw):
            return "VARCHAR(255)"

        def bind_processor(self, dialect):
            # ... works with "self.lookup" ...
Where “lookup” is a dictionary. The type will not be able to generate a cache key:
    >>> type_ = LookupType({"a": 10, "b": 20})
    >>> type_._static_cache_key
    <stdin>:1: SAWarning: UserDefinedType LookupType({'a': 10, 'b': 20}) will not produce a cache key because the ``cache_ok`` flag is not set to True. Set this flag to True if this type object's state is safe to use in a cache key, or False to disable this warning.
    symbol('no_cache')
If we did set up such a cache key, it wouldn’t be usable. We would get a tuple structure that contains a dictionary inside of it, which cannot itself be used as a key in a “cache dictionary” such as SQLAlchemy’s statement cache, since Python dictionaries aren’t hashable:
    >>> # set cache_ok = True
    >>> type_.cache_ok = True

    >>> # this is the cache key it would generate
    >>> key = type_._static_cache_key
    >>> key
    (<class '__main__.LookupType'>, ('lookup', {'a': 10, 'b': 20}))

    >>> # however this key is not hashable, will fail when used with
    >>> # SQLAlchemy statement cache
    >>> some_cache = {key: "some sql value"}
    Traceback (most recent call last):
      File "<stdin>", line 1, in <module>
    TypeError: unhashable type: 'dict'
The type may be made cacheable by assigning a sorted tuple of tuples to the “.lookup” attribute:
    class LookupType(UserDefinedType):
        '''a custom type that accepts a dictionary as a parameter.

        The dictionary is stored both as itself in a private variable,
        and published in a public variable as a sorted tuple of tuples,
        which is hashable and will also return the same value for any
        two equivalent dictionaries. Note it assumes the keys and
        values of the dictionary are themselves hashable.

        '''

        cache_ok = True

        def __init__(self, lookup):
            self._lookup = lookup

            # assume keys/values of "lookup" are hashable; otherwise
            # they would also need to be converted in some way here
            self.lookup = tuple(
                (key, lookup[key]) for key in sorted(lookup)
            )

        def get_col_spec(self, **kw):
            return "VARCHAR(255)"

        def bind_processor(self, dialect):
            # ... works with "self._lookup" ...
Where above, the cache key for LookupType({"a": 10, "b": 20}) will be:
    >>> LookupType({"a": 10, "b": 20})._static_cache_key
    (<class '__main__.LookupType'>, ('lookup', (('a', 10), ('b', 20))))
Added in version 1.4.14: added the cache_ok flag to allow some configurability of caching for TypeDecorator classes.
Added in version 1.4.28: added the ExternalType mixin which generalizes the cache_ok flag to both the TypeDecorator and UserDefinedType classes.
See also
sql_caching
- impl¶
alias of
String
- load_dialect_impl(dialect)¶
Return a TypeEngine object corresponding to a dialect.
This is an end-user override hook that can be used to provide differing types depending on the given dialect. It is used by the TypeDecorator implementation of type_engine() to help determine what type should ultimately be returned for a given TypeDecorator.
By default returns self.impl.
- process_bind_param(value: Any, dialect)¶
Receive a bound parameter value to be converted.
Custom subclasses of TypeDecorator should override this method to provide custom behaviors for incoming data values. This method is called at statement execution time and is passed the literal Python data value which is to be associated with a bound parameter in the statement.
The operation could be anything desired to perform custom behavior, such as transforming or serializing data. This could also be used as a hook for validating logic.
- Parameters:
value – Data to operate upon, of any type expected by this method in the subclass. Can be None.
dialect – the Dialect in use.
See also
types_typedecorator
TypeDecorator.process_result_value()
- process_result_value(value: Any, dialect)¶
Receive a result-row column value to be converted.
Custom subclasses of TypeDecorator should override this method to provide custom behaviors for data values being received in result rows coming from the database. This method is called at result fetching time and is passed the literal Python data value that’s extracted from a database result row.
The operation could be anything desired to perform custom behavior, such as transforming or deserializing data.
- Parameters:
value – Data to operate upon, of any type expected by this method in the subclass. Can be None.
dialect – the Dialect in use.
See also
types_typedecorator
TypeDecorator.process_bind_param()
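The two hooks above form a round trip: one converts a Python value on the way into a statement, the other converts the stored value on the way out of a result row. The following is a minimal sketch of that round trip which deliberately does not import SQLAlchemy. `JsonTextHooks` is a hypothetical stand-in that only mirrors the documented method names and `(value, dialect)` signatures; a real column type would subclass TypeDecorator, and redun's own JSON type may behave differently.

```python
import json
from typing import Any, Optional


class JsonTextHooks:
    """Hypothetical stand-in mirroring the two TypeDecorator hooks."""

    def process_bind_param(self, value: Any, dialect: Any) -> Optional[str]:
        # Python value -> text bound into the statement; None passes through.
        if value is None:
            return None
        return json.dumps(value, sort_keys=True)

    def process_result_value(self, value: Optional[str], dialect: Any) -> Any:
        # Text read from a result row -> Python value; None passes through.
        if value is None:
            return None
        return json.loads(value)


hooks = JsonTextHooks()
stored = hooks.process_bind_param({"b": 2, "a": 1}, dialect=None)
restored = hooks.process_result_value(stored, dialect=None)
```

Both hooks handle None explicitly because, as noted in the parameter descriptions, the value can be None.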
- class redun.backends.db.Job(**kwargs)¶
Bases:
Base
- cached¶
- calc_status(result_type: str | None) str ¶
Calculate Job status from result type.
- call_hash¶
- call_node¶
- child_jobs¶
- property context: dict¶
Returns the associated context.
- property duration: timedelta | None¶
Returns duration of the Job or None if Job end_time is not recorded.
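As an illustration of the contract described for duration, here is a minimal sketch. It assumes the duration is simply end_time minus start_time; `job_duration` is a hypothetical free function, not the property's actual implementation.

```python
from datetime import datetime, timedelta
from typing import Optional


def job_duration(start_time: datetime, end_time: Optional[datetime]) -> Optional[timedelta]:
    """Hypothetical helper: elapsed time, or None while end_time is unrecorded."""
    if end_time is None:
        return None
    return end_time - start_time


start = datetime(2024, 1, 1, 12, 0, 0)
finished = job_duration(start, datetime(2024, 1, 1, 12, 0, 30))
running = job_duration(start, None)
```

Callers therefore need to handle None before formatting or summing durations.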
- end_time¶
- execution¶
- execution_id¶
- id¶
- id_idx = Index('ix_job_id_vpo', Column('id', String(), table=<job>, primary_key=True, nullable=False), unique=True)¶
- parent_id¶
- parent_job¶
- start_time¶
- property status: str¶
Returns Job status (RUNNING, FAILED, CACHED, DONE).
- property status_display: str¶
Return status suitable for display in tables.
- tags¶
- task¶
- task_hash¶
- class redun.backends.db.PreviewValue(value: Value, size: int)¶
Bases:
object
A preview value if the value is too large or if there is an error.
- class redun.backends.db.RedunBackendDb(db_uri: str | None = None, config: dict | SectionProxy | Config | None = None, logger: Any | None = None, *args: Any, **kwargs: Any)¶
Bases:
RedunBackend
A database-based Backend for managing the CallGraph (provenance) and cache for a Scheduler.
This backend makes use of SQLAlchemy to provide support for both sqlite and postgresql databases.
- advance_handle(parent_handles: List[Handle], child_handle: Handle) None ¶
Record parent-child relationships between Handles.
- check_cache(task_hash: str, args_hash: str, eval_hash: str, execution_id: str, scheduler_task_hashes: Set[str], cache_scope: CacheScope, check_valid: CacheCheckValid, context_hash: str | None = None, allowed_cache_results: Set[CacheResult] | None = None) Tuple[Any, str | None, CacheResult] ¶
See parent method.
- clone(session: Session | None = None)¶
Return a copy of the backend that shares the instantiated database engine.
- create_engine() Any ¶
Initializes a database connection.
- delete_tags(entity_id: str, tags: Iterable[Tuple[str, Any]], keys: Iterable[str] = ()) List[Tuple[str, str, str, Any]] ¶
Delete tags.
- explain_cache_miss(task: Task, args_hash: str) Dict[str, Any] | None ¶
Determine the reason for a cache miss.
- static get_all_db_versions() List[DBVersionInfo] ¶
Returns list of all DB versions and their migration ids.
- get_call_cache(call_hash: str) Tuple[Any, bool] ¶
Returns the result of a previously recorded CallNode.
- Parameters:
call_hash (str) – Hash of a CallNode.
- Returns:
Recorded final result of a CallNode.
- Return type:
Any
- get_child_record_ids(model_ids: Iterable[Tuple[Base, str]]) Iterable[Tuple[str, Base, str]] ¶
Iterates through record’s ownership edges.
Used for walking record graph for syncing.
- get_db_version() DBVersionInfo ¶
Returns the current DB version (major, minor).
- static get_db_version_required() Tuple[DBVersionInfo, DBVersionInfo] ¶
Returns the DB version range required by this library.
- get_eval_cache(eval_hash: str) Tuple[Any, bool] ¶
Checks the Evaluation cache for a cached result.
- Parameters:
eval_hash (str) – Hash of the task and arguments used for call.
- Returns:
(result, is_cached) – result is the cached result, or None if no result was found. is_cached is True if cache hit, otherwise is False.
- Return type:
Tuple[Any, bool]
- get_job(job_id: str) dict | None ¶
Returns details for a Job.
- get_records(ids: Iterable[str], sorted: bool = True) Iterable[dict] ¶
Returns serialized records for the given ids.
- Parameters:
ids (Iterable[str]) – Iterable of record ids to fetch serialized records.
sorted (bool) – If True, return records in the same order as the ids (Default: True).
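The effect of sorted=True can be sketched as a reordering step applied to whatever order the database returned. This is only an illustration: `order_records`, the `id_key` parameter, and the choice to skip ids with no matching record are all assumptions, not documented behavior of get_records.

```python
from typing import Dict, Iterable, Iterator, List


def order_records(ids: Iterable[str], fetched: List[dict], id_key: str = "id") -> Iterator[dict]:
    """Hypothetical helper: yield fetched records in the order of ids."""
    by_id: Dict[str, dict] = {record[id_key]: record for record in fetched}
    for record_id in ids:
        # Assumption: ids without a matching record are skipped.
        if record_id in by_id:
            yield by_id[record_id]


fetched = [{"id": "b"}, {"id": "a"}]  # e.g. the order a database happened to return
ordered = list(order_records(["a", "missing", "b"], fetched))
```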
- get_tags(entity_ids: List[str]) Dict[str, MultiMap[str, Any]] ¶
Get the tags of an entity (Execution, Job, CallNode, Task, Value).
- get_value(value_hash: str) Tuple[Any, bool] ¶
Returns a Value from the datastore using the value content address (value_hash).
- Parameters:
value_hash (str) – Hash of Value to fetch from ValueStore.
- Returns:
result, is_cached – Returns the result value and is_cached=True if the value is in the ValueStore, otherwise returns (None, False).
- Return type:
Tuple[Any, bool]
- has_records(record_ids: Iterable[str]) List[str] ¶
Returns record_ids that exist in the database.
- is_db_compatible() bool ¶
Returns True if database is compatible with library.
- is_valid_handle(handle: Handle) bool ¶
A handle is valid if it is current or ancestral to the current handle.
- iter_record_ids(root_ids: Iterable[str]) Iterator[str] ¶
Iterate the record ids of descendants of root_ids.
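Iterating descendants amounts to a graph walk over ownership edges. The sketch below uses a breadth-first traversal over a plain dictionary of child edges; `iter_descendant_ids` and the `child_edges` mapping are hypothetical, and the sketch assumes the roots themselves are included and each id is yielded once.

```python
from collections import deque
from typing import Dict, Iterable, Iterator, List, Set


def iter_descendant_ids(root_ids: Iterable[str], child_edges: Dict[str, List[str]]) -> Iterator[str]:
    """Hypothetical helper: breadth-first walk yielding each reachable id once."""
    seen: Set[str] = set()
    queue = deque(root_ids)
    while queue:
        record_id = queue.popleft()
        if record_id in seen:
            continue  # a record can be reachable through several parents
        seen.add(record_id)
        yield record_id
        queue.extend(child_edges.get(record_id, []))


edges = {"exec1": ["job1"], "job1": ["call1", "job2"], "job2": ["call1"]}
visited = list(iter_descendant_ids(["exec1"], edges))
```

The `seen` set matters because the record graph is not a tree: here `call1` is reachable from both `job1` and `job2` but is visited only once.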
- load(migrate: bool | None = None) None ¶
Load backend database.
For protection, only upgrades are allowed when automigrating. Downgrades could potentially drop data. Users should explicitly downgrade using something like redun db downgrade XXX.
- Parameters:
migrate (Optional[bool]) – If None, defer to automigration config options. If True, perform migration after establishing database connection.
- migrate(desired_version: DBVersionInfo | None = None, upgrade_only: bool = False) None ¶
Migrate database to desired version.
- Parameters:
desired_version (Optional[DBVersionInfo]) – Desired version to update redun database to. If None, update to latest version.
upgrade_only (bool) – By default, this function will perform both upgrades and downgrades. Set this to true to prevent downgrades (such as during automigration).
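The interaction between desired_version and upgrade_only can be sketched as a small decision function. `Version` and `plan_migration` are hypothetical names, and raising ValueError on a refused downgrade is an assumption; the library may signal this differently.

```python
from typing import NamedTuple


class Version(NamedTuple):
    """Hypothetical stand-in for a (major, minor) database version."""
    major: int
    minor: int


def plan_migration(current: Version, desired: Version, upgrade_only: bool = False) -> str:
    """Return 'noop', 'upgrade', or 'downgrade'; refuse downgrades if upgrade_only."""
    if desired == current:
        return "noop"
    if desired > current:  # NamedTuples compare field by field: major, then minor
        return "upgrade"
    if upgrade_only:
        raise ValueError(f"refusing to downgrade from {current} to {desired}")
    return "downgrade"
```

With upgrade_only=True, as during automigration, a request that would move the database backwards is rejected rather than risking dropped data.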
- put_records(records: Iterable[dict]) int ¶
Writes records to the database and returns number of new records written.
- record_call_node(task_name: str, task_hash: str, args_hash: str, expr_args: Tuple[Tuple, dict], eval_args: Tuple[Tuple, dict], result_hash: str, child_call_hashes: List[str]) str ¶
Record a completed CallNode.
- Parameters:
task_name (str) – Fullname (with namespace) of task.
task_hash (str) – Hash of task.
args_hash (str) – Hash of all arguments of the call.
expr_args (Tuple[Tuple, dict]) – Original expressions for the task arguments. These expressions are used to record the full upstream dataflow.
eval_args (Tuple[Tuple, dict]) – The fully evaluated arguments of the task arguments.
result_hash (str) – Hash of the result value of the call.
child_call_hashes (List[str]) – call_hashes of any child task calls.
- Returns:
The call_hash of the new CallNode.
- Return type:
str
- record_call_node_context(call_hash: str, context_hash: str | None, context: dict) str ¶
Records a context dict for a CallNode as a Tag.
- record_execution(exec_id: str, args: List[str]) None ¶
Records an Execution to the backend.
- Parameters:
exec_id (str) – The id of the execution.
args (List[str]) – Arguments used on the command line to start Execution.
- Returns:
UUID of new Execution.
- Return type:
str
- record_job_end(job: BaseJob, now: datetime | None = None, status: str | None = None) None ¶
Records the end of a Job.
Creates the job if needed, in which case the job is recorded with start_time == end_time.
- record_tags(entity_type: TagEntity, entity_id: str, tags: Iterable[Tuple[str, Any]], parents: Iterable[str] = (), update: bool = False, new: bool = False) List[Tuple[str, str, str, Any]] ¶
Record tags for an entity (Execution, Job, CallNode, Task, Value).
- Parameters:
entity_type (TagEntity) – The type of the tagged entity (Execution, Job, etc).
entity_id (str) – The id of the tagged entity.
tags (Iterable[KeyValue]) – An iterable of key-value pairs to create as tags.
parents (Iterable[str]) – Ids of tags to be superseded by the new tags.
update (bool) – If True, automatically supersede any existing tags with keys matching those in tags. This also implies new=True.
new (bool) – If True, force tags to be current.
- Returns:
[(tag_hash, entity_id, key, value)] – Returns a list of the created tags.
- Return type:
List[Tuple[str, str, str, Any]]
- record_updated_time() None ¶
Updates updated_time (heartbeat) timestamp for the current Execution.
- record_value(value: Any, data: bytes | None = None) str ¶
Record a Value into the backend.
- Parameters:
value (Any) – A value to record.
data (Optional[bytes]) – Byte stream to record. If not given, usual value serialization is used.
- Returns:
value_hash of recorded value.
- Return type:
str
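record_value and get_value together describe a content-addressed store: the key under which a value is saved is derived from the value's serialized bytes. The sketch below illustrates the idea with an in-memory dictionary. `ValueStoreSketch`, the use of pickle for serialization, and the use of SHA-1 for hashing are all assumptions made for illustration; redun's actual serialization and hashing may differ.

```python
import hashlib
import pickle
from typing import Any, Dict, Optional, Tuple


class ValueStoreSketch:
    """In-memory, content-addressed store (not redun's implementation)."""

    def __init__(self) -> None:
        self._data: Dict[str, bytes] = {}

    def record_value(self, value: Any, data: Optional[bytes] = None) -> str:
        # Assumption: when no byte stream is given, serialize with pickle.
        if data is None:
            data = pickle.dumps(value)
        value_hash = hashlib.sha1(data).hexdigest()
        self._data[value_hash] = data  # same bytes always land on the same key
        return value_hash

    def get_value(self, value_hash: str) -> Tuple[Any, bool]:
        data = self._data.get(value_hash)
        if data is None:
            return None, False
        return pickle.loads(data), True


store = ValueStoreSketch()
first = store.record_value([1, 2, 3])
second = store.record_value([1, 2, 3])  # identical content, identical hash
```

Recording the same content twice yields the same hash, so the store holds a single copy.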
- set_eval_cache(eval_hash: str, task_hash: str, args_hash: str, value: Any, value_hash: str | None = None) None ¶
Sets a new value in the Evaluation cache.
- Parameters:
eval_hash (str) – A hash of the combination of the task_hash and args_hash.
task_hash (str) – Hash of Task used in the call.
args_hash (str) – Hash of all arguments used in the call.
value (Any) – Value to record in cache.
value_hash (str) – Hash of value to record in cache.
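The (result, is_cached) pair returned by get_eval_cache exists because a cached result can itself be None. The following sketch shows that contract with an in-memory dictionary. `EvalCache` and `make_eval_hash` are hypothetical, and the way the two hashes are combined here is an assumption rather than redun's actual scheme.

```python
import hashlib
from typing import Any, Dict, Tuple


def make_eval_hash(task_hash: str, args_hash: str) -> str:
    """Hypothetical combination of the two hashes; the real scheme may differ."""
    return hashlib.sha1(f"{task_hash}:{args_hash}".encode()).hexdigest()


class EvalCache:
    """In-memory stand-in for the Evaluation cache (not redun's implementation)."""

    def __init__(self) -> None:
        self._results: Dict[str, Any] = {}

    def set_eval_cache(self, eval_hash: str, value: Any) -> None:
        self._results[eval_hash] = value

    def get_eval_cache(self, eval_hash: str) -> Tuple[Any, bool]:
        # The flag distinguishes a cached None from a cache miss.
        if eval_hash in self._results:
            return self._results[eval_hash], True
        return None, False


cache = EvalCache()
key = make_eval_hash("task123", "args456")
cache.set_eval_cache(key, None)  # a task may legitimately return None
hit = cache.get_eval_cache(key)
miss = cache.get_eval_cache(make_eval_hash("task123", "other"))
```

Both lookups return None as the result; only the second element tells a hit from a miss.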
- exception redun.backends.db.RedunDatabaseError¶
Bases:
Exception
- class redun.backends.db.RedunMigration(**kwargs)¶
Bases:
Base
Migration history of redun database (aka alembic table).
- version_num¶
- class redun.backends.db.RedunSession(backend: RedunBackendDb, *args, **kwargs)¶
Bases:
Session
Sqlalchemy Session with a reference to the redun backend.
This is used to give Sqlalchemy models access to the redun backend API.
- class redun.backends.db.RedunVersion(**kwargs)¶
Bases:
Base
Version of redun database.
- id¶
- timestamp¶
- version¶
- exception redun.backends.db.RedunVersionError¶
Bases:
RedunDatabaseError
- class redun.backends.db.Subvalue(**kwargs)¶
Bases:
Base
A Value that is a subvalue of another Value.
- child¶
- parent¶
- parent_value_hash¶
- value_hash¶
- class redun.backends.db.Tag(**kwargs)¶
Bases:
Base
- property base_key: str¶
- call_nodes¶
- child_edits¶
- children¶
- entity_id¶
- entity_type¶
- executions¶
- is_current¶
- jobs¶
- key¶
- property namespace: str¶
- parent_edits¶
- parents¶
- tag_hash¶
- tag_hash_idx = Index('ix_tag_tag_hash_current', Column('tag_hash', String(length=40), table=<tag>, primary_key=True, nullable=False), unique=True)¶
- tasks¶
- value¶
- property value_parsed: Any¶
Returns parsed value referenced by Tag.
- values¶
- class redun.backends.db.Task(**kwargs)¶
Bases:
Base
- call_nodes¶
- evals¶
- property fullname: str¶
- hash¶
- hash_idx = Index('ix_task_hash_vpo', Column('hash', String(length=40), table=<task>, primary_key=True, nullable=False), unique=True)¶
- jobs¶
- name¶
- name_idx = Index('ix_task_name_vpo', Column('name', String(), table=<task>, nullable=False))¶
- namespace¶
- namespace_idx = Index('ix_task_namespace_vpo', Column('namespace', String(), table=<task>, nullable=False))¶
- show_source() None ¶
- source¶
- tags¶
- value¶
- class redun.backends.db.Value(**kwargs)¶
Bases:
Base
A value used as input (Argument) or output (Result) from a Task.
- arguments¶
- child_edges¶
- children¶
- evals¶
- file¶
- format¶
- handle¶
- property in_value_store: bool¶
Returns True if value data is in a ValueStore.
- parent_edges¶
- parents¶
- property preview: Any¶
Returns a deserialized value, or a preview if there is an error or value is too large.
- results¶
- subfiles¶
- subhandles¶
- tags¶
- task¶
- type¶
- value¶
- value_hash¶
- value_hash_idx = Index('ix_value_value_hash_vpo', Column('value_hash', String(length=40), table=<value>, primary_key=True, nullable=False), unique=True)¶
- property value_parsed: Any | None¶
Returns a deserialized value, or a preview if there is an error.
- redun.backends.db.get_abs_path(root_dir: str, path: str) str ¶
Get the absolute path of path if it is a local path.
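One way to read "if it is a local path" is that URL-like paths are returned unchanged while everything else is resolved against root_dir. The sketch below follows that reading. `resolve_local_path` is a hypothetical helper, and the rule used to detect a remote path (a leading `scheme://`) is an assumption.

```python
import os
import re


def resolve_local_path(root_dir: str, path: str) -> str:
    """Hypothetical helper: make local paths absolute, leave URLs untouched."""
    # Assumption: anything starting with "scheme://" is remote, not local.
    if re.match(r"^[A-Za-z][A-Za-z0-9+.-]*://", path):
        return path
    # os.path.join keeps `path` as-is when it is already absolute.
    return os.path.normpath(os.path.join(root_dir, path))


local = resolve_local_path("/data/project", "inputs/sample4.fastq")
remote = resolve_local_path("/data/project", "s3://bucket/sample4.fastq")
```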
- redun.backends.db.get_call_node_child_edges(session: Session, ids: Iterable[str]) Iterable[Tuple[str, Base, str]] ¶
- redun.backends.db.get_execution_child_edges(session: Session, ids: Iterable[str]) Iterable[Tuple[str, Base, str]] ¶
- redun.backends.db.get_job_child_edges(session: Session, ids: Iterable[str]) Iterable[Tuple[str, Base, str]] ¶
- redun.backends.db.get_tag_child_edges(session: Session, ids: Iterable[str]) Iterable[Tuple[str, Base, str]] ¶
Get parents and children of the tags with the supplied ids.
- redun.backends.db.get_tag_entity_child_edges(session: Session, ids: Iterable[str]) Iterable[Tuple[str, Base, str]] ¶
Iterates through the child edges for Tags of a set of entity ids.
- redun.backends.db.get_value_child_edges(session: Session, ids: Iterable[str]) Iterable[Tuple[str, Base, str]] ¶
- redun.backends.db.parse_db_version(version_str: str) DBVersionInfo ¶
Parses a db version string such as “2.0” or “3” into a DBVersionInfo.
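The two accepted forms, “2.0” and “3”, can be handled by splitting on the dot. The sketch below is a simplified illustration: `VersionSketch` and `parse_version` are hypothetical, the result is a plain (major, minor) tuple rather than a DBVersionInfo, and treating a missing minor component as 0 is an assumption.

```python
from typing import NamedTuple


class VersionSketch(NamedTuple):
    """Hypothetical (major, minor) pair standing in for DBVersionInfo."""
    major: int
    minor: int


def parse_version(version_str: str) -> VersionSketch:
    """Parse 'MAJOR' or 'MAJOR.MINOR'; a missing minor is assumed to be 0."""
    parts = version_str.strip().split(".")
    if len(parts) > 2:
        raise ValueError(f"Invalid version string: {version_str!r}")
    major = int(parts[0])
    minor = int(parts[1]) if len(parts) == 2 else 0
    return VersionSketch(major, minor)
```

Non-numeric components raise ValueError from int(), so malformed input fails loudly instead of producing a partial version.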