123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375 |
- using log4net;
- using Newtonsoft.Json;
- using Quartz;
- using RabbitMQ.Client;
- using RDIFramework.Utilities;
- using System;
- using System.Collections;
- using System.Collections.Generic;
- using System.Configuration;
- using System.Data;
- using System.Text;
- namespace TimedUpload.QuartzJobs
- {
- [DisallowConcurrentExecution]
- public class SecondaryPumpDataUploadJob : IJob
- {
- private readonly ILog log = LogManager.GetLogger(typeof(SecondaryPumpDataUploadJob));
- public void Execute(IJobExecutionContext context)
- {
- try
- {
- string[] uploadUrls = Constants.UploadUrl.Split('|');
- Dictionary<string, IConnection> connections = new Dictionary<string, IConnection>();
- Dictionary<string, IModel> channels = new Dictionary<string, IModel>();
- Dictionary<string, IBasicProperties> properties = new Dictionary<string, IBasicProperties>();
- foreach (string uploadUrl in uploadUrls)
- {
- ConnectionFactory factory = new ConnectionFactory();
- factory.HostName = uploadUrl;//主机名,Rabbit会拿这个IP生成一个endpoint,这个很熟悉吧,就是socket绑定的那个终结点。
- factory.UserName = Constants.UploadUserName;//默认用户名,用户可以在服务端自定义创建,有相关命令行
- factory.Password = Constants.UploadPassword;//默认密码
- factory.AutomaticRecoveryEnabled = true; // 链接断开会自动重连
- IConnection connection = factory.CreateConnection();
- IModel channel = connection.CreateModel();
- channel.QueueDeclare("secondaryPump.deviceHis", true, false, false, null);//创建一个名称为kibaqueue的消息队列
- IBasicProperties property = channel.CreateBasicProperties();
- property.ContentType = "text/plain";
- property.DeliveryMode = 2; //持久化
- connections.Add(uploadUrl, connection);
- properties.Add(uploadUrl, property);
- channels.Add(uploadUrl, channel);
- }
- if (channels.Count > 0)
- {
- SendSecondaryPumpHis(channels, properties);
- }
- foreach (KeyValuePair<string, IConnection> item in connections)
- {
- IConnection connection = item.Value;
- connection.Close();
- }
- }
- catch (Exception ex)
- {
- log.Debug(ex.Message);
- }
- }
- /// <summary>
- /// 二供历史数据
- /// </summary>
- /// <param name="channels"></param
- /// <param name="properties"></param>
- private void SendSecondaryPumpHis(Dictionary<string, IModel> channels, Dictionary<string, IBasicProperties> properties)
- {
- try
- {
- log.Info("二供历史数据同步任务开始执行.................\r\n");
- string factorySql = "SELECT * FROM [dbo].[二供通用泵站] ORDER BY 日期, 时间";
- DataTable dtDevice = dbHelper.Fill(factorySql);
- string secondaryPumpColumn = Constants.SecondaryPumpColumn;
- if (string.IsNullOrEmpty(secondaryPumpColumn) || dtDevice == null || dtDevice.Rows.Count == 0)
- {
- return;
- }
- string newSecondaryToDMACode = Constants.SecondaryToDMACode;
- // 处理设备列表
- #region
- // for (int i = 0; i < dtDevice.Rows.Count; i++)
- // {
- //DataRow dr = dtDevice.Rows[i];
- //string id = "2023/1/17 15:37";//dr["更新时间"].ToString();
- //string deviceCode = "wwkj006";// dr["编码"].ToString();
- //string PressureIn = "0";//dr["泵进口压力"].ToString();
- //string PressureOut = "10.802";//dr["泵出口压力"].ToString();
- //string PressureSet = "0";//dr["泵设定压力"].ToString();
- //string InstantFlow = "0";//dr["瞬时流量"].ToString();
- //string TotalFlow = "0";//dr["净累计流量"].ToString();
- //string PositiveToTalFlow = "0";//dr["正累计流量"].ToString();
- //string NegativeTotalFlow = "0";//dr["负累计流量"].ToString();
- //string PH = "0";//dr["PH"].ToString();
- //string Chlorine = "0";//dr["余氯"].ToString();
- //string Turbidity = "0";//dr["浊度"].ToString();
- //string LiquidHeight = "0";//dr["水箱液位"].ToString();
- //string VoltageA = "0";//dr["电压AB"].ToString();
- //string VoltageB = "0";//dr["电压AC"].ToString();
- //string VoltageC = "0";//dr["电压BC"].ToString();
- //string CurrentA = "0";//dr["电流A"].ToString();
- //string CurrentB = "0";//dr["电流B"].ToString();
- //string CurrentC = "0";//dr["电流C"].ToString();
- //string Consumption = "0";//dr["用电量"].ToString();
- //string LackWater ="0";//dr["缺水报警"].ToString();
- //string OverPressure = "0";//dr["超压报警"].ToString();
- //string HouseInlet = "1";//dr["进水报警"].ToString(); // 进水报警0为正常,1为进水。
- //string TubeBurst = "0";//dr["爆管报警"].ToString();
- //string NetState = "0";//dr["网络状态"].ToString(); // 0代表通讯正常,1代表网络故障,2代表现场485设备通讯故障
- //string readTime = "2023/1/17 15:37:14";//dr["日期"].ToString().Trim() + " " + dr["时间"].ToString().Trim();
- #endregion
- for (int i = 0; i < dtDevice.Rows.Count; i++)
- {
- DataRow dr = dtDevice.Rows[i];
- string id = dr["更新时间"].ToString();
- string deviceCode = dr["编码"].ToString();
- string PressureIn = dr["泵进口压力"] == DBNull.Value ? "0" : dr["泵进口压力"].ToString();
- string PressureOut = dr["泵出口压力"] == DBNull.Value ? "0" : dr["泵出口压力"].ToString();
- string PressureSet = dr["泵设定压力"] == DBNull.Value ? "0" : dr["泵设定压力"].ToString();
- string InstantFlow = dr["瞬时流量"] == DBNull.Value ? "0" : dr["瞬时流量"].ToString();
- string TotalFlow = dr["净累计流量"] == DBNull.Value ? "0" : dr["净累计流量"].ToString();
- string PositiveToTalFlow = dr["正累计流量"] == DBNull.Value ? "0" : dr["正累计流量"].ToString();
- string NegativeTotalFlow = dr["负累计流量"] == DBNull.Value ? "0" : dr["负累计流量"].ToString();
- string PH = dr["PH"] == DBNull.Value ? "0" : dr["PH"].ToString();
- string Chlorine = dr["余氯"] == DBNull.Value ? "0" : dr["余氯"].ToString();
- string Turbidity = dr["浊度"] == DBNull.Value ? "0" : dr["浊度"].ToString();
- string LiquidHeight = dr["水箱液位"] == DBNull.Value ? "0" : dr["水箱液位"].ToString();
- string VoltageA = dr["电压AB"] == DBNull.Value ? "0" : dr["电压AB"].ToString();
- string VoltageB = dr["电压AC"] == DBNull.Value ? "0" : dr["电压AC"].ToString();
- string VoltageC = dr["电压BC"] == DBNull.Value ? "0" : dr["电压BC"].ToString();
- string CurrentA = dr["电流A"] == DBNull.Value ? "0" : dr["电流A"].ToString();
- string CurrentB = dr["电流B"] == DBNull.Value ? "0" : dr["电流B"].ToString();
- string CurrentC = dr["电流C"] == DBNull.Value ? "0" : dr["电流C"].ToString();
- string Consumption = dr["用电量"] == DBNull.Value ? "0" : dr["用电量"].ToString();
- string LackWater = dr["缺水报警"] == DBNull.Value ? "0" : dr["缺水报警"].ToString();
- string OverPressure = dr["超压报警"] == DBNull.Value ? "0" : dr["超压报警"].ToString();
- string HouseInlet = dr["进水报警"] == DBNull.Value ? "0" : dr["进水报警"].ToString(); // 进水报警0为正常,1为进水。
- string TubeBurst = dr["爆管报警"] == DBNull.Value ? "0" : dr["爆管报警"].ToString();
- string NetState = dr["网络状态"] == DBNull.Value ? "0" : dr["网络状态"].ToString(); // 0代表通讯正常,1代表网络故障,2代表现场485设备通讯故障
- string readTime = dr["日期"].ToString().Trim() + " " + dr["时间"].ToString().Trim();
- Dictionary<string, object> deviceMap = new Dictionary<string, object>();
- deviceMap["DeviceCode"] = deviceCode;
- deviceMap["ReadTime"] = readTime;
- deviceMap["PressureIN"] = PressureIn;
- deviceMap["PressureOut"] = PressureOut;
- deviceMap["PressureSet"] = PressureSet;
- deviceMap["InstantFlow"] = InstantFlow;
- deviceMap["TotalFlow"] = TotalFlow;
- deviceMap["PositiveToTalFlow"] = PositiveToTalFlow;
- deviceMap["NegativeTotalFlow"] = NegativeTotalFlow;
- deviceMap["PH"] = PH;
- deviceMap["Chlorine"] = Chlorine;
- deviceMap["Turbidity"] = Turbidity;
- deviceMap["LiquidHeight"] = LiquidHeight;
- deviceMap["VoltageA"] = VoltageA;
- deviceMap["VoltageB"] = VoltageB;
- deviceMap["VoltageC"] = VoltageC;
- deviceMap["CurrentA"] = CurrentA;
- deviceMap["CurrentB"] = CurrentB;
- deviceMap["CurrentC"] = CurrentC;
- deviceMap["Consumption"] = Consumption;
- deviceMap["LackWater"] = LackWater;
- deviceMap["OverPressure"] = OverPressure;
- deviceMap["HouseInlet"] = HouseInlet;
- deviceMap["TubeBurst"] = TubeBurst;
- deviceMap["NetState"] = NetState;
- ArrayList meterDataList = new ArrayList();
- // 处理泵的数据
- for (int j = 1; j < 4; j++)
- {
- Dictionary<string, object> meterMap = new Dictionary<string, object>();
- string colPre = "";
- switch (j)
- {
- case 1:
- colPre = "一泵";
- break;
- case 2:
- colPre = "二泵";
- break;
- case 3:
- colPre = "三泵";
- break;
- }
- string meterCode = "wwkj" + j.ToString().PadLeft(3, '0');
- string frequency = dr[colPre + "频率"] == DBNull.Value ? "0" : dr[colPre + "频率"].ToString();
- string current = dr[colPre + "电流"] == DBNull.Value ? "0" : dr[colPre + "电流"].ToString();
- string runState = dr[colPre + "运行状态"] == DBNull.Value ? "0" : dr[colPre + "运行状态"].ToString();
- string power = dr[colPre + "功率"] == DBNull.Value ? "0" : dr[colPre + "功率"].ToString();
- string voltage = dr[colPre + "电压"] == DBNull.Value ? "0" : dr[colPre + "电压"].ToString();
- meterMap["PumpCode"] = meterCode;
- meterMap["ReadTime"] = readTime;
- meterMap["Frequency"] = frequency;
- meterMap["Current"] = current;
- meterMap["RunState"] = runState;
- meterMap["Power"] = power;
- meterMap["Voltage"] = voltage;
- meterDataList.Add(meterMap);
- }
- if (meterDataList.Count == 0)
- {
- continue;
- }
- deviceMap["PumpData"] = meterDataList;
- string message = JsonConvert.SerializeObject(deviceMap);
- foreach (KeyValuePair<string, IModel> item in channels)
- {
- string key = item.Key;
- IModel channel = item.Value;
- IBasicProperties property = properties[key];
- channel.BasicPublish("secondaryPump.deviceHis", "", property, Encoding.UTF8.GetBytes(message)); //生产消息
- }
- // 判断数据是否需要同步分区计量
- if ("1".Equals(Constants.SecondaryToDMA))
- {
- string deviceCodeTemp = SecondaryToDMA(dr, channels, properties, newSecondaryToDMACode);
- if ("".Equals(newSecondaryToDMACode))
- {
- newSecondaryToDMACode = deviceCodeTemp;
- }
- else
- {
- newSecondaryToDMACode = newSecondaryToDMACode + "," + deviceCodeTemp;
- }
- }
- // 删除数据
- string deleteSql = "DELETE FROM [dbo].[二供通用泵站] where 更新时间 = '" + id + "' and 编码 = '" + deviceCode + "'";
- dbHelper.ExecuteNonQuery(deleteSql);
- }
- if ("1".Equals(Constants.SecondaryToDMA))
- {
- UpdateAppConfig("SecondaryToDMACode", newSecondaryToDMACode);
- }
- log.Info("二供历史记录同步任务执行结束.................\r\n");
- }
- catch (Exception ex)
- {
- log.Error("二供历史记录同步任务执行错误:" + ex.Message + "===========" + ex.StackTrace + "\r\n");
- }
- }
- private string SecondaryToDMA(DataRow dr, Dictionary<string, IModel> channels, Dictionary<string, IBasicProperties> properties, string newSecondaryToDMACode)
- {
- string deviceName = dr["编号"].ToString();
- string deviceCode = dr["编码"].ToString() + "sc";
- #region 设备同步
- if (newSecondaryToDMACode != null && newSecondaryToDMACode.IndexOf(deviceCode) < 0)
- {
- StringBuilder messageDevice = new StringBuilder();
- messageDevice.Append("{");
- messageDevice.Append("\"meterAssessmentName\": \"").Append(deviceName).Append("\",");
- messageDevice.Append("\"isPressucre\": 1,");
- messageDevice.Append("\"isFlow\": 1,");
- messageDevice.Append("\"isZoneMeter\": 1,");
- messageDevice.Append("\"isTradeMeter\": 0,");
- messageDevice.Append("\"isLargeUser\": 0,");
- messageDevice.Append("\"isQuality\": 1,");
- messageDevice.Append("\"meterAssessmentCode\": \"").Append(deviceCode).Append("\",");
- messageDevice.Append("\"manufacturerCode\": \"").Append(Constants.ManufacturerCode).Append("\",");
- messageDevice.Append("\"meterTypeId\": \"2\"");
- messageDevice.Append("}");
- foreach (KeyValuePair<string, IModel> item in channels)
- {
- string key = item.Key;
- IModel channel = item.Value;
- IBasicProperties property = properties[key];
- channel.BasicPublish("zone.device", "", property, Encoding.UTF8.GetBytes(messageDevice.ToString())); //生产消息
- }
- }
- #endregion
- #region 历史数据同步
- String getDateTime = Convert.ToDateTime(dr["更新时间"]).ToString("yyyy-MM-dd HH:mm:ss");
- StringBuilder messageHis = new StringBuilder();
- messageHis.Append("{");
- messageHis.Append("\"meterAssessmentCode\": \"").Append(deviceCode).Append("\",");
- messageHis.Append("\"manufacturerCode\": ").Append(Constants.ManufacturerCode).Append(",");
- messageHis.Append("\"getDateTime\": \"").Append(getDateTime).Append("\",");
- if (Convert.DBNull != dr["净累计流量"])
- {
- messageHis.Append("\"netCumulativeFlow\": ").Append(Convert.ToDecimal(dr["净累计流量"])).Append(",");
- }
- if (Convert.DBNull != dr["正累计流量"])
- {
- messageHis.Append("\"positiveCumulativeFlow\": ").Append(Convert.ToDecimal(dr["正累计流量"])).Append(",");
- }
- if (Convert.DBNull != dr["负累计流量"])
- {
- messageHis.Append("\"negativeCumulativeFlow\": ").Append(Convert.ToDecimal(dr["负累计流量"])).Append(",");
- }
- if (Convert.DBNull != dr["瞬时流量"])
- {
- messageHis.Append("\"instantaneousFlow\": ").Append(Convert.ToDecimal(dr["瞬时流量"])).Append(",");
- }
- if (Convert.DBNull != dr["泵出口压力"])
- {
- messageHis.Append("\"pressure\": ").Append(Convert.ToDecimal(dr["泵出口压力"])).Append(",");
- }
- if (Convert.DBNull != dr["PH"])
- {
- messageHis.Append("\"ph\": ").Append(Convert.ToDecimal(dr["PH"])).Append(",");
- }
- if (Convert.DBNull != dr["余氯"])
- {
- messageHis.Append("\"chlorine\": ").Append(Convert.ToDecimal(dr["余氯"])).Append(",");
- }
- if (Convert.DBNull != dr["浊度"])
- {
- messageHis.Append("\"turbidity\": ").Append(Convert.ToDecimal(dr["浊度"])).Append(",");
- }
- messageHis.Append("}");
- foreach (KeyValuePair<string, IModel> item in channels)
- {
- string key = item.Key;
- IModel channel = item.Value;
- IBasicProperties property = properties[key];
- channel.BasicPublish("zone.deviceHis", "", property, Encoding.UTF8.GetBytes(messageHis.ToString())); //生产消息
- }
- #endregion
- return deviceCode;
- }
- /// <summary>
- /// 更新配置文件中的值
- /// </summary>
- /// <param name="key">键</param>
- /// <param name="value">值</param>
- private void UpdateAppConfig(String key, String value)
- {
- var cfg = ConfigurationManager.OpenExeConfiguration(ConfigurationUserLevel.None);
- cfg.AppSettings.Settings[key].Value = value;
- cfg.Save();
- ConfigurationManager.RefreshSection("appSettings");
- }
- static IDbProvider dbHelper
- {
- get
- {
- var DbDefine = DbFactoryProvider.GetProvider(CurrentDbType.SqlServer, Constants.WaterFactoryDbConncetion);
- return DbDefine;
- }
- }
- }
- }
|