[Installation, Configuration] Apache Kafka Installation
🐬 Let's install Apache Kafka (the open-source distribution). The enterprise version installs the same way.
🐬 This manual covers everything from the initial settings onward, after the virtual machines have been created.
🐬 Anything inside < > is user-defined, so set it to whatever suits you.
🐬 I didn't like root's commands looking like comments because of the # prompt... so I use > instead.
1. OS setting
These are the settings to apply on the VM after installing Ubuntu.
Any other OS will work, but as far as I know Ubuntu has the best compatibility.
I assume a kafka user was created when the VM was set up.
- Change the root password
#as the kafka user
$ sudo su
> passwd root
<your password>
<your password>
- Install the package needed for SSH connections
> su - root
> apt-get update
> apt-get install openssh-server
- Allow root access over SSH
> vi /etc/ssh/sshd_config
PermitRootLogin yes    #edit
#restart the service
> systemctl restart ssh
- Keep SSH sessions from being dropped
> vi /etc/ssh/sshd_config
ClientAliveInterval 0    #edit
ClientAliveCountMax 0    #edit
#restart the service
> systemctl restart ssh
- Set the shell session timeout to no limit
#disable the global shell timeout
> vi /etc/profile
##Session TimeOut
export TMOUT=0
> source /etc/profile
- Disable the firewall
> ufw disable
Firewall stopped and disabled on system startup
2. Installation
- Requirements
Install Oracle Java 1.8.
https://kafka.apache.org/documentation/#java
> wget https://github.com/frekele/oracle-java/releases/download/8u212-b10/jdk-8u212-linux-x64.tar.gz
> tar -xvzf jdk-8u212-linux-x64.tar.gz
> mv jdk1.8.0_212 /usr/local

##---set the Java home path
> vi /etc/profile
##JAVA_PATH
JAVA_HOME=/usr/local/jdk1.8.0_212
CLASSPATH=$JAVA_HOME/lib/tools.jar
PATH=$PATH:$JAVA_HOME/bin
export JAVA_HOME
export CLASSPATH
export PATH
> source /etc/profile

##---register Java with the OS
> vi java_path.sh
update-alternatives --install "/usr/bin/java" "java" "/usr/local/jdk1.8.0_212/bin/java" 1;
update-alternatives --install "/usr/bin/javac" "javac" "/usr/local/jdk1.8.0_212/bin/javac" 1;
update-alternatives --install "/usr/bin/javaws" "javaws" "/usr/local/jdk1.8.0_212/bin/javaws" 1;
update-alternatives --set java /usr/local/jdk1.8.0_212/bin/java;
update-alternatives --set javac /usr/local/jdk1.8.0_212/bin/javac;
update-alternatives --set javaws /usr/local/jdk1.8.0_212/bin/javaws;
> sh -x java_path.sh
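Before moving on, it's worth a quick check that the JDK installed above is actually the one being picked up:

> java -version       #the first line should report 1.8.0_212
> echo $JAVA_HOME     #should print /usr/local/jdk1.8.0_212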
Create a hosts file for internal communication.
It also avoids the misfortune of having to change every config if an IP changes.
The setup uses 3 nodes running 3 ZooKeepers and 3 Kafka brokers. (A ZooKeeper ensemble is also possible.)
> vi /etc/hosts
127.0.0.1 localhost
<connectorIP> conn1
<broker1IP> kafka1 zk1
<broker2IP> kafka2 zk2
<broker3IP> kafka3 zk3
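A quick sanity check that every node resolves the new names the same way (getent is available on Ubuntu by default):

> getent hosts conn1 kafka1 kafka2 kafka3
> ping -c 1 kafka1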
Download the Apache Kafka "Binary" release.
https://kafka.apache.org/downloads
> wget https://archive.apache.org/dist/kafka/2.7.0/kafka_2.13-2.7.0.tgz
##---extract
> tar -xzf kafka_2.13-2.7.0.tgz
##---move into place
> mv kafka_2.13-2.7.0 /kafka
##---change ownership
> chown -R kafka:kafka /kafka
◾️ Zookeeper Config
> su - kafka
- Edit the property file to match your environment
$ vi /kafka/config/zookeeper.properties
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# the directory where the snapshot is stored.
dataDir=/data/zookeeper
# the port at which the clients will connect
clientPort=2181
# disable the per-ip limit on the number of connections since this is a non-production config
#maxClientCnxns=0
# Disable the adminserver by default to avoid port conflicts.
# Set the port to something non-conflicting if choosing to enable this
admin.enableServer=false
# admin.serverPort=8080
tickTime=2000
initLimit=5
syncLimit=2
server.1=zk1:2888:3888
server.2=zk2:2888:3888
server.3=zk3:2888:3888
- Create the ZooKeeper data directory
$ sudo mkdir -p /data/zookeeper
$ sudo chown -R kafka:kafka /data
$ cd /data/zookeeper
- Create each node's server id (myid)
Use the IDs you set in zookeeper.properties.
Run this from the /data/zookeeper directory.
#on the zookeeper01 node only
$ echo "1" > myid
$ sudo chown -R kafka:kafka /data/zookeeper

#on the zookeeper02 node only
$ echo "2" > myid
$ sudo chown -R kafka:kafka /data/zookeeper

#on the zookeeper03 node only
$ echo "3" > myid
$ sudo chown -R kafka:kafka /data/zookeeper
- Register the service
Running it as an OS service is not generally recommended, but since this is a test environment I'll take the convenient route.
$ sudo vi /lib/systemd/system/kafka-zookeeper.service
[Unit]
Description=kafka Zookeeper server (Kafka)
Documentation=http://zookeeper.apache.org
After=network.target remote-fs.target

[Service]
Type=simple
User=kafka
Group=kafka
ExecStart=/kafka/bin/zookeeper-server-start.sh /kafka/config/zookeeper.properties
ExecStop=/kafka/bin/zookeeper-server-stop.sh
LimitNOFILE=100000
TimeoutStopSec=180
Restart=no

[Install]
WantedBy=multi-user.target
$ sudo systemctl daemon-reload
$ sudo systemctl enable kafka-zookeeper
Created symlink /etc/systemd/system/multi-user.target.wants/kafka-zookeeper.service → /lib/systemd/system/kafka-zookeeper.service.
$ sudo systemctl start kafka-zookeeper
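Whether each node actually came up can be checked with systemd and ZooKeeper's srvr four-letter command (a quick sketch; srvr is on the default whitelist in the ZooKeeper 3.5 line that Kafka 2.7 bundles, and nc comes from the netcat package):

$ sudo systemctl status kafka-zookeeper
$ echo srvr | nc localhost 2181    #the "Mode:" line should show leader or follower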
◾️ Broker Config
- essential : broker.id / log.dirs / zookeeper.connect
On the kafka1 server: broker.id=101, advertised.listeners=PLAINTEXT://kafka1:9092
On the kafka2 server: broker.id=102, advertised.listeners=PLAINTEXT://kafka2:9092
On the kafka3 server: broker.id=103, advertised.listeners=PLAINTEXT://kafka3:9092 (a sed sketch for patching the other nodes follows the config below)
For reference, advertised.listeners is the listener advertised to external applications rather than to Kafka itself; if you want to produce and consume from, say, Java code, this parameter must be set to a hostname those clients can resolve.
If you want a ZooKeeper ensemble for availability, you can set zookeeper.connect=zk1:2181,zk2:2181,zk3:2181/<cluster-name> (the trailing /<cluster-name> is a chroot path that keeps this cluster's znodes separate).
$ vi /kafka/config/server.properties
############################# Server Basics #############################
broker.id=101
bootstrap.servers=kafka1:9092,kafka2:9092,kafka3:9092

############################# Socket Server Settings #############################
listeners=PLAINTEXT://:9092
advertised.listeners=PLAINTEXT://kafka1:9092
#num.network.threads=3
#num.io.threads=8
#socket.send.buffer.bytes=102400
#socket.receive.buffer.bytes=102400
#socket.request.max.bytes=104857600

############################# Log Basics #############################
log.dirs=/data/kafka-logs
num.partitions=1
#num.recovery.threads.per.data.dir=1

############################# Internal Topic Settings #############################
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

############################# Log Retention Policy #############################
# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.

# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168

# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000

############################# Zookeeper #############################
zookeeper.connect=zk1:2181,zk2:2181,zk3:2181

############################# Group Coordinator Settings #############################
group.initial.rebalance.delay.ms=3
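Only broker.id and advertised.listeners change per node, so on the other two brokers you can either edit the file by hand or patch it in place. A minimal sketch, assuming the same file was copied to kafka2 (adjust the id and hostname for kafka3):

#on kafka2, rewrite the two per-node values
$ sed -i 's/broker.id=101/broker.id=102/; s|PLAINTEXT://kafka1:9092|PLAINTEXT://kafka2:9092|' /kafka/config/server.properties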
- Create the data directory
$ sudo mkdir -p /data/kafka-logs
$ sudo chown -R kafka:kafka /data/kafka-logs
- Register the service
$ sudo vi /lib/systemd/system/kafka-server.service
[Unit]
Description=Apache Kafka - broker
Documentation=https://kafka.apache.org/
After=network.target kafka-zookeeper.service

[Service]
Type=simple
User=kafka
Group=kafka
ExecStart=/kafka/bin/kafka-server-start.sh /kafka/config/server.properties
ExecStop=/kafka/bin/kafka-server-stop.sh
LimitNOFILE=1000000
TimeoutStopSec=180
Restart=no

[Install]
WantedBy=multi-user.target
$ sudo systemctl daemon-reload
$ sudo systemctl enable kafka-server
Created symlink /etc/systemd/system/multi-user.target.wants/kafka-server.service → /lib/systemd/system/kafka-server.service.
$ sudo systemctl start kafka-server
$ sudo systemctl status kafka-server
If you configured the ZooKeeper chroot for the ensemble, you can verify it as follows.
(assuming the three ZooKeeper nodes were grouped under cluster01)
$ cd /kafka/bin
$ ./zookeeper-shell.sh zk1:2181
Connecting to zk1:2181
Welcome to ZooKeeper!
JLine support is disabled

WATCHER::

WatchedEvent state:SyncConnected type:None path:null

ls /
[cluster01, zookeeper]
ls /cluster01/brokers/ids
[101, 102, 103]
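Beyond the ZooKeeper check, I like to round-trip a message to confirm the three brokers really form one cluster; running the console clients from conn1 also proves the advertised.listeners hostnames are reachable from outside the brokers. A minimal sketch, where healthcheck is just an example topic name:

$ /kafka/bin/kafka-topics.sh --bootstrap-server kafka1:9092 --create --topic healthcheck --partitions 3 --replication-factor 3
$ /kafka/bin/kafka-topics.sh --bootstrap-server kafka1:9092 --describe --topic healthcheck
$ /kafka/bin/kafka-console-producer.sh --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092 --topic healthcheck
$ /kafka/bin/kafka-console-consumer.sh --bootstrap-server kafka1:9092 --topic healthcheck --from-beginning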
With that, a very, very basic Kafka setup is complete.
But I don't think a data pipeline is complete without connectors, so they should be installed as part of the set.
So next comes the connector installation manual.
◾️ Connector Config
https://kafka.apache.org/documentation/#connect
Work on the conn1 server defined in /etc/hosts.
- Edit the config
$ vi /kafka/config/connect-distributed.properties
# A list of host/port pairs to use for establishing the initial connection to the Kafka cluster.
bootstrap.servers=kafka1:9092,kafka2:9092,kafka3:9092

# unique name for the cluster, used in forming the Connect cluster group. Note that this must not conflict with consumer group IDs
group.id=connect-cluster

# The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will
# need to configure these based on the format they want their data in when loaded from or stored into Kafka
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

# Converter-specific settings can be passed in by prefixing the Converter's setting with the converter we want to apply it to
key.converter.schemas.enable=false
value.converter.schemas.enable=false

# Topic to use for storing offsets. This topic should have many partitions and be replicated and compacted.
# Kafka Connect will attempt to create the topic automatically when needed, but you can always manually create
# the topic before starting Kafka Connect if a specific topic configuration is needed.
# Most users will want to use the built-in default replication factor of 3 or in some cases even specify a larger value.
# Raise this to 3 when running against the 3-broker cluster above.
offset.storage.topic=connect-offsets
offset.storage.replication.factor=1
#offset.storage.partitions=25

# Topic to use for storing connector and task configurations; this should be a single-partition, highly replicated,
# and compacted topic.
config.storage.topic=connect-configs
config.storage.replication.factor=1

# Topic to use for storing statuses. This topic can have multiple partitions and should be replicated and compacted.
status.storage.topic=connect-status
status.storage.replication.factor=1
#status.storage.partitions=5

# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000

# Hostname & Port for the REST API to listen on. If this is set, it will bind to the interface used to listen to requests.
rest.host.name=conn1
rest.port=8083

# The Hostname & Port that will be given out to other workers to connect to i.e. URLs that are routable from other servers.
#rest.advertised.host.name=
#rest.advertised.port=

# Set to a list of filesystem paths separated by commas (,) to enable class loading isolation for plugins
# (connectors, converters, transformations). The list should consist of top level directories that include
# any combination of:
# a) directories immediately containing jars with plugins and their dependencies
# b) uber-jars with plugins and their dependencies
# c) directories immediately containing the package directory structure of classes of plugins and their dependencies
plugin.path=/usr/local/share/java
- Create the log directory and change ownership
$ sudo mkdir -p /data/connector
$ sudo chown -R kafka:kafka /data/connector/
- Register the service
$ sudo vi /lib/systemd/system/kafka-connector.service
[Unit]
Description=kafka-connector
Documentation=https://kafka.apache.org/
After=network.target

[Service]
Type=simple
User=kafka
#Group=confluent
Environment="LOG_DIR=/data/connector"
ExecStart=/kafka/bin/connect-distributed.sh /kafka/config/connect-distributed.properties
TimeoutStopSec=180
Restart=no

[Install]
WantedBy=multi-user.target
$ sudo systemctl daemon-reload
$ sudo systemctl enable kafka-connector.service
Created symlink /etc/systemd/system/multi-user.target.wants/kafka-connector.service → /lib/systemd/system/kafka-connector.service.
$ sudo systemctl start kafka-connector.service
- connector process check
##---confirm the process is running
$ ps -ef | grep java | grep conn

##---confirm the log file was created
$ cd /data/connector
$ ls -l
total 148
-rw-r--r-- 1 kafka kafka 148083 Mar 12 14:27 connect.log

$ curl http://conn1:8083/
{"version":"2.7.0","commit":"","kafka_cluster_id":""}
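Once the worker responds, connectors are registered and managed through this same REST API. A minimal sketch using the FileStreamSource connector bundled with the Kafka binaries; the connector name, file path, and topic below are just example values:

##---list the connector plugins the worker can see
$ curl -s http://conn1:8083/connector-plugins
##---register an example file source connector
$ curl -s -X POST http://conn1:8083/connectors \
    -H "Content-Type: application/json" \
    -d '{"name":"file-source-example","config":{"connector.class":"org.apache.kafka.connect.file.FileStreamSourceConnector","tasks.max":"1","file":"/tmp/file-source-example.txt","topic":"connect-file-test"}}'
##---check the connector and task status
$ curl -s http://conn1:8083/connectors/file-source-example/status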