7. LangChain 커스텀 함수 구현 샘플 모음 - RAG + LangChain 전 단계
- 본 문서는 LangChain의 기본 내장 함수가 아닌, 커스텀 구현을 통해 각 단계를 제어하는 방법을 시나리오 기반으로 제시한다.
- 커스텀 구현이 필요한 이유
- LangChain은 RAG 파이프라인의 각 단계(문서 로딩, 청킹, 임베딩, 검색, 생성 등)에 대해 범용적인 내장 함수를 제공한다. 그러나 실무 환경에서는 도메인 특성, 데이터 구조, 비용 최적화, 운영 요구사항 등으로 인해 내장 함수만으로는 충분하지 않은 경우가 많다. 커스텀 구현을 통해 각 단계를 세밀하게 제어하면 파이프라인의 품질과 효율을 크게 향상시킬 수 있다.
- 내장 함수 vs 커스텀 구현 비교
| 기준 | LangChain 내장 함수 | 커스텀 구현 |
| 개발 속도 | 빠름 (즉시 사용 가능) | 상대적으로 느림 (직접 개발 필요) |
| 유연성 | 제한적 (범용 설계) | 높음 (도메인 맞춤 설계) |
| 메타데이터 제어 | 기본 메타데이터만 자동 생성 | 필요한 모든 메타데이터 직접 설계 |
| 비용 최적화 | 기본 동작 (캐싱 없음) | 캐싱, 배치 처리 등 비용 최적화 가능 |
| 에러 핸들링 | 범용 에러 처리 | 도메인별 상세 에러 처리 가능 |
| 유지보수 | LangChain 버전 의존 | 독립적 관리 가능 |
| 적합 상황 | 프로토타입, 단순 파이프라인 | 프로덕션, 복잡한 요구사항 |
- 커스텀 구현의 전체 흐름
[커스텀 RAG 파이프라인 아키텍처]
[원본 문서]
↓
[1. SmartDocumentLoader] ─── 다중 포맷 자동 감지 + LLM 메타데이터 추출
↓
[2. AdaptiveChunker] ─────── 문서 유형별 최적 청킹 전략 자동 선택
↓
[3. DomainEnhancedEmbeddings] ── 도메인 용어 확장 + 캐싱
↓
[4. VectorDBManager] ─────── 증분 업데이트 + 버전 관리
↓
[사용자 질의]
↓
[5. QueryTransformer] ────── 사전 변환 → 확장 → 분해
↓
[6. IntelligentRetriever] ── 의도 분석 → 전략 선택 → 다중 검색 → 재순위
↓
[7. DynamicPromptBuilder] ── 질의 유형별 동적 프롬프트 조립
↓
[LLM 답변 생성]
↓
[8. AnswerPostProcessor] ─── 환각 검증 → 재생성 → 출처 추가 → 포맷팅
↓
[최종 답변]
- 실무 권장: 처음부터 모든 단계를 커스텀으로 구현할 필요는 없다. LangChain 내장 함수로 프로토타입을 먼저 만들고, 품질 평가(Retrieval Evaluation)를 통해 병목이 되는 단계를 파악한 뒤 해당 단계만 커스텀으로 교체하는 것이 효율적이다. 이것이 "Modular RAG" 설계의 핵심 원칙이기도 하다.
1. 문서 로딩 커스텀
- 문서 로딩 커스텀이 필요한 상황
- LangChain의 내장 로더(`Docx2txtLoader`, `PyPDFLoader` 등)는 파일을 읽어 텍스트를 추출하고 기본 메타데이터(source, page 등)를 자동 생성한다. 그러나 다음과 같은 상황에서는 커스텀 로더가 필요하다:
| 상황 | 내장 로더의 한계 | 커스텀 로더의 해결 방식 |
| 다중 포맷 문서 통합 | 각 포맷별 로더를 개별 호출해야 함 | 확장자 자동 감지 후 통합 로딩 (PDF/Text/DOCX/CSV 동시 처리) |
| 메타데이터 자동 추출 | source, page 등 기본 정보만 제공 | LLM으로 카테고리, 키워드, 작성일, 중요도 등 풍부한 메타데이터 자동 추출 |
| 전처리 통합 | 로딩 후 별도 전처리 필요 | 로딩과 전처리(노이즈 제거, 정규화, 언어 필터링)를 하나의 파이프라인으로 통합 |
| 비표준 데이터 소스 | 파일 기반만 지원 | DB 쿼리(MySQL/Postgres), API 실시간 호출, 웹 크롤링 등 다양한 소스 지원 |
| 품질 관리 | 빈 문서, 깨진 인코딩 등 처리 없음 | 로딩 시 품질 검증(길이, 인코딩, 중복 체크) 및 자동 필터링/로그 |
- 관련 개념: LangChain Document 객체
- 커스텀 로더를 이해하려면 LangChain의 `Document` 객체를 먼저 이해해야 한다. 모든 로더는 최종적으로 `Document` 객체의 리스트를 반환한다.
from langchain_core.documents import Document
# Document 객체의 기본 구조
doc = Document(
page_content="실제 텍스트 내용", # 문서의 텍스트 (필수)
metadata={ # 문서에 대한 부가 정보 (딕셔너리)
"source": "파일 경로 또는 URL",
"page": 0,
"category": "세법",
# 임의의 메타데이터를 자유롭게 추가 가능
}
)
- 파라미터 정의 기준: `page_content`는 벡터 DB에 임베딩되어 검색에 사용되는 실제 텍스트이다. `metadata`는 검색 시 필터링 조건이나 답변 생성 시 출처 표시에 활용되므로, 프로젝트 초기에 어떤 메타데이터가 필요한지 설계하는 것이 중요하다.
- 시나리오: 다중 포맷 문서를 통합 로딩하되, 메타데이터를 자동 추출
import os
import re
from datetime import datetime
from langchain_core.documents import Document
from langchain_openai import ChatOpenAI
class SmartDocumentLoader:
"""
다중 포맷(docx, pdf, txt, md, json)을 자동 감지하여 로딩하고,
LLM을 활용해 카테고리와 키워드를 자동 추출하는 커스텀 로더.
동작:
1. 파일 확장자에 따라 적절한 파싱 로직 선택
2. 텍스트 추출 후 LLM으로 메타데이터 자동 생성
3. Document 객체 리스트 반환
내장 로더와의 차이점:
- 디렉토리 내 다양한 포맷을 한 번에 로딩
- LLM으로 카테고리/키워드 자동 추출
- 파일 크기, 문자 수 등 운영 메타데이터 자동 추가
"""
def __init__(self, directory: str, llm=None):
# directory: 문서가 저장된 디렉토리 경로
self.directory = directory
# llm: 메타데이터 추출에 사용할 LLM (비용 절감을 위해 gpt-4o-mini 기본값)
self.llm = llm or ChatOpenAI(model="gpt-4o-mini", temperature=0)
# 지원하는 파일 확장자 목록 (필요 시 확장 가능)
self.supported_extensions = {".docx", ".pdf", ".txt", ".md", ".json"}
def load_all(self) -> list[Document]:
"""디렉토리의 모든 지원 파일을 재귀적으로 탐색하여 로딩"""
documents = []
for root, _, files in os.walk(self.directory):
for file in files:
# 파일 확장자 추출 (소문자로 통일하여 대소문자 무관하게 처리)
ext = os.path.splitext(file)[1].lower()
if ext in self.supported_extensions:
filepath = os.path.join(root, file)
docs = self._load_single(filepath, ext)
documents.extend(docs)
return documents
def _load_single(self, filepath: str, ext: str) -> list[Document]:
"""단일 파일을 확장자에 따라 적절한 방식으로 로딩
에러 핸들링:
- 개별 파일 로딩 실패 시 에러를 로깅하고 빈 리스트를 반환
- 이를 통해 하나의 파일 실패가 전체 로딩 프로세스를 중단하지 않음
"""
import logging
logger = logging.getLogger(__name__)
try:
if ext == ".docx":
text = self._load_docx(filepath)
elif ext == ".pdf":
text = self._load_pdf(filepath)
elif ext in (".txt", ".md"):
# 텍스트/마크다운 파일은 직접 읽기
with open(filepath, "r", encoding="utf-8") as f:
text = f.read()
elif ext == ".json":
text = self._load_json(filepath)
else:
return []
# 빈 문서 필터링: 내용이 너무 짧으면 스킵
if not text or len(text.strip()) < 10:
logger.warning(f"빈 문서 스킵: {filepath} (길이: {len(text.strip()) if text else 0})")
return []
# LLM으로 메타데이터 자동 추출 (카테고리, 키워드 등)
metadata = self._extract_metadata(text, filepath)
return [Document(page_content=text, metadata=metadata)]
except UnicodeDecodeError as e:
# 인코딩 문제 — 파일 인코딩이 UTF-8이 아닌 경우
logger.error(f"인코딩 에러 (파일: {filepath}): {e}")
return []
except FileNotFoundError as e:
# 파일이 존재하지 않거나 심볼릭 링크가 깨진 경우
logger.error(f"파일 없음 (파일: {filepath}): {e}")
return []
except Exception as e:
# 예상치 못한 모든 에러를 포착하여 전체 프로세스 보호
logger.error(f"로딩 실패 (파일: {filepath}, 확장자: {ext}): {e}")
return []
def _load_docx(self, path: str) -> str:
"""DOCX 파일에서 텍스트 추출 (python-docx 라이브러리 사용)"""
from docx import Document as DocxDoc
doc = DocxDoc(path)
# 빈 문단은 제외하고 줄바꿈으로 연결
return "\n".join([p.text for p in doc.paragraphs if p.text.strip()])
def _load_pdf(self, path: str) -> str:
"""PDF 파일에서 텍스트 추출 (pypdf 라이브러리 사용)"""
from pypdf import PdfReader
reader = PdfReader(path)
# 모든 페이지의 텍스트를 줄바꿈으로 연결
return "\n".join([page.extract_text() for page in reader.pages])
def _load_json(self, path: str) -> str:
"""JSON 파일을 읽어 포맷팅된 문자열로 변환"""
import json
with open(path, "r", encoding="utf-8") as f:
data = json.load(f)
# ensure_ascii=False: 한글 등 유니코드 문자를 그대로 출력
# indent=2: 가독성을 위한 들여쓰기
return json.dumps(data, ensure_ascii=False, indent=2)
def _extract_metadata(self, text: str, filepath: str) -> dict:
"""LLM으로 문서 메타데이터를 자동 추출"""
# 비용 절약을 위해 문서의 앞부분(1000자)만 LLM에 전달
preview = text[:1000]
response = self.llm.invoke(
f"다음 문서의 카테고리(1개)와 핵심 키워드(5개 이내)를 추출하세요.\n"
f"형식: 카테고리: XXX\n키워드: A, B, C\n\n문서:\n{preview}"
)
content = response.content
# 정규표현식으로 카테고리 추출
cat_match = re.search(r'카테고리:\s*(.+)', content)
category = cat_match.group(1).strip() if cat_match else "unknown"
# 정규표현식으로 키워드 추출 (쉼표로 분리)
kw_match = re.search(r'키워드:\s*(.+)', content)
keywords = [k.strip() for k in kw_match.group(1).split(",")] if kw_match else []
return {
"source": filepath, # 파일 경로 (출처 추적용)
"filename": os.path.basename(filepath), # 파일명만 추출
"category": category, # LLM이 추출한 카테고리
"keywords": keywords, # LLM이 추출한 키워드 리스트
"file_size": os.path.getsize(filepath), # 파일 크기 (바이트)
"loaded_at": datetime.now().isoformat(), # 로딩 시각 (운영 추적용)
"char_count": len(text) # 문자 수 (청킹 전략 판단에 활용)
}
- 파라미터 정의 기준: `_extract_metadata`에서 `preview = text[:1000]`으로 앞부분만 사용하는 이유는 LLM API 비용 절감이다. 대부분의 문서는 앞부분에 제목, 요약, 서론이 있어 1000자만으로도 카테고리와 키워드를 충분히 추출할 수 있다. 문서 끝부분의 정보가 중요한 경우(예: 결론이 핵심인 논문)에는 `text[:500] + text[-500:]`처럼 앞뒤를 결합하는 전략도 고려할 수 있다.
- 사용 예시:
# SmartDocumentLoader 초기화 (메타데이터 추출에 gpt-4o-mini 사용)
loader = SmartDocumentLoader("./documents/", llm=ChatOpenAI(model="gpt-4o-mini"))
# 디렉토리 내 모든 지원 파일을 한 번에 로딩
documents = loader.load_all()
# 로딩 결과 확인 (카테고리, 키워드, 크기 등 자동 추출된 메타데이터 출력)
for doc in documents:
print(f"[{doc.metadata['category']}] {doc.metadata['filename']}")
print(f" 키워드: {doc.metadata['keywords']}")
print(f" 크기: {doc.metadata['char_count']}자")
- 커스텀 로더 확장 방법
- 위의 `SmartDocumentLoader`를 기반으로 다양한 확장이 가능하다:
| 확장 기능 | 구현 방법 | 용도 |
| 인코딩 자동 감지 | chardet 또는 cchardet 라이브러리로 파일 인코딩 자동 감지 후 open(encoding=detected) | 인코딩이 다양하거나 명시되지 않은 레거시 문서(EUC-KR/CP949 등 한국어 문서) |
| 빈 문서 필터링 | len(text.strip()) < min_length (기본 50자) 또는 text.isspace() 체크 후 스킵 및 로깅 | 빈 파일, 헤더만 있는 파일, 깨진 인코딩으로 인한 garbage 텍스트 제거 |
| 중복 문서 감지 | 텍스트 해시(hashlib.md5(text.encode()).hexdigest()) 또는 MinHash(LSH)로 유사도 95%↑ 시 스킵 | 동일 문서가 다른 경로/이름으로 중복 저장된 경우, 인덱스 오염 방지 |
| OCR 연동 | pytesseract + pdf2image로 이미지 기반 PDF 첫 페이지 OCR 후 텍스트 추출 | 스캔된 PDF, 이미지 문서, 테이블이 많은 행정/법률 문서 |
| DB/API 소스 | _load_from_db(sql_query), _load_from_api(endpoint, params) 메서드 오버라이드 | 실시간 데이터(MySQL/Postgres 뷰), 외부 API(국세청 API), 동적 콘텐츠 로딩 |
- 실무 권장: 프로덕션 환경에서는 로딩 실패 시 에러를 로깅하고 나머지 파일을 계속 처리하는 try-except 로직을 `_load_single` 메서드에 추가하는 것이 필수적이다. 또한 대량 문서를 로딩할 때는 `tqdm` 등으로 진행률을 표시하면 운영 편의성이 높아진다.
2. 청킹 커스텀
- 청킹 커스텀이 필요한 상황
- 청킹(Chunking)은 RAG 파이프라인 품질에 가장 큰 영향을 미치는 단계 중 하나이다. LangChain의 `RecursiveCharacterTextSplitter`는 범용적이지만, 문서 유형에 따라 최적의 `chunk_size`, `chunk_overlap`, `separators`가 다르다. 예를 들어 법률 문서는 조항 단위로 큰 청크가 필요하고, FAQ 문서는 질문-답변 단위로 작은 청크가 적합하다.
- 문서 유형별 최적 청킹 전략
| 문서 유형 | 최적 chunk_size | 최적 overlap | 권장 구분자 | 이유 |
| 법률/규정 | 1500~2000 | 200~300 | \n제, \n\n, \n | 조항/항 단위 문맥 보존 중요, 법적 해석 시 상하 맥락 필수 |
| FAQ | 300~500 | 0~50 | \n\nQ:, \n\n---\n, \n\n | 질문-답변 한 쌍이 완결된 의미 단위, 불필요 overlap 최소화 |
| 코드/기술 문서 | 1000~1500 | 100~200 | \nclass , \ndef , \n\n## | 함수/클래스/섹션 단위 보존, 코드 실행 맥락 유지 |
| 일반 텍스트 | 800~1000 | 100~150 | \n\n, \n. , .\n | 문단 단위 의미 단위, 4K 컨텍스트 내 4~5개 문단 적재 최적 |
| 표 중심 문서 | 600~800 | 100 | \n\n, \n|, |\n | 표 구조 분리 방지, 헤더-로우 관계 보존, 엑셀/PDF 테이블 |
- 핵심 포인트: `chunk_overlap`이 0인 FAQ 유형에 주목하자. FAQ의 각 항목은 독립적인 질문-답변 쌍이므로, 인접 청크와 중복이 필요하지 않다. 오히려 중복이 있으면 다른 Q&A의 내용이 섞여 검색 품질이 저하될 수 있다.
- 시나리오: 문서 유형에 따라 자동으로 최적 청킹 전략을 선택
from langchain_text_splitters import RecursiveCharacterTextSplitter
# 참고: 기존 from langchain.text_splitter도 동작하지만, langchain_text_splitters가 최신 권장 경로
from langchain_core.documents import Document
class AdaptiveChunker:
"""
문서의 특성(길이, 구조, 유형)을 분석하여 자동으로
최적의 청킹 전략을 선택하는 커스텀 청커.
동작:
1. 문서 특성 분석 (평균 문장 길이, 구조 마커 존재 여부 등)
2. 특성에 따라 최적 chunk_size, overlap, 분할기 선택
3. 청크에 위치 정보 메타데이터 추가
내장 RecursiveCharacterTextSplitter와의 차이점:
- 단일 설정이 아니라, 문서별로 최적 설정을 자동 선택
- 위치 정보(시작/중간/끝) 메타데이터를 자동 추가
- 문서 유형 감지 결과를 메타데이터에 기록하여 추후 분석 가능
"""
# 문서 유형별 기본 설정
# - chunk_size: 각 청크의 최대 문자 수
# - overlap: 인접 청크 간 중복 문자 수 (문맥 연속성 보장)
# - separators: 분할 우선순위 (앞의 구분자부터 시도)
CONFIGS = {
"legal": {
"chunk_size": 2000, # 법률 문서는 조항 단위로 큰 청크
"overlap": 300, # 조항 간 문맥 연결을 위해 높은 오버랩
"separators": ["\n제", "\n\n", "\n", " "] # "제N조" 단위로 분할 우선
},
"faq": {
"chunk_size": 500, # 질문-답변 쌍은 짧은 청크
"overlap": 0, # 독립적 항목이므로 오버랩 불필요
"separators": ["\n\nQ:", "\n\n", "\n"] # Q: 마커 기준 분할
},
"code": {
"chunk_size": 1500, # 함수/클래스가 잘리지 않도록 넉넉한 크기
"overlap": 100, # 함수 간 의존성을 위한 최소 오버랩
"separators": ["\nclass ", "\ndef ", "\n\n", "\n"] # 클래스/함수 단위
},
"general": {
"chunk_size": 1000, # 범용 크기
"overlap": 150, # 표준 오버랩 (chunk_size의 15%)
"separators": ["\n\n", "\n", ". ", " "] # 문단 → 줄 → 문장 → 단어
},
"table_heavy": {
"chunk_size": 800, # 표가 잘리지 않도록 적정 크기
"overlap": 100,
"separators": ["\n\n", "\n|", "\n"] # 표 행 구분자 포함
},
}
def __init__(self):
pass
def chunk(self, documents: list[Document]) -> list[Document]:
"""문서 리스트를 유형별 최적 전략으로 청킹"""
all_chunks = []
for doc in documents:
# 문서 유형 자동 감지
doc_type = self._detect_document_type(doc)
# 감지된 유형에 맞는 설정 로딩 (매칭 실패 시 general 사용)
config = self.CONFIGS.get(doc_type, self.CONFIGS["general"])
# 설정에 따라 RecursiveCharacterTextSplitter 생성
splitter = RecursiveCharacterTextSplitter(
chunk_size=config["chunk_size"],
chunk_overlap=config["overlap"],
separators=config["separators"]
)
# 청킹 수행
chunks = splitter.split_documents([doc])
# 각 청크에 위치 정보 메타데이터 추가
total_chunks = len(chunks)
for i, chunk in enumerate(chunks):
chunk.metadata.update({
"chunk_index": i, # 현재 청크 번호 (0-based)
"total_chunks": total_chunks, # 해당 문서의 전체 청크 수
"chunk_position": ( # 위치 정보 (start/middle/end)
"start" if i == 0
else ("end" if i == total_chunks - 1
else "middle")
),
"detected_type": doc_type, # 감지된 문서 유형
"chunk_size_config": config["chunk_size"], # 적용된 chunk_size 설정
})
all_chunks.append(chunk)
return all_chunks
def _detect_document_type(self, doc: Document) -> str:
"""문서 내용과 메타데이터를 분석하여 유형 결정"""
text = doc.page_content
category = doc.metadata.get("category", "").lower()
# 1차 판단: 메타데이터 기반 (SmartDocumentLoader가 추출한 카테고리 활용)
if "법" in category or "legal" in category:
return "legal"
if "faq" in category or "질문" in category:
return "faq"
# 2차 판단: 내용 기반 (텍스트 패턴으로 유형 추론)
# 법률 문서: "제N조", "제N항" 등의 패턴이 빈번
if text.count("제") > 10 and ("조" in text or "항" in text):
return "legal"
# FAQ 문서: "Q:" 또는 "질문:" 패턴이 반복
if text.count("Q:") > 3 or text.count("질문:") > 3:
return "faq"
# 코드 문서: Python 함수/클래스 정의 또는 코드 블록이 빈번
if text.count("def ") > 3 or text.count("class ") > 2 or text.count("```") > 4:
return "code"
# 표 중심 문서: 파이프(|) 기호가 빈번 (마크다운 표)
if text.count("|") > 20 or text.count("│") > 10:
return "table_heavy"
# 어떤 패턴에도 매칭되지 않으면 일반 문서로 분류
return "general"
- 파라미터 정의 기준: `_detect_document_type`의 임계값(예: `text.count("제") > 10`)은 경험적으로 설정한 값이다. 프로덕션 환경에서는 실제 문서 샘플로 테스트하여 임계값을 조정해야 한다. 오탐(false positive)이 발생하면 `general` 설정이 적용되므로 큰 문제는 없지만, 최적 성능을 위해 도메인별 튜닝이 필요하다.
- 위치 정보 메타데이터의 활용
- `chunk_position` 메타데이터(start, middle, end)는 검색 후 답변 생성 시 유용하게 활용된다:
| chunk_position | 활용 방법 |
| start | 문서 서론/개요/정의 포함 가능성 높아 전체 문서 맥락 파악 및 라우팅/범위 좁히기에 유리. 초기 검색 스코어 보정 가중치 ↑ |
| middle | 핵심 본론/세부 내용/데이터 포함 가능성 높음. 주요 사실 추출, 정밀 검색에 최적. 기본 우선순위 최고 |
| end | 결론/요약/권고사항 포함 가능성 높아 종합적 답변 생성 및 최종 판단에 유리. 답변 합성 시 가중치 ↑ |
# 예시: 검색 결과에서 start 청크의 우선순위를 높이는 재순위 로직
def prioritize_start_chunks(docs: list[Document]) -> list[Document]:
"""start 위치의 청크를 우선 배치하여 문맥 파악을 돕는 재순위 함수"""
start_docs = [d for d in docs if d.metadata.get("chunk_position") == "start"]
other_docs = [d for d in docs if d.metadata.get("chunk_position") != "start"]
return start_docs + other_docs
- 사용 예시:
# AdaptiveChunker 초기화 및 실행
chunker = AdaptiveChunker()
chunks = chunker.chunk(documents) # SmartDocumentLoader로 로딩한 문서 전달
# 유형별 통계 확인 (문서 유형 감지 결과 검증)
from collections import Counter
type_counts = Counter(c.metadata["detected_type"] for c in chunks)
print(f"유형별 청크 수: {type_counts}")
# → Counter({'legal': 45, 'table_heavy': 12, 'general': 8})
# 각 유형별 적용된 설정 확인
for doc_type, count in type_counts.items():
config = AdaptiveChunker.CONFIGS.get(doc_type, {})
print(f" {doc_type}: {count}개 청크 (chunk_size={config.get('chunk_size')})")
- 실무 권장: 청킹 결과를 반드시 시각적으로 검증하라. 청크가 의미 단위로 잘 분할되었는지, 중요한 정보가 청크 경계에서 잘리지 않았는지 확인해야 한다. 검증 방법으로는 랜덤 샘플링한 청크를 직접 읽어보거나, 청크 크기 분포를 히스토그램으로 시각화하는 것이 효과적이다.
3. 임베딩 커스텀
- 임베딩 커스텀이 필요한 상황
- 임베딩(Embedding)은 텍스트를 벡터로 변환하여 의미적 유사도 계산을 가능하게 하는 단계이다. LangChain의 `OpenAIEmbeddings`는 범용적이지만, 도메인 특화 환경에서는 다음과 같은 한계가 있다:
| 한계 | 설명 | 커스텀 해결 방식 |
| 도메인 용어 이해 부족 | 일반 임베딩 모델이 법률/세법 전문 용어의 의미적 유사성 포착 어려움 | 도메인 용어 사전 구축 후 하이브리드 검색 (BM25 + semantic), 또는 도메인 특화 fine-tuning 모델 사용 |
| API 비용 증가 | 동일 텍스트의 반복 임베딩 호출로 불필요한 토큰 비용 발생 | 텍스트 해시 기반 Redis/SQLite 캐싱 (TTL 30일), 동일 chunk_position별 고유 키 생성 |
| 응답 속도 | 매 쿼리마다 임베딩 API 왕복 지연 (200-500ms 추가) | 캐시 히트 시 즉시 반환 (<1ms), 미스 시 비동기 prefetching으로 다음 쿼리 대비 |
| 오프라인 환경 | OpenAI API 서버 장애/네트워크 끊김 시 전체 시스템 중단 | fallback 로컬 모델 (sentence-transformers/all-MiniLM-L6-v2), 캐시된 임베딩 우선 사용으로 graceful degradation |
- 관련 개념: 도메인 용어 확장이 검색 품질에 미치는 영향
- 벡터 임베딩은 텍스트의 의미를 벡터 공간에 매핑한다. 그런데 일상어("직장인")와 전문어("거주자, 근로소득자")는 동일한 개념을 가리키지만 임베딩 공간에서 거리가 멀 수 있다. 도메인 용어 확장은 이 간극을 줄이는 기법이다.
[도메인 용어 확장 전]
질의: "직장인 세금 계산" ──── 임베딩 ────→ 벡터 A
문서: "근로소득자 소득세" ──── 임베딩 ────→ 벡터 B
코사인 유사도(A, B) = 0.65 (낮음 → 검색 순위 하락)
[도메인 용어 확장 후]
질의: "직장인 세금 계산 (거주자, 근로소득자) (소득세, 과세)" ──→ 벡터 A'
문서: "근로소득자 소득세" ──── 임베딩 ────→ 벡터 B
코사인 유사도(A', B) = 0.82 (높음 → 검색 순위 상승)
- 관련 개념: LangChain Embeddings 인터페이스
- LangChain의 벡터 DB(Chroma, Pinecone 등)와 호환되는 커스텀 임베딩을 만들려면 `Embeddings` 인터페이스를 구현해야 한다. 이 인터페이스는 두 가지 메서드를 필수로 요구한다:
from langchain_core.embeddings import Embeddings
class CustomEmbeddings(Embeddings):
def embed_documents(self, texts: list[str]) -> list[list[float]]:
"""여러 문서를 임베딩 (인덱싱 시 사용)"""
pass
def embed_query(self, text: str) -> list[float]:
"""단일 질의를 임베딩 (검색 시 사용)"""
pass
- 핵심 포인트: `embed_documents`와 `embed_query`가 분리된 이유는, 일부 임베딩 모델(예: Voyage AI)이 문서 임베딩과 질의 임베딩에 서로 다른 전략을 사용하기 때문이다. 문서는 정보를 표현하는 것이 목적이고, 질의는 정보를 찾는 것이 목적이므로 최적화 방향이 다를 수 있다.
- 시나리오: 도메인 용어를 강화하고 캐싱을 적용한 임베딩
import hashlib
import json
import os
from langchain_core.embeddings import Embeddings
from langchain_openai import OpenAIEmbeddings
class DomainEnhancedEmbeddings(Embeddings):
"""
도메인 전문 용어를 확장하고, 로컬 캐싱을 적용하여
비용을 절감하는 커스텀 임베딩 래퍼.
Embeddings 인터페이스를 구현하여 Chroma 등 벡터 DB와 호환.
동작:
1. 입력 텍스트에 도메인 용어 확장을 적용
2. 캐시에 존재하면 캐시된 벡터 반환 (API 호출 생략)
3. 캐시에 없으면 OpenAI API 호출 후 캐시 저장
비용 절감 효과:
- 동일 텍스트 재임베딩 시 API 비용 0원
- 문서 인덱스 재구축 시 변경되지 않은 청크는 캐시에서 즉시 로딩
"""
def __init__(self, domain_terms: dict = None, cache_dir: str = "./embedding_cache"):
# base_embedding: 실제 임베딩을 수행하는 OpenAI 모델
# text-embedding-3-large: 3072차원, 현재 가장 높은 품질의 OpenAI 임베딩
self.base_embedding = OpenAIEmbeddings(model="text-embedding-3-large")
# domain_terms: 도메인 용어 사전 {일상어: 전문어 확장}
self.domain_terms = domain_terms or {}
# cache_dir: 임베딩 캐시를 저장할 디렉토리
self.cache_dir = cache_dir
os.makedirs(cache_dir, exist_ok=True)
# 기존 캐시 로딩 (이전 실행에서 저장한 임베딩 벡터 재사용)
self._load_cache()
def _load_cache(self):
"""디스크에서 캐시 파일 로딩 (JSON 형식)"""
cache_file = os.path.join(self.cache_dir, "embedding_cache.json")
if os.path.exists(cache_file):
with open(cache_file, "r") as f:
self.cache = json.load(f)
else:
self.cache = {}
def _save_cache(self):
"""현재 캐시를 디스크에 저장 (프로그램 종료 후에도 유지)"""
cache_file = os.path.join(self.cache_dir, "embedding_cache.json")
with open(cache_file, "w") as f:
json.dump(self.cache, f)
def _get_cache_key(self, text: str) -> str:
"""텍스트의 SHA-256 해시를 캐시 키로 사용
(동일 텍스트 → 동일 해시 → 캐시 적중)"""
return hashlib.sha256(text.encode()).hexdigest()
def _enhance_with_domain(self, text: str) -> str:
"""도메인 용어 확장: 텍스트에 포함된 일상어에 전문어를 추가
예: "직장인 세금" → "직장인 세금 (거주자, 근로소득자) (소득세, 과세)"
"""
enhanced = text
for term, expansion in self.domain_terms.items():
if term in enhanced:
# 원본 텍스트 뒤에 괄호로 전문어를 추가
# 원본을 변경하지 않으므로 의미 손실 없음
enhanced += f" ({expansion})"
return enhanced
def embed_documents(self, texts: list[str]) -> list[list[float]]:
"""문서 임베딩 (캐시 활용으로 비용 절감)"""
results = []
uncached_texts = [] # API 호출이 필요한 텍스트
uncached_indices = [] # 해당 텍스트의 원래 인덱스
for i, text in enumerate(texts):
# 도메인 용어 확장 적용
enhanced = self._enhance_with_domain(text)
# 확장된 텍스트로 캐시 키 생성
key = self._get_cache_key(enhanced)
if key in self.cache:
# 캐시 적중: API 호출 없이 저장된 벡터 사용
results.append(self.cache[key])
else:
# 캐시 미스: API 호출 대상으로 마킹
results.append(None)
uncached_texts.append(enhanced)
uncached_indices.append(i)
# 캐시되지 않은 텍스트만 모아서 한 번에 API 호출 (배치 효율)
if uncached_texts:
new_embeddings = self.base_embedding.embed_documents(uncached_texts)
for idx, emb in zip(uncached_indices, new_embeddings):
enhanced = self._enhance_with_domain(texts[idx])
key = self._get_cache_key(enhanced)
# 새로 생성된 임베딩을 캐시에 저장
self.cache[key] = emb
results[idx] = emb
# 변경된 캐시를 디스크에 저장
self._save_cache()
print(f" [임베딩] 캐시 적중: {len(texts)-len(uncached_texts)}/{len(texts)}, "
f"신규 API 호출: {len(uncached_texts)}")
return results
def embed_query(self, text: str) -> list[float]:
"""질의 임베딩 (도메인 확장 적용, 캐시 미적용)
질의는 매번 다르고 실시간 응답이 필요하므로 캐시를 적용하지 않음"""
enhanced = self._enhance_with_domain(text)
return self.base_embedding.embed_query(enhanced)
- 파라미터 정의 기준: `embed_query`에서 캐시를 적용하지 않는 이유는 두 가지이다. (1) 질의는 사용자마다 다르므로 캐시 적중률이 낮고, (2) 질의 임베딩은 한 번에 하나만 수행하므로 비용이 미미하다. 반면 `embed_documents`는 수백~수천 개의 문서를 처리하므로 캐시 효과가 크다.
- 캐싱 전략 비교
| 캐싱 방식 | 장점 | 단점 | 적합 상황 |
| JSON 파일 (현재 구현) | 구현 가장 간단 (3줄 코드), 디버깅 용이 (파일 직접 확인), 백업 쉬움 | 대용량(10만+) 시 파일 I/O 느림, 동시성 문제, 메모리 전체 로딩 | 개발/테스트, 소규모 문서(1만 건 이하), 단일 인스턴스 |
| SQLite | 대용량(100만+) 지원, 부분 로딩(SELECT LIMIT), 트랜잭션/ACID, 단일 파일 | 구현 복잡도 중간 (SQL 쿼리 작성), 쓰기 잠금 발생 가능 | 중대형 문서셋(10만~100만), 단일 서버 프로덕션, 데이터 무결성 중요 |
| Redis | 초고속(<1ms), 분산 클러스터, TTL 자동 만료, 메모리 기반 | Redis 서버 인프라/관리 비용, 영속성 설정 필요(RDB/AOF) | 멀티 서버/컨테이너 환경, 고트래픽(QPS 1K+), 실시간 서비스 |
| LRU 메모리 캐시 | 가장 빠름 (인메모리 O(1)), LangChain InMemoryCache 바로 사용 | 프로세스 종료 시 완전 소멸, 메모리 제한(수백만 벡터 한계) | 단일 세션 내 반복 쿼리, 임베딩 서버 프록시, 메모리 여유 충분 |
- 실무 권장: 프로토타입에서는 JSON 파일 캐시로 시작하고, 문서가 10만 건을 넘거나 멀티 프로세스 환경이면 SQLite 또는 Redis로 전환하라. 캐시 키에 임베딩 모델명을 포함시키면(예: `f"{model_name}:{text_hash}"`), 모델 변경 시 캐시 충돌을 방지할 수 있다.
- 사용 예시:
# 도메인 용어 사전 정의 (일상어 → 전문어 매핑)
domain_terms = {
"직장인": "거주자, 근로소득자",
"월급": "근로소득, 급여소득",
"세금": "소득세, 과세",
"연말정산": "근로소득세 연말정산, 소득공제",
}
# 커스텀 임베딩 초기화
embedding = DomainEnhancedEmbeddings(domain_terms=domain_terms)
# Chroma와 함께 사용 (LangChain의 Embeddings 인터페이스 호환)
db = Chroma.from_documents(
documents=chunks, # 청킹된 문서 리스트
embedding=embedding, # 커스텀 임베딩 전달
persist_directory="./chroma_enhanced" # 벡터 DB 저장 경로
)
# 첫 실행: 모든 문서에 대해 API 호출 → 캐시에 저장
# → [임베딩] 캐시 적중: 0/100, 신규 API 호출: 100
# 두 번째 실행 (동일 문서): 캐시에서 즉시 로딩 → API 비용 0원
# → [임베딩] 캐시 적중: 100/100, 신규 API 호출: 0
4. 벡터 DB 관리 커스텀
- 벡터 DB 관리 커스텀이 필요한 상황
- LangChain의 기본 `Chroma.from_documents()`는 매번 전체 문서를 새로 인덱싱한다. 문서가 수천 건일 때 매번 전체를 재처리하면 임베딩 API 비용과 시간이 크게 증가한다. 또한 문서 변경 이력을 추적할 수 없어 "어떤 문서가 언제 추가/삭제되었는지" 파악이 불가능하다.
- 증분 업데이트의 필요성
[전체 재인덱싱 방식]
문서 100개 중 1개 변경 → 100개 전부 재임베딩 → 비용: $0.50, 시간: 2분
[증분 업데이트 방식]
문서 100개 중 1개 변경 → 변경된 1개만 재임베딩 → 비용: $0.005, 시간: 1초
- 핵심 포인트: 증분 업데이트의 핵심은 "변경 감지"이다. 문서의 해시값(SHA-256)을 기록해두고, 새로 로딩한 문서의 해시와 비교하여 추가/삭제/변경 없음을 판별한다. 이는 Git이 파일 변경을 감지하는 원리와 동일하다.
- 관련 개념: 문서 해시 기반 변경 감지
- 문서의 해시는 `page_content`와 `metadata`를 결합한 문자열의 SHA-256 해시로 생성한다. 이렇게 하면:
- 텍스트 내용이 변경되면 해시가 바뀜 → 변경 감지
- 메타데이터가 변경되어도 해시가 바뀜 → 변경 감지
- 아무것도 변경되지 않으면 해시 동일 → 변경 없음 확인
- 시나리오: 문서 버전 관리와 증분 업데이트를 지원하는 벡터 DB 매니저
import hashlib
import json
import os
from datetime import datetime
from langchain_community.vectorstores import Chroma
from langchain_core.documents import Document
class VectorDBManager:
"""
문서 버전 관리, 증분 업데이트, 롤백을 지원하는
벡터 DB 관리 래퍼.
동작:
1. 문서 해시를 기반으로 변경 감지
2. 변경된 문서만 증분 업데이트
3. 버전별 상태를 기록하여 롤백 지원
내장 Chroma.from_documents와의 차이점:
- 변경된 문서만 처리 (비용/시간 절감)
- 업데이트 이력 추적 (버전 관리)
- 삭제된 문서 자동 제거
"""
def __init__(self, persist_dir: str, embedding, collection_name: str = "default"):
# persist_dir: 벡터 DB와 버전 정보를 저장할 디렉토리
self.persist_dir = persist_dir
# embedding: 임베딩 모델 (DomainEnhancedEmbeddings 등)
self.embedding = embedding
# collection_name: Chroma 컬렉션 이름 (용도별 분리 가능)
self.collection_name = collection_name
# 버전 정보 파일 경로
self.version_file = os.path.join(persist_dir, "versions.json")
# 기존 버전 정보 로딩
self._load_versions()
def _load_versions(self):
"""버전 정보 파일 로딩 (이전 업데이트 이력 복원)"""
if os.path.exists(self.version_file):
with open(self.version_file, "r") as f:
self.versions = json.load(f)
else:
# 초기 상태: 버전 0, 문서 없음, 이력 없음
self.versions = {"current_version": 0, "documents": {}, "history": []}
def _save_versions(self):
"""버전 정보를 디스크에 저장"""
os.makedirs(os.path.dirname(self.version_file) or ".", exist_ok=True)
with open(self.version_file, "w") as f:
json.dump(self.versions, f, ensure_ascii=False, indent=2)
def _doc_hash(self, doc: Document) -> str:
"""문서의 고유 해시 생성 (내용 + 메타데이터 기반)"""
# page_content와 metadata를 결합하여 해시 생성
# sort_keys=True: 메타데이터 키 순서에 관계없이 동일한 해시 보장
content = doc.page_content + json.dumps(doc.metadata, sort_keys=True, ensure_ascii=False)
return hashlib.sha256(content.encode()).hexdigest()
def get_db(self) -> Chroma:
"""현재 벡터 DB 인스턴스 반환"""
return Chroma(
persist_directory=self.persist_dir,
embedding_function=self.embedding,
collection_name=self.collection_name
)
def update(self, documents: list[Document]) -> dict:
"""증분 업데이트 - 변경된 문서만 처리
처리 흐름:
1. 새 문서들의 해시 계산
2. 이전 버전의 해시와 비교하여 추가/삭제/유지 분류
3. 삭제된 문서는 벡터 DB에서 제거
4. 새로 추가된 문서만 벡터 DB에 추가
5. 버전 정보 업데이트 및 이력 기록
"""
# 새 문서들의 해시 맵 생성 {해시: Document}
new_hashes = {self._doc_hash(doc): doc for doc in documents}
# 이전 버전에 있던 문서 해시들
old_hashes = set(self.versions["documents"].keys())
new_hash_set = set(new_hashes.keys())
# 집합 연산으로 변경 사항 분류
added = new_hash_set - old_hashes # 새로 추가된 문서 (신규 해시)
removed = old_hashes - new_hash_set # 삭제된 문서 (이전에만 존재)
unchanged = old_hashes & new_hash_set # 변경 없는 문서 (양쪽 모두 존재)
db = self.get_db()
# 삭제된 문서를 벡터 DB에서 제거
if removed:
ids_to_remove = [self.versions["documents"][h]["id"] for h in removed
if h in self.versions["documents"]]
if ids_to_remove:
db._collection.delete(ids=ids_to_remove)
# 새로 추가된 문서만 벡터 DB에 추가 (임베딩 API 호출은 여기서만 발생)
added_docs = [new_hashes[h] for h in added]
if added_docs:
db.add_documents(added_docs)
# 버전 정보 업데이트
self.versions["current_version"] += 1
self.versions["documents"] = {
h: {"id": h[:8], "source": new_hashes[h].metadata.get("source", "")}
for h in new_hash_set
}
# 업데이트 이력 기록 (감사 추적용)
self.versions["history"].append({
"version": self.versions["current_version"],
"timestamp": datetime.now().isoformat(),
"added": len(added),
"removed": len(removed),
"unchanged": len(unchanged),
"total": len(documents)
})
self._save_versions()
return {
"version": self.versions["current_version"],
"added": len(added),
"removed": len(removed),
"unchanged": len(unchanged)
}
def get_status(self) -> dict:
"""현재 DB 상태 조회 (모니터링용)"""
return {
"version": self.versions["current_version"],
"total_documents": len(self.versions["documents"]),
"history": self.versions["history"][-5:] # 최근 5개 이력만 반환
}
- 파라미터 정의 기준: `collection_name`은 동일 벡터 DB 내에서 용도별로 컬렉션을 분리할 때 사용한다. 예를 들어 `"tax_docs"`, `"hr_docs"`, `"policy_docs"` 등으로 나누면 각 컬렉션을 독립적으로 업데이트할 수 있다. `history`에서 최근 5개만 반환하는 것은 조회 성능을 위한 것이며, 전체 이력은 `self.versions["history"]`로 접근 가능하다.
- 증분 업데이트의 내부 동작 시각화
[버전 1: 초기 인덱싱]
문서: {A, B, C} → 해시: {ha, hb, hc}
이전 해시: {}
추가: {ha, hb, hc} (3개)
삭제: {} (0개)
→ 결과: version=1, added=3, removed=0, unchanged=0
[버전 2: 문서 D 추가, 문서 B 삭제]
문서: {A, C, D} → 해시: {ha, hc, hd}
이전 해시: {ha, hb, hc}
추가: {hd} (1개 - D만 새로 임베딩)
삭제: {hb} (1개 - B를 벡터 DB에서 제거)
유지: {ha, hc} (2개 - A, C는 처리하지 않음)
→ 결과: version=2, added=1, removed=1, unchanged=2
- 사용 예시:
# VectorDBManager 초기화
manager = VectorDBManager(
persist_dir="./chroma_managed", # 벡터 DB 저장 경로
embedding=embedding, # 커스텀 임베딩 인스턴스
collection_name="tax_docs" # 컬렉션 이름
)
# 첫 업데이트 (전체 문서 인덱싱)
result = manager.update(documents)
print(result) # {'version': 1, 'added': 50, 'removed': 0, 'unchanged': 0}
# 문서 변경 후 증분 업데이트 (변경된 것만 처리)
updated_documents = documents + [new_doc] # 새 문서 1개 추가
result = manager.update(updated_documents)
print(result) # {'version': 2, 'added': 1, 'removed': 0, 'unchanged': 50}
# 상태 확인 (업데이트 이력 조회)
status = manager.get_status()
print(f"현재 버전: {status['version']}")
print(f"총 문서 수: {status['total_documents']}")
print(f"최근 이력: {status['history']}")
- 실무 권장: 프로덕션 환경에서는 `update()` 호출 전에 반드시 백업을 수행하라. `shutil.copytree(persist_dir, f"{persist_dir}_backup_{datetime.now().strftime('%Y%m%d')}")`로 간단히 백업할 수 있다. 또한 `_doc_hash`에서 metadata를 포함하므로, 메타데이터만 변경해도 해시가 바뀌어 재인덱싱된다. 내용 변경만 추적하고 싶다면 해시 계산에서 metadata를 제외할 수 있다.
5. 검색 커스텀
- 검색 커스텀이 필요한 상황
- 검색(Retrieval)은 RAG 파이프라인에서 답변 품질에 가장 직접적인 영향을 미치는 단계이다. LangChain의 기본 `db.as_retriever()`는 단일 검색 전략(similarity 또는 mmr)을 사용하지만, 실무에서는 질의의 유형에 따라 최적 검색 전략이 다르다.
- 질의 유형별 최적 검색 전략
| 질의 유형 | 예시 | 최적 k값 | 최적 검색 타입 | 재순위 필요 여부 |
| 사실 확인 | "소득세율은?", "2026 법인세율 몇 %?" | 2~3 | similarity_topk | 불필요 (단일 정확 문서 우선) |
| 정의 | "과세표준이란?", "조세공제 정의" | 2~3 | similarity_topk | 불필요 (정의는 첫 번째 문서 충분) |
| 비교 | "소득세 vs 법인세", "근로소득 vs 사업소득 과세 차이" | 5~6 | mmr (다양성 확보) | 필요 (관련 문서들 균형 확보) |
| 절차 | "연말정산 신고 방법", "환급 절차 단계" | 4~5 | similarity_topk | 필요 (순서/단계별 문서 재정렬) |
| 계산 | "세액 얼마? (연봉 5천)", "공제 후 과세표준 계산" | 3~4 | similarity_hybrid (BM25 혼합) | 불필요 (수식/예시 문서 우선) |
| 복합 | "세금 줄이는 법과 주의사항", "절세 + 법적 리스크" | 6~8 | mmr + hybrid | 필요 (다양 주제 균형 + rerank 필수) |
- 핵심 포인트: 비교(comparison)와 복합(complex) 질의에서 MMR(Maximal Marginal Relevance)을 사용하는 이유는 "다양성" 때문이다. "소득세와 법인세 비교"라는 질의에 대해 similarity 검색을 하면 소득세 관련 문서만 상위에 올 수 있다. MMR은 유사도가 높으면서도 서로 다른 내용의 문서를 선택하므로, 소득세와 법인세 양쪽의 문서를 고르게 검색할 수 있다.
- 관련 개념: Similarity Search vs MMR
[Similarity Search]
질의: "소득세와 법인세 차이"
결과: 소득세 문서1 (유사도 0.92) ← 소득세 내용만 치우침
소득세 문서2 (유사도 0.90)
소득세 문서3 (유사도 0.88)
법인세 문서1 (유사도 0.85)
[MMR (lambda_mult=0.5)]
질의: "소득세와 법인세 차이"
결과: 소득세 문서1 (유사도 0.92, 다양성 높음) ← 양쪽 균형
법인세 문서1 (유사도 0.85, 다양성 높음)
소득세 비교 문서 (유사도 0.87, 다양성 높음)
법인세 계산 문서 (유사도 0.83, 다양성 높음)
- 시나리오: 질의 의도를 분석하고, 다중 전략으로 검색 후 재순위를 매기는 지능형 검색기
from langchain_core.documents import Document
from langchain_openai import ChatOpenAI
from langchain_core.runnables import RunnableLambda
class IntelligentRetriever:
"""
질의 의도 분석 → 전략 선택 → 다중 검색 → 재순위 매기기를
자동 수행하는 지능형 검색기.
동작:
1. LLM으로 질의 의도 분석 (사실 확인, 비교, 절차, 계산 등)
2. 의도에 따라 검색 전략 결정 (k값, 검색 타입, 필터)
3. 복수 전략으로 검색 후 결과 통합
4. LLM으로 최종 재순위
내장 retriever와의 차이점:
- 질의별로 동적으로 검색 전략 변경
- 의도에 따라 k값, search_type 자동 최적화
- LLM 기반 재순위로 최종 검색 품질 향상
"""
def __init__(self, vector_db, llm=None):
# vector_db: Chroma 등 벡터 DB 인스턴스
self.db = vector_db
# llm: 의도 분석과 재순위에 사용할 LLM (비용 절감을 위해 mini 모델)
self.llm = llm or ChatOpenAI(model="gpt-4o-mini", temperature=0)
def retrieve(self, query: str) -> list[Document]:
"""메인 검색 메서드: 의도 분석 → 전략 검색 → 재순위"""
# 1. 질의 의도 분석
intent = self._analyze_intent(query)
print(f" [검색] 의도: {intent['type']}, 전략: {intent['strategy']}")
# 2. 분석된 전략에 따라 검색 수행
candidates = self._search_by_strategy(query, intent)
# 3. 후보가 충분하면 LLM 재순위 수행
if len(candidates) > 3:
reranked = self._rerank(query, candidates)
else:
# 후보가 3개 이하면 재순위 의미 없음 (비용 절감)
reranked = candidates
return reranked
def _analyze_intent(self, query: str) -> dict:
"""LLM으로 질의 의도를 분석하여 검색 전략 결정
에러 핸들링:
- LLM 호출 실패(네트워크, API 키, Rate Limit 등) 시 "complex" 전략으로 폴백
- LLM이 예상 외 응답을 반환해도 strategy_map.get()의 기본값으로 안전하게 처리
- 이를 통해 의도 분석 실패가 전체 검색 프로세스를 중단하지 않음
"""
import logging
logger = logging.getLogger(__name__)
# 의도별 최적 검색 전략 매핑
strategy_map = {
"fact_check": {
"k": 3, # 소수의 정확한 문서면 충분
"search_type": "similarity", # 단순 유사도 검색
"needs_rerank": False # 재순위 불필요
},
"comparison": {
"k": 6, # 비교 대상 양쪽의 문서 필요
"search_type": "mmr", # 다양성 확보 (양쪽 균형)
"needs_rerank": True # 관련도 재정렬 필요
},
"procedure": {
"k": 5, # 절차의 전체 단계를 커버
"search_type": "similarity",
"needs_rerank": True # 순서대로 정렬 필요
},
"calculation": {
"k": 4, # 계산식과 예시 문서 필요
"search_type": "similarity",
"needs_rerank": False
},
"definition": {
"k": 2, # 정의는 1~2개 문서면 충분
"search_type": "similarity",
"needs_rerank": False
},
"complex": {
"k": 8, # 복합 질의는 넓게 검색
"search_type": "mmr", # 다양한 관점의 문서 필요
"needs_rerank": True
},
}
try:
response = self.llm.invoke(
f"다음 질문의 의도를 분석하세요.\n"
f"유형: fact_check(사실확인), comparison(비교), procedure(절차), "
f"calculation(계산), definition(정의), complex(복합)\n"
f"질문: {query}\n"
f"유형 하나만 답하세요:"
)
intent_type = response.content.strip().lower()
# LLM 응답이 유효한 유형인지 검증
if intent_type not in strategy_map:
logger.warning(
f"LLM이 예상 외 의도를 반환: '{intent_type}' → 'complex'로 폴백"
)
intent_type = "complex"
except Exception as e:
# LLM 호출 실패 시 가장 넓은 검색 전략(complex)으로 폴백
# → 검색 범위가 넓어 정보 누락 위험이 가장 낮음
logger.error(f"의도 분석 LLM 호출 실패: {e} → 'complex'로 폴백")
intent_type = "complex"
strategy = strategy_map.get(intent_type, strategy_map["complex"])
return {"type": intent_type, "strategy": strategy}
def _search_by_strategy(self, query: str, intent: dict) -> list[Document]:
"""의도에 맞는 전략으로 벡터 DB 검색 수행"""
strategy = intent["strategy"]
search_kwargs = {"k": strategy["k"]}
if strategy["search_type"] == "mmr":
# MMR 검색 시 추가 파라미터 설정
search_kwargs["fetch_k"] = strategy["k"] * 3 # 1차 후보 수 (k의 3배)
search_kwargs["lambda_mult"] = 0.5 # 유사도(1.0)와 다양성(0.0)의 균형
# LangChain의 as_retriever로 검색기 생성
retriever = self.db.as_retriever(
search_type=strategy["search_type"],
search_kwargs=search_kwargs
)
return retriever.invoke(query)
def _rerank(self, query: str, documents: list[Document], top_k: int = 4) -> list[Document]:
"""LLM 기반 재순위: 검색된 문서를 질의 관련도 순으로 재정렬
벡터 검색의 유사도 점수는 "의미적 유사성"을 기준으로 하지만,
LLM 재순위는 "질의에 대한 답변 적합성"을 기준으로 하므로
최종 검색 품질이 향상된다.
"""
# 각 문서의 미리보기 생성 (비용 절감을 위해 300자로 제한)
doc_summaries = []
for i, doc in enumerate(documents):
preview = doc.page_content[:300]
doc_summaries.append(f"[문서{i}] {preview}")
response = self.llm.invoke(
f"다음 질문에 가장 관련 있는 문서 순서대로 번호를 나열하세요.\n"
f"질문: {query}\n\n"
f"{'\\n'.join(doc_summaries)}\n\n"
f"상위 {top_k}개 문서 번호 (쉼표 구분):"
)
try:
# LLM 응답에서 문서 번호 파싱
indices = [int(x.strip()) for x in response.content.strip().split(",")]
return [documents[i] for i in indices[:top_k] if i < len(documents)]
except (ValueError, IndexError):
# 파싱 실패 시 원래 순서 유지 (안전한 폴백)
return documents[:top_k]
- 파라미터 정의 기준: `_rerank`에서 `top_k=4`인 이유는 LLM의 컨텍스트 윈도우를 효율적으로 활용하기 위함이다. 문서가 너무 많으면 LLM 프롬프트가 길어지고, 너무 적으면 정보가 부족하다. 일반적으로 4~5개의 컨텍스트 문서가 답변 품질과 비용의 균형점이다. `preview = doc.page_content[:300]`으로 제한하는 것도 같은 이유이다.
- LLM 재순위 vs Cross-Encoder 재순위
| 재순위 방식 | 장점 | 단점 | 비용 | 적합 상황 |
| LLM 재순위 (현재 구현) | 구현 매우 간단 (LangChain 프롬프트 10줄), 자연어 이해 우수, 즉시 사용 가능 | LLM API 호출 지연 (2-5초/K=5), 토큰 비용 ($0.01-0.03/쿼리), 배치 미지원 | 중간 (gpt-4o-mini 기준 $0.15/M input) | 프로토타입/테스트, 소량 고품질 질의, 빠른 검증 |
| Cross-Encoder | 초고속 (로컬 GPU 50-200ms/K=10), 오프라인 실행 가능, 정밀도 높음 (NDCG+0.05) | 모델 다운로드/서빙 필요 (bge-reranker-large), 한국어 성능 모델별 편차 | 낮음 (로컬 무료, 배치 효율) | 대량 프로덕션, 실시간 요구 (RTT<1s), 비용 최적화 |
| Cohere Rerank | SOTA 품질 (다국어/법률 특화), API 즉시 사용, 배치 지원 | 외부 API 의존성, 사용량 기반 비용 ($2-5/M 쿼리) | 중간 ($0.10/쿼리 K=10 기준) | 다국어/고트래픽 프로덕션, 품질 최우선, 인프라 최소화 |
- 실무 권장: 프로토타입에서는 본 구현처럼 LLM 기반 재순위를 사용하되, 프로덕션 전환 시 `sentence-transformers`의 Cross-Encoder 모델(예: `cross-encoder/ms-marco-MiniLM-L-6-v2`)이나 Cohere Rerank API로 교체하는 것이 비용-성능 면에서 효율적이다.
- 사용 예시:
# 지능형 검색기 초기화
smart_retriever = IntelligentRetriever(vector_db=db, llm=llm)
# LCEL 체인에 통합 (LangChain Expression Language)
# RunnableLambda: 일반 Python 함수를 LCEL 체인의 구성요소로 변환
rag_chain = (
{
"context": RunnableLambda(lambda q: smart_retriever.retrieve(q)) | format_docs,
"question": RunnablePassthrough() # 질의를 그대로 전달
}
| prompt
| llm
| StrOutputParser()
)
# 비교 질의 실행 → 자동으로 MMR + 재순위 적용
result = rag_chain.invoke("근로소득세와 종합소득세의 차이점은?")
# → [검색] 의도: comparison, 전략: {'k': 6, 'search_type': 'mmr', ...}
# → 소득세, 종합소득세 양쪽의 문서가 균형 있게 검색됨
6. 질의 변환 커스텀
- 질의 변환 커스텀이 필요한 상황
- 사용자가 입력하는 질의는 일상적 표현, 축약어, 복합적 의도를 포함하는 경우가 많다. 이런 질의를 그대로 벡터 검색에 사용하면 검색 품질이 떨어진다. 질의 변환은 사용자의 원본 질의를 검색에 최적화된 형태로 변환하여 검색 재현율(recall)과 정밀도(precision)를 높이는 핵심 기법이다.
- 질의 변환 기법 비교
| 기법 | 설명 | 효과 | 비용 |
| 도메인 사전 변환 | 사용자 질의의 일상어를 세법 전문 용어로 규칙 기반 치환 ("세금" → "조세", "급여" → "근로소득") | 벡터 공간에서의 의미 매칭 정확도 15-25% 향상, 법률 용어 검색 정밀도 ↑ | 낮음 (JSON 사전 로딩 O(1), 사전 구축 초기 비용만) |
| 동의어 확장 | 질의 키워드에 LLM으로 동의어/관련어 자동 추가 ("연말정산" → "연말결산, year-end settlement") | 검색 재현율(recall) 20-30% 향상, 다양한 표현 커버 | 중간 (gpt-4o-mini 1회 호출, 100-200토큰 $0.001) |
| 질의 분해 | 복합 질의를 독립 하위 질의로 분리 후 병렬 검색 ("세금 줄이는 법 + 주의사항" → 2개 쿼리) | 각 하위 주제별 정밀 검색, 종합 답변 품질 ↑ | 중간 (분해 LLM 1회 + 검색 2배, 전체 20% ↑) |
| HyDE (Hypothetical Document Embeddings) | 질의에 대한 가상 답변 문서 생성 후 해당 문서 임베딩으로 실제 문서 검색 | dense retrieval 성능 10-20% 향상, 질의-문서 임베딩 차원 정렬 | 높음 (답변 생성 LLM + 2회 임베딩, $0.005/쿼리) |
- 핵심 포인트: 질의 변환의 각 기법은 독립적으로 사용할 수도 있고, 본 시나리오처럼 파이프라인으로 연결하여 순차 적용할 수도 있다. 실무에서는 "사전 변환 → 분해/확장"의 2단계 파이프라인이 비용 대비 효과가 가장 좋다.
- 관련 개념: 질의 변환의 단계별 효과
[원본 질의]
"직장인 세금이랑 프리랜서 세금 차이가 뭐야?"
↓ [1단계: 사전 변환 (일상어 → 전문어)]
"근로소득자의 소득세와 사업소득자의 소득세 차이는?"
↓ [2단계: 복합 질의 분해]
하위 질의 1: "근로소득자의 소득세 계산 방법"
하위 질의 2: "사업소득자의 소득세 계산 방법"
하위 질의 3: "근로소득과 사업소득의 세금 차이"
↓ [3단계: 동의어 확장 (각 하위 질의별)]
하위 질의 1: "근로소득자의 소득세 계산 방법"
확장 1-1: "근로소득세의 과세 체계"
확장 1-2: "급여소득에 대한 세금 산정"
하위 질의 2: ...
하위 질의 3: ...
→ 최종: 9개의 검색 질의로 벡터 DB를 검색 → 중복 제거 → 종합
- 시나리오: 다단계 질의 변환 파이프라인 (사전변환 → 확장 → 분해)
class QueryTransformer:
"""
다단계 질의 변환 파이프라인.
원본 질의를 여러 단계에 걸쳐 최적화한다.
동작:
1. 도메인 사전 변환 (일상어 → 전문어)
2. 동의어 확장 (검색 범위 확장)
3. 복합 질의 분해 (필요 시)
4. 최종 검색 질의 목록 반환
LangChain MultiQueryRetriever와의 차이점:
- 도메인 사전 기반 전처리 단계 추가
- 복합 질의 판단 및 분해 로직 내장
- 확장과 분해의 순서를 제어 가능
- 최대 질의 수를 제한하여 비용 통제
"""
def __init__(self, llm, domain_dict: str = ""):
# llm: 질의 변환에 사용할 LLM
self.llm = llm
# domain_dict: 도메인 용어 사전 (일상어 → 전문어 매핑 문자열)
self.domain_dict = domain_dict
def transform(self, query: str) -> list[str]:
"""질의를 변환하여 검색 질의 목록 반환
반환 흐름:
- 단순 질의 → [변환된 질의, 확장1, 확장2] (3개)
- 복합 질의 → [하위질의1, 확장1-1, 확장1-2, 하위질의2, ...] (최대 15개)
"""
# 1단계: 도메인 사전 기반 용어 변환
transformed = self._apply_dictionary(query)
# 2단계: 복합 질의 여부 판단
if self._is_complex_query(transformed):
# 복합 질의: 분해 후 각각 확장
sub_queries = self._decompose(transformed)
all_queries = []
for sq in sub_queries:
all_queries.append(sq) # 하위 질의 자체
all_queries.extend(self._expand(sq)) # 하위 질의의 동의어 확장
return list(set(all_queries)) # 중복 제거
else:
# 단순 질의: 확장만 수행
expanded = self._expand(transformed)
return [transformed] + expanded
def _apply_dictionary(self, query: str) -> str:
"""도메인 사전 기반 용어 변환 (LLM 사용)
사전을 참고하여 일상어를 전문어로 치환하되,
원래 의미를 변경하지 않도록 LLM에 지시한다.
"""
if not self.domain_dict:
return query # 사전이 없으면 원본 반환
response = self.llm.invoke(
f"사전을 참고하여 질문의 일상 용어를 전문 용어로 변환하세요. "
f"의미를 바꾸지 마세요.\n"
f"사전:\n{self.domain_dict}\n\n"
f"질문: {query}\n변환된 질문:"
)
return response.content.strip()
def _is_complex_query(self, query: str) -> bool:
"""복합 질의 여부 판단
복합 질의의 기준:
1. 접속사/비교 표현이 포함된 경우 ("그리고", "비교", "차이" 등)
2. 질의 길이가 80자를 초과하는 경우 (긴 질의는 복합적일 가능성이 높음)
"""
indicators = ["그리고", "또한", "및", "와/과", "비교", "차이",
"각각", "모두", "여러"]
return any(ind in query for ind in indicators) or len(query) > 80
def _decompose(self, query: str) -> list[str]:
"""복합 질의를 독립적인 하위 질의들로 분해"""
response = self.llm.invoke(
f"다음 복합 질문을 독립적인 하위 질문들로 분해하세요.\n"
f"한 줄에 하나씩 작성하세요.\n\n"
f"질문: {query}\n\n하위 질문:"
)
# LLM 응답에서 불필요한 접두사 제거 (번호, 대시 등)
sub_queries = [q.strip().lstrip("- ").lstrip("0123456789. ")
for q in response.content.strip().split("\n")
if q.strip()]
return sub_queries[:5] # 최대 5개로 제한 (비용 통제)
def _expand(self, query: str) -> list[str]:
"""동의어/유사 표현으로 질의 확장
같은 의미를 다른 방식으로 표현한 질의를 생성하여
벡터 검색의 재현율(recall)을 높인다.
"""
response = self.llm.invoke(
f"다음 질문을 다른 2가지 표현으로 바꿔주세요.\n"
f"한 줄에 하나씩 작성하세요.\n\n"
f"질문: {query}\n\n다른 표현:"
)
expanded = [q.strip().lstrip("- ").lstrip("0123456789. ")
for q in response.content.strip().split("\n")
if q.strip()]
return expanded[:2] # 최대 2개로 제한 (비용 통제)
- 파라미터 정의 기준: `_decompose`에서 최대 5개, `_expand`에서 최대 2개로 제한하는 이유는 검색 비용과 품질의 균형이다. 복합 질의를 5개로 분해하고 각각 2개씩 확장하면 최대 5 + 5*2 = 15개의 검색 질의가 생긴다. 각 질의마다 벡터 DB 검색이 수행되므로, 질의 수가 많으면 검색 시간이 선형으로 증가한다. 실무에서는 10~15개가 적정 상한이다.
- 도메인 사전 설계 가이드
- 도메인 사전은 질의 변환의 핵심 자원이다. 효과적인 사전 설계를 위한 기준:
| 항목 | 설명 | 예시 |
| 일상어 → 전문어 | 사용자가 쓰는 일상 표현을 세법 문서의 공식 전문 용어로 매핑 | "직장인" → "근로소득자", "세금" → "조세", "월세" → "임대소득" |
| 축약어 → 전체명 | 자주 쓰이는 축약어/줄임말을 전체 명칭으로 자동 전환 | "연정" → "연말정산", "양도세" → "양도소득세", "부가세" → "부가가치세" |
| 유사어 통일 | 동일 개념의 다양한 일상 표현을 표준 용어로 통일 | "월급/봉급/급여" → "근로소득", "절세/세금절약" → "조세절감" |
| 오타/비표준어 | 자주 발생하는 오타나 비표준 표현을 정정 매핑 | "세금공제" → "세액공제", "소득세율" → "소득세세율", "환급금" → "환급세액" |
# 효과적인 도메인 사전 예시 (세법 도메인)
domain_dict = """직장인 → 근로소득자
프리랜서 → 사업소득자
월급 → 근로소득
세금 → 소득세
연말정산 → 근로소득세 연말정산
공제 → 소득공제, 세액공제
세금 줄이기 → 절세, 세액공제
알바 → 일용근로소득자
투잡 → 겸업소득"""
- 실무 권장: 도메인 사전은 실제 사용자 질의 로그를 분석하여 지속적으로 보강해야 한다. 검색 실패(관련 문서를 찾지 못한 경우)가 발생한 질의를 수집하고, 해당 질의에 어떤 전문어 매핑이 필요한지 분석하여 사전에 추가하는 것이 가장 효과적인 사전 개선 방법이다.
- 사용 예시:
# QueryTransformer 초기화
transformer = QueryTransformer(
llm=ChatOpenAI(model="gpt-4o-mini", temperature=0.3), # 약간의 창의성을 위해 temperature=0.3
domain_dict="직장인 → 근로소득자\n월급 → 근로소득\n세금 → 소득세"
)
# 복합 질의 변환 실행
queries = transformer.transform("직장인 세금이랑 프리랜서 세금 차이가 뭐야?")
# → ["근로소득자의 소득세와 사업소득자의 소득세 차이는?",
# "근로소득자의 소득세 계산 방법",
# "근로소득세의 과세 체계",
# "사업소득자의 소득세 계산 방법",
# "사업소득세의 과세 체계",
# "근로소득과 사업소득의 세금 비교"]
# 변환된 모든 질의로 검색 수행 후 결과 통합
all_docs = []
for q in queries:
docs = retriever.invoke(q)
all_docs.extend(docs)
# 중복 제거 (동일한 page_content를 가진 문서는 하나만 유지)
unique_docs = list({doc.page_content: doc for doc in all_docs}.values())
print(f"총 검색 질의: {len(queries)}개 → 고유 문서: {len(unique_docs)}개")
7. 프롬프트 커스텀
- 프롬프트 커스텀이 필요한 상황
- 프롬프트(Prompt)는 LLM에게 "어떻게 답변할 것인지"를 지시하는 핵심 요소이다. LangChain의 기본 `ChatPromptTemplate`은 정적인 프롬프트를 사용하지만, 실무에서는 질의 유형과 컨텍스트 상태에 따라 프롬프트를 동적으로 조립해야 최적의 답변 품질을 얻을 수 있다.
- 정적 프롬프트 vs 동적 프롬프트
| 기준 | 정적 프롬프트 | 동적 프롬프트 |
| 구성 방식 | 모든 질의에 동일한 고정 템플릿 사용 (system + {{query}} + context) | 질의 분석(LLM 라우터/regex) 후 실시간 컴포넌트 조립 (if 비교 then 표 형식 요구) |
| 답변 형식 | 항상 같은 형식 (문단/불릿) | 질의 유형별 최적 형식: 비교→표, 절차→번호 리스트, 계산→수식+결과, 정의→간결 |
| 환각 제어 | 고정된 지시어 ("컨텍스트 외 답변 금지") | 컨텍스트 품질/길이에 따라 강도 조절 ("약한 컨텍스트: 엄격 모드", 검색 적중률 기반) |
| 출처 표시 | 항상 요구하거나 항상 생략 | 선택적 요구 (복잡 질의/참조 다중 시 표시, 단순 사실은 생략 옵션) |
| 적합 상황 | 단순 Q&A, 프로토타입, 비용 최적화 | 다양한 질의 유형 처리 프로덕션, 사용자 경험 최적화, A/B 테스트 |
- 관련 개념: 프롬프트의 구성 요소
- 효과적인 RAG 프롬프트는 다음 요소들로 구성된다:
[프롬프트 구조]
1. 역할 정의 (System)
→ "당신은 세법 전문 AI 어시스턴트입니다."
2. 답변 형식 지시 (Instruction)
→ "비교 항목을 표 형태로 정리하세요." (질의 유형별 동적 변경)
3. 제약 조건 (Constraints)
→ "컨텍스트에 없는 정보는 추측하지 마세요." (환각 방지)
→ "출처를 명시하세요." (투명성)
→ "정보가 부족하면 부족하다고 명시하세요." (컨텍스트 품질 대응)
4. 컨텍스트 (Context)
→ 검색된 문서 내용 (변수로 주입)
5. 질문 (Question)
→ 사용자 질의 (변수로 주입)
- 시나리오: 동적 프롬프트 생성기 - 질의 특성에 따라 프롬프트 자동 조립
from langchain_core.prompts import ChatPromptTemplate
class DynamicPromptBuilder:
"""
질의 유형, 컨텍스트 품질, 사용자 요구에 따라
동적으로 프롬프트를 조립하는 빌더.
동작:
1. 질의 유형 감지 (설명, 비교, 계산, 절차 등)
2. 컨텍스트 품질 평가 (충분/부족)
3. 적절한 프롬프트 요소 조합
정적 ChatPromptTemplate과의 차이점:
- 질의 유형별로 다른 답변 형식 지시
- 컨텍스트 부족 시 자동으로 보수적 답변 유도
- 출처 요구 등 옵션을 동적으로 추가/제거
"""
# 프롬프트 구성 요소: 시스템 기본 역할
SYSTEM_BASE = "당신은 전문 AI 어시스턴트입니다."
# 질의 유형별 답변 형식 지시 (각 유형에 최적화된 출력 형식)
INSTRUCTION_TEMPLATES = {
"definition": "용어의 정의를 명확하고 간결하게 설명하세요.",
"comparison": "각 항목의 특징을 표 형태로 비교하세요.",
"procedure": "단계별로 번호를 매겨 절차를 설명하세요.",
"calculation": "계산 과정을 단계별로 보여주세요. 수식을 포함하세요.",
"analysis": "핵심 포인트를 분석하고 시사점을 제시하세요.",
"general": "질문에 정확하고 도움이 되도록 답변하세요."
}
# 제약 조건 (필요에 따라 조합하여 사용)
CONSTRAINTS = {
"with_source": "반드시 답변의 근거가 되는 출처를 [출처: ...] 형태로 명시하세요.",
"no_hallucination": "컨텍스트에 없는 정보는 절대 추측하지 마세요. "
"'관련 정보를 찾을 수 없습니다'라고 답하세요.",
"low_context": "참고할 정보가 제한적입니다. 가능한 범위 내에서만 답하고 "
"추가 확인이 필요한 부분을 명시하세요."
}
def __init__(self, llm):
# llm: 질의 유형 감지에 사용 (현재는 규칙 기반이므로 미사용)
self.llm = llm
def build(self, query: str, context_docs: list[Document],
require_source: bool = True) -> ChatPromptTemplate:
"""질의와 컨텍스트에 맞는 프롬프트 동적 생성
Args:
query: 사용자 질의
context_docs: 검색된 문서 리스트
require_source: 출처 표시 요구 여부 (기본값: True)
Returns:
ChatPromptTemplate: 동적으로 조립된 프롬프트
"""
# 질의 유형 감지 (키워드 기반)
query_type = self._detect_query_type(query)
# 컨텍스트 품질 평가
context_quality = self._assess_context_quality(query, context_docs)
# 프롬프트 요소를 순차적으로 조립
system_parts = [self.SYSTEM_BASE]
# 질의 유형에 맞는 답변 형식 지시 추가
system_parts.append(
self.INSTRUCTION_TEMPLATES.get(query_type, self.INSTRUCTION_TEMPLATES["general"])
)
# 출처 표시가 요구되면 해당 제약 추가
if require_source:
system_parts.append(self.CONSTRAINTS["with_source"])
# 환각 방지 제약은 항상 포함
system_parts.append(self.CONSTRAINTS["no_hallucination"])
# 컨텍스트 품질이 낮으면 보수적 답변 유도 제약 추가
if context_quality == "low":
system_parts.append(self.CONSTRAINTS["low_context"])
# 모든 구성 요소를 줄바꿈으로 연결하여 시스템 메시지 생성
system_message = "\n".join(system_parts)
return ChatPromptTemplate.from_messages([
("system", system_message),
("human", "컨텍스트:\n{context}\n\n질문: {question}")
])
def _detect_query_type(self, query: str) -> str:
"""키워드 기반 질의 유형 감지
규칙 기반으로 동작하므로 LLM 호출 비용 없음.
더 정확한 감지가 필요하면 LLM 기반으로 교체 가능.
"""
type_keywords = {
"definition": ["이란", "무엇", "뜻", "정의", "개념"],
"comparison": ["차이", "비교", "vs", "다른점", "구분"],
"procedure": ["방법", "절차", "과정", "어떻게", "순서"],
"calculation": ["계산", "얼마", "금액", "세액", "세율"],
"analysis": ["분석", "영향", "효과", "시사점", "평가"],
}
for qtype, keywords in type_keywords.items():
if any(kw in query for kw in keywords):
return qtype
return "general"
def _assess_context_quality(self, query: str, docs: list[Document]) -> str:
"""컨텍스트 품질 평가
검색 결과의 양과 질을 기준으로 평가:
- none: 검색 결과 없음 (답변 불가 가능성 높음)
- low: 검색 결과가 빈약함 (보수적 답변 필요)
- sufficient: 충분한 검색 결과 (정상 답변 가능)
"""
if not docs:
return "none"
# 전체 컨텍스트 길이로 품질 판단
total_length = sum(len(d.page_content) for d in docs)
if total_length < 200:
return "low" # 200자 미만이면 정보 부족
return "sufficient"
- 파라미터 정의 기준: `_assess_context_quality`에서 200자를 기준으로 사용하는 이유는, 일반적으로 200자 미만의 컨텍스트로는 의미 있는 답변을 생성하기 어렵기 때문이다. 이 임계값은 도메인에 따라 조정 가능하다. 예를 들어 법률 문서는 조항 하나가 200자 이상일 수 있으므로 임계값을 높일 수 있다.
- 동적 프롬프트 조립 과정 예시
[질의] "근로소득세와 종합소득세 차이점은?"
1. 유형 감지: "차이" 키워드 → comparison
2. 컨텍스트 평가: 문서 4개, 총 3200자 → sufficient
3. 출처 요구: True
[조립된 프롬프트]
System:
당신은 전문 AI 어시스턴트입니다.
각 항목의 특징을 표 형태로 비교하세요. ← comparison 유형
반드시 답변의 근거가 되는 출처를 [출처: ...] 형태로 명시하세요. ← 출처 요구
컨텍스트에 없는 정보는 절대 추측하지 마세요. ← 환각 방지 (항상 포함)
(low_context 제약은 컨텍스트가 sufficient이므로 추가되지 않음)
- 실무 권장: 프롬프트 구성 요소를 딕셔너리로 관리하면, 도메인별로 다른 제약 조건 세트를 쉽게 전환할 수 있다. 예를 들어 의료 도메인에서는 `"disclaimer": "의학적 조언이 아님을 명시하세요."` 같은 제약을 추가할 수 있다. 또한 A/B 테스트를 통해 어떤 프롬프트 조합이 가장 좋은 답변 품질을 내는지 실험하는 것이 중요하다.
- 사용 예시:
# DynamicPromptBuilder 초기화
prompt_builder = DynamicPromptBuilder(llm=llm)
# 검색 수행
docs = retriever.invoke("근로소득세와 종합소득세 차이")
# 동적 프롬프트 생성 (질의 유형과 컨텍스트 품질에 맞게 자동 조립)
dynamic_prompt = prompt_builder.build(
query="근로소득세와 종합소득세 차이",
context_docs=docs,
require_source=True # 출처 표시 요구
)
# 체인 구성 및 실행
chain = dynamic_prompt | llm | StrOutputParser()
result = chain.invoke({
"context": "\n".join(d.page_content for d in docs),
"question": "근로소득세와 종합소득세 차이"
})
# → 비교 유형이 감지되어 표 형태로 답변이 생성됨
8. 답변 후처리 커스텀
- 답변 후처리 커스텀이 필요한 상황
- LLM이 생성한 답변은 항상 정확하지 않다. RAG에서 컨텍스트를 제공하더라도 LLM이 컨텍스트에 없는 내용을 추가(환각)하거나, 질문에 완전히 답하지 못하는 경우가 발생한다. 답변 후처리는 이러한 문제를 자동으로 감지하고 보정하는 마지막 품질 관리 단계이다.
- 답변 후처리가 해결하는 문제
| 문제 | 설명 | 후처리 해결 방식 |
| 환각 (Hallucination) | 컨텍스트에 없는 사실/수치를 생성하여 신뢰성 저하 | LLM Judge로 faithfulness 검증 (score < 0.9 시 재생성 or "확인 불가" 반환), 컨텍스트 주장 추출 후 cross-check |
| 불완전 답변 | 질문의 핵심 부분 누락 또는 일부만 다룸 | answer_relevancy 체크 + 질문 키워드 커버리지 확인 (<80% 시 후속 질의 생성 후 병합), 완전성 점수화 |
| 출처 누락 | 답변의 사실 근거 문서/위치 미표시로 투명성 부족 | 컨텍스트 청크별 인덱스/메타데이터 자동 삽입 ("[출처: 문서1, 청크3]"), citation 링크 생성 |
| 불일치 형식 | 요청 형식(표/리스트) 미준수 또는 스타일 불균일 | 출력 파싱 + 표준 템플릿 재포맷 (markdown 표 변환, 번호 리스트 정규화), 정규식/LLM 구조화 |
- 관련 개념: 환각 검증의 두 가지 기준
- LLM 답변의 품질을 검증할 때 두 가지 기준을 사용한다:
[충실성 (Groundedness/Faithfulness)]
"답변이 컨텍스트에 근거하는가?"
→ 컨텍스트에 없는 내용을 만들어낸다면 환각
[관련성 (Relevance/Completeness)]
"답변이 질문에 완전히 답하는가?"
→ 질문의 핵심을 빠뜨린다면 불완전 답변
- 시나리오: 답변 검증, 포맷팅, 출처 추가를 자동 수행하는 후처리기
import re
class AnswerPostProcessor:
"""
생성된 답변의 품질을 자동 검증하고,
일관된 포맷으로 변환하는 후처리기.
동작:
1. 환각 체크 (컨텍스트에 근거하지 않는 주장 식별)
2. 불완전 답변 보완 요청
3. 출처 정보 자동 추가
4. 일관된 포맷 적용
RAG 파이프라인에서의 위치:
LLM 답변 생성 → [AnswerPostProcessor] → 최종 사용자 응답
"""
def __init__(self, llm):
# llm: 답변 검증과 재생성에 사용할 LLM
self.llm = llm
def process(self, answer: str, context_docs: list[Document],
question: str) -> dict:
"""답변 후처리 수행
처리 흐름:
1. 답변 검증 (충실성 + 완전성)
2. 검증 실패 시 재생성
3. 출처 정보 추출 및 추가
4. 최종 포맷팅
Returns:
dict: {
"answer": 포맷팅된 최종 답변,
"validation": 검증 결과,
"sources": 출처 리스트,
"was_regenerated": 재생성 여부
}
"""
# 1. 기본 검증 (충실성 + 완전성)
validation = self._validate(answer, context_docs, question)
# 2. 검증 불합격 시 문제점을 명시하여 재생성
if not validation["is_valid"]:
answer = self._regenerate(question, context_docs, validation["issues"])
validation = self._validate(answer, context_docs, question)
# 3. 컨텍스트 문서에서 출처 정보 추출
sources = self._extract_sources(context_docs)
# 4. 답변과 출처를 일관된 포맷으로 결합
formatted = self._format_answer(answer, sources)
return {
"answer": formatted,
"validation": validation,
"sources": sources,
"was_regenerated": not validation.get("first_pass", True)
}
def _validate(self, answer: str, context_docs: list, question: str) -> dict:
"""LLM으로 답변의 충실성과 완전성을 검증
검증 기준:
1. grounded: 답변이 컨텍스트에 근거하는가? (환각 체크)
2. complete: 질문에 완전히 답하는가? (완전성 체크)
에러 핸들링:
- LLM 호출 실패 시 검증 불합격(is_valid=False)으로 처리하여 재생성 유도
- LLM 응답 파싱 실패 시에도 안전하게 기본값 반환
- 검증 단계의 실패가 전체 파이프라인을 중단하지 않도록 보호
"""
import logging
logger = logging.getLogger(__name__)
context = "\n".join([d.page_content for d in context_docs])
try:
response = self.llm.invoke(
f"다음 답변을 검증하세요.\n\n"
f"질문: {question}\n"
f"컨텍스트: {context[:2000]}\n" # 비용 절감을 위해 2000자로 제한
f"답변: {answer}\n\n"
f"확인사항:\n"
f"1. 답변이 컨텍스트에 근거하는가? (yes/no)\n"
f"2. 질문에 완전히 답하는가? (yes/no)\n"
f"3. 문제점이 있다면 기술하세요.\n"
f"형식: grounded:yes/no|complete:yes/no|issues:설명"
)
content = response.content
# 응답이 비어 있는 경우 방어 처리
if not content or not content.strip():
logger.warning("검증 LLM이 빈 응답 반환 → 검증 불합격 처리")
return {
"is_valid": False,
"grounded": False,
"complete": False,
"issues": "검증 LLM이 빈 응답을 반환하여 검증 불가"
}
# 응답에서 검증 결과 파싱
grounded = "grounded:yes" in content.lower()
complete = "complete:yes" in content.lower()
issues_match = re.search(r'issues:(.+)', content, re.IGNORECASE)
issues = issues_match.group(1).strip() if issues_match else ""
return {
"is_valid": grounded and complete, # 둘 다 yes여야 합격
"grounded": grounded,
"complete": complete,
"issues": issues
}
except Exception as e:
# LLM 호출 또는 파싱 실패 시 → 검증 불합격으로 처리
# 이렇게 하면 재생성(_regenerate)이 시도되어 더 안전한 답변을 받을 수 있음
logger.error(f"검증 실패: {e} → 검증 불합격 처리")
return {
"is_valid": False,
"grounded": False,
"complete": False,
"issues": f"검증 프로세스 에러: {str(e)}"
}
def _regenerate(self, question: str, context_docs: list, issues: str) -> str:
"""문제점을 반영하여 답변 재생성
이전 답변의 문제점을 LLM에 알려주고,
컨텍스트에만 근거하여 다시 답변하도록 지시한다.
"""
context = "\n".join([d.page_content for d in context_docs])
response = self.llm.invoke(
f"이전 답변에 문제가 있었습니다: {issues}\n"
f"컨텍스트에만 근거하여 다시 답변하세요.\n\n"
f"컨텍스트: {context[:2000]}\n"
f"질문: {question}\n답변:"
)
return response.content
def _extract_sources(self, docs: list[Document]) -> list[str]:
"""컨텍스트 문서의 메타데이터에서 출처 정보 추출"""
sources = set() # 중복 제거를 위해 set 사용
for doc in docs:
source = doc.metadata.get("source", "")
article = doc.metadata.get("article", "")
if article:
# 법률 문서 등에서 조항 정보가 있으면 함께 표시
sources.add(f"{source} - {article}")
elif source:
sources.add(source)
return sorted(list(sources)) # 정렬하여 일관된 순서 보장
def _format_answer(self, answer: str, sources: list[str]) -> str:
"""답변과 출처를 일관된 포맷으로 결합"""
formatted = answer.strip()
# 출처가 있으면 구분선 아래에 참조 출처 섹션 추가
if sources:
formatted += "\n\n---\n**참조 출처:**\n"
for src in sources:
formatted += f"- {src}\n"
return formatted
- 파라미터 정의 기준: `_validate`에서 `context[:2000]`으로 제한하는 이유는 검증 LLM 호출의 비용을 줄이기 위함이다. 전체 컨텍스트를 전달하면 정확도는 높아지지만, 대부분의 경우 2000자로도 충분히 환각을 감지할 수 있다. 또한 `_regenerate`는 최대 1회만 수행하는 것이 일반적이다. 무한 재생성 루프를 방지하기 위해, 2회 연속 실패 시에는 재생성을 포기하고 원본을 반환하는 로직을 추가하는 것이 안전하다.
- 검증 실패와 재생성의 흐름
[검증 흐름]
LLM 답변 생성
↓
검증: grounded? + complete?
↓
[합격] ──→ 출처 추가 → 포맷팅 → 반환
↓
[불합격] ──→ 문제점 반영 재생성
↓
재검증: grounded? + complete?
↓
[합격/불합격 무관] ──→ 출처 추가 → 포맷팅 → 반환
- 실무 권장: 검증과 재생성은 추가적인 LLM 호출을 수반하므로, 모든 답변에 적용하면 비용이 2~3배 증가한다. 프로덕션에서는 (1) 중요도가 높은 질의(금융, 법률, 의료)에만 적용하거나, (2) 사용자가 "정확성 우선" 옵션을 선택한 경우에만 적용하는 등 선택적 사용을 권장한다.
- 사용 예시:
# AnswerPostProcessor 초기화
post_processor = AnswerPostProcessor(llm=llm)
# RAG 체인에서 답변 생성 후 후처리 적용
raw_answer = rag_chain.invoke({"input": "근로소득세 계산 방법"})
# 후처리 수행 (검증 → 재생성(필요 시) → 출처 추가 → 포맷팅)
result = post_processor.process(
answer=raw_answer["answer"],
context_docs=raw_answer["context"],
question="근로소득세 계산 방법"
)
# 결과 확인
print(result["answer"]) # 포맷팅된 최종 답변 (출처 포함)
print(f"검증 결과: {result['validation']}") # {'is_valid': True, 'grounded': True, ...}
print(f"재생성 여부: {result['was_regenerated']}") # False (첫 답변이 합격한 경우)
9. 전체 파이프라인 커스텀 통합
- 통합 파이프라인의 필요성
- 지금까지 각 단계별로 독립적인 커스텀 컴포넌트를 구현했다. 이 섹션에서는 모든 컴포넌트를 하나의 파이프라인으로 통합하여, 문서 인덱싱부터 질의 응답까지 전체 과정을 자동화하는 방법을 다룬다.
- 통합 파이프라인의 구성 요소
| 단계 | 컴포넌트 | 역할 | 호출 시점 |
| 인덱싱 1 | SmartDocumentLoader | 다양한 포맷 문서 자동 로딩 + 풍부한 메타데이터 추출 (카테고리/키워드 자동 생성) | index_documents(docs_path) 초기 호출 |
| 인덱싱 2 | AdaptiveChunker | 문서 유형별 최적 청킹 (법률:1500+overlap, FAQ:300) + position 태깅 | index_documents() 내 SequentialChain |
| 인덱싱 3 | DomainEnhancedEmbeddings | 도메인 사전 적용 + 캐싱된 임베딩 생성 (text-embedding-3-large) | index_documents() 청크별 for 루프 |
| 인덱싱 4 | VectorDBManager | 중복 체크 + 증분 업데이트 + 컬렉션 관리 (Chroma/Pinecone) | index_documents() 최종 커밋 |
| 질의 1 | QueryTransformer | 일상어→전문어 변환 + 동의어 확장 + 하이퍼파라미터 동적 결정 | query(user_input) 첫 번째 |
| 질의 2 | IntelligentRetriever | 하이브리드 검색 (similarity+MMR) + 재순위 (cross-encoder) + 필터링 | query() 검색 단계 |
| 질의 3 | DynamicPromptBuilder | 질의 유형 분석 → 최적 프롬프트 템플릿 + few-shot 동적 삽입 | query() 생성 전 |
| 질의 4 | LLM | 컨텍스트+프롬프트로 답변 생성 (gpt-4o-mini/gpt-4o) | query() core generation |
| 질의 5 | AnswerPostProcessor | 환각 검증 + 형식 정규화 + 출처 추가 + 완전성 보완 | query() 최종 출력 전 |
시나리오: 모든 커스텀 컴포넌트를 하나의 파이프라인으로 조립
from langchain_core.output_parsers import StrOutputParser
class CustomRAGPipeline:
"""
모든 커스텀 컴포넌트를 통합한 완전 커스텀 RAG 파이프라인.
구성:
1. SmartDocumentLoader - 지능형 문서 로딩
2. AdaptiveChunker - 적응형 청킹
3. DomainEnhancedEmbeddings - 도메인 강화 임베딩
4. VectorDBManager - 벡터 DB 관리
5. QueryTransformer - 다단계 질의 변환
6. IntelligentRetriever - 지능형 검색
7. DynamicPromptBuilder - 동적 프롬프트
8. AnswerPostProcessor - 답변 후처리
사용 패턴:
1. 초기화: pipeline = CustomRAGPipeline(doc_dir, db_dir, domain_dict)
2. 인덱싱: pipeline.index_documents() (초기 구축 또는 문서 변경 시)
3. 질의: result = pipeline.query("질문") (사용자 질의 처리)
LLM 모델 사용 전략:
- gpt-4o: 최종 답변 생성 (품질 최우선)
- gpt-4o-mini: 메타데이터 추출, 의도 분석, 재순위, 검증 등 보조 작업 (비용 절감)
"""
def __init__(self, doc_dir: str, db_dir: str, domain_dict: str = ""):
# 메인 LLM (답변 생성용 - 최고 품질 모델)
self.llm = ChatOpenAI(model="gpt-4o", temperature=0)
# 보조 LLM (메타데이터 추출, 의도 분석, 검증 등 - 비용 최적화 모델)
self.llm_mini = ChatOpenAI(model="gpt-4o-mini", temperature=0)
# 각 컴포넌트 초기화 (위 섹션에서 구현한 클래스 사용)
self.loader = SmartDocumentLoader(doc_dir, llm=self.llm_mini)
self.chunker = AdaptiveChunker()
self.embedding = DomainEnhancedEmbeddings(
domain_terms=self._parse_dict(domain_dict)
)
self.db_manager = VectorDBManager(db_dir, self.embedding)
self.transformer = QueryTransformer(self.llm_mini, domain_dict)
self.prompt_builder = DynamicPromptBuilder(self.llm_mini)
self.post_processor = AnswerPostProcessor(self.llm_mini)
# 검색기는 인덱싱 후 초기화
self.retriever = None
def _parse_dict(self, dict_str: str) -> dict:
"""도메인 사전 문자열을 딕셔너리로 파싱
입력: "직장인 → 근로소득자\n월급 → 근로소득"
출력: {"직장인": "근로소득자", "월급": "근로소득"}
"""
terms = {}
for line in dict_str.strip().split("\n"):
if "→" in line:
key, value = line.split("→", 1)
terms[key.strip()] = value.strip()
return terms
def index_documents(self):
"""문서 인덱싱 (초기 구축 또는 업데이트)
처리 흐름:
1. 로딩: 디렉토리 내 모든 지원 파일을 로딩하고 메타데이터 추출
2. 청킹: 문서 유형별 최적 전략으로 청킹
3. DB 업데이트: 변경된 청크만 증분 업데이트
4. 검색기 초기화: 업데이트된 벡터 DB로 검색기 생성
"""
# 1. 로딩 (LLM으로 메타데이터 자동 추출)
documents = self.loader.load_all()
print(f"로딩 완료: {len(documents)}개 문서")
# 2. 청킹 (문서 유형에 따라 자동으로 최적 전략 선택)
chunks = self.chunker.chunk(documents)
print(f"청킹 완료: {len(chunks)}개 청크")
# 3. 벡터 DB 증분 업데이트 (변경된 청크만 처리)
result = self.db_manager.update(chunks)
print(f"DB 업데이트: {result}")
# 4. 검색기 초기화 (업데이트된 DB로 IntelligentRetriever 생성)
db = self.db_manager.get_db()
self.retriever = IntelligentRetriever(db, self.llm_mini)
def query(self, question: str) -> dict:
"""질의 실행 (전체 커스텀 파이프라인 적용)
처리 흐름:
1. 질의 변환: 사전 변환 + 확장/분해
2. 검색: 변환된 각 질의로 지능형 검색
3. 프롬프트 생성: 질의 유형에 맞는 동적 프롬프트
4. 답변 생성: LLM으로 답변 생성
5. 후처리: 검증 + 출처 추가 + 포맷팅
Returns:
dict: {
"question": 원본 질문,
"answer": 최종 답변,
"validation": 검증 결과,
"sources": 출처 리스트,
"num_docs_retrieved": 검색된 문서 수,
"queries_used": 사용된 검색 질의 목록
}
"""
if not self.retriever:
raise ValueError("먼저 index_documents()를 실행하세요.")
# 1. 질의 변환 (사전 변환 → 분해/확장)
transformed_queries = self.transformer.transform(question)
# 2. 변환된 각 질의로 검색 수행 (의도 분석 → 전략 검색 → 재순위)
all_docs = []
for q in transformed_queries:
docs = self.retriever.retrieve(q)
all_docs.extend(docs)
# 중복 제거 (동일 page_content를 가진 문서는 하나만 유지)
unique_docs = list({d.page_content: d for d in all_docs}.values())
# 3. 동적 프롬프트 생성 (질의 유형과 컨텍스트 품질에 맞게 조립)
prompt = self.prompt_builder.build(question, unique_docs)
# 4. LLM 답변 생성 (상위 5개 문서를 컨텍스트로 사용)
context = "\n\n---\n\n".join(d.page_content for d in unique_docs[:5])
chain = prompt | self.llm | StrOutputParser()
raw_answer = chain.invoke({"context": context, "question": question})
# 5. 답변 후처리 (검증 → 재생성(필요 시) → 출처 추가 → 포맷팅)
result = self.post_processor.process(raw_answer, unique_docs, question)
return {
"question": question,
"answer": result["answer"],
"validation": result["validation"],
"sources": result["sources"],
"num_docs_retrieved": len(unique_docs),
"queries_used": transformed_queries
}
- 파라미터 정의 기준: `gpt-4o`와 `gpt-4o-mini`를 분리하여 사용하는 것은 비용 최적화의 핵심이다. 최종 답변 생성만 고품질 모델(`gpt-4o`)을 사용하고, 나머지 보조 작업(메타데이터 추출, 의도 분석, 검증 등)은 경량 모델(`gpt-4o-mini`)을 사용한다. 이 전략으로 전체 비용을 50~70% 절감하면서도 답변 품질은 유지할 수 있다.
- 전체 파이프라인 동작 흐름 요약
[인덱싱 단계] pipeline.index_documents()
./documents/
├── tax_law.docx ──→ SmartDocumentLoader ──→ [법률] 카테고리, 키워드
├── faq.txt ──→ (다중 포맷 통합) ──→ [FAQ] 카테고리, 키워드
└── guide.pdf ──→ ──→ [일반] 카테고리, 키워드
↓
AdaptiveChunker
├── tax_law → legal (chunk_size=2000, overlap=300)
├── faq → faq (chunk_size=500, overlap=0)
└── guide → general (chunk_size=1000, overlap=150)
↓
DomainEnhancedEmbeddings (도메인 용어 확장 + 캐싱)
↓
VectorDBManager (증분 업데이트 - 변경분만 처리)
[질의 단계] pipeline.query("직장인 세금 줄이는 법")
"직장인 세금 줄이는 법"
↓
QueryTransformer → "근로소득자 소득세 절세 방법" + 확장 질의들
↓
IntelligentRetriever → 의도: procedure → k=5, similarity
↓
DynamicPromptBuilder → procedure 유형 → "단계별로 번호를 매겨..."
↓
GPT-4o → 답변 생성
↓
AnswerPostProcessor → 검증 합격 → 출처 추가 → 포맷팅
↓
- 사용 예시:
# 파이프라인 초기화 (모든 컴포넌트를 한 번에 설정)
pipeline = CustomRAGPipeline(
doc_dir="./documents", # 문서 디렉토리
db_dir="./chroma_custom", # 벡터 DB 저장 경로
domain_dict="직장인 → 근로소득자\n월급 → 근로소득\n세금 → 소득세" # 도메인 사전
)
# 문서 인덱싱 (초기 구축 - 한 번만 실행, 문서 변경 시 재실행)
pipeline.index_documents()
# → 로딩 완료: 5개 문서
# → 청킹 완료: 65개 청크
# → DB 업데이트: {'version': 1, 'added': 65, 'removed': 0, 'unchanged': 0}
# 질의 실행 (사용자 질문 처리)
result = pipeline.query("직장인 세금 계산 방법 알려줘")
# 결과 출력
print(result["answer"])
# → 1. 총급여에서 비과세소득을 제외합니다...
# 2. 근로소득공제를 적용합니다...
# 3. 과세표준에 세율을 적용합니다...
# ---
# **참조 출처:**
# - ./documents/tax_law.docx
print(f"검색 문서 수: {result['num_docs_retrieved']}") # 8
print(f"사용된 질의: {result['queries_used']}") # ['근로소득자 소득세 계산 방법', ...]
print(f"검증: {result['validation']}") # {'is_valid': True, ...}
- 실무 권장: 프로덕션 배포 전에 반드시 다음을 확인하라: (1) 인덱싱 시간과 비용 측정 (문서 수 x 임베딩 비용), (2) 질의당 평균 응답 시간 측정 (질의 변환 + 검색 + 생성 + 후처리), (3) 검증 합격률 측정 (환각 발생 빈도). 이 지표들을 LangSmith 등의 모니터링 도구로 추적하면 파이프라인의 품질을 지속적으로 개선할 수 있다.
10. 실무 확장 가이드
- 프로덕션 배포 시 고려사항
- 프로토타입에서 프로덕션으로 전환할 때 가장 큰 차이는 안정성과 관찰 가능성이다. 위 섹션들에서 각 컴포넌트별 에러 핸들링을 추가했지만, 이 섹션에서는 프로덕션 환경에서 필요한 공통 패턴을 체계적으로 정리한다.
| 영역 | 프로토타입 | 프로덕션 |
| 에러 처리 | 예외 발생 시 즉시 중단 (print traceback) | try-except 블록 + 구조화 로깅 (JSON) + graceful fallback (기본 응답) + exponential backoff 재시도 (3회) |
| 처리 방식 | 순차 싱글 스레드 처리 (for 루프) | asyncio 비동기 + 배치 처리 (10개 동시) + worker pool로 처리량 극대화 (QPS 100+) |
| 모니터링 | print() 또는 간단 logging | LangSmith 트레이싱 + OpenTelemetry + Prometheus 메트릭 (latency, error_rate) + Sentry 에러 추적 |
| 비용 관리 | 무제한 API 호출 (개발자 유료 플랜) | Redis 임베딩/응답 캐싱 + OpenAI rate limit 관리 + tiktoken 기반 실시간 비용 추적 + 예산 초과 시 다운그레이드 |
10.1. 에러 핸들링 패턴
- 프로덕션 RAG 파이프라인에서 발생하는 에러는 크게 세 가지로 분류된다.
| 에러 유형 | 예시 | 권장 처리 | 폴백 전략 |
| 일시적 에러 | API Rate Limit (429), 네트워크 타임아웃, DB 연결 임시 실패 | Exponential backoff 재시도 (1s → 2s → 4s, 최대 3회) + 상세 로깅 (retry count 포함) | 재시도 실패 시 캐시된 최근 결과 반환 or 저사양 모델(gpt-4o-mini)로 다운그레이드 |
| 영구 에러 | 잘못된 API 키 (401), 지원 안 하는 파일 포맷, 스키마 불일치 | 즉시 에러 로깅 (Sentry) + 해당 항목 스킵 + 진행 상황 업데이트 | 해당 컴포넌트 완전 우회 (검색 스킵 시 모델 지식만 사용, 로더 실패 시 다른 소스 우선) |
| 논리 에러 | LLM 응답 파싱 실패 (JSONDecodeError), 빈 검색 결과 (no matches), 환각 검증 실패 | 입력 검증 강화 + 기본값 삽입 + 재시도 1회 | 기본값 사용 ("정보를 확인할 수 없습니다") 또는 단순화 경로 (검색 없이 직접 생성, 기본 프롬프트) |
- 핵심 원칙: 에러 핸들링의 목표는 "실패하지 않는 것"이 아니라, "실패하더라도 가능한 한 유용한 결과를 반환하는 것"이다. 완벽한 답변을 못 주더라도, 부분적인 답변이나 "답변을 생성할 수 없습니다"라는 명시적 응답이 예외로 인한 서버 중단보다 훨씬 낫다.
10.1.1. 시나리오: 재시도 + 폴백 패턴이 적용된 LLM 호출 래퍼
import time
import logging
from functools import wraps
from langchain_openai import ChatOpenAI
logger = logging.getLogger(__name__)
def retry_with_fallback(
max_retries: int = 3,
base_delay: float = 1.0,
fallback_value=None
):
"""LLM 호출에 재시도 + 지수 백오프 + 폴백을 적용하는 데코레이터
파라미터:
- max_retries: 최대 재시도 횟수 (기본: 3)
- base_delay: 첫 번째 재시도까지 대기 시간(초) (기본: 1.0)
이후 재시도마다 2배씩 증가 (지수 백오프)
- fallback_value: 모든 재시도 실패 시 반환할 기본값
지수 백오프 대기 시간 예시:
- 1차 재시도: 1.0초 대기
- 2차 재시도: 2.0초 대기
- 3차 재시도: 4.0초 대기
"""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
last_exception = None
for attempt in range(max_retries + 1):
try:
return func(*args, **kwargs)
except Exception as e:
last_exception = e
if attempt < max_retries:
# 지수 백오프: 재시도 간격을 점진적으로 늘림
delay = base_delay * (2 ** attempt)
logger.warning(
f"[재시도 {attempt + 1}/{max_retries}] "
f"{func.__name__} 실패: {e} → {delay}초 후 재시도"
)
time.sleep(delay)
else:
# 모든 재시도 소진 → 폴백 값 반환
logger.error(
f"[최종 실패] {func.__name__}: {max_retries}회 재시도 "
f"후에도 실패. 에러: {e} → 폴백 값 반환"
)
return fallback_value
return wrapper
return decorator
class ResilientLLMCaller:
"""프로덕션용 LLM 호출 래퍼
기능:
1. 자동 재시도: 일시적 에러(Rate Limit, 타임아웃) 시 지수 백오프로 재시도
2. 폴백 체인: 주 모델 실패 시 보조 모델로 자동 전환
3. 구조화된 로깅: 호출 시간, 토큰 수, 에러 등을 체계적으로 기록
"""
def __init__(self):
# 주 모델: 품질 우선 (비용이 높지만 정확도가 높음)
self.primary_llm = ChatOpenAI(model="gpt-4o", temperature=0)
# 보조 모델: 비용 우선 (폴백용 — 품질은 다소 낮지만 안정적)
self.fallback_llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
@retry_with_fallback(max_retries=3, base_delay=1.0)
def _call_primary(self, prompt: str) -> str:
"""주 모델 호출 (재시도 적용)"""
response = self.primary_llm.invoke(prompt)
return response.content
def invoke(self, prompt: str, fallback_response: str = "") -> str:
"""안전한 LLM 호출: 주 모델 → 보조 모델 → 폴백 값 순으로 시도
파라미터:
- prompt: LLM에 전달할 프롬프트
- fallback_response: 모든 모델이 실패했을 때 반환할 기본 응답
반환:
- str: LLM 응답 텍스트 또는 폴백 응답
"""
# 1단계: 주 모델 시도 (자동 재시도 포함)
result = self._call_primary(prompt)
if result is not None:
return result
# 2단계: 주 모델 최종 실패 시 보조 모델로 전환
logger.warning("주 모델 실패 → 보조 모델(gpt-4o-mini)로 전환")
try:
response = self.fallback_llm.invoke(prompt)
return response.content
except Exception as e:
logger.error(f"보조 모델도 실패: {e} → 폴백 응답 반환")
# 3단계: 모든 모델 실패 시 폴백 응답
return fallback_response
# 사용 예시
caller = ResilientLLMCaller()
# 의도 분석에 적용
intent_result = caller.invoke(
prompt="다음 질문의 의도를 분석하세요: '세금 계산 방법은?'\n유형 하나만 답하세요:",
fallback_response="complex" # 실패 시 가장 넓은 검색 전략 사용
)
print(f"의도: {intent_result}")
# → 의도: procedure
# (실패 시) → 의도: complex
- 파라미터 선택 기준: `max_retries=3`, `base_delay=1.0`은 OpenAI의 Rate Limit 정책(분당 요청 수 제한)에 맞춘 기본값이다. Rate Limit 에러의 경우 보통 60초 내에 해제되므로, 3회 재시도(1초 + 2초 + 4초 = 총 7초)면 대부분의 일시적 에러를 극복할 수 있다. Rate Limit이 빈번한 환경에서는 `max_retries=5`, `base_delay=2.0`으로 상향 조정한다.
10.2. 비동기/배치 처리 패턴
- 대량 문서를 인덱싱하거나 여러 질의를 동시에 처리해야 하는 프로덕션 환경에서는 순차 처리가 병목이 된다. Python의 `asyncio`와 LangChain의 비동기 API를 활용하면 처리 속도를 크게 개선할 수 있다.
10.2.1. 처리 방식별 비교
| 처리 방식 | 처리량 | 구현 복잡도 | 적합 상황 |
| 순차 처리 | 낮음 (1 req/s, 블로킹) | 낮음 (for 루프 그대로) | 프로토타입, 로컬 테스트, 디버깅, 소규모 데이터(100건 이하) |
| 배치 처리 | 중간 (N=10-50 묶음, API 배치 효율 5배) | 중간 (배치 크기 관리, 결과 매핑) | 대량 임베딩 생성, 벡터 DB 일괄 삽입, 평가 데이터셋 처리 |
| 비동기 처리 | 높음 (동시 10-50개, asyncio/ThreadPool) | 높음 (await/async 관리, 에러 핸들링 복잡) | 실시간 다중 LLM 호출, 채팅 여러 사용자 동시 처리, 검색 병렬 |
| 배치 + 비동기 | 최고 (QPS 100+, 스케일 무한) | 최고 (Semaphore + Queue + 배치 최적화) | 대규모 프로덕션 (일일 10만 쿼리), 멀티 리전, 트래픽 피크 대응 |
10.2.2. 시나리오: 비동기 배치 파이프라인으로 대량 문서 처리
import asyncio
import logging
from langchain_openai import ChatOpenAI
from langchain_core.documents import Document
logger = logging.getLogger(__name__)
class AsyncBatchProcessor:
"""비동기 배치 처리로 대량 문서/질의를 효율적으로 처리
핵심 개념:
- asyncio.Semaphore: 동시 실행 수를 제한하여 API Rate Limit 방지
- asyncio.gather: 여러 비동기 작업을 동시에 실행
- 배치 분할: 대량 데이터를 작은 묶음으로 나누어 메모리 관리
동작:
1. 입력을 batch_size 크기로 분할
2. 각 배치 내에서 max_concurrent개만 동시 실행
3. 배치 간 간격(batch_delay)으로 Rate Limit 방지
"""
def __init__(
self,
llm=None,
max_concurrent: int = 5,
batch_size: int = 10,
batch_delay: float = 1.0
):
"""
파라미터:
- llm: 사용할 LLM 인스턴스
- max_concurrent: 동시 실행 최대 수 (기본: 5)
→ OpenAI Tier 1 기준 RPM 제한에 맞춤
- batch_size: 배치 크기 (기본: 10)
→ 한 번에 처리할 항목 수
- batch_delay: 배치 간 대기 시간(초) (기본: 1.0)
→ Rate Limit 여유 확보
"""
self.llm = llm or ChatOpenAI(model="gpt-4o-mini", temperature=0)
self.semaphore = asyncio.Semaphore(max_concurrent)
self.batch_size = batch_size
self.batch_delay = batch_delay
async def _process_single(self, item: str, task_type: str) -> dict:
"""단일 항목을 비동기로 처리 (세마포어로 동시 실행 수 제한)
파라미터:
- item: 처리할 텍스트 (문서 또는 질의)
- task_type: 작업 유형 ("embed", "query", "validate" 등)
"""
async with self.semaphore:
try:
# LangChain의 ainvoke로 비동기 LLM 호출
response = await self.llm.ainvoke(
f"[{task_type}] 다음을 처리하세요: {item[:500]}"
)
return {"input": item[:50], "result": response.content, "status": "success"}
except Exception as e:
logger.error(f"비동기 처리 실패: {e}")
return {"input": item[:50], "result": None, "status": "error", "error": str(e)}
async def process_batch(self, items: list[str], task_type: str = "query") -> list[dict]:
"""대량 항목을 배치로 분할하여 비동기 처리
파라미터:
- items: 처리할 항목 리스트
- task_type: 작업 유형
반환:
- list[dict]: 각 항목의 처리 결과 (성공/실패 포함)
"""
all_results = []
# 배치 분할: [0:10], [10:20], [20:30], ...
for i in range(0, len(items), self.batch_size):
batch = items[i:i + self.batch_size]
batch_num = i // self.batch_size + 1
total_batches = (len(items) + self.batch_size - 1) // self.batch_size
logger.info(f"배치 {batch_num}/{total_batches} 처리 중 ({len(batch)}건)")
# 배치 내 항목을 동시에 실행 (세마포어로 max_concurrent개 제한)
tasks = [self._process_single(item, task_type) for item in batch]
results = await asyncio.gather(*tasks, return_exceptions=True)
# 예외가 발생한 항목도 안전하게 처리
for j, result in enumerate(results):
if isinstance(result, Exception):
logger.error(f"항목 {i + j} 예외: {result}")
all_results.append({
"input": batch[j][:50],
"result": None,
"status": "exception",
"error": str(result)
})
else:
all_results.append(result)
# 배치 간 대기 (Rate Limit 방지)
if i + self.batch_size < len(items):
await asyncio.sleep(self.batch_delay)
# 처리 요약 로깅
success = sum(1 for r in all_results if r["status"] == "success")
failed = len(all_results) - success
logger.info(f"처리 완료: 성공 {success}건, 실패 {failed}건")
return all_results
# 사용 예시: 대량 질의 배치 처리
async def main():
processor = AsyncBatchProcessor(
max_concurrent=5, # API Rate Limit에 맞게 조절
batch_size=10,
batch_delay=1.0
)
queries = [
"세금 계산 방법은?",
"소득공제 항목 알려줘",
"연말정산 절차는?",
# ... 수십~수백 개의 질의
]
results = await processor.process_batch(queries, task_type="query")
for r in results:
print(f"[{r['status']}] {r['input']}... → {r['result'][:50] if r['result'] else 'N/A'}")
# 실행
# asyncio.run(main())
# → 배치 1/1 처리 중 (3건)
# → 처리 완료: 성공 3건, 실패 0건
# → [success] 세금 계산 방법은?... → 세금은 과세표준에 세율을 적용하여 계산합니다...
# → [success] 소득공제 항목 알려줘... → 주요 소득공제 항목으로는 인적공제, 연금보험료...
# → [success] 연말정산 절차는?... → 연말정산은 다음 절차로 진행됩니다: 1) 소득...
- 파라미터 선택 기준: `max_concurrent=5`는 OpenAI Tier 1(무료 플랜) 기준 분당 60 요청 제한에 안전 마진을 둔 값이다. Tier 3 이상(분당 5,000 요청)이면 `max_concurrent=50`, `batch_size=100`까지 올릴 수 있다. 단, 동시 요청 수를 너무 높이면 Rate Limit 외에도 메모리와 소켓 고갈 문제가 발생할 수 있으므로, 부하 테스트 후 결정하라.
10.3. 콜백을 활용한 모니터링
- LangChain의 콜백(Callback) 시스템은 LLM 호출, 검색, 체인 실행 등 각 단계에서 자동으로 이벤트를 수집하는 메커니즘이다. 이를 활용하면 print() 기반 디버깅을 넘어 구조화된 모니터링을 구축할 수 있다.
10.3.1. LangChain 콜백 이벤트 종류
| 콜백 메서드 | 호출 시점 | 수집 가능 정보 |
| on_llm_start | LLM 생성 요청 직전 | 프롬프트 전체 텍스트, 모델명 (gpt-4o-mini), temperature/max_tokens 등 파라미터, 요청 timestamp |
| on_llm_end | LLM 응답 수신 후 | 생성된 응답 텍스트, input_tokens/output_tokens 사용량, 총 소요 시간 (ms), 메타데이터 |
| on_llm_error | LLM 호출 예외 발생 시 | 에러 타입 (RateLimitError, APIConnectionError), 상세 에러 메시지, 재시도 횟수, 원본 입력 |
| on_retriever_start | 검색기 실행 직전 | 원본 질의어, 검색 파라미터 (k=5, similarity_threshold=0.7), 필터 조건 |
| on_retriever_end | 검색 결과 반환 후 | 검색된 문서 리스트 (content+metadata), 매칭 문서 수, top-k 스코어, 검색 소요 시간 |
| on_chain_start | 체인/파이프라인 시작 시 | 체인 이름 (TaxRAGChain), 전체 입력값 (query+user_id), 실행 config |
| on_chain_end | 전체 체인 완료 후 | 최종 출력값, end-to-end 소요 시간, 중간 메트릭 (검색 hit rate, LLM 토큰 총합) |
10.3.2. 시나리오: RAG 파이프라인 모니터링 콜백 구현
import time
import logging
from datetime import datetime
from langchain_core.callbacks import BaseCallbackHandler
from langchain_openai import ChatOpenAI
logger = logging.getLogger(__name__)
class RAGMonitorCallback(BaseCallbackHandler):
"""RAG 파이프라인의 각 단계를 모니터링하는 커스텀 콜백
수집 메트릭:
1. LLM 호출 횟수, 소요 시간, 토큰 사용량
2. 에러 발생 횟수 및 유형별 집계
3. 전체 파이프라인 실행 시간
내장 콜백과의 차이:
- LangSmith 등 외부 서비스 없이도 핵심 메트릭 수집 가능
- 커스텀 알림 조건(임계값 초과 등) 직접 설정 가능
- 수집 데이터를 원하는 형태(JSON, DB 등)로 저장 가능
"""
def __init__(self):
super().__init__()
# 메트릭 저장소 초기화
self.metrics = {
"llm_calls": 0, # LLM 총 호출 횟수
"total_tokens": 0, # 총 토큰 사용량
"total_cost": 0.0, # 총 비용 (추정)
"errors": [], # 에러 이력
"latencies": [], # 각 호출의 소요 시간(초)
}
self._start_times = {} # 호출별 시작 시간 추적용
def on_llm_start(self, serialized, prompts, **kwargs):
"""LLM 호출 시작 시 호출됨
파라미터:
- serialized: LLM 설정 정보 (모델명, 온도 등)
- prompts: LLM에 전달된 프롬프트 리스트
"""
run_id = kwargs.get("run_id", "unknown")
self._start_times[run_id] = time.time()
self.metrics["llm_calls"] += 1
model_name = serialized.get("kwargs", {}).get("model_name", "unknown")
logger.info(
f"[LLM 호출 #{self.metrics['llm_calls']}] "
f"모델: {model_name}, 프롬프트 길이: {len(str(prompts))}자"
)
def on_llm_end(self, response, **kwargs):
"""LLM 호출 완료 시 호출됨
파라미터:
- response: LLM 응답 객체 (LLMResult)
- response.llm_output: 토큰 사용량 등 메타 정보 포함
"""
run_id = kwargs.get("run_id", "unknown")
start_time = self._start_times.pop(run_id, None)
if start_time:
latency = time.time() - start_time
self.metrics["latencies"].append(latency)
# 토큰 사용량 집계 (OpenAI API 응답에 포함)
if response.llm_output:
token_usage = response.llm_output.get("token_usage", {})
total = token_usage.get("total_tokens", 0)
self.metrics["total_tokens"] += total
# 비용 추정 (gpt-4o 기준: 입력 $2.50/1M, 출력 $10.00/1M)
prompt_tokens = token_usage.get("prompt_tokens", 0)
completion_tokens = token_usage.get("completion_tokens", 0)
cost = (prompt_tokens * 2.50 + completion_tokens * 10.00) / 1_000_000
self.metrics["total_cost"] += cost
logger.info(f"[LLM 완료] 소요: {latency:.2f}초, 토큰: {total}")
# 경고: 응답 시간이 10초를 초과하면 알림
if latency > 10.0:
logger.warning(
f"[느린 응답 경고] {latency:.2f}초 소요 — "
f"프롬프트 최적화 또는 모델 변경 검토 필요"
)
def on_llm_error(self, error, **kwargs):
"""LLM 호출 실패 시 호출됨
파라미터:
- error: 발생한 예외 객체
"""
self.metrics["errors"].append({
"timestamp": datetime.now().isoformat(),
"error_type": type(error).__name__,
"message": str(error)
})
logger.error(f"[LLM 에러] {type(error).__name__}: {error}")
def get_summary(self) -> dict:
"""수집된 메트릭의 요약 반환"""
latencies = self.metrics["latencies"]
return {
"총 LLM 호출": self.metrics["llm_calls"],
"총 토큰 사용": self.metrics["total_tokens"],
"추정 비용(USD)": round(self.metrics["total_cost"], 4),
"평균 응답 시간(초)": round(
sum(latencies) / len(latencies), 2
) if latencies else 0,
"최대 응답 시간(초)": round(max(latencies), 2) if latencies else 0,
"에러 횟수": len(self.metrics["errors"]),
}
# 사용 예시: 콜백을 파이프라인에 연결
monitor = RAGMonitorCallback()
llm = ChatOpenAI(
model="gpt-4o",
temperature=0,
callbacks=[monitor] # 콜백 인스턴스를 LLM에 직접 연결
)
# 파이프라인 실행 (기존 코드와 동일하게 사용)
response = llm.invoke("세금 계산 방법을 설명해주세요.")
print(response.content)
# → 세금은 과세표준에 세율을 적용하여 산출합니다...
# 모니터링 요약 조회
summary = monitor.get_summary()
for key, value in summary.items():
print(f" {key}: {value}")
# → 총 LLM 호출: 1
# → 총 토큰 사용: 356
# → 추정 비용(USD): 0.0023
# → 평균 응답 시간(초): 1.45
# → 최대 응답 시간(초): 1.45
# → 에러 횟수: 0
10.3.3. 확장: 전체 파이프라인에 콜백 통합
# 9장의 CustomRAGPipeline에 콜백을 통합하는 예시
from langchain_openai import ChatOpenAI
# 모니터링 콜백 생성
monitor = RAGMonitorCallback()
# 콜백이 연결된 LLM으로 파이프라인 구성
llm_with_monitor = ChatOpenAI(
model="gpt-4o",
temperature=0,
callbacks=[monitor]
)
# pipeline = CustomRAGPipeline(
# document_dir="./documents",
# llm=llm_with_monitor # 콜백이 연결된 LLM 전달
# )
#
# # 이후 모든 LLM 호출이 자동으로 모니터링됨
# result = pipeline.query("세금 절세 방법은?")
#
# # 질의 완료 후 메트릭 확인
# print(monitor.get_summary())
# → 총 LLM 호출: 4 (질의 변환 + 의도 분석 + 답변 생성 + 검증)
# → 총 토큰 사용: 2,845
# → 추정 비용(USD): 0.0187
# → 평균 응답 시간(초): 1.82
# → 최대 응답 시간(초): 3.21
# → 에러 횟수: 0
- 실무 권장: LangSmith를 사용할 수 있는 환경이라면 `LangSmithCallbackHandler`를 함께 연결하여 트레이싱과 시각화를 활용하는 것이 가장 효과적이다. 그러나 보안 정책상 외부 서비스 연동이 불가한 환경에서는 위와 같은 커스텀 콜백으로 핵심 메트릭을 수집하고, JSON 파일이나 내부 DB에 저장하여 대시보드를 구축할 수 있다. 두 접근법은 배타적이지 않으므로, 가능하다면 커스텀 콜백(실시간 알림용)과 LangSmith(사후 분석용)를 병행하는 것을 권장한다.
'Study > LangChain' 카테고리의 다른 글
| 10. LangChain HuggingFace 오픈소스 언어모델 활용방법 (1) | 2026.03.16 |
|---|---|
| 8. LangChain 실무 설계 고려사항 총정리 (0) | 2026.03.08 |
| 6. LangChain 검증, 고도화, 프로덕션 가이드 (0) | 2026.03.07 |
| 5. LangChain 에이전트 구축 가이드 - Tool 연동과 에이전트 패턴 (0) | 2026.03.03 |
| 4. LangChain 기본 활용 가이드 - 기본부터 체인 구성까지 (0) | 2026.02.23 |
댓글