From b421206ce3e9381fa94617a22eb32ac6660a77a5 Mon Sep 17 00:00:00 2001 From: dcollin Date: Fri, 11 Dec 2015 17:31:13 +0100 Subject: [PATCH] Refactoring code a lot. - Common inner class in DataAggregatorDeamon for all period lenghts. - Removed deletion of data from DataAggregator and DataSynchronizationClient and put it in a new DataDeletionDeamon - Moved time constants to a common TimeConstants class Former-commit-id: 523ce7ad73b6edb4a507adcfa33b04742b170f25 --- src/se/koc/hal/PowerChallenge.java | 4 +- .../koc/hal/deamon/DataAggregatorDaemon.java | 279 +++++------------- src/se/koc/hal/deamon/DataDeletionDaemon.java | 131 ++++++++ .../hal/deamon/DataSynchronizationClient.java | 9 +- src/se/koc/hal/deamon/TimeConstants.java | 8 + .../deamon/ValueOutsideOfRangeException.java | 9 + src/se/koc/hal/page/PCOverviewHttpPage.java | 114 ++++--- .../plugin/localsensor/ImpulseTracker.java | 37 ++- src/se/koc/hal/struct/Sensor.java | 5 + web-resource/overview.tmpl | 52 ++-- 10 files changed, 344 insertions(+), 304 deletions(-) create mode 100644 src/se/koc/hal/deamon/DataDeletionDaemon.java create mode 100644 src/se/koc/hal/deamon/TimeConstants.java create mode 100644 src/se/koc/hal/deamon/ValueOutsideOfRangeException.java diff --git a/src/se/koc/hal/PowerChallenge.java b/src/se/koc/hal/PowerChallenge.java index 964dd416..3cee26bd 100755 --- a/src/se/koc/hal/PowerChallenge.java +++ b/src/se/koc/hal/PowerChallenge.java @@ -2,6 +2,7 @@ package se.koc.hal; import se.koc.hal.deamon.DataAggregatorDaemon; +import se.koc.hal.deamon.DataDeletionDaemon; import se.koc.hal.deamon.DataSynchronizationClient; import se.koc.hal.deamon.DataSynchronizationDaemon; import se.koc.hal.deamon.HalDaemon; @@ -43,7 +44,8 @@ public class PowerChallenge { daemons = new HalDaemon[]{ new DataAggregatorDaemon(), new DataSynchronizationDaemon(), - new DataSynchronizationClient() + new DataSynchronizationClient(), + new DataDeletionDaemon() }; Timer daemonTimer = new Timer(); for(HalDaemon daemon : daemons){ diff --git a/src/se/koc/hal/deamon/DataAggregatorDaemon.java b/src/se/koc/hal/deamon/DataAggregatorDaemon.java index cc503f7d..f4860e10 100755 --- a/src/se/koc/hal/deamon/DataAggregatorDaemon.java +++ b/src/se/koc/hal/deamon/DataAggregatorDaemon.java @@ -4,7 +4,6 @@ import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; -import java.util.Calendar; import java.util.List; import java.util.Timer; import java.util.TimerTask; @@ -19,16 +18,9 @@ import zutil.log.LogUtil; public class DataAggregatorDaemon extends TimerTask implements HalDaemon { private static final Logger logger = LogUtil.getLogger(); - public static final long FIVE_MINUTES_IN_MS = 5 * 60 * 1000; - public static final long HOUR_IN_MS = FIVE_MINUTES_IN_MS * 12; - public static final long DAY_IN_MS = HOUR_IN_MS * 24; - private static final long HOUR_AGGREGATION_OFFSET = DAY_IN_MS; - private static final long DAY_AGGREGATION_OFFSET = DAY_IN_MS * 3; - - public void initiate(Timer timer){ - timer.schedule(this, 0, FIVE_MINUTES_IN_MS); + timer.schedule(this, 0, TimeConstants.FIVE_MINUTES_IN_MS); } @Override @@ -55,25 +47,25 @@ public class DataAggregatorDaemon extends TimerTask implements HalDaemon { maxDBTimestamp = 0l; // 5 minute aggregation - long minPeriodTimestamp = getTimestampMinutePeriodStart(5, System.currentTimeMillis()); + long minPeriodTimestamp = getTimestampPeriodStart(TimeConstants.FIVE_MINUTES_IN_MS, System.currentTimeMillis()); logger.fine("Calculating 5 min periods... (from:"+ maxDBTimestamp +", to:"+ minPeriodTimestamp +")"); - stmt = db.getPreparedStatement("SELECT * FROM sensor_data_raw" + stmt = db.getPreparedStatement("SELECT *, 1 AS confidence, timestamp AS timestamp_start FROM sensor_data_raw" +" WHERE sensor_id == ? AND ? < timestamp AND timestamp < ? " +" ORDER BY timestamp ASC"); stmt.setLong(1, sensorId); stmt.setLong(2, maxDBTimestamp); stmt.setLong(3, minPeriodTimestamp); - DBConnection.exec(stmt, new FiveMinuteAggregator(sensorId)); + DBConnection.exec(stmt, new DataAggregator(sensorId, TimeConstants.FIVE_MINUTES_IN_MS, 5)); // hour aggregation stmt = db.getPreparedStatement("SELECT MAX(timestamp_end) FROM sensor_data_aggr" +" WHERE sensor_id == ? AND timestamp_end-timestamp_start == ?"); stmt.setLong(1, sensorId); - stmt.setLong(2, HOUR_IN_MS-1); + stmt.setLong(2, TimeConstants.HOUR_IN_MS-1); maxDBTimestamp = DBConnection.exec(stmt, new SimpleSQLResult()); if(maxDBTimestamp == null) maxDBTimestamp = 0l; - long hourPeriodTimestamp = getTimestampMinutePeriodStart(60, System.currentTimeMillis()-HOUR_AGGREGATION_OFFSET); + long hourPeriodTimestamp = getTimestampPeriodStart(TimeConstants.HOUR_IN_MS, System.currentTimeMillis()); logger.fine("Calculating hour periods... (from:"+ maxDBTimestamp +", to:"+ hourPeriodTimestamp +")"); stmt = db.getPreparedStatement("SELECT * FROM sensor_data_aggr" +" WHERE sensor_id == ? AND ? < timestamp_start AND timestamp_start < ? AND timestamp_end-timestamp_start == ?" @@ -81,17 +73,17 @@ public class DataAggregatorDaemon extends TimerTask implements HalDaemon { stmt.setLong(1, sensorId); stmt.setLong(2, maxDBTimestamp); stmt.setLong(3, hourPeriodTimestamp); - stmt.setLong(4, FIVE_MINUTES_IN_MS-1); - DBConnection.exec(stmt, new HourAggregator(sensorId)); + stmt.setLong(4, TimeConstants.FIVE_MINUTES_IN_MS-1); + DBConnection.exec(stmt, new DataAggregator(sensorId, TimeConstants.HOUR_IN_MS, 12)); // day aggregation stmt = db.getPreparedStatement("SELECT MAX(timestamp_end) FROM sensor_data_aggr WHERE sensor_id == ? AND timestamp_end-timestamp_start == ?"); stmt.setLong(1, sensorId); - stmt.setLong(2, DAY_IN_MS-1); + stmt.setLong(2, TimeConstants.DAY_IN_MS-1); maxDBTimestamp = DBConnection.exec(stmt, new SimpleSQLResult()); if(maxDBTimestamp == null) maxDBTimestamp = 0l; - long dayPeriodTimestamp = getTimestampHourPeriodStart(24, System.currentTimeMillis()-DAY_AGGREGATION_OFFSET); + long dayPeriodTimestamp = getTimestampPeriodStart(TimeConstants.DAY_IN_MS, System.currentTimeMillis()); logger.fine("Calculating day periods... (from:"+ maxDBTimestamp +", to:"+ dayPeriodTimestamp +")"); stmt = db.getPreparedStatement("SELECT * FROM sensor_data_aggr" +" WHERE sensor_id == ? AND ? < timestamp_start AND timestamp_start < ? AND timestamp_end-timestamp_start == ?" @@ -99,210 +91,89 @@ public class DataAggregatorDaemon extends TimerTask implements HalDaemon { stmt.setLong(1, sensorId); stmt.setLong(2, maxDBTimestamp); stmt.setLong(3, dayPeriodTimestamp); - stmt.setLong(4, HOUR_IN_MS-1); - DBConnection.exec(stmt, new DayAggregator(sensorId)); - + stmt.setLong(4, TimeConstants.HOUR_IN_MS-1); + DBConnection.exec(stmt, new DataAggregator(sensorId, TimeConstants.DAY_IN_MS, 24)); + logger.fine("Done aggregation"); } catch (SQLException e) { e.printStackTrace(); + } catch (ValueOutsideOfRangeException e) { + e.printStackTrace(); } } - private static long getTimestampHourPeriodStart(int hour, long timestamp){ - Calendar cal = Calendar.getInstance(); - cal.setTimeInMillis(timestamp); - int currentMinute = cal.get(Calendar.HOUR_OF_DAY); - cal.set(Calendar.HOUR_OF_DAY, (currentMinute/hour) * hour); - cal.set(Calendar.MINUTE, 0); - cal.set(Calendar.SECOND, 0); - cal.set(Calendar.MILLISECOND, 0); - return cal.getTimeInMillis(); - } - - private static long getTimestampMinutePeriodStart(int min, long timestamp){ - Calendar cal = Calendar.getInstance(); - cal.setTimeInMillis(timestamp); - int currentMinute = cal.get(Calendar.MINUTE); - cal.set(Calendar.MINUTE, (currentMinute/min) * min); - cal.set(Calendar.SECOND, 0); - cal.set(Calendar.MILLISECOND, 0); - return cal.getTimeInMillis(); - } - - - private class FiveMinuteAggregator implements SQLResultHandler{ + private class DataAggregator implements SQLResultHandler{ private long sensorId = -1; - public FiveMinuteAggregator(long sensorId) { - this.sensorId = sensorId; - } - @Override - public Object handleQueryResult(Statement stmt, ResultSet result) throws SQLException { - long currentPeriodTimestamp = 0; - int sum = 0; - int count = 0; - int samples = 0; - long highestSequenceId = Sensor.getHighestSequenceId(sensorId); - HalContext.getDB().getConnection().setAutoCommit(false); - boolean error = false; - PreparedStatement preparedInsertStmt = HalContext.getDB().getPreparedStatement("INSERT INTO sensor_data_aggr(sensor_id, sequence_id, timestamp_start, timestamp_end, data, confidence) VALUES(?, ?, ?, ?, ?, ?)"); - while(result.next()){ - if(sensorId != result.getInt("sensor_id")){ - logger.severe("found entry for aggregation for the wrong sensorId (expecting: "+sensorId+", but was: "+result.getInt("sensor_id")+")"); - error = true; - break; - } - long timestamp = result.getLong("timestamp"); - long periodTimestamp = getTimestampMinutePeriodStart(5, timestamp); - if(currentPeriodTimestamp != 0 && periodTimestamp != currentPeriodTimestamp){ - float confidence = count / 5f; - logger.finer("Calculated minute period: "+ currentPeriodTimestamp +" sum: "+ sum +" confidence: "+ confidence+ " samples: " + samples); - preparedInsertStmt.setLong(1, sensorId); - preparedInsertStmt.setLong(2, ++highestSequenceId); - preparedInsertStmt.setLong(3, currentPeriodTimestamp); - preparedInsertStmt.setLong(4, currentPeriodTimestamp + FIVE_MINUTES_IN_MS - 1); - preparedInsertStmt.setInt(5, sum); - preparedInsertStmt.setFloat(6, confidence); - preparedInsertStmt.addBatch(); - //DBConnection.exec(prepStmt); - - // Reset variables - currentPeriodTimestamp = periodTimestamp; - sum = count = samples = 0; - } - if(currentPeriodTimestamp == 0) currentPeriodTimestamp = periodTimestamp; - sum += result.getInt("data"); - ++count; - ++samples; - } - if(!error){ - DBConnection.execBatch(preparedInsertStmt); - HalContext.getDB().getConnection().commit(); - }else{ - HalContext.getDB().getConnection().rollback(); - } - HalContext.getDB().getConnection().setAutoCommit(true); - return null; - } - } - - private class HourAggregator implements SQLResultHandler{ - private long sensorId = -1; - public HourAggregator(long sensorId) { + private long aggrTimeInMs = -1; + private int expectedSampleCount = -1; + + public DataAggregator(long sensorId, long aggrTimeInMs, int expectedSampleCount) { this.sensorId = sensorId; + this.aggrTimeInMs = aggrTimeInMs; + this.expectedSampleCount = expectedSampleCount; } + @Override public Object handleQueryResult(Statement stmt, ResultSet result) throws SQLException { - long currentPeriodTimestamp = 0; - int sum = 0; - float confidenceSum = 0; - int samples = 0; - long highestSequenceId = Sensor.getHighestSequenceId(sensorId); - HalContext.getDB().getConnection().setAutoCommit(false); - boolean error = false; - PreparedStatement preparedInsertStmt = HalContext.getDB().getPreparedStatement("INSERT INTO sensor_data_aggr(sensor_id, sequence_id, timestamp_start, timestamp_end, data, confidence) VALUES(?, ?, ?, ?, ?, ?)"); - PreparedStatement preparedDeleteStmt = HalContext.getDB().getPreparedStatement("DELETE FROM sensor_data_aggr WHERE sensor_id == ? AND sequence_id == ?"); - while(result.next()){ - if(sensorId != result.getInt("sensor_id")){ - logger.severe("found entry for aggregation for the wrong sensorId (expecting: "+sensorId+", but was: "+result.getInt("sensor_id")+")"); - error = true; - break; + try{ + long currentPeriodTimestamp = 0; + int sum = 0; + float confidenceSum = 0; + int samples = 0; + long highestSequenceId = Sensor.getHighestSequenceId(sensorId); + HalContext.getDB().getConnection().setAutoCommit(false); + boolean rollback = false; + PreparedStatement preparedInsertStmt = HalContext.getDB().getPreparedStatement("INSERT INTO sensor_data_aggr(sensor_id, sequence_id, timestamp_start, timestamp_end, data, confidence) VALUES(?, ?, ?, ?, ?, ?)"); + while(result.next()){ + if(sensorId != result.getInt("sensor_id")){ + logger.severe("found entry for aggregation for the wrong sensorId (expecting: "+sensorId+", but was: "+result.getInt("sensor_id")+")"); + rollback = true; + break; + } + long timestamp = result.getLong("timestamp_start"); + long periodTimestamp = getTimestampPeriodStart(this.aggrTimeInMs, timestamp); + if(currentPeriodTimestamp != 0 && periodTimestamp != currentPeriodTimestamp){ + float aggrConfidence = confidenceSum / (float)this.expectedSampleCount; + logger.finer("Calculated day period: "+ currentPeriodTimestamp +" sum: "+ sum +" confidence: "+ aggrConfidence+ " samples: " + samples); + preparedInsertStmt.setInt(1, result.getInt("sensor_id")); + preparedInsertStmt.setLong(2, ++highestSequenceId); + preparedInsertStmt.setLong(3, currentPeriodTimestamp); + preparedInsertStmt.setLong(4, currentPeriodTimestamp + this.aggrTimeInMs - 1); + preparedInsertStmt.setInt(5, sum); + preparedInsertStmt.setFloat(6, aggrConfidence); + preparedInsertStmt.addBatch(); + + // Reset variables + currentPeriodTimestamp = periodTimestamp; + confidenceSum = sum = samples = 0; + } + if(currentPeriodTimestamp == 0) currentPeriodTimestamp = periodTimestamp; + sum += result.getInt("data"); + confidenceSum += result.getFloat("confidence"); + samples++; } - long timestamp = result.getLong("timestamp_start"); - long periodTimestamp = getTimestampMinutePeriodStart(60, timestamp); - if(currentPeriodTimestamp != 0 && periodTimestamp != currentPeriodTimestamp){ - float aggrConfidence = confidenceSum / 12f; - logger.finer("Calculated hour period: "+ currentPeriodTimestamp +" sum: "+ sum +" confidence: "+ aggrConfidence+ " samples: " + samples); - preparedInsertStmt.setInt(1, result.getInt("sensor_id")); - preparedInsertStmt.setLong(2, ++highestSequenceId); - preparedInsertStmt.setLong(3, currentPeriodTimestamp); - preparedInsertStmt.setLong(4, currentPeriodTimestamp + HOUR_IN_MS - 1); - preparedInsertStmt.setInt(5, sum); - preparedInsertStmt.setFloat(6, aggrConfidence); - preparedInsertStmt.addBatch(); - - // Reset variables - currentPeriodTimestamp = periodTimestamp; - confidenceSum = sum = samples = 0; + if(!rollback){ + DBConnection.execBatch(preparedInsertStmt); + HalContext.getDB().getConnection().commit(); + }else{ + HalContext.getDB().getConnection().rollback(); } - if(currentPeriodTimestamp == 0) currentPeriodTimestamp = periodTimestamp; - sum += result.getInt("data"); - confidenceSum += result.getFloat("confidence"); - samples++; - - preparedDeleteStmt.setInt(1, result.getInt("sensor_id")); - preparedDeleteStmt.setInt(2, result.getInt("sequence_id")); - preparedDeleteStmt.addBatch(); - } - if(!error){ - DBConnection.execBatch(preparedInsertStmt); - DBConnection.execBatch(preparedDeleteStmt); - HalContext.getDB().getConnection().commit(); - }else{ + }catch(SQLException e){ HalContext.getDB().getConnection().rollback(); + throw e; + } catch (ValueOutsideOfRangeException e) { + HalContext.getDB().getConnection().rollback(); + e.printStackTrace(); + }finally{ + HalContext.getDB().getConnection().setAutoCommit(true); } - HalContext.getDB().getConnection().setAutoCommit(true); return null; } } - private class DayAggregator implements SQLResultHandler{ - private long sensorId = -1; - public DayAggregator(long sensorId) { - this.sensorId = sensorId; - } - @Override - public Object handleQueryResult(Statement stmt, ResultSet result) throws SQLException { - long currentPeriodTimestamp = 0; - int sum = 0; - float confidenceSum = 0; - int samples = 0; - long highestSequenceId = Sensor.getHighestSequenceId(sensorId); - HalContext.getDB().getConnection().setAutoCommit(false); - boolean error = false; - PreparedStatement preparedInsertStmt = HalContext.getDB().getPreparedStatement("INSERT INTO sensor_data_aggr(sensor_id, sequence_id, timestamp_start, timestamp_end, data, confidence) VALUES(?, ?, ?, ?, ?, ?)"); - PreparedStatement preparedDeleteStmt = HalContext.getDB().getPreparedStatement("DELETE FROM sensor_data_aggr WHERE sensor_id == ? AND sequence_id == ?"); - while(result.next()){ - if(sensorId != result.getInt("sensor_id")){ - logger.severe("found entry for aggregation for the wrong sensorId (expecting: "+sensorId+", but was: "+result.getInt("sensor_id")+")"); - error = true; - break; - } - long timestamp = result.getLong("timestamp_start"); - long periodTimestamp = getTimestampHourPeriodStart(24, timestamp); - if(currentPeriodTimestamp != 0 && periodTimestamp != currentPeriodTimestamp){ - float aggrConfidence = confidenceSum / 24f; - logger.finer("Calculated day period: "+ currentPeriodTimestamp +" sum: "+ sum +" confidence: "+ aggrConfidence+ " samples: " + samples); - preparedInsertStmt.setInt(1, result.getInt("sensor_id")); - preparedInsertStmt.setLong(2, ++highestSequenceId); - preparedInsertStmt.setLong(3, currentPeriodTimestamp); - preparedInsertStmt.setLong(4, currentPeriodTimestamp + DAY_IN_MS - 1); - preparedInsertStmt.setInt(5, sum); - preparedInsertStmt.setFloat(6, aggrConfidence); - preparedInsertStmt.addBatch(); - - // Reset variables - currentPeriodTimestamp = periodTimestamp; - confidenceSum = sum = 0; - samples = 0; - } - if(currentPeriodTimestamp == 0) currentPeriodTimestamp = periodTimestamp; - sum += result.getInt("data"); - confidenceSum += result.getFloat("confidence"); - samples++; - - preparedDeleteStmt.setInt(1, result.getInt("sensor_id")); - preparedDeleteStmt.setInt(2, result.getInt("sequence_id")); - preparedDeleteStmt.addBatch(); - } - if(!error){ - DBConnection.execBatch(preparedInsertStmt); - DBConnection.execBatch(preparedDeleteStmt); - HalContext.getDB().getConnection().commit(); - }else{ - HalContext.getDB().getConnection().rollback(); - } - HalContext.getDB().getConnection().setAutoCommit(true); - return null; - } + private static long getTimestampPeriodStart(long periodLengthInMs, long timestamp) throws ValueOutsideOfRangeException{ + long tmp = timestamp % periodLengthInMs; + return timestamp - tmp; } + } diff --git a/src/se/koc/hal/deamon/DataDeletionDaemon.java b/src/se/koc/hal/deamon/DataDeletionDaemon.java new file mode 100644 index 00000000..812f3953 --- /dev/null +++ b/src/se/koc/hal/deamon/DataDeletionDaemon.java @@ -0,0 +1,131 @@ +package se.koc.hal.deamon; + +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.List; +import java.util.Timer; +import java.util.TimerTask; +import java.util.logging.Logger; + +import se.koc.hal.HalContext; +import se.koc.hal.struct.Sensor; +import zutil.db.DBConnection; +import zutil.db.SQLResultHandler; +import zutil.db.handler.SimpleSQLResult; +import zutil.log.LogUtil; + +public class DataDeletionDaemon extends TimerTask implements HalDaemon { + private static final Logger logger = LogUtil.getLogger(); + + public void initiate(Timer timer){ + timer.schedule(this, 5000, TimeConstants.FIVE_MINUTES_IN_MS); + } + + @Override + public void run(){ + try { + List sensorList = Sensor.getSensors(HalContext.getDB()); + for(Sensor sensor : sensorList){ + logger.fine("Deleting old data for sensor_id: " + sensor.getId()); + aggregateSensor(sensor.getId()); + } + } catch (SQLException e) { + e.printStackTrace(); + } + } + + public void aggregateSensor(long sensorId) { + DBConnection db = HalContext.getDB(); + PreparedStatement stmt = null; + try { + + Long maxDBTimestamp = null; + + // delete too old 5 minute periods that already have been aggregated into hours + stmt = db.getPreparedStatement("SELECT MAX(timestamp_end) FROM sensor_data_aggr" + +" WHERE sensor_id == ? AND timestamp_end-timestamp_start == ?"); + stmt.setLong(1, sensorId); + stmt.setLong(2, TimeConstants.HOUR_IN_MS-1); + maxDBTimestamp = DBConnection.exec(stmt, new SimpleSQLResult()); + if(maxDBTimestamp == null) + maxDBTimestamp = 0l; + stmt = db.getPreparedStatement("SELECT * FROM sensor_data_aggr" + +" WHERE sensor_id == ? " + + "AND timestamp_end < ? " + + "AND timestamp_end-timestamp_start == ?" + + "AND timestamp_end < ?"); + stmt.setLong(1, sensorId); + stmt.setLong(2, maxDBTimestamp); + stmt.setLong(3, TimeConstants.FIVE_MINUTES_IN_MS-1); + stmt.setLong(4, System.currentTimeMillis()-TimeConstants.DAY_IN_MS); + DBConnection.exec(stmt, new AggrDataDeletor(sensorId)); + + // delete too old 1 hour periods that already have been aggregated into days + stmt = db.getPreparedStatement("SELECT MAX(timestamp_end) FROM sensor_data_aggr" + +" WHERE sensor_id == ? AND timestamp_end-timestamp_start == ?"); + stmt.setLong(1, sensorId); + stmt.setLong(2, TimeConstants.DAY_IN_MS-1); + maxDBTimestamp = DBConnection.exec(stmt, new SimpleSQLResult()); + if(maxDBTimestamp == null) + maxDBTimestamp = 0l; + stmt = db.getPreparedStatement("SELECT * FROM sensor_data_aggr" + +" WHERE sensor_id == ? " + + "AND timestamp_end < ? " + + "AND timestamp_end-timestamp_start == ?" + + "AND timestamp_end < ?"); + stmt.setLong(1, sensorId); + stmt.setLong(2, maxDBTimestamp); + stmt.setLong(3, TimeConstants.HOUR_IN_MS-1); + stmt.setLong(4, System.currentTimeMillis()-TimeConstants.SEVEN_DAYS_IN_MS); + DBConnection.exec(stmt, new AggrDataDeletor(sensorId)); + + logger.fine("Done deleting"); + } catch (SQLException e) { + e.printStackTrace(); + } + } + + private class AggrDataDeletor implements SQLResultHandler{ + private long sensorId = -1; + + public AggrDataDeletor(long sensorId){ + this.sensorId = sensorId; + } + + @Override + public Object handleQueryResult(Statement stmt, ResultSet result) throws SQLException { + try{ + HalContext.getDB().getConnection().setAutoCommit(false); + boolean rollback = false; + PreparedStatement preparedDeleteStmt = HalContext.getDB().getPreparedStatement("DELETE FROM sensor_data_aggr WHERE sensor_id == ? AND sequence_id == ?"); + while(result.next()){ + if(sensorId != result.getInt("sensor_id")){ + logger.severe("found entry for aggregation for the wrong sensorId (expecting: "+sensorId+", but was: "+result.getInt("sensor_id")+")"); + rollback = true; + break; + } + logger.finer("Deleted period: "+ result.getLong("timestamp_start")); + preparedDeleteStmt.setInt(1, result.getInt("sensor_id")); + preparedDeleteStmt.setLong(2, result.getLong("sequence_id")); + preparedDeleteStmt.addBatch(); + } + if(!rollback){ + DBConnection.execBatch(preparedDeleteStmt); + HalContext.getDB().getConnection().commit(); + }else{ + HalContext.getDB().getConnection().rollback(); + } + }catch(Exception e){ + HalContext.getDB().getConnection().rollback(); + e.printStackTrace(); + }finally{ + HalContext.getDB().getConnection().setAutoCommit(true); + } + return null; + } + + } + +} diff --git a/src/se/koc/hal/deamon/DataSynchronizationClient.java b/src/se/koc/hal/deamon/DataSynchronizationClient.java index 8abeaf6f..5489ce44 100755 --- a/src/se/koc/hal/deamon/DataSynchronizationClient.java +++ b/src/se/koc/hal/deamon/DataSynchronizationClient.java @@ -55,14 +55,7 @@ public class DataSynchronizationClient extends TimerTask implements HalDaemon{ SensorDataListDTO dataList = (SensorDataListDTO) in.readObject(); for(SensorDataDTO data : dataList){ - PreparedStatement stmt = db.getPreparedStatement("DELETE FROM sensor_data_aggr WHERE sensor_id == ? AND ? <= timestamp_start AND timestamp_end <= ?"); - stmt.setLong(1, sensor.getId()); - stmt.setLong(2, data.timestampStart); - stmt.setLong(3, data.timestampEnd); - int deletions = DBConnection.exec(stmt); - if(deletions > 0) - logger.finer("Aggregate data replaced "+ deletions +" entries"); - stmt = db.getPreparedStatement("INSERT INTO sensor_data_aggr(sensor_id, sequence_id, timestamp_start, timestamp_end, data, confidence) VALUES(?, ?, ?, ?, ?, ?)"); + PreparedStatement stmt = db.getPreparedStatement("INSERT INTO sensor_data_aggr(sensor_id, sequence_id, timestamp_start, timestamp_end, data, confidence) VALUES(?, ?, ?, ?, ?, ?)"); stmt.setLong(1, sensor.getId()); stmt.setLong(2, data.sequenceId); stmt.setLong(3, data.timestampStart); diff --git a/src/se/koc/hal/deamon/TimeConstants.java b/src/se/koc/hal/deamon/TimeConstants.java new file mode 100644 index 00000000..6d54be32 --- /dev/null +++ b/src/se/koc/hal/deamon/TimeConstants.java @@ -0,0 +1,8 @@ +package se.koc.hal.deamon; + +public class TimeConstants { + public static final long FIVE_MINUTES_IN_MS = 5 * 60 * 1000; + public static final long HOUR_IN_MS = FIVE_MINUTES_IN_MS * 12; + public static final long DAY_IN_MS = HOUR_IN_MS * 24; + public static final long SEVEN_DAYS_IN_MS = DAY_IN_MS * 7; +} diff --git a/src/se/koc/hal/deamon/ValueOutsideOfRangeException.java b/src/se/koc/hal/deamon/ValueOutsideOfRangeException.java new file mode 100644 index 00000000..bbb3330f --- /dev/null +++ b/src/se/koc/hal/deamon/ValueOutsideOfRangeException.java @@ -0,0 +1,9 @@ +package se.koc.hal.deamon; + +public class ValueOutsideOfRangeException extends Exception { + + public ValueOutsideOfRangeException(String string) { + super(string); + } + +} diff --git a/src/se/koc/hal/page/PCOverviewHttpPage.java b/src/se/koc/hal/page/PCOverviewHttpPage.java index 22f8dc9a..80e44e0c 100755 --- a/src/se/koc/hal/page/PCOverviewHttpPage.java +++ b/src/se/koc/hal/page/PCOverviewHttpPage.java @@ -1,6 +1,5 @@ package se.koc.hal.page; -import java.io.IOException; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; @@ -9,91 +8,86 @@ import java.util.ArrayList; import java.util.Map; import se.koc.hal.HalContext; -import se.koc.hal.deamon.DataAggregatorDaemon; +import se.koc.hal.deamon.TimeConstants; import zutil.db.DBConnection; import zutil.db.SQLResultHandler; import zutil.io.file.FileUtil; -import zutil.net.http.HttpHeaderParser; -import zutil.net.http.HttpPage; -import zutil.net.http.HttpPrintStream; import zutil.parser.Templator; public class PCOverviewHttpPage extends HalHttpPage { - public PCOverviewHttpPage() { - super("Overview", "overview"); - } + public PCOverviewHttpPage() { + super("Overview", "overview"); + } - @Override + @Override public Templator httpRespond( Map session, Map cookie, Map request) - throws Exception{ - + throws Exception{ DBConnection db = HalContext.getDB(); PreparedStatement stmt = db.getPreparedStatement( - "SELECT user.username as username," - + " sensor_data_aggr.timestamp_start as timestamp_start," - + " sensor_data_aggr.timestamp_end as timestamp_end," - + " sensor_data_aggr.data as data," - + " sensor_data_aggr.confidence as confidence," - + DataAggregatorDaemon.FIVE_MINUTES_IN_MS + " as period_length" - + " FROM sensor_data_aggr, user, sensor" - + " WHERE sensor.id = sensor_data_aggr.sensor_id" - + " AND user.id = sensor.user_id" - + " AND timestamp_end-timestamp_start == ?" - + " AND timestamp_start > ?" - + " ORDER BY timestamp_start ASC"); - stmt.setLong(1, DataAggregatorDaemon.FIVE_MINUTES_IN_MS-1); - stmt.setLong(2, (System.currentTimeMillis() - DataAggregatorDaemon.DAY_IN_MS) ); + "SELECT user.username as username," + + " sensor_data_aggr.timestamp_start as timestamp_start," + + " sensor_data_aggr.timestamp_end as timestamp_end," + + " sensor_data_aggr.data as data," + + " sensor_data_aggr.confidence as confidence," + + TimeConstants.FIVE_MINUTES_IN_MS + " as period_length" + + " FROM sensor_data_aggr, user, sensor" + + " WHERE sensor.id = sensor_data_aggr.sensor_id" + + " AND user.id = sensor.user_id" + + " AND timestamp_end-timestamp_start == ?" + + " AND timestamp_start > ?" + + " ORDER BY timestamp_start ASC"); + stmt.setLong(1, TimeConstants.FIVE_MINUTES_IN_MS-1); + stmt.setLong(2, (System.currentTimeMillis() - TimeConstants.DAY_IN_MS) ); ArrayList minDataList = DBConnection.exec(stmt , new SQLPowerDataBuilder()); stmt = db.getPreparedStatement( - "SELECT user.username as username," - + " sensor_data_aggr.timestamp_start as timestamp_start," - + " sensor_data_aggr.timestamp_end as timestamp_end," - + " sensor_data_aggr.data as data," - + " sensor_data_aggr.confidence as confidence," - + DataAggregatorDaemon.HOUR_IN_MS + " as period_length" - + " FROM sensor_data_aggr, user, sensor" - + " WHERE sensor.id = sensor_data_aggr.sensor_id" - + " AND user.id = sensor.user_id" - + " AND timestamp_end-timestamp_start == ?" - + " AND timestamp_start > ?" - + " ORDER BY timestamp_start ASC"); - stmt.setLong(1, DataAggregatorDaemon.HOUR_IN_MS-1); - stmt.setLong(2, (System.currentTimeMillis() - 3*DataAggregatorDaemon.DAY_IN_MS) ); + "SELECT user.username as username," + + " sensor_data_aggr.timestamp_start as timestamp_start," + + " sensor_data_aggr.timestamp_end as timestamp_end," + + " sensor_data_aggr.data as data," + + " sensor_data_aggr.confidence as confidence," + + TimeConstants.HOUR_IN_MS + " as period_length" + + " FROM sensor_data_aggr, user, sensor" + + " WHERE sensor.id = sensor_data_aggr.sensor_id" + + " AND user.id = sensor.user_id" + + " AND timestamp_end-timestamp_start == ?" + + " AND timestamp_start > ?" + + " ORDER BY timestamp_start ASC"); + stmt.setLong(1, TimeConstants.HOUR_IN_MS-1); + stmt.setLong(2, (System.currentTimeMillis() - 3*TimeConstants.DAY_IN_MS) ); ArrayList hourDataList = DBConnection.exec(stmt, new SQLPowerDataBuilder()); stmt = db.getPreparedStatement( - "SELECT user.username as username," - + " sensor_data_aggr.timestamp_start as timestamp_start," - + " sensor_data_aggr.timestamp_end as timestamp_end," - + " sensor_data_aggr.data as data," - + " sensor_data_aggr.confidence as confidence," - + DataAggregatorDaemon.DAY_IN_MS + " as period_length" - + " FROM sensor_data_aggr, user, sensor" - + " WHERE sensor.id = sensor_data_aggr.sensor_id" - + " AND user.id = sensor.user_id" - + " AND timestamp_end-timestamp_start == ?" - + " ORDER BY timestamp_start ASC"); - stmt.setLong(1, DataAggregatorDaemon.DAY_IN_MS-1); + "SELECT user.username as username," + + " sensor_data_aggr.timestamp_start as timestamp_start," + + " sensor_data_aggr.timestamp_end as timestamp_end," + + " sensor_data_aggr.data as data," + + " sensor_data_aggr.confidence as confidence," + + TimeConstants.DAY_IN_MS + " as period_length" + + " FROM sensor_data_aggr, user, sensor" + + " WHERE sensor.id = sensor_data_aggr.sensor_id" + + " AND user.id = sensor.user_id" + + " AND timestamp_end-timestamp_start == ?" + + " ORDER BY timestamp_start ASC"); + stmt.setLong(1, TimeConstants.DAY_IN_MS-1); ArrayList dayDataList = DBConnection.exec(stmt, new SQLPowerDataBuilder()); - Templator tmpl = new Templator(FileUtil.find("web-resource/overview.tmpl")); + Templator tmpl = new Templator(FileUtil.find("web-resource/index.html")); tmpl.set("minData", minDataList); tmpl.set("hourData", hourDataList); tmpl.set("dayData", dayDataList); tmpl.set("username", new String[]{"Ziver", "Daniel"}); - return tmpl; - + return tmpl; } - + public static class PowerData{ long timestamp; String data; @@ -104,29 +98,29 @@ public class PCOverviewHttpPage extends HalHttpPage { this.username = uname; } } - + private static class SQLPowerDataBuilder implements SQLResultHandler> { @Override public ArrayList handleQueryResult(Statement stmt, ResultSet result) throws SQLException { ArrayList list = new ArrayList<>(); long previousTimestampEnd = -1; while(result.next()){ - + long timestampStart = result.getLong("timestamp_start"); long timestampEnd = result.getLong("timestamp_end"); long periodLength = result.getLong("period_length"); int data = result.getInt("data"); String username = result.getString("username"); float confidence = result.getFloat("confidence"); - + //add null data point to list if one or more periods of data is missing before this if(previousTimestampEnd != -1 && timestampStart-previousTimestampEnd > periodLength){ list.add(new PowerData(previousTimestampEnd+1, "null", username)); } - + //add this data point to list list.add(new PowerData(timestampStart, ""+ (data/1000.0), username)); - + //update previous end timestamp previousTimestampEnd = timestampEnd; } diff --git a/src/se/koc/hal/plugin/localsensor/ImpulseTracker.java b/src/se/koc/hal/plugin/localsensor/ImpulseTracker.java index 6d0232a5..fcc13659 100755 --- a/src/se/koc/hal/plugin/localsensor/ImpulseTracker.java +++ b/src/se/koc/hal/plugin/localsensor/ImpulseTracker.java @@ -25,20 +25,28 @@ public class ImpulseTracker implements Runnable { private Integer impulseCount = 0; private ExecutorService executorPool; private final DBConnection db; + private final int sensorId; public static void main(String args[]) throws Exception { - new ImpulseTracker(); + new ImpulseTracker(2); } - public ImpulseTracker() throws Exception{ + /** + * Constructor + * @param sensorId The ID of this sensor. Will be written to the DB + * @throws Exception + */ + public ImpulseTracker(int sensorId) throws Exception{ + this.sensorId = sensorId; + // create gpio controller final GpioController gpio = GpioFactory.getInstance(); // provision gpio pin #02 as an input pin with its internal pull up resistor enabled final GpioPinDigitalInput irLightSensor = gpio.provisionDigitalInputPin(RaspiPin.GPIO_02, PinPullResistance.PULL_UP); - // create and register gpio pin listener + // create and register gpio pin listener. May require the program to be run as sudo if the GPIO pin has not been exported irLightSensor.addListener(new GpioPinListenerDigital() { @Override public void handleGpioPinDigitalStateChangeEvent(GpioPinDigitalStateChangeEvent event) { @@ -52,8 +60,10 @@ public class ImpulseTracker implements Runnable { }); + // setup a thread pool for executing database jobs this.executorPool = Executors.newCachedThreadPool(); + // Connect to the database logger.info("Connecting to db..."); db = new DBConnection(DBConnection.DBMS.SQLite, "hal.db"); @@ -64,6 +74,10 @@ public class ImpulseTracker implements Runnable { } + /** + * This loop will try to save the current time and the number of impulses seen every [IMPULSE_REPORT_TIMEOUT] milliseconds. + * Every iteration the actual loop time will be evaluated and used to calculate the time for the next loop. + */ @Override public void run() { long startTime = System.nanoTime(); @@ -71,7 +85,7 @@ public class ImpulseTracker implements Runnable { impulseCount = 0; //reset the impulse count } while(true) { - sleepNano(nanoSecondsSleep); + sleepNano(nanoSecondsSleep); //sleep for some time. This variable will be modified every loop to compensate for the loop time spent. int count = -1; synchronized(impulseCount){ count = impulseCount; @@ -88,6 +102,10 @@ public class ImpulseTracker implements Runnable { } } + /** + * Sleep for [ns] nanoseconds + * @param ns + */ private void sleepNano(long ns){ //System.out.println("will go to sleep for " + ns + "ns"); try{ @@ -97,12 +115,21 @@ public class ImpulseTracker implements Runnable { } } + /** + * Saves the data to the database. + * This method should block the caller as short time as possible. + * Try to make the time spent in the method the same for every call (low variation). + * + * @param timestamp_end + * @param data + */ private void save(final long timestamp_end, final int data){ + //offload the timed loop by not doing the db interaction in this thread. executorPool.execute(new Runnable(){ @Override public void run() { try { - db.exec("INSERT INTO sensor_data_raw(timestamp, sensor_id, data) VALUES("+timestamp_end+", "+2+", "+data+")"); + db.exec("INSERT INTO sensor_data_raw(timestamp, sensor_id, data) VALUES("+timestamp_end+", "+ImpulseTracker.this.sensorId+", "+data+")"); } catch (SQLException e) { e.printStackTrace(); } diff --git a/src/se/koc/hal/struct/Sensor.java b/src/se/koc/hal/struct/Sensor.java index 83279e7b..d2d215ea 100755 --- a/src/se/koc/hal/struct/Sensor.java +++ b/src/se/koc/hal/struct/Sensor.java @@ -35,6 +35,11 @@ public class Sensor extends DBBean{ return DBConnection.exec(stmt, DBBeanSQLResultHandler.createList(Sensor.class, db) ); } + public static List getSensors(DBConnection db) throws SQLException{ + PreparedStatement stmt = db.getPreparedStatement( "SELECT * FROM sensor" ); + return DBConnection.exec(stmt, DBBeanSQLResultHandler.createList(Sensor.class, db) ); + } + public static long getHighestSequenceId(long sensorId) throws SQLException{ PreparedStatement stmt = HalContext.getDB().getPreparedStatement("SELECT MAX(sequence_id) FROM sensor_data_aggr WHERE sensor_id == ?"); diff --git a/web-resource/overview.tmpl b/web-resource/overview.tmpl index da6909c5..09f82443 100755 --- a/web-resource/overview.tmpl +++ b/web-resource/overview.tmpl @@ -19,32 +19,32 @@