ChangleWaterQualityDataJob.cs 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229
  1. using log4net;
  2. using Quartz;
  3. using RabbitMQ.Client;
  4. using RDIFramework.Utilities;
  5. using System;
  6. using System.Collections.Generic;
  7. using System.Configuration;
  8. using System.Data;
  9. using System.Linq;
  10. using System.Text;
  11. namespace TimedUpload.QuartzJobs
  12. {
  13. [DisallowConcurrentExecution]
  14. public class ChangleWaterQualityDataJob : IJob
  15. {
  16. private readonly ILog log = LogManager.GetLogger(typeof(ChangleWaterQualityDataJob));
  /// <summary>
  /// Job entry point. Opens one RabbitMQ connection and channel per host listed in
  /// <c>Constants.UploadUrl</c> ('|'-separated), declares the two queues on each channel,
  /// publishes pending device and device-history records to every host, and finally
  /// closes every connection that was opened.
  /// </summary>
  /// <param name="context">Quartz execution context; not read by this job.</param>
  public void Execute(IJobExecutionContext context)
  {
      if (string.IsNullOrEmpty(Constants.UploadUrl))
      {
          log.Error("UploadUrl is not configured; nothing to upload.\r\n");
          return;
      }

      string[] uploadUrls = Constants.UploadUrl.Split('|');
      Dictionary<string, IConnection> connections = new Dictionary<string, IConnection>();
      Dictionary<string, IModel> channels = new Dictionary<string, IModel>();
      Dictionary<string, IBasicProperties> properties = new Dictionary<string, IBasicProperties>();
      try
      {
          foreach (string rawUrl in uploadUrls)
          {
              string uploadUrl = rawUrl.Trim();
              // Skip blank entries (e.g. a trailing '|') and hosts that are listed twice;
              // a duplicate key would otherwise make Dictionary.Add throw.
              if (uploadUrl.Length == 0 || connections.ContainsKey(uploadUrl))
              {
                  continue;
              }

              ConnectionFactory factory = new ConnectionFactory();
              factory.HostName = uploadUrl;                 // broker host to connect to
              factory.UserName = Constants.UploadUserName;  // broker user name
              factory.Password = Constants.UploadPassword;  // broker password
              factory.AutomaticRecoveryEnabled = true;      // reconnect automatically if the link drops

              IConnection connection = factory.CreateConnection();
              // Track the connection before doing anything else with it so the finally
              // block closes it even if channel creation or queue declaration fails.
              connections.Add(uploadUrl, connection);

              IModel channel = connection.CreateModel();
              // Declare both queues as durable, non-exclusive, non-auto-delete.
              channel.QueueDeclare("zone.device", true, false, false, null);
              channel.QueueDeclare("zone.deviceHis", true, false, false, null);

              IBasicProperties property = channel.CreateBasicProperties();
              property.ContentType = "text/plain";
              property.DeliveryMode = 2; // persistent delivery

              properties.Add(uploadUrl, property);
              channels.Add(uploadUrl, channel);
          }

          if (channels.Count > 0)
          {
              SendQualityMeterUser(channels, properties);
              SendQualityMeterUserHis(channels, properties);
          }
      }
      finally
      {
          // Always release broker connections, including when setup or publishing threw.
          // Closing a connection also closes the channels created from it.
          foreach (KeyValuePair<string, IConnection> item in connections)
          {
              try
              {
                  item.Value.Close();
              }
              catch (Exception ex)
              {
                  // A failed close must not hide the exception (if any) that got us here.
                  log.Warn("Failed to close RabbitMQ connection to " + item.Key + "\r\n", ex);
              }
          }
      }
  }
  /// <summary>
  /// Publishes water-quality device master data. Repeatedly reads rows of the device table
  /// whose ID is greater than the stored cursor, builds one JSON message per row, publishes
  /// it on every channel, and advances the cursor to the ID of each row that was published.
  /// The final cursor is written back to the "ChangLeWaterQualityId" app setting.
  /// </summary>
  /// <param name="channels">Open channels keyed by upload host.</param>
  /// <param name="properties">Message properties, looked up with the same keys as <paramref name="channels"/>.</param>
  private void SendQualityMeterUser(Dictionary<string, IModel> channels, Dictionary<string, IBasicProperties> properties)
  {
      string changLeWaterQualityId = Constants.changLeWaterQualityId;
      log.Info("水质设备基础数据同步任务开始执行.................\r\n");
      while (true)
      {
          DataTable dt = null;
          try
          {
              // NOTE(review): the cursor is concatenated into the SQL text. It comes from
              // configuration rather than user input, but a parameterized query would be safer
              // if the provider supports it - confirm against the IDbProvider API.
              string sql = "SELECT * FROM [设备信息] WHERE ID > " + changLeWaterQualityId + " ORDER BY ID";
              dt = dbHelper.Fill(sql);
          }
          catch (Exception ex)
          {
              // Log at error level with the exception message; the stack trace alone does
              // not say what went wrong.
              log.Error("水质设备基础数据同步查询数据异常:" + ex.Message + "===========" + ex.StackTrace + "\r\n");
          }
          if (dt == null || dt.Rows.Count == 0)
          {
              break;
          }
          log.Info("水质设备基础数据同步获取记录数:【" + dt.Rows.Count + "】................\r\n");

          // Remember where this batch started so a batch that makes no progress can be detected.
          string batchStartId = changLeWaterQualityId;
          StringBuilder message = new StringBuilder();
          for (int i = 0; i < dt.Rows.Count; i++)
          {
              message.Clear();
              try
              {
                  DataRow dr = dt.Rows[i];
                  string iccid = dr["DeviceSN"].ToString();
                  // Coordinates are not exported at the moment; the field is sent empty.
                  string lngAndLat = "";
                  string meterAssessmentCode = "wwkjsz" + dr["ID"].ToString();

                  message.Append("{");
                  message.Append("\"meterAssessmentName\": \"").Append(EscapeJsonString(dr["DeviceName"].ToString())).Append("\",");
                  message.Append("\"iccId\": ").Append(FormatIccId(iccid)).Append(",");
                  message.Append("\"lngAndLat\": \"").Append(lngAndLat).Append("\",");
                  message.Append("\"isPressucre\": 0,");
                  message.Append("\"isFlow\": 0,");
                  message.Append("\"isZoneMeter\": 0,");
                  message.Append("\"isTradeMeter\": 0,");
                  message.Append("\"isLargeUser\": 0,");
                  message.Append("\"isQuality\": 1,");
                  message.Append("\"meterAssessmentCode\": \"").Append(EscapeJsonString(meterAssessmentCode)).Append("\",");
                  message.Append("\"manufacturerCode\": \"").Append(Constants.ManufacturerCode).Append("\",");
                  message.Append("\"meterTypeId\": \"2\"");
                  message.Append("}");

                  byte[] body = Encoding.UTF8.GetBytes(message.ToString());
                  foreach (KeyValuePair<string, IModel> item in channels)
                  {
                      IBasicProperties property = properties[item.Key];
                      // NOTE(review): the first BasicPublish argument is the exchange name and the
                      // second the routing key, so this targets an exchange called "zone.device"
                      // with an empty routing key - confirm that exchange exists on the broker.
                      item.Value.BasicPublish("zone.device", "", property, body);
                  }
                  // Advance the cursor only after the row was published on every channel.
                  changLeWaterQualityId = dr["ID"].ToString();
              }
              catch (Exception ex)
              {
                  log.Info("水质设备基础数据推送失败:" + message.ToString() + "\r\n");
                  log.Error(ex.Message + "===========" + ex.StackTrace + "\r\n");
              }
          }

          // If nothing in this batch was published the cursor did not move, and the next
          // query would return exactly the same rows: stop instead of spinning forever.
          if (string.Equals(batchStartId, changLeWaterQualityId, StringComparison.Ordinal))
          {
              log.Error("水质设备基础数据同步本批次没有任何记录推送成功,终止本次同步以避免死循环\r\n");
              break;
          }
      }
      UpdateAppConfig("ChangLeWaterQualityId", changLeWaterQualityId);
      log.Info("水质设备基础数据同步任务结束执行.................\r\n");
  }

  /// <summary>
  /// Renders the device serial number as a JSON value: digit-only values are emitted as-is
  /// (the existing wire format), an empty value becomes <c>null</c>, and anything else is
  /// emitted as a quoted, escaped string so the message stays valid JSON.
  /// </summary>
  private static string FormatIccId(string iccid)
  {
      if (string.IsNullOrEmpty(iccid))
      {
          return "null";
      }
      foreach (char c in iccid)
      {
          if (c < '0' || c > '9')
          {
              return "\"" + EscapeJsonString(iccid) + "\"";
          }
      }
      return iccid;
  }

  /// <summary>
  /// Escapes quotes, backslashes and control characters so the value can be placed
  /// between double quotes in a JSON document.
  /// </summary>
  private static string EscapeJsonString(string value)
  {
      if (string.IsNullOrEmpty(value))
      {
          return "";
      }
      StringBuilder escaped = new StringBuilder(value.Length + 8);
      foreach (char c in value)
      {
          switch (c)
          {
              case '"': escaped.Append("\\\""); break;
              case '\\': escaped.Append("\\\\"); break;
              case '\b': escaped.Append("\\b"); break;
              case '\f': escaped.Append("\\f"); break;
              case '\n': escaped.Append("\\n"); break;
              case '\r': escaped.Append("\\r"); break;
              case '\t': escaped.Append("\\t"); break;
              default:
                  if (c < ' ')
                  {
                      escaped.Append("\\u").Append(((int)c).ToString("x4"));
                  }
                  else
                  {
                      escaped.Append(c);
                  }
                  break;
          }
      }
      return escaped.ToString();
  }
  /// <summary>
  /// Publishes water-quality history readings. Reads up to 100 history rows whose
  /// HistoryRecordID is greater than the stored cursor, builds one JSON message per row,
  /// publishes it on every channel, and advances the cursor to each published row.
  /// The final cursor is written back to the "ChangLeWaterQualityHisId" app setting.
  /// </summary>
  /// <param name="channels">Open channels keyed by upload host.</param>
  /// <param name="properties">Message properties, looked up with the same keys as <paramref name="channels"/>.</param>
  private void SendQualityMeterUserHis(Dictionary<string, IModel> channels, Dictionary<string, IBasicProperties> properties)
  {
      string changLeWaterQualityHisId = Constants.changLeWaterQualityHisId;
      log.Info("水质设备抄表数据同步任务开始执行.................\r\n");

      DataTable dtMeterHis = null;
      try
      {
          // NOTE(review): the cursor is concatenated into the SQL text; it comes from
          // configuration, but a parameterized query would be safer if the provider supports it.
          String sqlMeter = "SELECT top 100 * FROM [历史记录_zhsw] WHERE HistoryRecordID > " + changLeWaterQualityHisId + " ORDER BY HistoryRecordID";
          dtMeterHis = dbHelper.Fill(sqlMeter);
      }
      catch (Exception ex)
      {
          // Handle a failed query the same way the device-data sync does: log it and
          // treat the batch as empty instead of letting the exception escape the job.
          log.Error("水质设备历史记录同步查询数据异常:" + ex.Message + "===========" + ex.StackTrace + "\r\n");
      }

      int rowCount = dtMeterHis == null ? 0 : dtMeterHis.Rows.Count;
      // Numbers and timestamps must not depend on the machine's regional settings,
      // otherwise a comma decimal separator would corrupt the JSON.
      System.Globalization.CultureInfo invariant = System.Globalization.CultureInfo.InvariantCulture;
      StringBuilder message = new StringBuilder();
      for (int j = 0; j < rowCount; j++)
      {
          message.Clear();
          try
          {
              DataRow drMeterHis = dtMeterHis.Rows[j];
              String getDateTime = Convert.ToDateTime(drMeterHis["GetDateTime"]).ToString("yyyy-MM-dd HH:mm:ss", invariant);
              String meterCode = "wwkjsz" + drMeterHis["DevId"].ToString();

              message.Append("{");
              message.Append("\"meterAssessmentCode\": \"").Append(meterCode).Append("\",");
              // NOTE(review): manufacturerCode is sent unquoted here but quoted in the device
              // master-data message - confirm which form the consumer expects.
              message.Append("\"manufacturerCode\": ").Append(Constants.ManufacturerCode).Append(",");
              // Optional fields carry their own leading comma so the object never ends
              // with a trailing comma.
              message.Append("\"getDateTime\": \"").Append(getDateTime).Append("\"");
              if (!drMeterHis.IsNull("Chlorine"))
              {
                  message.Append(",\"chlorine\": ").Append(Convert.ToDecimal(drMeterHis["Chlorine"]).ToString(invariant));
              }
              if (!drMeterHis.IsNull("Turbidity"))
              {
                  message.Append(",\"turbidity\": ").Append(Convert.ToDecimal(drMeterHis["Turbidity"]).ToString(invariant));
              }
              message.Append("}");

              byte[] body = Encoding.UTF8.GetBytes(message.ToString());
              foreach (KeyValuePair<string, IModel> item in channels)
              {
                  IBasicProperties property = properties[item.Key];
                  // NOTE(review): the first BasicPublish argument is the exchange name, so this
                  // targets an exchange called "zone.deviceHis" with an empty routing key - confirm.
                  item.Value.BasicPublish("zone.deviceHis", "", property, body);
              }
              // Advance the cursor only after the row was published on every channel.
              changLeWaterQualityHisId = drMeterHis["HistoryRecordID"].ToString();
          }
          catch (Exception ex)
          {
              log.Info("水质设备历史记录同步任务数据推送失败:" + message.ToString() + "\r\n");
              log.Error(ex.Message + "===========" + ex.StackTrace + "\r\n");
          }
      }
      UpdateAppConfig("ChangLeWaterQualityHisId", changLeWaterQualityHisId);
      log.Info("水质设备历史记录同步任务执行结束.................\r\n");
  }
  /// <summary>
  /// Writes a value into the appSettings section of the executable's configuration file
  /// and refreshes the cached section. The key is created when it does not exist yet, and
  /// nothing is written when the stored value already equals <paramref name="value"/>.
  /// </summary>
  /// <param name="key">appSettings key to set.</param>
  /// <param name="value">Value to store under the key.</param>
  private void UpdateAppConfig(String key, String value)
  {
      var cfg = ConfigurationManager.OpenExeConfiguration(ConfigurationUserLevel.None);
      KeyValueConfigurationElement setting = cfg.AppSettings.Settings[key];
      if (setting == null)
      {
          // The indexer returns null for an unknown key; add it instead of dereferencing null.
          cfg.AppSettings.Settings.Add(key, value);
      }
      else if (string.Equals(setting.Value, value, StringComparison.Ordinal))
      {
          // Unchanged cursor: avoid rewriting the configuration file on every job run.
          return;
      }
      else
      {
          setting.Value = value;
      }
      cfg.Save();
      ConfigurationManager.RefreshSection("appSettings");
  }
  /// <summary>
  /// Database provider used by this job. Every read of the property calls
  /// DbFactoryProvider.GetProvider for SQL Server with the connection setting
  /// held in Constants.changLeWaterQualityDb.
  /// </summary>
  static IDbProvider dbHelper
  {
      get
      {
          return DbFactoryProvider.GetProvider(CurrentDbType.SqlServer, Constants.changLeWaterQualityDb);
      }
  }
  202. }
  203. }