Initial MQTT packet structure. issue 73

This commit is contained in:
Ziver Koc 2017-01-19 17:13:13 +01:00
parent 1d34a264b1
commit 4c210fc38d
19 changed files with 541 additions and 4 deletions

View file

@ -0,0 +1,71 @@
package zutil.net.mqtt.packet;
import zutil.parser.binary.BinaryStruct;
/**
* This packet is the first message sent from a Client when it
* has established a connection to a Server. A Client can only
* send the CONNECT Packet once over a Network Connection.
*
* @see <a href="http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/mqtt-v3.1.1.html">MQTT v3.1.1 Spec</a>
*/
public class MqttPacketConnect implements BinaryStruct {
// Variable header
@BinaryField(index = 2001, length = 16)
private int protocolNameLength = 4;
/**
* The Protocol Name is a UTF-8 encoded string that represents
* the protocol name MQTT, capitalized as shown. The string,
* its offset and length will not be changed by future versions
* of the MQTT specification.
*/
@BinaryField(index = 2002, length = 8*4)
public String protocolName = "MQTT";
/**
* The value represents the revision level of the protocol used by
* the Client. The value of the Protocol Level field for the version
* 3.1.1 of the protocol is 4 (0x04)
*/
@BinaryField(index = 2003, length = 8)
public int protocolLevel = 0x04;
/** Indicates that the payload contains a username */
@BinaryField(index = 2010, length = 1)
public boolean flagUsername;
/** Indicates that the payload contains a password */
@BinaryField(index = 2011, length = 1)
public boolean flagPassword;
/** Specifies if the Will Message is to be Retained when it is published. */
@BinaryField(index = 2012, length = 1)
public boolean flagWillRetain;
/** Specifies the QoS level to be used when publishing the Will Message. */
@BinaryField(index = 2013, length = 2)
public int flagWillQoS;
@BinaryField(index = 2014, length = 1)
public boolean flagWillFlag;
@BinaryField(index = 2015, length = 1)
/** This bit specifies the handling of the Session state. */
public boolean flagCleanSession;
@BinaryField(index = 2016, length = 1)
private boolean reserved;
/**
* The Keep Alive is a time interval measured in seconds and
* it is the maximum time interval that is permitted to elapse
* between the point at which the Client finishes transmitting one
* Control Packet and the point it starts sending the next.
* A Keep Alive value of zero (0) has the effect of turning off
* the keep alive mechanism.
*/
@BinaryField(index = 2020, length = 16)
public int keepAlive;
// Payload
}

View file

@ -0,0 +1,29 @@
package zutil.net.mqtt.packet;
import zutil.parser.binary.BinaryStruct;
/**
* The CONNACK Packet is the packet sent by the Server in response to a
* CONNECT Packet received from a Client.
*
* @see <a href="http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/mqtt-v3.1.1.html">MQTT v3.1.1 Spec</a>
*/
public class MqttPacketConnectAck implements BinaryStruct{
public static final int RETCODE_OK = 0;
public static final int RETCODE_PROT_VER_ERROR = 1;
public static final int RETCODE_IDENTIFIER_REJECT = 2;
public static final int RETCODE_SERVER_UNAVAILABLE = 3;
public static final int RETCODE_BADD_USER_OR_PASS = 4;
public static final int RETCODE_NOT_AUTHORIZED = 5;
@BinaryField(index = 2000, length = 7)
private int flagReserved;
/** Indicates that there is a valid Session available */
@BinaryField(index = 2001, length = 1)
public int flagSessionPresent;
@BinaryField(index = 2002, length = 8)
public int returnCode;
}

View file

@ -0,0 +1,14 @@
package zutil.net.mqtt.packet;
import zutil.parser.binary.BinaryStruct;
/**
* The DISCONNECT Packet is the final Control Packet sent from the Client to
* the Server. It indicates that the Client is disconnecting cleanly.
*
* @see <a href="http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/mqtt-v3.1.1.html">MQTT v3.1.1 Spec</a>
*/
public class MqttPacketDisconnect implements BinaryStruct{
}

View file

@ -0,0 +1,36 @@
package zutil.net.mqtt.packet;
import zutil.parser.binary.BinaryStruct;
/**
*
*/
public class MqttPacketHeader implements BinaryStruct {
// RESERVED = 0;
public static final int PACKET_TYPE_CONNACK = 2;
public static final int PACKET_TYPE_PUBLISH = 3;
public static final int PACKET_TYPE_PUBACK = 4;
public static final int PACKET_TYPE_PUBREC = 5;
public static final int PACKET_TYPE_PUBREL = 6;
public static final int PACKET_TYPE_PUBCOMP = 7;
public static final int PACKET_TYPE_SUBSCRIBE = 8;
public static final int PACKET_TYPE_SUBACK = 9;
public static final int PACKET_TYPE_UNSUBSCRIBE = 10;
public static final int PACKET_TYPE_UNSUBACK = 11;
public static final int PACKET_TYPE_PINGREQ = 12;
public static final int PACKET_TYPE_PINGRESP = 13;
public static final int PACKET_TYPE_DISCONNECT = 14;
// RESERVED = 15
@BinaryField(index = 1, length = 4)
private byte type;
@BinaryField(index = 2, length = 4)
private byte flags;
@CustomBinaryField(index = 3, serializer = MqttVariableIntSerializer.class)
private int length;
}

View file

@ -0,0 +1,13 @@
package zutil.net.mqtt.packet;
import zutil.parser.binary.BinaryStruct;
/**
* The PINGREQ Packet is used in Keep Alive processing
*
* @see <a href="http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/mqtt-v3.1.1.html">MQTT v3.1.1 Spec</a>
*/
public class MqttPacketPingReq implements BinaryStruct{
}

View file

@ -0,0 +1,14 @@
package zutil.net.mqtt.packet;
import zutil.parser.binary.BinaryStruct;
/**
* A PINGRESP Packet is sent by the Server to the Client in response to a
* PINGREQ Packet. It indicates that the Server is alive.
*
* @see <a href="http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/mqtt-v3.1.1.html">MQTT v3.1.1 Spec</a>
*/
public class MqttPacketPingResp implements BinaryStruct{
}

View file

@ -0,0 +1,39 @@
package zutil.net.mqtt.packet;
import zutil.parser.binary.BinaryStruct;
/**
* A PUBLISH Control Packet is sent from a Client to a Server
*
* @see <a href="http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/mqtt-v3.1.1.html">MQTT v3.1.1 Spec</a>
*/
public class MqttPacketPublish implements BinaryStruct{
// Static Header
/*
@BinaryField(index = 2000, length = 1)
private int flagDup;
@BinaryField(index = 2001, length = 2)
private int flagQoS;
@BinaryField(index = 2002, length = 1)
private int flagRetain;
@CustomBinaryField(index = 3, serializer = MqttVariableIntSerializer.class)
private int length;
*/
// Variable Header
@BinaryField(index = 2101, length = 16)
private int topicNameLength;
/** The Topic Name identifies the information channel to which payload data is published. */
@VariableLengthBinaryField(index = 2102, lengthField = "topicNameLength")
public String topicName;
@BinaryField(index = 2102, length = 16)
public int packetId;
// Optional Payload
}

View file

@ -0,0 +1,18 @@
package zutil.net.mqtt.packet;
import zutil.parser.binary.BinaryStruct;
/**
* A PUBACK Packet is the response to a PUBLISH Packet with QoS level 1.
*
* @see <a href="http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/mqtt-v3.1.1.html">MQTT v3.1.1 Spec</a>
*/
public class MqttPacketPublishAck implements BinaryStruct{
// Variable Header
@BinaryField(index = 2000, length = 16)
public int packetId;
}

View file

@ -0,0 +1,20 @@
package zutil.net.mqtt.packet;
import zutil.parser.binary.BinaryStruct;
/**
* Publish complete.
* The PUBCOMP Packet is the response to a PUBREL Packet. It is the fourth and
* final packet of the QoS 2 protocol exchange.
*
* @see <a href="http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/mqtt-v3.1.1.html">MQTT v3.1.1 Spec</a>
*/
public class MqttPacketPublishComp implements BinaryStruct{
// Variable Header
@BinaryField(index = 2000, length = 16)
public int packetId;
}

View file

@ -0,0 +1,20 @@
package zutil.net.mqtt.packet;
import zutil.parser.binary.BinaryStruct;
/**
* Publish received.
* A PUBREC Packet is the response to a PUBLISH Packet with QoS 2. It is the
* second packet of the QoS 2 protocol exchange.
*
* @see <a href="http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/mqtt-v3.1.1.html">MQTT v3.1.1 Spec</a>
*/
public class MqttPacketPublishRec implements BinaryStruct{
// Variable Header
@BinaryField(index = 2000, length = 16)
public int packetId;
}

View file

@ -0,0 +1,20 @@
package zutil.net.mqtt.packet;
import zutil.parser.binary.BinaryStruct;
/**
* Publish release.
* A PUBREL Packet is the response to a PUBREC Packet. It is the third packet
* of the QoS 2 protocol exchange.
*
* @see <a href="http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/mqtt-v3.1.1.html">MQTT v3.1.1 Spec</a>
*/
public class MqttPacketPublishRel implements BinaryStruct{
// Variable Header
@BinaryField(index = 2000, length = 16)
public int packetId;
}

View file

@ -0,0 +1,39 @@
package zutil.net.mqtt.packet;
import zutil.parser.binary.BinaryStruct;
import java.util.List;
/**
* The SUBSCRIBE Packet is sent from the Client to the Server to create one or more Subscriptions.
*
* @see <a href="http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/mqtt-v3.1.1.html">MQTT v3.1.1 Spec</a>
*/
public class MqttPacketSubscribe implements BinaryStruct{
// Variable Header
@BinaryField(index = 2000, length = 16)
public int packetId;
// Payload
public List<MqttSubscribePayload> payload;
public static class MqttSubscribePayload implements BinaryStruct{
@BinaryField(index = 3001, length = 16)
private int topicFilterLength;
/** A filter indicating the Topic to which the Client wants to subscribe to*/
@VariableLengthBinaryField(index = 3002, lengthField = "topicFilterLength")
public String topicFilter;
@BinaryField(index = 3003, length = 6)
private int reserved;
@BinaryField(index = 3004, length = 2)
private int qos;
}
}

View file

@ -0,0 +1,36 @@
package zutil.net.mqtt.packet;
import zutil.parser.binary.BinaryStruct;
import java.util.List;
/**
* A SUBACK Packet is sent by the Server to the Client to confirm receipt
* and processing of a SUBSCRIBE Packet.
*
* @see <a href="http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/mqtt-v3.1.1.html">MQTT v3.1.1 Spec</a>
*/
public class MqttPacketSubscribeAck implements BinaryStruct{
// Variable Header
@BinaryField(index = 2000, length = 16)
public int packetId;
// Payload
public List<MqttSubscribeAckPayload> payload;
public static class MqttSubscribeAckPayload implements BinaryStruct{
public static final int RETCODE_SUCESS_MAX_QOS_0 = 0;
public static final int RETCODE_SUCESS_MAX_QOS_1 = 1;
public static final int RETCODE_SUCESS_MAX_QOS_2 = 2;
public static final int RETCODE_FAILURE = 0x80;
@BinaryField(index = 3001, length = 8)
public int returnCode;
}
}

View file

@ -0,0 +1,35 @@
package zutil.net.mqtt.packet;
import zutil.parser.binary.BinaryStruct;
import java.util.List;
/**
* An UNSUBSCRIBE Packet is sent by the Client to the Server, to unsubscribe from topics.
*
* @see <a href="http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/mqtt-v3.1.1.html">MQTT v3.1.1 Spec</a>
*/
public class MqttPacketUnsubscribe implements BinaryStruct{
// Variable Header
@BinaryField(index = 2000, length = 16)
public int packetId;
// Payload
public List<MqttUnsubscribePayload> payload;
public static class MqttUnsubscribePayload implements BinaryStruct{
@BinaryField(index = 3001, length = 16)
private int topicFilterLength;
/** A filter indicating the Topic to which the Client wants to subscribe to*/
@VariableLengthBinaryField(index = 3002, lengthField = "topicFilterLength")
public String topicFilter;
}
}

View file

@ -0,0 +1,20 @@
package zutil.net.mqtt.packet;
import zutil.parser.binary.BinaryStruct;
import java.util.List;
/**
* The UNSUBACK Packet is sent by the Server to the Client to confirm receipt
* of an UNSUBSCRIBE Packet.
*
* @see <a href="http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/mqtt-v3.1.1.html">MQTT v3.1.1 Spec</a>
*/
public class MqttPacketUnsubscribeAck implements BinaryStruct{
// Variable Header
@BinaryField(index = 2000, length = 16)
public int packetId;
}

View file

@ -0,0 +1,43 @@
package zutil.net.mqtt.packet;
import zutil.parser.binary.BinaryFieldData;
import zutil.parser.binary.BinaryFieldSerializer;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
/**
* Code from MQTT specification
*/
public class MqttVariableIntSerializer implements BinaryFieldSerializer<Integer>{
@Override
public Integer read(InputStream in, BinaryFieldData field) throws IOException {
int multiplier = 1;
int value = 0;
int encodedByte;
do {
encodedByte = in.read();
value += (encodedByte & 127) * multiplier;
if (multiplier > 128 * 128 * 128)
throw new IOException("Malformed Remaining Length");
multiplier *= 128;
} while ((encodedByte & 128) != 0);
return value;
}
@Override
public void write(OutputStream out, Integer obj, BinaryFieldData field) throws IOException {
int x = obj;
int encodedByte;
do {
encodedByte = x % 128;
x = x / 128;
// if there are more data to encode, set the top bit of this byte
if (x > 0)
encodedByte = encodedByte & 128;
out.write(encodedByte);
} while ( x > 0 );
}
}

View file

@ -101,7 +101,7 @@ public class BinaryFieldData {
else if (field.getType() == String.class)
field.set(obj, new String(ByteUtil.getReverseByteOrder(data), StandardCharsets.ISO_8859_1));
else
throw new UnsupportedOperationException("Unsupported BinaryStruct field class: "+ field.getClass());
throw new UnsupportedOperationException("Unsupported BinaryStruct field class: "+ field.getType());
} catch (IllegalAccessException e){
e.printStackTrace();
}

View file

@ -30,6 +30,9 @@ import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* A interface that indicate that the implementing class can
* be serialized into a linear binary stream.
*
* Created by Ziver on 2016-01-28.
*/
public interface BinaryStruct {
@ -47,7 +50,9 @@ public interface BinaryStruct {
}
/**
* Can be used for fields that are of variable length.
* Can be used for fields that are of variable length. This interface
* is only applicable for reading from a stream.
* TODO: Length must be manually set when writing.
*/
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
@ -56,8 +61,8 @@ public interface BinaryStruct {
int index();
/** The name of the field that will contain the length of the data to read. */
String lengthField();
/** Defines the multiplier used on the lengthField to convert to length in bits which is used internally.
* Default value is 8. */
/** Defines the multiplier used on the lengthField parameter to convert the length in bits to
* a user defined value. Default value is 8 (which converts length to nr of bytes). */
int multiplier() default 8;
}

View file

@ -0,0 +1,65 @@
package zutil.net.mqtt.packet;
import org.junit.Test;
import zutil.converter.Converter;
import zutil.net.mqtt.packet.MqttPacketConnect;
import zutil.parser.binary.BinaryStructInputStream;
import zutil.parser.binary.BinaryStructOutputStream;
import java.io.IOException;
import static org.junit.Assert.*;
public class MqttPacketConnectTest {
char[] data = new char[]{
0b0000_0000, // length
0b0000_0100, // length
0b0100_1101, // 'M'
0b0101_0001, // 'Q'
0b0101_0100, // 'T'
0b0101_0100, // 'T'
0b0000_0100, // Prot. Level
0b1100_1110, // Flags
0b0000_0000, // Keep alive
0b0000_1010, // Keep alive
};
@Test
public void decode(){
MqttPacketConnect obj = new MqttPacketConnect();
BinaryStructInputStream.read(obj, Converter.toBytes(data));
assertEquals("MQTT", obj.protocolName);
assertEquals(4, obj.protocolLevel);
assertEquals(10, obj.keepAlive);
assertTrue(obj.flagUsername);
assertTrue(obj.flagPassword);
assertFalse(obj.flagWillRetain);
assertEquals(1, obj.flagWillQoS);
assertTrue(obj.flagWillFlag);
assertTrue(obj.flagCleanSession);
}
@Test
public void encode() throws IOException {
MqttPacketConnect obj = new MqttPacketConnect();
obj.keepAlive = 10;
obj.flagUsername = true;
obj.flagPassword = true;
obj.flagWillRetain = false;
obj.flagWillQoS = 1;
obj.flagWillFlag = true;
obj.flagCleanSession = true;
assertArrayEquals(Converter.toBytes(data),
BinaryStructOutputStream.serialize(obj));
}
}