Kafka 찍먹해보기! : Kafka통한 서비스 경계를 넘는 이벤트 파이프라인을 구축기

2025. 9. 5. 01:11·Loopers/테크니컬 라이팅
더보기

한줄요약 :
 Spring Boot로 만든 이커머스 API가 단일 애플리케이션의 한계에 부딪혔을 때, Kafka를 도입해서 서비스를
  분리하고 확장 가능한 이벤트 파이프라인을 만든 과정을 단계별로 정리했습니다.

Kafka가 뭐하는 놈이지

 Kafka는 이름만 들어봤는데 한번 어떤방식으로 쓰는지 익히고자 들이박아봤다.

@Service
  public class LikeFacade {

      @Transactional
      public void like(Long userId, Long productId, LikeType likeType) {

          // 1. 핵심 비즈니스: 좋아요 저장 (UPSERT로 중복 처리)
          int affected = likeRepository.upsertLike(userId, productId, likeType);

          if (affected == 1) { // INSERT된 경우에만
              LikeAddedEvent event = LikeAddedEvent.of(userId, productId, likeType);

              // 2. 동기 처리: 같은 애플리케이션 내에서 집계 업데이트
              eventPublisher.publishEvent(event);

              ...
          }
      }
  }
  
  
  @Component
  public class LikeEventHandler {

      @EventListener
      @Async  // 비동기지만 같은 JVM 내에서
      public void handleLikeAdded(LikeAddedEvent event) {
          // LikeProcessingService로 위임
          likeProcessingService.processLikeAdded(event);
      }
  }


  @Service
  public class LikeProcessingService {

      @Transactional(propagation = Propagation.REQUIRES_NEW)
      public void processLikeAdded(LikeAddedEvent event) {
          // 1. Product 테이블의 likeCount 증가 (실시간 조회용)
          productService.increaseLikeCount(productId);

          // 2. Redis 캐시 삭제 (상품 정보가 바뀌었으니까)
          productCacheRepository.evictProductDetail(productId);

          // 앞으로 추가될 처리들...
          // 3. 감사 로그 저장?
          // 4. 실시간 알림 발송?
          // 5. 추천 시스템 업데이트?
      }
  }

 

기존 좋아요 처리 관련 코드는 위와같은 형태도 짜져 있었는데 하나의 좋아요 요청이 이렇게 많은 레이어를 거쳐서 

여러 처리를 한다. 그리고 이 모든 처리가 어느 정도 완료될 때까지 사용자는 기다려야 한다.
그러던 중 멘토분이 "서비스가 커지면 서비스 경계를 분리해야 한다"고 Kafka를 추천해주셨다.

 

멘토분의 도움과 내 나름대로의 생각을 정리하여 바꾸고자 하는 목표를 세웠다.

 

 [현재]
  commerce-api 하나가 모든 일 처리
  ├── 좋아요 저장
  ├── likeCount 증가
  ├── 캐시 삭제
  └── (앞으로 추가될 것들...)

  [목표]
  commerce-api (핵심 비즈니스만)
      ├── 좋아요 저장
      ├── likeCount 증가 (실시간 조회용)
      └── Kafka로 이벤트 발행
           ↓
  commerce-collector (후속 처리 전담)
      ├── 캐시 삭제
      ├── 감사 로그 저장
      ├── 집계 테이블 업데이트
      └── (미래의 확장 기능들...)

  왜 이렇게 분리하고 싶었는가?

  1. 성능: 사용자는 좋아요만 빠르게 처리되면 된다. 나머지는 백그라운드에서
  2. 확장성: 새로운 후속 처리가 생겨도 메인 API(commerce-api)는 건드리지 않아도 된다
  3. 장애 격리: 캐시나 로그 시스템에 문제가 생겨도 좋아요 기능은 정상 동작해야 한다
  4. 독립 배포: 각 서비스를 독립적으로 배포하고 확장하고 싶다

 

이제 계획을 세웠으니 Kafka가 뭔지 알아보고 진행하자.


Kafka란?

 Kafka ≠ 일반적인 메시지 큐


  처음에는 "Kafka = 고급 버전의 메시지 큐"라고 생각했는데, 공부해보니 완전히 달랐다.

  일반 메시지 큐 (RabbitMQ 등):
  - 메시지를 전달하고 삭제
  - 받는 사람이 없으면 메시지 유실
  - "편지를 전달하고 편지는 버리는" 우체부 같은 느낌

  Kafka:
  - 메시지를 디스크에 로그로 저장
  - 며칠이 지나도 같은 메시지를 다시 읽을 수 있다
  - "도서관에 책을 보관해두고, 필요할 때마다 빌려보는" 느낌

  이 차이를 이해하는 순간 "아, 그래서 감사 로그나 재처리에 좋다는 거구나!" 싶었다.

1. Kafka의 핵심 개념


  1. Topic (토픽) - "메시지가 모이는 주제"

  Topic: "catalog-events"
  = "상품 관련 이벤트들이 모이는 곳"

  좋아요 추가/삭제, 재고 변경, 조회수 증가 등등
  모든 상품 관련 이벤트가 여기로 온다

  2. Message (메시지) - "실제 전송되는 데이터"

  {
    "eventType": "LikeAddedEvent",
    "eventId": "uuid-1234-5678",
    "timestamp": "2024-01-01T12:00:00Z",
    "payload": {
      "userId": 123,
      "productId": 456,
      "likeType": "PRODUCT"
    }
  }
  이런 JSON 형태의 데이터가 메시지다. 실제로는 EventEnvelope로 감싸서 보낸다.

  3. Partition (파티션) - "토픽을 나눈 조각들"

  Topic: catalog-events
  ├── Partition 0: productId가 1, 4, 7, 10... 인 메시지들
  ├── Partition 1: productId가 2, 5, 8, 11... 인 메시지들
  └── Partition 2: productId가 3, 6, 9, 12... 인 메시지들

  핵심: 같은 파티션 내에서만 순서가 보장된다!

  4. Message Key (메시지 키) - "어느 파티션으로 갈지 결정하는 값"

  // 실제 프로젝트 코드
  kafkaEventPublisher.publish("catalog-events", productId.toString(), event);
                                                                        ↑topic        ↑key               ↑event object

  // KafkaEventPublisher 내부에서는
  kafkaTemplate.send(topic, partitionKey, envelope);
                                       ↑topic ↑key            ↑value

  productId를 키로 사용하면 같은 상품의 모든 이벤트가 같은 파티션으로 가서 순서가 보장된다.

  5. Offset (오프셋) - "메시지의 위치 번호"

  Partition 0:
  [0] LikeAddedEvent (product=1, user=A)
  [1] LikeRemovedEvent (product=1, user=A)
  [2] LikeAddedEvent (product=4, user=B)
  [3] StockAdjustedEvent (product=1, stock=10)
   ↑ 이 숫자가 offset
  Consumer는 자신이 어느 offset까지 읽었는지 기억한다.

  6. Producer (프로듀서) - "메시지를 보내는 놈"

// 실제 프로젝트의 KafkaEventPublisher
  @Component
  public class KafkaEventPublisher {

      private final KafkaTemplate<Object, Object> kafkaTemplate;

      public void publish(String topic, String partitionKey, Object event) {
          // 이벤트를 EventEnvelope로 감싸기
          EventEnvelope envelope = EventEnvelope.wrap(event);

          // Kafka로 전송 (.get()으로 동기 처리 = At Least Once 보장)
          kafkaTemplate.send(topic, partitionKey, envelope).get();
      }
  }



  7. Consumer (컨슈머) - "메시지를 받아서 처리하는 놈"

// commerce-collector의 Consumer
  @Component
  public class CatalogEventConsumer {

      @KafkaListener(topics = "catalog-events", groupId = "commerce-collector")
      public void handleCatalogEvent(ConsumerRecord<String, String> record) {
          // 메시지를 받아서 처리
          String productId = record.key();        // 파티션 키
          String message = record.value();        // 메시지 내용

          // 후속 처리 로직...
      }
  }



  8. Consumer Group (컨슈머 그룹) - "같은 일을 하는 컨슈머들의 팀"

  catalog-events Topic (3개 파티션)
  ├── Partition 0 → Consumer A (commerce-collector 그룹)
  ├── Partition 1 → Consumer B (commerce-collector 그룹)
  └── Partition 2 → Consumer C (commerce-collector 그룹)

  각 파티션은 그룹 내 하나의 컨슈머만 담당한다!

  9. ACK (확인응답) - "메시지 잘 받았다는 신호"

  # 실제 프로젝트 설정
  spring:
    kafka:
      producer:
        acks: all  # 모든 replica에 저장된 후 "성공" 응답
  - acks=0: 응답 안 기다림 (빠르지만 유실 가능)
  - acks=1: 리더만 저장하면 응답 (보통)
  - acks=all: 모든 복사본에 저장한 후 응답 (안전) 


실제 적용해 보

위 개념들을 이용하여 기존 kafka를 이용한 이벤트를 발행해 보자!

 

1. Kafka Producer, Consumer 설정

Producer 설정 (commerce-api)

  spring:
    kafka:
      bootstrap-servers: localhost:9092
      producer:
        # 모든 replica에 저장된 후 성공 응답 (안전성 최우선)
        acks: all

        # Producer 레벨 중복 제거 (네트워크 재전송 시)
        enable-idempotence: true

        # 전송 실패 시 재시도 횟수
        retries: 3

        # 재시도 간격 (ms)
        retry-backoff-ms: 1000

        # 직렬화 방식 (Spring이 JSON으로 자동 변환)
        key-serializer: org.apache.kafka.common.serialization.StringSerializer
        value-serializer: org.springframework.kafka.support.serializer.JsonSerializer

--------------------------------------------------------------------------------------------

  Consumer 설정 (commerce-collector)

  spring:
    kafka:
      bootstrap-servers: localhost:9092
      consumer:
        # 컨슈머 그룹 이름 (같은 그룹은 파티션을 분담해서 처리)
        group-id: commerce-collector

        # 처음 시작할 때 어디서부터? (earliest=처음부터, latest=최신부터)
        auto-offset-reset: earliest

        # offset 커밋 방식 (false=수동, true=자동)
        enable-auto-commit: false

        # 한 번에 가져올 메시지 개수 (배치 처리 최적화)
        max-poll-records: 100

        # 컨슈머가 죽었는지 판단하는 시간 (ms)
        session-timeout-ms: 30000

        # 역직렬화 방식
        key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer

 

2. Publisher 로직

@Service
  public class LikeFacade {

    // ★★★ kfaka로 이벤트 전송하기 위한 의존성 주입 ★★★
      private final KafkaEventPublisher kafkaEventPublisher;

      @Transactional
      public void like(Long userId, Long productId, LikeType likeType) {
         
          int affected = likeRepository.upsertLike(userId, productId, likeType);

          if (affected == 1) {
              LikeAddedEvent event = LikeAddedEvent.of(userId, productId, likeType);

              // 2. 같은 애플리케이션 내 처리 (기존)
              eventPublisher.publishEvent(event);

              // 3. ★★★ 외부 시스템(kfaka)으로 이벤트 전송 ★★★
              kafkaEventPublisher.publish("catalog-events", productId.toString(), event);
          }
      }
  }

 

* KafkaEventPublisher 이벤트 전송 시 내부 동작

@Component
  public class KafkaEventPublisher {

      private final KafkaTemplate<Object, Object> kafkaTemplate;

      public void publish(String topic, String partitionKey, Object event) {
          try {
              // 1. 이벤트를 표준화된 형태로 감싸기
              EventEnvelope envelope = EventEnvelope.wrap(event);
              // {
              //   "eventType": "LikeAddedEvent",
              //   "eventId": "uuid-generated",
              //   "timestamp": "2024-01-01T12:00:00Z",
              //   "payload": {
              //     "userId": 123,
              //     "productId": 456,
              //     "likeType": "PRODUCT"
              //   }
              // }

              // 2. Kafka로 전송 (동기 - At Least Once 보장)
              kafkaTemplate.send(topic, partitionKey, envelope).get();
              //                   ↑            ↑          ↑
              //             "catalog-events"  productId  EventEnvelope

              log.info("이벤트 발행 완료 - topic: {}, eventType: {}, partitionKey: {}",
                      topic, envelope.eventType(), partitionKey);

          } catch (Exception e) {
              log.error("이벤트 발행 실패 - topic: {}, partitionKey: {}, error: {}",
                      topic, partitionKey, e.getMessage(), e);
              throw new RuntimeException("Kafka 이벤트 발행 실패", e);
          }
      }
  }

* kafka 전송 시 .get() 사용 이유 
    - kafkaTemplate.send()는 기본적으로 비동기다
    - .get()을 붙이면 동기가 되어 "확실히 전송될 때까지 기다린다"
    - 이렇게 해야 At Least Once 전달이 보장된다

 

 

3. Consumer 로직

- kafka를 통하여 완전히 별도의 commerce-collector(가칭) 애플리케이션에서 이벤트를 받아 처리한다

@Component
  @RequiredArgsConstructor
  @Slf4j
  public class CatalogEventConsumer {

      private final LikeEventHandler likeEventHandler;
      private final ObjectMapper objectMapper;

      @KafkaListener(
          topics = "catalog-events",              // commerce-api에서 발행하는 토픽
          groupId = "commerce-collector",         // 컨슈머 그룹 이름
          containerFactory = "kafkaListenerContainerFactory"
      )
      public void consume(ConsumerRecord<String, String> record) {
          try {
              // 1. 메시지 기본 정보 추출
              String messageKey = record.key();        // productId (파티션 키)
              String messageValue = record.value();    // JSON 문자열
              long offset = record.offset();           // 메시지 위치
              int partition = record.partition();      // 어느 파티션에서 왔는가?

              log.info("카탈로그 이벤트 수신 - partition={}, offset={}, key={}",
                      partition, offset, messageKey);

              // 2. JSON을 EventEnvelope 객체로 변환
              EventEnvelope envelope = objectMapper.readValue(messageValue, EventEnvelope.class);
              String eventType = envelope.getEventType();

              // 3. 이벤트 타입별로 처리 위임
              switch (eventType) {
                  case "LikeAddedEvent":
                  case "LikeRemovedEvent":
                      // 좋아요 관련 처리 (캐시, 집계, 로그 등)
                      likeEventHandler.handle(eventType, messageValue, messageKey);
                      break;
                  default:
                      log.warn("알 수 없는 이벤트 타입: {}", eventType);
              }

          } catch (Exception e) {
              log.error("이벤트 처리 실패 - key={}, value={}, error={}",
                      record.key(), record.value(), e.getMessage(), e);
          }
      }
  }
@Component
  @RequiredArgsConstructor
  public class LikeEventHandler {

      private final EventHandledService eventHandledService;  // 중복 체크
      private final ProductMetricsService metricsService;     // 집계 테이블
      private final CacheEvictService cacheEvictService;     // 캐시 삭제
      private final AuditLogService auditLogService;         // 감사 로그
      private final ObjectMapper objectMapper;

      public void handle(String eventType, String payloadJson, String messageKey) {
          try {
              // 1. JSON에서 eventId 추출 (중복 체크용)
              JsonNode jsonNode = objectMapper.readTree(payloadJson);
              String eventId = jsonNode.get("eventId").asText();

              // 2. 이미 처리한 이벤트인지 확인 (멱등성 보장)
              if (eventHandledService.isAlreadyHandled(eventId)) {
                  log.info("중복 이벤트 무시 - eventId={}", eventId);
                  return; // 이미 처리했으면 바로 종료
              }

              // 3. 이벤트 파싱
              EventEnvelope envelope = objectMapper.readValue(payloadJson, EventEnvelope.class);
              Map<String, Object> payload = envelope.getPayload();

              Long userId = Long.valueOf(payload.get("userId").toString());
              Long productId = Long.valueOf(payload.get("productId").toString());

              // 4. 실제 비즈니스 로직 실행 (commerce-api와 다른 처리)
              if ("LikeAddedEvent".equals(eventType)) {
                  // 4-1. 집계 테이블 업데이트 (분석용)
                  metricsService.incrementProductLikeCount(productId);

                  // 4-2. 다른 캐시 삭제 (상품 랭킹, 추천 등)
                  cacheEvictService.evictProductRanking();

                  // 4-3. 감사 로그 저장
                  auditLogService.saveUserAction("LIKE_ADDED", userId, productId);

                  log.info("좋아요 추가 후속 처리 완료 - userId={}, productId={}", userId, productId);

              } else if ("LikeRemovedEvent".equals(eventType)) {
                  // 좋아요 제거 후속 처리
                  metricsService.decrementProductLikeCount(productId);
                  cacheEvictService.evictProductRanking();
                  auditLogService.saveUserAction("LIKE_REMOVED", userId, productId);

                  log.info("좋아요 제거 후속 처리 완료 - userId={}, productId={}", userId, productId);
              }

              // 5. 처리 완료 기록 (중복 방지용) - 가장 중요!
              eventHandledService.markAsHandled(eventId, eventType, messageKey);

          } catch (Exception e) {
              log.error("좋아요 이벤트 처리 실패 - eventType={}, payload={}, error={}",
                      eventType, payloadJson, e.getMessage(), e);
              throw e; // 재시도나 DLQ 처리를 위해 예외를 다시 던짐
          }
      }
  }

* 멱등성 처리

Consumer에서 중요하게 처리해야 하는 점은 멱등성 처리인것 같다.

Kafka는 "At Least Once" 전달을 보장하여 메시지가 최소 한 번은 전달되지만, 중복될 수 있는 문제가 있다.

예를 들면 

    1. 사용자가 좋아요 클릭
    2. LikeAddedEvent가 Kafka로 발행
    3. commerce-collector가 받아서 처리
    4. 네트워크 문제로 ACK 응답이 늦음
    5. Producer가 "메시지가 안 갔나보다" 생각하고 재전송
    6. commerce-collector가 같은 메시지를 또 받음!
     => 결과: 좋아요 1번 눌렀는데 집계가 2번 증가

이러한 경우다 

 

그래서 멱등성을 보장하기 위하여 멱등성 보장용 테이블을 따로 만들고 처리 시 해당 테이블에 저장 후 

다음 이벤트를 처리할 때 조회하여 중복여부를 확인할 수 있게 처리했다.

멱등성 보장 테이블:
  -- commerce-collector 데이터베이스에
  CREATE TABLE event_handled (
      event_id VARCHAR(255) PRIMARY KEY,        -- EventEnvelope의 eventId
      event_type VARCHAR(100) NOT NULL,         -- "LikeAddedEvent" 등
      message_key VARCHAR(255),                 -- productId 등
      handled_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
  );
  
  
  -----------------------------------------------------------------------------------
  
  @Service
  @RequiredArgsConstructor
  public class EventHandledService {

      private final EventHandledRepository eventHandledRepository;

      // 이미 처리한 이벤트인지 확인
      public boolean isAlreadyHandled(String eventId) {
          return eventHandledRepository.existsByEventId(eventId);
      }

      // 처리 완료 기록
      @Transactional
      public void markAsHandled(String eventId, String eventType, String messageKey) {
          EventHandled record = EventHandled.builder()
              .eventId(eventId)          // UUID - 절대 중복되지 않음
              .eventType(eventType)      // "LikeAddedEvent"
              .messageKey(messageKey)    // productId
              .handledAt(LocalDateTime.now())
              .build();

          eventHandledRepository.save(record);
      }
  }

 

 

 

위와 같은 설정을 전체적인 흐름으로 봤을 땐 아래와 같을 것이다.

 

좋아요 처리 플로우:

  1. 사용자가 상품에 좋아요 클릭
     ↓
  2. commerce-api (Producer 역할)
     ├── LikeV1Controller.like()
     ├── LikeFacade.like()
     │   ├── likeRepository.upsertLike() (핵심: Like 테이블)
     │   ├── eventPublisher.publishEvent() (동기: Product.likeCount 업데이트)
     │   └── kafkaEventPublisher.publish() (비동기: 외부 시스템용)
     └── 사용자에게 즉시 200 응답! (50ms 이내)
     ↓
  3. Kafka Broker
     ├── Topic: catalog-events
     ├── Partition: productId % 파티션수
     ├── Message: EventEnvelope(LikeAddedEvent)
     └── 안전하게 디스크에 저장
     ↓
  4. commerce-collector (Consumer 역할)
     ├── CatalogEventConsumer.consume()
     ├── LikeEventHandler.handle()
     │   ├── eventHandledService.isAlreadyHandled() (멱등성(중복) 체크)
     │   ├── metricsService.incrementProductLikeCount() (분석용 집계)
     │   ├── cacheEvictService.evictProductRanking() (캐시 삭제)
     │   ├── auditLogService.saveUserAction() (감사 로그)
     │   └── eventHandledService.markAsHandled() (처리 완료 기록)
     └── 실패해도 메인 API에는 영향 없음


배우고 느낀 점

 Kafka 설정이 정말 많았다. 

Producer만 해도 acks, retries, 등등... 각각의 설정에 대한 개념을 이해하는 데 시간이 걸렸다. 아직도 명확하게 이해하진 못한것 같다. 계속 시도해보고 모르고 까먹을 때마다 정리하면서 익혀가는 수밖에 없다..

하지만 새로운 경험을 하면서 서비스 분리가 필요할 때, 비동기 처리로 사용자 경험 개선이 필요할 때, 감사 로그나 이벤트 히스토리가 중요할 때 등 언제 kafka를 쓰면 좋을지 조금의 실마리는 잡은것 같고 기본적인 이벤트 파이프라인은 구축할 수 있게 되었고, 서비스 분리의 기초를 다졌다는 점에서 의미 있는 경험이었다고 생각한다.

 

추후 기본적인  개념들과 DLQ(실패한 메시지 처리 방안) 에 대한것도 시간이 나면 정리하면서 배워봐야겠다.

 

'Loopers > 테크니컬 라이팅' 카테고리의 다른 글

Spring 이벤트를 처음 써보며 깨달은 것들  (1) 2025.08.28
DB 조회 및 정렬 성능 개선하기(비정규화, 인덱스, 캐시(Redis))  (1) 2025.08.15
동시성 테스트(Flaky Test 삽질기)  (4) 2025.08.09
다들 이해하지?! 설계의 청사진 시퀀스 다이어그램  (0) 2025.07.25
TDD, 실패하는 테스트가 알려준 것들: 아래에서 내려다보는 TDD  (0) 2025.07.18
'Loopers/테크니컬 라이팅' 카테고리의 다른 글
  • Spring 이벤트를 처음 써보며 깨달은 것들
  • DB 조회 및 정렬 성능 개선하기(비정규화, 인덱스, 캐시(Redis))
  • 동시성 테스트(Flaky Test 삽질기)
  • 다들 이해하지?! 설계의 청사진 시퀀스 다이어그램
KBroJ9210
KBroJ9210
  • KBroJ9210
    개발일기
    KBroJ9210
  • 전체
    오늘
    어제
    • 분류 전체보기 (25)
      • 토스 러너스하이 2기 (11)
        • 회고 (1)
        • 기술 (10)
      • Loopers (9)
        • 테크니컬 라이팅 (6)
        • WIL(What I Learned) (3)
      • 두리두리넋두리 (5)
        • 개발일기 (5)
  • 블로그 메뉴

    • 홈
    • 태그
    • 방명록
  • 링크

  • 공지사항

  • 인기 글

  • 태그

  • 최근 댓글

  • 최근 글

  • hELLO· Designed By정상우.v4.10.5
KBroJ9210
Kafka 찍먹해보기! : Kafka통한 서비스 경계를 넘는 이벤트 파이프라인을 구축기
상단으로
목차

    티스토리툴바