""" 兼容Baostock格式的财务数据更新脚本 将6位股票代码转换为9位格式进行财务数据采集 """ import sys import os import asyncio import logging from datetime import date, datetime sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) from src.storage.database import db_manager from src.storage.stock_repository import StockRepository from src.data.data_manager import DataManager from src.config.settings import Settings # 配置日志 logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) def convert_to_baostock_format(stock_code: str) -> str: """ 将6位股票代码转换为Baostock格式(9位) Args: stock_code: 6位股票代码 Returns: 9位Baostock格式股票代码 """ if len(stock_code) == 6: # 判断市场类型 if stock_code.startswith(('6', '9')): return f"sh.{stock_code}" elif stock_code.startswith(('0', '3')): return f"sz.{stock_code}" else: return stock_code return stock_code async def update_financial_data_baostock(): """ 使用Baostock格式更新财务数据 """ try: logger.info("开始使用Baostock格式更新财务数据...") # 加载配置 settings = Settings() logger.info("配置加载成功") # 创建数据管理器 data_manager = DataManager() logger.info("数据管理器创建成功") # 创建存储库 repository = StockRepository(db_manager.get_session()) logger.info("存储库创建成功") # 获取股票基础信息 stocks = repository.get_stock_basic_info() logger.info(f"获取到{len(stocks)}只股票基础信息") if not stocks: logger.error("没有股票基础信息,无法更新财务数据") return {"success": False, "error": "没有股票基础信息"} # 选择前20只股票进行测试 test_stocks = stocks[:20] test_codes = [stock.code for stock in test_stocks] logger.info(f"测试股票代码: {test_codes}") # 设置测试年份和季度 test_year = 2023 test_quarter = 4 total_financial_data = [] success_count = 0 error_count = 0 # 为每只测试股票获取财务数据 for stock in test_stocks: try: # 转换为Baostock格式 baostock_code = convert_to_baostock_format(stock.code) logger.info(f"获取股票{stock.code}({baostock_code})的财务数据...") # 使用数据管理器获取财务数据 financial_data = await data_manager.get_financial_report( baostock_code, test_year, test_quarter ) if financial_data: # 将数据中的代码转换回6位格式 for data in financial_data: data["code"] = stock.code # 使用原始6位代码 total_financial_data.extend(financial_data) success_count += 1 logger.info(f"股票{stock.code}获取到{len(financial_data)}条财务数据") else: logger.warning(f"股票{stock.code}未获取到财务数据") error_count += 1 # 小延迟避免请求过快 await asyncio.sleep(0.5) except Exception as e: logger.error(f"获取股票{stock.code}财务数据失败: {str(e)}") error_count += 1 continue # 保存财务数据 if total_financial_data: try: logger.info(f"开始保存{len(total_financial_data)}条财务数据...") save_result = repository.save_financial_report_data(total_financial_data) logger.info(f"财务数据保存结果: {save_result}") except Exception as e: logger.error(f"保存财务数据失败: {str(e)}") error_count += len(test_stocks) else: logger.warning("没有获取到任何财务数据") # 验证数据库中的财务数据 try: financial_count = repository.session.query(repository.FinancialReport).count() logger.info(f"财务报告表: {financial_count}条记录") if financial_count > 0: # 显示最新的5条财务数据 latest_financial = repository.session.query(repository.FinancialReport).order_by( repository.FinancialReport.report_date.desc() ).limit(5).all() logger.info("最新的5条财务数据:") for i, financial in enumerate(latest_financial): logger.info(f" {i+1}. {financial.stock_code} - {financial.report_date} - EPS: {financial.eps}") except Exception as e: logger.error(f"查询财务数据失败: {str(e)}") # 汇总更新结果 result = { "success": True, "test_stocks": len(test_stocks), "success_count": success_count, "error_count": error_count, "financial_data_count": len(total_financial_data), "saved_count": save_result.get("added_count", 0) + save_result.get("updated_count", 0) if total_financial_data else 0 } logger.info(f"财务数据更新完成: {result}") return result except Exception as e: logger.error(f"财务数据更新异常: {str(e)}") return {"success": False, "error": str(e)} def main(): """ 主函数 """ logger.info("开始兼容Baostock格式的财务数据更新...") # 运行异步更新 result = asyncio.run(update_financial_data_baostock()) if result["success"]: logger.info("财务数据更新成功!") print(f"更新结果: {result}") if result["financial_data_count"] > 0: print("✓ 财务数据获取成功") print(f"✓ 共获取{result['financial_data_count']}条财务数据") else: print("⚠ 未获取到财务数据") if result["saved_count"] > 0: print("✓ 财务数据保存成功") print(f"✓ 共保存{result['saved_count']}条财务数据") else: print("⚠ 财务数据保存失败") print(f"✓ 成功股票数: {result['success_count']}") print(f"✓ 失败股票数: {result['error_count']}") else: logger.error("财务数据更新失败!") print(f"更新失败: {result.get('error')}") return result if __name__ == "__main__": # 运行更新 result = main() # 输出最终结果 if result.get("success", False): print("\n兼容Baostock格式的财务数据更新完成!") sys.exit(0) else: print("\n兼容Baostock格式的财务数据更新失败!") sys.exit(1)