stock/update_kline_data.py

247 lines
6.6 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
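Script for updating K-line data.
Fetches only K-line data and financial data, so that the stock basic information is not fetched again.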
"""
import sys
import os
import asyncio
import logging
from datetime import date, timedelta
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from src.data.data_initializer import DataInitializer
from src.config.settings import Settings
from src.storage.database import db_manager
from src.storage.stock_repository import StockRepository
# Configure logging for the script: INFO level, "timestamp - level - message" lines.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
# Module-level logger shared by every function in this file.
logger = logging.getLogger(__name__)
async def update_kline_data():
    """
    Fetch the last year of daily K-line data for every stock and save it.

    The stock list comes from ``repository.get_stock_basic_info()``. For each
    stock, ``initializer.data_manager.get_daily_kline_data`` is awaited with a
    date range running from the same calendar day one year ago up to today.
    Everything collected is saved with a single
    ``repository.save_daily_kline_data`` call.

    Returns:
        dict: ``{"success": True, "stock_count", "success_stocks",
        "error_stocks", "kline_data_count", "save_result"}`` when the run
        completes, or ``{"success": False, "error": <message>}`` when there
        are no stocks or an exception escapes the per-stock handling.
        Exceptions derived from ``Exception`` are logged and reported through
        the returned dict instead of being raised.
    """
    try:
        logger.info("开始更新K线数据...")

        # Load configuration and build the data initializer.
        settings = Settings()
        initializer = DataInitializer(settings)

        # NOTE(review): the session obtained here is never closed in this
        # function - confirm that db_manager owns its lifetime.
        repository = StockRepository(db_manager.get_session())

        # Without the stock list there is nothing to update.
        stocks = repository.get_stock_basic_info()
        if not stocks:
            logger.error("没有股票基础信息无法更新K线数据")
            return {"success": False, "error": "没有股票基础信息"}

        logger.info(f"找到{len(stocks)}只股票开始更新K线数据...")

        # Date range: the most recent year.
        end_date = date.today()
        try:
            start_date = end_date.replace(year=end_date.year - 1)
        except ValueError:
            # Today is Feb 29 and the previous year has no such day; fall back
            # to Feb 28 instead of failing the whole update.
            start_date = end_date.replace(year=end_date.year - 1, day=28)

        # The range is identical for every stock, so format it only once.
        start_str = start_date.strftime("%Y-%m-%d")
        end_str = end_date.strftime("%Y-%m-%d")

        total_kline_data = []
        success_count = 0
        error_count = 0

        # Walk the stocks in groups of ``batch_size``.
        # NOTE(review): results are still accumulated in one list across all
        # batches, so batching by itself does not bound memory usage.
        batch_size = 20
        for i in range(0, len(stocks), batch_size):
            batch_stocks = stocks[i:i + batch_size]

            for stock in batch_stocks:
                try:
                    logger.info(f"获取股票{stock.code}的K线数据...")

                    kline_data = await initializer.data_manager.get_daily_kline_data(
                        stock.code,
                        start_str,
                        end_str,
                    )

                    if kline_data:
                        total_kline_data.extend(kline_data)
                        success_count += 1
                        logger.info(f"股票{stock.code}获取到{len(kline_data)}条K线数据")
                    else:
                        # An empty result is counted as a failure for this stock.
                        logger.warning(f"股票{stock.code}未获取到K线数据")
                        error_count += 1

                    # Short pause so requests are not sent too quickly.
                    await asyncio.sleep(0.1)

                except Exception as e:
                    # One failing stock must not abort the whole run.
                    logger.error(f"获取股票{stock.code}K线数据失败: {str(e)}")
                    error_count += 1

        # Persist everything that was collected.
        if total_kline_data:
            logger.info(f"保存{len(total_kline_data)}条K线数据到数据库...")
            save_result = repository.save_daily_kline_data(total_kline_data)
            logger.info(f"K线数据保存完成: {save_result}")
        else:
            logger.warning("没有获取到任何K线数据")
            save_result = {"added_count": 0, "error_count": 0, "total_count": 0}

        result = {
            "success": True,
            "stock_count": len(stocks),
            "success_stocks": success_count,
            "error_stocks": error_count,
            "kline_data_count": len(total_kline_data),
            "save_result": save_result
        }

        logger.info(f"K线数据更新完成: {result}")
        return result

    except Exception as e:
        # Top-level boundary: log with traceback and report failure to the caller.
        logger.exception(f"更新K线数据异常: {str(e)}")
        return {"success": False, "error": str(e)}
async def update_financial_data():
    """
    Fetch the financial report of every stock and save the collected records.

    The stock list comes from ``repository.get_stock_basic_info()``. Each
    stock's report is awaited from
    ``initializer.data_manager.get_financial_report`` and all records are
    saved with one ``repository.save_financial_report_data`` call.

    Returns:
        dict: ``{"success": True, "stock_count", "success_stocks",
        "error_stocks", "financial_data_count", "save_result"}`` when the run
        completes, otherwise ``{"success": False, "error": <message>}``.
    """
    try:
        logger.info("开始更新财务数据...")

        # Configuration -> data initializer -> repository, in that order.
        initializer = DataInitializer(Settings())
        repository = StockRepository(db_manager.get_session())

        # Bail out early when there are no stocks to work on.
        stocks = repository.get_stock_basic_info()
        if not stocks:
            logger.error("没有股票基础信息,无法更新财务数据")
            return {"success": False, "error": "没有股票基础信息"}

        logger.info(f"找到{len(stocks)}只股票,开始更新财务数据...")

        collected = []
        ok_count = 0
        failed_count = 0

        # Iterate over the stocks in slices of ``chunk_size``.
        chunk_size = 15
        for offset in range(0, len(stocks), chunk_size):
            for stock in stocks[offset:offset + chunk_size]:
                try:
                    logger.info(f"获取股票{stock.code}的财务数据...")

                    reports = await initializer.data_manager.get_financial_report(stock.code)

                    if not reports:
                        # Nothing returned: count this stock as an error.
                        logger.warning(f"股票{stock.code}未获取到财务数据")
                        failed_count += 1
                    else:
                        collected.extend(reports)
                        ok_count += 1
                        logger.info(f"股票{stock.code}获取到{len(reports)}条财务数据")

                    # Short pause so requests are not sent too quickly.
                    await asyncio.sleep(0.2)

                except Exception as e:
                    # Log and move on to the next stock.
                    logger.error(f"获取股票{stock.code}财务数据失败: {str(e)}")
                    failed_count += 1

        # Save what was collected, or report an all-zero save summary.
        if not collected:
            logger.warning("没有获取到任何财务数据")
            save_result = {"added_count": 0, "updated_count": 0, "error_count": 0, "total_count": 0}
        else:
            logger.info(f"保存{len(collected)}条财务数据到数据库...")
            save_result = repository.save_financial_report_data(collected)
            logger.info(f"财务数据保存完成: {save_result}")

        summary = {
            "success": True,
            "stock_count": len(stocks),
            "success_stocks": ok_count,
            "error_stocks": failed_count,
            "financial_data_count": len(collected),
            "save_result": save_result,
        }

        logger.info(f"财务数据更新完成: {summary}")
        return summary

    except Exception as e:
        logger.error(f"更新财务数据异常: {str(e)}")
        return {"success": False, "error": str(e)}
async def main():
    """
    Run the K-line update and then the financial-data update, and combine both results.

    Returns:
        dict: ``{"kline_update", "financial_update", "overall_success"}`` when
        both steps return, or ``{"success": False, "error": <message>}`` if an
        exception is caught here.
    """
    try:
        logger.info("开始更新股票数据...")

        # The two updates run one after the other, K-line first.
        kline_result = await update_kline_data()
        financial_result = await update_financial_data()

        # The run counts as successful only when both steps report success.
        overall_ok = kline_result.get("success", False) and financial_result.get("success", False)
        total_result = {
            "kline_update": kline_result,
            "financial_update": financial_result,
            "overall_success": overall_ok,
        }

        logger.info(f"数据更新完成: {total_result}")

        if not overall_ok:
            logger.error("数据更新失败!")
        else:
            logger.info("数据更新成功!")

        return total_result

    except Exception as e:
        logger.error(f"数据更新主程序异常: {str(e)}")
        return {"success": False, "error": str(e)}
if __name__ == "__main__":
    # Run the async entry point to completion.
    result = asyncio.run(main())

    # Print the outcome and turn it into the exit code: 0 on success, 1 on failure.
    succeeded = result.get("overall_success", False)
    print("数据更新成功!" if succeeded else "数据更新失败!")
    sys.exit(0 if succeeded else 1)