27.TCP同步心跳消息

27.网络通信-套接字Socket-TCP通信-同步-心跳消息-实现


27.1 知识点

什么是心跳消息

所谓心跳消息,就是在长连接中,客户端和服务端之间定期发送的一种特殊的数据包,用于通知对方自己还在线,以确保长连接的有效性。由于其发送的时间间隔往往是固定的持续的,就像是心跳一样一直存在,所以我们称之为心跳消息。

为什么需要心跳消息

  1. 避免非正常关闭客户端时,服务器无法正常收到关闭连接消息。通过心跳消息我们可以自定义超时判断,如果超时没有收到客户端消息,证明客户端已经断开连接。

  2. 避免客户端长期不发送消息,防火墙或者路由器会断开连接,我们可以通过心跳消息一直保持活跃状态。

实现心跳消息

主要功能

客户端:主要功能是定时发送消息。

服务器端:主要功能是不停检测上次收到某客户端消息的时间,如果超时则认为连接已经断开。

定义心跳消息类

public class HeartMessage : BaseMessage
{
    public override int GetBytesNum()
    {
        // 需要存储 ID 和长度,两个 int,共 8 字节
        return 4 + 4;
    }

    public override int Reading(byte[] bytes, int beginIndex = 0)
    {
        // 没有消息体,0 字节
        return 0;
    }

    public override byte[] Writing()
    {
        // 写入 ID 和长度到字节数组并返回
        int index = 0;
        byte[] bytes = new byte[GetBytesNum()];
        WriteInt(bytes, GetID(), ref index);
        WriteInt(bytes, 0, ref index);
        return bytes;
    }

    public override int GetID()
    {
        return 999;
    }
}

TcpNetManager实现连接时每隔一段时间发送心跳消息

// 发送心跳消息的间隔时间
private int SEND_HEART_MSG_TIME = 2;
private HeartMessage heartMessage = new HeartMessage();

protected override void Awake()
{
    base.Awake();

    // 客户端循环定时给服务端发送心跳消息
    InvokeRepeating("SendHeartMsg", 0, SEND_HEART_MSG_TIME);
}

private void SendHeartMsg()
{
    if (isConnected)
        Send(heartMessage);
}

在服务端 ClientSocket 类定义上次心跳时间和超时时间

// 上一次收到消息的时间
private long frontTime = -1;
// 超时时间
private static int TIME_OUT_TIME = 10;

在服务端 ClientSocket 类定义检查心跳是否超时的方法。可以在构造函数时就开一个线程每隔几秒检查一次,但是比较耗性能。可以在收消息的时候检查,但是也会每帧判断,可以自行加逻辑优化

public ClientSocket(Socket clientSocket)
{
    this.clientID = CLIENT_BEGIN_ID;  // 初始化客户端ID
    this.clientSocket = clientSocket;  // 初始化套接字
    ++CLIENT_BEGIN_ID;  // 为下一个客户端分配不同的ID

    //我们现在为了方便大家理解 所以开了一个线程专门计时 但是这种方式比较消耗性能 不建议这样使用
    //ThreadPool.QueueUserWorkItem(CheckTimeOut);
}

/// <summary>
/// 间隔一段时间 检测一次超时 如果超时 就会主动断开该客户端的连接
/// </summary>
/// <param name="obj"></param>
private void CheckTimeOut(/*object obj*/)
{
    //while (Connected)
    //{
    if (frontTime != -1 &&
    DateTime.Now.Ticks / TimeSpan.TicksPerSecond - frontTime >= TIME_OUT_TIME)
    {
        Program.serverSocket.AddDelSocket(this);
        //break;
    }
    //Thread.Sleep(5000);
    //}
}

// 接收来自客户端的消息
public void Receive()
{
    if (!isClientConnected)
    {
            ...
    }
        
    try
    {
        if (clientSocket.Available > 0)// 如果套接字中有可读数据
        {
                ...
        }

        //检测 是否超时 
        CheckTimeOut();
    }
    catch (Exception e)
    {
            ...
    }
}

在服务端 ClientSocket 类分包、黏包方法和处理消息方法中加上对心跳消息的处理

// 处理接受消息 分包、黏包问题的方法
private void HandleReceiveMsg(byte[] receiveBytes, int receiveNum)
{
            ...
            
        // 缓存数组长度减去当前解析的位置假如大于消息长度 且消息长度不能是-1(-1说明没有解析消息长度 那就更不能解析消息体了)说明可以解析消息体
        if (cacheNum - nowIndex >= msgLength && msgLength != -1)
        {
            // 解析消息体
            BaseMessage baseMessage = null;
            switch (msgID)
            {
                case 1001:
                    baseMessage = new PlayerMessage();
                    baseMessage.Reading(cacheBytes, nowIndex);
                    break;
                case 1003:
                    baseMessage = new QuitMessage();
                    //QuitMessage没有消息体 不用反序列化
                    break;
                case 999:
                    baseMessage = new HeartMessage();
                    //由于该消息没有消息体 所以都不用反序列化
                    break;
            }

                ...
        }
        
        else
        {
            ...
        }
}


// 处理接收到的消息
private void HandleMessage(object obj)
{
    BaseMessage baseMessage = obj as BaseMessage;
    if (baseMessage is PlayerMessage)
    {
            .../
    }
    else if(baseMessage is QuitMessage)
    {
        ...
    }
    else if (baseMessage is HeartMessage)
    {
        //收到心跳消息 记录收到消息的时间
        frontTime = DateTime.Now.Ticks / TimeSpan.TicksPerSecond;
        Console.WriteLine("收到心跳消息");
    }
}

为了测试心跳消息超时是否能检测生效,暂时把 TcpNetManager 断开发送退出消息的逻辑注释

// 关闭连接
public void Close()
{
    print("客户端主动断开连接");
    if (socket != null) // 如果套接字对象存在
    {
        ////主动发送一条断开的消息个服务端 
        //QuitMessage quitMessage = new QuitMessage();
        ////这里不能用我们封装的Send方法 因为Send方法是开一个线程发送的 可能还没发就直接被断开了
        //socket.Send(quitMessage.Writing());

        //socket.Shutdown(SocketShutdown.Both); // 关闭套接字的发送和接收

        //socket.Disconnect(false);//手动停止连接 参数意思是这个socket以后还用不用

        //socket.Close(); // 关闭套接字连接

        socket = null;//当前socket不会再用了 置空

        isConnected = false; // 标记连接已关闭
    }
}

开启服务端和客户端后,关闭客户端等10秒可以检测到客户端断开连接

总结

心跳消息是长连接项目中必备的一套逻辑规则。通过它可以帮助我们在服务器端及时的释放掉失效的 socket,可以有效避免当客户端非正常关闭时,服务器端不能及时判断连接已断开。


27.2 知识点代码

TcpNetManager

using System;
using System.Collections.Generic;
using System.Net;
using System.Net.Sockets;
using System.Threading;

public class TcpNetManager : BaseSingletonInMonoBehaviour<TcpNetManager>
{
    private Socket socket; // 创建Socket对象,用于网络通信
    private Queue<BaseMessage> sendMsgQueue = new Queue<BaseMessage>(); // 创建一个队列,用于存储待发送的消息
    private Queue<BaseMessage> receiveQueue = new Queue<BaseMessage>(); // 创建一个队列,用于存储接收到的消息

    private bool isConnected = false; // 用于标识是否已连接到服务器

    //private byte[] receiveBytes = new byte[1024 * 1024]; // 创建一个字节数组,用于存储接收到的数据
    //private int receiveNum; // 用于存储接收到的字节数

    //用于处理分包时 缓存的 字节数组 和 字节数组长度
    private byte[] cacheBytes = new byte[1024 * 1024];
    private int cacheNum = 0;

    //发送心跳消息的间隔时间
    private int SEND_HEART_MSG_TIME = 2;
    private HeartMessage heartMessage = new HeartMessage();

    protected override void Awake()
    {
        base.Awake();

        //客户端循环定时给服务端发送心跳消息
        InvokeRepeating("SendHeartMsg", 0, SEND_HEART_MSG_TIME);
    }

    private void SendHeartMsg()
    {
        if (isConnected)
            Send(heartMessage);
    }

    // 连接服务器
    public void Connect(string ip, int port)
    {
        if (isConnected) // 如果已连接,则直接返回
            return;

        if (socket == null) // 如果套接字为空,创建一个套接字对象
            socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);

        IPEndPoint ipPoint = new IPEndPoint(IPAddress.Parse(ip), port); // 创建一个IP终结点对象
        try
        {
            // 尝试连接到指定的IP地址和端口
            socket.Connect(ipPoint);
            isConnected = true; // 标记已连接
            ThreadPool.QueueUserWorkItem(SendMsg); // 创建并启动发送消息的线程
            ThreadPool.QueueUserWorkItem(ReceiveMsg); // 创建并启动接收消息的线程
        }
        catch (SocketException e)
        {
            if (e.ErrorCode == 10061) // 如果连接被服务器拒绝
                print("服务器拒绝连接");
            else
                print("连接失败" + e.ErrorCode + e.Message); // 打印连接失败的信息
        }
    }

    // 关闭连接
    public void Close()
    {
        print("客户端主动断开连接");
        if (socket != null) // 如果套接字对象存在
        {
            //主动发送一条断开的消息个服务端 
            QuitMessage quitMessage = new QuitMessage();

            //这里不能用我们封装的Send方法 因为Send方法是开一个线程发送的 可能还没发就直接被断开了
            socket.Send(quitMessage.Writing());

            socket.Shutdown(SocketShutdown.Both); // 关闭套接字的发送和接收

            socket.Disconnect(false);//手动停止连接 参数意思是这个socket以后还用不用

            socket.Close(); // 关闭套接字连接

            socket = null;//当前socket不会再用了 置空

            isConnected = false; // 标记连接已关闭
        }
    }

    // 当对象被销毁时,确保关闭连接
    private void OnDestroy()
    {
        Close(); // 调用关闭连接的方法
    }

    /// <summary>
    /// 用于测试 直接发字节数组的方法
    /// </summary>
    /// <param name="bytes"></param>
    public void SendTest(byte[] bytes)
    {

        socket.Send(bytes);
    }

    // 发送消息
    public void Send(BaseMessage baseMessage)
    {
        sendMsgQueue.Enqueue(baseMessage); // 将消息添加到发送消息队列
    }

    // 在独立线程中处理发送消息的逻辑
    private void SendMsg(object obj)
    {
        while (isConnected) // 只要连接有效
        {
            if (sendMsgQueue.Count > 0) // 如果发送消息队列中有待发送的消息
            {
                // 从队列中取出消息并发送到服务器
                socket.Send(sendMsgQueue.Dequeue().Writing());
            }
        }
    }

    // 在独立线程中处理接收消息的逻辑
    private void ReceiveMsg(object obj)
    {
        while (isConnected) // 只要连接有效
        {
            if (socket.Available > 0) // 如果有可接收的数据
            {
                //临时字节数组
                byte[] receiveBytes = new byte[1024 * 1024];

                // 接收从服务器发送来的数据,并将数据转换成字符串后存储到接收消息队列 得到字节数组长度
                int receiveNum = socket.Receive(receiveBytes);

                //处理接受消息 分包、黏包问题
                HandleReceiveMsg(receiveBytes, receiveNum);

                ////首先把收到字节数组的前4个字节  读取出来得到ID
                //int msgID = BitConverter.ToInt32(receiveBytes, 0);
                //BaseMessage baseMessage = null;
                //switch (msgID)
                //{
                //    case 1001:
                //        PlayerMessage playerMessage = new PlayerMessage();
                //        playerMessage.Reading(receiveBytes, 4);
                //        baseMessage = playerMessage;
                //        break;
                //}
                ////如果消息为空 那证明是不知道类型的消息 没有解析
                //if (baseMessage == null)
                //    continue;
                ////收到消息 解析消息为字符串 并放入公共容器
                //receiveQueue.Enqueue(baseMessage);
            }
        }
    }

    // 处理接受消息 分包、黏包问题的方法
    private void HandleReceiveMsg(byte[] receiveBytes, int receiveNum)
    {
        int msgID = 0;       // 消息ID
        int msgLength = 0;   // 消息长度
        int nowIndex = 0;    // 当前消息解析到哪一位

        // 当接收到消息时,检查是否有之前缓存的数据
        // 如果有,说明有分包,将新收到的字节数组拼接到后面,数组长度加上缓存长度
        // 如果没有,缓存数组是空数组,缓存长度是0,不影响后面的逻辑
        receiveBytes.CopyTo(cacheBytes, cacheNum);
        cacheNum += receiveNum;

        while (true)
        {
            // 在每次循环开始时将消息长度设置为-1作为标记,以避免上一次解析的数据影响当前的判断
            msgLength = -1;

            // 如果当前的缓存数组长度大于8 那么就可以解析这一个包的消息ID和消息长度 移动解析位置
            if (cacheNum - nowIndex >= 8)
            {
                // 解析消息ID
                msgID = BitConverter.ToInt32(cacheBytes, nowIndex);
                nowIndex += 4;

                // 解析消息长度
                msgLength = BitConverter.ToInt32(cacheBytes, nowIndex);
                nowIndex += 4;
            }

            // 缓存数组长度减去当前解析的位置假如大于消息长度 且消息长度不能是-1(-1说明没有解析消息长度 那就更不能解析消息体了)说明可以解析消息体
            if (cacheNum - nowIndex >= msgLength && msgLength != -1)
            {
                // 解析消息体
                BaseMessage baseMessage = null;
                switch (msgID)
                {
                    case 1001:
                        PlayerMessage playerMessage = new PlayerMessage();
                        playerMessage.Reading(cacheBytes, nowIndex);
                        baseMessage = playerMessage;
                        break;
                }

                // 如果成功解析了消息体,将消息加入接收队列
                if (baseMessage != null)
                    receiveQueue.Enqueue(baseMessage);

                //移动解析位置 加上消息体长度
                nowIndex += msgLength;

                // 如果刚好解析完当前缓存数组所有内容,说明这个包没有黏包,重置缓存并退出循环,解析结束
                if (nowIndex == cacheNum)
                {
                    cacheNum = 0;
                    break;
                }
            }
            // 如果不满足条件,表明存在分包的情况,需要将当前接收的内容记录下来 以便在下次接收到消息后继续处理
            else
            {
                // 如果已经解析了消息ID和消息长度,但没有成功解析消息体,需要减去nowIndex解析位置的偏移。
                // 要保留完整的消息ID和消息长度,以便下次完整解析。
                if (msgLength != -1)
                    nowIndex -= 8;

                // 使用Array.Copy方法,将剩余未解析的字节数组内容移到前面,用于缓存下次继续解析。
                // 参数1: 要拷贝的数组  这里是原始的缓存数组 cacheBytes。
                // 参数2: 从第几个索引开始拷贝后面的内容  这里是 nowIndex,表示从未解析的部分开始,把nowIndex到尾部的字节元素都要拷贝
                // 参数3: 拷贝到的目标数组  也是原始的缓存数组 cacheBytes,因此在这里实际上是在原数组中进行移动操作。
                // 参数4: 目标数组开始索引  这里是0,表示将数据移动到数组的开头。
                // 参数5: 拷贝长度  这里是 cacheNum - nowIndex,表示要移动的字节数,即未解析部分的长度。cacheNum代表原先缓存数组所有要解析的字节数组长度,减去nowIndex代表未解析部分的长度。
                Array.Copy(cacheBytes, nowIndex, cacheBytes, 0, cacheNum - nowIndex);

                // 更新缓存的长度,减去已解析的部分,以便在下次继续解析时正确处理未解析的内容。
                cacheNum = cacheNum - nowIndex;

                break;
            }
        }
    }

    void Update()
    {
        // 在Unity的每一帧中检查是否有待处理的接收消息,如果有,则打印出来
        if (receiveQueue.Count > 0)
        {
            BaseMessage baseMessage = receiveQueue.Dequeue();
            if (baseMessage is PlayerMessage)
            {
                PlayerMessage playerMessage = (baseMessage as PlayerMessage);
                print(playerMessage.playerID);
                print(playerMessage.playerData.name);
                print(playerMessage.playerData.lev);
                print(playerMessage.playerData.atk);
            }
        }
    }
}

Lesson27_网络通信_套接字Socket_TCP通信_同步_心跳消息_实现

public class Lesson27_网络通信_套接字Socket_TCP通信_同步_心跳消息_实现 : MonoBehaviour
{
    public InputField InputField;
    public Button sendButton;
    public Button nianSendButton;
    public Button fenSendButton;
    public Button fenNiansendButton;
    void Start()
    {
        #region 客户端逻辑

        TcpNetManager.Instance.Connect("127.0.0.1", 8080);

        //直接发消息测试
        sendButton.onClick.AddListener(() =>
        {
            PlayerMessage playerMessage = new PlayerMessage();
            playerMessage.playerID = 1111;
            playerMessage.playerData = new PlayerData();
            playerMessage.playerData.name = "韬老狮客户端发送的信息";
            playerMessage.playerData.atk = 22;
            playerMessage.playerData.lev = 10;
            TcpNetManager.Instance.Send(playerMessage);
        });

        //黏包测试
        nianSendButton.onClick.AddListener(() =>
        {
            PlayerMessage playerMessage1 = new PlayerMessage();
            playerMessage1.playerID = 1001;
            playerMessage1.playerData = new PlayerData();
            playerMessage1.playerData.name = "韬老狮1";
            playerMessage1.playerData.atk = 1;
            playerMessage1.playerData.lev = 1;

            PlayerMessage payerMessage = new PlayerMessage();
            payerMessage.playerID = 1002;
            payerMessage.playerData = new PlayerData();
            payerMessage.playerData.name = "韬老狮2";
            payerMessage.playerData.atk = 2;
            payerMessage.playerData.lev = 2;

            //黏包 把两个对象的字节数组合并在一起发送
            byte[] bytes = new byte[playerMessage1.GetBytesNum() + payerMessage.GetBytesNum()];
            playerMessage1.Writing().CopyTo(bytes, 0);
            payerMessage.Writing().CopyTo(bytes, playerMessage1.GetBytesNum());
            TcpNetManager.Instance.SendTest(bytes);
        });

        //分包测试 使用异步函数
        fenSendButton.onClick.AddListener(async () =>
        {
            PlayerMessage playerMessage = new PlayerMessage();
            playerMessage.playerID = 1003;
            playerMessage.playerData = new PlayerData();
            playerMessage.playerData.name = "韬老狮1";
            playerMessage.playerData.atk = 3;
            playerMessage.playerData.lev = 3;

            byte[] bytes = playerMessage.Writing();
            //分包
            byte[] bytes1 = new byte[10];
            byte[] bytes2 = new byte[bytes.Length - 10];
            //分成第一个包
            Array.Copy(bytes, 0, bytes1, 0, 10);
            //第二个包
            Array.Copy(bytes, 10, bytes2, 0, bytes.Length - 10);

            TcpNetManager.Instance.SendTest(bytes1);
            await Task.Delay(500);//延迟执行半秒进行分包
            TcpNetManager.Instance.SendTest(bytes2);
        });

        //分包、黏包测试
        fenNiansendButton.onClick.AddListener(async () =>
        {
            PlayerMessage playerMessage = new PlayerMessage();
            playerMessage.playerID = 1004;
            playerMessage.playerData = new PlayerData();
            playerMessage.playerData.name = "韬老狮1";
            playerMessage.playerData.atk = 4;
            playerMessage.playerData.lev = 4;

            PlayerMessage playerMessage2 = new PlayerMessage();
            playerMessage2.playerID = 1005;
            playerMessage2.playerData = new PlayerData();
            playerMessage2.playerData.name = "韬老狮2";
            playerMessage2.playerData.atk = 5;
            playerMessage2.playerData.lev = 5;

            byte[] bytes1 = playerMessage.Writing();//消息A
            byte[] bytes2 = playerMessage2.Writing();//消息B

            byte[] bytes2_1 = new byte[10];
            byte[] bytes2_2 = new byte[bytes2.Length - 10];
            //分成第一个包
            Array.Copy(bytes2, 0, bytes2_1, 0, 10);
            //第二个包
            Array.Copy(bytes2, 10, bytes2_2, 0, bytes2.Length - 10);

            //消息A和消息B前一段的 黏包
            byte[] bytes1_2 = new byte[bytes1.Length + bytes2_1.Length];
            bytes1.CopyTo(bytes1_2, 0);
            bytes2_1.CopyTo(bytes1_2, bytes1.Length);

            TcpNetManager.Instance.SendTest(bytes1_2);
            await Task.Delay(500);
            TcpNetManager.Instance.SendTest(bytes2_2);
        });

        #endregion

        #region 知识点一 什么是心跳消息?
        //所谓心跳消息,就是在长连接中,客户端和服务端之间定期发送的一种特殊的数据包
        //用于通知对方自己还在线,以确保长连接的有效性

        //由于其发送的时间间隔往往是固定的持续的,就像是心跳一样一直存在
        //所以我们称之为心跳消息
        #endregion

        #region 知识点二 为什么需要心跳消息?
        //1.避免非正常关闭客户端时,服务器无法正常收到关闭连接消息
        //通过心跳消息我们可以自定义超时判断,如果超时没有收到客户端消息,证明客户端已经断开连接

        //2.避免客户端长期不发送消息,防火墙或者路由器会断开连接,我们可以通过心跳消息一直保持活跃状态
        #endregion

        #region 知识点三 实现心跳消息
        //客户端
        //主要功能:定时发送消息

        //服务器
        //主要功能:不停检测上次收到某客户端消息的时间,如果超时则认为连接已经断开
        #endregion

        #region 总结
        //心跳消息是长连接项目中必备的一套逻辑规则
        //通过它可以帮助我们在服务器端及时的释放掉失效的socket
        //可以有效避免当客户端非正常关闭时,服务器端不能及时判断连接已断开
        #endregion
    }
}

HeartMessage

using System.Collections;
using System.Collections.Generic;
using UnityEngine;

public class HeartMessage : BaseMessage
{
    public override int GetBytesNum()
    {
        //需要存储 ID和长度 两个int 8字节
        return 4 + 4;
    }

    public override int Reading(byte[] bytes, int beginIndex = 0)
    {
        //没有消息体 0字节
        return 0;
    }

    public override byte[] Writing()
    {
        //写入ID和长度到字节数组并返回
        int index = 0;
        byte[] bytes = new byte[GetBytesNum()];
        WriteInt(bytes, GetID(), ref index);
        WriteInt(bytes, 0, ref index);
        return bytes;
    }

    public override int GetID()
    {
        return 999;
    }
}

ClientSocket

using System;
using System.Collections.Generic;
using System.Net.Sockets;
using System.Text;
using System.Threading;

namespace Lesson27_网络通信_套接字Socket_TCP通信_同步_心跳消息_实现
{
    class ClientSocket
    {
        private static int CLIENT_BEGIN_ID = 1;  // 静态变量,用于为客户端分配唯一的客户端ID
        public int clientID;  // 客户端的唯一ID
        public Socket clientSocket;  // 与客户端通信的套接字对象

        //用于处理分包时 缓存的 字节数组 和 字节数组长度
        private byte[] cacheBytes = new byte[1024 * 1024];
        private int cacheNum = 0;

        //上一次收到消息的时间
        private long frontTime = -1;
        //超时时间
        private static int TIME_OUT_TIME = 10;

        /// <summary>
        /// 是否是连接状态
        /// </summary>
        public bool isClientConnected => this.clientSocket.Connected;  // 判断套接字是否处于连接状态

        public ClientSocket(Socket clientSocket)
        {
            this.clientID = CLIENT_BEGIN_ID;  // 初始化客户端ID
            this.clientSocket = clientSocket;  // 初始化套接字
            ++CLIENT_BEGIN_ID;  // 为下一个客户端分配不同的ID

            //我们现在为了方便大家理解 所以开了一个线程专门计时 但是这种方式比较消耗性能 不建议这样使用
            //ThreadPool.QueueUserWorkItem(CheckTimeOut);
        }

        /// <summary>
        /// 间隔一段时间 检测一次超时 如果超时 就会主动断开该客户端的连接
        /// </summary>
        /// <param name="obj"></param>
        private void CheckTimeOut(/*object obj*/)
        {
            //while (Connected)
            //{
            if (frontTime != -1 &&
            DateTime.Now.Ticks / TimeSpan.TicksPerSecond - frontTime >= TIME_OUT_TIME)
            {
                Program.serverSocket.AddDelSocket(this);
                //break;
            }
            //Thread.Sleep(5000);
            //}
        }

        // 关闭套接字连接
        public void Close()
        {
            if (clientSocket != null)
            {
                clientSocket.Shutdown(SocketShutdown.Both);  // 关闭套接字的读写
                clientSocket.Close();  // 关闭套接字连接
                clientSocket = null;
            }
        }

        // 发送消息给客户端
        public void Send(BaseMessage baseMessage)
        {
            if (isClientConnected)
            {
                try
                {
                    clientSocket.Send(baseMessage.Writing());  // 将消息编码为UTF-8字节数组并发送给客户端
                }
                catch (Exception e)
                {
                    Console.WriteLine("发消息出错" + e.Message);
                    //解析报错也加入到待移除客户端列表
                    Program.serverSocket.AddDelSocket(this);
                    //Close();  // 如果发送出现异常,关闭套接字连接
                }
            }
            else
            {
                //断开连接的话加入到待移除客户端列表
                Program.serverSocket.AddDelSocket(this);
            }
        }

        // 接收来自客户端的消息
        public void Receive()
        {
            if (!isClientConnected)
            {
                //断开连接的话加入到待移除客户端列表
                Program.serverSocket.AddDelSocket(this);
                return;
            }
                
            try
            {
                if (clientSocket.Available > 0)// 如果套接字中有可读数据
                {
                    byte[] result = new byte[1024 * 5];  // 创建一个缓冲区来存储接收到的数据
                    int receiveNum = clientSocket.Receive(result);// 从套接字接收数据并存储在缓冲区中
                    HandleReceiveMsg(result,receiveNum);
                    ////收到数据后 先读取4个字节 转为ID 才知道用哪一个类型去处理反序列化
                    //int msgID = BitConverter.ToInt32(result, 0);
                    //BaseMessage baseMessage = null;
                    //switch (msgID)
                    //{
                    //    case 1001:
                    //        baseMessage = new PlayerMessage();
                    //        baseMessage.Reading(result, 4);
                    //        break;
                    //}
                    //if (baseMessage == null)
                    //    return;
                    //ThreadPool.QueueUserWorkItem(HandleMessage, baseMessage);
                }

                //检测 是否超时 
                CheckTimeOut();
            }
            catch (Exception e)
            {
                Console.WriteLine("收消息出错" + e.Message);
                //解析报错也加入到待移除客户端列表
                Program.serverSocket.AddDelSocket(this);
                //Close(); // 如果接收出现异常,关闭套接字连接
            }
        }

        // 处理接受消息 分包、黏包问题的方法
        private void HandleReceiveMsg(byte[] receiveBytes, int receiveNum)
        {
            int msgID = 0;       // 消息ID
            int msgLength = 0;   // 消息长度
            int nowIndex = 0;    // 当前消息解析到哪一位

            // 当接收到消息时,检查是否有之前缓存的数据
            // 如果有,说明有分包,将新收到的字节数组拼接到后面,数组长度加上缓存长度
            // 如果没有,缓存数组是空数组,缓存长度是0,不影响后面的逻辑
            receiveBytes.CopyTo(cacheBytes, cacheNum);
            cacheNum += receiveNum;

            while (true)
            {
                // 在每次循环开始时将消息长度设置为-1作为标记,以避免上一次解析的数据影响当前的判断
                msgLength = -1;

                // 如果当前的缓存数组长度大于8 那么就可以解析这一个包的消息ID和消息长度 移动解析位置
                if (cacheNum - nowIndex >= 8)
                {
                    // 解析消息ID
                    msgID = BitConverter.ToInt32(cacheBytes, nowIndex);
                    nowIndex += 4;

                    // 解析消息长度
                    msgLength = BitConverter.ToInt32(cacheBytes, nowIndex);
                    nowIndex += 4;
                }

                // 缓存数组长度减去当前解析的位置假如大于消息长度 且消息长度不能是-1(-1说明没有解析消息长度 那就更不能解析消息体了)说明可以解析消息体
                if (cacheNum - nowIndex >= msgLength && msgLength != -1)
                {
                    // 解析消息体
                    BaseMessage baseMessage = null;
                    switch (msgID)
                    {
                        case 1001:
                            baseMessage = new PlayerMessage();
                            baseMessage.Reading(cacheBytes, nowIndex);
                            break;
                        case 1003:
                            baseMessage = new QuitMessage();
                            //QuitMessage没有消息体 不用反序列化
                            break;
                        case 999:
                            baseMessage = new HeartMessage();
                            //由于该消息没有消息体 所以都不用反序列化
                            break;
                    }

                    // 如果成功解析了消息体,将消息加入接收队列
                    if (baseMessage != null)
                        //receiveQueue.Enqueue(baseMessage);
                        ThreadPool.QueueUserWorkItem(HandleMessage, baseMessage);//开启线程进行处理

                    //移动解析位置 加上消息体长度
                    nowIndex += msgLength;

                    // 如果刚好解析完当前缓存数组所有内容,说明这个包没有黏包,重置缓存并退出循环,解析结束
                    if (nowIndex == cacheNum)
                    {
                        cacheNum = 0;
                        break;
                    }
                }
                // 如果不满足条件,表明存在分包的情况,需要将当前接收的内容记录下来 以便在下次接收到消息后继续处理
                else
                {
                    // 如果已经解析了消息ID和消息长度,但没有成功解析消息体,需要减去nowIndex解析位置的偏移。
                    // 要保留完整的消息ID和消息长度,以便下次完整解析。
                    if (msgLength != -1)
                        nowIndex -= 8;

                    // 使用Array.Copy方法,将剩余未解析的字节数组内容移到前面,用于缓存下次继续解析。
                    // 参数1: 要拷贝的数组  这里是原始的缓存数组 cacheBytes。
                    // 参数2: 从第几个索引开始拷贝后面的内容  这里是 nowIndex,表示从未解析的部分开始,把nowIndex到尾部的字节元素都要拷贝
                    // 参数3: 拷贝到的目标数组  也是原始的缓存数组 cacheBytes,因此在这里实际上是在原数组中进行移动操作。
                    // 参数4: 目标数组开始索引  这里是0,表示将数据移动到数组的开头。
                    // 参数5: 拷贝长度  这里是 cacheNum - nowIndex,表示要移动的字节数,即未解析部分的长度。cacheNum代表原先缓存数组所有要解析的字节数组长度,减去nowIndex代表未解析部分的长度。
                    Array.Copy(cacheBytes, nowIndex, cacheBytes, 0, cacheNum - nowIndex);

                    // 更新缓存的长度,减去已解析的部分,以便在下次继续解析时正确处理未解析的内容。
                    cacheNum = cacheNum - nowIndex;

                    break;
                }
            }
        }

        // 处理接收到的消息
        private void HandleMessage(object obj)
        {
            BaseMessage baseMessage = obj as BaseMessage;
            if (baseMessage is PlayerMessage)
            {
                PlayerMessage playerMessage = baseMessage as PlayerMessage;
                Console.WriteLine(playerMessage.playerID);
                Console.WriteLine(playerMessage.playerData.name);
                Console.WriteLine(playerMessage.playerData.lev);
                Console.WriteLine(playerMessage.playerData.atk);
            }
            else if(baseMessage is QuitMessage)
            {
                //收到断开消息 添加到断开列表中
                Program.serverSocket.AddDelSocket(this);
            }
            else if (baseMessage is HeartMessage)
            {
                //收到心跳消息 记录收到消息的时间
                frontTime = DateTime.Now.Ticks / TimeSpan.TicksPerSecond;
                Console.WriteLine("收到心跳消息");
            }
        }
    }
}


转载请注明来源,欢迎对文章中的引用来源进行考证,欢迎指出任何有错误或不够清晰的表达。可以在下面评论区评论,也可以邮件至 785293209@qq.com

×

喜欢就点赞,疼爱就打赏