[SQLALchemy] 트랜잭션(Transaction) 그리고 Database Lock
1. 동시성 문제 구현 (Concurrency Implements)
1) 테스트 테이블 정의
- fastapi와 sqlalchemy를 기반으로 한 talbe을 정의한다.
from fastapi import FastAPI, HTTPException, Depends, status
from sqlalchemy import Column, Integer, text, DateTime
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from sqlalchemy.sql import func
Base = declarative_base()
# 모델 정의
class Post(Base):
__tablename__ = "posts"
pk = Column(Integer, primary_key=True, index=True)
like = Column(Integer, server_default=text("'0'"))
modified_at = Column(
DateTime(timezone=True), default=func.now(), onupdate=func.now()
)
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.modified_at = func.now()
2) 기본 테스트 - like 증가
- /inc 엔드포인트를 이용해 기본적인 증가 테스트를 진행하면 25의 like가 발생하는 것을 확인할 수 있다.
def increase_like(session: Session):
for _ in range(25):
post = session.query(Post).filter(Post.pk == 1).first()
if post:
print(f"Before increment: like = {post.like}")
post.like += 1
print(f"After increment: like = {post.like}")
session.commit()
print(f"After commit: like = {post.like}")
# 기본값을 가지는 post의 row를 생성하는 함수
def create_default_post(session: Session):
# 이미 존재하는지 확인
post = session.query(Post).filter(Post.pk == 1).first()
if not post:
# 기본값으로 새로운 Post 레코드 생성
new_post = Post(like=0) # 예시로 pk가 1인 새로운 post 생성
session.add(new_post)
session.commit()
@core_router.get("/inc")
def increment_likes(session: Session = Depends(get_db)):
# post 테이블이 비어있으면 기본값을 가지는 post의 row 생성
create_default_post(session)
# like 증가 로직
increase_like(session)
post = session.query(Post).filter(Post.pk == 1).first()
result = post.like if post else 0
return {"message": "/inc : Concurrent increment completed", "like_result": result}

3) 쓰레드 테스트 - 세션의 동시 접근 문제 발생
- /inc 엔드포인트로 like를 초기화하고 /2th 엔드포인트를 실행한다.
- 하나의 세션으로 실행하면 에러가 발생한다. 두개의 db 세션을 사용하여 실행해야 한다.
- 2개의 쓰레드를 동작시키면 25 + 25 = 50개가 쌓여야 하지만, 실제 27개만 쌓인다.
@core_router.get("/2th")
def concurrent_increment(session: Session = Depends(get_db)):
create_default_post(session)
t1 = threading.Thread(target=increase_like, args=(next(get_db()),))
t2 = threading.Thread(target=increase_like, args=(next(get_db()),))
t1.start()
t2.start()
t1.join()
t2.join()
post = session.query(Post).filter(Post.pk == 1).first()
result = post.like if post else 0
return {
"message": "/2th : Concurrent increment completed",
"like_result": result,
}

2. 비관적 / 낙관적 잠금(Pessimistic / Optimistic Lock)
1) Pessimistic Lock, 비관적 락
- /inc 엔드포인트로 like를 초기화하고 /2th-plock 엔드포인트를 실행한다.
- 2개의 쓰레드를 동작시키면 25 + 25 = 50개가 쌓인다.
def increase_like_with_pessimistic_lock(session: Session):
thread_id = threading.get_ident() # 현재 스레드 ID
logger = setup_logger(thread_id) # 스레드별 로거 설정
session_id = id(session)
cnt = 0
for _ in range(25):
try:
cnt = cnt + 1
post = session.query(Post).filter(Post.pk == 1).with_for_update().first()
if post:
logger.info(
f"Thread {thread_id} (Session ID: {session_id}) acquired lock. Before ---- increment: like = {post.like}"
)
post.like += 1
session.commit()
logger.info(
f"Thread {thread_id} (Session ID: {session_id}) released lock. After **** commit: like = {post.like}"
)
session.close()
except Exception as e:
logger.info(f"Thread {thread_id} encountered an error: {e}")
session.rollback()
logger.info(
f"Thread {thread_id} (Session ID: {session_id}) end of thread cnt = {cnt}"
)
@core_router.get("/2th-plock")
def concurrent_increment_pessimistic_lock(session: Session = Depends(get_db)):
create_default_post(session)
t1 = threading.Thread(
target=increase_like_with_pessimistic_lock, args=(next(get_db()),)
)
t2 = threading.Thread(
target=increase_like_with_pessimistic_lock, args=(next(get_db()),)
)
t1.start()
t2.start()
t1.join()
t2.join()
post = session.query(Post).filter(Post.pk == 1).first()
result = post.like if post else 0
return {
"message": "/2th-plock : pessimistic lock increment completed",
"like_result": result,
}
}
2) version을 명시하는 Optimistic Lock
- 테이블에 version 필드를 추가한다.
class Post(Base):
__tablename__ = "posts"
pk = Column(Integer, primary_key=True, index=True)
like = Column(Integer, server_default=text("'0'"))
version = Column(INTEGER(unsigned=True), server_default=text("'0'"), nullable=False)
modified_at = Column(
DateTime(timezone=True), default=func.now(), onupdate=func.now()
)
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.modified_at = func.now()
engine = create_engine(
settings.DATABASE_URL,
pool_pre_ping=True, # 연결 상태 확인
pool_size=5, # 커넥션 풀 크기
max_overflow=10, # 최대 초과 커넥션
echo=settings.SQL_DEBUG, # SQL 로깅
)
...
SELECT posts.pk AS posts_pk, posts."like" AS posts_like, posts.version
AS posts_version, posts.modified_at AS posts_modified_at
FROM posts
WHERE posts.pk = %(pk_1)s
LIMIT %(param_1)s
UPDATE posts SET "like"=%(like)s, version=%(version)s,
modified_at=now() WHERE posts.pk = %(pk_1)s AND posts.version = %(version_1)s
...
@core_router.get("/2th-olock")
def increment_likes_optimistic_lock(session: Session = Depends(get_db)):
# post 테이블이 비어있으면 기본값을 가지는 post의 row 생성
create_default_post(session)
t1 = threading.Thread(
target=increase_like_by_optimistic_lock, args=(next(get_db()),)
)
t2 = threading.Thread(
target=increase_like_by_optimistic_lock, args=(next(get_db()),)
)
t1.start()
t2.start()
t1.join()
t2.join()
post = session.query(Post).filter(Post.pk == 1).first()
result = post.like if post else 0
return {
"message": "/2th-olock : optimistic lock increment completed",
"like_result": result,
}
def increase_like_by_optimistic_lock(session: Session):
thread_id = threading.get_ident() # 현재 스레드 ID
logger = setup_logger(thread_id) # 스레드별 로거 설정
session_id = id(session)
cnt = 0
range_cnt = 25
while cnt < range_cnt:
cnt = cnt + 1
post = session.query(Post).filter(Post.pk == 1).first()
logger.info(
f"Thread {thread_id} (Session ID: {session_id}) acquired lock. Before ---- increment: like = {post.like}"
)
result = (
session.query(Post)
.filter(Post.pk == post.pk, Post.version == post.version)
.update({Post.like: post.like + 1, Post.version: post.version + 1})
)
if result == 0:
logger.error(
f"Thread {thread_id} (Session ID: {session_id}) failed to update: version mismatch."
)
session.rollback() # 롤백
cnt -= 1
continue
updated_post = session.query(Post).filter(Post.pk == post.pk).first()
if not bool(result):
session.rollback()
else:
session.commit()
logger.info(
f"Thread {thread_id} (Session ID: {session_id}) released lock. After **** commit: like = {updated_post.like}"
)
logger.info(
f"Thread {thread_id} (Session ID: {session_id}) end of thread cnt = {cnt}"
)
session.close()
3) 문제 확인 및 보완
- sqlalchemy에서 제공하는 mapper_args를 사용하면 쉽게 버전 필드 관리를 할 수 있다.
def custom_version_generator(current_version):
return current_version + 1 # 현재 버전에서 1을 증가시킴
class Post(Base):
__tablename__ = "posts"
pk = Column(Integer, primary_key=True, index=True)
like = Column(Integer, server_default=text("'0'"))
version = Column(INTEGER(unsigned=True), server_default=text("'0'"), nullable=False)
modified_at = Column(
DateTime(timezone=True), default=func.now(), onupdate=func.now()
)
# 심플 설정 방법
# __mapper_args__ = {"version_id_col": version}
__mapper_args__ = {
"version_id_col": version,
"version_id_generator": custom_version_generator, # 커스텀 버전 증가 함수 설정
}
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.modified_at = func.now()
- 테스트 결과 버전 관리가 되지만, 제대로 lock 관리가 되지 않는다. 정확한 핵심 원인 분석은 못했다.
- 검색과 gpt에서 제안하는 방법들으로 답을 얻지 못했다. 어떤 방법으로도 version을 가져오면 lock을 거는 방법을 확인할 수 없었다. 물론, row query를 이용해 db lock을 걸 수 있지만, fastapi 혹은 sqlalchemy의 기능만으로 가능할 수 있을까란 생각에 해답을 얻을 수 없었다.
- wait_for_update와 version을 함께 쓰면 상호 보완을 할 수 있을것이다. 하지만 version만으로는 어플리케이션과 라이브러리단에서 제공하는 기술로는 해결 방법을 찾지 못했다.
def increase_like_by_optimistic_lock_sqlalchemy_versioning(session: Session):
thread_id = threading.get_ident() # 현재 스레드 ID
logger = setup_logger(thread_id) # 스레드별 로거 설정
session_id = id(session)
cnt = 0
range_cnt = 25 # 반복할 횟수
while cnt < range_cnt:
cnt += 1
try:
# 현재 Post 객체를 가져오고 Pessimistic Locking을 적용
# post = session.query(Post).filter(Post.pk == 1).with_for_update().one()
post = session.query(Post).filter(Post.pk == 1).one()
logger.info(
f"Thread {thread_id} (Session ID: {session_id}) acquired lock. Before ---- increment: like = {post.like}, version = {post.version}"
)
# 현재 버전 저장
current_version = post.version
# like 수 증가
post.like += 1
session.commit() # 변경 사항 커밋
# 커밋 후 다시 Post 객체를 가져와서 버전 확인
updated_post = session.query(Post).filter(Post.pk == 1).one()
if updated_post.version != current_version + 1:
logger.error(
f"Thread {thread_id} (Session ID: {session_id}) failed to update: version mismatch."
)
session.rollback() # 롤백
continue # 다음 반복으로 넘어감
logger.info(
f"Thread {thread_id} (Session ID: {session_id}) released lock. After **** commit: like = {updated_post.like}, version = {updated_post.version}"
)
logger.info(
f"Thread {thread_id} (Session ID: {session_id}) end of thread cnt = {cnt}"
)
except Exception as e:
logger.error(
f"Thread {thread_id} (Session ID: {session_id}) encountered an error: {e}"
)
session.rollback() # 오류 발생 시 롤백
finally:
session.close() # 세션 닫기
4) Optimistic Lock - retry
- backoff 라이브러리가 있지만 현재 3.12 버전에서는 동작하지 않는다.
- 상위 코드의 result를 확인하여 처리하는 것만으로도 제대로 된 동작을 하는 것으로 확인된다.
- 대안으로 tenacity를 사용하는 것을 추천한다.
5) 추가 내용 정리
- fastapi와 sqlalchemy를 사용하며 적용 가능한 lock에 대한 정보를 많이 살펴봤다. sqlalchemy의 map과 관련되어 게시된 대부분의 blog/web 문서들의 내용은 폐기된 모듈인 경우가 많았고, fastapi에서 충돌 등으로 제대로 동작을 안하는 경우가 많았다. 제일 안전한 것은 DB에 직접 lock 제한을 설정하는 것이고, 이 외에는 select_for_update가 되고 이후 version을 이용한 방법이 될것 같다.
- sqlalchemy의 version 스키마 테이블을 별도로 만들어 alembic에 적용하여 사용하는 방법이 있었으나 사용하기에 복잡도가 있어 사용할 생각이 별로 들지 않았다.
- reference의 내용이 적합하지 않아 직접 테스트하여 fastapi에 적합한 구조로 변경, 테스트 하였다.
- 저장소 : https://github.com/bluebamus/fastapi-sqlalchemy-concurrency
- reference :
https://jakpentest.tistory.com/entry/SQLAlchemy-PessimisticOptimistic-Lock
[SQLAlchemy] Pessimistic/Optimistic Lock
HTML 삽입 미리보기할 수 없는 소스 개요 웹 서버는 여러 Client의 요청을 동시에 수행할 수 있어야 한다. 한 자원을 놓고 동시에 요청이 실행한다면 어떤 문제가 발생할 수 있을까? 위와 같은 주제
jakpentest.tistory.com
https://docs.sqlalchemy.org/en/20/orm/session_transaction.html
Transactions and Connection Management — SQLAlchemy 2.0 Documentation
Transactions and Connection Management Managing Transactions Changed in version 1.4: Session transaction management has been revised to be clearer and easier to use. In particular, it now features “autobegin” operation, which means the point at which a
docs.sqlalchemy.org
https://docs.sqlalchemy.org/en/20/orm/versioning.html
Configuring a Version Counter — SQLAlchemy 2.0 Documentation
Configuring a Version Counter The Mapper supports management of a version id column, which is a single table column that increments or otherwise updates its value each time an UPDATE to the mapped table occurs. This value is checked each time the ORM emits
docs.sqlalchemy.org