diff --git a/src/zutil/net/mqtt/MqttBroker.java b/src/zutil/net/mqtt/MqttBroker.java index 8463d21..d0ea86a 100755 --- a/src/zutil/net/mqtt/MqttBroker.java +++ b/src/zutil/net/mqtt/MqttBroker.java @@ -317,7 +317,7 @@ public class MqttBroker extends ThreadedTCPNetworkServer { // Close connection default: - logger.warning("Received unknown packet type: " + packet.type); + logger.warning("Received unknown packet type: " + packet.type + " (" + packet.getClass() + ")"); sendWillPacket(); /* FALLTHROUGH */ case MqttPacketHeader.PACKET_TYPE_DISCONNECT: diff --git a/src/zutil/net/mqtt/packet/MqttPacket.java b/src/zutil/net/mqtt/packet/MqttPacket.java index 4f3e175..26625cc 100755 --- a/src/zutil/net/mqtt/packet/MqttPacket.java +++ b/src/zutil/net/mqtt/packet/MqttPacket.java @@ -63,12 +63,11 @@ public class MqttPacket { case PACKET_TYPE_PINGRESP: packet = new MqttPacketPingResp(); break; // no payload case PACKET_TYPE_DISCONNECT: packet = new MqttPacketDisconnect(); break; // no payload default: - throw new IOException("Unknown header type: " + packet.type); + throw new IOException("Unknown packet type in header: " + packet.type); } + + // Read header and payload in.read(packet); - // TODO: payload - byte[] payload = new byte[Math.max(0, packet.variableHeaderAndPayloadLength - packet.calculateVariableHeaderLength())]; - in.read(payload); return packet; } diff --git a/src/zutil/net/mqtt/packet/MqttPacketConnect.java b/src/zutil/net/mqtt/packet/MqttPacketConnect.java index a86edac..0699d7d 100755 --- a/src/zutil/net/mqtt/packet/MqttPacketConnect.java +++ b/src/zutil/net/mqtt/packet/MqttPacketConnect.java @@ -138,12 +138,9 @@ public class MqttPacketConnect extends MqttPacketHeader { @Override public int calculatePayloadLength() { - int length = 0; - // Each String and byte[] is prefixed with a 2 byte length value in the payload - if (!flagCleanSession) - length += 2 + clientIdentifier.length(); + int length = 2 + (clientIdentifier != null ? clientIdentifier.length() : 0); if (flagWillFlag) length += 2 + willTopic.length() + 2 + willPayload.length; if (flagUsername) @@ -165,7 +162,7 @@ public class MqttPacketConnect extends MqttPacketHeader { MqttPacketConnect packet = (MqttPacketConnect) parentObject; TwoByteLengthPrefixedDataSerializer serializer = new TwoByteLengthPrefixedDataSerializer(); - if ("clientIdentifier".equals(field.getName()) && !packet.flagCleanSession || + if ("clientIdentifier".equals(field.getName()) || "willTopic".equals(field.getName()) && packet.flagWillFlag || "willPayload".equals(field.getName()) && packet.flagWillFlag || "username".equals(field.getName()) && packet.flagUsername || @@ -181,7 +178,7 @@ public class MqttPacketConnect extends MqttPacketHeader { MqttPacketConnect packet = (MqttPacketConnect) parentObject; TwoByteLengthPrefixedDataSerializer serializer = new TwoByteLengthPrefixedDataSerializer(); - if ("clientIdentifier".equals(field.getName()) && !packet.flagCleanSession || + if ("clientIdentifier".equals(field.getName()) || "willTopic".equals(field.getName()) && packet.flagWillFlag || "willPayload".equals(field.getName()) && packet.flagWillFlag || "username".equals(field.getName()) && packet.flagUsername || diff --git a/src/zutil/net/mqtt/packet/MqttPacketPublish.java b/src/zutil/net/mqtt/packet/MqttPacketPublish.java index ca9b2df..3510015 100755 --- a/src/zutil/net/mqtt/packet/MqttPacketPublish.java +++ b/src/zutil/net/mqtt/packet/MqttPacketPublish.java @@ -26,6 +26,7 @@ package zutil.net.mqtt.packet; import zutil.ByteUtil; +import zutil.converter.Converter; import zutil.parser.binary.BinaryFieldData; import zutil.parser.binary.BinaryFieldSerializer; import zutil.parser.binary.serializer.TwoByteLengthPrefixedDataSerializer; @@ -64,13 +65,16 @@ public class MqttPacketPublish extends MqttPacketHeader { @CustomBinaryField(index = 2002, serializer = TwoByteLengthPrefixedDataSerializer.class) public String topicName; - @BinaryField(index = 2003, length = 16) + /** + * A unique identity of this packet. Only available if QOS is above 0. + */ + @CustomBinaryField(index = 2003, serializer = MqttPacketPublishPacketIdSerializer.class) public int packetId; @Override public int calculateVariableHeaderLength() { - return 4 + (topicName != null ? topicName.length() : 0); + return 2 + (topicName != null ? topicName.length() : 0) + (getFlagQoS() > 0 ? 2 : 0); } // ------------------------------------------ @@ -114,7 +118,8 @@ public class MqttPacketPublish extends MqttPacketHeader { @Override public byte[] read(InputStream in, BinaryFieldData field, Object parentObject) throws IOException { MqttPacketPublish publishPacket = (MqttPacketPublish) parentObject; - int payloadLength = Math.max(0, publishPacket.variableHeaderAndPayloadLength - publishPacket.calculateVariableHeaderLength()); + int variableLength = publishPacket.calculateVariableHeaderLength(); + int payloadLength = Math.max(0, publishPacket.variableHeaderAndPayloadLength - variableLength); byte[] payload = new byte[payloadLength]; in.read(payload); @@ -127,4 +132,37 @@ public class MqttPacketPublish extends MqttPacketHeader { out.write(obj); } } + + /** + * Only read and write Packet Identifier if QOS is above 0 + */ + private static class MqttPacketPublishPacketIdSerializer implements BinaryFieldSerializer { + @Override + public Object read(InputStream in, BinaryFieldData field, Object parentObject) throws IOException { + MqttPacketPublish publish = (MqttPacketPublish) parentObject; + + if (0 < publish.getFlagQoS()) { + byte[] b = new byte[2]; + in.read(b); + return Converter.toInt(b); + } + + return 0; + } + @Override + public Object read(InputStream in, BinaryFieldData field) throws IOException {return null;} + + @Override + public void write(OutputStream out, Object obj, BinaryFieldData field, Object parentObject) throws IOException { + MqttPacketPublish publish = (MqttPacketPublish) parentObject; + + if (0 < publish.getFlagQoS()) { + byte[] b = Converter.toBytes((int) obj); + out.write(b[1]); + out.write(b[0]); + } + } + @Override + public void write(OutputStream out, Object obj, BinaryFieldData field) throws IOException {} + } } diff --git a/src/zutil/parser/binary/serializer/BinaryStructListSerializer.java b/src/zutil/parser/binary/serializer/BinaryStructListSerializer.java index 4de0696..5f460a0 100644 --- a/src/zutil/parser/binary/serializer/BinaryStructListSerializer.java +++ b/src/zutil/parser/binary/serializer/BinaryStructListSerializer.java @@ -14,7 +14,7 @@ import java.util.List; * This class needs to be extended by a more specific subclass that can provide * the list class type and handling the flow control of the read action. * - * @param + * @param defines the class type of the items in the List. */ public abstract class BinaryStructListSerializer implements BinaryFieldSerializer> { diff --git a/test/zutil/net/mqtt/packet/MqttPacketConnectTest.java b/test/zutil/net/mqtt/packet/MqttPacketConnectTest.java index c1a7823..33cbe51 100755 --- a/test/zutil/net/mqtt/packet/MqttPacketConnectTest.java +++ b/test/zutil/net/mqtt/packet/MqttPacketConnectTest.java @@ -54,12 +54,12 @@ public class MqttPacketConnectTest { 0b0000_0100, // Prot. Level - 0b0101_1010, // Flags + 0b0001_1010, // Flags 0b0000_0000, // Keep alive 0b0000_1010, // Keep alive // Payload - 0x00, 0x01, '5', // password + 0x00, 0x01, '1', // Client Identifier }; MqttPacketConnect obj = (MqttPacketConnect) MqttPacket.read( @@ -70,17 +70,17 @@ public class MqttPacketConnectTest { assertEquals(10, obj.keepAlive); assertFalse(obj.flagUsername); - assertTrue(obj.flagPassword); + assertFalse(obj.flagPassword); assertFalse(obj.flagWillRetain); assertEquals(3, obj.flagWillQoS); assertFalse(obj.flagWillFlag); assertTrue(obj.flagCleanSession); - assertNull(obj.clientIdentifier); + assertEquals("1", obj.clientIdentifier); assertNull(obj.willTopic); assertNull(null, obj.willPayload); assertNull(obj.username); - assertEquals("5", obj.password); + assertNull(obj.password); } @Test @@ -88,7 +88,7 @@ public class MqttPacketConnectTest { char[] data = new char[]{ // Fixed Header 0b0001_0000, // Packet Type + Reserved - 0b0000_1010, // Variable Header + Payload Length + 0xFF & 25, // Variable Header + Payload Length // Variable Header 0b0000_0000, // length 0b0000_0100, // length @@ -137,7 +137,7 @@ public class MqttPacketConnectTest { char[] data = new char[]{ // Fixed Header 0b0001_0000, // Packet Type + Reserved - 0b0000_1010, // Variable Header + Payload Length + 0xFF & 13, // Variable Header + Payload Length // Variable Header 0b0000_0000, // length 0b0000_0100, // length @@ -153,10 +153,10 @@ public class MqttPacketConnectTest { 0b0000_0000, // Keep alive 0b0000_1010, // Keep alive // Payload + 0x00, 0x01, '1', // Client Identifier }; MqttPacketConnect obj = new MqttPacketConnect(); - obj.variableHeaderAndPayloadLength = 10; obj.keepAlive = 10; obj.flagUsername = false; @@ -166,6 +166,8 @@ public class MqttPacketConnectTest { obj.flagWillFlag = false; obj.flagCleanSession = true; + obj.clientIdentifier = "1"; + ByteArrayOutputStream buffer = new ByteArrayOutputStream(); BinaryStructOutputStream binOut = new BinaryStructOutputStream(buffer); MqttPacket.write(binOut, obj); @@ -201,7 +203,6 @@ public class MqttPacketConnectTest { }; MqttPacketConnect obj = new MqttPacketConnect(); - obj.variableHeaderAndPayloadLength = 10; obj.keepAlive = 10; obj.flagUsername = true; diff --git a/test/zutil/net/mqtt/packet/MqttPacketPublishTest.java b/test/zutil/net/mqtt/packet/MqttPacketPublishTest.java index f87f9f1..3660628 100644 --- a/test/zutil/net/mqtt/packet/MqttPacketPublishTest.java +++ b/test/zutil/net/mqtt/packet/MqttPacketPublishTest.java @@ -18,23 +18,21 @@ public class MqttPacketPublishTest { char[] data = new char[]{ // Fixed Header 0b0011_0000, // Packet Type(4) + Reserved(4) - 0xFF & 6, // Variable Header + Payload Length + 0xFF & 4, // Variable Header + Payload Length // Variable Header 0b0000_0000, // length 0xFF & 2, // length 'a', // Topic Name 'b', // Topic Name - 0b0000_0000, // Packet Identifier - 0xFF & 5, // Packet Identifier // Payload }; MqttPacketPublish obj = (MqttPacketPublish) MqttPacket.read( new BinaryStructInputStream(new ByteArrayInputStream(Converter.toBytes(data)))); - assertEquals(6, obj.variableHeaderAndPayloadLength); + assertEquals(4, obj.variableHeaderAndPayloadLength); assertEquals("ab", obj.topicName); - assertEquals(5, obj.packetId); + assertEquals(0, obj.packetId); assertArrayEquals(new byte[0], obj.payload); } @@ -43,14 +41,12 @@ public class MqttPacketPublishTest { char[] data = new char[]{ // Fixed Header 0b0011_0000, // Packet Type(4) + Reserved(4) - 0xFF & 9, // Variable Header + Payload Length + 0xFF & 7, // Variable Header + Payload Length // Variable Header 0b0000_0000, // length 0xFF & 2, // length 'a', // Topic Name 'b', // Topic Name - 0b0000_0000, // Packet Identifier - 0xFF & 5, // Packet Identifier // Payload 0x00, 0x01, 0x02, }; @@ -58,9 +54,9 @@ public class MqttPacketPublishTest { MqttPacketPublish obj = (MqttPacketPublish) MqttPacket.read( new BinaryStructInputStream(new ByteArrayInputStream(Converter.toBytes(data)))); - assertEquals(9, obj.variableHeaderAndPayloadLength); + assertEquals(7, obj.variableHeaderAndPayloadLength); assertEquals("ab", obj.topicName); - assertEquals(5, obj.packetId); + assertEquals(0, obj.packetId); assertArrayEquals(new byte[]{0x00, 0x01, 0x02}, obj.payload); } @@ -69,19 +65,16 @@ public class MqttPacketPublishTest { char[] data = new char[]{ // Fixed Header 0b0011_0000, // Packet Type(4) + Reserved(4) - 0xFF & 6, // Variable Header + Payload Length + 0xFF & 4, // Variable Header + Payload Length // Variable Header 0b0000_0000, // length 0xFF & 2, // length 'a', // Topic Name 'b', // Topic Name - 0b0000_0000, // Packet Identifier - 0xFF & 5, // Packet Identifier // Payload }; MqttPacketPublish obj = new MqttPacketPublish(); - obj.variableHeaderAndPayloadLength = 5; obj.topicName = "ab"; obj.packetId = 5;