diff --git a/src/zutil/net/mqtt/MqttBroker.java b/src/zutil/net/mqtt/MqttBroker.java new file mode 100755 index 0000000..32be57c --- /dev/null +++ b/src/zutil/net/mqtt/MqttBroker.java @@ -0,0 +1,114 @@ +package zutil.net.mqtt; + +import zutil.log.LogUtil; +import zutil.net.mqtt.packet.*; +import zutil.net.threaded.ThreadedTCPNetworkServer; +import zutil.net.threaded.ThreadedTCPNetworkServerThread; +import zutil.parser.binary.BinaryStructInputStream; +import zutil.parser.binary.BinaryStructOutputStream; + +import java.io.IOException; +import java.net.Socket; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * TODO: + * + * @see MQTT v3.1.1 Spec + */ +public class MqttBroker extends ThreadedTCPNetworkServer{ + private static final Logger logger = LogUtil.getLogger(); + + public static final int MQTT_PORT = 1883; + public static final int MQTT_PORT_TLS = 8883; + public static final int MQTT_PROTOCOL_VERSION = 0x04; // MQTT 3.1.1 + + + public MqttBroker(){ + super(MQTT_PORT); + } + + + @Override + protected ThreadedTCPNetworkServerThread getThreadInstance(Socket s) throws IOException { + return new MQTTConnectionThread(s); + } + + + private static class MQTTConnectionThread implements ThreadedTCPNetworkServerThread { + private Socket socket; + private BinaryStructInputStream in; + private BinaryStructOutputStream out; + + + private MQTTConnectionThread(Socket s) throws IOException { + socket = s; + in = new BinaryStructInputStream(socket.getInputStream()); + out = new BinaryStructOutputStream(socket.getOutputStream()); + } + + + @Override + public void run() { + try { + // Setup connection + MqttPacket packet = MqttPacket.read(in); + // Unexpected packet? + if ( ! (packet.header instanceof MqttPacketConnect)) + throw new IOException("Expected MqttPacketConnect but received "+packet.header.getClass()); + MqttPacketConnect conn = (MqttPacketConnect) packet.header; + + // Reply + MqttPacketConnectAck connack = new MqttPacketConnectAck(); + connack.returnCode = MqttPacketConnectAck.RETCODE_OK; + // Incorrect protocol version? + if (conn.protocolLevel != MQTT_PROTOCOL_VERSION){ + connack.returnCode = MqttPacketConnectAck.RETCODE_PROT_VER_ERROR; + MqttPacket.write(out, connack); + return; + } + // TODO: authenticate + // TODO: clean session + MqttPacket.write(out, connack); + + // Connected + while (true) { + packet = MqttPacket.read(in); + if (packet == null) + return; + + switch (packet.header.type){ + // Publish + case MqttPacketHeader.PACKET_TYPE_PUBLISH: + break; + // Subscribe + case MqttPacketHeader.PACKET_TYPE_SUBSCRIBE: + break; + // Unsubscribe + case MqttPacketHeader.PACKET_TYPE_UNSUBSCRIBE: + break; + // Ping + case MqttPacketHeader.PACKET_TYPE_PINGREQ: + MqttPacket.write(out, new MqttPacketPingResp()); + break; + // Close connection + default: + logger.warning("Received unknown packet type: "+packet.header.type); + case MqttPacketHeader.PACKET_TYPE_DISCONNECT: + return; + } + } + + } catch (IOException e) { + logger.log(Level.SEVERE, null, e); + } finally { + try { + socket.close(); + } catch (IOException e) { + logger.log(Level.SEVERE, null, e); + } + } + } + } +} diff --git a/src/zutil/net/mqtt/MqttClient.java b/src/zutil/net/mqtt/MqttClient.java new file mode 100755 index 0000000..ef61297 --- /dev/null +++ b/src/zutil/net/mqtt/MqttClient.java @@ -0,0 +1,9 @@ +package zutil.net.mqtt; + +/** + * TODO: + * + * @see MQTT v3.1.1 Spec + */ +public class MqttClient { +} diff --git a/src/zutil/net/mqtt/packet/MqttPacketConnect.java b/src/zutil/net/mqtt/packet/MqttPacketConnect.java index 3b46568..dbe475d 100755 --- a/src/zutil/net/mqtt/packet/MqttPacketConnect.java +++ b/src/zutil/net/mqtt/packet/MqttPacketConnect.java @@ -66,6 +66,11 @@ public class MqttPacketConnect extends MqttPacketHeader { public int keepAlive; - // Payload + // Payload: + // - Client identifier + // - Will Topic + // - Will message + // - User name + // - Password } diff --git a/src/zutil/net/mqtt/packet/MqttPacketConnectAck.java b/src/zutil/net/mqtt/packet/MqttPacketConnectAck.java index 029c919..f09b66e 100755 --- a/src/zutil/net/mqtt/packet/MqttPacketConnectAck.java +++ b/src/zutil/net/mqtt/packet/MqttPacketConnectAck.java @@ -17,13 +17,16 @@ public class MqttPacketConnectAck extends MqttPacketHeader{ public static final int RETCODE_NOT_AUTHORIZED = 5; + // Variable header + @BinaryField(index = 2000, length = 7) private int flagReserved; /** Indicates that there is a valid Session available */ @BinaryField(index = 2001, length = 1) - public int flagSessionPresent; + public boolean flagSessionPresent; @BinaryField(index = 2002, length = 8) public int returnCode; + // No payload } diff --git a/src/zutil/net/mqtt/packet/MqttPacketDisconnect.java b/src/zutil/net/mqtt/packet/MqttPacketDisconnect.java index dba34b9..1c2aaac 100755 --- a/src/zutil/net/mqtt/packet/MqttPacketDisconnect.java +++ b/src/zutil/net/mqtt/packet/MqttPacketDisconnect.java @@ -10,5 +10,7 @@ import zutil.parser.binary.BinaryStruct; */ public class MqttPacketDisconnect extends MqttPacketHeader{ + // No variable header + // No payload } diff --git a/src/zutil/net/mqtt/packet/MqttPacketHeader.java b/src/zutil/net/mqtt/packet/MqttPacketHeader.java index 06136fa..eb6f6a1 100755 --- a/src/zutil/net/mqtt/packet/MqttPacketHeader.java +++ b/src/zutil/net/mqtt/packet/MqttPacketHeader.java @@ -25,6 +25,7 @@ public class MqttPacketHeader implements BinaryStruct { // RESERVED = 15 + // Fixed Header @BinaryField(index = 1, length = 4) public byte type; diff --git a/src/zutil/net/mqtt/packet/MqttPacketPingReq.java b/src/zutil/net/mqtt/packet/MqttPacketPingReq.java index 70ab34a..cba2d97 100755 --- a/src/zutil/net/mqtt/packet/MqttPacketPingReq.java +++ b/src/zutil/net/mqtt/packet/MqttPacketPingReq.java @@ -9,5 +9,7 @@ import zutil.parser.binary.BinaryStruct; */ public class MqttPacketPingReq extends MqttPacketHeader{ + // No variable header + // No payload } diff --git a/src/zutil/net/mqtt/packet/MqttPacketPingResp.java b/src/zutil/net/mqtt/packet/MqttPacketPingResp.java index e9d1465..4f523de 100755 --- a/src/zutil/net/mqtt/packet/MqttPacketPingResp.java +++ b/src/zutil/net/mqtt/packet/MqttPacketPingResp.java @@ -10,5 +10,7 @@ import zutil.parser.binary.BinaryStruct; */ public class MqttPacketPingResp extends MqttPacketHeader{ + // No variable header + // No payload } diff --git a/src/zutil/net/mqtt/packet/MqttPacketPublish.java b/src/zutil/net/mqtt/packet/MqttPacketPublish.java index 5e6c075..a4fcfc9 100755 --- a/src/zutil/net/mqtt/packet/MqttPacketPublish.java +++ b/src/zutil/net/mqtt/packet/MqttPacketPublish.java @@ -34,6 +34,7 @@ public class MqttPacketPublish extends MqttPacketHeader{ public int packetId; - // Optional Payload + // Payload + // - Application data } diff --git a/src/zutil/net/mqtt/packet/MqttPacketPublishAck.java b/src/zutil/net/mqtt/packet/MqttPacketPublishAck.java index b33c8b2..df63aa2 100755 --- a/src/zutil/net/mqtt/packet/MqttPacketPublishAck.java +++ b/src/zutil/net/mqtt/packet/MqttPacketPublishAck.java @@ -14,5 +14,5 @@ public class MqttPacketPublishAck extends MqttPacketHeader{ @BinaryField(index = 2000, length = 16) public int packetId; - + // No payload } diff --git a/src/zutil/net/mqtt/packet/MqttPacketPublishComp.java b/src/zutil/net/mqtt/packet/MqttPacketPublishComp.java index 821a10b..6119222 100755 --- a/src/zutil/net/mqtt/packet/MqttPacketPublishComp.java +++ b/src/zutil/net/mqtt/packet/MqttPacketPublishComp.java @@ -16,5 +16,5 @@ public class MqttPacketPublishComp extends MqttPacketHeader{ @BinaryField(index = 2000, length = 16) public int packetId; - + // No payload } diff --git a/src/zutil/net/mqtt/packet/MqttPacketPublishRec.java b/src/zutil/net/mqtt/packet/MqttPacketPublishRec.java index eef81fa..a761f83 100755 --- a/src/zutil/net/mqtt/packet/MqttPacketPublishRec.java +++ b/src/zutil/net/mqtt/packet/MqttPacketPublishRec.java @@ -16,5 +16,5 @@ public class MqttPacketPublishRec extends MqttPacketHeader{ @BinaryField(index = 2000, length = 16) public int packetId; - + // No payload } diff --git a/src/zutil/net/mqtt/packet/MqttPacketPublishRel.java b/src/zutil/net/mqtt/packet/MqttPacketPublishRel.java index 7693d8f..51eb5e0 100755 --- a/src/zutil/net/mqtt/packet/MqttPacketPublishRel.java +++ b/src/zutil/net/mqtt/packet/MqttPacketPublishRel.java @@ -16,5 +16,5 @@ public class MqttPacketPublishRel extends MqttPacketHeader{ @BinaryField(index = 2000, length = 16) public int packetId; - + // No Payload } diff --git a/src/zutil/net/mqtt/packet/MqttPacketUnsubscribeAck.java b/src/zutil/net/mqtt/packet/MqttPacketUnsubscribeAck.java index 500e5fa..4890192 100755 --- a/src/zutil/net/mqtt/packet/MqttPacketUnsubscribeAck.java +++ b/src/zutil/net/mqtt/packet/MqttPacketUnsubscribeAck.java @@ -17,4 +17,5 @@ public class MqttPacketUnsubscribeAck extends MqttPacketHeader{ @BinaryField(index = 2000, length = 16) public int packetId; + // No payload }