Конечно, есть библиотека, которая реализует адаптер ClickHouse к Airflow, НО она не работает с новым Aiflow… Да и не поддерживается уже 2 года.
Поэтому расскажу, как подключить его вручную.
Настройка Connections
- Сначала заходим во вкладку Connections
- Там создаем Connection и прописываем доступы до clickhouse
- Если нет password или login просто оставляем пустыми
- Port обязательно 8123 (если вы его не меняли)
- Назовем для простоты ‘clickhouse’
Clickhouse_driver
Устанавливаем clickhouse_driver
pip install clickhouse_driver
Session
Чтобы создать сессию, я реализовал простенький Singleton класс, его код должен быть понятен:
from clickhouse_driver import Client class ClickHouseConnection: connection = None def get_connection(connection_name='clickhouse'): if ClickHouseConnection.connection: return connection db_props = BaseHook.get_connection(connection_name) ClickHouseConnection.connection = Client(db_props.host) return ClickHouseConnection.connection
Exec
Теперь, чтобы использовать в коде просто получаем connection и делаем raw sql запросы:
def task(): ch_connection = ClickHouseConnection.get_connection() logging.info("CLIENT ADDED") ch_connection.execute("TRUNCATE TABLE") logging.info("TRUNCATED") ch_connection.execute("INSERT INTO ... VALUES", ...)
Единственное странное, что я заметил: не сохраняется None
значения.