Преглед на файлове

添加昌乐二供同步

jochu_liu преди 1 година
родител
ревизия
46bd797ec9
променени са 4 файла, в които са добавени 388 реда и са изтрити 5 реда
  1. 364 0
      TimedUpload/QuartzJobs/ChangleSecondPumpDataJob.cs
  2. 2 1
      TimedUpload/TimedUpload.csproj
  3. 1 1
      TimedUpload/app.config
  4. 21 3
      TimedUpload/quartz_jobs.xml

+ 364 - 0
TimedUpload/QuartzJobs/ChangleSecondPumpDataJob.cs

@@ -0,0 +1,364 @@
+using log4net;
+using Newtonsoft.Json;
+using Quartz;
+using RabbitMQ.Client;
+using RDIFramework.Utilities;
+using System;
+using System.Collections;
+using System.Collections.Generic;
+using System.Configuration;
+using System.Data;
+using System.Linq;
+using System.Text;
+
+namespace TimedUpload.QuartzJobs
+{
+    [DisallowConcurrentExecution]
+    public class ChangleSecondPumpDataJob :IJob
+    {
+        private readonly ILog log = LogManager.GetLogger(typeof(SecondaryPumpDataUploadJob));
+
+        public void Execute(IJobExecutionContext context)
+        {
+
+            try
+            {
+                string[] uploadUrls = Constants.UploadUrl.Split('|');
+                Dictionary<string, IConnection> connections = new Dictionary<string, IConnection>();
+                Dictionary<string, IModel> channels = new Dictionary<string, IModel>();
+                Dictionary<string, IBasicProperties> properties = new Dictionary<string, IBasicProperties>();
+
+                foreach (string uploadUrl in uploadUrls)
+                {
+                    ConnectionFactory factory = new ConnectionFactory();
+                    factory.HostName = uploadUrl;//主机名,Rabbit会拿这个IP生成一个endpoint,这个很熟悉吧,就是socket绑定的那个终结点。
+                    factory.UserName = Constants.UploadUserName;//默认用户名,用户可以在服务端自定义创建,有相关命令行
+                    factory.Password = Constants.UploadPassword;//默认密码
+
+                    factory.AutomaticRecoveryEnabled = true; // 链接断开会自动重连
+
+                    IConnection connection = factory.CreateConnection();
+                    IModel channel = connection.CreateModel();
+                    channel.QueueDeclare("secondaryPump.deviceHis", true, false, false, null);//创建一个名称为kibaqueue的消息队列
+
+                    IBasicProperties property = channel.CreateBasicProperties();
+                    property.ContentType = "text/plain";
+                    property.DeliveryMode = 2; //持久化
+                    connections.Add(uploadUrl, connection);
+                    properties.Add(uploadUrl, property);
+                    channels.Add(uploadUrl, channel);
+                }
+
+                if (channels.Count > 0)
+                {
+                    SendSecondaryPumpHis(channels, properties);
+                }
+
+                foreach (KeyValuePair<string, IConnection> item in connections)
+                {
+                    IConnection connection = item.Value;
+                    connection.Close();
+                }
+            }
+            catch (Exception ex)
+            {
+
+                log.Debug(ex.Message);
+            }
+        }
+
+        /// <summary>
+        /// 二供历史数据
+        /// </summary>
+        /// <param name="channels"></param
+        /// <param name="properties"></param>
+        private void SendSecondaryPumpHis(Dictionary<string, IModel> channels, Dictionary<string, IBasicProperties> properties)
+        {
+            try
+            {
+                log.Info("二供历史数据同步任务开始执行.................\r\n");
+                string factorySql = "SELECT * FROM [updateList20230412]";
+                DataTable dtDevice = dbHelper.Fill(factorySql);
+                string secondaryPumpColumn = Constants.SecondaryPumpColumn;
+                if (string.IsNullOrEmpty(secondaryPumpColumn) || dtDevice == null || dtDevice.Rows.Count == 0)
+                {
+                    return;
+                }
+
+                // 处理设备列表
+
+                for (int k = 0; k < dtDevice.Rows.Count; k++)
+                {
+                    string table = "历史记录_" + dtDevice.Rows[k]["devId"].ToString().PadLeft(6,'0')+DateTime.Now.AddYears(-1).Year.ToString();
+                    String sql = @"SELECT TOP 2000 [id]
+                        ,[设备ID],[记录时间],[采集时间]
+                        ,[设备状态],[通讯状态],[数据来源]
+                        ,[表1A相电压],[表1B相电压],[表1C相电压]
+                        ,[表1A相电流],[表1B相电流],[表1C相电流]
+                        ,[表1电能]
+                        ,[表2A相电压],[表2B相电压],[表2C相电压]
+                        ,[表2A相电流],[表2B相电流],[表2C相电流]
+                        ,[表3A相电压],[表3B相电压],[表3C相电压]
+                        ,[表3A相电流],[表3B相电流] ,[表3C相电流]
+                        ,[表3电能],[信号质量],[门开关]
+                        ,[泵3状态],[泵2状态],[泵1状态]
+                        ,[断电监测]
+                        ,[一号泵有功功率],[一号泵频率]
+                        ,[二号泵有功功率],[二号泵频率]
+                        ,[三号泵有功功率],[三号泵频率]
+                        ,[出水设定压力],[出水端实际压力]
+                        ,[进水设定压力],[进水端实际压力]
+                        ,[瞬时流量],[累计流量]
+                    FROM [dbo].[历史记录_000015_2022] where 采集时间 > '" + Convert.ToDateTime(dtDevice.Rows[k]["uploadTime"]).ToString("yyyy-MM-dd HH:mm:ss") + "' ORDER BY 采集时间";
+                    DataTable dtHistory = dbHelper.Fill(sql); 
+                    for (int i = 0; i < dtHistory.Rows.Count; i++)
+                    {
+                        #region 
+                        DataRow dr = dtHistory.Rows[i];
+                        string id = dr["记录时间"].ToString();
+                        string deviceCode = dtDevice.Rows[k]["code"].ToString();//dr["编码"].ToString();
+                        string PressureIn = dr["进水端实际压力"].ToString();
+                        string PressureOut = dr["出水端实际压力"].ToString();
+                        string PressureSet = dr["出水设定压力"].ToString();
+                        string InstantFlow = dr["瞬时流量"].ToString();
+                        string TotalFlow = dr["累计流量"].ToString();
+                        string PositiveToTalFlow = "0";//dr["正累计流量"].ToString();
+                        string NegativeTotalFlow = "0";// dr["负累计流量"].ToString();
+                        string PH = "0";// dr["PH"].ToString();
+                        string Chlorine = "0";//dr["余氯"].ToString();
+                        string Turbidity = "0";//dr["浊度"].ToString();
+                        string LiquidHeight = "0";//dr["水箱液位"].ToString();
+                        string VoltageA = dr["表1A相电压"].ToString();
+                        string VoltageB = dr["表1B相电压"].ToString();
+                        string VoltageC = dr["表1C相电压"].ToString();
+                        string CurrentA = dr["表1A相电流"].ToString();
+                        string CurrentB = dr["表1B相电流"].ToString();
+                        string CurrentC = dr["表1C相电流"].ToString();
+                        string Consumption = dr["表1电能"].ToString();
+                        string LackWater = "0";//dr["缺水报警"].ToString();
+                        string OverPressure = "0";//dr["超压报警"].ToString();
+                        string HouseInlet = "0";//dr["进水报警"].ToString(); // 进水报警0为正常,1为进水。
+                        string TubeBurst = "0";//dr["爆管报警"].ToString();
+                        string NetState = "0";//dr["网络状态"].ToString(); // 0代表通讯正常,1代表网络故障,2代表现场485设备通讯故障
+
+                        string readTime = Convert.ToDateTime(dr["采集时间"]).ToString("yyyy/MM/dd HH:mm:ss");
+
+                        Dictionary<string, object> deviceMap = new Dictionary<string, object>();
+                        deviceMap["DeviceCode"] = deviceCode;
+                        deviceMap["ReadTime"] = readTime;
+                        deviceMap["PressureIN"] = PressureIn;
+                        deviceMap["PressureOut"] = PressureOut;
+                        deviceMap["PressureSet"] = PressureSet;
+                        deviceMap["InstantFlow"] = InstantFlow;
+                        deviceMap["TotalFlow"] = TotalFlow;
+                        deviceMap["PositiveToTalFlow"] = PositiveToTalFlow;
+                        deviceMap["NegativeTotalFlow"] = NegativeTotalFlow;
+                        deviceMap["PH"] = PH;
+                        deviceMap["Chlorine"] = Chlorine;
+                        deviceMap["Turbidity"] = Turbidity;
+                        deviceMap["LiquidHeight"] = LiquidHeight;
+                        deviceMap["VoltageA"] = VoltageA;
+                        deviceMap["VoltageB"] = VoltageB;
+                        deviceMap["VoltageC"] = VoltageC;
+                        deviceMap["CurrentA"] = CurrentA;
+                        deviceMap["CurrentB"] = CurrentB;
+                        deviceMap["CurrentC"] = CurrentC;
+                        deviceMap["Consumption"] = Consumption;
+                        deviceMap["LackWater"] = LackWater;
+                        deviceMap["OverPressure"] = OverPressure;
+                        deviceMap["HouseInlet"] = HouseInlet;
+                        deviceMap["TubeBurst"] = TubeBurst;
+                        deviceMap["NetState"] = NetState;
+                        ArrayList meterDataList = new ArrayList();
+                        // 处理泵的数据
+                        for (int j = 1; j < 4; j++)
+                        {
+                            Dictionary<string, object> meterMap = new Dictionary<string, object>();
+                            string colPre = "";
+                            string state = "";
+                            string colPrePower = "";
+                            switch (j)
+                            {
+                                case 1:
+                                    colPre = "一号泵";
+                                    state = "泵1状态";
+                                    colPrePower = "表1A相";
+                                    break;
+                                case 2:
+                                    colPre = "二号泵";
+                                    state = "泵2状态";
+                                    colPrePower = "表2A相";
+                                    break;
+                                case 3:
+                                    colPre = "三号泵";
+                                    state = "泵3状态";
+                                    colPrePower = "表3A相";
+                                    break;
+                            }
+
+                            string meterCode = "wwkj" + j.ToString().PadLeft(3, '0');
+                            string frequency = dr[colPre + "频率"].ToString();
+                            string current = dr[colPrePower + "电流"].ToString();
+                            string runState = dr[state].ToString();
+                            string power = dr[colPre + "有功功率"].ToString();
+                            string voltage = dr[colPrePower + "电压"].ToString();
+                            meterMap["PumpCode"] = meterCode;
+                            meterMap["ReadTime"] = readTime;
+                            meterMap["Frequency"] = frequency;
+                            meterMap["Current"] = current;
+                            meterMap["RunState"] = runState;
+                            meterMap["Power"] = power;
+                            meterMap["Voltage"] = voltage;
+
+                            meterDataList.Add(meterMap);
+                        }
+
+                        if (meterDataList.Count == 0)
+                        {
+                            continue;
+                        }
+                        deviceMap["PumpData"] = meterDataList;
+
+
+                        string message = JsonConvert.SerializeObject(deviceMap);
+                        // log.Info(message);
+                        foreach (KeyValuePair<string, IModel> item in channels)
+                        {
+                            
+                            string key = item.Key;
+                            // log.Info("secondaryPump.deviceHis | " + key);
+                            IModel channel = item.Value;
+                            IBasicProperties property = properties[key];
+                            
+                            channel.BasicPublish("secondaryPump.deviceHis", "", property, Encoding.UTF8.GetBytes(message)); //生产消息
+                            // log.Info("______________________________________");
+                        }
+                        #endregion 
+                        string sqlUpdate = "UPDATE updateList20230412 set uploadTime = '" + readTime + "' where id = " + dtDevice.Rows[k]["id"].ToString();
+                        dbHelper.ExecuteNonQuery(sqlUpdate) ;
+
+
+
+
+                    }
+                }
+               
+
+                log.Info("二供历史记录同步任务执行结束.................\r\n");
+            }
+            catch (Exception ex)
+            {
+                log.Error("二供历史记录同步任务执行错误:" + ex.Message + "===========" + ex.StackTrace + "\r\n");
+            }
+        }
+
+        private string SecondaryToDMA(DataRow dr, Dictionary<string, IModel> channels, Dictionary<string, IBasicProperties> properties, string newSecondaryToDMACode)
+        {
+            string deviceName = dr["编号"].ToString();
+
+            string deviceCode = dr["编码"].ToString() + "sc";
+
+            #region 设备同步
+            if (newSecondaryToDMACode != null && newSecondaryToDMACode.IndexOf(deviceCode) < 0)
+            {
+                StringBuilder messageDevice = new StringBuilder();
+                messageDevice.Append("{");
+                messageDevice.Append("\"meterAssessmentName\": \"").Append(deviceName).Append("\",");
+                messageDevice.Append("\"isPressucre\": 1,");
+                messageDevice.Append("\"isFlow\": 1,");
+                messageDevice.Append("\"isZoneMeter\": 1,");
+                messageDevice.Append("\"isTradeMeter\": 0,");
+                messageDevice.Append("\"isLargeUser\": 0,");
+                messageDevice.Append("\"isQuality\": 1,");
+                messageDevice.Append("\"meterAssessmentCode\": \"").Append(deviceCode).Append("\",");
+                messageDevice.Append("\"manufacturerCode\": \"").Append(Constants.ManufacturerCode).Append("\",");
+                messageDevice.Append("\"meterTypeId\": \"2\"");
+                messageDevice.Append("}");
+                foreach (KeyValuePair<string, IModel> item in channels)
+                {
+                    string key = item.Key;
+                    IModel channel = item.Value;
+                    IBasicProperties property = properties[key];
+                    channel.BasicPublish("zone.device", "", property, Encoding.UTF8.GetBytes(messageDevice.ToString())); //生产消息
+                }
+            }
+            #endregion
+
+            #region 历史数据同步
+            String getDateTime = Convert.ToDateTime(dr["更新时间"]).ToString("yyyy-MM-dd HH:mm:ss");
+
+            StringBuilder messageHis = new StringBuilder();
+            messageHis.Append("{");
+            messageHis.Append("\"meterAssessmentCode\": \"").Append(deviceCode).Append("\",");
+            messageHis.Append("\"manufacturerCode\": ").Append(Constants.ManufacturerCode).Append(",");
+            messageHis.Append("\"getDateTime\": \"").Append(getDateTime).Append("\",");
+            if (Convert.DBNull != dr["净累计流量"])
+            {
+                messageHis.Append("\"netCumulativeFlow\": ").Append(Convert.ToDecimal(dr["净累计流量"])).Append(",");
+            }
+            if (Convert.DBNull != dr["正累计流量"])
+            {
+                messageHis.Append("\"positiveCumulativeFlow\": ").Append(Convert.ToDecimal(dr["正累计流量"])).Append(",");
+            }
+            if (Convert.DBNull != dr["负累计流量"])
+            {
+                messageHis.Append("\"negativeCumulativeFlow\": ").Append(Convert.ToDecimal(dr["负累计流量"])).Append(",");
+            }
+            if (Convert.DBNull != dr["瞬时流量"])
+            {
+                messageHis.Append("\"instantaneousFlow\": ").Append(Convert.ToDecimal(dr["瞬时流量"])).Append(",");
+            }
+            if (Convert.DBNull != dr["泵出口压力"])
+            {
+                messageHis.Append("\"pressure\": ").Append(Convert.ToDecimal(dr["泵出口压力"])).Append(",");
+            }
+            if (Convert.DBNull != dr["PH"])
+            {
+                messageHis.Append("\"ph\": ").Append(Convert.ToDecimal(dr["PH"])).Append(",");
+            }
+            if (Convert.DBNull != dr["余氯"])
+            {
+                messageHis.Append("\"chlorine\": ").Append(Convert.ToDecimal(dr["余氯"])).Append(",");
+            }
+            if (Convert.DBNull != dr["浊度"])
+            {
+                messageHis.Append("\"turbidity\": ").Append(Convert.ToDecimal(dr["浊度"])).Append(",");
+            }
+            messageHis.Append("}");
+
+            foreach (KeyValuePair<string, IModel> item in channels)
+            {
+                string key = item.Key;
+                IModel channel = item.Value;
+                IBasicProperties property = properties[key];
+                channel.BasicPublish("zone.deviceHis", "", property, Encoding.UTF8.GetBytes(messageHis.ToString())); //生产消息
+            }
+            #endregion
+
+            return deviceCode;
+        }
+
+        /// <summary>
+        /// 更新配置文件中的值
+        /// </summary>
+        /// <param name="key">键</param>
+        /// <param name="value">值</param>
+        private void UpdateAppConfig(String key, String value)
+        {
+            var cfg = ConfigurationManager.OpenExeConfiguration(ConfigurationUserLevel.None);
+            cfg.AppSettings.Settings[key].Value = value;
+            cfg.Save();
+            ConfigurationManager.RefreshSection("appSettings");
+
+        }
+
+        static IDbProvider dbHelper
+        {
+            get
+            {
+                var DbDefine = DbFactoryProvider.GetProvider(CurrentDbType.SqlServer, Constants.WaterFactoryDbConncetion);
+                return DbDefine;
+            }
+        }
+    }
+}

+ 2 - 1
TimedUpload/TimedUpload.csproj

@@ -10,6 +10,7 @@
     <AssemblyName>TimedUpload</AssemblyName>
     <TargetFrameworkVersion>v4.0</TargetFrameworkVersion>
     <FileAlignment>512</FileAlignment>
+    <IsWebBootstrapper>false</IsWebBootstrapper>
     <PublishUrl>publish\</PublishUrl>
     <Install>true</Install>
     <InstallFrom>Disk</InstallFrom>
@@ -22,7 +23,6 @@
     <MapFileExtensions>true</MapFileExtensions>
     <ApplicationRevision>0</ApplicationRevision>
     <ApplicationVersion>1.0.0.%2a</ApplicationVersion>
-    <IsWebBootstrapper>false</IsWebBootstrapper>
     <UseApplicationTrust>false</UseApplicationTrust>
     <BootstrapperEnabled>true</BootstrapperEnabled>
   </PropertyGroup>
@@ -94,6 +94,7 @@
     <Compile Include="Constants.cs" />
     <Compile Include="Program.cs" />
     <Compile Include="Properties\AssemblyInfo.cs" />
+    <Compile Include="QuartzJobs\ChangleSecondPumpDataJob.cs" />
     <Compile Include="QuartzJobs\DABusinessDataJob.cs" />
     <Compile Include="QuartzJobs\NoiseDataUploadJob.cs" />
     <Compile Include="QuartzJobs\TestJob.cs" />

+ 1 - 1
TimedUpload/app.config

@@ -30,7 +30,7 @@
     <add key="SecondaryPumpColumn" value="PumpCode,ReadTime,Frequency,Current,RunState,Power,Voltage" />
     
 
-    <add key="ServiceName" value="数据同步到智慧水务系统" />
+    <add key="ServiceName" value="二供和水厂同步智慧水务系统" />
   </appSettings>
   <runtime>
     <assemblyBinding xmlns="urn:schemas-microsoft-com:asm.v1">

+ 21 - 3
TimedUpload/quartz_jobs.xml

@@ -130,7 +130,7 @@
     </trigger>-->
 
     <!--营收数据定时上传数据-->
-    <job>
+    <!--<job>
       <name>DABusinessDataJob</name>
       <group>DABusinessData</group>
       <description>数据定时上传服务</description>
@@ -147,7 +147,7 @@
         <start-time>2017-08-08T00:00:00+08:00</start-time>
         <cron-expression>0 0 1 * * ? *</cron-expression>
       </cron>
-    </trigger>
+    </trigger>-->
     <!--水厂出口表数据定时上传数据
     <job>
       <name>WaterFactoryAreaDataJob</name>
@@ -167,6 +167,24 @@
         <cron-expression>0 0 1 * * ? *</cron-expression>
       </cron>
     </trigger>-->
-    
+    <!--昌乐泵房数据定时上传数据-->
+    <job>
+      <name>SecondaryPumpDataUploadJob</name>
+      <group>SecondaryPumpDataUploadData</group>
+      <description>昌乐泵房数据定时上传服务</description>
+      <job-type>TimedUpload.QuartzJobs.SecondaryPumpDataUploadJob,TimedUpload</job-type>
+      <durable>true</durable>
+      <recover>false</recover>
+    </job>
+    <trigger>
+      <cron>
+        <name>SecondaryPumpDataUploadJobbTrigger</name>
+        <group>SecondaryPumpDataUploadData</group>
+        <job-name>SecondaryPumpDataUploadJob</job-name>
+        <job-group>SecondaryPumpDataUploadData</job-group>
+        <start-time>2017-08-08T00:00:00+08:00</start-time>
+        <cron-expression>0 0 1 * * ? *</cron-expression>
+      </cron>
+    </trigger>
   </schedule>
 </job-scheduling-data>