123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205 |
- using log4net;
- using Newtonsoft.Json;
- using Quartz;
- using RabbitMQ.Client;
- using RDIFramework.Utilities;
- using System;
- using System.Collections;
- using System.Collections.Generic;
- using System.Data;
- using System.Text;
- namespace TimedUpload.QuartzJobs
- {
- [DisallowConcurrentExecution]
- public class WaterFactoryDataUploadJob : IJob
- {
- private readonly ILog log = LogManager.GetLogger(typeof(WaterFactoryDataUploadJob));
- public void Execute(IJobExecutionContext context)
- {
- string[] uploadUrls = Constants.UploadUrl.Split('|');
- Dictionary<string, IConnection> connections = new Dictionary<string, IConnection>();
- Dictionary<string, IModel> channels = new Dictionary<string, IModel>();
- Dictionary<string, IBasicProperties> properties = new Dictionary<string, IBasicProperties>();
- foreach (string uploadUrl in uploadUrls)
- {
- ConnectionFactory factory = new ConnectionFactory();
- factory.HostName = uploadUrl;
- factory.UserName = Constants.UploadUserName;
- factory.Password = Constants.UploadPassword;
- factory.AutomaticRecoveryEnabled = true;
- IConnection connection = factory.CreateConnection();
- IModel channel = connection.CreateModel();
- channel.QueueDeclare("waterFactory.deviceHis", true, false, false, null);
- IBasicProperties property = channel.CreateBasicProperties();
- property.ContentType = "text/plain";
- property.DeliveryMode = 2;
- connections.Add(uploadUrl, connection);
- properties.Add(uploadUrl, property);
- channels.Add(uploadUrl, channel);
- }
- if (channels.Count > 0)
- {
- SendWaterFactoryHis(channels, properties);
- }
- foreach (KeyValuePair<string, IConnection> item in connections)
- {
- IConnection connection = item.Value;
- connection.Close();
- }
- }
-
-
-
-
-
- private void SendWaterFactoryHis(Dictionary<string, IModel> channels, Dictionary<string, IBasicProperties> properties)
- {
- try
- {
- log.Info("水厂历史数据同步任务开始执行.................\r\n");
- string factorySql = "select * from 水厂信息";
- DataTable dtFactory = dbHelper.Fill(factorySql);
- string waterFactoryColumn = Constants.WaterFactoryColumn;
- if (string.IsNullOrEmpty(waterFactoryColumn) || dtFactory == null || dtFactory.Rows.Count == 0)
- {
- return;
- }
-
- for (int i = 0; i < dtFactory.Rows.Count; i++)
- {
- string factoryId = dtFactory.Rows[i]["FactoryId"].ToString();
- string factoryCode = dtFactory.Rows[i]["FactoryCode"].ToString();
- string meterSql = "select * from 水厂表具 where FactoryId = " + factoryId;
- DataTable dtMeter = dbHelper.Fill(meterSql);
- if (dtMeter == null || dtMeter.Rows.Count ==0)
- {
- continue;
- }
- Dictionary<string, object> factoryMap = new Dictionary<string, object>();
- factoryMap["FactoryCode"] = factoryCode;
-
- string baseSql = "SELECT RTRIM(日期) + ' ' + RTRIM(时间) ReadTime,反冲正累计流量 - 反冲负累计流量 BackWashing,[清水池液位] LiquidHeight,更新时间 FROM [dbo].[水厂泵房2] WHERE FactoryId = " + factoryId + " ORDER BY 日期, 时间 ";
- DataTable baseDt = dbHelper.Fill(baseSql);
- for (int l = 0; l < baseDt.Rows.Count; l++)
- {
- DataRow baseRow = baseDt.Rows[l];
- Dictionary<string, object> baseMap = new Dictionary<string, object>();
- string id = baseRow["更新时间"].ToString();
- string time = baseRow["ReadTime"].ToString();
- baseMap["ReadTime"] = time;
- baseMap["BackWashing"] = baseRow["BackWashing"].ToString();
- baseMap["LiquidHeight"] = baseRow["LiquidHeight"].ToString();
- baseMap["Consumption"] = "0";
- baseMap["CleanWaterFlow"] = "0";
- baseMap["Dosage"] = "0";
- factoryMap["BaseData"] = baseMap;
- ArrayList meterDataList = new ArrayList();
-
- for (int j = 0; j < dtMeter.Rows.Count; j++)
- {
- string tableName = dtMeter.Rows[j]["TableName"].ToString();
- string meterId = dtMeter.Rows[j]["MeterId"].ToString();
- string meterCode = dtMeter.Rows[j]["MeterCode"].ToString();
- string colSql = "select * from 水厂数据字段 where MeterId = " + meterId;
- DataTable dtMeterCol = dbHelper.Fill(colSql);
- if (dtMeterCol == null || dtMeterCol.Rows.Count == 0)
- {
- continue;
- }
- string queryCol = "";
- string[] queryColArr = new string[dtMeterCol.Rows.Count];
- for (int k = 0; k < dtMeterCol.Rows.Count; k++)
- {
- string tableColumn = dtMeterCol.Rows[k]["TableColumn"].ToString();
- string sendKey = dtMeterCol.Rows[k]["SendKey"].ToString();
- if (k == dtMeterCol.Rows.Count - 1)
- {
- queryCol += tableColumn + " " + sendKey;
- }
- else
- {
- queryCol += tableColumn + " " + sendKey + ",";
- }
- queryColArr[k] = sendKey;
- }
- Dictionary<string, object> meterMap = new Dictionary<string, object>();
- string[] colArr = waterFactoryColumn.Split(',');
- foreach (string col in colArr)
- {
- meterMap[col] = null;
- }
- string meterDataSql = "select " + queryCol + " from " + tableName + " where 更新时间 = '" + id + "'";
- DataTable data = dbHelper.Fill(meterDataSql);
- if (data == null || data.Rows.Count == 0)
- {
- continue;
- }
- foreach (string col in queryColArr)
- {
- meterMap[col] = data.Rows[0][col];
- }
- meterMap["MeterCode"] = meterCode;
- meterDataList.Add(meterMap);
- }
- factoryMap["MeterData"] = meterDataList;
- string message = JsonConvert.SerializeObject(factoryMap);
- foreach (KeyValuePair<string, IModel> item in channels)
- {
- string key = item.Key;
- IModel channel = item.Value;
- IBasicProperties property = properties[key];
- channel.BasicPublish("waterFactory.deviceHis", "", property, Encoding.UTF8.GetBytes(message));
- }
-
- string deleteSql = "DELETE FROM [dbo].[水厂泵房2] where 更新时间 = '" + id + "' and FactoryId = " + factoryId;
- deleteSql += ";DELETE FROM [dbo].[水厂过滤间GGD] where 更新时间 = '" + id + "' and FactoryId = " + factoryId;
- deleteSql += ";DELETE FROM [dbo].[水厂泵房1] where 更新时间 = '" + id + "' and FactoryId = " + factoryId;
- deleteSql += ";DELETE FROM [dbo].[水厂过滤间操作台] where 更新时间 = '" + id + "' and FactoryId = " + factoryId;
- dbHelper.ExecuteNonQuery(deleteSql);
- }
- }
- log.Info("水厂历史记录同步任务执行结束.................\r\n");
- }
- catch (Exception ex)
- {
- log.Error("水厂历史记录同步任务执行错误:" + ex.Message + "===========" + ex.StackTrace + "\r\n");
- }
- }
- static IDbProvider dbHelper
- {
- get
- {
- var DbDefine = DbFactoryProvider.GetProvider(CurrentDbType.SqlServer, Constants.WaterFactoryDbConncetion);
- return DbDefine;
- }
- }
- }
- }
|