using log4net; using Quartz; using RabbitMQ.Client; using RDIFramework.Utilities; using System; using System.Collections.Generic; using System.Configuration; using System.Data; using System.Linq; using System.Text; namespace TimedUpload.QuartzJobs { [DisallowConcurrentExecution] public class ChangleBusinessDataJob : IJob { private readonly ILog log = LogManager.GetLogger(typeof(DABusinessDataJob)); private string manufacturerCode = Constants.ManufacturerCode; private string bmId = Constants.BmId; private string readDate = Constants.UserMeterDate; private string userMeterReadDate = Constants.UserMeterReadDate; public void Execute(IJobExecutionContext context) { string[] uploadUrls = Constants.UploadUrl.Split('|'); Dictionary connections = new Dictionary(); Dictionary channels = new Dictionary(); Dictionary properties = new Dictionary(); foreach (string uploadUrl in uploadUrls) { ConnectionFactory factory = new ConnectionFactory(); factory.HostName = uploadUrl;//主机名,Rabbit会拿这个IP生成一个endpoint,这个很熟悉吧,就是socket绑定的那个终结点。 factory.UserName = Constants.UploadUserName;//默认用户名,用户可以在服务端自定义创建,有相关命令行 factory.Password = Constants.UploadPassword;//默认密码 factory.AutomaticRecoveryEnabled = true; // 链接断开会自动重连 IConnection connection = factory.CreateConnection(); IModel channel = connection.CreateModel(); channel.QueueDeclare("zone.userMeter", true, false, false, null);//创建一个名称为kibaqueue的消息队列 channel.QueueDeclare("zone.userMeterHis", true, false, false, null);//创建一个名称为kibaqueue的消息队列 IBasicProperties property = channel.CreateBasicProperties(); property.ContentType = "text/plain"; property.DeliveryMode = 2; //持久化 connections.Add(uploadUrl, connection); properties.Add(uploadUrl, property); channels.Add(uploadUrl, channel); } if (channels.Count > 0) { SendZoneMeterUser(channels, properties); SendZoneMeterUserHis(channels, properties); } foreach (KeyValuePair item in connections) { IConnection connection = item.Value; connection.Close(); } } /// /// 户表添加 /// /// private void SendZoneMeterUser(Dictionary channels, Dictionary properties) { string meterId = Constants.UserMeterId; log.Info("营收户表基础数据同步任务开始执行.................\r\n"); while (true) { DataTable dt = null; try { string sql = "SELECT top 10000 b.CM_ID,c.RouteCode,RouteName,a.CustomerCode,a.CustomerName,b.LifetimeCode ElecAddress,b.DetailedAddress MeterAddress,a.DetailedAddress CustomerAddress"; sql += " FROM BCS_Customer a,BCS_CustomerMeter b, BCS_MeterReadingRoute c WHERE a.Cus_ID = b.Cus_ID and b.Mrr_ID = c.MRR_ID "; sql += " AND b.CM_ID > " + meterId + " ORDER BY CM_ID"; dt = dbHelper.Fill(sql); } catch (Exception ex) { log.Info("营收户表基础数据同步查询数据异常" + ex.StackTrace + "\r\n"); } if (dt == null || dt.Rows.Count == 0) { break; } log.Info("营收户表基础数据同步获取记录数:【" + dt.Rows.Count + "】................\r\n"); StringBuilder message = new StringBuilder(); for (int i = 0; i < dt.Rows.Count; i++) { message.Clear(); try { DataRow dr = dt.Rows[i]; meterId = Convert.ToString(dr["CM_ID"]); string meterLineNo = Convert.ToString(dr["RouteCode"]); string meterLineName = Convert.ToString(dr["RouteName"]); string clientNo = Convert.ToString(dr["CustomerCode"]); string clientName = Convert.ToString(dr["CustomerName"]); string meterCode = Convert.ToString(dr["CM_ID"]); string meterAddress = string.IsNullOrEmpty(Convert.ToString(dr["CustomerAddress"])) ? "未知" : Convert.ToString(dr["CustomerAddress"]); message.Append("{"); message.Append("\"meterLineNo\": \"").Append(meterLineNo).Append("\","); message.Append("\"meterLineName\": \"").Append(meterLineName).Append("\","); message.Append("\"clientNo\": \"").Append(clientNo).Append("\","); message.Append("\"clientName\": \"").Append(clientName).Append("\","); message.Append("\"meterCode\": \"").Append(meterCode).Append("\","); message.Append("\"meterAddress\": \"").Append(meterAddress).Append("\","); message.Append("\"fromWhere\": \"").Append(manufacturerCode).Append("\""); message.Append("}"); foreach (KeyValuePair item in channels) { string key = item.Key; IModel channel = item.Value; IBasicProperties property = properties[key]; channel.BasicPublish("zone.userMeter", "", property, Encoding.UTF8.GetBytes(message.ToString())); //生产消息 } } catch (Exception ex) { log.Info("营收户表基础数据推送失败:" + message.ToString() + "\r\n"); log.Error(ex.Message + "===========" + ex.StackTrace + "\r\n"); } } } UpdateAppConfig("UserMeterId", meterId); log.Info("营收户表基础数据同步任务结束执行.................\r\n"); } /// /// 户表历史数据 /// /// private void SendZoneMeterUserHis(Dictionary channels, Dictionary properties) { string userMeterReadId = Constants.UserMeterReadId; log.Info("营收户表抄表数据同步任务开始执行.................\r\n"); while (true) { DataTable dt = null; try { string sql = "SELECT top 100 UsedWater_ID,CM_ID,ThisMeterNumber,ThisMeterDt,BetweenMeteNumber,b.CreateDT FROM BCS_UsedWater b"; sql += " WHERE b.BM_ID >= " + bmId + " AND b.UsedWater_ID > " + userMeterReadId + " ORDER BY b.BM_ID,b.UsedWater_ID"; dt = dbHelper.Fill(sql); } catch (Exception ex) { log.Info("营收户表抄表数据同步查询数据异常" + ex.StackTrace + "\r\n"); } if (dt == null || dt.Rows.Count == 0) { break; } log.Info("营收户表抄表数据同步获取记录数:【" + dt.Rows.Count + "】................\r\n"); StringBuilder message = new StringBuilder(); for (int i = 0; i < dt.Rows.Count; i++) { message.Clear(); try { DataRow dr = dt.Rows[i]; userMeterReadId = Convert.ToString(dr["UsedWater_ID"]); string meterCode = Convert.ToString(dr["CM_ID"]); string getDateTime = Convert.ToDateTime(dr["ThisMeterDt"]).ToString("yyyy-MM-dd HH:mm:ss"); string currReadingValue = Convert.ToString(dr["ThisMeterNumber"]); string waterUsed = Convert.ToString(dr["BetweenMeteNumber"]); message.Append("{"); message.Append("\"meterCode\": \"").Append(meterCode).Append("\","); message.Append("\"fromWhere\": \"").Append(manufacturerCode).Append("\","); message.Append("\"saleWater\": \"").Append(waterUsed).Append("\","); message.Append("\"getDateTime\": \"").Append(getDateTime).Append("\","); message.Append("\"currReadingValue\": ").Append(currReadingValue); message.Append("}"); foreach (KeyValuePair item in channels) { string key = item.Key; IModel channel = item.Value; IBasicProperties property = properties[key]; channel.BasicPublish("zone.userMeterHis", "", property, Encoding.UTF8.GetBytes(message.ToString())); //生产消息 } } catch (Exception ex) { log.Info("营收户表抄表数据推送失败:" + message.ToString() + "\r\n"); log.Error(ex.Message + "===========" + ex.StackTrace + "\r\n"); } } } UpdateAppConfig("UserMeterReadId", userMeterReadId); log.Info("营收户表抄表数据同步任务结束执行.................\r\n"); } /// /// 更新配置文件中的值 /// /// 键 /// 值 private void UpdateAppConfig(String key, String value) { var cfg = ConfigurationManager.OpenExeConfiguration(ConfigurationUserLevel.None); cfg.AppSettings.Settings[key].Value = value; cfg.Save(); ConfigurationManager.RefreshSection("appSettings"); } static IDbProvider dbHelper { get { var DbDefine = DbFactoryProvider.GetProvider(CurrentDbType.SqlServer, Constants.ChargeDB); return DbDefine; } } } }