马良AI写作初始化仓库

This commit is contained in:
邓滨杰
2025-09-10 00:07:52 +08:00
parent 3c06bb1a03
commit 39c0f8840f
1309 changed files with 318528 additions and 0 deletions

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,36 @@
/// API异常类
class ApiException implements Exception {
ApiException(this.statusCode, this.message);
final int statusCode;
final String message;
@override
String toString() => 'ApiException: $statusCode - $message';
}
/// 🚀 新增:积分不足异常
/// 当用户积分余额不足时抛出
class InsufficientCreditsException extends ApiException {
final int? requiredCredits;
InsufficientCreditsException(String message, [this.requiredCredits])
: super(402, message); // HTTP 402 Payment Required
/// 从错误消息中提取需要的积分数量
static int? extractRequiredCredits(String message) {
final regex = RegExp(r'需要 (\d+) 积分');
final match = regex.firstMatch(message);
if (match != null) {
return int.tryParse(match.group(1) ?? '');
}
return null;
}
/// 创建带有自动提取积分数量的实例
factory InsufficientCreditsException.fromMessage(String message) {
final requiredCredits = extractRequiredCredits(message);
return InsufficientCreditsException(message, requiredCredits);
}
}

View File

@@ -0,0 +1,470 @@
import 'dart:async';
import 'dart:convert';
import 'package:ainoval/config/app_config.dart';
import 'package:ainoval/services/api_service/base/api_exception.dart';
import 'package:ainoval/utils/logger.dart';
import 'package:flutter_client_sse/constants/sse_request_type_enum.dart';
import 'package:flutter_client_sse/flutter_client_sse.dart';
import 'package:flutter_client_sse/flutter_client_sse.dart' as flutter_sse;
/// A client specifically designed for handling Server-Sent Events (SSE).
///
/// Encapsulates connection details, authentication, and event parsing logic,
/// using the 'flutter_client_sse' package.
class _RetryState {
int errorCount;
DateTime firstErrorAt;
_RetryState({required this.errorCount, required this.firstErrorAt});
}
class SseClient {
// --------------- Singleton Pattern (Optional but common) ---------------
// Private constructor
SseClient._internal() : _baseUrl = AppConfig.apiBaseUrl;
// Factory constructor to return the instance
factory SseClient() {
return _instance;
}
final String _tag = 'SseClient';
final String _baseUrl;
// 存储活跃连接,以便于管理
final Map<String, StreamSubscription> _activeConnections = {};
final Map<String, _RetryState> _retryStates = {};
// Static instance
static final SseClient _instance = SseClient._internal();
// --------------- End Singleton Pattern ---------------
// Or a simple public constructor if singleton is not desired:
// SseClient() : _baseUrl = AppConfig.apiBaseUrl;
/// Connects to an SSE endpoint and streams parsed events of type [T].
///
/// Handles base URL construction, authentication, and event parsing using flutter_client_sse.
///
/// - [path]: The relative path to the SSE endpoint (e.g., '/novels/import/jobId/status').
/// - [parser]: A function that takes a JSON map and returns an object of type [T].
/// - [eventName]: (Optional) The specific SSE event name to listen for. Defaults to 'message'.
/// - [queryParams]: (Optional) Query parameters to add to the URL.
/// - [method]: The HTTP method (defaults to GET).
/// - [body]: The request body for POST requests.
/// - [connectionId]: Optional. An identifier for this connection. If not provided, a random ID will be generated.
/// - [timeout]: Optional. Timeout duration for the stream. If not provided, no timeout is applied.
Stream<T> streamEvents<T>({
required String path,
required T Function(Map<String, dynamic>) parser,
String? eventName = 'message', // Default event name to filter
Map<String, String>? queryParams,
SSERequestType method = SSERequestType.GET, // Default to GET
Map<String, dynamic>? body, // For POST requests
String? connectionId,
Duration? timeout,
}) {
final controller = StreamController<T>();
final cid = connectionId ?? 'conn_${DateTime.now().millisecondsSinceEpoch}_${_activeConnections.length}';
try {
// 1. Prepare URL
final fullPath = path.startsWith('/') ? path : '/$path';
final uri = Uri.parse('$_baseUrl$fullPath');
final urlWithParams = queryParams != null ? uri.replace(queryParameters: queryParams) : uri;
final urlString = urlWithParams.toString(); // flutter_client_sse uses String URL
AppLogger.i(_tag, '[SSE] Connecting via ${method.name} to endpoint: $urlString');
// 针对设定生成等POST流若发生错误/完成,需全局取消以阻止插件自动重连
final bool shouldGlobalUnsubscribe = method == SSERequestType.POST && fullPath.contains('/setting-generation');
final String retryKey = '${method.name}:$fullPath';
// 冷却窗口1分钟内达到阈值则熔断
const int maxRetries = 3;
const Duration retryWindow = Duration(minutes: 1);
void _resetRetryIfWindowPassed() {
final existing = _retryStates[retryKey];
if (existing != null) {
if (DateTime.now().difference(existing.firstErrorAt) > retryWindow) {
_retryStates.remove(retryKey);
}
}
}
_resetRetryIfWindowPassed();
// 2. Prepare Headers & Authentication
final authToken = AppConfig.authToken;
final headers = {
// Accept and Cache-Control might be added automatically by the package,
// but explicitly adding them is safer.
'Accept': 'text/event-stream',
'Cache-Control': 'no-cache',
// Add content-type if needed for POST
if (method == SSERequestType.POST && body != null)
'Content-Type': 'application/json',
};
// 🔧 修复在开发环境中允许无token连接生产环境中仍要求token
if (authToken != null) {
headers['Authorization'] = 'Bearer $authToken';
AppLogger.d(_tag, '[SSE] Added Authorization header');
} else if (AppConfig.environment == Environment.production) {
AppLogger.e(_tag, '[SSE] Auth token is null in production environment');
throw ApiException(401, 'Authentication token is missing');
} else {
AppLogger.w(_tag, '[SSE] Warning: No auth token in development environment, proceeding without Authorization header');
}
// 🔧 新增添加用户ID头部与API客户端保持一致
final userId = AppConfig.userId;
if (userId != null) {
headers['X-User-Id'] = userId;
AppLogger.d(_tag, '[SSE] Added X-User-Id header: $userId');
} else {
AppLogger.w(_tag, '[SSE] Warning: X-User-Id header not set (userId is null)');
}
AppLogger.d(_tag, '[SSE] Headers: $headers');
if (body != null) {
AppLogger.d(_tag, '[SSE] Body: $body');
}
// 3. Subscribe using flutter_client_sse
// This method directly returns the stream subscription management is handled internally.
// We listen to it and push data/errors into our controller.
late StreamSubscription sseSubscription; // 预声明变量
sseSubscription = SSEClient.subscribeToSSE(
method: method,
url: urlString,
header: headers,
body: body,
).listen(
(event) {
//TODO调试
//AppLogger.v(_tag, '[SSE] Raw Event: ID=${event.id}, Event=${event.event}, Data=${event.data}');
// 处理心跳消息
if (event.id != null && event.id!.startsWith('heartbeat-')) {
//AppLogger.v(_tag, '[SSE] 收到心跳消息: ${event.id}');
return; // 跳过心跳处理
}
// Determine event name (treat null/empty as 'message')
final currentEventName = (event.event == null || event.event!.isEmpty) ? 'message' : event.event;
// 处理complete事件 - 这是流式生成结束的标志
if (currentEventName == 'complete') {
AppLogger.i(_tag, '[SSE] 收到complete事件表示流式生成已完成');
// 🚀 修复:发送结束信号给下游,而不是直接关闭
try {
final json = jsonDecode(event.data ?? '{}');
if (json is Map<String, dynamic> && json.containsKey('data') && json['data'] == '[DONE]') {
AppLogger.i(_tag, '[SSE] 收到[DONE]标记,发送结束信号给下游');
// 🚀 发送一个带有finishReason的结束信号
final endSignal = {
'id': 'stream_end_${DateTime.now().millisecondsSinceEpoch}',
'content': '',
'finishReason': 'stop',
'isComplete': true,
};
final parsedEndSignal = parser(endSignal);
if (!controller.isClosed) {
controller.add(parsedEndSignal);
// 先主动取消底层连接,避免插件层自动重连
try { sseSubscription.cancel(); } catch (_) {}
_activeConnections.remove(cid);
if (shouldGlobalUnsubscribe) {
try { flutter_sse.SSEClient.unsubscribeFromSSE(); } catch (_) {}
}
// 延迟关闭,确保下游能收到结束信号
Future.delayed(const Duration(milliseconds: 100), () {
if (!controller.isClosed) {
controller.close();
}
});
}
return;
}
} catch (e) {
AppLogger.e(_tag, '[SSE] 解析complete事件数据失败', e);
}
// 🚀 如果解析失败,也要发送结束信号
try {
final endSignal = {
'id': 'stream_end_${DateTime.now().millisecondsSinceEpoch}',
'content': '',
'finishReason': 'stop',
'isComplete': true,
};
final parsedEndSignal = parser(endSignal);
if (!controller.isClosed) {
controller.add(parsedEndSignal);
try { sseSubscription.cancel(); } catch (_) {}
_activeConnections.remove(cid);
if (shouldGlobalUnsubscribe) {
try { flutter_sse.SSEClient.unsubscribeFromSSE(); } catch (_) {}
}
Future.delayed(const Duration(milliseconds: 100), () {
if (!controller.isClosed) {
controller.close();
}
});
}
} catch (parseError) {
AppLogger.e(_tag, '[SSE] 发送结束信号失败', parseError);
if (!controller.isClosed) {
controller.close();
}
}
return; // 无论如何都跳过complete事件的后续处理
}
// Filter by expected event name
if (eventName != null && currentEventName != eventName) {
//AppLogger.v(_tag, '[SSE] Skipping event name: $currentEventName (Expected: $eventName)');
return; // Skip this event
}
final data = event.data;
if (data == null || data.isEmpty || data == '[DONE]') {
//AppLogger.v(_tag, '[SSE] Skipping empty or [DONE] data.');
return; // Skip this event
}
// 检查特殊结束标记 "}"
if (data == '}' || data.trim() == '}') {
AppLogger.i(_tag, '[SSE] 检测到特殊结束标记 "}",关闭流');
try { sseSubscription.cancel(); } catch (_) {}
_activeConnections.remove(cid);
if (shouldGlobalUnsubscribe) {
try { flutter_sse.SSEClient.unsubscribeFromSSE(); } catch (_) {}
}
if (!controller.isClosed) {
controller.close();
}
return;
}
// Parse data
try {
final json = jsonDecode(data);
if (json is Map<String, dynamic>) {
// 检查JSON对象中是否包含特殊结束标记
if (json['content'] == '}' ||
(json['finishReason'] != null && json['finishReason'].toString().isNotEmpty)) {
AppLogger.i(_tag, '[SSE] 检测到JSON中的结束标记: content="${json['content']}", finishReason=${json['finishReason']}');
try { sseSubscription.cancel(); } catch (_) {}
_activeConnections.remove(cid);
if (shouldGlobalUnsubscribe) {
try { flutter_sse.SSEClient.unsubscribeFromSSE(); } catch (_) {}
}
if (!controller.isClosed) {
controller.close();
}
return;
}
final parsedData = parser(json);
//AppLogger.v(_tag, '[SSE] Parsed data for event \'$currentEventName\': $parsedData');
if (!controller.isClosed) {
controller.add(parsedData); // Add parsed data to our stream
}
} else {
AppLogger.w(_tag, '[SSE] Event data is not a JSON object: $data');
}
} catch (e, stack) {
AppLogger.e(_tag, '[SSE] Failed to parse JSON data: $data', e, stack);
if (!controller.isClosed) {
// 🚀 修复:保持原始异常类型,特别是 InsufficientCreditsException
if (e is InsufficientCreditsException) {
AppLogger.w(_tag, '[SSE] 保持积分不足异常类型不变');
controller.addError(e, stack);
} else {
// Report parsing errors through the stream
controller.addError(ApiException(-1, 'Failed to parse SSE data: $e'), stack);
}
}
}
},
onError: (error, stackTrace) {
AppLogger.e(_tag, '[SSE] Stream error received', error, stackTrace);
// 🔧 新增:检查是否为不可恢复的网络错误 & 对 POST 端点设置最多重试3次
final bool isPostMethod = method == SSERequestType.POST;
bool shouldStopRetry;
if (isPostMethod && shouldGlobalUnsubscribe) {
_resetRetryIfWindowPassed();
final current = _retryStates[retryKey] ?? _RetryState(errorCount: 0, firstErrorAt: DateTime.now());
current.errorCount += 1;
_retryStates[retryKey] = current;
AppLogger.w(_tag, '[SSE] ${retryKey} 错误次数: ${current.errorCount}');
shouldStopRetry = current.errorCount >= maxRetries || _shouldStopRetryOnError(error);
} else {
shouldStopRetry = _shouldStopRetryOnError(error);
}
if (shouldStopRetry) {
AppLogger.w(_tag, '[SSE] 检测到不可恢复的网络错误,停止重试: $error');
// 取消订阅以停止自动重试
sseSubscription.cancel();
if (shouldGlobalUnsubscribe) {
try { flutter_sse.SSEClient.unsubscribeFromSSE(); } catch (_) {}
}
}
if (!controller.isClosed) {
// Convert to ApiException for consistency
controller.addError(ApiException(-1, 'SSE stream error: $error'), stackTrace);
// 仅在停止重试时才关闭下游,允许在窗口内继续尝试
if (shouldStopRetry) {
controller.close();
}
}
// 移除连接
_activeConnections.remove(cid);
},
onDone: () {
AppLogger.i(_tag, '[SSE] Stream finished (onDone received).');
if (!controller.isClosed) {
controller.close(); // Close controller when the source stream is done
}
// 移除连接
_activeConnections.remove(cid);
},
);
// 保存此连接以便于后续管理
_activeConnections[cid] = sseSubscription;
AppLogger.i(_tag, '[SSE] Connection $cid has been registered. Active connections: ${_activeConnections.length}');
// Handle cancellation of the downstream listener
controller.onCancel = () {
AppLogger.i(_tag, '[SSE] Downstream listener cancelled. Cancelling SSE subscription for connection $cid.');
sseSubscription.cancel();
// 移除连接
_activeConnections.remove(cid);
if (shouldGlobalUnsubscribe) {
try { flutter_sse.SSEClient.unsubscribeFromSSE(); } catch (_) {}
}
// Ensure controller is closed if not already
if (!controller.isClosed) {
controller.close();
}
};
} catch (e, stack) {
// Catch synchronous errors during setup (e.g., URI parsing, initial auth check)
AppLogger.e(_tag, '[SSE] Setup Error', e, stack);
controller.addError(
e is ApiException ? e : ApiException(-1, 'SSE setup failed: $e'), stack);
controller.close();
}
// 应用超时(如果指定)
if (timeout != null) {
return controller.stream.timeout(
timeout,
onTimeout: (sink) {
AppLogger.w(_tag, '[SSE] Stream timeout after ${timeout.inSeconds} seconds for connection $cid');
// 主动取消SSE连接
cancelConnection(cid);
// 发送超时错误
sink.addError(
ApiException(-1, 'SSE stream timeout after ${timeout.inSeconds} seconds'),
StackTrace.current,
);
sink.close();
},
);
} else {
return controller.stream;
}
}
/// 取消特定连接
///
/// - [connectionId]: The ID of the connection to cancel
/// - 返回: True if connection was found and cancelled, false otherwise
Future<bool> cancelConnection(String connectionId) async {
final connection = _activeConnections[connectionId];
if (connection != null) {
AppLogger.i(_tag, '[SSE] Manually cancelling connection $connectionId');
await connection.cancel();
_activeConnections.remove(connectionId);
return true;
}
AppLogger.w(_tag, '[SSE] Connection $connectionId not found or already closed');
return false;
}
/// 取消所有活跃连接
Future<void> cancelAllConnections() async {
AppLogger.i(_tag, '[SSE] Cancelling all active connections (count: ${_activeConnections.length})');
// 创建一个连接ID列表以避免在迭代过程中修改集合
final connectionIds = _activeConnections.keys.toList();
for (final id in connectionIds) {
try {
final connection = _activeConnections[id];
if (connection != null) {
await connection.cancel();
_activeConnections.remove(id);
AppLogger.d(_tag, '[SSE] Cancelled connection $id');
}
} catch (e) {
AppLogger.e(_tag, '[SSE] Error cancelling connection $id', e);
}
}
AppLogger.i(_tag, '[SSE] All connections cancelled. Remaining: ${_activeConnections.length}');
}
/// 获取活跃连接数
int get activeConnectionCount => _activeConnections.length;
/// 检查是否应该因为特定错误而停止重试
///
/// 规则:
/// - POST 方法:一律不重试(避免 /start 在后端重启后被重复触发)
/// - ClientException: Failed to fetch - 服务器不可达,停止重试
/// - ClientException: network error - 也停止重试(后端重启期间常见,避免刷屏与重复日志)
/// - 连接拒绝/重置/关闭、502/503/404停止重试
/// - 其他错误类型继续重试
bool _shouldStopRetryOnError(dynamic error) {
final errorString = error.toString().toLowerCase();
// 检查特定的错误模式
if (errorString.contains('clientexception') && errorString.contains('failed to fetch')) {
AppLogger.i(_tag, '[SSE] 检测到 "Failed to fetch" 错误,判定为服务器不可达');
return true;
}
if (errorString.contains('clientexception') && errorString.contains('network error')) {
AppLogger.i(_tag, '[SSE] 检测到通用network error停止重试以避免后端重启期间重复请求');
return true;
}
// 检查连接被拒绝的错误
if (errorString.contains('connection refused') ||
errorString.contains('connection reset') ||
errorString.contains('connection closed')) {
AppLogger.i(_tag, '[SSE] 检测到连接被拒绝/重置/关闭,判定为服务器不可达');
return true;
}
// 检查 HTTP 404、503 等明确的服务错误
if (errorString.contains('404') || errorString.contains('503') || errorString.contains('502')) {
AppLogger.i(_tag, '[SSE] 检测到 HTTP 服务错误,判定为服务器不可达');
return true;
}
// 其他错误继续重试(如临时网络波动)
AppLogger.d(_tag, '[SSE] 错误类型允许重试: $error');
return false;
}
}