Added multiple global subscribers and improved logging

This commit is contained in:
Ziver Koc 2023-03-26 00:52:33 +01:00
parent 214bf0f40d
commit 46b608489d

View file

@ -54,7 +54,7 @@ public class MqttBroker extends ThreadedTCPNetworkServer {
public static final int MQTT_PORT_TLS = 8883;
public static final int MQTT_PROTOCOL_VERSION = 0x04; // MQTT 3.1.1
private MqttSubscriptionListener globalListener;
private List<MqttSubscriptionListener> globalListeners = new ArrayList<>();
private Map<String, List<MqttSubscriptionListener>> subscriptionListeners = new HashMap<>();
@ -86,29 +86,43 @@ public class MqttBroker extends ThreadedTCPNetworkServer {
}
/**
* Assign a listener that will be called for any topic. Provide null to disable.
* Assign a listener that will be called for any topic.
*
* @param listener the listener that will be called.
*/
public void setGlobalSubscriber(MqttSubscriptionListener listener) {
globalListener = listener;
public void addGlobalSubscriber(MqttSubscriptionListener listener) {
globalListeners.add(listener);
}
/**
* Remove the given listener from global subscribers.
*
* @param listener the listener that should not be called anymore.
*/
public void removeGlobalSubscriber(MqttSubscriptionListener listener) {
globalListeners.remove(listener);
}
/**
* Add the listener as a subscriber of the specific topic.
*
* @param topic the topic that will be subscribed to.
* @param listener the listener that will be called.
*/
public synchronized void subscribe(String topic, MqttSubscriptionListener listener) {
if (topic == null || topic.isEmpty() || listener == null)
return;
if (!subscriptionListeners.containsKey(topic)) {
logger.fine("Creating new topic: " + topic);
logger.finest("Creating new topic: " + topic);
subscriptionListeners.put(topic, new ArrayList<>());
}
List<MqttSubscriptionListener> topicSubscriptions = subscriptionListeners.get(topic);
if (!topicSubscriptions.contains(listener)) {
logger.finer("New subscriber on topic (" + topic + "), subscriber count: " + topicSubscriptions.size());
topicSubscriptions.add(listener);
logger.finer("New subscriber on topic: " + topic + " (subscriber count: " + topicSubscriptions.size() + ")");
}
}
@ -116,16 +130,14 @@ public class MqttBroker extends ThreadedTCPNetworkServer {
* Publish data to the specific topic
*/
public void publish(String topic, byte[] data) {
logger.finer("Data has been published to topic (" + topic + ")");
logger.finer("Data has been published to topic: " + topic);
if (globalListener != null)
globalListener.dataPublished(topic, data);
if (globalListeners != null)
globalListeners.forEach(listener -> listener.dataPublished(topic, data));
List<MqttSubscriptionListener> topicSubscriptions = subscriptionListeners.get(topic);
if (topicSubscriptions != null) {
for (MqttSubscriptionListener subscriber : topicSubscriptions) {
subscriber.dataPublished(topic, data);
}
topicSubscriptions.forEach(subscriber -> subscriber.dataPublished(topic, data));
}
}
@ -154,10 +166,10 @@ public class MqttBroker extends ThreadedTCPNetworkServer {
List<MqttSubscriptionListener> topicSubscriptions = subscriptionListeners.get(topic);
if (topicSubscriptions.remove(listener)) {
logger.finer("Subscriber unsubscribed from topic (" + topic + "), subscriber count: " + topicSubscriptions.size());
logger.finer("Subscriber unsubscribed from topic " + topic + " (subscriber count: " + topicSubscriptions.size() + ")");
if (topicSubscriptions.isEmpty()) {
logger.fine("Removing empty topic: " + topic);
logger.finest("Removing empty topic: " + topic);
subscriptionListeners.remove(topic);
}
}
@ -172,7 +184,7 @@ public class MqttBroker extends ThreadedTCPNetworkServer {
private boolean disconnected = false;
/** A message that should be sent in case the connection to client is abnormally disconnected */
private MqttPacketHeader willPacket = null;
private MqttPacketPublish willPacket = null;
/** The maximum amount of time(seconds) to wait for activity from client, 0 means no timeout */
private int connectionTimeoutTime = 0;
@ -200,6 +212,8 @@ public class MqttBroker extends ThreadedTCPNetworkServer {
public void run() {
try {
// Setup connection
logger.fine("[" + socket.getInetAddress() + "] New MQTT client connected.");
MqttPacketHeader connectPacket = MqttPacket.read(in);
handleConnect(connectPacket);
@ -214,11 +228,12 @@ public class MqttBroker extends ThreadedTCPNetworkServer {
}
} catch (IOException e) {
logger.log(Level.SEVERE, null, e);
logger.log(Level.WARNING, "[" + socket.getInetAddress() + "] There was a issue with the client connection.", e);
} finally {
try {
sendWillPacket();
logger.fine("[" + socket.getInetAddress() + "] MQTT client disconnected.");
socket.close();
broker.unsubscribe(this);
} catch (IOException e) {
@ -266,9 +281,10 @@ public class MqttBroker extends ThreadedTCPNetworkServer {
// Handle will message
if (conn.flagWillFlag) {
// TODO: Read will message from payload
//willPacket = xxx
//throw new UnsupportedOperationException("Will packet currently not supported.");
logger.fine("[" + socket.getInetAddress() + "] Registered will packet for topic: " + conn.willTopic);
willPacket = new MqttPacketPublish();
willPacket.topicName = conn.willTopic;
willPacket.payload = conn.willPayload;
} else {
willPacket = null;
}
@ -317,7 +333,7 @@ public class MqttBroker extends ThreadedTCPNetworkServer {
// Close connection
default:
logger.warning("Received unknown packet type: " + packet.type + " (" + packet.getClass() + ")");
logger.warning("[" + socket.getInetAddress() + "] Received unknown packet type: " + packet.type + " (" + packet.getClass() + ")");
sendWillPacket();
/* FALLTHROUGH */
case MqttPacketHeader.PACKET_TYPE_DISCONNECT:
@ -332,6 +348,7 @@ public class MqttBroker extends ThreadedTCPNetworkServer {
if (publishPacket.getFlagQoS() != 0)
throw new UnsupportedOperationException("QoS larger then 0 not supported.");
logger.finer("[" + socket.getInetAddress() + "] Publishing to topic: " + publishPacket.topicName);
broker.publish(publishPacket.topicName, publishPacket.payload);
}
@ -340,6 +357,7 @@ public class MqttBroker extends ThreadedTCPNetworkServer {
subscribeAckPacket.packetId = subscribePacket.packetId;
for (MqttSubscribePayload payload : subscribePacket.payloads) {
logger.finer("[" + socket.getInetAddress() + "] Subscribing to topic: " + payload.topicFilter);
broker.subscribe(payload.topicFilter, this);
// Prepare response
@ -353,6 +371,7 @@ public class MqttBroker extends ThreadedTCPNetworkServer {
private void handleUnsubscribe(MqttPacketUnsubscribe unsubscribePacket) throws IOException {
for (MqttUnsubscribePayload payload : unsubscribePacket.payloads) {
logger.finer("[" + socket.getInetAddress() + "] Unsubscribing from topic: " + payload.topicFilter);
broker.unsubscribe(payload.topicFilter, this);
}
@ -370,6 +389,7 @@ public class MqttBroker extends ThreadedTCPNetworkServer {
private void sendWillPacket() throws IOException {
if (willPacket != null) {
logger.fine("[" + socket.getInetAddress() + "] Publishing will packet.");
sendPacket(willPacket);
willPacket = null;
}