// TestJob.cs
using System;
using System.Collections.Generic;
using System.Configuration;
using System.Data;
using System.Globalization;
using System.IO;
using System.Text;
using log4net;
using Quartz;
using RabbitMQ.Client;
using RDIFramework.Utilities;
  11. namespace TimedUpload.QuartzJobs
  12. {
  13. [DisallowConcurrentExecution]
  14. public class TestJob:IJob
  15. {
  16. private readonly ILog log = LogManager.GetLogger(typeof(TestJob));
  /// <summary>
  /// Quartz entry point. Opens one RabbitMQ connection and channel for every host listed in
  /// <c>Constants.UploadUrl</c> ('|'-separated), declares the "zone.device" and "zone.deviceHis"
  /// queues on each, publishes the history message through <c>SendZoneDeviceHis</c>, and finally
  /// closes every connection that was opened, even when a later step throws.
  /// </summary>
  /// <param name="context">Execution context supplied by Quartz; this method does not read it.</param>
  public void Execute(IJobExecutionContext context)
  {
      // Drop empty entries so a stray or trailing '|' does not become a connection attempt
      // against an empty host name.
      string[] uploadUrls = (Constants.UploadUrl ?? string.Empty)
          .Split(new[] { '|' }, StringSplitOptions.RemoveEmptyEntries);

      Dictionary<string, IConnection> connections = new Dictionary<string, IConnection>();
      Dictionary<string, IModel> channels = new Dictionary<string, IModel>();
      Dictionary<string, IBasicProperties> properties = new Dictionary<string, IBasicProperties>();

      try
      {
          foreach (string rawUrl in uploadUrls)
          {
              string uploadUrl = rawUrl.Trim();

              // A host listed twice would make Dictionary.Add throw after a second connection
              // had already been opened; one connection per distinct host is enough.
              if (uploadUrl.Length == 0 || connections.ContainsKey(uploadUrl))
              {
                  continue;
              }

              ConnectionFactory factory = new ConnectionFactory();
              factory.HostName = uploadUrl;                 // broker host the connection endpoint is built from
              factory.UserName = Constants.UploadUserName;  // broker account name
              factory.Password = Constants.UploadPassword;  // broker account password
              factory.AutomaticRecoveryEnabled = true;      // reconnect automatically when the link drops

              // Register the connection as soon as it exists so the finally block closes it
              // even if creating the channel or declaring a queue fails.
              IConnection connection = factory.CreateConnection();
              connections.Add(uploadUrl, connection);

              IModel channel = connection.CreateModel();
              channels.Add(uploadUrl, channel);

              // Arguments: queue name, durable, exclusive, autoDelete, extra arguments.
              channel.QueueDeclare("zone.device", true, false, false, null);
              channel.QueueDeclare("zone.deviceHis", true, false, false, null);

              IBasicProperties property = channel.CreateBasicProperties();
              property.ContentType = "text/plain";
              property.DeliveryMode = 2; // 2 = persistent message
              properties.Add(uploadUrl, property);
          }

          if (channels.Count > 0)
          {
              // The device sync is currently switched off; only the history message is sent.
              //SendZoneDevice(channels, properties);
              SendZoneDeviceHis(channels, properties);
          }
      }
      finally
      {
          // Close each connection independently so one failing Close() cannot leave the
          // remaining connections open.
          foreach (KeyValuePair<string, IConnection> item in connections)
          {
              try
              {
                  item.Value.Close();
              }
              catch (Exception ex)
              {
                  log.Error("关闭RabbitMQ连接失败(" + item.Key + "):" + ex.Message + "===========" + ex.StackTrace + "\r\n");
              }
          }
      }
  }
  /// <summary>
  /// Device sync: reads every enabled device whose ID is greater than the checkpoint stored in
  /// <c>Constants.MeterId</c>, publishes one JSON message per device to "zone.device" on every
  /// channel, and stores the ID of the last published device as the new checkpoint.
  /// </summary>
  /// <param name="channels">Open channels keyed by upload host.</param>
  /// <param name="properties">Message properties keyed by the same upload host as <paramref name="channels"/>.</param>
  private void SendZoneDevice(Dictionary<string, IModel> channels, Dictionary<string, IBasicProperties> properties)
  {
      try
      {
          String meterId = Constants.MeterId;

          // The checkpoint is spliced into the SQL text, so accept nothing but a plain integer.
          long checkpoint;
          if (!long.TryParse(meterId, NumberStyles.Integer, CultureInfo.InvariantCulture, out checkpoint))
          {
              log.Error("大表设备同步任务MeterId配置无效:" + meterId + "\r\n");
              return;
          }

          String sql = "SELECT a.ID,a.名称,a.考核表编码,a.X坐标,a.Y坐标,b.传输协议参数 FROM [设备信息] a left join 传输设备 b on a.传输设备ID = b.ID where a.是否启用 = '是' and a.考核表编码 is not null and a.ID > "
              + checkpoint.ToString(CultureInfo.InvariantCulture) + " order by a.ID";
          DataTable dt = dbHelper.Fill(sql);
          if (dt == null)
          {
              log.Info("大表设备同步任务查询报错.................\r\n");
              return;
          }
          if (dt.Rows.Count == 0)
          {
              log.Info("大表设备同步任务,没有需要同步的设备.................\r\n");
              return;
          }

          log.Info("大表设备同步任务开始执行.................\r\n");
          bool checkpointAdvanced = false;
          StringBuilder message = new StringBuilder();
          foreach (DataRow dr in dt.Rows)
          {
              message.Clear();
              try
              {
                  // Only the first comma-separated field of the transport parameters is sent.
                  String iccid = "";
                  String transportParams = Convert.ToString(dr["传输协议参数"]);
                  if (!string.IsNullOrEmpty(transportParams))
                  {
                      iccid = transportParams.Split(',')[0];
                  }

                  // Coordinates are sent as "X|Y", or as an empty string when either one is missing.
                  String lngAndLat = "";
                  String x = Convert.ToString(dr["X坐标"]);
                  String y = Convert.ToString(dr["Y坐标"]);
                  if (!string.IsNullOrEmpty(x) && !string.IsNullOrEmpty(y))
                  {
                      lngAndLat = x + "|" + y;
                  }

                  message.Append("{");
                  message.Append("\"meterAssessmentName\": \"").Append(EscapeJson(Convert.ToString(dr["名称"]))).Append("\",");
                  message.Append("\"iccId\": ").Append(FormatIccId(iccid)).Append(",");
                  //message.Append("\"areaId\": 22,");
                  message.Append("\"lngAndLat\": \"").Append(EscapeJson(lngAndLat)).Append("\",");
                  //message.Append("\"pipeCailber\": \"DN32\",");
                  //message.Append("\"pipeTexture\": \"PVC\",");
                  //message.Append("\"imei\": \"77564212\",");
                  message.Append("\"isPressucre\": 1,");
                  message.Append("\"isFlow\": 1,");
                  message.Append("\"isZoneMeter\": 1,");
                  message.Append("\"isTradeMeter\": 0,");
                  message.Append("\"isLargeUser\": 0,");
                  message.Append("\"meterAssessmentCode\": \"").Append(EscapeJson(Convert.ToString(dr["考核表编码"]))).Append("\",");
                  message.Append("\"manufacturerCode\": \"").Append(EscapeJson(Convert.ToString(Constants.ManufacturerCode))).Append("\",");
                  message.Append("\"meterTypeId\": \"2\"");
                  message.Append("}");

                  // Encode once; the same bytes go to every channel.
                  byte[] body = Encoding.UTF8.GetBytes(message.ToString());
                  foreach (KeyValuePair<string, IModel> item in channels)
                  {
                      // NOTE(review): BasicPublish takes (exchange, routingKey, ...), so this targets an
                      // exchange named "zone.device" with an empty routing key, not the queue declared
                      // under that name via the default exchange. Confirm the exchange exists on the broker.
                      item.Value.BasicPublish("zone.device", "", properties[item.Key], body);
                  }

                  meterId = dr["ID"].ToString();
                  checkpointAdvanced = true;
              }
              catch (Exception ex)
              {
                  // NOTE(review): a row that fails here is skipped for good as soon as a later row
                  // succeeds and moves the checkpoint past it. Confirm that this loss is acceptable.
                  log.Info("大表设备同步任务数据推送失败:" + message.ToString() + "\r\n");
                  log.Error(ex.Message + "===========" + ex.StackTrace + "\r\n");
              }
          }

          // Do not rewrite the configuration file when nothing was published.
          if (checkpointAdvanced)
          {
              UpdateAppConfig("MeterId", meterId);
          }
          log.Info("大表设备同步任务执行结束.................\r\n");
      }
      catch (Exception ex)
      {
          log.Error("大表设备同步任务执行错误:" + ex.Message + "===========" + ex.StackTrace + "\r\n");
      }
  }

  /// <summary>
  /// Renders the iccId value as a JSON token: <c>null</c> when empty, a bare number when it is a
  /// valid JSON integer (digits only, no leading zero), otherwise a quoted, escaped string.
  /// </summary>
  /// <param name="iccid">Raw value taken from the transport parameters; may be empty.</param>
  /// <returns>A token that is always valid at a JSON value position.</returns>
  private static string FormatIccId(string iccid)
  {
      if (string.IsNullOrEmpty(iccid))
      {
          return "null";
      }

      bool digitsOnly = true;
      foreach (char c in iccid)
      {
          if (c < '0' || c > '9')
          {
              digitsOnly = false;
              break;
          }
      }

      // JSON forbids leading zeros on numbers, so such values are sent as strings.
      if (digitsOnly && (iccid.Length == 1 || iccid[0] != '0'))
      {
          return iccid;
      }
      return "\"" + EscapeJson(iccid) + "\"";
  }

  /// <summary>
  /// Escapes a value for use inside a double-quoted JSON string (quotes, backslashes and
  /// control characters). Text without such characters is returned unchanged.
  /// </summary>
  /// <param name="value">Text to escape; null is treated as empty.</param>
  /// <returns>The escaped text, without surrounding quotes.</returns>
  private static string EscapeJson(string value)
  {
      if (string.IsNullOrEmpty(value))
      {
          return string.Empty;
      }

      StringBuilder escaped = new StringBuilder(value.Length + 8);
      foreach (char c in value)
      {
          switch (c)
          {
              case '"': escaped.Append("\\\""); break;
              case '\\': escaped.Append("\\\\"); break;
              case '\b': escaped.Append("\\b"); break;
              case '\f': escaped.Append("\\f"); break;
              case '\n': escaped.Append("\\n"); break;
              case '\r': escaped.Append("\\r"); break;
              case '\t': escaped.Append("\\t"); break;
              default:
                  if (c < ' ')
                  {
                      escaped.Append("\\u").Append(((int)c).ToString("x4", CultureInfo.InvariantCulture));
                  }
                  else
                  {
                      escaped.Append(c);
                  }
                  break;
          }
      }
      return escaped.ToString();
  }
  /// <summary>
  /// History data: publishes a single hard-coded history record (meter "wwkjgw0029",
  /// pressure 0.55) to "zone.deviceHis" on every channel. Any failure is logged and swallowed.
  /// </summary>
  /// <param name="channels">Open channels keyed by upload host.</param>
  /// <param name="properties">Message properties keyed by the same upload host as <paramref name="channels"/>.</param>
  private void SendZoneDeviceHis(Dictionary<string, IModel> channels, Dictionary<string, IBasicProperties> properties)
  {
      try
      {
          StringBuilder message = new StringBuilder();
          message.Append("{");
          message.Append("\"meterAssessmentCode\": \"").Append("wwkjgw0029").Append("\",");
          message.Append("\"manufacturerCode\": ").Append(1.ToString(CultureInfo.InvariantCulture)).Append(",");
          message.Append("\"getDateTime\": \"").Append("2022-08-12 14:30:00").Append("\",");
          // Format with the invariant culture: the current culture may use ',' as the decimal
          // separator, which would turn the value into invalid JSON.
          message.Append("\"pressure\": ").Append(0.55.ToString(CultureInfo.InvariantCulture));
          message.Append("}");

          // The payload is identical for every host, so encode it once.
          byte[] body = Encoding.UTF8.GetBytes(message.ToString());
          foreach (KeyValuePair<string, IModel> item in channels)
          {
              // NOTE(review): BasicPublish takes (exchange, routingKey, ...), so this targets an
              // exchange named "zone.deviceHis" with an empty routing key. Confirm it exists on the broker.
              item.Value.BasicPublish("zone.deviceHis", "", properties[item.Key], body);
          }
      }
      catch (Exception ex)
      {
          log.Error("大表设备历史记录同步任务执行错误:" + ex.Message + "===========" + ex.StackTrace + "\r\n");
      }
  }
  /// <summary>
  /// Writes an appSettings value to the executable's configuration file and refreshes the
  /// cached "appSettings" section. The key is created when it does not exist yet.
  /// </summary>
  /// <param name="key">appSettings key to write.</param>
  /// <param name="value">Value to store under <paramref name="key"/>.</param>
  private void UpdateAppConfig(String key, String value)
  {
      var cfg = ConfigurationManager.OpenExeConfiguration(ConfigurationUserLevel.None);
      KeyValueConfigurationCollection settings = cfg.AppSettings.Settings;

      // The indexer yields null for an unknown key; add the entry instead of dereferencing null.
      KeyValueConfigurationElement entry = settings[key];
      if (entry == null)
      {
          settings.Add(key, value);
      }
      else
      {
          entry.Value = value;
      }

      cfg.Save();
      ConfigurationManager.RefreshSection("appSettings");
  }
  /// <summary>
  /// Checks whether a user table with the given name is listed in <c>sysobjects</c>.
  /// </summary>
  /// <param name="tablename">Table name to look up.</param>
  /// <returns>
  /// <c>true</c> when the query returns at least one row; <c>false</c> when the name is null or
  /// empty, the query returns no rows, or <c>dbHelper.Fill</c> returns null.
  /// </returns>
  private bool CheckTableExist(string tablename)
  {
      // A null or empty name can never match a table, so skip the database round trip.
      if (string.IsNullOrEmpty(tablename))
      {
          return false;
      }

      // The name lands inside a single-quoted SQL literal; doubling embedded quotes keeps it
      // from terminating the literal early.
      string quotedName = tablename.Replace("'", "''");
      DataTable table = dbHelper.Fill("select top 1 * from sysobjects where name='" + quotedName + "' and xtype='u'");
      return table != null && table.Rows.Count > 0;
  }
  /// <summary>
  /// Saves the last uploaded history record of every meter to "TextFile1.txt" (relative to
  /// the current directory), one "key,value" line per entry, replacing any previous content.
  /// </summary>
  /// <param name="uploadHis">Entries to write; each key/value pair becomes one line.</param>
  /// <exception cref="ArgumentNullException"><paramref name="uploadHis"/> is null.</exception>
  private void SavaUploadHis(Dictionary<String,String> uploadHis)
  {
      // Fail before touching the file so a bad argument cannot wipe the previous content.
      if (uploadHis == null)
      {
          throw new ArgumentNullException("uploadHis");
      }

      // append: false creates the file or overwrites it, so no separate truncation step
      // (and no extra stream that could be left open on failure) is needed.
      using (StreamWriter sw = new StreamWriter(@"TextFile1.txt", false))
      {
          foreach (KeyValuePair<String, String> item in uploadHis)
          {
              sw.WriteLine(item.Key + "," + item.Value);
          }
      }
  }
  /// <summary>
  /// Database provider for SQL Server, obtained from <c>DbFactoryProvider.GetProvider</c> with the
  /// connection string in <c>Constants.DbConncetion</c>. The factory is called on every access.
  /// </summary>
  static IDbProvider dbHelper
  {
      get
      {
          return DbFactoryProvider.GetProvider(CurrentDbType.SqlServer, Constants.DbConncetion);
      }
  }
  214. }
  215. }