Apache Kafka Streams Example Code
1. Initialize the project
Create a new directory for the streams project.
$ pwd
/Users/ksy/IdeaProjects
$ mkdir creating-first-apache-kafka-streams-application && cd creating-first-apache-kafka-streams-application
2. Configure the project
Create a Gradle build file, named build.gradle, with the following contents.
buildscript {
    repositories {
        mavenCentral()
    }
    dependencies {
        classpath "com.github.jengelman.gradle.plugins:shadow:4.0.2"
    }
}

plugins {
    id "java"
    id "com.google.cloud.tools.jib" version "3.1.1"
    id "idea"
    id "eclipse"
    id "application"
}

version = "0.0.1"
sourceCompatibility = "1.8"
targetCompatibility = "1.8"

application {
    mainClassName = "io.confluent.developer.KafkaStreamsApplication"
}

repositories {
    mavenCentral()
    maven {
        url "https://packages.confluent.io/maven"
    }
}

apply plugin: "com.github.johnrengelman.shadow"

dependencies {
    implementation "org.slf4j:slf4j-simple:1.7.30"
    implementation "org.apache.kafka:kafka-streams:2.8.1"
    implementation 'com.github.javafaker:javafaker:1.0.2'
    testImplementation "org.apache.kafka:kafka-streams-test-utils:2.8.1"
    testImplementation "junit:junit:4.13.2"
    testImplementation 'org.hamcrest:hamcrest:2.2'
}

test {
    testLogging {
        outputs.upToDateWhen { false }
        showStandardStreams = true
        exceptionFormat = "full"
    }
}

jar {
    manifest {
        attributes(
            "Class-Path": configurations.compileClasspath.collect { it.getName() }.join(" "),
            "Main-Class": "io.confluent.developer.KafkaStreamsApplication"
        )
    }
}

shadowJar {
    archivesBaseName = "creating-first-apache-kafka-streams-application-standalone"
    archiveClassifier = ''
}
Create a new directory to hold the configuration.
$ pwd
/Users/ksy/IdeaProjects/creating-first-apache-kafka-streams-application
$ mkdir configurations
Create a dev.properties file containing the Kafka connection information.
$ vi configurations/dev.properties

application.id=kafka-streams-101
bootstrap.servers=mybroker1:9092,mybroker2:9092,mybroker3:9092
input.topic.name=test.sy.string.220228
output.topic.name=stream.test.sy.string.220228
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
3. Create a Utility class
Create a directory to hold the Java source files.
$ pwd
/Users/ksy/IdeaProjects/creating-first-apache-kafka-streams-application
$ mkdir -p src/main/java/io/confluent/developer
The following utility class contains functions that create the Kafka topics and generate the sample event data used to exercise the streams topology.
$ vi src/main/java/io/confluent/developer/Util.java

package io.confluent.developer;

import com.github.javafaker.Faker;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;
import java.util.concurrent.*;
import java.util.stream.Collectors;

public class Util implements AutoCloseable {

    private final Logger logger = LoggerFactory.getLogger(Util.class);
    private ExecutorService executorService = Executors.newFixedThreadPool(1);

    // Produces a random Chuck Norris fact to the given topic every five seconds
    // until closed; used only to generate sample input data.
    public class Randomizer implements AutoCloseable, Runnable {
        private final Properties props;
        private final String topic;
        private volatile boolean closed;

        public Randomizer(Properties producerProps, String topic) {
            this.closed = false;
            this.topic = topic;
            this.props = producerProps;
            this.props.setProperty("client.id", "faker");
        }

        public void run() {
            try (Producer<String, String> producer = new KafkaProducer<>(props)) {
                Faker faker = new Faker();
                while (!closed) {
                    try {
                        producer.send(new ProducerRecord<>(this.topic, faker.chuckNorris().fact())).get();
                        Thread.sleep(5000);
                    } catch (InterruptedException e) {
                    }
                }
            } catch (Exception ex) {
                logger.error(ex.toString());
            }
        }

        public void close() {
            closed = true;
        }
    }

    public Randomizer startNewRandomizer(Properties producerProps, String topic) {
        Randomizer rv = new Randomizer(producerProps, topic);
        executorService.submit(rv);
        return rv;
    }

    // Creates the topics (logging, but otherwise ignoring, failures such as
    // TopicExistsException) and then logs each topic's description.
    public void createTopics(final Properties allProps, List<NewTopic> topics)
            throws InterruptedException, ExecutionException, TimeoutException {
        try (final AdminClient client = AdminClient.create(allProps)) {
            logger.info("Creating topics");

            client.createTopics(topics).values().forEach((topic, future) -> {
                try {
                    future.get();
                } catch (Exception ex) {
                    logger.info(ex.toString());
                }
            });

            Collection<String> topicNames = topics
                .stream()
                .map(t -> t.name())
                .collect(Collectors.toCollection(LinkedList::new));

            logger.info("Asking cluster for topic descriptions");
            client
                .describeTopics(topicNames)
                .all()
                .get(10, TimeUnit.SECONDS)
                .forEach((name, description) -> logger.info("Topic Description: {}", description.toString()));
        }
    }

    public void close() {
        if (executorService != null) {
            executorService.shutdownNow();
            executorService = null;
        }
    }
}
4. Create the Kafka Streams topology
Kafka Streams defines its logic as a processor topology, in which the nodes are stream processors and the edges are streams.
There are two ways to implement these components: the Streams DSL and the Processor API. The Streams DSL provides built-in abstractions such as streams, tables, and transformations, while the Processor API is used to implement functionality that the DSL does not provide.
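The application below uses the Streams DSL. For comparison, the same uppercase transform could be expressed with the Processor API roughly as follows; this is a minimal sketch, and the class name and node names are illustrative rather than part of the original example.

package io.confluent.developer;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;

public class UpperCaseProcessorTopology {

    // A single processor node that uppercases each record value.
    static class UpperCaseProcessor implements Processor<String, String, String, String> {
        private ProcessorContext<String, String> context;

        @Override
        public void init(ProcessorContext<String, String> context) {
            this.context = context;
        }

        @Override
        public void process(Record<String, String> record) {
            // Forward the record downstream with its value uppercased.
            context.forward(record.withValue(record.value().toUpperCase()));
        }
    }

    static Topology buildTopology(String inputTopic, String outputTopic) {
        // Wire source -> processor -> sink by hand; the DSL does this wiring for you.
        ProcessorSupplier<String, String, String, String> upperCaseSupplier = UpperCaseProcessor::new;
        Topology topology = new Topology();
        topology.addSource("source", Serdes.String().deserializer(), Serdes.String().deserializer(), inputTopic);
        topology.addProcessor("uppercase", upperCaseSupplier, "source");
        topology.addSink("sink", outputTopic, Serdes.String().serializer(), Serdes.String().serializer(), "uppercase");
        return topology;
    }
}

As the DSL version below shows, the built-in operators make the same logic considerably more compact, which is why the DSL is the usual starting point.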
$ vi src/main/java/io/confluent/developer/KafkaStreamsApplication.java

package io.confluent.developer;

import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.FileInputStream;
import java.io.InputStream;
import java.util.Arrays;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;

public class KafkaStreamsApplication {

    private static final Logger logger = LoggerFactory.getLogger(KafkaStreamsApplication.class);

    // Blocks until the streams application leaves the RUNNING state.
    static void runKafkaStreams(final KafkaStreams streams) {
        final CountDownLatch latch = new CountDownLatch(1);
        streams.setStateListener((newState, oldState) -> {
            if (oldState == KafkaStreams.State.RUNNING && newState != KafkaStreams.State.RUNNING) {
                latch.countDown();
            }
        });

        streams.start();

        try {
            latch.await();
        } catch (final InterruptedException e) {
            throw new RuntimeException(e);
        }

        logger.info("Streams Closed");
    }

    // Reads from the input topic, uppercases each value, and writes to the output topic.
    static Topology buildTopology(String inputTopic, String outputTopic) {
        Serde<String> stringSerde = Serdes.String();
        StreamsBuilder builder = new StreamsBuilder();
        builder
            .stream(inputTopic, Consumed.with(stringSerde, stringSerde))
            .peek((k, v) -> logger.info("Observed event: {}", v))
            .mapValues(s -> s.toUpperCase())
            .peek((k, v) -> logger.info("Transformed event: {}", v))
            .to(outputTopic, Produced.with(stringSerde, stringSerde));
        return builder.build();
    }

    public static void main(String[] args) throws Exception {
        if (args.length < 1) {
            throw new IllegalArgumentException("This program takes one argument: the path to a configuration file.");
        }

        Properties props = new Properties();
        try (InputStream inputStream = new FileInputStream(args[0])) {
            props.load(inputStream);
        }

        final String inputTopic = props.getProperty("input.topic.name");
        final String outputTopic = props.getProperty("output.topic.name");

        try (Util utility = new Util()) {
            utility.createTopics(
                props,
                Arrays.asList(
                    new NewTopic(inputTopic, Optional.empty(), Optional.empty()),
                    new NewTopic(outputTopic, Optional.empty(), Optional.empty())));

            // Randomizer only used to produce sample data for this application, not typical usage
            try (Util.Randomizer rando = utility.startNewRandomizer(props, inputTopic)) {
                KafkaStreams kafkaStreams = new KafkaStreams(
                    buildTopology(inputTopic, outputTopic),
                    props);

                Runtime.getRuntime().addShutdownHook(new Thread(kafkaStreams::close));

                logger.info("Kafka Streams 101 App Started");
                runKafkaStreams(kafkaStreams);
            }
        }
    }
}
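Because build.gradle already pulls in kafka-streams-test-utils, JUnit, and Hamcrest, the topology can also be verified without a running broker. The following is a minimal sketch of such a test; the file src/test/java/io/confluent/developer/KafkaStreamsApplicationTest.java and its contents are illustrative additions, not part of the original tutorial.

$ vi src/test/java/io/confluent/developer/KafkaStreamsApplicationTest.java

package io.confluent.developer;

import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.junit.Test;

import java.util.Properties;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;

public class KafkaStreamsApplicationTest {

    @Test
    public void topologyShouldUpperCaseInputs() {
        final Properties props = new Properties();
        props.put("application.id", "kafka-streams-101-test");
        // TopologyTestDriver requires bootstrap.servers but never connects to it.
        props.put("bootstrap.servers", "dummy:9092");

        try (TopologyTestDriver driver = new TopologyTestDriver(
                KafkaStreamsApplication.buildTopology("input", "output"), props)) {

            TestInputTopic<String, String> in = driver.createInputTopic(
                "input", new StringSerializer(), new StringSerializer());
            TestOutputTopic<String, String> out = driver.createOutputTopic(
                "output", new StringDeserializer(), new StringDeserializer());

            in.pipeInput("key", "apple1");
            assertThat(out.readValue(), equalTo("APPLE1"));
        }
    }
}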
5. Compile and run the Kafka Streams program
Run the following in a terminal:
$ ./gradlew shadowJar

Deprecated Gradle features were used in this build, making it incompatible with Gradle 7.0.
Use '--warning-mode all' to show the individual deprecation warnings.
See https://docs.gradle.org/6.8/userguide/command_line_interface.html#sec:command_line_warnings

BUILD SUCCESSFUL in 1s
2 actionable tasks: 2 up-to-date
Launch the application locally:
$ java -jar build/libs/creating-first-apache-kafka-streams-application-*.jar configurations/dev.properties
When the application starts successfully, you will see logs like the following:
[faker-StreamThread-1] INFO org.apache.kafka.streams.KafkaStreams - stream-client [faker] State transition from REBALANCING to RUNNING
[faker-StreamThread-1] INFO io.confluent.developer.KafkaStreamsApplication - Observed event: apple1
[faker-StreamThread-1] INFO io.confluent.developer.KafkaStreamsApplication - Transformed event: APPLE1
[faker-StreamThread-1] INFO io.confluent.developer.KafkaStreamsApplication - Observed event: banana2
[faker-StreamThread-1] INFO io.confluent.developer.KafkaStreamsApplication - Transformed event: BANANA2
...
[faker-StreamThread-1] INFO io.confluent.developer.KafkaStreamsApplication - Observed event: Chuck Norris doesn't pair program.
[faker-StreamThread-1] INFO io.confluent.developer.KafkaStreamsApplication - Transformed event: CHUCK NORRIS DOESN'T PAIR PROGRAM.
[faker-StreamThread-1] INFO io.confluent.developer.KafkaStreamsApplication - Observed event: Chuck Norris' programs occupy 150% of CPU, even when they are not executing.
[faker-StreamThread-1] INFO io.confluent.developer.KafkaStreamsApplication - Transformed event: CHUCK NORRIS' PROGRAMS OCCUPY 150% OF CPU, EVEN WHEN THEY ARE NOT EXECUTING.
6. Stream events using a console consumer
Verify that the stream's output topic has been created. (Each message in it will be uppercased when consumed.)
$ ./kafka-topics.sh --bootstrap-server mybroker1:9092 --list

__consumer_offsets
stream.test.sy.string.220228
test.sy.string.220228
Consume the output topic produced by the stream.
$ ./kafka-console-consumer.sh --bootstrap-server mybroker1:9092 --topic stream.test.sy.string.220228 --from-beginning

APPLE1
BANANA2
...
CHUCK NORRIS DOESN'T PAIR PROGRAM.
CHUCK NORRIS' PROGRAMS OCCUPY 150% OF CPU, EVEN WHEN THEY ARE NOT EXECUTING.
- reference
"How to build your first Apache Kafka Streams application using Kafka Streams", developer.confluent.io