Big Data Passion

Big Data Passion

Big Data Passion to strona dla pasjonatów szeroko rozumianego Big Data

Marcin Wojtczak

Apache Airflow to obecnie jedno z najpopularniejszych narzędzi służących do zdecentralizowania cyklicznie uruchamianych zadań w postaci workflow/pipeline. Za pomocą kodu definiujemy przepływy pracy i planujemy ich wykonanie. Dostępny lekki interfejs API jak i interfejs graficzny (WebUI) co daje możliwość wizualizacji nawet dość skomplikowanych diagramów i monitorowania ich działania co przekłada się na łatwiejsze rozwiązywanie problemów. Mamy do dyspozycji dziennik, historia zadań i szablony jinja co znacząco rozszerza możliwości kodu. Mamy jedno miejsce, gdzie przechowujemy nasz kod i jego wyniki - rejestr działania naszych zadań - znacząco to ułatwia, gdyż samo opisanie tych przepływów jako kod daje większą swobodę w utrzymywaniu, wersjonowaniu i współdzieleniu naszych zadań.

W środowiskach produkcyjnych potrafi uruchomić 3500 sparkowych jobów dziennie na 50 klastrach Apache Hadoop źródło - duża skala, robi wrażenie.

Apache Airflow udowodnił, że można budować przepływy pracy bez skomplikowanych plików konfiguracyjnych lub szczegółowych definicji DAG.

Jak powstał Airflow?

Projekt Airflow wystartował w październiku 2014 roku dzięki Maxime Beauchemin w Airbnb. Od początku było to oprogramowanie typu open source i od czerwca 2015 roku oficjalnie hostowane na GitHub na profilu Airbnb. Projekt dołączył do programu Inkubatora Apache Software Foundation. W marcu 2016 r. Fundacja ogłosiła Apache Airflow jako projekt “najwyższego poziomu” (ang. first level) w styczniu 2019 r.

Warto wspomnieć o sponsorach tego projektu, m. in.

Comcast Pineapple Fund Microsoft
Google LeaseWeb Tencent
Amazon Web Services Facebook Cloudera

Pozostali to: ARM, Bloomberg, Handshake, Huawei, IBM, Indeed, Union Investment, Workday jak i anonimowi darczyńcy.

Zalety

AirFlow Features BigDataPassion

Główny język to Python

Python jest użyty do tworzenia kodu przepływu pracy w tym formatów daty i godziny do planowania zadań i pętli do dynamicznego generowania zadań.

Interfejs graficzny

Monitoruje, planuje i zarządza przepływami pracy za pomocą interfejsu webowego, gdzie mamy dostęp do w statusu ukończonych i bieżących zadań wraz z wglądem w ich logi.

Duża liczba zintegrowanych usług

Dzięki plug-and-play mamy ułatwione zadanie, aby móc się połączyć z pewnymi usługami udostępnionymi przez np. dostawców chmurowych AWS, GCP czy Azure.

Łatwość

Wystarcza wiedza o języku Python, aby móc już transferować dane, zarządzać infrastrukturą czy nawet budować modele machine learning.

Projekt Open Source

Airflow ma wielu aktywnych użytkowników, którzy chętnie dzielą się swoimi doświadczeniami. Zalet OS jest wiele, więc chętnie z tego korzystamy :).

Architektura

Ogólna architektura przedstawianego rozwiązania składa się z poniższych elementów:

web server master repo scheduler code repo metadata database workers
      [ PostgreSQL ]  [ MariaDB ] 
      ____/_____________/_ _ _ _ _
     |
     |    A I R F L O W
     |          
  [ webserver ]  [ executor ]

    [ repo ]     scheduler + worker

 code + master  

Wybrane elementy systemu i pojęcia

DAG Directed Acyclic Graph

Najłatwiej tłumaczyć to sobie jako zadania a grupa takich zadań wykonuje już jakąś pracę. Przykładem może być zadanie Sparkowe, ETL, ML, CI, etc.

Jinja szablony

Wbudowane szablony pozwalają na używanie wielu zmiennych czy makr (np. szczegóły zadania, znacznik czasu wykonania) podczas wykonywania zadania. Ogólnie pobierają informację o środowisku.

API zaimplementowane w Pythonie

Interfejs API może zostać wykorzystany do m. in. bardziej jawnej kontroli procesów.

Czas i Planowanie

W Airflow nie ma możliwości uruchomienia jakiegokolwiek zadania DAG bez wskazania daty wykonania. Podobnie DAG nie może być uruchomiony kilkukrotnie w tym samym czasie tj. dla tej samej daty wykonania. Jeśli istnieje konieczność uruchomienia wielu instancji zadania to (bez kreatywności - potrzeba matką wynalazków;)) nie ma takiej możliwości. Ciekawsze jest to, że data wykonania execution_date to nie jest to samo co data rozpoczęcia zadania - należy o tym pamiętać. Data wykonania to miejsce w czasie od którego może zostać uruchomione zadanie DAG.

Na poniższym wykresie gwiazdki oznaczają planowany czas wykonania zadania (interwał to codziennie o 5:05), kratka wskazuje zdefiniowaną datę wykonania, dolar oznacza kiedy uruchomiliśmy DAG a duży iks kiedy zadanie się jeszcze nie mogło wykonać. (Tak, lubię ASCII art).

     data                  (harmonogram)
 uruchomienia           [schedule_interval]
 zadania  DAG                '5 5 * * '
      |                           |
_ _ _ | _ ________________________|_____________ _ _ 
    | |   |     |     |     |     |     |     |
 - -X-$---X---#-*-----*-----*-----*-----*-----*- - - >
              |                                 (oś czasu)
         '{{ ds }}'
      [execution_date]
      (data wykonania)

W przypadku, gdy nie chcemy planowania wyłączamy możliwość za pomocą schedule_interval=None. Jeśli mamy cykliczność naszego zadania DAG to możemy użyć adnotacji podobnie jak w linuksowym cronie - przykład dla oby dwóch form w tabelce.

opisowo jak w CRONie interwal
None brak zapisu Brak powtarzalności
@once brak zapisu Uruchamiamy tylko raz
@hourly 0 * * * * Godzina
@daily 0 0 * * * Dzień
@weekly 0 0 * * Tydzień
@monthly 0 0 1 * * Miesiąc
@yearly 0 0 1 1 * Rok

Do dyspozycji posiadamy jeszcze określenie punktu początkowego start_date i końcowego end_date na osi czasu. Data początkowa niekoniecznie musi być wykonana, gdy pierwszy przebieg DAG zostanie wykonany, ponieważ

Domyślne zmienne

Zmienne określające czas w większości się duplikują a wszystko za sprawą postfiksa _nodash, który wprowadza inny zapis. Dla przykładu zapis {{ ds }} i {{ ds_nodash }} wskazuje na tą samą zmienną i wartość z tą różnicą, że wartość ma dwa różne zapisy. Przykłady w tabelce.

ds YYYY-MM-DD
Przykład 2020-03-03
ds_nodash YYYYMMDD
Przykład 20200303

Pod tabelką opisałem ważniejsze cechy.

Zmienne Opis
{{ ds }} {{ ds_nodash }} Data wykonania
{{ prev_ds }} {{ prev_ds_nodash }} Jeśli istnieje - wcześniejsza data wykonania
{{ next_ds }} {{ next_ds_nodash }} Jeśli istnieje to kolejny zapis czasu kiedy jest planowane uruchomienie
{{ yesterday_ds }} {{ yesterday_ds_nodash }} Dzień wcześniej
{{ tomorrow_ds }} {{ tomorrow_ds_nodash }} Kolejny dzień
{{ ts }} {{ ts_nodash }} Dokładniejszy czas w formacie ISO

Dla prev_ds / prev_ds_nodash wcześniejszą datę możemy otrzymać, jeśli był wprowadzony interwal w schedule_interval np. @daily - tu zapis jaki powinien się pojawić to 2020-03-02 / 20200302. Tak samo to wygląda dla przyszłych dat a więc dla next_ds / next_ds_nodash. Należy zauważyć: jeżeli Airflow nie jest w stanie wyświetlić właściwych wartości to przekaże None.

Format ISO:

  • dla ts: 2020-03-03T00:00:00+01:00 - wzorowy zapis,
  • dla ts_nodash: 20200303T0000000100 - usunięte wszystkie znaki, które nie są alfanumeryczne i brak informacji o strefie czasowej,
  • dla ts_nodash_with_tz: 20200303T0000000100 - usunięte wszystkie znaki, które nie są alfanumeryczne.

Operatory

Każde zadanie jest zbudowane za pomocą operatorów - można wykorzystać już istniejące (domyślne wbudowane, zobacz https://github.com/apache/airflow/tree/master/airflow/operators) lub napisać własne. Operatory określają co zostanie wykonane przy uruchomieniu zadania. Podzielić je można na kilka grup:

BashOperator Dingding Operators Google Cloud Operators
Papermill PythonOperator Cross-DAG Dependencies

Przykładem może być: https://airflow.apache.org/docs/stable/_modules/airflow/example_dags/example_bash_operator.html. Operator można połączyć z szablonami co znacząco rozszerza jego produktywność. Nie bez znaczenia jest spacja na końcu polecenia, bez niej Python nie może odczytać szablonu.

Błędna składnia

passion0x00 = BashOperator(
  task_id=`bigdata`,
  bash_command="bigdatapassion.sh",
  dag=dag)

Właściwa składnia (z odstępem na końcu polecenia)

passion0x00 = BashOperator(
  task_id=`bigdata`,
  bash_command="bigdatapassion.sh ",
  dag=dag)

Bardzo szybki start

Nie musimy instalować, możemy skorzystać z już istniejących rozwiązań SaaS dostępnych w chmurze - np. Google Cloud Platform (GCP) (https://cloud.google.com/composer/docs/how-to/accessing/airflow-web-interface) lub lokalnie uruchomić obraz Docker’a - https://github.com/puckel/docker-airflow.

Instalacja

Zalecanym sposobem wdrożenia środowiska Airflow jest instalacja za pomocą menedżera pakietów pip.

pip install apache-airflow

Powyższe polecenie zainstaluje minimalny zestaw pakietów potrzebny do uruchomienia środowiska. Fakultatywnie dla naszego środowiska można domieszać takie składniki jak integracja z bazami danych (np. PostgreSQL, MariaDB, Microsoft SQL Server), integracje z platformami autoryzacji (np. LDAP, Google Auth, GitHub Auth), czy z środowiskami chmurowymi (np. Amazon Web Services, Microsoft Azure, Google Cloud Platform). Przykład dla zdalnego wykonywania zadań za pomocą połączenia SSH:

pip install 'apache-airflow[ssh]'

Do dyspozycji mam ponad 30 dodatkowych pakietów umożliwiające nam rozwinięcie funkcjonalności naszej instancji Airflow. Należy zwrócić uwagę, że dla środowisk produkcyjnych należy posłużyć się zewnętrzną bazą danych (np. PostgreSQL lub MariaDB/MySQL) a dla testowych pozostać można przy domyślnej SQLite.

Po instalacji wymagana jest inicjalizacja bazy danych - nie ma znaczenia czy to dla SQLite czy innych zewnętrznych.

airflow initdb

Połączenie z bazami danych realizowane jest za pomocą biblioteki SqlAlchemy (udostępnia funkcjonalność SQL bez potrzeby pisania bezpośrednio zapytań do bazy za pomocą tradycyjnego SQLa, wspiera ponad 25 typów połączeń do baz danych), dzięki czemu Airflow jest w stanie korzystać z dowolnej bazy obsługującej backend SqlAlchemy.

Jak ubogacić nasze środowisko?

Z czasem nasze środowisko może się skomplikować, możemy je próbować posprzątać za pomocą:

clear-missing-dags Okresowo czyści wpisy w tabeli DAG, dla których nie ma już odpowiedniego pliku Python
db-cleanup Dbanie o porządek w MetaStore czyszcząc DagRun, TaskInstance, Log, XCom, Job DB, wpisy SlaMiss
kill-halted-tasks Cyklicznie wyszukuje zadania, które są uruchomione a nie ma ich w bazie danych
log-cleanup Czyścimy dziennik zadań, aby nie utrzymywać jego zbyt dużego rozmiaru
delete-broken-dags Czyszczenie tabeli ImportErrora, uruchamiamy aby okresowo usuwać pliki DAG i czyścić wpisy w tabeli ImportError dla DAG, których Airflow nie może poprawnie przeanalizować ani zaimportować

Jak nie Airflow to co?

W świecie Big Data jest mnogość alternatyw - zestawienie (poglądowe posortowane alfabetycznie) w tabelce.

Nazwa Adres / Opis
Adage https://github.com/yadage/adage
Argo https://argoproj.github.io/argo/
Aurora http://aurora.apache.org/
Azkaban https://azkaban.github.io/
Azure Data Factory https://docs.microsoft.com/en-us/azure/data-factory
Chronos https://mesos.github.io/chronos/
Falcon http://falcon.apache.org/
Kedro https://github.com/quantumblacklabs/kedro
Luigi https://github.com/spotify/luigi
NiFi https://nifi.apache.org/
Oozie http://oozie.apache.org/
Pinball https://github.com/pinterest/pinball
Schedoscope https://github.com/ottogroup/schedoscope
Sparrow Scheduler https://github.com/radlab/sparrow
comments powered by Disqus

Ostatnie wpisy

Zobacz więcej

Kategorie

About