본문 바로가기

카프카

Kafka 개념 정리 및 실습 (카프카, spring, docker)

참고 블로그

https://techblog.woowahan.com/17386/

1. Kafka란?

  • 분산 스트리밍 플랫폼으로 대용량 데이터의 실시간 처리 및 전송을 지원하는 메시지 브로커 시스템
  • Message Queue
    • 전통적인 mesaage queue와 다르게  메시지 큐의 장점과 로그 시스템의 특성을 결합한 분산 스트리밍 플랫폼
      항목 전통적 MQ kafka MQ
      메시지 저장 소비후 삭제됨 로그처럼 유지
      처리속도 낮음(트렌젝션보장) 높음(분산처리)
      확장성 보로커 개수 제한적 분산처리 가능
      순서보장 FIFO지원 Partition 단위로 순서 보장
  • 메세지 전달의 중앙 플랫폼으로 두고 필요한 모든 데이터 시스템과 연결된 파이프라인을 만드는 것을 지향함
  • Kafka는 확장성, 고성능, 데이터 스트리밍 처리에 최적화된 시스템으로 메시지 큐뿐만 아니라 데이터 파이프라인, 이벤트 기반 시스템에서도 널리 사용됨.
  • 분산 메세징 시스템, 확장성, 고가용성
  • 로그 형식으로 파일 시스템에 기록
    • 로그를 한곳에 모아 처리(중앙집중화)
  • 버전
    • Kafka 3.x: Java 11 이상 지원
    • Kafka 2.x: Java 8 이상 지원

2. 카프카 용어

1. Kafka 클러스터

  • Kafka 클러스터는 여러 개의 Kafka Broker가 모여 있는 분산 환경을 의미함.
  • 고가용성(HA, High Availability)과 확장성(Scalability)을 제공함.
  • 데이터가 파티션(Partition)을 통해 여러 브로커에 분산 저장됨.
  • Producer가 데이터를 클러스터 내의 특정 토픽(Topic)에 전송하면, Consumer가 이를 구독함.

2. Broker

  • Kafka 브로커는 Kafka 서버의 개별 인스턴스를 의미함.
  • 클러스터 내에서 메시지를 저장하고, Producer-Consumer 간 메시지를 전달함.
  • 여러 개의 브로커로 구성된 클러스터에서 데이터를 분산 저장하여 확장 가능함.
  • 하나의 브로커가 다운되면, 다른 브로커가 데이터를 유지하여 장애 복구(Failover)가 가능함.

3. Zookeeper

  • Kafka의 클러스터 상태를 관리하는 분산 코디네이터 역할.
  • Broker의 리더 선출 및 상태 관리를 담당함.
  • Consumer의 오프셋(Offset) 정보 관리 (Kafka 2.x 이하에서 사용).
  • Broker가 추가되거나 제거될 때 이를 감지하여 클러스터 구성을 유지함.
  • Kafka 3.x부터는 Zookeeper 없이 KRaft 모드 사용 가능.

4. Producer (생산자)

  • Kafka에서 데이터를 생성하여 Topic으로 전송하는 역할.
  • Producer는 특정 Topic으로 메시지를 전송함.
  • Partition Key를 사용하면 특정 Partition으로 메시지를 보낼 수 있음.
  • 메시지는 Kafka에 의해 복제 및 저장됨.
  • 비동기 방식으로 데이터를 전송함.
  • 메시지 압축(Snappy, Gzip) 지원 → 성능 최적화 가능.
  • acks 설정을 통해 메시지 전송 보장 수준 조절 가능 (acks=all이면 모든 복제본 저장 후 응답).

5. Consumer (소비자)

  • Kafka에서 메시지를 가져와 처리하는 역할.
  • Consumer는 특정 Topic을 구독하여 메시지를 읽음.
  • Consumer Group을 사용하면 여러 Consumer가 병렬로 메시지를 소비 가능.
  • Offset을 기반으로 이전에 읽은 메시지 위치를 추적하여 중복 소비를 방지함.
  • auto.offset.reset 옵션으로 처음부터 소비할지, 최신 메시지만 받을지 설정 가능.
  • enable.auto.commit 설정을 통해 자동 오프셋 커밋 가능.
  • Consumer Group 내에서는 하나의 Partition을 하나의 Consumer만 처리하여 로드 밸런싱 가능.

7. Offset (오프셋)

  • Kafka에서 Partition 내에서 메시지가 저장된 위치를 나타내는 번호.
  • Consumer는 Offset을 기반으로 이전에 읽은 메시지 위치를 추적하여 중복 소비를 방지함.
  • Partition 단위로 관리되며, 각 Partition은 독립적인 Offset을 가짐.
  • Kafka 0.9 이상부터는 Offset을 Kafka 내부 Topic (__consumer_offsets)에 저장하여 관리.
  • Consumer Group별로 각각의 Offset을 독립적으로 유지할 수 있음.
  • auto.offset.reset 옵션을 통해 처음부터 읽을지, 최신 메시지만 읽을지 설정 가능.
  • enable.auto.commit 설정을 통해 Offset을 자동으로 커밋할지, 수동으로 커밋할지 설정 가능.
  • Consumer가 특정 Offset으로 이동하여 원하는 위치부터 메시지를 읽을 수도 있음.
  • Offset은 Kafka가 Append-Only 로그 구조로 동작하므로 순차적으로 증가함

8. Topic (토픽)

  • Kafka에서 데이터를 저장하는 논리적인 메시지 분류 단위.
  • Producer는 Topic으로 메시지를 보내고, Consumer는 해당 Topic을 구독함.
  • 하나의 Topic은 여러 개의 Partition으로 나뉘어 저장됨.
  • Retention 설정을 통해 특정 기간 동안 메시지를 저장 가능.

9. Partition (파티션)

  • 파티션은 토픽을 물리적으로 분할하여 여러 브로커에 나누어 저장하는 단위임.
  • Topic을 분산 저장 및 병렬 처리하기 위해 나눔
  • 같은 Topic이라도 여러 Partition으로 나뉘어 다수의 Broker에 저장됨.
  • Partition 내부에서는 메시지가 순서대로 저장되며, Partition별로 고유한 Leader Broker가 존재함.
  • Consumer Group을 사용하면 여러 Consumer가 Partition을 나눠 처리할 수 있음.
  • 하나의 Partition 내에서는 메시지 순서 보장됨.
  • 여러 Partition을 사용하면 병렬 처리(Throughput) 향상 가능.

10. Record (레코드)

  • Kafka에서 전송되는 하나의 메시지 단위.
  • Producer가 보낸 Key, Value, Timestamp, Offset 등의 정보를 포함함.
  • Record는 파티션에 저장됨.

11. Kafka Connector (Kafka Connect)

  • Kafka와 외부 시스템(MySQL, Elasticsearch, AWS S3 등)을 연동하는 기능.
  • Source Connector: 외부 데이터베이스(MySQL, MongoDB 등)에서 Kafka로 데이터 가져오기.
  • Sink Connector: Kafka 데이터를 외부 시스템(Elasticsearch, HDFS, PostgreSQL 등)으로 내보내기.
  • Kafka Connect를 사용하면 복잡한 ETL 작업 없이 데이터 이동 가능.
  • 확장성이 뛰어나고, Kafka Streams와 함께 사용 가능.
  • 다양한 오픈소스 및 상용 커넥터 제공됨.

개념 설명 특징

Kafka 클러스터 여러 개의 Broker가 모여 있는 분산 환경 고가용성, 확장 가능
Broker 메시지를 저장하고 관리하는 Kafka 서버 다수의 Broker로 클러스터 구성 가능
Zookeeper 클러스터 및 Broker 상태 관리 Kafka 3.x부터 KRaft로 대체 가능
Producer 메시지를 생성하여 Kafka로 전송 비동기 전송, 압축 가능
Consumer Kafka에서 메시지를 가져와 처리 Consumer Group을 사용하면 병렬 소비 가능
Topic 메시지를 저장하는 논리적 단위 여러 Partition으로 나뉨
Partition Topic을 나누어 저장하는 단위 병렬 처리 가능, Partition 내에서는 순서 보장
Record Kafka에서 전송되는 개별 메시지 Key, Value, Offset 포함
Offset Partition 내 메시지의 위치 Consumer가 어디까지 읽었는지 저장
Kafka Connect 외부 시스템과 Kafka 연동 다양한 커넥터 지원

 


3. 개발 흐름도

3-1. Kafka 클러스터 생성

  • 도커를 통해 kafka 컨테이너를 실행 ( Kafka 브로커 컨테이너를 실행 = 카프카 클러스터)
    • 컨테이너 1개에서 Kafka 브로커 1개만 실행하면, 단일 브로커 환경임.(단일 노드 카프카)
    • 여러 개의 Kafka 브로커 컨테이너를 실행해야 Kafka 클러스터가 됨.
  • Zookeeper 실행 후 (Kafka 2.x 사용 시 필요) , 카프카 브로커와 연결
# 단일 브로커 
docker run -d --name kafka-broker \
  -p 9092:9092 \
  -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \
  -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 \
  confluentinc/cp-kafka
  
  
# 카프카 클러스터 
# docker-compose.yml 
version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

  kafka1:
    image: confluentinc/cp-kafka
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka1:9092

  kafka2:
    image: confluentinc/cp-kafka
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 2
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka2:9093

  kafka3:
    image: confluentinc/cp-kafka
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 3
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka3:9094

다중 브로커를 사용하는 이유

  • “확장성 있고 안정적인 메시징 시스템”으로 만들기 위해 다중 브로커를 사용함! 🚀
  1. 장애 대비 : 리더 브로커가 다운되더라도 다른 브로커가 대신 처리
  2. 확장성 증가 : 하나의 브로커는 메세지 처리량이 제한적, 토픽을 파티셔닝하여 여러 서버에서 동시에 처리
  3. 처리량 증가 : 여러 브로커가 각기 다른 파티션을 처리하여 속도를 높임
    • consumer group을 활용해 여러 consumer가 여러 브로커의 데이터를 동시에 소비
  4. 부하 분산
    • 메시지가 균등하게 분산, 과부화 방지

3-2. Spring Boot에서 Kafka 사용하기 (토픽 생성, Producer, Consumer)

Spring Boot에서는 Spring Kafka 라이브러리를 사용하여 Kafka Producer와 Consumer를 쉽게 구현할 수 있음.

🔹 build.gradle 또는 pom.xml에 Kafka 의존성 추가

🔹 Gradle (build.gradle)

dependencies {
    implementation 'org.springframework.boot:spring-boot-starter'
    implementation 'org.springframework.kafka:spring-kafka'
}

🔹 Maven (pom.xml)

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
</dependencies>

✅ 이제 Spring Boot에서 Kafka를 사용할 준비 완료! 🚀


2️⃣ Kafka 설정 (application.yml)

spring:
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      retries: 3
      acks: all
    consumer:
      group-id: my-group
      auto-offset-reset: earliest

✅ bootstrap-servers는 Kafka 브로커 주소를 설정해야 함.

✅ group-id는 Consumer 그룹을 지정하는 값.


3️⃣ Kafka 토픽 생성 (자동 생성)

Kafka의 토픽을 Spring Boot에서 자동으로 생성할 수도 있음.

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class KafkaTopicConfig {

    @Bean
    public NewTopic orderTopic() {
        return new NewTopic("order-events", 3, (short) 1);
    }

    @Bean
    public NewTopic paymentTopic() {
        return new NewTopic("payment-events", 3, (short) 1);
    }

    @Bean
    public NewTopic inventoryTopic() {
        return new NewTopic("inventory-events", 3, (short) 1);
    }
}

✅ NewTopic을 Bean으로 등록하면 Spring Boot 실행 시 자동으로 토픽이 생성됨.

  • NewTopic파라미터 상세
    1. "my-spring-topic"
      • 생성할 Kafka 토픽 이름을 지정함.
      • Producer와 Consumer는 이 토픽을 사용하여 메시지를 주고받음.
    2. 3 (파티션 개수, Partitions)
      • 이 토픽을 몇 개의 파티션으로 나눌 것인지 지정.
      • 파티션이 많을수록 병렬 처리가 가능하지만, 너무 많으면 관리가 어려울 수 있음.
      • 예: 3으로 설정하면 이 토픽의 데이터가 3개의 파티션에 분산 저장됨.
      • Consumer는 파티션 단위로 데이터를 가져가기 때문에 성능과 확장성에 영향을 줌.
    3. (short) 1 (복제 개수, Replication Factor)
      • 이 토픽의 데이터를 몇 개의 브로커에 복제할 것인지 지정.
      • 1이면 복제본이 없고, 단일 브로커에서만 저장됨 (데이터 유실 위험이 있음).
      • 3으로 설정하면 Kafka 클러스터 내에서 데이터가 3개의 브로커에 복제됨 (장애 대비 가능).

4️⃣ Kafka Producer (메시지 전송)

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class KafkaProducer {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendMessage(String message) {
        kafkaTemplate.send("my-spring-topic", message);
    }
}

✅ KafkaTemplate.send()를 사용하여 "my-spring-topic"으로 메시지 전송.

✅ Producer를 사용하려면 Controller에서 호출하면 됨.


5️⃣ Kafka Consumer (메시지 읽기)

import org.apache.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class KafkaConsumer {

    @KafkaListener(topics = "my-spring-topic", groupId = "my-group")
    public void listen(String message) {
        System.out.println("Received Message: " + message);
    }
}

✅ @KafkaListener를 사용하면 지정된 토픽에서 메시지를 자동으로 읽음.

✅ groupId가 같은 Consumer는 같은 Consumer Group에서 메시지를 공유해서 처리함.

  • @KafkaListener를 사용하면 토픽에 새로운 메시지가 들어올 때마다 자동으로 실행됨.
    • 주기적으로 메시지를 가져오는 간격 polling 간격 조정 가능

6️⃣ 테스트용 Controller 만들기 (REST API로 메시지 전송)

import org.springframework.web.bind.annotation.*;

@RestController
@RequestMapping("/kafka")
public class KafkaController {

    private final KafkaProducer kafkaProducer;

    public KafkaController(KafkaProducer kafkaProducer) {
        this.kafkaProducer = kafkaProducer;
    }

    @PostMapping("/send")
    public String sendMessage(@RequestParam String message) {
        kafkaProducer.sendMessage(message);
        return "Message sent: " + message;
    }
}

✅ 이제 POST /kafka/send?message=HelloKafka API를 호출하면 Kafka에 메시지가 전송됨!


📌 정리

✔ Spring Boot에서 Kafka를 사용하려면 spring-kafka 라이브러리를 추가해야 함.

✔ Kafka의 토픽은 NewTopic을 Bean으로 등록하면 자동 생성 가능.

  • 토픽 하나가 하나의 큐라고 생각하면됨

✔ Producer는 KafkaTemplate을 사용하여 메시지를 전송.

✔ Consumer는 @KafkaListener를 사용하여 메시지를 자동으로 수신.

  • topic 이름과 group_id를 입력으로 메시지를 받음

✔ REST API로 메시지를 전송하고 Consumer에서 확인할 수 있음. 🚀


❓헷갈리는 부분

1. Kafka의 구조

  • Kafka 클러스터 안에는 여러 개의 브로커(Broker)가 있음.
  • 각 브로커(Broker) 안에는 여러 개의 토픽(Topic)이 저장됨.
  • 각 토픽(Topic) 안에는 여러 개의 파티션(Partition)이 있음.
  • 각 파티션(Partition) 안에는 메시지(Record)가 순차적으로 저장됨.

2. 토픽과 파티션의 관계 (토픽 1 : 파티션 다)

  • 파티션은 토픽을 물리적으로 분할하여 여러 브로커에 나누어 저장하는 단위
  • 하나의 토픽은 여러개의 파티션으로 나뉘고, 각 파티션은 개별 브로커에 저장됨
  • 여러 Consumer가 각 파티션을 병렬로 처리하여 데이터 소비 속도를 향상.
  • 파티션은 수평적으로 확장 가능

3. 브로커와 파티션의 관계 (다대다 관계)

  • 하나의 브로커는 여러 개의 파티션을 저장할 수 있음 (1:N)
  • 하나의 파티션은 여러 브로커에 복제본(Replica)으로 저장될 수 있음 (N:M)

4. consumer Group 이란?

  • Consumer Group(컨슈머 그룹)은 Kafka에서 메시지를 읽는 Consumer들의 논리적인 그룹
  • 같은 groupId를 가진 여러 Consumer들이 하나의 그룹으로 동작하며 메시지를 나누어 가져감 (로드 밸런싱)
  • Consumer Group을 사용하면 여러 Consumer가 병렬로 메시지를 처리할 수 있음.

4-1. Consumer Group이 필요한 이유

  1. 하나의 메시지를 여러 Consumer가 나눠서 처리할 수 있음 (로드 밸런싱)
    • 같은 groupId를 가지면 여러 Consumer가 하나의 메시지를 중복되지 않게 가져감.
    • 즉, 병렬로 메시지를 처리할 수 있어서 성능이 향상됨.
  2. Consumer Group이 다르면 메시지를 중복 소비 가능 (Fan-out 가능)
    • 서로 다른 groupId를 가진 Consumer들은 동일한 메시지를 각각 받을 수 있음.
    • 예를 들어, "주문 처리 서비스"와 "로그 저장 서비스"가 동일한 메시지를 개별적으로 받아야 하는 경우 유용함.
  3. Consumer가 중단되면 자동으로 다른 Consumer가 메시지를 가져감 (Fault Tolerance)
    • Consumer가 죽거나 사라지면, Kafka가 자동으로 다른 Consumer에게 메시지를 재할당함.
    • 고가용성을 유지할 수 있음.

개념 설명

Consumer Group 같은 groupId를 가진 Consumer들의 묶음
같은 그룹 ID를 가지면? 메시지를 나누어 처리함 (Load Balancing), 한 메시지는 한 번만 소비됨
다른 그룹 ID를 가지면? 동일한 메시지를 각각 소비함 (Fan-out)

 

4-2. 같은 groupId를 쓰는 경우

  • 여러 개의 Consumer가 메시지를 나누어 빠르게 처리해야 할 때 사용
  • 하나의 메시지는 같은 Consumer Group 내에서 한 번만 처리됨
  • 사용 예시 (병렬 처리)
    • 주문 처리(Order Service): 주문이 많을 때 여러 서버가 주문을 분배받아 처리해야 함.
    • 알고리즘 트레이딩 시스템(Algo Trading): 매수/매도 주문을 빠르게 처리하기 위해 여러 Consumer가 분산 처리해야 함.

 

4-3. 다른 groupId를 쓰는 경우

  • 동일한 메시지를 여러 서비스가 각각 받아야 할 때 사용
  • 하나의 메시지가 여러 Consumer Group에서 각각 처리됨
  • 사용 예시 (여러 서비스에서 같은 데이터를 필요로 할 때)
    • 거래 내역 저장(Transaction Log Service): 모든 주문 데이터를 로그 저장 서비스에서 별도로 저장해야 함.
    • 알림 서비스(Notification Service): 사용자의 주문 체결 내역을 별도의 알림 시스템에서 받아서 처리해야 함.
    • AI 주식 추천 시스템(Stock AI Recommendation): 과거 매매 데이터를 기반으로 투자 패턴을 분석하는 서비스가 필요할 때.