using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; using Apache.NMS; using Apache.NMS.ActiveMQ; namespace SIMDP.Project { public class ActiveMQ { private IConnectionFactory factory; private IConnection connection; private ISession session; public class ISessionMember { public IMessageProducer prod; public IMessageConsumer consumer; public ITextMessage msg; } public static Dictionary dict = new Dictionary(); private bool isTopic = false; private bool hasSelector = false; private const string ClientID = "clientid"; private const string Selector = "filter='demo'"; private bool sendSuccess = true; private bool receiveSuccess = true; public bool Connect(bool isLocalMachine, string remoteAddress) { try { if (isLocalMachine) { factory = new ConnectionFactory("tcp://localhost:61616/"); } else { factory = new ConnectionFactory("tcp://" + remoteAddress + ":61616/"); //写tcp://192.168.1.111:61616的形式连接其他服务器上的ActiveMQ服务器 } //通过工厂建立连接 connection = factory.CreateConnection(); connection.ClientId = ClientID; connection.Start(); //通过连接创建Session会话 session = connection.CreateSession(); Console.WriteLine("Begin connection..."); return true; } catch (Exception e) { // sendSuccess = false; // receiveSuccess = false; Console.WriteLine("Exception:{0}", e.Message); return false; // Console.ReadLine(); // throw e; } } public void DisConnect() { Console.WriteLine("Close connection and session..."); connection?.Stop(); session?.Close(); session?.Dispose(); connection?.Close(); connection?.Dispose(); } /// /// 初始化 /// /// 选择是否是Topic /// 队列名 /// 是否设置过滤 public bool InitQueueOrTopic(bool topic, string name, bool selector = false) { ISessionMember sessionMember = new ISessionMember(); try { //通过会话创建生产者、消费者 if (topic) { sessionMember.prod = session.CreateProducer(new Apache.NMS.ActiveMQ.Commands.ActiveMQTopic(name)); if (selector) { sessionMember.consumer = session.CreateDurableConsumer(new Apache.NMS.ActiveMQ.Commands.ActiveMQTopic(name), ClientID, Selector, false); hasSelector = true; } else { sessionMember.consumer = session.CreateDurableConsumer(new Apache.NMS.ActiveMQ.Commands.ActiveMQTopic(name), ClientID, null, false); hasSelector = false; } isTopic = true; } else { sessionMember.prod = session.CreateProducer(new Apache.NMS.ActiveMQ.Commands.ActiveMQQueue(name)); if (selector) { sessionMember.consumer = session.CreateConsumer(new Apache.NMS.ActiveMQ.Commands.ActiveMQQueue(name), Selector); hasSelector = true; } else { sessionMember.consumer = session.CreateConsumer(new Apache.NMS.ActiveMQ.Commands.ActiveMQQueue(name)); hasSelector = false; } isTopic = false; } //创建一个发送的消息对象 sessionMember.msg = sessionMember.prod.CreateTextMessage(); dict.Add(name, sessionMember); } catch (Exception e) { sendSuccess = false; receiveSuccess = false; Console.WriteLine("Exception:{0}", e.Message); Console.ReadLine(); throw e; } return sendSuccess; } public bool SendMessage(string message, ISessionMember sessionMember, string msgId = "defult", MsgPriority priority = MsgPriority.Normal) { if (sessionMember.prod == null) { sendSuccess = false; Console.WriteLine("call InitQueueOrTopic() first!!"); return false; } Console.WriteLine("Begin send messages..."); //给这个对象赋实际的消息 sessionMember.msg.NMSCorrelationID = msgId; sessionMember.msg.Properties["MyID"] = msgId; sessionMember.msg.NMSMessageId = msgId; sessionMember.msg.Text = message; Console.WriteLine(message); if (isTopic) { sendSuccess = ProducerSubcriber(message, priority, sessionMember.prod, sessionMember.msg); } else { sendSuccess = P2P(message, priority, sessionMember.prod, sessionMember.msg); } return sendSuccess; } public string GetMessage(ISessionMember sessionMember) { if (sessionMember.prod == null) { Console.WriteLine("call InitQueueOrTopic() first!!"); return null; } Console.WriteLine("Begin receive messages..."); ITextMessage revMessage = null; try { //同步阻塞10ms,没消息就直接返回null,注意此处时间不能设太短,否则还没取到消息就直接返回null了!!! revMessage = sessionMember.consumer.Receive(new TimeSpan(TimeSpan.TicksPerMillisecond * 10)) as ITextMessage; } catch (System.Exception e) { receiveSuccess = false; Console.WriteLine("Exception:{0}", e.Message); Console.ReadLine(); throw e; } if (revMessage == null) { Console.WriteLine("No message received!"); return null; } else { Console.WriteLine("Received message with Correlation ID: " + revMessage.NMSCorrelationID); //Console.WriteLine("Received message with Properties'ID: " + revMessage.Properties["MyID"]); Console.WriteLine("Received message with text: " + revMessage.Text); } return revMessage.Text; } //P2P模式,一个生产者对应一个消费者 private bool P2P(string message, MsgPriority priority, IMessageProducer prod, ITextMessage msg) { try { if (hasSelector) { //设置消息对象的属性,这个很重要,是Queue的过滤条件,也是P2P消息的唯一指定属性 msg.Properties.SetString("filter", "demo"); //P2P模式 } prod.Priority = priority; //设置持久化 prod.DeliveryMode = MsgDeliveryMode.Persistent; //生产者把消息发送出去,几个枚举参数MsgDeliveryMode是否持久化,MsgPriority消息优先级别,存活时间,当然还有其他重载 prod.Send(msg, MsgDeliveryMode.Persistent, priority, TimeSpan.MinValue); } catch (Exception e) { sendSuccess = false; Console.WriteLine("Exception:{0}", e.Message); Console.ReadLine(); throw e; } return sendSuccess; } //发布订阅模式,一个生产者多个消费者 private bool ProducerSubcriber(string message, MsgPriority priority, IMessageProducer prod, ITextMessage msg) { try { prod.Priority = priority; //设置持久化,如果DeliveryMode没有设置或者设置为NON_PERSISTENT,那么重启MQ之后消息就会丢失 prod.DeliveryMode = MsgDeliveryMode.Persistent; prod.Send(msg, Apache.NMS.MsgDeliveryMode.Persistent, priority, TimeSpan.MinValue); //System.Threading.Thread.Sleep(1000); } catch (Exception e) { sendSuccess = false; Console.WriteLine("Exception:{0}", e.Message); Console.ReadLine(); throw e; } return sendSuccess; } } }