From e28b1342c37e16e6b673e8cd3b294c39c87b67b1 Mon Sep 17 00:00:00 2001 From: Cassianvale Date: Tue, 4 Mar 2025 16:00:37 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E4=BC=98=E5=8C=96=E6=97=A5=E5=BF=97?= =?UTF-8?q?=E8=AE=B0=E5=BD=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- logger.py | 62 +++++++++++++++++++++++++------------------- stock_analyzer.py | 48 +++++++++++++++++++--------------- tests/test_stream.py | 23 ++++++++++++++-- web_server.py | 17 ++++++------ 4 files changed, 93 insertions(+), 57 deletions(-) diff --git a/logger.py b/logger.py index 8313c52..4a941b2 100644 --- a/logger.py +++ b/logger.py @@ -2,9 +2,7 @@ from loguru import logger import sys import os from datetime import datetime - -# 获取当前时间作为日志文件名的一部分 -current_time = datetime.now().strftime("%Y%m%d_%H%M%S") +import shutil # 创建日志目录 log_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "logs") @@ -17,42 +15,54 @@ logger.remove() # 移除默认的处理器 logger.add( sys.stdout, format="{time:YYYY-MM-DD HH:mm:ss.SSS} | {level: <8} | {name}:{line} - {message}", - level="DEBUG" + level="INFO", # 同时显示在控制台和写入到日志文件中 ) -# 添加文件处理器(debug级别) +# 添加统一的日志文件处理器,按日期自动轮转 logger.add( - os.path.join(log_dir, f"debug_{current_time}.log"), + os.path.join(log_dir, "stock_scanner_{time:YYYY-MM-DD}.log"), format="{time:YYYY-MM-DD HH:mm:ss.SSS} | {level: <8} | {name}:{line} - {message}", level="DEBUG", - rotation="100 MB", - retention="1 week" + rotation="00:00", # 每天午夜轮转 + retention="7 days", # 保留7天的日志 + compression="zip", # 压缩旧日志文件 + enqueue=True # 使用队列写入,提高性能 ) -# 添加文件处理器(error级别) +# 添加错误日志文件处理器,专门记录错误信息 logger.add( - os.path.join(log_dir, f"error_{current_time}.log"), + os.path.join(log_dir, "error_{time:YYYY-MM-DD}.log"), format="{time:YYYY-MM-DD HH:mm:ss.SSS} | {level: <8} | {name}:{line} - {message}", level="ERROR", - rotation="100 MB", - retention="1 month" + rotation="00:00", # 每天午夜轮转 + retention="7 days", # 保留7天的错误日志 + compression="zip", # 压缩旧日志文件 + enqueue=True # 使用队列写入,提高性能 ) -# 添加流处理器(用于记录流式输出) -logger.add( - os.path.join(log_dir, f"stream_{current_time}.log"), - format="{time:YYYY-MM-DD HH:mm:ss.SSS} | {message}", - filter=lambda record: "STREAM" in record["extra"], - level="INFO" -) - -# 创建专用于流式输出的日志器 -stream_logger = logger.bind(STREAM=True) +def clean_old_logs(max_days=7): + """清理超过指定天数的日志文件""" + try: + today = datetime.now() + for filename in os.listdir(log_dir): + file_path = os.path.join(log_dir, filename) + # 跳过目录 + if os.path.isdir(file_path): + continue + + # 检查文件修改时间 + file_time = datetime.fromtimestamp(os.path.getmtime(file_path)) + days_old = (today - file_time).days + + # 如果文件超过指定天数,删除它 + if days_old > max_days: + os.remove(file_path) + logger.info(f"已删除过期日志文件: {filename}") + except Exception as e: + logger.error(f"清理日志文件时出错: {e}") def get_logger(): """获取通用日志器""" + # 启动时清理旧日志 + clean_old_logs() return logger - -def get_stream_logger(): - """获取流式输出专用日志器""" - return stream_logger diff --git a/stock_analyzer.py b/stock_analyzer.py index 7d040ca..369faef 100644 --- a/stock_analyzer.py +++ b/stock_analyzer.py @@ -6,11 +6,10 @@ import requests from typing import Dict, List, Optional, Tuple, Generator from dotenv import load_dotenv import json -from logger import get_logger, get_stream_logger +from logger import get_logger # 获取日志器 logger = get_logger() -stream_logger = get_stream_logger() class StockAnalyzer: def __init__(self, initial_cash=1000000, custom_api_url=None, custom_api_key=None, custom_api_model=None): @@ -246,7 +245,7 @@ class StockAnalyzer: 请基于技术指标和市场动态进行分析,给出具体数据支持。 """ - logger.debug(f"生成的AI分析提示词: {prompt[:100]}...") + logger.debug(f"生成的AI分析提示词: {self._truncate_json_for_logging(prompt, 100)}...") # 检查API配置 if not self.API_URL: @@ -289,7 +288,7 @@ class StockAnalyzer: try: logger.debug(f"发起流式API请求: {api_url}") - logger.debug(f"请求载荷: {json.dumps(payload, indent=2)}") + logger.debug(f"请求载荷: {self._truncate_json_for_logging(payload)}") response = requests.post( api_url, @@ -307,7 +306,7 @@ class StockAnalyzer: else: try: error_response = response.json() - error_text = json.dumps(error_response, indent=2) + error_text = self._truncate_json_for_logging(error_response) except: error_text = response.text[:500] if response.text else "无响应内容" @@ -343,7 +342,7 @@ class StockAnalyzer: else: try: error_response = response.json() - error_text = json.dumps(error_response, indent=2) + error_text = self._truncate_json_for_logging(error_response) except: error_text = response.text[:500] if response.text else "无响应内容" @@ -365,11 +364,26 @@ class StockAnalyzer: if stream: logger.debug("在流式模式下返回异常信息") error_json = json.dumps({"stock_code": stock_code, "error": error_msg}) - stream_logger.info(f"流式异常输出: {error_json}") + logger.info(f"流式异常输出: {error_json}") yield error_json else: return error_msg + def _truncate_json_for_logging(self, json_obj, max_length=500): + """截断JSON对象用于日志记录,避免日志过大 + + Args: + json_obj: 要截断的JSON对象 + max_length: 最大字符长度,默认500 + + Returns: + str: 截断后的JSON字符串 + """ + json_str = json.dumps(json_obj, ensure_ascii=False) + if len(json_str) <= max_length: + return json_str + return json_str[:max_length] + f"... [截断,总长度: {len(json_str)}字符]" + def _process_ai_stream(self, response, stock_code) -> Generator[str, None, None]: """处理AI流式响应""" logger.info(f"开始处理股票 {stock_code} 的AI流式响应") @@ -380,7 +394,6 @@ class StockAnalyzer: for line in response.iter_lines(): if line: line = line.decode('utf-8') - stream_logger.info(f"原始流式行: {line}") # 跳过保持连接的空行 if line.strip() == '': @@ -390,7 +403,6 @@ class StockAnalyzer: # 数据行通常以"data: "开头 if line.startswith('data: '): data_content = line[6:] # 移除 "data: " 前缀 - stream_logger.info(f"数据内容: {data_content}") # 检查是否为流的结束 if data_content.strip() == '[DONE]': @@ -399,7 +411,6 @@ class StockAnalyzer: try: json_data = json.loads(data_content) - logger.debug(f"解析的JSON数据: {json.dumps(json_data)[:100]}...") if 'choices' in json_data: delta = json_data['choices'][0].get('delta', {}) @@ -408,15 +419,12 @@ class StockAnalyzer: if content: chunk_count += 1 buffer += content - logger.debug(f"收到内容片段 #{chunk_count}: {content}") - stream_logger.info(f"发送内容片段: {content}") - + # 创建包含AI分析片段的JSON chunk_json = json.dumps({ "stock_code": stock_code, "ai_analysis_chunk": content }) - stream_logger.info(f"流式输出JSON: {chunk_json}") yield chunk_json except json.JSONDecodeError as e: logger.error(f"JSON解析错误: {str(e)}, 行内容: {data_content}") @@ -488,7 +496,7 @@ class StockAnalyzer: 'volume_status': 'HIGH' if latest['Volume_Ratio'] > 1.5 else 'NORMAL', 'recommendation': self.get_recommendation(score) } - logger.debug(f"生成股票 {stock_code} 基础报告: {json.dumps(report)[:100]}...") + logger.debug(f"生成股票 {stock_code} 基础报告: {self._truncate_json_for_logging(report, 100)}...") if stream: logger.info(f"以流式模式返回股票 {stock_code} 分析结果") @@ -496,8 +504,8 @@ class StockAnalyzer: base_report = dict(report) base_report['ai_analysis'] = '' base_report_json = json.dumps(base_report) - logger.debug(f"基础报告JSON: {base_report_json[:100]}...") - stream_logger.info(f"发送基础报告: {base_report_json}") + logger.debug(f"基础报告JSON: {self._truncate_json_for_logging(base_report_json, 100)}...") + logger.info(f"发送基础报告: {base_report_json}") yield base_report_json # 然后流式返回AI分析部分 @@ -505,7 +513,6 @@ class StockAnalyzer: ai_chunks_count = 0 for ai_chunk in self.get_ai_analysis(df, stock_code, stream=True): ai_chunks_count += 1 - stream_logger.info(f"股票 {stock_code} 流式块 #{ai_chunks_count}: {ai_chunk}") yield ai_chunk logger.info(f"股票 {stock_code} 流式AI分析完成,共发送 {ai_chunks_count} 个块") else: @@ -522,7 +529,7 @@ class StockAnalyzer: if stream: error_json = json.dumps({'stock_code': stock_code, 'error': error_msg}) - stream_logger.info(f"流式错误输出: {error_json}") + logger.info(f"流式错误输出: {error_json}") yield error_json else: raise @@ -564,7 +571,6 @@ class StockAnalyzer: chunk_count = 0 for chunk in self.analyze_stock(stock_code, market_type, stream=True): chunk_count += 1 - stream_logger.info(f"股票 {stock_code} 流式块 #{chunk_count}: {chunk}") yield chunk logger.debug(f"股票 {stock_code} 流式分析完成,共 {chunk_count} 个块") except Exception as e: @@ -572,6 +578,6 @@ class StockAnalyzer: logger.error(error_msg) logger.exception(e) error_json = json.dumps({'stock_code': stock_code, 'error': error_msg}) - stream_logger.info(f"流式错误输出: {error_json}") + logger.info(f"流式错误输出: {error_json}") yield error_json logger.info(f"流式扫描完成,处理了 {stock_count} 只股票") diff --git a/tests/test_stream.py b/tests/test_stream.py index e501081..30cd508 100644 --- a/tests/test_stream.py +++ b/tests/test_stream.py @@ -8,6 +8,25 @@ from dotenv import load_dotenv logger = get_logger() stream_logger = get_stream_logger() +def _truncate_json_for_logging(json_obj, max_length=500): + """截断JSON对象用于日志记录,避免日志过大 + + Args: + json_obj: 要截断的JSON对象 + max_length: 最大字符长度,默认500 + + Returns: + str: 截断后的JSON字符串 + """ + if isinstance(json_obj, str): + json_str = json_obj + else: + json_str = json.dumps(json_obj, ensure_ascii=False) + + if len(json_str) <= max_length: + return json_str + return json_str[:max_length] + f"... [截断,总长度: {len(json_str)}字符]" + def test_api_stream(): """ 测试API流式响应功能 @@ -57,7 +76,7 @@ def test_api_stream(): "stream": True # 明确设置stream参数为True } - logger.debug(f"请求载荷: {json.dumps(payload, indent=2)}") + logger.debug(f"请求载荷: {_truncate_json_for_logging(payload)}") try: logger.info(f"发起流式API请求: {api_url}") @@ -102,7 +121,7 @@ def test_api_stream(): try: # 解析JSON数据 json_data = json.loads(data_content) - logger.debug(f"JSON结构: {json.dumps(json_data, indent=2)}") + logger.debug(f"JSON结构: {_truncate_json_for_logging(json_data)}") if 'choices' in json_data: delta = json_data['choices'][0].get('delta', {}) diff --git a/web_server.py b/web_server.py index f1c9f89..3a1bf7b 100644 --- a/web_server.py +++ b/web_server.py @@ -5,11 +5,10 @@ import threading import os import traceback import requests -from logger import get_logger, get_stream_logger +from logger import get_logger # 获取日志器 logger = get_logger() -stream_logger = get_stream_logger() app = Flask(__name__) analyzer = StockAnalyzer() @@ -62,31 +61,33 @@ def analyze(): stock_code = stock_codes[0].strip() logger.info(f"开始单股流式分析: {stock_code}") - stream_logger.info(f"初始化单股分析流: {stock_code}") init_message = f'{{"stream_type": "single", "stock_code": "{stock_code}"}}\n' - stream_logger.info(f"发送初始化消息: {init_message}") yield init_message + logger.debug(f"开始处理股票 {stock_code} 的流式响应") + chunk_count = 0 for chunk in custom_analyzer.analyze_stock(stock_code, market_type, stream=True): - stream_logger.info(f"流式输出块: {chunk}") + chunk_count += 1 yield chunk + '\n' + logger.info(f"股票 {stock_code} 流式分析完成,共发送 {chunk_count} 个块") else: # 批量分析流式处理 logger.info(f"开始批量流式分析: {stock_codes}") - stream_logger.info(f"初始化批量分析流: {stock_codes}") init_message = f'{{"stream_type": "batch", "stock_codes": {stock_codes}}}\n' - stream_logger.info(f"发送初始化消息: {init_message}") yield init_message + logger.debug(f"开始处理批量股票的流式响应") + chunk_count = 0 for chunk in custom_analyzer.scan_market( [code.strip() for code in stock_codes], min_score=0, market_type=market_type, stream=True ): - stream_logger.info(f"流式输出块: {chunk}") + chunk_count += 1 yield chunk + '\n' + logger.info(f"批量流式分析完成,共发送 {chunk_count} 个块") logger.info("成功创建流式响应生成器") return Response(stream_with_context(generate()), mimetype='application/json')