-
Kafka consumer group엔지니어링/Kafka 2022. 3. 4. 10:47728x90
카프카 컨슈머 그룹을 조작하는 방법 및 특정 오프셋을 리셋하는 방법을 알아보겠다.
List Groups
컨슈머 그룹의 리스트를 출력한다.
$ pwd /kafka/bin
$ ./kafka-consumer-groups.sh --bootstrap-server mybroker:9092 --list kafka-streams-101 sy-consumer1 sy-consumer2
Describe Group
특정 컨슈머 그룹의 정보를 출력한다.
$ ./kafka-consumer-groups.sh --bootstrap-server mybroker:9092 --group sy-consumer1 --describe Consumer group 'sy-consumer1' has no active members. GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID sy-consumer1 stream.test.sy.string.220228 0 30 30 0 - - -
Delete Group
특정 컨슈머 그룹을 삭제한다.
$ ./kafka-consumer-groups.sh --bootstrap-server mybroker:9092 --group sy-consumer2 --delete Deletion of requested consumer groups ('sy-consumer2') was successful.
Consume with Group
특정 컨슈머 그룹을 지정하여 토픽을 컨슈밍한다.
$ ./kafka-console-consumer.sh --bootstrap-server mybroker:9092 --topic stream.test.sy.string.220228 --from-beginning --group sy-consumer1 APPLE1 BANANA2 WATERMELON3 LEMON4 FISH5 MEAT6 CHERRY7 NOODLE8 COFFEE9 ...(하략)
Reset Offset
CLI 실행을 위해 --execute 플래그를 붙여준다.
--to-earliest : 가장 처음 오프셋(작은 번호)으로 리셋
$ ./kafka-consumer-groups.sh --bootstrap-server mybroker:9092 --group sy-consumer1 --topic stream.test.sy.string.220228 --reset-offsets --to-earliest --execute GROUP TOPIC PARTITION NEW-OFFSET sy-consumer1 stream.test.sy.string.220228 0 0
--to-latest : 가장 마지막 오프셋(큰 번호)으로 리셋
$ ./kafka-consumer-groups.sh --bootstrap-server mybroker:9092 --group sy-consumer1 --topic stream.test.sy.string.220228 --reset-offsets --to-latest --execute GROUP TOPIC PARTITION NEW-OFFSET sy-consumer1 stream.test.sy.string.220228 0 30
--to-current : 현 시점 기준 오프셋으로 리셋
테스트를 위해 최대 메시지 개수를 7로 정하여 컨슈밍한다.
$ ./kafka-console-consumer.sh --bootstrap-server mybroker:9092 --group sy-consumer1 --topic stream.test.sy.string.220228 --from-beginning --max-messages 7 APPLE1 BANANA2 WATERMELON3 LEMON4 FISH5 MEAT6 CHERRY7 Processed a total of 7 messages
현 시점의 오프셋으로 리셋한다.
$ ./kafka-consumer-groups.sh --bootstrap-server mybroker:9092 --group sy-consumer1 --topic stream.test.sy.string.220228 --reset-offsets --to-current --execute GROUP TOPIC PARTITION NEW-OFFSET sy-consumer1 stream.test.sy.string.220228 0 7
--to-datetime {YYYY-MM-DDTHH:mmSS.sss} : 특정 일시로 오프셋 리셋(레코드 타임스탬프 기준)
테스트를 위해 --property print.timestamp=true 플래그를 활용하여 각 메시지가 쓰여진 시간(timestamp)를 확인한다.
$ ./kafka-console-consumer.sh --bootstrap-server mybroker:9092 --group sy-consumer1 --topic stream.test.sy.string.220228 --property print.timestamp=true CreateTime:1646012868689 APPLE1 CreateTime:1646012872828 BANANA2 CreateTime:1646012876854 WATERMELON3 CreateTime:1646012882651 LEMON4 CreateTime:1646012888443 FISH5 CreateTime:1646012894767 MEAT6 CreateTime:1646012907942 CHERRY7 CreateTime:1646012927510 NOODLE8 CreateTime:1646012935114 COFFEE9 ...(하략)
조회를 위해서 UNIX Timestamp를 ISO8601 포맷({YYYY-MM-DDTHH:mmSS.sss})으로 변경해주어야 한다.
다음 사이트를 참고한다.
$ ./kafka-consumer-groups.sh --bootstrap-server mybroker:9092 --group sy-consumer1 --topic stream.test.sy.string.220228 --reset-offsets --to-datetime 2022-03-04T01:43:13.564 --execute
--to-offset {long} : 특정 오프셋으로 리셋
테스트를 위해 --property print.offset=true 플래그를 활용하여 각 메시지의 오프셋을 확인한다.
$ ./kafka-console-consumer.sh --bootstrap-server mybroker:9092 --group sy-consumer1 --topic stream.test.sy.string.220228 --property print.offset=true
오프셋 20으로 리셋한다.
$ ./kafka-consumer-groups.sh --bootstrap-server mybroker:9092 --group sy-consumer1 --topic stream.test.sy.string.220228 --reset-offsets --to-offset 20 --execute GROUP TOPIC PARTITION NEW-OFFSET sy-consumer1 stream.test.sy.string.220228 0 20
--shift-by {+/- long} : 현재 컨슈머 오프셋에서 앞뒤로 옮겨서 리셋
테스트를 위해 오프셋 20에서 5만큼 이전으로 옮겨서 리셋한다.
$ ./kafka-consumer-groups.sh --bootstrap-server mybroker:9092 --group sy-consumer1 --topic stream.test.sy.string.220228 --reset-offsets --shift-by -5 --execute GROUP TOPIC PARTITION NEW-OFFSET sy-consumer1 stream.test.sy.string.220228 0 15
- references
https://docs.confluent.io/platform/current/clients/consumer.html
https://www.timestamp-converter.com/
dev원영님의 러닝스푼즈 오프라인 강의 자료
728x90'엔지니어링 > Kafka' 카테고리의 다른 글
Kafka Monitoring with Prometheus & Grafana : 1 (0) 2022.04.28 Apache Kafka Streams 예제코드 (0) 2022.03.03 Zero copy transfer (0) 2022.02.24 Jabba Utility (0) 2022.02.15 [구성] Mongo Sink Connector (0) 2021.12.22