Implemented MQTT QoS 1

This commit is contained in:
Ziver Koc 2025-12-24 03:01:50 +01:00
parent f31f850644
commit c4f7823ba3
2 changed files with 56 additions and 8 deletions

View file

@ -464,8 +464,6 @@ public class MqttBroker extends ThreadedTCPNetworkServer {
} }
protected void handlePacket(MqttPacketHeader packet) throws IOException { protected void handlePacket(MqttPacketHeader packet) throws IOException {
// TODO: QOS 1
// TODO: QOS 2
// TODO: handle connection timeout // TODO: handle connection timeout
switch (packet.type) { switch (packet.type) {
@ -502,11 +500,25 @@ public class MqttBroker extends ThreadedTCPNetworkServer {
} }
private void handlePublish(MqttPacketPublish publishPacket) { private void handlePublish(MqttPacketPublish publishPacket) throws IOException {
if (publishPacket.getFlagQoS() != 0) // TODO: QOS 2
throw new UnsupportedOperationException("QoS larger then 0 not supported (value: " + publishPacket.getFlagQoS() + ").");
if (publishPacket.getFlagQoS() == 2)
throw new UnsupportedOperationException("QoS value of 2 not supported.");
if (publishPacket.getFlagQoS() >= 3) if (publishPacket.getFlagQoS() >= 3)
throw new IllegalArgumentException("QoS value of 3 is not valid."); throw new IllegalArgumentException("QoS value of 3 or larger is not valid value (received QoS: " + publishPacket.getFlagQoS() + ").");
// QoS=0, the Receiver:
// - Accepts ownership of the message when it receives the PUBLISH packet.
// QoS=1, the Receiver
// - MUST respond with a PUBACK Packet containing the Packet Identifier from the incoming PUBLISH Packet, having accepted ownership of the Application Message
// - After it has sent a PUBACK Packet the Receiver MUST treat any incoming PUBLISH packet that contains the same Packet Identifier as being a new publication, irrespective of the setting of its DUP flag.
if (publishPacket.getFlagQoS() == 1) {
MqttPacketPublishAck publishAckPacket = new MqttPacketPublishAck();
publishAckPacket.packetId = publishPacket.packetId;
sendPacket(publishAckPacket);
}
logger.finer("[" + clientId + "] Publishing to topic: " + publishPacket.topicName); logger.finer("[" + clientId + "] Publishing to topic: " + publishPacket.topicName);
broker.publish(publishPacket.topicName, publishPacket.payload); broker.publish(publishPacket.topicName, publishPacket.payload);
@ -549,6 +561,12 @@ public class MqttBroker extends ThreadedTCPNetworkServer {
} }
/**
* Handle data that has been published to the clients subscriptions
*
* @param topic the name of the topic
* @param data the published data
*/
@Override @Override
public void dataPublished(String topic, byte[] data) { public void dataPublished(String topic, byte[] data) {
try { try {
@ -559,13 +577,17 @@ public class MqttBroker extends ThreadedTCPNetworkServer {
publishPacket.payload = data; publishPacket.payload = data;
sendPacket(publishPacket); sendPacket(publishPacket);
// TODO: QOS 1
// TODO: QOS 2 // TODO: QOS 2
} catch (IOException e) { } catch (IOException e) {
logger.log(Level.WARNING, "Was unable to publish topic to client " + clientId, e); logger.log(Level.WARNING, "Was unable to publish topic to client " + clientId, e);
} }
} }
/**
* Will publish this clients will packet.
*
* @throws IOException
*/
private void sendWillPacket() throws IOException { private void sendWillPacket() throws IOException {
if (willPacket != null) { if (willPacket != null) {
logger.fine("[" + clientId + "] Publishing will packet."); logger.fine("[" + clientId + "] Publishing will packet.");

View file

@ -1,7 +1,7 @@
/* /*
* The MIT License (MIT) * The MIT License (MIT)
* *
* Copyright (c) 2020 Ziver Koc * Copyright (c) 2020-2025 Ziver Koc
* *
* Permission is hereby granted, free of charge, to any person obtaining a copy * Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal * of this software and associated documentation files (the "Software"), to deal
@ -388,6 +388,32 @@ public class MqttBrokerTest {
assertEquals( null, subscriber2.receivedPayload); assertEquals( null, subscriber2.receivedPayload);
} }
@Test
public void publishQoS1() throws IOException {
// Setup broker
MqttBroker broker = new MqttBroker();
MqttConnectionThreadMock thread = new MqttConnectionThreadMock(broker);
MqttSubscriptionListenerMock subscriber = new MqttSubscriptionListenerMock();
broker.subscribe("test/topic", subscriber);
// Setup publish
MqttPacketPublish publish = new MqttPacketPublish();
publish.topicName = "test/topic";
publish.setFlagQoS(1);
publish.packetId = 33;
publish.payload = new byte[]{42};
thread.handlePacket(publish);
// Check response
assertEquals(1, thread.sentPackets.size());
MqttPacketPublishAck ackPacket = (MqttPacketPublishAck) thread.sentPackets.get(0);
assertEquals(33, ackPacket.packetId);
assertEquals("test/topic", subscriber.receivedTopic);
assertEquals((byte) 42, subscriber.receivedPayload[0]);
}
@Test @Test
public void publishBadQos() throws IOException { public void publishBadQos() throws IOException {
// Setup broker // Setup broker