Ver código fonte

Merge remote-tracking branch 'origin/master'

jochu_liu 1 ano atrás
pai
commit
c2d48fe0e8

+ 3 - 0
TimedUpload/Constants.cs

@@ -29,6 +29,9 @@ namespace TimedUpload
         public static string SecondaryToDMA = ConfigurationManager.AppSettings["SecondaryToDMA"];
         public static string SecondaryToDMACode = ConfigurationManager.AppSettings["SecondaryToDMACode"];
         public static string changleWaterFactoryDb = ConfigurationManager.AppSettings["changleWaterFactoryDb"];
+        public static string changLeWaterQualityDb = ConfigurationManager.AppSettings["ChangLeWaterQualityDb"];
+        public static string changLeWaterQualityId = ConfigurationManager.AppSettings["ChangLeWaterQualityId"];
+        public static string changLeWaterQualityHisId = ConfigurationManager.AppSettings["ChangLeWaterQualityHisId"];
 
     }
 }

+ 265 - 0
TimedUpload/QuartzJobs/ChangleWaterQualityDataJob.cs

@@ -0,0 +1,265 @@
+using log4net;
+using Quartz;
+using RabbitMQ.Client;
+using RDIFramework.Utilities;
+using System;
+using System.Collections.Generic;
+using System.Configuration;
+using System.Data;
+using System.Linq;
+using System.Text;
+
+namespace TimedUpload.QuartzJobs
+{
+    [DisallowConcurrentExecution]
+    public class ChangleWaterQualityDataJob : IJob
+    {
+        private readonly ILog log = LogManager.GetLogger(typeof(ChangleWaterQualityDataJob));
+
+        public void Execute(IJobExecutionContext context)
+        {
+            string[] uploadUrls = Constants.UploadUrl.Split('|');
+            Dictionary<string, IConnection> connections = new Dictionary<string, IConnection>();
+            Dictionary<string, IModel> channels = new Dictionary<string, IModel>();
+            Dictionary<string, IBasicProperties> properties = new Dictionary<string, IBasicProperties>();
+
+            foreach (string uploadUrl in uploadUrls)
+            {
+                ConnectionFactory factory = new ConnectionFactory();
+                factory.HostName = uploadUrl;//主机名,Rabbit会拿这个IP生成一个endpoint,这个很熟悉吧,就是socket绑定的那个终结点。
+                factory.UserName = Constants.UploadUserName;//默认用户名,用户可以在服务端自定义创建,有相关命令行
+                factory.Password = Constants.UploadPassword;//默认密码
+
+                factory.AutomaticRecoveryEnabled = true; // 链接断开会自动重连
+
+                IConnection connection = factory.CreateConnection();
+                IModel channel = connection.CreateModel();
+                channel.QueueDeclare("zone.device", true, false, false, null);//创建一个名称为kibaqueue的消息队列
+                channel.QueueDeclare("zone.deviceHis", true, false, false, null);//创建一个名称为kibaqueue的消息队列
+
+                IBasicProperties property = channel.CreateBasicProperties();
+                property.ContentType = "text/plain";
+                property.DeliveryMode = 2; //持久化
+                connections.Add(uploadUrl, connection);
+                properties.Add(uploadUrl, property);
+                channels.Add(uploadUrl, channel);
+            }
+
+            if (channels.Count > 0)
+            {
+                SendZoneMeterUser(channels, properties);
+                SendZoneMeterUserHis(channels, properties);
+            }
+
+            foreach (KeyValuePair<string, IConnection> item in connections)
+            {
+                IConnection connection = item.Value;
+                connection.Close();
+            }
+
+        }
+
+        /// <summary>
+        /// 水质设备添加
+        /// </summary>
+        /// <param name="channel"></param>
+        private void SendZoneMeterUser(Dictionary<string, IModel> channels, Dictionary<string, IBasicProperties> properties)
+        {
+            string changLeWaterQualityId = Constants.changLeWaterQualityId;
+            log.Info("水质设备基础数据同步任务开始执行.................\r\n");
+            while (true)
+            {
+                DataTable dt = null;
+                try
+                {
+                    string sql = "SELECT * FROM [设备信息] WHERE ID > " + changLeWaterQualityId + " ORDER BY ID";
+
+                    dt = dbHelper.Fill(sql);
+                }
+                catch (Exception ex)
+                {
+                    log.Info("水质设备基础数据同步查询数据异常" + ex.StackTrace + "\r\n");
+                }
+
+                if (dt == null || dt.Rows.Count == 0)
+                {
+                    break;
+                }
+
+                log.Info("水质设备基础数据同步获取记录数:【" + dt.Rows.Count + "】................\r\n");
+
+
+                StringBuilder message = new StringBuilder();
+                for (int i = 0; i < dt.Rows.Count; i++)
+                {
+                    message.Clear();
+                    try
+                    {
+                        DataRow dr = dt.Rows[i];
+                        String iccid = "";
+                        if (!"".Equals(dr["DeviceSN"].ToString()))
+                        {
+                            iccid = dr["DeviceSN"].ToString();
+                        }
+
+                        String lngAndLat = "";
+                        //if (!"".Equals(dr["X坐标"].ToString()) && !"".Equals(dr["Y坐标"].ToString()))
+                        //{
+                        //    lngAndLat = dr["X坐标"].ToString() + "|" + dr["Y坐标"].ToString();
+                        //}
+                        String meterAssessmentCode = "wwkjsz" + dr["ID"].ToString();
+                        message.Append("{");
+                        message.Append("\"meterAssessmentName\": \"").Append(dr["DeviceName"]).Append("\",");
+                        message.Append("\"iccId\": ").Append(iccid).Append(",");
+                        //message.Append("\"areaId\": 22,");
+                        message.Append("\"lngAndLat\": \"").Append(lngAndLat).Append("\",");
+                        //message.Append("\"pipeCailber\": \"DN32\",");
+                        //message.Append("\"pipeTexture\": \"PVC\",");
+                        //message.Append("\"imei\": \"77564212\",");
+                        message.Append("\"isPressucre\": 0,");
+                        message.Append("\"isFlow\": 0,");
+                        message.Append("\"isZoneMeter\": 0,");
+                        message.Append("\"isTradeMeter\": 0,");
+                        message.Append("\"isLargeUser\": 0,");
+                        message.Append("\"isQuality\": 1,");
+                        message.Append("\"meterAssessmentCode\": \"").Append(meterAssessmentCode).Append("\",");
+                        message.Append("\"manufacturerCode\": \"").Append(Constants.ManufacturerCode).Append("\",");
+                        message.Append("\"meterTypeId\": \"2\"");
+                        message.Append("}");
+                        foreach (KeyValuePair<string, IModel> item in channels)
+                        {
+                            string key = item.Key;
+                            IModel channel = item.Value;
+                            IBasicProperties property = properties[key];
+                            channel.BasicPublish("zone.device", "", property, Encoding.UTF8.GetBytes(message.ToString())); //生产消息
+                        }
+                        changLeWaterQualityId = dr["ID"].ToString();
+                    }
+                    catch (Exception ex)
+                    {
+                        log.Info("水质设备基础数据推送失败:" + message.ToString() + "\r\n");
+                        log.Error(ex.Message + "===========" + ex.StackTrace + "\r\n");
+                    }
+                }
+
+            }
+            UpdateAppConfig("ChangLeWaterQualityId", changLeWaterQualityId);
+            log.Info("水质设备基础数据同步任务结束执行.................\r\n");
+
+        }
+
+        /// <summary>
+        /// 水质设备历史数据
+        /// </summary>
+        /// <param name="channel"></param>
+        private void SendZoneMeterUserHis(Dictionary<string, IModel> channels, Dictionary<string, IBasicProperties> properties)
+        {
+            string changLeWaterQualityHisId = Constants.changLeWaterQualityHisId;
+            log.Info("水质设备抄表数据同步任务开始执行.................\r\n");
+            String sqlMeter = "SELECT * FROM [设备信息] ORDER BY ID";
+            DataTable dtMeter = dbHelper.Fill(sqlMeter);
+
+            for (int i = 0; i < dtMeter.Rows.Count; i++)
+            {
+                DataRow drMeter = dtMeter.Rows[i];
+                String meterId = drMeter["ID"].ToString();
+                String meterCode = drMeter["考核表编码"].ToString();
+                String lastTime = "";
+
+                int nowYear = DateTime.Now.Year;
+
+                //for (int k = lastYear; k <= nowYear; k++)
+                //{
+                //    string tablename = "历史记录_" + ("000000" + meterId).Substring(meterId.Length, 6) + "_" + k;
+
+                    
+                //    DataTable dtMeterHis = dbHelper.Fill("");
+
+                //    StringBuilder message = new StringBuilder();
+                //    for (int j = 0; j < dtMeterHis.Rows.Count; j++)
+                //    {
+                //        message.Clear();
+                //        try
+                //        {
+                //            DataRow drMeterHis = dtMeterHis.Rows[j];
+                //            String getDateTime = Convert.ToDateTime(drMeterHis["采集时间"]).ToString("yyyy-MM-dd HH:mm:ss");
+                //            message.Append("{");
+                //            message.Append("\"meterAssessmentCode\": \"").Append(meterCode).Append("\",");
+                //            message.Append("\"manufacturerCode\": ").Append(Constants.ManufacturerCode).Append(",");
+                //            message.Append("\"getDateTime\": \"").Append(getDateTime).Append("\",");
+                //            if (Convert.DBNull != drMeterHis["净累计流量"])
+                //            {
+                //                message.Append("\"netCumulativeFlow\": ").Append(Convert.ToDecimal(drMeterHis["净累计流量"])).Append(",");
+                //            }
+                //            if (Convert.DBNull != drMeterHis["正累计流量"])
+                //            {
+                //                message.Append("\"positiveCumulativeFlow\": ").Append(Convert.ToDecimal(drMeterHis["正累计流量"])).Append(",");
+                //            }
+                //            if (Convert.DBNull != drMeterHis["负累计流量"])
+                //            {
+                //                message.Append("\"negativeCumulativeFlow\": ").Append(Convert.ToDecimal(drMeterHis["负累计流量"])).Append(",");
+                //            }
+                //            if (Convert.DBNull != drMeterHis["瞬时流量"])
+                //            {
+                //                message.Append("\"instantaneousFlow\": ").Append(Convert.ToDecimal(drMeterHis["瞬时流量"])).Append(",");
+                //            }
+                //            if (Convert.DBNull != drMeterHis["压力"])
+                //            {
+                //                message.Append("\"pressure\": ").Append(Convert.ToDecimal(drMeterHis["压力"])).Append(",");
+                //            }
+                //            if (Convert.DBNull != drMeterHis["电池电压"])
+                //            {
+                //                message.Append("\"batteryVoltageValue\": ").Append(Convert.ToDecimal(drMeterHis["电池电压"])).Append(",");
+                //            }
+                //            message.Append("}");
+
+                //            foreach (KeyValuePair<string, IModel> item in channels)
+                //            {
+                //                string key = item.Key;
+                //                IModel channel = item.Value;
+                //                IBasicProperties property = properties[key];
+                //                channel.BasicPublish("zone.deviceHis", "", property, Encoding.UTF8.GetBytes(message.ToString())); //生产消息
+                //            }
+                //            lastTime = getDateTime;
+                //        }
+                //        catch (Exception ex)
+                //        {
+                //            log.Info("水质设备历史记录同步任务数据推送失败:" + message.ToString() + "\r\n");
+                //            log.Error(ex.Message + "===========" + ex.StackTrace + "\r\n");
+                //        }
+                //    }
+                //}
+                //if (!"".Equals(lastTime))
+                //{
+                //    uploadHis[meterCode] = lastTime;
+                //}
+            }
+
+            UpdateAppConfig("ChangLeWaterQualityHisId", changLeWaterQualityHisId);
+            log.Info("水质设备历史记录同步任务执行结束.................\r\n");
+        }
+
+        /// <summary>
+        /// 更新配置文件中的值
+        /// </summary>
+        /// <param name="key">键</param>
+        /// <param name="value">值</param>
+        private void UpdateAppConfig(String key, String value)
+        {
+            var cfg = ConfigurationManager.OpenExeConfiguration(ConfigurationUserLevel.None);
+            cfg.AppSettings.Settings[key].Value = value;
+            cfg.Save();
+            ConfigurationManager.RefreshSection("appSettings");
+
+        }
+
+        static IDbProvider dbHelper
+        {
+            get
+            {
+                var DbDefine = DbFactoryProvider.GetProvider(CurrentDbType.SqlServer, Constants.changLeWaterQualityDb);
+                return DbDefine;
+            }
+        }
+    }
+}

+ 1 - 0
TimedUpload/TimedUpload.csproj

@@ -94,6 +94,7 @@
     <Compile Include="Constants.cs" />
     <Compile Include="Program.cs" />
     <Compile Include="Properties\AssemblyInfo.cs" />
+    <Compile Include="QuartzJobs\ChangleWaterQualityDataJob.cs" />
     <Compile Include="QuartzJobs\ChangleBusinessDataJob.cs" />
     <Compile Include="QuartzJobs\ChangleSecondPumpDataJob.cs" />
     <Compile Include="QuartzJobs\ChangleWaterFactoryDataJob.cs" />

+ 3 - 0
TimedUpload/app.config

@@ -6,6 +6,7 @@
     <add key="ChargeDB" value="Data Source=222.163.159.218,10433;Initial Catalog=ChargeManage;uid=sa;password=Daswwwkj@123" />
     <add key="zhihuishuiwuDB" value="server=39.99.237.110;user id=root;password=wwkj@2136807;database=smartwater_daan;charset=utf8" />
     <add key="changleWaterFactoryDb" value=""/>
+    <add key="ChangLeWaterQualityDb" value="Data Source=47.105.90.108;Initial Catalog=舜水水质;uid=sa;password=wwkj@2136807" />
     <!-- 智慧水务系统RabbitMQ信息 start -->
     <add key="UploadUrl" value="127.0.0.1" />
     <add key="UploadPort" value="5678" />
@@ -18,6 +19,8 @@
     <add key="SecondaryToDMA" value="1" />
     <!-- 二供设备已同步到分区计量的设备编码 -->
     <add key="SecondaryToDMACode" value="" />
+    <add key="ChangLeWaterQualityId" value="0" />
+    <add key="ChangLeWaterQualityHisId" value="0" />
 
     <add key="BmId" value="154" />
     <add key="UserMeterId" value="78843" />