dlairflow API
dlairflow
Reusable code components for building Apache Airflow® DAGs.
dlairflow.load
Tasks that involve ingesting data.
- dlairflow.load.load_table_with_fits2db(connection, schema, table, load_dir)
Create a task to load a database table with fits2db.
This function assumes that the FITS file path is:
f"{load_dir}/{schema}.{table}.fits"
This function also assumes that fits2db and psql are available in the PATH seen by the Airflow jobs.
- Parameters:
- Returns:
A BashOperator that will execute fits2db.
- Return type:
BashOperator
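Because the task depends on a file-naming convention and on executables being on PATH, a pre-flight check can catch problems before the BashOperator runs. This is a minimal sketch; expected_fits_path, missing_fits_files and missing_executables are hypothetical helpers, not part of dlairflow, and only the path convention itself comes from the description above.

```python
import shutil
from pathlib import Path
from typing import Iterable, List


def expected_fits_path(load_dir: str, schema: str, table: str) -> Path:
    """Return the FITS path implied by the load_dir/schema.table.fits convention."""
    return Path(f"{load_dir}/{schema}.{table}.fits")


def missing_fits_files(load_dir: str, schema: str, tables: Iterable[str]) -> List[str]:
    """Return the tables whose expected FITS file does not exist."""
    return [t for t in tables if not expected_fits_path(load_dir, schema, t).is_file()]


def missing_executables(names: Iterable[str] = ("fits2db", "psql")) -> List[str]:
    """Return the commands that cannot be found on the current PATH."""
    return [name for name in names if shutil.which(name) is None]
```

For example, expected_fits_path("/data/load", "gaia_dr3", "source") gives /data/load/gaia_dr3.source.fits. Note that missing_executables checks the PATH of whichever process runs it, which may differ from the PATH of the worker that eventually executes the BashOperator.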
dlairflow.meta
Tasks that involve metadata, verification, etc.
- dlairflow.meta.fitsverify(filename)
Run fitsverify on filename.
- Parameters:
filename (str) – Name of a FITS file to verify.
- Returns:
A BashOperator that will execute fitsverify.
- Return type:
BashOperator
- dlairflow.meta.get(source, item)
Obtain metadata about item from source.
- Parameters:
- Returns:
A Felis datamodel object containing the metadata.
- Return type:
Schema or Table or Column
- Raises:
ValueError – If item does not match the expected format.
Notes
Formats for item:
- name1
The metadata associated with the entire schema ‘name1’ will be extracted.
- name1.name2
The metadata associated with table ‘name2’ in schema ‘name1’ will be extracted.
- name1.name2.name3
The metadata associated with column ‘name3’ in table ‘name2’ in schema ‘name1’ will be extracted.
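The three item formats above map naturally onto a split on dots. The following is a minimal sketch of that parsing step only; parse_item is a hypothetical helper, and how dlairflow.meta.get actually parses item is not shown on this page. A result with only a schema would correspond to a Schema object, schema plus table to a Table, and all three parts to a Column.

```python
from typing import Optional, Tuple


def parse_item(item: str) -> Tuple[str, Optional[str], Optional[str]]:
    """Split item into (schema, table, column); missing levels are None.

    Raises ValueError unless item has one to three non-empty dot-separated parts.
    """
    parts = item.split(".")
    if len(parts) > 3 or any(part == "" for part in parts):
        raise ValueError(f"Unexpected item format: {item!r}")
    # Pad with None so that schema-only and table-level items unpack cleanly.
    schema, table, column = parts + [None] * (3 - len(parts))
    return schema, table, column
```

For example, parse_item("gaia_dr3.source") returns ("gaia_dr3", "source", None), while "a.b.c.d" and "a..b" raise ValueError.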
dlairflow.postgresql
Standard tasks for working with PostgreSQL that can be imported into a DAG.
- dlairflow.postgresql._PostgresOperatorWrapper(**kwargs)
Handle different call signatures for PostgresOperator in different versions of Airflow.
- dlairflow.postgresql._connection_to_environment(connection)
Convert a database connection to environment variables.
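The exact variables produced by this private helper are not documented here. A minimal sketch, assuming the goal is to expose a connection through the standard libpq environment variables (PGHOST, PGPORT, PGUSER, PGPASSWORD, PGDATABASE) so that command-line tools such as psql, pg_dump and pg_restore can use it. ConnectionInfo is a hypothetical stand-in for the host, port, login, password and schema attributes of an Airflow Connection; for Postgres connections, Airflow stores the database name in schema.

```python
from dataclasses import dataclass
from typing import Dict, Optional


@dataclass
class ConnectionInfo:
    """Hypothetical stand-in for the relevant fields of an Airflow Connection."""
    host: str
    login: str
    password: Optional[str] = None
    schema: Optional[str] = None  # database name for Postgres connections
    port: Optional[int] = None


def connection_to_pg_env(conn: ConnectionInfo) -> Dict[str, str]:
    """Map connection fields onto libpq environment variables, skipping unset ones."""
    env = {"PGHOST": conn.host, "PGUSER": conn.login}
    if conn.port is not None:
        env["PGPORT"] = str(conn.port)  # environment values must be strings
    if conn.schema:
        env["PGDATABASE"] = conn.schema
    if conn.password:
        env["PGPASSWORD"] = conn.password
    return env
```

The PostgreSQL documentation discourages PGPASSWORD because some operating systems let other users see process environments; a password file is the recommended alternative.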
- dlairflow.postgresql.index_columns(connection, schema, table, columns, tablespace=None, overwrite=False)
Create “generic” indexes for a set of columns.
- Parameters:
connection (str) – An Airflow database connection string.
schema (str) – The name of the database schema.
table (str) – The name of the table in schema.
columns (list) – A list of columns to index. See below for the possible entries in the list of columns.
tablespace (str, optional) – Create the indexes in a specific tablespace if set.
overwrite (bool, optional) – If True, replace any existing SQL template file.
- Returns:
A task to create several indexes.
- Return type:
SQLExecuteQueryOperator
Notes
columns may be a list containing multiple types:
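To illustrate the kind of SQL such a task might run, the sketch below handles only the simplest entry type, a plain column name, and emits one CREATE INDEX per column. It is not dlairflow's template: generic_index_sql and the table_column_idx naming convention are hypothetical. Identifiers are interpolated without quoting, so this assumes trusted names; production code would quote them (for example with psycopg2's sql.Identifier).

```python
from typing import List, Optional


def generic_index_sql(schema: str, table: str, columns: List[str],
                      tablespace: Optional[str] = None) -> str:
    """Build one CREATE INDEX statement per plain column name."""
    ts = f" TABLESPACE {tablespace}" if tablespace else ""
    statements = []
    for column in columns:
        # Hypothetical naming convention. PostgreSQL always creates an index
        # in its table's schema, so the index name is left unqualified.
        name = f"{table}_{column}_idx"
        statements.append(
            f"CREATE INDEX IF NOT EXISTS {name} ON {schema}.{table} ({column}){ts};"
        )
    return "\n".join(statements)
```

For example, generic_index_sql("gaia_dr3", "source", ["ra", "dec"]) produces two statements, one for ra and one for dec.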
- dlairflow.postgresql.pg_dump_schema(connection, schema, dump_dir)
Dump an entire database schema using pg_dump.
- dlairflow.postgresql.pg_restore_schema(connection, schema, dump_dir)
Restore a database schema using pg_restore.
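The exact commands these two tasks run are not documented here. A minimal sketch, assuming a custom-format dump of one schema followed by a restore of that file; the dump file name {dump_dir}/{schema}.dump and the helper names are hypothetical, while the flags (--dbname, --schema, --format custom, --no-owner, --file) are standard pg_dump and pg_restore options. Connection credentials would come from libpq environment variables rather than the command line.

```python
import shlex
from typing import List


def dump_file(schema: str, dump_dir: str) -> str:
    # Hypothetical naming convention for the dump file.
    return f"{dump_dir}/{schema}.dump"


def pg_dump_args(dbname: str, schema: str, dump_dir: str) -> List[str]:
    """pg_dump arguments for a custom-format dump of a single schema."""
    return ["pg_dump", "--dbname", dbname, "--schema", schema,
            "--format", "custom", "--no-owner", "--file", dump_file(schema, dump_dir)]


def pg_restore_args(dbname: str, schema: str, dump_dir: str) -> List[str]:
    """pg_restore arguments that load the dump back into dbname."""
    # Without --dbname, pg_restore would print SQL to stdout instead of restoring.
    return ["pg_restore", "--dbname", dbname, "--no-owner", dump_file(schema, dump_dir)]
```

shlex.join(pg_dump_args(...)) turns the list into a safely quoted string suitable for a bash_command.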
- dlairflow.postgresql.primary_key(connection, schema, primary_keys, tablespace=None, overwrite=False)
Create a primary key on one or more tables in schema.
- Parameters:
connection (str) – An Airflow database connection string.
schema (str) – The name of the database schema.
primary_keys (dict) – A dictionary mapping the name of each table in schema to its primary key column(s). See below for details.
tablespace (str, optional) – Create the indexes in a specific tablespace if set.
overwrite (bool, optional) – If True, replace any existing SQL template file.
- Returns:
A task to create a primary key.
- Return type:
SQLExecuteQueryOperator
Notes
primary_keys may be a dict containing multiple types:
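A primary key in PostgreSQL is added with ALTER TABLE ... ADD PRIMARY KEY, and USING INDEX TABLESPACE places its backing index. A minimal sketch, assuming only two value types in primary_keys: a single column name, or a list of column names for a composite key. primary_key_sql is hypothetical and does not reproduce dlairflow's template; identifiers are not quoted.

```python
from typing import Dict, List, Optional, Union


def primary_key_sql(schema: str, primary_keys: Dict[str, Union[str, List[str]]],
                    tablespace: Optional[str] = None) -> str:
    """Build one ALTER TABLE ... ADD PRIMARY KEY statement per table."""
    ts = f" USING INDEX TABLESPACE {tablespace}" if tablespace else ""
    statements = []
    for table, columns in primary_keys.items():
        if isinstance(columns, str):
            columns = [columns]  # single-column key
        cols = ", ".join(columns)
        statements.append(f"ALTER TABLE {schema}.{table} ADD PRIMARY KEY ({cols}){ts};")
    return "\n".join(statements)
```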
- dlairflow.postgresql.q3c_index(connection, schema, table, ra='ra', dec='dec', tablespace=None, overwrite=False)
Create a q3c index on schema.table.
- Parameters:
connection (str) – An Airflow database connection string.
schema (str) – The name of the database schema.
table (str) – The name of the table in schema.
ra (str, optional) – Name of the column containing Right Ascension, default ‘ra’.
dec (str, optional) – Name of the column containing Declination, default ‘dec’.
tablespace (str, optional) – Create the index in a specific tablespace if set.
overwrite (bool, optional) – If True, replace any existing SQL template file.
- Returns:
A task to create a q3c index.
- Return type:
SQLExecuteQueryOperator
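A q3c index is a functional index on q3c_ang2ipix(ra, dec), the form given in the q3c README. The sketch below builds that statement from the parameters listed above; q3c_index_sql and the index name are hypothetical, and dlairflow's template may differ (for instance by also clustering the table on the index, which the q3c README recommends).

```python
from typing import Optional


def q3c_index_sql(schema: str, table: str, ra: str = "ra", dec: str = "dec",
                  tablespace: Optional[str] = None) -> str:
    """Build a CREATE INDEX statement for a q3c spatial index."""
    ts = f" TABLESPACE {tablespace}" if tablespace else ""
    index = f"{table}_q3c_ang2ipix"  # hypothetical index name
    return (f"CREATE INDEX IF NOT EXISTS {index} "
            f"ON {schema}.{table} (q3c_ang2ipix({ra}, {dec})){ts};")
```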
- dlairflow.postgresql.truncate_table(connection, schema, table, restart=False, cascade=False, overwrite=False)
Run TRUNCATE TABLE on one or more tables in schema.
- Parameters:
connection (str) – An Airflow database connection string.
schema (str) – The name of the database schema.
table (str or list) – The table, or list of tables, in schema to truncate.
restart (bool, optional) – If True, any sequences associated with columns in the table(s) will be reset. The default is not to reset such sequences.
cascade (bool, optional) – If True, the TRUNCATE command will also truncate tables connected by foreign key relationships. This is extremely dangerous!
overwrite (bool, optional) – If True, replace any existing SQL template file.
- Returns:
A task to run a TRUNCATE TABLE command.
- Return type:
SQLExecuteQueryOperator
- Raises:
ValueError – If table is not a string or list-like object.
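The restart and cascade options correspond to the RESTART IDENTITY and CASCADE clauses of PostgreSQL's TRUNCATE, which accepts several tables in one statement. A minimal sketch; truncate_sql is hypothetical, and treating any non-string Sequence as "list-like" is an assumption about what dlairflow accepts.

```python
from collections.abc import Sequence


def truncate_sql(schema, table, restart=False, cascade=False):
    """Build a TRUNCATE TABLE statement for one table or a list of tables."""
    if isinstance(table, str):  # check str first: a str is also a Sequence
        tables = [table]
    elif isinstance(table, Sequence):
        tables = list(table)
    else:
        raise ValueError("table must be a string or a list-like object")
    sql = "TRUNCATE TABLE " + ", ".join(f"{schema}.{t}" for t in tables)
    if restart:
        sql += " RESTART IDENTITY"  # reset sequences owned by the tables' columns
    if cascade:
        sql += " CASCADE"  # also truncate tables that reference these via foreign keys
    return sql + ";"
```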
- dlairflow.postgresql.vacuum_analyze(connection, schema, table, full=False, overwrite=False)
Run VACUUM and ANALYZE on one or more tables in schema.
- Parameters:
- Returns:
A task to run a VACUUM command.
- Return type:
SQLExecuteQueryOperator
- Raises:
ValueError – If table is not a string or list-like object.
Notes
The returned SQLExecuteQueryOperator has autocommit=True set, so SQL commands are not wrapped in a transaction block. Normally a transaction block is a good thing, but VACUUM cannot be run inside one.
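A sketch of the statement such a task might send, assuming full maps to the FULL option. vacuum_sql is hypothetical; listing several tables in one VACUUM requires PostgreSQL 11 or later. Whatever client executes it must be in autocommit mode (with psycopg2, for example, by setting connection.autocommit = True), for the reason given above.

```python
from collections.abc import Sequence


def vacuum_sql(schema, table, full=False):
    """Build a VACUUM ... ANALYZE statement for one table or a list of tables."""
    if isinstance(table, str):  # check str first: a str is also a Sequence
        tables = [table]
    elif isinstance(table, Sequence):
        tables = list(table)
    else:
        raise ValueError("table must be a string or a list-like object")
    options = "FULL, ANALYZE" if full else "ANALYZE"
    names = ", ".join(f"{schema}.{t}" for t in tables)
    return f"VACUUM ({options}) {names};"
```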
dlairflow.scripts
Entry points for command-line scripts.
- dlairflow.scripts.clean_dlairflow_sql_templates()
Entry-point for clean_dlairflow_sql_templates.
- Returns:
An integer suitable for passing to sys.exit().
- Return type:
int
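The convention of returning an integer for sys.exit() keeps the entry point testable, since a test can call it without terminating the interpreter. The sketch below shows that convention with a hypothetical cleanup routine that deletes *.sql files from a given directory; which files the real clean_dlairflow_sql_templates removes, and where, is an assumption here.

```python
import sys
from pathlib import Path
from typing import List


def clean_sql_templates(sql_dir: str) -> int:
    """Delete *.sql files in sql_dir; return 0 on success, 1 if sql_dir is missing."""
    directory = Path(sql_dir)
    if not directory.is_dir():
        print(f"No such directory: {directory}", file=sys.stderr)
        return 1
    for template in directory.glob("*.sql"):
        template.unlink()
    return 0


def main(argv: List[str]) -> int:
    """Hypothetical entry point; a console script would call sys.exit(main(sys.argv[1:]))."""
    if len(argv) != 1:
        print("usage: clean-templates SQL_DIR", file=sys.stderr)
        return 2
    return clean_sql_templates(argv[0])
```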
dlairflow.util
Generic, low-level utility functions. Some functions may be intended for internal use by the package itself.
- dlairflow.util.ensure_sql()
Ensure that ${AIRFLOW_HOME}/dags/sql exists.
- Returns:
The full path to the directory.
- Return type:
- Raises:
KeyError – If AIRFLOW_HOME is not defined.
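A minimal sketch of the documented behavior, not dlairflow's code: ensure_sql_dir is hypothetical, and it takes the environment as a parameter (defaulting to os.environ) only so it can be exercised without modifying the real environment. Indexing the mapping raises KeyError when AIRFLOW_HOME is missing, and os.makedirs with exist_ok=True makes repeated calls harmless.

```python
import os
from typing import Mapping


def ensure_sql_dir(environ: Mapping[str, str] = os.environ) -> str:
    """Create ${AIRFLOW_HOME}/dags/sql if necessary and return its full path."""
    # environ["AIRFLOW_HOME"] raises KeyError if the variable is not defined.
    sql_dir = os.path.abspath(os.path.join(environ["AIRFLOW_HOME"], "dags", "sql"))
    os.makedirs(sql_dir, exist_ok=True)  # no error if it already exists
    return sql_dir
```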
- dlairflow.util.user_scratch(user_key)
A standard, per-user scratch directory.
This function simply returns a path; it does not guarantee that the directory exists. The environment variable DLAIRFLOW_SCRATCH_ROOT must be set.
- Parameters:
user_key (str) – The key associated with a user’s scratch space. This can be any arbitrary string, such as the owner of a single DAG, e.g. dag.owner.
- Returns:
The name of the directory.
- Return type:
- Raises:
KeyError – If DLAIRFLOW_SCRATCH_ROOT is not set.
ValueError – If DLAIRFLOW_SCRATCH_ROOT is set but empty.
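The two error cases above can be told apart by first indexing the environment (KeyError when unset) and then testing for an empty string. A minimal sketch; user_scratch_path is hypothetical, the environment is a parameter only for testability, and placing one sub-directory per user_key directly under the root is an assumption about the layout.

```python
import os
from typing import Mapping


def user_scratch_path(user_key: str, environ: Mapping[str, str] = os.environ) -> str:
    """Return the scratch directory path for user_key without creating it."""
    root = environ["DLAIRFLOW_SCRATCH_ROOT"]  # KeyError if the variable is unset
    if not root:
        raise ValueError("DLAIRFLOW_SCRATCH_ROOT is set but empty")
    # Assumed layout: <root>/<user_key>.
    return os.path.join(root, user_key)
```

In a DAG, the key could be the owner, e.g. user_scratch_path(dag.owner).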