Some refactoring in DataDeletionDaemon
Former-commit-id: ffcd854d70d4848fd026a154f7df3cc332710378
This commit is contained in:
parent
046aea1a2c
commit
2c1b4af586
1 changed files with 15 additions and 20 deletions
|
|
@ -7,6 +7,7 @@ import java.sql.Statement;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Timer;
|
import java.util.Timer;
|
||||||
import java.util.TimerTask;
|
import java.util.TimerTask;
|
||||||
|
import java.util.logging.Level;
|
||||||
import java.util.logging.Logger;
|
import java.util.logging.Logger;
|
||||||
|
|
||||||
import se.koc.hal.HalContext;
|
import se.koc.hal.HalContext;
|
||||||
|
|
@ -29,18 +30,17 @@ public class DataDeletionDaemon extends TimerTask implements HalDaemon {
|
||||||
List<Sensor> sensorList = Sensor.getSensors(HalContext.getDB());
|
List<Sensor> sensorList = Sensor.getSensors(HalContext.getDB());
|
||||||
for(Sensor sensor : sensorList){
|
for(Sensor sensor : sensorList){
|
||||||
logger.fine("Deleting old data for sensor id: " + sensor.getId());
|
logger.fine("Deleting old data for sensor id: " + sensor.getId());
|
||||||
aggregateSensor(sensor.getId());
|
cleanupSensor(sensor.getId());
|
||||||
}
|
}
|
||||||
} catch (SQLException e) {
|
} catch (SQLException e) {
|
||||||
e.printStackTrace();
|
e.printStackTrace();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void aggregateSensor(long sensorId) {
|
public void cleanupSensor(long sensorId) {
|
||||||
DBConnection db = HalContext.getDB();
|
DBConnection db = HalContext.getDB();
|
||||||
PreparedStatement stmt = null;
|
PreparedStatement stmt = null;
|
||||||
try {
|
try {
|
||||||
|
|
||||||
Long maxDBTimestamp = null;
|
Long maxDBTimestamp = null;
|
||||||
|
|
||||||
// delete too old 5 minute periods that already have been aggregated into hours
|
// delete too old 5 minute periods that already have been aggregated into hours
|
||||||
|
|
@ -51,6 +51,7 @@ public class DataDeletionDaemon extends TimerTask implements HalDaemon {
|
||||||
maxDBTimestamp = DBConnection.exec(stmt, new SimpleSQLResult<Long>());
|
maxDBTimestamp = DBConnection.exec(stmt, new SimpleSQLResult<Long>());
|
||||||
if(maxDBTimestamp == null)
|
if(maxDBTimestamp == null)
|
||||||
maxDBTimestamp = 0l;
|
maxDBTimestamp = 0l;
|
||||||
|
|
||||||
stmt = db.getPreparedStatement("SELECT * FROM sensor_data_aggr"
|
stmt = db.getPreparedStatement("SELECT * FROM sensor_data_aggr"
|
||||||
+" WHERE sensor_id == ? "
|
+" WHERE sensor_id == ? "
|
||||||
+ "AND timestamp_end < ? "
|
+ "AND timestamp_end < ? "
|
||||||
|
|
@ -60,7 +61,7 @@ public class DataDeletionDaemon extends TimerTask implements HalDaemon {
|
||||||
stmt.setLong(2, maxDBTimestamp);
|
stmt.setLong(2, maxDBTimestamp);
|
||||||
stmt.setLong(3, TimeConstants.FIVE_MINUTES_IN_MS-1);
|
stmt.setLong(3, TimeConstants.FIVE_MINUTES_IN_MS-1);
|
||||||
stmt.setLong(4, System.currentTimeMillis()-TimeConstants.DAY_IN_MS);
|
stmt.setLong(4, System.currentTimeMillis()-TimeConstants.DAY_IN_MS);
|
||||||
DBConnection.exec(stmt, new AggrDataDeletor(sensorId));
|
DBConnection.exec(stmt, new AggregateDataDeleter(sensorId));
|
||||||
|
|
||||||
// delete too old 1 hour periods that already have been aggregated into days
|
// delete too old 1 hour periods that already have been aggregated into days
|
||||||
stmt = db.getPreparedStatement("SELECT MAX(timestamp_end) FROM sensor_data_aggr"
|
stmt = db.getPreparedStatement("SELECT MAX(timestamp_end) FROM sensor_data_aggr"
|
||||||
|
|
@ -70,6 +71,7 @@ public class DataDeletionDaemon extends TimerTask implements HalDaemon {
|
||||||
maxDBTimestamp = DBConnection.exec(stmt, new SimpleSQLResult<Long>());
|
maxDBTimestamp = DBConnection.exec(stmt, new SimpleSQLResult<Long>());
|
||||||
if(maxDBTimestamp == null)
|
if(maxDBTimestamp == null)
|
||||||
maxDBTimestamp = 0l;
|
maxDBTimestamp = 0l;
|
||||||
|
|
||||||
stmt = db.getPreparedStatement("SELECT * FROM sensor_data_aggr"
|
stmt = db.getPreparedStatement("SELECT * FROM sensor_data_aggr"
|
||||||
+" WHERE sensor_id == ? "
|
+" WHERE sensor_id == ? "
|
||||||
+ "AND timestamp_end < ? "
|
+ "AND timestamp_end < ? "
|
||||||
|
|
@ -79,16 +81,16 @@ public class DataDeletionDaemon extends TimerTask implements HalDaemon {
|
||||||
stmt.setLong(2, maxDBTimestamp);
|
stmt.setLong(2, maxDBTimestamp);
|
||||||
stmt.setLong(3, TimeConstants.HOUR_IN_MS-1);
|
stmt.setLong(3, TimeConstants.HOUR_IN_MS-1);
|
||||||
stmt.setLong(4, System.currentTimeMillis()-TimeConstants.SEVEN_DAYS_IN_MS);
|
stmt.setLong(4, System.currentTimeMillis()-TimeConstants.SEVEN_DAYS_IN_MS);
|
||||||
DBConnection.exec(stmt, new AggrDataDeletor(sensorId));
|
DBConnection.exec(stmt, new AggregateDataDeleter(sensorId));
|
||||||
} catch (SQLException e) {
|
} catch (SQLException e) {
|
||||||
e.printStackTrace();
|
logger.log(Level.SEVERE, null, e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private class AggrDataDeletor implements SQLResultHandler<Object>{
|
private class AggregateDataDeleter implements SQLResultHandler<Object>{
|
||||||
private long sensorId = -1;
|
private long sensorId = -1;
|
||||||
|
|
||||||
public AggrDataDeletor(long sensorId){
|
public AggregateDataDeleter(long sensorId){
|
||||||
this.sensorId = sensorId;
|
this.sensorId = sensorId;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -96,28 +98,21 @@ public class DataDeletionDaemon extends TimerTask implements HalDaemon {
|
||||||
public Object handleQueryResult(Statement stmt, ResultSet result) throws SQLException {
|
public Object handleQueryResult(Statement stmt, ResultSet result) throws SQLException {
|
||||||
try{
|
try{
|
||||||
HalContext.getDB().getConnection().setAutoCommit(false);
|
HalContext.getDB().getConnection().setAutoCommit(false);
|
||||||
boolean rollback = false;
|
|
||||||
PreparedStatement preparedDeleteStmt = HalContext.getDB().getPreparedStatement("DELETE FROM sensor_data_aggr WHERE sensor_id == ? AND sequence_id == ?");
|
PreparedStatement preparedDeleteStmt = HalContext.getDB().getPreparedStatement("DELETE FROM sensor_data_aggr WHERE sensor_id == ? AND sequence_id == ?");
|
||||||
while(result.next()){
|
while(result.next()){
|
||||||
if(sensorId != result.getInt("sensor_id")){
|
if(sensorId != result.getInt("sensor_id")){
|
||||||
logger.severe("found entry for aggregation for the wrong sensorId (expecting: "+sensorId+", but was: "+result.getInt("sensor_id")+")");
|
throw new IllegalArgumentException("Found entry for aggregation for the wrong sensorId (expecting: "+sensorId+", but was: "+result.getInt("sensor_id")+")");
|
||||||
rollback = true;
|
|
||||||
break;
|
|
||||||
}
|
}
|
||||||
logger.finer("Deleted period: "+ result.getLong("timestamp_start"));
|
logger.finer("Deleting sensor aggregate entry timestamp: "+ result.getLong("timestamp_start") +" - "+ result.getLong("timestamp_end"));
|
||||||
preparedDeleteStmt.setInt(1, result.getInt("sensor_id"));
|
preparedDeleteStmt.setInt(1, result.getInt("sensor_id"));
|
||||||
preparedDeleteStmt.setLong(2, result.getLong("sequence_id"));
|
preparedDeleteStmt.setLong(2, result.getLong("sequence_id"));
|
||||||
preparedDeleteStmt.addBatch();
|
preparedDeleteStmt.addBatch();
|
||||||
}
|
}
|
||||||
if(!rollback){
|
|
||||||
DBConnection.execBatch(preparedDeleteStmt);
|
DBConnection.execBatch(preparedDeleteStmt);
|
||||||
HalContext.getDB().getConnection().commit();
|
HalContext.getDB().getConnection().commit();
|
||||||
}else{
|
|
||||||
HalContext.getDB().getConnection().rollback();
|
|
||||||
}
|
|
||||||
}catch(Exception e){
|
}catch(Exception e){
|
||||||
HalContext.getDB().getConnection().rollback();
|
HalContext.getDB().getConnection().rollback();
|
||||||
e.printStackTrace();
|
|
||||||
}finally{
|
}finally{
|
||||||
HalContext.getDB().getConnection().setAutoCommit(true);
|
HalContext.getDB().getConnection().setAutoCommit(true);
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue