From 46b608489dd0312b7ad256e962e917dc69555372 Mon Sep 17 00:00:00 2001 From: Ziver Koc Date: Sun, 26 Mar 2023 00:52:33 +0100 Subject: [PATCH] Added multiple global subscribers and improved logging --- src/zutil/net/mqtt/MqttBroker.java | 60 ++++++++++++++++++++---------- 1 file changed, 40 insertions(+), 20 deletions(-) diff --git a/src/zutil/net/mqtt/MqttBroker.java b/src/zutil/net/mqtt/MqttBroker.java index d0ea86a..b4a72ce 100755 --- a/src/zutil/net/mqtt/MqttBroker.java +++ b/src/zutil/net/mqtt/MqttBroker.java @@ -54,7 +54,7 @@ public class MqttBroker extends ThreadedTCPNetworkServer { public static final int MQTT_PORT_TLS = 8883; public static final int MQTT_PROTOCOL_VERSION = 0x04; // MQTT 3.1.1 - private MqttSubscriptionListener globalListener; + private List globalListeners = new ArrayList<>(); private Map> subscriptionListeners = new HashMap<>(); @@ -86,29 +86,43 @@ public class MqttBroker extends ThreadedTCPNetworkServer { } /** - * Assign a listener that will be called for any topic. Provide null to disable. + * Assign a listener that will be called for any topic. + * + * @param listener the listener that will be called. */ - public void setGlobalSubscriber(MqttSubscriptionListener listener) { - globalListener = listener; + public void addGlobalSubscriber(MqttSubscriptionListener listener) { + globalListeners.add(listener); + } + + /** + * Remove the given listener from global subscribers. + * + * @param listener the listener that should not be called anymore. + */ + public void removeGlobalSubscriber(MqttSubscriptionListener listener) { + globalListeners.remove(listener); } /** * Add the listener as a subscriber of the specific topic. + * + * @param topic the topic that will be subscribed to. + * @param listener the listener that will be called. */ public synchronized void subscribe(String topic, MqttSubscriptionListener listener) { if (topic == null || topic.isEmpty() || listener == null) return; if (!subscriptionListeners.containsKey(topic)) { - logger.fine("Creating new topic: " + topic); + logger.finest("Creating new topic: " + topic); subscriptionListeners.put(topic, new ArrayList<>()); } List topicSubscriptions = subscriptionListeners.get(topic); if (!topicSubscriptions.contains(listener)) { - logger.finer("New subscriber on topic (" + topic + "), subscriber count: " + topicSubscriptions.size()); topicSubscriptions.add(listener); + logger.finer("New subscriber on topic: " + topic + " (subscriber count: " + topicSubscriptions.size() + ")"); } } @@ -116,16 +130,14 @@ public class MqttBroker extends ThreadedTCPNetworkServer { * Publish data to the specific topic */ public void publish(String topic, byte[] data) { - logger.finer("Data has been published to topic (" + topic + ")"); + logger.finer("Data has been published to topic: " + topic); - if (globalListener != null) - globalListener.dataPublished(topic, data); + if (globalListeners != null) + globalListeners.forEach(listener -> listener.dataPublished(topic, data)); List topicSubscriptions = subscriptionListeners.get(topic); if (topicSubscriptions != null) { - for (MqttSubscriptionListener subscriber : topicSubscriptions) { - subscriber.dataPublished(topic, data); - } + topicSubscriptions.forEach(subscriber -> subscriber.dataPublished(topic, data)); } } @@ -154,10 +166,10 @@ public class MqttBroker extends ThreadedTCPNetworkServer { List topicSubscriptions = subscriptionListeners.get(topic); if (topicSubscriptions.remove(listener)) { - logger.finer("Subscriber unsubscribed from topic (" + topic + "), subscriber count: " + topicSubscriptions.size()); + logger.finer("Subscriber unsubscribed from topic " + topic + " (subscriber count: " + topicSubscriptions.size() + ")"); if (topicSubscriptions.isEmpty()) { - logger.fine("Removing empty topic: " + topic); + logger.finest("Removing empty topic: " + topic); subscriptionListeners.remove(topic); } } @@ -172,7 +184,7 @@ public class MqttBroker extends ThreadedTCPNetworkServer { private boolean disconnected = false; /** A message that should be sent in case the connection to client is abnormally disconnected */ - private MqttPacketHeader willPacket = null; + private MqttPacketPublish willPacket = null; /** The maximum amount of time(seconds) to wait for activity from client, 0 means no timeout */ private int connectionTimeoutTime = 0; @@ -200,6 +212,8 @@ public class MqttBroker extends ThreadedTCPNetworkServer { public void run() { try { // Setup connection + + logger.fine("[" + socket.getInetAddress() + "] New MQTT client connected."); MqttPacketHeader connectPacket = MqttPacket.read(in); handleConnect(connectPacket); @@ -214,11 +228,12 @@ public class MqttBroker extends ThreadedTCPNetworkServer { } } catch (IOException e) { - logger.log(Level.SEVERE, null, e); + logger.log(Level.WARNING, "[" + socket.getInetAddress() + "] There was a issue with the client connection.", e); } finally { try { sendWillPacket(); + logger.fine("[" + socket.getInetAddress() + "] MQTT client disconnected."); socket.close(); broker.unsubscribe(this); } catch (IOException e) { @@ -266,9 +281,10 @@ public class MqttBroker extends ThreadedTCPNetworkServer { // Handle will message if (conn.flagWillFlag) { - // TODO: Read will message from payload - //willPacket = xxx - //throw new UnsupportedOperationException("Will packet currently not supported."); + logger.fine("[" + socket.getInetAddress() + "] Registered will packet for topic: " + conn.willTopic); + willPacket = new MqttPacketPublish(); + willPacket.topicName = conn.willTopic; + willPacket.payload = conn.willPayload; } else { willPacket = null; } @@ -317,7 +333,7 @@ public class MqttBroker extends ThreadedTCPNetworkServer { // Close connection default: - logger.warning("Received unknown packet type: " + packet.type + " (" + packet.getClass() + ")"); + logger.warning("[" + socket.getInetAddress() + "] Received unknown packet type: " + packet.type + " (" + packet.getClass() + ")"); sendWillPacket(); /* FALLTHROUGH */ case MqttPacketHeader.PACKET_TYPE_DISCONNECT: @@ -332,6 +348,7 @@ public class MqttBroker extends ThreadedTCPNetworkServer { if (publishPacket.getFlagQoS() != 0) throw new UnsupportedOperationException("QoS larger then 0 not supported."); + logger.finer("[" + socket.getInetAddress() + "] Publishing to topic: " + publishPacket.topicName); broker.publish(publishPacket.topicName, publishPacket.payload); } @@ -340,6 +357,7 @@ public class MqttBroker extends ThreadedTCPNetworkServer { subscribeAckPacket.packetId = subscribePacket.packetId; for (MqttSubscribePayload payload : subscribePacket.payloads) { + logger.finer("[" + socket.getInetAddress() + "] Subscribing to topic: " + payload.topicFilter); broker.subscribe(payload.topicFilter, this); // Prepare response @@ -353,6 +371,7 @@ public class MqttBroker extends ThreadedTCPNetworkServer { private void handleUnsubscribe(MqttPacketUnsubscribe unsubscribePacket) throws IOException { for (MqttUnsubscribePayload payload : unsubscribePacket.payloads) { + logger.finer("[" + socket.getInetAddress() + "] Unsubscribing from topic: " + payload.topicFilter); broker.unsubscribe(payload.topicFilter, this); } @@ -370,6 +389,7 @@ public class MqttBroker extends ThreadedTCPNetworkServer { private void sendWillPacket() throws IOException { if (willPacket != null) { + logger.fine("[" + socket.getInetAddress() + "] Publishing will packet."); sendPacket(willPacket); willPacket = null; }