diff --git a/src/zutil/net/mqtt/MqttBroker.java b/src/zutil/net/mqtt/MqttBroker.java index 32be57c..93109c0 100755 --- a/src/zutil/net/mqtt/MqttBroker.java +++ b/src/zutil/net/mqtt/MqttBroker.java @@ -53,11 +53,11 @@ public class MqttBroker extends ThreadedTCPNetworkServer{ public void run() { try { // Setup connection - MqttPacket packet = MqttPacket.read(in); + MqttPacketHeader packet = MqttPacket.read(in); // Unexpected packet? - if ( ! (packet.header instanceof MqttPacketConnect)) - throw new IOException("Expected MqttPacketConnect but received "+packet.header.getClass()); - MqttPacketConnect conn = (MqttPacketConnect) packet.header; + if ( ! (packet instanceof MqttPacketConnect)) + throw new IOException("Expected MqttPacketConnect but received "+packet.getClass()); + MqttPacketConnect conn = (MqttPacketConnect) packet; // Reply MqttPacketConnectAck connack = new MqttPacketConnectAck(); @@ -78,7 +78,7 @@ public class MqttBroker extends ThreadedTCPNetworkServer{ if (packet == null) return; - switch (packet.header.type){ + switch (packet.type){ // Publish case MqttPacketHeader.PACKET_TYPE_PUBLISH: break; @@ -94,7 +94,7 @@ public class MqttBroker extends ThreadedTCPNetworkServer{ break; // Close connection default: - logger.warning("Received unknown packet type: "+packet.header.type); + logger.warning("Received unknown packet type: "+packet.type); case MqttPacketHeader.PACKET_TYPE_DISCONNECT: return; } diff --git a/src/zutil/net/mqtt/packet/MqttPacket.java b/src/zutil/net/mqtt/packet/MqttPacket.java index fa80599..258dd5a 100755 --- a/src/zutil/net/mqtt/packet/MqttPacket.java +++ b/src/zutil/net/mqtt/packet/MqttPacket.java @@ -1,7 +1,6 @@ package zutil.net.mqtt.packet; -import zutil.parser.binary.BinaryStruct; import zutil.parser.binary.BinaryStructInputStream; import zutil.parser.binary.BinaryStructOutputStream; @@ -14,40 +13,35 @@ import static zutil.net.mqtt.packet.MqttPacketHeader.*; */ public class MqttPacket { - public MqttPacketHeader header; - - private MqttPacket() {} - - public static MqttPacket read(BinaryStructInputStream in) throws IOException { - MqttPacket packet = new MqttPacket(); + public static MqttPacketHeader read(BinaryStructInputStream in) throws IOException { + MqttPacketHeader packet = new MqttPacketHeader(); // Peek into stream and find packet type in.mark(10); - packet.header = new MqttPacketHeader(); - in.read(packet.header); + in.read(packet); in.reset(); // Resolve the correct header class - switch (packet.header.type){ - case PACKET_TYPE_CONN: packet.header = new MqttPacketConnect(); break; - case PACKET_TYPE_CONNACK: packet.header = new MqttPacketConnectAck(); break; - case PACKET_TYPE_PUBLISH: packet.header = new MqttPacketPublish(); break; - case PACKET_TYPE_PUBACK: packet.header = new MqttPacketPublishAck(); break; - case PACKET_TYPE_PUBREC: packet.header = new MqttPacketPublishRec(); break; - case PACKET_TYPE_PUBREL: packet.header = new MqttPacketPublishRec(); break; - case PACKET_TYPE_PUBCOMP: packet.header = new MqttPacketPublishComp(); break; - case PACKET_TYPE_SUBSCRIBE: packet.header = new MqttPacketSubscribe(); break; - case PACKET_TYPE_SUBACK: packet.header = new MqttPacketSubscribeAck(); break; - case PACKET_TYPE_UNSUBSCRIBE: packet.header = new MqttPacketUnsubscribe(); break; - case PACKET_TYPE_UNSUBACK: packet.header = new MqttPacketUnsubscribeAck(); break; - case PACKET_TYPE_PINGREQ: packet.header = new MqttPacketPingReq(); break; - case PACKET_TYPE_PINGRESP: packet.header = new MqttPacketPingResp(); break; - case PACKET_TYPE_DISCONNECT: packet.header = new MqttPacketDisconnect(); break; + switch (packet.type){ + case PACKET_TYPE_CONN: packet = new MqttPacketConnect(); break; + case PACKET_TYPE_CONNACK: packet = new MqttPacketConnectAck(); break; + case PACKET_TYPE_PUBLISH: packet = new MqttPacketPublish(); break; + case PACKET_TYPE_PUBACK: packet = new MqttPacketPublishAck(); break; + case PACKET_TYPE_PUBREC: packet = new MqttPacketPublishRec(); break; + case PACKET_TYPE_PUBREL: packet = new MqttPacketPublishRec(); break; + case PACKET_TYPE_PUBCOMP: packet = new MqttPacketPublishComp(); break; + case PACKET_TYPE_SUBSCRIBE: packet = new MqttPacketSubscribe(); break; + case PACKET_TYPE_SUBACK: packet = new MqttPacketSubscribeAck(); break; + case PACKET_TYPE_UNSUBSCRIBE: packet = new MqttPacketUnsubscribe(); break; + case PACKET_TYPE_UNSUBACK: packet = new MqttPacketUnsubscribeAck(); break; + case PACKET_TYPE_PINGREQ: packet = new MqttPacketPingReq(); break; + case PACKET_TYPE_PINGRESP: packet = new MqttPacketPingResp(); break; + case PACKET_TYPE_DISCONNECT: packet = new MqttPacketDisconnect(); break; default: - throw new IOException("Unknown header type: "+ packet.header.type); + throw new IOException("Unknown header type: "+ packet.type); } - in.read(packet.header); + in.read(packet); // TODO: payload return packet; diff --git a/src/zutil/net/mqtt/packet/MqttPacketConnectAck.java b/src/zutil/net/mqtt/packet/MqttPacketConnectAck.java index f09b66e..f20d8ed 100755 --- a/src/zutil/net/mqtt/packet/MqttPacketConnectAck.java +++ b/src/zutil/net/mqtt/packet/MqttPacketConnectAck.java @@ -29,4 +29,5 @@ public class MqttPacketConnectAck extends MqttPacketHeader{ public int returnCode; // No payload + } diff --git a/src/zutil/net/mqtt/packet/MqttPacketDisconnect.java b/src/zutil/net/mqtt/packet/MqttPacketDisconnect.java index 1c2aaac..2653a57 100755 --- a/src/zutil/net/mqtt/packet/MqttPacketDisconnect.java +++ b/src/zutil/net/mqtt/packet/MqttPacketDisconnect.java @@ -1,6 +1,5 @@ package zutil.net.mqtt.packet; -import zutil.parser.binary.BinaryStruct; /** * The DISCONNECT Packet is the final Control Packet sent from the Client to @@ -13,4 +12,5 @@ public class MqttPacketDisconnect extends MqttPacketHeader{ // No variable header // No payload + } diff --git a/src/zutil/net/mqtt/packet/MqttPacketPingReq.java b/src/zutil/net/mqtt/packet/MqttPacketPingReq.java index cba2d97..3770bb7 100755 --- a/src/zutil/net/mqtt/packet/MqttPacketPingReq.java +++ b/src/zutil/net/mqtt/packet/MqttPacketPingReq.java @@ -12,4 +12,5 @@ public class MqttPacketPingReq extends MqttPacketHeader{ // No variable header // No payload + } diff --git a/src/zutil/net/mqtt/packet/MqttPacketPingResp.java b/src/zutil/net/mqtt/packet/MqttPacketPingResp.java index 4f523de..a34aaca 100755 --- a/src/zutil/net/mqtt/packet/MqttPacketPingResp.java +++ b/src/zutil/net/mqtt/packet/MqttPacketPingResp.java @@ -1,6 +1,5 @@ package zutil.net.mqtt.packet; -import zutil.parser.binary.BinaryStruct; /** * A PINGRESP Packet is sent by the Server to the Client in response to a @@ -13,4 +12,6 @@ public class MqttPacketPingResp extends MqttPacketHeader{ // No variable header // No payload + + } diff --git a/src/zutil/net/mqtt/packet/MqttPacketPublish.java b/src/zutil/net/mqtt/packet/MqttPacketPublish.java index a4fcfc9..46b4d9f 100755 --- a/src/zutil/net/mqtt/packet/MqttPacketPublish.java +++ b/src/zutil/net/mqtt/packet/MqttPacketPublish.java @@ -1,13 +1,12 @@ package zutil.net.mqtt.packet; -import zutil.parser.binary.BinaryStruct; /** * A PUBLISH Control Packet is sent from a Client to a Server * * @see MQTT v3.1.1 Spec */ -public class MqttPacketPublish extends MqttPacketHeader{ +public class MqttPacketPublish extends MqttPacketHeader { // Static Header /* @@ -26,7 +25,9 @@ public class MqttPacketPublish extends MqttPacketHeader{ @BinaryField(index = 2101, length = 16) private int topicNameLength; - /** The Topic Name identifies the information channel to which controlHeader data is published. */ + /** + * The Topic Name identifies the information channel to which controlHeader data is published. + */ @VariableLengthBinaryField(index = 2102, lengthField = "topicNameLength") public String topicName; diff --git a/src/zutil/net/mqtt/packet/MqttPacketPublishAck.java b/src/zutil/net/mqtt/packet/MqttPacketPublishAck.java index df63aa2..39f593a 100755 --- a/src/zutil/net/mqtt/packet/MqttPacketPublishAck.java +++ b/src/zutil/net/mqtt/packet/MqttPacketPublishAck.java @@ -1,6 +1,5 @@ package zutil.net.mqtt.packet; -import zutil.parser.binary.BinaryStruct; /** * A PUBACK Packet is the response to a PUBLISH Packet with QoS level 1. @@ -15,4 +14,5 @@ public class MqttPacketPublishAck extends MqttPacketHeader{ public int packetId; // No payload + } diff --git a/src/zutil/net/mqtt/packet/MqttPacketPublishComp.java b/src/zutil/net/mqtt/packet/MqttPacketPublishComp.java index 6119222..634f544 100755 --- a/src/zutil/net/mqtt/packet/MqttPacketPublishComp.java +++ b/src/zutil/net/mqtt/packet/MqttPacketPublishComp.java @@ -1,6 +1,5 @@ package zutil.net.mqtt.packet; -import zutil.parser.binary.BinaryStruct; /** * Publish complete. @@ -17,4 +16,5 @@ public class MqttPacketPublishComp extends MqttPacketHeader{ public int packetId; // No payload + } diff --git a/src/zutil/net/mqtt/packet/MqttPacketPublishRec.java b/src/zutil/net/mqtt/packet/MqttPacketPublishRec.java index a761f83..81d8952 100755 --- a/src/zutil/net/mqtt/packet/MqttPacketPublishRec.java +++ b/src/zutil/net/mqtt/packet/MqttPacketPublishRec.java @@ -1,6 +1,5 @@ package zutil.net.mqtt.packet; -import zutil.parser.binary.BinaryStruct; /** * Publish received. @@ -17,4 +16,5 @@ public class MqttPacketPublishRec extends MqttPacketHeader{ public int packetId; // No payload + } diff --git a/src/zutil/net/mqtt/packet/MqttPacketPublishRel.java b/src/zutil/net/mqtt/packet/MqttPacketPublishRel.java index 51eb5e0..36b8588 100755 --- a/src/zutil/net/mqtt/packet/MqttPacketPublishRel.java +++ b/src/zutil/net/mqtt/packet/MqttPacketPublishRel.java @@ -1,6 +1,5 @@ package zutil.net.mqtt.packet; -import zutil.parser.binary.BinaryStruct; /** * Publish release. @@ -17,4 +16,5 @@ public class MqttPacketPublishRel extends MqttPacketHeader{ public int packetId; // No Payload + } diff --git a/src/zutil/net/mqtt/packet/MqttPacketSubscribe.java b/src/zutil/net/mqtt/packet/MqttPacketSubscribe.java index 81d447f..3f67b06 100755 --- a/src/zutil/net/mqtt/packet/MqttPacketSubscribe.java +++ b/src/zutil/net/mqtt/packet/MqttPacketSubscribe.java @@ -22,7 +22,6 @@ public class MqttPacketSubscribe extends MqttPacketHeader{ - public static class MqttSubscribePayload implements BinaryStruct{ @BinaryField(index = 3001, length = 16) diff --git a/src/zutil/net/mqtt/packet/MqttPacketSubscribeAck.java b/src/zutil/net/mqtt/packet/MqttPacketSubscribeAck.java index 5d91735..141b376 100755 --- a/src/zutil/net/mqtt/packet/MqttPacketSubscribeAck.java +++ b/src/zutil/net/mqtt/packet/MqttPacketSubscribeAck.java @@ -23,7 +23,6 @@ public class MqttPacketSubscribeAck extends MqttPacketHeader{ - public static class MqttSubscribeAckPayload implements BinaryStruct{ public static final int RETCODE_SUCESS_MAX_QOS_0 = 0; public static final int RETCODE_SUCESS_MAX_QOS_1 = 1; diff --git a/src/zutil/net/mqtt/packet/MqttPacketUnsubscribe.java b/src/zutil/net/mqtt/packet/MqttPacketUnsubscribe.java index 3aa68f5..fbe539f 100755 --- a/src/zutil/net/mqtt/packet/MqttPacketUnsubscribe.java +++ b/src/zutil/net/mqtt/packet/MqttPacketUnsubscribe.java @@ -22,7 +22,6 @@ public class MqttPacketUnsubscribe extends MqttPacketHeader{ - public static class MqttUnsubscribePayload implements BinaryStruct{ @BinaryField(index = 3001, length = 16) diff --git a/src/zutil/net/mqtt/packet/MqttPacketUnsubscribeAck.java b/src/zutil/net/mqtt/packet/MqttPacketUnsubscribeAck.java index 4890192..9259814 100755 --- a/src/zutil/net/mqtt/packet/MqttPacketUnsubscribeAck.java +++ b/src/zutil/net/mqtt/packet/MqttPacketUnsubscribeAck.java @@ -1,8 +1,5 @@ package zutil.net.mqtt.packet; -import zutil.parser.binary.BinaryStruct; - -import java.util.List; /** * The UNSUBACK Packet is sent by the Server to the Client to confirm receipt @@ -18,4 +15,5 @@ public class MqttPacketUnsubscribeAck extends MqttPacketHeader{ public int packetId; // No payload + } diff --git a/test/zutil/net/mqtt/packet/MqttPacketConnectTest.java b/test/zutil/net/mqtt/packet/MqttPacketConnectTest.java index 752f245..a18ea6a 100755 --- a/test/zutil/net/mqtt/packet/MqttPacketConnectTest.java +++ b/test/zutil/net/mqtt/packet/MqttPacketConnectTest.java @@ -6,6 +6,8 @@ import zutil.net.mqtt.packet.MqttPacketConnect; import zutil.parser.binary.BinaryStructInputStream; import zutil.parser.binary.BinaryStructOutputStream; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; import java.io.IOException; import static org.junit.Assert.*; @@ -14,6 +16,10 @@ import static org.junit.Assert.*; public class MqttPacketConnectTest { char[] data = new char[]{ + // Fixed Header + 0b0001_0000, // Packet Type + Reserved + 0b0000_1010, // Variable Header + Payload Length + // Variable Header 0b0000_0000, // length 0b0000_0100, // length 0b0100_1101, // 'M' @@ -31,9 +37,11 @@ public class MqttPacketConnectTest { @Test - public void decode(){ - MqttPacketConnect obj = new MqttPacketConnect(); - BinaryStructInputStream.read(obj, Converter.toBytes(data)); + public void decode() throws IOException { + MqttPacketConnect obj = (MqttPacketConnect)MqttPacket.read( + new BinaryStructInputStream( + new ByteArrayInputStream( + Converter.toBytes(data)))); assertEquals("MQTT", obj.protocolName); assertEquals(4, obj.protocolLevel); @@ -50,6 +58,7 @@ public class MqttPacketConnectTest { @Test public void encode() throws IOException { MqttPacketConnect obj = new MqttPacketConnect(); + obj.payloadLength = 10; obj.keepAlive = 10; obj.flagUsername = true; @@ -59,7 +68,9 @@ public class MqttPacketConnectTest { obj.flagWillFlag = true; obj.flagCleanSession = true; - assertArrayEquals(Converter.toBytes(data), - BinaryStructOutputStream.serialize(obj)); + ByteArrayOutputStream buffer = new ByteArrayOutputStream(); + BinaryStructOutputStream binOut = new BinaryStructOutputStream(buffer); + MqttPacket.write(binOut, obj); + assertArrayEquals(Converter.toBytes(data), buffer.toByteArray()); } } \ No newline at end of file