123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254 |
- using System;
- using System.Collections.Generic;
- using System.Linq;
- using System.Text;
- using System.Threading.Tasks;
- using Apache.NMS;
- using Apache.NMS.ActiveMQ;
namespace SIMDP.Project
{
    /// <summary>
    /// Thin wrapper around an Apache.NMS ActiveMQ connection and session that creates
    /// producer/consumer pairs for queues or topics and sends/receives text messages.
    /// </summary>
    /// <remarks>
    /// Progress and errors are written to the console. Failures of the underlying
    /// NMS calls are logged and rethrown, except in <see cref="Connect"/>, which
    /// reports failure through its return value.
    /// The class performs no locking; the static <see cref="dict"/> is shared by all instances.
    /// </remarks>
    public class ActiveMQ
    {
        private IConnectionFactory factory;
        private IConnection connection;
        private ISession session;

        /// <summary>
        /// Producer, consumer and reusable outgoing message that belong to one destination.
        /// </summary>
        public class ISessionMember
        {
            /// <summary>Producer bound to the destination.</summary>
            public IMessageProducer prod;

            /// <summary>Consumer bound to the destination.</summary>
            public IMessageConsumer consumer;

            /// <summary>Text message instance that is reused for every send.</summary>
            public ITextMessage msg;

            /// <summary>True when the destination was initialised as a topic, false for a queue.</summary>
            public bool isTopic;

            /// <summary>True when the consumer was created with the message selector.</summary>
            public bool hasSelector;
        }

        /// <summary>
        /// Destination name to session member, filled by <see cref="InitQueueOrTopic"/>.
        /// </summary>
        public static Dictionary<string, ISessionMember> dict = new Dictionary<string, ISessionMember>();

        // Members registered by this instance, so they can be removed from the shared
        // static dictionary once the session they belong to is closed.
        private readonly Dictionary<string, ISessionMember> owned = new Dictionary<string, ISessionMember>();

        private const string ClientID = "clientid";
        private const string Selector = "filter='demo'";

        /// <summary>
        /// Opens a connection to the broker on port 61616, starts it and creates a session.
        /// </summary>
        /// <param name="isLocalMachine">True to connect to <c>localhost</c>; false to use <paramref name="remoteAddress"/>.</param>
        /// <param name="remoteAddress">Host name or IP address of the broker (for example 192.168.1.111); ignored when <paramref name="isLocalMachine"/> is true.</param>
        /// <returns>True when the connection and session were created; false when any step failed.</returns>
        public bool Connect(bool isLocalMachine, string remoteAddress)
        {
            try
            {
                // A previous connection would otherwise be leaked; it also uses the same client id.
                ReleaseConnection();

                string host = isLocalMachine ? "localhost" : remoteAddress;
                factory = new ConnectionFactory("tcp://" + host + ":61616/");

                // Create the connection through the factory.
                connection = factory.CreateConnection();
                connection.ClientId = ClientID;
                connection.Start();

                // Create the session through the connection.
                session = connection.CreateSession();
                Console.WriteLine("Begin connection...");
                return true;
            }
            catch (Exception e)
            {
                Console.WriteLine("Exception:{0}", e.Message);

                // Do not leave a half-open connection behind; cleanup errors must not
                // turn the documented "return false" into an exception.
                try
                {
                    ReleaseConnection();
                }
                catch (Exception cleanupError)
                {
                    Console.WriteLine("Exception:{0}", cleanupError.Message);
                }

                return false;
            }
        }

        /// <summary>
        /// Stops the connection and closes and disposes the session and the connection.
        /// Entries this instance added to <see cref="dict"/> are removed because their
        /// session no longer exists. Safe to call when not connected.
        /// </summary>
        public void DisConnect()
        {
            Console.WriteLine("Close connection and session...");
            ReleaseConnection();
        }

        /// <summary>
        /// Creates a producer, a consumer and a reusable text message for a queue or topic
        /// and registers them in <see cref="dict"/> under <paramref name="name"/>.
        /// </summary>
        /// <param name="topic">True to use a topic (with a durable consumer); false to use a queue.</param>
        /// <param name="name">Queue or topic name; also the key used in <see cref="dict"/>.</param>
        /// <param name="selector">True to create the consumer with the <c>filter='demo'</c> selector.</param>
        /// <returns>True when the destination was initialised.</returns>
        /// <exception cref="ArgumentException">A destination with the same name is already registered.</exception>
        /// <remarks>Any exception raised while creating the producer or consumer is logged and rethrown.</remarks>
        public bool InitQueueOrTopic(bool topic, string name, bool selector = false)
        {
            ISessionMember sessionMember = new ISessionMember();
            sessionMember.isTopic = topic;
            sessionMember.hasSelector = selector;

            try
            {
                // Check before creating anything so a duplicate name allocates no resources.
                if (dict.ContainsKey(name))
                {
                    throw new ArgumentException("A queue or topic named '" + name + "' is already initialised.", nameof(name));
                }

                // Create the producer and consumer through the session.
                if (topic)
                {
                    Apache.NMS.ActiveMQ.Commands.ActiveMQTopic destination = new Apache.NMS.ActiveMQ.Commands.ActiveMQTopic(name);
                    sessionMember.prod = session.CreateProducer(destination);

                    // NOTE(review): every topic uses ClientID as its durable subscription
                    // name - confirm this is intended when more than one topic is used.
                    sessionMember.consumer = session.CreateDurableConsumer(destination, ClientID, selector ? Selector : null, false);
                }
                else
                {
                    Apache.NMS.ActiveMQ.Commands.ActiveMQQueue destination = new Apache.NMS.ActiveMQ.Commands.ActiveMQQueue(name);
                    sessionMember.prod = session.CreateProducer(destination);
                    sessionMember.consumer = selector
                        ? session.CreateConsumer(destination, Selector)
                        : session.CreateConsumer(destination);
                }

                // Create the message object that will be reused for sending.
                sessionMember.msg = sessionMember.prod.CreateTextMessage();
                dict.Add(name, sessionMember);
                owned[name] = sessionMember;
                return true;
            }
            catch (Exception e)
            {
                Console.WriteLine("Exception:{0}", e.Message);

                // Release whatever was created before the failure.
                CloseQuietly(sessionMember);
                throw;
            }
        }

        /// <summary>
        /// Sends a persistent text message through the producer of <paramref name="sessionMember"/>.
        /// </summary>
        /// <param name="message">Text body of the message.</param>
        /// <param name="sessionMember">Member created by <see cref="InitQueueOrTopic"/>.</param>
        /// <param name="msgId">Value written to the correlation id, the message id and the <c>MyID</c> property.</param>
        /// <param name="priority">Message priority.</param>
        /// <returns>
        /// True when the message was handed to the producer; false when
        /// <paramref name="sessionMember"/> has not been initialised.
        /// </returns>
        /// <remarks>Exceptions raised by the producer are logged and rethrown.</remarks>
        public bool SendMessage(string message, ISessionMember sessionMember, string msgId = "defult", MsgPriority priority = MsgPriority.Normal)
        {
            if (sessionMember == null || sessionMember.prod == null || sessionMember.msg == null)
            {
                Console.WriteLine("call InitQueueOrTopic() first!!");
                return false;
            }

            Console.WriteLine("Begin send messages...");

            // Fill the reusable message object with the actual content.
            ITextMessage outgoing = sessionMember.msg;
            outgoing.NMSCorrelationID = msgId;
            outgoing.Properties["MyID"] = msgId;
            outgoing.NMSMessageId = msgId;
            outgoing.Text = message;
            Console.WriteLine(message);

            return SendPersistent(sessionMember, priority);
        }

        /// <summary>
        /// Waits up to 10 ms for a message on the consumer of <paramref name="sessionMember"/>.
        /// </summary>
        /// <param name="sessionMember">Member created by <see cref="InitQueueOrTopic"/>.</param>
        /// <returns>
        /// The text of the received message, or null when the member is not initialised,
        /// no message arrived in time, or the message was not a text message.
        /// </returns>
        /// <remarks>Exceptions raised by the consumer are logged and rethrown.</remarks>
        public string GetMessage(ISessionMember sessionMember)
        {
            if (sessionMember == null || sessionMember.prod == null || sessionMember.consumer == null)
            {
                Console.WriteLine("call InitQueueOrTopic() first!!");
                return null;
            }

            Console.WriteLine("Begin receive messages...");
            IMessage received;
            try
            {
                // Block synchronously for 10 ms and get null if nothing arrives. The wait
                // must not be too short, or null comes back before the message is fetched.
                received = sessionMember.consumer.Receive(TimeSpan.FromMilliseconds(10));
            }
            catch (Exception e)
            {
                Console.WriteLine("Exception:{0}", e.Message);
                throw;
            }

            if (received == null)
            {
                Console.WriteLine("No message received!");
                return null;
            }

            ITextMessage textMessage = received as ITextMessage;
            if (textMessage == null)
            {
                // A message did arrive; it just is not text, so say so instead of "no message".
                Console.WriteLine("Received a non-text message; ignoring it.");
                return null;
            }

            Console.WriteLine("Received message with Correlation ID: " + textMessage.NMSCorrelationID);
            Console.WriteLine("Received message with text: " + textMessage.Text);
            return textMessage.Text;
        }

        // Sends the member's message persistently. This covers both modes: point-to-point
        // (queue) and publish/subscribe (topic).
        private static bool SendPersistent(ISessionMember member, MsgPriority priority)
        {
            try
            {
                if (!member.isTopic && member.hasSelector)
                {
                    // This property is what the queue consumer's selector matches on, so a
                    // filtered queue consumer only sees messages that carry it.
                    // NOTE(review): topic sends never set it, even when the durable
                    // subscriber was created with the selector - confirm that is intended.
                    member.msg.Properties.SetString("filter", "demo");
                }

                member.prod.Priority = priority;

                // Persistent delivery; per the original author's note, messages are lost
                // on a broker restart when the delivery mode is unset or non-persistent.
                member.prod.DeliveryMode = MsgDeliveryMode.Persistent;

                // NOTE(review): TimeSpan.MinValue is the time-to-live argument used by the
                // original code; presumably it means "no expiry" - confirm.
                member.prod.Send(member.msg, MsgDeliveryMode.Persistent, priority, TimeSpan.MinValue);
                return true;
            }
            catch (Exception e)
            {
                Console.WriteLine("Exception:{0}", e.Message);
                throw;
            }
        }

        // Closes the session and connection (if any) and clears the fields first, so a
        // failure part-way through cannot leave stale references or skip the connection.
        private void ReleaseConnection()
        {
            ISession oldSession = session;
            IConnection oldConnection = connection;
            session = null;
            connection = null;
            ForgetOwnedMembers();

            try
            {
                oldConnection?.Stop();
                if (oldSession != null)
                {
                    oldSession.Close();
                    oldSession.Dispose();
                }
            }
            finally
            {
                if (oldConnection != null)
                {
                    oldConnection.Close();
                    oldConnection.Dispose();
                }
            }
        }

        // Removes this instance's entries from the shared dictionary, leaving alone any
        // name that has since been re-registered with a different member.
        private void ForgetOwnedMembers()
        {
            foreach (KeyValuePair<string, ISessionMember> entry in owned)
            {
                ISessionMember registered;
                if (dict.TryGetValue(entry.Key, out registered) && ReferenceEquals(registered, entry.Value))
                {
                    dict.Remove(entry.Key);
                }
            }

            owned.Clear();
        }

        // Best-effort close of a partially initialised member; errors are only logged so
        // the exception that triggered the cleanup is the one the caller sees.
        private static void CloseQuietly(ISessionMember member)
        {
            try
            {
                member.consumer?.Close();
            }
            catch (Exception e)
            {
                Console.WriteLine("Exception:{0}", e.Message);
            }

            try
            {
                member.prod?.Close();
            }
            catch (Exception e)
            {
                Console.WriteLine("Exception:{0}", e.Message);
            }
        }
    }
}
|