Fixed failing MqTT TC

This commit is contained in:
Ziver Koc 2017-08-04 16:11:02 +02:00
parent fc1e650002
commit 86ce0990c6
16 changed files with 56 additions and 52 deletions

View file

@ -53,11 +53,11 @@ public class MqttBroker extends ThreadedTCPNetworkServer{
public void run() {
try {
// Setup connection
MqttPacket packet = MqttPacket.read(in);
MqttPacketHeader packet = MqttPacket.read(in);
// Unexpected packet?
if ( ! (packet.header instanceof MqttPacketConnect))
throw new IOException("Expected MqttPacketConnect but received "+packet.header.getClass());
MqttPacketConnect conn = (MqttPacketConnect) packet.header;
if ( ! (packet instanceof MqttPacketConnect))
throw new IOException("Expected MqttPacketConnect but received "+packet.getClass());
MqttPacketConnect conn = (MqttPacketConnect) packet;
// Reply
MqttPacketConnectAck connack = new MqttPacketConnectAck();
@ -78,7 +78,7 @@ public class MqttBroker extends ThreadedTCPNetworkServer{
if (packet == null)
return;
switch (packet.header.type){
switch (packet.type){
// Publish
case MqttPacketHeader.PACKET_TYPE_PUBLISH:
break;
@ -94,7 +94,7 @@ public class MqttBroker extends ThreadedTCPNetworkServer{
break;
// Close connection
default:
logger.warning("Received unknown packet type: "+packet.header.type);
logger.warning("Received unknown packet type: "+packet.type);
case MqttPacketHeader.PACKET_TYPE_DISCONNECT:
return;
}

View file

@ -1,7 +1,6 @@
package zutil.net.mqtt.packet;
import zutil.parser.binary.BinaryStruct;
import zutil.parser.binary.BinaryStructInputStream;
import zutil.parser.binary.BinaryStructOutputStream;
@ -14,40 +13,35 @@ import static zutil.net.mqtt.packet.MqttPacketHeader.*;
*/
public class MqttPacket {
public MqttPacketHeader header;
private MqttPacket() {}
public static MqttPacket read(BinaryStructInputStream in) throws IOException {
MqttPacket packet = new MqttPacket();
public static MqttPacketHeader read(BinaryStructInputStream in) throws IOException {
MqttPacketHeader packet = new MqttPacketHeader();
// Peek into stream and find packet type
in.mark(10);
packet.header = new MqttPacketHeader();
in.read(packet.header);
in.read(packet);
in.reset();
// Resolve the correct header class
switch (packet.header.type){
case PACKET_TYPE_CONN: packet.header = new MqttPacketConnect(); break;
case PACKET_TYPE_CONNACK: packet.header = new MqttPacketConnectAck(); break;
case PACKET_TYPE_PUBLISH: packet.header = new MqttPacketPublish(); break;
case PACKET_TYPE_PUBACK: packet.header = new MqttPacketPublishAck(); break;
case PACKET_TYPE_PUBREC: packet.header = new MqttPacketPublishRec(); break;
case PACKET_TYPE_PUBREL: packet.header = new MqttPacketPublishRec(); break;
case PACKET_TYPE_PUBCOMP: packet.header = new MqttPacketPublishComp(); break;
case PACKET_TYPE_SUBSCRIBE: packet.header = new MqttPacketSubscribe(); break;
case PACKET_TYPE_SUBACK: packet.header = new MqttPacketSubscribeAck(); break;
case PACKET_TYPE_UNSUBSCRIBE: packet.header = new MqttPacketUnsubscribe(); break;
case PACKET_TYPE_UNSUBACK: packet.header = new MqttPacketUnsubscribeAck(); break;
case PACKET_TYPE_PINGREQ: packet.header = new MqttPacketPingReq(); break;
case PACKET_TYPE_PINGRESP: packet.header = new MqttPacketPingResp(); break;
case PACKET_TYPE_DISCONNECT: packet.header = new MqttPacketDisconnect(); break;
switch (packet.type){
case PACKET_TYPE_CONN: packet = new MqttPacketConnect(); break;
case PACKET_TYPE_CONNACK: packet = new MqttPacketConnectAck(); break;
case PACKET_TYPE_PUBLISH: packet = new MqttPacketPublish(); break;
case PACKET_TYPE_PUBACK: packet = new MqttPacketPublishAck(); break;
case PACKET_TYPE_PUBREC: packet = new MqttPacketPublishRec(); break;
case PACKET_TYPE_PUBREL: packet = new MqttPacketPublishRec(); break;
case PACKET_TYPE_PUBCOMP: packet = new MqttPacketPublishComp(); break;
case PACKET_TYPE_SUBSCRIBE: packet = new MqttPacketSubscribe(); break;
case PACKET_TYPE_SUBACK: packet = new MqttPacketSubscribeAck(); break;
case PACKET_TYPE_UNSUBSCRIBE: packet = new MqttPacketUnsubscribe(); break;
case PACKET_TYPE_UNSUBACK: packet = new MqttPacketUnsubscribeAck(); break;
case PACKET_TYPE_PINGREQ: packet = new MqttPacketPingReq(); break;
case PACKET_TYPE_PINGRESP: packet = new MqttPacketPingResp(); break;
case PACKET_TYPE_DISCONNECT: packet = new MqttPacketDisconnect(); break;
default:
throw new IOException("Unknown header type: "+ packet.header.type);
throw new IOException("Unknown header type: "+ packet.type);
}
in.read(packet.header);
in.read(packet);
// TODO: payload
return packet;

View file

@ -29,4 +29,5 @@ public class MqttPacketConnectAck extends MqttPacketHeader{
public int returnCode;
// No payload
}

View file

@ -1,6 +1,5 @@
package zutil.net.mqtt.packet;
import zutil.parser.binary.BinaryStruct;
/**
* The DISCONNECT Packet is the final Control Packet sent from the Client to
@ -13,4 +12,5 @@ public class MqttPacketDisconnect extends MqttPacketHeader{
// No variable header
// No payload
}

View file

@ -12,4 +12,5 @@ public class MqttPacketPingReq extends MqttPacketHeader{
// No variable header
// No payload
}

View file

@ -1,6 +1,5 @@
package zutil.net.mqtt.packet;
import zutil.parser.binary.BinaryStruct;
/**
* A PINGRESP Packet is sent by the Server to the Client in response to a
@ -13,4 +12,6 @@ public class MqttPacketPingResp extends MqttPacketHeader{
// No variable header
// No payload
}

View file

@ -1,13 +1,12 @@
package zutil.net.mqtt.packet;
import zutil.parser.binary.BinaryStruct;
/**
* A PUBLISH Control Packet is sent from a Client to a Server
*
* @see <a href="http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/mqtt-v3.1.1.html">MQTT v3.1.1 Spec</a>
*/
public class MqttPacketPublish extends MqttPacketHeader{
public class MqttPacketPublish extends MqttPacketHeader {
// Static Header
/*
@ -26,7 +25,9 @@ public class MqttPacketPublish extends MqttPacketHeader{
@BinaryField(index = 2101, length = 16)
private int topicNameLength;
/** The Topic Name identifies the information channel to which controlHeader data is published. */
/**
* The Topic Name identifies the information channel to which controlHeader data is published.
*/
@VariableLengthBinaryField(index = 2102, lengthField = "topicNameLength")
public String topicName;

View file

@ -1,6 +1,5 @@
package zutil.net.mqtt.packet;
import zutil.parser.binary.BinaryStruct;
/**
* A PUBACK Packet is the response to a PUBLISH Packet with QoS level 1.
@ -15,4 +14,5 @@ public class MqttPacketPublishAck extends MqttPacketHeader{
public int packetId;
// No payload
}

View file

@ -1,6 +1,5 @@
package zutil.net.mqtt.packet;
import zutil.parser.binary.BinaryStruct;
/**
* Publish complete.
@ -17,4 +16,5 @@ public class MqttPacketPublishComp extends MqttPacketHeader{
public int packetId;
// No payload
}

View file

@ -1,6 +1,5 @@
package zutil.net.mqtt.packet;
import zutil.parser.binary.BinaryStruct;
/**
* Publish received.
@ -17,4 +16,5 @@ public class MqttPacketPublishRec extends MqttPacketHeader{
public int packetId;
// No payload
}

View file

@ -1,6 +1,5 @@
package zutil.net.mqtt.packet;
import zutil.parser.binary.BinaryStruct;
/**
* Publish release.
@ -17,4 +16,5 @@ public class MqttPacketPublishRel extends MqttPacketHeader{
public int packetId;
// No Payload
}

View file

@ -22,7 +22,6 @@ public class MqttPacketSubscribe extends MqttPacketHeader{
public static class MqttSubscribePayload implements BinaryStruct{
@BinaryField(index = 3001, length = 16)

View file

@ -23,7 +23,6 @@ public class MqttPacketSubscribeAck extends MqttPacketHeader{
public static class MqttSubscribeAckPayload implements BinaryStruct{
public static final int RETCODE_SUCESS_MAX_QOS_0 = 0;
public static final int RETCODE_SUCESS_MAX_QOS_1 = 1;

View file

@ -22,7 +22,6 @@ public class MqttPacketUnsubscribe extends MqttPacketHeader{
public static class MqttUnsubscribePayload implements BinaryStruct{
@BinaryField(index = 3001, length = 16)

View file

@ -1,8 +1,5 @@
package zutil.net.mqtt.packet;
import zutil.parser.binary.BinaryStruct;
import java.util.List;
/**
* The UNSUBACK Packet is sent by the Server to the Client to confirm receipt
@ -18,4 +15,5 @@ public class MqttPacketUnsubscribeAck extends MqttPacketHeader{
public int packetId;
// No payload
}

View file

@ -6,6 +6,8 @@ import zutil.net.mqtt.packet.MqttPacketConnect;
import zutil.parser.binary.BinaryStructInputStream;
import zutil.parser.binary.BinaryStructOutputStream;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import static org.junit.Assert.*;
@ -14,6 +16,10 @@ import static org.junit.Assert.*;
public class MqttPacketConnectTest {
char[] data = new char[]{
// Fixed Header
0b0001_0000, // Packet Type + Reserved
0b0000_1010, // Variable Header + Payload Length
// Variable Header
0b0000_0000, // length
0b0000_0100, // length
0b0100_1101, // 'M'
@ -31,9 +37,11 @@ public class MqttPacketConnectTest {
@Test
public void decode(){
MqttPacketConnect obj = new MqttPacketConnect();
BinaryStructInputStream.read(obj, Converter.toBytes(data));
public void decode() throws IOException {
MqttPacketConnect obj = (MqttPacketConnect)MqttPacket.read(
new BinaryStructInputStream(
new ByteArrayInputStream(
Converter.toBytes(data))));
assertEquals("MQTT", obj.protocolName);
assertEquals(4, obj.protocolLevel);
@ -50,6 +58,7 @@ public class MqttPacketConnectTest {
@Test
public void encode() throws IOException {
MqttPacketConnect obj = new MqttPacketConnect();
obj.payloadLength = 10;
obj.keepAlive = 10;
obj.flagUsername = true;
@ -59,7 +68,9 @@ public class MqttPacketConnectTest {
obj.flagWillFlag = true;
obj.flagCleanSession = true;
assertArrayEquals(Converter.toBytes(data),
BinaryStructOutputStream.serialize(obj));
ByteArrayOutputStream buffer = new ByteArrayOutputStream();
BinaryStructOutputStream binOut = new BinaryStructOutputStream(buffer);
MqttPacket.write(binOut, obj);
assertArrayEquals(Converter.toBytes(data), buffer.toByteArray());
}
}