WaterWellDataUploadJob.cs 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170
  1. using log4net;
  2. using Newtonsoft.Json;
  3. using Quartz;
  4. using RabbitMQ.Client;
  5. using RDIFramework.Utilities;
  6. using System;
  7. using System.Collections.Generic;
  8. using System.Data;
  9. using System.Text;
  10. using TimedUpload.utils;
  11. namespace TimedUpload.QuartzJobs
  12. {
  13. [DisallowConcurrentExecution]
  14. public class WaterWellDataUploadJob : IJob
  15. {
  16. private readonly ILog log = LogManager.GetLogger(typeof(WaterWellDataUploadJob));
  17. public void Execute(IJobExecutionContext context)
  18. {
  19. string[] uploadUrls = Constants.UploadUrl.Split('|');
  20. Dictionary<string, IConnection> connections = new Dictionary<string, IConnection>();
  21. Dictionary<string, IModel> channels = new Dictionary<string, IModel>();
  22. Dictionary<string, IBasicProperties> properties = new Dictionary<string, IBasicProperties>();
  23. foreach (string uploadUrl in uploadUrls)
  24. {
  25. ConnectionFactory factory = new ConnectionFactory();
  26. factory.HostName = uploadUrl;//主机名,Rabbit会拿这个IP生成一个endpoint,这个很熟悉吧,就是socket绑定的那个终结点。
  27. factory.UserName = Constants.UploadUserName;//默认用户名,用户可以在服务端自定义创建,有相关命令行
  28. factory.Password = Constants.UploadPassword;//默认密码
  29. factory.AutomaticRecoveryEnabled = true; // 链接断开会自动重连
  30. IConnection connection = factory.CreateConnection();
  31. IModel channel = connection.CreateModel();
  32. channel.QueueDeclare("waterWell.deviceHis", true, false, false, null);//创建一个名称为kibaqueue的消息队列
  33. IBasicProperties property = channel.CreateBasicProperties();
  34. property.ContentType = "text/plain";
  35. property.DeliveryMode = 2; //持久化
  36. connections.Add(uploadUrl, connection);
  37. properties.Add(uploadUrl, property);
  38. channels.Add(uploadUrl, channel);
  39. }
  40. if (channels.Count > 0)
  41. {
  42. SendWaterWellHis(channels, properties);
  43. }
  44. foreach (KeyValuePair<string, IConnection> item in connections)
  45. {
  46. IConnection connection = item.Value;
  47. connection.Close();
  48. }
  49. }
  50. /// <summary>
  51. /// 水源井历史数据
  52. /// </summary>
  53. /// <param name="channels"></param>
  54. /// <param name="properties"></param>
  55. private void SendWaterWellHis(Dictionary<string, IModel> channels, Dictionary<string, IBasicProperties> properties)
  56. {
  57. try
  58. {
  59. log.Info("水源井历史数据同步任务开始执行.................\r\n");
  60. string factorySql = "SELECT * FROM [dbo].[水源井数据] ORDER BY 日期, 时间 ";
  61. DataTable dtDevice = dbHelper.Fill(factorySql);
  62. if (dtDevice == null || dtDevice.Rows.Count == 0)
  63. {
  64. return;
  65. }
  66. // 处理设备列表
  67. for (int i = 0; i < dtDevice.Rows.Count; i++)
  68. {
  69. DataRow dr = dtDevice.Rows[i];
  70. string id = dr["更新时间"].ToString();
  71. string deviceCode = dr["编码"].ToString();
  72. string InstantFlow = dr["井瞬时流量"].ToString();
  73. string TotalFlow = dr["井累计流量"].ToString();
  74. string VoltageA = dr["井电压A相"].ToString();
  75. string VoltageB = dr["井电压B相"].ToString();
  76. string VoltageC = dr["井电压C相"].ToString();
  77. string CurrentA = dr["井电流A相"].ToString();
  78. string CurrentB = dr["井电流B相"].ToString();
  79. string CurrentC = dr["井电流C相"].ToString();
  80. string Consumption = dr["井电能"].ToString();
  81. string Power = dr["井功率"].ToString();
  82. string IsAuto = dr["井控制模式"].ToString();
  83. string RunState = dr["井运行状态"].ToString();
  84. if (!CommonUtil.IsNumber(IsAuto))
  85. {
  86. if ("TRUE".Equals(IsAuto.ToUpper()))
  87. {
  88. IsAuto = "1";
  89. }
  90. else
  91. {
  92. IsAuto = "0";
  93. }
  94. }
  95. if (!CommonUtil.IsNumber(RunState))
  96. {
  97. if ("TRUE".Equals(RunState.ToUpper()))
  98. {
  99. RunState = "1";
  100. }
  101. else
  102. {
  103. RunState = "0";
  104. }
  105. }
  106. string readTime = dr["日期"].ToString().Trim() + " " + dr["时间"].ToString().Trim();
  107. Dictionary<string, object> deviceMap = new Dictionary<string, object>();
  108. deviceMap["DeviceCode"] = deviceCode;
  109. deviceMap["ReadTime"] = readTime;
  110. deviceMap["InstantFlow"] = InstantFlow;
  111. deviceMap["TotalFlow"] = TotalFlow;
  112. deviceMap["VoltageA"] = VoltageA;
  113. deviceMap["VoltageB"] = VoltageB;
  114. deviceMap["VoltageC"] = VoltageC;
  115. deviceMap["CurrentA"] = CurrentA;
  116. deviceMap["CurrentB"] = CurrentB;
  117. deviceMap["CurrentC"] = CurrentC;
  118. deviceMap["Consumption"] = Consumption;
  119. deviceMap["IsAuto"] = IsAuto;
  120. deviceMap["RunState"] = RunState;
  121. deviceMap["Power"] = Power;
  122. string message = JsonConvert.SerializeObject(deviceMap);
  123. foreach (KeyValuePair<string, IModel> item in channels)
  124. {
  125. string key = item.Key;
  126. IModel channel = item.Value;
  127. IBasicProperties property = properties[key];
  128. channel.BasicPublish("waterWell.deviceHis", "", property, Encoding.UTF8.GetBytes(message)); //生产消息
  129. }
  130. // 删除数据
  131. string deleteSql = "DELETE FROM [dbo].[水源井数据] where 更新时间 = '" + id + "' and 编码 = '" + deviceCode + "'";
  132. dbHelper.ExecuteNonQuery(deleteSql);
  133. }
  134. log.Info("水源井历史记录同步任务执行结束.................\r\n");
  135. }
  136. catch (Exception ex)
  137. {
  138. log.Error("水源井历史记录同步任务执行错误:" + ex.Message + "===========" + ex.StackTrace + "\r\n");
  139. }
  140. }
  141. static IDbProvider dbHelper
  142. {
  143. get
  144. {
  145. var DbDefine = DbFactoryProvider.GetProvider(CurrentDbType.SqlServer, Constants.WaterFactoryDbConncetion);
  146. return DbDefine;
  147. }
  148. }
  149. }
  150. }