From c4f7823ba38f84c1a8420b8e0a8c57eb700c1df4 Mon Sep 17 00:00:00 2001 From: Ziver Koc Date: Wed, 24 Dec 2025 03:01:50 +0100 Subject: [PATCH] Implemented MQTT QoS 1 --- src/zutil/net/mqtt/MqttBroker.java | 36 ++++++++++++++++++++----- test/zutil/net/mqtt/MqttBrokerTest.java | 28 ++++++++++++++++++- 2 files changed, 56 insertions(+), 8 deletions(-) diff --git a/src/zutil/net/mqtt/MqttBroker.java b/src/zutil/net/mqtt/MqttBroker.java index 3561f02..146447b 100755 --- a/src/zutil/net/mqtt/MqttBroker.java +++ b/src/zutil/net/mqtt/MqttBroker.java @@ -464,8 +464,6 @@ public class MqttBroker extends ThreadedTCPNetworkServer { } protected void handlePacket(MqttPacketHeader packet) throws IOException { - // TODO: QOS 1 - // TODO: QOS 2 // TODO: handle connection timeout switch (packet.type) { @@ -502,11 +500,25 @@ public class MqttBroker extends ThreadedTCPNetworkServer { } - private void handlePublish(MqttPacketPublish publishPacket) { - if (publishPacket.getFlagQoS() != 0) - throw new UnsupportedOperationException("QoS larger then 0 not supported (value: " + publishPacket.getFlagQoS() + ")."); + private void handlePublish(MqttPacketPublish publishPacket) throws IOException { + // TODO: QOS 2 + + if (publishPacket.getFlagQoS() == 2) + throw new UnsupportedOperationException("QoS value of 2 not supported."); if (publishPacket.getFlagQoS() >= 3) - throw new IllegalArgumentException("QoS value of 3 is not valid."); + throw new IllegalArgumentException("QoS value of 3 or larger is not valid value (received QoS: " + publishPacket.getFlagQoS() + ")."); + + // QoS=0, the Receiver: + // - Accepts ownership of the message when it receives the PUBLISH packet. + + // QoS=1, the Receiver + // - MUST respond with a PUBACK Packet containing the Packet Identifier from the incoming PUBLISH Packet, having accepted ownership of the Application Message + // - After it has sent a PUBACK Packet the Receiver MUST treat any incoming PUBLISH packet that contains the same Packet Identifier as being a new publication, irrespective of the setting of its DUP flag. + if (publishPacket.getFlagQoS() == 1) { + MqttPacketPublishAck publishAckPacket = new MqttPacketPublishAck(); + publishAckPacket.packetId = publishPacket.packetId; + sendPacket(publishAckPacket); + } logger.finer("[" + clientId + "] Publishing to topic: " + publishPacket.topicName); broker.publish(publishPacket.topicName, publishPacket.payload); @@ -549,6 +561,12 @@ public class MqttBroker extends ThreadedTCPNetworkServer { } + /** + * Handle data that has been published to the clients subscriptions + * + * @param topic the name of the topic + * @param data the published data + */ @Override public void dataPublished(String topic, byte[] data) { try { @@ -559,13 +577,17 @@ public class MqttBroker extends ThreadedTCPNetworkServer { publishPacket.payload = data; sendPacket(publishPacket); - // TODO: QOS 1 // TODO: QOS 2 } catch (IOException e) { logger.log(Level.WARNING, "Was unable to publish topic to client " + clientId, e); } } + /** + * Will publish this clients will packet. + * + * @throws IOException + */ private void sendWillPacket() throws IOException { if (willPacket != null) { logger.fine("[" + clientId + "] Publishing will packet."); diff --git a/test/zutil/net/mqtt/MqttBrokerTest.java b/test/zutil/net/mqtt/MqttBrokerTest.java index 1dd32a0..208a832 100644 --- a/test/zutil/net/mqtt/MqttBrokerTest.java +++ b/test/zutil/net/mqtt/MqttBrokerTest.java @@ -1,7 +1,7 @@ /* * The MIT License (MIT) * - * Copyright (c) 2020 Ziver Koc + * Copyright (c) 2020-2025 Ziver Koc * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -388,6 +388,32 @@ public class MqttBrokerTest { assertEquals( null, subscriber2.receivedPayload); } + @Test + public void publishQoS1() throws IOException { + // Setup broker + MqttBroker broker = new MqttBroker(); + MqttConnectionThreadMock thread = new MqttConnectionThreadMock(broker); + MqttSubscriptionListenerMock subscriber = new MqttSubscriptionListenerMock(); + broker.subscribe("test/topic", subscriber); + + // Setup publish + MqttPacketPublish publish = new MqttPacketPublish(); + publish.topicName = "test/topic"; + publish.setFlagQoS(1); + publish.packetId = 33; + publish.payload = new byte[]{42}; + + thread.handlePacket(publish); + + // Check response + assertEquals(1, thread.sentPackets.size()); + MqttPacketPublishAck ackPacket = (MqttPacketPublishAck) thread.sentPackets.get(0); + assertEquals(33, ackPacket.packetId); + + assertEquals("test/topic", subscriber.receivedTopic); + assertEquals((byte) 42, subscriber.receivedPayload[0]); + } + @Test public void publishBadQos() throws IOException { // Setup broker