Procházet zdrojové kódy

添加荆山水厂同步数据

jochu_liu před 1 rokem
rodič
revize
640d895435

+ 372 - 0
TimedUpload/QuartzJobs/jingshanDataUploadDataJob.cs

@@ -0,0 +1,372 @@
+using log4net;
+using Quartz;
+using RabbitMQ.Client;
+using RDIFramework.Utilities;
+using System;
+using System.Collections.Generic;
+using System.Configuration;
+using System.Data;
+using System.IO;
+using System.Linq;
+using System.Text;
+
+namespace TimedUpload.QuartzJobs
+{
+    class jingshanDataUploadDataJob : IJob
+    {
+        private readonly ILog log = LogManager.GetLogger(typeof(DataUploadJob));
+
+        public void Execute(IJobExecutionContext context)
+        {
+            string[] uploadUrls = Constants.UploadUrl.Split('|');
+            Dictionary<string, IConnection> connections = new Dictionary<string, IConnection>();
+            Dictionary<string, IModel> channels = new Dictionary<string, IModel>();
+            Dictionary<string, IBasicProperties> properties = new Dictionary<string, IBasicProperties>();
+
+            foreach (string uploadUrl in uploadUrls)
+            {
+                ConnectionFactory factory = new ConnectionFactory();
+                factory.HostName = uploadUrl;//主机名,Rabbit会拿这个IP生成一个endpoint,这个很熟悉吧,就是socket绑定的那个终结点。
+                factory.UserName = Constants.UploadUserName;//默认用户名,用户可以在服务端自定义创建,有相关命令行
+                factory.Password = Constants.UploadPassword;//默认密码
+
+                factory.AutomaticRecoveryEnabled = true; // 链接断开会自动重连
+
+                IConnection connection = factory.CreateConnection();
+                IModel channel = connection.CreateModel();
+                channel.QueueDeclare("zone.device", true, false, false, null);//创建一个名称为kibaqueue的消息队列
+                channel.QueueDeclare("zone.deviceHis", true, false, false, null);//创建一个名称为kibaqueue的消息队列
+
+                IBasicProperties property = channel.CreateBasicProperties();
+                property.ContentType = "text/plain";
+                property.DeliveryMode = 2; //持久化
+                connections.Add(uploadUrl, connection);
+                properties.Add(uploadUrl, property);
+                channels.Add(uploadUrl, channel);
+            }
+
+            if (channels.Count > 0)
+            {
+                SendZoneDevice(channels, properties);
+                SendZoneDeviceHis(channels, properties);
+            }
+
+            foreach (KeyValuePair<string, IConnection> item in connections)
+            {
+                IConnection connection = item.Value;
+                connection.Close();
+            }
+        }
+
+        /// <summary>
+        /// 大表设备添加
+        /// </summary>
+        /// <param name="channels"></param>
+        /// <param name="properties"></param>
+        private void SendZoneDevice(Dictionary<string, IModel> channels, Dictionary<string, IBasicProperties> properties)
+        {
+            try
+            {
+                String meterId = Constants.MeterId;
+                String sql = "SELECT a.ID,a.名称,a.考核表编码,a.X坐标,a.Y坐标,b.传输协议参数,ISNULL(a.是否阀控,0) AS isValve FROM [设备信息] a left join 传输设备 b on a.传输设备ID = b.ID where a.是否启用 = '是' and a.考核表编码 is not null and a.ID > " + meterId + " order by a.ID";
+                DataTable dt = dbHelper.Fill(sql);
+                if (dt == null)
+                {
+                    log.Info("大表设备同步任务查询报错.................\r\n");
+                    return;
+                }
+
+                if (dt.Rows.Count == 0)
+                {
+                    log.Info("大表设备同步任务,没有需要同步的设备.................\r\n");
+                    return;
+                }
+
+                log.Info("大表设备同步任务开始执行.................\r\n");
+
+
+                StringBuilder message = new StringBuilder();
+                for (int i = 0; i < dt.Rows.Count; i++)
+                {
+                    message.Clear();
+                    try
+                    {
+                        DataRow dr = dt.Rows[i];
+                        String iccid = "";
+                        if (!"".Equals(dr["传输协议参数"].ToString()))
+                        {
+                            iccid = dr["传输协议参数"].ToString().Split(',')[0];
+                        }
+
+                        String lngAndLat = "";
+                        if (!"".Equals(dr["X坐标"].ToString()) && !"".Equals(dr["Y坐标"].ToString()))
+                        {
+                            lngAndLat = dr["X坐标"].ToString() + "|" + dr["Y坐标"].ToString();
+                        }
+                        message.Append("{");
+                        message.Append("\"meterAssessmentName\": \"").Append(dr["名称"]).Append("\",");
+                        message.Append("\"iccId\": ").Append(iccid).Append(",");
+                        //message.Append("\"areaId\": 22,");
+                        message.Append("\"lngAndLat\": \"").Append(lngAndLat).Append("\",");
+                        //message.Append("\"pipeCailber\": \"DN32\",");
+                        //message.Append("\"pipeTexture\": \"PVC\",");
+                        //message.Append("\"imei\": \"77564212\",");
+                        message.Append("\"isPressucre\": 1,");
+                        message.Append("\"isFlow\": 1,");
+                        message.Append("\"isZoneMeter\": 1,");
+                        message.Append("\"isTradeMeter\": 0,");
+                        message.Append("\"isLargeUser\": 0,");
+                        message.Append("\"isLargeUser\": 0,");
+                        message.Append("\"isValve\":").Append(dr["isValve"].ToString()).Append(",");
+                        message.Append("\"meterAssessmentCode\": \"").Append(dr["考核表编码"]).Append("\",");
+                        message.Append("\"manufacturerCode\": \"").Append(Constants.ManufacturerCode).Append("\",");
+                        message.Append("\"meterTypeId\": \"2\"");
+                        message.Append("}");
+                        foreach (KeyValuePair<string, IModel> item in channels)
+                        {
+                            string key = item.Key;
+                            IModel channel = item.Value;
+                            IBasicProperties property = properties[key];
+                            channel.BasicPublish("zone.device", "", property, Encoding.UTF8.GetBytes(message.ToString())); //生产消息
+                        }
+                        meterId = dr["ID"].ToString();
+                    }
+                    catch (Exception ex)
+                    {
+                        log.Info("大表设备同步任务数据推送失败:" + message.ToString() + "\r\n");
+                        log.Error(ex.Message + "===========" + ex.StackTrace + "\r\n");
+                    }
+                }
+                UpdateAppConfig("MeterId", meterId);
+                log.Info("大表设备同步任务执行结束.................\r\n");
+            }
+            catch (Exception ex)
+            {
+                log.Error("大表设备同步任务执行错误:" + ex.Message + "===========" + ex.StackTrace + "\r\n");
+            }
+        }
+
+        /// <summary>
+        /// 大表历史数据
+        /// </summary>
+        /// <param name="channels"></param>
+        /// <param name="properties"></param>
+        private void SendZoneDeviceHis(Dictionary<string, IModel> channels, Dictionary<string, IBasicProperties> properties)
+        {
+            try
+            {
+                log.Info("大表设备历史数据同步任务开始执行.................\r\n");
+                String sqlMeter = "SELECT ID,考核表编码,ISNULL(是否阀控,0) AS isValve FROM [设备信息] where 是否启用 = '是' and 考核表编码 is not null order by ID";
+                DataTable dtMeter = dbHelper.Fill(sqlMeter);
+
+                //Dictionary<string, object> arguments = new Dictionary<string, object>();
+                //arguments["x-max-length-bytes"] = 2147383648;
+                //arguments["x-overflow"] = "reject-publish";
+
+                Dictionary<String, String> uploadHis = new Dictionary<string, string>();
+                using (StreamReader sr = new StreamReader(@"TextFile1.txt"))
+                {
+                    String line = "";
+                    while ((line = sr.ReadLine()) != null)
+                    {
+                        if (!"".Equals(line))
+                        {
+                            String[] item = line.Split(',');
+                            uploadHis[item[0]] = item[1];
+                        }
+                    }
+                }
+
+                for (int i = 0; i < dtMeter.Rows.Count; i++)
+                {
+                    DataRow drMeter = dtMeter.Rows[i];
+                    String meterId = drMeter["ID"].ToString();
+                    String meterCode = drMeter["考核表编码"].ToString();
+                    String lastTime = "";
+                    int isValve = Convert.ToInt32(drMeter["isValve"]);
+
+                    int nowYear = DateTime.Now.Year;
+                    int lastYear = uploadHis.ContainsKey(meterCode) ? Convert.ToDateTime(uploadHis[meterCode]).Year : nowYear;
+
+                    for (int k = lastYear; k <= nowYear; k++)
+                    {
+                        string tablename = "历史记录_" + ("000000" + meterId).Substring(meterId.Length, 6) + "_" + k;
+
+                        // 判断历史记录表是否存在
+                        if (!CheckTableExist(tablename))
+                        {
+                            continue;
+                        }
+
+                        String sqlMeterHis = "select 记录时间,采集时间,正累计流量,负累计流量,净累计流量,瞬时流量,电池电压,压力";
+                        if (isValve == 1) {// 阀控的情况
+                            sqlMeterHis += ",开度,运行模式,手动开度设定值,压力量程设定,常用压力设定,阀门调整间隔";
+                        }
+                        sqlMeterHis +=" from " + tablename;
+                        if (uploadHis.ContainsKey(meterCode))
+                        {
+                            sqlMeterHis += " where 采集时间 > '" + uploadHis[meterCode] + "'";
+                        }
+                        sqlMeterHis += " order by 采集时间";
+                        DataTable dtMeterHis = dbHelper.Fill(sqlMeterHis);
+
+                        StringBuilder message = new StringBuilder();
+                        for (int j = 0; j < dtMeterHis.Rows.Count; j++)
+                        {
+                            message.Clear();
+                            try
+                            {
+                                DataRow drMeterHis = dtMeterHis.Rows[j];
+                                String getDateTime = Convert.ToDateTime(drMeterHis["采集时间"]).ToString("yyyy-MM-dd HH:mm:ss");
+                                message.Append("{");
+                                message.Append("\"meterAssessmentCode\": \"").Append(meterCode).Append("\",");
+                                message.Append("\"manufacturerCode\": ").Append(Constants.ManufacturerCode).Append(",");
+                                message.Append("\"getDateTime\": \"").Append(getDateTime).Append("\",");
+
+                                if (isValve == 1)
+                                {
+                                    if (Convert.DBNull != drMeterHis["开度"])
+                                    {
+                                        message.Append("\"valveOpening\": ").Append(Convert.ToDecimal(drMeterHis["开度"])).Append(",");
+                                    }
+                                    if (Convert.DBNull != drMeterHis["运行模式"])
+                                    {
+                                        message.Append("\"operaOption\": ").Append(Convert.ToDecimal(drMeterHis["运行模式"])).Append(",");
+                                    }
+                                    if (Convert.DBNull != drMeterHis["手动开度设定值"])
+                                    {
+                                        message.Append("\"autoOpingVal\": ").Append(Convert.ToDecimal(drMeterHis["手动开度设定值"])).Append(",");
+                                    }
+                                    if (Convert.DBNull != drMeterHis["压力量程设定"])
+                                    {
+                                        message.Append("\"pressureRange\": ").Append(Convert.ToDecimal(drMeterHis["压力量程设定"])).Append(",");
+                                    }
+                                    if (Convert.DBNull != drMeterHis["常用压力设定"])
+                                    {
+                                        message.Append("\"comPressureVal\": ").Append(Convert.ToDecimal(drMeterHis["常用压力设定"])).Append(",");
+                                    }
+                                    if (Convert.DBNull != drMeterHis["阀门调整间隔"])
+                                    {
+                                        message.Append("\"changeTime\": ").Append(Convert.ToDecimal(drMeterHis["阀门调整间隔"])).Append(",");
+                                    }
+                                }
+
+                                if (Convert.DBNull != drMeterHis["净累计流量"])
+                                {
+                                    message.Append("\"netCumulativeFlow\": ").Append(Convert.ToDecimal(drMeterHis["净累计流量"])).Append(",");
+                                }
+                                if (Convert.DBNull != drMeterHis["正累计流量"])
+                                {
+                                    message.Append("\"positiveCumulativeFlow\": ").Append(Convert.ToDecimal(drMeterHis["正累计流量"])).Append(",");
+                                }
+                                if (Convert.DBNull != drMeterHis["负累计流量"])
+                                {
+                                    message.Append("\"negativeCumulativeFlow\": ").Append(Convert.ToDecimal(drMeterHis["负累计流量"])).Append(",");
+                                }
+                                if (Convert.DBNull != drMeterHis["瞬时流量"])
+                                {
+                                    message.Append("\"instantaneousFlow\": ").Append(Convert.ToDecimal(drMeterHis["瞬时流量"])).Append(",");
+                                }
+                                if (Convert.DBNull != drMeterHis["压力"])
+                                {
+                                    message.Append("\"pressure\": ").Append(Convert.ToDecimal(drMeterHis["压力"])).Append(",");
+                                }
+                                if (Convert.DBNull != drMeterHis["电池电压"])
+                                {
+                                    message.Append("\"batteryVoltageValue\": ").Append(Convert.ToDecimal(drMeterHis["电池电压"])).Append(",");
+                                }
+                                
+
+                                message.Append("}");
+
+                                foreach (KeyValuePair<string, IModel> item in channels)
+                                {
+                                    string key = item.Key;
+                                    IModel channel = item.Value;
+                                    IBasicProperties property = properties[key];
+                                    channel.BasicPublish("zone.deviceHis", "", property, Encoding.UTF8.GetBytes(message.ToString())); //生产消息
+                                }
+                                lastTime = getDateTime;
+                            }
+                            catch (Exception ex)
+                            {
+                                log.Info("大表设备历史记录同步任务数据推送失败:" + message.ToString() + "\r\n");
+                                log.Error(ex.Message + "===========" + ex.StackTrace + "\r\n");
+                            }
+                        }
+                    }
+                    if (!"".Equals(lastTime))
+                    {
+                        uploadHis[meterCode] = lastTime;
+                    }
+                }
+
+                SavaUploadHis(uploadHis);
+                log.Info("大表设备历史记录同步任务执行结束.................\r\n");
+            }
+            catch (Exception ex)
+            {
+                log.Error("大表设备历史记录同步任务执行错误:" + ex.Message + "===========" + ex.StackTrace + "\r\n");
+            }
+        }
+
+        /// <summary>
+        /// 更新配置文件中的值
+        /// </summary>
+        /// <param name="key">键</param>
+        /// <param name="value">值</param>
+        private void UpdateAppConfig(String key, String value)
+        {
+            var cfg = ConfigurationManager.OpenExeConfiguration(ConfigurationUserLevel.None);
+            cfg.AppSettings.Settings[key].Value = value;
+            cfg.Save();
+            ConfigurationManager.RefreshSection("appSettings");
+
+        }
+
+        /// <summary>
+        /// 判断历史记录表是否存在
+        /// </summary>
+        /// <param name="tablename"></param>
+        /// <returns></returns>
+        private bool CheckTableExist(string tablename)
+        {
+            DataTable table = dbHelper.Fill("select top 1 * from sysobjects where name='" + tablename + "' and xtype='u'");
+            if (table == null || table.Rows.Count == 0)
+            {
+                return false;
+            }
+            return true;
+        }
+
+        /// <summary>
+        /// 保存每块块表的上传最后一条历史记录
+        /// </summary>
+        /// <param name="uploadHis"></param>
+        private void SavaUploadHis(Dictionary<String, String> uploadHis)
+        {
+            // 清除之前的内容
+            FileStream stream = File.Open(@"TextFile1.txt", FileMode.OpenOrCreate, FileAccess.Write);
+            stream.Seek(0, SeekOrigin.Begin);
+            stream.SetLength(0);
+            stream.Close();
+
+            using (StreamWriter sw = new StreamWriter(@"TextFile1.txt"))
+            {
+                foreach (var item in uploadHis)
+                {
+                    sw.WriteLine(item.Key + "," + item.Value);
+                }
+            }
+        }
+
+        static IDbProvider dbHelper
+        {
+            get
+            {
+                var DbDefine = DbFactoryProvider.GetProvider(CurrentDbType.SqlServer, Constants.DbConncetion);
+                return DbDefine;
+            }
+        }
+    }
+}

+ 1 - 0
TimedUpload/TimedUpload.csproj

@@ -100,6 +100,7 @@
     <Compile Include="QuartzJobs\ChangleWaterFactoryDataJob.cs" />
     <Compile Include="QuartzJobs\ChangleWorkmanshipDataUploadJob.cs" />
     <Compile Include="QuartzJobs\DABusinessDataJob.cs" />
+    <Compile Include="QuartzJobs\jingshanDataUploadDataJob.cs" />
     <Compile Include="QuartzJobs\NoiseDataUploadJob.cs" />
     <Compile Include="QuartzJobs\TestJob.cs" />
     <Compile Include="QuartzJobs\WaterFactoryAreaDataJob.cs" />

+ 2 - 2
TimedUpload/app.config

@@ -1,14 +1,14 @@
 <?xml version="1.0" encoding="utf-8"?>
 <configuration>
   <appSettings>
-    <add key="DbConncetion" value="Data Source=47.105.90.108;Initial Catalog=大安大表;uid=sa;password=wwkj@2136807" />
+    <add key="DbConncetion" value="Data Source=47.92.134.169;Initial Catalog=jingshan;uid=sa;password=wwkj@2136807" />
     <add key="WaterFactoryDbConncetion" value="Data Source=39.99.237.110;Initial Catalog=大安水厂;uid=sa;password=wwkj@2136807" />
     <add key="ChargeDB" value="Data Source=222.163.159.218,10433;Initial Catalog=ChargeManage;uid=sa;password=Daswwwkj@123" />
     <add key="zhihuishuiwuDB" value="server=39.99.237.110;user id=root;password=wwkj@2136807;database=smartwater_daan;charset=utf8" />
     <add key="changleWaterFactoryDb" value=""/>
     <add key="ChangLeWaterQualityDb" value="Data Source=47.105.90.108;Initial Catalog=舜水水质;uid=sa;password=wwkj@2136807" />
     <!-- 智慧水务系统RabbitMQ信息 start -->
-    <add key="UploadUrl" value="182.92.149.41" />
+    <add key="UploadUrl" value="127.0.0.1" />
     <add key="UploadPort" value="5678" />
     <add key="UploadUserName" value="wwkj" />
     <add key="UploadPassword" value="wwkj123!" />

+ 21 - 1
TimedUpload/quartz_jobs.xml

@@ -244,7 +244,7 @@
       </cron>
     </trigger> -->
     <!--昌乐工艺图数据定时上传数据-->
-    <job>
+    <!--<job>
       <name>ChangleWorkmanshipDataJob</name>
       <group>ChangleWorkmanshipData</group>
       <description>昌乐水质数据定时上传服务</description>
@@ -261,6 +261,26 @@
         <start-time>2017-08-08T00:00:00+08:00</start-time>
         <cron-expression>0 0 1 * * ? *</cron-expression>
       </cron>
+    </trigger>-->
+
+    <!--荆山水厂分区计量数据定时上传数据-->
+    <job>
+      <name>jingshanDataUploadDataJob</name>
+      <group>jingshanDataUploadData</group>
+      <description>数据定时上传服务</description>
+      <job-type>TimedUpload.QuartzJobs.jingshanDataUploadDataJob,TimedUpload</job-type>
+      <durable>true</durable>
+      <recover>false</recover>
+    </job>
+    <trigger>
+      <cron>
+        <name>jingshanDataUploadDataJobTrigger</name>
+        <group>jingshanDataUploadData</group>
+        <job-name>jingshanDataUploadDataJob</job-name>
+        <job-group>jingshanDataUploadData</job-group>
+        <start-time>2017-08-08T00:00:00+08:00</start-time>
+        <cron-expression>0 0/30 * * * ? *</cron-expression>
+      </cron>
     </trigger>
   </schedule>
 </job-scheduling-data>