27.网络通信-套接字Socket-TCP通信-同步-心跳消息-实现
27.1 知识点
什么是心跳消息
所谓心跳消息,就是在长连接中,客户端和服务端之间定期发送的一种特殊的数据包,用于通知对方自己还在线,以确保长连接的有效性。由于其发送的时间间隔往往是固定的持续的,就像是心跳一样一直存在,所以我们称之为心跳消息。
为什么需要心跳消息
避免非正常关闭客户端时,服务器无法正常收到关闭连接消息。通过心跳消息我们可以自定义超时判断,如果超时没有收到客户端消息,证明客户端已经断开连接。
避免客户端长期不发送消息,防火墙或者路由器会断开连接,我们可以通过心跳消息一直保持活跃状态。
实现心跳消息
主要功能
客户端:主要功能是定时发送消息。
服务器端:主要功能是不停检测上次收到某客户端消息的时间,如果超时则认为连接已经断开。
定义心跳消息类
public class HeartMessage : BaseMessage
{
public override int GetBytesNum()
{
// 需要存储 ID 和长度,两个 int,共 8 字节
return 4 + 4;
}
public override int Reading(byte[] bytes, int beginIndex = 0)
{
// 没有消息体,0 字节
return 0;
}
public override byte[] Writing()
{
// 写入 ID 和长度到字节数组并返回
int index = 0;
byte[] bytes = new byte[GetBytesNum()];
WriteInt(bytes, GetID(), ref index);
WriteInt(bytes, 0, ref index);
return bytes;
}
public override int GetID()
{
return 999;
}
}
TcpNetManager实现连接时每隔一段时间发送心跳消息
// 发送心跳消息的间隔时间
private int SEND_HEART_MSG_TIME = 2;
private HeartMessage heartMessage = new HeartMessage();
protected override void Awake()
{
base.Awake();
// 客户端循环定时给服务端发送心跳消息
InvokeRepeating("SendHeartMsg", 0, SEND_HEART_MSG_TIME);
}
private void SendHeartMsg()
{
if (isConnected)
Send(heartMessage);
}
在服务端 ClientSocket 类定义上次心跳时间和超时时间
// 上一次收到消息的时间
private long frontTime = -1;
// 超时时间
private static int TIME_OUT_TIME = 10;
在服务端 ClientSocket 类定义检查心跳是否超时的方法。可以在构造函数时就开一个线程每隔几秒检查一次,但是比较耗性能。可以在收消息的时候检查,但是也会每帧判断,可以自行加逻辑优化
public ClientSocket(Socket clientSocket)
{
this.clientID = CLIENT_BEGIN_ID; // 初始化客户端ID
this.clientSocket = clientSocket; // 初始化套接字
++CLIENT_BEGIN_ID; // 为下一个客户端分配不同的ID
//我们现在为了方便大家理解 所以开了一个线程专门计时 但是这种方式比较消耗性能 不建议这样使用
//ThreadPool.QueueUserWorkItem(CheckTimeOut);
}
/// <summary>
/// 间隔一段时间 检测一次超时 如果超时 就会主动断开该客户端的连接
/// </summary>
/// <param name="obj"></param>
private void CheckTimeOut(/*object obj*/)
{
//while (Connected)
//{
if (frontTime != -1 &&
DateTime.Now.Ticks / TimeSpan.TicksPerSecond - frontTime >= TIME_OUT_TIME)
{
Program.serverSocket.AddDelSocket(this);
//break;
}
//Thread.Sleep(5000);
//}
}
// 接收来自客户端的消息
public void Receive()
{
if (!isClientConnected)
{
...
}
try
{
if (clientSocket.Available > 0)// 如果套接字中有可读数据
{
...
}
//检测 是否超时
CheckTimeOut();
}
catch (Exception e)
{
...
}
}
在服务端 ClientSocket 类分包、黏包方法和处理消息方法中加上对心跳消息的处理
// 处理接受消息 分包、黏包问题的方法
private void HandleReceiveMsg(byte[] receiveBytes, int receiveNum)
{
...
// 缓存数组长度减去当前解析的位置假如大于消息长度 且消息长度不能是-1(-1说明没有解析消息长度 那就更不能解析消息体了)说明可以解析消息体
if (cacheNum - nowIndex >= msgLength && msgLength != -1)
{
// 解析消息体
BaseMessage baseMessage = null;
switch (msgID)
{
case 1001:
baseMessage = new PlayerMessage();
baseMessage.Reading(cacheBytes, nowIndex);
break;
case 1003:
baseMessage = new QuitMessage();
//QuitMessage没有消息体 不用反序列化
break;
case 999:
baseMessage = new HeartMessage();
//由于该消息没有消息体 所以都不用反序列化
break;
}
...
}
else
{
...
}
}
// 处理接收到的消息
private void HandleMessage(object obj)
{
BaseMessage baseMessage = obj as BaseMessage;
if (baseMessage is PlayerMessage)
{
.../
}
else if(baseMessage is QuitMessage)
{
...
}
else if (baseMessage is HeartMessage)
{
//收到心跳消息 记录收到消息的时间
frontTime = DateTime.Now.Ticks / TimeSpan.TicksPerSecond;
Console.WriteLine("收到心跳消息");
}
}
为了测试心跳消息超时是否能检测生效,暂时把 TcpNetManager 断开发送退出消息的逻辑注释
// 关闭连接
public void Close()
{
print("客户端主动断开连接");
if (socket != null) // 如果套接字对象存在
{
////主动发送一条断开的消息个服务端
//QuitMessage quitMessage = new QuitMessage();
////这里不能用我们封装的Send方法 因为Send方法是开一个线程发送的 可能还没发就直接被断开了
//socket.Send(quitMessage.Writing());
//socket.Shutdown(SocketShutdown.Both); // 关闭套接字的发送和接收
//socket.Disconnect(false);//手动停止连接 参数意思是这个socket以后还用不用
//socket.Close(); // 关闭套接字连接
socket = null;//当前socket不会再用了 置空
isConnected = false; // 标记连接已关闭
}
}
开启服务端和客户端后,关闭客户端等10秒可以检测到客户端断开连接
总结
心跳消息是长连接项目中必备的一套逻辑规则。通过它可以帮助我们在服务器端及时的释放掉失效的 socket,可以有效避免当客户端非正常关闭时,服务器端不能及时判断连接已断开。
27.2 知识点代码
TcpNetManager
using System;
using System.Collections.Generic;
using System.Net;
using System.Net.Sockets;
using System.Threading;
public class TcpNetManager : BaseSingletonInMonoBehaviour<TcpNetManager>
{
private Socket socket; // 创建Socket对象,用于网络通信
private Queue<BaseMessage> sendMsgQueue = new Queue<BaseMessage>(); // 创建一个队列,用于存储待发送的消息
private Queue<BaseMessage> receiveQueue = new Queue<BaseMessage>(); // 创建一个队列,用于存储接收到的消息
private bool isConnected = false; // 用于标识是否已连接到服务器
//private byte[] receiveBytes = new byte[1024 * 1024]; // 创建一个字节数组,用于存储接收到的数据
//private int receiveNum; // 用于存储接收到的字节数
//用于处理分包时 缓存的 字节数组 和 字节数组长度
private byte[] cacheBytes = new byte[1024 * 1024];
private int cacheNum = 0;
//发送心跳消息的间隔时间
private int SEND_HEART_MSG_TIME = 2;
private HeartMessage heartMessage = new HeartMessage();
protected override void Awake()
{
base.Awake();
//客户端循环定时给服务端发送心跳消息
InvokeRepeating("SendHeartMsg", 0, SEND_HEART_MSG_TIME);
}
private void SendHeartMsg()
{
if (isConnected)
Send(heartMessage);
}
// 连接服务器
public void Connect(string ip, int port)
{
if (isConnected) // 如果已连接,则直接返回
return;
if (socket == null) // 如果套接字为空,创建一个套接字对象
socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
IPEndPoint ipPoint = new IPEndPoint(IPAddress.Parse(ip), port); // 创建一个IP终结点对象
try
{
// 尝试连接到指定的IP地址和端口
socket.Connect(ipPoint);
isConnected = true; // 标记已连接
ThreadPool.QueueUserWorkItem(SendMsg); // 创建并启动发送消息的线程
ThreadPool.QueueUserWorkItem(ReceiveMsg); // 创建并启动接收消息的线程
}
catch (SocketException e)
{
if (e.ErrorCode == 10061) // 如果连接被服务器拒绝
print("服务器拒绝连接");
else
print("连接失败" + e.ErrorCode + e.Message); // 打印连接失败的信息
}
}
// 关闭连接
public void Close()
{
print("客户端主动断开连接");
if (socket != null) // 如果套接字对象存在
{
//主动发送一条断开的消息个服务端
QuitMessage quitMessage = new QuitMessage();
//这里不能用我们封装的Send方法 因为Send方法是开一个线程发送的 可能还没发就直接被断开了
socket.Send(quitMessage.Writing());
socket.Shutdown(SocketShutdown.Both); // 关闭套接字的发送和接收
socket.Disconnect(false);//手动停止连接 参数意思是这个socket以后还用不用
socket.Close(); // 关闭套接字连接
socket = null;//当前socket不会再用了 置空
isConnected = false; // 标记连接已关闭
}
}
// 当对象被销毁时,确保关闭连接
private void OnDestroy()
{
Close(); // 调用关闭连接的方法
}
/// <summary>
/// 用于测试 直接发字节数组的方法
/// </summary>
/// <param name="bytes"></param>
public void SendTest(byte[] bytes)
{
socket.Send(bytes);
}
// 发送消息
public void Send(BaseMessage baseMessage)
{
sendMsgQueue.Enqueue(baseMessage); // 将消息添加到发送消息队列
}
// 在独立线程中处理发送消息的逻辑
private void SendMsg(object obj)
{
while (isConnected) // 只要连接有效
{
if (sendMsgQueue.Count > 0) // 如果发送消息队列中有待发送的消息
{
// 从队列中取出消息并发送到服务器
socket.Send(sendMsgQueue.Dequeue().Writing());
}
}
}
// 在独立线程中处理接收消息的逻辑
private void ReceiveMsg(object obj)
{
while (isConnected) // 只要连接有效
{
if (socket.Available > 0) // 如果有可接收的数据
{
//临时字节数组
byte[] receiveBytes = new byte[1024 * 1024];
// 接收从服务器发送来的数据,并将数据转换成字符串后存储到接收消息队列 得到字节数组长度
int receiveNum = socket.Receive(receiveBytes);
//处理接受消息 分包、黏包问题
HandleReceiveMsg(receiveBytes, receiveNum);
////首先把收到字节数组的前4个字节 读取出来得到ID
//int msgID = BitConverter.ToInt32(receiveBytes, 0);
//BaseMessage baseMessage = null;
//switch (msgID)
//{
// case 1001:
// PlayerMessage playerMessage = new PlayerMessage();
// playerMessage.Reading(receiveBytes, 4);
// baseMessage = playerMessage;
// break;
//}
////如果消息为空 那证明是不知道类型的消息 没有解析
//if (baseMessage == null)
// continue;
////收到消息 解析消息为字符串 并放入公共容器
//receiveQueue.Enqueue(baseMessage);
}
}
}
// 处理接受消息 分包、黏包问题的方法
private void HandleReceiveMsg(byte[] receiveBytes, int receiveNum)
{
int msgID = 0; // 消息ID
int msgLength = 0; // 消息长度
int nowIndex = 0; // 当前消息解析到哪一位
// 当接收到消息时,检查是否有之前缓存的数据
// 如果有,说明有分包,将新收到的字节数组拼接到后面,数组长度加上缓存长度
// 如果没有,缓存数组是空数组,缓存长度是0,不影响后面的逻辑
receiveBytes.CopyTo(cacheBytes, cacheNum);
cacheNum += receiveNum;
while (true)
{
// 在每次循环开始时将消息长度设置为-1作为标记,以避免上一次解析的数据影响当前的判断
msgLength = -1;
// 如果当前的缓存数组长度大于8 那么就可以解析这一个包的消息ID和消息长度 移动解析位置
if (cacheNum - nowIndex >= 8)
{
// 解析消息ID
msgID = BitConverter.ToInt32(cacheBytes, nowIndex);
nowIndex += 4;
// 解析消息长度
msgLength = BitConverter.ToInt32(cacheBytes, nowIndex);
nowIndex += 4;
}
// 缓存数组长度减去当前解析的位置假如大于消息长度 且消息长度不能是-1(-1说明没有解析消息长度 那就更不能解析消息体了)说明可以解析消息体
if (cacheNum - nowIndex >= msgLength && msgLength != -1)
{
// 解析消息体
BaseMessage baseMessage = null;
switch (msgID)
{
case 1001:
PlayerMessage playerMessage = new PlayerMessage();
playerMessage.Reading(cacheBytes, nowIndex);
baseMessage = playerMessage;
break;
}
// 如果成功解析了消息体,将消息加入接收队列
if (baseMessage != null)
receiveQueue.Enqueue(baseMessage);
//移动解析位置 加上消息体长度
nowIndex += msgLength;
// 如果刚好解析完当前缓存数组所有内容,说明这个包没有黏包,重置缓存并退出循环,解析结束
if (nowIndex == cacheNum)
{
cacheNum = 0;
break;
}
}
// 如果不满足条件,表明存在分包的情况,需要将当前接收的内容记录下来 以便在下次接收到消息后继续处理
else
{
// 如果已经解析了消息ID和消息长度,但没有成功解析消息体,需要减去nowIndex解析位置的偏移。
// 要保留完整的消息ID和消息长度,以便下次完整解析。
if (msgLength != -1)
nowIndex -= 8;
// 使用Array.Copy方法,将剩余未解析的字节数组内容移到前面,用于缓存下次继续解析。
// 参数1: 要拷贝的数组 这里是原始的缓存数组 cacheBytes。
// 参数2: 从第几个索引开始拷贝后面的内容 这里是 nowIndex,表示从未解析的部分开始,把nowIndex到尾部的字节元素都要拷贝
// 参数3: 拷贝到的目标数组 也是原始的缓存数组 cacheBytes,因此在这里实际上是在原数组中进行移动操作。
// 参数4: 目标数组开始索引 这里是0,表示将数据移动到数组的开头。
// 参数5: 拷贝长度 这里是 cacheNum - nowIndex,表示要移动的字节数,即未解析部分的长度。cacheNum代表原先缓存数组所有要解析的字节数组长度,减去nowIndex代表未解析部分的长度。
Array.Copy(cacheBytes, nowIndex, cacheBytes, 0, cacheNum - nowIndex);
// 更新缓存的长度,减去已解析的部分,以便在下次继续解析时正确处理未解析的内容。
cacheNum = cacheNum - nowIndex;
break;
}
}
}
void Update()
{
// 在Unity的每一帧中检查是否有待处理的接收消息,如果有,则打印出来
if (receiveQueue.Count > 0)
{
BaseMessage baseMessage = receiveQueue.Dequeue();
if (baseMessage is PlayerMessage)
{
PlayerMessage playerMessage = (baseMessage as PlayerMessage);
print(playerMessage.playerID);
print(playerMessage.playerData.name);
print(playerMessage.playerData.lev);
print(playerMessage.playerData.atk);
}
}
}
}
Lesson27_网络通信_套接字Socket_TCP通信_同步_心跳消息_实现
public class Lesson27_网络通信_套接字Socket_TCP通信_同步_心跳消息_实现 : MonoBehaviour
{
public InputField InputField;
public Button sendButton;
public Button nianSendButton;
public Button fenSendButton;
public Button fenNiansendButton;
void Start()
{
#region 客户端逻辑
TcpNetManager.Instance.Connect("127.0.0.1", 8080);
//直接发消息测试
sendButton.onClick.AddListener(() =>
{
PlayerMessage playerMessage = new PlayerMessage();
playerMessage.playerID = 1111;
playerMessage.playerData = new PlayerData();
playerMessage.playerData.name = "韬老狮客户端发送的信息";
playerMessage.playerData.atk = 22;
playerMessage.playerData.lev = 10;
TcpNetManager.Instance.Send(playerMessage);
});
//黏包测试
nianSendButton.onClick.AddListener(() =>
{
PlayerMessage playerMessage1 = new PlayerMessage();
playerMessage1.playerID = 1001;
playerMessage1.playerData = new PlayerData();
playerMessage1.playerData.name = "韬老狮1";
playerMessage1.playerData.atk = 1;
playerMessage1.playerData.lev = 1;
PlayerMessage payerMessage = new PlayerMessage();
payerMessage.playerID = 1002;
payerMessage.playerData = new PlayerData();
payerMessage.playerData.name = "韬老狮2";
payerMessage.playerData.atk = 2;
payerMessage.playerData.lev = 2;
//黏包 把两个对象的字节数组合并在一起发送
byte[] bytes = new byte[playerMessage1.GetBytesNum() + payerMessage.GetBytesNum()];
playerMessage1.Writing().CopyTo(bytes, 0);
payerMessage.Writing().CopyTo(bytes, playerMessage1.GetBytesNum());
TcpNetManager.Instance.SendTest(bytes);
});
//分包测试 使用异步函数
fenSendButton.onClick.AddListener(async () =>
{
PlayerMessage playerMessage = new PlayerMessage();
playerMessage.playerID = 1003;
playerMessage.playerData = new PlayerData();
playerMessage.playerData.name = "韬老狮1";
playerMessage.playerData.atk = 3;
playerMessage.playerData.lev = 3;
byte[] bytes = playerMessage.Writing();
//分包
byte[] bytes1 = new byte[10];
byte[] bytes2 = new byte[bytes.Length - 10];
//分成第一个包
Array.Copy(bytes, 0, bytes1, 0, 10);
//第二个包
Array.Copy(bytes, 10, bytes2, 0, bytes.Length - 10);
TcpNetManager.Instance.SendTest(bytes1);
await Task.Delay(500);//延迟执行半秒进行分包
TcpNetManager.Instance.SendTest(bytes2);
});
//分包、黏包测试
fenNiansendButton.onClick.AddListener(async () =>
{
PlayerMessage playerMessage = new PlayerMessage();
playerMessage.playerID = 1004;
playerMessage.playerData = new PlayerData();
playerMessage.playerData.name = "韬老狮1";
playerMessage.playerData.atk = 4;
playerMessage.playerData.lev = 4;
PlayerMessage playerMessage2 = new PlayerMessage();
playerMessage2.playerID = 1005;
playerMessage2.playerData = new PlayerData();
playerMessage2.playerData.name = "韬老狮2";
playerMessage2.playerData.atk = 5;
playerMessage2.playerData.lev = 5;
byte[] bytes1 = playerMessage.Writing();//消息A
byte[] bytes2 = playerMessage2.Writing();//消息B
byte[] bytes2_1 = new byte[10];
byte[] bytes2_2 = new byte[bytes2.Length - 10];
//分成第一个包
Array.Copy(bytes2, 0, bytes2_1, 0, 10);
//第二个包
Array.Copy(bytes2, 10, bytes2_2, 0, bytes2.Length - 10);
//消息A和消息B前一段的 黏包
byte[] bytes1_2 = new byte[bytes1.Length + bytes2_1.Length];
bytes1.CopyTo(bytes1_2, 0);
bytes2_1.CopyTo(bytes1_2, bytes1.Length);
TcpNetManager.Instance.SendTest(bytes1_2);
await Task.Delay(500);
TcpNetManager.Instance.SendTest(bytes2_2);
});
#endregion
#region 知识点一 什么是心跳消息?
//所谓心跳消息,就是在长连接中,客户端和服务端之间定期发送的一种特殊的数据包
//用于通知对方自己还在线,以确保长连接的有效性
//由于其发送的时间间隔往往是固定的持续的,就像是心跳一样一直存在
//所以我们称之为心跳消息
#endregion
#region 知识点二 为什么需要心跳消息?
//1.避免非正常关闭客户端时,服务器无法正常收到关闭连接消息
//通过心跳消息我们可以自定义超时判断,如果超时没有收到客户端消息,证明客户端已经断开连接
//2.避免客户端长期不发送消息,防火墙或者路由器会断开连接,我们可以通过心跳消息一直保持活跃状态
#endregion
#region 知识点三 实现心跳消息
//客户端
//主要功能:定时发送消息
//服务器
//主要功能:不停检测上次收到某客户端消息的时间,如果超时则认为连接已经断开
#endregion
#region 总结
//心跳消息是长连接项目中必备的一套逻辑规则
//通过它可以帮助我们在服务器端及时的释放掉失效的socket
//可以有效避免当客户端非正常关闭时,服务器端不能及时判断连接已断开
#endregion
}
}
HeartMessage
using System.Collections;
using System.Collections.Generic;
using UnityEngine;
public class HeartMessage : BaseMessage
{
public override int GetBytesNum()
{
//需要存储 ID和长度 两个int 8字节
return 4 + 4;
}
public override int Reading(byte[] bytes, int beginIndex = 0)
{
//没有消息体 0字节
return 0;
}
public override byte[] Writing()
{
//写入ID和长度到字节数组并返回
int index = 0;
byte[] bytes = new byte[GetBytesNum()];
WriteInt(bytes, GetID(), ref index);
WriteInt(bytes, 0, ref index);
return bytes;
}
public override int GetID()
{
return 999;
}
}
ClientSocket
using System;
using System.Collections.Generic;
using System.Net.Sockets;
using System.Text;
using System.Threading;
namespace Lesson27_网络通信_套接字Socket_TCP通信_同步_心跳消息_实现
{
class ClientSocket
{
private static int CLIENT_BEGIN_ID = 1; // 静态变量,用于为客户端分配唯一的客户端ID
public int clientID; // 客户端的唯一ID
public Socket clientSocket; // 与客户端通信的套接字对象
//用于处理分包时 缓存的 字节数组 和 字节数组长度
private byte[] cacheBytes = new byte[1024 * 1024];
private int cacheNum = 0;
//上一次收到消息的时间
private long frontTime = -1;
//超时时间
private static int TIME_OUT_TIME = 10;
/// <summary>
/// 是否是连接状态
/// </summary>
public bool isClientConnected => this.clientSocket.Connected; // 判断套接字是否处于连接状态
public ClientSocket(Socket clientSocket)
{
this.clientID = CLIENT_BEGIN_ID; // 初始化客户端ID
this.clientSocket = clientSocket; // 初始化套接字
++CLIENT_BEGIN_ID; // 为下一个客户端分配不同的ID
//我们现在为了方便大家理解 所以开了一个线程专门计时 但是这种方式比较消耗性能 不建议这样使用
//ThreadPool.QueueUserWorkItem(CheckTimeOut);
}
/// <summary>
/// 间隔一段时间 检测一次超时 如果超时 就会主动断开该客户端的连接
/// </summary>
/// <param name="obj"></param>
private void CheckTimeOut(/*object obj*/)
{
//while (Connected)
//{
if (frontTime != -1 &&
DateTime.Now.Ticks / TimeSpan.TicksPerSecond - frontTime >= TIME_OUT_TIME)
{
Program.serverSocket.AddDelSocket(this);
//break;
}
//Thread.Sleep(5000);
//}
}
// 关闭套接字连接
public void Close()
{
if (clientSocket != null)
{
clientSocket.Shutdown(SocketShutdown.Both); // 关闭套接字的读写
clientSocket.Close(); // 关闭套接字连接
clientSocket = null;
}
}
// 发送消息给客户端
public void Send(BaseMessage baseMessage)
{
if (isClientConnected)
{
try
{
clientSocket.Send(baseMessage.Writing()); // 将消息编码为UTF-8字节数组并发送给客户端
}
catch (Exception e)
{
Console.WriteLine("发消息出错" + e.Message);
//解析报错也加入到待移除客户端列表
Program.serverSocket.AddDelSocket(this);
//Close(); // 如果发送出现异常,关闭套接字连接
}
}
else
{
//断开连接的话加入到待移除客户端列表
Program.serverSocket.AddDelSocket(this);
}
}
// 接收来自客户端的消息
public void Receive()
{
if (!isClientConnected)
{
//断开连接的话加入到待移除客户端列表
Program.serverSocket.AddDelSocket(this);
return;
}
try
{
if (clientSocket.Available > 0)// 如果套接字中有可读数据
{
byte[] result = new byte[1024 * 5]; // 创建一个缓冲区来存储接收到的数据
int receiveNum = clientSocket.Receive(result);// 从套接字接收数据并存储在缓冲区中
HandleReceiveMsg(result,receiveNum);
////收到数据后 先读取4个字节 转为ID 才知道用哪一个类型去处理反序列化
//int msgID = BitConverter.ToInt32(result, 0);
//BaseMessage baseMessage = null;
//switch (msgID)
//{
// case 1001:
// baseMessage = new PlayerMessage();
// baseMessage.Reading(result, 4);
// break;
//}
//if (baseMessage == null)
// return;
//ThreadPool.QueueUserWorkItem(HandleMessage, baseMessage);
}
//检测 是否超时
CheckTimeOut();
}
catch (Exception e)
{
Console.WriteLine("收消息出错" + e.Message);
//解析报错也加入到待移除客户端列表
Program.serverSocket.AddDelSocket(this);
//Close(); // 如果接收出现异常,关闭套接字连接
}
}
// 处理接受消息 分包、黏包问题的方法
private void HandleReceiveMsg(byte[] receiveBytes, int receiveNum)
{
int msgID = 0; // 消息ID
int msgLength = 0; // 消息长度
int nowIndex = 0; // 当前消息解析到哪一位
// 当接收到消息时,检查是否有之前缓存的数据
// 如果有,说明有分包,将新收到的字节数组拼接到后面,数组长度加上缓存长度
// 如果没有,缓存数组是空数组,缓存长度是0,不影响后面的逻辑
receiveBytes.CopyTo(cacheBytes, cacheNum);
cacheNum += receiveNum;
while (true)
{
// 在每次循环开始时将消息长度设置为-1作为标记,以避免上一次解析的数据影响当前的判断
msgLength = -1;
// 如果当前的缓存数组长度大于8 那么就可以解析这一个包的消息ID和消息长度 移动解析位置
if (cacheNum - nowIndex >= 8)
{
// 解析消息ID
msgID = BitConverter.ToInt32(cacheBytes, nowIndex);
nowIndex += 4;
// 解析消息长度
msgLength = BitConverter.ToInt32(cacheBytes, nowIndex);
nowIndex += 4;
}
// 缓存数组长度减去当前解析的位置假如大于消息长度 且消息长度不能是-1(-1说明没有解析消息长度 那就更不能解析消息体了)说明可以解析消息体
if (cacheNum - nowIndex >= msgLength && msgLength != -1)
{
// 解析消息体
BaseMessage baseMessage = null;
switch (msgID)
{
case 1001:
baseMessage = new PlayerMessage();
baseMessage.Reading(cacheBytes, nowIndex);
break;
case 1003:
baseMessage = new QuitMessage();
//QuitMessage没有消息体 不用反序列化
break;
case 999:
baseMessage = new HeartMessage();
//由于该消息没有消息体 所以都不用反序列化
break;
}
// 如果成功解析了消息体,将消息加入接收队列
if (baseMessage != null)
//receiveQueue.Enqueue(baseMessage);
ThreadPool.QueueUserWorkItem(HandleMessage, baseMessage);//开启线程进行处理
//移动解析位置 加上消息体长度
nowIndex += msgLength;
// 如果刚好解析完当前缓存数组所有内容,说明这个包没有黏包,重置缓存并退出循环,解析结束
if (nowIndex == cacheNum)
{
cacheNum = 0;
break;
}
}
// 如果不满足条件,表明存在分包的情况,需要将当前接收的内容记录下来 以便在下次接收到消息后继续处理
else
{
// 如果已经解析了消息ID和消息长度,但没有成功解析消息体,需要减去nowIndex解析位置的偏移。
// 要保留完整的消息ID和消息长度,以便下次完整解析。
if (msgLength != -1)
nowIndex -= 8;
// 使用Array.Copy方法,将剩余未解析的字节数组内容移到前面,用于缓存下次继续解析。
// 参数1: 要拷贝的数组 这里是原始的缓存数组 cacheBytes。
// 参数2: 从第几个索引开始拷贝后面的内容 这里是 nowIndex,表示从未解析的部分开始,把nowIndex到尾部的字节元素都要拷贝
// 参数3: 拷贝到的目标数组 也是原始的缓存数组 cacheBytes,因此在这里实际上是在原数组中进行移动操作。
// 参数4: 目标数组开始索引 这里是0,表示将数据移动到数组的开头。
// 参数5: 拷贝长度 这里是 cacheNum - nowIndex,表示要移动的字节数,即未解析部分的长度。cacheNum代表原先缓存数组所有要解析的字节数组长度,减去nowIndex代表未解析部分的长度。
Array.Copy(cacheBytes, nowIndex, cacheBytes, 0, cacheNum - nowIndex);
// 更新缓存的长度,减去已解析的部分,以便在下次继续解析时正确处理未解析的内容。
cacheNum = cacheNum - nowIndex;
break;
}
}
}
// 处理接收到的消息
private void HandleMessage(object obj)
{
BaseMessage baseMessage = obj as BaseMessage;
if (baseMessage is PlayerMessage)
{
PlayerMessage playerMessage = baseMessage as PlayerMessage;
Console.WriteLine(playerMessage.playerID);
Console.WriteLine(playerMessage.playerData.name);
Console.WriteLine(playerMessage.playerData.lev);
Console.WriteLine(playerMessage.playerData.atk);
}
else if(baseMessage is QuitMessage)
{
//收到断开消息 添加到断开列表中
Program.serverSocket.AddDelSocket(this);
}
else if (baseMessage is HeartMessage)
{
//收到心跳消息 记录收到消息的时间
frontTime = DateTime.Now.Ticks / TimeSpan.TicksPerSecond;
Console.WriteLine("收到心跳消息");
}
}
}
}
转载请注明来源,欢迎对文章中的引用来源进行考证,欢迎指出任何有错误或不够清晰的表达。可以在下面评论区评论,也可以邮件至 785293209@qq.com