123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239 |
- using log4net;
- using Quartz;
- using RabbitMQ.Client;
- using RDIFramework.Utilities;
- using System;
- using System.Collections.Generic;
- using System.Configuration;
- using System.Data;
- using System.Linq;
- using System.Text;
- namespace TimedUpload.QuartzJobs
- {
- [DisallowConcurrentExecution]
- public class ChangleBusinessDataJob : IJob
- {
// Logger for this job. Bound to this class (not to a sibling job) so that
// log entries are attributed to ChangleBusinessDataJob.
private readonly ILog log = LogManager.GetLogger(typeof(ChangleBusinessDataJob));
// Value written to the "fromWhere" field of every published message.
private string manufacturerCode = Constants.ManufacturerCode;
// Lower bound applied to BM_ID in the meter-reading query.
private string bmId = Constants.BmId;
// NOTE(review): the two fields below are not referenced by any method of this class.
private string readDate = Constants.UserMeterDate;
private string userMeterReadDate = Constants.UserMeterReadDate;
/// <summary>
/// Runs one synchronization pass: opens a RabbitMQ connection and channel for
/// every host listed in <c>Constants.UploadUrl</c>, declares the two target
/// queues, publishes the customer-meter data and the meter-reading data, and
/// finally closes every connection that was opened.
/// </summary>
/// <param name="context">Quartz execution context; not read by this job.</param>
/// <remarks>
/// Exceptions raised while connecting or declaring queues still propagate to
/// the caller, but connections opened before the failure are now closed first.
/// </remarks>
public void Execute(IJobExecutionContext context)
{
    // Hosts are separated by '|'. Blank entries (e.g. a trailing '|') are ignored.
    string[] uploadUrls = Constants.UploadUrl.Split(new[] { '|' }, StringSplitOptions.RemoveEmptyEntries);
    Dictionary<string, IConnection> connections = new Dictionary<string, IConnection>();
    Dictionary<string, IModel> channels = new Dictionary<string, IModel>();
    Dictionary<string, IBasicProperties> properties = new Dictionary<string, IBasicProperties>();
    try
    {
        foreach (string rawUrl in uploadUrls)
        {
            string uploadUrl = rawUrl.Trim();
            // Skip blanks and hosts listed twice; Dictionary.Add would throw on a duplicate key.
            if (uploadUrl.Length == 0 || connections.ContainsKey(uploadUrl))
            {
                continue;
            }

            ConnectionFactory factory = new ConnectionFactory();
            factory.HostName = uploadUrl;                  // broker host to connect to
            factory.UserName = Constants.UploadUserName;   // broker account
            factory.Password = Constants.UploadPassword;   // broker password
            factory.AutomaticRecoveryEnabled = true;       // reconnect automatically when the link drops

            IConnection connection = factory.CreateConnection();
            // Register the connection immediately so the finally block closes it
            // even if channel creation or queue declaration fails below.
            connections.Add(uploadUrl, connection);

            IModel channel = connection.CreateModel();
            // Arguments: queue name, durable = true, exclusive = false, autoDelete = false, no extra arguments.
            channel.QueueDeclare("zone.userMeter", true, false, false, null);
            channel.QueueDeclare("zone.userMeterHis", true, false, false, null);

            IBasicProperties property = channel.CreateBasicProperties();
            property.ContentType = "text/plain";
            property.DeliveryMode = 2; // persistent messages

            properties.Add(uploadUrl, property);
            channels.Add(uploadUrl, channel);
        }

        if (channels.Count > 0)
        {
            SendZoneMeterUser(channels, properties);
            SendZoneMeterUserHis(channels, properties);
        }
    }
    finally
    {
        // Closing a connection also closes the channels created from it.
        // Each close is guarded so one failing broker cannot prevent the others from being released.
        foreach (KeyValuePair<string, IConnection> item in connections)
        {
            try
            {
                item.Value.Close();
            }
            catch (Exception ex)
            {
                log.Error("Failed to close RabbitMQ connection to " + item.Key + "\r\n", ex);
            }
        }
    }
}
/// <summary>
/// Publishes customer-meter master data. Rows are read in batches of 100,
/// ordered by CM_ID and starting after the checkpoint stored in
/// <c>Constants.UserMeterId</c>. Each row is serialized to a JSON object and
/// published on every channel; the checkpoint is then written back to the
/// "UserMeterId" application setting.
/// </summary>
/// <param name="channels">Open channels keyed by upload host.</param>
/// <param name="properties">Message properties keyed by the same upload host.</param>
/// <remarks>
/// The checkpoint only advances once a row has been published on every
/// channel. On the first failure the method stops, so the failed row and all
/// later rows are retried on the next run instead of being skipped for good.
/// </remarks>
private void SendZoneMeterUser(Dictionary<string, IModel> channels, Dictionary<string, IBasicProperties> properties)
{
    // Highest CM_ID that has been published successfully so far.
    string meterId = Constants.UserMeterId;
    bool publishFailed = false;
    log.Info("营收户表基础数据同步任务开始执行.................\r\n");
    while (!publishFailed)
    {
        DataTable dt = null;
        try
        {
            // NOTE(review): the checkpoint is concatenated into the SQL text; it comes from
            // configuration, but a parameterized query would be safer if the provider supports it.
            string sql = "SELECT top 100 b.CM_ID,c.RouteCode,RouteName,a.CustomerCode,a.CustomerName,b.LifetimeCode ElecAddress,b.DetailedAddress MeterAddress,a.DetailedAddress CustomerAddress";
            sql += " FROM BCS_Customer a,BCS_CustomerMeter b, BCS_MeterReadingRoute c WHERE a.Cus_ID = b.Cus_ID and b.Mrr_ID = c.MRR_ID ";
            sql += " AND b.CM_ID > " + meterId + " ORDER BY CM_ID";
            dt = dbHelper.Fill(sql);
        }
        catch (Exception ex)
        {
            // Log at Error level with the full exception (message and stack trace).
            log.Error("营收户表基础数据同步查询数据异常\r\n", ex);
        }

        // A failed query or an empty batch both end the pass.
        if (dt == null || dt.Rows.Count == 0)
        {
            break;
        }

        log.Info("营收户表基础数据同步获取记录数:【" + dt.Rows.Count + "】................\r\n");
        StringBuilder message = new StringBuilder();
        foreach (DataRow dr in dt.Rows)
        {
            message.Clear();
            try
            {
                string rowId = Convert.ToString(dr["CM_ID"]);
                string meterLineNo = Convert.ToString(dr["RouteCode"]);
                string meterLineName = Convert.ToString(dr["RouteName"]);
                string clientNo = Convert.ToString(dr["CustomerCode"]);
                string clientName = Convert.ToString(dr["CustomerName"]);
                // NOTE(review): the address is taken from CustomerAddress although the query
                // also selects MeterAddress - confirm which one the consumer expects.
                string meterAddress = Convert.ToString(dr["CustomerAddress"]);
                if (string.IsNullOrEmpty(meterAddress))
                {
                    meterAddress = "未知";
                }

                // Every value is escaped so quotes, backslashes or control characters
                // in free-text columns cannot break the JSON document.
                message.Append("{");
                message.Append("\"meterLineNo\": \"").Append(EscapeJsonString(meterLineNo)).Append("\",");
                message.Append("\"meterLineName\": \"").Append(EscapeJsonString(meterLineName)).Append("\",");
                message.Append("\"clientNo\": \"").Append(EscapeJsonString(clientNo)).Append("\",");
                message.Append("\"clientName\": \"").Append(EscapeJsonString(clientName)).Append("\",");
                message.Append("\"meterCode\": \"").Append(EscapeJsonString(rowId)).Append("\",");
                message.Append("\"meterAddress\": \"").Append(EscapeJsonString(meterAddress)).Append("\",");
                message.Append("\"fromWhere\": \"").Append(EscapeJsonString(manufacturerCode)).Append("\"");
                message.Append("}");

                byte[] body = Encoding.UTF8.GetBytes(message.ToString());
                foreach (KeyValuePair<string, IModel> item in channels)
                {
                    // NOTE(review): BasicPublish takes (exchange, routingKey, ...); this publishes to an
                    // exchange named "zone.userMeter" with an empty routing key - confirm it exists on the broker.
                    item.Value.BasicPublish("zone.userMeter", "", properties[item.Key], body);
                }

                // Advance the checkpoint only after every broker accepted the row.
                meterId = rowId;
            }
            catch (Exception ex)
            {
                log.Info("营收户表基础数据推送失败:" + message.ToString() + "\r\n");
                log.Error(ex.Message + "===========" + ex.StackTrace + "\r\n");
                // Stop here so this row and the following ones are retried on the next run.
                publishFailed = true;
                break;
            }
        }
    }
    UpdateAppConfig("UserMeterId", meterId);
    log.Info("营收户表基础数据同步任务结束执行.................\r\n");
}

/// <summary>
/// Escapes a value for use inside a double-quoted JSON string.
/// </summary>
/// <param name="value">Raw text; null or empty yields an empty string.</param>
/// <returns>The text with quotes, backslashes and control characters escaped.</returns>
private static string EscapeJsonString(string value)
{
    if (string.IsNullOrEmpty(value))
    {
        return string.Empty;
    }

    StringBuilder escaped = new StringBuilder(value.Length + 8);
    foreach (char c in value)
    {
        switch (c)
        {
            case '"': escaped.Append("\\\""); break;
            case '\\': escaped.Append("\\\\"); break;
            case '\b': escaped.Append("\\b"); break;
            case '\f': escaped.Append("\\f"); break;
            case '\n': escaped.Append("\\n"); break;
            case '\r': escaped.Append("\\r"); break;
            case '\t': escaped.Append("\\t"); break;
            default:
                if (c < ' ')
                {
                    // Remaining control characters must be written as \uXXXX.
                    escaped.Append("\\u").Append(((int)c).ToString("x4"));
                }
                else
                {
                    escaped.Append(c);
                }
                break;
        }
    }
    return escaped.ToString();
}
/// <summary>
/// Publishes meter-reading data. Rows of BCS_UsedWater with BM_ID at or above
/// the configured <c>bmId</c> are read in batches of 100, starting after the
/// checkpoint stored in <c>Constants.UserMeterReadId</c>. Each row is serialized
/// to a JSON object and published on every channel; the checkpoint is then
/// written back to the "UserMeterReadId" application setting.
/// </summary>
/// <param name="channels">Open channels keyed by upload host.</param>
/// <param name="properties">Message properties keyed by the same upload host.</param>
/// <remarks>
/// A row that cannot be serialized (for example a NULL reading date) is logged
/// and skipped. A publish failure stops the pass without advancing the
/// checkpoint past the failed row, so it is retried on the next run.
/// </remarks>
private void SendZoneMeterUserHis(Dictionary<string, IModel> channels, Dictionary<string, IBasicProperties> properties)
{
    // Numbers and dates are formatted culture-independently so the payload
    // does not change with the machine's regional settings.
    System.Globalization.CultureInfo invariant = System.Globalization.CultureInfo.InvariantCulture;
    // Highest UsedWater_ID that has been handled so far.
    string userMeterReadId = Constants.UserMeterReadId;
    bool publishFailed = false;
    log.Info("营收户表抄表数据同步任务开始执行.................\r\n");
    while (!publishFailed)
    {
        DataTable dt = null;
        try
        {
            // The batch is ordered by UsedWater_ID alone: the checkpoint filter is on
            // UsedWater_ID, so ordering by another column first could skip or repeat rows.
            // NOTE(review): values are concatenated into the SQL text; they come from
            // configuration, but a parameterized query would be safer if the provider supports it.
            string sql = "SELECT top 100 UsedWater_ID,CM_ID,ThisMeterNumber,ThisMeterDt,BetweenMeteNumber,b.CreateDT FROM BCS_UsedWater b";
            sql += " WHERE b.BM_ID >= " + bmId + " AND b.UsedWater_ID > " + userMeterReadId + " ORDER BY b.UsedWater_ID";
            dt = dbHelper.Fill(sql);
        }
        catch (Exception ex)
        {
            // Log at Error level with the full exception (message and stack trace).
            log.Error("营收户表抄表数据同步查询数据异常\r\n", ex);
        }

        // A failed query or an empty batch both end the pass.
        if (dt == null || dt.Rows.Count == 0)
        {
            break;
        }

        log.Info("营收户表抄表数据同步获取记录数:【" + dt.Rows.Count + "】................\r\n");
        StringBuilder message = new StringBuilder();
        foreach (DataRow dr in dt.Rows)
        {
            message.Clear();
            string rowId = null;

            // Step 1: serialize the row. A failure here is specific to this row, so it is skipped.
            try
            {
                rowId = Convert.ToString(dr["UsedWater_ID"]);
                string meterCode = Convert.ToString(dr["CM_ID"]);
                string getDateTime = Convert.ToDateTime(dr["ThisMeterDt"]).ToString("yyyy-MM-dd HH:mm:ss", invariant);
                string waterUsed = Convert.ToString(dr["BetweenMeteNumber"], invariant);
                // currReadingValue is written unquoted, so a NULL/empty reading must become
                // the JSON literal null; otherwise the document would be invalid.
                // NOTE(review): confirm the consumer accepts null for a missing reading.
                string currReadingValue = Convert.ToString(dr["ThisMeterNumber"], invariant);
                if (string.IsNullOrWhiteSpace(currReadingValue))
                {
                    currReadingValue = "null";
                }

                message.Append("{");
                message.Append("\"meterCode\": \"").Append(meterCode).Append("\",");
                message.Append("\"fromWhere\": \"").Append(manufacturerCode).Append("\",");
                message.Append("\"saleWater\": \"").Append(waterUsed).Append("\",");
                message.Append("\"getDateTime\": \"").Append(getDateTime).Append("\",");
                message.Append("\"currReadingValue\": ").Append(currReadingValue);
                message.Append("}");
            }
            catch (Exception ex)
            {
                log.Info("营收户表抄表数据推送失败:" + message.ToString() + "\r\n");
                log.Error(ex.Message + "===========" + ex.StackTrace + "\r\n");
                // Move past the unusable row so it does not block the following ones.
                if (rowId != null)
                {
                    userMeterReadId = rowId;
                }
                continue;
            }

            // Step 2: publish to every broker. A failure here stops the pass.
            try
            {
                byte[] body = Encoding.UTF8.GetBytes(message.ToString());
                foreach (KeyValuePair<string, IModel> item in channels)
                {
                    // NOTE(review): BasicPublish takes (exchange, routingKey, ...); this publishes to an
                    // exchange named "zone.userMeterHis" with an empty routing key - confirm it exists on the broker.
                    item.Value.BasicPublish("zone.userMeterHis", "", properties[item.Key], body);
                }

                // Advance the checkpoint only after every broker accepted the row.
                userMeterReadId = rowId;
            }
            catch (Exception ex)
            {
                log.Info("营收户表抄表数据推送失败:" + message.ToString() + "\r\n");
                log.Error(ex.Message + "===========" + ex.StackTrace + "\r\n");
                // Stop here so this row and the following ones are retried on the next run.
                publishFailed = true;
                break;
            }
        }
    }
    UpdateAppConfig("UserMeterReadId", userMeterReadId);
    log.Info("营收户表抄表数据同步任务结束执行.................\r\n");
}
/// <summary>
/// Writes a value into the appSettings section of the executable's
/// configuration file and refreshes the cached section.
/// </summary>
/// <param name="key">Name of the appSettings entry.</param>
/// <param name="value">Value to store.</param>
private void UpdateAppConfig(String key, String value)
{
    var cfg = ConfigurationManager.OpenExeConfiguration(ConfigurationUserLevel.None);
    // The indexer returns null for a key that is not present yet, so the entry
    // is created in that case instead of dereferencing null.
    KeyValueConfigurationElement setting = cfg.AppSettings.Settings[key];
    if (setting == null)
    {
        cfg.AppSettings.Settings.Add(key, value);
    }
    else
    {
        setting.Value = value;
    }
    cfg.Save();
    // Drop the cached section so the next read of appSettings goes back to the file.
    ConfigurationManager.RefreshSection("appSettings");
}
/// <summary>
/// Database provider used by the queries in this job. Each access asks
/// <c>DbFactoryProvider</c> for a SQL Server provider built from
/// <c>Constants.ChargeDB</c>.
/// </summary>
static IDbProvider dbHelper
{
    get { return DbFactoryProvider.GetProvider(CurrentDbType.SqlServer, Constants.ChargeDB); }
}
- }
- }
|