26.网络通信-套接字Socket-TCP通信-同步-心跳消息-客户端主动断开连接
26.1 知识点
目前的客户端主动断开连接
存在的问题
目前在客户端主动退出时
我们会调用socket的 ShutDown和Close方法
但是通过调用这两个方法后 服务器端无法得知客户端已经主动断开
尝试修改 服务器端中客户端类的断开逻辑
会发现当客户端主动断开时 服务端中的客户端类仍然显示正在连接 需要解决
修改服务端中ClientSocket类的收发消息判断,要客户段连接的时候才进行收发消息
// 发送消息给客户端
public void Send(BaseMessage baseMessage)
{
if (isClientConnected)
{
...
}
}
// 接收来自客户端的消息
public void Receive()
{
if (!isClientConnected)
return;
...
}
开启服务端和客户端 当客户端主动断开时 服务端中的客户端类仍然显示正在连接 需要解决
解决目前断开不及时的问题
客户端尝试使用Disconnect方法主动断开连接
思路
Socket当中有一个专门在客户端使用的方法
Disconect方法
客户端调用该方法和服务器端断开连接
看是否是因为之前直接Close而没有调用Disconet造成服务器端无法及时获取状态
主要修改的逻辑:
客户端:
主动断开连接
服务端:
收发消息时判断socket是否已经断开
处理删除记录的socket的相关逻辑(会用到线程锁)
TcpNetManager关闭连接并手动将socket置空
// 关闭连接
public void Close()
{
print("客户端主动断开连接");
if (socket != null) // 如果套接字对象存在
{
socket.Shutdown(SocketShutdown.Both); // 关闭套接字的发送和接收
socket.Disconnect(false);//手动停止连接 参数意思是这个socket以后还用不用
socket.Close(); // 关闭套接字连接
socket = null;//当前socket不会再用了 置空
isConnected = false; // 标记连接已关闭
}
}
在服务端的ServerSocket中添加关闭客户端连接并从字典中移除的方法,注意操作客户端对象字典时要添加线程锁
//关闭客户端连接的 从字典中移除
public void CloseClientSocket(ClientSocket socket)
{
//添加线程锁 服务器类有很多线程 假如同时操作字典会有问题 保证线程安全
lock (clientSocketDictionary)
{
socket.Close();
if (clientSocketDictionary.ContainsKey(socket.clientID))
{
clientSocketDictionary.Remove(socket.clientID);
Console.WriteLine("客户端{0}主动断开连接了", socket.clientID);
}
}
}
服务端的ServerSocket其他操作或者访问字典的方法中也要添加线程锁
// 接受客户端连接
private void AcceptClientConnect(object obj)
{
while (!isServerClose)
{
try
{
// 等待并接受一个客户端连接请求
Socket clientSocket = serverSocket.Accept();
// 创建一个新的ClientSocket对象来管理客户端连接
ClientSocket client = new ClientSocket(clientSocket);
// 向客户端发送欢迎消息
//client.Send("欢迎连入服务器");
//线程锁 保证字典线程安全
lock (clientSocketDictionary)
{
// 将客户端Socket对象添加到字典中,以客户端ID作为键
clientSocketDictionary.Add(client.clientID, client);
}
}
catch (Exception e)
{
Console.WriteLine("客户端连入报错" + e.Message);
}
}
}
// 接收客户端消息
private void ReceiveClientMessage(object obj)
{
while (!isServerClose)
{
if (clientSocketDictionary.Count > 0)
{
//线程锁 保证字典线程安全
lock (clientSocketDictionary)
{
foreach (ClientSocket client in clientSocketDictionary.Values)
{
// 从每个客户端接收消息
client.Receive();
}
}
}
}
}
// 向所有客户端广播消息
public void Broadcast(BaseMessage baseMessage)
{
//线程锁 保证字典线程安全
lock (clientSocketDictionary)
{
foreach (ClientSocket client in clientSocketDictionary.Values)
{
client.Send(baseMessage);
}
}
}
关闭客户端类连接并从字典中移除的方法正常来说应该在给客户端发消息或者接收客户端消息的时候移除,但是客户端发消息和接收客户端消息是在服务端类遍历字典调用的,在遍历时操作集合可能会报错,需要解决
在ServerSocket创建一个List存储待移除的客户端socket。并定义添加客户端到待移除的客户端列表的方法。
//有待移除的客户端socket 避免 在foreach时直接从字典中移除 出现问题
private List<ClientSocket> delList = new List<ClientSocket>();
//添加待移除的 socket内容
public void AddDelSocket(ClientSocket socket)
{
if (!delList.Contains(socket))
delList.Add(socket);
}
把Program的serverSocket改成静态变量,在ClientSocket的发送和接收来自客户端的消息方法中,检测到断开连接或者解析报错就把当前客户端添加到服务端待移除的客户端列表中。
// 发送消息给客户端
public void Send(BaseMessage baseMessage)
{
if (isClientConnected)
{
try
{
clientSocket.Send(baseMessage.Writing()); // 将消息编码为UTF-8字节数组并发送给客户端
}
catch (Exception e)
{
Console.WriteLine("发消息出错" + e.Message);
//解析报错也加入到待移除客户端列表
Program.serverSocket.AddDelSocket(this);
//Close(); // 如果发送出现异常,关闭套接字连接
}
}
else
{
//断开连接的话加入到待移除客户端列表
Program.serverSocket.AddDelSocket(this);
}
}
// 接收来自客户端的消息
public void Receive()
{
if (!isClientConnected)
{
//断开连接的话加入到待移除客户端列表
Program.serverSocket.AddDelSocket(this);
return;
}
try
{
if (clientSocket.Available > 0)// 如果套接字中有可读数据
{
byte[] result = new byte[1024 * 5]; // 创建一个缓冲区来存储接收到的数据
int receiveNum = clientSocket.Receive(result);// 从套接字接收数据并存储在缓冲区中
HandleReceiveMsg(result,receiveNum);
////收到数据后 先读取4个字节 转为ID 才知道用哪一个类型去处理反序列化
//int msgID = BitConverter.ToInt32(result, 0);
//BaseMessage baseMessage = null;
//switch (msgID)
//{
// case 1001:
// baseMessage = new PlayerMessage();
// baseMessage.Reading(result, 4);
// break;
//}
//if (baseMessage == null)
// return;
//ThreadPool.QueueUserWorkItem(HandleMessage, baseMessage);
}
}
catch (Exception e)
{
Console.WriteLine("收消息出错" + e.Message);
//解析报错也加入到待移除客户端列表
Program.serverSocket.AddDelSocket(this);
//Close(); // 如果接收出现异常,关闭套接字连接
}
}
在ServerSocket定义遍历待删除客户端列表方法,逐个关闭客户端连接。关闭后清空。
//遍历待删除列表断开连接并删除
public void CloseDelListSocket()
{
//判断有没有 断开连接的 把其 移除
for (int i = 0; i < delList.Count; i++)
CloseClientSocket(delList[i]);
delList.Clear();
}
在ServerSocket接收客户端消息的函数中,遍历完一次客户端字典接收过一轮消息后,调用遍历待删除客户端列表方法。
// 接收客户端消息
private void ReceiveClientMessage(object obj)
{
while (!isServerClose)
{
if (clientSocketDictionary.Count > 0)
{
//线程锁 保证字典线程安全
lock (clientSocketDictionary)
{
foreach (ClientSocket client in clientSocketDictionary.Values)
{
// 从每个客户端接收消息
client.Receive();
}
//把待删除列表中的客户端断开连接后删除
CloseDelListSocket();
}
}
}
}
测试后,发现客户端断开,服务器仍然无法检测到客户端类isClientConnected的变化,需要自定义退出消息来辅助
自定义退出消息
思路
让服务器端收到该消息就知道是客户端想要主动断开
然后服务器端处理释放socket相关工作
创建退出消息QuitMessage 类,继承BaseMessage,只要消息ID和长度,没有消息体
public class QuitMessage : BaseMessage
{
public override int GetBytesNum()
{
//需要存储 ID和长度 两个int 8字节
return 4 + 4;
}
public override int Reading(byte[] bytes, int beginIndex = 0)
{
//没有消息体 0字节
return 0;
}
public override byte[] Writing()
{
//写入ID和长度到字节数组并返回
int index = 0;
byte[] bytes = new byte[GetBytesNum()];
WriteInt(bytes, GetID(), ref index);
WriteInt(bytes, 0, ref index);
return bytes;
}
public override int GetID()
{
return 1003;
}
}
在ClientSocket处理接受消息 分包、黏包问题的方法中,添加解析退出消息的逻辑
// 处理接受消息 分包、黏包问题的方法
private void HandleReceiveMsg(byte[] receiveBytes, int receiveNum)
{
...
while (true)
{
...
// 缓存数组长度减去当前解析的位置假如大于消息长度 且消息长度不能是-1(-1说明没有解析消息长度 那就更不能解析消息体了)说明可以解析消息体
if (cacheNum - nowIndex >= msgLength && msgLength != -1)
{
// 解析消息体
BaseMessage baseMessage = null;
switch (msgID)
{
case 1001:
baseMessage = new PlayerMessage();
baseMessage.Reading(cacheBytes, nowIndex);
break;
case 1003:
baseMessage = new QuitMessage();
//QuitMessage没有消息体 不用反序列化
break;
}
}
...
}
}
在ClientSocket处理收到消息方法中,假如收到的消息是退出消息,就把当前客户端类添加到待删除列表中
// 处理接收到的消息
private void HandleMessage(object obj)
{
BaseMessage baseMessage = obj as BaseMessage;
if (baseMessage is PlayerMessage)
{
PlayerMessage playerMessage = baseMessage as PlayerMessage;
Console.WriteLine(playerMessage.playerID);
Console.WriteLine(playerMessage.playerData.name);
Console.WriteLine(playerMessage.playerData.lev);
Console.WriteLine(playerMessage.playerData.atk);
}
else if(baseMessage is QuitMessage)
{
//收到断开消息 添加到断开列表中
Program.serverSocket.AddDelSocket(this);
}
}
TcpNetManager中断开连接时发送退出消息
// 关闭连接
public void Close()
{
print("客户端主动断开连接");
if (socket != null) // 如果套接字对象存在
{
//主动发送一条断开的消息个服务端
QuitMessage quitMessage = new QuitMessage();
//这里不能用我们封装的Send方法 因为Send方法是开一个线程发送的 可能还没发就直接被断开了
socket.Send(quitMessage.Writing());
socket.Shutdown(SocketShutdown.Both); // 关闭套接字的发送和接收
socket.Disconnect(false);//手动停止连接 参数意思是这个socket以后还用不用
socket.Close(); // 关闭套接字连接
socket = null;//当前socket不会再用了 置空
isConnected = false; // 标记连接已关闭
}
}
断点测试成功实现客户端断开连接检测
总结
客户端可以通过Disconnect方法主动和服务器端断开连接
服务器端可以通过Conected属性判断连接状态决定是否释放Socket
但是由于服务器端Conected变量表示的是上一次收发消息是否成功
所以服务器端无法准确判断客户端的连接状态
因此 我们需要自定义一条退出消息 用于准确断开和客户端之间的连接
26.2 知识点代码
Lesson26_网络通信_套接字Socket_TCP通信_同步_心跳消息_客户端主动断开连接
using System;
using System.Collections;
using System.Collections.Generic;
using System.Threading.Tasks;
using UnityEngine;
using UnityEngine.UI;
public class Lesson26_网络通信_套接字Socket_TCP通信_同步_心跳消息_客户端主动断开连接 : MonoBehaviour
{
public InputField InputField;
public Button sendButton;
public Button nianSendButton;
public Button fenSendButton;
public Button fenNiansendButton;
void Start()
{
#region 客户端逻辑
TcpNetManager.Instance.Connect("127.0.0.1", 8080);
//直接发消息测试
sendButton.onClick.AddListener(() =>
{
PlayerMessage playerMessage = new PlayerMessage();
playerMessage.playerID = 1111;
playerMessage.playerData = new PlayerData();
playerMessage.playerData.name = "韬老狮客户端发送的信息";
playerMessage.playerData.atk = 22;
playerMessage.playerData.lev = 10;
TcpNetManager.Instance.Send(playerMessage);
});
//黏包测试
nianSendButton.onClick.AddListener(() =>
{
PlayerMessage playerMessage1 = new PlayerMessage();
playerMessage1.playerID = 1001;
playerMessage1.playerData = new PlayerData();
playerMessage1.playerData.name = "韬老狮1";
playerMessage1.playerData.atk = 1;
playerMessage1.playerData.lev = 1;
PlayerMessage payerMessage = new PlayerMessage();
payerMessage.playerID = 1002;
payerMessage.playerData = new PlayerData();
payerMessage.playerData.name = "韬老狮2";
payerMessage.playerData.atk = 2;
payerMessage.playerData.lev = 2;
//黏包 把两个对象的字节数组合并在一起发送
byte[] bytes = new byte[playerMessage1.GetBytesNum() + payerMessage.GetBytesNum()];
playerMessage1.Writing().CopyTo(bytes, 0);
payerMessage.Writing().CopyTo(bytes, playerMessage1.GetBytesNum());
TcpNetManager.Instance.SendTest(bytes);
});
//分包测试 使用异步函数
fenSendButton.onClick.AddListener(async () =>
{
PlayerMessage playerMessage = new PlayerMessage();
playerMessage.playerID = 1003;
playerMessage.playerData = new PlayerData();
playerMessage.playerData.name = "韬老狮1";
playerMessage.playerData.atk = 3;
playerMessage.playerData.lev = 3;
byte[] bytes = playerMessage.Writing();
//分包
byte[] bytes1 = new byte[10];
byte[] bytes2 = new byte[bytes.Length - 10];
//分成第一个包
Array.Copy(bytes, 0, bytes1, 0, 10);
//第二个包
Array.Copy(bytes, 10, bytes2, 0, bytes.Length - 10);
TcpNetManager.Instance.SendTest(bytes1);
await Task.Delay(500);//延迟执行半秒进行分包
TcpNetManager.Instance.SendTest(bytes2);
});
//分包、黏包测试
fenNiansendButton.onClick.AddListener(async () =>
{
PlayerMessage playerMessage = new PlayerMessage();
playerMessage.playerID = 1004;
playerMessage.playerData = new PlayerData();
playerMessage.playerData.name = "韬老狮1";
playerMessage.playerData.atk = 4;
playerMessage.playerData.lev = 4;
PlayerMessage playerMessage2 = new PlayerMessage();
playerMessage2.playerID = 1005;
playerMessage2.playerData = new PlayerData();
playerMessage2.playerData.name = "韬老狮2";
playerMessage2.playerData.atk = 5;
playerMessage2.playerData.lev = 5;
byte[] bytes1 = playerMessage.Writing();//消息A
byte[] bytes2 = playerMessage2.Writing();//消息B
byte[] bytes2_1 = new byte[10];
byte[] bytes2_2 = new byte[bytes2.Length - 10];
//分成第一个包
Array.Copy(bytes2, 0, bytes2_1, 0, 10);
//第二个包
Array.Copy(bytes2, 10, bytes2_2, 0, bytes2.Length - 10);
//消息A和消息B前一段的 黏包
byte[] bytes1_2 = new byte[bytes1.Length + bytes2_1.Length];
bytes1.CopyTo(bytes1_2, 0);
bytes2_1.CopyTo(bytes1_2, bytes1.Length);
TcpNetManager.Instance.SendTest(bytes1_2);
await Task.Delay(500);
TcpNetManager.Instance.SendTest(bytes2_2);
});
#endregion
#region 知识点一 目前的客户端主动断开连接
//目前在客户端主动退出时
//我们会调用socket的 ShutDown和Close方法
//但是通过调用这两个方法后 服务器端无法得知客户端已经主动断开
//尝试修改 服务器端中客户端类的断开逻辑
//会发现当客户端主动断开时 服务端中的客户端类仍然显示正在连接 需要解决
#endregion
#region 知识点二 解决目前断开不及时的问题
//1.客户端尝试使用Disconnect方法主动断开连接
//Socket当中有一个专门在客户端使用的方法
//Disconect方法
//客户端调用该方法和服务器端断开连接
//看是否是因为之前直接Close而没有调用Disconet造成服务器端无法及时获取状态
//主要修改的逻辑:
//客户端:
//主动断开连接
//服务端:
//1.收发消息时判断socket是否已经断开
//2.处理删除记录的socket的相关逻辑(会用到线程锁)
//2.自定义退出消息
//让服务器端收到该消息就知道是客户端想要主动断开
//然后服务器端处理释放socket相关工作
#endregion
#region 总结
//客户端可以通过Disconnect方法主动和服务器端断开连接
//服务器端可以通过Conected属性判断连接状态决定是否释放Socket
//但是由于服务器端Conected变量表示的是上一次收发消息是否成功
//所以服务器端无法准确判断客户端的连接状态
//因此 我们需要自定义一条退出消息 用于准确断开和客户端之间的连接
#endregion
}
}
TcpNetManager
using System;
using System.Collections;
using System.Collections.Generic;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using UnityEngine;
public class TcpNetManager : BaseSingletonInMonoBehaviour<TcpNetManager>
{
private Socket socket; // 创建Socket对象,用于网络通信
private Queue<BaseMessage> sendMsgQueue = new Queue<BaseMessage>(); // 创建一个队列,用于存储待发送的消息
private Queue<BaseMessage> receiveQueue = new Queue<BaseMessage>(); // 创建一个队列,用于存储接收到的消息
private bool isConnected = false; // 用于标识是否已连接到服务器
//private byte[] receiveBytes = new byte[1024 * 1024]; // 创建一个字节数组,用于存储接收到的数据
//private int receiveNum; // 用于存储接收到的字节数
//用于处理分包时 缓存的 字节数组 和 字节数组长度
private byte[] cacheBytes = new byte[1024 * 1024];
private int cacheNum = 0;
// 连接服务器
public void Connect(string ip, int port)
{
if (isConnected) // 如果已连接,则直接返回
return;
if (socket == null) // 如果套接字为空,创建一个套接字对象
socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
IPEndPoint ipPoint = new IPEndPoint(IPAddress.Parse(ip), port); // 创建一个IP终结点对象
try
{
// 尝试连接到指定的IP地址和端口
socket.Connect(ipPoint);
isConnected = true; // 标记已连接
ThreadPool.QueueUserWorkItem(SendMsg); // 创建并启动发送消息的线程
ThreadPool.QueueUserWorkItem(ReceiveMsg); // 创建并启动接收消息的线程
}
catch (SocketException e)
{
if (e.ErrorCode == 10061) // 如果连接被服务器拒绝
print("服务器拒绝连接");
else
print("连接失败" + e.ErrorCode + e.Message); // 打印连接失败的信息
}
}
// 关闭连接
public void Close()
{
print("客户端主动断开连接");
if (socket != null) // 如果套接字对象存在
{
//主动发送一条断开的消息个服务端
QuitMessage quitMessage = new QuitMessage();
//这里不能用我们封装的Send方法 因为Send方法是开一个线程发送的 可能还没发就直接被断开了
socket.Send(quitMessage.Writing());
socket.Shutdown(SocketShutdown.Both); // 关闭套接字的发送和接收
socket.Disconnect(false);//手动停止连接 参数意思是这个socket以后还用不用
socket.Close(); // 关闭套接字连接
socket = null;//当前socket不会再用了 置空
isConnected = false; // 标记连接已关闭
}
}
// 当对象被销毁时,确保关闭连接
private void OnDestroy()
{
Close(); // 调用关闭连接的方法
}
/// <summary>
/// 用于测试 直接发字节数组的方法
/// </summary>
/// <param name="bytes"></param>
public void SendTest(byte[] bytes)
{
socket.Send(bytes);
}
// 发送消息
public void Send(BaseMessage baseMessage)
{
sendMsgQueue.Enqueue(baseMessage); // 将消息添加到发送消息队列
}
// 在独立线程中处理发送消息的逻辑
private void SendMsg(object obj)
{
while (isConnected) // 只要连接有效
{
if (sendMsgQueue.Count > 0) // 如果发送消息队列中有待发送的消息
{
// 从队列中取出消息并发送到服务器
socket.Send(sendMsgQueue.Dequeue().Writing());
}
}
}
// 在独立线程中处理接收消息的逻辑
private void ReceiveMsg(object obj)
{
while (isConnected) // 只要连接有效
{
if (socket.Available > 0) // 如果有可接收的数据
{
//临时字节数组
byte[] receiveBytes = new byte[1024 * 1024];
// 接收从服务器发送来的数据,并将数据转换成字符串后存储到接收消息队列 得到字节数组长度
int receiveNum = socket.Receive(receiveBytes);
//处理接受消息 分包、黏包问题
HandleReceiveMsg(receiveBytes, receiveNum);
////首先把收到字节数组的前4个字节 读取出来得到ID
//int msgID = BitConverter.ToInt32(receiveBytes, 0);
//BaseMessage baseMessage = null;
//switch (msgID)
//{
// case 1001:
// PlayerMessage playerMessage = new PlayerMessage();
// playerMessage.Reading(receiveBytes, 4);
// baseMessage = playerMessage;
// break;
//}
////如果消息为空 那证明是不知道类型的消息 没有解析
//if (baseMessage == null)
// continue;
////收到消息 解析消息为字符串 并放入公共容器
//receiveQueue.Enqueue(baseMessage);
}
}
}
// 处理接受消息 分包、黏包问题的方法
private void HandleReceiveMsg(byte[] receiveBytes, int receiveNum)
{
int msgID = 0; // 消息ID
int msgLength = 0; // 消息长度
int nowIndex = 0; // 当前消息解析到哪一位
// 当接收到消息时,检查是否有之前缓存的数据
// 如果有,说明有分包,将新收到的字节数组拼接到后面,数组长度加上缓存长度
// 如果没有,缓存数组是空数组,缓存长度是0,不影响后面的逻辑
receiveBytes.CopyTo(cacheBytes, cacheNum);
cacheNum += receiveNum;
while (true)
{
// 在每次循环开始时将消息长度设置为-1作为标记,以避免上一次解析的数据影响当前的判断
msgLength = -1;
// 如果当前的缓存数组长度大于8 那么就可以解析这一个包的消息ID和消息长度 移动解析位置
if (cacheNum - nowIndex >= 8)
{
// 解析消息ID
msgID = BitConverter.ToInt32(cacheBytes, nowIndex);
nowIndex += 4;
// 解析消息长度
msgLength = BitConverter.ToInt32(cacheBytes, nowIndex);
nowIndex += 4;
}
// 缓存数组长度减去当前解析的位置假如大于消息长度 且消息长度不能是-1(-1说明没有解析消息长度 那就更不能解析消息体了)说明可以解析消息体
if (cacheNum - nowIndex >= msgLength && msgLength != -1)
{
// 解析消息体
BaseMessage baseMessage = null;
switch (msgID)
{
case 1001:
PlayerMessage playerMessage = new PlayerMessage();
playerMessage.Reading(cacheBytes, nowIndex);
baseMessage = playerMessage;
break;
}
// 如果成功解析了消息体,将消息加入接收队列
if (baseMessage != null)
receiveQueue.Enqueue(baseMessage);
//移动解析位置 加上消息体长度
nowIndex += msgLength;
// 如果刚好解析完当前缓存数组所有内容,说明这个包没有黏包,重置缓存并退出循环,解析结束
if (nowIndex == cacheNum)
{
cacheNum = 0;
break;
}
}
// 如果不满足条件,表明存在分包的情况,需要将当前接收的内容记录下来 以便在下次接收到消息后继续处理
else
{
// 如果已经解析了消息ID和消息长度,但没有成功解析消息体,需要减去nowIndex解析位置的偏移。
// 要保留完整的消息ID和消息长度,以便下次完整解析。
if (msgLength != -1)
nowIndex -= 8;
// 使用Array.Copy方法,将剩余未解析的字节数组内容移到前面,用于缓存下次继续解析。
// 参数1: 要拷贝的数组 这里是原始的缓存数组 cacheBytes。
// 参数2: 从第几个索引开始拷贝后面的内容 这里是 nowIndex,表示从未解析的部分开始,把nowIndex到尾部的字节元素都要拷贝
// 参数3: 拷贝到的目标数组 也是原始的缓存数组 cacheBytes,因此在这里实际上是在原数组中进行移动操作。
// 参数4: 目标数组开始索引 这里是0,表示将数据移动到数组的开头。
// 参数5: 拷贝长度 这里是 cacheNum - nowIndex,表示要移动的字节数,即未解析部分的长度。cacheNum代表原先缓存数组所有要解析的字节数组长度,减去nowIndex代表未解析部分的长度。
Array.Copy(cacheBytes, nowIndex, cacheBytes, 0, cacheNum - nowIndex);
// 更新缓存的长度,减去已解析的部分,以便在下次继续解析时正确处理未解析的内容。
cacheNum = cacheNum - nowIndex;
break;
}
}
}
void Update()
{
// 在Unity的每一帧中检查是否有待处理的接收消息,如果有,则打印出来
if (receiveQueue.Count > 0)
{
BaseMessage baseMessage = receiveQueue.Dequeue();
if (baseMessage is PlayerMessage)
{
PlayerMessage playerMessage = (baseMessage as PlayerMessage);
print(playerMessage.playerID);
print(playerMessage.playerData.name);
print(playerMessage.playerData.lev);
print(playerMessage.playerData.atk);
}
}
}
}
QuitMessage
using System.Collections;
using System.Collections.Generic;
using UnityEngine;
public class QuitMessage : BaseMessage
{
public override int GetBytesNum()
{
//需要存储 ID和长度 两个int 8字节
return 4 + 4;
}
public override int Reading(byte[] bytes, int beginIndex = 0)
{
//没有消息体 0字节
return 0;
}
public override byte[] Writing()
{
//写入ID和长度到字节数组并返回
int index = 0;
byte[] bytes = new byte[GetBytesNum()];
WriteInt(bytes, GetID(), ref index);
WriteInt(bytes, 0, ref index);
return bytes;
}
public override int GetID()
{
return 1003;
}
}
Lesson26_网络通信_套接字Socket_TCP通信_同步_心跳消息_客户端主动断开连接服务端
namespace Lesson26_网络通信_套接字Socket_TCP通信_同步_心跳消息_客户端主动断开连接
{
internal class Program
{
public static ServerSocket serverSocket;
static void Main(string[] args)
{
// 创建一个ServerSocket对象,用于处理服务器端的操作
serverSocket = new ServerSocket();
// 启动服务器,绑定到本地IP地址 127.0.0.1,监听端口 8080,允许最大连接数为 1024
serverSocket.Start("127.0.0.1", 8080, 1024);
// 输出服务器开启成功的消息
Console.WriteLine("服务器开启成功");
while (true)
{
// 从控制台读取用户输入
string input = Console.ReadLine();
// 如果用户输入 "Quit",则关闭服务器
if (input == "Quit")
{
serverSocket.Close();
}
// 如果用户输入以 "B:" 开头,表示要广播消息给所有客户端
else if (input.Substring(0, 2) == "B:")
{
if (input.Substring(2) == "1001")
{
PlayerMessage playerMessage = new PlayerMessage();
playerMessage.playerID = 9876;
playerMessage.playerData = new PlayerData();
playerMessage.playerData.name = "服务器端发来的消息";
playerMessage.playerData.lev = 99;
playerMessage.playerData.atk = 80;
serverSocket.Broadcast(playerMessage);
}
}
}
}
}
}
ClientSocket
using System;
using System.Collections.Generic;
using System.Net.Sockets;
using System.Text;
using System.Threading;
namespace Lesson26_网络通信_套接字Socket_TCP通信_同步_心跳消息_客户端主动断开连接
{
class ClientSocket
{
private static int CLIENT_BEGIN_ID = 1; // 静态变量,用于为客户端分配唯一的客户端ID
public int clientID; // 客户端的唯一ID
public Socket clientSocket; // 与客户端通信的套接字对象
//用于处理分包时 缓存的 字节数组 和 字节数组长度
private byte[] cacheBytes = new byte[1024 * 1024];
private int cacheNum = 0;
/// <summary>
/// 是否是连接状态
/// </summary>
public bool isClientConnected => this.clientSocket.Connected; // 判断套接字是否处于连接状态
public ClientSocket(Socket clientSocket)
{
this.clientID = CLIENT_BEGIN_ID; // 初始化客户端ID
this.clientSocket = clientSocket; // 初始化套接字
++CLIENT_BEGIN_ID; // 为下一个客户端分配不同的ID
}
// 关闭套接字连接
public void Close()
{
if (clientSocket != null)
{
clientSocket.Shutdown(SocketShutdown.Both); // 关闭套接字的读写
clientSocket.Close(); // 关闭套接字连接
clientSocket = null;
}
}
// 发送消息给客户端
public void Send(BaseMessage baseMessage)
{
if (isClientConnected)
{
try
{
clientSocket.Send(baseMessage.Writing()); // 将消息编码为UTF-8字节数组并发送给客户端
}
catch (Exception e)
{
Console.WriteLine("发消息出错" + e.Message);
//解析报错也加入到待移除客户端列表
Program.serverSocket.AddDelSocket(this);
//Close(); // 如果发送出现异常,关闭套接字连接
}
}
else
{
//断开连接的话加入到待移除客户端列表
Program.serverSocket.AddDelSocket(this);
}
}
// 接收来自客户端的消息
public void Receive()
{
if (!isClientConnected)
{
//断开连接的话加入到待移除客户端列表
Program.serverSocket.AddDelSocket(this);
return;
}
try
{
if (clientSocket.Available > 0)// 如果套接字中有可读数据
{
byte[] result = new byte[1024 * 5]; // 创建一个缓冲区来存储接收到的数据
int receiveNum = clientSocket.Receive(result);// 从套接字接收数据并存储在缓冲区中
HandleReceiveMsg(result,receiveNum);
////收到数据后 先读取4个字节 转为ID 才知道用哪一个类型去处理反序列化
//int msgID = BitConverter.ToInt32(result, 0);
//BaseMessage baseMessage = null;
//switch (msgID)
//{
// case 1001:
// baseMessage = new PlayerMessage();
// baseMessage.Reading(result, 4);
// break;
//}
//if (baseMessage == null)
// return;
//ThreadPool.QueueUserWorkItem(HandleMessage, baseMessage);
}
}
catch (Exception e)
{
Console.WriteLine("收消息出错" + e.Message);
//解析报错也加入到待移除客户端列表
Program.serverSocket.AddDelSocket(this);
//Close(); // 如果接收出现异常,关闭套接字连接
}
}
// 处理接受消息 分包、黏包问题的方法
private void HandleReceiveMsg(byte[] receiveBytes, int receiveNum)
{
int msgID = 0; // 消息ID
int msgLength = 0; // 消息长度
int nowIndex = 0; // 当前消息解析到哪一位
// 当接收到消息时,检查是否有之前缓存的数据
// 如果有,说明有分包,将新收到的字节数组拼接到后面,数组长度加上缓存长度
// 如果没有,缓存数组是空数组,缓存长度是0,不影响后面的逻辑
receiveBytes.CopyTo(cacheBytes, cacheNum);
cacheNum += receiveNum;
while (true)
{
// 在每次循环开始时将消息长度设置为-1作为标记,以避免上一次解析的数据影响当前的判断
msgLength = -1;
// 如果当前的缓存数组长度大于8 那么就可以解析这一个包的消息ID和消息长度 移动解析位置
if (cacheNum - nowIndex >= 8)
{
// 解析消息ID
msgID = BitConverter.ToInt32(cacheBytes, nowIndex);
nowIndex += 4;
// 解析消息长度
msgLength = BitConverter.ToInt32(cacheBytes, nowIndex);
nowIndex += 4;
}
// 缓存数组长度减去当前解析的位置假如大于消息长度 且消息长度不能是-1(-1说明没有解析消息长度 那就更不能解析消息体了)说明可以解析消息体
if (cacheNum - nowIndex >= msgLength && msgLength != -1)
{
// 解析消息体
BaseMessage baseMessage = null;
switch (msgID)
{
case 1001:
baseMessage = new PlayerMessage();
baseMessage.Reading(cacheBytes, nowIndex);
break;
case 1003:
baseMessage = new QuitMessage();
//QuitMessage没有消息体 不用反序列化
break;
}
// 如果成功解析了消息体,将消息加入接收队列
if (baseMessage != null)
//receiveQueue.Enqueue(baseMessage);
ThreadPool.QueueUserWorkItem(HandleMessage, baseMessage);//开启线程进行处理
//移动解析位置 加上消息体长度
nowIndex += msgLength;
// 如果刚好解析完当前缓存数组所有内容,说明这个包没有黏包,重置缓存并退出循环,解析结束
if (nowIndex == cacheNum)
{
cacheNum = 0;
break;
}
}
// 如果不满足条件,表明存在分包的情况,需要将当前接收的内容记录下来 以便在下次接收到消息后继续处理
else
{
// 如果已经解析了消息ID和消息长度,但没有成功解析消息体,需要减去nowIndex解析位置的偏移。
// 要保留完整的消息ID和消息长度,以便下次完整解析。
if (msgLength != -1)
nowIndex -= 8;
// 使用Array.Copy方法,将剩余未解析的字节数组内容移到前面,用于缓存下次继续解析。
// 参数1: 要拷贝的数组 这里是原始的缓存数组 cacheBytes。
// 参数2: 从第几个索引开始拷贝后面的内容 这里是 nowIndex,表示从未解析的部分开始,把nowIndex到尾部的字节元素都要拷贝
// 参数3: 拷贝到的目标数组 也是原始的缓存数组 cacheBytes,因此在这里实际上是在原数组中进行移动操作。
// 参数4: 目标数组开始索引 这里是0,表示将数据移动到数组的开头。
// 参数5: 拷贝长度 这里是 cacheNum - nowIndex,表示要移动的字节数,即未解析部分的长度。cacheNum代表原先缓存数组所有要解析的字节数组长度,减去nowIndex代表未解析部分的长度。
Array.Copy(cacheBytes, nowIndex, cacheBytes, 0, cacheNum - nowIndex);
// 更新缓存的长度,减去已解析的部分,以便在下次继续解析时正确处理未解析的内容。
cacheNum = cacheNum - nowIndex;
break;
}
}
}
// 处理接收到的消息
private void HandleMessage(object obj)
{
BaseMessage baseMessage = obj as BaseMessage;
if (baseMessage is PlayerMessage)
{
PlayerMessage playerMessage = baseMessage as PlayerMessage;
Console.WriteLine(playerMessage.playerID);
Console.WriteLine(playerMessage.playerData.name);
Console.WriteLine(playerMessage.playerData.lev);
Console.WriteLine(playerMessage.playerData.atk);
}
else if(baseMessage is QuitMessage)
{
//收到断开消息 添加到断开列表中
Program.serverSocket.AddDelSocket(this);
}
}
}
}
ServerSocket
using System;
using System.Collections.Generic;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading;
namespace Lesson26_网络通信_套接字Socket_TCP通信_同步_心跳消息_客户端主动断开连接
{
class ServerSocket
{
// 服务器端Socket
public Socket serverSocket;
// 保存客户端连接的所有Socket的字典
public Dictionary<int, ClientSocket> clientSocketDictionary = new Dictionary<int, ClientSocket>();
//有待移除的客户端socket 避免 在foreach时直接从字典中移除 出现问题
private List<ClientSocket> delList = new List<ClientSocket>();
// 用于标识服务器是否关闭的标志
private bool isServerClose;
// 开启服务器端
public void Start(string ipString, int port, int clientSocketMaxNum)
{
// 初始化服务器关闭标志为假
isServerClose = false;
// 创建服务器套接字,指定地址族为IPv4、套接字类型为流套接字、协议类型为TCP
serverSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
// 创建IP终结点,指定IP地址和端口号
IPEndPoint serverEndPoint = new IPEndPoint(IPAddress.Parse(ipString), port);
// 将套接字绑定到指定的IP终结点
serverSocket.Bind(serverEndPoint);
// 启动服务器套接字,同时指定同时等待连接的最大客户端数
serverSocket.Listen(clientSocketMaxNum);
// 启动线程池中的线程来处理客户端连接请求和消息接收
ThreadPool.QueueUserWorkItem(AcceptClientConnect);
ThreadPool.QueueUserWorkItem(ReceiveClientMessage);
}
// 关闭服务器端
public void Close()
{
// 设置服务器关闭标志为真
isServerClose = true;
// 关闭所有客户端连接
foreach (ClientSocket client in clientSocketDictionary.Values)
{
client.Close();
}
clientSocketDictionary.Clear();
// 关闭服务器套接字的读写
serverSocket.Shutdown(SocketShutdown.Both);
// 关闭服务器套接字
serverSocket.Close();
// 将服务器套接字设置为null
serverSocket = null;
}
// 接受客户端连接
private void AcceptClientConnect(object obj)
{
while (!isServerClose)
{
try
{
// 等待并接受一个客户端连接请求
Socket clientSocket = serverSocket.Accept();
// 创建一个新的ClientSocket对象来管理客户端连接
ClientSocket client = new ClientSocket(clientSocket);
// 向客户端发送欢迎消息
//client.Send("欢迎连入服务器");
//线程锁 保证字典线程安全
lock (clientSocketDictionary)
{
// 将客户端Socket对象添加到字典中,以客户端ID作为键
clientSocketDictionary.Add(client.clientID, client);
}
}
catch (Exception e)
{
Console.WriteLine("客户端连入报错" + e.Message);
}
}
}
// 接收客户端消息
private void ReceiveClientMessage(object obj)
{
while (!isServerClose)
{
if (clientSocketDictionary.Count > 0)
{
//线程锁 保证字典线程安全
lock (clientSocketDictionary)
{
foreach (ClientSocket client in clientSocketDictionary.Values)
{
// 从每个客户端接收消息
client.Receive();
}
//把待删除列表中的客户端断开连接后删除
CloseDelListSocket();
}
}
}
}
// 向所有客户端广播消息
public void Broadcast(BaseMessage baseMessage)
{
//线程锁 保证字典线程安全
lock (clientSocketDictionary)
{
foreach (ClientSocket client in clientSocketDictionary.Values)
{
client.Send(baseMessage);
}
}
}
//关闭客户端连接的 从字典中移除
public void CloseClientSocket(ClientSocket socket)
{
//添加线程锁 服务器类有很多线程 假如同时操作字典会有问题 保证线程安全
lock (clientSocketDictionary)
{
socket.Close();
if (clientSocketDictionary.ContainsKey(socket.clientID))
{
clientSocketDictionary.Remove(socket.clientID);
Console.WriteLine("客户端{0}主动断开连接了", socket.clientID);
}
}
}
//添加待移除的 socket内容
public void AddDelSocket(ClientSocket socket)
{
if (!delList.Contains(socket))
delList.Add(socket);
}
//遍历待删除列表断开连接并删除
public void CloseDelListSocket()
{
//判断有没有 断开连接的 把其 移除
for (int i = 0; i < delList.Count; i++)
CloseClientSocket(delList[i]);
delList.Clear();
}
}
}
转载请注明来源,欢迎对文章中的引用来源进行考证,欢迎指出任何有错误或不够清晰的表达。可以在下面评论区评论,也可以邮件至 785293209@qq.com