ADD: 增加港股支持 增加Dockerfile

This commit is contained in:
兰志宏
2025-03-01 21:46:26 +08:00
parent b0180b25d5
commit 1029a7eb07
9 changed files with 297 additions and 1458 deletions

27
Dockerfile Normal file
View File

@@ -0,0 +1,27 @@
# 使用 Python 3.9 作为基础镜像
FROM python:3.10-slim
# 设置工作目录
WORKDIR /app
# 安装系统依赖
RUN apt-get update && apt-get install -y \
libgl1-mesa-glx \
ca-certificates \
&& rm -rf /var/lib/apt/lists/*
# 复制项目文件
COPY . /app/
# 安装 Python 依赖
RUN pip install --no-cache-dir -r requirements.txt
RUN pip install akshare --upgrade -i https://pypi.org/simple
# 设置环境变量
ENV PYTHONPATH=/app
# 暴露端口(如果需要)
EXPOSE 8888
# 启动命令
CMD ["python", "web_server.py"]

218
gui.py
View File

@@ -1,218 +0,0 @@
import sys
from PyQt6.QtWidgets import (QApplication, QMainWindow, QWidget, QVBoxLayout,
QHBoxLayout, QLineEdit, QPushButton, QTextBrowser,
QLabel, QTextEdit, QMessageBox, QProgressBar)
from PyQt6.QtCore import Qt, QThread, pyqtSignal
from PyQt6.QtGui import QFont
import markdown2
from stock_analyzer import StockAnalyzer # 导入股票分析器
class AnalysisWorker(QThread):
"""后台工作线程,用于执行分析任务"""
finished = pyqtSignal(dict)
error = pyqtSignal(str)
progress = pyqtSignal(int)
def __init__(self, analyzer, stock_code):
super().__init__()
self.analyzer = analyzer
self.stock_code = stock_code
def run(self):
try:
report = self.analyzer.analyze_stock(self.stock_code)
self.finished.emit(report)
except Exception as e:
self.error.emit(str(e))
class BatchAnalysisWorker(QThread):
"""后台工作线程,用于执行批量分析任务"""
finished = pyqtSignal(list)
error = pyqtSignal(str)
progress = pyqtSignal(int)
def __init__(self, analyzer, stock_list):
super().__init__()
self.analyzer = analyzer
self.stock_list = stock_list
def run(self):
try:
results = []
total = len(self.stock_list)
for i, stock_code in enumerate(self.stock_list):
report = self.analyzer.analyze_stock(stock_code)
results.append(report)
self.progress.emit(int((i + 1) / total * 100))
self.finished.emit(results)
except Exception as e:
self.error.emit(str(e))
class StockAnalyzerGUI(QMainWindow):
def __init__(self):
super().__init__()
self.analyzer = StockAnalyzer()
self.initUI()
def initUI(self):
self.setWindowTitle('股票分析系统')
self.setGeometry(100, 100, 1200, 800)
# 创建中央部件和布局
central_widget = QWidget()
self.setCentralWidget(central_widget)
layout = QVBoxLayout(central_widget)
# 创建输入区域
input_layout = QHBoxLayout()
# 单只股票分析部分
single_stock_layout = QVBoxLayout()
single_label = QLabel('单只股票分析:')
single_label.setFont(QFont('Arial', 12, QFont.Weight.Bold))
self.single_stock_input = QLineEdit()
self.single_stock_input.setPlaceholderText('输入股票代码600000')
self.single_stock_input.setFont(QFont('Arial', 10))
self.analyze_btn = QPushButton('分析')
self.analyze_btn.setFont(QFont('Arial', 10, QFont.Weight.Bold))
self.analyze_btn.clicked.connect(self.analyze_single_stock)
single_stock_layout.addWidget(single_label)
single_stock_layout.addWidget(self.single_stock_input)
single_stock_layout.addWidget(self.analyze_btn)
# 批量分析部分
batch_stock_layout = QVBoxLayout()
batch_label = QLabel('批量股票分析:')
batch_label.setFont(QFont('Arial', 12, QFont.Weight.Bold))
self.batch_stock_input = QTextEdit()
self.batch_stock_input.setPlaceholderText('输入多个股票代码,每行一个')
self.batch_stock_input.setFont(QFont('Arial', 10))
self.batch_stock_input.setMaximumHeight(100)
self.batch_analyze_btn = QPushButton('批量分析')
self.batch_analyze_btn.setFont(QFont('Arial', 10, QFont.Weight.Bold))
self.batch_analyze_btn.clicked.connect(self.analyze_multiple_stocks)
batch_stock_layout.addWidget(batch_label)
batch_stock_layout.addWidget(self.batch_stock_input)
batch_stock_layout.addWidget(self.batch_analyze_btn)
input_layout.addLayout(single_stock_layout)
input_layout.addLayout(batch_stock_layout)
# 添加进度条
self.progress_bar = QProgressBar()
self.progress_bar.setVisible(False)
# 添加结果显示区域
self.result_browser = QTextBrowser()
self.result_browser.setOpenExternalLinks(True)
self.result_browser.setFont(QFont('Arial', 10))
layout.addLayout(input_layout)
layout.addWidget(self.progress_bar)
layout.addWidget(self.result_browser)
def format_report(self, report, is_single=True):
"""将分析报告格式化为Markdown格式"""
md = f"""# 股票分析报告 - {report['stock_code']}
## 基本信息
- 分析日期:{report['analysis_date']}
- 当前价格:{report['price']:.2f}
- 价格变动:{report['price_change']:.2f}%
## 技术指标
- 均线趋势:{report['ma_trend']}
- RSI指标{report['rsi']:.2f}
- MACD信号{report['macd_signal']}
- 成交量状态:{report['volume_status']}
## 评分与建议
- 综合评分:{report['score']}
- 投资建议:{report['recommendation']}
## AI分析
{report['ai_analysis']}
---
"""
return md
def analyze_single_stock(self):
"""分析单只股票"""
stock_code = self.single_stock_input.text().strip()
if not stock_code:
QMessageBox.warning(self, '警告', '请输入股票代码')
return
self.analyze_btn.setEnabled(False)
self.progress_bar.setVisible(True)
self.progress_bar.setValue(0)
# 创建工作线程
self.worker = AnalysisWorker(self.analyzer, stock_code)
self.worker.finished.connect(self.handle_single_analysis_result)
self.worker.error.connect(self.handle_analysis_error)
self.worker.progress.connect(self.progress_bar.setValue)
self.worker.start()
def handle_single_analysis_result(self, report):
"""处理单只股票分析结果"""
markdown_text = self.format_report(report)
html_content = markdown2.markdown(markdown_text)
self.result_browser.setHtml(html_content)
self.analyze_btn.setEnabled(True)
self.progress_bar.setVisible(False)
def analyze_multiple_stocks(self):
"""批量分析股票"""
text = self.batch_stock_input.toPlainText().strip()
if not text:
QMessageBox.warning(self, '警告', '请输入股票代码')
return
stock_list = [code.strip() for code in text.split('\n') if code.strip()]
self.batch_analyze_btn.setEnabled(False)
self.progress_bar.setVisible(True)
self.progress_bar.setValue(0)
# 创建工作线程
self.batch_worker = BatchAnalysisWorker(self.analyzer, stock_list)
self.batch_worker.finished.connect(self.handle_batch_analysis_result)
self.batch_worker.error.connect(self.handle_analysis_error)
self.batch_worker.progress.connect(self.progress_bar.setValue)
self.batch_worker.start()
def handle_batch_analysis_result(self, recommendations):
"""处理批量分析结果"""
# 生成markdown格式的报告
markdown_text = "# 批量股票分析报告\n\n"
for rec in recommendations:
markdown_text += self.format_report(rec, False)
html_content = markdown2.markdown(markdown_text)
self.result_browser.setHtml(html_content)
self.batch_analyze_btn.setEnabled(True)
self.progress_bar.setVisible(False)
def handle_analysis_error(self, error_message):
"""处理分析错误"""
QMessageBox.critical(self, '错误', f'分析过程中出现错误:{error_message}')
self.analyze_btn.setEnabled(True)
self.batch_analyze_btn.setEnabled(True)
self.progress_bar.setVisible(False)
def main():
app = QApplication(sys.argv)
# 设置应用程序样式
app.setStyle('Fusion')
# 创建并显示主窗口
window = StockAnalyzerGUI()
window.show()
sys.exit(app.exec())
if __name__ == '__main__':
main()

497
gui2.py
View File

@@ -1,497 +0,0 @@
import sys
from PyQt6.QtWidgets import (QApplication, QMainWindow, QWidget, QVBoxLayout,
QHBoxLayout, QLineEdit, QPushButton, QTextBrowser,
QLabel, QTextEdit, QMessageBox, QProgressBar,
QFrame, QSizePolicy)
from PyQt6.QtCore import Qt, QThread, pyqtSignal
from PyQt6.QtGui import QFont, QPalette, QColor
import markdown2
from stock_analyzer import StockAnalyzer
class ModernFrame(QFrame):
"""现代化的面板组件"""
def __init__(self, parent=None):
super().__init__(parent)
self.setFrameStyle(QFrame.Shape.StyledPanel | QFrame.Shadow.Raised)
self.setStyleSheet("""
ModernFrame {
background-color: #ffffff;
border-radius: 10px;
border: 1px solid #e0e0e0;
}
""")
class ModernButton(QPushButton):
"""现代化的按钮组件"""
def __init__(self, text, parent=None, primary=True):
super().__init__(text, parent)
self.setMinimumHeight(40)
if primary:
self.setStyleSheet("""
QPushButton {
background-color: #1a73e8;
color: white;
border: none;
border-radius: 5px;
padding: 8px 16px;
font-weight: bold;
}
QPushButton:hover {
background-color: #1557b0;
}
QPushButton:pressed {
background-color: #0d47a1;
}
QPushButton:disabled {
background-color: #cccccc;
}
""")
else:
self.setStyleSheet("""
QPushButton {
background-color: #f8f9fa;
color: #1a73e8;
border: 1px solid #dadce0;
border-radius: 5px;
padding: 8px 16px;
font-weight: bold;
}
QPushButton:hover {
background-color: #f1f3f4;
border-color: #d2e3fc;
}
QPushButton:pressed {
background-color: #e8eaed;
}
QPushButton:disabled {
color: #5f6368;
border-color: #e0e0e0;
}
""")
class ModernLineEdit(QLineEdit):
"""现代化的输入框组件"""
def __init__(self, parent=None):
super().__init__(parent)
self.setMinimumHeight(40)
self.setStyleSheet("""
QLineEdit {
border: 2px solid #e0e0e0;
border-radius: 5px;
padding: 8px 12px;
background-color: white;
selection-background-color: #cce0ff;
}
QLineEdit:focus {
border-color: #1a73e8;
}
QLineEdit:hover {
border-color: #999999;
}
""")
class ModernTextEdit(QTextEdit):
"""现代化的多行文本输入框组件"""
def __init__(self, parent=None):
super().__init__(parent)
self.setStyleSheet("""
QTextEdit {
border: 2px solid #e0e0e0;
border-radius: 5px;
padding: 8px;
background-color: white;
selection-background-color: #cce0ff;
}
QTextEdit:focus {
border-color: #1a73e8;
}
QTextEdit:hover {
border-color: #999999;
}
""")
class ModernProgressBar(QProgressBar):
"""现代化的进度条组件"""
def __init__(self, parent=None):
super().__init__(parent)
self.setStyleSheet("""
QProgressBar {
border: none;
border-radius: 3px;
background-color: #f0f0f0;
height: 6px;
text-align: center;
}
QProgressBar::chunk {
background-color: #1a73e8;
border-radius: 3px;
}
""")
self.setTextVisible(False)
class AnalysisWorker(QThread):
"""后台工作线程,用于执行分析任务"""
finished = pyqtSignal(dict)
error = pyqtSignal(str)
progress = pyqtSignal(int)
def __init__(self, analyzer, stock_code):
super().__init__()
self.analyzer = analyzer
self.stock_code = stock_code
def run(self):
try:
report = self.analyzer.analyze_stock(self.stock_code)
self.finished.emit(report)
except Exception as e:
self.error.emit(str(e))
class BatchAnalysisWorker(QThread):
"""后台工作线程,用于执行批量分析任务"""
finished = pyqtSignal(list)
error = pyqtSignal(str)
progress = pyqtSignal(int)
def __init__(self, analyzer, stock_list):
super().__init__()
self.analyzer = analyzer
self.stock_list = stock_list
def run(self):
try:
results = []
total = len(self.stock_list)
for i, stock_code in enumerate(self.stock_list):
report = self.analyzer.analyze_stock(stock_code)
results.append(report)
self.progress.emit(int((i + 1) / total * 100))
self.finished.emit(results)
except Exception as e:
self.error.emit(str(e))
class ModernStockAnalyzerGUI(QMainWindow):
def __init__(self):
super().__init__()
self.analyzer = StockAnalyzer()
self.init_ui()
self.adjust_size_and_position()
def adjust_size_and_position(self):
"""调整窗口大小和位置以适应不同分辨率"""
screen = QApplication.primaryScreen()
if screen:
geometry = screen.availableGeometry()
# 设置窗口大小为屏幕的75%
width = int(geometry.width() * 0.75)
height = int(geometry.height() * 0.75)
self.resize(width, height)
# 居中显示
center = geometry.center()
frame = self.frameGeometry()
frame.moveCenter(center)
self.move(frame.topLeft())
def init_ui(self):
self.setWindowTitle('现代股票分析系统')
self.setStyleSheet("""
QMainWindow {
background-color: #f8f9fa;
}
""")
# 创建中央部件和主布局
central_widget = QWidget()
self.setCentralWidget(central_widget)
main_layout = QVBoxLayout(central_widget)
main_layout.setSpacing(20)
main_layout.setContentsMargins(20, 20, 20, 20)
# 创建标题
title_label = QLabel('股票分析系统')
title_label.setStyleSheet("""
QLabel {
color: #202124;
font-size: 24px;
font-weight: bold;
margin-bottom: 20px;
}
""")
main_layout.addWidget(title_label, alignment=Qt.AlignmentFlag.AlignTop)
# 创建输入区域容器
input_container = ModernFrame()
input_layout = QHBoxLayout(input_container)
input_layout.setSpacing(20)
input_layout.setContentsMargins(20, 20, 20, 20)
# 单只股票分析部分
single_stock_frame = self.create_single_stock_section()
input_layout.addWidget(single_stock_frame)
# 分隔线
separator = QFrame()
separator.setFrameShape(QFrame.Shape.VLine)
separator.setStyleSheet("background-color: #e0e0e0;")
input_layout.addWidget(separator)
# 批量分析部分
batch_stock_frame = self.create_batch_stock_section()
input_layout.addWidget(batch_stock_frame)
main_layout.addWidget(input_container)
# 进度条
self.progress_bar = ModernProgressBar()
self.progress_bar.setVisible(False)
main_layout.addWidget(self.progress_bar)
# 结果显示区域
result_frame = ModernFrame()
result_layout = QVBoxLayout(result_frame)
result_layout.setContentsMargins(15, 15, 15, 15)
result_label = QLabel('分析结果')
result_label.setStyleSheet("""
QLabel {
color: #202124;
font-size: 18px;
font-weight: bold;
}
""")
result_layout.addWidget(result_label)
self.result_browser = QTextBrowser()
self.result_browser.setOpenExternalLinks(True)
self.result_browser.setStyleSheet("""
QTextBrowser {
border: none;
background-color: white;
font-size: 14px;
line-height: 1.5;
}
""")
result_layout.addWidget(self.result_browser)
main_layout.addWidget(result_frame)
def create_single_stock_section(self):
"""创建单只股票分析部分"""
frame = QFrame()
layout = QVBoxLayout(frame)
layout.setSpacing(15)
label = QLabel('单只股票分析')
label.setStyleSheet("""
QLabel {
color: #202124;
font-size: 16px;
font-weight: bold;
}
""")
layout.addWidget(label)
self.single_stock_input = ModernLineEdit()
self.single_stock_input.setPlaceholderText('输入股票代码600000')
layout.addWidget(self.single_stock_input)
self.analyze_btn = ModernButton('分析')
self.analyze_btn.clicked.connect(self.analyze_single_stock)
layout.addWidget(self.analyze_btn)
layout.addStretch()
return frame
def create_batch_stock_section(self):
"""创建批量分析部分"""
frame = QFrame()
layout = QVBoxLayout(frame)
layout.setSpacing(15)
label = QLabel('批量股票分析')
label.setStyleSheet("""
QLabel {
color: #202124;
font-size: 16px;
font-weight: bold;
}
""")
layout.addWidget(label)
self.batch_stock_input = ModernTextEdit()
self.batch_stock_input.setPlaceholderText('输入多个股票代码,每行一个')
self.batch_stock_input.setMinimumHeight(100)
layout.addWidget(self.batch_stock_input)
self.batch_analyze_btn = ModernButton('批量分析')
self.batch_analyze_btn.clicked.connect(self.analyze_multiple_stocks)
layout.addWidget(self.batch_analyze_btn)
layout.addStretch()
return frame
def format_report(self, report, is_single=True):
"""将分析报告格式化为现代化的Markdown格式"""
md = f"""# 股票分析报告 - {report['stock_code']}
## 基本信息
- **分析日期:** {report['analysis_date']}
- **当前价格:** ¥{report['price']:.2f}
- **价格变动:** {report['price_change']:.2f}%
## 技术指标
- **均线趋势:** {report['ma_trend']}
- **RSI指标** {report['rsi']:.2f}
- **MACD信号** {report['macd_signal']}
- **成交量状态:** {report['volume_status']}
## 评分与建议
- **综合评分:** {report['score']}
- **投资建议:** {report['recommendation']}
## AI分析
{report['ai_analysis']}
---
"""
return md
def analyze_single_stock(self):
"""分析单只股票"""
stock_code = self.single_stock_input.text().strip()
if not stock_code:
self.show_warning('请输入股票代码')
return
self.analyze_btn.setEnabled(False)
self.progress_bar.setVisible(True)
self.progress_bar.setValue(0)
self.worker = AnalysisWorker(self.analyzer, stock_code)
self.worker.finished.connect(self.handle_single_analysis_result)
self.worker.error.connect(self.handle_analysis_error)
self.worker.progress.connect(self.progress_bar.setValue)
self.worker.start()
def analyze_multiple_stocks(self):
"""批量分析股票"""
text = self.batch_stock_input.toPlainText().strip()
if not text:
self.show_warning('请输入股票代码')
return
stock_list = [code.strip() for code in text.split('\n') if code.strip()]
self.batch_analyze_btn.setEnabled(False)
self.progress_bar.setVisible(True)
self.progress_bar.setValue(0)
self.batch_worker = BatchAnalysisWorker(self.analyzer, stock_list)
self.batch_worker.finished.connect(self.handle_batch_analysis_result)
self.batch_worker.error.connect(self.handle_analysis_error)
self.batch_worker.progress.connect(self.progress_bar.setValue)
self.batch_worker.start()
def handle_single_analysis_result(self, report):
"""处理单只股票分析结果"""
markdown_text = self.format_report(report)
html_content = markdown2.markdown(markdown_text, extras=['tables', 'fenced-code-blocks'])
self.result_browser.setHtml(html_content)
self.analyze_btn.setEnabled(True)
self.progress_bar.setVisible(False)
def handle_batch_analysis_result(self, recommendations):
"""处理批量分析结果"""
markdown_text = "# 批量股票分析报告\n\n"
for rec in recommendations:
markdown_text += self.format_report(rec, False)
html_content = markdown2.markdown(markdown_text, extras=['tables', 'fenced-code-blocks'])
self.result_browser.setHtml(html_content)
self.batch_analyze_btn.setEnabled(True)
self.progress_bar.setVisible(False)
def handle_analysis_error(self, error_message):
"""处理分析错误"""
self.show_error(f'分析过程中出现错误:{error_message}')
self.analyze_btn.setEnabled(True)
self.batch_analyze_btn.setEnabled(True)
self.progress_bar.setVisible(False)
def show_warning(self, message):
"""显示警告对话框"""
warning = QMessageBox(self)
warning.setIcon(QMessageBox.Icon.Warning)
warning.setWindowTitle('警告')
warning.setText(message)
warning.setStandardButtons(QMessageBox.StandardButton.Ok)
warning.setStyleSheet("""
QMessageBox {
background-color: white;
}
QMessageBox QLabel {
color: #202124;
min-width: 200px;
}
QPushButton {
background-color: #1a73e8;
color: white;
border: none;
border-radius: 4px;
padding: 8px 16px;
min-width: 80px;
}
QPushButton:hover {
background-color: #1557b0;
}
QPushButton:pressed {
background-color: #0d47a1;
}
""")
warning.exec()
def show_error(self, message):
"""显示错误对话框"""
error = QMessageBox(self)
error.setIcon(QMessageBox.Icon.Critical)
error.setWindowTitle('错误')
error.setText(message)
error.setStandardButtons(QMessageBox.StandardButton.Ok)
error.setStyleSheet("""
QMessageBox {
background-color: white;
}
QMessageBox QLabel {
color: #202124;
min-width: 200px;
}
QPushButton {
background-color: #1a73e8;
color: white;
border: none;
border-radius: 4px;
padding: 8px 16px;
min-width: 80px;
}
QPushButton:hover {
background-color: #1557b0;
}
QPushButton:pressed {
background-color: #0d47a1;
}
""")
error.exec()
def main():
app = QApplication(sys.argv)
app.setStyle('Fusion')
# 创建并显示主窗口
window = ModernStockAnalyzerGUI()
window.show()
sys.exit(app.exec())
if __name__ == '__main__':
main()

View File

@@ -1,5 +1,5 @@
# 基础科学计算和数据处理库 # 基础科学计算和数据处理库
numpy==2.0.0 numpy==2.1.2
pandas==2.2.2 pandas==2.2.2
scipy==1.15.1 scipy==1.15.1
@@ -7,13 +7,11 @@ scipy==1.15.1
akshare==1.15.87 akshare==1.15.87
tqdm==4.67.1 tqdm==4.67.1
# GUI库
PyQt6==6.8.1
markdown2==2.5.3
# 网络和API请求 # 网络和API请求
requests==2.32.3 requests==2.32.3
python-dotenv==1.0.1 python-dotenv==1.0.1
flask==3.1.0
# 日志和系统工具 # 日志和系统工具
loguru==0.7.2 loguru==0.7.2

View File

@@ -18,7 +18,7 @@ class StockAnalyzer:
load_dotenv() load_dotenv()
# 设置 Gemini API # 设置 Gemini API
self.gemini_api_url = "https://api.xxx.xxx" self.gemini_api_url = os.getenv('GEMINI_API_URL')
self.gemini_api_key = os.getenv('GEMINI_API_KEY') self.gemini_api_key = os.getenv('GEMINI_API_KEY')
# 配置参数 # 配置参数
@@ -31,7 +31,14 @@ class StockAnalyzer:
'atr_period': 14 'atr_period': 14
} }
def get_stock_data(self, stock_code, start_date=None, end_date=None): # 添加市场类型枚举
self.MARKET_TYPES = {
'A': 'A股',
'HK': '港股',
'CRYPTO': '加密货币'
}
def get_stock_data(self, stock_code, market_type='A', start_date=None, end_date=None, ):
"""获取股票数据""" """获取股票数据"""
import akshare as ak import akshare as ak
@@ -41,11 +48,33 @@ class StockAnalyzer:
end_date = datetime.now().strftime('%Y%m%d') end_date = datetime.now().strftime('%Y%m%d')
try: try:
# 使用 akshare 获取股票数据 # 根据市场类型获取数据
df = ak.stock_zh_a_hist(symbol=stock_code, if market_type == 'A':
start_date=start_date, df = ak.stock_zh_a_hist(
end_date=end_date, symbol=stock_code,
adjust="qfq") start_date=start_date,
end_date=end_date,
adjust="qfq"
)
# A股数据列名映射
elif market_type == 'HK':
df = ak.stock_hk_daily(
symbol=stock_code,
adjust="qfq"
)
elif market_type == 'US':
df = ak.stock_us_hist(
symbol=stock_code,
start_date=start_date,
end_date=end_date,
adjust="qfq"
)
elif market_type == 'CRYPTO':
df = ak.crypto_js_spot(
symbol=stock_code
)
else:
raise ValueError(f"不支持的市场类型: {market_type}")
# 重命名列名以匹配分析需求 # 重命名列名以匹配分析需求
df = df.rename(columns={ df = df.rename(columns={
@@ -225,7 +254,7 @@ class StockAnalyzer:
} }
data = { data = {
"model": "gemini-1.5-flash", "model": os.getenv('GEMINI_API_MODEL'),
"messages": [{"role": "user", "content": prompt}] "messages": [{"role": "user", "content": prompt}]
} }
@@ -233,8 +262,11 @@ class StockAnalyzer:
f"{self.gemini_api_url}/v1/chat/completions", f"{self.gemini_api_url}/v1/chat/completions",
headers=headers, headers=headers,
json=data, json=data,
timeout=10 timeout=30
) )
print(headers)
print(data)
print(response.json())
if response.status_code == 200: if response.status_code == 200:
return response.json()['choices'][0]['message']['content'] return response.json()['choices'][0]['message']['content']
@@ -258,11 +290,11 @@ class StockAnalyzer:
else: else:
return '强烈建议卖出' return '强烈建议卖出'
def analyze_stock(self, stock_code): def analyze_stock(self, stock_code, market_type='A'):
"""分析单个股票""" """分析单个股票"""
try: try:
# 获取股票数据 # 获取股票数据
df = self.get_stock_data(stock_code) df = self.get_stock_data(stock_code, market_type)
# 计算技术指标 # 计算技术指标
df = self.calculate_indicators(df) df = self.calculate_indicators(df)
@@ -295,13 +327,13 @@ class StockAnalyzer:
self.logger.error(f"分析股票时出错: {str(e)}") self.logger.error(f"分析股票时出错: {str(e)}")
raise raise
def scan_market(self, stock_list, min_score=60): def scan_market(self, stock_list, min_score=60, market_type='A'):
"""扫描市场,寻找符合条件的股票""" """扫描市场,寻找符合条件的股票"""
recommendations = [] recommendations = []
for stock_code in stock_list: for stock_code in stock_list:
try: try:
report = self.analyze_stock(stock_code) report = self.analyze_stock(stock_code, market_type)
if report['score'] >= min_score: if report['score'] >= min_score:
recommendations.append(report) recommendations.append(report)
except Exception as e: except Exception as e:

View File

@@ -10,130 +10,206 @@
<div class="container mx-auto px-4 py-8"> <div class="container mx-auto px-4 py-8">
<h1 class="text-3xl font-bold mb-8 text-center">股票分析系统</h1> <h1 class="text-3xl font-bold mb-8 text-center">股票分析系统</h1>
<div class="grid grid-cols-1 md:grid-cols-2 gap-8"> <div class="max-w-4xl mx-auto"> <!-- 将 max-w-2xl 改为 max-w-4xl -->
<!-- 单只股票分析 -->
<div class="bg-white p-6 rounded-lg shadow-md">
<h2 class="text-xl font-semibold mb-4">单只股票分析</h2>
<div class="mb-4">
<input type="text" id="singleStock"
class="w-full p-2 border rounded"
placeholder="输入股票代码600000">
</div>
<button onclick="analyzeSingleStock()"
class="w-full bg-blue-600 text-white py-2 rounded hover:bg-blue-700">
分析
</button>
</div>
<!-- 批量分析 --> <!-- 批量分析 -->
<div class="bg-white p-6 rounded-lg shadow-md"> <div class="bg-white p-6 rounded-lg shadow-md">
<h2 class="text-xl font-semibold mb-4">批量股票分析</h2> <h2 class="text-xl font-semibold mb-4">股票批量分析</h2>
<!-- 添加市场类型选择 -->
<div class="mb-4"> <div class="mb-4">
<label for="marketType" class="block text-sm font-medium text-gray-700 mb-2">
选择市场类型
</label>
<select id="marketType"
class="w-full p-2 border rounded bg-white focus:ring-2 focus:ring-blue-500 focus:border-blue-500">
<option value="A">A股</option>
<option value="HK">港股</option>
<!-- <option value="US">美股</option> -->
<!-- <option value="CRYPTO">加密货币</option> -->
</select>
</div>
<div class="mb-4">
<label for="batchStocks" class="block text-sm font-medium text-gray-700 mb-2">
输入代码
</label>
<textarea id="batchStocks" <textarea id="batchStocks"
class="w-full p-2 border rounded h-32" class="w-full p-2 border rounded h-32"
placeholder="输入多个股票代码,每行一个"></textarea> placeholder="输入代码,支持多个代码(用回车或逗号分隔)"></textarea>
</div> </div>
<button onclick="analyzeBatchStocks()"
class="w-full bg-blue-600 text-white py-2 rounded hover:bg-blue-700"> <button id="analyzeBtn" onclick="analyzeStocks()"
批量分析 class="w-full bg-blue-600 text-white py-2 rounded hover:bg-blue-700 flex items-center justify-center">
<span>开始分析</span>
<div id="loadingSpinner" class="hidden ml-2">
<svg class="animate-spin h-5 w-5 text-white" xmlns="http://www.w3.org/2000/svg" fill="none" viewBox="0 0 24 24">
<circle class="opacity-25" cx="12" cy="12" r="10" stroke="currentColor" stroke-width="4"></circle>
<path class="opacity-75" fill="currentColor" d="M4 12a8 8 0 018-8V0C5.373 0 0 5.373 0 12h4zm2 5.291A7.962 7.962 0 014 12H0c0 3.042 1.135 5.824 3 7.938l3-2.647z"></path>
</svg>
</div>
</button> </button>
</div> </div>
</div>
<!-- 结果展示 --> <!-- 结果展示 -->
<div id="results" class="mt-8 bg-white p-6 rounded-lg shadow-md"> <div id="results" class="mt-8">
<h2 class="text-xl font-semibold mb-4">分析结果</h2> <h2 class="text-2xl font-bold mb-6 text-gray-800">分析结果</h2>
<div id="resultContent" class="prose"></div> <div id="resultContent" class="space-y-8"></div>
</div> </div>
</div>
<script> <script>
async function analyzeSingleStock() { let isAnalyzing = false;
const stockCode = document.getElementById('singleStock').value.trim();
if (!stockCode) { async function analyzeStocks() {
alert('请输入股票代码'); if (isAnalyzing) return; // 防止重复点击
const stockInput = document.getElementById('batchStocks').value.trim();
const marketType = document.getElementById('marketType').value;
const analyzeBtn = document.getElementById('analyzeBtn');
const loadingSpinner = document.getElementById('loadingSpinner');
if (!stockInput) {
alert('请输入代码');
return; return;
} }
const stockCodes = stockInput.split(/[\n,]/)
.map(code => code.trim())
.filter(code => code.length > 0);
try { try {
const response = await fetch('/api/analyze', { isAnalyzing = true;
analyzeBtn.disabled = true;
loadingSpinner.classList.remove('hidden');
analyzeBtn.querySelector('span').textContent = '分析中...';
const response = await fetch('/analyze', {
method: 'POST', method: 'POST',
headers: { headers: {
'Content-Type': 'application/json', 'Content-Type': 'application/json',
}, },
body: JSON.stringify({ stock_code: stockCode }) body: JSON.stringify({
stock_codes: stockCodes,
market_type: marketType // 添加市场类型参数
})
}); });
const result = await response.json(); const data = await response.json();
if (response.ok) {
displayResults([result]); if (!response.ok) {
} else { throw new Error(data.error || '分析失败');
alert(result.error || '分析失败');
}
} catch (error) {
alert('请求失败: ' + error.message);
}
}
async function analyzeBatchStocks() {
const stocksText = document.getElementById('batchStocks').value.trim();
if (!stocksText) {
alert('请输入股票代码');
return;
}
const stockList = stocksText.split('\n').map(s => s.trim()).filter(s => s);
try {
const response = await fetch('/api/batch-analyze', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
},
body: JSON.stringify({ stock_list: stockList })
});
const results = await response.json();
if (response.ok) {
displayResults(results);
} else {
alert(results.error || '分析失败');
} }
const results = Array.isArray(data.results) ? data.results : [data];
displayResults(results);
} catch (error) { } catch (error) {
alert('请求失败: ' + error.message); alert('请求失败: ' + error.message);
document.getElementById('resultContent').innerHTML = `
<div class="p-4 bg-red-50 text-red-600 rounded">
分析出错:${error.message}
</div>
`;
} finally {
isAnalyzing = false;
analyzeBtn.disabled = false;
loadingSpinner.classList.add('hidden');
analyzeBtn.querySelector('span').textContent = '开始分析';
} }
} }
function displayResults(results) { function displayResults(results) {
const resultContent = document.getElementById('resultContent'); const resultContent = document.getElementById('resultContent');
let html = ''; if (!results || results.length === 0) {
resultContent.innerHTML = '<div class="p-6 bg-yellow-50 text-yellow-600 rounded-lg text-center">没有分析结果</div>';
return;
}
let html = '';
results.forEach(result => { results.forEach(result => {
html += ` html += `
<div class="mb-8 p-4 border rounded"> <div class="bg-white rounded-lg shadow-lg overflow-hidden">
<h3 class="text-lg font-semibold mb-2">股票代码: ${result.stock_code}</h3> <!-- 头部信息 -->
<div class="grid grid-cols-2 gap-4"> <div class="bg-gradient-to-r from-blue-600 to-blue-700 px-6 py-4">
<div> <h3 class="text-xl font-bold text-white">
<p><strong>分析日期:</strong> ${result.analysis_date}</p> ${result.stock_code}
<p><strong>当前价格:</strong> ¥${result.price.toFixed(2)}</p> </h3>
<p><strong>价格变动:</strong> ${result.price_change.toFixed(2)}%</p>
</div>
<div>
<p><strong>综合评分:</strong> ${result.score}分</p>
<p><strong>投资建议:</strong> ${result.recommendation}</p>
<p><strong>RSI指标:</strong> ${result.rsi.toFixed(2)}</p>
</div>
</div> </div>
<div class="mt-4">
<h4 class="font-semibold mb-2">AI分析:</h4> <!-- 主要指标 -->
<p class="whitespace-pre-line">${result.ai_analysis}</p> <div class="p-6">
<div class="grid grid-cols-2 gap-6 mb-6">
<div class="space-y-3">
<div class="flex justify-between items-center p-3 bg-gray-50 rounded">
<span class="text-gray-600">分析时间</span>
<span class="font-medium">${result.analysis_date}</span>
</div>
<div class="flex justify-between items-center p-3 bg-gray-50 rounded">
<span class="text-gray-600">当前价格</span>
<span class="font-medium">¥${result.price.toFixed(2)}</span>
</div>
<div class="flex justify-between items-center p-3 bg-gray-50 rounded">
<span class="text-gray-600">价格变动</span>
<span class="font-medium ${result.price_change >= 0 ? 'text-red-500' : 'text-green-500'}">
${result.price_change.toFixed(2)}%
</span>
</div>
</div>
<div class="space-y-3">
<div class="flex justify-between items-center p-3 bg-gray-50 rounded">
<span class="text-gray-600">综合评分</span>
<span class="font-medium text-blue-600">${result.score}分</span>
</div>
<div class="flex justify-between items-center p-3 bg-gray-50 rounded">
<span class="text-gray-600">投资建议</span>
<span class="font-medium text-purple-600">${result.recommendation}</span>
</div>
<div class="flex justify-between items-center p-3 bg-gray-50 rounded">
<span class="text-gray-600">RSI指标</span>
<span class="font-medium">${result.rsi.toFixed(2)}</span>
</div>
</div>
</div>
<!-- AI分析部分 -->
<div class="mt-6">
<h4 class="text-lg font-semibold text-gray-800 mb-3">AI分析</h4>
<div class="prose prose-blue max-w-none bg-gray-50 p-4 rounded-lg">
${marked.parse(result.ai_analysis)}
</div>
</div>
<!-- 免责声明 -->
<div class="mt-6 border-t border-gray-100 pt-4">
<div class="bg-blue-50 p-4 rounded-lg">
<p class="text-sm text-blue-800 font-semibold mb-1">声明:</p>
<p class="text-sm text-blue-600">本分析仅基于技术指标和历史数据,不构成投资建议。股市有风险,投资需谨慎。</p>
</div>
</div>
</div> </div>
</div> </div>
`; `;
}); });
resultContent.innerHTML = html; resultContent.innerHTML = html;
// 添加 Markdown 样式
const style = document.createElement('style');
style.textContent = `
.prose h1 { font-size: 1.5em; margin-top: 1em; margin-bottom: 0.5em; font-weight: bold; }
.prose h2 { font-size: 1.3em; margin-top: 1em; margin-bottom: 0.5em; font-weight: bold; }
.prose h3 { font-size: 1.1em; margin-top: 1em; margin-bottom: 0.5em; font-weight: bold; }
.prose p { margin-bottom: 1em; line-height: 1.6; }
.prose ul { list-style-type: disc; padding-left: 1.5em; margin-bottom: 1em; }
.prose ol { list-style-type: decimal; padding-left: 1.5em; margin-bottom: 1em; }
.prose li { margin-bottom: 0.5em; }
.prose strong { font-weight: 600; color: #1a56db; }
.prose em { font-style: italic; }
.prose blockquote { border-left: 4px solid #e5e7eb; padding-left: 1em; margin: 1em 0; color: #4b5563; }
.prose code { background-color: #f3f4f6; padding: 0.2em 0.4em; border-radius: 0.25em; font-size: 0.9em; }
`;
document.head.appendChild(style);
} }
</script> </script>
<!-- 添加 marked.js 用于解析 Markdown -->
<script src="https://cdn.jsdelivr.net/npm/marked/marked.min.js"></script>
</body> </body>
</html> </html>

View File

@@ -1,39 +0,0 @@
from flask import Flask, request, jsonify, render_template
from stock_analyzer import StockAnalyzer
import os
app = Flask(__name__)
analyzer = StockAnalyzer()
@app.route('/')
def index():
return render_template('index.html')
@app.route('/api/analyze', methods=['POST'])
def analyze():
try:
data = request.json
stock_code = data.get('stock_code')
if not stock_code:
return jsonify({'error': '请提供股票代码'}), 400
result = analyzer.analyze_stock(stock_code)
return jsonify(result)
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/batch-analyze', methods=['POST'])
def batch_analyze():
try:
data = request.json
stock_list = data.get('stock_list', [])
if not stock_list:
return jsonify({'error': '请提供股票代码列表'}), 400
results = analyzer.scan_market(stock_list)
return jsonify(results)
except Exception as e:
return jsonify({'error': str(e)}), 500
if __name__ == '__main__':
app.run(host='0.0.0.0', port=8443)

62
web_server.py Normal file
View File

@@ -0,0 +1,62 @@
from flask import Flask, render_template, request, jsonify
from stock_analyzer import StockAnalyzer
from us_stock_service import USStockService
import threading
import logging
from logging.handlers import RotatingFileHandler
import traceback
app = Flask(__name__)
analyzer = StockAnalyzer()
us_stock_service = USStockService()
# 配置日志
logging.basicConfig(level=logging.INFO)
handler = RotatingFileHandler('flask_app.log', maxBytes=10000000, backupCount=5)
handler.setFormatter(logging.Formatter(
'[%(asctime)s] %(levelname)s in %(module)s: %(message)s'
))
app.logger.addHandler(handler)
@app.route('/')
def index():
return render_template('index.html')
@app.route('/analyze', methods=['POST'])
def analyze():
try:
data = request.json
stock_codes = data.get('stock_codes', [])
market_type = data.get('market_type', 'A')
if not stock_codes:
return jsonify({'error': '请输入代码'}), 400
results = []
for stock_code in stock_codes:
result = analyzer.analyze_stock(stock_code.strip(), market_type)
results.append(result)
return jsonify({'results': results})
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/search_us_stocks', methods=['GET'])
def search_us_stocks():
try:
keyword = request.args.get('keyword', '')
if not keyword:
return jsonify({'error': '请输入搜索关键词'}), 400
results = us_stock_service.search_us_stocks(keyword)
return jsonify({'results': results})
except Exception as e:
app.logger.error(f"搜索美股代码时出错: {str(e)}")
return jsonify({'error': str(e)}), 500
if __name__ == '__main__':
# 将 host 设置为 '0.0.0.0' 使其支持所有网络接口访问
app.run(host='0.0.0.0', port=8888, debug=True)

View File

@@ -1,602 +0,0 @@
""""
Stock Analysis System
优化后的全盘股票技术分析系统——用于A股市场股票的全面分析已加速分析并增加额外指标。
"""
import os
import time
import random
import logging
import traceback
from datetime import datetime, timedelta
from concurrent.futures import ThreadPoolExecutor
from typing import Dict, List, Optional, Tuple, Union
from dataclasses import dataclass
import numpy as np
import pandas as pd
import akshare as ak
from tqdm import tqdm
# -------------------------------
# **技术指标配置**
# -------------------------------
@dataclass
class TechnicalParams:
"""技术指标参数配置"""
ma_periods: Dict[str, int]
rsi_period: int
bollinger_period: int
bollinger_std: int
volume_ma_period: int
atr_period: int
@classmethod
def default(cls) -> 'TechnicalParams':
"""返回默认的技术指标参数"""
return cls(
ma_periods={'short': 5, 'medium': 20, 'long': 60},
rsi_period=14,
bollinger_period=20,
bollinger_std=2,
volume_ma_period=20,
atr_period=14
)
# -------------------------------
# **股票分析引擎**
# -------------------------------
class StockAnalyzer:
"""股票分析引擎,计算各类技术指标"""
def __init__(self, params: Optional[TechnicalParams] = None):
"""
初始化股票分析引擎
Args:
params: 技术指标配置参数
"""
self._setup_logging()
self.params = params or TechnicalParams.default()
def _setup_logging(self) -> None:
"""配置日志记录"""
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
self.logger = logging.getLogger(__name__)
def get_stock_data(self, stock_code: str,
start_date: Optional[str] = None,
end_date: Optional[str] = None) -> pd.DataFrame:
"""
获取单只股票历史数据,默认使用前一年的数据。
Args:
stock_code: 股票代码(可以带市场前缀或纯代码)
start_date: 开始日期(格式YYYYMMDD)
end_date: 结束日期(格式YYYYMMDD)
Returns:
包含日期、开盘、收盘、最高、最低、成交量的 DataFrame
"""
try:
if not start_date:
start_date = (datetime.now() - timedelta(days=365)).strftime('%Y%m%d')
if not end_date:
end_date = datetime.now().strftime('%Y%m%d')
code = stock_code[2:] if stock_code.startswith(('sz', 'sh')) else stock_code
df = ak.stock_zh_a_hist(
symbol=code,
start_date=start_date,
end_date=end_date,
adjust="qfq"
)
self.logger.info(f"获取到 {len(df)} 行数据,列名:{df.columns.tolist()}")
df = df.rename(columns={
"日期": "date",
"开盘": "open",
"收盘": "close",
"最高": "high",
"最低": "low",
"成交量": "volume",
"trade_date": "date"
})
required_columns = {'date', 'open', 'close', 'high', 'low', 'volume'}
missing_columns = required_columns - set(df.columns)
if missing_columns:
raise ValueError(f"缺失必须字段: {missing_columns}")
df['date'] = pd.to_datetime(df['date'], errors='coerce')
numeric_columns = ['open', 'close', 'high', 'low', 'volume']
df[numeric_columns] = df[numeric_columns].apply(pd.to_numeric, errors='coerce')
df_cleaned = df.dropna(subset=['date'] + numeric_columns).sort_values('date')
if len(df_cleaned) < 60:
raise ValueError(f"数据不足(仅 {len(df_cleaned)}无法计算至少60日均线")
return df_cleaned
except Exception as e:
self.logger.error(f"获取股票数据失败,股票代码 {stock_code},错误信息:{str(e)}")
raise ValueError(f"股票 {stock_code} 数据获取出错: {str(e)}")
@staticmethod
def calculate_ema(series: pd.Series, period: int) -> pd.Series:
"""计算 EMA"""
return series.ewm(span=period, adjust=False).mean()
@staticmethod
def calculate_rsi(series: pd.Series, period: int) -> pd.Series:
"""
计算 RSI指数加权移动平均法
"""
delta = series.diff()
gain = delta.clip(lower=0)
loss = -delta.clip(upper=0)
avg_gain = gain.ewm(com=period - 1, adjust=False).mean()
avg_loss = loss.ewm(com=period - 1, adjust=False).mean()
rs = avg_gain / (avg_loss + 1e-10)
return 100 - (100 / (1 + rs))
@staticmethod
def calculate_macd(series: pd.Series) -> Tuple[pd.Series, pd.Series, pd.Series]:
"""计算 MACD、信号线和直方图"""
exp1 = series.ewm(span=12, adjust=False).mean()
exp2 = series.ewm(span=26, adjust=False).mean()
macd = exp1 - exp2
signal = macd.ewm(span=9, adjust=False).mean()
return macd, signal, macd - signal
@staticmethod
def calculate_bollinger_bands(series: pd.Series, period: int, std_dev: int) -> Tuple[pd.Series, pd.Series, pd.Series]:
"""计算 Bollinger 通道"""
middle = series.rolling(window=period, min_periods=period).mean()
std = series.rolling(window=period, min_periods=period).std()
upper = middle + std * std_dev
lower = middle - std * std_dev
return upper, middle, lower
def calculate_atr(self, df: pd.DataFrame, period: int) -> pd.Series:
"""计算 ATR"""
high = df['high']
low = df['low']
prev_close = df['close'].shift(1)
tr = pd.concat([
high - low,
(high - prev_close).abs(),
(low - prev_close).abs()
], axis=1).max(axis=1)
return tr.rolling(window=period, min_periods=period).mean()
@staticmethod
def calculate_obv(series_close: pd.Series, series_volume: pd.Series) -> pd.Series:
"""计算 OBV能量潮指标"""
diff = series_close.diff().fillna(0)
obv = np.where(diff > 0, series_volume, np.where(diff < 0, -series_volume, 0))
return pd.Series(obv, index=series_close.index).cumsum()
@staticmethod
def calculate_stochastic(series_close: pd.Series, window: int = 14) -> Tuple[pd.Series, pd.Series]:
"""
计算随机指标Stochastic Oscillator
%K = (close - lowest_low) / (highest_high - lowest_low)*100
%D 为 %K 的3日简单移动平均
"""
lowest = series_close.rolling(window=window, min_periods=window).min()
highest = series_close.rolling(window=window, min_periods=window).max()
percentK = (series_close - lowest) / (highest - lowest + 1e-10) * 100
percentD = percentK.rolling(window=3, min_periods=3).mean()
return percentK, percentD
def calculate_indicators(self, df: pd.DataFrame) -> pd.DataFrame:
"""计算所有技术指标,并增加 OBV 和随机指标"""
try:
# 移动均线
for key, period in self.params.ma_periods.items():
df[f'MA{period}'] = self.calculate_ema(df['close'], period)
df['RSI'] = self.calculate_rsi(df['close'], self.params.rsi_period)
df['MACD'], df['Signal'], df['MACD_hist'] = self.calculate_macd(df['close'])
df['BB_upper'], df['BB_middle'], df['BB_lower'] = self.calculate_bollinger_bands(
df['close'], self.params.bollinger_period, self.params.bollinger_std)
df['Volume_MA'] = df['volume'].rolling(window=self.params.volume_ma_period,
min_periods=self.params.volume_ma_period).mean()
df['Volume_Ratio'] = df['volume'] / (df['Volume_MA'] + 1e-10)
df['ATR'] = self.calculate_atr(df, self.params.atr_period)
df['Volatility'] = df['ATR'] / df['close'] * 100
df['ROC'] = df['close'].pct_change(periods=10) * 100
# 增加 OBV 指标
df['OBV'] = self.calculate_obv(df['close'], df['volume'])
df['OBV_MA10'] = df['OBV'].rolling(window=10, min_periods=10).mean()
# 增加随机指标 Stochastic
df['%K'], df['%D'] = self.calculate_stochastic(df['close'], window=14)
return df
except Exception as e:
self.logger.error(f"指标计算出错:{str(e)}")
raise
def calculate_score(self, df: pd.DataFrame) -> float:
"""
计算股票综合打分基本打分满分100分并根据 OBV 与随机指标调整±5分
使量化结果更全面。
基本打分逻辑不变:
趋势30分、RSI20分、MACD20分、成交量30分
附加指标调整:
OBVOBV > OBV_MA10+5分反之-5分
随机指标:%K < 20为超卖+5分%K > 80为超买-5分。
"""
try:
score = 0
latest = df.iloc[-1]
# 趋势打分
if latest['MA5'] > latest['MA20'] and latest['MA20'] > latest['MA60']:
score += 30
else:
if latest['MA5'] > latest['MA20']:
score += 15
if latest['MA20'] > latest['MA60']:
score += 15
# RSI 打分
if 30 <= latest['RSI'] <= 70:
score += 20
elif latest['RSI'] < 30:
score += 15
# MACD 打分
if latest['MACD'] > latest['Signal']:
score += 20
# 成交量打分
if latest['Volume_Ratio'] > 1.5:
score += 30
elif latest['Volume_Ratio'] > 1:
score += 15
# 附加 OBV 调整
if latest['OBV'] > latest['OBV_MA10']:
score += 5
else:
score -= 5
# 附加随机指标调整
if latest['%K'] < 20:
score += 5
elif latest['%K'] > 80:
score -= 5
return score
except Exception as e:
self.logger.error(f"计算打分失败:{str(e)}")
raise
@staticmethod
def get_recommendation(score: float) -> str:
"""根据最终打分给出投资建议"""
if score >= 80:
return '强烈推荐买入'
elif score >= 60:
return '建议买入'
elif score >= 40:
return '建议观望'
elif score >= 20:
return '建议卖出'
else:
return '强烈建议卖出'
def analyze_stock(self, stock_code: str) -> Dict:
"""针对单只股票执行完整的技术分析流程"""
try:
df = self.get_stock_data(stock_code)
df = self.calculate_indicators(df)
score = self.calculate_score(df)
latest = df.iloc[-1]
prev = df.iloc[-2]
return {
'stock_code': stock_code,
'analysis_date': datetime.now().strftime('%Y-%m-%d'),
'score': score,
'price': latest['close'],
'price_change': (latest['close'] - prev['close']) / prev['close'] * 100,
'ma_trend': 'UP' if latest['MA5'] > latest['MA20'] else 'DOWN',
'rsi': latest['RSI'],
'macd_signal': 'BUY' if latest['MACD'] > latest['Signal'] else 'SELL',
'volume_status': 'HIGH' if latest['Volume_Ratio'] > 1.5 else 'NORMAL',
'recommendation': self.get_recommendation(score)
}
except Exception as e:
self.logger.error(f"分析股票 {stock_code} 失败:{str(e)}")
raise
# -------------------------------
# **全盘股票扫描器**
# -------------------------------
class TopStockScanner:
"""全盘筛选高打分股票的扫描器"""
def __init__(self, max_workers: int = 20, min_score: float = 85):
"""
初始化扫描器
Args:
max_workers: 并发线程数量已增至20以加速分析
min_score: 高分最低阈值
"""
self.analyzer = StockAnalyzer()
self.max_workers = max_workers
self.min_score = min_score
self.logger = logging.getLogger(__name__)
def get_all_stocks(self) -> List[str]:
"""
获取所有上市 A 股股票代码(全盘版)。
使用 ak.stock_info_sh_name_code(symbol="主板A股") 与 ak.stock_info_sz_name_code(symbol="A股列表")
候选字段列表为:['A股代码', '证券代码', '股票代码', 'code'] 。
"""
try:
sh_df = ak.stock_info_sh_name_code(symbol="主板A股")
sz_df = ak.stock_info_sz_name_code(symbol="A股列表")
candidate_cols = ['A股代码', '证券代码', '股票代码', 'code']
def get_codes(df: pd.DataFrame) -> set:
for col in candidate_cols:
if col in df.columns:
return {str(code).zfill(6) for code in df[col]}
raise KeyError(f"未能找到股票代码字段,现有字段:{df.columns.tolist()}")
sh_codes = get_codes(sh_df)
sz_codes = get_codes(sz_df)
all_codes = sorted(sh_codes | sz_codes)
self.logger.info(f"完整股票列表获取到 {len(all_codes)} 支股票信息")
print(f"\n开始分析 {len(all_codes)} 支股票...")
return all_codes
except Exception as e:
self.logger.error(f"获取股票列表失败:{str(e)}")
raise
def analyze_stock_safe(self, stock_code: str, max_retries: int = 3) -> Optional[Dict]:
"""
安全分析单只股票(加入重试机制),数据异常则跳过。
"""
for attempt in range(max_retries):
try:
return self.analyzer.analyze_stock(stock_code)
except ValueError as e:
self.logger.warning(f"跳过股票 {stock_code}: {str(e)}")
return None
except Exception as e:
if attempt == max_retries - 1:
self.logger.error(f"股票 {stock_code} 分析尝试 {max_retries} 次后失败:{str(e)}")
return None
self.logger.warning(f"股票 {stock_code}{attempt+1} 次分析失败:{str(e)}")
time.sleep(random.uniform(2, 5))
def process_batch(self, stock_codes: List[str]) -> List[Dict]:
"""利用多线程并行处理一批股票的分析任务"""
results = []
with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
futures = {executor.submit(self.analyze_stock_safe, code): code for code in stock_codes}
for future in tqdm(futures, desc="分析进度", ncols=80):
try:
result = future.result()
if result is not None:
results.append(result)
except Exception as e:
stock = futures[future]
self.logger.error(f"处理股票 {stock} 时出错:{str(e)}")
return results
def save_intermediate_results(self, results: List[Dict]) -> None:
"""周期性保存中间结果,便于后续查看进度"""
try:
df = pd.DataFrame(results)
high_score_stocks = df[df['score'] >= self.min_score].sort_values('score', ascending=False)
output_lines = [
"=" * 80,
f"股票扫描中间结果 - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
f"共分析 {len(results)} 支股票",
"=" * 80,
f"\n发现 {len(high_score_stocks)} 支高分股票(得分≥{self.min_score}"
]
for _, row in high_score_stocks.iterrows():
output_lines.extend([
f"\n股票代码: {row['stock_code']}",
f"得分: {row['score']:.1f} | 价格: ¥{row['price']:.2f} | 涨跌幅: {row['price_change']:.2f}%"
])
os.makedirs('scanner', exist_ok=True)
with open('scanner/temp_results.txt', 'w', encoding='utf-8') as f:
f.write('\n'.join(output_lines))
except Exception as e:
self.logger.error(f"保存中间结果失败:{str(e)}")
def get_high_score_stocks(self, batch_size: int = 20) -> List[Dict]:
"""扫描全盘股票,返回高打分结果列表"""
try:
all_stocks = self.get_all_stocks()
total_stocks = len(all_stocks)
print(f"\n开始扫描 {total_stocks} 支股票……")
results = []
total_batches = (total_stocks + batch_size - 1) // batch_size
for i in range(0, total_stocks, batch_size):
batch_number = i // batch_size + 1
print(f"\r当前进度: 批次 {batch_number}/{total_batches}", end="")
batch = all_stocks[i:i + batch_size]
batch_results = self.process_batch(batch)
results.extend(batch_results)
if i + batch_size < total_stocks:
time.sleep(random.uniform(3, 5))
if results and ((len(results) % 100 == 0) or (i + batch_size >= total_stocks)):
self.save_intermediate_results(results)
print("\n扫描结束!")
if results:
df_results = pd.DataFrame(results)
high_score_stocks = df_results[df_results['score'] >= self.min_score].sort_values('score', ascending=False)
formatted_results = []
for _, row in high_score_stocks.iterrows():
formatted_results.append({
'股票代码': row['stock_code'],
'评分': f"{row['score']:.1f}",
'当前价格': f"¥{row['price']:.2f}",
'涨跌幅': f"{row['price_change']:.2f}%",
'RSI指标': f"{row['rsi']:.2f}",
'均线趋势': '上升' if row['ma_trend'] == 'UP' else '下降',
'MACD信号': '买入' if row['macd_signal'] == 'BUY' else '卖出',
'成交量状态': '放量' if row['volume_status'] == 'HIGH' else '正常',
'投资建议': row['recommendation']
})
return formatted_results
return []
except Exception as e:
self.logger.error(f"全盘扫描失败:{str(e)}")
raise
# -------------------------------
# **结果分组与报告生成**
# -------------------------------
def format_price_category(price: float) -> str:
"""将价格划分为区间(例如 32.5 -> '30-40'"""
base = (price // 10) * 10
return f"{int(base)}-{int(base+10)}"
def save_results_by_price(results: List[Dict]) -> None:
"""按价格区间保存分析结果至文件"""
try:
os.makedirs('scanner', exist_ok=True)
price_groups = {}
for stock in results:
price = float(stock['当前价格'].replace('¥', ''))
category = format_price_category(price)
price_groups.setdefault(category, []).append(stock)
for category, stocks in price_groups.items():
output_lines = [
"=" * 80,
f"股票分析结果 - 价格区间: {category}",
f"分析时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
"=" * 80,
f"\n该区间共发现 {len(stocks)} 支高分股票得分≥85",
"-" * 80
]
stocks.sort(key=lambda x: float(x['评分']), reverse=True)
for i, stock in enumerate(stocks, 1):
output_lines.extend([
f"\n{i}. 股票代码: {stock['股票代码']}",
f" 评分: {stock['评分']} | 价格: {stock['当前价格']} | 涨跌幅: {stock['涨跌幅']}",
f" RSI指标: {stock['RSI指标']} | 均线趋势: {stock['均线趋势']} | MACD信号: {stock['MACD信号']}",
f" 成交量状态: {stock['成交量状态']}",
f" 投资建议: {stock['投资建议']}",
"-" * 80
])
output_lines.extend([
f"\n价格区间 {category}元 分析汇总:",
f"1. 股票数量: {len(stocks)}",
f"2. 平均评分: {np.mean([float(stock['评分']) for stock in stocks]):.1f}",
f"3. 买入信号股票数: {sum(1 for stock in stocks if stock['MACD信号'] == '买入')}",
f"4. 放量股票数: {sum(1 for stock in stocks if stock['成交量状态'] == '放量')}"
])
filename = f'scanner/price_{category.replace("-", "_")}.txt'
with open(filename, 'w', encoding='utf-8') as f:
f.write('\n'.join(output_lines))
create_summary_file(price_groups)
except Exception as e:
logging.error(f"保存结果时发生错误: {str(e)}")
raise
def create_summary_file(price_groups: Dict[str, List[Dict]]) -> None:
"""生成综合汇总报告"""
try:
output_lines = [
"=" * 80,
"A股市场优质股票筛选报告",
f"分析时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
"=" * 80
]
total_stocks = sum(len(stocks) for stocks in price_groups.values())
all_scores = [float(stock['评分']) for stocks in price_groups.values() for stock in stocks]
output_lines.extend([
"\n整体统计:",
f"1. 共筛选出 {total_stocks} 支高分股票得分≥85",
f"2. 平均评分: {np.mean(all_scores):.1f}",
f"3. 最高评分: {max(all_scores):.1f}",
"\n各价格区间分布:",
"-" * 80
])
for category, stocks in sorted(price_groups.items(), key=lambda x: float(x[0].split('-')[0])):
output_lines.extend([
f"\n价格区间 {category}元:",
f" - 股票数量: {len(stocks)}",
f" - 平均评分: {np.mean([float(stock['评分']) for stock in stocks]):.1f}"
])
with open('scanner/summary.txt', 'w', encoding='utf-8') as f:
f.write('\n'.join(output_lines))
except Exception as e:
logging.error(f"生成汇总报告失败:{str(e)}")
raise
# -------------------------------
# **主程序入口**
# -------------------------------
def main():
"""程序主入口"""
print("\n" + "=" * 80)
print("Market-Wide High-Score Stock Scanner".center(76))
print("=" * 80)
scanner = TopStockScanner(max_workers=20) # 已提升至20线程
try:
print("\n开始全盘扫描股票……")
high_score_stocks = scanner.get_high_score_stocks(batch_size=20)
if not high_score_stocks:
print("\n未找到得分大于等于85分的股票。")
return
save_results_by_price(high_score_stocks)
print(f"\n分析完成!结果已保存至 scanner 文件夹中:")
print("1. 按价格区间保存的详细分析文件price_XX_YY.txt")
print("2. 汇总报告summary.txt")
temp_file = 'scanner/temp_results.txt'
if os.path.exists(temp_file):
os.remove(temp_file)
print("\n" + "=" * 80)
input("\n按Enter键退出……")
except Exception as e:
error_msg = f"\n程序错误:{str(e)}\n"
print("=" * 80)
print(error_msg)
print("=" * 80)
os.makedirs('scanner', exist_ok=True)
with open('scanner/error_log.txt', 'w', encoding='utf-8') as f:
f.write("Stock Analysis System Error Report\n")
f.write("=" * 80 + "\n")
f.write(f"Time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
f.write(f"Error: {str(e)}\n")
f.write("=" * 80 + "\n")
f.write(f"详细堆栈信息:\n{traceback.format_exc()}")
print("错误日志已保存至 scanner/error_log.txt")
input("\n按Enter键退出……")
if __name__ == "__main__":
main()