123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241 |
- using log4net;
- using Newtonsoft.Json;
- using Quartz;
- using RabbitMQ.Client;
- using RDIFramework.Utilities;
- using System;
- using System.Collections.Generic;
- using System.Data;
- using System.Linq;
- using System.Text;
- namespace TimedUpload.QuartzJobs
- {
- public class ChangleWorkmanshipDataUploadJob : IJob
- {
- private readonly ILog log = LogManager.GetLogger(typeof(WorkmanshipDataUploadJob));
- public void Execute(IJobExecutionContext context)
- {
- string[] uploadUrls = Constants.UploadUrl.Split('|');
- Dictionary<string, IConnection> connections = new Dictionary<string, IConnection>();
- Dictionary<string, IModel> channels = new Dictionary<string, IModel>();
- Dictionary<string, IBasicProperties> properties = new Dictionary<string, IBasicProperties>();
- foreach (string uploadUrl in uploadUrls)
- {
- ConnectionFactory factory = new ConnectionFactory();
- factory.HostName = uploadUrl;//主机名,Rabbit会拿这个IP生成一个endpoint,这个很熟悉吧,就是socket绑定的那个终结点。
- factory.UserName = Constants.UploadUserName;//默认用户名,用户可以在服务端自定义创建,有相关命令行
- factory.Password = Constants.UploadPassword;//默认密码
- factory.AutomaticRecoveryEnabled = true; // 链接断开会自动重连
- IConnection connection = factory.CreateConnection();
- IModel channel = connection.CreateModel();
- channel.QueueDeclare("workmanship", true, false, false, null);//创建一个名称为kibaqueue的消息队列
- IBasicProperties property = channel.CreateBasicProperties();
- property.ContentType = "text/plain";
- property.DeliveryMode = 2; //持久化
- connections.Add(uploadUrl, connection);
- properties.Add(uploadUrl, property);
- channels.Add(uploadUrl, channel);
- }
- if (channels.Count > 0)
- {
- SendSeconddaryPumpData(channels, properties);
- //SendWaterWellData(channels, properties);
- SendWaterFactoryData(channels, properties);
- }
- foreach (KeyValuePair<string, IConnection> item in connections)
- {
- IConnection connection = item.Value;
- connection.Close();
- }
- }
- /// <summary>
- /// 二供数据
- /// </summary>
- /// <param name="channel"></param>
- private void SendSeconddaryPumpData(Dictionary<string, IModel> channels, Dictionary<string, IBasicProperties> properties)
- {
- try
- {
- log.Info("二供工艺图数据同步任务开始执行.................\r\n");
- string factorySql = "select 编码 DeviceCode,1 Type,RTRIM(日期) + ' ' + RTRIM(时间) ReadTime,* from (select row_number() over(partition by 编码 order by 日期 desc, 时间 desc) as rid,* from 二供通用泵站 where 编码 is not null) as tt where tt.rid=1";
- DataTable dtDevice = dbHelper.Fill(factorySql);
- if (dtDevice == null || dtDevice.Rows.Count == 0)
- {
- return;
- }
- DataColumnCollection cols = dtDevice.Columns;
- // 处理设备列表
- for (int i = 0; i < dtDevice.Rows.Count; i++)
- {
- DataRow dr = dtDevice.Rows[i];
- string deviceCode = dr["DeviceCode"].ToString();
- string type = dr["Type"].ToString();
- string realData = CreateJsonParameters(cols, dr);
- Dictionary<string, object> deviceMap = new Dictionary<string, object>();
- deviceMap["DeviceCode"] = deviceCode;
- deviceMap["Type"] = type;
- deviceMap["RealData"] = realData;
- string message = JsonConvert.SerializeObject(deviceMap);
- foreach (KeyValuePair<string, IModel> item in channels)
- {
- string key = item.Key;
- IModel channel = item.Value;
- IBasicProperties property = properties[key];
- channel.BasicPublish("workmanship", "", property, Encoding.UTF8.GetBytes(message)); //生产消息
- }
- }
- log.Info("二供工艺图数据同步任务开始执行.................\r\n");
- }
- catch (Exception ex)
- {
- log.Error("二供工艺图数据同步任务错误:" + ex.Message + "===========" + ex.StackTrace + "\r\n");
- }
- }
- /// <summary>
- /// 水源井数据
- /// </summary>
- /// <param name="channel"></param>
- private void SendWaterWellData(Dictionary<string, IModel> channels, Dictionary<string, IBasicProperties> properties)
- {
- try
- {
- log.Info("水源井工艺图数据同步任务开始执行.................\r\n");
- string factorySql = "select 编码 DeviceCode,2 Type,RTRIM(日期) + ' ' + RTRIM(时间) ReadTime,* from (select row_number() over(partition by 编码 order by 日期 desc, 时间 desc) as rid,* from 水源井数据 where 编码 is not null) as tt where tt.rid=1";
- DataTable dtDevice = dbHelper.Fill(factorySql);
- if (dtDevice == null || dtDevice.Rows.Count == 0)
- {
- return;
- }
- DataColumnCollection cols = dtDevice.Columns;
- // 处理设备列表
- for (int i = 0; i < dtDevice.Rows.Count; i++)
- {
- DataRow dr = dtDevice.Rows[i];
- string deviceCode = dr["DeviceCode"].ToString();
- string type = dr["Type"].ToString();
- string realData = CreateJsonParameters(cols, dr);
- Dictionary<string, object> deviceMap = new Dictionary<string, object>();
- deviceMap["DeviceCode"] = deviceCode;
- deviceMap["Type"] = type;
- deviceMap["RealData"] = realData;
- string message = JsonConvert.SerializeObject(deviceMap);
- foreach (KeyValuePair<string, IModel> item in channels)
- {
- string key = item.Key;
- IModel channel = item.Value;
- IBasicProperties property = properties[key];
- channel.BasicPublish("workmanship", "", property, Encoding.UTF8.GetBytes(message)); //生产消息
- }
- }
- log.Info("水源井工艺图数据同步任务结束.................\r\n");
- }
- catch (Exception ex)
- {
- log.Error("水源井工艺图数据同步任务执行错误:" + ex.Message + "===========" + ex.StackTrace + "\r\n");
- }
- }
- /// <summary>
- /// 水厂数据
- /// </summary>
- /// <param name="channel"></param>
- private void SendWaterFactoryData(Dictionary<string, IModel> channels, Dictionary<string, IBasicProperties> properties)
- {
- try
- {
- log.Info("水厂工艺图数据同步任务开始执行.................\r\n");
- DateTime newTime = DateTime.Now;
- String yearStr = newTime.Year.ToString();
- string factorySql = "";
- DataTable dtDevice = dbHelper.Fill(factorySql);
- if (dtDevice == null || dtDevice.Rows.Count == 0)
- {
- return;
- }
- DataColumnCollection cols = dtDevice.Columns;
- // 处理设备列表
- for (int i = 0; i < dtDevice.Rows.Count; i++)
- {
- DataRow dr = dtDevice.Rows[i];
- string deviceCode = dr["DeviceCode"].ToString();
- string type = dr["Type"].ToString();
- string realData = CreateJsonParameters(cols, dr);
- Dictionary<string, object> deviceMap = new Dictionary<string, object>();
- deviceMap["DeviceCode"] = deviceCode;
- deviceMap["Type"] = type;
- deviceMap["RealData"] = realData;
- string message = JsonConvert.SerializeObject(deviceMap);
- foreach (KeyValuePair<string, IModel> item in channels)
- {
- string key = item.Key;
- IModel channel = item.Value;
- IBasicProperties property = properties[key];
- channel.BasicPublish("workmanship", "", property, Encoding.UTF8.GetBytes(message)); //生产消息
- }
- }
- log.Info("水厂工艺图数据同步任务执行结束.................\r\n");
- }
- catch (Exception ex)
- {
- log.Error("水厂工艺图数据同步任务执行错误:" + ex.Message + "===========" + ex.StackTrace + "\r\n");
- }
- }
- private static string CreateJsonParameters(DataColumnCollection cols, DataRow dr)
- {
- StringBuilder JsonString = new StringBuilder();
- JsonString.Append("{");
- for (int j = 0; j < cols.Count; j++)
- {
- if (j < cols.Count - 1)
- {
- JsonString.Append("\"" + cols[j].ColumnName.ToString() + "\":" + "\"" + dr[j].ToString().Trim() + "\",");
- }
- else if (j == cols.Count - 1)
- {
- JsonString.Append("\"" + cols[j].ColumnName.ToString() + "\":" + "\"" + dr[j].ToString().Trim() + "\"");
- }
- }
- JsonString.Append("}");
- return JsonString.ToString();
- }
- static IDbProvider dbHelper
- {
- get
- {
- var DbDefine = DbFactoryProvider.GetProvider(CurrentDbType.SqlServer, Constants.WaterFactoryDbConncetion);
- return DbDefine;
- }
- }
- }
- }
|