stock/fix_database_charset.py

215 lines
7.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
修复数据库字符集问题
检查并设置数据库和表的字符集为utf8mb4以支持中文字符
"""
import sys
import os
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from src.storage.database import db_manager
from sqlalchemy import text
import logging
# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
def get_foreign_key_constraints(session, table_name):
"""获取表的外键约束信息"""
constraints = []
# 查询外键约束信息
query = f"""
SELECT
CONSTRAINT_NAME,
COLUMN_NAME,
REFERENCED_TABLE_NAME,
REFERENCED_COLUMN_NAME
FROM information_schema.KEY_COLUMN_USAGE
WHERE TABLE_SCHEMA = DATABASE()
AND TABLE_NAME = '{table_name}'
AND REFERENCED_TABLE_NAME IS NOT NULL
"""
result = session.execute(text(query))
for row in result.fetchall():
constraints.append({
'name': row[0],
'column': row[1],
'referenced_table': row[2],
'referenced_column': row[3]
})
return constraints
def drop_foreign_key_constraints(session, table_name):
"""删除表的外键约束"""
constraints = get_foreign_key_constraints(session, table_name)
for constraint in constraints:
logger.info(f"删除外键约束: {constraint['name']}")
drop_sql = f"ALTER TABLE {table_name} DROP FOREIGN KEY {constraint['name']}"
session.execute(text(drop_sql))
return constraints
def recreate_foreign_key_constraints(session, table_name, constraints):
"""重新创建外键约束"""
for constraint in constraints:
logger.info(f"重新创建外键约束: {constraint['name']}")
create_sql = f"""
ALTER TABLE {table_name}
ADD CONSTRAINT {constraint['name']}
FOREIGN KEY ({constraint['column']})
REFERENCES {constraint['referenced_table']}({constraint['referenced_column']})
"""
session.execute(text(create_sql))
def check_and_fix_charset():
"""检查和修复数据库字符集"""
try:
# 获取数据库会话
session = db_manager.get_session()
logger.info("检查数据库字符集...")
# 检查数据库字符集
result = session.execute(text("SHOW VARIABLES LIKE 'character_set_database'"))
db_charset = result.fetchone()
logger.info(f"数据库字符集: {db_charset}")
result = session.execute(text("SHOW VARIABLES LIKE 'collation_database'"))
db_collation = result.fetchone()
logger.info(f"数据库排序规则: {db_collation}")
# 检查表的字符集
result = session.execute(text("SHOW TABLE STATUS"))
tables = result.fetchall()
logger.info("=== 表字符集状态 ===")
for table in tables:
table_name = table[0]
charset = table[14] # Collation字段
logger.info(f"{table_name}: {charset}")
# 检查是否需要修复
needs_fix = False
for table in tables:
if table[14] and 'utf8mb4' not in table[14]:
needs_fix = True
logger.warning(f"{table[0]} 需要修复字符集")
if needs_fix:
logger.info("开始修复字符集...")
# 修改数据库字符集
session.execute(text("ALTER DATABASE CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci"))
# 存储外键约束信息
all_constraints = {}
# 先删除所有外键约束
for table in tables:
table_name = table[0]
constraints = drop_foreign_key_constraints(session, table_name)
if constraints:
all_constraints[table_name] = constraints
session.commit()
# 修改所有表的字符集
for table in tables:
table_name = table[0]
logger.info(f"修复表 {table_name} 的字符集")
session.execute(text(f"ALTER TABLE {table_name} CONVERT TO CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci"))
session.commit()
# 重新创建外键约束
for table_name, constraints in all_constraints.items():
if constraints:
recreate_foreign_key_constraints(session, table_name, constraints)
session.commit()
logger.info("字符集修复完成")
# 验证修复结果
result = session.execute(text("SHOW TABLE STATUS"))
tables_after = result.fetchall()
logger.info("=== 修复后表字符集状态 ===")
for table in tables_after:
table_name = table[0]
charset = table[14]
logger.info(f"{table_name}: {charset}")
else:
logger.info("字符集已正确设置,无需修复")
session.close()
except Exception as e:
logger.error(f"检查字符集失败: {str(e)}")
raise
def test_chinese_insert():
"""测试中文字符插入"""
try:
session = db_manager.get_session()
logger.info("测试中文字符插入...")
# 尝试插入包含中文字符的数据
test_data = {
'code': '000001',
'name': '平安银行',
'market': 'sz',
'company_name': '平安银行股份有限公司',
'industry': '银行',
'area': '广东',
'ipo_date': None,
'listing_status': 1
}
# 检查表是否存在
result = session.execute(text("SHOW TABLES LIKE 'stock_basic'"))
if result.fetchone():
# 清空测试数据
session.execute(text("DELETE FROM stock_basic WHERE code = '000001'"))
# 插入测试数据
insert_sql = """
INSERT INTO stock_basic (code, name, market, company_name, industry, area, ipo_date, listing_status)
VALUES (:code, :name, :market, :company_name, :industry, :area, :ipo_date, :listing_status)
"""
session.execute(text(insert_sql), test_data)
session.commit()
# 验证插入
result = session.execute(text("SELECT * FROM stock_basic WHERE code = '000001'"))
inserted_data = result.fetchone()
if inserted_data:
logger.info(f"中文字符插入成功: {inserted_data}")
# 清理测试数据
session.execute(text("DELETE FROM stock_basic WHERE code = '000001'"))
session.commit()
else:
logger.error("中文字符插入失败")
else:
logger.warning("stock_basic表不存在跳过测试")
session.close()
except Exception as e:
logger.error(f"中文字符插入测试失败: {str(e)}")
raise
if __name__ == "__main__":
logger.info("=== 开始检查和修复数据库字符集 ===")
check_and_fix_charset()
logger.info("=== 测试中文字符插入 ===")
test_chinese_insert()
logger.info("=== 字符集检查和修复完成 ===")