Added some bit operation functions

This commit is contained in:
Ziver Koc 2024-09-07 00:00:52 +02:00
parent 3cd3a2fc7c
commit e4d9a16ebd
3 changed files with 118 additions and 47 deletions

View file

@ -139,6 +139,13 @@ public class ByteUtil {
return (byte) BYTE_MASK[index][length];
}
/**
* @param b the byte to be bit inverted
* @return a new byte that has all the bits inverted.
*/
public static byte invert(byte b) {
return (byte) (0xFF ^ b);
}
/**
* Shifts a whole byte array to the right towards the LSB by the specified amount.

View file

@ -56,7 +56,15 @@ public class MqttBroker extends ThreadedTCPNetworkServer {
public static final int MQTT_PROTOCOL_VERSION = 0x04; // MQTT 3.1.1
private List<MqttSubscriptionListener> globalListeners = new ArrayList<>();
private Map<String, List<MqttSubscriptionListener>> subscriptionListeners = new HashMap<>();
private Map<String, MqttTopic> topics = new HashMap<>();
protected static class MqttTopic {
public String name;
/** If set this will be sent as a publish message as soon as a client subscribes to this topic */
public byte[] retainedPayload;
public List<MqttSubscriptionListener> subscribers = new ArrayList<>(0); // Stat with low capacity to save memory
}
public MqttBroker() throws IOException {
@ -73,16 +81,51 @@ public class MqttBroker extends ThreadedTCPNetworkServer {
return new MqttConnectionThread(this, s);
}
/**
* @param topic the topic name
* @return a new topic if it does not exist otherwise returns the existing topic.
*/
private MqttTopic createTopic(String topic) {
if (!topics.containsKey(topic)) {
logger.finest("Creating new topic: " + topic);
MqttTopic topicData = new MqttTopic();
topicData.name = topic;
topics.put(topic, topicData);
}
return topics.get(topic);
}
/**
* Will check if the topic is empty, if so its data will be cleared.
* A topic is empty if: <ul>
* <li>No subscribers</li>
* <li>No retained message</li>
* </ul>
*
* @param topic the topic name
*/
private void removeTopicIfEmpty(String topic) {
MqttTopic topicData = topics.get(topic);
if (topicData == null)
return;
if (topicData.retainedPayload == null &&
topicData.subscribers.isEmpty()) {
logger.finest("Removing empty topic: " + topic);
topics.remove(topic);
}
}
/**
* @return the subscriber count for the specific topic, -1 if
* topic does not exist or has not been created yet.
*/
public int getSubscriberCount(String topic) {
List<MqttSubscriptionListener> topicSubscriptions = subscriptionListeners.get(topic);
public int getSubscriberCount(String topicName) {
MqttTopic topicData = topics.get(topicName);
if (topicSubscriptions != null)
return topicSubscriptions.size();
if (topicData != null)
return topicData.subscribers.size();
return -1;
}
@ -107,51 +150,66 @@ public class MqttBroker extends ThreadedTCPNetworkServer {
/**
* Add the listener as a subscriber of the specific topic.
*
* @param topic the topic that will be subscribed to.
* @param listener the listener that will be called.
* @param topicName the topic that will be subscribed to.
* @param listener the listener that will be called.
*/
public synchronized void subscribe(String topic, MqttSubscriptionListener listener) {
if (topic == null || topic.isEmpty() || listener == null)
public synchronized void subscribe(String topicName, MqttSubscriptionListener listener) {
if (topicName == null || topicName.isEmpty() || listener == null)
return;
if (!subscriptionListeners.containsKey(topic)) {
logger.finest("Creating new topic: " + topic);
subscriptionListeners.put(topic, new ArrayList<>());
}
MqttTopic topicData = createTopic(topicName);
List<MqttSubscriptionListener> topicSubscriptions = subscriptionListeners.get(topic);
if (!topicData.subscribers.contains(listener)) {
topicData.subscribers.add(listener);
logger.finer("New subscriber on topic: " + topicName + " (subscriber count: " + topicData.subscribers.size() + ")");
if (!topicSubscriptions.contains(listener)) {
topicSubscriptions.add(listener);
logger.finer("New subscriber on topic: " + topic + " (subscriber count: " + topicSubscriptions.size() + ")");
// Send any retained messages for the topic
if (topicData.retainedPayload != null) {
listener.dataPublished(topicName, topicData.retainedPayload);
}
}
}
/**
* Publish data to the specific topic.
*
* @param topic the topic where the data should be published to.
* @param data the data that should be published. String will be converted to UTF-8 byte array.
* @param topicName the topic where the data should be published to.
* @param data the data that should be published. String will be converted to UTF-8 byte array.
*/
public void publish(String topic, String data) {
publish(topic, data.getBytes(StandardCharsets.UTF_8));
public void publish(String topicName, String data) {
publish(topicName, data.getBytes(StandardCharsets.UTF_8));
}
/**
* Publish data to the specific topic.
*
* @param topic the topic where the data should be published to.
* @param data the data that should be published.
* @param topicName the topic where the data should be published to.
* @param data the data that should be published.
*/
public void publish(String topic, byte[] data) {
logger.finer("Data has been published to topic: " + topic);
public void publish(String topicName, byte[] data) {
logger.finer("Data has been published to topic: " + topicName);
if (globalListeners != null)
globalListeners.forEach(listener -> listener.dataPublished(topic, data));
globalListeners.forEach(listener -> listener.dataPublished(topicName, data));
List<MqttSubscriptionListener> topicSubscriptions = subscriptionListeners.get(topic);
if (topicSubscriptions != null) {
topicSubscriptions.forEach(subscriber -> subscriber.dataPublished(topic, data));
MqttTopic topicData = topics.get(topicName);
if (topicData != null) {
topicData.subscribers.forEach(subscriber -> subscriber.dataPublished(topicName, data));
}
}
/**
* Will retain the given publish message for the specific topic, the message will be sent to any new client subscribing to the topic.
* Note, only a single message is saved for each topic.
*
* @param topicName the topic where the message should be retained.
* @param data the payload to be stored for the topic. Set to null to clear retained message.
*/
public void retain(String topicName, byte[] data) {
MqttTopic topicData = createTopic(topicName);
if (topicData != null) {
topicData.retainedPayload = data;
}
}
@ -164,32 +222,27 @@ public class MqttBroker extends ThreadedTCPNetworkServer {
if (listener == null)
return;
for (String topic : subscriptionListeners.keySet()) {
unsubscribe(topic, listener);
for (MqttTopic topic : topics.values()) {
unsubscribe(topic.name, listener);
}
}
/**
* Unsubscribe the listener from the specific MQTT topic.
*
* @param topic the specific topic that should be unsubscribed from.
* @param listener the target listener that should be unsubscribed.
* @param topicName the specific topic that should be unsubscribed from.
* @param listener the target listener that should be unsubscribed.
*/
public synchronized void unsubscribe(String topic, MqttSubscriptionListener listener) {
if (topic == null || topic.isEmpty() || listener == null)
public synchronized void unsubscribe(String topicName, MqttSubscriptionListener listener) {
if (topicName == null || topicName.isEmpty() || listener == null)
return;
if (!subscriptionListeners.containsKey(topic))
return;
MqttTopic topicData = topics.get(topicName);
if (topicData != null) {
if (topicData.subscribers.remove(listener)) {
logger.finer("Subscriber unsubscribed from topic " + topicName + " (subscriber count: " + topicData.subscribers.size() + ")");
List<MqttSubscriptionListener> topicSubscriptions = subscriptionListeners.get(topic);
if (topicSubscriptions.remove(listener)) {
logger.finer("Subscriber unsubscribed from topic " + topic + " (subscriber count: " + topicSubscriptions.size() + ")");
if (topicSubscriptions.isEmpty()) {
logger.finest("Removing empty topic: " + topic);
subscriptionListeners.remove(topic);
removeTopicIfEmpty(topicName);
}
}
}
@ -371,11 +424,15 @@ public class MqttBroker extends ThreadedTCPNetworkServer {
private void handlePublish(MqttPacketPublish publishPacket) {
if (publishPacket.getFlagQoS() != 0)
throw new UnsupportedOperationException("QoS larger then 0 not supported.");
if (publishPacket.getFlagRetain())
throw new UnsupportedOperationException("Publish retain is not supported.");
if (publishPacket.getFlagQoS() >= 3)
throw new IllegalArgumentException("QoS value of 3 is not valid.");
logger.finer("[" + clientId + "] Publishing to topic: " + publishPacket.topicName);
broker.publish(publishPacket.topicName, publishPacket.payload);
if (publishPacket.getFlagRetain()) {
broker.retain(publishPacket.topicName, publishPacket.payload);
}
}
private void handleSubscribe(MqttPacketSubscribe subscribePacket) throws IOException {

View file

@ -157,4 +157,11 @@ public class ByteUtilTest {
assertArrayEquals( new byte[]{0b0000_0000,0b0000_0001,0b0000_0001,0b0000_0001},
ByteUtil.shiftLeft(new byte[]{0b0001_0000,0b0001_0000,0b0001_0000,0b0001_0000}, 4));
}
@Test
public void invert() {
assertEquals((byte) 0b1111_1110, ByteUtil.invert((byte) 0b0000_0001));
assertEquals((byte) 0b0000_1111, ByteUtil.invert((byte) 0b1111_0000));
assertEquals((byte) 0b0000_1000, ByteUtil.invert((byte) 0b1111_0111));
}
}