llm-quant/app/llm/cost.py
2025-10-05 17:24:10 +08:00

239 lines
7.9 KiB
Python

"""LLM cost control and budget management."""
from __future__ import annotations
import json
import logging
import threading
import time
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional, Set
from .metrics import snapshot
LOGGER = logging.getLogger(__name__)
LOG_EXTRA = {"stage": "cost_control"}
@dataclass
class CostLimits:
"""Cost control limits configuration."""
hourly_budget: float # 每小时预算
daily_budget: float # 每日预算
monthly_budget: float # 每月预算
model_weights: Dict[str, float] = field(default_factory=dict) # 模型权重配置
@classmethod
def default(cls) -> CostLimits:
"""Create default cost limits."""
return cls(
hourly_budget=2.0, # $2/hour
daily_budget=20.0, # $20/day
monthly_budget=300.0, # $300/month
model_weights={
"gpt-4": 0.2, # 限制GPT-4使用比例
"gpt-3.5-turbo": 0.6,
"llama2": 0.2
}
)
@dataclass
class ModelCosts:
"""Per-model cost configuration."""
prompt_cost_per_1k: float
completion_cost_per_1k: float
min_tokens: int = 1
def calculate(self, prompt_tokens: int, completion_tokens: int) -> float:
"""Calculate cost for token usage."""
prompt_cost = max(self.min_tokens, prompt_tokens) / 1000 * self.prompt_cost_per_1k
completion_cost = max(self.min_tokens, completion_tokens) / 1000 * self.completion_cost_per_1k
return prompt_cost + completion_cost
class CostController:
"""Controls and manages LLM costs."""
def __init__(self, limits: Optional[CostLimits] = None):
"""Initialize cost controller."""
self.limits = limits or CostLimits.default()
self._costs: Dict[str, ModelCosts] = {
"gpt-4": ModelCosts(0.03, 0.06),
"gpt-4-32k": ModelCosts(0.06, 0.12),
"gpt-3.5-turbo": ModelCosts(0.0015, 0.002),
"gpt-3.5-turbo-16k": ModelCosts(0.003, 0.004),
"llama2": ModelCosts(0.0, 0.0),
"codellama": ModelCosts(0.0, 0.0)
}
self._usage_lock = threading.Lock()
self._usage: Dict[str, List[Dict[str, Any]]] = {
"hourly": [],
"daily": [],
"monthly": []
}
self._last_cleanup = time.time()
self._cleanup_interval = 3600 # 1小时清理一次历史数据
def can_use_model(self, model: str, prompt_tokens: int,
completion_tokens: int) -> bool:
"""检查是否允许使用指定模型."""
# 检查成本限制
if not self._check_budget_limits(model, prompt_tokens, completion_tokens):
return False
# 检查模型权重限制
if not self._check_model_weights(model):
return False
return True
def record_usage(self, model: str, prompt_tokens: int,
completion_tokens: int) -> None:
"""记录模型使用情况."""
cost = self._calculate_cost(model, prompt_tokens, completion_tokens)
timestamp = time.time()
usage = {
"model": model,
"timestamp": timestamp,
"cost": cost,
"prompt_tokens": prompt_tokens,
"completion_tokens": completion_tokens
}
with self._usage_lock:
self._usage["hourly"].append(usage)
self._usage["daily"].append(usage)
self._usage["monthly"].append(usage)
# 定期清理过期数据
self._cleanup_old_usage(timestamp)
def get_current_costs(self) -> Dict[str, float]:
"""获取当前时段的成本统计."""
with self._usage_lock:
now = time.time()
hour_ago = now - 3600
day_ago = now - 86400
month_ago = now - 2592000 # 30天
hourly = sum(u["cost"] for u in self._usage["hourly"]
if u["timestamp"] > hour_ago)
daily = sum(u["cost"] for u in self._usage["daily"]
if u["timestamp"] > day_ago)
monthly = sum(u["cost"] for u in self._usage["monthly"]
if u["timestamp"] > month_ago)
return {
"hourly": hourly,
"daily": daily,
"monthly": monthly
}
def get_model_distribution(self) -> Dict[str, float]:
"""获取模型使用分布."""
with self._usage_lock:
now = time.time()
day_ago = now - 86400
# 统计24小时内的使用情况
model_calls: Dict[str, int] = {}
total_calls = 0
for usage in self._usage["daily"]:
if usage["timestamp"] > day_ago:
model = usage["model"]
model_calls[model] = model_calls.get(model, 0) + 1
total_calls += 1
if total_calls == 0:
return {}
return {
model: count / total_calls
for model, count in model_calls.items()
}
def _calculate_cost(self, model: str, prompt_tokens: int,
completion_tokens: int) -> float:
"""计算使用成本."""
model_costs = self._costs.get(model)
if not model_costs:
return 0.0
return model_costs.calculate(prompt_tokens, completion_tokens)
def _check_budget_limits(self, model: str, prompt_tokens: int,
completion_tokens: int) -> bool:
"""检查是否超出预算限制."""
estimated_cost = self._calculate_cost(model, prompt_tokens, completion_tokens)
current_costs = self.get_current_costs()
# 检查各个时间维度的预算限制
if (current_costs["hourly"] + estimated_cost > self.limits.hourly_budget or
current_costs["daily"] + estimated_cost > self.limits.daily_budget or
current_costs["monthly"] + estimated_cost > self.limits.monthly_budget):
LOGGER.warning(
"Cost limit exceeded - model: %s, estimated: $%.4f",
model, estimated_cost, extra=LOG_EXTRA
)
return False
return True
def _check_model_weights(self, model: str) -> bool:
"""检查是否符合模型权重限制."""
if model not in self.limits.model_weights:
return True # 未配置权重的模型不限制
distribution = self.get_model_distribution()
current_weight = distribution.get(model, 0.0)
max_weight = self.limits.model_weights[model]
if current_weight >= max_weight:
LOGGER.warning(
"Model weight exceeded - model: %s, current: %.1f%%, max: %.1f%%",
model, current_weight * 100, max_weight * 100, extra=LOG_EXTRA
)
return False
return True
def _cleanup_old_usage(self, current_time: float) -> None:
"""清理过期的使用记录."""
if current_time - self._last_cleanup < self._cleanup_interval:
return
hour_ago = current_time - 3600
day_ago = current_time - 86400
month_ago = current_time - 2592000
self._usage["hourly"] = [
u for u in self._usage["hourly"]
if u["timestamp"] > hour_ago
]
self._usage["daily"] = [
u for u in self._usage["daily"]
if u["timestamp"] > day_ago
]
self._usage["monthly"] = [
u for u in self._usage["monthly"]
if u["timestamp"] > month_ago
]
self._last_cleanup = current_time
# 全局实例
_controller = CostController()
def get_controller() -> CostController:
"""获取全局CostController实例."""
return _controller
def set_cost_limits(limits: CostLimits) -> None:
"""设置全局成本限制."""
_controller.limits = limits