SecondaryPumpDataUploadJob.cs 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333
  1. using log4net;
  2. using Newtonsoft.Json;
  3. using Quartz;
  4. using RabbitMQ.Client;
  5. using RDIFramework.Utilities;
  6. using System;
  7. using System.Collections;
  8. using System.Collections.Generic;
  9. using System.Configuration;
  10. using System.Data;
  11. using System.Text;
  12. namespace TimedUpload.QuartzJobs
  13. {
  14. [DisallowConcurrentExecution]
  15. public class SecondaryPumpDataUploadJob : IJob
  16. {
  17. private readonly ILog log = LogManager.GetLogger(typeof(SecondaryPumpDataUploadJob));
  18. public void Execute(IJobExecutionContext context)
  19. {
  20. string[] uploadUrls = Constants.UploadUrl.Split('|');
  21. Dictionary<string, IConnection> connections = new Dictionary<string, IConnection>();
  22. Dictionary<string, IModel> channels = new Dictionary<string, IModel>();
  23. Dictionary<string, IBasicProperties> properties = new Dictionary<string, IBasicProperties>();
  24. foreach (string uploadUrl in uploadUrls)
  25. {
  26. ConnectionFactory factory = new ConnectionFactory();
  27. factory.HostName = uploadUrl;//主机名,Rabbit会拿这个IP生成一个endpoint,这个很熟悉吧,就是socket绑定的那个终结点。
  28. factory.UserName = Constants.UploadUserName;//默认用户名,用户可以在服务端自定义创建,有相关命令行
  29. factory.Password = Constants.UploadPassword;//默认密码
  30. factory.AutomaticRecoveryEnabled = true; // 链接断开会自动重连
  31. IConnection connection = factory.CreateConnection();
  32. IModel channel = connection.CreateModel();
  33. channel.QueueDeclare("secondaryPump.deviceHis", true, false, false, null);//创建一个名称为kibaqueue的消息队列
  34. IBasicProperties property = channel.CreateBasicProperties();
  35. property.ContentType = "text/plain";
  36. property.DeliveryMode = 2; //持久化
  37. connections.Add(uploadUrl, connection);
  38. properties.Add(uploadUrl, property);
  39. channels.Add(uploadUrl, channel);
  40. }
  41. if (channels.Count > 0)
  42. {
  43. SendSecondaryPumpHis(channels, properties);
  44. }
  45. foreach (KeyValuePair<string, IConnection> item in connections)
  46. {
  47. IConnection connection = item.Value;
  48. connection.Close();
  49. }
  50. }
  51. /// <summary>
  52. /// 二供历史数据
  53. /// </summary>
  54. /// <param name="channels"></param
  55. /// <param name="properties"></param>
  56. private void SendSecondaryPumpHis(Dictionary<string, IModel> channels, Dictionary<string, IBasicProperties> properties)
  57. {
  58. try
  59. {
  60. log.Info("二供历史数据同步任务开始执行.................\r\n");
  61. string factorySql = "SELECT * FROM [dbo].[二供通用泵站] ORDER BY 日期, 时间";
  62. DataTable dtDevice = dbHelper.Fill(factorySql);
  63. string secondaryPumpColumn = Constants.SecondaryPumpColumn;
  64. if (string.IsNullOrEmpty(secondaryPumpColumn) || dtDevice == null || dtDevice.Rows.Count == 0)
  65. {
  66. return;
  67. }
  68. string newSecondaryToDMACode = Constants.SecondaryToDMACode;
  69. // 处理设备列表
  70. for (int i = 0; i < dtDevice.Rows.Count; i++)
  71. {
  72. DataRow dr = dtDevice.Rows[i];
  73. string id = dr["更新时间"].ToString();
  74. string deviceCode = dr["编码"].ToString();
  75. string PressureIn = dr["泵进口压力"].ToString();
  76. string PressureOut = dr["泵出口压力"].ToString();
  77. string PressureSet = dr["泵设定压力"].ToString();
  78. string InstantFlow = dr["瞬时流量"].ToString();
  79. string TotalFlow = dr["净累计流量"].ToString();
  80. string PositiveToTalFlow = dr["正累计流量"].ToString();
  81. string NegativeTotalFlow = dr["负累计流量"].ToString();
  82. string PH = dr["PH"].ToString();
  83. string Chlorine = dr["余氯"].ToString();
  84. string Turbidity = dr["浊度"].ToString();
  85. string LiquidHeight = dr["水箱液位"].ToString();
  86. string VoltageA = dr["电压AB"].ToString();
  87. string VoltageB = dr["电压AC"].ToString();
  88. string VoltageC = dr["电压BC"].ToString();
  89. string CurrentA = dr["电流A"].ToString();
  90. string CurrentB = dr["电流B"].ToString();
  91. string CurrentC = dr["电流C"].ToString();
  92. string Consumption = dr["用电量"].ToString();
  93. string LackWater = dr["缺水报警"].ToString();
  94. string OverPressure = dr["超压报警"].ToString();
  95. string HouseInlet = dr["进水报警"].ToString();
  96. string TubeBurst = dr["爆管报警"].ToString();
  97. string readTime = dr["日期"].ToString().Trim() + " " + dr["时间"].ToString().Trim();
  98. Dictionary<string, object> deviceMap = new Dictionary<string, object>();
  99. deviceMap["DeviceCode"] = deviceCode;
  100. deviceMap["ReadTime"] = readTime;
  101. deviceMap["PressureIN"] = PressureIn;
  102. deviceMap["PressureOut"] = PressureOut;
  103. deviceMap["PressureSet"] = PressureSet;
  104. deviceMap["InstantFlow"] = InstantFlow;
  105. deviceMap["TotalFlow"] = TotalFlow;
  106. deviceMap["PositiveToTalFlow"] = PositiveToTalFlow;
  107. deviceMap["NegativeTotalFlow"] = NegativeTotalFlow;
  108. deviceMap["PH"] = PH;
  109. deviceMap["Chlorine"] = Chlorine;
  110. deviceMap["Turbidity"] = Turbidity;
  111. deviceMap["LiquidHeight"] = LiquidHeight;
  112. deviceMap["VoltageA"] = VoltageA;
  113. deviceMap["VoltageB"] = VoltageB;
  114. deviceMap["VoltageC"] = VoltageC;
  115. deviceMap["CurrentA"] = CurrentA;
  116. deviceMap["CurrentB"] = CurrentB;
  117. deviceMap["CurrentC"] = CurrentC;
  118. deviceMap["Consumption"] = Consumption;
  119. deviceMap["LackWater"] = LackWater;
  120. deviceMap["OverPressure"] = OverPressure;
  121. deviceMap["HouseInlet"] = HouseInlet;
  122. deviceMap["TubeBurst"] = TubeBurst;
  123. ArrayList meterDataList = new ArrayList();
  124. // 处理泵的数据
  125. for (int j = 1; j < 4; j++)
  126. {
  127. Dictionary<string, object> meterMap = new Dictionary<string, object>();
  128. string colPre = "";
  129. switch (j)
  130. {
  131. case 1:
  132. colPre = "一泵";
  133. break;
  134. case 2:
  135. colPre = "二泵";
  136. break;
  137. case 3:
  138. colPre = "三泵";
  139. break;
  140. }
  141. string meterCode = "wwkj" + j.ToString().PadLeft(3, '0');
  142. string frequency = dr[colPre + "频率"].ToString();
  143. string current = dr[colPre + "电流"].ToString();
  144. string runState = dr[colPre + "运行状态"].ToString();
  145. string power = dr[colPre + "功率"].ToString();
  146. string voltage = dr[colPre + "电压"].ToString();
  147. meterMap["PumpCode"] = meterCode;
  148. meterMap["ReadTime"] = readTime;
  149. meterMap["Frequency"] = frequency;
  150. meterMap["Current"] = current;
  151. meterMap["RunState"] = runState;
  152. meterMap["Power"] = power;
  153. meterMap["Voltage"] = voltage;
  154. meterDataList.Add(meterMap);
  155. }
  156. if (meterDataList.Count == 0)
  157. {
  158. continue;
  159. }
  160. deviceMap["PumpData"] = meterDataList;
  161. string message = JsonConvert.SerializeObject(deviceMap);
  162. foreach (KeyValuePair<string, IModel> item in channels)
  163. {
  164. string key = item.Key;
  165. IModel channel = item.Value;
  166. IBasicProperties property = properties[key];
  167. channel.BasicPublish("secondaryPump.deviceHis", "", property, Encoding.UTF8.GetBytes(message)); //生产消息
  168. }
  169. // 判断数据是否需要同步分区计量
  170. if ("1".Equals(Constants.SecondaryToDMA))
  171. {
  172. string deviceCodeTemp = SecondaryToDMA(dr, channels, properties, newSecondaryToDMACode);
  173. if ("".Equals(newSecondaryToDMACode))
  174. {
  175. newSecondaryToDMACode = deviceCodeTemp;
  176. }
  177. else
  178. {
  179. newSecondaryToDMACode = newSecondaryToDMACode + "," + deviceCodeTemp;
  180. }
  181. }
  182. // 删除数据
  183. string deleteSql = "DELETE FROM [dbo].[二供通用泵站] where 更新时间 = '" + id + "' and 编码 = '" + deviceCode + "'";
  184. dbHelper.ExecuteNonQuery(deleteSql);
  185. }
  186. if ("1".Equals(Constants.SecondaryToDMA))
  187. {
  188. UpdateAppConfig("SecondaryToDMACode", newSecondaryToDMACode);
  189. }
  190. log.Info("二供历史记录同步任务执行结束.................\r\n");
  191. }
  192. catch (Exception ex)
  193. {
  194. log.Error("二供历史记录同步任务执行错误:" + ex.Message + "===========" + ex.StackTrace + "\r\n");
  195. }
  196. }
  197. private string SecondaryToDMA(DataRow dr, Dictionary<string, IModel> channels, Dictionary<string, IBasicProperties> properties, string newSecondaryToDMACode)
  198. {
  199. string deviceName = dr["编号"].ToString();
  200. string deviceCode = dr["编码"].ToString() + "sc";
  201. #region 设备同步
  202. if (newSecondaryToDMACode != null && newSecondaryToDMACode.IndexOf(deviceCode) < 0)
  203. {
  204. StringBuilder messageDevice = new StringBuilder();
  205. messageDevice.Append("{");
  206. messageDevice.Append("\"meterAssessmentName\": \"").Append(deviceName).Append("\",");
  207. messageDevice.Append("\"isPressucre\": 1,");
  208. messageDevice.Append("\"isFlow\": 1,");
  209. messageDevice.Append("\"isZoneMeter\": 1,");
  210. messageDevice.Append("\"isTradeMeter\": 0,");
  211. messageDevice.Append("\"isLargeUser\": 0,");
  212. messageDevice.Append("\"isQuality\": 1,");
  213. messageDevice.Append("\"meterAssessmentCode\": \"").Append(deviceCode).Append("\",");
  214. messageDevice.Append("\"manufacturerCode\": \"").Append(Constants.ManufacturerCode).Append("\",");
  215. messageDevice.Append("\"meterTypeId\": \"2\"");
  216. messageDevice.Append("}");
  217. foreach (KeyValuePair<string, IModel> item in channels)
  218. {
  219. string key = item.Key;
  220. IModel channel = item.Value;
  221. IBasicProperties property = properties[key];
  222. channel.BasicPublish("zone.device", "", property, Encoding.UTF8.GetBytes(messageDevice.ToString())); //生产消息
  223. }
  224. }
  225. #endregion
  226. #region 历史数据同步
  227. String getDateTime = Convert.ToDateTime(dr["更新时间"]).ToString("yyyy-MM-dd HH:mm:ss");
  228. StringBuilder messageHis = new StringBuilder();
  229. messageHis.Append("{");
  230. messageHis.Append("\"meterAssessmentCode\": \"").Append(deviceCode).Append("\",");
  231. messageHis.Append("\"manufacturerCode\": ").Append(Constants.ManufacturerCode).Append(",");
  232. messageHis.Append("\"getDateTime\": \"").Append(getDateTime).Append("\",");
  233. if (Convert.DBNull != dr["净累计流量"])
  234. {
  235. messageHis.Append("\"netCumulativeFlow\": ").Append(Convert.ToDecimal(dr["净累计流量"])).Append(",");
  236. }
  237. if (Convert.DBNull != dr["正累计流量"])
  238. {
  239. messageHis.Append("\"positiveCumulativeFlow\": ").Append(Convert.ToDecimal(dr["正累计流量"])).Append(",");
  240. }
  241. if (Convert.DBNull != dr["负累计流量"])
  242. {
  243. messageHis.Append("\"negativeCumulativeFlow\": ").Append(Convert.ToDecimal(dr["负累计流量"])).Append(",");
  244. }
  245. if (Convert.DBNull != dr["瞬时流量"])
  246. {
  247. messageHis.Append("\"instantaneousFlow\": ").Append(Convert.ToDecimal(dr["瞬时流量"])).Append(",");
  248. }
  249. if (Convert.DBNull != dr["泵出口压力"])
  250. {
  251. messageHis.Append("\"pressure\": ").Append(Convert.ToDecimal(dr["泵出口压力"])).Append(",");
  252. }
  253. if (Convert.DBNull != dr["PH"])
  254. {
  255. messageHis.Append("\"ph\": ").Append(Convert.ToDecimal(dr["PH"])).Append(",");
  256. }
  257. if (Convert.DBNull != dr["余氯"])
  258. {
  259. messageHis.Append("\"chlorine\": ").Append(Convert.ToDecimal(dr["余氯"])).Append(",");
  260. }
  261. if (Convert.DBNull != dr["浊度"])
  262. {
  263. messageHis.Append("\"turbidity\": ").Append(Convert.ToDecimal(dr["浊度"])).Append(",");
  264. }
  265. messageHis.Append("}");
  266. foreach (KeyValuePair<string, IModel> item in channels)
  267. {
  268. string key = item.Key;
  269. IModel channel = item.Value;
  270. IBasicProperties property = properties[key];
  271. channel.BasicPublish("zone.deviceHis", "", property, Encoding.UTF8.GetBytes(messageHis.ToString())); //生产消息
  272. }
  273. #endregion
  274. return deviceCode;
  275. }
  276. /// <summary>
  277. /// 更新配置文件中的值
  278. /// </summary>
  279. /// <param name="key">键</param>
  280. /// <param name="value">值</param>
  281. private void UpdateAppConfig(String key, String value)
  282. {
  283. var cfg = ConfigurationManager.OpenExeConfiguration(ConfigurationUserLevel.None);
  284. cfg.AppSettings.Settings[key].Value = value;
  285. cfg.Save();
  286. ConfigurationManager.RefreshSection("appSettings");
  287. }
  288. static IDbProvider dbHelper
  289. {
  290. get
  291. {
  292. var DbDefine = DbFactoryProvider.GetProvider(CurrentDbType.SqlServer, Constants.WaterFactoryDbConncetion);
  293. return DbDefine;
  294. }
  295. }
  296. }
  297. }