using log4net;
using Newtonsoft.Json;
using Quartz;
using RabbitMQ.Client;
using RDIFramework.Utilities;
using System;
using System.Collections.Generic;
using System.Data;
using System.Text;
using TimedUpload.utils;

namespace TimedUpload.QuartzJobs
{
    /// <summary>
    /// Quartz job that reads every row of [dbo].[水源井数据], publishes each row as a
    /// JSON message to all RabbitMQ hosts listed in <c>Constants.UploadUrl</c>
    /// ('|'-separated), and deletes the row after it has been published.
    /// </summary>
    [DisallowConcurrentExecution]
    public class WaterWellDataUploadJob : IJob
    {
        /// <summary>
        /// Name used both for the declared queue and as the exchange argument of BasicPublish.
        /// </summary>
        private const string QueueName = "waterWell.deviceHis";

        private readonly ILog log = LogManager.GetLogger(typeof(WaterWellDataUploadJob));

        /// <summary>
        /// Opens one connection/channel per configured host, uploads the pending rows,
        /// and always closes every connection that was opened.
        /// </summary>
        /// <param name="context">Quartz execution context (not used by this job).</param>
        /// <remarks>
        /// Exceptions raised while connecting or declaring the queue propagate to the caller,
        /// exactly as before; the difference is that already-opened connections are now closed first.
        /// </remarks>
        public void Execute(IJobExecutionContext context)
        {
            // Drop empty entries so a trailing or doubled '|' does not produce an empty host name.
            string[] uploadUrls = Constants.UploadUrl.Split(new char[] { '|' }, StringSplitOptions.RemoveEmptyEntries);

            Dictionary<string, IConnection> connections = new Dictionary<string, IConnection>();
            Dictionary<string, IModel> channels = new Dictionary<string, IModel>();
            Dictionary<string, IBasicProperties> properties = new Dictionary<string, IBasicProperties>();

            try
            {
                foreach (string uploadUrl in uploadUrls)
                {
                    // A host listed twice would make Dictionary.Add throw; one connection per host is enough.
                    if (connections.ContainsKey(uploadUrl))
                    {
                        continue;
                    }

                    ConnectionFactory factory = new ConnectionFactory();
                    factory.HostName = uploadUrl;                   // broker host name
                    factory.UserName = Constants.UploadUserName;    // broker user name
                    factory.Password = Constants.UploadPassword;    // broker password
                    factory.AutomaticRecoveryEnabled = true;        // reconnect automatically when the link drops

                    IConnection connection = factory.CreateConnection();
                    // Register immediately so the finally block closes it even if a later step throws.
                    connections.Add(uploadUrl, connection);

                    IModel channel = connection.CreateModel();
                    // Arguments: queue name, durable = true, exclusive = false, autoDelete = false, no extra arguments.
                    channel.QueueDeclare(QueueName, true, false, false, null);

                    IBasicProperties property = channel.CreateBasicProperties();
                    property.ContentType = "text/plain";
                    property.DeliveryMode = 2; // persistent delivery

                    properties.Add(uploadUrl, property);
                    channels.Add(uploadUrl, channel);
                }

                if (channels.Count > 0)
                {
                    SendWaterWellHis(channels, properties);
                }
            }
            finally
            {
                CloseConnections(connections);
            }
        }

        /// <summary>
        /// Closes every opened connection. A failure on one connection is logged and
        /// does not prevent the remaining connections from being closed.
        /// </summary>
        /// <param name="connections">Connections keyed by host name.</param>
        private void CloseConnections(Dictionary<string, IConnection> connections)
        {
            foreach (KeyValuePair<string, IConnection> item in connections)
            {
                try
                {
                    item.Value.Close();
                }
                catch (Exception ex)
                {
                    log.Error("关闭RabbitMQ连接失败:" + item.Key, ex);
                }
            }
        }

        /// <summary>
        /// Uploads the water-source well history rows: reads all rows ordered by date and time,
        /// publishes each one to every channel, then deletes the row from the table.
        /// </summary>
        /// <param name="channels">Open channels keyed by host name.</param>
        /// <param name="properties">Message properties keyed by the same host names as <paramref name="channels"/>.</param>
        /// <remarks>
        /// Any exception is caught and logged here; processing stops at the failing row and
        /// rows already published and deleted are not restored.
        /// </remarks>
        private void SendWaterWellHis(Dictionary<string, IModel> channels, Dictionary<string, IBasicProperties> properties)
        {
            try
            {
                log.Info("水源井历史数据同步任务开始执行.................\r\n");

                string factorySql = "SELECT * FROM [dbo].[水源井数据] ORDER BY 日期, 时间 ";
                DataTable dtDevice = dbHelper.Fill(factorySql);
                if (dtDevice == null || dtDevice.Rows.Count == 0)
                {
                    return;
                }

                foreach (DataRow dr in dtDevice.Rows)
                {
                    // NOTE(review): the row is identified by the string form of 更新时间 plus 编码.
                    // If 更新时间 is a datetime column, ToString() is culture-formatted and may not
                    // round-trip exactly in the DELETE below - confirm the column type.
                    string id = dr["更新时间"].ToString();
                    string deviceCode = dr["编码"].ToString();

                    string message = JsonConvert.SerializeObject(BuildDeviceMap(dr, deviceCode));
                    // Encode once; the same payload goes to every host.
                    byte[] body = Encoding.UTF8.GetBytes(message);

                    foreach (KeyValuePair<string, IModel> item in channels)
                    {
                        IBasicProperties property = properties[item.Key];
                        // NOTE(review): the first BasicPublish argument is the exchange and the second the
                        // routing key, so this targets an exchange named like the queue with an empty
                        // routing key - confirm that exchange and its binding exist on the broker.
                        item.Value.BasicPublish(QueueName, "", property, body);
                    }

                    // Remove the uploaded row. Single quotes are doubled so that a quote inside
                    // either value cannot terminate the SQL string literal.
                    string deleteSql = "DELETE FROM [dbo].[水源井数据] where 更新时间 = '" + EscapeSqlLiteral(id)
                        + "' and 编码 = '" + EscapeSqlLiteral(deviceCode) + "'";
                    dbHelper.ExecuteNonQuery(deleteSql);
                }

                log.Info("水源井历史记录同步任务执行结束.................\r\n");
            }
            catch (Exception ex)
            {
                log.Error("水源井历史记录同步任务执行错误:" + ex.Message + "===========" + ex.StackTrace + "\r\n");
            }
        }

        /// <summary>
        /// Builds the message payload for one table row. Keys are inserted in the same
        /// order as before so the serialized output keeps its field order.
        /// </summary>
        /// <param name="dr">Source row from [dbo].[水源井数据].</param>
        /// <param name="deviceCode">Value of the row's 编码 column.</param>
        /// <returns>Payload dictionary; every value is a string.</returns>
        private static Dictionary<string, object> BuildDeviceMap(DataRow dr, string deviceCode)
        {
            Dictionary<string, object> deviceMap = new Dictionary<string, object>();
            deviceMap["DeviceCode"] = deviceCode;
            deviceMap["ReadTime"] = dr["日期"].ToString().Trim() + " " + dr["时间"].ToString().Trim();
            deviceMap["InstantFlow"] = dr["井瞬时流量"].ToString();
            deviceMap["TotalFlow"] = dr["井累计流量"].ToString();
            deviceMap["VoltageA"] = dr["井电压A相"].ToString();
            deviceMap["VoltageB"] = dr["井电压B相"].ToString();
            deviceMap["VoltageC"] = dr["井电压C相"].ToString();
            deviceMap["CurrentA"] = dr["井电流A相"].ToString();
            deviceMap["CurrentB"] = dr["井电流B相"].ToString();
            deviceMap["CurrentC"] = dr["井电流C相"].ToString();
            deviceMap["Consumption"] = dr["井电能"].ToString();
            deviceMap["IsAuto"] = NormalizeFlag(dr["井控制模式"].ToString());
            deviceMap["RunState"] = NormalizeFlag(dr["井运行状态"].ToString());
            deviceMap["Power"] = dr["井功率"].ToString();
            return deviceMap;
        }

        /// <summary>
        /// Normalizes a flag column: values accepted by <c>CommonUtil.IsNumber</c> pass through
        /// unchanged; otherwise "true" (any letter case) becomes "1" and anything else becomes "0".
        /// </summary>
        /// <param name="value">Raw column text.</param>
        /// <returns>The original numeric text, or "1" / "0".</returns>
        private static string NormalizeFlag(string value)
        {
            if (CommonUtil.IsNumber(value))
            {
                return value;
            }
            return string.Equals(value, "TRUE", StringComparison.OrdinalIgnoreCase) ? "1" : "0";
        }

        /// <summary>
        /// Doubles single quotes so the value can be embedded in a single-quoted SQL string literal.
        /// </summary>
        /// <param name="value">Text to embed.</param>
        /// <returns>The text with every ' replaced by ''.</returns>
        private static string EscapeSqlLiteral(string value)
        {
            return value.Replace("'", "''");
        }

        /// <summary>
        /// Returns a SQL Server provider for <c>Constants.WaterFactoryDbConncetion</c>.
        /// A provider is requested from the factory on every access.
        /// </summary>
        static IDbProvider dbHelper
        {
            get
            {
                var DbDefine = DbFactoryProvider.GetProvider(CurrentDbType.SqlServer, Constants.WaterFactoryDbConncetion);
                return DbDefine;
            }
        }
    }
}