SensorDataAggreagtionDeamon fix+updates:

Added test code.
Updated some taces.
Resolved issue where the aggreagation deamon missed to aggregate the latest complete period.
This commit is contained in:
Daniel Collin 2016-02-18 14:50:24 +01:00
parent 77a2b9ccb4
commit 776f09899d
5 changed files with 142 additions and 33 deletions

View file

@ -188,5 +188,13 @@ public class HalContext {
public static DBConnection getDB(){ public static DBConnection getDB(){
return db; return db;
} }
/**
* For testing purposes.
* @param db
*/
public static void setDB(DBConnection db){
HalContext.db = db;
}
} }

View file

@ -61,17 +61,19 @@ public class SensorDataAggregatorDaemon implements HalDaemon {
} }
logger.fine("The sensor is of type: " + sensor.getDeviceData().getClass().getName()); logger.fine("The sensor is of type: " + sensor.getDeviceData().getClass().getName());
logger.fine("aggregating raw data up to a day old into five minute periods"); long aggregationStartTime = System.currentTimeMillis();
aggregateRawData(sensor, AggregationPeriodLength.FIVE_MINUTES, UTCTimeUtility.DAY_IN_MS, 5);
logger.fine("aggregating raw data up to a week old into one hour periods"); logger.fine("Aggregating raw data up to a day old into five minute periods");
aggregateRawData(sensor, AggregationPeriodLength.HOUR, UTCTimeUtility.WEEK_IN_MS, 60); aggregateRawData(sensor, AggregationPeriodLength.FIVE_MINUTES, UTCTimeUtility.DAY_IN_MS, 5, aggregationStartTime);
logger.fine("aggregating raw data into one day periods"); logger.fine("Aggregating raw data up to a week old into one hour periods");
aggregateRawData(sensor, AggregationPeriodLength.DAY, UTCTimeUtility.INFINITY, 60*24); aggregateRawData(sensor, AggregationPeriodLength.HOUR, UTCTimeUtility.WEEK_IN_MS, 60, aggregationStartTime);
logger.fine("aggregating raw data into one week periods"); logger.fine("Aggregating raw data into one day periods");
aggregateRawData(sensor, AggregationPeriodLength.WEEK, UTCTimeUtility.INFINITY, 60*24*7); aggregateRawData(sensor, AggregationPeriodLength.DAY, UTCTimeUtility.INFINITY, 60*24, aggregationStartTime);
logger.fine("Aggregating raw data into one week periods");
aggregateRawData(sensor, AggregationPeriodLength.WEEK, UTCTimeUtility.INFINITY, 60*24*7, aggregationStartTime);
} }
/** /**
@ -80,7 +82,7 @@ public class SensorDataAggregatorDaemon implements HalDaemon {
* @param ageLimitInMs Only aggregate up to this age * @param ageLimitInMs Only aggregate up to this age
* @param toPeriodSizeInMs The period length in ms to aggregate to * @param toPeriodSizeInMs The period length in ms to aggregate to
*/ */
private void aggregateRawData(Sensor sensor, AggregationPeriodLength aggrPeriodLength, long ageLimitInMs, int expectedSampleCount){ private void aggregateRawData(Sensor sensor, AggregationPeriodLength aggrPeriodLength, long ageLimitInMs, int expectedSampleCount, long aggregationStartTime){
long sensorId = sensor.getId(); long sensorId = sensor.getId();
AggregationMethod aggrMethod = sensor.getDeviceData().getAggregationMethod(); AggregationMethod aggrMethod = sensor.getDeviceData().getAggregationMethod();
DBConnection db = HalContext.getDB(); DBConnection db = HalContext.getDB();
@ -105,10 +107,10 @@ public class SensorDataAggregatorDaemon implements HalDaemon {
if(maxTimestampFoundForSensor == null) if(maxTimestampFoundForSensor == null)
maxTimestampFoundForSensor = 0l; maxTimestampFoundForSensor = 0l;
long latestCompletePeriodEndTimestamp = new UTCTimePeriod(System.currentTimeMillis(), aggrPeriodLength).getPreviosPeriod().getEndTimestamp(); long latestCompletePeriodEndTimestamp = new UTCTimePeriod(aggregationStartTime, aggrPeriodLength).getPreviosPeriod().getEndTimestamp();
long oldestPeriodStartTimestamp = new UTCTimePeriod(System.currentTimeMillis()-ageLimitInMs, aggrPeriodLength).getStartTimestamp(); long oldestPeriodStartTimestamp = new UTCTimePeriod(aggregationStartTime-ageLimitInMs, aggrPeriodLength).getStartTimestamp();
logger.fine("Calculating periods... (from:"+ maxTimestampFoundForSensor +", to:"+ latestCompletePeriodEndTimestamp +") with expected sample count: " + expectedSampleCount); logger.fine("evaluating period: "+ maxTimestampFoundForSensor + "=>" + latestCompletePeriodEndTimestamp + " (" + UTCTimeUtility.getDateString(maxTimestampFoundForSensor) + "=>" + UTCTimeUtility.getDateString(latestCompletePeriodEndTimestamp) + ") with expected sample count: " + expectedSampleCount);
stmt = db.getPreparedStatement("SELECT *, 1 AS confidence FROM sensor_data_raw" stmt = db.getPreparedStatement("SELECT *, 1 AS confidence FROM sensor_data_raw"
+" WHERE sensor_id == ?" +" WHERE sensor_id == ?"
@ -120,7 +122,7 @@ public class SensorDataAggregatorDaemon implements HalDaemon {
stmt.setLong(2, maxTimestampFoundForSensor); stmt.setLong(2, maxTimestampFoundForSensor);
stmt.setLong(3, latestCompletePeriodEndTimestamp); stmt.setLong(3, latestCompletePeriodEndTimestamp);
stmt.setLong(4, oldestPeriodStartTimestamp); stmt.setLong(4, oldestPeriodStartTimestamp);
DBConnection.exec(stmt, new DataAggregator(sensorId, aggrPeriodLength, expectedSampleCount, aggrMethod)); DBConnection.exec(stmt, new DataAggregator(sensorId, aggrPeriodLength, expectedSampleCount, aggrMethod, aggregationStartTime));
} catch (SQLException e) { } catch (SQLException e) {
logger.log(Level.SEVERE, null, e); logger.log(Level.SEVERE, null, e);
} }
@ -134,12 +136,14 @@ public class SensorDataAggregatorDaemon implements HalDaemon {
private final AggregationPeriodLength aggrPeriodLength; private final AggregationPeriodLength aggrPeriodLength;
private final int expectedSampleCount; private final int expectedSampleCount;
private final AggregationMethod aggrMethod; private final AggregationMethod aggrMethod;
private final long aggregationStartTime;
public DataAggregator(long sensorId, AggregationPeriodLength aggrPeriodLength, int expectedSampleCount, AggregationMethod aggrMethod) { public DataAggregator(long sensorId, AggregationPeriodLength aggrPeriodLength, int expectedSampleCount, AggregationMethod aggrMethod, long aggregationStartTime) {
this.sensorId = sensorId; this.sensorId = sensorId;
this.aggrPeriodLength = aggrPeriodLength; this.aggrPeriodLength = aggrPeriodLength;
this.expectedSampleCount = expectedSampleCount; this.expectedSampleCount = expectedSampleCount;
this.aggrMethod = aggrMethod; this.aggrMethod = aggrMethod;
this.aggregationStartTime = aggregationStartTime;
} }
@Override @Override
@ -164,22 +168,9 @@ public class SensorDataAggregatorDaemon implements HalDaemon {
if(currentPeriod == null) if(currentPeriod == null)
currentPeriod = dataPeriod; currentPeriod = dataPeriod;
if(!dataPeriod.equals(currentPeriod)){ if(!dataPeriod.equals(currentPeriod)){
float aggrConfidence = confidenceSum / (float)this.expectedSampleCount; saveData(preparedInsertStmt, confidenceSum, sum, samples, currentPeriod, ++highestSequenceId);
float data = -1;
switch(aggrMethod){
case SUM: data = sum; break;
case AVERAGE: data = sum/samples; break;
}
logger.finer("Calculated period starting at timestamp: " + currentPeriod.getStartTimestamp() + ", data: " + sum + ", confidence: " + aggrConfidence + ", samples: " + samples + ", aggrMethod: " + aggrMethod);
preparedInsertStmt.setInt(1, result.getInt("sensor_id"));
preparedInsertStmt.setLong(2, ++highestSequenceId);
preparedInsertStmt.setLong(3, currentPeriod.getStartTimestamp());
preparedInsertStmt.setLong(4, currentPeriod.getEndTimestamp());
preparedInsertStmt.setFloat(5, data);
preparedInsertStmt.setFloat(6, aggrConfidence);
preparedInsertStmt.addBatch();
// Reset variables // Reset variables
currentPeriod = dataPeriod; currentPeriod = dataPeriod;
@ -191,6 +182,12 @@ public class SensorDataAggregatorDaemon implements HalDaemon {
confidenceSum += result.getFloat("confidence"); confidenceSum += result.getFloat("confidence");
++samples; ++samples;
} }
//check if the last period is complete and also should be aggregated
if(currentPeriod != null && currentPeriod.getEndTimestamp() <= new UTCTimePeriod(aggregationStartTime, aggrPeriodLength).getPreviosPeriod().getEndTimestamp()){
saveData(preparedInsertStmt, confidenceSum, sum, samples, currentPeriod, ++highestSequenceId);
}
DBConnection.execBatch(preparedInsertStmt); DBConnection.execBatch(preparedInsertStmt);
HalContext.getDB().getConnection().commit(); HalContext.getDB().getConnection().commit();
@ -201,7 +198,26 @@ public class SensorDataAggregatorDaemon implements HalDaemon {
HalContext.getDB().getConnection().setAutoCommit(true); HalContext.getDB().getConnection().setAutoCommit(true);
} }
return null; return null;
} }
private void saveData(PreparedStatement preparedInsertStmt, float confidenceSum, float sum, int samples, UTCTimePeriod currentPeriod, long sequenceId) throws SQLException{
float aggrConfidence = confidenceSum / (float)this.expectedSampleCount;
float data = -1;
switch(aggrMethod){
case SUM: data = sum; break;
case AVERAGE: data = sum/samples; break;
}
logger.finer("saved period: " + currentPeriod + ", data: " + sum + ", confidence: " + aggrConfidence + ", samples: " + samples + ", aggrMethod: " + aggrMethod);
preparedInsertStmt.setLong(1, sensorId);
preparedInsertStmt.setLong(2, sequenceId);
preparedInsertStmt.setLong(3, currentPeriod.getStartTimestamp());
preparedInsertStmt.setLong(4, currentPeriod.getEndTimestamp());
preparedInsertStmt.setFloat(5, data);
preparedInsertStmt.setFloat(6, aggrConfidence);
preparedInsertStmt.addBatch();
}
} }
} }

View file

@ -1,7 +1,5 @@
package se.hal.plugin.raspberry; package se.hal.plugin.raspberry;
import com.pi4j.io.gpio.Pin;
import se.hal.intf.HalSensorController; import se.hal.intf.HalSensorController;
import se.hal.struct.PowerConsumptionSensorData; import se.hal.struct.PowerConsumptionSensorData;
import zutil.ui.Configurator; import zutil.ui.Configurator;

View file

@ -45,4 +45,8 @@ public class UTCTimePeriod{
return false; return false;
} }
public String toString(){
return start + "=>" + end + " (" + UTCTimeUtility.getDateString(start) + "=>" + UTCTimeUtility.getDateString(end) + ")";
}
} }

View file

@ -0,0 +1,83 @@
package se.hal.deamon;
import java.sql.PreparedStatement;
import org.junit.Before;
import org.junit.Test;
import se.hal.HalContext;
import se.hal.util.UTCTimeUtility;
import zutil.db.DBConnection;
import zutil.db.DBUpgradeHandler;
import zutil.log.LogUtil;
public class SensorDataAggregationDeamonTest {
private static final String DEFAULT_DB_FILE = "hal-default.db";
private static DBConnection db;
@Before
public void setupDatabase() throws Exception{
System.out.println("-----------------------------------------------");
//setup loggin
System.out.println("Setting up logging");
LogUtil.readConfiguration("logging.properties");
//create in memory database
System.out.println("Creating a in-memory databse for test");
db = new DBConnection(DBConnection.DBMS.SQLite, ":memory:");
HalContext.setDB(db);
//upgrade the database to latest version
System.out.println("Upgrading in-memory databse to latest version");
DBConnection referenceDB = new DBConnection(DBConnection.DBMS.SQLite, DEFAULT_DB_FILE);
final DBUpgradeHandler handler = new DBUpgradeHandler(referenceDB);
handler.addIgnoredTable("db_version_history");
handler.addIgnoredTable("sqlite_sequence"); //sqlite internal
handler.setTargetDB(db);
handler.upgrade();
//populate the database with data
System.out.println("Adding user to database");
db.exec("INSERT INTO user(id, external, username) VALUES(222, 0, 'test')"); //adding user
System.out.println("Adding sensor to database");
db.exec("INSERT INTO sensor(id, user_id, external_id, type) VALUES(111, 222, 333, 'se.hal.plugin.tellstick.protocols.Oregon0x1A2D')"); //adding sensor
System.out.println("Generating raw data and saving it to the database...");
PreparedStatement stmt = db.getPreparedStatement("INSERT INTO sensor_data_raw (timestamp, sensor_id, data) VALUES(?, ?, ?)");
try{
db.getConnection().setAutoCommit(false);
long startTime = System.currentTimeMillis();
for(int i = 0; i < 100000; ++i){
stmt.setLong(1, startTime-(UTCTimeUtility.MINUTE_IN_MS*i));
stmt.setLong(2, 111);
stmt.setFloat(3, 7.323f);
stmt.addBatch();
}
DBConnection.execBatch(stmt);
db.getConnection().commit();
}catch(Exception e){
db.getConnection().rollback();
throw e;
}finally{
db.getConnection().setAutoCommit(true);
}
System.out.println("Ready for test!");
}
@Test
public void testAggregation(){
System.out.println("Start testing raw data aggregation");
SensorDataAggregatorDaemon aggrDeamon = new SensorDataAggregatorDaemon();
aggrDeamon.run();
//TODO: verify the aggregation
System.out.println("Finnished testing raw data aggregation");
}
}