diff --git a/src/zutil/ByteUtil.java b/src/zutil/ByteUtil.java index 7fa5cd5..7ac827b 100755 --- a/src/zutil/ByteUtil.java +++ b/src/zutil/ByteUtil.java @@ -139,6 +139,13 @@ public class ByteUtil { return (byte) BYTE_MASK[index][length]; } + /** + * @param b the byte to be bit inverted + * @return a new byte that has all the bits inverted. + */ + public static byte invert(byte b) { + return (byte) (0xFF ^ b); + } /** * Shifts a whole byte array to the right towards the LSB by the specified amount. diff --git a/src/zutil/net/mqtt/MqttBroker.java b/src/zutil/net/mqtt/MqttBroker.java index e96394a..c8d87a2 100755 --- a/src/zutil/net/mqtt/MqttBroker.java +++ b/src/zutil/net/mqtt/MqttBroker.java @@ -56,7 +56,15 @@ public class MqttBroker extends ThreadedTCPNetworkServer { public static final int MQTT_PROTOCOL_VERSION = 0x04; // MQTT 3.1.1 private List globalListeners = new ArrayList<>(); - private Map> subscriptionListeners = new HashMap<>(); + private Map topics = new HashMap<>(); + + protected static class MqttTopic { + public String name; + /** If set this will be sent as a publish message as soon as a client subscribes to this topic */ + public byte[] retainedPayload; + + public List subscribers = new ArrayList<>(0); // Stat with low capacity to save memory + } public MqttBroker() throws IOException { @@ -73,16 +81,51 @@ public class MqttBroker extends ThreadedTCPNetworkServer { return new MqttConnectionThread(this, s); } + /** + * @param topic the topic name + * @return a new topic if it does not exist otherwise returns the existing topic. + */ + private MqttTopic createTopic(String topic) { + if (!topics.containsKey(topic)) { + logger.finest("Creating new topic: " + topic); + MqttTopic topicData = new MqttTopic(); + topicData.name = topic; + topics.put(topic, topicData); + } + + return topics.get(topic); + } + + /** + * Will check if the topic is empty, if so its data will be cleared. + * A topic is empty if:
    + *
  • No subscribers
  • + *
  • No retained message
  • + *
+ * + * @param topic the topic name + */ + private void removeTopicIfEmpty(String topic) { + MqttTopic topicData = topics.get(topic); + if (topicData == null) + return; + + if (topicData.retainedPayload == null && + topicData.subscribers.isEmpty()) { + logger.finest("Removing empty topic: " + topic); + topics.remove(topic); + } + } /** * @return the subscriber count for the specific topic, -1 if * topic does not exist or has not been created yet. */ - public int getSubscriberCount(String topic) { - List topicSubscriptions = subscriptionListeners.get(topic); + public int getSubscriberCount(String topicName) { + MqttTopic topicData = topics.get(topicName); - if (topicSubscriptions != null) - return topicSubscriptions.size(); + if (topicData != null) + return topicData.subscribers.size(); return -1; } @@ -107,51 +150,66 @@ public class MqttBroker extends ThreadedTCPNetworkServer { /** * Add the listener as a subscriber of the specific topic. * - * @param topic the topic that will be subscribed to. - * @param listener the listener that will be called. + * @param topicName the topic that will be subscribed to. + * @param listener the listener that will be called. */ - public synchronized void subscribe(String topic, MqttSubscriptionListener listener) { - if (topic == null || topic.isEmpty() || listener == null) + public synchronized void subscribe(String topicName, MqttSubscriptionListener listener) { + if (topicName == null || topicName.isEmpty() || listener == null) return; - if (!subscriptionListeners.containsKey(topic)) { - logger.finest("Creating new topic: " + topic); - subscriptionListeners.put(topic, new ArrayList<>()); - } + MqttTopic topicData = createTopic(topicName); - List topicSubscriptions = subscriptionListeners.get(topic); + if (!topicData.subscribers.contains(listener)) { + topicData.subscribers.add(listener); + logger.finer("New subscriber on topic: " + topicName + " (subscriber count: " + topicData.subscribers.size() + ")"); - if (!topicSubscriptions.contains(listener)) { - topicSubscriptions.add(listener); - logger.finer("New subscriber on topic: " + topic + " (subscriber count: " + topicSubscriptions.size() + ")"); + // Send any retained messages for the topic + if (topicData.retainedPayload != null) { + listener.dataPublished(topicName, topicData.retainedPayload); + } } } /** * Publish data to the specific topic. * - * @param topic the topic where the data should be published to. - * @param data the data that should be published. String will be converted to UTF-8 byte array. + * @param topicName the topic where the data should be published to. + * @param data the data that should be published. String will be converted to UTF-8 byte array. */ - public void publish(String topic, String data) { - publish(topic, data.getBytes(StandardCharsets.UTF_8)); + public void publish(String topicName, String data) { + publish(topicName, data.getBytes(StandardCharsets.UTF_8)); } /** * Publish data to the specific topic. * - * @param topic the topic where the data should be published to. - * @param data the data that should be published. + * @param topicName the topic where the data should be published to. + * @param data the data that should be published. */ - public void publish(String topic, byte[] data) { - logger.finer("Data has been published to topic: " + topic); + public void publish(String topicName, byte[] data) { + logger.finer("Data has been published to topic: " + topicName); if (globalListeners != null) - globalListeners.forEach(listener -> listener.dataPublished(topic, data)); + globalListeners.forEach(listener -> listener.dataPublished(topicName, data)); - List topicSubscriptions = subscriptionListeners.get(topic); - if (topicSubscriptions != null) { - topicSubscriptions.forEach(subscriber -> subscriber.dataPublished(topic, data)); + MqttTopic topicData = topics.get(topicName); + if (topicData != null) { + topicData.subscribers.forEach(subscriber -> subscriber.dataPublished(topicName, data)); + } + } + + /** + * Will retain the given publish message for the specific topic, the message will be sent to any new client subscribing to the topic. + * Note, only a single message is saved for each topic. + * + * @param topicName the topic where the message should be retained. + * @param data the payload to be stored for the topic. Set to null to clear retained message. + */ + public void retain(String topicName, byte[] data) { + MqttTopic topicData = createTopic(topicName); + + if (topicData != null) { + topicData.retainedPayload = data; } } @@ -164,32 +222,27 @@ public class MqttBroker extends ThreadedTCPNetworkServer { if (listener == null) return; - for (String topic : subscriptionListeners.keySet()) { - unsubscribe(topic, listener); + for (MqttTopic topic : topics.values()) { + unsubscribe(topic.name, listener); } } /** * Unsubscribe the listener from the specific MQTT topic. * - * @param topic the specific topic that should be unsubscribed from. - * @param listener the target listener that should be unsubscribed. + * @param topicName the specific topic that should be unsubscribed from. + * @param listener the target listener that should be unsubscribed. */ - public synchronized void unsubscribe(String topic, MqttSubscriptionListener listener) { - if (topic == null || topic.isEmpty() || listener == null) + public synchronized void unsubscribe(String topicName, MqttSubscriptionListener listener) { + if (topicName == null || topicName.isEmpty() || listener == null) return; - if (!subscriptionListeners.containsKey(topic)) - return; + MqttTopic topicData = topics.get(topicName); + if (topicData != null) { + if (topicData.subscribers.remove(listener)) { + logger.finer("Subscriber unsubscribed from topic " + topicName + " (subscriber count: " + topicData.subscribers.size() + ")"); - List topicSubscriptions = subscriptionListeners.get(topic); - - if (topicSubscriptions.remove(listener)) { - logger.finer("Subscriber unsubscribed from topic " + topic + " (subscriber count: " + topicSubscriptions.size() + ")"); - - if (topicSubscriptions.isEmpty()) { - logger.finest("Removing empty topic: " + topic); - subscriptionListeners.remove(topic); + removeTopicIfEmpty(topicName); } } } @@ -371,11 +424,15 @@ public class MqttBroker extends ThreadedTCPNetworkServer { private void handlePublish(MqttPacketPublish publishPacket) { if (publishPacket.getFlagQoS() != 0) throw new UnsupportedOperationException("QoS larger then 0 not supported."); - if (publishPacket.getFlagRetain()) - throw new UnsupportedOperationException("Publish retain is not supported."); + if (publishPacket.getFlagQoS() >= 3) + throw new IllegalArgumentException("QoS value of 3 is not valid."); logger.finer("[" + clientId + "] Publishing to topic: " + publishPacket.topicName); broker.publish(publishPacket.topicName, publishPacket.payload); + + if (publishPacket.getFlagRetain()) { + broker.retain(publishPacket.topicName, publishPacket.payload); + } } private void handleSubscribe(MqttPacketSubscribe subscribePacket) throws IOException { diff --git a/test/zutil/ByteUtilTest.java b/test/zutil/ByteUtilTest.java index 20a9e2a..22e5dc4 100755 --- a/test/zutil/ByteUtilTest.java +++ b/test/zutil/ByteUtilTest.java @@ -157,4 +157,11 @@ public class ByteUtilTest { assertArrayEquals( new byte[]{0b0000_0000,0b0000_0001,0b0000_0001,0b0000_0001}, ByteUtil.shiftLeft(new byte[]{0b0001_0000,0b0001_0000,0b0001_0000,0b0001_0000}, 4)); } + + @Test + public void invert() { + assertEquals((byte) 0b1111_1110, ByteUtil.invert((byte) 0b0000_0001)); + assertEquals((byte) 0b0000_1111, ByteUtil.invert((byte) 0b1111_0000)); + assertEquals((byte) 0b0000_1000, ByteUtil.invert((byte) 0b1111_0111)); + } }