Интеграция хранилища HashiCorp с Apache Airflow

Введение

По умолчанию Apache Airflow считывает соединения и переменные из базы данных метаданных, которая, по сути, хранит все, что отображается на соответствующей вкладке пользовательского интерфейса Airflow.

Несмотря на то, что нет абсолютно ничего особенно плохого в добавлении (или удалении) соединений и переменных через пользовательский интерфейс (и, таким образом, сохранении их в базе данных матаданных, которая также предлагает шифрование в состоянии покоя), иногда может быть более управляемым подключение Airflow к центральному инструменту управления секретами. который также используется другими инструментами в организации.

Этот подход, по сути, поможет вам более эффективно управлять своими секретами, так что всякий раз, когда что-то меняется, это отражается во всех инструментах, которые фактически считывают эти секреты, поэтому вам не нужно вручную обновлять каждый секрет в соответствующей системе (например, Airflow). ).

В сегодняшнем руководстве мы покажем, как подключить HashiCorp Vault — один из наиболее часто используемых инструментов управления секретами — к Apache Airflow.

Шаг 1: Обновите файл airflow.cfg

Чтобы интегрировать HashiCorp Vault с Airflow таким образом, чтобы последний извлекал соединения и переменные из первого, нам нужно указать VaultBackend как backend в разделе [secrets] раздела airflow.cfg.

[secrets]
backend = airflow.providers.hashicorp.secrets.vault.VaultBackend
backend_kwargs = {
    "connections_path": "connections",
    "variables_path": "variables",
    "url": "http://127.0.0.1:8200",
    "mount_point": "airflow",
}

В приведенной выше конфигурации предполагается, что ваши соединения Airflow хранятся в виде секретов в airflow mount_path и по пути connections (т. е. airflow/connections). Точно так же ваши переменные хранятся в airflow/variables.

Вам также следует убедиться, что вы указали дополнительные параметры, которые позволят вам выполнять аутентификацию между Airflow и Vault. Например, вам может понадобиться передать аргументы approle, role_id и secret_id (или, возможно, аргумент token). Полный список доступных аргументов вы можете посмотреть здесь.

Шаг 2. Добавьте соединения в качестве секретов в Vault

Хранилище подключений должно храниться в connections_path, указанном в конфигурации Airflow, как показано на предыдущем шаге.

Для каждого подключения вам нужно будет создать один секрет Vault, содержащий хотя бы один из следующих ключей:

  • conn_id (str) – идентификатор подключения.
  • conn_type (str) — тип подключения.
  • description (str) — описание подключения.
  • host (str) -- хост.
  • login (str) -- Логин.
  • password (str) -- пароль.
  • schema (str) -- схема.
  • port (int) — номер порта.
  • extra (Union[str, dict]) – дополнительные метаданные. Здесь можно сохранить нестандартные данные, такие как приватные/SSH-ключи. Объект в формате JSON.
  • uri (str) -- адрес URI, описывающий параметры подключения.

Это связано с тем, что содержимое секретов должно совпадать с предоставленным, должно совпадать с ожидаемыми параметрами для класса airflow.models.connections.Connection.

Шаг 3. Добавьте переменные в качестве секретов хранилища.

Теперь, переходя к переменным, вы должны быть немного осторожны при добавлении их в качестве секретов в хранилище, поскольку ожидаемый формат может быть не таким простым, как вы обычно ожидаете.

Предположим, что в airflow.cfg вы указали variables_path как variables, а mount_point как airflow. Если вы хотите сохранить переменную с именем my_var, имеющую значение hello, вам нужно будет сохранить секрет как:

vault kv put airflow/variables/my_var value=hello

Обратите внимание, что секрет Key равен value, а секрет Value равен hello!

Шаг 4. Доступ к соединениям в группах обеспечения доступности баз данных Airflow

Вы можете использовать следующий код, чтобы получить доступ к соединениям Airflow, которые хранятся как секреты хранилища, и просмотреть их детали:

import json 
import logging  
from airflow.hooks.base_hook import BaseHook   
conn = BaseHook.get_connection('secret_name') 
logging.info(     
    f'Login: {conn.login}'     
    f'Password: {conn.password}'     
    f'URI: {conn.get_uri()}'     
    f'Host: {conn.host}'     
    f'Extra: " {json.loads(conn.get_extra())}'   
    # ... 
)

Обратите внимание, что сначала будут выполняться поиски в секретах Vault, затем в переменных среды, а затем в хранилище метаданных (т. е. в соединениях, добавленных через пользовательский интерфейс Airflow). Этот порядок поиска не настраивается.

Шаг 5. Доступ к переменным в группах DAG Airflow

Аналогичным образом следующий фрагмент кода поможет вам получить значения переменных, хранящихся в Vault:

import logging  
from airflow.models import Variable   
my_var = Variable.get('var_name') 
logging.info(f'var_name value: {my_var}')

Шаг 6. Тестирование интеграции хранилища с помощью DAG

В приведенном ниже фрагменте кода вы можете найти пример Airflow DAG, который вы можете использовать, чтобы проверить, может ли Airflow правильно читать переменные и соединения из хранилища, указанного в файле airflow.cfg.

import logging 
from datetime import datetime  
from airflow import DAG 
from airflow.models import Variable 
from airflow.hooks.base_hook import BaseHook 
from airflow.operators.python_operator import PythonOperator

def get_secrets(**kwargs):
     
    # Test connections   
    conn = BaseHook.get_connection(kwargs['my_conn_id'])
    logging.info(
        f"Password: {conn.password}, Login: {conn.login}, "
        f"URI: {conn.get_uri()}, Host: {conn.host}"
    )        
    # Test variables     
    test_var = Variable.get(kwargs['var_name'])
    logging.info(f'my_var_name: {test_var}')
with DAG(   
    'test_vault_connection',    
    start_date=datetime(2020, 1, 1),    
    schedule_interval=None
) as dag:      
    test_task = PythonOperator(         
        task_id='test-task',         
        python_callable=get_secrets,         
        op_kwargs={
            'my_conn_id': 'connection_to_test',
            'var_name': 'my_test_var',
        },     
    )

Последние мысли

В сегодняшнем руководстве мы продемонстрировали, как настроить хранилище HashiCorp, чтобы оно использовалось Apache Airflow в качестве основного внутреннего хранилища секретов. Затем мы продемонстрировали, как добавлять соединения и секреты в Vault.

Наконец, мы рассмотрели несколько практических примеров, демонстрирующих, как можно программно получать соединения и переменные из Vault и использовать их в группах обеспечения доступности баз данных Airflow.

Стать участником и читать все истории на Medium. Ваш членский взнос напрямую поддерживает меня и других писателей, которых вы читаете. Вы также получите полный доступ ко всем историям на Medium.



Статьи по теме, которые вам также могут понравиться