Merge remote-tracking branch 'origin/master'

Former-commit-id: e670fc581a15545d7781d2fc4a976f74e0e3aba8
This commit is contained in:
Ziver Koc 2015-12-14 18:18:24 +01:00
commit 2bc4607ad1
7 changed files with 538 additions and 259 deletions

View file

@ -1,31 +1,31 @@
<?xml version="1.0" encoding="UTF-8"?>
<classpath>
<classpathentry kind="src" path="src"/>
<classpathentry kind="src" path="test"/>
<classpathentry kind="lib" path="external/marytts-5.1.2/lib/icu4j-54.1.1.jar"/>
<classpathentry kind="lib" path="external/marytts-5.1.2/lib/marytts-client-5.1.2-jar-with-dependencies.jar"/>
<classpathentry kind="lib" path="external/marytts-5.1.2/lib/marytts-lang-de-5.1.2.jar"/>
<classpathentry kind="lib" path="external/marytts-5.1.2/lib/marytts-lang-en-5.1.2.jar"/>
<classpathentry kind="lib" path="external/marytts-5.1.2/lib/marytts-lang-fr-5.1.2.jar"/>
<classpathentry kind="lib" path="external/marytts-5.1.2/lib/marytts-lang-it-5.1.2.jar"/>
<classpathentry kind="lib" path="external/marytts-5.1.2/lib/marytts-lang-ru-5.1.2.jar"/>
<classpathentry kind="lib" path="external/marytts-5.1.2/lib/marytts-lang-sv-5.1.2.jar"/>
<classpathentry kind="lib" path="external/marytts-5.1.2/lib/marytts-lang-te-5.1.2.jar"/>
<classpathentry kind="lib" path="external/marytts-5.1.2/lib/marytts-lang-tr-5.1.2.jar"/>
<classpathentry kind="lib" path="external/marytts-5.1.2/lib/marytts-runtime-5.1.2-jar-with-dependencies.jar"/>
<classpathentry kind="lib" path="external/marytts-5.1.2/lib/voice-cmu-slt-hsmm-5.1.2.jar"/>
<classpathentry kind="lib" path="lib/Ab.jar"/>
<classpathentry kind="lib" path="lib/commons-math3-3.5.jar"/>
<classpathentry kind="lib" path="lib/jSerialComm-1.3.4.jar"/>
<classpathentry kind="lib" path="lib/marytts-client-5.1.2-jar-with-dependencies.jar"/>
<classpathentry kind="lib" path="lib/marytts-runtime-5.1.2-jar-with-dependencies.jar"/>
<classpathentry kind="lib" path="lib/pi4j-core-1.0.jar"/>
<classpathentry kind="lib" path="lib/junit-4.12.jar"/>
<classpathentry kind="lib" path="lib/java-speech-api-master.jar"/>
<classpathentry kind="lib" path="lib/sphinx4-core.jar"/>
<classpathentry kind="lib" path="lib/sqlite-jdbc-3.8.11.2.jar"/>
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER"/>
<classpathentry combineaccessrules="false" kind="src" path="/zutil-java"/>
<classpathentry kind="con" path="org.eclipse.jdt.junit.JUNIT_CONTAINER/4"/>
<classpathentry kind="output" path="lib/java-speech-api-master/bin"/>
</classpath>
<?xml version="1.0" encoding="UTF-8"?>
<classpath>
<classpathentry kind="src" path="src"/>
<classpathentry kind="src" path="test"/>
<classpathentry kind="lib" path="external/marytts-5.1.2/lib/icu4j-54.1.1.jar"/>
<classpathentry kind="lib" path="external/marytts-5.1.2/lib/marytts-client-5.1.2-jar-with-dependencies.jar"/>
<classpathentry kind="lib" path="external/marytts-5.1.2/lib/marytts-lang-de-5.1.2.jar"/>
<classpathentry kind="lib" path="external/marytts-5.1.2/lib/marytts-lang-en-5.1.2.jar"/>
<classpathentry kind="lib" path="external/marytts-5.1.2/lib/marytts-lang-fr-5.1.2.jar"/>
<classpathentry kind="lib" path="external/marytts-5.1.2/lib/marytts-lang-it-5.1.2.jar"/>
<classpathentry kind="lib" path="external/marytts-5.1.2/lib/marytts-lang-ru-5.1.2.jar"/>
<classpathentry kind="lib" path="external/marytts-5.1.2/lib/marytts-lang-sv-5.1.2.jar"/>
<classpathentry kind="lib" path="external/marytts-5.1.2/lib/marytts-lang-te-5.1.2.jar"/>
<classpathentry kind="lib" path="external/marytts-5.1.2/lib/marytts-lang-tr-5.1.2.jar"/>
<classpathentry kind="lib" path="external/marytts-5.1.2/lib/marytts-runtime-5.1.2-jar-with-dependencies.jar"/>
<classpathentry kind="lib" path="external/marytts-5.1.2/lib/voice-cmu-slt-hsmm-5.1.2.jar"/>
<classpathentry kind="lib" path="lib/Ab.jar"/>
<classpathentry kind="lib" path="lib/commons-math3-3.5.jar"/>
<classpathentry kind="lib" path="lib/jSerialComm-1.3.4.jar"/>
<classpathentry kind="lib" path="lib/marytts-client-5.1.2-jar-with-dependencies.jar"/>
<classpathentry kind="lib" path="lib/marytts-runtime-5.1.2-jar-with-dependencies.jar"/>
<classpathentry kind="lib" path="lib/pi4j-core-1.0.jar"/>
<classpathentry kind="lib" path="lib/junit-4.12.jar"/>
<classpathentry kind="lib" path="lib/java-speech-api-master.jar"/>
<classpathentry kind="lib" path="lib/sphinx4-core.jar"/>
<classpathentry kind="lib" path="lib/sqlite-jdbc-3.8.11.2_HACKED_FOR_RPI.jar"/>
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER"/>
<classpathentry combineaccessrules="false" kind="src" path="/zutil-java"/>
<classpathentry kind="con" path="org.eclipse.jdt.junit.JUNIT_CONTAINER/4"/>
<classpathentry kind="output" path="lib/java-speech-api-master/bin"/>
</classpath>

View file

@ -20,8 +20,13 @@ import zutil.log.LogUtil;
public class DataAggregatorDaemon extends TimerTask implements HalDaemon {
private static final Logger logger = LogUtil.getLogger();
private enum AggregationMethod{
SUM,
AVG
}
public void initiate(Timer timer){
timer.schedule(this, 0, TimeConstants.FIVE_MINUTES_IN_MS);
timer.schedule(this, 0, TimeUtility.FIVE_MINUTES_IN_MS);
}
@Override
@ -30,114 +35,146 @@ public class DataAggregatorDaemon extends TimerTask implements HalDaemon {
List<Sensor> sensorList = Sensor.getLocalSensors(HalContext.getDB());
for(Sensor sensor : sensorList){
logger.fine("Aggregating sensor_id: " + sensor.getId());
aggregateSensor(sensor.getId());
aggregateSensor(sensor);
}
logger.fine("Aggregation Done");
} catch (SQLException e) {
logger.log(Level.SEVERE, null, e);
}
}
public void aggregateSensor(long sensorId) {
public void aggregateSensor(Sensor sensor) {
logger.fine("The sensor is of type: " + sensor.getType());
if(sensor.getType().equals("PowerMeter")){
logger.fine("aggregating raw data to five minute periods");
aggregateRawData(sensor.getId(), TimeUtility.FIVE_MINUTES_IN_MS, 5, AggregationMethod.SUM);
logger.fine("aggregating five minute periods into hour periods");
aggrigateAggregatedData(sensor.getId(), TimeUtility.FIVE_MINUTES_IN_MS, TimeUtility.HOUR_IN_MS, 12, AggregationMethod.SUM);
logger.fine("aggregating one hour periods into one day periods");
aggrigateAggregatedData(sensor.getId(), TimeUtility.HOUR_IN_MS, TimeUtility.DAY_IN_MS, 24, AggregationMethod.SUM);
}else{
logger.fine("The sensor type is not supported by the aggregation deamon. Ignoring");
}
}
/**
* Aggregate data from the raw DB table to the aggregated table
* @param sensorId The sensor for to aggregate data
* @param toPeriodSizeInMs The period length in ms to aggregate to
*/
private void aggregateRawData(long sensorId, long toPeriodSizeInMs, int expectedSampleCount, AggregationMethod aggrMethod){
DBConnection db = HalContext.getDB();
PreparedStatement stmt = null;
try {
stmt = db.getPreparedStatement("SELECT MAX(timestamp_end) FROM sensor_data_aggr WHERE sensor_id == ?");
stmt.setLong(1, sensorId);
Long maxDBTimestamp = DBConnection.exec(stmt, new SimpleSQLResult<Long>());
if(maxDBTimestamp == null)
maxDBTimestamp = 0l;
// 5 minute aggregation
long minPeriodTimestamp = getTimestampPeriodStart(TimeConstants.FIVE_MINUTES_IN_MS, System.currentTimeMillis());
logger.fine("Calculating 5 min periods... (from:"+ maxDBTimestamp +", to:"+ minPeriodTimestamp +")");
stmt = db.getPreparedStatement("SELECT MAX(timestamp_end) FROM sensor_data_aggr"
+ " WHERE sensor_id == ?"
+ " AND timestamp_end-timestamp_start == ?");
stmt.setLong(1, sensorId);
stmt.setLong(2, toPeriodSizeInMs-1);
Long maxTimestampFoundForSensor = DBConnection.exec(stmt, new SimpleSQLResult<Long>());
if(maxTimestampFoundForSensor == null)
maxTimestampFoundForSensor = 0l;
long currentPeriodStartTimestamp = TimeUtility.getTimestampPeriodStart(toPeriodSizeInMs, System.currentTimeMillis());
logger.fine("Calculating periods... (from:"+ maxTimestampFoundForSensor +", to:"+ currentPeriodStartTimestamp +")");
stmt = db.getPreparedStatement("SELECT *, 1 AS confidence, timestamp AS timestamp_start FROM sensor_data_raw"
+" WHERE sensor_id == ? AND ? < timestamp AND timestamp < ? "
+" WHERE sensor_id == ?"
+ " AND ? < timestamp"
+ " AND timestamp < ? "
+" ORDER BY timestamp ASC");
stmt.setLong(1, sensorId);
stmt.setLong(2, maxDBTimestamp);
stmt.setLong(3, minPeriodTimestamp);
DBConnection.exec(stmt, new DataAggregator(sensorId, TimeConstants.FIVE_MINUTES_IN_MS, 5));
stmt.setLong(2, maxTimestampFoundForSensor);
stmt.setLong(3, currentPeriodStartTimestamp);
DBConnection.exec(stmt, new DataAggregator(sensorId, toPeriodSizeInMs, expectedSampleCount, aggrMethod));
} catch (SQLException e) {
logger.log(Level.SEVERE, null, e);
}
}
/**
* Re-aggregate data from the aggregated DB table to itself
* @param sensorId The sensor for to aggregate data
* @param fromPeriodSizeInMs The period length in ms to aggregate from
* @param toPeriodSizeInMs The period length in ms to aggregate to
*/
private void aggrigateAggregatedData(long sensorId, long fromPeriodSizeInMs, long toPeriodSizeInMs, int expectedSampleCount, AggregationMethod aggrMethod){
DBConnection db = HalContext.getDB();
PreparedStatement stmt = null;
try {
// hour aggregation
stmt = db.getPreparedStatement("SELECT MAX(timestamp_end) FROM sensor_data_aggr"
+" WHERE sensor_id == ? AND timestamp_end-timestamp_start == ?");
+" WHERE sensor_id == ?"
+ " AND timestamp_end-timestamp_start == ?");
stmt.setLong(1, sensorId);
stmt.setLong(2, TimeConstants.HOUR_IN_MS-1);
maxDBTimestamp = DBConnection.exec(stmt, new SimpleSQLResult<Long>());
if(maxDBTimestamp == null)
maxDBTimestamp = 0l;
long hourPeriodTimestamp = getTimestampPeriodStart(TimeConstants.HOUR_IN_MS, System.currentTimeMillis());
logger.fine("Calculating hour periods... (from:"+ maxDBTimestamp +", to:"+ hourPeriodTimestamp +")");
stmt.setLong(2, toPeriodSizeInMs-1);
Long maxTimestampFoundForSensor = DBConnection.exec(stmt, new SimpleSQLResult<Long>());
if(maxTimestampFoundForSensor == null)
maxTimestampFoundForSensor = 0l;
long currentPeriodStartTimestamp = TimeUtility.getTimestampPeriodStart(toPeriodSizeInMs, System.currentTimeMillis());
logger.fine("Calculating periods... (from:"+ maxTimestampFoundForSensor +", to:"+ currentPeriodStartTimestamp +")");
stmt = db.getPreparedStatement("SELECT * FROM sensor_data_aggr"
+" WHERE sensor_id == ? AND ? < timestamp_start AND timestamp_start < ? AND timestamp_end-timestamp_start == ?"
+" WHERE sensor_id == ?"
+ " AND ? < timestamp_start"
+ " AND timestamp_start < ?"
+ " AND timestamp_end-timestamp_start == ?"
+" ORDER BY timestamp_start ASC");
stmt.setLong(1, sensorId);
stmt.setLong(2, maxDBTimestamp);
stmt.setLong(3, hourPeriodTimestamp);
stmt.setLong(4, TimeConstants.FIVE_MINUTES_IN_MS-1);
DBConnection.exec(stmt, new DataAggregator(sensorId, TimeConstants.HOUR_IN_MS, 12));
// day aggregation
stmt = db.getPreparedStatement("SELECT MAX(timestamp_end) FROM sensor_data_aggr WHERE sensor_id == ? AND timestamp_end-timestamp_start == ?");
stmt.setLong(1, sensorId);
stmt.setLong(2, TimeConstants.DAY_IN_MS-1);
maxDBTimestamp = DBConnection.exec(stmt, new SimpleSQLResult<Long>());
if(maxDBTimestamp == null)
maxDBTimestamp = 0l;
long dayPeriodTimestamp = getTimestampPeriodStart(TimeConstants.DAY_IN_MS, System.currentTimeMillis());
logger.fine("Calculating day periods... (from:"+ maxDBTimestamp +", to:"+ dayPeriodTimestamp +")");
stmt = db.getPreparedStatement("SELECT * FROM sensor_data_aggr"
+" WHERE sensor_id == ? AND ? < timestamp_start AND timestamp_start < ? AND timestamp_end-timestamp_start == ?"
+" ORDER BY timestamp_start ASC");
stmt.setLong(1, sensorId);
stmt.setLong(2, maxDBTimestamp);
stmt.setLong(3, dayPeriodTimestamp);
stmt.setLong(4, TimeConstants.HOUR_IN_MS-1);
DBConnection.exec(stmt, new DataAggregator(sensorId, TimeConstants.DAY_IN_MS, 24));
stmt.setLong(2, maxTimestampFoundForSensor);
stmt.setLong(3, currentPeriodStartTimestamp);
stmt.setLong(4, fromPeriodSizeInMs-1);
DBConnection.exec(stmt, new DataAggregator(sensorId, toPeriodSizeInMs, expectedSampleCount, aggrMethod));
} catch (SQLException e) {
logger.log(Level.SEVERE, null, e);
}
}
/**
* Internal class for aggregating data to the aggregated DB table
*/
private class DataAggregator implements SQLResultHandler<Object>{
private long sensorId = -1;
private long aggrTimeInMs = -1;
private int expectedSampleCount = -1;
private AggregationMethod aggrMethod = null;
public DataAggregator(long sensorId, long aggrTimeInMs, int expectedSampleCount) {
public DataAggregator(long sensorId, long aggrTimeInMs, int expectedSampleCount, AggregationMethod aggrMethod) {
this.sensorId = sensorId;
this.aggrTimeInMs = aggrTimeInMs;
this.expectedSampleCount = expectedSampleCount;
this.aggrMethod = aggrMethod;
}
@Override
public Object handleQueryResult(Statement stmt, ResultSet result) throws SQLException {
try{
HalContext.getDB().getConnection().setAutoCommit(false);
long currentPeriodTimestamp = 0;
int sum = 0;
float confidenceSum = 0;
int samples = 0;
long highestSequenceId = Sensor.getHighestSequenceId(sensorId);
HalContext.getDB().getConnection().setAutoCommit(false);
PreparedStatement preparedInsertStmt = HalContext.getDB().getPreparedStatement("INSERT INTO sensor_data_aggr(sensor_id, sequence_id, timestamp_start, timestamp_end, data, confidence) VALUES(?, ?, ?, ?, ?, ?)");
while(result.next()){
if(sensorId != result.getInt("sensor_id")){
throw new IllegalArgumentException("found entry for aggregation for the wrong sensorId (expecting: "+sensorId+", but was: "+result.getInt("sensor_id")+")");
}
long timestamp = result.getLong("timestamp_start");
long periodTimestamp = getTimestampPeriodStart(this.aggrTimeInMs, timestamp);
long periodTimestamp = TimeUtility.getTimestampPeriodStart(this.aggrTimeInMs, timestamp);
if(currentPeriodTimestamp != 0 && periodTimestamp != currentPeriodTimestamp){
float aggrConfidence = confidenceSum / (float)this.expectedSampleCount;
logger.finer("Calculated day period: "+ currentPeriodTimestamp +" sum: "+ sum +" confidence: "+ aggrConfidence+ " samples: " + samples);
float data = -1;
switch(aggrMethod){
case SUM: data = sum; break;
case AVG: data = sum/samples; break;
}
logger.finer("Calculated day period: " + currentPeriodTimestamp + ", data: " + sum + ", confidence: " + aggrConfidence + ", samples: " + samples + ", aggrMethod: " + aggrMethod);
preparedInsertStmt.setInt(1, result.getInt("sensor_id"));
preparedInsertStmt.setLong(2, ++highestSequenceId);
preparedInsertStmt.setLong(3, currentPeriodTimestamp);
preparedInsertStmt.setLong(4, currentPeriodTimestamp + this.aggrTimeInMs - 1);
preparedInsertStmt.setInt(5, sum);
preparedInsertStmt.setInt(5, (int)data); //TODO: make data float in DB to handle aggrMethod.AVG where the data must be able to be saved as a float
preparedInsertStmt.setFloat(6, aggrConfidence);
preparedInsertStmt.addBatch();
@ -163,10 +200,5 @@ public class DataAggregatorDaemon extends TimerTask implements HalDaemon {
return null;
}
}
private static long getTimestampPeriodStart(long periodLengthInMs, long timestamp){
long tmp = timestamp % periodLengthInMs;
return timestamp - tmp;
}
}

View file

@ -21,7 +21,7 @@ public class DataDeletionDaemon extends TimerTask implements HalDaemon {
private static final Logger logger = LogUtil.getLogger();
public void initiate(Timer timer){
timer.schedule(this, 5000, TimeConstants.FIVE_MINUTES_IN_MS);
timer.schedule(this, 5000, TimeUtility.FIVE_MINUTES_IN_MS);
}
@Override
@ -30,7 +30,7 @@ public class DataDeletionDaemon extends TimerTask implements HalDaemon {
List<Sensor> sensorList = Sensor.getSensors(HalContext.getDB());
for(Sensor sensor : sensorList){
logger.fine("Deleting old data for sensor id: " + sensor.getId());
cleanupSensor(sensor.getId());
cleanupSensor(sensor);
}
logger.fine("Data cleanup done");
} catch (SQLException e) {
@ -38,7 +38,30 @@ public class DataDeletionDaemon extends TimerTask implements HalDaemon {
}
}
public void cleanupSensor(long sensorId) {
public void cleanupSensor(Sensor sensor) {
logger.fine("The sensor is of type: " + sensor.getType());
if(sensor.getType().equals("PowerMeter")){
//if(sensor.isInternal()){ //TODO
cleanupInternalSensorData(sensor.getId(), TimeUtility.HOUR_IN_MS, TimeUtility.FIVE_MINUTES_IN_MS, TimeUtility.DAY_IN_MS);
cleanupInternalSensorData(sensor.getId(), TimeUtility.DAY_IN_MS, TimeUtility.HOUR_IN_MS, TimeUtility.WEEK_IN_MS);
//}else{ //TODO
//cleanupExternalSensorData(sensor.getId(), TimeUtility.FIVE_MINUTES_IN_MS, TimeUtility.DAY_IN_MS);
//cleanupExternalSensorData(sensor.getId(), TimeUtility.DAY_IN_MS, TimeUtility.WEEK_IN_MS);
//}
clearPeriodsOfWrongLenght(sensor.getId(), TimeUtility.FIVE_MINUTES_IN_MS, TimeUtility.HOUR_IN_MS, TimeUtility.WEEK_IN_MS);
}else{
logger.fine("The sensor type is not supported by the cleanup deamon. Ignoring");
}
}
/**
* Will clear periods only if it has been aggregated and are too old.
* @param sensorId
* @Param referencePeriodlength Will only clear periods older than the newest period of this length.
* @Param clearPeriodlength Will clear periods with this length
* @param olderThan Data must be older than this many ms to be cleared from the DB
*/
private void cleanupInternalSensorData(long sensorId, long referencePeriodlength, long clearPeriodlength, long olderThan){
DBConnection db = HalContext.getDB();
PreparedStatement stmt = null;
try {
@ -48,7 +71,7 @@ public class DataDeletionDaemon extends TimerTask implements HalDaemon {
stmt = db.getPreparedStatement("SELECT MAX(timestamp_end) FROM sensor_data_aggr"
+" WHERE sensor_id == ? AND timestamp_end-timestamp_start == ?");
stmt.setLong(1, sensorId);
stmt.setLong(2, TimeConstants.HOUR_IN_MS-1);
stmt.setLong(2, referencePeriodlength-1);
maxDBTimestamp = DBConnection.exec(stmt, new SimpleSQLResult<Long>());
if(maxDBTimestamp == null)
maxDBTimestamp = 0l;
@ -60,35 +83,65 @@ public class DataDeletionDaemon extends TimerTask implements HalDaemon {
+ "AND timestamp_end < ?");
stmt.setLong(1, sensorId);
stmt.setLong(2, maxDBTimestamp);
stmt.setLong(3, TimeConstants.FIVE_MINUTES_IN_MS-1);
stmt.setLong(4, System.currentTimeMillis()-TimeConstants.DAY_IN_MS);
DBConnection.exec(stmt, new AggregateDataDeleter(sensorId));
// delete too old 1 hour periods that already have been aggregated into days
stmt = db.getPreparedStatement("SELECT MAX(timestamp_end) FROM sensor_data_aggr"
+" WHERE sensor_id == ? AND timestamp_end-timestamp_start == ?");
stmt.setLong(1, sensorId);
stmt.setLong(2, TimeConstants.DAY_IN_MS-1);
maxDBTimestamp = DBConnection.exec(stmt, new SimpleSQLResult<Long>());
if(maxDBTimestamp == null)
maxDBTimestamp = 0l;
stmt = db.getPreparedStatement("SELECT * FROM sensor_data_aggr"
+" WHERE sensor_id == ? "
+ "AND timestamp_end < ? "
+ "AND timestamp_end-timestamp_start == ?"
+ "AND timestamp_end < ?");
stmt.setLong(1, sensorId);
stmt.setLong(2, maxDBTimestamp);
stmt.setLong(3, TimeConstants.HOUR_IN_MS-1);
stmt.setLong(4, System.currentTimeMillis()-TimeConstants.SEVEN_DAYS_IN_MS);
stmt.setLong(3, clearPeriodlength-1);
stmt.setLong(4, System.currentTimeMillis()-olderThan);
DBConnection.exec(stmt, new AggregateDataDeleter(sensorId));
} catch (SQLException e) {
logger.log(Level.SEVERE, null, e);
}
}
private class AggregateDataDeleter implements SQLResultHandler<Object>{
/**
* Will clear periods if they are too old.
* @param sensorId
* @Param clearPeriodlength Will clear periods with this length
* @param olderThan Data must be older than this many ms to be cleared from the DB
*/
private void cleanupExternalSensorData(long sensorId, long clearPeriodlength, long olderThan){
DBConnection db = HalContext.getDB();
PreparedStatement stmt = null;
try {
stmt = db.getPreparedStatement("SELECT * FROM sensor_data_aggr"
+" WHERE sensor_id == ? "
+ "AND timestamp_end-timestamp_start == ?"
+ "AND timestamp_end < ?");
stmt.setLong(1, sensorId);
stmt.setLong(2, clearPeriodlength-1);
stmt.setLong(3, System.currentTimeMillis()-olderThan);
DBConnection.exec(stmt, new AggregateDataDeleter(sensorId));
} catch (SQLException e) {
logger.log(Level.SEVERE, null, e);
}
}
/**
* Will delete all aggregated entries for a sensor id where the period length is not expected
* @param sensorId
* @param expectedPeriodLengths
*/
private void clearPeriodsOfWrongLenght(long sensorId, long... expectedPeriodLengths){
DBConnection db = HalContext.getDB();
PreparedStatement stmt = null;
try {
StringBuilder querry = new StringBuilder("SELECT * FROM sensor_data_aggr WHERE sensor_id == ?");
for(int i = 0; i < expectedPeriodLengths.length; ++i){
querry.append(" AND timestamp_end-timestamp_start != ?");
}
stmt = db.getPreparedStatement(querry.toString());
for(int i = 0; i < expectedPeriodLengths.length; ++i){
stmt.setLong(i+1, expectedPeriodLengths[i]);
}
long deletedRows = DBConnection.exec(stmt, new AggregateDataDeleter(sensorId));
if(deletedRows > 0){
logger.severe("removed aggregated data with an unknown period length. Is the database corrupt?");
}
} catch (SQLException e) {
logger.log(Level.SEVERE, null, e);
}
}
private class AggregateDataDeleter implements SQLResultHandler<Long>{
private long sensorId = -1;
public AggregateDataDeleter(long sensorId){
@ -96,7 +149,8 @@ public class DataDeletionDaemon extends TimerTask implements HalDaemon {
}
@Override
public Object handleQueryResult(Statement stmt, ResultSet result) throws SQLException {
public Long handleQueryResult(Statement stmt, ResultSet result) throws SQLException {
long count = 0;
try{
HalContext.getDB().getConnection().setAutoCommit(false);
PreparedStatement preparedDeleteStmt = HalContext.getDB().getPreparedStatement("DELETE FROM sensor_data_aggr WHERE sensor_id == ? AND sequence_id == ?");
@ -104,20 +158,22 @@ public class DataDeletionDaemon extends TimerTask implements HalDaemon {
if(sensorId != result.getInt("sensor_id")){
throw new IllegalArgumentException("Found entry for aggregation for the wrong sensorId (expecting: "+sensorId+", but was: "+result.getInt("sensor_id")+")");
}
logger.finer("Deleting sensor aggregate entry timestamp: "+ result.getLong("timestamp_start") +" - "+ result.getLong("timestamp_end"));
logger.finer("Deleting sensor aggregate entry timestamp: "+ result.getLong("timestamp_start") +" - "+ result.getLong("timestamp_end") + " (" + TimeUtility.msToString(result.getLong("timestamp_end")-result.getLong("timestamp_start")) + ")");
preparedDeleteStmt.setInt(1, result.getInt("sensor_id"));
preparedDeleteStmt.setLong(2, result.getLong("sequence_id"));
preparedDeleteStmt.addBatch();
count++;
}
DBConnection.execBatch(preparedDeleteStmt);
HalContext.getDB().getConnection().commit();
}catch(Exception e){
HalContext.getDB().getConnection().rollback();
throw e;
}finally{
HalContext.getDB().getConnection().setAutoCommit(true);
}
return null;
return count;
}
}

View file

@ -1,8 +0,0 @@
package se.koc.hal.deamon;
public class TimeConstants {
public static final long FIVE_MINUTES_IN_MS = 5 * 60 * 1000;
public static final long HOUR_IN_MS = FIVE_MINUTES_IN_MS * 12;
public static final long DAY_IN_MS = HOUR_IN_MS * 24;
public static final long SEVEN_DAYS_IN_MS = DAY_IN_MS * 7;
}

View file

@ -0,0 +1,102 @@
package se.koc.hal.deamon;
import java.util.Calendar;
public class TimeUtility {
public static final long SECOND_IN_MS = 1000;
public static final long MINUTES_IN_MS = SECOND_IN_MS * 60;
public static final long FIVE_MINUTES_IN_MS = MINUTES_IN_MS * 5;
public static final long HOUR_IN_MS = MINUTES_IN_MS * 60;
public static final long DAY_IN_MS = HOUR_IN_MS * 24;
public static final long WEEK_IN_MS = DAY_IN_MS * 7;
/**
* Get the timstamp for the given timestamp floored with the period length. The result should point to the beginning of the timestamps period.
* @param periodLengthInMs The periods length to floor the timestamp with
* @param timestamp The timestamp to floor.
* @return
*/
public static long getTimestampPeriodStart(long periodLengthInMs, long timestamp){
if(periodLengthInMs < DAY_IN_MS){ //simple math if the period is less than a day long
return timestamp - (timestamp % periodLengthInMs);
}else{
Calendar cal = Calendar.getInstance();
cal.setTimeInMillis(timestamp);
boolean clear = false;
int days = millisecondsToDays(periodLengthInMs);
if(days > 0){
int currentDay = cal.get(Calendar.DAY_OF_YEAR);
cal.set(Calendar.DAY_OF_YEAR, (currentDay/days)*days);
clear = true;
}
int hours = millisecondsToHourOfDay(periodLengthInMs);
if(hours > 0){
int currentHour = cal.get(Calendar.HOUR_OF_DAY);
cal.set(Calendar.HOUR_OF_DAY, (currentHour/hours)*hours);
clear = true;
}else if(clear){
cal.set(Calendar.HOUR_OF_DAY, 0);
}
int minutes = millisecondsToMinuteOfHour(periodLengthInMs);
if(minutes > 0){
int currentMinute = cal.get(Calendar.MINUTE);
cal.set(Calendar.MINUTE, (currentMinute/minutes)*minutes);
clear = true;
}else if(clear){
cal.set(Calendar.MINUTE, 0);
}
int seconds = millisecondsToSecondOfMinute(periodLengthInMs);
if(seconds > 0){
int currentSecond = cal.get(Calendar.SECOND);
cal.set(Calendar.SECOND, (currentSecond/seconds)*seconds);
clear = true;
}else if(clear){
cal.set(Calendar.SECOND, 0);
}
int milliseconds = millisecondsToMillisecondInSecond(periodLengthInMs);
if(milliseconds > 0){
int currentMillisecond = cal.get(Calendar.MILLISECOND);
cal.set(Calendar.MILLISECOND, (currentMillisecond/milliseconds)*milliseconds);
}else if(clear){
cal.set(Calendar.MILLISECOND, 0);
}
return cal.getTimeInMillis();
}
}
public static int millisecondsToMillisecondInSecond(long ms){
return (int) (ms % SECOND_IN_MS);
}
public static int millisecondsToSecondOfMinute(long ms){
return (int) ((ms % MINUTES_IN_MS) / SECOND_IN_MS);
}
public static int millisecondsToMinuteOfHour(long ms){
return (int) ((ms % HOUR_IN_MS) / MINUTES_IN_MS);
}
public static int millisecondsToHourOfDay(long ms){
return (int) ((ms % DAY_IN_MS) / HOUR_IN_MS);
}
public static int millisecondsToDays(long ms){
return (int) (ms / DAY_IN_MS);
}
public static String msToString(long ms){
String retval = "";
int days = millisecondsToDays(ms);
retval += days + "days+";
int hours = millisecondsToHourOfDay(ms);
retval += (hours<10?"0"+hours:hours);
int minutes = millisecondsToMinuteOfHour(ms);
retval += ":" + (minutes<10?"0"+minutes:minutes);
int seconds = millisecondsToSecondOfMinute(ms);
retval += ":" + (seconds<10?"0"+seconds:seconds);
int milliseconds = millisecondsToMillisecondInSecond(ms);
retval += "." + (milliseconds<100?"0"+(milliseconds<10?"0"+milliseconds:milliseconds):milliseconds);
return retval;
}
}

View file

@ -1,131 +1,131 @@
package se.koc.hal.page;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Map;
import se.koc.hal.HalContext;
import se.koc.hal.deamon.TimeConstants;
import zutil.db.DBConnection;
import zutil.db.SQLResultHandler;
import zutil.io.file.FileUtil;
import zutil.parser.Templator;
public class PCOverviewHttpPage extends HalHttpPage {
public PCOverviewHttpPage() {
super("Overview", "overview");
}
@Override
public Templator httpRespond(
Map<String, Object> session,
Map<String, String> cookie,
Map<String, String> request)
throws Exception{
DBConnection db = HalContext.getDB();
PreparedStatement stmt = db.getPreparedStatement(
"SELECT user.username as username,"
+ " sensor_data_aggr.timestamp_start as timestamp_start,"
+ " sensor_data_aggr.timestamp_end as timestamp_end,"
+ " sensor_data_aggr.data as data,"
+ " sensor_data_aggr.confidence as confidence,"
+ TimeConstants.FIVE_MINUTES_IN_MS + " as period_length"
+ " FROM sensor_data_aggr, user, sensor"
+ " WHERE sensor.id = sensor_data_aggr.sensor_id"
+ " AND user.id = sensor.user_id"
+ " AND timestamp_end-timestamp_start == ?"
+ " AND timestamp_start > ?"
+ " ORDER BY timestamp_start ASC");
stmt.setLong(1, TimeConstants.FIVE_MINUTES_IN_MS-1);
stmt.setLong(2, (System.currentTimeMillis() - TimeConstants.DAY_IN_MS) );
ArrayList<PowerData> minDataList = DBConnection.exec(stmt , new SQLPowerDataBuilder());
stmt = db.getPreparedStatement(
"SELECT user.username as username,"
+ " sensor_data_aggr.timestamp_start as timestamp_start,"
+ " sensor_data_aggr.timestamp_end as timestamp_end,"
+ " sensor_data_aggr.data as data,"
+ " sensor_data_aggr.confidence as confidence,"
+ TimeConstants.HOUR_IN_MS + " as period_length"
+ " FROM sensor_data_aggr, user, sensor"
+ " WHERE sensor.id = sensor_data_aggr.sensor_id"
+ " AND user.id = sensor.user_id"
+ " AND timestamp_end-timestamp_start == ?"
+ " AND timestamp_start > ?"
+ " ORDER BY timestamp_start ASC");
stmt.setLong(1, TimeConstants.HOUR_IN_MS-1);
stmt.setLong(2, (System.currentTimeMillis() - 3*TimeConstants.DAY_IN_MS) );
ArrayList<PowerData> hourDataList = DBConnection.exec(stmt, new SQLPowerDataBuilder());
stmt = db.getPreparedStatement(
"SELECT user.username as username,"
+ " sensor_data_aggr.timestamp_start as timestamp_start,"
+ " sensor_data_aggr.timestamp_end as timestamp_end,"
+ " sensor_data_aggr.data as data,"
+ " sensor_data_aggr.confidence as confidence,"
+ TimeConstants.DAY_IN_MS + " as period_length"
+ " FROM sensor_data_aggr, user, sensor"
+ " WHERE sensor.id = sensor_data_aggr.sensor_id"
+ " AND user.id = sensor.user_id"
+ " AND timestamp_end-timestamp_start == ?"
+ " ORDER BY timestamp_start ASC");
stmt.setLong(1, TimeConstants.DAY_IN_MS-1);
ArrayList<PowerData> dayDataList = DBConnection.exec(stmt, new SQLPowerDataBuilder());
Templator tmpl = new Templator(FileUtil.find("web-resource/overview.tmpl"));
tmpl.set("minData", minDataList);
tmpl.set("hourData", hourDataList);
tmpl.set("dayData", dayDataList);
tmpl.set("username", new String[]{"Ziver", "Daniel"});
return tmpl;
}
public static class PowerData{
long timestamp;
String data;
String username;
public PowerData(long time, String data, String uname) {
this.timestamp = time;
this.data = data;
this.username = uname;
}
}
private static class SQLPowerDataBuilder implements SQLResultHandler<ArrayList<PowerData>> {
@Override
public ArrayList<PowerData> handleQueryResult(Statement stmt, ResultSet result) throws SQLException {
ArrayList<PowerData> list = new ArrayList<>();
long previousTimestampEnd = -1;
while(result.next()){
long timestampStart = result.getLong("timestamp_start");
long timestampEnd = result.getLong("timestamp_end");
long periodLength = result.getLong("period_length");
int data = result.getInt("data");
String username = result.getString("username");
float confidence = result.getFloat("confidence");
//add null data point to list if one or more periods of data is missing before this
if(previousTimestampEnd != -1 && timestampStart-previousTimestampEnd > periodLength){
list.add(new PowerData(previousTimestampEnd+1, "null", username));
}
//add this data point to list
list.add(new PowerData(timestampStart, ""+ (data/1000.0), username));
//update previous end timestamp
previousTimestampEnd = timestampEnd;
}
return list;
}
}
}
package se.koc.hal.page;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Map;
import se.koc.hal.HalContext;
import se.koc.hal.deamon.TimeUtility;
import zutil.db.DBConnection;
import zutil.db.SQLResultHandler;
import zutil.io.file.FileUtil;
import zutil.parser.Templator;
public class PCOverviewHttpPage extends HalHttpPage {
public PCOverviewHttpPage() {
super("Overview", "overview");
}
@Override
public Templator httpRespond(
Map<String, Object> session,
Map<String, String> cookie,
Map<String, String> request)
throws Exception{
DBConnection db = HalContext.getDB();
PreparedStatement stmt = db.getPreparedStatement(
"SELECT user.username as username,"
+ " sensor_data_aggr.timestamp_start as timestamp_start,"
+ " sensor_data_aggr.timestamp_end as timestamp_end,"
+ " sensor_data_aggr.data as data,"
+ " sensor_data_aggr.confidence as confidence,"
+ TimeUtility.FIVE_MINUTES_IN_MS + " as period_length"
+ " FROM sensor_data_aggr, user, sensor"
+ " WHERE sensor.id = sensor_data_aggr.sensor_id"
+ " AND user.id = sensor.user_id"
+ " AND timestamp_end-timestamp_start == ?"
+ " AND timestamp_start > ?"
+ " ORDER BY timestamp_start ASC");
stmt.setLong(1, TimeUtility.FIVE_MINUTES_IN_MS-1);
stmt.setLong(2, (System.currentTimeMillis() - TimeUtility.DAY_IN_MS) );
ArrayList<PowerData> minDataList = DBConnection.exec(stmt , new SQLPowerDataBuilder());
stmt = db.getPreparedStatement(
"SELECT user.username as username,"
+ " sensor_data_aggr.timestamp_start as timestamp_start,"
+ " sensor_data_aggr.timestamp_end as timestamp_end,"
+ " sensor_data_aggr.data as data,"
+ " sensor_data_aggr.confidence as confidence,"
+ TimeUtility.HOUR_IN_MS + " as period_length"
+ " FROM sensor_data_aggr, user, sensor"
+ " WHERE sensor.id = sensor_data_aggr.sensor_id"
+ " AND user.id = sensor.user_id"
+ " AND timestamp_end-timestamp_start == ?"
+ " AND timestamp_start > ?"
+ " ORDER BY timestamp_start ASC");
stmt.setLong(1, TimeUtility.HOUR_IN_MS-1);
stmt.setLong(2, (System.currentTimeMillis() - TimeUtility.WEEK_IN_MS) );
ArrayList<PowerData> hourDataList = DBConnection.exec(stmt, new SQLPowerDataBuilder());
stmt = db.getPreparedStatement(
"SELECT user.username as username,"
+ " sensor_data_aggr.timestamp_start as timestamp_start,"
+ " sensor_data_aggr.timestamp_end as timestamp_end,"
+ " sensor_data_aggr.data as data,"
+ " sensor_data_aggr.confidence as confidence,"
+ TimeUtility.DAY_IN_MS + " as period_length"
+ " FROM sensor_data_aggr, user, sensor"
+ " WHERE sensor.id = sensor_data_aggr.sensor_id"
+ " AND user.id = sensor.user_id"
+ " AND timestamp_end-timestamp_start == ?"
+ " ORDER BY timestamp_start ASC");
stmt.setLong(1, TimeUtility.DAY_IN_MS-1);
ArrayList<PowerData> dayDataList = DBConnection.exec(stmt, new SQLPowerDataBuilder());
Templator tmpl = new Templator(FileUtil.find("web-resource/overview.tmpl"));
tmpl.set("minData", minDataList);
tmpl.set("hourData", hourDataList);
tmpl.set("dayData", dayDataList);
tmpl.set("username", new String[]{"Ziver", "Daniel"});
return tmpl;
}
public static class PowerData{
long timestamp;
String data;
String username;
public PowerData(long time, String data, String uname) {
this.timestamp = time;
this.data = data;
this.username = uname;
}
}
private static class SQLPowerDataBuilder implements SQLResultHandler<ArrayList<PowerData>> {
@Override
public ArrayList<PowerData> handleQueryResult(Statement stmt, ResultSet result) throws SQLException {
ArrayList<PowerData> list = new ArrayList<>();
long previousTimestampEnd = -1;
while(result.next()){
long timestampStart = result.getLong("timestamp_start");
long timestampEnd = result.getLong("timestamp_end");
long periodLength = result.getLong("period_length");
int data = result.getInt("data");
String username = result.getString("username");
float confidence = result.getFloat("confidence");
//add null data point to list if one or more periods of data is missing before this
if(previousTimestampEnd != -1 && timestampStart-previousTimestampEnd > periodLength){
list.add(new PowerData(previousTimestampEnd+1, "null", username));
}
//add this data point to list
list.add(new PowerData(timestampStart, ""+ (data/1000.0), username));
//update previous end timestamp
previousTimestampEnd = timestampEnd;
}
return list;
}
}
}

View file

@ -0,0 +1,97 @@
package se.koc.hal.deamon;
import static org.junit.Assert.*;
import java.util.Calendar;
import org.junit.Before;
import org.junit.Test;
public class TimeUtilityTest {
private long currentTime;
private Calendar referenceCalendar;
@Before
public void setup(){
currentTime = System.currentTimeMillis();
referenceCalendar = Calendar.getInstance();
referenceCalendar.setTimeInMillis(currentTime);
}
@Test
public void testDayStartForCurrentTime(){
long thisPeriodStartedAt = TimeUtility.getTimestampPeriodStart(TimeUtility.DAY_IN_MS, currentTime);
Calendar testCalendar = Calendar.getInstance();
testCalendar.setTimeInMillis(thisPeriodStartedAt);
assertEquals("millisecond is wrong", 0, testCalendar.get(Calendar.MILLISECOND));
assertEquals("second is wrong", 0, testCalendar.get(Calendar.SECOND));
assertEquals("minute is wrong", 0, testCalendar.get(Calendar.MINUTE));
assertEquals("hour is wrong", 0, testCalendar.get(Calendar.HOUR_OF_DAY));
assertEquals("day is wrong", referenceCalendar.get(Calendar.DAY_OF_YEAR), testCalendar.get(Calendar.DAY_OF_YEAR));
}
@Test
public void testHourStartForCurrentTime(){
long thisPeriodStartedAt = TimeUtility.getTimestampPeriodStart(TimeUtility.HOUR_IN_MS, currentTime);
Calendar testCalendar = Calendar.getInstance();
testCalendar.setTimeInMillis(thisPeriodStartedAt);
assertEquals("millisecond is wrong", 0, testCalendar.get(Calendar.MILLISECOND));
assertEquals("second is wrong", 0, testCalendar.get(Calendar.SECOND));
assertEquals("minute is wrong", 0, testCalendar.get(Calendar.MINUTE));
assertEquals("hour is wrong", referenceCalendar.get(Calendar.HOUR_OF_DAY), testCalendar.get(Calendar.HOUR_OF_DAY));
assertEquals("day is wrong", referenceCalendar.get(Calendar.DAY_OF_YEAR), testCalendar.get(Calendar.DAY_OF_YEAR));
}
@Test
public void testMinuteStartForCurrentTime(){
long thisPeriodStartedAt = TimeUtility.getTimestampPeriodStart(TimeUtility.MINUTES_IN_MS, currentTime);
Calendar testCalendar = Calendar.getInstance();
testCalendar.setTimeInMillis(thisPeriodStartedAt);
assertEquals("millisecond is wrong", 0, testCalendar.get(Calendar.MILLISECOND));
assertEquals("second is wrong", 0, testCalendar.get(Calendar.SECOND));
assertEquals("minute is wrong", referenceCalendar.get(Calendar.MINUTE), testCalendar.get(Calendar.MINUTE));
assertEquals("hour is wrong", referenceCalendar.get(Calendar.HOUR_OF_DAY), testCalendar.get(Calendar.HOUR_OF_DAY));
assertEquals("day is wrong", referenceCalendar.get(Calendar.DAY_OF_YEAR), testCalendar.get(Calendar.DAY_OF_YEAR));
}
@Test
public void testSecondStartForCurrentTime(){
long thisPeriodStartedAt = TimeUtility.getTimestampPeriodStart(TimeUtility.SECOND_IN_MS, currentTime);
Calendar testCalendar = Calendar.getInstance();
testCalendar.setTimeInMillis(thisPeriodStartedAt);
assertEquals("millisecond is wrong", 0, testCalendar.get(Calendar.MILLISECOND));
assertEquals("second is wrong", referenceCalendar.get(Calendar.SECOND), testCalendar.get(Calendar.SECOND));
assertEquals("minute is wrong", referenceCalendar.get(Calendar.MINUTE), testCalendar.get(Calendar.MINUTE));
assertEquals("hour is wrong", referenceCalendar.get(Calendar.HOUR_OF_DAY), testCalendar.get(Calendar.HOUR_OF_DAY));
assertEquals("day is wrong", referenceCalendar.get(Calendar.DAY_OF_YEAR), testCalendar.get(Calendar.DAY_OF_YEAR));
}
@Test
public void testMsToString(){
//low values
assertEquals("0days+00:00:00.000", TimeUtility.msToString(0));
assertEquals("0days+00:00:00.001", TimeUtility.msToString(1));
assertEquals("0days+00:00:01.000", TimeUtility.msToString(TimeUtility.SECOND_IN_MS));
assertEquals("0days+00:01:00.000", TimeUtility.msToString(TimeUtility.MINUTES_IN_MS));
assertEquals("0days+00:05:00.000", TimeUtility.msToString(TimeUtility.FIVE_MINUTES_IN_MS));
assertEquals("0days+01:00:00.000", TimeUtility.msToString(TimeUtility.HOUR_IN_MS));
assertEquals("1days+00:00:00.000", TimeUtility.msToString(TimeUtility.DAY_IN_MS));
assertEquals("7days+00:00:00.000", TimeUtility.msToString(TimeUtility.WEEK_IN_MS));
//high values
assertEquals("0days+00:00:00.999", TimeUtility.msToString(999));
assertEquals("0days+00:00:59.000", TimeUtility.msToString(TimeUtility.SECOND_IN_MS*59));
assertEquals("0days+00:59:00.000", TimeUtility.msToString(TimeUtility.MINUTES_IN_MS*59));
assertEquals("0days+23:00:00.000", TimeUtility.msToString(TimeUtility.HOUR_IN_MS*23));
assertEquals("369days+00:00:00.000", TimeUtility.msToString(TimeUtility.DAY_IN_MS*369));
//combinations
long ms = (TimeUtility.DAY_IN_MS*999) + (TimeUtility.HOUR_IN_MS*23) + (TimeUtility.MINUTES_IN_MS*59) + (TimeUtility.SECOND_IN_MS*59) + 999;
assertEquals("999days+23:59:59.999", TimeUtility.msToString(ms));
}
}