|
@@ -0,0 +1,364 @@
|
|
|
+using log4net;
|
|
|
+using Newtonsoft.Json;
|
|
|
+using Quartz;
|
|
|
+using RabbitMQ.Client;
|
|
|
+using RDIFramework.Utilities;
|
|
|
+using System;
|
|
|
+using System.Collections;
|
|
|
+using System.Collections.Generic;
|
|
|
+using System.Configuration;
|
|
|
+using System.Data;
|
|
|
+using System.Linq;
|
|
|
+using System.Text;
|
|
|
+
|
|
|
+namespace TimedUpload.QuartzJobs
|
|
|
+{
|
|
|
+ [DisallowConcurrentExecution]
|
|
|
+ public class ChangleSecondPumpDataJob :IJob
|
|
|
+ {
|
|
|
+ private readonly ILog log = LogManager.GetLogger(typeof(SecondaryPumpDataUploadJob));
|
|
|
+
|
|
|
+ public void Execute(IJobExecutionContext context)
|
|
|
+ {
|
|
|
+
|
|
|
+ try
|
|
|
+ {
|
|
|
+ string[] uploadUrls = Constants.UploadUrl.Split('|');
|
|
|
+ Dictionary<string, IConnection> connections = new Dictionary<string, IConnection>();
|
|
|
+ Dictionary<string, IModel> channels = new Dictionary<string, IModel>();
|
|
|
+ Dictionary<string, IBasicProperties> properties = new Dictionary<string, IBasicProperties>();
|
|
|
+
|
|
|
+ foreach (string uploadUrl in uploadUrls)
|
|
|
+ {
|
|
|
+ ConnectionFactory factory = new ConnectionFactory();
|
|
|
+ factory.HostName = uploadUrl;//主机名,Rabbit会拿这个IP生成一个endpoint,这个很熟悉吧,就是socket绑定的那个终结点。
|
|
|
+ factory.UserName = Constants.UploadUserName;//默认用户名,用户可以在服务端自定义创建,有相关命令行
|
|
|
+ factory.Password = Constants.UploadPassword;//默认密码
|
|
|
+
|
|
|
+ factory.AutomaticRecoveryEnabled = true; // 链接断开会自动重连
|
|
|
+
|
|
|
+ IConnection connection = factory.CreateConnection();
|
|
|
+ IModel channel = connection.CreateModel();
|
|
|
+ channel.QueueDeclare("secondaryPump.deviceHis", true, false, false, null);//创建一个名称为kibaqueue的消息队列
|
|
|
+
|
|
|
+ IBasicProperties property = channel.CreateBasicProperties();
|
|
|
+ property.ContentType = "text/plain";
|
|
|
+ property.DeliveryMode = 2; //持久化
|
|
|
+ connections.Add(uploadUrl, connection);
|
|
|
+ properties.Add(uploadUrl, property);
|
|
|
+ channels.Add(uploadUrl, channel);
|
|
|
+ }
|
|
|
+
|
|
|
+ if (channels.Count > 0)
|
|
|
+ {
|
|
|
+ SendSecondaryPumpHis(channels, properties);
|
|
|
+ }
|
|
|
+
|
|
|
+ foreach (KeyValuePair<string, IConnection> item in connections)
|
|
|
+ {
|
|
|
+ IConnection connection = item.Value;
|
|
|
+ connection.Close();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ catch (Exception ex)
|
|
|
+ {
|
|
|
+
|
|
|
+ log.Debug(ex.Message);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /// <summary>
|
|
|
+ /// 二供历史数据
|
|
|
+ /// </summary>
|
|
|
+ /// <param name="channels"></param
|
|
|
+ /// <param name="properties"></param>
|
|
|
+ private void SendSecondaryPumpHis(Dictionary<string, IModel> channels, Dictionary<string, IBasicProperties> properties)
|
|
|
+ {
|
|
|
+ try
|
|
|
+ {
|
|
|
+ log.Info("二供历史数据同步任务开始执行.................\r\n");
|
|
|
+ string factorySql = "SELECT * FROM [updateList20230412]";
|
|
|
+ DataTable dtDevice = dbHelper.Fill(factorySql);
|
|
|
+ string secondaryPumpColumn = Constants.SecondaryPumpColumn;
|
|
|
+ if (string.IsNullOrEmpty(secondaryPumpColumn) || dtDevice == null || dtDevice.Rows.Count == 0)
|
|
|
+ {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ // 处理设备列表
|
|
|
+
|
|
|
+ for (int k = 0; k < dtDevice.Rows.Count; k++)
|
|
|
+ {
|
|
|
+ string table = "历史记录_" + dtDevice.Rows[k]["devId"].ToString().PadLeft(6,'0')+DateTime.Now.Year.ToString();
|
|
|
+ String sql = @"SELECT TOP 2000 [id]
|
|
|
+ ,[设备ID],[记录时间],[采集时间]
|
|
|
+ ,[设备状态],[通讯状态],[数据来源]
|
|
|
+ ,[表1A相电压],[表1B相电压],[表1C相电压]
|
|
|
+ ,[表1A相电流],[表1B相电流],[表1C相电流]
|
|
|
+ ,[表1电能]
|
|
|
+ ,[表2A相电压],[表2B相电压],[表2C相电压]
|
|
|
+ ,[表2A相电流],[表2B相电流],[表2C相电流]
|
|
|
+ ,[表3A相电压],[表3B相电压],[表3C相电压]
|
|
|
+ ,[表3A相电流],[表3B相电流] ,[表3C相电流]
|
|
|
+ ,[表3电能],[信号质量],[门开关]
|
|
|
+ ,[泵3状态],[泵2状态],[泵1状态]
|
|
|
+ ,[断电监测]
|
|
|
+ ,[一号泵有功功率],[一号泵频率]
|
|
|
+ ,[二号泵有功功率],[二号泵频率]
|
|
|
+ ,[三号泵有功功率],[三号泵频率]
|
|
|
+ ,[出水设定压力],[出水端实际压力]
|
|
|
+ ,[进水设定压力],[进水端实际压力]
|
|
|
+ ,[瞬时流量],[累计流量]
|
|
|
+ FROM "+ table + " where 采集时间 > '" + Convert.ToDateTime(dtDevice.Rows[k]["uploadTime"]).ToString("yyyy-MM-dd HH:mm:ss") + "' ORDER BY 采集时间";
|
|
|
+ DataTable dtHistory = dbHelper.Fill(sql);
|
|
|
+ for (int i = 0; i < dtHistory.Rows.Count; i++)
|
|
|
+ {
|
|
|
+ #region
|
|
|
+ DataRow dr = dtHistory.Rows[i];
|
|
|
+ string id = dr["记录时间"].ToString();
|
|
|
+ string deviceCode = dtDevice.Rows[k]["code"].ToString();//dr["编码"].ToString();
|
|
|
+ string PressureIn = dr["进水端实际压力"].ToString();
|
|
|
+ string PressureOut = dr["出水端实际压力"].ToString();
|
|
|
+ string PressureSet = dr["出水设定压力"].ToString();
|
|
|
+ string InstantFlow = dr["瞬时流量"].ToString();
|
|
|
+ string TotalFlow = dr["累计流量"].ToString();
|
|
|
+ string PositiveToTalFlow = "0";//dr["正累计流量"].ToString();
|
|
|
+ string NegativeTotalFlow = "0";// dr["负累计流量"].ToString();
|
|
|
+ string PH = "0";// dr["PH"].ToString();
|
|
|
+ string Chlorine = "0";//dr["余氯"].ToString();
|
|
|
+ string Turbidity = "0";//dr["浊度"].ToString();
|
|
|
+ string LiquidHeight = "0";//dr["水箱液位"].ToString();
|
|
|
+ string VoltageA = dr["表1A相电压"].ToString();
|
|
|
+ string VoltageB = dr["表1B相电压"].ToString();
|
|
|
+ string VoltageC = dr["表1C相电压"].ToString();
|
|
|
+ string CurrentA = dr["表1A相电流"].ToString();
|
|
|
+ string CurrentB = dr["表1B相电流"].ToString();
|
|
|
+ string CurrentC = dr["表1C相电流"].ToString();
|
|
|
+ string Consumption = dr["表1电能"].ToString();
|
|
|
+ string LackWater = "0";//dr["缺水报警"].ToString();
|
|
|
+ string OverPressure = "0";//dr["超压报警"].ToString();
|
|
|
+ string HouseInlet = "0";//dr["进水报警"].ToString(); // 进水报警0为正常,1为进水。
|
|
|
+ string TubeBurst = "0";//dr["爆管报警"].ToString();
|
|
|
+ string NetState = "0";//dr["网络状态"].ToString(); // 0代表通讯正常,1代表网络故障,2代表现场485设备通讯故障
|
|
|
+
|
|
|
+ string readTime = Convert.ToDateTime(dr["采集时间"]).ToString("yyyy/MM/dd HH:mm:ss");
|
|
|
+
|
|
|
+ Dictionary<string, object> deviceMap = new Dictionary<string, object>();
|
|
|
+ deviceMap["DeviceCode"] = deviceCode;
|
|
|
+ deviceMap["ReadTime"] = readTime;
|
|
|
+ deviceMap["PressureIN"] = PressureIn;
|
|
|
+ deviceMap["PressureOut"] = PressureOut;
|
|
|
+ deviceMap["PressureSet"] = PressureSet;
|
|
|
+ deviceMap["InstantFlow"] = InstantFlow;
|
|
|
+ deviceMap["TotalFlow"] = TotalFlow;
|
|
|
+ deviceMap["PositiveToTalFlow"] = PositiveToTalFlow;
|
|
|
+ deviceMap["NegativeTotalFlow"] = NegativeTotalFlow;
|
|
|
+ deviceMap["PH"] = PH;
|
|
|
+ deviceMap["Chlorine"] = Chlorine;
|
|
|
+ deviceMap["Turbidity"] = Turbidity;
|
|
|
+ deviceMap["LiquidHeight"] = LiquidHeight;
|
|
|
+ deviceMap["VoltageA"] = VoltageA;
|
|
|
+ deviceMap["VoltageB"] = VoltageB;
|
|
|
+ deviceMap["VoltageC"] = VoltageC;
|
|
|
+ deviceMap["CurrentA"] = CurrentA;
|
|
|
+ deviceMap["CurrentB"] = CurrentB;
|
|
|
+ deviceMap["CurrentC"] = CurrentC;
|
|
|
+ deviceMap["Consumption"] = Consumption;
|
|
|
+ deviceMap["LackWater"] = LackWater;
|
|
|
+ deviceMap["OverPressure"] = OverPressure;
|
|
|
+ deviceMap["HouseInlet"] = HouseInlet;
|
|
|
+ deviceMap["TubeBurst"] = TubeBurst;
|
|
|
+ deviceMap["NetState"] = NetState;
|
|
|
+ ArrayList meterDataList = new ArrayList();
|
|
|
+ // 处理泵的数据
|
|
|
+ for (int j = 1; j < 4; j++)
|
|
|
+ {
|
|
|
+ Dictionary<string, object> meterMap = new Dictionary<string, object>();
|
|
|
+ string colPre = "";
|
|
|
+ string state = "";
|
|
|
+ string colPrePower = "";
|
|
|
+ switch (j)
|
|
|
+ {
|
|
|
+ case 1:
|
|
|
+ colPre = "一号泵";
|
|
|
+ state = "泵1状态";
|
|
|
+ colPrePower = "表1A相";
|
|
|
+ break;
|
|
|
+ case 2:
|
|
|
+ colPre = "二号泵";
|
|
|
+ state = "泵2状态";
|
|
|
+ colPrePower = "表2A相";
|
|
|
+ break;
|
|
|
+ case 3:
|
|
|
+ colPre = "三号泵";
|
|
|
+ state = "泵3状态";
|
|
|
+ colPrePower = "表3A相";
|
|
|
+ break;
|
|
|
+ }
|
|
|
+
|
|
|
+ string meterCode = "wwkj" + j.ToString().PadLeft(3, '0');
|
|
|
+ string frequency = dr[colPre + "频率"].ToString();
|
|
|
+ string current = dr[colPrePower + "电流"].ToString();
|
|
|
+ string runState = dr[state].ToString();
|
|
|
+ string power = dr[colPre + "有功功率"].ToString();
|
|
|
+ string voltage = dr[colPrePower + "电压"].ToString();
|
|
|
+ meterMap["PumpCode"] = meterCode;
|
|
|
+ meterMap["ReadTime"] = readTime;
|
|
|
+ meterMap["Frequency"] = frequency;
|
|
|
+ meterMap["Current"] = current;
|
|
|
+ meterMap["RunState"] = runState;
|
|
|
+ meterMap["Power"] = power;
|
|
|
+ meterMap["Voltage"] = voltage;
|
|
|
+
|
|
|
+ meterDataList.Add(meterMap);
|
|
|
+ }
|
|
|
+
|
|
|
+ if (meterDataList.Count == 0)
|
|
|
+ {
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ deviceMap["PumpData"] = meterDataList;
|
|
|
+
|
|
|
+
|
|
|
+ string message = JsonConvert.SerializeObject(deviceMap);
|
|
|
+ // log.Info(message);
|
|
|
+ foreach (KeyValuePair<string, IModel> item in channels)
|
|
|
+ {
|
|
|
+
|
|
|
+ string key = item.Key;
|
|
|
+ // log.Info("secondaryPump.deviceHis | " + key);
|
|
|
+ IModel channel = item.Value;
|
|
|
+ IBasicProperties property = properties[key];
|
|
|
+
|
|
|
+ channel.BasicPublish("secondaryPump.deviceHis", "", property, Encoding.UTF8.GetBytes(message)); //生产消息
|
|
|
+ // log.Info("______________________________________");
|
|
|
+ }
|
|
|
+ #endregion
|
|
|
+ string sqlUpdate = "UPDATE updateList20230412 set uploadTime = '" + readTime + "' where id = " + dtDevice.Rows[k]["id"].ToString();
|
|
|
+ dbHelper.ExecuteNonQuery(sqlUpdate) ;
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ log.Info("二供历史记录同步任务执行结束.................\r\n");
|
|
|
+ }
|
|
|
+ catch (Exception ex)
|
|
|
+ {
|
|
|
+ log.Error("二供历史记录同步任务执行错误:" + ex.Message + "===========" + ex.StackTrace + "\r\n");
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private string SecondaryToDMA(DataRow dr, Dictionary<string, IModel> channels, Dictionary<string, IBasicProperties> properties, string newSecondaryToDMACode)
|
|
|
+ {
|
|
|
+ string deviceName = dr["编号"].ToString();
|
|
|
+
|
|
|
+ string deviceCode = dr["编码"].ToString() + "sc";
|
|
|
+
|
|
|
+ #region 设备同步
|
|
|
+ if (newSecondaryToDMACode != null && newSecondaryToDMACode.IndexOf(deviceCode) < 0)
|
|
|
+ {
|
|
|
+ StringBuilder messageDevice = new StringBuilder();
|
|
|
+ messageDevice.Append("{");
|
|
|
+ messageDevice.Append("\"meterAssessmentName\": \"").Append(deviceName).Append("\",");
|
|
|
+ messageDevice.Append("\"isPressucre\": 1,");
|
|
|
+ messageDevice.Append("\"isFlow\": 1,");
|
|
|
+ messageDevice.Append("\"isZoneMeter\": 1,");
|
|
|
+ messageDevice.Append("\"isTradeMeter\": 0,");
|
|
|
+ messageDevice.Append("\"isLargeUser\": 0,");
|
|
|
+ messageDevice.Append("\"isQuality\": 1,");
|
|
|
+ messageDevice.Append("\"meterAssessmentCode\": \"").Append(deviceCode).Append("\",");
|
|
|
+ messageDevice.Append("\"manufacturerCode\": \"").Append(Constants.ManufacturerCode).Append("\",");
|
|
|
+ messageDevice.Append("\"meterTypeId\": \"2\"");
|
|
|
+ messageDevice.Append("}");
|
|
|
+ foreach (KeyValuePair<string, IModel> item in channels)
|
|
|
+ {
|
|
|
+ string key = item.Key;
|
|
|
+ IModel channel = item.Value;
|
|
|
+ IBasicProperties property = properties[key];
|
|
|
+ channel.BasicPublish("zone.device", "", property, Encoding.UTF8.GetBytes(messageDevice.ToString())); //生产消息
|
|
|
+ }
|
|
|
+ }
|
|
|
+ #endregion
|
|
|
+
|
|
|
+ #region 历史数据同步
|
|
|
+ String getDateTime = Convert.ToDateTime(dr["更新时间"]).ToString("yyyy-MM-dd HH:mm:ss");
|
|
|
+
|
|
|
+ StringBuilder messageHis = new StringBuilder();
|
|
|
+ messageHis.Append("{");
|
|
|
+ messageHis.Append("\"meterAssessmentCode\": \"").Append(deviceCode).Append("\",");
|
|
|
+ messageHis.Append("\"manufacturerCode\": ").Append(Constants.ManufacturerCode).Append(",");
|
|
|
+ messageHis.Append("\"getDateTime\": \"").Append(getDateTime).Append("\",");
|
|
|
+ if (Convert.DBNull != dr["净累计流量"])
|
|
|
+ {
|
|
|
+ messageHis.Append("\"netCumulativeFlow\": ").Append(Convert.ToDecimal(dr["净累计流量"])).Append(",");
|
|
|
+ }
|
|
|
+ if (Convert.DBNull != dr["正累计流量"])
|
|
|
+ {
|
|
|
+ messageHis.Append("\"positiveCumulativeFlow\": ").Append(Convert.ToDecimal(dr["正累计流量"])).Append(",");
|
|
|
+ }
|
|
|
+ if (Convert.DBNull != dr["负累计流量"])
|
|
|
+ {
|
|
|
+ messageHis.Append("\"negativeCumulativeFlow\": ").Append(Convert.ToDecimal(dr["负累计流量"])).Append(",");
|
|
|
+ }
|
|
|
+ if (Convert.DBNull != dr["瞬时流量"])
|
|
|
+ {
|
|
|
+ messageHis.Append("\"instantaneousFlow\": ").Append(Convert.ToDecimal(dr["瞬时流量"])).Append(",");
|
|
|
+ }
|
|
|
+ if (Convert.DBNull != dr["泵出口压力"])
|
|
|
+ {
|
|
|
+ messageHis.Append("\"pressure\": ").Append(Convert.ToDecimal(dr["泵出口压力"])).Append(",");
|
|
|
+ }
|
|
|
+ if (Convert.DBNull != dr["PH"])
|
|
|
+ {
|
|
|
+ messageHis.Append("\"ph\": ").Append(Convert.ToDecimal(dr["PH"])).Append(",");
|
|
|
+ }
|
|
|
+ if (Convert.DBNull != dr["余氯"])
|
|
|
+ {
|
|
|
+ messageHis.Append("\"chlorine\": ").Append(Convert.ToDecimal(dr["余氯"])).Append(",");
|
|
|
+ }
|
|
|
+ if (Convert.DBNull != dr["浊度"])
|
|
|
+ {
|
|
|
+ messageHis.Append("\"turbidity\": ").Append(Convert.ToDecimal(dr["浊度"])).Append(",");
|
|
|
+ }
|
|
|
+ messageHis.Append("}");
|
|
|
+
|
|
|
+ foreach (KeyValuePair<string, IModel> item in channels)
|
|
|
+ {
|
|
|
+ string key = item.Key;
|
|
|
+ IModel channel = item.Value;
|
|
|
+ IBasicProperties property = properties[key];
|
|
|
+ channel.BasicPublish("zone.deviceHis", "", property, Encoding.UTF8.GetBytes(messageHis.ToString())); //生产消息
|
|
|
+ }
|
|
|
+ #endregion
|
|
|
+
|
|
|
+ return deviceCode;
|
|
|
+ }
|
|
|
+
|
|
|
+ /// <summary>
|
|
|
+ /// 更新配置文件中的值
|
|
|
+ /// </summary>
|
|
|
+ /// <param name="key">键</param>
|
|
|
+ /// <param name="value">值</param>
|
|
|
+ private void UpdateAppConfig(String key, String value)
|
|
|
+ {
|
|
|
+ var cfg = ConfigurationManager.OpenExeConfiguration(ConfigurationUserLevel.None);
|
|
|
+ cfg.AppSettings.Settings[key].Value = value;
|
|
|
+ cfg.Save();
|
|
|
+ ConfigurationManager.RefreshSection("appSettings");
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ static IDbProvider dbHelper
|
|
|
+ {
|
|
|
+ get
|
|
|
+ {
|
|
|
+ var DbDefine = DbFactoryProvider.GetProvider(CurrentDbType.SqlServer, Constants.WaterFactoryDbConncetion);
|
|
|
+ return DbDefine;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|