stock-monitor/docs/database/migrate_to_database.py
ycg 569c1c8813 重构股票监控系统:数据库架构升级与功能完善
- 重构数据访问层:引入DAO模式,支持MySQL/SQLite双数据库
- 新增数据库架构:完整的股票数据、AI分析、自选股管理表结构
- 升级AI分析服务:集成豆包大模型,支持多维度分析
- 优化API路由:分离市场数据API,提供更清晰的接口设计
- 完善项目文档:添加数据库迁移指南、新功能指南等
- 清理冗余文件:删除旧的缓存文件和无用配置
- 新增调度器:支持定时任务和数据自动更新
- 改进前端模板:简化的股票展示页面

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-12-01 15:44:25 +08:00

325 lines
11 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
数据迁移脚本将JSON文件数据迁移到MySQL数据库
"""
import json
import os
import sys
from datetime import datetime
from pathlib import Path
# 添加项目根目录到Python路径
project_root = Path(__file__).parent
sys.path.insert(0, str(project_root))
from app.dao import StockDAO, WatchlistDAO, AIAnalysisDAO, ConfigDAO
from app.config import Config
class DataMigration:
def __init__(self):
self.stock_dao = StockDAO()
self.watchlist_dao = WatchlistDAO()
self.ai_dao = AIAnalysisDAO()
self.config_dao = ConfigDAO()
# JSON文件路径
self.config_file = Config.CONFIG_FILE
self.cache_file = os.path.join(Config.BASE_DIR, "stock_cache.json")
self.ai_cache_dir = os.path.join(Config.BASE_DIR, "ai_stock_analysis")
self.dao_cache_dir = os.path.join(Config.BASE_DIR, "dao_analysis")
self.daka_cache_dir = os.path.join(Config.BASE_DIR, "daka_analysis")
print("数据迁移工具初始化完成")
print(f"配置文件: {self.config_file}")
print(f"股票缓存文件: {self.cache_file}")
print(f"AI分析缓存目录: {self.ai_cache_dir}")
def migrate_watchlist(self):
"""迁移监控列表"""
print("\n开始迁移监控列表...")
if not os.path.exists(self.config_file):
print("配置文件不存在,跳过监控列表迁移")
return 0
try:
with open(self.config_file, 'r', encoding='utf-8') as f:
config_data = json.load(f)
watchlist = config_data.get('watchlist', {})
migrated_count = 0
for stock_code, targets in watchlist.items():
try:
target_min = targets.get('target_market_value', {}).get('min')
target_max = targets.get('target_market_value', {}).get('max')
success = self.watchlist_dao.add_to_watchlist(
stock_code, target_min, target_max
)
if success:
migrated_count += 1
print(f"✓ 迁移监控股票: {stock_code}")
else:
print(f"✗ 迁移失败: {stock_code}")
except Exception as e:
print(f"✗ 迁移股票 {stock_code} 失败: {e}")
print(f"监控列表迁移完成,共迁移 {migrated_count} 支股票")
return migrated_count
except Exception as e:
print(f"监控列表迁移失败: {e}")
return 0
def migrate_stock_cache(self):
"""迁移股票缓存数据"""
print("\n开始迁移股票缓存数据...")
if not os.path.exists(self.cache_file):
print("股票缓存文件不存在,跳过缓存数据迁移")
return 0
try:
with open(self.cache_file, 'r', encoding='utf-8') as f:
cache_data = json.load(f)
migrated_count = 0
for stock_code, data in cache_data.items():
try:
stock_info = data.get('data', {}).get('stock_info', {})
timestamp = data.get('timestamp', datetime.now().strftime('%Y-%m-%d'))
# 迁移股票信息
if stock_info:
stock_name = stock_info.get('name', '')
market = 'SH' if stock_code.startswith('6') else 'SZ'
# 添加或更新股票基础信息
self.stock_dao.add_or_update_stock(stock_code, stock_name, market)
# 保存股票数据
success = self.stock_dao.save_stock_data(
stock_code, stock_info, timestamp
)
if success:
migrated_count += 1
print(f"✓ 迁移股票数据: {stock_code} ({timestamp})")
else:
print(f"✗ 迁移失败: {stock_code}")
except Exception as e:
print(f"✗ 迁移股票数据 {stock_code} 失败: {e}")
print(f"股票缓存数据迁移完成,共迁移 {migrated_count} 条记录")
return migrated_count
except Exception as e:
print(f"股票缓存数据迁移失败: {e}")
return 0
def migrate_ai_analysis(self, cache_dir: str, analysis_type: str):
"""迁移AI分析数据"""
if not os.path.exists(cache_dir):
print(f"分析缓存目录不存在: {cache_dir}")
return 0
migrated_count = 0
try:
for filename in os.listdir(cache_dir):
if filename.endswith('.json'):
stock_code = filename[:-5] # 移除.json后缀
file_path = os.path.join(cache_dir, filename)
try:
with open(file_path, 'r', encoding='utf-8') as f:
analysis_data = json.load(f)
# 获取文件修改时间作为分析日期
file_mtime = os.path.getmtime(file_path)
analysis_date = datetime.fromtimestamp(file_mtime).strftime('%Y-%m-%d')
# 保存到数据库
success = self.ai_dao.save_analysis(
stock_code, analysis_type, analysis_data, analysis_date
)
if success:
migrated_count += 1
print(f"✓ 迁移{analysis_type}分析: {stock_code}")
else:
print(f"✗ 迁移失败: {stock_code}")
except Exception as e:
print(f"✗ 迁移{analysis_type}分析 {stock_code} 失败: {e}")
return migrated_count
except Exception as e:
print(f"{analysis_type}分析数据迁移失败: {e}")
return 0
def migrate_all_ai_analysis(self):
"""迁移所有AI分析数据"""
print("\n开始迁移AI分析数据...")
total_migrated = 0
# 迁移标准AI分析
print("迁移标准AI分析...")
count = self.migrate_ai_analysis(self.ai_cache_dir, 'stock')
total_migrated += count
print(f"标准AI分析迁移完成{count}")
# 迁移道德经分析
print("\n迁移道德经分析...")
count = self.migrate_ai_analysis(self.dao_cache_dir, 'dao')
total_migrated += count
print(f"道德经分析迁移完成,共 {count}")
# 迁移大咖分析
print("\n迁移大咖分析...")
count = self.migrate_ai_analysis(self.daka_cache_dir, 'daka')
total_migrated += count
print(f"大咖分析迁移完成,共 {count}")
print(f"\nAI分析数据迁移完成共迁移 {total_migrated} 条记录")
return total_migrated
def backup_json_files(self):
"""备份JSON文件"""
print("\n备份JSON文件...")
backup_dir = os.path.join(Config.BASE_DIR, f"json_backup_{datetime.now().strftime('%Y%m%d_%H%M%S')}")
os.makedirs(backup_dir, exist_ok=True)
files_to_backup = [
(self.config_file, "config.json"),
(self.cache_file, "stock_cache.json")
]
directories_to_backup = [
(self.ai_cache_dir, "ai_stock_analysis"),
(self.dao_cache_dir, "dao_analysis"),
(self.daka_cache_dir, "daka_analysis")
]
import shutil
# 备份文件
for file_path, filename in files_to_backup:
if os.path.exists(file_path):
shutil.copy2(file_path, os.path.join(backup_dir, filename))
print(f"✓ 备份文件: {filename}")
# 备份目录
for dir_path, dirname in directories_to_backup:
if os.path.exists(dir_path):
shutil.copytree(dir_path, os.path.join(backup_dir, dirname), dirs_exist_ok=True)
print(f"✓ 备份目录: {dirname}")
print(f"JSON文件备份完成备份位置: {backup_dir}")
return backup_dir
def run_full_migration(self):
"""执行完整数据迁移"""
print("=" * 60)
print("开始从JSON到数据库的完整数据迁移")
print("=" * 60)
# 备份JSON文件
backup_dir = self.backup_json_files()
# 执行迁移
try:
watchlist_count = self.migrate_watchlist()
stock_cache_count = self.migrate_stock_cache()
ai_analysis_count = self.migrate_all_ai_analysis()
print("\n" + "=" * 60)
print("数据迁移完成!")
print("=" * 60)
print(f"监控列表: {watchlist_count}")
print(f"股票缓存数据: {stock_cache_count}")
print(f"AI分析数据: {ai_analysis_count}")
print(f"JSON文件备份: {backup_dir}")
print("=" * 60)
return True
except Exception as e:
print(f"\n数据迁移过程中发生错误: {e}")
print("请检查数据库连接和权限设置")
return False
def verify_migration(self):
"""验证迁移结果"""
print("\n验证迁移结果...")
try:
# 检查股票数据
stock_count = self.stock_dao.get_stock_count()
print(f"数据库中股票数量: {stock_count}")
# 检查监控列表
watchlist_count = self.watchlist_dao.get_watchlist_count()
print(f"监控列表股票数量: {watchlist_count}")
# 检查AI分析数据
ai_analysis_count = self.ai_dao.get_analysis_count()
print(f"AI分析记录数量: {ai_analysis_count}")
# 检查日期范围
date_range = self.stock_dao.get_data_date_range()
if date_range:
print(f"数据日期范围: {date_range.get('min_date')}{date_range.get('max_date')}")
return True
except Exception as e:
print(f"验证迁移结果失败: {e}")
return False
def main():
"""主函数"""
migration = DataMigration()
print("数据迁移工具")
print("1. 执行完整迁移")
print("2. 仅迁移监控列表")
print("3. 仅迁移股票缓存")
print("4. 仅迁移AI分析")
print("5. 验证迁移结果")
try:
choice = input("\n请选择操作 (1-5): ").strip()
if choice == '1':
migration.run_full_migration()
migration.verify_migration()
elif choice == '2':
migration.migrate_watchlist()
elif choice == '3':
migration.migrate_stock_cache()
elif choice == '4':
migration.migrate_all_ai_analysis()
elif choice == '5':
migration.verify_migration()
else:
print("无效选择")
except KeyboardInterrupt:
print("\n\n迁移被用户中断")
except Exception as e:
print(f"\n迁移过程中发生错误: {e}")
if __name__ == "__main__":
main()