Source code for dlairflow.load

# Licensed under a BSD-style 3-clause license - see LICENSE.md.
# -*- coding: utf-8 -*-
"""
dlairflow.load
==============

Tasks that involve ingesting data.
"""
# _legacy_bash = False
try:
    from airflow.providers.standard.operators.bash import BashOperator
except ImportError:
    from airflow.operators.bash import BashOperator
    # _legacy_bash = True
from .postgresql import _connection_to_environment


[docs] def load_table_with_fits2db(connection, schema, table, load_dir): """Create a task to load a database table with :command:`fits2db`. This function assumes that a FITS file is defined by:: f"{load_dir}/{schema}.{table}.fits" This function also assumes that :command:`fits2db` and :command:`psql` are available in the :envvar:`PATH` seen by the Airflow jobs. Parameters ---------- connection : :class:`str` An Airflow database connection string. This is needed to set environment variables. schema : :class:`str` The schema in which `table` is defined. table : :class:`str` The name of the table. load_dir : :class:`str` FITS file to load is in this directory. Returns ------- :class:`~airflow.providers.standard.operators.bash.BashOperator` A BashOperator that will execute :command:`fits2db`. """ load_table_template = ("fits2db -t {{ params.schema }}.{{ params.table }} " + "{{ params.load_dir }}/{{ params.schema }}.{{ params.table }}.fits " + "| psql") pg_env = _connection_to_environment(connection) load_table = BashOperator(task_id='load_table_with_fits2db', bash_command=load_table_template, params={'load_dir': load_dir, 'schema': schema, 'table': table}, env=pg_env, append_env=True) return load_table