Sfoglia il codice sorgente

添加水厂二供工艺图

jochu_liu 1 anno fa
parent
commit
80fd9ae9ac

+ 241 - 0
TimedUpload/QuartzJobs/ChangleWorkmanshipDataUploadJob.cs

@@ -0,0 +1,241 @@
+using log4net;
+using Newtonsoft.Json;
+using Quartz;
+using RabbitMQ.Client;
+using RDIFramework.Utilities;
+using System;
+using System.Collections.Generic;
+using System.Data;
+using System.Linq;
+using System.Text;
+
+namespace TimedUpload.QuartzJobs
+{
+    public class ChangleWorkmanshipDataUploadJob : IJob
+    {
+        private readonly ILog log = LogManager.GetLogger(typeof(WorkmanshipDataUploadJob));
+
+        public void Execute(IJobExecutionContext context)
+        {
+
+            string[] uploadUrls = Constants.UploadUrl.Split('|');
+            Dictionary<string, IConnection> connections = new Dictionary<string, IConnection>();
+            Dictionary<string, IModel> channels = new Dictionary<string, IModel>();
+            Dictionary<string, IBasicProperties> properties = new Dictionary<string, IBasicProperties>();
+
+            foreach (string uploadUrl in uploadUrls)
+            {
+                ConnectionFactory factory = new ConnectionFactory();
+                factory.HostName = uploadUrl;//主机名,Rabbit会拿这个IP生成一个endpoint,这个很熟悉吧,就是socket绑定的那个终结点。
+                factory.UserName = Constants.UploadUserName;//默认用户名,用户可以在服务端自定义创建,有相关命令行
+                factory.Password = Constants.UploadPassword;//默认密码
+
+                factory.AutomaticRecoveryEnabled = true; // 链接断开会自动重连
+
+                IConnection connection = factory.CreateConnection();
+                IModel channel = connection.CreateModel();
+                channel.QueueDeclare("workmanship", true, false, false, null);//创建一个名称为kibaqueue的消息队列
+
+                IBasicProperties property = channel.CreateBasicProperties();
+                property.ContentType = "text/plain";
+                property.DeliveryMode = 2; //持久化
+                connections.Add(uploadUrl, connection);
+                properties.Add(uploadUrl, property);
+                channels.Add(uploadUrl, channel);
+            }
+
+            if (channels.Count > 0)
+            {
+                SendSeconddaryPumpData(channels, properties);
+                //SendWaterWellData(channels, properties);
+                SendWaterFactoryData(channels, properties);
+            }
+
+            foreach (KeyValuePair<string, IConnection> item in connections)
+            {
+                IConnection connection = item.Value;
+                connection.Close();
+            }
+        }
+
+        /// <summary>
+        /// 二供数据
+        /// </summary>
+        /// <param name="channel"></param>
+        private void SendSeconddaryPumpData(Dictionary<string, IModel> channels, Dictionary<string, IBasicProperties> properties)
+        {
+            try
+            {
+                log.Info("二供工艺图数据同步任务开始执行.................\r\n");
+                string factorySql = "select 编码 DeviceCode,1 Type,RTRIM(日期) + ' ' +  RTRIM(时间) ReadTime,* from (select row_number() over(partition by 编码 order by 日期 desc, 时间 desc) as rid,* from 二供通用泵站 where 编码 is not null) as tt  where tt.rid=1";
+                DataTable dtDevice = dbHelper.Fill(factorySql);
+                if (dtDevice == null || dtDevice.Rows.Count == 0)
+                {
+                    return;
+                }
+
+                DataColumnCollection cols = dtDevice.Columns;
+                // 处理设备列表
+                for (int i = 0; i < dtDevice.Rows.Count; i++)
+                {
+                    DataRow dr = dtDevice.Rows[i];
+                    string deviceCode = dr["DeviceCode"].ToString();
+                    string type = dr["Type"].ToString();
+                    string realData = CreateJsonParameters(cols, dr);
+
+                    Dictionary<string, object> deviceMap = new Dictionary<string, object>();
+                    deviceMap["DeviceCode"] = deviceCode;
+                    deviceMap["Type"] = type;
+                    deviceMap["RealData"] = realData;
+
+                    string message = JsonConvert.SerializeObject(deviceMap);
+
+                    foreach (KeyValuePair<string, IModel> item in channels)
+                    {
+                        string key = item.Key;
+                        IModel channel = item.Value;
+                        IBasicProperties property = properties[key];
+                        channel.BasicPublish("workmanship", "", property, Encoding.UTF8.GetBytes(message)); //生产消息
+                    }
+                }
+
+                log.Info("二供工艺图数据同步任务开始执行.................\r\n");
+            }
+            catch (Exception ex)
+            {
+                log.Error("二供工艺图数据同步任务错误:" + ex.Message + "===========" + ex.StackTrace + "\r\n");
+            }
+        }
+
+        /// <summary>
+        /// 水源井数据
+        /// </summary>
+        /// <param name="channel"></param>
+        private void SendWaterWellData(Dictionary<string, IModel> channels, Dictionary<string, IBasicProperties> properties)
+        {
+            try
+            {
+                log.Info("水源井工艺图数据同步任务开始执行.................\r\n");
+                string factorySql = "select 编码 DeviceCode,2 Type,RTRIM(日期) + ' ' +  RTRIM(时间) ReadTime,* from (select row_number() over(partition by 编码 order by 日期 desc, 时间 desc) as rid,* from 水源井数据 where 编码 is not null) as tt  where tt.rid=1";
+                DataTable dtDevice = dbHelper.Fill(factorySql);
+                if (dtDevice == null || dtDevice.Rows.Count == 0)
+                {
+                    return;
+                }
+
+                DataColumnCollection cols = dtDevice.Columns;
+                // 处理设备列表
+                for (int i = 0; i < dtDevice.Rows.Count; i++)
+                {
+                    DataRow dr = dtDevice.Rows[i];
+                    string deviceCode = dr["DeviceCode"].ToString();
+                    string type = dr["Type"].ToString();
+                    string realData = CreateJsonParameters(cols, dr);
+
+                    Dictionary<string, object> deviceMap = new Dictionary<string, object>();
+                    deviceMap["DeviceCode"] = deviceCode;
+                    deviceMap["Type"] = type;
+                    deviceMap["RealData"] = realData;
+
+                    string message = JsonConvert.SerializeObject(deviceMap);
+
+                    foreach (KeyValuePair<string, IModel> item in channels)
+                    {
+                        string key = item.Key;
+                        IModel channel = item.Value;
+                        IBasicProperties property = properties[key];
+                        channel.BasicPublish("workmanship", "", property, Encoding.UTF8.GetBytes(message)); //生产消息
+                    }
+                }
+
+                log.Info("水源井工艺图数据同步任务结束.................\r\n");
+            }
+            catch (Exception ex)
+            {
+                log.Error("水源井工艺图数据同步任务执行错误:" + ex.Message + "===========" + ex.StackTrace + "\r\n");
+            }
+        }
+
+        /// <summary>
+        /// 水厂数据
+        /// </summary>
+        /// <param name="channel"></param>
+        private void SendWaterFactoryData(Dictionary<string, IModel> channels, Dictionary<string, IBasicProperties> properties)
+        {
+            try
+            {
+                log.Info("水厂工艺图数据同步任务开始执行.................\r\n");
+                DateTime newTime = DateTime.Now;
+                String yearStr = newTime.Year.ToString();
+                string factorySql = "";
+
+                DataTable dtDevice = dbHelper.Fill(factorySql);
+                if (dtDevice == null || dtDevice.Rows.Count == 0)
+                {
+                    return;
+                }
+
+                DataColumnCollection cols = dtDevice.Columns;
+                // 处理设备列表
+                for (int i = 0; i < dtDevice.Rows.Count; i++)
+                {
+                    DataRow dr = dtDevice.Rows[i];
+                    string deviceCode = dr["DeviceCode"].ToString();
+                    string type = dr["Type"].ToString();
+                    string realData = CreateJsonParameters(cols, dr);
+
+                    Dictionary<string, object> deviceMap = new Dictionary<string, object>();
+                    deviceMap["DeviceCode"] = deviceCode;
+                    deviceMap["Type"] = type;
+                    deviceMap["RealData"] = realData;
+
+                    string message = JsonConvert.SerializeObject(deviceMap);
+
+                    foreach (KeyValuePair<string, IModel> item in channels)
+                    {
+                        string key = item.Key;
+                        IModel channel = item.Value;
+                        IBasicProperties property = properties[key];
+                        channel.BasicPublish("workmanship", "", property, Encoding.UTF8.GetBytes(message)); //生产消息
+                    }
+                }
+
+                log.Info("水厂工艺图数据同步任务执行结束.................\r\n");
+            }
+            catch (Exception ex)
+            {
+                log.Error("水厂工艺图数据同步任务执行错误:" + ex.Message + "===========" + ex.StackTrace + "\r\n");
+            }
+        }
+
+        private static string CreateJsonParameters(DataColumnCollection cols, DataRow dr)
+        {
+            StringBuilder JsonString = new StringBuilder();
+
+            JsonString.Append("{");
+            for (int j = 0; j < cols.Count; j++)
+            {
+                if (j < cols.Count - 1)
+                {
+                    JsonString.Append("\"" + cols[j].ColumnName.ToString() + "\":" + "\"" + dr[j].ToString().Trim() + "\",");
+                }
+                else if (j == cols.Count - 1)
+                {
+                    JsonString.Append("\"" + cols[j].ColumnName.ToString() + "\":" + "\"" + dr[j].ToString().Trim() + "\"");
+                }
+            }
+            JsonString.Append("}");
+
+            return JsonString.ToString();
+        }
+
+        static IDbProvider dbHelper
+        {
+            get
+            {
+                var DbDefine = DbFactoryProvider.GetProvider(CurrentDbType.SqlServer, Constants.WaterFactoryDbConncetion);
+                return DbDefine;
+            }
+        }
+    }
+}

+ 1 - 0
TimedUpload/TimedUpload.csproj

@@ -97,6 +97,7 @@
     <Compile Include="QuartzJobs\ChangleBusinessDataJob.cs" />
     <Compile Include="QuartzJobs\ChangleSecondPumpDataJob.cs" />
     <Compile Include="QuartzJobs\ChangleWaterFactoryDataJob.cs" />
+    <Compile Include="QuartzJobs\ChangleWorkmanshipDataUploadJob.cs" />
     <Compile Include="QuartzJobs\DABusinessDataJob.cs" />
     <Compile Include="QuartzJobs\NoiseDataUploadJob.cs" />
     <Compile Include="QuartzJobs\TestJob.cs" />