jochu_liu 2 年 前
コミット
164ba4bfd4
共有6 個のファイルを変更した267 個の追加16 個の削除を含む
  1. 1 0
      TimedUpload/Constants.cs
  2. 216 0
      TimedUpload/QuartzJobs/WaterFactoryAreaDataJob.cs
  3. 4 0
      TimedUpload/TimedUpload.csproj
  4. 22 12
      TimedUpload/app.config
  5. 1 0
      TimedUpload/packages.config
  6. 23 4
      TimedUpload/quartz_jobs.xml

+ 1 - 0
TimedUpload/Constants.cs

@@ -10,6 +10,7 @@ namespace TimedUpload
     {
         public static string DbConncetion = ConfigurationManager.AppSettings["DbConncetion"];
         public static string WaterFactoryDbConncetion = ConfigurationManager.AppSettings["WaterFactoryDbConncetion"];
+        public static string zhihuishuiwuDB = ConfigurationManager.AppSettings["zhihuishuiwuDB"];
         public static string UploadUrl = ConfigurationManager.AppSettings["UploadUrl"];
         public static string UploadPort =  ConfigurationManager.AppSettings["UploadPort"];
         public static string UploadUserName = ConfigurationManager.AppSettings["UploadUserName"];

+ 216 - 0
TimedUpload/QuartzJobs/WaterFactoryAreaDataJob.cs

@@ -0,0 +1,216 @@
+using log4net;
+using Quartz;
+using RabbitMQ.Client;
+using RDIFramework.Utilities;
+using System;
+using System.Collections.Generic;
+using System.Data;
+using System.IO;
+using System.Text;
+
+namespace TimedUpload.QuartzJobs
+{
+    [DisallowConcurrentExecution]
+    public class WaterFactoryAreaDataJob :IJob
+    {
+        private readonly ILog log = LogManager.GetLogger(typeof(WaterFactoryAreaDataJob));
+
+        public void Execute(IJobExecutionContext context)
+        {
+            // throw new NotImplementedException();
+            string[] uploadUrls = Constants.UploadUrl.Split('|');
+            Dictionary<string, IConnection> connections = new Dictionary<string, IConnection>();
+            Dictionary<string, IModel> channels = new Dictionary<string, IModel>();
+            Dictionary<string, IBasicProperties> properties = new Dictionary<string, IBasicProperties>();
+
+            foreach (string uploadUrl in uploadUrls)
+            {
+                ConnectionFactory factory = new ConnectionFactory();
+                factory.HostName = uploadUrl;//主机名,Rabbit会拿这个IP生成一个endpoint,这个很熟悉吧,就是socket绑定的那个终结点。
+                factory.UserName = Constants.UploadUserName;//默认用户名,用户可以在服务端自定义创建,有相关命令行
+                factory.Password = Constants.UploadPassword;//默认密码
+
+                factory.AutomaticRecoveryEnabled = true; // 链接断开会自动重连
+
+                IConnection connection = factory.CreateConnection();
+                IModel channel = connection.CreateModel();
+                
+                channel.QueueDeclare("zone.deviceHis", true, false, false, null);//创建一个名称为kibaqueue的消息队列
+
+                IBasicProperties property = channel.CreateBasicProperties();
+                property.ContentType = "text/plain";
+                property.DeliveryMode = 2; //持久化
+                connections.Add(uploadUrl, connection);
+                properties.Add(uploadUrl, property);
+                channels.Add(uploadUrl, channel);
+            }
+
+            if (channels.Count > 0)
+            {
+                SendZoneDeviceHis(channels, properties);
+            }
+
+            foreach (KeyValuePair<string, IConnection> item in connections)
+            {
+                IConnection connection = item.Value;
+                connection.Close();
+            }
+        }
+
+        /// <summary>
+        /// 大表历史数据
+        /// </summary>
+        /// <param name="channels"></param>
+        /// <param name="properties"></param>
+        private void SendZoneDeviceHis(Dictionary<string, IModel> channels, Dictionary<string, IBasicProperties> properties)
+        {
+            try
+            {
+                log.Info("水厂大表设备历史数据同步任务开始执行.................\r\n");
+                String sqlMeter = "SELECT * FROM bs_waterfactory_meter WHERE Type = 1 AND IsFlow = 1";//"SELECT ID,考核表编码 FROM [设备信息] where 是否启用 = '是' and 考核表编码 is not null order by ID";
+                DataTable dtMeter = dbHelper.Fill(sqlMeter);
+
+                //Dictionary<string, object> arguments = new Dictionary<string, object>();
+                //arguments["x-max-length-bytes"] = 2147383648;
+                //arguments["x-overflow"] = "reject-publish";
+
+                
+                for (int i = 0; i < dtMeter.Rows.Count; i++)
+                {
+                    DataRow drMeter = dtMeter.Rows[i];
+                    //String meterId = drMeter["ID"].ToString();
+                    String meterCode = drMeter["MeterCode"].ToString().TrimEnd() + "wf";
+                    String lastTime = "";
+
+                    //for (int k = lastYear; k <= nowYear; k++)
+                    //{
+                        //string tablename = "历史记录_" + ("000000" + meterId).Substring(meterId.Length, 6) + "_" + k;
+
+                        // 判断历史记录表是否存在
+                        //if (!CheckTableExist(tablename))
+                        //{
+                        //    continue;
+                        //}
+
+                        //String sqlMeterHis = "select 记录时间,采集时间,正累计流量,负累计流量,净累计流量,瞬时流量,电池电压,压力 from " + tablename;
+                        //if (uploadHis.ContainsKey(meterCode))
+                        //{
+                        //    sqlMeterHis += " where 采集时间 > '" + uploadHis[meterCode] + "'";
+                        //}
+                        //sqlMeterHis += " order by 采集时间";
+                        //DataTable dtMeterHis = dbHelper.Fill(sqlMeterHis);
+
+                        StringBuilder message = new StringBuilder();
+                        for (int j = 0; j < dtMeter.Rows.Count; j++)
+                        {
+                            message.Clear();
+                            try
+                            {
+                                //DataRow drMeterHis = dtMeterHis.Rows[j];
+                                String getDateTime = Convert.ToDateTime(drMeter["ReadTime"]).ToString("yyyy-MM-dd HH:mm:ss");
+                                message.Append("{");
+                                message.Append("\"meterAssessmentCode\": \"").Append(drMeter["MeterCode"].ToString()).Append("wf\",");
+                                message.Append("\"manufacturerCode\": ").Append(Constants.ManufacturerCode).Append(",");
+                                message.Append("\"getDateTime\": \"").Append(getDateTime).Append("\",");
+                                if (Convert.DBNull != drMeter["NetCumulativeFlow"])
+                                {
+                                    message.Append("\"netCumulativeFlow\": ").Append(Convert.ToDecimal(drMeter["NetCumulativeFlow"])).Append(",");
+                                }
+                                if (Convert.DBNull != drMeter["NetCumulativeFlow"])
+                                {
+                                    message.Append("\"positiveCumulativeFlow\": ").Append(Convert.ToDecimal(drMeter["NetCumulativeFlow"])).Append(",");
+                                }
+                                
+                                message.Append("\"negativeCumulativeFlow\": ").Append(0).Append(",");
+                               
+                                if (Convert.DBNull != drMeter["InstantaneousFlow"])
+                                {
+                                    message.Append("\"instantaneousFlow\": ").Append(Convert.ToDecimal(drMeter["instantaneousFlow"])).Append(",");
+                                }
+                                if (Convert.DBNull != drMeter["Pressure"])
+                                {
+                                    message.Append("\"pressure\": ").Append(Convert.ToDecimal(drMeter["Pressure"])).Append(",");
+                                }
+                                //if (Convert.DBNull != drMeter["电池电压"])
+                                //{
+                                //    message.Append("\"batteryVoltageValue\": ").Append(Convert.ToDecimal(drMeterHis["电池电压"])).Append(",");
+                                //}
+                                message.Append("}");
+
+                                foreach (KeyValuePair<string, IModel> item in channels)
+                                {
+                                    string key = item.Key;
+                                    IModel channel = item.Value;
+                                    IBasicProperties property = properties[key];
+                                    channel.BasicPublish("zone.deviceHis", "", property, Encoding.UTF8.GetBytes(message.ToString())); //生产消息
+                                }
+                                lastTime = getDateTime;
+                            }
+                            catch (Exception ex)
+                            {
+                                log.Info("大表设备历史记录同步任务数据推送失败:" + message.ToString() + "\r\n");
+                                log.Error(ex.Message + "===========" + ex.StackTrace + "\r\n");
+                            }
+                        }
+                    //}
+                    //if (!"".Equals(lastTime))
+                    //{
+                    //    uploadHis[meterCode] = lastTime;
+                    //}
+                }
+
+                //SavaUploadHis(uploadHis);
+                log.Info("水厂大表设备历史记录同步任务执行结束.................\r\n");
+            }
+            catch (Exception ex)
+            {
+                log.Error("水厂大表设备历史记录同步任务执行错误:" + ex.Message + "===========" + ex.StackTrace + "\r\n");
+            }
+        }
+
+        /// <summary>
+        /// 判断历史记录表是否存在
+        /// </summary>
+        /// <param name="tablename"></param>
+        /// <returns></returns>
+        private bool CheckTableExist(string tablename)
+        {
+            DataTable table = dbHelper.Fill("select top 1 * from sysobjects where name='" + tablename + "' and xtype='u'");
+            if (table == null || table.Rows.Count == 0)
+            {
+                return false;
+            }
+            return true;
+        }
+
+        /// <summary>
+        /// 保存每块块表的上传最后一条历史记录
+        /// </summary>
+        /// <param name="uploadHis"></param>
+        private void SavaUploadHis(Dictionary<String, String> uploadHis)
+        {
+            // 清除之前的内容
+            FileStream stream = File.Open(@"TextFile1.txt", FileMode.OpenOrCreate, FileAccess.Write);
+            stream.Seek(0, SeekOrigin.Begin);
+            stream.SetLength(0);
+            stream.Close();
+
+            using (StreamWriter sw = new StreamWriter(@"TextFile1.txt"))
+            {
+                foreach (var item in uploadHis)
+                {
+                    sw.WriteLine(item.Key + "," + item.Value);
+                }
+            }
+        }
+
+        static IDbProvider dbHelper
+        {
+            get
+            {
+                var DbDefine = DbFactoryProvider.GetProvider(CurrentDbType.MySql, Constants.zhihuishuiwuDB);
+                return DbDefine;
+            }
+        }
+    }
+}

+ 4 - 0
TimedUpload/TimedUpload.csproj

@@ -55,6 +55,9 @@
     <Reference Include="log4net, Version=2.0.8.0, Culture=neutral, PublicKeyToken=669e0ddf0bb1aa2a, processorArchitecture=MSIL">
       <HintPath>..\packages\log4net.2.0.8\lib\net40-full\log4net.dll</HintPath>
     </Reference>
+    <Reference Include="MySql.Data, Version=6.9.12.0, Culture=neutral, PublicKeyToken=c5687fc88969c44d, processorArchitecture=MSIL">
+      <HintPath>..\packages\MySql.Data.6.9.12\lib\net40\MySql.Data.dll</HintPath>
+    </Reference>
     <Reference Include="Newtonsoft.Json, Version=6.0.0.0, Culture=neutral, PublicKeyToken=30ad4fe6b2a6aeed, processorArchitecture=MSIL">
       <SpecificVersion>False</SpecificVersion>
       <HintPath>..\packages\Newtonsoft.Json.10.0.3\lib\net40\Newtonsoft.Json.dll</HintPath>
@@ -93,6 +96,7 @@
     <Compile Include="Properties\AssemblyInfo.cs" />
     <Compile Include="QuartzJobs\DABusinessDataJob.cs" />
     <Compile Include="QuartzJobs\TestJob.cs" />
+    <Compile Include="QuartzJobs\WaterFactoryAreaDataJob.cs" />
     <Compile Include="QuartzJobs\WorkmanshipDataUploadJob.cs" />
     <Compile Include="QuartzJobs\WaterWellDataUploadJob.cs" />
     <Compile Include="QuartzJobs\SecondaryPumpDataUploadJob.cs" />

+ 22 - 12
TimedUpload/app.config

@@ -4,6 +4,7 @@
     <add key="DbConncetion" value="Data Source=47.105.90.108;Initial Catalog=大安大表;uid=sa;password=wwkj@2136807" />
     <add key="WaterFactoryDbConncetion" value="Data Source=39.99.237.110;Initial Catalog=大安水厂;uid=sa;password=wwkj@2136807" />
     <add key="ChargeDB" value="Data Source=222.163.159.218,10433;Initial Catalog=ChargeManage_Test;uid=sa;password=Daswwwkj@123" />
+    <add key="zhihuishuiwuDB" value="server=39.99.237.110;user id=root;password=wwkj@2136807;database=smartwater_daan;charset=utf8" />
     <!-- 智慧水务系统RabbitMQ信息 start -->
     <add key="UploadUrl" value="127.0.0.1" />
     <add key="UploadPort" value="5678" />
@@ -11,25 +12,25 @@
     <add key="UploadPassword" value="wwkj123!" />
     <!-- 智慧水务系统RabbitMQ信息 end -->
     <add key="ManufacturerCode" value="1" />
-    <add key="MeterId" value="66"/>
+    <add key="MeterId" value="66" />
     <!-- 二供数据是否推送到分区记录 1:是;0:否-->
-    <add key="SecondaryToDMA" value="1"/>
+    <add key="SecondaryToDMA" value="1" />
     <!-- 二供设备已同步到分区计量的设备编码 -->
-    <add key="SecondaryToDMACode" value=""/>
+    <add key="SecondaryToDMACode" value="" />
 
-    <add key="BmId" value="145"/>
-    <add key="UserMeterId" value="0"/>
-    <add key="UserMeterDate" value="20220225"/>
-    <add key="UserMeterReadId" value="0"/>
-    <add key="UserMeterReadDate" value="2022-02-25 00:00:00"/>
+    <add key="BmId" value="145" />
+    <add key="UserMeterId" value="0" />
+    <add key="UserMeterDate" value="20220225" />
+    <add key="UserMeterReadId" value="0" />
+    <add key="UserMeterReadDate" value="2022-02-25 00:00:00" />
     
     
-    <add key="WaterFactoryColumn" value="MeterCode,ReadTime,NetCumulativeFlow,InstantaneousFlow,Pressure,PH,Chlorine,Turbidity"/>
+    <add key="WaterFactoryColumn" value="MeterCode,ReadTime,NetCumulativeFlow,InstantaneousFlow,Pressure,PH,Chlorine,Turbidity" />
 
-    <add key="SecondaryPumpColumn" value="PumpCode,ReadTime,Frequency,Current,RunState,Power,Voltage"/>
+    <add key="SecondaryPumpColumn" value="PumpCode,ReadTime,Frequency,Current,RunState,Power,Voltage" />
     
 
-    <add key="ServiceName" value="数据同步到智慧水务系统"/>
+    <add key="ServiceName" value="数据同步到智慧水务系统" />
   </appSettings>
   <runtime>
     <assemblyBinding xmlns="urn:schemas-microsoft-com:asm.v1">
@@ -37,6 +38,15 @@
         <assemblyIdentity name="log4net" publicKeyToken="669e0ddf0bb1aa2a" culture="neutral" />
         <bindingRedirect oldVersion="0.0.0.0-2.0.8.0" newVersion="2.0.8.0" />
       </dependentAssembly>
+      <dependentAssembly>
+        <assemblyIdentity name="MySql.Data" publicKeyToken="c5687fc88969c44d" culture="neutral" />
+        <bindingRedirect oldVersion="0.0.0.0-6.9.12.0" newVersion="6.9.12.0" />
+      </dependentAssembly>
     </assemblyBinding>
   </runtime>
-</configuration>
+<system.data>
+    <DbProviderFactories>
+      <remove invariant="MySql.Data.MySqlClient" />
+      <add name="MySQL Data Provider" invariant="MySql.Data.MySqlClient" description=".Net Framework Data Provider for MySQL" type="MySql.Data.MySqlClient.MySqlClientFactory, MySql.Data, Version=6.9.12.0, Culture=neutral, PublicKeyToken=c5687fc88969c44d" />
+    </DbProviderFactories>
+  </system.data></configuration>

+ 1 - 0
TimedUpload/packages.config

@@ -3,6 +3,7 @@
   <package id="Common.Logging" version="3.3.1" targetFramework="net40" />
   <package id="Common.Logging.Core" version="3.3.1" targetFramework="net40" />
   <package id="log4net" version="2.0.8" targetFramework="net40" />
+  <package id="MySql.Data" version="6.9.12" targetFramework="net40" />
   <package id="Oracle.ManagedDataAccess" version="12.2.1100" targetFramework="net40" />
   <package id="Quartz" version="2.6.0" targetFramework="net40" />
 </packages>

+ 23 - 4
TimedUpload/quartz_jobs.xml

@@ -69,7 +69,7 @@
     </trigger>-->
 
     <!--二供数据定时上传数据-->
-    <!--<job>
+    <job>
       <name>SecondaryPumpDataUploadJob</name>
       <group>SecondaryPumpDataUpload</group>
       <description>数据定时上传服务</description>
@@ -86,7 +86,7 @@
         <start-time>2017-08-08T00:00:00+08:00</start-time>
         <cron-expression>20 0/5 * * * ? </cron-expression>
       </cron>
-    </trigger>-->
+    </trigger>
 
     
     <!--水源井数据定时上传数据-->
@@ -129,7 +129,7 @@
       </cron>
     </trigger>-->
 
-    <!--营收数据定时上传数据-->
+    <!--营收数据定时上传数据
     <job>
       <name>DABusinessDataJob</name>
       <group>DABusinessData</group>
@@ -147,7 +147,26 @@
         <start-time>2017-08-08T00:00:00+08:00</start-time>
         <cron-expression>0 0 1 * * ? *</cron-expression>
       </cron>
-    </trigger>
+    </trigger>-->
+    <!--水厂出口表数据定时上传数据
+    <job>
+      <name>WaterFactoryAreaDataJob</name>
+      <group>WaterFactoryAreaData</group>
+      <description>数据定时上传服务</description>
+      <job-type>TimedUpload.QuartzJobs.WaterFactoryAreaDataJob,TimedUpload</job-type>
+      <durable>true</durable>
+      <recover>false</recover>
+    </job>
+    <trigger>
+      <cron>
+        <name>WaterFactoryAreaDataJobTrigger</name>
+        <group>WaterFactoryAreaData</group>
+        <job-name>WaterFactoryAreaDataJob</job-name>
+        <job-group>WaterFactoryAreaData</job-group>
+        <start-time>2017-08-08T00:00:00+08:00</start-time>
+        <cron-expression>0 0 1 * * ? *</cron-expression>
+      </cron>
+    </trigger>-->
     
   </schedule>
 </job-scheduling-data>