using log4net;
using Newtonsoft.Json;
using Quartz;
using RabbitMQ.Client;
using RDIFramework.Utilities;
using System;
using System.Collections;
using System.Collections.Generic;
using System.Configuration;
using System.Data;
using System.Globalization;
using System.Text;

namespace TimedUpload.QuartzJobs
{
    /// <summary>
    /// Quartz job that reads every row of [dbo].[二供通用泵站] (secondary water supply pump
    /// station history), publishes each row as JSON to every configured RabbitMQ host,
    /// optionally mirrors the row to the "zone.*" exchanges, and then deletes the row.
    /// </summary>
    [DisallowConcurrentExecution]
    public class SecondaryPumpDataUploadJob : IJob
    {
        /// <summary>Name used both for the declared queue and as the publish exchange.</summary>
        private const string DeviceHisName = "secondaryPump.deviceHis";

        /// <summary>Exchange that receives device registration messages.</summary>
        private const string ZoneDeviceExchange = "zone.device";

        /// <summary>Exchange that receives per-row history messages.</summary>
        private const string ZoneDeviceHisExchange = "zone.deviceHis";

        /// <summary>
        /// Station-level fields: { JSON key, source column }. Order is the order in which
        /// the keys are inserted into the outgoing message.
        /// </summary>
        private static readonly string[][] DeviceFields = new string[][]
        {
            new string[] { "PressureIN", "泵进口压力" },
            new string[] { "PressureOut", "泵出口压力" },
            new string[] { "PressureSet", "泵设定压力" },
            new string[] { "InstantFlow", "瞬时流量" },
            new string[] { "TotalFlow", "净累计流量" },
            new string[] { "PositiveToTalFlow", "正累计流量" },
            new string[] { "NegativeTotalFlow", "负累计流量" },
            new string[] { "PH", "PH" },
            new string[] { "Chlorine", "余氯" },
            new string[] { "Turbidity", "浊度" },
            new string[] { "LiquidHeight", "水箱液位" },
            new string[] { "VoltageA", "电压AB" },
            new string[] { "VoltageB", "电压AC" },
            new string[] { "VoltageC", "电压BC" },
            new string[] { "CurrentA", "电流A" },
            new string[] { "CurrentB", "电流B" },
            new string[] { "CurrentC", "电流C" },
            new string[] { "Consumption", "用电量" },
            new string[] { "LackWater", "缺水报警" },
            new string[] { "OverPressure", "超压报警" },
            // Water-ingress alarm: 0 = normal, 1 = water ingress.
            new string[] { "HouseInlet", "进水报警" },
            new string[] { "TubeBurst", "爆管报警" },
            // 0 = communication OK, 1 = network failure, 2 = on-site RS-485 device failure.
            new string[] { "NetState", "网络状态" }
        };

        /// <summary>Column-name prefixes of pump 1, 2 and 3.</summary>
        private static readonly string[] PumpColumnPrefixes = new string[] { "一泵", "二泵", "三泵" };

        /// <summary>Per-pump fields: { JSON key, column suffix appended to the pump prefix }.</summary>
        private static readonly string[][] PumpFields = new string[][]
        {
            new string[] { "Frequency", "频率" },
            new string[] { "Current", "电流" },
            new string[] { "RunState", "运行状态" },
            new string[] { "Power", "功率" },
            new string[] { "Voltage", "电压" }
        };

        /// <summary>Zone-metering history fields: { JSON key, source column }.</summary>
        private static readonly string[][] ZoneHistoryFields = new string[][]
        {
            new string[] { "netCumulativeFlow", "净累计流量" },
            new string[] { "positiveCumulativeFlow", "正累计流量" },
            new string[] { "negativeCumulativeFlow", "负累计流量" },
            new string[] { "instantaneousFlow", "瞬时流量" },
            new string[] { "pressure", "泵出口压力" },
            new string[] { "ph", "PH" },
            new string[] { "chlorine", "余氯" },
            new string[] { "turbidity", "浊度" }
        };

        private readonly ILog log = LogManager.GetLogger(typeof(SecondaryPumpDataUploadJob));

        /// <summary>
        /// Opens one RabbitMQ connection and channel per host listed in
        /// <c>Constants.UploadUrl</c> ('|'-separated), uploads the pending rows, and closes
        /// every connection again. Exceptions are logged and never propagate to the scheduler.
        /// </summary>
        /// <param name="context">Quartz execution context (not used).</param>
        public void Execute(IJobExecutionContext context)
        {
            Dictionary<string, IConnection> connections = new Dictionary<string, IConnection>();
            Dictionary<string, IModel> channels = new Dictionary<string, IModel>();
            Dictionary<string, IBasicProperties> properties = new Dictionary<string, IBasicProperties>();

            try
            {
                string[] uploadUrls = (Constants.UploadUrl ?? string.Empty)
                    .Split(new char[] { '|' }, StringSplitOptions.RemoveEmptyEntries);

                foreach (string rawUrl in uploadUrls)
                {
                    string uploadUrl = rawUrl.Trim();

                    // Skip blanks and repeated hosts; a repeated key would make Dictionary.Add throw.
                    if (uploadUrl.Length == 0 || connections.ContainsKey(uploadUrl))
                    {
                        continue;
                    }

                    ConnectionFactory factory = new ConnectionFactory();
                    factory.HostName = uploadUrl;                  // broker host
                    factory.UserName = Constants.UploadUserName;   // broker user
                    factory.Password = Constants.UploadPassword;   // broker password
                    factory.AutomaticRecoveryEnabled = true;       // reconnect automatically when the link drops

                    IConnection connection = factory.CreateConnection();

                    // Register the connection first so the finally block closes it even if
                    // channel creation or the queue declaration below throws.
                    connections.Add(uploadUrl, connection);

                    IModel channel = connection.CreateModel();

                    // Durable, non-exclusive, non-auto-delete queue.
                    channel.QueueDeclare(DeviceHisName, true, false, false, null);

                    IBasicProperties property = channel.CreateBasicProperties();
                    property.ContentType = "text/plain";
                    property.DeliveryMode = 2; // persistent

                    properties.Add(uploadUrl, property);
                    channels.Add(uploadUrl, channel);
                }

                if (channels.Count > 0)
                {
                    SendSecondaryPumpHis(channels, properties);
                }
            }
            catch (Exception ex)
            {
                log.Error("二供历史数据上传任务执行错误:" + ex.Message, ex);
            }
            finally
            {
                CloseConnections(connections);
            }
        }

        /// <summary>
        /// Closes every connection; a failure on one connection is logged and does not
        /// prevent the remaining ones from being closed.
        /// </summary>
        /// <param name="connections">Open connections keyed by host.</param>
        private void CloseConnections(Dictionary<string, IConnection> connections)
        {
            foreach (KeyValuePair<string, IConnection> item in connections)
            {
                try
                {
                    item.Value.Close();
                }
                catch (Exception ex)
                {
                    log.Warn("RabbitMQ connection close failed for " + item.Key + ": " + ex.Message, ex);
                }
            }
        }

        /// <summary>
        /// Uploads the secondary water supply history rows. Each row is serialized to JSON,
        /// published on every channel, optionally forwarded to zone metering when
        /// <c>Constants.SecondaryToDMA</c> is "1", and then deleted from the source table.
        /// Any exception is logged and stops processing of the remaining rows.
        /// </summary>
        /// <param name="channels">Open channels keyed by host.</param>
        /// <param name="properties">Publish properties keyed by the same host keys.</param>
        private void SendSecondaryPumpHis(Dictionary<string, IModel> channels, Dictionary<string, IBasicProperties> properties)
        {
            try
            {
                log.Info("二供历史数据同步任务开始执行.................\r\n");

                string factorySql = "SELECT * FROM [dbo].[二供通用泵站] ORDER BY 日期, 时间";
                DataTable dtDevice = dbHelper.Fill(factorySql);

                // The setting is only checked for presence here; its value is not otherwise used.
                string secondaryPumpColumn = Constants.SecondaryPumpColumn;
                if (string.IsNullOrEmpty(secondaryPumpColumn) || dtDevice == null || dtDevice.Rows.Count == 0)
                {
                    return;
                }

                bool syncToDma = "1".Equals(Constants.SecondaryToDMA);

                // Comma-separated list of device codes already registered with zone metering.
                string newSecondaryToDMACode = Constants.SecondaryToDMACode ?? string.Empty;
                bool codeListChanged = false;

                foreach (DataRow dr in dtDevice.Rows)
                {
                    string id = dr["更新时间"].ToString();
                    string deviceCode = dr["编码"].ToString();
                    string readTime = dr["日期"].ToString().Trim() + " " + dr["时间"].ToString().Trim();

                    string message = JsonConvert.SerializeObject(BuildDeviceMap(dr, deviceCode, readTime));

                    // NOTE(review): the first BasicPublish argument is the exchange name, so this
                    // targets an exchange called "secondaryPump.deviceHis" with an empty routing
                    // key; the queue declared in Execute only receives it if such an exchange and
                    // binding exist on the broker - confirm.
                    PublishToAll(channels, properties, DeviceHisName, message);

                    // Forward to zone metering when enabled.
                    if (syncToDma)
                    {
                        string dmaCode = SecondaryToDMA(dr, channels, properties, newSecondaryToDMACode);

                        // Record each code once; appending on every row made the setting grow without bound.
                        if (!ContainsCode(newSecondaryToDMACode, dmaCode))
                        {
                            newSecondaryToDMACode = newSecondaryToDMACode.Length == 0
                                ? dmaCode
                                : newSecondaryToDMACode + "," + dmaCode;
                            codeListChanged = true;
                        }
                    }

                    // Remove the uploaded row. Quotes are doubled so a value containing ' cannot
                    // break out of the literal.
                    // NOTE(review): prefer a parameterized command if IDbProvider offers one.
                    string deleteSql = "DELETE FROM [dbo].[二供通用泵站] where 更新时间 = '" + id.Replace("'", "''")
                        + "' and 编码 = '" + deviceCode.Replace("'", "''") + "'";
                    dbHelper.ExecuteNonQuery(deleteSql);
                }

                if (syncToDma && codeListChanged)
                {
                    UpdateAppConfig("SecondaryToDMACode", newSecondaryToDMACode);
                }

                log.Info("二供历史记录同步任务执行结束.................\r\n");
            }
            catch (Exception ex)
            {
                log.Error("二供历史记录同步任务执行错误:" + ex.Message + "===========" + ex.StackTrace + "\r\n");
            }
        }

        /// <summary>
        /// Builds the station message for one row: station-level readings plus a
        /// "PumpData" list holding one entry per pump.
        /// </summary>
        /// <param name="dr">Source row.</param>
        /// <param name="deviceCode">Value of the 编码 column.</param>
        /// <param name="readTime">Reading time built from the 日期 and 时间 columns.</param>
        /// <returns>Map ready for JSON serialization.</returns>
        private static Dictionary<string, object> BuildDeviceMap(DataRow dr, string deviceCode, string readTime)
        {
            Dictionary<string, object> deviceMap = new Dictionary<string, object>();
            deviceMap["DeviceCode"] = deviceCode;
            deviceMap["ReadTime"] = readTime;

            foreach (string[] field in DeviceFields)
            {
                deviceMap[field[0]] = ValueOrZero(dr, field[1]);
            }

            List<Dictionary<string, string>> pumpData = new List<Dictionary<string, string>>();
            for (int pumpIndex = 0; pumpIndex < PumpColumnPrefixes.Length; pumpIndex++)
            {
                string columnPrefix = PumpColumnPrefixes[pumpIndex];

                Dictionary<string, string> pumpMap = new Dictionary<string, string>();
                // Pump codes are "wwkj001".."wwkj003".
                pumpMap["PumpCode"] = "wwkj" + (pumpIndex + 1).ToString().PadLeft(3, '0');
                pumpMap["ReadTime"] = readTime;

                foreach (string[] field in PumpFields)
                {
                    pumpMap[field[0]] = ValueOrZero(dr, columnPrefix + field[1]);
                }

                pumpData.Add(pumpMap);
            }

            deviceMap["PumpData"] = pumpData;
            return deviceMap;
        }

        /// <summary>Returns the column value as text, or "0" when the column is DBNull.</summary>
        /// <param name="dr">Source row.</param>
        /// <param name="column">Column name.</param>
        private static string ValueOrZero(DataRow dr, string column)
        {
            object value = dr[column];
            return value == DBNull.Value ? "0" : value.ToString();
        }

        /// <summary>Publishes a UTF-8 encoded message to the given exchange on every channel.</summary>
        /// <param name="channels">Open channels keyed by host.</param>
        /// <param name="properties">Publish properties keyed by the same host keys.</param>
        /// <param name="exchange">Exchange name; the routing key is always empty.</param>
        /// <param name="message">Message text.</param>
        private static void PublishToAll(Dictionary<string, IModel> channels, Dictionary<string, IBasicProperties> properties, string exchange, string message)
        {
            byte[] body = Encoding.UTF8.GetBytes(message);
            foreach (KeyValuePair<string, IModel> item in channels)
            {
                item.Value.BasicPublish(exchange, "", properties[item.Key], body);
            }
        }

        /// <summary>Checks whether a comma-separated code list contains exactly the given code.</summary>
        /// <param name="codeList">Comma-separated codes; may be null or empty.</param>
        /// <param name="code">Code to look for.</param>
        private static bool ContainsCode(string codeList, string code)
        {
            if (string.IsNullOrEmpty(codeList))
            {
                return false;
            }

            foreach (string known in codeList.Split(','))
            {
                if (string.Equals(known.Trim(), code, StringComparison.Ordinal))
                {
                    return true;
                }
            }

            return false;
        }

        /// <summary>
        /// Forwards one row to zone metering: publishes a device registration message when
        /// the device code is not yet in <paramref name="newSecondaryToDMACode"/>, then
        /// always publishes the row's history message.
        /// </summary>
        /// <param name="dr">Source row.</param>
        /// <param name="channels">Open channels keyed by host.</param>
        /// <param name="properties">Publish properties keyed by the same host keys.</param>
        /// <param name="newSecondaryToDMACode">
        /// Comma-separated codes already registered; when null the registration message is skipped.
        /// </param>
        /// <returns>The zone-metering device code (the 编码 column value followed by "sc").</returns>
        private string SecondaryToDMA(DataRow dr, Dictionary<string, IModel> channels, Dictionary<string, IBasicProperties> properties, string newSecondaryToDMACode)
        {
            string deviceName = dr["编号"].ToString();
            string deviceCode = dr["编码"].ToString() + "sc";

            #region Device registration
            if (newSecondaryToDMACode != null && !ContainsCode(newSecondaryToDMACode, deviceCode))
            {
                StringBuilder messageDevice = new StringBuilder();
                messageDevice.Append("{");
                // JsonConvert.ToString emits a quoted, escaped JSON string.
                messageDevice.Append("\"meterAssessmentName\": ").Append(JsonConvert.ToString(deviceName)).Append(",");
                // NOTE(review): "isPressucre" looks like a misspelling, but it is a wire key the
                // consumer may depend on, so it is kept verbatim - confirm.
                messageDevice.Append("\"isPressucre\": 1,");
                messageDevice.Append("\"isFlow\": 1,");
                messageDevice.Append("\"isZoneMeter\": 1,");
                messageDevice.Append("\"isTradeMeter\": 0,");
                messageDevice.Append("\"isLargeUser\": 0,");
                messageDevice.Append("\"isQuality\": 1,");
                messageDevice.Append("\"meterAssessmentCode\": ").Append(JsonConvert.ToString(deviceCode)).Append(",");
                messageDevice.Append("\"manufacturerCode\": \"").Append(Constants.ManufacturerCode).Append("\",");
                messageDevice.Append("\"meterTypeId\": \"2\"");
                messageDevice.Append("}");

                PublishToAll(channels, properties, ZoneDeviceExchange, messageDevice.ToString());
            }
            #endregion

            #region History data
            // Invariant culture keeps the timestamp and decimals machine-readable on any OS locale.
            string getDateTime = Convert.ToDateTime(dr["更新时间"])
                .ToString("yyyy-MM-dd HH:mm:ss", CultureInfo.InvariantCulture);

            List<string> fields = new List<string>();
            fields.Add("\"meterAssessmentCode\": " + JsonConvert.ToString(deviceCode));
            // NOTE(review): emitted unquoted, unlike the registration message; this is valid JSON
            // only while the configured manufacturer code is numeric - confirm.
            fields.Add("\"manufacturerCode\": " + Constants.ManufacturerCode);
            fields.Add("\"getDateTime\": \"" + getDateTime + "\"");

            // Null readings are omitted rather than sent as 0.
            foreach (string[] field in ZoneHistoryFields)
            {
                object value = dr[field[1]];
                if (value != DBNull.Value)
                {
                    fields.Add("\"" + field[0] + "\": "
                        + Convert.ToDecimal(value).ToString(CultureInfo.InvariantCulture));
                }
            }

            // Joining avoids the trailing comma that made the previous payload invalid JSON.
            string messageHis = "{" + string.Join(",", fields.ToArray()) + "}";
            PublishToAll(channels, properties, ZoneDeviceHisExchange, messageHis);
            #endregion

            return deviceCode;
        }

        /// <summary>
        /// Writes a value into the appSettings section of the executable's configuration
        /// file and refreshes the in-memory section. The key is created when it is missing.
        /// </summary>
        /// <param name="key">Setting key.</param>
        /// <param name="value">Setting value.</param>
        private void UpdateAppConfig(String key, String value)
        {
            var cfg = ConfigurationManager.OpenExeConfiguration(ConfigurationUserLevel.None);

            KeyValueConfigurationElement setting = cfg.AppSettings.Settings[key];
            if (setting == null)
            {
                // The indexer returns null for an unknown key; add it instead of dereferencing null.
                cfg.AppSettings.Settings.Add(key, value);
            }
            else
            {
                setting.Value = value;
            }

            cfg.Save();
            ConfigurationManager.RefreshSection("appSettings");
        }

        /// <summary>
        /// SQL Server provider for the connection configured in
        /// <c>Constants.WaterFactoryDbConncetion</c>; obtained from the factory on every access.
        /// </summary>
        private static IDbProvider dbHelper
        {
            get
            {
                var DbDefine = DbFactoryProvider.GetProvider(CurrentDbType.SqlServer, Constants.WaterFactoryDbConncetion);
                return DbDefine;
            }
        }
    }
}