Files
stock-scanner/services/fund_service_async.py
2025-03-11 10:13:32 +08:00

228 lines
8.3 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import asyncio
import pandas as pd
from typing import List, Dict, Any, Optional
from utils.logger import get_logger
from datetime import datetime, timedelta
# 获取日志器
logger = get_logger()
class FundServiceAsync:
"""
异步基金服务
提供基金数据的异步搜索和获取功能
"""
def __init__(self):
"""初始化异步基金服务"""
logger.debug("初始化FundServiceAsync")
# 添加缓存
self._etf_cache = None
self._lof_cache = None
self._cache_timestamp = None
self._cache_duration = timedelta(minutes=30) # 缓存30分钟
async def search_funds(self, keyword: str, market_type: str = 'ETF') -> List[Dict[str, Any]]:
"""
异步搜索基金代码
Args:
keyword: 搜索关键词
market_type: 市场类型,'ETF''LOF'
Returns:
匹配的基金列表
"""
try:
logger.info(f"异步搜索基金: {keyword}, 类型: {market_type}")
# 获取基金数据
df = await self._get_funds_data(market_type)
# 模糊匹配搜索(同时匹配代码和名称)
mask = (df['name'].str.contains(keyword, case=False, na=False) |
df['symbol'].str.contains(keyword, case=False, na=False))
results = df[mask]
# 格式化返回结果并处理 NaN 值
formatted_results = []
for _, row in results.iterrows():
formatted_results.append({
'name': row['name'] if pd.notna(row['name']) else '',
'symbol': str(row['symbol']) if pd.notna(row['symbol']) else '',
'price': float(row['price']) if pd.notna(row['price']) else 0.0,
'volume': float(row['volume']) if pd.notna(row['volume']) else 0.0,
'market_value': float(row['market_value']) if pd.notna(row['market_value']) else 0.0,
'total_value': float(row['total_value']) if pd.notna(row['total_value']) else 0.0,
})
# 限制只返回前10个结果
if len(formatted_results) >= 10:
break
logger.info(f"基金搜索完成,找到 {len(formatted_results)} 个匹配项限制显示前10个")
return formatted_results
except Exception as e:
error_msg = f"搜索基金代码失败: {str(e)}"
logger.error(error_msg)
logger.exception(e)
raise Exception(error_msg)
async def _get_funds_data(self, market_type: str = 'ETF') -> pd.DataFrame:
"""
异步获取基金数据,支持缓存
Args:
market_type: 市场类型,'ETF''LOF'
Returns:
包含基金数据的DataFrame
"""
# 检查缓存是否有效
now = datetime.now()
cache_valid = (
self._cache_timestamp is not None and
(now - self._cache_timestamp) < self._cache_duration
)
if market_type == 'ETF' and cache_valid and self._etf_cache is not None:
logger.debug("使用ETF缓存数据")
return self._etf_cache
elif market_type == 'LOF' and cache_valid and self._lof_cache is not None:
logger.debug("使用LOF缓存数据")
return self._lof_cache
# 缓存无效,重新获取数据
try:
logger.debug(f"从API获取{market_type}数据")
# 使用线程池执行同步的akshare调用
if market_type == 'ETF':
df = await asyncio.to_thread(self._get_etf_data)
self._etf_cache = df
else:
df = await asyncio.to_thread(self._get_lof_data)
self._lof_cache = df
self._cache_timestamp = now
return df
except Exception as e:
logger.error(f"获取{market_type}数据失败: {str(e)}")
logger.exception(e)
raise
def _get_etf_data(self) -> pd.DataFrame:
"""
获取ETF数据同步方法将被异步方法调用
Returns:
包含ETF数据的DataFrame
"""
import akshare as ak
try:
# 获取ETF基金数据
df = ak.fund_etf_spot_em()
# 转换列名
df = df.rename(columns={
"代码": "symbol",
"名称": "name",
"最新价": "price",
"涨跌额": "price_change",
"涨跌幅": "price_change_percent",
"成交量": "volume",
"流通市值": "market_value",
"总市值": "total_value",
"基金折价率": "discount_rate",
})
return df
except Exception as e:
logger.error(f"获取ETF数据失败: {str(e)}")
logger.exception(e)
raise Exception(f"获取ETF数据失败: {str(e)}")
def _get_lof_data(self) -> pd.DataFrame:
"""
获取LOF数据同步方法将被异步方法调用
Returns:
包含LOF数据的DataFrame
"""
import akshare as ak
try:
# 获取LOF基金数据
df = ak.fund_lof_spot_em()
# 转换列名
df = df.rename(columns={
"代码": "symbol",
"名称": "name",
"最新价": "price",
"涨跌额": "price_change",
"涨跌幅": "price_change_percent",
"成交量": "volume",
"流通市值": "market_value",
"总市值": "total_value",
"基金折价率": "discount_rate",
})
return df
except Exception as e:
logger.error(f"获取LOF数据失败: {str(e)}")
logger.exception(e)
raise Exception(f"获取LOF数据失败: {str(e)}")
async def get_fund_detail(self, symbol: str, market_type: str = 'ETF') -> Dict[str, Any]:
"""
异步获取单个基金详细信息
Args:
symbol: 基金代码
market_type: 市场类型,'ETF''LOF'
Returns:
基金详细信息
"""
try:
logger.info(f"获取{market_type}基金详情: {symbol}")
# 获取基金数据
df = await self._get_funds_data(market_type)
# 精确匹配基金代码
result = df[df['symbol'] == symbol]
if len(result) == 0:
raise Exception(f"未找到基金代码: {symbol}")
# 获取第一行数据
row = result.iloc[0]
# 格式化为字典
fund_detail = {
'name': row['name'] if pd.notna(row['name']) else '',
'symbol': str(row['symbol']) if pd.notna(row['symbol']) else '',
'price': float(row['price']) if pd.notna(row['price']) else 0.0,
'price_change': float(row['price_change']) if pd.notna(row['price_change']) else 0.0,
'price_change_percent': float(row['price_change_percent'].strip('%'))/100 if pd.notna(row['price_change_percent']) else 0.0,
'volume': float(row['volume']) if pd.notna(row['volume']) else 0.0,
'market_value': float(row['market_value']) if pd.notna(row['market_value']) else 0.0,
'total_value': float(row['total_value']) if pd.notna(row['total_value']) else 0.0,
'discount_rate': float(row['discount_rate'].strip('%'))/100 if pd.notna(row['discount_rate']) else 0.0
}
logger.info(f"获取基金详情成功: {symbol}")
return fund_detail
except Exception as e:
error_msg = f"获取基金详情失败: {str(e)}"
logger.error(error_msg)
logger.exception(e)
raise Exception(error_msg)