분포 거리 측정 (Part 4) - 실무 적용과 Detection 시스템
· 약 11분
KL, JSD, Wasserstein을 실제 시스템에 적용하는 방법. Drift Detection, Anomaly Detection, Model Monitoring 구현 패턴과 실무 체크리스트.
들어가며
지난 세 편에서 KL Divergence, JSD, Wasserstein Distance의 이론적 배경을 살펴봤습니다. 이제 가장 중요한 질문이 남았습니다. "실제로 어떻게 쓰는가?"
이번 글에서는 분포 비교 기법을 실제 시스템에 적용하는 구체적인 패턴들을 다룹니다. Feature drift detection, anomaly detection, model monitoring 등 바로 활용할 수 있는 내용에 집중합니다.
1. 분포 비교가 필요한 실무 상황
주요 사용 사례
┌─────────────────────────────────────────────────────────┐
│ Distribution Comparison Use Cases │
├─────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Data │ │ Model │ │ Anomaly │ │
│ │ Drift │ │ Monitoring│ │ Detection │ │
│ │ │ │ │ │ │ │
│ │ Input │ │ Prediction │ │ Behavior │ │
│ │ Change │ │ Tracking │ │ Pattern │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
│ │
│ ┌─────────────┐ ┌─────────────┐ │
│ │ A/B Test │ │ Quality │ │
│ │ Analysis │ │ Control │ │
│ │ │ │ │ │
│ │ Group │ │ Batch │ │
│ │ Compare │ │ Compare │ │
│ └─────────────┘ └─── ──────────┘ │
│ │
└─────────────────────────────────────────────────────────┘
- Data Drift: 입력 분포 변화 감지
- Model Monitoring: 예측 분포 변화 추적
- Anomaly Detection: 행동 패턴 이상 탐지
- A/B Test Analysis: 그룹 간 분포 비교
- Quality Control: 배치 간 품질 비교
공통 패턴
대부분의 경우 다음 구조를 따릅니다:
1. Baseline 분포 구축 (정상 상태)
↓
2. 현재 분포 추정 (실시간 또는 배치)
↓
3. 거리/발산 계산
↓
4. Threshold 기반 판단
↓
5. 알림 또는 액션
2. Feature Distribution Drift Detection
문제 상황
ML 모델은 학습 데이터와 유사한 분포의 입력을 기대합니다. 시간이 지나면서 입력 데이터의 분포가 변하면(drift), 모델 성능이 저하될 수 있습니다.
Training time: Production time:
∩ ∩
_| |_ _| |_
────────────── ──────────────
Feature X Feature X
(mean: 100) (mean: 120)
↓
Drift detected! Retrain model?
- 학습 시점 평균: 100 → 운영 시점 평균: 120으로 분포 이동 (Drift 발생)
구현 패턴
class FeatureDriftDetector:
function initialize(n_bins, threshold):
self.n_bins = n_bins
self.threshold = threshold
self.baseline_histogram = null
self.bin_edges = null
function fit_baseline(baseline_data):
# Baseline 데이터로 히스토그램 생성
self.baseline_histogram, self.bin_edges = histogram(
baseline_data,
bins=self.n_bins,
density=true
)
# Smoothing (zero 방지)
self.baseline_histogram = self.baseline_histogram + 1e-10
self.baseline_histogram = normalize(self.baseline_histogram)
function check_drift(current_data):
# 현재 데이터의 히스토그램
current_histogram, _ = histogram(
current_data,
bins=self.bin_edges, # 동일한 bin 사용
density=true
)
current_histogram = current_histogram + 1e-10
current_histogram = normalize(current_histogram)
# JSD 계산 (대칭, bounded)
jsd_value = compute_jsd(self.baseline_histogram, current_histogram)
# 판단
return {
"jsd": jsd_value,
"is_drift": jsd_value > self.threshold,
"severity": classify_severity(jsd_value)
}
function classify_severity(jsd_value):
if jsd_value > 2 * self.threshold:
return "high"
else if jsd_value > self.threshold:
return "medium"
else:
return "low"
사용 예시
# 초기화 및 baseline 학습
detector = FeatureDriftDetector(n_bins=50, threshold=0.05)
detector.fit_baseline(training_data["feature_x"])
# 실시간 모니터링 루프
while true:
current_window = get_recent_data(window_size=1000)
result = detector.check_drift(current_window["feature_x"])
if result["is_drift"]:
alert(
message="Feature drift detected",
severity=result["severity"],
jsd_value=result["jsd"]
)
wait(interval=1_hour)
3. Behavioral Anomaly Detection
문제 상황
게임, 금융, 보안 등의 도메인에서 정상 사용자와 비정상 사용자(봇, 사기꾼, 해커)를 구분해야 합니다. 개별 행동보다 행동 패턴의 분포를 비교하는 것이 효과적입니다.
Normal player: Suspicious player:
Action interval dist: Action interval dist:
∩ │
_/ \_ │
_/ \_ │
─────────── ─────┴─────
50ms~500ms exactly 100ms
(natural variance) (mechanical precision)
- 정상 플레이어: 50ms~500ms 범위의 자연스러운 변동
- 의심 플레이어: 정확히 100ms의 기계적 정확성 (봇 의심)
구현 패턴
class BehaviorAnomalyDetector:
function initialize(feature_configs):
# feature_configs 예시:
# {
# "action_interval_ms": {"bins": 50},
# "click_position_variance": {"bins": 30},
# "session_duration": {"bins": 40}
# }
self.feature_configs = feature_configs
self.baseline_distributions = {}
function fit_baseline(normal_user_data):
# 정상 사용자 데이터로 각 feature의 baseline 분포 학습
for feature_name, config in self.feature_configs:
values = normal_user_data[feature_name]
histogram, edges = histogram(values, bins=config["bins"])
histogram = smooth_and_normalize(histogram)
self.baseline_distributions[feature_name] = {
"histogram": histogram,
"edges": edges
}
function score_user(user_data, window_size=100):
feature_scores = {}
for feature_name, config in self.feature_configs:
if length(user_data[feature_name]) < window_size:
continue
baseline = self.baseline_distributions[feature_name]
# 사용자의 최근 행동 분포
user_values = user_data[feature_name][-window_size:]
user_histogram, _ = histogram(
user_values,
bins=baseline["edges"]
)
user_histogram = smooth_and_normalize(user_histogram)
# JSD로 이상치 스코어 계산
jsd = compute_jsd(baseline["histogram"], user_histogram)
feature_scores[feature_name] = jsd
# 종합 스코어
if length(feature_scores) > 0:
final_score = mean(values(feature_scores))
return {
"anomaly_score": final_score,
"feature_scores": feature_scores,
"is_suspicious": final_score > 0.15,
"top_anomalous_features": get_top_features(feature_scores, n=3)
}
return null
실제 적용 시 고려사항
┌─────────────────────────────────────────────────────────┐
│ Behavioral Anomaly Detection Checklist │
├─────────────────────────────────────────────────────────┤
│ │
│ □ Feature Selection │
│ - Hard-to-manipulate features │
│ - Time-based (response time, intervals) │
│ - Pattern-based (sequence, combinations) │
│ │
│ □ Window Size │
│ - Too small: noise-sensitive │
│ - Too large: detection delay │
│ - Recommend: minimum for statistical significance │
│ │
│ □ Threshold Tuning │
│ - False Positive vs False Negative trade-off │
│ - Consult domain experts │
│ - Validate with A/B testing │
│ │
└─────────────────────────────────────────────────────────┘
체크리스트 설명:
- Feature 선택: 조작하기 어려운 feature, 시간/패턴 기반 feature 선택
- Window 크기: 너무 작으면 노이즈 민감, 너무 크면 감지 지연
- Threshold 튜닝: FP/FN 트레이드오프, 도메인 전문가 협의, A/B 테스트 검증
4. Model Prediction Monitoring
문제 상황
모델의 예측 결과 분포가 시간에 따라 변하면, 이는 데이터 drift나 모델 성능 저하의 신호일 수 있습니다.
Normal period: Anomaly period:
Class prediction ratio: Class prediction ratio:
Class A: ████████ 40% Class A: ██ 10%
Class B: ██████ 30% Class B: ██████████████ 70%
Class C: ██████ 30% Class C: ████ 20%
↓
Prediction drift detected!
- Actual data changed?
- Model issue?
- 정상 시기: 균형 잡힌 예측 분포 (40/30/30)
- 이상 시기: Class B에 편중된 예측 (10/70/20) → 데이터 변화 또는 모델 문제 의심
구현 패턴
class PredictionMonitor:
function initialize(n_classes, alert_threshold):
self.n_classes = n_classes
self.alert_threshold = alert_threshold
self.baseline_distribution = null
self.history = [] # 시간별 기록
function set_baseline(predictions, period="1_day"):
# 정상 기간의 예측 분포를 baseline으로
class_counts = count_by_class(predictions, self.n_classes)
self.baseline_distribution = normalize(class_counts)
self.baseline_distribution = smooth(self.baseline_distribution)
function monitor(current_predictions):
# 현재 예측 분포
current_counts = count_by_class(current_predictions, self.n_classes)
current_distribution = normalize(current_counts)
current_distribution = smooth(current_distribution)
# 여러 metric 계산
jsd = compute_jsd(self.baseline_distribution, current_distribution)
# 어떤 클래스가 가장 변했는지 분석
class_drift = current_distribution - self.baseline_distribution
most_increased = argmax(class_drift)
most_decreased = argmin(class_drift)
result = {
"jsd": jsd,
"alert": jsd > self.alert_threshold,
"most_increased_class": most_increased,
"most_decreased_class": most_decreased,
"class_changes": class_drift,
"timestamp": now()
}
# 히스토리 기록
self.history.append(result)
return result
function get_trend(window="7_days"):
# 시간에 따른 JSD 변화 추이
recent = filter(self.history, last=window)
return {
"jsd_values": [r["jsd"] for r in recent],
"timestamps": [r["timestamp"] for r in recent],
"alert_count": count(r for r in recent if r["alert"])
}
