feat: 优化日志记录

This commit is contained in:
Cassianvale
2025-03-04 16:00:37 +08:00
parent 6df78314d6
commit e28b1342c3
4 changed files with 93 additions and 57 deletions

View File

@@ -2,9 +2,7 @@ from loguru import logger
import sys import sys
import os import os
from datetime import datetime from datetime import datetime
import shutil
# 获取当前时间作为日志文件名的一部分
current_time = datetime.now().strftime("%Y%m%d_%H%M%S")
# 创建日志目录 # 创建日志目录
log_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "logs") log_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "logs")
@@ -17,42 +15,54 @@ logger.remove() # 移除默认的处理器
logger.add( logger.add(
sys.stdout, sys.stdout,
format="<green>{time:YYYY-MM-DD HH:mm:ss.SSS}</green> | <level>{level: <8}</level> | <cyan>{name}</cyan>:<cyan>{line}</cyan> - <level>{message}</level>", format="<green>{time:YYYY-MM-DD HH:mm:ss.SSS}</green> | <level>{level: <8}</level> | <cyan>{name}</cyan>:<cyan>{line}</cyan> - <level>{message}</level>",
level="DEBUG" level="INFO", # 同时显示在控制台和写入到日志文件中
) )
# 添加文件处理器debug级别 # 添加统一的日志文件处理器,按日期自动轮转
logger.add( logger.add(
os.path.join(log_dir, f"debug_{current_time}.log"), os.path.join(log_dir, "stock_scanner_{time:YYYY-MM-DD}.log"),
format="{time:YYYY-MM-DD HH:mm:ss.SSS} | {level: <8} | {name}:{line} - {message}", format="{time:YYYY-MM-DD HH:mm:ss.SSS} | {level: <8} | {name}:{line} - {message}",
level="DEBUG", level="DEBUG",
rotation="100 MB", rotation="00:00", # 每天午夜轮转
retention="1 week" retention="7 days", # 保留7天的日志
compression="zip", # 压缩旧日志文件
enqueue=True # 使用队列写入,提高性能
) )
# 添加文件处理器error级别 # 添加错误日志文件处理器,专门记录错误信息
logger.add( logger.add(
os.path.join(log_dir, f"error_{current_time}.log"), os.path.join(log_dir, "error_{time:YYYY-MM-DD}.log"),
format="{time:YYYY-MM-DD HH:mm:ss.SSS} | {level: <8} | {name}:{line} - {message}", format="{time:YYYY-MM-DD HH:mm:ss.SSS} | {level: <8} | {name}:{line} - {message}",
level="ERROR", level="ERROR",
rotation="100 MB", rotation="00:00", # 每天午夜轮转
retention="1 month" retention="7 days", # 保留7天的错误日志
compression="zip", # 压缩旧日志文件
enqueue=True # 使用队列写入,提高性能
) )
# 添加流处理器(用于记录流式输出) def clean_old_logs(max_days=7):
logger.add( """清理超过指定天数的日志文件"""
os.path.join(log_dir, f"stream_{current_time}.log"), try:
format="{time:YYYY-MM-DD HH:mm:ss.SSS} | {message}", today = datetime.now()
filter=lambda record: "STREAM" in record["extra"], for filename in os.listdir(log_dir):
level="INFO" file_path = os.path.join(log_dir, filename)
) # 跳过目录
if os.path.isdir(file_path):
continue
# 创建专用于流式输出的日志器 # 检查文件修改时间
stream_logger = logger.bind(STREAM=True) file_time = datetime.fromtimestamp(os.path.getmtime(file_path))
days_old = (today - file_time).days
# 如果文件超过指定天数,删除它
if days_old > max_days:
os.remove(file_path)
logger.info(f"已删除过期日志文件: {filename}")
except Exception as e:
logger.error(f"清理日志文件时出错: {e}")
def get_logger(): def get_logger():
"""获取通用日志器""" """获取通用日志器"""
# 启动时清理旧日志
clean_old_logs()
return logger return logger
def get_stream_logger():
"""获取流式输出专用日志器"""
return stream_logger

View File

@@ -6,11 +6,10 @@ import requests
from typing import Dict, List, Optional, Tuple, Generator from typing import Dict, List, Optional, Tuple, Generator
from dotenv import load_dotenv from dotenv import load_dotenv
import json import json
from logger import get_logger, get_stream_logger from logger import get_logger
# 获取日志器 # 获取日志器
logger = get_logger() logger = get_logger()
stream_logger = get_stream_logger()
class StockAnalyzer: class StockAnalyzer:
def __init__(self, initial_cash=1000000, custom_api_url=None, custom_api_key=None, custom_api_model=None): def __init__(self, initial_cash=1000000, custom_api_url=None, custom_api_key=None, custom_api_model=None):
@@ -246,7 +245,7 @@ class StockAnalyzer:
请基于技术指标和市场动态进行分析,给出具体数据支持。 请基于技术指标和市场动态进行分析,给出具体数据支持。
""" """
logger.debug(f"生成的AI分析提示词: {prompt[:100]}...") logger.debug(f"生成的AI分析提示词: {self._truncate_json_for_logging(prompt, 100)}...")
# 检查API配置 # 检查API配置
if not self.API_URL: if not self.API_URL:
@@ -289,7 +288,7 @@ class StockAnalyzer:
try: try:
logger.debug(f"发起流式API请求: {api_url}") logger.debug(f"发起流式API请求: {api_url}")
logger.debug(f"请求载荷: {json.dumps(payload, indent=2)}") logger.debug(f"请求载荷: {self._truncate_json_for_logging(payload)}")
response = requests.post( response = requests.post(
api_url, api_url,
@@ -307,7 +306,7 @@ class StockAnalyzer:
else: else:
try: try:
error_response = response.json() error_response = response.json()
error_text = json.dumps(error_response, indent=2) error_text = self._truncate_json_for_logging(error_response)
except: except:
error_text = response.text[:500] if response.text else "无响应内容" error_text = response.text[:500] if response.text else "无响应内容"
@@ -343,7 +342,7 @@ class StockAnalyzer:
else: else:
try: try:
error_response = response.json() error_response = response.json()
error_text = json.dumps(error_response, indent=2) error_text = self._truncate_json_for_logging(error_response)
except: except:
error_text = response.text[:500] if response.text else "无响应内容" error_text = response.text[:500] if response.text else "无响应内容"
@@ -365,11 +364,26 @@ class StockAnalyzer:
if stream: if stream:
logger.debug("在流式模式下返回异常信息") logger.debug("在流式模式下返回异常信息")
error_json = json.dumps({"stock_code": stock_code, "error": error_msg}) error_json = json.dumps({"stock_code": stock_code, "error": error_msg})
stream_logger.info(f"流式异常输出: {error_json}") logger.info(f"流式异常输出: {error_json}")
yield error_json yield error_json
else: else:
return error_msg return error_msg
def _truncate_json_for_logging(self, json_obj, max_length=500):
"""截断JSON对象用于日志记录避免日志过大
Args:
json_obj: 要截断的JSON对象
max_length: 最大字符长度默认500
Returns:
str: 截断后的JSON字符串
"""
json_str = json.dumps(json_obj, ensure_ascii=False)
if len(json_str) <= max_length:
return json_str
return json_str[:max_length] + f"... [截断,总长度: {len(json_str)}字符]"
def _process_ai_stream(self, response, stock_code) -> Generator[str, None, None]: def _process_ai_stream(self, response, stock_code) -> Generator[str, None, None]:
"""处理AI流式响应""" """处理AI流式响应"""
logger.info(f"开始处理股票 {stock_code} 的AI流式响应") logger.info(f"开始处理股票 {stock_code} 的AI流式响应")
@@ -380,7 +394,6 @@ class StockAnalyzer:
for line in response.iter_lines(): for line in response.iter_lines():
if line: if line:
line = line.decode('utf-8') line = line.decode('utf-8')
stream_logger.info(f"原始流式行: {line}")
# 跳过保持连接的空行 # 跳过保持连接的空行
if line.strip() == '': if line.strip() == '':
@@ -390,7 +403,6 @@ class StockAnalyzer:
# 数据行通常以"data: "开头 # 数据行通常以"data: "开头
if line.startswith('data: '): if line.startswith('data: '):
data_content = line[6:] # 移除 "data: " 前缀 data_content = line[6:] # 移除 "data: " 前缀
stream_logger.info(f"数据内容: {data_content}")
# 检查是否为流的结束 # 检查是否为流的结束
if data_content.strip() == '[DONE]': if data_content.strip() == '[DONE]':
@@ -399,7 +411,6 @@ class StockAnalyzer:
try: try:
json_data = json.loads(data_content) json_data = json.loads(data_content)
logger.debug(f"解析的JSON数据: {json.dumps(json_data)[:100]}...")
if 'choices' in json_data: if 'choices' in json_data:
delta = json_data['choices'][0].get('delta', {}) delta = json_data['choices'][0].get('delta', {})
@@ -408,15 +419,12 @@ class StockAnalyzer:
if content: if content:
chunk_count += 1 chunk_count += 1
buffer += content buffer += content
logger.debug(f"收到内容片段 #{chunk_count}: {content}")
stream_logger.info(f"发送内容片段: {content}")
# 创建包含AI分析片段的JSON # 创建包含AI分析片段的JSON
chunk_json = json.dumps({ chunk_json = json.dumps({
"stock_code": stock_code, "stock_code": stock_code,
"ai_analysis_chunk": content "ai_analysis_chunk": content
}) })
stream_logger.info(f"流式输出JSON: {chunk_json}")
yield chunk_json yield chunk_json
except json.JSONDecodeError as e: except json.JSONDecodeError as e:
logger.error(f"JSON解析错误: {str(e)}, 行内容: {data_content}") logger.error(f"JSON解析错误: {str(e)}, 行内容: {data_content}")
@@ -488,7 +496,7 @@ class StockAnalyzer:
'volume_status': 'HIGH' if latest['Volume_Ratio'] > 1.5 else 'NORMAL', 'volume_status': 'HIGH' if latest['Volume_Ratio'] > 1.5 else 'NORMAL',
'recommendation': self.get_recommendation(score) 'recommendation': self.get_recommendation(score)
} }
logger.debug(f"生成股票 {stock_code} 基础报告: {json.dumps(report)[:100]}...") logger.debug(f"生成股票 {stock_code} 基础报告: {self._truncate_json_for_logging(report, 100)}...")
if stream: if stream:
logger.info(f"以流式模式返回股票 {stock_code} 分析结果") logger.info(f"以流式模式返回股票 {stock_code} 分析结果")
@@ -496,8 +504,8 @@ class StockAnalyzer:
base_report = dict(report) base_report = dict(report)
base_report['ai_analysis'] = '' base_report['ai_analysis'] = ''
base_report_json = json.dumps(base_report) base_report_json = json.dumps(base_report)
logger.debug(f"基础报告JSON: {base_report_json[:100]}...") logger.debug(f"基础报告JSON: {self._truncate_json_for_logging(base_report_json, 100)}...")
stream_logger.info(f"发送基础报告: {base_report_json}") logger.info(f"发送基础报告: {base_report_json}")
yield base_report_json yield base_report_json
# 然后流式返回AI分析部分 # 然后流式返回AI分析部分
@@ -505,7 +513,6 @@ class StockAnalyzer:
ai_chunks_count = 0 ai_chunks_count = 0
for ai_chunk in self.get_ai_analysis(df, stock_code, stream=True): for ai_chunk in self.get_ai_analysis(df, stock_code, stream=True):
ai_chunks_count += 1 ai_chunks_count += 1
stream_logger.info(f"股票 {stock_code} 流式块 #{ai_chunks_count}: {ai_chunk}")
yield ai_chunk yield ai_chunk
logger.info(f"股票 {stock_code} 流式AI分析完成共发送 {ai_chunks_count} 个块") logger.info(f"股票 {stock_code} 流式AI分析完成共发送 {ai_chunks_count} 个块")
else: else:
@@ -522,7 +529,7 @@ class StockAnalyzer:
if stream: if stream:
error_json = json.dumps({'stock_code': stock_code, 'error': error_msg}) error_json = json.dumps({'stock_code': stock_code, 'error': error_msg})
stream_logger.info(f"流式错误输出: {error_json}") logger.info(f"流式错误输出: {error_json}")
yield error_json yield error_json
else: else:
raise raise
@@ -564,7 +571,6 @@ class StockAnalyzer:
chunk_count = 0 chunk_count = 0
for chunk in self.analyze_stock(stock_code, market_type, stream=True): for chunk in self.analyze_stock(stock_code, market_type, stream=True):
chunk_count += 1 chunk_count += 1
stream_logger.info(f"股票 {stock_code} 流式块 #{chunk_count}: {chunk}")
yield chunk yield chunk
logger.debug(f"股票 {stock_code} 流式分析完成,共 {chunk_count} 个块") logger.debug(f"股票 {stock_code} 流式分析完成,共 {chunk_count} 个块")
except Exception as e: except Exception as e:
@@ -572,6 +578,6 @@ class StockAnalyzer:
logger.error(error_msg) logger.error(error_msg)
logger.exception(e) logger.exception(e)
error_json = json.dumps({'stock_code': stock_code, 'error': error_msg}) error_json = json.dumps({'stock_code': stock_code, 'error': error_msg})
stream_logger.info(f"流式错误输出: {error_json}") logger.info(f"流式错误输出: {error_json}")
yield error_json yield error_json
logger.info(f"流式扫描完成,处理了 {stock_count} 只股票") logger.info(f"流式扫描完成,处理了 {stock_count} 只股票")

View File

@@ -8,6 +8,25 @@ from dotenv import load_dotenv
logger = get_logger() logger = get_logger()
stream_logger = get_stream_logger() stream_logger = get_stream_logger()
def _truncate_json_for_logging(json_obj, max_length=500):
"""截断JSON对象用于日志记录避免日志过大
Args:
json_obj: 要截断的JSON对象
max_length: 最大字符长度默认500
Returns:
str: 截断后的JSON字符串
"""
if isinstance(json_obj, str):
json_str = json_obj
else:
json_str = json.dumps(json_obj, ensure_ascii=False)
if len(json_str) <= max_length:
return json_str
return json_str[:max_length] + f"... [截断,总长度: {len(json_str)}字符]"
def test_api_stream(): def test_api_stream():
""" """
测试API流式响应功能 测试API流式响应功能
@@ -57,7 +76,7 @@ def test_api_stream():
"stream": True # 明确设置stream参数为True "stream": True # 明确设置stream参数为True
} }
logger.debug(f"请求载荷: {json.dumps(payload, indent=2)}") logger.debug(f"请求载荷: {_truncate_json_for_logging(payload)}")
try: try:
logger.info(f"发起流式API请求: {api_url}") logger.info(f"发起流式API请求: {api_url}")
@@ -102,7 +121,7 @@ def test_api_stream():
try: try:
# 解析JSON数据 # 解析JSON数据
json_data = json.loads(data_content) json_data = json.loads(data_content)
logger.debug(f"JSON结构: {json.dumps(json_data, indent=2)}") logger.debug(f"JSON结构: {_truncate_json_for_logging(json_data)}")
if 'choices' in json_data: if 'choices' in json_data:
delta = json_data['choices'][0].get('delta', {}) delta = json_data['choices'][0].get('delta', {})

View File

@@ -5,11 +5,10 @@ import threading
import os import os
import traceback import traceback
import requests import requests
from logger import get_logger, get_stream_logger from logger import get_logger
# 获取日志器 # 获取日志器
logger = get_logger() logger = get_logger()
stream_logger = get_stream_logger()
app = Flask(__name__) app = Flask(__name__)
analyzer = StockAnalyzer() analyzer = StockAnalyzer()
@@ -62,31 +61,33 @@ def analyze():
stock_code = stock_codes[0].strip() stock_code = stock_codes[0].strip()
logger.info(f"开始单股流式分析: {stock_code}") logger.info(f"开始单股流式分析: {stock_code}")
stream_logger.info(f"初始化单股分析流: {stock_code}")
init_message = f'{{"stream_type": "single", "stock_code": "{stock_code}"}}\n' init_message = f'{{"stream_type": "single", "stock_code": "{stock_code}"}}\n'
stream_logger.info(f"发送初始化消息: {init_message}")
yield init_message yield init_message
logger.debug(f"开始处理股票 {stock_code} 的流式响应")
chunk_count = 0
for chunk in custom_analyzer.analyze_stock(stock_code, market_type, stream=True): for chunk in custom_analyzer.analyze_stock(stock_code, market_type, stream=True):
stream_logger.info(f"流式输出块: {chunk}") chunk_count += 1
yield chunk + '\n' yield chunk + '\n'
logger.info(f"股票 {stock_code} 流式分析完成,共发送 {chunk_count} 个块")
else: else:
# 批量分析流式处理 # 批量分析流式处理
logger.info(f"开始批量流式分析: {stock_codes}") logger.info(f"开始批量流式分析: {stock_codes}")
stream_logger.info(f"初始化批量分析流: {stock_codes}")
init_message = f'{{"stream_type": "batch", "stock_codes": {stock_codes}}}\n' init_message = f'{{"stream_type": "batch", "stock_codes": {stock_codes}}}\n'
stream_logger.info(f"发送初始化消息: {init_message}")
yield init_message yield init_message
logger.debug(f"开始处理批量股票的流式响应")
chunk_count = 0
for chunk in custom_analyzer.scan_market( for chunk in custom_analyzer.scan_market(
[code.strip() for code in stock_codes], [code.strip() for code in stock_codes],
min_score=0, min_score=0,
market_type=market_type, market_type=market_type,
stream=True stream=True
): ):
stream_logger.info(f"流式输出块: {chunk}") chunk_count += 1
yield chunk + '\n' yield chunk + '\n'
logger.info(f"批量流式分析完成,共发送 {chunk_count} 个块")
logger.info("成功创建流式响应生成器") logger.info("成功创建流式响应生成器")
return Response(stream_with_context(generate()), mimetype='application/json') return Response(stream_with_context(generate()), mimetype='application/json')