5편 전체 목차
벡터 데이터베이스 완전 이론 — 원리부터 수학까지
벡터 데이터베이스를 단순히 “AI용 특별한 DB”로만 이해하면 실제 운영에서 수많은 실수를 만납니다. 왜 벡터 검색이 일반 SQL 검색과 근본적으로 다른지, 그 수학적 원리를 이해해야 올바른 인덱스를 선택하고, 검색 품질을 진단하고, 성능 병목을 해결할 수 있습니다.
🧮 임베딩이란 무엇인가 — 의미를 숫자로 변환하는 원리
임베딩(Embedding)은 텍스트, 이미지, 오디오 등의 비정형 데이터를 고차원 실수 벡터 공간의 한 점으로 매핑하는 것입니다. 핵심은 “의미적으로 비슷한 내용은 벡터 공간에서 가까운 위치에 놓인다”는 것입니다. “강아지”와 “개”는 벡터 공간에서 매우 가깝고, “강아지”와 “양자역학”은 매우 멉니다.
일반적으로 높은 차원(768, 1536, 3072)은 더 풍부한 의미 표현이 가능하지만, 차원의 저주(Curse of Dimensionality) 문제가 발생합니다. 차원이 높아질수록 벡터 간 거리 분포가 균일해져 “가까운”과 “먼” 구분이 희미해집니다. 실제 RAG에서는 768~1024차원이 품질과 성능의 최적 균형점이며, 무작정 큰 모델을 쓰는 것이 항상 정답이 아닙니다.
📐 유사도 측정 방법 3가지 — 언제 무엇을 쓰는가
| 측정 방법 | 수식 | 특징 | 최적 용도 |
|---|---|---|---|
| 코사인 유사도 RAG 표준 | cos(θ) = A·B / (|A||B|) | 벡터 방향만 비교. 크기 무관. -1~1 범위 | 텍스트 의미 검색. 문서 길이가 다를 때 |
| 유클리드 거리 | d = √Σ(Aᵢ-Bᵢ)² | 공간상 실제 거리. 정규화되지 않은 벡터에 취약 | 이미지 임베딩, 정규화된 벡터 |
| 내적 (Dot Product) | A·B = Σ AᵢBᵢ | 방향 + 크기 모두 반영. 빠름 | 단위 벡터로 정규화된 임베딩 (= 코사인과 동일) |
대부분의 텍스트 임베딩 모델은 L2 정규화(단위 벡터)로 출력합니다. 이 경우 코사인 유사도 = 내적이 되어 계산이 단순해집니다. 더 중요한 것은, 코사인 유사도는 문서 길이의 영향을 받지 않습니다. 1문장짜리 메모와 100페이지 PDF의 핵심 내용이 같다면 같은 유사도를 갖습니다. 이것이 RAG에서 코사인 유사도가 표준인 핵심 이유입니다.
⚙️ ANN 인덱스 알고리즘 — HNSW vs IVF vs FLAT
수백만 개의 벡터에서 가장 유사한 것을 찾으려면 모든 벡터와 비교(Exact Search, O(n))하는 것은 너무 느립니다. 근사 최근접 이웃(Approximate Nearest Neighbor, ANN) 알고리즘은 약간의 정확도를 희생해 검색 속도를 극적으로 개선합니다.
HNSW 인덱스의 품질은 두 파라미터로 결정됩니다. m(기본값 16)은 각 노드가 유지하는 연결 수입니다. m을 높이면 정확도가 올라가지만 메모리와 빌드 시간이 증가합니다. ef_construction(기본값 100)은 인덱스 빌드 시 탐색 범위입니다. 높을수록 정확한 인덱스가 만들어지지만 빌드 속도가 느립니다. 실전 RAG에서는 m=16, ef_construction=200을 권장합니다. 인덱스는 한 번만 빌드하므로 ef_construction은 넉넉하게 설정하는 것이 좋습니다.
벡터DB 완전 비교 & 선택 가이드
2026년 현재 오픈소스 벡터 DB 생태계는 Qdrant, ChromaDB, Weaviate, Milvus, pgvector 등 다양합니다. 각각의 장단점을 명확히 이해하지 않으면 나중에 마이그레이션 비용이 발생합니다.
| DB | 언어 | 인덱스 | 필터 검색 | 스케일 | 메모리 | 추천 상황 |
|---|---|---|---|---|---|---|
| Qdrant 추천 | Rust | HNSW | 🟢 매우 강력 | 🟢 분산 클러스터 | 🟡 보통 | 프로덕션, 필터 조합 검색, AI 서버 핵심 DB |
| ChromaDB | Python | HNSW (hnswlib) | 🟡 기본 필터 | 🔴 단일 노드만 | 🟢 낮음 | 프로토타이핑, 소규모, Open WebUI 기본값 |
| pgvector | C (PostgreSQL 확장) | HNSW / IVFFlat | 🟢 SQL 풀 지원 | 🟡 PostgreSQL 수준 | 🟢 낮음 | 기존 PostgreSQL 인프라 활용, 관계형+벡터 혼합 |
| Weaviate | Go | HNSW | 🟢 GraphQL 강력 | 🟢 분산 지원 | 🔴 높음 | 멀티모달(텍스트+이미지), 엔터프라이즈 |
| Milvus | Go/C++ | 다수 지원 | 🟢 강력 | 🟢 최고 수준 | 🔴 매우 높음 | 수십억 건 초대형 벡터, 기업급 클러스터 |
Qdrant는 Rust로 작성되어 메모리 안전성과 성능을 동시에 확보합니다. 가장 결정적인 차별점은 페이로드 기반 필터링입니다. 단순히 “가장 비슷한 벡터”를 찾는 것을 넘어, “category = ‘tech’ AND date > ‘2025-01-01′”이면서 가장 유사한 벡터를 찾는 필터+벡터 하이브리드 검색이 네이티브로 지원됩니다. ChromaDB는 이 필터 검색의 성능이 매우 낮고, Qdrant는 필터를 인덱스 수준에서 처리해 수백만 건에서도 밀리초 단위 응답이 가능합니다.
Qdrant 완전 설치 & 프로덕션 설정
services:
# ── Qdrant 벡터 데이터베이스 ──────────────────────
qdrant:
image: qdrant/qdrant:latest
container_name: qdrant
restart: unless-stopped
ports:
- "6333:6333" # HTTP REST API
- "6334:6334" # gRPC (고성능 조회용)
volumes:
- qdrant_storage:/qdrant/storage
- ./qdrant_config.yaml:/qdrant/config/production.yaml
environment:
- QDRANT__SERVICE__API_KEY=your_very_secure_api_key_here
- QDRANT__SERVICE__ENABLE_TLS=false
- QDRANT__LOG_LEVEL=INFO
networks:
- ai_net
# ── PostgreSQL (n8n + Dify 공유 DB) ──────────────
postgresql:
image: pgvector/pgvector:pg16 # pgvector 확장 포함
container_name: postgresql
restart: unless-stopped
ports:
- "5432:5432"
volumes:
- postgres_data:/var/lib/postgresql/data
- ./init.sql:/docker-entrypoint-initdb.d/init.sql
environment:
- POSTGRES_USER=aiserver
- POSTGRES_PASSWORD=secure_password_here
- POSTGRES_DB=aiserver_main
networks:
- ai_net
# ── Redis (캐시 + 세션) ───────────────────────────
redis:
image: redis:7-alpine
container_name: redis
restart: unless-stopped
ports:
- "6379:6379"
command: redis-server --requirepass your_redis_password
volumes:
- redis_data:/data
networks:
- ai_net
volumes:
qdrant_storage:
postgres_data:
redis_data:
networks:
ai_net:
external: true
name: ai_networkstorage:
storage_path: /qdrant/storage
snapshots_path: /qdrant/snapshots
# HNSW 인덱스 전역 기본값
hnsw_index:
m: 16 # 연결 수 (높을수록 정확, 메모리↑)
ef_construct: 200 # 빌드 시 탐색 범위 (높을수록 정확)
full_scan_threshold: 10000 # 이 수 이하면 FLAT 검색
# 성능 최적화
on_disk_payload: false # 페이로드 메모리 유지 (검색 속도↑)
service:
host: 0.0.0.0
http_port: 6333
grpc_port: 6334
max_request_size_mb: 64
# 요청 제한 (DoS 방어)
max_workers: 0 # 0 = CPU 코어 수 자동
telemetry_disabled: true # 원격 측정 비활성화 (프라이버시)🗄️ Qdrant 컬렉션 설계 & Python API 완전 활용
from qdrant_client import QdrantClient
from qdrant_client.models import (
Distance, VectorParams, PointStruct,
Filter, FieldCondition, MatchValue,
HnswConfigDiff, OptimizersConfigDiff,
PayloadSchemaType
)
from sentence_transformers import SentenceTransformer
import uuid
# 클라이언트 초기화
client = QdrantClient(
host="192.168.1.253",
port=6333,
api_key="your_very_secure_api_key_here",
timeout=60
)
embedder = SentenceTransformer("BAAI/bge-m3")
COLLECTION = "ai_knowledge_base"
VECTOR_DIM = 1024 # bge-m3의 출력 차원
## ── 컬렉션 생성 (HNSW 최적화) ─────────────────────
client.recreate_collection(
collection_name=COLLECTION,
vectors_config=VectorParams(
size=VECTOR_DIM,
distance=Distance.COSINE, # 코사인 유사도
on_disk=False, # 메모리 상주 (빠름)
),
hnsw_config=HnswConfigDiff(
m=16,
ef_construct=200,
full_scan_threshold=10_000,
on_disk=False,
),
optimizers_config=OptimizersConfigDiff(
indexing_threshold=20_000, # 이 수 이하면 인덱스 지연
memmap_threshold=50_000, # 메모리맵 전환 임계값
),
)
# 페이로드 인덱스 생성 (필터 성능 극대화)
client.create_payload_index(
collection_name=COLLECTION,
field_name="category",
field_schema=PayloadSchemaType.KEYWORD, # 정확히 일치 검색
)
client.create_payload_index(
collection_name=COLLECTION,
field_name="created_at",
field_schema=PayloadSchemaType.DATETIME,
)
## ── 벡터 삽입 (배치 처리) ──────────────────────────
def upsert_documents(docs: list[dict], batch_size: int = 100):
"""문서 목록을 배치로 임베딩 후 Qdrant 삽입"""
for i in range(0, len(docs), batch_size):
batch = docs[i:i + batch_size]
texts = [d["content"] for d in batch]
vectors = embedder.encode(texts, normalize_embeddings=True).tolist()
points = [
PointStruct(
id=str(uuid.uuid4()),
vector=vectors[j],
payload={
"title": batch[j].get("title", ""),
"content": batch[j]["content"][:1000], # 페이로드 크기 제한
"source": batch[j].get("source", ""),
"category": batch[j].get("category", "general"),
"created_at": batch[j].get("created_at", ""),
"chunk_index": batch[j].get("chunk_index", 0),
}
)
for j in range(len(batch))
]
client.upsert(collection_name=COLLECTION, points=points)
print(f"✅ 삽입 완료: {i+len(batch)}/{len(docs)}")
## ── 벡터 검색 (필터 + 유사도 하이브리드) ──────────
def search(query: str, category: str = None, top_k: int = 5) -> list:
query_vec = embedder.encode([query], normalize_embeddings=True).tolist()[0]
# 카테고리 필터 조건
search_filter = None
if category:
search_filter = Filter(
must=[FieldCondition(
key="category",
match=MatchValue(value=category)
)]
)
results = client.search(
collection_name=COLLECTION,
query_vector=query_vec,
query_filter=search_filter,
limit=top_k,
with_payload=True,
score_threshold=0.5, # 이 점수 이하 결과 제외
)
return [
{
"score": round(r.score, 4),
"title": r.payload.get("title"),
"content": r.payload.get("content"),
"source": r.payload.get("source"),
}
for r in results
]
# 사용 예시
results = search("Docker 컨테이너 네트워크 설정", category="tech", top_k=5)
for r in results:
print(f"[{r['score']}] {r['title']}\n{r['content'][:100]}...")
## ── 스냅샷 백업 ─────────────────────────────────────
snapshot_info = client.create_snapshot(collection_name=COLLECTION)
print(f"스냅샷 생성: {snapshot_info.name}")임베딩 모델 완전 정복
RAG 시스템에서 임베딩 모델 선택은 검색 품질의 80%를 결정합니다. 아무리 좋은 LLM을 써도 임베딩 모델이 잘못 선택되면 검색 결과가 엉망이 되어 할루시네이션이 폭발합니다. 특히 한국어 문서를 다룬다면 모델 선택이 더욱 중요합니다.
🏆 2026년 기준 한국어 임베딩 모델 성능 비교
| 모델 | 차원 | 한국어 성능 | VRAM | 속도 | 라이선스 | 추천도 |
|---|---|---|---|---|---|---|
| BAAI/bge-m3 | 1024 | 🟢 최고 수준 | ~2GB | 🟡 보통 | MIT | 1순위 추천 |
| intfloat/multilingual-e5-large | 1024 | 🟢 우수 | ~2GB | 🟡 보통 | MIT | 2순위 |
| nomic-embed-text | 768 | 🟡 양호 | ~500MB | 🟢 빠름 | Apache 2.0 | 경량 추천 |
| OpenAI text-embedding-3-small | 1536 | 🟢 우수 | API | 🟢 빠름 | 상업용 | API 사용 시 |
| OpenAI text-embedding-3-large | 3072 | 🟢 최고 | API | 🟡 보통 | 상업용 | 최고 품질 |
| jhgan/ko-sroberta-multitask | 768 | 🟢 한국어 특화 | ~500MB | 🟢 빠름 | Apache 2.0 | 한국어 전용 |
Dense 임베딩(위의 모든 모델)은 문장 전체를 하나의 연속 벡터로 표현합니다. 의미적 유사성 검색에 강하지만 정확한 키워드(예: 버전 번호 “v2.4.1”, 고유명사 “김철수”)를 놓치는 단점이 있습니다. Sparse 임베딩(BM25, SPLADE)은 전통적인 키워드 기반으로 정확한 단어 매칭에 강합니다. Hybrid 검색은 두 가지를 결합해 가중 평균으로 랭킹하는 방식으로, 실제 프로덕션에서 검색 품질이 Dense만 쓸 때보다 20~40% 향상됩니다. Qdrant는 이 Hybrid 검색을 네이티브로 지원하는 몇 안 되는 벡터DB입니다.
🔀 Qdrant Hybrid 검색 구현 (Dense + BM25)
from qdrant_client.models import (
NamedVector, NamedSparseVector,
SparseVector, SparseVectorParams,
VectorParams, Distance
)
from fastembed import SparseTextEmbedding # pip install fastembed
# Hybrid 컬렉션 생성 (Dense + Sparse 동시 저장)
client.recreate_collection(
collection_name="hybrid_knowledge",
vectors_config={
"dense": VectorParams(size=1024, distance=Distance.COSINE),
},
sparse_vectors_config={
"sparse": SparseVectorParams(), # SPLADE/BM25용 희소 벡터
}
)
sparse_model = SparseTextEmbedding(model_name="prithivida/Splade_PP_en_v1")
def upsert_hybrid(docs: list[dict]):
"""Dense + Sparse 동시 임베딩 후 삽입"""
texts = [d["content"] for d in docs]
dense_vecs = embedder.encode(texts, normalize_embeddings=True)
sparse_vecs = list(sparse_model.embed(texts))
points = []
for i, doc in enumerate(docs):
sv = sparse_vecs[i]
points.append(PointStruct(
id=str(uuid.uuid4()),
vector={
"dense": dense_vecs[i].tolist(),
"sparse": SparseVector(
indices=sv.indices.tolist(),
values=sv.values.tolist()
)
},
payload={"content": doc["content"], "title": doc.get("title", "")}
))
client.upsert(collection_name="hybrid_knowledge", points=points)
def hybrid_search(query: str, top_k: int = 5):
"""Dense + Sparse 결합 검색 (RRF 랭킹)"""
dense_vec = embedder.encode([query], normalize_embeddings=True)[0].tolist()
sparse_vecs = list(sparse_model.embed([query]))
sv = sparse_vecs[0]
from qdrant_client.models import Prefetch, FusionQuery, Fusion
results = client.query_points(
collection_name="hybrid_knowledge",
prefetch=[
Prefetch(query=dense_vec, using="dense", limit=top_k * 2),
Prefetch(
query=SparseVector(
indices=sv.indices.tolist(),
values=sv.values.tolist()
),
using="sparse", limit=top_k * 2
),
],
query=FusionQuery(fusion=Fusion.RRF), # Reciprocal Rank Fusion
limit=top_k,
with_payload=True
)
return results.points프로덕션 RAG 파이프라인 완전 설계
처음 RAG를 구현하는 대부분의 개발자는 “문서 청킹 → 임베딩 → 벡터 검색 → LLM 답변” 이라는 단순한 흐름을 사용합니다. 이것을 Naive RAG라고 합니다. 실제 프로덕션에서 Naive RAG는 검색 정확도가 낮고, 긴 문서를 처리하지 못하며, 관련 없는 청크가 섞이는 문제를 일으킵니다.
🔍 Naive RAG vs Advanced RAG vs Modular RAG
| 단계 | Naive RAG | Advanced RAG | Modular RAG |
|---|---|---|---|
| 문서 처리 | 고정 크기 청킹 | Semantic 청킹 + 계층 구조 | 파서별 특화 처리기 |
| 인덱싱 | 단일 벡터 인덱스 | Parent-Child 청크 + 키워드 인덱스 | 멀티 인덱스 + 그래프 |
| 검색 | Top-K 유사도 검색 | 하이브리드 + 재랭킹 | 쿼리 분해 + 반복 검색 |
| 생성 | 단순 컨텍스트 주입 | LLM 기반 압축 + CoT | 에이전트 기반 동적 생성 |
| 검색 품질 | ~60% | ~80~85% | ~90%+ |
🏗️ Advanced RAG 파이프라인 완전 구현
from langchain_text_splitters import (
RecursiveCharacterTextSplitter,
MarkdownHeaderTextSplitter
)
from langchain_ollama import ChatOllama, OllamaEmbeddings
from langchain_community.vectorstores import Qdrant as LCQdrant
from langchain.retrievers import ParentDocumentRetriever
from langchain.storage import InMemoryStore
from langchain.schema import Document
import re
## ══ 1단계: 문서 처리 — Semantic 청킹 ══════════════
def smart_chunk_markdown(text: str, source: str) -> list[Document]:
"""마크다운 구조 기반 계층적 청킹"""
# 1차: 마크다운 헤더 기준 분할 (대분류)
md_splitter = MarkdownHeaderTextSplitter(headers_to_split_on=[
("#", "h1"),
("##", "h2"),
("###","h3"),
])
sections = md_splitter.split_text(text)
# 2차: 섹션별 세밀 청킹 (실제 벡터화 단위)
char_splitter = RecursiveCharacterTextSplitter(
chunk_size=800,
chunk_overlap=100,
separators=["\n\n", "\n", "。", ". ", " ", ""],
)
chunks = []
for sec in sections:
sub_chunks = char_splitter.split_documents([sec])
for i, chunk in enumerate(sub_chunks):
chunk.metadata.update({
"source": source,
"chunk_index": i,
"parent_id": sec.metadata.get("h1", "") + sec.metadata.get("h2", ""),
})
chunks.append(chunk)
return chunks
## ══ 2단계: Parent-Child 리트리버 ══════════════════
# 부모 청크: 큰 맥락 보존용 (1500 토큰)
parent_splitter = RecursiveCharacterTextSplitter(
chunk_size=1500, chunk_overlap=200
)
# 자식 청크: 정밀 검색용 (300 토큰)
child_splitter = RecursiveCharacterTextSplitter(
chunk_size=300, chunk_overlap=50
)
embeddings = OllamaEmbeddings(
base_url="http://192.168.1.253:11434",
model="nomic-embed-text"
)
# 자식 청크는 벡터 DB에, 부모 청크는 로컬 스토어에
vectorstore = LCQdrant.from_existing_collection(
embedding=embeddings,
collection_name="rag_children",
url="http://192.168.1.253:6333",
api_key="your_api_key"
)
docstore = InMemoryStore() # 프로덕션에서는 Redis로 교체
retriever = ParentDocumentRetriever(
vectorstore=vectorstore,
docstore=docstore,
child_splitter=child_splitter,
parent_splitter=parent_splitter,
search_kwargs={"k": 10}, # 후보를 충분히 가져온 뒤 재랭킹
)
## ══ 3단계: 쿼리 확장 + 재랭킹 ════════════════════
llm = ChatOllama(
base_url="http://192.168.1.253:11434",
model="qwen2.5:14b",
temperature=0
)
def expand_query(query: str) -> list[str]:
"""LLM으로 쿼리 재작성 — 다양한 관점에서 검색 강화"""
prompt = f"""다음 질문을 서로 다른 3가지 방식으로 재작성해줘.
같은 의미지만 다른 표현과 관점을 사용해.
각 질문을 줄바꿈으로 구분해서 3개만 출력해.
원본 질문: {query}"""
result = llm.invoke(prompt)
rewrites = [q.strip() for q in result.content.split('\n') if q.strip()]
return [query] + rewrites[:3] # 원본 + 3가지 재작성
def cross_encoder_rerank(query: str, docs: list, top_n: int = 5) -> list:
"""Cross-Encoder로 최종 재랭킹 (더 정밀한 관련성 평가)"""
from sentence_transformers import CrossEncoder
reranker = CrossEncoder("cross-encoder/ms-marco-MiniLM-L-6-v2")
pairs = [(query, doc.page_content) for doc in docs]
scores = reranker.predict(pairs)
ranked = sorted(zip(docs, scores), key=lambda x: x[1], reverse=True)
return [doc for doc, _ in ranked[:top_n]]
## ══ 4단계: 최종 RAG Chain ═════════════════════════
def rag_answer(question: str) -> dict:
"""완전한 Advanced RAG 실행"""
# 쿼리 확장
queries = expand_query(question)
# 다중 쿼리로 검색 (더 많은 관련 문서 수집)
all_docs = []
seen_ids = set()
for q in queries:
docs = retriever.get_relevant_documents(q)
for doc in docs:
doc_id = hash(doc.page_content)
if doc_id not in seen_ids:
all_docs.append(doc)
seen_ids.add(doc_id)
# 재랭킹으로 최종 상위 5개 선택
top_docs = cross_encoder_rerank(question, all_docs, top_n=5)
# 컨텍스트 압축 (LLM으로 관련 부분만 추출)
context = "\n\n---\n\n".join([d.page_content for d in top_docs])
# 최종 답변 생성
prompt = f"""당신은 정확하고 신뢰할 수 있는 AI 어시스턴트입니다.
아래 제공된 컨텍스트만을 사용해서 질문에 답하세요.
컨텍스트에 없는 내용은 절대 추측하거나 지어내지 마세요.
모르는 경우 "제공된 문서에서 해당 정보를 찾을 수 없습니다."라고 말하세요.
컨텍스트:
{context}
질문: {question}
답변:"""
answer = llm.invoke(prompt)
return {
"answer": answer.content,
"sources": [d.metadata.get("source", "") for d in top_docs],
"num_docs": len(top_docs),
"queries": queries,
}
# 실행
result = rag_answer("Docker 컨테이너에서 GPU를 사용하려면 어떻게 설정해야 하나요?")
print(result["answer"])
print(f"\n참고 출처: {result['sources']}")✂️ 청킹 전략 완전 가이드
| 전략 | 방식 | 장점 | 단점 | 적합 문서 |
|---|---|---|---|---|
| Fixed Size | 토큰 수 기준 고정 분할 | 구현 단순, 예측 가능 | 의미 단위 파괴 가능 | 균일한 구조 문서 |
| Recursive | 구분자 계층적 분할 | 문장/단락 경계 존중 | 청크 크기 불균일 | 일반 텍스트, 범용 |
| Semantic | 의미 변화점 기준 분할 | 최고 검색 품질 | 처리 속도 느림, 비쌈 | 학술 논문, 기술 문서 |
| Markdown Header | 제목 태그 기준 분할 | 구조 보존 완벽 | 마크다운 전용 | 기술 문서, 위키 |
| Parent-Child | 대/소 두 단계 계층 | 검색 정밀도 + 컨텍스트 풍부 | 저장 공간 2배 | 복잡한 긴 문서 |
RAG 품질 평가 & 할루시네이션 억제
“RAG가 잘 동작하는 것 같다”는 주관적 평가로는 프로덕션 신뢰성을 보장할 수 없습니다. RAGAS(RAG Assessment) 같은 객관적 평가 프레임워크로 수치화하고 지속적으로 모니터링해야 합니다.
📊 RAGAS 4가지 핵심 평가 지표
from ragas import evaluate
from ragas.metrics import (
faithfulness, answer_relevancy,
context_precision, context_recall
)
from datasets import Dataset
# 평가 데이터셋 (질문 + 정답 + RAG 답변 + 검색 컨텍스트)
eval_data = {
"question": [
"Docker에서 GPU를 사용하려면 무엇이 필요한가?",
"Qdrant의 HNSW 파라미터 m의 역할은?",
],
"answer": [
"NVIDIA Container Toolkit을 설치하고 docker run --gpus all 옵션을 사용해야 합니다.",
"m 파라미터는 HNSW 그래프에서 각 노드의 연결 수를 결정하며, 높을수록 검색 정확도가 올라가지만 메모리를 더 많이 사용합니다.",
],
"contexts": [
["NVIDIA Container Toolkit은 Docker 컨테이너에서 GPU를 사용하기 위한 런타임입니다..."],
["HNSW 알고리즘의 m 파라미터는 각 노드가 유지하는 최대 연결 수입니다..."],
],
"ground_truth": [
"NVIDIA Container Toolkit 설치 필요",
"m은 각 노드의 연결 수로 정확도와 메모리 트레이드오프",
],
}
dataset = Dataset.from_dict(eval_data)
result = evaluate(
dataset,
metrics=[
faithfulness,
answer_relevancy,
context_precision,
context_recall,
],
)
print(result)
# 출력 예시:
# {'faithfulness': 0.89, 'answer_relevancy': 0.84,
# 'context_precision': 0.76, 'context_recall': 0.81}🛡️ 할루시네이션 억제 전략 5가지
score_threshold=0.5 이상인 결과만 사용. 관련 없는 문서를 컨텍스트로 주는 것이 오히려 없는 것보다 나쁠 수 있습니다.Grafana + Prometheus GPU 모니터링
AI 서버는 24시간 GPU를 풀로 돌리는 경우가 많습니다. VRAM이 조용히 누수되거나 GPU 온도가 위험 수준에 도달해도 모르고 있다면 하드웨어 손상이나 서비스 중단으로 이어질 수 있습니다. Grafana 대시보드로 모든 지표를 실시간 시각화하고 이상 감지 시 알림을 받습니다.
services:
prometheus:
image: prom/prometheus:latest
container_name: prometheus
restart: unless-stopped
ports:
- "9090:9090"
volumes:
- ./prometheus.yml:/etc/prometheus/prometheus.yml
- prometheus_data:/prometheus
command:
- '--config.file=/etc/prometheus/prometheus.yml'
- '--storage.tsdb.retention.time=30d'
- '--web.enable-lifecycle'
networks:
- ai_net
grafana:
image: grafana/grafana:latest
container_name: grafana
restart: unless-stopped
ports:
- "3002:3000"
volumes:
- grafana_data:/var/lib/grafana
- ./grafana/dashboards:/etc/grafana/provisioning/dashboards
- ./grafana/datasources:/etc/grafana/provisioning/datasources
environment:
- GF_SECURITY_ADMIN_PASSWORD=secure_grafana_password
- GF_USERS_ALLOW_SIGN_UP=false
- GF_SMTP_ENABLED=true # 이메일 알림
- GF_SMTP_HOST=smtp.gmail.com:587
networks:
- ai_net
# NVIDIA GPU 메트릭 수집기
nvidia-exporter:
image: utkuozdemir/nvidia_gpu_exporter:1.2.0
container_name: nvidia_exporter
restart: unless-stopped
ports:
- "9835:9835"
devices:
- /dev/nvidiactl
- /dev/nvidia0
volumes:
- /usr/lib/x86_64-linux-gnu/libnvidia-ml.so.1:/usr/lib/x86_64-linux-gnu/libnvidia-ml.so.1
networks:
- ai_net
# 시스템 메트릭 수집기 (CPU/RAM/Disk/Network)
node-exporter:
image: prom/node-exporter:latest
container_name: node_exporter
restart: unless-stopped
ports:
- "9100:9100"
volumes:
- /proc:/host/proc:ro
- /sys:/host/sys:ro
- /:/rootfs:ro
command:
- '--path.procfs=/host/proc'
- '--path.sysfs=/host/sys'
- '--collector.filesystem.ignored-mount-points=^/(sys|proc|dev|host|etc)($$|/)'
networks:
- ai_net
# Docker 컨테이너 메트릭
cadvisor:
image: gcr.io/cadvisor/cadvisor:latest
container_name: cadvisor
restart: unless-stopped
ports:
- "8080:8080"
volumes:
- /:/rootfs:ro
- /var/run:/var/run:ro
- /sys:/sys:ro
- /var/lib/docker/:/var/lib/docker:ro
networks:
- ai_net
volumes:
prometheus_data:
grafana_data:
networks:
ai_net:
external: true
name: ai_networkglobal:
scrape_interval: 15s
evaluation_interval: 15s
rule_files:
- "alert_rules.yml"
alerting:
alertmanagers:
- static_configs:
- targets: ['alertmanager:9093']
scrape_configs:
- job_name: 'nvidia-gpu'
static_configs:
- targets: ['nvidia-exporter:9835']
- job_name: 'node'
static_configs:
- targets: ['node-exporter:9100']
- job_name: 'docker-containers'
static_configs:
- targets: ['cadvisor:8080']
- job_name: 'ollama'
metrics_path: '/api/metrics'
static_configs:
- targets: ['ollama:11434']🚨 핵심 알림 규칙 설정
groups:
- name: ai_server_alerts
rules:
# GPU 온도 경고 (85도 이상)
- alert: GPUTemperatureHigh
expr: nvidia_smi_temperature_gpu > 85
for: 2m
labels:
severity: warning
annotations:
summary: "GPU 온도 위험 수준"
description: "GPU 온도가 {{ $value }}°C에 도달했습니다. 냉각 확인 필요."
# VRAM 사용률 95% 이상
- alert: GPUMemoryCritical
expr: (nvidia_smi_memory_used_bytes / nvidia_smi_memory_total_bytes) * 100 > 95
for: 5m
labels:
severity: critical
annotations:
summary: "VRAM 거의 소진"
description: "VRAM 사용률이 {{ $value }}%입니다."
# RAM 사용률 90% 이상
- alert: HighMemoryUsage
expr: (1 - (node_memory_MemAvailable_bytes / node_memory_MemTotal_bytes)) * 100 > 90
for: 5m
labels:
severity: warning
annotations:
summary: "시스템 RAM 부족"
description: "메모리 사용률이 {{ $value }}%입니다."
# 디스크 사용률 85% 이상
- alert: DiskSpaceLow
expr: (1 - (node_filesystem_free_bytes{mountpoint="/"} / node_filesystem_size_bytes{mountpoint="/"})) * 100 > 85
for: 10m
labels:
severity: warning
annotations:
summary: "디스크 공간 부족"
description: "루트 파티션 사용률이 {{ $value }}%입니다."
# 컨테이너 재시작 감지
- alert: ContainerRestarting
expr: rate(container_last_seen{name!=""}[5m]) > 0 AND changes(container_last_seen{name!=""}[10m]) > 2
labels:
severity: warning
annotations:
summary: "컨테이너 {{ $labels.name }} 반복 재시작 감지"📊 Grafana 핵심 대시보드 패널 구성
| 패널명 | PromQL 쿼리 | 시각화 |
|---|---|---|
| GPU 사용률 | nvidia_smi_utilization_gpu_ratio * 100 | Gauge + Time Series |
| GPU 온도 | nvidia_smi_temperature_gpu | Gauge (임계값 색상) |
| VRAM 사용량 | nvidia_smi_memory_used_bytes / 1024^3 | Bar Gauge |
| GPU 전력 소비 | nvidia_smi_power_draw_watts | Time Series |
| CPU 사용률 | 100 - avg(rate(node_cpu_seconds_total{mode="idle"}[1m])) * 100 | Time Series |
| RAM 사용량 | (node_memory_MemTotal_bytes - node_memory_MemAvailable_bytes) / 1024^3 | Time Series |
| Ollama 활성 모델 | ollama_models_loaded_total | Stat Panel |
| 컨테이너 상태 | count(container_last_seen) by (name) | Table |
AI 서버 트러블슈팅 완전 매뉴얼
AI 서버에서 발생하는 대부분의 문제는 패턴이 있습니다. 증상별로 원인과 해결 방법을 체계적으로 정리했습니다. 이 섹션을 북마크해두고 문제 발생 시 바로 참조하세요.
🎮 GPU 관련 문제
nvidia-smi → 어떤 프로세스가 VRAM을 점유 중인지 확인. Ollama + ComfyUI + A1111 동시 실행 시 VRAM 합산 초과 자주 발생.docker exec ollama ollama stop qwen2.5:14b. 또는 ComfyUI에서 “Free Memory” 버튼 클릭 후 재실행.#!/bin/bash
# AI 서버 GPU 진단 스크립트
echo "=== GPU 드라이버 확인 ==="
nvidia-smi && echo "✅ 드라이버 정상" || echo "❌ 드라이버 없음"
echo "=== NVIDIA Container Toolkit 확인 ==="
nvidia-ctk --version && echo "✅ Toolkit 설치됨" || echo "❌ Toolkit 미설치"
echo "=== Docker 런타임 설정 확인 ==="
cat /etc/docker/daemon.json | python3 -m json.tool
echo "=== Docker GPU 직접 테스트 ==="
docker run --rm --gpus all ubuntu nvidia-smi && echo "✅ GPU 정상" || echo "❌ GPU 실패"
echo "=== 해결 시도 ==="
sudo nvidia-ctk runtime configure --runtime=docker
sudo systemctl restart docker
echo "Docker 재시작 완료. 다시 테스트하세요."🤖 Ollama / LLM 관련 문제
| 증상 | 원인 | 해결 |
|---|---|---|
| 추론 속도가 갑자기 10배 느려짐 | VRAM 초과로 CPU 폴백 | ollama ps로 확인 → 더 작은 모델 사용 |
| 모델이 응답 중 멈춤 | 컨텍스트 길이 초과 | Open WebUI 채팅 새로 시작 또는 num_ctx 줄이기 |
| API 타임아웃 오류 | Nginx 타임아웃 설정 부족 | proxy_read_timeout 300s로 늘리기 |
| 모델 다운로드 중 끊김 | 디스크 공간 부족 또는 네트워크 | df -h 확인 → docker system prune |
| 한국어 답변 품질 저하 | 임베딩 모델이 한국어 비최적화 | bge-m3 또는 ko-sroberta로 교체 |
🗄️ Qdrant / RAG 관련 문제
| 증상 | 원인 | 해결 |
|---|---|---|
| RAG 검색이 관련 없는 문서 반환 | 청크 크기 부적절 또는 임베딩 모델 불일치 | 청킹 전략 재검토, 임베딩 모델 일치 확인 |
| Qdrant 404 컬렉션 없음 오류 | 컬렉션 미생성 또는 이름 불일치 | GET /collections로 목록 확인 후 재생성 |
| 임베딩 후 검색 품질 급락 | 임베딩 모델 버전 변경으로 벡터 불일치 | 컬렉션 삭제 후 동일 모델로 전체 재임베딩 |
| Qdrant 메모리 사용량 급증 | 인덱스 미최적화 상태 | POST /collections/{name}/index로 강제 최적화 |
| score_threshold 이하 결과만 반환 | 임베딩 모델-컬렉션 차원 불일치 | 벡터 차원 확인: GET /collections/{name} |
🐳 Docker / 컨테이너 문제
#!/bin/bash # ~/ai-server/scripts/health_check.sh echo "╔═══════════════════════════════════════╗" echo "║ AI 서버 헬스체크 리포트 ║" echo "╚═══════════════════════════════════════╝" echo "" # GPU 상태 echo "🎮 GPU 상태" nvidia-smi --query-gpu=name,temperature.gpu,utilization.gpu,memory.used,memory.total \ --format=csv,noheader | awk -F',' '{ printf " 모델: %s | 온도: %s°C | GPU: %s | VRAM: %s / %s\n", $1, $2, $3, $4, $5 }' echo "" # 컨테이너 상태 echo "🐳 컨테이너 상태" docker ps --format " {{.Names}}\t{{.Status}}\t{{.Ports}}" | column -t echo "" # 디스크 사용량 echo "💾 디스크 사용량" df -h / /home 2>/dev/null | grep -v Filesystem | awk '{printf " %s: %s / %s (%s)\n", $6, $3, $2, $5}' echo "" # 메모리 echo "🧠 메모리" free -h | awk '/^Mem/{printf " 사용: %s / 전체: %s (여유: %s)\n", $3, $2, $7}' echo "" # 핵심 서비스 응답 확인 echo "🌐 서비스 응답 확인" services=( "Ollama:http://localhost:11434/api/tags" "OpenWebUI:http://localhost:3000/health" "Qdrant:http://localhost:6333/healthz" "n8n:http://localhost:5678/healthz" ) for svc in "${services[@]}"; do name="${svc%%:*}" url="${svc#*:}" code=$(curl -s -o /dev/null -w "%{http_code}" --connect-timeout 3 "$url") [ "$code" = "200" ] && status="✅" || status="❌" echo " $status $name ($code)" done echo "" echo "완료: $(date '+%Y-%m-%d %H:%M:%S')"
✅ 프로덕션 운영 최종 체크리스트
- ✓Qdrant 스냅샷 자동 백업 (cron 매일 새벽 3시)
- ✓Docker 볼륨 전체 백업 스크립트 설정
- ✓Grafana 알림 — Slack/텔레그램/이메일 연동 완료
- ✓GPU 온도 임계값 알림 설정 (85°C 경고, 90°C 긴급)
- ✓RAGAS 정기 평가 — 월 1회 RAG 품질 수치 측정
- ✓Fail2ban 차단 현황 주간 점검
- ✓Docker 이미지 월간 업데이트 (
docker compose pull) - ✓AI 모델 업데이트 주시 — Ollama 새 모델 릴리즈 추적
- !NVIDIA 드라이버 업데이트 주의 — 반드시 테스트 환경 먼저 검증
- !임베딩 모델 교체 시 반드시 전체 벡터 재인덱싱 필요
- !Open WebUI API 키가 노출되지 않도록 환경변수 관리
AI 데이터 파이프라인 아키텍처 설계
현대적 데이터 파이프라인은 단순히 데이터를 옮기는 것이 아닙니다. AI가 데이터의 의미를 이해하고 정제·분류·분석하는 인텔리전트 파이프라인이 핵심입니다. 모든 구성요소를 AI 서버 내 Docker로 운영해 외부 데이터 유출 없이 완전한 프라이빗 환경을 유지합니다.
🏗️ 전체 파이프라인 플로우
🛠️ 도구 선택 매트릭스
| 레이어 | 경량 (소규모) | 중간 (팀 운영) | 대형 (엔터프라이즈) |
|---|---|---|---|
| 오케스트레이션 | n8n 스케줄 | Apache Airflow | Prefect / Dagster |
| 데이터 수집 | Python requests | Scrapy + Playwright | Apache Kafka |
| 변환 (ETL) | pandas | dbt + pandas | Apache Spark |
| 저장소 (OLTP) | SQLite | PostgreSQL + TimescaleDB | Snowflake |
| 벡터 저장 | ChromaDB | Qdrant | Pinecone |
| BI 시각화 | Grafana | Grafana + Metabase | Tableau |
| AI 분석 | Ollama | vLLM + LangChain | vLLM 클러스터 |
다중 소스 데이터 자동 수집
import asyncio, httpx, feedparser
from datetime import datetime, timezone
from typing import AsyncGenerator
import psycopg2
DB_CONFIG = {
"host": "192.168.1.253", "port": 5432,
"dbname": "aiserver_main",
"user": "aiserver", "password": "secure_password"
}
async def collect_rss_feeds(feeds: list[str]) -> list[dict]:
"""RSS/Atom 피드 병렬 수집"""
async def fetch_feed(url: str) -> list[dict]:
try:
async with httpx.AsyncClient(timeout=15) as client:
resp = await client.get(url)
parsed = feedparser.parse(resp.text)
items = []
for entry in parsed.entries[:20]:
items.append({
"title": entry.get("title", ""),
"content": entry.get("summary", entry.get("description", "")),
"url": entry.get("link", ""),
"published": entry.get("published", datetime.now(timezone.utc).isoformat()),
"source": parsed.feed.get("title", url),
"source_type": "rss",
})
return items
except Exception as e:
print(f"RSS 수집 실패 {url}: {e}")
return []
results = await asyncio.gather(*[fetch_feed(url) for url in feeds])
return [item for sublist in results for item in sublist]
async def collect_api_data(endpoints: list[dict]) -> list[dict]:
"""REST API 엔드포인트 데이터 수집"""
results = []
async with httpx.AsyncClient(timeout=30) as client:
for ep in endpoints:
try:
resp = await client.get(
ep["url"],
headers=ep.get("headers", {}),
params=ep.get("params", {})
)
data = resp.json()
# 각 API별 데이터 정규화
items = ep.get("parser", lambda x: x)(data)
for item in items:
item["source_type"] = "api"
item["source"] = ep.get("name", ep["url"])
results.extend(items)
except Exception as e:
print(f"API 수집 실패 {ep['url']}: {e}")
return results
async def collect_web_content(urls: list[str]) -> list[dict]:
"""웹페이지 본문 추출 (Playwright)"""
try:
from playwright.async_api import async_playwright
except ImportError:
print("pip install playwright && playwright install chromium")
return []
results = []
async with async_playwright() as pw:
browser = await pw.chromium.launch(headless=True)
context = await browser.new_context(
user_agent="Mozilla/5.0 (compatible; AIServerBot/1.0)"
)
for url in urls:
try:
page = await context.new_page()
await page.goto(url, wait_until="domcontentloaded", timeout=30000)
title = await page.title()
content = await page.inner_text("article, main, .content, body")
results.append({
"title": title, "content": content[:5000],
"url": url, "source_type": "web",
"collected_at": datetime.now(timezone.utc).isoformat()
})
await page.close()
except Exception as e:
print(f"웹 수집 실패 {url}: {e}")
await browser.close()
return results
def save_to_postgres(items: list[dict], table: str = "raw_data"):
"""수집 데이터 PostgreSQL 저장 (중복 URL 무시)"""
conn = psycopg2.connect(**DB_CONFIG)
cur = conn.cursor()
# 테이블 자동 생성
cur.execute(f"""
CREATE TABLE IF NOT EXISTS {table} (
id SERIAL PRIMARY KEY,
title TEXT,
content TEXT,
url TEXT UNIQUE,
source TEXT,
source_type TEXT,
published TIMESTAMPTZ,
collected_at TIMESTAMPTZ DEFAULT NOW(),
processed BOOLEAN DEFAULT FALSE,
ai_category TEXT,
ai_sentiment TEXT,
ai_summary TEXT
)
""")
inserted = 0
for item in items:
try:
cur.execute(f"""
INSERT INTO {table} (title, content, url, source, source_type, published)
VALUES (%s, %s, %s, %s, %s, %s)
ON CONFLICT (url) DO NOTHING
""", (
item.get("title", ""),
item.get("content", "")[:10000],
item.get("url", ""),
item.get("source", ""),
item.get("source_type", ""),
item.get("published", None),
))
if cur.rowcount > 0:
inserted += 1
except Exception as e:
print(f"저장 오류: {e}")
conn.commit()
cur.close()
conn.close()
print(f"✅ {inserted}개 신규 데이터 저장 (총 {len(items)}개 중)")
return inserted
# 실행 예시
async def main():
RSS_FEEDS = [
"https://feeds.feedburner.com/TechCrunch",
"https://www.technologyreview.com/feed/",
"https://rss.arxiv.org/rss/cs.AI",
]
print("📡 RSS 수집 중...")
rss_items = await collect_rss_feeds(RSS_FEEDS)
print(f" 수집: {len(rss_items)}건")
save_to_postgres(rss_items, "raw_data")
asyncio.run(main())AI 기반 데이터 정제 & 분류 ETL
수집된 원시 데이터(Raw Data)를 LLM이 직접 분류·정제하는 AI-powered ETL입니다. 기존 ETL이 규칙(Rule)에 의존한다면, AI ETL은 의미(Semantics)를 이해합니다. 뉴스 기사의 감성, 토픽, 핵심 엔티티를 자동으로 추출해 구조화된 데이터로 변환합니다.
import httpx, json, asyncio, psycopg2
from psycopg2.extras import RealDictCursor
OLLAMA_URL = "http://192.168.1.253:11434"
DB_CONFIG = {"host":"192.168.1.253","port":5432,"dbname":"aiserver_main",
"user":"aiserver","password":"secure_password"}
async def ai_classify_batch(items: list[dict]) -> list[dict]:
"""
LLM 배치 분류: 카테고리·감성·요약·키워드·중요도 동시 추출
배치로 처리해 API 호출 횟수 최소화
"""
if not items:
return []
# 배치 프롬프트 구성 (한 번에 최대 10개)
batch_text = "\n\n".join([
f"[{i+1}] 제목: {item['title'][:200]}\n내용: {item['content'][:500]}"
for i, item in enumerate(items[:10])
])
prompt = f"""다음 {len(items[:10])}개의 데이터를 분석하고 JSON 배열로 출력해.
각 항목에 대해 정확히 다음 형식으로:
{batch_text}
출력 형식 (JSON 배열만, 다른 텍스트 없이):
[
{{
"index": 1,
"category": "기술/비즈니스/정치/경제/스포츠/문화/과학/기타 중 하나",
"subcategory": "세부 카테고리",
"sentiment": "positive/negative/neutral",
"sentiment_score": 0.0~1.0 사이 숫자,
"importance": "high/medium/low",
"summary": "한국어 1~2문장 요약",
"keywords": ["키워드1", "키워드2", "키워드3"],
"entities": {{"companies": [], "people": [], "locations": []}},
"is_duplicate_risk": true/false
}}
]"""
async with httpx.AsyncClient(timeout=120) as client:
resp = await client.post(
f"{OLLAMA_URL}/api/generate",
json={
"model": "qwen2.5:14b",
"prompt": prompt,
"stream": False,
"options": {"temperature": 0.1}
}
)
raw = resp.json()["response"]
try:
import re
json_match = re.search(r'\[.*\]', raw, re.DOTALL)
results = json.loads(json_match.group()) if json_match else []
return results
except Exception as e:
print(f"JSON 파싱 오류: {e}")
return []
def update_processed_data(results: list[dict], ids: list[int]):
"""분류 결과를 DB에 업데이트"""
conn = psycopg2.connect(**DB_CONFIG)
cur = conn.cursor()
for r in results:
idx = r.get("index", 1) - 1
if idx < len(ids):
cur.execute("""
UPDATE raw_data SET
processed = TRUE,
ai_category = %s,
ai_sentiment = %s,
ai_summary = %s
WHERE id = %s
""", (
f"{r.get('category','')}/{r.get('subcategory','')}",
r.get("sentiment", "neutral"),
r.get("summary", ""),
ids[idx]
))
conn.commit(); cur.close(); conn.close()
async def run_etl_pipeline(batch_size: int = 10):
"""미처리 데이터를 배치로 AI 분류"""
conn = psycopg2.connect(**DB_CONFIG)
cur = conn.cursor(cursor_factory=RealDictCursor)
cur.execute("""
SELECT id, title, content FROM raw_data
WHERE processed = FALSE
ORDER BY collected_at DESC
LIMIT %s
""", (batch_size,))
rows = cur.fetchall()
cur.close(); conn.close()
if not rows:
print("✅ 처리할 데이터 없음")
return
print(f"🤖 AI 분류 시작: {len(rows)}건")
ids = [r["id"] for r in rows]
items = [{"title": r["title"], "content": r["content"]} for r in rows]
results = await ai_classify_batch(items)
if results:
update_processed_data(results, ids)
print(f"✅ {len(results)}건 분류 완료")
# 벡터 DB 동시 인덱싱 (RAG 검색용)
await index_to_qdrant(rows)
async def index_to_qdrant(rows: list[dict]):
"""처리된 데이터를 Qdrant에 동시 인덱싱"""
from qdrant_client import QdrantClient
from qdrant_client.models import PointStruct
from sentence_transformers import SentenceTransformer
import uuid
embedder = SentenceTransformer("BAAI/bge-m3")
qdrant = QdrantClient(host="192.168.1.253", port=6333, api_key="your_key")
texts = [f"{r['title']} {r['content'][:500]}" for r in rows]
vectors = embedder.encode(texts, normalize_embeddings=True).tolist()
points = [
PointStruct(
id=str(uuid.uuid4()),
vector=vectors[i],
payload={
"title": rows[i]["title"],
"content": rows[i]["content"][:500],
"db_id": rows[i]["id"],
"type": "pipeline_data"
}
)
for i in range(len(rows))
]
qdrant.upsert(collection_name="ai_knowledge_base", points=points)
print(f" 📌 Qdrant 인덱싱: {len(points)}건")
asyncio.run(run_etl_pipeline(batch_size=50))Apache Airflow — 파이프라인 오케스트레이션
여러 파이프라인이 서로 의존하고 실패 시 재시도가 필요해지면 n8n만으로는 한계가 있습니다. Apache Airflow는 DAG(Directed Acyclic Graph) 방식으로 복잡한 데이터 파이프라인을 시각적으로 관리합니다.
services:
airflow-webserver:
image: apache/airflow:2.10.0
container_name: airflow_web
restart: unless-stopped
ports:
- "8085:8080"
environment:
- AIRFLOW__CORE__EXECUTOR=LocalExecutor
- AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=postgresql+psycopg2://airflow:airflow@airflow_postgres/airflow
- AIRFLOW__CORE__FERNET_KEY=your_fernet_key_here
- AIRFLOW__WEBSERVER__SECRET_KEY=your_secret_key
- AIRFLOW__CORE__LOAD_EXAMPLES=False
- _AIRFLOW_WWW_USER_CREATE=true
- _AIRFLOW_WWW_USER_USERNAME=admin
- _AIRFLOW_WWW_USER_PASSWORD=admin_password
volumes:
- ./dags:/opt/airflow/dags # DAG 파일 위치
- ./logs:/opt/airflow/logs
- ./plugins:/opt/airflow/plugins
command: webserver
depends_on:
- airflow_postgres
networks:
- ai_net
airflow-scheduler:
image: apache/airflow:2.10.0
container_name: airflow_scheduler
restart: unless-stopped
environment:
- AIRFLOW__CORE__EXECUTOR=LocalExecutor
- AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=postgresql+psycopg2://airflow:airflow@airflow_postgres/airflow
- AIRFLOW__CORE__FERNET_KEY=your_fernet_key_here
volumes:
- ./dags:/opt/airflow/dags
- ./logs:/opt/airflow/logs
command: scheduler
depends_on:
- airflow_postgres
networks:
- ai_net
airflow_postgres:
image: postgres:16-alpine
container_name: airflow_postgres
environment:
- POSTGRES_DB=airflow
- POSTGRES_USER=airflow
- POSTGRES_PASSWORD=airflow
volumes:
- airflow_pg_data:/var/lib/postgresql/data
networks:
- ai_net
volumes:
airflow_pg_data:
networks:
ai_net:
external: true
name: ai_networkfrom airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta
default_args = {
“owner”: “ai-server”,
“depends_on_past”: False,
“start_date”: datetime(2026, 1, 1),
“retries”: 2,
“retry_delay”: timedelta(minutes=5),
“email_on_failure”: False,
}
def collect_task(**context):
“””데이터 수집 태스크”””
import asyncio, sys
sys.path.insert(0, ‘/opt/airflow/plugins’)
from collectors import collect_rss_feeds, save_to_postgres
RSS_FEEDS = [
“https://feeds.feedburner.com/TechCrunch”,
“https://www.technologyreview.com/feed/”,
]
items = asyncio.run(collect_rss_feeds(RSS_FEEDS))
count = save_to_postgres(items, “raw_data”)
# XCom으로 다음 태스크에 개수 전달
context[“task_instance”].xcom_push(key=”collected_count”, value=count)
return count
def etl_task(**context):
“””AI ETL 정제 태스크”””
import asyncio, sys
sys.path.insert(0, ‘/opt/airflow/plugins’)
from etl_processor import run_etl_pipeline
count = asyncio.run(run_etl_pipeline(batch_size=100))
return count
def report_task(**context):
“””일일 리포트 생성 및 슬랙 알림”””
import httpx
ti = context[“task_instance”]
collected = ti.xcom_pull(key=”collected_count”, task_ids=”collect_data”)
msg = (f”📊 *AI 데이터 파이프라인 일일 리포트*\n”
f”- 수집: {collected}건\n”
f”- 날짜: {context[‘ds’]}\n”
f”- 상태: ✅ 정상 완료”)
httpx.post(“https://hooks.slack.com/services/YOUR/SLACK/WEBHOOK”,
json={“text”: msg})
with DAG(
“ai_data_pipeline”,
default_args=default_args,
description=”AI 기반 자동 데이터 파이프라인”,
schedule_interval=”0 */6 * * *”, # 6시간마다
catchup=False,
tags=[“ai”, “data”, “pipeline”],
) as dag:
t1 = PythonOperator(task_id=”collect_data”, python_callable=collect_task)
t2 = PythonOperator(task_id=”etl_classify”, python_callable=etl_task)
t3 = PythonOperator(task_id=”send_report”, python_callable=report_task)
t4 = BashOperator(
task_id=”cleanup_old_data”,
bash_command=”””psql postgresql://aiserver:[email protected]/aiserver_main \
-c “DELETE FROM raw_data WHERE collected_at < NOW() - INTERVAL '30 days';" """
)
# 실행 순서: 수집 → ETL → 리포트 & 정리 (병렬)
t1 >> t2 >> [t3, t4]Grafana BI 대시보드 — 비즈니스 인사이트 시각화
Grafana는 GPU 모니터링뿐 아니라 비즈니스 KPI 대시보드에도 탁월합니다. PostgreSQL 데이터소스를 연결하면 파이프라인이 수집한 데이터를 실시간으로 시각화하고, AI가 이상 패턴을 감지하면 즉시 알림을 보냅니다.
📊 핵심 BI 패널 구성 (PostgreSQL 쿼리)
-- 패널 1: 시간대별 수집 건수 (Time Series) SELECT date_trunc('hour', collected_at) AS time, source, COUNT(*) AS count FROM raw_data WHERE collected_at >= NOW() - INTERVAL '7 days' GROUP BY 1, 2 ORDER BY 1; -- 패널 2: 카테고리별 분포 (Pie Chart) SELECT ai_category, COUNT(*) AS count, ROUND(COUNT(*) * 100.0 / SUM(COUNT(*)) OVER (), 1) AS percentage FROM raw_data WHERE processed = TRUE AND collected_at >= NOW() - INTERVAL '24 hours' GROUP BY ai_category ORDER BY count DESC; -- 패널 3: 감성 분석 트렌드 (Time Series) SELECT date_trunc('hour', collected_at) AS time, ai_sentiment, COUNT(*) AS count FROM raw_data WHERE processed = TRUE AND collected_at >= NOW() - INTERVAL '48 hours' GROUP BY 1, 2 ORDER BY 1; -- 패널 4: 최근 중요 뉴스 (Table) SELECT title, source, ai_category, ai_sentiment, LEFT(ai_summary, 100) || '...' AS summary, collected_at FROM raw_data WHERE processed = TRUE AND ai_category ILIKE '%기술%' ORDER BY collected_at DESC LIMIT 20; -- 패널 5: 소스별 성과 (Bar Chart) SELECT source, COUNT(*) AS total, COUNT(*) FILTER (WHERE ai_sentiment = 'positive') AS positive, COUNT(*) FILTER (WHERE ai_sentiment = 'negative') AS negative, ROUND(AVG(CASE WHEN ai_sentiment='positive' THEN 1.0 WHEN ai_sentiment='negative' THEN -1.0 ELSE 0 END), 3) AS avg_sentiment FROM raw_data WHERE processed = TRUE AND collected_at >= NOW() - INTERVAL '7 days' GROUP BY source ORDER BY total DESC LIMIT 10;
🤖 AI 기반 Grafana 이상 감지 알림
import psycopg2, httpx, asyncio
from psycopg2.extras import RealDictCursor
from datetime import datetime, timezone
DB_CONFIG = {"host":"192.168.1.253","port":5432,"dbname":"aiserver_main",
"user":"aiserver","password":"secure_password"}
OLLAMA_URL = "http://192.168.1.253:11434"
SLACK_URL = "https://hooks.slack.com/services/YOUR/WEBHOOK"
async def detect_anomalies():
"""
최근 1시간 데이터에서 이상 패턴 AI 감지
- 급격한 감성 변화
- 특정 키워드 급증
- 데이터 수집 이상
"""
conn = psycopg2.connect(**DB_CONFIG)
cur = conn.cursor(cursor_factory=RealDictCursor)
# 최근 1시간 vs 이전 24시간 평균 비교
cur.execute("""
SELECT
COUNT(*) FILTER (WHERE collected_at >= NOW() - INTERVAL '1 hour') AS last_hour,
COUNT(*) FILTER (WHERE collected_at >= NOW() - INTERVAL '25 hours'
AND collected_at < NOW() - INTERVAL '1 hour') / 24.0 AS hourly_avg,
COUNT(*) FILTER (WHERE ai_sentiment = 'negative'
AND collected_at >= NOW() - INTERVAL '1 hour') AS negative_count,
COUNT(*) FILTER (WHERE collected_at >= NOW() - INTERVAL '1 hour') AS total_recent
FROM raw_data
WHERE processed = TRUE
""")
stats = cur.fetchone()
cur.close(); conn.close()
anomalies = []
# 수집량 급감 (평균 대비 50% 이하)
if stats["hourly_avg"] > 0:
ratio = stats["last_hour"] / stats["hourly_avg"]
if ratio < 0.5:
anomalies.append(f"⚠️ 데이터 수집량 급감: 평균의 {ratio*100:.0f}% ({stats['last_hour']}건)")
# 부정 감성 급증 (70% 초과)
if stats["total_recent"] > 10:
neg_ratio = stats["negative_count"] / stats["total_recent"]
if neg_ratio > 0.7:
anomalies.append(f"🚨 부정 감성 급증: {neg_ratio*100:.0f}% ({stats['negative_count']}건)")
if not anomalies:
return
# LLM으로 이상 패턴 분석 리포트 생성
anomaly_text = "\n".join(anomalies)
async with httpx.AsyncClient(timeout=60) as client:
resp = await client.post(
f"{OLLAMA_URL}/api/generate",
json={
"model": "qwen2.5:7b",
"prompt": f"""다음 데이터 이상 패턴을 분석하고 가능한 원인과 권장 조치를 제안해줘 (3~5문장):
이상 항목:
{anomaly_text}
현재 시간: {datetime.now(timezone.utc).isoformat()}""",
"stream": False,
"options": {"temperature": 0.3}
}
)
analysis = resp.json()["response"]
# Slack 알림 전송
slack_msg = {
"text": "🔍 *AI 서버 데이터 파이프라인 이상 감지*",
"blocks": [
{"type": "header", "text": {"type": "plain_text", "text": "🔍 데이터 이상 감지"}},
{"type": "section", "text": {"type": "mrkdwn", "text": f"*감지 항목:*\n{anomaly_text}"}},
{"type": "section", "text": {"type": "mrkdwn", "text": f"*AI 분석:*\n{analysis}"}},
]
}
await client.post(SLACK_URL, json=slack_msg)
print(f"✅ 이상 감지 알림 발송: {len(anomalies)}건")
# cron 또는 n8n에서 30분마다 실행
asyncio.run(detect_anomalies())실전 파이프라인 3종 완전 구현
📰 파이프라인 1 — 경쟁사 동향 자동 모니터링
import asyncio, httpx
from datetime import datetime
COMPETITORS = {
"경쟁사A": ["https://competitor-a.com/blog/feed", "https://competitor-a.com/news/feed"],
"경쟁사B": ["https://competitor-b.com/rss"],
}
OLLAMA_URL = "http://192.168.1.253:11434"
async def weekly_competitor_report():
"""매주 월요일 오전 8시 경쟁사 주간 동향 리포트"""
from collectors import collect_rss_feeds, save_to_postgres
all_items = []
for company, feeds in COMPETITORS.items():
items = await collect_rss_feeds(feeds)
for item in items:
item["competitor"] = company
all_items.extend(items)
save_to_postgres(all_items, "competitor_data")
# 주간 요약 생성
summaries = "\n\n".join([
f"제목: {item['title']}\n내용: {item['content'][:300]}"
for item in all_items[:20]
])
async with httpx.AsyncClient(timeout=120) as client:
resp = await client.post(
f"{OLLAMA_URL}/api/generate",
json={
"model": "qwen2.5:14b",
"prompt": f"""경쟁사 주간 동향을 분석해서 경영진 보고용 리포트를 작성해줘.
수집 데이터:
{summaries}
리포트 형식:
1. 핵심 요약 (3문장)
2. 경쟁사별 주요 동향
3. 우리가 주목해야 할 포인트
4. 권장 대응 방안""",
"stream": False
}
)
report = resp.json()["response"]
print(f"주간 경쟁사 리포트:\n{report}")
return report
asyncio.run(weekly_competitor_report())💹 파이프라인 2 — 실시간 시장 데이터 수집 & AI 분석
import asyncio, httpx, psycopg2
from datetime import datetime, timezone
async def collect_market_data(symbols: list[str]) -> list[dict]:
"""무료 API로 시장 데이터 수집 (Yahoo Finance 비공식 API)"""
results = []
async with httpx.AsyncClient(timeout=15) as client:
for symbol in symbols:
try:
resp = await client.get(
f"https://query1.finance.yahoo.com/v8/finance/chart/{symbol}",
params={"interval": "1d", "range": "1mo"},
headers={"User-Agent": "Mozilla/5.0"}
)
data = resp.json()
quotes = data["chart"]["result"][0]
meta = quotes["meta"]
results.append({
"symbol": symbol,
"price": meta.get("regularMarketPrice", 0),
"change_pct": meta.get("regularMarketChangePercent", 0),
"volume": meta.get("regularMarketVolume", 0),
"currency": meta.get("currency", "USD"),
"collected_at": datetime.now(timezone.utc).isoformat(),
})
except Exception as e:
print(f"시장 데이터 수집 실패 {symbol}: {e}")
return results
async def ai_market_analysis(market_data: list[dict], news_sentiment: str) -> str:
"""시장 데이터 + 뉴스 감성 통합 AI 분석"""
market_text = "\n".join([
f"{d['symbol']}: ${d['price']:.2f} ({d['change_pct']:+.2f}%)"
for d in market_data
])
async with httpx.AsyncClient(timeout=60) as client:
resp = await client.post(
"http://192.168.1.253:11434/api/generate",
json={
"model": "qwen2.5:7b",
"prompt": f"""현재 시장 데이터와 뉴스 감성을 종합 분석해줘:
시장 데이터:
{market_text}
최근 뉴스 감성: {news_sentiment}
분석 내용:
1. 현재 시장 분위기
2. 주목할 종목/섹터
3. 주의 신호
(투자 조언 아님, 정보 제공 목적)""",
"stream": False,
"options": {"temperature": 0.3}
}
)
return resp.json()["response"]
async def run_market_pipeline():
SYMBOLS = ["NVDA", "MSFT", "GOOGL", "META", "AMZN"]
market_data = await collect_market_data(SYMBOLS)
analysis = await ai_market_analysis(market_data, "중립적")
print(f"📈 시장 분석 완료:\n{analysis}")
asyncio.run(run_market_pipeline())💬 파이프라인 3 — 고객 리뷰 감성 분석 자동화
import asyncio, httpx, psycopg2
async def analyze_review_batch(reviews: list[str]) -> list[dict]:
"""리뷰 배치를 LLM으로 감성·이슈 분류"""
batch = "\n\n".join([f"[{i+1}] {r[:300]}" for i, r in enumerate(reviews[:15])])
async with httpx.AsyncClient(timeout=90) as client:
resp = await client.post(
"http://192.168.1.253:11434/api/generate",
json={
"model": "qwen2.5:7b",
"prompt": f"""다음 고객 리뷰들을 분석해서 JSON 배열로 출력해:
{batch}
JSON 배열 형식 (다른 텍스트 없이):
[
{{
"index": 1,
"sentiment": "positive/negative/neutral",
"score": 0.0~1.0,
"issue_category": "배송/품질/가격/CS/사용성/기타",
"key_complaint": "핵심 불만사항 또는 null",
"key_praise": "핵심 칭찬 또는 null",
"urgency": "high/medium/low",
"requires_response": true/false
}}
]""",
"stream": False,
"options": {"temperature": 0.1}
}
)
import json, re
raw = resp.json()["response"]
try:
json_match = re.search(r'\[.*\]', raw, re.DOTALL)
return json.loads(json_match.group()) if json_match else []
except:
return []
async def run_review_pipeline():
"""
실제 리뷰 수집은 스토어 API 또는 크롤링으로 구현
여기서는 예시 데이터로 시연
"""
sample_reviews = [
"배송이 너무 빨라서 깜짝 놀랐어요! 제품 품질도 정말 좋습니다.",
"포장이 너무 부실해서 제품이 파손되어 왔어요. 실망입니다.",
"가격 대비 성능이 나쁘지 않네요. 그냥 평범합니다.",
"고객센터 연결이 너무 어려워요. 30분 기다렸는데 연결이 안 됩니다.",
]
results = await analyze_review_batch(sample_reviews)
# 긴급 대응 필요 리뷰 필터링
urgent = [r for r in results if r.get("requires_response") and r.get("urgency") == "high"]
if urgent:
print(f"🚨 긴급 대응 필요 리뷰: {len(urgent)}건")
for r in urgent:
print(f" [{r['issue_category']}] {r.get('key_complaint', '')}")
# 일별 감성 통계 DB 저장
positive = sum(1 for r in results if r.get("sentiment") == "positive")
negative = sum(1 for r in results if r.get("sentiment") == "negative")
print(f"✅ 리뷰 분석: 긍정 {positive}건 / 부정 {negative}건 / 총 {len(results)}건")
asyncio.run(run_review_pipeline())- 6시간마다 AI 기술 뉴스 자동 수집 → LLM 분류·요약 → Grafana 대시보드 실시간 표시
- 경쟁사 블로그 변경 감지 → 자동 분석 리포트 → 슬랙 주간 요약 발송
- 고객 리뷰 급증 시 자동 감지 → 긴급 이슈 담당자 즉시 알림
- 시장 데이터 + 뉴스 감성 통합 → AI 종합 분석 리포트 자동 생성
- Airflow DAG로 모든 파이프라인 의존성 관리 + 실패 자동 재시도
