Apache Kafka Streams Example Code
1. Initialize the project
Create a new directory for the streams project.
$ pwd
/Users/ksy/IdeaProjects
$ mkdir creating-first-apache-kafka-streams-application && cd creating-first-apache-kafka-streams-application
2. Configure the project
Create a Gradle build file with the following contents.
Save the file as build.gradle in the project root.
buildscript {
    repositories {
        mavenCentral()
    }
    dependencies {
        classpath "com.github.jengelman.gradle.plugins:shadow:4.0.2"
    }
}

plugins {
    id "java"
    id "com.google.cloud.tools.jib" version "3.1.1"
    id "idea"
    id "eclipse"
    id "application"
}

version = "0.0.1"
sourceCompatibility = "1.8"
targetCompatibility = "1.8"

application {
    mainClassName = "io.confluent.developer.KafkaStreamsApplication"
}

repositories {
    mavenCentral()
    maven { url "https://packages.confluent.io/maven" }
}

apply plugin: "com.github.johnrengelman.shadow"

dependencies {
    implementation "org.slf4j:slf4j-simple:1.7.30"
    implementation "org.apache.kafka:kafka-streams:2.8.1"
    implementation 'com.github.javafaker:javafaker:1.0.2'
    testImplementation "org.apache.kafka:kafka-streams-test-utils:2.8.1"
    testImplementation "junit:junit:4.13.2"
    testImplementation 'org.hamcrest:hamcrest:2.2'
}

test {
    testLogging {
        outputs.upToDateWhen { false }
        showStandardStreams = true
        exceptionFormat = "full"
    }
}

jar {
    manifest {
        attributes(
            "Class-Path": configurations.compileClasspath.collect { it.getName() }.join(" "),
            "Main-Class": "io.confluent.developer.KafkaStreamsApplication"
        )
    }
}

shadowJar {
    archivesBaseName = "creating-first-apache-kafka-streams-application-standalone"
    archiveClassifier = ''
}
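Note: the build step in section 5 invokes ./gradlew. If the wrapper scripts have not been generated for this project yet, they can be created with a locally installed Gradle (this step is an assumption; it is not shown in the original steps):

$ gradle wrapper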
Create a new directory to hold the configuration.
$ pwd
/Users/ksy/IdeaProjects/creating-first-apache-kafka-streams-application
$ mkdir configurations
Create a dev.properties file containing the Kafka connection information.
$ vi configurations/dev.properties

application.id=kafka-streams-101
bootstrap.servers=mybroker1:9092, mybroker2:9092, mybroker3:9092
input.topic.name=test.sy.string.220228
output.topic.name=stream.test.sy.string.220228
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
3. Create a Utility class
Create a directory to hold the Java source files.
$ pwd
/Users/ksy/IdeaProjects/creating-first-apache-kafka-streams-application
$ mkdir -p src/main/java/io/confluent/developer
The following utility class contains functions for creating the Kafka topics and for generating the sample event data we will use to exercise the streams topology.
$ vi src/main/java/io/confluent/developer/Util.java

package io.confluent.developer;

import com.github.javafaker.Faker;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.TopicExistsException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;
import java.util.concurrent.*;
import java.util.stream.Collectors;

public class Util implements AutoCloseable {

    private final Logger logger = LoggerFactory.getLogger(Util.class);
    private ExecutorService executorService = Executors.newFixedThreadPool(1);

    // Produces one random fact to the given topic every five seconds until close() is called.
    public class Randomizer implements AutoCloseable, Runnable {
        private Properties props;
        private String topic;
        private Producer<String, String> producer;
        private boolean closed;

        public Randomizer(Properties producerProps, String topic) {
            this.closed = false;
            this.topic = topic;
            this.props = producerProps;
            this.props.setProperty("client.id", "faker");
        }

        public void run() {
            try (KafkaProducer producer = new KafkaProducer<String, String>(props)) {
                Faker faker = new Faker();
                while (!closed) {
                    try {
                        Object result = producer.send(new ProducerRecord<>(
                                this.topic, faker.chuckNorris().fact())).get();
                        Thread.sleep(5000);
                    } catch (InterruptedException e) {
                    }
                }
            } catch (Exception ex) {
                logger.error(ex.toString());
            }
        }

        public void close() {
            closed = true;
        }
    }

    public Randomizer startNewRandomizer(Properties producerProps, String topic) {
        Randomizer rv = new Randomizer(producerProps, topic);
        executorService.submit(rv);
        return rv;
    }

    // Creates the topics (logging rather than failing on errors such as TopicExistsException),
    // then asks the cluster for each topic's description and logs it.
    public void createTopics(final Properties allProps, List<NewTopic> topics)
            throws InterruptedException, ExecutionException, TimeoutException {
        try (final AdminClient client = AdminClient.create(allProps)) {
            logger.info("Creating topics");

            client.createTopics(topics).values().forEach((topic, future) -> {
                try {
                    future.get();
                } catch (Exception ex) {
                    logger.info(ex.toString());
                }
            });

            Collection<String> topicNames = topics
                    .stream()
                    .map(t -> t.name())
                    .collect(Collectors.toCollection(LinkedList::new));

            logger.info("Asking cluster for topic descriptions");
            client
                    .describeTopics(topicNames)
                    .all()
                    .get(10, TimeUnit.SECONDS)
                    .forEach((name, description) ->
                            logger.info("Topic Description: {}", description.toString()));
        }
    }

    public void close() {
        if (executorService != null) {
            executorService.shutdownNow();
            executorService = null;
        }
    }
}
4. Create the Kafka Streams topology
Kafka Streams defines its processing logic as a processor topology, where the nodes are stream processors and the edges are streams.
There are two ways to build these components: the Streams DSL and the Processor API. The Streams DSL provides built-in abstractions such as streams, tables, and transformations. The Processor API is used to implement functionality that the Streams DSL does not provide. A Processor API sketch follows for comparison, and the actual application uses the DSL.
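For comparison, here is a minimal sketch of how the same uppercase transform could be wired with the low-level Processor API. This sketch is not part of the tutorial; the class and node names are illustrative, and it assumes the kafka-streams 2.8.1 dependency above, whose org.apache.kafka.streams.processor.api package provides the typed Processor interface.

package io.confluent.developer;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;

public class UppercaseProcessorExample {

    static Topology buildProcessorApiTopology(String inputTopic, String outputTopic) {
        // Each record's value is uppercased and forwarded to the downstream sink node.
        ProcessorSupplier<String, String, String, String> upperCaseSupplier =
                () -> new Processor<String, String, String, String>() {
                    private ProcessorContext<String, String> context;

                    @Override
                    public void init(ProcessorContext<String, String> context) {
                        this.context = context;
                    }

                    @Override
                    public void process(Record<String, String> record) {
                        context.forward(record.withValue(record.value().toUpperCase()));
                    }
                };

        Topology topology = new Topology();
        // Source node: reads String keys/values from the input topic
        topology.addSource("Source",
                Serdes.String().deserializer(), Serdes.String().deserializer(), inputTopic);
        // Processor node: applies the uppercase transform, with "Source" as its parent
        topology.addProcessor("Uppercase", upperCaseSupplier, "Source");
        // Sink node: writes the transformed records to the output topic
        topology.addSink("Sink", outputTopic,
                Serdes.String().serializer(), Serdes.String().serializer(), "Uppercase");
        return topology;
    }
}

The DSL version below expresses the same pipeline in a few fluent calls, which is why the DSL is usually the first choice.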
$ vi src/main/java/io/confluent/developer/KafkaStreamsApplication.java

package io.confluent.developer;

import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.FileInputStream;
import java.io.InputStream;
import java.util.Arrays;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;

public class KafkaStreamsApplication {

    private static final Logger logger = LoggerFactory.getLogger(KafkaStreamsApplication.class);

    // Blocks until the application leaves the RUNNING state (e.g. on error or shutdown).
    static void runKafkaStreams(final KafkaStreams streams) {
        final CountDownLatch latch = new CountDownLatch(1);
        streams.setStateListener((newState, oldState) -> {
            if (oldState == KafkaStreams.State.RUNNING && newState != KafkaStreams.State.RUNNING) {
                latch.countDown();
            }
        });

        streams.start();

        try {
            latch.await();
        } catch (final InterruptedException e) {
            throw new RuntimeException(e);
        }

        logger.info("Streams Closed");
    }

    // Reads String events from the input topic, uppercases them, and writes them to the output topic.
    static Topology buildTopology(String inputTopic, String outputTopic) {
        Serde<String> stringSerde = Serdes.String();

        StreamsBuilder builder = new StreamsBuilder();
        builder
            .stream(inputTopic, Consumed.with(stringSerde, stringSerde))
            .peek((k, v) -> logger.info("Observed event: {}", v))
            .mapValues(s -> s.toUpperCase())
            .peek((k, v) -> logger.info("Transformed event: {}", v))
            .to(outputTopic, Produced.with(stringSerde, stringSerde));
        return builder.build();
    }

    public static void main(String[] args) throws Exception {
        if (args.length < 1) {
            throw new IllegalArgumentException("This program takes one argument: the path to a configuration file.");
        }

        Properties props = new Properties();
        try (InputStream inputStream = new FileInputStream(args[0])) {
            props.load(inputStream);
        }

        final String inputTopic = props.getProperty("input.topic.name");
        final String outputTopic = props.getProperty("output.topic.name");

        try (Util utility = new Util()) {
            utility.createTopics(
                props,
                Arrays.asList(
                    new NewTopic(inputTopic, Optional.empty(), Optional.empty()),
                    new NewTopic(outputTopic, Optional.empty(), Optional.empty())));

            // Randomizer is only used to produce sample data for this application, not typical usage
            try (Util.Randomizer rando = utility.startNewRandomizer(props, inputTopic)) {

                KafkaStreams kafkaStreams = new KafkaStreams(
                    buildTopology(inputTopic, outputTopic),
                    props);

                Runtime.getRuntime().addShutdownHook(new Thread(kafkaStreams::close));

                logger.info("Kafka Streams 101 App Started");
                runKafkaStreams(kafkaStreams);
            }
        }
    }
}
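Because build.gradle already declares kafka-streams-test-utils, junit, and hamcrest as test dependencies, the topology can also be exercised without a running broker via TopologyTestDriver. A minimal sketch, assuming a test file at src/test/java/io/confluent/developer/KafkaStreamsApplicationTest.java (the test is not part of the original post; topic names are placeholders):

package io.confluent.developer;

import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.junit.Test;

import java.util.Properties;

import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;

public class KafkaStreamsApplicationTest {

    @Test
    public void topologyShouldUpperCaseInputs() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-streams-101-test");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:9092");

        Topology topology = KafkaStreamsApplication.buildTopology("input-topic", "output-topic");

        try (TopologyTestDriver driver = new TopologyTestDriver(topology, props)) {
            TestInputTopic<String, String> input = driver.createInputTopic(
                    "input-topic", new StringSerializer(), new StringSerializer());
            TestOutputTopic<String, String> output = driver.createOutputTopic(
                    "output-topic", new StringDeserializer(), new StringDeserializer());

            // Pipe a lowercase record through the topology and expect it uppercased
            input.pipeInput("key-1", "apple1");
            assertThat(output.readValue(), is("APPLE1"));
        }
    }
}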
5. Compile and run the Kafka Streams program
Run the following in a terminal:
$ ./gradlew shadowJar

Deprecated Gradle features were used in this build, making it incompatible with Gradle 7.0.
Use '--warning-mode all' to show the individual deprecation warnings.
See https://docs.gradle.org/6.8/userguide/command_line_interface.html#sec:command_line_warnings

BUILD SUCCESSFUL in 1s
2 actionable tasks: 2 up-to-date
Launch the application locally.
$ java -jar build/libs/creating-first-apache-kafka-streams-application-*.jar configurations/dev.properties
If it starts up normally, you will see logs like the following. (The stream client is named faker because the Randomizer sets client.id on the shared Properties object.)
[faker-StreamThread-1] INFO org.apache.kafka.streams.KafkaStreams - stream-client [faker] State transition from REBALANCING to RUNNING
[faker-StreamThread-1] INFO io.confluent.developer.KafkaStreamsApplication - Observed event: apple1
[faker-StreamThread-1] INFO io.confluent.developer.KafkaStreamsApplication - Transformed event: APPLE1
[faker-StreamThread-1] INFO io.confluent.developer.KafkaStreamsApplication - Observed event: banana2
[faker-StreamThread-1] INFO io.confluent.developer.KafkaStreamsApplication - Transformed event: BANANA2
...
[faker-StreamThread-1] INFO io.confluent.developer.KafkaStreamsApplication - Observed event: Chuck Norris doesn't pair program.
[faker-StreamThread-1] INFO io.confluent.developer.KafkaStreamsApplication - Transformed event: CHUCK NORRIS DOESN'T PAIR PROGRAM.
[faker-StreamThread-1] INFO io.confluent.developer.KafkaStreamsApplication - Observed event: Chuck Norris' programs occupy 150% of CPU, even when they are not executing.
[faker-StreamThread-1] INFO io.confluent.developer.KafkaStreamsApplication - Transformed event: CHUCK NORRIS' PROGRAMS OCCUPY 150% OF CPU, EVEN WHEN THEY ARE NOT EXECUTING.
6. Stream events using a console consumer
List the topics to verify that the stream's output topic was created. (Consuming it below confirms that each message has been uppercased.)
$ ./kafka-topics.sh --bootstrap-server mybroker1:9092 --list
__consumer_offsets
stream.test.sy.string.220228
test.sy.string.220228
Consume the output topic written by the stream.
$ ./kafka-console-consumer.sh --bootstrap-server mybroker1:9092 --topic stream.test.sy.string.220228 --from-beginning
APPLE1
BANANA2
...
CHUCK NORRIS DOESN'T PAIR PROGRAM.
CHUCK NORRIS' PROGRAMS OCCUPY 150% OF CPU, EVEN WHEN THEY ARE NOT EXECUTING.