From 0692112e908b8b124937c5a8559855df330dcb08 Mon Sep 17 00:00:00 2001 From: Ziver Koc Date: Sat, 27 Dec 2025 23:44:47 +0100 Subject: [PATCH] Fixed some issues in MQTT server --- src/zutil/net/mqtt/MqttBroker.java | 22 +++++++--- test/zutil/net/mqtt/MqttBrokerRun.java | 27 ------------ test/zutil/net/mqtt/MqttBrokerTestRun.java | 51 ++++++++++++++++++++++ 3 files changed, 66 insertions(+), 34 deletions(-) delete mode 100644 test/zutil/net/mqtt/MqttBrokerRun.java create mode 100644 test/zutil/net/mqtt/MqttBrokerTestRun.java diff --git a/src/zutil/net/mqtt/MqttBroker.java b/src/zutil/net/mqtt/MqttBroker.java index 146447b..7af796c 100755 --- a/src/zutil/net/mqtt/MqttBroker.java +++ b/src/zutil/net/mqtt/MqttBroker.java @@ -63,14 +63,20 @@ public class MqttBroker extends ThreadedTCPNetworkServer { private List wildcardSubscribers = new CopyOnWriteArrayList<>(); private Map topics = new ConcurrentHashMap<>(); + /** + * Topic specific data + */ protected static class MqttTopic { public String name; - /** If set this will be sent as a publish message as soon as a client subscribes to this topic */ + /** If set, this will be sent as a publish-message as soon as a client subscribes to this topic */ public byte[] retainedPayload; public List subscribers = new ArrayList<>(0); // Stat with low capacity to save memory } + /** + * Subscriber specific data + */ protected static class MqttWildcardSubscriber { public Pattern topicPattern; public MqttSubscriptionListener listener; @@ -171,6 +177,8 @@ public class MqttBroker extends ThreadedTCPNetworkServer { // Is it a wildcard topic? if (topicName.contains("#") || topicName.contains("+")) { + logger.finest("New wildcard subscriber to topic: " + topicName); + MqttWildcardSubscriber subscriber = new MqttWildcardSubscriber(); subscriber.listener = listener; subscriber.topicPattern = generatePatternFromTopic(topicName); @@ -179,10 +187,12 @@ public class MqttBroker extends ThreadedTCPNetworkServer { // Send any retained messages for matched topics for (MqttTopic topicData : topics.values()) { if (topicData.retainedPayload != null && subscriber.topicPattern.matcher(topicData.name).matches()) { - listener.dataPublished(topicName, topicData.retainedPayload); + logger.finest("Sending retained message to new subscriber: " + topicData.name); + listener.dataPublished(topicData.name, topicData.retainedPayload); } } } else { + logger.finest("New subscriber to topic: " + topicName); MqttTopic topicData = createTopic(topicName); if (!topicData.subscribers.contains(listener)) { @@ -438,6 +448,7 @@ public class MqttBroker extends ThreadedTCPNetworkServer { logger.fine("[" + socket.getInetAddress() + "] Registered will packet for topic: " + conn.willTopic); willPacket = new MqttPacketPublish(); willPacket.topicName = conn.willTopic; + willPacket.payload = conn.willPayload; willPacket.setFlagQoS(conn.flagWillQoS); willPacket.setFlagRetain(conn.flagWillRetain); } else { @@ -501,8 +512,6 @@ public class MqttBroker extends ThreadedTCPNetworkServer { private void handlePublish(MqttPacketPublish publishPacket) throws IOException { - // TODO: QOS 2 - if (publishPacket.getFlagQoS() == 2) throw new UnsupportedOperationException("QoS value of 2 not supported."); if (publishPacket.getFlagQoS() >= 3) @@ -520,6 +529,8 @@ public class MqttBroker extends ThreadedTCPNetworkServer { sendPacket(publishAckPacket); } + // TODO: QOS 2 + logger.finer("[" + clientId + "] Publishing to topic: " + publishPacket.topicName); broker.publish(publishPacket.topicName, publishPacket.payload); @@ -532,9 +543,6 @@ public class MqttBroker extends ThreadedTCPNetworkServer { MqttPacketSubscribeAck subscribeAckPacket = new MqttPacketSubscribeAck(); subscribeAckPacket.packetId = subscribePacket.packetId; - // TODO: handle topic wildcard # - // TODO: topic starting with $ should not be matched to wildcards - for (MqttSubscribePayload payload : subscribePacket.payloads) { logger.finer("[" + clientId + "] Subscribing to topic: " + payload.topicFilter); broker.subscribe(payload.topicFilter, this); diff --git a/test/zutil/net/mqtt/MqttBrokerRun.java b/test/zutil/net/mqtt/MqttBrokerRun.java deleted file mode 100644 index 450f9d2..0000000 --- a/test/zutil/net/mqtt/MqttBrokerRun.java +++ /dev/null @@ -1,27 +0,0 @@ -package zutil.net.mqtt; - -import zutil.log.CompactLogFormatter; -import zutil.log.LogUtil; - -import java.io.IOException; -import java.nio.charset.StandardCharsets; -import java.util.logging.Level; -import java.util.logging.Logger; - -public class MqttBrokerRun implements MqttSubscriptionListener { - private static final Logger logger = LogUtil.getLogger(); - - public static void main(String[] args) throws IOException { - LogUtil.setGlobalLevel(Level.FINEST); - LogUtil.setGlobalFormatter(new CompactLogFormatter()); - - MqttBroker mqttBroker = new MqttBroker(); - mqttBroker.addGlobalSubscriber(new MqttBrokerRun()); - mqttBroker.start(); - } - - @Override - public void dataPublished(String topic, byte[] data) { - logger.info("MQTT data published(topic: " + topic + "): " + new String(data, StandardCharsets.UTF_8)); - } -} diff --git a/test/zutil/net/mqtt/MqttBrokerTestRun.java b/test/zutil/net/mqtt/MqttBrokerTestRun.java new file mode 100644 index 0000000..c6d1c37 --- /dev/null +++ b/test/zutil/net/mqtt/MqttBrokerTestRun.java @@ -0,0 +1,51 @@ +/* + * The MIT License (MIT) + * + * Copyright (c) 2025 Ziver Koc + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ + +package zutil.net.mqtt; + +import zutil.log.CompactLogFormatter; +import zutil.log.LogUtil; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.logging.Level; +import java.util.logging.Logger; + +public class MqttBrokerTestRun implements MqttSubscriptionListener { + private static final Logger logger = LogUtil.getLogger(); + + public static void main(String[] args) throws IOException { + LogUtil.setGlobalLevel(Level.FINEST); + LogUtil.setGlobalFormatter(new CompactLogFormatter()); + + MqttBroker mqttBroker = new MqttBroker(); + mqttBroker.addGlobalSubscriber(new MqttBrokerTestRun()); + mqttBroker.start(); + } + + @Override + public void dataPublished(String topic, byte[] data) { + logger.info("MQTT data published(topic: " + topic + "): " + new String(data, StandardCharsets.UTF_8)); + } +}