728x90
반응형

생성
Source Connector는 Connect에 API 호출을 통해 생성한다.
ex. source_connector.json
{
"name": "postgres-source-connector",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:postgresql://postgres-server:5432/mydatabase",
"connection.user": "myuser",
"connection.password": "mypassword",
"table.whitelist": "iris_data",
"topic.prefix": "postgres-source-",
"topic.creation.default.partitions": 1,
"topic.creation.default.replication.factor": 1,
"mode": "incrementing",
"incrementing.column.name": "id",
"tasks.max": 2,
"transforms": "TimestampConverter",
"transforms.TimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.TimestampConverter.field": "timestamp",
"transforms.TimestampConverter.format": "yyyy-MM-dd HH:mm:ss.S",
"transforms.TimestampConverter.target.type": "string"
}
}
name : Connector의 이름 정의
config :
- connector.class : Connector를 생성하기 위한 class 설정
- connection.url : Source DB에 접근하기 위한 주소 설정
- connection.user : Source DB에 접속하기 위한 유저 이름을 설정
- connection.password : Source DB에 접속하기 위한 유저 비밀번호
- table.whitelist : 데이터를 가져올 테이블의 목록을 설정(복수 개의 테이블의 경우 콤마(,)를 통해 작성)
- topic.prefix : 토픽 생성 시 이름 앞에 붙일 prefix를 설정, prefix와 테이블 이름이 최종 토픽의 이름
- topic.creation.default.partitions : 토픽 자동 생성을 위해 반드시 설정, 토픽 자동 생성 시 파티션 수를 설정
- topic.creation.default.replication.factor : 토픽 자동 생성을 위해 반드시 설정, Replication Factor의 수 설정
- mode : 테이블 변경이 발생했을 때 어떤 방식으로 데이터를 가져올 지 설정(bulk, timestamp,incrementing,timestamp+incrementing)
- bulk : event가 발생한 테이블의 내용을 모두 가져옴.
- timestamp : timestamp column을 통해 들어온 row를 신규로 판단, 해당 데이터만 가져옴.
- incrementing : incrementing column을 통해 들어온 row를 신규로 판단, 해당 데이터만 가져옴.(삭제, 수정에 대해서는 작동하지 않음.)
- timestamp+incrementing : timestamp column과 incrementing column 2개의 column을 사용하여 들어온 row를 신규로 판단하고, 해당 데이터만 가져온다.
- incrementing.column.name : Incrementing column의 이름을 설정.
- tasks.max : Connector에서 task의 수를 얼마나 가져갈 지를 설정.
- transforms : DB에 테이블에는 timestamp column은 Source Connector를 이용하여 데이터를 가져올 경우, type이 Unix Epoch Time으로 변경, 따라서 transforms에 있는 Timestamp Converter를 이용하여 timestamp type으로 변경 후, topic에 string으로 넣어야함.
- transforms.TimestampConverter.type : Timestamp Converter의 type의 설정
- transforms.TimestampConverter.field : Timestamp Converter를 정용할 field를 설정
- transforms.TimestampConverter.format : Timestamp Converter의 format을 설정
- transforms.TimestampConverter.target.type : Timestamp Converter를 이용하여 변환한 후에 적용할 type을 설정
curl을 이용하여 Connect REST API에 POST로 json을 보냄
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d @source_connector.json

connector 생성 확인

source connector 정보 확인
curl -X GET http://localhost:8083/connectors/<connector 이름>
Topic에 쌓인 데이터 확인(kafkacat)
# Linux
apt-get install kafkacat
# MacOs
brew install kcat
# broker 주소
kcat -L -b localhost:9092
kcat -b localhost:9092 -t postgres-source-weather
Connector 삭제
curl -X DELETE http://localhost:8083/connectors/<connector_name>

728x90
반응형
'Back-end & Server > Kafka' 카테고리의 다른 글
[Kafka] Sink Connector (0) | 2024.03.20 |
---|---|
[Kafka] Kafka System (0) | 2024.03.01 |
[Kafka] Producer와 Consumer의 한계 (0) | 2024.03.01 |
[Kafka] Producer & Consumer (0) | 2024.02.29 |
[Kafka] 개요 (0) | 2024.02.29 |