Some more refactoring
Former-commit-id: 84a2afdcddb4db3339a224f7a9fb2e22b7285e8f
This commit is contained in:
parent
2c1b4af586
commit
37686c2a79
4 changed files with 35 additions and 46 deletions
|
|
@ -7,6 +7,7 @@ import java.sql.Statement;
|
|||
import java.util.List;
|
||||
import java.util.Timer;
|
||||
import java.util.TimerTask;
|
||||
import java.util.logging.Level;
|
||||
import java.util.logging.Logger;
|
||||
|
||||
import se.koc.hal.HalContext;
|
||||
|
|
@ -31,8 +32,9 @@ public class DataAggregatorDaemon extends TimerTask implements HalDaemon {
|
|||
logger.fine("Aggregating sensor_id: " + sensor.getId());
|
||||
aggregateSensor(sensor.getId());
|
||||
}
|
||||
logger.fine("Aggregation Done");
|
||||
} catch (SQLException e) {
|
||||
e.printStackTrace();
|
||||
logger.log(Level.SEVERE, null, e);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -67,6 +69,7 @@ public class DataAggregatorDaemon extends TimerTask implements HalDaemon {
|
|||
maxDBTimestamp = 0l;
|
||||
long hourPeriodTimestamp = getTimestampPeriodStart(TimeConstants.HOUR_IN_MS, System.currentTimeMillis());
|
||||
logger.fine("Calculating hour periods... (from:"+ maxDBTimestamp +", to:"+ hourPeriodTimestamp +")");
|
||||
|
||||
stmt = db.getPreparedStatement("SELECT * FROM sensor_data_aggr"
|
||||
+" WHERE sensor_id == ? AND ? < timestamp_start AND timestamp_start < ? AND timestamp_end-timestamp_start == ?"
|
||||
+" ORDER BY timestamp_start ASC");
|
||||
|
|
@ -85,6 +88,7 @@ public class DataAggregatorDaemon extends TimerTask implements HalDaemon {
|
|||
maxDBTimestamp = 0l;
|
||||
long dayPeriodTimestamp = getTimestampPeriodStart(TimeConstants.DAY_IN_MS, System.currentTimeMillis());
|
||||
logger.fine("Calculating day periods... (from:"+ maxDBTimestamp +", to:"+ dayPeriodTimestamp +")");
|
||||
|
||||
stmt = db.getPreparedStatement("SELECT * FROM sensor_data_aggr"
|
||||
+" WHERE sensor_id == ? AND ? < timestamp_start AND timestamp_start < ? AND timestamp_end-timestamp_start == ?"
|
||||
+" ORDER BY timestamp_start ASC");
|
||||
|
|
@ -93,12 +97,9 @@ public class DataAggregatorDaemon extends TimerTask implements HalDaemon {
|
|||
stmt.setLong(3, dayPeriodTimestamp);
|
||||
stmt.setLong(4, TimeConstants.HOUR_IN_MS-1);
|
||||
DBConnection.exec(stmt, new DataAggregator(sensorId, TimeConstants.DAY_IN_MS, 24));
|
||||
|
||||
logger.fine("Done aggregation");
|
||||
|
||||
} catch (SQLException e) {
|
||||
e.printStackTrace();
|
||||
} catch (ValueOutsideOfRangeException e) {
|
||||
e.printStackTrace();
|
||||
logger.log(Level.SEVERE, null, e);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -122,13 +123,10 @@ public class DataAggregatorDaemon extends TimerTask implements HalDaemon {
|
|||
int samples = 0;
|
||||
long highestSequenceId = Sensor.getHighestSequenceId(sensorId);
|
||||
HalContext.getDB().getConnection().setAutoCommit(false);
|
||||
boolean rollback = false;
|
||||
PreparedStatement preparedInsertStmt = HalContext.getDB().getPreparedStatement("INSERT INTO sensor_data_aggr(sensor_id, sequence_id, timestamp_start, timestamp_end, data, confidence) VALUES(?, ?, ?, ?, ?, ?)");
|
||||
while(result.next()){
|
||||
if(sensorId != result.getInt("sensor_id")){
|
||||
logger.severe("found entry for aggregation for the wrong sensorId (expecting: "+sensorId+", but was: "+result.getInt("sensor_id")+")");
|
||||
rollback = true;
|
||||
break;
|
||||
throw new IllegalArgumentException("found entry for aggregation for the wrong sensorId (expecting: "+sensorId+", but was: "+result.getInt("sensor_id")+")");
|
||||
}
|
||||
long timestamp = result.getLong("timestamp_start");
|
||||
long periodTimestamp = getTimestampPeriodStart(this.aggrTimeInMs, timestamp);
|
||||
|
|
@ -152,18 +150,13 @@ public class DataAggregatorDaemon extends TimerTask implements HalDaemon {
|
|||
confidenceSum += result.getFloat("confidence");
|
||||
samples++;
|
||||
}
|
||||
if(!rollback){
|
||||
DBConnection.execBatch(preparedInsertStmt);
|
||||
HalContext.getDB().getConnection().commit();
|
||||
}else{
|
||||
HalContext.getDB().getConnection().rollback();
|
||||
}
|
||||
}catch(SQLException e){
|
||||
|
||||
DBConnection.execBatch(preparedInsertStmt);
|
||||
HalContext.getDB().getConnection().commit();
|
||||
|
||||
}catch(Exception e){
|
||||
HalContext.getDB().getConnection().rollback();
|
||||
throw e;
|
||||
} catch (ValueOutsideOfRangeException e) {
|
||||
HalContext.getDB().getConnection().rollback();
|
||||
e.printStackTrace();
|
||||
}finally{
|
||||
HalContext.getDB().getConnection().setAutoCommit(true);
|
||||
}
|
||||
|
|
@ -171,7 +164,7 @@ public class DataAggregatorDaemon extends TimerTask implements HalDaemon {
|
|||
}
|
||||
}
|
||||
|
||||
private static long getTimestampPeriodStart(long periodLengthInMs, long timestamp) throws ValueOutsideOfRangeException{
|
||||
private static long getTimestampPeriodStart(long periodLengthInMs, long timestamp){
|
||||
long tmp = timestamp % periodLengthInMs;
|
||||
return timestamp - tmp;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -32,6 +32,7 @@ public class DataDeletionDaemon extends TimerTask implements HalDaemon {
|
|||
logger.fine("Deleting old data for sensor id: " + sensor.getId());
|
||||
cleanupSensor(sensor.getId());
|
||||
}
|
||||
logger.fine("Data cleanup done");
|
||||
} catch (SQLException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
|
|
|
|||
|
|
@ -48,23 +48,27 @@ public class DataSynchronizationClient extends TimerTask implements HalDaemon{
|
|||
|
||||
List<Sensor> sensors = Sensor.getSensors(db, user);
|
||||
for(Sensor sensor : sensors){
|
||||
PeerDataReqDTO req = new PeerDataReqDTO();
|
||||
req.sensorId = sensor.getExternalId();
|
||||
req.offsetSequenceId = Sensor.getHighestSequenceId(sensor.getId());
|
||||
out.writeObject(req);
|
||||
|
||||
SensorDataListDTO dataList = (SensorDataListDTO) in.readObject();
|
||||
for(SensorDataDTO data : dataList){
|
||||
PreparedStatement stmt = db.getPreparedStatement("INSERT INTO sensor_data_aggr(sensor_id, sequence_id, timestamp_start, timestamp_end, data, confidence) VALUES(?, ?, ?, ?, ?, ?)");
|
||||
stmt.setLong(1, sensor.getId());
|
||||
stmt.setLong(2, data.sequenceId);
|
||||
stmt.setLong(3, data.timestampStart);
|
||||
stmt.setLong(4, data.timestampEnd);
|
||||
stmt.setInt(5, data.data);
|
||||
stmt.setFloat(6, data.confidence);
|
||||
DBConnection.exec(stmt);
|
||||
if(sensor.isSynced()) {
|
||||
PeerDataReqDTO req = new PeerDataReqDTO();
|
||||
req.sensorId = sensor.getExternalId();
|
||||
req.offsetSequenceId = Sensor.getHighestSequenceId(sensor.getId());
|
||||
out.writeObject(req);
|
||||
|
||||
SensorDataListDTO dataList = (SensorDataListDTO) in.readObject();
|
||||
for (SensorDataDTO data : dataList) {
|
||||
PreparedStatement stmt = db.getPreparedStatement("INSERT INTO sensor_data_aggr(sensor_id, sequence_id, timestamp_start, timestamp_end, data, confidence) VALUES(?, ?, ?, ?, ?, ?)");
|
||||
stmt.setLong(1, sensor.getId());
|
||||
stmt.setLong(2, data.sequenceId);
|
||||
stmt.setLong(3, data.timestampStart);
|
||||
stmt.setLong(4, data.timestampEnd);
|
||||
stmt.setInt(5, data.data);
|
||||
stmt.setFloat(6, data.confidence);
|
||||
DBConnection.exec(stmt);
|
||||
}
|
||||
logger.fine("Stored " + dataList.size() + " entries for sensor " + sensor.getId() + " from " + user.getUserName());
|
||||
}
|
||||
logger.fine("Stored " + dataList.size() + " entries for sensor " + sensor.getId() + " from " + user.getUserName());
|
||||
else
|
||||
logger.fine("Skipped sensor " + sensor.getId());
|
||||
}
|
||||
out.writeObject(null);
|
||||
out.close();
|
||||
|
|
|
|||
|
|
@ -1,9 +0,0 @@
|
|||
package se.koc.hal.deamon;
|
||||
|
||||
public class ValueOutsideOfRangeException extends Exception {
|
||||
|
||||
public ValueOutsideOfRangeException(String string) {
|
||||
super(string);
|
||||
}
|
||||
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue