홈랩 AI 서버 가이드 5편: Qdrant 벡터DB와 Airflow 및 GPU 모니터링

편수5편 / 5편 (완결)
난이도⭐⭐⭐ 고급
다루는 내용Qdrant · RAG · 임베딩 · Grafana · Airflow · 데이터 파이프라인 · 트러블슈팅
섹션 수14개 섹션
🔍 Qdrant · RAG 🧮 임베딩 완전 정복 📊 Grafana 모니터링 🔄 Airflow ETL 🛠️ 트러블슈팅 데이터파이프라인 · BI · 할루시네이션 억제
5편은 홈랩 AI 서버를 실제로 지식을 검색하고 분석하는 지능형 시스템으로 완성합니다. 벡터 데이터베이스의 수학적 원리부터 Qdrant 프로덕션 배포, 임베딩 모델 선택, RAG 파이프라인 설계, 할루시네이션 억제까지 — 지능형 검색의 모든 것을 다룹니다. 이어서 Airflow로 데이터 파이프라인을 오케스트레이션하고, Grafana로 비즈니스 인사이트 대시보드를 구축합니다. 마지막으로 CUDA OOM·컨테이너 재시작·네트워크 이슈까지 — 홈랩 AI 서버 운영에서 만날 수 있는 모든 문제의 해결책을 이 한 편에 담았습니다.
SECTION01

벡터 데이터베이스 완전 이론 — 원리부터 수학까지

벡터 데이터베이스를 단순히 “AI용 특별한 DB”로만 이해하면 실제 운영에서 수많은 실수를 만납니다. 왜 벡터 검색이 일반 SQL 검색과 근본적으로 다른지, 그 수학적 원리를 이해해야 올바른 인덱스를 선택하고, 검색 품질을 진단하고, 성능 병목을 해결할 수 있습니다.

🧮 임베딩이란 무엇인가 — 의미를 숫자로 변환하는 원리

임베딩(Embedding)은 텍스트, 이미지, 오디오 등의 비정형 데이터를 고차원 실수 벡터 공간의 한 점으로 매핑하는 것입니다. 핵심은 “의미적으로 비슷한 내용은 벡터 공간에서 가까운 위치에 놓인다”는 것입니다. “강아지”와 “개”는 벡터 공간에서 매우 가깝고, “강아지”와 “양자역학”은 매우 멉니다.

⚡ 전문가 노트 — 임베딩 차원이 많을수록 무조건 좋은가?

일반적으로 높은 차원(768, 1536, 3072)은 더 풍부한 의미 표현이 가능하지만, 차원의 저주(Curse of Dimensionality) 문제가 발생합니다. 차원이 높아질수록 벡터 간 거리 분포가 균일해져 “가까운”과 “먼” 구분이 희미해집니다. 실제 RAG에서는 768~1024차원이 품질과 성능의 최적 균형점이며, 무작정 큰 모델을 쓰는 것이 항상 정답이 아닙니다.

📐 유사도 측정 방법 3가지 — 언제 무엇을 쓰는가

측정 방법수식특징최적 용도
코사인 유사도 RAG 표준cos(θ) = A·B / (|A||B|)벡터 방향만 비교. 크기 무관. -1~1 범위텍스트 의미 검색. 문서 길이가 다를 때
유클리드 거리d = √Σ(Aᵢ-Bᵢ)²공간상 실제 거리. 정규화되지 않은 벡터에 취약이미지 임베딩, 정규화된 벡터
내적 (Dot Product)A·B = Σ AᵢBᵢ방향 + 크기 모두 반영. 빠름단위 벡터로 정규화된 임베딩 (= 코사인과 동일)
⚡ 전문가 노트 — RAG에서 코사인 유사도가 표준인 이유

대부분의 텍스트 임베딩 모델은 L2 정규화(단위 벡터)로 출력합니다. 이 경우 코사인 유사도 = 내적이 되어 계산이 단순해집니다. 더 중요한 것은, 코사인 유사도는 문서 길이의 영향을 받지 않습니다. 1문장짜리 메모와 100페이지 PDF의 핵심 내용이 같다면 같은 유사도를 갖습니다. 이것이 RAG에서 코사인 유사도가 표준인 핵심 이유입니다.

⚙️ ANN 인덱스 알고리즘 — HNSW vs IVF vs FLAT

수백만 개의 벡터에서 가장 유사한 것을 찾으려면 모든 벡터와 비교(Exact Search, O(n))하는 것은 너무 느립니다. 근사 최근접 이웃(Approximate Nearest Neighbor, ANN) 알고리즘은 약간의 정확도를 희생해 검색 속도를 극적으로 개선합니다.

HNSW
검색 속도⚡⚡⚡⚡⚡
인덱스 빌드🔴 느림
메모리 사용🔴 높음
정확도 (Recall)🟢 95~99%
RAG 기본값 추천
IVF_FLAT
검색 속도⚡⚡⚡
인덱스 빌드🟡 보통
메모리 사용🟡 보통
정확도 (Recall)🟡 85~95%
대용량 데이터셋
FLAT (Exact)
검색 속도
인덱스 빌드🟢 즉시
메모리 사용🟢 최저
정확도 (Recall)🟢 100%
소규모 < 10만 건
⚡ 전문가 노트 — HNSW의 핵심 파라미터 m과 ef_construction

HNSW 인덱스의 품질은 두 파라미터로 결정됩니다. m(기본값 16)은 각 노드가 유지하는 연결 수입니다. m을 높이면 정확도가 올라가지만 메모리와 빌드 시간이 증가합니다. ef_construction(기본값 100)은 인덱스 빌드 시 탐색 범위입니다. 높을수록 정확한 인덱스가 만들어지지만 빌드 속도가 느립니다. 실전 RAG에서는 m=16, ef_construction=200을 권장합니다. 인덱스는 한 번만 빌드하므로 ef_construction은 넉넉하게 설정하는 것이 좋습니다.

SECTION02

벡터DB 완전 비교 & 선택 가이드

2026년 현재 오픈소스 벡터 DB 생태계는 Qdrant, ChromaDB, Weaviate, Milvus, pgvector 등 다양합니다. 각각의 장단점을 명확히 이해하지 않으면 나중에 마이그레이션 비용이 발생합니다.

DB언어인덱스필터 검색스케일메모리추천 상황
Qdrant 추천RustHNSW🟢 매우 강력🟢 분산 클러스터🟡 보통프로덕션, 필터 조합 검색, AI 서버 핵심 DB
ChromaDBPythonHNSW (hnswlib)🟡 기본 필터🔴 단일 노드만🟢 낮음프로토타이핑, 소규모, Open WebUI 기본값
pgvectorC (PostgreSQL 확장)HNSW / IVFFlat🟢 SQL 풀 지원🟡 PostgreSQL 수준🟢 낮음기존 PostgreSQL 인프라 활용, 관계형+벡터 혼합
WeaviateGoHNSW🟢 GraphQL 강력🟢 분산 지원🔴 높음멀티모달(텍스트+이미지), 엔터프라이즈
MilvusGo/C++다수 지원🟢 강력🟢 최고 수준🔴 매우 높음수십억 건 초대형 벡터, 기업급 클러스터
⚡ 전문가 노트 — Qdrant를 AI 서버 핵심으로 추천하는 이유

Qdrant는 Rust로 작성되어 메모리 안전성과 성능을 동시에 확보합니다. 가장 결정적인 차별점은 페이로드 기반 필터링입니다. 단순히 “가장 비슷한 벡터”를 찾는 것을 넘어, “category = ‘tech’ AND date > ‘2025-01-01′”이면서 가장 유사한 벡터를 찾는 필터+벡터 하이브리드 검색이 네이티브로 지원됩니다. ChromaDB는 이 필터 검색의 성능이 매우 낮고, Qdrant는 필터를 인덱스 수준에서 처리해 수백만 건에서도 밀리초 단위 응답이 가능합니다.

SECTION03

Qdrant 완전 설치 & 프로덕션 설정

yaml — ~/ai-server/database/docker-compose.yml (Qdrant + PostgreSQL)
services:

  # ── Qdrant 벡터 데이터베이스 ──────────────────────
  qdrant:
    image: qdrant/qdrant:latest
    container_name: qdrant
    restart: unless-stopped
    ports:
      - "6333:6333"   # HTTP REST API
      - "6334:6334"   # gRPC (고성능 조회용)
    volumes:
      - qdrant_storage:/qdrant/storage
      - ./qdrant_config.yaml:/qdrant/config/production.yaml
    environment:
      - QDRANT__SERVICE__API_KEY=your_very_secure_api_key_here
      - QDRANT__SERVICE__ENABLE_TLS=false
      - QDRANT__LOG_LEVEL=INFO
    networks:
      - ai_net

  # ── PostgreSQL (n8n + Dify 공유 DB) ──────────────
  postgresql:
    image: pgvector/pgvector:pg16  # pgvector 확장 포함
    container_name: postgresql
    restart: unless-stopped
    ports:
      - "5432:5432"
    volumes:
      - postgres_data:/var/lib/postgresql/data
      - ./init.sql:/docker-entrypoint-initdb.d/init.sql
    environment:
      - POSTGRES_USER=aiserver
      - POSTGRES_PASSWORD=secure_password_here
      - POSTGRES_DB=aiserver_main
    networks:
      - ai_net

  # ── Redis (캐시 + 세션) ───────────────────────────
  redis:
    image: redis:7-alpine
    container_name: redis
    restart: unless-stopped
    ports:
      - "6379:6379"
    command: redis-server --requirepass your_redis_password
    volumes:
      - redis_data:/data
    networks:
      - ai_net

volumes:
  qdrant_storage:
  postgres_data:
  redis_data:

networks:
  ai_net:
    external: true
    name: ai_network
yaml — qdrant_config.yaml (프로덕션 최적화)
storage:
  storage_path: /qdrant/storage
  snapshots_path: /qdrant/snapshots

  # HNSW 인덱스 전역 기본값
  hnsw_index:
    m: 16                  # 연결 수 (높을수록 정확, 메모리↑)
    ef_construct: 200      # 빌드 시 탐색 범위 (높을수록 정확)
    full_scan_threshold: 10000  # 이 수 이하면 FLAT 검색

  # 성능 최적화
  on_disk_payload: false   # 페이로드 메모리 유지 (검색 속도↑)

service:
  host: 0.0.0.0
  http_port: 6333
  grpc_port: 6334
  max_request_size_mb: 64

  # 요청 제한 (DoS 방어)
  max_workers: 0           # 0 = CPU 코어 수 자동

telemetry_disabled: true   # 원격 측정 비활성화 (프라이버시)

🗄️ Qdrant 컬렉션 설계 & Python API 완전 활용

python — Qdrant 컬렉션 생성 및 벡터 삽입·검색
from qdrant_client import QdrantClient
from qdrant_client.models import (
    Distance, VectorParams, PointStruct,
    Filter, FieldCondition, MatchValue,
    HnswConfigDiff, OptimizersConfigDiff,
    PayloadSchemaType
)
from sentence_transformers import SentenceTransformer
import uuid

# 클라이언트 초기화
client = QdrantClient(
    host="192.168.1.253",
    port=6333,
    api_key="your_very_secure_api_key_here",
    timeout=60
)
embedder = SentenceTransformer("BAAI/bge-m3")

COLLECTION = "ai_knowledge_base"
VECTOR_DIM  = 1024  # bge-m3의 출력 차원

## ── 컬렉션 생성 (HNSW 최적화) ─────────────────────
client.recreate_collection(
    collection_name=COLLECTION,
    vectors_config=VectorParams(
        size=VECTOR_DIM,
        distance=Distance.COSINE,    # 코사인 유사도
        on_disk=False,               # 메모리 상주 (빠름)
    ),
    hnsw_config=HnswConfigDiff(
        m=16,
        ef_construct=200,
        full_scan_threshold=10_000,
        on_disk=False,
    ),
    optimizers_config=OptimizersConfigDiff(
        indexing_threshold=20_000,   # 이 수 이하면 인덱스 지연
        memmap_threshold=50_000,     # 메모리맵 전환 임계값
    ),
)

# 페이로드 인덱스 생성 (필터 성능 극대화)
client.create_payload_index(
    collection_name=COLLECTION,
    field_name="category",
    field_schema=PayloadSchemaType.KEYWORD,  # 정확히 일치 검색
)
client.create_payload_index(
    collection_name=COLLECTION,
    field_name="created_at",
    field_schema=PayloadSchemaType.DATETIME,
)

## ── 벡터 삽입 (배치 처리) ──────────────────────────
def upsert_documents(docs: list[dict], batch_size: int = 100):
    """문서 목록을 배치로 임베딩 후 Qdrant 삽입"""
    for i in range(0, len(docs), batch_size):
        batch    = docs[i:i + batch_size]
        texts    = [d["content"] for d in batch]
        vectors  = embedder.encode(texts, normalize_embeddings=True).tolist()

        points = [
            PointStruct(
                id=str(uuid.uuid4()),
                vector=vectors[j],
                payload={
                    "title":      batch[j].get("title", ""),
                    "content":    batch[j]["content"][:1000],  # 페이로드 크기 제한
                    "source":     batch[j].get("source", ""),
                    "category":   batch[j].get("category", "general"),
                    "created_at": batch[j].get("created_at", ""),
                    "chunk_index": batch[j].get("chunk_index", 0),
                }
            )
            for j in range(len(batch))
        ]
        client.upsert(collection_name=COLLECTION, points=points)
        print(f"✅ 삽입 완료: {i+len(batch)}/{len(docs)}")

## ── 벡터 검색 (필터 + 유사도 하이브리드) ──────────
def search(query: str, category: str = None, top_k: int = 5) -> list:
    query_vec = embedder.encode([query], normalize_embeddings=True).tolist()[0]

    # 카테고리 필터 조건
    search_filter = None
    if category:
        search_filter = Filter(
            must=[FieldCondition(
                key="category",
                match=MatchValue(value=category)
            )]
        )

    results = client.search(
        collection_name=COLLECTION,
        query_vector=query_vec,
        query_filter=search_filter,
        limit=top_k,
        with_payload=True,
        score_threshold=0.5,    # 이 점수 이하 결과 제외
    )
    return [
        {
            "score":   round(r.score, 4),
            "title":   r.payload.get("title"),
            "content": r.payload.get("content"),
            "source":  r.payload.get("source"),
        }
        for r in results
    ]

# 사용 예시
results = search("Docker 컨테이너 네트워크 설정", category="tech", top_k=5)
for r in results:
    print(f"[{r['score']}] {r['title']}\n{r['content'][:100]}...")

## ── 스냅샷 백업 ─────────────────────────────────────
snapshot_info = client.create_snapshot(collection_name=COLLECTION)
print(f"스냅샷 생성: {snapshot_info.name}")
SECTION04

임베딩 모델 완전 정복

RAG 시스템에서 임베딩 모델 선택은 검색 품질의 80%를 결정합니다. 아무리 좋은 LLM을 써도 임베딩 모델이 잘못 선택되면 검색 결과가 엉망이 되어 할루시네이션이 폭발합니다. 특히 한국어 문서를 다룬다면 모델 선택이 더욱 중요합니다.

🏆 2026년 기준 한국어 임베딩 모델 성능 비교

모델차원한국어 성능VRAM속도라이선스추천도
BAAI/bge-m31024🟢 최고 수준~2GB🟡 보통MIT1순위 추천
intfloat/multilingual-e5-large1024🟢 우수~2GB🟡 보통MIT2순위
nomic-embed-text768🟡 양호~500MB🟢 빠름Apache 2.0경량 추천
OpenAI text-embedding-3-small1536🟢 우수API🟢 빠름상업용API 사용 시
OpenAI text-embedding-3-large3072🟢 최고API🟡 보통상업용최고 품질
jhgan/ko-sroberta-multitask768🟢 한국어 특화~500MB🟢 빠름Apache 2.0한국어 전용
⚡ 전문가 노트 — Dense vs Sparse vs Hybrid 임베딩

Dense 임베딩(위의 모든 모델)은 문장 전체를 하나의 연속 벡터로 표현합니다. 의미적 유사성 검색에 강하지만 정확한 키워드(예: 버전 번호 “v2.4.1”, 고유명사 “김철수”)를 놓치는 단점이 있습니다. Sparse 임베딩(BM25, SPLADE)은 전통적인 키워드 기반으로 정확한 단어 매칭에 강합니다. Hybrid 검색은 두 가지를 결합해 가중 평균으로 랭킹하는 방식으로, 실제 프로덕션에서 검색 품질이 Dense만 쓸 때보다 20~40% 향상됩니다. Qdrant는 이 Hybrid 검색을 네이티브로 지원하는 몇 안 되는 벡터DB입니다.

🔀 Qdrant Hybrid 검색 구현 (Dense + BM25)

python — Qdrant Hybrid 검색 (Dense + Sparse 결합)
from qdrant_client.models import (
    NamedVector, NamedSparseVector,
    SparseVector, SparseVectorParams,
    VectorParams, Distance
)
from fastembed import SparseTextEmbedding  # pip install fastembed

# Hybrid 컬렉션 생성 (Dense + Sparse 동시 저장)
client.recreate_collection(
    collection_name="hybrid_knowledge",
    vectors_config={
        "dense": VectorParams(size=1024, distance=Distance.COSINE),
    },
    sparse_vectors_config={
        "sparse": SparseVectorParams(),  # SPLADE/BM25용 희소 벡터
    }
)

sparse_model = SparseTextEmbedding(model_name="prithivida/Splade_PP_en_v1")

def upsert_hybrid(docs: list[dict]):
    """Dense + Sparse 동시 임베딩 후 삽입"""
    texts = [d["content"] for d in docs]

    dense_vecs  = embedder.encode(texts, normalize_embeddings=True)
    sparse_vecs = list(sparse_model.embed(texts))

    points = []
    for i, doc in enumerate(docs):
        sv = sparse_vecs[i]
        points.append(PointStruct(
            id=str(uuid.uuid4()),
            vector={
                "dense":  dense_vecs[i].tolist(),
                "sparse": SparseVector(
                    indices=sv.indices.tolist(),
                    values=sv.values.tolist()
                )
            },
            payload={"content": doc["content"], "title": doc.get("title", "")}
        ))
    client.upsert(collection_name="hybrid_knowledge", points=points)

def hybrid_search(query: str, top_k: int = 5):
    """Dense + Sparse 결합 검색 (RRF 랭킹)"""
    dense_vec   = embedder.encode([query], normalize_embeddings=True)[0].tolist()
    sparse_vecs = list(sparse_model.embed([query]))
    sv = sparse_vecs[0]

    from qdrant_client.models import Prefetch, FusionQuery, Fusion

    results = client.query_points(
        collection_name="hybrid_knowledge",
        prefetch=[
            Prefetch(query=dense_vec, using="dense",  limit=top_k * 2),
            Prefetch(
                query=SparseVector(
                    indices=sv.indices.tolist(),
                    values=sv.values.tolist()
                ),
                using="sparse", limit=top_k * 2
            ),
        ],
        query=FusionQuery(fusion=Fusion.RRF),  # Reciprocal Rank Fusion
        limit=top_k,
        with_payload=True
    )
    return results.points
SECTION05

프로덕션 RAG 파이프라인 완전 설계

처음 RAG를 구현하는 대부분의 개발자는 “문서 청킹 → 임베딩 → 벡터 검색 → LLM 답변” 이라는 단순한 흐름을 사용합니다. 이것을 Naive RAG라고 합니다. 실제 프로덕션에서 Naive RAG는 검색 정확도가 낮고, 긴 문서를 처리하지 못하며, 관련 없는 청크가 섞이는 문제를 일으킵니다.

🔍 Naive RAG vs Advanced RAG vs Modular RAG

단계Naive RAGAdvanced RAGModular RAG
문서 처리고정 크기 청킹Semantic 청킹 + 계층 구조파서별 특화 처리기
인덱싱단일 벡터 인덱스Parent-Child 청크 + 키워드 인덱스멀티 인덱스 + 그래프
검색Top-K 유사도 검색하이브리드 + 재랭킹쿼리 분해 + 반복 검색
생성단순 컨텍스트 주입LLM 기반 압축 + CoT에이전트 기반 동적 생성
검색 품질~60%~80~85%~90%+

🏗️ Advanced RAG 파이프라인 완전 구현

python — 프로덕션 Advanced RAG (Parent-Child + 재랭킹)
from langchain_text_splitters import (
    RecursiveCharacterTextSplitter,
    MarkdownHeaderTextSplitter
)
from langchain_ollama import ChatOllama, OllamaEmbeddings
from langchain_community.vectorstores import Qdrant as LCQdrant
from langchain.retrievers import ParentDocumentRetriever
from langchain.storage import InMemoryStore
from langchain.schema import Document
import re

## ══ 1단계: 문서 처리 — Semantic 청킹 ══════════════

def smart_chunk_markdown(text: str, source: str) -> list[Document]:
    """마크다운 구조 기반 계층적 청킹"""

    # 1차: 마크다운 헤더 기준 분할 (대분류)
    md_splitter = MarkdownHeaderTextSplitter(headers_to_split_on=[
        ("#",  "h1"),
        ("##", "h2"),
        ("###","h3"),
    ])
    sections = md_splitter.split_text(text)

    # 2차: 섹션별 세밀 청킹 (실제 벡터화 단위)
    char_splitter = RecursiveCharacterTextSplitter(
        chunk_size=800,
        chunk_overlap=100,
        separators=["\n\n", "\n", "。", ". ", " ", ""],
    )
    chunks = []
    for sec in sections:
        sub_chunks = char_splitter.split_documents([sec])
        for i, chunk in enumerate(sub_chunks):
            chunk.metadata.update({
                "source":      source,
                "chunk_index": i,
                "parent_id":   sec.metadata.get("h1", "") + sec.metadata.get("h2", ""),
            })
            chunks.append(chunk)
    return chunks

## ══ 2단계: Parent-Child 리트리버 ══════════════════

# 부모 청크: 큰 맥락 보존용 (1500 토큰)
parent_splitter = RecursiveCharacterTextSplitter(
    chunk_size=1500, chunk_overlap=200
)
# 자식 청크: 정밀 검색용 (300 토큰)
child_splitter = RecursiveCharacterTextSplitter(
    chunk_size=300, chunk_overlap=50
)

embeddings = OllamaEmbeddings(
    base_url="http://192.168.1.253:11434",
    model="nomic-embed-text"
)

# 자식 청크는 벡터 DB에, 부모 청크는 로컬 스토어에
vectorstore    = LCQdrant.from_existing_collection(
    embedding=embeddings,
    collection_name="rag_children",
    url="http://192.168.1.253:6333",
    api_key="your_api_key"
)
docstore = InMemoryStore()   # 프로덕션에서는 Redis로 교체

retriever = ParentDocumentRetriever(
    vectorstore=vectorstore,
    docstore=docstore,
    child_splitter=child_splitter,
    parent_splitter=parent_splitter,
    search_kwargs={"k": 10},  # 후보를 충분히 가져온 뒤 재랭킹
)

## ══ 3단계: 쿼리 확장 + 재랭킹 ════════════════════

llm = ChatOllama(
    base_url="http://192.168.1.253:11434",
    model="qwen2.5:14b",
    temperature=0
)

def expand_query(query: str) -> list[str]:
    """LLM으로 쿼리 재작성 — 다양한 관점에서 검색 강화"""
    prompt = f"""다음 질문을 서로 다른 3가지 방식으로 재작성해줘. 
    같은 의미지만 다른 표현과 관점을 사용해.
    각 질문을 줄바꿈으로 구분해서 3개만 출력해.
    
    원본 질문: {query}"""
    result   = llm.invoke(prompt)
    rewrites = [q.strip() for q in result.content.split('\n') if q.strip()]
    return [query] + rewrites[:3]  # 원본 + 3가지 재작성

def cross_encoder_rerank(query: str, docs: list, top_n: int = 5) -> list:
    """Cross-Encoder로 최종 재랭킹 (더 정밀한 관련성 평가)"""
    from sentence_transformers import CrossEncoder
    reranker = CrossEncoder("cross-encoder/ms-marco-MiniLM-L-6-v2")

    pairs  = [(query, doc.page_content) for doc in docs]
    scores = reranker.predict(pairs)
    ranked = sorted(zip(docs, scores), key=lambda x: x[1], reverse=True)
    return [doc for doc, _ in ranked[:top_n]]

## ══ 4단계: 최종 RAG Chain ═════════════════════════

def rag_answer(question: str) -> dict:
    """완전한 Advanced RAG 실행"""

    # 쿼리 확장
    queries     = expand_query(question)

    # 다중 쿼리로 검색 (더 많은 관련 문서 수집)
    all_docs = []
    seen_ids = set()
    for q in queries:
        docs = retriever.get_relevant_documents(q)
        for doc in docs:
            doc_id = hash(doc.page_content)
            if doc_id not in seen_ids:
                all_docs.append(doc)
                seen_ids.add(doc_id)

    # 재랭킹으로 최종 상위 5개 선택
    top_docs = cross_encoder_rerank(question, all_docs, top_n=5)

    # 컨텍스트 압축 (LLM으로 관련 부분만 추출)
    context = "\n\n---\n\n".join([d.page_content for d in top_docs])

    # 최종 답변 생성
    prompt = f"""당신은 정확하고 신뢰할 수 있는 AI 어시스턴트입니다.
아래 제공된 컨텍스트만을 사용해서 질문에 답하세요.
컨텍스트에 없는 내용은 절대 추측하거나 지어내지 마세요.
모르는 경우 "제공된 문서에서 해당 정보를 찾을 수 없습니다."라고 말하세요.

컨텍스트:
{context}

질문: {question}

답변:"""

    answer = llm.invoke(prompt)
    return {
        "answer":    answer.content,
        "sources":   [d.metadata.get("source", "") for d in top_docs],
        "num_docs":  len(top_docs),
        "queries":   queries,
    }

# 실행
result = rag_answer("Docker 컨테이너에서 GPU를 사용하려면 어떻게 설정해야 하나요?")
print(result["answer"])
print(f"\n참고 출처: {result['sources']}")

✂️ 청킹 전략 완전 가이드

전략방식장점단점적합 문서
Fixed Size토큰 수 기준 고정 분할구현 단순, 예측 가능의미 단위 파괴 가능균일한 구조 문서
Recursive구분자 계층적 분할문장/단락 경계 존중청크 크기 불균일일반 텍스트, 범용
Semantic의미 변화점 기준 분할최고 검색 품질처리 속도 느림, 비쌈학술 논문, 기술 문서
Markdown Header제목 태그 기준 분할구조 보존 완벽마크다운 전용기술 문서, 위키
Parent-Child대/소 두 단계 계층검색 정밀도 + 컨텍스트 풍부저장 공간 2배복잡한 긴 문서
SECTION06

RAG 품질 평가 & 할루시네이션 억제

“RAG가 잘 동작하는 것 같다”는 주관적 평가로는 프로덕션 신뢰성을 보장할 수 없습니다. RAGAS(RAG Assessment) 같은 객관적 평가 프레임워크로 수치화하고 지속적으로 모니터링해야 합니다.

📊 RAGAS 4가지 핵심 평가 지표

Faithfulness (충실도)
답변이 검색된 컨텍스트에만 근거하는가? 컨텍스트에 없는 내용을 지어냈으면 낮은 점수. 할루시네이션 탐지의 핵심 지표. 목표: 0.85 이상
Answer Relevancy (답변 관련성)
생성된 답변이 질문에 얼마나 직접적으로 관련되는가? 장황하거나 주제를 벗어난 답변을 탐지. 목표: 0.80 이상
Context Precision (컨텍스트 정밀도)
검색된 컨텍스트 중 실제로 답변에 관련된 청크의 비율. 낮으면 불필요한 청크가 너무 많이 검색됨. 목표: 0.70 이상
Context Recall (컨텍스트 재현율)
정답 생성에 필요한 정보가 검색된 컨텍스트에 얼마나 포함됐는가? 낮으면 검색 품질 문제. 목표: 0.75 이상
python — RAGAS 자동 평가 파이프라인
from ragas import evaluate
from ragas.metrics import (
    faithfulness, answer_relevancy,
    context_precision, context_recall
)
from datasets import Dataset

# 평가 데이터셋 (질문 + 정답 + RAG 답변 + 검색 컨텍스트)
eval_data = {
    "question": [
        "Docker에서 GPU를 사용하려면 무엇이 필요한가?",
        "Qdrant의 HNSW 파라미터 m의 역할은?",
    ],
    "answer": [
        "NVIDIA Container Toolkit을 설치하고 docker run --gpus all 옵션을 사용해야 합니다.",
        "m 파라미터는 HNSW 그래프에서 각 노드의 연결 수를 결정하며, 높을수록 검색 정확도가 올라가지만 메모리를 더 많이 사용합니다.",
    ],
    "contexts": [
        ["NVIDIA Container Toolkit은 Docker 컨테이너에서 GPU를 사용하기 위한 런타임입니다..."],
        ["HNSW 알고리즘의 m 파라미터는 각 노드가 유지하는 최대 연결 수입니다..."],
    ],
    "ground_truth": [
        "NVIDIA Container Toolkit 설치 필요",
        "m은 각 노드의 연결 수로 정확도와 메모리 트레이드오프",
    ],
}

dataset = Dataset.from_dict(eval_data)
result  = evaluate(
    dataset,
    metrics=[
        faithfulness,
        answer_relevancy,
        context_precision,
        context_recall,
    ],
)
print(result)
# 출력 예시:
# {'faithfulness': 0.89, 'answer_relevancy': 0.84,
#  'context_precision': 0.76, 'context_recall': 0.81}

🛡️ 할루시네이션 억제 전략 5가지

1
시스템 프롬프트에 강력한 제약 삽입
“제공된 컨텍스트에 없는 정보는 절대 사용하지 마세요. 모르면 ‘정보를 찾을 수 없습니다’라고 답하세요.” 이 한 줄이 할루시네이션을 30~50% 감소시킵니다.
2
점수 임계값(Score Threshold) 설정
Qdrant 검색 시 score_threshold=0.5 이상인 결과만 사용. 관련 없는 문서를 컨텍스트로 주는 것이 오히려 없는 것보다 나쁠 수 있습니다.
3
컨텍스트 압축 (LLM Extractor)
검색된 청크에서 질문과 관련된 부분만 LLM으로 추출. 긴 컨텍스트가 오히려 LLM을 혼란시키는 “Lost in the Middle” 문제를 방지.
4
출처 인용 강제화
프롬프트에 “답변에서 각 사실마다 [출처: 문서명]을 반드시 표시하세요”를 추가. AI가 출처를 명시하면 근거 없는 내용을 쉽게 감지할 수 있습니다.
5
Guardrails — 답변 후처리 검증
생성된 답변의 각 사실 주장을 검색 컨텍스트와 다시 비교하는 별도의 검증 LLM을 추가. NeMo Guardrails, Guidance 라이브러리 활용.
SECTION07

Grafana + Prometheus GPU 모니터링

AI 서버는 24시간 GPU를 풀로 돌리는 경우가 많습니다. VRAM이 조용히 누수되거나 GPU 온도가 위험 수준에 도달해도 모르고 있다면 하드웨어 손상이나 서비스 중단으로 이어질 수 있습니다. Grafana 대시보드로 모든 지표를 실시간 시각화하고 이상 감지 시 알림을 받습니다.

yaml — ~/ai-server/monitoring/docker-compose.yml
services:

  prometheus:
    image: prom/prometheus:latest
    container_name: prometheus
    restart: unless-stopped
    ports:
      - "9090:9090"
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml
      - prometheus_data:/prometheus
    command:
      - '--config.file=/etc/prometheus/prometheus.yml'
      - '--storage.tsdb.retention.time=30d'
      - '--web.enable-lifecycle'
    networks:
      - ai_net

  grafana:
    image: grafana/grafana:latest
    container_name: grafana
    restart: unless-stopped
    ports:
      - "3002:3000"
    volumes:
      - grafana_data:/var/lib/grafana
      - ./grafana/dashboards:/etc/grafana/provisioning/dashboards
      - ./grafana/datasources:/etc/grafana/provisioning/datasources
    environment:
      - GF_SECURITY_ADMIN_PASSWORD=secure_grafana_password
      - GF_USERS_ALLOW_SIGN_UP=false
      - GF_SMTP_ENABLED=true                    # 이메일 알림
      - GF_SMTP_HOST=smtp.gmail.com:587
    networks:
      - ai_net

  # NVIDIA GPU 메트릭 수집기
  nvidia-exporter:
    image: utkuozdemir/nvidia_gpu_exporter:1.2.0
    container_name: nvidia_exporter
    restart: unless-stopped
    ports:
      - "9835:9835"
    devices:
      - /dev/nvidiactl
      - /dev/nvidia0
    volumes:
      - /usr/lib/x86_64-linux-gnu/libnvidia-ml.so.1:/usr/lib/x86_64-linux-gnu/libnvidia-ml.so.1
    networks:
      - ai_net

  # 시스템 메트릭 수집기 (CPU/RAM/Disk/Network)
  node-exporter:
    image: prom/node-exporter:latest
    container_name: node_exporter
    restart: unless-stopped
    ports:
      - "9100:9100"
    volumes:
      - /proc:/host/proc:ro
      - /sys:/host/sys:ro
      - /:/rootfs:ro
    command:
      - '--path.procfs=/host/proc'
      - '--path.sysfs=/host/sys'
      - '--collector.filesystem.ignored-mount-points=^/(sys|proc|dev|host|etc)($$|/)'
    networks:
      - ai_net

  # Docker 컨테이너 메트릭
  cadvisor:
    image: gcr.io/cadvisor/cadvisor:latest
    container_name: cadvisor
    restart: unless-stopped
    ports:
      - "8080:8080"
    volumes:
      - /:/rootfs:ro
      - /var/run:/var/run:ro
      - /sys:/sys:ro
      - /var/lib/docker/:/var/lib/docker:ro
    networks:
      - ai_net

volumes:
  prometheus_data:
  grafana_data:

networks:
  ai_net:
    external: true
    name: ai_network
yaml — prometheus.yml (스크레이프 설정)
global:
  scrape_interval:     15s
  evaluation_interval: 15s

rule_files:
  - "alert_rules.yml"

alerting:
  alertmanagers:
    - static_configs:
        - targets: ['alertmanager:9093']

scrape_configs:
  - job_name: 'nvidia-gpu'
    static_configs:
      - targets: ['nvidia-exporter:9835']

  - job_name: 'node'
    static_configs:
      - targets: ['node-exporter:9100']

  - job_name: 'docker-containers'
    static_configs:
      - targets: ['cadvisor:8080']

  - job_name: 'ollama'
    metrics_path: '/api/metrics'
    static_configs:
      - targets: ['ollama:11434']

🚨 핵심 알림 규칙 설정

yaml — alert_rules.yml (GPU + 시스템 알림)
groups:
  - name: ai_server_alerts
    rules:

      # GPU 온도 경고 (85도 이상)
      - alert: GPUTemperatureHigh
        expr: nvidia_smi_temperature_gpu > 85
        for: 2m
        labels:
          severity: warning
        annotations:
          summary: "GPU 온도 위험 수준"
          description: "GPU 온도가 {{ $value }}°C에 도달했습니다. 냉각 확인 필요."

      # VRAM 사용률 95% 이상
      - alert: GPUMemoryCritical
        expr: (nvidia_smi_memory_used_bytes / nvidia_smi_memory_total_bytes) * 100 > 95
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "VRAM 거의 소진"
          description: "VRAM 사용률이 {{ $value }}%입니다."

      # RAM 사용률 90% 이상
      - alert: HighMemoryUsage
        expr: (1 - (node_memory_MemAvailable_bytes / node_memory_MemTotal_bytes)) * 100 > 90
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "시스템 RAM 부족"
          description: "메모리 사용률이 {{ $value }}%입니다."

      # 디스크 사용률 85% 이상
      - alert: DiskSpaceLow
        expr: (1 - (node_filesystem_free_bytes{mountpoint="/"} / node_filesystem_size_bytes{mountpoint="/"})) * 100 > 85
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "디스크 공간 부족"
          description: "루트 파티션 사용률이 {{ $value }}%입니다."

      # 컨테이너 재시작 감지
      - alert: ContainerRestarting
        expr: rate(container_last_seen{name!=""}[5m]) > 0 AND changes(container_last_seen{name!=""}[10m]) > 2
        labels:
          severity: warning
        annotations:
          summary: "컨테이너 {{ $labels.name }} 반복 재시작 감지"

📊 Grafana 핵심 대시보드 패널 구성

패널명PromQL 쿼리시각화
GPU 사용률nvidia_smi_utilization_gpu_ratio * 100Gauge + Time Series
GPU 온도nvidia_smi_temperature_gpuGauge (임계값 색상)
VRAM 사용량nvidia_smi_memory_used_bytes / 1024^3Bar Gauge
GPU 전력 소비nvidia_smi_power_draw_wattsTime Series
CPU 사용률100 - avg(rate(node_cpu_seconds_total{mode="idle"}[1m])) * 100Time Series
RAM 사용량(node_memory_MemTotal_bytes - node_memory_MemAvailable_bytes) / 1024^3Time Series
Ollama 활성 모델ollama_models_loaded_totalStat Panel
컨테이너 상태count(container_last_seen) by (name)Table
SECTION08

AI 서버 트러블슈팅 완전 매뉴얼

AI 서버에서 발생하는 대부분의 문제는 패턴이 있습니다. 증상별로 원인과 해결 방법을 체계적으로 정리했습니다. 이 섹션을 북마크해두고 문제 발생 시 바로 참조하세요.

🎮 GPU 관련 문제

에러 1 — CUDA out of memory torch.cuda.OutOfMemoryError: CUDA out of memory. Tried to allocate 2.00 GiB
진단
VRAM 사용 현황 확인
nvidia-smi → 어떤 프로세스가 VRAM을 점유 중인지 확인. Ollama + ComfyUI + A1111 동시 실행 시 VRAM 합산 초과 자주 발생.
해결
서비스 우선순위 조정
이미지 생성 중에는 Ollama 큰 모델 언로드: docker exec ollama ollama stop qwen2.5:14b. 또는 ComfyUI에서 “Free Memory” 버튼 클릭 후 재실행.
에러 2 — GPU not detected in Docker Error: could not select device driver “” with capabilities: [[gpu]]
bash — GPU Docker 인식 문제 진단 스크립트
#!/bin/bash
# AI 서버 GPU 진단 스크립트
echo "=== GPU 드라이버 확인 ==="
nvidia-smi && echo "✅ 드라이버 정상" || echo "❌ 드라이버 없음"

echo "=== NVIDIA Container Toolkit 확인 ==="
nvidia-ctk --version && echo "✅ Toolkit 설치됨" || echo "❌ Toolkit 미설치"

echo "=== Docker 런타임 설정 확인 ==="
cat /etc/docker/daemon.json | python3 -m json.tool

echo "=== Docker GPU 직접 테스트 ==="
docker run --rm --gpus all ubuntu nvidia-smi && echo "✅ GPU 정상" || echo "❌ GPU 실패"

echo "=== 해결 시도 ==="
sudo nvidia-ctk runtime configure --runtime=docker
sudo systemctl restart docker
echo "Docker 재시작 완료. 다시 테스트하세요."

🤖 Ollama / LLM 관련 문제

증상원인해결
추론 속도가 갑자기 10배 느려짐VRAM 초과로 CPU 폴백ollama ps로 확인 → 더 작은 모델 사용
모델이 응답 중 멈춤컨텍스트 길이 초과Open WebUI 채팅 새로 시작 또는 num_ctx 줄이기
API 타임아웃 오류Nginx 타임아웃 설정 부족proxy_read_timeout 300s로 늘리기
모델 다운로드 중 끊김디스크 공간 부족 또는 네트워크df -h 확인 → docker system prune
한국어 답변 품질 저하임베딩 모델이 한국어 비최적화bge-m3 또는 ko-sroberta로 교체

🗄️ Qdrant / RAG 관련 문제

증상원인해결
RAG 검색이 관련 없는 문서 반환청크 크기 부적절 또는 임베딩 모델 불일치청킹 전략 재검토, 임베딩 모델 일치 확인
Qdrant 404 컬렉션 없음 오류컬렉션 미생성 또는 이름 불일치GET /collections로 목록 확인 후 재생성
임베딩 후 검색 품질 급락임베딩 모델 버전 변경으로 벡터 불일치컬렉션 삭제 후 동일 모델로 전체 재임베딩
Qdrant 메모리 사용량 급증인덱스 미최적화 상태POST /collections/{name}/index로 강제 최적화
score_threshold 이하 결과만 반환임베딩 모델-컬렉션 차원 불일치벡터 차원 확인: GET /collections/{name}

🐳 Docker / 컨테이너 문제

bash — AI 서버 전체 상태 진단 스크립트
#!/bin/bash
# ~/ai-server/scripts/health_check.sh

echo "╔═══════════════════════════════════════╗"
echo "║       AI 서버 헬스체크 리포트          ║"
echo "╚═══════════════════════════════════════╝"
echo ""

# GPU 상태
echo "🎮 GPU 상태"
nvidia-smi --query-gpu=name,temperature.gpu,utilization.gpu,memory.used,memory.total \
  --format=csv,noheader | awk -F',' '{
    printf "  모델: %s | 온도: %s°C | GPU: %s | VRAM: %s / %s\n",
    $1, $2, $3, $4, $5
}'
echo ""

# 컨테이너 상태
echo "🐳 컨테이너 상태"
docker ps --format "  {{.Names}}\t{{.Status}}\t{{.Ports}}" | column -t
echo ""

# 디스크 사용량
echo "💾 디스크 사용량"
df -h / /home 2>/dev/null | grep -v Filesystem | awk '{printf "  %s: %s / %s (%s)\n", $6, $3, $2, $5}'
echo ""

# 메모리
echo "🧠 메모리"
free -h | awk '/^Mem/{printf "  사용: %s / 전체: %s (여유: %s)\n", $3, $2, $7}'
echo ""

# 핵심 서비스 응답 확인
echo "🌐 서비스 응답 확인"
services=(
  "Ollama:http://localhost:11434/api/tags"
  "OpenWebUI:http://localhost:3000/health"
  "Qdrant:http://localhost:6333/healthz"
  "n8n:http://localhost:5678/healthz"
)
for svc in "${services[@]}"; do
  name="${svc%%:*}"
  url="${svc#*:}"
  code=$(curl -s -o /dev/null -w "%{http_code}" --connect-timeout 3 "$url")
  [ "$code" = "200" ] && status="✅" || status="❌"
  echo "  $status $name ($code)"
done

echo ""
echo "완료: $(date '+%Y-%m-%d %H:%M:%S')"

✅ 프로덕션 운영 최종 체크리스트

  • Qdrant 스냅샷 자동 백업 (cron 매일 새벽 3시)
  • Docker 볼륨 전체 백업 스크립트 설정
  • Grafana 알림 — Slack/텔레그램/이메일 연동 완료
  • GPU 온도 임계값 알림 설정 (85°C 경고, 90°C 긴급)
  • RAGAS 정기 평가 — 월 1회 RAG 품질 수치 측정
  • Fail2ban 차단 현황 주간 점검
  • Docker 이미지 월간 업데이트 (docker compose pull)
  • AI 모델 업데이트 주시 — Ollama 새 모델 릴리즈 추적
  • !NVIDIA 드라이버 업데이트 주의 — 반드시 테스트 환경 먼저 검증
  • !임베딩 모델 교체 시 반드시 전체 벡터 재인덱싱 필요
  • !Open WebUI API 키가 노출되지 않도록 환경변수 관리
SECTIONE1

AI 데이터 파이프라인 아키텍처 설계

현대적 데이터 파이프라인은 단순히 데이터를 옮기는 것이 아닙니다. AI가 데이터의 의미를 이해하고 정제·분류·분석하는 인텔리전트 파이프라인이 핵심입니다. 모든 구성요소를 AI 서버 내 Docker로 운영해 외부 데이터 유출 없이 완전한 프라이빗 환경을 유지합니다.

🏗️ 전체 파이프라인 플로우

🌐
수집
API·스크래핑·RSS
🧹
정제
AI ETL·정규화
💾
저장
PostgreSQL·Qdrant
🤖
AI 분석
LLM·분류·요약
📊
시각화
Grafana·대시보드
🔔
알림
Slack·이메일

🛠️ 도구 선택 매트릭스

레이어경량 (소규모)중간 (팀 운영)대형 (엔터프라이즈)
오케스트레이션n8n 스케줄Apache AirflowPrefect / Dagster
데이터 수집Python requestsScrapy + PlaywrightApache Kafka
변환 (ETL)pandasdbt + pandasApache Spark
저장소 (OLTP)SQLitePostgreSQL + TimescaleDBSnowflake
벡터 저장ChromaDBQdrantPinecone
BI 시각화GrafanaGrafana + MetabaseTableau
AI 분석OllamavLLM + LangChainvLLM 클러스터
SECTIONE2

다중 소스 데이터 자동 수집

python — 다중 소스 통합 수집기 (뉴스·RSS·API·웹)
import asyncio, httpx, feedparser
from datetime import datetime, timezone
from typing import AsyncGenerator
import psycopg2

DB_CONFIG = {
    "host": "192.168.1.253", "port": 5432,
    "dbname": "aiserver_main",
    "user": "aiserver", "password": "secure_password"
}

async def collect_rss_feeds(feeds: list[str]) -> list[dict]:
    """RSS/Atom 피드 병렬 수집"""
    async def fetch_feed(url: str) -> list[dict]:
        try:
            async with httpx.AsyncClient(timeout=15) as client:
                resp = await client.get(url)
            parsed = feedparser.parse(resp.text)
            items = []
            for entry in parsed.entries[:20]:
                items.append({
                    "title":       entry.get("title", ""),
                    "content":     entry.get("summary", entry.get("description", "")),
                    "url":         entry.get("link", ""),
                    "published":   entry.get("published", datetime.now(timezone.utc).isoformat()),
                    "source":      parsed.feed.get("title", url),
                    "source_type": "rss",
                })
            return items
        except Exception as e:
            print(f"RSS 수집 실패 {url}: {e}")
            return []

    results = await asyncio.gather(*[fetch_feed(url) for url in feeds])
    return [item for sublist in results for item in sublist]

async def collect_api_data(endpoints: list[dict]) -> list[dict]:
    """REST API 엔드포인트 데이터 수집"""
    results = []
    async with httpx.AsyncClient(timeout=30) as client:
        for ep in endpoints:
            try:
                resp = await client.get(
                    ep["url"],
                    headers=ep.get("headers", {}),
                    params=ep.get("params", {})
                )
                data = resp.json()
                # 각 API별 데이터 정규화
                items = ep.get("parser", lambda x: x)(data)
                for item in items:
                    item["source_type"] = "api"
                    item["source"]      = ep.get("name", ep["url"])
                results.extend(items)
            except Exception as e:
                print(f"API 수집 실패 {ep['url']}: {e}")
    return results

async def collect_web_content(urls: list[str]) -> list[dict]:
    """웹페이지 본문 추출 (Playwright)"""
    try:
        from playwright.async_api import async_playwright
    except ImportError:
        print("pip install playwright && playwright install chromium")
        return []

    results = []
    async with async_playwright() as pw:
        browser = await pw.chromium.launch(headless=True)
        context = await browser.new_context(
            user_agent="Mozilla/5.0 (compatible; AIServerBot/1.0)"
        )
        for url in urls:
            try:
                page = await context.new_page()
                await page.goto(url, wait_until="domcontentloaded", timeout=30000)
                title   = await page.title()
                content = await page.inner_text("article, main, .content, body")
                results.append({
                    "title": title, "content": content[:5000],
                    "url": url, "source_type": "web",
                    "collected_at": datetime.now(timezone.utc).isoformat()
                })
                await page.close()
            except Exception as e:
                print(f"웹 수집 실패 {url}: {e}")
        await browser.close()
    return results

def save_to_postgres(items: list[dict], table: str = "raw_data"):
    """수집 데이터 PostgreSQL 저장 (중복 URL 무시)"""
    conn = psycopg2.connect(**DB_CONFIG)
    cur  = conn.cursor()

    # 테이블 자동 생성
    cur.execute(f"""
        CREATE TABLE IF NOT EXISTS {table} (
            id          SERIAL PRIMARY KEY,
            title       TEXT,
            content     TEXT,
            url         TEXT UNIQUE,
            source      TEXT,
            source_type TEXT,
            published   TIMESTAMPTZ,
            collected_at TIMESTAMPTZ DEFAULT NOW(),
            processed   BOOLEAN DEFAULT FALSE,
            ai_category TEXT,
            ai_sentiment TEXT,
            ai_summary  TEXT
        )
    """)

    inserted = 0
    for item in items:
        try:
            cur.execute(f"""
                INSERT INTO {table} (title, content, url, source, source_type, published)
                VALUES (%s, %s, %s, %s, %s, %s)
                ON CONFLICT (url) DO NOTHING
            """, (
                item.get("title", ""),
                item.get("content", "")[:10000],
                item.get("url", ""),
                item.get("source", ""),
                item.get("source_type", ""),
                item.get("published", None),
            ))
            if cur.rowcount > 0:
                inserted += 1
        except Exception as e:
            print(f"저장 오류: {e}")

    conn.commit()
    cur.close()
    conn.close()
    print(f"✅ {inserted}개 신규 데이터 저장 (총 {len(items)}개 중)")
    return inserted

# 실행 예시
async def main():
    RSS_FEEDS = [
        "https://feeds.feedburner.com/TechCrunch",
        "https://www.technologyreview.com/feed/",
        "https://rss.arxiv.org/rss/cs.AI",
    ]
    print("📡 RSS 수집 중...")
    rss_items = await collect_rss_feeds(RSS_FEEDS)
    print(f"  수집: {len(rss_items)}건")
    save_to_postgres(rss_items, "raw_data")

asyncio.run(main())
SECTIONE3

AI 기반 데이터 정제 & 분류 ETL

수집된 원시 데이터(Raw Data)를 LLM이 직접 분류·정제하는 AI-powered ETL입니다. 기존 ETL이 규칙(Rule)에 의존한다면, AI ETL은 의미(Semantics)를 이해합니다. 뉴스 기사의 감성, 토픽, 핵심 엔티티를 자동으로 추출해 구조화된 데이터로 변환합니다.

python — AI ETL 배치 처리 (LLM 분류 + DB 업데이트)
import httpx, json, asyncio, psycopg2
from psycopg2.extras import RealDictCursor

OLLAMA_URL = "http://192.168.1.253:11434"
DB_CONFIG  = {"host":"192.168.1.253","port":5432,"dbname":"aiserver_main",
               "user":"aiserver","password":"secure_password"}

async def ai_classify_batch(items: list[dict]) -> list[dict]:
    """
    LLM 배치 분류: 카테고리·감성·요약·키워드·중요도 동시 추출
    배치로 처리해 API 호출 횟수 최소화
    """
    if not items:
        return []

    # 배치 프롬프트 구성 (한 번에 최대 10개)
    batch_text = "\n\n".join([
        f"[{i+1}] 제목: {item['title'][:200]}\n내용: {item['content'][:500]}"
        for i, item in enumerate(items[:10])
    ])

    prompt = f"""다음 {len(items[:10])}개의 데이터를 분석하고 JSON 배열로 출력해.
각 항목에 대해 정확히 다음 형식으로:

{batch_text}

출력 형식 (JSON 배열만, 다른 텍스트 없이):
[
  {{
    "index": 1,
    "category": "기술/비즈니스/정치/경제/스포츠/문화/과학/기타 중 하나",
    "subcategory": "세부 카테고리",
    "sentiment": "positive/negative/neutral",
    "sentiment_score": 0.0~1.0 사이 숫자,
    "importance": "high/medium/low",
    "summary": "한국어 1~2문장 요약",
    "keywords": ["키워드1", "키워드2", "키워드3"],
    "entities": {{"companies": [], "people": [], "locations": []}},
    "is_duplicate_risk": true/false
  }}
]"""

    async with httpx.AsyncClient(timeout=120) as client:
        resp = await client.post(
            f"{OLLAMA_URL}/api/generate",
            json={
                "model":  "qwen2.5:14b",
                "prompt": prompt,
                "stream": False,
                "options": {"temperature": 0.1}
            }
        )

    raw = resp.json()["response"]
    try:
        import re
        json_match = re.search(r'\[.*\]', raw, re.DOTALL)
        results = json.loads(json_match.group()) if json_match else []
        return results
    except Exception as e:
        print(f"JSON 파싱 오류: {e}")
        return []

def update_processed_data(results: list[dict], ids: list[int]):
    """분류 결과를 DB에 업데이트"""
    conn = psycopg2.connect(**DB_CONFIG)
    cur  = conn.cursor()
    for r in results:
        idx = r.get("index", 1) - 1
        if idx < len(ids):
            cur.execute("""
                UPDATE raw_data SET
                    processed    = TRUE,
                    ai_category  = %s,
                    ai_sentiment = %s,
                    ai_summary   = %s
                WHERE id = %s
            """, (
                f"{r.get('category','')}/{r.get('subcategory','')}",
                r.get("sentiment", "neutral"),
                r.get("summary", ""),
                ids[idx]
            ))
    conn.commit(); cur.close(); conn.close()

async def run_etl_pipeline(batch_size: int = 10):
    """미처리 데이터를 배치로 AI 분류"""
    conn = psycopg2.connect(**DB_CONFIG)
    cur  = conn.cursor(cursor_factory=RealDictCursor)
    cur.execute("""
        SELECT id, title, content FROM raw_data
        WHERE processed = FALSE
        ORDER BY collected_at DESC
        LIMIT %s
    """, (batch_size,))
    rows = cur.fetchall()
    cur.close(); conn.close()

    if not rows:
        print("✅ 처리할 데이터 없음")
        return

    print(f"🤖 AI 분류 시작: {len(rows)}건")
    ids   = [r["id"] for r in rows]
    items = [{"title": r["title"], "content": r["content"]} for r in rows]

    results = await ai_classify_batch(items)
    if results:
        update_processed_data(results, ids)
        print(f"✅ {len(results)}건 분류 완료")

    # 벡터 DB 동시 인덱싱 (RAG 검색용)
    await index_to_qdrant(rows)

async def index_to_qdrant(rows: list[dict]):
    """처리된 데이터를 Qdrant에 동시 인덱싱"""
    from qdrant_client import QdrantClient
    from qdrant_client.models import PointStruct
    from sentence_transformers import SentenceTransformer
    import uuid

    embedder = SentenceTransformer("BAAI/bge-m3")
    qdrant   = QdrantClient(host="192.168.1.253", port=6333, api_key="your_key")

    texts   = [f"{r['title']} {r['content'][:500]}" for r in rows]
    vectors = embedder.encode(texts, normalize_embeddings=True).tolist()

    points = [
        PointStruct(
            id=str(uuid.uuid4()),
            vector=vectors[i],
            payload={
                "title":   rows[i]["title"],
                "content": rows[i]["content"][:500],
                "db_id":   rows[i]["id"],
                "type":    "pipeline_data"
            }
        )
        for i in range(len(rows))
    ]
    qdrant.upsert(collection_name="ai_knowledge_base", points=points)
    print(f"  📌 Qdrant 인덱싱: {len(points)}건")

asyncio.run(run_etl_pipeline(batch_size=50))
SECTIONE4

Apache Airflow — 파이프라인 오케스트레이션

여러 파이프라인이 서로 의존하고 실패 시 재시도가 필요해지면 n8n만으로는 한계가 있습니다. Apache Airflow는 DAG(Directed Acyclic Graph) 방식으로 복잡한 데이터 파이프라인을 시각적으로 관리합니다.

yaml — Airflow Docker Compose (경량 설치)
services:
  airflow-webserver:
    image: apache/airflow:2.10.0
    container_name: airflow_web
    restart: unless-stopped
    ports:
      - "8085:8080"
    environment:
      - AIRFLOW__CORE__EXECUTOR=LocalExecutor
      - AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=postgresql+psycopg2://airflow:airflow@airflow_postgres/airflow
      - AIRFLOW__CORE__FERNET_KEY=your_fernet_key_here
      - AIRFLOW__WEBSERVER__SECRET_KEY=your_secret_key
      - AIRFLOW__CORE__LOAD_EXAMPLES=False
      - _AIRFLOW_WWW_USER_CREATE=true
      - _AIRFLOW_WWW_USER_USERNAME=admin
      - _AIRFLOW_WWW_USER_PASSWORD=admin_password
    volumes:
      - ./dags:/opt/airflow/dags         # DAG 파일 위치
      - ./logs:/opt/airflow/logs
      - ./plugins:/opt/airflow/plugins
    command: webserver
    depends_on:
      - airflow_postgres
    networks:
      - ai_net

  airflow-scheduler:
    image: apache/airflow:2.10.0
    container_name: airflow_scheduler
    restart: unless-stopped
    environment:
      - AIRFLOW__CORE__EXECUTOR=LocalExecutor
      - AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=postgresql+psycopg2://airflow:airflow@airflow_postgres/airflow
      - AIRFLOW__CORE__FERNET_KEY=your_fernet_key_here
    volumes:
      - ./dags:/opt/airflow/dags
      - ./logs:/opt/airflow/logs
    command: scheduler
    depends_on:
      - airflow_postgres
    networks:
      - ai_net

  airflow_postgres:
    image: postgres:16-alpine
    container_name: airflow_postgres
    environment:
      - POSTGRES_DB=airflow
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
    volumes:
      - airflow_pg_data:/var/lib/postgresql/data
    networks:
      - ai_net

volumes:
  airflow_pg_data:

networks:
  ai_net:
    external: true
    name: ai_network
python — ./dags/ai_data_pipeline_dag.py
from airflow import DAG from airflow.operators.python import PythonOperator from airflow.operators.bash import BashOperator from datetime import datetime, timedelta default_args = { “owner”: “ai-server”, “depends_on_past”: False, “start_date”: datetime(2026, 1, 1), “retries”: 2, “retry_delay”: timedelta(minutes=5), “email_on_failure”: False, } def collect_task(**context): “””데이터 수집 태스크””” import asyncio, sys sys.path.insert(0, ‘/opt/airflow/plugins’) from collectors import collect_rss_feeds, save_to_postgres RSS_FEEDS = [ “https://feeds.feedburner.com/TechCrunch”, “https://www.technologyreview.com/feed/”, ] items = asyncio.run(collect_rss_feeds(RSS_FEEDS)) count = save_to_postgres(items, “raw_data”) # XCom으로 다음 태스크에 개수 전달 context[“task_instance”].xcom_push(key=”collected_count”, value=count) return count def etl_task(**context): “””AI ETL 정제 태스크””” import asyncio, sys sys.path.insert(0, ‘/opt/airflow/plugins’) from etl_processor import run_etl_pipeline count = asyncio.run(run_etl_pipeline(batch_size=100)) return count def report_task(**context): “””일일 리포트 생성 및 슬랙 알림””” import httpx ti = context[“task_instance”] collected = ti.xcom_pull(key=”collected_count”, task_ids=”collect_data”) msg = (f”📊 *AI 데이터 파이프라인 일일 리포트*\n” f”- 수집: {collected}건\n” f”- 날짜: {context[‘ds’]}\n” f”- 상태: ✅ 정상 완료”) httpx.post(“https://hooks.slack.com/services/YOUR/SLACK/WEBHOOK”, json={“text”: msg}) with DAG( “ai_data_pipeline”, default_args=default_args, description=”AI 기반 자동 데이터 파이프라인”, schedule_interval=”0 */6 * * *”, # 6시간마다 catchup=False, tags=[“ai”, “data”, “pipeline”], ) as dag: t1 = PythonOperator(task_id=”collect_data”, python_callable=collect_task) t2 = PythonOperator(task_id=”etl_classify”, python_callable=etl_task) t3 = PythonOperator(task_id=”send_report”, python_callable=report_task) t4 = BashOperator( task_id=”cleanup_old_data”, bash_command=”””psql postgresql://aiserver:[email protected]/aiserver_main \ -c “DELETE FROM raw_data WHERE collected_at < NOW() - INTERVAL '30 days';" """ ) # 실행 순서: 수집 → ETL → 리포트 & 정리 (병렬) t1 >> t2 >> [t3, t4]
SECTIONE5

Grafana BI 대시보드 — 비즈니스 인사이트 시각화

Grafana는 GPU 모니터링뿐 아니라 비즈니스 KPI 대시보드에도 탁월합니다. PostgreSQL 데이터소스를 연결하면 파이프라인이 수집한 데이터를 실시간으로 시각화하고, AI가 이상 패턴을 감지하면 즉시 알림을 보냅니다.

📊 핵심 BI 패널 구성 (PostgreSQL 쿼리)

sql — Grafana 패널용 PostgreSQL 쿼리 모음
-- 패널 1: 시간대별 수집 건수 (Time Series)
SELECT
    date_trunc('hour', collected_at) AS time,
    source,
    COUNT(*) AS count
FROM raw_data
WHERE collected_at >= NOW() - INTERVAL '7 days'
GROUP BY 1, 2
ORDER BY 1;

-- 패널 2: 카테고리별 분포 (Pie Chart)
SELECT
    ai_category,
    COUNT(*) AS count,
    ROUND(COUNT(*) * 100.0 / SUM(COUNT(*)) OVER (), 1) AS percentage
FROM raw_data
WHERE processed = TRUE
  AND collected_at >= NOW() - INTERVAL '24 hours'
GROUP BY ai_category
ORDER BY count DESC;

-- 패널 3: 감성 분석 트렌드 (Time Series)
SELECT
    date_trunc('hour', collected_at) AS time,
    ai_sentiment,
    COUNT(*) AS count
FROM raw_data
WHERE processed = TRUE
  AND collected_at >= NOW() - INTERVAL '48 hours'
GROUP BY 1, 2
ORDER BY 1;

-- 패널 4: 최근 중요 뉴스 (Table)
SELECT
    title,
    source,
    ai_category,
    ai_sentiment,
    LEFT(ai_summary, 100) || '...' AS summary,
    collected_at
FROM raw_data
WHERE processed = TRUE
  AND ai_category ILIKE '%기술%'
ORDER BY collected_at DESC
LIMIT 20;

-- 패널 5: 소스별 성과 (Bar Chart)
SELECT
    source,
    COUNT(*) AS total,
    COUNT(*) FILTER (WHERE ai_sentiment = 'positive') AS positive,
    COUNT(*) FILTER (WHERE ai_sentiment = 'negative') AS negative,
    ROUND(AVG(CASE WHEN ai_sentiment='positive' THEN 1.0
                   WHEN ai_sentiment='negative' THEN -1.0
                   ELSE 0 END), 3) AS avg_sentiment
FROM raw_data
WHERE processed = TRUE
  AND collected_at >= NOW() - INTERVAL '7 days'
GROUP BY source
ORDER BY total DESC
LIMIT 10;

🤖 AI 기반 Grafana 이상 감지 알림

python — AI 이상 감지 + Grafana 알림 연동
import psycopg2, httpx, asyncio
from psycopg2.extras import RealDictCursor
from datetime import datetime, timezone

DB_CONFIG  = {"host":"192.168.1.253","port":5432,"dbname":"aiserver_main",
               "user":"aiserver","password":"secure_password"}
OLLAMA_URL = "http://192.168.1.253:11434"
SLACK_URL  = "https://hooks.slack.com/services/YOUR/WEBHOOK"

async def detect_anomalies():
    """
    최근 1시간 데이터에서 이상 패턴 AI 감지
    - 급격한 감성 변화
    - 특정 키워드 급증
    - 데이터 수집 이상
    """
    conn = psycopg2.connect(**DB_CONFIG)
    cur  = conn.cursor(cursor_factory=RealDictCursor)

    # 최근 1시간 vs 이전 24시간 평균 비교
    cur.execute("""
        SELECT
            COUNT(*) FILTER (WHERE collected_at >= NOW() - INTERVAL '1 hour') AS last_hour,
            COUNT(*) FILTER (WHERE collected_at >= NOW() - INTERVAL '25 hours'
                              AND collected_at < NOW() - INTERVAL '1 hour') / 24.0 AS hourly_avg,
            COUNT(*) FILTER (WHERE ai_sentiment = 'negative'
                              AND collected_at >= NOW() - INTERVAL '1 hour') AS negative_count,
            COUNT(*) FILTER (WHERE collected_at >= NOW() - INTERVAL '1 hour') AS total_recent
        FROM raw_data
        WHERE processed = TRUE
    """)
    stats = cur.fetchone()
    cur.close(); conn.close()

    anomalies = []

    # 수집량 급감 (평균 대비 50% 이하)
    if stats["hourly_avg"] > 0:
        ratio = stats["last_hour"] / stats["hourly_avg"]
        if ratio < 0.5:
            anomalies.append(f"⚠️ 데이터 수집량 급감: 평균의 {ratio*100:.0f}% ({stats['last_hour']}건)")

    # 부정 감성 급증 (70% 초과)
    if stats["total_recent"] > 10:
        neg_ratio = stats["negative_count"] / stats["total_recent"]
        if neg_ratio > 0.7:
            anomalies.append(f"🚨 부정 감성 급증: {neg_ratio*100:.0f}% ({stats['negative_count']}건)")

    if not anomalies:
        return

    # LLM으로 이상 패턴 분석 리포트 생성
    anomaly_text = "\n".join(anomalies)
    async with httpx.AsyncClient(timeout=60) as client:
        resp = await client.post(
            f"{OLLAMA_URL}/api/generate",
            json={
                "model": "qwen2.5:7b",
                "prompt": f"""다음 데이터 이상 패턴을 분석하고 가능한 원인과 권장 조치를 제안해줘 (3~5문장):

이상 항목:
{anomaly_text}

현재 시간: {datetime.now(timezone.utc).isoformat()}""",
                "stream": False,
                "options": {"temperature": 0.3}
            }
        )
    analysis = resp.json()["response"]

    # Slack 알림 전송
    slack_msg = {
        "text": "🔍 *AI 서버 데이터 파이프라인 이상 감지*",
        "blocks": [
            {"type": "header", "text": {"type": "plain_text", "text": "🔍 데이터 이상 감지"}},
            {"type": "section", "text": {"type": "mrkdwn", "text": f"*감지 항목:*\n{anomaly_text}"}},
            {"type": "section", "text": {"type": "mrkdwn", "text": f"*AI 분석:*\n{analysis}"}},
        ]
    }
    await client.post(SLACK_URL, json=slack_msg)
    print(f"✅ 이상 감지 알림 발송: {len(anomalies)}건")

# cron 또는 n8n에서 30분마다 실행
asyncio.run(detect_anomalies())
SECTIONE6

실전 파이프라인 3종 완전 구현

📰 파이프라인 1 — 경쟁사 동향 자동 모니터링

python — 경쟁사 모니터링 → AI 요약 → 주간 리포트
import asyncio, httpx
from datetime import datetime

COMPETITORS = {
    "경쟁사A": ["https://competitor-a.com/blog/feed", "https://competitor-a.com/news/feed"],
    "경쟁사B": ["https://competitor-b.com/rss"],
}
OLLAMA_URL = "http://192.168.1.253:11434"

async def weekly_competitor_report():
    """매주 월요일 오전 8시 경쟁사 주간 동향 리포트"""
    from collectors import collect_rss_feeds, save_to_postgres

    all_items = []
    for company, feeds in COMPETITORS.items():
        items = await collect_rss_feeds(feeds)
        for item in items:
            item["competitor"] = company
        all_items.extend(items)

    save_to_postgres(all_items, "competitor_data")

    # 주간 요약 생성
    summaries = "\n\n".join([
        f"제목: {item['title']}\n내용: {item['content'][:300]}"
        for item in all_items[:20]
    ])

    async with httpx.AsyncClient(timeout=120) as client:
        resp = await client.post(
            f"{OLLAMA_URL}/api/generate",
            json={
                "model": "qwen2.5:14b",
                "prompt": f"""경쟁사 주간 동향을 분석해서 경영진 보고용 리포트를 작성해줘.

수집 데이터:
{summaries}

리포트 형식:
1. 핵심 요약 (3문장)
2. 경쟁사별 주요 동향
3. 우리가 주목해야 할 포인트
4. 권장 대응 방안""",
                "stream": False
            }
        )
    report = resp.json()["response"]
    print(f"주간 경쟁사 리포트:\n{report}")
    return report

asyncio.run(weekly_competitor_report())

💹 파이프라인 2 — 실시간 시장 데이터 수집 & AI 분석

python — 주식/코인 가격 + 뉴스 감성 연동 분석
import asyncio, httpx, psycopg2
from datetime import datetime, timezone

async def collect_market_data(symbols: list[str]) -> list[dict]:
    """무료 API로 시장 데이터 수집 (Yahoo Finance 비공식 API)"""
    results = []
    async with httpx.AsyncClient(timeout=15) as client:
        for symbol in symbols:
            try:
                resp = await client.get(
                    f"https://query1.finance.yahoo.com/v8/finance/chart/{symbol}",
                    params={"interval": "1d", "range": "1mo"},
                    headers={"User-Agent": "Mozilla/5.0"}
                )
                data   = resp.json()
                quotes = data["chart"]["result"][0]
                meta   = quotes["meta"]
                results.append({
                    "symbol":        symbol,
                    "price":         meta.get("regularMarketPrice", 0),
                    "change_pct":    meta.get("regularMarketChangePercent", 0),
                    "volume":        meta.get("regularMarketVolume", 0),
                    "currency":      meta.get("currency", "USD"),
                    "collected_at":  datetime.now(timezone.utc).isoformat(),
                })
            except Exception as e:
                print(f"시장 데이터 수집 실패 {symbol}: {e}")
    return results

async def ai_market_analysis(market_data: list[dict], news_sentiment: str) -> str:
    """시장 데이터 + 뉴스 감성 통합 AI 분석"""
    market_text = "\n".join([
        f"{d['symbol']}: ${d['price']:.2f} ({d['change_pct']:+.2f}%)"
        for d in market_data
    ])

    async with httpx.AsyncClient(timeout=60) as client:
        resp = await client.post(
            "http://192.168.1.253:11434/api/generate",
            json={
                "model": "qwen2.5:7b",
                "prompt": f"""현재 시장 데이터와 뉴스 감성을 종합 분석해줘:

시장 데이터:
{market_text}

최근 뉴스 감성: {news_sentiment}

분석 내용:
1. 현재 시장 분위기
2. 주목할 종목/섹터
3. 주의 신호
(투자 조언 아님, 정보 제공 목적)""",
                "stream": False,
                "options": {"temperature": 0.3}
            }
        )
    return resp.json()["response"]

async def run_market_pipeline():
    SYMBOLS = ["NVDA", "MSFT", "GOOGL", "META", "AMZN"]
    market_data = await collect_market_data(SYMBOLS)
    analysis    = await ai_market_analysis(market_data, "중립적")
    print(f"📈 시장 분석 완료:\n{analysis}")

asyncio.run(run_market_pipeline())

💬 파이프라인 3 — 고객 리뷰 감성 분석 자동화

python — 네이버·카카오·앱스토어 리뷰 자동 수집 + AI 분류
import asyncio, httpx, psycopg2

async def analyze_review_batch(reviews: list[str]) -> list[dict]:
    """리뷰 배치를 LLM으로 감성·이슈 분류"""
    batch = "\n\n".join([f"[{i+1}] {r[:300]}" for i, r in enumerate(reviews[:15])])

    async with httpx.AsyncClient(timeout=90) as client:
        resp = await client.post(
            "http://192.168.1.253:11434/api/generate",
            json={
                "model":  "qwen2.5:7b",
                "prompt": f"""다음 고객 리뷰들을 분석해서 JSON 배열로 출력해:

{batch}

JSON 배열 형식 (다른 텍스트 없이):
[
  {{
    "index": 1,
    "sentiment": "positive/negative/neutral",
    "score": 0.0~1.0,
    "issue_category": "배송/품질/가격/CS/사용성/기타",
    "key_complaint": "핵심 불만사항 또는 null",
    "key_praise": "핵심 칭찬 또는 null",
    "urgency": "high/medium/low",
    "requires_response": true/false
  }}
]""",
                "stream": False,
                "options": {"temperature": 0.1}
            }
        )

    import json, re
    raw = resp.json()["response"]
    try:
        json_match = re.search(r'\[.*\]', raw, re.DOTALL)
        return json.loads(json_match.group()) if json_match else []
    except:
        return []

async def run_review_pipeline():
    """
    실제 리뷰 수집은 스토어 API 또는 크롤링으로 구현
    여기서는 예시 데이터로 시연
    """
    sample_reviews = [
        "배송이 너무 빨라서 깜짝 놀랐어요! 제품 품질도 정말 좋습니다.",
        "포장이 너무 부실해서 제품이 파손되어 왔어요. 실망입니다.",
        "가격 대비 성능이 나쁘지 않네요. 그냥 평범합니다.",
        "고객센터 연결이 너무 어려워요. 30분 기다렸는데 연결이 안 됩니다.",
    ]

    results = await analyze_review_batch(sample_reviews)

    # 긴급 대응 필요 리뷰 필터링
    urgent = [r for r in results if r.get("requires_response") and r.get("urgency") == "high"]
    if urgent:
        print(f"🚨 긴급 대응 필요 리뷰: {len(urgent)}건")
        for r in urgent:
            print(f"  [{r['issue_category']}] {r.get('key_complaint', '')}")

    # 일별 감성 통계 DB 저장
    positive = sum(1 for r in results if r.get("sentiment") == "positive")
    negative = sum(1 for r in results if r.get("sentiment") == "negative")
    print(f"✅ 리뷰 분석: 긍정 {positive}건 / 부정 {negative}건 / 총 {len(results)}건")

asyncio.run(run_review_pipeline())
✅ 추가편 E 파이프라인 완성으로 가능해지는 것들
  • 6시간마다 AI 기술 뉴스 자동 수집 → LLM 분류·요약 → Grafana 대시보드 실시간 표시
  • 경쟁사 블로그 변경 감지 → 자동 분석 리포트 → 슬랙 주간 요약 발송
  • 고객 리뷰 급증 시 자동 감지 → 긴급 이슈 담당자 즉시 알림
  • 시장 데이터 + 뉴스 감성 통합 → AI 종합 분석 리포트 자동 생성
  • Airflow DAG로 모든 파이프라인 의존성 관리 + 실패 자동 재시도
← 이전 편
4편 — 자동화·Open WebUI·실전 워크플로우

Related Stories

Leave A Reply

Please enter your comment!
Please enter your name here

Stay on op - Ge the daily news in your inbox