diff --git a/.classpath b/.classpath
index 5906580d..fcd2d4db 100644
--- a/.classpath
+++ b/.classpath
@@ -1,31 +1,31 @@
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/src/se/koc/hal/deamon/DataAggregatorDaemon.java b/src/se/koc/hal/deamon/DataAggregatorDaemon.java
index 6e457022..699f8769 100755
--- a/src/se/koc/hal/deamon/DataAggregatorDaemon.java
+++ b/src/se/koc/hal/deamon/DataAggregatorDaemon.java
@@ -20,8 +20,13 @@ import zutil.log.LogUtil;
public class DataAggregatorDaemon extends TimerTask implements HalDaemon {
private static final Logger logger = LogUtil.getLogger();
+ private enum AggregationMethod{
+ SUM,
+ AVG
+ }
+
public void initiate(Timer timer){
- timer.schedule(this, 0, TimeConstants.FIVE_MINUTES_IN_MS);
+ timer.schedule(this, 0, TimeUtility.FIVE_MINUTES_IN_MS);
}
@Override
@@ -30,114 +35,146 @@ public class DataAggregatorDaemon extends TimerTask implements HalDaemon {
List sensorList = Sensor.getLocalSensors(HalContext.getDB());
for(Sensor sensor : sensorList){
logger.fine("Aggregating sensor_id: " + sensor.getId());
- aggregateSensor(sensor.getId());
+ aggregateSensor(sensor);
}
logger.fine("Aggregation Done");
} catch (SQLException e) {
logger.log(Level.SEVERE, null, e);
}
}
-
- public void aggregateSensor(long sensorId) {
+
+ public void aggregateSensor(Sensor sensor) {
+ logger.fine("The sensor is of type: " + sensor.getType());
+ if(sensor.getType().equals("PowerMeter")){
+ logger.fine("aggregating raw data to five minute periods");
+ aggregateRawData(sensor.getId(), TimeUtility.FIVE_MINUTES_IN_MS, 5, AggregationMethod.SUM);
+ logger.fine("aggregating five minute periods into hour periods");
+ aggrigateAggregatedData(sensor.getId(), TimeUtility.FIVE_MINUTES_IN_MS, TimeUtility.HOUR_IN_MS, 12, AggregationMethod.SUM);
+ logger.fine("aggregating one hour periods into one day periods");
+ aggrigateAggregatedData(sensor.getId(), TimeUtility.HOUR_IN_MS, TimeUtility.DAY_IN_MS, 24, AggregationMethod.SUM);
+ }else{
+ logger.fine("The sensor type is not supported by the aggregation deamon. Ignoring");
+ }
+ }
+
+ /**
+ * Aggregate data from the raw DB table to the aggregated table
+ * @param sensorId The sensor for to aggregate data
+ * @param toPeriodSizeInMs The period length in ms to aggregate to
+ */
+ private void aggregateRawData(long sensorId, long toPeriodSizeInMs, int expectedSampleCount, AggregationMethod aggrMethod){
DBConnection db = HalContext.getDB();
PreparedStatement stmt = null;
try {
- stmt = db.getPreparedStatement("SELECT MAX(timestamp_end) FROM sensor_data_aggr WHERE sensor_id == ?");
- stmt.setLong(1, sensorId);
- Long maxDBTimestamp = DBConnection.exec(stmt, new SimpleSQLResult());
- if(maxDBTimestamp == null)
- maxDBTimestamp = 0l;
- // 5 minute aggregation
- long minPeriodTimestamp = getTimestampPeriodStart(TimeConstants.FIVE_MINUTES_IN_MS, System.currentTimeMillis());
- logger.fine("Calculating 5 min periods... (from:"+ maxDBTimestamp +", to:"+ minPeriodTimestamp +")");
+ stmt = db.getPreparedStatement("SELECT MAX(timestamp_end) FROM sensor_data_aggr"
+ + " WHERE sensor_id == ?"
+ + " AND timestamp_end-timestamp_start == ?");
+ stmt.setLong(1, sensorId);
+ stmt.setLong(2, toPeriodSizeInMs-1);
+ Long maxTimestampFoundForSensor = DBConnection.exec(stmt, new SimpleSQLResult());
+ if(maxTimestampFoundForSensor == null)
+ maxTimestampFoundForSensor = 0l;
+ long currentPeriodStartTimestamp = TimeUtility.getTimestampPeriodStart(toPeriodSizeInMs, System.currentTimeMillis());
+ logger.fine("Calculating periods... (from:"+ maxTimestampFoundForSensor +", to:"+ currentPeriodStartTimestamp +")");
stmt = db.getPreparedStatement("SELECT *, 1 AS confidence, timestamp AS timestamp_start FROM sensor_data_raw"
- +" WHERE sensor_id == ? AND ? < timestamp AND timestamp < ? "
+ +" WHERE sensor_id == ?"
+ + " AND ? < timestamp"
+ + " AND timestamp < ? "
+" ORDER BY timestamp ASC");
stmt.setLong(1, sensorId);
- stmt.setLong(2, maxDBTimestamp);
- stmt.setLong(3, minPeriodTimestamp);
- DBConnection.exec(stmt, new DataAggregator(sensorId, TimeConstants.FIVE_MINUTES_IN_MS, 5));
+ stmt.setLong(2, maxTimestampFoundForSensor);
+ stmt.setLong(3, currentPeriodStartTimestamp);
+ DBConnection.exec(stmt, new DataAggregator(sensorId, toPeriodSizeInMs, expectedSampleCount, aggrMethod));
+ } catch (SQLException e) {
+ logger.log(Level.SEVERE, null, e);
+ }
+ }
+
+ /**
+ * Re-aggregate data from the aggregated DB table to itself
+ * @param sensorId The sensor for to aggregate data
+ * @param fromPeriodSizeInMs The period length in ms to aggregate from
+ * @param toPeriodSizeInMs The period length in ms to aggregate to
+ */
+ private void aggrigateAggregatedData(long sensorId, long fromPeriodSizeInMs, long toPeriodSizeInMs, int expectedSampleCount, AggregationMethod aggrMethod){
+ DBConnection db = HalContext.getDB();
+ PreparedStatement stmt = null;
+ try {
- // hour aggregation
stmt = db.getPreparedStatement("SELECT MAX(timestamp_end) FROM sensor_data_aggr"
- +" WHERE sensor_id == ? AND timestamp_end-timestamp_start == ?");
+ +" WHERE sensor_id == ?"
+ + " AND timestamp_end-timestamp_start == ?");
stmt.setLong(1, sensorId);
- stmt.setLong(2, TimeConstants.HOUR_IN_MS-1);
- maxDBTimestamp = DBConnection.exec(stmt, new SimpleSQLResult());
- if(maxDBTimestamp == null)
- maxDBTimestamp = 0l;
- long hourPeriodTimestamp = getTimestampPeriodStart(TimeConstants.HOUR_IN_MS, System.currentTimeMillis());
- logger.fine("Calculating hour periods... (from:"+ maxDBTimestamp +", to:"+ hourPeriodTimestamp +")");
+ stmt.setLong(2, toPeriodSizeInMs-1);
+ Long maxTimestampFoundForSensor = DBConnection.exec(stmt, new SimpleSQLResult());
+ if(maxTimestampFoundForSensor == null)
+ maxTimestampFoundForSensor = 0l;
+ long currentPeriodStartTimestamp = TimeUtility.getTimestampPeriodStart(toPeriodSizeInMs, System.currentTimeMillis());
+ logger.fine("Calculating periods... (from:"+ maxTimestampFoundForSensor +", to:"+ currentPeriodStartTimestamp +")");
stmt = db.getPreparedStatement("SELECT * FROM sensor_data_aggr"
- +" WHERE sensor_id == ? AND ? < timestamp_start AND timestamp_start < ? AND timestamp_end-timestamp_start == ?"
+ +" WHERE sensor_id == ?"
+ + " AND ? < timestamp_start"
+ + " AND timestamp_start < ?"
+ + " AND timestamp_end-timestamp_start == ?"
+" ORDER BY timestamp_start ASC");
stmt.setLong(1, sensorId);
- stmt.setLong(2, maxDBTimestamp);
- stmt.setLong(3, hourPeriodTimestamp);
- stmt.setLong(4, TimeConstants.FIVE_MINUTES_IN_MS-1);
- DBConnection.exec(stmt, new DataAggregator(sensorId, TimeConstants.HOUR_IN_MS, 12));
-
- // day aggregation
- stmt = db.getPreparedStatement("SELECT MAX(timestamp_end) FROM sensor_data_aggr WHERE sensor_id == ? AND timestamp_end-timestamp_start == ?");
- stmt.setLong(1, sensorId);
- stmt.setLong(2, TimeConstants.DAY_IN_MS-1);
- maxDBTimestamp = DBConnection.exec(stmt, new SimpleSQLResult());
- if(maxDBTimestamp == null)
- maxDBTimestamp = 0l;
- long dayPeriodTimestamp = getTimestampPeriodStart(TimeConstants.DAY_IN_MS, System.currentTimeMillis());
- logger.fine("Calculating day periods... (from:"+ maxDBTimestamp +", to:"+ dayPeriodTimestamp +")");
-
- stmt = db.getPreparedStatement("SELECT * FROM sensor_data_aggr"
- +" WHERE sensor_id == ? AND ? < timestamp_start AND timestamp_start < ? AND timestamp_end-timestamp_start == ?"
- +" ORDER BY timestamp_start ASC");
- stmt.setLong(1, sensorId);
- stmt.setLong(2, maxDBTimestamp);
- stmt.setLong(3, dayPeriodTimestamp);
- stmt.setLong(4, TimeConstants.HOUR_IN_MS-1);
- DBConnection.exec(stmt, new DataAggregator(sensorId, TimeConstants.DAY_IN_MS, 24));
-
+ stmt.setLong(2, maxTimestampFoundForSensor);
+ stmt.setLong(3, currentPeriodStartTimestamp);
+ stmt.setLong(4, fromPeriodSizeInMs-1);
+ DBConnection.exec(stmt, new DataAggregator(sensorId, toPeriodSizeInMs, expectedSampleCount, aggrMethod));
} catch (SQLException e) {
logger.log(Level.SEVERE, null, e);
}
}
+ /**
+ * Internal class for aggregating data to the aggregated DB table
+ */
private class DataAggregator implements SQLResultHandler