고딩왕 코범석
[MSA] 데이터 동기화를 위한 Kafka 활용 -1 본문
반응형
Kafka Broker
- 실행된 Kafka 애플리케이션 서버 개념
- 3대 이상의 Broker Cluster 구성
Zookeeper
- 이러한 브로커들을 컨트롤해주는 역할
- 즉, 클라이언트가 서로 공유하는 데이터를 관리해주는 역할
- 분산되어 있는 각 애플리케이션의 정보를 중앙에 집중하고 구성관리, 그룹관리 네이밍, 동기화 등을 제공
Kafka Client
- Kafka와 데이터를 주고 받기 위해 사용하는 자바 라이브러리
- Producer, Consumer, Admin, Stream 등 각종 API 제공
- 다양한 Third Party Library 존재
Kafka 실행해보기
Zookeeper를 먼저 실행한 다음, Kafka 서버를 기동하자
# window 기준, zookeeper 실행
$KAFKA_HOME/bin/windows/zookeeper-server-start.bat $KAFKA_HOME/config/zookeeper.properties
# kafka 서버 구동
$KAFKA_HOME/bin/windows/kafka-server-start.bat $KAFKA_HOME/config/server.properties
topic 생성, 목록확인, 정보확인
# 생성 / kafka서버는 기본적으로 9092 포트를 사용하며 토픽의 파티션은 1개로 가정
$KAFKA_HOME/bin/windows/kafka-topics.bat --create --topic 토픽명 --bootstrap-server localhost:9092 --partitions 1
# 목록 확인
$KAFKA_HOME/bin/windows/kafka-topics.bat --bootstrap-server localhost:9092 --list
# 정보 확인
$KAFKA_HOME/bin/windows/kafka-topics.bat --describe --topic 토픽명 --bootstrap-server localhost:9092
메세지를 생산하는 producer, 소비하는 consumer 기동하기
# 생산
$KAFKA_HOME/bin/windows/kafka-console-producer.bat --broker-list localhost:9092 --topic 앞에서설정한토픽명
# 소비, from-beginning 해당 토픽의 처음부터 메세지를 받아오기 위한 옵션
$KAFKA_HOME/bin/windows/kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic 앞에서설정한토픽명 --from-beginning
Kafka Connect
- Kafka Connect를 통해 Data를 import/export 가능하다
- 코드 없이 Configuration으로 데이터를 이동
- Restful api를 통해 지원
- Stream or Batch 형태로 데이터 전송 가능
- 커스텀 Connector를 통한 다양한 plugin 제공 (DB를 적용해 볼 예정)
- Kafka Connect Source : 특정 리소스에서 데이터를 가져와 카프카 클러스터에 가져오는걸 개입한다.(import)
- Kafka Connect Sink : Cluster에 저장되어 있는 데이터를 다른 쪽으로 보내는데 개입한다.(export)
Kafka Connect 실행해보기
Kafka Connect 실행
$KAFKA_CONNECT_HOME/bin/windows/connect-distributed.bat ./etc/kafka/connect-distributed.properties
설정 정보 수정
# rem classpath addition for core 위에 삽입
rem classpath addition for LSB style path
if exist %BASE_DIR%\share\java\kafka\* (
call:concat %BASE_DIR%\share\java\kafka\*
)
# mariadb 연동을 위해 jdbc 추가
# 위치 : \etc\kafka\connect-distributed.properties
plugin.path=[confluentinc-kafka-connect-jdbc-10.0.1 폴더]
# 이후 kafka connect jdbc maridb 드라이버 파일을 confluent-6.1.0/share/java/kafka 에 복사
connect 연결 이후 토픽을 확인하면
이렇게 세 개의 토픽이 추가된 것을 확인할 수 있다. 커넥트가 소스에서 읽어왔던 데이터를 저장하고 관리하기 위한 토픽들이다.
Soure Connect 추가
# connect의 기본 포트는 8083
# 커넥트 추가, POST로 전송
{
"name" : "my-source-connect",
"config" : {
"connector.class" : "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url":"jdbc:mysql://localhost:3306/mydb",
"connection.user":"root",
"connection.password":"test1357",
"mode": "incrementing",
"incrementing.column.name" : "id",
"table.whitelist":"users", # 감지할 테이블명
"topic.prefix" : "my_topic_", # 저장될 토픽의 prefix 최종으로 만들어진 토픽 : my_topic_users
"tasks.max" : "1"
}
}
my_topic_users가 토픽으로 등록된 것을 확인할 수 있다. 이후, 마리아 db에 users 테이블을 만들어 데이터를 삽입 후 확인해보면
{
"schema": {
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "id"
},
{
"type": "string",
"optional": true,
"field": "user_id"
},
{
"type": "string",
"optional": true,
"field": "pwd"
},
{
"type": "string",
"optional": true,
"field": "name"
},
{
"type": "int64",
"optional": true,
"name": "org.apache.kafka.connect.data.Timestamp",
"version": 1,
"field": "created_at"
}
],
"optional": false,
"name": "users"
},
"payload": {
"id": 1,
"user_id": "hel",
"pwd": "1234",
"name": "ko",
"created_at": 1621788132000
}
}
Schema에 fields는 각 컬럼 정보들이 저장되어 있고, payload에 컬럼별 저장된 데이터 값들이 있다.
Sink Connect 추가
{
"name":"my-sink-connect",
"config": {
"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url":"jdbc:mysql://localhost:3306/mydb",
"connection.user":"root",
"connection.password":"test1357",
"auto.create":"true", # DB를 자동으로 만들기 설정, 토픽과 같은 이름의 테이블 생성
"auto.evolve":"true",
"delete.enabled":"false",
"tasks.max":"1",
"topics":"my_topic_users" # 정보를 받을 토픽
}
}
본 내용은 이도원님의 Spring Cloud로 개발하는 마이크로서비스 애플리케이션을 수강하고 정리한 내용입니다.
반응형
'Language & Framework > Spring' 카테고리의 다른 글
Controller에서 다형성을 활용해 JSON 객체를 받아보자! (0) | 2021.06.07 |
---|---|
[MSA] 데이터 동기화를 위한 Kafka 활용 -2 (0) | 2021.05.24 |
[MSA] Spring Cloud Bus및 설정 정보 암호화 (0) | 2021.05.21 |
[MSA] Spring Cloud Config (3) | 2021.05.15 |
[MSA] API Gateway (0) | 2021.05.11 |