123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217 |
- using log4net;
- using Quartz;
- using RabbitMQ.Client;
- using RDIFramework.Utilities;
- using System;
- using System.Collections.Generic;
- using System.Data;
- using System.IO;
- using System.Text;
- namespace TimedUpload.QuartzJobs
- {
- [DisallowConcurrentExecution]
- public class WaterFactoryAreaDataJob :IJob
- {
- private readonly ILog log = LogManager.GetLogger(typeof(WaterFactoryAreaDataJob));
- public void Execute(IJobExecutionContext context)
- {
-
- string[] uploadUrls = Constants.UploadUrl.Split('|');
- Dictionary<string, IConnection> connections = new Dictionary<string, IConnection>();
- Dictionary<string, IModel> channels = new Dictionary<string, IModel>();
- Dictionary<string, IBasicProperties> properties = new Dictionary<string, IBasicProperties>();
- foreach (string uploadUrl in uploadUrls)
- {
- ConnectionFactory factory = new ConnectionFactory();
- factory.HostName = uploadUrl;
- factory.UserName = Constants.UploadUserName;
- factory.Password = Constants.UploadPassword;
- factory.AutomaticRecoveryEnabled = true;
- IConnection connection = factory.CreateConnection();
- IModel channel = connection.CreateModel();
-
- channel.QueueDeclare("zone.deviceHis", true, false, false, null);
- IBasicProperties property = channel.CreateBasicProperties();
- property.ContentType = "text/plain";
- property.DeliveryMode = 2;
- connections.Add(uploadUrl, connection);
- properties.Add(uploadUrl, property);
- channels.Add(uploadUrl, channel);
- }
- if (channels.Count > 0)
- {
- SendZoneDeviceHis(channels, properties);
- }
- foreach (KeyValuePair<string, IConnection> item in connections)
- {
- IConnection connection = item.Value;
- connection.Close();
- }
- }
-
-
-
-
-
- private void SendZoneDeviceHis(Dictionary<string, IModel> channels, Dictionary<string, IBasicProperties> properties)
- {
- try
- {
- log.Info("水厂大表设备历史数据同步任务开始执行.................\r\n");
- String sqlMeter = "SELECT * FROM bs_waterfactory_meter WHERE Type = 1 AND IsFlow = 1";
- DataTable dtMeter = dbHelper.Fill(sqlMeter);
-
-
-
-
- for (int i = 0; i < dtMeter.Rows.Count; i++)
- {
- DataRow drMeter = dtMeter.Rows[i];
-
- String meterCode = drMeter["MeterCode"].ToString().TrimEnd() + "wf";
- String lastTime = "";
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
- StringBuilder message = new StringBuilder();
- for (int j = 0; j < dtMeter.Rows.Count; j++)
- {
- message.Clear();
- try
- {
-
- String getDateTime = Convert.ToDateTime(drMeter["ReadTime"]).ToString("yyyy-MM-dd HH:mm:ss");
- message.Append("{");
- message.Append("\"meterAssessmentCode\": \"").Append(drMeter["MeterCode"].ToString()).Append("wf\",");
- message.Append("\"manufacturerCode\": ").Append(Constants.ManufacturerCode).Append(",");
- message.Append("\"getDateTime\": \"").Append(getDateTime).Append("\",");
- if (Convert.DBNull != drMeter["NetCumulativeFlow"])
- {
- message.Append("\"netCumulativeFlow\": ").Append(Convert.ToDecimal(drMeter["NetCumulativeFlow"])).Append(",");
- }
- if (Convert.DBNull != drMeter["NetCumulativeFlow"])
- {
- message.Append("\"positiveCumulativeFlow\": ").Append(Convert.ToDecimal(drMeter["NetCumulativeFlow"])).Append(",");
- }
-
- message.Append("\"negativeCumulativeFlow\": ").Append(0).Append(",");
-
- if (Convert.DBNull != drMeter["InstantaneousFlow"])
- {
- message.Append("\"instantaneousFlow\": ").Append(Convert.ToDecimal(drMeter["instantaneousFlow"])).Append(",");
- }
- if (Convert.DBNull != drMeter["Pressure"])
- {
- message.Append("\"pressure\": ").Append(Convert.ToDecimal(drMeter["Pressure"])).Append(",");
- }
-
-
-
-
- message.Append("}");
- foreach (KeyValuePair<string, IModel> item in channels)
- {
- string key = item.Key;
- IModel channel = item.Value;
- IBasicProperties property = properties[key];
- channel.BasicPublish("zone.deviceHis", "", property, Encoding.UTF8.GetBytes(message.ToString()));
- }
- lastTime = getDateTime;
- }
- catch (Exception ex)
- {
- log.Info("大表设备历史记录同步任务数据推送失败:" + message.ToString() + "\r\n");
- log.Error(ex.Message + "===========" + ex.StackTrace + "\r\n");
- }
- }
-
-
-
-
-
- }
-
- log.Info("水厂大表设备历史记录同步任务执行结束.................\r\n");
- }
- catch (Exception ex)
- {
- log.Error("水厂大表设备历史记录同步任务执行错误:" + ex.Message + "===========" + ex.StackTrace + "\r\n");
- }
- }
-
-
-
-
-
- private bool CheckTableExist(string tablename)
- {
- DataTable table = dbHelper.Fill("select top 1 * from sysobjects where name='" + tablename + "' and xtype='u'");
- if (table == null || table.Rows.Count == 0)
- {
- return false;
- }
- return true;
- }
-
-
-
-
- private void SavaUploadHis(Dictionary<String, String> uploadHis)
- {
-
- FileStream stream = File.Open(@"TextFile1.txt", FileMode.OpenOrCreate, FileAccess.Write);
- stream.Seek(0, SeekOrigin.Begin);
- stream.SetLength(0);
- stream.Close();
- using (StreamWriter sw = new StreamWriter(@"TextFile1.txt"))
- {
- foreach (var item in uploadHis)
- {
- sw.WriteLine(item.Key + "," + item.Value);
- }
- }
- }
- static IDbProvider dbHelper
- {
- get
- {
- var DbDefine = DbFactoryProvider.GetProvider(CurrentDbType.MySql, Constants.zhihuishuiwuDB);
- return DbDefine;
- }
- }
- }
- }
|