From edd8ee0341ac362a785785518351380a53ffbf68 Mon Sep 17 00:00:00 2001 From: Ziver Koc Date: Thu, 8 Nov 2018 18:12:56 +0100 Subject: [PATCH] Updated the way packet type is set and added TC --- src/zutil/net/mqtt/MqttBroker.java | 95 +++++++++++-------- src/zutil/net/mqtt/packet/MqttPacket.java | 14 --- .../net/mqtt/packet/MqttPacketConnect.java | 6 ++ .../net/mqtt/packet/MqttPacketConnectAck.java | 5 + .../net/mqtt/packet/MqttPacketDisconnect.java | 6 ++ .../net/mqtt/packet/MqttPacketPingReq.java | 6 ++ .../net/mqtt/packet/MqttPacketPingResp.java | 6 ++ .../net/mqtt/packet/MqttPacketPublish.java | 7 +- .../net/mqtt/packet/MqttPacketPublishAck.java | 6 ++ .../mqtt/packet/MqttPacketPublishComp.java | 6 ++ .../net/mqtt/packet/MqttPacketPublishRec.java | 6 ++ .../net/mqtt/packet/MqttPacketPublishRel.java | 6 ++ .../net/mqtt/packet/MqttPacketSubscribe.java | 6 ++ .../mqtt/packet/MqttPacketSubscribeAck.java | 6 ++ .../mqtt/packet/MqttPacketUnsubscribe.java | 6 ++ .../mqtt/packet/MqttPacketUnsubscribeAck.java | 6 ++ test/zutil/net/mqtt/MqttBrokerTest.java | 32 +++++++ 17 files changed, 173 insertions(+), 52 deletions(-) create mode 100644 test/zutil/net/mqtt/MqttBrokerTest.java diff --git a/src/zutil/net/mqtt/MqttBroker.java b/src/zutil/net/mqtt/MqttBroker.java index 93109c0..aeb7bd5 100755 --- a/src/zutil/net/mqtt/MqttBroker.java +++ b/src/zutil/net/mqtt/MqttBroker.java @@ -17,7 +17,7 @@ import java.util.logging.Logger; * * @see MQTT v3.1.1 Spec */ -public class MqttBroker extends ThreadedTCPNetworkServer{ +public class MqttBroker extends ThreadedTCPNetworkServer { private static final Logger logger = LogUtil.getLogger(); public static final int MQTT_PORT = 1883; @@ -25,24 +25,28 @@ public class MqttBroker extends ThreadedTCPNetworkServer{ public static final int MQTT_PROTOCOL_VERSION = 0x04; // MQTT 3.1.1 - public MqttBroker(){ + public MqttBroker() { super(MQTT_PORT); } @Override protected ThreadedTCPNetworkServerThread getThreadInstance(Socket s) throws IOException { - return new MQTTConnectionThread(s); + return new MqttConnectionThread(s); } - private static class MQTTConnectionThread implements ThreadedTCPNetworkServerThread { + protected static class MqttConnectionThread implements ThreadedTCPNetworkServerThread { private Socket socket; private BinaryStructInputStream in; private BinaryStructOutputStream out; + private boolean shutdown = false; - private MQTTConnectionThread(Socket s) throws IOException { + protected MqttConnectionThread() { + } // Test constructor + + public MqttConnectionThread(Socket s) throws IOException { socket = s; in = new BinaryStructInputStream(socket.getInputStream()); out = new BinaryStructOutputStream(socket.getOutputStream()); @@ -53,53 +57,42 @@ public class MqttBroker extends ThreadedTCPNetworkServer{ public void run() { try { // Setup connection - MqttPacketHeader packet = MqttPacket.read(in); + MqttPacketHeader connectPacket = MqttPacket.read(in); // Unexpected packet? - if ( ! (packet instanceof MqttPacketConnect)) - throw new IOException("Expected MqttPacketConnect but received "+packet.getClass()); - MqttPacketConnect conn = (MqttPacketConnect) packet; + if (!(connectPacket instanceof MqttPacketConnect)) + throw new IOException("Expected MqttPacketConnect but received " + connectPacket.getClass()); + MqttPacketConnect conn = (MqttPacketConnect) connectPacket; // Reply - MqttPacketConnectAck connack = new MqttPacketConnectAck(); - connack.returnCode = MqttPacketConnectAck.RETCODE_OK; + MqttPacketConnectAck connectAck = new MqttPacketConnectAck(); + // Incorrect protocol version? - if (conn.protocolLevel != MQTT_PROTOCOL_VERSION){ - connack.returnCode = MqttPacketConnectAck.RETCODE_PROT_VER_ERROR; - MqttPacket.write(out, connack); + if (conn.protocolLevel != MQTT_PROTOCOL_VERSION) { + connectAck.returnCode = MqttPacketConnectAck.RETCODE_PROT_VER_ERROR; + MqttPacket.write(out, connectAck); return; + } else { + connectAck.returnCode = MqttPacketConnectAck.RETCODE_OK; } + // TODO: authenticate // TODO: clean session - MqttPacket.write(out, connack); + MqttPacket.write(out, connectAck); // Connected - while (true) { - packet = MqttPacket.read(in); + + while (!shutdown) { + MqttPacketHeader packet = MqttPacket.read(in); if (packet == null) return; - switch (packet.type){ - // Publish - case MqttPacketHeader.PACKET_TYPE_PUBLISH: - break; - // Subscribe - case MqttPacketHeader.PACKET_TYPE_SUBSCRIBE: - break; - // Unsubscribe - case MqttPacketHeader.PACKET_TYPE_UNSUBSCRIBE: - break; - // Ping - case MqttPacketHeader.PACKET_TYPE_PINGREQ: - MqttPacket.write(out, new MqttPacketPingResp()); - break; - // Close connection - default: - logger.warning("Received unknown packet type: "+packet.type); - case MqttPacketHeader.PACKET_TYPE_DISCONNECT: - return; - } + MqttPacketHeader packetRsp = handleMqttPacket(packet); + + if (packetRsp != null) + MqttPacket.write(out, packetRsp); } + socket.close(); } catch (IOException e) { logger.log(Level.SEVERE, null, e); } finally { @@ -110,5 +103,33 @@ public class MqttBroker extends ThreadedTCPNetworkServer{ } } } + + public MqttPacketHeader handleMqttPacket(MqttPacketHeader packet) throws IOException { + switch (packet.type) { + // TODO: Publish + case MqttPacketHeader.PACKET_TYPE_PUBLISH: + break; + // TODO: Subscribe + case MqttPacketHeader.PACKET_TYPE_SUBSCRIBE: + break; + // TODO: Unsubscribe + case MqttPacketHeader.PACKET_TYPE_UNSUBSCRIBE: + break; + // Ping + case MqttPacketHeader.PACKET_TYPE_PINGREQ: + return new MqttPacketPingResp(); + // Close connection + default: + logger.warning("Received unknown packet type: " + packet.type); + case MqttPacketHeader.PACKET_TYPE_DISCONNECT: + shutdown = true; + } + + return null; + } + + public boolean isShutdown() { + return shutdown; + } } } diff --git a/src/zutil/net/mqtt/packet/MqttPacket.java b/src/zutil/net/mqtt/packet/MqttPacket.java index 258dd5a..4de1a62 100755 --- a/src/zutil/net/mqtt/packet/MqttPacket.java +++ b/src/zutil/net/mqtt/packet/MqttPacket.java @@ -48,20 +48,6 @@ public class MqttPacket { } public static void write(BinaryStructOutputStream out, MqttPacketHeader header) throws IOException{ - if (header instanceof MqttPacketConnect) header.type = PACKET_TYPE_CONN; - else if (header instanceof MqttPacketConnectAck) header.type = PACKET_TYPE_CONNACK; - else if (header instanceof MqttPacketPublishAck) header.type = PACKET_TYPE_PUBLISH; - else if (header instanceof MqttPacketPublishRec) header.type = PACKET_TYPE_PUBACK; - else if (header instanceof MqttPacketPublishComp) header.type = PACKET_TYPE_PUBREL; - else if (header instanceof MqttPacketSubscribe) header.type = PACKET_TYPE_PUBCOMP; - else if (header instanceof MqttPacketSubscribeAck) header.type = PACKET_TYPE_SUBSCRIBE; - else if (header instanceof MqttPacketUnsubscribe) header.type = PACKET_TYPE_UNSUBSCRIBE; - else if (header instanceof MqttPacketUnsubscribeAck) header.type = PACKET_TYPE_UNSUBACK; - else if (header instanceof MqttPacketPingReq) header.type = PACKET_TYPE_PINGREQ; - else if (header instanceof MqttPacketPingResp) header.type = PACKET_TYPE_PINGRESP; - else if (header instanceof MqttPacketDisconnect) header.type = PACKET_TYPE_DISCONNECT; - else - throw new IOException("Unknown header class: "+ header.getClass()); out.write(header); // TODO: payload diff --git a/src/zutil/net/mqtt/packet/MqttPacketConnect.java b/src/zutil/net/mqtt/packet/MqttPacketConnect.java index e808c0a..abebbe4 100755 --- a/src/zutil/net/mqtt/packet/MqttPacketConnect.java +++ b/src/zutil/net/mqtt/packet/MqttPacketConnect.java @@ -9,6 +9,12 @@ package zutil.net.mqtt.packet; */ public class MqttPacketConnect extends MqttPacketHeader { + // Header + + { + type = MqttPacketHeader.PACKET_TYPE_CONN; + } + // Variable header @BinaryField(index = 2001, length = 16) diff --git a/src/zutil/net/mqtt/packet/MqttPacketConnectAck.java b/src/zutil/net/mqtt/packet/MqttPacketConnectAck.java index b620097..323966c 100755 --- a/src/zutil/net/mqtt/packet/MqttPacketConnectAck.java +++ b/src/zutil/net/mqtt/packet/MqttPacketConnectAck.java @@ -14,6 +14,11 @@ public class MqttPacketConnectAck extends MqttPacketHeader{ public static final int RETCODE_BADD_USER_OR_PASS = 4; public static final int RETCODE_NOT_AUTHORIZED = 5; + // Header + + { + type = MqttPacketHeader.PACKET_TYPE_CONNACK; + } // Variable header diff --git a/src/zutil/net/mqtt/packet/MqttPacketDisconnect.java b/src/zutil/net/mqtt/packet/MqttPacketDisconnect.java index 2653a57..bd41ef3 100755 --- a/src/zutil/net/mqtt/packet/MqttPacketDisconnect.java +++ b/src/zutil/net/mqtt/packet/MqttPacketDisconnect.java @@ -9,6 +9,12 @@ package zutil.net.mqtt.packet; */ public class MqttPacketDisconnect extends MqttPacketHeader{ + // Header + + { + type = MqttPacketHeader.PACKET_TYPE_DISCONNECT; + } + // No variable header // No payload diff --git a/src/zutil/net/mqtt/packet/MqttPacketPingReq.java b/src/zutil/net/mqtt/packet/MqttPacketPingReq.java index 232ffa7..2f173b8 100755 --- a/src/zutil/net/mqtt/packet/MqttPacketPingReq.java +++ b/src/zutil/net/mqtt/packet/MqttPacketPingReq.java @@ -7,6 +7,12 @@ package zutil.net.mqtt.packet; */ public class MqttPacketPingReq extends MqttPacketHeader{ + // Header + + { + type = MqttPacketHeader.PACKET_TYPE_PINGREQ; + } + // No variable header // No payload diff --git a/src/zutil/net/mqtt/packet/MqttPacketPingResp.java b/src/zutil/net/mqtt/packet/MqttPacketPingResp.java index a34aaca..7e9c75a 100755 --- a/src/zutil/net/mqtt/packet/MqttPacketPingResp.java +++ b/src/zutil/net/mqtt/packet/MqttPacketPingResp.java @@ -9,6 +9,12 @@ package zutil.net.mqtt.packet; */ public class MqttPacketPingResp extends MqttPacketHeader{ + // Header + + { + type = MqttPacketHeader.PACKET_TYPE_PINGRESP; + } + // No variable header // No payload diff --git a/src/zutil/net/mqtt/packet/MqttPacketPublish.java b/src/zutil/net/mqtt/packet/MqttPacketPublish.java index 46b4d9f..fb27a63 100755 --- a/src/zutil/net/mqtt/packet/MqttPacketPublish.java +++ b/src/zutil/net/mqtt/packet/MqttPacketPublish.java @@ -8,6 +8,12 @@ package zutil.net.mqtt.packet; */ public class MqttPacketPublish extends MqttPacketHeader { + // Header + + { + type = MqttPacketHeader.PACKET_TYPE_PUBLISH; + } + // Static Header /* @BinaryField(index = 2000, length = 1) @@ -20,7 +26,6 @@ public class MqttPacketPublish extends MqttPacketHeader { @CustomBinaryField(index = 3, serializer = MqttVariableIntSerializer.class) private int length; */ - // Variable Header @BinaryField(index = 2101, length = 16) diff --git a/src/zutil/net/mqtt/packet/MqttPacketPublishAck.java b/src/zutil/net/mqtt/packet/MqttPacketPublishAck.java index 39f593a..0e099a5 100755 --- a/src/zutil/net/mqtt/packet/MqttPacketPublishAck.java +++ b/src/zutil/net/mqtt/packet/MqttPacketPublishAck.java @@ -8,6 +8,12 @@ package zutil.net.mqtt.packet; */ public class MqttPacketPublishAck extends MqttPacketHeader{ + // Header + + { + type = MqttPacketHeader.PACKET_TYPE_PUBACK; + } + // Variable Header @BinaryField(index = 2000, length = 16) diff --git a/src/zutil/net/mqtt/packet/MqttPacketPublishComp.java b/src/zutil/net/mqtt/packet/MqttPacketPublishComp.java index 634f544..3b2f126 100755 --- a/src/zutil/net/mqtt/packet/MqttPacketPublishComp.java +++ b/src/zutil/net/mqtt/packet/MqttPacketPublishComp.java @@ -10,6 +10,12 @@ package zutil.net.mqtt.packet; */ public class MqttPacketPublishComp extends MqttPacketHeader{ + // Header + + { + type = MqttPacketHeader.PACKET_TYPE_PUBCOMP; + } + // Variable Header @BinaryField(index = 2000, length = 16) diff --git a/src/zutil/net/mqtt/packet/MqttPacketPublishRec.java b/src/zutil/net/mqtt/packet/MqttPacketPublishRec.java index 81d8952..777c4a5 100755 --- a/src/zutil/net/mqtt/packet/MqttPacketPublishRec.java +++ b/src/zutil/net/mqtt/packet/MqttPacketPublishRec.java @@ -10,6 +10,12 @@ package zutil.net.mqtt.packet; */ public class MqttPacketPublishRec extends MqttPacketHeader{ + // Header + + { + type = MqttPacketHeader.PACKET_TYPE_PUBREC; + } + // Variable Header @BinaryField(index = 2000, length = 16) diff --git a/src/zutil/net/mqtt/packet/MqttPacketPublishRel.java b/src/zutil/net/mqtt/packet/MqttPacketPublishRel.java index 36b8588..6beb61c 100755 --- a/src/zutil/net/mqtt/packet/MqttPacketPublishRel.java +++ b/src/zutil/net/mqtt/packet/MqttPacketPublishRel.java @@ -10,6 +10,12 @@ package zutil.net.mqtt.packet; */ public class MqttPacketPublishRel extends MqttPacketHeader{ + // Header + + { + type = MqttPacketHeader.PACKET_TYPE_PUBREL; + } + // Variable Header @BinaryField(index = 2000, length = 16) diff --git a/src/zutil/net/mqtt/packet/MqttPacketSubscribe.java b/src/zutil/net/mqtt/packet/MqttPacketSubscribe.java index 3f67b06..154aa93 100755 --- a/src/zutil/net/mqtt/packet/MqttPacketSubscribe.java +++ b/src/zutil/net/mqtt/packet/MqttPacketSubscribe.java @@ -11,6 +11,12 @@ import java.util.List; */ public class MqttPacketSubscribe extends MqttPacketHeader{ + // Header + + { + type = MqttPacketHeader.PACKET_TYPE_SUBSCRIBE; + } + // Variable Header @BinaryField(index = 2000, length = 16) diff --git a/src/zutil/net/mqtt/packet/MqttPacketSubscribeAck.java b/src/zutil/net/mqtt/packet/MqttPacketSubscribeAck.java index 141b376..37fccf0 100755 --- a/src/zutil/net/mqtt/packet/MqttPacketSubscribeAck.java +++ b/src/zutil/net/mqtt/packet/MqttPacketSubscribeAck.java @@ -12,6 +12,12 @@ import java.util.List; */ public class MqttPacketSubscribeAck extends MqttPacketHeader{ + // Header + + { + type = MqttPacketHeader.PACKET_TYPE_SUBACK; + } + // Variable Header @BinaryField(index = 2000, length = 16) diff --git a/src/zutil/net/mqtt/packet/MqttPacketUnsubscribe.java b/src/zutil/net/mqtt/packet/MqttPacketUnsubscribe.java index fbe539f..b7d9a13 100755 --- a/src/zutil/net/mqtt/packet/MqttPacketUnsubscribe.java +++ b/src/zutil/net/mqtt/packet/MqttPacketUnsubscribe.java @@ -11,6 +11,12 @@ import java.util.List; */ public class MqttPacketUnsubscribe extends MqttPacketHeader{ + // Header + + { + type = MqttPacketHeader.PACKET_TYPE_UNSUBACK; + } + // Variable Header @BinaryField(index = 2000, length = 16) diff --git a/src/zutil/net/mqtt/packet/MqttPacketUnsubscribeAck.java b/src/zutil/net/mqtt/packet/MqttPacketUnsubscribeAck.java index 9259814..b125439 100755 --- a/src/zutil/net/mqtt/packet/MqttPacketUnsubscribeAck.java +++ b/src/zutil/net/mqtt/packet/MqttPacketUnsubscribeAck.java @@ -9,6 +9,12 @@ package zutil.net.mqtt.packet; */ public class MqttPacketUnsubscribeAck extends MqttPacketHeader{ + // Header + + { + type = MqttPacketHeader.PACKET_TYPE_UNSUBACK; + } + // Variable Header @BinaryField(index = 2000, length = 16) diff --git a/test/zutil/net/mqtt/MqttBrokerTest.java b/test/zutil/net/mqtt/MqttBrokerTest.java new file mode 100644 index 0000000..1e4a5d3 --- /dev/null +++ b/test/zutil/net/mqtt/MqttBrokerTest.java @@ -0,0 +1,32 @@ +package zutil.net.mqtt; + +import org.junit.Test; +import zutil.net.mqtt.MqttBroker.MqttConnectionThread; +import zutil.net.mqtt.packet.MqttPacketDisconnect; +import zutil.net.mqtt.packet.MqttPacketPingReq; +import zutil.net.mqtt.packet.MqttPacketPingResp; + +import java.io.IOException; + +import static org.junit.Assert.*; + +public class MqttBrokerTest { + + + @Test + public void ping() throws IOException { + MqttConnectionThread thread = new MqttConnectionThread(); + MqttPacketPingReq pingPacket = new MqttPacketPingReq(); + + assertEquals(MqttPacketPingResp.class, thread.handleMqttPacket(pingPacket).getClass()); + } + + @Test + public void disconnect() throws IOException { + MqttConnectionThread thread = new MqttConnectionThread(); + MqttPacketDisconnect disconnectPacket = new MqttPacketDisconnect(); + + assertEquals(null, thread.handleMqttPacket(disconnectPacket)); + assertTrue(thread.isShutdown()); + } +} \ No newline at end of file