""" 性能测试模块 测试股票分析系统的性能表现 """ import pytest import asyncio import time from unittest.mock import Mock, patch import tempfile import os from src.main import StockAnalysisSystem from src.storage.database import DatabaseManager from src.storage.stock_repository import StockRepository from src.data.data_manager import DataManager class TestPerformance: """性能测试类""" @pytest.fixture def temp_database(self): """创建临时数据库""" with tempfile.NamedTemporaryFile(suffix=".db", delete=False) as tmp: temp_db_path = tmp.name yield temp_db_path # 清理临时文件 if os.path.exists(temp_db_path): os.unlink(temp_db_path) @pytest.fixture def stock_system(self, temp_database): """股票分析系统实例""" system = StockAnalysisSystem() system.db_config = { "database_url": f"sqlite:///{temp_database}", "echo": False, "pool_size": 10, "max_overflow": 20 } return system @pytest.mark.asyncio async def test_database_initialization_performance(self, stock_system): """测试数据库初始化性能""" # 测量数据库初始化时间 start_time = time.time() await stock_system.initialize() end_time = time.time() execution_time = end_time - start_time # 数据库初始化应该在合理时间内完成 assert execution_time < 5.0 # 5秒内完成 print(f"数据库初始化时间: {execution_time:.3f}秒") @pytest.mark.asyncio async def test_batch_data_storage_performance(self, stock_system): """测试批量数据存储性能""" # 初始化系统 await stock_system.initialize() # 生成批量测试数据 batch_sizes = [100, 500, 1000] for batch_size in batch_sizes: batch_data = [] for i in range(batch_size): batch_data.append({ "code": f"{i:06d}", "name": f"测试股票{i}", "industry": "测试行业", "market": "测试市场", "list_date": "2020-01-01" }) # 测量批量存储时间 start_time = time.time() result = await stock_system.stock_repo.save_stock_basic_info(batch_data) end_time = time.time() execution_time = end_time - start_time assert result is True # 计算每秒处理记录数 records_per_second = batch_size / execution_time print(f"批量存储 {batch_size} 条记录 - 时间: {execution_time:.3f}秒, 速度: {records_per_second:.1f} 条/秒") # 验证性能要求 assert execution_time < 30.0 # 1000条记录应该在30秒内完成 assert records_per_second > 30.0 # 最低性能要求:30条/秒 @pytest.mark.asyncio async def test_batch_kline_data_storage_performance(self, stock_system): """测试批量K线数据存储性能""" # 初始化系统 await stock_system.initialize() # 生成批量K线测试数据 batch_sizes = [100, 500, 1000] for batch_size in batch_sizes: batch_data = [] for i in range(batch_size): batch_data.append({ "code": "000001", "date": f"2024-01-{i+1:02d}", "open": 10.0 + i * 0.1, "high": 11.0 + i * 0.1, "low": 9.0 + i * 0.1, "close": 10.5 + i * 0.1, "volume": 1000000 + i * 1000, "amount": 10500000 + i * 10500 }) # 测量批量存储时间 start_time = time.time() result = await stock_system.stock_repo.save_daily_kline_data(batch_data) end_time = time.time() execution_time = end_time - start_time assert result is True # 计算每秒处理记录数 records_per_second = batch_size / execution_time print(f"批量存储 {batch_size} 条K线记录 - 时间: {execution_time:.3f}秒, 速度: {records_per_second:.1f} 条/秒") # 验证性能要求 assert execution_time < 30.0 # 1000条记录应该在30秒内完成 assert records_per_second > 30.0 # 最低性能要求:30条/秒 @pytest.mark.asyncio async def test_data_query_performance(self, stock_system): """测试数据查询性能""" # 初始化系统 await stock_system.initialize() # 先存储一些测试数据 test_data = [] for i in range(1000): test_data.append({ "code": f"{i:06d}", "name": f"测试股票{i}", "industry": "测试行业", "market": "测试市场", "list_date": "2020-01-01" }) await stock_system.stock_repo.save_stock_basic_info(test_data) # 测试单条查询性能 query_times = [] for i in range(10): code = f"{i:06d}" start_time = time.time() result = await stock_system.stock_repo.get_stock_basic_info(code) end_time = time.time() execution_time = end_time - 
    @pytest.mark.asyncio
    async def test_batch_query_performance(self, stock_system):
        """Test batch query performance."""
        # Initialize the system
        await stock_system.initialize()

        # Store 100 days of K-line test data first
        test_data = []
        for i in range(100):
            # Consecutive calendar dates: day 99 falls on 2024-04-09, inside
            # the query range below
            trade_date = (date(2024, 1, 1) + timedelta(days=i)).isoformat()
            test_data.append({
                "code": "000001",
                "date": trade_date,
                "open": 10.0 + i * 0.1,
                "high": 11.0 + i * 0.1,
                "low": 9.0 + i * 0.1,
                "close": 10.5 + i * 0.1,
                "volume": 1000000 + i * 1000,
                "amount": 10500000 + i * 10500
            })
        await stock_system.stock_repo.save_daily_kline_data(test_data)

        # Measure the range query
        start_time = time.time()
        result = await stock_system.stock_repo.get_daily_kline_data("000001", "2024-01-01", "2024-04-10")
        end_time = time.time()

        execution_time = end_time - start_time
        assert len(result) == 100  # should return all 100 records

        print(f"Batch query of 100 K-line records - time: {execution_time:.3f}s")

        # Verify the performance requirement
        assert execution_time < 1.0  # 100 records within 1 second

    @pytest.mark.asyncio
    async def test_concurrent_operations_performance(self, stock_system):
        """Test concurrent operation performance."""
        # Initialize the system
        await stock_system.initialize()

        # Concurrent query task
        async def query_task(code):
            return await stock_system.stock_repo.get_stock_basic_info(code)

        # Exercise several concurrency levels
        concurrency_levels = [5, 10, 20]

        for concurrency in concurrency_levels:
            tasks = []
            for i in range(concurrency):
                code = f"{i:06d}"
                tasks.append(query_task(code))

            # Measure concurrent execution time
            start_time = time.time()
            results = await asyncio.gather(*tasks)
            end_time = time.time()

            execution_time = end_time - start_time

            # All queries should complete successfully
            for result in results:
                assert isinstance(result, list)

            print(f"Concurrent query, {concurrency} tasks - time: {execution_time:.3f}s")

            # Verify the performance requirement
            assert execution_time < 2.0  # 20 concurrent queries within 2 seconds

    @pytest.mark.asyncio
    async def test_memory_usage_performance(self, stock_system):
        """Test memory usage."""
        import psutil  # local import: only this test needs psutil

        # Current process memory usage
        process = psutil.Process(os.getpid())
        initial_memory = process.memory_info().rss / 1024 / 1024  # MB

        # Initialize the system
        await stock_system.initialize()

        # Memory usage after initialization
        memory_after_init = process.memory_info().rss / 1024 / 1024
        memory_increase = memory_after_init - initial_memory

        print(f"Memory growth from system initialization: {memory_increase:.2f} MB")

        # Memory growth should stay within a reasonable bound
        assert memory_increase < 100.0  # initialization should add less than 100 MB

        # Memory usage of a batch operation
        batch_data = []
        for i in range(1000):
            batch_data.append({
                "code": f"{i:06d}",
                "name": f"Test Stock {i}",
                "industry": "Test Industry",
                "market": "Test Market",
                "list_date": "2020-01-01"
            })

        memory_before_batch = process.memory_info().rss / 1024 / 1024

        # Run the batch store
        await stock_system.stock_repo.save_stock_basic_info(batch_data)

        memory_after_batch = process.memory_info().rss / 1024 / 1024
        batch_memory_increase = memory_after_batch - memory_before_batch

        print(f"Memory growth from batch-storing 1000 records: {batch_memory_increase:.2f} MB")

        # Batch-operation memory growth should stay within a reasonable bound
        assert batch_memory_increase < 50.0  # batch store should add less than 50 MB
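    # --- Sketch (not part of the original suite): RSS readings include
    # allocator and interpreter overhead, so forcing a garbage-collection pass
    # before sampling makes them less noisy. The helper name is illustrative.
    @staticmethod
    def _rss_mb():
        """Return this process's resident set size in MB after a GC pass."""
        import gc
        import psutil
        gc.collect()
        return psutil.Process(os.getpid()).memory_info().rss / 1024 / 1024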
    @pytest.mark.asyncio
    async def test_database_connection_pool_performance(self, stock_system):
        """Test database connection pool performance."""
        # Initialize the system
        await stock_system.initialize()

        # Concurrent workload for the connection pool
        async def database_operation(code):
            result = await stock_system.stock_repo.get_stock_basic_info(code)
            return result

        # Simulate a high-concurrency scenario
        concurrent_operations = 50
        tasks = []
        for i in range(concurrent_operations):
            code = f"{i % 100:06d}"  # cycle through 100 distinct stock codes
            tasks.append(database_operation(code))

        # Measure concurrent execution time
        start_time = time.time()
        results = await asyncio.gather(*tasks)
        end_time = time.time()

        execution_time = end_time - start_time
        print(f"Connection pool handled {concurrent_operations} concurrent operations - "
              f"time: {execution_time:.3f}s")

        # Verify connection pool performance
        assert execution_time < 5.0  # 50 concurrent operations within 5 seconds

    @pytest.mark.asyncio
    async def test_data_processing_performance(self, stock_system):
        """Test data processing performance."""
        # Initialize the system
        await stock_system.initialize()

        # Generate test data
        batch_sizes = [100, 500, 1000]

        for batch_size in batch_sizes:
            raw_data = []
            for i in range(batch_size):
                raw_data.append({
                    "code": f"{i:06d}",
                    "name": f"Test Stock {i}",
                    "industry": "Test Industry",
                    "market": "Test Market",
                    "list_date": "2020-01-01",
                    "extra_field1": "should_be_removed",
                    "extra_field2": 123.45
                })

            # Measure processing time
            start_time = time.time()
            processed_data = stock_system.data_processor.process_stock_basic_info(raw_data)
            end_time = time.time()

            execution_time = end_time - start_time
            assert len(processed_data) == batch_size

            # Verify field filtering
            for item in processed_data:
                assert "extra_field1" not in item
                assert "extra_field2" not in item

            print(f"Processed {batch_size} records - time: {execution_time:.3f}s")

            # Verify the performance requirement
            assert execution_time < 1.0  # 1000 records within 1 second

    @pytest.mark.asyncio
    async def test_system_startup_performance(self, stock_system):
        """Test full system startup performance."""
        # Measure complete startup time
        start_time = time.time()
        await stock_system.initialize()
        end_time = time.time()

        execution_time = end_time - start_time
        print(f"Full system startup time: {execution_time:.3f}s")

        # Verify startup performance
        assert execution_time < 10.0  # startup within 10 seconds

    @pytest.mark.asyncio
    async def test_long_running_performance(self, stock_system):
        """Test performance stability over a long run."""
        # Initialize the system
        await stock_system.initialize()

        # Simulate a long-running workload (many repeated operations)
        operation_count = 100
        operation_times = []

        for i in range(operation_count):
            code = f"{i % 100:06d}"
            start_time = time.time()
            result = await stock_system.stock_repo.get_stock_basic_info(code)
            end_time = time.time()

            execution_time = end_time - start_time
            operation_times.append(execution_time)
            assert isinstance(result, list)

        # Analyze performance stability
        avg_time = sum(operation_times) / len(operation_times)
        max_time = max(operation_times)
        min_time = min(operation_times)

        print(f"Long-run performance - average: {avg_time:.3f}s, "
              f"min: {min_time:.3f}s, max: {max_time:.3f}s")

        # Verify performance stability
        assert max_time / avg_time < 5.0  # worst case no more than 5x the average
        assert (max_time - min_time) < 0.5  # spread below 0.5 seconds


class TestScalability:
    """Scalability tests."""

    @pytest.fixture
    def temp_database(self):
        """Create a temporary database file."""
        with tempfile.NamedTemporaryFile(suffix=".db", delete=False) as tmp:
            temp_db_path = tmp.name
        yield temp_db_path
        # Remove the temporary file
        if os.path.exists(temp_db_path):
            os.unlink(temp_db_path)

    @pytest.mark.asyncio
    async def test_large_scale_data_storage(self, temp_database):
        """Test large-scale data storage scalability."""
        system = StockAnalysisSystem()
        system.db_config = {
            "database_url": f"sqlite:///{temp_database}",
            "echo": False,
            "pool_size": 20,
            "max_overflow": 50
        }
        await system.initialize()

        # Large-scale data test
        large_batch_sizes = [5000, 10000]

        for batch_size in large_batch_sizes:
            batch_data = []
            for i in range(batch_size):
                batch_data.append({
                    "code": f"{i:06d}",
                    "name": f"Test Stock {i}",
                    "industry": "Test Industry",
                    "market": "Test Market",
                    "list_date": "2020-01-01"
                })

            # Measure large-scale storage time
            start_time = time.time()
            result = await system.stock_repo.save_stock_basic_info(batch_data)
            end_time = time.time()

            execution_time = end_time - start_time
            assert result is True

            # Throughput
            records_per_second = batch_size / execution_time
            print(f"Large-scale store of {batch_size} records - time: {execution_time:.3f}s, "
                  f"throughput: {records_per_second:.1f} records/s")

            # Verify scalability
            assert execution_time < 300.0  # 10000 records within 300 seconds
            assert records_per_second > 30.0  # minimum throughput
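    # --- Sketch (not part of the original suite): beyond wall-clock totals,
    # per-operation latency percentiles make concurrency regressions easier to
    # spot. statistics.quantiles is standard library; the helper name is
    # illustrative and it expects at least two samples.
    @staticmethod
    def _p95(samples):
        """Return the 95th-percentile value from a list of timings."""
        import statistics
        return statistics.quantiles(samples, n=20)[-1]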
    @pytest.mark.asyncio
    async def test_high_concurrency_scalability(self, temp_database):
        """Test high-concurrency scalability."""
        system = StockAnalysisSystem()
        system.db_config = {
            "database_url": f"sqlite:///{temp_database}",
            "echo": False,
            "pool_size": 30,
            "max_overflow": 100
        }
        await system.initialize()

        # High-concurrency test
        high_concurrency_levels = [50, 100]

        for concurrency in high_concurrency_levels:
            async def query_task(code):
                return await system.stock_repo.get_stock_basic_info(code)

            tasks = []
            for i in range(concurrency):
                code = f"{i % 1000:06d}"
                tasks.append(query_task(code))

            # Measure high-concurrency execution time
            start_time = time.time()
            results = await asyncio.gather(*tasks)
            end_time = time.time()

            execution_time = end_time - start_time
            print(f"High concurrency, {concurrency} queries - time: {execution_time:.3f}s")

            # Verify high-concurrency scalability
            assert execution_time < 10.0  # 100 concurrent queries within 10 seconds


class TestPerformanceBenchmarks:
    """Performance benchmark tests."""

    @pytest.fixture
    def temp_database(self):
        """Create a temporary database file."""
        with tempfile.NamedTemporaryFile(suffix=".db", delete=False) as tmp:
            temp_db_path = tmp.name
        yield temp_db_path
        # Remove the temporary file
        if os.path.exists(temp_db_path):
            os.unlink(temp_db_path)

    @pytest.mark.asyncio
    async def test_benchmark_data_collection(self, temp_database):
        """Benchmark: data collection performance."""
        system = StockAnalysisSystem()
        system.db_config = {
            "database_url": f"sqlite:///{temp_database}",
            "echo": False
        }
        await system.initialize()

        # Benchmark collection against a mocked data source
        with patch.object(system.data_manager, "get_stock_basic_info") as mock_collector:
            mock_collector.return_value = [{
                "code": "000001",
                "name": "Test",
                "industry": "Test",
                "market": "Test",
                "list_date": "2020-01-01"
            }]

            # Measure collection time
            start_time = time.time()

            # Run the collection repeatedly
            for _ in range(100):
                await system.data_manager.get_stock_basic_info()

            end_time = time.time()
            execution_time = end_time - start_time

            print(f"Data collection benchmark - 100 collections: {execution_time:.3f}s")

            # Benchmark requirement
            assert execution_time < 10.0  # 100 collections within 10 seconds

    @pytest.mark.asyncio
    async def test_benchmark_end_to_end_workflow(self, temp_database):
        """Benchmark: end-to-end workflow performance."""
        system = StockAnalysisSystem()
        system.db_config = {
            "database_url": f"sqlite:///{temp_database}",
            "echo": False
        }
        await system.initialize()

        # Mock the data source for the end-to-end workflow
        mock_data = [{
            "code": "000001",
            "name": "Test",
            "industry": "Test",
            "market": "Test",
            "list_date": "2020-01-01"
        }]

        with patch.object(system.data_manager, "get_stock_basic_info", return_value=mock_data):
            # Measure the complete workflow
            start_time = time.time()

            # Full workflow: collect -> process -> store
            raw_data = await system.data_manager.get_stock_basic_info()
            processed_data = system.data_processor.process_stock_basic_info(raw_data)
            storage_result = await system.stock_repo.save_stock_basic_info(processed_data)

            end_time = time.time()
            execution_time = end_time - start_time

            assert storage_result is True
            print(f"End-to-end workflow benchmark - time: {execution_time:.3f}s")

            # Benchmark requirement
            assert execution_time < 2.0  # one full workflow within 2 seconds
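# Usage sketch (assumptions: the file path below and the pytest-asyncio
# plugin). These benchmarks are normally run apart from the unit suite, with
# output capture disabled so the throughput print() lines are visible, e.g.:
#
#   pytest tests/test_performance.py -s
#
# pytest-asyncio must be installed for the @pytest.mark.asyncio tests to run.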