using log4net;
using Quartz;
using RabbitMQ.Client;
using RDIFramework.Utilities;
using System;
using System.Collections.Generic;
using System.Configuration;
using System.Data;
using System.Globalization;
using System.IO;
using System.Text;
using TimedUpload.utils;

namespace TimedUpload.QuartzJobs
{
    /// <summary>
    /// Quartz job that reads meter history rows from MySQL and publishes them as JSON
    /// messages to every RabbitMQ host listed in <c>Constants.UploadUrlHeTong</c>
    /// (hosts are separated by '|'). The last uploaded timestamp of each meter is kept
    /// in a local text file so that the next run only sends newer rows.
    /// </summary>
    [DisallowConcurrentExecution]
    public class HeTongDataUploadJob : IJob
    {
        private const string DeviceExchange = "archives.dmaDevice.exchange";
        private const string DeviceDataExchange = "data.dmaDeviceData.exchange";

        // Relative path: resolved against the process working directory.
        private const string ProgressFile = @"TextFileHeTong.txt";
        private const string TimestampFormat = "yyyy-MM-dd HH:mm:ss";

        private readonly ILog log = LogManager.GetLogger(typeof(HeTongDataUploadJob));
        private readonly MySqlHelper mySqlHelper = new MySqlHelper("DbMySQL");

        /// <summary>
        /// Opens one connection/channel per configured host, uploads the meter history
        /// and always closes the connections again.
        /// </summary>
        /// <param name="context">Quartz execution context (not used).</param>
        /// <exception cref="JobExecutionException">
        /// Wraps any failure that happens while connecting or uploading.
        /// </exception>
        public void Execute(IJobExecutionContext context)
        {
            Dictionary<string, IConnection> connections = new Dictionary<string, IConnection>();
            Dictionary<string, IModel> channels = new Dictionary<string, IModel>();
            Dictionary<string, IBasicProperties> properties = new Dictionary<string, IBasicProperties>();

            try
            {
                foreach (string uploadUrl in Constants.UploadUrlHeTong.Split('|'))
                {
                    // Blank entries (e.g. a trailing '|') and duplicates would otherwise
                    // fail on connect or on Dictionary.Add.
                    if (uploadUrl.Trim().Length == 0 || connections.ContainsKey(uploadUrl))
                    {
                        continue;
                    }

                    ConnectionFactory factory = new ConnectionFactory();
                    factory.HostName = uploadUrl;
                    factory.UserName = Constants.UploadUserNameHeTong;
                    factory.Password = Constants.UploadPasswordHeTong;
                    factory.VirtualHost = Constants.VirtualHostHeTong;
                    factory.AutomaticRecoveryEnabled = true; // reconnect automatically when the link drops

                    IConnection connection = factory.CreateConnection();
                    // Register immediately so the connection is closed even if a later step throws.
                    connections.Add(uploadUrl, connection);

                    IModel channel = connection.CreateModel();
                    // Declares two durable, non-exclusive, non-auto-delete queues.
                    // NOTE(review): the same names are used as *exchange* names when publishing;
                    // confirm that matching exchanges exist on the broker.
                    channel.QueueDeclare(DeviceExchange, true, false, false, null);
                    channel.QueueDeclare(DeviceDataExchange, true, false, false, null);

                    IBasicProperties property = channel.CreateBasicProperties();
                    property.ContentType = "text/plain";
                    property.DeliveryMode = 2; // persistent

                    properties.Add(uploadUrl, property);
                    channels.Add(uploadUrl, channel);
                }

                if (channels.Count > 0)
                {
                    // Device archive sync is currently disabled; only history is uploaded.
                    //SendZoneDevice(channels, properties);
                    SendZoneDeviceHis(channels, properties);
                }
            }
            catch (Exception ex)
            {
                log.Error("HeTong upload job failed: " + ex.Message + "===========" + ex.StackTrace + "\r\n");
                // Quartz expects jobs to surface failures as JobExecutionException.
                throw new JobExecutionException(ex);
            }
            finally
            {
                CloseConnections(connections);
            }
        }

        /// <summary>
        /// Closes every connection; a failure on one connection does not prevent the others
        /// from being closed.
        /// </summary>
        /// <param name="connections">Open connections keyed by host.</param>
        private void CloseConnections(Dictionary<string, IConnection> connections)
        {
            foreach (KeyValuePair<string, IConnection> item in connections)
            {
                try
                {
                    item.Value.Close();
                }
                catch (Exception ex)
                {
                    log.Error("Failed to close connection to " + item.Key + ": " + ex.Message + "\r\n");
                }
            }
        }

        /// <summary>
        /// Publishes one "deviceAdd" message per meter listed in the progress file.
        /// </summary>
        /// <param name="channels">Open channels keyed by host.</param>
        /// <param name="properties">Message properties keyed by the same host.</param>
        private void SendZoneDevice(Dictionary<string, IModel> channels, Dictionary<string, IBasicProperties> properties)
        {
            try
            {
                Dictionary<string, string> devIdDic = GetDevIds();
                if (devIdDic.Count == 0)
                {
                    log.Info("No meter ids configured, device sync skipped.\r\n");
                    return;
                }

                string devIds = string.Join(",", devIdDic.Keys);
                log.Info("大表设备同步任务开始执行.................\r\n");

                // NOTE(review): the ids come from the local progress file and are concatenated
                // into the statement; keep that file trusted or switch to parameters.
                string sql = "SELECT MeterAssessmentName,MeterAssessmentCode,ICCID,LngAndLat,IsPressucre,IsFlow,isZoneMeter,isTradeMeter,isLargeUser FROM bs_meterassessmentbase WHERE MeterAssessmentId IN (" + devIds + ")";
                DataTable dt = mySqlHelper.GetDataTable(sql);

                foreach (DataRow dr in dt.Rows)
                {
                    string payload = "";
                    try
                    {
                        payload = BuildDeviceMessage(dr);
                        Publish(channels, properties, DeviceExchange, payload);
                    }
                    catch (Exception ex)
                    {
                        log.Info("大表设备同步任务数据推送失败:" + payload + "\r\n");
                        log.Error(ex.Message + "===========" + ex.StackTrace + "\r\n");
                    }
                }

                log.Info("大表设备同步任务执行结束.................\r\n");
            }
            catch (Exception ex)
            {
                log.Error("大表设备同步任务执行错误:" + ex.Message + "===========" + ex.StackTrace + "\r\n");
            }
        }

        /// <summary>
        /// Builds the "deviceAdd" JSON document for one row of bs_meterassessmentbase.
        /// </summary>
        /// <param name="dr">Row holding the columns selected by <see cref="SendZoneDevice"/>.</param>
        /// <returns>The JSON text.</returns>
        private static string BuildDeviceMessage(DataRow dr)
        {
            // Only the first comma-separated ICCID is sent; it is emitted unquoted as before,
            // and as null when absent so the document stays valid JSON.
            string iccid = dr["ICCID"].ToString().Split(',')[0];
            if (iccid.Length == 0)
            {
                iccid = "null";
            }

            StringBuilder message = new StringBuilder();
            message.Append("{\"deviceAdd\":{");
            message.Append("\"meterAssessmentName\": \"").Append(JsonEscape(dr["MeterAssessmentName"].ToString())).Append("\",");
            message.Append("\"iccId\": ").Append(iccid).Append(",");
            message.Append("\"lngAndLat\": \"").Append(JsonEscape(dr["LngAndLat"].ToString())).Append("\",");
            message.Append("\"isPressucre\": ").Append(JsonLiteral(dr["IsPressucre"])).Append(",");
            message.Append("\"isFlow\": ").Append(JsonLiteral(dr["IsFlow"])).Append(",");
            message.Append("\"isZoneMeter\": ").Append(JsonLiteral(dr["isZoneMeter"])).Append(",");
            message.Append("\"isTradeMeter\": ").Append(JsonLiteral(dr["isTradeMeter"])).Append(",");
            message.Append("\"isLargeUser\": ").Append(JsonLiteral(dr["isLargeUser"])).Append(",");
            message.Append("\"meterAssessmentCode\": \"").Append(JsonEscape(dr["MeterAssessmentCode"].ToString())).Append("\",");
            message.Append("\"manufacturerCode\": \"ht\"");
            message.Append("}}");
            return message.ToString();
        }

        /// <summary>
        /// For every meter in the progress file, publishes all history rows newer than the
        /// stored timestamp and then writes the new timestamps back to the file.
        /// </summary>
        /// <param name="channels">Open channels keyed by host.</param>
        /// <param name="properties">Message properties keyed by the same host.</param>
        private void SendZoneDeviceHis(Dictionary<string, IModel> channels, Dictionary<string, IBasicProperties> properties)
        {
            try
            {
                log.Info("大表设备历史数据同步和同DMA任务开始执行.................\r\n");
                Dictionary<string, string> uploadHis = GetDevIds();
                Dictionary<string, string> uploadHisNew = new Dictionary<string, string>();

                foreach (KeyValuePair<string, string> itemDev in uploadHis)
                {
                    string meterId = itemDev.Key;
                    string lastTime = itemDev.Value;

                    // NOTE(review): meterId and lastTime come from the local progress file and are
                    // concatenated into the statement (the id is part of the table name, so it
                    // cannot be a bound parameter); keep that file trusted.
                    string sql = "SELECT MeterAssessmentCode,GetDateTime,NetCumulativeFlow,PositiveCumulativeFlow,NegativeCumulativeFlow,InstantaneousFlow,Pressure,BatteryVoltageValue FROM bs_meterassessmentbase_" + meterId + " a WHERE a.GetDateTime > '" + lastTime + "' ORDER BY GetDateTime ASC";
                    try
                    {
                        DataTable dtMeterHis = mySqlHelper.GetDataTable(sql);
                        foreach (DataRow drMeterHis in dtMeterHis.Rows)
                        {
                            string payload = "";
                            try
                            {
                                string getDateTime = Convert.ToDateTime(drMeterHis["GetDateTime"])
                                    .ToString(TimestampFormat, CultureInfo.InvariantCulture);
                                payload = BuildHistoryMessage(drMeterHis, getDateTime);
                                Publish(channels, properties, DeviceDataExchange, payload);

                                // Advance the high-water mark only after the row went to every host.
                                // NOTE(review): a failed row is skipped for good once a later row
                                // succeeds and moves the mark past it.
                                lastTime = getDateTime;
                            }
                            catch (Exception ex)
                            {
                                log.Info("大表设备历史记录同步和同DMA任务数据推送失败:" + meterId + "," + payload + "\r\n");
                                log.Error(ex.Message + "===========" + ex.StackTrace + "\r\n");
                            }
                        }
                    }
                    catch (Exception ex)
                    {
                        log.Error(meterId + "," + ex.Message + "===========" + ex.StackTrace + "\r\n");
                    }

                    uploadHisNew[meterId] = lastTime;
                }

                SavaUploadHis(uploadHisNew);
                log.Info("大表设备历史记录同步和同DMA任务执行结束.................\r\n");
            }
            catch (Exception ex)
            {
                log.Error("大表设备历史记录同步和同DMA任务执行错误:" + ex.Message + "===========" + ex.StackTrace + "\r\n");
            }
        }

        /// <summary>
        /// Builds the JSON document for one history row. Numeric columns that are DB-null
        /// are omitted.
        /// </summary>
        /// <param name="row">History row.</param>
        /// <param name="getDateTime">Already formatted value of the row's GetDateTime column.</param>
        /// <returns>The JSON text.</returns>
        private static string BuildHistoryMessage(DataRow row, string getDateTime)
        {
            List<string> fields = new List<string>();
            fields.Add("\"meterAssessmentCode\": \"" + JsonEscape(row["MeterAssessmentCode"].ToString()) + "\"");
            fields.Add("\"manufacturerCode\": \"ht\"");
            fields.Add("\"getDateTime\": \"" + getDateTime + "\"");
            AddDecimalField(fields, row, "NetCumulativeFlow", "netCumulativeFlow");
            AddDecimalField(fields, row, "PositiveCumulativeFlow", "positiveCumulativeFlow");
            AddDecimalField(fields, row, "NegativeCumulativeFlow", "negativeCumulativeFlow");
            AddDecimalField(fields, row, "InstantaneousFlow", "instantaneousFlow");
            AddDecimalField(fields, row, "Pressure", "pressure");
            AddDecimalField(fields, row, "BatteryVoltageValue", "batteryVoltageValue");

            // Joining avoids the trailing comma that would make the document invalid JSON.
            return "{" + string.Join(",", fields) + "}";
        }

        /// <summary>
        /// Adds <c>"jsonName": value</c> to <paramref name="fields"/> unless the column is DB-null.
        /// </summary>
        private static void AddDecimalField(List<string> fields, DataRow row, string column, string jsonName)
        {
            if (row.IsNull(column))
            {
                return;
            }

            // Invariant culture guarantees '.' as decimal separator regardless of server locale.
            string number = Convert.ToDecimal(row[column]).ToString(CultureInfo.InvariantCulture);
            fields.Add("\"" + jsonName + "\": " + number);
        }

        /// <summary>
        /// Publishes <paramref name="payload"/> (UTF-8) to <paramref name="exchange"/> with an
        /// empty routing key on every channel.
        /// </summary>
        private static void Publish(Dictionary<string, IModel> channels, Dictionary<string, IBasicProperties> properties, string exchange, string payload)
        {
            byte[] body = Encoding.UTF8.GetBytes(payload); // encode once for all hosts
            foreach (KeyValuePair<string, IModel> item in channels)
            {
                item.Value.BasicPublish(exchange, "", properties[item.Key], body);
            }
        }

        /// <summary>
        /// Escapes a string for use inside a JSON string literal (quotes, backslashes and
        /// control characters).
        /// </summary>
        private static string JsonEscape(string value)
        {
            if (string.IsNullOrEmpty(value))
            {
                return "";
            }

            StringBuilder escaped = new StringBuilder(value.Length + 8);
            foreach (char c in value)
            {
                switch (c)
                {
                    case '"':
                        escaped.Append("\\\"");
                        break;
                    case '\\':
                        escaped.Append("\\\\");
                        break;
                    case '\n':
                        escaped.Append("\\n");
                        break;
                    case '\r':
                        escaped.Append("\\r");
                        break;
                    case '\t':
                        escaped.Append("\\t");
                        break;
                    default:
                        if (c < ' ')
                        {
                            escaped.Append("\\u").Append(((int)c).ToString("x4", CultureInfo.InvariantCulture));
                        }
                        else
                        {
                            escaped.Append(c);
                        }
                        break;
                }
            }
            return escaped.ToString();
        }

        /// <summary>
        /// Renders a database value as an unquoted JSON token: <c>null</c> for DB-null or empty,
        /// <c>true</c>/<c>false</c> for booleans, otherwise the invariant-culture text of the value.
        /// </summary>
        private static string JsonLiteral(object value)
        {
            if (value == null || value == DBNull.Value)
            {
                return "null";
            }
            if (value is bool)
            {
                return (bool)value ? "true" : "false";
            }

            string text = Convert.ToString(value, CultureInfo.InvariantCulture);
            return string.IsNullOrEmpty(text) ? "null" : text;
        }

        /// <summary>
        /// Reads the progress file; each non-blank line has the form <c>key,value</c>
        /// (used by the callers as meter id and last uploaded timestamp).
        /// </summary>
        /// <returns>Map from the first field to the second field of every line.</returns>
        /// <exception cref="FormatException">A non-blank line contains no comma.</exception>
        private Dictionary<string, string> GetDevIds()
        {
            Dictionary<string, string> uploadHis = new Dictionary<string, string>();
            using (StreamReader sr = new StreamReader(ProgressFile))
            {
                string line;
                while ((line = sr.ReadLine()) != null)
                {
                    if (line.Trim().Length == 0)
                    {
                        continue;
                    }

                    string[] item = line.Split(',');
                    if (item.Length < 2)
                    {
                        throw new FormatException("Malformed line in " + ProgressFile + ": " + line);
                    }
                    uploadHis[item[0]] = item[1];
                }
            }
            return uploadHis;
        }

        /// <summary>
        /// Updates (or adds) a value in the appSettings section of the executable's config file.
        /// </summary>
        /// <param name="key">Setting key.</param>
        /// <param name="value">New value.</param>
        private void UpdateAppConfig(String key, String value)
        {
            var cfg = ConfigurationManager.OpenExeConfiguration(ConfigurationUserLevel.None);
            KeyValueConfigurationElement setting = cfg.AppSettings.Settings[key];
            if (setting == null)
            {
                // The indexer returns null for an unknown key.
                cfg.AppSettings.Settings.Add(key, value);
            }
            else
            {
                setting.Value = value;
            }
            cfg.Save();
            ConfigurationManager.RefreshSection("appSettings");
        }

        /// <summary>
        /// Checks whether a user table with the given name exists (queries sysobjects through
        /// the SQL Server provider).
        /// </summary>
        /// <param name="tablename">Table name to look up.</param>
        /// <returns><c>true</c> when at least one matching row is returned.</returns>
        private bool CheckTableExist(string tablename)
        {
            // Double single quotes so the name cannot terminate the SQL string literal.
            string safeName = (tablename ?? "").Replace("'", "''");
            DataTable table = dbHelper.Fill("select top 1 * from sysobjects where name='" + safeName + "' and xtype='u'");
            return table != null && table.Rows.Count > 0;
        }

        /// <summary>
        /// Overwrites the progress file with one <c>key,value</c> line per entry.
        /// </summary>
        /// <param name="uploadHis">Entries to persist.</param>
        private void SavaUploadHis(Dictionary<string, string> uploadHis)
        {
            // append: false truncates an existing file, so no separate clearing step is needed.
            using (StreamWriter sw = new StreamWriter(ProgressFile, false))
            {
                foreach (KeyValuePair<string, string> item in uploadHis)
                {
                    sw.WriteLine(item.Key + "," + item.Value);
                }
            }
        }

        /// <summary>
        /// SQL Server data provider built from <c>Constants.DbConncetion</c>; a new provider is
        /// requested on every access.
        /// </summary>
        private static IDbProvider dbHelper
        {
            get
            {
                return DbFactoryProvider.GetProvider(CurrentDbType.SqlServer, Constants.DbConncetion);
            }
        }
    }
}