Fixed issue where InputStream was not reading all the data for the buffer

This commit is contained in:
Ziver Koc 2024-09-09 23:35:05 +02:00
parent 09cce76707
commit 41be297132
2 changed files with 37 additions and 8 deletions

View file

@ -41,6 +41,7 @@ import java.io.InputStream;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.logging.Level;
import java.util.logging.Logger;
@ -57,7 +58,7 @@ public class MqttBroker extends ThreadedTCPNetworkServer {
public static final int MQTT_PROTOCOL_VERSION = 0x04; // MQTT 3.1.1
private List<MqttSubscriptionListener> globalListeners = new ArrayList<>();
private Map<String, MqttTopic> topics = new HashMap<>();
private Map<String, MqttTopic> topics = new ConcurrentHashMap<>();
protected static class MqttTopic {
public String name;
@ -190,12 +191,23 @@ public class MqttBroker extends ThreadedTCPNetworkServer {
public void publish(String topicName, byte[] data) {
logger.finest("Data has been published to topic: " + topicName);
if (globalListeners != null)
globalListeners.forEach(listener -> listener.dataPublished(topicName, data));
if (globalListeners != null) {
publishToSubscribers(globalListeners, topicName, data);
}
MqttTopic topicData = topics.get(topicName);
if (topicData != null) {
topicData.subscribers.forEach(subscriber -> subscriber.dataPublished(topicName, data));
publishToSubscribers(topicData.subscribers, topicName, data);
}
}
private static void publishToSubscribers(List<MqttSubscriptionListener> subscriberList, String topicName, byte[] data) {
for (MqttSubscriptionListener subscriber : subscriberList) {
try {
subscriber.dataPublished(topicName, data);
} catch (Exception e) {
logger.log(Level.WARNING, "MqttSubscriptionListener threw exception.", e);
}
}
}
@ -365,7 +377,8 @@ public class MqttBroker extends ThreadedTCPNetworkServer {
logger.fine("[" + socket.getInetAddress() + "] Registered will packet for topic: " + conn.willTopic);
willPacket = new MqttPacketPublish();
willPacket.topicName = conn.willTopic;
willPacket.payload = conn.willPayload;
willPacket.setFlagQoS(conn.flagWillQoS);
willPacket.setFlagRetain(conn.flagWillRetain);
} else {
willPacket = null;
}
@ -418,7 +431,7 @@ public class MqttBroker extends ThreadedTCPNetworkServer {
// Close connection
default:
logger.warning("[" + clientId + "] Received unknown packet type: " + packet.type + " (" + packet.getClass() + ")");
sendWillPacket();
sendWillPacket(); // Only publish will message in case of faulty disconnect
/* FALLTHROUGH */
case MqttPacketHeader.PACKET_TYPE_DISCONNECT:
willPacket = null;
@ -492,7 +505,13 @@ public class MqttBroker extends ThreadedTCPNetworkServer {
private void sendWillPacket() throws IOException {
if (willPacket != null) {
logger.fine("[" + clientId + "] Publishing will packet.");
// TODO: Change to handlePublish when QoS is implemented
broker.publish(willPacket.topicName, willPacket.payload);
if (willPacket.getFlagRetain()) {
broker.retain(willPacket.topicName, willPacket.payload);
}
willPacket = null;
}
}

View file

@ -27,6 +27,7 @@ package zutil.net.mqtt.packet;
import zutil.ByteUtil;
import zutil.converter.Converter;
import zutil.log.LogUtil;
import zutil.parser.binary.BinaryFieldData;
import zutil.parser.binary.BinaryFieldSerializer;
import zutil.parser.binary.serializer.TwoByteLengthPrefixedDataSerializer;
@ -34,6 +35,7 @@ import zutil.parser.binary.serializer.TwoByteLengthPrefixedDataSerializer;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.logging.Logger;
/**
* A PUBLISH Control Packet is sent from a Client to a Server
@ -128,7 +130,7 @@ public class MqttPacketPublish extends MqttPacketHeader {
private static class MqttPacketPublishPayloadSerializer implements BinaryFieldSerializer<byte[]> {
@Override
public byte[] read(InputStream in, BinaryFieldData field) throws IOException {
public byte[] read(InputStream in, BinaryFieldData field) {
return new byte[0];
}
@ -139,7 +141,15 @@ public class MqttPacketPublish extends MqttPacketHeader {
int payloadLength = Math.max(0, publishPacket.variableHeaderAndPayloadLength - variableLength);
byte[] payload = new byte[payloadLength];
in.read(payload);
for (int payloadOffset = 0; payloadOffset <= payloadLength; ) {
int readLength = in.read(payload, payloadOffset, payloadLength - payloadOffset);
if (readLength <= 0)
break;
payloadOffset += readLength;
}
return payload;
}