24 KiB
24 KiB
打飞机小程序 WebSocket 接口规范与断线重连策略
1. WebSocket 连接规范
1.1 连接地址
wss://your-domain.com/game-hub?token={authToken}&gameId={gameId}
1.2 .NET Core WebSocket Hub 实现参考
[Authorize]
public class GameHub : Hub
{
private readonly IGameService _gameService;
private readonly IConnectionManager _connectionManager;
public GameHub(IGameService gameService, IConnectionManager connectionManager)
{
_gameService = gameService;
_connectionManager = connectionManager;
}
public override async Task OnConnectedAsync()
{
var userId = Context.UserIdentifier;
var gameId = Context.GetHttpContext().Request.Query["gameId"];
await Groups.AddToGroupAsync(Context.ConnectionId, $"Game_{gameId}");
await _connectionManager.RegisterConnection(userId, Context.ConnectionId, gameId);
// 发送连接成功确认
await Clients.Caller.SendAsync("ConnectionConfirmed", new
{
ConnectionId = Context.ConnectionId,
Timestamp = DateTime.UtcNow,
GameId = gameId
});
await base.OnConnectedAsync();
}
public override async Task OnDisconnectedAsync(Exception exception)
{
var userId = Context.UserIdentifier;
await _connectionManager.UnregisterConnection(Context.ConnectionId);
// 通知游戏中的其他玩家
await Clients.Others.SendAsync("PlayerDisconnected", new
{
UserId = userId,
Timestamp = DateTime.UtcNow,
Reason = exception?.Message
});
await base.OnDisconnectedAsync(exception);
}
}
2. 消息协议定义
2.1 基础消息格式
interface WebSocketMessage {
messageId: string // 消息唯一ID,用于确认和重发
type: MessageType // 消息类型
timestamp: number // 时间戳
gameId: string // 游戏ID
userId: string // 发送者ID
data: any // 消息内容
sequenceNumber: number // 序列号,用于消息排序
requiresAck?: boolean // 是否需要确认回复
}
enum MessageType {
// 连接管理
PING = 'PING',
PONG = 'PONG',
HEARTBEAT = 'HEARTBEAT',
CONNECTION_CONFIRMED = 'CONNECTION_CONFIRMED',
// 游戏房间
JOIN_ROOM = 'JOIN_ROOM',
LEAVE_ROOM = 'LEAVE_ROOM',
ROOM_STATE_UPDATE = 'ROOM_STATE_UPDATE',
// 游戏操作
PLACE_PLANES = 'PLACE_PLANES',
ATTACK_POSITION = 'ATTACK_POSITION',
ATTACK_RESULT = 'ATTACK_RESULT',
GAME_STATE_SYNC = 'GAME_STATE_SYNC',
// 系统消息
PLAYER_RECONNECT = 'PLAYER_RECONNECT',
PLAYER_DISCONNECT = 'PLAYER_DISCONNECT',
GAME_END = 'GAME_END',
ERROR = 'ERROR',
// 确认消息
ACK = 'ACK',
MESSAGE_RECEIVED = 'MESSAGE_RECEIVED'
}
2.2 关键消息定义
A. 游戏操作消息
// .NET 消息模型
public class AttackMessage
{
public string MessageId { get; set; } = Guid.NewGuid().ToString();
public string Type { get; set; } = "ATTACK_POSITION";
public long Timestamp { get; set; } = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
public string GameId { get; set; }
public string UserId { get; set; }
public AttackData Data { get; set; }
public int SequenceNumber { get; set; }
public bool RequiresAck { get; set; } = true;
}
public class AttackData
{
public Position Position { get; set; }
public string PlayerId { get; set; }
}
public class Position
{
public int X { get; set; }
public int Y { get; set; }
public string Coordinate { get; set; }
}
B. 游戏状态同步
public class GameStateSyncMessage
{
public string MessageId { get; set; } = Guid.NewGuid().ToString();
public string Type { get; set; } = "GAME_STATE_SYNC";
public long Timestamp { get; set; } = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
public string GameId { get; set; }
public GameStateData Data { get; set; }
public int SequenceNumber { get; set; }
}
public class GameStateData
{
public string CurrentPlayer { get; set; }
public string GamePhase { get; set; }
public List<GameMove> MoveHistory { get; set; }
public Dictionary<string, PlayerState> Players { get; set; }
public long LastUpdateTime { get; set; }
}
3. .NET WebSocket Hub 核心方法
3.1 游戏操作处理
[HubMethodName("AttackPosition")]
public async Task HandleAttack(AttackMessage message)
{
try
{
// 验证消息
if (!await ValidateMessage(message))
{
await SendError("Invalid attack message", message.MessageId);
return;
}
// 处理攻击逻辑
var result = await _gameService.ProcessAttack(
message.GameId,
message.UserId,
message.Data.Position
);
// 发送攻击结果给所有玩家
var resultMessage = new AttackResultMessage
{
MessageId = Guid.NewGuid().ToString(),
GameId = message.GameId,
Data = result,
SequenceNumber = await GetNextSequenceNumber(message.GameId)
};
await Clients.Group($"Game_{message.GameId}")
.SendAsync("AttackResult", resultMessage);
// 发送确认回复
if (message.RequiresAck)
{
await Clients.Caller.SendAsync("MessageReceived", new
{
OriginalMessageId = message.MessageId,
Status = "SUCCESS",
Timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds()
});
}
// 检查游戏是否结束
if (result.GameEnded)
{
await HandleGameEnd(message.GameId, result.Winner);
}
}
catch (Exception ex)
{
await SendError($"Attack processing failed: {ex.Message}", message.MessageId);
}
}
[HubMethodName("PlacePlanes")]
public async Task HandlePlacePlanes(PlacePlanesMessage message)
{
try
{
var isValid = await _gameService.ValidatePlacement(
message.GameId,
message.UserId,
message.Data.Planes
);
if (!isValid.Success)
{
await SendError(isValid.ErrorMessage, message.MessageId);
return;
}
await _gameService.SavePlacement(message.GameId, message.UserId, message.Data.Planes);
// 通知房间状态更新
var roomState = await _gameService.GetRoomState(message.GameId);
await Clients.Group($"Game_{message.GameId}")
.SendAsync("RoomStateUpdate", roomState);
// 确认消息
if (message.RequiresAck)
{
await Clients.Caller.SendAsync("MessageReceived", new
{
OriginalMessageId = message.MessageId,
Status = "SUCCESS"
});
}
}
catch (Exception ex)
{
await SendError($"Plane placement failed: {ex.Message}", message.MessageId);
}
}
3.2 心跳和连接管理
[HubMethodName("Heartbeat")]
public async Task HandleHeartbeat(HeartbeatMessage message)
{
await _connectionManager.UpdateLastActivity(Context.ConnectionId);
// 发送心跳响应
await Clients.Caller.SendAsync("HeartbeatResponse", new
{
ServerTime = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(),
ConnectionId = Context.ConnectionId,
Status = "ALIVE"
});
}
[HubMethodName("RequestGameState")]
public async Task HandleGameStateRequest(string gameId)
{
var gameState = await _gameService.GetCompleteGameState(gameId);
await Clients.Caller.SendAsync("GameStateSync", new GameStateSyncMessage
{
GameId = gameId,
Data = gameState,
SequenceNumber = await GetNextSequenceNumber(gameId)
});
}
4. 客户端断线重连策略
4.1 连接管理器
export class WebSocketManager {
private connection: HubConnection | null = null
private reconnectAttempts = 0
private readonly maxReconnectAttempts = 5
private readonly baseReconnectDelay = 1000
private heartbeatInterval: NodeJS.Timeout | null = null
private messageQueue: QueuedMessage[] = []
private sequenceNumber = 0
private lastReceivedSequence = 0
constructor(
private gameId: string,
private authToken: string,
private onMessage: (message: WebSocketMessage) => void,
private onConnectionStateChange: (state: ConnectionState) => void
) {}
async connect(): Promise<void> {
try {
this.connection = new HubConnectionBuilder()
.withUrl(`wss://your-domain.com/game-hub?gameId=${this.gameId}`, {
accessTokenFactory: () => this.authToken,
transport: HttpTransportType.WebSockets
})
.withAutomaticReconnect({
nextRetryDelayInMilliseconds: (retryContext) => {
// 指数退避策略
return Math.min(
this.baseReconnectDelay * Math.pow(2, retryContext.previousRetryCount),
30000
)
}
})
.configureLogging(LogLevel.Information)
.build()
// 注册消息处理器
this.setupMessageHandlers()
// 开始连接
await this.connection.start()
this.onConnectionStateChange('CONNECTED')
this.startHeartbeat()
this.reconnectAttempts = 0
// 重连后请求完整游戏状态
await this.requestGameState()
} catch (error) {
console.error('WebSocket connection failed:', error)
await this.handleConnectionError()
}
}
private setupMessageHandlers(): void {
if (!this.connection) return
// 连接确认
this.connection.on('ConnectionConfirmed', (data) => {
console.log('Connection confirmed:', data)
this.onConnectionStateChange('CONNECTED')
})
// 攻击结果
this.connection.on('AttackResult', (message: WebSocketMessage) => {
this.handleReceivedMessage(message)
})
// 游戏状态同步
this.connection.on('GameStateSync', (message: WebSocketMessage) => {
this.handleReceivedMessage(message)
this.syncSequenceNumber(message.sequenceNumber)
})
// 房间状态更新
this.connection.on('RoomStateUpdate', (message: WebSocketMessage) => {
this.handleReceivedMessage(message)
})
// 玩家断线/重连
this.connection.on('PlayerDisconnected', (data) => {
this.onMessage({
messageId: Date.now().toString(),
type: 'PLAYER_DISCONNECT',
timestamp: Date.now(),
gameId: this.gameId,
userId: data.userId,
data,
sequenceNumber: 0
})
})
this.connection.on('PlayerReconnected', (data) => {
this.onMessage({
messageId: Date.now().toString(),
type: 'PLAYER_RECONNECT',
timestamp: Date.now(),
gameId: this.gameId,
userId: data.userId,
data,
sequenceNumber: 0
})
})
// 消息确认
this.connection.on('MessageReceived', (ack) => {
this.handleMessageAck(ack.originalMessageId)
})
// 心跳响应
this.connection.on('HeartbeatResponse', (data) => {
// 计算网络延迟
const now = Date.now()
const latency = now - (data.clientTimestamp || now)
console.log(`Network latency: ${latency}ms`)
})
// 错误处理
this.connection.on('Error', (error) => {
console.error('Server error:', error)
this.onMessage({
messageId: Date.now().toString(),
type: 'ERROR',
timestamp: Date.now(),
gameId: this.gameId,
userId: '',
data: error,
sequenceNumber: 0
})
})
// 连接状态变化
this.connection.onreconnecting(() => {
this.onConnectionStateChange('RECONNECTING')
this.stopHeartbeat()
})
this.connection.onreconnected(() => {
this.onConnectionStateChange('CONNECTED')
this.startHeartbeat()
this.resendQueuedMessages()
this.requestGameState() // 重连后同步状态
})
this.connection.onclose(() => {
this.onConnectionStateChange('DISCONNECTED')
this.stopHeartbeat()
})
}
private handleReceivedMessage(message: WebSocketMessage): void {
// 检查消息序列号,处理消息丢失
if (message.sequenceNumber && message.sequenceNumber > this.lastReceivedSequence + 1) {
console.warn(`Message sequence gap detected: expected ${this.lastReceivedSequence + 1}, got ${message.sequenceNumber}`)
// 请求缺失的消息或完整状态同步
this.requestGameState()
}
this.lastReceivedSequence = Math.max(this.lastReceivedSequence, message.sequenceNumber || 0)
this.onMessage(message)
}
async sendMessage(type: MessageType, data: any, requiresAck: boolean = true): Promise<void> {
const message: WebSocketMessage = {
messageId: this.generateMessageId(),
type,
timestamp: Date.now(),
gameId: this.gameId,
userId: this.getCurrentUserId(),
data,
sequenceNumber: ++this.sequenceNumber,
requiresAck
}
if (this.connection?.state === 'Connected') {
try {
await this.sendDirectMessage(message)
// 如果需要确认,加入等待队列
if (requiresAck) {
this.messageQueue.push({
message,
attempts: 0,
timestamp: Date.now()
})
}
} catch (error) {
console.error('Send message failed:', error)
this.queueMessage(message)
}
} else {
this.queueMessage(message)
}
}
private async sendDirectMessage(message: WebSocketMessage): Promise<void> {
switch (message.type) {
case 'ATTACK_POSITION':
await this.connection!.invoke('AttackPosition', message)
break
case 'PLACE_PLANES':
await this.connection!.invoke('PlacePlanes', message)
break
case 'HEARTBEAT':
await this.connection!.invoke('Heartbeat', message)
break
default:
console.warn(`Unknown message type: ${message.type}`)
}
}
private startHeartbeat(): void {
this.heartbeatInterval = setInterval(async () => {
try {
await this.sendMessage('HEARTBEAT', {
clientTimestamp: Date.now()
}, false)
} catch (error) {
console.error('Heartbeat failed:', error)
}
}, 10000) // 10秒心跳
}
private stopHeartbeat(): void {
if (this.heartbeatInterval) {
clearInterval(this.heartbeatInterval)
this.heartbeatInterval = null
}
}
private async requestGameState(): Promise<void> {
if (this.connection?.state === 'Connected') {
try {
await this.connection.invoke('RequestGameState', this.gameId)
} catch (error) {
console.error('Request game state failed:', error)
}
}
}
private queueMessage(message: WebSocketMessage): void {
this.messageQueue.push({
message,
attempts: 0,
timestamp: Date.now()
})
}
private async resendQueuedMessages(): Promise<void> {
const pendingMessages = [...this.messageQueue]
this.messageQueue = []
for (const queuedMessage of pendingMessages) {
if (queuedMessage.attempts < 3) { // 最多重试3次
try {
await this.sendDirectMessage(queuedMessage.message)
queuedMessage.attempts++
if (queuedMessage.message.requiresAck) {
this.messageQueue.push(queuedMessage)
}
} catch (error) {
console.error('Resend message failed:', error)
queuedMessage.attempts++
if (queuedMessage.attempts < 3) {
this.messageQueue.push(queuedMessage)
}
}
}
}
}
private handleMessageAck(messageId: string): void {
this.messageQueue = this.messageQueue.filter(
queuedMessage => queuedMessage.message.messageId !== messageId
)
}
private generateMessageId(): string {
return `${Date.now()}_${Math.random().toString(36).substr(2, 9)}`
}
private getCurrentUserId(): string {
// 从认证token或存储中获取用户ID
return 'current-user-id' // 实现细节
}
async disconnect(): Promise<void> {
this.stopHeartbeat()
if (this.connection) {
await this.connection.stop()
}
this.onConnectionStateChange('DISCONNECTED')
}
}
interface QueuedMessage {
message: WebSocketMessage
attempts: number
timestamp: number
}
type ConnectionState = 'CONNECTING' | 'CONNECTED' | 'RECONNECTING' | 'DISCONNECTED'
5. 关键 .NET 服务实现
5.1 连接管理服务
public interface IConnectionManager
{
Task RegisterConnection(string userId, string connectionId, string gameId);
Task UnregisterConnection(string connectionId);
Task UpdateLastActivity(string connectionId);
Task<List<string>> GetUserConnections(string userId);
Task<bool> IsUserOnline(string userId);
}
public class ConnectionManager : IConnectionManager
{
private readonly IMemoryCache _cache;
private readonly ILogger<ConnectionManager> _logger;
public ConnectionManager(IMemoryCache cache, ILogger<ConnectionManager> logger)
{
_cache = cache;
_logger = logger;
}
public async Task RegisterConnection(string userId, string connectionId, string gameId)
{
var connectionInfo = new ConnectionInfo
{
UserId = userId,
ConnectionId = connectionId,
GameId = gameId,
ConnectedAt = DateTime.UtcNow,
LastActivity = DateTime.UtcNow
};
_cache.Set($"connection_{connectionId}", connectionInfo, TimeSpan.FromHours(2));
// 更新用户连接列表
var userConnections = await GetUserConnections(userId);
userConnections.Add(connectionId);
_cache.Set($"user_connections_{userId}", userConnections, TimeSpan.FromHours(2));
_logger.LogInformation($"User {userId} connected with connection {connectionId}");
}
public async Task UnregisterConnection(string connectionId)
{
if (_cache.TryGetValue($"connection_{connectionId}", out ConnectionInfo connectionInfo))
{
_cache.Remove($"connection_{connectionId}");
var userConnections = await GetUserConnections(connectionInfo.UserId);
userConnections.Remove(connectionId);
_cache.Set($"user_connections_{connectionInfo.UserId}", userConnections);
_logger.LogInformation($"Connection {connectionId} unregistered");
}
}
public async Task UpdateLastActivity(string connectionId)
{
if (_cache.TryGetValue($"connection_{connectionId}", out ConnectionInfo connectionInfo))
{
connectionInfo.LastActivity = DateTime.UtcNow;
_cache.Set($"connection_{connectionId}", connectionInfo, TimeSpan.FromHours(2));
}
}
public async Task<List<string>> GetUserConnections(string userId)
{
if (_cache.TryGetValue($"user_connections_{userId}", out List<string> connections))
{
return connections;
}
return new List<string>();
}
public async Task<bool> IsUserOnline(string userId)
{
var connections = await GetUserConnections(userId);
return connections.Any();
}
}
public class ConnectionInfo
{
public string UserId { get; set; }
public string ConnectionId { get; set; }
public string GameId { get; set; }
public DateTime ConnectedAt { get; set; }
public DateTime LastActivity { get; set; }
}
5.2 消息确认和重发机制
public class MessageReliabilityService
{
private readonly IMemoryCache _cache;
private readonly IHubContext<GameHub> _hubContext;
public async Task SendReliableMessage(string connectionId, string method, object data)
{
var messageId = Guid.NewGuid().ToString();
var message = new ReliableMessage
{
MessageId = messageId,
Method = method,
Data = data,
SentAt = DateTime.UtcNow,
Attempts = 0
};
// 存储消息等待确认
_cache.Set($"pending_message_{messageId}", message, TimeSpan.FromMinutes(5));
// 发送消息
await _hubContext.Clients.Client(connectionId).SendAsync(method, data);
// 设置超时重发
_ = Task.Delay(TimeSpan.FromSeconds(5)).ContinueWith(async _ =>
{
await CheckAndResendMessage(messageId, connectionId);
});
}
public async Task ConfirmMessage(string messageId)
{
_cache.Remove($"pending_message_{messageId}");
}
private async Task CheckAndResendMessage(string messageId, string connectionId)
{
if (_cache.TryGetValue($"pending_message_{messageId}", out ReliableMessage message))
{
message.Attempts++;
if (message.Attempts < 3) // 最多重试3次
{
await _hubContext.Clients.Client(connectionId).SendAsync(message.Method, message.Data);
// 更新缓存
_cache.Set($"pending_message_{messageId}", message, TimeSpan.FromMinutes(5));
// 再次设置超时
_ = Task.Delay(TimeSpan.FromSeconds(5)).ContinueWith(async _ =>
{
await CheckAndResendMessage(messageId, connectionId);
});
}
else
{
// 超过重试次数,移除消息
_cache.Remove($"pending_message_{messageId}");
}
}
}
}
public class ReliableMessage
{
public string MessageId { get; set; }
public string Method { get; set; }
public object Data { get; set; }
public DateTime SentAt { get; set; }
public int Attempts { get; set; }
}
6. Startup.cs 配置
public void ConfigureServices(IServiceCollection services)
{
// SignalR配置
services.AddSignalR(options =>
{
options.EnableDetailedErrors = true;
options.KeepAliveInterval = TimeSpan.FromSeconds(10);
options.ClientTimeoutInterval = TimeSpan.FromSeconds(30);
options.HandshakeTimeout = TimeSpan.FromSeconds(15);
}).AddJsonProtocol(options =>
{
options.PayloadSerializerOptions.PropertyNamingPolicy = JsonNamingPolicy.CamelCase;
});
// CORS配置
services.AddCors(options =>
{
options.AddDefaultPolicy(builder =>
{
builder
.WithOrigins("https://your-miniprogram-domain.com")
.AllowAnyHeader()
.AllowAnyMethod()
.AllowCredentials();
});
});
// 服务注册
services.AddScoped<IGameService, GameService>();
services.AddSingleton<IConnectionManager, ConnectionManager>();
services.AddSingleton<MessageReliabilityService>();
}
public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
{
app.UseCors();
app.UseAuthentication();
app.UseAuthorization();
// SignalR Hub路由
app.UseRouting();
app.UseEndpoints(endpoints =>
{
endpoints.MapHub<GameHub>("/game-hub");
endpoints.MapControllers();
});
}
这个WebSocket接口规范提供了完整的断线重连策略,包括指数退避重连、消息队列管理、心跳机制、状态同步等关键功能。.NET后端通过SignalR提供了可靠的WebSocket服务,支持自动重连和消息确认机制。