고딩왕 코범석

8장 - 애그리거트와 트랜잭션 본문

Book Lounge/도메인 주도 개발 시작하기

8장 - 애그리거트와 트랜잭션

고딩왕 코범석 2022. 7. 22. 16:04
반응형

Index

  1. 애그리거트와 트랜잭션
  2. 선점 잠금
    1. 선점 잠금과 교착상태
  3. 비선점 잠금
    1. 강제 버전 증가
  4. 오프라인 선점 잠금
    1. 오프라인 선점 잠금을 위한 LockManager 인터페이스와 관련 클래스
    2. DB를 이용한 LockManager 구현

애그리거트와 트랜잭션

개념적으로 같은 애그리거트지만 물리적으로 다른 애그리거트 객체를 사용할 때 데이터의 일관성이 깨질 수 있다.

  • 예를 들어 운영자가 상품을 배송 상태로 변경하는 요청, 고객의 배송지 요청이 동시 접근했을 때를 가정해본다.
  • 운영자가 배송 상태를 변경할 때 고객의 배송지 변경 기능을 막아야 일관성이 지켜진다.

https://user-images.githubusercontent.com/37062337/179507846-a1e231a8-f353-4f64-9f2e-063c55bf414d.png

일관성이 깨지지 않도록 둘 중 하나의 방법을 선택해야한다.

  1. 운영자가 배송지 정보를 조회하고 상태를 변경하는 동안, 고객이 애그리거트를 수정하지 못하게 막는다.
  2. 운영자가 배송지 정보를 조회한 이후 고객이 정보를 변경하면, 운영자가 애그리거트를 다시 조회한 뒤에 배송 상태를 변경하도록 한다.

이 두 방법은 애그리거트 자체의 트랜잭션과 관련이 있다. 트랜잭션의 처리 방식에는 선점(비관적) 잠금, 비선점(낙관적) 잠금 기법이 있다.

위로

선점 잠금

선점(비관적) 잠금은 애그리거트를 구한 쓰레드가 사용이 끝날 때 까지 다른 쓰레드가 해당 애그리거트를 수정하지 못하게 하는 방식이다.

https://user-images.githubusercontent.com/37062337/179513354-38e70f93-2127-4d46-bc21-9c9ab9d00563.png

아래 그림에서는 운영자가 주문 애그리거트를 가져와 상태를 변경하는 동안 고객은 애그리거트에 대한 잠금이 해제될 때 까지 블로킹이 되는 상황이다.

선점 잠금은 보통 DBMS가 제공하는 행단위 잠금을 사용해 구현한다. 대부분의 DBMS가 for update와 같은 쿼리를 이용해 특정 레코드에 한 커넥션만 접근할 수 있는 잠금장치를 제공한다.

JPA EntityManager는 LockModeType을 인자로 받는 find()를 제공하며 하이버네이트는 PESSIMISTIC_WRITE를 잠금 모드로 사용하면 for update 쿼리를 이용해 선점 잠금을 구현한다.

또한, 스프링 데이터 JPA에서는 @Lock을 이용해 잠금 모드를 지정한다.

public interface MemberRepository extends JpaRepository<Member, Long> {

    @Lock(LockModeType.PESSIMISTIC_WRITE)
    @Query("select m from Member m where m.id = :id")
    Optional<Member> findByIdForUpdate(@Param("id") Long id);
}

선점 잠금과 교착상태

선점 잠금시에는 교착상태가 발생하지 않도록 주의해야한다. 다음과 같은 상황을 보자.

  1. [Thread-1] A 애그리거트에 대한 선점 잠금 구함
  2. [Thread-2] B 애그리거트에 대한 선점 잠금 구함
  3. [Thread-1] B 애그리거트에 대한 선점 잠금 시도
  4. [Thread-2] A 애그리거트에 대한 선점 잠금 시도

이 경우 Thread-1, Thread-2는 선점 잠금 시도 시 원하는 선점 잠금을 구할 수 없다.

이런 문제가 발생하지 않도록 잠금을 구할 때 최대 대기 시간을 지정해야 한다. 스프링 데이터 JPA에서 선점 잠금을 시도할 때 최대 대기 시간을 지정하려면 @QueryHints를 사용해야 한다.

public interface MemberRepository extends JpaRepository<Member, Long> {

    @Lock(LockModeType.PESSIMISTIC_WRITE)
    @QueryHints({
        @QueryHint(name = "javax.persistence.lock.timeout", value = "2000")
    })
    @Query("select m from Member m where m.id = :id")
    Optional<Member> findByIdForUpdate(@Param("id") Long id);
}

위로

비선점 잠금

선점 잠금으로 설정해도 아래와 같은 상황에서는 문제가 발생한다.

https://user-images.githubusercontent.com/37062337/179667743-f455985b-7361-42b4-a980-8d2a7a8b0cd2.png

이 상황에서의 문제는 운영자가 배송지 정보를 조회하고 배송 상태로 변경하는 사이에 고객이 배송지를 변경한 것이다. 즉, 배송 상태 변경 전에 배송지를 한번 더 확인하지 않으면 운영자는 다른 배송지로 상품을 보내는 상황이 발생한다.

이 때는 비선점 잠금 방식으로 해결한다. 비선점 잠금 방식은 변경한 데이터를 실제 DBMS에 반영하는 시점에 변경 가능 여부를 확인하는 방식이다.

비선점 잠금 구현시에는 애그리거트에 버전으로 사용할 숫자 타입 프로퍼티를 추가해야한다. 애그리거트를 수정할 때 마다 버전으로 사용할 프로퍼티를 1 증가시켜 다음과 같은 쿼리를 사용한다.

update aggregate_table 
set version = version + 1, 
    change_column = 'change_value'
where aggregate_id = ?
    and version = {현재버전}

그림으로 표현하면 아래와 같다.

https://user-images.githubusercontent.com/37062337/179671805-823e5993-86e6-4f91-a79b-19e113e0b5f1.png

JPA는 버전을 이용한 비선점 잠금 기능을 지원한다. 아래와 같이 버전으로 사용할 필드에 @Version을 붙여 매핑되는 테이블에 버전을 저장할 컬럼을 추가하면 된다.

@Entity
public class Order {

    @Id
    @GeneratedValue(stratagy = IDENTITY)
    @Column(name = "order_id")
    private Long id;

    @Version
    private Long version;

    ...
}

이 방법의 장점은 응용 서비스가 엔티티의 버전을 알 필요없이 리포지터리에서 필요한 애그리거트를 구해 알맞은 기능만 실행하면 된다. 기능 실행 과정에서 데이터가 변경되면 JPA는 트랜잭션 종료 시점에 비선점 잠금을 위한 쿼리를 실행한다.

쿼리의 where절에 version 컬럼이 있기 때문에 해당 버전의 row를 찾지 못할 경우 OptimisticLockingFailureException이 발생한다.

요청 시에는 애그리거트의 id와 version을 보내서 수정 전에 값에 대해 검증 후 수정을 진행한다. 만약 버전이 맞지 않을 경우 별도의 커스텀 예외를 발생시킨다.

강제 버전 증가

버전 변경 시 루트 애그리거트가 아닌 다른 엔티티 값만 변경할 시 JPA에서는 버전 값을 증가시키지 않는다. 연관된 엔티티 값이 변경된다 하더라도 루트 엔티티 자체는 변경되지 않기 때문에 갱신하지 않는다.

하지만 애그리거트 관점으로 봤을 때, 루트 엔티티가 바뀌지 않았더라도 애그리거트 구성 요소 중 일부가 변경되었다면 애그리거트 자체도 변경된 것이다. 또한 변경 되어야 비선점 잠금 방식도 올바르게 동작한다.

이 경우 JPA에서는 EntityManager에서 find()를 통해 엔티티를 구할 때 강제로 버전 값을 증가시키는 잠금 모드를 지원한다.

LockModeType.OPTIMISTIC_FORCE_INCREMENT를 사용하면 해당 엔티티의 상태가 변경되었는지 상관없이 트랜잭션 종료 시점에 버전 값 증가 처리를 한다. 스프링 데이터 JPA에서는 @Lock을 사용하면 된다.

위로

오프라인 선점 잠금

오프라인에서 데이터 충돌을 막으려면 선점 잠금, 비선점 잠금으로는 불가능하다. 오프라인 선점 잠금에서는 아래 그림과 같이 사용자 A가 모든 수정을 마치고 나서 다른 사용자가 잠금을 획득할 수 있다.

https://user-images.githubusercontent.com/37062337/180370946-42f9e091-4ffd-4ecf-a485-7d8fe2de2d1c.png

그림에서 사용자 A가 과정 3을 수행하지 않는다면 잠금을 해제하지 않으므로 다른 사용자는 잠금을 획득할 수 없다. 이런 상황을 방지하기 위해 오프라인 선점 잠금 방식은 잠금 유효 시간을 가져야한다.

오프라인 선점 잠금을 위한 LockManager 인터페이스와 관련 클래스

오프라인 선점 잠금은 다음과 같은 기능이 필요하며 이를 인터페이스화 한다.

  1. 잠금 선점 시도
  2. 잠금 확인
  3. 잠금 해제
  4. 잠금 유효시간 연장
public interface LockManager {
    LockId tryLock(String type, String id) throws LockException;
    void checkLock(LockId lockId) throws LockException;
    void releaseLock(LockId lockId) throws LockException;
    void extendLock(LockId lockId) throws Exception;
}

잠금을 해제하는 서비스 코드에서는 checkLock()을 호출해야하는데, 이는 잠금을 선점한 이후 실행하는 기능은 아래와 같은 상황을 고려해 주어진 LockId를 갖는 잠금이 유효한지 확인해야한다.

  • 잠금 유효 시간이 지났으면 이미 다른 사용자가 잠금을 선점
  • 잠금을 선점하지 않은 사용자가 기능을 실행한다면 실행을 막아야함

DB를 이용한 LockManager 구현

아래와 같은 테이블과 인덱스를 생성하여 LockManager를 구현한다.

-- MySQL
create table locks (
  `type` varchar(255),
  id varchar(255),
  lockid varchar(255),
  expiration_time datetime,
  primary key (`type`, id)
) character set utf8mb4;

create unique index locks_idx ON shop.locks (lockid);

예를 들어 Order 타입의 1번 식별자를 갖는 애그리거트에 대해 잠금을 갖고 싶다면 insert 쿼리를 이용해 locks 테이블에 데이터를 삽입하면 된다.

insert into locks values ('ORDER', '1', '{생성한 Lock ID}', '날짜');

그리고 LockManager 인터페이스의 구현체는 아래와 같다.

@Component
public class SpringLockManager implements LockManager {
    private int lockTimeout = 5 * 60 * 1000;
    private JdbcTemplate jdbcTemplate;

    private RowMapper<LockData> lockDataRowMapper = (rs, rowNum) ->
            new LockData(rs.getString(1), rs.getString(2),
                    rs.getString(3), rs.getTimestamp(4).getTime());

    public SpringLockManager(JdbcTemplate jdbcTemplate) {
        this.jdbcTemplate = jdbcTemplate;
    }

    @Transactional(propagation = Propagation.REQUIRES_NEW)
    @Override
    public LockId tryLock(String type, String id) throws LockException {
        checkAlreadyLocked(type, id);
        LockId lockId = new LockId(UUID.randomUUID().toString());
        locking(type, id, lockId);
        return lockId;
    }

    private void checkAlreadyLocked(String type, String id) {
        List<LockData> locks = jdbcTemplate.query(
                "select * from locks where type = ? and id = ?",
                lockDataRowMapper, type, id);
        Optional<LockData> lockData = handleExpiration(locks);
        if (lockData.isPresent()) throw new AlreadyLockedException();
    }

    private Optional<LockData> handleExpiration(List<LockData> locks) {
        if (locks.isEmpty()) return Optional.empty();
        LockData lockData = locks.get(0);
        if (lockData.isExpired()) {
            jdbcTemplate.update(
                    "delete from locks where type = ? and id = ?",
                    lockData.getType(), lockData.getId());
            return Optional.empty();
        } else {
            return Optional.of(lockData);
        }
    }

    private void locking(String type, String id, LockId lockId) {
        try {
            int updatedCount = jdbcTemplate.update(
                    "insert into locks values (?, ?, ?, ?)",
                    type, id, lockId.getValue(), new Timestamp(getExpirationTime()));
            if (updatedCount == 0) throw new LockingFailException();
        } catch (DuplicateKeyException e) {
            throw new LockingFailException(e);
        }
    }

    private long getExpirationTime() {
        return System.currentTimeMillis() + lockTimeout;
    }

    @Transactional(propagation = Propagation.REQUIRES_NEW)
    @Override
    public void checkLock(LockId lockId) throws LockException {
        Optional<LockData> lockData = getLockData(lockId);
        if (!lockData.isPresent()) throw new NoLockException();
    }

    private Optional<LockData> getLockData(LockId lockId) {
        List<LockData> locks = jdbcTemplate.query(
                "select * from locks where lockid = ?",
                lockDataRowMapper, lockId.getValue());
        return handleExpiration(locks);
    }

    @Transactional(propagation = Propagation.REQUIRES_NEW)
    @Override
    public void extendLockExpiration(LockId lockId, long inc) throws LockException {
        Optional<LockData> lockDataOpt = getLockData(lockId);
        LockData lockData =
                lockDataOpt.orElseThrow(() -> new NoLockException());
        jdbcTemplate.update(
                "update locks set expiration_time = ? where type = ? AND id = ?",
                new Timestamp(lockData.getTimestamp() + inc),
                lockData.getType(), lockData.getId());
    }

    @Transactional(propagation = Propagation.REQUIRES_NEW)
    @Override
    public void releaseLock(LockId lockId) throws LockException {
        jdbcTemplate.update("delete from locks where lockid = ?", lockId.getValue());
    }

    public void setLockTimeout(int lockTimeout) {
        this.lockTimeout = lockTimeout;
    }
}
  • tryLock()은 잠금을 획득하는 로직이다. 체크해야할 상황은 아래와 같다.
    • 타입과 ID에 대한 Lock이 있는지 조회한다. 만약 유효시간이 지나거나 존재하지 않는다면 Optional.empty()로 반환한다.
    • 존재할 경우는 예외 발생시킨다.
    • 존재하지 않는다면 lock 테이블에 데이터를 insert한다. 여기서도 처리된 sql 카운트가 0이거나 key가 중복된다면 예외를 발생시킨다.

@Transactional(propagation = Propagation.REQUIRES_NEW)

REQUIRES_NEW 옵션은 무조건 새로운 트랜잭션을 만든다. 기존에 진행중인 트랜잭션이 롤백되더라도 서로의 영향을 미치지 않는다. Lock 획득은 기능과 별개의 트랜잭션으로 설정하여 처리한다.

위로

반응형