Initial impl of Mqtt broker

This commit is contained in:
Ziver Koc 2017-05-31 17:23:35 +02:00
parent c339ba8266
commit 173a754088
24 changed files with 240 additions and 40 deletions

0
src/zutil/log/net/NetLogServer.java Normal file → Executable file
View file

View file

@ -0,0 +1,79 @@
package zutil.net.mqtt;
import zutil.log.LogUtil;
import zutil.net.mqtt.packet.MqttPacket;
import zutil.net.mqtt.packet.MqttPacketConnect;
import zutil.net.mqtt.packet.MqttPacketConnectAck;
import zutil.net.threaded.ThreadedTCPNetworkServer;
import zutil.net.threaded.ThreadedTCPNetworkServerThread;
import zutil.parser.binary.BinaryStructInputStream;
import zutil.parser.binary.BinaryStructOutputStream;
import java.io.IOException;
import java.net.Socket;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
* TODO:
*/
public class MqttBroker extends ThreadedTCPNetworkServer{
private static final Logger logger = LogUtil.getLogger();
public static final int MQTT_PORT = 1883;
public static final int MQTT_PORT_TLS = 8883;
public MqttBroker(){
super(MQTT_PORT);
}
@Override
protected ThreadedTCPNetworkServerThread getThreadInstance(Socket s) throws IOException {
return new MQTTConnectionThread(s);
}
private static class MQTTConnectionThread implements ThreadedTCPNetworkServerThread {
private Socket socket;
private BinaryStructInputStream in;
private BinaryStructOutputStream out;
private MQTTConnectionThread(Socket s) throws IOException {
socket = s;
in = new BinaryStructInputStream(socket.getInputStream());
out = new BinaryStructOutputStream(socket.getOutputStream());
}
@Override
public void run() {
try {
MqttPacket packet = MqttPacket.read(in);
// Unexpected packet?
if ( ! (packet.header instanceof MqttPacketConnect))
throw new IOException("Expected MqttPacketConnect but received "+packet.header.getClass());
MqttPacketConnect conn = (MqttPacketConnect) packet.header;
// Incorrect protocol version?
if (conn.protocolLevel != 0x04){
MqttPacketConnectAck connack = new MqttPacketConnectAck();
connack.returnCode = MqttPacketConnectAck.RETCODE_PROT_VER_ERROR;
MqttPacket.write(out, connack);
return;
}
} catch (IOException e) {
logger.log(Level.SEVERE, null, e);
} finally {
try {
socket.close();
} catch (IOException e) {
logger.log(Level.SEVERE, null, e);
}
}
}
}
}

View file

@ -0,0 +1,7 @@
package zutil.net.mqtt;
/**
* TODO:
*/
public class MqttClient {
}

View file

@ -0,0 +1,75 @@
package zutil.net.mqtt.packet;
import zutil.parser.binary.BinaryStruct;
import zutil.parser.binary.BinaryStructInputStream;
import zutil.parser.binary.BinaryStructOutputStream;
import java.io.IOException;
import static zutil.net.mqtt.packet.MqttPacketHeader.*;
/**
* A data class encapsulating a MQTT header and its controlHeader
*/
public class MqttPacket {
public MqttPacketHeader header;
private MqttPacket() {}
public static MqttPacket read(BinaryStructInputStream in) throws IOException {
MqttPacket packet = new MqttPacket();
// Peek into stream and find packet type
in.mark(10);
packet.header = new MqttPacketHeader();
in.read(packet.header);
in.reset();
// Resolve the correct header class
switch (packet.header.type){
case PACKET_TYPE_CONN: packet.header = new MqttPacketConnect(); break;
case PACKET_TYPE_CONNACK: packet.header = new MqttPacketConnectAck(); break;
case PACKET_TYPE_PUBLISH: packet.header = new MqttPacketPublish(); break;
case PACKET_TYPE_PUBACK: packet.header = new MqttPacketPublishAck(); break;
case PACKET_TYPE_PUBREC: packet.header = new MqttPacketPublishRec(); break;
case PACKET_TYPE_PUBREL: packet.header = new MqttPacketPublishRec(); break;
case PACKET_TYPE_PUBCOMP: packet.header = new MqttPacketPublishComp(); break;
case PACKET_TYPE_SUBSCRIBE: packet.header = new MqttPacketSubscribe(); break;
case PACKET_TYPE_SUBACK: packet.header = new MqttPacketSubscribeAck(); break;
case PACKET_TYPE_UNSUBSCRIBE: packet.header = new MqttPacketUnsubscribe(); break;
case PACKET_TYPE_UNSUBACK: packet.header = new MqttPacketUnsubscribeAck(); break;
case PACKET_TYPE_PINGREQ: packet.header = new MqttPacketPingReq(); break;
case PACKET_TYPE_PINGRESP: packet.header = new MqttPacketPingResp(); break;
case PACKET_TYPE_DISCONNECT: packet.header = new MqttPacketDisconnect(); break;
default:
throw new IOException("Unknown header type: "+ packet.header.type);
}
in.read(packet.header);
// TODO: payload
return packet;
}
public static void write(BinaryStructOutputStream out, MqttPacketHeader header) throws IOException{
if (header instanceof MqttPacketConnect) header.type = PACKET_TYPE_CONN;
else if (header instanceof MqttPacketConnectAck) header.type = PACKET_TYPE_CONNACK;
else if (header instanceof MqttPacketPublishAck) header.type = PACKET_TYPE_PUBLISH;
else if (header instanceof MqttPacketPublishRec) header.type = PACKET_TYPE_PUBACK;
else if (header instanceof MqttPacketPublishComp) header.type = PACKET_TYPE_PUBREL;
else if (header instanceof MqttPacketSubscribe) header.type = PACKET_TYPE_PUBCOMP;
else if (header instanceof MqttPacketSubscribeAck) header.type = PACKET_TYPE_SUBSCRIBE;
else if (header instanceof MqttPacketUnsubscribe) header.type = PACKET_TYPE_UNSUBSCRIBE;
else if (header instanceof MqttPacketUnsubscribeAck) header.type = PACKET_TYPE_UNSUBACK;
else if (header instanceof MqttPacketPingReq) header.type = PACKET_TYPE_PINGREQ;
else if (header instanceof MqttPacketPingResp) header.type = PACKET_TYPE_PINGRESP;
else if (header instanceof MqttPacketDisconnect) header.type = PACKET_TYPE_DISCONNECT;
else
throw new IOException("Unknown header class: "+ header.getClass());
out.write(header);
// TODO: payload
}
}

View file

@ -9,7 +9,7 @@ import zutil.parser.binary.BinaryStruct;
* *
* @see <a href="http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/mqtt-v3.1.1.html">MQTT v3.1.1 Spec</a> * @see <a href="http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/mqtt-v3.1.1.html">MQTT v3.1.1 Spec</a>
*/ */
public class MqttPacketConnect implements BinaryStruct { public class MqttPacketConnect extends MqttPacketHeader {
// Variable header // Variable header
@ -33,10 +33,10 @@ public class MqttPacketConnect implements BinaryStruct {
public int protocolLevel = 0x04; public int protocolLevel = 0x04;
/** Indicates that the payload contains a username */ /** Indicates that the controlHeader contains a username */
@BinaryField(index = 2010, length = 1) @BinaryField(index = 2010, length = 1)
public boolean flagUsername; public boolean flagUsername;
/** Indicates that the payload contains a password */ /** Indicates that the controlHeader contains a password */
@BinaryField(index = 2011, length = 1) @BinaryField(index = 2011, length = 1)
public boolean flagPassword; public boolean flagPassword;
/** Specifies if the Will Message is to be Retained when it is published. */ /** Specifies if the Will Message is to be Retained when it is published. */

View file

@ -8,7 +8,7 @@ import zutil.parser.binary.BinaryStruct;
* *
* @see <a href="http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/mqtt-v3.1.1.html">MQTT v3.1.1 Spec</a> * @see <a href="http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/mqtt-v3.1.1.html">MQTT v3.1.1 Spec</a>
*/ */
public class MqttPacketConnectAck implements BinaryStruct{ public class MqttPacketConnectAck extends MqttPacketHeader{
public static final int RETCODE_OK = 0; public static final int RETCODE_OK = 0;
public static final int RETCODE_PROT_VER_ERROR = 1; public static final int RETCODE_PROT_VER_ERROR = 1;
public static final int RETCODE_IDENTIFIER_REJECT = 2; public static final int RETCODE_IDENTIFIER_REJECT = 2;

View file

@ -8,7 +8,7 @@ import zutil.parser.binary.BinaryStruct;
* *
* @see <a href="http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/mqtt-v3.1.1.html">MQTT v3.1.1 Spec</a> * @see <a href="http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/mqtt-v3.1.1.html">MQTT v3.1.1 Spec</a>
*/ */
public class MqttPacketDisconnect implements BinaryStruct{ public class MqttPacketDisconnect extends MqttPacketHeader{
} }

View file

@ -8,6 +8,7 @@ import zutil.parser.binary.BinaryStruct;
public class MqttPacketHeader implements BinaryStruct { public class MqttPacketHeader implements BinaryStruct {
// RESERVED = 0; // RESERVED = 0;
public static final int PACKET_TYPE_CONN = 1;
public static final int PACKET_TYPE_CONNACK = 2; public static final int PACKET_TYPE_CONNACK = 2;
public static final int PACKET_TYPE_PUBLISH = 3; public static final int PACKET_TYPE_PUBLISH = 3;
public static final int PACKET_TYPE_PUBACK = 4; public static final int PACKET_TYPE_PUBACK = 4;
@ -26,11 +27,11 @@ public class MqttPacketHeader implements BinaryStruct {
@BinaryField(index = 1, length = 4) @BinaryField(index = 1, length = 4)
private byte type; public byte type;
@BinaryField(index = 2, length = 4) @BinaryField(index = 2, length = 4)
private byte flags; public byte flags;
@CustomBinaryField(index = 3, serializer = MqttVariableIntSerializer.class) @CustomBinaryField(index = 3, serializer = MqttVariableIntSerializer.class)
private int length; public int payloadLength;
} }

View file

@ -7,7 +7,7 @@ import zutil.parser.binary.BinaryStruct;
* *
* @see <a href="http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/mqtt-v3.1.1.html">MQTT v3.1.1 Spec</a> * @see <a href="http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/mqtt-v3.1.1.html">MQTT v3.1.1 Spec</a>
*/ */
public class MqttPacketPingReq implements BinaryStruct{ public class MqttPacketPingReq extends MqttPacketHeader{
} }

View file

@ -8,7 +8,7 @@ import zutil.parser.binary.BinaryStruct;
* *
* @see <a href="http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/mqtt-v3.1.1.html">MQTT v3.1.1 Spec</a> * @see <a href="http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/mqtt-v3.1.1.html">MQTT v3.1.1 Spec</a>
*/ */
public class MqttPacketPingResp implements BinaryStruct{ public class MqttPacketPingResp extends MqttPacketHeader{
} }

View file

@ -7,7 +7,7 @@ import zutil.parser.binary.BinaryStruct;
* *
* @see <a href="http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/mqtt-v3.1.1.html">MQTT v3.1.1 Spec</a> * @see <a href="http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/mqtt-v3.1.1.html">MQTT v3.1.1 Spec</a>
*/ */
public class MqttPacketPublish implements BinaryStruct{ public class MqttPacketPublish extends MqttPacketHeader{
// Static Header // Static Header
/* /*
@ -26,7 +26,7 @@ public class MqttPacketPublish implements BinaryStruct{
@BinaryField(index = 2101, length = 16) @BinaryField(index = 2101, length = 16)
private int topicNameLength; private int topicNameLength;
/** The Topic Name identifies the information channel to which payload data is published. */ /** The Topic Name identifies the information channel to which controlHeader data is published. */
@VariableLengthBinaryField(index = 2102, lengthField = "topicNameLength") @VariableLengthBinaryField(index = 2102, lengthField = "topicNameLength")
public String topicName; public String topicName;

View file

@ -7,7 +7,7 @@ import zutil.parser.binary.BinaryStruct;
* *
* @see <a href="http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/mqtt-v3.1.1.html">MQTT v3.1.1 Spec</a> * @see <a href="http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/mqtt-v3.1.1.html">MQTT v3.1.1 Spec</a>
*/ */
public class MqttPacketPublishAck implements BinaryStruct{ public class MqttPacketPublishAck extends MqttPacketHeader{
// Variable Header // Variable Header

View file

@ -9,7 +9,7 @@ import zutil.parser.binary.BinaryStruct;
* *
* @see <a href="http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/mqtt-v3.1.1.html">MQTT v3.1.1 Spec</a> * @see <a href="http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/mqtt-v3.1.1.html">MQTT v3.1.1 Spec</a>
*/ */
public class MqttPacketPublishComp implements BinaryStruct{ public class MqttPacketPublishComp extends MqttPacketHeader{
// Variable Header // Variable Header

View file

@ -9,7 +9,7 @@ import zutil.parser.binary.BinaryStruct;
* *
* @see <a href="http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/mqtt-v3.1.1.html">MQTT v3.1.1 Spec</a> * @see <a href="http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/mqtt-v3.1.1.html">MQTT v3.1.1 Spec</a>
*/ */
public class MqttPacketPublishRec implements BinaryStruct{ public class MqttPacketPublishRec extends MqttPacketHeader{
// Variable Header // Variable Header

View file

@ -9,7 +9,7 @@ import zutil.parser.binary.BinaryStruct;
* *
* @see <a href="http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/mqtt-v3.1.1.html">MQTT v3.1.1 Spec</a> * @see <a href="http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/mqtt-v3.1.1.html">MQTT v3.1.1 Spec</a>
*/ */
public class MqttPacketPublishRel implements BinaryStruct{ public class MqttPacketPublishRel extends MqttPacketHeader{
// Variable Header // Variable Header

View file

@ -9,7 +9,7 @@ import java.util.List;
* *
* @see <a href="http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/mqtt-v3.1.1.html">MQTT v3.1.1 Spec</a> * @see <a href="http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/mqtt-v3.1.1.html">MQTT v3.1.1 Spec</a>
*/ */
public class MqttPacketSubscribe implements BinaryStruct{ public class MqttPacketSubscribe extends MqttPacketHeader{
// Variable Header // Variable Header

View file

@ -10,7 +10,7 @@ import java.util.List;
* *
* @see <a href="http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/mqtt-v3.1.1.html">MQTT v3.1.1 Spec</a> * @see <a href="http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/mqtt-v3.1.1.html">MQTT v3.1.1 Spec</a>
*/ */
public class MqttPacketSubscribeAck implements BinaryStruct{ public class MqttPacketSubscribeAck extends MqttPacketHeader{
// Variable Header // Variable Header

View file

@ -9,7 +9,7 @@ import java.util.List;
* *
* @see <a href="http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/mqtt-v3.1.1.html">MQTT v3.1.1 Spec</a> * @see <a href="http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/mqtt-v3.1.1.html">MQTT v3.1.1 Spec</a>
*/ */
public class MqttPacketUnsubscribe implements BinaryStruct{ public class MqttPacketUnsubscribe extends MqttPacketHeader{
// Variable Header // Variable Header

View file

@ -10,7 +10,7 @@ import java.util.List;
* *
* @see <a href="http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/mqtt-v3.1.1.html">MQTT v3.1.1 Spec</a> * @see <a href="http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/mqtt-v3.1.1.html">MQTT v3.1.1 Spec</a>
*/ */
public class MqttPacketUnsubscribeAck implements BinaryStruct{ public class MqttPacketUnsubscribeAck extends MqttPacketHeader{
// Variable Header // Variable Header

37
src/zutil/net/threaded/ThreadedTCPNetworkServer.java Normal file → Executable file
View file

@ -35,6 +35,8 @@ import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException; import java.security.NoSuchAlgorithmException;
import java.security.NoSuchProviderException; import java.security.NoSuchProviderException;
import java.security.cert.CertificateException; import java.security.cert.CertificateException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.logging.Level; import java.util.logging.Level;
import java.util.logging.Logger; import java.util.logging.Logger;
@ -47,7 +49,8 @@ import java.util.logging.Logger;
public abstract class ThreadedTCPNetworkServer extends Thread{ public abstract class ThreadedTCPNetworkServer extends Thread{
private static final Logger logger = LogUtil.getLogger(); private static final Logger logger = LogUtil.getLogger();
public final int port; private final int port;
private Executor executor;
private File keyStore; private File keyStore;
private String keyStorePass; private String keyStorePass;
@ -62,45 +65,47 @@ public abstract class ThreadedTCPNetworkServer extends Thread{
/** /**
* Creates a new instance of the sever * Creates a new instance of the sever
* *
* @param port The port that the server should listen to * @param port The port that the server should listen to
* @param sslCert If this is not null then the server will use SSL connection with this keyStore file path * @param keyStore If this is not null then the server will use SSL connection with this keyStore file path
* @param sslCert If this is not null then the server will use a SSL connection with the given certificate * @param keyStorePass If this is not null then the server will use a SSL connection with the given certificate
*/ */
public ThreadedTCPNetworkServer(int port, File keyStore, String keyStorePass){ public ThreadedTCPNetworkServer(int port, File keyStore, String keyStorePass){
this.port = port; this.port = port;
executor = Executors.newCachedThreadPool();
this.keyStorePass = keyStorePass; this.keyStorePass = keyStorePass;
this.keyStore = keyStore; this.keyStore = keyStore;
} }
public void run(){ public void run(){
ServerSocket ss = null; ServerSocket serverSocket = null;
try{ try{
if(keyStorePass != null && keyStore != null){ if(keyStorePass != null && keyStore != null){
registerCertificate(keyStore, keyStorePass); registerCertificate(keyStore, keyStorePass);
ss = initSSL( port ); serverSocket = initSSL( port );
} }
else{ else{
ss = new ServerSocket( port ); serverSocket = new ServerSocket( port );
} }
logger.info("Listening for TCP Connections on port: "+port); logger.info("Listening for TCP Connections on port: "+port);
while(true){ while(true){
Socket s = ss.accept(); Socket connectionSocket = serverSocket.accept();
ThreadedTCPNetworkServerThread t = getThreadInstance( s ); ThreadedTCPNetworkServerThread thread = getThreadInstance( connectionSocket );
if( t!=null ) if( thread!=null ) {
new Thread( t ).start(); executor.execute(thread);
}
else{ else{
logger.severe("Unable to instantiate ThreadedTCPNetworkServerThread, closing connection!"); logger.severe("Unable to instantiate ThreadedTCPNetworkServerThread, closing connection!");
s.close(); connectionSocket.close();
} }
} }
} catch(Exception e) { } catch(Exception e) {
logger.log(Level.SEVERE, null, e); logger.log(Level.SEVERE, null, e);
} finally { } finally {
if( ss!=null ){ if( serverSocket!=null ){
try{ try{
ss.close(); serverSocket.close();
}catch(IOException e){ logger.log(Level.SEVERE, null, e); } }catch(IOException e){ logger.log(Level.SEVERE, null, e); }
} }
} }
@ -114,7 +119,7 @@ public abstract class ThreadedTCPNetworkServer extends Thread{
* @param s is an new connection to an host * @param s is an new connection to an host
* @return a new instance of an thread or null * @return a new instance of an thread or null
*/ */
protected abstract ThreadedTCPNetworkServerThread getThreadInstance( Socket s ); protected abstract ThreadedTCPNetworkServerThread getThreadInstance( Socket s ) throws IOException;
/** /**
* Initiates a SSLServerSocket * Initiates a SSLServerSocket
@ -132,7 +137,7 @@ public abstract class ThreadedTCPNetworkServer extends Thread{
/** /**
* Registers the given cert file to the KeyStore * Registers the given cert file to the KeyStore
* *
* @param certFile The cert file * @param keyStore The cert file
*/ */
protected void registerCertificate(File keyStore, String keyStorePass) throws CertificateException, IOException, KeyStoreException, NoSuchProviderException, NoSuchAlgorithmException{ protected void registerCertificate(File keyStore, String keyStorePass) throws CertificateException, IOException, KeyStoreException, NoSuchProviderException, NoSuchAlgorithmException{
System.setProperty("javax.net.ssl.keyStore", keyStore.getAbsolutePath()); System.setProperty("javax.net.ssl.keyStore", keyStore.getAbsolutePath());

View file

@ -25,8 +25,7 @@
package zutil.net.threaded; package zutil.net.threaded;
/** /**
* The class that will handle the a TCP connection will include * Interface for handling a single connection
* this interface
* *
* @author Ziver * @author Ziver
* *

2
src/zutil/net/update/UpdateServer.java Normal file → Executable file
View file

@ -74,7 +74,7 @@ public class UpdateServer extends ThreadedTCPNetworkServer{
/** /**
* Creates a UpdateServerThread * Creates a UpdateServerThread
* *
* @param client is the socket to the client * @param c is the socket to the client
*/ */
public UpdateServerThread(Socket c){ public UpdateServerThread(Socket c){
socket = c; socket = c;

View file

@ -107,6 +107,27 @@ public class BinaryStructInputStream {
return totalReadLength; return totalReadLength;
} }
/**
* @see InputStream#markSupported()
*/
public boolean markSupported(){
return in.markSupported();
}
/**
* @see InputStream#mark(int)
*/
public void mark(int limit){
in.mark(limit);
}
/**
* @see InputStream#reset()
*/
public void reset() throws IOException {
in.reset();
}
protected static int shiftLeftBy(int bitIndex, int bitLength){ protected static int shiftLeftBy(int bitIndex, int bitLength){
int shiftBy = (8 - ((7-bitIndex) + bitLength) % 8) % 8; int shiftBy = (8 - ((7-bitIndex) + bitLength) % 8) % 8;
return shiftBy; return shiftBy;

View file

@ -65,6 +65,19 @@ public class BinaryStructOutputStream {
return buffer.toByteArray(); return buffer.toByteArray();
} }
/**
* @see OutputStream#write(byte[])
*/
public void write(byte b[]) throws IOException {
out.write(b);
}
/**
* @see OutputStream#write(byte[], int, int)
*/
public void write(byte b[], int off, int len) throws IOException {
out.write(b, off, len);
}
/** /**
* Generate a binary stream from the provided struct and * Generate a binary stream from the provided struct and
* write the data to the underlying stream. * write the data to the underlying stream.