using log4net;
using Newtonsoft.Json;
using Quartz;
using RabbitMQ.Client;
using RDIFramework.Utilities;
using System;
using System.Collections;
using System.Collections.Generic;
using System.Data;
using System.Text;

namespace TimedUpload.QuartzJobs
{
    /// <summary>
    /// Quartz job that reads water-factory history rows from the database,
    /// publishes each row (plus its meter readings) as a JSON message to every
    /// configured RabbitMQ host, and then deletes the uploaded rows.
    /// </summary>
    [DisallowConcurrentExecution]
    public class WaterFactoryDataUploadJob : IJob
    {
        // Name of the queue declared on every host.
        private const string QueueName = "waterFactory.deviceHis";

        // Name passed as the exchange argument of BasicPublish (same text as the queue name).
        private const string ExchangeName = "waterFactory.deviceHis";

        private readonly ILog log = LogManager.GetLogger(typeof(WaterFactoryDataUploadJob));

        /// <summary>
        /// Opens one connection and channel per host listed in <c>Constants.UploadUrl</c>
        /// ('|'-separated), runs the history upload, and closes every connection that
        /// was opened, even when setup or upload fails part-way.
        /// </summary>
        /// <param name="context">Quartz execution context (not used).</param>
        public void Execute(IJobExecutionContext context)
        {
            // Empty entries are dropped so a blank or trailing-'|' setting does not
            // produce a connection attempt with an empty host name.
            string[] uploadUrls = Constants.UploadUrl.Split(new[] { '|' }, StringSplitOptions.RemoveEmptyEntries);

            Dictionary<string, IConnection> connections = new Dictionary<string, IConnection>();
            Dictionary<string, IModel> channels = new Dictionary<string, IModel>();
            Dictionary<string, IBasicProperties> properties = new Dictionary<string, IBasicProperties>();

            try
            {
                foreach (string uploadUrl in uploadUrls)
                {
                    // A host listed twice would make Dictionary.Add throw after the
                    // connection is already open; skip the duplicate instead.
                    if (connections.ContainsKey(uploadUrl))
                    {
                        continue;
                    }

                    ConnectionFactory factory = new ConnectionFactory();
                    factory.HostName = uploadUrl;                    // broker host
                    factory.UserName = Constants.UploadUserName;     // broker user
                    factory.Password = Constants.UploadPassword;     // broker password
                    factory.AutomaticRecoveryEnabled = true;         // reconnect automatically when the link drops

                    IConnection connection = factory.CreateConnection();
                    // Register immediately so the finally block closes it even if the
                    // channel/queue setup below throws.
                    connections.Add(uploadUrl, connection);

                    IModel channel = connection.CreateModel();
                    // Declare the queue: durable, non-exclusive, not auto-deleted.
                    channel.QueueDeclare(QueueName, true, false, false, null);

                    IBasicProperties property = channel.CreateBasicProperties();
                    property.ContentType = "text/plain";
                    property.DeliveryMode = 2; // persistent

                    properties.Add(uploadUrl, property);
                    channels.Add(uploadUrl, channel);
                }

                if (channels.Count > 0)
                {
                    SendWaterFactoryHis(channels, properties);
                }
            }
            finally
            {
                foreach (KeyValuePair<string, IConnection> item in connections)
                {
                    try
                    {
                        item.Value.Close();
                    }
                    catch (Exception ex)
                    {
                        // One connection failing to close must not prevent closing the rest.
                        log.Warn("Failed to close RabbitMQ connection to " + item.Key, ex);
                    }
                }
            }
        }

        /// <summary>
        /// Water-factory history data: for every factory and every base row in
        /// [水厂泵房2], builds a JSON message with the base data and meter readings,
        /// publishes it on all channels, then deletes the uploaded rows.
        /// Any exception is logged and swallowed.
        /// </summary>
        /// <param name="channels">Open channels keyed by upload host.</param>
        /// <param name="properties">Message properties keyed by the same upload host.</param>
        private void SendWaterFactoryHis(Dictionary<string, IModel> channels, Dictionary<string, IBasicProperties> properties)
        {
            try
            {
                log.Info("水厂历史数据同步任务开始执行.................\r\n");

                string factorySql = "select * from 水厂信息";
                DataTable dtFactory = dbHelper.Fill(factorySql);
                string waterFactoryColumn = Constants.WaterFactoryColumn;
                if (string.IsNullOrEmpty(waterFactoryColumn) || dtFactory == null || dtFactory.Rows.Count == 0)
                {
                    return;
                }

                // Keys that every meter entry carries (initialised to null); loop-invariant.
                string[] colArr = waterFactoryColumn.Split(',');

                // NOTE(review): the SQL below is built by string concatenation from values
                // read out of the database. Consider switching to the provider's
                // parameterised query API.
                foreach (DataRow factoryRow in dtFactory.Rows)
                {
                    string factoryId = factoryRow["FactoryId"].ToString();
                    string factoryCode = factoryRow["FactoryCode"].ToString();

                    // Meters belonging to this factory.
                    string meterSql = "select * from 水厂表具 where FactoryId = " + factoryId;
                    DataTable dtMeter = dbHelper.Fill(meterSql);
                    if (dtMeter == null || dtMeter.Rows.Count == 0)
                    {
                        continue;
                    }

                    Dictionary<string, object> factoryMap = new Dictionary<string, object>();
                    factoryMap["FactoryCode"] = factoryCode;

                    // Factory base data, oldest first.
                    string baseSql = "SELECT RTRIM(日期) + ' ' + RTRIM(时间) ReadTime,反冲正累计流量 - 反冲负累计流量 BackWashing,[清水池液位] LiquidHeight,更新时间 FROM [dbo].[水厂泵房2] WHERE FactoryId = " + factoryId + " ORDER BY 日期, 时间 ";
                    DataTable baseDt = dbHelper.Fill(baseSql);
                    if (baseDt == null)
                    {
                        continue;
                    }

                    foreach (DataRow baseRow in baseDt.Rows)
                    {
                        // NOTE(review): the update time is round-tripped through ToString()
                        // and compared as text in SQL — confirm the column type/format
                        // makes this match reliably.
                        string id = baseRow["更新时间"].ToString();

                        Dictionary<string, object> baseMap = new Dictionary<string, object>();
                        baseMap["ReadTime"] = baseRow["ReadTime"].ToString();
                        baseMap["BackWashing"] = baseRow["BackWashing"].ToString();
                        baseMap["LiquidHeight"] = baseRow["LiquidHeight"].ToString();
                        baseMap["Consumption"] = "0";
                        baseMap["CleanWaterFlow"] = "0";
                        baseMap["Dosage"] = "0";
                        factoryMap["BaseData"] = baseMap;

                        factoryMap["MeterData"] = BuildMeterDataList(dtMeter, id, colArr);

                        string message = JsonConvert.SerializeObject(factoryMap);
                        PublishToAll(channels, properties, message);

                        // Remove the rows that were just uploaded.
                        DeleteUploadedRows(id, factoryId);
                    }
                }

                log.Info("水厂历史记录同步任务执行结束.................\r\n");
            }
            catch (Exception ex)
            {
                log.Error("水厂历史记录同步任务执行错误:" + ex.Message + "===========" + ex.StackTrace + "\r\n");
            }
        }

        /// <summary>
        /// Builds the meter-data list for one base row: for each meter, reads its
        /// column mapping from 水厂数据字段 and the matching data row from the meter's
        /// table. Meters without a mapping or without a data row are skipped.
        /// </summary>
        /// <param name="dtMeter">Meters of the current factory.</param>
        /// <param name="id">Update-time value identifying the base row.</param>
        /// <param name="colArr">Keys pre-populated with null in every meter entry.</param>
        /// <returns>List of per-meter dictionaries.</returns>
        private static ArrayList BuildMeterDataList(DataTable dtMeter, string id, string[] colArr)
        {
            ArrayList meterDataList = new ArrayList();

            foreach (DataRow meterRow in dtMeter.Rows)
            {
                string tableName = meterRow["TableName"].ToString();
                string meterId = meterRow["MeterId"].ToString();
                string meterCode = meterRow["MeterCode"].ToString();

                string colSql = "select * from 水厂数据字段 where MeterId = " + meterId;
                DataTable dtMeterCol = dbHelper.Fill(colSql);
                if (dtMeterCol == null || dtMeterCol.Rows.Count == 0)
                {
                    continue;
                }

                // Select list of "<TableColumn> <SendKey>" pairs, i.e. each column aliased to its send key.
                int columnCount = dtMeterCol.Rows.Count;
                string[] selectItems = new string[columnCount];
                string[] sendKeys = new string[columnCount];
                for (int k = 0; k < columnCount; k++)
                {
                    string tableColumn = dtMeterCol.Rows[k]["TableColumn"].ToString();
                    string sendKey = dtMeterCol.Rows[k]["SendKey"].ToString();
                    selectItems[k] = tableColumn + " " + sendKey;
                    sendKeys[k] = sendKey;
                }
                string queryCol = string.Join(",", selectItems);

                Dictionary<string, object> meterMap = new Dictionary<string, object>();
                foreach (string col in colArr)
                {
                    meterMap[col] = null;
                }

                string meterDataSql = "select " + queryCol + " from " + tableName + " where 更新时间 = '" + id + "'";
                DataTable data = dbHelper.Fill(meterDataSql);
                if (data == null || data.Rows.Count == 0)
                {
                    continue;
                }

                // Only the first matching row is used.
                DataRow dataRow = data.Rows[0];
                foreach (string sendKey in sendKeys)
                {
                    meterMap[sendKey] = dataRow[sendKey];
                }
                meterMap["MeterCode"] = meterCode;
                meterDataList.Add(meterMap);
            }

            return meterDataList;
        }

        /// <summary>
        /// Publishes the message, UTF-8 encoded, on every channel using the
        /// properties registered under the same key.
        /// </summary>
        private static void PublishToAll(Dictionary<string, IModel> channels, Dictionary<string, IBasicProperties> properties, string message)
        {
            byte[] body = Encoding.UTF8.GetBytes(message);
            foreach (KeyValuePair<string, IModel> item in channels)
            {
                IBasicProperties property = properties[item.Key];
                // NOTE(review): this publishes to an exchange named like the queue with an
                // empty routing key; only a queue is declared in this class — confirm the
                // exchange exists on the broker.
                item.Value.BasicPublish(ExchangeName, "", property, body);
            }
        }

        /// <summary>
        /// Deletes the rows with the given update time and factory id from the four
        /// water-factory history tables in a single batch.
        /// </summary>
        private static void DeleteUploadedRows(string id, string factoryId)
        {
            string[] tables = { "水厂泵房2", "水厂过滤间GGD", "水厂泵房1", "水厂过滤间操作台" };
            string[] statements = new string[tables.Length];
            for (int i = 0; i < tables.Length; i++)
            {
                statements[i] = "DELETE FROM [dbo].[" + tables[i] + "] where 更新时间 = '" + id + "' and FactoryId = " + factoryId;
            }
            dbHelper.ExecuteNonQuery(string.Join(";", statements));
        }

        /// <summary>
        /// SQL Server data provider for the water-factory database; a provider is
        /// requested from <c>DbFactoryProvider</c> on every access.
        /// </summary>
        static IDbProvider dbHelper
        {
            get
            {
                var DbDefine = DbFactoryProvider.GetProvider(CurrentDbType.SqlServer, Constants.WaterFactoryDbConncetion);
                return DbDefine;
            }
        }
    }
}