Some cleanup in MQTT and added global listener.

This commit is contained in:
Ziver Koc 2020-06-22 12:49:00 +02:00
parent 704b5ffc36
commit 8d350d31c5
3 changed files with 33 additions and 25 deletions

View file

@ -1,2 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<module org.jetbrains.idea.maven.project.MavenProjectsManager.isMavenModule="true" type="JAVA_MODULE" version="4" />

View file

@ -1,6 +1,5 @@
package zutil.net.mqtt; package zutil.net.mqtt;
import zutil.ObjectUtil;
import zutil.log.LogUtil; import zutil.log.LogUtil;
import zutil.net.mqtt.packet.*; import zutil.net.mqtt.packet.*;
import zutil.net.mqtt.packet.MqttPacketSubscribe.MqttSubscribePayload; import zutil.net.mqtt.packet.MqttPacketSubscribe.MqttSubscribePayload;
@ -29,11 +28,11 @@ public class MqttBroker extends ThreadedTCPNetworkServer {
public static final int MQTT_PORT_TLS = 8883; public static final int MQTT_PORT_TLS = 8883;
public static final int MQTT_PROTOCOL_VERSION = 0x04; // MQTT 3.1.1 public static final int MQTT_PROTOCOL_VERSION = 0x04; // MQTT 3.1.1
private Map<String, List<MqttSubscriptionListener>> subscriptions; private MqttSubscriptionListener globalListener;
private Map<String, List<MqttSubscriptionListener>> subscriptionListeners = new HashMap<>();
public MqttBroker() { public MqttBroker() {
super(MQTT_PORT); super(MQTT_PORT);
subscriptions = new HashMap<>();
} }
@Override @Override
@ -47,13 +46,19 @@ public class MqttBroker extends ThreadedTCPNetworkServer {
* topic does not exist or has not been created yet. * topic does not exist or has not been created yet.
*/ */
public int getSubscriberCount(String topic) { public int getSubscriberCount(String topic) {
List topicSubscriptions = subscriptions.get(topic); List<MqttSubscriptionListener> topicSubscriptions = subscriptionListeners.get(topic);
if (topicSubscriptions != null) {
if (topicSubscriptions != null)
return topicSubscriptions.size(); return topicSubscriptions.size();
}
return -1; return -1;
} }
/**
* Assign a listener that will be called for any topic. Provide null to disable.
*/
public void setGlobalSubscriber(MqttSubscriptionListener listener) {
globalListener = listener;
}
/** /**
* Add the listener as a subscriber of the specific topic. * Add the listener as a subscriber of the specific topic.
@ -62,11 +67,12 @@ public class MqttBroker extends ThreadedTCPNetworkServer {
if (topic == null || topic.isEmpty() || listener == null) if (topic == null || topic.isEmpty() || listener == null)
return; return;
if (!subscriptions.containsKey(topic)) { if (!subscriptionListeners.containsKey(topic)) {
logger.fine("Creating new topic: " + topic); logger.fine("Creating new topic: " + topic);
subscriptions.put(topic, new ArrayList<>()); subscriptionListeners.put(topic, new ArrayList<>());
} }
List topicSubscriptions = subscriptions.get(topic);
List<MqttSubscriptionListener> topicSubscriptions = subscriptionListeners.get(topic);
if (!topicSubscriptions.contains(listener)) { if (!topicSubscriptions.contains(listener)) {
logger.finer("New subscriber on topic (" + topic + "), subscriber count: " + topicSubscriptions.size()); logger.finer("New subscriber on topic (" + topic + "), subscriber count: " + topicSubscriptions.size());
@ -78,13 +84,16 @@ public class MqttBroker extends ThreadedTCPNetworkServer {
* Publish data to the specific topic * Publish data to the specific topic
*/ */
public void publish(String topic, byte[] data) { public void publish(String topic, byte[] data) {
if (!subscriptions.containsKey(topic)) { if (!subscriptionListeners.containsKey(topic)) {
logger.fine("Data was published to topic (" + topic + ") with no subscribers."); logger.fine("Data was published to topic (" + topic + ") with no subscribers.");
return; return;
} }
logger.finer("Data has been published to topic (" + topic + ")"); logger.finer("Data has been published to topic (" + topic + ")");
List<MqttSubscriptionListener> topicSubscriptions = subscriptions.get(topic); List<MqttSubscriptionListener> topicSubscriptions = subscriptionListeners.get(topic);
if (globalListener != null)
globalListener.dataPublished(topic, data);
for (MqttSubscriptionListener subscriber : topicSubscriptions) { for (MqttSubscriptionListener subscriber : topicSubscriptions) {
subscriber.dataPublished(topic, data); subscriber.dataPublished(topic, data);
@ -98,7 +107,7 @@ public class MqttBroker extends ThreadedTCPNetworkServer {
if (listener == null) if (listener == null)
return; return;
for (String topic : subscriptions.keySet()){ for (String topic : subscriptionListeners.keySet()){
unsubscribe(topic, listener); unsubscribe(topic, listener);
} }
} }
@ -110,28 +119,29 @@ public class MqttBroker extends ThreadedTCPNetworkServer {
if (topic == null || topic.isEmpty() || listener == null) if (topic == null || topic.isEmpty() || listener == null)
return; return;
if (!subscriptions.containsKey(topic)) if (!subscriptionListeners.containsKey(topic))
return; return;
List topicSubscriptions = subscriptions.get(topic);
List<MqttSubscriptionListener> topicSubscriptions = subscriptionListeners.get(topic);
if (topicSubscriptions.remove(listener)){ if (topicSubscriptions.remove(listener)){
logger.finer("Subscriber unsubscribed from topic (" + topic + "), subscriber count: " + topicSubscriptions.size()); logger.finer("Subscriber unsubscribed from topic (" + topic + "), subscriber count: " + topicSubscriptions.size());
if (topicSubscriptions.isEmpty()) { if (topicSubscriptions.isEmpty()) {
logger.fine("Removing empty topic: " + topic); logger.fine("Removing empty topic: " + topic);
subscriptions.remove(topic); subscriptionListeners.remove(topic);
} }
} }
} }
protected static class MqttConnectionThread implements ThreadedTCPNetworkServerThread, MqttSubscriptionListener { protected static class MqttConnectionThread implements ThreadedTCPNetworkServerThread, MqttSubscriptionListener {
private MqttBroker broker; private final MqttBroker broker;
private Socket socket; private Socket socket;
private BinaryStructInputStream in; private BinaryStructInputStream in;
private BinaryStructOutputStream out; private BinaryStructOutputStream out;
private boolean shutdown = false; private boolean disconnected = false;
/** /**
* Test constructor * Test constructor
@ -157,7 +167,7 @@ public class MqttBroker extends ThreadedTCPNetworkServer {
// Connected // Connected
while (!shutdown) { while (!disconnected) {
MqttPacketHeader packet = MqttPacket.read(in); MqttPacketHeader packet = MqttPacket.read(in);
if (packet == null) if (packet == null)
return; return;
@ -226,13 +236,13 @@ public class MqttBroker extends ThreadedTCPNetworkServer {
default: default:
logger.warning("Received unknown packet type: " + packet.type); logger.warning("Received unknown packet type: " + packet.type);
case MqttPacketHeader.PACKET_TYPE_DISCONNECT: case MqttPacketHeader.PACKET_TYPE_DISCONNECT:
shutdown = true; disconnected = true;
break; break;
} }
} }
private void handlePublish(MqttPacketPublish publishPacket) throws IOException { private void handlePublish(MqttPacketPublish publishPacket) {
if (publishPacket.getFlagQoS() != 0) if (publishPacket.getFlagQoS() != 0)
throw new UnsupportedOperationException("QoS larger then 0 not supported."); throw new UnsupportedOperationException("QoS larger then 0 not supported.");
@ -276,8 +286,8 @@ public class MqttBroker extends ThreadedTCPNetworkServer {
MqttPacket.write(out, packet); MqttPacket.write(out, packet);
} }
public boolean isShutdown() { public boolean isDisconnected() {
return shutdown; return disconnected;
} }
} }

View file

@ -199,6 +199,6 @@ public class MqttBrokerTest {
// Check response // Check response
assertEquals(null, thread.sentPackets.poll()); assertEquals(null, thread.sentPackets.poll());
assertTrue(thread.isShutdown()); assertTrue(thread.isDisconnected());
} }
} }