Fixed bugs in MQTT and updated tests

This commit is contained in:
Ziver Koc 2023-03-19 23:41:12 +01:00
parent 97df7f4304
commit 214bf0f40d
7 changed files with 66 additions and 38 deletions

View file

@ -317,7 +317,7 @@ public class MqttBroker extends ThreadedTCPNetworkServer {
// Close connection
default:
logger.warning("Received unknown packet type: " + packet.type);
logger.warning("Received unknown packet type: " + packet.type + " (" + packet.getClass() + ")");
sendWillPacket();
/* FALLTHROUGH */
case MqttPacketHeader.PACKET_TYPE_DISCONNECT:

View file

@ -63,12 +63,11 @@ public class MqttPacket {
case PACKET_TYPE_PINGRESP: packet = new MqttPacketPingResp(); break; // no payload
case PACKET_TYPE_DISCONNECT: packet = new MqttPacketDisconnect(); break; // no payload
default:
throw new IOException("Unknown header type: " + packet.type);
throw new IOException("Unknown packet type in header: " + packet.type);
}
// Read header and payload
in.read(packet);
// TODO: payload
byte[] payload = new byte[Math.max(0, packet.variableHeaderAndPayloadLength - packet.calculateVariableHeaderLength())];
in.read(payload);
return packet;
}

View file

@ -138,12 +138,9 @@ public class MqttPacketConnect extends MqttPacketHeader {
@Override
public int calculatePayloadLength() {
int length = 0;
// Each String and byte[] is prefixed with a 2 byte length value in the payload
if (!flagCleanSession)
length += 2 + clientIdentifier.length();
int length = 2 + (clientIdentifier != null ? clientIdentifier.length() : 0);
if (flagWillFlag)
length += 2 + willTopic.length() + 2 + willPayload.length;
if (flagUsername)
@ -165,7 +162,7 @@ public class MqttPacketConnect extends MqttPacketHeader {
MqttPacketConnect packet = (MqttPacketConnect) parentObject;
TwoByteLengthPrefixedDataSerializer serializer = new TwoByteLengthPrefixedDataSerializer();
if ("clientIdentifier".equals(field.getName()) && !packet.flagCleanSession ||
if ("clientIdentifier".equals(field.getName()) ||
"willTopic".equals(field.getName()) && packet.flagWillFlag ||
"willPayload".equals(field.getName()) && packet.flagWillFlag ||
"username".equals(field.getName()) && packet.flagUsername ||
@ -181,7 +178,7 @@ public class MqttPacketConnect extends MqttPacketHeader {
MqttPacketConnect packet = (MqttPacketConnect) parentObject;
TwoByteLengthPrefixedDataSerializer serializer = new TwoByteLengthPrefixedDataSerializer();
if ("clientIdentifier".equals(field.getName()) && !packet.flagCleanSession ||
if ("clientIdentifier".equals(field.getName()) ||
"willTopic".equals(field.getName()) && packet.flagWillFlag ||
"willPayload".equals(field.getName()) && packet.flagWillFlag ||
"username".equals(field.getName()) && packet.flagUsername ||

View file

@ -26,6 +26,7 @@ package zutil.net.mqtt.packet;
import zutil.ByteUtil;
import zutil.converter.Converter;
import zutil.parser.binary.BinaryFieldData;
import zutil.parser.binary.BinaryFieldSerializer;
import zutil.parser.binary.serializer.TwoByteLengthPrefixedDataSerializer;
@ -64,13 +65,16 @@ public class MqttPacketPublish extends MqttPacketHeader {
@CustomBinaryField(index = 2002, serializer = TwoByteLengthPrefixedDataSerializer.class)
public String topicName;
@BinaryField(index = 2003, length = 16)
/**
* A unique identity of this packet. Only available if QOS is above 0.
*/
@CustomBinaryField(index = 2003, serializer = MqttPacketPublishPacketIdSerializer.class)
public int packetId;
@Override
public int calculateVariableHeaderLength() {
return 4 + (topicName != null ? topicName.length() : 0);
return 2 + (topicName != null ? topicName.length() : 0) + (getFlagQoS() > 0 ? 2 : 0);
}
// ------------------------------------------
@ -114,7 +118,8 @@ public class MqttPacketPublish extends MqttPacketHeader {
@Override
public byte[] read(InputStream in, BinaryFieldData field, Object parentObject) throws IOException {
MqttPacketPublish publishPacket = (MqttPacketPublish) parentObject;
int payloadLength = Math.max(0, publishPacket.variableHeaderAndPayloadLength - publishPacket.calculateVariableHeaderLength());
int variableLength = publishPacket.calculateVariableHeaderLength();
int payloadLength = Math.max(0, publishPacket.variableHeaderAndPayloadLength - variableLength);
byte[] payload = new byte[payloadLength];
in.read(payload);
@ -127,4 +132,37 @@ public class MqttPacketPublish extends MqttPacketHeader {
out.write(obj);
}
}
/**
* Only read and write Packet Identifier if QOS is above 0
*/
private static class MqttPacketPublishPacketIdSerializer implements BinaryFieldSerializer {
@Override
public Object read(InputStream in, BinaryFieldData field, Object parentObject) throws IOException {
MqttPacketPublish publish = (MqttPacketPublish) parentObject;
if (0 < publish.getFlagQoS()) {
byte[] b = new byte[2];
in.read(b);
return Converter.toInt(b);
}
return 0;
}
@Override
public Object read(InputStream in, BinaryFieldData field) throws IOException {return null;}
@Override
public void write(OutputStream out, Object obj, BinaryFieldData field, Object parentObject) throws IOException {
MqttPacketPublish publish = (MqttPacketPublish) parentObject;
if (0 < publish.getFlagQoS()) {
byte[] b = Converter.toBytes((int) obj);
out.write(b[1]);
out.write(b[0]);
}
}
@Override
public void write(OutputStream out, Object obj, BinaryFieldData field) throws IOException {}
}
}

View file

@ -14,7 +14,7 @@ import java.util.List;
* This class needs to be extended by a more specific subclass that can provide
* the list class type and handling the flow control of the read action.
*
* @param <T>
* @param <T> defines the class type of the items in the List.
*/
public abstract class BinaryStructListSerializer<T extends BinaryStruct> implements BinaryFieldSerializer<List<T>> {

View file

@ -54,12 +54,12 @@ public class MqttPacketConnectTest {
0b0000_0100, // Prot. Level
0b0101_1010, // Flags
0b0001_1010, // Flags
0b0000_0000, // Keep alive
0b0000_1010, // Keep alive
// Payload
0x00, 0x01, '5', // password
0x00, 0x01, '1', // Client Identifier
};
MqttPacketConnect obj = (MqttPacketConnect) MqttPacket.read(
@ -70,17 +70,17 @@ public class MqttPacketConnectTest {
assertEquals(10, obj.keepAlive);
assertFalse(obj.flagUsername);
assertTrue(obj.flagPassword);
assertFalse(obj.flagPassword);
assertFalse(obj.flagWillRetain);
assertEquals(3, obj.flagWillQoS);
assertFalse(obj.flagWillFlag);
assertTrue(obj.flagCleanSession);
assertNull(obj.clientIdentifier);
assertEquals("1", obj.clientIdentifier);
assertNull(obj.willTopic);
assertNull(null, obj.willPayload);
assertNull(obj.username);
assertEquals("5", obj.password);
assertNull(obj.password);
}
@Test
@ -88,7 +88,7 @@ public class MqttPacketConnectTest {
char[] data = new char[]{
// Fixed Header
0b0001_0000, // Packet Type + Reserved
0b0000_1010, // Variable Header + Payload Length
0xFF & 25, // Variable Header + Payload Length
// Variable Header
0b0000_0000, // length
0b0000_0100, // length
@ -137,7 +137,7 @@ public class MqttPacketConnectTest {
char[] data = new char[]{
// Fixed Header
0b0001_0000, // Packet Type + Reserved
0b0000_1010, // Variable Header + Payload Length
0xFF & 13, // Variable Header + Payload Length
// Variable Header
0b0000_0000, // length
0b0000_0100, // length
@ -153,10 +153,10 @@ public class MqttPacketConnectTest {
0b0000_0000, // Keep alive
0b0000_1010, // Keep alive
// Payload
0x00, 0x01, '1', // Client Identifier
};
MqttPacketConnect obj = new MqttPacketConnect();
obj.variableHeaderAndPayloadLength = 10;
obj.keepAlive = 10;
obj.flagUsername = false;
@ -166,6 +166,8 @@ public class MqttPacketConnectTest {
obj.flagWillFlag = false;
obj.flagCleanSession = true;
obj.clientIdentifier = "1";
ByteArrayOutputStream buffer = new ByteArrayOutputStream();
BinaryStructOutputStream binOut = new BinaryStructOutputStream(buffer);
MqttPacket.write(binOut, obj);
@ -201,7 +203,6 @@ public class MqttPacketConnectTest {
};
MqttPacketConnect obj = new MqttPacketConnect();
obj.variableHeaderAndPayloadLength = 10;
obj.keepAlive = 10;
obj.flagUsername = true;

View file

@ -18,23 +18,21 @@ public class MqttPacketPublishTest {
char[] data = new char[]{
// Fixed Header
0b0011_0000, // Packet Type(4) + Reserved(4)
0xFF & 6, // Variable Header + Payload Length
0xFF & 4, // Variable Header + Payload Length
// Variable Header
0b0000_0000, // length
0xFF & 2, // length
'a', // Topic Name
'b', // Topic Name
0b0000_0000, // Packet Identifier
0xFF & 5, // Packet Identifier
// Payload
};
MqttPacketPublish obj = (MqttPacketPublish) MqttPacket.read(
new BinaryStructInputStream(new ByteArrayInputStream(Converter.toBytes(data))));
assertEquals(6, obj.variableHeaderAndPayloadLength);
assertEquals(4, obj.variableHeaderAndPayloadLength);
assertEquals("ab", obj.topicName);
assertEquals(5, obj.packetId);
assertEquals(0, obj.packetId);
assertArrayEquals(new byte[0], obj.payload);
}
@ -43,14 +41,12 @@ public class MqttPacketPublishTest {
char[] data = new char[]{
// Fixed Header
0b0011_0000, // Packet Type(4) + Reserved(4)
0xFF & 9, // Variable Header + Payload Length
0xFF & 7, // Variable Header + Payload Length
// Variable Header
0b0000_0000, // length
0xFF & 2, // length
'a', // Topic Name
'b', // Topic Name
0b0000_0000, // Packet Identifier
0xFF & 5, // Packet Identifier
// Payload
0x00, 0x01, 0x02,
};
@ -58,9 +54,9 @@ public class MqttPacketPublishTest {
MqttPacketPublish obj = (MqttPacketPublish) MqttPacket.read(
new BinaryStructInputStream(new ByteArrayInputStream(Converter.toBytes(data))));
assertEquals(9, obj.variableHeaderAndPayloadLength);
assertEquals(7, obj.variableHeaderAndPayloadLength);
assertEquals("ab", obj.topicName);
assertEquals(5, obj.packetId);
assertEquals(0, obj.packetId);
assertArrayEquals(new byte[]{0x00, 0x01, 0x02}, obj.payload);
}
@ -69,19 +65,16 @@ public class MqttPacketPublishTest {
char[] data = new char[]{
// Fixed Header
0b0011_0000, // Packet Type(4) + Reserved(4)
0xFF & 6, // Variable Header + Payload Length
0xFF & 4, // Variable Header + Payload Length
// Variable Header
0b0000_0000, // length
0xFF & 2, // length
'a', // Topic Name
'b', // Topic Name
0b0000_0000, // Packet Identifier
0xFF & 5, // Packet Identifier
// Payload
};
MqttPacketPublish obj = new MqttPacketPublish();
obj.variableHeaderAndPayloadLength = 5;
obj.topicName = "ab";
obj.packetId = 5;