앞선 포스트들의 이론을 가지고 Kafka System을 구축해보겠다.
먼저 Docker compose로 zookeeper, Broker, Schema Registry, Connect를 생성한다.
Zookeeper : 브로커 서버의 상태 감지를 위해 사용되는 주키퍼 서버
Broker : Source Connector에서 데이터를 받아 Topic에 저장, Sink Connector로 데이터를 넘겨줄 브로커 서버(이 포스트에서는 단일 브로커 사용)
Schema Registry : 메시지의 schema를 저장하기 위한 Schema Registry 서버
Connect : Connector를 띄우기 위한 Connect 서버
Zookeeper와 Broker를 띄우는 코드는 이전 포스트에서 작성한 코드이다.
[Kafka] Producer & Consumer
먼저 Producer와 Consumer에 대해 알아보자. Producer는 "메시지 생산"해서 Broker의 Topic으로 메시지를 보내는 역할을 하는 어플리케이션 또는 서버이다. 데이터 전송 시 "리더 파티션"을 가지고 있는 Broke
pupbani.tistory.com
Schema Registry를 띄우는 코드는 다음과 같다.
schema-registry:
image: confluentinc/cp-schema-registry:7.3.0 # schema-registry 이미지
container_name: schema-registry # 컨테이너 이름
depends_on: # broker 실행 후 실행
- broker
ports: # 포트
- 8081:8081
environment: # 환경 변수 설정
SCHEMA_REGISTRY_HOST_NAME: schema-registry # schema registry의 호스트 이름
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: broker:29092 # Boostrap으로 띄워진 브로커 서버 지정
SCHEMA_REGISTRY_LISTENERS: http://schema-registry:8081 # 외부에서 접속할 리스너 설정
다음으로 Connect를 생성하는 코드 작성
Connect는 이미지를 build하기 위해 Dockerfile이 필요.
FROM confluentinc/cp-kafka-connect:7.3.0
ENV CONNECT_PLUGIN_PATH="/usr/share/java,/usr/share/confluent-hub-components"
RUN confluent-hub install --no-prompt snowflakeinc/snowflake-kafka-connector:1.5.5 &&\
confluent-hub install --no-prompt confluentinc/kafka-connect-jdbc:10.2.2 &&\
confluent-hub install --no-prompt confluentinc/kafka-connect-json-schema-converter:7.3.0
FROM
- Base 이미지로 confluentinc/cp-kafka-connect:7.3.0 사용
ENV CONNECT_PLUGIN_PATH
- Connect에서는 플러그인의 path를 설정
- /usr/share/java , /usr/share/confluent-hub-components 두개의 path를 플러그인 path로 설정
RUN
- JDBC Connector를 사용할 것이며, PostgreSQL DB접근이 가능한 Connector를 설치(1~2줄)
- value schema의 Converter는 Json Schema Converter를 사용(3줄)
그리고 이 Dockerfile을 이용한 docker compose 코드는 다음과 같다.
connect:
build: # Dockerfile을 build하기 위한 경로
context: . # 경로
dockerfile: connect.Dockerfile # 파일명
container_name: connect # 컨테이너 이름
depends_on: # broker, schema-registry 컨테이너가 실행되고 실행
- broker
- schema-registry
ports: # 포트
- 8083:8083
environment: # 환경변수
CONNECT_BOOTSTRAP_SERVERS: broker:29092 # Broker 서버
# Connect에서는 REST API 대한 요청에 대한 처리와 Connector의 등록, 설정, 시작, 종료 등의 처리를 담당하는 Worker 존재
CONNECT_REST_ADVERTISED_HOST_NAME: connect # Worker 간의 연결이 가능하도록 호스트 이름 지정
CONNECT_GROUP_ID: docker-connect-group # Connect의 Worker 프로세스 그룹(또는 클러스터)를 구성하는 데 사용하는 고유한 ID 지정
# 단! Consumer 그룹 ID와 충돌하면 안됨....
CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs # Connector의 환경 설정을 저장할 브로커의 토픽 이름
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1 # 환경 설정을 저장하는 토픽을 생성할 때 사용할 Replication Factor의 수
CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets # Connector의 offset을 저장할 브로커의 토픽 이름
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1 # Offset을 저장하는 토픽을 생성할 때 사용할 Replication Factor의 수
CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status # Connector와 tast의 상태를 저장할 브로커의 토픽 이름을 설정
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1 # 상태를 저장하는 토픽을 생성할 때 사용할 Replication Factor의 수
CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter # Key에 대한 Converter 설정(여기서는 StringCoverter)
CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter # Value에 대한 Converter 설정(여기서는 JsonConverter)
CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081 # Value Converter에 대한 Schema Registry URL 설정
# Schema Registry의 서비스 이름과 포트를 기입
실행
docker compose -p part7-kafka -f kafka-docker-compose.yaml up -d
'Back-end & Server > Kafka' 카테고리의 다른 글
[Kafka] Sink Connector (0) | 2024.03.20 |
---|---|
[Kafka] Source Connector (0) | 2024.03.20 |
[Kafka] Producer와 Consumer의 한계 (0) | 2024.03.01 |
[Kafka] Producer & Consumer (0) | 2024.02.29 |
[Kafka] 개요 (0) | 2024.02.29 |