stock/check_data_status.py

64 lines
2.1 KiB
Python

"""
检查数据状态脚本
查看当前数据库中的数据量
"""
import sys
import os
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from src.storage.database import db_manager
from sqlalchemy import text
import logging
# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
def check_data_status():
"""检查数据状态"""
try:
# 获取数据库会话
session = db_manager.get_session()
logger.info("=== 检查数据状态 ===")
# 检查各表数据量
tables = ['stock_basic', 'daily_kline', 'financial_report', 'data_source', 'system_log']
for table in tables:
result = session.execute(text(f"SELECT COUNT(*) FROM {table}"))
count = result.fetchone()[0]
logger.info(f"{table}: {count} 条记录")
# 检查股票基础信息
result = session.execute(text("SELECT code, name, market FROM stock_basic LIMIT 10"))
stocks = result.fetchall()
if stocks:
logger.info("=== 前10只股票信息 ===")
for stock in stocks:
logger.info(f"代码: {stock[0]}, 名称: {stock[1]}, 市场: {stock[2]}")
else:
logger.info("股票基础信息表为空")
# 检查K线数据
result = session.execute(text("SELECT stock_code, trade_date, closing_price FROM daily_kline LIMIT 5"))
klines = result.fetchall()
if klines:
logger.info("=== 前5条K线数据 ===")
for kline in klines:
logger.info(f"股票: {kline[0]}, 日期: {kline[1]}, 收盘价: {kline[2]}")
else:
logger.info("K线数据表为空")
session.close()
logger.info("=== 数据状态检查完成 ===")
except Exception as e:
logger.error(f"检查数据状态失败: {str(e)}")
raise
if __name__ == "__main__":
check_data_status()