  • Apache Kafka Streams Example Code
    Engineering/Kafka · 2022. 3. 3. 15:36

    1. Initialize the project

    Create a new directory in which to build the streams project.

    $ pwd
    /Users/ksy/IdeaProjects
    
    $ mkdir creating-first-apache-kafka-streams-application && cd creating-first-apache-kafka-streams-application

     

    2. Configure the project

    Create a Gradle build file with the following contents.

    Name the file build.gradle.

    buildscript {
        repositories {
            mavenCentral()
        }
        dependencies {
            classpath "com.github.jengelman.gradle.plugins:shadow:4.0.2"
        }
    }
    
    plugins {
        id "java"
        id "com.google.cloud.tools.jib" version "3.1.1"
        id "idea"
        id "eclipse"
        id "application"
    }
    
    version = "0.0.1"
    sourceCompatibility = "1.8"
    targetCompatibility = "1.8"
    application {
      mainClassName = "io.confluent.developer.KafkaStreamsApplication"
    }
    
    repositories {
        mavenCentral()
    
    
        maven {
            url "https://packages.confluent.io/maven"
        }
    }
    
    apply plugin: "com.github.johnrengelman.shadow"
    
    dependencies {
        implementation "org.slf4j:slf4j-simple:1.7.30"
        implementation "org.apache.kafka:kafka-streams:2.8.1"
        implementation 'com.github.javafaker:javafaker:1.0.2'
        testImplementation "org.apache.kafka:kafka-streams-test-utils:2.8.1"
        testImplementation "junit:junit:4.13.2"
        testImplementation 'org.hamcrest:hamcrest:2.2'
    }
    
    test {
        testLogging {
            outputs.upToDateWhen { false }
            showStandardStreams = true
            exceptionFormat = "full"
        }
    }
    
    jar {
      manifest {
        attributes(
          "Class-Path": configurations.compileClasspath.collect { it.getName() }.join(" "),
          "Main-Class": "io.confluent.developer.KafkaStreamsApplication"
        )
      }
    }
    
    shadowJar {
        archivesBaseName = "creating-first-apache-kafka-streams-application-standalone"
        archiveClassifier = ''
    }

    Create a new directory to hold the configuration.

    $ pwd
    /Users/ksy/IdeaProjects/creating-first-apache-kafka-streams-application
    
    $ mkdir configurations

    Create a dev.properties file containing the Kafka connection information.

    $ vi configurations/dev.properties 
    
    application.id=kafka-streams-101
    bootstrap.servers=mybroker1:9092, mybroker2:9092, mybroker3:9092
    
    input.topic.name=test.sy.string.220228
    output.topic.name=stream.test.sy.string.220228
    
    key.serializer=org.apache.kafka.common.serialization.StringSerializer
    value.serializer=org.apache.kafka.common.serialization.StringSerializer

     

    3. Create a Utility class 

    Create a directory for the Java source files.

    $ pwd
    /Users/ksy/IdeaProjects/creating-first-apache-kafka-streams-application
    
    $ mkdir -p src/main/java/io/confluent/developer

    The utility class below contains functions that create the Kafka topics and generate sample event data you can use to exercise the streams topology.

    $ vi src/main/java/io/confluent/developer/Util.java
    
    package io.confluent.developer;
    
    import com.github.javafaker.Faker;
    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.DescribeTopicsResult;
    import org.apache.kafka.clients.admin.NewTopic;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.errors.TopicExistsException;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.util.*;
    import java.util.concurrent.*;
    import java.util.stream.Collectors;
    
    public class Util implements AutoCloseable {
    
        private final Logger logger = LoggerFactory.getLogger(Util.class);
        private ExecutorService executorService = Executors.newFixedThreadPool(1);
    
        public class Randomizer implements AutoCloseable, Runnable {
            private Properties props;
            private String topic;
            private Producer<String, String> producer;
            private boolean closed;
    
            public Randomizer(Properties producerProps, String topic) {
                this.closed = false;
                this.topic = topic;
                this.props = producerProps;
                this.props.setProperty("client.id", "faker");
            }
    
            public void run() {
                try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                    Faker faker = new Faker();
                    while (!closed) {
                        try {
                            Object result = producer.send(new ProducerRecord<>(
                                    this.topic,
                                    faker.chuckNorris().fact())).get();
                            Thread.sleep(5000);
                        } catch (InterruptedException e) {
                        }
                    }
                } catch (Exception ex) {
                    logger.error(ex.toString());
                }
            }
            public void close()  {
                closed = true;
            }
        }
    
        public Randomizer startNewRandomizer(Properties producerProps, String topic) {
            Randomizer rv = new Randomizer(producerProps, topic);
            executorService.submit(rv);
            return rv;
        }
    
        public void createTopics(final Properties allProps, List<NewTopic> topics)
                throws InterruptedException, ExecutionException, TimeoutException {
            try (final AdminClient client = AdminClient.create(allProps)) {
                logger.info("Creating topics");
    
                client.createTopics(topics).values().forEach( (topic, future) -> {
                    try {
                        future.get();
                    } catch (Exception ex) {
                        logger.info(ex.toString());
                    }
                });
    
                Collection<String> topicNames = topics
                    .stream()
                    .map(t -> t.name())
                    .collect(Collectors.toCollection(LinkedList::new));
    
                logger.info("Asking cluster for topic descriptions");
                client
                    .describeTopics(topicNames)
                    .all()
                    .get(10, TimeUnit.SECONDS)
                    .forEach((name, description) -> logger.info("Topic Description: {}", description.toString()));
            }
        }
    
        public void close() {
            if (executorService != null) {
                executorService.shutdownNow();
                executorService = null;
            }
        }
    }

     

    4. Create the Kafka Streams topology 

    Kafka Streams defines its processing logic as a processor topology, where the nodes are stream processors and the edges are streams.

    There are two ways to implement these components: the Streams DSL and the Processor API. The Streams DSL provides built-in abstractions such as streams, tables, and transformations, while the Processor API is used to implement functionality that the Streams DSL does not cover.
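
    For comparison, the same uppercase transform could be wired by hand with the Processor API, roughly as in the sketch below. This is an illustration only, not part of this tutorial's application; the class name UppercaseProcessorTopology and the node names "Source", "Uppercase", and "Sink" are assumptions.

    package io.confluent.developer;
    
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.Topology;
    import org.apache.kafka.streams.processor.api.Processor;
    import org.apache.kafka.streams.processor.api.ProcessorContext;
    import org.apache.kafka.streams.processor.api.Record;
    
    public class UppercaseProcessorTopology {
    
        // A processor that uppercases each record's value and forwards it downstream.
        static class UppercaseProcessor implements Processor<String, String, String, String> {
            private ProcessorContext<String, String> context;
    
            @Override
            public void init(ProcessorContext<String, String> context) {
                this.context = context;
            }
    
            @Override
            public void process(Record<String, String> record) {
                context.forward(record.withValue(record.value().toUpperCase()));
            }
        }
    
        // Wire source -> processor -> sink explicitly instead of using the DSL.
        static Topology buildProcessorApiTopology(String inputTopic, String outputTopic) {
            Topology topology = new Topology();
            topology.addSource("Source", Serdes.String().deserializer(), Serdes.String().deserializer(), inputTopic);
            topology.addProcessor("Uppercase", UppercaseProcessor::new, "Source");
            topology.addSink("Sink", outputTopic, Serdes.String().serializer(), Serdes.String().serializer(), "Uppercase");
            return topology;
        }
    }

    The tutorial application below uses the Streams DSL.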

    $ vi src/main/java/io/confluent/developer/KafkaStreamsApplication.java
    
    package io.confluent.developer;
    
    import org.apache.kafka.clients.admin.NewTopic;
    import org.apache.kafka.common.serialization.Serde;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.Topology;
    import org.apache.kafka.streams.kstream.Consumed;
    import org.apache.kafka.streams.kstream.Produced;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.io.FileInputStream;
    import java.io.InputStream;
    import java.util.Arrays;
    import java.util.Optional;
    import java.util.Properties;
    import java.util.concurrent.CountDownLatch;
    
    public class KafkaStreamsApplication {
    
        private static final Logger logger = LoggerFactory.getLogger(KafkaStreamsApplication.class);
    
        static void runKafkaStreams(final KafkaStreams streams) {
            final CountDownLatch latch = new CountDownLatch(1);
            streams.setStateListener((newState, oldState) -> {
                if (oldState == KafkaStreams.State.RUNNING && newState != KafkaStreams.State.RUNNING) {
                    latch.countDown();
                }
            });
    
            streams.start();
    
            try {
                latch.await();
            } catch (final InterruptedException e) {
                throw new RuntimeException(e);
            }
    
            logger.info("Streams Closed");
        }
        static Topology buildTopology(String inputTopic, String outputTopic) {
            Serde<String> stringSerde = Serdes.String();
    
            StreamsBuilder builder = new StreamsBuilder();
    
            builder
                .stream(inputTopic, Consumed.with(stringSerde, stringSerde))
                .peek((k,v) -> logger.info("Observed event: {}", v))
                .mapValues(s -> s.toUpperCase())
                .peek((k,v) -> logger.info("Transformed event: {}", v))
                .to(outputTopic, Produced.with(stringSerde, stringSerde));
    
            return builder.build();
        }
        public static void main(String[] args) throws Exception {
    
            if (args.length < 1) {
                throw new IllegalArgumentException("This program takes one argument: the path to a configuration file.");
            }
    
            Properties props = new Properties();
            try (InputStream inputStream = new FileInputStream(args[0])) {
                props.load(inputStream);
            }
    
            final String inputTopic = props.getProperty("input.topic.name");
            final String outputTopic = props.getProperty("output.topic.name");
    
            try (Util utility = new Util()) {
    
                utility.createTopics(
                        props,
                        Arrays.asList(
                                new NewTopic(inputTopic, Optional.empty(), Optional.empty()),
                                new NewTopic(outputTopic, Optional.empty(), Optional.empty())));
    
                // Randomizer only used to produce sample data for this application, not typical usage
                try (Util.Randomizer rando = utility.startNewRandomizer(props, inputTopic)) {
    
                    KafkaStreams kafkaStreams = new KafkaStreams(
                            buildTopology(inputTopic, outputTopic),
                            props);
    
                    Runtime.getRuntime().addShutdownHook(new Thread(kafkaStreams::close));
    
                    logger.info("Kafka Streams 101 App Started");
                    runKafkaStreams(kafkaStreams);
    
                }
            }
        }
    }
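
    The build file from step 2 already declares kafka-streams-test-utils and JUnit as test dependencies, so the topology can also be verified without a running cluster. A minimal sketch of such a test follows; it is not part of the original tutorial, and the file name and sample assertion are assumptions.

    $ vi src/test/java/io/confluent/developer/KafkaStreamsApplicationTest.java
    
    package io.confluent.developer;
    
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.TestInputTopic;
    import org.apache.kafka.streams.TestOutputTopic;
    import org.apache.kafka.streams.TopologyTestDriver;
    import org.junit.Test;
    
    import java.util.Properties;
    
    import static org.junit.Assert.assertEquals;
    
    public class KafkaStreamsApplicationTest {
    
        @Test
        public void shouldUppercaseValues() {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-streams-101-test");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234"); // never contacted by the test driver
    
            // Drive the real topology in-memory; no broker is required.
            try (TopologyTestDriver driver = new TopologyTestDriver(
                    KafkaStreamsApplication.buildTopology("input", "output"), props)) {
    
                TestInputTopic<String, String> input = driver.createInputTopic(
                        "input", new StringSerializer(), new StringSerializer());
                TestOutputTopic<String, String> output = driver.createOutputTopic(
                        "output", new StringDeserializer(), new StringDeserializer());
    
                input.pipeInput("chuck norris counted to infinity. twice.");
                assertEquals("CHUCK NORRIS COUNTED TO INFINITY. TWICE.", output.readValue());
            }
        }
    }

    Such a test can be run with ./gradlew test.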

     

    5. Compile and run the Kafka Streams program 

    Run the following from the terminal.

    $ ./gradlew shadowJar
    
    Deprecated Gradle features were used in this build, making it incompatible with Gradle 7.0.
    Use '--warning-mode all' to show the individual deprecation warnings.
    See https://docs.gradle.org/6.8/userguide/command_line_interface.html#sec:command_line_warnings
    
    BUILD SUCCESSFUL in 1s
    2 actionable tasks: 2 up-to-date

    Launch the application locally.

    $ java -jar build/libs/creating-first-apache-kafka-streams-application-*.jar configurations/dev.properties

    If the application starts up normally, you will see logs like the following.

    [faker-StreamThread-1] INFO org.apache.kafka.streams.KafkaStreams - stream-client [faker] State transition from REBALANCING to RUNNING
    [faker-StreamThread-1] INFO io.confluent.developer.KafkaStreamsApplication - Observed event: apple1
    [faker-StreamThread-1] INFO io.confluent.developer.KafkaStreamsApplication - Transformed event: APPLE1
    [faker-StreamThread-1] INFO io.confluent.developer.KafkaStreamsApplication - Observed event: banana2
    [faker-StreamThread-1] INFO io.confluent.developer.KafkaStreamsApplication - Transformed event: BANANA2
    ...
    [faker-StreamThread-1] INFO io.confluent.developer.KafkaStreamsApplication - Observed event: Chuck Norris doesn't pair program.
    [faker-StreamThread-1] INFO io.confluent.developer.KafkaStreamsApplication - Transformed event: CHUCK NORRIS DOESN'T PAIR PROGRAM.
    [faker-StreamThread-1] INFO io.confluent.developer.KafkaStreamsApplication - Observed event: Chuck Norris' programs occupy 150% of CPU, even when they are not executing.
    [faker-StreamThread-1] INFO io.confluent.developer.KafkaStreamsApplication - Transformed event: CHUCK NORRIS' PROGRAMS OCCUPY 150% OF CPU, EVEN WHEN THEY ARE NOT EXECUTING.

     

    6. Stream events using a console consumer 

    Check that the stream's output topic has been created. (Each of its messages will have been uppercased.)

    $ ./kafka-topics.sh --bootstrap-server mybroker1:9092 --list
    
    __consumer_offsets
    stream.test.sy.string.220228
    test.sy.string.220228

    Consume the output topic produced by the stream.

    $ ./kafka-console-consumer.sh --bootstrap-server mybroker1:9092 --topic stream.test.sy.string.220228 --from-beginning
    
    APPLE1
    BANANA2
    ...
    CHUCK NORRIS DOESN'T PAIR PROGRAM.
    CHUCK NORRIS' PROGRAMS OCCUPY 150% OF CPU, EVEN WHEN THEY ARE NOT EXECUTING.

     

    • Reference

    https://developer.confluent.io/tutorials/creating-first-apache-kafka-streams-application/kstreams.html#stream-events-using-a-console-consumer

     
