Implemented wildcards for MQTT and downgraded SQLite to the version that supports retrieving last inserted key

This commit is contained in:
Ziver Koc 2024-10-01 23:25:37 +02:00
parent 1de26a7240
commit 529cacfe81
5 changed files with 299 additions and 43 deletions

View file

@ -142,7 +142,7 @@ public class DBConnection implements Closeable{
ResultSet result = null;
try {
result = stmt.getGeneratedKeys();
if (result != null && !result.isBeforeFirst()) {
if (result != null) {
return new SimpleSQLResult<Integer>().handleQueryResult(stmt, result);
}
} catch (SQLException e) {
@ -161,7 +161,7 @@ public class DBConnection implements Closeable{
/**
* Runs a Prepared Statement.<br>
* <b>NOTE:</b> Don't forget to close the PreparedStatement or it can lead to memory leaks
* <b>NOTE:</b> Don't forget to close the PreparedStatement, or it can lead to memory leaks
*
* @param sql is the SQL query to run
* @return an PreparedStatement

View file

@ -42,8 +42,11 @@ import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
* TODO:
@ -57,7 +60,7 @@ public class MqttBroker extends ThreadedTCPNetworkServer {
public static final int MQTT_PORT_TLS = 8883;
public static final int MQTT_PROTOCOL_VERSION = 0x04; // MQTT 3.1.1
private List<MqttSubscriptionListener> globalListeners = new ArrayList<>();
private List<MqttWildcardSubscriber> wildcardSubscribers = new CopyOnWriteArrayList<>();
private Map<String, MqttTopic> topics = new ConcurrentHashMap<>();
protected static class MqttTopic {
@ -68,6 +71,11 @@ public class MqttBroker extends ThreadedTCPNetworkServer {
public List<MqttSubscriptionListener> subscribers = new ArrayList<>(0); // Stat with low capacity to save memory
}
protected static class MqttWildcardSubscriber {
public Pattern topicPattern;
public MqttSubscriptionListener listener;
}
public MqttBroker() throws IOException {
super(MQTT_PORT);
@ -113,40 +121,42 @@ public class MqttBroker extends ThreadedTCPNetworkServer {
return;
if (topicData.retainedPayload == null &&
topicData.subscribers.isEmpty()) {
getSubscriberCount(topic) <= 0) {
logger.finest("Removing empty topic: " + topic);
topics.remove(topic);
}
}
/**
* @return the subscriber count for the specific topic, -1 if
* topic does not exist or has not been created yet.
* @return the subscriber count for the specific topic.
*/
public int getSubscriberCount(String topicName) {
MqttTopic topicData = topics.get(topicName);
int subscribers = 0;
if (topicData != null)
return topicData.subscribers.size();
return -1;
// Check wildcard subscribers
for (MqttWildcardSubscriber subscriber : wildcardSubscribers) {
if (subscriber.topicPattern.matcher(topicName).matches()) {
++subscribers;
}
}
// Check topic subscribers
MqttTopic topicData = topics.get(topicName);
if (topicData != null) {
subscribers += topicData.subscribers.size();
}
return subscribers;
}
/**
* Assign a listener that will be called for any topic.
* Assign a listener that will subscribe to all topics.
* This is same as calling subscribe with the topic "#"
*
* @param listener the listener that will be called.
* @param listener the listener that will be called when new data is published.
*/
public void addGlobalSubscriber(MqttSubscriptionListener listener) {
globalListeners.add(listener);
}
/**
* Remove the given listener from global subscribers.
*
* @param listener the listener that should not be called anymore.
*/
public void removeGlobalSubscriber(MqttSubscriptionListener listener) {
globalListeners.remove(listener);
subscribe("#", listener);
}
/**
@ -159,19 +169,57 @@ public class MqttBroker extends ThreadedTCPNetworkServer {
if (topicName == null || topicName.isEmpty() || listener == null)
return;
MqttTopic topicData = createTopic(topicName);
// Is it a wildcard topic?
if (topicName.contains("#") || topicName.contains("+")) {
MqttWildcardSubscriber subscriber = new MqttWildcardSubscriber();
subscriber.listener = listener;
subscriber.topicPattern = generatePatternFromTopic(topicName);
wildcardSubscribers.add(subscriber);
if (!topicData.subscribers.contains(listener)) {
topicData.subscribers.add(listener);
logger.finer("New subscriber on topic: " + topicName + " (subscriber count: " + topicData.subscribers.size() + ")");
// Send any retained messages for matched topics
for (MqttTopic topicData : topics.values()) {
if (topicData.retainedPayload != null && subscriber.topicPattern.matcher(topicData.name).matches()) {
listener.dataPublished(topicName, topicData.retainedPayload);
}
}
} else {
MqttTopic topicData = createTopic(topicName);
// Send any retained messages for the topic
if (topicData.retainedPayload != null) {
listener.dataPublished(topicName, topicData.retainedPayload);
if (!topicData.subscribers.contains(listener)) {
topicData.subscribers.add(listener);
logger.finer("New subscriber on topic: " + topicName + " (subscriber count: " + topicData.subscribers.size() + ")");
// Send any retained messages for the topic
if (topicData.retainedPayload != null) {
listener.dataPublished(topicName, topicData.retainedPayload);
}
}
}
}
/**
* @param topicName a topic name containing wildcard characters
* @return a Pattern object matching the wildcard behaviour specified in the topic name.
*/
protected static Pattern generatePatternFromTopic(String topicName) {
// Error checking
if (topicName.contains("#")) {
if (topicName.indexOf("#") != topicName.length() - 1)
throw new IllegalArgumentException("Topic wildcard # is required to be the last character in the topic name");
if (topicName.length() > 1 && topicName.charAt(topicName.length() - 2) != '/')
throw new IllegalArgumentException("Topic wildcard # is required to be after a topic separator /");
}
if (topicName.contains("+")) {
if (Pattern.compile("[^/]\\+").matcher(topicName).find())
throw new IllegalArgumentException("Topic wildcard + is required to be after a topic separator / or the first character of the topic String");
}
topicName = topicName.replaceFirst("^#", Matcher.quoteReplacement(".*"));
topicName = topicName.replaceFirst("/#", Matcher.quoteReplacement("($|/.*)"));
topicName = topicName.replaceFirst(Pattern.quote("+"), Matcher.quoteReplacement("[^/]*"));
return Pattern.compile(topicName);
}
/**
* Publish data to the specific topic.
*
@ -191,23 +239,29 @@ public class MqttBroker extends ThreadedTCPNetworkServer {
public void publish(String topicName, byte[] data) {
logger.finest("Data has been published to topic: " + topicName);
if (globalListeners != null) {
publishToSubscribers(globalListeners, topicName, data);
if (!topicName.startsWith("$")) {
// Publish to wildcard subscribers
for (MqttWildcardSubscriber subscriber : wildcardSubscribers) {
if (subscriber.topicPattern.matcher(topicName).matches()) {
publishToSubscriber(subscriber.listener, topicName, data);
}
}
}
// Publish to topic subscribers
MqttTopic topicData = topics.get(topicName);
if (topicData != null) {
publishToSubscribers(topicData.subscribers, topicName, data);
for (MqttSubscriptionListener subscriber : topicData.subscribers) {
publishToSubscriber(subscriber, topicName, data);
}
}
}
private static void publishToSubscribers(List<MqttSubscriptionListener> subscriberList, String topicName, byte[] data) {
for (MqttSubscriptionListener subscriber : subscriberList) {
try {
subscriber.dataPublished(topicName, data);
} catch (Exception e) {
logger.log(Level.WARNING, "MqttSubscriptionListener threw exception.", e);
}
private static void publishToSubscriber(MqttSubscriptionListener subscriber, String topicName, byte[] data) {
try {
subscriber.dataPublished(topicName, data);
} catch (Exception e) {
logger.log(Level.WARNING, "MqttSubscriptionListener threw exception.", e);
}
}
@ -241,13 +295,20 @@ public class MqttBroker extends ThreadedTCPNetworkServer {
if (listener == null)
return;
// Search wildcard subscribers
for (MqttWildcardSubscriber subscriber : wildcardSubscribers) {
if (subscriber.listener.equals(listener)) {
wildcardSubscribers.remove(subscriber);
}
}
// Search topic subscribers
for (MqttTopic topic : topics.values()) {
unsubscribe(topic.name, listener);
}
}
/**
* Unsubscribe the listener from the specific MQTT topic.
* Unsubscribe the listener from the specific MQTT topic. This does not apply to wildcard listeners.
*
* @param topicName the specific topic that should be unsubscribed from.
* @param listener the target listener that should be unsubscribed.
@ -459,6 +520,9 @@ public class MqttBroker extends ThreadedTCPNetworkServer {
MqttPacketSubscribeAck subscribeAckPacket = new MqttPacketSubscribeAck();
subscribeAckPacket.packetId = subscribePacket.packetId;
// TODO: handle topic wildcard #
// TODO: topic starting with $ should not be matched to wildcards
for (MqttSubscribePayload payload : subscribePacket.payloads) {
logger.finer("[" + clientId + "] Subscribing to topic: " + payload.topicFilter);
broker.subscribe(payload.topicFilter, this);