redun.backends.db package

Submodules

redun.backends.db.dataflow module

Dataflow visualization.

An upstream dataflow visualization explains the derivation of a value. Take for example this dataflow visualization of the derivation of a VCF file from a bioinformatic analysis:

```
value = File(path=sample4.vcf, hash=********)

value <-- <********> call_variants(bam, ref_genome)
  bam        = <********> File(path=sample4.bam, hash=********)
  ref_genome = <********> File(path=ref_genome, hash=********)

bam <-- argument of <********> call_variants_all(bams, ref_genome)
    <-- result of <********> align_reads_all(fastqs, ref_genome)
    <-- <********> align_reads(fastq, ref_genome)
  fastq      = <********> File(path=sample4.fastq, hash=********)
  ref_genome = <********> File(path=ref_genome, hash=********)

fastq <-- argument of <********> align_reads_all(fastqs, ref_genome)
      <-- argument of <********> main(fastqs, ref_genome)
      <-- origin

ref_genome <-- argument of <********> align_reads_all(fastqs, ref_genome)
           <-- argument of <********> main(fastqs, ref_genome)
           <-- origin
```

Hash values are abbreviated with `*` above. For reference, here is what the workflow might have been:

```
@task()
def align_reads(fastq: File, ref_genome: File) -> File:
    reads = cast(str, fastq.read())
    ref = cast(str, ref_genome.read())
    bam = File(fastq.path.replace("fastq", "bam"))
    bam.write("align({}, {})".format(reads, ref))
    return bam

@task()
def call_variants(bam: File, ref_genome: File) -> File:
    align = cast(str, bam.read())
    ref = cast(str, ref_genome.read())
    vcf = File(bam.path.replace("bam", "vcf"))
    vcf.write("calls({}, {})".format(align, ref))
    return vcf

@task()
def align_reads_all(fastqs: List[File], ref_genome: File):
    bams = [align_reads(fastq, ref_genome) for fastq in fastqs]
    return bams

@task()
def call_variants_all(bams: List[File], ref_genome: File):
    vcfs = [call_variants(bam, ref_genome) for bam in bams]
    return vcfs

@task()
def main(fastqs: List[File], ref_genome: File):
    bams = align_reads_all(fastqs, ref_genome)
    vcfs = call_variants_all(bams, ref_genome)
    return vcfs
```

A dataflow visualization consists of a series of paragraphs called “dataflow sections” that describe how one of the values is derived. Here is the section for the bam value:

```
bam <-- argument of <********> call_variants_all(bams, ref_genome)
    <-- result of <********> align_reads_all(fastqs, ref_genome)
    <-- <********> align_reads(fastq, ref_genome_2)
  fastq      = <********> File(path=sample4.fastq, hash=********)
  ref_genome = <********> File(path=ref_genome, hash=********)
```

A section is made of three clauses: assignment, routing, and arguments.

The assignment clause indicates which CallNode produced this value:

` bam <-- argument of <********> call_variants_all(bams, ref_genome) `

Routing clauses, if present, describe a series of additional CallNodes that "route" the value, either passing it as an argument from parent CallNode to child CallNode, or returning it as a result from child CallNode to parent CallNode.

```
    <-- result of <********> align_reads_all(fastqs, ref_genome)
    <-- <********> align_reads(fastq, ref_genome_2)
```

Argument clauses define the value for each argument in the final CallNode.

```
  fastq      = <********> File(path=sample4.fastq, hash=********)
  ref_genome = <********> File(path=ref_genome, hash=********)
```

To build this visualization, the following strategy is used:

  • Given a starting value (e.g. the VCF file in the example above), walk the CallGraph backwards (i.e. upstream) to determine the relevant nodes. These are called DataflowNodes, and they are connected by DataflowEdges.

  • DataflowEdges are then grouped into sections.

  • Each section is then reorganized into a DataflowSectionDOM. A DataflowDOM is the collection of DataflowSectionDOMs. The DOM (Document Object Model) is an intermediate representation that can be rendered in multiple ways.

  • Once a DataflowDOM is created, it can either be rendered into a textual format or serialized into JSON for the web frontend.
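The final rendering step can be illustrated with simplified stand-ins for the DOM structures (a minimal sketch only; the real DataflowSectionDOM, DataflowAssign, etc. below carry additional fields such as hashes and node references):

```python
from typing import Iterator, List, NamedTuple

# Hypothetical, simplified stand-ins mirroring the documented DOM shape.
class Assign(NamedTuple):
    var_name: str
    prefix: str
    node_display: str

class Routing(NamedTuple):
    prefix: str
    node_display: str

class Arg(NamedTuple):
    var_name: str
    value_display: str

class SectionDOM(NamedTuple):
    assign: Assign
    routing: List[Routing]
    args: List[Arg]

def render_section(dom: SectionDOM) -> Iterator[str]:
    # Assignment clause: "var <-- prefix node".
    yield f"{dom.assign.var_name} <-- {dom.assign.prefix} {dom.assign.node_display}"
    # Routing clauses continue the chain of CallNodes.
    for routing in dom.routing:
        yield f"    <-- {routing.prefix} {routing.node_display}"
    # Argument clauses bind each argument of the final CallNode.
    for arg in dom.args:
        yield f"  {arg.var_name} = {arg.value_display}"

dom = SectionDOM(
    assign=Assign("bam", "argument of", "call_variants_all(bams, ref_genome)"),
    routing=[Routing("result of", "align_reads_all(fastqs, ref_genome)")],
    args=[Arg("fastq", "File(path=sample4.fastq)")],
)
print("\n".join(render_section(dom)))
```

The same DOM could instead be walked to emit JSON, which is the idea behind having the DOM as an intermediate representation.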

class redun.backends.db.dataflow.ArgumentValue(argument: Argument, value: Value)

Bases: NamedTuple

A DataflowNode used for tracing one subvalue in an argument.

argument: Argument

Alias for field number 0

value: Value

Alias for field number 1

class redun.backends.db.dataflow.CallNodeValue(call_node: CallNode, value: Value)

Bases: NamedTuple

A DataflowNode used for tracing one subvalue of a CallNode result.

call_node: CallNode

Alias for field number 0

value: Value

Alias for field number 1

class redun.backends.db.dataflow.DataflowArg(var_name: str, value: Value)

Bases: NamedTuple

An argument clause in a Dataflow DOM.

value: Value

Alias for field number 1

var_name: str

Alias for field number 0

class redun.backends.db.dataflow.DataflowAssign(var_name: str, prefix: str, node_display: str, node: ArgumentValue | CallNodeValue | CallNode | Value | None)

Bases: NamedTuple

The assignment clause in a Dataflow DOM.

node: ArgumentValue | CallNodeValue | CallNode | Value | None

Alias for field number 3

node_display: str

Alias for field number 2

prefix: str

Alias for field number 1

var_name: str

Alias for field number 0

class redun.backends.db.dataflow.DataflowEdge(src: ArgumentValue | CallNodeValue | CallNode | Value, dest: ArgumentValue | CallNodeValue | CallNode | Value | None)

Bases: NamedTuple

An edge in a Dataflow graph.

dest: ArgumentValue | CallNodeValue | CallNode | Value | None

Alias for field number 1

src: ArgumentValue | CallNodeValue | CallNode | Value

Alias for field number 0

class redun.backends.db.dataflow.DataflowRouting(prefix: str, node_display: str, node: ArgumentValue | CallNodeValue | CallNode | Value | None)

Bases: NamedTuple

A routing clause in a Dataflow DOM.

node: ArgumentValue | CallNodeValue | CallNode | Value | None

Alias for field number 2

node_display: str

Alias for field number 1

prefix: str

Alias for field number 0

class redun.backends.db.dataflow.DataflowSectionDOM(assign: DataflowAssign, routing: List[DataflowRouting], args: List[DataflowArg])

Bases: NamedTuple

A section in Dataflow DOM.

args: List[DataflowArg]

Alias for field number 2

assign: DataflowAssign

Alias for field number 0

routing: List[DataflowRouting]

Alias for field number 1

class redun.backends.db.dataflow.DataflowSectionKind(value)

Bases: Enum

Each dataflow section describes either a task call or a data manipulation.

CALL = 'call'
DATA = 'data'
class redun.backends.db.dataflow.DataflowVars

Bases: object

Manages variable names for nodes in a dataflow.

get_task_args(task: Task) → List[str]

Returns the parameter names of a Task.

new_var_name(node: ArgumentValue | CallNodeValue | CallNode | Value, base_var_name: str | None = None) → Tuple[str, str | None]

Get or create a new variable name for a DataflowNode.

redun.backends.db.dataflow.display_call_node(call_node: CallNode, renames: Dict[str, str]) → str

Formats a CallNode to a string.

redun.backends.db.dataflow.display_dataflow(dom: Iterable[DataflowSectionDOM]) → Iterator[str]

Yields lines for displaying a dataflow DOM.

redun.backends.db.dataflow.display_hash(node: ArgumentValue | CallNodeValue | CallNode | Value | None) → str

Formats hash for a DataflowNode.

redun.backends.db.dataflow.display_node(node: ArgumentValue | CallNodeValue | CallNode | Value | None, renames: Dict[str, str]) → Tuple[str, str]

Formats a dataflow node to a string.

redun.backends.db.dataflow.display_section(dom: DataflowSectionDOM) → Iterator[str]

Yields lines for displaying a dataflow section DOM.

redun.backends.db.dataflow.display_value(value: Value) → str

Format a Value to a string.

redun.backends.db.dataflow.get_callnode_arguments(call_node: CallNode) → List[Argument]

Returns a CallNode’s arguments in sorted order.

redun.backends.db.dataflow.get_dataflow_call_node(node: ArgumentValue | CallNodeValue | CallNode | Value | None) → CallNode | None

Returns the CallNode for a DataflowNode.

redun.backends.db.dataflow.get_default_arg_name(pos: int) → str

Generate the default name for a positional argument.

redun.backends.db.dataflow.get_node_hash(node: ArgumentValue | CallNodeValue | CallNode | Value | None) → str | None

Formats hash for a DataflowNode.

redun.backends.db.dataflow.get_section_edge_type(edge: DataflowEdge) → str

Classifies a DataflowEdge.

redun.backends.db.dataflow.get_task_args(task: Task) → List[str]

Returns the list of argument names of a Task. Raises a SyntaxError if the task source code is not properly formatted.

redun.backends.db.dataflow.is_internal_task(task: Task) → bool

Returns True if task is an internal redun task.

We skip such tasks in the dataflow to avoid clutter.

redun.backends.db.dataflow.iter_dataflow_sections(dataflow_edges: Iterable[DataflowEdge]) → Iterator[Tuple[DataflowSectionKind, List[DataflowEdge]]]

Yields dataflow sections from an iterable of dataflow edges.

A dataflow section is a group of edges representing one ‘paragraph’ in a dataflow display.

```
value <-- <1234abcd> call_node(arg1, arg2)
      <-- result of <2345abcd> call_node2(arg3, arg4)
  arg3 = <3456abcd> 'hello_world'
  arg4 = <4567abcd> File('foo.txt')
```

redun.backends.db.dataflow.iter_subsections(section: List[DataflowEdge]) → Iterator[List[DataflowEdge]]

Determines whether a section should be broken down into smaller subsections.

In real-life dataflows, there are cases where the dataflow merges such that the structure is a DAG, not just a tree. These merges represent a value that was passed to two or more different tasks whose outputs eventually combine again, either into a single Value like a list, or as arguments into a common task. For example, value is a merge node in the upstream dataflow of result:

```
value = task0()
output1 = task1(a=value)
output2 = task2(b=value)
result = task3(c=output1, d=output2)
```

The upstream dataflow graph of result is:

```
Value(result)
      |
      V
CallNode(task3) ----------------.
      |                         |
      V                         V
ArgumentValue(task3, key=a)   ArgumentValue(task3, key=b)
      |                         |
      V                         V
CallNode(task1)               CallNode(task2)
      |                         |
      V                         V
ArgumentValue(task1, key=c)   ArgumentValue(task2, key=d)
      |                         |
      V                         |
CallNodeValue(task0, value) <---'
      |
      V
CallNode(task0)
      |
      V
    Origin
```

The function iter_dataflow_sections() will break this graph right after every CallNode --> ArgumentValue edge, resulting in three sections. One of those sections will have a "merge node", CallNode(task0):

```
ArgumentValue(task1, key=c)   ArgumentValue(task2, key=d)
      |                         |
      V                         |
CallNodeValue(task0, value) <---'
      |
      V
CallNode(task0)
      |
      V
    Origin
```

This function will further break this section into subsections like this:

Subsection 1:

```
ArgumentValue(task1, key=c)
      |
      V
CallNodeValue(task0, value)
```

Subsection 2:

```
ArgumentValue(task2, key=d)
      |
      V
CallNodeValue(task0, value)
```

Subsection 3:

```
CallNodeValue(task0, value)
      |
      V
CallNode(task0)
      |
      V
    Origin
```

Ultimately these three subsections get rendered as:

```
c <-- c_2

d <-- c_2

c_2 <-- task()
    <-- origin
```

redun.backends.db.dataflow.iter_unique(items: Iterable[T], key: Callable[[T], Any] = <function <lambda>>) → Iterator[T]

Iterate through unique items.

redun.backends.db.dataflow.make_data_section_dom(section: List[DataflowEdge], dataflow_vars: DataflowVars, new_varname: str = 'value') → DataflowSectionDOM

Returns a DOM for a data section.

A data section describes how one value was constructed from several subvalues. For example this dataflow section:

```
parent_value <-- derives from
  subvalue1 = File(path='file1')
  subvalue2 = File(path='file2')
```

corresponds to this kind of code:

```
subvalue1 = task1()
subvalue2 = task2()
parent_value = [subvalue1, subvalue2]
task3(parent_value)
```

redun.backends.db.dataflow.make_dataflow_dom(dataflow_edges: Iterable[DataflowEdge], new_varname: str = 'value') → Iterable[DataflowSectionDOM]

Yields dataflow section DOMs from an iterable of dataflow edges.

It also performs variable renaming to give every value a unique variable name.

redun.backends.db.dataflow.make_section_dom(section: List[DataflowEdge], dataflow_vars: DataflowVars, new_varname: str = 'value') → DataflowSectionDOM

Returns DOM for a dataflow section.

redun.backends.db.dataflow.make_var_name(var_name_base: str, name2var: Dict[str, ArgumentValue | CallNodeValue | CallNode | Value], suffix: int = 2) → str

Generate a new variable using a unique suffix (e.g. myvar_2).
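The naming scheme can be sketched as follows (an illustration of the behavior described above, not the actual implementation):

```python
from typing import Any, Dict

def make_var_name(var_name_base: str, name2var: Dict[str, Any], suffix: int = 2) -> str:
    # If the base name is unused, claim it as-is; otherwise append the
    # smallest numeric suffix that yields an unused name (myvar_2, myvar_3, ...).
    if var_name_base not in name2var:
        return var_name_base
    while f"{var_name_base}_{suffix}" in name2var:
        suffix += 1
    return f"{var_name_base}_{suffix}"

name2var = {"ref_genome": object()}
print(make_var_name("fastq", name2var))       # fastq
print(make_var_name("ref_genome", name2var))  # ref_genome_2
```

This is how the rendered dataflow above ends up with variables like `ref_genome_2` when the same base name appears more than once.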

redun.backends.db.dataflow.rewrite_call_node_merges(edges: List[DataflowEdge]) → List[DataflowEdge]

Rewrites dataflow graphs to enforce one CallNodeValue per CallNode.

This function identifies CallNodes that have multiple parent CallNodeValues, like this:

```
ArgumentValue --> CallNodeValue --.
                                  V
                       CallNode (merge_node)
                                  ^
ArgumentValue --> CallNodeValue --'
```

and rewrites them to unify the CallNodeValues, like this:

```
ArgumentValue --.
                V
          CallNodeValue --> CallNode
                ^
ArgumentValue --'
```

redun.backends.db.dataflow.serialize_dataflow(dom: Iterable[DataflowSectionDOM]) → Iterator[dict]

Serialize DataflowDOM to JSON.

redun.backends.db.dataflow.serialize_section(dom: DataflowSectionDOM) → dict

Serialize a DataflowSectionDOM to JSON.

redun.backends.db.dataflow.toposort_edges(edges: Iterable[DataflowEdge]) → Iterator[DataflowEdge]

Topologically sort DataflowEdges in depth-first order.

redun.backends.db.dataflow.walk_dataflow(backend: RedunBackendDb, init_node: ArgumentValue | CallNodeValue | CallNode | Value) → Iterator[DataflowEdge]

Iterate through all the upstream dataflow edges of a ‘node’ in the CallGraph.

A ‘node’ can be a Value, CallNode, CallNodeValue, or an ArgumentValue.

redun.backends.db.dataflow.walk_dataflow_argument_value(backend: RedunBackendDb, argument_value: ArgumentValue) → Iterator[DataflowEdge]

Iterates through the upstream dataflow edges of an ArgumentValue.

Edge types:

  • ArgumentValue <-- CallNodeValue: the value came from the result of a CallNode.

  • ArgumentValue <-- ArgumentValue: the value came from an argument of the parent CallNode, i.e. argument-to-argument routing.

  • ArgumentValue <-- Origin: the value came directly from the user or a task.

redun.backends.db.dataflow.walk_dataflow_callnode(backend: RedunBackendDb, call_node: CallNode) → Iterator[DataflowEdge]

Iterates through the upstream Arguments of a CallNode.

redun.backends.db.dataflow.walk_dataflow_callnode_value(backend: RedunBackendDb, call_node_value: CallNodeValue) → Iterator[DataflowEdge]

Iterates through the upstream dataflow edges of a CallNodeValue.

The edges either go deeper to the child CallNodes or stay with this CallNode.

redun.backends.db.dataflow.walk_dataflow_node(backend: RedunBackendDb, node: ArgumentValue | CallNodeValue | CallNode | Value) → Iterator[DataflowEdge]

Iterates through the upstream dataflow edges of any DataflowNode.

redun.backends.db.dataflow.walk_dataflow_value(backend: RedunBackendDb, value: Value) → Iterator[DataflowEdge]

Iterates through the edges in the upstream dataflow graph of a Value.

redun.backends.db.query module

class redun.backends.db.query.CallGraphQuery(session: Session, joins: Set[str] | None = None, execution_joins: List[Callable[[Query], Query]] | None = None, filters: List | None = None, order_by: str | None = None, filter_types: Set | None = None, executions: Query | None = None, jobs: Query | None = None, call_nodes: Query | None = None, tasks: Query | None = None, values: Query | None = None, value_subqueries: List[Query] | None = None)

Bases: object

Query class for efficiently querying across all models in a CallGraph.

ExecTag = <AliasedClass at 0x7f730b44b850; Tag>
MODEL_CLASSES = {'CallNode': <class 'redun.backends.db.CallNode'>, 'Execution': <class 'redun.backends.db.Execution'>, 'Job': <class 'redun.backends.db.Job'>, 'Task': <class 'redun.backends.db.Task'>, 'Value': <class 'redun.backends.db.Value'>}
MODEL_NAMES = ['Execution', 'Job', 'CallNode', 'Task', 'Value']
MODEL_PKS = {<class 'redun.backends.db.CallNode'>: 'call_hash', <class 'redun.backends.db.Execution'>: 'id', <class 'redun.backends.db.Job'>: 'id', <class 'redun.backends.db.Task'>: 'hash', <class 'redun.backends.db.Value'>: 'value_hash'}
all() → Iterator[Base]

Yields all records matching query.

build()

Apply all joins and filters to subqueries.

clone(**kwargs: Any) → CallGraphQuery

Returns a clone of the query with updates specified by kwargs.

count() → Iterator[Tuple[str, int]]

Returns counts for each record type.

empty() → CallGraphQuery

Returns an empty query.

filter_arguments(value_hashes: List[str]) → CallGraphQuery

Filter jobs by argument values.

filter_execution_ids(execution_ids: Iterable[str]) → CallGraphQuery

Filter query by Execution ids.

filter_execution_statuses(execution_statuses: Iterable[str]) → CallGraphQuery

Filter by Execution status.

filter_execution_tags(tags: Iterable[Tuple[str, Any]]) → CallGraphQuery

Filter by tag on executions.

filter_file_paths(paths: Iterable[str]) → CallGraphQuery

Filter by File.path patterns.

paths can contain “*” to perform wildcard matching.
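The wildcard matching can be understood as a translation from shell-style `*` patterns to SQL `LIKE` patterns. A minimal sketch of that translation (an illustration only; the actual query construction in redun may differ):

```python
def path_pattern_to_like(pattern: str) -> str:
    # Escape SQL LIKE metacharacters that may appear literally in paths,
    # then map the shell-style "*" wildcard to SQL's "%".
    escaped = pattern.replace("%", r"\%").replace("_", r"\_")
    return escaped.replace("*", "%")

print(path_pattern_to_like("s3://bucket/*.vcf"))  # s3://bucket/%.vcf
```

A pattern without `*` simply becomes an exact match (modulo escaping), while `*.vcf` matches any path ending in `.vcf`.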

filter_ids(_ids: Iterable[str]) → CallGraphQuery

Filter query by record ids.

filter_job_statuses(job_statuses: Iterable[str]) → CallGraphQuery

Filter by Job status.

filter_results(value_hashes: List[str]) → CallGraphQuery

Filter jobs by result values.

filter_tags(tags: Iterable[Tuple[str, Any]]) → CallGraphQuery

Filter by tags.

filter_task_hashes(task_hashes: Iterable[str]) → CallGraphQuery

Filter by Task hashes.

filter_task_names(task_names: Iterable[str]) → CallGraphQuery

Filter by Task names.

filter_types(types: Iterable[str]) → CallGraphQuery

Filter query by record type.

filter_value_types(value_types: Iterable[str]) → CallGraphQuery

Filter query by Value types.

first() → Base | None

Returns first record if it exists.

like_id(id: str) → CallGraphQuery

Filter query by record id prefix.

limit(size) → Iterator[Base]

Yields at most size records from query.

one() → Base

Returns exactly one record. Raises error if too few or too many.

one_or_none() → Base

Returns exactly one record. Returns None if too few or too many.

order_by(order_by: str) → CallGraphQuery

Order query.

order_by: str

The only supported value is ‘time’.

page(page: int, page_size: int) → Iterator[Base]

Yields a page-worth of results from query.

page is zero-indexed.
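Zero-indexed pagination maps a page number to an offset/limit window. A small sketch of that arithmetic (illustrative only, independent of redun's actual SQL generation):

```python
from typing import Tuple

def page_slice(page: int, page_size: int) -> Tuple[int, int]:
    # page is zero-indexed: page 0 covers offsets 0..page_size-1,
    # page 1 covers page_size..2*page_size-1, and so on.
    offset = page * page_size
    return offset, offset + page_size

records = list(range(25))
start, end = page_slice(2, 10)
print(records[start:end])  # [20, 21, 22, 23, 24]
```

The last page may be shorter than `page_size`, as the slice above shows.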

select(*columns: Iterable[str], flat: bool = False) → Iterator[Any]

Select columns to return in query.

property subqueries: Iterable[Tuple[str, Query]]

Iterates through all subqueries.

Yields:

(type_name, subquery) (Tuple[str, Query])

redun.backends.db.query.find_file(session: Session, path: str) → Tuple[File, Job, str] | None

Find a File by its path.

  • Prefer an instance of File that is the result of a Task. Among results, prefer the most recent.

  • Otherwise, search for the File as an argument to a Task. Among arguments, prefer the most recent.

  • For both results and arguments, also search whether the File was a Subvalue (e.g. a value within a list, dict, etc.).

  • We prefer the most recent instance, since it has the best chance of still being valid.

redun.backends.db.query.infer_id(session: Session, id: str, include_files: bool = True, required: bool = False) → Any

Try to infer the record based on an id prefix.

redun.backends.db.query.infer_specialty_id(session: Session, id: str, include_files: bool = True) → Any

Try to infer the record based on a specialty id (e.g. file paths, `-` shorthand).

redun.backends.db.query.parse_callgraph_query(query: CallGraphQuery, args: Namespace) → CallGraphQuery

Parse an argparse.Namespace into a CallGraphQuery.

redun.backends.db.query.setup_query_parser(parser: ArgumentParser) → ArgumentParser

Add actions to an ArgumentParser to support querying a CallGraph.

redun.backends.db.serializers module

class redun.backends.db.serializers.CallNodeSerializer(db_version: DBVersionInfo | None = None)

Bases: Serializer

deserialize(spec: dict) → List[Base]
pk_field: str = 'call_hash'
serialize(call_node: Base) → dict
serialize_query(query: Query) → Iterator[dict]
class redun.backends.db.serializers.ExecutionSerializer(db_version: DBVersionInfo | None = None)

Bases: Serializer

deserialize(spec: dict) → List[Base]
pk_field: str = 'id'
serialize(execution: Base) → dict
class redun.backends.db.serializers.JobSerializer(db_version: DBVersionInfo | None = None)

Bases: Serializer

deserialize(spec: dict) → List[Base]
pk_field: str = 'id'
serialize(job: Base) → dict
serialize_query(query: Query) → Iterator[dict]
class redun.backends.db.serializers.RecordSerializer(db_version: DBVersionInfo | None = None)

Bases: Serializer

deserialize(spec: dict) → List[Base]
get_pk(spec: dict) → str
get_subserializer(type: str) → Serializer
serialize(obj: Base) → dict
serialize_query(query: Query) → Iterator[dict]
class redun.backends.db.serializers.Serializer(db_version: DBVersionInfo | None = None)

Bases: ABC

Base class serializer for RedunBackendDb records.

deserialize(spec: dict) → List[Base]
get_pk(spec: dict) → str
pk_field: str = ''
serialize(row: Base) → dict
serialize_query(query: Query) → Iterator[dict]
class redun.backends.db.serializers.TagSerializer(db_version: DBVersionInfo | None = None)

Bases: Serializer

deserialize(spec: dict) → List[Base]
pk_field: str = 'tag_hash'
serialize(tag: Base) → dict
serialize_query(query: Query) → Iterator[dict]
class redun.backends.db.serializers.ValueSerializer(db_version: DBVersionInfo | None = None)

Bases: Serializer

deserialize(spec: dict) → List[Base]
pk_field: str = 'value_hash'
serialize(value: Base) → dict
serialize_query(query: Query) → Iterator[dict]
redun.backends.db.serializers.deserialize_timestamp(date_str: str | None) → datetime | None

Deserialize a timestamp string to a datetime value.

redun.backends.db.serializers.serialize_timestamp(timestamp: datetime | None) → str | None

Serialize a datetime value to a timestamp string.
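A None-safe round-trip between these two functions can be sketched as follows (the exact timestamp format redun uses is an assumption here; ISO 8601 is shown for illustration):

```python
from datetime import datetime, timezone
from typing import Optional

def serialize_timestamp(timestamp: Optional[datetime]) -> Optional[str]:
    # None passes through unchanged; otherwise emit an ISO 8601 string.
    return timestamp.isoformat() if timestamp is not None else None

def deserialize_timestamp(date_str: Optional[str]) -> Optional[datetime]:
    # Inverse of serialize_timestamp, also None-safe.
    return datetime.fromisoformat(date_str) if date_str is not None else None

ts = datetime(2023, 5, 1, 12, 30, tzinfo=timezone.utc)
print(serialize_timestamp(ts))  # 2023-05-01T12:30:00+00:00
assert deserialize_timestamp(serialize_timestamp(ts)) == ts
assert serialize_timestamp(None) is None
```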

Module contents

class redun.backends.db.Argument(**kwargs)

Bases: Base

Input value for a called Task.

arg_hash
arg_key
arg_position
arg_results
call_hash
call_node
upstream
value
value_hash
property value_parsed: Any | None
class redun.backends.db.ArgumentResult(**kwargs)

Bases: Base

Many-to-many relationship between results and Arguments.

arg
arg_hash
result_call_hash
result_call_node
class redun.backends.db.CallEdge(**kwargs)

Bases: Base

An edge in the CallGraph.

This is a many-to-many table for CallNode.

call_order
child_id
child_node
parent_id
parent_node
class redun.backends.db.CallNode(**kwargs)

Bases: Base

A CallNode in the CallGraph.

arg_results
property args_display: str
args_hash
arguments
call_hash
call_hash_idx = Index('ix_call_node_call_hash_vpo', Column('call_hash', String(length=40), table=<call_node>, primary_key=True, nullable=False), unique=True)
child_edges
property children: List[CallNode]
downstream
jobs
parent_edges
property parents: List[CallNode]
tags
task
task_hash
task_name
task_set
timestamp
value
value_hash
property value_parsed: Any | None
class redun.backends.db.CallSubtreeTask(**kwargs)

Bases: Base

call_hash
call_node
task_hash
class redun.backends.db.Column(*args, **kwargs)

Bases: Column

Use non-null Columns by default.

inherit_cache: bool | None = True

Indicate if this HasCacheKey instance should make use of the cache key generation scheme used by its immediate superclass.

The attribute defaults to None, which indicates that a construct has not yet taken into account whether or not it's appropriate for it to participate in caching; this is functionally equivalent to setting the value to False, except that a warning is also emitted.

This flag can be set to True on a particular class, if the SQL that corresponds to the object does not change based on attributes which are local to this class, and not its superclass.

See also

compilerext_caching - General guidelines for setting the HasCacheKey.inherit_cache attribute for third-party or user defined SQL constructs.

class redun.backends.db.DBVersionInfo(migration_id: str, major: int, minor: int, description: str)

Bases: NamedTuple

A redun repo database version and migration id.

description: str

Alias for field number 3

major: int

Alias for field number 1

migration_id: str

Alias for field number 0

minor: int

Alias for field number 2

class redun.backends.db.DateTimeUTC(*args: Any, **kwargs: Any)

Bases: TypeDecorator

cache_ok: bool | None = True

Indicate if statements using this ExternalType are “safe to cache”.

The default value None will emit a warning and then not allow caching of a statement which includes this type. Set to False to disable statements using this type from being cached at all without a warning. When set to True, the object’s class and selected elements from its state will be used as part of the cache key. For example, using a TypeDecorator:

class MyType(TypeDecorator):
    impl = String

    cache_ok = True

    def __init__(self, choices):
        self.choices = tuple(choices)
        self.internal_only = True

The cache key for the above type would be equivalent to:

>>> MyType(["a", "b", "c"])._static_cache_key
(<class '__main__.MyType'>, ('choices', ('a', 'b', 'c')))

The caching scheme will extract attributes from the type that correspond to the names of parameters in the __init__() method. Above, the “choices” attribute becomes part of the cache key but “internal_only” does not, because there is no parameter named “internal_only”.

The requirements for cacheable elements are that they are hashable and also that they indicate the same SQL rendered for expressions using this type every time for a given cache value.

To accommodate for datatypes that refer to unhashable structures such as dictionaries, sets and lists, these objects can be made “cacheable” by assigning hashable structures to the attributes whose names correspond with the names of the arguments. For example, a datatype which accepts a dictionary of lookup values may publish this as a sorted series of tuples. Given a previously un-cacheable type as:

class LookupType(UserDefinedType):
    '''a custom type that accepts a dictionary as a parameter.

    this is the non-cacheable version, as "self.lookup" is not
    hashable.

    '''

    def __init__(self, lookup):
        self.lookup = lookup

    def get_col_spec(self, **kw):
        return "VARCHAR(255)"

    def bind_processor(self, dialect):
        # ...  works with "self.lookup" ...

Where “lookup” is a dictionary. The type will not be able to generate a cache key:

>>> type_ = LookupType({"a": 10, "b": 20})
>>> type_._static_cache_key
<stdin>:1: SAWarning: UserDefinedType LookupType({'a': 10, 'b': 20}) will not
produce a cache key because the ``cache_ok`` flag is not set to True.
Set this flag to True if this type object's state is safe to use
in a cache key, or False to disable this warning.
symbol('no_cache')

If we did set up such a cache key, it wouldn’t be usable. We would get a tuple structure that contains a dictionary inside of it, which cannot itself be used as a key in a “cache dictionary” such as SQLAlchemy’s statement cache, since Python dictionaries aren’t hashable:

>>> # set cache_ok = True
>>> type_.cache_ok = True

>>> # this is the cache key it would generate
>>> key = type_._static_cache_key
>>> key
(<class '__main__.LookupType'>, ('lookup', {'a': 10, 'b': 20}))

>>> # however this key is not hashable, will fail when used with
>>> # SQLAlchemy statement cache
>>> some_cache = {key: "some sql value"}
Traceback (most recent call last): File "<stdin>", line 1,
in <module> TypeError: unhashable type: 'dict'

The type may be made cacheable by assigning a sorted tuple of tuples to the “.lookup” attribute:

class LookupType(UserDefinedType):
    '''a custom type that accepts a dictionary as a parameter.

    The dictionary is stored both as itself in a private variable,
    and published in a public variable as a sorted tuple of tuples,
    which is hashable and will also return the same value for any
    two equivalent dictionaries.  Note it assumes the keys and
    values of the dictionary are themselves hashable.

    '''

    cache_ok = True

    def __init__(self, lookup):
        self._lookup = lookup

        # assume keys/values of "lookup" are hashable; otherwise
        # they would also need to be converted in some way here
        self.lookup = tuple(
            (key, lookup[key]) for key in sorted(lookup)
        )

    def get_col_spec(self, **kw):
        return "VARCHAR(255)"

    def bind_processor(self, dialect):
        # ...  works with "self._lookup" ...

Where above, the cache key for LookupType({"a": 10, "b": 20}) will be:

>>> LookupType({"a": 10, "b": 20})._static_cache_key
(<class '__main__.LookupType'>, ('lookup', (('a', 10), ('b', 20))))

Added in version 1.4.14: - added the cache_ok flag to allow some configurability of caching for TypeDecorator classes.

Added in version 1.4.28: - added the ExternalType mixin which generalizes the cache_ok flag to both the TypeDecorator and UserDefinedType classes.

See also

sql_caching

impl: TypeEngine[Any] | Type[TypeEngine[Any]] = DateTime(timezone=True)
process_bind_param(value: Any, dialect)

Receive a bound parameter value to be converted.

Custom subclasses of _types.TypeDecorator should override this method to provide custom behaviors for incoming data values. This method is called at statement execution time and is passed the literal Python data value which is to be associated with a bound parameter in the statement.

The operation could be anything desired to perform custom behavior, such as transforming or serializing data. This could also be used as a hook for validating logic.

Parameters:
  • value – Data to operate upon, of any type expected by this method in the subclass. Can be None.

  • dialect – the Dialect in use.

See also

types_typedecorator

_types.TypeDecorator.process_result_value()

process_result_value(value: Any, dialect)

Receive a result-row column value to be converted.

Custom subclasses of _types.TypeDecorator should override this method to provide custom behaviors for data values being received in result rows coming from the database. This method is called at result fetching time and is passed the literal Python data value that’s extracted from a database result row.

The operation could be anything desired to perform custom behavior, such as transforming or deserializing data.

Parameters:
  • value – Data to operate upon, of any type expected by this method in the subclass. Can be None.

  • dialect – the Dialect in use.

See also

types_typedecorator

_types.TypeDecorator.process_bind_param()

class redun.backends.db.Evaluation(**kwargs)

Bases: Base

Cache table for evaluations.

eval_hash (hash of task_hash and args_hash) is the cache key, and value_hash is the cached value.

args_hash
eval_hash
task
task_hash
value
value_hash
property value_parsed: Any | None
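The cache-key structure described above (eval_hash derived from task_hash and args_hash) can be sketched like this. The exact hashing scheme is an assumption for illustration; redun's real implementation may combine and hash these fields differently:

```python
import hashlib

def make_eval_hash(task_hash: str, args_hash: str) -> str:
    # Hypothetical illustration: derive a single cache key from the
    # task hash and the argument hash, so a cache hit requires both
    # the same task version and the same arguments.
    return hashlib.sha1(f"{task_hash}:{args_hash}".encode()).hexdigest()

key1 = make_eval_hash("aaaa", "bbbb")
key2 = make_eval_hash("aaaa", "cccc")
print(key1 != key2)  # True: different args produce a different cache key
```

Looking up `eval_hash` then yields `value_hash`, the cached result.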
class redun.backends.db.Execution(*args, **kwargs)

Bases: Base

args
calc_status(result_type: str | None) → str

Compute status from the result_type of the root Job.

property call_node: CallNode | None
id
id_idx = Index('ix_execution_id_vpo', Column('id', String(), table=<execution>, primary_key=True, nullable=False), unique=True)
job
job_id
jobs
property status: str

Returns the computed status of the Execution.

property status_display: str

Return status suitable for display in tables.

tags
property task: Task | None
updated_time
class redun.backends.db.File(**kwargs)

Bases: Base

A File used as a Value by a Task.

parent_values
path
value
value_hash
property values: List[Value]
class redun.backends.db.Handle(**kwargs)

Bases: Base

children
fullname
hash
is_valid
key
parent_values
parents
value
value_hash
property values: List[Value]
class redun.backends.db.HandleEdge(**kwargs)

Bases: Base

child_id
parent_id
class redun.backends.db.JSON(*args: Any, **kwargs: Any)

Bases: TypeDecorator

This custom column type allows use of JSON across both sqlite and postgres.

In postgres, the column acts like a JSONB column. In sqlite, it acts like a string with normalization (e.g. sorted keys, etc.) applied to the JSON serialization. This allows us to do exact-match indexing across both sqlite and postgres.
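The normalization idea can be shown with the standard library: sorting keys and fixing separators makes the serialization canonical, so equal values always produce identical strings that can be compared or indexed exactly (a sketch of the principle, not redun's exact normalization):

```python
import json
from typing import Any

def normalize_json(value: Any) -> str:
    # Canonical serialization: sorted keys and fixed separators mean
    # two equal dicts serialize to byte-identical strings.
    return json.dumps(value, sort_keys=True, separators=(",", ":"))

a = normalize_json({"b": 1, "a": [1, 2]})
b = normalize_json({"a": [1, 2], "b": 1})
print(a)  # {"a":[1,2],"b":1}
assert a == b
```

Without normalization, `{"b": 1, "a": 1}` and `{"a": 1, "b": 1}` would serialize differently and an exact-match index on the string column would miss them.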

cache_ok: bool | None = True

Indicate if statements using this ExternalType are “safe to cache”.

The default value None will emit a warning and then not allow caching of a statement which includes this type. Set to False to disable statements using this type from being cached at all without a warning. When set to True, the object’s class and selected elements from its state will be used as part of the cache key. For example, using a TypeDecorator:

class MyType(TypeDecorator):
    impl = String

    cache_ok = True

    def __init__(self, choices):
        self.choices = tuple(choices)
        self.internal_only = True

The cache key for the above type would be equivalent to:

>>> MyType(["a", "b", "c"])._static_cache_key
(<class '__main__.MyType'>, ('choices', ('a', 'b', 'c')))

The caching scheme will extract attributes from the type that correspond to the names of parameters in the __init__() method. Above, the “choices” attribute becomes part of the cache key but “internal_only” does not, because there is no parameter named “internal_only”.

The requirement for cacheable elements is that they are hashable and that they indicate the same SQL rendered for expressions using this type every time for a given cache value.

To accommodate datatypes that refer to unhashable structures such as dictionaries, sets, and lists, these objects can be made “cacheable” by assigning hashable structures to the attributes whose names correspond with the names of the arguments. For example, a datatype which accepts a dictionary of lookup values may publish this as a sorted series of tuples. Given a previously un-cacheable type as:

class LookupType(UserDefinedType):
    '''a custom type that accepts a dictionary as a parameter.

    this is the non-cacheable version, as "self.lookup" is not
    hashable.

    '''

    def __init__(self, lookup):
        self.lookup = lookup

    def get_col_spec(self, **kw):
        return "VARCHAR(255)"

    def bind_processor(self, dialect):
        # ...  works with "self.lookup" ...

Where “lookup” is a dictionary. The type will not be able to generate a cache key:

>>> type_ = LookupType({"a": 10, "b": 20})
>>> type_._static_cache_key
<stdin>:1: SAWarning: UserDefinedType LookupType({'a': 10, 'b': 20}) will not
produce a cache key because the ``cache_ok`` flag is not set to True.
Set this flag to True if this type object's state is safe to use
in a cache key, or False to disable this warning.
symbol('no_cache')

If we did set up such a cache key, it wouldn’t be usable. We would get a tuple structure that contains a dictionary inside of it, which cannot itself be used as a key in a “cache dictionary” such as SQLAlchemy’s statement cache, since Python dictionaries aren’t hashable:

>>> # set cache_ok = True
>>> type_.cache_ok = True

>>> # this is the cache key it would generate
>>> key = type_._static_cache_key
>>> key
(<class '__main__.LookupType'>, ('lookup', {'a': 10, 'b': 20}))

>>> # however this key is not hashable, will fail when used with
>>> # SQLAlchemy statement cache
>>> some_cache = {key: "some sql value"}
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
TypeError: unhashable type: 'dict'

The type may be made cacheable by assigning a sorted tuple of tuples to the “.lookup” attribute:

class LookupType(UserDefinedType):
    '''a custom type that accepts a dictionary as a parameter.

    The dictionary is stored both as itself in a private variable,
    and published in a public variable as a sorted tuple of tuples,
    which is hashable and will also return the same value for any
    two equivalent dictionaries.  Note it assumes the keys and
    values of the dictionary are themselves hashable.

    '''

    cache_ok = True

    def __init__(self, lookup):
        self._lookup = lookup

        # assume keys/values of "lookup" are hashable; otherwise
        # they would also need to be converted in some way here
        self.lookup = tuple(
            (key, lookup[key]) for key in sorted(lookup)
        )

    def get_col_spec(self, **kw):
        return "VARCHAR(255)"

    def bind_processor(self, dialect):
        # ...  works with "self._lookup" ...

Where above, the cache key for LookupType({"a": 10, "b": 20}) will be:

>>> LookupType({"a": 10, "b": 20})._static_cache_key
(<class '__main__.LookupType'>, ('lookup', (('a', 10), ('b', 20))))

Added in version 1.4.14: added the cache_ok flag to allow some configurability of caching for TypeDecorator classes.

Added in version 1.4.28: added the ExternalType mixin, which generalizes the cache_ok flag to both the TypeDecorator and UserDefinedType classes.

See also

sql_caching

impl

alias of String

load_dialect_impl(dialect)

Return a TypeEngine object corresponding to a dialect.

This is an end-user override hook that can be used to provide differing types depending on the given dialect. It is used by the TypeDecorator implementation of type_engine() to help determine what type should ultimately be returned for a given TypeDecorator.

By default returns self.impl.

process_bind_param(value: Any, dialect)

Receive a bound parameter value to be converted.

Custom subclasses of TypeDecorator should override this method to provide custom behaviors for incoming data values. This method is called at statement execution time and is passed the literal Python data value which is to be associated with a bound parameter in the statement.

The operation could be anything desired to perform custom behavior, such as transforming or serializing data. This could also be used as a hook for validating logic.

Parameters:
  • value – Data to operate upon, of any type expected by this method in the subclass. Can be None.

  • dialect – the Dialect in use.

See also

types_typedecorator

TypeDecorator.process_result_value()

process_result_value(value: Any, dialect)

Receive a result-row column value to be converted.

Custom subclasses of TypeDecorator should override this method to provide custom behaviors for data values being received in result rows coming from the database. This method is called at result fetching time and is passed the literal Python data value that’s extracted from a database result row.

The operation could be anything desired to perform custom behavior, such as transforming or deserializing data.

Parameters:
  • value – Data to operate upon, of any type expected by this method in the subclass. Can be None.

  • dialect – the Dialect in use.

See also

types_typedecorator

TypeDecorator.process_bind_param()

class redun.backends.db.Job(**kwargs)

Bases: Base

cached
calc_status(result_type: str | None) str

Calculate Job status from result type.

call_hash
call_node
child_jobs
property context: dict

Returns the associated context.

property duration: timedelta | None

Returns duration of the Job or None if Job end_time is not recorded.

end_time
execution
execution_id
id
id_idx = Index('ix_job_id_vpo', Column('id', String(), table=<job>, primary_key=True, nullable=False), unique=True)
parent_id
parent_job
start_time
property status: str

Returns Job status (RUNNING, FAILED, CACHED, DONE).
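
How a result_type might map onto these four statuses can be sketched as follows; the rules and the "error" sentinel here are illustrative assumptions, not redun's actual logic:

```python
from typing import Optional

def calc_status(result_type: Optional[str], cached: bool) -> str:
    # Purely illustrative mapping: a job with no recorded result yet is
    # RUNNING, an error result is FAILED, and a completed job is CACHED
    # or DONE depending on whether its result came from the cache.
    if result_type is None:
        return "RUNNING"
    if result_type == "error":
        return "FAILED"
    return "CACHED" if cached else "DONE"
```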

property status_display: str

Return status suitable for display in tables.

tags
task
task_hash
class redun.backends.db.PreviewValue(value: Value, size: int)

Bases: object

A preview value if the value is too large or if there is an error.

class redun.backends.db.RedunBackendDb(db_uri: str | None = None, config: dict | SectionProxy | Config | None = None, logger: Any | None = None, *args: Any, **kwargs: Any)

Bases: RedunBackend

A database-based Backend for managing the CallGraph (provenance) and cache for a Scheduler.

This backend makes use of SQLAlchemy to provide support for both sqlite and postgresql databases.

advance_handle(parent_handles: List[Handle], child_handle: Handle) None

Record parent-child relationships between Handles.

check_cache(task_hash: str, args_hash: str, eval_hash: str, execution_id: str, scheduler_task_hashes: Set[str], cache_scope: CacheScope, check_valid: CacheCheckValid, context_hash: str | None = None, allowed_cache_results: Set[CacheResult] | None = None) Tuple[Any, str | None, CacheResult]

See parent method.

clone(session: Session | None = None)

Return a copy of the backend that shares the instantiated database engine.

create_engine() Any

Initializes a database connection.

delete_tags(entity_id: str, tags: Iterable[Tuple[str, Any]], keys: Iterable[str] = ()) List[Tuple[str, str, str, Any]]

Delete tags.

explain_cache_miss(task: Task, args_hash: str) Dict[str, Any] | None

Determine the reason for a cache miss.

static get_all_db_versions() List[DBVersionInfo]

Returns list of all DB versions and their migration ids.

get_call_cache(call_hash: str) Tuple[Any, bool]

Returns the result of a previously recorded CallNode.

Parameters:

call_hash (str) – Hash of a CallNode.

Returns:

Recorded final result of a CallNode.

Return type:

Any

get_child_record_ids(model_ids: Iterable[Tuple[Base, str]]) Iterable[Tuple[str, Base, str]]

Iterates through record’s ownership edges.

Used for walking record graph for syncing.

get_db_version() DBVersionInfo

Returns the current DB version (major, minor).

static get_db_version_required() Tuple[DBVersionInfo, DBVersionInfo]

Returns the DB version range required by this library.

get_eval_cache(eval_hash: str) Tuple[Any, bool]

Checks the Evaluation cache for a cached result.

Parameters:

eval_hash (str) – Hash of the task and arguments used for call.

Returns:

(result, is_cached) – result is the cached result, or None if no result was found. is_cached is True on a cache hit, otherwise False.

Return type:

Tuple[Any, bool]
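
The (result, is_cached) convention lets callers distinguish a cached value of None from a cache miss. A minimal dict-backed sketch of the pattern (not redun's implementation):

```python
from typing import Any, Dict, Tuple

def get_eval_cache(cache: Dict[str, Any], eval_hash: str) -> Tuple[Any, bool]:
    # Return (result, is_cached). The boolean flag is what lets callers
    # tell a legitimately cached None apart from a missing entry.
    if eval_hash in cache:
        return cache[eval_hash], True
    return None, False

cache = {"h1": None}  # a task legitimately returned None
```

Here `get_eval_cache(cache, "h1")` yields `(None, True)` while the missing key `"h2"` yields `(None, False)`, so the flag, not the result, signals the hit.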

get_job(job_id: str) dict | None

Returns details for a Job.

get_records(ids: Iterable[str], sorted: bool = True) Iterable[dict]

Returns serialized records for the given ids.

Parameters:
  • ids (Iterable[str]) – Iterable of record ids to fetch serialized records.

  • sorted (bool) – If True, return records in the same order as the ids (Default: True).

get_tags(entity_ids: List[str]) Dict[str, MultiMap[str, Any]]

Get the tags of an entity (Execution, Job, CallNode, Task, Value).

get_value(value_hash: str) Tuple[Any, bool]

Returns a Value from the datastore using the value content address (value_hash).

Parameters:

value_hash (str) – Hash of Value to fetch from ValueStore.

Returns:

result, is_cached – Returns the result value and is_cached=True if the value is in the ValueStore, otherwise returns (None, False).

Return type:

Tuple[Any, bool]

has_records(record_ids: Iterable[str]) List[str]

Returns record_ids that exist in the database.

is_db_compatible() bool

Returns True if database is compatible with library.

is_valid_handle(handle: Handle) bool

A handle is valid if it is current or ancestral to the current handle.

iter_record_ids(root_ids: Iterable[str]) Iterator[str]

Iterate the record ids of descendants of root_ids.
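
Such a descendant walk is naturally a breadth-first traversal over the record ownership edges; the sketch below substitutes a plain adjacency mapping for redun's database queries:

```python
from collections import deque
from typing import Dict, Iterable, Iterator, List

def iter_record_ids(edges: Dict[str, List[str]],
                    root_ids: Iterable[str]) -> Iterator[str]:
    # Breadth-first walk of the record graph, yielding each record id
    # at most once even when ownership edges converge.
    seen = set()
    queue = deque(root_ids)
    while queue:
        record_id = queue.popleft()
        if record_id in seen:
            continue
        seen.add(record_id)
        yield record_id
        queue.extend(edges.get(record_id, []))

edges = {"exec1": ["job1"], "job1": ["call1"], "call1": []}
ids = list(iter_record_ids(edges, ["exec1"]))  # ["exec1", "job1", "call1"]
```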

load(migrate: bool | None = None) None

Load backend database.

For protection, only upgrades are allowed when automigrating. Downgrades could potentially drop data. Users should explicitly downgrade using something like redun db downgrade XXX.

Parameters:

migrate (Optional[bool]) – If None, defer to automigration config options. If True, perform migration after establishing database connection.

migrate(desired_version: DBVersionInfo | None = None, upgrade_only: bool = False) None

Migrate database to desired version.

Parameters:
  • desired_version (Optional[DBVersionInfo]) – Desired version to update the redun database to. If None, update to the latest version.

  • upgrade_only (bool) – By default, this function will perform both upgrades and downgrades. Set this to true to prevent downgrades (such as during automigration).

put_records(records: Iterable[dict]) int

Writes records to the database and returns number of new records written.

record_call_node(task_name: str, task_hash: str, args_hash: str, expr_args: Tuple[Tuple, dict], eval_args: Tuple[Tuple, dict], result_hash: str, child_call_hashes: List[str]) str

Record a completed CallNode.

Parameters:
  • task_name (str) – Fullname (with namespace) of task.

  • task_hash (str) – Hash of task.

  • args_hash (str) – Hash of all arguments of the call.

  • expr_args (Tuple[Tuple, dict]) – Original expressions for the task arguments. These expressions are used to record the full upstream dataflow.

  • eval_args (Tuple[Tuple, dict]) – The fully evaluated values of the task arguments.

  • result_hash (str) – Hash of the result value of the call.

  • child_call_hashes (List[str]) – call_hashes of any child task calls.

Returns:

The call_hash of the new CallNode.

Return type:

str

record_call_node_context(call_hash: str, context_hash: str | None, context: dict) str

Records a context dict for a CallNode as a Tag.

record_execution(exec_id: str, args: List[str]) None

Records an Execution to the backend.

Parameters:
  • exec_id (str) – The id of the execution.

  • args (List[str]) – Arguments used on the command line to start Execution.

Returns:

UUID of new Execution.

Return type:

str

record_job_end(job: BaseJob, now: datetime | None = None, status: str | None = None) None

Records the end of a Job.

Creates the Job if needed, in which case the Job is recorded with start_time == end_time.

record_job_start(job: BaseJob, now: datetime | None = None) Job

Records the start of a new Job.

record_tags(entity_type: TagEntity, entity_id: str, tags: Iterable[Tuple[str, Any]], parents: Iterable[str] = (), update: bool = False, new: bool = False) List[Tuple[str, str, str, Any]]

Record tags for an entity (Execution, Job, CallNode, Task, Value).

Parameters:
  • entity_type (TagEntity) – The type of the tagged entity (Execution, Job, etc).

  • entity_id (str) – The id of the tagged entity.

  • tags (Iterable[KeyValue]) – An iterable of key-value pairs to create as tags.

  • parents (Iterable[str]) – Ids of tags to be superseded by the new tags.

  • update (bool) – If True, automatically supersede any existing tags with keys matching those in tags. This also implies new=True.

  • new (bool) – If True, force tags to be current.

Returns:

[(tag_hash, entity_id, key, value)] – Returns a list of the created tags.

Return type:

List[Tuple[str, str, str, Any]]

record_updated_time() None

Updates updated_time (heartbeat) timestamp for the current Execution.

record_value(value: Any, data: bytes | None = None) str

Record a Value into the backend.

Parameters:
  • value (Any) – A value to record.

  • data (Optional[bytes]) – Byte stream to record. If not given, usual value serialization is used.

Returns:

value_hash of recorded value.

Return type:

str

rollback_handle(handle: Handle) None

Rollback all descendant handles.

set_eval_cache(eval_hash: str, task_hash: str, args_hash: str, value: Any, value_hash: str | None = None) None

Sets a new value in the Evaluation cache.

Parameters:
  • eval_hash (str) – A hash of the combination of the task_hash and args_hash.

  • task_hash (str) – Hash of Task used in the call.

  • args_hash (str) – Hash of all arguments used in the call.

  • value (Any) – Value to record in cache.

  • value_hash (str) – Hash of value to record in cache.

update_tags(entity_type: TagEntity, entity_id: str, old_keys: Iterable[str], new_tags: Iterable[Tuple[str, Any]]) List[Tuple[str, str, str, Any]]

Update tags.

exception redun.backends.db.RedunDatabaseError

Bases: Exception

class redun.backends.db.RedunMigration(**kwargs)

Bases: Base

Migration history of redun database (aka alembic table).

version_num
class redun.backends.db.RedunSession(backend: RedunBackendDb, *args, **kwargs)

Bases: Session

Sqlalchemy Session with a reference to the redun backend.

This is used to give Sqlalchemy models access to the redun backend API.

class redun.backends.db.RedunVersion(**kwargs)

Bases: Base

Version of redun database.

id
timestamp
version
exception redun.backends.db.RedunVersionError

Bases: RedunDatabaseError

class redun.backends.db.Subvalue(**kwargs)

Bases: Base

A Value that is a subvalue of another Value.

child
parent
parent_value_hash
value_hash
class redun.backends.db.Tag(**kwargs)

Bases: Base

property base_key: str
call_nodes
child_edits
children
property entity: Execution | Job | CallNode | Task | Value
entity_id
entity_type
executions
static get_delete_tag() Tag

Returns a delete tag, which can be used to mark parent tags deleted.

is_current
jobs
key
property namespace: str
parent_edits
parents
tag_hash
tag_hash_idx = Index('ix_tag_tag_hash_current', Column('tag_hash', String(length=40), table=<tag>, primary_key=True, nullable=False), unique=True)
tasks
value
property value_parsed: Any

Returns parsed value referenced by Tag.

values
class redun.backends.db.TagEdit(**kwargs)

Bases: Base

child
child_id
parent
parent_id
class redun.backends.db.Task(**kwargs)

Bases: Base

call_nodes
evals
property fullname: str
hash
hash_idx = Index('ix_task_hash_vpo', Column('hash', String(length=40), table=<task>, primary_key=True, nullable=False), unique=True)
jobs
name
name_idx = Index('ix_task_name_vpo', Column('name', String(), table=<task>, nullable=False))
namespace
namespace_idx = Index('ix_task_namespace_vpo', Column('namespace', String(), table=<task>, nullable=False))
show_source() None
source
tags
value
class redun.backends.db.Value(**kwargs)

Bases: Base

A value used as input (Argument) or output (Result) from a Task.

arguments
child_edges
children
evals
file
format
handle
property in_value_store: bool

Returns True if value data is in a ValueStore.

parent_edges
parents
property preview: Any

Returns a deserialized value, or a preview if there is an error or value is too large.

results
subfiles
subhandles
tags
task
type
value
value_hash
value_hash_idx = Index('ix_value_value_hash_vpo', Column('value_hash', String(length=40), table=<value>, primary_key=True, nullable=False), unique=True)
property value_parsed: Any | None

Returns a deserialized value, or a preview if there is an error.

redun.backends.db.get_abs_path(root_dir: str, path: str) str

Get the absolute path of path if it is a local path.

redun.backends.db.get_call_node_child_edges(session: Session, ids: Iterable[str]) Iterable[Tuple[str, Base, str]]
redun.backends.db.get_execution_child_edges(session: Session, ids: Iterable[str]) Iterable[Tuple[str, Base, str]]
redun.backends.db.get_job_child_edges(session: Session, ids: Iterable[str]) Iterable[Tuple[str, Base, str]]
redun.backends.db.get_tag_child_edges(session: Session, ids: Iterable[str]) Iterable[Tuple[str, Base, str]]

Get parents and children of the tags with the supplied ids.

redun.backends.db.get_tag_entity_child_edges(session: Session, ids: Iterable[str]) Iterable[Tuple[str, Base, str]]

Iterates through the child edges for Tags of a set of entity ids.

redun.backends.db.get_value_child_edges(session: Session, ids: Iterable[str]) Iterable[Tuple[str, Base, str]]
redun.backends.db.parse_db_version(version_str: str) DBVersionInfo

Parses a db version string such as “2.0” or “3” into a DBVersionInfo.
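
A parser with this behavior can be sketched as follows; the two-field DBVersionInfo here is a simplified stand-in for redun's actual version record:

```python
from typing import NamedTuple

class DBVersionInfo(NamedTuple):
    # Hypothetical simplified stand-in for redun's DBVersionInfo.
    major: int
    minor: int

def parse_db_version(version_str: str) -> DBVersionInfo:
    # "2.0" parses as (2, 0); a bare "3" is treated as (3, 0).
    parts = version_str.split(".")
    minor = int(parts[1]) if len(parts) > 1 else 0
    return DBVersionInfo(int(parts[0]), minor)
```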