ChangleWorkmanshipDataUploadJob.cs 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310
  1. using log4net;
  2. using Newtonsoft.Json;
  3. using Quartz;
  4. using RabbitMQ.Client;
  5. using RDIFramework.Utilities;
  6. using System;
  7. using System.Collections.Generic;
  8. using System.Data;
  9. using System.Linq;
  10. using System.Text;
  11. namespace TimedUpload.QuartzJobs
  12. {
  13. [DisallowConcurrentExecution]
  14. public class ChangleWorkmanshipDataUploadJob : IJob
  15. {
  16. private readonly ILog log = LogManager.GetLogger(typeof(WorkmanshipDataUploadJob));
  17. public void Execute(IJobExecutionContext context)
  18. {
  19. string[] uploadUrls = Constants.UploadUrl.Split('|');
  20. Dictionary<string, IConnection> connections = new Dictionary<string, IConnection>();
  21. Dictionary<string, IModel> channels = new Dictionary<string, IModel>();
  22. Dictionary<string, IBasicProperties> properties = new Dictionary<string, IBasicProperties>();
  23. foreach (string uploadUrl in uploadUrls)
  24. {
  25. ConnectionFactory factory = new ConnectionFactory();
  26. factory.HostName = uploadUrl;//主机名,Rabbit会拿这个IP生成一个endpoint,这个很熟悉吧,就是socket绑定的那个终结点。
  27. factory.UserName = Constants.UploadUserName;//默认用户名,用户可以在服务端自定义创建,有相关命令行
  28. factory.Password = Constants.UploadPassword;//默认密码
  29. factory.AutomaticRecoveryEnabled = true; // 链接断开会自动重连
  30. IConnection connection = factory.CreateConnection();
  31. IModel channel = connection.CreateModel();
  32. channel.QueueDeclare("workmanship", true, false, false, null);//创建一个名称为kibaqueue的消息队列
  33. IBasicProperties property = channel.CreateBasicProperties();
  34. property.ContentType = "text/plain";
  35. property.DeliveryMode = 2; //持久化
  36. connections.Add(uploadUrl, connection);
  37. properties.Add(uploadUrl, property);
  38. channels.Add(uploadUrl, channel);
  39. }
  40. if (channels.Count > 0)
  41. {
  42. SendSeconddaryPumpData(channels, properties);
  43. //SendWaterWellData(channels, properties);
  44. SendWaterFactoryData(channels, properties);
  45. }
  46. foreach (KeyValuePair<string, IConnection> item in connections)
  47. {
  48. IConnection connection = item.Value;
  49. connection.Close();
  50. }
  51. }
  52. /// <summary>
  53. /// 二供数据
  54. /// </summary>
  55. /// <param name="channel"></param>
  56. private void SendSeconddaryPumpData(Dictionary<string, IModel> channels, Dictionary<string, IBasicProperties> properties)
  57. {
  58. try
  59. {
  60. log.Info("二供工艺图数据同步任务开始执行.................\r\n");
  61. DateTime newTime = DateTime.Now;
  62. String yearStr = newTime.Year.ToString();
  63. #region 获取水质
  64. string ph = "0", chlorine = "0", turbidity = "0";
  65. string waterQulitySql = @"SELECT TOP 2 [RecordName],[RecordValue] FROM [dbo].[历史记录] where DevId = 3 order by GetDateTime DESC";
  66. DataTable waterQulityDt = waterQulityDbHelper.Fill(waterQulitySql);
  67. if (waterQulityDt != null && waterQulityDt.Rows.Count > 0)
  68. {
  69. for (int i = 0; i < waterQulityDt.Rows.Count; i++)
  70. {
  71. DataRow row = waterQulityDt.Rows[i];
  72. if (row["RecordName"].ToString().Trim() == "余氯") {
  73. chlorine = row["RecordValue"].ToString();
  74. }
  75. if (row["RecordName"].ToString().Trim() == "浊度")
  76. {
  77. turbidity = row["RecordValue"].ToString();
  78. }
  79. }
  80. }
  81. #endregion
  82. string factorySql = @"SELECT top 1 1 rid,
  83. 1 Type,CONVERT(varchar(100),采集时间,111) 日期,CONVERT(varchar(100),采集时间,108) 时间 ,
  84. CONVERT(varchar(100),采集时间,111) + ' ' + CONVERT(varchar(100),采集时间,108) ReadTime,CONVERT(varchar(100),采集时间,111) + ' ' + CONVERT(varchar(5),采集时间,108) 更新时间,
  85. " + ph + " PH," + chlorine + " 余氯, " + turbidity + @" 浊度,13.9 温度,'南流泉泵站' 编号,'second001' 编码,'second001' DeviceCode,0 水箱液位,0 爆管报警,'' 电源故障,
  86. '' 真空报警,瞬时流量,'' 硬件超压,'' 系统电压,0 缺水报警,0 网络状态,0 超压报警,'' 软件超压,0 进水报警, 累计流量 净累计流量,'' 正累计流量,'' 负累计流量,
  87. 0 一段压力设定,0 三段压力设定,0 二段压力设定
  88. ,[表1A相电压] 电压AB
  89. ,[表1B相电压] 电压AC
  90. ,[表1C相电压] 电压BC
  91. ,[表1A相电流] + 表2A相电流 + 表3A相电流 电流A
  92. ,[表1B相电流] + 表2B相电流 + 表3B相电流 电流B
  93. ,[表1C相电流] + 表2C相电流 + 表3C相电流 电流C
  94. ,[表1电能] + 表2电能 + 表3电能 用电量
  95. ,[表1A相电压] 一泵电压,'' 一泵故障
  96. ,[表1A相电流] 一泵电流
  97. ,[表2A相电压] 二泵电压,'' 二泵故障
  98. ,[表2A相电流] 二泵电流
  99. ,[表3A相电压] 三泵电压,'' 二泵故障
  100. ,[表3A相电流] 三泵电流
  101. ,[泵3状态] 三泵运行状态
  102. ,[泵2状态] 二泵运行状态
  103. ,[泵1状态] 一泵运行状态
  104. ,[一号泵有功功率] 一泵功率
  105. ,[一号泵频率] 一泵频率
  106. ,[二号泵有功功率] 二泵功率
  107. ,[二号泵频率] 二泵频率
  108. ,[三号泵有功功率] 三泵功率
  109. ,[三号泵频率] 三泵频率
  110. ,[出水设定压力] 泵设定压力
  111. ,[出水端实际压力] 泵出口压力
  112. ,[进水端实际压力] 泵进口压力
  113. FROM [dbo].[历史记录_000015_" + yearStr + "] order by id DESC";
  114. DataTable dtDevice = secondDbHelper.Fill(factorySql);
  115. if (dtDevice == null || dtDevice.Rows.Count == 0)
  116. {
  117. return;
  118. }
  119. DataColumnCollection cols = dtDevice.Columns;
  120. // 处理设备列表
  121. for (int i = 0; i < dtDevice.Rows.Count; i++)
  122. {
  123. DataRow dr = dtDevice.Rows[i];
  124. string deviceCode = dr["DeviceCode"].ToString();
  125. string type = dr["Type"].ToString();
  126. string realData = CreateJsonParameters(cols, dr);
  127. Dictionary<string, object> deviceMap = new Dictionary<string, object>();
  128. deviceMap["DeviceCode"] = deviceCode;
  129. deviceMap["Type"] = type;
  130. deviceMap["RealData"] = realData;
  131. string message = JsonConvert.SerializeObject(deviceMap);
  132. foreach (KeyValuePair<string, IModel> item in channels)
  133. {
  134. string key = item.Key;
  135. IModel channel = item.Value;
  136. IBasicProperties property = properties[key];
  137. channel.BasicPublish("workmanship", "", property, Encoding.UTF8.GetBytes(message)); //生产消息
  138. }
  139. }
  140. log.Info("二供工艺图数据同步任务开始执行.................\r\n");
  141. }
  142. catch (Exception ex)
  143. {
  144. log.Error("二供工艺图数据同步任务错误:" + ex.Message + "===========" + ex.StackTrace + "\r\n");
  145. }
  146. }
  147. /// <summary>
  148. /// 水厂数据
  149. /// </summary>
  150. /// <param name="channel"></param>
  151. private void SendWaterFactoryData(Dictionary<string, IModel> channels, Dictionary<string, IBasicProperties> properties)
  152. {
  153. try
  154. {
  155. log.Info("水厂工艺图数据同步任务开始执行.................\r\n");
  156. DateTime newTime = DateTime.Now;
  157. String yearStr = newTime.Year.ToString();
  158. // 进水
  159. Decimal inFlow = 0;
  160. Decimal inTotalFlow = 0;
  161. string insql = @"SELECT TOP 1 [id],[设备ID],[记录时间],[采集时间],[设备状态],[通讯状态],[净累计流量],[瞬时流量] FROM [dbo].[历史记录_000523_" + yearStr + "] order by 采集时间 DESC";
  162. DataTable inDt = dbHelper.Fill(insql);
  163. if (inDt != null && inDt.Rows.Count > 0) {
  164. inFlow = Convert.ToDecimal(inDt.Rows[0]["瞬时流量"]);
  165. inTotalFlow = Convert.ToDecimal(inDt.Rows[0]["净累计流量"]);
  166. }
  167. // 出水
  168. Decimal outFlow = 0;
  169. Decimal outTotalFlow = 0;
  170. string outsql = @"SELECT TOP 1 [id],[设备ID],[记录时间],[采集时间],[设备状态],[通讯状态],[净累计流量],[瞬时流量] FROM [dbo].[历史记录_000286_" + yearStr + "] Order by 采集时间 DESC"; ;
  171. DataTable outDt = dbHelper.Fill(outsql);
  172. if (outDt != null && outDt.Rows.Count > 0)
  173. {
  174. outFlow = Convert.ToDecimal(inDt.Rows[0]["瞬时流量"]);
  175. outTotalFlow = Convert.ToDecimal(inDt.Rows[0]["净累计流量"]);
  176. }
  177. string factorySql = @"SELECT TOP 1
  178. 1 rid,3 Type,1 ridl,CONVERT(varchar(100), CollectTime, 111) 日期,CONVERT(varchar(100), CollectTime, 108) 时间, CONVERT(varchar(100), CollectTime, 111) 日期1,CONVERT(varchar(100), CollectTime, 108) 时间1,
  179. CONVERT(varchar(100), CollectTime, 111) + ' ' + CONVERT(varchar(100), CollectTime, 108) ReadTime,CONVERT(varchar(100), CollectTime, 111) + ' ' + CONVERT(varchar(5), CollectTime, 108) 更新时间,CONVERT(varchar(100), CollectTime, 111) + ' ' + CONVERT(varchar(5), CollectTime, 108) 更新时间1,
  180. '' FactoryId,'sc001' DeviceCode,1 FactoryId1,0 反冲流量,0 管网压力," + outTotalFlow + " 一号正累计流量,"+ outFlow + @" 一号流量计流量,0 一号负累计流量,0 一号送水泵压力,0 一号反冲泵压力,0 三号反冲泵压力,0 三号送水泵压力,0 二号反冲泵压力,0 二号正累计流量,
  181. 0 二号负累计流量,0 二号送水泵压力,0 反冲正累计流量,0 反冲负累计流量,0 四号送水泵压力,"+ inFlow + @" 仓库流量计瞬时1,0 仓库流量计瞬时2, 0 仓库流量计瞬时3,"+inTotalFlow+ @" 仓库流量计正累计1,0 仓库流量计正累计2,0 仓库流量计正累计3,0 仓库流量计负累计1,
  182. 0 仓库流量计负累计2,0 仓库流量计负累计3
  183. ,[originWaterZD] 进水浊度
  184. ,[originWaterPH] 进水PH
  185. ,[lvhouZD] 泵房出水浊度
  186. ,[lvhouYulv] 进水氨氮
  187. ,[outWaterZD] 出水浊度
  188. ,[outWaterPH] 泵房出水PH
  189. ,[outWaterYulv] 泵房出水余氯
  190. ,[qingshuichiFluidLevel] 清水池液位
  191. FROM[Waterwell].[dbo].[WaterFactoryDaliyRecord] order by id DESC";
  192. DataTable dtDevice = waterFactoryDbHelper.Fill(factorySql);
  193. if (dtDevice == null || dtDevice.Rows.Count == 0)
  194. {
  195. return;
  196. }
  197. DataColumnCollection cols = dtDevice.Columns;
  198. // 处理设备列表
  199. for (int i = 0; i < dtDevice.Rows.Count; i++)
  200. {
  201. DataRow dr = dtDevice.Rows[i];
  202. string deviceCode = dr["DeviceCode"].ToString();
  203. string type = dr["Type"].ToString();
  204. string realData = CreateJsonParameters(cols, dr);
  205. Dictionary<string, object> deviceMap = new Dictionary<string, object>();
  206. deviceMap["DeviceCode"] = deviceCode;
  207. deviceMap["Type"] = type;
  208. deviceMap["RealData"] = realData;
  209. string message = JsonConvert.SerializeObject(deviceMap);
  210. foreach (KeyValuePair<string, IModel> item in channels)
  211. {
  212. string key = item.Key;
  213. IModel channel = item.Value;
  214. IBasicProperties property = properties[key];
  215. channel.BasicPublish("workmanship", "", property, Encoding.UTF8.GetBytes(message)); //生产消息
  216. }
  217. }
  218. log.Info("水厂工艺图数据同步任务执行结束.................\r\n");
  219. }
  220. catch (Exception ex)
  221. {
  222. log.Error("水厂工艺图数据同步任务执行错误:" + ex.Message + "===========" + ex.StackTrace + "\r\n");
  223. }
  224. }
  225. private static string CreateJsonParameters(DataColumnCollection cols, DataRow dr)
  226. {
  227. StringBuilder JsonString = new StringBuilder();
  228. JsonString.Append("{");
  229. for (int j = 0; j < cols.Count; j++)
  230. {
  231. if (j < cols.Count - 1)
  232. {
  233. JsonString.Append("\"" + cols[j].ColumnName.ToString() + "\":" + "\"" + dr[j].ToString().Trim() + "\",");
  234. }
  235. else if (j == cols.Count - 1)
  236. {
  237. JsonString.Append("\"" + cols[j].ColumnName.ToString() + "\":" + "\"" + dr[j].ToString().Trim() + "\"");
  238. }
  239. }
  240. JsonString.Append("}");
  241. return JsonString.ToString();
  242. }
  243. static IDbProvider dbHelper
  244. {
  245. get
  246. {
  247. var DbDefine = DbFactoryProvider.GetProvider(CurrentDbType.SqlServer, Constants.DbConncetion);
  248. return DbDefine;
  249. }
  250. }
  251. static IDbProvider waterFactoryDbHelper
  252. {
  253. get
  254. {
  255. var DbDefine = DbFactoryProvider.GetProvider(CurrentDbType.SqlServer, Constants.changleWaterFactoryDb);
  256. return DbDefine;
  257. }
  258. }
  259. static IDbProvider waterQulityDbHelper
  260. {
  261. get
  262. {
  263. var DbDefine = DbFactoryProvider.GetProvider(CurrentDbType.SqlServer, Constants.changLeWaterQualityDb);
  264. return DbDefine;
  265. }
  266. }
  267. static IDbProvider secondDbHelper
  268. {
  269. get
  270. {
  271. var DbDefine = DbFactoryProvider.GetProvider(CurrentDbType.SqlServer, Constants.WaterFactoryDbConncetion);
  272. return DbDefine;
  273. }
  274. }
  275. }
  276. }