Merge branch 'master' of http://repo.koc.se/hal
Former-commit-id: aff7d7a63c125693132d2d73e17f4e36cc8b51ca
This commit is contained in:
commit
95b865e556
1 changed files with 95 additions and 37 deletions
|
|
@ -63,7 +63,7 @@ public class DataAggregatorDaemon extends TimerTask implements HalDaemon {
|
||||||
stmt.setLong(1, sensorId);
|
stmt.setLong(1, sensorId);
|
||||||
stmt.setLong(2, maxDBTimestamp);
|
stmt.setLong(2, maxDBTimestamp);
|
||||||
stmt.setLong(3, minPeriodTimestamp);
|
stmt.setLong(3, minPeriodTimestamp);
|
||||||
DBConnection.exec(stmt, new FiveMinuteAggregator());
|
DBConnection.exec(stmt, new FiveMinuteAggregator(sensorId));
|
||||||
|
|
||||||
// hour aggregation
|
// hour aggregation
|
||||||
stmt = db.getPreparedStatement("SELECT MAX(timestamp_end) FROM sensor_data_aggr"
|
stmt = db.getPreparedStatement("SELECT MAX(timestamp_end) FROM sensor_data_aggr"
|
||||||
|
|
@ -82,7 +82,7 @@ public class DataAggregatorDaemon extends TimerTask implements HalDaemon {
|
||||||
stmt.setLong(2, maxDBTimestamp);
|
stmt.setLong(2, maxDBTimestamp);
|
||||||
stmt.setLong(3, hourPeriodTimestamp);
|
stmt.setLong(3, hourPeriodTimestamp);
|
||||||
stmt.setLong(4, FIVE_MINUTES_IN_MS-1);
|
stmt.setLong(4, FIVE_MINUTES_IN_MS-1);
|
||||||
DBConnection.exec(stmt, new HourAggregator());
|
DBConnection.exec(stmt, new HourAggregator(sensorId));
|
||||||
|
|
||||||
// day aggregation
|
// day aggregation
|
||||||
stmt = db.getPreparedStatement("SELECT MAX(timestamp_end) FROM sensor_data_aggr WHERE sensor_id == ? AND timestamp_end-timestamp_start == ?");
|
stmt = db.getPreparedStatement("SELECT MAX(timestamp_end) FROM sensor_data_aggr WHERE sensor_id == ? AND timestamp_end-timestamp_start == ?");
|
||||||
|
|
@ -100,7 +100,7 @@ public class DataAggregatorDaemon extends TimerTask implements HalDaemon {
|
||||||
stmt.setLong(2, maxDBTimestamp);
|
stmt.setLong(2, maxDBTimestamp);
|
||||||
stmt.setLong(3, dayPeriodTimestamp);
|
stmt.setLong(3, dayPeriodTimestamp);
|
||||||
stmt.setLong(4, HOUR_IN_MS-1);
|
stmt.setLong(4, HOUR_IN_MS-1);
|
||||||
DBConnection.exec(stmt, new DayAggregator());
|
DBConnection.exec(stmt, new DayAggregator(sensorId));
|
||||||
|
|
||||||
logger.fine("Done aggregation");
|
logger.fine("Done aggregation");
|
||||||
} catch (SQLException e) {
|
} catch (SQLException e) {
|
||||||
|
|
@ -131,26 +131,39 @@ public class DataAggregatorDaemon extends TimerTask implements HalDaemon {
|
||||||
|
|
||||||
|
|
||||||
private class FiveMinuteAggregator implements SQLResultHandler<Object>{
|
private class FiveMinuteAggregator implements SQLResultHandler<Object>{
|
||||||
|
private long sensorId = -1;
|
||||||
|
public FiveMinuteAggregator(long sensorId) {
|
||||||
|
this.sensorId = sensorId;
|
||||||
|
}
|
||||||
@Override
|
@Override
|
||||||
public Object handleQueryResult(Statement stmt, ResultSet result) throws SQLException {
|
public Object handleQueryResult(Statement stmt, ResultSet result) throws SQLException {
|
||||||
long currentPeriodTimestamp = 0;
|
long currentPeriodTimestamp = 0;
|
||||||
int sum = 0;
|
int sum = 0;
|
||||||
int count = 0;
|
int count = 0;
|
||||||
int samples = 0;
|
int samples = 0;
|
||||||
|
long highestSequenceId = Sensor.getHighestSequenceId(sensorId);
|
||||||
|
HalContext.getDB().getConnection().setAutoCommit(false);
|
||||||
|
boolean error = false;
|
||||||
|
PreparedStatement preparedInsertStmt = HalContext.getDB().getPreparedStatement("INSERT INTO sensor_data_aggr(sensor_id, sequence_id, timestamp_start, timestamp_end, data, confidence) VALUES(?, ?, ?, ?, ?, ?)");
|
||||||
while(result.next()){
|
while(result.next()){
|
||||||
|
if(sensorId != result.getInt("sensor_id")){
|
||||||
|
logger.severe("found entry for aggregation for the wrong sensorId (expecting: "+sensorId+", but was: "+result.getInt("sensor_id")+")");
|
||||||
|
error = true;
|
||||||
|
break;
|
||||||
|
}
|
||||||
long timestamp = result.getLong("timestamp");
|
long timestamp = result.getLong("timestamp");
|
||||||
long periodTimestamp = getTimestampMinutePeriodStart(5, timestamp);
|
long periodTimestamp = getTimestampMinutePeriodStart(5, timestamp);
|
||||||
if(currentPeriodTimestamp != 0 && periodTimestamp != currentPeriodTimestamp){
|
if(currentPeriodTimestamp != 0 && periodTimestamp != currentPeriodTimestamp){
|
||||||
float confidence = count / 5f;
|
float confidence = count / 5f;
|
||||||
logger.finer("Calculated minute period: "+ currentPeriodTimestamp +" sum: "+ sum +" confidence: "+ confidence+ " samples: " + samples);
|
logger.finer("Calculated minute period: "+ currentPeriodTimestamp +" sum: "+ sum +" confidence: "+ confidence+ " samples: " + samples);
|
||||||
PreparedStatement prepStmt = HalContext.getDB().getPreparedStatement("INSERT INTO sensor_data_aggr(sensor_id, sequence_id, timestamp_start, timestamp_end, data, confidence) VALUES(?, ?, ?, ?, ?, ?)");
|
preparedInsertStmt.setLong(1, sensorId);
|
||||||
prepStmt.setInt(1, result.getInt("sensor_id"));
|
preparedInsertStmt.setLong(2, ++highestSequenceId);
|
||||||
prepStmt.setLong(2, Sensor.getHighestSequenceId(result.getInt("sensor_id")) + 1);
|
preparedInsertStmt.setLong(3, currentPeriodTimestamp);
|
||||||
prepStmt.setLong(3, currentPeriodTimestamp);
|
preparedInsertStmt.setLong(4, currentPeriodTimestamp + FIVE_MINUTES_IN_MS - 1);
|
||||||
prepStmt.setLong(4, currentPeriodTimestamp + FIVE_MINUTES_IN_MS - 1);
|
preparedInsertStmt.setInt(5, sum);
|
||||||
prepStmt.setInt(5, sum);
|
preparedInsertStmt.setFloat(6, confidence);
|
||||||
prepStmt.setFloat(6, confidence);
|
preparedInsertStmt.addBatch();
|
||||||
DBConnection.exec(prepStmt);
|
//DBConnection.exec(prepStmt);
|
||||||
|
|
||||||
// Reset variables
|
// Reset variables
|
||||||
currentPeriodTimestamp = periodTimestamp;
|
currentPeriodTimestamp = periodTimestamp;
|
||||||
|
|
@ -161,31 +174,51 @@ public class DataAggregatorDaemon extends TimerTask implements HalDaemon {
|
||||||
++count;
|
++count;
|
||||||
++samples;
|
++samples;
|
||||||
}
|
}
|
||||||
|
DBConnection.execBatch(preparedInsertStmt);
|
||||||
|
if(!error){
|
||||||
|
HalContext.getDB().getConnection().commit();
|
||||||
|
}else{
|
||||||
|
HalContext.getDB().getConnection().rollback();
|
||||||
|
}
|
||||||
|
HalContext.getDB().getConnection().setAutoCommit(true);
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private class HourAggregator implements SQLResultHandler<Object>{
|
private class HourAggregator implements SQLResultHandler<Object>{
|
||||||
|
private long sensorId = -1;
|
||||||
|
public HourAggregator(long sensorId) {
|
||||||
|
this.sensorId = sensorId;
|
||||||
|
}
|
||||||
@Override
|
@Override
|
||||||
public Object handleQueryResult(Statement stmt, ResultSet result) throws SQLException {
|
public Object handleQueryResult(Statement stmt, ResultSet result) throws SQLException {
|
||||||
long currentPeriodTimestamp = 0;
|
long currentPeriodTimestamp = 0;
|
||||||
int sum = 0;
|
int sum = 0;
|
||||||
float confidenceSum = 0;
|
float confidenceSum = 0;
|
||||||
int samples = 0;
|
int samples = 0;
|
||||||
|
long highestSequenceId = Sensor.getHighestSequenceId(sensorId);
|
||||||
|
HalContext.getDB().getConnection().setAutoCommit(false);
|
||||||
|
boolean error = false;
|
||||||
|
PreparedStatement preparedInsertStmt = HalContext.getDB().getPreparedStatement("INSERT INTO sensor_data_aggr(sensor_id, sequence_id, timestamp_start, timestamp_end, data, confidence) VALUES(?, ?, ?, ?, ?, ?)");
|
||||||
|
PreparedStatement preparedDeleteStmt = HalContext.getDB().getPreparedStatement("DELETE FROM sensor_data_aggr WHERE sensor_id == ? AND sequence_id == ?");
|
||||||
while(result.next()){
|
while(result.next()){
|
||||||
|
if(sensorId != result.getInt("sensor_id")){
|
||||||
|
logger.severe("found entry for aggregation for the wrong sensorId (expecting: "+sensorId+", but was: "+result.getInt("sensor_id")+")");
|
||||||
|
error = true;
|
||||||
|
break;
|
||||||
|
}
|
||||||
long timestamp = result.getLong("timestamp_start");
|
long timestamp = result.getLong("timestamp_start");
|
||||||
long periodTimestamp = getTimestampMinutePeriodStart(60, timestamp);
|
long periodTimestamp = getTimestampMinutePeriodStart(60, timestamp);
|
||||||
if(currentPeriodTimestamp != 0 && periodTimestamp != currentPeriodTimestamp){
|
if(currentPeriodTimestamp != 0 && periodTimestamp != currentPeriodTimestamp){
|
||||||
float aggrConfidence = confidenceSum / 12f;
|
float aggrConfidence = confidenceSum / 12f;
|
||||||
logger.finer("Calculated hour period: "+ currentPeriodTimestamp +" sum: "+ sum +" confidence: "+ aggrConfidence+ " samples: " + samples);
|
logger.finer("Calculated hour period: "+ currentPeriodTimestamp +" sum: "+ sum +" confidence: "+ aggrConfidence+ " samples: " + samples);
|
||||||
PreparedStatement prepStmt = HalContext.getDB().getPreparedStatement("INSERT INTO sensor_data_aggr(sensor_id, sequence_id, timestamp_start, timestamp_end, data, confidence) VALUES(?, ?, ?, ?, ?, ?)");
|
preparedInsertStmt.setInt(1, result.getInt("sensor_id"));
|
||||||
prepStmt.setInt(1, result.getInt("sensor_id"));
|
preparedInsertStmt.setLong(2, ++highestSequenceId);
|
||||||
prepStmt.setLong(2, Sensor.getHighestSequenceId(result.getInt("sensor_id")) + 1);
|
preparedInsertStmt.setLong(3, currentPeriodTimestamp);
|
||||||
prepStmt.setLong(3, currentPeriodTimestamp);
|
preparedInsertStmt.setLong(4, currentPeriodTimestamp + HOUR_IN_MS - 1);
|
||||||
prepStmt.setLong(4, currentPeriodTimestamp + HOUR_IN_MS - 1);
|
preparedInsertStmt.setInt(5, sum);
|
||||||
prepStmt.setInt(5, sum);
|
preparedInsertStmt.setFloat(6, aggrConfidence);
|
||||||
prepStmt.setFloat(6, aggrConfidence);
|
preparedInsertStmt.addBatch();
|
||||||
DBConnection.exec(prepStmt);
|
|
||||||
|
|
||||||
// Reset variables
|
// Reset variables
|
||||||
currentPeriodTimestamp = periodTimestamp;
|
currentPeriodTimestamp = periodTimestamp;
|
||||||
|
|
@ -196,37 +229,56 @@ public class DataAggregatorDaemon extends TimerTask implements HalDaemon {
|
||||||
confidenceSum += result.getFloat("confidence");
|
confidenceSum += result.getFloat("confidence");
|
||||||
samples++;
|
samples++;
|
||||||
|
|
||||||
//TODO: SHould not be here!
|
preparedDeleteStmt.setInt(1, result.getInt("sensor_id"));
|
||||||
PreparedStatement prepStmt = HalContext.getDB().getPreparedStatement("DELETE FROM sensor_data_aggr WHERE sensor_id == ? AND sequence_id == ?");
|
preparedDeleteStmt.setInt(2, result.getInt("sequence_id"));
|
||||||
prepStmt.setInt(1, result.getInt("sensor_id"));
|
preparedDeleteStmt.addBatch();
|
||||||
prepStmt.setInt(2, result.getInt("sequence_id"));
|
|
||||||
DBConnection.exec(prepStmt);
|
|
||||||
}
|
}
|
||||||
|
DBConnection.execBatch(preparedInsertStmt);
|
||||||
|
DBConnection.execBatch(preparedDeleteStmt);
|
||||||
|
if(!error){
|
||||||
|
HalContext.getDB().getConnection().commit();
|
||||||
|
}else{
|
||||||
|
HalContext.getDB().getConnection().rollback();
|
||||||
|
}
|
||||||
|
HalContext.getDB().getConnection().setAutoCommit(true);
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private class DayAggregator implements SQLResultHandler<Object>{
|
private class DayAggregator implements SQLResultHandler<Object>{
|
||||||
|
private long sensorId = -1;
|
||||||
|
public DayAggregator(long sensorId) {
|
||||||
|
this.sensorId = sensorId;
|
||||||
|
}
|
||||||
@Override
|
@Override
|
||||||
public Object handleQueryResult(Statement stmt, ResultSet result) throws SQLException {
|
public Object handleQueryResult(Statement stmt, ResultSet result) throws SQLException {
|
||||||
long currentPeriodTimestamp = 0;
|
long currentPeriodTimestamp = 0;
|
||||||
int sum = 0;
|
int sum = 0;
|
||||||
float confidenceSum = 0;
|
float confidenceSum = 0;
|
||||||
int samples = 0;
|
int samples = 0;
|
||||||
|
long highestSequenceId = Sensor.getHighestSequenceId(sensorId);
|
||||||
|
HalContext.getDB().getConnection().setAutoCommit(false);
|
||||||
|
boolean error = false;
|
||||||
|
PreparedStatement preparedInsertStmt = HalContext.getDB().getPreparedStatement("INSERT INTO sensor_data_aggr(sensor_id, sequence_id, timestamp_start, timestamp_end, data, confidence) VALUES(?, ?, ?, ?, ?, ?)");
|
||||||
|
PreparedStatement preparedDeleteStmt = HalContext.getDB().getPreparedStatement("DELETE FROM sensor_data_aggr WHERE sensor_id == ? AND sequence_id == ?");
|
||||||
while(result.next()){
|
while(result.next()){
|
||||||
|
if(sensorId != result.getInt("sensor_id")){
|
||||||
|
logger.severe("found entry for aggregation for the wrong sensorId (expecting: "+sensorId+", but was: "+result.getInt("sensor_id")+")");
|
||||||
|
error = true;
|
||||||
|
break;
|
||||||
|
}
|
||||||
long timestamp = result.getLong("timestamp_start");
|
long timestamp = result.getLong("timestamp_start");
|
||||||
long periodTimestamp = getTimestampHourPeriodStart(24, timestamp);
|
long periodTimestamp = getTimestampHourPeriodStart(24, timestamp);
|
||||||
if(currentPeriodTimestamp != 0 && periodTimestamp != currentPeriodTimestamp){
|
if(currentPeriodTimestamp != 0 && periodTimestamp != currentPeriodTimestamp){
|
||||||
float aggrConfidence = confidenceSum / 24f;
|
float aggrConfidence = confidenceSum / 24f;
|
||||||
logger.finer("Calculated day period: "+ currentPeriodTimestamp +" sum: "+ sum +" confidence: "+ aggrConfidence+ " samples: " + samples);
|
logger.finer("Calculated day period: "+ currentPeriodTimestamp +" sum: "+ sum +" confidence: "+ aggrConfidence+ " samples: " + samples);
|
||||||
PreparedStatement prepStmt = HalContext.getDB().getPreparedStatement("INSERT INTO sensor_data_aggr(sensor_id, sequence_id, timestamp_start, timestamp_end, data, confidence) VALUES(?, ?, ?, ?, ?, ?)");
|
preparedInsertStmt.setInt(1, result.getInt("sensor_id"));
|
||||||
prepStmt.setInt(1, result.getInt("sensor_id"));
|
preparedInsertStmt.setLong(2, ++highestSequenceId);
|
||||||
prepStmt.setLong(2, Sensor.getHighestSequenceId(result.getInt("sensor_id")) + 1);
|
preparedInsertStmt.setLong(3, currentPeriodTimestamp);
|
||||||
prepStmt.setLong(3, currentPeriodTimestamp);
|
preparedInsertStmt.setLong(4, currentPeriodTimestamp + DAY_IN_MS - 1);
|
||||||
prepStmt.setLong(4, currentPeriodTimestamp + DAY_IN_MS - 1);
|
preparedInsertStmt.setInt(5, sum);
|
||||||
prepStmt.setInt(5, sum);
|
preparedInsertStmt.setFloat(6, aggrConfidence);
|
||||||
prepStmt.setFloat(6, aggrConfidence);
|
preparedInsertStmt.addBatch();
|
||||||
DBConnection.exec(prepStmt);
|
|
||||||
|
|
||||||
// Reset variables
|
// Reset variables
|
||||||
currentPeriodTimestamp = periodTimestamp;
|
currentPeriodTimestamp = periodTimestamp;
|
||||||
|
|
@ -238,12 +290,18 @@ public class DataAggregatorDaemon extends TimerTask implements HalDaemon {
|
||||||
confidenceSum += result.getFloat("confidence");
|
confidenceSum += result.getFloat("confidence");
|
||||||
samples++;
|
samples++;
|
||||||
|
|
||||||
//TODO: SHould not be here!
|
preparedDeleteStmt.setInt(1, result.getInt("sensor_id"));
|
||||||
PreparedStatement prepStmt = HalContext.getDB().getPreparedStatement("DELETE FROM sensor_data_aggr WHERE sensor_id == ? AND sequence_id == ?");
|
preparedDeleteStmt.setInt(2, result.getInt("sequence_id"));
|
||||||
prepStmt.setInt(1, result.getInt("sensor_id"));
|
preparedDeleteStmt.addBatch();
|
||||||
prepStmt.setInt(2, result.getInt("sequence_id"));
|
|
||||||
DBConnection.exec(prepStmt);
|
|
||||||
}
|
}
|
||||||
|
DBConnection.execBatch(preparedInsertStmt);
|
||||||
|
DBConnection.execBatch(preparedDeleteStmt);
|
||||||
|
if(!error){
|
||||||
|
HalContext.getDB().getConnection().commit();
|
||||||
|
}else{
|
||||||
|
HalContext.getDB().getConnection().rollback();
|
||||||
|
}
|
||||||
|
HalContext.getDB().getConnection().setAutoCommit(true);
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue