Implemented peer information syncing.

Implemented local sensor sync boolean value validation.
Updated some traces


Former-commit-id: 6cfd254ccb6a983f56308e706f91869867881bc2
This commit is contained in:
Ziver Koc 2016-01-06 23:20:28 +01:00
parent 8d3ac0d340
commit eec7887f94
7 changed files with 199 additions and 65 deletions

View file

@ -1,3 +1,5 @@
http_port=8080
sync_port=6666
# Plugin configurations
tellstick.com_port=COM5

View file

@ -50,6 +50,15 @@ public class ControllerManager implements HalSensorReportListener, HalEventRepor
/////////////////////////////// SENSORS ///////////////////////////////////
public void register(Sensor sensor) throws IllegalAccessException, InstantiationException {
if(sensor.getSensorData() == null) {
logger.warning("Sensor data is null: "+ sensor);
return;
}
if(!availableSensors.contains(sensor.getSensorData().getClass())) {
logger.warning("Sensor data plugin not available: "+ sensor.getSensorData().getClass());
return;
}
logger.info("Registering new sensor(id: "+ sensor.getId() +"): "+ sensor.getSensorData().getClass());
Class<? extends HalSensorController> c = sensor.getController();
HalSensorController controller = getControllerInstance(c);
@ -60,6 +69,11 @@ public class ControllerManager implements HalSensorReportListener, HalEventRepor
}
public void deregister(Sensor sensor){
if(sensor.getSensorData() == null) {
logger.warning("Sensor data is null: "+ sensor);
return;
}
logger.info("Deregistering sensor(id: "+ sensor.getId() +"): "+ sensor.getSensorData().getClass());
Class<? extends HalSensorController> c = sensor.getController();
HalSensorController controller = (HalSensorController) controllerMap.get(c);;
@ -114,6 +128,15 @@ public class ControllerManager implements HalSensorReportListener, HalEventRepor
//////////////////////////////// EVENTS ///////////////////////////////////
public void register(Event event) throws IllegalAccessException, InstantiationException {
if(event.getEventData() == null) {
logger.warning("Sensor data is null: "+ event);
return;
}
if(!availableEvents.contains(event.getEventData().getClass())) {
logger.warning("Sensor data plugin not available: "+ event.getEventData().getClass());
return;
}
logger.info("Registering new event(id: "+ event.getId() +"): "+ event.getEventData().getClass());
Class<? extends HalEventController> c = event.getController();
HalEventController controller = getControllerInstance(c);
@ -124,6 +147,11 @@ public class ControllerManager implements HalSensorReportListener, HalEventRepor
}
public void deregister(Event event){
if(event.getEventData() == null) {
logger.warning("Sensor data is null: "+ event);
return;
}
logger.info("Deregistering event(id: "+ event.getId() +"): "+ event.getEventData().getClass());
Class<? extends HalEventController> c = event.getController();
HalEventController controller = (HalEventController) controllerMap.get(c);

View file

@ -1,8 +1,7 @@
package se.koc.hal.deamon;
import se.koc.hal.HalContext;
import se.koc.hal.deamon.DataSynchronizationDaemon.SensorDataDTO;
import se.koc.hal.deamon.DataSynchronizationDaemon.SensorDataListDTO;
import se.koc.hal.deamon.DataSynchronizationDaemon.*;
import se.koc.hal.intf.HalDaemon;
import se.koc.hal.struct.Sensor;
import se.koc.hal.struct.User;
@ -13,6 +12,7 @@ import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.net.ConnectException;
import java.net.Socket;
import java.net.UnknownHostException;
import java.sql.PreparedStatement;
@ -20,6 +20,7 @@ import java.sql.SQLException;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
public class DataSynchronizationClient implements HalDaemon {
@ -46,11 +47,34 @@ public class DataSynchronizationClient implements HalDaemon {
try (Socket s = new Socket(user.getHostname(), user.getPort());){
ObjectOutputStream out = new ObjectOutputStream(s.getOutputStream());
ObjectInputStream in = new ObjectInputStream(s.getInputStream());
// Request peer data
out.writeObject(new PeerDataReqDTO());
PeerDataRspDTO peerData = (PeerDataRspDTO) in.readObject();
user.setUserName(peerData.username);
user.setAddress(peerData.address);
user.save(db);
for(SensorDTO sensorDTO : peerData.sensors){
Sensor sensor = Sensor.getExternalSensor(db, sensorDTO.sensorId);
if(sensor != null) { // new sensor
sensor = new Sensor();
logger.fine("Created new external sensor with external_id: "+ sensorDTO.sensorId);
}
else
logger.fine("Updating external sensor with external_id: "+ sensorDTO.sensorId);
sensor.setExternalId(sensorDTO.sensorId);
sensor.setName(sensorDTO.name);
sensor.setType(sensorDTO.type);
sensor.setConfig(sensorDTO.config);
sensor.save(db);
}
// Request sensor data
List<Sensor> sensors = Sensor.getSensors(db, user);
for(Sensor sensor : sensors){
if(sensor.isSynced()) {
PeerDataReqDTO req = new PeerDataReqDTO();
SensorDataReqDTO req = new SensorDataReqDTO();
req.sensorId = sensor.getExternalId();
req.offsetSequenceId = Sensor.getHighestSequenceId(sensor.getId());
out.writeObject(req);
@ -71,29 +95,36 @@ public class DataSynchronizationClient implements HalDaemon {
else
logger.fine("Skipped sensor " + sensor.getId());
}
out.writeObject(null);
out.writeObject(null); // Tell server we are disconnecting
out.close();
in.close();
s.close();
} catch (UnknownHostException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} catch (ClassNotFoundException e) {
e.printStackTrace();
} catch (UnknownHostException|ConnectException e) {
logger.warning("Unable to connect to: "+ user.getHostname()+":"+user.getPort() +" "+ e.getMessage());
} catch (ClassNotFoundException|IOException e) {
logger.log(Level.SEVERE, null, e);
}
}
} catch (SQLException e) {
e.printStackTrace();
logger.log(Level.SEVERE, null, e);
}
}
/////////////// DTO ///////////////////////
protected static class PeerDataReqDTO implements Serializable{
/**
* Request Peer information and available sensors
*/
protected static class PeerDataReqDTO implements Serializable{}
/**
* Request aggregate data for a specific sensor and offset
*/
protected static class SensorDataReqDTO implements Serializable{
private static final long serialVersionUID = -9066734025245139989L;
public long sensorId;

View file

@ -1,8 +1,10 @@
package se.koc.hal.deamon;
import se.koc.hal.HalContext;
import se.koc.hal.deamon.DataSynchronizationClient.PeerDataReqDTO;
import se.koc.hal.deamon.DataSynchronizationClient.*;
import se.koc.hal.intf.HalDaemon;
import se.koc.hal.struct.Sensor;
import se.koc.hal.struct.User;
import zutil.db.DBConnection;
import zutil.db.SQLResultHandler;
import zutil.log.LogUtil;
@ -19,7 +21,9 @@ import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.logging.Level;
import java.util.logging.Logger;
public class DataSynchronizationDaemon extends ThreadedTCPNetworkServer implements HalDaemon {
@ -42,7 +46,7 @@ public class DataSynchronizationDaemon extends ThreadedTCPNetworkServer implemen
try {
return new DataSynchronizationDaemonThread(s);
} catch (IOException e) {
e.printStackTrace();
logger.log(Level.SEVERE, "Unable to create DataSynchronizationDaemonThread", e);
}
return null;
}
@ -63,55 +67,94 @@ public class DataSynchronizationDaemon extends ThreadedTCPNetworkServer implemen
public void run(){
logger.fine("User connected: "+ s.getInetAddress().getHostName());
DBConnection db = HalContext.getDB();
try {
Object obj = null;
while((obj = in.readObject()) != null){
if(obj instanceof PeerDataReqDTO){
PeerDataReqDTO req = (PeerDataReqDTO) obj;
PreparedStatement stmt = HalContext.getDB().getPreparedStatement("SELECT * FROM sensor_data_aggr WHERE sensor_id == ? AND sequence_id > ?");
stmt.setLong(1, req.sensorId);
stmt.setLong(2, req.offsetSequenceId);
logger.fine("Client requesting: sensorId: "+req.sensorId+", offset: "+req.offsetSequenceId);
SensorDataListDTO list = DBConnection.exec(stmt, new SQLResultHandler<SensorDataListDTO>() {
@Override
public SensorDataListDTO handleQueryResult(Statement stmt, ResultSet result) throws SQLException {
SensorDataListDTO list = new SensorDataListDTO();
while(result.next()){
SensorDataDTO data = new SensorDataDTO();
data.sequenceId = result.getLong("sensor_id");
data.timestampStart = result.getLong("timestamp_start");
data.timestampEnd = result.getLong("timestamp_end");
data.data = result.getInt("data");
data.confidence = result.getFloat("confidence");
list.add(data);
}
return list;
}
});
logger.fine("Sending "+ list.size() +" sensor data items to client");
out.writeObject(list);
if(obj instanceof PeerDataReqDTO){
logger.fine("Client requesting peer data");
PeerDataRspDTO rsp = new PeerDataRspDTO();
User localUser = User.getLocalUser(db);
rsp.username = localUser.getUserName();
rsp.address = localUser.getAddress();
rsp.sensors = new ArrayList<>();
for(Sensor sensor : Sensor.getLocalSensors(db)){
if(sensor.isSynced()) {
SensorDTO dto = new SensorDTO();
dto.sensorId = sensor.getId();
dto.name = sensor.getName();
dto.type = sensor.getType();
dto.config = sensor.getConfig();
rsp.sensors.add(dto);
}
}
out.writeObject(rsp);
}
if(obj instanceof SensorDataReqDTO){
SensorDataReqDTO req = (SensorDataReqDTO) obj;
Sensor sensor = Sensor.getSensor(db, req.sensorId);
if(sensor.isSynced()) {
PreparedStatement stmt = db.getPreparedStatement("SELECT * FROM sensor_data_aggr WHERE sensor_id == ? AND sequence_id > ?");
stmt.setLong(1, sensor.getId());
stmt.setLong(2, req.offsetSequenceId);
logger.fine("Client requesting sensor data: sensorId: " + req.sensorId + ", offset: " + req.offsetSequenceId);
SensorDataListDTO rsp = DBConnection.exec(stmt, new SQLResultHandler<SensorDataListDTO>() {
@Override
public SensorDataListDTO handleQueryResult(Statement stmt, ResultSet result) throws SQLException {
SensorDataListDTO list = new SensorDataListDTO();
while (result.next()) {
SensorDataDTO data = new SensorDataDTO();
data.sequenceId = result.getLong("sensor_id");
data.timestampStart = result.getLong("timestamp_start");
data.timestampEnd = result.getLong("timestamp_end");
data.data = result.getInt("data");
data.confidence = result.getFloat("confidence");
list.add(data);
}
return list;
}
});
logger.fine("Sending " + rsp.size() + " sensor data items to client");
out.writeObject(rsp);
}
else{
logger.warning("Client requesting non synced sensor data: sensorId: " + req.sensorId + ", offset: " + req.offsetSequenceId);
SensorDataListDTO rsp = new SensorDataListDTO();
out.writeObject(rsp);
}
}
}
out.close();
in.close();
s.close();
} catch (ClassNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} catch (SQLException e) {
e.printStackTrace();
} catch (ClassNotFoundException|IOException|SQLException e) {
logger.log(Level.SEVERE, null, e);
}
logger.fine("User disconnected: "+ s.getInetAddress().getHostName());
}
}
/////////////// DTO ///////////////////////
protected static class PeerDataRspDTO implements Serializable{
public String username;
public String address;
public ArrayList<SensorDTO> sensors;
}
protected static class SensorDTO implements Serializable{
public long sensorId;
public String name;
public String type;
public String config;
}
protected static class SensorDataListDTO extends ArrayList<SensorDataDTO> implements Serializable{
private static final long serialVersionUID = -5701618637734020691L;
}
protected static class SensorDataDTO implements Serializable{
private static final long serialVersionUID = 8494331502087736809L;

View file

@ -58,15 +58,16 @@ public class PCConfigureHttpPage extends HalHttpPage {
sensor = new Sensor();
sensor.setName(request.get("name"));
sensor.setType(request.get("type"));
sensor.setSynced(Boolean.parseBoolean(request.get("sync")));
//sensor.setConfig(request.get("config"));
sensor.setUser(localUser);
sensor.setSynced(true);
sensor.save(db);
case "modify_local_sensor":
sensor = Sensor.getSensor(db, id);
if(sensor != null){
sensor.setName(request.get("name"));
sensor.setType(request.get("type"));
sensor.setSynced(Boolean.parseBoolean(request.get("sync")));
//sensor.setConfig(request.get("config"));
sensor.save(db);
}
@ -118,6 +119,8 @@ public class PCConfigureHttpPage extends HalHttpPage {
tmpl.set("extUsers", User.getExternalUsers(db));
tmpl.set("extSensor", Sensor.getExternalSensors(db));
tmpl.set("availableSensors", ControllerManager.getInstance().getAvailableSensors());
return tmpl;
}

View file

@ -50,7 +50,12 @@ public class Sensor extends DBBean{
PreparedStatement stmt = db.getPreparedStatement( "SELECT sensor.* FROM sensor,user WHERE user.external == 1 AND user.id == sensor.user_id" );
return DBConnection.exec(stmt, DBBeanSQLResultHandler.createList(Sensor.class, db) );
}
public static Sensor getExternalSensor(DBConnection db, long id) throws SQLException{
PreparedStatement stmt = db.getPreparedStatement( "SELECT sensor.* FROM sensor,user WHERE user.external == 1 AND user.id == sensor.user_id AND sensor.external_id == ?" );
stmt.setLong(1, id);
return DBConnection.exec(stmt, DBBeanSQLResultHandler.create(Sensor.class, db) );
}
public static List<Sensor> getLocalSensors(DBConnection db) throws SQLException{
PreparedStatement stmt = db.getPreparedStatement( "SELECT sensor.* FROM sensor,user WHERE user.external == 0 AND user.id == sensor.user_id" );
return DBConnection.exec(stmt, DBBeanSQLResultHandler.createList(Sensor.class, db) );
@ -67,7 +72,7 @@ public class Sensor extends DBBean{
return DBConnection.exec(stmt, DBBeanSQLResultHandler.createList(Sensor.class, db) );
}
public static Sensor getSensor(DBConnection db, int id) throws SQLException{
public static Sensor getSensor(DBConnection db, long id) throws SQLException{
return DBBean.load(db, Sensor.class, id);
}
@ -81,6 +86,7 @@ public class Sensor extends DBBean{
public void setSensorData(HalSensor sensorData){
this.sensorData = sensorData;
updateConfig();
}
public HalSensor getSensorData(){
if(sensorData == null) {
@ -98,22 +104,24 @@ public class Sensor extends DBBean{
return sensorData;
}
public void save(DBConnection db) throws SQLException {
if(sensorData != null) {
try {
StringOutputStream buff = new StringOutputStream();
JSONObjectOutputStream out = new JSONObjectOutputStream(buff);
out.enableMetaData(false);
out.writeObject(sensorData);
out.close();
this.config = buff.toString();
} catch (IOException e){
logger.log(Level.SEVERE, "Unable to save sensor data", e);
}
}
if(sensorData != null)
updateConfig();
else
this.config = null;
super.save(db);
}
private void updateConfig(){
try {
StringOutputStream buff = new StringOutputStream();
JSONObjectOutputStream out = new JSONObjectOutputStream(buff);
out.enableMetaData(false);
out.writeObject(sensorData);
out.close();
this.config = buff.toString();
} catch (IOException e){
logger.log(Level.SEVERE, "Unable to save sensor data", e);
}
}
public String getName() {
@ -128,6 +136,13 @@ public class Sensor extends DBBean{
public void setType(String type) {
this.type = type;
}
public String getConfig() {
return config;
}
public void setConfig(String config) {
this.config = config;
this.sensorData = null; // invalidate current sensor data object
}
public User getUser() {
return user;
@ -156,4 +171,5 @@ public class Sensor extends DBBean{
public Class<? extends HalSensorController> getController(){
return getSensorData().getSensorController();
}
}

View file

@ -38,6 +38,7 @@
<thead>
<th>Name</th>
<th>Type</th>
<th>Public</th>
<th>Configuration</th>
<th>
<button class="btn btn-default btn-xs pull-right" data-toggle="modal"
@ -50,6 +51,7 @@
<tr>
<td>{{.name}}</td>
<td>{{.type}}</td>
<td>{{.sync}}</td>
<td>{{.config}}</td>
<td>
<form method="POST">
@ -62,6 +64,7 @@
data-id="{{.getId()}}"
data-name="{{.name}}"
data-type="{{.type}}"
data-sync="{{.sync}}"
data-config="{{.config}}">
<span class="glyphicon glyphicon-pencil"></span>
</button>
@ -183,17 +186,19 @@
$("#sensorModal").on('show.bs.modal', function (event) {
var button = $(event.relatedTarget);
var modal = $(this);
modal.find("input").val(""); // Reset all inputs
modal.find("input[type=text]").val(""); // Reset all inputs
if(button.data("id") >= 0){ // edit
modal.find("input[name=action]").val("modify_local_sensor");
modal.find("input[name=id]").val(button.data("id"));
modal.find("input[name=name]").val(button.data("name"));
modal.find("input[name=type]").val(button.data("type"));
modal.find("input[name=sync]").prop("checked", button.data("sync"));
modal.find("input[name=config]").val(button.data("config"));
}
else{ // create
modal.find("input[name=action]").val("create_local_sensor");
modal.find("input[name=id]").val(-1);
modal.find("input[name=sync]").prop("checked", "false");
}
});
@ -201,7 +206,7 @@
$("#userModal").on('show.bs.modal', function (event) {
var button = $(event.relatedTarget);
var modal = $(this);
modal.find("input").val(""); // Reset all inputs
modal.find("input[type=text]").val(""); // Reset all inputs
if(button.data("id") >= 0){ // edit
modal.find("input[name=action]").val("modify_external_user");
modal.find("input[name=id]").val(button.data("id"));
@ -237,10 +242,16 @@
<div class="form-group">
<label class="control-label">Type:</label>
<select class="form-control" name="type">
<option>TellStickController</option>
<option>RPiLocalController</option>
{{#availableSensors}}
<option>{{.getName()}}</option>
{{/availableSensors}}
</select>
</div>
<div class="form-group">
<label class="control-label">Public:</label>
<input type="checkbox" class="form-control" name="sync" value="true">
</div>
<hr>
<div class="form-group">
<label class="control-label">Config:</label> <!-- Should be improved, dynamic forms? -->