Added alert dismiss when sensor starts responding

Author: Ziver Koc, 2018-12-03 19:32:37 +01:00
parent 6f6a764f97
commit b3c2d195b5
2 changed files with 185 additions and 170 deletions
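
In short: the daemon now remembers the last alert raised for each sensor in a HashMap keyed by sensor id, dismisses any stale alert before raising a fresh one, and dismisses the stored alert once the sensor starts responding again. Below is a minimal, self-contained sketch of that pattern; Alert and SensorAlertTracker are simplified stand-ins for the project's HalAlert and HalAlertManager types, not the actual API.

import java.util.HashMap;
import java.util.Map;

// Simplified stand-in for HalAlert: dismissing just flags the alert as expired.
class Alert {
    final String message;
    boolean dismissed = false;

    Alert(String message) { this.message = message; }

    void dismiss() { dismissed = true; }
}

class SensorAlertTracker {
    // One outstanding alert per sensor, keyed by sensor id.
    private final Map<Long, Alert> alertMap = new HashMap<>();

    // Sensor went silent: dismiss any stale alert for it, then raise a new one.
    Alert raiseUnresponsive(long sensorId, String message) {
        Alert previous = alertMap.get(sensorId);
        if (previous != null)
            previous.dismiss(); // avoid piling up duplicate alerts for the same sensor
        Alert alert = new Alert(message);
        alertMap.put(sensorId, alert);
        return alert; // the daemon would hand this to the alert manager
    }

    // Sensor is sending data again: clear its outstanding alert.
    void markResponding(long sensorId) {
        Alert alert = alertMap.get(sensorId);
        if (alert != null)
            alert.dismiss();
    }
}

Note that, as in the commit, dismissed entries are never removed from the map; calling dismiss() on an already-dismissed alert is harmless (in the real HalAlert it just sets the TTL to -1 again).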

SensorDataAggregatorDaemon.java

@@ -19,6 +19,7 @@ import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
+import java.util.HashMap;
 import java.util.List;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -26,19 +27,22 @@ import java.util.logging.Level;
 import java.util.logging.Logger;

 public class SensorDataAggregatorDaemon implements HalDaemon {
     private static final Logger logger = LogUtil.getLogger();

     public enum AggregationPeriodLength{
         SECOND,
         MINUTE,
         FIVE_MINUTES,
         FIFTEEN_MINUTES,
         HOUR,
         DAY,
         WEEK,
         MONTH,
         YEAR
     }

+    private HashMap<Long, HalAlert> alertMap = new HashMap<>();
+
     public void initiate(ScheduledExecutorService executor){
         executor.scheduleAtFixedRate(this, 0, UTCTimeUtility.FIVE_MINUTES_IN_MS, TimeUnit.MILLISECONDS);
@@ -46,59 +50,59 @@ public class SensorDataAggregatorDaemon implements HalDaemon {
     @Override
     public void run(){
         try {
             List<Sensor> sensorList = Sensor.getLocalSensors(HalContext.getDB());
             for(Sensor sensor : sensorList){
                 logger.fine("Aggregating sensor_id: " + sensor.getId());
                 aggregateSensor(sensor);
             }
             logger.fine("Aggregation Done");
         } catch (Exception e) {
             logger.log(Level.SEVERE, "Thread has crashed", e);
         }
     }

     public void aggregateSensor(Sensor sensor) {
         if(sensor.getDeviceConfig() == null){
             logger.fine("The sensor config is not available - ignoring it");
             return;
         }
         logger.fine("The sensor is of type: " + sensor.getDeviceConfig().getClass().getName());

         long aggregationStartTime = System.currentTimeMillis();
         logger.fine("Aggregating raw data up to a day old into five minute periods");
         aggregateRawData(sensor, AggregationPeriodLength.FIVE_MINUTES, UTCTimeUtility.DAY_IN_MS, 5, aggregationStartTime);
         logger.fine("Aggregating raw data up to a week old into one hour periods");
         aggregateRawData(sensor, AggregationPeriodLength.HOUR, UTCTimeUtility.WEEK_IN_MS, 60, aggregationStartTime);
         logger.fine("Aggregating raw data into one day periods");
         aggregateRawData(sensor, AggregationPeriodLength.DAY, UTCTimeUtility.INFINITY, 60*24, aggregationStartTime);
         logger.fine("Aggregating raw data into one week periods");
         aggregateRawData(sensor, AggregationPeriodLength.WEEK, UTCTimeUtility.INFINITY, 60*24*7, aggregationStartTime);
     }

     /**
      * Aggregate data from the raw DB table to the aggregated table
      * @param sensor The sensor for to aggregate data
      * @param ageLimitInMs Only aggregate up to this age
      */
     private void aggregateRawData(Sensor sensor, AggregationPeriodLength aggrPeriodLength, long ageLimitInMs, int expectedSampleCount, long aggregationStartTime){
         long sensorId = sensor.getId();
         AggregationMethod aggrMethod = sensor.getDeviceConfig().getAggregationMethod();
         DBConnection db = HalContext.getDB();
         PreparedStatement stmt;
         try {
             // DB timestamps
             long dbMaxRawTimestamp = getLatestRawTimestamp(db, sensor);
             long dbMaxAggrEndTimestamp = getLatestAggrEndTimestamp(db, sensor, aggrPeriodLength);

             // Periods
             long periodLatestEndTimestamp = new UTCTimePeriod(aggregationStartTime, aggrPeriodLength).getPreviosPeriod().getEndTimestamp();
             long periodOldestStartTimestamp = new UTCTimePeriod(aggregationStartTime-ageLimitInMs, aggrPeriodLength).getStartTimestamp();

-            // Check if the sensor has stopped responding for 15 min or 3 times the data interval and alert the user
+            // Check if the sensor has stopped responding for 15 min or 3 times the data interval if so alert the user
             if (aggrPeriodLength == AggregationPeriodLength.FIVE_MINUTES) {
                 long dataInterval = sensor.getDeviceConfig().getDataInterval();
                 if (dataInterval < UTCTimeUtility.FIVE_MINUTES_IN_MS)
@@ -106,39 +110,50 @@ public class SensorDataAggregatorDaemon implements HalDaemon {
                 if (dbMaxRawTimestamp > 0 &&
                         dbMaxRawTimestamp + (dataInterval * 3) < System.currentTimeMillis()) {
                     logger.fine("Sensor \"" + sensorId + "\" stopped sending data at: "+ dbMaxRawTimestamp);
-                    HalAlertManager.getInstance().addAlert(new HalAlert(AlertLevel.WARNING,
+                    if (alertMap.containsKey(sensor.getId()))
+                        alertMap.get(sensor.getId()).dismiss();
+
+                    HalAlert alert = new HalAlert(AlertLevel.WARNING,
                             "Sensor \"" + sensor.getName() + "\" stopped responding",
                             "at <span class=\"timestamp\">"+dbMaxRawTimestamp+"</span>",
-                            AlertTTL.DISMISSED));
+                            AlertTTL.DISMISSED);
+                    alertMap.put(sensor.getId(), alert);
+                    HalAlertManager.getInstance().addAlert(alert);
+                }
+                else {
+                    // Sensor has responded remove alert
+                    if (alertMap.containsKey(sensor.getId()))
+                        alertMap.get(sensor.getId()).dismiss();
                 }
             }

             // Is there any new data to evaluate?
             if(dbMaxRawTimestamp < dbMaxAggrEndTimestamp || dbMaxRawTimestamp < periodOldestStartTimestamp){
                 logger.fine("No new data to evaluate - aggregation is up to date");
                 return;
             }

             // Start processing
             logger.fine("evaluating period: "+
                     (dbMaxAggrEndTimestamp+1) + "=>" + periodLatestEndTimestamp +
                     " (" + UTCTimeUtility.getDateString(dbMaxAggrEndTimestamp+1) + "=>" +
                     UTCTimeUtility.getDateString(periodLatestEndTimestamp) + ") " +
                     "with expected sample count: " + expectedSampleCount);

             stmt = db.getPreparedStatement("SELECT *, 1 AS confidence FROM sensor_data_raw"
                     +" WHERE sensor_id == ?"
                     + " AND timestamp > ?"
                     + " AND timestamp <= ? "
                     + " AND timestamp >= ? "
                     +" ORDER BY timestamp ASC");
             stmt.setLong(1, sensorId);
             stmt.setLong(2, dbMaxAggrEndTimestamp);
             stmt.setLong(3, periodLatestEndTimestamp);
             stmt.setLong(4, periodOldestStartTimestamp);
             DBConnection.exec(stmt, new DataAggregator(sensorId, aggrPeriodLength, expectedSampleCount, aggrMethod, aggregationStartTime));
         } catch (SQLException e) {
             logger.log(Level.SEVERE, null, e);
         }
     }

     private long getLatestAggrEndTimestamp(DBConnection db, Sensor sensor, AggregationPeriodLength aggrPeriodLength) throws SQLException {
@@ -173,106 +188,106 @@ public class SensorDataAggregatorDaemon implements HalDaemon {
     /**
      * Internal class for aggregating data to the aggregated DB table
      */
     private class DataAggregator implements SQLResultHandler<Object>{
         private final long sensorId;
         private final AggregationPeriodLength aggrPeriodLength;
         private final int expectedSampleCount;
         private final AggregationMethod aggrMethod;
         private final long aggregationStartTime;

         public DataAggregator(long sensorId, AggregationPeriodLength aggrPeriodLength, int expectedSampleCount, AggregationMethod aggrMethod, long aggregationStartTime) {
             this.sensorId = sensorId;
             this.aggrPeriodLength = aggrPeriodLength;
             this.expectedSampleCount = expectedSampleCount;
             this.aggrMethod = aggrMethod;
             this.aggregationStartTime = aggregationStartTime;
         }

         @Override
         public Object handleQueryResult(Statement stmt, ResultSet result) throws SQLException {
             try{
                 HalContext.getDB().getConnection().setAutoCommit(false);

                 UTCTimePeriod currentPeriod = null;
                 float sum = 0;
                 float confidenceSum = 0;
                 int samples = 0;
                 long highestSequenceId = Sensor.getHighestSequenceId(sensorId);

                 PreparedStatement preparedInsertStmt = HalContext.getDB().getPreparedStatement(
                         "INSERT INTO sensor_data_aggr" +
                         "(sensor_id, sequence_id, timestamp_start, timestamp_end, data, confidence) " +
                         "VALUES(?, ?, ?, ?, ?, ?)");

                 while(result.next()){
                     if(sensorId != result.getInt("sensor_id")){
                         throw new IllegalArgumentException("found entry for aggregation for the wrong sensorId " +
                                 "(expecting: "+sensorId+", but was: "+result.getInt("sensor_id")+")");
                     }

                     long timestamp = result.getLong("timestamp");
                     UTCTimePeriod dataPeriod = new UTCTimePeriod(timestamp, this.aggrPeriodLength);
                     if(currentPeriod == null)
                         currentPeriod = dataPeriod;

                     if(!dataPeriod.equals(currentPeriod)){
                         saveData(preparedInsertStmt, confidenceSum, sum, samples, currentPeriod, ++highestSequenceId);

                         // Reset variables
                         currentPeriod = dataPeriod;
                         confidenceSum = 0;
                         sum = 0;
                         samples = 0;
                     }

                     sum += result.getFloat("data");
                     confidenceSum += result.getFloat("confidence");
                     ++samples;
                 }

                 //check if the last period is complete and also should be aggregated
                 if(currentPeriod != null &&
                         currentPeriod.getEndTimestamp() <= new UTCTimePeriod(aggregationStartTime, aggrPeriodLength).getPreviosPeriod().getEndTimestamp()){
                     saveData(preparedInsertStmt, confidenceSum, sum, samples, currentPeriod, ++highestSequenceId);
                 }

                 DBConnection.execBatch(preparedInsertStmt);
                 HalContext.getDB().getConnection().commit();
             }catch(Exception e){
                 HalContext.getDB().getConnection().rollback();
                 throw e;
             }finally{
                 HalContext.getDB().getConnection().setAutoCommit(true);
             }
             return null;
         }

         private void saveData(PreparedStatement preparedInsertStmt, float confidenceSum, float sum, int samples, UTCTimePeriod currentPeriod, long sequenceId) throws SQLException{
             float aggrConfidence = -1;
             float data = -1;
             switch(aggrMethod){
                 case SUM:
                     data = sum;
                     aggrConfidence = confidenceSum / (float)this.expectedSampleCount;
                     break;
                 case AVERAGE:
                     data = sum/samples;
                     aggrConfidence = 1; // ignore confidence for average
                     break;
             }
             logger.finer("Saved period: " + currentPeriod +
                     ", data: " + data +
                     ", confidence: " + aggrConfidence +
                     ", samples: " + samples +
                     ", aggrMethod: " + aggrMethod);

             preparedInsertStmt.setLong(1, sensorId);
             preparedInsertStmt.setLong(2, sequenceId);
             preparedInsertStmt.setLong(3, currentPeriod.getStartTimestamp());
             preparedInsertStmt.setLong(4, currentPeriod.getEndTimestamp());
             preparedInsertStmt.setFloat(5, data);
             preparedInsertStmt.setFloat(6, aggrConfidence);
             preparedInsertStmt.addBatch();
         }
     }
 }

HalAlertManager.java

@@ -150,7 +150,7 @@ public class HalAlertManager implements HttpPage {
                 case DISMISSED: this.ttl = Integer.MAX_VALUE; break;
             }
         }

-        public void dissmiss(){
+        public void dismiss(){
             ttl = -1;
         }
@@ -159,7 +159,7 @@
             if (obj instanceof HalAlert)
                 return level == ((HalAlert) obj).level &&
                         title.equals(((HalAlert) obj).title);
-            return false;
+            return super.equals(obj);
         }
     }
 }