stock/tests/test_performance.py

"""
性能测试模块
测试股票分析系统的性能表现
"""
import pytest
import asyncio
import time
from unittest.mock import Mock, patch
import tempfile
import os
from src.main import StockAnalysisSystem
from src.storage.database import DatabaseManager
from src.storage.stock_repository import StockRepository
from src.data.data_manager import DataManager
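
# The start/end timing pattern repeated throughout the tests below could be
# factored into a reusable helper. A minimal sketch (the helper and its name
# are illustrative; the tests keep their explicit timing for readability):
from contextlib import contextmanager


@contextmanager
def measured(label):
    """Time a block with a monotonic clock and print the elapsed seconds."""
    start = time.perf_counter()
    yield
    print(f"{label}: {time.perf_counter() - start:.3f}s")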


class TestPerformance:
    """Performance test suite."""

    @pytest.fixture
    def temp_database(self):
        """Create a temporary SQLite database file and clean it up afterwards."""
        with tempfile.NamedTemporaryFile(suffix=".db", delete=False) as tmp:
            temp_db_path = tmp.name
        yield temp_db_path
        # Remove the temporary file
        if os.path.exists(temp_db_path):
            os.unlink(temp_db_path)
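
    # pytest's built-in tmp_path fixture gives an equivalent temporary file with
    # automatic cleanup; a minimal alternative sketch (not used by this suite):
    #
    #     @pytest.fixture
    #     def temp_database(self, tmp_path):
    #         return str(tmp_path / "test.db")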

    @pytest.fixture
    def stock_system(self, temp_database):
        """StockAnalysisSystem instance backed by the temporary database."""
        system = StockAnalysisSystem()
        system.db_config = {
            "database_url": f"sqlite:///{temp_database}",
            "echo": False,
            "pool_size": 10,
            "max_overflow": 20
        }
        return system

    @pytest.mark.asyncio
    async def test_database_initialization_performance(self, stock_system):
        """Database initialization should complete in a reasonable time."""
        # Measure initialization time (perf_counter is monotonic, unlike time.time)
        start_time = time.perf_counter()
        await stock_system.initialize()
        end_time = time.perf_counter()
        execution_time = end_time - start_time
        # Initialization should finish within 5 seconds
        assert execution_time < 5.0
        print(f"Database initialization time: {execution_time:.3f}s")

    @pytest.mark.asyncio
    async def test_batch_data_storage_performance(self, stock_system):
        """Measure bulk storage throughput for stock basic info."""
        await stock_system.initialize()
        # Store batches of increasing size and check throughput for each
        batch_sizes = [100, 500, 1000]
        for batch_size in batch_sizes:
            batch_data = []
            for i in range(batch_size):
                batch_data.append({
                    "code": f"{i:06d}",
                    "name": f"Test Stock {i}",
                    "industry": "Test Industry",
                    "market": "Test Market",
                    "list_date": "2020-01-01"
                })
            # Measure bulk storage time
            start_time = time.perf_counter()
            result = await stock_system.stock_repo.save_stock_basic_info(batch_data)
            end_time = time.perf_counter()
            execution_time = end_time - start_time
            assert result is True
            # Throughput in records per second
            records_per_second = batch_size / execution_time
            print(f"Bulk storage of {batch_size} records - time: {execution_time:.3f}s, rate: {records_per_second:.1f} records/s")
            # Each batch should finish within 30 seconds at 30+ records/s
            assert execution_time < 30.0
            assert records_per_second > 30.0

    @pytest.mark.asyncio
    async def test_batch_kline_data_storage_performance(self, stock_system):
        """Measure bulk storage throughput for daily K-line data."""
        await stock_system.initialize()
        batch_sizes = [100, 500, 1000]
        base_date = date(2024, 1, 1)
        for batch_size in batch_sizes:
            batch_data = []
            for i in range(batch_size):
                batch_data.append({
                    "code": "000001",
                    # Sequential calendar dates starting from 2024-01-01
                    "date": (base_date + timedelta(days=i)).isoformat(),
                    "open": 10.0 + i * 0.1,
                    "high": 11.0 + i * 0.1,
                    "low": 9.0 + i * 0.1,
                    "close": 10.5 + i * 0.1,
                    "volume": 1000000 + i * 1000,
                    "amount": 10500000 + i * 10500
                })
            # Measure bulk storage time
            start_time = time.perf_counter()
            result = await stock_system.stock_repo.save_daily_kline_data(batch_data)
            end_time = time.perf_counter()
            execution_time = end_time - start_time
            assert result is True
            records_per_second = batch_size / execution_time
            print(f"Bulk storage of {batch_size} K-line records - time: {execution_time:.3f}s, rate: {records_per_second:.1f} records/s")
            # Each batch should finish within 30 seconds at 30+ records/s
            assert execution_time < 30.0
            assert records_per_second > 30.0

    @pytest.mark.asyncio
    async def test_data_query_performance(self, stock_system):
        """Single-record queries should stay fast."""
        await stock_system.initialize()
        # Seed the database with test records
        test_data = []
        for i in range(1000):
            test_data.append({
                "code": f"{i:06d}",
                "name": f"Test Stock {i}",
                "industry": "Test Industry",
                "market": "Test Market",
                "list_date": "2020-01-01"
            })
        await stock_system.stock_repo.save_stock_basic_info(test_data)
        # Time ten single-record queries
        query_times = []
        for i in range(10):
            code = f"{i:06d}"
            start_time = time.perf_counter()
            result = await stock_system.stock_repo.get_stock_basic_info(code)
            end_time = time.perf_counter()
            query_times.append(end_time - start_time)
            assert len(result) <= 1  # expect zero or one record
        avg_query_time = sum(query_times) / len(query_times)
        max_query_time = max(query_times)
        print(f"Single-record query - average: {avg_query_time:.3f}s, max: {max_query_time:.3f}s")
        # Average under 100 ms, worst case under 500 ms
        assert avg_query_time < 0.1
        assert max_query_time < 0.5

    @pytest.mark.asyncio
    async def test_batch_query_performance(self, stock_system):
        """Range queries over K-line data should stay fast."""
        await stock_system.initialize()
        # Seed 100 days of K-line data starting 2024-01-01 (last day 2024-04-09)
        base_date = date(2024, 1, 1)
        test_data = []
        for i in range(100):
            test_data.append({
                "code": "000001",
                "date": (base_date + timedelta(days=i)).isoformat(),
                "open": 10.0 + i * 0.1,
                "high": 11.0 + i * 0.1,
                "low": 9.0 + i * 0.1,
                "close": 10.5 + i * 0.1,
                "volume": 1000000 + i * 1000,
                "amount": 10500000 + i * 10500
            })
        await stock_system.stock_repo.save_daily_kline_data(test_data)
        # Time a range query covering all 100 records
        start_time = time.perf_counter()
        result = await stock_system.stock_repo.get_daily_kline_data("000001", "2024-01-01", "2024-04-10")
        end_time = time.perf_counter()
        execution_time = end_time - start_time
        assert len(result) == 100
        print(f"Range query for 100 K-line records - time: {execution_time:.3f}s")
        # 100 records should come back within 1 second
        assert execution_time < 1.0

    @pytest.mark.asyncio
    async def test_concurrent_operations_performance(self, stock_system):
        """Concurrent queries should complete promptly at several concurrency levels."""
        await stock_system.initialize()

        async def query_task(code):
            return await stock_system.stock_repo.get_stock_basic_info(code)

        concurrency_levels = [5, 10, 20]
        for concurrency in concurrency_levels:
            tasks = [query_task(f"{i:06d}") for i in range(concurrency)]
            # Time the concurrent batch
            start_time = time.perf_counter()
            results = await asyncio.gather(*tasks)
            end_time = time.perf_counter()
            execution_time = end_time - start_time
            # Every query should return a list (possibly empty)
            for result in results:
                assert isinstance(result, list)
            print(f"{concurrency} concurrent queries - time: {execution_time:.3f}s")
            # Each concurrency level should finish within 2 seconds
            assert execution_time < 2.0

    @pytest.mark.asyncio
    async def test_memory_usage_performance(self, stock_system):
        """Memory growth during initialization and bulk storage should stay bounded."""
        import psutil

        # Resident set size (RSS) of the current process, in MB
        process = psutil.Process(os.getpid())
        initial_memory = process.memory_info().rss / 1024 / 1024
        await stock_system.initialize()
        memory_after_init = process.memory_info().rss / 1024 / 1024
        memory_increase = memory_after_init - initial_memory
        print(f"Memory growth from system initialization: {memory_increase:.2f} MB")
        # Initialization should add less than 100 MB
        assert memory_increase < 100.0
        # Measure memory growth from a bulk store of 1000 records
        batch_data = []
        for i in range(1000):
            batch_data.append({
                "code": f"{i:06d}",
                "name": f"Test Stock {i}",
                "industry": "Test Industry",
                "market": "Test Market",
                "list_date": "2020-01-01"
            })
        memory_before_batch = process.memory_info().rss / 1024 / 1024
        await stock_system.stock_repo.save_stock_basic_info(batch_data)
        memory_after_batch = process.memory_info().rss / 1024 / 1024
        batch_memory_increase = memory_after_batch - memory_before_batch
        print(f"Memory growth from storing 1000 records: {batch_memory_increase:.2f} MB")
        # Bulk storage should add less than 50 MB
        assert batch_memory_increase < 50.0

    @pytest.mark.asyncio
    async def test_database_connection_pool_performance(self, stock_system):
        """The connection pool should handle a burst of concurrent operations."""
        await stock_system.initialize()

        async def database_operation(code):
            return await stock_system.stock_repo.get_stock_basic_info(code)

        # Simulate a high-concurrency burst, cycling through 100 stock codes
        concurrent_operations = 50
        tasks = [database_operation(f"{i % 100:06d}") for i in range(concurrent_operations)]
        start_time = time.perf_counter()
        results = await asyncio.gather(*tasks)
        end_time = time.perf_counter()
        execution_time = end_time - start_time
        print(f"Connection pool handled {concurrent_operations} concurrent operations - time: {execution_time:.3f}s")
        # 50 concurrent operations should finish within 5 seconds
        assert execution_time < 5.0
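
    # Note: SQLite allows many concurrent readers but only a single writer at a
    # time, so the pool settings above mostly benefit read concurrency; against
    # a server database such as PostgreSQL the same test would also exercise
    # real write parallelism.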

    @pytest.mark.asyncio
    async def test_data_processing_performance(self, stock_system):
        """Data cleaning should filter extra fields quickly."""
        await stock_system.initialize()
        batch_sizes = [100, 500, 1000]
        for batch_size in batch_sizes:
            # Raw records carry extra fields that processing should strip
            raw_data = []
            for i in range(batch_size):
                raw_data.append({
                    "code": f"{i:06d}",
                    "name": f"Test Stock {i}",
                    "industry": "Test Industry",
                    "market": "Test Market",
                    "list_date": "2020-01-01",
                    "extra_field1": "should_be_removed",
                    "extra_field2": 123.45
                })
            # Measure processing time
            start_time = time.perf_counter()
            processed_data = stock_system.data_processor.process_stock_basic_info(raw_data)
            end_time = time.perf_counter()
            execution_time = end_time - start_time
            assert len(processed_data) == batch_size
            # Extra fields must be filtered out
            for item in processed_data:
                assert "extra_field1" not in item
                assert "extra_field2" not in item
            print(f"Processed {batch_size} records - time: {execution_time:.3f}s")
            # Each batch should be processed within 1 second
            assert execution_time < 1.0

    @pytest.mark.asyncio
    async def test_system_startup_performance(self, stock_system):
        """Full system startup should complete within 10 seconds."""
        start_time = time.perf_counter()
        await stock_system.initialize()
        end_time = time.perf_counter()
        execution_time = end_time - start_time
        print(f"Full system startup time: {execution_time:.3f}s")
        assert execution_time < 10.0

    @pytest.mark.asyncio
    async def test_long_running_performance(self, stock_system):
        """Query latency should stay stable over many repeated operations."""
        await stock_system.initialize()
        # Simulate sustained load with 100 repeated queries
        operation_count = 100
        operation_times = []
        for i in range(operation_count):
            code = f"{i % 100:06d}"
            start_time = time.perf_counter()
            result = await stock_system.stock_repo.get_stock_basic_info(code)
            end_time = time.perf_counter()
            operation_times.append(end_time - start_time)
            assert isinstance(result, list)
        # Latency stability metrics
        avg_time = sum(operation_times) / len(operation_times)
        max_time = max(operation_times)
        min_time = min(operation_times)
        print(f"Sustained load - average: {avg_time:.3f}s, min: {min_time:.3f}s, max: {max_time:.3f}s")
        # Worst case within 5x the average, and the spread under 0.5 seconds
        assert max_time / avg_time < 5.0
        assert (max_time - min_time) < 0.5
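
    # For statistically robust timing (warmup runs, calibration, outlier
    # detection), the pytest-benchmark plugin is a common alternative to
    # hand-rolled loops. A minimal sketch, assuming the plugin is installed
    # (not used by this suite; a fresh coroutine is built for every call):
    #
    #     def test_query_benchmark(benchmark, stock_system):
    #         benchmark(lambda: asyncio.run(
    #             stock_system.stock_repo.get_stock_basic_info("000001")))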


class TestScalability:
    """Scalability test suite."""

    @pytest.fixture
    def temp_database(self):
        """Create a temporary SQLite database file and clean it up afterwards."""
        with tempfile.NamedTemporaryFile(suffix=".db", delete=False) as tmp:
            temp_db_path = tmp.name
        yield temp_db_path
        # Remove the temporary file
        if os.path.exists(temp_db_path):
            os.unlink(temp_db_path)

    @pytest.mark.asyncio
    async def test_large_scale_data_storage(self, temp_database):
        """Bulk storage should scale to batches of 5000 and 10000 records."""
        system = StockAnalysisSystem()
        system.db_config = {
            "database_url": f"sqlite:///{temp_database}",
            "echo": False,
            "pool_size": 20,
            "max_overflow": 50
        }
        await system.initialize()
        large_batch_sizes = [5000, 10000]
        for batch_size in large_batch_sizes:
            batch_data = []
            for i in range(batch_size):
                batch_data.append({
                    "code": f"{i:06d}",
                    "name": f"Test Stock {i}",
                    "industry": "Test Industry",
                    "market": "Test Market",
                    "list_date": "2020-01-01"
                })
            # Measure large-scale storage time
            start_time = time.perf_counter()
            result = await system.stock_repo.save_stock_basic_info(batch_data)
            end_time = time.perf_counter()
            execution_time = end_time - start_time
            assert result is True
            records_per_second = batch_size / execution_time
            print(f"Large-scale storage of {batch_size} records - time: {execution_time:.3f}s, rate: {records_per_second:.1f} records/s")
            # Each batch should finish within 300 seconds at 30+ records/s
            assert execution_time < 300.0
            assert records_per_second > 30.0

    @pytest.mark.asyncio
    async def test_high_concurrency_scalability(self, temp_database):
        """Query throughput should hold up at 50 and 100 concurrent tasks."""
        system = StockAnalysisSystem()
        system.db_config = {
            "database_url": f"sqlite:///{temp_database}",
            "echo": False,
            "pool_size": 30,
            "max_overflow": 100
        }
        await system.initialize()

        async def query_task(code):
            return await system.stock_repo.get_stock_basic_info(code)

        high_concurrency_levels = [50, 100]
        for concurrency in high_concurrency_levels:
            tasks = [query_task(f"{i % 1000:06d}") for i in range(concurrency)]
            # Time the concurrent batch
            start_time = time.perf_counter()
            results = await asyncio.gather(*tasks)
            end_time = time.perf_counter()
            execution_time = end_time - start_time
            print(f"{concurrency} high-concurrency queries - time: {execution_time:.3f}s")
            # Each concurrency level should finish within 10 seconds
            assert execution_time < 10.0


class TestPerformanceBenchmarks:
    """Benchmark test suite."""

    @pytest.fixture
    def temp_database(self):
        """Create a temporary SQLite database file and clean it up afterwards."""
        with tempfile.NamedTemporaryFile(suffix=".db", delete=False) as tmp:
            temp_db_path = tmp.name
        yield temp_db_path
        # Remove the temporary file
        if os.path.exists(temp_db_path):
            os.unlink(temp_db_path)

    @pytest.mark.asyncio
    async def test_benchmark_data_collection(self, temp_database):
        """Benchmark: repeated data collection calls against a mocked collector."""
        system = StockAnalysisSystem()
        system.db_config = {
            "database_url": f"sqlite:///{temp_database}",
            "echo": False
        }
        await system.initialize()
        # Mock the collector so only call overhead is measured
        with patch.object(system.data_manager, "get_stock_basic_info") as mock_collector:
            mock_collector.return_value = [{"code": "000001", "name": "Test", "industry": "Test", "market": "Test", "list_date": "2020-01-01"}]
            # Time 100 collection calls
            start_time = time.perf_counter()
            for _ in range(100):
                await system.data_manager.get_stock_basic_info()
            end_time = time.perf_counter()
            execution_time = end_time - start_time
            print(f"Data collection benchmark - 100 calls: {execution_time:.3f}s")
            # 100 calls should finish within 10 seconds
            assert execution_time < 10.0

    @pytest.mark.asyncio
    async def test_benchmark_end_to_end_workflow(self, temp_database):
        """Benchmark: the full collect -> process -> store workflow."""
        system = StockAnalysisSystem()
        system.db_config = {
            "database_url": f"sqlite:///{temp_database}",
            "echo": False
        }
        await system.initialize()
        mock_data = [{"code": "000001", "name": "Test", "industry": "Test", "market": "Test", "list_date": "2020-01-01"}]
        with patch.object(system.data_manager, "get_stock_basic_info", return_value=mock_data):
            # Time the complete workflow: collect -> process -> store
            start_time = time.perf_counter()
            raw_data = await system.data_manager.get_stock_basic_info()
            processed_data = system.data_processor.process_stock_basic_info(raw_data)
            storage_result = await system.stock_repo.save_stock_basic_info(processed_data)
            end_time = time.perf_counter()
            execution_time = end_time - start_time
            assert storage_result is True
            print(f"End-to-end workflow benchmark - time: {execution_time:.3f}s")
            # A single complete workflow should finish within 2 seconds
            assert execution_time < 2.0
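

# To run this suite with the timing output visible, something like:
#
#     pytest stock/tests/test_performance.py -v -s
#
# (-s disables pytest's output capture so the print() timings show; the
# @pytest.mark.asyncio tests assume the pytest-asyncio plugin is installed.)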