"""Unit tests for the data-processing and scheduled-task modules.

Covers the behaviour of ``DataProcessor`` and ``TaskScheduler``.
"""

import asyncio
from datetime import datetime, timedelta
from unittest.mock import AsyncMock, Mock, patch

import pytest

from src.data.data_processor import DataProcessor
from src.scheduler.task_scheduler import TaskScheduler
from src.utils.exceptions import DataProcessingError


class TestDataProcessor:
    """Tests for ``DataProcessor``."""

    @pytest.fixture
    def data_processor(self):
        """Provide a fresh ``DataProcessor`` instance for each test."""
        return DataProcessor()

    def test_process_stock_basic_info_success(self, data_processor):
        """A complete raw record is mapped onto the normalized field names."""
        raw_data = [
            {
                "代码": "000001",
                "名称": "平安银行",
                "市场": "主板",
                "行业": "银行",
                "地区": "广东",
                "上市日期": "1991-04-03",
                "数据源": "akshare",
            }
        ]

        result = data_processor.process_stock_basic_info(raw_data)

        assert len(result) == 1
        record = result[0]
        assert record["code"] == "000001"
        assert record["name"] == "平安银行"
        assert record["market"] == "主板"
        assert record["industry"] == "银行"
        assert record["area"] == "广东"
        assert record["ipo_date"] == "1991-04-03"
        assert record["data_source"] == "akshare"

    def test_process_stock_basic_info_missing_fields(self, data_processor):
        """Fields absent from the raw record fall back to default values."""
        raw_data = [
            {
                "代码": "000001",
                "名称": "平安银行",
                # All other fields are intentionally missing.
            }
        ]

        result = data_processor.process_stock_basic_info(raw_data)

        assert len(result) == 1
        record = result[0]
        assert record["code"] == "000001"
        assert record["name"] == "平安银行"
        assert record["market"] == "未知"  # default value
        assert record["industry"] == "未知"  # default value
        assert record["area"] == "未知"  # default value
        assert record["ipo_date"] is None  # default value

    def test_process_daily_kline_data_success(self, data_processor):
        """A valid daily K-line record is normalized and tagged with the code."""
        raw_data = [
            {
                "日期": "2024-01-15",
                "开盘": 10.5,
                "最高": 11.2,
                "最低": 10.3,
                "收盘": 10.8,
                "成交量": 1000000,
                "成交额": 10800000,
                "数据源": "akshare",
            }
        ]

        result = data_processor.process_daily_kline_data("000001", raw_data)

        assert len(result) == 1
        record = result[0]
        assert record["code"] == "000001"
        assert record["date"] == "2024-01-15"
        assert record["open"] == 10.5
        assert record["high"] == 11.2
        assert record["low"] == 10.3
        assert record["close"] == 10.8
        assert record["volume"] == 1000000
        assert record["amount"] == 10800000
        assert record["data_source"] == "akshare"

    def test_process_daily_kline_data_invalid_values(self, data_processor):
        """A non-numeric price value raises ``DataProcessingError``."""
        raw_data = [
            {
                "日期": "2024-01-15",
                "开盘": "无效值",  # a string where a number is expected
                "最高": 11.2,
                "最低": 10.3,
                "收盘": 10.8,
                "成交量": 1000000,
                "成交额": 10800000,
            }
        ]

        with pytest.raises(DataProcessingError):
            data_processor.process_daily_kline_data("000001", raw_data)

    def test_process_financial_report_data_success(self, data_processor):
        """A valid financial-report record is normalized and tagged with the code."""
        raw_data = [
            {
                "报告期": "2023-12-31",
                "报告类型": "年报",
                "每股收益": 1.5,
                "净利润": 1500000000,
                "营业收入": 5000000000,
                "总资产": 10000000000,
                "数据源": "akshare",
            }
        ]

        result = data_processor.process_financial_report_data("000001", raw_data)

        assert len(result) == 1
        record = result[0]
        assert record["code"] == "000001"
        assert record["report_date"] == "2023-12-31"
        assert record["report_type"] == "年报"
        assert record["eps"] == 1.5
        assert record["net_profit"] == 1500000000
        assert record["revenue"] == 5000000000
        assert record["total_assets"] == 10000000000
        assert record["data_source"] == "akshare"

    def test_validate_data_success(self, data_processor):
        """Validation returns ``True`` when all required fields are present."""
        valid_data = [
            {
                "code": "000001",
                "name": "平安银行",
                "market": "主板",
                "ipo_date": "1991-04-03",
            }
        ]

        result = data_processor._validate_data(valid_data, ["code", "name"])

        assert result is True

    def test_validate_data_missing_required_fields(self, data_processor):
        """Validation raises ``DataProcessingError`` when a required field is absent."""
        invalid_data = [
            {
                "code": "000001",
                # The required "name" field is intentionally missing.
            }
        ]

        with pytest.raises(DataProcessingError):
            data_processor._validate_data(invalid_data, ["code", "name"])

    def test_clean_data_success(self, data_processor):
        """Cleaning strips surrounding whitespace from the code value."""
        dirty_data = [
            {
                "code": " 000001 ",  # padded with spaces
                "name": "平安银行",
                "market": "主板",
                "ipo_date": "1991-04-03",
            }
        ]

        result = data_processor._clean_data(dirty_data)

        assert len(result) == 1
        assert result[0]["code"] == "000001"  # spaces removed

    def test_standardize_data_success(self, data_processor):
        """Standardization renames raw keys according to the field mapping."""
        raw_data = [
            {
                "代码": "000001",
                "名称": "平安银行",
                "市场": "主板",
            }
        ]
        field_mapping = {
            "代码": "code",
            "名称": "name",
            "市场": "market",
        }

        result = data_processor._standardize_data(raw_data, field_mapping)

        assert len(result) == 1
        record = result[0]
        assert "code" in record
        assert "name" in record
        assert "market" in record
        assert record["code"] == "000001"
        assert record["name"] == "平安银行"
        assert record["market"] == "主板"


class TestTaskScheduler:
    """Tests for ``TaskScheduler``."""

    @pytest.fixture
    def task_scheduler(self):
        """Provide a fresh ``TaskScheduler`` instance for each test."""
        return TaskScheduler()

    def test_scheduler_initialization(self, task_scheduler):
        """A new scheduler has no underlying scheduler and is not running."""
        assert task_scheduler.scheduler is None
        assert task_scheduler.is_running is False

    @pytest.mark.asyncio
    async def test_start_scheduler_success(self, task_scheduler):
        """Starting the scheduler reports success and flips the running flag."""
        with patch.object(task_scheduler, "_configure_jobs"):
            result = await task_scheduler.start()

        try:
            assert result is True
            assert task_scheduler.is_running is True
            assert task_scheduler.scheduler is not None
        finally:
            # Do not leave a running scheduler behind for later tests.
            await task_scheduler.stop()

    @pytest.mark.asyncio
    async def test_stop_scheduler_success(self, task_scheduler):
        """Stopping a running scheduler reports success and clears the flag."""
        # Start the scheduler first.
        with patch.object(task_scheduler, "_configure_jobs"):
            await task_scheduler.start()

        # Then stop it.
        result = await task_scheduler.stop()

        assert result is True
        assert task_scheduler.is_running is False

    @pytest.mark.asyncio
    async def test_stop_scheduler_not_running(self, task_scheduler):
        """Stopping a scheduler that was never started returns ``False``."""
        result = await task_scheduler.stop()

        assert result is False

    def test_configure_daily_kline_job(self, task_scheduler):
        """The daily K-line update job is registered as a cron job at 18:00."""
        with patch.object(task_scheduler, "_add_job") as mock_add_job:
            task_scheduler._configure_daily_kline_job()

        mock_add_job.assert_called_once()
        positional = mock_add_job.call_args[0]
        assert positional[0] == "daily_kline_update"
        assert positional[1] == "cron"
        assert positional[2] == task_scheduler._update_daily_kline_data
        assert positional[3] == {"hour": 18, "minute": 0}  # 6 p.m.

    def test_configure_weekly_financial_job(self, task_scheduler):
        """The weekly financial update job is registered as a cron job at 20:00."""
        with patch.object(task_scheduler, "_add_job") as mock_add_job:
            task_scheduler._configure_weekly_financial_job()

        mock_add_job.assert_called_once()
        positional = mock_add_job.call_args[0]
        assert positional[0] == "weekly_financial_update"
        assert positional[1] == "cron"
        assert positional[2] == task_scheduler._update_financial_data
        # Original note: Sunday, 8 p.m.
        # NOTE(review): if the scheduler is APScheduler-based, day_of_week=0
        # means Monday, not Sunday -- confirm the intended day.
        assert positional[3] == {"day_of_week": 0, "hour": 20, "minute": 0}

    def test_configure_monthly_basic_job(self, task_scheduler):
        """The monthly basic-info update job is registered for day 1 at 22:00."""
        with patch.object(task_scheduler, "_add_job") as mock_add_job:
            task_scheduler._configure_monthly_basic_job()

        mock_add_job.assert_called_once()
        positional = mock_add_job.call_args[0]
        assert positional[0] == "monthly_basic_update"
        assert positional[1] == "cron"
        assert positional[2] == task_scheduler._update_stock_basic_info
        # 1st day of each month, 10 p.m.
        assert positional[3] == {"day": 1, "hour": 22, "minute": 0}

    def test_configure_daily_health_check_job(self, task_scheduler):
        """The daily health-check job is registered as a cron job at 09:00."""
        with patch.object(task_scheduler, "_add_job") as mock_add_job:
            task_scheduler._configure_daily_health_check_job()

        mock_add_job.assert_called_once()
        positional = mock_add_job.call_args[0]
        assert positional[0] == "daily_health_check"
        assert positional[1] == "cron"
        assert positional[2] == task_scheduler._health_check
        assert positional[3] == {"hour": 9, "minute": 0}  # 9 a.m.

    @pytest.mark.asyncio
    async def test_update_daily_kline_data_success(self, task_scheduler):
        """The daily K-line update returns ``True`` when fetch and save succeed."""
        mock_data = [
            {
                "code": "000001",
                "date": "2024-01-15",
                "open": 10.5,
                "close": 10.8,
            }
        ]

        with patch.object(
            task_scheduler.data_manager,
            "get_daily_kline_data",
            return_value=mock_data,
        ):
            with patch.object(
                task_scheduler.stock_repo,
                "save_daily_kline_data",
                return_value=True,
            ):
                result = await task_scheduler._update_daily_kline_data()

        assert result is True

    @pytest.mark.asyncio
    async def test_update_daily_kline_data_failure(self, task_scheduler):
        """The daily K-line update returns ``False`` when the fetch raises."""
        with patch.object(
            task_scheduler.data_manager,
            "get_daily_kline_data",
            side_effect=Exception("API错误"),
        ):
            result = await task_scheduler._update_daily_kline_data()

        assert result is False

    @pytest.mark.asyncio
    async def test_update_financial_data_success(self, task_scheduler):
        """The financial update returns ``True`` when fetch and save succeed."""
        mock_data = [
            {
                "code": "000001",
                "report_date": "2023-12-31",
                "eps": 1.5,
            }
        ]

        with patch.object(
            task_scheduler.data_manager,
            "get_financial_report",
            return_value=mock_data,
        ):
            with patch.object(
                task_scheduler.stock_repo,
                "save_financial_report_data",
                return_value=True,
            ):
                result = await task_scheduler._update_financial_data()

        assert result is True

    @pytest.mark.asyncio
    async def test_update_stock_basic_info_success(self, task_scheduler):
        """The basic-info update returns ``True`` when fetch and save succeed."""
        mock_data = [
            {
                "code": "000001",
                "name": "平安银行",
                "market": "主板",
            }
        ]

        with patch.object(
            task_scheduler.data_manager,
            "get_stock_basic_info",
            return_value=mock_data,
        ):
            with patch.object(
                task_scheduler.stock_repo,
                "save_stock_basic_info",
                return_value=True,
            ):
                result = await task_scheduler._update_stock_basic_info()

        assert result is True

    @pytest.mark.asyncio
    async def test_health_check_success(self, task_scheduler):
        """The health check returns ``True`` when both lookups succeed."""
        with patch.object(
            task_scheduler.data_manager,
            "get_stock_basic_info",
            return_value=[],
        ):
            with patch.object(
                task_scheduler.stock_repo,
                "get_stock_basic_info",
                return_value=[],
            ):
                result = await task_scheduler._health_check()

        assert result is True

    @pytest.mark.asyncio
    async def test_get_job_status(self, task_scheduler):
        """A job added to a running scheduler shows up in the job status."""
        # start() is awaited by the other tests in this file, so it must be
        # awaited here too; a bare call would never actually start anything.
        with patch.object(task_scheduler, "_configure_jobs"):
            await task_scheduler.start()

        try:
            # Register a throwaway job scheduled one hour from now.
            def _noop_job():
                pass

            task_scheduler._add_job(
                "test_job",
                "date",
                _noop_job,
                {"run_date": datetime.now() + timedelta(hours=1)},
            )

            # Query the job status.
            status = task_scheduler.get_job_status()

            assert "test_job" in status
            assert status["test_job"]["next_run_time"] is not None
        finally:
            # Do not leave a running scheduler behind for later tests.
            await task_scheduler.stop()

    @pytest.mark.asyncio
    async def test_remove_job_success(self, task_scheduler):
        """Removing an existing job succeeds and drops it from the status."""
        # start() must be awaited for the scheduler to actually start.
        with patch.object(task_scheduler, "_configure_jobs"):
            await task_scheduler.start()

        try:
            # Register a throwaway job scheduled one hour from now.
            def _noop_job():
                pass

            task_scheduler._add_job(
                "test_job",
                "date",
                _noop_job,
                {"run_date": datetime.now() + timedelta(hours=1)},
            )

            # Remove the job.
            result = task_scheduler.remove_job("test_job")
            assert result is True

            # Verify that the job is gone.
            status = task_scheduler.get_job_status()
            assert "test_job" not in status
        finally:
            # Do not leave a running scheduler behind for later tests.
            await task_scheduler.stop()

    @pytest.mark.asyncio
    async def test_remove_job_not_found(self, task_scheduler):
        """Removing a job that does not exist returns ``False``."""
        # start() must be awaited for the scheduler to actually start.
        with patch.object(task_scheduler, "_configure_jobs"):
            await task_scheduler.start()

        try:
            # Attempt to remove a job that was never added.
            result = task_scheduler.remove_job("nonexistent_job")

            assert result is False
        finally:
            # Do not leave a running scheduler behind for later tests.
            await task_scheduler.stop()


class TestIntegration:
    """Integration-style tests combining the public entry points."""

    @pytest.mark.asyncio
    async def test_data_processing_pipeline(self):
        """Raw stock records pass through the processor end to end."""
        # Create the data processor.
        processor = DataProcessor()

        # Simulated raw input.
        raw_stock_data = [
            {
                "代码": "000001",
                "名称": "平安银行",
                "市场": "主板",
                "行业": "银行",
                "地区": "广东",
                "上市日期": "1991-04-03",
            }
        ]

        # Process the data.
        processed_data = processor.process_stock_basic_info(raw_stock_data)

        # Verify the processed output.
        assert len(processed_data) == 1
        record = processed_data[0]
        assert record["code"] == "000001"
        assert record["name"] == "平安银行"
        assert record["market"] == "主板"
        assert record["industry"] == "银行"
        assert record["area"] == "广东"
        assert record["ipo_date"] == "1991-04-03"

    @pytest.mark.asyncio
    async def test_scheduler_integration(self):
        """The scheduler can be started and then stopped cleanly."""
        scheduler = TaskScheduler()

        # Job configuration is mocked out so no real jobs are registered.
        with patch.object(scheduler, "_configure_jobs"):
            # Start the scheduler.
            result = await scheduler.start()
            assert result is True
            assert scheduler.is_running is True

            # Stop the scheduler.
            result = await scheduler.stop()
            assert result is True
            assert scheduler.is_running is False