123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515 |
- using log4net;
- using Quartz;
- using RabbitMQ.Client;
- using RDIFramework.Utilities;
- using System;
- using System.Collections.Generic;
- using System.Configuration;
- using System.Data;
- using System.IO;
- using System.Linq;
- using System.Text;
- namespace TimedUpload.QuartzJobs
- {
- class jingshanDataUploadDataJob : IJob
- {
- private readonly ILog log = LogManager.GetLogger(typeof(DataUploadJob));
/// <summary>
/// Quartz entry point. Opens one RabbitMQ connection and channel per configured upload host,
/// publishes the device and device-history messages to every host, and always closes the
/// connections it opened.
/// </summary>
/// <param name="context">Quartz execution context (not used by this job).</param>
public void Execute(IJobExecutionContext context)
{
    Dictionary<string, IConnection> connections = new Dictionary<string, IConnection>();
    Dictionary<string, IModel> channels = new Dictionary<string, IModel>();
    Dictionary<string, IBasicProperties> properties = new Dictionary<string, IBasicProperties>();
    try
    {
        // Constants.UploadUrl holds the broker hosts separated by '|'.
        foreach (string uploadUrl in Constants.UploadUrl.Split('|'))
        {
            // A blank or repeated entry would either fail to connect or make
            // Dictionary.Add throw on the duplicate key, so skip it.
            if (string.IsNullOrEmpty(uploadUrl) || connections.ContainsKey(uploadUrl))
            {
                continue;
            }

            ConnectionFactory factory = new ConnectionFactory();
            factory.HostName = uploadUrl;                   // broker host the endpoint is built from
            factory.UserName = Constants.UploadUserName;
            factory.Password = Constants.UploadPassword;
            factory.AutomaticRecoveryEnabled = true;        // reconnect automatically when the link drops

            IConnection connection = factory.CreateConnection();
            // Register the connection right away so the finally block closes it
            // even when channel or queue setup below throws.
            connections.Add(uploadUrl, connection);

            IModel channel = connection.CreateModel();
            // Arguments: durable = true, exclusive = false, autoDelete = false, no extra arguments.
            channel.QueueDeclare("zone.device", true, false, false, null);
            channel.QueueDeclare("zone.deviceHis", true, false, false, null);

            IBasicProperties property = channel.CreateBasicProperties();
            property.ContentType = "text/plain";
            property.DeliveryMode = 2; // persistent delivery

            properties.Add(uploadUrl, property);
            channels.Add(uploadUrl, channel);
        }

        if (channels.Count > 0)
        {
            SendZoneDevice(channels, properties);
            SendZoneDeviceHis(channels, properties);
        }
    }
    finally
    {
        // Close every connection that was opened, including on failure paths;
        // one failing Close must not prevent the others from being closed.
        foreach (IConnection connection in connections.Values)
        {
            try
            {
                connection.Close();
            }
            catch (Exception ex)
            {
                log.Error("关闭RabbitMQ连接失败:" + ex.Message + "===========" + ex.StackTrace + "\r\n");
            }
        }
    }
}
/// <summary>
/// Publishes newly added zone-meter devices to the "zone.device" exchange argument of every channel.
/// </summary>
/// <remarks>
/// Devices are selected with an ID greater than <c>Constants.MeterId</c>. After the run, the ID of
/// the last device that was published successfully is written back to the "MeterId" app setting.
/// A device whose publish fails is logged and skipped; later successes still advance the stored ID.
/// </remarks>
/// <param name="channels">Open channels keyed by upload host.</param>
/// <param name="properties">Message properties keyed by the same upload host as <paramref name="channels"/>.</param>
private void SendZoneDevice(Dictionary<string, IModel> channels, Dictionary<string, IBasicProperties> properties)
{
    try
    {
        String meterId = Constants.MeterId;

        // The configured value is spliced into the SQL text below, so accept it only
        // when it is a plain integer.
        long lastSyncedId;
        if (!long.TryParse(meterId, System.Globalization.NumberStyles.Integer, System.Globalization.CultureInfo.InvariantCulture, out lastSyncedId))
        {
            log.Error("大表设备同步任务执行错误:MeterId配置不是有效的数字:" + meterId + "\r\n");
            return;
        }

        String sql = "SELECT a.ID,a.名称,a.考核表编码,a.X坐标,a.Y坐标,b.传输协议参数,ISNULL(a.是否阀控,0) AS isValve FROM [设备信息] a left join 传输设备 b on a.传输设备ID = b.ID where a.是否启用 = '是' and a.考核表编码 is not null and a.ID > " + lastSyncedId.ToString(System.Globalization.CultureInfo.InvariantCulture) + " order by a.ID";
        DataTable dt = dbHelper.Fill(sql);
        if (dt == null)
        {
            log.Info("大表设备同步任务查询报错.................\r\n");
            return;
        }
        if (dt.Rows.Count == 0)
        {
            log.Info("大表设备同步任务,没有需要同步的设备.................\r\n");
            return;
        }

        log.Info("大表设备同步任务开始执行.................\r\n");
        StringBuilder message = new StringBuilder();
        for (int i = 0; i < dt.Rows.Count; i++)
        {
            message.Clear();
            try
            {
                DataRow dr = dt.Rows[i];

                // The ICCID is the first comma-separated item of the transport protocol parameters.
                String iccid = "";
                String protocolArgs = dr["传输协议参数"].ToString();
                if (protocolArgs.Length > 0)
                {
                    iccid = protocolArgs.Split(',')[0];
                }

                // Coordinates are sent as "X|Y", and only when both are present.
                String lngAndLat = "";
                String x = dr["X坐标"].ToString();
                String y = dr["Y坐标"].ToString();
                if (x.Length > 0 && y.Length > 0)
                {
                    lngAndLat = x + "|" + y;
                }

                message.Append("{");
                message.Append("\"meterAssessmentName\": \"").Append(EscapeDeviceJsonText(dr["名称"])).Append("\",");
                message.Append("\"iccId\": ").Append(FormatDeviceIccId(iccid)).Append(",");
                message.Append("\"lngAndLat\": \"").Append(EscapeDeviceJsonText(lngAndLat)).Append("\",");
                message.Append("\"isPressucre\": 1,");
                message.Append("\"isFlow\": 1,");
                message.Append("\"isZoneMeter\": 1,");
                message.Append("\"isTradeMeter\": 0,");
                message.Append("\"isLargeUser\": 0,"); // was emitted twice; a JSON object must not repeat a key
                message.Append("\"isValve\":").Append(dr["isValve"].ToString()).Append(",");
                message.Append("\"meterAssessmentCode\": \"").Append(EscapeDeviceJsonText(dr["考核表编码"])).Append("\",");
                message.Append("\"manufacturerCode\": \"").Append(EscapeDeviceJsonText(Constants.ManufacturerCode)).Append("\",");
                message.Append("\"meterTypeId\": \"2\"");
                message.Append("}");

                // Encode once and publish the same payload on every channel.
                byte[] body = Encoding.UTF8.GetBytes(message.ToString());
                foreach (KeyValuePair<string, IModel> item in channels)
                {
                    item.Value.BasicPublish("zone.device", "", properties[item.Key], body);
                }

                // Advance the checkpoint only after the device was published everywhere.
                meterId = dr["ID"].ToString();
            }
            catch (Exception ex)
            {
                log.Info("大表设备同步任务数据推送失败:" + message.ToString() + "\r\n");
                log.Error(ex.Message + "===========" + ex.StackTrace + "\r\n");
            }
        }

        UpdateAppConfig("MeterId", meterId);
        log.Info("大表设备同步任务执行结束.................\r\n");
    }
    catch (Exception ex)
    {
        log.Error("大表设备同步任务执行错误:" + ex.Message + "===========" + ex.StackTrace + "\r\n");
    }
}

/// <summary>
/// Renders the ICCID as a JSON value: <c>null</c> when it is missing, the bare digits when it is
/// purely numeric (the format that was sent before), otherwise a quoted, escaped string.
/// </summary>
private static string FormatDeviceIccId(string iccid)
{
    string trimmed = iccid == null ? "" : iccid.Trim();
    if (trimmed.Length == 0)
    {
        // Previously produced `"iccId": ,`, which is not valid JSON.
        return "null";
    }
    foreach (char c in trimmed)
    {
        if (c < '0' || c > '9')
        {
            return "\"" + EscapeDeviceJsonText(iccid) + "\"";
        }
    }
    return iccid;
}

/// <summary>
/// Converts a value to text and escapes it for use inside a double-quoted JSON string.
/// </summary>
private static string EscapeDeviceJsonText(object value)
{
    string text = Convert.ToString(value);
    if (string.IsNullOrEmpty(text))
    {
        return "";
    }
    StringBuilder escaped = new StringBuilder(text.Length + 8);
    foreach (char c in text)
    {
        switch (c)
        {
            case '"': escaped.Append("\\\""); break;
            case '\\': escaped.Append("\\\\"); break;
            case '\n': escaped.Append("\\n"); break;
            case '\r': escaped.Append("\\r"); break;
            case '\t': escaped.Append("\\t"); break;
            default:
                if (c < ' ')
                {
                    escaped.Append("\\u").Append(((int)c).ToString("x4"));
                }
                else
                {
                    escaped.Append(c);
                }
                break;
        }
    }
    return escaped.ToString();
}
/// <summary>
/// Publishes zone-meter history records to "zone.deviceHis" on every channel.
/// </summary>
/// <remarks>
/// For each enabled device the per-year history tables ("历史记录_&lt;6-digit id&gt;_&lt;year&gt;") are read
/// from the year of the device's last uploaded record up to the current year. The timestamp of
/// the last record published per device is kept in TextFile1.txt (one "code,timestamp" line per
/// device) and is saved again at the end of the run. A failure on one device is logged and does
/// not stop the remaining devices or discard the progress already made.
/// </remarks>
/// <param name="channels">Open channels keyed by upload host.</param>
/// <param name="properties">Message properties keyed by the same upload host as <paramref name="channels"/>.</param>
private void SendZoneDeviceHis(Dictionary<string, IModel> channels, Dictionary<string, IBasicProperties> properties)
{
    try
    {
        log.Info("大表设备历史数据同步任务开始执行.................\r\n");
        String sqlMeter = "SELECT ID,考核表编码,ISNULL(是否阀控,0) AS isValve FROM [设备信息] where 是否启用 = '是' and 考核表编码 is not null order by ID";
        DataTable dtMeter = dbHelper.Fill(sqlMeter);
        if (dtMeter == null)
        {
            log.Error("大表设备历史记录同步任务执行错误:设备信息查询失败\r\n");
            return;
        }

        Dictionary<String, String> uploadHis = LoadUploadHis();
        for (int i = 0; i < dtMeter.Rows.Count; i++)
        {
            DataRow drMeter = dtMeter.Rows[i];
            try
            {
                UploadMeterHis(drMeter, uploadHis, channels, properties);
            }
            catch (Exception ex)
            {
                // Keep going: one broken device must not lose the checkpoints of the others.
                log.Error("大表设备历史记录同步任务执行错误(设备ID:" + drMeter["ID"] + "):" + ex.Message + "===========" + ex.StackTrace + "\r\n");
            }
        }

        SavaUploadHis(uploadHis);
        log.Info("大表设备历史记录同步任务执行结束.................\r\n");
    }
    catch (Exception ex)
    {
        log.Error("大表设备历史记录同步任务执行错误:" + ex.Message + "===========" + ex.StackTrace + "\r\n");
    }
}

/// <summary>
/// Reads the per-device upload checkpoints from TextFile1.txt ("code,timestamp" per line).
/// A missing file yields an empty map; blank or malformed lines are skipped.
/// </summary>
private Dictionary<String, String> LoadUploadHis()
{
    Dictionary<String, String> uploadHis = new Dictionary<string, string>();
    if (!File.Exists(@"TextFile1.txt"))
    {
        return uploadHis;
    }
    using (StreamReader sr = new StreamReader(@"TextFile1.txt"))
    {
        String line;
        while ((line = sr.ReadLine()) != null)
        {
            if ("".Equals(line))
            {
                continue;
            }
            String[] item = line.Split(',');
            if (item.Length < 2)
            {
                log.Info("大表设备历史记录同步任务,忽略无效的上传记录行:" + line + "\r\n");
                continue;
            }
            uploadHis[item[0]] = item[1];
        }
    }
    return uploadHis;
}

/// <summary>
/// Uploads the pending history rows of one device and records its new checkpoint in
/// <paramref name="uploadHis"/> when at least one row was published.
/// </summary>
private void UploadMeterHis(DataRow drMeter, Dictionary<String, String> uploadHis, Dictionary<string, IModel> channels, Dictionary<string, IBasicProperties> properties)
{
    String meterId = drMeter["ID"].ToString();
    if (meterId == "633")
    {
        // Rows of device 633 were always skipped without publishing, so there is nothing to query.
        return;
    }

    String meterCode = drMeter["考核表编码"].ToString();
    int isValve = Convert.ToInt32(drMeter["isValve"]);

    int nowYear = DateTime.Now.Year;
    int lastYear = nowYear;
    String checkpoint;
    bool hasCheckpoint = uploadHis.TryGetValue(meterCode, out checkpoint);
    if (hasCheckpoint)
    {
        // Parse and re-format the stored text so only a well-formed timestamp reaches the SQL.
        DateTime checkpointTime = Convert.ToDateTime(checkpoint);
        lastYear = checkpointTime.Year;
        checkpoint = checkpointTime.ToString("yyyy-MM-dd HH:mm:ss", System.Globalization.CultureInfo.InvariantCulture);
    }

    String lastTime = "";
    for (int year = lastYear; year <= nowYear; year++)
    {
        // Table name carries the device ID left-padded with zeros to six characters.
        string tablename = "历史记录_" + ("000000" + meterId).Substring(meterId.Length, 6) + "_" + year;
        if (!CheckTableExist(tablename))
        {
            continue;
        }

        DataTable dtMeterHis = dbHelper.Fill(BuildHisQuery(tablename, isValve, hasCheckpoint ? checkpoint : null));
        if (dtMeterHis == null)
        {
            // Stop here so the checkpoint cannot move past the rows of the year that failed.
            log.Error("大表设备历史记录同步任务执行错误:历史记录查询失败:" + tablename + "\r\n");
            break;
        }

        foreach (DataRow drMeterHis in dtMeterHis.Rows)
        {
            String currentJson = "";
            try
            {
                String getDateTime = Convert.ToDateTime(drMeterHis["采集时间"]).ToString("yyyy-MM-dd HH:mm:ss", System.Globalization.CultureInfo.InvariantCulture);
                foreach (String json in BuildHisMessages(meterId, meterCode, isValve, drMeterHis, getDateTime))
                {
                    currentJson = json;
                    PublishHisMessage(channels, properties, json);
                }
                lastTime = getDateTime;
            }
            catch (Exception ex)
            {
                log.Info("大表设备历史记录同步任务数据推送失败:" + currentJson + "\r\n");
                log.Error(ex.Message + "===========" + ex.StackTrace + "\r\n");
            }
        }
    }

    if (!"".Equals(lastTime))
    {
        // Always keyed by the device's own code, never by the substitute code used for device 625.
        uploadHis[meterCode] = lastTime;
    }
}

/// <summary>
/// Builds the SELECT for one history table: base columns, the extra columns of the valve type,
/// the "all normal" status filter and, when given, the lower bound on the collection time.
/// </summary>
private static String BuildHisQuery(string tablename, int isValve, String checkpoint)
{
    String sqlMeterHis = "select 记录时间,采集时间,正累计流量,负累计流量,净累计流量,瞬时流量,电池电压,压力,设备状态,通讯状态";
    if (isValve == 1)
    {
        // Valve-controlled device.
        sqlMeterHis += ",开度,运行模式,手动开度设定值,压力量程设定,常用压力设定上限,步进间隔,常用压力设定下限,步进刻度";
    }
    if (isValve == 2)
    {
        // Valve-and-pump-controlled device.
        sqlMeterHis += ",[运行模式],[阀1开到位延时],[阀2开到位延时],[阀3开到位延时],[真空泵开启时间],[排气间隔],[控制方式],[真空泵故障],[阀1状态],[阀2状态],[阀3状态]";
    }
    sqlMeterHis += " from " + tablename;
    if (checkpoint != null)
    {
        sqlMeterHis += " where 设备状态 = '全部正常' AND 通讯状态 = '全部正常' AND 采集时间 > '" + checkpoint + "'";
    }
    else
    {
        sqlMeterHis += " WHERE 设备状态 = '全部正常' AND 通讯状态 = '全部正常'";
    }
    sqlMeterHis += " order by 采集时间";
    return sqlMeterHis;
}

/// <summary>
/// Builds the JSON message(s) for one history row. Device 625 is split into a pressure message
/// under its own code and a flow message under the code "jingshan015"; every other device
/// yields a single message.
/// </summary>
private static List<String> BuildHisMessages(String meterId, String meterCode, int isValve, DataRow row, String getDateTime)
{
    List<String> messages = new List<String>();

    if (meterId == "625")
    {
        List<String> pressurePart = NewHisFields(meterCode, getDateTime);
        AddHisNumber(pressurePart, "pressure", row, "压力");
        AddHisNumber(pressurePart, "batteryVoltageValue", row, "电池电压");
        messages.Add(CloseHisFields(pressurePart));

        List<String> flowPart = NewHisFields("jingshan015", getDateTime);
        AddHisNumber(flowPart, "netCumulativeFlow", row, "净累计流量");
        AddHisNumber(flowPart, "positiveCumulativeFlow", row, "正累计流量");
        AddHisNumber(flowPart, "negativeCumulativeFlow", row, "负累计流量");
        AddHisNumber(flowPart, "instantaneousFlow", row, "瞬时流量");
        AddHisNumber(flowPart, "batteryVoltageValue", row, "电池电压");
        messages.Add(CloseHisFields(flowPart));
        return messages;
    }

    List<String> fields = NewHisFields(meterCode, getDateTime);
    if (isValve == 1)
    {
        AddHisNumber(fields, "valveOpening", row, "开度");
        AddHisNumber(fields, "operaOption", row, "运行模式");
        AddHisNumber(fields, "autoOpingVal", row, "手动开度设定值");
        AddHisNumber(fields, "pressureRange", row, "压力量程设定");
        AddHisNumber(fields, "comPressureVal", row, "常用压力设定上限");
        AddHisNumber(fields, "changeTime", row, "步进间隔");
        AddHisNumber(fields, "comPressureValDown", row, "常用压力设定下限");
        AddHisNumber(fields, "stepInterval", row, "步进刻度");
    }
    if (isValve == 2)
    {
        if (!row.IsNull("运行模式"))
        {
            // Sent as the raw column text, unquoted, under a capitalised key (kept as before).
            fields.Add("\"OperaOption\": " + Convert.ToString(row["运行模式"], System.Globalization.CultureInfo.InvariantCulture));
        }
        AddHisNumber(fields, "valveOneOpenDelayed", row, "阀1开到位延时");
        AddHisNumber(fields, "valveTwoOpenDelayed", row, "阀2开到位延时");
        AddHisNumber(fields, "valveThreeOpenDelayed", row, "阀3开到位延时");
        AddHisNumber(fields, "vacuumPumpOpenTime", row, "真空泵开启时间");
        AddHisNumber(fields, "exhaustTime", row, "排气间隔");
        AddHisText(fields, "runModule", row, "控制方式");
        AddHisText(fields, "vacuumPumpFault", row, "真空泵故障");
        AddHisText(fields, "valveOneState", row, "阀1状态");
        AddHisText(fields, "valveTwoState", row, "阀2状态");
        AddHisText(fields, "valveThreeState", row, "阀3状态");
    }
    AddHisNumber(fields, "netCumulativeFlow", row, "净累计流量");
    AddHisNumber(fields, "positiveCumulativeFlow", row, "正累计流量");
    AddHisNumber(fields, "negativeCumulativeFlow", row, "负累计流量");
    AddHisNumber(fields, "instantaneousFlow", row, "瞬时流量");
    AddHisNumber(fields, "pressure", row, "压力");
    AddHisNumber(fields, "batteryVoltageValue", row, "电池电压");
    messages.Add(CloseHisFields(fields));
    return messages;
}

/// <summary>
/// Starts a field list with the three members every history message carries.
/// </summary>
private static List<String> NewHisFields(String meterCode, String getDateTime)
{
    List<String> fields = new List<String>();
    fields.Add("\"meterAssessmentCode\": \"" + EscapeHisJsonText(meterCode) + "\"");
    // NOTE(review): sent unquoted here but quoted in the device message — confirm which the consumer expects.
    fields.Add("\"manufacturerCode\": " + Constants.ManufacturerCode);
    fields.Add("\"getDateTime\": \"" + getDateTime + "\"");
    return fields;
}

/// <summary>
/// Joins the collected fields into one JSON object. Joining (instead of appending a comma after
/// each field) guarantees there is never a missing or trailing comma.
/// </summary>
private static String CloseHisFields(List<String> fields)
{
    return "{" + string.Join(",", fields.ToArray()) + "}";
}

/// <summary>
/// Adds a numeric member when the column is not NULL, formatted culture-independently.
/// </summary>
private static void AddHisNumber(List<String> fields, String name, DataRow row, String column)
{
    if (row.IsNull(column))
    {
        return;
    }
    decimal number = Convert.ToDecimal(row[column]);
    fields.Add("\"" + name + "\": " + number.ToString(System.Globalization.CultureInfo.InvariantCulture));
}

/// <summary>
/// Adds a quoted string member when the column is not NULL.
/// </summary>
private static void AddHisText(List<String> fields, String name, DataRow row, String column)
{
    if (row.IsNull(column))
    {
        return;
    }
    fields.Add("\"" + name + "\": \"" + EscapeHisJsonText(row[column]) + "\"");
}

/// <summary>
/// Publishes one UTF-8 encoded message on every channel with that channel's properties.
/// </summary>
private static void PublishHisMessage(Dictionary<string, IModel> channels, Dictionary<string, IBasicProperties> properties, String json)
{
    byte[] body = Encoding.UTF8.GetBytes(json);
    foreach (KeyValuePair<string, IModel> item in channels)
    {
        item.Value.BasicPublish("zone.deviceHis", "", properties[item.Key], body);
    }
}

/// <summary>
/// Converts a value to text and escapes it for use inside a double-quoted JSON string.
/// </summary>
private static string EscapeHisJsonText(object value)
{
    string text = Convert.ToString(value, System.Globalization.CultureInfo.InvariantCulture);
    if (string.IsNullOrEmpty(text))
    {
        return "";
    }
    StringBuilder escaped = new StringBuilder(text.Length + 8);
    foreach (char c in text)
    {
        switch (c)
        {
            case '"': escaped.Append("\\\""); break;
            case '\\': escaped.Append("\\\\"); break;
            case '\n': escaped.Append("\\n"); break;
            case '\r': escaped.Append("\\r"); break;
            case '\t': escaped.Append("\\t"); break;
            default:
                if (c < ' ')
                {
                    escaped.Append("\\u").Append(((int)c).ToString("x4"));
                }
                else
                {
                    escaped.Append(c);
                }
                break;
        }
    }
    return escaped.ToString();
}
/// <summary>
/// Writes an appSettings value to the executable's configuration file and refreshes the
/// in-memory "appSettings" section. The key is created when it does not exist yet.
/// </summary>
/// <param name="key">appSettings key.</param>
/// <param name="value">Value to store.</param>
private void UpdateAppConfig(String key, String value)
{
    var cfg = ConfigurationManager.OpenExeConfiguration(ConfigurationUserLevel.None);
    var setting = cfg.AppSettings.Settings[key];
    if (setting == null)
    {
        // The indexer returns null for an unknown key; add it instead of dereferencing null.
        cfg.AppSettings.Settings.Add(key, value);
    }
    else
    {
        setting.Value = value;
    }
    cfg.Save();
    ConfigurationManager.RefreshSection("appSettings");
}
/// <summary>
/// Checks whether a user table with the given name is listed in sysobjects.
/// </summary>
/// <param name="tablename">Name of the history table to look for.</param>
/// <returns>True when the lookup returns at least one row; false when it returns none or no table at all.</returns>
private bool CheckTableExist(string tablename)
{
    string lookupSql = "select top 1 * from sysobjects where name='" + tablename + "' and xtype='u'";
    DataTable matches = dbHelper.Fill(lookupSql);
    return matches != null && matches.Rows.Count != 0;
}
/// <summary>
/// Saves the timestamp of the last uploaded history record of every device to TextFile1.txt,
/// one "code,timestamp" line per device, replacing the previous content.
/// </summary>
/// <param name="uploadHis">Last uploaded timestamp keyed by device code.</param>
private void SavaUploadHis(Dictionary<String, String> uploadHis)
{
    // Opening the writer with append = false creates the file or truncates the old content,
    // so no separate (and previously undisposed-on-error) FileStream is needed to clear it.
    using (StreamWriter sw = new StreamWriter(@"TextFile1.txt", false))
    {
        foreach (var item in uploadHis)
        {
            sw.WriteLine(item.Key + "," + item.Value);
        }
    }
}
/// <summary>
/// SQL Server data provider used by this job. Every access asks DbFactoryProvider for the
/// provider built from Constants.DbConncetion.
/// </summary>
static IDbProvider dbHelper
{
    get
    {
        return DbFactoryProvider.GetProvider(CurrentDbType.SqlServer, Constants.DbConncetion);
    }
}
- }
- }
|