WaterFactoryAreaDataJob.cs 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217
  1. using log4net;
  2. using Quartz;
  3. using RabbitMQ.Client;
  4. using RDIFramework.Utilities;
  5. using System;
  6. using System.Collections.Generic;
  7. using System.Data;
  8. using System.IO;
  9. using System.Text;
  10. namespace TimedUpload.QuartzJobs
  11. {
  12. [DisallowConcurrentExecution]
  13. public class WaterFactoryAreaDataJob :IJob
  14. {
  15. private readonly ILog log = LogManager.GetLogger(typeof(WaterFactoryAreaDataJob));
  16. public void Execute(IJobExecutionContext context)
  17. {
  18. // throw new NotImplementedException();
  19. string[] uploadUrls = Constants.UploadUrl.Split('|');
  20. Dictionary<string, IConnection> connections = new Dictionary<string, IConnection>();
  21. Dictionary<string, IModel> channels = new Dictionary<string, IModel>();
  22. Dictionary<string, IBasicProperties> properties = new Dictionary<string, IBasicProperties>();
  23. foreach (string uploadUrl in uploadUrls)
  24. {
  25. ConnectionFactory factory = new ConnectionFactory();
  26. factory.HostName = uploadUrl;//主机名,Rabbit会拿这个IP生成一个endpoint,这个很熟悉吧,就是socket绑定的那个终结点。
  27. factory.UserName = Constants.UploadUserName;//默认用户名,用户可以在服务端自定义创建,有相关命令行
  28. factory.Password = Constants.UploadPassword;//默认密码
  29. factory.AutomaticRecoveryEnabled = true; // 链接断开会自动重连
  30. IConnection connection = factory.CreateConnection();
  31. IModel channel = connection.CreateModel();
  32. channel.QueueDeclare("zone.deviceHis", true, false, false, null);//创建一个名称为kibaqueue的消息队列
  33. IBasicProperties property = channel.CreateBasicProperties();
  34. property.ContentType = "text/plain";
  35. property.DeliveryMode = 2; //持久化
  36. connections.Add(uploadUrl, connection);
  37. properties.Add(uploadUrl, property);
  38. channels.Add(uploadUrl, channel);
  39. }
  40. if (channels.Count > 0)
  41. {
  42. SendZoneDeviceHis(channels, properties);
  43. }
  44. foreach (KeyValuePair<string, IConnection> item in connections)
  45. {
  46. IConnection connection = item.Value;
  47. connection.Close();
  48. }
  49. }
  50. /// <summary>
  51. /// 大表历史数据
  52. /// </summary>
  53. /// <param name="channels"></param>
  54. /// <param name="properties"></param>
  55. private void SendZoneDeviceHis(Dictionary<string, IModel> channels, Dictionary<string, IBasicProperties> properties)
  56. {
  57. try
  58. {
  59. log.Info("水厂大表设备历史数据同步任务开始执行.................\r\n");
  60. String sqlMeter = "SELECT * FROM bs_waterfactory_meter WHERE Type = 1 AND IsFlow = 1";//"SELECT ID,考核表编码 FROM [设备信息] where 是否启用 = '是' and 考核表编码 is not null order by ID";
  61. DataTable dtMeter = dbHelper.Fill(sqlMeter);
  62. //Dictionary<string, object> arguments = new Dictionary<string, object>();
  63. //arguments["x-max-length-bytes"] = 2147383648;
  64. //arguments["x-overflow"] = "reject-publish";
  65. for (int i = 0; i < dtMeter.Rows.Count; i++)
  66. {
  67. DataRow drMeter = dtMeter.Rows[i];
  68. //String meterId = drMeter["ID"].ToString();
  69. String meterCode = drMeter["MeterCode"].ToString().TrimEnd() + "wf";
  70. String lastTime = "";
  71. //for (int k = lastYear; k <= nowYear; k++)
  72. //{
  73. //string tablename = "历史记录_" + ("000000" + meterId).Substring(meterId.Length, 6) + "_" + k;
  74. // 判断历史记录表是否存在
  75. //if (!CheckTableExist(tablename))
  76. //{
  77. // continue;
  78. //}
  79. //String sqlMeterHis = "select 记录时间,采集时间,正累计流量,负累计流量,净累计流量,瞬时流量,电池电压,压力 from " + tablename;
  80. //if (uploadHis.ContainsKey(meterCode))
  81. //{
  82. // sqlMeterHis += " where 采集时间 > '" + uploadHis[meterCode] + "'";
  83. //}
  84. //sqlMeterHis += " order by 采集时间";
  85. //DataTable dtMeterHis = dbHelper.Fill(sqlMeterHis);
  86. StringBuilder message = new StringBuilder();
  87. for (int j = 0; j < dtMeter.Rows.Count; j++)
  88. {
  89. message.Clear();
  90. try
  91. {
  92. //DataRow drMeterHis = dtMeterHis.Rows[j];
  93. String getDateTime = Convert.ToDateTime(drMeter["ReadTime"]).ToString("yyyy-MM-dd HH:mm:ss");
  94. message.Append("{");
  95. message.Append("\"meterAssessmentCode\": \"").Append(drMeter["MeterCode"].ToString()).Append("wf\",");
  96. message.Append("\"manufacturerCode\": ").Append(Constants.ManufacturerCode).Append(",");
  97. message.Append("\"getDateTime\": \"").Append(getDateTime).Append("\",");
  98. if (Convert.DBNull != drMeter["NetCumulativeFlow"])
  99. {
  100. message.Append("\"netCumulativeFlow\": ").Append(Convert.ToDecimal(drMeter["NetCumulativeFlow"])).Append(",");
  101. }
  102. if (Convert.DBNull != drMeter["NetCumulativeFlow"])
  103. {
  104. message.Append("\"positiveCumulativeFlow\": ").Append(Convert.ToDecimal(drMeter["NetCumulativeFlow"])).Append(",");
  105. }
  106. message.Append("\"negativeCumulativeFlow\": ").Append(0).Append(",");
  107. if (Convert.DBNull != drMeter["InstantaneousFlow"])
  108. {
  109. message.Append("\"instantaneousFlow\": ").Append(Convert.ToDecimal(drMeter["instantaneousFlow"])).Append(",");
  110. }
  111. if (Convert.DBNull != drMeter["Pressure"])
  112. {
  113. message.Append("\"pressure\": ").Append(Convert.ToDecimal(drMeter["Pressure"])).Append(",");
  114. }
  115. //if (Convert.DBNull != drMeter["电池电压"])
  116. //{
  117. // message.Append("\"batteryVoltageValue\": ").Append(Convert.ToDecimal(drMeterHis["电池电压"])).Append(",");
  118. //}
  119. message.Append("}");
  120. foreach (KeyValuePair<string, IModel> item in channels)
  121. {
  122. string key = item.Key;
  123. IModel channel = item.Value;
  124. IBasicProperties property = properties[key];
  125. channel.BasicPublish("zone.deviceHis", "", property, Encoding.UTF8.GetBytes(message.ToString())); //生产消息
  126. }
  127. lastTime = getDateTime;
  128. }
  129. catch (Exception ex)
  130. {
  131. log.Info("大表设备历史记录同步任务数据推送失败:" + message.ToString() + "\r\n");
  132. log.Error(ex.Message + "===========" + ex.StackTrace + "\r\n");
  133. }
  134. }
  135. //}
  136. //if (!"".Equals(lastTime))
  137. //{
  138. // uploadHis[meterCode] = lastTime;
  139. //}
  140. }
  141. //SavaUploadHis(uploadHis);
  142. log.Info("水厂大表设备历史记录同步任务执行结束.................\r\n");
  143. }
  144. catch (Exception ex)
  145. {
  146. log.Error("水厂大表设备历史记录同步任务执行错误:" + ex.Message + "===========" + ex.StackTrace + "\r\n");
  147. }
  148. }
  149. /// <summary>
  150. /// 判断历史记录表是否存在
  151. /// </summary>
  152. /// <param name="tablename"></param>
  153. /// <returns></returns>
  154. private bool CheckTableExist(string tablename)
  155. {
  156. DataTable table = dbHelper.Fill("select top 1 * from sysobjects where name='" + tablename + "' and xtype='u'");
  157. if (table == null || table.Rows.Count == 0)
  158. {
  159. return false;
  160. }
  161. return true;
  162. }
  163. /// <summary>
  164. /// 保存每块块表的上传最后一条历史记录
  165. /// </summary>
  166. /// <param name="uploadHis"></param>
  167. private void SavaUploadHis(Dictionary<String, String> uploadHis)
  168. {
  169. // 清除之前的内容
  170. FileStream stream = File.Open(@"TextFile1.txt", FileMode.OpenOrCreate, FileAccess.Write);
  171. stream.Seek(0, SeekOrigin.Begin);
  172. stream.SetLength(0);
  173. stream.Close();
  174. using (StreamWriter sw = new StreamWriter(@"TextFile1.txt"))
  175. {
  176. foreach (var item in uploadHis)
  177. {
  178. sw.WriteLine(item.Key + "," + item.Value);
  179. }
  180. }
  181. }
  182. static IDbProvider dbHelper
  183. {
  184. get
  185. {
  186. var DbDefine = DbFactoryProvider.GetProvider(CurrentDbType.MySql, Constants.zhihuishuiwuDB);
  187. return DbDefine;
  188. }
  189. }
  190. }
  191. }