본문 바로가기

Back-end & Server/Kafka

[Kafka] Sink Connector

728x90
반응형

 

 

{
    "name": "postgres-sink-connector",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "connection.url": "jdbc:postgresql://target-postgres-server:5432/targetdatabase",
        "connection.user": "targetuser",
        "connection.password": "targetpassword",
        "table.name.format": "<table format>",
        "topics": "<topic name>",
        "auto.create": false,
        "auto.evolve": false,
        "tasks.max": 2,
        "transforms": "TimestampConverter",
        "transforms.TimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
        "transforms.TimestampConverter.field": "timestamp",
        "transforms.TimestampConverter.format": "yyyy-MM-dd HH:mm:ss.S",
        "transforms.TimestampConverter.target.type": "Timestamp"
    }
}

 

name : connector의 이름

 

config 

  • connector.class : Connector를 생성하기 위한 class를 설정.
  • connection.url : Target DB에 접근하기 위한 주소 설정.
  • connection.user : Target DB에 접근하기 위한 유저 이름 설정.
  • connection.password : Target DB에 접근하기 위한 유저 비밀번호 설정.
  • table.name.format : Target DB에 전달할 테이블 이름의 format을 설정.
  • topics : Sink Connector가 브로커에 있는 토픽들 중에 가져올 토픽을 설정
  • auto.create : 테이블을 자동으로 생성할 지의 여부(true, false)
  • auto.evolve : Sink Connector로 들어오는 데이터의 스키마와 일치하도록 해당 DB에 있는 테이블의 스키마를 자동으로 변경시킬 지의 여부를 설정(false일 시 자동으로 변경하지 않도록 설정)
  • tasks.max : Connector에서 task의 수를 얼마나 가져갈 지를 설정.
  • transforms : 토픽에 있는 string type의 timestamp 값을 Target DB로 전달할 때 timestamp type으로 변경하여 전달해야함. Sink Connector를 생성할 때 transforms에 있는 Timestamp Converter를 이용하여 string type을 timestamp type으로 변경 후, Target DB에 넣는다.
  • transforms.TimestampConverter.type : Timestamp Converter type을 설정.
  • transforms.TimestampConverter.field : Timestamp Converter field을 설정.
  • transforms.TimestampConverter.format : Timestamp Converter format을 설정.
  • transforms.TimestampConverter.target.type : Timestamp Converter를 이용하여 변환한 후에 적용할 type을 설정.

 

 

Sink-Connector 생성

curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d @sink_connector.json

 

728x90
반응형

'Back-end & Server > Kafka' 카테고리의 다른 글

[Kafka] Source Connector  (0) 2024.03.20
[Kafka] Kafka System  (0) 2024.03.01
[Kafka] Producer와 Consumer의 한계  (0) 2024.03.01
[Kafka] Producer & Consumer  (0) 2024.02.29
[Kafka] 개요  (0) 2024.02.29