Big Data Passion

A site created by Big Data enthusiasts and practitioners

Radosław Szmit

Below is a list of useful Apache Kafka commands.

To make the commands easier to use, we rely on a few environment variables and aliases.

If we use the CLI via a Docker image, we can first verify that the image works and then set up aliases as below:

docker run --rm -it confluentinc/cp-kafka --version
docker run --rm -it confluentinc/cp-kafka kafka-topics --version

alias kafka-topics="docker run --rm -it confluentinc/cp-kafka kafka-topics"
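The remaining tools can be aliased the same way; a sketch, assuming the same confluentinc/cp-kafka image:

```shell
# Hypothetical additional aliases for the other CLI tools in the same image
alias kafka-console-consumer="docker run --rm -it confluentinc/cp-kafka kafka-console-consumer"
alias kafka-console-producer="docker run --rm -it confluentinc/cp-kafka kafka-console-producer"
alias kafka-consumer-groups="docker run --rm -it confluentinc/cp-kafka kafka-consumer-groups"
```

Note that in the Docker variant, local files such as a client.properties have to be mounted into the container (e.g. with `-v`) before the tools can see them.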

If we prefer to install the Kafka CLI locally instead, we download the package from the Apache Kafka website and unpack it into a directory of our choice, in my case ~/programs/kafka/

alias kafka-topics="~/programs/kafka/kafka_2.12-2.8.2/bin/kafka-topics.sh"
alias kafka-console-consumer="~/programs/kafka/kafka_2.12-2.8.2/bin/kafka-console-consumer.sh"
alias kafka-console-producer="~/programs/kafka/kafka_2.12-2.8.2/bin/kafka-console-producer.sh"
alias kafka-consumer-groups="~/programs/kafka/kafka_2.12-2.8.2/bin/kafka-consumer-groups.sh"

When using a Kafka cluster with secured access (certificates or Kerberos), most commands require passing client configuration, so we set an appropriate variable:

export KAFKA_CLIENT_CONFIG=client.properties
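What goes into that file depends on the cluster; a minimal sketch for SASL_SSL with PLAIN authentication, where the username, password, and truststore path are placeholders to adjust:

```shell
# Write a sample client.properties; all credentials and paths below are placeholders
cat > client.properties <<'EOF'
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
  username="myuser" \
  password="mypassword";
ssl.truststore.location=/path/to/truststore.jks
ssl.truststore.password=changeit
EOF
```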

If everything works, the version check should succeed:

kafka-topics --version

We also set a few variables used in the examples below:

export KAFKA_BROKER="localhost:9092"
export TOPIC=test-topic
export GROUP=radek

Topics

listing:

kafka-topics --bootstrap-server $KAFKA_BROKER --list

kafka-topics --bootstrap-server $KAFKA_BROKER --list \
  --command-config $KAFKA_CLIENT_CONFIG

creating:

kafka-topics --create --bootstrap-server $KAFKA_BROKER --topic $TOPIC

kafka-topics --create --bootstrap-server $KAFKA_BROKER --topic $TOPIC --config cleanup.policy=compact

kafka-topics --create --bootstrap-server $KAFKA_BROKER  --topic $TOPIC --replication-factor 3 --partitions 10
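Several topic-level configs can also be combined at creation time; a sketch, where the retention and message-size values are only examples:

```shell
# Example values: 7 days of retention, 1 MiB max message size
kafka-topics --create --bootstrap-server $KAFKA_BROKER --topic $TOPIC \
  --replication-factor 3 --partitions 10 \
  --config retention.ms=604800000 \
  --config max.message.bytes=1048576
```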

describing:

kafka-topics --describe --bootstrap-server $KAFKA_BROKER

kafka-topics --describe --bootstrap-server $KAFKA_BROKER --topic $TOPIC

kafka-topics --describe --bootstrap-server $KAFKA_BROKER --topic $TOPIC \
  --command-config $KAFKA_CLIENT_CONFIG

kafka-topics --zookeeper $KAFKA_ZOOKEEPER --describe --under-replicated-partitions

kafka-topics --describe --zookeeper $KAFKA_ZOOKEEPER --topic __consumer_offsets

deleting:

kafka-topics --bootstrap-server $KAFKA_BROKER --delete --topic $TOPIC

Topic deletion must be enabled in the broker settings (delete.topic.enable); otherwise the command only marks the topic for deletion.

altering:

kafka-topics --alter --zookeeper $KAFKA_ZOOKEEPER --topic $TOPIC --partitions 20
kafka-topics --alter --zookeeper $KAFKA_ZOOKEEPER --topic $TOPIC --replica-assignment 1003:1004:1005,0:1:2,0:1:2,2:1:0

Note that kafka-topics cannot change the replication factor of an existing topic (this also applies to __consumer_offsets); use the kafka-reassign-partitions tool with a reassignment JSON file instead.

Console producer and consumer

kafka-console-producer --broker-list $KAFKA_BROKER --topic $TOPIC

kafka-console-producer --broker-list $KAFKA_BROKER --topic $TOPIC --property "parse.key=true" --property "key.separator=|"

cat $TOPIC.json | kafka-console-producer --broker-list $KAFKA_BROKER --topic $TOPIC \
  --property "parse.key=true" --property "key.separator=|"

cat $TOPIC.json | kafka-console-producer --broker-list $KAFKA_BROKER --topic $TOPIC \
  --property "parse.key=true" --property "key.separator=|" \
  --property key.serializer=org.apache.kafka.common.serialization.IntegerSerializer \
  --property value.serializer=org.apache.kafka.common.serialization.StringSerializer

time for i in {1..10}; do
  echo $i'|{"message": "message'$i'"}' | \
  kafka-console-producer --broker-list $KAFKA_BROKER --topic $TOPIC \
  --property "parse.key=true" --property "key.separator=|"; 
done

time for i in {11..100}; do echo $i'|{"message": "message'$i'"}' | kafka-console-producer --broker-list $KAFKA_BROKER --topic $TOPIC --property "parse.key=true" --property "key.separator=|"; done

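The loop above starts a new producer process (and JVM) for every single message; generating all the lines first and piping them through one producer is far faster. A sketch:

```shell
# Generate all key|value lines first, then feed them to a single producer process
for i in {11..100}; do
  echo "$i"'|{"message": "message'"$i"'"}'
done | kafka-console-producer --broker-list $KAFKA_BROKER --topic $TOPIC \
  --property "parse.key=true" --property "key.separator=|"
```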
kafka-console-consumer --bootstrap-server $KAFKA_BROKER --topic $TOPIC --from-beginning

kafka-console-consumer --bootstrap-server $KAFKA_BROKER --topic $TOPIC --from-beginning \
  --property print.key=true --property print.value=true --property key.separator="|" \
  --consumer.config $KAFKA_CLIENT_CONFIG

kafka-console-consumer --bootstrap-server $KAFKA_BROKER --topic $TOPIC --from-beginning \
  --property print.key=true --property print.value=true --property key.separator="|" \
  --property key.deserializer=org.apache.kafka.common.serialization.IntegerDeserializer \
  --property value.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
  --consumer.config $KAFKA_CLIENT_CONFIG

kafka-console-consumer --bootstrap-server $KAFKA_BROKER --topic $TOPIC --from-beginning \
  --property print.key=true --property print.value=true --property key.separator="|" \
  --property key.deserializer=org.apache.kafka.common.serialization.LongDeserializer \
  --property value.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
  --consumer.config $KAFKA_CLIENT_CONFIG

kafka-console-consumer --bootstrap-server $KAFKA_BROKER \
    --topic test-topic-out \
    --from-beginning \
    --formatter kafka.tools.DefaultMessageFormatter \
    --property print.key=true \
    --property print.value=true \
    --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
    --property value.deserializer=org.apache.kafka.common.serialization.StringDeserializer

kafka-console-consumer --bootstrap-server $KAFKA_BROKER --topic $TOPIC --from-beginning \
  --property print.key=true --property print.value=true --property key.separator="|" \
  | tee -a $TOPIC.json

kafka-console-consumer --bootstrap-server $KAFKA_BROKER --topic $TOPIC --from-beginning \
  --property print.key=true --property print.value=true --property key.separator="|" \
  --consumer.config $KAFKA_CLIENT_CONFIG \
  | tee -a $TOPIC.json
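The console consumer can also read from a specific partition and offset, or cap the number of messages; a sketch using standard kafka-console-consumer flags (the partition and offset values are just examples):

```shell
# Read at most 10 messages from partition 0, starting at offset 5
kafka-console-consumer --bootstrap-server $KAFKA_BROKER --topic $TOPIC \
  --partition 0 --offset 5 --max-messages 10
```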

Consumer groups

kafka-consumer-groups --bootstrap-server $KAFKA_BROKER --list

kafka-consumer-groups --bootstrap-server $KAFKA_BROKER --describe --group $GROUP \
    --command-config $KAFKA_CLIENT_CONFIG

resetting offsets (without --execute this is only a dry run that prints the new offsets):

kafka-consumer-groups --bootstrap-server $KAFKA_BROKER --topic $TOPIC --reset-offsets --to-earliest --group $GROUP \
  --command-config $KAFKA_CLIENT_CONFIG

kafka-consumer-groups --bootstrap-server $KAFKA_BROKER --topic $TOPIC --reset-offsets --to-earliest --group $GROUP \
  --execute \
  --command-config $KAFKA_CLIENT_CONFIG
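Besides --to-earliest, reset-offsets supports other target strategies such as --to-latest, --shift-by, and --to-datetime; a sketch (the shift amount and date are examples):

```shell
# Move the group's offsets back by 10 messages on each partition
kafka-consumer-groups --bootstrap-server $KAFKA_BROKER --topic $TOPIC --group $GROUP \
  --reset-offsets --shift-by -10 --execute

# Reset to the earliest offsets with timestamps at or after the given datetime
kafka-consumer-groups --bootstrap-server $KAFKA_BROKER --topic $TOPIC --group $GROUP \
  --reset-offsets --to-datetime 2023-01-01T00:00:00.000 --execute
```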

Partition management

bin/kafka-reassign-partitions.sh --zookeeper $KAFKA_ZOOKEEPER --reassignment-json-file /tmp/increase-replication-factor.json --execute
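The reassignment file maps each partition to its desired replica list; a sketch of what /tmp/increase-replication-factor.json might contain, where the broker ids 1, 2, 3 and the partition layout are assumptions:

```shell
# Write a sample reassignment file; broker ids and partitions are only examples
cat > /tmp/increase-replication-factor.json <<'EOF'
{
  "version": 1,
  "partitions": [
    {"topic": "test-topic", "partition": 0, "replicas": [1, 2, 3]},
    {"topic": "test-topic", "partition": 1, "replicas": [2, 3, 1]}
  ]
}
EOF
```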

Starting and stopping

zookeeper:

bin/zookeeper-server-start.sh config/zookeeper.properties

broker:

bin/kafka-server-start.sh config/server.properties
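Both services can also be started in the background with the -daemon flag and stopped with the matching stop scripts from the same distribution:

```shell
# Start ZooKeeper and the broker in the background
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
bin/kafka-server-start.sh -daemon config/server.properties

# Stop them again (broker first, then ZooKeeper)
bin/kafka-server-stop.sh
bin/zookeeper-server-stop.sh
```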

ZooKeeper

standalone installation:

cd $ZOOKEEPER_HOME
bin/zkCli.sh

version bundled with the Apache Kafka package:

bin/zookeeper-shell.sh $KAFKA_ZOOKEEPER

ZooKeeper shell commands:

ls /brokers
ls /brokers/ids
ls /brokers/topics
ls /brokers/seqid
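Individual znodes can be inspected with get as well; a sketch (the broker id 0 is an assumption, use an id returned by `ls /brokers/ids`):

```shell
get /brokers/ids/0
get /controller
get /brokers/topics/test-topic
```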

Tools

bin/kafka-consumer-groups.sh \
    --zookeeper $KAFKA_ZOOKEEPER \
    --describe --group test_group
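For load testing, the Kafka distribution also ships producer and consumer perf-test tools; a sketch (record counts and sizes are arbitrary examples):

```shell
# Produce 100k records of 100 bytes each, with no throughput limit
bin/kafka-producer-perf-test.sh --topic $TOPIC \
  --num-records 100000 --record-size 100 --throughput -1 \
  --producer-props bootstrap.servers=$KAFKA_BROKER

# Consume 100k messages and report throughput
bin/kafka-consumer-perf-test.sh --topic $TOPIC \
  --bootstrap-server $KAFKA_BROKER --messages 100000
```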