2021. 5. 24. 17:37

이번에 적용해볼 것들

  • Order Service에 요청된 주문의 수량 정보를 Catalog Service에 반영
  • Order Service에서 Kafka Topic으로 메세지 전송 -> Producer
  • Catalog Service에서 Kafka Topic에 전송된 메세지 취득 -> Consumer

kafka 의존성 추가


Catalog Service

public class KafkaConsumerConfig {

    // Consumer 빈 설정 및 등록 
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> properties = new HashMap<>();
        // kafka 서버의 host, port, 컨슈머는 데이터를 받아오기 때문에 역직렬화 설정
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "consumerGroupId");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        return new DefaultKafkaConsumerFactory<>(properties);

    // 카프카에 들어온 정보를 감지하기 위해 리스너 빈 등록
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory
                = new ConcurrentKafkaListenerContainerFactory<>();
        return kafkaListenerContainerFactory;

catalog service 수정

public class KafkaConsumer {

    private final CatalogRepository catalogRepository;
    private final String kafkaTopic = "example-catalog-topic";

    // Listen할 토픽 설정
    @KafkaListener(topics = kafkaTopic)
    public void updateQuantity(String kafkaMessage) {
        log.info("kafka Message = " + kafkaMessage);

        // 역직렬화
        Map<Object, Object> map = new HashMap<>();
        ObjectMapper objectMapper = new ObjectMapper();
        try {
            map = objectMapper.readValue(kafkaMessage, new TypeReference<Map<Object, Object>>() {});
        } catch(JsonProcessingException e) {

        CatalogEntity entity = catalogRepository.findByProductId((String) map.get("productId"));
        // 상품이 존재할 경우 상품의 수량 수정
        if (entity != null) {
            entity.setStock(entity.getStock() - (Integer) map.get("quantity"));

Order Service

public class KafkaProducerConfig {

    // kafka로 메세지를 보내야하기 때문에 직렬화 해주어야함
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> properties = new HashMap<>();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(properties);

    // 데이터 전달 인스턴스
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());

Order Controller 수정

public ResponseEntity<ResponseOrder> createOrder(@PathVariable("userId") String userId,
                                                 @RequestBody RequestOrder request) throws JsonProcessingException {

    ModelMapper modelMapper = new ModelMapper();

    OrderDto orderDto = modelMapper.map(request, OrderDto.class);
    OrderDto createdOrder = orderService.createOrder(orderDto);

    // kafkatopic에 주문 데이터 전달
    kafkaProducer.send(kafkaTopic, orderDto);

    ResponseOrder responseOrder = modelMapper.map(orderDto, ResponseOrder.class);
    return ResponseEntity.status(HttpStatus.CREATED).body(modelMapper.map(createdOrder, ResponseOrder.class));

Order Service에서 주문해보기

먼저 상품의 재고 확인

# [GET] /catalog-service/catalogs
        "productId": "CATALOG-001",
        "productName": "Berlin",
        "unitPrice": 1500,
        "stock": 99,
        "createdAt": "2021-05-24T16:19:54.62"


# [POST] /order-service/{userId}/orders
    "productId" : "CATALOG-001",
    "quantity" : 95,
    "unitPrice" : 2000
    "productId": "CATALOG-001",
    "quantity": 95,
    "unitPrice": 2000,
    "totalPrice": 190000,
    "orderId": "b0b2d4be-e484-44c8-a42d-258bc79ec1bc"

다시 상품의 재고 확인

# [GET] /catalog-service/catalogs
        "productId": "CATALOG-001",
        "productName": "Berlin",
        "unitPrice": 1500,
        "stock": 4,
        "createdAt": "2021-05-24T16:19:54.62"

그리고 내 정보에서 주문 확인

    "email": "kobumssh@naver.com",
    "name": "고범석",
    "userId": "b169e54c-c563-449a-9fe4-2b77fa4df2fd",
    "orders": [
            "productId": "CATALOG-001",
            "quantity": 95,
            "unitPrice": 2000,
            "totalPrice": 190000,
            "createdAt": "2021-05-24T16:39:24",
            "orderId": "b0b2d4be-e484-44c8-a42d-258bc79ec1bc"

Order Service Instance가 복수일 경우 동기화 문제를 해결해보기

만약 OrderService 인스턴스를 하나 더 띄우게 되면 프로젝트에는 각각의 인스턴스에 H2 내장 DB가 붙는다. 그리고 라운드 로빈 방식으로 각 서비스가 호출되기 때문에 주문을 여러번 하고 내 주문 조회를 하게 되면 조회를 할 때 마다 결과가 다르게 나온다.


이 문제를 해결하기 위해 Order Service에 요청된 주문 정보를 DB가 아니라 Kafka의 Topic으로 전송하고, Topic에 설정된 Kafka Sink Connect를 사용해 단일 DB(Maria DB)에 저장해보자


Table 생성

    id int auto_increment primary key,
    product_id varchar(20) not null,
    quantity int default 0,
    unit_price int default 0,
    total_price int default 0,
    user_id varchar(50) not null,
    order_id varchar(50) not null,
    created_at datetime default NOW()

Order Service의 datasource 설정 변경

    url: jdbc:mariadb://localhost:포트/스키마
    driver-class-name: org.mariadb.jdbc.Driver
    username: root
    password: 비밀번호입력

Kafka Sink Connect 추가

# connect의 기본 포트는 8083
# 커넥트 추가, POST로 전송
    "config": {
        "auto.create":"true",  # DB를 자동으로 만들기 설정, 토픽과 같은 이름의 테이블 생성
        "topics":"orders"    # 정보를 받을 토픽

Order Service Controller 수정

    public ResponseEntity<ResponseOrder> createOrder(@PathVariable("userId") String userId,
                                                     @RequestBody RequestOrder request) throws JsonProcessingException {

        ModelMapper modelMapper = new ModelMapper();

        OrderDto orderDto = modelMapper.map(request, OrderDto.class);

        orderDto.setTotalPrice(request.getUnitPrice() * request.getQuantity());

        // catalog-service와 데이터 동기화
        kafkaProducer.send(catalogTopic, orderDto);
        // order-service의 데이터 동기화(단일 DB에 저장)
        orderProducer.send(orderTopic, orderDto);

        ResponseOrder responseOrder = modelMapper.map(orderDto, ResponseOrder.class);
        return ResponseEntity.status(HttpStatus.CREATED).body(modelMapper.map(responseOrder, ResponseOrder.class));

각각의 DTO들과 OrderProducer 설정

// Kafka에 넣을 DTO, 직렬화 해주기 잊지말긔
public class KafkaOrderDto implements Serializable {

    private Schema schema;
    private Payload payload;


// 스키마 정보 클래스
public class Schema {

    private String type;
    private List<Field> fields;
    private boolean optional;
    private String name;


// Schema 내부의 필드 정의 클래스
public class Field {

    private String type;
    private boolean optional;
    private String field;


// 실제 데이터
public class Payload {

    private String order_id;
    private String user_id;
    private String product_id;
    private int quantity;
    private int unit_price;
    private int total_price;
public class KafkaProducer {

    // 카프카에 데이터를 보낼 템플릿
    private final KafkaTemplate<String, String> kafkaTemplate;

    public OrderDto send(String topic, OrderDto orderDto) throws JsonProcessingException {
        ObjectMapper objectMapper = new ObjectMapper();
        // dto를 json으로 변환
        String jsonInString = objectMapper.writeValueAsString(orderDto);
        kafkaTemplate.send(topic, jsonInString);
        log.info("Kafka Producer sent data from the Order micro service : " + orderDto);

        return orderDto;
public class OrderProducer {

    private final KafkaTemplate<String, String> kafkaTemplate;
    // 필드, 스키마는 이미 DB 테이블이 존재하기 때문에 그에 맞게 바로 생성
    private final List<Field> fields = Arrays.asList(
    private final Schema schema = Schema.builder()

    public OrderDto send(String topic, OrderDto orderDto) throws JsonProcessingException {

        Payload payload = Payload.builder()

        KafkaOrderDto kafkaOrderDto = KafkaOrderDto.builder()

        ObjectMapper objectMapper = new ObjectMapper();
        // json으로 변환
        String jsonInString = objectMapper.writeValueAsString(kafkaOrderDto);
        kafkaTemplate.send(topic, jsonInString);
        log.info("Order Producer sent data from the Order micro-service : " + kafkaOrderDto);

        return orderDto;


현재 내 주문 정보와 상품 재고는 다음과 같이 나온다.

# 주문 정보
    "email": "kobumssh@naver.com",
    "name": "고범석",
    "userId": "b169e54c-c563-449a-9fe4-2b77fa4df2fd",
    "orders": [
            "productId": "CATALOG-001",
            "quantity": 95,
            "unitPrice": 2000,
            "totalPrice": 190000,
            "createdAt": "2021-05-24T16:39:24",
            "orderId": "b0b2d4be-e484-44c8-a42d-258bc79ec1bc"

# 상품 재고
        "productId": "CATALOG-001",
        "productName": "Berlin",
        "unitPrice": 1500,
        "stock": 4,
        "createdAt": "2021-05-24T16:19:54.62"
        "productId": "CATALOG-002",
        "productName": "Tokyo",
        "unitPrice": 1000,
        "stock": 108,
        "createdAt": "2021-05-24T16:19:54.625"
        "productId": "CATALOG-003",
        "productName": "Stockholm",
        "unitPrice": 2000,
        "stock": 105,
        "createdAt": "2021-05-24T16:19:54.625"

이제 주문을 두번 더 해보자.

    "productId" : "CATALOG-002",
    "quantity" : 18,
    "unitPrice" : 1000
    "productId" : "CATALOG-003",
    "quantity" : 15,
    "unitPrice" : 2000

그리고 내 주문 조회 및 상품 재고를 조회해보면?

# 주문 조회
    "email": "kobumssh@naver.com",
    "name": "고범석",
    "userId": "b169e54c-c563-449a-9fe4-2b77fa4df2fd",
    "orders": [
            "productId": "CATALOG-001",
            "quantity": 95,
            "unitPrice": 2000,
            "totalPrice": 190000,
            "createdAt": "2021-05-24T16:39:24",
            "orderId": "b0b2d4be-e484-44c8-a42d-258bc79ec1bc"
            "productId": "CATALOG-002",
            "quantity": 18,
            "unitPrice": 1000,
            "totalPrice": 18000,
            "createdAt": "2021-05-24T17:30:50",
            "orderId": "c4b5a3f1-6914-4e67-bf12-0b7461da87c8"
            "productId": "CATALOG-003",
            "quantity": 15,
            "unitPrice": 2000,
            "totalPrice": 30000,
            "createdAt": "2021-05-24T17:31:46",
            "orderId": "9ba95fa6-a61b-43fe-a092-5a5834882b53"

# 재고
        "productId": "CATALOG-001",
        "productName": "Berlin",
        "unitPrice": 1500,
        "stock": 4,
        "createdAt": "2021-05-24T16:19:54.62"
        "productId": "CATALOG-002",
        "productName": "Tokyo",
        "unitPrice": 1000,
        "stock": 90,
        "createdAt": "2021-05-24T16:19:54.625"
        "productId": "CATALOG-003",
        "productName": "Stockholm",
        "unitPrice": 2000,
        "stock": 90,
        "createdAt": "2021-05-24T16:19:54.625"

잘 반영된 것을 확인할 수 있다. Maria DB에서도 확인해보자! HeidiSQL로 접속해서 확인해보았다.



테이블명도 잘 있다!

