redun.backends.db package#

Submodules#

redun.backends.db.dataflow module#

Dataflow visualization.

An upstream dataflow visualization explains the derivation of a value. Take for example this dataflow visualization of the derivation of a VCF file from a bioinformatic analysis:

```
value = File(path=sample4.vcf, hash=********)

value <-- <********> call_variants(bam, ref_genome)
  bam        = <********> File(path=sample4.bam, hash=********)
  ref_genome = <********> File(path=ref_genome, hash=********)

bam <-- argument of <********> call_variants_all(bams, ref_genome)
    <-- result of <********> align_reads_all(fastqs, ref_genome)
    <-- <********> align_reads(fastq, ref_genome)
  fastq      = <********> File(path=sample4.fastq, hash=********)
  ref_genome = <********> File(path=ref_genome, hash=********)

fastq <-- argument of <********> align_reads_all(fastqs, ref_genome)
      <-- argument of <********> main(fastqs, ref_genome)
      <-- origin

ref_genome <-- argument of <********> align_reads_all(fastqs, ref_genome)
           <-- argument of <********> main(fastqs, ref_genome)
           <-- origin
```

Hash values are indicated by * above. For reference, here is what the workflow might have been:

```
from typing import List, cast

from redun import File, task


@task()
def align_reads(fastq: File, ref_genome: File) -> File:
    reads = cast(str, fastq.read())
    ref = cast(str, ref_genome.read())
    bam = File(fastq.path.replace("fastq", "bam"))
    bam.write("align({}, {})".format(reads, ref))
    return bam


@task()
def call_variants(bam: File, ref_genome: File) -> File:
    align = cast(str, bam.read())
    ref = cast(str, ref_genome.read())
    vcf = File(bam.path.replace("bam", "vcf"))
    vcf.write("calls({}, {})".format(align, ref))
    return vcf


@task()
def align_reads_all(fastqs: List[File], ref_genome: File):
    bams = [align_reads(fastq, ref_genome) for fastq in fastqs]
    return bams


@task()
def call_variants_all(bams: List[File], ref_genome: File):
    vcfs = [call_variants(bam, ref_genome) for bam in bams]
    return vcfs


@task()
def main(fastqs: List[File], ref_genome: File):
    bams = align_reads_all(fastqs, ref_genome)
    vcfs = call_variants_all(bams, ref_genome)
    return vcfs
```

A dataflow visualization consists of a series of paragraphs called “dataflow sections” that describe how one of the values is derived. Here is the section for the bam value:

```
bam <-- argument of <********> call_variants_all(bams, ref_genome)
    <-- result of <********> align_reads_all(fastqs, ref_genome)
    <-- <********> align_reads(fastq, ref_genome_2)
  fastq      = <********> File(path=sample4.fastq, hash=********)
  ref_genome = <********> File(path=ref_genome, hash=********)
```

A section is made of three clauses: assignment, routing, and arguments.

The assignment clause indicates which CallNode produced this value:

` bam <-- argument of <********> call_variants_all(bams, ref_genome) `

Routing clauses, if present, describe a series of additional CallNodes that “route” the value, either by passing it as an argument from a parent CallNode to a child CallNode, or by returning it as a result from a child CallNode to a parent CallNode.

```
<-- result of <********> align_reads_all(fastqs, ref_genome)
<-- <********> align_reads(fastq, ref_genome_2)
```

Argument clauses define the value for each argument in the final CallNode.

```
fastq      = <********> File(path=sample4.fastq, hash=********)
ref_genome = <********> File(path=ref_genome, hash=********)
```
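The DOM types documented below mirror these three clauses directly. As a rough sketch (not from the redun docs; the Value stand-ins are placeholders for rows loaded from the redun database), the bam section above could be assembled as:

```
from redun.backends.db.dataflow import (
    DataflowArg,
    DataflowAssign,
    DataflowRouting,
    DataflowSectionDOM,
)

# Stand-ins for Value rows that real code would load from the redun database.
fastq_value = None
ref_value = None

dom = DataflowSectionDOM(
    # Assignment clause: which CallNode produced the value.
    assign=DataflowAssign(
        var_name="bam",
        prefix="argument of",
        node_display="call_variants_all(bams, ref_genome)",
        node=None,  # a DataflowNode in practice
    ),
    # Routing clauses: CallNodes that merely pass the value along.
    routing=[
        DataflowRouting(
            prefix="result of",
            node_display="align_reads_all(fastqs, ref_genome)",
            node=None,
        ),
    ],
    # Argument clauses: the values of the final CallNode's arguments.
    args=[
        DataflowArg(var_name="fastq", value=fastq_value),
        DataflowArg(var_name="ref_genome", value=ref_value),
    ],
)
```

display_section(), documented below, renders such a DOM back into the clause layout shown above.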

To build this visualization, the following strategy is used:

  • Given a starting value (e.g. the VCF file in the example above), walk the CallGraph backwards (i.e. upstream) to determine the relevant nodes. These are called DataflowNodes, and they are connected by DataflowEdges.

  • DataflowEdges are then grouped into sections.

  • Each section is then reorganized into a DataflowSectionDOM. A DataflowDOM is the collection of DataflowSectionDOMs. The DOM (document object model) is an intermediate representation that can be rendered in multiple ways.

  • Once a DataflowDOM is created, it can either be rendered into a textual format, or serialized into JSON for the web frontend.
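Putting the pieces together, a hedged end-to-end sketch of this strategy might look as follows. The repo path and value hash here are illustrative assumptions, and backend.session is the SQLAlchemy session opened by load():

```
from redun.backends.db import RedunBackendDb, Value
from redun.backends.db.dataflow import (
    display_dataflow,
    make_dataflow_dom,
    serialize_dataflow,
    walk_dataflow,
)

# Assumed: a local redun repo at the default path.
backend = RedunBackendDb(db_uri="sqlite:///.redun/redun.db")
backend.load()

# Assumed: the value_hash of the starting value (e.g. the VCF file).
value = backend.session.query(Value).filter_by(value_hash="1234abcd").one()

edges = walk_dataflow(backend, value)       # walk the CallGraph upstream
dom = list(make_dataflow_dom(edges))        # group edges into section DOMs

for line in display_dataflow(dom):          # render the DOM as text
    print(line)

json_docs = list(serialize_dataflow(dom))   # or serialize for the web frontend
```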

class redun.backends.db.dataflow.ArgumentValue(argument: Argument, value: Value)#

Bases: NamedTuple

A DataflowNode used for tracing one subvalue in an argument.

argument: Argument#

Alias for field number 0

value: Value#

Alias for field number 1

class redun.backends.db.dataflow.CallNodeValue(call_node: CallNode, value: Value)#

Bases: NamedTuple

A DataflowNode used for tracing one subvalue of a CallNode result.

call_node: CallNode#

Alias for field number 0

value: Value#

Alias for field number 1

class redun.backends.db.dataflow.DataflowArg(var_name: str, value: Value)#

Bases: NamedTuple

An argument clause in a Dataflow DOM.

value: Value#

Alias for field number 1

var_name: str#

Alias for field number 0

class redun.backends.db.dataflow.DataflowAssign(var_name: str, prefix: str, node_display: str, node: ArgumentValue | CallNodeValue | CallNode | Value | None)#

Bases: NamedTuple

The assignment clause in a Dataflow DOM.

node: ArgumentValue | CallNodeValue | CallNode | Value | None#

Alias for field number 3

node_display: str#

Alias for field number 2

prefix: str#

Alias for field number 1

var_name: str#

Alias for field number 0

class redun.backends.db.dataflow.DataflowEdge(src: ArgumentValue | CallNodeValue | CallNode | Value, dest: ArgumentValue | CallNodeValue | CallNode | Value | None)#

Bases: NamedTuple

An edge in a Dataflow graph.

dest: ArgumentValue | CallNodeValue | CallNode | Value | None#

Alias for field number 1

src: ArgumentValue | CallNodeValue | CallNode | Value#

Alias for field number 0

class redun.backends.db.dataflow.DataflowRouting(prefix: str, node_display: str, node: ArgumentValue | CallNodeValue | CallNode | Value | None)#

Bases: NamedTuple

A routing clause in a Dataflow DOM.

node: ArgumentValue | CallNodeValue | CallNode | Value | None#

Alias for field number 2

node_display: str#

Alias for field number 1

prefix: str#

Alias for field number 0

class redun.backends.db.dataflow.DataflowSectionDOM(assign: DataflowAssign, routing: List[DataflowRouting], args: List[DataflowArg])#

Bases: NamedTuple

A section in Dataflow DOM.

args: List[DataflowArg]#

Alias for field number 2

assign: DataflowAssign#

Alias for field number 0

routing: List[DataflowRouting]#

Alias for field number 1

class redun.backends.db.dataflow.DataflowSectionKind(value)#

Bases: Enum

Each dataflow section describes either a task call or a data manipulation.

CALL = 'call'#
DATA = 'data'#
class redun.backends.db.dataflow.DataflowVars#

Bases: object

Manages variable names for nodes in a dataflow.

get_task_args(task: Task) List[str]#

Returns the parameter names of a Task.

new_var_name(node: ArgumentValue | CallNodeValue | CallNode | Value, base_var_name: str | None = None) Tuple[str, str | None]#

Get or create a new variable name for a DataflowNode.

redun.backends.db.dataflow.display_call_node(call_node: CallNode, renames: Dict[str, str]) str#

Formats a CallNode to a string.

redun.backends.db.dataflow.display_dataflow(dom: Iterable[DataflowSectionDOM]) Iterator[str]#

Yields lines for displaying a dataflow DOM.

redun.backends.db.dataflow.display_hash(node: ArgumentValue | CallNodeValue | CallNode | Value | None) str#

Formats hash for a DataflowNode.

redun.backends.db.dataflow.display_node(node: ArgumentValue | CallNodeValue | CallNode | Value | None, renames: Dict[str, str]) Tuple[str, str]#

Formats a dataflow node to a string.

redun.backends.db.dataflow.display_section(dom: DataflowSectionDOM) Iterator[str]#

Yields lines for displaying a dataflow section DOM.

redun.backends.db.dataflow.display_value(value: Value) str#

Format a Value to a string.

redun.backends.db.dataflow.get_callnode_arguments(call_node: CallNode) List[Argument]#

Returns a CallNode’s arguments in sorted order.

redun.backends.db.dataflow.get_dataflow_call_node(node: ArgumentValue | CallNodeValue | CallNode | Value | None) CallNode | None#

Returns the CallNode for a DataflowNode.

redun.backends.db.dataflow.get_default_arg_name(pos: int) str#

Generate the default name for an argument.

redun.backends.db.dataflow.get_node_hash(node: ArgumentValue | CallNodeValue | CallNode | Value | None) str | None#

Formats hash for a DataflowNode.

redun.backends.db.dataflow.get_section_edge_type(edge: DataflowEdge) str#

Classifies a DataflowEdge.

redun.backends.db.dataflow.get_task_args(task: Task) List[str]#

Returns list of argument names of a Task. Raises a SyntaxError if the task source code is not properly formatted.

redun.backends.db.dataflow.is_internal_task(task: Task) bool#

Returns True if task is an internal redun task.

We skip such tasks in the dataflow to avoid clutter.

redun.backends.db.dataflow.iter_dataflow_sections(dataflow_edges: Iterable[DataflowEdge]) Iterator[Tuple[DataflowSectionKind, List[DataflowEdge]]]#

Yields dataflow sections from an iterable of dataflow edges.

A dataflow section is a group of edges representing one ‘paragraph’ in a dataflow display.

```
value <-- <1234abcd> call_node(arg1, arg2)
      <-- result of <2345abcd> call_node2(arg3, arg4)
  arg3 = <3456abcd> 'hello_world'
  arg4 = <4567abcd> File('foo.txt')
```
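As a hedged sketch (reusing backend and value from the earlier example, and assuming the edges should first be topologically sorted with toposort_edges()), the grouping might be driven like this:

```
from redun.backends.db.dataflow import (
    iter_dataflow_sections,
    toposort_edges,
    walk_dataflow,
)

edges = toposort_edges(walk_dataflow(backend, value))
for kind, section_edges in iter_dataflow_sections(edges):
    # kind is DataflowSectionKind.CALL or DataflowSectionKind.DATA.
    print(kind.value, "section with", len(section_edges), "edges")
```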

redun.backends.db.dataflow.iter_subsections(section: List[DataflowEdge]) Iterator[List[DataflowEdge]]#

Determines whether a section should be broken down into smaller subsections.

In real life dataflows, there are some cases where the dataflow merges such that the structure is a DAG, not just a tree. These merges represent a value that was passed to two or more different tasks and then their outputs eventually combine again, either into a single Value like a list or as arguments into a common task. For example, value is a merge node in the upstream dataflow of result.

```
value = task0()
output1 = task1(a=value)
output2 = task2(b=value)
result = task3(c=output1, d=output2)
```

The upstream dataflow graph of result is:

```
Value(result)
  |
  V
CallNode(task3) ---------------.
  |                            |
  V                            V
ArgumentValue(task3, key=a)   ArgumentValue(task3, key=b)
  |                            |
  V                            V
CallNode(task1)               CallNode(task2)
  |                            |
  V                            V
ArgumentValue(task1, key=c)   ArgumentValue(task2, key=d)
  |                            |
  V                            |
CallNodeValue(task0, value) <--/
  |
  V
CallNode(task0)
  |
  V
Origin
```

The function iter_dataflow_sections() will break this graph right after every CallNode --> ArgumentValue edge, resulting in three sections. One of those sections will have a “merge node”, CallNode(task0):

```
ArgumentValue(task1, key=c)   ArgumentValue(task2, key=d)
  |                            |
  V                            |
CallNodeValue(task0, value) <--/
  |
  V
CallNode(task0)
  |
  V
Origin
```

This function will further break this section into subsections like this:

```
Subsection 1:

  ArgumentValue(task1, key=c)
    |
    V
  CallNodeValue(task0, value)

Subsection 2:

  ArgumentValue(task2, key=d)
    |
    V
  CallNodeValue(task0, value)

Subsection 3:

  CallNodeValue(task0, value)
    |
    V
  CallNode(task0)
    |
    V
  Origin
```

Ultimately these three subsections get rendered as:

```
c <-- c_2

d <-- c_2

c_2 <-- task0()
    <-- origin
```

redun.backends.db.dataflow.iter_unique(items: Iterable[T], key: Callable[[T], Any] = <function <lambda>>) Iterator[T]#

Iterate through unique items.

redun.backends.db.dataflow.make_data_section_dom(section: List[DataflowEdge], dataflow_vars: DataflowVars, new_varname: str = 'value') DataflowSectionDOM#

Returns a DOM for a data section.

A data section describes how one value was constructed from several subvalues. For example this dataflow section:

```
parent_value <-- derives from
  subvalue1 = File(path='file1')
  subvalue2 = File(path='file2')
```

corresponds to this kind of code:

```
subvalue1 = task1()
subvalue2 = task2()
parent_value = [subvalue1, subvalue2]
task3(parent_value)
```

redun.backends.db.dataflow.make_dataflow_dom(dataflow_edges: Iterable[DataflowEdge], new_varname: str = 'value') Iterable[DataflowSectionDOM]#

Yields dataflow section DOMs from an iterable of dataflow edges.

It also performs variable renaming to give every value a unique variable name.

redun.backends.db.dataflow.make_section_dom(section: List[DataflowEdge], dataflow_vars: DataflowVars, new_varname: str = 'value') DataflowSectionDOM#

Returns DOM for a dataflow section.

redun.backends.db.dataflow.make_var_name(var_name_base: str, name2var: Dict[str, ArgumentValue | CallNodeValue | CallNode | Value], suffix: int = 2) str#

Generate a new variable using a unique suffix (e.g. myvar_2).

redun.backends.db.dataflow.rewrite_call_node_merges(edges: List[DataflowEdge]) List[DataflowEdge]#

Rewrites dataflow graphs to enforce one CallNodeValue per CallNode.

This function identifies CallNodes that have multiple parent CallNodeValues, like this:

```
ArgumentValue --> CallNodeValue --\
                                   V
                                  CallNode (merge_node)
                                   ^
ArgumentValue --> CallNodeValue --/
```

and rewrites them to unify the CallNodeValues like this:

```
ArgumentValue --\
                 V
                CallNodeValue --> CallNode
                 ^
ArgumentValue --/
```

redun.backends.db.dataflow.serialize_dataflow(dom: Iterable[DataflowSectionDOM]) Iterator[dict]#

Serialize DataflowDOM to JSON.

redun.backends.db.dataflow.serialize_section(dom: DataflowSectionDOM) dict#

Serialize a DataflowSectionDOM to JSON.

redun.backends.db.dataflow.toposort_edges(edges: Iterable[DataflowEdge]) Iterator[DataflowEdge]#

Topologically sort DataflowEdges in depth-first order.

redun.backends.db.dataflow.walk_dataflow(backend: RedunBackendDb, init_node: ArgumentValue | CallNodeValue | CallNode | Value) Iterator[DataflowEdge]#

Iterate through all the upstream dataflow edges of a ‘node’ in the CallGraph.

A ‘node’ can be a Value, CallNode, CallNodeValue, or an ArgumentValue.

redun.backends.db.dataflow.walk_dataflow_argument_value(backend: RedunBackendDb, argument_value: ArgumentValue) Iterator[DataflowEdge]#

Iterates through the upstream dataflow edges of an ArgumentValue.

Edge types:

  • ArgumentValue <-- CallNodeValue: Value came from the result of a CallNode.

  • ArgumentValue <-- ArgumentValue: Value came from an argument of the parent CallNode, i.e. argument-to-argument routing.

  • ArgumentValue <-- Origin: Value came directly from the user or a task.

redun.backends.db.dataflow.walk_dataflow_callnode(backend: RedunBackendDb, call_node: CallNode) Iterator[DataflowEdge]#

Iterates through the upstream Arguments of a CallNode.

redun.backends.db.dataflow.walk_dataflow_callnode_value(backend: RedunBackendDb, call_node_value: CallNodeValue) Iterator[DataflowEdge]#

Iterates through the upstream dataflow edges of a CallNodeValue.

The edges either go deeper to the child CallNodes or stay with this CallNode.

redun.backends.db.dataflow.walk_dataflow_node(backend: RedunBackendDb, node: ArgumentValue | CallNodeValue | CallNode | Value) Iterator[DataflowEdge]#

Iterates through the upstream dataflow edges of any DataflowNode.

redun.backends.db.dataflow.walk_dataflow_value(backend: RedunBackendDb, value: Value) Iterator[DataflowEdge]#

Iterates through the edges in the upstream dataflow graph of a Value.

redun.backends.db.query module#

class redun.backends.db.query.CallGraphQuery(session: Session, joins: Set[str] | None = None, execution_joins: List[Callable[[Query], Query]] | None = None, filters: List | None = None, order_by: str | None = None, filter_types: Set | None = None, executions: Query | None = None, jobs: Query | None = None, call_nodes: Query | None = None, tasks: Query | None = None, values: Query | None = None, value_subqueries: List[Query] | None = None)#

Bases: object

Query class for efficiently querying across all models in a CallGraph.

ExecTag = <AliasedClass at 0x7f88c2543fd0; Tag>#
MODEL_CLASSES = {'CallNode': <class 'redun.backends.db.CallNode'>, 'Execution': <class 'redun.backends.db.Execution'>, 'Job': <class 'redun.backends.db.Job'>, 'Task': <class 'redun.backends.db.Task'>, 'Value': <class 'redun.backends.db.Value'>}#
MODEL_NAMES = ['Execution', 'Job', 'CallNode', 'Task', 'Value']#
MODEL_PKS = {<class 'redun.backends.db.CallNode'>: 'call_hash', <class 'redun.backends.db.Execution'>: 'id', <class 'redun.backends.db.Job'>: 'id', <class 'redun.backends.db.Task'>: 'hash', <class 'redun.backends.db.Value'>: 'value_hash'}#
all() Iterator[Base]#

Yields all records matching query.

build()#

Apply all joins and filters to subqueries.

clone(**kwargs: Any) CallGraphQuery#

Returns a clone of the query with updates specified by kwargs.

count() Iterator[Tuple[str, int]]#

Returns counts for each record type.

empty() CallGraphQuery#

Returns an empty query.

filter_arguments(value_hashes: List[str]) CallGraphQuery#

Filter jobs by argument values.

filter_execution_ids(execution_ids: Iterable[str]) CallGraphQuery#

Filter query by Execution ids.

filter_execution_statuses(execution_statuses: Iterable[str]) CallGraphQuery#

Filter by Execution status.

filter_execution_tags(tags: Iterable[Tuple[str, Any]]) CallGraphQuery#

Filter by tag on executions.

filter_file_paths(paths: Iterable[str]) CallGraphQuery#

Filter by File.path patterns.

paths can contain “*” to perform wildcard matching.

filter_ids(_ids: Iterable[str]) CallGraphQuery#

Filter query by record ids.

filter_job_statuses(job_statuses: Iterable[str]) CallGraphQuery#

Filter by Job status.

filter_results(value_hashes: List[str]) CallGraphQuery#

Filter jobs by result values.

filter_tags(tags: Iterable[Tuple[str, Any]]) CallGraphQuery#

Filter by tags.

filter_task_hashes(task_hashes: Iterable[str]) CallGraphQuery#

Filter by Task hashes.

filter_task_names(task_names: Iterable[str]) CallGraphQuery#

Filter by Task names.

filter_types(types: Iterable[str]) CallGraphQuery#

Filter query by record type.

filter_value_types(value_types: Iterable[str]) CallGraphQuery#

Filter query by Value types.

first() Base | None#

Returns first record if it exists.

like_id(id: str) CallGraphQuery#

Filter query by record id prefix id.

limit(size) Iterator[Base]#

Yields at most size records from query.

one() Base#

Returns exactly one record. Raises error if too few or too many.

one_or_none() Base#

Returns exactly one matching record, or None if there are too few or too many.

order_by(order_by: str) CallGraphQuery#

Order query.

Parameters:

order_by (str) – The only supported value is ‘time’.

page(page: int, page_size: int) Iterator[Base]#

Yields a page-worth of results from query.

page is zero-indexed.

select(*columns: Iterable[str], flat: bool = False) Iterator[Any]#

Select columns to return in query.

property subqueries: Iterable[Tuple[str, Query]]#

Iterates through all subqueries.

Yields:

(type_name, subquery) (Tuple[str, Query])
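As a hedged sketch of how these methods compose (assuming session is a SQLAlchemy Session bound to a redun repo database):

```
from redun.backends.db.query import CallGraphQuery

query = (
    CallGraphQuery(session)
    .filter_types(["Execution"])          # restrict to Execution records
    .filter_execution_statuses(["DONE"])  # finished executions only
    .order_by("time")                     # 'time' is the only supported ordering
)

for record in query.limit(10):
    print(record)
```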

redun.backends.db.query.find_file(session: Session, path: str) Tuple[File, Job, str] | None#

Find a File by its path.

  • Prefer an instance of File as the result of a Task. Amongst results, prefer the most recent.

  • Otherwise, search for the File as an argument to a Task. Amongst arguments, prefer the most recent.

  • For both results and arguments, also search whether the File was a Subvalue (e.g. a value within a list, dict, etc.).

  • We prefer the most recent, since it has the best chance of still being valid.
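A hedged usage sketch (session as above; the third element of the returned tuple is a string describing how the File related to the Job):

```
from redun.backends.db.query import find_file

hit = find_file(session, "sample4.vcf")
if hit is not None:
    file, job, relation = hit
    print(file.path, job.id, relation)
```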

redun.backends.db.query.infer_id(session: Session, id: str, include_files: bool = True, required: bool = False) Any#

Try to infer the record based on an id prefix.

redun.backends.db.query.infer_specialty_id(session: Session, id: str, include_files: bool = True) Any#

Try to infer the record based on a specialty id (e.g. file paths, the “-” shorthand).

redun.backends.db.query.parse_callgraph_query(query: CallGraphQuery, args: Namespace) CallGraphQuery#

Parse an argparse.Namespace into a CallGraphQuery.

redun.backends.db.query.setup_query_parser(parser: ArgumentParser) ArgumentParser#

Add actions to an ArgumentParser to support querying a CallGraph.

redun.backends.db.serializers module#

class redun.backends.db.serializers.CallNodeSerializer(db_version: DBVersionInfo | None = None)#

Bases: Serializer

deserialize(spec: dict) List[Base]#
pk_field: str = 'call_hash'#
serialize(call_node: Base) dict#
serialize_query(query: Query) Iterator[dict]#
class redun.backends.db.serializers.ExecutionSerializer(db_version: DBVersionInfo | None = None)#

Bases: Serializer

deserialize(spec: dict) List[Base]#
pk_field: str = 'id'#
serialize(execution: Base) dict#
class redun.backends.db.serializers.JobSerializer(db_version: DBVersionInfo | None = None)#

Bases: Serializer

deserialize(spec: dict) List[Base]#
pk_field: str = 'id'#
serialize(job: Base) dict#
serialize_query(query: Query) Iterator[dict]#
class redun.backends.db.serializers.RecordSerializer(db_version: DBVersionInfo | None = None)#

Bases: Serializer

deserialize(spec: dict) List[Base]#
get_pk(spec: dict) str#
get_subserializer(type: str) Serializer#
serialize(obj: Base) dict#
serialize_query(query: Query) Iterator[dict]#
class redun.backends.db.serializers.Serializer(db_version: DBVersionInfo | None = None)#

Bases: ABC

Base class serializer for RedunBackendDb records.

deserialize(spec: dict) List[Base]#
get_pk(spec: dict) str#
pk_field: str = ''#
serialize(row: Base) dict#
serialize_query(query: Query) Iterator[dict]#
class redun.backends.db.serializers.TagSerializer(db_version: DBVersionInfo | None = None)#

Bases: Serializer

deserialize(spec: dict) List[Base]#
pk_field: str = 'tag_hash'#
serialize(tag: Base) dict#
serialize_query(query: Query) Iterator[dict]#
class redun.backends.db.serializers.ValueSerializer(db_version: DBVersionInfo | None = None)#

Bases: Serializer

deserialize(spec: dict) List[Base]#
pk_field: str = 'value_hash'#
serialize(value: Base) dict#
serialize_query(query: Query) Iterator[dict]#
redun.backends.db.serializers.deserialize_timestamp(date_str: str | None) datetime | None#

Deserialize timestamp string to datetime value.

redun.backends.db.serializers.serialize_timestamp(timestamp: datetime | None) str | None#

Serialize a datetime value to a timestamp string.
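A hedged sketch of the round trip between these two helpers (the exact string format is an implementation detail of the serializers):

```
from datetime import datetime

from redun.backends.db.serializers import (
    deserialize_timestamp,
    serialize_timestamp,
)

ts = serialize_timestamp(datetime.utcnow())  # datetime -> Optional[str]
print(ts, deserialize_timestamp(ts))         # str -> Optional[datetime]
print(serialize_timestamp(None))             # None passes through as None
```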

Module contents#

class redun.backends.db.Argument(**kwargs)#

Bases: Base

Input value for a called Task.

arg_hash#
arg_key#
arg_position#
arg_results#
call_hash#
call_node#
upstream#
value#
value_hash#
property value_parsed: Any | None#
class redun.backends.db.ArgumentResult(**kwargs)#

Bases: Base

Many-to-many relationship between results and Arguments.

arg#
arg_hash#
result_call_hash#
result_call_node#
class redun.backends.db.CallEdge(**kwargs)#

Bases: Base

An edge in the CallGraph.

This is a many-to-many table for CallNode.

call_order#
child_id#
child_node#
parent_id#
parent_node#
class redun.backends.db.CallNode(**kwargs)#

Bases: Base

A CallNode in the CallGraph.

arg_results#
property args_display: str#
args_hash#
arguments#
call_hash#
call_hash_idx = Index('ix_call_node_call_hash_vpo', Column('call_hash', String(length=40), table=<call_node>, primary_key=True, nullable=False), unique=True)#
child_edges#
property children: List[CallNode]#
downstream#
jobs#
parent_edges#
property parents: List[CallNode]#
tags#
task#
task_hash#
task_name#
task_set#
timestamp#
value#
value_hash#
property value_parsed: Any | None#
class redun.backends.db.CallSubtreeTask(**kwargs)#

Bases: Base

call_hash#
call_node#
task_hash#
class redun.backends.db.Column(*args, **kwargs)#

Bases: Column

Use non-null Columns by default.

inherit_cache: bool | None = True#

Indicate if this HasCacheKey instance should make use of the cache key generation scheme used by its immediate superclass.

The attribute defaults to None, which indicates that a construct has not yet taken into account whether or not it’s appropriate for it to participate in caching; this is functionally equivalent to setting the value to False, except that a warning is also emitted.

This flag can be set to True on a particular class, if the SQL that corresponds to the object does not change based on attributes which are local to this class, and not its superclass.

See also

compilerext_caching - General guidelines for setting the HasCacheKey.inherit_cache attribute for third-party or user-defined SQL constructs.

class redun.backends.db.DBVersionInfo(migration_id: str, major: int, minor: int, description: str)#

Bases: NamedTuple

A redun repo database version and migration id.

description: str#

Alias for field number 3

major: int#

Alias for field number 1

migration_id: str#

Alias for field number 0

minor: int#

Alias for field number 2

class redun.backends.db.Evaluation(**kwargs)#

Bases: Base

Cache table for evaluations.

eval_hash (hash of task_hash and args_hash) is the cache key, and value_hash is the cached value.

args_hash#
eval_hash#
task#
task_hash#
value#
value_hash#
property value_parsed: Any | None#
class redun.backends.db.Execution(*args, **kwargs)#

Bases: Base

args#
calc_status(result_type: str | None) str#

Compute status from the result_type of the root Job.

property call_node: CallNode | None#
id#
id_idx = Index('ix_execution_id_vpo', Column('id', String(), table=<execution>, primary_key=True, nullable=False), unique=True)#
job#
job_id#
jobs#
property status: str#

Returns the computed status of the Execution.

property status_display: str#

Return status suitable for display in tables.

tags#
property task: Task | None#
class redun.backends.db.File(**kwargs)#

Bases: Base

A File used as a Value by a Task.

parent_values#
path#
value#
value_hash#
property values: List[Value]#
class redun.backends.db.Handle(**kwargs)#

Bases: Base

children#
fullname#
hash#
is_valid#
key#
parent_values#
parents#
value#
value_hash#
property values: List[Value]#
class redun.backends.db.HandleEdge(**kwargs)#

Bases: Base

child_id#
parent_id#
class redun.backends.db.JSON(*args: Any, **kwargs: Any)#

Bases: TypeDecorator

This custom column type allows use of JSON across both sqlite and postgres.

In postgres, the column acts like a JSONB column. In sqlite, it acts like a string with normalization (e.g. sorted keys, etc.) applied to the JSON serialization. This allows us to do exact-match indexing across both sqlite and postgres.
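A hypothetical model sketch using this type together with the non-null-by-default Column described above (the table and field names are illustrative):

```
from sqlalchemy import String
from sqlalchemy.orm import declarative_base

from redun.backends.db import JSON, Column

Base = declarative_base()


class ExampleRecord(Base):
    __tablename__ = "example_record"

    id = Column(String, primary_key=True)
    payload = Column(JSON)  # JSONB-like on postgres, normalized text on sqlite
```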

cache_ok: bool | None = True#

Indicate if statements using this ExternalType are “safe to cache”.

The default value None will emit a warning and then not allow caching of a statement which includes this type. Set to False to disable statements using this type from being cached at all without a warning. When set to True, the object’s class and selected elements from its state will be used as part of the cache key. For example, using a TypeDecorator:

class MyType(TypeDecorator):
    impl = String

    cache_ok = True

    def __init__(self, choices):
        self.choices = tuple(choices)
        self.internal_only = True

The cache key for the above type would be equivalent to:

>>> MyType(["a", "b", "c"])._static_cache_key
(<class '__main__.MyType'>, ('choices', ('a', 'b', 'c')))

The caching scheme will extract attributes from the type that correspond to the names of parameters in the __init__() method. Above, the “choices” attribute becomes part of the cache key but “internal_only” does not, because there is no parameter named “internal_only”.

The requirements for cacheable elements are that they are hashable and also that they indicate the same SQL rendered for expressions using this type every time for a given cache value.

To accommodate for datatypes that refer to unhashable structures such as dictionaries, sets and lists, these objects can be made “cacheable” by assigning hashable structures to the attributes whose names correspond with the names of the arguments. For example, a datatype which accepts a dictionary of lookup values may publish this as a sorted series of tuples. Given a previously un-cacheable type as:

class LookupType(UserDefinedType):
    '''a custom type that accepts a dictionary as a parameter.

    this is the non-cacheable version, as "self.lookup" is not
    hashable.

    '''

    def __init__(self, lookup):
        self.lookup = lookup

    def get_col_spec(self, **kw):
        return "VARCHAR(255)"

    def bind_processor(self, dialect):
        # ...  works with "self.lookup" ...

Where “lookup” is a dictionary. The type will not be able to generate a cache key:

>>> type_ = LookupType({"a": 10, "b": 20})
>>> type_._static_cache_key
<stdin>:1: SAWarning: UserDefinedType LookupType({'a': 10, 'b': 20}) will not
produce a cache key because the ``cache_ok`` flag is not set to True.
Set this flag to True if this type object's state is safe to use
in a cache key, or False to disable this warning.
symbol('no_cache')

If we did set up such a cache key, it wouldn’t be usable. We would get a tuple structure that contains a dictionary inside of it, which cannot itself be used as a key in a “cache dictionary” such as SQLAlchemy’s statement cache, since Python dictionaries aren’t hashable:

>>> # set cache_ok = True
>>> type_.cache_ok = True

>>> # this is the cache key it would generate
>>> key = type_._static_cache_key
>>> key
(<class '__main__.LookupType'>, ('lookup', {'a': 10, 'b': 20}))

>>> # however this key is not hashable, will fail when used with
>>> # SQLAlchemy statement cache
>>> some_cache = {key: "some sql value"}
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
TypeError: unhashable type: 'dict'

The type may be made cacheable by assigning a sorted tuple of tuples to the “.lookup” attribute:

class LookupType(UserDefinedType):
    '''a custom type that accepts a dictionary as a parameter.

    The dictionary is stored both as itself in a private variable,
    and published in a public variable as a sorted tuple of tuples,
    which is hashable and will also return the same value for any
    two equivalent dictionaries.  Note it assumes the keys and
    values of the dictionary are themselves hashable.

    '''

    cache_ok = True

    def __init__(self, lookup):
        self._lookup = lookup

        # assume keys/values of "lookup" are hashable; otherwise
        # they would also need to be converted in some way here
        self.lookup = tuple(
            (key, lookup[key]) for key in sorted(lookup)
        )

    def get_col_spec(self, **kw):
        return "VARCHAR(255)"

    def bind_processor(self, dialect):
        # ...  works with "self._lookup" ...

Where above, the cache key for LookupType({"a": 10, "b": 20}) will be:

>>> LookupType({"a": 10, "b": 20})._static_cache_key
(<class '__main__.LookupType'>, ('lookup', (('a', 10), ('b', 20))))

New in version 1.4.14: - added the cache_ok flag to allow some configurability of caching for TypeDecorator classes.

New in version 1.4.28: - added the ExternalType mixin which generalizes the cache_ok flag to both the TypeDecorator and UserDefinedType classes.

See also

sql_caching

impl#

alias of String

load_dialect_impl(dialect)#

Return a TypeEngine object corresponding to a dialect.

This is an end-user override hook that can be used to provide differing types depending on the given dialect. It is used by the TypeDecorator implementation of type_engine() to help determine what type should ultimately be returned for a given TypeDecorator.

By default returns self.impl.

process_bind_param(value: Any, dialect)#

Receive a bound parameter value to be converted.

Custom subclasses of _types.TypeDecorator should override this method to provide custom behaviors for incoming data values. This method is called at statement execution time and is passed the literal Python data value which is to be associated with a bound parameter in the statement.

The operation could be anything desired to perform custom behavior, such as transforming or serializing data. This could also be used as a hook for validating logic.

Parameters:
  • value – Data to operate upon, of any type expected by this method in the subclass. Can be None.

  • dialect – the Dialect in use.

See also

types_typedecorator

_types.TypeDecorator.process_result_value()

process_result_value(value: Any, dialect)#

Receive a result-row column value to be converted.

Custom subclasses of _types.TypeDecorator should override this method to provide custom behaviors for data values being received in result rows coming from the database. This method is called at result fetching time and is passed the literal Python data value that’s extracted from a database result row.

The operation could be anything desired to perform custom behavior, such as transforming or deserializing data.

Parameters:
  • value – Data to operate upon, of any type expected by this method in the subclass. Can be None.

  • dialect – the Dialect in use.

See also

types_typedecorator

_types.TypeDecorator.process_bind_param()

class redun.backends.db.Job(**kwargs)#

Bases: Base

cached#
calc_status(result_type: str | None) str#

Calculate Job status from result type.

call_hash#
call_node#
child_jobs#
property context: dict#

Returns the associated context.

property duration: timedelta | None#

Returns duration of the Job or None if Job end_time is not recorded.

end_time#
execution#
execution_id#
id#
id_idx = Index('ix_job_id_vpo', Column('id', String(), table=<job>, primary_key=True, nullable=False), unique=True)#
parent_id#
parent_job#
start_time#
property status: str#

Returns Job status (RUNNING, FAILED, CACHED, DONE).

property status_display: str#

Return status suitable for display in tables.

tags#
task#
task_hash#
class redun.backends.db.PreviewValue(value: Value, size: int)#

Bases: object

A preview value if the value is too large or if there is an error.

class redun.backends.db.RedunBackendDb(db_uri: str | None = None, config: dict | SectionProxy | Config | None = None, logger: Any | None = None, *args: Any, **kwargs: Any)#

Bases: RedunBackend

A database-based Backend for managing the CallGraph (provenance) and cache for a Scheduler.

This backend makes use of SQLAlchemy to provide support for both sqlite and postgresql databases.

advance_handle(parent_handles: List[Handle], child_handle: Handle) None#

Record parent-child relationships between Handles.

check_cache(task_hash: str, args_hash: str, eval_hash: str, execution_id: str, scheduler_task_hashes: Set[str], cache_scope: CacheScope, check_valid: CacheCheckValid, context_hash: str | None = None, allowed_cache_results: Set[CacheResult] | None = None) Tuple[Any, str | None, CacheResult]#

See parent method.

clone(session: Session | None = None)#

Return a copy of the backend that shares the instantiated database engine.

create_engine() Any#

Initializes a database connection.

delete_tags(entity_id: str, tags: Iterable[Tuple[str, Any]], keys: Iterable[str] = ()) List[Tuple[str, str, str, Any]]#

Delete tags.

explain_cache_miss(task: Task, args_hash: str) Dict[str, Any] | None#

Determine the reason for a cache miss.

static get_all_db_versions() List[DBVersionInfo]#

Returns list of all DB versions and their migration ids.

get_call_cache(call_hash: str) Tuple[Any, bool]#

Returns the result of a previously recorded CallNode.

Parameters:

call_hash (str) – Hash of a CallNode.

Returns:

Recorded final result of a CallNode.

Return type:

Any

get_child_record_ids(model_ids: Iterable[Tuple[Base, str]]) Iterable[Tuple[str, Base, str]]#

Iterates through record’s ownership edges.

Used for walking record graph for syncing.

get_db_version() DBVersionInfo#

Returns the current DB version (major, minor).

static get_db_version_required() Tuple[DBVersionInfo, DBVersionInfo]#

Returns the DB version range required by this library.

get_eval_cache(eval_hash: str) Tuple[Any, bool]#

Checks the Evaluation cache for a cached result.

Parameters:

eval_hash (str) – Hash of the task and arguments used for call.

Returns:

(result, is_cached) – result is the cached result, or None if no result was found. is_cached is True on a cache hit, otherwise False.

Return type:

Tuple[Any, bool]
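A hedged sketch of the Evaluation cache round trip, combining this method with set_eval_cache() (documented below). backend is assumed to be a loaded RedunBackendDb, and the hash strings are placeholders:

```
backend.set_eval_cache(
    eval_hash="eval123",  # hash of (task_hash, args_hash)
    task_hash="task456",
    args_hash="args789",
    value=[1, 2, 3],
)

result, is_cached = backend.get_eval_cache("eval123")
# On a hit: result == [1, 2, 3] and is_cached is True.
```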

get_job(job_id: str) dict | None#

Returns details for a Job.

get_records(ids: Iterable[str], sorted: bool = True) Iterable[dict]#

Returns serialized records for the given ids.

Parameters:
  • ids (Iterable[str]) – Iterable of record ids to fetch serialized records.

  • sorted (bool) – If True, return records in the same order as the ids (Default: True).

get_tags(entity_ids: List[str]) Dict[str, MultiMap[str, Any]]#

Get the tags of an entity (Execution, Job, CallNode, Task, Value).

get_value(value_hash: str) Tuple[Any, bool]#

Returns a Value from the datastore using the value content address (value_hash).

Parameters:

value_hash (str) – Hash of Value to fetch from ValueStore.

Returns:

result, is_cached – Returns the result value and is_cached=True if the value is in the ValueStore, otherwise returns (None, False).

Return type:

Tuple[Any, bool]

has_records(record_ids: Iterable[str]) List[str]#

Returns record_ids that exist in the database.

is_db_compatible() bool#

Returns True if database is compatible with library.

is_valid_handle(handle: Handle) bool#

A handle is valid if it is current or ancestral to the current handle.

iter_record_ids(root_ids: Iterable[str]) Iterator[str]#

Iterate the record ids of descendants of root_ids.

load(migrate: bool | None = None) None#

Load backend database.

For protection, only upgrades are allowed when automigrating. Downgrades could potentially drop data. Users should explicitly downgrade using something like redun db downgrade XXX.

Parameters:

migrate (Optional[bool]) – If None, defer to automigration config options. If True, perform migration after establishing database connection.

migrate(desired_version: DBVersionInfo | None = None, upgrade_only: bool = False) None#

Migrate database to desired version.

Parameters:
  • desired_version (Optional[DBVersionInfo]) – Desired version to update redun database to. If null, update to latest version.

  • upgrade_only (bool) – By default, this function will perform both upgrades and downgrades. Set this to true to prevent downgrades (such as during automigration).

put_records(records: Iterable[dict]) int#

Writes records to the database and returns number of new records written.

record_call_node(task_name: str, task_hash: str, args_hash: str, expr_args: Tuple[Tuple, dict], eval_args: Tuple[Tuple, dict], result_hash: str, child_call_hashes: List[str]) str#

Record a completed CallNode.

Parameters:
  • task_name (str) – Fullname (with namespace) of task.

  • task_hash (str) – Hash of task.

  • args_hash (str) – Hash of all arguments of the call.

  • expr_args (Tuple[Tuple, dict]) – Original expressions for the task arguments. These expressions are used to record the full upstream dataflow.

  • eval_args (Tuple[Tuple, dict]) – The fully evaluated arguments of the task arguments.

  • result_hash (str) – Hash of the result value of the call.

  • child_call_hashes (List[str]) – call_hashes of any child task calls.

Returns:

The call_hash of the new CallNode.

Return type:

str

record_call_node_context(call_hash: str, context_hash: str | None, context: dict) str#

Records a context dict for a CallNode as a Tag.

record_execution(exec_id: str, args: List[str]) None#

Records an Execution to the backend.

Parameters:
  • exec_id (str) – The id of the execution.

  • args (List[str]) – Arguments used on the command line to start Execution.

Returns:

UUID of new Execution.

Return type:

str

record_job_end(job: BaseJob, now: datetime | None = None, status: str | None = None) None#

Records the end of a Job.

Creates the Job if needed, in which case the Job will be recorded with start_time == end_time.

record_job_start(job: BaseJob, now: datetime | None = None) Job#

Records the start of a new Job.

record_tags(entity_type: TagEntity, entity_id: str, tags: Iterable[Tuple[str, Any]], parents: Iterable[str] = (), update: bool = False, new: bool = False) List[Tuple[str, str, str, Any]]#

Record tags for an entity (Execution, Job, CallNode, Task, Value).

Parameters:
  • entity_type (TagEntity) – The type of the tagged entity (Execution, Job, etc).

  • entity_id (str) – The id of the tagged entity.

  • tags (Iterable[KeyValue]) – An iterable of key-value pairs to create as tags.

  • parents (Iterable[str]) – Ids of tags to be superseded by the new tags.

  • update (bool) – If True, automatically supersede any existing tags with keys matching those in tags. This also implies new=True.

  • new (bool) – If True, force tags to be current.

Returns:

[(tag_hash, entity_id, key, value)] – Returns a list of the created tags.

Return type:

List[Tuple[str, str, str, Any]]

record_value(value: Any, data: bytes | None = None) str#

Record a Value into the backend.

Parameters:
  • value (Any) – A value to record.

  • data (Optional[bytes]) – Byte stream to record. If not given, usual value serialization is used.

Returns:

value_hash of recorded value.

Return type:

str

rollback_handle(handle: Handle) None#

Rollback all descendant handles.

set_eval_cache(eval_hash: str, task_hash: str, args_hash: str, value: Any, value_hash: str | None = None) None#

Sets a new value in the Evaluation cache.

Parameters:
  • eval_hash (str) – A hash of the combination of the task_hash and args_hash.

  • task_hash (str) – Hash of Task used in the call.

  • args_hash (str) – Hash of all arguments used in the call.

  • value (Any) – Value to record in cache.

  • value_hash (str) – Hash of value to record in cache.

update_tags(entity_type: TagEntity, entity_id: str, old_keys: Iterable[str], new_tags: Iterable[Tuple[str, Any]]) List[Tuple[str, str, str, Any]]#

Update tags.

exception redun.backends.db.RedunDatabaseError#

Bases: Exception

class redun.backends.db.RedunMigration(**kwargs)#

Bases: Base

Migration history of redun database (aka alembic table).

version_num#
class redun.backends.db.RedunSession(backend: RedunBackendDb, *args, **kwargs)#

Bases: Session

Sqlalchemy Session with a reference to the redun backend.

This is used to give Sqlalchemy models access to the redun backend API.

class redun.backends.db.RedunVersion(**kwargs)#

Bases: Base

Version of redun database.

id#
timestamp#
version#
exception redun.backends.db.RedunVersionError#

Bases: RedunDatabaseError

class redun.backends.db.Subvalue(**kwargs)#

Bases: Base

A Value that is a subvalue of another Value.

child#
parent#
parent_value_hash#
value_hash#
class redun.backends.db.Tag(**kwargs)#

Bases: Base

property base_key: str#
call_nodes#
child_edits#
children#
property entity: Execution | Job | CallNode | Task | Value#
entity_id#
entity_type#
executions#
static get_delete_tag() Tag#

Returns a delete tag, which can be used to mark parent tags deleted.

is_current#
jobs#
key#
property namespace: str#
parent_edits#
parents#
tag_hash#
tag_hash_idx = Index('ix_tag_tag_hash_current', Column('tag_hash', String(length=40), table=<tag>, primary_key=True, nullable=False), unique=True)#
tasks#
value#
property value_parsed: Any#

Returns parsed value referenced by Tag.

values#
class redun.backends.db.TagEdit(**kwargs)#

Bases: Base

child#
child_id#
parent#
parent_id#
class redun.backends.db.Task(**kwargs)#

Bases: Base

call_nodes#
evals#
property fullname: str#
hash#
hash_idx = Index('ix_task_hash_vpo', Column('hash', String(length=40), table=<task>, primary_key=True, nullable=False), unique=True)#
jobs#
name#
name_idx = Index('ix_task_name_vpo', Column('name', String(), table=<task>, nullable=False))#
namespace#
namespace_idx = Index('ix_task_namespace_vpo', Column('namespace', String(), table=<task>, nullable=False))#
show_source() None#
source#
tags#
value#
class redun.backends.db.Value(**kwargs)#

Bases: Base

A value used as input (Argument) or output (Result) from a Task.

arguments#
child_edges#
children#
evals#
file#
format#
handle#
property in_value_store: bool#

Returns True if value data is in a ValueStore.

parent_edges#
parents#
property preview: Any#

Returns a deserialized value, or a preview if there is an error or value is too large.

results#
subfiles#
subhandles#
tags#
task#
type#
value#
value_hash#
value_hash_idx = Index('ix_value_value_hash_vpo', Column('value_hash', String(length=40), table=<value>, primary_key=True, nullable=False), unique=True)#
property value_parsed: Any | None#

Returns a deserialized value, or a preview if there is an error.

redun.backends.db.get_abs_path(root_dir: str, path: str) str#

Get the absolute path of path if it is a local path.

redun.backends.db.get_call_node_child_edges(session: Session, ids: Iterable[str]) Iterable[Tuple[str, Base, str]]#
redun.backends.db.get_execution_child_edges(session: Session, ids: Iterable[str]) Iterable[Tuple[str, Base, str]]#
redun.backends.db.get_job_child_edges(session: Session, ids: Iterable[str]) Iterable[Tuple[str, Base, str]]#
redun.backends.db.get_tag_child_edges(session: Session, ids: Iterable[str]) Iterable[Tuple[str, Base, str]]#

Get parents and children of the tags with the supplied ids.

redun.backends.db.get_tag_entity_child_edges(session: Session, ids: Iterable[str]) Iterable[Tuple[str, Base, str]]#

Iterates through the child edges for Tags of a set of entity ids.

redun.backends.db.get_value_child_edges(session: Session, ids: Iterable[str]) Iterable[Tuple[str, Base, str]]#
redun.backends.db.parse_db_version(version_str: str) DBVersionInfo#

Parses a db version string such as “2.0” or “3” into a DBVersionInfo.
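A hedged usage sketch; field access follows the DBVersionInfo NamedTuple documented above:

```
from redun.backends.db import parse_db_version

version = parse_db_version("2.0")
print(version.major, version.minor)  # 2 0
```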