SQLAlchemy

[SQLALchemy] 트랜잭션(Transaction) 그리고 Database Lock

bluebamus 2025. 2. 4.

1. 동시성 문제 구현 (Concurrency Implements)

   1) 테스트 테이블 정의

      - fastapi와 sqlalchemy를 기반으로 한 talbe을 정의한다.

from fastapi import FastAPI, HTTPException, Depends, status
from sqlalchemy import Column, Integer, text, DateTime
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from sqlalchemy.sql import func


Base = declarative_base()


# 모델 정의
class Post(Base):
    __tablename__ = "posts"

    pk = Column(Integer, primary_key=True, index=True)
    like = Column(Integer, server_default=text("'0'"))
    modified_at = Column(
        DateTime(timezone=True), default=func.now(), onupdate=func.now()
    )

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.modified_at = func.now()

 

   2) 기본 테스트 - like 증가

      - /inc 엔드포인트를 이용해 기본적인 증가 테스트를 진행하면 25의 like가 발생하는 것을 확인할 수 있다.

def increase_like(session: Session):
    for _ in range(25):
        post = session.query(Post).filter(Post.pk == 1).first()
        if post:
            print(f"Before increment: like = {post.like}")
            post.like += 1
            print(f"After increment: like = {post.like}")
            session.commit()
            print(f"After commit: like = {post.like}")


# 기본값을 가지는 post의 row를 생성하는 함수
def create_default_post(session: Session):
    # 이미 존재하는지 확인
    post = session.query(Post).filter(Post.pk == 1).first()
    if not post:
        # 기본값으로 새로운 Post 레코드 생성
        new_post = Post(like=0)  # 예시로 pk가 1인 새로운 post 생성
        session.add(new_post)
        session.commit()


@core_router.get("/inc")
def increment_likes(session: Session = Depends(get_db)):
    # post 테이블이 비어있으면 기본값을 가지는 post의 row 생성
    create_default_post(session)

    # like 증가 로직
    increase_like(session)

    post = session.query(Post).filter(Post.pk == 1).first()
    result = post.like if post else 0

    return {"message": "/inc : Concurrent increment completed", "like_result": result}

 

    3) 쓰레드 테스트 - 세션의 동시 접근 문제 발생

      - /inc 엔드포인트로 like를 초기화하고 /2th 엔드포인트를 실행한다.

      - 하나의 세션으로 실행하면 에러가 발생한다. 두개의 db 세션을 사용하여 실행해야 한다.

      - 2개의 쓰레드를 동작시키면 25 + 25 = 50개가 쌓여야 하지만, 실제 27개만 쌓인다.

@core_router.get("/2th")
def concurrent_increment(session: Session = Depends(get_db)):

    create_default_post(session)

    t1 = threading.Thread(target=increase_like, args=(next(get_db()),))
    t2 = threading.Thread(target=increase_like, args=(next(get_db()),))

    t1.start()
    t2.start()

    t1.join()
    t2.join()

    post = session.query(Post).filter(Post.pk == 1).first()
    result = post.like if post else 0

    return {
        "message": "/2th : Concurrent increment completed",
        "like_result": result,
    }

 

2. 비관적 / 낙관적 잠금(Pessimistic / Optimistic Lock)

   1) Pessimistic Lock, 비관적 락

      - /inc 엔드포인트로 like를 초기화하고 /2th-plock 엔드포인트를 실행한다.

      - 2개의 쓰레드를 동작시키면 25 + 25 = 50개가 쌓인다.

def increase_like_with_pessimistic_lock(session: Session):
    thread_id = threading.get_ident()  # 현재 스레드 ID
    logger = setup_logger(thread_id)  # 스레드별 로거 설정
    session_id = id(session)
    cnt = 0
    for _ in range(25):
        try:
            cnt = cnt + 1
            post = session.query(Post).filter(Post.pk == 1).with_for_update().first()

            if post:
                logger.info(
                    f"Thread {thread_id} (Session ID: {session_id}) acquired lock. Before ---- increment: like = {post.like}"
                )
                post.like += 1
                session.commit()
                logger.info(
                    f"Thread {thread_id} (Session ID: {session_id}) released lock. After **** commit: like = {post.like}"
                )
                session.close()

        except Exception as e:
            logger.info(f"Thread {thread_id} encountered an error: {e}")
            session.rollback()
    logger.info(
        f"Thread {thread_id} (Session ID: {session_id}) end of thread cnt = {cnt}"
    )
    

@core_router.get("/2th-plock")
def concurrent_increment_pessimistic_lock(session: Session = Depends(get_db)):

    create_default_post(session)

    t1 = threading.Thread(
        target=increase_like_with_pessimistic_lock, args=(next(get_db()),)
    )
    t2 = threading.Thread(
        target=increase_like_with_pessimistic_lock, args=(next(get_db()),)
    )
    t1.start()
    t2.start()

    t1.join()
    t2.join()

    post = session.query(Post).filter(Post.pk == 1).first()
    result = post.like if post else 0

    return {
        "message": "/2th-plock : pessimistic lock increment completed",
        "like_result": result,
    }
    }

 

   2) version을 명시하는 Optimistic Lock

      - 테이블에 version 필드를 추가한다.

class Post(Base):
    __tablename__ = "posts"

    pk = Column(Integer, primary_key=True, index=True)
    like = Column(Integer, server_default=text("'0'"))
    version = Column(INTEGER(unsigned=True), server_default=text("'0'"), nullable=False)
    modified_at = Column(
        DateTime(timezone=True), default=func.now(), onupdate=func.now()
    )

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.modified_at = func.now()
engine = create_engine(
    settings.DATABASE_URL,
    pool_pre_ping=True,  # 연결 상태 확인
    pool_size=5,  # 커넥션 풀 크기
    max_overflow=10,  # 최대 초과 커넥션
    echo=settings.SQL_DEBUG,  # SQL 로깅
)
...
SELECT posts.pk AS posts_pk, posts."like" AS posts_like, posts.version 
AS posts_version, posts.modified_at AS posts_modified_at 
FROM posts 
WHERE posts.pk = %(pk_1)s 
 LIMIT %(param_1)s
UPDATE posts SET "like"=%(like)s, version=%(version)s, 
modified_at=now() WHERE posts.pk = %(pk_1)s AND posts.version = %(version_1)s
...
@core_router.get("/2th-olock")
def increment_likes_optimistic_lock(session: Session = Depends(get_db)):
    # post 테이블이 비어있으면 기본값을 가지는 post의 row 생성
    create_default_post(session)

    t1 = threading.Thread(
        target=increase_like_by_optimistic_lock, args=(next(get_db()),)
    )
    t2 = threading.Thread(
        target=increase_like_by_optimistic_lock, args=(next(get_db()),)
    )
    t1.start()
    t2.start()

    t1.join()
    t2.join()

    post = session.query(Post).filter(Post.pk == 1).first()
    result = post.like if post else 0

    return {
        "message": "/2th-olock : optimistic lock increment completed",
        "like_result": result,
    }
def increase_like_by_optimistic_lock(session: Session):
    thread_id = threading.get_ident()  # 현재 스레드 ID
    logger = setup_logger(thread_id)  # 스레드별 로거 설정
    session_id = id(session)
    cnt = 0
    range_cnt = 25
    while cnt < range_cnt:
        cnt = cnt + 1
        post = session.query(Post).filter(Post.pk == 1).first()
        logger.info(
            f"Thread {thread_id} (Session ID: {session_id}) acquired lock. Before ---- increment: like = {post.like}"
        )
        result = (
            session.query(Post)
            .filter(Post.pk == post.pk, Post.version == post.version)
            .update({Post.like: post.like + 1, Post.version: post.version + 1})
        )

        if result == 0:
            logger.error(
                f"Thread {thread_id} (Session ID: {session_id}) failed to update: version mismatch."
            )
            session.rollback()  # 롤백
            cnt -= 1
            continue

        updated_post = session.query(Post).filter(Post.pk == post.pk).first()

        if not bool(result):
            session.rollback()
        else:
            session.commit()
            logger.info(
                f"Thread {thread_id} (Session ID: {session_id}) released lock. After **** commit: like = {updated_post.like}"
            )
            logger.info(
                f"Thread {thread_id} (Session ID: {session_id}) end of thread cnt = {cnt}"
            )
            session.close()

 

   3) 문제 확인 및 보완

      - sqlalchemy에서 제공하는 mapper_args를 사용하면 쉽게 버전 필드 관리를 할 수 있다.

def custom_version_generator(current_version):
    return current_version + 1  # 현재 버전에서 1을 증가시킴


class Post(Base):
    __tablename__ = "posts"

    pk = Column(Integer, primary_key=True, index=True)
    like = Column(Integer, server_default=text("'0'"))
    version = Column(INTEGER(unsigned=True), server_default=text("'0'"), nullable=False)
    modified_at = Column(
        DateTime(timezone=True), default=func.now(), onupdate=func.now()
    )

    # 심플 설정 방법
    # __mapper_args__ = {"version_id_col": version}

    __mapper_args__ = {
        "version_id_col": version,
        "version_id_generator": custom_version_generator,  # 커스텀 버전 증가 함수 설정
    }

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.modified_at = func.now()

 

      - 테스트 결과 버전 관리가 되지만, 제대로 lock 관리가 되지 않는다. 정확한 핵심 원인 분석은 못했다.

 

      - 검색과 gpt에서 제안하는 방법들으로 답을 얻지 못했다. 어떤 방법으로도 version을 가져오면 lock을 거는 방법을 확인할 수 없었다. 물론, row query를 이용해 db lock을 걸 수 있지만, fastapi 혹은 sqlalchemy의 기능만으로 가능할 수 있을까란 생각에 해답을 얻을 수 없었다.

 

      - wait_for_update와 version을 함께 쓰면 상호 보완을 할 수 있을것이다. 하지만 version만으로는 어플리케이션과 라이브러리단에서 제공하는 기술로는 해결 방법을 찾지 못했다.

def increase_like_by_optimistic_lock_sqlalchemy_versioning(session: Session):
    thread_id = threading.get_ident()  # 현재 스레드 ID
    logger = setup_logger(thread_id)  # 스레드별 로거 설정
    session_id = id(session)
    cnt = 0
    range_cnt = 25  # 반복할 횟수

    while cnt < range_cnt:
        cnt += 1
        try:
            # 현재 Post 객체를 가져오고 Pessimistic Locking을 적용
            # post = session.query(Post).filter(Post.pk == 1).with_for_update().one()
            post = session.query(Post).filter(Post.pk == 1).one()
            logger.info(
                f"Thread {thread_id} (Session ID: {session_id}) acquired lock. Before ---- increment: like = {post.like}, version = {post.version}"
            )

            # 현재 버전 저장
            current_version = post.version

            # like 수 증가
            post.like += 1
            session.commit()  # 변경 사항 커밋

            # 커밋 후 다시 Post 객체를 가져와서 버전 확인
            updated_post = session.query(Post).filter(Post.pk == 1).one()

            if updated_post.version != current_version + 1:
                logger.error(
                    f"Thread {thread_id} (Session ID: {session_id}) failed to update: version mismatch."
                )
                session.rollback()  # 롤백
                continue  # 다음 반복으로 넘어감

            logger.info(
                f"Thread {thread_id} (Session ID: {session_id}) released lock. After **** commit: like = {updated_post.like}, version = {updated_post.version}"
            )
            logger.info(
                f"Thread {thread_id} (Session ID: {session_id}) end of thread cnt = {cnt}"
            )
        except Exception as e:
            logger.error(
                f"Thread {thread_id} (Session ID: {session_id}) encountered an error: {e}"
            )
            session.rollback()  # 오류 발생 시 롤백
        finally:
            session.close()  # 세션 닫기

 

   4) Optimistic Lock - retry

      - backoff 라이브러리가 있지만 현재 3.12 버전에서는 동작하지 않는다.

      - 상위 코드의 result를 확인하여 처리하는 것만으로도 제대로 된 동작을 하는 것으로 확인된다.

      - 대안으로 tenacity를 사용하는 것을 추천한다.

 

   5) 추가 내용 정리

      - fastapi와 sqlalchemy를 사용하며 적용 가능한 lock에 대한 정보를 많이 살펴봤다. sqlalchemy의 map과 관련되어 게시된 대부분의 blog/web 문서들의 내용은 폐기된 모듈인 경우가 많았고, fastapi에서 충돌 등으로 제대로 동작을 안하는 경우가 많았다. 제일 안전한 것은 DB에 직접 lock 제한을 설정하는 것이고, 이 외에는 select_for_update가 되고 이후 version을 이용한 방법이 될것 같다. 

      - sqlalchemy의 version 스키마 테이블을 별도로 만들어 alembic에 적용하여 사용하는 방법이 있었으나 사용하기에 복잡도가 있어 사용할 생각이 별로 들지 않았다.

 

 - reference의 내용이 적합하지 않아 직접 테스트하여 fastapi에 적합한 구조로 변경, 테스트 하였다.

   - 저장소 : https://github.com/bluebamus/fastapi-sqlalchemy-concurrency

 

 

- reference : 

https://jakpentest.tistory.com/entry/SQLAlchemy-PessimisticOptimistic-Lock

 

[SQLAlchemy] Pessimistic/Optimistic Lock

HTML 삽입 미리보기할 수 없는 소스 개요 웹 서버는 여러 Client의 요청을 동시에 수행할 수 있어야 한다. 한 자원을 놓고 동시에 요청이 실행한다면 어떤 문제가 발생할 수 있을까? 위와 같은 주제

jakpentest.tistory.com

https://docs.sqlalchemy.org/en/20/orm/session_transaction.html

 

Transactions and Connection Management — SQLAlchemy 2.0 Documentation

Transactions and Connection Management Managing Transactions Changed in version 1.4: Session transaction management has been revised to be clearer and easier to use. In particular, it now features “autobegin” operation, which means the point at which a

docs.sqlalchemy.org

https://docs.sqlalchemy.org/en/20/orm/versioning.html

 

Configuring a Version Counter — SQLAlchemy 2.0 Documentation

Configuring a Version Counter The Mapper supports management of a version id column, which is a single table column that increments or otherwise updates its value each time an UPDATE to the mapped table occurs. This value is checked each time the ORM emits

docs.sqlalchemy.org

 

댓글