using log4net;
using Newtonsoft.Json;
using Quartz;
using RabbitMQ.Client;
using RDIFramework.Utilities;
using System;
using System.Collections.Generic;
using System.Data;
using System.Text;
using TimedUpload.utils;

namespace TimedUpload.QuartzJobs
{
    /// <summary>
    /// Quartz job that reads every row of [dbo].[水源井数据], publishes each row as a JSON
    /// message to all RabbitMQ hosts listed in <c>Constants.UploadUrl</c> (separated by '|'),
    /// and deletes the row from the table once it has been published.
    /// </summary>
    [DisallowConcurrentExecution]
    public class WaterWellDataUploadJob : IJob
    {
        /// <summary>
        /// Name used both for the declared queue and as the first argument of BasicPublish.
        /// </summary>
        private const string QueueName = "waterWell.deviceHis";

        private readonly ILog log = LogManager.GetLogger(typeof(WaterWellDataUploadJob));

        /// <summary>
        /// Opens one connection and one channel per configured host, publishes the pending
        /// rows, and closes every opened connection even when something fails part-way.
        /// </summary>
        /// <param name="context">Quartz execution context (not used).</param>
        public void Execute(IJobExecutionContext context)
        {
            string[] uploadUrls = Constants.UploadUrl.Split('|');
            Dictionary<string, IConnection> connections = new Dictionary<string, IConnection>();
            Dictionary<string, IModel> channels = new Dictionary<string, IModel>();
            Dictionary<string, IBasicProperties> properties = new Dictionary<string, IBasicProperties>();

            try
            {
                foreach (string uploadUrl in uploadUrls)
                {
                    // Skip blank entries (e.g. a trailing '|') and hosts that were already
                    // connected, so no connection is opened that cannot be registered.
                    if (string.IsNullOrWhiteSpace(uploadUrl) || connections.ContainsKey(uploadUrl))
                    {
                        continue;
                    }

                    ConnectionFactory factory = new ConnectionFactory();
                    factory.HostName = uploadUrl;
                    factory.UserName = Constants.UploadUserName;
                    factory.Password = Constants.UploadPassword;
                    // Reconnect automatically when the link drops.
                    factory.AutomaticRecoveryEnabled = true;

                    IConnection connection = factory.CreateConnection();
                    // Register immediately so the finally block closes it even if the
                    // channel setup below throws.
                    connections.Add(uploadUrl, connection);

                    IModel channel = connection.CreateModel();
                    // Durable, non-exclusive, non-auto-delete queue.
                    channel.QueueDeclare(QueueName, true, false, false, null);

                    IBasicProperties property = channel.CreateBasicProperties();
                    property.ContentType = "text/plain";
                    property.DeliveryMode = 2; // persistent message

                    properties.Add(uploadUrl, property);
                    channels.Add(uploadUrl, channel);
                }

                if (channels.Count > 0)
                {
                    SendWaterWellHis(channels, properties);
                }
            }
            finally
            {
                CloseConnections(connections.Values);
            }
        }

        /// <summary>
        /// Publishes the water-well history rows to every channel and deletes each row after
        /// it has been published. Any exception is logged and swallowed, so a failure here
        /// does not propagate to the caller.
        /// </summary>
        /// <param name="channels">Open channels keyed by host.</param>
        /// <param name="properties">Message properties keyed by the same host keys.</param>
        private void SendWaterWellHis(Dictionary<string, IModel> channels, Dictionary<string, IBasicProperties> properties)
        {
            try
            {
                log.Info("水源井历史数据同步任务开始执行.................\r\n");

                string factorySql = "SELECT * FROM [dbo].[水源井数据] ORDER BY 日期, 时间 ";
                DataTable dtDevice = dbHelper.Fill(factorySql);
                if (dtDevice == null || dtDevice.Rows.Count == 0)
                {
                    return;
                }

                foreach (DataRow dr in dtDevice.Rows)
                {
                    string id = dr["更新时间"].ToString();
                    string deviceCode = dr["编码"].ToString();

                    string message = JsonConvert.SerializeObject(BuildDeviceMap(dr, deviceCode));
                    // Encode once; the same payload goes to every channel.
                    byte[] body = Encoding.UTF8.GetBytes(message);

                    foreach (KeyValuePair<string, IModel> item in channels)
                    {
                        // NOTE(review): the first argument of BasicPublish is the exchange and
                        // the second the routing key, so this publishes to an exchange named
                        // like the queue with an empty routing key - confirm such an exchange
                        // exists on the broker.
                        item.Value.BasicPublish(QueueName, "", properties[item.Key], body);
                    }

                    // Remove the row that was just published.
                    // NOTE(review): the key values are round-tripped through ToString(); if
                    // 更新时间 is a date/time column the text may not match the stored value
                    // exactly - confirm, and prefer a parameterized command if the provider
                    // supports one.
                    string deleteSql = "DELETE FROM [dbo].[水源井数据] where 更新时间 = '" + EscapeSqlLiteral(id)
                        + "' and 编码 = '" + EscapeSqlLiteral(deviceCode) + "'";
                    dbHelper.ExecuteNonQuery(deleteSql);
                }

                log.Info("水源井历史记录同步任务执行结束.................\r\n");
            }
            catch (Exception ex)
            {
                log.Error("水源井历史记录同步任务执行错误:" + ex.Message + "===========" + ex.StackTrace + "\r\n");
            }
        }

        /// <summary>
        /// Builds the key/value payload for one table row. All values are the string form of
        /// the corresponding column; the two flag columns are normalized by NormalizeFlag.
        /// </summary>
        /// <param name="dr">Row read from [dbo].[水源井数据].</param>
        /// <param name="deviceCode">Value of the 编码 column, already read by the caller.</param>
        /// <returns>Map that is serialized to JSON as the message body.</returns>
        private static Dictionary<string, string> BuildDeviceMap(DataRow dr, string deviceCode)
        {
            string readTime = dr["日期"].ToString().Trim() + " " + dr["时间"].ToString().Trim();

            Dictionary<string, string> deviceMap = new Dictionary<string, string>();
            deviceMap["DeviceCode"] = deviceCode;
            deviceMap["ReadTime"] = readTime;
            deviceMap["InstantFlow"] = dr["井瞬时流量"].ToString();
            deviceMap["TotalFlow"] = dr["井累计流量"].ToString();
            deviceMap["VoltageA"] = dr["井电压A相"].ToString();
            deviceMap["VoltageB"] = dr["井电压B相"].ToString();
            deviceMap["VoltageC"] = dr["井电压C相"].ToString();
            deviceMap["CurrentA"] = dr["井电流A相"].ToString();
            deviceMap["CurrentB"] = dr["井电流B相"].ToString();
            deviceMap["CurrentC"] = dr["井电流C相"].ToString();
            deviceMap["Consumption"] = dr["井电能"].ToString();
            deviceMap["IsAuto"] = NormalizeFlag(dr["井控制模式"].ToString());
            deviceMap["RunState"] = NormalizeFlag(dr["井运行状态"].ToString());
            deviceMap["Power"] = dr["井功率"].ToString();
            return deviceMap;
        }

        /// <summary>
        /// Maps a flag value to its numeric text: values accepted by CommonUtil.IsNumber are
        /// returned unchanged, "true" (any casing) becomes "1", anything else becomes "0".
        /// </summary>
        private static string NormalizeFlag(string value)
        {
            if (CommonUtil.IsNumber(value))
            {
                return value;
            }

            return string.Equals(value, "TRUE", StringComparison.OrdinalIgnoreCase) ? "1" : "0";
        }

        /// <summary>
        /// Doubles single quotes so the value can be embedded in a quoted SQL string literal
        /// without terminating it early.
        /// </summary>
        private static string EscapeSqlLiteral(string value)
        {
            return value.Replace("'", "''");
        }

        /// <summary>
        /// Closes every connection; a failure to close one is logged and does not prevent
        /// the remaining connections from being closed.
        /// </summary>
        private void CloseConnections(IEnumerable<IConnection> connections)
        {
            foreach (IConnection connection in connections)
            {
                try
                {
                    connection.Close();
                }
                catch (Exception ex)
                {
                    log.Warn("Failed to close RabbitMQ connection", ex);
                }
            }
        }

        /// <summary>
        /// SQL Server provider for the water-factory database. The getter calls
        /// DbFactoryProvider.GetProvider on every access.
        /// </summary>
        static IDbProvider dbHelper
        {
            get
            {
                var DbDefine = DbFactoryProvider.GetProvider(CurrentDbType.SqlServer, Constants.WaterFactoryDbConncetion);
                return DbDefine;
            }
        }
    }
}