stock/run.py

271 lines
6.6 KiB
Python

"""
股票分析系统启动脚本
提供命令行接口来运行系统功能
"""
import argparse
import asyncio
import sys
import os
# 添加项目根目录和src目录到Python路径
project_root = os.path.dirname(__file__)
sys.path.insert(0, project_root)
sys.path.insert(0, os.path.join(project_root, 'src'))
from src.main import StockAnalysisSystem
# 直接导入Config类
import sys
import os
import importlib
# 动态导入Config类
config_path = os.path.join(os.path.dirname(__file__), 'config', 'config.py')
spec = importlib.util.spec_from_file_location("config", config_path)
config_module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(config_module)
Config = config_module.Config
class CommandLineInterface:
"""命令行接口类"""
def __init__(self):
self.parser = argparse.ArgumentParser(
description='股票分析系统 - 数据采集、处理和分析平台',
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog='''
使用示例:
python run.py init # 初始化系统数据
python run.py scheduler # 启动定时任务调度器
python run.py status # 查看系统状态
python run.py update # 手动更新数据
python run.py test # 运行测试
python run.py performance # 运行性能测试
'''
)
self.parser.add_argument(
'command',
choices=['init', 'scheduler', 'status', 'update', 'test', 'performance'],
help='要执行的命令'
)
self.parser.add_argument(
'--config',
default='config/config.py',
help='配置文件路径'
)
self.parser.add_argument(
'--database',
help='数据库URL (覆盖配置文件中的设置)'
)
self.parser.add_argument(
'--debug',
action='store_true',
help='启用调试模式'
)
self.parser.add_argument(
'--log-level',
choices=['DEBUG', 'INFO', 'WARNING', 'ERROR'],
default='INFO',
help='日志级别'
)
async def run(self, args):
"""运行命令"""
try:
# 配置系统
system = await self._configure_system(args)
# 执行命令
if args.command == 'init':
await self._run_init(system)
elif args.command == 'scheduler':
await self._run_scheduler(system)
elif args.command == 'status':
await self._run_status(system)
elif args.command == 'update':
await self._run_update(system)
elif args.command == 'test':
await self._run_test()
elif args.command == 'performance':
await self._run_performance()
except KeyboardInterrupt:
print("\n用户中断操作")
except Exception as e:
print(f"执行命令时发生错误: {e}")
if args.debug:
import traceback
traceback.print_exc()
sys.exit(1)
async def _configure_system(self, args):
"""配置系统"""
print("正在配置股票分析系统...")
# 创建系统实例
system = StockAnalysisSystem()
# 配置数据库
if args.database:
system.db_config['database_url'] = args.database
# 配置调试模式
if args.debug:
Config.DEVELOPMENT_CONFIG['debug'] = True
Config.LOGGING_CONFIG['level'] = args.log_level
Config.LOGGING_CONFIG['console']['level'] = args.log_level
# 系统已在构造函数中初始化完成
print("系统配置完成")
return system
async def _run_init(self, system):
"""运行初始化命令"""
print("开始初始化系统数据...")
# 自动确认操作(用于自动化测试)
print("此操作将清空现有数据并重新初始化")
print("自动确认: 继续执行")
# 执行初始化
result = await system.initialize_data()
if result:
print("系统数据初始化完成")
else:
print("系统数据初始化失败")
async def _run_scheduler(self, system):
"""运行调度器命令"""
print("启动定时任务调度器...")
print("按 Ctrl+C 停止调度器")
try:
# 启动调度器
await system.start_scheduler()
# 保持运行
while True:
await asyncio.sleep(1)
except KeyboardInterrupt:
print("\n正在停止调度器...")
await system.stop_scheduler()
print("调度器已停止")
async def _run_status(self, system):
"""运行状态命令"""
print("正在检查系统状态...")
# 获取系统状态
status = await system.check_system_status()
# 显示状态信息
print("\n=== 系统状态报告 ===")
print(f"数据库连接: {'正常' if status['database'] else '异常'}")
print(f"数据采集器: {'正常' if status['collectors'] else '异常'}")
print(f"定时任务: {'运行中' if status['scheduler'] else '已停止'}")
if 'data_counts' in status:
print("\n=== 数据统计 ===")
for data_type, count in status['data_counts'].items():
print(f"{data_type}: {count} 条记录")
if 'last_update' in status:
print(f"\n最后更新时间: {status['last_update']}")
async def _run_update(self, system):
"""运行更新命令"""
print("开始手动更新数据...")
# 选择更新类型
print("请选择要更新的数据类型:")
print("1. 股票基础信息")
print("2. 日K线数据")
print("3. 财务报告数据")
print("4. 全部数据")
choice = input("请输入选择 (1-4): ").strip()
update_types = {
'1': 'basic',
'2': 'kline',
'3': 'financial',
'4': 'all'
}
if choice not in update_types:
print("无效选择")
return
update_type = update_types[choice]
# 执行更新
result = await system.update_data(update_type)
if result:
print(f"{update_type} 数据更新完成")
else:
print(f"{update_type} 数据更新失败")
async def _run_test(self):
"""运行测试命令"""
print("开始运行测试...")
# 运行pytest
import subprocess
try:
result = subprocess.run([
sys.executable, '-m', 'pytest', 'tests/',
'-v', '--tb=short', '--color=yes'
], cwd=os.path.dirname(__file__))
if result.returncode == 0:
print("所有测试通过")
else:
print("部分测试失败")
except Exception as e:
print(f"运行测试时发生错误: {e}")
async def _run_performance(self):
"""运行性能测试命令"""
print("开始运行性能测试...")
# 运行性能测试
import subprocess
try:
result = subprocess.run([
sys.executable, '-m', 'pytest',
'tests/test_performance.py',
'-v', '--tb=short', '--color=yes'
], cwd=os.path.dirname(__file__))
if result.returncode == 0:
print("性能测试完成")
else:
print("性能测试失败")
except Exception as e:
print(f"运行性能测试时发生错误: {e}")
def main():
"""主函数"""
cli = CommandLineInterface()
args = cli.parser.parse_args()
# 运行命令
asyncio.run(cli.run(args))
if __name__ == '__main__':
main()