| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339 | 
using System;
using System.Collections;
using System.Collections.Generic;
using System.Data;
using System.Globalization;
using System.Linq;
using System.Text;
using log4net;
using Newtonsoft.Json;
using Quartz;
using RabbitMQ.Client;
using RDIFramework.Utilities;
 
namespace TimedUpload.QuartzJobs
{
    /// <summary>
    /// Quartz job that reads water-factory history (flow meters and water quality) from SQL Server
    /// in 5-minute steps and publishes one JSON message per step to every configured RabbitMQ host.
    /// The per-factory progress cursor is the <c>uploadTime</c> column of <c>[dbo].[uploadList]</c>.
    /// </summary>
    [DisallowConcurrentExecution]
    public class ChangleWaterFactoryDataJob : IJob
    {
        /// <summary>Name used both for the declared queue and as the publish target.</summary>
        private const string QueueName = "waterFactory.deviceHis";

        /// <summary>Distance in minutes between two consecutive samples.</summary>
        private const int SampleIntervalMinutes = 5;

        /// <summary>Upper bound of samples sent per factory in a single job run.</summary>
        private const int MaxSamplesPerRun = 200;

        /// <summary>Format of the <c>ReadTime</c> value (also used in SQL literals and the cursor update).</summary>
        private const string ReadTimeFormat = "yyyy/MM/dd HH:mm:ss";

        /// <summary>Device id embedded in the inlet flow history table name.</summary>
        private const string InletFlowDeviceId = "000513";

        /// <summary>Device id embedded in the outlet flow history table name.</summary>
        private const string OutletFlowDeviceId = "000286";

        private readonly ILog log = LogManager.GetLogger(typeof(ChangleWaterFactoryDataJob));

        /// <summary>
        /// Opens one RabbitMQ connection and channel per host listed in <c>Constants.UploadUrl</c>
        /// ('|' separated), runs the history upload, and always closes the connections afterwards.
        /// </summary>
        /// <param name="context">Quartz execution context (not used).</param>
        public void Execute(IJobExecutionContext context)
        {
            // Blank entries (e.g. a trailing '|') would otherwise become an empty host name.
            string[] uploadUrls = (Constants.UploadUrl ?? string.Empty)
                .Split(new[] { '|' }, StringSplitOptions.RemoveEmptyEntries);

            var connections = new Dictionary<string, IConnection>();
            var channels = new Dictionary<string, IModel>();
            var properties = new Dictionary<string, IBasicProperties>();

            try
            {
                foreach (string rawUrl in uploadUrls)
                {
                    string uploadUrl = rawUrl.Trim();

                    // A duplicated host would make Dictionary.Add throw and abort the whole run.
                    if (uploadUrl.Length == 0 || connections.ContainsKey(uploadUrl))
                    {
                        continue;
                    }

                    ConnectionFactory factory = new ConnectionFactory();
                    factory.HostName = uploadUrl;
                    factory.UserName = Constants.UploadUserName;
                    factory.Password = Constants.UploadPassword;
                    factory.AutomaticRecoveryEnabled = true; // reconnect automatically when the link drops

                    // Register the connection immediately so the finally block closes it even
                    // when one of the following calls throws.
                    IConnection connection = factory.CreateConnection();
                    connections.Add(uploadUrl, connection);

                    IModel channel = connection.CreateModel();
                    channels.Add(uploadUrl, channel);

                    // Durable, non-exclusive, non-auto-delete queue.
                    channel.QueueDeclare(QueueName, true, false, false, null);

                    IBasicProperties property = channel.CreateBasicProperties();
                    property.ContentType = "text/plain";
                    property.DeliveryMode = 2; // persistent
                    properties.Add(uploadUrl, property);
                }

                // A connection failure above propagates on purpose: sending to only some hosts
                // would still advance the shared upload cursor.
                if (channels.Count > 0)
                {
                    SendWaterFactoryHis(channels, properties);
                }
            }
            finally
            {
                foreach (IConnection connection in connections.Values)
                {
                    try
                    {
                        connection.Close();
                    }
                    catch (Exception ex)
                    {
                        // Best effort: a failing close must not hide the original exception.
                        log.Warn("Failed to close RabbitMQ connection: " + ex.Message);
                    }
                }
            }
        }

        /// <summary>
        /// Sends water-factory history data. For every row of <c>[dbo].[uploadList]</c> the method
        /// walks forward from <c>uploadTime</c> in 5-minute steps (at most 200 per run), builds a
        /// JSON message per step, publishes it on every channel and then stores the step time
        /// back into <c>uploadTime</c>. Any exception is logged and ends the run.
        /// </summary>
        /// <param name="channels">Open channels keyed by host.</param>
        /// <param name="properties">Message properties keyed by the same hosts as <paramref name="channels"/>.</param>
        private void SendWaterFactoryHis(Dictionary<string, IModel> channels, Dictionary<string, IBasicProperties> properties)
        {
            try
            {
                // Last water-quality values that were read. They are declared outside the loops, so
                // a step without a water-quality row re-sends the previous values.
                // NOTE(review): this carry-over also spans factories - confirm that is intended.
                object inPh = null, inChlorine = null, inTurbidty = null, inTurbidtytwo = null;
                object outPh = null, outChlorine = null, outTurbidty = null, level = null;

                DateTime nowTime = DateTime.Now;
                log.Info("昌乐水厂数据同步任务开始执行.................\r\n");

                string factorySql = "SELECT [id],[inMeterId],[outMeterId],[uploadTime],[waterFactory]  FROM [dbo].[uploadList]";
                DataTable dtFactory = waterFHelper.Fill(factorySql);
                if (dtFactory == null || dtFactory.Rows.Count == 0)
                {
                    return;
                }

                foreach (DataRow factoryRow in dtFactory.Rows)
                {
                    object uploadTimeValue = factoryRow["uploadTime"];
                    if (uploadTimeValue == null || uploadTimeValue == DBNull.Value)
                    {
                        // Convert.ToDateTime would throw here and abort all remaining factories.
                        log.Warn("uploadList row " + factoryRow["id"] + " has no uploadTime; skipped.");
                        continue;
                    }

                    var factoryMap = new Dictionary<string, object>();
                    factoryMap["FactoryCode"] = factoryRow["waterFactory"].ToString();

                    DateTime searchTime = Convert.ToDateTime(uploadTimeValue);
                    int elapsedMinutes = (int)(nowTime - searchTime).TotalMinutes;
                    int sampleCount = Math.Min(elapsedMinutes / SampleIntervalMinutes, MaxSamplesPerRun);

                    for (int step = 0; step < sampleCount; step++)
                    {
                        log.Info(step);

                        searchTime = searchTime.AddMinutes(SampleIntervalMinutes);

                        // Invariant culture keeps '/' and ':' literal regardless of the host locale.
                        string time = searchTime.ToString(ReadTimeFormat, CultureInfo.InvariantCulture);
                        string year = searchTime.Year.ToString(CultureInfo.InvariantCulture);

                        // Inlet flow. No row at or after this time: stop this factory for this run.
                        // NOTE(review): the table name is year-suffixed, so a gap just before a year
                        // boundary would stop here on every run - confirm whether that can occur.
                        DataRow inRow = QueryFirstFlowRow(InletFlowDeviceId, year, time);
                        if (inRow == null)
                        {
                            break;
                        }

                        // Outlet flow, same rule.
                        DataRow outRow = QueryFirstFlowRow(OutletFlowDeviceId, year, time);
                        if (outRow == null)
                        {
                            break;
                        }

                        // Inlet and outlet water quality; ORDER BY makes TOP 1 pick the earliest row.
                        string waterQuelitySql = "SELECT TOP 1 [id]"
                            + ",[originWaterZD],[originWaterPH],[chendianchiOutWaterZD],[lvhouZD]"
                            + ",[lvhouYulv],[qingshuichiFlow],[gongshuishuitaOutFlow]"
                            + ",[outWaterZD],[outWaterPH],[outWaterYulv]"
                            + ",[qingshuichiFluidLevel],[CollectTime],[CreateDt]"
                            + " FROM [dbo].[WaterFactoryDaliyRecord] where CreateDt >= '"
                            + searchTime.AddMinutes(-1).ToString("yyyy-MM-dd HH:mm", CultureInfo.InvariantCulture)
                            + "' ORDER BY CreateDt";
                        DataTable wqDt = waterFHelper.Fill(waterQuelitySql);
                        if (wqDt != null && wqDt.Rows.Count > 0)
                        {
                            DataRow wqRow = wqDt.Rows[0];
                            inPh = wqRow["originWaterPH"];
                            inChlorine = wqRow["lvhouYulv"];
                            inTurbidty = wqRow["originWaterZD"];
                            inTurbidtytwo = wqRow["lvhouZD"];
                            outPh = wqRow["outWaterPH"];
                            outChlorine = wqRow["outWaterYulv"];
                            outTurbidty = wqRow["outWaterZD"];
                            level = wqRow["qingshuichiFluidLevel"];
                        }

                        var meterDataList = new List<Dictionary<string, object>>();
                        meterDataList.Add(BuildMeterReading("wwkj001", time, inRow["瞬时流量"], inRow["净累计流量"], null, null, null, null));
                        meterDataList.Add(BuildMeterReading("wwkj005", time, outRow["瞬时流量"], outRow["净累计流量"], null, null, null, null));
                        meterDataList.Add(BuildMeterReading("wwkj008", time, null, null, inPh, inChlorine, inTurbidty, inTurbidtytwo));
                        meterDataList.Add(BuildMeterReading("wwkj004", time, null, null, outPh, outChlorine, outTurbidty, null));

                        var baseMap = new Dictionary<string, object>();
                        baseMap["ReadTime"] = time;
                        baseMap["BackWashing"] = "0";
                        baseMap["Consumption"] = "0";
                        baseMap["CleanWaterFlow"] = "0";
                        baseMap["Dosage"] = "0";
                        baseMap["LiquidHeight"] = level;

                        factoryMap["BaseData"] = baseMap;
                        factoryMap["MeterData"] = meterDataList;

                        string message = JsonConvert.SerializeObject(factoryMap);
                        log.Info(message);

                        // Encode once; the same payload goes to every host.
                        byte[] payload = Encoding.UTF8.GetBytes(message);
                        foreach (KeyValuePair<string, IModel> item in channels)
                        {
                            IBasicProperties property = properties[item.Key];

                            // NOTE(review): the first argument is the exchange name and the routing
                            // key is empty, while Execute only declares a queue with this name.
                            // Confirm an exchange called "waterFactory.deviceHis" exists on the broker.
                            item.Value.BasicPublish("waterFactory.deviceHis", "", property, payload);
                        }

                        // Advance the upload cursor so the next run resumes after this sample.
                        string updatesql = "UPDATE [dbo].[uploadList] SET uploadTime = '" + time + "' WHERE [id] = " + factoryRow["id"].ToString();
                        waterFHelper.ExecuteNonQuery(updatesql);
                    }
                }

                log.Info("水厂历史记录同步任务执行结束.................\r\n");
            }
            catch (Exception ex)
            {
                log.Error("水厂历史记录同步任务执行错误:" + ex.Message + "===========" + ex.StackTrace + "\r\n");
            }
        }

        /// <summary>
        /// Returns the earliest flow-history row of a device whose collection time is at or after
        /// <paramref name="time"/>, or <c>null</c> when the query yields no row.
        /// </summary>
        /// <param name="deviceId">Device id that is part of the history table name.</param>
        /// <param name="year">Four-digit year that is part of the history table name.</param>
        /// <param name="time">Lower bound of the collection time, already formatted for SQL.</param>
        private static DataRow QueryFirstFlowRow(string deviceId, string year, string time)
        {
            // All three values are produced by this class (constants and formatted DateTime),
            // never by external input, so they are embedded directly.
            string sql = "SELECT TOP 1 [id],[设备ID],[记录时间],[采集时间],[设备状态],[通讯状态],[净累计流量],[瞬时流量] FROM [dbo].[历史记录_"
                + deviceId + "_" + year + "] where 采集时间 >= '" + time + "' ORDER BY 采集时间";

            DataTable table = dbHelper.Fill(sql);
            if (table == null || table.Rows.Count == 0)
            {
                return null;
            }

            return table.Rows[0];
        }

        /// <summary>
        /// Builds one entry of the <c>MeterData</c> array. Values that a meter does not provide
        /// are passed as <c>null</c>; <c>Pressure</c> is always <c>null</c>.
        /// </summary>
        private static Dictionary<string, object> BuildMeterReading(
            string meterCode,
            string readTime,
            object instantaneousFlow,
            object netCumulativeFlow,
            object ph,
            object chlorine,
            object turbidity,
            object turbidityTwo)
        {
            var reading = new Dictionary<string, object>();
            reading["InstantaneousFlow"] = instantaneousFlow;
            reading["NetCumulativeFlow"] = netCumulativeFlow;
            reading["Pressure"] = null;
            reading["PH"] = ph;
            reading["Chlorine"] = chlorine;
            reading["Turbidity"] = turbidity;
            reading["TurbidityTwo"] = turbidityTwo;
            reading["ReadTime"] = readTime;
            reading["MeterCode"] = meterCode;
            return reading;
        }

        /// <summary>
        /// Provider for the database holding the large per-device history tables.
        /// A provider is requested from the factory on every access.
        /// </summary>
        private static IDbProvider dbHelper
        {
            get
            {
                return DbFactoryProvider.GetProvider(CurrentDbType.SqlServer, Constants.DbConncetion);
            }
        }

        /// <summary>
        /// Provider for the water-factory database (upload list and water-quality records).
        /// A provider is requested from the factory on every access.
        /// </summary>
        private static IDbProvider waterFHelper
        {
            get
            {
                return DbFactoryProvider.GetProvider(CurrentDbType.SqlServer, Constants.changleWaterFactoryDb);
            }
        }
    }
}
 
 
  |