dlairflow API

dlairflow

Reusable code components for building Apache Airflow® DAGs.

dlairflow.load

Tasks that involve ingesting data.

dlairflow.load.load_table_with_fits2db(connection, schema, table, load_dir)

Create a task to load a database table with fits2db.

This function assumes that a FITS file is defined by:

f"{load_dir}/{schema}.{table}.fits"

This function also assumes that fits2db and psql are available in the PATH seen by the Airflow jobs.
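The file-name convention and the PATH requirement can be checked before a DAG is scheduled. The following is a minimal sketch. fits_path and missing_tools are hypothetical helpers, not part of dlairflow. They rebuild the expected FITS path and report which of the required executables cannot be found.

```python
import shutil


def fits_path(load_dir: str, schema: str, table: str) -> str:
    """Hypothetical helper: the FITS file name load_table_with_fits2db expects."""
    return f"{load_dir}/{schema}.{table}.fits"


def missing_tools(tools=("fits2db", "psql")) -> list:
    """Hypothetical helper: names of required executables not found on PATH."""
    return [tool for tool in tools if shutil.which(tool) is None]


print(fits_path("/data/load", "sdss_dr19", "specobj"))
print(missing_tools())
```

Run such a check in the same environment as the Airflow workers, because their PATH can differ from that of an interactive shell.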

Parameters:
  • connection (str) – An Airflow database connection string. This is needed to set environment variables.

  • schema (str) – The schema in which table is defined.

  • table (str) – The name of the table.

  • load_dir (str) – The directory containing the FITS file to load.

Returns:

A BashOperator that will execute fits2db.

Return type:

BashOperator

dlairflow.meta

Tasks that involve metadata, verification, etc.

dlairflow.meta.fitsverify(filename)

Run fitsverify on filename.

Parameters:

filename (str) – Name of a FITS file to verify.

Returns:

A BashOperator that will execute fitsverify.

Return type:

BashOperator
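A BashOperator runs a single shell command string. The sketch below shows one safe way to build that string, with filename quoted so that spaces or shell metacharacters cannot break the command. The helper fitsverify_command is hypothetical, and the exact command that dlairflow builds is not documented here.

```python
import shlex


def fitsverify_command(filename: str) -> str:
    """Hypothetical helper: a bash command string that runs fitsverify on one file."""
    # shlex.quote leaves simple names unchanged and single-quotes anything unsafe.
    return f"fitsverify {shlex.quote(filename)}"


print(fitsverify_command("/data/load/sdss.specobj.fits"))
print(fitsverify_command("my file.fits"))
```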

dlairflow.meta.get(source, item)

Obtain metadata about item from source.

Parameters:
  • source (str) – The name of the metadata source. This could be a Felis YAML file or a database connection ID.

  • item (str) – What metadata to extract. See the Notes below for the format of this string.

Returns:

A Felis datamodel object containing the metadata.

Return type:

Schema or Table or Column

Raises:

ValueError – If item does not match the expected format.

Notes

Formats for item:

name1

The metadata associated with the entire schema ‘name1’ will be extracted.

name1.name2

The metadata associated with table ‘name2’ in schema ‘name1’ will be extracted.

name1.name2.name3

The metadata associated with column ‘name3’ in table ‘name2’ in schema ‘name1’ will be extracted.
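The three formats for item can be told apart by splitting on dots. This minimal sketch covers only that parsing step, including the ValueError for malformed input. parse_item and ItemPath are hypothetical names, and the lookup of the matching Felis Schema, Table or Column object is not shown.

```python
from typing import NamedTuple, Optional


class ItemPath(NamedTuple):
    """Hypothetical container for the parts of an item string."""
    schema: str
    table: Optional[str] = None
    column: Optional[str] = None


def parse_item(item: str) -> ItemPath:
    """Split 'name1', 'name1.name2' or 'name1.name2.name3' into its parts."""
    parts = item.split(".")
    # Reject too many parts and empty names such as 'a..b' or ''.
    if not 1 <= len(parts) <= 3 or any(part == "" for part in parts):
        raise ValueError(f"Invalid item format: {item!r}")
    return ItemPath(*parts)


print(parse_item("sdss.specobj"))
```

Whichever of table and column is None tells the caller whether a schema, a table or a column was requested.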

dlairflow.postgresql

Standard tasks for working with PostgreSQL that can be imported into a DAG.

dlairflow.postgresql._PostgresOperatorWrapper(**kwargs)

Handle different call signatures for PostgresOperator in different versions of Airflow.

dlairflow.postgresql._connection_to_environment(connection)

Convert a database connection to environment variables.

Parameters:

connection (str) – An Airflow database connection string.

Returns:

A dictionary suitable for passing to the env keyword of, e.g., BashOperator.

Return type:

dict
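The following is a minimal sketch of the idea. It assumes the connection is available as a PostgreSQL URI, whereas the real function works from an Airflow connection, and it assumes the output uses the standard libpq variable names that psql, pg_dump and pg_restore read. The documentation above does not state the actual keys. uri_to_pg_environment is a hypothetical name.

```python
from urllib.parse import unquote, urlsplit


def uri_to_pg_environment(uri: str) -> dict:
    """Hypothetical helper: map a PostgreSQL URI to libpq environment variables."""
    parts = urlsplit(uri)
    # Assumption: standard libpq variable names; urlsplit does not percent-decode
    # the user name or password, so unquote them here.
    env = {
        "PGHOST": parts.hostname or "",
        "PGPORT": str(parts.port or 5432),
        "PGUSER": unquote(parts.username or ""),
        "PGDATABASE": parts.path.lstrip("/"),
    }
    if parts.password:
        env["PGPASSWORD"] = unquote(parts.password)
    return env


print(uri_to_pg_environment("postgresql://bob@localhost/test"))
```

When passing such a dictionary to BashOperator, note that env replaces the inherited environment, including PATH, unless append_env=True is also set.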

dlairflow.postgresql.index_columns(connection, schema, table, columns, tablespace=None, overwrite=False)

Create “generic” indexes for a set of columns.

Parameters:
  • connection (str) – An Airflow database connection string.

  • schema (str) – The name of the database schema.

  • table (str) – The name of the table in schema.

  • columns (list) – A list of columns to index. See the Notes below for the types of entries allowed.

  • tablespace (str, optional) – Create the indexes in a specific tablespace if set.

  • overwrite (bool, optional) – If True, replace any existing SQL template file.

Returns:

A task to create several indexes.

Return type:

SQLExecuteQueryOperator

Notes

columns may be a list containing multiple types:

  • str: create an index on one column.

  • tuple: create an index on the set of columns in the tuple.

  • dict: create a function index. The key is the name of the function and the value is the column that is the argument to the function.

  • Any other type in columns will be ignored.
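The rules above map each entry in columns to one index definition. The sketch below renders them as PostgreSQL CREATE INDEX statements. index_statements is a hypothetical name, and the SQL that dlairflow writes to its template may differ, for example in how indexes are named. Identifiers are not quoted, so the sketch is only suitable for simple, trusted names.

```python
def index_statements(schema, table, columns, tablespace=None):
    """Hypothetical helper: one CREATE INDEX statement per supported entry."""
    ts = f" TABLESPACE {tablespace}" if tablespace else ""
    statements = []
    for col in columns:
        if isinstance(col, str):
            target = col  # single-column index
        elif isinstance(col, tuple):
            target = ", ".join(col)  # multi-column index
        elif isinstance(col, dict):
            # Function index: {function_name: column} gives function_name(column).
            for func, arg in col.items():
                statements.append(f"CREATE INDEX ON {schema}.{table} ({func}({arg})){ts};")
            continue
        else:
            continue  # other types are ignored
        statements.append(f"CREATE INDEX ON {schema}.{table} ({target}){ts};")
    return statements


for sql in index_statements("sdss", "specobj", ["plate", ("plate", "fiber"), {"lower": "class"}]):
    print(sql)
```

Omitting the index name lets PostgreSQL generate one automatically.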

dlairflow.postgresql.pg_dump_schema(connection, schema, dump_dir)

Dump an entire database schema using pg_dump.

Parameters:
  • connection (str) – An Airflow database connection string.

  • schema (str) – The name of the database schema.

  • dump_dir (str) – Place the dump file in this directory.

Returns:

A BashOperator that will execute pg_dump.

Return type:

BashOperator
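The sketch below builds a pg_dump command for a single schema. It is a hedged illustration: the dump file name and the choice of custom format are assumptions, not documented behavior of pg_dump_schema. The custom format is the kind of archive that pg_restore reads, whereas plain SQL dumps are restored with psql. pg_dump_args is a hypothetical name. The host, user and database are taken from the PG* environment variables.

```python
import os
import shlex


def pg_dump_args(schema: str, dump_dir: str) -> list:
    """Hypothetical helper: pg_dump arguments for dumping one schema."""
    # Assumed file name; the name pg_dump_schema actually uses is not documented here.
    dump_file = os.path.join(dump_dir, f"{schema}.dump")
    # Connection details come from PGHOST, PGUSER, PGDATABASE, etc.
    return ["pg_dump", "--schema", schema, "--format=custom", "--file", dump_file]


print(shlex.join(pg_dump_args("sdss_dr19", "/scratch/dumps")))
```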

dlairflow.postgresql.pg_restore_schema(connection, schema, dump_dir)

Restore a database schema using pg_restore.

Parameters:
  • connection (str) – An Airflow database connection string.

  • schema (str) – The name of the database schema.

  • dump_dir (str) – Find the dump file in this directory.

Returns:

A BashOperator that will execute pg_restore.

Return type:

BashOperator
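The following sketch shows the matching restore command, under the same assumptions about the dump file name and format. pg_restore_args is a hypothetical name. The --dbname option matters here: without it, pg_restore writes an SQL script to standard output instead of restoring into a database.

```python
import os


def pg_restore_args(schema: str, dump_dir: str, dbname: str) -> list:
    """Hypothetical helper: pg_restore arguments for a custom-format schema dump."""
    # Assumed file name; it must match whatever the dump step wrote.
    dump_file = os.path.join(dump_dir, f"{schema}.dump")
    # --dbname makes pg_restore restore directly into that database.
    return ["pg_restore", "--dbname", dbname, dump_file]


print(pg_restore_args("sdss_dr19", "/scratch/dumps", "dr19"))
```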

dlairflow.postgresql.primary_key(connection, schema, primary_keys, tablespace=None, overwrite=False)

Create a primary key on one or more tables in schema.

Parameters:
  • connection (str) – An Airflow database connection string.

  • schema (str) – The name of the database schema.

  • primary_keys (dict) – A dictionary mapping the name of each table in schema to its primary key column(s). See below for details.

  • tablespace (str, optional) – Create the primary key indexes in a specific tablespace if set.

  • overwrite (bool, optional) – If True, replace any existing SQL template file.

Returns:

A task to create a primary key.

Return type:

SQLExecuteQueryOperator

Notes

primary_keys is a dict with the following structure:

  • The key is the table name within schema.

  • The value can be:

    • str: create a primary key on one column.

    • tuple: create a primary key on the set of columns in the tuple.

    • Any other type will be ignored.
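The structure of primary_keys translates directly into one ALTER TABLE statement per table. The sketch below assumes that the tablespace option is expressed with PostgreSQL's USING INDEX TABLESPACE clause. The template that dlairflow actually generates is not shown in this documentation, and primary_key_statements is a hypothetical name.

```python
def primary_key_statements(schema, primary_keys, tablespace=None):
    """Hypothetical helper: ALTER TABLE ... ADD PRIMARY KEY for each table."""
    # Assumption: the index backing the primary key is placed with USING INDEX TABLESPACE.
    ts = f" USING INDEX TABLESPACE {tablespace}" if tablespace else ""
    statements = []
    for table, key in primary_keys.items():
        if isinstance(key, str):
            cols = key  # single-column key
        elif isinstance(key, tuple):
            cols = ", ".join(key)  # composite key
        else:
            continue  # other types are ignored
        statements.append(f"ALTER TABLE {schema}.{table} ADD PRIMARY KEY ({cols}){ts};")
    return statements


print(primary_key_statements("sdss", {"specobj": "specobjid", "photo": ("run", "field")}))
```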

dlairflow.postgresql.q3c_index(connection, schema, table, ra='ra', dec='dec', tablespace=None, overwrite=False)

Create a q3c index on schema.table.

Parameters:
  • connection (str) – An Airflow database connection string.

  • schema (str) – The name of the database schema.

  • table (str) – The name of the table in schema.

  • ra (str, optional) – Name of the column containing Right Ascension, default ‘ra’.

  • dec (str, optional) – Name of the column containing Declination, default ‘dec’.

  • tablespace (str, optional) – Create the index in a specific tablespace if set.

  • overwrite (bool, optional) – If True, replace any existing SQL template file.

Returns:

A task to create a q3c index.

Return type:

SQLExecuteQueryOperator
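The q3c extension's README builds its spatial index on the expression q3c_ang2ipix(ra, dec). The sketch below renders a statement of that form from the same arguments as q3c_index. q3c_index_statement is a hypothetical name, and the SQL in dlairflow's template may differ. The sketch also assumes the q3c extension is already installed in the database.

```python
def q3c_index_statement(schema, table, ra="ra", dec="dec", tablespace=None):
    """Hypothetical helper: a CREATE INDEX statement on q3c_ang2ipix(ra, dec)."""
    ts = f" TABLESPACE {tablespace}" if tablespace else ""
    return f"CREATE INDEX ON {schema}.{table} (q3c_ang2ipix({ra}, {dec})){ts};"


print(q3c_index_statement("sdss", "specobj"))
```

The q3c README also suggests running CLUSTER and ANALYZE on the table after the index is built. This documentation does not say whether q3c_index does so.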

dlairflow.postgresql.truncate_table(connection, schema, table, restart=False, cascade=False, overwrite=False)

Run TRUNCATE TABLE on one or more tables in schema.

Parameters:
  • connection (str) – An Airflow database connection string.

  • schema (str) – The name of the database schema.

  • table (str or list) – The table(s) to operate on.

  • restart (bool, optional) – If True, any sequences associated with columns in the table(s) will be reset. The default is not to reset such sequences.

  • cascade (bool, optional) – If True, the TRUNCATE command will also truncate tables connected by foreign key relationships. This is extremely dangerous!

  • overwrite (bool, optional) – If True, replace any existing SQL template file.

Returns:

A task to run a TRUNCATE TABLE command.

Return type:

SQLExecuteQueryOperator

Raises:

ValueError – If table is not a string or list-like object.
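The restart and cascade flags correspond to the RESTART IDENTITY and CASCADE options of PostgreSQL's TRUNCATE. This minimal sketch assembles the statement and raises ValueError for the documented error case. truncate_statement is a hypothetical name, not the package's implementation.

```python
def truncate_statement(schema, table, restart=False, cascade=False):
    """Hypothetical helper: a TRUNCATE TABLE statement for one or more tables."""
    if isinstance(table, str):
        tables = [table]
    elif isinstance(table, (list, tuple)):
        tables = list(table)
    else:
        raise ValueError("table must be a string or list-like object")
    sql = "TRUNCATE TABLE " + ", ".join(f"{schema}.{t}" for t in tables)
    if restart:
        sql += " RESTART IDENTITY"  # reset sequences owned by the truncated columns
    if cascade:
        sql += " CASCADE"  # also truncate tables that reference these via foreign keys
    return sql + ";"


print(truncate_statement("sdss", ["specobj", "photoobj"], restart=True))
```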

dlairflow.postgresql.vacuum_analyze(connection, schema, table, full=False, overwrite=False)

Run VACUUM and ANALYZE on one or more tables in schema.

Parameters:
  • connection (str) – An Airflow database connection string.

  • schema (str) – The name of the database schema.

  • table (str or list) – The table(s) to operate on.

  • full (bool, optional) – If True, run VACUUM FULL.

  • overwrite (bool, optional) – If True, replace any existing SQL template file.

Returns:

A task to run a VACUUM command.

Return type:

SQLExecuteQueryOperator

Raises:

ValueError – If table is not a string or list-like object.

Notes

The returned SQLExecuteQueryOperator has autocommit=True set, so its SQL commands are not wrapped in a transaction block. A transaction block is normally desirable, but VACUUM cannot be run inside one.
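The following minimal sketch generates the VACUUM commands. It emits one statement per table, which avoids depending on the multi-table VACUUM syntax added in PostgreSQL 11. vacuum_statements is a hypothetical name, and dlairflow's template may be written differently.

```python
def vacuum_statements(schema, table, full=False):
    """Hypothetical helper: one VACUUM ... ANALYZE statement per table."""
    if isinstance(table, str):
        tables = [table]
    elif isinstance(table, (list, tuple)):
        tables = list(table)
    else:
        raise ValueError("table must be a string or list-like object")
    options = "(FULL, ANALYZE)" if full else "(ANALYZE)"
    return [f"VACUUM {options} {schema}.{t};" for t in tables]


print(vacuum_statements("sdss", ["specobj", "photoobj"], full=True))
```

Each statement must still be executed outside a transaction block, which is what autocommit=True provides.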

dlairflow.scripts

Entry points for command-line scripts.

dlairflow.scripts.clean_dlairflow_sql_templates()

Entry point for the clean_dlairflow_sql_templates command-line script.

Returns:

An integer suitable for passing to sys.exit().

Return type:

int
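The sketch below illustrates the convention of an entry point that returns an integer exit status. It is a hypothetical stand-in, not the package's script. Based only on the script's name, it assumes that "cleaning" means deleting *.sql template files from a directory given on the command line.

```python
import sys
from pathlib import Path


def clean_sql_templates(sql_dir: str) -> int:
    """Hypothetical: delete *.sql files directly inside sql_dir; return an exit status."""
    directory = Path(sql_dir)
    if not directory.is_dir():
        print(f"{sql_dir} is not a directory.", file=sys.stderr)
        return 1
    for template in directory.glob("*.sql"):
        if template.is_file():
            template.unlink()
    return 0


def main(argv=None) -> int:
    """Hypothetical entry point: parse arguments and return a status for sys.exit()."""
    args = sys.argv[1:] if argv is None else argv
    if len(args) != 1:
        print("usage: clean_sql_templates SQL_DIR", file=sys.stderr)
        return 2
    return clean_sql_templates(args[0])
```

Because main returns instead of exiting, it is easy to test. A console-script entry point, or an if __name__ == "__main__" guard that calls sys.exit(main()), turns the return value into the process exit status.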

dlairflow.util

Generic, low-level utility functions. Some functions may be intended for internal use by the package itself.

dlairflow.util.ensure_sql()

Ensure that ${AIRFLOW_HOME}/dags/sql exists.

Returns:

The full path to the directory.

Return type:

str

Raises:

KeyError – If AIRFLOW_HOME is not defined.
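The documented behavior can be reproduced with the standard library. This minimal sketch uses a hypothetical name, ensure_sql_dir, and is not the package's code. Reading os.environ directly produces the documented KeyError when AIRFLOW_HOME is unset, and os.makedirs with exist_ok=True makes repeated calls harmless.

```python
import os


def ensure_sql_dir() -> str:
    """Hypothetical helper: create ${AIRFLOW_HOME}/dags/sql if needed; return its full path."""
    # os.environ[...] raises KeyError when AIRFLOW_HOME is not defined.
    sql_dir = os.path.abspath(os.path.join(os.environ["AIRFLOW_HOME"], "dags", "sql"))
    os.makedirs(sql_dir, exist_ok=True)  # no error if the directory already exists
    return sql_dir
```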

dlairflow.util.user_scratch(user_key)

A standard, per-user scratch directory.

This function simply returns a path. It does not guarantee the directory exists. The environment variable DLAIRFLOW_SCRATCH_ROOT must be set.
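The following minimal sketch computes a path without creating anything. It assumes a layout with one subdirectory per user_key directly under DLAIRFLOW_SCRATCH_ROOT. That layout is a guess, since the exact path format is not documented here, and user_scratch_path is a hypothetical name.

```python
import os


def user_scratch_path(user_key: str) -> str:
    """Hypothetical helper: a per-user scratch path; the directory is not created."""
    # Assumed layout: <DLAIRFLOW_SCRATCH_ROOT>/<user_key>. KeyError if the root is unset.
    return os.path.join(os.environ["DLAIRFLOW_SCRATCH_ROOT"], user_key)
```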

Parameters:

user_key (str) – The key associated with a user’s scratch space. This can be any arbitrary string, such as the owner of a DAG, e.g. dag.owner.

Returns:

The name of the directory.

Return type:

str

Raises: