고딩왕 코범석
[MSA] 데이터 동기화를 위한 Kafka 활용 -2 본문
반응형
이번에 적용해볼 것들
- Order Service에 요청된 주문의 수량 정보를 Catalog Service에 반영
- Order Service에서 Kafka Topic으로 메세지 전송 -> Producer
- Catalog Service에서 Kafka Topic에 전송된 메세지 취득 -> Consumer
kafka 의존성 추가
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
Catalog Service
@EnableKafka
@Configuration
public class KafkaConsumerConfig {
// Consumer 빈 설정 및 등록
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> properties = new HashMap<>();
// kafka 서버의 host, port, 컨슈머는 데이터를 받아오기 때문에 역직렬화 설정
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "consumerGroupId");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(properties);
}
// 카프카에 들어온 정보를 감지하기 위해 리스너 빈 등록
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory
= new ConcurrentKafkaListenerContainerFactory<>();
kafkaListenerContainerFactory.setConsumerFactory(consumerFactory());
return kafkaListenerContainerFactory;
}
}
catalog service 수정
@Service
@RequiredArgsConstructor
@Slf4j
@Transactional
public class KafkaConsumer {
private final CatalogRepository catalogRepository;
private final String kafkaTopic = "example-catalog-topic";
// Listen할 토픽 설정
@KafkaListener(topics = kafkaTopic)
public void updateQuantity(String kafkaMessage) {
log.info("kafka Message = " + kafkaMessage);
// 역직렬화
Map<Object, Object> map = new HashMap<>();
ObjectMapper objectMapper = new ObjectMapper();
try {
map = objectMapper.readValue(kafkaMessage, new TypeReference<Map<Object, Object>>() {});
} catch(JsonProcessingException e) {
e.printStackTrace();
}
CatalogEntity entity = catalogRepository.findByProductId((String) map.get("productId"));
// 상품이 존재할 경우 상품의 수량 수정
if (entity != null) {
entity.setStock(entity.getStock() - (Integer) map.get("quantity"));
}
}
}
Order Service
@Configuration
@EnableKafka
public class KafkaProducerConfig {
// kafka로 메세지를 보내야하기 때문에 직렬화 해주어야함
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> properties = new HashMap<>();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(properties);
}
// 데이터 전달 인스턴스
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
Order Controller 수정
@PostMapping("/{userId}/orders")
public ResponseEntity<ResponseOrder> createOrder(@PathVariable("userId") String userId,
@RequestBody RequestOrder request) throws JsonProcessingException {
ModelMapper modelMapper = new ModelMapper();
modelMapper.getConfiguration().setMatchingStrategy(MatchingStrategies.STRICT);
OrderDto orderDto = modelMapper.map(request, OrderDto.class);
orderDto.setUserId(userId);
OrderDto createdOrder = orderService.createOrder(orderDto);
// kafkatopic에 주문 데이터 전달
kafkaProducer.send(kafkaTopic, orderDto);
ResponseOrder responseOrder = modelMapper.map(orderDto, ResponseOrder.class);
return ResponseEntity.status(HttpStatus.CREATED).body(modelMapper.map(createdOrder, ResponseOrder.class));
}
Order Service에서 주문해보기
먼저 상품의 재고 확인
# [GET] /catalog-service/catalogs
{
{
"productId": "CATALOG-001",
"productName": "Berlin",
"unitPrice": 1500,
"stock": 99,
"createdAt": "2021-05-24T16:19:54.62"
},...
}
주문
# [POST] /order-service/{userId}/orders
Request
{
"productId" : "CATALOG-001",
"quantity" : 95,
"unitPrice" : 2000
}
Response
{
"productId": "CATALOG-001",
"quantity": 95,
"unitPrice": 2000,
"totalPrice": 190000,
"orderId": "b0b2d4be-e484-44c8-a42d-258bc79ec1bc"
}
다시 상품의 재고 확인
# [GET] /catalog-service/catalogs
{
{
"productId": "CATALOG-001",
"productName": "Berlin",
"unitPrice": 1500,
"stock": 4,
"createdAt": "2021-05-24T16:19:54.62"
},...
}
그리고 내 정보에서 주문 확인
{
"email": "kobumssh@naver.com",
"name": "고범석",
"userId": "b169e54c-c563-449a-9fe4-2b77fa4df2fd",
"orders": [
{
"productId": "CATALOG-001",
"quantity": 95,
"unitPrice": 2000,
"totalPrice": 190000,
"createdAt": "2021-05-24T16:39:24",
"orderId": "b0b2d4be-e484-44c8-a42d-258bc79ec1bc"
}
]
}
Order Service Instance가 복수일 경우 동기화 문제를 해결해보기
만약 OrderService 인스턴스를 하나 더 띄우게 되면 프로젝트에는 각각의 인스턴스에 H2 내장 DB가 붙는다. 그리고 라운드 로빈 방식으로 각 서비스가 호출되기 때문에 주문을 여러번 하고 내 주문 조회를 하게 되면 조회를 할 때 마다 결과가 다르게 나온다.
이 문제를 해결하기 위해 Order Service에 요청된 주문 정보를 DB가 아니라 Kafka의 Topic으로 전송하고, Topic에 설정된 Kafka Sink Connect를 사용해 단일 DB(Maria DB)에 저장해보자
Table 생성
CREATE TABLE orders (
id int auto_increment primary key,
product_id varchar(20) not null,
quantity int default 0,
unit_price int default 0,
total_price int default 0,
user_id varchar(50) not null,
order_id varchar(50) not null,
created_at datetime default NOW()
);
Order Service의 datasource 설정 변경
spring:
datasource:
url: jdbc:mariadb://localhost:포트/스키마
driver-class-name: org.mariadb.jdbc.Driver
username: root
password: 비밀번호입력
Kafka Sink Connect 추가
# connect의 기본 포트는 8083
# 커넥트 추가, POST로 전송
{
"name":"my-order-sink-connect",
"config": {
"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url":"jdbc:mysql://localhost:3306/mydb",
"connection.user":"root",
"connection.password":"test1357",
"auto.create":"true", # DB를 자동으로 만들기 설정, 토픽과 같은 이름의 테이블 생성
"auto.evolve":"true",
"delete.enabled":"false",
"tasks.max":"1",
"topics":"orders" # 정보를 받을 토픽
}
}
Order Service Controller 수정
@PostMapping("/{userId}/orders")
public ResponseEntity<ResponseOrder> createOrder(@PathVariable("userId") String userId,
@RequestBody RequestOrder request) throws JsonProcessingException {
ModelMapper modelMapper = new ModelMapper();
modelMapper.getConfiguration().setMatchingStrategy(MatchingStrategies.STRICT);
OrderDto orderDto = modelMapper.map(request, OrderDto.class);
orderDto.setUserId(userId);
orderDto.setOrderId(UUID.randomUUID().toString());
orderDto.setTotalPrice(request.getUnitPrice() * request.getQuantity());
// catalog-service와 데이터 동기화
kafkaProducer.send(catalogTopic, orderDto);
// order-service의 데이터 동기화(단일 DB에 저장)
orderProducer.send(orderTopic, orderDto);
ResponseOrder responseOrder = modelMapper.map(orderDto, ResponseOrder.class);
return ResponseEntity.status(HttpStatus.CREATED).body(modelMapper.map(responseOrder, ResponseOrder.class));
}
각각의 DTO들과 OrderProducer 설정
// Kafka에 넣을 DTO, 직렬화 해주기 잊지말긔
public class KafkaOrderDto implements Serializable {
private Schema schema;
private Payload payload;
}
...
// 스키마 정보 클래스
public class Schema {
private String type;
private List<Field> fields;
private boolean optional;
private String name;
}
...
// Schema 내부의 필드 정의 클래스
public class Field {
private String type;
private boolean optional;
private String field;
}
...
// 실제 데이터
public class Payload {
private String order_id;
private String user_id;
private String product_id;
private int quantity;
private int unit_price;
private int total_price;
}
@Service
@Slf4j
@RequiredArgsConstructor
public class KafkaProducer {
// 카프카에 데이터를 보낼 템플릿
private final KafkaTemplate<String, String> kafkaTemplate;
public OrderDto send(String topic, OrderDto orderDto) throws JsonProcessingException {
ObjectMapper objectMapper = new ObjectMapper();
// dto를 json으로 변환
String jsonInString = objectMapper.writeValueAsString(orderDto);
kafkaTemplate.send(topic, jsonInString);
log.info("Kafka Producer sent data from the Order micro service : " + orderDto);
return orderDto;
}
}
@Service
@RequiredArgsConstructor
@Slf4j
@Transactional
public class OrderProducer {
private final KafkaTemplate<String, String> kafkaTemplate;
// 필드, 스키마는 이미 DB 테이블이 존재하기 때문에 그에 맞게 바로 생성
private final List<Field> fields = Arrays.asList(
Field.builder().type("string").optional(true).field("order_id").build(),
Field.builder().type("string").optional(true).field("user_id").build(),
Field.builder().type("string").optional(true).field("product_id").build(),
Field.builder().type("int32").optional(true).field("quantity").build(),
Field.builder().type("int32").optional(true).field("unit_price").build(),
Field.builder().type("int32").optional(true).field("total_price").build());
private final Schema schema = Schema.builder()
.type("struct")
.fields(fields)
.optional(false)
.name("orders").build();
public OrderDto send(String topic, OrderDto orderDto) throws JsonProcessingException {
Payload payload = Payload.builder()
.order_id(orderDto.getOrderId())
.user_id(orderDto.getUserId())
.product_id(orderDto.getProductId())
.quantity(orderDto.getQuantity())
.unit_price(orderDto.getUnitPrice())
.total_price(orderDto.getTotalPrice())
.build();
KafkaOrderDto kafkaOrderDto = KafkaOrderDto.builder()
.schema(schema)
.payload(payload)
.build();
ObjectMapper objectMapper = new ObjectMapper();
// json으로 변환
String jsonInString = objectMapper.writeValueAsString(kafkaOrderDto);
kafkaTemplate.send(topic, jsonInString);
log.info("Order Producer sent data from the Order micro-service : " + kafkaOrderDto);
return orderDto;
}
}
테스트
현재 내 주문 정보와 상품 재고는 다음과 같이 나온다.
# 주문 정보
{
"email": "kobumssh@naver.com",
"name": "고범석",
"userId": "b169e54c-c563-449a-9fe4-2b77fa4df2fd",
"orders": [
{
"productId": "CATALOG-001",
"quantity": 95,
"unitPrice": 2000,
"totalPrice": 190000,
"createdAt": "2021-05-24T16:39:24",
"orderId": "b0b2d4be-e484-44c8-a42d-258bc79ec1bc"
}
]
}
# 상품 재고
[
{
"productId": "CATALOG-001",
"productName": "Berlin",
"unitPrice": 1500,
"stock": 4,
"createdAt": "2021-05-24T16:19:54.62"
},
{
"productId": "CATALOG-002",
"productName": "Tokyo",
"unitPrice": 1000,
"stock": 108,
"createdAt": "2021-05-24T16:19:54.625"
},
{
"productId": "CATALOG-003",
"productName": "Stockholm",
"unitPrice": 2000,
"stock": 105,
"createdAt": "2021-05-24T16:19:54.625"
}
]
이제 주문을 두번 더 해보자.
{
"productId" : "CATALOG-002",
"quantity" : 18,
"unitPrice" : 1000
}
{
"productId" : "CATALOG-003",
"quantity" : 15,
"unitPrice" : 2000
}
그리고 내 주문 조회 및 상품 재고를 조회해보면?
# 주문 조회
{
"email": "kobumssh@naver.com",
"name": "고범석",
"userId": "b169e54c-c563-449a-9fe4-2b77fa4df2fd",
"orders": [
{
"productId": "CATALOG-001",
"quantity": 95,
"unitPrice": 2000,
"totalPrice": 190000,
"createdAt": "2021-05-24T16:39:24",
"orderId": "b0b2d4be-e484-44c8-a42d-258bc79ec1bc"
},
{
"productId": "CATALOG-002",
"quantity": 18,
"unitPrice": 1000,
"totalPrice": 18000,
"createdAt": "2021-05-24T17:30:50",
"orderId": "c4b5a3f1-6914-4e67-bf12-0b7461da87c8"
},
{
"productId": "CATALOG-003",
"quantity": 15,
"unitPrice": 2000,
"totalPrice": 30000,
"createdAt": "2021-05-24T17:31:46",
"orderId": "9ba95fa6-a61b-43fe-a092-5a5834882b53"
}
]
}
# 재고
[
{
"productId": "CATALOG-001",
"productName": "Berlin",
"unitPrice": 1500,
"stock": 4,
"createdAt": "2021-05-24T16:19:54.62"
},
{
"productId": "CATALOG-002",
"productName": "Tokyo",
"unitPrice": 1000,
"stock": 90,
"createdAt": "2021-05-24T16:19:54.625"
},
{
"productId": "CATALOG-003",
"productName": "Stockholm",
"unitPrice": 2000,
"stock": 90,
"createdAt": "2021-05-24T16:19:54.625"
}
]
잘 반영된 것을 확인할 수 있다. Maria DB에서도 확인해보자! HeidiSQL로 접속해서 확인해보았다.
테이블명도 잘 있다!
본 내용은 이도원님의 Spring Cloud로 개발하는 마이크로서비스 애플리케이션을 수강하고 정리한 내용입니다.
반응형
'Language & Framework > Spring' 카테고리의 다른 글
JPA @OneToMany cascade, orphanRemoval 정리 (0) | 2021.08.02 |
---|---|
Controller에서 다형성을 활용해 JSON 객체를 받아보자! (0) | 2021.06.07 |
[MSA] 데이터 동기화를 위한 Kafka 활용 -1 (0) | 2021.05.24 |
[MSA] Spring Cloud Bus및 설정 정보 암호화 (0) | 2021.05.21 |
[MSA] Spring Cloud Config (3) | 2021.05.15 |