A WCF Event Broker Service Based on MSMQ
Published: 2019-06-15


Preface

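The project I am currently responsible for at work is being revamped, and its existing services are being reworked along the way. The project has an operation-log module, so we decided to pull logging out into a standalone log service. That is how this post came about.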

Main Content

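MSMQ serves as the message queue. The B/S (web) project calls the log service, the log service sends messages to the queue, and the event broker service processes the messages in the queue. Here is the core code.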

Event broker service contract

using System;
using System.Collections.Generic;
using System.Linq;
using System.ServiceModel;
using System.Text;
using TestService.Contract.Faults;

namespace TestService.Contract
{
    /// <summary>
    /// Event broker
    /// </summary>
    [ServiceContract(Namespace = "http://TestService.Contract",
                     SessionMode = SessionMode.Required,
                     CallbackContract = typeof(IEventBrokerCallback))]
    public interface IEventBroker
    {
        // Two-way call so that a failed subscription can be reported as a fault.
        [OperationContract(IsOneWay = false)]
        [FaultContract(typeof(EventBrokerException))]
        void Subscribe(Guid subscriptionId, string[] eventNames);

        [OperationContract(IsOneWay = true)]
        void EndSubscription(Guid subscriptionId);
    }
}

Event broker callback contract

using System;
using System.Collections.Generic;
using System.Linq;
using System.ServiceModel;
using System.Text;
using TestService.Contract.Data;

namespace TestService.Contract
{
    /// <summary>
    /// Event broker callback
    /// </summary>
    public interface IEventBrokerCallback
    {
        // One-way, so the broker does not wait for the subscriber to finish.
        [OperationContract(IsOneWay = true)]
        void ReceiveStreamingResult(RealTimeEventMessage streamingResult);
    }
}

Event broker fault entity

using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.Serialization;
using System.Text;

namespace TestService.Contract.Faults
{
    [DataContract]
    public class EventBrokerException
    {
        [DataMember]
        public string Message
        {
            get;
            set;
        }

        public EventBrokerException(string message)
        {
            Message = message;
        }
    }
}

Message entity

using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.Serialization;
using System.Text;

namespace TestService.Contract.Data
{
    /// <summary>
    /// Data entity
    /// </summary>
    [DataContract]
    public class RealTimeEventMessage
    {
        public RealTimeEventMessage()
        {
        }

        public RealTimeEventMessage(MessageModel msg, string eventName, string entityIdType,
            string description, string additionalData, DateTime date)
        {
            this.Msg = msg;
            this.EventName = eventName;
            this.EntityIdType = entityIdType;
            this.Description = description;
            this.AdditionalData = additionalData;
            this.Date = date;
        }

        [DataMember]
        public MessageModel Msg { get; set; }

        [DataMember]
        public string EventName { get; set; }

        [DataMember]
        public string EntityIdType { get; set; }

        [DataMember]
        public string Description { get; set; }

        [DataMember]
        public string AdditionalData { get; set; }

        [DataMember]
        public DateTime? Date { get; set; }
    }
}

That covers the contracts of the event broker service. Now for the implementation, starting with EventBroker.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Messaging;
using System.ServiceModel;
using System.Threading.Tasks;
using TestService.ApplicationService.Data;
using TestService.ApplicationService.Services.Interface;
using TestService.Common.IoC;
using TestService.Contract;
using TestService.Contract.Data;
using TestService.Contract.Faults;

namespace TestService.ApplicationService
{
    [IocServiceBehavior]
    [ServiceBehavior(InstanceContextMode = InstanceContextMode.Single, ConcurrencyMode = ConcurrencyMode.Multiple)]
    public class EventBroker : ApplicationBaseService, IEventBroker
    {
        // Event name -> subscribers registered for that event.
        Dictionary<string, List<UniqueCallbackHandle>> eventNameToCallbackLookups = new Dictionary<string, List<UniqueCallbackHandle>>();
        private static Object syncObj = new Object();
        private static string inputQueueName = "";
        private bool shouldRun = true;
        private static readonly TimeSpan queueReadTimeOut = TimeSpan.FromSeconds(500);
        private static readonly TimeSpan queuePeekTimeOut = TimeSpan.FromSeconds(30);
        private IXmlParserService _xmlParserService;

        public EventBroker(IXmlParserService xmlParserService)
        {
            inputQueueName = AppSettings.InputQueueName;
            // Assign the parser before starting the reader task; otherwise the
            // task could call ProcessMessage while _xmlParserService is still null.
            _xmlParserService = xmlParserService;
            StartCollectingMessage();
        }

        public void StartCollectingMessage()
        {
            try
            {
                GetMessageFromQueue();
            }
            catch (Exception ex)
            {
                throw new FaultException<EventBrokerException>(new EventBrokerException(ex.Message), new FaultReason(ex.Message));
            }
        }

        public void Subscribe(Guid subscriptionId, string[] eventNames)
        {
            try
            {
                CreateSubscription(subscriptionId, eventNames);
            }
            catch (Exception ex)
            {
                throw new FaultException<EventBrokerException>(new EventBrokerException(ex.Message), new FaultReason(ex.Message));
            }
        }

        public void EndSubscription(Guid subscriptionId)
        {
            lock (syncObj)
            {
                // Create a new dictionary that will be populated by the remaining subscribers
                Dictionary<string, List<UniqueCallbackHandle>> remainingEventNameToCallbackLookups = new Dictionary<string, List<UniqueCallbackHandle>>();
                foreach (KeyValuePair<string, List<UniqueCallbackHandle>> kvp in eventNameToCallbackLookups)
                {
                    // Keep every subscriber whose session id differs from the one being removed
                    List<UniqueCallbackHandle> remainingMessageSubscriptions = kvp.Value.Where(x => x.CallbackSessionId != subscriptionId).ToList();
                    if (remainingMessageSubscriptions.Any())
                    {
                        remainingEventNameToCallbackLookups.Add(kvp.Key, remainingMessageSubscriptions);
                    }
                }
                // Now left with only the subscribers that are still subscribed
                eventNameToCallbackLookups = remainingEventNameToCallbackLookups;
            }
        }

        #region Private methods

        /// <summary>
        /// Reads messages from the message queue
        /// </summary>
        private void GetMessageFromQueue()
        {
            // The original wrapped this call in a catch (AggregateException) { throw; } block, which had no effect.
            Task messageQueueReaderTask = Task.Factory.StartNew(() =>
            {
                using (MessageQueue queue = new MessageQueue(inputQueueName, QueueAccessMode.Receive))
                {
                    queue.Formatter = new XmlMessageFormatter(new[] { typeof(string) });
                    while (shouldRun)
                    {
                        Message message = null;
                        try
                        {
                            // IsEmpty() is not a member of System.Messaging.MessageQueue;
                            // it is presumably an extension method that the post does not show.
                            if (!queue.IsEmpty())
                            {
                                //Logs.Debug("Receiving a message from the queue");
                                message = queue.Receive(queueReadTimeOut);
                                ProcessMessage(message);
                            }
                        }
                        catch (MessageQueueException e)
                        {
                            // A receive timeout is expected when the queue is idle
                            if (e.MessageQueueErrorCode != MessageQueueErrorCode.IOTimeout)
                            {
                                Log.Warn("Message queue exception:", e);
                            }
                        }
                        catch (Exception e)
                        {
                            // Write the message details to the Error queue
                            Log.Warn("Operation exception:", e);
                        }
                    }
                }
            }, TaskCreationOptions.LongRunning);
        }

        /// <summary>
        /// Processes a message
        /// </summary>
        /// <param name="msmqMessage">The MSMQ message to process</param>
        private void ProcessMessage(Message msmqMessage)
        {
            string messageBody = (string)msmqMessage.Body;
#if DEBUG
            Log.Info(string.Format("Received message : {0}", messageBody));
#endif
            RealTimeEventMessage messageToSendToSubscribers = _xmlParserService.ParseRawMsmqXml(messageBody);
            if (messageToSendToSubscribers != null)
            {
                lock (syncObj)
                {
                    if (messageToSendToSubscribers.Msg != null)
                    {
                        // Save to the database
                    }

                    List<Guid> deadSubscribers = new List<Guid>();
                    if (eventNameToCallbackLookups.ContainsKey(messageToSendToSubscribers.EventName))
                    {
                        List<UniqueCallbackHandle> uniqueCallbackHandles = eventNameToCallbackLookups[messageToSendToSubscribers.EventName];
                        foreach (UniqueCallbackHandle uniqueCallbackHandle in uniqueCallbackHandles)
                        {
                            try
                            {
                                uniqueCallbackHandle.Callback.ReceiveStreamingResult(messageToSendToSubscribers);
                            }
                            catch (CommunicationObjectAbortedException)
                            {
                                deadSubscribers.Add(uniqueCallbackHandle.CallbackSessionId);
                            }
                        }
                    }

                    // End all subscriptions for dead subscribers.
                    // lock is re-entrant, so calling EndSubscription here does not deadlock.
                    foreach (Guid deadSubscriberId in deadSubscribers)
                    {
                        EndSubscription(deadSubscriberId);
                    }
                }
            }
        }

        private void CreateSubscription(Guid subscriptionId, string[] eventNames)
        {
            // Ensure that a subscription is created for each message type the subscriber wants to receive
            lock (syncObj)
            {
                foreach (string eventName in eventNames)
                {
                    if (!eventNameToCallbackLookups.ContainsKey(eventName))
                    {
                        List<UniqueCallbackHandle> currentCallbacks = new List<UniqueCallbackHandle>();
                        eventNameToCallbackLookups[eventName] = currentCallbacks;
                    }
                    eventNameToCallbackLookups[eventName].Add(
                        new UniqueCallbackHandle(subscriptionId, OperationContext.Current.GetCallbackChannel<IEventBrokerCallback>()));
                }
            }
        }

        #endregion
    }
}
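In the EventBroker code, shouldRun is never set to false, so the reader task in GetMessageFromQueue has no way to stop cleanly. One possible approach is a CancellationToken. The following is only a sketch under stated assumptions: a BlockingCollection<string> stands in for the MSMQ queue so the example runs without MSMQ installed, and QueueReader and ReaderDemo are hypothetical names that do not appear in the original project.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for the reader loop; not part of the original project.
public sealed class QueueReader : IDisposable
{
    private readonly BlockingCollection<string> _queue;   // stands in for the MSMQ queue
    private readonly Action<string> _handler;             // stands in for ProcessMessage
    private readonly CancellationTokenSource _cts = new CancellationTokenSource();
    private readonly Task _worker;

    public QueueReader(BlockingCollection<string> queue, Action<string> handler)
    {
        _queue = queue;
        _handler = handler;
        // All fields are assigned before the loop starts.
        _worker = Task.Factory.StartNew(ReadLoop, CancellationToken.None,
            TaskCreationOptions.LongRunning, TaskScheduler.Default);
    }

    private void ReadLoop()
    {
        while (!_cts.IsCancellationRequested)
        {
            string message;
            try
            {
                // Blocks for up to one second, similar to Receive(timeout).
                if (!_queue.TryTake(out message, 1000, _cts.Token))
                {
                    continue;
                }
            }
            catch (OperationCanceledException)
            {
                break;   // Dispose was called while waiting
            }

            try
            {
                _handler(message);
            }
            catch (Exception ex)
            {
                // A failing handler must not kill the loop.
                Console.Error.WriteLine("Handler failed: " + ex.Message);
            }
        }
    }

    public void Dispose()
    {
        _cts.Cancel();
        _worker.Wait();   // the loop swallows the cancellation, so this does not throw
        _cts.Dispose();
    }
}

public static class ReaderDemo
{
    public static void Main()
    {
        var queue = new BlockingCollection<string>();
        var seen = new ConcurrentQueue<string>();
        using (var done = new CountdownEvent(2))
        {
            using (new QueueReader(queue, m => { seen.Enqueue(m); done.Signal(); }))
            {
                queue.Add("first");
                queue.Add("second");
                done.Wait(TimeSpan.FromSeconds(5));
            }   // Dispose cancels the token and waits for the loop to exit
        }
        Console.WriteLine(seen.Count);
    }
}
```

Blocking in TryTake with a timeout also avoids spinning while the queue is empty. With MSMQ, calling Receive with a short timeout and treating IOTimeout as "nothing to do" would play the same role.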

Callback handle entity used by the event broker implementation

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using TestService.Contract;

namespace TestService.ApplicationService.Data
{
    public class UniqueCallbackHandle
    {
        public UniqueCallbackHandle(Guid callbackSessionId, IEventBrokerCallback callback)
        {
            this.CallbackSessionId = callbackSessionId;
            this.Callback = callback;
        }

        public Guid CallbackSessionId { get; private set; }

        public IEventBrokerCallback Callback { get; private set; }
    }
}
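The subscription bookkeeping (subscribe, publish, drop dead subscribers) can be tried out without WCF or MSMQ. The sketch below is an illustration only: SubscriptionRegistry, Subscriber, and RegistryDemo are hypothetical names, an Action<string> replaces the IEventBrokerCallback channel, and any exception from a callback is treated as a dead subscriber, whereas EventBroker only catches CommunicationObjectAbortedException.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for EventBroker's subscription bookkeeping.
public sealed class SubscriptionRegistry
{
    private sealed class Subscriber
    {
        public Guid Id;
        public Action<string> Callback;
    }

    private readonly object _sync = new object();
    private readonly Dictionary<string, List<Subscriber>> _byEvent =
        new Dictionary<string, List<Subscriber>>();

    public void Subscribe(Guid id, IEnumerable<string> eventNames, Action<string> callback)
    {
        lock (_sync)
        {
            foreach (string name in eventNames)
            {
                List<Subscriber> list;
                if (!_byEvent.TryGetValue(name, out list))
                {
                    list = new List<Subscriber>();
                    _byEvent[name] = list;
                }
                list.Add(new Subscriber { Id = id, Callback = callback });
            }
        }
    }

    public void EndSubscription(Guid id)
    {
        lock (_sync)
        {
            // Copy the keys so entries can be removed while iterating.
            foreach (string name in _byEvent.Keys.ToList())
            {
                _byEvent[name].RemoveAll(s => s.Id == id);
                if (_byEvent[name].Count == 0)
                {
                    _byEvent.Remove(name);
                }
            }
        }
    }

    // Delivers the payload and returns how many callbacks succeeded.
    public int Publish(string eventName, string payload)
    {
        List<Subscriber> snapshot;
        lock (_sync)
        {
            List<Subscriber> list;
            if (!_byEvent.TryGetValue(eventName, out list))
            {
                return 0;
            }
            snapshot = list.ToList();   // invoke callbacks outside the lock
        }

        int delivered = 0;
        foreach (Subscriber s in snapshot)
        {
            try
            {
                s.Callback(payload);
                delivered++;
            }
            catch (Exception)
            {
                EndSubscription(s.Id);   // treat a failing callback as a dead subscriber
            }
        }
        return delivered;
    }
}

public static class RegistryDemo
{
    public static void Main()
    {
        var registry = new SubscriptionRegistry();
        var received = new List<string>();
        registry.Subscribe(Guid.NewGuid(), new[] { "ClientDealEvent" }, received.Add);
        registry.Subscribe(Guid.NewGuid(), new[] { "ClientDealEvent" },
            _ => { throw new InvalidOperationException("channel aborted"); });

        Console.WriteLine(registry.Publish("ClientDealEvent", "first"));
        Console.WriteLine(registry.Publish("ClientDealEvent", "second"));
        Console.WriteLine(received.Count);
    }
}
```

Unlike EventBroker, this sketch takes a snapshot and invokes the callbacks outside the lock, so a slow subscriber does not block Subscribe or EndSubscription calls from other clients. Whether that trade-off suits the real service depends on how strictly message ordering must be preserved.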

 

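AppSettings.InputQueueName in the EventBroker constructor reads the name of the private queue from the configuration file. The constructor takes a parameter that is later injected with Autofac. IXmlParserService is an ordinary business service whose main job is to parse the XML messages stored in the queue.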

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using TestService.Contract.Data;

namespace TestService.ApplicationService.Services.Interface
{
    public interface IXmlParserService
    {
        RealTimeEventMessage ParseRawMsmqXml(string messageBody);
    }
}

Implementation

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Xml.Linq;
using TestService.ApplicationService.Services.Interface;
using TestService.Contract.Data;

namespace TestService.ApplicationService.Services.Impl
{
    public class XmlParserService : IXmlParserService
    {
        private MessageModel msg;

        public XmlParserService(MessageModel msg)
        {
            this.msg = msg;
        }

        public RealTimeEventMessage ParseRawMsmqXml(string messageBody)
        {
            try
            {
                RealTimeEventMessage info = new RealTimeEventMessage();
                XElement xelement = XElement.Parse(messageBody);

                MessageModel model = new MessageModel();
                model.MessageId = GetSafeString(xelement, "messageId");
                model.Content = GetSafeString(xelement, "content");
                model.CreateTime = GetSafeDate(xelement, "createTime") ?? DateTime.Now;

                info.Msg = model;
                info.EventName = GetSafeString(xelement, "eventName");
                //info.EntityIdType = GetSafeString(xelement, "entityIdType");
                //info.Description = GetSafeString(xelement, "description").Replace("\n\n", "\n\r");
                //info.Date = GetSafeDate(xelement, "date");
                //info.AdditionalData = GetSafeString(xelement, "additionalData");
                return info;
            }
            catch (Exception ex)
            {
                Log.Error("Exception while parsing the XML message:" + ex);
                return null;
            }
        }

        // Each GetSafe* helper looks up a direct child of the root by local name
        // and falls back to a default value if it is missing, duplicated, or malformed.
        public static Int32 GetSafeInt32(XElement root, string elementName)
        {
            try
            {
                XElement element = root.Elements().Where(node => node.Name.LocalName == elementName).Single();
                return Convert.ToInt32(element.Value);
            }
            catch
            {
                return 0;
            }
        }

        private static DateTime? GetSafeDate(XElement root, string elementName)
        {
            try
            {
                XElement element = root.Elements().Where(node => node.Name.LocalName == elementName).Single();
                return DateTime.Parse(element.Value);
            }
            catch
            {
                return null;
            }
        }

        public static String GetSafeString(XElement root, string elementName)
        {
            try
            {
                XElement element = root.Elements().Where(node => node.Name.LocalName == elementName).Single();
                return element.Value;
            }
            catch
            {
                return String.Empty;
            }
        }

        public static bool GetSafeBool(XElement root, string elementName)
        {
            try
            {
                XElement element = root.Elements().Where(node => node.Name.LocalName == elementName).Single();
                return Convert.ToBoolean(element.Value);
            }
            catch
            {
                return false;
            }
        }
    }
}

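The XML element names here are determined by the XML that the message service sends. That is all there is to the event broker service. Next, the message service: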

using System;
using System.Collections.Generic;
using System.Linq;
using System.ServiceModel;
using System.Text;
using TestService.Contract.Data;

namespace TestService.Contract
{
    [ServiceContract]
    public interface IMessageService
    {
        [OperationContract]
        bool Send(MessageModel model);
    }
}

Implementation

using System;
using System.Collections.Generic;
using System.Linq;
using System.Messaging;
using System.Text;
using TestService.Common;
using TestService.Common.IoC;
using TestService.Contract;
using TestService.Contract.Data;

namespace TestService.ApplicationService
{
    /// <summary>
    /// Message service
    /// </summary>
    [IocServiceBehavior]
    public class MessageService : ApplicationBaseService, IMessageService
    {
        private static string inputQueueName = "";

        public MessageService()
        {
            inputQueueName = AppSettings.InputQueueName;
        }

        public bool Send(MessageModel model)
        {
            using (MessageQueue queue = new MessageQueue(inputQueueName, QueueAccessMode.Send))
            {
                queue.Formatter = new XmlMessageFormatter(new[] { typeof(string) });
                try
                {
                    Message message = new Message(
                        GetXmlData(model));
                    Log.Info(string.Format("Sending message: {0}", message.Body.ToString()));
                    queue.Send(message);
                    return true;
                }
                catch (MessageQueueException mex)
                {
                    if (mex.MessageQueueErrorCode != MessageQueueErrorCode.IOTimeout)
                    {
                        // The original passed mex to string.Format without a placeholder, so the
                        // exception was silently dropped; pass it to the logger instead,
                        // as EventBroker does with Log.Warn.
                        Log.Error("Message queue exception occurred", mex);
                    }
                    return false;
                }
                catch (Exception ex)
                {
                    // Write the message details to the Error queue
                    Log.Error(ex);
                    return false;
                }
            }
        }

        private string GetXmlData(MessageModel model)
        {
            // The XML tags in this method were stripped when the page was extracted.
            // The child element names are restored from the names XmlParserService reads;
            // the root element name could not be recovered, so "message" is a placeholder.
            // The parser only looks at child elements, so the root name does not affect it.
            StringBuilder sb = new StringBuilder("<message>");
            sb.AppendFormat("<eventName>ClientDealEvent</eventName>");
            sb.AppendFormat("<messageId>{0}</messageId>", model.MessageId);
            sb.AppendFormat("<createTime>{0}</createTime>", model.CreateTime);
            sb.AppendFormat("<content>{0}</content>", model.Content);
            sb.AppendLine("</message>");
            return sb.ToString();
        }
    }
}
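GetXmlData builds the payload by string concatenation, so a Content value containing characters such as < or & would produce XML that the parser rejects. One possible alternative is to build the document with XElement, which escapes values automatically. This is a sketch rather than the author's code: MessageXml, Build, Read, and XmlDemo are hypothetical names, the root element name "message" is a placeholder, and the round-trip date format "o" is my choice, not something the original specifies.

```csharp
using System;
using System.Globalization;
using System.Xml.Linq;

// Hypothetical helper; not part of the original project.
public static class MessageXml
{
    // Builds the payload with XElement so that special characters are escaped.
    // "message" is a placeholder root name; the child names match what
    // XmlParserService reads.
    public static string Build(string messageId, string content, DateTime createTime)
    {
        var root = new XElement("message",
            new XElement("eventName", "ClientDealEvent"),
            new XElement("messageId", messageId),
            // Culture-independent round-trip format, parseable by DateTime.Parse.
            new XElement("createTime", createTime.ToString("o", CultureInfo.InvariantCulture)),
            new XElement("content", content));
        return root.ToString(SaveOptions.DisableFormatting);
    }

    // Reads one child element, returning an empty string when it is missing,
    // similar in spirit to GetSafeString.
    public static string Read(string xml, string elementName)
    {
        XElement element = XElement.Parse(xml).Element(elementName);
        return element == null ? string.Empty : element.Value;
    }
}

public static class XmlDemo
{
    public static void Main()
    {
        string xml = MessageXml.Build("42", "a < b && c",
            new DateTime(2016, 8, 29, 0, 0, 0, DateTimeKind.Utc));
        Console.WriteLine(xml);                               // the escaped XML text
        Console.WriteLine(MessageXml.Read(xml, "content"));   // the original content, unescaped
    }
}
```

Formatting CreateTime with an explicit culture-independent pattern also avoids the case where the sender and the broker run under different regional settings and DateTime.Parse fails on the receiving side.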

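The message service is fairly simple: it just sends messages to the queue. Attentive readers will have noticed the IocServiceBehavior attribute on each service implementation; it marks the service as one whose instance is resolved through dependency injection.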

That is the core code. Some operations are not shown here; I will open-source the code on Gitee (码云) later. That is all for today.

Reposted from: https://www.cnblogs.com/zhouxiaoyun/p/5816357.html
