Refactor the daemon code for data aggregation, deletion, and time constants.

- Common DataAggregator inner class in DataAggregatorDaemon for all period lengths (see the sketch below)
- Removed deletion of data from DataAggregatorDaemon and DataSynchronizationClient and put it in a new DataDeletionDaemon
- Moved time constants to a common TimeConstants class
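In short, the three near-identical aggregator inner classes (FiveMinuteAggregator, HourAggregator, DayAggregator) collapse into one DataAggregator that is given the period length and the expected number of samples per period, and all period lengths come from the shared TimeConstants class. Below is a minimal, database-free sketch of that shape; the class and constant names are taken from the diff further down, while the standalone layout, the AggregationSketch wrapper, and the confidence helper are simplified for illustration only.

```java
// Shared period lengths, mirroring the new se.koc.hal.deamon.TimeConstants class.
class TimeConstants {
    public static final long FIVE_MINUTES_IN_MS = 5 * 60 * 1000;
    public static final long HOUR_IN_MS = FIVE_MINUTES_IN_MS * 12;
    public static final long DAY_IN_MS = HOUR_IN_MS * 24;
    public static final long SEVEN_DAYS_IN_MS = DAY_IN_MS * 7;
}

// Simplified stand-in for the new DataAggregator inner class: the real one reads rows from
// the database and batch-inserts the aggregated periods into sensor_data_aggr.
class DataAggregator {
    private final long sensorId;
    private final long aggrTimeInMs;        // length of the period being produced
    private final int expectedSampleCount;  // samples per period, used for the confidence value

    DataAggregator(long sensorId, long aggrTimeInMs, int expectedSampleCount) {
        this.sensorId = sensorId;
        this.aggrTimeInMs = aggrTimeInMs;
        this.expectedSampleCount = expectedSampleCount;
    }

    // Confidence of one aggregated period: how many of the expected samples were actually seen.
    float confidence(float confidenceSum) {
        return confidenceSum / (float) expectedSampleCount;
    }
}

class AggregationSketch {
    public static void main(String[] args) {
        long sensorId = 1; // hypothetical sensor id
        // One class now covers all three period lengths:
        new DataAggregator(sensorId, TimeConstants.FIVE_MINUTES_IN_MS, 5);  // 5 min periods from raw samples
        new DataAggregator(sensorId, TimeConstants.HOUR_IN_MS, 12);         // hour periods from 12 five-minute periods
        new DataAggregator(sensorId, TimeConstants.DAY_IN_MS, 24);          // day periods from 24 hour periods
    }
}
```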

Former-commit-id: 523ce7ad73b6edb4a507adcfa33b04742b170f25
dcollin 2015-12-11 17:31:13 +01:00 committed by Daniel Collin
parent 04f1af3268
commit b421206ce3
10 changed files with 344 additions and 304 deletions

View file

@@ -2,6 +2,7 @@ package se.koc.hal;
 import se.koc.hal.deamon.DataAggregatorDaemon;
+import se.koc.hal.deamon.DataDeletionDaemon;
 import se.koc.hal.deamon.DataSynchronizationClient;
 import se.koc.hal.deamon.DataSynchronizationDaemon;
 import se.koc.hal.deamon.HalDaemon;
@@ -43,7 +44,8 @@ public class PowerChallenge {
         daemons = new HalDaemon[]{
                 new DataAggregatorDaemon(),
                 new DataSynchronizationDaemon(),
-                new DataSynchronizationClient()
+                new DataSynchronizationClient(),
+                new DataDeletionDaemon()
         };
         Timer daemonTimer = new Timer();
         for(HalDaemon daemon : daemons){

View file

@@ -4,7 +4,6 @@ import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
-import java.util.Calendar;
 import java.util.List;
 import java.util.Timer;
 import java.util.TimerTask;
@@ -19,16 +18,9 @@ import zutil.log.LogUtil;
 public class DataAggregatorDaemon extends TimerTask implements HalDaemon {
     private static final Logger logger = LogUtil.getLogger();
-    public static final long FIVE_MINUTES_IN_MS = 5 * 60 * 1000;
-    public static final long HOUR_IN_MS = FIVE_MINUTES_IN_MS * 12;
-    public static final long DAY_IN_MS = HOUR_IN_MS * 24;
-    private static final long HOUR_AGGREGATION_OFFSET = DAY_IN_MS;
-    private static final long DAY_AGGREGATION_OFFSET = DAY_IN_MS * 3;
     public void initiate(Timer timer){
-        timer.schedule(this, 0, FIVE_MINUTES_IN_MS);
+        timer.schedule(this, 0, TimeConstants.FIVE_MINUTES_IN_MS);
     }
     @Override
@@ -55,25 +47,25 @@ public class DataAggregatorDaemon extends TimerTask implements HalDaemon {
                     maxDBTimestamp = 0l;
                 // 5 minute aggregation
-                long minPeriodTimestamp = getTimestampMinutePeriodStart(5, System.currentTimeMillis());
+                long minPeriodTimestamp = getTimestampPeriodStart(TimeConstants.FIVE_MINUTES_IN_MS, System.currentTimeMillis());
                 logger.fine("Calculating 5 min periods... (from:"+ maxDBTimestamp +", to:"+ minPeriodTimestamp +")");
-                stmt = db.getPreparedStatement("SELECT * FROM sensor_data_raw"
+                stmt = db.getPreparedStatement("SELECT *, 1 AS confidence, timestamp AS timestamp_start FROM sensor_data_raw"
                         +" WHERE sensor_id == ? AND ? < timestamp AND timestamp < ? "
                         +" ORDER BY timestamp ASC");
                 stmt.setLong(1, sensorId);
                 stmt.setLong(2, maxDBTimestamp);
                 stmt.setLong(3, minPeriodTimestamp);
-                DBConnection.exec(stmt, new FiveMinuteAggregator(sensorId));
+                DBConnection.exec(stmt, new DataAggregator(sensorId, TimeConstants.FIVE_MINUTES_IN_MS, 5));
                 // hour aggregation
                 stmt = db.getPreparedStatement("SELECT MAX(timestamp_end) FROM sensor_data_aggr"
                         +" WHERE sensor_id == ? AND timestamp_end-timestamp_start == ?");
                 stmt.setLong(1, sensorId);
-                stmt.setLong(2, HOUR_IN_MS-1);
+                stmt.setLong(2, TimeConstants.HOUR_IN_MS-1);
                 maxDBTimestamp = DBConnection.exec(stmt, new SimpleSQLResult<Long>());
                 if(maxDBTimestamp == null)
                     maxDBTimestamp = 0l;
-                long hourPeriodTimestamp = getTimestampMinutePeriodStart(60, System.currentTimeMillis()-HOUR_AGGREGATION_OFFSET);
+                long hourPeriodTimestamp = getTimestampPeriodStart(TimeConstants.HOUR_IN_MS, System.currentTimeMillis());
                 logger.fine("Calculating hour periods... (from:"+ maxDBTimestamp +", to:"+ hourPeriodTimestamp +")");
                 stmt = db.getPreparedStatement("SELECT * FROM sensor_data_aggr"
                         +" WHERE sensor_id == ? AND ? < timestamp_start AND timestamp_start < ? AND timestamp_end-timestamp_start == ?"
@@ -81,17 +73,17 @@ public class DataAggregatorDaemon extends TimerTask implements HalDaemon {
                 stmt.setLong(1, sensorId);
                 stmt.setLong(2, maxDBTimestamp);
                 stmt.setLong(3, hourPeriodTimestamp);
-                stmt.setLong(4, FIVE_MINUTES_IN_MS-1);
-                DBConnection.exec(stmt, new HourAggregator(sensorId));
+                stmt.setLong(4, TimeConstants.FIVE_MINUTES_IN_MS-1);
+                DBConnection.exec(stmt, new DataAggregator(sensorId, TimeConstants.HOUR_IN_MS, 12));
                 // day aggregation
                 stmt = db.getPreparedStatement("SELECT MAX(timestamp_end) FROM sensor_data_aggr WHERE sensor_id == ? AND timestamp_end-timestamp_start == ?");
                 stmt.setLong(1, sensorId);
-                stmt.setLong(2, DAY_IN_MS-1);
+                stmt.setLong(2, TimeConstants.DAY_IN_MS-1);
                 maxDBTimestamp = DBConnection.exec(stmt, new SimpleSQLResult<Long>());
                 if(maxDBTimestamp == null)
                     maxDBTimestamp = 0l;
-                long dayPeriodTimestamp = getTimestampHourPeriodStart(24, System.currentTimeMillis()-DAY_AGGREGATION_OFFSET);
+                long dayPeriodTimestamp = getTimestampPeriodStart(TimeConstants.DAY_IN_MS, System.currentTimeMillis());
                 logger.fine("Calculating day periods... (from:"+ maxDBTimestamp +", to:"+ dayPeriodTimestamp +")");
                 stmt = db.getPreparedStatement("SELECT * FROM sensor_data_aggr"
                         +" WHERE sensor_id == ? AND ? < timestamp_start AND timestamp_start < ? AND timestamp_end-timestamp_start == ?"
@@ -99,123 +91,54 @@ public class DataAggregatorDaemon extends TimerTask implements HalDaemon {
                 stmt.setLong(1, sensorId);
                 stmt.setLong(2, maxDBTimestamp);
                 stmt.setLong(3, dayPeriodTimestamp);
-                stmt.setLong(4, HOUR_IN_MS-1);
-                DBConnection.exec(stmt, new DayAggregator(sensorId));
+                stmt.setLong(4, TimeConstants.HOUR_IN_MS-1);
+                DBConnection.exec(stmt, new DataAggregator(sensorId, TimeConstants.DAY_IN_MS, 24));
                 logger.fine("Done aggregation");
             } catch (SQLException e) {
                 e.printStackTrace();
+            } catch (ValueOutsideOfRangeException e) {
+                e.printStackTrace();
             }
         }
-    private static long getTimestampHourPeriodStart(int hour, long timestamp){
-        Calendar cal = Calendar.getInstance();
-        cal.setTimeInMillis(timestamp);
-        int currentMinute = cal.get(Calendar.HOUR_OF_DAY);
-        cal.set(Calendar.HOUR_OF_DAY, (currentMinute/hour) * hour);
-        cal.set(Calendar.MINUTE, 0);
-        cal.set(Calendar.SECOND, 0);
-        cal.set(Calendar.MILLISECOND, 0);
-        return cal.getTimeInMillis();
-    }
-    private static long getTimestampMinutePeriodStart(int min, long timestamp){
-        Calendar cal = Calendar.getInstance();
-        cal.setTimeInMillis(timestamp);
-        int currentMinute = cal.get(Calendar.MINUTE);
-        cal.set(Calendar.MINUTE, (currentMinute/min) * min);
-        cal.set(Calendar.SECOND, 0);
-        cal.set(Calendar.MILLISECOND, 0);
-        return cal.getTimeInMillis();
-    }
-    private class FiveMinuteAggregator implements SQLResultHandler<Object>{
+    private class DataAggregator implements SQLResultHandler<Object>{
         private long sensorId = -1;
-        public FiveMinuteAggregator(long sensorId) {
-            this.sensorId = sensorId;
-        }
-        @Override
-        public Object handleQueryResult(Statement stmt, ResultSet result) throws SQLException {
-            long currentPeriodTimestamp = 0;
-            int sum = 0;
-            int count = 0;
-            int samples = 0;
-            long highestSequenceId = Sensor.getHighestSequenceId(sensorId);
-            HalContext.getDB().getConnection().setAutoCommit(false);
-            boolean error = false;
-            PreparedStatement preparedInsertStmt = HalContext.getDB().getPreparedStatement("INSERT INTO sensor_data_aggr(sensor_id, sequence_id, timestamp_start, timestamp_end, data, confidence) VALUES(?, ?, ?, ?, ?, ?)");
-            while(result.next()){
-                if(sensorId != result.getInt("sensor_id")){
-                    logger.severe("found entry for aggregation for the wrong sensorId (expecting: "+sensorId+", but was: "+result.getInt("sensor_id")+")");
-                    error = true;
-                    break;
-                }
-                long timestamp = result.getLong("timestamp");
-                long periodTimestamp = getTimestampMinutePeriodStart(5, timestamp);
-                if(currentPeriodTimestamp != 0 && periodTimestamp != currentPeriodTimestamp){
-                    float confidence = count / 5f;
-                    logger.finer("Calculated minute period: "+ currentPeriodTimestamp +" sum: "+ sum +" confidence: "+ confidence+ " samples: " + samples);
-                    preparedInsertStmt.setLong(1, sensorId);
-                    preparedInsertStmt.setLong(2, ++highestSequenceId);
-                    preparedInsertStmt.setLong(3, currentPeriodTimestamp);
-                    preparedInsertStmt.setLong(4, currentPeriodTimestamp + FIVE_MINUTES_IN_MS - 1);
-                    preparedInsertStmt.setInt(5, sum);
-                    preparedInsertStmt.setFloat(6, confidence);
-                    preparedInsertStmt.addBatch();
-                    //DBConnection.exec(prepStmt);
-                    // Reset variables
-                    currentPeriodTimestamp = periodTimestamp;
-                    sum = count = samples = 0;
-                }
-                if(currentPeriodTimestamp == 0) currentPeriodTimestamp = periodTimestamp;
-                sum += result.getInt("data");
-                ++count;
-                ++samples;
-            }
-            if(!error){
-                DBConnection.execBatch(preparedInsertStmt);
-                HalContext.getDB().getConnection().commit();
-            }else{
-                HalContext.getDB().getConnection().rollback();
-            }
-            HalContext.getDB().getConnection().setAutoCommit(true);
-            return null;
-        }
-    }
-    private class HourAggregator implements SQLResultHandler<Object>{
-        private long sensorId = -1;
-        public HourAggregator(long sensorId) {
+        private long aggrTimeInMs = -1;
+        private int expectedSampleCount = -1;
+        public DataAggregator(long sensorId, long aggrTimeInMs, int expectedSampleCount) {
             this.sensorId = sensorId;
+            this.aggrTimeInMs = aggrTimeInMs;
+            this.expectedSampleCount = expectedSampleCount;
         }
         @Override
         public Object handleQueryResult(Statement stmt, ResultSet result) throws SQLException {
+            try{
             long currentPeriodTimestamp = 0;
             int sum = 0;
             float confidenceSum = 0;
             int samples = 0;
             long highestSequenceId = Sensor.getHighestSequenceId(sensorId);
             HalContext.getDB().getConnection().setAutoCommit(false);
-            boolean error = false;
+            boolean rollback = false;
             PreparedStatement preparedInsertStmt = HalContext.getDB().getPreparedStatement("INSERT INTO sensor_data_aggr(sensor_id, sequence_id, timestamp_start, timestamp_end, data, confidence) VALUES(?, ?, ?, ?, ?, ?)");
-            PreparedStatement preparedDeleteStmt = HalContext.getDB().getPreparedStatement("DELETE FROM sensor_data_aggr WHERE sensor_id == ? AND sequence_id == ?");
             while(result.next()){
                 if(sensorId != result.getInt("sensor_id")){
                     logger.severe("found entry for aggregation for the wrong sensorId (expecting: "+sensorId+", but was: "+result.getInt("sensor_id")+")");
-                    error = true;
+                    rollback = true;
                     break;
                 }
                 long timestamp = result.getLong("timestamp_start");
-                long periodTimestamp = getTimestampMinutePeriodStart(60, timestamp);
+                long periodTimestamp = getTimestampPeriodStart(this.aggrTimeInMs, timestamp);
                 if(currentPeriodTimestamp != 0 && periodTimestamp != currentPeriodTimestamp){
-                    float aggrConfidence = confidenceSum / 12f;
-                    logger.finer("Calculated hour period: "+ currentPeriodTimestamp +" sum: "+ sum +" confidence: "+ aggrConfidence+ " samples: " + samples);
+                    float aggrConfidence = confidenceSum / (float)this.expectedSampleCount;
+                    logger.finer("Calculated day period: "+ currentPeriodTimestamp +" sum: "+ sum +" confidence: "+ aggrConfidence+ " samples: " + samples);
                     preparedInsertStmt.setInt(1, result.getInt("sensor_id"));
                     preparedInsertStmt.setLong(2, ++highestSequenceId);
                     preparedInsertStmt.setLong(3, currentPeriodTimestamp);
-                    preparedInsertStmt.setLong(4, currentPeriodTimestamp + HOUR_IN_MS - 1);
+                    preparedInsertStmt.setLong(4, currentPeriodTimestamp + this.aggrTimeInMs - 1);
                     preparedInsertStmt.setInt(5, sum);
                     preparedInsertStmt.setFloat(6, aggrConfidence);
                     preparedInsertStmt.addBatch();
@@ -228,81 +151,29 @@ public class DataAggregatorDaemon extends TimerTask implements HalDaemon {
                 sum += result.getInt("data");
                 confidenceSum += result.getFloat("confidence");
                 samples++;
-                preparedDeleteStmt.setInt(1, result.getInt("sensor_id"));
-                preparedDeleteStmt.setInt(2, result.getInt("sequence_id"));
-                preparedDeleteStmt.addBatch();
             }
-            if(!error){
+            if(!rollback){
                 DBConnection.execBatch(preparedInsertStmt);
-                DBConnection.execBatch(preparedDeleteStmt);
                 HalContext.getDB().getConnection().commit();
             }else{
                 HalContext.getDB().getConnection().rollback();
             }
-            HalContext.getDB().getConnection().setAutoCommit(true);
-            return null;
-        }
-    }
-    private class DayAggregator implements SQLResultHandler<Object>{
-        private long sensorId = -1;
-        public DayAggregator(long sensorId) {
-            this.sensorId = sensorId;
-        }
-        @Override
-        public Object handleQueryResult(Statement stmt, ResultSet result) throws SQLException {
-            long currentPeriodTimestamp = 0;
-            int sum = 0;
-            float confidenceSum = 0;
-            int samples = 0;
-            long highestSequenceId = Sensor.getHighestSequenceId(sensorId);
-            HalContext.getDB().getConnection().setAutoCommit(false);
-            boolean error = false;
-            PreparedStatement preparedInsertStmt = HalContext.getDB().getPreparedStatement("INSERT INTO sensor_data_aggr(sensor_id, sequence_id, timestamp_start, timestamp_end, data, confidence) VALUES(?, ?, ?, ?, ?, ?)");
-            PreparedStatement preparedDeleteStmt = HalContext.getDB().getPreparedStatement("DELETE FROM sensor_data_aggr WHERE sensor_id == ? AND sequence_id == ?");
-            while(result.next()){
-                if(sensorId != result.getInt("sensor_id")){
-                    logger.severe("found entry for aggregation for the wrong sensorId (expecting: "+sensorId+", but was: "+result.getInt("sensor_id")+")");
-                    error = true;
-                    break;
-                }
-                long timestamp = result.getLong("timestamp_start");
-                long periodTimestamp = getTimestampHourPeriodStart(24, timestamp);
-                if(currentPeriodTimestamp != 0 && periodTimestamp != currentPeriodTimestamp){
-                    float aggrConfidence = confidenceSum / 24f;
-                    logger.finer("Calculated day period: "+ currentPeriodTimestamp +" sum: "+ sum +" confidence: "+ aggrConfidence+ " samples: " + samples);
-                    preparedInsertStmt.setInt(1, result.getInt("sensor_id"));
-                    preparedInsertStmt.setLong(2, ++highestSequenceId);
-                    preparedInsertStmt.setLong(3, currentPeriodTimestamp);
-                    preparedInsertStmt.setLong(4, currentPeriodTimestamp + DAY_IN_MS - 1);
-                    preparedInsertStmt.setInt(5, sum);
-                    preparedInsertStmt.setFloat(6, aggrConfidence);
-                    preparedInsertStmt.addBatch();
-                    // Reset variables
-                    currentPeriodTimestamp = periodTimestamp;
-                    confidenceSum = sum = 0;
-                    samples = 0;
-                }
-                if(currentPeriodTimestamp == 0) currentPeriodTimestamp = periodTimestamp;
-                sum += result.getInt("data");
-                confidenceSum += result.getFloat("confidence");
-                samples++;
-                preparedDeleteStmt.setInt(1, result.getInt("sensor_id"));
-                preparedDeleteStmt.setInt(2, result.getInt("sequence_id"));
-                preparedDeleteStmt.addBatch();
-            }
-            if(!error){
-                DBConnection.execBatch(preparedInsertStmt);
-                DBConnection.execBatch(preparedDeleteStmt);
-                HalContext.getDB().getConnection().commit();
-            }else{
+            }catch(SQLException e){
                 HalContext.getDB().getConnection().rollback();
-            }
+                throw e;
+            } catch (ValueOutsideOfRangeException e) {
+                HalContext.getDB().getConnection().rollback();
+                e.printStackTrace();
+            }finally{
                 HalContext.getDB().getConnection().setAutoCommit(true);
+            }
             return null;
         }
     }
+    private static long getTimestampPeriodStart(long periodLengthInMs, long timestamp) throws ValueOutsideOfRangeException{
+        long tmp = timestamp % periodLengthInMs;
+        return timestamp - tmp;
+    }
 }
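The getTimestampPeriodStart helper added above replaces the two Calendar-based rounding methods with plain modulo arithmetic on the epoch timestamp. A small self-contained illustration follows; the PeriodStartExample class and the timestamp value are hypothetical, and note that rounding epoch milliseconds this way aligns day periods to UTC midnight, whereas the old Calendar code rounded in the local time zone.

```java
public class PeriodStartExample {
    // Same arithmetic as the new getTimestampPeriodStart: drop the remainder of the period.
    static long getTimestampPeriodStart(long periodLengthInMs, long timestamp) {
        return timestamp - (timestamp % periodLengthInMs);
    }

    public static void main(String[] args) {
        long ts = 1449851473000L; // 2015-12-11 16:31:13 UTC
        System.out.println(getTimestampPeriodStart(5 * 60 * 1000L, ts));       // 1449851400000 -> 16:30:00 UTC
        System.out.println(getTimestampPeriodStart(24 * 60 * 60 * 1000L, ts)); // 1449792000000 -> 00:00:00 UTC
    }
}
```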

View file

@@ -0,0 +1,131 @@
+package se.koc.hal.deamon;
+
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.List;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.logging.Logger;
+
+import se.koc.hal.HalContext;
+import se.koc.hal.struct.Sensor;
+import zutil.db.DBConnection;
+import zutil.db.SQLResultHandler;
+import zutil.db.handler.SimpleSQLResult;
+import zutil.log.LogUtil;
+
+public class DataDeletionDaemon extends TimerTask implements HalDaemon {
+    private static final Logger logger = LogUtil.getLogger();
+
+    public void initiate(Timer timer){
+        timer.schedule(this, 5000, TimeConstants.FIVE_MINUTES_IN_MS);
+    }
+
+    @Override
+    public void run(){
+        try {
+            List<Sensor> sensorList = Sensor.getSensors(HalContext.getDB());
+            for(Sensor sensor : sensorList){
+                logger.fine("Deleting old data for sensor_id: " + sensor.getId());
+                aggregateSensor(sensor.getId());
+            }
+        } catch (SQLException e) {
+            e.printStackTrace();
+        }
+    }
+
+    public void aggregateSensor(long sensorId) {
+        DBConnection db = HalContext.getDB();
+        PreparedStatement stmt = null;
+        try {
+            Long maxDBTimestamp = null;
+
+            // delete too old 5 minute periods that already have been aggregated into hours
+            stmt = db.getPreparedStatement("SELECT MAX(timestamp_end) FROM sensor_data_aggr"
+                    +" WHERE sensor_id == ? AND timestamp_end-timestamp_start == ?");
+            stmt.setLong(1, sensorId);
+            stmt.setLong(2, TimeConstants.HOUR_IN_MS-1);
+            maxDBTimestamp = DBConnection.exec(stmt, new SimpleSQLResult<Long>());
+            if(maxDBTimestamp == null)
+                maxDBTimestamp = 0l;
+            stmt = db.getPreparedStatement("SELECT * FROM sensor_data_aggr"
+                    +" WHERE sensor_id == ? "
+                    + "AND timestamp_end < ? "
+                    + "AND timestamp_end-timestamp_start == ?"
+                    + "AND timestamp_end < ?");
+            stmt.setLong(1, sensorId);
+            stmt.setLong(2, maxDBTimestamp);
+            stmt.setLong(3, TimeConstants.FIVE_MINUTES_IN_MS-1);
+            stmt.setLong(4, System.currentTimeMillis()-TimeConstants.DAY_IN_MS);
+            DBConnection.exec(stmt, new AggrDataDeletor(sensorId));
+
+            // delete too old 1 hour periods that already have been aggregated into days
+            stmt = db.getPreparedStatement("SELECT MAX(timestamp_end) FROM sensor_data_aggr"
+                    +" WHERE sensor_id == ? AND timestamp_end-timestamp_start == ?");
+            stmt.setLong(1, sensorId);
+            stmt.setLong(2, TimeConstants.DAY_IN_MS-1);
+            maxDBTimestamp = DBConnection.exec(stmt, new SimpleSQLResult<Long>());
+            if(maxDBTimestamp == null)
+                maxDBTimestamp = 0l;
+            stmt = db.getPreparedStatement("SELECT * FROM sensor_data_aggr"
+                    +" WHERE sensor_id == ? "
+                    + "AND timestamp_end < ? "
+                    + "AND timestamp_end-timestamp_start == ?"
+                    + "AND timestamp_end < ?");
+            stmt.setLong(1, sensorId);
+            stmt.setLong(2, maxDBTimestamp);
+            stmt.setLong(3, TimeConstants.HOUR_IN_MS-1);
+            stmt.setLong(4, System.currentTimeMillis()-TimeConstants.SEVEN_DAYS_IN_MS);
+            DBConnection.exec(stmt, new AggrDataDeletor(sensorId));
+
+            logger.fine("Done deleting");
+        } catch (SQLException e) {
+            e.printStackTrace();
+        }
+    }
+
+    private class AggrDataDeletor implements SQLResultHandler<Object>{
+        private long sensorId = -1;
+
+        public AggrDataDeletor(long sensorId){
+            this.sensorId = sensorId;
+        }
+
+        @Override
+        public Object handleQueryResult(Statement stmt, ResultSet result) throws SQLException {
+            try{
+                HalContext.getDB().getConnection().setAutoCommit(false);
+                boolean rollback = false;
+                PreparedStatement preparedDeleteStmt = HalContext.getDB().getPreparedStatement("DELETE FROM sensor_data_aggr WHERE sensor_id == ? AND sequence_id == ?");
+                while(result.next()){
+                    if(sensorId != result.getInt("sensor_id")){
+                        logger.severe("found entry for aggregation for the wrong sensorId (expecting: "+sensorId+", but was: "+result.getInt("sensor_id")+")");
+                        rollback = true;
+                        break;
+                    }
+                    logger.finer("Deleted period: "+ result.getLong("timestamp_start"));
+                    preparedDeleteStmt.setInt(1, result.getInt("sensor_id"));
+                    preparedDeleteStmt.setLong(2, result.getLong("sequence_id"));
+                    preparedDeleteStmt.addBatch();
+                }
+                if(!rollback){
+                    DBConnection.execBatch(preparedDeleteStmt);
+                    HalContext.getDB().getConnection().commit();
+                }else{
+                    HalContext.getDB().getConnection().rollback();
+                }
+            }catch(Exception e){
+                HalContext.getDB().getConnection().rollback();
+                e.printStackTrace();
+            }finally{
+                HalContext.getDB().getConnection().setAutoCommit(true);
+            }
+            return null;
+        }
+    }
+}

View file

@@ -55,14 +55,7 @@ public class DataSynchronizationClient extends TimerTask implements HalDaemon{
                 SensorDataListDTO dataList = (SensorDataListDTO) in.readObject();
                 for(SensorDataDTO data : dataList){
-                    PreparedStatement stmt = db.getPreparedStatement("DELETE FROM sensor_data_aggr WHERE sensor_id == ? AND ? <= timestamp_start AND timestamp_end <= ?");
-                    stmt.setLong(1, sensor.getId());
-                    stmt.setLong(2, data.timestampStart);
-                    stmt.setLong(3, data.timestampEnd);
-                    int deletions = DBConnection.exec(stmt);
-                    if(deletions > 0)
-                        logger.finer("Aggregate data replaced "+ deletions +" entries");
-                    stmt = db.getPreparedStatement("INSERT INTO sensor_data_aggr(sensor_id, sequence_id, timestamp_start, timestamp_end, data, confidence) VALUES(?, ?, ?, ?, ?, ?)");
+                    PreparedStatement stmt = db.getPreparedStatement("INSERT INTO sensor_data_aggr(sensor_id, sequence_id, timestamp_start, timestamp_end, data, confidence) VALUES(?, ?, ?, ?, ?, ?)");
                     stmt.setLong(1, sensor.getId());
                     stmt.setLong(2, data.sequenceId);
                     stmt.setLong(3, data.timestampStart);

View file

@@ -0,0 +1,8 @@
+package se.koc.hal.deamon;
+
+public class TimeConstants {
+    public static final long FIVE_MINUTES_IN_MS = 5 * 60 * 1000;
+    public static final long HOUR_IN_MS = FIVE_MINUTES_IN_MS * 12;
+    public static final long DAY_IN_MS = HOUR_IN_MS * 24;
+    public static final long SEVEN_DAYS_IN_MS = DAY_IN_MS * 7;
+}

View file

@@ -0,0 +1,9 @@
+package se.koc.hal.deamon;
+
+public class ValueOutsideOfRangeException extends Exception {
+
+    public ValueOutsideOfRangeException(String string) {
+        super(string);
+    }
+}

View file

@@ -1,6 +1,5 @@
 package se.koc.hal.page;
-import java.io.IOException;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
@@ -9,13 +8,10 @@ import java.util.ArrayList;
 import java.util.Map;
 import se.koc.hal.HalContext;
-import se.koc.hal.deamon.DataAggregatorDaemon;
+import se.koc.hal.deamon.TimeConstants;
 import zutil.db.DBConnection;
 import zutil.db.SQLResultHandler;
 import zutil.io.file.FileUtil;
-import zutil.net.http.HttpHeaderParser;
-import zutil.net.http.HttpPage;
-import zutil.net.http.HttpPrintStream;
 import zutil.parser.Templator;
 public class PCOverviewHttpPage extends HalHttpPage {
@@ -31,7 +27,6 @@ public class PCOverviewHttpPage extends HalHttpPage {
             Map<String, String> request)
             throws Exception{
         DBConnection db = HalContext.getDB();
         PreparedStatement stmt = db.getPreparedStatement(
@@ -40,15 +35,15 @@ public class PCOverviewHttpPage extends HalHttpPage {
                 + " sensor_data_aggr.timestamp_end as timestamp_end,"
                 + " sensor_data_aggr.data as data,"
                 + " sensor_data_aggr.confidence as confidence,"
-                + DataAggregatorDaemon.FIVE_MINUTES_IN_MS + " as period_length"
+                + TimeConstants.FIVE_MINUTES_IN_MS + " as period_length"
                 + " FROM sensor_data_aggr, user, sensor"
                 + " WHERE sensor.id = sensor_data_aggr.sensor_id"
                 + " AND user.id = sensor.user_id"
                 + " AND timestamp_end-timestamp_start == ?"
                 + " AND timestamp_start > ?"
                 + " ORDER BY timestamp_start ASC");
-        stmt.setLong(1, DataAggregatorDaemon.FIVE_MINUTES_IN_MS-1);
-        stmt.setLong(2, (System.currentTimeMillis() - DataAggregatorDaemon.DAY_IN_MS) );
+        stmt.setLong(1, TimeConstants.FIVE_MINUTES_IN_MS-1);
+        stmt.setLong(2, (System.currentTimeMillis() - TimeConstants.DAY_IN_MS) );
         ArrayList<PowerData> minDataList = DBConnection.exec(stmt , new SQLPowerDataBuilder());
         stmt = db.getPreparedStatement(
@@ -57,15 +52,15 @@ public class PCOverviewHttpPage extends HalHttpPage {
                 + " sensor_data_aggr.timestamp_end as timestamp_end,"
                 + " sensor_data_aggr.data as data,"
                 + " sensor_data_aggr.confidence as confidence,"
-                + DataAggregatorDaemon.HOUR_IN_MS + " as period_length"
+                + TimeConstants.HOUR_IN_MS + " as period_length"
                 + " FROM sensor_data_aggr, user, sensor"
                 + " WHERE sensor.id = sensor_data_aggr.sensor_id"
                 + " AND user.id = sensor.user_id"
                 + " AND timestamp_end-timestamp_start == ?"
                 + " AND timestamp_start > ?"
                 + " ORDER BY timestamp_start ASC");
-        stmt.setLong(1, DataAggregatorDaemon.HOUR_IN_MS-1);
-        stmt.setLong(2, (System.currentTimeMillis() - 3*DataAggregatorDaemon.DAY_IN_MS) );
+        stmt.setLong(1, TimeConstants.HOUR_IN_MS-1);
+        stmt.setLong(2, (System.currentTimeMillis() - 3*TimeConstants.DAY_IN_MS) );
         ArrayList<PowerData> hourDataList = DBConnection.exec(stmt, new SQLPowerDataBuilder());
         stmt = db.getPreparedStatement(
@@ -74,24 +69,23 @@ public class PCOverviewHttpPage extends HalHttpPage {
                 + " sensor_data_aggr.timestamp_end as timestamp_end,"
                 + " sensor_data_aggr.data as data,"
                 + " sensor_data_aggr.confidence as confidence,"
-                + DataAggregatorDaemon.DAY_IN_MS + " as period_length"
+                + TimeConstants.DAY_IN_MS + " as period_length"
                 + " FROM sensor_data_aggr, user, sensor"
                 + " WHERE sensor.id = sensor_data_aggr.sensor_id"
                 + " AND user.id = sensor.user_id"
                 + " AND timestamp_end-timestamp_start == ?"
                 + " ORDER BY timestamp_start ASC");
-        stmt.setLong(1, DataAggregatorDaemon.DAY_IN_MS-1);
+        stmt.setLong(1, TimeConstants.DAY_IN_MS-1);
         ArrayList<PowerData> dayDataList = DBConnection.exec(stmt, new SQLPowerDataBuilder());
-        Templator tmpl = new Templator(FileUtil.find("web-resource/overview.tmpl"));
+        Templator tmpl = new Templator(FileUtil.find("web-resource/index.html"));
         tmpl.set("minData", minDataList);
         tmpl.set("hourData", hourDataList);
         tmpl.set("dayData", dayDataList);
         tmpl.set("username", new String[]{"Ziver", "Daniel"});
         return tmpl;
     }
     public static class PowerData{

View file

@@ -25,12 +25,20 @@ public class ImpulseTracker implements Runnable {
     private Integer impulseCount = 0;
     private ExecutorService executorPool;
     private final DBConnection db;
+    private final int sensorId;
     public static void main(String args[]) throws Exception {
-        new ImpulseTracker();
+        new ImpulseTracker(2);
     }
-    public ImpulseTracker() throws Exception{
+    /**
+     * Constructor
+     * @param sensorId The ID of this sensor. Will be written to the DB
+     * @throws Exception
+     */
+    public ImpulseTracker(int sensorId) throws Exception{
+        this.sensorId = sensorId;
         // create gpio controller
         final GpioController gpio = GpioFactory.getInstance();
@@ -38,7 +46,7 @@ public class ImpulseTracker implements Runnable {
         // provision gpio pin #02 as an input pin with its internal pull up resistor enabled
         final GpioPinDigitalInput irLightSensor = gpio.provisionDigitalInputPin(RaspiPin.GPIO_02, PinPullResistance.PULL_UP);
-        // create and register gpio pin listener
+        // create and register gpio pin listener. May require the program to be run as sudo if the GPIO pin has not been exported
         irLightSensor.addListener(new GpioPinListenerDigital() {
             @Override
             public void handleGpioPinDigitalStateChangeEvent(GpioPinDigitalStateChangeEvent event) {
@@ -52,8 +60,10 @@ public class ImpulseTracker implements Runnable {
         });
+        // setup a thread pool for executing database jobs
         this.executorPool = Executors.newCachedThreadPool();
+        // Connect to the database
         logger.info("Connecting to db...");
         db = new DBConnection(DBConnection.DBMS.SQLite, "hal.db");
@@ -64,6 +74,10 @@ public class ImpulseTracker implements Runnable {
     }
+    /**
+     * This loop will try to save the current time and the number of impulses seen every [IMPULSE_REPORT_TIMEOUT] milliseconds.
+     * Every iteration the actual loop time will be evaluated and used to calculate the time for the next loop.
+     */
     @Override
     public void run() {
         long startTime = System.nanoTime();
@@ -71,7 +85,7 @@ public class ImpulseTracker implements Runnable {
             impulseCount = 0; //reset the impulse count
         }
         while(true) {
-            sleepNano(nanoSecondsSleep);
+            sleepNano(nanoSecondsSleep); //sleep for some time. This variable will be modified every loop to compensate for the loop time spent.
            int count = -1;
            synchronized(impulseCount){
                count = impulseCount;
@@ -88,6 +102,10 @@ public class ImpulseTracker implements Runnable {
         }
     }
+    /**
+     * Sleep for [ns] nanoseconds
+     * @param ns
+     */
     private void sleepNano(long ns){
         //System.out.println("will go to sleep for " + ns + "ns");
         try{
@@ -97,12 +115,21 @@ public class ImpulseTracker implements Runnable {
         }
     }
+    /**
+     * Saves the data to the database.
+     * This method should block the caller as short time as possible.
+     * Try to make the time spent in the method the same for every call (low variation).
+     *
+     * @param timestamp_end
+     * @param data
+     */
     private void save(final long timestamp_end, final int data){
+        //offload the timed loop by not doing the db interaction in this thread.
         executorPool.execute(new Runnable(){
             @Override
             public void run() {
                 try {
-                    db.exec("INSERT INTO sensor_data_raw(timestamp, sensor_id, data) VALUES("+timestamp_end+", "+2+", "+data+")");
+                    db.exec("INSERT INTO sensor_data_raw(timestamp, sensor_id, data) VALUES("+timestamp_end+", "+ImpulseTracker.this.sensorId+", "+data+")");
                 } catch (SQLException e) {
                     e.printStackTrace();
                 }

View file

@@ -35,6 +35,11 @@ public class Sensor extends DBBean{
         return DBConnection.exec(stmt, DBBeanSQLResultHandler.createList(Sensor.class, db) );
     }
+    public static List<Sensor> getSensors(DBConnection db) throws SQLException{
+        PreparedStatement stmt = db.getPreparedStatement( "SELECT * FROM sensor" );
+        return DBConnection.exec(stmt, DBBeanSQLResultHandler.createList(Sensor.class, db) );
+    }
     public static long getHighestSequenceId(long sensorId) throws SQLException{
         PreparedStatement stmt = HalContext.getDB().getPreparedStatement("SELECT MAX(sequence_id) FROM sensor_data_aggr WHERE sensor_id == ?");

View file

@@ -30,18 +30,18 @@
         );
         chartData("hour-power-chart",
             [
-                { y: (Date.now()-3*24*60*60*1000) },
+                { y: (Date.now()-7*24*60*60*1000) },
                 {{#hourData}}
                 { y: {{.timestamp}}, {{.username}}: {{.data}} },
                 {{/hourData}}
-                { y: (Date.now()-24*60*60*1000) }
+                { y: Date.now() }
             ]
         );
         chartData("day-power-chart",
             [{{#dayData}}
                 { y: {{.timestamp}}, {{.username}}: {{.data}} },
                 {{/dayData}}
-                { y: (Date.now()-3*24*60*60*1000) }
+                { y: Date.now() }
             ]
         );
     });