Added 5-minute, hour and day aggregator

Former-commit-id: c02ec9ab1b0298c0b0e3f03b8ce5c3d6bdba21b2
This commit is contained in:
dcollin 2015-12-04 00:21:14 +01:00
parent adf49efc9a
commit 1fdc895ef4

View file

@ -3,6 +3,7 @@ package se.koc.hal.deamon;
import java.sql.ResultSet; import java.sql.ResultSet;
import java.sql.SQLException; import java.sql.SQLException;
import java.sql.Statement; import java.sql.Statement;
import java.util.Locale;
import java.util.Timer; import java.util.Timer;
import java.util.TimerTask; import java.util.TimerTask;
import java.util.logging.Logger; import java.util.logging.Logger;
@ -21,6 +22,8 @@ import zutil.log.LogUtil;
public class DataAggregatorDaemon extends TimerTask implements HalDaemon { public class DataAggregatorDaemon extends TimerTask implements HalDaemon {
private static final Logger logger = LogUtil.getLogger(); private static final Logger logger = LogUtil.getLogger();
private static final int FIVE_MINUTES_IN_MS = 5 * 60 * 1000; private static final int FIVE_MINUTES_IN_MS = 5 * 60 * 1000;
private static final int HOUR_IN_MS = FIVE_MINUTES_IN_MS * 12;
private static final int DAY_IN_MS = HOUR_IN_MS * 24;
public void initiate(Timer timer){ public void initiate(Timer timer){
@ -33,28 +36,68 @@ public class DataAggregatorDaemon extends TimerTask implements HalDaemon {
public void run() { public void run() {
DBConnection db = PowerChallenge.db; DBConnection db = PowerChallenge.db;
try { try {
Long maxTimestampEnd = db.exec("SELECT MAX(timestamp_end) FROM sensor_data_aggr", new SimpleSQLHandler<Long>()); Long maxDBTimestamp = db.exec("SELECT MAX(timestamp_end) FROM sensor_data_aggr", new SimpleSQLHandler<Long>());
if(maxTimestampEnd == null) if(maxDBTimestamp == null)
maxTimestampEnd = 0l; maxDBTimestamp = 0l;
logger.fine("Calculating 5 min periods..."); // 5 minute aggregation
long intervallTimestamp = getTimestampPeriodStart(5, System.currentTimeMillis()); long minPeriodTimestamp = getTimestampMinutePeriodStart(5, System.currentTimeMillis());
db.exec("SELECT * FROM sensor_data_raw WHERE timestamp > " + maxTimestampEnd + " AND timestamp < " + intervallTimestamp + " ORDER BY timestamp ASC", new FiveMinuteAgrrigator()); logger.fine("Calculating 5 min periods... (from:"+ maxDBTimestamp +", to:"+ minPeriodTimestamp +")");
db.exec("SELECT * FROM sensor_data_raw "
+ "WHERE timestamp > " + maxDBTimestamp + " AND timestamp < " + minPeriodTimestamp
+ " ORDER BY timestamp ASC",
new FiveMinuteAggregator());
// hour aggregation
maxDBTimestamp = db.exec("SELECT MAX(timestamp_end) FROM sensor_data_aggr WHERE timestamp_end-timestamp_start == " + (HOUR_IN_MS-1), new SimpleSQLHandler<Long>());
if(maxDBTimestamp == null)
maxDBTimestamp = 0l;
long hourPeriodTimestamp = getTimestampMinutePeriodStart(60, System.currentTimeMillis());
logger.fine("Calculating hour periods... (from:"+ maxDBTimestamp +", to:"+ minPeriodTimestamp +")");
db.exec("SELECT * FROM sensor_data_aggr "
+ "WHERE timestamp_start > " + maxDBTimestamp + " AND timestamp_start < " + hourPeriodTimestamp + " AND timestamp_end-timestamp_start == " + (FIVE_MINUTES_IN_MS-1)
+" ORDER BY timestamp_start ASC",
new HourAggregator());
// day aggregation
maxDBTimestamp = db.exec("SELECT MAX(timestamp_end) FROM sensor_data_aggr WHERE timestamp_end-timestamp_start == " + (DAY_IN_MS-1), new SimpleSQLHandler<Long>());
if(maxDBTimestamp == null)
maxDBTimestamp = 0l;
long dayPeriodTimestamp = getTimestampHourPeriodStart(24, System.currentTimeMillis());
logger.fine("Calculating day periods... (from:"+ maxDBTimestamp +", to:"+ minPeriodTimestamp +")");
db.exec("SELECT * FROM sensor_data_aggr "
+ "WHERE timestamp_start > " + maxDBTimestamp + " AND timestamp_start < " + dayPeriodTimestamp + " AND timestamp_end-timestamp_start == " + (HOUR_IN_MS-1)
+" ORDER BY timestamp_start ASC",
new DayAggregator());
logger.fine("Done aggrigating");
} catch (SQLException e) { } catch (SQLException e) {
e.printStackTrace(); e.printStackTrace();
} }
} }
private static long getTimestampHourPeriodStart(int hour, long timestamp){
private static long getTimestampPeriodStart(int min, long timestamp){
Calendar cal = Calendar.getInstance(); Calendar cal = Calendar.getInstance();
cal.setTimeInMillis(System.currentTimeMillis()); cal.setTimeInMillis(timestamp);
int currentMinute = cal.get(Calendar.MINUTE); int currentMinute = cal.get(Calendar.HOUR_OF_DAY);
cal.set(Calendar.MINUTE, (currentMinute/min) * min); cal.set(Calendar.HOUR_OF_DAY, (currentMinute/hour) * hour);
cal.set(Calendar.MINUTE, 0);
cal.set(Calendar.SECOND, 0);
cal.set(Calendar.MILLISECOND, 0);
return cal.getTimeInMillis(); return cal.getTimeInMillis();
} }
private class FiveMinuteAgrrigator implements SQLResultHandler<Object>{ private static long getTimestampMinutePeriodStart(int min, long timestamp){
Calendar cal = Calendar.getInstance();
cal.setTimeInMillis(timestamp);
int currentMinute = cal.get(Calendar.MINUTE);
cal.set(Calendar.MINUTE, (currentMinute/min) * min);
cal.set(Calendar.SECOND, 0);
cal.set(Calendar.MILLISECOND, 0);
return cal.getTimeInMillis();
}
private class FiveMinuteAggregator implements SQLResultHandler<Object>{
@Override @Override
public Object handleQueryResult(Statement stmt, ResultSet result) throws SQLException { public Object handleQueryResult(Statement stmt, ResultSet result) throws SQLException {
long currentPeriodTimestamp = 0; long currentPeriodTimestamp = 0;
@ -62,11 +105,11 @@ public class DataAggregatorDaemon extends TimerTask implements HalDaemon {
int count = 0; int count = 0;
while(result.next()){ while(result.next()){
long timestamp = result.getLong("timestamp"); long timestamp = result.getLong("timestamp");
long periodTimestamp = getTimestampPeriodStart(5, timestamp); long periodTimestamp = getTimestampMinutePeriodStart(5, timestamp);
if(currentPeriodTimestamp != 0 && periodTimestamp != currentPeriodTimestamp){ if(currentPeriodTimestamp != 0 && periodTimestamp != currentPeriodTimestamp){
float confidence = count / 5f; float confidence = Math.min(count / 5f, 1.0f);
logger.finer("Calculated period: "+ currentPeriodTimestamp +" sum: "+ sum +" confidence: "+ confidence); logger.finer("Calculated minute period: "+ currentPeriodTimestamp +" sum: "+ sum +" confidence: "+ confidence);
PowerChallenge.db.exec(String.format("INSERT INTO sensor_data_aggr(sensor_id, sequence_id, timestamp_start, timestamp_end, data, confidence) VALUES(%d, %d, %d, %d, %d, %f)", PowerChallenge.db.exec(String.format(Locale.US, "INSERT INTO sensor_data_aggr(sensor_id, sequence_id, timestamp_start, timestamp_end, data, confidence) VALUES(%d, %d, %d, %d, %d, %f)",
result.getInt("sensor_id"), result.getInt("sensor_id"),
42, 42,
currentPeriodTimestamp, currentPeriodTimestamp,
@ -75,16 +118,81 @@ public class DataAggregatorDaemon extends TimerTask implements HalDaemon {
confidence)); confidence));
// Reset variables // Reset variables
periodTimestamp = currentPeriodTimestamp; currentPeriodTimestamp = periodTimestamp;
sum = count = 0; confidence = sum = 0;
} }
if(currentPeriodTimestamp == 0) currentPeriodTimestamp = periodTimestamp; if(currentPeriodTimestamp == 0) currentPeriodTimestamp = periodTimestamp;
sum += result.getInt("data"); sum += result.getInt("data");
++count; ++count;
} }
return null; return null;
} }
} }
private class HourAggregator implements SQLResultHandler<Object>{
@Override
public Object handleQueryResult(Statement stmt, ResultSet result) throws SQLException {
long currentPeriodTimestamp = 0;
int sum = 0;
float confidenceSum = 0;
while(result.next()){
long timestamp = result.getLong("timestamp_start");
long periodTimestamp = getTimestampMinutePeriodStart(60, timestamp);
if(currentPeriodTimestamp != 0 && periodTimestamp != currentPeriodTimestamp){
float aggrConfidence = confidenceSum / 12f;
logger.finer("Calculated hour period: "+ currentPeriodTimestamp +" sum: "+ sum +" confidence: "+ aggrConfidence);
PowerChallenge.db.exec(String.format(Locale.US, "INSERT INTO sensor_data_aggr(sensor_id, sequence_id, timestamp_start, timestamp_end, data, confidence) VALUES(%d, %d, %d, %d, %d, %f)",
result.getInt("sensor_id"),
42,
currentPeriodTimestamp,
currentPeriodTimestamp + HOUR_IN_MS -1,
sum,
aggrConfidence));
// Reset variables
currentPeriodTimestamp = periodTimestamp;
confidenceSum = sum = 0;
}
if(currentPeriodTimestamp == 0) currentPeriodTimestamp = periodTimestamp;
sum += result.getInt("data");
confidenceSum += result.getFloat("confidence");
}
return null;
}
}
private class DayAggregator implements SQLResultHandler<Object>{
@Override
public Object handleQueryResult(Statement stmt, ResultSet result) throws SQLException {
long currentPeriodTimestamp = 0;
int sum = 0;
float confidenceSum = 0;
int samples = 0;
while(result.next()){
long timestamp = result.getLong("timestamp_start");
long periodTimestamp = getTimestampHourPeriodStart(24, timestamp);
if(currentPeriodTimestamp != 0 && periodTimestamp != currentPeriodTimestamp){
float aggrConfidence = confidenceSum / 24f;
logger.finer("Calculated day period: "+ currentPeriodTimestamp +" sum: "+ sum +" confidence: "+ aggrConfidence+ " samples: " + samples);
PowerChallenge.db.exec(String.format(Locale.US, "INSERT INTO sensor_data_aggr(sensor_id, sequence_id, timestamp_start, timestamp_end, data, confidence) VALUES(%d, %d, %d, %d, %d, %f)",
result.getInt("sensor_id"),
42,
currentPeriodTimestamp,
currentPeriodTimestamp + DAY_IN_MS -1,
sum,
aggrConfidence));
// Reset variables
currentPeriodTimestamp = periodTimestamp;
confidenceSum = sum = 0;
samples = 0;
}
if(currentPeriodTimestamp == 0) currentPeriodTimestamp = periodTimestamp;
sum += result.getInt("data");
confidenceSum += result.getFloat("confidence");
samples++;
}
return null;
}
}
} }