ChangleWaterFactoryDataJob.cs 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339
  1. using log4net;
  2. using Newtonsoft.Json;
  3. using Quartz;
  4. using RabbitMQ.Client;
  5. using RDIFramework.Utilities;
  6. using System;
  7. using System.Collections;
  8. using System.Collections.Generic;
  9. using System.Data;
  10. using System.Linq;
  11. using System.Text;
  12. namespace TimedUpload.QuartzJobs
  13. {
  14. [DisallowConcurrentExecution]
  15. public class ChangleWaterFactoryDataJob : IJob
  16. {
  17. private readonly ILog log = LogManager.GetLogger(typeof(ChangleWaterFactoryDataJob));
  18. public void Execute(IJobExecutionContext context)
  19. {
  20. string[] uploadUrls = Constants.UploadUrl.Split('|');
  21. Dictionary<string, IConnection> connections = new Dictionary<string, IConnection>();
  22. Dictionary<string, IModel> channels = new Dictionary<string, IModel>();
  23. Dictionary<string, IBasicProperties> properties = new Dictionary<string, IBasicProperties>();
  24. foreach (string uploadUrl in uploadUrls)
  25. {
  26. ConnectionFactory factory = new ConnectionFactory();
  27. factory.HostName = uploadUrl;//主机名,Rabbit会拿这个IP生成一个endpoint,这个很熟悉吧,就是socket绑定的那个终结点。
  28. factory.UserName = Constants.UploadUserName;//默认用户名,用户可以在服务端自定义创建,有相关命令行
  29. factory.Password = Constants.UploadPassword;//默认密码
  30. factory.AutomaticRecoveryEnabled = true; // 链接断开会自动重连
  31. IConnection connection = factory.CreateConnection();
  32. IModel channel = connection.CreateModel();
  33. channel.QueueDeclare("waterFactory.deviceHis", true, false, false, null);//创建一个名称为kibaqueue的消息队列
  34. IBasicProperties property = channel.CreateBasicProperties();
  35. property.ContentType = "text/plain";
  36. property.DeliveryMode = 2; //持久化
  37. connections.Add(uploadUrl, connection);
  38. properties.Add(uploadUrl, property);
  39. channels.Add(uploadUrl, channel);
  40. }
  41. if (channels.Count > 0)
  42. {
  43. SendWaterFactoryHis(channels, properties);
  44. }
  45. foreach (KeyValuePair<string, IConnection> item in connections)
  46. {
  47. IConnection connection = item.Value;
  48. connection.Close();
  49. }
  50. }
  51. /// <summary>
  52. /// 水厂历史数据
  53. /// </summary>
  54. /// <param name="channels"></param>
  55. /// <param name="properties"></param>
  56. private void SendWaterFactoryHis(Dictionary<string, IModel> channels, Dictionary<string, IBasicProperties> properties)
  57. {
  58. try
  59. {
  60. Object inPh= null, inChlorine = null, inTurbidty = null, inTurbidtytwo = null, outPh = null, outChlorine = null, outTurbidty = null, level = null;
  61. DateTime nowTime = DateTime.Now;
  62. log.Info("昌乐水厂数据同步任务开始执行.................\r\n");
  63. string factorySql = "SELECT [id],[inMeterId],[outMeterId],[uploadTime],[waterFactory] FROM [dbo].[uploadList]";
  64. DataTable dtFactory = waterFHelper.Fill(factorySql);
  65. if (dtFactory == null || dtFactory.Rows.Count == 0)
  66. {
  67. return;
  68. }
  69. // 处理水厂水表
  70. for (int i = 0; i < dtFactory.Rows.Count; i++)
  71. {
  72. string factoryCode = dtFactory.Rows[i]["waterFactory"].ToString();
  73. // DataTable dtMeter = dbHelper.Fill(meterSql);
  74. //if (dtMeter == null || dtMeter.Rows.Count == 0)
  75. //{
  76. // continue;
  77. //}
  78. Dictionary<string, object> factoryMap = new Dictionary<string, object>();
  79. factoryMap["FactoryCode"] = factoryCode;
  80. // 处理水厂基础数据
  81. // string baseSql = "SELECT RTRIM(日期) + ' ' + RTRIM(时间) ReadTime,反冲正累计流量 - 反冲负累计流量 BackWashing,[清水池液位] LiquidHeight,更新时间 FROM [dbo].[水厂泵房2] WHERE FactoryId = " + factoryId + " ORDER BY 日期, 时间 ";
  82. // DataTable baseDt = dbHelper.Fill(baseSql);
  83. DateTime searchTime = Convert.ToDateTime(dtFactory.Rows[i]["uploadTime"]);
  84. TimeSpan sTs = new TimeSpan(searchTime.Ticks);
  85. TimeSpan eTs = new TimeSpan(nowTime.Ticks);
  86. TimeSpan ts = eTs - sTs;
  87. int num = (int)ts.TotalMinutes / 5;
  88. if (num > 200) {
  89. num = 200;
  90. }
  91. for (int l = 0; l < num; l++)
  92. {
  93. log.Info(l);
  94. // DataRow baseRow = baseDt.Rows[l];
  95. Dictionary<string, object> baseMap = new Dictionary<string, object>();
  96. ArrayList meterDataList = new ArrayList();
  97. searchTime = searchTime.AddMinutes(5);
  98. string time = searchTime.ToString("yyyy/MM/dd HH:mm:ss");// baseRow["ReadTime"].ToString();
  99. string year = searchTime.Year.ToString();
  100. baseMap["ReadTime"] = time;
  101. baseMap["BackWashing"] = "0";//baseRow["BackWashing"].ToString();
  102. baseMap["Consumption"] = "0";
  103. baseMap["CleanWaterFlow"] = "0";
  104. baseMap["Dosage"] = "0";
  105. // 进水
  106. string insql = @"SELECT TOP 1 [id],[设备ID],[记录时间],[采集时间],[设备状态],[通讯状态],[净累计流量],[瞬时流量] FROM [dbo].[历史记录_000513_" + year + "] where 采集时间 >= '" + time + "'";
  107. DataTable inDt = dbHelper.Fill(insql);
  108. if (inDt != null && inDt.Rows.Count > 0)
  109. {
  110. Dictionary<string, object> meterMap = new Dictionary<string, object>();
  111. meterMap["InstantaneousFlow"] = inDt.Rows[0]["瞬时流量"];
  112. meterMap["NetCumulativeFlow"] = inDt.Rows[0]["净累计流量"];
  113. meterMap["Pressure"] = null;
  114. meterMap["PH"] = null;
  115. meterMap["Chlorine"] = null;
  116. meterMap["Turbidity"] = null;
  117. meterMap["TurbidityTwo"] = null;
  118. meterMap["ReadTime"] = time;
  119. meterMap["MeterCode"] = "wwkj001";
  120. meterDataList.Add(meterMap);
  121. }
  122. else
  123. {
  124. break;
  125. Dictionary<string, object> meterMap = new Dictionary<string, object>();
  126. meterMap["InstantaneousFlow"] = null;
  127. meterMap["NetCumulativeFlow"] = null;
  128. meterMap["Pressure"] = null;
  129. meterMap["PH"] = null;
  130. meterMap["Chlorine"] = null;
  131. meterMap["Turbidity"] = null;
  132. meterMap["TurbidityTwo"] = null;
  133. meterMap["ReadTime"] = time;
  134. meterMap["MeterCode"] = "wwkj001";
  135. meterDataList.Add(meterMap);
  136. }
  137. // 出水
  138. string outsql = @"SELECT TOP 1 [id],[设备ID],[记录时间],[采集时间],[设备状态],[通讯状态],[净累计流量],[瞬时流量] FROM [dbo].[历史记录_000286_" + year + "] where 采集时间 >= '" + time + "'"; ;
  139. DataTable outDt = dbHelper.Fill(outsql);
  140. if (outDt != null && outDt.Rows.Count > 0)
  141. {
  142. Dictionary<string, object> meterMap = new Dictionary<string, object>();
  143. meterMap["InstantaneousFlow"] = outDt.Rows[0]["瞬时流量"];
  144. meterMap["NetCumulativeFlow"] = outDt.Rows[0]["净累计流量"];
  145. meterMap["Pressure"] = null;
  146. meterMap["PH"] = null;
  147. meterMap["Chlorine"] = null;
  148. meterMap["Turbidity"] = null;
  149. meterMap["TurbidityTwo"] = null;
  150. meterMap["ReadTime"] = time;
  151. meterMap["MeterCode"] = "wwkj005";
  152. meterDataList.Add(meterMap);
  153. }
  154. else
  155. {
  156. break;
  157. Dictionary<string, object> meterMap = new Dictionary<string, object>();
  158. meterMap["InstantaneousFlow"] = null;
  159. meterMap["NetCumulativeFlow"] = null;
  160. meterMap["Pressure"] = null;
  161. meterMap["PH"] = null;
  162. meterMap["Chlorine"] = null;
  163. meterMap["Turbidity"] = null;
  164. meterMap["TurbidityTwo"] = null;
  165. meterMap["ReadTime"] = time;
  166. meterMap["MeterCode"] = "wwkj005";
  167. meterDataList.Add(meterMap);
  168. }
  169. // 进水水质和出水水质
  170. string waterQuelitySql = @"SELECT TOP 1 [id]
  171. ,[originWaterZD],[originWaterPH],[chendianchiOutWaterZD],[lvhouZD]
  172. ,[lvhouYulv],[qingshuichiFlow],[gongshuishuitaOutFlow]
  173. ,[outWaterZD],[outWaterPH],[outWaterYulv]
  174. ,[qingshuichiFluidLevel],[CollectTime],[CreateDt]
  175. FROM[dbo].[WaterFactoryDaliyRecord] where CreateDt >= '" + searchTime.AddMinutes(-1).ToString("yyyy-MM-dd HH:mm") + "' ";
  176. DataTable wqDt = waterFHelper.Fill(waterQuelitySql);
  177. if (wqDt != null && wqDt.Rows.Count > 0)
  178. {
  179. inPh = wqDt.Rows[0]["originWaterPH"];
  180. inChlorine = wqDt.Rows[0]["lvhouYulv"];
  181. inTurbidty = wqDt.Rows[0]["originWaterZD"];
  182. inTurbidtytwo = wqDt.Rows[0]["lvhouZD"];
  183. outPh = wqDt.Rows[0]["outWaterPH"];
  184. outChlorine = wqDt.Rows[0]["outWaterYulv"];
  185. outTurbidty = wqDt.Rows[0]["outWaterZD"];
  186. level = wqDt.Rows[0]["qingshuichiFluidLevel"];
  187. }
  188. Dictionary<string, object> meterMap1 = new Dictionary<string, object>(); // 进水
  189. meterMap1["InstantaneousFlow"] = null;
  190. meterMap1["NetCumulativeFlow"] = null;
  191. meterMap1["Pressure"] = null;
  192. meterMap1["PH"] = inPh;
  193. meterMap1["Chlorine"] = inChlorine;
  194. meterMap1["Turbidity"] = inTurbidty;
  195. meterMap1["TurbidityTwo"] = inTurbidtytwo;
  196. meterMap1["ReadTime"] = time;
  197. meterMap1["MeterCode"] = "wwkj008";
  198. meterDataList.Add(meterMap1);
  199. Dictionary<string, object> meterMap2 = new Dictionary<string, object>(); // 出水
  200. meterMap2["InstantaneousFlow"] = null;
  201. meterMap2["NetCumulativeFlow"] = null;
  202. meterMap2["PH"] = outPh;
  203. meterMap2["Chlorine"] = outChlorine;
  204. meterMap2["Turbidity"] = outTurbidty;
  205. meterMap2["TurbidityTwo"] = null;
  206. meterMap2["Pressure"] = null;
  207. meterMap2["ReadTime"] = time;
  208. meterMap2["MeterCode"] = "wwkj004";
  209. meterDataList.Add(meterMap2);
  210. baseMap["LiquidHeight"] = level;//baseRow["LiquidHeight"].ToString();
  211. #region
  212. // 处理水厂查询的数据
  213. //for (int j = 0; j < dtMeter.Rows.Count; j++)
  214. //{
  215. // string tableName = dtMeter.Rows[j]["TableName"].ToString();
  216. // string meterId = dtMeter.Rows[j]["MeterId"].ToString();
  217. // string meterCode = dtMeter.Rows[j]["MeterCode"].ToString();
  218. // string colSql = "select * from 水厂数据字段 where MeterId = " + meterId;
  219. // DataTable dtMeterCol = dbHelper.Fill(colSql);
  220. // if (dtMeterCol == null || dtMeterCol.Rows.Count == 0)
  221. // {
  222. // continue;
  223. // }
  224. // string queryCol = "";
  225. // string[] queryColArr = new string[dtMeterCol.Rows.Count];
  226. // for (int k = 0; k < dtMeterCol.Rows.Count; k++)
  227. // {
  228. // string tableColumn = dtMeterCol.Rows[k]["TableColumn"].ToString();
  229. // string sendKey = dtMeterCol.Rows[k]["SendKey"].ToString();
  230. // if (k == dtMeterCol.Rows.Count - 1)
  231. // {
  232. // queryCol += tableColumn + " " + sendKey;
  233. // }
  234. // else
  235. // {
  236. // queryCol += tableColumn + " " + sendKey + ",";
  237. // }
  238. // queryColArr[k] = sendKey;
  239. // }
  240. // Dictionary<string, object> meterMap = new Dictionary<string, object>();
  241. // string[] colArr = waterFactoryColumn.Split(',');
  242. // foreach (string col in colArr)
  243. // {
  244. // meterMap[col] = null;
  245. // }
  246. // string meterDataSql = "select " + queryCol + " from " + tableName + " where 更新时间 = '" + id + "'";
  247. // DataTable data = dbHelper.Fill(meterDataSql);
  248. // if (data == null || data.Rows.Count == 0)
  249. // {
  250. // continue;
  251. // }
  252. // foreach (string col in queryColArr)
  253. // {
  254. // meterMap[col] = data.Rows[0][col];
  255. // }
  256. // meterMap["MeterCode"] = meterCode;
  257. // meterDataList.Add(meterMap);
  258. //}
  259. #endregion
  260. factoryMap["BaseData"] = baseMap;
  261. factoryMap["MeterData"] = meterDataList;
  262. string message = JsonConvert.SerializeObject(factoryMap);
  263. log.Info(message);
  264. foreach (KeyValuePair<string, IModel> item in channels)
  265. {
  266. string key = item.Key;
  267. IModel channel = item.Value;
  268. IBasicProperties property = properties[key];
  269. channel.BasicPublish("waterFactory.deviceHis", "", property, Encoding.UTF8.GetBytes(message)); //生产消息
  270. }
  271. // 删除数据
  272. String updatesql = "UPDATE [dbo].[uploadList] SET uploadTime = '" + time + "' WHERE [id] = " + dtFactory.Rows[i]["id"].ToString();
  273. waterFHelper.ExecuteNonQuery(updatesql);
  274. }
  275. }
  276. log.Info("水厂历史记录同步任务执行结束.................\r\n");
  277. }
  278. catch (Exception ex)
  279. {
  280. log.Error("水厂历史记录同步任务执行错误:" + ex.Message + "===========" + ex.StackTrace + "\r\n");
  281. }
  282. }
  283. /// <summary>
  284. /// 大表数据
  285. /// </summary>
  286. static IDbProvider dbHelper
  287. {
  288. get
  289. {
  290. var DbDefine = DbFactoryProvider.GetProvider(CurrentDbType.SqlServer, Constants.DbConncetion);
  291. return DbDefine;
  292. }
  293. }
  294. /// <summary>
  295. /// 水厂数据库
  296. /// </summary>
  297. static IDbProvider waterFHelper
  298. {
  299. get
  300. {
  301. var DbDefine = DbFactoryProvider.GetProvider(CurrentDbType.SqlServer, Constants.changleWaterFactoryDb);
  302. return DbDefine;
  303. }
  304. }
  305. }
  306. }