SecondaryPumpDataUploadJob.cs 19 KB


  1. using log4net;
  2. using Newtonsoft.Json;
  3. using Quartz;
  4. using RabbitMQ.Client;
  5. using RDIFramework.Utilities;
  6. using System;
  7. using System.Collections;
  8. using System.Collections.Generic;
  9. using System.Configuration;
  10. using System.Data;
  11. using System.Text;
  12. namespace TimedUpload.QuartzJobs
  13. {
  14. [DisallowConcurrentExecution]
  15. public class SecondaryPumpDataUploadJob : IJob
  16. {
  17. private readonly ILog log = LogManager.GetLogger(typeof(SecondaryPumpDataUploadJob));
  18. public void Execute(IJobExecutionContext context)
  19. {
  20. try
  21. {
  22. string[] uploadUrls = Constants.UploadUrl.Split('|');
  23. Dictionary<string, IConnection> connections = new Dictionary<string, IConnection>();
  24. Dictionary<string, IModel> channels = new Dictionary<string, IModel>();
  25. Dictionary<string, IBasicProperties> properties = new Dictionary<string, IBasicProperties>();
  26. foreach (string uploadUrl in uploadUrls)
  27. {
  28. ConnectionFactory factory = new ConnectionFactory();
  29. factory.HostName = uploadUrl;//主机名,Rabbit会拿这个IP生成一个endpoint,这个很熟悉吧,就是socket绑定的那个终结点。
  30. factory.UserName = Constants.UploadUserName;//默认用户名,用户可以在服务端自定义创建,有相关命令行
  31. factory.Password = Constants.UploadPassword;//默认密码
  32. factory.AutomaticRecoveryEnabled = true; // 链接断开会自动重连
  33. IConnection connection = factory.CreateConnection();
  34. IModel channel = connection.CreateModel();
  35. channel.QueueDeclare("secondaryPump.deviceHis", true, false, false, null);//创建一个名称为kibaqueue的消息队列
  36. IBasicProperties property = channel.CreateBasicProperties();
  37. property.ContentType = "text/plain";
  38. property.DeliveryMode = 2; //持久化
  39. connections.Add(uploadUrl, connection);
  40. properties.Add(uploadUrl, property);
  41. channels.Add(uploadUrl, channel);
  42. }
  43. if (channels.Count > 0)
  44. {
  45. SendSecondaryPumpHis(channels, properties);
  46. }
  47. foreach (KeyValuePair<string, IConnection> item in connections)
  48. {
  49. IConnection connection = item.Value;
  50. connection.Close();
  51. }
  52. }
  53. catch (Exception ex)
  54. {
  55. log.Debug(ex.Message);
  56. }
  57. }
  58. /// <summary>
  59. /// 二供历史数据
  60. /// </summary>
  61. /// <param name="channels"></param
  62. /// <param name="properties"></param>
  63. private void SendSecondaryPumpHis(Dictionary<string, IModel> channels, Dictionary<string, IBasicProperties> properties)
  64. {
  65. try
  66. {
  67. log.Info("二供历史数据同步任务开始执行.................\r\n");
  68. string factorySql = "SELECT * FROM [dbo].[二供通用泵站] ORDER BY 日期, 时间";
  69. DataTable dtDevice = dbHelper.Fill(factorySql);
  70. string secondaryPumpColumn = Constants.SecondaryPumpColumn;
  71. if (string.IsNullOrEmpty(secondaryPumpColumn) || dtDevice == null || dtDevice.Rows.Count == 0)
  72. {
  73. return;
  74. }
  75. string newSecondaryToDMACode = Constants.SecondaryToDMACode;
  76. // 处理设备列表
  77. #region
  78. // for (int i = 0; i < dtDevice.Rows.Count; i++)
  79. // {
  80. //DataRow dr = dtDevice.Rows[i];
  81. //string id = "2023/1/17 15:37";//dr["更新时间"].ToString();
  82. //string deviceCode = "wwkj006";// dr["编码"].ToString();
  83. //string PressureIn = "0";//dr["泵进口压力"].ToString();
  84. //string PressureOut = "10.802";//dr["泵出口压力"].ToString();
  85. //string PressureSet = "0";//dr["泵设定压力"].ToString();
  86. //string InstantFlow = "0";//dr["瞬时流量"].ToString();
  87. //string TotalFlow = "0";//dr["净累计流量"].ToString();
  88. //string PositiveToTalFlow = "0";//dr["正累计流量"].ToString();
  89. //string NegativeTotalFlow = "0";//dr["负累计流量"].ToString();
  90. //string PH = "0";//dr["PH"].ToString();
  91. //string Chlorine = "0";//dr["余氯"].ToString();
  92. //string Turbidity = "0";//dr["浊度"].ToString();
  93. //string LiquidHeight = "0";//dr["水箱液位"].ToString();
  94. //string VoltageA = "0";//dr["电压AB"].ToString();
  95. //string VoltageB = "0";//dr["电压AC"].ToString();
  96. //string VoltageC = "0";//dr["电压BC"].ToString();
  97. //string CurrentA = "0";//dr["电流A"].ToString();
  98. //string CurrentB = "0";//dr["电流B"].ToString();
  99. //string CurrentC = "0";//dr["电流C"].ToString();
  100. //string Consumption = "0";//dr["用电量"].ToString();
  101. //string LackWater ="0";//dr["缺水报警"].ToString();
  102. //string OverPressure = "0";//dr["超压报警"].ToString();
  103. //string HouseInlet = "1";//dr["进水报警"].ToString(); // 进水报警0为正常,1为进水。
  104. //string TubeBurst = "0";//dr["爆管报警"].ToString();
  105. //string NetState = "0";//dr["网络状态"].ToString(); // 0代表通讯正常,1代表网络故障,2代表现场485设备通讯故障
  106. //string readTime = "2023/1/17 15:37:14";//dr["日期"].ToString().Trim() + " " + dr["时间"].ToString().Trim();
  107. #endregion
  108. for (int i = 0; i < dtDevice.Rows.Count; i++)
  109. {
  110. DataRow dr = dtDevice.Rows[i];
  111. string id = dr["更新时间"].ToString();
  112. string deviceCode = dr["编码"].ToString();
  113. string PressureIn = dr["泵进口压力"] == DBNull.Value ? "0" : dr["泵进口压力"].ToString();
  114. string PressureOut = dr["泵出口压力"] == DBNull.Value ? "0" : dr["泵出口压力"].ToString();
  115. string PressureSet = dr["泵设定压力"] == DBNull.Value ? "0" : dr["泵设定压力"].ToString();
  116. string InstantFlow = dr["瞬时流量"] == DBNull.Value ? "0" : dr["瞬时流量"].ToString();
  117. string TotalFlow = dr["净累计流量"] == DBNull.Value ? "0" : dr["净累计流量"].ToString();
  118. string PositiveToTalFlow = dr["正累计流量"] == DBNull.Value ? "0" : dr["正累计流量"].ToString();
  119. string NegativeTotalFlow = dr["负累计流量"] == DBNull.Value ? "0" : dr["负累计流量"].ToString();
  120. string PH = dr["PH"] == DBNull.Value ? "0" : dr["PH"].ToString();
  121. string Chlorine = dr["余氯"] == DBNull.Value ? "0" : dr["余氯"].ToString();
  122. string Turbidity = dr["浊度"] == DBNull.Value ? "0" : dr["浊度"].ToString();
  123. string LiquidHeight = dr["水箱液位"] == DBNull.Value ? "0" : dr["水箱液位"].ToString();
  124. string VoltageA = dr["电压AB"] == DBNull.Value ? "0" : dr["电压AB"].ToString();
  125. string VoltageB = dr["电压AC"] == DBNull.Value ? "0" : dr["电压AC"].ToString();
  126. string VoltageC = dr["电压BC"] == DBNull.Value ? "0" : dr["电压BC"].ToString();
  127. string CurrentA = dr["电流A"] == DBNull.Value ? "0" : dr["电流A"].ToString();
  128. string CurrentB = dr["电流B"] == DBNull.Value ? "0" : dr["电流B"].ToString();
  129. string CurrentC = dr["电流C"] == DBNull.Value ? "0" : dr["电流C"].ToString();
  130. string Consumption = dr["用电量"] == DBNull.Value ? "0" : dr["用电量"].ToString();
  131. string LackWater = dr["缺水报警"] == DBNull.Value ? "0" : dr["缺水报警"].ToString();
  132. string OverPressure = dr["超压报警"] == DBNull.Value ? "0" : dr["超压报警"].ToString();
  133. string HouseInlet = dr["进水报警"] == DBNull.Value ? "0" : dr["进水报警"].ToString(); // 进水报警0为正常,1为进水。
  134. string TubeBurst = dr["爆管报警"] == DBNull.Value ? "0" : dr["爆管报警"].ToString();
  135. string NetState = dr["网络状态"] == DBNull.Value ? "0" : dr["网络状态"].ToString(); // 0代表通讯正常,1代表网络故障,2代表现场485设备通讯故障
  136. string readTime = dr["日期"].ToString().Trim() + " " + dr["时间"].ToString().Trim();
  137. Dictionary<string, object> deviceMap = new Dictionary<string, object>();
  138. deviceMap["DeviceCode"] = deviceCode;
  139. deviceMap["ReadTime"] = readTime;
  140. deviceMap["PressureIN"] = PressureIn;
  141. deviceMap["PressureOut"] = PressureOut;
  142. deviceMap["PressureSet"] = PressureSet;
  143. deviceMap["InstantFlow"] = InstantFlow;
  144. deviceMap["TotalFlow"] = TotalFlow;
  145. deviceMap["PositiveToTalFlow"] = PositiveToTalFlow;
  146. deviceMap["NegativeTotalFlow"] = NegativeTotalFlow;
  147. deviceMap["PH"] = PH;
  148. deviceMap["Chlorine"] = Chlorine;
  149. deviceMap["Turbidity"] = Turbidity;
  150. deviceMap["LiquidHeight"] = LiquidHeight;
  151. deviceMap["VoltageA"] = VoltageA;
  152. deviceMap["VoltageB"] = VoltageB;
  153. deviceMap["VoltageC"] = VoltageC;
  154. deviceMap["CurrentA"] = CurrentA;
  155. deviceMap["CurrentB"] = CurrentB;
  156. deviceMap["CurrentC"] = CurrentC;
  157. deviceMap["Consumption"] = Consumption;
  158. deviceMap["LackWater"] = LackWater;
  159. deviceMap["OverPressure"] = OverPressure;
  160. deviceMap["HouseInlet"] = HouseInlet;
  161. deviceMap["TubeBurst"] = TubeBurst;
  162. deviceMap["NetState"] = NetState;
  163. ArrayList meterDataList = new ArrayList();
  164. // 处理泵的数据
  165. for (int j = 1; j < 4; j++)
  166. {
  167. Dictionary<string, object> meterMap = new Dictionary<string, object>();
  168. string colPre = "";
  169. switch (j)
  170. {
  171. case 1:
  172. colPre = "一泵";
  173. break;
  174. case 2:
  175. colPre = "二泵";
  176. break;
  177. case 3:
  178. colPre = "三泵";
  179. break;
  180. }
  181. string meterCode = "wwkj" + j.ToString().PadLeft(3, '0');
  182. string frequency = dr[colPre + "频率"] == DBNull.Value ? "0" : dr[colPre + "频率"].ToString();
  183. string current = dr[colPre + "电流"] == DBNull.Value ? "0" : dr[colPre + "电流"].ToString();
  184. string runState = dr[colPre + "运行状态"] == DBNull.Value ? "0" : dr[colPre + "运行状态"].ToString();
  185. string power = dr[colPre + "功率"] == DBNull.Value ? "0" : dr[colPre + "功率"].ToString();
  186. string voltage = dr[colPre + "电压"] == DBNull.Value ? "0" : dr[colPre + "电压"].ToString();
  187. meterMap["PumpCode"] = meterCode;
  188. meterMap["ReadTime"] = readTime;
  189. meterMap["Frequency"] = frequency;
  190. meterMap["Current"] = current;
  191. meterMap["RunState"] = runState;
  192. meterMap["Power"] = power;
  193. meterMap["Voltage"] = voltage;
  194. meterDataList.Add(meterMap);
  195. }
  196. if (meterDataList.Count == 0)
  197. {
  198. continue;
  199. }
  200. deviceMap["PumpData"] = meterDataList;
  201. string message = JsonConvert.SerializeObject(deviceMap);
  202. foreach (KeyValuePair<string, IModel> item in channels)
  203. {
  204. string key = item.Key;
  205. IModel channel = item.Value;
  206. IBasicProperties property = properties[key];
  207. channel.BasicPublish("secondaryPump.deviceHis", "", property, Encoding.UTF8.GetBytes(message)); //生产消息
  208. }
  209. // 判断数据是否需要同步分区计量
  210. if ("1".Equals(Constants.SecondaryToDMA))
  211. {
  212. string deviceCodeTemp = SecondaryToDMA(dr, channels, properties, newSecondaryToDMACode);
  213. if ("".Equals(newSecondaryToDMACode))
  214. {
  215. newSecondaryToDMACode = deviceCodeTemp;
  216. }
  217. else
  218. {
  219. newSecondaryToDMACode = newSecondaryToDMACode + "," + deviceCodeTemp;
  220. }
  221. }
  222. // 删除数据
  223. string deleteSql = "DELETE FROM [dbo].[二供通用泵站] where 更新时间 = '" + id + "' and 编码 = '" + deviceCode + "'";
  224. dbHelper.ExecuteNonQuery(deleteSql);
  225. }
  226. if ("1".Equals(Constants.SecondaryToDMA))
  227. {
  228. UpdateAppConfig("SecondaryToDMACode", newSecondaryToDMACode);
  229. }
  230. log.Info("二供历史记录同步任务执行结束.................\r\n");
  231. }
  232. catch (Exception ex)
  233. {
  234. log.Error("二供历史记录同步任务执行错误:" + ex.Message + "===========" + ex.StackTrace + "\r\n");
  235. }
  236. }
  237. private string SecondaryToDMA(DataRow dr, Dictionary<string, IModel> channels, Dictionary<string, IBasicProperties> properties, string newSecondaryToDMACode)
  238. {
  239. string deviceName = dr["编号"].ToString();
  240. string deviceCode = dr["编码"].ToString() + "sc";
  241. #region 设备同步
  242. if (newSecondaryToDMACode != null && newSecondaryToDMACode.IndexOf(deviceCode) < 0)
  243. {
  244. StringBuilder messageDevice = new StringBuilder();
  245. messageDevice.Append("{");
  246. messageDevice.Append("\"meterAssessmentName\": \"").Append(deviceName).Append("\",");
  247. messageDevice.Append("\"isPressucre\": 1,");
  248. messageDevice.Append("\"isFlow\": 1,");
  249. messageDevice.Append("\"isZoneMeter\": 1,");
  250. messageDevice.Append("\"isTradeMeter\": 0,");
  251. messageDevice.Append("\"isLargeUser\": 0,");
  252. messageDevice.Append("\"isQuality\": 1,");
  253. messageDevice.Append("\"meterAssessmentCode\": \"").Append(deviceCode).Append("\",");
  254. messageDevice.Append("\"manufacturerCode\": \"").Append(Constants.ManufacturerCode).Append("\",");
  255. messageDevice.Append("\"meterTypeId\": \"2\"");
  256. messageDevice.Append("}");
  257. foreach (KeyValuePair<string, IModel> item in channels)
  258. {
  259. string key = item.Key;
  260. IModel channel = item.Value;
  261. IBasicProperties property = properties[key];
  262. channel.BasicPublish("zone.device", "", property, Encoding.UTF8.GetBytes(messageDevice.ToString())); //生产消息
  263. }
  264. }
  265. #endregion
  266. #region 历史数据同步
  267. String getDateTime = Convert.ToDateTime(dr["更新时间"]).ToString("yyyy-MM-dd HH:mm:ss");
  268. StringBuilder messageHis = new StringBuilder();
  269. messageHis.Append("{");
  270. messageHis.Append("\"meterAssessmentCode\": \"").Append(deviceCode).Append("\",");
  271. messageHis.Append("\"manufacturerCode\": ").Append(Constants.ManufacturerCode).Append(",");
  272. messageHis.Append("\"getDateTime\": \"").Append(getDateTime).Append("\",");
  273. if (Convert.DBNull != dr["净累计流量"])
  274. {
  275. messageHis.Append("\"netCumulativeFlow\": ").Append(Convert.ToDecimal(dr["净累计流量"])).Append(",");
  276. }
  277. if (Convert.DBNull != dr["正累计流量"])
  278. {
  279. messageHis.Append("\"positiveCumulativeFlow\": ").Append(Convert.ToDecimal(dr["正累计流量"])).Append(",");
  280. }
  281. if (Convert.DBNull != dr["负累计流量"])
  282. {
  283. messageHis.Append("\"negativeCumulativeFlow\": ").Append(Convert.ToDecimal(dr["负累计流量"])).Append(",");
  284. }
  285. if (Convert.DBNull != dr["瞬时流量"])
  286. {
  287. messageHis.Append("\"instantaneousFlow\": ").Append(Convert.ToDecimal(dr["瞬时流量"])).Append(",");
  288. }
  289. if (Convert.DBNull != dr["泵出口压力"])
  290. {
  291. messageHis.Append("\"pressure\": ").Append(Convert.ToDecimal(dr["泵出口压力"])).Append(",");
  292. }
  293. if (Convert.DBNull != dr["PH"])
  294. {
  295. messageHis.Append("\"ph\": ").Append(Convert.ToDecimal(dr["PH"])).Append(",");
  296. }
  297. if (Convert.DBNull != dr["余氯"])
  298. {
  299. messageHis.Append("\"chlorine\": ").Append(Convert.ToDecimal(dr["余氯"])).Append(",");
  300. }
  301. if (Convert.DBNull != dr["浊度"])
  302. {
  303. messageHis.Append("\"turbidity\": ").Append(Convert.ToDecimal(dr["浊度"])).Append(",");
  304. }
  305. messageHis.Append("}");
  306. foreach (KeyValuePair<string, IModel> item in channels)
  307. {
  308. string key = item.Key;
  309. IModel channel = item.Value;
  310. IBasicProperties property = properties[key];
  311. channel.BasicPublish("zone.deviceHis", "", property, Encoding.UTF8.GetBytes(messageHis.ToString())); //生产消息
  312. }
  313. #endregion
  314. return deviceCode;
  315. }
  316. /// <summary>
  317. /// 更新配置文件中的值
  318. /// </summary>
  319. /// <param name="key">键</param>
  320. /// <param name="value">值</param>
  321. private void UpdateAppConfig(String key, String value)
  322. {
  323. var cfg = ConfigurationManager.OpenExeConfiguration(ConfigurationUserLevel.None);
  324. cfg.AppSettings.Settings[key].Value = value;
  325. cfg.Save();
  326. ConfigurationManager.RefreshSection("appSettings");
  327. }
  328. static IDbProvider dbHelper
  329. {
  330. get
  331. {
  332. var DbDefine = DbFactoryProvider.GetProvider(CurrentDbType.SqlServer, Constants.WaterFactoryDbConncetion);
  333. return DbDefine;
  334. }
  335. }
  336. }
  337. }