using log4net;
using Newtonsoft.Json;
using Quartz;
using RabbitMQ.Client;
using RDIFramework.Utilities;
using System;
using System.Collections;
using System.Collections.Generic;
using System.Data;
using System.Globalization;
using System.Linq;
using System.Text;

namespace TimedUpload.QuartzJobs
{
    /// <summary>
    /// Quartz job that reads water-factory history readings from SQL Server in 5-minute
    /// steps and publishes one JSON message per step to every RabbitMQ host listed in
    /// <c>Constants.UploadUrl</c> (hosts are separated by '|').
    /// </summary>
    [DisallowConcurrentExecution]
    public class ChangleWaterFactoryDataJob : IJob
    {
        /// <summary>Name used both for the declared queue and as the publish target.</summary>
        private const string DeviceHistoryName = "waterFactory.deviceHis";

        /// <summary>Distance in minutes between two consecutive uploaded samples.</summary>
        private const int SampleIntervalMinutes = 5;

        /// <summary>Upper bound on the number of samples sent per factory in one run.</summary>
        private const int MaxSamplesPerRun = 200;

        private readonly ILog log = LogManager.GetLogger(typeof(ChangleWaterFactoryDataJob));

        /// <summary>
        /// Opens one connection/channel per configured host, uploads the pending history
        /// data through all of them, and closes every connection that was opened.
        /// </summary>
        /// <param name="context">Quartz execution context (not used).</param>
        public void Execute(IJobExecutionContext context)
        {
            string[] uploadUrls = Constants.UploadUrl.Split('|');
            Dictionary<string, IConnection> connections = new Dictionary<string, IConnection>();
            Dictionary<string, IModel> channels = new Dictionary<string, IModel>();
            Dictionary<string, IBasicProperties> properties = new Dictionary<string, IBasicProperties>();

            try
            {
                foreach (string uploadUrl in uploadUrls)
                {
                    // Skip blank entries (e.g. a trailing '|') and hosts listed twice;
                    // a duplicate would otherwise make Dictionary.Add throw.
                    if (string.IsNullOrEmpty(uploadUrl) || connections.ContainsKey(uploadUrl))
                    {
                        continue;
                    }

                    ConnectionFactory factory = new ConnectionFactory();
                    factory.HostName = uploadUrl;
                    factory.UserName = Constants.UploadUserName;
                    factory.Password = Constants.UploadPassword;
                    // Reconnect automatically when the link drops.
                    factory.AutomaticRecoveryEnabled = true;

                    IConnection connection = factory.CreateConnection();
                    // Register the connection right away so the finally block closes it
                    // even if one of the following calls throws.
                    connections.Add(uploadUrl, connection);

                    IModel channel = connection.CreateModel();
                    // Durable, non-exclusive, non-auto-delete queue.
                    channel.QueueDeclare(DeviceHistoryName, true, false, false, null);

                    IBasicProperties property = channel.CreateBasicProperties();
                    property.ContentType = "text/plain";
                    property.DeliveryMode = 2; // persistent message

                    properties.Add(uploadUrl, property);
                    channels.Add(uploadUrl, channel);
                }

                if (channels.Count > 0)
                {
                    SendWaterFactoryHis(channels, properties);
                }
            }
            finally
            {
                foreach (KeyValuePair<string, IConnection> item in connections)
                {
                    try
                    {
                        item.Value.Close();
                    }
                    catch (Exception ex)
                    {
                        // One broken connection must not prevent closing the others.
                        log.Warn("关闭RabbitMQ连接失败:" + item.Key, ex);
                    }
                }
            }
        }

        /// <summary>
        /// Uploads water-factory history data. For every row of <c>[dbo].[uploadList]</c>
        /// it walks forward from the stored <c>uploadTime</c> in 5-minute steps (at most
        /// 200 per run), builds one JSON message per step, publishes it on every channel
        /// and then stores the step's timestamp back into <c>uploadTime</c>.
        /// Any exception is logged and swallowed.
        /// </summary>
        /// <param name="channels">Open channels keyed by host name.</param>
        /// <param name="properties">Message properties keyed by the same host names.</param>
        private void SendWaterFactoryHis(Dictionary<string, IModel> channels, Dictionary<string, IBasicProperties> properties)
        {
            try
            {
                // These are declared outside both loops, so when a time slot has no
                // water-quality row the values of the previous slot (possibly of the
                // previous factory) are sent again.
                // NOTE(review): confirm this carry-over is intended.
                object inPh = null, inChlorine = null, inTurbidty = null, inTurbidtytwo = null;
                object outPh = null, outChlorine = null, outTurbidty = null, level = null;

                DateTime nowTime = DateTime.Now;
                log.Info("昌乐水厂数据同步任务开始执行.................\r\n");

                string factorySql = "SELECT [id],[inMeterId],[outMeterId],[uploadTime],[waterFactory] FROM [dbo].[uploadList]";
                DataTable dtFactory = waterFHelper.Fill(factorySql);
                if (dtFactory == null || dtFactory.Rows.Count == 0)
                {
                    return;
                }

                for (int i = 0; i < dtFactory.Rows.Count; i++)
                {
                    DataRow factoryRow = dtFactory.Rows[i];

                    Dictionary<string, object> factoryMap = new Dictionary<string, object>();
                    factoryMap["FactoryCode"] = factoryRow["waterFactory"].ToString();

                    // Number of whole 5-minute slots between the last upload and now.
                    DateTime searchTime = Convert.ToDateTime(factoryRow["uploadTime"]);
                    TimeSpan elapsed = nowTime - searchTime;
                    int num = (int)elapsed.TotalMinutes / SampleIntervalMinutes;
                    if (num > MaxSamplesPerRun)
                    {
                        num = MaxSamplesPerRun;
                    }

                    for (int l = 0; l < num; l++)
                    {
                        log.Info(l);

                        searchTime = searchTime.AddMinutes(SampleIntervalMinutes);
                        // '/' and ':' are culture-sensitive format specifiers; the invariant
                        // culture keeps the text stable for SQL and for the JSON payload.
                        string time = searchTime.ToString("yyyy/MM/dd HH:mm:ss", CultureInfo.InvariantCulture);
                        string year = searchTime.Year.ToString(CultureInfo.InvariantCulture);

                        Dictionary<string, object> baseMap = new Dictionary<string, object>();
                        baseMap["ReadTime"] = time;
                        baseMap["BackWashing"] = "0";
                        baseMap["Consumption"] = "0";
                        baseMap["CleanWaterFlow"] = "0";
                        baseMap["Dosage"] = "0";

                        ArrayList meterDataList = new ArrayList();

                        // Inflow meter. No row at or after this slot: stop processing this
                        // factory for the current run.
                        // NOTE(review): device tables and meter codes are hard-coded, while
                        // inMeterId/outMeterId are selected above but not used.
                        DataRow inRow = QueryFirstFlowRow("000513", year, time);
                        if (inRow == null)
                        {
                            break;
                        }
                        meterDataList.Add(BuildMeterReading("wwkj001", time, inRow["瞬时流量"], inRow["净累计流量"], null, null, null, null));

                        // Outflow meter, same rule.
                        DataRow outRow = QueryFirstFlowRow("000286", year, time);
                        if (outRow == null)
                        {
                            break;
                        }
                        meterDataList.Add(BuildMeterReading("wwkj005", time, outRow["瞬时流量"], outRow["净累计流量"], null, null, null, null));

                        // Inflow and outflow water quality.
                        // NOTE(review): TOP 1 without ORDER BY — which row is returned is up
                        // to the server; confirm whether an ORDER BY CreateDt is needed.
                        string waterQuelitySql = @"SELECT TOP 1 [id] ,[originWaterZD],[originWaterPH],[chendianchiOutWaterZD],[lvhouZD] ,[lvhouYulv],[qingshuichiFlow],[gongshuishuitaOutFlow] ,[outWaterZD],[outWaterPH],[outWaterYulv] ,[qingshuichiFluidLevel],[CollectTime],[CreateDt] FROM[dbo].[WaterFactoryDaliyRecord] where CreateDt >= '" + searchTime.AddMinutes(-1).ToString("yyyy-MM-dd HH:mm", CultureInfo.InvariantCulture) + "' ";
                        DataTable wqDt = waterFHelper.Fill(waterQuelitySql);
                        if (wqDt != null && wqDt.Rows.Count > 0)
                        {
                            DataRow wqRow = wqDt.Rows[0];
                            inPh = wqRow["originWaterPH"];
                            inChlorine = wqRow["lvhouYulv"];
                            inTurbidty = wqRow["originWaterZD"];
                            inTurbidtytwo = wqRow["lvhouZD"];
                            outPh = wqRow["outWaterPH"];
                            outChlorine = wqRow["outWaterYulv"];
                            outTurbidty = wqRow["outWaterZD"];
                            level = wqRow["qingshuichiFluidLevel"];
                        }

                        // Inflow quality.
                        meterDataList.Add(BuildMeterReading("wwkj008", time, null, null, inPh, inChlorine, inTurbidty, inTurbidtytwo));
                        // Outflow quality.
                        meterDataList.Add(BuildMeterReading("wwkj004", time, null, null, outPh, outChlorine, outTurbidty, null));

                        baseMap["LiquidHeight"] = level;

                        factoryMap["BaseData"] = baseMap;
                        factoryMap["MeterData"] = meterDataList;

                        string message = JsonConvert.SerializeObject(factoryMap);
                        log.Info(message);

                        byte[] body = Encoding.UTF8.GetBytes(message);
                        foreach (KeyValuePair<string, IModel> item in channels)
                        {
                            // BasicPublish(exchange, routingKey, ...): this targets an
                            // exchange named like the queue, with an empty routing key.
                            // NOTE(review): confirm that exchange and its binding exist on
                            // the broker; Execute only declares the queue.
                            item.Value.BasicPublish(DeviceHistoryName, "", properties[item.Key], body);
                        }

                        // Remember how far this factory has been uploaded.
                        string updatesql = "UPDATE [dbo].[uploadList] SET uploadTime = '" + time + "' WHERE [id] = " + factoryRow["id"].ToString();
                        waterFHelper.ExecuteNonQuery(updatesql);
                    }
                }

                log.Info("水厂历史记录同步任务执行结束.................\r\n");
            }
            catch (Exception ex)
            {
                log.Error("水厂历史记录同步任务执行错误:" + ex.Message + "===========" + ex.StackTrace + "\r\n");
            }
        }

        /// <summary>
        /// Reads one flow-history row of a device from the yearly history table
        /// <c>历史记录_{deviceNo}_{year}</c> whose collect time is at or after
        /// <paramref name="time"/>.
        /// </summary>
        /// <param name="deviceNo">Device number embedded in the table name.</param>
        /// <param name="year">Four-digit year embedded in the table name.</param>
        /// <param name="time">Lower bound for the collect time, already formatted.</param>
        /// <returns>The first returned row, or <c>null</c> when the query yields nothing.</returns>
        private static DataRow QueryFirstFlowRow(string deviceNo, string year, string time)
        {
            // NOTE(review): TOP 1 without ORDER BY — the row picked is not guaranteed to be
            // the earliest one; confirm whether ORDER BY 采集时间 is needed.
            string sql = @"SELECT TOP 1 [id],[设备ID],[记录时间],[采集时间],[设备状态],[通讯状态],[净累计流量],[瞬时流量] FROM [dbo].[历史记录_" + deviceNo + "_" + year + "] where 采集时间 >= '" + time + "'";
            DataTable table = dbHelper.Fill(sql);
            if (table == null || table.Rows.Count == 0)
            {
                return null;
            }
            return table.Rows[0];
        }

        /// <summary>
        /// Builds the dictionary that is serialized as one entry of <c>MeterData</c>.
        /// <c>Pressure</c> is always sent as <c>null</c>.
        /// </summary>
        /// <param name="meterCode">Value of the <c>MeterCode</c> key.</param>
        /// <param name="readTime">Value of the <c>ReadTime</c> key.</param>
        /// <param name="instantaneousFlow">Value of <c>InstantaneousFlow</c>, may be null.</param>
        /// <param name="netCumulativeFlow">Value of <c>NetCumulativeFlow</c>, may be null.</param>
        /// <param name="ph">Value of <c>PH</c>, may be null.</param>
        /// <param name="chlorine">Value of <c>Chlorine</c>, may be null.</param>
        /// <param name="turbidity">Value of <c>Turbidity</c>, may be null.</param>
        /// <param name="turbidityTwo">Value of <c>TurbidityTwo</c>, may be null.</param>
        /// <returns>A new dictionary holding the nine meter keys.</returns>
        private static Dictionary<string, object> BuildMeterReading(
            string meterCode,
            string readTime,
            object instantaneousFlow,
            object netCumulativeFlow,
            object ph,
            object chlorine,
            object turbidity,
            object turbidityTwo)
        {
            Dictionary<string, object> reading = new Dictionary<string, object>();
            reading["InstantaneousFlow"] = instantaneousFlow;
            reading["NetCumulativeFlow"] = netCumulativeFlow;
            reading["Pressure"] = null;
            reading["PH"] = ph;
            reading["Chlorine"] = chlorine;
            reading["Turbidity"] = turbidity;
            reading["TurbidityTwo"] = turbidityTwo;
            reading["ReadTime"] = readTime;
            reading["MeterCode"] = meterCode;
            return reading;
        }

        /// <summary>
        /// Provider for the database configured in <c>Constants.DbConncetion</c>
        /// (holds the per-device yearly history tables queried by this job).
        /// </summary>
        static IDbProvider dbHelper
        {
            get
            {
                var DbDefine = DbFactoryProvider.GetProvider(CurrentDbType.SqlServer, Constants.DbConncetion);
                return DbDefine;
            }
        }

        /// <summary>
        /// Provider for the water-factory database configured in
        /// <c>Constants.changleWaterFactoryDb</c> (upload list and water-quality records).
        /// </summary>
        static IDbProvider waterFHelper
        {
            get
            {
                var DbDefine = DbFactoryProvider.GetProvider(CurrentDbType.SqlServer, Constants.changleWaterFactoryDb);
                return DbDefine;
            }
        }
    }
}