"""
|
||
性能测试模块
|
||
测试股票分析系统的性能表现
|
||
"""
|
||
|
||
import pytest
import asyncio
import time
from datetime import date, timedelta
from unittest.mock import Mock, patch
import tempfile
import os

from src.main import StockAnalysisSystem
from src.storage.database import DatabaseManager
from src.storage.stock_repository import StockRepository
from src.data.data_manager import DataManager

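
# A small shared timing helper -- a sketch, not part of the original suite.
# time.perf_counter() is monotonic and has higher resolution than time.time(),
# which the tests below keep for fidelity with the original measurements.
from contextlib import contextmanager


@contextmanager
def measure():
    """Yield a zero-argument callable returning the seconds elapsed so far."""
    start = time.perf_counter()
    yield lambda: time.perf_counter() - start

# Usage sketch:
#     with measure() as elapsed:
#         await stock_system.initialize()
#     assert elapsed() < 5.0
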
class TestPerformance:
    """Performance test suite."""

    @pytest.fixture
    def temp_database(self):
        """Create a temporary database file."""
        with tempfile.NamedTemporaryFile(suffix=".db", delete=False) as tmp:
            temp_db_path = tmp.name

        yield temp_db_path

        # Clean up the temporary file
        if os.path.exists(temp_db_path):
            os.unlink(temp_db_path)

    @pytest.fixture
    def stock_system(self, temp_database):
        """Stock analysis system instance."""
        system = StockAnalysisSystem()
        system.db_config = {
            "database_url": f"sqlite:///{temp_database}",
            "echo": False,
            "pool_size": 10,
            "max_overflow": 20
        }
        return system

    @pytest.mark.asyncio
    async def test_database_initialization_performance(self, stock_system):
        """Test database initialization performance."""
        # Measure database initialization time
        start_time = time.time()

        await stock_system.initialize()

        end_time = time.time()
        execution_time = end_time - start_time

        # Initialization should complete within a reasonable time
        assert execution_time < 5.0  # within 5 seconds

        print(f"Database initialization time: {execution_time:.3f}s")

    @pytest.mark.asyncio
    async def test_batch_data_storage_performance(self, stock_system):
        """Test bulk data storage performance."""
        # Initialize the system
        await stock_system.initialize()

        # Generate batches of test data
        batch_sizes = [100, 500, 1000]

        for batch_size in batch_sizes:
            batch_data = []
            for i in range(batch_size):
                batch_data.append({
                    "code": f"{i:06d}",
                    "name": f"Test Stock {i}",
                    "industry": "Test Industry",
                    "market": "Test Market",
                    "list_date": "2020-01-01"
                })

            # Measure bulk insert time
            start_time = time.time()
            result = await stock_system.stock_repo.save_stock_basic_info(batch_data)
            end_time = time.time()

            execution_time = end_time - start_time

            assert result is True

            # Compute records processed per second
            records_per_second = batch_size / execution_time

            print(f"Bulk insert of {batch_size} records - time: {execution_time:.3f}s, rate: {records_per_second:.1f} records/s")

            # Verify performance requirements
            assert execution_time < 30.0  # 1000 records should complete within 30 seconds
            assert records_per_second > 30.0  # minimum throughput: 30 records/s

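    # Hedged helper sketch: on platforms with a coarse clock, time.time() can
    # report 0.0 for a fast batch, making the records-per-second division above
    # raise ZeroDivisionError. A guarded version the tests could adopt:
    @staticmethod
    def _throughput(records, seconds):
        """Records per second, clamping the interval to avoid division by zero."""
        return records / max(seconds, 1e-9)
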
    @pytest.mark.asyncio
    async def test_batch_kline_data_storage_performance(self, stock_system):
        """Test bulk K-line data storage performance."""
        # Initialize the system
        await stock_system.initialize()

        # Generate batches of K-line test data
        batch_sizes = [100, 500, 1000]

        for batch_size in batch_sizes:
            batch_data = []
            for i in range(batch_size):
                batch_data.append({
                    "code": "000001",
                    # Consecutive calendar dates starting 2024-01-01; the
                    # original f"2024-01-{i+1:02d}" produced invalid dates
                    # past the 31st
                    "date": (date(2024, 1, 1) + timedelta(days=i)).isoformat(),
                    "open": 10.0 + i * 0.1,
                    "high": 11.0 + i * 0.1,
                    "low": 9.0 + i * 0.1,
                    "close": 10.5 + i * 0.1,
                    "volume": 1000000 + i * 1000,
                    "amount": 10500000 + i * 10500
                })

            # Measure bulk insert time
            start_time = time.time()
            result = await stock_system.stock_repo.save_daily_kline_data(batch_data)
            end_time = time.time()

            execution_time = end_time - start_time

            assert result is True

            # Compute records processed per second
            records_per_second = batch_size / execution_time

            print(f"Bulk insert of {batch_size} K-line records - time: {execution_time:.3f}s, rate: {records_per_second:.1f} records/s")

            # Verify performance requirements
            assert execution_time < 30.0  # 1000 records should complete within 30 seconds
            assert records_per_second > 30.0  # minimum throughput: 30 records/s

    @pytest.mark.asyncio
    async def test_data_query_performance(self, stock_system):
        """Test data query performance."""
        # Initialize the system
        await stock_system.initialize()

        # Store some test data first
        test_data = []
        for i in range(1000):
            test_data.append({
                "code": f"{i:06d}",
                "name": f"Test Stock {i}",
                "industry": "Test Industry",
                "market": "Test Market",
                "list_date": "2020-01-01"
            })

        await stock_system.stock_repo.save_stock_basic_info(test_data)

        # Test single-row query performance
        query_times = []

        for i in range(10):
            code = f"{i:06d}"

            start_time = time.time()
            result = await stock_system.stock_repo.get_stock_basic_info(code)
            end_time = time.time()

            execution_time = end_time - start_time
            query_times.append(execution_time)

            assert len(result) <= 1  # should return 0 or 1 records

        # Compute average query time
        avg_query_time = sum(query_times) / len(query_times)
        max_query_time = max(query_times)

        print(f"Single-row query performance - avg: {avg_query_time:.3f}s, max: {max_query_time:.3f}s")

        # Verify performance requirements
        assert avg_query_time < 0.1  # average query time should be under 100 ms
        assert max_query_time < 0.5  # maximum query time should be under 500 ms

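    # Hedged sketch: the max above is a single noisy sample; a 95th-percentile
    # latency check is more stable across runs. Standard library only.
    @staticmethod
    def _p95(samples):
        """95th percentile of a list of latencies (needs at least 2 samples)."""
        import statistics
        return statistics.quantiles(samples, n=20)[-1]
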
    @pytest.mark.asyncio
    async def test_batch_query_performance(self, stock_system):
        """Test batch query performance."""
        # Initialize the system
        await stock_system.initialize()

        # Store some K-line test data first
        test_data = []
        for i in range(100):  # 100 days of data
            test_data.append({
                "code": "000001",
                # Consecutive dates 2024-01-01 .. 2024-04-09 (see note above)
                "date": (date(2024, 1, 1) + timedelta(days=i)).isoformat(),
                "open": 10.0 + i * 0.1,
                "high": 11.0 + i * 0.1,
                "low": 9.0 + i * 0.1,
                "close": 10.5 + i * 0.1,
                "volume": 1000000 + i * 1000,
                "amount": 10500000 + i * 10500
            })

        await stock_system.stock_repo.save_daily_kline_data(test_data)

        # Test batch query performance
        start_time = time.time()
        result = await stock_system.stock_repo.get_daily_kline_data("000001", "2024-01-01", "2024-04-10")
        end_time = time.time()

        execution_time = end_time - start_time

        assert len(result) == 100  # should return 100 records

        print(f"Batch query of 100 K-line records - time: {execution_time:.3f}s")

        # Verify performance requirements
        assert execution_time < 1.0  # 100 records should return within 1 second

    @pytest.mark.asyncio
    async def test_concurrent_operations_performance(self, stock_system):
        """Test concurrent operation performance."""
        # Initialize the system
        await stock_system.initialize()

        # Define a concurrent query task
        async def query_task(code):
            return await stock_system.stock_repo.get_stock_basic_info(code)

        # Test different concurrency levels
        concurrency_levels = [5, 10, 20]

        for concurrency in concurrency_levels:
            tasks = []
            for i in range(concurrency):
                code = f"{i:06d}"
                tasks.append(query_task(code))

            # Measure concurrent execution time
            start_time = time.time()
            results = await asyncio.gather(*tasks)
            end_time = time.time()

            execution_time = end_time - start_time

            # Verify that every query completed successfully
            for result in results:
                assert isinstance(result, list)

            print(f"Concurrent query with {concurrency} tasks - time: {execution_time:.3f}s")

            # Verify performance requirements
            assert execution_time < 2.0  # 20 concurrent queries should finish within 2 seconds

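    # Hedged sketch: asyncio.gather(..., return_exceptions=True) lets a
    # concurrency test separate slow queries from failed ones instead of
    # aborting on the first exception:
    #
    #     results = await asyncio.gather(*tasks, return_exceptions=True)
    #     failures = [r for r in results if isinstance(r, Exception)]
    #     assert not failures
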
    @pytest.mark.asyncio
    async def test_memory_usage_performance(self, stock_system):
        """Test memory usage."""
        import psutil

        # Get the current process's memory usage
        process = psutil.Process(os.getpid())
        initial_memory = process.memory_info().rss / 1024 / 1024  # MB

        # Initialize the system
        await stock_system.initialize()

        # Memory usage after initialization
        memory_after_init = process.memory_info().rss / 1024 / 1024
        memory_increase = memory_after_init - initial_memory

        print(f"Memory increase after system initialization: {memory_increase:.2f} MB")

        # Verify memory usage stays within a reasonable range
        assert memory_increase < 100.0  # initialization should add less than 100 MB

        # Test memory usage of a bulk operation
        batch_data = []
        for i in range(1000):
            batch_data.append({
                "code": f"{i:06d}",
                "name": f"Test Stock {i}",
                "industry": "Test Industry",
                "market": "Test Market",
                "list_date": "2020-01-01"
            })

        memory_before_batch = process.memory_info().rss / 1024 / 1024

        # Perform the bulk insert
        await stock_system.stock_repo.save_stock_basic_info(batch_data)

        memory_after_batch = process.memory_info().rss / 1024 / 1024
        batch_memory_increase = memory_after_batch - memory_before_batch

        print(f"Memory increase after bulk-inserting 1000 records: {batch_memory_increase:.2f} MB")

        # Verify the bulk operation's memory usage stays within a reasonable range
        assert batch_memory_increase < 50.0  # the bulk insert should add less than 50 MB

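    # Hedged sketch: RSS is noisy because the allocator rarely returns freed
    # pages to the OS. Forcing a collection before each reading makes the
    # deltas above somewhat more repeatable:
    #
    #     import gc
    #     gc.collect()
    #     memory_now = process.memory_info().rss / 1024 / 1024
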
    @pytest.mark.asyncio
    async def test_database_connection_pool_performance(self, stock_system):
        """Test database connection pool performance."""
        # Initialize the system
        await stock_system.initialize()

        # Test the pool's ability to handle concurrent work
        async def database_operation(code):
            # Execute a database operation
            result = await stock_system.stock_repo.get_stock_basic_info(code)
            return result

        # Simulate a high-concurrency scenario
        concurrent_operations = 50
        tasks = []

        for i in range(concurrent_operations):
            code = f"{i % 100:06d}"  # cycle through 100 distinct stock codes
            tasks.append(database_operation(code))

        # Measure concurrent execution time
        start_time = time.time()
        results = await asyncio.gather(*tasks)
        end_time = time.time()

        execution_time = end_time - start_time

        print(f"Connection pool handled {concurrent_operations} concurrent operations - time: {execution_time:.3f}s")

        # Verify connection pool performance
        assert execution_time < 5.0  # 50 concurrent operations should finish within 5 seconds

    @pytest.mark.asyncio
    async def test_data_processing_performance(self, stock_system):
        """Test data processing performance."""
        # Initialize the system
        await stock_system.initialize()

        # Generate test data
        batch_sizes = [100, 500, 1000]

        for batch_size in batch_sizes:
            raw_data = []
            for i in range(batch_size):
                raw_data.append({
                    "code": f"{i:06d}",
                    "name": f"Test Stock {i}",
                    "industry": "Test Industry",
                    "market": "Test Market",
                    "list_date": "2020-01-01",
                    "extra_field1": "should_be_removed",
                    "extra_field2": 123.45
                })

            # Measure data processing time
            start_time = time.time()
            processed_data = stock_system.data_processor.process_stock_basic_info(raw_data)
            end_time = time.time()

            execution_time = end_time - start_time

            assert len(processed_data) == batch_size

            # Verify field filtering
            for item in processed_data:
                assert "extra_field1" not in item
                assert "extra_field2" not in item

            print(f"Processed {batch_size} records - time: {execution_time:.3f}s")

            # Verify performance requirements
            assert execution_time < 1.0  # 1000 records should process within 1 second

    @pytest.mark.asyncio
    async def test_system_startup_performance(self, stock_system):
        """Test system startup performance."""
        # Measure full system startup time
        start_time = time.time()

        await stock_system.initialize()

        end_time = time.time()
        execution_time = end_time - start_time

        print(f"Full system startup time: {execution_time:.3f}s")

        # Verify startup performance
        assert execution_time < 10.0  # startup should finish within 10 seconds

    @pytest.mark.asyncio
    async def test_long_running_performance(self, stock_system):
        """Test long-running performance."""
        # Initialize the system
        await stock_system.initialize()

        # Simulate a long-running session (many repeated operations)
        operation_count = 100
        operation_times = []

        for i in range(operation_count):
            code = f"{i % 100:06d}"

            start_time = time.time()
            result = await stock_system.stock_repo.get_stock_basic_info(code)
            end_time = time.time()

            execution_time = end_time - start_time
            operation_times.append(execution_time)

            assert isinstance(result, list)

        # Analyze performance stability
        avg_time = sum(operation_times) / len(operation_times)
        max_time = max(operation_times)
        min_time = min(operation_times)

        print(f"Long-running performance - avg: {avg_time:.3f}s, min: {min_time:.3f}s, max: {max_time:.3f}s")

        # Verify performance stability
        assert max_time / avg_time < 5.0  # max should not exceed 5x the average
        assert (max_time - min_time) < 0.5  # spread should stay under 0.5 seconds

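
# Hedged sketch: the stability assertions above compare max against avg, which
# a single outlier can trip. The coefficient of variation (stdev / mean) is a
# steadier measure of timing jitter; standard library only.
def coefficient_of_variation(samples):
    """stdev / mean of a list of timings; needs at least two samples."""
    import statistics
    return statistics.stdev(samples) / statistics.mean(samples)
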
class TestScalability:
    """Scalability test suite."""

    @pytest.fixture
    def temp_database(self):
        """Create a temporary database file."""
        with tempfile.NamedTemporaryFile(suffix=".db", delete=False) as tmp:
            temp_db_path = tmp.name

        yield temp_db_path

        # Clean up the temporary file
        if os.path.exists(temp_db_path):
            os.unlink(temp_db_path)

    @pytest.mark.asyncio
    async def test_large_scale_data_storage(self, temp_database):
        """Test large-scale data storage scalability."""
        system = StockAnalysisSystem()
        system.db_config = {
            "database_url": f"sqlite:///{temp_database}",
            "echo": False,
            "pool_size": 20,
            "max_overflow": 50
        }

        await system.initialize()

        # Large-scale data test
        large_batch_sizes = [5000, 10000]

        for batch_size in large_batch_sizes:
            batch_data = []
            for i in range(batch_size):
                batch_data.append({
                    "code": f"{i:06d}",
                    "name": f"Test Stock {i}",
                    "industry": "Test Industry",
                    "market": "Test Market",
                    "list_date": "2020-01-01"
                })

            # Measure large-scale insert time
            start_time = time.time()
            result = await system.stock_repo.save_stock_basic_info(batch_data)
            end_time = time.time()

            execution_time = end_time - start_time

            assert result is True

            # Compute performance metrics
            records_per_second = batch_size / execution_time

            print(f"Large-scale insert of {batch_size} records - time: {execution_time:.3f}s, rate: {records_per_second:.1f} records/s")

            # Verify scalability
            assert execution_time < 300.0  # 10000 records should complete within 300 seconds
            assert records_per_second > 30.0  # minimum throughput requirement

    @pytest.mark.asyncio
    async def test_high_concurrency_scalability(self, temp_database):
        """Test high-concurrency scalability."""
        system = StockAnalysisSystem()
        system.db_config = {
            "database_url": f"sqlite:///{temp_database}",
            "echo": False,
            "pool_size": 30,
            "max_overflow": 100
        }

        await system.initialize()

        # High-concurrency test
        high_concurrency_levels = [50, 100]

        for concurrency in high_concurrency_levels:
            async def query_task(code):
                return await system.stock_repo.get_stock_basic_info(code)

            tasks = []
            for i in range(concurrency):
                code = f"{i % 1000:06d}"
                tasks.append(query_task(code))

            # Measure high-concurrency execution time
            start_time = time.time()
            results = await asyncio.gather(*tasks)
            end_time = time.time()

            execution_time = end_time - start_time

            print(f"High concurrency with {concurrency} queries - time: {execution_time:.3f}s")

            # Verify high-concurrency scalability
            assert execution_time < 10.0  # 100 concurrent queries should finish within 10 seconds

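
# Hedged sketch: for concurrency well beyond the configured pool size, bounding
# in-flight tasks with a semaphore measures the database itself rather than
# time spent queueing on pool checkout:
async def bounded_gather(coros, limit):
    """Run coroutines concurrently with at most `limit` in flight."""
    sem = asyncio.Semaphore(limit)

    async def run(coro):
        async with sem:
            return await coro

    return await asyncio.gather(*(run(c) for c in coros))
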
class TestPerformanceBenchmarks:
    """Performance benchmark suite."""

    @pytest.fixture
    def temp_database(self):
        """Create a temporary database file."""
        with tempfile.NamedTemporaryFile(suffix=".db", delete=False) as tmp:
            temp_db_path = tmp.name

        yield temp_db_path

        # Clean up the temporary file (the original referenced the undefined
        # name `temp_database` here, which raised NameError during teardown)
        if os.path.exists(temp_db_path):
            os.unlink(temp_db_path)

    @pytest.mark.asyncio
    async def test_benchmark_data_collection(self, temp_database):
        """Benchmark: data collection performance."""
        system = StockAnalysisSystem()
        system.db_config = {
            "database_url": f"sqlite:///{temp_database}",
            "echo": False
        }

        await system.initialize()

        # Mock the collector for the performance test
        with patch.object(system.data_manager, "get_stock_basic_info") as mock_collector:
            mock_collector.return_value = [{"code": "000001", "name": "Test", "industry": "Test", "market": "Test", "list_date": "2020-01-01"}]

            # Measure data collection time
            start_time = time.time()

            # Perform repeated data collection
            for _ in range(100):
                await system.data_manager.get_stock_basic_info()

            end_time = time.time()
            execution_time = end_time - start_time

            print(f"Data collection benchmark - 100 fetches in {execution_time:.3f}s")

            # Benchmark performance requirement
            assert execution_time < 10.0  # 100 fetches should finish within 10 seconds

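    # Hedged sketch: the benchmark above could also confirm the mock was really
    # exercised; patch.object wraps an async method in an AsyncMock, so:
    #
    #     assert mock_collector.await_count == 100
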
    @pytest.mark.asyncio
    async def test_benchmark_end_to_end_workflow(self, temp_database):
        """Benchmark: end-to-end workflow performance."""
        system = StockAnalysisSystem()
        system.db_config = {
            "database_url": f"sqlite:///{temp_database}",
            "echo": False
        }

        await system.initialize()

        # Mock the end-to-end workflow
        mock_data = [{"code": "000001", "name": "Test", "industry": "Test", "market": "Test", "list_date": "2020-01-01"}]

        with patch.object(system.data_manager, "get_stock_basic_info", return_value=mock_data):
            # Measure full workflow time
            start_time = time.time()

            # Run the full workflow: collect -> process -> store
            raw_data = await system.data_manager.get_stock_basic_info()
            processed_data = system.data_processor.process_stock_basic_info(raw_data)
            storage_result = await system.stock_repo.save_stock_basic_info(processed_data)

            end_time = time.time()
            execution_time = end_time - start_time

            assert storage_result is True

            print(f"End-to-end workflow benchmark - time: {execution_time:.3f}s")

            # Benchmark performance requirement
            assert execution_time < 2.0  # one full workflow pass should finish within 2 seconds
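
# Hedged note: every threshold in this module (5 s init, 30 records/s, 100 MB,
# and so on) is machine-dependent. Hoisting them into module-level constants
# would make per-environment tuning a one-line change, e.g.:
#
#     INIT_TIME_LIMIT_S = 5.0
#     MIN_WRITE_RATE_PER_S = 30.0
#     MAX_INIT_MEMORY_MB = 100.0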