optimization: executing insert and delete in batches

Former-commit-id: 550aaa9db54972dfdf26702ea897975ec7de3697
This commit is contained in:
dcollin 2015-12-09 16:52:12 +01:00 committed by Daniel Collin
parent dc41dc878a
commit 005c57ad03

View file

@ -63,7 +63,7 @@ public class DataAggregatorDaemon extends TimerTask implements HalDaemon {
stmt.setLong(1, sensorId);
stmt.setLong(2, maxDBTimestamp);
stmt.setLong(3, minPeriodTimestamp);
DBConnection.exec(stmt, new FiveMinuteAggregator());
DBConnection.exec(stmt, new FiveMinuteAggregator(sensorId));
// hour aggregation
stmt = db.getPreparedStatement("SELECT MAX(timestamp_end) FROM sensor_data_aggr"
@ -82,7 +82,7 @@ public class DataAggregatorDaemon extends TimerTask implements HalDaemon {
stmt.setLong(2, maxDBTimestamp);
stmt.setLong(3, hourPeriodTimestamp);
stmt.setLong(4, FIVE_MINUTES_IN_MS-1);
DBConnection.exec(stmt, new HourAggregator());
DBConnection.exec(stmt, new HourAggregator(sensorId));
// day aggregation
stmt = db.getPreparedStatement("SELECT MAX(timestamp_end) FROM sensor_data_aggr WHERE sensor_id == ? AND timestamp_end-timestamp_start == ?");
@ -100,7 +100,7 @@ public class DataAggregatorDaemon extends TimerTask implements HalDaemon {
stmt.setLong(2, maxDBTimestamp);
stmt.setLong(3, dayPeriodTimestamp);
stmt.setLong(4, HOUR_IN_MS-1);
DBConnection.exec(stmt, new DayAggregator());
DBConnection.exec(stmt, new DayAggregator(sensorId));
logger.fine("Done aggregation");
} catch (SQLException e) {
@ -131,27 +131,39 @@ public class DataAggregatorDaemon extends TimerTask implements HalDaemon {
private class FiveMinuteAggregator implements SQLResultHandler<Object>{
private long sensorId = -1;
public FiveMinuteAggregator(long sensorId) {
this.sensorId = sensorId;
}
@Override
public Object handleQueryResult(Statement stmt, ResultSet result) throws SQLException {
long currentPeriodTimestamp = 0;
int sum = 0;
int count = 0;
int samples = 0;
long highestSequenceId = Sensor.getHighestSequenceId(sensorId);
HalContext.getDB().getConnection().setAutoCommit(false);
boolean error = false;
PreparedStatement preparedInsertStmt = HalContext.getDB().getPreparedStatement("INSERT INTO sensor_data_aggr(sensor_id, sequence_id, timestamp_start, timestamp_end, data, confidence) VALUES(?, ?, ?, ?, ?, ?)");
while(result.next()){
if(sensorId != result.getInt("sensor_id")){
logger.severe("found entry for aggregation for the wrong sensorId (expecting: "+sensorId+", but was: "+result.getInt("sensor_id")+")");
error = true;
break;
}
long timestamp = result.getLong("timestamp");
long periodTimestamp = getTimestampMinutePeriodStart(5, timestamp);
if(currentPeriodTimestamp != 0 && periodTimestamp != currentPeriodTimestamp){
float confidence = count / 5f;
logger.finer("Calculated minute period: "+ currentPeriodTimestamp +" sum: "+ sum +" confidence: "+ confidence+ " samples: " + samples);
PreparedStatement prepStmt = HalContext.getDB().getPreparedStatement("INSERT INTO sensor_data_aggr(sensor_id, sequence_id, timestamp_start, timestamp_end, data, confidence) VALUES(?, ?, ?, ?, ?, ?)");
prepStmt.setInt(1, result.getInt("sensor_id"));
prepStmt.setLong(2, Sensor.getHighestSequenceId(result.getInt("sensor_id")) + 1);
prepStmt.setLong(3, currentPeriodTimestamp);
prepStmt.setLong(4, currentPeriodTimestamp + FIVE_MINUTES_IN_MS - 1);
prepStmt.setInt(5, sum);
prepStmt.setFloat(6, confidence);
DBConnection.exec(prepStmt);
preparedInsertStmt.setLong(1, sensorId);
preparedInsertStmt.setLong(2, ++highestSequenceId);
preparedInsertStmt.setLong(3, currentPeriodTimestamp);
preparedInsertStmt.setLong(4, currentPeriodTimestamp + FIVE_MINUTES_IN_MS - 1);
preparedInsertStmt.setInt(5, sum);
preparedInsertStmt.setFloat(6, confidence);
preparedInsertStmt.addBatch();
//DBConnection.exec(prepStmt);
// Reset variables
currentPeriodTimestamp = periodTimestamp;
@ -162,34 +174,51 @@ public class DataAggregatorDaemon extends TimerTask implements HalDaemon {
++count;
++samples;
}
HalContext.getDB().getConnection().commit();
DBConnection.execBatch(preparedInsertStmt);
if(!error){
HalContext.getDB().getConnection().commit();
}else{
HalContext.getDB().getConnection().rollback();
}
HalContext.getDB().getConnection().setAutoCommit(true);
return null;
}
}
private class HourAggregator implements SQLResultHandler<Object>{
private long sensorId = -1;
public HourAggregator(long sensorId) {
this.sensorId = sensorId;
}
@Override
public Object handleQueryResult(Statement stmt, ResultSet result) throws SQLException {
long currentPeriodTimestamp = 0;
int sum = 0;
float confidenceSum = 0;
int samples = 0;
long highestSequenceId = Sensor.getHighestSequenceId(sensorId);
HalContext.getDB().getConnection().setAutoCommit(false);
boolean error = false;
PreparedStatement preparedInsertStmt = HalContext.getDB().getPreparedStatement("INSERT INTO sensor_data_aggr(sensor_id, sequence_id, timestamp_start, timestamp_end, data, confidence) VALUES(?, ?, ?, ?, ?, ?)");
PreparedStatement preparedDeleteStmt = HalContext.getDB().getPreparedStatement("DELETE FROM sensor_data_aggr WHERE sensor_id == ? AND sequence_id == ?");
while(result.next()){
if(sensorId != result.getInt("sensor_id")){
logger.severe("found entry for aggregation for the wrong sensorId (expecting: "+sensorId+", but was: "+result.getInt("sensor_id")+")");
error = true;
break;
}
long timestamp = result.getLong("timestamp_start");
long periodTimestamp = getTimestampMinutePeriodStart(60, timestamp);
if(currentPeriodTimestamp != 0 && periodTimestamp != currentPeriodTimestamp){
float aggrConfidence = confidenceSum / 12f;
logger.finer("Calculated hour period: "+ currentPeriodTimestamp +" sum: "+ sum +" confidence: "+ aggrConfidence+ " samples: " + samples);
PreparedStatement prepStmt = HalContext.getDB().getPreparedStatement("INSERT INTO sensor_data_aggr(sensor_id, sequence_id, timestamp_start, timestamp_end, data, confidence) VALUES(?, ?, ?, ?, ?, ?)");
prepStmt.setInt(1, result.getInt("sensor_id"));
prepStmt.setLong(2, Sensor.getHighestSequenceId(result.getInt("sensor_id")) + 1);
prepStmt.setLong(3, currentPeriodTimestamp);
prepStmt.setLong(4, currentPeriodTimestamp + HOUR_IN_MS - 1);
prepStmt.setInt(5, sum);
prepStmt.setFloat(6, aggrConfidence);
DBConnection.exec(prepStmt);
preparedInsertStmt.setInt(1, result.getInt("sensor_id"));
preparedInsertStmt.setLong(2, ++highestSequenceId);
preparedInsertStmt.setLong(3, currentPeriodTimestamp);
preparedInsertStmt.setLong(4, currentPeriodTimestamp + HOUR_IN_MS - 1);
preparedInsertStmt.setInt(5, sum);
preparedInsertStmt.setFloat(6, aggrConfidence);
preparedInsertStmt.addBatch();
// Reset variables
currentPeriodTimestamp = periodTimestamp;
@ -200,40 +229,56 @@ public class DataAggregatorDaemon extends TimerTask implements HalDaemon {
confidenceSum += result.getFloat("confidence");
samples++;
//TODO: SHould not be here!
PreparedStatement prepStmt = HalContext.getDB().getPreparedStatement("DELETE FROM sensor_data_aggr WHERE sensor_id == ? AND sequence_id == ?");
prepStmt.setInt(1, result.getInt("sensor_id"));
prepStmt.setInt(2, result.getInt("sequence_id"));
DBConnection.exec(prepStmt);
preparedDeleteStmt.setInt(1, result.getInt("sensor_id"));
preparedDeleteStmt.setInt(2, result.getInt("sequence_id"));
preparedDeleteStmt.addBatch();
}
DBConnection.execBatch(preparedInsertStmt);
DBConnection.execBatch(preparedDeleteStmt);
if(!error){
HalContext.getDB().getConnection().commit();
}else{
HalContext.getDB().getConnection().rollback();
}
HalContext.getDB().getConnection().commit();
HalContext.getDB().getConnection().setAutoCommit(true);
return null;
}
}
private class DayAggregator implements SQLResultHandler<Object>{
private long sensorId = -1;
public DayAggregator(long sensorId) {
this.sensorId = sensorId;
}
@Override
public Object handleQueryResult(Statement stmt, ResultSet result) throws SQLException {
long currentPeriodTimestamp = 0;
int sum = 0;
float confidenceSum = 0;
int samples = 0;
long highestSequenceId = Sensor.getHighestSequenceId(sensorId);
HalContext.getDB().getConnection().setAutoCommit(false);
boolean error = false;
PreparedStatement preparedInsertStmt = HalContext.getDB().getPreparedStatement("INSERT INTO sensor_data_aggr(sensor_id, sequence_id, timestamp_start, timestamp_end, data, confidence) VALUES(?, ?, ?, ?, ?, ?)");
PreparedStatement preparedDeleteStmt = HalContext.getDB().getPreparedStatement("DELETE FROM sensor_data_aggr WHERE sensor_id == ? AND sequence_id == ?");
while(result.next()){
if(sensorId != result.getInt("sensor_id")){
logger.severe("found entry for aggregation for the wrong sensorId (expecting: "+sensorId+", but was: "+result.getInt("sensor_id")+")");
error = true;
break;
}
long timestamp = result.getLong("timestamp_start");
long periodTimestamp = getTimestampHourPeriodStart(24, timestamp);
if(currentPeriodTimestamp != 0 && periodTimestamp != currentPeriodTimestamp){
float aggrConfidence = confidenceSum / 24f;
logger.finer("Calculated day period: "+ currentPeriodTimestamp +" sum: "+ sum +" confidence: "+ aggrConfidence+ " samples: " + samples);
PreparedStatement prepStmt = HalContext.getDB().getPreparedStatement("INSERT INTO sensor_data_aggr(sensor_id, sequence_id, timestamp_start, timestamp_end, data, confidence) VALUES(?, ?, ?, ?, ?, ?)");
prepStmt.setInt(1, result.getInt("sensor_id"));
prepStmt.setLong(2, Sensor.getHighestSequenceId(result.getInt("sensor_id")) + 1);
prepStmt.setLong(3, currentPeriodTimestamp);
prepStmt.setLong(4, currentPeriodTimestamp + DAY_IN_MS - 1);
prepStmt.setInt(5, sum);
prepStmt.setFloat(6, aggrConfidence);
DBConnection.exec(prepStmt);
preparedInsertStmt.setInt(1, result.getInt("sensor_id"));
preparedInsertStmt.setLong(2, ++highestSequenceId);
preparedInsertStmt.setLong(3, currentPeriodTimestamp);
preparedInsertStmt.setLong(4, currentPeriodTimestamp + DAY_IN_MS - 1);
preparedInsertStmt.setInt(5, sum);
preparedInsertStmt.setFloat(6, aggrConfidence);
preparedInsertStmt.addBatch();
// Reset variables
currentPeriodTimestamp = periodTimestamp;
@ -245,13 +290,17 @@ public class DataAggregatorDaemon extends TimerTask implements HalDaemon {
confidenceSum += result.getFloat("confidence");
samples++;
//TODO: SHould not be here!
PreparedStatement prepStmt = HalContext.getDB().getPreparedStatement("DELETE FROM sensor_data_aggr WHERE sensor_id == ? AND sequence_id == ?");
prepStmt.setInt(1, result.getInt("sensor_id"));
prepStmt.setInt(2, result.getInt("sequence_id"));
DBConnection.exec(prepStmt);
preparedDeleteStmt.setInt(1, result.getInt("sensor_id"));
preparedDeleteStmt.setInt(2, result.getInt("sequence_id"));
preparedDeleteStmt.addBatch();
}
DBConnection.execBatch(preparedInsertStmt);
DBConnection.execBatch(preparedDeleteStmt);
if(!error){
HalContext.getDB().getConnection().commit();
}else{
HalContext.getDB().getConnection().rollback();
}
HalContext.getDB().getConnection().commit();
HalContext.getDB().getConnection().setAutoCommit(true);
return null;
}