diff --git a/src/se/hal/daemon/SensorDataAggregatorDaemon.java b/src/se/hal/daemon/SensorDataAggregatorDaemon.java index 554df8f1..8fe1c1d0 100755 --- a/src/se/hal/daemon/SensorDataAggregatorDaemon.java +++ b/src/se/hal/daemon/SensorDataAggregatorDaemon.java @@ -89,38 +89,42 @@ public class SensorDataAggregatorDaemon implements HalDaemon { long sensorId = sensor.getId(); AggregationMethod aggrMethod = sensor.getDeviceConfig().getAggregationMethod(); DBConnection db = HalContext.getDB(); - PreparedStatement stmt = null; + PreparedStatement stmt; try { - Long maxAggrTimestampInDB = getLatestAggrTimestamp(db, sensor, aggrPeriodLength); - if(maxAggrTimestampInDB == null) - maxAggrTimestampInDB = 0l; - - long latestCompletePeriodEndTimestamp = new UTCTimePeriod(aggregationStartTime, aggrPeriodLength).getPreviosPeriod().getEndTimestamp(); - long oldestPeriodStartTimestamp = new UTCTimePeriod(aggregationStartTime-ageLimitInMs, aggrPeriodLength).getStartTimestamp(); + // DB timestamps + long dbMaxRawTimestamp = getLatestRawTimestamp(db, sensor); + long dbMaxAggrEndTimestamp = getLatestAggrEndTimestamp(db, sensor, aggrPeriodLength); + // Periods + long periodLatestEndTimestamp = new UTCTimePeriod(aggregationStartTime, aggrPeriodLength).getPreviosPeriod().getEndTimestamp(); + long periodOldestStartTimestamp = new UTCTimePeriod(aggregationStartTime-ageLimitInMs, aggrPeriodLength).getStartTimestamp(); // Check if the sensor has stopped responding for 15 min or 3 times the data interval and alert the user if (aggrPeriodLength == AggregationPeriodLength.FIVE_MINUTES) { - Long maxRawTimestampInDB = getLatestRawTimestamp(db, sensor); long dataInterval = sensor.getDeviceConfig().getDataInterval(); if (dataInterval < UTCTimeUtility.FIVE_MINUTES_IN_MS) dataInterval = UTCTimeUtility.FIVE_MINUTES_IN_MS; - if (maxRawTimestampInDB != null && - maxRawTimestampInDB + (dataInterval * 3) < System.currentTimeMillis()) { - logger.fine("Sensor \"" + sensorId + "\" has stopped sending data"); + if (dbMaxRawTimestamp > 0 && + dbMaxRawTimestamp + (dataInterval * 3) < System.currentTimeMillis()) { + logger.fine("Sensor \"" + sensorId + "\" stopped sending data at: "+ dbMaxRawTimestamp); HalAlertManager.getInstance().addAlert(new HalAlert(AlertLevel.WARNING, - "Sensor \"" + sensor.getName() + "\" has stopped responding", - "since "+maxRawTimestampInDB+"", + "Sensor \"" + sensor.getName() + "\" stopped responding", + "at "+dbMaxRawTimestamp+"", AlertTTL.DISMISSED)); } } - if(latestCompletePeriodEndTimestamp == maxAggrTimestampInDB){ - logger.fine("no new data to evaluate - aggregation is up to date"); + // Is there any new data to evaluate? + if(dbMaxRawTimestamp < dbMaxAggrEndTimestamp || dbMaxRawTimestamp < periodOldestStartTimestamp){ + logger.fine("No new data to evaluate - aggregation is up to date"); return; - }else{ - logger.fine("evaluating period: "+ (maxAggrTimestampInDB+1) + "=>" + latestCompletePeriodEndTimestamp + " (" + UTCTimeUtility.getDateString(maxAggrTimestampInDB+1) + "=>" + UTCTimeUtility.getDateString(latestCompletePeriodEndTimestamp) + ") with expected sample count: " + expectedSampleCount); } - + + // Start processing + logger.fine("evaluating period: "+ + (dbMaxAggrEndTimestamp+1) + "=>" + periodLatestEndTimestamp + + " (" + UTCTimeUtility.getDateString(dbMaxAggrEndTimestamp+1) + "=>" + + UTCTimeUtility.getDateString(periodLatestEndTimestamp) + ") " + + "with expected sample count: " + expectedSampleCount); stmt = db.getPreparedStatement("SELECT *, 1 AS confidence FROM sensor_data_raw" +" WHERE sensor_id == ?" + " AND timestamp > ?" @@ -128,16 +132,16 @@ public class SensorDataAggregatorDaemon implements HalDaemon { + " AND timestamp >= ? " +" ORDER BY timestamp ASC"); stmt.setLong(1, sensorId); - stmt.setLong(2, maxAggrTimestampInDB); - stmt.setLong(3, latestCompletePeriodEndTimestamp); - stmt.setLong(4, oldestPeriodStartTimestamp); + stmt.setLong(2, dbMaxAggrEndTimestamp); + stmt.setLong(3, periodLatestEndTimestamp); + stmt.setLong(4, periodOldestStartTimestamp); DBConnection.exec(stmt, new DataAggregator(sensorId, aggrPeriodLength, expectedSampleCount, aggrMethod, aggregationStartTime)); } catch (SQLException e) { logger.log(Level.SEVERE, null, e); } } - private Long getLatestAggrTimestamp(DBConnection db, Sensor sensor, AggregationPeriodLength aggrPeriodLength) throws SQLException { + private long getLatestAggrEndTimestamp(DBConnection db, Sensor sensor, AggregationPeriodLength aggrPeriodLength) throws SQLException { PreparedStatement stmt = db.getPreparedStatement("SELECT MAX(timestamp_end) FROM sensor_data_aggr" + " WHERE sensor_id == ?" + " AND timestamp_end-timestamp_start == ?"); @@ -153,12 +157,15 @@ public class SensorDataAggregatorDaemon implements HalDaemon { default: throw new IllegalArgumentException("aggregation period length is not supported."); } - return DBConnection.exec(stmt, new SimpleSQLResult()); + Long timestamp = DBConnection.exec(stmt, new SimpleSQLResult()); + return (timestamp == null ? 0l : timestamp); } - private Long getLatestRawTimestamp(DBConnection db, Sensor sensor) throws SQLException { - PreparedStatement stmt = db.getPreparedStatement("SELECT MAX(timestamp) FROM sensor_data_raw WHERE sensor_id == ?"); + private long getLatestRawTimestamp(DBConnection db, Sensor sensor) throws SQLException { + PreparedStatement stmt = db.getPreparedStatement( + "SELECT MAX(timestamp) FROM sensor_data_raw WHERE sensor_id == ?"); stmt.setLong(1, sensor.getId()); - return DBConnection.exec(stmt, new SimpleSQLResult()); + Long timestamp = DBConnection.exec(stmt, new SimpleSQLResult()); + return (timestamp == null ? 0l : timestamp); } @@ -191,10 +198,13 @@ public class SensorDataAggregatorDaemon implements HalDaemon { int samples = 0; long highestSequenceId = Sensor.getHighestSequenceId(sensorId); PreparedStatement preparedInsertStmt = HalContext.getDB().getPreparedStatement( - "INSERT INTO sensor_data_aggr(sensor_id, sequence_id, timestamp_start, timestamp_end, data, confidence) VALUES(?, ?, ?, ?, ?, ?)"); + "INSERT INTO sensor_data_aggr" + + "(sensor_id, sequence_id, timestamp_start, timestamp_end, data, confidence) " + + "VALUES(?, ?, ?, ?, ?, ?)"); while(result.next()){ if(sensorId != result.getInt("sensor_id")){ - throw new IllegalArgumentException("found entry for aggregation for the wrong sensorId (expecting: "+sensorId+", but was: "+result.getInt("sensor_id")+")"); + throw new IllegalArgumentException("found entry for aggregation for the wrong sensorId " + + "(expecting: "+sensorId+", but was: "+result.getInt("sensor_id")+")"); } long timestamp = result.getLong("timestamp"); @@ -218,7 +228,8 @@ public class SensorDataAggregatorDaemon implements HalDaemon { } //check if the last period is complete and also should be aggregated - if(currentPeriod != null && currentPeriod.getEndTimestamp() <= new UTCTimePeriod(aggregationStartTime, aggrPeriodLength).getPreviosPeriod().getEndTimestamp()){ + if(currentPeriod != null && + currentPeriod.getEndTimestamp() <= new UTCTimePeriod(aggregationStartTime, aggrPeriodLength).getPreviosPeriod().getEndTimestamp()){ saveData(preparedInsertStmt, confidenceSum, sum, samples, currentPeriod, ++highestSequenceId); } @@ -247,7 +258,11 @@ public class SensorDataAggregatorDaemon implements HalDaemon { aggrConfidence = 1; // ignore confidence for average break; } - logger.finer("saved period: " + currentPeriod + ", data: " + data + ", confidence: " + aggrConfidence + ", samples: " + samples + ", aggrMethod: " + aggrMethod); + logger.finer("Saved period: " + currentPeriod + + ", data: " + data + + ", confidence: " + aggrConfidence + + ", samples: " + samples + + ", aggrMethod: " + aggrMethod); preparedInsertStmt.setLong(1, sensorId); preparedInsertStmt.setLong(2, sequenceId);