WaterFactoryDataUploadJob.cs 9.7 KB


  1. using log4net;
  2. using Newtonsoft.Json;
  3. using Quartz;
  4. using RabbitMQ.Client;
  5. using RDIFramework.Utilities;
  6. using System;
  7. using System.Collections;
  8. using System.Collections.Generic;
  9. using System.Data;
  10. using System.Text;
  namespace TimedUpload.QuartzJobs
  {
      /// <summary>
      /// Quartz job that reads water-factory history rows from the SQL Server database,
      /// publishes one JSON message per history row to every configured RabbitMQ host,
      /// and then deletes the uploaded rows from the source tables.
      /// </summary>
      [DisallowConcurrentExecution]
      public class WaterFactoryDataUploadJob : IJob
      {
          /// <summary>
          /// Name used both for the declared durable queue and as the exchange argument when publishing.
          /// </summary>
          private const string DeviceHisName = "waterFactory.deviceHis";

          /// <summary>
          /// History tables whose rows are removed once a row (identified by 更新时间 + FactoryId) was published.
          /// </summary>
          private static readonly string[] UploadedTables =
          {
              "水厂泵房2",
              "水厂过滤间GGD",
              "水厂泵房1",
              "水厂过滤间操作台"
          };

          private static readonly ILog log = LogManager.GetLogger(typeof(WaterFactoryDataUploadJob));

          /// <summary>
          /// Opens one RabbitMQ connection/channel per host listed in <c>Constants.UploadUrl</c>
          /// (hosts are separated by '|'), runs the history upload, and always closes the
          /// connections afterwards.
          /// </summary>
          /// <param name="context">Quartz execution context (not used by this job).</param>
          /// <remarks>
          /// A failure to connect to any host still propagates to the scheduler before anything is
          /// published, so no rows are deleted unless every configured host was reachable.
          /// </remarks>
          public void Execute(IJobExecutionContext context)
          {
              string uploadUrlSetting = Constants.UploadUrl;
              if (string.IsNullOrEmpty(uploadUrlSetting))
              {
                  log.Warn("未配置上传地址,水厂历史数据同步任务未执行\r\n");
                  return;
              }

              string[] uploadUrls = uploadUrlSetting.Split(new[] { '|' }, StringSplitOptions.RemoveEmptyEntries);
              Dictionary<string, IConnection> connections = new Dictionary<string, IConnection>();
              Dictionary<string, IModel> channels = new Dictionary<string, IModel>();
              Dictionary<string, IBasicProperties> properties = new Dictionary<string, IBasicProperties>();

              try
              {
                  foreach (string rawUrl in uploadUrls)
                  {
                      string uploadUrl = rawUrl.Trim();

                      // Skip blank entries and repeated hosts; a repeated key would make Dictionary.Add throw.
                      if (uploadUrl.Length == 0 || connections.ContainsKey(uploadUrl))
                      {
                          continue;
                      }

                      ConnectionFactory factory = new ConnectionFactory();
                      factory.HostName = uploadUrl;
                      factory.UserName = Constants.UploadUserName;
                      factory.Password = Constants.UploadPassword;
                      factory.AutomaticRecoveryEnabled = true; // reconnect automatically when the link drops

                      IConnection connection = factory.CreateConnection();

                      // Register the connection immediately so the finally block closes it even if
                      // channel creation or the queue declaration below throws.
                      connections.Add(uploadUrl, connection);

                      IModel channel = connection.CreateModel();

                      // Durable, non-exclusive, non-auto-delete queue.
                      channel.QueueDeclare(DeviceHisName, true, false, false, null);

                      IBasicProperties property = channel.CreateBasicProperties();
                      property.ContentType = "text/plain";
                      property.DeliveryMode = 2; // persistent messages

                      properties.Add(uploadUrl, property);
                      channels.Add(uploadUrl, channel);
                  }

                  if (channels.Count > 0)
                  {
                      SendWaterFactoryHis(channels, properties);
                  }
              }
              finally
              {
                  foreach (KeyValuePair<string, IConnection> item in connections)
                  {
                      try
                      {
                          // Closing the connection also closes the channels created on it.
                          item.Value.Close();
                      }
                      catch (Exception ex)
                      {
                          // Keep closing the remaining connections even if this one fails.
                          log.Error("关闭RabbitMQ连接失败:" + item.Key + "\r\n", ex);
                      }
                  }
              }
          }

          /// <summary>
          /// Uploads the history data of every factory found in table 水厂信息.
          /// Any exception is logged and swallowed so the job itself does not fail.
          /// </summary>
          /// <param name="channels">Open channels keyed by upload host.</param>
          /// <param name="properties">Message properties keyed by the same upload host.</param>
          private void SendWaterFactoryHis(Dictionary<string, IModel> channels, Dictionary<string, IBasicProperties> properties)
          {
              try
              {
                  log.Info("水厂历史数据同步任务开始执行.................\r\n");
                  string factorySql = "select * from 水厂信息";
                  DataTable dtFactory = dbHelper.Fill(factorySql);
                  string waterFactoryColumn = Constants.WaterFactoryColumn;
                  if (string.IsNullOrEmpty(waterFactoryColumn) || dtFactory == null || dtFactory.Rows.Count == 0)
                  {
                      return;
                  }

                  // The default column list is the same for every meter, so split it once.
                  string[] defaultColumns = waterFactoryColumn.Split(',');

                  foreach (DataRow factoryRow in dtFactory.Rows)
                  {
                      UploadFactory(factoryRow, defaultColumns, channels, properties);
                  }

                  log.Info("水厂历史记录同步任务执行结束.................\r\n");
              }
              catch (Exception ex)
              {
                  // Pass the exception object so log4net records the full details, including inner exceptions.
                  log.Error("水厂历史记录同步任务执行错误:" + ex.Message + "\r\n", ex);
              }
          }

          /// <summary>
          /// Publishes and then deletes every pending history row of a single factory.
          /// </summary>
          /// <param name="factoryRow">Row of 水厂信息; FactoryId and FactoryCode are read from it.</param>
          /// <param name="defaultColumns">Keys that every meter entry carries, defaulting to null.</param>
          /// <param name="channels">Open channels keyed by upload host.</param>
          /// <param name="properties">Message properties keyed by upload host.</param>
          private static void UploadFactory(DataRow factoryRow, string[] defaultColumns, Dictionary<string, IModel> channels, Dictionary<string, IBasicProperties> properties)
          {
              // NOTE(review): the SQL in this class is built by concatenating values read from the
              // database itself; switch to parameters if any of them can come from outside.
              string factoryId = factoryRow["FactoryId"].ToString();
              string factoryCode = factoryRow["FactoryCode"].ToString();

              string meterSql = "select * from 水厂表具 where FactoryId = " + factoryId;
              DataTable dtMeter = dbHelper.Fill(meterSql);
              if (dtMeter == null || dtMeter.Rows.Count == 0)
              {
                  return;
              }

              string baseSql = "SELECT RTRIM(日期) + ' ' + RTRIM(时间) ReadTime,反冲正累计流量 - 反冲负累计流量 BackWashing,[清水池液位] LiquidHeight,更新时间 FROM [dbo].[水厂泵房2] WHERE FactoryId = " + factoryId + " ORDER BY 日期, 时间 ";
              DataTable baseDt = dbHelper.Fill(baseSql);
              if (baseDt == null || baseDt.Rows.Count == 0)
              {
                  return;
              }

              // Column definitions depend only on the meter, so load them once per factory
              // instead of once per history row.
              List<MeterDefinition> meters = LoadMeterDefinitions(dtMeter);

              Dictionary<string, object> factoryMap = new Dictionary<string, object>();
              factoryMap["FactoryCode"] = factoryCode;

              foreach (DataRow baseRow in baseDt.Rows)
              {
                  // NOTE(review): 更新时间 is used as the row key in later WHERE clauses; if the column
                  // is a datetime type, ToString() is culture-dependent - confirm the column type.
                  string id = baseRow["更新时间"].ToString();

                  factoryMap["BaseData"] = BuildBaseMap(baseRow);
                  factoryMap["MeterData"] = CollectMeterData(meters, defaultColumns, id);

                  Publish(JsonConvert.SerializeObject(factoryMap), channels, properties);

                  // NOTE(review): rows are deleted right after BasicPublish returns, without
                  // publisher confirms - confirm this delivery guarantee is acceptable.
                  DeleteUploadedRows(id, factoryId);
              }
          }

          /// <summary>
          /// Builds the "BaseData" section of the message from one row of 水厂泵房2.
          /// </summary>
          /// <param name="baseRow">Row providing ReadTime, BackWashing and LiquidHeight.</param>
          /// <returns>Map with the base values; Consumption, CleanWaterFlow and Dosage are fixed to "0".</returns>
          private static Dictionary<string, object> BuildBaseMap(DataRow baseRow)
          {
              Dictionary<string, object> baseMap = new Dictionary<string, object>();
              baseMap["ReadTime"] = baseRow["ReadTime"].ToString();
              baseMap["BackWashing"] = baseRow["BackWashing"].ToString();
              baseMap["LiquidHeight"] = baseRow["LiquidHeight"].ToString();
              baseMap["Consumption"] = "0";
              baseMap["CleanWaterFlow"] = "0";
              baseMap["Dosage"] = "0";
              return baseMap;
          }

          /// <summary>
          /// Loads, for every meter row, its column mapping from table 水厂数据字段.
          /// Meters without any column mapping are skipped.
          /// </summary>
          /// <param name="dtMeter">Rows of 水厂表具 for one factory.</param>
          /// <returns>One definition per meter that has at least one mapped column.</returns>
          private static List<MeterDefinition> LoadMeterDefinitions(DataTable dtMeter)
          {
              List<MeterDefinition> meters = new List<MeterDefinition>();

              foreach (DataRow meterRow in dtMeter.Rows)
              {
                  string tableName = meterRow["TableName"].ToString();
                  string meterId = meterRow["MeterId"].ToString();
                  string meterCode = meterRow["MeterCode"].ToString();

                  string colSql = "select * from 水厂数据字段 where MeterId = " + meterId;
                  DataTable dtMeterCol = dbHelper.Fill(colSql);
                  if (dtMeterCol == null || dtMeterCol.Rows.Count == 0)
                  {
                      continue;
                  }

                  int columnCount = dtMeterCol.Rows.Count;
                  string[] sendKeys = new string[columnCount];
                  string[] selectItems = new string[columnCount];
                  for (int k = 0; k < columnCount; k++)
                  {
                      string tableColumn = dtMeterCol.Rows[k]["TableColumn"].ToString();
                      string sendKey = dtMeterCol.Rows[k]["SendKey"].ToString();

                      // "column alias" pair: the source column is selected under its outgoing key.
                      selectItems[k] = tableColumn + " " + sendKey;
                      sendKeys[k] = sendKey;
                  }

                  MeterDefinition meter = new MeterDefinition();
                  meter.TableName = tableName;
                  meter.MeterCode = meterCode;
                  meter.SelectList = string.Join(",", selectItems);
                  meter.SendKeys = sendKeys;
                  meters.Add(meter);
              }

              return meters;
          }

          /// <summary>
          /// Reads, for each meter, the data row whose 更新时间 equals <paramref name="id"/> and
          /// converts it into a key/value map. Meters without a matching row are omitted.
          /// </summary>
          /// <param name="meters">Meter definitions produced by <see cref="LoadMeterDefinitions"/>.</param>
          /// <param name="defaultColumns">Keys pre-filled with null before the queried values are applied.</param>
          /// <param name="id">The 更新时间 value of the history row being uploaded.</param>
          /// <returns>The "MeterData" section of the message.</returns>
          private static List<Dictionary<string, object>> CollectMeterData(List<MeterDefinition> meters, string[] defaultColumns, string id)
          {
              List<Dictionary<string, object>> meterDataList = new List<Dictionary<string, object>>();

              foreach (MeterDefinition meter in meters)
              {
                  string meterDataSql = "select " + meter.SelectList + " from " + meter.TableName + " where 更新时间 = '" + id + "'";
                  DataTable data = dbHelper.Fill(meterDataSql);
                  if (data == null || data.Rows.Count == 0)
                  {
                      continue;
                  }

                  Dictionary<string, object> meterMap = new Dictionary<string, object>();
                  foreach (string col in defaultColumns)
                  {
                      meterMap[col] = null;
                  }

                  // Only the first matching row is used.
                  DataRow firstRow = data.Rows[0];
                  foreach (string sendKey in meter.SendKeys)
                  {
                      meterMap[sendKey] = firstRow[sendKey];
                  }

                  meterMap["MeterCode"] = meter.MeterCode;
                  meterDataList.Add(meterMap);
              }

              return meterDataList;
          }

          /// <summary>
          /// Publishes the UTF-8 encoded message on every channel.
          /// </summary>
          /// <param name="message">Serialized JSON payload.</param>
          /// <param name="channels">Open channels keyed by upload host.</param>
          /// <param name="properties">Message properties keyed by upload host.</param>
          private static void Publish(string message, Dictionary<string, IModel> channels, Dictionary<string, IBasicProperties> properties)
          {
              // Encode once; the same payload goes to every host.
              byte[] body = Encoding.UTF8.GetBytes(message);

              foreach (KeyValuePair<string, IModel> item in channels)
              {
                  // NOTE(review): the name is passed as the EXCHANGE with an empty routing key, while
                  // Execute only declares a QUEUE of that name - confirm an exchange with this name
                  // exists on the broker.
                  item.Value.BasicPublish(DeviceHisName, "", properties[item.Key], body);
              }
          }

          /// <summary>
          /// Deletes the uploaded history row from all source tables in a single batch statement.
          /// </summary>
          /// <param name="id">The 更新时间 value of the uploaded row.</param>
          /// <param name="factoryId">Factory the row belongs to.</param>
          private static void DeleteUploadedRows(string id, string factoryId)
          {
              string[] statements = new string[UploadedTables.Length];
              for (int t = 0; t < UploadedTables.Length; t++)
              {
                  statements[t] = "DELETE FROM [dbo].[" + UploadedTables[t] + "] where 更新时间 = '" + id + "' and FactoryId = " + factoryId;
              }

              dbHelper.ExecuteNonQuery(string.Join(";", statements));
          }

          /// <summary>
          /// Database provider for the water-factory SQL Server database.
          /// <c>DbFactoryProvider.GetProvider</c> is called on every access.
          /// </summary>
          private static IDbProvider dbHelper
          {
              get
              {
                  return DbFactoryProvider.GetProvider(CurrentDbType.SqlServer, Constants.WaterFactoryDbConncetion);
              }
          }

          /// <summary>
          /// Per-meter query metadata resolved from 水厂表具 and 水厂数据字段.
          /// </summary>
          private sealed class MeterDefinition
          {
              /// <summary>Table holding the meter's readings.</summary>
              public string TableName { get; set; }

              /// <summary>Code sent as "MeterCode" in the message.</summary>
              public string MeterCode { get; set; }

              /// <summary>Comma-separated "column alias" list used in the SELECT clause.</summary>
              public string SelectList { get; set; }

              /// <summary>Aliases (outgoing keys) in the same order as <see cref="SelectList"/>.</summary>
              public string[] SendKeys { get; set; }
          }
      }
  }