4.消息池类

  1. 4.消息池类
    1. 4.1 知识点
      1. 创建消息池类
      2. 在TCP异步管理器中定义消息池对象
      3. 之后会写自动生成消息池和Handler类的工具
    2. 4.2 知识点代码
      1. MessagePool
      2. TcpNetAsyncManager

4.消息池类


4.1 知识点

创建消息池类

  • 消息池类:主要用于注册消息类型、消息处理器类型与ID的映射关系,以便在运行时根据ID获取相应的消息类对象或消息处理类对象。
public class MessagePool
{
    private Dictionary<int, Type> messsages = new Dictionary<int, Type>();
    private Dictionary<int, Type> handlers = new Dictionary<int, Type>();

    public MessagePool()
    {
        Register(1001, typeof(PlayerMessage), typeof(PlayerMessageHandler));
    }

    private void Register(int id, Type messageType, Type handlerType)
    {
        messsages.Add(id, messageType);
        handlers.Add(id, handlerType);
    }

    /// <summary>
    /// 根据ID获取指定的消息类对象
    /// </summary>
    /// <param name="id"></param>
    /// <returns></returns>
    public BaseMessage GetMessage(int id)
    {
        if (!messsages.ContainsKey(id))
            return null;

        return Activator.CreateInstance(messsages[id]) as BaseMessage;
    }

    /// <summary>
    /// 根据ID获取指定的消息处理类对象
    /// </summary>
    /// <param name="id"></param>
    /// <returns></returns>
    public BaseHandler GetHandler(int id)
    {
        if (!handlers.ContainsKey(id))
            return null;

        return Activator.CreateInstance(handlers[id]) as BaseHandler;
    }
}

在TCP异步管理器中定义消息池对象

  • 用途:在 TCP 异步管理器中定义消息池对象,以便在分包黏包方法中使用消息池获取消息对象和消息处理器,并将它们压入队列中。
// 消息池对象,用于快速获取消息和消息处理类对象
private MessagePool messagePool = new MessagePool();

// 在分包黏包方法中使用消息池获取消息对象和消息处理器,并将它们压入队列中
BaseMessage baseMessage = messagePool.GetMessage(msgID);
if (baseMessage != null)
{
    // 反序列化
    baseMessage.Reading(cacheBytes, nowIndex);
    // 获取消息处理器对象
    BaseHandler baseHandler = messagePool.GetHandler(msgID);
    baseHandler.message = baseMessage;
    // 将消息处理器对象放入队列中,稍后在Update中进行处理
    receiveQueue.Enqueue(baseHandler);
}

之后会写自动生成消息池和Handler类的工具


4.2 知识点代码

MessagePool

using GamePlayer;
using GameSystem;
using System;
using System.Collections;
using System.Collections.Generic;
using UnityEngine;

/// <summary>
/// 消息池类 主要是用于 注册 ID和消息类型以及消息处理器类型的映射关系
/// 方便我们获取对象 进行反序列化和消息逻辑处理
/// </summary>
public class MessagePool
{
    // 记录消息类型和ID的映射关系
    private Dictionary<int, Type> messsages = new Dictionary<int, Type>();
    // 记录消息处理器类型和ID的映射关系
    private Dictionary<int, Type> handlers = new Dictionary<int, Type>();

    // 构造函数,用于初始化消息池并注册初始映射关系
    public MessagePool()
    {
        // 在构造函数中进行注册映射关系
        Register(1001, typeof(PlayerMessage), typeof(PlayerMessageHandler));
    }

    // 注册方法,将消息类型、消息处理器类型与ID进行关联
    private void Register(int id, Type messageType, Type handlerType)
    {
        messsages.Add(id, messageType);
        handlers.Add(id, handlerType);
    }

    /// <summary>
    /// 根据ID得到一个指定的消息类对象
    /// </summary>
    /// <param name="id"></param>
    /// <returns></returns>
    public BaseMessage GetMessage(int id)
    {
        // 如果消息池中不包含指定ID的映射关系,则返回null
        if (!messsages.ContainsKey(id))
            return null;

        // 根据ID创建对应消息类对象并返回
        return Activator.CreateInstance(messsages[id]) as BaseMessage;
    }

    /// <summary>
    /// 根据ID得到一个指定的消息处理类对象
    /// </summary>
    /// <param name="id"></param>
    /// <returns></returns>
    public BaseHandler GetHandler(int id)
    {
        // 如果消息池中不包含指定ID的映射关系,则返回null
        if (!handlers.ContainsKey(id))
            return null;

        // 根据ID创建对应消息处理类对象并返回
        return Activator.CreateInstance(handlers[id]) as BaseHandler;
    }
}

TcpNetAsyncManager

using System;
using System.Collections;
using System.Collections.Generic;
using System.Net;
using System.Net.Sockets;
using System.Text;
using UnityEngine;

public class TcpNetAsyncManager : BaseSingletonInMonoBehaviour<TcpNetAsyncManager>
{
    //和服务器进行连接的 Socket
    private Socket socket;// 创建Socket对象,用于网络通信

    //private Queue<BaseMessage> receiveQueue = new Queue<BaseMessage>(); // 创建一个队列,用于存储接收到的消息
    private Queue<BaseHandler> receiveQueue = new Queue<BaseHandler>(); // 创建一个队列,用于存储接消息接受者

    //用于处理分包时 缓存的 字节数组 和 字节数组长度
    private byte[] cacheBytes = new byte[1024 * 1024];
    private int cacheNum = 0;

    //发送心跳消息的间隔时间
    private int SEND_HEART_MSG_TIME = 2;
    private HeartMessage heartMessage = new HeartMessage();

    //消息池对象 用于快速获取消息和消息处理类对象
    private MessagePool messagePool = new MessagePool();

    protected override void Awake()
    {
        base.Awake();

        //客户端循环定时给服务端发送心跳消息
        InvokeRepeating("SendHeartMsg", 0, SEND_HEART_MSG_TIME);
    }

    //发送心跳消息
    private void SendHeartMsg()
    {
        if (socket != null && this.socket.Connected)
            Send(heartMessage);
    }

    //连接服务器的代码
    public void Connect(string ip, int port)
    {
        if (socket != null && socket.Connected)
            return;

        IPEndPoint ipPoint = new IPEndPoint(IPAddress.Parse(ip), port); // 创建一个IP终结点对象
        socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);

        SocketAsyncEventArgs socketAsyncEventArgs = new SocketAsyncEventArgs();
        socketAsyncEventArgs.RemoteEndPoint = ipPoint;
        socketAsyncEventArgs.Completed += (socket, socketAsyncEventArgs) =>
        {
            if (socketAsyncEventArgs.SocketError == SocketError.Success)
            {
                print("连接成功");
                //收消息
                SocketAsyncEventArgs receiveSocketAsyncEventArgs = new SocketAsyncEventArgs();
                receiveSocketAsyncEventArgs.SetBuffer(cacheBytes, 0, cacheBytes.Length);
                receiveSocketAsyncEventArgs.Completed += ReceiveCallBack;
                this.socket.ReceiveAsync(receiveSocketAsyncEventArgs);
            }
            else
            {
                print("连接失败" + socketAsyncEventArgs.SocketError);
            }
        };
        socket.ConnectAsync(socketAsyncEventArgs);
    }

    // 关闭连接
    public void Close()
    {
        print("客户端主动断开连接");
        if (socket != null)
        {
            //主动发送一条断开的消息个服务端 
            QuitMessage quitMessage = new QuitMessage();

            //这里不能用我们封装的Send方法 因为Send方法是开一个线程发送的 可能还没发就直接被断开了
            socket.Send(quitMessage.Writing());

            socket.Shutdown(SocketShutdown.Both); // 关闭套接字的发送和接收

            socket.Disconnect(false);//手动停止连接 参数意思是这个socket以后还用不用

            socket.Close(); // 关闭套接字连接

            socket = null;//当前socket不会再用了 置空
        }
    }

    // 当对象被销毁时,确保关闭连接
    private void OnDestroy()
    {
        Close(); // 调用关闭连接的方法
    }

    /// <summary>
    /// 用于测试 直接发字节数组的方法
    /// </summary>
    /// <param name="bytes"></param>
    public void SendTest(byte[] bytes)
    {
        SocketAsyncEventArgs socketAsyncEventArgs = new SocketAsyncEventArgs();
        socketAsyncEventArgs.SetBuffer(bytes, 0, bytes.Length);
        socketAsyncEventArgs.Completed += (socket, args) =>
        {
            if (args.SocketError != SocketError.Success)
            {
                print("发送消息失败" + args.SocketError);
                Close();
            }

        };
        this.socket.SendAsync(socketAsyncEventArgs);
    }

    // 发送消息
    public void Send(BaseMessage baseMessage)
    {
        if (this.socket != null && this.socket.Connected)
        {
            byte[] bytes = baseMessage.Writing();
            SocketAsyncEventArgs args = new SocketAsyncEventArgs();
            args.SetBuffer(bytes, 0, bytes.Length);
            args.Completed += (socket, args) =>
            {
                if (args.SocketError != SocketError.Success)
                {
                    print("发送消息失败" + args.SocketError);
                    Close();
                }

            };
            this.socket.SendAsync(args);
        }
        else
        {
            Close();
        }
    }

    //收消息完成的回调函数
    private void ReceiveCallBack(object obj, SocketAsyncEventArgs socketAsyncEventArgs)
    {
        if (socketAsyncEventArgs.SocketError == SocketError.Success)
        {
            HandleReceiveMsg(socketAsyncEventArgs.BytesTransferred);
            //继续去收消息
            socketAsyncEventArgs.SetBuffer(cacheNum, socketAsyncEventArgs.Buffer.Length - cacheNum);
            //继续异步收消息
            if (this.socket != null && this.socket.Connected)
                socket.ReceiveAsync(socketAsyncEventArgs);
            else
                Close();
        }
        else
        {
            print("接受消息出错" + socketAsyncEventArgs.SocketError);
            //关闭客户端连接
            Close();
        }
    }

    //处理接受消息 分包、黏包问题的方法
    private void HandleReceiveMsg(int receiveNum)
    {
        int msgID = 0;
        int msgLength = 0;
        int nowIndex = 0;

        cacheNum += receiveNum;

        while (true)
        {
            //每次将长度设置为-1 是避免上一次解析的数据 影响这一次的判断
            msgLength = -1;
            //处理解析一条消息
            if (cacheNum - nowIndex >= 8)
            {
                //解析ID
                msgID = BitConverter.ToInt32(cacheBytes, nowIndex);
                nowIndex += 4;
                //解析长度
                msgLength = BitConverter.ToInt32(cacheBytes, nowIndex);
                nowIndex += 4;
            }

            if (cacheNum - nowIndex >= msgLength && msgLength != -1)
            {
                ////解析消息体
                //BaseMessage baseMessage = null;

                //BaseHandler baseHandler = null;

                //switch (msgID)
                //{
                //    case 1001:
                //        baseMessage = new PlayerMessage();
                //        baseMessage.Reading(cacheBytes, nowIndex);

                //        baseHandler = new PlayerMessageHandler();
                //        baseHandler.message = baseMessage;
                //        break;
                //}
                //if (baseHandler != null)
                ////if (baseMessage != null)
                //    receiveQueue.Enqueue(baseHandler);
                //    //receiveQueue.Enqueue(baseMessage);

                //得到一个指定ID的消息类对象 只不过是用父类装子类
                BaseMessage baseMessage = messagePool.GetMessage(msgID);
                if (baseMessage != null)
                {
                    //反序列化
                    baseMessage.Reading(cacheBytes, nowIndex);
                    //得到一个消息处理器对象
                    BaseHandler baseHandler = messagePool.GetHandler(msgID);
                    baseHandler.message = baseMessage;
                    //把消息处理器对象 放入队列中 稍后在Update中进行处理
                    receiveQueue.Enqueue(baseHandler);
                }

                nowIndex += msgLength;
                if (nowIndex == cacheNum)
                {
                    cacheNum = 0;
                    break;
                }
            }
            else
            {
                if (msgLength != -1)
                    nowIndex -= 8;
                //就是把剩余没有解析的字节数组内容 移到前面来 用于缓存下次继续解析
                Array.Copy(cacheBytes, nowIndex, cacheBytes, 0, cacheNum - nowIndex);
                cacheNum = cacheNum - nowIndex;
                break;
            }
        }

    }

    void Update()
    {
        if (receiveQueue.Count > 0)
        {
            BaseHandler baseHandler = receiveQueue.Dequeue();
            baseHandler.MessageHandle();
            //BaseMessage baseMessage = receiveQueue.Dequeue();
            //switch (baseMessage)
            //{
            //    case PlayerMessage msg:
            //        print(msg.playerID);
            //        print(msg.playerData.name);
            //        print(msg.playerData.lev);
            //        print(msg.playerData.atk);
            //        break;
            //}
        }
    }

}


转载请注明来源,欢迎对文章中的引用来源进行考证,欢迎指出任何有错误或不够清晰的表达。可以在下面评论区评论,也可以邮件至 785293209@qq.com

×

喜欢就点赞,疼爱就打赏