271 lines
6.6 KiB
Python
271 lines
6.6 KiB
Python
"""
|
|
股票分析系统启动脚本
|
|
提供命令行接口来运行系统功能
|
|
"""
|
|
|
|
import argparse
|
|
import asyncio
|
|
import sys
|
|
import os
|
|
|
|
# 添加项目根目录和src目录到Python路径
|
|
project_root = os.path.dirname(__file__)
|
|
sys.path.insert(0, project_root)
|
|
sys.path.insert(0, os.path.join(project_root, 'src'))
|
|
|
|
from src.main import StockAnalysisSystem
|
|
# 直接导入Config类
|
|
import sys
|
|
import os
|
|
import importlib
|
|
|
|
# 动态导入Config类
|
|
config_path = os.path.join(os.path.dirname(__file__), 'config', 'config.py')
|
|
spec = importlib.util.spec_from_file_location("config", config_path)
|
|
config_module = importlib.util.module_from_spec(spec)
|
|
spec.loader.exec_module(config_module)
|
|
Config = config_module.Config
|
|
|
|
|
|
class CommandLineInterface:
|
|
"""命令行接口类"""
|
|
|
|
def __init__(self):
|
|
self.parser = argparse.ArgumentParser(
|
|
description='股票分析系统 - 数据采集、处理和分析平台',
|
|
formatter_class=argparse.RawDescriptionHelpFormatter,
|
|
epilog='''
|
|
使用示例:
|
|
python run.py init # 初始化系统数据
|
|
python run.py scheduler # 启动定时任务调度器
|
|
python run.py status # 查看系统状态
|
|
python run.py update # 手动更新数据
|
|
python run.py test # 运行测试
|
|
python run.py performance # 运行性能测试
|
|
'''
|
|
)
|
|
|
|
self.parser.add_argument(
|
|
'command',
|
|
choices=['init', 'scheduler', 'status', 'update', 'test', 'performance'],
|
|
help='要执行的命令'
|
|
)
|
|
|
|
self.parser.add_argument(
|
|
'--config',
|
|
default='config/config.py',
|
|
help='配置文件路径'
|
|
)
|
|
|
|
self.parser.add_argument(
|
|
'--database',
|
|
help='数据库URL (覆盖配置文件中的设置)'
|
|
)
|
|
|
|
self.parser.add_argument(
|
|
'--debug',
|
|
action='store_true',
|
|
help='启用调试模式'
|
|
)
|
|
|
|
self.parser.add_argument(
|
|
'--log-level',
|
|
choices=['DEBUG', 'INFO', 'WARNING', 'ERROR'],
|
|
default='INFO',
|
|
help='日志级别'
|
|
)
|
|
|
|
async def run(self, args):
|
|
"""运行命令"""
|
|
try:
|
|
# 配置系统
|
|
system = await self._configure_system(args)
|
|
|
|
# 执行命令
|
|
if args.command == 'init':
|
|
await self._run_init(system)
|
|
elif args.command == 'scheduler':
|
|
await self._run_scheduler(system)
|
|
elif args.command == 'status':
|
|
await self._run_status(system)
|
|
elif args.command == 'update':
|
|
await self._run_update(system)
|
|
elif args.command == 'test':
|
|
await self._run_test()
|
|
elif args.command == 'performance':
|
|
await self._run_performance()
|
|
|
|
except KeyboardInterrupt:
|
|
print("\n用户中断操作")
|
|
except Exception as e:
|
|
print(f"执行命令时发生错误: {e}")
|
|
if args.debug:
|
|
import traceback
|
|
traceback.print_exc()
|
|
sys.exit(1)
|
|
|
|
async def _configure_system(self, args):
|
|
"""配置系统"""
|
|
print("正在配置股票分析系统...")
|
|
|
|
# 创建系统实例
|
|
system = StockAnalysisSystem()
|
|
|
|
# 配置数据库
|
|
if args.database:
|
|
system.db_config['database_url'] = args.database
|
|
|
|
# 配置调试模式
|
|
if args.debug:
|
|
Config.DEVELOPMENT_CONFIG['debug'] = True
|
|
Config.LOGGING_CONFIG['level'] = args.log_level
|
|
Config.LOGGING_CONFIG['console']['level'] = args.log_level
|
|
|
|
# 系统已在构造函数中初始化完成
|
|
print("系统配置完成")
|
|
return system
|
|
|
|
async def _run_init(self, system):
|
|
"""运行初始化命令"""
|
|
print("开始初始化系统数据...")
|
|
|
|
# 自动确认操作(用于自动化测试)
|
|
print("此操作将清空现有数据并重新初始化")
|
|
print("自动确认: 继续执行")
|
|
|
|
# 执行初始化
|
|
result = await system.initialize_data()
|
|
|
|
if result:
|
|
print("系统数据初始化完成")
|
|
else:
|
|
print("系统数据初始化失败")
|
|
|
|
async def _run_scheduler(self, system):
|
|
"""运行调度器命令"""
|
|
print("启动定时任务调度器...")
|
|
print("按 Ctrl+C 停止调度器")
|
|
|
|
try:
|
|
# 启动调度器
|
|
await system.start_scheduler()
|
|
|
|
# 保持运行
|
|
while True:
|
|
await asyncio.sleep(1)
|
|
|
|
except KeyboardInterrupt:
|
|
print("\n正在停止调度器...")
|
|
await system.stop_scheduler()
|
|
print("调度器已停止")
|
|
|
|
async def _run_status(self, system):
|
|
"""运行状态命令"""
|
|
print("正在检查系统状态...")
|
|
|
|
# 获取系统状态
|
|
status = await system.check_system_status()
|
|
|
|
# 显示状态信息
|
|
print("\n=== 系统状态报告 ===")
|
|
print(f"数据库连接: {'正常' if status['database'] else '异常'}")
|
|
print(f"数据采集器: {'正常' if status['collectors'] else '异常'}")
|
|
print(f"定时任务: {'运行中' if status['scheduler'] else '已停止'}")
|
|
|
|
if 'data_counts' in status:
|
|
print("\n=== 数据统计 ===")
|
|
for data_type, count in status['data_counts'].items():
|
|
print(f"{data_type}: {count} 条记录")
|
|
|
|
if 'last_update' in status:
|
|
print(f"\n最后更新时间: {status['last_update']}")
|
|
|
|
async def _run_update(self, system):
|
|
"""运行更新命令"""
|
|
print("开始手动更新数据...")
|
|
|
|
# 选择更新类型
|
|
print("请选择要更新的数据类型:")
|
|
print("1. 股票基础信息")
|
|
print("2. 日K线数据")
|
|
print("3. 财务报告数据")
|
|
print("4. 全部数据")
|
|
|
|
choice = input("请输入选择 (1-4): ").strip()
|
|
|
|
update_types = {
|
|
'1': 'basic',
|
|
'2': 'kline',
|
|
'3': 'financial',
|
|
'4': 'all'
|
|
}
|
|
|
|
if choice not in update_types:
|
|
print("无效选择")
|
|
return
|
|
|
|
update_type = update_types[choice]
|
|
|
|
# 执行更新
|
|
result = await system.update_data(update_type)
|
|
|
|
if result:
|
|
print(f"{update_type} 数据更新完成")
|
|
else:
|
|
print(f"{update_type} 数据更新失败")
|
|
|
|
async def _run_test(self):
|
|
"""运行测试命令"""
|
|
print("开始运行测试...")
|
|
|
|
# 运行pytest
|
|
import subprocess
|
|
|
|
try:
|
|
result = subprocess.run([
|
|
sys.executable, '-m', 'pytest', 'tests/',
|
|
'-v', '--tb=short', '--color=yes'
|
|
], cwd=os.path.dirname(__file__))
|
|
|
|
if result.returncode == 0:
|
|
print("所有测试通过")
|
|
else:
|
|
print("部分测试失败")
|
|
|
|
except Exception as e:
|
|
print(f"运行测试时发生错误: {e}")
|
|
|
|
async def _run_performance(self):
|
|
"""运行性能测试命令"""
|
|
print("开始运行性能测试...")
|
|
|
|
# 运行性能测试
|
|
import subprocess
|
|
|
|
try:
|
|
result = subprocess.run([
|
|
sys.executable, '-m', 'pytest',
|
|
'tests/test_performance.py',
|
|
'-v', '--tb=short', '--color=yes'
|
|
], cwd=os.path.dirname(__file__))
|
|
|
|
if result.returncode == 0:
|
|
print("性能测试完成")
|
|
else:
|
|
print("性能测试失败")
|
|
|
|
except Exception as e:
|
|
print(f"运行性能测试时发生错误: {e}")
|
|
|
|
|
|
def main():
|
|
"""主函数"""
|
|
cli = CommandLineInterface()
|
|
args = cli.parser.parse_args()
|
|
|
|
# 运行命令
|
|
asyncio.run(cli.run(args))
|
|
|
|
|
|
if __name__ == '__main__':
|
|
main() |