Big Data Passion

Big Data Passion

Strona tworzona przez pasjonatów i praktyków Big Data

Marcin Wojtczak

W środowiskach Big Data proces przejmowania jest często elementem całości przepływu ETL. Owe przejmowanie danych rozumiemy jako pobieranie treści z takich źródeł jak bazy danych czy pliki. Bardziej złożonymi formami (w większości przypadków) to źródła rzadziej używane na co dzień a więc NoSQL, maile, API, IoT, których skala, różnorodność i złożoność można opisać w dwóch słowach: Volume i Variety. Przy tak dużym surowcu danych naturalną decyzją jest przejście z modelu przetwarzania wsadowego na tryb czasu rzeczywistego (np. przy użyciu narzędzia Kafka). Dochodzimy do kolejnego V jakim jest Velocity: czas, prędkość i zmienność. Czwartym elementem opisywanych danych jest Veracity: jakość, pewność i pochodzenie. I tu wkradają się kolejne wyzwania, gdzie należy stawić czoła: niedokładności danych, niekompletności, niespójności, wątpliwej (nieopisanej) wiarygodności czy nieprawdziwości. Wynik eksploracji danych jest mocno zależny od ich jakości.

Apache Flume i podobne rozwiązania

Flume to projekt Apache Software Foundation, jest opisany jako usługa agregowania danych, która jest: rozproszona, niezawodna i dostępna. Flume jest bardzo elastyczny (umożliwia wiele idei wdrażania) i umożliwia przesyłanie dużych ilości danych (ruch sieciowy, dane społecznościowe, logi) co daje dużo możliwości dostosowania do środowiska. Należy uwypuklić dużą konfigurowalność narzędzia, aplikacja nie ogranicza się tylko do agregacji danych. Przykład, jeśli musimy wdrożyć dane dziennika (tekstowe) do Hadoopa/HDFS to Flume jest odpowiednim narzędziem.

Ograniczeniem wielkości tego bytu jest (technicznie) możliwość zapisu go w przestrzeni dyskowej czy pamięci. Należy zwrócić uwagę, że aplikacja stworzona jest do ciągłego przetwarzania danych, lecz brak tej stabilności strumienia danych nie powoduje problemów w jej funkcjonowaniu.

Warto zarekomendować, że ze względu na swą łatwość konfiguracji Flume jest uwielbianym konstruktorem logów a uznawanie go za ‘realtime data processors’ jest jednym z jego funkcjonalności. Popularność zawdzięcza faktowi, że było to jedne z pierwszych narzędzi do gromadzenia danych w Hadoopie. Flume też ma swoją smutniejszą stronę, jest wypierany przez inne narzędzia (np. takie jak NiFi) i zasadność wdrożenia tej technologi należy poprzedzić analizą a więc zainwestować czas na prześledzenie innych dostępnych rozwiązań.

Podobne technologie to:

  • Sqoop (hadoop <-> RDBMS)
  • Oozie (natywne hadoopowe narzędzie do uporządkowania ETL)
  • NiFi (zarządza i kontroluje przepływy z obsługą myszki - GUI)

Technicznie o Flume

Flume jest zaprojektowany do transportu i przetwarzania regularnie generowanych danych zdarzeń w relatywnie stabilnych, potencjalnie złożonych topologiach. Jak już jesteśmy przy zdarzeniach to je zdefiniujmy jako ciąg znaków (bajtów). Przykładem może być wcześniej wspomniany wpisu logu (tekst) lub obrazek. Reguralnie generowane dane możemy przedstawić jako strumienie danych.

Wymagania systemu ograniczają się tylko do:

  • jdk 1.8+
  • większej ilości pamięci - w zależności od wielkości przetwarzanych zasobów
  • powierzchnia dyskowa na składowanie danych

Przepływ danych

Flum definiuje 3 parametry, które trywialnie można je opisać wg ich przeznaczenia:

Source

  • konfiguracja danych wejściowych

Channel

  • proces przetwarzania (przeskok)

Sink

  • rozdzielenie danych wyjściowych

Składnia w pliku konfiguracyjnym

Aranżacja w pliku:

<Agent>.sources.<Source>.<someProperty> = <someValue>
<Agent>.channel.<Channel>.<someProperty> = <someValue>
<Agent>.sources.<Sink>.<someProperty> = <someValue>

Podsumowanie teorii

Zdarzenie Flume jest zdefiniowane jako jednostka przepływu danych posiadająca ładunek bajtów i opcjonalny zestaw atrybutów ciągu. Agent Flume to proces (JVM), który hostuje komponenty, przez które zdarzenia przepływają ze źródła zewnętrznego do następnego miejsca docelowego (przeskoku). Kropka.

Przygotowanie środowiska

Posłużymy się oraclowym JDK8 (opisane w poprzednich artykułach, więc nie powielamy treści) i Flume w wersji 1.9.0, gdzie możemy pobrać ze strony ftp.man.poznan.pl. Zwróćmy uwagę na weryfikację pobranych treści, ale po kolei.

Po właściwej instalacji Java Developer Kit powinniśmy otrzymać wynik polecenia sprawdzającego wersje javy java -version tak jak poniżej.

java version "1.8.0_221"
Java(TM) SE Runtime Environment (build 1.8.0_221-b11)
Java HotSpot(TM) 64-Bit Server VM (build 25.221-b11, mixed mode)

Na maszynie, która będzie nam służyła jako agent tworzymy katalog /opt-dev i w nim pobieramy aplikacje.

root@host104:~# mkdir /opt-dev
root@host104:~# cd /opt-dev
root@host104:/opt-dev# wget http://ftp.man.poznan.pl/apache/flume/1.9.0/apache-flume-1.9.0-bin.tar.gz
--2019-10-21 20:54:57--  http://ftp.man.poznan.pl/apache/flume/1.9.0/apache-flume-1.9.0-bin.tar.gz
Resolving ftp.man.poznan.pl (ftp.man.poznan.pl)... 150.254.173.17, 2001:808::173:17
Connecting to ftp.man.poznan.pl (ftp.man.poznan.pl)|150.254.173.17|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 67938106 (65M) [application/x-gzip]
Saving to: 'apache-flume-1.9.0-bin.tar.gz'

apache-flume-1.9.0-bin.tar 100%[======================================>]  64.79M  4.89MB/s    in 13s     

2019-10-21 20:55:10 (4.86 MB/s) - 'apache-flume-1.9.0-bin.tar.gz' saved [67938106/67938106]

Teraz pobieramy pliki, które posłużą nam jako zbiory do porównania pobranej binarki z flumem oraz klucz.

# wget -q http://www.apache.org/dist/flume/1.9.0/apache-flume-1.9.0-bin.tar.gz.sha512
# wget -q http://www.apache.org/dist/flume/1.9.0/apache-flume-1.9.0-bin.tar.gz.asc
# wget -q http://www.apache.org/dist/flume/KEYS -O asf_keys

Korzystając z polecenia wget posłużyliśmy się przełącznikiem -q tylko dlatego, aby zaśmiecać wpisów zbędnymi informacjami.

Nasz katalog powinien posiadać trzy pliki

root@host104:/opt-dev# tree -a /opt-dev/
/opt-dev/
|-- apache-flume-1.9.0-bin.tar.gz
|-- apache-flume-1.9.0-bin.tar.gz.asc
|-- apache-flume-1.9.0-bin.tar.gz.sha512
`-- asf_keys

0 directories, 4 files

Mamy już wszystko, importujemy klucze deweloperów.

root@host104:/opt-dev# gpg --import asf_keys 
gpg: key CD14AD58901791C2: public key "Eric Sammer <esammer@apache.org>" imported
...
gpg: Total number processed: 9
gpg:               imported: 9

Dla każdego autora mamy zapis mówiący o zaimportowaniu public key "Ktoś Tam <jego@e-mail>" imported" jego klucza do bazy. Zauważmy, że przy kolejnym imporcie mamy już informację, że klucz nie został ponownie dodany not changed - dzieje się to w sytuacji, gdy już wcześniej w naszej bazie mieliśmy już wpis, przykład poniżej.

root@host104:/opt-dev# gpg --import asf_keys 
gpg: key CD14AD58901791C2: "Eric Sammer <esammer@apache.org>" not changed
...
gpg: Total number processed: 9
gpg:              unchanged: 9

Sposobów weryfikacji jest kilka i zostały opublikowane na stronie pobierania. Czas rozpakować paczkę i powinniśmy otrzymać podobną strukturę jak poniżej.

root@host104:/opt-dev# tar zxf apache-flume-1.9.0-bin.tar.gz
root@host104:/opt-dev# tree -L 2 -a /opt-dev/
/opt-dev/
|-- apache-flume-1.9.0-bin
|   |-- bin
|   |-- CHANGELOG
|   |-- conf
|   |-- DEVNOTES
|   |-- doap_Flume.rdf
|   |-- docs
|   |-- lib
|   |-- LICENSE
|   |-- NOTICE
|   |-- README.md
|   |-- RELEASE-NOTES
|   `-- tools
|-- apache-flume-1.9.0-bin.tar.gz
|-- apache-flume-1.9.0-bin.tar.gz.asc
|-- apache-flume-1.9.0-bin.tar.gz.sha512
`-- asf_keys

6 directories, 11 files

W katalogu conf znajdują się pliki konfiguracyjne, które możemy wykorzystać. Tworzymy kopie plików z dopiskiem .template.

# cd apache-flume-1.9.0-bin
# pwd
/opt-dev/apache-flume-1.9.0-bin
# cp conf/flume-conf.properties.template conf/flume-conf.properties
# cp conf/flume-env.ps1.template conf/flume-env.ps1
# cp conf/flume-env.sh.template conf/flume-env.sh

Po wskazanych działaniach zawartość katalogu konfiguracji powinna być podobna do poniższego schematu.

root@host104:/opt-dev/apache-flume-1.9.0-bin# tree conf/
conf/
|-- flume-conf.properties
|-- flume-conf.properties.template
|-- flume-env.ps1
|-- flume-env.ps1.template
|-- flume-env.sh
|-- flume-env.sh.template
`-- log4j.properties

0 directories, 7 files

Na potrzeby tego ćwiczenie należy utworzyć strukturę katalogów na hdfs. Owe miejsce będzie służyło do zapisów wyników działania flume.

# hdfs dfs -mkdir -p /bigdata/flume/cw1
# hdfs dfs -ls -R /
drwxr-xr-x   - root supergroup          0 2019-10-21 22:34 /bigdata
drwxr-xr-x   - root supergroup          0 2019-10-21 22:34 /bigdata/flume
drwxr-xr-x   - root supergroup          0 2019-10-21 22:34 /bigdata/flume/cw1

Dodajemy pierwsze zapiski mówiące co i gdzie należy wrzucać dane do flume-conf.properties

passion0x00.sources                              = src0x00
passion0x00.sinks                                = snk0x00
passion0x00.channels                             = chn0x00
passion0x00.sources.src0x00.channels             = chn0x00
passion0x00.sinks.snk0x00.channel                = chn0x00
passion0x00.sinks.snk0x00.type                   = HDFS
passion0x00.sinks.snk0x00.fileType               = DataStream
passion0x00.sinks.snk0x00.hdfs.path              = hdfs://h3host200.bigdatalinux.com:8020/bigdata/flume/cw1
passion0x00.sinks.snk0x00.hdfs.filePrefix        = infoout
passion0x00.sinks.snk0x00.hdfs.writeFormat       = Text
passion0x00.sinks.snk0x00.hdfs.rollInterval      = 0
passion0x00.sinks.snk0x00.hdfs.rollSize          = 0
passion0x00.sinks.snk0x00.hdfs.rollCount         = 1000
passion0x00.sinks.snk0x00.hdfs.batchSize         = 100
passion0x00.sinks.snk0x00.hdfs.useLocalTimeStamp = true
passion0x00.sinks.snk0x00.hdfs.callTimeout       = 180000
passion0x00.channels.chn0x00.type                = memory
passion0x00.channels.chn0x00.capacity            = 256
passion0x00.channels.chn0x00.transactionCapacity = 128
passion0x00.sources.src0x00.type                 = org.apache.flume.source.StressSource
passion0x00.sources.src0x00.size                 = 512
passion0x00.sources.src0x00.maxTotalEvents       = 2048
passion0x00.sources.src0x00.batchSize            = 1
passion0x00.sources.src0x00.maxEventsPerSecond   = 128 

Powyższa konfiguracja dla danych wejściowych ustawia losowe znaki z dużym limitem, aby nie zapełnić zbędnymi danymi HDFS-a. Co prawda użyłem org.apache.flume.source.StressSource, które służy do przeprowadzania testów wydajnościowych, lecz takie rozwiązanie jest najprostsze tj. out-of-the-box.

Zapis następuje na katalog na Hadoopie wraz z kilkoma parametrami, które są opcjonalne. Pomocny może się okazać passion0x00.sinks.snk0x00.hdfs.callTimeout w którym określamy czas oczekiwania na potwierdzenie transakcji zapisu.

Pozostaje nam tylko uruchomić na kilka minut a następnie zabić agenta flume. Gdyby wystąpiły problemy z wystartowaniem usługi można się wspomóc dodatkowym przełącznikiem flume.root.looger=DEBUG,console.

time bin/flume-ng agent \
     -n passion0x00 -c conf \
     -f conf/flume-conf.properties

W katalogu /bigdata/flume/cw1 powinniśmy zobaczyć kilka plików.

$ hdfs dfs -ls /bigdata/flume/cw1
Found 3 items
-rw-r--r--   2 hdfs hdfs     531286 2019-10-22 23:31 /bigdata/flume/cw1/infoout.1571779877530
-rw-r--r--   2 hdfs hdfs     531286 2019-10-22 23:31 /bigdata/flume/cw1/infoout.1571779877531
-rw-r--r--   2 hdfs hdfs      25594 2019-10-22 23:31 /bigdata/flume/cw1/infoout.1571779877532.tmp

Nie działa?

W moim przypadku miałem problemy z połączeniem do HDFS-a w wersji 3.2.1 (wydanie z 22. września 2019). Problem wynikał z m. in. z brakiem nowszej wersji pliku jar odpowiedzialnego za komunikacje z DFS. Test przeprowadzone na hadoop 2.9.2 (19. listopada 2018) dały już pożądane rezultaty. Warto zaznajomić sie z https://www.ietf.org/rfc/rfc2119.txt a następnie ze zdaniem Apache Hadoop revisions SHOULD retain binary compatability such that end-user applications continue to work without any modifications - to daje nam ogólny pogląd na nasze problemy :)

Na zakończenie. Zadanie było wykonywane na kilku środowiskach, stąd nie należy sie sugerować nazwa hosta czy GID/UID na wylistowanych przykładach.

comments powered by Disqus

Ostatnie wpisy

Zobacz więcej

Kategorie

About