[SQLALchemy] 트랜잭션(Transaction) 그리고 Database Lock
1. 동시성 문제 구현 (Concurrency Implements)
1) 테스트 테이블 정의
- fastapi와 sqlalchemy를 기반으로 한 talbe을 정의한다.
from fastapi import FastAPI, HTTPException, Depends, status
from sqlalchemy import Column, Integer, text, DateTime
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from sqlalchemy.sql import func
Base = declarative_base()
# 모델 정의
class Post(Base):
__tablename__ = "posts"
pk = Column(Integer, primary_key=True, index=True)
like = Column(Integer, server_default=text("'0'"))
modified_at = Column(
DateTime(timezone=True), default=func.now(), onupdate=func.now()
)
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.modified_at = func.now()
2) 기본 테스트 - like 증가
- /inc 엔드포인트를 이용해 기본적인 증가 테스트를 진행하면 25의 like가 발생하는 것을 확인할 수 있다.
def increase_like(session: Session):
for _ in range(25):
post = session.query(Post).filter(Post.pk == 1).first()
if post:
print(f"Before increment: like = {post.like}")
post.like += 1
print(f"After increment: like = {post.like}")
session.commit()
print(f"After commit: like = {post.like}")
# 기본값을 가지는 post의 row를 생성하는 함수
def create_default_post(session: Session):
# 이미 존재하는지 확인
post = session.query(Post).filter(Post.pk == 1).first()
if not post:
# 기본값으로 새로운 Post 레코드 생성
new_post = Post(like=0) # 예시로 pk가 1인 새로운 post 생성
session.add(new_post)
session.commit()
@core_router.get("/inc")
def increment_likes(session: Session = Depends(get_db)):
# post 테이블이 비어있으면 기본값을 가지는 post의 row 생성
create_default_post(session)
# like 증가 로직
increase_like(session)
post = session.query(Post).filter(Post.pk == 1).first()
result = post.like if post else 0
return {"message": "/inc : Concurrent increment completed", "like_result": result}
3) 쓰레드 테스트 - 세션의 동시 접근 문제 발생
- /inc 엔드포인트로 like를 초기화하고 /2th 엔드포인트를 실행한다.
- 하나의 세션으로 실행하면 에러가 발생한다. 두개의 db 세션을 사용하여 실행해야 한다.
- 2개의 쓰레드를 동작시키면 25 + 25 = 50개가 쌓여야 하지만, 실제 27개만 쌓인다.
@core_router.get("/2th")
def concurrent_increment(session: Session = Depends(get_db)):
create_default_post(session)
t1 = threading.Thread(target=increase_like, args=(next(get_db()),))
t2 = threading.Thread(target=increase_like, args=(next(get_db()),))
t1.start()
t2.start()
t1.join()
t2.join()
post = session.query(Post).filter(Post.pk == 1).first()
result = post.like if post else 0
return {
"message": "/2th : Concurrent increment completed",
"like_result": result,
}
2. 비관적 / 낙관적 잠금(Pessimistic / Optimistic Lock)
1) Pessimistic Lock, 비관적 락
- /inc 엔드포인트로 like를 초기화하고 /2th-plock 엔드포인트를 실행한다.
- 2개의 쓰레드를 동작시키면 25 + 25 = 50개가 쌓인다.
def increase_like_with_pessimistic_lock(session: Session):
thread_id = threading.get_ident() # 현재 스레드 ID
logger = setup_logger(thread_id) # 스레드별 로거 설정
session_id = id(session)
cnt = 0
for _ in range(25):
try:
cnt = cnt + 1
post = session.query(Post).filter(Post.pk == 1).with_for_update().first()
if post:
logger.info(
f"Thread {thread_id} (Session ID: {session_id}) acquired lock. Before ---- increment: like = {post.like}"
)
post.like += 1
session.commit()
logger.info(
f"Thread {thread_id} (Session ID: {session_id}) released lock. After **** commit: like = {post.like}"
)
session.close()
except Exception as e:
logger.info(f"Thread {thread_id} encountered an error: {e}")
session.rollback()
logger.info(
f"Thread {thread_id} (Session ID: {session_id}) end of thread cnt = {cnt}"
)
@core_router.get("/2th-plock")
def concurrent_increment_pessimistic_lock(session: Session = Depends(get_db)):
create_default_post(session)
t1 = threading.Thread(
target=increase_like_with_pessimistic_lock, args=(next(get_db()),)
)
t2 = threading.Thread(
target=increase_like_with_pessimistic_lock, args=(next(get_db()),)
)
t1.start()
t2.start()
t1.join()
t2.join()
post = session.query(Post).filter(Post.pk == 1).first()
result = post.like if post else 0
return {
"message": "/2th-plock : pessimistic lock increment completed",
"like_result": result,
}
}
2) version을 명시하는 Optimistic Lock
- 테이블에 version 필드를 추가한다.
class Post(Base):
__tablename__ = "posts"
pk = Column(Integer, primary_key=True, index=True)
like = Column(Integer, server_default=text("'0'"))
version = Column(INTEGER(unsigned=True), server_default=text("'0'"), nullable=False)
modified_at = Column(
DateTime(timezone=True), default=func.now(), onupdate=func.now()
)
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.modified_at = func.now()
engine = create_engine(
settings.DATABASE_URL,
pool_pre_ping=True, # 연결 상태 확인
pool_size=5, # 커넥션 풀 크기
max_overflow=10, # 최대 초과 커넥션
echo=settings.SQL_DEBUG, # SQL 로깅
)
...
SELECT posts.pk AS posts_pk, posts."like" AS posts_like, posts.version
AS posts_version, posts.modified_at AS posts_modified_at
FROM posts
WHERE posts.pk = %(pk_1)s
LIMIT %(param_1)s
UPDATE posts SET "like"=%(like)s, version=%(version)s,
modified_at=now() WHERE posts.pk = %(pk_1)s AND posts.version = %(version_1)s
...
@core_router.get("/2th-olock")
def increment_likes_optimistic_lock(session: Session = Depends(get_db)):
# post 테이블이 비어있으면 기본값을 가지는 post의 row 생성
create_default_post(session)
t1 = threading.Thread(
target=increase_like_by_optimistic_lock, args=(next(get_db()),)
)
t2 = threading.Thread(
target=increase_like_by_optimistic_lock, args=(next(get_db()),)
)
t1.start()
t2.start()
t1.join()
t2.join()
post = session.query(Post).filter(Post.pk == 1).first()
result = post.like if post else 0
return {
"message": "/2th-olock : optimistic lock increment completed",
"like_result": result,
}
def increase_like_by_optimistic_lock(session: Session):
thread_id = threading.get_ident() # 현재 스레드 ID
logger = setup_logger(thread_id) # 스레드별 로거 설정
session_id = id(session)
cnt = 0
range_cnt = 25
while cnt < range_cnt:
cnt = cnt + 1
post = session.query(Post).filter(Post.pk == 1).first()
logger.info(
f"Thread {thread_id} (Session ID: {session_id}) acquired lock. Before ---- increment: like = {post.like}"
)
result = (
session.query(Post)
.filter(Post.pk == post.pk, Post.version == post.version)
.update({Post.like: post.like + 1, Post.version: post.version + 1})
)
if result == 0:
logger.error(
f"Thread {thread_id} (Session ID: {session_id}) failed to update: version mismatch."
)
session.rollback() # 롤백
cnt -= 1
continue
updated_post = session.query(Post).filter(Post.pk == post.pk).first()
if not bool(result):
session.rollback()
else:
session.commit()
logger.info(
f"Thread {thread_id} (Session ID: {session_id}) released lock. After **** commit: like = {updated_post.like}"
)
logger.info(
f"Thread {thread_id} (Session ID: {session_id}) end of thread cnt = {cnt}"
)
session.close()
3) 문제 확인 및 보완
- sqlalchemy에서 제공하는 mapper_args를 사용하면 쉽게 버전 필드 관리를 할 수 있다.
def custom_version_generator(current_version):
return current_version + 1 # 현재 버전에서 1을 증가시킴
class Post(Base):
__tablename__ = "posts"
pk = Column(Integer, primary_key=True, index=True)
like = Column(Integer, server_default=text("'0'"))
version = Column(INTEGER(unsigned=True), server_default=text("'0'"), nullable=False)
modified_at = Column(
DateTime(timezone=True), default=func.now(), onupdate=func.now()
)
# 심플 설정 방법
# __mapper_args__ = {"version_id_col": version}
__mapper_args__ = {
"version_id_col": version,
"version_id_generator": custom_version_generator, # 커스텀 버전 증가 함수 설정
}
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.modified_at = func.now()
- 테스트 결과 버전 관리가 되지만, 제대로 lock 관리가 되지 않는다. 정확한 핵심 원인 분석은 못했다.
- 검색과 gpt에서 제안하는 방법들으로 답을 얻지 못했다. 어떤 방법으로도 version을 가져오면 lock을 거는 방법을 확인할 수 없었다. 물론, row query를 이용해 db lock을 걸 수 있지만, fastapi 혹은 sqlalchemy의 기능만으로 가능할 수 있을까란 생각에 해답을 얻을 수 없었다.
- wait_for_update와 version을 함께 쓰면 상호 보완을 할 수 있을것이다. 하지만 version만으로는 어플리케이션과 라이브러리단에서 제공하는 기술로는 해결 방법을 찾지 못했다.
def increase_like_by_optimistic_lock_sqlalchemy_versioning(session: Session):
thread_id = threading.get_ident() # 현재 스레드 ID
logger = setup_logger(thread_id) # 스레드별 로거 설정
session_id = id(session)
cnt = 0
range_cnt = 25 # 반복할 횟수
while cnt < range_cnt:
cnt += 1
try:
# 현재 Post 객체를 가져오고 Pessimistic Locking을 적용
# post = session.query(Post).filter(Post.pk == 1).with_for_update().one()
post = session.query(Post).filter(Post.pk == 1).one()
logger.info(
f"Thread {thread_id} (Session ID: {session_id}) acquired lock. Before ---- increment: like = {post.like}, version = {post.version}"
)
# 현재 버전 저장
current_version = post.version
# like 수 증가
post.like += 1
session.commit() # 변경 사항 커밋
# 커밋 후 다시 Post 객체를 가져와서 버전 확인
updated_post = session.query(Post).filter(Post.pk == 1).one()
if updated_post.version != current_version + 1:
logger.error(
f"Thread {thread_id} (Session ID: {session_id}) failed to update: version mismatch."
)
session.rollback() # 롤백
continue # 다음 반복으로 넘어감
logger.info(
f"Thread {thread_id} (Session ID: {session_id}) released lock. After **** commit: like = {updated_post.like}, version = {updated_post.version}"
)
logger.info(
f"Thread {thread_id} (Session ID: {session_id}) end of thread cnt = {cnt}"
)
except Exception as e:
logger.error(
f"Thread {thread_id} (Session ID: {session_id}) encountered an error: {e}"
)
session.rollback() # 오류 발생 시 롤백
finally:
session.close() # 세션 닫기
4) Optimistic Lock - retry
- backoff 라이브러리가 있지만 현재 3.12 버전에서는 동작하지 않는다.
- 상위 코드의 result를 확인하여 처리하는 것만으로도 제대로 된 동작을 하는 것으로 확인된다.
- 대안으로 tenacity를 사용하는 것을 추천한다.
5) 추가 내용 정리
- fastapi와 sqlalchemy를 사용하며 적용 가능한 lock에 대한 정보를 많이 살펴봤다. sqlalchemy의 map과 관련되어 게시된 대부분의 blog/web 문서들의 내용은 폐기된 모듈인 경우가 많았고, fastapi에서 충돌 등으로 제대로 동작을 안하는 경우가 많았다. 제일 안전한 것은 DB에 직접 lock 제한을 설정하는 것이고, 이 외에는 select_for_update가 되고 이후 version을 이용한 방법이 될것 같다.
- sqlalchemy의 version 스키마 테이블을 별도로 만들어 alembic에 적용하여 사용하는 방법이 있었으나 사용하기에 복잡도가 있어 사용할 생각이 별로 들지 않았다.
- reference의 내용이 적합하지 않아 직접 테스트하여 fastapi에 적합한 구조로 변경, 테스트 하였다.
- 저장소 : https://github.com/bluebamus/fastapi-sqlalchemy-concurrency
- reference :
https://jakpentest.tistory.com/entry/SQLAlchemy-PessimisticOptimistic-Lock
[SQLAlchemy] Pessimistic/Optimistic Lock
HTML 삽입 미리보기할 수 없는 소스 개요 웹 서버는 여러 Client의 요청을 동시에 수행할 수 있어야 한다. 한 자원을 놓고 동시에 요청이 실행한다면 어떤 문제가 발생할 수 있을까? 위와 같은 주제
jakpentest.tistory.com
https://docs.sqlalchemy.org/en/20/orm/session_transaction.html
Transactions and Connection Management — SQLAlchemy 2.0 Documentation
Transactions and Connection Management Managing Transactions Changed in version 1.4: Session transaction management has been revised to be clearer and easier to use. In particular, it now features “autobegin” operation, which means the point at which a
docs.sqlalchemy.org
https://docs.sqlalchemy.org/en/20/orm/versioning.html
Configuring a Version Counter — SQLAlchemy 2.0 Documentation
Configuring a Version Counter The Mapper supports management of a version id column, which is a single table column that increments or otherwise updates its value each time an UPDATE to the mapped table occurs. This value is checked each time the ORM emits
docs.sqlalchemy.org
'SQLAlchemy' 카테고리의 다른 글
__table_args__ 메소드 정리 (0) | 2025.02.13 |
---|---|
sqlalchemy의 declarative_base()를 fastapi에서 사용하는 이유 (0) | 2025.02.08 |
[SQLALchemy] 비동기 함수, 비동기 Database 그리고 Pool의 정리 (0) | 2025.02.01 |
[SQLALchemy] 세션(Session), 비동기 세션(Async Session) 그리고 scoped_session의 정리 (0) | 2025.02.01 |
[SQLALchemy] 1:1, 1:N, N:M 관계에서 테이블 정의와 relationship의 활용 방법 (0) | 2025.02.01 |
댓글