Implemented payload handling for MQTT messages and introduced tests for these

This commit is contained in:
Ziver Koc 2023-03-18 04:10:08 +01:00
parent fcbb2ef227
commit d80b6a69e5
29 changed files with 1118 additions and 145 deletions

View file

@ -28,6 +28,14 @@ public class PositionalInputStream extends FilterInputStream {
return pos;
}
/**
* Method will reset the position to 0.
*/
public synchronized void resetPosition() {
pos = 0;
mark = 0;
}
@Override
public int read() throws IOException {
@ -72,11 +80,10 @@ public class PositionalInputStream extends FilterInputStream {
public synchronized void reset() throws IOException {
super.reset();
synchronized(this) {
// Only update the position if mark is supported,
// as reset succeeds even if mark is not supported.
if (markSupported())
pos = mark;
}
// Only update the position if mark is supported,
// as reset succeeds even if mark is not supported.
if (markSupported())
pos = mark;
}
}

View file

@ -34,7 +34,9 @@ import zutil.net.threaded.ThreadedTCPNetworkServerThread;
import zutil.parser.binary.BinaryStructInputStream;
import zutil.parser.binary.BinaryStructOutputStream;
import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.Socket;
import java.util.*;
import java.util.logging.Level;
@ -55,10 +57,16 @@ public class MqttBroker extends ThreadedTCPNetworkServer {
private MqttSubscriptionListener globalListener;
private Map<String, List<MqttSubscriptionListener>> subscriptionListeners = new HashMap<>();
public MqttBroker() throws IOException {
super(MQTT_PORT);
}
public MqttBroker(int port) throws IOException {
super(port);
}
@Override
protected ThreadedTCPNetworkServerThread getThreadInstance(Socket s) throws IOException {
return new MqttConnectionThread(this, s);
@ -108,19 +116,16 @@ public class MqttBroker extends ThreadedTCPNetworkServer {
* Publish data to the specific topic
*/
public void publish(String topic, byte[] data) {
if (!subscriptionListeners.containsKey(topic)) {
logger.fine("Data was published to topic (" + topic + ") with no subscribers.");
return;
}
logger.finer("Data has been published to topic (" + topic + ")");
List<MqttSubscriptionListener> topicSubscriptions = subscriptionListeners.get(topic);
if (globalListener != null)
globalListener.dataPublished(topic, data);
for (MqttSubscriptionListener subscriber : topicSubscriptions) {
subscriber.dataPublished(topic, data);
List<MqttSubscriptionListener> topicSubscriptions = subscriptionListeners.get(topic);
if (topicSubscriptions != null) {
for (MqttSubscriptionListener subscriber : topicSubscriptions) {
subscriber.dataPublished(topic, data);
}
}
}
@ -166,6 +171,10 @@ public class MqttBroker extends ThreadedTCPNetworkServer {
private BinaryStructOutputStream out;
private boolean disconnected = false;
/** A message that should be sent in case the connection to client is abnormally disconnected */
private MqttPacketHeader willPacket = null;
/** The maximum amount of time(seconds) to wait for activity from client, 0 means no timeout */
private int connectionTimeoutTime = 0;
/**
* Test constructor
@ -177,7 +186,12 @@ public class MqttBroker extends ThreadedTCPNetworkServer {
public MqttConnectionThread(MqttBroker b, Socket s) throws IOException {
this(b);
socket = s;
in = new BinaryStructInputStream(socket.getInputStream());
InputStream baseInputstream = socket.getInputStream();
if (!baseInputstream.markSupported())
baseInputstream = new BufferedInputStream(baseInputstream);
in = new BinaryStructInputStream(baseInputstream);
out = new BinaryStructOutputStream(socket.getOutputStream());
}
@ -203,6 +217,8 @@ public class MqttBroker extends ThreadedTCPNetworkServer {
logger.log(Level.SEVERE, null, e);
} finally {
try {
sendWillPacket();
socket.close();
broker.unsubscribe(this);
} catch (IOException e) {
@ -220,6 +236,10 @@ public class MqttBroker extends ThreadedTCPNetworkServer {
// Reply
MqttPacketConnectAck connectAck = new MqttPacketConnectAck();
// ----------------------------------
// Handling Header
// ----------------------------------
// Incorrect protocol version?
if (conn.protocolLevel != MQTT_PROTOCOL_VERSION) {
connectAck.returnCode = MqttPacketConnectAck.RETCODE_PROT_VER_ERROR;
@ -229,14 +249,53 @@ public class MqttBroker extends ThreadedTCPNetworkServer {
connectAck.returnCode = MqttPacketConnectAck.RETCODE_OK;
}
// Is reserved field properly set? should be false
if (conn.flagReserved) {
disconnected = true;
return;
}
// Handle Session
if (conn.flagCleanSession) {
// TODO: Remove session
connectAck.flagSessionPresent = false;
} else {
// TODO: Restore or create new session
throw new UnsupportedOperationException("Sessions currently not supported.");
}
// Handle will message
if (conn.flagWillFlag) {
// TODO: Read will message from payload
//willPacket = xxx
//throw new UnsupportedOperationException("Will packet currently not supported.");
} else {
willPacket = null;
}
// TODO: authenticate
// TODO: clean session
if (conn.flagUsername) {
String username;
if (conn.flagPassword) {
String password;
}
}
connectionTimeoutTime = conn.keepAlive;
// ----------------------------------
// Handling Payload
// ----------------------------------
sendPacket(connectAck);
}
protected void handlePacket(MqttPacketHeader packet) throws IOException {
// TODO: QOS 1
// TODO: QOS 2
// TODO: handle connection timeout
switch (packet.type) {
case MqttPacketHeader.PACKET_TYPE_PUBLISH:
@ -259,7 +318,10 @@ public class MqttBroker extends ThreadedTCPNetworkServer {
// Close connection
default:
logger.warning("Received unknown packet type: " + packet.type);
sendWillPacket();
/* FALLTHROUGH */
case MqttPacketHeader.PACKET_TYPE_DISCONNECT:
willPacket = null;
disconnected = true;
break;
}
@ -277,20 +339,20 @@ public class MqttBroker extends ThreadedTCPNetworkServer {
MqttPacketSubscribeAck subscribeAckPacket = new MqttPacketSubscribeAck();
subscribeAckPacket.packetId = subscribePacket.packetId;
for (MqttSubscribePayload payload : subscribePacket.payload) {
for (MqttSubscribePayload payload : subscribePacket.payloads) {
broker.subscribe(payload.topicFilter, this);
// Prepare response
MqttSubscribeAckPayload ackPayload = new MqttSubscribeAckPayload();
ackPayload.returnCode = MqttSubscribeAckPayload.RETCODE_SUCESS_MAX_QOS_0;
subscribeAckPacket.payload.add(ackPayload);
subscribeAckPacket.payloads.add(ackPayload);
}
sendPacket(subscribeAckPacket);
}
private void handleUnsubscribe(MqttPacketUnsubscribe unsubscribePacket) throws IOException {
for (MqttUnsubscribePayload payload : unsubscribePacket.payload) {
for (MqttUnsubscribePayload payload : unsubscribePacket.payloads) {
broker.unsubscribe(payload.topicFilter, this);
}
@ -306,13 +368,20 @@ public class MqttBroker extends ThreadedTCPNetworkServer {
// Data has been published to a subscribed topic.
}
private void sendWillPacket() throws IOException {
if (willPacket != null) {
sendPacket(willPacket);
willPacket = null;
}
}
public synchronized void sendPacket(MqttPacketHeader packet) throws IOException {
MqttPacket.write(out, packet);
}
public boolean isDisconnected() {
return disconnected;
}
}
}

View file

@ -49,31 +49,32 @@ public class MqttPacket {
// Resolve the correct header class
switch (packet.type) {
case PACKET_TYPE_CONN: packet = new MqttPacketConnect(); break;
case PACKET_TYPE_CONNACK: packet = new MqttPacketConnectAck(); break;
case PACKET_TYPE_CONNACK: packet = new MqttPacketConnectAck(); break; // no payload
case PACKET_TYPE_PUBLISH: packet = new MqttPacketPublish(); break;
case PACKET_TYPE_PUBACK: packet = new MqttPacketPublishAck(); break;
case PACKET_TYPE_PUBREC: packet = new MqttPacketPublishRec(); break;
case PACKET_TYPE_PUBREL: packet = new MqttPacketPublishRec(); break;
case PACKET_TYPE_PUBCOMP: packet = new MqttPacketPublishComp(); break;
case PACKET_TYPE_PUBACK: packet = new MqttPacketPublishAck(); break; // no payload
case PACKET_TYPE_PUBREC: /* FALLTHROUGH */
case PACKET_TYPE_PUBREL: packet = new MqttPacketPublishRec(); break; // no payload
case PACKET_TYPE_PUBCOMP: packet = new MqttPacketPublishComp(); break; // no payload
case PACKET_TYPE_SUBSCRIBE: packet = new MqttPacketSubscribe(); break;
case PACKET_TYPE_SUBACK: packet = new MqttPacketSubscribeAck(); break;
case PACKET_TYPE_UNSUBSCRIBE: packet = new MqttPacketUnsubscribe(); break;
case PACKET_TYPE_UNSUBACK: packet = new MqttPacketUnsubscribeAck(); break;
case PACKET_TYPE_PINGREQ: packet = new MqttPacketPingReq(); break;
case PACKET_TYPE_PINGRESP: packet = new MqttPacketPingResp(); break;
case PACKET_TYPE_DISCONNECT: packet = new MqttPacketDisconnect(); break;
case PACKET_TYPE_UNSUBACK: packet = new MqttPacketUnsubscribeAck(); break; // no payload
case PACKET_TYPE_PINGREQ: packet = new MqttPacketPingReq(); break; // no payload
case PACKET_TYPE_PINGRESP: packet = new MqttPacketPingResp(); break; // no payload
case PACKET_TYPE_DISCONNECT: packet = new MqttPacketDisconnect(); break; // no payload
default:
throw new IOException("Unknown header type: " + packet.type);
}
in.read(packet);
// TODO: payload
byte[] payload = new byte[Math.max(0, packet.variableHeaderAndPayloadLength - packet.calculateVariableHeaderLength())];
in.read(payload);
return packet;
}
public static void write(BinaryStructOutputStream out, MqttPacketHeader header) throws IOException{
header.variableHeaderAndPayloadLength = header.calculateVariableHeaderLength() + header.calculatePayloadLength();
out.write(header);
// TODO: payload
}
}

View file

@ -24,6 +24,14 @@
package zutil.net.mqtt.packet;
import zutil.parser.binary.BinaryFieldData;
import zutil.parser.binary.BinaryFieldSerializer;
import zutil.parser.binary.serializer.TwoByteLengthPrefixedDataSerializer;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
/**
* This packet is the first message sent from a Client when it
* has established a connection to a Server. A Client can only
@ -39,7 +47,9 @@ public class MqttPacketConnect extends MqttPacketHeader {
type = MqttPacketHeader.PACKET_TYPE_CONN;
}
// ------------------------------------------
// Variable header
// ------------------------------------------
@BinaryField(index = 2001, length = 16)
private int protocolNameLength = 4;
@ -86,7 +96,7 @@ public class MqttPacketConnect extends MqttPacketHeader {
public boolean flagCleanSession;
@BinaryField(index = 2016, length = 1)
private boolean reserved;
public boolean flagReserved;
/**
@ -101,11 +111,91 @@ public class MqttPacketConnect extends MqttPacketHeader {
public int keepAlive;
// Payload:
// - Client identifier
// - Will Topic
// - Will message
// - User name
// - Password
@Override
public int calculateVariableHeaderLength() {
return 10;
}
// ------------------------------------------
// Payload
// ------------------------------------------
@CustomBinaryField(index = 3000, serializer = MqttPacketConnectPayloadSerializer.class)
public String clientIdentifier;
@CustomBinaryField(index = 3001, serializer = MqttPacketConnectPayloadSerializer.class)
public String willTopic;
@CustomBinaryField(index = 3002, serializer = MqttPacketConnectPayloadSerializer.class)
public byte[] willPayload;
@CustomBinaryField(index = 3003, serializer = MqttPacketConnectPayloadSerializer.class)
public String username;
@CustomBinaryField(index = 3004, serializer = MqttPacketConnectPayloadSerializer.class)
public String password;
@Override
public int calculatePayloadLength() {
int length = 0;
// Each String and byte[] is prefixed with a 2 byte length value in the payload
if (!flagCleanSession)
length += 2 + clientIdentifier.length();
if (flagWillFlag)
length += 2 + willTopic.length() + 2 + willPayload.length;
if (flagUsername)
length += 2 + username.length();
if (flagPassword)
length += 2 + password.length();
return length;
}
// ------------------------------------------
// Utilities
// ------------------------------------------
protected static class MqttPacketConnectPayloadSerializer implements BinaryFieldSerializer {
@Override
public Object read(InputStream in, BinaryFieldData field, Object parentObject) throws IOException {
MqttPacketConnect packet = (MqttPacketConnect) parentObject;
TwoByteLengthPrefixedDataSerializer serializer = new TwoByteLengthPrefixedDataSerializer();
if ("clientIdentifier".equals(field.getName()) && !packet.flagCleanSession ||
"willTopic".equals(field.getName()) && packet.flagWillFlag ||
"willPayload".equals(field.getName()) && packet.flagWillFlag ||
"username".equals(field.getName()) && packet.flagUsername ||
"password".equals(field.getName()) && packet.flagPassword) {
return serializer.read(in, field, parentObject);
}
return null;
}
@Override
public void write(OutputStream out, Object obj, BinaryFieldData field, Object parentObject) throws IOException {
MqttPacketConnect packet = (MqttPacketConnect) parentObject;
TwoByteLengthPrefixedDataSerializer serializer = new TwoByteLengthPrefixedDataSerializer();
if ("clientIdentifier".equals(field.getName()) && !packet.flagCleanSession ||
"willTopic".equals(field.getName()) && packet.flagWillFlag ||
"willPayload".equals(field.getName()) && packet.flagWillFlag ||
"username".equals(field.getName()) && packet.flagUsername ||
"password".equals(field.getName()) && packet.flagPassword) {
serializer.write(out, obj, field, parentObject);
}
}
@Override
public Object read(InputStream in, BinaryFieldData field) throws IOException {
return null;
}
@Override
public void write(OutputStream out, Object obj, BinaryFieldData field) throws IOException {}
}
}

View file

@ -30,7 +30,7 @@ package zutil.net.mqtt.packet;
*
* @see <a href="http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/mqtt-v3.1.1.html">MQTT v3.1.1 Spec</a>
*/
public class MqttPacketConnectAck extends MqttPacketHeader{
public class MqttPacketConnectAck extends MqttPacketHeader {
public static final int RETCODE_OK = 0;
public static final int RETCODE_PROT_VER_ERROR = 1;
public static final int RETCODE_IDENTIFIER_REJECT = 2;
@ -44,10 +44,12 @@ public class MqttPacketConnectAck extends MqttPacketHeader{
type = MqttPacketHeader.PACKET_TYPE_CONNACK;
}
// ------------------------------------------
// Variable header
// ------------------------------------------
@BinaryField(index = 2000, length = 7)
private int flagReserved;
private int flagReserved = 0;
/** Indicates that there is a valid Session available */
@BinaryField(index = 2001, length = 1)
public boolean flagSessionPresent;
@ -55,6 +57,12 @@ public class MqttPacketConnectAck extends MqttPacketHeader{
@BinaryField(index = 2002, length = 8)
public int returnCode;
@Override
public int calculateVariableHeaderLength() {
return 2;
}
// No payload
}

View file

@ -31,7 +31,7 @@ package zutil.net.mqtt.packet;
*
* @see <a href="http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/mqtt-v3.1.1.html">MQTT v3.1.1 Spec</a>
*/
public class MqttPacketDisconnect extends MqttPacketHeader{
public class MqttPacketDisconnect extends MqttPacketHeader {
// Header

View file

@ -26,6 +26,10 @@ package zutil.net.mqtt.packet;
import zutil.parser.binary.BinaryStruct;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
/**
*
*/
@ -57,6 +61,27 @@ public class MqttPacketHeader implements BinaryStruct {
public byte flags;
@CustomBinaryField(index = 3, serializer = MqttVariableIntSerializer.class)
public int payloadLength;
public int variableHeaderAndPayloadLength;
// ------------------------------------------
// Variable Header
// ------------------------------------------
/**
* @return the calculated length of the variable MQTT header in bytes
*/
public int calculateVariableHeaderLength() {
return 0;
}
// ------------------------------------------
// Payload
// ------------------------------------------
/**
* @return the calculated length of assigned payload in bytes
*/
public int calculatePayloadLength() {
return 0;
}
}

View file

@ -29,7 +29,7 @@ package zutil.net.mqtt.packet;
*
* @see <a href="http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/mqtt-v3.1.1.html">MQTT v3.1.1 Spec</a>
*/
public class MqttPacketPingReq extends MqttPacketHeader{
public class MqttPacketPingReq extends MqttPacketHeader {
// Header

View file

@ -31,7 +31,7 @@ package zutil.net.mqtt.packet;
*
* @see <a href="http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/mqtt-v3.1.1.html">MQTT v3.1.1 Spec</a>
*/
public class MqttPacketPingResp extends MqttPacketHeader{
public class MqttPacketPingResp extends MqttPacketHeader {
// Header

View file

@ -26,6 +26,13 @@ package zutil.net.mqtt.packet;
import zutil.ByteUtil;
import zutil.parser.binary.BinaryFieldData;
import zutil.parser.binary.BinaryFieldSerializer;
import zutil.parser.binary.serializer.TwoByteLengthPrefixedDataSerializer;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
/**
* A PUBLISH Control Packet is sent from a Client to a Server
@ -41,42 +48,83 @@ public class MqttPacketPublish extends MqttPacketHeader {
}
private byte flagDupBitmask = ByteUtil.getBitMask(3, 1);
private byte flagQoSBitmask = ByteUtil.getBitMask(1, 2);
private byte flagRetainBitmask = ByteUtil.getBitMask(0, 1);
private static final byte FLAG_DUP_BITMASK = ByteUtil.getBitMask(3, 1);
private static final byte FLAG_QOS_BITMASK = ByteUtil.getBitMask(1, 2);
private static final byte FLAG_RETAIN_BITMASK = ByteUtil.getBitMask(0, 1);
// ------------------------------------------
// Variable Header
// ------------------------------------------
@BinaryField(index = 2001, length = 16)
private int topicNameLength;
//@BinaryField(index = 2001, length = 16)
//private int topicNameLength;
/**
* The Topic Name identifies the information channel to which controlHeader data is published.
*/
@VariableLengthBinaryField(index = 2102, lengthField = "topicNameLength")
@CustomBinaryField(index = 2002, serializer = TwoByteLengthPrefixedDataSerializer.class)
public String topicName;
@BinaryField(index = 2002, length = 16)
@BinaryField(index = 2003, length = 16)
public int packetId;
@Override
public int calculateVariableHeaderLength() {
return 4 + (topicName != null ? topicName.length() : 0);
}
// ------------------------------------------
// Payload
// ------------------------------------------
// - Application data
@BinaryField(index = 3001, length = 100000)
@CustomBinaryField(index = 3000, serializer = MqttPacketPublishPayloadSerializer.class)
public byte[] payload;
@Override
public int calculatePayloadLength() {
return payload == null ? 0 : payload.length;
}
// ------------------------------------------
// Util methods
// ------------------------------------------
public boolean getFlagDup() {
return (flags & flagDupBitmask) != 0;
return (flags & FLAG_DUP_BITMASK) != 0;
}
public byte getFlagQoS() {
return (byte) ((flags & flagQoSBitmask) >> 1);
return (byte) ((flags & FLAG_QOS_BITMASK) >> 1);
}
public boolean getFlagRetain() {
return (flags & flagRetainBitmask) != 0;
return (flags & FLAG_RETAIN_BITMASK) != 0;
}
private static class MqttPacketPublishPayloadSerializer implements BinaryFieldSerializer<byte[]> {
@Override
public byte[] read(InputStream in, BinaryFieldData field) throws IOException {
return new byte[0];
}
@Override
public byte[] read(InputStream in, BinaryFieldData field, Object parentObject) throws IOException {
MqttPacketPublish publishPacket = (MqttPacketPublish) parentObject;
int payloadLength = Math.max(0, publishPacket.variableHeaderAndPayloadLength - publishPacket.calculateVariableHeaderLength());
byte[] payload = new byte[payloadLength];
in.read(payload);
return payload;
}
@Override
public void write(OutputStream out, byte[] obj, BinaryFieldData field) throws IOException {
if (obj != null)
out.write(obj);
}
}
}

View file

@ -30,7 +30,7 @@ package zutil.net.mqtt.packet;
*
* @see <a href="http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/mqtt-v3.1.1.html">MQTT v3.1.1 Spec</a>
*/
public class MqttPacketPublishAck extends MqttPacketHeader{
public class MqttPacketPublishAck extends MqttPacketHeader {
// Header
@ -38,11 +38,19 @@ public class MqttPacketPublishAck extends MqttPacketHeader{
type = MqttPacketHeader.PACKET_TYPE_PUBACK;
}
// ------------------------------------------
// Variable Header
// ------------------------------------------
@BinaryField(index = 2000, length = 16)
public int packetId;
@Override
public int calculateVariableHeaderLength() {
return 2;
}
// No payload
}

View file

@ -32,7 +32,7 @@ package zutil.net.mqtt.packet;
*
* @see <a href="http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/mqtt-v3.1.1.html">MQTT v3.1.1 Spec</a>
*/
public class MqttPacketPublishComp extends MqttPacketHeader{
public class MqttPacketPublishComp extends MqttPacketHeader {
// Header
@ -40,11 +40,19 @@ public class MqttPacketPublishComp extends MqttPacketHeader{
type = MqttPacketHeader.PACKET_TYPE_PUBCOMP;
}
// ------------------------------------------
// Variable Header
// ------------------------------------------
@BinaryField(index = 2000, length = 16)
public int packetId;
@Override
public int calculateVariableHeaderLength() {
return 2;
}
// No payload
}

View file

@ -32,7 +32,7 @@ package zutil.net.mqtt.packet;
*
* @see <a href="http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/mqtt-v3.1.1.html">MQTT v3.1.1 Spec</a>
*/
public class MqttPacketPublishRec extends MqttPacketHeader{
public class MqttPacketPublishRec extends MqttPacketHeader {
// Header
@ -40,11 +40,19 @@ public class MqttPacketPublishRec extends MqttPacketHeader{
type = MqttPacketHeader.PACKET_TYPE_PUBREC;
}
// ------------------------------------------
// Variable Header
// ------------------------------------------
@BinaryField(index = 2000, length = 16)
public int packetId;
@Override
public int calculateVariableHeaderLength() {
return 2;
}
// No payload
}

View file

@ -40,7 +40,9 @@ public class MqttPacketPublishRel extends MqttPacketHeader{
type = MqttPacketHeader.PACKET_TYPE_PUBREL;
}
// ------------------------------------------
// Variable Header
// ------------------------------------------
@BinaryField(index = 2000, length = 16)
public int packetId;

View file

@ -24,9 +24,11 @@
package zutil.net.mqtt.packet;
import zutil.parser.binary.BinaryStruct;
import zutil.parser.binary.*;
import zutil.parser.binary.serializer.BinaryStructListSerializer;
import zutil.parser.binary.serializer.TwoByteLengthPrefixedDataSerializer;
import java.util.LinkedList;
import java.util.ArrayList;
import java.util.List;
/**
@ -34,7 +36,7 @@ import java.util.List;
*
* @see <a href="http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/mqtt-v3.1.1.html">MQTT v3.1.1 Spec</a>
*/
public class MqttPacketSubscribe extends MqttPacketHeader{
public class MqttPacketSubscribe extends MqttPacketHeader {
// Header
@ -42,28 +44,66 @@ public class MqttPacketSubscribe extends MqttPacketHeader{
type = MqttPacketHeader.PACKET_TYPE_SUBSCRIBE;
}
// ------------------------------------------
// Variable Header
// ------------------------------------------
@BinaryField(index = 2000, length = 16)
public int packetId;
@Override
public int calculateVariableHeaderLength() {
return 2;
}
// ------------------------------------------
// Payload
// ------------------------------------------
public List<MqttSubscribePayload> payload = new LinkedList<>();
@CustomBinaryField(index = 3000, serializer = MqttSubscribePayloadSerializer.class)
public List<MqttSubscribePayload> payloads = new ArrayList<>();
@Override
public int calculatePayloadLength() {
int length = 0;
for (MqttSubscribePayload p : payloads) {
length += p.calculatePayloadLength();
}
return length;
}
public static class MqttSubscribePayload implements BinaryStruct{
@BinaryField(index = 3001, length = 16)
private int topicFilterLength;
/** A filter indicating the Topic to which the Client wants to subscribe to*/
@VariableLengthBinaryField(index = 3002, lengthField = "topicFilterLength")
public static class MqttSubscribePayload implements BinaryStruct {
//@BinaryField(index = 3001, length = 16)
//private int topicFilterLength;
/** A filter indicating the Topic to which the Client wants to subscribe to **/
@CustomBinaryField(index = 3002, serializer = TwoByteLengthPrefixedDataSerializer.class)
public String topicFilter;
@BinaryField(index = 3003, length = 6)
private int reserved;
/** the maximum QoS level at which the Server can send Application Messages to the Client **/
@BinaryField(index = 3004, length = 2)
private int qos;
public int qos;
protected int calculatePayloadLength() {
return 2 + (topicFilter != null ? topicFilter.length() : 0) + 1;
}
}
private static class MqttSubscribePayloadSerializer extends BinaryStructListSerializer<MqttSubscribePayload> {
protected MqttSubscribePayloadSerializer() {
super(MqttSubscribePayload.class);
}
@Override
protected boolean readNext(int objIndex, int bytesRead, BinaryFieldData field, Object parentObject) {
MqttPacketSubscribe packetSubscribe = ((MqttPacketSubscribe) parentObject);
return bytesRead < packetSubscribe.variableHeaderAndPayloadLength - packetSubscribe.calculateVariableHeaderLength();
}
}
}

View file

@ -24,8 +24,12 @@
package zutil.net.mqtt.packet;
import zutil.parser.binary.BinaryFieldData;
import zutil.parser.binary.BinaryStruct;
import zutil.parser.binary.serializer.BinaryStructListSerializer;
import java.io.IOException;
import java.io.OutputStream;
import java.util.LinkedList;
import java.util.List;
@ -35,7 +39,7 @@ import java.util.List;
*
* @see <a href="http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/mqtt-v3.1.1.html">MQTT v3.1.1 Spec</a>
*/
public class MqttPacketSubscribeAck extends MqttPacketHeader{
public class MqttPacketSubscribeAck extends MqttPacketHeader {
// Header
@ -43,16 +47,35 @@ public class MqttPacketSubscribeAck extends MqttPacketHeader{
type = MqttPacketHeader.PACKET_TYPE_SUBACK;
}
// ------------------------------------------
// Variable Header
// ------------------------------------------
@BinaryField(index = 2000, length = 16)
public int packetId;
@Override
public int calculateVariableHeaderLength() {
return 2;
}
// ------------------------------------------
// Payload
// ------------------------------------------
public List<MqttSubscribeAckPayload> payload = new LinkedList<>();
@CustomBinaryField(index = 3000, serializer = MqttSubscribeAckPayloadSerializer.class)
public List<MqttSubscribeAckPayload> payloads = new LinkedList<>();
@Override
public int calculatePayloadLength() {
int length = 0;
for (MqttSubscribeAckPayload p : payloads) {
length += p.calculatePayloadLength();
}
return length;
}
public static class MqttSubscribeAckPayload implements BinaryStruct{
public static final int RETCODE_SUCESS_MAX_QOS_0 = 0;
@ -62,5 +85,24 @@ public class MqttPacketSubscribeAck extends MqttPacketHeader{
@BinaryField(index = 3001, length = 8)
public int returnCode;
protected int calculatePayloadLength() {
return 1;
}
}
private static class MqttSubscribeAckPayloadSerializer extends BinaryStructListSerializer<MqttSubscribeAckPayload> {
protected MqttSubscribeAckPayloadSerializer() {
super(MqttSubscribeAckPayload.class);
}
@Override
protected boolean readNext(int objIndex, int bytesRead, BinaryFieldData field, Object parentObject) {
MqttPacketSubscribeAck packetSubscribe = ((MqttPacketSubscribeAck) parentObject);
return bytesRead < packetSubscribe.variableHeaderAndPayloadLength - packetSubscribe.calculateVariableHeaderLength();
}
}
}

View file

@ -24,7 +24,10 @@
package zutil.net.mqtt.packet;
import zutil.parser.binary.BinaryFieldData;
import zutil.parser.binary.BinaryStruct;
import zutil.parser.binary.serializer.BinaryStructListSerializer;
import zutil.parser.binary.serializer.TwoByteLengthPrefixedDataSerializer;
import java.util.LinkedList;
import java.util.List;
@ -34,7 +37,7 @@ import java.util.List;
*
* @see <a href="http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/mqtt-v3.1.1.html">MQTT v3.1.1 Spec</a>
*/
public class MqttPacketUnsubscribe extends MqttPacketHeader{
public class MqttPacketUnsubscribe extends MqttPacketHeader {
// Header
@ -42,24 +45,59 @@ public class MqttPacketUnsubscribe extends MqttPacketHeader{
type = MqttPacketHeader.PACKET_TYPE_UNSUBSCRIBE;
}
// ------------------------------------------
// Variable Header
// ------------------------------------------
@BinaryField(index = 2000, length = 16)
public int packetId;
@Override
public int calculateVariableHeaderLength() {
return 2;
}
// ------------------------------------------
// Payload
// ------------------------------------------
public List<MqttUnsubscribePayload> payload = new LinkedList<>();
@CustomBinaryField(index = 3000, serializer = MqttSubscribePayloadSerializer.class)
public List<MqttUnsubscribePayload> payloads = new LinkedList<>();
@Override
public int calculatePayloadLength() {
int length = 0;
for (MqttUnsubscribePayload p : payloads) {
length += p.calculatePayloadLength();
}
return length;
}
public static class MqttUnsubscribePayload implements BinaryStruct{
@BinaryField(index = 3001, length = 16)
private int topicFilterLength;
//@BinaryField(index = 3001, length = 16)
//private int topicFilterLength;
/** A filter indicating the Topic to which the Client wants to subscribe to*/
@VariableLengthBinaryField(index = 3002, lengthField = "topicFilterLength")
@CustomBinaryField(index = 3002, serializer = TwoByteLengthPrefixedDataSerializer.class)
public String topicFilter;
protected int calculatePayloadLength() {
return 2 + (topicFilter != null ? topicFilter.length() : 0);
}
}
private static class MqttSubscribePayloadSerializer extends BinaryStructListSerializer<MqttUnsubscribePayload> {
protected MqttSubscribePayloadSerializer() {
super(MqttUnsubscribePayload.class);
}
@Override
protected boolean readNext(int objIndex, int bytesRead, BinaryFieldData field, Object parentObject) {
MqttPacketUnsubscribe packetSubscribe = ((MqttPacketUnsubscribe) parentObject);
return bytesRead < packetSubscribe.variableHeaderAndPayloadLength - packetSubscribe.calculateVariableHeaderLength();
}
}
}

View file

@ -31,7 +31,7 @@ package zutil.net.mqtt.packet;
*
* @see <a href="http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/mqtt-v3.1.1.html">MQTT v3.1.1 Spec</a>
*/
public class MqttPacketUnsubscribeAck extends MqttPacketHeader{
public class MqttPacketUnsubscribeAck extends MqttPacketHeader {
// Header
@ -39,11 +39,19 @@ public class MqttPacketUnsubscribeAck extends MqttPacketHeader{
type = MqttPacketHeader.PACKET_TYPE_UNSUBACK;
}
// ------------------------------------------
// Variable Header
// ------------------------------------------
@BinaryField(index = 2000, length = 16)
public int packetId;
@Override
public int calculateVariableHeaderLength() {
return 2;
}
// No payload
}

View file

@ -34,7 +34,7 @@ import java.io.OutputStream;
/**
* Code from MQTT specification
*/
public class MqttVariableIntSerializer implements BinaryFieldSerializer<Integer>{
public class MqttVariableIntSerializer implements BinaryFieldSerializer<Integer> {
@Override
public Integer read(InputStream in, BinaryFieldData field) throws IOException {

View file

@ -25,13 +25,12 @@
package zutil.parser.binary;
import zutil.ByteUtil;
import zutil.io.PositionalInputStream;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* A stream class that parses a byte stream into binary struct objects.
@ -79,13 +78,13 @@ public class BinaryStructInputStream extends InputStream{
*/
public int read(BinaryStruct struct) throws IOException {
List<BinaryFieldData> structDataList = BinaryFieldData.getStructFieldList(struct.getClass());
PositionalInputStream positionalInputStream = new PositionalInputStream(in);
int totalReadLength = 0;
for (BinaryFieldData field : structDataList) {
if (field.hasSerializer()) {
BinaryFieldSerializer<Object> serializer = field.getSerializer();
BinaryFieldSerializer serializer = field.getSerializer();
Object value = serializer.read(in, field, struct);
Object value = serializer.read(positionalInputStream, field, struct);
field.setValue(struct, value);
} else {
byte[] valueData = new byte[(int) Math.ceil(field.getBitLength(struct) / 8.0)];
@ -95,7 +94,7 @@ public class BinaryStructInputStream extends InputStream{
// Parse value
for (int valueDataIndex=valueData.length-1; valueDataIndex >= 0; --valueDataIndex) {
if (dataBitIndex < 0) { // Read new data?
data = (byte) in.read();
data = (byte) positionalInputStream.read();
dataBitIndex = 7;
}
int subBitLength = Math.min(dataBitIndex + 1, field.getBitLength(struct) - fieldReadLength);
@ -106,11 +105,10 @@ public class BinaryStructInputStream extends InputStream{
// Set value
ByteUtil.shiftLeft(valueData, shiftBy); // shift data so that LSB is at the beginning
field.setByteValue(struct, valueData);
totalReadLength += fieldReadLength;
}
}
return totalReadLength;
return (int) positionalInputStream.getPosition();
}
@Override
@ -123,6 +121,11 @@ public class BinaryStructInputStream extends InputStream{
return in.read(b, off, len);
}
@Override
public int available() throws IOException {
return in.available();
}
/**
* @see InputStream#markSupported()
*/
@ -147,7 +150,6 @@ public class BinaryStructInputStream extends InputStream{
}
protected static int shiftLeftBy(int bitIndex, int bitLength) {
return (8 - ((7-bitIndex) + bitLength) % 8) % 8;
}

View file

@ -0,0 +1,73 @@
package zutil.parser.binary.serializer;
import zutil.parser.binary.*;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;
/**
* This serializer handles a List field that contains the same type of objects.
* This class needs to be extended by a more specific subclass that can provide
* the list class type and handling the flow control of the read action.
*
* @param <T>
*/
public abstract class BinaryStructListSerializer<T extends BinaryStruct> implements BinaryFieldSerializer<List<T>> {
private Class<T> listClass;
protected BinaryStructListSerializer(Class<T> clazz) {
listClass = clazz;
}
@Override
public List<T> read(InputStream in, BinaryFieldData field) throws IOException {
return null;
}
@Override
public List<T> read(InputStream in, BinaryFieldData field, Object parentObject) throws IOException {
BinaryStructInputStream structIn = new BinaryStructInputStream(in);
List<T> list = new ArrayList<>();
try {
int bytesRead = 0;
while (readNext(list.size(), bytesRead, field, parentObject)) {
T obj = listClass.getDeclaredConstructor().newInstance();
bytesRead += structIn.read(obj);
list.add(obj);
}
} catch (IOException e) {
throw e;
} catch (Exception e) {
throw new RuntimeException(e);
}
return list;
}
/**
* Method is used to determine if the next object should be read from the stream.
*
* @param objIndex the number of objects that have been read so far.
* @param bytesRead number of bytes that have been read from the stream so far.
* @param field meta-data about the target field that will be assigned.
* @param parentObject the parent object that owns the field.
* @return true if another object can be read from the stream, false if this is the end of the struct field.
*/
protected abstract boolean readNext(int objIndex, int bytesRead, BinaryFieldData field, Object parentObject);
@Override
public void write(OutputStream out, List<T> list, BinaryFieldData field) throws IOException {
BinaryStructOutputStream structOut = new BinaryStructOutputStream(out);
for (T obj : list) {
structOut.write(obj);
}
}
}

View file

@ -24,6 +24,8 @@
package zutil.parser.binary.serializer;
import zutil.ByteUtil;
import zutil.converter.Converter;
import zutil.parser.binary.BinaryFieldData;
import zutil.parser.binary.BinaryFieldSerializer;
@ -34,7 +36,7 @@ import java.io.StreamCorruptedException;
import java.nio.charset.StandardCharsets;
/**
* Serializer handles data that is prefixed by two byte length.
* Serializer handles data that is prefixed by two byte length. Null objects will be prefixed by two zero bytes indicating length 0.
* <p>
* Currently only these types are supported:
* <ul>
@ -46,15 +48,16 @@ public class TwoByteLengthPrefixedDataSerializer implements BinaryFieldSerialize
@Override
public Object read(InputStream in, BinaryFieldData field) throws IOException {
int b = in.read();
if (b < 0)
int b1, b2;
if ((b1 = in.read()) < 0)
throw new StreamCorruptedException("Stream ended prematurely when reading first length byte.");
int length = (b & 0xFF) << 8;
b = in.read();
if (b < 0)
if ((b2 = in.read()) < 0)
throw new StreamCorruptedException("Stream ended prematurely when reading second length byte.");
length |= b & 0xFF;
int length = Converter.toInt(new byte[]{
(byte) (0XFF & b2),
(byte) (0xFF & b1)
});
byte[] payload = new byte[length];
in.read(payload);
@ -66,14 +69,19 @@ public class TwoByteLengthPrefixedDataSerializer implements BinaryFieldSerialize
@Override
public void write(OutputStream out, Object obj, BinaryFieldData field) throws IOException {
if (obj == null)
if (obj == null) {
out.write(0);
out.write(0);
return;
}
byte[] payload;
if (obj instanceof String)
payload = ((String) obj).getBytes(StandardCharsets.UTF_8);
else
else if (obj instanceof byte[])
payload = (byte[]) obj;
else
throw new UnsupportedOperationException("Class type not supported for serialization: " + obj.getClass().getSimpleName());
int length = payload.length;

View file

@ -66,7 +66,7 @@ public class MqttBrokerTest {
MqttPacketHeader responsePacket = thread.sentPackets.poll();
assertEquals(MqttPacketSubscribeAck.class, responsePacket.getClass());
assertEquals(subscribePacket.packetId, ((MqttPacketSubscribeAck)responsePacket).packetId);
assertEquals(subscribePacket.payload.size(), ((MqttPacketSubscribeAck)responsePacket).payload.size());
assertEquals(subscribePacket.payloads.size(), ((MqttPacketSubscribeAck)responsePacket).payloads.size());
}
@Test
@ -76,10 +76,10 @@ public class MqttBrokerTest {
MqttPacketSubscribe subscribePacket = new MqttPacketSubscribe();
subscribePacket.packetId = (int)(Math.random()*1000);
subscribePacket.payload.add(new MqttSubscribePayload());
subscribePacket.payload.get(0).topicFilter = "topic1";
subscribePacket.payload.add(new MqttSubscribePayload());
subscribePacket.payload.get(1).topicFilter = "topic2";
subscribePacket.payloads.add(new MqttSubscribePayload());
subscribePacket.payloads.get(0).topicFilter = "topic1";
subscribePacket.payloads.add(new MqttSubscribePayload());
subscribePacket.payloads.get(1).topicFilter = "topic2";
thread.handlePacket(subscribePacket);
@ -87,7 +87,7 @@ public class MqttBrokerTest {
MqttPacketHeader responsePacket = thread.sentPackets.poll();
assertEquals(MqttPacketSubscribeAck.class, responsePacket.getClass());
assertEquals(subscribePacket.packetId, ((MqttPacketSubscribeAck)responsePacket).packetId);
assertEquals(subscribePacket.payload.size(), ((MqttPacketSubscribeAck)responsePacket).payload.size());
assertEquals(subscribePacket.payloads.size(), ((MqttPacketSubscribeAck)responsePacket).payloads.size());
// Check broker
assertEquals(1, broker.getSubscriberCount("topic1"));
assertEquals(1, broker.getSubscriberCount("topic2"));
@ -95,9 +95,9 @@ public class MqttBrokerTest {
//************************ Duplicate subscribe packet
subscribePacket.packetId = (int)(Math.random()*1000);
subscribePacket.payload.clear();
subscribePacket.payload.add(new MqttSubscribePayload());
subscribePacket.payload.get(0).topicFilter = "topic1";
subscribePacket.payloads.clear();
subscribePacket.payloads.add(new MqttSubscribePayload());
subscribePacket.payloads.get(0).topicFilter = "topic1";
thread.handlePacket(subscribePacket);
@ -166,8 +166,8 @@ public class MqttBrokerTest {
MqttPacketUnsubscribe unsubscribePacket = new MqttPacketUnsubscribe();
unsubscribePacket.packetId = (int)(Math.random()*1000);
unsubscribePacket.payload.add(new MqttUnsubscribePayload());
unsubscribePacket.payload.get(0).topicFilter = "topic1";
unsubscribePacket.payloads.add(new MqttUnsubscribePayload());
unsubscribePacket.payloads.get(0).topicFilter = "topic1";
thread.handlePacket(unsubscribePacket);
@ -183,10 +183,10 @@ public class MqttBrokerTest {
MqttPacketSubscribe subscribePacket = new MqttPacketSubscribe();
subscribePacket.packetId = (int)(Math.random()*1000);
subscribePacket.payload.add(new MqttSubscribePayload());
subscribePacket.payload.get(0).topicFilter = "topic1";
subscribePacket.payload.add(new MqttSubscribePayload());
subscribePacket.payload.get(1).topicFilter = "topic2";
subscribePacket.payloads.add(new MqttSubscribePayload());
subscribePacket.payloads.get(0).topicFilter = "topic1";
subscribePacket.payloads.add(new MqttSubscribePayload());
subscribePacket.payloads.get(1).topicFilter = "topic2";
thread.handlePacket(subscribePacket);

View file

@ -38,33 +38,81 @@ import static org.junit.Assert.*;
public class MqttPacketConnectTest {
char[] data = new char[]{
// Fixed Header
0b0001_0000, // Packet Type + Reserved
0b0000_1010, // Variable Header + Payload Length
// Variable Header
0b0000_0000, // length
0b0000_0100, // length
0b0100_1101, // 'M'
0b0101_0001, // 'Q'
0b0101_0100, // 'T'
0b0101_0100, // 'T'
0b0000_0100, // Prot. Level
0b1100_1110, // Flags
0b0000_0000, // Keep alive
0b0000_1010, // Keep alive
};
@Test
public void decode() throws IOException {
MqttPacketConnect obj = (MqttPacketConnect)MqttPacket.read(
new BinaryStructInputStream(
new ByteArrayInputStream(
Converter.toBytes(data))));
char[] data = new char[]{
// Fixed Header
0b0001_0000, // Packet Type(4) + Reserved(4)
0b0000_1010, // Variable Header + Payload Length
// Variable Header
0b0000_0000, // length
0b0000_0100, // length
0b0100_1101, // 'M'
0b0101_0001, // 'Q'
0b0101_0100, // 'T'
0b0101_0100, // 'T'
0b0000_0100, // Prot. Level
0b0101_1010, // Flags
0b0000_0000, // Keep alive
0b0000_1010, // Keep alive
// Payload
0x00, 0x01, '5', // password
};
MqttPacketConnect obj = (MqttPacketConnect) MqttPacket.read(
new BinaryStructInputStream(new ByteArrayInputStream(Converter.toBytes(data))));
assertEquals("MQTT", obj.protocolName);
assertEquals(4, obj.protocolLevel);
assertEquals(10, obj.keepAlive);
assertFalse(obj.flagUsername);
assertTrue(obj.flagPassword);
assertFalse(obj.flagWillRetain);
assertEquals(3, obj.flagWillQoS);
assertFalse(obj.flagWillFlag);
assertTrue(obj.flagCleanSession);
assertNull(obj.clientIdentifier);
assertNull(obj.willTopic);
assertNull(null, obj.willPayload);
assertNull(obj.username);
assertEquals("5", obj.password);
}
@Test
public void decodePayload() throws IOException {
char[] data = new char[]{
// Fixed Header
0b0001_0000, // Packet Type + Reserved
0b0000_1010, // Variable Header + Payload Length
// Variable Header
0b0000_0000, // length
0b0000_0100, // length
0b0100_1101, // 'M'
0b0101_0001, // 'Q'
0b0101_0100, // 'T'
0b0101_0100, // 'T'
0b0000_0100, // Prot. Level
0b1100_1100, // Flags
0b0000_0000, // Keep alive
0b0000_1010, // Keep alive
// Payload
0x00, 0x01, '1', // Client Identifier
0x00, 0x01, '2', // Will Topic
0x00, 0x01, 0x03, // Will payload: 3
0x00, 0x01, '4', // Username
0x00, 0x01, '5', // password
};
MqttPacketConnect obj = (MqttPacketConnect) MqttPacket.read(
new BinaryStructInputStream(new ByteArrayInputStream(Converter.toBytes(data))));
assertEquals("MQTT", obj.protocolName);
assertEquals(4, obj.protocolLevel);
@ -75,20 +123,47 @@ public class MqttPacketConnectTest {
assertFalse(obj.flagWillRetain);
assertEquals(1, obj.flagWillQoS);
assertTrue(obj.flagWillFlag);
assertTrue(obj.flagCleanSession);
assertFalse(obj.flagCleanSession);
assertEquals("1", obj.clientIdentifier);
assertEquals("2", obj.willTopic);
assertArrayEquals(new byte[]{3}, obj.willPayload);
assertEquals("4", obj.username);
assertEquals("5", obj.password);
}
@Test
public void encode() throws IOException {
char[] data = new char[]{
// Fixed Header
0b0001_0000, // Packet Type + Reserved
0b0000_1010, // Variable Header + Payload Length
// Variable Header
0b0000_0000, // length
0b0000_0100, // length
0b0100_1101, // 'M'
0b0101_0001, // 'Q'
0b0101_0100, // 'T'
0b0101_0100, // 'T'
0b0000_0100, // Prot. Level
0b0000_1010, // Flags
0b0000_0000, // Keep alive
0b0000_1010, // Keep alive
// Payload
};
MqttPacketConnect obj = new MqttPacketConnect();
obj.payloadLength = 10;
obj.variableHeaderAndPayloadLength = 10;
obj.keepAlive = 10;
obj.flagUsername = true;
obj.flagPassword = true;
obj.flagUsername = false;
obj.flagPassword = false;
obj.flagWillRetain = false;
obj.flagWillQoS = 1;
obj.flagWillFlag = true;
obj.flagWillFlag = false;
obj.flagCleanSession = true;
ByteArrayOutputStream buffer = new ByteArrayOutputStream();
@ -96,4 +171,55 @@ public class MqttPacketConnectTest {
MqttPacket.write(binOut, obj);
assertArrayEquals(Converter.toBytes(data), buffer.toByteArray());
}
@Test
public void encodePayload() throws IOException {
char[] data = new char[]{
// Fixed Header
0b0001_0000, // Packet Type(4) + Reserved(4)
0xFF & 25, // Variable Header + Payload Length
// Variable Header
0b0000_0000, // length
0b0000_0100, // length
0b0100_1101, // 'M'
0b0101_0001, // 'Q'
0b0101_0100, // 'T'
0b0101_0100, // 'T'
0b0000_0100, // Prot. Level
0b1101_0100, // Flags
0b0000_0000, // Keep alive
0b0000_1010, // Keep alive
// Payload
0x00, 0x01, '1', // Client Identifier: 1
0x00, 0x01, '2', // Will Topic: 2
0x00, 0x01, 0x03, // Will payload: 3
0x00, 0x01, '4', // Username: 4
0x00, 0x01, '5', // password: 5
};
MqttPacketConnect obj = new MqttPacketConnect();
obj.variableHeaderAndPayloadLength = 10;
obj.keepAlive = 10;
obj.flagUsername = true;
obj.flagPassword = true;
obj.flagWillRetain = false;
obj.flagWillQoS = 2;
obj.flagWillFlag = true;
obj.flagCleanSession = false;
obj.clientIdentifier = "1";
obj.willTopic = "2";
obj.willPayload = new byte[]{3};
obj.username = "4";
obj.password = "5";
ByteArrayOutputStream buffer = new ByteArrayOutputStream();
BinaryStructOutputStream binOut = new BinaryStructOutputStream(buffer);
MqttPacket.write(binOut, obj);
assertArrayEquals(Converter.toBytes(data), buffer.toByteArray());
}
}

View file

@ -0,0 +1,93 @@
package zutil.net.mqtt.packet;
import org.junit.Test;
import zutil.converter.Converter;
import zutil.parser.binary.BinaryStructInputStream;
import zutil.parser.binary.BinaryStructOutputStream;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import static org.junit.Assert.*;
public class MqttPacketPublishTest {
@Test
public void decode() throws IOException {
char[] data = new char[]{
// Fixed Header
0b0011_0000, // Packet Type(4) + Reserved(4)
0xFF & 6, // Variable Header + Payload Length
// Variable Header
0b0000_0000, // length
0xFF & 2, // length
'a', // Topic Name
'b', // Topic Name
0b0000_0000, // Packet Identifier
0xFF & 5, // Packet Identifier
// Payload
};
MqttPacketPublish obj = (MqttPacketPublish) MqttPacket.read(
new BinaryStructInputStream(new ByteArrayInputStream(Converter.toBytes(data))));
assertEquals(6, obj.variableHeaderAndPayloadLength);
assertEquals("ab", obj.topicName);
assertEquals(5, obj.packetId);
assertArrayEquals(new byte[0], obj.payload);
}
@Test
public void decodePayload() throws IOException {
char[] data = new char[]{
// Fixed Header
0b0011_0000, // Packet Type(4) + Reserved(4)
0xFF & 9, // Variable Header + Payload Length
// Variable Header
0b0000_0000, // length
0xFF & 2, // length
'a', // Topic Name
'b', // Topic Name
0b0000_0000, // Packet Identifier
0xFF & 5, // Packet Identifier
// Payload
0x00, 0x01, 0x02,
};
MqttPacketPublish obj = (MqttPacketPublish) MqttPacket.read(
new BinaryStructInputStream(new ByteArrayInputStream(Converter.toBytes(data))));
assertEquals(9, obj.variableHeaderAndPayloadLength);
assertEquals("ab", obj.topicName);
assertEquals(5, obj.packetId);
assertArrayEquals(new byte[]{0x00, 0x01, 0x02}, obj.payload);
}
@Test
public void encode() throws IOException {
char[] data = new char[]{
// Fixed Header
0b0011_0000, // Packet Type(4) + Reserved(4)
0xFF & 6, // Variable Header + Payload Length
// Variable Header
0b0000_0000, // length
0xFF & 2, // length
'a', // Topic Name
'b', // Topic Name
0b0000_0000, // Packet Identifier
0xFF & 5, // Packet Identifier
// Payload
};
MqttPacketPublish obj = new MqttPacketPublish();
obj.variableHeaderAndPayloadLength = 5;
obj.topicName = "ab";
obj.packetId = 5;
ByteArrayOutputStream buffer = new ByteArrayOutputStream();
BinaryStructOutputStream binOut = new BinaryStructOutputStream(buffer);
MqttPacket.write(binOut, obj);
assertArrayEquals(Converter.toBytes(data), buffer.toByteArray());
}
}

View file

@ -0,0 +1,80 @@
package zutil.net.mqtt.packet;
import org.junit.Test;
import zutil.converter.Converter;
import zutil.net.mqtt.packet.MqttPacketSubscribeAck.MqttSubscribeAckPayload;
import zutil.parser.binary.BinaryStructInputStream;
import zutil.parser.binary.BinaryStructOutputStream;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import static org.junit.Assert.*;
public class MqttPacketSubscribeAckTest {
@Test
public void decode() throws IOException {
char[] data = new char[]{
// Fixed Header
0b1001_0000, // Packet Type(4) + Reserved(4)
0xFF & 4, // Variable Header + Payload Length
// Variable Header
0b0000_0000, // Packet Identifier
0xFF & 8, // Packet Identifier
// Payload
// -- Item 1
0b0000_0001, // Return code
// -- Item 2
0b0000_0010, // Return code
};
MqttPacketSubscribeAck obj = (MqttPacketSubscribeAck) MqttPacket.read(
new BinaryStructInputStream(new ByteArrayInputStream(Converter.toBytes(data))));
assertEquals(4, obj.variableHeaderAndPayloadLength);
assertEquals(8, obj.packetId);
assertEquals(2, obj.payloads.size());
assertEquals(MqttSubscribeAckPayload.RETCODE_SUCESS_MAX_QOS_1, obj.payloads.get(0).returnCode);
assertEquals(MqttSubscribeAckPayload.RETCODE_SUCESS_MAX_QOS_2, obj.payloads.get(1).returnCode);
}
@Test
public void encode() throws IOException {
char[] data = new char[]{
// Fixed Header
0b1001_0000, // Packet Type(4) + Reserved(4)
0xFF & 4, // Variable Header + Payload Length
// Variable Header
0b0000_0000, // Packet Identifier
0xFF & 8, // Packet Identifier
// Payload
// -- Item 1
0b0000_0001, // Return code
// -- Item 2
0b0000_0010, // Return code
};
MqttPacketSubscribeAck obj = new MqttPacketSubscribeAck();
obj.variableHeaderAndPayloadLength = 4;
obj.packetId = 8;
MqttSubscribeAckPayload p1 = new MqttSubscribeAckPayload();
p1.returnCode = MqttSubscribeAckPayload.RETCODE_SUCESS_MAX_QOS_1;
obj.payloads.add(p1);
MqttSubscribeAckPayload p2 = new MqttSubscribeAckPayload();
p2.returnCode = MqttSubscribeAckPayload.RETCODE_SUCESS_MAX_QOS_2;
obj.payloads.add(p2);
ByteArrayOutputStream buffer = new ByteArrayOutputStream();
BinaryStructOutputStream binOut = new BinaryStructOutputStream(buffer);
MqttPacket.write(binOut, obj);
assertArrayEquals(Converter.toBytes(data), buffer.toByteArray());
}
}

View file

@ -0,0 +1,98 @@
package zutil.net.mqtt.packet;
import org.junit.Test;
import zutil.converter.Converter;
import zutil.net.mqtt.packet.MqttPacketSubscribe.MqttSubscribePayload;
import zutil.parser.binary.BinaryStructInputStream;
import zutil.parser.binary.BinaryStructOutputStream;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import static org.junit.Assert.*;
public class MqttPacketSubscribeTest {
@Test
public void decode() throws IOException {
char[] data = new char[]{
// Fixed Header
0b1000_0000, // Packet Type(4) + Reserved(4)
0xFF & 12, // Variable Header + Payload Length
// Variable Header
0b0000_0000, // Packet Identifier
0xFF & 8, // Packet Identifier
// Payload
// -- Item 1
0b0000_0000, // length
0xFF & 2, // length
'a', // Topic Name
'b', // Topic Name
0b000000_00, // Reserved(6) + QoS(2)
// -- Item 2
0b0000_0000, // length
0xFF & 2, // length
'c', // Topic Name
'd', // Topic Name
0b000000_01, // Reserved(6) + QoS(2)
};
MqttPacketSubscribe obj = (MqttPacketSubscribe) MqttPacket.read(
new BinaryStructInputStream(new ByteArrayInputStream(Converter.toBytes(data))));
assertEquals(12, obj.variableHeaderAndPayloadLength);
assertEquals(8, obj.packetId);
assertEquals(2, obj.payloads.size());
assertEquals("ab", obj.payloads.get(0).topicFilter);
assertEquals(0, obj.payloads.get(0).qos);
assertEquals("cd", obj.payloads.get(1).topicFilter);
assertEquals(1, obj.payloads.get(1).qos);
}
@Test
public void encode() throws IOException {
char[] data = new char[]{
// Fixed Header
0b1000_0000, // Packet Type(4) + Reserved(4)
0xFF & 12, // Variable Header + Payload Length
// Variable Header
0b0000_0000, // Packet Identifier
0xFF & 8, // Packet Identifier
// Payload
// -- Item 1
0b0000_0000, // length
0xFF & 2, // length
'a', // Topic Name
'b', // Topic Name
0b000000_00, // Reserved(6) + QoS(2)
// -- Item 2
0b0000_0000, // length
0xFF & 2, // length
'c', // Topic Name
'd', // Topic Name
0b000000_01, // Reserved(6) + QoS(2)
};
MqttPacketSubscribe obj = new MqttPacketSubscribe();
obj.variableHeaderAndPayloadLength = 12;
obj.packetId = 8;
MqttSubscribePayload p1 = new MqttSubscribePayload();
p1.topicFilter = "ab";
p1.qos = 0;
obj.payloads.add(p1);
MqttSubscribePayload p2 = new MqttSubscribePayload();
p2.topicFilter = "cd";
p2.qos = 1;
obj.payloads.add(p2);
ByteArrayOutputStream buffer = new ByteArrayOutputStream();
BinaryStructOutputStream binOut = new BinaryStructOutputStream(buffer);
MqttPacket.write(binOut, obj);
assertArrayEquals(Converter.toBytes(data), buffer.toByteArray());
}
}

View file

@ -0,0 +1,91 @@
package zutil.net.mqtt.packet;
import org.junit.Test;
import zutil.converter.Converter;
import zutil.net.mqtt.packet.MqttPacketUnsubscribe.MqttUnsubscribePayload;
import zutil.parser.binary.BinaryStructInputStream;
import zutil.parser.binary.BinaryStructOutputStream;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import static org.junit.Assert.*;
public class MqttPacketUnsubscribeTest {
@Test
public void decode() throws IOException {
char[] data = new char[]{
// Fixed Header
0b1010_0000, // Packet Type(4) + Reserved(4)
0xFF & 10, // Variable Header + Payload Length
// Variable Header
0b0000_0000, // Packet Identifier
0xFF & 8, // Packet Identifier
// Payload
// -- Item 1
0b0000_0000, // length
0xFF & 2, // length
'a', // Topic Name
'b', // Topic Name
// -- Item 2
0b0000_0000, // length
0xFF & 2, // length
'c', // Topic Name
'd', // Topic Name
};
MqttPacketUnsubscribe obj = (MqttPacketUnsubscribe) MqttPacket.read(
new BinaryStructInputStream(new ByteArrayInputStream(Converter.toBytes(data))));
assertEquals(10, obj.variableHeaderAndPayloadLength);
assertEquals(8, obj.packetId);
assertEquals(2, obj.payloads.size());
assertEquals("ab", obj.payloads.get(0).topicFilter);
assertEquals("cd", obj.payloads.get(1).topicFilter);
}
@Test
public void encode() throws IOException {
char[] data = new char[]{
// Fixed Header
0b1010_0000, // Packet Type(4) + Reserved(4)
0xFF & 10, // Variable Header + Payload Length
// Variable Header
0b0000_0000, // Packet Identifier
0xFF & 8, // Packet Identifier
// Payload
// -- Item 1
0b0000_0000, // length
0xFF & 2, // length
'a', // Topic Name
'b', // Topic Name
// -- Item 2
0b0000_0000, // length
0xFF & 2, // length
'c', // Topic Name
'd', // Topic Name
};
MqttPacketUnsubscribe obj = new MqttPacketUnsubscribe();
obj.variableHeaderAndPayloadLength = 10;
obj.packetId = 8;
MqttUnsubscribePayload p1 = new MqttUnsubscribePayload();
p1.topicFilter = "ab";
obj.payloads.add(p1);
MqttUnsubscribePayload p2 = new MqttUnsubscribePayload();
p2.topicFilter = "cd";
obj.payloads.add(p2);
ByteArrayOutputStream buffer = new ByteArrayOutputStream();
BinaryStructOutputStream binOut = new BinaryStructOutputStream(buffer);
MqttPacket.write(binOut, obj);
assertArrayEquals(Converter.toBytes(data), buffer.toByteArray());
}
}

View file

@ -72,10 +72,10 @@ public class TwoByteLengthPrefixedDataSerializerTest implements BinaryStruct {
// 0 Length
outputStream.reset();outputStream.reset();
serializer.write(outputStream, null, stringFieldData, this);
assertArrayEquals(new byte[]{}, outputStream.toByteArray());
assertArrayEquals(new byte[]{0, 0}, outputStream.toByteArray());
outputStream.reset();
serializer.write(outputStream, null, byteFieldData, this);
assertArrayEquals(new byte[]{}, outputStream.toByteArray());
assertArrayEquals(new byte[]{0, 0}, outputStream.toByteArray());
// 0 Length
outputStream.reset();