카프카에서 제공하는 카프카 커맨드 라인 툴은 카프카를 운영할 때 가장 많이 쓰는 도구다. 브로커 운영에 필요한 다양한 명령을 내릴 수 있다. 카프카 클라이언트 애플리케이션을 운영할 때는 카프카 클러스터와 연동해서 데이터 주고 받는 것도 중요하지만 토픽, 파티션 개수 변경과 같은 명령을 실행해야할 때도 생긴다. 그렇기 때문에 카프카 커맨드 라인 툴과 각 툴별 옵션에 대해서 알고 있으면 좋다.
topic에 관련된 명령을 실행할 수 있다. 토픽이란 카프카에서 데이터를 구분하는 기본적인 개념이다. 카프카 클러스터에 토픽은 여러 개 존재할 수 있다. 토픽에는 파티션(partition)이 존재하는데 파티션의 개수는 최소 1개 이상이다. 파티션은 카프카에서 토픽을 구성하는데 아주 중요한 요소다. 파티션을 통해 한 번에 처리할 수 있는 데이터 양을 늘릴 수 있고 토픽 내에서도 파티션을 통해 데이터의 종류를 나누어 처리할 수 있기 때문이다.
토픽을 생성하는 상황은
- 카프카 컨슈머 또는 프로듀서가 카프카 브로커에 생성되지 않은 토픽에 대해 데이터를 요청할 떄
- 커맨드 라인 툴로 명시적으로 토픽을 생성할 때
명시적으로 생성하면 데이터 특성에 따라 옵션을 다르게 설정할 수 있다. 따라서 명령어로 명시적으로 처리하는 것이 좋다.
./bin/kafka-topics.sh \
--create \ ## 토픽 생성 명시
--bootstrap-server localhost:9092 \ ## 카프카 클러스터를 구성하는 브로커들의 IP, port를 적는다.
--topic hello.kafka ## 카프카 클러스터 정보와 토픽 이름만으로 토픽을 생성할 수 있었다.
## 클러스터 정보와 토픽 이름은 필수 값이다.
## 옵션을 주려면 아래와 같이 작성하면된다.
## WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.
## Created topic hello.kafka.
-------------
./bin/kafka-topics.sh \
--create \
--bootstrap-server localhost:9092 \
--partitions 3 \ ## 파티션 개수를 지저할 수 있다. 기본은 1개다.
--replication-factor 1 \ ##토픽의 파티션을 복제할 개수를 적는다. 1은 복제하지 않는다. 2는 1개의 복제본을 사용하겠다는 의미다.
## 파티션 데이터는 각 브로커마다 저장되며, 한 개의 브로커에 장애가 발생하더라도 나머지 한 개 브로커에 저장된 데이터를
## 사용하여 안전하게 데이터 처리를 지속할 수 있다.
## 실무에서는 3개 이상으로 운영하는 것이 일반적이다.
--config retention.ms=172800000 \ ## --config로 kafka-topics.sh에 포함되지 않은 추가 설정을 할 수 있다. 이 옵션은 토픽 데이터 유지 기간을 의미한다.
--topic hello.kafka
bash-5.2# ./bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
hello.kafka
--list
옵션으로 확인할 수 있다. 토픽이 몇 개나 생성됐는지, 어떤 이름의 토픽이 있는지 확인하기 위해서 사용한다.
카프카에 내부 관리를 위한 internal topic
이 존재하는데, 실질적으로 사용하지 않으므로 --exclude-internal
으로
목록에서 제외할 수 있다.
bash-5.2# ./bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic hello.kafka
Topic: hello.kafka PartitionCount: 1 ReplicationFactor: 1 Configs: segment.bytes=1073741824
Topic: hello.kafka Partition: 0 Leader: 0 Replicas: 0 Isr: 0
--describe
옵션을 사용해서 확인할 수 있다. 파티션 개수, 복제된 파티션이 위치한 브로커 번호, 기타 토픽을 구성하는 설정들을 출력한다. 토픽이 가진 파티션의
리더가 현재 어느 브로커에 존재하는지도 확인할 수 있다. Leader: 0
이므로 0번 브로커에 위치하고 있음을 의미한다. 여러 대로 브로커로 클러스터링할 때
토픽의 리더 파티션이 일부 브로커로 몰려있을 수 있을 때, 이를 확인하기 위해서 --describe
를 사용할 수 있다.
만약 몰려 있다면 부하 분산에 실패해서 네트워크 대역의 이슈가 생길 수 있다.
토픽 옵션 변경은 kafaka-topics.sh
또는 kafka-configs.sh
두 개를 사용해야 한다. 파티션 개수 변경은 kafaka-topics.sh
, 토픽 삭제 정책인 retention 기간을 변경하려면
kafka-configs.sh
를 사용해야 한다.
파티션 추가
bash-5.2# ./bin/kafka-topics.sh \
--bootstrap-server \
localhost:9092 \
--topic hello.kafka \
--alter \
--partitions 4 ### --alter와 --partitions으로 파티션 변경할 수 있다. 늘릴수는 있지만 줄일 수는 없다. 신중해야 한다.
bash-5.2# ./bin/kafka-topics.sh \
--bootstrap-server \
localhost:9092 \
--describe \
--topic hello.kafka
Topic: hello.kafka PartitionCount: 4 ReplicationFactor: 1 Configs: segment.bytes=1073741824
Topic: hello.kafka Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: hello.kafka Partition: 1 Leader: 0 Replicas: 0 Isr: 0
Topic: hello.kafka Partition: 2 Leader: 0 Replicas: 0 Isr: 0
Topic: hello.kafka Partition: 3 Leader: 0 Replicas: 0 Isr: 0
### 파티션 번호는 0부터 시작한다.
토픽 retention.ms 수정
bash-5.2# ./bin/kafka-configs.sh \
--bootstrap-server localhost:9092 \
--entity-type topics \
--entity-name hello.kafka \
--alter \
--add-config retention.ms=86400000 ### --alter, --add-config로 이미 존재하는 설정 값은 바꾸고 없다면 신규로 추가한다.
Completed updating config for topic hello.kafka.
bash-5.2# ./bin/kafka-configs.sh \
--bootstrap-server localhost:9092 \
--entity-type topics \
--entity-name hello.kafka \
--describe
Dynamic configs for topic hello.kafka are:
retention.ms=86400000 sensitive=false synonyms={DYNAMIC_TOPIC_CONFIG:retention.ms=86400000} ### --describe로 확인할 수 있다.
토픽에 넣는 데이터는 레코드(record)
라고 부르며 메시지는 키: 값
쌍이다.
bash-5.2# ./bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic hello.kafka
# 이후 입력모드로 전환된다.
>hello
>kafka
>0
>1
>2
>3
>4
>5
응답 모드로 돌입하고 메시지를 입력하고 enter를 누르면 메시지 값이 전송된다. 레코드 값은 utf-8기반으로 Byte로 변환되고 ByteArraySerializer로만 직렬화된다. 결론적으로 String이 아닌 타입으로는 직렬화하여 전송할 수 있다. 다른 타입으로 직렬화해서 데이터를 브로커로 전송하고 싶다면 카프카 프로듀서 애플리케이션을 직접 개발해야 한다.
bash-5.2# ./bin/kafka-console-producer.sh \
--bootstrap-server localhost:9092 \
--topic hello.kafka \
--property "parse.key=true" \ ##메시지 키를 추가할 수 있다.
--property "key.separator=:" ## 구분자를 선언한다. 따로 선언하지 않으면 `\t`가 기본값이된다.
## 이 설정을 하고 구분자를 넣지 않고 입력하면 KafkaException이 발생한다.
##
## org.apache.kafka.common.KafkaException: No key found on line 4: key
## at kafka.tools.ConsoleProducer$LineMessageReader.readMessage(ConsoleProducer.scala:289)
## at kafka.tools.ConsoleProducer$.main(ConsoleProducer.scala:51)
>key1:no1
>key2:no2
>key3:no3
>
메시지 키와 메시지 값을 함께 전송한 레코드는 토픽의 파티션에 저장된다. 메시지 키가 null이라면 프로듀서가 파티션으로 전송할 때 레코드 배치 단위(레코드 전송 묶음)로 라운드 로빈으로 전송한다. 메시지 키가 있다면 키의 해시값을 작성하여 존재하는 파티션 중 한 개에 할당된다. 이로 인해 메시지 키가 동일한 경우에는 동일한 파티션으로 전송된다. 다만, 이 경우 프로듀서에 설정된 파티셔너에 의해 결정되는데, 기본 파티셔너의 경우 이와 같은 동작을 보장한다.
hello.kafka로 전송한 데이터는 kafka-console.consumer.sh
로 확인할 수 있다. 필수적으로 --bootstrap-server
, --topic
옵션이 필수다.
추가로 --from-begining
을 주면 가장 처음에 저장된 데이터부터 출력한다.
bash-5.2# ./bin/kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic hello.kafka \
--from-beginning
1
2
no2
4
5
hello
no3
kafka
0
3
no1
만약 데이터의 메시지 키와 값을 확인하고 싶으면 --property
옵션을 사용하면 된다.
bash-5.2# ./bin/kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic hello.kafka \
--from-beginning \
--property print.key=true \ ## 기본 값이 false이기 때문에 true로 키를 출력
--property key.separator=":" \ ## 기본은 `\t`가 된다.
--group hello-group ## 신규 컨슈머 그룹을 생성했다. 컨슈머 크룹은 1개 이상의 컨슈머로 이루어져 있다.
## 이 컨슈머 그룹을 통해서 가져간 토픽에 대해서는 가져간 메시지에 대해서 커밋을 한다.
## 커밋이란 컨슈머가 특정레코드까지 처리 완료했다고 레코드 오프셋 번호를 카프카 브로커에 저장하는 것이다.
## 커밋 정보는 __consumer_offsets 이름으로 내부 토픽에 저장된다.
null:1
null:2
key2:no2
null:4
null:5
null:hello
key3:no3
null:kafka
null:0
null:3
key1:no1
눈여겨 볼 것은 전송했던 순서와 출력 순서가 다르다는 것이다. 이는 카프카의 핵심인 파티션 개념 때문에 생기는 현상이다.
kafka-console-consumer.sh
명령어를 통해서 토픽의 데이터를 가져가게 되면 토픽의 모든 파티션으로부터 동일한 중요도로 데이터를 가져간다.
이로 인해 프로듀서가 토픽에 넣은 데이터의 순서와 컨슈머가 토픽에 가져간 데이터의 순서가 달라지는 것이다. 만약 토픽에 넣은 데이터의 순서를 보장하고 싶다면
가장 좋은 방법은 파티션 1개로 구성된 토픽을 만드는 것이다.
bash-5.2# ./bin/kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--list hello-group
hello-group
--list
컨슈머 그룹의 리스트를 확인하는 옵션이다. 컨슈머 그룹을 통해 현재 컨슈머 그룹이 몇 개나 생성됐는지, 어떤 이름의 컨슈머 그룹이 존재하는지 확인할 수 있다.
이렇게 확인한 컨슈머 그룹 이름을 토대로 컨슈머 그룹이 어떤 토픽의 데이터를 가져가는지 확인할 때 쓴다.
bash-5.2# ./bin/kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--group hello-group \ ##어떤 컨슈머 그룹에 대한 상세를 볼 것인지 정해야 한다.
--describe ## 상세를 확인할 수 있는 옵션
Consumer group 'hello-group' has no active members.
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
hello-group hello.kafka 3 3 3 0 - - -
hello-group hello.kafka 2 2 2 0 - - -
hello-group hello.kafka 1 2 2 0 - - -
hello-group hello.kafka 0 4 4 0 - - -
# GROUP/TOPIC/PARTITION : 조회한 컨슈머 그룹이 마지막으로 커밋한 토픽과 파티션을 나타낸다. hello-group 이름의 컨슈머 그룹이 hello.kafka 토픽의 3번 파티션의 레코드가 마지막으로 커밋된 것을 알 수 있다.
# CURRENT-OFFSET : 토픽의 파티션에 가장 최신 오프셋을 보여준다.
# LOG-END-OFFSET : 그룹의 컨슈머가 어느 오프셋까지 커밋했는지 알 수 있다. CURRENT-OFFSET은 LOG-END-OFFSET과 같거나 작은 값일 수 있다.
# LAG : 컨슈머 그룹이 토픽의 파티션에 있는 데이털르 가져가는데 얼마나 지연이 발생하는지 나타내는 지표다. 랙은 컨슈머 그룹이 커밋한 오프셋과 해당 파티션의 가장 최신 오프셋 간의 차이다.
# CONSUMER_ID : 컨슈머 토픽(파티션) 할당을 카프카 내부적으로 구분하기 위한 id다. 이 값은 client id에 uuid 값을 붙여서 자동 할당되어 유니크 값이된다.
# HOST : 컨슈머가 동작하는 host 명을 출력한다. 이 값을 통해서 카프카에 붙은 컨슈머의 호스트 명 또는 IP를 알 수 있다.
# CLIENT-ID : 컨슈머에 할당된 id다. 지정할 수 있으며 지정하지 않으면 자동 생성 값이다.
해당 내용은 운영에서, 최적화에서 활용할 수 있는 유용한 정보다. 랙이 증가할 때 프로듀서가 데이터를 토픽으로 전달하는 속도에 비해 컨슈머의 처리량이 느리다는 의미가 된다. 카프카를 운영할 떄 컨슈머 그룹 이름을 알아내고 컨슈머 그룹의 상세 정보를 파악하면 카프카에 연결된 커슈머의 호스트명 또는 IP를 알아낼 수 있다.
bash-5.2# ./bin/kafka-verifiable-producer.sh \
--bootstrap-server localhost:9092 \
--max-messages 10 \ ## --max-messages는 kafka-verifiable-producer.sh로 보내는 데이터 개수를 지정한다. 만약 -1을 입력하면 kafka-verifiable-producer.sh가 종료될 때까지 계속 데이터를 토픽으로 전달한다.
--topic verify-test ## 데이터를 방을 대상 토픽을 입력한다.
{"timestamp":1708779801073,"name":"startup_complete"} ## 최초 실행 시점이 startup_complete와 함께 출력된다.
[2024-02-24 13:03:21,155] WARN [Producer clientId=producer-1] Error while fetching metadata with correlation id 1 : {verify-test=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
{"timestamp":1708779801281,"name":"producer_send_success","key":null,"value":"0","offset":0,"topic":"verify-test","partition":0}
## 메시지별로 보낸 시간, 메시지 키, 메시지 값, 토픽, 저장된 파티션, 저장된 오프셋 번호가 출력된다.
{"timestamp":1708779801282,"name":"producer_send_success","key":null,"value":"1","offset":1,"topic":"verify-test","partition":0}
{"timestamp":1708779801282,"name":"producer_send_success","key":null,"value":"2","offset":2,"topic":"verify-test","partition":0}
{"timestamp":1708779801282,"name":"producer_send_success","key":null,"value":"3","offset":3,"topic":"verify-test","partition":0}
{"timestamp":1708779801282,"name":"producer_send_success","key":null,"value":"4","offset":4,"topic":"verify-test","partition":0}
{"timestamp":1708779801282,"name":"producer_send_success","key":null,"value":"5","offset":5,"topic":"verify-test","partition":0}
{"timestamp":1708779801282,"name":"producer_send_success","key":null,"value":"6","offset":6,"topic":"verify-test","partition":0}
{"timestamp":1708779801283,"name":"producer_send_success","key":null,"value":"7","offset":7,"topic":"verify-test","partition":0}
{"timestamp":1708779801283,"name":"producer_send_success","key":null,"value":"8","offset":8,"topic":"verify-test","partition":0}
{"timestamp":1708779801283,"name":"producer_send_success","key":null,"value":"9","offset":9,"topic":"verify-test","partition":0}
{"timestamp":1708779801287,"name":"shutdown_complete"}
{"timestamp":1708779801288,"name":"tool_data","sent":10,"acked":10,"target_throughput":-1,"avg_throughput":46.082949308755765}
## 통계값이 출력된다. 평균 처리량을 확인할 수 있다.
bash-5.2# ./bin/kafka-verifiable-consumer.sh \
--bootstrap-server localhost:9092 \
--topic verify-test \ ## 가져오고자 하는 토픽 이름
--group-id test-group ## 컨슈머 그룹을 지정한다.
{"timestamp":1708780052304,"name":"startup_complete"}
{"timestamp":1708780052422,"name":"partitions_assigned","partitions":[{"topic":"verify-test","partition":0}]}
## 컨슈머가 시작되면 startup_complete 문자열과 시작 시간이 timestamp와 함께 출력된다. 토픽에서 데이터를 가져오기 위해서 파티션에 할당하는 과정을 거친다.
## 여기서는 파티션 0에 할당됐다.
{"timestamp":1708780052462,"name":"records_consumed","count":10,"partitions":[{"topic":"verify-test","partition":0,"count":10,"minOffset":0,"maxOffset":9}]}
## 컨슈머는 한 번에 다수의 메시지를 가져와서 처리하므로 한 번에 10개의 메시지를 정상적으로 적으로 받았음을 알 수 있다.
{"timestamp":1708780052466,"name":"offsets_committed","offsets":[{"topic":"verify-test","partition":0,"offset":10}],"success":true}
## 메시지 수신 이후 10번 오프셋 커밋 여부도 확인할 수 있다.
이미 적재된 토픽의 데이터를 지우는 방법으로는 kafka-delete-records.sh
를 사용할 수 있다. 이미 적재된 토픽의 데이터 중 가장 오래된 데이터(가장 낮은 숫자의 오프셋)부터
특정 시점의 오프셋까지 삭제할 수 있다.
vi delete-topic.json ## 삭제에 대한 정보를 파일로 저장해서 사용해야 한다. 토픽, 파티션, 오프셋 정보가 들어가야 한다.
# {"partitions": [{"topic": "test", "partition": 0, "offset": 50}], "version":1}
bash-5.2# ./bin/kafka-delete-records.sh \
--bootstrap-server localhost:9092 \
--offset-json-file delete-topic.json ## 삭제 토픽, 파티션, 오프셋에 대한 정보를 담은 delete-topic.json을 --offset-json-file로 지정하면 읽어서 데이터를 삭제한다.
Executing records delete operation
Records delete operation completed:
partition: test-0 low_watermark: 50 ##삭제가 완료되면 각 파티션에서 삭제된 오프셋 정보를 출력한다.
여기서 중요한 부분은 특정 레코드 하나만 삭제하는 것이 아니라 오래된 것 ~ 지정한 오프셋까지 삭제된다는 점이다. 즉, 특정 데이터만 골라서 삭제할 수 없다는 의미가 된다.