// DABusinessDataJob.cs
  using System;
  using System.Collections.Generic;
  using System.Configuration;
  using System.Data;
  using System.Globalization;
  using System.Text;
  using log4net;
  using Quartz;
  using RabbitMQ.Client;
  using RDIFramework.Utilities;
  using TimedUpload.utils;
  namespace TimedUpload.QuartzJobs
  {
      /// <summary>
      /// Quartz job that reads customer-meter master data and meter-reading records from the
      /// billing database and publishes each row as a JSON message to every configured
      /// RabbitMQ host.
      /// </summary>
      /// <remarks>
      /// Each sync keeps an id cursor. The starting value comes from <c>Constants</c> and the
      /// final value is written back to appSettings ("UserMeterId" / "UserMeterReadId").
      /// NOTE(review): presumably <c>Constants</c> reads those same appSettings keys - confirm.
      /// </remarks>
      [DisallowConcurrentExecution]
      public class DABusinessDataJob : IJob
      {
          // Names used both for the durable queues declared on each broker and as the first
          // (exchange) argument of BasicPublish, exactly as the original code did.
          // NOTE(review): BasicPublish takes (exchange, routingKey, ...). Publishing to an
          // exchange with this name and an empty routing key only works if such an exchange
          // exists on the broker; confirm this is intended rather than publishing to the
          // default exchange with the queue name as routing key.
          private const string UserMeterTarget = "zone.userMeter";
          private const string UserMeterHisTarget = "zone.userMeterHis";

          private readonly ILog log = LogManager.GetLogger(typeof(DABusinessDataJob));
          private string manufacturerCode = Constants.ManufacturerCode;
          private string bmId = Constants.BmId;
          // The two fields below are not read anywhere in this class; kept unchanged.
          private string readDate = Constants.UserMeterDate;
          private string userMeterReadDate = Constants.UserMeterReadDate;

          /// <summary>
          /// Opens one connection/channel per configured host, runs both syncs, and always
          /// closes every connection that was opened - including when a step throws.
          /// </summary>
          /// <param name="context">Quartz execution context (not used).</param>
          public void Execute(IJobExecutionContext context)
          {
              // Hosts are '|'-separated; empty entries are ignored.
              string configuredHosts = Constants.UploadUrl ?? string.Empty;
              string[] uploadUrls = configuredHosts.Split(new[] { '|' }, StringSplitOptions.RemoveEmptyEntries);

              Dictionary<string, IConnection> connections = new Dictionary<string, IConnection>();
              Dictionary<string, IModel> channels = new Dictionary<string, IModel>();
              Dictionary<string, IBasicProperties> properties = new Dictionary<string, IBasicProperties>();

              try
              {
                  foreach (string rawUrl in uploadUrls)
                  {
                      string uploadUrl = rawUrl.Trim();
                      if (uploadUrl.Length == 0 || connections.ContainsKey(uploadUrl))
                      {
                          // A blank or repeated host would otherwise open a connection that
                          // can never be registered (Dictionary.Add throws on duplicates).
                          continue;
                      }

                      ConnectionFactory factory = new ConnectionFactory
                      {
                          HostName = uploadUrl,
                          UserName = Constants.UploadUserName,
                          Password = Constants.UploadPassword,
                          // Reconnect automatically if the connection drops.
                          AutomaticRecoveryEnabled = true
                      };

                      IConnection connection = factory.CreateConnection();
                      // Register immediately so the finally block closes it even if a later
                      // step for this host fails.
                      connections.Add(uploadUrl, connection);

                      IModel channel = connection.CreateModel();
                      // Durable, non-exclusive, non-auto-delete queues with no extra arguments.
                      channel.QueueDeclare(UserMeterTarget, true, false, false, null);
                      channel.QueueDeclare(UserMeterHisTarget, true, false, false, null);

                      IBasicProperties property = channel.CreateBasicProperties();
                      property.ContentType = "text/plain";
                      property.DeliveryMode = 2; // persistent

                      properties.Add(uploadUrl, property);
                      channels.Add(uploadUrl, channel);
                  }

                  if (channels.Count > 0)
                  {
                      SendZoneMeterUser(channels, properties);
                      SendZoneMeterUserHis(channels, properties);
                  }
              }
              finally
              {
                  foreach (KeyValuePair<string, IConnection> item in connections)
                  {
                      try
                      {
                          item.Value.Close();
                      }
                      catch (Exception ex)
                      {
                          // One failing close must not prevent the others from being closed.
                          log.Error("关闭RabbitMQ连接失败:" + item.Key + "\r\n", ex);
                      }
                  }
              }
          }

          /// <summary>
          /// Publishes customer-meter master data, resuming after the stored "UserMeterId".
          /// </summary>
          /// <param name="channels">Open channels keyed by host.</param>
          /// <param name="properties">Message properties keyed by the same hosts.</param>
          private void SendZoneMeterUser(Dictionary<string, IModel> channels, Dictionary<string, IBasicProperties> properties)
          {
              SyncRows(
                  "营收户表基础数据",
                  "UserMeterId",
                  Constants.UserMeterId,
                  "CM_ID",
                  UserMeterTarget,
                  BuildUserMeterSql,
                  BuildUserMeterMessage,
                  channels,
                  properties);
          }

          /// <summary>
          /// Publishes meter-reading history rows, resuming after the stored "UserMeterReadId".
          /// </summary>
          /// <param name="channels">Open channels keyed by host.</param>
          /// <param name="properties">Message properties keyed by the same hosts.</param>
          private void SendZoneMeterUserHis(Dictionary<string, IModel> channels, Dictionary<string, IBasicProperties> properties)
          {
              SyncRows(
                  "营收户表抄表数据",
                  "UserMeterReadId",
                  Constants.UserMeterReadId,
                  "UsedWater_ID",
                  UserMeterHisTarget,
                  BuildUserMeterHisSql,
                  BuildUserMeterHisMessage,
                  channels,
                  properties);
          }

          /// <summary>
          /// Shared paging loop: fetches batches after <paramref name="cursor"/>, publishes one
          /// message per row, then stores the final cursor under <paramref name="configKey"/>.
          /// </summary>
          /// <remarks>
          /// A row that cannot be converted to a message is logged and skipped. A publish
          /// failure stops the sync with the cursor left before the failed row, so the next
          /// run retries it instead of the checkpoint silently moving past unsent rows.
          /// With several hosts this means a row may be re-sent to hosts that had already
          /// received it.
          /// </remarks>
          /// <param name="label">Prefix used in the log messages.</param>
          /// <param name="configKey">appSettings key that receives the final cursor.</param>
          /// <param name="cursor">Id after which to start reading.</param>
          /// <param name="idColumn">Result column holding the row id used as cursor.</param>
          /// <param name="target">Value passed as the first argument of BasicPublish.</param>
          /// <param name="buildSql">Builds the batch query for a given cursor.</param>
          /// <param name="buildMessage">Builds the JSON message for one row.</param>
          /// <param name="channels">Open channels keyed by host.</param>
          /// <param name="properties">Message properties keyed by the same hosts.</param>
          private void SyncRows(
              string label,
              string configKey,
              string cursor,
              string idColumn,
              string target,
              Func<string, string> buildSql,
              Func<DataRow, string> buildMessage,
              Dictionary<string, IModel> channels,
              Dictionary<string, IBasicProperties> properties)
          {
              log.Info(label + "同步任务开始执行.................\r\n");

              bool aborted = false;
              while (!aborted)
              {
                  DataTable dt = null;
                  try
                  {
                      dt = dbHelper.Fill(buildSql(cursor));
                  }
                  catch (Exception ex)
                  {
                      // Logged as an error with the full exception (message and stack trace).
                      log.Error(label + "同步查询数据异常" + "\r\n", ex);
                  }

                  if (dt == null || dt.Rows.Count == 0)
                  {
                      break;
                  }

                  log.Info(label + "同步获取记录数:【" + dt.Rows.Count + "】................\r\n");

                  foreach (DataRow dr in dt.Rows)
                  {
                      string rowId = null;
                      string message = null;
                      try
                      {
                          rowId = Convert.ToString(dr[idColumn]);
                          message = buildMessage(dr);
                          PublishToAll(target, message, channels, properties);
                          // Advance only once the row has been handed to every channel.
                          cursor = rowId;
                      }
                      catch (Exception ex)
                      {
                          log.Info(label + "推送失败:" + (message ?? "[" + idColumn + "=" + rowId + "]") + "\r\n");
                          log.Error(ex.Message + "===========" + ex.StackTrace + "\r\n");

                          if (message != null || rowId == null)
                          {
                              // The message was built, so the publish itself failed (or the
                              // row id could not be read and the cursor cannot move). Stop
                              // here and keep the cursor before this row.
                              log.Error(label + "推送中断,下次任务将从断点继续\r\n");
                              aborted = true;
                              break;
                          }

                          // The row itself could not be converted: skip it so that one bad
                          // row does not block every row behind it.
                          cursor = rowId;
                      }
                  }
              }

              UpdateAppConfig(configKey, cursor);
              log.Info(label + "同步任务结束执行.................\r\n");
          }

          /// <summary>Builds the batch query for customer-meter rows with CM_ID above <paramref name="afterId"/>.</summary>
          /// <param name="afterId">Cursor value; concatenated into the SQL text as-is.</param>
          /// <returns>The SQL text for the next batch of at most 100 rows.</returns>
          private string BuildUserMeterSql(string afterId)
          {
              // NOTE(review): the cursor is concatenated into the SQL text. It originates from
              // configuration / previous result rows; consider a parameterized query.
              return "SELECT top 100 b.CM_ID,c.RouteCode,RouteName,a.CustomerCode,a.CustomerName,b.ElecAddress,b.DetailedAddress MeterAddress,a.DetailedAddress CustomerAddress"
                  + " FROM BCS_Customer a,BCS_CustomerMeter b, BCS_MeterReadingRoute c WHERE a.Cus_ID = b.Cus_ID and b.Mrr_ID = c.MRR_ID "
                  + " AND b.CM_ID > " + afterId + " ORDER BY CM_ID";
          }

          /// <summary>Builds the JSON message for one customer-meter row.</summary>
          /// <param name="dr">A row returned by the customer-meter batch query.</param>
          /// <returns>The JSON object text to publish.</returns>
          private string BuildUserMeterMessage(DataRow dr)
          {
              // NOTE(review): meterAddress is taken from CustomerAddress although the query
              // also selects MeterAddress; kept as in the original - confirm which is wanted.
              string address = Convert.ToString(dr["CustomerAddress"]);

              StringBuilder json = new StringBuilder("{");
              AppendJsonField(json, "meterLineNo", Convert.ToString(dr["RouteCode"]));
              AppendJsonField(json, "meterLineName", Convert.ToString(dr["RouteName"]));
              AppendJsonField(json, "clientNo", Convert.ToString(dr["CustomerCode"]));
              AppendJsonField(json, "clientName", Convert.ToString(dr["CustomerName"]));
              AppendJsonField(json, "meterCode", Convert.ToString(dr["CM_ID"]));
              AppendJsonField(json, "meterAddress", string.IsNullOrEmpty(address) ? "未知" : address);
              AppendJsonField(json, "fromWhere", manufacturerCode);
              return json.Append('}').ToString();
          }

          /// <summary>Builds the batch query for reading rows with UsedWater_ID above <paramref name="afterId"/>.</summary>
          /// <param name="afterId">Cursor value; concatenated into the SQL text as-is.</param>
          /// <returns>The SQL text for the next batch of at most 100 rows.</returns>
          private string BuildUserMeterHisSql(string afterId)
          {
              // Ordered by the cursor column only. Paging filters on UsedWater_ID, so ordering
              // by BM_ID first could cut a batch at an id that is not the smallest remaining
              // one and make the next batch skip rows.
              // NOTE(review): values are concatenated into the SQL text; consider parameters.
              return "SELECT top 100 UsedWater_ID,CM_ID,ThisMeterNumber,ThisMeterDt,BetweenMeteNumber,b.CreateDT FROM BCS_UsedWater b"
                  + " WHERE b.BM_ID >= " + bmId + " AND b.UsedWater_ID > " + afterId + " ORDER BY b.UsedWater_ID";
          }

          /// <summary>Builds the JSON message for one meter-reading row.</summary>
          /// <param name="dr">A row returned by the meter-reading batch query.</param>
          /// <returns>The JSON object text to publish.</returns>
          private string BuildUserMeterHisMessage(DataRow dr)
          {
              // Convert.ToDateTime throws on DBNull; the caller logs and skips such a row.
              string getDateTime = Convert.ToDateTime(dr["ThisMeterDt"])
                  .ToString("yyyy-MM-dd HH:mm:ss", CultureInfo.InvariantCulture);

              StringBuilder json = new StringBuilder("{");
              AppendJsonField(json, "meterCode", Convert.ToString(dr["CM_ID"]));
              AppendJsonField(json, "fromWhere", manufacturerCode);
              AppendJsonField(json, "saleWater", Convert.ToString(dr["BetweenMeteNumber"], CultureInfo.InvariantCulture));
              AppendJsonField(json, "getDateTime", getDateTime);
              // Emitted unquoted, as in the original message format.
              json.Append(",\"currReadingValue\": ").Append(ToJsonNumber(dr["ThisMeterNumber"]));
              return json.Append('}').ToString();
          }

          /// <summary>Sends one message to every channel, using that host's message properties.</summary>
          /// <param name="target">Value passed as the first argument of BasicPublish.</param>
          /// <param name="message">Message text; sent as UTF-8 bytes.</param>
          /// <param name="channels">Open channels keyed by host.</param>
          /// <param name="properties">Message properties keyed by the same hosts.</param>
          private static void PublishToAll(
              string target,
              string message,
              Dictionary<string, IModel> channels,
              Dictionary<string, IBasicProperties> properties)
          {
              // Encode once instead of once per channel.
              byte[] body = Encoding.UTF8.GetBytes(message);
              foreach (KeyValuePair<string, IModel> item in channels)
              {
                  item.Value.BasicPublish(target, "", properties[item.Key], body);
              }
          }

          /// <summary>
          /// Appends <c>"name": "value"</c> to <paramref name="json"/>, preceded by a comma
          /// when it is not the first field after the opening brace.
          /// </summary>
          /// <param name="json">Builder that already holds the opening brace.</param>
          /// <param name="name">Field name; written as-is.</param>
          /// <param name="value">Field value; escaped as a JSON string.</param>
          private static void AppendJsonField(StringBuilder json, string name, string value)
          {
              if (json.Length > 1)
              {
                  json.Append(',');
              }
              json.Append('"').Append(name).Append("\": ");
              AppendJsonString(json, value);
          }

          /// <summary>
          /// Appends <paramref name="value"/> as a quoted JSON string, escaping quotes,
          /// backslashes and control characters. A null value is written as an empty string.
          /// </summary>
          /// <param name="json">Builder to append to.</param>
          /// <param name="value">Raw text to quote and escape.</param>
          private static void AppendJsonString(StringBuilder json, string value)
          {
              json.Append('"');
              if (value != null)
              {
                  foreach (char c in value)
                  {
                      switch (c)
                      {
                          case '"': json.Append("\\\""); break;
                          case '\\': json.Append("\\\\"); break;
                          case '\b': json.Append("\\b"); break;
                          case '\f': json.Append("\\f"); break;
                          case '\n': json.Append("\\n"); break;
                          case '\r': json.Append("\\r"); break;
                          case '\t': json.Append("\\t"); break;
                          default:
                              if (c < ' ')
                              {
                                  // Remaining control characters need the \uXXXX form.
                                  json.Append("\\u").Append(((int)c).ToString("x4", CultureInfo.InvariantCulture));
                              }
                              else
                              {
                                  json.Append(c);
                              }
                              break;
                      }
                  }
              }
              json.Append('"');
          }

          /// <summary>
          /// Formats a database value as an unquoted JSON token: <c>null</c> for a missing or
          /// blank value, otherwise its invariant-culture text.
          /// </summary>
          /// <param name="value">Column value, possibly <c>DBNull</c>.</param>
          /// <returns>The token text to place after the field name.</returns>
          private static string ToJsonNumber(object value)
          {
              if (value == null || value == DBNull.Value)
              {
                  return "null";
              }
              string text = Convert.ToString(value, CultureInfo.InvariantCulture);
              return (text == null || text.Trim().Length == 0) ? "null" : text;
          }

          /// <summary>
          /// Writes a value into the executable's appSettings and refreshes the cached section.
          /// </summary>
          /// <param name="key">Setting key; added when it does not exist yet.</param>
          /// <param name="value">Value to store.</param>
          private void UpdateAppConfig(String key, String value)
          {
              var cfg = ConfigurationManager.OpenExeConfiguration(ConfigurationUserLevel.None);
              KeyValueConfigurationElement setting = cfg.AppSettings.Settings[key];
              if (setting == null)
              {
                  // The indexer returns null for an unknown key; add it rather than fail.
                  cfg.AppSettings.Settings.Add(key, value);
              }
              else
              {
                  setting.Value = value;
              }
              cfg.Save();
              ConfigurationManager.RefreshSection("appSettings");
          }

          /// <summary>
          /// SQL Server provider for the billing database, obtained from
          /// <c>DbFactoryProvider</c> on every access.
          /// </summary>
          private static IDbProvider dbHelper
          {
              get
              {
                  return DbFactoryProvider.GetProvider(CurrentDbType.SqlServer, Constants.ChargeDB);
              }
          }
      }
  }