diff --git a/src/zutil/io/PositionalInputStream.java b/src/zutil/io/PositionalInputStream.java index f6044d6..a532efb 100644 --- a/src/zutil/io/PositionalInputStream.java +++ b/src/zutil/io/PositionalInputStream.java @@ -28,6 +28,14 @@ public class PositionalInputStream extends FilterInputStream { return pos; } + /** + * Method will reset the position to 0. + */ + public synchronized void resetPosition() { + pos = 0; + mark = 0; + } + @Override public int read() throws IOException { @@ -72,11 +80,10 @@ public class PositionalInputStream extends FilterInputStream { public synchronized void reset() throws IOException { super.reset(); - synchronized(this) { - // Only update the position if mark is supported, - // as reset succeeds even if mark is not supported. - if (markSupported()) - pos = mark; - } + // Only update the position if mark is supported, + // as reset succeeds even if mark is not supported. + if (markSupported()) + pos = mark; + } } diff --git a/src/zutil/net/mqtt/MqttBroker.java b/src/zutil/net/mqtt/MqttBroker.java index b5cb2dc..8463d21 100755 --- a/src/zutil/net/mqtt/MqttBroker.java +++ b/src/zutil/net/mqtt/MqttBroker.java @@ -34,7 +34,9 @@ import zutil.net.threaded.ThreadedTCPNetworkServerThread; import zutil.parser.binary.BinaryStructInputStream; import zutil.parser.binary.BinaryStructOutputStream; +import java.io.BufferedInputStream; import java.io.IOException; +import java.io.InputStream; import java.net.Socket; import java.util.*; import java.util.logging.Level; @@ -55,10 +57,16 @@ public class MqttBroker extends ThreadedTCPNetworkServer { private MqttSubscriptionListener globalListener; private Map> subscriptionListeners = new HashMap<>(); + public MqttBroker() throws IOException { super(MQTT_PORT); } + public MqttBroker(int port) throws IOException { + super(port); + } + + @Override protected ThreadedTCPNetworkServerThread getThreadInstance(Socket s) throws IOException { return new MqttConnectionThread(this, s); @@ -108,19 +116,16 @@ public class MqttBroker extends ThreadedTCPNetworkServer { * Publish data to the specific topic */ public void publish(String topic, byte[] data) { - if (!subscriptionListeners.containsKey(topic)) { - logger.fine("Data was published to topic (" + topic + ") with no subscribers."); - return; - } - logger.finer("Data has been published to topic (" + topic + ")"); - List topicSubscriptions = subscriptionListeners.get(topic); if (globalListener != null) globalListener.dataPublished(topic, data); - for (MqttSubscriptionListener subscriber : topicSubscriptions) { - subscriber.dataPublished(topic, data); + List topicSubscriptions = subscriptionListeners.get(topic); + if (topicSubscriptions != null) { + for (MqttSubscriptionListener subscriber : topicSubscriptions) { + subscriber.dataPublished(topic, data); + } } } @@ -166,6 +171,10 @@ public class MqttBroker extends ThreadedTCPNetworkServer { private BinaryStructOutputStream out; private boolean disconnected = false; + /** A message that should be sent in case the connection to client is abnormally disconnected */ + private MqttPacketHeader willPacket = null; + /** The maximum amount of time(seconds) to wait for activity from client, 0 means no timeout */ + private int connectionTimeoutTime = 0; /** * Test constructor @@ -177,7 +186,12 @@ public class MqttBroker extends ThreadedTCPNetworkServer { public MqttConnectionThread(MqttBroker b, Socket s) throws IOException { this(b); socket = s; - in = new BinaryStructInputStream(socket.getInputStream()); + + InputStream baseInputstream = socket.getInputStream(); + if (!baseInputstream.markSupported()) + baseInputstream = new BufferedInputStream(baseInputstream); + + in = new BinaryStructInputStream(baseInputstream); out = new BinaryStructOutputStream(socket.getOutputStream()); } @@ -203,6 +217,8 @@ public class MqttBroker extends ThreadedTCPNetworkServer { logger.log(Level.SEVERE, null, e); } finally { try { + sendWillPacket(); + socket.close(); broker.unsubscribe(this); } catch (IOException e) { @@ -220,6 +236,10 @@ public class MqttBroker extends ThreadedTCPNetworkServer { // Reply MqttPacketConnectAck connectAck = new MqttPacketConnectAck(); + // ---------------------------------- + // Handling Header + // ---------------------------------- + // Incorrect protocol version? if (conn.protocolLevel != MQTT_PROTOCOL_VERSION) { connectAck.returnCode = MqttPacketConnectAck.RETCODE_PROT_VER_ERROR; @@ -229,14 +249,53 @@ public class MqttBroker extends ThreadedTCPNetworkServer { connectAck.returnCode = MqttPacketConnectAck.RETCODE_OK; } + // Is reserved field properly set? should be false + if (conn.flagReserved) { + disconnected = true; + return; + } + + // Handle Session + if (conn.flagCleanSession) { + // TODO: Remove session + connectAck.flagSessionPresent = false; + } else { + // TODO: Restore or create new session + throw new UnsupportedOperationException("Sessions currently not supported."); + } + + // Handle will message + if (conn.flagWillFlag) { + // TODO: Read will message from payload + //willPacket = xxx + //throw new UnsupportedOperationException("Will packet currently not supported."); + } else { + willPacket = null; + } + // TODO: authenticate - // TODO: clean session + if (conn.flagUsername) { + String username; + + if (conn.flagPassword) { + String password; + } + } + + connectionTimeoutTime = conn.keepAlive; + + // ---------------------------------- + // Handling Payload + // ---------------------------------- + + sendPacket(connectAck); } protected void handlePacket(MqttPacketHeader packet) throws IOException { // TODO: QOS 1 // TODO: QOS 2 + // TODO: handle connection timeout switch (packet.type) { case MqttPacketHeader.PACKET_TYPE_PUBLISH: @@ -259,7 +318,10 @@ public class MqttBroker extends ThreadedTCPNetworkServer { // Close connection default: logger.warning("Received unknown packet type: " + packet.type); + sendWillPacket(); + /* FALLTHROUGH */ case MqttPacketHeader.PACKET_TYPE_DISCONNECT: + willPacket = null; disconnected = true; break; } @@ -277,20 +339,20 @@ public class MqttBroker extends ThreadedTCPNetworkServer { MqttPacketSubscribeAck subscribeAckPacket = new MqttPacketSubscribeAck(); subscribeAckPacket.packetId = subscribePacket.packetId; - for (MqttSubscribePayload payload : subscribePacket.payload) { + for (MqttSubscribePayload payload : subscribePacket.payloads) { broker.subscribe(payload.topicFilter, this); // Prepare response MqttSubscribeAckPayload ackPayload = new MqttSubscribeAckPayload(); ackPayload.returnCode = MqttSubscribeAckPayload.RETCODE_SUCESS_MAX_QOS_0; - subscribeAckPacket.payload.add(ackPayload); + subscribeAckPacket.payloads.add(ackPayload); } sendPacket(subscribeAckPacket); } private void handleUnsubscribe(MqttPacketUnsubscribe unsubscribePacket) throws IOException { - for (MqttUnsubscribePayload payload : unsubscribePacket.payload) { + for (MqttUnsubscribePayload payload : unsubscribePacket.payloads) { broker.unsubscribe(payload.topicFilter, this); } @@ -306,13 +368,20 @@ public class MqttBroker extends ThreadedTCPNetworkServer { // Data has been published to a subscribed topic. } + private void sendWillPacket() throws IOException { + if (willPacket != null) { + sendPacket(willPacket); + willPacket = null; + } + } + public synchronized void sendPacket(MqttPacketHeader packet) throws IOException { MqttPacket.write(out, packet); } + public boolean isDisconnected() { return disconnected; } - } } diff --git a/src/zutil/net/mqtt/packet/MqttPacket.java b/src/zutil/net/mqtt/packet/MqttPacket.java index 8ed9315..4f3e175 100755 --- a/src/zutil/net/mqtt/packet/MqttPacket.java +++ b/src/zutil/net/mqtt/packet/MqttPacket.java @@ -49,31 +49,32 @@ public class MqttPacket { // Resolve the correct header class switch (packet.type) { case PACKET_TYPE_CONN: packet = new MqttPacketConnect(); break; - case PACKET_TYPE_CONNACK: packet = new MqttPacketConnectAck(); break; + case PACKET_TYPE_CONNACK: packet = new MqttPacketConnectAck(); break; // no payload case PACKET_TYPE_PUBLISH: packet = new MqttPacketPublish(); break; - case PACKET_TYPE_PUBACK: packet = new MqttPacketPublishAck(); break; - case PACKET_TYPE_PUBREC: packet = new MqttPacketPublishRec(); break; - case PACKET_TYPE_PUBREL: packet = new MqttPacketPublishRec(); break; - case PACKET_TYPE_PUBCOMP: packet = new MqttPacketPublishComp(); break; + case PACKET_TYPE_PUBACK: packet = new MqttPacketPublishAck(); break; // no payload + case PACKET_TYPE_PUBREC: /* FALLTHROUGH */ + case PACKET_TYPE_PUBREL: packet = new MqttPacketPublishRec(); break; // no payload + case PACKET_TYPE_PUBCOMP: packet = new MqttPacketPublishComp(); break; // no payload case PACKET_TYPE_SUBSCRIBE: packet = new MqttPacketSubscribe(); break; case PACKET_TYPE_SUBACK: packet = new MqttPacketSubscribeAck(); break; case PACKET_TYPE_UNSUBSCRIBE: packet = new MqttPacketUnsubscribe(); break; - case PACKET_TYPE_UNSUBACK: packet = new MqttPacketUnsubscribeAck(); break; - case PACKET_TYPE_PINGREQ: packet = new MqttPacketPingReq(); break; - case PACKET_TYPE_PINGRESP: packet = new MqttPacketPingResp(); break; - case PACKET_TYPE_DISCONNECT: packet = new MqttPacketDisconnect(); break; + case PACKET_TYPE_UNSUBACK: packet = new MqttPacketUnsubscribeAck(); break; // no payload + case PACKET_TYPE_PINGREQ: packet = new MqttPacketPingReq(); break; // no payload + case PACKET_TYPE_PINGRESP: packet = new MqttPacketPingResp(); break; // no payload + case PACKET_TYPE_DISCONNECT: packet = new MqttPacketDisconnect(); break; // no payload default: throw new IOException("Unknown header type: " + packet.type); } in.read(packet); // TODO: payload + byte[] payload = new byte[Math.max(0, packet.variableHeaderAndPayloadLength - packet.calculateVariableHeaderLength())]; + in.read(payload); return packet; } public static void write(BinaryStructOutputStream out, MqttPacketHeader header) throws IOException{ - + header.variableHeaderAndPayloadLength = header.calculateVariableHeaderLength() + header.calculatePayloadLength(); out.write(header); - // TODO: payload } } diff --git a/src/zutil/net/mqtt/packet/MqttPacketConnect.java b/src/zutil/net/mqtt/packet/MqttPacketConnect.java index e1ca2a7..a86edac 100755 --- a/src/zutil/net/mqtt/packet/MqttPacketConnect.java +++ b/src/zutil/net/mqtt/packet/MqttPacketConnect.java @@ -24,6 +24,14 @@ package zutil.net.mqtt.packet; +import zutil.parser.binary.BinaryFieldData; +import zutil.parser.binary.BinaryFieldSerializer; +import zutil.parser.binary.serializer.TwoByteLengthPrefixedDataSerializer; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + /** * This packet is the first message sent from a Client when it * has established a connection to a Server. A Client can only @@ -39,7 +47,9 @@ public class MqttPacketConnect extends MqttPacketHeader { type = MqttPacketHeader.PACKET_TYPE_CONN; } + // ------------------------------------------ // Variable header + // ------------------------------------------ @BinaryField(index = 2001, length = 16) private int protocolNameLength = 4; @@ -86,7 +96,7 @@ public class MqttPacketConnect extends MqttPacketHeader { public boolean flagCleanSession; @BinaryField(index = 2016, length = 1) - private boolean reserved; + public boolean flagReserved; /** @@ -101,11 +111,91 @@ public class MqttPacketConnect extends MqttPacketHeader { public int keepAlive; - // Payload: - // - Client identifier - // - Will Topic - // - Will message - // - User name - // - Password + @Override + public int calculateVariableHeaderLength() { + return 10; + } + // ------------------------------------------ + // Payload + // ------------------------------------------ + + @CustomBinaryField(index = 3000, serializer = MqttPacketConnectPayloadSerializer.class) + public String clientIdentifier; + + @CustomBinaryField(index = 3001, serializer = MqttPacketConnectPayloadSerializer.class) + public String willTopic; + + @CustomBinaryField(index = 3002, serializer = MqttPacketConnectPayloadSerializer.class) + public byte[] willPayload; + + @CustomBinaryField(index = 3003, serializer = MqttPacketConnectPayloadSerializer.class) + public String username; + + @CustomBinaryField(index = 3004, serializer = MqttPacketConnectPayloadSerializer.class) + public String password; + + + @Override + public int calculatePayloadLength() { + int length = 0; + + // Each String and byte[] is prefixed with a 2 byte length value in the payload + + if (!flagCleanSession) + length += 2 + clientIdentifier.length(); + if (flagWillFlag) + length += 2 + willTopic.length() + 2 + willPayload.length; + if (flagUsername) + length += 2 + username.length(); + if (flagPassword) + length += 2 + password.length(); + + return length; + } + + // ------------------------------------------ + // Utilities + // ------------------------------------------ + + protected static class MqttPacketConnectPayloadSerializer implements BinaryFieldSerializer { + + @Override + public Object read(InputStream in, BinaryFieldData field, Object parentObject) throws IOException { + MqttPacketConnect packet = (MqttPacketConnect) parentObject; + TwoByteLengthPrefixedDataSerializer serializer = new TwoByteLengthPrefixedDataSerializer(); + + if ("clientIdentifier".equals(field.getName()) && !packet.flagCleanSession || + "willTopic".equals(field.getName()) && packet.flagWillFlag || + "willPayload".equals(field.getName()) && packet.flagWillFlag || + "username".equals(field.getName()) && packet.flagUsername || + "password".equals(field.getName()) && packet.flagPassword) { + return serializer.read(in, field, parentObject); + } + + return null; + } + + @Override + public void write(OutputStream out, Object obj, BinaryFieldData field, Object parentObject) throws IOException { + MqttPacketConnect packet = (MqttPacketConnect) parentObject; + TwoByteLengthPrefixedDataSerializer serializer = new TwoByteLengthPrefixedDataSerializer(); + + if ("clientIdentifier".equals(field.getName()) && !packet.flagCleanSession || + "willTopic".equals(field.getName()) && packet.flagWillFlag || + "willPayload".equals(field.getName()) && packet.flagWillFlag || + "username".equals(field.getName()) && packet.flagUsername || + "password".equals(field.getName()) && packet.flagPassword) { + serializer.write(out, obj, field, parentObject); + } + } + + + @Override + public Object read(InputStream in, BinaryFieldData field) throws IOException { + return null; + } + @Override + public void write(OutputStream out, Object obj, BinaryFieldData field) throws IOException {} + } } diff --git a/src/zutil/net/mqtt/packet/MqttPacketConnectAck.java b/src/zutil/net/mqtt/packet/MqttPacketConnectAck.java index 0132c95..687768a 100755 --- a/src/zutil/net/mqtt/packet/MqttPacketConnectAck.java +++ b/src/zutil/net/mqtt/packet/MqttPacketConnectAck.java @@ -30,7 +30,7 @@ package zutil.net.mqtt.packet; * * @see MQTT v3.1.1 Spec */ -public class MqttPacketConnectAck extends MqttPacketHeader{ +public class MqttPacketConnectAck extends MqttPacketHeader { public static final int RETCODE_OK = 0; public static final int RETCODE_PROT_VER_ERROR = 1; public static final int RETCODE_IDENTIFIER_REJECT = 2; @@ -44,10 +44,12 @@ public class MqttPacketConnectAck extends MqttPacketHeader{ type = MqttPacketHeader.PACKET_TYPE_CONNACK; } + // ------------------------------------------ // Variable header + // ------------------------------------------ @BinaryField(index = 2000, length = 7) - private int flagReserved; + private int flagReserved = 0; /** Indicates that there is a valid Session available */ @BinaryField(index = 2001, length = 1) public boolean flagSessionPresent; @@ -55,6 +57,12 @@ public class MqttPacketConnectAck extends MqttPacketHeader{ @BinaryField(index = 2002, length = 8) public int returnCode; + + @Override + public int calculateVariableHeaderLength() { + return 2; + } + // No payload } diff --git a/src/zutil/net/mqtt/packet/MqttPacketDisconnect.java b/src/zutil/net/mqtt/packet/MqttPacketDisconnect.java index 90f29b1..96c26f6 100755 --- a/src/zutil/net/mqtt/packet/MqttPacketDisconnect.java +++ b/src/zutil/net/mqtt/packet/MqttPacketDisconnect.java @@ -31,7 +31,7 @@ package zutil.net.mqtt.packet; * * @see MQTT v3.1.1 Spec */ -public class MqttPacketDisconnect extends MqttPacketHeader{ +public class MqttPacketDisconnect extends MqttPacketHeader { // Header diff --git a/src/zutil/net/mqtt/packet/MqttPacketHeader.java b/src/zutil/net/mqtt/packet/MqttPacketHeader.java index 4afb140..6841d40 100755 --- a/src/zutil/net/mqtt/packet/MqttPacketHeader.java +++ b/src/zutil/net/mqtt/packet/MqttPacketHeader.java @@ -26,6 +26,10 @@ package zutil.net.mqtt.packet; import zutil.parser.binary.BinaryStruct; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + /** * */ @@ -57,6 +61,27 @@ public class MqttPacketHeader implements BinaryStruct { public byte flags; @CustomBinaryField(index = 3, serializer = MqttVariableIntSerializer.class) - public int payloadLength; + public int variableHeaderAndPayloadLength; + // ------------------------------------------ + // Variable Header + // ------------------------------------------ + + /** + * @return the calculated length of the variable MQTT header in bytes + */ + public int calculateVariableHeaderLength() { + return 0; + } + + // ------------------------------------------ + // Payload + // ------------------------------------------ + + /** + * @return the calculated length of assigned payload in bytes + */ + public int calculatePayloadLength() { + return 0; + } } diff --git a/src/zutil/net/mqtt/packet/MqttPacketPingReq.java b/src/zutil/net/mqtt/packet/MqttPacketPingReq.java index 415ef9b..30a4fd9 100755 --- a/src/zutil/net/mqtt/packet/MqttPacketPingReq.java +++ b/src/zutil/net/mqtt/packet/MqttPacketPingReq.java @@ -29,7 +29,7 @@ package zutil.net.mqtt.packet; * * @see MQTT v3.1.1 Spec */ -public class MqttPacketPingReq extends MqttPacketHeader{ +public class MqttPacketPingReq extends MqttPacketHeader { // Header diff --git a/src/zutil/net/mqtt/packet/MqttPacketPingResp.java b/src/zutil/net/mqtt/packet/MqttPacketPingResp.java index 9a4ecfd..0c7832f 100755 --- a/src/zutil/net/mqtt/packet/MqttPacketPingResp.java +++ b/src/zutil/net/mqtt/packet/MqttPacketPingResp.java @@ -31,7 +31,7 @@ package zutil.net.mqtt.packet; * * @see MQTT v3.1.1 Spec */ -public class MqttPacketPingResp extends MqttPacketHeader{ +public class MqttPacketPingResp extends MqttPacketHeader { // Header diff --git a/src/zutil/net/mqtt/packet/MqttPacketPublish.java b/src/zutil/net/mqtt/packet/MqttPacketPublish.java index 0dffb77..ca9b2df 100755 --- a/src/zutil/net/mqtt/packet/MqttPacketPublish.java +++ b/src/zutil/net/mqtt/packet/MqttPacketPublish.java @@ -26,6 +26,13 @@ package zutil.net.mqtt.packet; import zutil.ByteUtil; +import zutil.parser.binary.BinaryFieldData; +import zutil.parser.binary.BinaryFieldSerializer; +import zutil.parser.binary.serializer.TwoByteLengthPrefixedDataSerializer; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; /** * A PUBLISH Control Packet is sent from a Client to a Server @@ -41,42 +48,83 @@ public class MqttPacketPublish extends MqttPacketHeader { } - private byte flagDupBitmask = ByteUtil.getBitMask(3, 1); - private byte flagQoSBitmask = ByteUtil.getBitMask(1, 2); - private byte flagRetainBitmask = ByteUtil.getBitMask(0, 1); + private static final byte FLAG_DUP_BITMASK = ByteUtil.getBitMask(3, 1); + private static final byte FLAG_QOS_BITMASK = ByteUtil.getBitMask(1, 2); + private static final byte FLAG_RETAIN_BITMASK = ByteUtil.getBitMask(0, 1); + // ------------------------------------------ // Variable Header + // ------------------------------------------ - @BinaryField(index = 2001, length = 16) - private int topicNameLength; + //@BinaryField(index = 2001, length = 16) + //private int topicNameLength; /** * The Topic Name identifies the information channel to which controlHeader data is published. */ - @VariableLengthBinaryField(index = 2102, lengthField = "topicNameLength") + @CustomBinaryField(index = 2002, serializer = TwoByteLengthPrefixedDataSerializer.class) public String topicName; - @BinaryField(index = 2002, length = 16) + @BinaryField(index = 2003, length = 16) public int packetId; + @Override + public int calculateVariableHeaderLength() { + return 4 + (topicName != null ? topicName.length() : 0); + } + + // ------------------------------------------ // Payload + // ------------------------------------------ // - Application data - @BinaryField(index = 3001, length = 100000) + @CustomBinaryField(index = 3000, serializer = MqttPacketPublishPayloadSerializer.class) public byte[] payload; + @Override + public int calculatePayloadLength() { + return payload == null ? 0 : payload.length; + } + + // ------------------------------------------ // Util methods + // ------------------------------------------ public boolean getFlagDup() { - return (flags & flagDupBitmask) != 0; + return (flags & FLAG_DUP_BITMASK) != 0; } public byte getFlagQoS() { - return (byte) ((flags & flagQoSBitmask) >> 1); + return (byte) ((flags & FLAG_QOS_BITMASK) >> 1); } public boolean getFlagRetain() { - return (flags & flagRetainBitmask) != 0; + return (flags & FLAG_RETAIN_BITMASK) != 0; + } + + + private static class MqttPacketPublishPayloadSerializer implements BinaryFieldSerializer { + + @Override + public byte[] read(InputStream in, BinaryFieldData field) throws IOException { + return new byte[0]; + } + + @Override + public byte[] read(InputStream in, BinaryFieldData field, Object parentObject) throws IOException { + MqttPacketPublish publishPacket = (MqttPacketPublish) parentObject; + int payloadLength = Math.max(0, publishPacket.variableHeaderAndPayloadLength - publishPacket.calculateVariableHeaderLength()); + + byte[] payload = new byte[payloadLength]; + in.read(payload); + return payload; + } + + @Override + public void write(OutputStream out, byte[] obj, BinaryFieldData field) throws IOException { + if (obj != null) + out.write(obj); + } } } diff --git a/src/zutil/net/mqtt/packet/MqttPacketPublishAck.java b/src/zutil/net/mqtt/packet/MqttPacketPublishAck.java index 7ff2de7..ddb9e16 100755 --- a/src/zutil/net/mqtt/packet/MqttPacketPublishAck.java +++ b/src/zutil/net/mqtt/packet/MqttPacketPublishAck.java @@ -30,7 +30,7 @@ package zutil.net.mqtt.packet; * * @see MQTT v3.1.1 Spec */ -public class MqttPacketPublishAck extends MqttPacketHeader{ +public class MqttPacketPublishAck extends MqttPacketHeader { // Header @@ -38,11 +38,19 @@ public class MqttPacketPublishAck extends MqttPacketHeader{ type = MqttPacketHeader.PACKET_TYPE_PUBACK; } + // ------------------------------------------ // Variable Header + // ------------------------------------------ @BinaryField(index = 2000, length = 16) public int packetId; + + @Override + public int calculateVariableHeaderLength() { + return 2; + } + // No payload } diff --git a/src/zutil/net/mqtt/packet/MqttPacketPublishComp.java b/src/zutil/net/mqtt/packet/MqttPacketPublishComp.java index 9ba40c0..3374f22 100755 --- a/src/zutil/net/mqtt/packet/MqttPacketPublishComp.java +++ b/src/zutil/net/mqtt/packet/MqttPacketPublishComp.java @@ -32,7 +32,7 @@ package zutil.net.mqtt.packet; * * @see MQTT v3.1.1 Spec */ -public class MqttPacketPublishComp extends MqttPacketHeader{ +public class MqttPacketPublishComp extends MqttPacketHeader { // Header @@ -40,11 +40,19 @@ public class MqttPacketPublishComp extends MqttPacketHeader{ type = MqttPacketHeader.PACKET_TYPE_PUBCOMP; } + // ------------------------------------------ // Variable Header + // ------------------------------------------ @BinaryField(index = 2000, length = 16) public int packetId; + + @Override + public int calculateVariableHeaderLength() { + return 2; + } + // No payload } diff --git a/src/zutil/net/mqtt/packet/MqttPacketPublishRec.java b/src/zutil/net/mqtt/packet/MqttPacketPublishRec.java index 1cb8415..1d4415d 100755 --- a/src/zutil/net/mqtt/packet/MqttPacketPublishRec.java +++ b/src/zutil/net/mqtt/packet/MqttPacketPublishRec.java @@ -32,7 +32,7 @@ package zutil.net.mqtt.packet; * * @see MQTT v3.1.1 Spec */ -public class MqttPacketPublishRec extends MqttPacketHeader{ +public class MqttPacketPublishRec extends MqttPacketHeader { // Header @@ -40,11 +40,19 @@ public class MqttPacketPublishRec extends MqttPacketHeader{ type = MqttPacketHeader.PACKET_TYPE_PUBREC; } + // ------------------------------------------ // Variable Header + // ------------------------------------------ @BinaryField(index = 2000, length = 16) public int packetId; + + @Override + public int calculateVariableHeaderLength() { + return 2; + } + // No payload } diff --git a/src/zutil/net/mqtt/packet/MqttPacketPublishRel.java b/src/zutil/net/mqtt/packet/MqttPacketPublishRel.java index b38ae4e..ff934e9 100755 --- a/src/zutil/net/mqtt/packet/MqttPacketPublishRel.java +++ b/src/zutil/net/mqtt/packet/MqttPacketPublishRel.java @@ -40,7 +40,9 @@ public class MqttPacketPublishRel extends MqttPacketHeader{ type = MqttPacketHeader.PACKET_TYPE_PUBREL; } + // ------------------------------------------ // Variable Header + // ------------------------------------------ @BinaryField(index = 2000, length = 16) public int packetId; diff --git a/src/zutil/net/mqtt/packet/MqttPacketSubscribe.java b/src/zutil/net/mqtt/packet/MqttPacketSubscribe.java index fdf84eb..9a41780 100755 --- a/src/zutil/net/mqtt/packet/MqttPacketSubscribe.java +++ b/src/zutil/net/mqtt/packet/MqttPacketSubscribe.java @@ -24,9 +24,11 @@ package zutil.net.mqtt.packet; -import zutil.parser.binary.BinaryStruct; +import zutil.parser.binary.*; +import zutil.parser.binary.serializer.BinaryStructListSerializer; +import zutil.parser.binary.serializer.TwoByteLengthPrefixedDataSerializer; -import java.util.LinkedList; +import java.util.ArrayList; import java.util.List; /** @@ -34,7 +36,7 @@ import java.util.List; * * @see MQTT v3.1.1 Spec */ -public class MqttPacketSubscribe extends MqttPacketHeader{ +public class MqttPacketSubscribe extends MqttPacketHeader { // Header @@ -42,28 +44,66 @@ public class MqttPacketSubscribe extends MqttPacketHeader{ type = MqttPacketHeader.PACKET_TYPE_SUBSCRIBE; } + // ------------------------------------------ // Variable Header + // ------------------------------------------ @BinaryField(index = 2000, length = 16) public int packetId; + + @Override + public int calculateVariableHeaderLength() { + return 2; + } + + // ------------------------------------------ // Payload + // ------------------------------------------ - public List payload = new LinkedList<>(); + @CustomBinaryField(index = 3000, serializer = MqttSubscribePayloadSerializer.class) + public List payloads = new ArrayList<>(); + @Override + public int calculatePayloadLength() { + int length = 0; + for (MqttSubscribePayload p : payloads) { + length += p.calculatePayloadLength(); + } + return length; + } - public static class MqttSubscribePayload implements BinaryStruct{ - - @BinaryField(index = 3001, length = 16) - private int topicFilterLength; - /** A filter indicating the Topic to which the Client wants to subscribe to*/ - @VariableLengthBinaryField(index = 3002, lengthField = "topicFilterLength") + public static class MqttSubscribePayload implements BinaryStruct { + //@BinaryField(index = 3001, length = 16) + //private int topicFilterLength; + /** A filter indicating the Topic to which the Client wants to subscribe to **/ + @CustomBinaryField(index = 3002, serializer = TwoByteLengthPrefixedDataSerializer.class) public String topicFilter; @BinaryField(index = 3003, length = 6) private int reserved; + /** the maximum QoS level at which the Server can send Application Messages to the Client **/ @BinaryField(index = 3004, length = 2) - private int qos; + public int qos; + + + protected int calculatePayloadLength() { + return 2 + (topicFilter != null ? topicFilter.length() : 0) + 1; + } + } + + + private static class MqttSubscribePayloadSerializer extends BinaryStructListSerializer { + + protected MqttSubscribePayloadSerializer() { + super(MqttSubscribePayload.class); + } + + @Override + protected boolean readNext(int objIndex, int bytesRead, BinaryFieldData field, Object parentObject) { + MqttPacketSubscribe packetSubscribe = ((MqttPacketSubscribe) parentObject); + return bytesRead < packetSubscribe.variableHeaderAndPayloadLength - packetSubscribe.calculateVariableHeaderLength(); + } } } diff --git a/src/zutil/net/mqtt/packet/MqttPacketSubscribeAck.java b/src/zutil/net/mqtt/packet/MqttPacketSubscribeAck.java index b00d1f9..1c86531 100755 --- a/src/zutil/net/mqtt/packet/MqttPacketSubscribeAck.java +++ b/src/zutil/net/mqtt/packet/MqttPacketSubscribeAck.java @@ -24,8 +24,12 @@ package zutil.net.mqtt.packet; +import zutil.parser.binary.BinaryFieldData; import zutil.parser.binary.BinaryStruct; +import zutil.parser.binary.serializer.BinaryStructListSerializer; +import java.io.IOException; +import java.io.OutputStream; import java.util.LinkedList; import java.util.List; @@ -35,7 +39,7 @@ import java.util.List; * * @see MQTT v3.1.1 Spec */ -public class MqttPacketSubscribeAck extends MqttPacketHeader{ +public class MqttPacketSubscribeAck extends MqttPacketHeader { // Header @@ -43,16 +47,35 @@ public class MqttPacketSubscribeAck extends MqttPacketHeader{ type = MqttPacketHeader.PACKET_TYPE_SUBACK; } + // ------------------------------------------ // Variable Header + // ------------------------------------------ @BinaryField(index = 2000, length = 16) public int packetId; + + @Override + public int calculateVariableHeaderLength() { + return 2; + } + + // ------------------------------------------ // Payload + // ------------------------------------------ - public List payload = new LinkedList<>(); + @CustomBinaryField(index = 3000, serializer = MqttSubscribeAckPayloadSerializer.class) + public List payloads = new LinkedList<>(); + @Override + public int calculatePayloadLength() { + int length = 0; + for (MqttSubscribeAckPayload p : payloads) { + length += p.calculatePayloadLength(); + } + return length; + } public static class MqttSubscribeAckPayload implements BinaryStruct{ public static final int RETCODE_SUCESS_MAX_QOS_0 = 0; @@ -62,5 +85,24 @@ public class MqttPacketSubscribeAck extends MqttPacketHeader{ @BinaryField(index = 3001, length = 8) public int returnCode; + + + protected int calculatePayloadLength() { + return 1; + } + } + + + private static class MqttSubscribeAckPayloadSerializer extends BinaryStructListSerializer { + + protected MqttSubscribeAckPayloadSerializer() { + super(MqttSubscribeAckPayload.class); + } + + @Override + protected boolean readNext(int objIndex, int bytesRead, BinaryFieldData field, Object parentObject) { + MqttPacketSubscribeAck packetSubscribe = ((MqttPacketSubscribeAck) parentObject); + return bytesRead < packetSubscribe.variableHeaderAndPayloadLength - packetSubscribe.calculateVariableHeaderLength(); + } } } diff --git a/src/zutil/net/mqtt/packet/MqttPacketUnsubscribe.java b/src/zutil/net/mqtt/packet/MqttPacketUnsubscribe.java index c850d5d..8cb0efb 100755 --- a/src/zutil/net/mqtt/packet/MqttPacketUnsubscribe.java +++ b/src/zutil/net/mqtt/packet/MqttPacketUnsubscribe.java @@ -24,7 +24,10 @@ package zutil.net.mqtt.packet; +import zutil.parser.binary.BinaryFieldData; import zutil.parser.binary.BinaryStruct; +import zutil.parser.binary.serializer.BinaryStructListSerializer; +import zutil.parser.binary.serializer.TwoByteLengthPrefixedDataSerializer; import java.util.LinkedList; import java.util.List; @@ -34,7 +37,7 @@ import java.util.List; * * @see MQTT v3.1.1 Spec */ -public class MqttPacketUnsubscribe extends MqttPacketHeader{ +public class MqttPacketUnsubscribe extends MqttPacketHeader { // Header @@ -42,24 +45,59 @@ public class MqttPacketUnsubscribe extends MqttPacketHeader{ type = MqttPacketHeader.PACKET_TYPE_UNSUBSCRIBE; } + // ------------------------------------------ // Variable Header + // ------------------------------------------ @BinaryField(index = 2000, length = 16) public int packetId; + + @Override + public int calculateVariableHeaderLength() { + return 2; + } + + // ------------------------------------------ // Payload + // ------------------------------------------ - public List payload = new LinkedList<>(); + @CustomBinaryField(index = 3000, serializer = MqttSubscribePayloadSerializer.class) + public List payloads = new LinkedList<>(); + @Override + public int calculatePayloadLength() { + int length = 0; + for (MqttUnsubscribePayload p : payloads) { + length += p.calculatePayloadLength(); + } + return length; + } public static class MqttUnsubscribePayload implements BinaryStruct{ - - @BinaryField(index = 3001, length = 16) - private int topicFilterLength; + //@BinaryField(index = 3001, length = 16) + //private int topicFilterLength; /** A filter indicating the Topic to which the Client wants to subscribe to*/ - @VariableLengthBinaryField(index = 3002, lengthField = "topicFilterLength") + @CustomBinaryField(index = 3002, serializer = TwoByteLengthPrefixedDataSerializer.class) public String topicFilter; + + protected int calculatePayloadLength() { + return 2 + (topicFilter != null ? topicFilter.length() : 0); + } + } + + private static class MqttSubscribePayloadSerializer extends BinaryStructListSerializer { + + protected MqttSubscribePayloadSerializer() { + super(MqttUnsubscribePayload.class); + } + + @Override + protected boolean readNext(int objIndex, int bytesRead, BinaryFieldData field, Object parentObject) { + MqttPacketUnsubscribe packetSubscribe = ((MqttPacketUnsubscribe) parentObject); + return bytesRead < packetSubscribe.variableHeaderAndPayloadLength - packetSubscribe.calculateVariableHeaderLength(); + } } } diff --git a/src/zutil/net/mqtt/packet/MqttPacketUnsubscribeAck.java b/src/zutil/net/mqtt/packet/MqttPacketUnsubscribeAck.java index 5852520..512076d 100755 --- a/src/zutil/net/mqtt/packet/MqttPacketUnsubscribeAck.java +++ b/src/zutil/net/mqtt/packet/MqttPacketUnsubscribeAck.java @@ -31,7 +31,7 @@ package zutil.net.mqtt.packet; * * @see MQTT v3.1.1 Spec */ -public class MqttPacketUnsubscribeAck extends MqttPacketHeader{ +public class MqttPacketUnsubscribeAck extends MqttPacketHeader { // Header @@ -39,11 +39,19 @@ public class MqttPacketUnsubscribeAck extends MqttPacketHeader{ type = MqttPacketHeader.PACKET_TYPE_UNSUBACK; } + // ------------------------------------------ // Variable Header + // ------------------------------------------ @BinaryField(index = 2000, length = 16) public int packetId; + + @Override + public int calculateVariableHeaderLength() { + return 2; + } + // No payload } diff --git a/src/zutil/net/mqtt/packet/MqttVariableIntSerializer.java b/src/zutil/net/mqtt/packet/MqttVariableIntSerializer.java index 0c17863..10bd06f 100755 --- a/src/zutil/net/mqtt/packet/MqttVariableIntSerializer.java +++ b/src/zutil/net/mqtt/packet/MqttVariableIntSerializer.java @@ -34,7 +34,7 @@ import java.io.OutputStream; /** * Code from MQTT specification */ -public class MqttVariableIntSerializer implements BinaryFieldSerializer{ +public class MqttVariableIntSerializer implements BinaryFieldSerializer { @Override public Integer read(InputStream in, BinaryFieldData field) throws IOException { diff --git a/src/zutil/parser/binary/BinaryStructInputStream.java b/src/zutil/parser/binary/BinaryStructInputStream.java index 31a2890..3961265 100755 --- a/src/zutil/parser/binary/BinaryStructInputStream.java +++ b/src/zutil/parser/binary/BinaryStructInputStream.java @@ -25,13 +25,12 @@ package zutil.parser.binary; import zutil.ByteUtil; +import zutil.io.PositionalInputStream; import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; -import java.util.HashMap; import java.util.List; -import java.util.Map; /** * A stream class that parses a byte stream into binary struct objects. @@ -79,13 +78,13 @@ public class BinaryStructInputStream extends InputStream{ */ public int read(BinaryStruct struct) throws IOException { List structDataList = BinaryFieldData.getStructFieldList(struct.getClass()); + PositionalInputStream positionalInputStream = new PositionalInputStream(in); - int totalReadLength = 0; for (BinaryFieldData field : structDataList) { if (field.hasSerializer()) { - BinaryFieldSerializer serializer = field.getSerializer(); + BinaryFieldSerializer serializer = field.getSerializer(); - Object value = serializer.read(in, field, struct); + Object value = serializer.read(positionalInputStream, field, struct); field.setValue(struct, value); } else { byte[] valueData = new byte[(int) Math.ceil(field.getBitLength(struct) / 8.0)]; @@ -95,7 +94,7 @@ public class BinaryStructInputStream extends InputStream{ // Parse value for (int valueDataIndex=valueData.length-1; valueDataIndex >= 0; --valueDataIndex) { if (dataBitIndex < 0) { // Read new data? - data = (byte) in.read(); + data = (byte) positionalInputStream.read(); dataBitIndex = 7; } int subBitLength = Math.min(dataBitIndex + 1, field.getBitLength(struct) - fieldReadLength); @@ -106,11 +105,10 @@ public class BinaryStructInputStream extends InputStream{ // Set value ByteUtil.shiftLeft(valueData, shiftBy); // shift data so that LSB is at the beginning field.setByteValue(struct, valueData); - totalReadLength += fieldReadLength; } } - return totalReadLength; + return (int) positionalInputStream.getPosition(); } @Override @@ -123,6 +121,11 @@ public class BinaryStructInputStream extends InputStream{ return in.read(b, off, len); } + @Override + public int available() throws IOException { + return in.available(); + } + /** * @see InputStream#markSupported() */ @@ -147,7 +150,6 @@ public class BinaryStructInputStream extends InputStream{ } - protected static int shiftLeftBy(int bitIndex, int bitLength) { return (8 - ((7-bitIndex) + bitLength) % 8) % 8; } diff --git a/src/zutil/parser/binary/serializer/BinaryStructListSerializer.java b/src/zutil/parser/binary/serializer/BinaryStructListSerializer.java new file mode 100644 index 0000000..4de0696 --- /dev/null +++ b/src/zutil/parser/binary/serializer/BinaryStructListSerializer.java @@ -0,0 +1,73 @@ +package zutil.parser.binary.serializer; + +import zutil.parser.binary.*; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.List; + + +/** + * This serializer handles a List field that contains the same type of objects. + * This class needs to be extended by a more specific subclass that can provide + * the list class type and handling the flow control of the read action. + * + * @param + */ +public abstract class BinaryStructListSerializer implements BinaryFieldSerializer> { + + private Class listClass; + + + protected BinaryStructListSerializer(Class clazz) { + listClass = clazz; + } + + + @Override + public List read(InputStream in, BinaryFieldData field) throws IOException { + return null; + } + + @Override + public List read(InputStream in, BinaryFieldData field, Object parentObject) throws IOException { + BinaryStructInputStream structIn = new BinaryStructInputStream(in); + + List list = new ArrayList<>(); + try { + int bytesRead = 0; + while (readNext(list.size(), bytesRead, field, parentObject)) { + T obj = listClass.getDeclaredConstructor().newInstance(); + bytesRead += structIn.read(obj); + list.add(obj); + } + } catch (IOException e) { + throw e; + } catch (Exception e) { + throw new RuntimeException(e); + } + return list; + } + + /** + * Method is used to determine if the next object should be read from the stream. + * + * @param objIndex the number of objects that have been read so far. + * @param bytesRead number of bytes that have been read from the stream so far. + * @param field meta-data about the target field that will be assigned. + * @param parentObject the parent object that owns the field. + * @return true if another object can be read from the stream, false if this is the end of the struct field. + */ + protected abstract boolean readNext(int objIndex, int bytesRead, BinaryFieldData field, Object parentObject); + + @Override + public void write(OutputStream out, List list, BinaryFieldData field) throws IOException { + BinaryStructOutputStream structOut = new BinaryStructOutputStream(out); + + for (T obj : list) { + structOut.write(obj); + } + } +} diff --git a/src/zutil/parser/binary/serializer/TwoByteLengthPrefixedDataSerializer.java b/src/zutil/parser/binary/serializer/TwoByteLengthPrefixedDataSerializer.java index 138cddf..8a54718 100644 --- a/src/zutil/parser/binary/serializer/TwoByteLengthPrefixedDataSerializer.java +++ b/src/zutil/parser/binary/serializer/TwoByteLengthPrefixedDataSerializer.java @@ -24,6 +24,8 @@ package zutil.parser.binary.serializer; +import zutil.ByteUtil; +import zutil.converter.Converter; import zutil.parser.binary.BinaryFieldData; import zutil.parser.binary.BinaryFieldSerializer; @@ -34,7 +36,7 @@ import java.io.StreamCorruptedException; import java.nio.charset.StandardCharsets; /** - * Serializer handles data that is prefixed by two byte length. + * Serializer handles data that is prefixed by two byte length. Null objects will be prefixed by two zero bytes indicating length 0. *

* Currently only these types are supported: *

    @@ -46,15 +48,16 @@ public class TwoByteLengthPrefixedDataSerializer implements BinaryFieldSerialize @Override public Object read(InputStream in, BinaryFieldData field) throws IOException { - int b = in.read(); - if (b < 0) + int b1, b2; + if ((b1 = in.read()) < 0) throw new StreamCorruptedException("Stream ended prematurely when reading first length byte."); - int length = (b & 0xFF) << 8; - - b = in.read(); - if (b < 0) + if ((b2 = in.read()) < 0) throw new StreamCorruptedException("Stream ended prematurely when reading second length byte."); - length |= b & 0xFF; + + int length = Converter.toInt(new byte[]{ + (byte) (0XFF & b2), + (byte) (0xFF & b1) + }); byte[] payload = new byte[length]; in.read(payload); @@ -66,14 +69,19 @@ public class TwoByteLengthPrefixedDataSerializer implements BinaryFieldSerialize @Override public void write(OutputStream out, Object obj, BinaryFieldData field) throws IOException { - if (obj == null) + if (obj == null) { + out.write(0); + out.write(0); return; + } byte[] payload; if (obj instanceof String) payload = ((String) obj).getBytes(StandardCharsets.UTF_8); - else + else if (obj instanceof byte[]) payload = (byte[]) obj; + else + throw new UnsupportedOperationException("Class type not supported for serialization: " + obj.getClass().getSimpleName()); int length = payload.length; diff --git a/test/zutil/net/mqtt/MqttBrokerTest.java b/test/zutil/net/mqtt/MqttBrokerTest.java index b37253f..90b6963 100644 --- a/test/zutil/net/mqtt/MqttBrokerTest.java +++ b/test/zutil/net/mqtt/MqttBrokerTest.java @@ -66,7 +66,7 @@ public class MqttBrokerTest { MqttPacketHeader responsePacket = thread.sentPackets.poll(); assertEquals(MqttPacketSubscribeAck.class, responsePacket.getClass()); assertEquals(subscribePacket.packetId, ((MqttPacketSubscribeAck)responsePacket).packetId); - assertEquals(subscribePacket.payload.size(), ((MqttPacketSubscribeAck)responsePacket).payload.size()); + assertEquals(subscribePacket.payloads.size(), ((MqttPacketSubscribeAck)responsePacket).payloads.size()); } @Test @@ -76,10 +76,10 @@ public class MqttBrokerTest { MqttPacketSubscribe subscribePacket = new MqttPacketSubscribe(); subscribePacket.packetId = (int)(Math.random()*1000); - subscribePacket.payload.add(new MqttSubscribePayload()); - subscribePacket.payload.get(0).topicFilter = "topic1"; - subscribePacket.payload.add(new MqttSubscribePayload()); - subscribePacket.payload.get(1).topicFilter = "topic2"; + subscribePacket.payloads.add(new MqttSubscribePayload()); + subscribePacket.payloads.get(0).topicFilter = "topic1"; + subscribePacket.payloads.add(new MqttSubscribePayload()); + subscribePacket.payloads.get(1).topicFilter = "topic2"; thread.handlePacket(subscribePacket); @@ -87,7 +87,7 @@ public class MqttBrokerTest { MqttPacketHeader responsePacket = thread.sentPackets.poll(); assertEquals(MqttPacketSubscribeAck.class, responsePacket.getClass()); assertEquals(subscribePacket.packetId, ((MqttPacketSubscribeAck)responsePacket).packetId); - assertEquals(subscribePacket.payload.size(), ((MqttPacketSubscribeAck)responsePacket).payload.size()); + assertEquals(subscribePacket.payloads.size(), ((MqttPacketSubscribeAck)responsePacket).payloads.size()); // Check broker assertEquals(1, broker.getSubscriberCount("topic1")); assertEquals(1, broker.getSubscriberCount("topic2")); @@ -95,9 +95,9 @@ public class MqttBrokerTest { //************************ Duplicate subscribe packet subscribePacket.packetId = (int)(Math.random()*1000); - subscribePacket.payload.clear(); - subscribePacket.payload.add(new MqttSubscribePayload()); - subscribePacket.payload.get(0).topicFilter = "topic1"; + subscribePacket.payloads.clear(); + subscribePacket.payloads.add(new MqttSubscribePayload()); + subscribePacket.payloads.get(0).topicFilter = "topic1"; thread.handlePacket(subscribePacket); @@ -166,8 +166,8 @@ public class MqttBrokerTest { MqttPacketUnsubscribe unsubscribePacket = new MqttPacketUnsubscribe(); unsubscribePacket.packetId = (int)(Math.random()*1000); - unsubscribePacket.payload.add(new MqttUnsubscribePayload()); - unsubscribePacket.payload.get(0).topicFilter = "topic1"; + unsubscribePacket.payloads.add(new MqttUnsubscribePayload()); + unsubscribePacket.payloads.get(0).topicFilter = "topic1"; thread.handlePacket(unsubscribePacket); @@ -183,10 +183,10 @@ public class MqttBrokerTest { MqttPacketSubscribe subscribePacket = new MqttPacketSubscribe(); subscribePacket.packetId = (int)(Math.random()*1000); - subscribePacket.payload.add(new MqttSubscribePayload()); - subscribePacket.payload.get(0).topicFilter = "topic1"; - subscribePacket.payload.add(new MqttSubscribePayload()); - subscribePacket.payload.get(1).topicFilter = "topic2"; + subscribePacket.payloads.add(new MqttSubscribePayload()); + subscribePacket.payloads.get(0).topicFilter = "topic1"; + subscribePacket.payloads.add(new MqttSubscribePayload()); + subscribePacket.payloads.get(1).topicFilter = "topic2"; thread.handlePacket(subscribePacket); diff --git a/test/zutil/net/mqtt/packet/MqttPacketConnectTest.java b/test/zutil/net/mqtt/packet/MqttPacketConnectTest.java index bd5c5c7..c1a7823 100755 --- a/test/zutil/net/mqtt/packet/MqttPacketConnectTest.java +++ b/test/zutil/net/mqtt/packet/MqttPacketConnectTest.java @@ -38,33 +38,81 @@ import static org.junit.Assert.*; public class MqttPacketConnectTest { - char[] data = new char[]{ - // Fixed Header - 0b0001_0000, // Packet Type + Reserved - 0b0000_1010, // Variable Header + Payload Length - // Variable Header - 0b0000_0000, // length - 0b0000_0100, // length - 0b0100_1101, // 'M' - 0b0101_0001, // 'Q' - 0b0101_0100, // 'T' - 0b0101_0100, // 'T' - - 0b0000_0100, // Prot. Level - - 0b1100_1110, // Flags - - 0b0000_0000, // Keep alive - 0b0000_1010, // Keep alive - }; - - @Test public void decode() throws IOException { - MqttPacketConnect obj = (MqttPacketConnect)MqttPacket.read( - new BinaryStructInputStream( - new ByteArrayInputStream( - Converter.toBytes(data)))); + char[] data = new char[]{ + // Fixed Header + 0b0001_0000, // Packet Type(4) + Reserved(4) + 0b0000_1010, // Variable Header + Payload Length + // Variable Header + 0b0000_0000, // length + 0b0000_0100, // length + 0b0100_1101, // 'M' + 0b0101_0001, // 'Q' + 0b0101_0100, // 'T' + 0b0101_0100, // 'T' + + 0b0000_0100, // Prot. Level + + 0b0101_1010, // Flags + + 0b0000_0000, // Keep alive + 0b0000_1010, // Keep alive + // Payload + 0x00, 0x01, '5', // password + }; + + MqttPacketConnect obj = (MqttPacketConnect) MqttPacket.read( + new BinaryStructInputStream(new ByteArrayInputStream(Converter.toBytes(data)))); + + assertEquals("MQTT", obj.protocolName); + assertEquals(4, obj.protocolLevel); + assertEquals(10, obj.keepAlive); + + assertFalse(obj.flagUsername); + assertTrue(obj.flagPassword); + assertFalse(obj.flagWillRetain); + assertEquals(3, obj.flagWillQoS); + assertFalse(obj.flagWillFlag); + assertTrue(obj.flagCleanSession); + + assertNull(obj.clientIdentifier); + assertNull(obj.willTopic); + assertNull(null, obj.willPayload); + assertNull(obj.username); + assertEquals("5", obj.password); + } + + @Test + public void decodePayload() throws IOException { + char[] data = new char[]{ + // Fixed Header + 0b0001_0000, // Packet Type + Reserved + 0b0000_1010, // Variable Header + Payload Length + // Variable Header + 0b0000_0000, // length + 0b0000_0100, // length + 0b0100_1101, // 'M' + 0b0101_0001, // 'Q' + 0b0101_0100, // 'T' + 0b0101_0100, // 'T' + + 0b0000_0100, // Prot. Level + + 0b1100_1100, // Flags + + 0b0000_0000, // Keep alive + 0b0000_1010, // Keep alive + // Payload + 0x00, 0x01, '1', // Client Identifier + 0x00, 0x01, '2', // Will Topic + 0x00, 0x01, 0x03, // Will payload: 3 + 0x00, 0x01, '4', // Username + 0x00, 0x01, '5', // password + }; + + MqttPacketConnect obj = (MqttPacketConnect) MqttPacket.read( + new BinaryStructInputStream(new ByteArrayInputStream(Converter.toBytes(data)))); assertEquals("MQTT", obj.protocolName); assertEquals(4, obj.protocolLevel); @@ -75,20 +123,47 @@ public class MqttPacketConnectTest { assertFalse(obj.flagWillRetain); assertEquals(1, obj.flagWillQoS); assertTrue(obj.flagWillFlag); - assertTrue(obj.flagCleanSession); + assertFalse(obj.flagCleanSession); + + assertEquals("1", obj.clientIdentifier); + assertEquals("2", obj.willTopic); + assertArrayEquals(new byte[]{3}, obj.willPayload); + assertEquals("4", obj.username); + assertEquals("5", obj.password); } @Test public void encode() throws IOException { + char[] data = new char[]{ + // Fixed Header + 0b0001_0000, // Packet Type + Reserved + 0b0000_1010, // Variable Header + Payload Length + // Variable Header + 0b0000_0000, // length + 0b0000_0100, // length + 0b0100_1101, // 'M' + 0b0101_0001, // 'Q' + 0b0101_0100, // 'T' + 0b0101_0100, // 'T' + + 0b0000_0100, // Prot. Level + + 0b0000_1010, // Flags + + 0b0000_0000, // Keep alive + 0b0000_1010, // Keep alive + // Payload + }; + MqttPacketConnect obj = new MqttPacketConnect(); - obj.payloadLength = 10; + obj.variableHeaderAndPayloadLength = 10; obj.keepAlive = 10; - obj.flagUsername = true; - obj.flagPassword = true; + obj.flagUsername = false; + obj.flagPassword = false; obj.flagWillRetain = false; obj.flagWillQoS = 1; - obj.flagWillFlag = true; + obj.flagWillFlag = false; obj.flagCleanSession = true; ByteArrayOutputStream buffer = new ByteArrayOutputStream(); @@ -96,4 +171,55 @@ public class MqttPacketConnectTest { MqttPacket.write(binOut, obj); assertArrayEquals(Converter.toBytes(data), buffer.toByteArray()); } + + @Test + public void encodePayload() throws IOException { + char[] data = new char[]{ + // Fixed Header + 0b0001_0000, // Packet Type(4) + Reserved(4) + 0xFF & 25, // Variable Header + Payload Length + // Variable Header + 0b0000_0000, // length + 0b0000_0100, // length + 0b0100_1101, // 'M' + 0b0101_0001, // 'Q' + 0b0101_0100, // 'T' + 0b0101_0100, // 'T' + + 0b0000_0100, // Prot. Level + + 0b1101_0100, // Flags + + 0b0000_0000, // Keep alive + 0b0000_1010, // Keep alive + // Payload + 0x00, 0x01, '1', // Client Identifier: 1 + 0x00, 0x01, '2', // Will Topic: 2 + 0x00, 0x01, 0x03, // Will payload: 3 + 0x00, 0x01, '4', // Username: 4 + 0x00, 0x01, '5', // password: 5 + }; + + MqttPacketConnect obj = new MqttPacketConnect(); + obj.variableHeaderAndPayloadLength = 10; + obj.keepAlive = 10; + + obj.flagUsername = true; + obj.flagPassword = true; + obj.flagWillRetain = false; + obj.flagWillQoS = 2; + obj.flagWillFlag = true; + obj.flagCleanSession = false; + + obj.clientIdentifier = "1"; + obj.willTopic = "2"; + obj.willPayload = new byte[]{3}; + obj.username = "4"; + obj.password = "5"; + + ByteArrayOutputStream buffer = new ByteArrayOutputStream(); + BinaryStructOutputStream binOut = new BinaryStructOutputStream(buffer); + MqttPacket.write(binOut, obj); + assertArrayEquals(Converter.toBytes(data), buffer.toByteArray()); + } } \ No newline at end of file diff --git a/test/zutil/net/mqtt/packet/MqttPacketPublishTest.java b/test/zutil/net/mqtt/packet/MqttPacketPublishTest.java new file mode 100644 index 0000000..f87f9f1 --- /dev/null +++ b/test/zutil/net/mqtt/packet/MqttPacketPublishTest.java @@ -0,0 +1,93 @@ +package zutil.net.mqtt.packet; + +import org.junit.Test; +import zutil.converter.Converter; +import zutil.parser.binary.BinaryStructInputStream; +import zutil.parser.binary.BinaryStructOutputStream; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; + +import static org.junit.Assert.*; + +public class MqttPacketPublishTest { + + @Test + public void decode() throws IOException { + char[] data = new char[]{ + // Fixed Header + 0b0011_0000, // Packet Type(4) + Reserved(4) + 0xFF & 6, // Variable Header + Payload Length + // Variable Header + 0b0000_0000, // length + 0xFF & 2, // length + 'a', // Topic Name + 'b', // Topic Name + 0b0000_0000, // Packet Identifier + 0xFF & 5, // Packet Identifier + // Payload + }; + + MqttPacketPublish obj = (MqttPacketPublish) MqttPacket.read( + new BinaryStructInputStream(new ByteArrayInputStream(Converter.toBytes(data)))); + + assertEquals(6, obj.variableHeaderAndPayloadLength); + assertEquals("ab", obj.topicName); + assertEquals(5, obj.packetId); + assertArrayEquals(new byte[0], obj.payload); + } + + @Test + public void decodePayload() throws IOException { + char[] data = new char[]{ + // Fixed Header + 0b0011_0000, // Packet Type(4) + Reserved(4) + 0xFF & 9, // Variable Header + Payload Length + // Variable Header + 0b0000_0000, // length + 0xFF & 2, // length + 'a', // Topic Name + 'b', // Topic Name + 0b0000_0000, // Packet Identifier + 0xFF & 5, // Packet Identifier + // Payload + 0x00, 0x01, 0x02, + }; + + MqttPacketPublish obj = (MqttPacketPublish) MqttPacket.read( + new BinaryStructInputStream(new ByteArrayInputStream(Converter.toBytes(data)))); + + assertEquals(9, obj.variableHeaderAndPayloadLength); + assertEquals("ab", obj.topicName); + assertEquals(5, obj.packetId); + assertArrayEquals(new byte[]{0x00, 0x01, 0x02}, obj.payload); + } + + @Test + public void encode() throws IOException { + char[] data = new char[]{ + // Fixed Header + 0b0011_0000, // Packet Type(4) + Reserved(4) + 0xFF & 6, // Variable Header + Payload Length + // Variable Header + 0b0000_0000, // length + 0xFF & 2, // length + 'a', // Topic Name + 'b', // Topic Name + 0b0000_0000, // Packet Identifier + 0xFF & 5, // Packet Identifier + // Payload + }; + + MqttPacketPublish obj = new MqttPacketPublish(); + obj.variableHeaderAndPayloadLength = 5; + obj.topicName = "ab"; + obj.packetId = 5; + + ByteArrayOutputStream buffer = new ByteArrayOutputStream(); + BinaryStructOutputStream binOut = new BinaryStructOutputStream(buffer); + MqttPacket.write(binOut, obj); + assertArrayEquals(Converter.toBytes(data), buffer.toByteArray()); + } +} \ No newline at end of file diff --git a/test/zutil/net/mqtt/packet/MqttPacketSubscribeAckTest.java b/test/zutil/net/mqtt/packet/MqttPacketSubscribeAckTest.java new file mode 100644 index 0000000..1c686f8 --- /dev/null +++ b/test/zutil/net/mqtt/packet/MqttPacketSubscribeAckTest.java @@ -0,0 +1,80 @@ +package zutil.net.mqtt.packet; + +import org.junit.Test; +import zutil.converter.Converter; +import zutil.net.mqtt.packet.MqttPacketSubscribeAck.MqttSubscribeAckPayload; +import zutil.parser.binary.BinaryStructInputStream; +import zutil.parser.binary.BinaryStructOutputStream; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; + +import static org.junit.Assert.*; + + +public class MqttPacketSubscribeAckTest { + + @Test + public void decode() throws IOException { + char[] data = new char[]{ + // Fixed Header + 0b1001_0000, // Packet Type(4) + Reserved(4) + 0xFF & 4, // Variable Header + Payload Length + // Variable Header + 0b0000_0000, // Packet Identifier + 0xFF & 8, // Packet Identifier + // Payload + // -- Item 1 + 0b0000_0001, // Return code + // -- Item 2 + 0b0000_0010, // Return code + }; + + MqttPacketSubscribeAck obj = (MqttPacketSubscribeAck) MqttPacket.read( + new BinaryStructInputStream(new ByteArrayInputStream(Converter.toBytes(data)))); + + assertEquals(4, obj.variableHeaderAndPayloadLength); + assertEquals(8, obj.packetId); + assertEquals(2, obj.payloads.size()); + + assertEquals(MqttSubscribeAckPayload.RETCODE_SUCESS_MAX_QOS_1, obj.payloads.get(0).returnCode); + + assertEquals(MqttSubscribeAckPayload.RETCODE_SUCESS_MAX_QOS_2, obj.payloads.get(1).returnCode); + } + + @Test + public void encode() throws IOException { + char[] data = new char[]{ + // Fixed Header + 0b1001_0000, // Packet Type(4) + Reserved(4) + 0xFF & 4, // Variable Header + Payload Length + // Variable Header + 0b0000_0000, // Packet Identifier + 0xFF & 8, // Packet Identifier + // Payload + // -- Item 1 + 0b0000_0001, // Return code + // -- Item 2 + 0b0000_0010, // Return code + }; + + MqttPacketSubscribeAck obj = new MqttPacketSubscribeAck(); + obj.variableHeaderAndPayloadLength = 4; + obj.packetId = 8; + + MqttSubscribeAckPayload p1 = new MqttSubscribeAckPayload(); + p1.returnCode = MqttSubscribeAckPayload.RETCODE_SUCESS_MAX_QOS_1; + obj.payloads.add(p1); + + MqttSubscribeAckPayload p2 = new MqttSubscribeAckPayload(); + p2.returnCode = MqttSubscribeAckPayload.RETCODE_SUCESS_MAX_QOS_2; + obj.payloads.add(p2); + + ByteArrayOutputStream buffer = new ByteArrayOutputStream(); + BinaryStructOutputStream binOut = new BinaryStructOutputStream(buffer); + MqttPacket.write(binOut, obj); + assertArrayEquals(Converter.toBytes(data), buffer.toByteArray()); + } + +} \ No newline at end of file diff --git a/test/zutil/net/mqtt/packet/MqttPacketSubscribeTest.java b/test/zutil/net/mqtt/packet/MqttPacketSubscribeTest.java new file mode 100644 index 0000000..a3e49af --- /dev/null +++ b/test/zutil/net/mqtt/packet/MqttPacketSubscribeTest.java @@ -0,0 +1,98 @@ +package zutil.net.mqtt.packet; + +import org.junit.Test; +import zutil.converter.Converter; +import zutil.net.mqtt.packet.MqttPacketSubscribe.MqttSubscribePayload; +import zutil.parser.binary.BinaryStructInputStream; +import zutil.parser.binary.BinaryStructOutputStream; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; + +import static org.junit.Assert.*; + +public class MqttPacketSubscribeTest { + + @Test + public void decode() throws IOException { + char[] data = new char[]{ + // Fixed Header + 0b1000_0000, // Packet Type(4) + Reserved(4) + 0xFF & 12, // Variable Header + Payload Length + // Variable Header + 0b0000_0000, // Packet Identifier + 0xFF & 8, // Packet Identifier + // Payload + // -- Item 1 + 0b0000_0000, // length + 0xFF & 2, // length + 'a', // Topic Name + 'b', // Topic Name + 0b000000_00, // Reserved(6) + QoS(2) + // -- Item 2 + 0b0000_0000, // length + 0xFF & 2, // length + 'c', // Topic Name + 'd', // Topic Name + 0b000000_01, // Reserved(6) + QoS(2) + }; + + MqttPacketSubscribe obj = (MqttPacketSubscribe) MqttPacket.read( + new BinaryStructInputStream(new ByteArrayInputStream(Converter.toBytes(data)))); + + assertEquals(12, obj.variableHeaderAndPayloadLength); + assertEquals(8, obj.packetId); + assertEquals(2, obj.payloads.size()); + + assertEquals("ab", obj.payloads.get(0).topicFilter); + assertEquals(0, obj.payloads.get(0).qos); + + assertEquals("cd", obj.payloads.get(1).topicFilter); + assertEquals(1, obj.payloads.get(1).qos); + } + + @Test + public void encode() throws IOException { + char[] data = new char[]{ + // Fixed Header + 0b1000_0000, // Packet Type(4) + Reserved(4) + 0xFF & 12, // Variable Header + Payload Length + // Variable Header + 0b0000_0000, // Packet Identifier + 0xFF & 8, // Packet Identifier + // Payload + // -- Item 1 + 0b0000_0000, // length + 0xFF & 2, // length + 'a', // Topic Name + 'b', // Topic Name + 0b000000_00, // Reserved(6) + QoS(2) + // -- Item 2 + 0b0000_0000, // length + 0xFF & 2, // length + 'c', // Topic Name + 'd', // Topic Name + 0b000000_01, // Reserved(6) + QoS(2) + }; + + MqttPacketSubscribe obj = new MqttPacketSubscribe(); + obj.variableHeaderAndPayloadLength = 12; + obj.packetId = 8; + + MqttSubscribePayload p1 = new MqttSubscribePayload(); + p1.topicFilter = "ab"; + p1.qos = 0; + obj.payloads.add(p1); + + MqttSubscribePayload p2 = new MqttSubscribePayload(); + p2.topicFilter = "cd"; + p2.qos = 1; + obj.payloads.add(p2); + + ByteArrayOutputStream buffer = new ByteArrayOutputStream(); + BinaryStructOutputStream binOut = new BinaryStructOutputStream(buffer); + MqttPacket.write(binOut, obj); + assertArrayEquals(Converter.toBytes(data), buffer.toByteArray()); + } +} \ No newline at end of file diff --git a/test/zutil/net/mqtt/packet/MqttPacketUnsubscribeTest.java b/test/zutil/net/mqtt/packet/MqttPacketUnsubscribeTest.java new file mode 100644 index 0000000..f0b6b8a --- /dev/null +++ b/test/zutil/net/mqtt/packet/MqttPacketUnsubscribeTest.java @@ -0,0 +1,91 @@ +package zutil.net.mqtt.packet; + +import org.junit.Test; +import zutil.converter.Converter; +import zutil.net.mqtt.packet.MqttPacketUnsubscribe.MqttUnsubscribePayload; +import zutil.parser.binary.BinaryStructInputStream; +import zutil.parser.binary.BinaryStructOutputStream; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; + +import static org.junit.Assert.*; + + +public class MqttPacketUnsubscribeTest { + + @Test + public void decode() throws IOException { + char[] data = new char[]{ + // Fixed Header + 0b1010_0000, // Packet Type(4) + Reserved(4) + 0xFF & 10, // Variable Header + Payload Length + // Variable Header + 0b0000_0000, // Packet Identifier + 0xFF & 8, // Packet Identifier + // Payload + // -- Item 1 + 0b0000_0000, // length + 0xFF & 2, // length + 'a', // Topic Name + 'b', // Topic Name + // -- Item 2 + 0b0000_0000, // length + 0xFF & 2, // length + 'c', // Topic Name + 'd', // Topic Name + }; + + MqttPacketUnsubscribe obj = (MqttPacketUnsubscribe) MqttPacket.read( + new BinaryStructInputStream(new ByteArrayInputStream(Converter.toBytes(data)))); + + assertEquals(10, obj.variableHeaderAndPayloadLength); + assertEquals(8, obj.packetId); + assertEquals(2, obj.payloads.size()); + + assertEquals("ab", obj.payloads.get(0).topicFilter); + + assertEquals("cd", obj.payloads.get(1).topicFilter); + } + + @Test + public void encode() throws IOException { + char[] data = new char[]{ + // Fixed Header + 0b1010_0000, // Packet Type(4) + Reserved(4) + 0xFF & 10, // Variable Header + Payload Length + // Variable Header + 0b0000_0000, // Packet Identifier + 0xFF & 8, // Packet Identifier + // Payload + // -- Item 1 + 0b0000_0000, // length + 0xFF & 2, // length + 'a', // Topic Name + 'b', // Topic Name + // -- Item 2 + 0b0000_0000, // length + 0xFF & 2, // length + 'c', // Topic Name + 'd', // Topic Name + }; + + MqttPacketUnsubscribe obj = new MqttPacketUnsubscribe(); + obj.variableHeaderAndPayloadLength = 10; + obj.packetId = 8; + + MqttUnsubscribePayload p1 = new MqttUnsubscribePayload(); + p1.topicFilter = "ab"; + obj.payloads.add(p1); + + MqttUnsubscribePayload p2 = new MqttUnsubscribePayload(); + p2.topicFilter = "cd"; + obj.payloads.add(p2); + + ByteArrayOutputStream buffer = new ByteArrayOutputStream(); + BinaryStructOutputStream binOut = new BinaryStructOutputStream(buffer); + MqttPacket.write(binOut, obj); + assertArrayEquals(Converter.toBytes(data), buffer.toByteArray()); + } +} \ No newline at end of file diff --git a/test/zutil/parser/binary/serializer/TwoByteLengthPrefixedDataSerializerTest.java b/test/zutil/parser/binary/serializer/TwoByteLengthPrefixedDataSerializerTest.java index 5a41756..db4bb0f 100644 --- a/test/zutil/parser/binary/serializer/TwoByteLengthPrefixedDataSerializerTest.java +++ b/test/zutil/parser/binary/serializer/TwoByteLengthPrefixedDataSerializerTest.java @@ -72,10 +72,10 @@ public class TwoByteLengthPrefixedDataSerializerTest implements BinaryStruct { // 0 Length outputStream.reset();outputStream.reset(); serializer.write(outputStream, null, stringFieldData, this); - assertArrayEquals(new byte[]{}, outputStream.toByteArray()); + assertArrayEquals(new byte[]{0, 0}, outputStream.toByteArray()); outputStream.reset(); serializer.write(outputStream, null, byteFieldData, this); - assertArrayEquals(new byte[]{}, outputStream.toByteArray()); + assertArrayEquals(new byte[]{0, 0}, outputStream.toByteArray()); // 0 Length outputStream.reset();