using log4net;
using Newtonsoft.Json;
using Quartz;
using RabbitMQ.Client;
using RDIFramework.Utilities;
using System;
using System.Collections.Generic;
using System.Data;
using System.Text;

namespace TimedUpload.QuartzJobs
{
    /// <summary>
    /// Quartz job that reads the most recent row per device from the process-data tables
    /// and publishes one JSON message per row to every RabbitMQ host listed in
    /// <c>Constants.UploadUrl</c> (hosts separated by '|').
    /// </summary>
    [DisallowConcurrentExecution]
    public class WorkmanshipDataUploadJob : IJob
    {
        /// <summary>
        /// Name passed to <c>QueueDeclare</c> as the queue name and to <c>BasicPublish</c>
        /// as its first argument.
        /// </summary>
        private const string Workmanship = "workmanship";

        private readonly ILog log = LogManager.GetLogger(typeof(WorkmanshipDataUploadJob));

        /// <summary>
        /// Opens one connection and channel per configured host, publishes the three data
        /// sets, and closes every opened connection even when a step throws.
        /// </summary>
        /// <param name="context">Quartz execution context (not used).</param>
        public void Execute(IJobExecutionContext context)
        {
            Dictionary<string, IConnection> connections = new Dictionary<string, IConnection>();
            Dictionary<string, IModel> channels = new Dictionary<string, IModel>();
            Dictionary<string, IBasicProperties> properties = new Dictionary<string, IBasicProperties>();

            try
            {
                string[] uploadUrls = (Constants.UploadUrl ?? string.Empty)
                    .Split(new[] { '|' }, StringSplitOptions.RemoveEmptyEntries);

                foreach (string rawUrl in uploadUrls)
                {
                    string uploadUrl = rawUrl.Trim();

                    // Skip blank entries and hosts that were already connected; a duplicate
                    // key would otherwise throw after the connection had been opened.
                    if (uploadUrl.Length == 0 || connections.ContainsKey(uploadUrl))
                    {
                        continue;
                    }

                    ConnectionFactory factory = new ConnectionFactory();
                    factory.HostName = uploadUrl;                    // broker host
                    factory.UserName = Constants.UploadUserName;     // broker user
                    factory.Password = Constants.UploadPassword;     // broker password
                    factory.AutomaticRecoveryEnabled = true;         // reconnect automatically when the link drops

                    IConnection connection = factory.CreateConnection();

                    // Register immediately so the finally block closes it even if the
                    // channel setup below fails.
                    connections.Add(uploadUrl, connection);

                    IModel channel = connection.CreateModel();

                    // Durable, non-exclusive, non-auto-delete queue named "workmanship".
                    channel.QueueDeclare(Workmanship, true, false, false, null);

                    IBasicProperties property = channel.CreateBasicProperties();
                    property.ContentType = "text/plain";
                    property.DeliveryMode = 2; // persistent delivery

                    properties.Add(uploadUrl, property);
                    channels.Add(uploadUrl, channel);
                }

                if (channels.Count > 0)
                {
                    SendSeconddaryPumpData(channels, properties);
                    SendWaterWellData(channels, properties);
                    SendWaterFactoryData(channels, properties);
                }
            }
            finally
            {
                // Close every connection that was opened; one failing close must not
                // prevent the remaining connections from being closed.
                foreach (IConnection connection in connections.Values)
                {
                    try
                    {
                        connection.Close();
                    }
                    catch (Exception ex)
                    {
                        log.Error("关闭RabbitMQ连接错误:" + ex.Message + "===========" + ex.StackTrace + "\r\n");
                    }
                }
            }
        }

        /// <summary>
        /// Publishes the latest row per device code from the secondary-supply pump station
        /// table (message Type 1). Exceptions are logged and not rethrown.
        /// </summary>
        /// <param name="channels">Open channels keyed by host.</param>
        /// <param name="properties">Message properties keyed by the same host.</param>
        private void SendSeconddaryPumpData(Dictionary<string, IModel> channels, Dictionary<string, IBasicProperties> properties)
        {
            try
            {
                log.Info("二供工艺图数据同步任务开始执行.................\r\n");

                string factorySql = "select 编码 DeviceCode,1 Type,RTRIM(日期) + ' ' + RTRIM(时间) ReadTime,* from (select row_number() over(partition by 编码 order by 日期 desc, 时间 desc) as rid,* from 二供通用泵站 where 编码 is not null) as tt where tt.rid=1";
                DataTable dtDevice = dbHelper.Fill(factorySql);
                if (dtDevice == null || dtDevice.Rows.Count == 0)
                {
                    return;
                }

                PublishRows(dtDevice, channels, properties);

                // The original logged the "start" message here a second time.
                log.Info("二供工艺图数据同步任务执行结束.................\r\n");
            }
            catch (Exception ex)
            {
                log.Error("二供工艺图数据同步任务错误:" + ex.Message + "===========" + ex.StackTrace + "\r\n");
            }
        }

        /// <summary>
        /// Publishes the latest row per device code from the water-source well table
        /// (message Type 2). Exceptions are logged and not rethrown.
        /// </summary>
        /// <param name="channels">Open channels keyed by host.</param>
        /// <param name="properties">Message properties keyed by the same host.</param>
        private void SendWaterWellData(Dictionary<string, IModel> channels, Dictionary<string, IBasicProperties> properties)
        {
            try
            {
                log.Info("水源井工艺图数据同步任务开始执行.................\r\n");

                string factorySql = "select 编码 DeviceCode,2 Type,RTRIM(日期) + ' ' + RTRIM(时间) ReadTime,* from (select row_number() over(partition by 编码 order by 日期 desc, 时间 desc) as rid,* from 水源井数据 where 编码 is not null) as tt where tt.rid=1";
                DataTable dtDevice = dbHelper.Fill(factorySql);
                if (dtDevice == null || dtDevice.Rows.Count == 0)
                {
                    return;
                }

                PublishRows(dtDevice, channels, properties);

                log.Info("水源井工艺图数据同步任务结束.................\r\n");
            }
            catch (Exception ex)
            {
                log.Error("水源井工艺图数据同步任务执行错误:" + ex.Message + "===========" + ex.StackTrace + "\r\n");
            }
        }

        /// <summary>
        /// Publishes, per water factory, the latest filter-room row joined with the latest
        /// pump-room row (message Type 3). Exceptions are logged and not rethrown.
        /// </summary>
        /// <param name="channels">Open channels keyed by host.</param>
        /// <param name="properties">Message properties keyed by the same host.</param>
        private void SendWaterFactoryData(Dictionary<string, IModel> channels, Dictionary<string, IBasicProperties> properties)
        {
            try
            {
                log.Info("水厂工艺图数据同步任务开始执行.................\r\n");

                string factorySql =
                    "SELECT a.FactoryCode DeviceCode,3 Type,RTRIM(b.日期) + ' ' + RTRIM(b.时间) ReadTime,b.*,c.* FROM [水厂信息] a,"
                    + "(select * from (select row_number() over(partition by FactoryId order by 日期 desc, 时间 desc) as rid,* from 水厂过滤间GGD where FactoryId is not null) as tt where tt.rid=1) b,"
                    + "(select * from (select row_number() over(partition by FactoryId order by 日期 desc, 时间 desc) as rid,* from 水厂泵房2 where FactoryId is not null) as tt where tt.rid=1) c"
                    + " where a.FactoryId = b.FactoryId and a.FactoryId = c.FactoryId";
                DataTable dtDevice = dbHelper.Fill(factorySql);
                if (dtDevice == null || dtDevice.Rows.Count == 0)
                {
                    return;
                }

                PublishRows(dtDevice, channels, properties);

                log.Info("水厂工艺图数据同步任务执行结束.................\r\n");
            }
            catch (Exception ex)
            {
                log.Error("水厂工艺图数据同步任务执行错误:" + ex.Message + "===========" + ex.StackTrace + "\r\n");
            }
        }

        /// <summary>
        /// Serializes every row of <paramref name="dtDevice"/> into a
        /// {DeviceCode, Type, RealData} JSON message and publishes it on every channel.
        /// </summary>
        /// <param name="dtDevice">Non-empty table containing DeviceCode and Type columns.</param>
        /// <param name="channels">Open channels keyed by host.</param>
        /// <param name="properties">Message properties keyed by the same host.</param>
        private static void PublishRows(DataTable dtDevice, Dictionary<string, IModel> channels, Dictionary<string, IBasicProperties> properties)
        {
            DataColumnCollection cols = dtDevice.Columns;

            foreach (DataRow dr in dtDevice.Rows)
            {
                Dictionary<string, string> deviceMap = new Dictionary<string, string>();
                deviceMap["DeviceCode"] = dr["DeviceCode"].ToString();
                deviceMap["Type"] = dr["Type"].ToString();
                deviceMap["RealData"] = CreateJsonParameters(cols, dr);

                // Encode once per row rather than once per channel.
                byte[] body = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(deviceMap));

                foreach (KeyValuePair<string, IModel> item in channels)
                {
                    // NOTE(review): the first BasicPublish argument is the exchange and the
                    // second the routing key, so this targets an exchange named "workmanship"
                    // with an empty routing key, while Execute only declares a queue of that
                    // name. Confirm such an exchange exists on the broker, or whether
                    // ("", "workmanship") was intended.
                    item.Value.BasicPublish(Workmanship, "", properties[item.Key], body);
                }
            }
        }

        /// <summary>
        /// Builds a flat JSON object mapping each column name to the trimmed string form of
        /// the row's value, in column order. Names and values are JSON-escaped.
        /// </summary>
        /// <param name="cols">Columns of the table the row belongs to.</param>
        /// <param name="dr">Row whose values are serialized.</param>
        /// <returns>A JSON object string; "{}" when there are no columns.</returns>
        private static string CreateJsonParameters(DataColumnCollection cols, DataRow dr)
        {
            StringBuilder json = new StringBuilder();
            json.Append("{");
            for (int j = 0; j < cols.Count; j++)
            {
                if (j > 0)
                {
                    json.Append(",");
                }

                // JsonConvert.ToString emits a quoted, escaped JSON string, so quotes,
                // backslashes and control characters in the data cannot break the payload.
                json.Append(JsonConvert.ToString(cols[j].ColumnName));
                json.Append(":");
                json.Append(JsonConvert.ToString(dr[j].ToString().Trim()));
            }
            json.Append("}");
            return json.ToString();
        }

        /// <summary>
        /// Returns a SQL Server provider for the water-factory connection string; a provider
        /// is requested from the factory on every access.
        /// </summary>
        static IDbProvider dbHelper
        {
            get
            {
                return DbFactoryProvider.GetProvider(CurrentDbType.SqlServer, Constants.WaterFactoryDbConncetion);
            }
        }
    }
}