diff --git a/src/se/koc/hal/deamon/DataAggregatorDaemon.java b/src/se/koc/hal/deamon/DataAggregatorDaemon.java index 6f811910..e80f8588 100755 --- a/src/se/koc/hal/deamon/DataAggregatorDaemon.java +++ b/src/se/koc/hal/deamon/DataAggregatorDaemon.java @@ -63,7 +63,7 @@ public class DataAggregatorDaemon extends TimerTask implements HalDaemon { stmt.setLong(1, sensorId); stmt.setLong(2, maxDBTimestamp); stmt.setLong(3, minPeriodTimestamp); - DBConnection.exec(stmt, new FiveMinuteAggregator()); + DBConnection.exec(stmt, new FiveMinuteAggregator(sensorId)); // hour aggregation stmt = db.getPreparedStatement("SELECT MAX(timestamp_end) FROM sensor_data_aggr" @@ -82,7 +82,7 @@ public class DataAggregatorDaemon extends TimerTask implements HalDaemon { stmt.setLong(2, maxDBTimestamp); stmt.setLong(3, hourPeriodTimestamp); stmt.setLong(4, FIVE_MINUTES_IN_MS-1); - DBConnection.exec(stmt, new HourAggregator()); + DBConnection.exec(stmt, new HourAggregator(sensorId)); // day aggregation stmt = db.getPreparedStatement("SELECT MAX(timestamp_end) FROM sensor_data_aggr WHERE sensor_id == ? AND timestamp_end-timestamp_start == ?"); @@ -100,7 +100,7 @@ public class DataAggregatorDaemon extends TimerTask implements HalDaemon { stmt.setLong(2, maxDBTimestamp); stmt.setLong(3, dayPeriodTimestamp); stmt.setLong(4, HOUR_IN_MS-1); - DBConnection.exec(stmt, new DayAggregator()); + DBConnection.exec(stmt, new DayAggregator(sensorId)); logger.fine("Done aggregation"); } catch (SQLException e) { @@ -131,26 +131,39 @@ public class DataAggregatorDaemon extends TimerTask implements HalDaemon { private class FiveMinuteAggregator implements SQLResultHandler{ + private long sensorId = -1; + public FiveMinuteAggregator(long sensorId) { + this.sensorId = sensorId; + } @Override public Object handleQueryResult(Statement stmt, ResultSet result) throws SQLException { long currentPeriodTimestamp = 0; int sum = 0; int count = 0; int samples = 0; + long highestSequenceId = Sensor.getHighestSequenceId(sensorId); + HalContext.getDB().getConnection().setAutoCommit(false); + boolean error = false; + PreparedStatement preparedInsertStmt = HalContext.getDB().getPreparedStatement("INSERT INTO sensor_data_aggr(sensor_id, sequence_id, timestamp_start, timestamp_end, data, confidence) VALUES(?, ?, ?, ?, ?, ?)"); while(result.next()){ + if(sensorId != result.getInt("sensor_id")){ + logger.severe("found entry for aggregation for the wrong sensorId (expecting: "+sensorId+", but was: "+result.getInt("sensor_id")+")"); + error = true; + break; + } long timestamp = result.getLong("timestamp"); long periodTimestamp = getTimestampMinutePeriodStart(5, timestamp); if(currentPeriodTimestamp != 0 && periodTimestamp != currentPeriodTimestamp){ float confidence = count / 5f; logger.finer("Calculated minute period: "+ currentPeriodTimestamp +" sum: "+ sum +" confidence: "+ confidence+ " samples: " + samples); - PreparedStatement prepStmt = HalContext.getDB().getPreparedStatement("INSERT INTO sensor_data_aggr(sensor_id, sequence_id, timestamp_start, timestamp_end, data, confidence) VALUES(?, ?, ?, ?, ?, ?)"); - prepStmt.setInt(1, result.getInt("sensor_id")); - prepStmt.setLong(2, Sensor.getHighestSequenceId(result.getInt("sensor_id")) + 1); - prepStmt.setLong(3, currentPeriodTimestamp); - prepStmt.setLong(4, currentPeriodTimestamp + FIVE_MINUTES_IN_MS - 1); - prepStmt.setInt(5, sum); - prepStmt.setFloat(6, confidence); - DBConnection.exec(prepStmt); + preparedInsertStmt.setLong(1, sensorId); + preparedInsertStmt.setLong(2, ++highestSequenceId); + preparedInsertStmt.setLong(3, currentPeriodTimestamp); + preparedInsertStmt.setLong(4, currentPeriodTimestamp + FIVE_MINUTES_IN_MS - 1); + preparedInsertStmt.setInt(5, sum); + preparedInsertStmt.setFloat(6, confidence); + preparedInsertStmt.addBatch(); + //DBConnection.exec(prepStmt); // Reset variables currentPeriodTimestamp = periodTimestamp; @@ -161,31 +174,51 @@ public class DataAggregatorDaemon extends TimerTask implements HalDaemon { ++count; ++samples; } + DBConnection.execBatch(preparedInsertStmt); + if(!error){ + HalContext.getDB().getConnection().commit(); + }else{ + HalContext.getDB().getConnection().rollback(); + } + HalContext.getDB().getConnection().setAutoCommit(true); return null; } } private class HourAggregator implements SQLResultHandler{ + private long sensorId = -1; + public HourAggregator(long sensorId) { + this.sensorId = sensorId; + } @Override public Object handleQueryResult(Statement stmt, ResultSet result) throws SQLException { long currentPeriodTimestamp = 0; int sum = 0; float confidenceSum = 0; int samples = 0; + long highestSequenceId = Sensor.getHighestSequenceId(sensorId); + HalContext.getDB().getConnection().setAutoCommit(false); + boolean error = false; + PreparedStatement preparedInsertStmt = HalContext.getDB().getPreparedStatement("INSERT INTO sensor_data_aggr(sensor_id, sequence_id, timestamp_start, timestamp_end, data, confidence) VALUES(?, ?, ?, ?, ?, ?)"); + PreparedStatement preparedDeleteStmt = HalContext.getDB().getPreparedStatement("DELETE FROM sensor_data_aggr WHERE sensor_id == ? AND sequence_id == ?"); while(result.next()){ + if(sensorId != result.getInt("sensor_id")){ + logger.severe("found entry for aggregation for the wrong sensorId (expecting: "+sensorId+", but was: "+result.getInt("sensor_id")+")"); + error = true; + break; + } long timestamp = result.getLong("timestamp_start"); long periodTimestamp = getTimestampMinutePeriodStart(60, timestamp); if(currentPeriodTimestamp != 0 && periodTimestamp != currentPeriodTimestamp){ float aggrConfidence = confidenceSum / 12f; logger.finer("Calculated hour period: "+ currentPeriodTimestamp +" sum: "+ sum +" confidence: "+ aggrConfidence+ " samples: " + samples); - PreparedStatement prepStmt = HalContext.getDB().getPreparedStatement("INSERT INTO sensor_data_aggr(sensor_id, sequence_id, timestamp_start, timestamp_end, data, confidence) VALUES(?, ?, ?, ?, ?, ?)"); - prepStmt.setInt(1, result.getInt("sensor_id")); - prepStmt.setLong(2, Sensor.getHighestSequenceId(result.getInt("sensor_id")) + 1); - prepStmt.setLong(3, currentPeriodTimestamp); - prepStmt.setLong(4, currentPeriodTimestamp + HOUR_IN_MS - 1); - prepStmt.setInt(5, sum); - prepStmt.setFloat(6, aggrConfidence); - DBConnection.exec(prepStmt); + preparedInsertStmt.setInt(1, result.getInt("sensor_id")); + preparedInsertStmt.setLong(2, ++highestSequenceId); + preparedInsertStmt.setLong(3, currentPeriodTimestamp); + preparedInsertStmt.setLong(4, currentPeriodTimestamp + HOUR_IN_MS - 1); + preparedInsertStmt.setInt(5, sum); + preparedInsertStmt.setFloat(6, aggrConfidence); + preparedInsertStmt.addBatch(); // Reset variables currentPeriodTimestamp = periodTimestamp; @@ -196,37 +229,56 @@ public class DataAggregatorDaemon extends TimerTask implements HalDaemon { confidenceSum += result.getFloat("confidence"); samples++; - //TODO: SHould not be here! - PreparedStatement prepStmt = HalContext.getDB().getPreparedStatement("DELETE FROM sensor_data_aggr WHERE sensor_id == ? AND sequence_id == ?"); - prepStmt.setInt(1, result.getInt("sensor_id")); - prepStmt.setInt(2, result.getInt("sequence_id")); - DBConnection.exec(prepStmt); + preparedDeleteStmt.setInt(1, result.getInt("sensor_id")); + preparedDeleteStmt.setInt(2, result.getInt("sequence_id")); + preparedDeleteStmt.addBatch(); } + DBConnection.execBatch(preparedInsertStmt); + DBConnection.execBatch(preparedDeleteStmt); + if(!error){ + HalContext.getDB().getConnection().commit(); + }else{ + HalContext.getDB().getConnection().rollback(); + } + HalContext.getDB().getConnection().setAutoCommit(true); return null; } } private class DayAggregator implements SQLResultHandler{ + private long sensorId = -1; + public DayAggregator(long sensorId) { + this.sensorId = sensorId; + } @Override public Object handleQueryResult(Statement stmt, ResultSet result) throws SQLException { long currentPeriodTimestamp = 0; int sum = 0; float confidenceSum = 0; int samples = 0; + long highestSequenceId = Sensor.getHighestSequenceId(sensorId); + HalContext.getDB().getConnection().setAutoCommit(false); + boolean error = false; + PreparedStatement preparedInsertStmt = HalContext.getDB().getPreparedStatement("INSERT INTO sensor_data_aggr(sensor_id, sequence_id, timestamp_start, timestamp_end, data, confidence) VALUES(?, ?, ?, ?, ?, ?)"); + PreparedStatement preparedDeleteStmt = HalContext.getDB().getPreparedStatement("DELETE FROM sensor_data_aggr WHERE sensor_id == ? AND sequence_id == ?"); while(result.next()){ + if(sensorId != result.getInt("sensor_id")){ + logger.severe("found entry for aggregation for the wrong sensorId (expecting: "+sensorId+", but was: "+result.getInt("sensor_id")+")"); + error = true; + break; + } long timestamp = result.getLong("timestamp_start"); long periodTimestamp = getTimestampHourPeriodStart(24, timestamp); if(currentPeriodTimestamp != 0 && periodTimestamp != currentPeriodTimestamp){ float aggrConfidence = confidenceSum / 24f; logger.finer("Calculated day period: "+ currentPeriodTimestamp +" sum: "+ sum +" confidence: "+ aggrConfidence+ " samples: " + samples); - PreparedStatement prepStmt = HalContext.getDB().getPreparedStatement("INSERT INTO sensor_data_aggr(sensor_id, sequence_id, timestamp_start, timestamp_end, data, confidence) VALUES(?, ?, ?, ?, ?, ?)"); - prepStmt.setInt(1, result.getInt("sensor_id")); - prepStmt.setLong(2, Sensor.getHighestSequenceId(result.getInt("sensor_id")) + 1); - prepStmt.setLong(3, currentPeriodTimestamp); - prepStmt.setLong(4, currentPeriodTimestamp + DAY_IN_MS - 1); - prepStmt.setInt(5, sum); - prepStmt.setFloat(6, aggrConfidence); - DBConnection.exec(prepStmt); + preparedInsertStmt.setInt(1, result.getInt("sensor_id")); + preparedInsertStmt.setLong(2, ++highestSequenceId); + preparedInsertStmt.setLong(3, currentPeriodTimestamp); + preparedInsertStmt.setLong(4, currentPeriodTimestamp + DAY_IN_MS - 1); + preparedInsertStmt.setInt(5, sum); + preparedInsertStmt.setFloat(6, aggrConfidence); + preparedInsertStmt.addBatch(); // Reset variables currentPeriodTimestamp = periodTimestamp; @@ -238,12 +290,18 @@ public class DataAggregatorDaemon extends TimerTask implements HalDaemon { confidenceSum += result.getFloat("confidence"); samples++; - //TODO: SHould not be here! - PreparedStatement prepStmt = HalContext.getDB().getPreparedStatement("DELETE FROM sensor_data_aggr WHERE sensor_id == ? AND sequence_id == ?"); - prepStmt.setInt(1, result.getInt("sensor_id")); - prepStmt.setInt(2, result.getInt("sequence_id")); - DBConnection.exec(prepStmt); + preparedDeleteStmt.setInt(1, result.getInt("sensor_id")); + preparedDeleteStmt.setInt(2, result.getInt("sequence_id")); + preparedDeleteStmt.addBatch(); } + DBConnection.execBatch(preparedInsertStmt); + DBConnection.execBatch(preparedDeleteStmt); + if(!error){ + HalContext.getDB().getConnection().commit(); + }else{ + HalContext.getDB().getConnection().rollback(); + } + HalContext.getDB().getConnection().setAutoCommit(true); return null; } }