Fixed small bug where aggregator ran even if there was no data to aggregate. issue 46

This commit is contained in:
Ziver Koc 2017-03-23 17:34:54 +01:00
parent 160c437430
commit cc703e5091

View file

@ -89,38 +89,42 @@ public class SensorDataAggregatorDaemon implements HalDaemon {
long sensorId = sensor.getId();
AggregationMethod aggrMethod = sensor.getDeviceConfig().getAggregationMethod();
DBConnection db = HalContext.getDB();
PreparedStatement stmt = null;
PreparedStatement stmt;
try {
Long maxAggrTimestampInDB = getLatestAggrTimestamp(db, sensor, aggrPeriodLength);
if(maxAggrTimestampInDB == null)
maxAggrTimestampInDB = 0l;
long latestCompletePeriodEndTimestamp = new UTCTimePeriod(aggregationStartTime, aggrPeriodLength).getPreviosPeriod().getEndTimestamp();
long oldestPeriodStartTimestamp = new UTCTimePeriod(aggregationStartTime-ageLimitInMs, aggrPeriodLength).getStartTimestamp();
// DB timestamps
long dbMaxRawTimestamp = getLatestRawTimestamp(db, sensor);
long dbMaxAggrEndTimestamp = getLatestAggrEndTimestamp(db, sensor, aggrPeriodLength);
// Periods
long periodLatestEndTimestamp = new UTCTimePeriod(aggregationStartTime, aggrPeriodLength).getPreviosPeriod().getEndTimestamp();
long periodOldestStartTimestamp = new UTCTimePeriod(aggregationStartTime-ageLimitInMs, aggrPeriodLength).getStartTimestamp();
// Check if the sensor has stopped responding for 15 min or 3 times the data interval and alert the user
if (aggrPeriodLength == AggregationPeriodLength.FIVE_MINUTES) {
Long maxRawTimestampInDB = getLatestRawTimestamp(db, sensor);
long dataInterval = sensor.getDeviceConfig().getDataInterval();
if (dataInterval < UTCTimeUtility.FIVE_MINUTES_IN_MS)
dataInterval = UTCTimeUtility.FIVE_MINUTES_IN_MS;
if (maxRawTimestampInDB != null &&
maxRawTimestampInDB + (dataInterval * 3) < System.currentTimeMillis()) {
logger.fine("Sensor \"" + sensorId + "\" has stopped sending data");
if (dbMaxRawTimestamp > 0 &&
dbMaxRawTimestamp + (dataInterval * 3) < System.currentTimeMillis()) {
logger.fine("Sensor \"" + sensorId + "\" stopped sending data at: "+ dbMaxRawTimestamp);
HalAlertManager.getInstance().addAlert(new HalAlert(AlertLevel.WARNING,
"Sensor \"" + sensor.getName() + "\" has stopped responding",
"since <span class=\"timestamp\">"+maxRawTimestampInDB+"</span>",
"Sensor \"" + sensor.getName() + "\" stopped responding",
"at <span class=\"timestamp\">"+dbMaxRawTimestamp+"</span>",
AlertTTL.DISMISSED));
}
}
if(latestCompletePeriodEndTimestamp == maxAggrTimestampInDB){
logger.fine("no new data to evaluate - aggregation is up to date");
// Is there any new data to evaluate?
if(dbMaxRawTimestamp < dbMaxAggrEndTimestamp || dbMaxRawTimestamp < periodOldestStartTimestamp){
logger.fine("No new data to evaluate - aggregation is up to date");
return;
}else{
logger.fine("evaluating period: "+ (maxAggrTimestampInDB+1) + "=>" + latestCompletePeriodEndTimestamp + " (" + UTCTimeUtility.getDateString(maxAggrTimestampInDB+1) + "=>" + UTCTimeUtility.getDateString(latestCompletePeriodEndTimestamp) + ") with expected sample count: " + expectedSampleCount);
}
// Start processing
logger.fine("evaluating period: "+
(dbMaxAggrEndTimestamp+1) + "=>" + periodLatestEndTimestamp +
" (" + UTCTimeUtility.getDateString(dbMaxAggrEndTimestamp+1) + "=>" +
UTCTimeUtility.getDateString(periodLatestEndTimestamp) + ") " +
"with expected sample count: " + expectedSampleCount);
stmt = db.getPreparedStatement("SELECT *, 1 AS confidence FROM sensor_data_raw"
+" WHERE sensor_id == ?"
+ " AND timestamp > ?"
@ -128,16 +132,16 @@ public class SensorDataAggregatorDaemon implements HalDaemon {
+ " AND timestamp >= ? "
+" ORDER BY timestamp ASC");
stmt.setLong(1, sensorId);
stmt.setLong(2, maxAggrTimestampInDB);
stmt.setLong(3, latestCompletePeriodEndTimestamp);
stmt.setLong(4, oldestPeriodStartTimestamp);
stmt.setLong(2, dbMaxAggrEndTimestamp);
stmt.setLong(3, periodLatestEndTimestamp);
stmt.setLong(4, periodOldestStartTimestamp);
DBConnection.exec(stmt, new DataAggregator(sensorId, aggrPeriodLength, expectedSampleCount, aggrMethod, aggregationStartTime));
} catch (SQLException e) {
logger.log(Level.SEVERE, null, e);
}
}
private Long getLatestAggrTimestamp(DBConnection db, Sensor sensor, AggregationPeriodLength aggrPeriodLength) throws SQLException {
private long getLatestAggrEndTimestamp(DBConnection db, Sensor sensor, AggregationPeriodLength aggrPeriodLength) throws SQLException {
PreparedStatement stmt = db.getPreparedStatement("SELECT MAX(timestamp_end) FROM sensor_data_aggr"
+ " WHERE sensor_id == ?"
+ " AND timestamp_end-timestamp_start == ?");
@ -153,12 +157,15 @@ public class SensorDataAggregatorDaemon implements HalDaemon {
default:
throw new IllegalArgumentException("aggregation period length is not supported.");
}
return DBConnection.exec(stmt, new SimpleSQLResult<Long>());
Long timestamp = DBConnection.exec(stmt, new SimpleSQLResult<Long>());
return (timestamp == null ? 0l : timestamp);
}
private Long getLatestRawTimestamp(DBConnection db, Sensor sensor) throws SQLException {
PreparedStatement stmt = db.getPreparedStatement("SELECT MAX(timestamp) FROM sensor_data_raw WHERE sensor_id == ?");
private long getLatestRawTimestamp(DBConnection db, Sensor sensor) throws SQLException {
PreparedStatement stmt = db.getPreparedStatement(
"SELECT MAX(timestamp) FROM sensor_data_raw WHERE sensor_id == ?");
stmt.setLong(1, sensor.getId());
return DBConnection.exec(stmt, new SimpleSQLResult<Long>());
Long timestamp = DBConnection.exec(stmt, new SimpleSQLResult<Long>());
return (timestamp == null ? 0l : timestamp);
}
@ -191,10 +198,13 @@ public class SensorDataAggregatorDaemon implements HalDaemon {
int samples = 0;
long highestSequenceId = Sensor.getHighestSequenceId(sensorId);
PreparedStatement preparedInsertStmt = HalContext.getDB().getPreparedStatement(
"INSERT INTO sensor_data_aggr(sensor_id, sequence_id, timestamp_start, timestamp_end, data, confidence) VALUES(?, ?, ?, ?, ?, ?)");
"INSERT INTO sensor_data_aggr" +
"(sensor_id, sequence_id, timestamp_start, timestamp_end, data, confidence) " +
"VALUES(?, ?, ?, ?, ?, ?)");
while(result.next()){
if(sensorId != result.getInt("sensor_id")){
throw new IllegalArgumentException("found entry for aggregation for the wrong sensorId (expecting: "+sensorId+", but was: "+result.getInt("sensor_id")+")");
throw new IllegalArgumentException("found entry for aggregation for the wrong sensorId " +
"(expecting: "+sensorId+", but was: "+result.getInt("sensor_id")+")");
}
long timestamp = result.getLong("timestamp");
@ -218,7 +228,8 @@ public class SensorDataAggregatorDaemon implements HalDaemon {
}
//check if the last period is complete and also should be aggregated
if(currentPeriod != null && currentPeriod.getEndTimestamp() <= new UTCTimePeriod(aggregationStartTime, aggrPeriodLength).getPreviosPeriod().getEndTimestamp()){
if(currentPeriod != null &&
currentPeriod.getEndTimestamp() <= new UTCTimePeriod(aggregationStartTime, aggrPeriodLength).getPreviosPeriod().getEndTimestamp()){
saveData(preparedInsertStmt, confidenceSum, sum, samples, currentPeriod, ++highestSequenceId);
}
@ -247,7 +258,11 @@ public class SensorDataAggregatorDaemon implements HalDaemon {
aggrConfidence = 1; // ignore confidence for average
break;
}
logger.finer("saved period: " + currentPeriod + ", data: " + data + ", confidence: " + aggrConfidence + ", samples: " + samples + ", aggrMethod: " + aggrMethod);
logger.finer("Saved period: " + currentPeriod +
", data: " + data +
", confidence: " + aggrConfidence +
", samples: " + samples +
", aggrMethod: " + aggrMethod);
preparedInsertStmt.setLong(1, sensorId);
preparedInsertStmt.setLong(2, sequenceId);