""" 简单测试脚本 测试数据更新功能 """ import sys import os import asyncio import logging sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) from src.data.data_initializer import DataInitializer from src.config.settings import Settings from src.storage.database import db_manager from src.storage.stock_repository import StockRepository # 配置日志 logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) async def test_data_update(): """ 测试数据更新功能 """ try: logger.info("开始测试数据更新功能...") # 加载配置 settings = Settings() logger.info("配置加载成功") # 创建数据初始化器 initializer = DataInitializer(settings) logger.info("数据初始化器创建成功") # 创建存储库 repository = StockRepository(db_manager.get_session()) logger.info("存储库创建成功") # 获取所有股票代码 stocks = repository.get_stock_basic_info() logger.info(f"找到{len(stocks)}只股票") if not stocks: logger.error("没有股票基础信息,无法测试") return {"success": False, "error": "没有股票基础信息"} # 只测试前5只股票 test_stocks = stocks[:5] logger.info(f"测试前{len(test_stocks)}只股票") # 测试获取K线数据 total_kline_data = [] for stock in test_stocks: try: logger.info(f"测试获取股票{stock.code}的K线数据...") # 使用数据管理器获取K线数据 kline_data = await initializer.data_manager.get_daily_kline_data( stock.code, "2024-01-01", "2024-01-10" ) if kline_data: total_kline_data.extend(kline_data) logger.info(f"股票{stock.code}获取到{len(kline_data)}条K线数据") else: logger.warning(f"股票{stock.code}未获取到K线数据") except Exception as e: logger.error(f"获取股票{stock.code}K线数据失败: {str(e)}") continue logger.info(f"测试完成,共获取{len(total_kline_data)}条K线数据") return { "success": True, "test_stock_count": len(test_stocks), "kline_data_count": len(total_kline_data) } except Exception as e: logger.error(f"测试数据更新功能异常: {str(e)}") return {"success": False, "error": str(e)} async def main(): """ 主函数 """ result = await test_data_update() if result["success"]: logger.info("数据更新功能测试成功!") print(f"测试结果: {result}") else: logger.error("数据更新功能测试失败!") print(f"测试失败: {result.get('error')}") return result if __name__ == "__main__": # 运行测试 result = asyncio.run(main()) # 输出最终结果 if result.get("success", False): print("测试成功!") sys.exit(0) else: print("测试失败!") sys.exit(1)