ChangleSecondPumpDataJob.cs 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368
  1. using log4net;
  2. using Newtonsoft.Json;
  3. using Quartz;
  4. using RabbitMQ.Client;
  5. using RDIFramework.Utilities;
  6. using System;
  7. using System.Collections;
  8. using System.Collections.Generic;
  9. using System.Configuration;
  10. using System.Data;
  11. using System.Linq;
  12. using System.Text;
  13. namespace TimedUpload.QuartzJobs
  14. {
  15. [DisallowConcurrentExecution]
  16. public class ChangleSecondPumpDataJob :IJob
  17. {
  18. private readonly ILog log = LogManager.GetLogger(typeof(SecondaryPumpDataUploadJob));
  19. public void Execute(IJobExecutionContext context)
  20. {
  21. try
  22. {
  23. string[] uploadUrls = Constants.UploadUrl.Split('|');
  24. Dictionary<string, IConnection> connections = new Dictionary<string, IConnection>();
  25. Dictionary<string, IModel> channels = new Dictionary<string, IModel>();
  26. Dictionary<string, IBasicProperties> properties = new Dictionary<string, IBasicProperties>();
  27. foreach (string uploadUrl in uploadUrls)
  28. {
  29. ConnectionFactory factory = new ConnectionFactory();
  30. factory.HostName = uploadUrl;//主机名,Rabbit会拿这个IP生成一个endpoint,这个很熟悉吧,就是socket绑定的那个终结点。
  31. factory.UserName = Constants.UploadUserName;//默认用户名,用户可以在服务端自定义创建,有相关命令行
  32. factory.Password = Constants.UploadPassword;//默认密码
  33. factory.AutomaticRecoveryEnabled = true; // 链接断开会自动重连
  34. IConnection connection = factory.CreateConnection();
  35. IModel channel = connection.CreateModel();
  36. channel.QueueDeclare("secondaryPump.deviceHis", true, false, false, null);//创建一个名称为kibaqueue的消息队列
  37. IBasicProperties property = channel.CreateBasicProperties();
  38. property.ContentType = "text/plain";
  39. property.DeliveryMode = 2; //持久化
  40. connections.Add(uploadUrl, connection);
  41. properties.Add(uploadUrl, property);
  42. channels.Add(uploadUrl, channel);
  43. }
  44. if (channels.Count > 0)
  45. {
  46. SendSecondaryPumpHis(channels, properties);
  47. }
  48. foreach (KeyValuePair<string, IConnection> item in connections)
  49. {
  50. IConnection connection = item.Value;
  51. connection.Close();
  52. }
  53. }
  54. catch (Exception ex)
  55. {
  56. log.Debug(ex.Message);
  57. }
  58. }
  59. /// <summary>
  60. /// 二供历史数据
  61. /// </summary>
  62. /// <param name="channels"></param
  63. /// <param name="properties"></param>
  64. private void SendSecondaryPumpHis(Dictionary<string, IModel> channels, Dictionary<string, IBasicProperties> properties)
  65. {
  66. try
  67. {
  68. log.Info("二供历史数据同步任务开始执行.................\r\n");
  69. string factorySql = "SELECT * FROM [updateList20230412]";
  70. DataTable dtDevice = dbHelper.Fill(factorySql);
  71. string secondaryPumpColumn = Constants.SecondaryPumpColumn;
  72. if (string.IsNullOrEmpty(secondaryPumpColumn) || dtDevice == null || dtDevice.Rows.Count == 0)
  73. {
  74. return;
  75. }
  76. // 处理设备列表
  77. for (int k = 0; k < dtDevice.Rows.Count; k++)
  78. {
  79. string table = "历史记录_" + dtDevice.Rows[k]["devId"].ToString().PadLeft(6,'0') + "_" + DateTime.Now.Year.ToString();
  80. String sql = @"SELECT TOP 2000 [id]
  81. ,[设备ID],[记录时间],[采集时间]
  82. ,[设备状态],[通讯状态],[数据来源]
  83. ,[表1A相电压],[表1B相电压],[表1C相电压]
  84. ,[表1A相电流],[表1B相电流],[表1C相电流]
  85. ,[表1电能],[表2电能]
  86. ,[表2A相电压],[表2B相电压],[表2C相电压]
  87. ,[表2A相电流],[表2B相电流],[表2C相电流]
  88. ,[表3A相电压],[表3B相电压],[表3C相电压]
  89. ,[表3A相电流],[表3B相电流] ,[表3C相电流]
  90. ,[表3电能],[信号质量],[门开关]
  91. ,[泵3状态],[泵2状态],[泵1状态]
  92. ,[断电监测]
  93. ,[一号泵有功功率],[一号泵频率]
  94. ,[二号泵有功功率],[二号泵频率]
  95. ,[三号泵有功功率],[三号泵频率]
  96. ,[出水设定压力],[出水端实际压力]
  97. ,[进水设定压力],[进水端实际压力]
  98. ,[瞬时流量],[累计流量]
  99. FROM " + table + " where 采集时间 > '" + Convert.ToDateTime(dtDevice.Rows[k]["uploadTime"]).ToString("yyyy-MM-dd HH:mm:ss") + "' ORDER BY 采集时间";
  100. DataTable dtHistory = dbHelper.Fill(sql);
  101. for (int i = 0; i < dtHistory.Rows.Count; i++)
  102. {
  103. #region
  104. DataRow dr = dtHistory.Rows[i];
  105. string id = dr["记录时间"].ToString();
  106. string deviceCode = dtDevice.Rows[k]["code"].ToString();//dr["编码"].ToString();
  107. string PressureIn = dr["进水端实际压力"].ToString();
  108. string PressureOut = dr["出水端实际压力"].ToString();
  109. string PressureSet = dr["出水设定压力"].ToString();
  110. string InstantFlow = dr["瞬时流量"].ToString();
  111. string TotalFlow = dr["累计流量"].ToString();
  112. string PositiveToTalFlow = "0";//dr["正累计流量"].ToString();
  113. string NegativeTotalFlow = "0";// dr["负累计流量"].ToString();
  114. string PH = "0";// dr["PH"].ToString();
  115. string Chlorine = "0";//dr["余氯"].ToString();
  116. string Turbidity = "0";//dr["浊度"].ToString();
  117. string LiquidHeight = "0";//dr["水箱液位"].ToString();
  118. string VoltageA = dr["表1A相电压"].ToString();
  119. string VoltageB = dr["表1B相电压"].ToString();
  120. string VoltageC = dr["表1C相电压"].ToString();
  121. string CurrentA = dr["表1A相电流"].ToString();
  122. string CurrentB = dr["表1B相电流"].ToString();
  123. string CurrentC = dr["表1C相电流"].ToString();
  124. string Consumption = (Convert.ToInt32(dr["表1电能"]) + Convert.ToInt32(dr["表2电能"]) + Convert.ToInt32(dr["表3电能"])).ToString();
  125. string LackWater = "0";//dr["缺水报警"].ToString();
  126. string OverPressure = "0";//dr["超压报警"].ToString();
  127. string HouseInlet = "0";//dr["进水报警"].ToString(); // 进水报警0为正常,1为进水。
  128. string TubeBurst = "0";//dr["爆管报警"].ToString();
  129. string NetState = "0";//dr["网络状态"].ToString(); // 0代表通讯正常,1代表网络故障,2代表现场485设备通讯故障
  130. string readTime = Convert.ToDateTime(dr["采集时间"]).ToString("yyyy/MM/dd HH:mm:ss");
  131. Dictionary<string, object> deviceMap = new Dictionary<string, object>();
  132. deviceMap["DeviceCode"] = deviceCode;
  133. deviceMap["ReadTime"] = readTime;
  134. deviceMap["PressureIN"] = PressureIn;
  135. deviceMap["PressureOut"] = PressureOut;
  136. deviceMap["PressureSet"] = PressureSet;
  137. deviceMap["InstantFlow"] = InstantFlow;
  138. deviceMap["TotalFlow"] = TotalFlow;
  139. deviceMap["PositiveToTalFlow"] = PositiveToTalFlow;
  140. deviceMap["NegativeTotalFlow"] = NegativeTotalFlow;
  141. deviceMap["PH"] = PH;
  142. deviceMap["Chlorine"] = Chlorine;
  143. deviceMap["Turbidity"] = Turbidity;
  144. deviceMap["LiquidHeight"] = LiquidHeight;
  145. deviceMap["VoltageA"] = VoltageA;
  146. deviceMap["VoltageB"] = VoltageB;
  147. deviceMap["VoltageC"] = VoltageC;
  148. deviceMap["CurrentA"] = CurrentA;
  149. deviceMap["CurrentB"] = CurrentB;
  150. deviceMap["CurrentC"] = CurrentC;
  151. deviceMap["Consumption"] = Consumption;
  152. deviceMap["LackWater"] = LackWater;
  153. deviceMap["OverPressure"] = OverPressure;
  154. deviceMap["HouseInlet"] = HouseInlet;
  155. deviceMap["TubeBurst"] = TubeBurst;
  156. deviceMap["NetState"] = NetState;
  157. ArrayList meterDataList = new ArrayList();
  158. // 处理泵的数据
  159. for (int j = 1; j < 4; j++)
  160. {
  161. Dictionary<string, object> meterMap = new Dictionary<string, object>();
  162. string colPre = "";
  163. string state = "";
  164. string colPrePower = "";
  165. string colPreCurrent = "";
  166. switch (j)
  167. {
  168. case 1:
  169. colPre = "一号泵";
  170. state = "泵1状态";
  171. colPreCurrent = "表1A相";
  172. colPrePower = "表1";
  173. break;
  174. case 2:
  175. colPre = "二号泵";
  176. state = "泵2状态";
  177. colPreCurrent = "表2A相";
  178. colPrePower = "表2";
  179. break;
  180. case 3:
  181. colPre = "三号泵";
  182. state = "泵3状态";
  183. colPreCurrent = "表3A相";
  184. colPrePower = "表3";
  185. break;
  186. }
  187. string meterCode = "wwkj" + j.ToString().PadLeft(3, '0');
  188. string frequency = dr[colPre + "频率"].ToString();
  189. string current = dr[colPreCurrent + "电流"].ToString();
  190. string runState = dr[state].ToString();
  191. string power = dr[colPrePower + "电能"].ToString();
  192. string voltage = dr[colPreCurrent + "电压"].ToString();
  193. meterMap["PumpCode"] = meterCode;
  194. meterMap["ReadTime"] = readTime;
  195. meterMap["Frequency"] = frequency;
  196. meterMap["Current"] = current;
  197. meterMap["RunState"] = runState;
  198. meterMap["Power"] = power;
  199. meterMap["Voltage"] = voltage;
  200. meterDataList.Add(meterMap);
  201. }
  202. if (meterDataList.Count == 0)
  203. {
  204. continue;
  205. }
  206. deviceMap["PumpData"] = meterDataList;
  207. string message = JsonConvert.SerializeObject(deviceMap);
  208. // log.Info(message);
  209. foreach (KeyValuePair<string, IModel> item in channels)
  210. {
  211. string key = item.Key;
  212. // log.Info("secondaryPump.deviceHis | " + key);
  213. IModel channel = item.Value;
  214. IBasicProperties property = properties[key];
  215. channel.BasicPublish("secondaryPump.deviceHis", "", property, Encoding.UTF8.GetBytes(message)); //生产消息
  216. // log.Info("______________________________________");
  217. }
  218. #endregion
  219. string sqlUpdate = "UPDATE updateList20230412 set uploadTime = '" + readTime + "' where id = " + dtDevice.Rows[k]["id"].ToString();
  220. dbHelper.ExecuteNonQuery(sqlUpdate) ;
  221. }
  222. }
  223. log.Info("二供历史记录同步任务执行结束.................\r\n");
  224. }
  225. catch (Exception ex)
  226. {
  227. log.Error("二供历史记录同步任务执行错误:" + ex.Message + "===========" + ex.StackTrace + "\r\n");
  228. }
  229. }
  230. private string SecondaryToDMA(DataRow dr, Dictionary<string, IModel> channels, Dictionary<string, IBasicProperties> properties, string newSecondaryToDMACode)
  231. {
  232. string deviceName = dr["编号"].ToString();
  233. string deviceCode = dr["编码"].ToString() + "sc";
  234. #region 设备同步
  235. if (newSecondaryToDMACode != null && newSecondaryToDMACode.IndexOf(deviceCode) < 0)
  236. {
  237. StringBuilder messageDevice = new StringBuilder();
  238. messageDevice.Append("{");
  239. messageDevice.Append("\"meterAssessmentName\": \"").Append(deviceName).Append("\",");
  240. messageDevice.Append("\"isPressucre\": 1,");
  241. messageDevice.Append("\"isFlow\": 1,");
  242. messageDevice.Append("\"isZoneMeter\": 1,");
  243. messageDevice.Append("\"isTradeMeter\": 0,");
  244. messageDevice.Append("\"isLargeUser\": 0,");
  245. messageDevice.Append("\"isQuality\": 1,");
  246. messageDevice.Append("\"meterAssessmentCode\": \"").Append(deviceCode).Append("\",");
  247. messageDevice.Append("\"manufacturerCode\": \"").Append(Constants.ManufacturerCode).Append("\",");
  248. messageDevice.Append("\"meterTypeId\": \"2\"");
  249. messageDevice.Append("}");
  250. foreach (KeyValuePair<string, IModel> item in channels)
  251. {
  252. string key = item.Key;
  253. IModel channel = item.Value;
  254. IBasicProperties property = properties[key];
  255. channel.BasicPublish("zone.device", "", property, Encoding.UTF8.GetBytes(messageDevice.ToString())); //生产消息
  256. }
  257. }
  258. #endregion
  259. #region 历史数据同步
  260. String getDateTime = Convert.ToDateTime(dr["更新时间"]).ToString("yyyy-MM-dd HH:mm:ss");
  261. StringBuilder messageHis = new StringBuilder();
  262. messageHis.Append("{");
  263. messageHis.Append("\"meterAssessmentCode\": \"").Append(deviceCode).Append("\",");
  264. messageHis.Append("\"manufacturerCode\": ").Append(Constants.ManufacturerCode).Append(",");
  265. messageHis.Append("\"getDateTime\": \"").Append(getDateTime).Append("\",");
  266. if (Convert.DBNull != dr["净累计流量"])
  267. {
  268. messageHis.Append("\"netCumulativeFlow\": ").Append(Convert.ToDecimal(dr["净累计流量"])).Append(",");
  269. }
  270. if (Convert.DBNull != dr["正累计流量"])
  271. {
  272. messageHis.Append("\"positiveCumulativeFlow\": ").Append(Convert.ToDecimal(dr["正累计流量"])).Append(",");
  273. }
  274. if (Convert.DBNull != dr["负累计流量"])
  275. {
  276. messageHis.Append("\"negativeCumulativeFlow\": ").Append(Convert.ToDecimal(dr["负累计流量"])).Append(",");
  277. }
  278. if (Convert.DBNull != dr["瞬时流量"])
  279. {
  280. messageHis.Append("\"instantaneousFlow\": ").Append(Convert.ToDecimal(dr["瞬时流量"])).Append(",");
  281. }
  282. if (Convert.DBNull != dr["泵出口压力"])
  283. {
  284. messageHis.Append("\"pressure\": ").Append(Convert.ToDecimal(dr["泵出口压力"])).Append(",");
  285. }
  286. if (Convert.DBNull != dr["PH"])
  287. {
  288. messageHis.Append("\"ph\": ").Append(Convert.ToDecimal(dr["PH"])).Append(",");
  289. }
  290. if (Convert.DBNull != dr["余氯"])
  291. {
  292. messageHis.Append("\"chlorine\": ").Append(Convert.ToDecimal(dr["余氯"])).Append(",");
  293. }
  294. if (Convert.DBNull != dr["浊度"])
  295. {
  296. messageHis.Append("\"turbidity\": ").Append(Convert.ToDecimal(dr["浊度"])).Append(",");
  297. }
  298. messageHis.Append("}");
  299. foreach (KeyValuePair<string, IModel> item in channels)
  300. {
  301. string key = item.Key;
  302. IModel channel = item.Value;
  303. IBasicProperties property = properties[key];
  304. channel.BasicPublish("zone.deviceHis", "", property, Encoding.UTF8.GetBytes(messageHis.ToString())); //生产消息
  305. }
  306. #endregion
  307. return deviceCode;
  308. }
  309. /// <summary>
  310. /// 更新配置文件中的值
  311. /// </summary>
  312. /// <param name="key">键</param>
  313. /// <param name="value">值</param>
  314. private void UpdateAppConfig(String key, String value)
  315. {
  316. var cfg = ConfigurationManager.OpenExeConfiguration(ConfigurationUserLevel.None);
  317. cfg.AppSettings.Settings[key].Value = value;
  318. cfg.Save();
  319. ConfigurationManager.RefreshSection("appSettings");
  320. }
  321. static IDbProvider dbHelper
  322. {
  323. get
  324. {
  325. var DbDefine = DbFactoryProvider.GetProvider(CurrentDbType.SqlServer, Constants.WaterFactoryDbConncetion);
  326. return DbDefine;
  327. }
  328. }
  329. }
  330. }