diff --git a/src/zutil/net/mqtt/packet/MqttPacketPublish.java b/src/zutil/net/mqtt/packet/MqttPacketPublish.java index a515e7c..231f522 100755 --- a/src/zutil/net/mqtt/packet/MqttPacketPublish.java +++ b/src/zutil/net/mqtt/packet/MqttPacketPublish.java @@ -104,21 +104,24 @@ public class MqttPacketPublish extends MqttPacketHeader { return (flags & FLAG_DUP_BITMASK) != 0; } public void setFlagDup(boolean isRedelivery) { - flags |= (byte) (FLAG_DUP_BITMASK & (isRedelivery ? 1 : 0)); + if (isRedelivery) flags |= FLAG_DUP_BITMASK; + else flags &= ~FLAG_DUP_BITMASK; } public byte getFlagQoS() { - return (byte) ((flags & FLAG_QOS_BITMASK) >> 1); + return (byte) ((flags & FLAG_QOS_BITMASK) >>> 1); } public void setFlagQoS(int qos) { - flags |= (byte) (FLAG_QOS_BITMASK & qos); + flags &= ~FLAG_QOS_BITMASK; // reset bits + flags |= (byte) (FLAG_QOS_BITMASK & (qos << 1)); } public boolean getFlagRetain() { return (flags & FLAG_RETAIN_BITMASK) != 0; } public void setFlagRetain(boolean retain) { - flags |= (byte) (FLAG_RETAIN_BITMASK & (retain ? 1 : 0)); + if (retain) flags |= FLAG_RETAIN_BITMASK; + else flags &= ~FLAG_RETAIN_BITMASK; } diff --git a/test/zutil/net/mqtt/MqttBrokerTest.java b/test/zutil/net/mqtt/MqttBrokerTest.java index 90b6963..998c80c 100644 --- a/test/zutil/net/mqtt/MqttBrokerTest.java +++ b/test/zutil/net/mqtt/MqttBrokerTest.java @@ -31,6 +31,7 @@ import zutil.net.mqtt.packet.MqttPacketSubscribe.MqttSubscribePayload; import zutil.net.mqtt.packet.MqttPacketUnsubscribe.MqttUnsubscribePayload; import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.util.LinkedList; import static org.junit.Assert.*; @@ -52,6 +53,16 @@ public class MqttBrokerTest { } } + public static class MqttSubscriptionListenerMock implements MqttSubscriptionListener { + String receivedTopic = null; + byte[] receivedPayload = null; + + public void dataPublished(String topic, byte[] data) { + receivedTopic = topic; + receivedPayload = data; + } + }; + //**************** Test Cases ************************** @Test @@ -117,19 +128,10 @@ public class MqttBrokerTest { @Test public void publish() throws IOException { - // Setup subscriber - final String[] recivedTopic = new String[1]; - final byte[] recievedPayload = new byte[1]; - MqttSubscriptionListener subscriber = new MqttSubscriptionListener() { - public void dataPublished(String topic, byte[] data) { - recivedTopic[0] = topic; - recievedPayload[0] = data[0]; - } - }; - // Setup broker MqttBroker broker = new MqttBroker(); MqttConnectionThreadMock thread = new MqttConnectionThreadMock(broker); + MqttSubscriptionListenerMock subscriber = new MqttSubscriptionListenerMock(); broker.subscribe("test/topic", subscriber); // Setup publish @@ -140,8 +142,76 @@ public class MqttBrokerTest { thread.handlePacket(publish); // Check response - assertEquals("test/topic", recivedTopic[0]); - assertEquals((byte) 42, recievedPayload[0]); + assertEquals("test/topic", subscriber.receivedTopic); + assertEquals((byte) 42, subscriber.receivedPayload[0]); + } + + @Test + public void publishRetain() throws IOException { + // Setup broker + + MqttBroker broker = new MqttBroker(); + MqttConnectionThreadMock thread = new MqttConnectionThreadMock(broker); + MqttSubscriptionListenerMock subscriber = new MqttSubscriptionListenerMock(); + + // Setup publish message and send it + + MqttPacketPublish publish = new MqttPacketPublish(); + publish.topicName = "test/topic"; + publish.payload = "test payload".getBytes(StandardCharsets.UTF_8); + publish.setFlagRetain(true); + thread.handlePacket(publish); + + // Check if we receive retained message after it was sent + + broker.subscribe("test/topic", subscriber); + assertEquals("test/topic", subscriber.receivedTopic); + assertEquals( "test payload", new String(subscriber.receivedPayload, StandardCharsets.UTF_8)); + + // Check that normal message and does not modify the retained message + + broker.unsubscribe("test/topic", subscriber); + subscriber.receivedTopic = null; + subscriber.receivedPayload = null; + + publish.payload = "none retained payload".getBytes(StandardCharsets.UTF_8); + publish.setFlagRetain(false); + thread.handlePacket(publish); + + broker.subscribe("test/topic", subscriber); + assertEquals("test/topic", subscriber.receivedTopic); + assertEquals( "test payload", new String(subscriber.receivedPayload, StandardCharsets.UTF_8)); + + // Check that we can remove the retained message + + publish.payload = new byte[0]; + publish.setFlagRetain(true); + thread.handlePacket(publish); + + assertEquals("test/topic", subscriber.receivedTopic); + assertEquals( 0,subscriber.receivedPayload.length); + + MqttSubscriptionListenerMock subscriber2 = new MqttSubscriptionListenerMock(); + broker.subscribe("test/topic", subscriber2); + assertEquals(null, subscriber2.receivedTopic); + assertEquals( null, subscriber2.receivedPayload); + } + + @Test + public void publishBadQos() throws IOException { + // Setup broker + MqttBroker broker = new MqttBroker(); + MqttConnectionThreadMock thread = new MqttConnectionThreadMock(broker); + + // Setup publish message + MqttPacketPublish publish = new MqttPacketPublish(); + publish.topicName = "test/topic"; + publish.setFlagQoS(3); + + // Test illegal qos + assertThrows(UnsupportedOperationException.class, () -> { + thread.handlePacket(publish); + }); } @Test diff --git a/test/zutil/net/mqtt/packet/MqttPacketPublishTest.java b/test/zutil/net/mqtt/packet/MqttPacketPublishTest.java index 09230ff..bd02164 100644 --- a/test/zutil/net/mqtt/packet/MqttPacketPublishTest.java +++ b/test/zutil/net/mqtt/packet/MqttPacketPublishTest.java @@ -38,6 +38,41 @@ public class MqttPacketPublishTest { @Test public void decodeFlags() throws IOException { + // Test getter and setter + + MqttPacketPublish obj = new MqttPacketPublish(); + assertFalse(obj.getFlagDup()); + obj.setFlagDup(true); + assertTrue(obj.getFlagDup()); + assertEquals(0b1_00_0, obj.flags); + obj.setFlagDup(false); + assertFalse(obj.getFlagDup()); + assertEquals(0b0_00_0, obj.flags); + + assertEquals(0, obj.getFlagQoS()); + obj.setFlagQoS(1); + assertEquals(1, obj.getFlagQoS()); + assertEquals(0b0_01_0, obj.flags); + obj.setFlagQoS(2); + assertEquals(2, obj.getFlagQoS()); + assertEquals(0b0_10_0, obj.flags); + obj.setFlagQoS(3); + assertEquals(3, obj.getFlagQoS()); + assertEquals(0b0_11_0, obj.flags); + obj.setFlagQoS(0); + assertEquals(0, obj.getFlagQoS()); + assertEquals(0b0_00_0, obj.flags); + + assertFalse(obj.getFlagRetain()); + obj.setFlagRetain(true); + assertTrue(obj.getFlagRetain()); + assertEquals(0b0_00_1, obj.flags); + obj.setFlagRetain(false); + assertFalse(obj.getFlagRetain()); + assertEquals(0b0_00_0, obj.flags); + + // Test decoding + char[] data = new char[]{ // Fixed Header 0b0011_1000, // Packet Type(4) + Reserved(4) @@ -50,7 +85,7 @@ public class MqttPacketPublishTest { // Payload }; - MqttPacketPublish obj = (MqttPacketPublish) MqttPacket.read( + obj = (MqttPacketPublish) MqttPacket.read( new BinaryStructInputStream(new ByteArrayInputStream(Converter.toBytes(data)))); assertTrue(obj.getFlagDup());