diff --git a/src/zutil/net/mqtt/MqttBroker.java b/src/zutil/net/mqtt/MqttBroker.java index 4a31374..a771e4b 100755 --- a/src/zutil/net/mqtt/MqttBroker.java +++ b/src/zutil/net/mqtt/MqttBroker.java @@ -41,6 +41,7 @@ import java.io.InputStream; import java.net.Socket; import java.nio.charset.StandardCharsets; import java.util.*; +import java.util.concurrent.ConcurrentHashMap; import java.util.logging.Level; import java.util.logging.Logger; @@ -57,7 +58,7 @@ public class MqttBroker extends ThreadedTCPNetworkServer { public static final int MQTT_PROTOCOL_VERSION = 0x04; // MQTT 3.1.1 private List globalListeners = new ArrayList<>(); - private Map topics = new HashMap<>(); + private Map topics = new ConcurrentHashMap<>(); protected static class MqttTopic { public String name; @@ -190,12 +191,23 @@ public class MqttBroker extends ThreadedTCPNetworkServer { public void publish(String topicName, byte[] data) { logger.finest("Data has been published to topic: " + topicName); - if (globalListeners != null) - globalListeners.forEach(listener -> listener.dataPublished(topicName, data)); + if (globalListeners != null) { + publishToSubscribers(globalListeners, topicName, data); + } MqttTopic topicData = topics.get(topicName); if (topicData != null) { - topicData.subscribers.forEach(subscriber -> subscriber.dataPublished(topicName, data)); + publishToSubscribers(topicData.subscribers, topicName, data); + } + } + + private static void publishToSubscribers(List subscriberList, String topicName, byte[] data) { + for (MqttSubscriptionListener subscriber : subscriberList) { + try { + subscriber.dataPublished(topicName, data); + } catch (Exception e) { + logger.log(Level.WARNING, "MqttSubscriptionListener threw exception.", e); + } } } @@ -365,7 +377,8 @@ public class MqttBroker extends ThreadedTCPNetworkServer { logger.fine("[" + socket.getInetAddress() + "] Registered will packet for topic: " + conn.willTopic); willPacket = new MqttPacketPublish(); willPacket.topicName = conn.willTopic; - willPacket.payload = conn.willPayload; + willPacket.setFlagQoS(conn.flagWillQoS); + willPacket.setFlagRetain(conn.flagWillRetain); } else { willPacket = null; } @@ -418,7 +431,7 @@ public class MqttBroker extends ThreadedTCPNetworkServer { // Close connection default: logger.warning("[" + clientId + "] Received unknown packet type: " + packet.type + " (" + packet.getClass() + ")"); - sendWillPacket(); + sendWillPacket(); // Only publish will message in case of faulty disconnect /* FALLTHROUGH */ case MqttPacketHeader.PACKET_TYPE_DISCONNECT: willPacket = null; @@ -492,7 +505,13 @@ public class MqttBroker extends ThreadedTCPNetworkServer { private void sendWillPacket() throws IOException { if (willPacket != null) { logger.fine("[" + clientId + "] Publishing will packet."); + // TODO: Change to handlePublish when QoS is implemented broker.publish(willPacket.topicName, willPacket.payload); + + if (willPacket.getFlagRetain()) { + broker.retain(willPacket.topicName, willPacket.payload); + } + willPacket = null; } } diff --git a/src/zutil/net/mqtt/packet/MqttPacketPublish.java b/src/zutil/net/mqtt/packet/MqttPacketPublish.java index 231f522..6432148 100755 --- a/src/zutil/net/mqtt/packet/MqttPacketPublish.java +++ b/src/zutil/net/mqtt/packet/MqttPacketPublish.java @@ -27,6 +27,7 @@ package zutil.net.mqtt.packet; import zutil.ByteUtil; import zutil.converter.Converter; +import zutil.log.LogUtil; import zutil.parser.binary.BinaryFieldData; import zutil.parser.binary.BinaryFieldSerializer; import zutil.parser.binary.serializer.TwoByteLengthPrefixedDataSerializer; @@ -34,6 +35,7 @@ import zutil.parser.binary.serializer.TwoByteLengthPrefixedDataSerializer; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.util.logging.Logger; /** * A PUBLISH Control Packet is sent from a Client to a Server @@ -128,7 +130,7 @@ public class MqttPacketPublish extends MqttPacketHeader { private static class MqttPacketPublishPayloadSerializer implements BinaryFieldSerializer { @Override - public byte[] read(InputStream in, BinaryFieldData field) throws IOException { + public byte[] read(InputStream in, BinaryFieldData field) { return new byte[0]; } @@ -139,7 +141,15 @@ public class MqttPacketPublish extends MqttPacketHeader { int payloadLength = Math.max(0, publishPacket.variableHeaderAndPayloadLength - variableLength); byte[] payload = new byte[payloadLength]; - in.read(payload); + + for (int payloadOffset = 0; payloadOffset <= payloadLength; ) { + int readLength = in.read(payload, payloadOffset, payloadLength - payloadOffset); + if (readLength <= 0) + break; + + payloadOffset += readLength; + } + return payload; }