Ver código fonte

同步数据

jochu_liu 1 ano atrás
pai
commit
256120d488

+ 11 - 1
TimedUpload/QuartzJobs/DABusinessDataJob.cs

@@ -72,15 +72,23 @@ namespace TimedUpload.QuartzJobs
             log.Info("营收户表基础数据同步任务开始执行.................\r\n");
             while (true)
             {
+                
                 string sql = "SELECT top 100 b.CM_ID,c.RouteCode,RouteName,a.CustomerCode,a.CustomerName,b.ElecAddress,b.DetailedAddress MeterAddress,a.DetailedAddress CustomerAddress";
                 sql += " FROM BCS_Customer a,BCS_CustomerMeter b, BCS_MeterReadingRoute c WHERE a.Cus_ID = b.Cus_ID and b.Mrr_ID = c.MRR_ID ";
                 sql += " AND b.CM_ID > " + meterId + " ORDER BY CM_ID";
+
+                // log.Info(sql);
+                
                 DataTable dt = dbHelper.Fill(sql);
+
+                // log.Info("营业收费户表 | " + dt.Rows.Count);
                 if (dt == null || dt.Rows.Count == 0)
                 {
                     break;
                 }
 
+                
+
                 StringBuilder message = new StringBuilder();
                 for (int i = 0; i < dt.Rows.Count; i++)
                 {
@@ -139,13 +147,15 @@ namespace TimedUpload.QuartzJobs
         private void SendZoneMeterUserHis(Dictionary<string, IModel> channels, Dictionary<string, IBasicProperties> properties)
         {
             string userMeterReadId = Constants.UserMeterReadId;
-            log.Info("营收户表抄表数据同步任务开始执行.................\r\n");
+            //log.Info("营收户表抄表数据同步任务开始执行.................\r\n");
             while (true)
             {
 
                 string sql = "SELECT top 100 UsedWater_ID,CM_ID,ThisMeterNumber,ThisMeterDt,BetweenMeteNumber,b.CreateDT FROM BCS_UsedWater b";
                 sql += " WHERE b.BM_ID >= " + bmId + " AND b.UsedWater_ID > " + userMeterReadId + " ORDER BY b.BM_ID,b.UsedWater_ID";
+                // log.Info(sql);
                 DataTable dt = dbHelper.Fill(sql);
+
                 if (dt == null || dt.Rows.Count == 0)
                 {
 

+ 31 - 23
TimedUpload/QuartzJobs/NoiseDataUploadJob.cs

@@ -6,6 +6,7 @@ using System.Collections.Generic;
 using RabbitMQ.Client;
 using System.Text;
 using Newtonsoft.Json;
+using System.Data;
 
 namespace TimedUpload.QuartzJobs
 {
@@ -66,31 +67,38 @@ namespace TimedUpload.QuartzJobs
             {
                 NoiseModel model = new NoiseModel();
                 log.Info("同步噪声声设备数据...............start\r\n");
+                String sql = ""; 
 
-                // 数据 获取 以及 赋值 等着以后有设备了再说 
-
-                model.deviceCode = "";
-                model.readTime = DateTime.Now;
-                model.videoUrl = "";
-                model.weather = "";
-                model.isLeakPoint = 0;
-                model.csq = 0;
-                model.psrp = 0;
-                model.snr = 0;
-                model.ecl = 0;
-                model.pcl = 0;
-                model.isBatteryUnusual = 0;
-                model.isHitch = 0;
-                model.singnalType = 1;
-
-                string message = JsonConvert.SerializeObject(model);
-
-                foreach (var item in channels)
+                DataTable dt = dbHelper.Fill(sql);
+                if (dt != null)
                 {
-                    string key = item.Key;
-                    IModel channel = item.Value;
-                    IBasicProperties property = properties[key];
-                    channel.BasicPublish(NOISECHANNELNAME, "", property, Encoding.UTF8.GetBytes(message)); //生产消息
+                    // 数据 获取 以及 赋值 知道表结构了在去写查询和赋值
+                    for (int i = 0; i < dt.Rows.Count; i++)
+                    {
+                        model.deviceCode = "";
+                        model.readTime = DateTime.Now;
+                        model.videoUrl = "";
+                        model.weather = "";
+                        model.isLeakPoint = 0;
+                        model.csq = 0;
+                        model.psrp = 0;
+                        model.snr = 0;
+                        model.ecl = 0;
+                        model.pcl = 0;
+                        model.isBatteryUnusual = 0;
+                        model.isHitch = 0;
+                        model.singnalType = 1;
+
+                        string message = JsonConvert.SerializeObject(model);
+
+                        foreach (var item in channels)
+                        {
+                            string key = item.Key;
+                            IModel channel = item.Value;
+                            IBasicProperties property = properties[key];
+                            channel.BasicPublish(NOISECHANNELNAME, "", property, Encoding.UTF8.GetBytes(message)); //生产消息
+                        }
+                    }
                 }
 
                 log.Info("同步噪声声设备数据...............end\r\n");

+ 102 - 92
TimedUpload/QuartzJobs/SecondaryPumpDataUploadJob.cs

@@ -20,41 +20,49 @@ namespace TimedUpload.QuartzJobs
         public void Execute(IJobExecutionContext context)
         {
 
-            string[] uploadUrls = Constants.UploadUrl.Split('|');
-            Dictionary<string, IConnection> connections = new Dictionary<string, IConnection>();
-            Dictionary<string, IModel> channels = new Dictionary<string, IModel>();
-            Dictionary<string, IBasicProperties> properties = new Dictionary<string, IBasicProperties>();
-
-            foreach (string uploadUrl in uploadUrls)
+            try
             {
-                ConnectionFactory factory = new ConnectionFactory();
-                factory.HostName = uploadUrl;//主机名,Rabbit会拿这个IP生成一个endpoint,这个很熟悉吧,就是socket绑定的那个终结点。
-                factory.UserName = Constants.UploadUserName;//默认用户名,用户可以在服务端自定义创建,有相关命令行
-                factory.Password = Constants.UploadPassword;//默认密码
-
-                factory.AutomaticRecoveryEnabled = true; // 链接断开会自动重连
-
-                IConnection connection = factory.CreateConnection();
-                IModel channel = connection.CreateModel();
-                channel.QueueDeclare("secondaryPump.deviceHis", true, false, false, null);//创建一个名称为kibaqueue的消息队列
-
-                IBasicProperties property = channel.CreateBasicProperties();
-                property.ContentType = "text/plain";
-                property.DeliveryMode = 2; //持久化
-                connections.Add(uploadUrl, connection);
-                properties.Add(uploadUrl, property);
-                channels.Add(uploadUrl, channel);
-            }
+                string[] uploadUrls = Constants.UploadUrl.Split('|');
+                Dictionary<string, IConnection> connections = new Dictionary<string, IConnection>();
+                Dictionary<string, IModel> channels = new Dictionary<string, IModel>();
+                Dictionary<string, IBasicProperties> properties = new Dictionary<string, IBasicProperties>();
 
-            if (channels.Count > 0)
-            {
-                SendSecondaryPumpHis(channels, properties);
-            }
+                foreach (string uploadUrl in uploadUrls)
+                {
+                    ConnectionFactory factory = new ConnectionFactory();
+                    factory.HostName = uploadUrl;//主机名,Rabbit会拿这个IP生成一个endpoint,这个很熟悉吧,就是socket绑定的那个终结点。
+                    factory.UserName = Constants.UploadUserName;//默认用户名,用户可以在服务端自定义创建,有相关命令行
+                    factory.Password = Constants.UploadPassword;//默认密码
+
+                    factory.AutomaticRecoveryEnabled = true; // 链接断开会自动重连
+
+                    IConnection connection = factory.CreateConnection();
+                    IModel channel = connection.CreateModel();
+                    channel.QueueDeclare("secondaryPump.deviceHis", true, false, false, null);//创建一个名称为kibaqueue的消息队列
+
+                    IBasicProperties property = channel.CreateBasicProperties();
+                    property.ContentType = "text/plain";
+                    property.DeliveryMode = 2; //持久化
+                    connections.Add(uploadUrl, connection);
+                    properties.Add(uploadUrl, property);
+                    channels.Add(uploadUrl, channel);
+                }
 
-            foreach (KeyValuePair<string, IConnection> item in connections)
+                if (channels.Count > 0)
+                {
+                    SendSecondaryPumpHis(channels, properties);
+                }
+
+                foreach (KeyValuePair<string, IConnection> item in connections)
+                {
+                    IConnection connection = item.Value;
+                    connection.Close();
+                }
+            }
+            catch (Exception ex)
             {
-                IConnection connection = item.Value;
-                connection.Close();
+
+                log.Debug(ex.Message);
             }
         }
 
@@ -78,35 +86,36 @@ namespace TimedUpload.QuartzJobs
 
                 string newSecondaryToDMACode = Constants.SecondaryToDMACode;
                 // 处理设备列表
-                for (int i = 0; i < dtDevice.Rows.Count; i++)
-                {
-                    DataRow dr = dtDevice.Rows[i];
-                    string id = dr["更新时间"].ToString();
-                    string deviceCode = dr["编码"].ToString();
-                    string PressureIn = dr["泵进口压力"].ToString();
-                    string PressureOut = dr["泵出口压力"].ToString();
-                    string PressureSet = dr["泵设定压力"].ToString();
-                    string InstantFlow = dr["瞬时流量"].ToString();
-                    string TotalFlow = dr["净累计流量"].ToString();
-                    string PositiveToTalFlow = dr["正累计流量"].ToString();
-                    string NegativeTotalFlow = dr["负累计流量"].ToString();
-                    string PH = dr["PH"].ToString();
-                    string Chlorine = dr["余氯"].ToString();
-                    string Turbidity = dr["浊度"].ToString();
-                    string LiquidHeight = dr["水箱液位"].ToString();
-                    string VoltageA = dr["电压AB"].ToString();
-                    string VoltageB = dr["电压AC"].ToString();
-                    string VoltageC = dr["电压BC"].ToString();
-                    string CurrentA = dr["电流A"].ToString();
-                    string CurrentB = dr["电流B"].ToString();
-                    string CurrentC = dr["电流C"].ToString();
-                    string Consumption = dr["用电量"].ToString();
-                    string LackWater = dr["缺水报警"].ToString();
-                    string OverPressure = dr["超压报警"].ToString();
-                    string HouseInlet = dr["进水报警"].ToString();
-                    string TubeBurst = dr["爆管报警"].ToString();
-
-                    string readTime = dr["日期"].ToString().Trim() + " " + dr["时间"].ToString().Trim();
+                // for (int i = 0; i < dtDevice.Rows.Count; i++)
+                // {
+                    //DataRow dr = dtDevice.Rows[i];
+                    string id = "2023/1/17 15:37";//dr["更新时间"].ToString();
+                    string deviceCode = "wwkj006";// dr["编码"].ToString();
+                    string PressureIn = "0";//dr["泵进口压力"].ToString();
+                    string PressureOut = "10.802";//dr["泵出口压力"].ToString();
+                    string PressureSet = "0";//dr["泵设定压力"].ToString();
+                    string InstantFlow = "0";//dr["瞬时流量"].ToString();
+                    string TotalFlow = "0";//dr["净累计流量"].ToString();
+                    string PositiveToTalFlow = "0";//dr["正累计流量"].ToString();
+                    string NegativeTotalFlow = "0";//dr["负累计流量"].ToString();
+                    string PH = "0";//dr["PH"].ToString();
+                    string Chlorine = "0";//dr["余氯"].ToString();
+                    string Turbidity = "0";//dr["浊度"].ToString();
+                    string LiquidHeight = "0";//dr["水箱液位"].ToString();
+                    string VoltageA = "0";//dr["电压AB"].ToString();
+                    string VoltageB = "0";//dr["电压AC"].ToString();
+                    string VoltageC = "0";//dr["电压BC"].ToString();
+                    string CurrentA = "0";//dr["电流A"].ToString();
+                    string CurrentB = "0";//dr["电流B"].ToString();
+                    string CurrentC = "0";//dr["电流C"].ToString();
+                    string Consumption = "0";//dr["用电量"].ToString();
+                    string LackWater ="0";//dr["缺水报警"].ToString();
+                    string OverPressure = "0";//dr["超压报警"].ToString();
+                    string HouseInlet = "1";//dr["进水报警"].ToString(); // 进水报警0为正常,1为进水。
+                    string TubeBurst = "0";//dr["爆管报警"].ToString();
+                    string NetState = "0";//dr["网络状态"].ToString(); // 0代表通讯正常,1代表网络故障,2代表现场485设备通讯故障
+
+                    string readTime = "2023/1/17 15:37:14";//dr["日期"].ToString().Trim() + " " + dr["时间"].ToString().Trim();
 
                     Dictionary<string, object> deviceMap = new Dictionary<string, object>();
                     deviceMap["DeviceCode"] = deviceCode;
@@ -133,9 +142,10 @@ namespace TimedUpload.QuartzJobs
                     deviceMap["OverPressure"] = OverPressure;
                     deviceMap["HouseInlet"] = HouseInlet;
                     deviceMap["TubeBurst"] = TubeBurst;
+                    deviceMap["NetState"] = NetState; 
                     ArrayList meterDataList = new ArrayList();
                     // 处理泵的数据
-                    for (int j = 1; j < 4; j++)
+                    for (int j = 1; j < 1; j++)
                     {
                         Dictionary<string, object> meterMap = new Dictionary<string, object>();
                         string colPre = "";
@@ -152,27 +162,27 @@ namespace TimedUpload.QuartzJobs
                                 break;
                         }
 
-                        string meterCode = "wwkj" + j.ToString().PadLeft(3, '0');
-                        string frequency = dr[colPre + "频率"].ToString();
-                        string current = dr[colPre + "电流"].ToString();
-                        string runState = dr[colPre + "运行状态"].ToString();
-                        string power = dr[colPre + "功率"].ToString();
-                        string voltage = dr[colPre + "电压"].ToString();
-
-                        meterMap["PumpCode"] = meterCode;
-                        meterMap["ReadTime"] = readTime;
-                        meterMap["Frequency"] = frequency;
-                        meterMap["Current"] = current;
-                        meterMap["RunState"] = runState;
-                        meterMap["Power"] = power; 
-                        meterMap["Voltage"] = voltage; 
+                        //string meterCode = "wwkj" + j.ToString().PadLeft(3, '0');
+                        //string frequency = dr[colPre + "频率"].ToString();
+                        //string current = dr[colPre + "电流"].ToString();
+                        //string runState = dr[colPre + "运行状态"].ToString();
+                        //string power = dr[colPre + "功率"].ToString();
+                        //string voltage = dr[colPre + "电压"].ToString();
+
+                        //meterMap["PumpCode"] = meterCode;
+                        //meterMap["ReadTime"] = readTime;
+                        //meterMap["Frequency"] = frequency;
+                        //meterMap["Current"] = current;
+                        //meterMap["RunState"] = runState;
+                        //meterMap["Power"] = power; 
+                        //meterMap["Voltage"] = voltage; 
                         meterDataList.Add(meterMap);
                     }
 
-                    if (meterDataList.Count == 0) 
-                    {
-                        continue;
-                    }
+                    //if (meterDataList.Count == 0) 
+                    //{
+                    //    continue;
+                    //}
                     deviceMap["PumpData"] = meterDataList;
 
 
@@ -187,26 +197,26 @@ namespace TimedUpload.QuartzJobs
                     }
 
                     // 判断数据是否需要同步分区计量
-                    if ("1".Equals(Constants.SecondaryToDMA))
-                    {
-                        string deviceCodeTemp = SecondaryToDMA(dr, channels, properties, newSecondaryToDMACode);
+                    //if ("1".Equals(Constants.SecondaryToDMA))
+                    //{
+                    //    string deviceCodeTemp = SecondaryToDMA(dr, channels, properties, newSecondaryToDMACode);
 
 
-                        if ("".Equals(newSecondaryToDMACode))
-                        {
-                            newSecondaryToDMACode = deviceCodeTemp;
-                        }
-                        else
-                        {
+                    //    if ("".Equals(newSecondaryToDMACode))
+                    //    {
+                    //        newSecondaryToDMACode = deviceCodeTemp;
+                    //    }
+                    //    else
+                    //    {
 
-                            newSecondaryToDMACode = newSecondaryToDMACode + "," + deviceCodeTemp;
-                        }
-                    }
+                    //        newSecondaryToDMACode = newSecondaryToDMACode + "," + deviceCodeTemp;
+                    //    }
+                    //}
 
                     // 删除数据
                     string deleteSql = "DELETE FROM [dbo].[二供通用泵站] where 更新时间 = '" + id + "' and 编码 = '" + deviceCode + "'";
                     dbHelper.ExecuteNonQuery(deleteSql);
-                }
+                //}
                 if ("1".Equals(Constants.SecondaryToDMA))
                 {
                     UpdateAppConfig("SecondaryToDMACode", newSecondaryToDMACode);

+ 1 - 1
TimedUpload/app.config

@@ -3,7 +3,7 @@
   <appSettings>
     <add key="DbConncetion" value="Data Source=47.105.90.108;Initial Catalog=大安大表;uid=sa;password=wwkj@2136807" />
     <add key="WaterFactoryDbConncetion" value="Data Source=39.99.237.110;Initial Catalog=大安水厂;uid=sa;password=wwkj@2136807" />
-    <add key="ChargeDB" value="Data Source=222.163.159.218,10433;Initial Catalog=ChargeManage_Test;uid=sa;password=Daswwwkj@123" />
+    <add key="ChargeDB" value="Data Source=222.163.159.218,10433;Initial Catalog=ChargeManage;uid=sa;password=Daswwwkj@123" />
     <add key="zhihuishuiwuDB" value="server=39.99.237.110;user id=root;password=wwkj@2136807;database=smartwater_daan;charset=utf8" />
     <!-- 智慧水务系统RabbitMQ信息 start -->
     <add key="UploadUrl" value="127.0.0.1" />

+ 5 - 5
TimedUpload/quartz_jobs.xml

@@ -69,7 +69,7 @@
     </trigger>-->
 
     <!--二供数据定时上传数据-->
-    <job>
+    <!--<job>
       <name>SecondaryPumpDataUploadJob</name>
       <group>SecondaryPumpDataUpload</group>
       <description>数据定时上传服务</description>
@@ -86,9 +86,9 @@
         <start-time>2017-08-08T00:00:00+08:00</start-time>
         <cron-expression>20 0/5 * * * ? </cron-expression>
       </cron>
-    </trigger>
+    </trigger>-->
+
 
-    
     <!--水源井数据定时上传数据-->
     <!--<job>
       <name>WaterWellDataUploadJob</name>
@@ -129,7 +129,7 @@
       </cron>
     </trigger>-->
 
-    <!--营收数据定时上传数据
+    <!--营收数据定时上传数据-->
     <job>
       <name>DABusinessDataJob</name>
       <group>DABusinessData</group>
@@ -147,7 +147,7 @@
         <start-time>2017-08-08T00:00:00+08:00</start-time>
         <cron-expression>0 0 1 * * ? *</cron-expression>
       </cron>
-    </trigger>-->
+    </trigger>
     <!--水厂出口表数据定时上传数据
     <job>
       <name>WaterFactoryAreaDataJob</name>