123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337 |
using System;
using System.Collections;
using System.Collections.Generic;
using System.Data;
using System.Globalization;
using System.Linq;
using System.Text;
using log4net;
using Newtonsoft.Json;
using Quartz;
using RabbitMQ.Client;
using RDIFramework.Utilities;
namespace TimedUpload.QuartzJobs
{
    /// <summary>
    /// Quartz job that uploads Changle water-factory history data.
    /// For every row of <c>[dbo].[uploadList]</c> it walks forward from the row's
    /// <c>uploadTime</c> in 5-minute steps, builds one JSON message per step from the
    /// flow-meter history tables and the <c>WaterFactoryDaliyRecord</c> table, publishes
    /// it to every configured RabbitMQ host and then advances <c>uploadTime</c>.
    /// </summary>
    [DisallowConcurrentExecution]
    public class ChangleWaterFactoryDataJob : IJob
    {
        /// <summary>Name used both for the declared queue and as the publish target.</summary>
        private const string QueueName = "waterFactory.deviceHis";

        /// <summary>Distance, in minutes, between two consecutive uploaded records.</summary>
        private const int StepMinutes = 5;

        /// <summary>Upper bound on the number of records sent per factory in one run.</summary>
        private const int MaxStepsPerRun = 200;

        /// <summary>Timestamp format used in the payload and in the SQL literals.</summary>
        private const string ReadTimeFormat = "yyyy/MM/dd HH:mm:ss";

        private readonly ILog log = LogManager.GetLogger(typeof(ChangleWaterFactoryDataJob));

        /// <summary>
        /// Opens one RabbitMQ connection and channel per host listed in
        /// <c>Constants.UploadUrl</c> (separated by '|'), runs the upload and closes
        /// every connection again.
        /// </summary>
        /// <param name="context">Quartz execution context; not used by this job.</param>
        /// <remarks>
        /// Exceptions raised while connecting still propagate to the scheduler, so an
        /// unreachable host aborts the run before anything is published. Connections
        /// that were already opened are closed in that case as well.
        /// </remarks>
        public void Execute(IJobExecutionContext context)
        {
            Dictionary<string, IConnection> connections = new Dictionary<string, IConnection>();
            Dictionary<string, IModel> channels = new Dictionary<string, IModel>();
            Dictionary<string, IBasicProperties> properties = new Dictionary<string, IBasicProperties>();

            try
            {
                foreach (string rawUrl in Constants.UploadUrl.Split('|'))
                {
                    string uploadUrl = rawUrl.Trim();

                    // A trailing '|' or a repeated host would otherwise produce an empty
                    // host name or a duplicate-key exception from Dictionary.Add.
                    if (uploadUrl.Length == 0 || connections.ContainsKey(uploadUrl))
                    {
                        continue;
                    }

                    ConnectionFactory factory = new ConnectionFactory();
                    factory.HostName = uploadUrl;
                    factory.UserName = Constants.UploadUserName;
                    factory.Password = Constants.UploadPassword;
                    factory.AutomaticRecoveryEnabled = true; // reconnect automatically when the link drops

                    IConnection connection = factory.CreateConnection();

                    // Register the connection before doing anything else with it so the
                    // finally block closes it even if channel setup fails.
                    connections.Add(uploadUrl, connection);

                    IModel channel = connection.CreateModel();
                    channel.QueueDeclare(QueueName, true, false, false, null); // durable, non-exclusive, no auto-delete

                    IBasicProperties property = channel.CreateBasicProperties();
                    property.ContentType = "text/plain";
                    property.DeliveryMode = 2; // persistent

                    properties.Add(uploadUrl, property);
                    channels.Add(uploadUrl, channel);
                }

                if (channels.Count > 0)
                {
                    SendWaterFactoryHis(channels, properties);
                }
            }
            finally
            {
                foreach (IConnection connection in connections.Values)
                {
                    try
                    {
                        connection.Close();
                    }
                    catch (Exception ex)
                    {
                        // Keep closing the remaining connections; a failed close must
                        // not hide the original error or leak the other connections.
                        log.Warn("Failed to close RabbitMQ connection: " + ex.Message);
                    }
                }
            }
        }

        /// <summary>
        /// Builds and publishes the water-factory history records.
        /// </summary>
        /// <param name="channels">Open channels keyed by upload host.</param>
        /// <param name="properties">Message properties keyed by the same upload host.</param>
        /// <remarks>
        /// Any exception is logged and swallowed. Because <c>uploadTime</c> is updated
        /// only after a step has been published, the next run resumes at the step that
        /// failed.
        /// </remarks>
        private void SendWaterFactoryHis(Dictionary<string, IModel> channels, Dictionary<string, IBasicProperties> properties)
        {
            try
            {
                DateTime nowTime = DateTime.Now;
                log.Info("昌乐水厂数据同步任务开始执行.................\r\n");

                string factorySql = "SELECT [id],[inMeterId],[outMeterId],[uploadTime],[waterFactory] FROM [dbo].[uploadList]";
                DataTable dtFactory = waterFHelper.Fill(factorySql);
                if (dtFactory == null || dtFactory.Rows.Count == 0)
                {
                    return;
                }

                foreach (DataRow factoryRow in dtFactory.Rows)
                {
                    string factoryCode = factoryRow["waterFactory"].ToString();
                    DateTime searchTime = Convert.ToDateTime(factoryRow["uploadTime"]);

                    // Number of whole 5-minute steps between the last upload and now,
                    // clamped before the cast so a very old uploadTime cannot overflow.
                    double elapsedMinutes = (nowTime - searchTime).TotalMinutes;
                    int steps = (int)Math.Min(elapsedMinutes / StepMinutes, MaxStepsPerRun);

                    for (int step = 0; step < steps; step++)
                    {
                        log.Info(step);

                        searchTime = searchTime.AddMinutes(StepMinutes);

                        // Invariant culture: '/' and ':' in a custom format are replaced
                        // by the current culture's separators otherwise, and this text
                        // is embedded in SQL literals and in the payload.
                        string time = searchTime.ToString(ReadTimeFormat, CultureInfo.InvariantCulture);
                        string year = searchTime.Year.ToString(CultureInfo.InvariantCulture);

                        Dictionary<string, object> baseMap = new Dictionary<string, object>();
                        baseMap["ReadTime"] = time;
                        baseMap["BackWashing"] = "0";
                        baseMap["Consumption"] = "0";
                        baseMap["CleanWaterFlow"] = "0";
                        baseMap["Dosage"] = "0";

                        List<Dictionary<string, object>> meterDataList = new List<Dictionary<string, object>>();

                        // Inlet and outlet flow meters.
                        meterDataList.Add(BuildFlowMeterRecord("000523", "wwkj001", year, time));
                        meterDataList.Add(BuildFlowMeterRecord("000286", "wwkj005", year, time));

                        // Inlet and outlet water quality. The row is looked up per step,
                        // so a step without a matching row reports nulls instead of
                        // repeating values read for an earlier step or another factory.
                        DataRow quality = ReadWaterQualityRow(searchTime);

                        meterDataList.Add(BuildMeterRecord(
                            "wwkj008", time, null, null,
                            Field(quality, "originWaterPH"),
                            Field(quality, "lvhouYulv"),
                            Field(quality, "originWaterZD"),
                            Field(quality, "lvhouZD")));

                        meterDataList.Add(BuildMeterRecord(
                            "wwkj004", time, null, null,
                            Field(quality, "outWaterPH"),
                            Field(quality, "outWaterYulv"),
                            Field(quality, "outWaterZD"),
                            null));

                        baseMap["LiquidHeight"] = Field(quality, "qingshuichiFluidLevel");

                        Dictionary<string, object> factoryMap = new Dictionary<string, object>();
                        factoryMap["FactoryCode"] = factoryCode;
                        factoryMap["BaseData"] = baseMap;
                        factoryMap["MeterData"] = meterDataList;

                        // Serialize and encode once, then send the same bytes everywhere.
                        byte[] body = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(factoryMap));

                        foreach (KeyValuePair<string, IModel> item in channels)
                        {
                            // NOTE(review): the first BasicPublish argument is the exchange
                            // name and the second the routing key, so this targets an
                            // exchange called "waterFactory.deviceHis" rather than the queue
                            // declared in Execute. Confirm that exchange exists on the broker.
                            item.Value.BasicPublish(QueueName, "", properties[item.Key], body);
                        }

                        // Remember how far this factory has been uploaded.
                        string updateSql = "UPDATE [dbo].[uploadList] SET uploadTime = '" + time + "' WHERE [id] = " + factoryRow["id"].ToString();
                        waterFHelper.ExecuteNonQuery(updateSql);
                    }
                }

                log.Info("水厂历史记录同步任务执行结束.................\r\n");
            }
            catch (Exception ex)
            {
                log.Error("水厂历史记录同步任务执行错误:" + ex.Message + "===========" + ex.StackTrace + "\r\n");
            }
        }

        /// <summary>
        /// Reads one flow-meter history row collected at or after <paramref name="time"/>
        /// and turns it into a meter record. Flow values are null when no row is found.
        /// </summary>
        /// <param name="deviceNo">Device number that is part of the history table name.</param>
        /// <param name="meterCode">Value written to the record's <c>MeterCode</c> key.</param>
        /// <param name="year">Year suffix of the history table name.</param>
        /// <param name="time">Formatted read time, used in the filter and in the record.</param>
        /// <returns>The meter record for the payload's <c>MeterData</c> list.</returns>
        private static Dictionary<string, object> BuildFlowMeterRecord(string deviceNo, string meterCode, string year, string time)
        {
            // NOTE(review): TOP 1 without ORDER BY does not guarantee which matching row
            // is returned. Left unchanged because the indexing of the history tables is
            // not visible here.
            string sql = @"SELECT TOP 1 [id],[设备ID],[记录时间],[采集时间],[设备状态],[通讯状态],[净累计流量],[瞬时流量] FROM [dbo].[历史记录_" + deviceNo + "_" + year + "] where 采集时间 >= '" + time + "'";
            DataTable table = dbHelper.Fill(sql);

            object instantaneousFlow = null;
            object netCumulativeFlow = null;
            if (table != null && table.Rows.Count > 0)
            {
                instantaneousFlow = table.Rows[0]["瞬时流量"];
                netCumulativeFlow = table.Rows[0]["净累计流量"];
            }

            return BuildMeterRecord(meterCode, time, instantaneousFlow, netCumulativeFlow, null, null, null, null);
        }

        /// <summary>
        /// Reads one water-quality row collected at or after <paramref name="searchTime"/>.
        /// </summary>
        /// <param name="searchTime">Lower bound for <c>CollectTime</c> (minute precision).</param>
        /// <returns>The row, or null when the query returns nothing.</returns>
        private static DataRow ReadWaterQualityRow(DateTime searchTime)
        {
            string sql = @"SELECT TOP 1 [id]
                ,[originWaterZD],[originWaterPH],[chendianchiOutWaterZD],[lvhouZD]
                ,[lvhouYulv],[qingshuichiFlow],[gongshuishuitaOutFlow]
                ,[outWaterZD],[outWaterPH],[outWaterYulv]
                ,[qingshuichiFluidLevel],[CollectTime],[CreateDt]
                FROM[dbo].[WaterFactoryDaliyRecord] where CollectTime >= '" + searchTime.ToString("yyyy-MM-dd HH:mm", CultureInfo.InvariantCulture) + "' ";
            DataTable table = waterFHelper.Fill(sql);

            if (table == null || table.Rows.Count == 0)
            {
                return null;
            }

            return table.Rows[0];
        }

        /// <summary>
        /// Creates one entry of the payload's <c>MeterData</c> list. <c>Pressure</c> is
        /// always null because no source in this job provides it.
        /// </summary>
        private static Dictionary<string, object> BuildMeterRecord(
            string meterCode,
            string readTime,
            object instantaneousFlow,
            object netCumulativeFlow,
            object ph,
            object chlorine,
            object turbidity,
            object turbidityTwo)
        {
            Dictionary<string, object> record = new Dictionary<string, object>();
            record["InstantaneousFlow"] = instantaneousFlow;
            record["NetCumulativeFlow"] = netCumulativeFlow;
            record["Pressure"] = null;
            record["PH"] = ph;
            record["Chlorine"] = chlorine;
            record["Turbidity"] = turbidity;
            record["TurbidityTwo"] = turbidityTwo;
            record["ReadTime"] = readTime;
            record["MeterCode"] = meterCode;
            return record;
        }

        /// <summary>Returns the column value of <paramref name="row"/>, or null when there is no row.</summary>
        private static object Field(DataRow row, string column)
        {
            return row == null ? null : row[column];
        }

        /// <summary>
        /// Provider for the database holding the large flow-meter history tables.
        /// A provider is requested from the factory on every access.
        /// </summary>
        private static IDbProvider dbHelper
        {
            get
            {
                return DbFactoryProvider.GetProvider(CurrentDbType.SqlServer, Constants.DbConncetion);
            }
        }

        /// <summary>
        /// Provider for the water-factory database (uploadList, WaterFactoryDaliyRecord).
        /// A provider is requested from the factory on every access.
        /// </summary>
        private static IDbProvider waterFHelper
        {
            get
            {
                return DbFactoryProvider.GetProvider(CurrentDbType.SqlServer, Constants.changleWaterFactoryDb);
            }
        }
    }
}
|