ChangleWorkmanshipDataUploadJob.cs 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247
  1. using log4net;
  2. using Newtonsoft.Json;
  3. using Quartz;
  4. using RabbitMQ.Client;
  5. using RDIFramework.Utilities;
  6. using System;
  7. using System.Collections.Generic;
  8. using System.Data;
  9. using System.Linq;
  10. using System.Text;
  11. namespace TimedUpload.QuartzJobs
  12. {
  13. public class ChangleWorkmanshipDataUploadJob : IJob
  14. {
  15. private readonly ILog log = LogManager.GetLogger(typeof(WorkmanshipDataUploadJob));
  16. public void Execute(IJobExecutionContext context)
  17. {
  18. string[] uploadUrls = Constants.UploadUrl.Split('|');
  19. Dictionary<string, IConnection> connections = new Dictionary<string, IConnection>();
  20. Dictionary<string, IModel> channels = new Dictionary<string, IModel>();
  21. Dictionary<string, IBasicProperties> properties = new Dictionary<string, IBasicProperties>();
  22. foreach (string uploadUrl in uploadUrls)
  23. {
  24. ConnectionFactory factory = new ConnectionFactory();
  25. factory.HostName = uploadUrl;//主机名,Rabbit会拿这个IP生成一个endpoint,这个很熟悉吧,就是socket绑定的那个终结点。
  26. factory.UserName = Constants.UploadUserName;//默认用户名,用户可以在服务端自定义创建,有相关命令行
  27. factory.Password = Constants.UploadPassword;//默认密码
  28. factory.AutomaticRecoveryEnabled = true; // 链接断开会自动重连
  29. IConnection connection = factory.CreateConnection();
  30. IModel channel = connection.CreateModel();
  31. channel.QueueDeclare("workmanship", true, false, false, null);//创建一个名称为kibaqueue的消息队列
  32. IBasicProperties property = channel.CreateBasicProperties();
  33. property.ContentType = "text/plain";
  34. property.DeliveryMode = 2; //持久化
  35. connections.Add(uploadUrl, connection);
  36. properties.Add(uploadUrl, property);
  37. channels.Add(uploadUrl, channel);
  38. }
  39. if (channels.Count > 0)
  40. {
  41. SendSeconddaryPumpData(channels, properties);
  42. //SendWaterWellData(channels, properties);
  43. SendWaterFactoryData(channels, properties);
  44. }
  45. foreach (KeyValuePair<string, IConnection> item in connections)
  46. {
  47. IConnection connection = item.Value;
  48. connection.Close();
  49. }
  50. }
  51. /// <summary>
  52. /// 二供数据
  53. /// </summary>
  54. /// <param name="channel"></param>
  55. private void SendSeconddaryPumpData(Dictionary<string, IModel> channels, Dictionary<string, IBasicProperties> properties)
  56. {
  57. try
  58. {
  59. log.Info("二供工艺图数据同步任务开始执行.................\r\n");
  60. DateTime newTime = DateTime.Now;
  61. String yearStr = newTime.Year.ToString();
  62. #region 获取水质
  63. string ph = "0", chlorine = "0", turbidity = "0";
  64. string waterQulitySql = @"SELECT TOP 2 [RecordName],[RecordValue] FROM [dbo].[历史记录] where DevId = 3 order by GetDateTime DESC";
  65. DataTable waterQulityDt = dbHelper.Fill(waterQulitySql);
  66. if (waterQulityDt != null && waterQulityDt.Rows.Count > 0)
  67. {
  68. for (int i = 0; i < waterQulityDt.Rows.Count; i++)
  69. {
  70. DataRow row = waterQulityDt.Rows[i];
  71. if (row["RecordName"].ToString().Trim() == "余氯") {
  72. chlorine = row["RecordValue"].ToString();
  73. }
  74. if (row["RecordName"].ToString().Trim() == "浊度")
  75. {
  76. turbidity = row["RecordValue"].ToString();
  77. }
  78. }
  79. }
  80. #endregion
  81. string factorySql = @"SELECT top 1 1 rid,
  82. 1 Type,CONVERT(varchar(100),采集时间,111) 日期,CONVERT(varchar(100),采集时间,108) 时间
  83. CONVERT(varchar(100),采集时间,111) + ' ' + CONVERT(varchar(100),采集时间,108) ReadTime,CONVERT(varchar(100),采集时间,111) + ' ' + CONVERT(varchar(5),采集时间,108) 更新时间,
  84. " + ph + " PH," + chlorine + " 余氯, " + turbidity + @" 浊度,13.9 温度,'南流泉泵站' 编号,'second001' 编码,'second001' DeviceCode,0 水箱液位,0 爆管报警,'' 电源故障,
  85. '' 真空报警,瞬时流量,'' 硬件超压,'' 系统电压,0 缺水报警,0 网络状态,0 超压报警,'' 软件超压,0 进水报警, 累计流量 净累计流量,'' 正累计流量,'' 负累计流量,
  86. 0 一段压力设定,0 三段压力设定,0 二段压力设定
  87. ,[表1A相电压] 电压AB
  88. ,[表1B相电压] 电压AC
  89. ,[表1C相电压] 电压BC
  90. ,[表1A相电流] + 表2A相电流 + 表3A相电流 电流A
  91. ,[表1B相电流] + 表2B相电流 + 表3B相电流 电流B
  92. ,[表1C相电流] + 表2C相电流 + 表3C相电流 电流C
  93. ,[表1电能] + 表2电能 + 表3电能 用电量
  94. ,[表1A相电压] 一泵电压,'' 一泵故障
  95. ,[表1A相电流] 一泵电流
  96. ,[表2A相电压] 二泵电压,'' 二泵故障
  97. ,[表2A相电流] 二泵电流
  98. ,[表3A相电压] 三泵电压,'' 二泵故障
  99. ,[表3A相电流] 三泵电流
  100. ,[泵3状态] 三泵运行状态
  101. ,[泵2状态] 二泵运行状态
  102. ,[泵1状态] 一泵运行状态
  103. ,[一号泵有功功率] 一泵功率
  104. ,[一号泵频率] 一泵频率
  105. ,[二号泵有功功率] 二泵功率
  106. ,[二号泵频率] 二泵频率
  107. ,[三号泵有功功率] 三泵功率
  108. ,[三号泵频率] 三泵频率
  109. ,[出水设定压力] 泵设定压力
  110. ,[出水端实际压力] 泵出口压力
  111. ,[进水端实际压力] 泵进口压力
  112. FROM [dbo].[历史记录_000015_" + yearStr + "] order by id DESC";
  113. DataTable dtDevice = dbHelper.Fill(factorySql);
  114. if (dtDevice == null || dtDevice.Rows.Count == 0)
  115. {
  116. return;
  117. }
  118. DataColumnCollection cols = dtDevice.Columns;
  119. // 处理设备列表
  120. for (int i = 0; i < dtDevice.Rows.Count; i++)
  121. {
  122. DataRow dr = dtDevice.Rows[i];
  123. string deviceCode = dr["DeviceCode"].ToString();
  124. string type = dr["Type"].ToString();
  125. string realData = CreateJsonParameters(cols, dr);
  126. Dictionary<string, object> deviceMap = new Dictionary<string, object>();
  127. deviceMap["DeviceCode"] = deviceCode;
  128. deviceMap["Type"] = type;
  129. deviceMap["RealData"] = realData;
  130. string message = JsonConvert.SerializeObject(deviceMap);
  131. foreach (KeyValuePair<string, IModel> item in channels)
  132. {
  133. string key = item.Key;
  134. IModel channel = item.Value;
  135. IBasicProperties property = properties[key];
  136. channel.BasicPublish("workmanship", "", property, Encoding.UTF8.GetBytes(message)); //生产消息
  137. }
  138. }
  139. log.Info("二供工艺图数据同步任务开始执行.................\r\n");
  140. }
  141. catch (Exception ex)
  142. {
  143. log.Error("二供工艺图数据同步任务错误:" + ex.Message + "===========" + ex.StackTrace + "\r\n");
  144. }
  145. }
  146. /// <summary>
  147. /// 水厂数据
  148. /// </summary>
  149. /// <param name="channel"></param>
  150. private void SendWaterFactoryData(Dictionary<string, IModel> channels, Dictionary<string, IBasicProperties> properties)
  151. {
  152. try
  153. {
  154. log.Info("水厂工艺图数据同步任务开始执行.................\r\n");
  155. DateTime newTime = DateTime.Now;
  156. String yearStr = newTime.Year.ToString();
  157. string factorySql = "";
  158. DataTable dtDevice = dbHelper.Fill(factorySql);
  159. if (dtDevice == null || dtDevice.Rows.Count == 0)
  160. {
  161. return;
  162. }
  163. DataColumnCollection cols = dtDevice.Columns;
  164. // 处理设备列表
  165. for (int i = 0; i < dtDevice.Rows.Count; i++)
  166. {
  167. DataRow dr = dtDevice.Rows[i];
  168. string deviceCode = dr["DeviceCode"].ToString();
  169. string type = dr["Type"].ToString();
  170. string realData = CreateJsonParameters(cols, dr);
  171. Dictionary<string, object> deviceMap = new Dictionary<string, object>();
  172. deviceMap["DeviceCode"] = deviceCode;
  173. deviceMap["Type"] = type;
  174. deviceMap["RealData"] = realData;
  175. string message = JsonConvert.SerializeObject(deviceMap);
  176. foreach (KeyValuePair<string, IModel> item in channels)
  177. {
  178. string key = item.Key;
  179. IModel channel = item.Value;
  180. IBasicProperties property = properties[key];
  181. channel.BasicPublish("workmanship", "", property, Encoding.UTF8.GetBytes(message)); //生产消息
  182. }
  183. }
  184. log.Info("水厂工艺图数据同步任务执行结束.................\r\n");
  185. }
  186. catch (Exception ex)
  187. {
  188. log.Error("水厂工艺图数据同步任务执行错误:" + ex.Message + "===========" + ex.StackTrace + "\r\n");
  189. }
  190. }
  191. private static string CreateJsonParameters(DataColumnCollection cols, DataRow dr)
  192. {
  193. StringBuilder JsonString = new StringBuilder();
  194. JsonString.Append("{");
  195. for (int j = 0; j < cols.Count; j++)
  196. {
  197. if (j < cols.Count - 1)
  198. {
  199. JsonString.Append("\"" + cols[j].ColumnName.ToString() + "\":" + "\"" + dr[j].ToString().Trim() + "\",");
  200. }
  201. else if (j == cols.Count - 1)
  202. {
  203. JsonString.Append("\"" + cols[j].ColumnName.ToString() + "\":" + "\"" + dr[j].ToString().Trim() + "\"");
  204. }
  205. }
  206. JsonString.Append("}");
  207. return JsonString.ToString();
  208. }
  209. static IDbProvider dbHelper
  210. {
  211. get
  212. {
  213. var DbDefine = DbFactoryProvider.GetProvider(CurrentDbType.SqlServer, Constants.WaterFactoryDbConncetion);
  214. return DbDefine;
  215. }
  216. }
  217. }
  218. }