SecondaryPumpDataUploadJob.cs 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343
  1. using log4net;
  2. using Newtonsoft.Json;
  3. using Quartz;
  4. using RabbitMQ.Client;
  5. using RDIFramework.Utilities;
  6. using System;
  7. using System.Collections;
  8. using System.Collections.Generic;
  9. using System.Configuration;
  10. using System.Data;
  11. using System.Text;
  12. namespace TimedUpload.QuartzJobs
  13. {
  14. [DisallowConcurrentExecution]
  15. public class SecondaryPumpDataUploadJob : IJob
  16. {
  17. private readonly ILog log = LogManager.GetLogger(typeof(SecondaryPumpDataUploadJob));
  18. public void Execute(IJobExecutionContext context)
  19. {
  20. try
  21. {
  22. string[] uploadUrls = Constants.UploadUrl.Split('|');
  23. Dictionary<string, IConnection> connections = new Dictionary<string, IConnection>();
  24. Dictionary<string, IModel> channels = new Dictionary<string, IModel>();
  25. Dictionary<string, IBasicProperties> properties = new Dictionary<string, IBasicProperties>();
  26. foreach (string uploadUrl in uploadUrls)
  27. {
  28. ConnectionFactory factory = new ConnectionFactory();
  29. factory.HostName = uploadUrl;//主机名,Rabbit会拿这个IP生成一个endpoint,这个很熟悉吧,就是socket绑定的那个终结点。
  30. factory.UserName = Constants.UploadUserName;//默认用户名,用户可以在服务端自定义创建,有相关命令行
  31. factory.Password = Constants.UploadPassword;//默认密码
  32. factory.AutomaticRecoveryEnabled = true; // 链接断开会自动重连
  33. IConnection connection = factory.CreateConnection();
  34. IModel channel = connection.CreateModel();
  35. channel.QueueDeclare("secondaryPump.deviceHis", true, false, false, null);//创建一个名称为kibaqueue的消息队列
  36. IBasicProperties property = channel.CreateBasicProperties();
  37. property.ContentType = "text/plain";
  38. property.DeliveryMode = 2; //持久化
  39. connections.Add(uploadUrl, connection);
  40. properties.Add(uploadUrl, property);
  41. channels.Add(uploadUrl, channel);
  42. }
  43. if (channels.Count > 0)
  44. {
  45. SendSecondaryPumpHis(channels, properties);
  46. }
  47. foreach (KeyValuePair<string, IConnection> item in connections)
  48. {
  49. IConnection connection = item.Value;
  50. connection.Close();
  51. }
  52. }
  53. catch (Exception ex)
  54. {
  55. log.Debug(ex.Message);
  56. }
  57. }
  58. /// <summary>
  59. /// 二供历史数据
  60. /// </summary>
  61. /// <param name="channels"></param
  62. /// <param name="properties"></param>
  63. private void SendSecondaryPumpHis(Dictionary<string, IModel> channels, Dictionary<string, IBasicProperties> properties)
  64. {
  65. try
  66. {
  67. log.Info("二供历史数据同步任务开始执行.................\r\n");
  68. string factorySql = "SELECT * FROM [dbo].[二供通用泵站] ORDER BY 日期, 时间";
  69. DataTable dtDevice = dbHelper.Fill(factorySql);
  70. string secondaryPumpColumn = Constants.SecondaryPumpColumn;
  71. if (string.IsNullOrEmpty(secondaryPumpColumn) || dtDevice == null || dtDevice.Rows.Count == 0)
  72. {
  73. return;
  74. }
  75. string newSecondaryToDMACode = Constants.SecondaryToDMACode;
  76. // 处理设备列表
  77. // for (int i = 0; i < dtDevice.Rows.Count; i++)
  78. // {
  79. //DataRow dr = dtDevice.Rows[i];
  80. string id = "2023/1/17 15:37";//dr["更新时间"].ToString();
  81. string deviceCode = "wwkj006";// dr["编码"].ToString();
  82. string PressureIn = "0";//dr["泵进口压力"].ToString();
  83. string PressureOut = "10.802";//dr["泵出口压力"].ToString();
  84. string PressureSet = "0";//dr["泵设定压力"].ToString();
  85. string InstantFlow = "0";//dr["瞬时流量"].ToString();
  86. string TotalFlow = "0";//dr["净累计流量"].ToString();
  87. string PositiveToTalFlow = "0";//dr["正累计流量"].ToString();
  88. string NegativeTotalFlow = "0";//dr["负累计流量"].ToString();
  89. string PH = "0";//dr["PH"].ToString();
  90. string Chlorine = "0";//dr["余氯"].ToString();
  91. string Turbidity = "0";//dr["浊度"].ToString();
  92. string LiquidHeight = "0";//dr["水箱液位"].ToString();
  93. string VoltageA = "0";//dr["电压AB"].ToString();
  94. string VoltageB = "0";//dr["电压AC"].ToString();
  95. string VoltageC = "0";//dr["电压BC"].ToString();
  96. string CurrentA = "0";//dr["电流A"].ToString();
  97. string CurrentB = "0";//dr["电流B"].ToString();
  98. string CurrentC = "0";//dr["电流C"].ToString();
  99. string Consumption = "0";//dr["用电量"].ToString();
  100. string LackWater ="0";//dr["缺水报警"].ToString();
  101. string OverPressure = "0";//dr["超压报警"].ToString();
  102. string HouseInlet = "1";//dr["进水报警"].ToString(); // 进水报警0为正常,1为进水。
  103. string TubeBurst = "0";//dr["爆管报警"].ToString();
  104. string NetState = "0";//dr["网络状态"].ToString(); // 0代表通讯正常,1代表网络故障,2代表现场485设备通讯故障
  105. string readTime = "2023/1/17 15:37:14";//dr["日期"].ToString().Trim() + " " + dr["时间"].ToString().Trim();
  106. Dictionary<string, object> deviceMap = new Dictionary<string, object>();
  107. deviceMap["DeviceCode"] = deviceCode;
  108. deviceMap["ReadTime"] = readTime;
  109. deviceMap["PressureIN"] = PressureIn;
  110. deviceMap["PressureOut"] = PressureOut;
  111. deviceMap["PressureSet"] = PressureSet;
  112. deviceMap["InstantFlow"] = InstantFlow;
  113. deviceMap["TotalFlow"] = TotalFlow;
  114. deviceMap["PositiveToTalFlow"] = PositiveToTalFlow;
  115. deviceMap["NegativeTotalFlow"] = NegativeTotalFlow;
  116. deviceMap["PH"] = PH;
  117. deviceMap["Chlorine"] = Chlorine;
  118. deviceMap["Turbidity"] = Turbidity;
  119. deviceMap["LiquidHeight"] = LiquidHeight;
  120. deviceMap["VoltageA"] = VoltageA;
  121. deviceMap["VoltageB"] = VoltageB;
  122. deviceMap["VoltageC"] = VoltageC;
  123. deviceMap["CurrentA"] = CurrentA;
  124. deviceMap["CurrentB"] = CurrentB;
  125. deviceMap["CurrentC"] = CurrentC;
  126. deviceMap["Consumption"] = Consumption;
  127. deviceMap["LackWater"] = LackWater;
  128. deviceMap["OverPressure"] = OverPressure;
  129. deviceMap["HouseInlet"] = HouseInlet;
  130. deviceMap["TubeBurst"] = TubeBurst;
  131. deviceMap["NetState"] = NetState;
  132. ArrayList meterDataList = new ArrayList();
  133. // 处理泵的数据
  134. for (int j = 1; j < 1; j++)
  135. {
  136. Dictionary<string, object> meterMap = new Dictionary<string, object>();
  137. string colPre = "";
  138. switch (j)
  139. {
  140. case 1:
  141. colPre = "一泵";
  142. break;
  143. case 2:
  144. colPre = "二泵";
  145. break;
  146. case 3:
  147. colPre = "三泵";
  148. break;
  149. }
  150. //string meterCode = "wwkj" + j.ToString().PadLeft(3, '0');
  151. //string frequency = dr[colPre + "频率"].ToString();
  152. //string current = dr[colPre + "电流"].ToString();
  153. //string runState = dr[colPre + "运行状态"].ToString();
  154. //string power = dr[colPre + "功率"].ToString();
  155. //string voltage = dr[colPre + "电压"].ToString();
  156. //meterMap["PumpCode"] = meterCode;
  157. //meterMap["ReadTime"] = readTime;
  158. //meterMap["Frequency"] = frequency;
  159. //meterMap["Current"] = current;
  160. //meterMap["RunState"] = runState;
  161. //meterMap["Power"] = power;
  162. //meterMap["Voltage"] = voltage;
  163. meterDataList.Add(meterMap);
  164. }
  165. //if (meterDataList.Count == 0)
  166. //{
  167. // continue;
  168. //}
  169. deviceMap["PumpData"] = meterDataList;
  170. string message = JsonConvert.SerializeObject(deviceMap);
  171. foreach (KeyValuePair<string, IModel> item in channels)
  172. {
  173. string key = item.Key;
  174. IModel channel = item.Value;
  175. IBasicProperties property = properties[key];
  176. channel.BasicPublish("secondaryPump.deviceHis", "", property, Encoding.UTF8.GetBytes(message)); //生产消息
  177. }
  178. // 判断数据是否需要同步分区计量
  179. //if ("1".Equals(Constants.SecondaryToDMA))
  180. //{
  181. // string deviceCodeTemp = SecondaryToDMA(dr, channels, properties, newSecondaryToDMACode);
  182. // if ("".Equals(newSecondaryToDMACode))
  183. // {
  184. // newSecondaryToDMACode = deviceCodeTemp;
  185. // }
  186. // else
  187. // {
  188. // newSecondaryToDMACode = newSecondaryToDMACode + "," + deviceCodeTemp;
  189. // }
  190. //}
  191. // 删除数据
  192. string deleteSql = "DELETE FROM [dbo].[二供通用泵站] where 更新时间 = '" + id + "' and 编码 = '" + deviceCode + "'";
  193. dbHelper.ExecuteNonQuery(deleteSql);
  194. //}
  195. if ("1".Equals(Constants.SecondaryToDMA))
  196. {
  197. UpdateAppConfig("SecondaryToDMACode", newSecondaryToDMACode);
  198. }
  199. log.Info("二供历史记录同步任务执行结束.................\r\n");
  200. }
  201. catch (Exception ex)
  202. {
  203. log.Error("二供历史记录同步任务执行错误:" + ex.Message + "===========" + ex.StackTrace + "\r\n");
  204. }
  205. }
  206. private string SecondaryToDMA(DataRow dr, Dictionary<string, IModel> channels, Dictionary<string, IBasicProperties> properties, string newSecondaryToDMACode)
  207. {
  208. string deviceName = dr["编号"].ToString();
  209. string deviceCode = dr["编码"].ToString() + "sc";
  210. #region 设备同步
  211. if (newSecondaryToDMACode != null && newSecondaryToDMACode.IndexOf(deviceCode) < 0)
  212. {
  213. StringBuilder messageDevice = new StringBuilder();
  214. messageDevice.Append("{");
  215. messageDevice.Append("\"meterAssessmentName\": \"").Append(deviceName).Append("\",");
  216. messageDevice.Append("\"isPressucre\": 1,");
  217. messageDevice.Append("\"isFlow\": 1,");
  218. messageDevice.Append("\"isZoneMeter\": 1,");
  219. messageDevice.Append("\"isTradeMeter\": 0,");
  220. messageDevice.Append("\"isLargeUser\": 0,");
  221. messageDevice.Append("\"isQuality\": 1,");
  222. messageDevice.Append("\"meterAssessmentCode\": \"").Append(deviceCode).Append("\",");
  223. messageDevice.Append("\"manufacturerCode\": \"").Append(Constants.ManufacturerCode).Append("\",");
  224. messageDevice.Append("\"meterTypeId\": \"2\"");
  225. messageDevice.Append("}");
  226. foreach (KeyValuePair<string, IModel> item in channels)
  227. {
  228. string key = item.Key;
  229. IModel channel = item.Value;
  230. IBasicProperties property = properties[key];
  231. channel.BasicPublish("zone.device", "", property, Encoding.UTF8.GetBytes(messageDevice.ToString())); //生产消息
  232. }
  233. }
  234. #endregion
  235. #region 历史数据同步
  236. String getDateTime = Convert.ToDateTime(dr["更新时间"]).ToString("yyyy-MM-dd HH:mm:ss");
  237. StringBuilder messageHis = new StringBuilder();
  238. messageHis.Append("{");
  239. messageHis.Append("\"meterAssessmentCode\": \"").Append(deviceCode).Append("\",");
  240. messageHis.Append("\"manufacturerCode\": ").Append(Constants.ManufacturerCode).Append(",");
  241. messageHis.Append("\"getDateTime\": \"").Append(getDateTime).Append("\",");
  242. if (Convert.DBNull != dr["净累计流量"])
  243. {
  244. messageHis.Append("\"netCumulativeFlow\": ").Append(Convert.ToDecimal(dr["净累计流量"])).Append(",");
  245. }
  246. if (Convert.DBNull != dr["正累计流量"])
  247. {
  248. messageHis.Append("\"positiveCumulativeFlow\": ").Append(Convert.ToDecimal(dr["正累计流量"])).Append(",");
  249. }
  250. if (Convert.DBNull != dr["负累计流量"])
  251. {
  252. messageHis.Append("\"negativeCumulativeFlow\": ").Append(Convert.ToDecimal(dr["负累计流量"])).Append(",");
  253. }
  254. if (Convert.DBNull != dr["瞬时流量"])
  255. {
  256. messageHis.Append("\"instantaneousFlow\": ").Append(Convert.ToDecimal(dr["瞬时流量"])).Append(",");
  257. }
  258. if (Convert.DBNull != dr["泵出口压力"])
  259. {
  260. messageHis.Append("\"pressure\": ").Append(Convert.ToDecimal(dr["泵出口压力"])).Append(",");
  261. }
  262. if (Convert.DBNull != dr["PH"])
  263. {
  264. messageHis.Append("\"ph\": ").Append(Convert.ToDecimal(dr["PH"])).Append(",");
  265. }
  266. if (Convert.DBNull != dr["余氯"])
  267. {
  268. messageHis.Append("\"chlorine\": ").Append(Convert.ToDecimal(dr["余氯"])).Append(",");
  269. }
  270. if (Convert.DBNull != dr["浊度"])
  271. {
  272. messageHis.Append("\"turbidity\": ").Append(Convert.ToDecimal(dr["浊度"])).Append(",");
  273. }
  274. messageHis.Append("}");
  275. foreach (KeyValuePair<string, IModel> item in channels)
  276. {
  277. string key = item.Key;
  278. IModel channel = item.Value;
  279. IBasicProperties property = properties[key];
  280. channel.BasicPublish("zone.deviceHis", "", property, Encoding.UTF8.GetBytes(messageHis.ToString())); //生产消息
  281. }
  282. #endregion
  283. return deviceCode;
  284. }
  285. /// <summary>
  286. /// 更新配置文件中的值
  287. /// </summary>
  288. /// <param name="key">键</param>
  289. /// <param name="value">值</param>
  290. private void UpdateAppConfig(String key, String value)
  291. {
  292. var cfg = ConfigurationManager.OpenExeConfiguration(ConfigurationUserLevel.None);
  293. cfg.AppSettings.Settings[key].Value = value;
  294. cfg.Save();
  295. ConfigurationManager.RefreshSection("appSettings");
  296. }
  297. static IDbProvider dbHelper
  298. {
  299. get
  300. {
  301. var DbDefine = DbFactoryProvider.GetProvider(CurrentDbType.SqlServer, Constants.WaterFactoryDbConncetion);
  302. return DbDefine;
  303. }
  304. }
  305. }
  306. }