본문 바로가기

Back-end & Server/Kafka

[Kafka] Source Connector

728x90
반응형

 

 

생성

Source Connector는 Connect에 API 호출을 통해 생성한다.

ex. source_connector.json

{
    "name": "postgres-source-connector",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
        "connection.url": "jdbc:postgresql://postgres-server:5432/mydatabase",
        "connection.user": "myuser",
        "connection.password": "mypassword",
        "table.whitelist": "iris_data",
        "topic.prefix": "postgres-source-",
        "topic.creation.default.partitions": 1,
        "topic.creation.default.replication.factor": 1,
        "mode": "incrementing",
        "incrementing.column.name": "id",
        "tasks.max": 2,
        "transforms": "TimestampConverter",
        "transforms.TimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
        "transforms.TimestampConverter.field": "timestamp",
        "transforms.TimestampConverter.format": "yyyy-MM-dd HH:mm:ss.S",
        "transforms.TimestampConverter.target.type": "string"
    }
}

 

name : Connector의 이름 정의

config

  • connector.class : Connector를 생성하기 위한 class 설정
  • connection.url : Source DB에 접근하기 위한 주소 설정
  • connection.user : Source DB에 접속하기 위한 유저 이름을 설정
  • connection.password : Source DB에 접속하기 위한 유저 비밀번호
  • table.whitelist : 데이터를 가져올 테이블의 목록을 설정(복수 개의 테이블의 경우 콤마(,)를 통해 작성)
  • topic.prefix : 토픽 생성 시 이름 앞에 붙일 prefix를 설정, prefix와 테이블 이름이 최종 토픽의 이름
  • topic.creation.default.partitions : 토픽 자동 생성을 위해 반드시 설정, 토픽 자동 생성 시 파티션 수를 설정
  • topic.creation.default.replication.factor : 토픽 자동 생성을 위해 반드시 설정, Replication Factor의 수 설정
  • mode : 테이블 변경이 발생했을 때 어떤 방식으로 데이터를 가져올 지 설정(bulk, timestamp,incrementing,timestamp+incrementing)
    • bulk : event가 발생한 테이블의 내용을 모두 가져옴.
    • timestamp : timestamp column을 통해 들어온 row를 신규로 판단, 해당 데이터만 가져옴.
    • incrementing : incrementing column을 통해 들어온 row를 신규로 판단, 해당 데이터만 가져옴.(삭제, 수정에 대해서는 작동하지 않음.)
    • timestamp+incrementing : timestamp column과 incrementing column 2개의 column을 사용하여 들어온 row를 신규로 판단하고, 해당 데이터만 가져온다.

 

  • incrementing.column.name : Incrementing column의 이름을 설정.
  • tasks.max : Connector에서 task의 수를 얼마나 가져갈 지를 설정.
  • transforms : DB에 테이블에는 timestamp column은 Source Connector를 이용하여 데이터를 가져올 경우, type이 Unix Epoch Time으로 변경, 따라서 transforms에 있는 Timestamp Converter를 이용하여 timestamp type으로 변경 후, topic에 string으로 넣어야함.
  • transforms.TimestampConverter.type : Timestamp Converter의 type의 설정
  • transforms.TimestampConverter.field : Timestamp Converter를 정용할 field를 설정
  • transforms.TimestampConverter.format : Timestamp Converter의 format을 설정
  • transforms.TimestampConverter.target.type : Timestamp Converter를 이용하여 변환한 후에 적용할 type을 설정

 

 

curl을 이용하여 Connect REST API에 POST로 json을 보냄

curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d @source_connector.json

 

 

 

connector 생성 확인

 

 

 source connector 정보 확인

curl -X GET http://localhost:8083/connectors/<connector 이름>

 

 

Topic에 쌓인 데이터 확인(kafkacat)

# Linux
apt-get install kafkacat
# MacOs
brew install kcat

 

# broker 주소
kcat -L -b localhost:9092
kcat -b localhost:9092 -t postgres-source-weather

 

 

 

Connector 삭제

curl -X DELETE http://localhost:8083/connectors/<connector_name>

728x90
반응형

'Back-end & Server > Kafka' 카테고리의 다른 글

[Kafka] Sink Connector  (0) 2024.03.20
[Kafka] Kafka System  (0) 2024.03.01
[Kafka] Producer와 Consumer의 한계  (0) 2024.03.01
[Kafka] Producer & Consumer  (0) 2024.02.29
[Kafka] 개요  (0) 2024.02.29