stock/test_simple_update.py

115 lines
2.8 KiB
Python

"""
简单测试脚本
测试数据更新功能
"""
import sys
import os
import asyncio
import logging
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from src.data.data_initializer import DataInitializer
from src.config.settings import Settings
from src.storage.database import db_manager
from src.storage.stock_repository import StockRepository
# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
async def test_data_update():
"""
测试数据更新功能
"""
try:
logger.info("开始测试数据更新功能...")
# 加载配置
settings = Settings()
logger.info("配置加载成功")
# 创建数据初始化器
initializer = DataInitializer(settings)
logger.info("数据初始化器创建成功")
# 创建存储库
repository = StockRepository(db_manager.get_session())
logger.info("存储库创建成功")
# 获取所有股票代码
stocks = repository.get_stock_basic_info()
logger.info(f"找到{len(stocks)}只股票")
if not stocks:
logger.error("没有股票基础信息,无法测试")
return {"success": False, "error": "没有股票基础信息"}
# 只测试前5只股票
test_stocks = stocks[:5]
logger.info(f"测试前{len(test_stocks)}只股票")
# 测试获取K线数据
total_kline_data = []
for stock in test_stocks:
try:
logger.info(f"测试获取股票{stock.code}的K线数据...")
# 使用数据管理器获取K线数据
kline_data = await initializer.data_manager.get_daily_kline_data(
stock.code,
"2024-01-01",
"2024-01-10"
)
if kline_data:
total_kline_data.extend(kline_data)
logger.info(f"股票{stock.code}获取到{len(kline_data)}条K线数据")
else:
logger.warning(f"股票{stock.code}未获取到K线数据")
except Exception as e:
logger.error(f"获取股票{stock.code}K线数据失败: {str(e)}")
continue
logger.info(f"测试完成,共获取{len(total_kline_data)}条K线数据")
return {
"success": True,
"test_stock_count": len(test_stocks),
"kline_data_count": len(total_kline_data)
}
except Exception as e:
logger.error(f"测试数据更新功能异常: {str(e)}")
return {"success": False, "error": str(e)}
async def main():
"""
主函数
"""
result = await test_data_update()
if result["success"]:
logger.info("数据更新功能测试成功!")
print(f"测试结果: {result}")
else:
logger.error("数据更新功能测试失败!")
print(f"测试失败: {result.get('error')}")
return result
if __name__ == "__main__":
# 运行测试
result = asyncio.run(main())
# 输出最终结果
if result.get("success", False):
print("测试成功!")
sys.exit(0)
else:
print("测试失败!")
sys.exit(1)