본문 바로가기

Back-end & Server/Kafka

[Kafka] Kafka System

728x90
반응형

 

앞선 포스트들의 이론을 가지고 Kafka System을 구축해보겠다.

 

먼저 Docker compose로 zookeeper, Broker, Schema Registry, Connect를 생성한다.

 

Zookeeper : 브로커 서버의 상태 감지를 위해 사용되는 주키퍼 서버

Broker : Source Connector에서 데이터를 받아 Topic에 저장, Sink Connector로 데이터를 넘겨줄 브로커 서버(이 포스트에서는 단일 브로커 사용)

Schema Registry : 메시지의 schema를 저장하기 위한 Schema Registry 서버

Connect : Connector를 띄우기 위한 Connect 서버

 

Zookeeper와 Broker를 띄우는 코드는 이전 포스트에서 작성한 코드이다.

 

[Kafka] Producer & Consumer

먼저 Producer와 Consumer에 대해 알아보자. Producer는 "메시지 생산"해서 Broker의 Topic으로 메시지를 보내는 역할을 하는 어플리케이션 또는 서버이다. 데이터 전송 시 "리더 파티션"을 가지고 있는 Broke

pupbani.tistory.com

 

Schema Registry를 띄우는 코드는 다음과 같다.

schema-registry:
    image: confluentinc/cp-schema-registry:7.3.0 # schema-registry 이미지
    container_name: schema-registry # 컨테이너 이름
    depends_on: # broker 실행 후 실행
      - broker
    ports: # 포트
      - 8081:8081
    environment: # 환경 변수 설정
      SCHEMA_REGISTRY_HOST_NAME: schema-registry # schema registry의 호스트 이름
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: broker:29092 # Boostrap으로 띄워진 브로커 서버 지정
      SCHEMA_REGISTRY_LISTENERS: http://schema-registry:8081 # 외부에서 접속할 리스너 설정

 

다음으로 Connect를 생성하는 코드 작성

Connect는 이미지를 build하기 위해 Dockerfile이 필요.

FROM confluentinc/cp-kafka-connect:7.3.0

ENV CONNECT_PLUGIN_PATH="/usr/share/java,/usr/share/confluent-hub-components"

RUN confluent-hub install --no-prompt snowflakeinc/snowflake-kafka-connector:1.5.5 &&\
    confluent-hub install --no-prompt confluentinc/kafka-connect-jdbc:10.2.2 &&\
    confluent-hub install --no-prompt confluentinc/kafka-connect-json-schema-converter:7.3.0

 

FROM

  • Base 이미지로 confluentinc/cp-kafka-connect:7.3.0 사용

 

ENV CONNECT_PLUGIN_PATH

  • Connect에서는 플러그인의 path를 설정
  • /usr/share/java , /usr/share/confluent-hub-components 두개의 path를 플러그인 path로 설정

 

RUN

  • JDBC Connector를 사용할 것이며, PostgreSQL DB접근이 가능한 Connector를 설치(1~2줄)
  • value schema의 Converter는 Json Schema Converter를 사용(3줄)

 

 

그리고 이 Dockerfile을 이용한 docker compose 코드는 다음과 같다.

connect:
    build: # Dockerfile을 build하기 위한 경로
      context: . # 경로
      dockerfile: connect.Dockerfile # 파일명
    container_name: connect # 컨테이너 이름
    depends_on: # broker, schema-registry 컨테이너가 실행되고 실행
      - broker
      - schema-registry
    ports: # 포트
      - 8083:8083
    environment: # 환경변수
      CONNECT_BOOTSTRAP_SERVERS: broker:29092 # Broker 서버
      # Connect에서는 REST API 대한 요청에 대한 처리와 Connector의 등록, 설정, 시작, 종료 등의 처리를 담당하는 Worker 존재
      CONNECT_REST_ADVERTISED_HOST_NAME: connect # Worker 간의 연결이 가능하도록 호스트 이름 지정
      CONNECT_GROUP_ID: docker-connect-group # Connect의 Worker 프로세스 그룹(또는 클러스터)를 구성하는 데 사용하는 고유한 ID 지정
      # 단! Consumer 그룹 ID와 충돌하면 안됨....
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs # Connector의 환경 설정을 저장할 브로커의 토픽 이름
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1 # 환경 설정을 저장하는 토픽을 생성할 때 사용할 Replication Factor의 수
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets # Connector의 offset을 저장할 브로커의 토픽 이름
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1 # Offset을 저장하는 토픽을 생성할 때 사용할 Replication Factor의 수
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status # Connector와 tast의 상태를 저장할 브로커의 토픽 이름을 설정
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1 # 상태를 저장하는 토픽을 생성할 때 사용할 Replication Factor의 수
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter # Key에 대한 Converter 설정(여기서는 StringCoverter)
      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter # Value에 대한 Converter 설정(여기서는 JsonConverter)
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081 # Value Converter에 대한 Schema Registry URL 설정
      # Schema Registry의 서비스 이름과 포트를 기입

 

실행

docker compose -p part7-kafka -f kafka-docker-compose.yaml up -d

 

728x90
반응형

'Back-end & Server > Kafka' 카테고리의 다른 글

[Kafka] Sink Connector  (0) 2024.03.20
[Kafka] Source Connector  (0) 2024.03.20
[Kafka] Producer와 Consumer의 한계  (0) 2024.03.01
[Kafka] Producer & Consumer  (0) 2024.02.29
[Kafka] 개요  (0) 2024.02.29