Kafka
지나침은 미치지 못함과 같다.
메시징 시스템
- Message Oriented Middleware : 메시지 지향 미들웨어
일반적으로 메시징 시스템이나 메시지 지향 미들웨어(MOM, Message Oriented Middlware) 라고 부르는 별도의 소프트웨어 시스템이 메시징 기능을 제공한다. 데이터 베이스 시스템이 데이터의 지속성을 관리하는 것처럼 메시징 시스템은 메시징을 관리한다. 애플리케이션이 데이터를 이용할 수 있게 하려면 관리자가 데이터베이스에 스키마를 생성해야 하는 것처럼, 관리자는 애플리케이션이 통신할 수 있게 메시징 시스템에 통신 경로인 채널을 구성해야한다. 메시징 시스템은 메시징 송수신을 관리하고 조정한다. 데이터 베이스 시스템의 주된 목적이 데이터 레코드를 안전하게 시족하는 것이듯이, 메시징 시스템의 주요 작업은 메시지를 발신자의 컴퓨터에서 신뢰할 수 있는 방식으로 수신자의 컴퓨터로 이동시키는 것이다.
- 생성 : 발신자는 메시지를 생성하고 데이터를 채운다.
- 발신 : 발신자는 메시지를 채널에 쓴다.
- 전달 : 메시징 시스템은 수신자가 사용할 수 있게 발신자의 컴퓨터에서 수신자 컴퓨터로 메시지를 이동시킨다.
- 수신 : 수신자는 채널에서 메시지를 읽는다.
-
처리 : 수신자는 메시지에서 데이터를 추출한다.
-
Messaging Systems
- Apache Kafka
- Google Pub/Sub
- RabbitMQ
Apache Kafka
아파치 카프카의 활용 범위 ( Event Streaming )
- 주식변화, 은행, 보험 등과 같이 실시간의 지불/금융 트랜잭션을 처리하기 위해서
- 자동화 산업에서 실시간의 배송/차량,트럭,배송업체에 대한 모니터링 및 추적을 위해서
- IOT 장비로 부터 센서 데이터를 지속적으로 분석하고, 보관하기 위해서
- 모바일 어플리케이션, 여행 산업, 호텔, 도매에서 고객과의 상호작용과 주문에 즉각적으로 응답하고, 수집하기 위해서
- 병원에서 환자 관리를 위해서
등등..
Event Streaming Platform
- 다른 시스템으로 부터의 우리의 데이터을 지속적으로 Export/Import 를 위한, 이벤트 스트림을 구독하고 배포하기 위해서
- 지속적이로 신뢰가능하게 이벤트 스트림을 저장하기 위해서
- 이벤트 스트림을 처리하기 위해서
Kafka의 대략적인 동작 방식
고성능의 TCP Network Protocol을 통해 통신(상호작용)하는 서버와 클라이언트들로 구성된 분산된 시스템이다.
Servers
Kafka는 다중 데이터 센터또는 클라우드 영역을 확장할 수 있도록 한개, 그 이상의 서버의 단위로서 운영된다. 그 중의 일부 서버는 브로커(Broker)라고 불리는 Storage Layer를 구성한다. 그외 서버는 Kafka Connect가 지속적으로 우리가 구성한 시스템에 적용된 Kafka를 통합하기 위한 이벤트 스트림 데이터를 import/export 하기위해서 운영한다. Mission-Critical , fault-tolerant를 구현하기 위함으로 만약 서버 하나가 장애가 생겼을 때, 다른 서버가 해당 서버가 하던 작업을 데이터 유실 없이 이어받아 동작할 것이다.
Clients
병렬적으로 이벤트 스트림을 처리하고, 쓰고, 읽는 마이크로 서비스 와 분산 어플리케이션을 만들 수 있도록 해주었다. 이 때 네트워크 문제, 기기 장애 등에 대한 장애 내성을 고려한다. Clients 로 사용 가능한 언어는 JAVA, Scala, Go, Python, C++/C 와 많은 프로그래밍 언어들이 있다.
Main Concepts and Terminology
- Event
하나의 비즈니스 또는 세계에서 “무엇인가 일어나게되는” 이벤트이다. 데이터를 Kafka에 쓰거나 읽을 때, 하나의 이벤트를 구성한다. 개념적으로 이벤트는 키, 값, 시간 그리고 메타데이터를 가지고 있다.
- Producers
이벤트를 카프카에 배포(쓰는)역할을 한다.
- Consumers
이벤트를 구독(읽는)하는 역할을 한다.
- Topics
이벤트는 구조화되고, 지속적으로 Topics 안에 저장된다.
- Partitioned
Kafka의 Topic은 partition이라는 단위로 쪼개어져 클러스터의 각 서버들에 분산되어 저장되고, 고가용성을 위해 복제 설정을 할 경우 이또한 partition 단위로 각 서버들에 분산되어 복제되고 장애가 발생하면 partition 단위로 fail over가 수행된다.
Kafka에서는 consumer group이라는 개념을 도입하여 두가지 모델을 발행-구독 모델로 일반화하였다. Kafka의 partition은 consumer group당 오로지 하나의 consumer의 접근만을 허용하며, 해당 consumer를 partition owner라고 부른다. 따라서 동일한 consumer group에 속하는 consumer끼리는 동일한 partition에 접근할 수 없다.
- Replicated
Kafka APIs
- Admin API
topics, brokers와 다른 Kafka 오브젝트를 검사하고 관리하기 위해서 사용된다.
- Producer API
Event 스트림을 하나또는 그 이상의 Kafka Topics 배포하는 역학을 한다.
- Consumer API
하나, 하나이상의 topics 를 구독하고, 이벤트 스트림을 처리하기 위해서 사용한다.
- Kafka Streams API
스트림 프로세싱 및 마이트로 서비스를 구현하기 위해서이다. 이벤트 스트림에 대해서 변환, 병합, 조인 등을 처리하기 위해서 고 차원의 함수를 제공한다.
- Kafka Connect API
Kafka Client로 지윈되는 목록
- https://cwiki.apache.org/confluence/display/KAFKA/Clients
Kafka의 기본 설정
Kafka는 기본적으로 키/값의 쌍인 프로파티 파일로 구성되어 있다.
Broker의 Config 설정
https://kafka.apache.org/documentation/#brokerconfigs
Topic Config 설정
https://kafka.apache.org/documentation/#topicconfigs
Producer Config 세팅
https://kafka.apache.org/documentation/#producerconfigs
Consumer Config 세팅
https://kafka.apache.org/documentation/#consumerconfigs
Zookeeper
https://kafka.apache.org/documentation/#zkops
Kafka 기본 포트 9092 구성후 Spring Boot 연동 샘플
plugins {
id 'org.springframework.boot' version '2.5.2'
id 'io.spring.dependency-management' version '1.0.11.RELEASE'
id 'java'
}
group = 'com.lines'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = '11'
repositories {
mavenCentral()
}
dependencies {
implementation 'org.springframework.boot:spring-boot-starter'
implementation 'org.apache.kafka:kafka-streams'
implementation 'org.springframework.kafka:spring-kafka'
testImplementation 'org.springframework.boot:spring-boot-starter-test'
testImplementation 'org.springframework.kafka:spring-kafka-test'
}
test {
useJUnitPlatform()
}
- Spring Consumer 기동 시 Topic도 신규로 생성한다.
package com.lines.sample.consumer;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.TopicBuilder;
@SpringBootApplication
public class ConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(ConsumerApplication.class, args);
}
@Bean
public NewTopic topic() {
return TopicBuilder.name("topic1")
.partitions(10)
.replicas(1)
.build();
}
@KafkaListener(id = "myId", topics = "topic1")
public void listen(String in) {
System.out.println(in);
}
}
- Spring Consumer를 기동 시키고 난 뒤 producer를 호출 한다.
package com.lines.sample.producer;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.KafkaTemplate;
@SpringBootApplication
public class ProducerApplication {
public static void main(String[] args) {
SpringApplication.run(ProducerApplication.class, args);
}
@Bean
public ApplicationRunner runner(KafkaTemplate<String, String> template) {
return args -> {
template.send("topic1", "test");
};
}
}
Kafka Command 명령어 [주로 사용 하는 것] Linux - 2021-09-21 기준
- zookeeper 의 background 기동 ( 데몬)
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
- kafka의 background 기동 ( 데몬 )
bin/kafka-server-start.sh -daemon config/server.properties
- topic 목록 조회하기
bin/kafka-topics.sh --list --zookeeper localhost:2181
- topic의 세부내역 확인하기 ( Partition , Replica )
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
실행 결과
Topic: test TopicId: zME3RUR8TU2oDBM9sGBX3w PartitionCount: 1 ReplicationFactor: 1 Configs:
Topic: test Partition: 0 Leader: 0 Replicas: 0 Isr: 0
- 생성한 topic 삭제하기
./zookeeper-shell.sh localhost:2181
# zookeeper shell 내에서
ls /brokers/topics
[__consumer_offsets, test]
# 삭제시 - rmr이 deprecated되어서 deleteall을 추천
rmr /brokers/topics/test
The command 'rmr' has been deprecated. Please use 'deleteall' instead.
deleteall /brokers/topics/test # 이 명령어를 더 추천함.
Kafka Command 명령어 [주로 사용 하는 것] Windows
- zooper 기동하기
.\zookeeper-server-start.bat ..\..\config\zookeeper.properties
- kafka server 기동하기
.\kafka-server-start.bat ..\..\config\server.properties
- topic 생성
.\bin\windows\kafka-topic.bat --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
- console의 producer에서 메세지 전송하기
.\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic test
- C# 기반 Confluent Kafka 활용
using Confluent.Kafka;
using System;
using System.Threading;
namespace kafka_sample
{
class Program
{
static void Main(string[] args)
{
var config = new ConsumerConfig
{
GroupId = "test-consumer-group",
BootstrapServers = "localhost:9092",
AutoOffsetReset = AutoOffsetReset.Earliest
};
using (var consumer = new ConsumerBuilder<Ignore, string>(config).Build())
{
consumer.Subscribe("test");
CancellationTokenSource cts = new CancellationTokenSource();
Console.CancelKeyPress += (_, e) =>
{
e.Cancel = true;
cts.Cancel();
};
try
{
while (true)
{
try
{
var cr = consumer.Consume(cts.Token);
Console.WriteLine($"Consumed Message: '{cr.Value}' at: '{cr.TopicPartitionOffset}'.");
}
catch(ConsumeException e)
{
Console.WriteLine($"Error occured: {e.Error.Reason}");
}
}
}
catch (OperationCanceledException)
{
consumer.Close();
}
}
}
}
}
- 실제 실행 결과
출처: https://epicdevs.com/17 [Epic Developer]