123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170 |
- using log4net;
- using Newtonsoft.Json;
- using Quartz;
- using RabbitMQ.Client;
- using RDIFramework.Utilities;
- using System;
- using System.Collections.Generic;
- using System.Data;
- using System.Text;
- using TimedUpload.utils;
- namespace TimedUpload.QuartzJobs
- {
/// <summary>
/// Quartz job that forwards the rows of <c>[dbo].[水源井数据]</c> to RabbitMQ.
/// Each run opens one connection per host listed in <c>Constants.UploadUrl</c>
/// ('|'-separated), publishes every row as a JSON message on all of them and
/// deletes the row from the source table after it has been published.
/// </summary>
[DisallowConcurrentExecution]
public class WaterWellDataUploadJob : IJob
{
    /// <summary>Name used for the declared queue and as the first argument of BasicPublish.</summary>
    private const string QueueName = "waterWell.deviceHis";

    private readonly ILog log = LogManager.GetLogger(typeof(WaterWellDataUploadJob));

    /// <summary>
    /// Job entry point: connects to every configured host, runs the upload and
    /// closes all connections again, even when setup or upload fails.
    /// </summary>
    /// <param name="context">Quartz execution context (not used by this job).</param>
    public void Execute(IJobExecutionContext context)
    {
        Dictionary<string, IConnection> connections = new Dictionary<string, IConnection>();
        Dictionary<string, IModel> channels = new Dictionary<string, IModel>();
        Dictionary<string, IBasicProperties> properties = new Dictionary<string, IBasicProperties>();

        try
        {
            foreach (string rawUrl in Constants.UploadUrl.Split('|'))
            {
                string uploadUrl = rawUrl.Trim();

                // A blank entry (e.g. trailing '|') is not a host, and a repeated host
                // would make Dictionary.Add throw after its connection was already opened.
                if (uploadUrl.Length == 0 || connections.ContainsKey(uploadUrl))
                {
                    continue;
                }

                ConnectionFactory factory = new ConnectionFactory();
                factory.HostName = uploadUrl;
                factory.UserName = Constants.UploadUserName;
                factory.Password = Constants.UploadPassword;
                factory.AutomaticRecoveryEnabled = true;

                IConnection connection = factory.CreateConnection();

                // Register the connection before any further call that may throw, so the
                // finally block below always gets a chance to close it.
                connections.Add(uploadUrl, connection);

                IModel channel = connection.CreateModel();
                channel.QueueDeclare(QueueName, true, false, false, null);

                IBasicProperties property = channel.CreateBasicProperties();
                property.ContentType = "text/plain";
                property.DeliveryMode = 2; // 2 = persistent message

                properties.Add(uploadUrl, property);
                channels.Add(uploadUrl, channel);
            }

            if (channels.Count > 0)
            {
                SendWaterWellHis(channels, properties);
            }
        }
        finally
        {
            foreach (KeyValuePair<string, IConnection> item in connections)
            {
                try
                {
                    item.Value.Close();
                }
                catch (Exception ex)
                {
                    // One failing Close must neither hide the original exception nor
                    // keep the remaining connections open.
                    log.Warn("关闭RabbitMQ连接失败:" + item.Key, ex);
                }
            }
        }
    }

    /// <summary>
    /// Reads all rows ordered by 日期, 时间, publishes each one to every channel and
    /// deletes it from the table. Any exception is logged and ends the run; rows
    /// that were not yet processed stay in the table.
    /// </summary>
    /// <param name="channels">Open channels keyed by host.</param>
    /// <param name="properties">Message properties keyed by the same host strings as <paramref name="channels"/>.</param>
    private void SendWaterWellHis(Dictionary<string, IModel> channels, Dictionary<string, IBasicProperties> properties)
    {
        try
        {
            log.Info("水源井历史数据同步任务开始执行.................\r\n");

            DataTable dtDevice = dbHelper.Fill("SELECT * FROM [dbo].[水源井数据] ORDER BY 日期, 时间 ");
            if (dtDevice == null || dtDevice.Rows.Count == 0)
            {
                return;
            }

            foreach (DataRow dr in dtDevice.Rows)
            {
                string id = dr["更新时间"].ToString();
                string deviceCode = dr["编码"].ToString();

                // Encode once per row; the same payload goes to every host.
                byte[] body = Encoding.UTF8.GetBytes(BuildMessage(dr, deviceCode));

                foreach (KeyValuePair<string, IModel> item in channels)
                {
                    // NOTE(review): the first BasicPublish argument is the exchange and the
                    // second the routing key, so this targets an exchange named like the
                    // queue. Confirm that exchange exists on the broker; the usual
                    // default-exchange form would be BasicPublish("", QueueName, ...).
                    item.Value.BasicPublish(QueueName, "", properties[item.Key], body);
                }

                // Values are embedded as string literals, so single quotes are doubled;
                // otherwise a quote inside a value breaks the statement and the row
                // would be re-sent on every run.
                string deleteSql = "DELETE FROM [dbo].[水源井数据] where 更新时间 = '" + EscapeSqlLiteral(id)
                    + "' and 编码 = '" + EscapeSqlLiteral(deviceCode) + "'";
                dbHelper.ExecuteNonQuery(deleteSql);
            }

            log.Info("水源井历史记录同步任务执行结束.................\r\n");
        }
        catch (Exception ex)
        {
            log.Error("水源井历史记录同步任务执行错误:" + ex.Message + "===========" + ex.StackTrace + "\r\n");
        }
    }

    /// <summary>Serializes one table row into the JSON message sent to RabbitMQ.</summary>
    /// <param name="dr">Source row.</param>
    /// <param name="deviceCode">Value of the row's 编码 column, already read by the caller.</param>
    /// <returns>JSON object whose values are all strings.</returns>
    private static string BuildMessage(DataRow dr, string deviceCode)
    {
        string instantFlow = dr["井瞬时流量"].ToString();
        string totalFlow = dr["井累计流量"].ToString();
        string voltageA = dr["井电压A相"].ToString();
        string voltageB = dr["井电压B相"].ToString();
        string voltageC = dr["井电压C相"].ToString();
        string currentA = dr["井电流A相"].ToString();
        string currentB = dr["井电流B相"].ToString();
        string currentC = dr["井电流C相"].ToString();
        string consumption = dr["井电能"].ToString();
        string power = dr["井功率"].ToString();
        string isAuto = NormalizeFlag(dr["井控制模式"].ToString());
        string runState = NormalizeFlag(dr["井运行状态"].ToString());
        string readTime = dr["日期"].ToString().Trim() + " " + dr["时间"].ToString().Trim();

        // Insertion order is kept the same as before so the serialized property
        // order does not change for consumers.
        Dictionary<string, object> deviceMap = new Dictionary<string, object>();
        deviceMap["DeviceCode"] = deviceCode;
        deviceMap["ReadTime"] = readTime;
        deviceMap["InstantFlow"] = instantFlow;
        deviceMap["TotalFlow"] = totalFlow;
        deviceMap["VoltageA"] = voltageA;
        deviceMap["VoltageB"] = voltageB;
        deviceMap["VoltageC"] = voltageC;
        deviceMap["CurrentA"] = currentA;
        deviceMap["CurrentB"] = currentB;
        deviceMap["CurrentC"] = currentC;
        deviceMap["Consumption"] = consumption;
        deviceMap["IsAuto"] = isAuto;
        deviceMap["RunState"] = runState;
        deviceMap["Power"] = power;
        return JsonConvert.SerializeObject(deviceMap);
    }

    /// <summary>
    /// Converts a flag column to its numeric text form: values accepted by
    /// <c>CommonUtil.IsNumber</c> pass through unchanged, "true" (any casing)
    /// becomes "1" and everything else becomes "0".
    /// </summary>
    private static string NormalizeFlag(string value)
    {
        if (CommonUtil.IsNumber(value))
        {
            return value;
        }

        // Ordinal comparison avoids the culture-dependent casing rules of ToUpper().
        return string.Equals(value, "TRUE", StringComparison.OrdinalIgnoreCase) ? "1" : "0";
    }

    /// <summary>Doubles single quotes so the value can sit inside a '...' SQL string literal.</summary>
    private static string EscapeSqlLiteral(string value)
    {
        return value.Replace("'", "''");
    }

    /// <summary>
    /// SQL Server provider for the connection string in <c>Constants.WaterFactoryDbConncetion</c>.
    /// Every access calls <c>DbFactoryProvider.GetProvider</c> again.
    /// </summary>
    private static IDbProvider dbHelper
    {
        get
        {
            return DbFactoryProvider.GetProvider(CurrentDbType.SqlServer, Constants.WaterFactoryDbConncetion);
        }
    }
}
- }
|