Optimized data aggregation.

- All aggregations use only the raw data as input.
- Only produce aggregated data that will show in a chart (less work after clearing the sensor_data_aggr table).
- Adding version handling of data aggregation to ensure that the peer has up-to-date data.
- New db version: 6 - Adding sensor.aggr_version(int, default=1), will clear sensor_data_aggr for local sensors


Former-commit-id: cbf5a890a5058791b443e975048db13f6ac4c9fe
This commit is contained in:
Daniel Collin 2016-02-02 14:21:56 +01:00
parent a6f8cf872a
commit ea5c991824
10 changed files with 121 additions and 136 deletions

View file

@ -1,6 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?> <?xml version="1.0" encoding="UTF-8"?>
<classpath> <classpath>
<classpathentry excluding="" kind="src" path="src"/> <classpathentry kind="src" path="src"/>
<classpathentry kind="src" path="test"/> <classpathentry kind="src" path="test"/>
<classpathentry kind="lib" path="external/marytts-5.1.2/lib/icu4j-54.1.1.jar"/> <classpathentry kind="lib" path="external/marytts-5.1.2/lib/icu4j-54.1.1.jar"/>
<classpathentry kind="lib" path="external/marytts-5.1.2/lib/marytts-client-5.1.2-jar-with-dependencies.jar"/> <classpathentry kind="lib" path="external/marytts-5.1.2/lib/marytts-client-5.1.2-jar-with-dependencies.jar"/>
@ -46,5 +46,5 @@
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER"/> <classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER"/>
<classpathentry combineaccessrules="false" kind="src" path="/zutil-java"/> <classpathentry combineaccessrules="false" kind="src" path="/zutil-java"/>
<classpathentry kind="con" path="org.eclipse.jdt.junit.JUNIT_CONTAINER/4"/> <classpathentry kind="con" path="org.eclipse.jdt.junit.JUNIT_CONTAINER/4"/>
<classpathentry kind="output" path="lib/java-speech-api-master/bin"/> <classpathentry kind="output" path="build"/>
</classpath> </classpath>

View file

@ -46,6 +46,12 @@
<exec executable="sqlite3"> <exec executable="sqlite3">
<arg line="hal.db 'DELETE FROM sensor_data_aggr'" /> <arg line="hal.db 'DELETE FROM sensor_data_aggr'" />
</exec> </exec>
<!-- update all internal sensors aggregation version to indicate for peers that they need to re-sync all data -->
<exec executable="sqlite3">
<arg line="hal.db 'UPDATE sensor SET aggr_version = (aggr_version+1) WHERE id = (SELECT sensor.id FROM user, sensor WHERE user.external == 0 AND sensor.user_id = user.id)'" />
</exec>
</target> </target>
<!--clean all build paths--> <!--clean all build paths-->
@ -58,12 +64,12 @@
<!--build product code--> <!--build product code-->
<target name="build" depends="build-dependencies"> <target name="build" depends="build-dependencies">
<mkdir dir="${buildDir}" /> <mkdir dir="${buildDir}" />
<javac srcdir="${srcDir}" destdir="${buildDir}" fork="yes"> <javac srcdir="${srcDir}" destdir="${buildDir}" debug="yes" fork="yes">
<include name="**/*.java" /> <include name="**/*.java" />
<exclude name="se/hal/tts/GoogleTTSClient.java" /> <exclude name="se/hal/tts/GoogleTTSClient.java" />
<classpath refid="classpath.build" /> <classpath refid="classpath.build" />
</javac> </javac>
<javac srcdir="${testDir}" destdir="${buildDir}" fork="yes"> <javac srcdir="${testDir}" destdir="${buildDir}" debug="yes" fork="yes">
<include name="**/*.java" /> <include name="**/*.java" />
<exclude name="se/hal/test/JarvisSyntersizerTest.java" /> <exclude name="se/hal/test/JarvisSyntersizerTest.java" />
<classpath refid="classpath.build" /> <classpath refid="classpath.build" />

Binary file not shown.

View file

@ -16,6 +16,8 @@ import java.sql.Statement;
import java.util.Properties; import java.util.Properties;
import java.util.logging.Logger; import java.util.logging.Logger;
import se.hal.struct.Sensor;
public class HalContext { public class HalContext {
private static final Logger logger = LogUtil.getLogger(); private static final Logger logger = LogUtil.getLogger();
@ -70,8 +72,8 @@ public class HalContext {
referenceDB.exec("SELECT * FROM conf", new PropertiesSQLResult()); referenceDB.exec("SELECT * FROM conf", new PropertiesSQLResult());
// Check DB version // Check DB version
logger.info("DB version: "+ dbConf.getProperty(PROPERTY_DB_VERSION)); logger.info("DB version: "+ dbConf.getProperty(PROPERTY_DB_VERSION));
int defaultDBVersion = Integer.parseInt(defaultDBConf.getProperty(PROPERTY_DB_VERSION)); final int defaultDBVersion = Integer.parseInt(defaultDBConf.getProperty(PROPERTY_DB_VERSION));
int dbVersion = (dbConf.getProperty(PROPERTY_DB_VERSION) != null ? final int dbVersion = (dbConf.getProperty(PROPERTY_DB_VERSION) != null ?
Integer.parseInt(dbConf.getProperty(PROPERTY_DB_VERSION)) : Integer.parseInt(dbConf.getProperty(PROPERTY_DB_VERSION)) :
-1); -1);
if(defaultDBVersion > dbVersion ) { if(defaultDBVersion > dbVersion ) {
@ -97,16 +99,6 @@ public class HalContext {
logger.fine("Forced upgrade enabled"); logger.fine("Forced upgrade enabled");
handler.setForcedDBUpgrade(true); //set to true if any of the intermediate db version requires it. handler.setForcedDBUpgrade(true); //set to true if any of the intermediate db version requires it.
} }
if(result.getBoolean("clear_external_aggr_data")){
logger.fine("Clearing external aggregate data");
db.exec("DELETE FROM sensor_data_aggr WHERE sensor_id = "
+ "(SELECT sensor_id FROM user, sensor WHERE user.external == 1 AND sensor.user_id = user.id)");
}
if(result.getBoolean("clear_internal_aggr_data")){
logger.fine("Clearing local aggregate data");
db.exec("DELETE FROM sensor_data_aggr WHERE sensor_id = "
+ "(SELECT sensor_id FROM user, sensor WHERE user.external == 0 AND sensor.user_id = user.id)");
}
} }
return null; return null;
} }
@ -114,6 +106,33 @@ public class HalContext {
handler.upgrade(); handler.upgrade();
logger.fine("Performing post-upgrade activities");
//read upgrade path preferences from the reference database
referenceDB.exec("SELECT * FROM db_version_history"
+ " WHERE db_version <= " + defaultDBVersion
+ " AND db_version > " + dbVersion,
new SQLResultHandler<Object>() {
@Override
public Object handleQueryResult(Statement stmt, ResultSet result) throws SQLException {
while(result.next()){
if(result.getBoolean("clear_external_aggr_data")){
logger.fine("Clearing external aggregate data");
db.exec("DELETE FROM sensor_data_aggr WHERE sensor_id = "
+ "(SELECT sensor.id FROM user, sensor WHERE user.external == 1 AND sensor.user_id = user.id)");
}
if(result.getBoolean("clear_internal_aggr_data")){
logger.fine("Clearing local aggregate data");
db.exec("DELETE FROM sensor_data_aggr WHERE sensor_id = "
+ "(SELECT sensor.id FROM user, sensor WHERE user.external == 0 AND sensor.user_id = user.id)");
//update all internal sensors aggregation version to indicate for peers that they need to re-sync all data
db.exec("UPDATE sensor SET aggr_version = (aggr_version+1) WHERE id = "
+ "(SELECT sensor.id FROM user, sensor WHERE user.external == 0 AND sensor.user_id = user.id)");
}
}
return null;
}
});
logger.info("DB upgrade done"); logger.info("DB upgrade done");
dbConf.setProperty(PROPERTY_DB_VERSION, defaultDBConf.getProperty(PROPERTY_DB_VERSION)); dbConf.setProperty(PROPERTY_DB_VERSION, defaultDBConf.getProperty(PROPERTY_DB_VERSION));
storeProperties(); storeProperties();

View file

@ -6,6 +6,7 @@ import se.hal.intf.HalDaemon;
import se.hal.struct.Sensor; import se.hal.struct.Sensor;
import se.hal.struct.User; import se.hal.struct.User;
import zutil.db.DBConnection; import zutil.db.DBConnection;
import zutil.db.bean.DBBeanSQLResultHandler;
import zutil.log.LogUtil; import zutil.log.LogUtil;
import zutil.parser.json.JSONParser; import zutil.parser.json.JSONParser;
@ -89,9 +90,21 @@ public class PCDataSynchronizationClient implements HalDaemon {
SensorDataReqDTO req = new SensorDataReqDTO(); SensorDataReqDTO req = new SensorDataReqDTO();
req.sensorId = sensor.getExternalId(); req.sensorId = sensor.getExternalId();
req.offsetSequenceId = Sensor.getHighestSequenceId(sensor.getId()); req.offsetSequenceId = Sensor.getHighestSequenceId(sensor.getId());
req.aggregationVersion = sensor.getAggregationVersion();
out.writeObject(req); out.writeObject(req);
SensorDataListDTO dataList = (SensorDataListDTO) in.readObject(); SensorDataListDTO dataList = (SensorDataListDTO) in.readObject();
if(dataList.aggregationVersion != -1 && dataList.aggregationVersion != sensor.getAggregationVersion()){
logger.fine("The peer has modified its aggregated data in such a way that we need to reset the sync and start over on this side. oldAggregationVersion:"+sensor.getAggregationVersion()+" , newAggregationVersion:"+dataList.aggregationVersion);
//clear old aggregated data for sensor
logger.finer("Deleting all aggregated data for sensor");
sensor.clearAggregatedData(db);
//save new aggregationVersion
sensor.setAggregationVersion(dataList.aggregationVersion);
sensor.save(db);
}
for (SensorDataDTO data : dataList) { for (SensorDataDTO data : dataList) {
PreparedStatement stmt = db.getPreparedStatement("INSERT INTO sensor_data_aggr(sensor_id, sequence_id, timestamp_start, timestamp_end, data, confidence) VALUES(?, ?, ?, ?, ?, ?)"); PreparedStatement stmt = db.getPreparedStatement("INSERT INTO sensor_data_aggr(sensor_id, sequence_id, timestamp_start, timestamp_end, data, confidence) VALUES(?, ?, ?, ?, ?, ?)");
stmt.setLong(1, sensor.getId()); stmt.setLong(1, sensor.getId());
@ -138,5 +151,6 @@ public class PCDataSynchronizationClient implements HalDaemon {
public long sensorId; public long sensorId;
public long offsetSequenceId; // highest known sequence id public long offsetSequenceId; // highest known sequence id
public long aggregationVersion = -1;
} }
} }

View file

@ -99,10 +99,16 @@ public class PCDataSynchronizationDaemon extends ThreadedTCPNetworkServer implem
SensorDataReqDTO req = (SensorDataReqDTO) obj; SensorDataReqDTO req = (SensorDataReqDTO) obj;
Sensor sensor = Sensor.getSensor(db, req.sensorId); Sensor sensor = Sensor.getSensor(db, req.sensorId);
if(sensor.isSynced()) { if(sensor.isSynced()) {
PreparedStatement stmt = db.getPreparedStatement("SELECT * FROM sensor_data_aggr WHERE sensor_id == ? AND sequence_id > ?"); PreparedStatement stmt = db.getPreparedStatement("SELECT * FROM sensor_data_aggr WHERE sensor_id == ? AND sequence_id > ?");
stmt.setLong(1, sensor.getId()); stmt.setLong(1, sensor.getId());
stmt.setLong(2, req.offsetSequenceId); logger.fine("Client requesting sensor data: sensorId: " + req.sensorId + ", offset: " + req.offsetSequenceId + ", " + req.aggregationVersion);
logger.fine("Client requesting sensor data: sensorId: " + req.sensorId + ", offset: " + req.offsetSequenceId); if(req.aggregationVersion != -1 && req.aggregationVersion != sensor.getAggregationVersion()){
logger.fine("The requested aggregation version does not match the local version: " + sensor.getAggregationVersion() + ". Will re-send all aggregated data.");
stmt.setLong(2, 0); //0 since we want to re-send all data to the peer
}else{
stmt.setLong(2, req.offsetSequenceId);
}
SensorDataListDTO rsp = DBConnection.exec(stmt, new SQLResultHandler<SensorDataListDTO>() { SensorDataListDTO rsp = DBConnection.exec(stmt, new SQLResultHandler<SensorDataListDTO>() {
@Override @Override
public SensorDataListDTO handleQueryResult(Statement stmt, ResultSet result) throws SQLException { public SensorDataListDTO handleQueryResult(Statement stmt, ResultSet result) throws SQLException {
@ -119,6 +125,7 @@ public class PCDataSynchronizationDaemon extends ThreadedTCPNetworkServer implem
return list; return list;
} }
}); });
rsp.aggregationVersion = sensor.getAggregationVersion();
logger.fine("Sending " + rsp.size() + " sensor data items to client"); logger.fine("Sending " + rsp.size() + " sensor data items to client");
out.writeObject(rsp); out.writeObject(rsp);
} }
@ -157,6 +164,8 @@ public class PCDataSynchronizationDaemon extends ThreadedTCPNetworkServer implem
protected static class SensorDataListDTO extends ArrayList<SensorDataDTO> implements Serializable{ protected static class SensorDataListDTO extends ArrayList<SensorDataDTO> implements Serializable{
private static final long serialVersionUID = -5701618637734020691L; private static final long serialVersionUID = -5701618637734020691L;
public long aggregationVersion = -1;
} }
protected static class SensorDataDTO implements Serializable{ protected static class SensorDataDTO implements Serializable{
private static final long serialVersionUID = 8494331502087736809L; private static final long serialVersionUID = 8494331502087736809L;

View file

@ -2,6 +2,7 @@ package se.hal.deamon;
import se.hal.HalContext; import se.hal.HalContext;
import se.hal.intf.HalDaemon; import se.hal.intf.HalDaemon;
import se.hal.intf.HalSensorData;
import se.hal.struct.Sensor; import se.hal.struct.Sensor;
import se.hal.intf.HalSensorData.AggregationMethod; import se.hal.intf.HalSensorData.AggregationMethod;
import se.hal.util.TimeUtility; import se.hal.util.TimeUtility;
@ -42,25 +43,29 @@ public class SensorDataAggregatorDaemon implements HalDaemon {
} }
public void aggregateSensor(Sensor sensor) { public void aggregateSensor(Sensor sensor) {
//if(sensor.getSensorData() instanceof PowerConsumptionSensorData){ if(sensor.getDeviceData() == null){
logger.fine("The sensor is of type: " + sensor.getDeviceData().getClass().getName()); logger.fine("The sensor type is not supported - ignoring it");
logger.fine("aggregating raw data to five minute periods"); return;
aggregateRawData(sensor, TimeUtility.FIVE_MINUTES_IN_MS, 5); }
logger.fine("aggregating five minute periods into hour periods"); logger.fine("The sensor is of type: " + sensor.getDeviceData().getClass().getName());
aggregateAggregatedData(sensor, TimeUtility.FIVE_MINUTES_IN_MS, TimeUtility.HOUR_IN_MS);
logger.fine("aggregating one hour periods into one day periods"); logger.fine("aggregating raw data up to a day old into five minute periods");
aggregateAggregatedData(sensor, TimeUtility.HOUR_IN_MS, TimeUtility.DAY_IN_MS); aggregateRawData(sensor, TimeUtility.FIVE_MINUTES_IN_MS, TimeUtility.DAY_IN_MS, 5);
//}else{
// logger.fine("The sensor type("+ sensor.getSensorData().getClass().getName() +") is not supported by the aggregation daemon. Ignoring"); logger.fine("aggregating raw data up to a week old into five minute periods");
//} aggregateRawData(sensor, TimeUtility.HOUR_IN_MS, TimeUtility.WEEK_IN_MS, 60);
logger.fine("aggregating raw data into one day periods");
aggregateRawData(sensor, TimeUtility.DAY_IN_MS, TimeUtility.INFINITY, 60*24);
} }
/** /**
* Aggregate data from the raw DB table to the aggregated table * Aggregate data from the raw DB table to the aggregated table
* @param sensor The sensor to aggregate data for * @param sensor The sensor to aggregate data for
* @param ageLimitInMs Only aggregate up to this age
* @param toPeriodSizeInMs The period length in ms to aggregate to * @param toPeriodSizeInMs The period length in ms to aggregate to
*/ */
private void aggregateRawData(Sensor sensor, long toPeriodSizeInMs, int expectedSampleCount){ private void aggregateRawData(Sensor sensor, long toPeriodSizeInMs, long ageLimitInMs, int expectedSampleCount){
long sensorId = sensor.getId(); long sensorId = sensor.getId();
AggregationMethod aggrMethod = sensor.getAggregationMethod(); AggregationMethod aggrMethod = sensor.getAggregationMethod();
DBConnection db = HalContext.getDB(); DBConnection db = HalContext.getDB();
@ -69,69 +74,33 @@ public class SensorDataAggregatorDaemon implements HalDaemon {
stmt = db.getPreparedStatement("SELECT MAX(timestamp_end) FROM sensor_data_aggr" stmt = db.getPreparedStatement("SELECT MAX(timestamp_end) FROM sensor_data_aggr"
+ " WHERE sensor_id == ?" + " WHERE sensor_id == ?"
+ " AND timestamp_end-timestamp_start == ?"); + " AND timestamp_end-timestamp_start == ?");
stmt.setLong(1, sensorId); stmt.setLong(1, sensorId);
stmt.setLong(2, toPeriodSizeInMs-1); stmt.setLong(2, toPeriodSizeInMs-1);
Long maxTimestampFoundForSensor = DBConnection.exec(stmt, new SimpleSQLResult<Long>()); Long maxTimestampFoundForSensor = DBConnection.exec(stmt, new SimpleSQLResult<Long>());
if(maxTimestampFoundForSensor == null) if(maxTimestampFoundForSensor == null)
maxTimestampFoundForSensor = 0l; maxTimestampFoundForSensor = 0l;
long currentPeriodStartTimestamp = TimeUtility.getTimestampPeriodStart_UTC(toPeriodSizeInMs, System.currentTimeMillis()); long currentPeriodStartTimestamp = TimeUtility.getTimestampPeriodStart_UTC(toPeriodSizeInMs, System.currentTimeMillis());
logger.fine("Calculating periods... (from:"+ maxTimestampFoundForSensor +", to:"+ currentPeriodStartTimestamp +") with expected sample count: " + expectedSampleCount); logger.fine("Calculating periods... (from:"+ maxTimestampFoundForSensor +", to:"+ currentPeriodStartTimestamp +") with expected sample count: " + expectedSampleCount);
stmt = db.getPreparedStatement("SELECT *, 1 AS confidence, timestamp AS timestamp_start FROM sensor_data_raw"
stmt = db.getPreparedStatement("SELECT *, 1 AS confidence FROM sensor_data_raw"
+" WHERE sensor_id == ?" +" WHERE sensor_id == ?"
+ " AND ? < timestamp_start" + " AND timestamp > ?"
+ " AND timestamp_start < ? " + " AND timestamp < ? "
+" ORDER BY timestamp_start ASC"); + " AND timestamp > ? "
+" ORDER BY timestamp ASC");
stmt.setLong(1, sensorId); stmt.setLong(1, sensorId);
stmt.setLong(2, maxTimestampFoundForSensor); stmt.setLong(2, maxTimestampFoundForSensor);
stmt.setLong(3, currentPeriodStartTimestamp); stmt.setLong(3, currentPeriodStartTimestamp);
stmt.setLong(4, System.currentTimeMillis()-ageLimitInMs);
DBConnection.exec(stmt, new DataAggregator(sensorId, toPeriodSizeInMs, expectedSampleCount, aggrMethod)); DBConnection.exec(stmt, new DataAggregator(sensorId, toPeriodSizeInMs, expectedSampleCount, aggrMethod));
} catch (SQLException e) { } catch (SQLException e) {
logger.log(Level.SEVERE, null, e); logger.log(Level.SEVERE, null, e);
} }
} }
/**
* Re-aggregate data from the aggregated DB table to itself
* @param sensor The sensor to aggregate data for
* @param fromPeriodSizeInMs The period length in ms to aggregate from
* @param toPeriodSizeInMs The period length in ms to aggregate to
*/
private void aggregateAggregatedData(Sensor sensor, long fromPeriodSizeInMs, long toPeriodSizeInMs){
long sensorId = sensor.getId();
AggregationMethod aggrMethod = sensor.getAggregationMethod();
int expectedSampleCount = (int)Math.ceil((double)toPeriodSizeInMs / (double)fromPeriodSizeInMs);
DBConnection db = HalContext.getDB();
PreparedStatement stmt = null;
try {
stmt = db.getPreparedStatement("SELECT MAX(timestamp_end) FROM sensor_data_aggr"
+" WHERE sensor_id == ?"
+ " AND timestamp_end-timestamp_start == ?");
stmt.setLong(1, sensorId);
stmt.setLong(2, toPeriodSizeInMs-1);
Long maxTimestampFoundForSensor = DBConnection.exec(stmt, new SimpleSQLResult<Long>());
if(maxTimestampFoundForSensor == null)
maxTimestampFoundForSensor = 0l;
long currentPeriodStartTimestamp = TimeUtility.getTimestampPeriodStart_UTC(toPeriodSizeInMs, System.currentTimeMillis());
logger.fine("Calculating periods... (from:"+ maxTimestampFoundForSensor +", to:"+ currentPeriodStartTimestamp +") with expected sample count: " + expectedSampleCount);
stmt = db.getPreparedStatement("SELECT * FROM sensor_data_aggr"
+" WHERE sensor_id == ?"
+ " AND ? < timestamp_start"
+ " AND timestamp_start <= ?"
+ " AND timestamp_end-timestamp_start == ?"
+" ORDER BY timestamp_start ASC");
stmt.setLong(1, sensorId);
stmt.setLong(2, maxTimestampFoundForSensor);
stmt.setLong(3, currentPeriodStartTimestamp);
stmt.setLong(4, fromPeriodSizeInMs-1);
DBConnection.exec(stmt, new DataAggregator(sensorId, toPeriodSizeInMs, expectedSampleCount, aggrMethod));
} catch (SQLException e) {
logger.log(Level.SEVERE, null, e);
}
}
/** /**
* Internal class for aggregating data to the aggregated DB table * Internal class for aggregating data to the aggregated DB table
*/ */
@ -164,7 +133,7 @@ public class SensorDataAggregatorDaemon implements HalDaemon {
if(sensorId != result.getInt("sensor_id")){ if(sensorId != result.getInt("sensor_id")){
throw new IllegalArgumentException("found entry for aggregation for the wrong sensorId (expecting: "+sensorId+", but was: "+result.getInt("sensor_id")+")"); throw new IllegalArgumentException("found entry for aggregation for the wrong sensorId (expecting: "+sensorId+", but was: "+result.getInt("sensor_id")+")");
} }
long timestamp = result.getLong("timestamp_start"); long timestamp = result.getLong("timestamp");
long periodTimestamp = TimeUtility.getTimestampPeriodStart_UTC(this.aggrTimeInMs, timestamp); long periodTimestamp = TimeUtility.getTimestampPeriodStart_UTC(this.aggrTimeInMs, timestamp);
if(currentPeriodTimestamp != 0 && periodTimestamp != currentPeriodTimestamp){ if(currentPeriodTimestamp != 0 && periodTimestamp != currentPeriodTimestamp){
float aggrConfidence = confidenceSum / (float)this.expectedSampleCount; float aggrConfidence = confidenceSum / (float)this.expectedSampleCount;
@ -173,23 +142,25 @@ public class SensorDataAggregatorDaemon implements HalDaemon {
case SUM: data = sum; break; case SUM: data = sum; break;
case AVERAGE: data = sum/samples; break; case AVERAGE: data = sum/samples; break;
} }
logger.finer("Calculated day period: " + currentPeriodTimestamp + ", data: " + sum + ", confidence: " + aggrConfidence + ", samples: " + samples + ", aggrMethod: " + aggrMethod); logger.finer("Calculated period starting at timestamp: " + currentPeriodTimestamp + ", data: " + sum + ", confidence: " + aggrConfidence + ", samples: " + samples + ", aggrMethod: " + aggrMethod);
preparedInsertStmt.setInt(1, result.getInt("sensor_id")); preparedInsertStmt.setInt(1, result.getInt("sensor_id"));
preparedInsertStmt.setLong(2, ++highestSequenceId); preparedInsertStmt.setLong(2, ++highestSequenceId);
preparedInsertStmt.setLong(3, currentPeriodTimestamp); preparedInsertStmt.setLong(3, currentPeriodTimestamp);
preparedInsertStmt.setLong(4, currentPeriodTimestamp + this.aggrTimeInMs - 1); preparedInsertStmt.setLong(4, currentPeriodTimestamp + this.aggrTimeInMs - 1);
preparedInsertStmt.setInt(5, (int)data); //TODO: make data float in DB to handle aggrMethod.AVG where the data must be able to be saved as a float preparedInsertStmt.setFloat(5, data);
preparedInsertStmt.setFloat(6, aggrConfidence); preparedInsertStmt.setFloat(6, aggrConfidence);
preparedInsertStmt.addBatch(); preparedInsertStmt.addBatch();
// Reset variables // Reset variables
currentPeriodTimestamp = periodTimestamp; currentPeriodTimestamp = periodTimestamp;
confidenceSum = sum = samples = 0; confidenceSum = 0;
sum = 0;
samples = 0;
} }
if(currentPeriodTimestamp == 0) currentPeriodTimestamp = periodTimestamp; if(currentPeriodTimestamp == 0) currentPeriodTimestamp = periodTimestamp;
sum += result.getInt("data"); sum += result.getInt("data");
confidenceSum += result.getFloat("confidence"); confidenceSum += result.getFloat("confidence");
samples++; ++samples;
} }
DBConnection.execBatch(preparedInsertStmt); DBConnection.execBatch(preparedInsertStmt);

View file

@ -6,7 +6,6 @@ import se.hal.struct.Sensor;
import se.hal.util.TimeUtility; import se.hal.util.TimeUtility;
import zutil.db.DBConnection; import zutil.db.DBConnection;
import zutil.db.SQLResultHandler; import zutil.db.SQLResultHandler;
import zutil.db.handler.SimpleSQLResult;
import zutil.log.LogUtil; import zutil.log.LogUtil;
import java.sql.PreparedStatement; import java.sql.PreparedStatement;
@ -41,59 +40,13 @@ public class SensorDataCleanupDaemon implements HalDaemon {
} }
public void cleanupSensor(Sensor sensor) { public void cleanupSensor(Sensor sensor) {
//if(sensor instanceof PowerConsumptionSensorData){
//logger.fine("The sensor is of type: " + sensor.getDeviceData().getClass().getName()); // this will instantiate the external sensor data
if (sensor.getUser() != null) { if (sensor.getUser() != null) {
if (sensor.getUser().isExternal()) { cleanupSensorData(sensor.getId(), TimeUtility.FIVE_MINUTES_IN_MS, TimeUtility.DAY_IN_MS); //clear 5-minute data older than a day
cleanupExternalSensorData(sensor.getId(), TimeUtility.FIVE_MINUTES_IN_MS, TimeUtility.DAY_IN_MS); cleanupSensorData(sensor.getId(), TimeUtility.HOUR_IN_MS, TimeUtility.WEEK_IN_MS); //clear 1-hour data older than a week
cleanupExternalSensorData(sensor.getId(), TimeUtility.HOUR_IN_MS, TimeUtility.WEEK_IN_MS);
} else {
cleanupInternalSensorData(sensor.getId(), TimeUtility.HOUR_IN_MS, TimeUtility.FIVE_MINUTES_IN_MS, TimeUtility.DAY_IN_MS);
cleanupInternalSensorData(sensor.getId(), TimeUtility.DAY_IN_MS, TimeUtility.HOUR_IN_MS, TimeUtility.WEEK_IN_MS);
}
}
//}else{
// logger.fine("The sensor type is not supported by the cleanup deamon. Ignoring");
//}
}
/**
* Will clear periods only if it has been aggregated and are too old.
*
* @param sensorId
* @Param referencePeriodlength Will only clear periods older than the newest period of this length.
* @Param clearPeriodlength Will clear periods with this length
* @param olderThan Data must be older than this many ms to be cleared from the DB
*/
private void cleanupInternalSensorData(long sensorId, long referencePeriodlength, long clearPeriodlength, long olderThan){
DBConnection db = HalContext.getDB();
PreparedStatement stmt = null;
try {
Long maxDBTimestamp = null;
stmt = db.getPreparedStatement("SELECT MAX(timestamp_end) FROM sensor_data_aggr"
+" WHERE sensor_id == ? AND timestamp_end-timestamp_start == ?");
stmt.setLong(1, sensorId);
stmt.setLong(2, referencePeriodlength-1);
maxDBTimestamp = DBConnection.exec(stmt, new SimpleSQLResult<Long>());
if(maxDBTimestamp == null)
maxDBTimestamp = 0l;
stmt = db.getPreparedStatement("SELECT * FROM sensor_data_aggr"
+" WHERE sensor_id == ? "
+ "AND timestamp_end < ? "
+ "AND timestamp_end-timestamp_start == ?"
+ "AND timestamp_end < ?");
stmt.setLong(1, sensorId);
stmt.setLong(2, maxDBTimestamp);
stmt.setLong(3, clearPeriodlength-1);
stmt.setLong(4, System.currentTimeMillis()-olderThan);
DBConnection.exec(stmt, new AggregateDataDeleter(sensorId));
} catch (SQLException e) {
logger.log(Level.SEVERE, null, e);
} }
} }
/** /**
* Will clear periods if they are too old. * Will clear periods if they are too old.
* *
@ -101,7 +54,7 @@ public class SensorDataCleanupDaemon implements HalDaemon {
* @Param clearPeriodlength Will clear periods with this length * @Param clearPeriodlength Will clear periods with this length
* @param olderThan Data must be older than this many ms to be cleared from the DB * @param olderThan Data must be older than this many ms to be cleared from the DB
*/ */
private void cleanupExternalSensorData(long sensorId, long clearPeriodlength, long olderThan){ private void cleanupSensorData(long sensorId, long clearPeriodlength, long olderThan){
DBConnection db = HalContext.getDB(); DBConnection db = HalContext.getDB();
PreparedStatement stmt = null; PreparedStatement stmt = null;
try { try {

View file

@ -22,7 +22,7 @@ public class Sensor extends AbstractDevice<HalSensorData>{
private long external_id = -1; private long external_id = -1;
/** local sensor= if sensor should be public. external sensor= if sensor should be requested from host **/ /** local sensor= if sensor should be public. external sensor= if sensor should be requested from host **/
private boolean sync = false; private boolean sync = false;
private long aggr_version;
public static List<Sensor> getExternalSensors(DBConnection db) throws SQLException{ public static List<Sensor> getExternalSensors(DBConnection db) throws SQLException{
@ -61,7 +61,7 @@ public class Sensor extends AbstractDevice<HalSensorData>{
stmt.setLong(1, sensorId); stmt.setLong(1, sensorId);
Integer id = DBConnection.exec(stmt, new SimpleSQLResult<Integer>()); Integer id = DBConnection.exec(stmt, new SimpleSQLResult<Integer>());
return (id != null ? id : 0); return (id != null ? id : 0);
} }
@ -77,6 +77,12 @@ public class Sensor extends AbstractDevice<HalSensorData>{
public void setSynced(boolean synced) { public void setSynced(boolean synced) {
this.sync = synced; this.sync = synced;
} }
public long getAggregationVersion(){
return this.aggr_version;
}
public void setAggregationVersion(long aggr_version){
this.aggr_version = aggr_version;
}
public HalSensorData.AggregationMethod getAggregationMethod(){ public HalSensorData.AggregationMethod getAggregationMethod(){
@ -87,4 +93,10 @@ public class Sensor extends AbstractDevice<HalSensorData>{
return getDeviceData().getSensorController(); return getDeviceData().getSensorController();
} }
public void clearAggregatedData(DBConnection db) throws SQLException{
PreparedStatement stmt = db.getPreparedStatement( "DELETE FROM sensor_data_aggr WHERE sensor_id == ?" );
stmt.setLong(1, getId());
DBConnection.exec(stmt);
}
} }

View file

@ -9,6 +9,7 @@ public class TimeUtility {
public static final long HOUR_IN_MS = MINUTES_IN_MS * 60; public static final long HOUR_IN_MS = MINUTES_IN_MS * 60;
public static final long DAY_IN_MS = HOUR_IN_MS * 24; public static final long DAY_IN_MS = HOUR_IN_MS * 24;
public static final long WEEK_IN_MS = DAY_IN_MS * 7; public static final long WEEK_IN_MS = DAY_IN_MS * 7;
public static final long INFINITY = Long.MAX_VALUE; //sort of true
public static long getTimestampPeriodStart_UTC(long periodLengthInMs, long timestamp) throws NumberFormatException{ public static long getTimestampPeriodStart_UTC(long periodLengthInMs, long timestamp) throws NumberFormatException{
if(periodLengthInMs < 0 || timestamp < 0) if(periodLengthInMs < 0 || timestamp < 0)