ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [구성] Mongo Sink Connector
    엔지니어링/Kafka 2021. 12. 22. 16:02
    728x90

    🍃Mongo Sink Connector를 구성해 보려고 한다. (apache/confluent 모두에서 사용 가능)

    🍃<your hostname>는 해당 서버의 호스트이름을, <한국어로 써놓은 config들>은 사용자 설정사항이다. (그냥 입맛대로 정하면 됨)

    🍃MongoDB 커넥터의 경우, confluent가 아닌 벤더사 자체에서 지원해주고 있다. 

    https://docs.mongodb.com/kafka-connector/current/introduction/install/#std-label-kafka-connector-installation-reference

     

    Install the MongoDB Kafka Connector — MongoDB Kafka Connector

    Docs Home → MongoDB Kafka ConnectorLearn how to install the MongoDB Kafka Connector. The MongoDB Kafka Connector is available for Confluent Platform and Apache Kafka deployments. To see installation instructions for your deployment type, navigate to one

    docs.mongodb.com

     

    ++ 22.04.14 업데이트
    최근 조회수가 꾸준히 나오는 것 같아서 configuration을 업데이트 해본다 ㅋㅋ


    • kafka user는 이미 생성되어 있다고 가정
    • 커넥터 플러그인을 모아놓을 디렉토리 생성
    > su - kafka
    $ cd /kafka
    $ mkdir plugins
    
    $ sudo chown -R kafka:kafka /kafka/plugins

     

    • confluent-hub 사이트에서 mongoDB sink connector 다운로드

    물론 wget으로 직접 서버에 받아도 무방하다. 

    https://www.confluent.io/hub/hpgrahsl/kafka-connect-mongodb

     

    MongoDB Sink Connector

    Confluent, founded by the creators of Apache Kafka, delivers a complete execution of Kafka for the Enterprise, to help you run your business in real time.

    www.confluent.io

    로컬에서 서버로 전송한다.
    (이때, 다운로드 받은 커넥터의 jar파일들을 구분없이 플러그인 디렉토리에 저장하지 않도록 함. jar끼리 충돌 일어나서 커넥터가 제대로 등록되지 않기 때문)

    % scp -r mongodb-kafka-connect-mongodb-1.6.1 kafka@<your IP>:/kafka/plugins

     

    • connector-distributed.properties의 플러그인 경로 추가 및 재기동 

    해당 파일은 ${kafka_home}/config 에 위치해 있다. (나의 경우 ${kafka_home} == /kafka )

    플러그인 경로는 default 보다는 사용자가 지정해주는 것을 권장한다. 

    $ vi /kafka/config/connect-distributed.properties
    plugin.path=/usr/local/jdk1.8.0_212,/kafka/plugins
    
    $ sudo systemctl stop kafka-connector.service
    $ sudo systemctl start kafka-connector.service

     

    • 플러그인 설치 확인
    $ curl http://<your hostname>:8083/connector-plugins | jq
    
    % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                     Dload  Upload   Total   Spent    Left  Speed
    100   690  100   690    0     0   3484      0 --:--:-- --:--:-- --:--:--  3484
    [
        {
            "class": "com.mongodb.kafka.connect.MongoSinkConnector",
            "type": "sink",
            "version": "1.6.1-dirty" #버전이 dirty
        },
        {
            "class": "com.mongodb.kafka.connect.MongoSourceConnector",
            "type": "source",
            "version": "1.6.1-dirty" #버전이 dirty
        },

     

    • Mongo Sink Connector 등록

    이때 몇 가지 주의사항이 있다. 
    1. MongoDB에 insert하는 collection 이름에 하이픈(-) 등의 특수문자가 들어가면 안된다. (CLI로 조회 불가함)
    2. value값에 null이 들어가면 DataException ERROR가 뜨면서 커넥터 프로세스가 중단된다. value가 null인 데이터를 무시하더라도 커넥터 프로세스를 계속 유지하고 싶다면, 다음 파라미터를 추가한다. 

      "mongo.errors.tolerance": "all"    ##커넥터는 문제가 있는 메시지를 무시하고 처리
      "mongo.errors.log.enable": "true"    ##문제가 있는 메시지를 무시하고 처리하되, 에러 로그를 남김
    ##connector config 모아놓을 디렉토리 생성
    $ cd 
    $ mkdir json && cd json
    
    
    ##config 생성
    $ vi Kafka-to-Mongo.json
    {
      "name": "mongoSink-sample",
      "config": {
          "connector.class":"com.mongodb.kafka.connect.MongoSinkConnector",
          "tasks.max":"2",
          "topics":"<전송할 토픽>",
          "connection.uri":"mongodb://admin:mongo@<mongoDB IP>:<mongodb port>",
          "database":"<sink할 몽고DB 데이터베이스 이름>",
          "collection":"<sink할 몽고DB 컬렉션 이름>"
      }
    }
    
    
    ##connector 등록
    $ curl -X POST -H "Content-Type: application/json" --data @Kafka-to-Mongo.json http://<your hostname>:8083/connectors/

    초기 적재이므로 몽고에서 확인했을 때, 상기에서 지정한 데이터베이스와 컬렉션이 모두 보여야 정상이다. (새로운 메시지를 프로듀싱 하지 않더라도)

     

    • 커넥터 설치 확인
    $ curl <your hostname>:8083/connectors
    
    ["<사용자 지정 커넥터 이름>"]

     

    • 커넥터 상태 확인
    $ curl <your hostname>:8083/connectors/<사용자 지정 커넥터 이름>/status | jq

     

    • 커넥터 삭제
    $ curl -XDELETE http://<your hostname>:8083/connectors/<사용자 지정 커넥터 이름>/
    
    ##결과값 아무것도 안나오면 삭제 잘 된것임
    728x90

    '엔지니어링 > Kafka' 카테고리의 다른 글

    Kafka consumer group  (0) 2022.03.04
    Apache Kafka Streams 예제코드  (0) 2022.03.03
    Zero copy transfer  (0) 2022.02.24
    Jabba Utility  (0) 2022.02.15
    [설치, 구성] Apache Kafka Installation  (0) 2021.11.29

    댓글

Designed by Tistory.