Add files via upload

This commit is contained in:
DR-lin-eng
2025-02-27 22:50:22 +08:00
committed by GitHub
commit 6a41109204
4 changed files with 1630 additions and 0 deletions

218
gui.py Normal file
View File

@@ -0,0 +1,218 @@
import sys
from PyQt6.QtWidgets import (QApplication, QMainWindow, QWidget, QVBoxLayout,
QHBoxLayout, QLineEdit, QPushButton, QTextBrowser,
QLabel, QTextEdit, QMessageBox, QProgressBar)
from PyQt6.QtCore import Qt, QThread, pyqtSignal
from PyQt6.QtGui import QFont
import markdown2
from stock_analyzer import StockAnalyzer # 导入股票分析器
class AnalysisWorker(QThread):
"""后台工作线程,用于执行分析任务"""
finished = pyqtSignal(dict)
error = pyqtSignal(str)
progress = pyqtSignal(int)
def __init__(self, analyzer, stock_code):
super().__init__()
self.analyzer = analyzer
self.stock_code = stock_code
def run(self):
try:
report = self.analyzer.analyze_stock(self.stock_code)
self.finished.emit(report)
except Exception as e:
self.error.emit(str(e))
class BatchAnalysisWorker(QThread):
"""后台工作线程,用于执行批量分析任务"""
finished = pyqtSignal(list)
error = pyqtSignal(str)
progress = pyqtSignal(int)
def __init__(self, analyzer, stock_list):
super().__init__()
self.analyzer = analyzer
self.stock_list = stock_list
def run(self):
try:
results = []
total = len(self.stock_list)
for i, stock_code in enumerate(self.stock_list):
report = self.analyzer.analyze_stock(stock_code)
results.append(report)
self.progress.emit(int((i + 1) / total * 100))
self.finished.emit(results)
except Exception as e:
self.error.emit(str(e))
class StockAnalyzerGUI(QMainWindow):
def __init__(self):
super().__init__()
self.analyzer = StockAnalyzer()
self.initUI()
def initUI(self):
self.setWindowTitle('股票分析系统')
self.setGeometry(100, 100, 1200, 800)
# 创建中央部件和布局
central_widget = QWidget()
self.setCentralWidget(central_widget)
layout = QVBoxLayout(central_widget)
# 创建输入区域
input_layout = QHBoxLayout()
# 单只股票分析部分
single_stock_layout = QVBoxLayout()
single_label = QLabel('单只股票分析:')
single_label.setFont(QFont('Arial', 12, QFont.Weight.Bold))
self.single_stock_input = QLineEdit()
self.single_stock_input.setPlaceholderText('输入股票代码600000')
self.single_stock_input.setFont(QFont('Arial', 10))
self.analyze_btn = QPushButton('分析')
self.analyze_btn.setFont(QFont('Arial', 10, QFont.Weight.Bold))
self.analyze_btn.clicked.connect(self.analyze_single_stock)
single_stock_layout.addWidget(single_label)
single_stock_layout.addWidget(self.single_stock_input)
single_stock_layout.addWidget(self.analyze_btn)
# 批量分析部分
batch_stock_layout = QVBoxLayout()
batch_label = QLabel('批量股票分析:')
batch_label.setFont(QFont('Arial', 12, QFont.Weight.Bold))
self.batch_stock_input = QTextEdit()
self.batch_stock_input.setPlaceholderText('输入多个股票代码,每行一个')
self.batch_stock_input.setFont(QFont('Arial', 10))
self.batch_stock_input.setMaximumHeight(100)
self.batch_analyze_btn = QPushButton('批量分析')
self.batch_analyze_btn.setFont(QFont('Arial', 10, QFont.Weight.Bold))
self.batch_analyze_btn.clicked.connect(self.analyze_multiple_stocks)
batch_stock_layout.addWidget(batch_label)
batch_stock_layout.addWidget(self.batch_stock_input)
batch_stock_layout.addWidget(self.batch_analyze_btn)
input_layout.addLayout(single_stock_layout)
input_layout.addLayout(batch_stock_layout)
# 添加进度条
self.progress_bar = QProgressBar()
self.progress_bar.setVisible(False)
# 添加结果显示区域
self.result_browser = QTextBrowser()
self.result_browser.setOpenExternalLinks(True)
self.result_browser.setFont(QFont('Arial', 10))
layout.addLayout(input_layout)
layout.addWidget(self.progress_bar)
layout.addWidget(self.result_browser)
def format_report(self, report, is_single=True):
"""将分析报告格式化为Markdown格式"""
md = f"""# 股票分析报告 - {report['stock_code']}
## 基本信息
- 分析日期:{report['analysis_date']}
- 当前价格:{report['price']:.2f}
- 价格变动:{report['price_change']:.2f}%
## 技术指标
- 均线趋势:{report['ma_trend']}
- RSI指标{report['rsi']:.2f}
- MACD信号{report['macd_signal']}
- 成交量状态:{report['volume_status']}
## 评分与建议
- 综合评分:{report['score']}
- 投资建议:{report['recommendation']}
## AI分析
{report['ai_analysis']}
---
"""
return md
def analyze_single_stock(self):
"""分析单只股票"""
stock_code = self.single_stock_input.text().strip()
if not stock_code:
QMessageBox.warning(self, '警告', '请输入股票代码')
return
self.analyze_btn.setEnabled(False)
self.progress_bar.setVisible(True)
self.progress_bar.setValue(0)
# 创建工作线程
self.worker = AnalysisWorker(self.analyzer, stock_code)
self.worker.finished.connect(self.handle_single_analysis_result)
self.worker.error.connect(self.handle_analysis_error)
self.worker.progress.connect(self.progress_bar.setValue)
self.worker.start()
def handle_single_analysis_result(self, report):
"""处理单只股票分析结果"""
markdown_text = self.format_report(report)
html_content = markdown2.markdown(markdown_text)
self.result_browser.setHtml(html_content)
self.analyze_btn.setEnabled(True)
self.progress_bar.setVisible(False)
def analyze_multiple_stocks(self):
"""批量分析股票"""
text = self.batch_stock_input.toPlainText().strip()
if not text:
QMessageBox.warning(self, '警告', '请输入股票代码')
return
stock_list = [code.strip() for code in text.split('\n') if code.strip()]
self.batch_analyze_btn.setEnabled(False)
self.progress_bar.setVisible(True)
self.progress_bar.setValue(0)
# 创建工作线程
self.batch_worker = BatchAnalysisWorker(self.analyzer, stock_list)
self.batch_worker.finished.connect(self.handle_batch_analysis_result)
self.batch_worker.error.connect(self.handle_analysis_error)
self.batch_worker.progress.connect(self.progress_bar.setValue)
self.batch_worker.start()
def handle_batch_analysis_result(self, recommendations):
"""处理批量分析结果"""
# 生成markdown格式的报告
markdown_text = "# 批量股票分析报告\n\n"
for rec in recommendations:
markdown_text += self.format_report(rec, False)
html_content = markdown2.markdown(markdown_text)
self.result_browser.setHtml(html_content)
self.batch_analyze_btn.setEnabled(True)
self.progress_bar.setVisible(False)
def handle_analysis_error(self, error_message):
"""处理分析错误"""
QMessageBox.critical(self, '错误', f'分析过程中出现错误:{error_message}')
self.analyze_btn.setEnabled(True)
self.batch_analyze_btn.setEnabled(True)
self.progress_bar.setVisible(False)
def main():
app = QApplication(sys.argv)
# 设置应用程序样式
app.setStyle('Fusion')
# 创建并显示主窗口
window = StockAnalyzerGUI()
window.show()
sys.exit(app.exec())
if __name__ == '__main__':
main()

497
gui2.py Normal file
View File

@@ -0,0 +1,497 @@
import sys
from PyQt6.QtWidgets import (QApplication, QMainWindow, QWidget, QVBoxLayout,
QHBoxLayout, QLineEdit, QPushButton, QTextBrowser,
QLabel, QTextEdit, QMessageBox, QProgressBar,
QFrame, QSizePolicy)
from PyQt6.QtCore import Qt, QThread, pyqtSignal
from PyQt6.QtGui import QFont, QPalette, QColor
import markdown2
from stock_analyzer import StockAnalyzer
class ModernFrame(QFrame):
"""现代化的面板组件"""
def __init__(self, parent=None):
super().__init__(parent)
self.setFrameStyle(QFrame.Shape.StyledPanel | QFrame.Shadow.Raised)
self.setStyleSheet("""
ModernFrame {
background-color: #ffffff;
border-radius: 10px;
border: 1px solid #e0e0e0;
}
""")
class ModernButton(QPushButton):
"""现代化的按钮组件"""
def __init__(self, text, parent=None, primary=True):
super().__init__(text, parent)
self.setMinimumHeight(40)
if primary:
self.setStyleSheet("""
QPushButton {
background-color: #1a73e8;
color: white;
border: none;
border-radius: 5px;
padding: 8px 16px;
font-weight: bold;
}
QPushButton:hover {
background-color: #1557b0;
}
QPushButton:pressed {
background-color: #0d47a1;
}
QPushButton:disabled {
background-color: #cccccc;
}
""")
else:
self.setStyleSheet("""
QPushButton {
background-color: #f8f9fa;
color: #1a73e8;
border: 1px solid #dadce0;
border-radius: 5px;
padding: 8px 16px;
font-weight: bold;
}
QPushButton:hover {
background-color: #f1f3f4;
border-color: #d2e3fc;
}
QPushButton:pressed {
background-color: #e8eaed;
}
QPushButton:disabled {
color: #5f6368;
border-color: #e0e0e0;
}
""")
class ModernLineEdit(QLineEdit):
"""现代化的输入框组件"""
def __init__(self, parent=None):
super().__init__(parent)
self.setMinimumHeight(40)
self.setStyleSheet("""
QLineEdit {
border: 2px solid #e0e0e0;
border-radius: 5px;
padding: 8px 12px;
background-color: white;
selection-background-color: #cce0ff;
}
QLineEdit:focus {
border-color: #1a73e8;
}
QLineEdit:hover {
border-color: #999999;
}
""")
class ModernTextEdit(QTextEdit):
"""现代化的多行文本输入框组件"""
def __init__(self, parent=None):
super().__init__(parent)
self.setStyleSheet("""
QTextEdit {
border: 2px solid #e0e0e0;
border-radius: 5px;
padding: 8px;
background-color: white;
selection-background-color: #cce0ff;
}
QTextEdit:focus {
border-color: #1a73e8;
}
QTextEdit:hover {
border-color: #999999;
}
""")
class ModernProgressBar(QProgressBar):
"""现代化的进度条组件"""
def __init__(self, parent=None):
super().__init__(parent)
self.setStyleSheet("""
QProgressBar {
border: none;
border-radius: 3px;
background-color: #f0f0f0;
height: 6px;
text-align: center;
}
QProgressBar::chunk {
background-color: #1a73e8;
border-radius: 3px;
}
""")
self.setTextVisible(False)
class AnalysisWorker(QThread):
"""后台工作线程,用于执行分析任务"""
finished = pyqtSignal(dict)
error = pyqtSignal(str)
progress = pyqtSignal(int)
def __init__(self, analyzer, stock_code):
super().__init__()
self.analyzer = analyzer
self.stock_code = stock_code
def run(self):
try:
report = self.analyzer.analyze_stock(self.stock_code)
self.finished.emit(report)
except Exception as e:
self.error.emit(str(e))
class BatchAnalysisWorker(QThread):
"""后台工作线程,用于执行批量分析任务"""
finished = pyqtSignal(list)
error = pyqtSignal(str)
progress = pyqtSignal(int)
def __init__(self, analyzer, stock_list):
super().__init__()
self.analyzer = analyzer
self.stock_list = stock_list
def run(self):
try:
results = []
total = len(self.stock_list)
for i, stock_code in enumerate(self.stock_list):
report = self.analyzer.analyze_stock(stock_code)
results.append(report)
self.progress.emit(int((i + 1) / total * 100))
self.finished.emit(results)
except Exception as e:
self.error.emit(str(e))
class ModernStockAnalyzerGUI(QMainWindow):
def __init__(self):
super().__init__()
self.analyzer = StockAnalyzer()
self.init_ui()
self.adjust_size_and_position()
def adjust_size_and_position(self):
"""调整窗口大小和位置以适应不同分辨率"""
screen = QApplication.primaryScreen()
if screen:
geometry = screen.availableGeometry()
# 设置窗口大小为屏幕的75%
width = int(geometry.width() * 0.75)
height = int(geometry.height() * 0.75)
self.resize(width, height)
# 居中显示
center = geometry.center()
frame = self.frameGeometry()
frame.moveCenter(center)
self.move(frame.topLeft())
def init_ui(self):
self.setWindowTitle('现代股票分析系统')
self.setStyleSheet("""
QMainWindow {
background-color: #f8f9fa;
}
""")
# 创建中央部件和主布局
central_widget = QWidget()
self.setCentralWidget(central_widget)
main_layout = QVBoxLayout(central_widget)
main_layout.setSpacing(20)
main_layout.setContentsMargins(20, 20, 20, 20)
# 创建标题
title_label = QLabel('股票分析系统')
title_label.setStyleSheet("""
QLabel {
color: #202124;
font-size: 24px;
font-weight: bold;
margin-bottom: 20px;
}
""")
main_layout.addWidget(title_label, alignment=Qt.AlignmentFlag.AlignTop)
# 创建输入区域容器
input_container = ModernFrame()
input_layout = QHBoxLayout(input_container)
input_layout.setSpacing(20)
input_layout.setContentsMargins(20, 20, 20, 20)
# 单只股票分析部分
single_stock_frame = self.create_single_stock_section()
input_layout.addWidget(single_stock_frame)
# 分隔线
separator = QFrame()
separator.setFrameShape(QFrame.Shape.VLine)
separator.setStyleSheet("background-color: #e0e0e0;")
input_layout.addWidget(separator)
# 批量分析部分
batch_stock_frame = self.create_batch_stock_section()
input_layout.addWidget(batch_stock_frame)
main_layout.addWidget(input_container)
# 进度条
self.progress_bar = ModernProgressBar()
self.progress_bar.setVisible(False)
main_layout.addWidget(self.progress_bar)
# 结果显示区域
result_frame = ModernFrame()
result_layout = QVBoxLayout(result_frame)
result_layout.setContentsMargins(15, 15, 15, 15)
result_label = QLabel('分析结果')
result_label.setStyleSheet("""
QLabel {
color: #202124;
font-size: 18px;
font-weight: bold;
}
""")
result_layout.addWidget(result_label)
self.result_browser = QTextBrowser()
self.result_browser.setOpenExternalLinks(True)
self.result_browser.setStyleSheet("""
QTextBrowser {
border: none;
background-color: white;
font-size: 14px;
line-height: 1.5;
}
""")
result_layout.addWidget(self.result_browser)
main_layout.addWidget(result_frame)
def create_single_stock_section(self):
"""创建单只股票分析部分"""
frame = QFrame()
layout = QVBoxLayout(frame)
layout.setSpacing(15)
label = QLabel('单只股票分析')
label.setStyleSheet("""
QLabel {
color: #202124;
font-size: 16px;
font-weight: bold;
}
""")
layout.addWidget(label)
self.single_stock_input = ModernLineEdit()
self.single_stock_input.setPlaceholderText('输入股票代码600000')
layout.addWidget(self.single_stock_input)
self.analyze_btn = ModernButton('分析')
self.analyze_btn.clicked.connect(self.analyze_single_stock)
layout.addWidget(self.analyze_btn)
layout.addStretch()
return frame
def create_batch_stock_section(self):
"""创建批量分析部分"""
frame = QFrame()
layout = QVBoxLayout(frame)
layout.setSpacing(15)
label = QLabel('批量股票分析')
label.setStyleSheet("""
QLabel {
color: #202124;
font-size: 16px;
font-weight: bold;
}
""")
layout.addWidget(label)
self.batch_stock_input = ModernTextEdit()
self.batch_stock_input.setPlaceholderText('输入多个股票代码,每行一个')
self.batch_stock_input.setMinimumHeight(100)
layout.addWidget(self.batch_stock_input)
self.batch_analyze_btn = ModernButton('批量分析')
self.batch_analyze_btn.clicked.connect(self.analyze_multiple_stocks)
layout.addWidget(self.batch_analyze_btn)
layout.addStretch()
return frame
def format_report(self, report, is_single=True):
"""将分析报告格式化为现代化的Markdown格式"""
md = f"""# 股票分析报告 - {report['stock_code']}
## 基本信息
- **分析日期:** {report['analysis_date']}
- **当前价格:** ¥{report['price']:.2f}
- **价格变动:** {report['price_change']:.2f}%
## 技术指标
- **均线趋势:** {report['ma_trend']}
- **RSI指标** {report['rsi']:.2f}
- **MACD信号** {report['macd_signal']}
- **成交量状态:** {report['volume_status']}
## 评分与建议
- **综合评分:** {report['score']}
- **投资建议:** {report['recommendation']}
## AI分析
{report['ai_analysis']}
---
"""
return md
def analyze_single_stock(self):
"""分析单只股票"""
stock_code = self.single_stock_input.text().strip()
if not stock_code:
self.show_warning('请输入股票代码')
return
self.analyze_btn.setEnabled(False)
self.progress_bar.setVisible(True)
self.progress_bar.setValue(0)
self.worker = AnalysisWorker(self.analyzer, stock_code)
self.worker.finished.connect(self.handle_single_analysis_result)
self.worker.error.connect(self.handle_analysis_error)
self.worker.progress.connect(self.progress_bar.setValue)
self.worker.start()
def analyze_multiple_stocks(self):
"""批量分析股票"""
text = self.batch_stock_input.toPlainText().strip()
if not text:
self.show_warning('请输入股票代码')
return
stock_list = [code.strip() for code in text.split('\n') if code.strip()]
self.batch_analyze_btn.setEnabled(False)
self.progress_bar.setVisible(True)
self.progress_bar.setValue(0)
self.batch_worker = BatchAnalysisWorker(self.analyzer, stock_list)
self.batch_worker.finished.connect(self.handle_batch_analysis_result)
self.batch_worker.error.connect(self.handle_analysis_error)
self.batch_worker.progress.connect(self.progress_bar.setValue)
self.batch_worker.start()
def handle_single_analysis_result(self, report):
"""处理单只股票分析结果"""
markdown_text = self.format_report(report)
html_content = markdown2.markdown(markdown_text, extras=['tables', 'fenced-code-blocks'])
self.result_browser.setHtml(html_content)
self.analyze_btn.setEnabled(True)
self.progress_bar.setVisible(False)
def handle_batch_analysis_result(self, recommendations):
"""处理批量分析结果"""
markdown_text = "# 批量股票分析报告\n\n"
for rec in recommendations:
markdown_text += self.format_report(rec, False)
html_content = markdown2.markdown(markdown_text, extras=['tables', 'fenced-code-blocks'])
self.result_browser.setHtml(html_content)
self.batch_analyze_btn.setEnabled(True)
self.progress_bar.setVisible(False)
def handle_analysis_error(self, error_message):
"""处理分析错误"""
self.show_error(f'分析过程中出现错误:{error_message}')
self.analyze_btn.setEnabled(True)
self.batch_analyze_btn.setEnabled(True)
self.progress_bar.setVisible(False)
def show_warning(self, message):
"""显示警告对话框"""
warning = QMessageBox(self)
warning.setIcon(QMessageBox.Icon.Warning)
warning.setWindowTitle('警告')
warning.setText(message)
warning.setStandardButtons(QMessageBox.StandardButton.Ok)
warning.setStyleSheet("""
QMessageBox {
background-color: white;
}
QMessageBox QLabel {
color: #202124;
min-width: 200px;
}
QPushButton {
background-color: #1a73e8;
color: white;
border: none;
border-radius: 4px;
padding: 8px 16px;
min-width: 80px;
}
QPushButton:hover {
background-color: #1557b0;
}
QPushButton:pressed {
background-color: #0d47a1;
}
""")
warning.exec()
def show_error(self, message):
"""显示错误对话框"""
error = QMessageBox(self)
error.setIcon(QMessageBox.Icon.Critical)
error.setWindowTitle('错误')
error.setText(message)
error.setStandardButtons(QMessageBox.StandardButton.Ok)
error.setStyleSheet("""
QMessageBox {
background-color: white;
}
QMessageBox QLabel {
color: #202124;
min-width: 200px;
}
QPushButton {
background-color: #1a73e8;
color: white;
border: none;
border-radius: 4px;
padding: 8px 16px;
min-width: 80px;
}
QPushButton:hover {
background-color: #1557b0;
}
QPushButton:pressed {
background-color: #0d47a1;
}
""")
error.exec()
def main():
app = QApplication(sys.argv)
app.setStyle('Fusion')
# 创建并显示主窗口
window = ModernStockAnalyzerGUI()
window.show()
sys.exit(app.exec())
if __name__ == '__main__':
main()

313
stock_analyzer.py Normal file
View File

@@ -0,0 +1,313 @@
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import os
import requests
from typing import Dict, List, Optional, Tuple
from dotenv import load_dotenv
import logging
class StockAnalyzer:
def __init__(self, initial_cash=1000000):
# 设置日志
logging.basicConfig(level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s')
self.logger = logging.getLogger(__name__)
# 加载环境变量
load_dotenv()
# 设置 Gemini API
self.gemini_api_url = "https://api.linzefeng.top"
self.gemini_api_key = os.getenv('GEMINI_API_KEY')
# 配置参数
self.params = {
'ma_periods': {'short': 5, 'medium': 20, 'long': 60},
'rsi_period': 14,
'bollinger_period': 20,
'bollinger_std': 2,
'volume_ma_period': 20,
'atr_period': 14
}
def get_stock_data(self, stock_code, start_date=None, end_date=None):
"""获取股票数据"""
import akshare as ak
if start_date is None:
start_date = (datetime.now() - timedelta(days=365)).strftime('%Y%m%d')
if end_date is None:
end_date = datetime.now().strftime('%Y%m%d')
try:
# 使用 akshare 获取股票数据
df = ak.stock_zh_a_hist(symbol=stock_code,
start_date=start_date,
end_date=end_date,
adjust="qfq")
# 重命名列名以匹配分析需求
df = df.rename(columns={
"日期": "date",
"开盘": "open",
"收盘": "close",
"最高": "high",
"最低": "low",
"成交量": "volume"
})
# 确保日期格式正确
df['date'] = pd.to_datetime(df['date'])
# 数据类型转换
numeric_columns = ['open', 'close', 'high', 'low', 'volume']
df[numeric_columns] = df[numeric_columns].apply(pd.to_numeric, errors='coerce')
# 删除空值
df = df.dropna()
return df.sort_values('date')
except Exception as e:
self.logger.error(f"获取股票数据失败: {str(e)}")
raise Exception(f"获取股票数据失败: {str(e)}")
def calculate_ema(self, series, period):
"""计算指数移动平均线"""
return series.ewm(span=period, adjust=False).mean()
def calculate_rsi(self, series, period):
"""计算RSI指标"""
delta = series.diff()
gain = (delta.where(delta > 0, 0)).rolling(window=period).mean()
loss = (-delta.where(delta < 0, 0)).rolling(window=period).mean()
rs = gain / loss
return 100 - (100 / (1 + rs))
def calculate_macd(self, series):
"""计算MACD指标"""
exp1 = series.ewm(span=12, adjust=False).mean()
exp2 = series.ewm(span=26, adjust=False).mean()
macd = exp1 - exp2
signal = macd.ewm(span=9, adjust=False).mean()
hist = macd - signal
return macd, signal, hist
def calculate_bollinger_bands(self, series, period, std_dev):
"""计算布林带"""
middle = series.rolling(window=period).mean()
std = series.rolling(window=period).std()
upper = middle + (std * std_dev)
lower = middle - (std * std_dev)
return upper, middle, lower
def calculate_atr(self, df, period):
"""计算ATR指标"""
high = df['high']
low = df['low']
close = df['close'].shift(1)
tr1 = high - low
tr2 = abs(high - close)
tr3 = abs(low - close)
tr = pd.concat([tr1, tr2, tr3], axis=1).max(axis=1)
return tr.rolling(window=period).mean()
def calculate_indicators(self, df):
"""计算技术指标"""
try:
# 计算移动平均线
df['MA5'] = self.calculate_ema(df['close'], self.params['ma_periods']['short'])
df['MA20'] = self.calculate_ema(df['close'], self.params['ma_periods']['medium'])
df['MA60'] = self.calculate_ema(df['close'], self.params['ma_periods']['long'])
# 计算RSI
df['RSI'] = self.calculate_rsi(df['close'], self.params['rsi_period'])
# 计算MACD
df['MACD'], df['Signal'], df['MACD_hist'] = self.calculate_macd(df['close'])
# 计算布林带
df['BB_upper'], df['BB_middle'], df['BB_lower'] = self.calculate_bollinger_bands(
df['close'],
self.params['bollinger_period'],
self.params['bollinger_std']
)
# 成交量分析
df['Volume_MA'] = df['volume'].rolling(window=self.params['volume_ma_period']).mean()
df['Volume_Ratio'] = df['volume'] / df['Volume_MA']
# 计算ATR和波动率
df['ATR'] = self.calculate_atr(df, self.params['atr_period'])
df['Volatility'] = df['ATR'] / df['close'] * 100
# 动量指标
df['ROC'] = df['close'].pct_change(periods=10) * 100
return df
except Exception as e:
self.logger.error(f"计算技术指标时出错: {str(e)}")
raise
def calculate_score(self, df):
"""计算股票评分"""
try:
score = 0
latest = df.iloc[-1]
# 趋势得分 (30分)
if latest['MA5'] > latest['MA20']:
score += 15
if latest['MA20'] > latest['MA60']:
score += 15
# RSI得分 (20分)
if 30 <= latest['RSI'] <= 70:
score += 20
elif latest['RSI'] < 30: # 超卖
score += 15
# MACD得分 (20分)
if latest['MACD'] > latest['Signal']:
score += 20
# 成交量得分 (30分)
if latest['Volume_Ratio'] > 1.5:
score += 30
elif latest['Volume_Ratio'] > 1:
score += 15
return score
except Exception as e:
self.logger.error(f"计算评分时出错: {str(e)}")
raise
def get_ai_analysis(self, df, stock_code):
"""使用 Gemini 进行 AI 分析"""
try:
recent_data = df.tail(14).to_dict('records')
technical_summary = {
'trend': 'upward' if df.iloc[-1]['MA5'] > df.iloc[-1]['MA20'] else 'downward',
'volatility': f"{df.iloc[-1]['Volatility']:.2f}%",
'volume_trend': 'increasing' if df.iloc[-1]['Volume_Ratio'] > 1 else 'decreasing',
'rsi_level': df.iloc[-1]['RSI']
}
prompt = f"""
分析股票 {stock_code}
技术指标概要:
{technical_summary}
近14日交易数据
{recent_data}
请提供:
1. 趋势分析(包含支撑位和压力位)
2. 成交量分析及其含义
3. 风险评估(包含波动率分析)
4. 短期和中期目标价位
5. 关键技术位分析
6. 具体交易建议(包含止损位)
请基于技术指标和市场动态进行分析,给出具体数据支持。
"""
headers = {
"Authorization": f"Bearer {self.gemini_api_key}",
"Content-Type": "application/json"
}
data = {
"model": "gemini-1.5-flash",
"messages": [{"role": "user", "content": prompt}]
}
response = requests.post(
f"{self.gemini_api_url}/v1/chat/completions",
headers=headers,
json=data,
timeout=10
)
if response.status_code == 200:
return response.json()['choices'][0]['message']['content']
else:
return "AI 分析暂时无法使用"
except Exception as e:
self.logger.error(f"AI 分析发生错误: {str(e)}")
return "AI 分析过程中发生错误"
def get_recommendation(self, score):
"""根据得分给出建议"""
if score >= 80:
return '强烈推荐买入'
elif score >= 60:
return '建议买入'
elif score >= 40:
return '观望'
elif score >= 20:
return '建议卖出'
else:
return '强烈建议卖出'
def analyze_stock(self, stock_code):
"""分析单个股票"""
try:
# 获取股票数据
df = self.get_stock_data(stock_code)
# 计算技术指标
df = self.calculate_indicators(df)
# 评分系统
score = self.calculate_score(df)
# 获取最新数据
latest = df.iloc[-1]
prev = df.iloc[-2]
# 生成报告(保持原有格式)
report = {
'stock_code': stock_code,
'analysis_date': datetime.now().strftime('%Y-%m-%d'),
'score': score,
'price': latest['close'],
'price_change': (latest['close'] - prev['close']) / prev['close'] * 100,
'ma_trend': 'UP' if latest['MA5'] > latest['MA20'] else 'DOWN',
'rsi': latest['RSI'],
'macd_signal': 'BUY' if latest['MACD'] > latest['Signal'] else 'SELL',
'volume_status': 'HIGH' if latest['Volume_Ratio'] > 1.5 else 'NORMAL',
'recommendation': self.get_recommendation(score),
'ai_analysis': self.get_ai_analysis(df, stock_code)
}
return report
except Exception as e:
self.logger.error(f"分析股票时出错: {str(e)}")
raise
def scan_market(self, stock_list, min_score=60):
"""扫描市场,寻找符合条件的股票"""
recommendations = []
for stock_code in stock_list:
try:
report = self.analyze_stock(stock_code)
if report['score'] >= min_score:
recommendations.append(report)
except Exception as e:
self.logger.error(f"分析股票 {stock_code} 时出错: {str(e)}")
continue
# 按得分排序
recommendations.sort(key=lambda x: x['score'], reverse=True)
return recommendations

View File

@@ -0,0 +1,602 @@
""""
Stock Analysis System
优化后的全盘股票技术分析系统——用于A股市场股票的全面分析已加速分析并增加额外指标。
"""
import os
import time
import random
import logging
import traceback
from datetime import datetime, timedelta
from concurrent.futures import ThreadPoolExecutor
from typing import Dict, List, Optional, Tuple, Union
from dataclasses import dataclass
import numpy as np
import pandas as pd
import akshare as ak
from tqdm import tqdm
# -------------------------------
# **技术指标配置**
# -------------------------------
@dataclass
class TechnicalParams:
"""技术指标参数配置"""
ma_periods: Dict[str, int]
rsi_period: int
bollinger_period: int
bollinger_std: int
volume_ma_period: int
atr_period: int
@classmethod
def default(cls) -> 'TechnicalParams':
"""返回默认的技术指标参数"""
return cls(
ma_periods={'short': 5, 'medium': 20, 'long': 60},
rsi_period=14,
bollinger_period=20,
bollinger_std=2,
volume_ma_period=20,
atr_period=14
)
# -------------------------------
# **股票分析引擎**
# -------------------------------
class StockAnalyzer:
"""股票分析引擎,计算各类技术指标"""
def __init__(self, params: Optional[TechnicalParams] = None):
"""
初始化股票分析引擎
Args:
params: 技术指标配置参数
"""
self._setup_logging()
self.params = params or TechnicalParams.default()
def _setup_logging(self) -> None:
"""配置日志记录"""
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
self.logger = logging.getLogger(__name__)
def get_stock_data(self, stock_code: str,
start_date: Optional[str] = None,
end_date: Optional[str] = None) -> pd.DataFrame:
"""
获取单只股票历史数据,默认使用前一年的数据。
Args:
stock_code: 股票代码(可以带市场前缀或纯代码)
start_date: 开始日期(格式YYYYMMDD)
end_date: 结束日期(格式YYYYMMDD)
Returns:
包含日期、开盘、收盘、最高、最低、成交量的 DataFrame
"""
try:
if not start_date:
start_date = (datetime.now() - timedelta(days=365)).strftime('%Y%m%d')
if not end_date:
end_date = datetime.now().strftime('%Y%m%d')
code = stock_code[2:] if stock_code.startswith(('sz', 'sh')) else stock_code
df = ak.stock_zh_a_hist(
symbol=code,
start_date=start_date,
end_date=end_date,
adjust="qfq"
)
self.logger.info(f"获取到 {len(df)} 行数据,列名:{df.columns.tolist()}")
df = df.rename(columns={
"日期": "date",
"开盘": "open",
"收盘": "close",
"最高": "high",
"最低": "low",
"成交量": "volume",
"trade_date": "date"
})
required_columns = {'date', 'open', 'close', 'high', 'low', 'volume'}
missing_columns = required_columns - set(df.columns)
if missing_columns:
raise ValueError(f"缺失必须字段: {missing_columns}")
df['date'] = pd.to_datetime(df['date'], errors='coerce')
numeric_columns = ['open', 'close', 'high', 'low', 'volume']
df[numeric_columns] = df[numeric_columns].apply(pd.to_numeric, errors='coerce')
df_cleaned = df.dropna(subset=['date'] + numeric_columns).sort_values('date')
if len(df_cleaned) < 60:
raise ValueError(f"数据不足(仅 {len(df_cleaned)}无法计算至少60日均线")
return df_cleaned
except Exception as e:
self.logger.error(f"获取股票数据失败,股票代码 {stock_code},错误信息:{str(e)}")
raise ValueError(f"股票 {stock_code} 数据获取出错: {str(e)}")
@staticmethod
def calculate_ema(series: pd.Series, period: int) -> pd.Series:
"""计算 EMA"""
return series.ewm(span=period, adjust=False).mean()
@staticmethod
def calculate_rsi(series: pd.Series, period: int) -> pd.Series:
"""
计算 RSI指数加权移动平均法
"""
delta = series.diff()
gain = delta.clip(lower=0)
loss = -delta.clip(upper=0)
avg_gain = gain.ewm(com=period - 1, adjust=False).mean()
avg_loss = loss.ewm(com=period - 1, adjust=False).mean()
rs = avg_gain / (avg_loss + 1e-10)
return 100 - (100 / (1 + rs))
@staticmethod
def calculate_macd(series: pd.Series) -> Tuple[pd.Series, pd.Series, pd.Series]:
"""计算 MACD、信号线和直方图"""
exp1 = series.ewm(span=12, adjust=False).mean()
exp2 = series.ewm(span=26, adjust=False).mean()
macd = exp1 - exp2
signal = macd.ewm(span=9, adjust=False).mean()
return macd, signal, macd - signal
@staticmethod
def calculate_bollinger_bands(series: pd.Series, period: int, std_dev: int) -> Tuple[pd.Series, pd.Series, pd.Series]:
"""计算 Bollinger 通道"""
middle = series.rolling(window=period, min_periods=period).mean()
std = series.rolling(window=period, min_periods=period).std()
upper = middle + std * std_dev
lower = middle - std * std_dev
return upper, middle, lower
def calculate_atr(self, df: pd.DataFrame, period: int) -> pd.Series:
"""计算 ATR"""
high = df['high']
low = df['low']
prev_close = df['close'].shift(1)
tr = pd.concat([
high - low,
(high - prev_close).abs(),
(low - prev_close).abs()
], axis=1).max(axis=1)
return tr.rolling(window=period, min_periods=period).mean()
@staticmethod
def calculate_obv(series_close: pd.Series, series_volume: pd.Series) -> pd.Series:
"""计算 OBV能量潮指标"""
diff = series_close.diff().fillna(0)
obv = np.where(diff > 0, series_volume, np.where(diff < 0, -series_volume, 0))
return pd.Series(obv, index=series_close.index).cumsum()
@staticmethod
def calculate_stochastic(series_close: pd.Series, window: int = 14) -> Tuple[pd.Series, pd.Series]:
"""
计算随机指标Stochastic Oscillator
%K = (close - lowest_low) / (highest_high - lowest_low)*100
%D 为 %K 的3日简单移动平均
"""
lowest = series_close.rolling(window=window, min_periods=window).min()
highest = series_close.rolling(window=window, min_periods=window).max()
percentK = (series_close - lowest) / (highest - lowest + 1e-10) * 100
percentD = percentK.rolling(window=3, min_periods=3).mean()
return percentK, percentD
def calculate_indicators(self, df: pd.DataFrame) -> pd.DataFrame:
"""计算所有技术指标,并增加 OBV 和随机指标"""
try:
# 移动均线
for key, period in self.params.ma_periods.items():
df[f'MA{period}'] = self.calculate_ema(df['close'], period)
df['RSI'] = self.calculate_rsi(df['close'], self.params.rsi_period)
df['MACD'], df['Signal'], df['MACD_hist'] = self.calculate_macd(df['close'])
df['BB_upper'], df['BB_middle'], df['BB_lower'] = self.calculate_bollinger_bands(
df['close'], self.params.bollinger_period, self.params.bollinger_std)
df['Volume_MA'] = df['volume'].rolling(window=self.params.volume_ma_period,
min_periods=self.params.volume_ma_period).mean()
df['Volume_Ratio'] = df['volume'] / (df['Volume_MA'] + 1e-10)
df['ATR'] = self.calculate_atr(df, self.params.atr_period)
df['Volatility'] = df['ATR'] / df['close'] * 100
df['ROC'] = df['close'].pct_change(periods=10) * 100
# 增加 OBV 指标
df['OBV'] = self.calculate_obv(df['close'], df['volume'])
df['OBV_MA10'] = df['OBV'].rolling(window=10, min_periods=10).mean()
# 增加随机指标 Stochastic
df['%K'], df['%D'] = self.calculate_stochastic(df['close'], window=14)
return df
except Exception as e:
self.logger.error(f"指标计算出错:{str(e)}")
raise
def calculate_score(self, df: pd.DataFrame) -> float:
"""
计算股票综合打分基本打分满分100分并根据 OBV 与随机指标调整±5分
使量化结果更全面。
基本打分逻辑不变:
趋势30分、RSI20分、MACD20分、成交量30分
附加指标调整:
OBVOBV > OBV_MA10+5分反之-5分
随机指标:%K < 20为超卖+5分%K > 80为超买-5分。
"""
try:
score = 0
latest = df.iloc[-1]
# 趋势打分
if latest['MA5'] > latest['MA20'] and latest['MA20'] > latest['MA60']:
score += 30
else:
if latest['MA5'] > latest['MA20']:
score += 15
if latest['MA20'] > latest['MA60']:
score += 15
# RSI 打分
if 30 <= latest['RSI'] <= 70:
score += 20
elif latest['RSI'] < 30:
score += 15
# MACD 打分
if latest['MACD'] > latest['Signal']:
score += 20
# 成交量打分
if latest['Volume_Ratio'] > 1.5:
score += 30
elif latest['Volume_Ratio'] > 1:
score += 15
# 附加 OBV 调整
if latest['OBV'] > latest['OBV_MA10']:
score += 5
else:
score -= 5
# 附加随机指标调整
if latest['%K'] < 20:
score += 5
elif latest['%K'] > 80:
score -= 5
return score
except Exception as e:
self.logger.error(f"计算打分失败:{str(e)}")
raise
@staticmethod
def get_recommendation(score: float) -> str:
"""根据最终打分给出投资建议"""
if score >= 80:
return '强烈推荐买入'
elif score >= 60:
return '建议买入'
elif score >= 40:
return '建议观望'
elif score >= 20:
return '建议卖出'
else:
return '强烈建议卖出'
def analyze_stock(self, stock_code: str) -> Dict:
"""针对单只股票执行完整的技术分析流程"""
try:
df = self.get_stock_data(stock_code)
df = self.calculate_indicators(df)
score = self.calculate_score(df)
latest = df.iloc[-1]
prev = df.iloc[-2]
return {
'stock_code': stock_code,
'analysis_date': datetime.now().strftime('%Y-%m-%d'),
'score': score,
'price': latest['close'],
'price_change': (latest['close'] - prev['close']) / prev['close'] * 100,
'ma_trend': 'UP' if latest['MA5'] > latest['MA20'] else 'DOWN',
'rsi': latest['RSI'],
'macd_signal': 'BUY' if latest['MACD'] > latest['Signal'] else 'SELL',
'volume_status': 'HIGH' if latest['Volume_Ratio'] > 1.5 else 'NORMAL',
'recommendation': self.get_recommendation(score)
}
except Exception as e:
self.logger.error(f"分析股票 {stock_code} 失败:{str(e)}")
raise
# -------------------------------
# **全盘股票扫描器**
# -------------------------------
class TopStockScanner:
"""全盘筛选高打分股票的扫描器"""
def __init__(self, max_workers: int = 20, min_score: float = 85):
"""
初始化扫描器
Args:
max_workers: 并发线程数量已增至20以加速分析
min_score: 高分最低阈值
"""
self.analyzer = StockAnalyzer()
self.max_workers = max_workers
self.min_score = min_score
self.logger = logging.getLogger(__name__)
def get_all_stocks(self) -> List[str]:
"""
获取所有上市 A 股股票代码(全盘版)。
使用 ak.stock_info_sh_name_code(symbol="主板A股") 与 ak.stock_info_sz_name_code(symbol="A股列表")
候选字段列表为:['A股代码', '证券代码', '股票代码', 'code'] 。
"""
try:
sh_df = ak.stock_info_sh_name_code(symbol="主板A股")
sz_df = ak.stock_info_sz_name_code(symbol="A股列表")
candidate_cols = ['A股代码', '证券代码', '股票代码', 'code']
def get_codes(df: pd.DataFrame) -> set:
for col in candidate_cols:
if col in df.columns:
return {str(code).zfill(6) for code in df[col]}
raise KeyError(f"未能找到股票代码字段,现有字段:{df.columns.tolist()}")
sh_codes = get_codes(sh_df)
sz_codes = get_codes(sz_df)
all_codes = sorted(sh_codes | sz_codes)
self.logger.info(f"完整股票列表获取到 {len(all_codes)} 支股票信息")
print(f"\n开始分析 {len(all_codes)} 支股票...")
return all_codes
except Exception as e:
self.logger.error(f"获取股票列表失败:{str(e)}")
raise
def analyze_stock_safe(self, stock_code: str, max_retries: int = 3) -> Optional[Dict]:
"""
安全分析单只股票(加入重试机制),数据异常则跳过。
"""
for attempt in range(max_retries):
try:
return self.analyzer.analyze_stock(stock_code)
except ValueError as e:
self.logger.warning(f"跳过股票 {stock_code}: {str(e)}")
return None
except Exception as e:
if attempt == max_retries - 1:
self.logger.error(f"股票 {stock_code} 分析尝试 {max_retries} 次后失败:{str(e)}")
return None
self.logger.warning(f"股票 {stock_code}{attempt+1} 次分析失败:{str(e)}")
time.sleep(random.uniform(2, 5))
def process_batch(self, stock_codes: List[str]) -> List[Dict]:
"""利用多线程并行处理一批股票的分析任务"""
results = []
with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
futures = {executor.submit(self.analyze_stock_safe, code): code for code in stock_codes}
for future in tqdm(futures, desc="分析进度", ncols=80):
try:
result = future.result()
if result is not None:
results.append(result)
except Exception as e:
stock = futures[future]
self.logger.error(f"处理股票 {stock} 时出错:{str(e)}")
return results
def save_intermediate_results(self, results: List[Dict]) -> None:
"""周期性保存中间结果,便于后续查看进度"""
try:
df = pd.DataFrame(results)
high_score_stocks = df[df['score'] >= self.min_score].sort_values('score', ascending=False)
output_lines = [
"=" * 80,
f"股票扫描中间结果 - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
f"共分析 {len(results)} 支股票",
"=" * 80,
f"\n发现 {len(high_score_stocks)} 支高分股票(得分≥{self.min_score}"
]
for _, row in high_score_stocks.iterrows():
output_lines.extend([
f"\n股票代码: {row['stock_code']}",
f"得分: {row['score']:.1f} | 价格: ¥{row['price']:.2f} | 涨跌幅: {row['price_change']:.2f}%"
])
os.makedirs('scanner', exist_ok=True)
with open('scanner/temp_results.txt', 'w', encoding='utf-8') as f:
f.write('\n'.join(output_lines))
except Exception as e:
self.logger.error(f"保存中间结果失败:{str(e)}")
def get_high_score_stocks(self, batch_size: int = 20) -> List[Dict]:
"""扫描全盘股票,返回高打分结果列表"""
try:
all_stocks = self.get_all_stocks()
total_stocks = len(all_stocks)
print(f"\n开始扫描 {total_stocks} 支股票……")
results = []
total_batches = (total_stocks + batch_size - 1) // batch_size
for i in range(0, total_stocks, batch_size):
batch_number = i // batch_size + 1
print(f"\r当前进度: 批次 {batch_number}/{total_batches}", end="")
batch = all_stocks[i:i + batch_size]
batch_results = self.process_batch(batch)
results.extend(batch_results)
if i + batch_size < total_stocks:
time.sleep(random.uniform(3, 5))
if results and ((len(results) % 100 == 0) or (i + batch_size >= total_stocks)):
self.save_intermediate_results(results)
print("\n扫描结束!")
if results:
df_results = pd.DataFrame(results)
high_score_stocks = df_results[df_results['score'] >= self.min_score].sort_values('score', ascending=False)
formatted_results = []
for _, row in high_score_stocks.iterrows():
formatted_results.append({
'股票代码': row['stock_code'],
'评分': f"{row['score']:.1f}",
'当前价格': f"¥{row['price']:.2f}",
'涨跌幅': f"{row['price_change']:.2f}%",
'RSI指标': f"{row['rsi']:.2f}",
'均线趋势': '上升' if row['ma_trend'] == 'UP' else '下降',
'MACD信号': '买入' if row['macd_signal'] == 'BUY' else '卖出',
'成交量状态': '放量' if row['volume_status'] == 'HIGH' else '正常',
'投资建议': row['recommendation']
})
return formatted_results
return []
except Exception as e:
self.logger.error(f"全盘扫描失败:{str(e)}")
raise
# -------------------------------
# **结果分组与报告生成**
# -------------------------------
def format_price_category(price: float) -> str:
"""将价格划分为区间(例如 32.5 -> '30-40'"""
base = (price // 10) * 10
return f"{int(base)}-{int(base+10)}"
def save_results_by_price(results: List[Dict]) -> None:
"""按价格区间保存分析结果至文件"""
try:
os.makedirs('scanner', exist_ok=True)
price_groups = {}
for stock in results:
price = float(stock['当前价格'].replace('¥', ''))
category = format_price_category(price)
price_groups.setdefault(category, []).append(stock)
for category, stocks in price_groups.items():
output_lines = [
"=" * 80,
f"股票分析结果 - 价格区间: {category}",
f"分析时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
"=" * 80,
f"\n该区间共发现 {len(stocks)} 支高分股票得分≥85",
"-" * 80
]
stocks.sort(key=lambda x: float(x['评分']), reverse=True)
for i, stock in enumerate(stocks, 1):
output_lines.extend([
f"\n{i}. 股票代码: {stock['股票代码']}",
f" 评分: {stock['评分']} | 价格: {stock['当前价格']} | 涨跌幅: {stock['涨跌幅']}",
f" RSI指标: {stock['RSI指标']} | 均线趋势: {stock['均线趋势']} | MACD信号: {stock['MACD信号']}",
f" 成交量状态: {stock['成交量状态']}",
f" 投资建议: {stock['投资建议']}",
"-" * 80
])
output_lines.extend([
f"\n价格区间 {category}元 分析汇总:",
f"1. 股票数量: {len(stocks)}",
f"2. 平均评分: {np.mean([float(stock['评分']) for stock in stocks]):.1f}",
f"3. 买入信号股票数: {sum(1 for stock in stocks if stock['MACD信号'] == '买入')}",
f"4. 放量股票数: {sum(1 for stock in stocks if stock['成交量状态'] == '放量')}"
])
filename = f'scanner/price_{category.replace("-", "_")}.txt'
with open(filename, 'w', encoding='utf-8') as f:
f.write('\n'.join(output_lines))
create_summary_file(price_groups)
except Exception as e:
logging.error(f"保存结果时发生错误: {str(e)}")
raise
def create_summary_file(price_groups: Dict[str, List[Dict]]) -> None:
"""生成综合汇总报告"""
try:
output_lines = [
"=" * 80,
"A股市场优质股票筛选报告",
f"分析时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
"=" * 80
]
total_stocks = sum(len(stocks) for stocks in price_groups.values())
all_scores = [float(stock['评分']) for stocks in price_groups.values() for stock in stocks]
output_lines.extend([
"\n整体统计:",
f"1. 共筛选出 {total_stocks} 支高分股票得分≥85",
f"2. 平均评分: {np.mean(all_scores):.1f}",
f"3. 最高评分: {max(all_scores):.1f}",
"\n各价格区间分布:",
"-" * 80
])
for category, stocks in sorted(price_groups.items(), key=lambda x: float(x[0].split('-')[0])):
output_lines.extend([
f"\n价格区间 {category}元:",
f" - 股票数量: {len(stocks)}",
f" - 平均评分: {np.mean([float(stock['评分']) for stock in stocks]):.1f}"
])
with open('scanner/summary.txt', 'w', encoding='utf-8') as f:
f.write('\n'.join(output_lines))
except Exception as e:
logging.error(f"生成汇总报告失败:{str(e)}")
raise
# -------------------------------
# **主程序入口**
# -------------------------------
def main():
"""程序主入口"""
print("\n" + "=" * 80)
print("Market-Wide High-Score Stock Scanner".center(76))
print("=" * 80)
scanner = TopStockScanner(max_workers=20) # 已提升至20线程
try:
print("\n开始全盘扫描股票……")
high_score_stocks = scanner.get_high_score_stocks(batch_size=20)
if not high_score_stocks:
print("\n未找到得分大于等于85分的股票。")
return
save_results_by_price(high_score_stocks)
print(f"\n分析完成!结果已保存至 scanner 文件夹中:")
print("1. 按价格区间保存的详细分析文件price_XX_YY.txt")
print("2. 汇总报告summary.txt")
temp_file = 'scanner/temp_results.txt'
if os.path.exists(temp_file):
os.remove(temp_file)
print("\n" + "=" * 80)
input("\n按Enter键退出……")
except Exception as e:
error_msg = f"\n程序错误:{str(e)}\n"
print("=" * 80)
print(error_msg)
print("=" * 80)
os.makedirs('scanner', exist_ok=True)
with open('scanner/error_log.txt', 'w', encoding='utf-8') as f:
f.write("Stock Analysis System Error Report\n")
f.write("=" * 80 + "\n")
f.write(f"Time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
f.write(f"Error: {str(e)}\n")
f.write("=" * 80 + "\n")
f.write(f"详细堆栈信息:\n{traceback.format_exc()}")
print("错误日志已保存至 scanner/error_log.txt")
input("\n按Enter键退出……")
if __name__ == "__main__":
main()