diff --git a/src/se/hal/HalContext.java b/src/se/hal/HalContext.java index 54bcb32d..8d45ca29 100755 --- a/src/se/hal/HalContext.java +++ b/src/se/hal/HalContext.java @@ -188,5 +188,13 @@ public class HalContext { public static DBConnection getDB(){ return db; } + + /** + * For testing purposes. + * @param db + */ + public static void setDB(DBConnection db){ + HalContext.db = db; + } } diff --git a/src/se/hal/deamon/SensorDataAggregatorDaemon.java b/src/se/hal/deamon/SensorDataAggregatorDaemon.java index 19d288b1..16d0983e 100755 --- a/src/se/hal/deamon/SensorDataAggregatorDaemon.java +++ b/src/se/hal/deamon/SensorDataAggregatorDaemon.java @@ -61,17 +61,19 @@ public class SensorDataAggregatorDaemon implements HalDaemon { } logger.fine("The sensor is of type: " + sensor.getDeviceData().getClass().getName()); - logger.fine("aggregating raw data up to a day old into five minute periods"); - aggregateRawData(sensor, AggregationPeriodLength.FIVE_MINUTES, UTCTimeUtility.DAY_IN_MS, 5); + long aggregationStartTime = System.currentTimeMillis(); - logger.fine("aggregating raw data up to a week old into one hour periods"); - aggregateRawData(sensor, AggregationPeriodLength.HOUR, UTCTimeUtility.WEEK_IN_MS, 60); + logger.fine("Aggregating raw data up to a day old into five minute periods"); + aggregateRawData(sensor, AggregationPeriodLength.FIVE_MINUTES, UTCTimeUtility.DAY_IN_MS, 5, aggregationStartTime); - logger.fine("aggregating raw data into one day periods"); - aggregateRawData(sensor, AggregationPeriodLength.DAY, UTCTimeUtility.INFINITY, 60*24); + logger.fine("Aggregating raw data up to a week old into one hour periods"); + aggregateRawData(sensor, AggregationPeriodLength.HOUR, UTCTimeUtility.WEEK_IN_MS, 60, aggregationStartTime); - logger.fine("aggregating raw data into one week periods"); - aggregateRawData(sensor, AggregationPeriodLength.WEEK, UTCTimeUtility.INFINITY, 60*24*7); + logger.fine("Aggregating raw data into one day periods"); + aggregateRawData(sensor, AggregationPeriodLength.DAY, UTCTimeUtility.INFINITY, 60*24, aggregationStartTime); + + logger.fine("Aggregating raw data into one week periods"); + aggregateRawData(sensor, AggregationPeriodLength.WEEK, UTCTimeUtility.INFINITY, 60*24*7, aggregationStartTime); } /** @@ -80,7 +82,7 @@ public class SensorDataAggregatorDaemon implements HalDaemon { * @param ageLimitInMs Only aggregate up to this age * @param toPeriodSizeInMs The period length in ms to aggregate to */ - private void aggregateRawData(Sensor sensor, AggregationPeriodLength aggrPeriodLength, long ageLimitInMs, int expectedSampleCount){ + private void aggregateRawData(Sensor sensor, AggregationPeriodLength aggrPeriodLength, long ageLimitInMs, int expectedSampleCount, long aggregationStartTime){ long sensorId = sensor.getId(); AggregationMethod aggrMethod = sensor.getDeviceData().getAggregationMethod(); DBConnection db = HalContext.getDB(); @@ -105,10 +107,10 @@ public class SensorDataAggregatorDaemon implements HalDaemon { if(maxTimestampFoundForSensor == null) maxTimestampFoundForSensor = 0l; - long latestCompletePeriodEndTimestamp = new UTCTimePeriod(System.currentTimeMillis(), aggrPeriodLength).getPreviosPeriod().getEndTimestamp(); - long oldestPeriodStartTimestamp = new UTCTimePeriod(System.currentTimeMillis()-ageLimitInMs, aggrPeriodLength).getStartTimestamp(); + long latestCompletePeriodEndTimestamp = new UTCTimePeriod(aggregationStartTime, aggrPeriodLength).getPreviosPeriod().getEndTimestamp(); + long oldestPeriodStartTimestamp = new UTCTimePeriod(aggregationStartTime-ageLimitInMs, aggrPeriodLength).getStartTimestamp(); - logger.fine("Calculating periods... (from:"+ maxTimestampFoundForSensor +", to:"+ latestCompletePeriodEndTimestamp +") with expected sample count: " + expectedSampleCount); + logger.fine("evaluating period: "+ maxTimestampFoundForSensor + "=>" + latestCompletePeriodEndTimestamp + " (" + UTCTimeUtility.getDateString(maxTimestampFoundForSensor) + "=>" + UTCTimeUtility.getDateString(latestCompletePeriodEndTimestamp) + ") with expected sample count: " + expectedSampleCount); stmt = db.getPreparedStatement("SELECT *, 1 AS confidence FROM sensor_data_raw" +" WHERE sensor_id == ?" @@ -120,7 +122,7 @@ public class SensorDataAggregatorDaemon implements HalDaemon { stmt.setLong(2, maxTimestampFoundForSensor); stmt.setLong(3, latestCompletePeriodEndTimestamp); stmt.setLong(4, oldestPeriodStartTimestamp); - DBConnection.exec(stmt, new DataAggregator(sensorId, aggrPeriodLength, expectedSampleCount, aggrMethod)); + DBConnection.exec(stmt, new DataAggregator(sensorId, aggrPeriodLength, expectedSampleCount, aggrMethod, aggregationStartTime)); } catch (SQLException e) { logger.log(Level.SEVERE, null, e); } @@ -134,12 +136,14 @@ public class SensorDataAggregatorDaemon implements HalDaemon { private final AggregationPeriodLength aggrPeriodLength; private final int expectedSampleCount; private final AggregationMethod aggrMethod; + private final long aggregationStartTime; - public DataAggregator(long sensorId, AggregationPeriodLength aggrPeriodLength, int expectedSampleCount, AggregationMethod aggrMethod) { + public DataAggregator(long sensorId, AggregationPeriodLength aggrPeriodLength, int expectedSampleCount, AggregationMethod aggrMethod, long aggregationStartTime) { this.sensorId = sensorId; this.aggrPeriodLength = aggrPeriodLength; this.expectedSampleCount = expectedSampleCount; this.aggrMethod = aggrMethod; + this.aggregationStartTime = aggregationStartTime; } @Override @@ -164,22 +168,9 @@ public class SensorDataAggregatorDaemon implements HalDaemon { if(currentPeriod == null) currentPeriod = dataPeriod; - + if(!dataPeriod.equals(currentPeriod)){ - float aggrConfidence = confidenceSum / (float)this.expectedSampleCount; - float data = -1; - switch(aggrMethod){ - case SUM: data = sum; break; - case AVERAGE: data = sum/samples; break; - } - logger.finer("Calculated period starting at timestamp: " + currentPeriod.getStartTimestamp() + ", data: " + sum + ", confidence: " + aggrConfidence + ", samples: " + samples + ", aggrMethod: " + aggrMethod); - preparedInsertStmt.setInt(1, result.getInt("sensor_id")); - preparedInsertStmt.setLong(2, ++highestSequenceId); - preparedInsertStmt.setLong(3, currentPeriod.getStartTimestamp()); - preparedInsertStmt.setLong(4, currentPeriod.getEndTimestamp()); - preparedInsertStmt.setFloat(5, data); - preparedInsertStmt.setFloat(6, aggrConfidence); - preparedInsertStmt.addBatch(); + saveData(preparedInsertStmt, confidenceSum, sum, samples, currentPeriod, ++highestSequenceId); // Reset variables currentPeriod = dataPeriod; @@ -191,6 +182,12 @@ public class SensorDataAggregatorDaemon implements HalDaemon { confidenceSum += result.getFloat("confidence"); ++samples; } + + //check if the last period is complete and also should be aggregated + if(currentPeriod != null && currentPeriod.getEndTimestamp() <= new UTCTimePeriod(aggregationStartTime, aggrPeriodLength).getPreviosPeriod().getEndTimestamp()){ + saveData(preparedInsertStmt, confidenceSum, sum, samples, currentPeriod, ++highestSequenceId); + } + DBConnection.execBatch(preparedInsertStmt); HalContext.getDB().getConnection().commit(); @@ -201,7 +198,26 @@ public class SensorDataAggregatorDaemon implements HalDaemon { HalContext.getDB().getConnection().setAutoCommit(true); } return null; - } + } + + private void saveData(PreparedStatement preparedInsertStmt, float confidenceSum, float sum, int samples, UTCTimePeriod currentPeriod, long sequenceId) throws SQLException{ + float aggrConfidence = confidenceSum / (float)this.expectedSampleCount; + float data = -1; + switch(aggrMethod){ + case SUM: data = sum; break; + case AVERAGE: data = sum/samples; break; + } + logger.finer("saved period: " + currentPeriod + ", data: " + sum + ", confidence: " + aggrConfidence + ", samples: " + samples + ", aggrMethod: " + aggrMethod); + + preparedInsertStmt.setLong(1, sensorId); + preparedInsertStmt.setLong(2, sequenceId); + preparedInsertStmt.setLong(3, currentPeriod.getStartTimestamp()); + preparedInsertStmt.setLong(4, currentPeriod.getEndTimestamp()); + preparedInsertStmt.setFloat(5, data); + preparedInsertStmt.setFloat(6, aggrConfidence); + preparedInsertStmt.addBatch(); + } + } - + } diff --git a/src/se/hal/plugin/raspberry/RPiPowerConsumptionSensor.java b/src/se/hal/plugin/raspberry/RPiPowerConsumptionSensor.java index eca9829a..938f49c5 100644 --- a/src/se/hal/plugin/raspberry/RPiPowerConsumptionSensor.java +++ b/src/se/hal/plugin/raspberry/RPiPowerConsumptionSensor.java @@ -1,7 +1,5 @@ package se.hal.plugin.raspberry; -import com.pi4j.io.gpio.Pin; - import se.hal.intf.HalSensorController; import se.hal.struct.PowerConsumptionSensorData; import zutil.ui.Configurator; diff --git a/src/se/hal/util/UTCTimePeriod.java b/src/se/hal/util/UTCTimePeriod.java index 765205bb..51a89889 100755 --- a/src/se/hal/util/UTCTimePeriod.java +++ b/src/se/hal/util/UTCTimePeriod.java @@ -45,4 +45,8 @@ public class UTCTimePeriod{ return false; } + public String toString(){ + return start + "=>" + end + " (" + UTCTimeUtility.getDateString(start) + "=>" + UTCTimeUtility.getDateString(end) + ")"; + } + } \ No newline at end of file diff --git a/test/se/hal/deamon/SensorDataAggregationDeamonTest.java b/test/se/hal/deamon/SensorDataAggregationDeamonTest.java new file mode 100755 index 00000000..fa52c4eb --- /dev/null +++ b/test/se/hal/deamon/SensorDataAggregationDeamonTest.java @@ -0,0 +1,83 @@ +package se.hal.deamon; + +import java.sql.PreparedStatement; + +import org.junit.Before; +import org.junit.Test; + +import se.hal.HalContext; +import se.hal.util.UTCTimeUtility; +import zutil.db.DBConnection; +import zutil.db.DBUpgradeHandler; +import zutil.log.LogUtil; + +public class SensorDataAggregationDeamonTest { + private static final String DEFAULT_DB_FILE = "hal-default.db"; + + private static DBConnection db; + + @Before + public void setupDatabase() throws Exception{ + System.out.println("-----------------------------------------------"); + + //setup loggin + System.out.println("Setting up logging"); + LogUtil.readConfiguration("logging.properties"); + + //create in memory database + System.out.println("Creating a in-memory databse for test"); + db = new DBConnection(DBConnection.DBMS.SQLite, ":memory:"); + HalContext.setDB(db); + + //upgrade the database to latest version + System.out.println("Upgrading in-memory databse to latest version"); + DBConnection referenceDB = new DBConnection(DBConnection.DBMS.SQLite, DEFAULT_DB_FILE); + final DBUpgradeHandler handler = new DBUpgradeHandler(referenceDB); + handler.addIgnoredTable("db_version_history"); + handler.addIgnoredTable("sqlite_sequence"); //sqlite internal + handler.setTargetDB(db); + handler.upgrade(); + + //populate the database with data + System.out.println("Adding user to database"); + db.exec("INSERT INTO user(id, external, username) VALUES(222, 0, 'test')"); //adding user + System.out.println("Adding sensor to database"); + db.exec("INSERT INTO sensor(id, user_id, external_id, type) VALUES(111, 222, 333, 'se.hal.plugin.tellstick.protocols.Oregon0x1A2D')"); //adding sensor + System.out.println("Generating raw data and saving it to the database..."); + PreparedStatement stmt = db.getPreparedStatement("INSERT INTO sensor_data_raw (timestamp, sensor_id, data) VALUES(?, ?, ?)"); + try{ + db.getConnection().setAutoCommit(false); + + long startTime = System.currentTimeMillis(); + for(int i = 0; i < 100000; ++i){ + stmt.setLong(1, startTime-(UTCTimeUtility.MINUTE_IN_MS*i)); + stmt.setLong(2, 111); + stmt.setFloat(3, 7.323f); + stmt.addBatch(); + } + + DBConnection.execBatch(stmt); + db.getConnection().commit(); + }catch(Exception e){ + db.getConnection().rollback(); + throw e; + }finally{ + db.getConnection().setAutoCommit(true); + } + + System.out.println("Ready for test!"); + } + + @Test + public void testAggregation(){ + System.out.println("Start testing raw data aggregation"); + SensorDataAggregatorDaemon aggrDeamon = new SensorDataAggregatorDaemon(); + aggrDeamon.run(); + + //TODO: verify the aggregation + + System.out.println("Finnished testing raw data aggregation"); + } + + +}