【前言】
游戏的主逻辑一般是单线程的,所以实现一个消息队列很简单,不像互联网开发中会涉及多线程、多进程。可以先看看这篇文章。
对回调函数和消息机制的理解_消息回调函数_永恒星的博客-CSDN博客
这里尝试先去分析一些要素,然后直接基于这些要素去写代码,而不是边写边分析。
【分析1—基本功能】
由于消息队列需要被跨模块调用,它应该是一个静态类,不能被继承,不能用单例的形式。
对于消息发送者(Sender)而言,需要有一个SendMessage的方法,方法中的参数是发送的消息内容。
对于消息接受者(Receiver)而言,需要有一个AddListener方法去监听消息,对应的要有一个RemoveListener方法,显然,消息接受者需要传入一个回调函数。
消息队列中可能是任意形式的消息,因此消息内容需要用object类型来表示。这就需要对消息发送者发的任意形式的消息做一个封装。
消息队列中要管理消息,需要有个数据结构去缓存Message,这里直接用Queue。
消息列队需要将Message转发给Receiver,因此必须要持有Receiver,这里持有的是Receiver的回调函数,同时要有个数据结构去缓存,直接用List。
因为有多个Sender和Receiver,他们之间需要有区分,Receiver只接受指定类型的消息,但由于是跨模块调用的,不能事先预定义消息的类型,因此需要将消息类型的定义交给发送消息时具体的发送者去定义,也即在SendMessage时要传入一个消息类型的字段作为参数,这个参数可以是枚举类型的。
在添加了一个参数后,消息队列拿到Sender的Message时,除了要缓存消息内容Content,还需要缓存消息类型Type。因为Type一定时,Content不定,也即Sender可能发送同一类型的不同内容的消息,可以做个字典来缓存两者,即Dictionay<Type,Queue<Content>>。但这样处理的话破坏了按消息到来的顺序来发送消息的基本原理了。因此,为了标识Content是什么Type,需要将二者封装成一个新的数据类型。
将消息转发给Receiver时,可以根据IMessage里的Type,找到对应的Receiver,显然缓存接受者需要Dictionary。
到这里实现了基本的消息队列了,可以先将代码写出来,然后继续分析。代码如下:
using System.Collections;
using System.Collections.Generic;public class MessageWrapper
{public MessageType type;public object content;
}public enum MessageType
{None = 0,EnterGame = 1,ClickButton = 2,Kill = 3,//根据使用需要不断往下添加
}public delegate void MessageCb(object content);
public sealed class MessageQueue
{private static Queue<MessageWrapper> messages = new Queue<MessageWrapper>();private static Dictionary<MessageType, MessageCb> listeners = new Dictionary<MessageType,MessageCb>();public static void SendMessage(MessageType type,object content){MessageWrapper messageWrapper = new MessageWrapper();messageWrapper.type = type;messageWrapper.content = content;messages.Enqueue(messageWrapper);}public static void AddListener(MessageType type, MessageCb cb){if(listeners.TryGetValue(type,out var messageCb)){listeners[type] = messageCb + cb;//委托链}else{listeners.Add(type, cb);}}public static void RemoveListener(MessageType type, MessageCb cb){if (listeners.TryGetValue(type, out var messageCb)){if(messageCb != null){messageCb -= cb;listeners[type] = messageCb;} }}public static void Tick(){while(messages.Count > 0){var message = messages.Dequeue();//做好管理,这里一定不为空var listener = listeners[message.type];SendMessagerInternal(listener, message.content);}}private static void SendMessagerInternal(MessageCb listener, object content){if(listener != null){listener(content);}}
}
【分析2—其他情况】
1.何时发送:这里将消息在下一帧发送,如果需要当前帧发送呢。尽管使用消息时默认是下一帧发送的,我们还是需要添加一下对这种情况的处理。这里用了bool变量即可,一个bool表示两种可能,刚好将需要立即发送和下一帧发送做了区分。
2.重复的Receiver:如果同一个Receiver 调用 AddListener多次,该如何处理呢。就目前的实现而言,无法分辨重复的Receiver。如果需要分辨的话,可以遍历委托链找到重复的Receiver,也可以先Remove再Add的方式规避重复的Receiver。在这里保持这样一个原则:对于错误的调用,希望可以纠错,而不是忽略它。所以采用遍历委托链的方式找到是哪个Receiver重复了。但遍历比较耗时,如果一个Message有数千个Receiver就很耗时。消息机制中,Sender不能直接知道有哪些Receiver,但消息队列可以知道,既然如此,可以让Receiver直接表明自己,而不是通过回调函数来表明。这里让Receiver在AddListener时传入一个标识自己的Name。消息队列需要额外缓存这个标识。
3.空的Receiver:AddListener和RemoveListener应该一一对应,但其他人在用的时候可能会忘记写了,这里直接把错误打印出来。
4.某些模块在发出消息或接受消息前需要有自定义的处理,需要增加一个钩子函数,供这些模块做自定义处理。
到这里处理了一些其他情况,还可能有更多的情况是没考虑到的,这些情况可以随着使用逐渐暴露出来,根据这些情况再做处理即可,这主要是经验的积累了。考虑了其他情况的后代码如下:
using System.Collections;
using System.Collections.Generic;
using UnityEngine;public class MessageWrapper
{public MessageType type;public object content;public bool isSend;
}public enum MessageType
{None = 0,EnterGame = 1,ClickButton = 2,Kill = 3,//根据使用需要不断往下添加
}public delegate void MessageCb(object content);
public delegate void Hook(MessageWrapper message);
public sealed class MessageQueue
{private static Queue<MessageWrapper> messages = new Queue<MessageWrapper>();private static Dictionary<MessageType, MessageCb> listenerCb = new Dictionary<MessageType,MessageCb>();private static Dictionary<MessageType,HashSet<string>> listenerNames = new Dictionary<MessageType,HashSet<string>>();private static Dictionary<string,Hook> name2hook = new Dictionary<string,Hook>();public static void SendMessage(MessageType type,object content,bool sync = false){MessageWrapper messageWrapper = new MessageWrapper();messageWrapper.type = type;messageWrapper.content = content;messageWrapper.isSend = false;if(sync){SendMessagerInternal(messageWrapper);}else{messages.Enqueue(messageWrapper);}}public static void AddListener(MessageType type, MessageCb cb,string name){if(cb == null){Debug.LogError("cb is null");return;}if(string.IsNullOrEmpty(name)){Debug.LogError("name is null or empty");}if(listenerCb.TryGetValue(type,out var messageCb)){if(listenerNames[type].Contains(name)){Debug.LogError($"receiver is repeated for {type}");}else{listenerCb[type] = messageCb + cb;//委托链 listenerNames[type].Add(name);}}else{listenerCb.Add(type, cb);listenerNames.Add(type, new HashSet<string> { name });}}public static void RemoveListener(MessageType type, MessageCb cb,string name){if (listenerCb.TryGetValue(type, out var messageCb)){if(listenerNames[type].Contains(name)){if (messageCb != null){messageCb -= cb;listenerCb[type] = messageCb;}listenerNames[type].Remove(name);}else{Debug.LogWarning($"there is no receiver for {type}");} }}public static void AddHook(string name, Hook hook){if (!name2hook.ContainsKey(name)){name2hook.Add(name, hook);}else{Debug.LogWarning($"hook of {name} is existed");}}public static void RemoveHook(string name){if (name2hook.ContainsKey(name)){name2hook.Remove(name);}else{Debug.LogWarning($"hook of {name} is not existed");}}public static void Tick(){while(messages.Count > 0){var message = messages.Dequeue();//做好管理,这里可以省去一些判断 SendMessagerInternal(message);}}public static void Dispose(){messages.Clear();listenerCb.Clear();listenerNames.Clear();}private static void SendMessagerInternal(MessageWrapper message){foreach (var item in name2hook.Values){item.Invoke(message);}if(listenerCb.TryGetValue(message.type,out var listener)){if (listener != null && !message.isSend){listener(message.content);message.isSend = true;}else{listenerNames.Remove(message.type);Debug.LogError($"listener is null for {message.type}");}}else{Debug.LogWarning($"there is no receiver for {message.type}");}}}
【分析情况3——性能和内存】
1.message每次都是重新new,而消息队列作为一个基础模块,使用的频率会很高,每次重新new会的性能开销就不可忽略,同时也会引起内存碎片化,需要加个Pool来处理,这Pool直接放在其自身。Pool的初始容量的确定需要根据实际的使用情况来确定,统计下实际使用时的峰值来给定初始的容量。这里先随便给一个。
2.每个发送的消息在Invoke后,Receiver可能要进行很多处理,如果在每帧Invoke过多,那么可能会导致这一帧耗时过长,因此可以对每帧发送的消息做一个限制,也即分帧处理。具体每帧处理多少个需要结合实际项目来看,这里随便给一个。
3.消息的内容content是object类型,可能会发生装箱和拆箱,产生GC,需要修改成不会拆箱装箱的形式,方法就是添加一个新的泛型字段来缓存。同时,将MessageWapper传递给Receiver,Receiver使用这个字段来获取消息的内容。
考虑了性能和内存的情况如下:
using System.Collections;
using System.Collections.Generic;
using System.Linq;
using UnityEngine;public interface IMessage
{MessageType type { get;set; }object content { get; set; }bool isSend { get; set; }void Release();}
public class MessageWrapper<TContent> : IMessage
{private MessageType _type;public MessageType type{get { return _type; }set { _type = value; }}private object _content;public object content{get { return _content; }set { _content = value; }}private bool _isSend;public bool isSend{get { return isSend; }set {isSend = value;}}private TContent _ncontent;public TContent ncontent{get { return _ncontent; }set { _ncontent = value; }}private void Clear(){_content =null; _isSend = false;}public void Release(){Clear();Release(this);}private static Queue<MessageWrapper<TContent>> messagePool = new Queue<MessageWrapper<TContent>>(Enumerable.Range(0, 10).Select(i => new MessageWrapper<TContent>()));public static MessageWrapper<TContent> Allocate(){MessageWrapper<TContent> result = null;if (messagePool.Count > 0){result = messagePool.Dequeue();}else{result = new MessageWrapper<TContent>();}return result;}private static void Release(MessageWrapper<TContent> message){if (message == null) return;messagePool.Enqueue(message);}}public enum MessageType
{None = 0,EnterGame = 1,ClickButton = 2,Kill = 3,//根据使用需要不断往下添加
}public delegate void MessageCb(IMessage message);
public delegate void Hook(IMessage message);
public sealed class MessageQueue
{private static readonly int MAX_COUNT_MESSAGE = 20;private static Queue<IMessage> messages = new Queue<IMessage>();private static Dictionary<MessageType, MessageCb> listenerCb = new Dictionary<MessageType,MessageCb>();private static Dictionary<MessageType,HashSet<string>> listenerNames = new Dictionary<MessageType,HashSet<string>>();private static Dictionary<string,Hook> name2hook = new Dictionary<string,Hook>();public static void SendMessage(MessageType type,object content = null,bool sync = false){MessageWrapper<object> messageWrapper = MessageWrapper<object>.Allocate();messageWrapper.type = type;messageWrapper.content = content;messageWrapper.isSend = false;if(sync){SendMessagerInternal(messageWrapper);curCount++;}else{messages.Enqueue(messageWrapper);}}public static void SendMessage<TContent>(MessageType type, TContent content = default, bool sync = false){MessageWrapper<TContent> messageWrapper = MessageWrapper<TContent>.Allocate();messageWrapper.type = type;messageWrapper.ncontent = content;messageWrapper.isSend = false;if (sync){SendMessagerInternal(messageWrapper);curCount++;}else{messages.Enqueue(messageWrapper);}}public static void AddListener(MessageType type, MessageCb cb,string name){if(cb == null){Debug.LogError("cb is null");return;}if(string.IsNullOrEmpty(name)){Debug.LogError("name is null or empty");}if(listenerCb.TryGetValue(type,out var messageCb)){if(listenerNames[type].Contains(name)){Debug.LogError($"receiver is repeated for {type}");}else{listenerCb[type] = messageCb + cb;//委托链 listenerNames[type].Add(name);}}else{listenerCb.Add(type, cb);listenerNames.Add(type, new HashSet<string> { name });}}public static void RemoveListener(MessageType type, MessageCb cb,string name){if (listenerCb.TryGetValue(type, out var messageCb)){if(listenerNames[type].Contains(name)){if (messageCb != null){messageCb -= cb;listenerCb[type] = messageCb;}listenerNames[type].Remove(name);}else{Debug.LogWarning($"there is no receiver for {type}");} }}public static void AddHook(string name, Hook hook){if (!name2hook.ContainsKey(name)){name2hook.Add(name, hook);}else{Debug.LogWarning($"hook of {name} is existed");}}public static void RemoveHook(string name){if (name2hook.ContainsKey(name)){name2hook.Remove(name);}else{Debug.LogWarning($"hook of {name} is not existed");}}private static int curCount = 0;public static void Tick(){while(messages.Count > 0){curCount++;if (curCount > MAX_COUNT_MESSAGE){curCount = 0;break;}var message = messages.Dequeue();//做好管理,这里可以省去一些判断 SendMessagerInternal(message);}}public static void Dispose(){foreach (var item in messages){item.Release();}messages.Clear();listenerCb.Clear();listenerNames.Clear();}private static void SendMessagerInternal(IMessage message){foreach (var item in name2hook.Values){item.Invoke(message);}if(listenerCb.TryGetValue(message.type,out var listener)){if (listener != null && !message.isSend){message.isSend = true;listener(message);}else{listenerNames.Remove(message.type);Debug.LogError($"listener is null for {message.type}");}}else{Debug.LogWarning($"there is no receiver for {message.type}");}message.Release();}}
【总结】
对于实现一个消息队列,首先实现其基本功能,其次考虑各种不同的使用情况,在基本功能的基础上做额外的修改,最好考虑性能和内存,再做优化。