#!/usr/bin/env python3 """ 数据迁移脚本:将JSON文件数据迁移到MySQL数据库 """ import json import os import sys from datetime import datetime from pathlib import Path # 添加项目根目录到Python路径 project_root = Path(__file__).parent sys.path.insert(0, str(project_root)) from app.dao import StockDAO, WatchlistDAO, AIAnalysisDAO, ConfigDAO from app.config import Config class DataMigration: def __init__(self): self.stock_dao = StockDAO() self.watchlist_dao = WatchlistDAO() self.ai_dao = AIAnalysisDAO() self.config_dao = ConfigDAO() # JSON文件路径 self.config_file = Config.CONFIG_FILE self.cache_file = os.path.join(Config.BASE_DIR, "stock_cache.json") self.ai_cache_dir = os.path.join(Config.BASE_DIR, "ai_stock_analysis") self.dao_cache_dir = os.path.join(Config.BASE_DIR, "dao_analysis") self.daka_cache_dir = os.path.join(Config.BASE_DIR, "daka_analysis") print("数据迁移工具初始化完成") print(f"配置文件: {self.config_file}") print(f"股票缓存文件: {self.cache_file}") print(f"AI分析缓存目录: {self.ai_cache_dir}") def migrate_watchlist(self): """迁移监控列表""" print("\n开始迁移监控列表...") if not os.path.exists(self.config_file): print("配置文件不存在,跳过监控列表迁移") return 0 try: with open(self.config_file, 'r', encoding='utf-8') as f: config_data = json.load(f) watchlist = config_data.get('watchlist', {}) migrated_count = 0 for stock_code, targets in watchlist.items(): try: target_min = targets.get('target_market_value', {}).get('min') target_max = targets.get('target_market_value', {}).get('max') success = self.watchlist_dao.add_to_watchlist( stock_code, target_min, target_max ) if success: migrated_count += 1 print(f"✓ 迁移监控股票: {stock_code}") else: print(f"✗ 迁移失败: {stock_code}") except Exception as e: print(f"✗ 迁移股票 {stock_code} 失败: {e}") print(f"监控列表迁移完成,共迁移 {migrated_count} 支股票") return migrated_count except Exception as e: print(f"监控列表迁移失败: {e}") return 0 def migrate_stock_cache(self): """迁移股票缓存数据""" print("\n开始迁移股票缓存数据...") if not os.path.exists(self.cache_file): print("股票缓存文件不存在,跳过缓存数据迁移") return 0 try: with open(self.cache_file, 'r', encoding='utf-8') as f: cache_data = json.load(f) migrated_count = 0 for stock_code, data in cache_data.items(): try: stock_info = data.get('data', {}).get('stock_info', {}) timestamp = data.get('timestamp', datetime.now().strftime('%Y-%m-%d')) # 迁移股票信息 if stock_info: stock_name = stock_info.get('name', '') market = 'SH' if stock_code.startswith('6') else 'SZ' # 添加或更新股票基础信息 self.stock_dao.add_or_update_stock(stock_code, stock_name, market) # 保存股票数据 success = self.stock_dao.save_stock_data( stock_code, stock_info, timestamp ) if success: migrated_count += 1 print(f"✓ 迁移股票数据: {stock_code} ({timestamp})") else: print(f"✗ 迁移失败: {stock_code}") except Exception as e: print(f"✗ 迁移股票数据 {stock_code} 失败: {e}") print(f"股票缓存数据迁移完成,共迁移 {migrated_count} 条记录") return migrated_count except Exception as e: print(f"股票缓存数据迁移失败: {e}") return 0 def migrate_ai_analysis(self, cache_dir: str, analysis_type: str): """迁移AI分析数据""" if not os.path.exists(cache_dir): print(f"分析缓存目录不存在: {cache_dir}") return 0 migrated_count = 0 try: for filename in os.listdir(cache_dir): if filename.endswith('.json'): stock_code = filename[:-5] # 移除.json后缀 file_path = os.path.join(cache_dir, filename) try: with open(file_path, 'r', encoding='utf-8') as f: analysis_data = json.load(f) # 获取文件修改时间作为分析日期 file_mtime = os.path.getmtime(file_path) analysis_date = datetime.fromtimestamp(file_mtime).strftime('%Y-%m-%d') # 保存到数据库 success = self.ai_dao.save_analysis( stock_code, analysis_type, analysis_data, analysis_date ) if success: migrated_count += 1 print(f"✓ 迁移{analysis_type}分析: {stock_code}") else: print(f"✗ 迁移失败: {stock_code}") except Exception as e: print(f"✗ 迁移{analysis_type}分析 {stock_code} 失败: {e}") return migrated_count except Exception as e: print(f"{analysis_type}分析数据迁移失败: {e}") return 0 def migrate_all_ai_analysis(self): """迁移所有AI分析数据""" print("\n开始迁移AI分析数据...") total_migrated = 0 # 迁移标准AI分析 print("迁移标准AI分析...") count = self.migrate_ai_analysis(self.ai_cache_dir, 'stock') total_migrated += count print(f"标准AI分析迁移完成,共 {count} 条") # 迁移道德经分析 print("\n迁移道德经分析...") count = self.migrate_ai_analysis(self.dao_cache_dir, 'dao') total_migrated += count print(f"道德经分析迁移完成,共 {count} 条") # 迁移大咖分析 print("\n迁移大咖分析...") count = self.migrate_ai_analysis(self.daka_cache_dir, 'daka') total_migrated += count print(f"大咖分析迁移完成,共 {count} 条") print(f"\nAI分析数据迁移完成,共迁移 {total_migrated} 条记录") return total_migrated def backup_json_files(self): """备份JSON文件""" print("\n备份JSON文件...") backup_dir = os.path.join(Config.BASE_DIR, f"json_backup_{datetime.now().strftime('%Y%m%d_%H%M%S')}") os.makedirs(backup_dir, exist_ok=True) files_to_backup = [ (self.config_file, "config.json"), (self.cache_file, "stock_cache.json") ] directories_to_backup = [ (self.ai_cache_dir, "ai_stock_analysis"), (self.dao_cache_dir, "dao_analysis"), (self.daka_cache_dir, "daka_analysis") ] import shutil # 备份文件 for file_path, filename in files_to_backup: if os.path.exists(file_path): shutil.copy2(file_path, os.path.join(backup_dir, filename)) print(f"✓ 备份文件: {filename}") # 备份目录 for dir_path, dirname in directories_to_backup: if os.path.exists(dir_path): shutil.copytree(dir_path, os.path.join(backup_dir, dirname), dirs_exist_ok=True) print(f"✓ 备份目录: {dirname}") print(f"JSON文件备份完成,备份位置: {backup_dir}") return backup_dir def run_full_migration(self): """执行完整数据迁移""" print("=" * 60) print("开始从JSON到数据库的完整数据迁移") print("=" * 60) # 备份JSON文件 backup_dir = self.backup_json_files() # 执行迁移 try: watchlist_count = self.migrate_watchlist() stock_cache_count = self.migrate_stock_cache() ai_analysis_count = self.migrate_all_ai_analysis() print("\n" + "=" * 60) print("数据迁移完成!") print("=" * 60) print(f"监控列表: {watchlist_count} 条") print(f"股票缓存数据: {stock_cache_count} 条") print(f"AI分析数据: {ai_analysis_count} 条") print(f"JSON文件备份: {backup_dir}") print("=" * 60) return True except Exception as e: print(f"\n数据迁移过程中发生错误: {e}") print("请检查数据库连接和权限设置") return False def verify_migration(self): """验证迁移结果""" print("\n验证迁移结果...") try: # 检查股票数据 stock_count = self.stock_dao.get_stock_count() print(f"数据库中股票数量: {stock_count}") # 检查监控列表 watchlist_count = self.watchlist_dao.get_watchlist_count() print(f"监控列表股票数量: {watchlist_count}") # 检查AI分析数据 ai_analysis_count = self.ai_dao.get_analysis_count() print(f"AI分析记录数量: {ai_analysis_count}") # 检查日期范围 date_range = self.stock_dao.get_data_date_range() if date_range: print(f"数据日期范围: {date_range.get('min_date')} 至 {date_range.get('max_date')}") return True except Exception as e: print(f"验证迁移结果失败: {e}") return False def main(): """主函数""" migration = DataMigration() print("数据迁移工具") print("1. 执行完整迁移") print("2. 仅迁移监控列表") print("3. 仅迁移股票缓存") print("4. 仅迁移AI分析") print("5. 验证迁移结果") try: choice = input("\n请选择操作 (1-5): ").strip() if choice == '1': migration.run_full_migration() migration.verify_migration() elif choice == '2': migration.migrate_watchlist() elif choice == '3': migration.migrate_stock_cache() elif choice == '4': migration.migrate_all_ai_analysis() elif choice == '5': migration.verify_migration() else: print("无效选择") except KeyboardInterrupt: print("\n\n迁移被用户中断") except Exception as e: print(f"\n迁移过程中发生错误: {e}") if __name__ == "__main__": main()