-
[구성] Mongo Sink Connector엔지니어링/Kafka 2021. 12. 22. 16:02728x90
🍃Mongo Sink Connector를 구성해 보려고 한다. (apache/confluent 모두에서 사용 가능)
🍃<your hostname>는 해당 서버의 호스트이름을, <한국어로 써놓은 config들>은 사용자 설정사항이다.
(그냥 입맛대로 정하면 됨)🍃MongoDB 커넥터의 경우, confluent가 아닌 벤더사 자체에서 지원해주고 있다.
++ 22.04.14 업데이트
최근 조회수가 꾸준히 나오는 것 같아서 configuration을 업데이트 해본다 ㅋㅋ
- kafka user는 이미 생성되어 있다고 가정
- 커넥터 플러그인을 모아놓을 디렉토리 생성
> su - kafka $ cd /kafka $ mkdir plugins $ sudo chown -R kafka:kafka /kafka/plugins
- confluent-hub 사이트에서 mongoDB sink connector 다운로드
물론 wget으로 직접 서버에 받아도 무방하다.
https://www.confluent.io/hub/hpgrahsl/kafka-connect-mongodb
로컬에서 서버로 전송한다.
(이때, 다운로드 받은 커넥터의 jar파일들을 구분없이 플러그인 디렉토리에 저장하지 않도록 함. jar끼리 충돌 일어나서 커넥터가 제대로 등록되지 않기 때문)% scp -r mongodb-kafka-connect-mongodb-1.6.1 kafka@<your IP>:/kafka/plugins
- connector-distributed.properties의 플러그인 경로 추가 및 재기동
해당 파일은 ${kafka_home}/config 에 위치해 있다. (나의 경우 ${kafka_home} == /kafka )
플러그인 경로는 default 보다는 사용자가 지정해주는 것을 권장한다.
$ vi /kafka/config/connect-distributed.properties plugin.path=/usr/local/jdk1.8.0_212,/kafka/plugins $ sudo systemctl stop kafka-connector.service $ sudo systemctl start kafka-connector.service
- 플러그인 설치 확인
$ curl http://<your hostname>:8083/connector-plugins | jq % Total % Received % Xferd Average Speed Time Time Time Current Dload Upload Total Spent Left Speed 100 690 100 690 0 0 3484 0 --:--:-- --:--:-- --:--:-- 3484 [ { "class": "com.mongodb.kafka.connect.MongoSinkConnector", "type": "sink", "version": "1.6.1-dirty" #버전이 dirty }, { "class": "com.mongodb.kafka.connect.MongoSourceConnector", "type": "source", "version": "1.6.1-dirty" #버전이 dirty },
- Mongo Sink Connector 등록
이때 몇 가지 주의사항이 있다.
1. MongoDB에 insert하는 collection 이름에 하이픈(-) 등의 특수문자가 들어가면 안된다. (CLI로 조회 불가함)
2. value값에 null이 들어가면 DataException ERROR가 뜨면서 커넥터 프로세스가 중단된다. value가 null인 데이터를 무시하더라도 커넥터 프로세스를 계속 유지하고 싶다면, 다음 파라미터를 추가한다."mongo.errors.tolerance": "all" ##커넥터는 문제가 있는 메시지를 무시하고 처리 "mongo.errors.log.enable": "true" ##문제가 있는 메시지를 무시하고 처리하되, 에러 로그를 남김
##connector config 모아놓을 디렉토리 생성 $ cd $ mkdir json && cd json ##config 생성 $ vi Kafka-to-Mongo.json { "name": "mongoSink-sample", "config": { "connector.class":"com.mongodb.kafka.connect.MongoSinkConnector", "tasks.max":"2", "topics":"<전송할 토픽>", "connection.uri":"mongodb://admin:mongo@<mongoDB IP>:<mongodb port>", "database":"<sink할 몽고DB 데이터베이스 이름>", "collection":"<sink할 몽고DB 컬렉션 이름>" } } ##connector 등록 $ curl -X POST -H "Content-Type: application/json" --data @Kafka-to-Mongo.json http://<your hostname>:8083/connectors/
초기 적재이므로 몽고에서 확인했을 때, 상기에서 지정한 데이터베이스와 컬렉션이 모두 보여야 정상이다. (새로운 메시지를 프로듀싱 하지 않더라도)
- 커넥터 설치 확인
$ curl <your hostname>:8083/connectors ["<사용자 지정 커넥터 이름>"]
- 커넥터 상태 확인
$ curl <your hostname>:8083/connectors/<사용자 지정 커넥터 이름>/status | jq
- 커넥터 삭제
$ curl -XDELETE http://<your hostname>:8083/connectors/<사용자 지정 커넥터 이름>/ ##결과값 아무것도 안나오면 삭제 잘 된것임
728x90'엔지니어링 > Kafka' 카테고리의 다른 글
Kafka consumer group (0) 2022.03.04 Apache Kafka Streams 예제코드 (0) 2022.03.03 Zero copy transfer (0) 2022.02.24 Jabba Utility (0) 2022.02.15 [설치, 구성] Apache Kafka Installation (0) 2021.11.29