Apache Airflow to obecnie jedno z najpopularniejszych narzędzi służących do zdecentralizowania cyklicznie uruchamianych zadań w postaci workflow/pipeline. Za pomocą kodu definiujemy przepływy pracy i planujemy ich wykonanie. Dostępny lekki interfejs API jak i interfejs graficzny (WebUI) co daje możliwość wizualizacji nawet dość skomplikowanych diagramów i monitorowania ich działania co przekłada się na łatwiejsze rozwiązywanie problemów. Mamy do dyspozycji dziennik, historia zadań i szablony jinja co znacząco rozszerza możliwości kodu. Mamy jedno miejsce, gdzie przechowujemy nasz kod i jego wyniki - rejestr działania naszych zadań - znacząco to ułatwia, gdyż samo opisanie tych przepływów jako kod daje większą swobodę w utrzymywaniu, wersjonowaniu i współdzieleniu naszych zadań.
W środowiskach produkcyjnych potrafi uruchomić 3500 sparkowych jobów dziennie na 50 klastrach Apache Hadoop źródło - duża skala, robi wrażenie.
Apache Airflow udowodnił, że można budować przepływy pracy bez skomplikowanych plików konfiguracyjnych lub szczegółowych definicji DAG.
Jak powstał Airflow?
Projekt Airflow wystartował w październiku 2014 roku dzięki Maxime Beauchemin w Airbnb. Od początku było to oprogramowanie typu open source i od czerwca 2015 roku oficjalnie hostowane na GitHub na profilu Airbnb. Projekt dołączył do programu Inkubatora Apache Software Foundation. W marcu 2016 r. Fundacja ogłosiła Apache Airflow jako projekt “najwyższego poziomu” (ang. first level) w styczniu 2019 r.
Warto wspomnieć o sponsorach tego projektu, m. in.
Comcast | Pineapple Fund | Microsoft | |
LeaseWeb | Tencent | ||
Amazon Web Services | Cloudera |
Pozostali to: ARM, Bloomberg, Handshake, Huawei, IBM, Indeed, Union Investment, Workday jak i anonimowi darczyńcy.
Zalety
Główny język to Python
Python jest użyty do tworzenia kodu przepływu pracy w tym formatów daty i godziny do planowania zadań i pętli do dynamicznego generowania zadań.
Interfejs graficzny
Monitoruje, planuje i zarządza przepływami pracy za pomocą interfejsu webowego, gdzie mamy dostęp do w statusu ukończonych i bieżących zadań wraz z wglądem w ich logi.
Duża liczba zintegrowanych usług
Dzięki plug-and-play mamy ułatwione zadanie, aby móc się połączyć z pewnymi usługami udostępnionymi przez np. dostawców chmurowych AWS, GCP czy Azure.
Łatwość
Wystarcza wiedza o języku Python, aby móc już transferować dane, zarządzać infrastrukturą czy nawet budować modele machine learning.
Projekt Open Source
Airflow ma wielu aktywnych użytkowników, którzy chętnie dzielą się swoimi doświadczeniami. Zalet OS jest wiele, więc chętnie z tego korzystamy :).
Architektura
Ogólna architektura przedstawianego rozwiązania składa się z poniższych elementów:
web server | master repo | scheduler | code repo | metadata database | workers |
[ PostgreSQL ] [ MariaDB ]
____/_____________/_ _ _ _ _
|
| A I R F L O W
|
[ webserver ] [ executor ]
[ repo ] scheduler + worker
code + master
Wybrane elementy systemu i pojęcia
DAG Directed Acyclic Graph
Najłatwiej tłumaczyć to sobie jako zadania a grupa takich zadań wykonuje już jakąś pracę. Przykładem może być zadanie Sparkowe, ETL, ML, CI, etc.
Jinja szablony
Wbudowane szablony pozwalają na używanie wielu zmiennych czy makr (np. szczegóły zadania, znacznik czasu wykonania) podczas wykonywania zadania. Ogólnie pobierają informację o środowisku.
API zaimplementowane w Pythonie
Interfejs API może zostać wykorzystany do m. in. bardziej jawnej kontroli procesów.
Czas i Planowanie
W Airflow nie ma możliwości uruchomienia jakiegokolwiek zadania DAG bez wskazania daty wykonania. Podobnie DAG nie może być uruchomiony kilkukrotnie w tym samym czasie tj. dla tej samej daty wykonania. Jeśli istnieje konieczność uruchomienia wielu instancji zadania to (bez kreatywności - potrzeba matką wynalazków;)) nie ma takiej możliwości. Ciekawsze jest to, że data wykonania execution_date
to nie jest to samo co data rozpoczęcia zadania - należy o tym pamiętać. Data wykonania to miejsce w czasie od którego może zostać uruchomione zadanie DAG.
Na poniższym wykresie gwiazdki
oznaczają planowany czas wykonania zadania (interwał to codziennie o 5:05), kratka
wskazuje zdefiniowaną datę wykonania, dolar
oznacza kiedy uruchomiliśmy DAG a duży iks
kiedy zadanie się jeszcze nie mogło wykonać. (Tak, lubię ASCII art).
data (harmonogram)
uruchomienia [schedule_interval]
zadania DAG '5 5 * * '
| |
_ _ _ | _ ________________________|_____________ _ _
| | | | | | | | |
- -X-$---X---#-*-----*-----*-----*-----*-----*- - - >
| (oś czasu)
'{{ ds }}'
[execution_date]
(data wykonania)
W przypadku, gdy nie chcemy planowania wyłączamy możliwość za pomocą schedule_interval=None
. Jeśli mamy cykliczność naszego zadania DAG to możemy użyć adnotacji podobnie jak w linuksowym cronie - przykład dla oby dwóch form w tabelce.
opisowo | jak w CRONie | interwal |
---|---|---|
None | brak zapisu | Brak powtarzalności |
@once | brak zapisu | Uruchamiamy tylko raz |
@hourly | 0 * * * * | Godzina |
@daily | 0 0 * * * | Dzień |
@weekly | 0 0 * * | Tydzień |
@monthly | 0 0 1 * * | Miesiąc |
@yearly | 0 0 1 1 * | Rok |
Do dyspozycji posiadamy jeszcze określenie punktu początkowego start_date
i końcowego end_date
na osi czasu. Data początkowa niekoniecznie musi być wykonana, gdy pierwszy przebieg DAG zostanie wykonany, ponieważ
Domyślne zmienne
Zmienne określające czas w większości się duplikują a wszystko za sprawą postfiksa _nodash
, który wprowadza inny zapis. Dla przykładu zapis {{ ds }}
i {{ ds_nodash }}
wskazuje na tą samą zmienną i wartość z tą różnicą, że wartość ma dwa różne zapisy. Przykłady w tabelce.
ds | YYYY-MM-DD |
---|---|
Przykład | 2020-03-03 |
ds_nodash | YYYYMMDD |
---|---|
Przykład | 20200303 |
Pod tabelką opisałem ważniejsze cechy.
Zmienne | Opis |
---|---|
{{ ds }} {{ ds_nodash }} | Data wykonania |
{{ prev_ds }} {{ prev_ds_nodash }} | Jeśli istnieje - wcześniejsza data wykonania |
{{ next_ds }} {{ next_ds_nodash }} | Jeśli istnieje to kolejny zapis czasu kiedy jest planowane uruchomienie |
{{ yesterday_ds }} {{ yesterday_ds_nodash }} | Dzień wcześniej |
{{ tomorrow_ds }} {{ tomorrow_ds_nodash }} | Kolejny dzień |
{{ ts }} {{ ts_nodash }} | Dokładniejszy czas w formacie ISO |
Dla prev_ds
/ prev_ds_nodash
wcześniejszą datę możemy otrzymać, jeśli był wprowadzony interwal w schedule_interval
np. @daily
- tu zapis jaki powinien się pojawić to 2020-03-02
/ 20200302
. Tak samo to wygląda dla przyszłych dat a więc dla next_ds
/ next_ds_nodash
. Należy zauważyć: jeżeli Airflow nie jest w stanie wyświetlić właściwych wartości to przekaże None
.
Format ISO:
- dla
ts
: 2020-03-03T00:00:00+01:00 - wzorowy zapis, - dla
ts_nodash
: 20200303T0000000100 - usunięte wszystkie znaki, które nie są alfanumeryczne i brak informacji o strefie czasowej, - dla
ts_nodash_with_tz
: 20200303T0000000100 - usunięte wszystkie znaki, które nie są alfanumeryczne.
Operatory
Każde zadanie jest zbudowane za pomocą operatorów - można wykorzystać już istniejące (domyślne wbudowane, zobacz https://github.com/apache/airflow/tree/master/airflow/operators) lub napisać własne. Operatory określają co zostanie wykonane przy uruchomieniu zadania. Podzielić je można na kilka grup:
BashOperator | Dingding Operators | Google Cloud Operators | |
Papermill | PythonOperator | Cross-DAG Dependencies |
Przykładem może być: https://airflow.apache.org/docs/stable/_modules/airflow/example_dags/example_bash_operator.html. Operator można połączyć z szablonami co znacząco rozszerza jego produktywność. Nie bez znaczenia jest spacja na końcu polecenia, bez niej Python nie może odczytać szablonu.
Błędna składnia
passion0x00 = BashOperator(
task_id=`bigdata`,
bash_command="bigdatapassion.sh",
dag=dag)
Właściwa składnia (z odstępem na końcu polecenia)
passion0x00 = BashOperator(
task_id=`bigdata`,
bash_command="bigdatapassion.sh ",
dag=dag)
Bardzo szybki start
Nie musimy instalować, możemy skorzystać z już istniejących rozwiązań SaaS dostępnych w chmurze - np. Google Cloud Platform (GCP) (https://cloud.google.com/composer/docs/how-to/accessing/airflow-web-interface) lub lokalnie uruchomić obraz Docker’a - https://github.com/puckel/docker-airflow.
Instalacja
Zalecanym sposobem wdrożenia środowiska Airflow jest instalacja za pomocą menedżera pakietów pip
.
pip install apache-airflow
Powyższe polecenie zainstaluje minimalny zestaw pakietów potrzebny do uruchomienia środowiska. Fakultatywnie dla naszego środowiska można domieszać takie składniki jak integracja z bazami danych (np. PostgreSQL, MariaDB, Microsoft SQL Server), integracje z platformami autoryzacji (np. LDAP, Google Auth, GitHub Auth), czy z środowiskami chmurowymi (np. Amazon Web Services, Microsoft Azure, Google Cloud Platform). Przykład dla zdalnego wykonywania zadań za pomocą połączenia SSH:
pip install 'apache-airflow[ssh]'
Do dyspozycji mam ponad 30 dodatkowych pakietów umożliwiające nam rozwinięcie funkcjonalności naszej instancji Airflow. Należy zwrócić uwagę, że dla środowisk produkcyjnych należy posłużyć się zewnętrzną bazą danych (np. PostgreSQL lub MariaDB/MySQL) a dla testowych pozostać można przy domyślnej SQLite.
Po instalacji wymagana jest inicjalizacja bazy danych - nie ma znaczenia czy to dla SQLite czy innych zewnętrznych.
airflow initdb
Połączenie z bazami danych realizowane jest za pomocą biblioteki SqlAlchemy (udostępnia funkcjonalność SQL bez potrzeby pisania bezpośrednio zapytań do bazy za pomocą tradycyjnego SQLa, wspiera ponad 25 typów połączeń do baz danych), dzięki czemu Airflow jest w stanie korzystać z dowolnej bazy obsługującej backend SqlAlchemy.
Jak ubogacić nasze środowisko?
Z czasem nasze środowisko może się skomplikować, możemy je próbować posprzątać za pomocą:
clear-missing-dags | Okresowo czyści wpisy w tabeli DAG, dla których nie ma już odpowiedniego pliku Python |
db-cleanup | Dbanie o porządek w MetaStore czyszcząc DagRun, TaskInstance, Log, XCom, Job DB, wpisy SlaMiss |
kill-halted-tasks | Cyklicznie wyszukuje zadania, które są uruchomione a nie ma ich w bazie danych |
log-cleanup | Czyścimy dziennik zadań, aby nie utrzymywać jego zbyt dużego rozmiaru |
delete-broken-dags | Czyszczenie tabeli ImportErrora, uruchamiamy aby okresowo usuwać pliki DAG i czyścić wpisy w tabeli ImportError dla DAG, których Airflow nie może poprawnie przeanalizować ani zaimportować |
Jak nie Airflow to co?
W świecie Big Data jest mnogość alternatyw - zestawienie (poglądowe posortowane alfabetycznie) w tabelce.
Nazwa | Adres / Opis |
---|---|
Adage | https://github.com/yadage/adage |
Argo | https://argoproj.github.io/argo/ |
Aurora | http://aurora.apache.org/ |
Azkaban | https://azkaban.github.io/ |
Azure Data Factory | https://docs.microsoft.com/en-us/azure/data-factory |
Chronos | https://mesos.github.io/chronos/ |
Falcon | http://falcon.apache.org/ |
Kedro | https://github.com/quantumblacklabs/kedro |
Luigi | https://github.com/spotify/luigi |
NiFi | https://nifi.apache.org/ |
Oozie | http://oozie.apache.org/ |
Pinball | https://github.com/pinterest/pinball |
Schedoscope | https://github.com/ottogroup/schedoscope |
Sparrow Scheduler | https://github.com/radlab/sparrow |