ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • Kafka consumer group
    엔지니어링/Kafka 2022. 3. 4. 10:47
    728x90

    카프카 컨슈머 그룹을 조작하는 방법 및 특정 오프셋을 리셋하는 방법을 알아보겠다. 

     

    List Groups 

    컨슈머 그룹의 리스트를 출력한다. 

    $ pwd
    
    /kafka/bin
    $ ./kafka-consumer-groups.sh --bootstrap-server mybroker:9092 --list
    
    kafka-streams-101
    sy-consumer1
    sy-consumer2

     

    Describe Group 

    특정 컨슈머 그룹의 정보를 출력한다. 

    $ ./kafka-consumer-groups.sh --bootstrap-server mybroker:9092 --group sy-consumer1 --describe
    
    Consumer group 'sy-consumer1' has no active members.
    GROUP           TOPIC                        PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
    sy-consumer1    stream.test.sy.string.220228 0          30              30              0               -               -               -

     

    Delete Group 

    특정 컨슈머 그룹을 삭제한다. 

    $ ./kafka-consumer-groups.sh --bootstrap-server mybroker:9092 --group sy-consumer2 --delete 
    
    Deletion of requested consumer groups ('sy-consumer2') was successful.

     

    Consume with Group 

    특정 컨슈머 그룹을 지정하여 토픽을 컨슈밍한다. 

    $ ./kafka-console-consumer.sh --bootstrap-server mybroker:9092 --topic stream.test.sy.string.220228 --from-beginning --group sy-consumer1
    
    APPLE1
    BANANA2
    WATERMELON3
    LEMON4
    FISH5
    MEAT6
    CHERRY7
    NOODLE8
    COFFEE9
    ...(하략)

     

    Reset Offset

    CLI 실행을 위해 --execute 플래그를 붙여준다. 

     

    --to-earliest : 가장 처음 오프셋(작은 번호)으로 리셋

    $ ./kafka-consumer-groups.sh --bootstrap-server mybroker:9092 --group sy-consumer1 --topic stream.test.sy.string.220228 --reset-offsets --to-earliest --execute
    
    GROUP                          TOPIC                          PARTITION  NEW-OFFSET     
    sy-consumer1                   stream.test.sy.string.220228   0          0

    --to-latest : 가장 마지막 오프셋(큰 번호)으로 리셋

    $ ./kafka-consumer-groups.sh --bootstrap-server mybroker:9092 --group sy-consumer1 --topic stream.test.sy.string.220228 --reset-offsets --to-latest --execute
    
    GROUP                          TOPIC                          PARTITION  NEW-OFFSET     
    sy-consumer1                   stream.test.sy.string.220228   0          30

    --to-current : 현 시점 기준 오프셋으로 리셋

    테스트를 위해 최대 메시지 개수를 7로 정하여 컨슈밍한다. 

    $ ./kafka-console-consumer.sh --bootstrap-server mybroker:9092 --group sy-consumer1 --topic stream.test.sy.string.220228 --from-beginning --max-messages 7
    
    APPLE1
    BANANA2
    WATERMELON3
    LEMON4
    FISH5
    MEAT6
    CHERRY7
    Processed a total of 7 messages

    현 시점의 오프셋으로 리셋한다. 

    $ ./kafka-consumer-groups.sh --bootstrap-server mybroker:9092 --group sy-consumer1 --topic stream.test.sy.string.220228 --reset-offsets --to-current --execute
    
    GROUP                          TOPIC                          PARTITION  NEW-OFFSET     
    sy-consumer1                   stream.test.sy.string.220228   0          7

    --to-datetime {YYYY-MM-DDTHH:mmSS.sss} : 특정 일시로 오프셋 리셋(레코드 타임스탬프 기준)

    테스트를 위해 --property print.timestamp=true 플래그를 활용하여 각 메시지가 쓰여진 시간(timestamp)를 확인한다. 

    $ ./kafka-console-consumer.sh --bootstrap-server mybroker:9092 --group sy-consumer1 --topic stream.test.sy.string.220228 --property print.timestamp=true
    
    CreateTime:1646012868689	APPLE1
    CreateTime:1646012872828	BANANA2
    CreateTime:1646012876854	WATERMELON3
    CreateTime:1646012882651	LEMON4
    CreateTime:1646012888443	FISH5
    CreateTime:1646012894767	MEAT6
    CreateTime:1646012907942	CHERRY7
    CreateTime:1646012927510	NOODLE8
    CreateTime:1646012935114	COFFEE9
    ...(하략)

    조회를 위해서 UNIX Timestamp를 ISO8601 포맷({YYYY-MM-DDTHH:mmSS.sss})으로 변경해주어야 한다. 

    다음 사이트를 참고한다.

    $ ./kafka-consumer-groups.sh --bootstrap-server mybroker:9092 --group sy-consumer1 --topic stream.test.sy.string.220228 --reset-offsets --to-datetime 2022-03-04T01:43:13.564 --execute

    --to-offset {long} : 특정 오프셋으로 리셋 

    테스트를 위해 --property print.offset=true 플래그를 활용하여 각 메시지의 오프셋을 확인한다. 

    $ ./kafka-console-consumer.sh --bootstrap-server mybroker:9092 --group sy-consumer1 --topic stream.test.sy.string.220228 --property print.offset=true

    오프셋 20으로 리셋한다. 

    $ ./kafka-consumer-groups.sh --bootstrap-server mybroker:9092 --group sy-consumer1 --topic stream.test.sy.string.220228 --reset-offsets --to-offset 20 --execute 
    
    GROUP                          TOPIC                          PARTITION  NEW-OFFSET     
    sy-consumer1                   stream.test.sy.string.220228   0          20

    --shift-by {+/- long} : 현재 컨슈머 오프셋에서 앞뒤로 옮겨서 리셋

    테스트를 위해 오프셋 20에서 5만큼 이전으로 옮겨서 리셋한다. 

    $ ./kafka-consumer-groups.sh --bootstrap-server mybroker:9092 --group sy-consumer1 --topic stream.test.sy.string.220228 --reset-offsets --shift-by -5 --execute
    
    GROUP                          TOPIC                          PARTITION  NEW-OFFSET     
    sy-consumer1                   stream.test.sy.string.220228   0          15

     

    • references

    https://docs.confluent.io/platform/current/clients/consumer.html

     

    Kafka Consumer | Confluent Documentation

    Home Build Applications for Kafka Kafka Clients Kafka Consumer Confluent Platform includes the Java consumer shipped with Apache Kafka®. This section gives a high-level overview of how the consumer works and an introduction to the configuration settings f

    docs.confluent.io

    https://www.timestamp-converter.com/

     

    Timestamp Converter

     

    www.timestamp-converter.com

    dev원영님의 러닝스푼즈 오프라인 강의 자료

    728x90

    '엔지니어링 > Kafka' 카테고리의 다른 글

    Kafka Monitoring with Prometheus & Grafana : 1  (0) 2022.04.28
    Apache Kafka Streams 예제코드  (0) 2022.03.03
    Zero copy transfer  (0) 2022.02.24
    Jabba Utility  (0) 2022.02.15
    [구성] Mongo Sink Connector  (0) 2021.12.22

    댓글

Designed by Tistory.