Below is a list of useful Apache Kafka commands.
To make them easier to use, they rely on environment variables and aliases.
If we use the CLI as a Docker image, we set up the aliases as below:
docker run --rm -it confluentinc/cp-kafka --version
docker run --rm -it confluentinc/cp-kafka kafka-topics --version
alias kafka-topics="docker run --rm -it confluentinc/cp-kafka kafka-topics"
If we prefer to install the Kafka CLI instead, we download the package from the Apache Kafka website and unpack it into a directory of our choice, in my case ~/programs/kafka/
alias kafka-topics="~/programs/kafka/kafka_2.12-2.8.2/bin/kafka-topics.sh"
alias kafka-console-consumer="~/programs/kafka/kafka_2.12-2.8.2/bin/kafka-console-consumer.sh"
alias kafka-console-producer="~/programs/kafka/kafka_2.12-2.8.2/bin/kafka-console-producer.sh"
alias kafka-consumer-groups="~/programs/kafka/kafka_2.12-2.8.2/bin/kafka-consumer-groups.sh"
When we use Kafka with secured access (certificate or Kerberos), most commands need the client configuration passed to them, so we set the appropriate variable:
export KAFKA_CLIENT_CONFIG=client.properties
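As an illustration of what such a file might contain, here is a minimal sketch for a cluster secured with SSL certificates. The property names are standard Kafka client settings, but every path and password below is a placeholder assumed for this example, not a value from this article.

```shell
# Write a sample client.properties for an SSL-secured cluster.
# ASSUMPTION: all paths and passwords are placeholders; replace them with your own.
cat > client.properties <<'EOF'
security.protocol=SSL
ssl.truststore.location=/path/to/client.truststore.jks
ssl.truststore.password=changeit
ssl.keystore.location=/path/to/client.keystore.jks
ssl.keystore.password=changeit
ssl.key.password=changeit
EOF

# Point the commands at the file through the variable used in this article.
export KAFKA_CLIENT_CONFIG=client.properties
```

For Kerberos the file would instead carry SASL settings such as security.protocol=SASL_SSL and sasl.kerberos.service.name, depending on how the cluster is set up.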
To check that everything works:
kafka-topics --version
The examples below use the following variables:
export KAFKA_BROKER="localhost:9092"
export TOPIC=test-topic
export GROUP=radek
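Some commands further down refer to $KAFKA_ZOOKEEPER, which is not set above. A possible definition, assuming ZooKeeper runs locally on its default client port 2181 (adjust the address to your environment):

```shell
# ASSUMPTION: ZooKeeper is reachable on localhost at its default client port 2181.
# An already exported KAFKA_ZOOKEEPER value is kept as-is.
export KAFKA_ZOOKEEPER="${KAFKA_ZOOKEEPER:-localhost:2181}"
echo "Using ZooKeeper at $KAFKA_ZOOKEEPER"
```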
Topics
list:
kafka-topics --bootstrap-server $KAFKA_BROKER --list
kafka-topics --bootstrap-server $KAFKA_BROKER --list \
--command-config $KAFKA_CLIENT_CONFIG
create:
kafka-topics --create --bootstrap-server $KAFKA_BROKER --topic $TOPIC
kafka-topics --create --bootstrap-server $KAFKA_BROKER --topic $TOPIC --config cleanup.policy=compact
kafka-topics --create --bootstrap-server $KAFKA_BROKER --topic $TOPIC --replication-factor 3 --partitions 10
describe:
kafka-topics --describe --bootstrap-server $KAFKA_BROKER
kafka-topics --describe --bootstrap-server $KAFKA_BROKER --topic $TOPIC
kafka-topics --describe --bootstrap-server $KAFKA_BROKER --topic $TOPIC \
--command-config $KAFKA_CLIENT_CONFIG
kafka-topics --zookeeper $KAFKA_ZOOKEEPER --describe --under-replicated-partitions
kafka-topics --describe --zookeeper $KAFKA_ZOOKEEPER --topic __consumer_offsets
delete:
kafka-topics --bootstrap-server $KAFKA_BROKER --delete --topic $TOPIC
Topic deletion has to be enabled in the broker settings (delete.topic.enable); otherwise the command only marks the topic as deleted.
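To see which value a broker is configured with, one option is to look it up in the broker's properties file. The helper below is a hypothetical sketch (the function name is made up for this example); it assumes the setting, if present, is written as a plain key=value line, and it falls back to "true", the broker default since Kafka 1.0, when the key is absent.

```shell
# Print the delete.topic.enable value found in a broker properties file.
# ASSUMPTION: the helper name and the fallback handling are illustrative only.
delete_topic_enabled() {
  local file="$1"
  local value
  # Refuse to guess when the file cannot be read.
  [ -r "$file" ] || { echo "cannot read $file" >&2; return 1; }
  # Take the last matching line, keep the part after "=", drop whitespace.
  value=$( { grep -E '^[[:space:]]*delete\.topic\.enable[[:space:]]*=' "$file" || true; } \
    | tail -n 1 | cut -d= -f2- | tr -d '[:space:]' )
  # Key absent: report the broker default (true since Kafka 1.0).
  echo "${value:-true}"
}
```

It could be called as: delete_topic_enabled config/server.properties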
alter:
kafka-topics --alter --zookeeper $KAFKA_ZOOKEEPER --topic $TOPIC --partitions 20
kafka-topics --alter --zookeeper $KAFKA_ZOOKEEPER --topic $TOPIC --replication-factor 3
kafka-topics --alter --zookeeper $KAFKA_ZOOKEEPER --topic $TOPIC --replica-assignment 1003:1004:1005,0:1:2,0:1:2,2:1:0
kafka-topics --alter --zookeeper $KAFKA_ZOOKEEPER --topic __consumer_offsets --replication-factor 3
Console producer and consumer
kafka-console-producer --broker-list $KAFKA_BROKER --topic $TOPIC
kafka-console-producer --broker-list $KAFKA_BROKER --topic $TOPIC --property "parse.key=true" --property "key.separator=|"
cat $TOPIC.json | kafka-console-producer --broker-list $KAFKA_BROKER --topic $TOPIC \
--property "parse.key=true" --property "key.separator=|"
cat $TOPIC.json | kafka-console-producer --broker-list $KAFKA_BROKER --topic $TOPIC \
--property "parse.key=true" --property "key.separator=|" \
--property key.serializer=org.apache.kafka.common.serialization.IntegerSerializer \
--property value.serializer=org.apache.kafka.common.serialization.StringSerializer
time for i in {1..10}; do
echo $i'|{"message": "message'$i'"}' | \
kafka-console-producer --broker-list $KAFKA_BROKER --topic $TOPIC \
--property "parse.key=true" --property "key.separator=|";
done
time for i in {11..100}; do echo $i'|{"message": "message'$i'"}' | kafka-console-producer --broker-list $KAFKA_BROKER --topic $TOPIC --property "parse.key=true" --property "key.separator=|"; done
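The loops above start a new producer process for every message, which is slow. A possible alternative, sketched here, is to write all key|value lines to a file first and send the file in one go. The file name $TOPIC.json follows the earlier examples; the default topic name is only a fallback for when $TOPIC is not set.

```shell
# Generate ten "key|value" lines in the same format as the loops above.
TOPIC="${TOPIC:-test-topic}"   # fallback only if TOPIC is not exported yet

: > "$TOPIC.json"              # start from an empty file
for i in $(seq 1 10); do
  printf '%s|{"message": "message%s"}\n' "$i" "$i" >> "$TOPIC.json"
done
```

The resulting file can then be sent with the cat $TOPIC.json | kafka-console-producer ... command shown earlier, so only one producer is started.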
kafka-console-consumer --bootstrap-server $KAFKA_BROKER --topic $TOPIC --from-beginning
kafka-console-consumer --bootstrap-server $KAFKA_BROKER --topic $TOPIC --from-beginning \
--property print.key=true --property print.value=true --property key.separator="|" \
--consumer.config $KAFKA_CLIENT_CONFIG
kafka-console-consumer --bootstrap-server $KAFKA_BROKER --topic $TOPIC --from-beginning \
--property print.key=true --property print.value=true --property key.separator="|" \
--property key.deserializer=org.apache.kafka.common.serialization.IntegerDeserializer \
--property value.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
--consumer.config $KAFKA_CLIENT_CONFIG
kafka-console-consumer --bootstrap-server $KAFKA_BROKER --topic $TOPIC --from-beginning \
--property print.key=true --property print.value=true --property key.separator="|" \
--property key.deserializer=org.apache.kafka.common.serialization.LongDeserializer \
--property value.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
--consumer.config $KAFKA_CLIENT_CONFIG
kafka-console-consumer --bootstrap-server $KAFKA_BROKER \
--topic test-topic-out \
--from-beginning \
--formatter kafka.tools.DefaultMessageFormatter \
--property print.key=true \
--property print.value=true \
--property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
--property value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
kafka-console-consumer --bootstrap-server $KAFKA_BROKER --topic $TOPIC --from-beginning \
--property print.key=true --property print.value=true --property key.separator="|" \
| tee -a $TOPIC.json
kafka-console-consumer --bootstrap-server $KAFKA_BROKER --topic $TOPIC --from-beginning \
--property print.key=true --property print.value=true --property key.separator="|" \
--consumer.config $KAFKA_CLIENT_CONFIG \
| tee -a $TOPIC.json
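A file dumped this way holds one key|value record per line. As a small sketch of how it might be read back in a script (the function name is hypothetical), each line can be split at the first "|":

```shell
# Print the key and value of every "key|value" line in a dump file.
# ASSUMPTION: print_records is an illustrative helper, not part of Kafka.
print_records() {
  # IFS='|' makes read split each line at the first "|";
  # everything after it goes into "value".
  while IFS='|' read -r key value; do
    printf 'key=%s value=%s\n' "$key" "$value"
  done < "$1"
}
```

It could be called as: print_records "$TOPIC.json"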
Consumer groups
kafka-consumer-groups --bootstrap-server $KAFKA_BROKER --list
kafka-consumer-groups --bootstrap-server $KAFKA_BROKER --describe --group $GROUP \
--command-config $KAFKA_CLIENT_CONFIG
kafka-consumer-groups --bootstrap-server $KAFKA_BROKER --topic $TOPIC --reset-offsets --to-earliest --group $GROUP \
--command-config $KAFKA_CLIENT_CONFIG
kafka-consumer-groups --bootstrap-server $KAFKA_BROKER --topic $TOPIC --reset-offsets --to-earliest --group $GROUP \
--execute \
--command-config $KAFKA_CLIENT_CONFIG
Partition management
bin/kafka-reassign-partitions.sh --zookeeper $KAFKA_ZOOKEEPER --reassignment-json-file /tmp/increase-replication-factor.json --execute
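The reassignment file passed above lists the desired replicas for each partition. A minimal sketch of how it might be created for a single partition follows; the broker IDs 0, 1 and 2 and the partition number are assumptions for illustration and must match your cluster.

```shell
# Write a reassignment plan that places partition 0 of $TOPIC on three brokers.
# ASSUMPTION: broker IDs 0, 1, 2 exist; list every partition you want to change.
TOPIC="${TOPIC:-test-topic}"   # fallback only if TOPIC is not exported yet

cat > /tmp/increase-replication-factor.json <<EOF
{
  "version": 1,
  "partitions": [
    {"topic": "$TOPIC", "partition": 0, "replicas": [0, 1, 2]}
  ]
}
EOF
```

After running the command with --execute, the same tool and file can be used with --verify to check whether the reassignment has finished.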
Starting and stopping
zookeeper:
bin/zookeeper-server-start.sh config/zookeeper.properties
broker:
bin/kafka-server-start.sh config/server.properties
ZooKeeper
standalone installation:
cd $ZOOKEEPER_HOME
bin/zkCli.sh
version bundled with the Apache Kafka package:
bin/zookeeper-shell.sh $KAFKA_ZOOKEEPER
ZooKeeper shell commands:
ls /brokers
ls /brokers/ids
ls /brokers/topics
ls /brokers/seqid
Tools
bin/kafka-consumer-groups.sh \
--topic $TOPIC --zookeeper $KAFKA_ZOOKEEPER \
--group test_group