9. LangChain 실무 RAG 파이프라인 구현 가이드 (Advanced)
- 본 가이드는 프로덕션 환경에서 RAG(Retrieval-Augmented Generation) 파이프라인을 설계, 구현, 운영하기 위한 고급 실무 지침서이다. 단순한 튜토리얼 수준을 넘어, 실제 서비스에서 마주치는 성능 병목, 비용 최적화, 장애 대응, 검색 품질 튜닝 등 프로덕션 레벨의 의사결정 기준과 구체적인 수치를 제시한다. 수만~수백만 건의 문서를 다루는 엔터프라이즈 환경을 기준으로 작성되었으며, 각 단계에서의 트레이드오프와 벤치마크 데이터를 포함한다.
1. 실무 RAG 파이프라인의 핵심 목표
1.1. 핵심 목표
| 목표 | 설명 | 측정 방법 | 프로덕션 기준값 |
| 답변 정확도 | 검색된 문서에 기반한 사실적으로 정확한 답변을 생성해야 한다. 환각(hallucination)을 최소화하고, 근거 문서와 답변 간의 일치도를 정량적으로 측정한다. 특히 도메인 전문 용어의 정확한 사용과 수치 데이터의 오류 없는 인용이 핵심이다. | LangSmith Evaluation(Correctness, Faithfulness 스코어), RAGAS faithfulness/answer_relevancy 메트릭, 도메인 전문가 블라인드 평가(월 1회 이상), 사용자 피드백 thumbs-up/down 비율 추적 | Faithfulness ≥ 0.90, Answer Relevancy ≥ 0.85, 사용자 긍정 피드백 비율 ≥ 80% |
| 검색 관련성 | 사용자 질의에 대해 의미적으로 관련된 문서를 상위에 정확히 반환해야 한다. 단순 키워드 매칭이 아닌 의미 기반 검색을 수행하되, 도메인 특화 용어에 대해서는 정확 매칭(exact match)도 병행해야 한다. 검색 실패(zero relevant results)는 사용자 이탈의 가장 큰 원인이다. | Retrieval Recall@k, Precision@k, MRR(Mean Reciprocal Rank), NDCG@10, Hit Rate@k, 골든 셋(golden set) 기반 자동 평가 파이프라인 구축 | Hit Rate@4 ≥ 0.90, MRR ≥ 0.75, NDCG@10 ≥ 0.70 |
| 응답 속도 | 사용자가 질의를 입력한 시점부터 최종 답변이 렌더링되기 시작하는 시점까지의 지연 시간을 최소화한다. 스트리밍을 활용하면 첫 토큰까지의 시간(TTFT)을 줄여 체감 속도를 개선할 수 있다. 검색 단계와 생성 단계 각각의 레이턴시를 분리 측정하여 병목을 식별해야 한다. | E2E Latency(p50, p95, p99), TTFT(Time To First Token), Retrieval Latency, LLM Generation Latency, OpenTelemetry + Jaeger/Datadog 분산 트레이싱 | E2E p50 ≤ 2초, p95 ≤ 5초, TTFT ≤ 800ms, Retrieval ≤ 200ms |
| 비용 효율 | LLM API 호출 비용, 임베딩 API 비용, 벡터 DB 호스팅 비용, 인프라 비용의 총합을 관리한다. 질의당 평균 비용을 추적하고, 캐싱/모델 라우팅으로 불필요한 API 호출을 제거한다. 비용은 트래픽에 비례하여 증가하므로 단위 질의당 비용(cost-per-query)을 KPI로 관리한다. | 월간 총 API 비용, 질의당 평균 비용(input/output 토큰 분리), 캐시 히트율, 모델별 사용 비율 추적, 비용 알림 임계값 설정(CloudWatch/Grafana) | 질의당 평균 비용 ≤ $0.01(GPT-4o-mini 기준) 또는 ≤ $0.05(GPT-4o 기준), 캐시 히트율 ≥ 30% |
| 유지보수성 | 문서 추가/삭제/수정, 모델 교체, 프롬프트 변경이 코드 전체 수정 없이 가능한 모듈화된 구조를 유지한다. 설정 파일 기반 구성(config-driven), 인터페이스 분리 원칙을 적용하여 각 컴포넌트를 독립적으로 교체할 수 있어야 한다. | 컴포넌트 교체 소요 시간(모델 교체 < 1시간), 코드 커버리지, 통합 테스트 수, 배포 빈도(주 1회 이상 가능), 롤백 소요 시간 | 모델 교체 < 1시간, 프롬프트 변경 < 10분, 롤백 < 5분 |
| 확장성 | 문서 수 증가(10만 → 100만건), 동시 사용자 증가(10 → 1,000 QPS)에 대응할 수 있는 수평적 확장이 가능한 아키텍처를 설계한다. 벡터 DB 샤딩, LLM 요청 큐잉, 로드 밸런싱 전략을 사전에 수립해야 한다. | 동시 요청 처리량(QPS), 문서 수 대비 검색 레이턴시 선형성, 오토스케일링 반응 시간, 부하 테스트(k6/Locust) 결과 | 100 QPS 이상 처리, 100만 벡터에서 검색 < 100ms, 오토스케일링 반응 < 60초 |
| 보안/ 컴플라이언스 |
API 키 노출 방지, PII(개인식별정보) 필터링, 프롬프트 인젝션 방어, 감사 로그 기록을 포함한 보안 체계를 구축한다. 금융/의료/법률 도메인에서는 규제 준수(GDPR, 개인정보보호법)가 필수이며, LLM 응답에 대한 면책 조항을 명시해야 한다. | OWASP LLM Top 10 체크리스트, PII 탐지율, 프롬프트 인젝션 방어 테스트, 감사 로그 완전성, 침투 테스트 주기 | PII 필터링 정확도 ≥ 99%, 프롬프트 인젝션 차단율 ≥ 95%, 감사 로그 100% 기록 |
1.1.1. 목표 우선순위 프레임워크
- 프로젝트 단계에 따라 목표 우선순위가 달라진다. 모든 목표를 동시에 최적화하려 하면 프로젝트 일정이 지연되므로, 단계별로 집중해야 할 목표를 명확히 설정한다.
| 프로젝트 단계 | 최우선 목표 | 차선 목표 | 후순위 목표 | 핵심 활동 |
| PoC (1~2주) | 답변 정확도, 검색 관련성 | 유지보수성 | 응답 속도, 비용, 확장성, 보안 | 골든 셋 구축, 기본 파이프라인 검증, 도메인 적합성 확인 |
| MVP (2~4주) | 답변 정확도, 검색 관련성, 응답 속도 | 비용 효율, 유지보수성 | 확장성, 보안 | 프롬프트 튜닝, 청킹 전략 최적화, 스트리밍 구현 |
| 프로덕션 Alpha (1~2개월) | 응답 속도, 비용 효율, 보안 | 확장성, 유지보수성 | - | 캐싱 도입, 모델 라우팅, PII 필터링, 모니터링 구축 |
| 프로덕션 GA (운영) | 확장성, 보안/컴플라이언스 | 전체 목표 균형 유지 | - | 오토스케일링, 부하 테스트, 감사 체계, A/B 테스트 |
- 핵심 원칙: PoC 단계에서 검색 품질(Retrieval Quality)이 확보되지 않으면 이후 어떤 최적화를 해도 최종 답변 품질은 개선되지 않는다. "Garbage In, Garbage Out" 원칙이 RAG에서 가장 강하게 적용되는 지점이 바로 Retrieval 단계이다.
1.2. 전체 파이프라인 흐름도
[실무 RAG 파이프라인 전체 흐름 - Advanced]
━━━ 오프라인 단계 (인덱싱 파이프라인) ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
[문서 수집] [문서 전처리] [청킹] [임베딩] [인덱싱] [검증]
┌──────┐ ┌────────────────┐ ┌─────────────┐ ┌───────────┐ ┌────────────┐ ┌────────────┐
│ DOCX │ │ 인코딩 정규화 │ │ Recursive │ │ Dense │ │ HNSW 인덱스 │ │ 골든 셋 │
│ PDF │─────▶│ OCR 처리 │────▶│ Semantic │───▶│ Embedding │──▶│ 메타데이터 │──▶│ Hit Rate │
│ HTML │ │ 메타데이터 추출│ │ Parent-Child│ │ Batch 처리│ │ 필터 인덱스 │ │ 품질 리포트│
│ CSV │ │ PII 마스킹 │ │ Agentic │ │ 캐싱 │ │ 네임스페이스│ │ 분포 분석 │
│ API │ │ 중복 제거 │ │ │ │ │ │ │ │ │
└──────┘ └────────────────┘ └─────────────┘ └───────────┘ └─────────────┘ └────────────┘
━━━ 온라인 단계 (쿼리 파이프라인) ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
[사용자 질문]
│
▼
┌────────────────────────────┐
│ 쿼리 전처리 │ ← PII 탐지, 프롬프트 인젝션 감지, 입력 검증
│ (Input Guard) │
└────────────────────────────┘
│
▼
┌────────────────────────────┐
│ 시맨틱 캐시 조회 │ ← 유사 질의 캐시 히트 시 즉시 반환 (p50 < 50ms)
│ (Semantic Cache) │
└────────────────────────────┘
│ (캐시 미스)
▼
┌────────────────────────────┐
│ 쿼리 변환 │ ← Multi-Query, HyDE, Step-back, 키워드 사전 매핑
│ (Query Transformation) │
└────────────────────────────┘
│
▼
┌────────────────────────────┐
│ 하이브리드 검색 │ ← Dense(벡터) + Sparse(BM25) + 메타데이터 필터링
│ (Hybrid Retrieval) │ RRF(Reciprocal Rank Fusion)로 결과 병합
└────────────────────────────┘
│
▼
┌────────────────────────────┐
│ Re-ranking │ ← Cross-encoder 또는 Cohere Rerank로 정밀 재순위화
│ (정밀 재순위화) │ 상위 k개 → 상위 n개로 필터링 (k > n)
└────────────────────────────┘
│
▼
┌────────────────────────────┐
│ 컨텍스트 조립 │ ← 토큰 예산 내에서 문서 배치, 중복 제거, 순서 최적화
│ (Context Assembly) │
└────────────────────────────┘
│
▼
┌────────────────────────────┐
│ LLM 생성 (Streaming) │ ← 모델 라우팅(복잡도 기반), 폴백 체인, 토큰 추적
│ (Generation + Guardrails) │
└────────────────────────────┘
│
▼
┌────────────────────────────┐
│ 후처리 │ ← 출처 표기, 포맷팅, PII 필터링, 응답 검증
│ (Post-processing) │
└────────────────────────────┘
│
▼
[최종 답변 반환 + 캐시 저장 + 로깅]
1.3. Naive RAG vs Advanced RAG vs Modular RAG
- RAG 아키텍처는 복잡도와 성숙도에 따라 세 가지 패러다임으로 구분된다. 현재 프로젝트의 요구사항과 팀 역량에 맞는 패러다임을 선택하고, 점진적으로 고도화하는 전략이 효과적이다.
| 구분 | Naive RAG | Advanced RAG | Modular RAG |
| 아키텍처 | 단일 체인: 검색 → 생성 | 멀티 스테이지: 전처리 → 검색 → 재순위 → 생성 | 플러그인 방식: 각 모듈을 독립적으로 교체/조합 |
| 검색 방식 | 단일 벡터 유사도 검색 | 하이브리드 검색(Dense + Sparse) + Reranking | 적응형 검색(쿼리 유형에 따라 검색 전략 동적 선택) |
| 쿼리 처리 | 원본 쿼리 그대로 검색 | 쿼리 변환(Multi-Query, HyDE, Step-back) | 라우터 기반 쿼리 분류 + 전략별 파이프라인 분기 |
| 청킹 전략 | 고정 크기(Fixed-size) | Semantic/Parent-Child 청킹 | 문서 유형별 적응형 청킹 + 동적 오버랩 |
| 컨텍스트 관리 | 검색된 문서 전체를 그대로 전달 | 토큰 예산 관리, 컨텍스트 압축, 중복 제거 | 반복적 검색(Iterative Retrieval), Self-RAG |
| 평가/모니터링 | 수동 확인 | 골든 셋 기반 자동 평가, LangSmith 트레이싱 | 온라인 A/B 테스트, 실시간 품질 모니터링, 자동 롤백 cloud. |
| 구현 난이도 | 낮음 (1~2일) | 중간 (1~2주) | 높음 (1~2개월) |
| 적합한 상황 | PoC, 내부 데모, 문서 수 1000건 이하 | 프로덕션 MVP, 중규모 서비스 | 대규모 엔터프라이즈, 다중 도메인 서비스 |
| 대표적 한계 | 검색 품질이 낮으면 답변도 나빠짐, 환각 제어 어려움 | 파이프라인 복잡도 증가, 레이턴시 증가 가능 | 설계/운영 복잡도 높음, 숙련된 엔지니어 필요 |
- 마이그레이션 경로:
Naive RAG ──────────────────▶ Advanced RAG ──────────────────▶ Modular RAG
1단계: 하이브리드 검색 도입 1단계: 라우터 모듈 추가
2단계: Reranking 추가 2단계: 적응형 검색 전략
3단계: 쿼리 변환 적용 3단계: Self-RAG / CRAG 도입
4단계: 시맨틱 캐시 도입 4단계: 온라인 평가 + 자동 튜닝
- 실무 판단 기준: 검색 대상 문서가 1000건 미만이고 단일 도메인이라면 Naive RAG로도 충분한 품질을 달성할 수 있다. 그러나 문서가 1만 건을 넘거나, 다양한 질의 유형을 처리해야 하거나, 답변 정확도에 대한 SLA가 있다면 반드시 Advanced RAG 이상을 적용해야 한다.
2. 환경 설정
- 환경 설정은 RAG 파이프라인의 재현성(reproducibility), 보안(security), 운영 안정성(operational stability)을 결정하는 기반이다. 프로덕션 환경에서는 패키지 버전 고정, 시크릿 관리, 멀티 환경 설정 분리가 필수이다.
2.1. 패키지 설치
# 프로덕션 권장: uv를 사용한 빠르고 결정적인 패키지 설치
# uv는 pip 대비 10~100배 빠르며, 결정적 잠금(deterministic locking)을 지원한다
# pyproject.toml 기반 의존성 그룹 관리 (권장)
# [project]
# dependencies = [
# "python-dotenv>=1.0.0,<2.0.0",
# "langchain>=0.3.0,<0.4.0",
# "langchain-openai>=0.2.0,<0.3.0",
# "langchain-chroma>=0.2.0,<0.3.0",
# "langchain-text-splitters>=0.3.0,<0.4.0",
# ]
#
# [project.optional-dependencies]
# pinecone = ["langchain-pinecone>=0.2.0,<0.3.0", "pinecone-client>=5.0.0,<6.0.0"]
# dev = ["pytest>=8.0", "langsmith>=0.2.0", "ragas>=0.2.0"]
# production = ["gunicorn>=22.0", "uvicorn>=0.30.0", "fastapi>=0.115.0"]
# 설치 명령어
# uv sync # 기본 의존성 설치
# uv sync --extra dev # 개발 의존성 포함
# uv sync --extra production # 프로덕션 의존성 포함
# uv sync --all-extras # 전체 의존성 설치
- 버전 고정 전략: `>=X.Y.0,<X.(Y+1).0` 형태의 호환 범위 지정을 권장한다. LangChain은 마이너 버전 간 브레이킹 체인지가 빈번하므로, 마이너 버전 상한을 반드시 설정한다. `uv.lock` 또는 `poetry.lock` 파일을 Git에 커밋하여 팀 전체가 동일한 버전을 사용하도록 보장한다.
2.1.1. 패키지별 역할
| 패키지 | 역할 | 권장 버전 | 대안 패키지 | 호환성 주의사항 |
| python-dotenv | .env 파일에서 환경변수를 로드하여 API 키를 안전하게 관리. pydantic-settings와 함께 사용하면 타입 검증까지 가능하다. | ≥1.0.0 | pydantic-settings (타입 안전), python-decouple | 특별한 호환성 이슈 없음 |
| langchain | LangChain 프레임워크의 핵심 모듈. Chain, Prompt, OutputParser, Retriever 등의 추상화를 제공한다. 0.3.x에서 LCEL이 기본 패턴으로 확립되었다. | ≥0.3.0 | llama-index (트리 기반 인덱싱에 강점), haystack | langchain-core와 버전 동기화 필수, 0.2→0.3 마이그레이션 시 deprecated API 및 import 경로 변경 확인 필요 |
| langchain-openai | OpenAI의 ChatGPT, GPT-4o, Embedding 모델과의 연동. 스트리밍, 함수 호출, 구조화된 출력을 지원한다. | ≥0.2.0 | langchain-anthropic, langchain-google-genai | openai 패키지 ≥1.0.0 권장, 특정 버전 조합에서 의존성 충돌 가능하므로 릴리스 노트로 호환 범위 확인 필요 |
| langchain-chroma | Chroma 벡터 DB 연동. 로컬 개발에 최적이며 설치가 간편하다. | ≥0.2.0 | langchain-qdrant, langchain-weaviate | chromadb ≥0.5.0과 함께 설치, SQLite 버전 제약으로 인해 chroma가 sqlite3 ≥3.35.0을 요구하며 Docker·배포 환경에서 오류가 발생할 수 있음 |
| langchain-pinecone | Pinecone 클라우드 벡터 DB 연동. Serverless 아키텍처를 지원하여 비용 효율적이다. | ≥0.2.0 | langchain-weaviate, langchain-qdrant | pinecone-client ≥5.0.0 사용 시 v4→v5에서 import 경로와 초기화 방식이 달라지므로 마이그레이션 가이드 확인 필요 |
| langchain-text-splitters | 문서 청킹 도구. RecursiveCharacterTextSplitter, SemanticChunker 등을 제공한다. | ≥0.3.0 | unstructured (문서 파싱+청킹), semantic-text-splitter (Rust 기반, 빠름) | langchain-core 버전과 동기화 필요, 0.3 계열에서 일부 분리·이관된 모듈이 있으므로 import 경로 확인 필요 |
| langchain-community | 서드파티 통합 모듈. BM25Retriever, EnsembleRetriever 등 하이브리드 검색 컴포넌트를 포함한다. | ≥0.3.0 | 각 통합별 전용 패키지 | rank_bm25 등 일부 의존 패키지를 별도 설치해야 하며, LangChain 코어와의 메이저 버전 차이가 크면 동작하지 않을 수 있음 |
| chromadb | Chroma 벡터 DB 엔진. HNSW 인덱스 기반 ANN 검색, 메타데이터 필터링을 지원한다. | ≥0.5.0 | qdrant-client, weaviate-client | SQLite ≥3.35.0 필요, 호환 안 되는 환경에서는 pysqlite3 또는 chromadb-pysqlite3를 사용해 우회해야 하며 Docker 이미지의 기본 sqlite 버전을 반드시 확인해야 한다 |
| pinecone-client | Pinecone 클라우드 API 클라이언트. gRPC/REST 모두 지원하며, 배치 업서트를 최적화한다. | ≥5.0.0 | qdrant-client, pymilvus | v5에서 from pinecone import Pinecone 형태로 import 방식이 바뀌었고, 기존 인덱스 관리 API 일부가 변경·폐기되었으므로 버전 업 시 코드 수정 필요 |
2.2. 환경변수 로드 및 API 키 관리
2.2.1. 기본 환경변수 로드
from dotenv import load_dotenv
import os
# .env 파일에서 환경변수 로드
load_dotenv()
# API 키 로드 및 검증 함수
def validate_api_keys():
"""필수 API 키가 설정되었는지 검증한다."""
required_keys = {
"OPENAI_API_KEY": "OpenAI API (LLM + Embedding)",
}
optional_keys = {
"PINECONE_API_KEY": "Pinecone Vector DB",
"LANGCHAIN_API_KEY": "LangSmith 모니터링",
"COHERE_API_KEY": "Cohere Reranker",
}
missing = []
for key, desc in required_keys.items():
val = os.environ.get(key)
if not val:
missing.append(f" - {key} ({desc})")
else:
print(f"[OK] {key}: {val[:8]}...")
for key, desc in optional_keys.items():
val = os.environ.get(key)
if val:
print(f"[OK] {key}: {val[:8]}... (optional)")
else:
print(f"[--] {key}: 미설정 ({desc} - 선택 사항)")
if missing:
raise EnvironmentError(
f"필수 API 키가 설정되지 않았습니다:\n" + "\n".join(missing)
)
validate_api_keys()
2.2.2. 프로덕션 시크릿 관리 패턴
import os
from functools import lru_cache
class SecretManager:
"""환경에 따라 적절한 시크릿 소스에서 API 키를 로드한다.
- development: .env 파일 (python-dotenv)
- staging: AWS Secrets Manager 또는 GCP Secret Manager
- production: HashiCorp Vault 또는 클라우드 시크릿 매니저 + 자동 로테이션
"""
def __init__(self):
self.env = os.environ.get("DEPLOY_ENV", "development")
@lru_cache(maxsize=32)
def get_secret(self, key: str) -> str:
if self.env == "development":
return self._from_env(key)
elif self.env in ("staging", "production"):
return self._from_cloud_secret_manager(key)
raise ValueError(f"Unknown environment: {self.env}")
def _from_env(self, key: str) -> str:
value = os.environ.get(key)
if not value:
raise EnvironmentError(f"{key} not found in environment")
return value
def _from_cloud_secret_manager(self, key: str) -> str:
"""AWS Secrets Manager 예시"""
import boto3
client = boto3.client("secretsmanager", region_name="ap-northeast-2")
response = client.get_secret_value(SecretId=f"rag-pipeline/{self.env}/{key}")
return response["SecretString"]
# 사용 예시
secrets = SecretManager()
openai_key = secrets.get_secret("OPENAI_API_KEY")
2.2.3. 멀티 환경 설정 분리
from pydantic_settings import BaseSettings
from pydantic import Field
class RAGConfig(BaseSettings):
"""RAG 파이프라인 설정. 환경변수 또는 .env 파일에서 자동 로드된다."""
# API Keys
openai_api_key: str = Field(..., alias="OPENAI_API_KEY")
pinecone_api_key: str = Field(default="", alias="PINECONE_API_KEY")
langchain_api_key: str = Field(default="", alias="LANGCHAIN_API_KEY")
# LLM 설정
llm_model: str = Field(default="gpt-4o-mini", alias="LLM_MODEL")
llm_temperature: float = Field(default=0.0, alias="LLM_TEMPERATURE")
llm_max_tokens: int = Field(default=2048, alias="LLM_MAX_TOKENS")
# Embedding 설정
embedding_model: str = Field(default="text-embedding-3-large", alias="EMBEDDING_MODEL")
embedding_dimensions: int = Field(default=3072, alias="EMBEDDING_DIMENSIONS")
# Retrieval 설정
retrieval_k: int = Field(default=4, alias="RETRIEVAL_K")
chunk_size: int = Field(default=1500, alias="CHUNK_SIZE")
chunk_overlap: int = Field(default=200, alias="CHUNK_OVERLAP")
# 환경
deploy_env: str = Field(default="development", alias="DEPLOY_ENV")
# Rate Limiting
max_requests_per_minute: int = Field(default=60, alias="MAX_RPM")
max_tokens_per_minute: int = Field(default=150000, alias="MAX_TPM")
class Config:
env_file = ".env"
env_file_encoding = "utf-8"
# 환경별 설정 파일: .env.development, .env.staging, .env.production
config = RAGConfig()
2.2.4. API 키 보안 관리
| 항목 | 설명 | 구현 방법 | 중요도 |
| .env 파일 Git 제외 | API 키가 Git 저장소에 노출되면 봇과 스캐너에 의해 매우 빠르게 탈취될 수 있다. GitHub Secret Scanning을 활성화하면 실수로 커밋된 키를 자동으로 탐지·알림 받을 수 있다. | .gitignore에 .env* 패턴 추가, .env.example만 커밋. git-secrets, gitleaks, trufflehog 등을 pre-commit/CI 훅으로 설정해 커밋 전에 키 패턴을 차단한다. | 필수 |
| 환경변수 기반 로드 | 코드에 키를 하드코딩하면 PR, 로그, 예외 스택트레이스 어디서든 노출될 수 있다. 환경변수와 .env는 로컬 개발에만 쓰고, 리포지토리에는 값이 절대 남지 않도록 한다. | python-dotenv, pydantic-settings로 환경변수 로드 및 타입 검증, CI에서 정규식 기반 시크릿 검사 스텝을 추가해 하드코딩 키를 탐지한다. | 필수 |
| 프로덕션 시크릿 매니저 | 클라우드 시크릿 매니저는 암호화 저장, IAM 기반 접근 제어, 감사 로깅을 제공해 .env보다 안전하다. 개발 환경은 .env, staging/production은 시크릿 매니저로 분리하는 것이 일반적인 패턴이다. | AWS Secrets Manager(보안 정보당 월 약 0.40달러), GCP Secret Manager(버전당 월 0.06달러, 10,000회당 0.03달러), 또는 HashiCorp Vault를 사용해 애플리케이션 시작 시 시크릿을 로드한다. | 필수 |
| 키 로테이션 자동화 | 키가 유출되면 가능한 한 빨리 폐기·교체해야 피해를 줄일 수 있다. 60~90일 주기의 정기 로테이션은 노출을 모르고 지나가는 리스크를 줄여 준다. | AWS Secrets Manager의 자동 로테이션 기능과 Lambda, 또는 Cron+관리용 스크립트로 OpenAI/Pinecone 키를 재발급하고 시크릿 값을 교체한다. 로테이션 실패 시 알림 설정을 함께 구성한다. | 권장 |
| CI/CD 시크릿 주입 | CI 로그에 키가 찍히면 사설 레포라도 장기간 노출될 수 있다. GitHub Actions/GitLab CI의 내장 시크릿 저장소를 쓰면 마스킹과 권한 분리가 자동으로 적용된다. | GitHub Actions에서는 secrets.OPENAI_API_KEY, GitLab CI에서는 masked/protected 변수로 저장한 뒤 환경변수로 주입한다. 워크플로우에서 echo로 시크릿을 출력하지 않도록 점검한다. | 필수 |
| 최소 권한 원칙 | 하나의 마스터 키에 모든 권한을 부여하면 유출 시 피해 범위가 전체 시스템으로 확장된다. 프로젝트·환경별로 키를 분리하고, 사용량 상한을 두면 오남용 피해를 제한할 수 있다. | OpenAI 조직 설정의 usage limit와 프로젝트별 API 키, Pinecone 프로젝트별 키를 사용해 권한과 트래픽 한도를 분리한다. 내부 서비스 간에는 역할 기반 계정/키를 발급한다 | 권장 |
| 감사 로깅 | 비정상 사용(새로운 지역에서의 급격한 호출, 심야 시간 폭증 등)을 조기에 탐지하려면 상세 사용 로그가 필요하다. 로깅이 없으면 유출을 알아차리기까지 수일 이상 걸릴 수 있다. c | OpenAI Usage API, 벡터DB 및 애플리케이션 레벨 미들웨어에서 호출 메타데이터를 수집해 CloudWatch, Stackdriver, ELK 등에 적재하고, 이상 패턴 룰·알림(Slack/PagerDuty)을 설정한다. | 권장 |
# API 키 로테이션 자동화 예시 (AWS Secrets Manager + Lambda)
import boto3
import openai
from datetime import datetime
def rotate_openai_key():
"""OpenAI API 키를 자동 로테이션한다.
1. 새 API 키를 OpenAI에서 발급
2. Secrets Manager에 새 키 저장
3. 이전 키를 비활성화
4. 슬랙 알림 발송
"""
sm_client = boto3.client("secretsmanager")
secret_id = "rag-pipeline/production/OPENAI_API_KEY"
# 새 키 발급 후 업데이트
# (실제 구현에서는 OpenAI API를 통해 새 키를 발급받는다)
sm_client.update_secret(
SecretId=secret_id,
SecretString=new_key,
Description=f"Rotated at {datetime.utcnow().isoformat()}"
)
print(f"[{datetime.utcnow()}] API key rotated successfully")
3. LLM 선택 및 초기화
- LLM 선택은 RAG 파이프라인의 답변 품질, 비용, 응답 속도를 결정하는 핵심 의사결정이다. 2025년 기준 주요 모델들의 성능이 빠르게 수렴하고 있어, 절대적인 "최고 모델"보다는 프로젝트 요구사항에 맞는 최적 모델을 선택하는 것이 중요하다.
3.1. 모델 선택 기준
| 모델 | 제공자 | 컨텍스트 윈도우 | 입력 비용 ($/1M 토큰) | 출력 비용 ($/1M 토큰) | Latency p50 | 주요 강점 | RAG 적합도 |
| GPT-4.1 | OpenAI | 1M | $2.00 | $8.00 | ~600ms | 코딩, 지시 따르기 최상위, 긴 컨텍스트에서도 품질 유지. 구조화된 출력(Structured Output) 네이티브 지원. | 매우 높음 - 복잡한 추론이 필요한 프로덕션 RAG에 최적 |
| GPT-4.1 mini | OpenAI | 1M | $0.40 | $1.60 | ~300ms | GPT-4.1의 경량 버전. 속도와 비용 면에서 우수하며, 대부분의 RAG 태스크에서 GPT-4.1과 유사한 성능. | 매우 높음 - 비용 효율적 프로덕션 RAG에 최적 |
| GPT-4.1 nano | OpenAI | 1M | $0.10 | $0.40 | ~150ms | 가장 빠르고 저렴한 GPT-4.1 계열. 단순 Q&A, 분류, 추출 태스크에 충분. | 높음 - 단순 RAG, 높은 QPS 환경 |
| GPT-4o | OpenAI | 128K | $2.50 | $10.00 | ~500ms | 멀티모달(이미지/오디오), 안정적인 성능. 레거시 호환 필요 시 선택. | 높음 - 멀티모달 RAG |
| GPT-4o-mini | OpenAI | 128K | $0.15 | $0.60 | ~250ms | 가성비 최고. 단순~중간 복잡도 RAG에 충분한 품질. | 높음 - 비용 민감 프로젝트 |
| Claude Sonnet 4 (claude-sonnet-4-20250514) | Anthropic | 200K | $3.00 | $15.00 | ~700ms | 긴 문서 처리에 강함, 환각이 적고 근거 기반 답변 우수. 한국어 이해도 높음. 코딩 성능 최상위. | 매우 높음 - 정확성 중시 RAG |
| Claude Haiku 4.5 (claude-haiku-4-5-20251001) | Anthropic | 200K | $0.80 | $4.00 | ~300ms | Claude 계열의 경량 버전. 속도 대비 품질이 뛰어남. 비용 효율적. | 높음 - 빠른 응답 필요 시 |
| Gemini 2.5 Pro | 1M | $1.25 / $2.50 | $10.00 | ~800ms | 100만 토큰 컨텍스트로 초장문 문서 처리 가능. 내장 사고(thinking) 모드 지원. | 높음 - 대용량 문서 RAG | |
| Gemini 2.5 Flash | 1M | $0.15 / $0.30 | $0.60 / $2.50 | ~200ms | 가격 대비 성능 우수. 100만 토큰 컨텍스트. 빠른 응답. |
- 가격 참고: 위 가격은 2025년 5월 기준이며, 각 제공사의 가격 정책 변경에 따라 달라질 수 있다. 특히 OpenAI와 Google은 6~12개월 주기로 가격 인하를 단행하는 경향이 있으므로 정기적으로 확인한다.
3.1.1. 모델 선택 의사결정 흐름
┌─ 프로젝트 요구사항 분석
│
├─ Q1: 응답 레이턴시 요구사항은?
│ ├─ p95 < 1초 필요 → GPT-4.1 nano / Gemini 2.5 Flash
│ ├─ p95 < 3초 허용 → GPT-4.1 mini / Claude Haiku 4.5 / GPT-4o-mini
│ └─ p95 < 5초 허용 → GPT-4.1 / Claude Sonnet 4 / GPT-4o
│
├─ Q2: 컨텍스트 윈도우 요구사항은?
│ ├─ 검색 결과 4K 토큰 이내 → 모든 모델 가능
│ ├─ 검색 결과 32K~128K → GPT-4o 이상, Claude 4 계열
│ └─ 검색 결과 128K 이상 → GPT-4.1, Gemini 2.5 Pro (1M 컨텍스트)
│
├─ Q3: 월간 예산은?
│ ├─ < $100/월 (1만 질의 기준) → GPT-4.1 nano ($5), GPT-4o-mini ($8)
│ ├─ < $500/월 → GPT-4.1 mini ($60), Claude Haiku 4.5 ($48)
│ └─ 제한 없음 → 품질 최우선 모델 선택
│
├─ Q4: 구조화된 출력(JSON 등)이 필요한가?
│ ├─ Yes → GPT-4.1 계열 (Structured Output 네이티브 지원)
│ └─ No → 모든 모델 가능
│
├─ Q5: 한국어 품질이 특히 중요한가?
│ ├─ Yes → Claude Sonnet 4 > GPT-4.1 > Gemini 2.5 Pro
│ └─ No → 영어 기준 벤치마크 참조
│
└─ Q6: 멀티모달(이미지/표) 처리가 필요한가?
├─ Yes → GPT-4o, GPT-4.1, Gemini 2.5 Pro
└─ No → 텍스트 전용 최적화 모델 선택
3.2. LLM 초기화 코드
from langchain_openai import ChatOpenAI
from langchain_core.rate_limiters import InMemoryRateLimiter
import logging
logger = logging.getLogger(__name__)
# Rate Limiter 설정 - API 할당량 초과 방지
rate_limiter = InMemoryRateLimiter(
requests_per_second=1, # 초당 최대 요청 수
check_every_n_seconds=0.1, # 체크 간격
max_bucket_size=10, # 버스트 허용량
)
# 프로덕션 LLM 초기화 - 재시도 로직 및 타임아웃 포함
llm = ChatOpenAI(
model="gpt-4.1-mini", # 비용 효율적 프로덕션 모델
temperature=0, # RAG에서는 결정적 답변 권장
max_tokens=2048, # 응답 최대 토큰 수 (비용 제어)
timeout=30, # API 요청 타임아웃 (초)
max_retries=3, # 실패 시 최대 재시도 횟수 (지수 백오프 적용)
rate_limiter=rate_limiter, # Rate Limiter 적용
)
# 폴백 모델 구성 - 주 모델 장애 시 자동 전환
fallback_llm = ChatOpenAI(
model="gpt-4o-mini",
temperature=0,
max_tokens=2048,
timeout=30,
max_retries=2,
)
# with_fallbacks()로 주 모델 실패 시 자동 폴백
llm_with_fallback = llm.with_fallbacks([fallback_llm])
# 연결 테스트
try:
response = llm_with_fallback.invoke("테스트: 1+1은?")
print(f"LLM 연결 성공: {response.content}")
except Exception as e:
logger.error(f"LLM 연결 실패: {e}")
raise
# 스트리밍 설정 - 사용자 체감 속도 개선
# 스트리밍을 사용하면 TTFT(Time To First Token)를 크게 줄일 수 있다
async def stream_response(llm, messages):
"""스트리밍으로 응답을 생성하여 첫 토큰을 빠르게 반환한다."""
full_response = ""
async for chunk in llm.astream(messages):
token = chunk.content
full_response += token
print(token, end="", flush=True) # 실시간 출력
return full_response
# 토큰 사용량 추적 - 비용 관리
from langchain_core.callbacks import BaseCallbackHandler
class TokenUsageTracker(BaseCallbackHandler):
"""LLM 호출별 토큰 사용량을 추적하여 비용을 관리한다."""
def __init__(self):
self.total_input_tokens = 0
self.total_output_tokens = 0
self.call_count = 0
def on_llm_end(self, response, **kwargs):
if response.llm_output and "token_usage" in response.llm_output:
usage = response.llm_output["token_usage"]
self.total_input_tokens += usage.get("prompt_tokens", 0)
self.total_output_tokens += usage.get("completion_tokens", 0)
self.call_count += 1
@property
def estimated_cost(self):
"""GPT-4.1-mini 기준 예상 비용 계산"""
input_cost = (self.total_input_tokens / 1_000_000) * 0.40
output_cost = (self.total_output_tokens / 1_000_000) * 1.60
return input_cost + output_cost
def report(self):
return (
f"총 호출 {self.call_count}회 | "
f"입력 {self.total_input_tokens:,} 토큰 | "
f"출력 {self.total_output_tokens:,} 토큰 | "
f"예상 비용 ${self.estimated_cost:.4f}"
)
tracker = TokenUsageTracker()
llm_tracked = ChatOpenAI(
model="gpt-4.1-mini",
temperature=0,
callbacks=[tracker],
)
3.2.1. LLM 파라미터 상세 가이드
| 파라미터 | 설명 | RAG 권장값 | 상호작용 및 주의사항 |
| temperature | 다음 토큰 선택의 무작위성을 제어한다. 0이면 항상 확률이 가장 높은 토큰을 선택(greedy decoding)하고, 높을수록 낮은 확률의 토큰도 선택될 수 있다. | 0~0.1 | top_p와 동시에 낮추면 극도로 결정적인 출력. OpenAI는 둘 중 하나만 조정할 것을 권장한다. |
| top_p | 누적 확률이 top_p에 도달할 때까지의 토큰들만 후보로 사용한다(nucleus sampling). 0.1이면 상위 10% 확률 토큰만 사용한다. | 0.95 (기본값) | temperature=0일 때는 top_p가 무시된다. temperature > 0일 때만 의미 있다. |
| frequency_penalty | 이미 등장한 토큰의 빈도에 비례하여 해당 토큰의 선택 확률을 낮춘다. 반복적인 문구 생성을 방지한다. -2.0 ~ 2.0 범위. | 0 (기본값) | RAG에서는 검색 문서의 핵심 용어를 반복해야 하는 경우가 많으므로, 높게 설정하면 오히려 답변 품질이 저하된다. |
| presence_penalty | 이미 등장한 토큰에 대해 일정한 페널티를 부여하여 새로운 토픽 도입을 유도한다. -2.0 ~ 2.0 범위. | 0 (기본값) | 요약 태스크에서 0.5~1.0으로 설정하면 다양한 관점을 포함할 수 있다. 일반 RAG Q&A에서는 0 유지. |
| max_tokens | 생성할 최대 토큰 수. 비용 제어와 응답 길이 관리에 핵심적이다. | 1024~2048 | 너무 낮으면 답변이 중간에 잘림. 너무 높으면 불필요한 비용 발생. 프롬프트에 "간결하게 답변하라"를 포함하면 실제 사용량을 줄일 수 있다. |
| timeout | API 요청 타임아웃(초). 네트워크 지연이나 API 서버 과부하 시 무한 대기를 방지한다. | 30~60초 | 스트리밍 사용 시에는 첫 청크까지의 시간이므로 10~15초로 줄일 수 있다. |
| max_retries | API 호출 실패(5xx, 타임아웃, rate limit) 시 자동 재시도 횟수. 지수 백오프가 기본 적용된다. | 2~3 | 429(rate limit) 에러에는 Retry-After 헤더를 존중한다. 4xx 에러(인증 실패 등)에는 재시도하지 않는다. |
3.3. 비용 최적화 전략
import os
import hashlib
import json
from langchain_openai import ChatOpenAI
# 전략 1: 환경별 모델 자동 선택
env = os.environ.get("DEPLOY_ENV", "development")
MODEL_CONFIG = {
"development": {"model": "gpt-4o-mini", "max_tokens": 1024},
"staging": {"model": "gpt-4.1-mini", "max_tokens": 2048},
"production": {"model": "gpt-4.1-mini", "max_tokens": 2048},
}
config = MODEL_CONFIG[env]
llm = ChatOpenAI(model=config["model"], temperature=0, max_tokens=config["max_tokens"])
# 전략 2: 시맨틱 캐싱 - 동일/유사 질의에 대한 반복 API 호출 방지
# 정확 매칭(exact match)과 유사 매칭(fuzzy match)을 조합하여 캐시 히트율을 극대화한다
from langchain_core.globals import set_llm_cache
from langchain_community.cache import SQLiteCache
# 정확 매칭 캐시 (동일한 프롬프트 → 동일한 응답 반환)
set_llm_cache(SQLiteCache(database_path=".langchain_cache.db"))
# 시맨틱 캐시 (의미적으로 유사한 질의 → 캐시된 응답 반환)
# 임베딩 기반으로 질의 유사도를 측정하여 임계값 이상이면 캐시 히트
from langchain_community.cache import RedisSemanticCache
# Redis + 임베딩 기반 시맨틱 캐시 (프로덕션 권장)
# set_llm_cache(RedisSemanticCache(
# redis_url="redis://localhost:6379",
# embedding=embedding,
# score_threshold=0.95, # 유사도 임계값 (높을수록 엄격)
# ))
# 전략 3: 쿼리 복잡도 기반 모델 라우팅
# 단순 질의는 저렴한 모델, 복잡한 질의는 고성능 모델로 자동 분배
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
# 복잡도 분류기 (가장 저렴한 모델 사용)
classifier_llm = ChatOpenAI(model="gpt-4.1-nano", temperature=0)
classifier_prompt = ChatPromptTemplate.from_messages([
("system", """질문의 복잡도를 1~3으로 분류하세요.
1: 단순 사실 확인 (예: "X는 무엇인가?")
2: 비교/분석 필요 (예: "A와 B의 차이는?")
3: 복합 추론/종합 필요 (예: "A 상황에서 B를 고려할 때 최적의 C 전략은?")
숫자만 답하세요."""),
("human", "{query}")
])
# 복잡도별 모델 매핑
complexity_model_map = {
"1": ChatOpenAI(model="gpt-4.1-nano", temperature=0), # $0.10/1M 입력
"2": ChatOpenAI(model="gpt-4.1-mini", temperature=0), # $0.40/1M 입력
"3": ChatOpenAI(model="gpt-4.1", temperature=0), # $2.00/1M 입력
}
async def route_query(query: str):
"""쿼리 복잡도를 분석하여 적절한 모델로 라우팅한다."""
chain = classifier_prompt | classifier_llm
result = await chain.ainvoke({"query": query})
complexity = result.content.strip()
return complexity_model_map.get(complexity, complexity_model_map["2"])
# 전략 4: 프롬프트 압축 - 컨텍스트 토큰 수 절감
# LLMLingua 또는 요약 기반 압축으로 입력 토큰을 30~50% 절감
from langchain.retrievers import ContextualCompressionRetriever
from langchain.retrievers.document_compressors import LLMChainExtractor
# LLM 기반 컨텍스트 압축: 질의에 관련된 부분만 추출
compressor = LLMChainExtractor.from_llm(
ChatOpenAI(model="gpt-4.1-nano", temperature=0) # 압축에는 저렴한 모델 사용
)
compression_retriever = ContextualCompressionRetriever(
base_compressor=compressor,
base_retriever=retriever, # 기존 retriever
)
# 검색된 문서에서 질의 관련 부분만 추출하여 LLM에 전달
# → 입력 토큰 30~50% 절감, 노이즈 감소로 답변 품질 향상 가능
- 비용 최적화 효과 비교
| 전략 | 절감 효과 | 구현 복잡도 | 부작용 |
| 환경별 모델 분리 | 개발 비용 70~90% 절감 | 낮음 | 개발/프로덕션 품질 차이 가능 |
| 정확 매칭 캐싱 | 반복 질의 100% 절감 (캐시 히트 시) | 낮음 | 캐시 무효화 전략 필요 (문서 업데이트 시) |
| 시맨틱 캐싱 | 유사 질의 포함 30~50% 절감 | 중간 | 임베딩 비용 추가, 임계값 튜닝 필요 (거짓 양성/음성) |
| 모델 라우팅 | 전체 비용 40~60% 절감 | 중간 | 분류기 오류 시 품질 저하 (쉬운 태스크에 고가 모델 배정) |
| 프롬프트 압축 | 입력 토큰 30~50% 절감 | 중간 | 압축 과정에서 정보 손실 가능 (핵심 사실 누락) |
| 배치 처리 (Batch API) | 50% 비용 할인 (OpenAI) | 낮음 | 24시간 내 응답, 실시간 불가 (비동기 작업에만 적합) |
- 실무 권장: 가장 ROI가 높은 순서는 (1) 환경별 모델 분리 → (2) 정확 매칭 캐싱 → (3) 모델 라우팅이다. 시맨틱 캐싱은 트래픽이 일정 수준(일 1000 질의) 이상일 때 효과가 나타난다.
4. 문서 로드 및 청킹
- 문서 로드와 청킹은 RAG 파이프라인 품질의 80%를 결정하는 가장 중요한 단계이다. "Garbage In, Garbage Out" 원칙이 가장 극명하게 적용되는 지점으로, 이 단계의 품질이 낮으면 이후 임베딩, 검색, 생성 단계를 아무리 최적화해도 최종 답변 품질은 개선되지 않는다.
4.1. 문서 로드
4.1.1. 문서 로더 선택 가이드
| 문서 형식 | LangChain 로더 | 설치 패키지 | 성능 특성 | OCR 지원 | 주요 한계 및 대안 |
| DOCX | Docx2txtLoader | docx2txt | 빠름(순수 텍스트 추출). 50MB 문서도 수초 내 처리. 서식 정보 손실. | 미지원 | 표/이미지 추출 불가. 서식이 중요하면 UnstructuredWordDocumentLoader 사용. python-docx로 표 별도 추출 가능. 대용량 문서도 안정적으로 처리. |
| PDF (텍스트) | PyPDFLoader | pypdf | 페이지별 로딩, 메타데이터에 페이지 번호 포함. 100페이지 PDF 약 2~5초. | 미지원 | 복잡한 레이아웃(다단, 사이드바)에서 텍스트 순서 오류 발생. PyMuPDFLoader(fitz)가 2~3배 빠르고 레이아웃 보존 우수. PDF 내부 이미지나 주석은 비포함. |
| PDF (스캔) | UnstructuredPDFLoader | unstructured, pytesseract, pdf2image | OCR 포함 시 페이지당 2~5초. GPU 사용 시 0.5초/페이지. | Tesseract OCR | 한국어 OCR 정확도 향상을 위해 tessdata_best의 kor.traineddata 설치 필요. upstage-document-parse API가 한국어 OCR에 가장 정확(유료). OCR 성능은 해상도·주변 여백에 크게 영향받음. |
| CSV | CSVLoader | (기본 포함) | 행 단위로 Document 생성. 대용량 CSV는 메모리 주의. | 미지원 | 10만 행 이상은 pandas로 전처리 후 배치 로딩 권장. source_column 파라미터로 출처 메타데이터 지정 가능. 구분자(delimiter) 오탐 시 인코딩 에러 발생 가능. |
| TXT | TextLoader | (기본 포함) | 가장 빠름. 인코딩 자동 감지 미지원. | 미지원 | 한국어 파일은 encoding="utf-8" 명시 필수. autodetect_encoding=True 옵션으로 인코딩 자동 감지 가능. 줄바꿈(\n) 기준 청킹 필요. |
| HTML | BSHTMLLoader | beautifulsoup4 | HTML 태그 제거 후 텍스트 추출. | 미지원 | JavaScript 렌더링 콘텐츠는 추출 불가. SPA는 AsyncHtmlLoader + playwright 조합 필요. 메타 태그와 구조적 정보는 기본적으로 무시됨. |
| Markdown | UnstructuredMarkdownLoader | unstructured | 헤더 구조를 메타데이터로 보존. | 미지원 | MarkdownHeaderTextSplitter와 조합하면 섹션별 청킹 가능. 표나 코드 블록은 일반 텍스트로 처리됨. |
| JSON | JSONLoader | jq | jq 표현식으로 원하는 필드만 추출 가능. 유연함. | 미지원 | 중첩 구조가 복잡하면 커스텀 로더 작성 권장. 스키마 구조가 불규칙할 경우 파싱 오류 주의. |
| 디렉토리 | DirectoryLoader | (기본 포함) | 디렉토리 내 모든 파일을 glob 패턴으로 일괄 로딩. 멀티스레딩 지원. | 로더에 따라 | use_multithreading=True, max_concurrency=8로 병렬 로딩. show_progress=True로 진행률 확인. 특정 파일 형식만 처리하려면 glob="*.pdf" 등 지정. |
from langchain_community.document_loaders import (
Docx2txtLoader, PyPDFLoader, DirectoryLoader, TextLoader
)
# 에러 핸들링이 포함된 문서 로딩
def load_documents_safely(file_path: str) -> list:
"""파일 형식에 따라 적절한 로더를 선택하고, 에러를 핸들링한다."""
loaders = {
".docx": Docx2txtLoader,
".pdf": PyPDFLoader,
".txt": lambda p: TextLoader(p, encoding="utf-8"),
}
ext = "." + file_path.rsplit(".", 1)[-1].lower()
loader_cls = loaders.get(ext)
if not loader_cls:
raise ValueError(f"지원하지 않는 파일 형식: {ext}")
try:
loader = loader_cls(file_path) if not callable(loader_cls) or isinstance(loader_cls, type) else loader_cls(file_path)
documents = loader.load()
# 빈 문서 필터링
documents = [doc for doc in documents if doc.page_content.strip()]
if not documents:
raise ValueError(f"파일에서 텍스트를 추출할 수 없습니다: {file_path}")
print(f"[OK] {file_path}: {len(documents)}개 문서, "
f"총 {sum(len(d.page_content) for d in documents):,}자")
return documents
except UnicodeDecodeError:
# 인코딩 오류 시 대체 인코딩 시도
print(f"[WARN] UTF-8 디코딩 실패, EUC-KR로 재시도: {file_path}")
loader = TextLoader(file_path, encoding="euc-kr")
return loader.load()
except Exception as e:
print(f"[ERROR] 문서 로딩 실패: {file_path} - {e}")
raise
4.1.2. Document 객체와 메타데이터 강화
- LangChain의 `Document` 객체는 `page_content`(텍스트)와 `metadata`(딕셔너리)로 구성된다. 메타데이터를 풍부하게 구성하면 검색 시 필터링, 답변 시 출처 표기, 운영 시 문서 관리에 큰 도움이 된다.
from langchain_core.documents import Document
from datetime import datetime
import hashlib
def enrich_metadata(documents: list, source_info: dict) -> list:
"""문서의 메타데이터를 강화하여 검색 필터링과 출처 추적을 지원한다.
Args:
documents: Document 리스트
source_info: 추가 메타데이터 (카테고리, 버전, 작성일 등)
"""
enriched = []
for i, doc in enumerate(documents):
# 기존 메타데이터 보존 + 추가 메타데이터 병합
metadata = {
**doc.metadata,
**source_info,
"chunk_index": i,
"char_count": len(doc.page_content),
"indexed_at": datetime.utcnow().isoformat(),
# 문서 해시 (중복 감지용)
"content_hash": hashlib.md5(
doc.page_content.encode()
).hexdigest(),
}
enriched.append(Document(
page_content=doc.page_content,
metadata=metadata,
))
return enriched
# 사용 예시
documents = load_documents_safely("./tax.docx")
documents = enrich_metadata(documents, {
"category": "세법",
"document_type": "법률문서",
"version": "2025-01",
"department": "세무팀",
"access_level": "internal",
})
# 메타데이터를 활용한 검색 필터링 (Chroma 예시)
# retriever = database.as_retriever(
# search_kwargs={
# "k": 4,
# "filter": {"category": "세법", "version": "2025-01"}
# }
# )
4.2. 청킹 (Text Splitting)
- 청킹은 긴 문서를 검색과 LLM 처리에 적합한 크기의 청크(chunk)로 분할하는 과정이다. 청크의 크기, 경계, 의미적 완결성이 검색 품질에 직접적인 영향을 미친다.
4.2.1. chunk_size와 chunk_overlap의 관계
chunk_size = 1500, chunk_overlap = 200인 경우:
청크 1: [===============1500자===============]
청크 2: [=200=[===============1500자===============]
청크 3: [=200=[===============1500자===============]
→ overlap 비율 = 200/1500 = 13.3%
→ 실질 새 콘텐츠 = 1500 - 200 = 1300자/청크
→ 총 문서 길이 D인 경우 예상 청크 수 ≈ D / (chunk_size - chunk_overlap) + 1
→ 예: 45,000자 문서 → 45,000 / 1,300 + 1 ≈ 36개 청크
4.2.1.1. 최적 overlap 계산 공식:
- 최소 overlap: 평균 문장 길이 × 2 (한국어 평균 문장 약 40~60자 → 최소 80~120자)
- 권장 overlap: chunk_size × 0.10 ~ 0.15
- 최대 overlap: chunk_size × 0.20 (이 이상은 저장 비용 대비 효과 미미)
4.2.2. chunk_size 선정 기준
| chunk_size | 토큰 수 (한국어 기준) | 임베딩 모델 호환성 | 검색 Precision | 검색 Recall | 적합한 문서 유형 | 비용 영향 |
| 200~500자 | ~250~600 토큰 | 모든 모델 안전 | 매우 높음 (핵심 정보에 집중) | 낮음 (문맥 부족으로 관련 청크 누락 가능) | FAQ, 용어 정의, 짧은 규정 항목, 테이블 셀 | 임베딩 API 호출 수 증가, 벡터 DB 저장량 증가 |
| 500~1000자 | ~600~1200 토큰 | 모든 모델 안전 | 높음 | 중간 | 일반 업무 문서, 보고서, 매뉴얼 절(section) | 균형 잡힌 비용 |
| 1000~1500자 | ~1200~1800 토큰 | 모든 모델 안전 | 중간 (노이즈 포함 가능) | 높음 (충분한 문맥) | 법률 문서, 기술 사양서, 계약서 조항 | LLM 입력 토큰 증가 |
| 1500~2000자 | ~1800~2400 토큰 | 대부분 안전 (8K 제한 모델 주의) | 중간~낮음 | 매우 높음 | 학술 논문 섹션, 정책 문서 | LLM 비용 상당히 증가 |
| 2000자+ | ~2400+ 토큰 | text-embedding-3 계열만 안전 (8191 토큰 제한) | 낮음 (불필요한 정보 다수 포함) | 매우 높음 | 전체 문서 요약, 장문 분석 | 비용 매우 높음 |
- 한국어 토큰 특성: 한국어는 영어 대비 약 1.5~2배의 토큰을 소비한다. 예를 들어 "소득세 계산 방법"은 영어 "income tax calculation method"(4 토큰)에 비해 약 7~8 토큰을 소비한다. chunk_size를 문자 수 기준으로 설정할 때 이 비율을 반드시 고려해야 한다. `tiktoken` 라이브러리로 정확한 토큰 수를 계산할 수 있다.
# 토큰 수 정확 계산
import tiktoken
def count_tokens(text: str, model: str = "gpt-4o") -> int:
"""텍스트의 토큰 수를 정확히 계산한다."""
encoding = tiktoken.encoding_for_model(model)
return len(encoding.encode(text))
# 한국어 토큰 비율 확인
sample_kr = "근로소득이 있는 거주자의 종합소득 과세표준을 계산할 때 다음 각 호의 금액을 공제한다"
sample_en = "When calculating the comprehensive income tax base of a resident with earned income, the following amounts shall be deducted"
print(f"한국어 '{sample_kr[:20]}...' → {count_tokens(sample_kr)} 토큰")
print(f"영어 '{sample_en[:20]}...' → {count_tokens(sample_en)} 토큰")
# 한국어: ~45 토큰, 영어: ~22 토큰 → 약 2배 차이
4.2.3. 기본 청킹: RecursiveCharacterTextSplitter
from langchain_text_splitters import RecursiveCharacterTextSplitter
# 한국어 문서에 최적화된 구분자 순서
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=1500,
chunk_overlap=200,
separators=[
"\n\n\n", # 섹션 구분 (빈 줄 2개)
"\n\n", # 문단 구분
"\n", # 줄바꿈
"다. ", # 한국어 문장 종결 (다.)
"요. ", # 한국어 문장 종결 (요.)
". ", # 영문 문장 종결
"다.\n", # 한국어 문장 종결 + 줄바꿈
" ", # 단어 구분
"", # 문자 단위 (최후 수단)
],
length_function=len, # 문자 수 기준 (토큰 기준은 아래 참조)
is_separator_regex=False,
)
# 토큰 기준 청킹 (더 정밀한 비용 제어가 필요한 경우)
# from langchain_text_splitters import TokenTextSplitter
# token_splitter = TokenTextSplitter(
# chunk_size=500, # 토큰 수 기준
# chunk_overlap=50,
# model_name="gpt-4o", # tiktoken 인코딩 모델
# )
4.2.4. 고급 청킹: Semantic Chunking
- Semantic Chunking은 문장 간 임베딩 유사도를 측정하여 의미적으로 자연스러운 경계에서 청크를 분할한다. 고정 크기 청킹의 가장 큰 문제인 "의미 단위 분절"을 해결한다.
from langchain_experimental.text_splitter import SemanticChunker
from langchain_openai import OpenAIEmbeddings
embedding = OpenAIEmbeddings(model="text-embedding-3-small")
# Semantic Chunker: 문장 간 의미 유사도가 급격히 변하는 지점에서 분할
semantic_splitter = SemanticChunker(
embeddings=embedding,
breakpoint_threshold_type="percentile", # "percentile", "standard_deviation", "interquartile", "gradient"
breakpoint_threshold_amount=80, # 상위 80 퍼센타일 이상의 변화에서 분할
# → 문장 간 유사도 차이가 상위 20%에 해당할 만큼 클 때 청크 경계로 설정
)
# 사용 예시
documents = loader.load()
semantic_chunks = semantic_splitter.split_documents(documents)
print(f"Semantic 청킹 결과: {len(semantic_chunks)}개 청크")
lengths = [len(c.page_content) for c in semantic_chunks]
print(f" 크기 분포: 최소 {min(lengths)}자, 최대 {max(lengths)}자, "
f"평균 {sum(lengths)//len(lengths)}자, 중앙값 {sorted(lengths)[len(lengths)//2]}자")
# 주의: Semantic Chunking은 모든 문장을 임베딩해야 하므로 비용이 발생한다
# 1000문장 × text-embedding-3-small = 약 $0.002 (매우 저렴)
# 그러나 대용량 문서(10만 문장+)에서는 비용과 시간을 사전 산정해야 한다
4.2.5. 고급 청킹: Parent-Child (Small-to-Big) 전략
- 검색에는 작은 청크(높은 정밀도), 생성에는 큰 청크(풍부한 문맥)를 사용하는 전략이다. 검색 정밀도와 생성 품질을 동시에 높일 수 있다.
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain.storage import InMemoryStore
from langchain.retrievers import ParentDocumentRetriever
# Parent (큰 청크) - LLM에 전달될 문맥
parent_splitter = RecursiveCharacterTextSplitter(
chunk_size=2000, # 큰 청크: 풍부한 문맥
chunk_overlap=200,
)
# Child (작은 청크) - 벡터 검색에 사용
child_splitter = RecursiveCharacterTextSplitter(
chunk_size=400, # 작은 청크: 높은 검색 정밀도
chunk_overlap=50,
)
# Parent 문서 저장소 (메모리 또는 Redis/DB)
store = InMemoryStore()
# ParentDocumentRetriever 구성
parent_retriever = ParentDocumentRetriever(
vectorstore=database, # 벡터 DB (Child 청크 저장)
docstore=store, # Parent 문서 저장소
child_splitter=child_splitter,
parent_splitter=parent_splitter,
)
# 문서 추가 (Parent와 Child 모두 자동 생성)
parent_retriever.add_documents(documents)
# 검색 시: Child 청크로 검색 → 해당 Parent 청크를 반환
# → 정밀한 검색 + 풍부한 문맥의 조합
results = parent_retriever.invoke("소득세 계산 방법")
print(f"반환된 Parent 청크 크기: {len(results[0].page_content)}자")
# → 약 2000자의 풍부한 문맥이 LLM에 전달됨
4.2.6. 고급 청킹: 문서 구조 인식 청킹
from langchain_text_splitters import MarkdownHeaderTextSplitter
# 마크다운 헤더 기반 청킹 - 문서 구조를 보존하면서 분할
headers_to_split_on = [
("#", "제목1"),
("##", "제목2"),
("###", "제목3"),
]
md_splitter = MarkdownHeaderTextSplitter(
headers_to_split_on=headers_to_split_on,
strip_headers=False, # 헤더를 청크 내용에 포함
)
# 마크다운 문서 분할
md_chunks = md_splitter.split_text(markdown_text)
# 결과: 각 청크의 metadata에 헤더 계층 구조가 포함됨
# 예: {"제목1": "소득세법", "제목2": "제3장 과세표준", "제목3": "제1절 공제"}
# → 검색 시 섹션별 필터링 가능
# 2단계: 헤더 기반 분할 후 크기가 큰 청크를 추가 분할
from langchain_text_splitters import RecursiveCharacterTextSplitter
secondary_splitter = RecursiveCharacterTextSplitter(
chunk_size=1500,
chunk_overlap=200,
)
final_chunks = secondary_splitter.split_documents(md_chunks)
# → 문서 구조 메타데이터를 보존하면서 적절한 크기로 분할
4.2.7. 청킹 전략 비교
| 전략 | 장점 | 단점 | 적합한 상황 | 구현 복잡도 |
| Fixed-size (RecursiveCharacter) | 구현 간단, 예측 가능한 청크 크기, 비용 계산 용이 | 의미 단위 분절 가능, 문맥 손실 | 범용 문서, PoC, 빠른 프로토타이핑 | 낮음 |
| Semantic Chunking | 의미적으로 자연스러운 경계, 높은 청크 품질 | 임베딩 API 비용 추가, 청크 크기 불균일 | 서술형 문서, 논문, 보고서 | 중간 |
| Parent-Child | 검색 정밀도와 생성 품질 동시 확보 | 저장 공간 2배, 구현 복잡도 증가, docstore 관리 필요 | 법률/기술 문서, 높은 정밀도 요구 | 중간~높음 |
| Header-based | 문서 구조 보존, 섹션별 필터링 가능 | 마크다운/HTML 형식 문서에만 적용 가능 | 구조화된 문서, 매뉴얼, 위키 | 낮음 |
| Agentic Chunking | LLM이 최적의 분할점을 판단, 최고 품질 | LLM API 비용 높음, 처리 속도 느림 | 소규모 고품질 문서, 비용 무관한 프로젝트 | 높음 |
| Late Chunking | 임베딩 시점에 전체 문서 문맥을 활용, 경계 정보 손실 최소 | 특정 임베딩 모델(Jina 등)만 지원 | 최신 임베딩 모델 사용 시 | 중간 |
4.2.8. 청킹 품질 검증
import numpy as np
from collections import Counter
def validate_chunks(chunks: list, expected_chunk_size: int = 1500) -> dict:
"""청킹 결과를 정량적으로 검증하고 리포트를 생성한다."""
lengths = [len(c.page_content) for c in chunks]
# 1. 크기 분포 분석
size_stats = {
"total_chunks": len(chunks),
"min_size": min(lengths),
"max_size": max(lengths),
"mean_size": int(np.mean(lengths)),
"median_size": int(np.median(lengths)),
"std_size": int(np.std(lengths)),
}
# 2. 이상 청크 탐지
too_small = [i for i, l in enumerate(lengths) if l < expected_chunk_size * 0.1]
too_large = [i for i, l in enumerate(lengths) if l > expected_chunk_size * 1.5]
# 3. 빈 청크 또는 의미 없는 청크 탐지
empty_or_trivial = [
i for i, c in enumerate(chunks)
if len(c.page_content.strip()) < 50
]
# 4. 경계 품질 검사 (문장 중간 분절 감지)
bad_boundaries = []
for i, c in enumerate(chunks):
text = c.page_content.strip()
# 문장 종결 어미가 아닌 곳에서 끝나는 청크 감지
if text and not any(text.endswith(end) for end in
["다.", "요.", "음.", "임.", "함.", ".", "!", "?", "```"]):
bad_boundaries.append(i)
# 5. 리포트 출력
print("=" * 60)
print("청킹 품질 검증 리포트")
print("=" * 60)
print(f"총 청크 수: {size_stats['total_chunks']}")
print(f"크기 분포: {size_stats['min_size']}~{size_stats['max_size']}자 "
f"(평균 {size_stats['mean_size']}자, 표준편차 {size_stats['std_size']}자)")
print(f"너무 작은 청크 (<{expected_chunk_size*0.1:.0f}자): {len(too_small)}개 "
f"{too_small[:5] if too_small else ''}")
print(f"너무 큰 청크 (>{expected_chunk_size*1.5:.0f}자): {len(too_large)}개 "
f"{too_large[:5] if too_large else ''}")
print(f"빈/의미없는 청크 (<50자): {len(empty_or_trivial)}개")
print(f"경계 품질 의심 (문장 미종결): {len(bad_boundaries)}개 "
f"({len(bad_boundaries)/len(chunks)*100:.1f}%)")
# 품질 판정
quality_score = 100
quality_score -= len(too_small) * 2
quality_score -= len(too_large) * 2
quality_score -= len(empty_or_trivial) * 5
quality_score -= min(len(bad_boundaries), 20) * 1
quality_score = max(0, quality_score)
print(f"\n청킹 품질 점수: {quality_score}/100")
if quality_score >= 90:
print("→ 우수: 프로덕션 배포 가능")
elif quality_score >= 70:
print("→ 양호: 경미한 개선 권장")
else:
print("→ 미흡: chunk_size/overlap/separator 재조정 필요")
return {**size_stats, "quality_score": quality_score}
# 사용 예시
report = validate_chunks(document_list, expected_chunk_size=1500)
4.2.8.1. 전체 흐름
청크 리스트 입력
↓
각 청크의 글자 수 측정 (lengths)
↓
5가지 품질 항목 검사
↓
점수 계산 → 리포트 출력
4.2.8.2. 파라미터
| 파라미터 | 의미 |
| chunks | LangChain Document 객체 리스트 (.page_content로 텍스트 접근) |
| expected_chunk_size | 기대하는 청크 크기 기준값 (기본 1500자) |
4.2.8.3. 검사 항목 5가지
4.2.8.3.1. 크기 분포 분석 (size_stats)
lengths = [len(c.page_content) for c in chunks]
- 모든 청크의 글자 수를 리스트로 추출
- min / max / 평균 / 중앙값 / 표준편차 계산
- **목적**: 청크들이 고르게 분포하는지 파악
4.2.8.3.2. 이상 청크 탐지
too_small = [i for i, l in enumerate(lengths) if l < expected_chunk_size * 0.1]
too_large = [i for i, l in enumerate(lengths) if l > expected_chunk_size * 1.5]
- `expected_chunk_size = 1500`이면:
- `too_small` : **150자 미만** 청크의 인덱스 수집
- `too_large` : **2250자 초과** 청크의 인덱스 수집
- **목적**: 분할이 너무 잘게 됐거나, 반대로 안 잘린 청크 탐지
4.2.8.3.3. 빈/의미없는 청크 탐지
empty_or_trivial = [i for i, c in enumerate(chunks)
if len(c.page_content.strip()) < 50]
- 공백 제거 후 **50자 미만**인 청크를 의미없는 청크로 판정
- 목적: 구분자만 남거나 내용이 거의 없는 쓰레기 청크 탐지
4.2.8.3.4. 경계 품질 검사 (핵심 로직)
if text and not any(text.endswith(end) for end in
["다.", "요.", "음.", "임.", "함.", ".", "!", "?", "```"]):
bad_boundaries.append(i)
- 청크의 마지막 문자가 문장 종결 부호로 끝나지 않으면 의심 청크로 분류
- 한국어 종결 어미(`다.` `요.` `음.` 등)와 일반 문장부호, 코드블록(` ``` `) 포함
- 목적: 문장 중간에서 잘린 청크 탐지 → LLM이 문맥을 잘못 이해할 수 있음
- 이 검사는 완벽하지 않다.
- 한국어 특성상 종결이 아닌데 `"다."`로 끝나는 경우(예: "제목이다.")도 있고,
- 반대로 정상인데 탐지될 수도 있어 **휴리스틱(어림 규칙)** 수준이다.
4.2.8.3.5. 품질 점수 계산
quality_score = 100
quality_score -= len(too_small) * 2 # 너무 작은 청크 1개당 -2점
quality_score -= len(too_large) * 2 # 너무 큰 청크 1개당 -2점
quality_score -= len(empty_or_trivial) * 5 # 빈 청크 1개당 -5점 (가중치 높음)
quality_score -= min(len(bad_boundaries), 20) * 1 # 최대 -20점 (상한 캡)
| 점수 | 판정 | 조치 사항 |
| 90점 이상 | 우수 — 프로덕션 배포 가능 | 모니터링 유지, A/B 테스트 권장 |
| 70~89점 | 양호 — 경미한 개선 권장 | overlap 10% 증가, chunk_size 미세 조정 |
| 70점 미만 | 미흡 — chunk_size / overlap / separator 재조정 필요 | 청킹 전략 변경(Semantic → Parent-Child 등), 재평가 |
4.2.8.4. 실제 출력 예시
============================================================
청킹 품질 검증 리포트
============================================================
총 청크 수: 142
크기 분포: 312~2187자 (평균 1423자, 표준편차 284자)
너무 작은 청크 (<150자): 2개 [3, 87]
너무 큰 청크 (>2250자): 0개
빈/의미없는 청크 (<50자): 0개
경계 품질 의심 (문장 미종결): 11개 (7.7%)
청킹 품질 점수: 85/100
→ 양호: 경미한 개선 권장
5. Embedding 모델 선택
- 임베딩(Embedding)은 텍스트를 고차원 벡터 공간의 밀집 벡터(dense vector)로 변환하는 과정이다. 변환된 벡터 간의 거리(유사도)를 계산하여 의미적으로 유사한 문서를 검색할 수 있게 된다. 임베딩 모델의 선택은 검색 품질의 상한선을 결정하며, 이후 Retriever나 Reranker로 보정할 수 있는 범위에 한계가 있다.
5.1. 임베딩 모델 비교
| 모델 | 제공자 | 차원 수 | 최대 토큰 | MTEB 평균 (영어) | 한국어 검색 품질 | 비용 ($/1M 토큰) | Matryoshka 지원 | 특이사항 |
| text-embedding-3-large | OpenAI | 3072 (256~3072 가변) | 8191 | 64.6 | 상 (다국어 학습, 한국어 데이터 포함) | $0.13 | 지원 (256, 512, 1024, 3072) | Matryoshka로 차원 축소 시 비용/성능 트레이드오프. 3072→1024 축소 시 성능 ~2% 하락, 저장 67% 절감. |
| text-embedding-3-small | OpenAI | 1536 (256~1536 가변) | 8191 | 62.3 | 중상 | $0.02 | 지원 (256, 512, 1536) | 가성비 최고. 대부분의 프로젝트에서 시작점으로 적합. text-embedding-ada-002 대비 성능 향상. |
| Cohere embed-v4 | Cohere | 1024 (128~1024) | 512 | 66.3 | 중상 (100+ 언어 지원) | $0.10 | 지원 | 다국어 성능 우수. 검색(search_document/search_query) 및 분류 용도 구분 지원. 바이너리 양자화 네이티브 지원(32배 저장 절감). |
| voyage-3-large | Voyage AI | 1024 | 32000 | 67.2 | 중상 | $0.18 | 미지원 | Anthropic 공식 권장. 32K 토큰으로 긴 청크 임베딩 가능. 코드 검색에 강점. |
| voyage-3-lite | Voyage AI | 512 | 32000 | 63.1 | 중 | $0.02 | 미지원 | 경량 버전. 비용 민감 프로젝트에 적합. |
| BGE-M3 | BAAI (오픈소스) | 1024 | 8192 | 64.1 | 상 (100+ 언어, 한국어 특화 학습) | 무료 (자체 호스팅) | 미지원 | Dense + Sparse + ColBERT 멀티 벡터 동시 생성. 하이브리드 검색에 최적. 자체 GPU 필요(V100 이상 권장). |
| multilingual-e5-large-instruct | Microsoft (오픈소스) | 1024 | 512 | 63.8 | 상 (한국어 포함 다국어 특화) | 무료 (자체 호스팅) | 미지원 | 지시문(instruction) 기반으로 검색 의도 명시 가능. HuggingFace에서 바로 사용 가능. |
| solar-embedding-1-large | Upstage | 4096 | 4096 | 63.5 | 매우 높음 (한국어 특화 학습) | $0.04 (1M 자) | 미지원 | 한국어 검색에서 최고 수준. 한국어 도메인 특화 RAG에 강력 추천. |
- 한국어 RAG 모델 선택 가이드**: 한국어 문서가 주요 대상이라면 (1) solar-embedding-1-large (최고 한국어 품질), (2) text-embedding-3-large (범용 + 한국어), (3) BGE-M3 (오픈소스 + 하이브리드)를 우선 검토한다. 비용이 최우선이라면 text-embedding-3-small로 시작하고, 검색 품질 평가 후 업그레이드 여부를 결정한다.
5.2. 임베딩 모델 초기화
from langchain_openai import OpenAIEmbeddings
# 기본 초기화
embedding = OpenAIEmbeddings(model="text-embedding-3-large")
# Matryoshka 차원 축소 - 비용/성능 트레이드오프
# 3072 → 1024 차원으로 축소하면 저장 공간 67% 절감, 검색 속도 향상
embedding_reduced = OpenAIEmbeddings(
model="text-embedding-3-large",
dimensions=1024, # 256, 512, 1024, 3072 중 선택
)
# 임베딩 동작 확인
test_vector = embedding.embed_query("소득세 계산 방법")
print(f"벡터 차원: {len(test_vector)}") # 3072
test_vector_reduced = embedding_reduced.embed_query("소득세 계산 방법")
print(f"축소 벡터 차원: {len(test_vector_reduced)}") # 1024
# 배치 임베딩 최적화 - 대량 문서 처리 시 필수
# OpenAI API는 단일 요청에 최대 2048개 텍스트를 배치 처리 가능
from langchain_openai import OpenAIEmbeddings
import time
embedding = OpenAIEmbeddings(
model="text-embedding-3-large",
chunk_size=1000, # 한 번의 API 호출에 포함할 텍스트 수 (기본 1000)
max_retries=3,
request_timeout=60,
)
# 대량 문서 임베딩 시 진행률 표시
def embed_documents_with_progress(texts: list[str], batch_size: int = 500):
"""대량 텍스트를 배치로 임베딩하고 진행률을 표시한다."""
all_embeddings = []
total = len(texts)
for i in range(0, total, batch_size):
batch = texts[i:i + batch_size]
batch_embeddings = embedding.embed_documents(batch)
all_embeddings.extend(batch_embeddings)
print(f" 임베딩 진행: {min(i + batch_size, total)}/{total} "
f"({min(i + batch_size, total)/total*100:.1f}%)")
time.sleep(0.1) # Rate limit 방지
return all_embeddings
# 사용 예시
# texts = [doc.page_content for doc in document_list]
# vectors = embed_documents_with_progress(texts)
# 임베딩 캐싱 - 동일 텍스트의 반복 임베딩 방지
from langchain.embeddings import CacheBackedEmbeddings
from langchain.storage import LocalFileStore
# 로컬 파일 기반 캐시 (개발 환경)
store = LocalFileStore("./embedding_cache/")
cached_embedding = CacheBackedEmbeddings.from_bytes_store(
underlying_embeddings=embedding,
document_embedding_cache=store,
namespace=embedding.model, # 모델별 캐시 분리
)
# 첫 호출: API 호출 + 캐시 저장
# 이후 호출: 캐시에서 즉시 반환 (API 비용 0)
# → 인덱스 재구축 시 변경되지 않은 문서는 캐시에서 로딩하여 비용 절감
5.2.1. 임베딩의 핵심 원리
- 임베딩은 텍스트의 의미를 고차원 벡터 공간의 점(point)으로 표현한다. 의미적으로 유사한 텍스트는 벡터 공간에서 가까운 위치에, 다른 텍스트는 먼 위치에 놓인다.
의미 유사도 예시 (코사인 유사도):
"소득세 계산 방법" ←→ "인컴택스 산출 절차" → 유사도: 0.92 (매우 유사)
"소득세 계산 방법" ←→ "부동산 투자 전략" → 유사도: 0.31 (관련 낮음)
"소득세 계산 방법" ←→ "오늘 날씨가 좋다" → 유사도: 0.05 (무관)
5.2.1.1. Dense vs Sparse 표현
| 구분 | Dense Embedding | Sparse Embedding (BM25/SPLADE) |
| 벡터 형태 | 모든 차원에 실수값 (e.g., 1024개 차원 모두 0이 아닌 값) | 대부분 0이고 특정 차원만 값이 있음 (e.g., 30K 차원 중 100개만 비-0) |
| 의미 포착 | 의미적 유사성(semantic similarity) 포착에 강함. "자동차"와 "차량"의 유사성 인식. | 어휘적 유사성(lexical similarity) 포착에 강함. 정확한 키워드 매칭. |
| 강점 | 동의어, 패러프레이즈, 다국어 매칭 | 전문 용어, 고유명사, 약어의 정확한 매칭 |
| 약점 | 희귀 전문 용어, 최신 신조어에 약함 | 동의어, 의미적 유사성 포착 불가 |
| 대표 모델 | text-embedding-3-large, BGE-M3, Cohere embed-v4 | BM25 (통계 기반), SPLADE (학습 기반) |
5.2.1.2. Bi-encoder vs Cross-encoder 아키텍처
| 구분 | Bi-encoder | Cross-encoder |
| 작동 방식 | 쿼리와 문서를 독립적으로 임베딩 후 유사도 계산 | 쿼리와 문서를 하나의 입력으로 결합하여 관련도 직접 예측 |
| 속도 | 매우 빠름 (문서 임베딩 사전 계산, 검색 시 쿼리만 임베딩) | 느림 (매 쿼리마다 모든 후보 문서와 쌍으로 계산) |
| 정확도 | 상 | 매우 높음 (두 텍스트의 상호작용을 직접 모델링) |
| RAG에서의 역할 | 1차 검색 (Retrieval): 대량 문서에서 후보 추출 | 2차 재순위 (Reranking): 후보 문서의 정밀 순위화 |
| 대표 모델 | text-embedding-3-large, BGE-M3 | Cohere Rerank, cross-encoder/ms-marco-MiniLM-L-12-v2 |
- 실무 패턴: 1차 검색(Bi-encoder, 빠름)으로 상위 20~50개 후보를 추출한 뒤, 2차 재순위(Cross-encoder, 정밀)로 상위 4~6개를 선별하는 2-stage retrieval이 프로덕션 표준이다.
5.2.1.3. Late Interaction 모델 (ColBERT)
- ColBERT는 Bi-encoder의 속도와 Cross-encoder의 정확도를 절충한 아키텍처이다. 쿼리와 문서를 독립적으로 인코딩하되, 토큰 레벨에서의 상호작용(MaxSim 연산)을 통해 더 정밀한 유사도를 계산한다.
Bi-encoder: query → [CLS 벡터] ↔ [CLS 벡터] ← document (1개 벡터 비교)
Cross-encoder: [query + document] → relevance score (결합 후 직접 예측)
ColBERT: query → [토큰 벡터들] ↔ [토큰 벡터들] ← document (토큰별 MaxSim)
5.3. 임베딩 모델 변경 시 주의사항
- 임베딩 모델을 변경하면 기존 벡터와 새 모델의 벡터는 서로 다른 벡터 공간에 존재하므로, 반드시 전체 인덱스를 재구축해야 한다.
# 무중단 임베딩 모델 마이그레이션 전략 (Blue-Green 패턴)
# Phase 1: 새 모델로 병렬 인덱스 구축 (기존 서비스 영향 없음)
new_embedding = OpenAIEmbeddings(model="text-embedding-3-large", dimensions=1024)
new_database = Chroma.from_documents(
documents=document_list,
embedding=new_embedding,
collection_name="my-collection-v2", # 새 컬렉션
persist_directory="./chroma_db_v2",
)
# Phase 2: A/B 테스트 - 골든 셋으로 두 모델의 검색 품질 비교
golden_set = [
{"query": "연봉 5천만원인 직장인의 소득세는?", "expected_doc_ids": ["tax_001", "tax_002"]},
{"query": "의료비 세액공제 한도", "expected_doc_ids": ["tax_045"]},
# ... 50~100개 테스트 케이스
]
def evaluate_retriever(retriever, golden_set):
"""골든 셋 기반 검색 품질 평가"""
hits = 0
total = len(golden_set)
for case in golden_set:
docs = retriever.invoke(case["query"])
retrieved_ids = [d.metadata.get("doc_id") for d in docs]
if any(eid in retrieved_ids for eid in case["expected_doc_ids"]):
hits += 1
return hits / total
old_score = evaluate_retriever(old_retriever, golden_set)
new_score = evaluate_retriever(new_retriever, golden_set)
print(f"기존 모델 Hit Rate: {old_score:.2%}")
print(f"신규 모델 Hit Rate: {new_score:.2%}")
# Phase 3: 신규 모델이 우수하면 트래픽 전환
# Phase 4: 기존 인덱스 삭제
5.3.1. 재인덱싱 비용 추정:
| 문서 수 | 평균 청크 수/문서 | 총 청크 수 | 임베딩 비용 (text-embedding-3-large) | 소요 시간 (예상) |
| 1,000 | 30 | 30,000 | ~$0.20 | ~2분 |
| 10,000 | 30 | 300,000 | ~$2.00 | ~20분 |
| 100,000 | 30 | 3,000,000 | ~$20.00 | ~3시간 |
| 1,000,000 | 30 | 30,000,000 | ~$200.00 | ~30시간 |
- 실무 권장: 재인덱싱 비용이 $20 이상이면 반드시 소규모 샘플(1000건)로 검색 품질을 비교 테스트한 뒤 전체 재인덱싱 여부를 결정한다. 임베딩 캐싱을 활용하면 변경되지 않은 문서의 재임베딩을 방지할 수 있다.
6. Vector Database 구축
- 벡터 데이터베이스(Vector DB)는 임베딩된 벡터를 저장하고 ANN(Approximate Nearest Neighbor) 검색을 수행하는 핵심 인프라이다. 벡터 DB의 선택과 설정은 검색 속도, 정확도, 운영 비용, 확장성에 직접적인 영향을 미친다.
6.1. 벡터 DB 선택 기준
| 기준 | Chroma | Pinecone | FAISS | Weaviate | Qdrant | Milvus | pgvector |
| 유형 | 임베디드/클라이언트-서버 | 완전 관리형 SaaS | 라이브러리 | 자체 호스팅/클라우드 | 자체 호스팅/클라우드 | 자체 호스팅/클라우드 | PostgreSQL 확장 |
| 인덱싱 알고리즘 | HNSW | 독자적 (PQ+그래프 기반) | IVF, PQ, HNSW, ScaNN | HNSW | HNSW | IVF, HNSW, DiskANN | IVFFlat, HNSW |
| 최대 벡터 수 | ~100만 (메모리 의존) | 수십억 (서버리스) | 수십억 (디스크 기반) | 수억 | 수억 | 수십억 | 수천만 (인덱스 유형 의존) |
| 메타데이터 필터링 | 지원 (기본) | 지원 (강력, 복합 필터) | 제한적 (자체 구현 필요) | 지원 (GraphQL 기반, 강력) | 지원 (강력, 중첩 필터) | 지원 (스칼라 필터링) | 지원 (SQL WHERE 절) |
| 하이브리드 검색 | 미지원 | 지원 (Sparse+Dense) | 미지원 | 지원 (BM25+벡터) | 지원 (Sparse+Dense) | 지원 (Sparse+Dense) | 지원 (tsvector+벡터) |
| 멀티 테넌시 | 컬렉션 분리 | 네임스페이스 | 인덱스 분리 | 테넌트 기능 내장 | 컬렉션 분리 | 파티션 | 스키마/테이블 분리 |
| 백업/DR | 파일 복사 | 자동 관리 | 파일 저장/로드 | 스냅샷 | 스냅샷 | 스냅샷 | pg_dump (표준) |
| 비용 | 무료 | 무료 티어 + $0.08/1M read unit | 무료 | 무료(OSS) / 유료(Cloud) | 무료(OSS) / 유료(Cloud) | 무료(OSS) / 유료(Zilliz Cloud) | 무료 (PostgreSQL 확장) |
| 설치 난이도 | 매우 쉬움 | 쉬움 (API 키만) | 쉬움 | 중간 (Docker) | 쉬움 (Docker) | 중간 (Docker Compose) | 쉬움 (PostgreSQL 확장) |
| 적합 환경 | 개발/PoC, 소규모 프로덕션 | 프로덕션 (관리형 선호) | 대규모 로컬 검색, 연구 | 프로덕션 (자체 호스팅) | 프로덕션 (성능 중시) | 대규모 엔터프라이즈 | 기존 PostgreSQL 인프라 활용 |
6.1.1. 인덱싱 알고리즘 비교:
| 알고리즘 | 원리 | 검색 정확도 (Recall) | 검색 속도 | 메모리 사용량 | 적합한 상황 |
| HNSW | 계층적 그래프 탐색. 상위 레이어에서 대략적 위치를 찾고 하위 레이어로 내려가며 정밀 검색. | 매우 높음 (>0.98) | 매우 빠름 | 높음 (벡터 전체 메모리 상주) | 100만 벡터 이하, 높은 recall 필요 |
| IVF | 벡터 공간을 Voronoi 셀로 분할. 쿼리 벡터가 속한 셀과 인근 셀만 검색. | 높음 (0.90~0.95) | 빠름 | 중간 | 대규모 데이터, 메모리 제약 |
| PQ | 벡터를 서브벡터로 분할 후 각각 양자화. 메모리를 10~50배 절감. | 중간 (0.85~0.95) | 빠름 | 매우 낮음 | 수억 벡터, 메모리 극도로 제한 |
| ScaNN | Google 개발. 비대칭 해싱 + 이방성 벡터 양자화. | 매우 높음 | 매우 빠름 | 중간 | 대규모 고성능 요구 |
6.2. Chroma (로컬)
- Chroma는 로컬 환경에서 빠르게 벡터 DB를 구축할 수 있는 오픈소스 데이터베이스이다. HNSW 인덱스를 기본으로 사용하며, SQLite 기반 메타데이터 저장을 지원한다.
from langchain_chroma import Chroma
# ===== 최초 데이터 저장 (인덱싱) =====
database = Chroma.from_documents(
documents=document_list,
embedding=embedding,
collection_name="tax-collection-v1",
persist_directory="./chroma_db",
)
# ===== 이후 재사용 (기존 DB 로드) =====
database = Chroma(
collection_name="tax-collection-v1",
persist_directory="./chroma_db",
embedding_function=embedding,
)
6.2.1. HNSW 파라미터 튜닝
- Chroma의 HNSW 인덱스 파라미터를 조정하여 검색 성능을 최적화할 수 있다.
import chromadb
# ChromaDB 클라이언트 직접 생성 (고급 설정 시)
client = chromadb.PersistentClient(path="./chroma_db")
# HNSW 파라미터 커스텀 컬렉션 생성
collection = client.get_or_create_collection(
name="tax-collection-tuned",
metadata={
# HNSW 인덱스 빌드 시 파라미터
"hnsw:M": 32, # 각 노드의 최대 이웃 수 (기본 16)
# 높을수록 recall 증가, 메모리/빌드 시간 증가
# 16: 일반용, 32: 높은 recall, 64: 최고 recall
"hnsw:construction_ef": 200, # 인덱스 빌드 시 탐색 범위 (기본 100)
# 높을수록 인덱스 품질 향상, 빌드 시간 증가
# 100: 일반용, 200: 높은 품질, 500: 최고 품질
"hnsw:search_ef": 150, # 검색 시 탐색 범위 (기본 10)
# 높을수록 recall 증가, 검색 시간 증가
# 50: 빠른 검색, 100: 균형, 200: 높은 recall
"hnsw:space": "cosine", # 거리 함수: cosine, l2, ip
}
)
# M=16, ef=100: Recall ~0.95, 검색 ~1ms (10만 벡터 기준)
# M=32, ef=200: Recall ~0.98, 검색 ~2ms
# M=64, ef=500: Recall ~0.99, 검색 ~5ms
6.2.2. 배치 삽입 최적화
# 대량 문서 삽입 시 배치 처리로 성능 최적화
def batch_add_documents(database, documents, batch_size=500):
"""대량 문서를 배치로 벡터 DB에 삽입한다.
Chroma는 단일 요청에 많은 문서를 넣으면 메모리 문제가 발생할 수 있으므로
배치 단위로 분할하여 삽입한다.
"""
total = len(documents)
for i in range(0, total, batch_size):
batch = documents[i:i + batch_size]
database.add_documents(batch)
print(f" 삽입 진행: {min(i + batch_size, total)}/{total}")
print(f"총 {total}개 문서 삽입 완료")
6.3. Pinecone (클라우드)
- Pinecone은 완전 관리형 클라우드 벡터 데이터베이스로, 서버리스(Serverless) 아키텍처를 통해 자동 확장과 비용 최적화를 제공한다.
import os
from pinecone import Pinecone, ServerlessSpec
from langchain_pinecone import PineconeVectorStore
pc = Pinecone(api_key=os.environ.get("PINECONE_API_KEY"))
index_name = "rag-production-v1"
# 인덱스 생성 (최초 1회)
if index_name not in pc.list_indexes().names():
pc.create_index(
name=index_name,
dimension=3072, # 임베딩 차원 (text-embedding-3-large 기준)
metric="cosine", # 유사도 측정 방식
spec=ServerlessSpec(
cloud="aws",
region="us-east-1", # 서비스 지역과 가까운 리전 선택
),
)
6.3.1. Serverless vs Pod 비교
| 기준 | Serverless | Pod (s1/p1/p2) |
| 과금 방식 | 사용량 기반 (read/write unit) | 시간 기반 (Pod 수 × 시간) |
| 최소 비용 | $0 (무료 티어: 2GB 저장, 100만 읽기/월) | ~$70/월 (s1.x1 Pod 1개) |
| 확장성 | 자동 확장 (트래픽에 따라) | 수동/자동 스케일링 (Pod 추가) |
| 콜드 스타트 | 있음 (비활성 시 수백ms 지연) | 없음 (항상 활성) |
| 적합 환경 | 간헐적 트래픽, 비용 최적화 | 일정한 고트래픽, 낮은 레이턴시 필수 |
6.3.2. 네임스페이스 전략
# 네임스페이스를 활용한 멀티 테넌시 / 데이터 분리
# 하나의 인덱스 내에서 네임스페이스로 데이터를 논리적으로 분리한다
# 고객별 네임스페이스 (B2B SaaS 패턴)
database_customer_a = PineconeVectorStore.from_documents(
document_list_a, embedding,
index_name=index_name,
namespace="customer-a", # 고객 A의 데이터
)
database_customer_b = PineconeVectorStore.from_documents(
document_list_b, embedding,
index_name=index_name,
namespace="customer-b", # 고객 B의 데이터
)
# 검색 시 네임스페이스로 필터링 → 데이터 격리 보장
retriever_a = database_customer_a.as_retriever(
search_kwargs={"k": 4, "namespace": "customer-a"}
)
# 버전별 네임스페이스 (문서 버전 관리 패턴)
# namespace="tax-2024-v1", namespace="tax-2025-v1"
# → Blue-Green 배포 시 네임스페이스 전환으로 무중단 인덱스 업데이트
6.3.3. 증분 업데이트 (Upsert) 전략
# 전체 재인덱싱 대신 변경된 문서만 업데이트하는 패턴
import hashlib
def upsert_documents(database, new_documents, namespace="default"):
"""변경된 문서만 벡터 DB에 업서트한다.
문서 content의 해시를 ID로 사용하여 중복을 방지한다.
"""
for doc in new_documents:
# 문서 내용 기반 고유 ID 생성
doc_id = hashlib.md5(doc.page_content.encode()).hexdigest()
doc.metadata["doc_id"] = doc_id
# Pinecone의 upsert: 동일 ID → 덮어쓰기, 새 ID → 추가
database.add_documents(
new_documents,
ids=[doc.metadata["doc_id"] for doc in new_documents],
namespace=namespace,
)
print(f"{len(new_documents)}개 문서 업서트 완료 (namespace: {namespace})")
6.4. 벡터 DB 선택 가이드
| 상황 | 1순위 추천 | 2순위 추천 | 이유 |
| 로컬 개발/PoC | Chroma | FAISS | 설정 간단, 무료, pip install로 즉시 시작 |
| 기존 PostgreSQL 인프라 | pgvector | Qdrant | 추가 인프라 불필요, SQL 활용 가능, 트랜잭션 보장 |
| 관리형 프로덕션 (소규모) | Pinecone Serverless | Qdrant Cloud | 무료 티어 충분, 운영 부담 없음 |
| 관리형 프로덕션 (대규모) | Pinecone Pod | Weaviate Cloud | 자동 확장, SLA 보장, 엔터프라이즈 지원 |
| 자체 호스팅 (성능 중시) | Qdrant | Milvus | HNSW 최적화, 빠른 필터링, 경량 |
| 하이브리드 검색 필수 | Weaviate | Qdrant | BM25 + 벡터 검색 네이티브 지원 |
| 수십억 벡터 | Milvus (Zilliz) | Pinecone | DiskANN 지원, 디스크 기반 대규모 검색 |
| 멀티 모달 검색 | Weaviate | Milvus | 이미지/텍스트/오디오 벡터 통합 검색 |
6.5. 인덱스 관리 및 운영
6.5.1. 증분 인덱싱 전략
import hashlib
import json
from datetime import datetime
class IndexManager:
"""벡터 DB 인덱스의 증분 업데이트, 중복 제거, 버전 관리를 담당한다."""
def __init__(self, database, metadata_store_path="./index_metadata.json"):
self.database = database
self.metadata_store_path = metadata_store_path
self.indexed_hashes = self._load_metadata()
def _load_metadata(self):
try:
with open(self.metadata_store_path, "r") as f:
return json.load(f)
except FileNotFoundError:
return {}
def _save_metadata(self):
with open(self.metadata_store_path, "w") as f:
json.dump(self.indexed_hashes, f, indent=2)
def compute_hash(self, text: str) -> str:
return hashlib.sha256(text.encode()).hexdigest()
def incremental_index(self, documents: list) -> dict:
"""변경된 문서만 인덱싱한다.
Returns:
dict: {"added": int, "skipped": int, "updated": int}
"""
added, skipped, updated = 0, 0, 0
new_docs = []
for doc in documents:
content_hash = self.compute_hash(doc.page_content)
source = doc.metadata.get("source", "unknown")
if content_hash in self.indexed_hashes:
skipped += 1
continue
# 같은 source의 이전 버전이 있으면 업데이트로 처리
if source in {v.get("source") for v in self.indexed_hashes.values()}:
updated += 1
else:
added += 1
doc.metadata["content_hash"] = content_hash
doc.metadata["indexed_at"] = datetime.utcnow().isoformat()
new_docs.append(doc)
self.indexed_hashes[content_hash] = {
"source": source,
"indexed_at": doc.metadata["indexed_at"],
}
if new_docs:
self.database.add_documents(new_docs)
self._save_metadata()
result = {"added": added, "skipped": skipped, "updated": updated}
print(f"증분 인덱싱 완료: 추가 {added}, 건너뜀 {skipped}, 업데이트 {updated}")
return result
6.5.2. 인덱스 모니터링
def monitor_index_health(database, expected_count: int = None):
"""벡터 DB 인덱스의 건강 상태를 점검한다."""
collection = database._collection # Chroma 내부 컬렉션
# 기본 통계
count = collection.count()
print(f"인덱스 벡터 수: {count:,}")
if expected_count:
coverage = count / expected_count * 100
print(f"예상 대비 커버리지: {coverage:.1f}% ({count}/{expected_count})")
if coverage < 95:
print(" [WARN] 인덱싱 누락 가능. 소스 문서와 인덱스 비교 필요.")
# 메타데이터 분포 확인 (Chroma)
sample = collection.peek(limit=100)
if sample and sample.get("metadatas"):
categories = [m.get("category", "unknown") for m in sample["metadatas"]]
from collections import Counter
dist = Counter(categories)
print(f"카테고리 분포 (샘플 100): {dict(dist)}")
return {"count": count}
7. Retriever 생성
- Retriever는 벡터 DB에서 사용자 질의와 관련된 문서를 검색하는 컴포넌트이다. RAG 파이프라인에서 "검색 품질 = 답변 품질의 상한선"이라는 공식이 성립하므로, Retriever의 설정과 최적화가 최종 서비스 품질을 결정한다.
7.1. 기본 Retriever 생성
# 벡터 DB를 Retriever 인터페이스로 변환
retriever = database.as_retriever(
search_kwargs={"k": 4}
)
# 검색 테스트
query = "연봉 5천만원인 직장인의 소득세는?"
docs = retriever.invoke(query)
for i, doc in enumerate(docs):
print(f"\n--- 문서 {i+1} ({len(doc.page_content)}자) ---")
print(f"내용: {doc.page_content[:200]}...")
print(f"메타데이터: {doc.metadata}")
7.2. k 값 선정 기준
| k 값 | 특징 | 적합한 경우 | LLM 토큰 비용 (1500자/청크, 한국어) | 주의사항 |
| 1~2 | 가장 관련성 높은 결과만 반환 | 단순 사실 확인("X는 무엇인가?"), 정의 조회 | ~1,800~3,600 토큰 | 정보 부족으로 불완전한 답변 가능 |
| 3~4 | 관련성과 다양성의 균형 | 일반 Q&A, 절차 설명, 대부분의 RAG 용도 | ~5,400~7,200 토큰 | 가장 보편적인 설정 |
| 5~6 | 넓은 맥락 제공 | 비교 분석("A와 B의 차이"), 다면적 질의 | ~9,000~10,800 토큰 | 관련 없는 문서 포함 확률 증가 |
| 7~10 | 최대한 많은 문맥 | 종합 보고서 생성, 전체 맥락 필요 | ~12,600~18,000 토큰 | 비용 급증, 컨텍스트 윈도우 주의 |
# 동적 k 선택 패턴 - 질의 유형에 따라 k를 자동 조정
def get_dynamic_k(query: str) -> int:
"""질의 복잡도에 따라 k 값을 동적으로 결정한다."""
# 단순 패턴 기반 (프로덕션에서는 LLM 분류기 사용 권장)
if any(kw in query for kw in ["정의", "뜻", "무엇", "몇"]):
return 2 # 단순 사실 확인
elif any(kw in query for kw in ["비교", "차이", "장단점", "vs"]):
return 6 # 비교 분석
elif any(kw in query for kw in ["종합", "전체", "요약", "정리"]):
return 8 # 종합 분석
else:
return 4 # 기본값
7.3. 검색 방식별 Retriever 설정
7.3.1. 유사도 검색 (기본)
# 코사인 유사도가 높은 순서대로 k개 문서를 반환
retriever = database.as_retriever(
search_type="similarity",
search_kwargs={"k": 4}
)
7.3.1.1. 거리 메트릭 비교:
| 메트릭 | 수식 | 특성 | 적합한 상황 |
| Cosine | 1−cos(A,B) | 벡터의 방향만 비교, 크기 무시. 정규화된 벡터에서 최적. | 텍스트 임베딩 (가장 보편적). 대부분의 임베딩 모델이 정규화된 벡터를 출력한다. |
| Euclidean (L2) | ∥A−B∥2 | 벡터 간의 직선 거리. 크기와 방향 모두 고려. | 정규화되지 않은 벡터, 이미지 임베딩. |
| Dot Product (IP) | A⋅B | 벡터의 크기와 방향 모두 반영. 정규화된 벡터에서 코사인과 동일. | 정규화된 벡터에서 코사인 대안(연산이 더 빠름). Pinecone에서 성능 최적화 시 사용. |
7.3.1.2. 실무 권장:
- 대부분의 텍스트 임베딩 모델(OpenAI, Cohere, Voyage 등)은 정규화된 벡터를 출력하므로, Cosine과 Dot Product의 결과가 동일하다. Dot Product가 연산 비용이 약간 낮으므로 대규모 인덱스에서는 Dot Product를 선택하는 것이 유리하다.
7.3.2. MMR (Maximal Marginal Relevance) 검색
- MMR은 유사도가 높으면서도 서로 다양한(중복이 적은) 문서를 반환하는 알고리즘이다. 유사한 내용의 문서가 많이 포함되어 있는 도메인(법률 문서, 규정집, FAQ 등)에서 특히 유용하다.
retriever = database.as_retriever(
search_type="mmr",
search_kwargs={
"k": 4, # 최종 반환할 문서 수
"fetch_k": 20, # 1차 후보 문서 수 (k의 3~5배)
"lambda_mult": 0.5, # 0: 다양성 최대, 1: 유사도만 고려 (0.5: 균형)
}
)
7.3.2.1. MMR 알고리즘 동작 원리:
1. fetch_k=20개의 후보 문서를 벡터 유사도로 검색
2. 첫 번째 문서: 유사도가 가장 높은 문서 선택
3. 두 번째 문서: 쿼리와의 유사도는 높으면서, 이미 선택된 문서와의 유사도는 낮은 문서 선택
4. 반복하여 k=4개 선택
MMR Score = λ × sim(query, doc) - (1-λ) × max(sim(doc, selected_doc))
λ=1.0: 순수 유사도 검색과 동일 (다양성 무시)
λ=0.5: 유사도와 다양성 균형 (기본 권장)
λ=0.0: 다양성만 고려 (유사도 무시, 비현실적)
7.3.2.2. lambda_mult 튜닝 가이드:
| 도메인 | 권장 lambda_mult | 이유 |
| 법률/규정 (유사 조항 다수) | 0.3~0.4 | 중복 조항 방지, 다양한 관점 확보 |
| FAQ (유사 질문 다수) | 0.3~0.5 | 중복 답변 방지 |
| 일반 기술 문서 | 0.5~0.7 | 균형 잡힌 검색 |
| 특정 주제 딥다이브 | 0.7~0.9 | 관련 정보 최대 수집 |
7.3.3. 유사도 점수 임계값 검색
retriever = database.as_retriever(
search_type="similarity_score_threshold",
search_kwargs={
"score_threshold": 0.7, # 최소 유사도 점수
"k": 4, # 최대 반환 문서 수
}
)
# 유사도가 0.7 미만인 문서는 반환하지 않음
# → 관련 없는 문서가 LLM에 전달되는 것을 방지
# → 반환 문서 수가 k보다 적거나 0개일 수 있음
7.3.3.1. 임계값 캘리브레이션 방법론:
def calibrate_threshold(database, test_queries, relevant_docs):
"""테스트 쿼리셋으로 최적 유사도 임계값을 결정한다.
임계값이 너무 높으면 관련 문서도 필터링됨 (recall 감소).
임계값이 너무 낮으면 무관한 문서가 포함됨 (precision 감소).
"""
thresholds = [0.5, 0.6, 0.65, 0.7, 0.75, 0.8, 0.85, 0.9]
results = []
for threshold in thresholds:
retriever = database.as_retriever(
search_type="similarity_score_threshold",
search_kwargs={"score_threshold": threshold, "k": 10}
)
total_precision, total_recall = 0, 0
for query, expected in zip(test_queries, relevant_docs):
docs = retriever.invoke(query)
retrieved_set = {d.metadata.get("doc_id") for d in docs}
expected_set = set(expected)
if retrieved_set:
precision = len(retrieved_set & expected_set) / len(retrieved_set)
else:
precision = 1.0 # 아무것도 반환하지 않으면 정밀도는 1
recall = len(retrieved_set & expected_set) / len(expected_set) if expected_set else 0
total_precision += precision
total_recall += recall
avg_precision = total_precision / len(test_queries)
avg_recall = total_recall / len(test_queries)
f1 = 2 * avg_precision * avg_recall / (avg_precision + avg_recall) if (avg_precision + avg_recall) > 0 else 0
results.append({
"threshold": threshold,
"precision": avg_precision,
"recall": avg_recall,
"f1": f1,
})
print(f" threshold={threshold:.2f}: P={avg_precision:.3f}, R={avg_recall:.3f}, F1={f1:.3f}")
# F1이 가장 높은 임계값 선택
best = max(results, key=lambda x: x["f1"])
print(f"\n최적 임계값: {best['threshold']} (F1={best['f1']:.3f})")
return best["threshold"]
7.3.4. 하이브리드 검색 (BM25 + Vector)
- 하이브리드 검색은 Dense(벡터) 검색과 Sparse(BM25) 검색을 결합하여 두 방식의 장점을 모두 활용한다. 벡터 검색이 놓치는 정확한 키워드 매칭을 BM25가 보완하고, BM25가 놓치는 의미적 유사성을 벡터 검색이 보완한다.
from langchain_community.retrievers import BM25Retriever
from langchain.retrievers import EnsembleRetriever
# 1. BM25 Retriever (Sparse 검색)
# 문서의 텍스트를 TF-IDF 기반으로 인덱싱하여 키워드 매칭 검색
bm25_retriever = BM25Retriever.from_documents(
document_list,
k=4, # 반환 문서 수
)
# BM25는 정확한 용어 매칭에 강함
# 예: "종합소득세" 검색 시 "종합소득세"라는 단어가 포함된 문서를 정확히 찾음
# 2. Vector Retriever (Dense 검색)
vector_retriever = database.as_retriever(
search_kwargs={"k": 4}
)
# 벡터 검색은 의미적 유사성에 강함
# 예: "세금 계산 방법" 검색 시 "과세표준 산출 절차" 문서도 검색됨
# 3. Ensemble Retriever (하이브리드)
# RRF(Reciprocal Rank Fusion)로 두 검색 결과를 병합
ensemble_retriever = EnsembleRetriever(
retrievers=[bm25_retriever, vector_retriever],
weights=[0.3, 0.7], # BM25 30%, Vector 70% 가중치
# weights 조정 가이드:
# [0.5, 0.5]: 균형 (기본값)
# [0.3, 0.7]: 의미 검색 중심 (일반 Q&A)
# [0.7, 0.3]: 키워드 검색 중심 (전문 용어가 중요한 도메인)
)
# 테스트: 하이브리드 검색
docs = ensemble_retriever.invoke("종합소득세 계산 방법")
print(f"하이브리드 검색 결과: {len(docs)}개 문서")
7.3.4.1. RRF (Reciprocal Rank Fusion) 알고리즘:
RRF Score(doc) = Σ 1 / (k + rank_i(doc))
예시 (k=60, 기본값):
BM25에서 doc_A가 1위, Vector에서 doc_A가 3위인 경우:
RRF(doc_A) = 1/(60+1) + 1/(60+3) = 0.0164 + 0.0159 = 0.0323
BM25에서 doc_B가 5위, Vector에서 doc_B가 1위인 경우:
RRF(doc_B) = 1/(60+5) + 1/(60+1) = 0.0154 + 0.0164 = 0.0318
→ doc_A가 doc_B보다 높은 RRF 점수 (두 검색 모두에서 상위에 있으므로)
7.3.4.2. 하이브리드 검색 가중치(alpha) 튜닝:
| 도메인 | BM25 가중치 | Vector 가중치 | 이유 |
| 법률/의료 (전문 용어 중심) | 0.5~0.7 | 0.3~0.5 | 정확한 법률 용어 매칭이 핵심 |
| 일반 Q&A (자연어 질의) | 0.2~0.3 | 0.7~0.8 | 다양한 표현의 의미 매칭이 중요 |
| 코드 검색 | 0.4~0.5 | 0.5~0.6 | 함수명/변수명 정확 매칭 + 의미 검색 |
| 다국어 검색 | 0.1~0.2 | 0.8~0.9 | BM25는 다국어에 약함, 벡터 검색에 의존 |
7.3.5. Re-ranking
- Re-ranking은 1차 검색 결과를 Cross-encoder 모델로 재평가하여 정밀한 순위를 매기는 2단계 검색 전략이다. 1차 검색에서 놓치거나 잘못 순위가 매겨진 문서를 보정할 수 있다.
# Cohere Reranker를 활용한 Re-ranking
from langchain.retrievers import ContextualCompressionRetriever
from langchain_cohere import CohereRerank
# Cohere Rerank 모델 초기화
reranker = CohereRerank(
model="rerank-v3.5", # 최신 Rerank 모델
top_n=4, # 최종 반환할 문서 수
)
# 1차 검색: 넓게 검색 (k=20)
base_retriever = database.as_retriever(
search_kwargs={"k": 20} # Reranker에 충분한 후보 제공
)
# 2차 Re-ranking: Cross-encoder로 정밀 재순위화
rerank_retriever = ContextualCompressionRetriever(
base_compressor=reranker,
base_retriever=base_retriever,
)
# 검색 실행
docs = rerank_retriever.invoke("소득세 계산 방법")
# → 20개 후보 중 Reranker가 가장 관련성 높은 4개를 선별하여 반환
7.3.5.1. Re-ranking 모델 비교:
| 모델 | 제공자 | 비용 (1000 검색) | 레이턴시 | 정확도 | 특이사항 |
| rerank-v3.5 | Cohere | $0.00 | ~100ms | 매우 높음 | 다국어 지원, 한국어 우수. RAG 최적화. |
| jina-reranker-v2 | Jina AI | $0.00 | ~80ms | 높음 | 빠른 속도, 코드 검색에 강점. |
| cross-encoder/ms-marco | HuggingFace (OSS) | 무료 (자체 호스팅) | ~200ms (GPU) | 높음 | 영어 전용, GPU 필요. |
| BGE-reranker-v2-m3 | BAAI (OSS) | 무료 (자체 호스팅) | ~150ms (GPU) | 높음 | 다국어 지원, 오픈소스. |
| LLM 기반 Reranking | OpenAI 등 | $0.01~0.10 | ~500ms+ | 매우 높음 | 가장 유연, 가장 비쌈. 커스텀 기준 적용 가능. |
7.3.5.2. Re-ranking이 효과적인 상황:
| 상황 | 개선 효과 | 이유 |
| 임베딩 모델의 도메인 적합성이 낮을 때 | 높음 (MRR +10~20%) | Cross-encoder가 도메인 맥락을 보완 |
| 유사한 문서가 많은 도메인 (법률, 의료) | 높음 | 미세한 관련성 차이를 구분 |
| 멀티홉 질의 (여러 정보를 조합해야 하는 질의) | 중간 | 간접적으로 관련된 문서의 순위를 올림 |
| 이미 검색 품질이 높을 때 (Hit Rate > 0.95) | 낮음 | 개선 여지가 적음 |
7.3.6. Multi-Query Retrieval
- 단일 쿼리로 검색하면 표현 방식의 한계로 관련 문서를 놓칠 수 있다. Multi-Query Retrieval은 원본 질의를 여러 관점에서 재작성하여 검색 커버리지를 높인다.
from langchain.retrievers import MultiQueryRetriever
# Multi-Query Retriever: LLM이 원본 질의를 3개의 다른 관점으로 재작성
multi_retriever = MultiQueryRetriever.from_llm(
retriever=database.as_retriever(search_kwargs={"k": 4}),
llm=ChatOpenAI(model="gpt-4.1-nano", temperature=0.3),
# 내부적으로 3개의 변형 쿼리를 생성하고 각각 검색한 뒤 결과를 병합
)
# 예: "소득세 계산 방법" 질의 시 LLM이 생성하는 변형 쿼리:
# 1. "근로소득에 대한 세금 산출 절차"
# 2. "종합소득세 과세표준 계산 방식"
# 3. "급여 소득자의 세액 결정 과정"
# → 3개 쿼리의 검색 결과를 합치고 중복을 제거하여 반환
7.3.6.1. HyDE (Hypothetical Document Embeddings):
- HyDE는 LLM으로 가상의 답변 문서를 생성한 뒤, 그 문서의 임베딩으로 검색하는 기법이다. 질의와 문서 간의 어휘 차이(lexical gap)를 극복할 수 있다.
from langchain.chains import HypotheticalDocumentEmbedder
# HyDE: 질의 → LLM이 가상 답변 생성 → 가상 답변의 임베딩으로 검색
hyde_embeddings = HypotheticalDocumentEmbedder.from_llm(
llm=ChatOpenAI(model="gpt-4.1-nano", temperature=0.3),
base_embeddings=embedding,
prompt_key="web_search", # 프롬프트 유형
)
# 예: "세금 줄이는 방법" 질의 시
# Step 1: LLM이 가상 답변 생성 → "세금을 줄이기 위해서는 소득공제, 세액공제,
# 비과세 소득 활용, 연금저축 납입, 의료비 공제 등을 활용할 수 있다..."
# Step 2: 이 가상 답변을 임베딩하여 벡터 DB에서 검색
# → 질의("세금 줄이는 방법")보다 가상 답변이 실제 문서와 더 유사한 어휘를 사용하므로
# 검색 품질이 향상됨
# 주의: HyDE는 LLM 호출이 추가되므로 레이턴시와 비용이 증가한다
# 검색 품질이 충분하지 않은 경우에만 선택적으로 적용한다
7.3.6.2. Step-back Prompting:
- 구체적인 질의를 한 단계 추상화하여 더 넓은 범위의 문서를 검색하는 기법이다.
from langchain_core.prompts import ChatPromptTemplate
# Step-back 프롬프트: 구체적 질의 → 상위 개념 질의로 변환
stepback_prompt = ChatPromptTemplate.from_messages([
("system", """주어진 질문을 한 단계 추상화하여 더 넓은 범위의 배경 질문으로 변환하세요.
구체적인 수치나 조건을 제거하고, 상위 개념에 대한 질문으로 바꾸세요.
예시:
질문: "연봉 5천만원인 직장인의 소득세는 얼마인가?"
배경 질문: "근로소득에 대한 소득세 과세 체계와 세율 구조는 어떻게 되는가?"
질문: "2025년 의료비 세액공제 한도는?"
배경 질문: "의료비 세액공제의 요건, 공제 대상, 한도에 관한 규정은?"
"""),
("human", "{query}")
])
# 사용: 원본 질의 + Step-back 질의 모두로 검색 → 결과 병합
# → 구체적 답변 문서 + 배경 지식 문서를 함께 LLM에 제공
7.3.7. 쿼리 변환 전략 비교
| 전략 | 원리 | 추가 LLM 호출 | 레이턴시 증가 | 효과적인 상황 |
| Multi-Query | 원본 질의를 3개 관점으로 재작성 | 1회 (3개 쿼리 생성) | +200~500ms | 표현이 다양한 도메인, 동의어가 많은 경우 |
| HyDE | 가상 답변 문서를 생성하여 검색 | 1회 (가상 답변 생성) | +300~800ms | 질의-문서 간 어휘 갭이 큰 경우 |
| Step-back | 구체적 질의를 추상화 | 1회 (추상 질의 생성) | +200~400ms | 구체적 수치/조건 질의에서 배경 지식 필요 시 |
| Query Decomposition | 복합 질의를 하위 질의로 분해 | 1회 (분해) | +300~600ms | 멀티홉 질의, 여러 정보를 조합해야 하는 경우 |
7.4. 검색 결과 검증
7.4.1. 자동화된 검색 품질 평가
import numpy as np
def evaluate_retrieval(retriever, golden_set: list[dict]) -> dict:
"""골든 셋 기반 검색 품질을 자동 평가한다.
Args:
retriever: 평가할 Retriever
golden_set: [{"query": str, "relevant_doc_ids": list[str]}, ...]
각 질의에 대해 관련 문서 ID 목록을 포함
Returns:
dict: 평가 메트릭 (Hit Rate, MRR, NDCG, Precision, Recall)
"""
hit_rates = []
mrrs = []
precisions = []
recalls = []
for case in golden_set:
query = case["query"]
expected = set(case["relevant_doc_ids"])
docs = retriever.invoke(query)
retrieved = [d.metadata.get("doc_id", "") for d in docs]
# Hit Rate@k: 상위 k개 중 최소 1개의 관련 문서가 있는가?
hit = 1 if any(r in expected for r in retrieved) else 0
hit_rates.append(hit)
# MRR (Mean Reciprocal Rank): 첫 번째 관련 문서의 순위의 역수
rr = 0
for rank, doc_id in enumerate(retrieved, 1):
if doc_id in expected:
rr = 1 / rank
break
mrrs.append(rr)
# Precision@k: 반환된 문서 중 관련 문서 비율
relevant_retrieved = sum(1 for r in retrieved if r in expected)
precision = relevant_retrieved / len(retrieved) if retrieved else 0
precisions.append(precision)
# Recall@k: 전체 관련 문서 중 검색된 비율
recall = relevant_retrieved / len(expected) if expected else 0
recalls.append(recall)
metrics = {
"hit_rate": np.mean(hit_rates),
"mrr": np.mean(mrrs),
"precision": np.mean(precisions),
"recall": np.mean(recalls),
"n_queries": len(golden_set),
}
print("=" * 60)
print("검색 품질 평가 리포트")
print("=" * 60)
print(f"평가 질의 수: {metrics['n_queries']}")
print(f"Hit Rate@{len(docs)}: {metrics['hit_rate']:.3f}")
print(f"MRR: {metrics['mrr']:.3f}")
print(f"Precision@{len(docs)}: {metrics['precision']:.3f}")
print(f"Recall@{len(docs)}: {metrics['recall']:.3f}")
# 품질 판정
if metrics["hit_rate"] >= 0.90 and metrics["mrr"] >= 0.75:
print("\n→ 우수: 프로덕션 배포 가능")
elif metrics["hit_rate"] >= 0.75:
print("\n→ 양호: 개선 여지 있음 (Reranking, 하이브리드 검색 검토)")
else:
print("\n→ 미흡: 청킹 전략, 임베딩 모델, 쿼리 변환 재검토 필요")
return metrics
# 사용 예시
golden_set = [
{"query": "연봉 5천만원인 직장인의 소득세는?",
"relevant_doc_ids": ["tax_ch3_001", "tax_ch3_002"]},
{"query": "의료비 세액공제 한도",
"relevant_doc_ids": ["tax_ch5_012"]},
{"query": "퇴직금 세금 계산",
"relevant_doc_ids": ["tax_ch7_003", "tax_ch7_004"]},
# ... 최소 50~100개 테스트 케이스 권장
]
# metrics = evaluate_retrieval(retriever, golden_set)
7.4.2. 검색 디버깅 워크플로우
def debug_retrieval(database, query: str, k: int = 10):
"""검색 결과를 상세히 분석하여 문제를 진단한다."""
# 유사도 점수와 함께 검색
docs_with_scores = database.similarity_search_with_score(query, k=k)
print(f"\n질의: '{query}'")
print(f"{'='*70}")
for i, (doc, score) in enumerate(docs_with_scores):
print(f"\n[{i+1}] 유사도: {score:.4f}")
print(f" 소스: {doc.metadata.get('source', 'N/A')}")
print(f" 크기: {len(doc.page_content)}자")
print(f" 내용: {doc.page_content[:150]}...")
# 진단 정보
scores = [s for _, s in docs_with_scores]
print(f"\n{'='*70}")
print(f"유사도 분포: min={min(scores):.4f}, max={max(scores):.4f}, "
f"mean={sum(scores)/len(scores):.4f}")
# 점수 분포 분석
if max(scores) - min(scores) < 0.05:
print("[진단] 점수 차이가 매우 작음 → 임베딩 모델이 이 도메인에서 구분력이 낮을 수 있음")
print(" → 도메인 특화 임베딩 모델 시도 또는 하이브리드 검색 도입 권장")
elif scores[0] < 0.5:
print("[진단] 최고 유사도가 낮음 → 질의와 관련된 문서가 인덱스에 없거나, "
"질의 표현이 문서와 크게 다를 수 있음")
print(" → 문서 커버리지 확인 또는 쿼리 변환(Multi-Query, HyDE) 적용 권장")
else:
print("[진단] 정상 범위의 유사도 분포")
# debug_retrieval(database, "연봉 5천만원인 직장인의 소득세는?")
7.4.3. 컨텍스트 관련성 자동 스코어링
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
relevance_prompt = ChatPromptTemplate.from_messages([
("system", """당신은 검색 결과의 관련성을 평가하는 전문가입니다.
주어진 질문과 검색된 문서의 관련성을 1~5 점으로 평가하세요.
1: 전혀 무관
2: 약간 관련 (같은 주제이지만 질문에 답하는 데 도움 안 됨)
3: 부분적으로 관련 (일부 유용한 정보 포함)
4: 관련 있음 (질문에 답하는 데 유용한 정보 포함)
5: 매우 관련 (질문에 직접 답하는 핵심 정보 포함)
점수만 숫자로 답하세요."""),
("human", "질문: {query}\n\n검색된 문서:\n{document}")
])
async def score_context_relevance(query: str, documents: list, llm=None):
"""검색된 문서의 관련성을 LLM으로 자동 평가한다."""
if llm is None:
llm = ChatOpenAI(model="gpt-4.1-nano", temperature=0)
scores = []
for doc in documents:
chain = relevance_prompt | llm
result = await chain.ainvoke({
"query": query,
"document": doc.page_content[:1000],
})
try:
score = int(result.content.strip())
scores.append(score)
except ValueError:
scores.append(0)
avg_score = sum(scores) / len(scores) if scores else 0
print(f"컨텍스트 관련성 평균: {avg_score:.1f}/5.0")
print(f"개별 점수: {scores}")
if avg_score < 3.0:
print("[경고] 검색 결과의 관련성이 낮음. 검색 전략 재검토 필요.")
return {"scores": scores, "average": avg_score}
- 실무 권장: 골든 셋 기반 자동 평가를 CI/CD 파이프라인에 통합하여, 인덱스 업데이트나 모델 변경 시 검색 품질이 기준 이하로 떨어지면 배포를 차단하는 품질 게이트를 설정한다. Hit Rate@4 ≥ 0.90, MRR ≥ 0.75를 프로덕션 배포의 최소 기준값으로 권장한다.
8. RAG Chain 구성
- RAG Chain은 Retriever가 검색한 문서를 LLM에 전달하여 최종 답변을 생성하는 핵심 컴포넌트이다. LangChain은 세 가지 방식을 제공하며, 각각의 유연성과 제어 수준이 다르다.
8.1. 방법 A: RetrievalQA (레거시)
- `RetrievalQA`는 LangChain 초기부터 제공된 방식으로, 현재는 **레거시(deprecated)** 상태이다. 새 프로젝트에서는 사용을 권장하지 않는다.
from langchain.chains import RetrievalQA
from langchain import hub
prompt = hub.pull("rlm/rag-prompt")
qa_chain = RetrievalQA.from_chain_type(
llm,
retriever=retriever,
chain_type_kwargs={"prompt": prompt},
return_source_documents=True,
)
result = qa_chain.invoke({"query": "소득세 계산 방법은?"})
print(result["result"])
8.1.1. Deprecated 사유 및 기술적 한계:
| 한계 | 상세 설명 | 영향 |
| LCEL 비호환 | RunnableSequence 인터페이스를 구현하지 않아 ` | 파이프 연산자로 다른 Runnable과 조합 불가..batch(), .stream(), .astream_events()` 등 LCEL 표준 메서드 미지원 |
| 대화 이력 미지원 | chat_history 파라미터가 없어 멀티턴 대화를 구현하려면 별도의 메모리 관리 로직을 직접 구현해야 함 | 챗봇 형태의 RAG 서비스 구축 시 코드 복잡도가 크게 증가 |
| 중간 단계 접근 불가 | 검색 결과를 후처리(reranking, filtering)하거나 프롬프트 구성을 동적으로 변경하는 미들웨어 삽입이 구조적으로 어려움 | 검색 품질 개선을 위한 파이프라인 확장이 제한됨 |
| 콜백 시스템 제한 | LangChain v0.2+의 astream_events 기반 세밀한 토큰 단위 콜백과 호환되지 않음 | LangSmith 트레이싱의 세분화된 추적이 불완전 |
- 실무 권장: 기존 코드에서 `RetrievalQA`를 사용 중이라면 `create_retrieval_chain` 또는 LCEL 기반으로 마이그레이션을 계획한다. 마이그레이션 시 입력 키(`query` → `input`)와 출력 키(`result` → `answer`) 변경에 주의한다.
8.2. 방법 B: create_retrieval_chain (현재 권장)
- LangChain이 공식 권장하는 방식으로, 대화 이력 통합, 소스 문서 추적, LCEL 호환성을 모두 지원한다.
8.2.1. 기본 구성
from langchain.chains.combine_documents import create_stuff_documents_chain
from langchain.chains import create_retrieval_chain
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
# 대화 이력을 지원하는 프롬프트 구성
prompt = ChatPromptTemplate.from_messages([
("system",
"당신은 세법 전문 AI입니다. 아래 컨텍스트를 기반으로 정확하게 답변하세요.\n"
"컨텍스트에 없는 내용은 '해당 정보를 찾을 수 없습니다'라고 답하세요.\n\n"
"컨텍스트:\n{context}"),
MessagesPlaceholder(variable_name="chat_history"),
("human", "{input}"),
])
combine_docs_chain = create_stuff_documents_chain(llm, prompt)
retrieval_chain = create_retrieval_chain(retriever, combine_docs_chain)
8.2.2. 대화 이력 통합
from langchain_core.messages import HumanMessage, AIMessage
# 대화 이력 관리
chat_history = []
def ask_with_history(question: str) -> str:
"""대화 이력을 유지하며 RAG 질의를 수행한다."""
result = retrieval_chain.invoke({
"input": question,
"chat_history": chat_history,
})
# 대화 이력에 현재 질의/응답 추가
chat_history.append(HumanMessage(content=question))
chat_history.append(AIMessage(content=result["answer"]))
return result
# 멀티턴 대화 예시
r1 = ask_with_history("근로소득세 계산 방법을 알려주세요")
print(r1["answer"])
print(f"참조 문서 수: {len(r1['context'])}")
r2 = ask_with_history("위에서 설명한 계산에서 공제 항목은 무엇인가요?")
# "위에서 설명한" → chat_history를 통해 이전 대화 맥락을 이해
print(r2["answer"])
8.2.3. 소스 문서 추적과 출처 표시
def ask_with_sources(question: str) -> dict:
"""답변과 함께 소스 문서 정보를 구조화하여 반환한다."""
result = retrieval_chain.invoke({
"input": question,
"chat_history": [],
})
sources = []
for doc in result["context"]:
sources.append({
"content_preview": doc.page_content[:200],
"source": doc.metadata.get("source", "unknown"),
"page": doc.metadata.get("page", "N/A"),
"chunk_id": doc.metadata.get("chunk_id", "N/A"),
"relevance_score": doc.metadata.get("relevance_score", None),
})
return {
"answer": result["answer"],
"sources": sources,
"source_count": len(sources),
}
8.2.4. 에러 처리 패턴
import logging
from langchain_core.exceptions import OutputParserException
from openai import RateLimitError, APITimeoutError
logger = logging.getLogger(__name__)
def safe_retrieval_chain_invoke(
chain, question: str, chat_history: list = None,
max_retries: int = 3, fallback_answer: str = "일시적 오류가 발생했습니다."
) -> dict:
"""프로덕션용 안전한 RAG Chain 호출 래퍼.
- OpenAI Rate Limit: 지수 백오프로 재시도
- Timeout: 재시도 후 폴백 답변 반환
- 파싱 오류: 원본 텍스트 반환
- 예상치 못한 오류: 로깅 후 폴백 답변
"""
import time
chat_history = chat_history or []
for attempt in range(max_retries):
try:
result = chain.invoke({
"input": question,
"chat_history": chat_history,
})
# 빈 답변 검증
if not result.get("answer", "").strip():
logger.warning(f"빈 답변 반환됨: question={question}")
return {"answer": "답변을 생성하지 못했습니다.", "context": []}
return result
except RateLimitError as e:
wait_time = 2 ** attempt # 1초, 2초, 4초
logger.warning(f"Rate limit 도달. {wait_time}초 후 재시도 ({attempt+1}/{max_retries})")
time.sleep(wait_time)
except APITimeoutError:
logger.warning(f"API 타임아웃 ({attempt+1}/{max_retries})")
if attempt < max_retries - 1:
time.sleep(1)
continue
return {"answer": fallback_answer, "context": []}
except OutputParserException as e:
logger.error(f"출력 파싱 오류: {e}")
# 파싱 실패 시 원본 텍스트 반환 시도
return {"answer": str(e.llm_output) if hasattr(e, 'llm_output') else fallback_answer, "context": []}
except Exception as e:
logger.error(f"예상치 못한 오류: {type(e).__name__}: {e}", exc_info=True)
return {"answer": fallback_answer, "context": []}
return {"answer": fallback_answer, "context": []}
8.3. 방법 C: LCEL 기반 커스텀 RAG Chain (최고 유연성)
- LCEL(LangChain Expression Language)을 사용하면 파이프라인의 모든 단계를 세밀하게 제어할 수 있다. 스트리밍, 비동기 처리, 중간 단계 커스터마이징이 필요한 프로덕션 환경에 가장 적합하다.
8.3.1. 핵심 Runnable 컴포넌트
| 컴포넌트 | 용도 | 사용 시점 | 예시 |
| RunnablePassthrough | 입력을 변환 없이 그대로 다음 단계로 전달. 여러 브랜치 중 하나에서 원본 값을 보존할 때 사용 | 원본 질문을 context 생성과 병렬로 전달할 때 | "question": RunnablePassthrough() |
| RunnableParallel | 여러 Runnable을 동시에 실행하여 결과를 딕셔너리로 합산. 내부적으로 asyncio.gather와 유사하게 동작 | 검색과 질문 전처리를 동시에 수행할 때 | RunnableParallel(context=retriever, question=RunnablePassthrough()) |
| RunnableLambda | 일반 Python 함수를 Runnable 인터페이스로 래핑. invoke, batch, stream을 자동 지원 | 검색 결과 포맷팅, 커스텀 전처리, 조건부 로직 | RunnableLambda(format_docs) |
| RunnableBranch | 조건에 따라 서로 다른 Runnable을 선택적으로 실행. if-else 분기를 선언적으로 표현 | 질문 유형에 따라 다른 프롬프트/체인을 라우팅할 때 | RunnableBranch((is_tax_q, tax_chain), default_chain) |
| RunnableWithFallbacks | 기본 Runnable 실패 시 대체 Runnable을 순차적으로 시도 | GPT-4o 실패 시 GPT-4o-mini로 폴백 | main_chain.with_fallbacks([fallback_chain]) |
8.3.2. 완전한 LCEL RAG 파이프라인
from langchain_core.runnables import (
RunnablePassthrough, RunnableLambda, RunnableParallel
)
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_openai import ChatOpenAI
from typing import List
from langchain_core.documents import Document
# --- 유틸리티 함수 ---
def format_docs(docs: List[Document]) -> str:
"""검색된 문서 리스트를 번호 매기기와 출처 정보를 포함한 텍스트로 변환한다."""
formatted = []
for i, doc in enumerate(docs, 1):
source = doc.metadata.get("source", "N/A")
page = doc.metadata.get("page", "")
header = f"[문서 {i}] (출처: {source}{f', p.{page}' if page else ''})"
formatted.append(f"{header}\n{doc.page_content}")
return "\n\n---\n\n".join(formatted)
def log_retrieval(docs: List[Document]) -> List[Document]:
"""검색 결과를 로깅하고 그대로 반환한다 (디버깅용 미들웨어)."""
logger.info(f"검색된 문서 수: {len(docs)}")
for i, doc in enumerate(docs):
logger.debug(f" [{i+1}] {doc.page_content[:80]}...")
return docs
# --- 프롬프트 ---
rag_prompt = ChatPromptTemplate.from_messages([
("system",
"당신은 전문 상담 AI입니다.\n\n"
"## 규칙\n"
"1. 아래 [참고 문서]만을 근거로 답변하세요.\n"
"2. 문서에 없는 내용은 '제공된 문서에서 해당 정보를 찾을 수 없습니다'라고 답하세요.\n"
"3. 답변 마지막에 참조한 문서 번호를 [출처: 문서 N] 형식으로 표시하세요.\n"
"4. 수치 계산이 필요하면 단계별로 과정을 보여주세요.\n\n"
"## 참고 문서\n{context}"),
("human", "{question}"),
])
# --- LCEL 파이프라인 조립 ---
rag_chain = (
RunnableParallel(
context=(
RunnablePassthrough() # 질문 텍스트를 받아
| retriever # 벡터 검색 수행
| RunnableLambda(log_retrieval) # 검색 결과 로깅 (선택)
| RunnableLambda(format_docs) # 문서 포맷팅
),
question=RunnablePassthrough(), # 원본 질문 보존
)
| rag_prompt # 프롬프트 조립
| llm # LLM 호출
| StrOutputParser() # 문자열 파싱
)
# 동기 호출
answer = rag_chain.invoke("근로소득세 계산 방법은?")
print(answer)
8.3.3. 스트리밍 지원
# --- 스트리밍 (토큰 단위 실시간 출력) ---
async def stream_answer(question: str):
"""답변을 토큰 단위로 스트리밍한다. FastAPI의 StreamingResponse와 연동 가능."""
async for chunk in rag_chain.astream(question):
print(chunk, end="", flush=True)
print() # 줄바꿈
# Jupyter 노트북에서 실행
# import asyncio
# asyncio.run(stream_answer("소득세율은?"))
# --- FastAPI 연동 예시 ---
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
app = FastAPI()
@app.post("/chat/stream")
async def chat_stream(question: str):
async def generate():
async for chunk in rag_chain.astream(question):
yield f"data: {chunk}\n\n" # SSE(Server-Sent Events) 형식
yield "data: [DONE]\n\n"
return StreamingResponse(generate(), media_type="text/event-stream")
8.3.4. 비동기 배치 처리
import asyncio
async def batch_answer(questions: list[str], max_concurrency: int = 5) -> list[str]:
"""여러 질문을 동시에 처리한다. max_concurrency로 동시 요청 수를 제한."""
semaphore = asyncio.Semaphore(max_concurrency)
async def process_one(q: str) -> str:
async with semaphore:
return await rag_chain.ainvoke(q)
tasks = [process_one(q) for q in questions]
return await asyncio.gather(*tasks, return_exceptions=True)
# 사용 예시
questions = ["소득세율은?", "공제 항목은?", "연말정산 기간은?"]
results = asyncio.run(batch_answer(questions))
# LangChain 내장 batch 활용 (더 간단)
results = rag_chain.batch(questions, config={"max_concurrency": 5})
8.3.5. 세 가지 방식 종합 비교
| 기준 | RetrievalQA | create_retrieval_chain | LCEL 커스텀 |
| 유연성 | 낮음 (고정 구조) | 중간 (프롬프트/LLM 교체 가능) | 최고 (모든 단계 커스터마이징) |
| 스트리밍 | 미지원 | .stream() 지원 | .astream() 포함 완전 지원 |
| 비동기 | 미지원 | .ainvoke() 지원 | .ainvoke(), .abatch() 모두 지원 |
| 대화 이력 | 미지원 | chat_history 내장 | 직접 구현 (자유도 높음) |
| 중간 단계 접근 | 불가 | 제한적 | 모든 중간 결과 접근 가능 |
| 학습 곡선 | 낮음 | 중간 | 높음 (Runnable 패러다임 이해 필요) |
| 프로덕션 권장 | 비권장 | 일반적 사용 | 고급 요구사항 |
| LangSmith 트레이싱 | 기본 수준 | 상세 추적 | 커스텀 태그/메타데이터 추가 가능 |
- 실무 권장: 프로토타입은 `create_retrieval_chain`으로 빠르게 검증하고, 스트리밍/배치/커스텀 후처리가 필요해지면 LCEL 기반으로 전환한다. 두 방식 모두 내부적으로 LCEL을 사용하므로 전환 비용이 크지 않다.
8.4. 커스텀 프롬프트 작성
- 프롬프트는 RAG 파이프라인의 출력 품질을 결정하는 가장 중요한 요소이다. 동일한 검색 결과라도 프롬프트에 따라 답변 품질이 극적으로 달라진다.
8.4.1. 프롬프트 작성 핵심 요소 테이블
| 요소 | 설명 | 좋은 예시 | 안티패턴 (피해야 할 것) | 영향도 |
| 역할 설정 | LLM에게 전문가 페르소나를 부여하여 답변 톤과 깊이를 결정 | "당신은 10년 경력의 세무사입니다. 전문 용어를 사용하되 쉽게 풀어서 설명합니다." | "당신은 AI입니다" (너무 일반적), "당신은 모든 것을 아는 전문가입니다" (과도한 자신감 유도) | 높음 |
| 근거 제한 | 검색된 문서만을 답변의 근거로 사용하도록 명시적 제약 | "아래 [참고 문서]에 포함된 정보만을 사용하여 답변하세요. 문서에 없는 내용은 추론하지 마세요." | 근거 제한 없음 (LLM이 사전학습 지식으로 환각 생성) | 매우 높음 |
| 출력 형식 | 답변의 구조, 길이, 포맷을 명확히 지정 | "1) 핵심 답변 (2-3문장) 2) 상세 설명 (단계별) 3) 출처 표시" | "자유롭게 답변하세요" (일관성 없는 출력), 형식 지정 없음 | 중간 |
| 톤 지정 | 대상 사용자에 맞는 설명 수준과 어조 | "회계 비전공자도 이해할 수 있도록 쉬운 용어로 설명하되, 법률 조항 번호는 정확히 명시하세요" | 톤 미지정 (LLM이 과도하게 학술적이거나 캐주얼한 답변 생성) | 중간 |
| 실패 행동 | 답변할 수 없는 경우의 행동을 명시 | "해당 정보를 찾을 수 없습니다. 관련될 수 있는 키워드: [...]를 사용해 다시 질문해주세요." | 실패 행동 미정의 (LLM이 자체적으로 답변을 지어냄) | 높음 |
| 컨텍스트 변수 | 검색 문서가 삽입되는 위치를 명확히 구분 | ### 참고 문서\n{context} (마크다운 헤딩으로 구분) | {context} (구분자 없이 삽입하면 문서 경계가 모호) | 중간 |
8.4.2. Few-shot 프롬프트 설계
- RAG 프롬프트에 Few-shot 예시를 포함하면 답변의 형식과 품질을 일관되게 유지할 수 있다.
few_shot_prompt = ChatPromptTemplate.from_messages([
("system", """당신은 세법 전문 상담 AI입니다.
## 규칙
1. [참고 문서]만을 근거로 답변하세요.
2. 답변은 아래 형식을 따르세요.
## 답변 형식 예시
질문: 근로소득세율은 어떻게 되나요?
답변:
근로소득세는 과세표준 구간에 따라 6%~45%의 세율이 적용됩니다.
| 과세표준 | 세율 |
|---------|------|
| 1,400만원 이하 | 6% |
| 1,400만원~5,000만원 | 15% |
[출처: 문서 1 - 소득세법 제55조]
---
질문: 문서에 없는 내용을 물어본 경우
답변:
제공된 문서에서 해당 정보를 찾을 수 없습니다.
관련 키워드: [소득세, 근로소득] 등으로 다시 질문해보시기 바랍니다.
## 참고 문서
{context}"""),
("human", "{question}"),
])
8.4.3. Chain-of-Thought (CoT) 프롬프트
- 복잡한 계산이나 다단계 추론이 필요한 질문에는 CoT 프롬프트가 효과적이다.
cot_prompt = ChatPromptTemplate.from_messages([
("system", """당신은 세법 전문 상담 AI입니다.
## 추론 규칙
복잡한 질문에는 다음 단계를 거쳐 답변하세요:
1. **관련 조항 식별**: 참고 문서에서 질문과 관련된 조항을 식별합니다.
2. **핵심 정보 추출**: 해당 조항에서 답변에 필요한 수치/조건을 추출합니다.
3. **단계별 계산**: 계산이 필요한 경우 중간 과정을 모두 보여줍니다.
4. **최종 답변**: 결론을 명확하게 정리합니다.
5. **출처 표시**: 근거가 된 문서 번호를 명시합니다.
각 단계를 [단계 N]으로 표시하여 추론 과정을 투명하게 보여주세요.
## 참고 문서
{context}"""),
("human", "{question}"),
])
8.4.4. 구조화된 출력 (JSON) 프롬프트
- API 서비스로 제공할 때 JSON 형식의 구조화된 출력이 필요한 경우:
from langchain_core.output_parsers import JsonOutputParser
from pydantic import BaseModel, Field
class RAGResponse(BaseModel):
answer: str = Field(description="사용자 질문에 대한 답변")
confidence: str = Field(description="답변 신뢰도: high, medium, low")
sources: list[str] = Field(description="참조한 문서 출처 리스트")
follow_up: str = Field(description="추가로 도움이 될 수 있는 안내")
parser = JsonOutputParser(pydantic_object=RAGResponse)
structured_prompt = ChatPromptTemplate.from_messages([
("system",
"당신은 세법 전문 상담 AI입니다.\n"
"아래 참고 문서를 기반으로 답변하고, 지정된 JSON 형식으로 출력하세요.\n\n"
"{format_instructions}\n\n"
"## 신뢰도 기준\n"
"- high: 문서에 직접적인 답변이 있는 경우\n"
"- medium: 문서 내용을 종합하여 추론한 경우\n"
"- low: 관련 정보가 부분적으로만 있는 경우\n\n"
"## 참고 문서\n{context}"),
("human", "{question}"),
])
structured_prompt = structured_prompt.partial(
format_instructions=parser.get_format_instructions()
)
structured_chain = (
RunnableParallel(
context=retriever | format_docs,
question=RunnablePassthrough(),
)
| structured_prompt
| llm
| parser
)
# 결과는 Python dict로 반환됨
result = structured_chain.invoke("소득세 과세표준 구간은?")
# {"answer": "...", "confidence": "high", "sources": ["문서 1"], "follow_up": "..."}
8.4.5. 프롬프트 버전 관리와 A/B 테스트
# --- 프롬프트 레지스트리 패턴 ---
PROMPT_REGISTRY = {
"v1_basic": ChatPromptTemplate.from_messages([
("system", "참고 문서를 기반으로 답변하세요.\n\n{context}"),
("human", "{question}"),
]),
"v2_structured": ChatPromptTemplate.from_messages([
("system",
"세법 전문 AI입니다. 규칙:\n"
"1. 참고 문서만 사용\n2. 출처 표시\n3. 단계별 설명\n\n{context}"),
("human", "{question}"),
]),
"v3_cot": cot_prompt, # 위에서 정의한 CoT 프롬프트
}
def build_chain(prompt_version: str = "v2_structured"):
"""프롬프트 버전을 선택하여 RAG Chain을 생성한다."""
prompt = PROMPT_REGISTRY[prompt_version]
return (
RunnableParallel(
context=retriever | format_docs,
question=RunnablePassthrough(),
)
| prompt | llm | StrOutputParser()
)
# A/B 테스트: 같은 질문에 대해 두 프롬프트의 답변 품질을 비교
import random
def ab_test_invoke(question: str, ratio: float = 0.5) -> tuple[str, str]:
"""A/B 테스트. ratio 확률로 v2(A)를 사용하고, 나머지는 v3(B)를 사용."""
version = "v2_structured" if random.random() < ratio else "v3_cot"
chain = build_chain(version)
answer = chain.invoke(question)
return version, answer
- 실무 권장: 프롬프트 변경은 코드 변경과 동일한 수준으로 관리한다. Git으로 버전 관리하고, LangSmith Dataset으로 회귀 테스트를 수행하여 새 프롬프트가 기존 품질을 유지하는지 검증한다. 프롬프트 하나의 변경이 전체 답변 품질에 미치는 영향은 예측하기 어렵다.
8.4.6. 환각 방지 프롬프트 기법
- RAG에서 환각(Hallucination)은 LLM이 검색된 문서에 없는 내용을 자체 지식으로 생성하는 현상이다. 이는 RAG 파이프라인의 신뢰성을 근본적으로 훼손하므로 프롬프트 수준에서 체계적으로 방어해야 한다.
8.4.6.1. 환각 방지 5계층 방어 체계:
anti_hallucination_prompt = ChatPromptTemplate.from_messages([
("system", """당신은 세법 전문 상담 AI입니다.
## 1계층: 근거 한정 (Grounding)
반드시 아래 [참고 문서]에 포함된 정보만 사용하여 답변하세요.
당신의 사전 학습 지식은 절대 사용하지 마세요.
## 2계층: 인용 강제 (Citation Enforcement)
모든 주장에 대해 근거가 된 문서 번호를 [문서 N] 형식으로 인라인 표시하세요.
인용 없는 주장은 포함하지 마세요.
## 3계층: 신뢰도 표시 (Confidence Scoring)
답변 마지막에 신뢰도를 표시하세요:
- [신뢰도: 높음] - 문서에 직접적인 내용이 있음
- [신뢰도: 중간] - 문서 내용을 종합하여 추론
- [신뢰도: 낮음] - 부분적 정보만 존재
## 4계층: 모름 표현 (Calibrated Uncertainty)
다음 경우에는 솔직하게 한계를 인정하세요:
- 문서에 관련 내용이 전혀 없는 경우: "제공된 문서에서 해당 정보를 찾을 수 없습니다."
- 문서의 정보가 불완전한 경우: "문서에서 부분적인 정보만 확인됩니다: [확인된 내용]. 정확한 답변을 위해 추가 자료가 필요합니다."
- 모호한 경우: "이 부분은 해석의 여지가 있습니다" + 가능한 해석 제시
## 5계층: 자기 검증 (Self-Verification)
답변을 완성한 후, 다음을 스스로 확인하세요:
- 모든 수치가 문서에 있는 것과 정확히 일치하는가?
- 문서에 없는 내용을 추가하지 않았는가?
- 인용이 올바른 문서를 참조하고 있는가?
## 참고 문서
{context}"""),
("human", "{question}"),
])
8.4.6.2. 환각 유형별 방어 전략:
| 환각 유형 | 정의 | 발생 원인 | 프롬프트 방어 | 파이프라인 방어 |
| 사실 환각 | 문서에 없는 수치, 날짜, 법률 조항을 생성 | LLM의 사전학습 지식이 컨텍스트보다 우선 적용됨 | "사전 학습 지식을 절대 사용하지 마세요" + 인용 강제 | 답변 내 수치를 검색 문서와 대조 검증 |
| 추론 환각 | 문서의 내용을 과도하게 확장하여 잘못된 결론 도출 | 불완전한 컨텍스트에서 LLM이 논리적 빈틈을 메우려 시도 | "문서에 없는 추론은 하지 마세요" + 신뢰도 표시 | 검색 문서 수(k) 증가, 컨텍스트 보강 |
| 출처 환각 | 존재하지 않는 법률 조항이나 문서를 인용 | LLM이 그럴듯한 출처를 생성하는 경향 | "인용은 반드시 위 참고 문서의 번호만 사용" | 답변의 출처 참조를 실제 검색 문서와 매칭 검증 |
| 일관성 환각 | 같은 질문에 대해 매번 다른 답변 생성 | temperature > 0일 때 확률적 샘플링으로 인한 비결정적 출력 | temperature=0 설정 | 캐싱으로 동일 질문에 동일 답변 보장 |
8.5. Context Window 관리 전략
- LLM의 Context Window는 유한한 자원이다. GPT-4o의 128K 토큰이 충분해 보이지만, 실제로는 토큰 예산을 체계적으로 배분해야 답변 품질과 비용을 동시에 최적화할 수 있다.
8.5.1. 토큰 예산 배분
[Context Window 토큰 예산 배분 예시 - GPT-4o-mini, 128K 윈도우]
총 예산: 128,000 토큰
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
시스템 프롬프트: ~800 토큰 (0.6%) ← 역할, 규칙, 형식 지시
Few-shot 예시: ~1,200 토큰 (0.9%) ← 2-3개의 예시 (선택)
검색된 컨텍스트: ~12,000 토큰 (9.4%) ← k=4, 청크당 ~3,000 토큰
대화 이력: ~2,000 토큰 (1.6%) ← 최근 3-5 턴
사용자 질문: ~200 토큰 (0.2%)
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
총 입력: ~16,200 토큰 (12.7%)
응답 예약: ~4,000 토큰 (3.1%) ← max_tokens 설정
실제 여유: ~107,800 토큰 (84.2%)
[비용 관점]
- gpt-4o-mini: 입력 $0.15/1M, 출력 $0.60/1M
- 위 예산 기준 1건 비용: ~$0.005 (약 6.5원)
- 월 10,000건 처리: ~$50 (약 65,000원)
- 실무 권장: Context Window가 넉넉하더라도 입력 토큰은 비용에 직결되므로, 필요한 만큼만 사용한다. GPT-4o 기준 입력 128K를 매번 채우면 건당 $0.32로, gpt-4o-mini의 64배 비용이 된다.
8.5.2. 컨텍스트 압축 기법
- 검색된 문서가 길거나 k 값이 큰 경우, 관련 부분만 추출하여 토큰을 절약한다.
from langchain.retrievers import ContextualCompressionRetriever
from langchain.retrievers.document_compressors import LLMChainExtractor
# LLM 기반 컨텍스트 압축
# 검색된 문서에서 질문과 관련된 부분만 추출
compressor = LLMChainExtractor.from_llm(
llm=ChatOpenAI(model="gpt-4o-mini", temperature=0)
)
compression_retriever = ContextualCompressionRetriever(
base_compressor=compressor,
base_retriever=retriever,
)
# 원본 검색: 1500자 × 4개 = 6000자
# 압축 검색: 질문 관련 부분만 추출 → 약 1500~2000자
compressed_docs = compression_retriever.invoke("소득세율은?")
8.5.2.1. 컨텍스트 압축 방식 비교:
| 방식 | 메커니즘 | 압축률 | 추가 비용 | 지연 시간 | 적합한 상황 |
| LLMChainExtractor | LLM이 질문 관련 문장만 추출 | 50~70% 감소 | LLM 호출 1회 추가 (gpt-4o-mini 기준 ~$0.001) | +1~2초 | 고품질 추출이 필요하고 비용보다 정확도 우선 시 |
| LLMChainFilter | LLM이 관련 없는 문서를 통째로 제거 | 문서 단위 필터링 | LLM 호출 1회 추가 | +1~2초 | k를 크게 설정하고 후처리로 필터링 시 |
| EmbeddingsFilter | 임베딩 유사도가 임계값 이하인 문서 제거 | 문서 단위 필터링 | 임베딩 비용만 추가 (매우 저렴) | +0.1초 | 빠른 필터링이 필요하고 비용 민감 시 |
8.5.3. Map-Reduce 패턴
- 검색 결과가 Context Window를 초과하거나, 매우 많은 문서를 참조해야 할 때 사용한다.
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
# --- Map 단계: 각 문서를 개별 요약 ---
map_prompt = ChatPromptTemplate.from_template(
"다음 문서에서 '{question}'과 관련된 핵심 정보만 추출하세요.\n"
"관련 정보가 없으면 '관련 정보 없음'이라고만 답하세요.\n\n"
"문서:\n{document}\n\n핵심 정보:"
)
map_chain = map_prompt | llm | StrOutputParser()
# --- Reduce 단계: 요약 결과를 종합하여 최종 답변 ---
reduce_prompt = ChatPromptTemplate.from_messages([
("system",
"아래는 여러 문서에서 추출한 핵심 정보입니다.\n"
"이 정보를 종합하여 질문에 답변하세요.\n"
"정보가 없으면 '해당 정보를 찾을 수 없습니다'라고 답하세요.\n\n"
"추출 정보:\n{summaries}"),
("human", "{question}"),
])
async def map_reduce_answer(question: str, docs: list, max_concurrency: int = 5):
"""Map-Reduce 패턴으로 대량 문서를 처리한다."""
import asyncio
# Map: 각 문서에서 관련 정보 병렬 추출
semaphore = asyncio.Semaphore(max_concurrency)
async def extract_one(doc):
async with semaphore:
return await map_chain.ainvoke({
"question": question,
"document": doc.page_content
})
summaries = await asyncio.gather(*[extract_one(d) for d in docs])
# "관련 정보 없음" 필터링
relevant = [s for s in summaries if "관련 정보 없음" not in s]
if not relevant:
return "제공된 문서에서 해당 정보를 찾을 수 없습니다."
# Reduce: 종합 답변 생성
combined = "\n---\n".join(relevant)
reduce_chain = reduce_prompt | llm | StrOutputParser()
return await reduce_chain.ainvoke({
"summaries": combined,
"question": question
})
9. 검색 품질 개선
- 기본 RAG 파이프라인의 검색 품질이 기대에 미치지 못할 때 적용하는 고급 기법들이다. 검색 품질은 RAG 답변 품질의 상한선을 결정하므로, 이 단계의 최적화가 전체 파이프라인 성능에 가장 큰 영향을 미친다.
9.1. 데이터 전처리 (Markdown 변환)
9.1.1 전처리 파이프라인 설계
[문서 전처리 파이프라인]
원본 문서 (PDF/DOCX/HTML)
│
├── [1단계: 텍스트 추출]
│ ├── PDF → PyMuPDF (텍스트 레이어)
│ ├── PDF (스캔) → Tesseract OCR / Upstage Document AI
│ ├── DOCX → docx2txt / python-docx
│ └── HTML → BeautifulSoup
│
├── [2단계: 구조 분석]
│ ├── 테이블 감지 → Camelot / Tabula
│ ├── 헤딩/섹션 감지 → 정규식 + 폰트 크기 분석
│ └── 이미지/차트 → GPT-4o Vision으로 텍스트 설명 생성
│
├── [3단계: Markdown 변환]
│ ├── 테이블 → Markdown 테이블 형식
│ ├── 리스트 → Markdown 리스트
│ └── 헤딩 → Markdown 헤딩 (#, ##, ###)
│
├── [4단계: 메타데이터 추출]
│ ├── 문서 제목, 작성일, 작성자
│ ├── 섹션 헤딩 (청크별 위치 정보)
│ └── 페이지 번호, 표 번호
│
└── [5단계: 클리닝]
├── 머리글/바닥글 제거
├── 페이지 번호 제거
├── 중복 공백/줄바꿈 정리
└── 인코딩 정규화 (UTF-8)
9.1.2. PDF 테이블 추출
import camelot
import pandas as pd
def extract_tables_from_pdf(pdf_path: str) -> list[str]:
"""PDF에서 테이블을 감지하여 Markdown 형식으로 변환한다.
Camelot의 두 가지 모드:
- lattice: 선으로 구분된 테이블에 적합 (정확도 높음)
- stream: 선 없이 공백으로 구분된 테이블에 적합
"""
markdown_tables = []
# lattice 모드로 먼저 시도
tables = camelot.read_pdf(pdf_path, flavor="lattice", pages="all")
if len(tables) == 0:
# lattice로 감지 안 되면 stream 모드 시도
tables = camelot.read_pdf(pdf_path, flavor="stream", pages="all")
for i, table in enumerate(tables):
df = table.df
# 첫 행을 헤더로 설정
df.columns = df.iloc[0]
df = df[1:]
# Markdown 테이블로 변환
md = df.to_markdown(index=False)
markdown_tables.append(f"[표 {i+1} - 페이지 {table.page}]\n{md}")
return markdown_tables
9.1.3. 스캔 문서 OCR 파이프라인
from langchain_community.document_loaders import PyMuPDFLoader
import pytesseract
from PIL import Image
import fitz # PyMuPDF
def ocr_scanned_pdf(pdf_path: str, lang: str = "kor+eng") -> list[dict]:
"""스캔된 PDF를 OCR 처리하여 텍스트와 메타데이터를 추출한다.
Args:
pdf_path: PDF 파일 경로
lang: Tesseract 언어 코드 (한국어+영어: "kor+eng")
Returns:
페이지별 텍스트와 메타데이터 리스트
"""
doc = fitz.open(pdf_path)
results = []
for page_num, page in enumerate(doc):
# 페이지를 이미지로 렌더링 (300 DPI)
pix = page.get_pixmap(dpi=300)
img = Image.frombytes("RGB", [pix.width, pix.height], pix.samples)
# OCR 수행
text = pytesseract.image_to_string(img, lang=lang)
# 신뢰도 정보 추출
ocr_data = pytesseract.image_to_data(img, lang=lang, output_type=pytesseract.Output.DICT)
confidences = [int(c) for c in ocr_data["conf"] if int(c) > 0]
avg_confidence = sum(confidences) / len(confidences) if confidences else 0
results.append({
"page": page_num + 1,
"text": text.strip(),
"ocr_confidence": round(avg_confidence, 1),
"source": pdf_path,
})
if avg_confidence < 70:
logger.warning(
f"OCR 신뢰도 낮음: {pdf_path} p.{page_num+1} "
f"(confidence={avg_confidence:.1f}%). 수동 검토 권장."
)
return results
9.1.4. 이미지/차트 설명 생성
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage
import base64
def describe_image_with_vision(image_path: str) -> str:
"""이미지/차트를 GPT-4o Vision으로 분석하여 텍스트 설명을 생성한다.
생성된 설명은 텍스트 청크에 포함하여 검색 가능하게 만든다.
"""
vision_llm = ChatOpenAI(model="gpt-4o", temperature=0, max_tokens=1000)
with open(image_path, "rb") as f:
image_data = base64.b64encode(f.read()).decode("utf-8")
message = HumanMessage(content=[
{"type": "text", "text": (
"이 이미지의 내용을 상세히 설명해주세요. "
"차트인 경우 축 이름, 데이터 포인트, 추세를 포함하세요. "
"표인 경우 Markdown 테이블로 변환하세요. "
"한국어로 작성하세요."
)},
{"type": "image_url", "image_url": {
"url": f"data:image/png;base64,{image_data}"
}},
])
return vision_llm.invoke([message]).content
9.1.5. 메타데이터 추출 자동화
from langchain_core.documents import Document
import re
def enrich_metadata(doc: Document) -> Document:
"""문서 청크에 구조적 메타데이터를 자동으로 추가한다.
추가되는 메타데이터:
- section_heading: 청크가 속한 섹션의 헤딩
- has_table: 테이블 포함 여부
- has_numbers: 수치 데이터 포함 여부
- content_type: 내용 유형 (법조항, 설명, 예시, 표)
"""
text = doc.page_content
# 헤딩 추출 (Markdown 형식)
headings = re.findall(r'^#{1,3}\s+(.+)$', text, re.MULTILINE)
doc.metadata["section_heading"] = headings[0] if headings else "N/A"
# 테이블 포함 여부
doc.metadata["has_table"] = bool(re.search(r'\|.*\|.*\|', text))
# 수치 데이터 포함 여부 (퍼센트, 금액 등)
doc.metadata["has_numbers"] = bool(
re.search(r'\d+[%만원억조]|\d{1,3}(,\d{3})+', text)
)
# 내용 유형 분류
if re.search(r'제\d+조|제\d+항', text):
doc.metadata["content_type"] = "법조항"
elif doc.metadata["has_table"]:
doc.metadata["content_type"] = "표"
elif re.search(r'예[시를]|예컨대|가령', text):
doc.metadata["content_type"] = "예시"
else:
doc.metadata["content_type"] = "설명"
return doc
9.2. 키워드 사전 활용 (도메인 용어 매핑)
9.2.1. 자동 동의어 발견
- 수동으로 키워드 사전을 구축하는 것은 한계가 있다.
- 임베딩을 활용하여 자동으로 동의어와 유사 표현을 발견하는 방법:
from langchain_openai import OpenAIEmbeddings
import numpy as np
class AutoSynonymDiscovery:
"""임베딩 유사도를 기반으로 도메인 용어의 동의어를 자동 발견한다."""
def __init__(self, embedding_model, threshold: float = 0.85):
self.embedding = embedding_model
self.threshold = threshold
def find_synonyms(
self, term: str, candidates: list[str], top_k: int = 5
) -> list[tuple[str, float]]:
"""주어진 용어와 유사도가 높은 후보 표현을 반환한다."""
term_vec = self.embedding.embed_query(term)
candidate_vecs = self.embedding.embed_documents(candidates)
similarities = []
for i, cand_vec in enumerate(candidate_vecs):
sim = np.dot(term_vec, cand_vec) / (
np.linalg.norm(term_vec) * np.linalg.norm(cand_vec)
)
if sim >= self.threshold:
similarities.append((candidates[i], round(float(sim), 4)))
return sorted(similarities, key=lambda x: x[1], reverse=True)[:top_k]
def build_synonym_dict(
self, domain_terms: list[str], common_terms: list[str]
) -> dict[str, list[str]]:
"""전문 용어 ↔ 일상 용어 매핑 사전을 자동 구축한다."""
synonym_dict = {}
for term in domain_terms:
synonyms = self.find_synonyms(term, common_terms)
if synonyms:
synonym_dict[term] = [s[0] for s in synonyms]
return synonym_dict
# 사용 예시
embedding = OpenAIEmbeddings(model="text-embedding-3-large")
discovery = AutoSynonymDiscovery(embedding, threshold=0.82)
domain_terms = ["근로소득자", "사업소득", "종합소득세", "원천징수"]
common_terms = ["직장인", "회사원", "월급쟁이", "프리랜서 수입", "사업 수익",
"세금 신고", "세금", "월급에서 빠지는 세금"]
synonym_map = discovery.build_synonym_dict(domain_terms, common_terms)
# {"근로소득자": ["직장인", "회사원", "월급쟁이"], "원천징수": ["월급에서 빠지는 세금"], ...}
9.2.2. 피드백 기반 사전 업데이트
import json
from datetime import datetime
class AdaptiveKeywordDict:
"""검색 실패 사례를 분석하여 키워드 사전을 자동으로 보강한다."""
def __init__(self, dict_path: str = "keyword_dict.json"):
self.dict_path = dict_path
self.dictionary = self._load()
self.failed_queries = []
def _load(self) -> dict:
try:
with open(self.dict_path, "r", encoding="utf-8") as f:
return json.load(f)
except FileNotFoundError:
return {}
def save(self):
with open(self.dict_path, "w", encoding="utf-8") as f:
json.dump(self.dictionary, f, ensure_ascii=False, indent=2)
def log_failed_query(self, query: str, retrieved_docs: list, user_feedback: str = "irrelevant"):
"""검색 실패 사례를 기록한다."""
self.failed_queries.append({
"query": query,
"timestamp": datetime.now().isoformat(),
"doc_count": len(retrieved_docs),
"feedback": user_feedback,
})
def suggest_new_mappings(self, llm) -> list[dict]:
"""축적된 실패 사례를 LLM으로 분석하여 새로운 매핑을 제안한다."""
if not self.failed_queries:
return []
failed_queries_text = "\n".join(
f"- {fq['query']}" for fq in self.failed_queries[-20:] # 최근 20건
)
current_dict_text = "\n".join(
f"- {k} → {v}" for k, v in self.dictionary.items()
)
prompt = f"""다음은 RAG 시스템에서 검색 실패한 질의 목록입니다:
{failed_queries_text}
현재 키워드 사전:
{current_dict_text}
실패 원인을 분석하고, 키워드 사전에 추가해야 할 매핑을 JSON 배열로 제안하세요.
형식: [{{"일상어": "...", "전문어": "...", "이유": "..."}}]"""
response = llm.invoke(prompt)
# 응답 파싱 후 반환
return json.loads(response.content)
9.3. Query Transformation 기법
- 사용자의 원본 질문을 그대로 벡터 검색에 사용하면 최적의 결과를 얻기 어렵다. Query Transformation은 질문을 검색에 유리한 형태로 변환하여 검색 재현율(Recall)과 정밀도(Precision)를 높이는 기법이다.
9.3.1. Multi-Query 생성
- 하나의 질문을 여러 관점에서 재작성하여 각각 검색하고, 결과를 합산한다.
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnableLambda
multi_query_prompt = ChatPromptTemplate.from_template(
"당신은 검색 질의 생성 전문가입니다.\n"
"아래 사용자 질문을 3가지 다른 관점에서 재작성하세요.\n"
"각 질의는 원래 질문과 같은 정보를 찾되, 다른 표현과 키워드를 사용하세요.\n"
"한 줄에 하나씩, 번호 없이 작성하세요.\n\n"
"원본 질문: {question}\n\n"
"재작성 질의:"
)
def parse_queries(text: str) -> list[str]:
return [q.strip() for q in text.strip().split("\n") if q.strip()]
multi_query_chain = multi_query_prompt | llm | StrOutputParser() | RunnableLambda(parse_queries)
def multi_query_retrieve(question: str, retriever, k_per_query: int = 4) -> list:
"""Multi-Query로 검색하고 중복을 제거하여 반환한다."""
queries = multi_query_chain.invoke({"question": question})
queries.append(question) # 원본 질문도 포함
all_docs = []
seen_contents = set()
for query in queries:
docs = retriever.invoke(query)
for doc in docs:
content_hash = hash(doc.page_content[:200])
if content_hash not in seen_contents:
seen_contents.add(content_hash)
all_docs.append(doc)
return all_docs
# 예시: "직장인 연말정산 공제 항목"
# → "근로소득자가 연말정산에서 공제받을 수 있는 항목은 무엇인가요?"
# → "연말정산 시 세액공제와 소득공제의 종류를 알려주세요"
# → "직장인이 세금 환급을 받기 위한 공제 대상은?"
9.3.2. Sub-Question 분해
- 복합 질문을 단순한 하위 질문으로 분해하여 각각 검색한다.
decompose_prompt = ChatPromptTemplate.from_template(
"다음 질문을 답변에 필요한 하위 질문 2-4개로 분해하세요.\n"
"각 하위 질문은 독립적으로 검색 가능해야 합니다.\n"
"한 줄에 하나씩 작성하세요.\n\n"
"질문: {question}\n\n"
"하위 질문:"
)
# 예시 입력: "총급여 7000만원인 직장인의 소득세를 계산하고, 가능한 공제 항목도 알려줘"
# 분해 결과:
# 1. "총급여 7000만원에 해당하는 근로소득공제 금액은?"
# 2. "근로소득세 과세표준별 세율은?"
# 3. "근로소득자가 적용받을 수 있는 소득공제 항목은?"
# 4. "근로소득자가 적용받을 수 있는 세액공제 항목은?"
9.3.3. Step-Back Abstraction
- 구체적인 질문을 한 단계 추상화하여 더 넓은 범위의 문서를 검색한다.
stepback_prompt = ChatPromptTemplate.from_template(
"다음 질문을 한 단계 추상화하여 더 일반적인 질문으로 변환하세요.\n"
"구체적인 수치나 조건을 제거하고, 해당 주제의 전반적인 내용을 다루는 질문으로 바꾸세요.\n\n"
"구체적 질문: {question}\n"
"추상화된 질문:"
)
# 예시: "연봉 5000만원 직장인의 소득세율은?" → "근로소득세의 과세표준과 세율 체계는?"
# 예시: "2024년 연말정산 의료비 공제 한도는?" → "연말정산 소득공제와 세액공제의 종류와 한도는?"
9.3.4. HyDE (Hypothetical Document Embeddings)
- 질문 대신 **가상의 답변 문서**를 생성하고, 그 임베딩으로 검색한다. 질문과 답변 사이의 임베딩 공간 차이를 줄여 검색 품질을 높인다.
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
hyde_prompt = ChatPromptTemplate.from_template(
"다음 질문에 대한 답변이 포함된 문서의 일부를 작성하세요.\n"
"실제 문서처럼 전문적인 톤으로, 3-5문장으로 작성하세요.\n"
"정확한 답을 알 필요는 없으며, 문서 스타일만 모방하면 됩니다.\n\n"
"질문: {question}\n\n"
"가상 문서:"
)
hyde_chain = hyde_prompt | llm | StrOutputParser()
def hyde_retrieve(question: str, retriever, embedding_model) -> list:
"""HyDE 기법으로 검색한다.
1. 질문으로부터 가상 답변 문서를 생성
2. 가상 문서의 임베딩으로 벡터 검색 수행
3. 실제 문서를 반환
효과: 질문-문서 간 의미적 간극(semantic gap)을 줄여
질문만으로는 찾기 어려운 관련 문서를 검색 가능
"""
# 가상 답변 문서 생성
hypothetical_doc = hyde_chain.invoke({"question": question})
# 가상 문서의 임베딩으로 검색
hyde_vector = embedding_model.embed_query(hypothetical_doc)
# 벡터 DB에서 유사 문서 검색 (similarity_search_by_vector)
docs = retriever.vectorstore.similarity_search_by_vector(hyde_vector, k=4)
return docs
# 예시: "세금 많이 내는 방법 없나요?"
# → 가상 문서: "근로소득세를 절감하기 위해서는 소득공제와 세액공제를 최대한
# 활용해야 합니다. 대표적인 절세 방법으로는 연금저축 납입, 의료비 공제,
# 교육비 공제 등이 있으며..."
# → 이 가상 문서의 임베딩이 실제 절세 관련 조항과 높은 유사도를 가짐
9.3.4.1. Query Transformation 기법 비교:
| 기법 | 검색 호출 수 | 추가 LLM 호출 | 지연 시간 증가 | 최적 사용 시나리오 | Recall 향상 |
| Multi-Query | 3~5회 | 1회 (질의 생성) | +1~2초 | 다양한 표현으로 같은 정보를 검색해야 할 때 | 20~40% |
| Sub-Question | 2~4회 | 1회 (분해) | +1~2초 | 복합 질문을 단순 질문들로 나눠 검색해야 할 때 | 15~30% |
| Step-Back | 2회 (원본+추상화) | 1회 (추상화) | +1초 | 너무 구체적인 질문으로 인해 검색이 어려울 때 | 10~25% |
| HyDE | 1회 | 1회 (가상 문서 생성) | +1~2초 | 질문 스타일과 문서 스타일의 차이가 클 때 | 15~35% |
9.3.5. Query 라우팅 (Intent Classification)
- 질문의 의도에 따라 서로 다른 검색 전략이나 데이터소스를 선택한다.
from langchain_core.runnables import RunnableBranch
router_prompt = ChatPromptTemplate.from_template(
"다음 질문의 의도를 분류하세요.\n"
"카테고리: [세율조회, 계산요청, 절차안내, 용어설명, 기타]\n"
"카테고리 이름만 답하세요.\n\n"
"질문: {question}\n카테고리:"
)
router_chain = router_prompt | llm | StrOutputParser()
# 의도별 다른 검색 전략 적용
tax_rate_chain = build_chain("v2_structured") # 정확한 수치가 필요
calculation_chain = build_chain("v3_cot") # 단계별 추론이 필요
procedure_chain = build_chain("v2_structured") # 순서대로 설명이 필요
def route_query(question: str):
"""질문 의도에 따라 최적의 체인으로 라우팅한다."""
intent = router_chain.invoke({"question": question}).strip()
intent_chain_map = {
"세율조회": tax_rate_chain,
"계산요청": calculation_chain,
"절차안내": procedure_chain,
"용어설명": tax_rate_chain,
}
chain = intent_chain_map.get(intent, tax_rate_chain)
return chain.invoke(question)
9.4. Retrieval 고급 기법
9.4.1. Contextual Retrieval (Anthropic 접근법)
- 각 청크에 해당 청크가 전체 문서에서 어떤 맥락에 위치하는지 설명하는 컨텍스트를 앞에 추가한다. 이를 통해 청크만으로는 의미가 불분명한 경우를 해결한다.
def add_contextual_header(chunks: list, full_document: str, llm) -> list:
"""각 청크에 문서 내 맥락 설명을 추가한다.
Anthropic의 Contextual Retrieval 기법:
- 각 청크 앞에 "이 청크가 전체 문서에서 어떤 위치/맥락에 있는지"를 설명하는
짧은 텍스트를 prepend
- 검색 시 맥락 정보가 함께 임베딩되어 관련성 판단이 정확해짐
- Recall이 평균 20~35% 향상되는 것으로 보고됨
주의: 전체 문서를 LLM에 전달하므로, 문서 길이에 따라 비용이 증가함
"""
context_prompt = ChatPromptTemplate.from_template(
"다음은 전체 문서의 일부입니다.\n\n"
"<전체 문서>\n{full_document}\n</전체 문서>\n\n"
"아래 청크가 전체 문서에서 어떤 맥락에 위치하는지 "
"2-3문장으로 간결하게 설명하세요.\n\n"
"<청크>\n{chunk}\n</청크>\n\n"
"맥락 설명:"
)
context_chain = context_prompt | llm | StrOutputParser()
enriched_chunks = []
for chunk in chunks:
context = context_chain.invoke({
"full_document": full_document[:10000], # 토큰 절약 위해 앞부분만
"chunk": chunk.page_content[:1000],
})
# 맥락 설명을 청크 앞에 추가
chunk.page_content = f"[맥락: {context}]\n\n{chunk.page_content}"
enriched_chunks.append(chunk)
return enriched_chunks
# 예시 결과:
# 원본 청크: "1,400만원 이하 | 6% | - | 1,400만원 초과 ~ 5,000만원 이하 | 15% | 126만원"
# 보강 후: "[맥락: 이 청크는 소득세법의 과세표준별 세율 테이블로, 근로소득세
# 계산 시 과세표준에 따라 적용되는 세율과 누진공제액을 정리한 것입니다.]
# 1,400만원 이하 | 6% | - | 1,400만원 초과 ~ 5,000만원 이하 | 15% | 126만원"
9.4.2. Sentence Window Retrieval
- 작은 청크로 검색의 정밀도를 높이고, 답변 생성 시에는 주변 문맥을 포함한 큰 윈도우를 전달한다.
class SentenceWindowRetriever:
"""작은 청크로 검색하고, 주변 문맥을 포함한 큰 윈도우를 반환한다.
검색 정밀도와 생성 품질을 동시에 최적화하는 전략:
- 검색: 작은 청크 (1-2문장) → 높은 정밀도
- 생성: 큰 윈도우 (검색된 청크 ± N개 청크) → 충분한 문맥
"""
def __init__(self, chunks: list, window_size: int = 2):
"""
Args:
chunks: 원본 문서를 작게 분할한 청크 리스트 (순서 유지)
window_size: 검색된 청크 전후로 포함할 청크 수
"""
self.chunks = chunks
self.window_size = window_size
# 각 청크에 인덱스 메타데이터 추가
for i, chunk in enumerate(self.chunks):
chunk.metadata["chunk_index"] = i
def expand_window(self, retrieved_chunks: list) -> list:
"""검색된 청크를 기준으로 전후 윈도우를 확장한다."""
expanded = []
seen_indices = set()
for chunk in retrieved_chunks:
center_idx = chunk.metadata["chunk_index"]
start = max(0, center_idx - self.window_size)
end = min(len(self.chunks), center_idx + self.window_size + 1)
for i in range(start, end):
if i not in seen_indices:
seen_indices.add(i)
expanded.append(self.chunks[i])
# 인덱스 순으로 정렬하여 문서 순서 유지
expanded.sort(key=lambda c: c.metadata["chunk_index"])
return expanded
9.4.3. Auto-Merging Retrieval
- 계층적으로 분할된 문서에서 하위 청크가 일정 비율 이상 검색되면 상위 청크로 자동 병합한다.
class AutoMergingRetriever:
"""하위 청크의 검색 비율에 따라 상위 청크로 자동 병합한다.
문서를 계층적으로 분할 (예: 섹션 → 단락 → 문장):
- 검색: 문장 단위로 수행 (높은 정밀도)
- 병합: 같은 단락에서 50% 이상의 문장이 검색되면 단락 전체를 사용
- 효과: 단편적인 정보 대신 완전한 맥락을 LLM에 전달
"""
def __init__(self, parent_chunks: list, child_chunks: list, merge_threshold: float = 0.5):
self.parent_chunks = parent_chunks
self.child_chunks = child_chunks
self.merge_threshold = merge_threshold
# 부모-자식 매핑 구축
self.parent_to_children = {}
for child in child_chunks:
parent_id = child.metadata.get("parent_id")
if parent_id not in self.parent_to_children:
self.parent_to_children[parent_id] = []
self.parent_to_children[parent_id].append(child)
def merge(self, retrieved_children: list) -> list:
"""검색된 자식 청크를 분석하여 병합 여부를 결정한다."""
# 부모별로 검색된 자식 수 집계
parent_hit_count = {}
for child in retrieved_children:
parent_id = child.metadata.get("parent_id")
parent_hit_count[parent_id] = parent_hit_count.get(parent_id, 0) + 1
result = []
processed_parents = set()
for child in retrieved_children:
parent_id = child.metadata.get("parent_id")
if parent_id in processed_parents:
continue
total_children = len(self.parent_to_children.get(parent_id, []))
hit_count = parent_hit_count.get(parent_id, 0)
if total_children > 0 and (hit_count / total_children) >= self.merge_threshold:
# 병합: 부모 청크 사용
parent = next(
(p for p in self.parent_chunks
if p.metadata.get("chunk_id") == parent_id), None
)
if parent:
result.append(parent)
processed_parents.add(parent_id)
else:
# 병합하지 않음: 자식 청크 그대로 사용
result.append(child)
return result
9.4.4. Knowledge Graph 강화 검색
- 벡터 검색과 지식 그래프를 결합하여 엔티티 간 관계를 활용한 검색을 수행한다.
class KnowledgeGraphRetriever:
"""간단한 지식 그래프를 활용하여 벡터 검색을 보강한다.
세법 도메인 예시:
- 근로소득 → [관련 개념] → 근로소득공제, 원천징수, 연말정산
- 소득공제 → [하위 유형] → 인적공제, 특별공제, 기타공제
- 과세표준 → [계산순서] → 총급여 → 근로소득금액 → 과세표준
검색 흐름:
1. 질문에서 엔티티 추출
2. 지식 그래프에서 관련 엔티티 탐색 (1-2홉)
3. 관련 엔티티를 검색 질의에 추가
4. 벡터 검색 수행
"""
def __init__(self):
# 간단한 인접 리스트 기반 그래프
self.graph = {
"근로소득": {
"관련개념": ["근로소득공제", "원천징수", "연말정산", "근로소득자"],
"상위개념": ["소득"],
"하위개념": ["급여", "상여금", "수당"],
},
"소득공제": {
"하위유형": ["인적공제", "특별소득공제", "기타소득공제"],
"관련개념": ["과세표준", "소득금액"],
},
# ... 도메인별로 확장
}
def expand_query(self, entities: list[str], hops: int = 1) -> list[str]:
"""엔티티로부터 N홉 이내의 관련 개념을 탐색한다."""
expanded = set(entities)
current = set(entities)
for _ in range(hops):
next_level = set()
for entity in current:
if entity in self.graph:
for relation, targets in self.graph[entity].items():
next_level.update(targets)
expanded.update(next_level)
current = next_level
return list(expanded)
9.5. Post-Retrieval 최적화
- 검색된 문서를 LLM에 전달하기 전에 후처리하여 답변 품질을 높이는 기법들이다.
9.5.1. Context Reordering (Lost in the Middle 해결)
- LLM은 긴 컨텍스트의 **시작과 끝에 위치한 정보는 잘 활용하지만, 중간에 위치한 정보는 놓치는 경향**이 있다 (Lost in the Middle 현상). 가장 관련성 높은 문서를 시작과 끝에 배치하여 이 문제를 완화한다.
def reorder_for_llm(docs: list) -> list:
"""검색 결과를 Lost in the Middle 패턴에 맞게 재배치한다.
배치 전략 (관련성 높은 순서: 1 > 2 > 3 > 4 > 5):
- 변경 전: [1, 2, 3, 4, 5] (관련성 내림차순)
- 변경 후: [1, 3, 5, 4, 2] (높음-낮음-높음 패턴)
이유: LLM이 시작과 끝의 정보에 더 집중하므로,
가장 중요한 문서를 양 끝에 배치하여 활용률을 극대화한다.
"""
if len(docs) <= 2:
return docs
# 홀수 인덱스(높은 관련도)는 앞에, 짝수 인덱스(낮은 관련도)는 뒤에 역순 배치
reordered = []
for i in range(0, len(docs), 2):
reordered.append(docs[i]) # 높은 관련도
for i in range(len(docs) - 1 if len(docs) % 2 == 0 else len(docs) - 2, 0, -2):
reordered.append(docs[i]) # 낮은 관련도 (역순)
return reordered
9.5.2. 중복 제거 및 관련성 필터링
from langchain_openai import OpenAIEmbeddings
import numpy as np
def deduplicate_and_filter(
docs: list,
embedding_model,
query: str,
similarity_threshold: float = 0.3,
dedup_threshold: float = 0.95,
) -> list:
"""검색 결과에서 중복을 제거하고 관련성이 낮은 문서를 필터링한다.
Args:
docs: 검색된 문서 리스트
embedding_model: 임베딩 모델
query: 원본 질문
similarity_threshold: 이 값 이하의 유사도를 가진 문서 제거 (0~1)
dedup_threshold: 이 값 이상의 유사도를 가진 문서 쌍은 중복으로 판단 (0~1)
"""
if not docs:
return []
query_vec = embedding_model.embed_query(query)
doc_vecs = embedding_model.embed_documents([d.page_content for d in docs])
# 1단계: 관련성 필터링
relevant_docs = []
for i, (doc, doc_vec) in enumerate(zip(docs, doc_vecs)):
sim = np.dot(query_vec, doc_vec) / (
np.linalg.norm(query_vec) * np.linalg.norm(doc_vec)
)
if sim >= similarity_threshold:
doc.metadata["relevance_score"] = round(float(sim), 4)
relevant_docs.append(doc)
# 2단계: 중복 제거 (유사도가 높은 문서 쌍에서 하나만 유지)
filtered = []
for doc in relevant_docs:
is_dup = False
doc_vec = embedding_model.embed_query(doc.page_content)
for existing in filtered:
existing_vec = embedding_model.embed_query(existing.page_content)
sim = np.dot(doc_vec, existing_vec) / (
np.linalg.norm(doc_vec) * np.linalg.norm(existing_vec)
)
if sim >= dedup_threshold:
is_dup = True
break
if not is_dup:
filtered.append(doc)
return filtered
9.5.3. 컨텍스트 요약 (대량 문서 처리)
async def summarize_context(docs: list, question: str, llm, max_tokens: int = 4000) -> str:
"""검색된 문서를 질문에 관련된 핵심만 요약하여 토큰을 절약한다.
사용 시점:
- 검색된 문서의 총 토큰이 컨텍스트 예산을 초과할 때
- k 값이 크거나 (>6) 청크 크기가 클 때 (>2000자)
- 비용 최적화가 필요할 때
토큰 절약 효과: 평균 60~70% 감소
답변 품질 트레이드오프: 세부 정보 손실 가능 (5~10% 품질 저하)
"""
import tiktoken
enc = tiktoken.encoding_for_model("gpt-4o")
total_tokens = sum(len(enc.encode(d.page_content)) for d in docs)
if total_tokens <= max_tokens:
# 예산 내라면 요약 없이 그대로 반환
return "\n\n---\n\n".join(d.page_content for d in docs)
# 예산 초과 시 각 문서를 개별 요약
summary_prompt = ChatPromptTemplate.from_template(
"다음 문서에서 '{question}'과 관련된 핵심 정보만 3-5문장으로 요약하세요.\n"
"관련 없는 내용은 제외하세요. 수치와 조항 번호는 반드시 보존하세요.\n\n"
"문서:\n{document}\n\n요약:"
)
summary_chain = summary_prompt | llm | StrOutputParser()
import asyncio
tasks = [
summary_chain.ainvoke({"question": question, "document": doc.page_content})
for doc in docs
]
summaries = await asyncio.gather(*tasks)
return "\n\n---\n\n".join(s for s in summaries if s.strip())
10. 최종 파이프라인 (통합 코드)
- 본 문서의 모든 기법을 하나의 프로덕션 파이프라인으로 통합한다. 모듈화, 타입 힌트, 에러 처리, 로깅, 설정 관리를 포함한 실무 수준의 코드이다.
10.1. 통합 코드
"""
프로덕션 RAG 파이프라인
======================
- 모듈화된 컴포넌트 설계
- 동기/비동기 인터페이스
- 에러 처리 및 폴백
- 구조화된 로깅
- 설정 기반 구성
"""
from __future__ import annotations
import asyncio
import logging
import os
import time
from dataclasses import dataclass, field
from typing import Optional
from dotenv import load_dotenv
from langchain_chroma import Chroma
from langchain_community.document_loaders import Docx2txtLoader
from langchain_core.documents import Document
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnableLambda, RunnableParallel, RunnablePassthrough
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_text_splitters import RecursiveCharacterTextSplitter
# -- 로깅 설정 --
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
logger = logging.getLogger("rag_pipeline")
# -- 설정 --
@dataclass
class RAGConfig:
"""RAG 파이프라인 설정. 환경변수 또는 설정 파일에서 로드."""
# LLM 설정
llm_model: str = "gpt-4o-mini"
llm_temperature: float = 0.0
llm_max_tokens: int = 4096
# 임베딩 설정
embedding_model: str = "text-embedding-3-large"
# 청킹 설정
chunk_size: int = 1500
chunk_overlap: int = 200
# 검색 설정
search_type: str = "similarity" # "similarity" | "mmr"
search_k: int = 4
search_score_threshold: float = 0.3
# 벡터 DB 설정
collection_name: str = "production_rag"
persist_directory: str = "./chroma_db"
# 키워드 사전
keyword_dict: dict = field(default_factory=lambda: {
"직장인": "근로소득자",
"프리랜서": "사업소득자",
"월급": "근로소득",
"세금": "소득세",
"연말정산": "근로소득세 연말정산",
"공제": "소득공제, 세액공제",
"알바": "일용근로소득자",
})
# 에러 처리
max_retries: int = 3
fallback_answer: str = "일시적인 오류가 발생했습니다. 잠시 후 다시 시도해주세요."
@classmethod
def from_env(cls) -> "RAGConfig":
"""환경변수에서 설정을 로드한다."""
return cls(
llm_model=os.getenv("RAG_LLM_MODEL", "gpt-4o-mini"),
llm_temperature=float(os.getenv("RAG_LLM_TEMPERATURE", "0")),
chunk_size=int(os.getenv("RAG_CHUNK_SIZE", "1500")),
chunk_overlap=int(os.getenv("RAG_CHUNK_OVERLAP", "200")),
search_k=int(os.getenv("RAG_SEARCH_K", "4")),
collection_name=os.getenv("RAG_COLLECTION", "production_rag"),
persist_directory=os.getenv("RAG_PERSIST_DIR", "./chroma_db"),
)
# -- RAG 응답 모델 --
@dataclass
class RAGResponse:
"""RAG 파이프라인의 구조화된 응답."""
answer: str
sources: list[dict] = field(default_factory=list)
query: str = ""
transformed_query: str = ""
latency_ms: float = 0.0
token_usage: dict = field(default_factory=dict)
error: Optional[str] = None
# -- 메인 파이프라인 클래스 --
class ProductionRAGPipeline:
"""프로덕션 수준의 RAG 파이프라인.
기능:
- 문서 로딩, 청킹, 임베딩, 벡터 DB 구축
- 키워드 사전 기반 질의 변환
- LCEL 기반 검색 + 생성
- 동기/비동기 인터페이스
- 에러 처리, 로깅, 지표 수집
"""
def __init__(self, config: RAGConfig):
self.config = config
self._init_components()
self._build_chains()
logger.info(f"RAG 파이프라인 초기화 완료 (model={config.llm_model})")
def _init_components(self):
"""LLM, 임베딩 모델, 텍스트 분할기를 초기화한다."""
load_dotenv()
api_key = os.environ.get("OPENAI_API_KEY")
if not api_key:
raise EnvironmentError("OPENAI_API_KEY가 설정되지 않았습니다.")
self.llm = ChatOpenAI(
model=self.config.llm_model,
temperature=self.config.llm_temperature,
max_tokens=self.config.llm_max_tokens,
)
self.embedding = OpenAIEmbeddings(model=self.config.embedding_model)
self.text_splitter = RecursiveCharacterTextSplitter(
chunk_size=self.config.chunk_size,
chunk_overlap=self.config.chunk_overlap,
)
self.vectorstore = None
self.retriever = None
def _build_chains(self):
"""키워드 사전 Chain과 RAG Chain을 구성한다."""
# 키워드 사전 Chain
dict_text = "\n".join(
f"- {k} -> {v}" for k, v in self.config.keyword_dict.items()
)
dict_prompt = ChatPromptTemplate.from_messages([
("system",
"당신은 질문 변환 전문가입니다. "
"아래 사전의 일상 용어를 전문 용어로 변환하세요. "
"질문의 의미와 구조는 유지하세요.\n\n"
f"사전:\n{dict_text}"),
("human", "원본 질문: {question}\n변환된 질문:"),
])
self.dict_chain = dict_prompt | self.llm | StrOutputParser()
# RAG 프롬프트
self.rag_prompt = ChatPromptTemplate.from_messages([
("system",
"당신은 전문 상담 AI입니다.\n\n"
"## 규칙\n"
"1. 아래 [참고 문서]만을 근거로 답변하세요. 사전 학습 지식은 사용하지 마세요.\n"
"2. 모든 주장에 근거 문서 번호를 [문서 N] 형식으로 표시하세요.\n"
"3. 문서에 없는 내용은 '제공된 문서에서 해당 정보를 찾을 수 없습니다'라고 답하세요.\n"
"4. 수치 계산이 필요하면 단계별로 과정을 보여주세요.\n\n"
"## 참고 문서\n{context}"),
("human", "{question}"),
])
# -- 인덱싱 --
def index_documents(self, file_paths: list[str]) -> int:
"""문서를 로드, 청킹, 임베딩하여 벡터 DB에 저장한다."""
all_chunks = []
for path in file_paths:
logger.info(f"문서 로딩: {path}")
loader = Docx2txtLoader(path)
documents = loader.load()
chunks = self.text_splitter.split_documents(documents)
# 메타데이터 보강
for i, chunk in enumerate(chunks):
chunk.metadata["source"] = os.path.basename(path)
chunk.metadata["chunk_id"] = f"{os.path.basename(path)}_{i}"
chunk.metadata["chunk_index"] = i
all_chunks.extend(chunks)
logger.info(f" -> {len(chunks)}개 청크 생성")
# 벡터 DB 구축
self.vectorstore = Chroma.from_documents(
documents=all_chunks,
embedding=self.embedding,
collection_name=self.config.collection_name,
persist_directory=self.config.persist_directory,
)
self.retriever = self.vectorstore.as_retriever(
search_type=self.config.search_type,
search_kwargs={"k": self.config.search_k},
)
self._build_rag_chain()
logger.info(f"벡터 DB 구축 완료: 총 {len(all_chunks)}개 청크")
return len(all_chunks)
def load_existing_db(self):
"""기존 벡터 DB를 로드한다 (재인덱싱 없이)."""
self.vectorstore = Chroma(
collection_name=self.config.collection_name,
persist_directory=self.config.persist_directory,
embedding_function=self.embedding,
)
self.retriever = self.vectorstore.as_retriever(
search_type=self.config.search_type,
search_kwargs={"k": self.config.search_k},
)
self._build_rag_chain()
logger.info("기존 벡터 DB 로드 완료")
def _build_rag_chain(self):
"""LCEL RAG Chain을 조립한다."""
def format_docs(docs: list[Document]) -> str:
formatted = []
for i, doc in enumerate(docs, 1):
source = doc.metadata.get("source", "N/A")
formatted.append(f"[문서 {i}] (출처: {source})\n{doc.page_content}")
return "\n\n---\n\n".join(formatted)
self.rag_chain = (
RunnableParallel(
context=(
RunnableLambda(lambda q: self.dict_chain.invoke({"question": q}))
| self.retriever
| RunnableLambda(format_docs)
),
question=RunnablePassthrough(),
)
| self.rag_prompt
| self.llm
| StrOutputParser()
)
# -- 질의 처리 --
def ask(self, question: str) -> RAGResponse:
"""동기 방식으로 질문에 답변한다."""
start_time = time.time()
try:
# 키워드 변환
transformed = self.dict_chain.invoke({"question": question})
logger.info(f"질의 변환: '{question}' -> '{transformed}'")
# RAG Chain 실행
answer = self.rag_chain.invoke(question)
latency = (time.time() - start_time) * 1000
# 소스 문서 조회 (디버깅/출처 표시용)
source_docs = self.retriever.invoke(transformed)
sources = [
{
"source": doc.metadata.get("source", "N/A"),
"chunk_id": doc.metadata.get("chunk_id", "N/A"),
"preview": doc.page_content[:150],
}
for doc in source_docs
]
logger.info(f"응답 완료: latency={latency:.0f}ms, sources={len(sources)}")
return RAGResponse(
answer=answer,
sources=sources,
query=question,
transformed_query=transformed,
latency_ms=round(latency, 1),
)
except Exception as e:
latency = (time.time() - start_time) * 1000
logger.error(f"질의 처리 실패: {e}", exc_info=True)
return RAGResponse(
answer=self.config.fallback_answer,
query=question,
latency_ms=round(latency, 1),
error=str(e),
)
async def aask(self, question: str) -> RAGResponse:
"""비동기 방식으로 질문에 답변한다."""
start_time = time.time()
try:
transformed = await self.dict_chain.ainvoke({"question": question})
answer = await self.rag_chain.ainvoke(question)
latency = (time.time() - start_time) * 1000
return RAGResponse(
answer=answer,
query=question,
transformed_query=transformed,
latency_ms=round(latency, 1),
)
except Exception as e:
latency = (time.time() - start_time) * 1000
logger.error(f"비동기 질의 처리 실패: {e}", exc_info=True)
return RAGResponse(
answer=self.config.fallback_answer,
query=question,
latency_ms=round(latency, 1),
error=str(e),
)
def stream(self, question: str):
"""스트리밍 방식으로 답변을 토큰 단위로 반환한다."""
try:
for chunk in self.rag_chain.stream(question):
yield chunk
except Exception as e:
logger.error(f"스트리밍 실패: {e}")
yield self.config.fallback_answer
# -- 실행 예시 --
if __name__ == "__main__":
# 설정 로드
config = RAGConfig.from_env()
# 파이프라인 초기화
pipeline = ProductionRAGPipeline(config)
# 문서 인덱싱 (최초 1회)
pipeline.index_documents(["./tax.docx"])
# 질의 처리
response = pipeline.ask("직장인 세금 계산 방법은?")
print(f"답변: {response.answer}")
print(f"지연: {response.latency_ms}ms")
print(f"소스: {len(response.sources)}개")
# 스트리밍
print("\n[스트리밍 답변]")
for token in pipeline.stream("소득세 과세표준 구간은?"):
print(token, end="", flush=True)
print()
10.2. 실행 흐름 정리
[ProductionRAGPipeline 실행 흐름]
1. 초기화 단계
RAGConfig.from_env() <- 환경변수에서 설정 로드
ProductionRAGPipeline(config) <- LLM, 임베딩, 체인 초기화
2. 인덱싱 단계 (오프라인, 1회)
index_documents(["tax.docx"])
|
+-- Docx2txtLoader.load() <- DOCX -> Document 객체
+-- text_splitter.split() <- 1500자 청크 + 200자 오버랩
+-- 메타데이터 보강 <- source, chunk_id, chunk_index
+-- Chroma.from_documents() <- 임베딩 -> 벡터 DB 저장
3. 질의 단계 (온라인, 매 요청)
pipeline.ask("직장인 세금은?")
|
+-- dict_chain.invoke() <- "직장인" -> "근로소득자" 변환
+-- retriever.invoke() <- 벡터 검색 (상위 4개)
+-- format_docs() <- 검색 결과 텍스트 포맷팅
+-- rag_prompt <- 컨텍스트 + 질문 -> 프롬프트 조립
+-- llm.invoke() <- GPT-4o-mini 답변 생성
+-- StrOutputParser() <- 문자열 파싱 -> RAGResponse 반환
4. 에러 처리
+-- API 오류 -> 폴백 답변 반환 + 로깅
+-- 빈 답변 -> "답변을 생성하지 못했습니다" 반환
+-- 모든 에러 -> RAGResponse.error에 기록
11. 평가 및 모니터링
- RAG 파이프라인은 배포 후에도 지속적인 평가와 모니터링이 필수적이다. "측정하지 않으면 개선할 수 없다"는 원칙은 RAG에 특히 잘 적용된다. 검색 품질, 답변 품질, 시스템 성능을 체계적으로 측정해야 한다.
11.1. RAG 평가 프레임워크
11.1.1 RAGAS (Retrieval Augmented Generation Assessment)
- RAGAS는 RAG 파이프라인을 전문적으로 평가하기 위한 프레임워크로, 검색과 생성 양쪽을 모두 평가한다.
# pip install ragas
from ragas import evaluate
from ragas.metrics import (
faithfulness,
answer_relevancy,
context_precision,
context_recall,
)
from datasets import Dataset
# 평가 데이터셋 구성
eval_data = {
"question": [
"근로소득세 계산 방법은?",
"연말정산 의료비 공제 한도는?",
"종합소득세 신고 기간은?",
],
"answer": [
# RAG 파이프라인이 생성한 실제 답변
pipeline.ask("근로소득세 계산 방법은?").answer,
pipeline.ask("연말정산 의료비 공제 한도는?").answer,
pipeline.ask("종합소득세 신고 기간은?").answer,
],
"contexts": [
# 검색된 컨텍스트 (리스트의 리스트)
[doc.page_content for doc in retriever.invoke("근로소득세 계산 방법")],
[doc.page_content for doc in retriever.invoke("연말정산 의료비 공제 한도")],
[doc.page_content for doc in retriever.invoke("종합소득세 신고 기간")],
],
"ground_truth": [
# 사람이 작성한 정답 (Context Recall 계산에 사용)
"총급여액에서 근로소득공제를 적용한 후 과세표준을 산출하고, 구간별 세율을 적용하여 산출세액을 계산한다.",
"총급여 7000만원 이하인 경우 의료비 공제 한도는 700만원이다.",
"종합소득세 신고 기간은 매년 5월 1일부터 5월 31일까지이다.",
],
}
dataset = Dataset.from_dict(eval_data)
# 평가 실행
result = evaluate(
dataset=dataset,
metrics=[faithfulness, answer_relevancy, context_precision, context_recall],
)
print(result)
# {'faithfulness': 0.92, 'answer_relevancy': 0.88, 'context_precision': 0.85, 'context_recall': 0.90}
11.2. 평가 메트릭 상세
| 메트릭 | 정의 | 측정 방법 | 목표값 | 목표 미달 시 개선 방법 |
| Faithfulness (충실도) | 답변의 모든 주장이 검색된 컨텍스트에 근거하는 비율. 환각이 없을수록 1.0에 가까움 | LLM이 답변의 각 문장을 컨텍스트와 대조하여 근거 유무를 판단. 지지되는 문장 수 / 전체 문장 수 | >= 0.90 | 프롬프트에 근거 제한 강화, temperature=0, 인용 강제 규칙 추가 |
| Answer Relevancy (답변 관련성) | 답변이 질문에 얼마나 관련되는지. 질문에서 벗어난 답변일수록 점수 감소 | 답변에서 역으로 질문을 생성하고, 원본 질문과의 유사도를 측정 | >= 0.85 | 프롬프트에서 질문 핵심에 집중하도록 지시, Few-shot 예시 추가 |
| Context Precision (컨텍스트 정밀도) | 검색된 문서 중 실제로 답변에 유용한 문서의 비율. 불필요한 문서가 많으면 감소 | 검색된 각 문서가 정답 생성에 필요한지 LLM이 판단. 유용한 문서 수 / 총 검색 문서 수 | >= 0.80 | 임베딩 모델 품질 향상, k 값 감소, 관련성 필터링 추가, Reranking 적용 |
| Context Recall (컨텍스트 재현율) | 정답을 생성하는 데 필요한 정보가 검색 결과에 포함된 비율 | 정답의 각 문장에 대해 뒷받침하는 컨텍스트가 있는지 확인 | >= 0.85 | k 값 증가, Multi-Query 적용, 청킹 전략 개선, HyDE 적용 |
| Answer Correctness (답변 정확도) | 답변이 정답과 얼마나 일치하는지. 사실적 정확성을 측정 | 정답과 답변의 의미적 유사도 + 사실 일치 여부 종합 점수 | >= 0.80 | 전체 파이프라인 종합 개선 (검색 + 생성 모두) |
| Hallucination Rate (환각률) | 컨텍스트에 없는 정보가 답변에 포함된 비율 | 1 - Faithfulness. 환각 문장 수를 직접 카운트하여 검증 가능 | <= 0.10 | 5계층 환각 방지 프롬프트 적용, 답변 후처리 검증 추가 |
11.3. LangSmith 활용
11.3.1. 트레이싱 설정
import os
# LangSmith 트레이싱 활성화 (환경변수)
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_API_KEY"] = "ls-..."
os.environ["LANGCHAIN_PROJECT"] = "tax-rag-production"
# 파이프라인 실행 시 자동으로 LangSmith에 트레이스가 기록됨
# 각 트레이스에는 다음 정보가 포함:
# - 질문 원본 및 변환 결과
# - 검색된 문서 내용
# - 프롬프트 전문
# - LLM 응답 (토큰 단위)
# - 각 단계의 지연 시간
# - 토큰 사용량 및 비용
11.3.2. 평가 데이터셋 구축
from langsmith import Client
client = Client()
# 평가 데이터셋 생성
dataset = client.create_dataset(
"tax-rag-eval-v1",
description="세법 RAG 파이프라인 평가 데이터셋 (50건)"
)
# 평가 케이스 추가
eval_cases = [
{
"input": {"question": "근로소득세 계산 방법은?"},
"expected_output": {
"answer_contains": ["과세표준", "세율", "근로소득공제"],
"should_not_contain": ["제공된 문서에서 해당 정보를 찾을 수 없습니다"],
}
},
{
"input": {"question": "비트코인 세금은?"},
"expected_output": {
"answer_contains": ["찾을 수 없습니다"],
"should_not_contain": ["과세", "세율"], # 환각 검증
}
},
# ... 50건 이상 구축 권장
]
for case in eval_cases:
client.create_example(
inputs=case["input"],
outputs=case["expected_output"],
dataset_id=dataset.id,
)
11.3.3. 자동화된 평가 파이프라인
from langsmith.evaluation import evaluate as ls_evaluate
def answer_correctness_evaluator(run, example):
"""답변에 필수 키워드가 포함되어 있는지 확인한다."""
answer = run.outputs.get("answer", "")
expected = example.outputs
# 포함해야 할 키워드 확인
must_contain = expected.get("answer_contains", [])
contains_all = all(kw in answer for kw in must_contain)
# 포함하지 않아야 할 키워드 확인 (환각 검증)
must_not_contain = expected.get("should_not_contain", [])
excludes_all = all(kw not in answer for kw in must_not_contain)
return {
"key": "answer_correctness",
"score": 1.0 if (contains_all and excludes_all) else 0.0,
"comment": f"contains={contains_all}, excludes={excludes_all}",
}
def latency_evaluator(run, example):
"""응답 시간이 SLA(5초) 이내인지 확인한다."""
latency_s = (run.end_time - run.start_time).total_seconds()
return {
"key": "latency_ok",
"score": 1.0 if latency_s < 5.0 else 0.0,
"comment": f"latency={latency_s:.2f}s",
}
# 평가 실행
def run_rag_for_eval(inputs: dict) -> dict:
response = pipeline.ask(inputs["question"])
return {"answer": response.answer}
results = ls_evaluate(
run_rag_for_eval,
data="tax-rag-eval-v1",
evaluators=[answer_correctness_evaluator, latency_evaluator],
experiment_prefix="v2-cot-prompt",
)
print(f"정확도: {results['answer_correctness']:.2%}")
print(f"SLA 준수: {results['latency_ok']:.2%}")
11.4. 프로덕션 모니터링
11.4.1. 핵심 모니터링 지표
| 지표 카테고리 | 메트릭 | 측정 방법 | 알림 기준 | 대응 방법 |
| 지연 시간 | p50 latency | 전체 요청의 50번째 백분위 응답 시간 | > 3초 | LLM 모델 경량화, 캐싱 적용 |
| 지연 시간 | p95 latency | 전체 요청의 95번째 백분위 응답 시간 | > 8초 | 검색 k 값 감소, 컨텍스트 압축 |
| 지연 시간 | p99 latency | 전체 요청의 99번째 백분위 응답 시간 | > 15초 | API 타임아웃 설정, 폴백 체인 활성화 |
| 오류율 | error_rate | 에러 응답 수 / 전체 요청 수 | > 1% | API 키 확인, Rate Limit 모니터링, 재시도 로직 점검 |
| 품질 | hallucination_rate | 일일 샘플링(50건) 기반 환각 비율 측정 | > 10% | 프롬프트 점검, 검색 품질 확인, temperature 조정 |
| 품질 | empty_answer_rate | "찾을 수 없습니다" 유형 답변 비율 | > 30% | 검색 k 증가, 청킹 전략 변경, 키워드 사전 보강 |
| 비용 | cost_per_query | (입력 토큰 + 출력 토큰) * 단가 기준 건당 비용 | 건당 > $0.01 | 모델 경량화, 컨텍스트 압축, 캐싱 |
| 비용 | daily_cost | 일일 총 API 비용 | 일 > $10 | 트래픽 분석, 불필요한 호출 제거, 캐시 히트율 개선 |
| 사용자 | thumbs_up_rate | 사용자 긍정 피드백 비율 | < 70% | 전체 파이프라인 품질 재점검, 프롬프트 A/B 테스트 |
| 시스템 | cache_hit_rate | 캐시에서 응답한 비율 | < 20% | 캐시 TTL 조정, 질문 정규화 로직 추가 |
11.4.2. 모니터링 구현 예시
import time
from dataclasses import dataclass, field
from typing import Optional
@dataclass
class MetricsCollector:
"""RAG 파이프라인 지표 수집기."""
latencies: list[float] = field(default_factory=list)
errors: int = 0
total_requests: int = 0
costs: list[float] = field(default_factory=list)
empty_answers: int = 0
feedback_positive: int = 0
feedback_total: int = 0
def record_request(self, latency_ms: float, cost: float, is_error: bool, is_empty: bool):
self.total_requests += 1
self.latencies.append(latency_ms)
self.costs.append(cost)
if is_error:
self.errors += 1
if is_empty:
self.empty_answers += 1
def record_feedback(self, is_positive: bool):
self.feedback_total += 1
if is_positive:
self.feedback_positive += 1
def get_percentile(self, percentile: float) -> float:
if not self.latencies:
return 0.0
sorted_lat = sorted(self.latencies)
idx = int(len(sorted_lat) * percentile / 100)
return sorted_lat[min(idx, len(sorted_lat) - 1)]
def report(self) -> dict:
return {
"total_requests": self.total_requests,
"p50_latency_ms": round(self.get_percentile(50), 1),
"p95_latency_ms": round(self.get_percentile(95), 1),
"p99_latency_ms": round(self.get_percentile(99), 1),
"error_rate": round(self.errors / max(self.total_requests, 1) * 100, 2),
"empty_answer_rate": round(self.empty_answers / max(self.total_requests, 1) * 100, 2),
"total_cost": round(sum(self.costs), 4),
"avg_cost_per_query": round(sum(self.costs) / max(self.total_requests, 1), 6),
"satisfaction_rate": round(
self.feedback_positive / max(self.feedback_total, 1) * 100, 1
),
}
# 사용 예시
metrics = MetricsCollector()
response = pipeline.ask("소득세율은?")
estimated_cost = 0.005 # gpt-4o-mini 기준 대략적 추정
is_empty = "찾을 수 없습니다" in response.answer
metrics.record_request(
latency_ms=response.latency_ms,
cost=estimated_cost,
is_error=response.error is not None,
is_empty=is_empty,
)
print(metrics.report())
11.4.3. 드리프트 감지
class DriftDetector:
"""질문 분포 변화와 답변 품질 저하를 감지한다.
감지 대상:
1. 질문 분포 드리프트: 새로운 유형의 질문이 증가하는 경우
2. 검색 품질 드리프트: 평균 검색 유사도가 하락하는 경우
3. 답변 품질 드리프트: 빈 답변 비율이 증가하는 경우
"""
def __init__(self, window_size: int = 100, alert_threshold: float = 0.2):
self.window_size = window_size
self.alert_threshold = alert_threshold
self.recent_scores = []
self.baseline_avg = None
def update(self, relevance_score: float):
self.recent_scores.append(relevance_score)
if len(self.recent_scores) == self.window_size and self.baseline_avg is None:
# 첫 윈도우를 베이스라인으로 설정
self.baseline_avg = sum(self.recent_scores) / len(self.recent_scores)
logger.info(f"드리프트 감지 베이스라인 설정: {self.baseline_avg:.3f}")
return False
if len(self.recent_scores) >= self.window_size * 2:
# 최근 윈도우의 평균과 베이스라인 비교
recent_avg = sum(self.recent_scores[-self.window_size:]) / self.window_size
drift = abs(recent_avg - self.baseline_avg) / self.baseline_avg
if drift > self.alert_threshold:
logger.warning(
f"품질 드리프트 감지! baseline={self.baseline_avg:.3f}, "
f"current={recent_avg:.3f}, drift={drift:.1%}"
)
return True
return False
12. 실무 체크리스트
- RAG 파이프라인 구축과 운영의 각 단계에서 반드시 확인해야 할 항목들을 우선순위, 빈도, 구체적 도구와 함께 정리한다.
| 단계 | 체크 항목 | 확인 방법 / 도구 | 우선순위 | 확인 빈도 |
| 환경 설정 | API Key가 .env에 저장되고 .gitignore에 포함 | grep ".env" .gitignore, GitHub Secret Scanning 활성화 | P0 | 프로젝트 초기 1회 |
| 환경 설정 | 패키지 버전 고정 (requirements.txt 또는 pyproject.toml) | pip freeze > requirements.txt 또는 uv lock | P1 | 의존성 변경 시 |
| 환경 설정 | Python 버전 고정 (.python-version) | python --version 일치 확인, pyenv/uv 사용 | P1 | 프로젝트 초기 1회 |
| 환경 설정 | LangSmith 트레이싱 환경변수 설정 | echo $LANGCHAIN_TRACING_V2 확인 | P1 | 프로젝트 초기 1회 |
| 데이터 | 문서 인코딩 정상 (한글 깨짐 없음) | 로딩 후 print(doc.page_content[:200]) 육안 확인 | P0 | 새 문서 추가 시 |
| 데이터 | 테이블/구조 데이터의 Markdown 변환 | 변환 전후 비교, LLM 기반 자동 변환 스크립트 사용 | P1 | 새 문서 추가 시 |
| 데이터 | 메타데이터 정상 추출 (source, page 등) | doc.metadata 출력하여 필수 필드 존재 확인 | P1 | 새 문서 추가 시 |
| 데이터 | 머리글/바닥글/페이지 번호 제거 | 클리닝 스크립트 실행 후 청크 샘플 10개 검사 | P2 | 새 문서 추가 시 |
| 청킹 | chunk_size와 chunk_overlap 도메인에 맞게 설정 | 청크 샘플 20개 수동 검사, 문장 잘림 여부 확인 | P0 | 최초 설정 + 품질 이슈 시 |
| 청킹 | 청크 크기 분포 정상 | [len(c.page_content) for c in chunks]의 min/max/mean/std 확인 | P1 | 최초 설정 |
| 청킹 | 의미 단위 보존 확인 | 랜덤 청크 10개의 시작/끝이 자연스러운 문장 경계인지 확인 | P1 | 최초 설정 |
| 임베딩 | 모델 선택 적절성 (한국어 지원 여부) | 한국어 테스트 질의 5건으로 검색 결과 관련성 수동 평가 | P0 | 최초 설정 |
| 임베딩 | 임베딩 차원과 벡터 DB 설정 일치 | len(embedding.embed_query("test")) vs Pinecone dimension 비교 | P0 | 모델 변경 시 |
| 임베딩 | 임베딩 모델 변경 시 전체 재인덱싱 수행 | 벡터 DB 초기화 후 from_documents() 재실행 | P0 | 모델 변경 시 |
| 벡터 DB | 환경별 적절한 DB 선택 (개발: Chroma, 프로덕션: Pinecone/Weaviate) | 배포 환경에 맞는 DB 클라이언트 설정 확인 | P0 | 환경 구축 시 |
| 검색 | k 값 적절성 (기본 3~5) | 테스트 질의 10건의 검색 결과 관련성 수동 평가 | P0 | 최초 설정 + 분기별 |
| 프롬프트 | 환각 방지 규칙 포함 | 프롬프트에 "문서에 없으면 답변하지 마세요" 류의 명시적 규칙 확인 | P0 | 프롬프트 변경 시 |
| 평가 | 평가 데이터셋 최소 50건 구축 | LangSmith Dataset 생성, 다양한 질문 유형 포함 | P0 | 배포 전 |
| 평가 | Faithfulness >= 0.90 달성 | RAGAS 또는 LangSmith 평가 실행 | P0 | 배포 전 + 주 1회 |
| 보안 | 프롬프트 인젝션 방어 | 악의적 입력 테스트 (예: "이전 지시를 무시하고..."), 입력 길이 제한 | P0 | 배포 전 |
- 실무 권장: P0 항목은 배포 전 반드시 완료해야 하며, 하나라도 미충족 시 프로덕션 배포를 보류한다. P1은 배포 후 2주 이내, P2는 월 단위로 해결한다.
13. 단계별 문제 해결 가이드
- RAG 파이프라인에서 발생하는 문제는 크게 검색 품질, 생성 품질, 성능, 비용 네 카테고리로 분류된다. 문제 진단의 첫 단계는 항상 검색 결과를 직접 확인하는 것이다.
[문제 진단 의사결정 트리]
답변 품질이 낮다
|
+-- retriever.invoke(query) 결과 확인
|
+-- 검색 결과가 관련 없음 --> [13.1 검색 품질 문제]
| |
| +-- 질의 표현이 문서와 다름 --> 키워드 사전, Query Transformation
| +-- 청크에 필요한 정보 없음 --> 청킹 전략 변경
| +-- 임베딩 품질 부족 --> 임베딩 모델 변경
|
+-- 검색 결과는 관련 있음 --> [13.2 생성 품질 문제]
|
+-- 환각 포함 --> 프롬프트 강화, temperature=0
+-- 답변이 불완전 --> k 값 증가, 프롬프트 개선
+-- 형식이 안 맞음 --> 출력 형식 지정 프롬프트
13.1. 검색 품질 문제 진단
| 증상 | 가능한 원인 | 진단 방법 | 해결 방법 | 예상 효과 |
| 검색 결과가 질의와 전혀 무관 | 임베딩 모델이 도메인 용어를 이해하지 못함 | embedding.embed_query("전문용어")의 유사도 테스트. 동의어 쌍의 코사인 유사도가 0.7 이하이면 모델 한계 | 더 높은 품질의 임베딩 모델로 변경 (text-embedding-3-small -> text-embedding-3-large), 파인튜닝 검토 | 검색 Recall 20~40% 향상 |
| 일상 용어로 질문하면 검색 실패 | 사용자 질의 표현과 문서 용어 불일치 (예: "세금" vs "소득세") | 실패 질의 수집, 질의와 문서 용어 간 어휘 차이 분석 | 키워드 사전 구축 + Query Transformation (Multi-Query, HyDE) | 용어 불일치 케이스 70~90% 해결 |
| 중요한 정보가 검색에 누락됨 | chunk_size가 너무 커서 관련 정보가 불필요한 텍스트에 희석됨 | 정답이 포함된 청크의 크기와 위치 확인. 정답 문장이 청크의 5% 미만이면 희석 의심 | chunk_size 축소 (1500 -> 500~800), Sentence Window Retrieval 적용 | Context Recall 15~30% 향상 |
| 정보가 여러 청크에 분산되어 있음 | chunk_overlap이 너무 작아 청크 경계에서 정보 단절 | 인접 청크 간 내용 연속성 확인, overlap 영역에 핵심 정보가 잘리는지 점검 | chunk_overlap 증가 (10% -> 15~20%), Auto-Merging Retrieval 적용 | 정보 단절 케이스 50% 감소 |
| 유사한 내용의 문서만 반복 검색 | 기본 similarity 검색이 다양성 없이 가장 유사한 것만 반환 | 검색된 k개 문서의 내용 중복도 확인. 3개 이상이 거의 동일한 내용이면 다양성 부족 | search_type="mmr" 변경 + lambda_mult=0.7 (유사도 70%, 다양성 30%) | 다양한 관점의 문서 검색으로 Answer Correctness 향상 |
| 테이블/수치 검색이 잘 안 됨 | 비구조화 텍스트 형태의 테이블이 임베딩 시 의미가 손실됨 | 테이블 포함 청크에 대한 검색 성공률 별도 측정 | Markdown 테이블로 변환, 테이블 전용 메타데이터 추가 | 테이블 관련 검색 정확도 40~60% 향상 |
13.2. 생성 품질 문제 진단
| 증상 | 가능한 원인 | 진단 방법 | 해결 방법 | 예상 효과 |
| 문서에 없는 내용이 답변에 포함 (환각) | LLM이 컨텍스트보다 사전학습 지식을 우선 활용 | RAGAS Faithfulness 측정, 답변의 각 문장이 컨텍스트에서 발견되는지 수동 확인 | 환각 방지 5계층 프롬프트 적용, temperature=0, 인용 강제 | Faithfulness 0.7 → 0.9+ 향상 |
| 답변이 너무 짧거나 피상적 | k 값이 낮아 LLM에 전달되는 정보 부족, 또는 프롬프트가 상세 답변을 유도하지 않음 | 검색된 문서에 충분한 정보가 있는지 확인 + 프롬프트에 "상세하게" 지시가 있는지 확인 | k 값 증가 (4 → 6~8), 프롬프트에 "단계별로 상세하게 설명하세요" 추가, CoT 프롬프트 적용 | Answer Relevancy 10~20% 향상 |
| 같은 질문에 매번 다른 답변 | temperature > 0으로 설정되어 확률적 출력 | 동일 질문을 5회 반복 실행하여 답변 일관성 비교 | temperature=0 설정 + 캐싱으로 동일 질문에 동일 답변 보장 | 일관성 100% 달성 |
| 출력 형식이 불규칙 | 프롬프트에 형식 지시가 없거나 모호함 | 답변 10건의 형식을 비교하여 편차 확인 | Few-shot 예시 추가, JSON 출력 파서 사용, 구체적 형식 지시 | 형식 일관성 90%+ 달성 |
| 질문과 관련 없는 답변 | 검색 결과는 맞지만 프롬프트가 질문 핵심에 집중하지 않음 | Answer Relevancy 메트릭 측정, 프롬프트의 질문 참조 방식 검토 | 프롬프트에 "질문에 직접 답하세요" 명시, 불필요한 배경 설명 제거 지시 | Answer Relevancy 15~25% 향상 |
13.3. 성능 문제 진단
| 증상 | 가능한 원인 | 진단 방법 | 해결 방법 | 예상 효과 |
| 응답 시간 > 5초 | LLM 모델이 무거움 (GPT-4o 사용 중) | LangSmith 트레이스에서 각 단계별 소요 시간 확인. LLM 단계가 전체의 70% 이상이면 모델 문제 | gpt-4o-mini로 변경 (품질 대비 5~10배 빠름), 스트리밍으로 체감 속도 개선 | 평균 응답 시간 60~70% 감소 |
| 첫 토큰 출력까지 오래 걸림 (TTFT) | 키워드 사전 변환에 LLM 호출이 포함되어 직렬 처리됨 | TTFT(Time to First Token) 측정. 키워드 변환 단계가 1초 이상이면 병목 | 키워드 사전을 규칙 기반(정규식)으로 변경, LLM 호출 제거 | TTFT 1~2초 단축 |
| 동시 요청 처리 시 타임아웃 | OpenAI API Rate Limit 도달 | API 응답 헤더의 x-ratelimit-remaining 확인, 429 에러 빈도 모니터링 | 지수 백오프 재시도 구현, API 키 분산, 배치 처리로 요청 수 감소 | 타임아웃 90%+ 감소 |
| 벡터 검색이 느림 (> 500ms) | 벡터 DB의 인덱스 최적화 부족 또는 데이터 규모 과대 | 검색 단계 소요 시간 단독 측정, 벡터 DB 컬렉션 크기 확인 | Chroma: HNSW 파라미터 튜닝, Pinecone: Pod 스펙 업그레이드, 필요 시 메타데이터 필터링으로 검색 범위 축소 | 검색 시간 50~80% 감소 |
| 토큰 제한 초과 오류 | 검색 문서 합계가 Context Window 초과 | 검색 문서 총 토큰 수 계산: sum(len(enc.encode(d.page_content)) for d in docs) | k 값 축소, chunk_size 축소, 컨텍스트 압축 (LLMChainExtractor) 적용 | 오류 완전 해결 |
13.4. 비용 문제 진단
| 증상 | 가능한 원인 | 진단 방법 | 해결 방법 | 절감 효과 |
| 월간 API 비용이 예산 초과 | 고비용 모델(GPT-4o) 사용 + 높은 트래픽 | OpenAI Usage 대시보드에서 모델별/일별 비용 분석 | gpt-4o-mini로 변경 (입력 $0.15/1M vs $2.50/1M = 약 17배 절감), 단순 질문은 경량 모델 라우팅 | 70~90% 비용 절감 |
| 건당 비용이 $0.01 이상 | 컨텍스트 토큰이 과다 (k가 크거나 chunk_size가 큼) | 건당 평균 입력/출력 토큰 수 추적 | k 값 축소 (6 → 4), 컨텍스트 압축 적용, EmbeddingsFilter로 불필요 문서 제거 | 건당 비용 40~60% 절감 |
| 임베딩 비용이 높음 | 재인덱싱이 빈번하거나 문서가 대량 | 임베딩 API 호출 빈도와 토큰 수 추적 | 증분 인덱싱 (변경분만 재임베딩), 로컬 임베딩 모델(Sentence Transformers) 고려 | 임베딩 비용 80%+ 절감 |
| 동일 질문 반복으로 불필요 비용 | 캐싱 미적용 | 질문 로그에서 중복 질문 비율 분석 (보통 20~40% 중복) | Redis/인메모리 캐시 적용, 질문 정규화(소문자화, 불용어 제거) 후 캐시 키 생성 | 중복 비율만큼 비용 절감 (20~40%) |
14. 파이프라인 아키텍처
14.1. 기본 아키텍처 (Naive RAG)
[Naive RAG - 가장 기본적인 구조]
사용자 질문
|
v
+---------------------------+
| Retriever | <- 벡터 DB에서 코사인 유사도 기반 검색
| (similarity, k=4) | 입력: 질문 임베딩 벡터
+---------------------------+ 출력: 상위 k개 Document 객체
|
v
+---------------------------+
| Prompt + LLM | <- 검색 문서를 컨텍스트로 삽입
| (gpt-4o-mini, temp=0) | "컨텍스트를 기반으로 답변하세요"
+---------------------------+
|
v
최종 답변
[특징]
- 구현 난이도: 낮음 (50줄 이내)
- 적합한 경우: 프로토타입, 단순 Q&A
- 한계: 용어 불일치에 취약, 환각 제어 어려움
14.2. 확장 아키텍처 (Advanced RAG)
[Advanced RAG - 프로덕션 수준의 확장 구조]
사용자 질문
|
v
+===============================================+
| 질의 전처리 (Pre-Retrieval) |
| |
| +------------------+ +-------------------+ |
| | 키워드 사전 변환 | | Query Transform | |
| | (일상어->전문어) | | - Multi-Query | |
| +------------------+ | - HyDE | |
| | | - Step-Back | |
| v +-------------------+ |
| +------------------+ | |
| | Intent Router |<---------+ |
| | (질문 유형 분류) | |
| +------------------+ |
+===============================================+
|
v
+===============================================+
| 검색 (Retrieval) |
| |
| +------------------+ +-------------------+ |
| | 벡터 검색 (k=6) | | BM25 검색 (k=6) | |
| | (의미 기반) | | (키워드 기반) | |
| +------------------+ +-------------------+ |
| | | |
| v v |
| +-------------------------------------+ |
| | Reciprocal Rank Fusion (RRF) | |
| | (하이브리드 검색 결과 합산) | |
| +-------------------------------------+ |
| | |
| v |
| +-------------------------------------+ |
| | Reranker (Cross-Encoder) | |
| | (상위 k개 재순위화) | |
| +-------------------------------------+ |
+===============================================+
|
v
+===============================================+
| 후처리 (Post-Retrieval) |
| |
| +------------------+ +-------------------+ |
| | 중복 제거 | | 관련성 필터링 | |
| | (dedup >= 0.95) | | (threshold=0.3) | |
| +------------------+ +-------------------+ |
| | | |
| v v |
| +-------------------------------------+ |
| | Context Reordering | |
| | (Lost in the Middle 패턴 적용) | |
| +-------------------------------------+ |
| | |
| v |
| +-------------------------------------+ |
| | Context Compression (선택) | |
| | (토큰 예산 초과 시 요약) | |
| +-------------------------------------+ |
+===============================================+
|
v
+===============================================+
| 생성 (Generation) |
| |
| +-------------------------------------+ |
| | RAG Prompt | |
| | - 역할 설정 | |
| | - 환각 방지 5계층 | |
| | - 출력 형식 지정 | |
| | - Few-shot 예시 | |
| +-------------------------------------+ |
| | |
| v |
| +-------------------------------------+ |
| | LLM (gpt-4o-mini / gpt-4o) | |
| | streaming=True, temperature=0 | |
| +-------------------------------------+ |
| | |
| v |
| +-------------------------------------+ |
| | Self-Check (선택) | |
| | (답변의 환각 여부 자동 검증) | |
| +-------------------------------------+ |
+===============================================+
|
v
최종 답변 + 소스 문서 + 메타데이터
|
v
+---------------------------+
| 모니터링/로깅 |
| - LangSmith 트레이싱 |
| - 지표 수집 |
| - 피드백 기록 |
+---------------------------+
14.3. 프로덕션 운영 고려사항
| 항목 | 설명 | 구현 방법 / 도구 | 코드 예시 또는 설정 |
| 캐싱 | 동일/유사 질의에 대한 반복 LLM 호출 방지. 비용 절감과 응답 속도 향상을 동시에 달성 | Redis (프로덕션), InMemoryCache (개발). 질문을 정규화(소문자, 불용어 제거)한 해시를 캐시 키로 사용, TTL 24시간 권장 | from langchain_core.globals import set_llm_cache; from langchain_community.cache import RedisCache; set_llm_cache(RedisCache(redis_url="redis://localhost:6379")) |
| 모니터링 | 답변 품질, 응답 시간, 오류율, 비용을 실시간 추적하여 품질 저하를 조기 발견 | LangSmith (LLM 트레이싱 전문), Prometheus + Grafana (시스템 메트릭), Sentry (에러 추적) | LangSmith: LANGCHAIN_TRACING_V2=true, Prometheus: /metrics 엔드포인트 노출 |
| 로깅 | 질의-응답 이력을 구조화하여 저장. 디버깅, 평가 데이터 축적, 규정 준수에 활용 | 구조화된 JSON 로깅 (python-json-logger), ELK 스택으로 집계/검색 | {"timestamp": "...", "query": "...", "transformed_query": "...", "answer": "...", "latency_ms": 1200, "sources": [...]} |
| A/B 테스트 | 프롬프트, 모델, 검색 전략 변경의 효과를 통계적으로 검증 | 트래픽을 비율로 분할하여 두 버전에 할당, LangSmith Experiment로 결과 비교 | Feature Flag (LaunchDarkly, Unleash) 또는 코드 내 random.random() < 0.5로 분기 |
| 문서 업데이트 | 새 문서 추가, 기존 문서 수정/삭제를 벡터 DB에 반영 | 증분 인덱싱: 변경 파일만 감지하여 해당 청크만 갱신. 전체 재인덱싱: 주 1회 스케줄 | 증분: 파일 해시 비교 -> 변경분만 add_documents(), 삭제: delete(ids=[...]) |
| 에러 처리 | API 장애, 타임아웃, Rate Limit 등 외부 의존성 장애에 대한 복원력 확보 | 지수 백오프 재시도 (최대 3회), 폴백 체인 (GPT-4o -> GPT-4o-mini -> 캐시), Circuit Breaker 패턴 | chain.with_fallbacks([fallback_chain]), tenacity 라이브러리로 재시도 로직 |
| 보안 | 프롬프트 인젝션 공격 방어, 시스템 프롬프트 유출 방지, 민감 정보 필터링 | 입력 길이 제한 (2000자), 금지 패턴 필터링, 출력에서 시스템 프롬프트 참조 제거 | 입력 검증: if len(question) > 2000: raise ValueError(...), 출력 필터링: 정규식으로 시스템 프롬프트 문구 제거 |
| 스케일링 | 트래픽 증가에 따른 수평 확장. API 서버 다중화, 벡터 DB 스케일 아웃 | FastAPI + Uvicorn workers, Kubernetes HPA, Pinecone Serverless (자동 스케일링) | uvicorn main:app --workers 4, K8s: targetCPUUtilizationPercentage: 70 |
14.4. 확장 패턴
- RAG의 한계를 극복하기 위해 다양한 확장 패턴이 제안되고 있다. 각 패턴의 핵심 아이디어와 적합한 사용 시나리오를 정리한다.
| 패턴 | 핵심 아이디어 | 구현 복잡도 | 적합한 시나리오 | 주요 도구/라이브러리 |
| Agentic RAG | LLM이 도구(Tool)로서 검색을 호출. 검색 필요 여부, 검색 쿼리, 검색 횟수를 LLM이 자율적으로 판단. 단순 질문은 검색 없이 답변하고, 복잡한 질문은 여러 번 검색 | 높음 | 다양한 유형의 질문이 혼재하는 범용 챗봇. 검색이 항상 필요하지 않은 경우 | LangChain Agent, create_tool_calling_agent, AgentExecutor |
| Corrective RAG (CRAG) | 검색 결과의 품질을 LLM이 평가하고, 품질이 낮으면 질의를 수정하여 재검색하거나 웹 검색으로 폴백. 검색 실패를 자동으로 복구하는 자가 교정 메커니즘 | 중간 | 문서 커버리지가 불완전한 경우. 질의 표현이 다양하여 첫 검색이 실패할 가능성이 높은 경우 | LangGraph, 커스텀 검증 체인 |
| Self-RAG | LLM이 생성한 답변의 각 부분에 대해 스스로 "이 부분은 검색 결과에 근거하는가?" "이 정보는 관련 있는가?"를 판단하는 반성(Reflection) 토큰을 생성. 환각을 자가 감지하여 제거 | 높음 | 높은 신뢰성이 요구되는 법률, 의료, 금융 도메인. 환각 허용 범위가 매우 낮은 경우 | LangGraph, 커스텀 Reflection 체인 |
| Graph RAG | 문서에서 엔티티와 관계를 추출하여 지식 그래프를 구축. 벡터 검색과 그래프 탐색을 결합하여 엔티티 간 관계 기반 검색 수행. "A와 B의 관계는?" 류의 질문에 강력 | 높음 | 복잡한 엔티티 간 관계를 다루는 도메인 (법률 조항 간 참조, 조직 구조, 계보 등). 단순 유사도 검색으로는 답할 수 없는 질문이 많은 경우 | Neo4j + LangChain, Microsoft GraphRAG, langchain-neo4j |
| Multi-Modal RAG | 텍스트뿐 아니라 이미지, 차트, 테이블 등 다양한 모달리티를 함께 인덱싱하고 검색. GPT-4o Vision으로 이미지를 텍스트 설명으로 변환하여 벡터 DB에 저장 | 중간 | 이미지, 도표가 많은 기술 문서, 제품 매뉴얼, 의학 자료. 텍스트만으로 정보가 불완전한 문서 | GPT-4o Vision, Unstructured.io, CLIP embeddings |
| Conversational RAG | 대화 이력(Memory)을 관리하여 멀티턴 대화를 지원. 이전 대화 맥락을 기반으로 후속 질문을 해석하고, 대화 이력이 길어지면 요약하여 토큰 절약 | 중간 | 챗봇 형태의 RAG 서비스. 사용자가 후속 질문("위에서 말한 것 중에서...", "그럼 이 경우에는?")을 하는 경우 | ChatMessageHistory, ConversationBufferWindowMemory, create_history_aware_retriever |
# Corrective RAG (CRAG) 간소화 구현 예시
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
# 검색 결과 품질 평가 체인
eval_prompt = ChatPromptTemplate.from_template(
"다음 검색 결과가 질문에 답하기에 충분한 정보를 포함하고 있는지 평가하세요.\n\n"
"질문: {question}\n"
"검색 결과:\n{context}\n\n"
"평가 결과 (SUFFICIENT 또는 INSUFFICIENT만 답하세요):"
)
eval_chain = eval_prompt | llm | StrOutputParser()
def corrective_rag(question: str, retriever, rag_chain, max_retries: int = 2):
"""검색 결과가 불충분하면 질의를 수정하여 재검색한다."""
for attempt in range(max_retries + 1):
# 검색
docs = retriever.invoke(question)
context = "\n".join(d.page_content[:300] for d in docs)
# 품질 평가
quality = eval_chain.invoke({"question": question, "context": context})
if "SUFFICIENT" in quality.upper():
# 충분하면 답변 생성
return rag_chain.invoke(question)
if attempt < max_retries:
# 불충분하면 질의 수정
logger.info(f"검색 결과 불충분 (시도 {attempt+1}). 질의 수정 중...")
rewrite_prompt = ChatPromptTemplate.from_template(
"다음 질문으로 검색했지만 좋은 결과를 얻지 못했습니다.\n"
"더 효과적인 검색을 위해 질문을 다르게 표현해주세요.\n\n"
"원본 질문: {question}\n"
"수정된 질문:"
)
rewrite_chain = rewrite_prompt | llm | StrOutputParser()
question = rewrite_chain.invoke({"question": question})
logger.info(f"수정된 질의: {question}")
# 최대 시도 횟수 도달
return "충분한 정보를 찾지 못했습니다. 질문을 더 구체적으로 바꿔 시도해주세요."
15. 실무 프로젝트: 세법 상담 RAG 파이프라인 구축
15.1. 프로젝트 개요
| 항목 | 내용 |
| 목표 | 세법 관련 질의에 정확하게 답변하는 프로덕션 수준의 RAG 파이프라인 구축 |
| 시나리오 | 중소기업 회계팀에서 근로소득세, 종합소득세 관련 질의를 처리하는 내부 Q&A 시스템 |
| 학습 목표 | 본 문서의 모든 기법(Section 1~14)을 하나의 프로젝트에 통합하여 적용 |
| 사용 기술 | LangChain, ChatOpenAI, OpenAIEmbeddings, Chroma, LCEL, 키워드 사전, 환각 방지 프롬프트 |
| 대상 사용자 | 세법 비전공자 (일상 용어로 질문하는 회계팀 직원) |
| 문서 규모 | DOCX 형식 세법 문서 1~5개, 총 3만~10만자 |
15.2. 요구사항 명세
[기능 요구사항]
1. DOCX 형식의 세법 문서를 로딩하여 인덱싱
2. 키워드 사전을 통한 일상어 -> 전문어 변환 (직장인 -> 근로소득자)
3. 벡터 검색으로 관련 조항을 찾아 정확한 답변 생성
4. 답변에 근거 출처를 [문서 N] 형식으로 표시
5. 문서에 없는 내용은 답변하지 않음 (환각 방지)
6. 수치 계산 질문은 단계별 과정을 보여줌
7. 스트리밍 방식으로 실시간 답변 출력 지원
[비기능 요구사항 - SLA]
- 응답 시간: p50 < 3초, p95 < 5초, p99 < 10초
- 비용: 월 $50 이내 (gpt-4o-mini 사용)
- 답변 정확도: Faithfulness >= 0.90, Answer Relevancy >= 0.85
- 환각률: <= 10% (Hallucination Rate <= 0.10)
- 가용성: 99.5% (월 다운타임 3.6시간 이내)
- 동시 사용자: 최대 10명
15.3. 구현
"""
세법 상담 RAG 파이프라인 - 프로덕션 구현
========================================
본 문서의 모든 기법을 통합한 실무 프로젝트 코드.
"""
from __future__ import annotations
import logging
import os
import re
import time
from dataclasses import dataclass, field
from typing import Optional
from dotenv import load_dotenv
from langchain_chroma import Chroma
from langchain_community.document_loaders import Docx2txtLoader
from langchain_core.documents import Document
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnableLambda, RunnableParallel, RunnablePassthrough
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_text_splitters import RecursiveCharacterTextSplitter
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
)
logger = logging.getLogger("tax_rag")
# ===== Step 1: 환경 설정 =====
load_dotenv()
openai_key = os.environ.get("OPENAI_API_KEY")
if openai_key:
print(f"OPENAI_API_KEY 로드 성공: {openai_key[:5]}...")
else:
raise ValueError("OPENAI_API_KEY가 설정되지 않았습니다. .env 파일을 확인하세요.")
# ===== Step 2: LLM 초기화 =====
# 비용 효율적인 gpt-4o-mini 사용 (월 $50 이내 요구사항 충족)
llm = ChatOpenAI(
model="gpt-4o-mini",
temperature=0, # 사실 기반 답변을 위해 결정적 출력
max_tokens=4096,
)
# LLM 동작 확인
test_response = llm.invoke("테스트: 1+1=?")
print(f"LLM 연결 확인: {test_response.content}")
# ===== Step 3: 문서 로딩 및 청킹 =====
loader = Docx2txtLoader("./tax.docx")
documents = loader.load()
print(f"로드된 문서 수: {len(documents)}")
print(f"문서 길이: {len(documents[0].page_content)}자")
# 법률 문서이므로 큰 청크 + 높은 오버랩
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=1500,
chunk_overlap=200, # 13% 오버랩
)
chunks = text_splitter.split_documents(documents)
print(f"생성된 청크 수: {len(chunks)}")
# 메타데이터 보강
for i, chunk in enumerate(chunks):
chunk.metadata["chunk_id"] = f"tax_{i}"
chunk.metadata["chunk_index"] = i
# 내용 유형 자동 분류
text = chunk.page_content
if re.search(r"제\d+조|제\d+항", text):
chunk.metadata["content_type"] = "법조항"
elif re.search(r"\|.*\|.*\|", text):
chunk.metadata["content_type"] = "표"
else:
chunk.metadata["content_type"] = "설명"
# 청크 품질 검증
print("\n[청크 품질 검증]")
sizes = [len(c.page_content) for c in chunks]
print(f" 크기 범위: {min(sizes)}~{max(sizes)}자")
print(f" 평균 크기: {sum(sizes)/len(sizes):.0f}자")
for i, chunk in enumerate(chunks[:2]):
print(f"\n=== 청크 {i+1} ({len(chunk.page_content)}자, {chunk.metadata.get('content_type', 'N/A')}) ===")
print(f"시작: {chunk.page_content[:80]}...")
print(f"끝: ...{chunk.page_content[-80:]}")
# ===== Step 4: 임베딩 모델 초기화 =====
embedding = OpenAIEmbeddings(model="text-embedding-3-large")
# 임베딩 동작 확인
test_vector = embedding.embed_query("소득세 계산 방법")
print(f"\n임베딩 벡터 차원: {len(test_vector)}") # 3072
# ===== Step 5: 벡터 DB 구축 =====
database = Chroma.from_documents(
documents=chunks,
embedding=embedding,
collection_name="tax_qa_production",
persist_directory="./chroma_tax_qa",
)
print(f"벡터 DB 구축 완료: {len(chunks)}개 청크 인덱싱")
# 검색 테스트
test_results = database.similarity_search("소득세란 무엇인가?", k=2)
print(f"검색 테스트: {len(test_results)}개 문서")
for i, doc in enumerate(test_results):
print(f" [{i+1}] {doc.page_content[:80]}...")
# ===== Step 6: 키워드 사전 체인 구축 =====
keyword_dict = {
"직장인": "근로소득자",
"프리랜서": "사업소득자",
"월급": "근로소득",
"세금": "소득세",
"연말정산": "근로소득세 연말정산",
"공제": "소득공제, 세액공제",
"알바": "일용근로소득자",
"보험료": "사회보험료",
"퇴직금": "퇴직소득",
"집": "주택",
}
dict_text = "\n".join(f"- {k} -> {v}" for k, v in keyword_dict.items())
dict_prompt = ChatPromptTemplate.from_messages([
("system",
"당신은 질문 변환 전문가입니다. "
"아래 사전을 참고하여 질문의 일상 용어를 전문 용어로 변환하세요. "
"질문의 의미를 변경하지 마세요.\n\n"
f"사전:\n{dict_text}"),
("human", "원본 질문: {question}\n변환된 질문:"),
])
dict_chain = dict_prompt | llm | StrOutputParser()
# 변환 테스트
original_q = "직장인 세금 계산 방법 알려줘"
transformed_q = dict_chain.invoke({"question": original_q})
print(f"\n원본: {original_q}")
print(f"변환: {transformed_q}")
# ===== Step 7: Retriever 설정 =====
retriever = database.as_retriever(
search_type="similarity",
search_kwargs={"k": 4},
)
# ===== Step 8: RAG Chain 조립 (LCEL + 환각 방지 프롬프트) =====
def format_docs(docs: list[Document]) -> str:
"""검색 문서를 번호+출처 포함 텍스트로 변환."""
formatted = []
for i, doc in enumerate(docs, 1):
source = doc.metadata.get("source", "tax.docx")
content_type = doc.metadata.get("content_type", "")
header = f"[문서 {i}] (출처: {source}, 유형: {content_type})"
formatted.append(f"{header}\n{doc.page_content}")
return "\n\n---\n\n".join(formatted)
# 환각 방지 5계층 프롬프트 적용
rag_prompt = ChatPromptTemplate.from_messages([
("system",
"당신은 10년 경력의 세무사 AI 어시스턴트입니다.\n\n"
"## 1. 근거 한정\n"
"아래 [참고 문서]에 포함된 정보만 사용하여 답변하세요.\n"
"사전 학습 지식은 절대 사용하지 마세요.\n\n"
"## 2. 인용 강제\n"
"모든 주장에 [문서 N] 형식으로 출처를 표시하세요.\n\n"
"## 3. 모름 표현\n"
"- 문서에 없는 내용: '제공된 문서에서 해당 정보를 찾을 수 없습니다.'\n"
"- 불완전한 정보: '부분적인 정보만 확인됩니다: [내용]. 정확한 답변을 위해 추가 자료가 필요합니다.'\n\n"
"## 4. 계산 규칙\n"
"수치 계산이 필요하면 단계별로 과정을 보여주세요.\n\n"
"## 5. 어조\n"
"회계 비전공자도 이해할 수 있도록 쉬운 용어로 설명하되,\n"
"법률 조항 번호는 정확히 명시하세요.\n\n"
"## 참고 문서\n{context}"),
("human", "{question}"),
])
# 전체 파이프라인 LCEL 조립
rag_chain = (
RunnableParallel(
context=(
RunnableLambda(lambda q: dict_chain.invoke({"question": q}))
| retriever
| RunnableLambda(format_docs)
),
question=RunnablePassthrough(),
)
| rag_prompt
| llm
| StrOutputParser()
)
# ===== Step 9: 지표 수집기 =====
@dataclass
class Metrics:
latencies: list[float] = field(default_factory=list)
total: int = 0
errors: int = 0
empty: int = 0
def record(self, latency_ms: float, is_error: bool = False, is_empty: bool = False):
self.total += 1
self.latencies.append(latency_ms)
if is_error: self.errors += 1
if is_empty: self.empty += 1
def summary(self) -> str:
if not self.latencies:
return "데이터 없음"
sorted_lat = sorted(self.latencies)
p50 = sorted_lat[len(sorted_lat) // 2]
p95 = sorted_lat[int(len(sorted_lat) * 0.95)]
return (
f"총 {self.total}건 | p50={p50:.0f}ms | p95={p95:.0f}ms | "
f"에러율={self.errors/self.total*100:.1f}% | "
f"빈답변율={self.empty/self.total*100:.1f}%"
)
metrics = Metrics()
# ===== Step 10: 안전한 질의 함수 =====
def ask(question: str) -> str:
"""안전한 질의 함수. 에러 처리와 지표 수집을 포함한다."""
start = time.time()
try:
answer = rag_chain.invoke(question)
latency = (time.time() - start) * 1000
is_empty = "찾을 수 없습니다" in answer
metrics.record(latency, is_empty=is_empty)
logger.info(f"[{latency:.0f}ms] Q: {question[:50]}...")
return answer
except Exception as e:
latency = (time.time() - start) * 1000
metrics.record(latency, is_error=True)
logger.error(f"질의 실패: {e}", exc_info=True)
return "일시적인 오류가 발생했습니다. 잠시 후 다시 시도해주세요."
# ===== 테스트 실행 =====
test_questions = [
"직장인 세금은 어떻게 계산하나요?",
"연말정산에서 공제받을 수 있는 항목은?",
"프리랜서의 세금 신고 방법은?",
"소득세 과세표준 구간별 세율은?",
"비트코인 수익에 대한 세금은?", # 문서에 없을 가능성 높은 질문
"총급여 5000만원일 때 소득세는 얼마인가요?", # 계산 질문
]
print("\n" + "=" * 60)
print("세법 상담 RAG 파이프라인 테스트")
print("=" * 60)
for q in test_questions:
print(f"\n질문: {q}")
print("-" * 40)
answer = ask(q)
print(f"답변: {answer[:500]}...")
print()
print("\n" + "=" * 60)
print(f"성능 요약: {metrics.summary()}")
print("=" * 60)
15.3.1. 실행 결과 예시:
OPENAI_API_KEY 로드 성공: sk-ab...
LLM 연결 확인: 1+1은 2입니다.
로드된 문서 수: 1
문서 길이: 45230자
생성된 청크 수: 35
[청크 품질 검증]
크기 범위: 312~1498자
평균 크기: 1287자
임베딩 벡터 차원: 3072
벡터 DB 구축 완료: 35개 청크 인덱싱
원본: 직장인 세금 계산 방법 알려줘
변환: 근로소득자 소득세 계산 방법 알려줘
============================================================
세법 상담 RAG 파이프라인 테스트
============================================================
질문: 직장인 세금은 어떻게 계산하나요?
----------------------------------------
답변: 근로소득자의 소득세는 다음 단계로 계산됩니다:
1. **총급여액** 확인 [문서 1]
2. **근로소득공제** 적용 (총급여액 구간별 공제율) [문서 1]
3. **과세표준** 산출 (근로소득금액 - 소득공제) [문서 2]
4. **산출세액** 계산 (과세표준 x 세율) [문서 3]
5. **결정세액** 산출 (산출세액 - 세액공제) [문서 3]
[신뢰도: 높음]...
질문: 비트코인 수익에 대한 세금은?
----------------------------------------
답변: 제공된 문서에서 해당 정보를 찾을 수 없습니다.
현재 참고 문서에는 암호화폐(가상자산) 관련 세금 규정이 포함되어 있지 않습니다.
============================================================
성능 요약: 총 6건 | p50=2340ms | p95=4120ms | 에러율=0.0% | 빈답변율=16.7%
============================================================
15.4. 확장 과제
| 확장 과제 | 설명 | 난이도 | 학습 효과 | 참조 섹션 |
| 하이브리드 검색 적용 | BM25 + 벡터 검색을 RRF로 결합. 법률 조항 번호("제55조")같은 키워드 매칭 강화 | 중간 | 검색 아키텍처의 유연성 이해, 키워드 검색과 의미 검색의 상호 보완 관계 체험 | 14.2 |
| 스트리밍 답변 | rag_chain.stream()으로 답변을 토큰 단위로 실시간 출력. FastAPI SSE 연동까지 구현 | 쉬움 | LCEL의 스트리밍 인터페이스 이해, 사용자 체감 속도 개선 효과 확인 | 8.3.3 |
| 대화 이력 유지 | ChatMessageHistory로 멀티턴 대화 구현. "위에서 말한 것 중에서..." 같은 후속 질문 처리 | 중간 | 대화 컨텍스트 관리, 토큰 예산 배분, 히스토리 요약 전략 학습 | 8.2.2, 14.4 |
| RAGAS 평가 구축 | 50건 이상의 평가 데이터셋 구축, Faithfulness/Context Recall 자동 측정 파이프라인 구현 | 중간 | RAG 평가 메트릭 이해, 정량적 품질 관리 체계 수립 | 11.1, 11.2 |
| 답변 자가 검증 (Self-Check) | LLM으로 답변의 환각 여부를 자동 검증하는 후처리 단계 추가. 환각 감지 시 재생성 | 어려움 | Self-RAG 패턴 이해, 품질-비용-속도 트레이드오프 체험 | 14.4 |
| Pinecone 마이그레이션 | Chroma에서 Pinecone으로 벡터 DB 전환. 성능 비교, 인덱싱 전략 차이 분석 | 중간 | 벡터 DB의 로컬/클라우드 차이, 프로덕션 인프라 전환 경험 | 6절 |
| Corrective RAG 구현 | 검색 결과 품질 평가 + 불충분 시 자동 재검색 로직 구현 | 어려움 | CRAG 패턴의 자가 교정 메커니즘 이해, LangGraph 활용 | 14.4 |
| 프롬프트 A/B 테스트 | 2개 이상의 프롬프트 버전을 트래픽 분할로 비교, LangSmith로 정량 평가 | 중간 | 프롬프트 엔지니어링의 실험적 접근법, 데이터 기반 의사결정 | 8.4.5, 11.3 |
| 비용 최적화 | 캐싱, 컨텍스트 압축, 모델 라우팅(단순 질문은 mini, 복잡한 질문은 4o)으로 월 비용 50% 절감 | 어려움 | 비용-품질 트레이드오프 분석, 실무 비용 관리 역량 | 8.5, 13.4 |
| 모니터링 대시보드 | Streamlit으로 실시간 p50/p95/에러율/비용 대시보드 구축, 드리프트 감지 알림 | 중간 | 프로덕션 운영 모니터링 역량, 데이터 시각화 | 11.4 |
- 실무 권장: 위 과제를 난이도 순으로 진행한다. 먼저 "스트리밍 답변"과 "RAGAS 평가"로 기본기를 다지고, 이후 "하이브리드 검색", "Corrective RAG" 순으로 검색 품질을 개선한다. 마지막으로 "비용 최적화"와 "모니터링 대시보드"로 프로덕션 운영 역량을 갖춘다.
16. 참고 자료
| 자료 | 링크 | 용도 |
| LangChain 공식 문서 | https://python.langchain.com/ | LangChain 전체 가이드 |
| Chroma 공식 문서 | https://docs.trychroma.com/ | Chroma 벡터 DB 사용법 |
| Pinecone 공식 문서 | https://docs.pinecone.io/ | Pinecone 벡터 DB 사용법 |
| LangSmith | https://smith.langchain.com/ | LLM 평가 및 모니터링 |
| LangChain Hub | https://smith.langchain.com/hub | 검증된 프롬프트 템플릿 |
| OpenAI API 문서 | https://platform.openai.com/docs/ | OpenAI 모델 및 API 레퍼런스 |
| RAGAS 공식 문서 | https://docs.ragas.io/ | RAG 평가 프레임워크 |
| LangGraph 공식 문서 | https://langchain-ai.github.io/langgraph/ | 에이전트 및 그래프 기반 워크플로우 |
| Anthropic Contextual Retrieval | https://www.anthropic.com/news/contextual-retrieval | Contextual Retrieval 기법 상세 |
'Study > RAG(Retrieval-Augmented Generation)' 카테고리의 다른 글
| 11. LangChain HuggingFace 오픈소스를 활용한 프로덕션급 RAG Pipeline 구성 (Advanced) (0) | 2026.03.17 |
|---|---|
| 3. RAG 커스텀 최적화 - 3가지 시나리오별 맞춤 전략 (0) | 2026.02.22 |
| 2. RAG 단계별 기술과 OpenAI API (0) | 2026.02.18 |
| 1. RAG 파이프라인 기초 아키텍처 (0) | 2026.02.17 |
| [인프런] RAG 마스터: 기초부터 고급기법까지 (feat. LangChain) - 학습 후기 (0) | 2026.01.17 |
댓글