|
@@ -0,0 +1,336 @@
|
|
|
|
+using log4net;
|
|
|
|
+using Newtonsoft.Json;
|
|
|
|
+using Quartz;
|
|
|
|
+using RabbitMQ.Client;
|
|
|
|
+using RDIFramework.Utilities;
|
|
|
|
+using System;
|
|
|
|
+using System.Collections;
|
|
|
|
+using System.Collections.Generic;
|
|
|
|
+using System.Data;
|
|
|
|
+using System.Linq;
|
|
|
|
+using System.Text;
|
|
|
|
+
|
|
|
|
+namespace TimedUpload.QuartzJobs
|
|
|
|
+{
|
|
|
|
+ [DisallowConcurrentExecution]
|
|
|
|
+ public class ChangleWaterFactoryDataJob : IJob
|
|
|
|
+ {
|
|
|
|
+ private readonly ILog log = LogManager.GetLogger(typeof(ChangleWaterFactoryDataJob));
|
|
|
|
+
|
|
|
|
+ public void Execute(IJobExecutionContext context)
|
|
|
|
+ {
|
|
|
|
+ string[] uploadUrls = Constants.UploadUrl.Split('|');
|
|
|
|
+ Dictionary<string, IConnection> connections = new Dictionary<string, IConnection>();
|
|
|
|
+ Dictionary<string, IModel> channels = new Dictionary<string, IModel>();
|
|
|
|
+ Dictionary<string, IBasicProperties> properties = new Dictionary<string, IBasicProperties>();
|
|
|
|
+
|
|
|
|
+ foreach (string uploadUrl in uploadUrls)
|
|
|
|
+ {
|
|
|
|
+ ConnectionFactory factory = new ConnectionFactory();
|
|
|
|
+ factory.HostName = uploadUrl;//主机名,Rabbit会拿这个IP生成一个endpoint,这个很熟悉吧,就是socket绑定的那个终结点。
|
|
|
|
+ factory.UserName = Constants.UploadUserName;//默认用户名,用户可以在服务端自定义创建,有相关命令行
|
|
|
|
+ factory.Password = Constants.UploadPassword;//默认密码
|
|
|
|
+
|
|
|
|
+ factory.AutomaticRecoveryEnabled = true; // 链接断开会自动重连
|
|
|
|
+
|
|
|
|
+ IConnection connection = factory.CreateConnection();
|
|
|
|
+ IModel channel = connection.CreateModel();
|
|
|
|
+ channel.QueueDeclare("waterFactory.deviceHis", true, false, false, null);//创建一个名称为kibaqueue的消息队列
|
|
|
|
+
|
|
|
|
+ IBasicProperties property = channel.CreateBasicProperties();
|
|
|
|
+ property.ContentType = "text/plain";
|
|
|
|
+ property.DeliveryMode = 2; //持久化
|
|
|
|
+ connections.Add(uploadUrl, connection);
|
|
|
|
+ properties.Add(uploadUrl, property);
|
|
|
|
+ channels.Add(uploadUrl, channel);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ if (channels.Count > 0)
|
|
|
|
+ {
|
|
|
|
+ SendWaterFactoryHis(channels, properties);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ foreach (KeyValuePair<string, IConnection> item in connections)
|
|
|
|
+ {
|
|
|
|
+ IConnection connection = item.Value;
|
|
|
|
+ connection.Close();
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /// <summary>
|
|
|
|
+ /// 水厂历史数据
|
|
|
|
+ /// </summary>
|
|
|
|
+ /// <param name="channels"></param>
|
|
|
|
+ /// <param name="properties"></param>
|
|
|
|
+ private void SendWaterFactoryHis(Dictionary<string, IModel> channels, Dictionary<string, IBasicProperties> properties)
|
|
|
|
+ {
|
|
|
|
+ try
|
|
|
|
+ {
|
|
|
|
+ Object inPh= null, inChlorine = null, inTurbidty = null, inTurbidtytwo = null, outPh = null, outChlorine = null, outTurbidty = null, level = null;
|
|
|
|
+
|
|
|
|
+ DateTime nowTime = DateTime.Now;
|
|
|
|
+ log.Info("昌乐水厂数据同步任务开始执行.................\r\n");
|
|
|
|
+ string factorySql = "SELECT [id],[inMeterId],[outMeterId],[uploadTime],[waterFactory] FROM [dbo].[uploadList]";
|
|
|
|
+ DataTable dtFactory = waterFHelper.Fill(factorySql);
|
|
|
|
+ if (dtFactory == null || dtFactory.Rows.Count == 0)
|
|
|
|
+ {
|
|
|
|
+ return;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // 处理水厂水表
|
|
|
|
+ for (int i = 0; i < dtFactory.Rows.Count; i++)
|
|
|
|
+ {
|
|
|
|
+ string factoryCode = dtFactory.Rows[i]["waterFactory"].ToString();
|
|
|
|
+
|
|
|
|
+ // DataTable dtMeter = dbHelper.Fill(meterSql);
|
|
|
|
+
|
|
|
|
+ //if (dtMeter == null || dtMeter.Rows.Count == 0)
|
|
|
|
+ //{
|
|
|
|
+ // continue;
|
|
|
|
+ //}
|
|
|
|
+ Dictionary<string, object> factoryMap = new Dictionary<string, object>();
|
|
|
|
+ factoryMap["FactoryCode"] = factoryCode;
|
|
|
|
+
|
|
|
|
+ // 处理水厂基础数据
|
|
|
|
+ // string baseSql = "SELECT RTRIM(日期) + ' ' + RTRIM(时间) ReadTime,反冲正累计流量 - 反冲负累计流量 BackWashing,[清水池液位] LiquidHeight,更新时间 FROM [dbo].[水厂泵房2] WHERE FactoryId = " + factoryId + " ORDER BY 日期, 时间 ";
|
|
|
|
+ // DataTable baseDt = dbHelper.Fill(baseSql);
|
|
|
|
+ DateTime searchTime = Convert.ToDateTime(dtFactory.Rows[i]["uploadTime"]);
|
|
|
|
+ TimeSpan sTs = new TimeSpan(searchTime.Ticks);
|
|
|
|
+ TimeSpan eTs = new TimeSpan(nowTime.Ticks);
|
|
|
|
+ TimeSpan ts = eTs - sTs;
|
|
|
|
+ int num = (int)ts.TotalMinutes / 5;
|
|
|
|
+ if (num > 200) {
|
|
|
|
+ num = 200;
|
|
|
|
+ }
|
|
|
|
+ for (int l = 0; l < num; l++)
|
|
|
|
+ {
|
|
|
|
+ log.Info(l);
|
|
|
|
+ // DataRow baseRow = baseDt.Rows[l];
|
|
|
|
+ Dictionary<string, object> baseMap = new Dictionary<string, object>();
|
|
|
|
+ ArrayList meterDataList = new ArrayList();
|
|
|
|
+ searchTime = searchTime.AddMinutes(5);
|
|
|
|
+ string time = searchTime.ToString("yyyy/MM/dd HH:mm:ss");// baseRow["ReadTime"].ToString();
|
|
|
|
+ string year = searchTime.Year.ToString();
|
|
|
|
+ baseMap["ReadTime"] = time;
|
|
|
|
+ baseMap["BackWashing"] = "0";//baseRow["BackWashing"].ToString();
|
|
|
|
+
|
|
|
|
+ baseMap["Consumption"] = "0";
|
|
|
|
+ baseMap["CleanWaterFlow"] = "0";
|
|
|
|
+ baseMap["Dosage"] = "0";
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+ // 进水
|
|
|
|
+ string insql = @"SELECT TOP 1 [id],[设备ID],[记录时间],[采集时间],[设备状态],[通讯状态],[净累计流量],[瞬时流量] FROM [dbo].[历史记录_000523_" + year + "] where 采集时间 >= '" + time + "'";
|
|
|
|
+ DataTable inDt = dbHelper.Fill(insql);
|
|
|
|
+ if (inDt != null && inDt.Rows.Count > 0)
|
|
|
|
+ {
|
|
|
|
+ Dictionary<string, object> meterMap = new Dictionary<string, object>();
|
|
|
|
+ meterMap["InstantaneousFlow"] = inDt.Rows[0]["瞬时流量"];
|
|
|
|
+ meterMap["NetCumulativeFlow"] = inDt.Rows[0]["净累计流量"];
|
|
|
|
+ meterMap["Pressure"] = null;
|
|
|
|
+ meterMap["PH"] = null;
|
|
|
|
+ meterMap["Chlorine"] = null;
|
|
|
|
+ meterMap["Turbidity"] = null;
|
|
|
|
+ meterMap["TurbidityTwo"] = null;
|
|
|
|
+ meterMap["ReadTime"] = time;
|
|
|
|
+ meterMap["MeterCode"] = "wwkj001";
|
|
|
|
+ meterDataList.Add(meterMap);
|
|
|
|
+ }
|
|
|
|
+ else
|
|
|
|
+ {
|
|
|
|
+ Dictionary<string, object> meterMap = new Dictionary<string, object>();
|
|
|
|
+ meterMap["InstantaneousFlow"] = null;
|
|
|
|
+ meterMap["NetCumulativeFlow"] = null;
|
|
|
|
+ meterMap["Pressure"] = null;
|
|
|
|
+ meterMap["PH"] = null;
|
|
|
|
+ meterMap["Chlorine"] = null;
|
|
|
|
+ meterMap["Turbidity"] = null;
|
|
|
|
+ meterMap["TurbidityTwo"] = null;
|
|
|
|
+ meterMap["ReadTime"] = time;
|
|
|
|
+ meterMap["MeterCode"] = "wwkj001";
|
|
|
|
+ meterDataList.Add(meterMap);
|
|
|
|
+ }
|
|
|
|
+ // 出水
|
|
|
|
+ string outsql = @"SELECT TOP 1 [id],[设备ID],[记录时间],[采集时间],[设备状态],[通讯状态],[净累计流量],[瞬时流量] FROM [dbo].[历史记录_000286_" + year + "] where 采集时间 >= '" + time + "'"; ;
|
|
|
|
+ DataTable outDt = dbHelper.Fill(outsql);
|
|
|
|
+ if (outDt != null && outDt.Rows.Count > 0)
|
|
|
|
+ {
|
|
|
|
+ Dictionary<string, object> meterMap = new Dictionary<string, object>();
|
|
|
|
+ meterMap["InstantaneousFlow"] = outDt.Rows[0]["瞬时流量"];
|
|
|
|
+ meterMap["NetCumulativeFlow"] = outDt.Rows[0]["净累计流量"];
|
|
|
|
+ meterMap["Pressure"] = null;
|
|
|
|
+ meterMap["PH"] = null;
|
|
|
|
+ meterMap["Chlorine"] = null;
|
|
|
|
+ meterMap["Turbidity"] = null;
|
|
|
|
+ meterMap["TurbidityTwo"] = null;
|
|
|
|
+ meterMap["ReadTime"] = time;
|
|
|
|
+ meterMap["MeterCode"] = "wwkj005";
|
|
|
|
+ meterDataList.Add(meterMap);
|
|
|
|
+ }
|
|
|
|
+ else
|
|
|
|
+ {
|
|
|
|
+ Dictionary<string, object> meterMap = new Dictionary<string, object>();
|
|
|
|
+ meterMap["InstantaneousFlow"] = null;
|
|
|
|
+ meterMap["NetCumulativeFlow"] = null;
|
|
|
|
+ meterMap["Pressure"] = null;
|
|
|
|
+ meterMap["PH"] = null;
|
|
|
|
+ meterMap["Chlorine"] = null;
|
|
|
|
+ meterMap["Turbidity"] = null;
|
|
|
|
+ meterMap["TurbidityTwo"] = null;
|
|
|
|
+ meterMap["ReadTime"] = time;
|
|
|
|
+ meterMap["MeterCode"] = "wwkj005";
|
|
|
|
+ meterDataList.Add(meterMap);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // 进水水质和出水水质
|
|
|
|
+ string waterQuelitySql = @"SELECT TOP 1 [id]
|
|
|
|
+ ,[originWaterZD],[originWaterPH],[chendianchiOutWaterZD],[lvhouZD]
|
|
|
|
+ ,[lvhouYulv],[qingshuichiFlow],[gongshuishuitaOutFlow]
|
|
|
|
+ ,[outWaterZD],[outWaterPH],[outWaterYulv]
|
|
|
|
+ ,[qingshuichiFluidLevel],[CollectTime],[CreateDt]
|
|
|
|
+ FROM[dbo].[WaterFactoryDaliyRecord] where CollectTime >= '" + searchTime.ToString("yyyy-MM-dd HH:mm") + "' ";
|
|
|
|
+
|
|
|
|
+ DataTable wqDt = waterFHelper.Fill(waterQuelitySql);
|
|
|
|
+ if (wqDt != null && wqDt.Rows.Count > 0)
|
|
|
|
+ {
|
|
|
|
+ inPh = wqDt.Rows[0]["originWaterPH"];
|
|
|
|
+ inChlorine = wqDt.Rows[0]["lvhouYulv"];
|
|
|
|
+ inTurbidty = wqDt.Rows[0]["originWaterZD"];
|
|
|
|
+ inTurbidtytwo = wqDt.Rows[0]["lvhouZD"];
|
|
|
|
+ outPh = wqDt.Rows[0]["outWaterPH"];
|
|
|
|
+ outChlorine = wqDt.Rows[0]["outWaterYulv"];
|
|
|
|
+ outTurbidty = wqDt.Rows[0]["outWaterZD"];
|
|
|
|
+ level = wqDt.Rows[0]["qingshuichiFluidLevel"];
|
|
|
|
+ }
|
|
|
|
+ Dictionary<string, object> meterMap1 = new Dictionary<string, object>(); // 进水
|
|
|
|
+ meterMap1["InstantaneousFlow"] = null;
|
|
|
|
+ meterMap1["NetCumulativeFlow"] = null;
|
|
|
|
+ meterMap1["Pressure"] = null;
|
|
|
|
+ meterMap1["PH"] = inPh;
|
|
|
|
+ meterMap1["Chlorine"] = inChlorine;
|
|
|
|
+ meterMap1["Turbidity"] = inTurbidty;
|
|
|
|
+ meterMap1["TurbidityTwo"] = inTurbidtytwo;
|
|
|
|
+ meterMap1["ReadTime"] = time;
|
|
|
|
+ meterMap1["MeterCode"] = "wwkj008";
|
|
|
|
+ meterDataList.Add(meterMap1);
|
|
|
|
+ Dictionary<string, object> meterMap2 = new Dictionary<string, object>(); // 出水
|
|
|
|
+ meterMap2["InstantaneousFlow"] = null;
|
|
|
|
+ meterMap2["NetCumulativeFlow"] = null;
|
|
|
|
+ meterMap2["PH"] = outPh;
|
|
|
|
+ meterMap2["Chlorine"] = outChlorine;
|
|
|
|
+ meterMap2["Turbidity"] = outTurbidty;
|
|
|
|
+ meterMap2["TurbidityTwo"] = null;
|
|
|
|
+ meterMap2["Pressure"] = null;
|
|
|
|
+ meterMap2["ReadTime"] = time;
|
|
|
|
+ meterMap2["MeterCode"] = "wwkj004";
|
|
|
|
+ meterDataList.Add(meterMap2);
|
|
|
|
+
|
|
|
|
+ baseMap["LiquidHeight"] = level;//baseRow["LiquidHeight"].ToString();
|
|
|
|
+
|
|
|
|
+ #region
|
|
|
|
+ // 处理水厂查询的数据
|
|
|
|
+ //for (int j = 0; j < dtMeter.Rows.Count; j++)
|
|
|
|
+ //{
|
|
|
|
+ // string tableName = dtMeter.Rows[j]["TableName"].ToString();
|
|
|
|
+ // string meterId = dtMeter.Rows[j]["MeterId"].ToString();
|
|
|
|
+ // string meterCode = dtMeter.Rows[j]["MeterCode"].ToString();
|
|
|
|
+
|
|
|
|
+ // string colSql = "select * from 水厂数据字段 where MeterId = " + meterId;
|
|
|
|
+ // DataTable dtMeterCol = dbHelper.Fill(colSql);
|
|
|
|
+ // if (dtMeterCol == null || dtMeterCol.Rows.Count == 0)
|
|
|
|
+ // {
|
|
|
|
+ // continue;
|
|
|
|
+ // }
|
|
|
|
+ // string queryCol = "";
|
|
|
|
+ // string[] queryColArr = new string[dtMeterCol.Rows.Count];
|
|
|
|
+ // for (int k = 0; k < dtMeterCol.Rows.Count; k++)
|
|
|
|
+ // {
|
|
|
|
+ // string tableColumn = dtMeterCol.Rows[k]["TableColumn"].ToString();
|
|
|
|
+ // string sendKey = dtMeterCol.Rows[k]["SendKey"].ToString();
|
|
|
|
+ // if (k == dtMeterCol.Rows.Count - 1)
|
|
|
|
+ // {
|
|
|
|
+ // queryCol += tableColumn + " " + sendKey;
|
|
|
|
+ // }
|
|
|
|
+ // else
|
|
|
|
+ // {
|
|
|
|
+ // queryCol += tableColumn + " " + sendKey + ",";
|
|
|
|
+ // }
|
|
|
|
+ // queryColArr[k] = sendKey;
|
|
|
|
+ // }
|
|
|
|
+
|
|
|
|
+ // Dictionary<string, object> meterMap = new Dictionary<string, object>();
|
|
|
|
+ // string[] colArr = waterFactoryColumn.Split(',');
|
|
|
|
+ // foreach (string col in colArr)
|
|
|
|
+ // {
|
|
|
|
+ // meterMap[col] = null;
|
|
|
|
+ // }
|
|
|
|
+ // string meterDataSql = "select " + queryCol + " from " + tableName + " where 更新时间 = '" + id + "'";
|
|
|
|
+ // DataTable data = dbHelper.Fill(meterDataSql);
|
|
|
|
+ // if (data == null || data.Rows.Count == 0)
|
|
|
|
+ // {
|
|
|
|
+ // continue;
|
|
|
|
+ // }
|
|
|
|
+ // foreach (string col in queryColArr)
|
|
|
|
+ // {
|
|
|
|
+ // meterMap[col] = data.Rows[0][col];
|
|
|
|
+ // }
|
|
|
|
+ // meterMap["MeterCode"] = meterCode;
|
|
|
|
+ // meterDataList.Add(meterMap);
|
|
|
|
+ //}
|
|
|
|
+ #endregion
|
|
|
|
+
|
|
|
|
+ factoryMap["BaseData"] = baseMap;
|
|
|
|
+ factoryMap["MeterData"] = meterDataList;
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+ string message = JsonConvert.SerializeObject(factoryMap);
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+ foreach (KeyValuePair<string, IModel> item in channels)
|
|
|
|
+ {
|
|
|
|
+ string key = item.Key;
|
|
|
|
+ IModel channel = item.Value;
|
|
|
|
+ IBasicProperties property = properties[key];
|
|
|
|
+ channel.BasicPublish("waterFactory.deviceHis", "", property, Encoding.UTF8.GetBytes(message)); //生产消息
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // 删除数据
|
|
|
|
+ String updatesql = "UPDATE [dbo].[uploadList] SET uploadTime = '" + time + "' WHERE [id] = " + dtFactory.Rows[i]["id"].ToString();
|
|
|
|
+
|
|
|
|
+ waterFHelper.ExecuteNonQuery(updatesql);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ log.Info("水厂历史记录同步任务执行结束.................\r\n");
|
|
|
|
+ }
|
|
|
|
+ catch (Exception ex)
|
|
|
|
+ {
|
|
|
|
+ log.Error("水厂历史记录同步任务执行错误:" + ex.Message + "===========" + ex.StackTrace + "\r\n");
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /// <summary>
|
|
|
|
+ /// 大表数据
|
|
|
|
+ /// </summary>
|
|
|
|
+ static IDbProvider dbHelper
|
|
|
|
+ {
|
|
|
|
+ get
|
|
|
|
+ {
|
|
|
|
+ var DbDefine = DbFactoryProvider.GetProvider(CurrentDbType.SqlServer, Constants.DbConncetion);
|
|
|
|
+ return DbDefine;
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /// <summary>
|
|
|
|
+ /// 水厂数据库
|
|
|
|
+ /// </summary>
|
|
|
|
+ static IDbProvider waterFHelper
|
|
|
|
+ {
|
|
|
|
+ get
|
|
|
|
+ {
|
|
|
|
+ var DbDefine = DbFactoryProvider.GetProvider(CurrentDbType.SqlServer, Constants.changleWaterFactoryDb);
|
|
|
|
+ return DbDefine;
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+}
|