Apache Airflow — это инструмент для автоматизации рабочих процессов, задач и координации других программ в кластерах компьютеров. Airflow расширяет возможности организаций благодаря простому языку, основанному на правилах, который позволяет кодировать сложную обработку данных за считанные минуты. В этом посте мы узнаем об операторах воздушного потока, которые вы можете использовать для создания собственных пайплайнов.

Операторы выполняют инструкции, содержащиеся в вашем скрипте или файле описания рабочего процесса (например, .py, .json). Есть несколько операторов Airflow, которые могут помочь вам в достижении ваших целей. Однако может быть сложно понять поведение этих операторов, не имея хорошего концептуального понимания самого Airflow.

Что такое операторы воздушного потока Apache?

Apache Airflow — это инструмент MLOps and Data с открытым исходным кодом для моделирования и запуска конвейеров данных. Операторы воздушного потока — это команды, выполняемые вашей группой обеспечения доступности баз данных каждый раз, когда задача оператора запускается во время выполнения группы обеспечения доступности баз данных. В целом, каждый раз, когда задача оператора завершается без каких-либо результатов, вам следует использовать задачи с осторожностью, поскольку они потребляют время процессора и увеличить задержку.

Рекомендуемая литература: Как автоматизировать конвейеры данных с помощью Airflow?

Если ваша группа обеспечения доступности баз данных работает стабильно, задачи могут быть простым способом решения проблемы. Однако вам необходимо знать, как взаимодействуют операторы и где их использовать для достижения наилучших результатов. Проще говоря, когда вы создаете объекты-операторы, вы создаете задачи.

Рекомендуемая литература: Что такое DAG?

Если вы хотите, чтобы некоторые данные обрабатывались как можно быстрее и вам не нужны результаты сразу, а вместо этого вам нужен вывод этих данных как часть анализа или рабочего процесса, вам следует использовать задачи.

Свойства операторов воздушного потока:

  • Он определяет характер задачи и способ ее выполнения.
  • Когда создается экземпляр оператора, задача становится узлом в DAG.
  • Он автоматически повторяет попытку в случае сбоя.

Типы операторов воздушного потока:

Оператор действия

  • Это программа, которая выполняет определенное действие.
  • Например, EmailOperator, и BashOperator.

Оператор передачи

  • Он отвечает за перемещение данных из одной системы в другую.
  • Если вы работаете с большим набором данных, избегайте использования этого оператора.

Оператор датчика

  • Оператор датчика ожидает поступления данных в определенное место.
  • Это долгосрочные задачи.
  • Они полезны для отслеживания внешних процессов, таких как загрузка файлов.

Операторы играют решающую роль в процессе воздушного потока. Позже мы рассмотрим несколько наиболее популярных операторов, но сначала давайте рассмотрим взаимосвязь между задачей и оператором.

Различия между задачей и оператором могут поначалу сбивать с толку. Рисунок ниже может помочь вам понять взаимосвязь между DAG, задачей и оператором.

Задачи идеально автономны и не полагаются на информацию из других задач. Когда вы запускаете объект класса оператора, он становится задачей. Как правило, operator() создает ‹operator.objects›, которые преобразуются в задачи.

Определение Дага

dag= DAG(
        dag_id='t0',
        schedule='@time',
        ...
    )

Определение задач

t01= op01(
        task_id='name_task_1',
        operator_params=...,
        dag=dag,
        ...
    )
t02= op02(
        task_id='name_task_2',
        operator_params=...,
        dag=dag,
        ...
    )

Определение отношений между задачами

t01 >> t02
t02 >> t03
...
#Task 1 -> Task 2 -> Task 3

Давайте посмотрим на некоторые из самых популярных операторов:

Оператор Apache Airflow Bash — выполняет команду bash.

BashOperator в Apache Airflow предоставляет простой способ запуска команд bash в вашем рабочем процессе. Это оператор, который вы захотите использовать для указания задания, если ваша DAG выполняет команду или сценарий bash.

t1 = BashOperator(
        task_id=t1,
        dag=dag,
        bash_command='echo "Text"'
        )

Код BashOperator — Github

Оператор Apache Airflow Python — вызывает произвольную функцию Python.

Airflow PythonOperator предоставляет простой, но эффективный оператор, который позволяет вам запускать вызываемую функцию Python из вашей DAG.

def print_string():
    print("Test String")
t2 = PythonOperator(
        task_id="t3",
        dag=dag,
        python_callable=print_string,
      )

Код оператора Python — Github

Оператор электронной почты Apache Airflow — отправляет электронное письмо

EmailOperator — самый простой способ отправки писем из airflow. С помощью оператора электронной почты вы можете отправлять электронные письма, связанные с задачами, или создавать систему оповещения. Самым большим недостатком является то, что этот оператор не очень настраиваемый.

t4= EmailOperator(
       task_id=t4,
       to='[email protected]',
       subject='Alert Mail',
       html_content=""" Mail Test """,
       dag=dag
)

Код оператора электронной почты — Github

Apache Airflow PostgresOperator

Интерфейс оператора Postgres определяет задачи, взаимодействующие с базой данных PostgreSQL. Он будет использоваться для создания таблиц, удаления записей, вставки записей и многого другого.

with DAG(
    dag_id="postgres_operator_dag",
    start_date=datetime.datetime(2021, 10, 11),
    schedule_interval="@once",
    catchup=False,
) as dag:
t4= PostgresOperator(
        task_id="t4",
        sql="""
            CREATE TABLE IF NOT EXISTS pet (
            table_id SERIAL PRIMARY KEY,
            name VARCHAR NOT NULL,
            table_type VARCHAR NOT NULL,
            birth_date DATE NOT NULL,
            OWNER VARCHAR NOT NULL);
          """,
    )

Код PostgreOperator

Оператор Apache Airflow SSH

t5 = SSHOperator(
        task_id='SSHOperator',
        ssh_conn_id='ssh_connectionid',
        command='echo "Text from SSH Operator"'
    )

Код оператора SSH

Оператор докера Apache Airflow

Docker Operator помогает выполнять команды внутри контейнера Docker. Docker — это инструмент для создания и управления «контейнерами», которые представляют собой крошечные виртуальные системы, в которых вы можете запускать свой код. С помощью оператора докера airflow вы можете хранить файлы во временной директории, созданной на хосте и смонтированной в контейнер.

t6 = DockerOperator(
            task_id='docker_command',
            image='centos:latest',
            api_version='auto',
            auto_remove=True,
            command="/bin/sleep 30",
            docker_url="unix://var/run/docker.sock",
            network_mode="bridge"
)

Код оператора докера

Оператор HTTP Apache Airflow

Для выполнения действия выполняется вызов конечной точки в системе HTTP. Это полезно, если вы используете API, который возвращает большую полезную нагрузку JSON, и вас интересует только ее часть.

t7 = HttpSensor(
    task_id='t7',
    http_conn_id='http_default',
    endpoint='',
    request_params={},
    response_check=lambda response: "httpbin" in response.text,
    poke_interval=4,
    dag=dag,
)

Код оператора HTTP

Оператор Apache Airflow Snowflake

SnowflakeOperator выполняет команды SQL в базе данных Snowflake. Эти операторы могут создавать, вставлять, объединять, обновлять, удалять, копировать и завершать задачи, если это необходимо.

dag = DAG(
    'example_snowflake',
    start_date=datetime(2021, 11, 11),
    default_args={'snowflake_id': SNOWFLAKE_ID},
    tags=['example'],
    catchup=False,
)

t8 = SnowflakeOperator(
    task_id='t8',
    dag=dag,
    sql=CREATE_TABLE_SQL_STRING,
    warehouse=SNOWFLAKE_WAREHOUSE,
    database=SNOWFLAKE_DATABASE,
    schema=SNOWFLAKE_SCHEMA,
    role=SNOWFLAKE_ROLE,
)

Код оператора Snowflake

Операторы Apache Airflow Spark

Apache Spark — это быстрое и масштабируемое решение для кластерных вычислений общего назначения. Он предоставляет Spark SQL для обработки SQL и структурированных данных, MLlib для машинного обучения и многое другое. Все конфигурации для SparkSqlOperator исходят из параметров оператора.

t9= SparkJDBCOperator(
    cmd_type='spark_to_jdbc',
    jdbc_table="foo",
    spark_jars="${SPARK_HOME}/jars/postgresql-42.2.12.jar",
    jdbc_driver="org.postgresql.Driver",
    metastore_table="bar",
    save_mode="append",
    task_id="t9",
)

Код оператора искры

Операторы Apache Airflow SensorBase

Операторы сенсора продолжают работать с заданным интервалом, успешно, когда выполняется набор критериев, и терпят неудачу, если истекает время ожидания. Вам обязательно ждать файл? Можно ли увидеть, существует ли элемент SQL? Можно ли отложить выполнение вашей DAG? Это предел возможностей датчиков воздушного потока.

def _failure_callback():
if isinstance(context['exception'], AirflowSensorTimeout):
print("timed out message")
with DAG() as dag:
t10= FileSensor(
task_id='t10',
poke_interval=100,
timeout=20,
mode="reschedule",
fail_callback=fail_callback
)

Код оператора датчика

Операторы больших запросов Apache Airflow

BigQueryCheckOperator можно использовать для проверки BigQuery.

create_table = BigQueryCreateEmptyTableOperator(
task_id="t11",
dataset_id=DATASET_NAME,
table_id="test_table11",
schema_fields=[
	{"name": "emp_name", "type": "STRING", "mode": "REQUIRED"},
	{"name": "salary", "type": "INTEGER", "mode": "NULLABLE"},
],
)

Код BigqueryOperator

Операторов гораздо больше; Вы можете просмотреть полный список операторов воздушного потока.

Проверьте Документацию Apache, чтобы узнать больше.

Рекомендации для операторов Apache Airflow

Чтобы получить максимальную отдачу от этих операторов, вы должны знать, что они делают и когда их целесообразно применять в вашем конкретном случае использования. В целом операторы воздушного потока делятся на две категории: задачи планирования или задачи обработки данных. Оператор планирования будет планировать события на основе некоторого шаблона времени, например, истечения срока действия в течение заданного периода времени. Оператор манипулирования данными будет выполнять определенную задачу обработки входящих наборов данных, например, разбивать таблицы для улучшения возможностей запросов.

  • Не используйте Airflow в одной кодовой базе с базами данных. Если кто-то изменяет ваши модели, Airflow выдаст ошибки при запуске в базе данных. Это может сбить вас с толку и увеличить время работы. Кроме того, обязательно оставляйте комментарии во всем своем коде, чтобы объяснить, что делает каждая строка. Это облегчает людям, незнакомым с Airflow, понимание того, как все работает.
  • Используйте операторы экономно. Операторы могут быть отличными и экономить время, но они также могут быть очень трудоемкими и раздутыми.
  • Планировщик Airflow регулярно запускает DAG в зависимости от параметров даты начала и интервала расписания, указанных в файле DAG. Вместо того, чтобы запускать запуск DAG в начале запланированного периода, Планировщик Airflow запускает его ближе к завершению. Рассмотрим следующую DAG, которая запускается каждый день в 9 утра:

dag = DAG(‘dagname’, default_args=default_args, schedule_interval=’0 9 * * *’)

  • Используйте операторы, которые не изменяют существующие данные, например: stream_tasks_by_updated_time и update_datasets_by_.
  • При создании объекта DAG необходимо указать идентификатор DAG. Во всех группах DAG идентификатор DAG должен быть уникальным. Если у вас есть две группы обеспечения доступности баз данных с одинаковым идентификатором DAG, появится только одна из них, и вы можете столкнуться с непредвиденным поведением. Лучше всего определить подходящий идентификатор DAG и описание DAG. Если вы используете псевдоним для ссылки на идентификатор, вы получите сообщение об ошибке, говорящее о том, что он не соответствует данным при запуске этой модели.
  • Всегда назначайте датчику параметр тайм-аута. Подумайте о своем приложении и о том, как долго вы ожидаете, что датчик будет ждать, прежде чем настроить тайм-аут датчика.
  • Используйте режим тыкания, если интервал тыкания относительно короткий. Например, использование режима перепланирования может привести к перегрузке вашего планировщика. По возможности используйте режим перепланирования, особенно для датчиков с длительным сроком службы, чтобы датчик не занимал все время рабочий слот. Это позволяет избежать взаимоблокировок в Airflow, когда датчики используют все доступные рабочие слоты.

Узнайте, как Улучшить состояние модели с помощью Censius AI

Заключение

Доступно несколько инструментов MLOps, но Apache Airflow предлагает уникальные преимущества, и все больше компаний используют его для управления своими конвейерами данных. Оператор AirFlow — это оркестратор данных, доставляемых конвейером Airflow.Оператор сообщает конвейеру, куда отправлять данные, как часто их отправлять и какие действия предпринимать при поступлении новых данных. Мы рассмотрели что такое операторы и обсуждались несколько типов операторов в этой статье. Надеюсь, вам понравилась статья и берегите себя!

Первоначально опубликовано на: https://censius.ai/blogs/apache-airflow-operators-guide