|
@@ -0,0 +1,265 @@
|
|
|
+using log4net;
|
|
|
+using Quartz;
|
|
|
+using RabbitMQ.Client;
|
|
|
+using RDIFramework.Utilities;
|
|
|
+using System;
|
|
|
+using System.Collections.Generic;
|
|
|
+using System.Configuration;
|
|
|
+using System.Data;
|
|
|
+using System.Linq;
|
|
|
+using System.Text;
|
|
|
+
|
|
|
+namespace TimedUpload.QuartzJobs
|
|
|
+{
|
|
|
+ [DisallowConcurrentExecution]
|
|
|
+ public class ChangleWaterQualityDataJob : IJob
|
|
|
+ {
|
|
|
+ private readonly ILog log = LogManager.GetLogger(typeof(ChangleWaterQualityDataJob));
|
|
|
+
|
|
|
+ public void Execute(IJobExecutionContext context)
|
|
|
+ {
|
|
|
+ string[] uploadUrls = Constants.UploadUrl.Split('|');
|
|
|
+ Dictionary<string, IConnection> connections = new Dictionary<string, IConnection>();
|
|
|
+ Dictionary<string, IModel> channels = new Dictionary<string, IModel>();
|
|
|
+ Dictionary<string, IBasicProperties> properties = new Dictionary<string, IBasicProperties>();
|
|
|
+
|
|
|
+ foreach (string uploadUrl in uploadUrls)
|
|
|
+ {
|
|
|
+ ConnectionFactory factory = new ConnectionFactory();
|
|
|
+ factory.HostName = uploadUrl;
|
|
|
+ factory.UserName = Constants.UploadUserName;
|
|
|
+ factory.Password = Constants.UploadPassword;
|
|
|
+
|
|
|
+ factory.AutomaticRecoveryEnabled = true;
|
|
|
+
|
|
|
+ IConnection connection = factory.CreateConnection();
|
|
|
+ IModel channel = connection.CreateModel();
|
|
|
+ channel.QueueDeclare("zone.device", true, false, false, null);
|
|
|
+ channel.QueueDeclare("zone.deviceHis", true, false, false, null);
|
|
|
+
|
|
|
+ IBasicProperties property = channel.CreateBasicProperties();
|
|
|
+ property.ContentType = "text/plain";
|
|
|
+ property.DeliveryMode = 2;
|
|
|
+ connections.Add(uploadUrl, connection);
|
|
|
+ properties.Add(uploadUrl, property);
|
|
|
+ channels.Add(uploadUrl, channel);
|
|
|
+ }
|
|
|
+
|
|
|
+ if (channels.Count > 0)
|
|
|
+ {
|
|
|
+ SendZoneMeterUser(channels, properties);
|
|
|
+ SendZoneMeterUserHis(channels, properties);
|
|
|
+ }
|
|
|
+
|
|
|
+ foreach (KeyValuePair<string, IConnection> item in connections)
|
|
|
+ {
|
|
|
+ IConnection connection = item.Value;
|
|
|
+ connection.Close();
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+ private void SendZoneMeterUser(Dictionary<string, IModel> channels, Dictionary<string, IBasicProperties> properties)
|
|
|
+ {
|
|
|
+ string changLeWaterQualityId = Constants.changLeWaterQualityId;
|
|
|
+ log.Info("水质设备基础数据同步任务开始执行.................\r\n");
|
|
|
+ while (true)
|
|
|
+ {
|
|
|
+ DataTable dt = null;
|
|
|
+ try
|
|
|
+ {
|
|
|
+ string sql = "SELECT * FROM [设备信息] WHERE ID > " + changLeWaterQualityId + " ORDER BY ID";
|
|
|
+
|
|
|
+ dt = dbHelper.Fill(sql);
|
|
|
+ }
|
|
|
+ catch (Exception ex)
|
|
|
+ {
|
|
|
+ log.Info("水质设备基础数据同步查询数据异常" + ex.StackTrace + "\r\n");
|
|
|
+ }
|
|
|
+
|
|
|
+ if (dt == null || dt.Rows.Count == 0)
|
|
|
+ {
|
|
|
+ break;
|
|
|
+ }
|
|
|
+
|
|
|
+ log.Info("水质设备基础数据同步获取记录数:【" + dt.Rows.Count + "】................\r\n");
|
|
|
+
|
|
|
+
|
|
|
+ StringBuilder message = new StringBuilder();
|
|
|
+ for (int i = 0; i < dt.Rows.Count; i++)
|
|
|
+ {
|
|
|
+ message.Clear();
|
|
|
+ try
|
|
|
+ {
|
|
|
+ DataRow dr = dt.Rows[i];
|
|
|
+ String iccid = "";
|
|
|
+ if (!"".Equals(dr["DeviceSN"].ToString()))
|
|
|
+ {
|
|
|
+ iccid = dr["DeviceSN"].ToString();
|
|
|
+ }
|
|
|
+
|
|
|
+ String lngAndLat = "";
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+ String meterAssessmentCode = "wwkjsz" + dr["ID"].ToString();
|
|
|
+ message.Append("{");
|
|
|
+ message.Append("\"meterAssessmentName\": \"").Append(dr["DeviceName"]).Append("\",");
|
|
|
+ message.Append("\"iccId\": ").Append(iccid).Append(",");
|
|
|
+
|
|
|
+ message.Append("\"lngAndLat\": \"").Append(lngAndLat).Append("\",");
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+ message.Append("\"isPressucre\": 0,");
|
|
|
+ message.Append("\"isFlow\": 0,");
|
|
|
+ message.Append("\"isZoneMeter\": 0,");
|
|
|
+ message.Append("\"isTradeMeter\": 0,");
|
|
|
+ message.Append("\"isLargeUser\": 0,");
|
|
|
+ message.Append("\"isQuality\": 1,");
|
|
|
+ message.Append("\"meterAssessmentCode\": \"").Append(meterAssessmentCode).Append("\",");
|
|
|
+ message.Append("\"manufacturerCode\": \"").Append(Constants.ManufacturerCode).Append("\",");
|
|
|
+ message.Append("\"meterTypeId\": \"2\"");
|
|
|
+ message.Append("}");
|
|
|
+ foreach (KeyValuePair<string, IModel> item in channels)
|
|
|
+ {
|
|
|
+ string key = item.Key;
|
|
|
+ IModel channel = item.Value;
|
|
|
+ IBasicProperties property = properties[key];
|
|
|
+ channel.BasicPublish("zone.device", "", property, Encoding.UTF8.GetBytes(message.ToString()));
|
|
|
+ }
|
|
|
+ changLeWaterQualityId = dr["ID"].ToString();
|
|
|
+ }
|
|
|
+ catch (Exception ex)
|
|
|
+ {
|
|
|
+ log.Info("水质设备基础数据推送失败:" + message.ToString() + "\r\n");
|
|
|
+ log.Error(ex.Message + "===========" + ex.StackTrace + "\r\n");
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+ UpdateAppConfig("ChangLeWaterQualityId", changLeWaterQualityId);
|
|
|
+ log.Info("水质设备基础数据同步任务结束执行.................\r\n");
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+ private void SendZoneMeterUserHis(Dictionary<string, IModel> channels, Dictionary<string, IBasicProperties> properties)
|
|
|
+ {
|
|
|
+ string changLeWaterQualityHisId = Constants.changLeWaterQualityHisId;
|
|
|
+ log.Info("水质设备抄表数据同步任务开始执行.................\r\n");
|
|
|
+ String sqlMeter = "SELECT * FROM [设备信息] ORDER BY ID";
|
|
|
+ DataTable dtMeter = dbHelper.Fill(sqlMeter);
|
|
|
+
|
|
|
+ for (int i = 0; i < dtMeter.Rows.Count; i++)
|
|
|
+ {
|
|
|
+ DataRow drMeter = dtMeter.Rows[i];
|
|
|
+ String meterId = drMeter["ID"].ToString();
|
|
|
+ String meterCode = drMeter["考核表编码"].ToString();
|
|
|
+ String lastTime = "";
|
|
|
+
|
|
|
+ int nowYear = DateTime.Now.Year;
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ UpdateAppConfig("ChangLeWaterQualityHisId", changLeWaterQualityHisId);
|
|
|
+ log.Info("水质设备历史记录同步任务执行结束.................\r\n");
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+ private void UpdateAppConfig(String key, String value)
|
|
|
+ {
|
|
|
+ var cfg = ConfigurationManager.OpenExeConfiguration(ConfigurationUserLevel.None);
|
|
|
+ cfg.AppSettings.Settings[key].Value = value;
|
|
|
+ cfg.Save();
|
|
|
+ ConfigurationManager.RefreshSection("appSettings");
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ static IDbProvider dbHelper
|
|
|
+ {
|
|
|
+ get
|
|
|
+ {
|
|
|
+ var DbDefine = DbFactoryProvider.GetProvider(CurrentDbType.SqlServer, Constants.changLeWaterQualityDb);
|
|
|
+ return DbDefine;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|