- 重构数据访问层:引入DAO模式,支持MySQL/SQLite双数据库 - 新增数据库架构:完整的股票数据、AI分析、自选股管理表结构 - 升级AI分析服务:集成豆包大模型,支持多维度分析 - 优化API路由:分离市场数据API,提供更清晰的接口设计 - 完善项目文档:添加数据库迁移指南、新功能指南等 - 清理冗余文件:删除旧的缓存文件和无用配置 - 新增调度器:支持定时任务和数据自动更新 - 改进前端模板:简化的股票展示页面 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
159 lines
4.3 KiB
Python
159 lines
4.3 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
数据库初始化脚本
|
||
创建数据库表结构并初始化基础数据
|
||
"""
|
||
import sys
|
||
import os
|
||
from pathlib import Path
|
||
|
||
# 添加项目根目录到Python路径
|
||
project_root = Path(__file__).parent
|
||
sys.path.insert(0, str(project_root))
|
||
|
||
from app.database import DatabaseManager
|
||
from app.config import Config
|
||
|
||
|
||
def create_database():
|
||
"""创建数据库"""
|
||
print("正在创建数据库...")
|
||
|
||
# 创建数据库管理器,连接到MySQL服务器(不指定数据库)
|
||
db_manager = DatabaseManager()
|
||
|
||
try:
|
||
with db_manager.get_connection() as conn:
|
||
cursor = conn.cursor()
|
||
|
||
# 创建数据库
|
||
cursor.execute(f"CREATE DATABASE IF NOT EXISTS {Config.MYSQL_DATABASE} CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci")
|
||
print(f"✓ 数据库 {Config.MYSQL_DATABASE} 创建成功")
|
||
|
||
cursor.close()
|
||
|
||
except Exception as e:
|
||
print(f"✗ 创建数据库失败: {e}")
|
||
return False
|
||
|
||
return True
|
||
|
||
|
||
def create_tables():
|
||
"""创建数据表"""
|
||
print("正在创建数据表...")
|
||
|
||
# 读取SQL脚本
|
||
schema_file = project_root / "database_schema.sql"
|
||
if not schema_file.exists():
|
||
print(f"✗ 数据库表结构文件不存在: {schema_file}")
|
||
return False
|
||
|
||
with open(schema_file, 'r', encoding='utf-8') as f:
|
||
sql_content = f.read()
|
||
|
||
db_manager = DatabaseManager()
|
||
|
||
try:
|
||
with db_manager.get_connection() as conn:
|
||
cursor = conn.cursor()
|
||
|
||
# 分割SQL语句并执行
|
||
statements = [stmt.strip() for stmt in sql_content.split(';') if stmt.strip()]
|
||
|
||
for statement in statements:
|
||
if statement:
|
||
try:
|
||
cursor.execute(statement)
|
||
except Exception as e:
|
||
# 忽略表已存在的错误
|
||
if "already exists" not in str(e):
|
||
print(f"警告: 执行SQL语句失败: {statement[:50]}... 错误: {e}")
|
||
|
||
conn.commit()
|
||
print("✓ 数据表创建成功")
|
||
cursor.close()
|
||
|
||
except Exception as e:
|
||
print(f"✗ 创建数据表失败: {e}")
|
||
return False
|
||
|
||
return True
|
||
|
||
|
||
def test_connection():
|
||
"""测试数据库连接"""
|
||
print("正在测试数据库连接...")
|
||
|
||
try:
|
||
from app.dao import StockDAO, WatchlistDAO, AIAnalysisDAO, ConfigDAO
|
||
|
||
# 测试各个DAO
|
||
stock_dao = StockDAO()
|
||
watchlist_dao = WatchlistDAO()
|
||
ai_dao = AIAnalysisDAO()
|
||
config_dao = ConfigDAO()
|
||
|
||
# 获取数据库状态
|
||
stock_count = stock_dao.get_stock_count()
|
||
watchlist_count = watchlist_dao.get_watchlist_count()
|
||
ai_count = ai_dao.get_analysis_count()
|
||
|
||
print(f"✓ 数据库连接成功")
|
||
print(f" - 股票数量: {stock_count}")
|
||
print(f" - 监控列表: {watchlist_count}")
|
||
print(f" - AI分析: {ai_count}")
|
||
|
||
return True
|
||
|
||
except Exception as e:
|
||
print(f"✗ 数据库连接失败: {e}")
|
||
return False
|
||
|
||
|
||
def main():
|
||
"""主函数"""
|
||
print("=" * 60)
|
||
print("股票监控系统数据库初始化")
|
||
print("=" * 60)
|
||
print(f"数据库主机: {Config.MYSQL_HOST}:{Config.MYSQL_PORT}")
|
||
print(f"数据库名称: {Config.MYSQL_DATABASE}")
|
||
print(f"数据库用户: {Config.MYSQL_USER}")
|
||
print("=" * 60)
|
||
|
||
# 1. 创建数据库
|
||
if not create_database():
|
||
print("数据库创建失败,初始化终止")
|
||
return False
|
||
|
||
# 2. 创建数据表
|
||
if not create_tables():
|
||
print("数据表创建失败,初始化终止")
|
||
return False
|
||
|
||
# 3. 测试连接
|
||
if not test_connection():
|
||
print("数据库连接测试失败,初始化终止")
|
||
return False
|
||
|
||
print("\n" + "=" * 60)
|
||
print("数据库初始化完成!")
|
||
print("=" * 60)
|
||
print("\n下一步操作:")
|
||
print("1. 运行数据迁移脚本: python migrate_to_database.py")
|
||
print("2. 启动应用系统")
|
||
print("=" * 60)
|
||
|
||
return True
|
||
|
||
|
||
if __name__ == "__main__":
|
||
try:
|
||
success = main()
|
||
sys.exit(0 if success else 1)
|
||
except KeyboardInterrupt:
|
||
print("\n数据库初始化被用户中断")
|
||
sys.exit(1)
|
||
except Exception as e:
|
||
print(f"\n数据库初始化过程中发生错误: {e}")
|
||
sys.exit(1) |