using log4net; using Newtonsoft.Json; using Quartz; using RabbitMQ.Client; using RDIFramework.Utilities; using System; using System.Collections; using System.Collections.Generic; using System.Configuration; using System.Data; using System.Linq; using System.Text; namespace TimedUpload.QuartzJobs { [DisallowConcurrentExecution] public class ChangleSecondPumpDataJob :IJob { private readonly ILog log = LogManager.GetLogger(typeof(SecondaryPumpDataUploadJob)); public void Execute(IJobExecutionContext context) { try { string[] uploadUrls = Constants.UploadUrl.Split('|'); Dictionary connections = new Dictionary(); Dictionary channels = new Dictionary(); Dictionary properties = new Dictionary(); foreach (string uploadUrl in uploadUrls) { ConnectionFactory factory = new ConnectionFactory(); factory.HostName = uploadUrl;//主机名,Rabbit会拿这个IP生成一个endpoint,这个很熟悉吧,就是socket绑定的那个终结点。 factory.UserName = Constants.UploadUserName;//默认用户名,用户可以在服务端自定义创建,有相关命令行 factory.Password = Constants.UploadPassword;//默认密码 factory.AutomaticRecoveryEnabled = true; // 链接断开会自动重连 IConnection connection = factory.CreateConnection(); IModel channel = connection.CreateModel(); channel.QueueDeclare("secondaryPump.deviceHis", true, false, false, null);//创建一个名称为kibaqueue的消息队列 IBasicProperties property = channel.CreateBasicProperties(); property.ContentType = "text/plain"; property.DeliveryMode = 2; //持久化 connections.Add(uploadUrl, connection); properties.Add(uploadUrl, property); channels.Add(uploadUrl, channel); } if (channels.Count > 0) { SendSecondaryPumpHis(channels, properties); } foreach (KeyValuePair item in connections) { IConnection connection = item.Value; connection.Close(); } } catch (Exception ex) { log.Debug(ex.Message); } } /// /// 二供历史数据 /// /// private void SendSecondaryPumpHis(Dictionary channels, Dictionary properties) { try { log.Info("二供历史数据同步任务开始执行.................\r\n"); string factorySql = "SELECT * FROM [updateList20230412]"; DataTable dtDevice = dbHelper.Fill(factorySql); string secondaryPumpColumn = Constants.SecondaryPumpColumn; if (string.IsNullOrEmpty(secondaryPumpColumn) || dtDevice == null || dtDevice.Rows.Count == 0) { return; } // 处理设备列表 for (int k = 0; k < dtDevice.Rows.Count; k++) { string table = "历史记录_" + dtDevice.Rows[k]["devId"].ToString().PadLeft(6,'0') + "_" + DateTime.Now.Year.ToString(); String sql = @"SELECT TOP 2000 [id] ,[设备ID],[记录时间],[采集时间] ,[设备状态],[通讯状态],[数据来源] ,[表1A相电压],[表1B相电压],[表1C相电压] ,[表1A相电流],[表1B相电流],[表1C相电流] ,[表1电能],[表2电能] ,[表2A相电压],[表2B相电压],[表2C相电压] ,[表2A相电流],[表2B相电流],[表2C相电流] ,[表3A相电压],[表3B相电压],[表3C相电压] ,[表3A相电流],[表3B相电流] ,[表3C相电流] ,[表3电能],[信号质量],[门开关] ,[泵3状态],[泵2状态],[泵1状态] ,[断电监测] ,[一号泵有功功率],[一号泵频率] ,[二号泵有功功率],[二号泵频率] ,[三号泵有功功率],[三号泵频率] ,[出水设定压力],[出水端实际压力] ,[进水设定压力],[进水端实际压力] ,[瞬时流量],[累计流量] FROM " + table + " where 采集时间 > '" + Convert.ToDateTime(dtDevice.Rows[k]["uploadTime"]).ToString("yyyy-MM-dd HH:mm:ss") + "' ORDER BY 采集时间"; DataTable dtHistory = dbHelper.Fill(sql); for (int i = 0; i < dtHistory.Rows.Count; i++) { #region DataRow dr = dtHistory.Rows[i]; string id = dr["记录时间"].ToString(); string deviceCode = dtDevice.Rows[k]["code"].ToString();//dr["编码"].ToString(); string PressureIn = dr["进水端实际压力"].ToString(); string PressureOut = dr["出水端实际压力"].ToString(); string PressureSet = dr["出水设定压力"].ToString(); string InstantFlow = dr["瞬时流量"].ToString(); string TotalFlow = dr["累计流量"].ToString(); string PositiveToTalFlow = "0";//dr["正累计流量"].ToString(); string NegativeTotalFlow = "0";// dr["负累计流量"].ToString(); string PH = "0";// dr["PH"].ToString(); string Chlorine = "0";//dr["余氯"].ToString(); string Turbidity = "0";//dr["浊度"].ToString(); string LiquidHeight = "0";//dr["水箱液位"].ToString(); string VoltageA = dr["表1A相电压"].ToString(); string VoltageB = dr["表1B相电压"].ToString(); string VoltageC = dr["表1C相电压"].ToString(); string CurrentA = dr["表1A相电流"].ToString(); string CurrentB = dr["表1B相电流"].ToString(); string CurrentC = dr["表1C相电流"].ToString(); string Consumption = (Convert.ToInt32(dr["表1电能"]) + Convert.ToInt32(dr["表2电能"]) + Convert.ToInt32(dr["表3电能"])).ToString(); string LackWater = "0";//dr["缺水报警"].ToString(); string OverPressure = "0";//dr["超压报警"].ToString(); string HouseInlet = "0";//dr["进水报警"].ToString(); // 进水报警0为正常,1为进水。 string TubeBurst = "0";//dr["爆管报警"].ToString(); string NetState = "0";//dr["网络状态"].ToString(); // 0代表通讯正常,1代表网络故障,2代表现场485设备通讯故障 string readTime = Convert.ToDateTime(dr["采集时间"]).ToString("yyyy/MM/dd HH:mm:ss"); Dictionary deviceMap = new Dictionary(); deviceMap["DeviceCode"] = deviceCode; deviceMap["ReadTime"] = readTime; deviceMap["PressureIN"] = PressureIn; deviceMap["PressureOut"] = PressureOut; deviceMap["PressureSet"] = PressureSet; deviceMap["InstantFlow"] = InstantFlow; deviceMap["TotalFlow"] = TotalFlow; deviceMap["PositiveToTalFlow"] = PositiveToTalFlow; deviceMap["NegativeTotalFlow"] = NegativeTotalFlow; deviceMap["PH"] = PH; deviceMap["Chlorine"] = Chlorine; deviceMap["Turbidity"] = Turbidity; deviceMap["LiquidHeight"] = LiquidHeight; deviceMap["VoltageA"] = VoltageA; deviceMap["VoltageB"] = VoltageB; deviceMap["VoltageC"] = VoltageC; deviceMap["CurrentA"] = CurrentA; deviceMap["CurrentB"] = CurrentB; deviceMap["CurrentC"] = CurrentC; deviceMap["Consumption"] = Consumption; deviceMap["LackWater"] = LackWater; deviceMap["OverPressure"] = OverPressure; deviceMap["HouseInlet"] = HouseInlet; deviceMap["TubeBurst"] = TubeBurst; deviceMap["NetState"] = NetState; ArrayList meterDataList = new ArrayList(); // 处理泵的数据 for (int j = 1; j < 4; j++) { Dictionary meterMap = new Dictionary(); string colPre = ""; string state = ""; string colPrePower = ""; string colPreCurrent = ""; switch (j) { case 1: colPre = "一号泵"; state = "泵1状态"; colPreCurrent = "表1A相"; colPrePower = "表1"; break; case 2: colPre = "二号泵"; state = "泵2状态"; colPreCurrent = "表2A相"; colPrePower = "表2"; break; case 3: colPre = "三号泵"; state = "泵3状态"; colPreCurrent = "表3A相"; colPrePower = "表3"; break; } string meterCode = "wwkj" + j.ToString().PadLeft(3, '0'); string frequency = dr[colPre + "频率"].ToString(); string current = dr[colPreCurrent + "电流"].ToString(); string runState = dr[state].ToString(); string power = dr[colPrePower + "电能"].ToString(); string voltage = dr[colPreCurrent + "电压"].ToString(); meterMap["PumpCode"] = meterCode; meterMap["ReadTime"] = readTime; meterMap["Frequency"] = frequency; meterMap["Current"] = current; meterMap["RunState"] = runState; meterMap["Power"] = power; meterMap["Voltage"] = voltage; meterDataList.Add(meterMap); } if (meterDataList.Count == 0) { continue; } deviceMap["PumpData"] = meterDataList; string message = JsonConvert.SerializeObject(deviceMap); // log.Info(message); foreach (KeyValuePair item in channels) { string key = item.Key; // log.Info("secondaryPump.deviceHis | " + key); IModel channel = item.Value; IBasicProperties property = properties[key]; channel.BasicPublish("secondaryPump.deviceHis", "", property, Encoding.UTF8.GetBytes(message)); //生产消息 // log.Info("______________________________________"); } #endregion string sqlUpdate = "UPDATE updateList20230412 set uploadTime = '" + readTime + "' where id = " + dtDevice.Rows[k]["id"].ToString(); dbHelper.ExecuteNonQuery(sqlUpdate) ; } } log.Info("二供历史记录同步任务执行结束.................\r\n"); } catch (Exception ex) { log.Error("二供历史记录同步任务执行错误:" + ex.Message + "===========" + ex.StackTrace + "\r\n"); } } private string SecondaryToDMA(DataRow dr, Dictionary channels, Dictionary properties, string newSecondaryToDMACode) { string deviceName = dr["编号"].ToString(); string deviceCode = dr["编码"].ToString() + "sc"; #region 设备同步 if (newSecondaryToDMACode != null && newSecondaryToDMACode.IndexOf(deviceCode) < 0) { StringBuilder messageDevice = new StringBuilder(); messageDevice.Append("{"); messageDevice.Append("\"meterAssessmentName\": \"").Append(deviceName).Append("\","); messageDevice.Append("\"isPressucre\": 1,"); messageDevice.Append("\"isFlow\": 1,"); messageDevice.Append("\"isZoneMeter\": 1,"); messageDevice.Append("\"isTradeMeter\": 0,"); messageDevice.Append("\"isLargeUser\": 0,"); messageDevice.Append("\"isQuality\": 1,"); messageDevice.Append("\"meterAssessmentCode\": \"").Append(deviceCode).Append("\","); messageDevice.Append("\"manufacturerCode\": \"").Append(Constants.ManufacturerCode).Append("\","); messageDevice.Append("\"meterTypeId\": \"2\""); messageDevice.Append("}"); foreach (KeyValuePair item in channels) { string key = item.Key; IModel channel = item.Value; IBasicProperties property = properties[key]; channel.BasicPublish("zone.device", "", property, Encoding.UTF8.GetBytes(messageDevice.ToString())); //生产消息 } } #endregion #region 历史数据同步 String getDateTime = Convert.ToDateTime(dr["更新时间"]).ToString("yyyy-MM-dd HH:mm:ss"); StringBuilder messageHis = new StringBuilder(); messageHis.Append("{"); messageHis.Append("\"meterAssessmentCode\": \"").Append(deviceCode).Append("\","); messageHis.Append("\"manufacturerCode\": ").Append(Constants.ManufacturerCode).Append(","); messageHis.Append("\"getDateTime\": \"").Append(getDateTime).Append("\","); if (Convert.DBNull != dr["净累计流量"]) { messageHis.Append("\"netCumulativeFlow\": ").Append(Convert.ToDecimal(dr["净累计流量"])).Append(","); } if (Convert.DBNull != dr["正累计流量"]) { messageHis.Append("\"positiveCumulativeFlow\": ").Append(Convert.ToDecimal(dr["正累计流量"])).Append(","); } if (Convert.DBNull != dr["负累计流量"]) { messageHis.Append("\"negativeCumulativeFlow\": ").Append(Convert.ToDecimal(dr["负累计流量"])).Append(","); } if (Convert.DBNull != dr["瞬时流量"]) { messageHis.Append("\"instantaneousFlow\": ").Append(Convert.ToDecimal(dr["瞬时流量"])).Append(","); } if (Convert.DBNull != dr["泵出口压力"]) { messageHis.Append("\"pressure\": ").Append(Convert.ToDecimal(dr["泵出口压力"])).Append(","); } if (Convert.DBNull != dr["PH"]) { messageHis.Append("\"ph\": ").Append(Convert.ToDecimal(dr["PH"])).Append(","); } if (Convert.DBNull != dr["余氯"]) { messageHis.Append("\"chlorine\": ").Append(Convert.ToDecimal(dr["余氯"])).Append(","); } if (Convert.DBNull != dr["浊度"]) { messageHis.Append("\"turbidity\": ").Append(Convert.ToDecimal(dr["浊度"])).Append(","); } messageHis.Append("}"); foreach (KeyValuePair item in channels) { string key = item.Key; IModel channel = item.Value; IBasicProperties property = properties[key]; channel.BasicPublish("zone.deviceHis", "", property, Encoding.UTF8.GetBytes(messageHis.ToString())); //生产消息 } #endregion return deviceCode; } /// /// 更新配置文件中的值 /// /// 键 /// 值 private void UpdateAppConfig(String key, String value) { var cfg = ConfigurationManager.OpenExeConfiguration(ConfigurationUserLevel.None); cfg.AppSettings.Settings[key].Value = value; cfg.Save(); ConfigurationManager.RefreshSection("appSettings"); } static IDbProvider dbHelper { get { var DbDefine = DbFactoryProvider.GetProvider(CurrentDbType.SqlServer, Constants.WaterFactoryDbConncetion); return DbDefine; } } } }