using log4net; using Newtonsoft.Json; using Quartz; using RabbitMQ.Client; using RDIFramework.Utilities; using System; using System.Collections.Generic; using System.Data; using System.Linq; using System.Text; namespace TimedUpload.QuartzJobs { [DisallowConcurrentExecution] public class ChangleWorkmanshipDataUploadJob : IJob { private readonly ILog log = LogManager.GetLogger(typeof(WorkmanshipDataUploadJob)); public void Execute(IJobExecutionContext context) { string[] uploadUrls = Constants.UploadUrl.Split('|'); Dictionary connections = new Dictionary(); Dictionary channels = new Dictionary(); Dictionary properties = new Dictionary(); foreach (string uploadUrl in uploadUrls) { ConnectionFactory factory = new ConnectionFactory(); factory.HostName = uploadUrl;//主机名,Rabbit会拿这个IP生成一个endpoint,这个很熟悉吧,就是socket绑定的那个终结点。 factory.UserName = Constants.UploadUserName;//默认用户名,用户可以在服务端自定义创建,有相关命令行 factory.Password = Constants.UploadPassword;//默认密码 factory.AutomaticRecoveryEnabled = true; // 链接断开会自动重连 IConnection connection = factory.CreateConnection(); IModel channel = connection.CreateModel(); channel.QueueDeclare("workmanship", true, false, false, null);//创建一个名称为kibaqueue的消息队列 IBasicProperties property = channel.CreateBasicProperties(); property.ContentType = "text/plain"; property.DeliveryMode = 2; //持久化 connections.Add(uploadUrl, connection); properties.Add(uploadUrl, property); channels.Add(uploadUrl, channel); } if (channels.Count > 0) { SendSeconddaryPumpData(channels, properties); //SendWaterWellData(channels, properties); SendWaterFactoryData(channels, properties); } foreach (KeyValuePair item in connections) { IConnection connection = item.Value; connection.Close(); } } /// /// 二供数据 /// /// private void SendSeconddaryPumpData(Dictionary channels, Dictionary properties) { try { log.Info("二供工艺图数据同步任务开始执行.................\r\n"); DateTime newTime = DateTime.Now; String yearStr = newTime.Year.ToString(); #region 获取水质 string ph = "0", chlorine = "0", turbidity = "0"; string waterQulitySql = @"SELECT TOP 2 [RecordName],[RecordValue] FROM [dbo].[历史记录] where DevId = 3 order by GetDateTime DESC"; DataTable waterQulityDt = waterQulityDbHelper.Fill(waterQulitySql); if (waterQulityDt != null && waterQulityDt.Rows.Count > 0) { for (int i = 0; i < waterQulityDt.Rows.Count; i++) { DataRow row = waterQulityDt.Rows[i]; if (row["RecordName"].ToString().Trim() == "余氯") { chlorine = row["RecordValue"].ToString(); } if (row["RecordName"].ToString().Trim() == "浊度") { turbidity = row["RecordValue"].ToString(); } } } #endregion string factorySql = @"SELECT top 1 1 rid, 1 Type,CONVERT(varchar(100),采集时间,111) 日期,CONVERT(varchar(100),采集时间,108) 时间 , CONVERT(varchar(100),采集时间,111) + ' ' + CONVERT(varchar(100),采集时间,108) ReadTime,CONVERT(varchar(100),采集时间,111) + ' ' + CONVERT(varchar(5),采集时间,108) 更新时间, " + ph + " PH," + chlorine + " 余氯, " + turbidity + @" 浊度,13.9 温度,'南流泉泵站' 编号,'second001' 编码,'second001' DeviceCode,0 水箱液位,0 爆管报警,'' 电源故障, '' 真空报警,瞬时流量,'' 硬件超压,'' 系统电压,0 缺水报警,0 网络状态,0 超压报警,'' 软件超压,0 进水报警, 累计流量 净累计流量,'' 正累计流量,'' 负累计流量, 0 一段压力设定,0 三段压力设定,0 二段压力设定 ,[表1A相电压] 电压AB ,[表1B相电压] 电压AC ,[表1C相电压] 电压BC ,[表1A相电流] + 表2A相电流 + 表3A相电流 电流A ,[表1B相电流] + 表2B相电流 + 表3B相电流 电流B ,[表1C相电流] + 表2C相电流 + 表3C相电流 电流C ,[表1电能] + 表2电能 + 表3电能 用电量 ,[表1A相电压] 一泵电压,'' 一泵故障 ,[表1A相电流] 一泵电流 ,[表2A相电压] 二泵电压,'' 二泵故障 ,[表2A相电流] 二泵电流 ,[表3A相电压] 三泵电压,'' 二泵故障 ,[表3A相电流] 三泵电流 ,[泵3状态] 三泵运行状态 ,[泵2状态] 二泵运行状态 ,[泵1状态] 一泵运行状态 ,[一号泵有功功率] 一泵功率 ,[一号泵频率] 一泵频率 ,[二号泵有功功率] 二泵功率 ,[二号泵频率] 二泵频率 ,[三号泵有功功率] 三泵功率 ,[三号泵频率] 三泵频率 ,[出水设定压力] 泵设定压力 ,[出水端实际压力] 泵出口压力 ,[进水端实际压力] 泵进口压力 FROM [dbo].[历史记录_000015_" + yearStr + "] order by id DESC"; DataTable dtDevice = secondDbHelper.Fill(factorySql); if (dtDevice == null || dtDevice.Rows.Count == 0) { return; } DataColumnCollection cols = dtDevice.Columns; // 处理设备列表 for (int i = 0; i < dtDevice.Rows.Count; i++) { DataRow dr = dtDevice.Rows[i]; string deviceCode = dr["DeviceCode"].ToString(); string type = dr["Type"].ToString(); string realData = CreateJsonParameters(cols, dr); Dictionary deviceMap = new Dictionary(); deviceMap["DeviceCode"] = deviceCode; deviceMap["Type"] = type; deviceMap["RealData"] = realData; string message = JsonConvert.SerializeObject(deviceMap); foreach (KeyValuePair item in channels) { string key = item.Key; IModel channel = item.Value; IBasicProperties property = properties[key]; channel.BasicPublish("workmanship", "", property, Encoding.UTF8.GetBytes(message)); //生产消息 } } log.Info("二供工艺图数据同步任务开始执行.................\r\n"); } catch (Exception ex) { log.Error("二供工艺图数据同步任务错误:" + ex.Message + "===========" + ex.StackTrace + "\r\n"); } } /// /// 水厂数据 /// /// private void SendWaterFactoryData(Dictionary channels, Dictionary properties) { try { log.Info("水厂工艺图数据同步任务开始执行.................\r\n"); DateTime newTime = DateTime.Now; String yearStr = newTime.Year.ToString(); // 进水 Decimal inFlow = 0; Decimal inTotalFlow = 0; string insql = @"SELECT TOP 1 [id],[设备ID],[记录时间],[采集时间],[设备状态],[通讯状态],[净累计流量],[瞬时流量] FROM [dbo].[历史记录_000523_" + yearStr + "] order by 采集时间 DESC"; DataTable inDt = dbHelper.Fill(insql); if (inDt != null && inDt.Rows.Count > 0) { inFlow = Convert.ToDecimal(inDt.Rows[0]["瞬时流量"]); inTotalFlow = Convert.ToDecimal(inDt.Rows[0]["净累计流量"]); } // 出水 Decimal outFlow = 0; Decimal outTotalFlow = 0; string outsql = @"SELECT TOP 1 [id],[设备ID],[记录时间],[采集时间],[设备状态],[通讯状态],[净累计流量],[瞬时流量] FROM [dbo].[历史记录_000286_" + yearStr + "] Order by 采集时间 DESC"; ; DataTable outDt = dbHelper.Fill(outsql); if (outDt != null && outDt.Rows.Count > 0) { outFlow = Convert.ToDecimal(inDt.Rows[0]["瞬时流量"]); outTotalFlow = Convert.ToDecimal(inDt.Rows[0]["净累计流量"]); } string factorySql = @"SELECT TOP 1 1 rid,3 Type,1 ridl,CONVERT(varchar(100), CollectTime, 111) 日期,CONVERT(varchar(100), CollectTime, 108) 时间, CONVERT(varchar(100), CollectTime, 111) 日期1,CONVERT(varchar(100), CollectTime, 108) 时间1, CONVERT(varchar(100), CollectTime, 111) + ' ' + CONVERT(varchar(100), CollectTime, 108) ReadTime,CONVERT(varchar(100), CollectTime, 111) + ' ' + CONVERT(varchar(5), CollectTime, 108) 更新时间,CONVERT(varchar(100), CollectTime, 111) + ' ' + CONVERT(varchar(5), CollectTime, 108) 更新时间1, '' FactoryId,'sc001' DeviceCode,1 FactoryId1,0 反冲流量,0 管网压力," + outTotalFlow + " 一号正累计流量,"+ outFlow + @" 一号流量计流量,0 一号负累计流量,0 一号送水泵压力,0 一号反冲泵压力,0 三号反冲泵压力,0 三号送水泵压力,0 二号反冲泵压力,0 二号正累计流量, 0 二号负累计流量,0 二号送水泵压力,0 反冲正累计流量,0 反冲负累计流量,0 四号送水泵压力,"+ inFlow + @" 仓库流量计瞬时1,0 仓库流量计瞬时2, 0 仓库流量计瞬时3,"+inTotalFlow+ @" 仓库流量计正累计1,0 仓库流量计正累计2,0 仓库流量计正累计3,0 仓库流量计负累计1, 0 仓库流量计负累计2,0 仓库流量计负累计3 ,[originWaterZD] 进水浊度 ,[originWaterPH] 进水PH ,[lvhouZD] 泵房出水浊度 ,[lvhouYulv] 进水氨氮 ,[outWaterZD] 出水浊度 ,[outWaterPH] 泵房出水PH ,[outWaterYulv] 泵房出水余氯 ,[qingshuichiFluidLevel] 清水池液位 FROM[Waterwell].[dbo].[WaterFactoryDaliyRecord] order by id DESC"; DataTable dtDevice = waterFactoryDbHelper.Fill(factorySql); if (dtDevice == null || dtDevice.Rows.Count == 0) { return; } DataColumnCollection cols = dtDevice.Columns; // 处理设备列表 for (int i = 0; i < dtDevice.Rows.Count; i++) { DataRow dr = dtDevice.Rows[i]; string deviceCode = dr["DeviceCode"].ToString(); string type = dr["Type"].ToString(); string realData = CreateJsonParameters(cols, dr); Dictionary deviceMap = new Dictionary(); deviceMap["DeviceCode"] = deviceCode; deviceMap["Type"] = type; deviceMap["RealData"] = realData; string message = JsonConvert.SerializeObject(deviceMap); foreach (KeyValuePair item in channels) { string key = item.Key; IModel channel = item.Value; IBasicProperties property = properties[key]; channel.BasicPublish("workmanship", "", property, Encoding.UTF8.GetBytes(message)); //生产消息 } } log.Info("水厂工艺图数据同步任务执行结束.................\r\n"); } catch (Exception ex) { log.Error("水厂工艺图数据同步任务执行错误:" + ex.Message + "===========" + ex.StackTrace + "\r\n"); } } private static string CreateJsonParameters(DataColumnCollection cols, DataRow dr) { StringBuilder JsonString = new StringBuilder(); JsonString.Append("{"); for (int j = 0; j < cols.Count; j++) { if (j < cols.Count - 1) { JsonString.Append("\"" + cols[j].ColumnName.ToString() + "\":" + "\"" + dr[j].ToString().Trim() + "\","); } else if (j == cols.Count - 1) { JsonString.Append("\"" + cols[j].ColumnName.ToString() + "\":" + "\"" + dr[j].ToString().Trim() + "\""); } } JsonString.Append("}"); return JsonString.ToString(); } static IDbProvider dbHelper { get { var DbDefine = DbFactoryProvider.GetProvider(CurrentDbType.SqlServer, Constants.DbConncetion); return DbDefine; } } static IDbProvider waterFactoryDbHelper { get { var DbDefine = DbFactoryProvider.GetProvider(CurrentDbType.SqlServer, Constants.changleWaterFactoryDb); return DbDefine; } } static IDbProvider waterQulityDbHelper { get { var DbDefine = DbFactoryProvider.GetProvider(CurrentDbType.SqlServer, Constants.changLeWaterQualityDb); return DbDefine; } } static IDbProvider secondDbHelper { get { var DbDefine = DbFactoryProvider.GetProvider(CurrentDbType.SqlServer, Constants.WaterFactoryDbConncetion); return DbDefine; } } } }