31.TCP异步服务端进阶实现

31.网络通信-套接字Socket-TCP通信-异步-服务端综合练习题


31.1 知识点

将异步服务端,加上同步中的:区分消息类型,分包、黏包,心跳消息等功能

拷贝消息和基础数据相关代码到服务端

在ClientSocket类实现心跳消息检查

//上一次收到消息的时间
private long frontTime = -1;
//超时时间
private static int TIME_OUT_TIME = 10;

public ClientSocket(Socket clientSocket)
{
    this.clientID = CLIENT_BEGIN_ID;  // 初始化客户端ID
    this.clientSocket = clientSocket;  // 初始化套接字
    ++CLIENT_BEGIN_ID;  // 为下一个客户端分配不同的ID

    //开始收消息
    this.clientSocket.BeginReceive(cacheBytes, cacheNum, cacheBytes.Length, SocketFlags.None, ReceiveCallBack, null);

    //心跳消息检查
    ThreadPool.QueueUserWorkItem(CheckTimeOut);
}

/// <summary>
/// 间隔一段时间 检测一次超时 如果超时 就会主动断开该客户端的连接
/// </summary>
/// <param name="obj"></param>
private void CheckTimeOut(object obj)
{
    while (this.clientSocket != null && isClientConnected)
    {
        if (frontTime != -1 &&
            DateTime.Now.Ticks / TimeSpan.TicksPerSecond - frontTime >= TIME_OUT_TIME)
        {
            Program.serverSocket.CloseClientSocket(this);
            break;
        }
        Thread.Sleep(5000);
    }
}

在ClientSocket类,收发消息不使用字符串,使用BaseMessage,拷贝重写处理分包黏包函数和处理信息函数

// 发送消息给客户端
public void Send(BaseMessage baseMessage)
{
    // 检查 clientSocket 不为 null 并且客户端已连接。
    if (clientSocket != null && isClientConnected)
    {
        // 使用 Writing() 方法将消息序列化为字节数组。
        byte[] bytes = baseMessage.Writing();

        // 在 clientSocket 上启动异步发送操作以发送消息。
        // 参数:bytes - 要发送的数据,0 - 数据中的起始位置,
        // bytes.Length - 要发送的字节数,SocketFlags.None - 无特殊标志,
        // SendCallBack - 发送操作完成时执行的回调方法,null - 用户定义的对象。
        clientSocket.BeginSend(bytes, 0, bytes.Length, SocketFlags.None, SendCallBack, null);
    }
    else
    {
        // 如果 clientSocket 为 null 或客户端未连接,则关闭客户端的连接。
        Program.serverSocket.CloseClientSocket(this);
    }
}

// 发送消息给客户端的回调。
private void SendCallBack(IAsyncResult asyncResult)
{
    try
    {
        // 检查 clientSocket 不为 null 并且客户端已连接。
        if (clientSocket != null && isClientConnected)
            // 结束异步发送操作,标记其完成。
            this.clientSocket.EndSend(asyncResult);
        else
            // 如果 clientSocket 为 null 或客户端未连接,则关闭客户端的连接。
            Program.serverSocket.CloseClientSocket(this);
    }
    catch (SocketException e)
    {
        // 处理可能在发送操作期间发生的 SocketException。
        // 打印错误消息,包括 SocketErrorCode 和异常消息。
        Console.WriteLine("发送失败" + e.SocketErrorCode + e.Message);

        // 作为错误响应关闭客户端的连接。
        Program.serverSocket.CloseClientSocket(this);
    }
}

// 接收来自客户端的消息的回调
private void ReceiveCallBack(IAsyncResult asyncResult)
{
    try
    {
        if (this.clientSocket != null && isClientConnected)
        {
            // 从客户端成功接收消息
            int num = this.clientSocket.EndReceive(asyncResult);

            // 处理消息的分包和黏包问题
            HandleReceiveMsg(num);

            // 再次启动异步接收操作以等待下一条消息,将已接收的消息添加到缓存中。
            this.clientSocket.BeginReceive(cacheBytes, cacheNum, cacheBytes.Length - cacheNum, SocketFlags.None, ReceiveCallBack, this.clientSocket);
        }
        else
        {
            // 如果 clientSocket 为 null 或客户端未连接,则不再接收消息。
            Console.WriteLine("没有连接,不用再收消息了");
            Program.serverSocket.CloseClientSocket(this);
        }
    }
    catch (SocketException e)
    {
        // 处理可能在接收消息期间发生的 SocketException。
        Console.WriteLine("接受消息错误" + e.SocketErrorCode + e.Message);

        // 作为错误响应关闭客户端的连接。
        Program.serverSocket.CloseClientSocket(this);
    }
}

// 处理接受消息 分包、黏包问题的方法
private void HandleReceiveMsg(int receiveNum)
{
    int msgID = 0;       // 消息ID
    int msgLength = 0;   // 消息长度
    int nowIndex = 0;    // 当前消息解析到哪一位

    //由于消息接收后是直接存储在 cacheBytes中的 所以不需要进行什么拷贝操作
    //收到消息的字节数量
    cacheNum += receiveNum;

    while (true)
    {
        // 在每次循环开始时将消息长度设置为-1作为标记,以避免上一次解析的数据影响当前的判断
        msgLength = -1;

        // 如果当前的缓存数组长度大于8 那么就可以解析这一个包的消息ID和消息长度 移动解析位置
        if (cacheNum - nowIndex >= 8)
        {
            // 解析消息ID
            msgID = BitConverter.ToInt32(cacheBytes, nowIndex);
            nowIndex += 4;

            // 解析消息长度
            msgLength = BitConverter.ToInt32(cacheBytes, nowIndex);
            nowIndex += 4;
        }

        // 缓存数组长度减去当前解析的位置假如大于消息长度 且消息长度不能是-1(-1说明没有解析消息长度 那就更不能解析消息体了)说明可以解析消息体
        if (cacheNum - nowIndex >= msgLength && msgLength != -1)
        {
            //解析消息体
            BaseMessage baseMessage = null;
            switch (msgID)
            {
                case 1001:
                    baseMessage = new PlayerMessage();
                    baseMessage.Reading(cacheBytes, nowIndex);
                    break;
                case 1003:
                    baseMessage = new QuitMessage();
                    //由于该消息没有消息体 所以都不用反序列化
                    break;
                case 999:
                    baseMessage = new HeartMessage();
                    //由于该消息没有消息体 所以都不用反序列化
                    break;
            }
            if (baseMessage != null)
                ThreadPool.QueueUserWorkItem(HandleMessage, baseMessage);//开启线程进行处理

            //移动解析位置 加上消息体长度
            nowIndex += msgLength;

            // 如果刚好解析完当前缓存数组所有内容,说明这个包没有黏包,重置缓存并退出循环,解析结束
            if (nowIndex == cacheNum)
            {
                cacheNum = 0;
                break;
            }
        }
        // 如果不满足条件,表明存在分包的情况,需要将当前接收的内容记录下来 以便在下次接收到消息后继续处理
        else
        {
            // 如果已经解析了消息ID和消息长度,但没有成功解析消息体,需要减去nowIndex解析位置的偏移。
            // 要保留完整的消息ID和消息长度,以便下次完整解析。
            if (msgLength != -1)
                nowIndex -= 8;

            // 使用Array.Copy方法,将剩余未解析的字节数组内容移到前面,用于缓存下次继续解析。
            // 参数1: 要拷贝的数组  这里是原始的缓存数组 cacheBytes。
            // 参数2: 从第几个索引开始拷贝后面的内容  这里是 nowIndex,表示从未解析的部分开始,把nowIndex到尾部的字节元素都要拷贝
            // 参数3: 拷贝到的目标数组  也是原始的缓存数组 cacheBytes,因此在这里实际上是在原数组中进行移动操作。
            // 参数4: 目标数组开始索引  这里是0,表示将数据移动到数组的开头。
            // 参数5: 拷贝长度  这里是 cacheNum - nowIndex,表示要移动的字节数,即未解析部分的长度。cacheNum代表原先缓存数组所有要解析的字节数组长度,减去nowIndex代表未解析部分的长度。
            Array.Copy(cacheBytes, nowIndex, cacheBytes, 0, cacheNum - nowIndex);

            // 更新缓存的长度,减去已解析的部分,以便在下次继续解析时正确处理未解析的内容。
            cacheNum = cacheNum - nowIndex;

            break;
        }
    }

}

// 处理接收到的消息
private void HandleMessage(object obj)
{
    switch (obj)
    {
        case PlayerMessage baseMessage:
            PlayerMessage playerMessage = baseMessage as PlayerMessage;
            Console.WriteLine(playerMessage.playerID);
            Console.WriteLine(playerMessage.playerData.name);
            Console.WriteLine(playerMessage.playerData.lev);
            Console.WriteLine(playerMessage.playerData.atk);
            break;
        case QuitMessage baseMessage:
            //收到断开连接消息 把自己添加到待移除的列表当中
            Program.serverSocket.CloseClientSocket(this);
            break;
        case HeartMessage baseMessage:
            //收到心跳消息 记录收到消息的时间
            frontTime = DateTime.Now.Ticks / TimeSpan.TicksPerSecond;
            Console.WriteLine("收到心跳消息");
            break;
    }
}

在ServerSocket类,实现关闭客户端连接的方法

//关闭客户端连接的 从字典中移除
public void CloseClientSocket(ClientSocket socket)
{
    //添加线程锁 服务器类有很多线程 假如同时操作字典会有问题 保证线程安全
    lock (clientSocketDictionary)
    {
        socket.Close();
        if (clientSocketDictionary.ContainsKey(socket.clientID))
        {
            clientSocketDictionary.Remove(socket.clientID);
            Console.WriteLine("客户端{0}主动断开连接了", socket.clientID);
        }
    }
}

服务端入口定义ServerSocket类静态变量和输入1001广播逻辑

public static ServerSocket serverSocket;
static void Main(string[] args)
{
    serverSocket = new ServerSocket();
    serverSocket.Start("127.0.0.1", 8080, 1024);
    Console.WriteLine("开启服务器成功");

    while (true)
    {
        string input = Console.ReadLine();
        if (input.Substring(2) == "1001")
        {
            PlayerMessage playerMessage = new PlayerMessage();
            playerMessage.playerID = 9876;
            playerMessage.playerData = new PlayerData();
            playerMessage.playerData.name = "服务器端发来的消息";
            playerMessage.playerData.lev = 99;
            playerMessage.playerData.atk = 80;
            serverSocket.Broadcast(playerMessage);
        }
    }
}

31.2 知识点代码

ClientSocket

using System;
using System.Collections.Generic;
using System.Net.Sockets;
using System.Text;

namespace Lesson31_网络通信_套接字Socket_TCP通信_异步_服务端综合练习题
{
    class ClientSocket
    {
        private static int CLIENT_BEGIN_ID = 1;  // 静态变量,用于为客户端分配唯一的客户端ID
        public int clientID;  // 客户端的唯一ID
        public Socket clientSocket;  // 与客户端通信的套接字对象

        //用于处理分包时 缓存的 字节数组 和 字节数组长度
        private byte[] cacheBytes = new byte[1024 * 1024];
        private int cacheNum = 0;

        //上一次收到消息的时间
        private long frontTime = -1;
        //超时时间
        private static int TIME_OUT_TIME = 10;

        /// <summary>
        /// 是否是连接状态
        /// </summary>
        public bool isClientConnected => this.clientSocket.Connected;  // 判断套接字是否处于连接状态

        public ClientSocket(Socket clientSocket)
        {
            this.clientID = CLIENT_BEGIN_ID;  // 初始化客户端ID
            this.clientSocket = clientSocket;  // 初始化套接字
            ++CLIENT_BEGIN_ID;  // 为下一个客户端分配不同的ID

            //开始收消息
            this.clientSocket.BeginReceive(cacheBytes, cacheNum, cacheBytes.Length, SocketFlags.None, ReceiveCallBack, null);

            //心跳消息检查
            ThreadPool.QueueUserWorkItem(CheckTimeOut);
        }

        /// <summary>
        /// 间隔一段时间 检测一次超时 如果超时 就会主动断开该客户端的连接
        /// </summary>
        /// <param name="obj"></param>
        private void CheckTimeOut(object obj)
        {
            while (this.clientSocket != null && isClientConnected)
            {
                if (frontTime != -1 &&
                DateTime.Now.Ticks / TimeSpan.TicksPerSecond - frontTime >= TIME_OUT_TIME)
                {
                    Program.serverSocket.CloseClientSocket(this);
                    break;
                }
                Thread.Sleep(5000);
            }
        }

        // 关闭套接字连接
        public void Close()
        {
            if (clientSocket != null)
            {
                clientSocket.Shutdown(SocketShutdown.Both);  // 关闭套接字的读写
                clientSocket.Close();  // 关闭套接字连接
                clientSocket = null;
            }
        }

        // 发送消息给客户端
        public void Send(BaseMessage baseMessage)
        {
            // 检查 clientSocket 不为 null 并且客户端已连接。
            if (clientSocket != null && isClientConnected)
            {
                // 使用 Writing() 方法将消息序列化为字节数组。
                byte[] bytes = baseMessage.Writing();

                // 在 clientSocket 上启动异步发送操作以发送消息。
                // 参数:bytes - 要发送的数据,0 - 数据中的起始位置,
                // bytes.Length - 要发送的字节数,SocketFlags.None - 无特殊标志,
                // SendCallBack - 发送操作完成时执行的回调方法,null - 用户定义的对象。
                clientSocket.BeginSend(bytes, 0, bytes.Length, SocketFlags.None, SendCallBack, null);
            }
            else
            {
                // 如果 clientSocket 为 null 或客户端未连接,则关闭客户端的连接。
                Program.serverSocket.CloseClientSocket(this);
            }
        }

        // 发送消息给客户端的回调。
        private void SendCallBack(IAsyncResult asyncResult)
        {
            try
            {
                // 检查 clientSocket 不为 null 并且客户端已连接。
                if (clientSocket != null && isClientConnected)
                    // 结束异步发送操作,标记其完成。
                    this.clientSocket.EndSend(asyncResult);
                else
                    // 如果 clientSocket 为 null 或客户端未连接,则关闭客户端的连接。
                    Program.serverSocket.CloseClientSocket(this);
            }
            catch (SocketException e)
            {
                // 处理可能在发送操作期间发生的 SocketException。
                // 打印错误消息,包括 SocketErrorCode 和异常消息。
                Console.WriteLine("发送失败" + e.SocketErrorCode + e.Message);

                // 作为错误响应关闭客户端的连接。
                Program.serverSocket.CloseClientSocket(this);
            }
        }

        // 接收来自客户端的消息的回调
        private void ReceiveCallBack(IAsyncResult asyncResult)
        {
            try
            {
                if (this.clientSocket != null && isClientConnected)
                {
                    // 从客户端成功接收消息
                    int num = this.clientSocket.EndReceive(asyncResult);

                    // 处理消息的分包和黏包问题
                    HandleReceiveMsg(num);

                    // 再次启动异步接收操作以等待下一条消息,将已接收的消息添加到缓存中。
                    this.clientSocket.BeginReceive(cacheBytes, cacheNum, cacheBytes.Length - cacheNum, SocketFlags.None, ReceiveCallBack, this.clientSocket);
                }
                else
                {
                    // 如果 clientSocket 为 null 或客户端未连接,则不再接收消息。
                    Console.WriteLine("没有连接,不用再收消息了");
                    Program.serverSocket.CloseClientSocket(this);
                }
            }
            catch (SocketException e)
            {
                // 处理可能在接收消息期间发生的 SocketException。
                Console.WriteLine("接受消息错误" + e.SocketErrorCode + e.Message);

                // 作为错误响应关闭客户端的连接。
                Program.serverSocket.CloseClientSocket(this);
            }
        }

        // 处理接受消息 分包、黏包问题的方法
        private void HandleReceiveMsg(int receiveNum)
        {
            int msgID = 0;       // 消息ID
            int msgLength = 0;   // 消息长度
            int nowIndex = 0;    // 当前消息解析到哪一位

            //由于消息接收后是直接存储在 cacheBytes中的 所以不需要进行什么拷贝操作
            //收到消息的字节数量
            cacheNum += receiveNum;

            while (true)
            {
                // 在每次循环开始时将消息长度设置为-1作为标记,以避免上一次解析的数据影响当前的判断
                msgLength = -1;

                // 如果当前的缓存数组长度大于8 那么就可以解析这一个包的消息ID和消息长度 移动解析位置
                if (cacheNum - nowIndex >= 8)
                {
                    // 解析消息ID
                    msgID = BitConverter.ToInt32(cacheBytes, nowIndex);
                    nowIndex += 4;

                    // 解析消息长度
                    msgLength = BitConverter.ToInt32(cacheBytes, nowIndex);
                    nowIndex += 4;
                }

                // 缓存数组长度减去当前解析的位置假如大于消息长度 且消息长度不能是-1(-1说明没有解析消息长度 那就更不能解析消息体了)说明可以解析消息体
                if (cacheNum - nowIndex >= msgLength && msgLength != -1)
                {
                    //解析消息体
                    BaseMessage baseMessage = null;
                    switch (msgID)
                    {
                        case 1001:
                            baseMessage = new PlayerMessage();
                            baseMessage.Reading(cacheBytes, nowIndex);
                            break;
                        case 1003:
                            baseMessage = new QuitMessage();
                            //由于该消息没有消息体 所以都不用反序列化
                            break;
                        case 999:
                            baseMessage = new HeartMessage();
                            //由于该消息没有消息体 所以都不用反序列化
                            break;
                    }
                    if (baseMessage != null)
                        ThreadPool.QueueUserWorkItem(HandleMessage, baseMessage);//开启线程进行处理

                    //移动解析位置 加上消息体长度
                    nowIndex += msgLength;

                    // 如果刚好解析完当前缓存数组所有内容,说明这个包没有黏包,重置缓存并退出循环,解析结束
                    if (nowIndex == cacheNum)
                    {
                        cacheNum = 0;
                        break;
                    }
                }
                // 如果不满足条件,表明存在分包的情况,需要将当前接收的内容记录下来 以便在下次接收到消息后继续处理
                else
                {
                    // 如果已经解析了消息ID和消息长度,但没有成功解析消息体,需要减去nowIndex解析位置的偏移。
                    // 要保留完整的消息ID和消息长度,以便下次完整解析。
                    if (msgLength != -1)
                        nowIndex -= 8;

                    // 使用Array.Copy方法,将剩余未解析的字节数组内容移到前面,用于缓存下次继续解析。
                    // 参数1: 要拷贝的数组  这里是原始的缓存数组 cacheBytes。
                    // 参数2: 从第几个索引开始拷贝后面的内容  这里是 nowIndex,表示从未解析的部分开始,把nowIndex到尾部的字节元素都要拷贝
                    // 参数3: 拷贝到的目标数组  也是原始的缓存数组 cacheBytes,因此在这里实际上是在原数组中进行移动操作。
                    // 参数4: 目标数组开始索引  这里是0,表示将数据移动到数组的开头。
                    // 参数5: 拷贝长度  这里是 cacheNum - nowIndex,表示要移动的字节数,即未解析部分的长度。cacheNum代表原先缓存数组所有要解析的字节数组长度,减去nowIndex代表未解析部分的长度。
                    Array.Copy(cacheBytes, nowIndex, cacheBytes, 0, cacheNum - nowIndex);

                    // 更新缓存的长度,减去已解析的部分,以便在下次继续解析时正确处理未解析的内容。
                    cacheNum = cacheNum - nowIndex;

                    break;
                }
            }

        }

        // 处理接收到的消息
        private void HandleMessage(object obj)
        {
            switch (obj)
            {
                case PlayerMessage baseMessage:
                    PlayerMessage playerMessage = baseMessage as PlayerMessage;
                    Console.WriteLine(playerMessage.playerID);
                    Console.WriteLine(playerMessage.playerData.name);
                    Console.WriteLine(playerMessage.playerData.lev);
                    Console.WriteLine(playerMessage.playerData.atk);
                    break;
                case QuitMessage baseMessage:
                    //收到断开连接消息 把自己添加到待移除的列表当中
                    Program.serverSocket.CloseClientSocket(this);
                    break;
                case HeartMessage baseMessage:
                    //收到心跳消息 记录收到消息的时间
                    frontTime = DateTime.Now.Ticks / TimeSpan.TicksPerSecond;
                    Console.WriteLine("收到心跳消息");
                    break;
            }
        }
    }
}

ServerSocket

using System;
using System.Collections.Generic;
using System.Net;
using System.Net.Sockets;
using System.Text;

namespace Lesson31_网络通信_套接字Socket_TCP通信_异步_服务端综合练习题
{
    class ServerSocket
    {
        // 服务器端Socket
        public Socket serverSocket;

        // 保存客户端连接的所有Socket的字典
        public Dictionary<int, ClientSocket> clientSocketDictionary = new Dictionary<int, ClientSocket>();

        //服务端启动
        public void Start(string ipString, int port, int clientSocketMaxNum)
        {
            // 创建服务器套接字,指定地址族为IPv4、套接字类型为流套接字、协议类型为TCP
            serverSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);

            // 创建IP终结点,指定IP地址和端口号
            IPEndPoint serverEndPoint = new IPEndPoint(IPAddress.Parse(ipString), port);

            try
            {
                // 将套接字绑定到指定的IP终结点
                serverSocket.Bind(serverEndPoint);

                // 启动服务器套接字,同时指定同时等待连接的最大客户端数
                serverSocket.Listen(clientSocketMaxNum);

                //通过异步接受客户端连入
                serverSocket.BeginAccept(AcceptCallBack, null);
            }
            catch (Exception e)
            {
                Console.WriteLine("启动服务器失败" + e.Message);
            }
        }

        //服务端接收到客户端回调
        private void AcceptCallBack(IAsyncResult asyncResult)
        {
            try
            {
                //获取连入的客户端
                Socket clientSocket = serverSocket.EndAccept(asyncResult);
                ClientSocket client = new ClientSocket(clientSocket);

                //记录客户端对象
                clientSocketDictionary.Add(client.clientID, client);

                //继续去让别的客户端可以连入
                serverSocket.BeginAccept(AcceptCallBack, null);
            }
            catch (Exception e)
            {
                Console.WriteLine("客户端连入失败" + e.Message);
            }
        }

        // 向所有客户端广播消息
        public void Broadcast(BaseMessage baseMessage)
        {
            //线程锁 保证字典线程安全
            lock (clientSocketDictionary)
            {
                foreach (ClientSocket client in clientSocketDictionary.Values)
                {
                    client.Send(baseMessage);
                }
            }
        }

        //关闭客户端连接的 从字典中移除
        public void CloseClientSocket(ClientSocket socket)
        {
            //添加线程锁 服务器类有很多线程 假如同时操作字典会有问题 保证线程安全
            lock (clientSocketDictionary)
            {
                socket.Close();
                if (clientSocketDictionary.ContainsKey(socket.clientID))
                {
                    clientSocketDictionary.Remove(socket.clientID);
                    Console.WriteLine("客户端{0}主动断开连接了", socket.clientID);
                }
            }
        }
    }
}

Lesson31_网络通信_套接字Socket_TCP通信_异步_服务端综合练习题

namespace Lesson31_网络通信_套接字Socket_TCP通信_异步_服务端综合练习题
{
    class Program
    {
        public static ServerSocket serverSocket;
        static void Main(string[] args)
        {
            serverSocket = new ServerSocket();
            serverSocket.Start("127.0.0.1", 8080, 1024);
            Console.WriteLine("开启服务器成功");

            while (true)
            {
                string input = Console.ReadLine();
                if (input.Substring(2) == "1001")
                {
                    PlayerMessage playerMessage = new PlayerMessage();
                    playerMessage.playerID = 9876;
                    playerMessage.playerData = new PlayerData();
                    playerMessage.playerData.name = "服务器端发来的消息";
                    playerMessage.playerData.lev = 99;
                    playerMessage.playerData.atk = 80;
                    serverSocket.Broadcast(playerMessage);
                }
            }
        }
    }
}


转载请注明来源,欢迎对文章中的引用来源进行考证,欢迎指出任何有错误或不够清晰的表达。可以在下面评论区评论,也可以邮件至 785293209@qq.com

×

喜欢就点赞,疼爱就打赏