using log4net; using Quartz; using RabbitMQ.Client; using RDIFramework.Utilities; using System; using System.Collections.Generic; using System.Configuration; using System.Data; using System.IO; using System.Linq; using System.Text; namespace TimedUpload.QuartzJobs { class jingshanDataUploadDataJob : IJob { private readonly ILog log = LogManager.GetLogger(typeof(DataUploadJob)); public void Execute(IJobExecutionContext context) { string[] uploadUrls = Constants.UploadUrl.Split('|'); Dictionary connections = new Dictionary(); Dictionary channels = new Dictionary(); Dictionary properties = new Dictionary(); foreach (string uploadUrl in uploadUrls) { ConnectionFactory factory = new ConnectionFactory(); factory.HostName = uploadUrl;//主机名,Rabbit会拿这个IP生成一个endpoint,这个很熟悉吧,就是socket绑定的那个终结点。 factory.UserName = Constants.UploadUserName;//默认用户名,用户可以在服务端自定义创建,有相关命令行 factory.Password = Constants.UploadPassword;//默认密码 factory.AutomaticRecoveryEnabled = true; // 链接断开会自动重连 IConnection connection = factory.CreateConnection(); IModel channel = connection.CreateModel(); channel.QueueDeclare("zone.device", true, false, false, null);//创建一个名称为kibaqueue的消息队列 channel.QueueDeclare("zone.deviceHis", true, false, false, null);//创建一个名称为kibaqueue的消息队列 IBasicProperties property = channel.CreateBasicProperties(); property.ContentType = "text/plain"; property.DeliveryMode = 2; //持久化 connections.Add(uploadUrl, connection); properties.Add(uploadUrl, property); channels.Add(uploadUrl, channel); } if (channels.Count > 0) { SendZoneDevice(channels, properties); SendZoneDeviceHis(channels, properties); } foreach (KeyValuePair item in connections) { IConnection connection = item.Value; connection.Close(); } } /// /// 大表设备添加 /// /// /// private void SendZoneDevice(Dictionary channels, Dictionary properties) { try { String meterId = Constants.MeterId; String sql = "SELECT a.ID,a.名称,a.考核表编码,a.X坐标,a.Y坐标,b.传输协议参数,ISNULL(a.是否阀控,0) AS isValve FROM [设备信息] a left join 传输设备 b on a.传输设备ID = b.ID where a.是否启用 = '是' and a.考核表编码 is not null and a.ID > " + meterId + " order by a.ID"; DataTable dt = dbHelper.Fill(sql); if (dt == null) { log.Info("大表设备同步任务查询报错.................\r\n"); return; } if (dt.Rows.Count == 0) { log.Info("大表设备同步任务,没有需要同步的设备.................\r\n"); return; } log.Info("大表设备同步任务开始执行.................\r\n"); StringBuilder message = new StringBuilder(); for (int i = 0; i < dt.Rows.Count; i++) { message.Clear(); try { DataRow dr = dt.Rows[i]; String iccid = ""; if (!"".Equals(dr["传输协议参数"].ToString())) { iccid = dr["传输协议参数"].ToString().Split(',')[0]; } String lngAndLat = ""; if (!"".Equals(dr["X坐标"].ToString()) && !"".Equals(dr["Y坐标"].ToString())) { lngAndLat = dr["X坐标"].ToString() + "|" + dr["Y坐标"].ToString(); } message.Append("{"); message.Append("\"meterAssessmentName\": \"").Append(dr["名称"]).Append("\","); message.Append("\"iccId\": ").Append(iccid).Append(","); //message.Append("\"areaId\": 22,"); message.Append("\"lngAndLat\": \"").Append(lngAndLat).Append("\","); //message.Append("\"pipeCailber\": \"DN32\","); //message.Append("\"pipeTexture\": \"PVC\","); //message.Append("\"imei\": \"77564212\","); message.Append("\"isPressucre\": 1,"); message.Append("\"isFlow\": 1,"); message.Append("\"isZoneMeter\": 1,"); message.Append("\"isTradeMeter\": 0,"); message.Append("\"isLargeUser\": 0,"); message.Append("\"isLargeUser\": 0,"); message.Append("\"isValve\":").Append(dr["isValve"].ToString()).Append(","); message.Append("\"meterAssessmentCode\": \"").Append(dr["考核表编码"]).Append("\","); message.Append("\"manufacturerCode\": \"").Append(Constants.ManufacturerCode).Append("\","); message.Append("\"meterTypeId\": \"2\""); message.Append("}"); foreach (KeyValuePair item in channels) { string key = item.Key; IModel channel = item.Value; IBasicProperties property = properties[key]; channel.BasicPublish("zone.device", "", property, Encoding.UTF8.GetBytes(message.ToString())); //生产消息 } meterId = dr["ID"].ToString(); } catch (Exception ex) { log.Info("大表设备同步任务数据推送失败:" + message.ToString() + "\r\n"); log.Error(ex.Message + "===========" + ex.StackTrace + "\r\n"); } } UpdateAppConfig("MeterId", meterId); log.Info("大表设备同步任务执行结束.................\r\n"); } catch (Exception ex) { log.Error("大表设备同步任务执行错误:" + ex.Message + "===========" + ex.StackTrace + "\r\n"); } } /// /// 大表历史数据 /// /// /// private void SendZoneDeviceHis(Dictionary channels, Dictionary properties) { try { log.Info("大表设备历史数据同步任务开始执行.................\r\n"); String sqlMeter = "SELECT ID,考核表编码,ISNULL(是否阀控,0) AS isValve FROM [设备信息] where 是否启用 = '是' and 考核表编码 is not null order by ID"; DataTable dtMeter = dbHelper.Fill(sqlMeter); //Dictionary arguments = new Dictionary(); //arguments["x-max-length-bytes"] = 2147383648; //arguments["x-overflow"] = "reject-publish"; Dictionary uploadHis = new Dictionary(); using (StreamReader sr = new StreamReader(@"TextFile1.txt")) { String line = ""; while ((line = sr.ReadLine()) != null) { if (!"".Equals(line)) { String[] item = line.Split(','); uploadHis[item[0]] = item[1]; } } } for (int i = 0; i < dtMeter.Rows.Count; i++) { DataRow drMeter = dtMeter.Rows[i]; String meterId = drMeter["ID"].ToString(); String meterCode = drMeter["考核表编码"].ToString(); String meterCodeTemp = meterCode; String lastTime = ""; int isValve = Convert.ToInt32(drMeter["isValve"]); int nowYear = DateTime.Now.Year; int lastYear = uploadHis.ContainsKey(meterCode) ? Convert.ToDateTime(uploadHis[meterCode]).Year : nowYear; for (int k = lastYear; k <= nowYear; k++) { string tablename = "历史记录_" + ("000000" + meterId).Substring(meterId.Length, 6) + "_" + k; // 判断历史记录表是否存在 if (!CheckTableExist(tablename)) { continue; } String sqlMeterHis = "select 记录时间,采集时间,正累计流量,负累计流量,净累计流量,瞬时流量,电池电压,压力,设备状态,通讯状态"; if (isValve == 1) {// 阀控的情况 sqlMeterHis += ",开度,运行模式,手动开度设定值,压力量程设定,常用压力设定上限,步进间隔,常用压力设定下限,步进刻度"; } if(isValve == 2) { // 控阀控泵的情况 sqlMeterHis += ",[运行模式],[阀1开到位延时],[阀2开到位延时],[阀3开到位延时],[真空泵开启时间],[排气间隔],[控制方式],[真空泵故障],[阀1状态],[阀2状态],[阀3状态]"; } if (isValve == 3) { sqlMeterHis += ",阀门状态"; } sqlMeterHis +=" from " + tablename; if (uploadHis.ContainsKey(meterCode)) // 添加是否在线的判断 { sqlMeterHis += " where 设备状态 = '全部正常' AND 通讯状态 = '全部正常' AND 采集时间 > '" + uploadHis[meterCode] + "'"; // sqlMeterHis += " where 采集时间 > '" + uploadHis[meterCode] + "'"; } else { // 添加是否在线的判断 sqlMeterHis += " WHERE 设备状态 = '全部正常' AND 通讯状态 = '全部正常'"; } sqlMeterHis += " order by 采集时间"; DataTable dtMeterHis = dbHelper.Fill(sqlMeterHis); StringBuilder message = new StringBuilder(); for (int j = 0; j < dtMeterHis.Rows.Count; j++) { message.Clear(); try { DataRow drMeterHis = dtMeterHis.Rows[j]; String getDateTime = Convert.ToDateTime(drMeterHis["采集时间"]).ToString("yyyy-MM-dd HH:mm:ss"); if (!(meterId == "625" || meterId == "633")) { message.Append("{"); message.Append("\"meterAssessmentCode\": \"").Append(meterCode).Append("\","); message.Append("\"manufacturerCode\": ").Append(Constants.ManufacturerCode).Append(","); message.Append("\"getDateTime\": \"").Append(getDateTime).Append("\","); if (isValve == 1) { if (Convert.DBNull != drMeterHis["开度"]) { message.Append("\"valveOpening\": ").Append(Convert.ToDecimal(drMeterHis["开度"])).Append(","); } if (Convert.DBNull != drMeterHis["运行模式"]) { message.Append("\"operaOption\": ").Append(Convert.ToDecimal(drMeterHis["运行模式"])).Append(","); } if (Convert.DBNull != drMeterHis["手动开度设定值"]) { message.Append("\"autoOpingVal\": ").Append(Convert.ToDecimal(drMeterHis["手动开度设定值"])).Append(","); } if (Convert.DBNull != drMeterHis["压力量程设定"]) { message.Append("\"pressureRange\": ").Append(Convert.ToDecimal(drMeterHis["压力量程设定"])).Append(","); } if (Convert.DBNull != drMeterHis["常用压力设定上限"]) { message.Append("\"comPressureVal\": ").Append(Convert.ToDecimal(drMeterHis["常用压力设定上限"])).Append(","); } if (Convert.DBNull != drMeterHis["步进间隔"]) { message.Append("\"changeTime\": ").Append(Convert.ToDecimal(drMeterHis["步进间隔"])).Append(","); } if (Convert.DBNull != drMeterHis["常用压力设定下限"]) { message.Append("\"comPressureValDown\": ").Append(Convert.ToDecimal(drMeterHis["常用压力设定下限"])).Append(","); } if (Convert.DBNull != drMeterHis["步进刻度"]) { message.Append("\"stepInterval\": ").Append(Convert.ToDecimal(drMeterHis["步进刻度"])).Append(","); } } if(isValve == 2) { // 控阀控泵 if (Convert.DBNull != drMeterHis["运行模式"]) { message.Append("\"OperaOption\": ").Append(drMeterHis["运行模式"]).Append(","); } if (Convert.DBNull != drMeterHis["阀1开到位延时"]) { message.Append("\"valveOneOpenDelayed\": ").Append(Convert.ToDecimal(drMeterHis["阀1开到位延时"])).Append(","); } if (Convert.DBNull != drMeterHis["阀2开到位延时"]) { message.Append("\"valveTwoOpenDelayed\": ").Append(Convert.ToDecimal(drMeterHis["阀2开到位延时"])).Append(","); } if (Convert.DBNull != drMeterHis["阀3开到位延时"]) { message.Append("\"valveThreeOpenDelayed\": ").Append(Convert.ToDecimal(drMeterHis["阀3开到位延时"])).Append(","); } if (Convert.DBNull != drMeterHis["真空泵开启时间"]) { message.Append("\"vacuumPumpOpenTime\": ").Append(Convert.ToDecimal(drMeterHis["真空泵开启时间"])).Append(","); } if (Convert.DBNull != drMeterHis["排气间隔"]) { message.Append("\"exhaustTime\": ").Append(Convert.ToDecimal(drMeterHis["排气间隔"])).Append(","); } if (Convert.DBNull != drMeterHis["控制方式"]) { message.Append("\"runModule\": \"").Append(drMeterHis["控制方式"]).Append("\","); } if (Convert.DBNull != drMeterHis["真空泵故障"]) { message.Append("\"vacuumPumpFault\": \"").Append(drMeterHis["真空泵故障"]).Append("\","); } if (Convert.DBNull != drMeterHis["阀1状态"]) { message.Append("\"valveOneState\": \"").Append(drMeterHis["阀1状态"]).Append("\","); } if (Convert.DBNull != drMeterHis["阀2状态"]) { message.Append("\"valveTwoState\": \"").Append(drMeterHis["阀2状态"]).Append("\","); } if (Convert.DBNull != drMeterHis["阀3状态"]) { message.Append("\"valveThreeState\": \"").Append(drMeterHis["阀3状态"]).Append("\""); } } if (isValve == 3)// 带阀控的大表 { if (Convert.DBNull != drMeterHis["阀门状态"]) { message.Append("\"valveOneState\": \"").Append(drMeterHis["阀门状态"]).Append("\","); } } if (Convert.DBNull != drMeterHis["净累计流量"]) { message.Append("\"netCumulativeFlow\": ").Append(Convert.ToDecimal(drMeterHis["净累计流量"])).Append(","); } if (Convert.DBNull != drMeterHis["正累计流量"]) { message.Append("\"positiveCumulativeFlow\": ").Append(Convert.ToDecimal(drMeterHis["正累计流量"])).Append(","); } if (Convert.DBNull != drMeterHis["负累计流量"]) { message.Append("\"negativeCumulativeFlow\": ").Append(Convert.ToDecimal(drMeterHis["负累计流量"])).Append(","); } if (Convert.DBNull != drMeterHis["瞬时流量"]) { message.Append("\"instantaneousFlow\": ").Append(Convert.ToDecimal(drMeterHis["瞬时流量"])).Append(","); } if (Convert.DBNull != drMeterHis["压力"]) { message.Append("\"pressure\": ").Append(Convert.ToDecimal(drMeterHis["压力"])).Append(","); } if (Convert.DBNull != drMeterHis["电池电压"]) { message.Append("\"batteryVoltageValue\": ").Append(Convert.ToDecimal(drMeterHis["电池电压"])); } //int isOnline = 0; // 0 不在线 1 在线 //if(drMeterHis["设备状态"] == "全部正常" && drMeterHis["通讯状态"] == "全部正常") //{ // isOnline = 1; //} //message.Append("\"isOnline\": ").Append(isOnline).Append(""); message.Append("}"); } else { if(meterId == "625") { message.Append("{"); message.Append("\"meterAssessmentCode\": \"").Append(meterCode).Append("\","); message.Append("\"manufacturerCode\": ").Append(Constants.ManufacturerCode).Append(","); message.Append("\"getDateTime\": \"").Append(getDateTime).Append("\","); if (Convert.DBNull != drMeterHis["压力"]) { message.Append("\"pressure\": ").Append(Convert.ToDecimal(drMeterHis["压力"])).Append(","); } if (Convert.DBNull != drMeterHis["电池电压"]) { message.Append("\"batteryVoltageValue\": ").Append(Convert.ToDecimal(drMeterHis["电池电压"])); } message.Append("}"); String str = message.ToString(); foreach (KeyValuePair item in channels) { string key = item.Key; IModel channel = item.Value; IBasicProperties property = properties[key]; channel.BasicPublish("zone.deviceHis", "", property, Encoding.UTF8.GetBytes(str)); //生产消息 } message.Clear(); if(meterId == "625") { meterCode = "jingshan015"; } else { continue; } message.Append("{"); message.Append("\"meterAssessmentCode\": \"").Append(meterCode).Append("\","); message.Append("\"manufacturerCode\": ").Append(Constants.ManufacturerCode).Append(","); message.Append("\"getDateTime\": \"").Append(getDateTime).Append("\","); if (Convert.DBNull != drMeterHis["净累计流量"]) { message.Append("\"netCumulativeFlow\": ").Append(Convert.ToDecimal(drMeterHis["净累计流量"])).Append(","); } if (Convert.DBNull != drMeterHis["正累计流量"]) { message.Append("\"positiveCumulativeFlow\": ").Append(Convert.ToDecimal(drMeterHis["正累计流量"])).Append(","); } if (Convert.DBNull != drMeterHis["负累计流量"]) { message.Append("\"negativeCumulativeFlow\": ").Append(Convert.ToDecimal(drMeterHis["负累计流量"])).Append(","); } if (Convert.DBNull != drMeterHis["瞬时流量"]) { message.Append("\"instantaneousFlow\": ").Append(Convert.ToDecimal(drMeterHis["瞬时流量"])).Append(","); } if (Convert.DBNull != drMeterHis["电池电压"]) { message.Append("\"batteryVoltageValue\": ").Append(Convert.ToDecimal(drMeterHis["电池电压"])); } message.Append("}"); } else if(meterId == "633") { continue; } } foreach (KeyValuePair item in channels) { string key = item.Key; IModel channel = item.Value; IBasicProperties property = properties[key]; channel.BasicPublish("zone.deviceHis", "", property, Encoding.UTF8.GetBytes(message.ToString())); //生产消息 } lastTime = getDateTime; meterCode = meterCodeTemp; } catch (Exception ex) { log.Info("大表设备历史记录同步任务数据推送失败:" + message.ToString() + "\r\n"); log.Error(ex.Message + "===========" + ex.StackTrace + "\r\n"); } } } if (!"".Equals(lastTime)) { uploadHis[meterCode] = lastTime; uploadHis[meterCodeTemp] = lastTime; } } SavaUploadHis(uploadHis); log.Info("大表设备历史记录同步任务执行结束.................\r\n"); } catch (Exception ex) { log.Error("大表设备历史记录同步任务执行错误:" + ex.Message + "===========" + ex.StackTrace + "\r\n"); } } /// /// 更新配置文件中的值 /// /// 键 /// 值 private void UpdateAppConfig(String key, String value) { var cfg = ConfigurationManager.OpenExeConfiguration(ConfigurationUserLevel.None); cfg.AppSettings.Settings[key].Value = value; cfg.Save(); ConfigurationManager.RefreshSection("appSettings"); } /// /// 判断历史记录表是否存在 /// /// /// private bool CheckTableExist(string tablename) { DataTable table = dbHelper.Fill("select top 1 * from sysobjects where name='" + tablename + "' and xtype='u'"); if (table == null || table.Rows.Count == 0) { return false; } return true; } /// /// 保存每块块表的上传最后一条历史记录 /// /// private void SavaUploadHis(Dictionary uploadHis) { // 清除之前的内容 FileStream stream = File.Open(@"TextFile1.txt", FileMode.OpenOrCreate, FileAccess.Write); stream.Seek(0, SeekOrigin.Begin); stream.SetLength(0); stream.Close(); using (StreamWriter sw = new StreamWriter(@"TextFile1.txt")) { foreach (var item in uploadHis) { sw.WriteLine(item.Key + "," + item.Value); } } } static IDbProvider dbHelper { get { var DbDefine = DbFactoryProvider.GetProvider(CurrentDbType.SqlServer, Constants.DbConncetion); return DbDefine; } } } }