// ActiveMQ.cs
  1. using System;
  2. using System.Collections.Generic;
  3. using System.Linq;
  4. using System.Text;
  5. using System.Threading.Tasks;
  6. using Apache.NMS;
  7. using Apache.NMS.ActiveMQ;
  8. namespace SIMDP.Project
  9. {
  /// <summary>
  /// Small helper around an Apache.NMS ActiveMQ connection: it opens one connection and one
  /// session, creates a producer/consumer pair per queue or topic, and sends or receives
  /// text messages through those pairs.
  /// </summary>
  public class ActiveMQ
  {
      private IConnectionFactory factory;
      private IConnection connection;
      private ISession session;

      /// <summary>
      /// Producer, consumer and reusable text message created for one queue or topic.
      /// </summary>
      public class ISessionMember
      {
          public IMessageProducer prod;
          public IMessageConsumer consumer;
          public ITextMessage msg;

          // Recorded by InitQueueOrTopic so that every member remembers its own destination
          // kind and selector usage, independently of members initialised later.
          public bool isTopic;
          public bool hasSelector;
      }

      /// <summary>
      /// All initialised destinations, keyed by queue/topic name. Shared by every instance.
      /// </summary>
      public static Dictionary<string, ISessionMember> dict = new Dictionary<string, ISessionMember>();

      // Members this instance added to <see cref="dict"/>; they are removed again when the
      // connection is released because they belong to the session being closed.
      private readonly Dictionary<string, ISessionMember> ownedMembers = new Dictionary<string, ISessionMember>();

      private const string ClientID = "clientid";
      private const string Selector = "filter='demo'";

      // How long GetMessage blocks waiting for a message before giving up.
      private static readonly TimeSpan ReceiveTimeout = TimeSpan.FromMilliseconds(10);

      /// <summary>
      /// Opens a connection to the broker on port 61616 and creates a session on it.
      /// </summary>
      /// <param name="isLocalMachine">True to connect to <c>localhost</c>; false to use <paramref name="remoteAddress"/>.</param>
      /// <param name="remoteAddress">Host name or IP of the remote broker (for example 192.168.1.111); only used when <paramref name="isLocalMachine"/> is false.</param>
      /// <returns>True when the connection and session were created; false when any step failed (the error is written to the console).</returns>
      public bool Connect(bool isLocalMachine, string remoteAddress)
      {
          try
          {
              // Release any previous connection first so it is not leaked when Connect is called again.
              if (connection != null || session != null)
              {
                  ReleaseConnection();
              }

              string host = isLocalMachine ? "localhost" : remoteAddress;
              factory = new ConnectionFactory("tcp://" + host + ":61616/");

              // Create the connection through the factory.
              connection = factory.CreateConnection();
              connection.ClientId = ClientID;
              connection.Start();

              // Create the session through the connection.
              session = connection.CreateSession();
              Console.WriteLine("Begin connection...");
              return true;
          }
          catch (Exception e)
          {
              Console.WriteLine("Exception:{0}", e.Message);

              // A connection may have been created before the failing step; close it so it is
              // not left open. Cleanup problems are only logged so that Connect still reports
              // the failure through its return value.
              try
              {
                  ReleaseConnection();
              }
              catch (Exception cleanupError)
              {
                  Console.WriteLine("Exception:{0}", cleanupError.Message);
              }

              return false;
          }
      }

      /// <summary>
      /// Stops the connection and closes the session and the connection. Destinations that
      /// this instance registered in <see cref="dict"/> are removed. Safe to call repeatedly.
      /// </summary>
      public void DisConnect()
      {
          Console.WriteLine("Close connection and session...");
          ReleaseConnection();
      }

      /// <summary>
      /// Creates the producer, the consumer and the reusable message for one queue or topic
      /// and registers them in <see cref="dict"/> under <paramref name="name"/>.
      /// </summary>
      /// <param name="topic">True to use a topic (with a durable consumer); false to use a queue.</param>
      /// <param name="name">Queue or topic name; also the key used in <see cref="dict"/>.</param>
      /// <param name="selector">True to create the consumer with the <c>filter='demo'</c> selector.</param>
      /// <returns>True when the destination was initialised.</returns>
      /// <exception cref="InvalidOperationException">No session exists because Connect has not succeeded.</exception>
      /// <exception cref="ArgumentException"><paramref name="name"/> is null or already registered.</exception>
      public bool InitQueueOrTopic(bool topic, string name, bool selector = false)
      {
          ISessionMember sessionMember = new ISessionMember();
          try
          {
              if (session == null)
              {
                  throw new InvalidOperationException("call Connect() first!!");
              }
              if (name == null)
              {
                  throw new ArgumentNullException(nameof(name));
              }
              // Check for duplicates before touching the broker so a rejected registration
              // does not leave an unused producer/consumer behind.
              if (dict.ContainsKey(name))
              {
                  throw new ArgumentException("Queue or topic is already initialised: " + name, nameof(name));
              }

              // Create the producer and the consumer through the session.
              if (topic)
              {
                  ITopic topicDestination = new Apache.NMS.ActiveMQ.Commands.ActiveMQTopic(name);
                  sessionMember.prod = session.CreateProducer(topicDestination);
                  sessionMember.consumer = session.CreateDurableConsumer(topicDestination, ClientID, selector ? Selector : null, false);
              }
              else
              {
                  IQueue queueDestination = new Apache.NMS.ActiveMQ.Commands.ActiveMQQueue(name);
                  sessionMember.prod = session.CreateProducer(queueDestination);
                  if (selector)
                  {
                      sessionMember.consumer = session.CreateConsumer(queueDestination, Selector);
                  }
                  else
                  {
                      sessionMember.consumer = session.CreateConsumer(queueDestination);
                  }
              }
              sessionMember.isTopic = topic;
              sessionMember.hasSelector = selector;

              // Create the message object that SendMessage fills in and sends.
              sessionMember.msg = sessionMember.prod.CreateTextMessage();

              dict.Add(name, sessionMember);
              ownedMembers[name] = sessionMember;
              return true;
          }
          catch (Exception e)
          {
              Console.WriteLine("Exception:{0}", e.Message);
              CloseQuietly(sessionMember);
              throw;
          }
      }

      /// <summary>
      /// Sends <paramref name="message"/> as a persistent text message through the producer of
      /// <paramref name="sessionMember"/>.
      /// </summary>
      /// <param name="message">Text to send.</param>
      /// <param name="sessionMember">Destination member created by InitQueueOrTopic.</param>
      /// <param name="msgId">Value written to the correlation id, the message id and the <c>MyID</c> property.</param>
      /// <param name="priority">Message priority.</param>
      /// <returns>True when the message was sent; false when <paramref name="sessionMember"/> is not initialised.</returns>
      /// <remarks>Exceptions raised while sending are written to the console and rethrown.</remarks>
      public bool SendMessage(string message, ISessionMember sessionMember, string msgId = "defult", MsgPriority priority = MsgPriority.Normal)
      {
          if (sessionMember == null || sessionMember.prod == null || sessionMember.msg == null)
          {
              Console.WriteLine("call InitQueueOrTopic() first!!");
              return false;
          }

          Console.WriteLine("Begin send messages...");

          // Fill the reusable message object with the actual content.
          ITextMessage outgoing = sessionMember.msg;
          outgoing.NMSCorrelationID = msgId;
          outgoing.Properties["MyID"] = msgId;
          outgoing.NMSMessageId = msgId;
          outgoing.Text = message;
          Console.WriteLine(message);

          try
          {
              if (!sessionMember.isTopic && sessionMember.hasSelector)
              {
                  // The queue consumer was created with the selector filter='demo', so the
                  // message needs this property for that consumer to receive it.
                  outgoing.Properties.SetString("filter", "demo");
              }

              IMessageProducer producer = sessionMember.prod;
              producer.Priority = priority;
              // Persistent delivery; the original author notes that non-persistent messages
              // are lost when the broker restarts.
              producer.DeliveryMode = MsgDeliveryMode.Persistent;
              // Arguments: delivery mode, priority, time to live.
              producer.Send(outgoing, MsgDeliveryMode.Persistent, priority, TimeSpan.MinValue);
              return true;
          }
          catch (Exception e)
          {
              Console.WriteLine("Exception:{0}", e.Message);
              throw;
          }
      }

      /// <summary>
      /// Waits briefly for one message on the consumer of <paramref name="sessionMember"/>
      /// and returns its text.
      /// </summary>
      /// <param name="sessionMember">Destination member created by InitQueueOrTopic.</param>
      /// <returns>The message text, or null when the member is not initialised, nothing arrived in time, or the message was not a text message.</returns>
      /// <remarks>Exceptions raised while receiving are written to the console and rethrown.</remarks>
      public string GetMessage(ISessionMember sessionMember)
      {
          if (sessionMember == null || sessionMember.consumer == null)
          {
              Console.WriteLine("call InitQueueOrTopic() first!!");
              return null;
          }

          Console.WriteLine("Begin receive messages...");
          IMessage received;
          try
          {
              // Blocks for up to 10 ms and yields null when nothing arrives. The timeout must
              // not be too short, otherwise null comes back before a message can be fetched.
              received = sessionMember.consumer.Receive(ReceiveTimeout);
          }
          catch (Exception e)
          {
              Console.WriteLine("Exception:{0}", e.Message);
              throw;
          }

          if (received == null)
          {
              Console.WriteLine("No message received!");
              return null;
          }

          ITextMessage revMessage = received as ITextMessage;
          if (revMessage == null)
          {
              Console.WriteLine("Received a non-text message; ignoring it.");
              return null;
          }

          Console.WriteLine("Received message with Correlation ID: " + revMessage.NMSCorrelationID);
          Console.WriteLine("Received message with text: " + revMessage.Text);
          return revMessage.Text;
      }

      // Closes the current session and connection (if any) and clears the fields. The
      // connection is closed and disposed even when stopping it or closing the session throws.
      private void ReleaseConnection()
      {
          IConnection oldConnection = connection;
          ISession oldSession = session;
          connection = null;
          session = null;
          ForgetOwnedMembers();

          try
          {
              oldConnection?.Stop();
              oldSession?.Close();
              oldSession?.Dispose();
          }
          finally
          {
              try
              {
                  oldConnection?.Close();
              }
              finally
              {
                  oldConnection?.Dispose();
              }
          }
      }

      // Removes the entries this instance registered from the shared dictionary, leaving
      // alone any entry that has since been replaced by a different member.
      private void ForgetOwnedMembers()
      {
          foreach (KeyValuePair<string, ISessionMember> entry in ownedMembers)
          {
              ISessionMember current;
              if (dict.TryGetValue(entry.Key, out current) && ReferenceEquals(current, entry.Value))
              {
                  dict.Remove(entry.Key);
              }
          }
          ownedMembers.Clear();
      }

      // Best-effort close of a partially initialised member; failures are only logged so
      // they never hide the exception that triggered the cleanup.
      private static void CloseQuietly(ISessionMember member)
      {
          try
          {
              member.consumer?.Close();
          }
          catch (Exception e)
          {
              Console.WriteLine("Exception:{0}", e.Message);
          }

          try
          {
              member.prod?.Close();
          }
          catch (Exception e)
          {
              Console.WriteLine("Exception:{0}", e.Message);
          }
      }
  }
  229. }