  • [Install, Configure] Apache Kafka Installation
    Engineering/Kafka 2021. 11. 29. 23:10

    🐬 Let's install Apache Kafka (the open-source version). The Enterprise edition can be installed the same way.

    🐬 This manual covers everything from setting up the virtual machine onward.

    🐬 Anything inside < > is user-defined, so set it to your own taste.

    🐬 I didn't like root's commands looking like comments, so I used > instead of #.


    1. OS setting

    These are the settings to apply inside the VM after installing Ubuntu.

    Any OS will work, but as far as I know Ubuntu has the best compatibility.

    I assume a kafka user was created while creating the VM.

    • Change the root password
    $ sudo su 
    -> kafka
    
    > passwd root
    -> <your password>
    -> <your password>
    • Install the package needed for SSH access
    > su - root
    
    > apt-get update 
    
    > apt-get install openssh-server
    • Allow root access over SSH
    > vi /etc/ssh/sshd_config
    PermitRootLogin yes   # edit this line
    
    # restart the service
    > systemctl restart ssh
    • Keep SSH sessions from timing out
    > vi /etc/ssh/sshd_config
    ClientAliveInterval 0   # edit
    ClientAliveCountMax 0   # edit
    
    # restart the service
    > systemctl restart ssh
    • Set the session timeout to no limit
    # disable the global timeout
    > vi /etc/profile 
    ##Session TimeOut 
    export TMOUT=0
    
    > source /etc/profile
    • Disable the firewall
    > ufw disable
    Firewall stopped and disabled on system startup

    2. Installation

    • Requirements

    Install Oracle Java 1.8.

    https://kafka.apache.org/documentation/#java

     


    > wget https://github.com/frekele/oracle-java/releases/download/8u212-b10/jdk-8u212-linux-x64.tar.gz
    
    > tar -xvzf jdk-8u212-linux-x64.tar.gz
    
    > mv jdk1.8.0_212 /usr/local
    
    ##--- set the Java home path
    > vi /etc/profile
    
    ##JAVA_PATH
    JAVA_HOME=/usr/local/jdk1.8.0_212
    CLASSPATH=$JAVA_HOME/lib/tools.jar
    PATH=$PATH:$JAVA_HOME/bin
    export JAVA_HOME
    export CLASSPATH
    export PATH
    
    > source /etc/profile
    
    ##--- register Java with the OS
    > vi java_path.sh 
    update-alternatives --install "/usr/bin/java" "java" "/usr/local/jdk1.8.0_212/bin/java" 1;
    update-alternatives --install "/usr/bin/javac" "javac" "/usr/local/jdk1.8.0_212/bin/javac" 1;
    update-alternatives --install "/usr/bin/javaws" "javaws" "/usr/local/jdk1.8.0_212/bin/javaws" 1;
    update-alternatives --set java /usr/local/jdk1.8.0_212/bin/java;
    update-alternatives --set javac /usr/local/jdk1.8.0_212/bin/javac;
    update-alternatives --set javaws /usr/local/jdk1.8.0_212/bin/javaws;
    
    > sh -x java_path.sh
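
    To confirm the registration worked, you can check the version (the exact output below is just what I'd expect for this JDK build):

    > java -version
    java version "1.8.0_212"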

    Create a hosts file for internal communication.

    This also avoids the misfortune of having to change every config whenever an IP changes.

    The layout is 3 nodes running 3 ZooKeepers and 3 Kafka brokers. (A ZooKeeper ensemble is also possible.)

    > vi /etc/hosts
    
    127.0.0.1       localhost
    
    <connectorIP>    conn1
    <broker1IP>     kafka1     zk1
    <broker2IP>     kafka2     zk2
    <broker3IP>     kafka3     zk3

    Download the Apache Kafka "Binary" release.

    https://kafka.apache.org/downloads

     


    > wget https://archive.apache.org/dist/kafka/2.7.0/kafka_2.13-2.7.0.tgz
    
    ##--- extract the archive
    > tar -xzf kafka_2.13-2.7.0.tgz
    
    ##--- move to the install path
    > mv kafka_2.13-2.7.0 /kafka
    
    ##--- change ownership
    > chown -R kafka:kafka /kafka

     

     

    ◾️ Zookeeper Config

    > su - kafka
    • Edit the property file for your environment
    $ vi /kafka/config/zookeeper.properties
    
    # Licensed to the Apache Software Foundation (ASF) under one or more
    # contributor license agreements.  See the NOTICE file distributed with
    # this work for additional information regarding copyright ownership.
    # The ASF licenses this file to You under the Apache License, Version 2.0
    # (the "License"); you may not use this file except in compliance with
    # the License.  You may obtain a copy of the License at
    # 
    #    http://www.apache.org/licenses/LICENSE-2.0
    # 
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    # the directory where the snapshot is stored.
    dataDir=/data/zookeeper
    # the port at which the clients will connect
    clientPort=2181
    # disable the per-ip limit on the number of connections since this is a non-production config
    #maxClientCnxns=0
    # Disable the adminserver by default to avoid port conflicts.
    # Set the port to something non-conflicting if choosing to enable this
    admin.enableServer=false
    # admin.serverPort=8080
    tickTime=2000
    initLimit=5
    syncLimit=2
    server.1=zk1:2888:3888
    server.2=zk2:2888:3888
    server.3=zk3:2888:3888
    • Create the ZooKeeper data directory
    $ sudo mkdir -p /data/zookeeper
    
    $ sudo chown -R kafka:kafka /data
    
    $ cd /data/zookeeper
    • Create each node's server id

    Create the myid file according to the IDs set in zookeeper.properties.

    Do this in the /data/zookeeper directory.

    # run on the zookeeper01 node only
    $ echo "1" > myid
    $ sudo chown -R kafka:kafka /data/zookeeper
    
    # run on the zookeeper02 node only
    $ echo "2" > myid
    $ sudo chown -R kafka:kafka /data/zookeeper
    
    # run on the zookeeper03 node only
    $ echo "3" > myid
    $ sudo chown -R kafka:kafka /data/zookeeper
    • Register the service

    Registering and starting this as an OS service is generally not recommended, but since this is a test environment I'll take the convenient route.

    $ sudo vi /lib/systemd/system/kafka-zookeeper.service
    
    [Unit]
    Description=kafka Zookeeper server (Kafka)
    Documentation=http://zookeeper.apache.org
    After=network.target remote-fs.target
    
    [Service]
    Type=simple
    User=kafka
    Group=kafka
    ExecStart=/kafka/bin/zookeeper-server-start.sh /kafka/config/zookeeper.properties 
    ExecStop=/kafka/bin/zookeeper-server-stop.sh
    LimitNOFILE=100000
    TimeoutStopSec=180 
    Restart=no
    
    [Install]
    WantedBy=multi-user.target
    $ sudo systemctl daemon-reload
    
    $ sudo systemctl enable kafka-zookeeper
    Created symlink /etc/systemd/system/multi-user.target.wants/kafka-zookeeper.service → /lib/systemd/system/kafka-zookeeper.service.
    
    $ sudo systemctl start kafka-zookeeper
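
    Before moving on to the brokers, it's worth checking that each ZooKeeper actually came up. A quick check (assuming nc is installed; the srvr four-letter command is whitelisted by default in the ZooKeeper 3.5.x bundled with Kafka 2.7):

    $ sudo systemctl status kafka-zookeeper
    $ echo srvr | nc zk1 2181   # prints version, latency stats and Mode: leader/follower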

     

    ◾️ Broker Config

    • essential: broker.id / log.dirs / zookeeper.connect

    For the kafka1 server: broker.id=101, advertised.listeners=PLAINTEXT://kafka1:9092

    For the kafka2 server: broker.id=102, advertised.listeners=PLAINTEXT://kafka2:9092

    For the kafka3 server: broker.id=103, advertised.listeners=PLAINTEXT://kafka3:9092

     

    For reference, advertised.listeners is the listener used for communication with external applications rather than with Kafka itself. If you want to produce and consume from Java code, for example, this parameter must be set.
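
    For example, once the brokers are running (service setup below), any host that has the Kafka binaries and the /etc/hosts entries above can produce and consume through these listeners with the bundled console tools (the topic name test is arbitrary):

    $ /kafka/bin/kafka-console-producer.sh --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092 --topic test
    $ /kafka/bin/kafka-console-consumer.sh --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092 --topic test --from-beginning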

     

    If you want to configure a ZooKeeper ensemble for availability, you can set zookeeper.connect=zk1:2181,zk2:2181,zk3:2181/<cluster-name>.
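
    If you'd rather not hand-edit server.properties on every node, a minimal sketch that stamps the per-broker values (assuming both keys already exist uncommented in the file, as in the listing below; adjust ID and HOST on each node) could be:

    ID=101; HOST=kafka1   # kafka2 -> 102, kafka3 -> 103
    sed -i "s/^broker.id=.*/broker.id=${ID}/" /kafka/config/server.properties
    sed -i "s|^advertised.listeners=.*|advertised.listeners=PLAINTEXT://${HOST}:9092|" /kafka/config/server.properties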

    $ vi /kafka/config/server.properties
    
    ############################# Server Basics #############################
    
    broker.id=101
    
    bootstrap.servers=kafka1:9092,kafka2:9092,kafka3:9092
    ############################# Socket Server Settings #############################
    
    listeners=PLAINTEXT://:9092
    
    advertised.listeners=PLAINTEXT://kafka1:9092
    
    #num.network.threads=3
    
    #num.io.threads=8
    
    #socket.send.buffer.bytes=102400
    
    #socket.receive.buffer.bytes=102400
    
    #socket.request.max.bytes=104857600
    
    
    ############################# Log Basics #############################
    
    log.dirs=/data/kafka-logs
    
    num.partitions=1
    
    #num.recovery.threads.per.data.dir=1
    
    ############################# Internal Topic Settings  #############################
    
    offsets.topic.replication.factor=1
    transaction.state.log.replication.factor=1
    transaction.state.log.min.isr=1
    
    
    ############################# Log Retention Policy #############################
    
    # The following configurations control the disposal of log segments. The policy can
    # be set to delete segments after a period of time, or after a given size has accumulated.
    # A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
    # from the end of the log.
    
    # The minimum age of a log file to be eligible for deletion due to age
    log.retention.hours=168
    
    # A size-based retention policy for logs. Segments are pruned from the log unless the remaining
    # segments drop below log.retention.bytes. Functions independently of log.retention.hours.
    #log.retention.bytes=1073741824
    
    # The maximum size of a log segment file. When this size is reached a new log segment will be created.
    log.segment.bytes=1073741824
    
    # The interval at which log segments are checked to see if they can be deleted according
    # to the retention policies
    log.retention.check.interval.ms=300000
    
    ############################# Zookeeper #############################
    
    zookeeper.connect=zk1:2181,zk2:2181,zk3:2181
    
    ############################# Group Coordinator Settings #############################
    
    group.initial.rebalance.delay.ms=3
    • Create the data directory
    $ sudo mkdir -p /data/kafka-logs
    
    $ sudo chown -R kafka:kafka /data/kafka-logs
    • Register the service
    $ sudo vi /lib/systemd/system/kafka-server.service
    
    [Unit]
    Description=Apache Kafka - broker
    Documentation=https://kafka.apache.org/
    After=network.target kafka-zookeeper.service
    
    
    [Service]
    Type=simple
    User=kafka
    Group=kafka
    ExecStart=/kafka/bin/kafka-server-start.sh /kafka/config/server.properties
    ExecStop=/kafka/bin/kafka-server-stop.sh
    LimitNOFILE=1000000 
    TimeoutStopSec=180
    Restart=no
    
    [Install]
    WantedBy=multi-user.target
    $ sudo systemctl daemon-reload
    
    $ sudo systemctl enable kafka-server
    Created symlink /etc/systemd/system/multi-user.target.wants/kafka-server.service → /lib/systemd/system/kafka-server.service.
    
    $ sudo systemctl start kafka-server
    
    $ sudo systemctl status kafka-server
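
    With the default zookeeper.connect above (no chroot), the brokers should register themselves under /brokers/ids; a quick way to check is:

    $ /kafka/bin/zookeeper-shell.sh zk1:2181 ls /brokers/ids
    ...
    [101, 102, 103]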

    If you configured a ZooKeeper ensemble (chroot), you can verify it as follows.

    (Here the three ZooKeepers were grouped under cluster01.)

    $ cd /kafka/bin 
    $ ./zookeeper-shell.sh zk1:2181
    
    Connecting to zk1:2181
    Welcome to ZooKeeper!
    JLine support is disabled
    
    WATCHER::
    
    WatchedEvent state:SyncConnected type:None path:null
    
    ls /
    [cluster01, zookeeper]
    
    ls /cluster01/brokers/ids
    [101, 102, 103]
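
    As a final smoke test you could create and describe a test topic (name and counts are arbitrary; a replication factor of 3 assumes all three brokers are registered):

    $ /kafka/bin/kafka-topics.sh --bootstrap-server kafka1:9092 --create --topic test --partitions 3 --replication-factor 3
    Created topic test.

    $ /kafka/bin/kafka-topics.sh --bootstrap-server kafka1:9092 --describe --topic test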

    With this, the very, very basic Kafka setup is complete.

    But I think a connector, which no data pipeline can do without, should be installed as part of the set.

    So next is the connector installation manual.


    ◾️ Connector Config

    https://kafka.apache.org/documentation/#connect

     


    Proceed on the conn1 server defined in the /etc/hosts file.

    • Edit the config
    $ vi /kafka/config/connect-distributed.properties
    
    # A list of host/port pairs to use for establishing the initial connection to the Kafka cluster.
    bootstrap.servers=kafka1:9092,kafka2:9092,kafka3:9092
    
    # unique name for the cluster, used in forming the Connect cluster group. Note that this must not conflict with consumer group IDs
    group.id=connect-cluster
    
    # The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will
    # need to configure these based on the format they want their data in when loaded from or stored into Kafka
    key.converter=org.apache.kafka.connect.json.JsonConverter
    value.converter=org.apache.kafka.connect.json.JsonConverter
    # Converter-specific settings can be passed in by prefixing the Converter's setting with the converter we want to apply
    # it to
    key.converter.schemas.enable=false
    value.converter.schemas.enable=false
    
    # Topic to use for storing offsets. This topic should have many partitions and be replicated and compacted.
    # Kafka Connect will attempt to create the topic automatically when needed, but you can always manually create
    # the topic before starting Kafka Connect if a specific topic configuration is needed.
    # Most users will want to use the built-in default replication factor of 3 or in some cases even specify a larger value.
    # Since this means there must be at least as many brokers as the maximum replication factor used, we'd like to be able
    # to run this example on a single-broker cluster and so here we instead set the replication factor to 1.
    offset.storage.topic=connect-offsets
    offset.storage.replication.factor=1   # change to 3 in a real deployment
    #offset.storage.partitions=25
    
    # Topic to use for storing connector and task configurations; note that this should be a single partition, highly replicated,
    # and compacted topic. Kafka Connect will attempt to create the topic automatically when needed, but you can always manually create
    # the topic before starting Kafka Connect if a specific topic configuration is needed.
    # Most users will want to use the built-in default replication factor of 3 or in some cases even specify a larger value.
    # Since this means there must be at least as many brokers as the maximum replication factor used, we'd like to be able
    # to run this example on a single-broker cluster and so here we instead set the replication factor to 1.
    config.storage.topic=connect-configs
    config.storage.replication.factor=1
    
    # Topic to use for storing statuses. This topic can have multiple partitions and should be replicated and compacted.
    # Kafka Connect will attempt to create the topic automatically when needed, but you can always manually create
    # the topic before starting Kafka Connect if a specific topic configuration is needed.
    # Most users will want to use the built-in default replication factor of 3 or in some cases even specify a larger value.
    # Since this means there must be at least as many brokers as the maximum replication factor used, we'd like to be able
    # to run this example on a single-broker cluster and so here we instead set the replication factor to 1.
    status.storage.topic=connect-status
    status.storage.replication.factor=1
    #status.storage.partitions=5
    
    # Flush much faster than normal, which is useful for testing/debugging
    offset.flush.interval.ms=10000
    
    # These are provided to inform the user about the presence of the REST host and port configs 
    # Hostname & Port for the REST API to listen on. If this is set, it will bind to the interface used to listen to requests.
    rest.host.name=conn1
    rest.port=8083
    
    # The Hostname & Port that will be given out to other workers to connect to i.e. URLs that are routable from other servers.
    #rest.advertised.host.name=
    #rest.advertised.port=
    
    # Set to a list of filesystem paths separated by commas (,) to enable class loading isolation for plugins
    # (connectors, converters, transformations). The list should consist of top level directories that include 
    # any combination of: 
    # a) directories immediately containing jars with plugins and their dependencies
    # b) uber-jars with plugins and their dependencies
    # c) directories immediately containing the package directory structure of classes of plugins and their dependencies
    # Examples: 
    # plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
    plugin.path=/usr/local/share/java,
    • Create the log directory and change its ownership
    $ sudo mkdir -p /data/connector 
    $ sudo chown -R kafka:kafka /data/connector/
    • Register the service
    $ sudo vi /lib/systemd/system/kafka-connector.service
    
    [Unit]
    Description=kafka-connector
    Documentation=https://kafka.apache.org//
    After=network.target
    
    [Service]
    Type=simple
    User=kafka
    #Group=confluent
    Environment="LOG_DIR=/data/connector"
    ExecStart=/kafka/bin/connect-distributed.sh /kafka/config/connect-distributed.properties
    TimeoutStopSec=180
    Restart=no
    
    [Install]
    WantedBy=multi-user.target
    $ sudo systemctl daemon-reload
    
    $ sudo systemctl enable kafka-connector.service
    Created symlink /etc/systemd/system/multi-user.target.wants/kafka-connector.service → /lib/systemd/system/kafka-connector.service.
    
    $ sudo systemctl start kafka-connector.service
    • connector process check
    ##--- check that the process is up
    $ ps -ef | grep  java | grep conn
    
    ##--- check that the log file was created
    $ cd /data/connector
    $ ls -l
    total 116
    -rw-r--r-- 1 kafka kafka 148083 Mar 12 14:27 connect.log
    
    $ curl http://conn1:8083/
    {"version":"2.7.0","commit":"","kafka_cluster_id":""}
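
    The same REST interface can be used to see which connector plugins the worker loaded and, later, to register connectors. A minimal sketch (the FileStream connector below is only an illustration and assumes its plugin is on the worker's classpath):

    ##--- list available plugins and registered connectors
    $ curl http://conn1:8083/connector-plugins
    $ curl http://conn1:8083/connectors

    ##--- register a connector by POSTing its JSON config
    $ curl -X POST -H "Content-Type: application/json" http://conn1:8083/connectors \
        -d '{"name":"file-source-test","config":{"connector.class":"org.apache.kafka.connect.file.FileStreamSourceConnector","file":"/tmp/test.txt","topic":"connect-test"}}'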

     

