From 1fdc895ef4ad17a7782b76db015a394a4bdb1f6d Mon Sep 17 00:00:00 2001 From: dcollin Date: Fri, 4 Dec 2015 00:21:14 +0100 Subject: [PATCH] Added 5-minute, hour and day aggregator Former-commit-id: c02ec9ab1b0298c0b0e3f03b8ce5c3d6bdba21b2 --- .../koc/hal/deamon/DataAggregatorDaemon.java | 150 +++++++++++++++--- 1 file changed, 129 insertions(+), 21 deletions(-) diff --git a/src/se/koc/hal/deamon/DataAggregatorDaemon.java b/src/se/koc/hal/deamon/DataAggregatorDaemon.java index 9a6f7ab6..ab959876 100755 --- a/src/se/koc/hal/deamon/DataAggregatorDaemon.java +++ b/src/se/koc/hal/deamon/DataAggregatorDaemon.java @@ -3,6 +3,7 @@ package se.koc.hal.deamon; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; +import java.util.Locale; import java.util.Timer; import java.util.TimerTask; import java.util.logging.Logger; @@ -21,6 +22,8 @@ import zutil.log.LogUtil; public class DataAggregatorDaemon extends TimerTask implements HalDaemon { private static final Logger logger = LogUtil.getLogger(); private static final int FIVE_MINUTES_IN_MS = 5 * 60 * 1000; + private static final int HOUR_IN_MS = FIVE_MINUTES_IN_MS * 12; + private static final int DAY_IN_MS = HOUR_IN_MS * 24; public void initiate(Timer timer){ @@ -33,28 +36,68 @@ public class DataAggregatorDaemon extends TimerTask implements HalDaemon { public void run() { DBConnection db = PowerChallenge.db; try { - Long maxTimestampEnd = db.exec("SELECT MAX(timestamp_end) FROM sensor_data_aggr", new SimpleSQLHandler()); - if(maxTimestampEnd == null) - maxTimestampEnd = 0l; - logger.fine("Calculating 5 min periods..."); - long intervallTimestamp = getTimestampPeriodStart(5, System.currentTimeMillis()); - db.exec("SELECT * FROM sensor_data_raw WHERE timestamp > " + maxTimestampEnd + " AND timestamp < " + intervallTimestamp + " ORDER BY timestamp ASC", new FiveMinuteAgrrigator()); + Long maxDBTimestamp = db.exec("SELECT MAX(timestamp_end) FROM sensor_data_aggr", new SimpleSQLHandler()); + if(maxDBTimestamp == null) + maxDBTimestamp = 0l; + // 5 minute aggregation + long minPeriodTimestamp = getTimestampMinutePeriodStart(5, System.currentTimeMillis()); + logger.fine("Calculating 5 min periods... (from:"+ maxDBTimestamp +", to:"+ minPeriodTimestamp +")"); + db.exec("SELECT * FROM sensor_data_raw " + + "WHERE timestamp > " + maxDBTimestamp + " AND timestamp < " + minPeriodTimestamp + + " ORDER BY timestamp ASC", + new FiveMinuteAggregator()); + + // hour aggregation + maxDBTimestamp = db.exec("SELECT MAX(timestamp_end) FROM sensor_data_aggr WHERE timestamp_end-timestamp_start == " + (HOUR_IN_MS-1), new SimpleSQLHandler()); + if(maxDBTimestamp == null) + maxDBTimestamp = 0l; + long hourPeriodTimestamp = getTimestampMinutePeriodStart(60, System.currentTimeMillis()); + logger.fine("Calculating hour periods... (from:"+ maxDBTimestamp +", to:"+ minPeriodTimestamp +")"); + db.exec("SELECT * FROM sensor_data_aggr " + + "WHERE timestamp_start > " + maxDBTimestamp + " AND timestamp_start < " + hourPeriodTimestamp + " AND timestamp_end-timestamp_start == " + (FIVE_MINUTES_IN_MS-1) + +" ORDER BY timestamp_start ASC", + new HourAggregator()); + + // day aggregation + maxDBTimestamp = db.exec("SELECT MAX(timestamp_end) FROM sensor_data_aggr WHERE timestamp_end-timestamp_start == " + (DAY_IN_MS-1), new SimpleSQLHandler()); + if(maxDBTimestamp == null) + maxDBTimestamp = 0l; + long dayPeriodTimestamp = getTimestampHourPeriodStart(24, System.currentTimeMillis()); + logger.fine("Calculating day periods... (from:"+ maxDBTimestamp +", to:"+ minPeriodTimestamp +")"); + db.exec("SELECT * FROM sensor_data_aggr " + + "WHERE timestamp_start > " + maxDBTimestamp + " AND timestamp_start < " + dayPeriodTimestamp + " AND timestamp_end-timestamp_start == " + (HOUR_IN_MS-1) + +" ORDER BY timestamp_start ASC", + new DayAggregator()); + + + logger.fine("Done aggrigating"); } catch (SQLException e) { e.printStackTrace(); } } - - private static long getTimestampPeriodStart(int min, long timestamp){ + private static long getTimestampHourPeriodStart(int hour, long timestamp){ Calendar cal = Calendar.getInstance(); - cal.setTimeInMillis(System.currentTimeMillis()); - int currentMinute = cal.get(Calendar.MINUTE); - cal.set(Calendar.MINUTE, (currentMinute/min) * min); + cal.setTimeInMillis(timestamp); + int currentMinute = cal.get(Calendar.HOUR_OF_DAY); + cal.set(Calendar.HOUR_OF_DAY, (currentMinute/hour) * hour); + cal.set(Calendar.MINUTE, 0); + cal.set(Calendar.SECOND, 0); + cal.set(Calendar.MILLISECOND, 0); return cal.getTimeInMillis(); } - private class FiveMinuteAgrrigator implements SQLResultHandler{ - + private static long getTimestampMinutePeriodStart(int min, long timestamp){ + Calendar cal = Calendar.getInstance(); + cal.setTimeInMillis(timestamp); + int currentMinute = cal.get(Calendar.MINUTE); + cal.set(Calendar.MINUTE, (currentMinute/min) * min); + cal.set(Calendar.SECOND, 0); + cal.set(Calendar.MILLISECOND, 0); + return cal.getTimeInMillis(); + } + + private class FiveMinuteAggregator implements SQLResultHandler{ @Override public Object handleQueryResult(Statement stmt, ResultSet result) throws SQLException { long currentPeriodTimestamp = 0; @@ -62,11 +105,11 @@ public class DataAggregatorDaemon extends TimerTask implements HalDaemon { int count = 0; while(result.next()){ long timestamp = result.getLong("timestamp"); - long periodTimestamp = getTimestampPeriodStart(5, timestamp); + long periodTimestamp = getTimestampMinutePeriodStart(5, timestamp); if(currentPeriodTimestamp != 0 && periodTimestamp != currentPeriodTimestamp){ - float confidence = count / 5f; - logger.finer("Calculated period: "+ currentPeriodTimestamp +" sum: "+ sum +" confidence: "+ confidence); - PowerChallenge.db.exec(String.format("INSERT INTO sensor_data_aggr(sensor_id, sequence_id, timestamp_start, timestamp_end, data, confidence) VALUES(%d, %d, %d, %d, %d, %f)", + float confidence = Math.min(count / 5f, 1.0f); + logger.finer("Calculated minute period: "+ currentPeriodTimestamp +" sum: "+ sum +" confidence: "+ confidence); + PowerChallenge.db.exec(String.format(Locale.US, "INSERT INTO sensor_data_aggr(sensor_id, sequence_id, timestamp_start, timestamp_end, data, confidence) VALUES(%d, %d, %d, %d, %d, %f)", result.getInt("sensor_id"), 42, currentPeriodTimestamp, @@ -75,16 +118,81 @@ public class DataAggregatorDaemon extends TimerTask implements HalDaemon { confidence)); // Reset variables - periodTimestamp = currentPeriodTimestamp; - sum = count = 0; + currentPeriodTimestamp = periodTimestamp; + confidence = sum = 0; } if(currentPeriodTimestamp == 0) currentPeriodTimestamp = periodTimestamp; sum += result.getInt("data"); ++count; } return null; - } - + } } + private class HourAggregator implements SQLResultHandler{ + @Override + public Object handleQueryResult(Statement stmt, ResultSet result) throws SQLException { + long currentPeriodTimestamp = 0; + int sum = 0; + float confidenceSum = 0; + while(result.next()){ + long timestamp = result.getLong("timestamp_start"); + long periodTimestamp = getTimestampMinutePeriodStart(60, timestamp); + if(currentPeriodTimestamp != 0 && periodTimestamp != currentPeriodTimestamp){ + float aggrConfidence = confidenceSum / 12f; + logger.finer("Calculated hour period: "+ currentPeriodTimestamp +" sum: "+ sum +" confidence: "+ aggrConfidence); + PowerChallenge.db.exec(String.format(Locale.US, "INSERT INTO sensor_data_aggr(sensor_id, sequence_id, timestamp_start, timestamp_end, data, confidence) VALUES(%d, %d, %d, %d, %d, %f)", + result.getInt("sensor_id"), + 42, + currentPeriodTimestamp, + currentPeriodTimestamp + HOUR_IN_MS -1, + sum, + aggrConfidence)); + + // Reset variables + currentPeriodTimestamp = periodTimestamp; + confidenceSum = sum = 0; + } + if(currentPeriodTimestamp == 0) currentPeriodTimestamp = periodTimestamp; + sum += result.getInt("data"); + confidenceSum += result.getFloat("confidence"); + } + return null; + } + } + + private class DayAggregator implements SQLResultHandler{ + @Override + public Object handleQueryResult(Statement stmt, ResultSet result) throws SQLException { + long currentPeriodTimestamp = 0; + int sum = 0; + float confidenceSum = 0; + int samples = 0; + while(result.next()){ + long timestamp = result.getLong("timestamp_start"); + long periodTimestamp = getTimestampHourPeriodStart(24, timestamp); + if(currentPeriodTimestamp != 0 && periodTimestamp != currentPeriodTimestamp){ + float aggrConfidence = confidenceSum / 24f; + logger.finer("Calculated day period: "+ currentPeriodTimestamp +" sum: "+ sum +" confidence: "+ aggrConfidence+ " samples: " + samples); + PowerChallenge.db.exec(String.format(Locale.US, "INSERT INTO sensor_data_aggr(sensor_id, sequence_id, timestamp_start, timestamp_end, data, confidence) VALUES(%d, %d, %d, %d, %d, %f)", + result.getInt("sensor_id"), + 42, + currentPeriodTimestamp, + currentPeriodTimestamp + DAY_IN_MS -1, + sum, + aggrConfidence)); + + // Reset variables + currentPeriodTimestamp = periodTimestamp; + confidenceSum = sum = 0; + samples = 0; + } + if(currentPeriodTimestamp == 0) currentPeriodTimestamp = periodTimestamp; + sum += result.getInt("data"); + confidenceSum += result.getFloat("confidence"); + samples++; + } + return null; + } + } }