본문 바로가기

Back-end & Server/Kafka

[Kafka] Producer & Consumer

728x90
반응형

 

먼저 Producer와 Consumer에 대해 알아보자.

 

Producer"메시지 생산"해서 Broker의 Topic으로 메시지를 보내는 역할을 하는 어플리케이션 또는 서버이다.

  • 데이터 전송 시 "리더 파티션"을 가지고 있는 Broker와 직접 통신한다.
  • 원하는 Topic의 파티션에 전송만하며 이후 어떤 Consumer에게 전송되는 지는 신경쓰지 않는다.

 

 

Consumer는 Topic의 파티션에 저장되어 있는 "메시지를 소비"하는 역할을 하는 어플리케이션 또는 서버이다.

  • 데이터를 요청할 때 리더 파티션을 가지고 있는 Broker와 통신하여 Topic의 파티션으로부터 데이터를 가져간다.
  • 운영 방법은 2가지가 있다.
  • 운영방법1 - Topic의 특정 파티션만 구독하는 Consumer 운영
  • 운영방법2 - 1개 이상의 Consumer로 이루어진 Consumer 그룹을 운영
  • 어떤 Producer에게서 메시지가 왔는지 관심이 없고, 원하는 Topic의 파티션을 읽어서 필요한 메시지만 받는다.

 

 

Zookeeper & Broker Setup

먼저 docker-compose로 Zookeeper와 Broker를 띄워보자.

# zookeeper & broker setup

version: '1'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.3.0
    container_name: zookeeper
    ports: 
      - 2182:2182
    environment:
      ZOOKEEPER_SERVER_ID: 1 # zookeeper 클러스터에서 해당 주키퍼를 식별할 ID
      ZOOKEEPER_CLIENT_PORT: 2182 # zookeeper client의 포트 지정
  
  broker:
    image: confluentinc/cp-kafka:7.3.0
    container_name: broker
    depends_on: # zookeeper가 먼저 실행된 후 다음에 브로커가 실행되도록 설정
      - zookeeper
    ports:
      - 9092:9092
    environment:
      KAFKA_BROKER_ID: 1 # Broker ID
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2182 # Broker가 Zookeepr에 연결하기 위한 주소
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      # 내부와 외부에서 접속하기 위한 리스너 설정
      # 일반적으로 internal로 PLAINTEXT://broker:29092로 설정
      # external로 PLAINTEXT_HOST://localhost:9092로 설정
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      # 보안을 위한 protocol mapping, key:value로 mapping
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      # 컨테이너 내부에서 사용할 리스너 이름 지정 -> 여기서는 internal로 설정한 PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 # Topic을 분산하여 저장할 Replication Factor
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 
      # kafka cluster에서 초기 rebalancing 할 때 Consumer들이 Consumer group에 조인할 때 대기하는 시간

# docker-compose -p part7-naive -f naive-docker-compose.yaml up -d
# -p : project name
# -f : docker-compose file

 

 

 

Producer & Consumer Setup

먼저 Topic을 생성한다.

docker compose -p part7-naive exec broker kafka-topics --create --topic topic-test --bootstrap-server broker:29092 --partitions 1 --replication-factor 1

 

  • broker :
    • 생성된 브로커 서비스의 이름

 

  • kafka-topics :
    • 토픽에 대한 명령을 실행

 

  • --create :
    • 토픽을 생성

 

  • --topic :
    • 생성할 토픽의 이름을 지정
    • 여기서는 topic-test 라는 이름으로 생성

 

  • --bootstrap-server :
    • 브로커 서비스에 대한 호스트 이름과 포트를 지정.
    • 여기서는 앞서 Docker Compose 로 띄웠던 브로커의 환경 변수를 참고하여 broker:29092 로 설정

 

  • --partition :
    • 토픽 내에 파티션 개수를 설정
    • 여기에서는 1 로 설정

 

  • --replication-factor :
    • Replication Factor 를 지정
    • 여기에서는 1 로 설정

 

 

docker compose -p part7-naive exec broker kafka-topics --describe --topic topic-test --bootstrap-server broker:29092

 

--describe : 토픽 생성 확인

topic-test 토픽이 생성된 것을 확인할 수 있음

 

이제 토픽이 생성되었으니 토픽을 사용할 Consumer를 만들어보자.

 

먼저 Broker 컨테이너로 접속한다.

# terminal 1
docker compose -p part7-naive exec broker /bin/bash

 

그 후 kafka-console-consumer를 이용하여 topic-test 토픽을 subscribe 한다.

kafka-console-consumer --topic topic-test --bootstrap-server broker:29092

 

이렇게 수신을 대기하고 있는 상태가 된다.

 

다음은 Producer를 만들어 메시지를 보낼 준비를 한다.

Broker Container 접속

# terminal 1
docker compose -p part7-naive exec broker /bin/bash

 

kafka-console-producer를 이용하여 topic-test 토픽에 접근하여 publish 할 준비를 한다.

다음 명령어를 통해 publish 할 수 있는 상태로 전환

kafka-console-producer --topic topic-test --broker-list broker:29092

 

 

테스트

 

모든 과정이 종료되었으면 zookeeper와 broker를 종료한다.

docker compose -p part7-naive down -v

728x90
반응형

'Back-end & Server > Kafka' 카테고리의 다른 글

[Kafka] Sink Connector  (0) 2024.03.20
[Kafka] Source Connector  (0) 2024.03.20
[Kafka] Kafka System  (0) 2024.03.01
[Kafka] Producer와 Consumer의 한계  (0) 2024.03.01
[Kafka] 개요  (0) 2024.02.29