using log4net; using Quartz; using RabbitMQ.Client; using RDIFramework.Utilities; using System; using System.Collections.Generic; using System.Data; using System.IO; using System.Text; namespace TimedUpload.QuartzJobs { [DisallowConcurrentExecution] public class WaterFactoryAreaDataJob :IJob { private readonly ILog log = LogManager.GetLogger(typeof(WaterFactoryAreaDataJob)); public void Execute(IJobExecutionContext context) { // throw new NotImplementedException(); string[] uploadUrls = Constants.UploadUrl.Split('|'); Dictionary connections = new Dictionary(); Dictionary channels = new Dictionary(); Dictionary properties = new Dictionary(); foreach (string uploadUrl in uploadUrls) { ConnectionFactory factory = new ConnectionFactory(); factory.HostName = uploadUrl;//主机名,Rabbit会拿这个IP生成一个endpoint,这个很熟悉吧,就是socket绑定的那个终结点。 factory.UserName = Constants.UploadUserName;//默认用户名,用户可以在服务端自定义创建,有相关命令行 factory.Password = Constants.UploadPassword;//默认密码 factory.AutomaticRecoveryEnabled = true; // 链接断开会自动重连 IConnection connection = factory.CreateConnection(); IModel channel = connection.CreateModel(); channel.QueueDeclare("zone.deviceHis", true, false, false, null);//创建一个名称为kibaqueue的消息队列 IBasicProperties property = channel.CreateBasicProperties(); property.ContentType = "text/plain"; property.DeliveryMode = 2; //持久化 connections.Add(uploadUrl, connection); properties.Add(uploadUrl, property); channels.Add(uploadUrl, channel); } if (channels.Count > 0) { SendZoneDeviceHis(channels, properties); } foreach (KeyValuePair item in connections) { IConnection connection = item.Value; connection.Close(); } } /// /// 大表历史数据 /// /// /// private void SendZoneDeviceHis(Dictionary channels, Dictionary properties) { try { log.Info("水厂大表设备历史数据同步任务开始执行.................\r\n"); String sqlMeter = "SELECT * FROM bs_waterfactory_meter WHERE Type = 1 AND IsFlow = 1";//"SELECT ID,考核表编码 FROM [设备信息] where 是否启用 = '是' and 考核表编码 is not null order by ID"; DataTable dtMeter = dbHelper.Fill(sqlMeter); //Dictionary arguments = new Dictionary(); //arguments["x-max-length-bytes"] = 2147383648; //arguments["x-overflow"] = "reject-publish"; for (int i = 0; i < dtMeter.Rows.Count; i++) { DataRow drMeter = dtMeter.Rows[i]; //String meterId = drMeter["ID"].ToString(); String meterCode = drMeter["MeterCode"].ToString().TrimEnd() + "wf"; String lastTime = ""; //for (int k = lastYear; k <= nowYear; k++) //{ //string tablename = "历史记录_" + ("000000" + meterId).Substring(meterId.Length, 6) + "_" + k; // 判断历史记录表是否存在 //if (!CheckTableExist(tablename)) //{ // continue; //} //String sqlMeterHis = "select 记录时间,采集时间,正累计流量,负累计流量,净累计流量,瞬时流量,电池电压,压力 from " + tablename; //if (uploadHis.ContainsKey(meterCode)) //{ // sqlMeterHis += " where 采集时间 > '" + uploadHis[meterCode] + "'"; //} //sqlMeterHis += " order by 采集时间"; //DataTable dtMeterHis = dbHelper.Fill(sqlMeterHis); StringBuilder message = new StringBuilder(); for (int j = 0; j < dtMeter.Rows.Count; j++) { message.Clear(); try { //DataRow drMeterHis = dtMeterHis.Rows[j]; String getDateTime = Convert.ToDateTime(drMeter["ReadTime"]).ToString("yyyy-MM-dd HH:mm:ss"); message.Append("{"); message.Append("\"meterAssessmentCode\": \"").Append(drMeter["MeterCode"].ToString()).Append("wf\","); message.Append("\"manufacturerCode\": ").Append(Constants.ManufacturerCode).Append(","); message.Append("\"getDateTime\": \"").Append(getDateTime).Append("\","); if (Convert.DBNull != drMeter["NetCumulativeFlow"]) { message.Append("\"netCumulativeFlow\": ").Append(Convert.ToDecimal(drMeter["NetCumulativeFlow"])).Append(","); } if (Convert.DBNull != drMeter["NetCumulativeFlow"]) { message.Append("\"positiveCumulativeFlow\": ").Append(Convert.ToDecimal(drMeter["NetCumulativeFlow"])).Append(","); } message.Append("\"negativeCumulativeFlow\": ").Append(0).Append(","); if (Convert.DBNull != drMeter["InstantaneousFlow"]) { message.Append("\"instantaneousFlow\": ").Append(Convert.ToDecimal(drMeter["instantaneousFlow"])).Append(","); } if (Convert.DBNull != drMeter["Pressure"]) { message.Append("\"pressure\": ").Append(Convert.ToDecimal(drMeter["Pressure"])).Append(","); } //if (Convert.DBNull != drMeter["电池电压"]) //{ // message.Append("\"batteryVoltageValue\": ").Append(Convert.ToDecimal(drMeterHis["电池电压"])).Append(","); //} message.Append("}"); foreach (KeyValuePair item in channels) { string key = item.Key; IModel channel = item.Value; IBasicProperties property = properties[key]; channel.BasicPublish("zone.deviceHis", "", property, Encoding.UTF8.GetBytes(message.ToString())); //生产消息 } lastTime = getDateTime; } catch (Exception ex) { log.Info("大表设备历史记录同步任务数据推送失败:" + message.ToString() + "\r\n"); log.Error(ex.Message + "===========" + ex.StackTrace + "\r\n"); } } //} //if (!"".Equals(lastTime)) //{ // uploadHis[meterCode] = lastTime; //} } //SavaUploadHis(uploadHis); log.Info("水厂大表设备历史记录同步任务执行结束.................\r\n"); } catch (Exception ex) { log.Error("水厂大表设备历史记录同步任务执行错误:" + ex.Message + "===========" + ex.StackTrace + "\r\n"); } } /// /// 判断历史记录表是否存在 /// /// /// private bool CheckTableExist(string tablename) { DataTable table = dbHelper.Fill("select top 1 * from sysobjects where name='" + tablename + "' and xtype='u'"); if (table == null || table.Rows.Count == 0) { return false; } return true; } /// /// 保存每块块表的上传最后一条历史记录 /// /// private void SavaUploadHis(Dictionary uploadHis) { // 清除之前的内容 FileStream stream = File.Open(@"TextFile1.txt", FileMode.OpenOrCreate, FileAccess.Write); stream.Seek(0, SeekOrigin.Begin); stream.SetLength(0); stream.Close(); using (StreamWriter sw = new StreamWriter(@"TextFile1.txt")) { foreach (var item in uploadHis) { sw.WriteLine(item.Key + "," + item.Value); } } } static IDbProvider dbHelper { get { var DbDefine = DbFactoryProvider.GetProvider(CurrentDbType.MySql, Constants.zhihuishuiwuDB); return DbDefine; } } } }