From 529cacfe817bd7ff4b205a021d236130a7e2602f Mon Sep 17 00:00:00 2001 From: Ziver Koc Date: Tue, 1 Oct 2024 23:25:37 +0200 Subject: [PATCH] Implemented wildcards for MQTT and downgraded SQLite to the version that supports retrieving last inserted key --- Jenkinsfile | 1 + build.gradle | 2 +- src/zutil/db/DBConnection.java | 4 +- src/zutil/net/mqtt/MqttBroker.java | 140 ++++++++++++----- test/zutil/net/mqtt/MqttBrokerTest.java | 195 +++++++++++++++++++++++- 5 files changed, 299 insertions(+), 43 deletions(-) diff --git a/Jenkinsfile b/Jenkinsfile index 5eb8989..f62011e 100755 --- a/Jenkinsfile +++ b/Jenkinsfile @@ -28,6 +28,7 @@ pipeline { withGradle { sh "./gradlew ${gradleParams} test" } + junit './build/test-reports/test/*.xml' } } diff --git a/build.gradle b/build.gradle index 4d16ed4..8b2f42d 100644 --- a/build.gradle +++ b/build.gradle @@ -29,7 +29,7 @@ repositories { dependencies { implementation 'org.dom4j:dom4j:2.1.3' - implementation 'org.xerial:sqlite-jdbc:3.46.1.0' + implementation 'org.xerial:sqlite-jdbc:3.42.0.1' compileOnly 'mysql:mysql-connector-java:8.0.28' compileOnly 'javax.servlet:javax.servlet-api:3.1.0' diff --git a/src/zutil/db/DBConnection.java b/src/zutil/db/DBConnection.java index 5660c63..9a3ecc0 100755 --- a/src/zutil/db/DBConnection.java +++ b/src/zutil/db/DBConnection.java @@ -142,7 +142,7 @@ public class DBConnection implements Closeable{ ResultSet result = null; try { result = stmt.getGeneratedKeys(); - if (result != null && !result.isBeforeFirst()) { + if (result != null) { return new SimpleSQLResult().handleQueryResult(stmt, result); } } catch (SQLException e) { @@ -161,7 +161,7 @@ public class DBConnection implements Closeable{ /** * Runs a Prepared Statement.
- * NOTE: Don't forget to close the PreparedStatement or it can lead to memory leaks + * NOTE: Don't forget to close the PreparedStatement, or it can lead to memory leaks * * @param sql is the SQL query to run * @return an PreparedStatement diff --git a/src/zutil/net/mqtt/MqttBroker.java b/src/zutil/net/mqtt/MqttBroker.java index a771e4b..6c46b53 100755 --- a/src/zutil/net/mqtt/MqttBroker.java +++ b/src/zutil/net/mqtt/MqttBroker.java @@ -42,8 +42,11 @@ import java.net.Socket; import java.nio.charset.StandardCharsets; import java.util.*; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.logging.Level; import java.util.logging.Logger; +import java.util.regex.Matcher; +import java.util.regex.Pattern; /** * TODO: @@ -57,7 +60,7 @@ public class MqttBroker extends ThreadedTCPNetworkServer { public static final int MQTT_PORT_TLS = 8883; public static final int MQTT_PROTOCOL_VERSION = 0x04; // MQTT 3.1.1 - private List globalListeners = new ArrayList<>(); + private List wildcardSubscribers = new CopyOnWriteArrayList<>(); private Map topics = new ConcurrentHashMap<>(); protected static class MqttTopic { @@ -68,6 +71,11 @@ public class MqttBroker extends ThreadedTCPNetworkServer { public List subscribers = new ArrayList<>(0); // Stat with low capacity to save memory } + protected static class MqttWildcardSubscriber { + public Pattern topicPattern; + public MqttSubscriptionListener listener; + } + public MqttBroker() throws IOException { super(MQTT_PORT); @@ -113,40 +121,42 @@ public class MqttBroker extends ThreadedTCPNetworkServer { return; if (topicData.retainedPayload == null && - topicData.subscribers.isEmpty()) { + getSubscriberCount(topic) <= 0) { logger.finest("Removing empty topic: " + topic); topics.remove(topic); } } /** - * @return the subscriber count for the specific topic, -1 if - * topic does not exist or has not been created yet. + * @return the subscriber count for the specific topic. */ public int getSubscriberCount(String topicName) { - MqttTopic topicData = topics.get(topicName); + int subscribers = 0; - if (topicData != null) - return topicData.subscribers.size(); - return -1; + // Check wildcard subscribers + for (MqttWildcardSubscriber subscriber : wildcardSubscribers) { + if (subscriber.topicPattern.matcher(topicName).matches()) { + ++subscribers; + } + } + + // Check topic subscribers + MqttTopic topicData = topics.get(topicName); + if (topicData != null) { + subscribers += topicData.subscribers.size(); + } + + return subscribers; } /** - * Assign a listener that will be called for any topic. + * Assign a listener that will subscribe to all topics. + * This is same as calling subscribe with the topic "#" * - * @param listener the listener that will be called. + * @param listener the listener that will be called when new data is published. */ public void addGlobalSubscriber(MqttSubscriptionListener listener) { - globalListeners.add(listener); - } - - /** - * Remove the given listener from global subscribers. - * - * @param listener the listener that should not be called anymore. - */ - public void removeGlobalSubscriber(MqttSubscriptionListener listener) { - globalListeners.remove(listener); + subscribe("#", listener); } /** @@ -159,19 +169,57 @@ public class MqttBroker extends ThreadedTCPNetworkServer { if (topicName == null || topicName.isEmpty() || listener == null) return; - MqttTopic topicData = createTopic(topicName); + // Is it a wildcard topic? + if (topicName.contains("#") || topicName.contains("+")) { + MqttWildcardSubscriber subscriber = new MqttWildcardSubscriber(); + subscriber.listener = listener; + subscriber.topicPattern = generatePatternFromTopic(topicName); + wildcardSubscribers.add(subscriber); - if (!topicData.subscribers.contains(listener)) { - topicData.subscribers.add(listener); - logger.finer("New subscriber on topic: " + topicName + " (subscriber count: " + topicData.subscribers.size() + ")"); + // Send any retained messages for matched topics + for (MqttTopic topicData : topics.values()) { + if (topicData.retainedPayload != null && subscriber.topicPattern.matcher(topicData.name).matches()) { + listener.dataPublished(topicName, topicData.retainedPayload); + } + } + } else { + MqttTopic topicData = createTopic(topicName); - // Send any retained messages for the topic - if (topicData.retainedPayload != null) { - listener.dataPublished(topicName, topicData.retainedPayload); + if (!topicData.subscribers.contains(listener)) { + topicData.subscribers.add(listener); + logger.finer("New subscriber on topic: " + topicName + " (subscriber count: " + topicData.subscribers.size() + ")"); + + // Send any retained messages for the topic + if (topicData.retainedPayload != null) { + listener.dataPublished(topicName, topicData.retainedPayload); + } } } } + /** + * @param topicName a topic name containing wildcard characters + * @return a Pattern object matching the wildcard behaviour specified in the topic name. + */ + protected static Pattern generatePatternFromTopic(String topicName) { + // Error checking + if (topicName.contains("#")) { + if (topicName.indexOf("#") != topicName.length() - 1) + throw new IllegalArgumentException("Topic wildcard # is required to be the last character in the topic name"); + if (topicName.length() > 1 && topicName.charAt(topicName.length() - 2) != '/') + throw new IllegalArgumentException("Topic wildcard # is required to be after a topic separator /"); + } + if (topicName.contains("+")) { + if (Pattern.compile("[^/]\\+").matcher(topicName).find()) + throw new IllegalArgumentException("Topic wildcard + is required to be after a topic separator / or the first character of the topic String"); + } + + topicName = topicName.replaceFirst("^#", Matcher.quoteReplacement(".*")); + topicName = topicName.replaceFirst("/#", Matcher.quoteReplacement("($|/.*)")); + topicName = topicName.replaceFirst(Pattern.quote("+"), Matcher.quoteReplacement("[^/]*")); + return Pattern.compile(topicName); + } + /** * Publish data to the specific topic. * @@ -191,23 +239,29 @@ public class MqttBroker extends ThreadedTCPNetworkServer { public void publish(String topicName, byte[] data) { logger.finest("Data has been published to topic: " + topicName); - if (globalListeners != null) { - publishToSubscribers(globalListeners, topicName, data); + if (!topicName.startsWith("$")) { + // Publish to wildcard subscribers + for (MqttWildcardSubscriber subscriber : wildcardSubscribers) { + if (subscriber.topicPattern.matcher(topicName).matches()) { + publishToSubscriber(subscriber.listener, topicName, data); + } + } } + // Publish to topic subscribers MqttTopic topicData = topics.get(topicName); if (topicData != null) { - publishToSubscribers(topicData.subscribers, topicName, data); + for (MqttSubscriptionListener subscriber : topicData.subscribers) { + publishToSubscriber(subscriber, topicName, data); + } } } - private static void publishToSubscribers(List subscriberList, String topicName, byte[] data) { - for (MqttSubscriptionListener subscriber : subscriberList) { - try { - subscriber.dataPublished(topicName, data); - } catch (Exception e) { - logger.log(Level.WARNING, "MqttSubscriptionListener threw exception.", e); - } + private static void publishToSubscriber(MqttSubscriptionListener subscriber, String topicName, byte[] data) { + try { + subscriber.dataPublished(topicName, data); + } catch (Exception e) { + logger.log(Level.WARNING, "MqttSubscriptionListener threw exception.", e); } } @@ -241,13 +295,20 @@ public class MqttBroker extends ThreadedTCPNetworkServer { if (listener == null) return; + // Search wildcard subscribers + for (MqttWildcardSubscriber subscriber : wildcardSubscribers) { + if (subscriber.listener.equals(listener)) { + wildcardSubscribers.remove(subscriber); + } + } + // Search topic subscribers for (MqttTopic topic : topics.values()) { unsubscribe(topic.name, listener); } } /** - * Unsubscribe the listener from the specific MQTT topic. + * Unsubscribe the listener from the specific MQTT topic. This does not apply to wildcard listeners. * * @param topicName the specific topic that should be unsubscribed from. * @param listener the target listener that should be unsubscribed. @@ -459,6 +520,9 @@ public class MqttBroker extends ThreadedTCPNetworkServer { MqttPacketSubscribeAck subscribeAckPacket = new MqttPacketSubscribeAck(); subscribeAckPacket.packetId = subscribePacket.packetId; + // TODO: handle topic wildcard # + // TODO: topic starting with $ should not be matched to wildcards + for (MqttSubscribePayload payload : subscribePacket.payloads) { logger.finer("[" + clientId + "] Subscribing to topic: " + payload.topicFilter); broker.subscribe(payload.topicFilter, this); diff --git a/test/zutil/net/mqtt/MqttBrokerTest.java b/test/zutil/net/mqtt/MqttBrokerTest.java index 998c80c..1dd32a0 100644 --- a/test/zutil/net/mqtt/MqttBrokerTest.java +++ b/test/zutil/net/mqtt/MqttBrokerTest.java @@ -33,6 +33,7 @@ import zutil.net.mqtt.packet.MqttPacketUnsubscribe.MqttUnsubscribePayload; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.LinkedList; +import java.util.regex.Pattern; import static org.junit.Assert.*; @@ -61,6 +62,11 @@ public class MqttBrokerTest { receivedTopic = topic; receivedPayload = data; } + + public void clear() { + receivedTopic = null; + receivedPayload = null; + } }; //**************** Test Cases ************************** @@ -126,6 +132,191 @@ public class MqttBrokerTest { assertEquals(1, broker.getSubscriberCount("topic2")); } + @Test + public void generatePatternFromTopic() { + assertEquals("test", MqttBroker.generatePatternFromTopic("test").pattern()); + + assertEquals(".*", MqttBroker.generatePatternFromTopic("#").pattern()); + assertEquals("test($|/.*)", MqttBroker.generatePatternFromTopic("test/#").pattern()); + + assertEquals("[^/]*", MqttBroker.generatePatternFromTopic("+").pattern()); + assertEquals("test/[^/]*", MqttBroker.generatePatternFromTopic("test/+").pattern()); + } + + @Test + public void subscribeMultiLevelWildcard() throws IOException { + MqttBroker broker = new MqttBroker(); + MqttSubscriptionListenerMock subscriber = new MqttSubscriptionListenerMock(); + + /* + Invalid subscriptions + - "sport/tennis#" not valid. disconnect + - "sport/tennis/#/ranking" is not valid + - "sport/tennis/#/ranking/#" is not valid + */ + assertThrows(IllegalArgumentException.class, () -> + broker.subscribe("sport/tennis#", subscriber) + ); + assertThrows(IllegalArgumentException.class, () -> + broker.subscribe("sport/tennis/#/ranking", subscriber) + ); + assertThrows(IllegalArgumentException.class, () -> + broker.subscribe("sport/tennis/#/ranking/#", subscriber) + ); + + /* + Global Subscriber + - "#" valid and will receive every Application Message + */ + broker.subscribe("#", subscriber); + assertEquals(1, broker.getSubscriberCount("sport/")); + assertEquals(1, broker.getSubscriberCount("sport/tennis")); + assertEquals(1, broker.getSubscriberCount("test/")); + + broker.unsubscribe(subscriber); + assertEquals(0, broker.getSubscriberCount("sport/")); + assertEquals(0, broker.getSubscriberCount("sport/tennis")); + assertEquals(0, broker.getSubscriberCount("test/")); + + /* + For example, if a Client subscribes to "sport/tennis/player1/#", it would receive messages published using these topic names: + - "sport/tennis/player1" match + - "sport/tennis/player1/ranking" match + - "sport/tennis/player1/score/wimbledon" match + */ + broker.subscribe("sport/tennis/player1/#", subscriber); + + broker.publish("sport/tennis/player1", "test"); + assertEquals("sport/tennis/player1", subscriber.receivedTopic); + assertEquals("test", new String(subscriber.receivedPayload)); + + subscriber.clear(); + broker.publish("sport/tennis/player1/ranking", "test2"); + assertEquals("sport/tennis/player1/ranking", subscriber.receivedTopic); + assertEquals("test2", new String(subscriber.receivedPayload)); + + subscriber.clear(); + broker.publish("sport/tennis/player1/score/wimbledon", "test3"); + assertEquals("sport/tennis/player1/score/wimbledon", subscriber.receivedTopic); + assertEquals("test3", new String(subscriber.receivedPayload)); + } + + @Test + public void subscribeSingleLevelWildcard() throws IOException { + MqttBroker broker = new MqttBroker(); + MqttSubscriptionListenerMock subscriber = new MqttSubscriptionListenerMock(); + MqttSubscriptionListenerMock subscriber2 = new MqttSubscriptionListenerMock(); + MqttSubscriptionListenerMock subscriber3 = new MqttSubscriptionListenerMock(); + + /* + Not Valid + - "sport+" is not valid + */ + assertThrows(IllegalArgumentException.class, () -> + broker.subscribe("sport+", subscriber) + ); + assertThrows(IllegalArgumentException.class, () -> + broker.subscribe("sport/+/test+/tennis", subscriber) + ); + + /* + Root Subscription + - "+" is valid + */ + broker.subscribe("+", subscriber); + + subscriber.clear(); + broker.publish("sport", "testPayload"); + assertEquals("sport", subscriber.receivedTopic); + + subscriber.clear(); + broker.publish("test", "testPayload"); + assertEquals("test", subscriber.receivedTopic); + + /* + For example, "sport/tennis/+" matches: + - "sport/tennis" no match + - "sport/tennis/" match + - "sport/tennis/player1" match + - "sport/tennis/player2" match + - "sport/tennis/player1/ranking" no match + */ + broker.unsubscribe(subscriber); + broker.subscribe("sport/tennis/+", subscriber); + + subscriber.clear(); + broker.publish("sport/tennis", "testPayload"); + assertEquals(null, subscriber.receivedTopic); + + subscriber.clear(); + broker.publish("sport/tennis/", "testPayload"); + assertEquals("sport/tennis/", subscriber.receivedTopic); + + subscriber.clear(); + broker.publish("sport/tennis/player1", "testPayload"); + assertEquals("sport/tennis/player1", subscriber.receivedTopic); + + subscriber.clear(); + broker.publish("sport/tennis/player2", "testPayload"); + assertEquals("sport/tennis/player2", subscriber.receivedTopic); + + subscriber.clear(); + broker.publish("sport/tennis/player1/ranking", "testPayload"); + assertEquals(null, subscriber.receivedTopic); + + /* + Special Wildcards + - "sport/+/player1" is valid + - "/finance" matches "+/+" and "/+", but not "+" + */ + broker.unsubscribe(subscriber); + broker.subscribe("sport/+/player1", subscriber); + subscriber.clear(); + broker.publish("sport/game/player1", "testPayload"); + assertEquals("sport/game/player1", subscriber.receivedTopic); + + subscriber.clear(); + subscriber2.clear(); + subscriber3.clear(); + broker.unsubscribe(subscriber); + broker.unsubscribe(subscriber2); + broker.unsubscribe(subscriber3); + broker.subscribe("+/+", subscriber); + broker.subscribe("/+", subscriber2); + broker.subscribe("+", subscriber3); + + broker.publish("/finance", "testPayload"); + // TODO: assertEquals("/finance", subscriber.receivedTopic); + assertEquals("/finance", subscriber2.receivedTopic); + assertEquals(null, subscriber3.receivedTopic); + } + + @Test + public void subscribeMixedWildcard() throws IOException { + MqttBroker broker = new MqttBroker(); + MqttSubscriptionListenerMock subscriber = new MqttSubscriptionListenerMock(); + + /* + - "+/tennis/#" is valid + */ + subscriber.clear(); + broker.unsubscribe(subscriber); + broker.subscribe("+/tennis/#", subscriber); + + broker.publish("sport/tennis/game/one", "testPayload"); + assertEquals("sport/tennis/game/one", subscriber.receivedTopic); + + /* + - "/finance/+/tennis/#" is valid + */ + subscriber.clear(); + broker.unsubscribe(subscriber); + broker.subscribe("/finance/+/tennis/#", subscriber); + + broker.publish("/finance/sport/tennis/game/one", "testPayload"); + assertEquals("/finance/sport/tennis/game/one", subscriber.receivedTopic); + } + @Test public void publish() throws IOException { // Setup broker @@ -246,7 +437,7 @@ public class MqttBrokerTest { assertEquals(MqttPacketUnsubscribeAck.class, responsePacket.getClass()); assertEquals(unsubscribePacket.packetId, ((MqttPacketUnsubscribeAck)responsePacket).packetId); // Check broker - assertEquals(-1, broker.getSubscriberCount("topic1")); + assertEquals(0, broker.getSubscriberCount("topic1")); //************************ New subscriber @@ -270,7 +461,7 @@ public class MqttBrokerTest { thread.handlePacket(unsubscribePacket); // Check broker - assertEquals(-1, broker.getSubscriberCount("topic1")); + assertEquals(0, broker.getSubscriberCount("topic1")); } @Test