SQLAlchemy

[SQLALchemy] 비동기 함수, 비동기 Database 그리고 Pool의 정리

bluebamus 2025. 2. 1.

1. 개요

   - FastAPI는 비동기(Async)를 지원하는 웹 프레임워크이므로, 데이터베이스 처리에서도 비동기 접근이 가능하다.

   - SQLAlchemy는 기본적으로 동기식 ORM이지만, SQLAlchemy 1.4부터 비동기 지원이 추가되었다.

   - PostgreSQL을 사용할 때, 동기식 접근은 psycopg2을, 비동기식 접근은 asyncpg를 사용하는 것이 일반적이다.

 

2. 주요 차이점

항목 동기식 (Synchronous) 비동기식 (Asynchronous)
라이브러리 psycopg2, sqlalchemy asyncpg, sqlalchemy.ext.asyncio
데이터베이스 엔진 create_engine() create_async_engine()
세션 관리 Session (동기) AsyncSession (비동기)
쿼리 실행 session.execute() await session.execute()
FastAPI 엔드포인트 동기 함수 사용 (def) 비동기 함수 사용 (async def)
성능 블로킹 발생 가능 고성능 비동기 처리

 

3. FastAPI에서 비동기 Database 설정 방법

   1) 동기식(SQLAlchemy + psycopg2)

 

pip install fastapi asyncpg sqlalchemy alembic

 

   2) 동기식 설정

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base

# PostgreSQL 연결 URL 설정
DATABASE_URL = "postgresql://user:password@localhost:5432/database"

# SQLAlchemy engine 생성
engine = create_engine(DATABASE_URL, echo=True)

# 세션 생성을 위한 sessionmaker 설정
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

# Base 클래스 설정 (선택 사항)
Base = declarative_base()

 

   3) 비동기식(SQLAlchemy + asyncpg)

pip install fastapi sqlalchemy[asyncio] asyncpg alembic

 

   4) 비동기식 설정

from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base

# PostgreSQL 연결 URL 설정
DATABASE_URL = "postgresql+asyncpg://user:password@localhost:5432/database"

# Async SQLAlchemy engine 생성
engine = create_async_engine(DATABASE_URL, echo=True)

# 세션 생성을 위한 sessionmaker 설정
async_session = sessionmaker(
    bind=engine,
    class_=AsyncSession,
    expire_on_commit=False,
    autocommit=False,
    autoflush=False
)

# Base 클래스 설정 (선택 사항)
Base = declarative_base()

 

   5) 비동기 함수를 이용한 데이터베이스 세션 관리

       - FastAPI에서 비동기 함수를 사용하여 데이터베이스 세션을 관리합니다. 이를 통해 비동기 쿼리를 실행할 수 있다.

from fastapi import FastAPI, Depends
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.exc import SQLAlchemyError
from typing import AsyncGenerator

app = FastAPI()

# 데이터베이스 세션을 비동기로 관리하는 함수
async def get_db_session() -> AsyncGenerator[AsyncSession, None]:
    async with async_session() as session:
        try:
            yield session
        except SQLAlchemyError as e:
            await session.rollback()
            raise e

 

4. CRUD 사용 방법

   1) 동기식 CRUD 함수

from sqlalchemy.orm import Session
from models import User

def get_user(db: Session, user_id: int):
    return db.query(User).filter(User.id == user_id).first()

def create_user(db: Session, name: str):
    db_user = User(name=name)
    db.add(db_user)
    db.commit()
    db.refresh(db_user)
    return db_user

 

   2) 비동기식 CRUD 함수

from sqlalchemy.ext.asyncio import AsyncSession
from models import User
from sqlalchemy.future import select

async def get_user(db: AsyncSession, user_id: int):
    result = await db.execute(select(User).filter(User.id == user_id))
    return result.scalar_one_or_none()

async def create_user(db: AsyncSession, name: str):
    db_user = User(name=name)
    db.add(db_user)
    await db.commit()
    await db.refresh(db_user)
    return db_user

 

5. 비동기 Pool 설정 및 사용 방법

   1) Async SQLAlchemy Engine와 Pool 설정

      - FastAPI에서 비동기적으로 PostgreSQL에 접속하기 위해 Async SQLAlchemy Engine을 설정하고 비동기 Pool을 구성한다. 

from fastapi import FastAPI
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.pool import AsyncPool
from sqlalchemy.exc import SQLAlchemyError

app = FastAPI()

# PostgreSQL 연결 URL 설정
DATABASE_URL = "postgresql+asyncpg://user:password@localhost:5432/database"

# Async SQLAlchemy engine 및 pool 생성
engine = create_async_engine(
    DATABASE_URL,
    pool_size=10,      # 최대 동시 연결 수
    max_overflow=20,   # 추가 연결 허용 수
    poolclass=AsyncPool,
    echo=True          # 디버그용 로그 활성화
)

# 세션 생성을 위한 sessionmaker 설정
async_session = sessionmaker(
    bind=engine,
    class_=AsyncSession,
    expire_on_commit=False,
    autocommit=False,  # Autocommit 비활성화
    autoflush=False    # Autoflush 비활성화
)

# Base 클래스 설정 (선택 사항)
Base = declarative_base()

# 데이터베이스 세션을 비동기로 관리하는 함수
async def get_db_session() -> AsyncSession:
    async with async_session() as session:
        try:
            yield session
        except SQLAlchemyError as e:
            await session.rollback()
            raise e

 

6. SQLAlchemy의 close() vs dispose() 차이점

   - SQLAlchemy에서 데이터베이스 연결을 관리할 때 close()와 dispose()는 각각 다른 역할을 수행한다.
   - 둘 다 연결을 정리하는 기능을 하지만, 동작 방식과 적용 범위가 다르다.

 

   1) 개념 정리

메서드 설명 실행 방식 Connection Pool 영향
close() 개별 데이터베이스 세션(DB connection)을 Connection Pool에 반환 세션이 닫히지만 풀에서 재사용 가능 Connection Pool은 그대로 유지
dispose() Connection Pool 자체를 해제하고 모든 연결을 닫음 모든 연결이 강제 종료됨 Connection Pool을 완전히 제거

 

   2) close() - 개별 연결(Session/Connection) 닫기

      - close()는 개별 연결 또는 세션을 정리할 때 사용된다.

      - db.close()를 호출하면 현재 세션이 종료된다.
      - 실제 연결(Connection)은 닫히지 않고 Connection Pool에 반환된다.
      - 새로운 요청이 오면 같은 연결을 재사용할 수 있다.

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

DATABASE_URL = "postgresql://user:password@localhost:5432/mydatabase"
engine = create_engine(DATABASE_URL)
SessionLocal = sessionmaker(bind=engine)

# 데이터베이스 세션 생성
db = SessionLocal()

# 작업 수행
result = db.execute("SELECT 1")

# 세션 종료
db.close()

 

   3) dispose() - 엔진과 연결 풀 전체 해제

      - dispose()는 전체 엔진과 연결 풀을 제거하는 데 사용된다.

      - 기존의 모든 활성 연결을 강제로 닫고, 연결 풀을 완전히 제거한다.
      - 이후 새로운 데이터베이스 요청이 발생하면, 새로운 연결 풀이 생성된다.
      - 일반적으로 애플리케이션 종료 시점에 호출된다.

from sqlalchemy import create_engine

DATABASE_URL = "postgresql://user:password@localhost:5432/mydatabase"
engine = create_engine(DATABASE_URL)

# Connection Pool 종료
engine.dispose()

 

   4) 비동기식에서의 차이점

      - 비동기식 close() 사용 예시

from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker

DATABASE_URL = "postgresql+asyncpg://user:password@localhost:5432/mydatabase"
async_engine = create_async_engine(DATABASE_URL)
AsyncSessionLocal = sessionmaker(bind=async_engine, class_=AsyncSession)

async def main():
    async with AsyncSessionLocal() as session:
        await session.execute("SELECT 1")  # 데이터 조회
        await session.close()  # Connection Pool에 반환

# 실행
import asyncio
asyncio.run(main())

 

      - 비동기식 dispose() 사용 예시

import asyncio
from sqlalchemy.ext.asyncio import create_async_engine

DATABASE_URL = "postgresql+asyncpg://user:password@localhost:5432/mydatabase"
async_engine = create_async_engine(DATABASE_URL)

async def shutdown():
    await async_engine.dispose()  # 모든 연결을 닫고 Connection Pool 삭제

# 실행
asyncio.run(shutdown())

 

   5) close() vs dispose() 차이점 정리

항목 close() (세션 반환) dispose() (Pool 종료)
대상 개별 DB 연결 전체 Connection Pool
동작 방식 Connection Pool에 반환 Pool 내부의 모든 연결 해제
실제 연결 종료 여부 ❌ (연결 유지, 재사용 가능) ✅ (모든 연결 강제 종료)
사용 시점 세션을 더 이상 사용하지 않을 때 애플리케이션 종료 또는 DB 연결 재설정 필요 시
성능 영향 낮음 (빠른 재사용 가능) 높음 (새로운 Pool 생성 필요)
비동기 지원 await session.close() await engine.dispose()

 

   6) 어떤 경우에 사용해야 할까?

      - close()를 사용할 때 (일반적인 경우)
         - 데이터베이스 연결을 닫고 싶지만, Connection Pool을 유지해야 할 때
         - 같은 애플리케이션 내에서 재사용할 가능성이 있는 경우
         - 매 요청마다 새 연결을 열고 닫는 것보다 성능이 우수함


      - dispose()를 사용할 때 (예외적인 경우)
         - 애플리케이션이 종료될 때
         - DB 연결을 강제로 초기화해야 할 때
         - 데이터베이스 설정이 변경되어 기존 연결을 모두 닫아야 할 때

댓글