HeTongDataUploadJob.cs 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311
  1. using log4net;
  2. using Quartz;
  3. using RabbitMQ.Client;
  4. using RDIFramework.Utilities;
  5. using System;
  6. using System.Collections.Generic;
  7. using System.Configuration;
  8. using System.Data;
  9. using System.IO;
  10. using System.Text;
  11. using TimedUpload.utils;
  12. namespace TimedUpload.QuartzJobs
  13. {
  14. [DisallowConcurrentExecution]
  15. public class HeTongDataUploadJob : IJob
  16. {
  // Logger for this job; every progress and error message of the job goes through it.
  private readonly ILog log = LogManager.GetLogger(typeof(HeTongDataUploadJob));

  // MySQL access helper used by the Send* methods to run their SELECT statements.
  // NOTE(review): "DbMySQL" is presumably a connection-string name - confirm against MySqlHelper.
  private readonly MySqlHelper mySqlHelper = new MySqlHelper("DbMySQL");
  /// <summary>
  /// Quartz entry point. Opens one RabbitMQ connection and channel for every host listed in
  /// <c>Constants.UploadUrlHeTong</c> (hosts separated by '|'), pushes the meter history data to
  /// all of them through <c>SendZoneDeviceHis</c>, and finally closes every opened connection.
  /// </summary>
  /// <param name="context">Quartz execution context; not used by this job.</param>
  public void Execute(IJobExecutionContext context)
  {
      // Drop empty entries so a trailing or doubled '|' does not turn into an empty host name.
      string[] uploadUrls = (Constants.UploadUrlHeTong ?? string.Empty)
          .Split(new[] { '|' }, StringSplitOptions.RemoveEmptyEntries);

      Dictionary<string, IConnection> connections = new Dictionary<string, IConnection>();
      Dictionary<string, IModel> channels = new Dictionary<string, IModel>();
      Dictionary<string, IBasicProperties> properties = new Dictionary<string, IBasicProperties>();

      try
      {
          foreach (string rawUrl in uploadUrls)
          {
              string uploadUrl = rawUrl.Trim();

              // A blank host cannot be connected to, and a repeated host would both duplicate every
              // message and make Dictionary.Add throw after the connection was already opened.
              if (uploadUrl.Length == 0 || connections.ContainsKey(uploadUrl))
              {
                  continue;
              }

              ConnectionFactory factory = new ConnectionFactory();
              factory.HostName = uploadUrl; // broker host name or IP
              factory.UserName = Constants.UploadUserNameHeTong;
              factory.Password = Constants.UploadPasswordHeTong;
              factory.VirtualHost = Constants.VirtualHostHeTong;
              factory.AutomaticRecoveryEnabled = true; // reconnect automatically when the link drops

              IConnection connection = factory.CreateConnection();
              // Register the connection immediately so the finally block closes it even when the
              // channel setup below fails.
              connections.Add(uploadUrl, connection);

              IModel channel = connection.CreateModel();
              // Declare the two durable, non-exclusive, non-auto-delete queues used by this job.
              // NOTE(review): the Send* methods publish to *exchanges* with these names; confirm
              // that the broker topology really expects queues of the same name to be declared here.
              channel.QueueDeclare("archives.dmaDevice.exchange", true, false, false, null);
              channel.QueueDeclare("data.dmaDeviceData.exchange", true, false, false, null);

              IBasicProperties property = channel.CreateBasicProperties();
              property.ContentType = "text/plain";
              property.DeliveryMode = 2; // persistent messages

              properties.Add(uploadUrl, property);
              channels.Add(uploadUrl, channel);
          }

          if (channels.Count > 0)
          {
              // The device archive upload is currently switched off:
              // SendZoneDevice(channels, properties);
              SendZoneDeviceHis(channels, properties);
          }
      }
      finally
      {
          // Close every connection that was opened, even if connecting to a later host or the
          // upload itself failed; one failing Close must not keep the others open.
          foreach (KeyValuePair<string, IConnection> item in connections)
          {
              try
              {
                  item.Value.Close();
              }
              catch (Exception ex)
              {
                  log.Error("关闭RabbitMQ连接失败:" + item.Key + "," + ex.Message + "===========" + ex.StackTrace + "\r\n");
              }
          }
      }
  }
  /// <summary>
  /// Publishes one "deviceAdd" archive message per large meter to the exchange
  /// "archives.dmaDevice.exchange" on every given channel. The meters are the ids returned by
  /// <c>GetDevIds</c>; their master data is read from <c>bs_meterassessmentbase</c>.
  /// Failures are logged and never propagate to the caller.
  /// </summary>
  /// <param name="channels">Open channels, keyed by upload host.</param>
  /// <param name="properties">Message properties, keyed by the same hosts as <paramref name="channels"/>.</param>
  private void SendZoneDevice(Dictionary<string, IModel> channels, Dictionary<string, IBasicProperties> properties)
  {
      try
      {
          Dictionary<string, string> devIdDic = GetDevIds();
          if (devIdDic.Count == 0)
          {
              // Nothing to upload; an empty "IN ()" list would also be invalid SQL.
              log.Info("大表设备同步任务未配置任何设备,本次不执行.\r\n");
              return;
          }

          string devIds = string.Join(",", devIdDic.Keys);
          log.Info("大表设备同步任务开始执行.................\r\n");

          // NOTE(review): the ids are concatenated into the statement unquoted, so they are
          // presumably numeric - confirm against the progress file contents.
          string sql = "SELECT MeterAssessmentName,MeterAssessmentCode,ICCID,LngAndLat,IsPressucre,IsFlow,isZoneMeter,isTradeMeter,isLargeUser FROM bs_meterassessmentbase WHERE MeterAssessmentId IN (" + devIds + ")";
          DataTable dt = mySqlHelper.GetDataTable(sql);
          StringBuilder message = new StringBuilder();

          for (int i = 0; i < dt.Rows.Count; i++)
          {
              // The builder is reused per row; on failure its partial content is logged below.
              message.Clear();
              try
              {
                  DataRow dr = dt.Rows[i];

                  // Only the first entry of a comma-separated ICCID list is sent.
                  string iccid = dr["ICCID"].ToString().Split(',')[0];

                  message.Append("{\"deviceAdd\":{");
                  message.Append("\"meterAssessmentName\": ").Append(ToJsonString(dr["MeterAssessmentName"])).Append(",");
                  // The ICCID stays an unquoted token as before; a missing one becomes null
                  // instead of leaving a hole in the JSON text.
                  message.Append("\"iccId\": ").Append(ToJsonTokenOrNull(iccid)).Append(",");
                  message.Append("\"lngAndLat\": ").Append(ToJsonString(dr["LngAndLat"])).Append(",");
                  message.Append("\"isPressucre\": ").Append(ToJsonTokenOrNull(dr["IsPressucre"])).Append(",");
                  message.Append("\"isFlow\": ").Append(ToJsonTokenOrNull(dr["IsFlow"])).Append(",");
                  message.Append("\"isZoneMeter\": ").Append(ToJsonTokenOrNull(dr["isZoneMeter"])).Append(",");
                  message.Append("\"isTradeMeter\": ").Append(ToJsonTokenOrNull(dr["isTradeMeter"])).Append(",");
                  message.Append("\"isLargeUser\": ").Append(ToJsonTokenOrNull(dr["isLargeUser"])).Append(",");
                  message.Append("\"meterAssessmentCode\": ").Append(ToJsonString(dr["MeterAssessmentCode"])).Append(",");
                  message.Append("\"manufacturerCode\": \"ht\"");
                  message.Append("}}");

                  // Encode once; the same payload goes to every broker.
                  byte[] body = Encoding.UTF8.GetBytes(message.ToString());
                  foreach (KeyValuePair<string, IModel> item in channels)
                  {
                      item.Value.BasicPublish("archives.dmaDevice.exchange", "", properties[item.Key], body);
                  }
              }
              catch (Exception ex)
              {
                  // A bad row must not stop the remaining rows from being uploaded.
                  log.Info("大表设备同步任务数据推送失败:" + message.ToString() + "\r\n");
                  log.Error(ex.Message + "===========" + ex.StackTrace + "\r\n");
              }
          }

          log.Info("大表设备同步任务执行结束.................\r\n");
      }
      catch (Exception ex)
      {
          log.Error("大表设备同步任务执行错误:" + ex.Message + "===========" + ex.StackTrace + "\r\n");
      }
  }

  // Renders a value as a quoted JSON string, escaping the characters that would break the literal.
  private static string ToJsonString(object value)
  {
      string text = Convert.ToString(value) ?? string.Empty;
      text = text.Replace("\\", "\\\\")
                 .Replace("\"", "\\\"")
                 .Replace("\r", "\\r")
                 .Replace("\n", "\\n")
                 .Replace("\t", "\\t");
      return "\"" + text + "\"";
  }

  // Renders a value as a bare JSON token: true/false for booleans, null for missing values,
  // otherwise the culture-invariant text of the value.
  private static string ToJsonTokenOrNull(object value)
  {
      if (value is bool)
      {
          return (bool)value ? "true" : "false";
      }

      string text = Convert.ToString(value, System.Globalization.CultureInfo.InvariantCulture);
      return string.IsNullOrEmpty(text) ? "null" : text;
  }
  /// <summary>
  /// Uploads the history records of every large meter. For each "id,lastTime" pair returned by
  /// <c>GetDevIds</c>, the rows of table <c>bs_meterassessmentbase_{id}</c> newer than
  /// <c>lastTime</c> are published, oldest first, as one JSON message each to the exchange
  /// "data.dmaDeviceData.exchange" on every given channel. The timestamp of the last row that was
  /// published successfully is then stored through <c>SavaUploadHis</c>.
  /// Failures are logged and never propagate to the caller.
  /// </summary>
  /// <param name="channels">Open channels, keyed by upload host.</param>
  /// <param name="properties">Message properties, keyed by the same hosts as <paramref name="channels"/>.</param>
  private void SendZoneDeviceHis(Dictionary<string, IModel> channels, Dictionary<string, IBasicProperties> properties)
  {
      try
      {
          log.Info("大表设备历史数据同步和同DMA任务开始执行.................\r\n");
          Dictionary<String, String> uploadHis = GetDevIds();
          Dictionary<String, String> uploadHisNew = new Dictionary<string, string>();

          foreach (KeyValuePair<string, string> itemDev in uploadHis)
          {
              string meterId = itemDev.Key;
              string lastTime = itemDev.Value;
              // NOTE(review): id and timestamp come from the local progress file and are
              // concatenated into the statement; the table name cannot be parameterized.
              string sql = "SELECT MeterAssessmentCode,GetDateTime,NetCumulativeFlow,PositiveCumulativeFlow,NegativeCumulativeFlow,InstantaneousFlow,Pressure,BatteryVoltageValue FROM bs_meterassessmentbase_" + meterId + " a WHERE a.GetDateTime > '" + lastTime + "' ORDER BY GetDateTime ASC";
              try
              {
                  DataTable dtMeterHis = mySqlHelper.GetDataTable(sql);
                  StringBuilder message = new StringBuilder();

                  for (int j = 0; j < dtMeterHis.Rows.Count; j++)
                  {
                      // The builder is reused per row; on failure its partial content is logged below.
                      message.Clear();
                      try
                      {
                          DataRow drMeterHis = dtMeterHis.Rows[j];

                          // Invariant culture keeps the separators and the Gregorian calendar fixed,
                          // whatever the regional settings of the host are.
                          string getDateTime = Convert.ToDateTime(drMeterHis["GetDateTime"])
                              .ToString("yyyy-MM-dd HH:mm:ss", System.Globalization.CultureInfo.InvariantCulture);

                          AppendHistoryJson(message, drMeterHis, getDateTime);

                          // Encode once; the same payload goes to every broker.
                          byte[] body = Encoding.UTF8.GetBytes(message.ToString());
                          foreach (KeyValuePair<string, IModel> item in channels)
                          {
                              item.Value.BasicPublish("data.dmaDeviceData.exchange", "", properties[item.Key], body);
                          }

                          // Progress only advances after the row went out on every channel.
                          lastTime = getDateTime;
                      }
                      catch (Exception ex)
                      {
                          log.Info("大表设备历史记录同步和同DMA任务数据推送失败:" + meterId + "," + message.ToString() + "\r\n");
                          log.Error(ex.Message + "===========" + ex.StackTrace + "\r\n");
                      }
                  }
              }
              catch (Exception ex)
              {
                  // A failing query for one meter must not stop the other meters.
                  log.Error(meterId + "," + ex.Message + "===========" + ex.StackTrace + "\r\n");
              }

              // Keep the meter in the progress file even when nothing was uploaded for it.
              uploadHisNew[meterId] = lastTime;
          }

          SavaUploadHis(uploadHisNew);
          log.Info("大表设备历史记录同步和同DMA任务执行结束.................\r\n");
      }
      catch (Exception ex)
      {
          log.Error("大表设备历史记录同步和同DMA任务执行错误:" + ex.Message + "===========" + ex.StackTrace + "\r\n");
      }
  }

  // Appends the JSON object for one history row; NULL measurement columns are left out.
  private static void AppendHistoryJson(StringBuilder message, DataRow row, string getDateTime)
  {
      string meterCode = row["MeterAssessmentCode"].ToString().Replace("\\", "\\\\").Replace("\"", "\\\"");

      message.Append("{");
      message.Append("\"meterAssessmentCode\": \"").Append(meterCode).Append("\",");
      message.Append("\"manufacturerCode\": \"ht\",");
      message.Append("\"getDateTime\": \"").Append(getDateTime).Append("\"");
      // Each optional member brings its own leading comma, so the object never ends in ",}".
      AppendDecimalMember(message, "netCumulativeFlow", row["NetCumulativeFlow"]);
      AppendDecimalMember(message, "positiveCumulativeFlow", row["PositiveCumulativeFlow"]);
      AppendDecimalMember(message, "negativeCumulativeFlow", row["NegativeCumulativeFlow"]);
      AppendDecimalMember(message, "instantaneousFlow", row["InstantaneousFlow"]);
      AppendDecimalMember(message, "pressure", row["Pressure"]);
      AppendDecimalMember(message, "batteryVoltageValue", row["BatteryVoltageValue"]);
      message.Append("}");
  }

  // Appends ,"name": value for a non-NULL column, always written with '.' as the decimal separator.
  private static void AppendDecimalMember(StringBuilder message, string name, object value)
  {
      if (value == null || value is DBNull)
      {
          return;
      }

      message.Append(",\"").Append(name).Append("\": ")
             .Append(Convert.ToDecimal(value).ToString(System.Globalization.CultureInfo.InvariantCulture));
  }
  /// <summary>
  /// Reads the progress file "TextFileHeTong.txt" (a relative path, so it is resolved against the
  /// current directory of the process) and returns one entry per non-blank line.
  /// Each line has the form "id,value": the text before the first comma becomes the key, the text
  /// between the first and the second comma becomes the value. Both are trimmed.
  /// </summary>
  /// <returns>The id-to-value map; a later line with the same id overwrites an earlier one.</returns>
  /// <exception cref="FormatException">A non-blank line has no comma or an empty id.</exception>
  private Dictionary<string, string> GetDevIds()
  {
      Dictionary<String, String> uploadHis = new Dictionary<string, string>();
      using (StreamReader sr = new StreamReader(@"TextFileHeTong.txt"))
      {
          int lineNumber = 0;
          String line;
          while ((line = sr.ReadLine()) != null)
          {
              lineNumber++;

              // Blank and whitespace-only lines are separators, not data.
              if (line.Trim().Length == 0)
              {
                  continue;
              }

              String[] item = line.Split(',');
              if (item.Length < 2 || item[0].Trim().Length == 0)
              {
                  // Fail with the offending line instead of a bare IndexOutOfRangeException.
                  throw new FormatException("TextFileHeTong.txt 第" + lineNumber + "行格式错误,应为\"设备ID,最后上传时间\":" + line);
              }

              uploadHis[item[0].Trim()] = item[1].Trim();
          }
      }
      return uploadHis;
  }
  /// <summary>
  /// Writes a value into the appSettings section of the executable's configuration file, saves
  /// the file and refreshes the cached "appSettings" section. The key is created when the file
  /// does not contain it yet.
  /// </summary>
  /// <param name="key">Name of the appSettings entry.</param>
  /// <param name="value">Value to store for the entry.</param>
  private void UpdateAppConfig(String key, String value)
  {
      var cfg = ConfigurationManager.OpenExeConfiguration(ConfigurationUserLevel.None);

      // The indexer yields null for an unknown key; add the entry instead of dereferencing null.
      KeyValueConfigurationElement setting = cfg.AppSettings.Settings[key];
      if (setting == null)
      {
          cfg.AppSettings.Settings.Add(key, value);
      }
      else
      {
          setting.Value = value;
      }

      cfg.Save();
      // Make later ConfigurationManager.AppSettings reads pick up the saved value.
      ConfigurationManager.RefreshSection("appSettings");
  }
  /// <summary>
  /// Checks whether a user table with the given name is listed in <c>sysobjects</c> of the
  /// database reached through <c>dbHelper</c>.
  /// </summary>
  /// <param name="tablename">Name of the table to look for.</param>
  /// <returns>
  /// <c>true</c> when the query returns at least one row; <c>false</c> when it returns none,
  /// returns no table at all, or <paramref name="tablename"/> is null or empty.
  /// </returns>
  private bool CheckTableExist(string tablename)
  {
      // No table has an empty name, so the database round trip can be skipped.
      if (string.IsNullOrEmpty(tablename))
      {
          return false;
      }

      // Double embedded single quotes so the name cannot terminate the SQL string literal.
      string escapedName = tablename.Replace("'", "''");
      DataTable table = dbHelper.Fill("select top 1 * from sysobjects where name='" + escapedName + "' and xtype='u'");
      return table != null && table.Rows.Count > 0;
  }
  /// <summary>
  /// Replaces the content of the progress file "TextFileHeTong.txt" (relative to the current
  /// directory of the process) with one "key,value" line per dictionary entry.
  /// </summary>
  /// <param name="uploadHis">Entries to store; each is written as key + "," + value.</param>
  /// <exception cref="ArgumentNullException"><paramref name="uploadHis"/> is null.</exception>
  private void SavaUploadHis(Dictionary<String,String> uploadHis)
  {
      // Check before the file is touched, so a bad call cannot wipe the stored progress.
      if (uploadHis == null)
      {
          throw new ArgumentNullException("uploadHis");
      }

      // append: false creates the file or truncates an existing one, so no separate
      // "clear the file" step (and no extra, undisposed FileStream) is needed.
      using (StreamWriter sw = new StreamWriter(@"TextFileHeTong.txt", false))
      {
          foreach (KeyValuePair<string, string> item in uploadHis)
          {
              sw.WriteLine(item.Key + "," + item.Value);
          }
      }
  }
  /// <summary>
  /// Gets a SQL Server database provider for the connection described by
  /// <c>Constants.DbConncetion</c>. The provider is requested from <c>DbFactoryProvider</c> on
  /// every access; this property keeps no reference to it.
  /// </summary>
  static IDbProvider dbHelper
  {
      get
      {
          return DbFactoryProvider.GetProvider(CurrentDbType.SqlServer, Constants.DbConncetion);
      }
  }
  283. }
  284. }