Some bug fixes and added setters for flags
This commit is contained in:
parent
e4d9a16ebd
commit
19a12dee44
3 changed files with 125 additions and 17 deletions
|
|
@ -104,21 +104,24 @@ public class MqttPacketPublish extends MqttPacketHeader {
|
||||||
return (flags & FLAG_DUP_BITMASK) != 0;
|
return (flags & FLAG_DUP_BITMASK) != 0;
|
||||||
}
|
}
|
||||||
public void setFlagDup(boolean isRedelivery) {
|
public void setFlagDup(boolean isRedelivery) {
|
||||||
flags |= (byte) (FLAG_DUP_BITMASK & (isRedelivery ? 1 : 0));
|
if (isRedelivery) flags |= FLAG_DUP_BITMASK;
|
||||||
|
else flags &= ~FLAG_DUP_BITMASK;
|
||||||
}
|
}
|
||||||
|
|
||||||
public byte getFlagQoS() {
|
public byte getFlagQoS() {
|
||||||
return (byte) ((flags & FLAG_QOS_BITMASK) >> 1);
|
return (byte) ((flags & FLAG_QOS_BITMASK) >>> 1);
|
||||||
}
|
}
|
||||||
public void setFlagQoS(int qos) {
|
public void setFlagQoS(int qos) {
|
||||||
flags |= (byte) (FLAG_QOS_BITMASK & qos);
|
flags &= ~FLAG_QOS_BITMASK; // reset bits
|
||||||
|
flags |= (byte) (FLAG_QOS_BITMASK & (qos << 1));
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean getFlagRetain() {
|
public boolean getFlagRetain() {
|
||||||
return (flags & FLAG_RETAIN_BITMASK) != 0;
|
return (flags & FLAG_RETAIN_BITMASK) != 0;
|
||||||
}
|
}
|
||||||
public void setFlagRetain(boolean retain) {
|
public void setFlagRetain(boolean retain) {
|
||||||
flags |= (byte) (FLAG_RETAIN_BITMASK & (retain ? 1 : 0));
|
if (retain) flags |= FLAG_RETAIN_BITMASK;
|
||||||
|
else flags &= ~FLAG_RETAIN_BITMASK;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -31,6 +31,7 @@ import zutil.net.mqtt.packet.MqttPacketSubscribe.MqttSubscribePayload;
|
||||||
import zutil.net.mqtt.packet.MqttPacketUnsubscribe.MqttUnsubscribePayload;
|
import zutil.net.mqtt.packet.MqttPacketUnsubscribe.MqttUnsubscribePayload;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.nio.charset.StandardCharsets;
|
||||||
import java.util.LinkedList;
|
import java.util.LinkedList;
|
||||||
|
|
||||||
import static org.junit.Assert.*;
|
import static org.junit.Assert.*;
|
||||||
|
|
@ -52,6 +53,16 @@ public class MqttBrokerTest {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static class MqttSubscriptionListenerMock implements MqttSubscriptionListener {
|
||||||
|
String receivedTopic = null;
|
||||||
|
byte[] receivedPayload = null;
|
||||||
|
|
||||||
|
public void dataPublished(String topic, byte[] data) {
|
||||||
|
receivedTopic = topic;
|
||||||
|
receivedPayload = data;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
//**************** Test Cases **************************
|
//**************** Test Cases **************************
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
@ -117,19 +128,10 @@ public class MqttBrokerTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void publish() throws IOException {
|
public void publish() throws IOException {
|
||||||
// Setup subscriber
|
|
||||||
final String[] recivedTopic = new String[1];
|
|
||||||
final byte[] recievedPayload = new byte[1];
|
|
||||||
MqttSubscriptionListener subscriber = new MqttSubscriptionListener() {
|
|
||||||
public void dataPublished(String topic, byte[] data) {
|
|
||||||
recivedTopic[0] = topic;
|
|
||||||
recievedPayload[0] = data[0];
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
// Setup broker
|
// Setup broker
|
||||||
MqttBroker broker = new MqttBroker();
|
MqttBroker broker = new MqttBroker();
|
||||||
MqttConnectionThreadMock thread = new MqttConnectionThreadMock(broker);
|
MqttConnectionThreadMock thread = new MqttConnectionThreadMock(broker);
|
||||||
|
MqttSubscriptionListenerMock subscriber = new MqttSubscriptionListenerMock();
|
||||||
broker.subscribe("test/topic", subscriber);
|
broker.subscribe("test/topic", subscriber);
|
||||||
|
|
||||||
// Setup publish
|
// Setup publish
|
||||||
|
|
@ -140,8 +142,76 @@ public class MqttBrokerTest {
|
||||||
thread.handlePacket(publish);
|
thread.handlePacket(publish);
|
||||||
|
|
||||||
// Check response
|
// Check response
|
||||||
assertEquals("test/topic", recivedTopic[0]);
|
assertEquals("test/topic", subscriber.receivedTopic);
|
||||||
assertEquals((byte) 42, recievedPayload[0]);
|
assertEquals((byte) 42, subscriber.receivedPayload[0]);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void publishRetain() throws IOException {
|
||||||
|
// Setup broker
|
||||||
|
|
||||||
|
MqttBroker broker = new MqttBroker();
|
||||||
|
MqttConnectionThreadMock thread = new MqttConnectionThreadMock(broker);
|
||||||
|
MqttSubscriptionListenerMock subscriber = new MqttSubscriptionListenerMock();
|
||||||
|
|
||||||
|
// Setup publish message and send it
|
||||||
|
|
||||||
|
MqttPacketPublish publish = new MqttPacketPublish();
|
||||||
|
publish.topicName = "test/topic";
|
||||||
|
publish.payload = "test payload".getBytes(StandardCharsets.UTF_8);
|
||||||
|
publish.setFlagRetain(true);
|
||||||
|
thread.handlePacket(publish);
|
||||||
|
|
||||||
|
// Check if we receive retained message after it was sent
|
||||||
|
|
||||||
|
broker.subscribe("test/topic", subscriber);
|
||||||
|
assertEquals("test/topic", subscriber.receivedTopic);
|
||||||
|
assertEquals( "test payload", new String(subscriber.receivedPayload, StandardCharsets.UTF_8));
|
||||||
|
|
||||||
|
// Check that normal message and does not modify the retained message
|
||||||
|
|
||||||
|
broker.unsubscribe("test/topic", subscriber);
|
||||||
|
subscriber.receivedTopic = null;
|
||||||
|
subscriber.receivedPayload = null;
|
||||||
|
|
||||||
|
publish.payload = "none retained payload".getBytes(StandardCharsets.UTF_8);
|
||||||
|
publish.setFlagRetain(false);
|
||||||
|
thread.handlePacket(publish);
|
||||||
|
|
||||||
|
broker.subscribe("test/topic", subscriber);
|
||||||
|
assertEquals("test/topic", subscriber.receivedTopic);
|
||||||
|
assertEquals( "test payload", new String(subscriber.receivedPayload, StandardCharsets.UTF_8));
|
||||||
|
|
||||||
|
// Check that we can remove the retained message
|
||||||
|
|
||||||
|
publish.payload = new byte[0];
|
||||||
|
publish.setFlagRetain(true);
|
||||||
|
thread.handlePacket(publish);
|
||||||
|
|
||||||
|
assertEquals("test/topic", subscriber.receivedTopic);
|
||||||
|
assertEquals( 0,subscriber.receivedPayload.length);
|
||||||
|
|
||||||
|
MqttSubscriptionListenerMock subscriber2 = new MqttSubscriptionListenerMock();
|
||||||
|
broker.subscribe("test/topic", subscriber2);
|
||||||
|
assertEquals(null, subscriber2.receivedTopic);
|
||||||
|
assertEquals( null, subscriber2.receivedPayload);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void publishBadQos() throws IOException {
|
||||||
|
// Setup broker
|
||||||
|
MqttBroker broker = new MqttBroker();
|
||||||
|
MqttConnectionThreadMock thread = new MqttConnectionThreadMock(broker);
|
||||||
|
|
||||||
|
// Setup publish message
|
||||||
|
MqttPacketPublish publish = new MqttPacketPublish();
|
||||||
|
publish.topicName = "test/topic";
|
||||||
|
publish.setFlagQoS(3);
|
||||||
|
|
||||||
|
// Test illegal qos
|
||||||
|
assertThrows(UnsupportedOperationException.class, () -> {
|
||||||
|
thread.handlePacket(publish);
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
|
||||||
|
|
@ -38,6 +38,41 @@ public class MqttPacketPublishTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void decodeFlags() throws IOException {
|
public void decodeFlags() throws IOException {
|
||||||
|
// Test getter and setter
|
||||||
|
|
||||||
|
MqttPacketPublish obj = new MqttPacketPublish();
|
||||||
|
assertFalse(obj.getFlagDup());
|
||||||
|
obj.setFlagDup(true);
|
||||||
|
assertTrue(obj.getFlagDup());
|
||||||
|
assertEquals(0b1_00_0, obj.flags);
|
||||||
|
obj.setFlagDup(false);
|
||||||
|
assertFalse(obj.getFlagDup());
|
||||||
|
assertEquals(0b0_00_0, obj.flags);
|
||||||
|
|
||||||
|
assertEquals(0, obj.getFlagQoS());
|
||||||
|
obj.setFlagQoS(1);
|
||||||
|
assertEquals(1, obj.getFlagQoS());
|
||||||
|
assertEquals(0b0_01_0, obj.flags);
|
||||||
|
obj.setFlagQoS(2);
|
||||||
|
assertEquals(2, obj.getFlagQoS());
|
||||||
|
assertEquals(0b0_10_0, obj.flags);
|
||||||
|
obj.setFlagQoS(3);
|
||||||
|
assertEquals(3, obj.getFlagQoS());
|
||||||
|
assertEquals(0b0_11_0, obj.flags);
|
||||||
|
obj.setFlagQoS(0);
|
||||||
|
assertEquals(0, obj.getFlagQoS());
|
||||||
|
assertEquals(0b0_00_0, obj.flags);
|
||||||
|
|
||||||
|
assertFalse(obj.getFlagRetain());
|
||||||
|
obj.setFlagRetain(true);
|
||||||
|
assertTrue(obj.getFlagRetain());
|
||||||
|
assertEquals(0b0_00_1, obj.flags);
|
||||||
|
obj.setFlagRetain(false);
|
||||||
|
assertFalse(obj.getFlagRetain());
|
||||||
|
assertEquals(0b0_00_0, obj.flags);
|
||||||
|
|
||||||
|
// Test decoding
|
||||||
|
|
||||||
char[] data = new char[]{
|
char[] data = new char[]{
|
||||||
// Fixed Header
|
// Fixed Header
|
||||||
0b0011_1000, // Packet Type(4) + Reserved(4)
|
0b0011_1000, // Packet Type(4) + Reserved(4)
|
||||||
|
|
@ -50,7 +85,7 @@ public class MqttPacketPublishTest {
|
||||||
// Payload
|
// Payload
|
||||||
};
|
};
|
||||||
|
|
||||||
MqttPacketPublish obj = (MqttPacketPublish) MqttPacket.read(
|
obj = (MqttPacketPublish) MqttPacket.read(
|
||||||
new BinaryStructInputStream(new ByteArrayInputStream(Converter.toBytes(data))));
|
new BinaryStructInputStream(new ByteArrayInputStream(Converter.toBytes(data))));
|
||||||
|
|
||||||
assertTrue(obj.getFlagDup());
|
assertTrue(obj.getFlagDup());
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue