ADD: 支持ETF、 LOF基金!

This commit is contained in:
兰志宏
2025-03-05 16:32:30 +08:00
parent f5ebe29782
commit 25601b9bd6
4 changed files with 273 additions and 93 deletions

View File

@@ -37,8 +37,8 @@ class StockAnalyzer:
}
def get_stock_data(self, stock_code, market_type='A', start_date=None, end_date=None, ):
"""获取股票数据"""
def get_stock_data(self, stock_code, market_type='A', start_date=None, end_date=None):
"""获取股票或基金数据"""
import akshare as ak
if start_date is None:
@@ -55,7 +55,6 @@ class StockAnalyzer:
end_date=end_date,
adjust="qfq"
)
# A股数据列名映射
elif market_type == 'HK':
df = ak.stock_hk_daily(
symbol=stock_code,
@@ -68,10 +67,22 @@ class StockAnalyzer:
end_date=end_date,
adjust="qfq"
)
# elif market_type == 'CRYPTO':
# df = ak.crypto_js_spot(
# symbol=stock_code
# )
elif market_type == 'ETF':
df = ak.fund_etf_hist_em(
symbol=stock_code,
period="daily",
start_date=start_date,
end_date=end_date,
adjust="qfq"
)
elif market_type == 'LOF':
df = ak.fund_lof_hist_em(
symbol=stock_code,
period="daily",
start_date=start_date,
end_date=end_date,
adjust="qfq"
)
else:
raise ValueError(f"不支持的市场类型: {market_type}")
@@ -214,10 +225,10 @@ class StockAnalyzer:
print(f"计算评分时出错: {str(e)}")
raise
def get_ai_analysis(self, df, stock_code, stream=False):
def get_ai_analysis(self, df, stock_code, market_type='A', stream=False):
"""使用 OpenAI 进行 AI 分析"""
try:
logger.info(f"开始AI分析股票 {stock_code}, 流式模式: {stream}")
logger.info(f"开始AI分析 {stock_code}, 流式模式: {stream}")
recent_data = df.tail(14).to_dict('records')
technical_summary = {
@@ -227,25 +238,87 @@ class StockAnalyzer:
'rsi_level': df.iloc[-1]['RSI']
}
prompt = f"""
分析股票 {stock_code}
# 根据市场类型调整分析提示
if market_type in ['ETF', 'LOF']:
prompt = f"""
分析基金 {stock_code}
技术指标概要:
{technical_summary}
近14日交易数据
{recent_data}
请提供:
1. 趋势分析(包含支撑位和压力位)
2. 成交量分析及其含义
3. 风险评估(包含波动率分析)
4. 短期和中期目标价位
5. 关键技术位分析
6. 具体交易建议(包含止损位)
请基于技术指标和市场动态进行分析,给出具体数据支持。
"""
技术指标概要:
{technical_summary}
近14日交易数据
{recent_data}
请提供:
1. 净值走势分析(包含支撑位和压力位)
2. 成交量分析及其对净值的影响
3. 风险评估(包含波动率和折溢价分析)
4. 短期和中期净值预测
5. 关键价格位分析
6. 申购赎回建议(包含止损位)
请基于技术指标和市场表现进行分析,给出具体数据支持。
"""
elif market_type == 'US':
prompt = f"""
分析美股 {stock_code}
技术指标概要:
{technical_summary}
近14日交易数据
{recent_data}
请提供:
1. 趋势分析(包含支撑位和压力位,美元计价)
2. 成交量分析及其含义
3. 风险评估(包含波动率和美股市场特有风险)
4. 短期和中期目标价位(美元)
5. 关键技术位分析
6. 具体交易建议(包含止损位)
请基于技术指标和美股市场特点进行分析,给出具体数据支持。
"""
elif market_type == 'HK':
prompt = f"""
分析港股 {stock_code}
技术指标概要:
{technical_summary}
近14日交易数据
{recent_data}
请提供:
1. 趋势分析(包含支撑位和压力位,港币计价)
2. 成交量分析及其含义
3. 风险评估(包含波动率和港股市场特有风险)
4. 短期和中期目标价位(港币)
5. 关键技术位分析
6. 具体交易建议(包含止损位)
请基于技术指标和港股市场特点进行分析,给出具体数据支持。
"""
else: # A股
prompt = f"""
分析A股 {stock_code}
技术指标概要:
{technical_summary}
近14日交易数据
{recent_data}
请提供:
1. 趋势分析(包含支撑位和压力位)
2. 成交量分析及其含义
3. 风险评估(包含波动率分析)
4. 短期和中期目标价位
5. 关键技术位分析
6. 具体交易建议(包含止损位)
请基于技术指标和A股市场特点进行分析给出具体数据支持。
"""
logger.debug(f"生成的AI分析提示词: {self._truncate_json_for_logging(prompt, 100)}...")
@@ -381,7 +454,7 @@ class StockAnalyzer:
def _process_ai_stream(self, response, stock_code) -> Generator[str, None, None]:
"""处理AI流式响应"""
logger.info(f"开始处理股票 {stock_code} 的AI流式响应")
logger.info(f"开始处理 {stock_code} 的AI流式响应")
buffer = ""
chunk_count = 0
@@ -457,49 +530,45 @@ class StockAnalyzer:
return '强烈建议卖出'
def analyze_stock(self, stock_code, market_type='A', stream=False):
"""分析单个股票"""
"""分析股票或基金"""
try:
logger.info(f"开始分析股票: {stock_code}, 市场: {market_type}, 流式模式: {stream}")
logger.info(f"开始分析: {stock_code}, 市场: {market_type}, 流式模式: {stream}")
# 获取股票数据
logger.debug(f"获取股票 {stock_code} 数据")
# 获取数据
logger.debug(f"获取 {stock_code} 数据")
df = self.get_stock_data(stock_code, market_type)
# 计算技术指标
logger.debug(f"计算股票 {stock_code} 技术指标")
logger.debug(f"计算 {stock_code} 技术指标")
df = self.calculate_indicators(df)
# 评分系统
logger.debug(f"计算股票 {stock_code} 评分")
logger.debug(f"计算 {stock_code} 评分")
score = self.calculate_score(df)
logger.info(f"股票 {stock_code} 评分结果: {score}")
logger.info(f"{stock_code} 评分结果: {score}")
# 获取最新数据
latest = df.iloc[-1]
prev = df.iloc[-2]
# 处理 RSI 的 NaN 值
rsi_value = latest['RSI']
if pd.isna(rsi_value):
rsi_value = None
# 生成报告(保持原有格式)
# 生成报告
report = {
'stock_code': stock_code,
'market_type': market_type, # 添加市场类型
'analysis_date': datetime.now().strftime('%Y-%m-%d'),
'score': score,
'price': latest['close'],
'price_change': (latest['close'] - prev['close']) / prev['close'] * 100,
'ma_trend': 'UP' if latest['MA5'] > latest['MA20'] else 'DOWN',
'rsi': rsi_value, # 使用处理后的 RSI 值
'rsi': latest['RSI'] if not pd.isna(latest['RSI']) else None,
'macd_signal': 'BUY' if latest['MACD'] > latest['Signal'] else 'SELL',
'volume_status': 'HIGH' if latest['Volume_Ratio'] > 1.5 else 'NORMAL',
'recommendation': self.get_recommendation(score)
}
logger.debug(f"生成股票 {stock_code} 基础报告: {self._truncate_json_for_logging(report, 100)}...")
logger.debug(f"生成 {stock_code} 基础报告: {self._truncate_json_for_logging(report, 100)}...")
if stream:
logger.info(f"以流式模式返回股票 {stock_code} 分析结果")
logger.info(f"以流式模式返回 {stock_code} 分析结果")
# 先返回基本报告结构
base_report = dict(report)
base_report['ai_analysis'] = ''
@@ -509,21 +578,21 @@ class StockAnalyzer:
yield base_report_json
# 然后流式返回AI分析部分
logger.debug(f"开始获取股票 {stock_code} 的流式AI分析")
logger.debug(f"开始获取 {stock_code} 的流式AI分析")
ai_chunks_count = 0
for ai_chunk in self.get_ai_analysis(df, stock_code, stream=True):
for ai_chunk in self.get_ai_analysis(df, stock_code, market_type, stream=True):
ai_chunks_count += 1
yield ai_chunk
logger.info(f"股票 {stock_code} 流式AI分析完成共发送 {ai_chunks_count} 个块")
logger.info(f" {stock_code} 流式AI分析完成共发送 {ai_chunks_count} 个块")
else:
logger.info(f"以非流式模式返回股票 {stock_code} 分析结果")
logger.debug(f"开始获取股票 {stock_code} 的AI分析")
report['ai_analysis'] = self.get_ai_analysis(df, stock_code)
logger.info(f"以非流式模式返回 {stock_code} 分析结果")
logger.debug(f"开始获取 {stock_code} 的AI分析")
report['ai_analysis'] = self.get_ai_analysis(df, stock_code, market_type)
logger.debug(f"AI分析结果长度: {len(report['ai_analysis'])}")
return report
except Exception as e:
error_msg = f"分析股票 {stock_code} 时出错: {str(e)}"
error_msg = f"分析 {stock_code} 时出错: {str(e)}"
logger.error(error_msg)
logger.exception(e)
@@ -535,23 +604,23 @@ class StockAnalyzer:
raise
def scan_market(self, stock_list, min_score=60, market_type='A', stream=False):
"""扫描市场,寻找符合条件的股票"""
logger.info(f"开始扫描市场,股票数量: {len(stock_list)}, 最低分数: {min_score}, 市场: {market_type}, 流式模式: {stream}")
"""扫描市场,寻找符合条件的"""
logger.info(f"开始扫描市场,数量: {len(stock_list)}, 最低分数: {min_score}, 市场: {market_type}, 流式模式: {stream}")
if not stream:
recommendations = []
for stock_code in stock_list:
try:
logger.debug(f"分析股票: {stock_code}")
logger.debug(f"分析: {stock_code}")
report = self.analyze_stock(stock_code, market_type)
if report['score'] >= min_score:
logger.info(f"股票 {stock_code} 评分 {report['score']} >= {min_score},添加到推荐列表")
logger.info(f" {stock_code} 评分 {report['score']} >= {min_score},添加到推荐列表")
recommendations.append(report)
else:
logger.debug(f"股票 {stock_code} 评分 {report['score']} < {min_score},不添加到推荐列表")
logger.debug(f" {stock_code} 评分 {report['score']} < {min_score},不添加到推荐列表")
except Exception as e:
logger.error(f"分析股票 {stock_code} 时出错: {str(e)}")
logger.error(f"分析 {stock_code} 时出错: {str(e)}")
logger.exception(e)
continue
@@ -565,16 +634,16 @@ class StockAnalyzer:
stock_count = 0
for stock_code in stock_list:
stock_count += 1
logger.debug(f"流式分析股票 {stock_code} ({stock_count}/{len(stock_list)})")
logger.debug(f"流式分析 {stock_code} ({stock_count}/{len(stock_list)})")
try:
# 分析单只股票并获取流式结果
chunk_count = 0
for chunk in self.analyze_stock(stock_code, market_type, stream=True):
chunk_count += 1
yield chunk
logger.debug(f"股票 {stock_code} 流式分析完成,共 {chunk_count} 个块")
logger.debug(f" {stock_code} 流式分析完成,共 {chunk_count} 个块")
except Exception as e:
error_msg = f"分析股票 {stock_code} 时出错: {str(e)}"
error_msg = f"分析 {stock_code} 时出错: {str(e)}"
logger.error(error_msg)
logger.exception(e)
error_json = json.dumps({'stock_code': stock_code, 'error': error_msg})