stock/log_system_events.py

214 lines
6.0 KiB
Python

"""
系统事件日志记录脚本
记录系统运行过程中的重要事件和异常信息
"""
import sys
import os
import logging
from datetime import datetime
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from src.storage.database import db_manager
from src.storage.stock_repository import StockRepository
# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
def log_system_event(level, module, message, exception_type=None, exception_message=None, traceback=None, stock_code=None, data_type=None):
"""
记录系统事件到数据库
Args:
level: 日志级别 (INFO, WARNING, ERROR, DEBUG)
module: 模块名称
message: 日志消息
exception_type: 异常类型 (可选)
exception_message: 异常消息 (可选)
traceback: 异常堆栈 (可选)
stock_code: 关联股票代码 (可选)
data_type: 数据类型 (可选)
Returns:
记录是否成功
"""
try:
# 创建存储库
repository = StockRepository(db_manager.get_session())
# 创建日志记录
log_data = {
"log_level": level,
"module_name": module,
"message": message,
"exception_type": exception_type,
"exception_message": exception_message,
"traceback": traceback,
"stock_code": stock_code,
"data_type": data_type
}
# 保存到数据库
success = repository.save_system_log(log_data)
if success:
logger.info(f"系统事件记录成功: {level} - {module} - {message}")
else:
logger.error(f"系统事件记录失败: {level} - {module} - {message}")
return success
except Exception as e:
logger.error(f"记录系统事件异常: {str(e)}")
return False
def log_data_collection_start(stock_codes, data_type, source):
"""
记录数据采集开始事件
Args:
stock_codes: 股票代码列表
data_type: 数据类型 (kline/financial)
source: 数据源 (baostock/akshare)
"""
message = f"开始采集{data_type}数据,股票数量: {len(stock_codes)},数据源: {source}"
return log_system_event("INFO", "data_collection", message, stock_code=",".join(stock_codes[:5]) if stock_codes else None, data_type=data_type)
def log_data_collection_complete(stock_codes, data_type, source, success_count, error_count):
"""
记录数据采集完成事件
Args:
stock_codes: 股票代码列表
data_type: 数据类型 (kline/financial)
source: 数据源 (baostock/akshare)
success_count: 成功数量
error_count: 失败数量
"""
message = f"{data_type}数据采集完成,成功: {success_count},失败: {error_count},数据源: {source}"
return log_system_event("INFO", "data_collection", message, stock_code=",".join(stock_codes[:5]) if stock_codes else None, data_type=data_type)
def log_database_operation(operation, table, affected_rows):
"""
记录数据库操作事件
Args:
operation: 操作类型 (insert/update/delete)
table: 表名
affected_rows: 影响行数
"""
message = f"数据库{operation}操作,表: {table},影响行数: {affected_rows}"
return log_system_event("INFO", "database", message)
def log_system_error(module, error_message, exception=None, stock_code=None):
"""
记录系统错误事件
Args:
module: 模块名称
error_message: 错误消息
exception: 异常对象 (可选)
stock_code: 关联股票代码 (可选)
"""
if exception:
return log_system_event("ERROR", module, error_message,
exception_type=type(exception).__name__,
exception_message=str(exception),
stock_code=stock_code)
else:
return log_system_event("ERROR", module, error_message, stock_code=stock_code)
def get_system_logs(level=None, module=None, start_date=None, end_date=None, limit=100):
"""
查询系统日志
Args:
level: 日志级别过滤 (可选)
module: 模块名称过滤 (可选)
start_date: 开始日期 (可选)
end_date: 结束日期 (可选)
limit: 返回记录数限制
Returns:
日志记录列表
"""
try:
repository = StockRepository(db_manager.get_session())
# 构建查询条件
query = repository.session.query(repository.SystemLog)
if level:
query = query.filter(repository.SystemLog.log_level == level)
if module:
query = query.filter(repository.SystemLog.module_name == module)
if start_date:
query = query.filter(repository.SystemLog.created_at >= start_date)
if end_date:
query = query.filter(repository.SystemLog.created_at <= end_date)
# 按时间倒序排列
query = query.order_by(repository.SystemLog.created_at.desc())
# 限制返回数量
logs = query.limit(limit).all()
logger.info(f"查询到{len(logs)}条系统日志")
return logs
except Exception as e:
logger.error(f"查询系统日志异常: {str(e)}")
return []
def main():
"""
主函数 - 测试系统日志功能
"""
logger.info("开始测试系统日志功能...")
# 测试记录各种类型的事件
test_events = [
("INFO", "system", "系统启动", None, None, None, None, None),
("INFO", "data_collection", "数据采集开始", None, None, None, "000001", "kline"),
("ERROR", "database", "数据库连接失败", "ConnectionError", "无法连接数据库", "traceback info", None, None),
("WARNING", "data_processing", "数据格式异常", None, None, None, "000002", "financial")
]
success_count = 0
for event in test_events:
if log_system_event(*event):
success_count += 1
logger.info(f"系统日志测试完成,成功记录{success_count}/{len(test_events)}条事件")
# 查询并显示最近的系统日志
logger.info("查询最近的系统日志:")
recent_logs = get_system_logs(limit=10)
for i, log in enumerate(recent_logs):
logger.info(f" {i+1}. [{log.log_level}] {log.module_name}: {log.message} ({log.created_at})")
return success_count == len(test_events)
if __name__ == "__main__":
# 运行测试
success = main()
if success:
print("系统日志功能测试成功!")
sys.exit(0)
else:
print("系统日志功能测试失败!")
sys.exit(1)