[SQLALchemy] 비동기 함수, 비동기 Database 그리고 Pool의 정리
1. 개요
- FastAPI는 비동기(Async)를 지원하는 웹 프레임워크이므로, 데이터베이스 처리에서도 비동기 접근이 가능하다.
- SQLAlchemy는 기본적으로 동기식 ORM이지만, SQLAlchemy 1.4부터 비동기 지원이 추가되었다.
- PostgreSQL을 사용할 때, 동기식 접근은 psycopg2을, 비동기식 접근은 asyncpg를 사용하는 것이 일반적이다.
2. 주요 차이점
항목 | 동기식 (Synchronous) | 비동기식 (Asynchronous) |
라이브러리 | psycopg2, sqlalchemy | asyncpg, sqlalchemy.ext.asyncio |
데이터베이스 엔진 | create_engine() | create_async_engine() |
세션 관리 | Session (동기) | AsyncSession (비동기) |
쿼리 실행 | session.execute() | await session.execute() |
FastAPI 엔드포인트 | 동기 함수 사용 (def) | 비동기 함수 사용 (async def) |
성능 | 블로킹 발생 가능 | 고성능 비동기 처리 |
3. FastAPI에서 비동기 Database 설정 방법
1) 동기식(SQLAlchemy + psycopg2)
pip install fastapi asyncpg sqlalchemy alembic
2) 동기식 설정
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base
# PostgreSQL 연결 URL 설정
DATABASE_URL = "postgresql://user:password@localhost:5432/database"
# SQLAlchemy engine 생성
engine = create_engine(DATABASE_URL, echo=True)
# 세션 생성을 위한 sessionmaker 설정
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
# Base 클래스 설정 (선택 사항)
Base = declarative_base()
3) 비동기식(SQLAlchemy + asyncpg)
pip install fastapi sqlalchemy[asyncio] asyncpg alembic
4) 비동기식 설정
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base
# PostgreSQL 연결 URL 설정
DATABASE_URL = "postgresql+asyncpg://user:password@localhost:5432/database"
# Async SQLAlchemy engine 생성
engine = create_async_engine(DATABASE_URL, echo=True)
# 세션 생성을 위한 sessionmaker 설정
async_session = sessionmaker(
bind=engine,
class_=AsyncSession,
expire_on_commit=False,
autocommit=False,
autoflush=False
)
# Base 클래스 설정 (선택 사항)
Base = declarative_base()
5) 비동기 함수를 이용한 데이터베이스 세션 관리
- FastAPI에서 비동기 함수를 사용하여 데이터베이스 세션을 관리합니다. 이를 통해 비동기 쿼리를 실행할 수 있다.
from fastapi import FastAPI, Depends
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.exc import SQLAlchemyError
from typing import AsyncGenerator
app = FastAPI()
# 데이터베이스 세션을 비동기로 관리하는 함수
async def get_db_session() -> AsyncGenerator[AsyncSession, None]:
async with async_session() as session:
try:
yield session
except SQLAlchemyError as e:
await session.rollback()
raise e
4. CRUD 사용 방법
1) 동기식 CRUD 함수
from sqlalchemy.orm import Session
from models import User
def get_user(db: Session, user_id: int):
return db.query(User).filter(User.id == user_id).first()
def create_user(db: Session, name: str):
db_user = User(name=name)
db.add(db_user)
db.commit()
db.refresh(db_user)
return db_user
2) 비동기식 CRUD 함수
from sqlalchemy.ext.asyncio import AsyncSession
from models import User
from sqlalchemy.future import select
async def get_user(db: AsyncSession, user_id: int):
result = await db.execute(select(User).filter(User.id == user_id))
return result.scalar_one_or_none()
async def create_user(db: AsyncSession, name: str):
db_user = User(name=name)
db.add(db_user)
await db.commit()
await db.refresh(db_user)
return db_user
5. 비동기 Pool 설정 및 사용 방법
1) Async SQLAlchemy Engine와 Pool 설정
- FastAPI에서 비동기적으로 PostgreSQL에 접속하기 위해 Async SQLAlchemy Engine을 설정하고 비동기 Pool을 구성한다.
from fastapi import FastAPI
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.pool import AsyncPool
from sqlalchemy.exc import SQLAlchemyError
app = FastAPI()
# PostgreSQL 연결 URL 설정
DATABASE_URL = "postgresql+asyncpg://user:password@localhost:5432/database"
# Async SQLAlchemy engine 및 pool 생성
engine = create_async_engine(
DATABASE_URL,
pool_size=10, # 최대 동시 연결 수
max_overflow=20, # 추가 연결 허용 수
poolclass=AsyncPool,
echo=True # 디버그용 로그 활성화
)
# 세션 생성을 위한 sessionmaker 설정
async_session = sessionmaker(
bind=engine,
class_=AsyncSession,
expire_on_commit=False,
autocommit=False, # Autocommit 비활성화
autoflush=False # Autoflush 비활성화
)
# Base 클래스 설정 (선택 사항)
Base = declarative_base()
# 데이터베이스 세션을 비동기로 관리하는 함수
async def get_db_session() -> AsyncSession:
async with async_session() as session:
try:
yield session
except SQLAlchemyError as e:
await session.rollback()
raise e
6. SQLAlchemy의 close() vs dispose() 차이점
- SQLAlchemy에서 데이터베이스 연결을 관리할 때 close()와 dispose()는 각각 다른 역할을 수행한다.
- 둘 다 연결을 정리하는 기능을 하지만, 동작 방식과 적용 범위가 다르다.
1) 개념 정리
메서드 | 설명 | 실행 방식 | Connection Pool 영향 |
close() | 개별 데이터베이스 세션(DB connection)을 Connection Pool에 반환 | 세션이 닫히지만 풀에서 재사용 가능 | Connection Pool은 그대로 유지 |
dispose() | Connection Pool 자체를 해제하고 모든 연결을 닫음 | 모든 연결이 강제 종료됨 | Connection Pool을 완전히 제거 |
2) close() - 개별 연결(Session/Connection) 닫기
- close()는 개별 연결 또는 세션을 정리할 때 사용된다.
- db.close()를 호출하면 현재 세션이 종료된다.
- 실제 연결(Connection)은 닫히지 않고 Connection Pool에 반환된다.
- 새로운 요청이 오면 같은 연결을 재사용할 수 있다.
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
DATABASE_URL = "postgresql://user:password@localhost:5432/mydatabase"
engine = create_engine(DATABASE_URL)
SessionLocal = sessionmaker(bind=engine)
# 데이터베이스 세션 생성
db = SessionLocal()
# 작업 수행
result = db.execute("SELECT 1")
# 세션 종료
db.close()
3) dispose() - 엔진과 연결 풀 전체 해제
- dispose()는 전체 엔진과 연결 풀을 제거하는 데 사용된다.
- 기존의 모든 활성 연결을 강제로 닫고, 연결 풀을 완전히 제거한다.
- 이후 새로운 데이터베이스 요청이 발생하면, 새로운 연결 풀이 생성된다.
- 일반적으로 애플리케이션 종료 시점에 호출된다.
from sqlalchemy import create_engine
DATABASE_URL = "postgresql://user:password@localhost:5432/mydatabase"
engine = create_engine(DATABASE_URL)
# Connection Pool 종료
engine.dispose()
4) 비동기식에서의 차이점
- 비동기식 close() 사용 예시
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker
DATABASE_URL = "postgresql+asyncpg://user:password@localhost:5432/mydatabase"
async_engine = create_async_engine(DATABASE_URL)
AsyncSessionLocal = sessionmaker(bind=async_engine, class_=AsyncSession)
async def main():
async with AsyncSessionLocal() as session:
await session.execute("SELECT 1") # 데이터 조회
await session.close() # Connection Pool에 반환
# 실행
import asyncio
asyncio.run(main())
- 비동기식 dispose() 사용 예시
import asyncio
from sqlalchemy.ext.asyncio import create_async_engine
DATABASE_URL = "postgresql+asyncpg://user:password@localhost:5432/mydatabase"
async_engine = create_async_engine(DATABASE_URL)
async def shutdown():
await async_engine.dispose() # 모든 연결을 닫고 Connection Pool 삭제
# 실행
asyncio.run(shutdown())
5) close() vs dispose() 차이점 정리
항목 | close() (세션 반환) | dispose() (Pool 종료) |
대상 | 개별 DB 연결 | 전체 Connection Pool |
동작 방식 | Connection Pool에 반환 | Pool 내부의 모든 연결 해제 |
실제 연결 종료 여부 | ❌ (연결 유지, 재사용 가능) | ✅ (모든 연결 강제 종료) |
사용 시점 | 세션을 더 이상 사용하지 않을 때 | 애플리케이션 종료 또는 DB 연결 재설정 필요 시 |
성능 영향 | 낮음 (빠른 재사용 가능) | 높음 (새로운 Pool 생성 필요) |
비동기 지원 | await session.close() | await engine.dispose() |
6) 어떤 경우에 사용해야 할까?
- close()를 사용할 때 (일반적인 경우)
- 데이터베이스 연결을 닫고 싶지만, Connection Pool을 유지해야 할 때
- 같은 애플리케이션 내에서 재사용할 가능성이 있는 경우
- 매 요청마다 새 연결을 열고 닫는 것보다 성능이 우수함
- dispose()를 사용할 때 (예외적인 경우)
- 애플리케이션이 종료될 때
- DB 연결을 강제로 초기화해야 할 때
- 데이터베이스 설정이 변경되어 기존 연결을 모두 닫아야 할 때
'SQLAlchemy' 카테고리의 다른 글
sqlalchemy의 declarative_base()를 fastapi에서 사용하는 이유 (0) | 2025.02.08 |
---|---|
[SQLALchemy] 트랜잭션(Transaction) 그리고 Database Lock (0) | 2025.02.04 |
[SQLALchemy] 세션(Session), 비동기 세션(Async Session) 그리고 scoped_session의 정리 (0) | 2025.02.01 |
[SQLALchemy] 1:1, 1:N, N:M 관계에서 테이블 정의와 relationship의 활용 방법 (0) | 2025.02.01 |
[SQLALchemy] Alembic을 이용한 마이그레이션 관리 방법 (0) | 2025.01.31 |
댓글