""" 股票分析系统启动脚本 提供命令行接口来运行系统功能 """ import argparse import asyncio import sys import os # 添加项目根目录和src目录到Python路径 project_root = os.path.dirname(__file__) sys.path.insert(0, project_root) sys.path.insert(0, os.path.join(project_root, 'src')) from src.main import StockAnalysisSystem # 直接导入Config类 import sys import os import importlib # 动态导入Config类 config_path = os.path.join(os.path.dirname(__file__), 'config', 'config.py') spec = importlib.util.spec_from_file_location("config", config_path) config_module = importlib.util.module_from_spec(spec) spec.loader.exec_module(config_module) Config = config_module.Config class CommandLineInterface: """命令行接口类""" def __init__(self): self.parser = argparse.ArgumentParser( description='股票分析系统 - 数据采集、处理和分析平台', formatter_class=argparse.RawDescriptionHelpFormatter, epilog=''' 使用示例: python run.py init # 初始化系统数据 python run.py scheduler # 启动定时任务调度器 python run.py status # 查看系统状态 python run.py update # 手动更新数据 python run.py test # 运行测试 python run.py performance # 运行性能测试 ''' ) self.parser.add_argument( 'command', choices=['init', 'scheduler', 'status', 'update', 'test', 'performance'], help='要执行的命令' ) self.parser.add_argument( '--config', default='config/config.py', help='配置文件路径' ) self.parser.add_argument( '--database', help='数据库URL (覆盖配置文件中的设置)' ) self.parser.add_argument( '--debug', action='store_true', help='启用调试模式' ) self.parser.add_argument( '--log-level', choices=['DEBUG', 'INFO', 'WARNING', 'ERROR'], default='INFO', help='日志级别' ) async def run(self, args): """运行命令""" try: # 配置系统 system = await self._configure_system(args) # 执行命令 if args.command == 'init': await self._run_init(system) elif args.command == 'scheduler': await self._run_scheduler(system) elif args.command == 'status': await self._run_status(system) elif args.command == 'update': await self._run_update(system) elif args.command == 'test': await self._run_test() elif args.command == 'performance': await self._run_performance() except KeyboardInterrupt: print("\n用户中断操作") except Exception as e: print(f"执行命令时发生错误: {e}") if args.debug: import traceback traceback.print_exc() sys.exit(1) async def _configure_system(self, args): """配置系统""" print("正在配置股票分析系统...") # 创建系统实例 system = StockAnalysisSystem() # 配置数据库 if args.database: system.db_config['database_url'] = args.database # 配置调试模式 if args.debug: Config.DEVELOPMENT_CONFIG['debug'] = True Config.LOGGING_CONFIG['level'] = args.log_level Config.LOGGING_CONFIG['console']['level'] = args.log_level # 系统已在构造函数中初始化完成 print("系统配置完成") return system async def _run_init(self, system): """运行初始化命令""" print("开始初始化系统数据...") # 自动确认操作(用于自动化测试) print("此操作将清空现有数据并重新初始化") print("自动确认: 继续执行") # 执行初始化 result = await system.initialize_data() if result: print("系统数据初始化完成") else: print("系统数据初始化失败") async def _run_scheduler(self, system): """运行调度器命令""" print("启动定时任务调度器...") print("按 Ctrl+C 停止调度器") try: # 启动调度器 await system.start_scheduler() # 保持运行 while True: await asyncio.sleep(1) except KeyboardInterrupt: print("\n正在停止调度器...") await system.stop_scheduler() print("调度器已停止") async def _run_status(self, system): """运行状态命令""" print("正在检查系统状态...") # 获取系统状态 status = await system.check_system_status() # 显示状态信息 print("\n=== 系统状态报告 ===") print(f"数据库连接: {'正常' if status['database'] else '异常'}") print(f"数据采集器: {'正常' if status['collectors'] else '异常'}") print(f"定时任务: {'运行中' if status['scheduler'] else '已停止'}") if 'data_counts' in status: print("\n=== 数据统计 ===") for data_type, count in status['data_counts'].items(): print(f"{data_type}: {count} 条记录") if 'last_update' in status: print(f"\n最后更新时间: {status['last_update']}") async def _run_update(self, system): """运行更新命令""" print("开始手动更新数据...") # 选择更新类型 print("请选择要更新的数据类型:") print("1. 股票基础信息") print("2. 日K线数据") print("3. 财务报告数据") print("4. 全部数据") choice = input("请输入选择 (1-4): ").strip() update_types = { '1': 'basic', '2': 'kline', '3': 'financial', '4': 'all' } if choice not in update_types: print("无效选择") return update_type = update_types[choice] # 执行更新 result = await system.update_data(update_type) if result: print(f"{update_type} 数据更新完成") else: print(f"{update_type} 数据更新失败") async def _run_test(self): """运行测试命令""" print("开始运行测试...") # 运行pytest import subprocess try: result = subprocess.run([ sys.executable, '-m', 'pytest', 'tests/', '-v', '--tb=short', '--color=yes' ], cwd=os.path.dirname(__file__)) if result.returncode == 0: print("所有测试通过") else: print("部分测试失败") except Exception as e: print(f"运行测试时发生错误: {e}") async def _run_performance(self): """运行性能测试命令""" print("开始运行性能测试...") # 运行性能测试 import subprocess try: result = subprocess.run([ sys.executable, '-m', 'pytest', 'tests/test_performance.py', '-v', '--tb=short', '--color=yes' ], cwd=os.path.dirname(__file__)) if result.returncode == 0: print("性能测试完成") else: print("性能测试失败") except Exception as e: print(f"运行性能测试时发生错误: {e}") def main(): """主函数""" cli = CommandLineInterface() args = cli.parser.parse_args() # 运行命令 asyncio.run(cli.run(args)) if __name__ == '__main__': main()