diff --git a/src/zutil/net/mqtt/MqttBroker.java b/src/zutil/net/mqtt/MqttBroker.java index aeb7bd5..db40e31 100755 --- a/src/zutil/net/mqtt/MqttBroker.java +++ b/src/zutil/net/mqtt/MqttBroker.java @@ -43,8 +43,7 @@ public class MqttBroker extends ThreadedTCPNetworkServer { private boolean shutdown = false; - protected MqttConnectionThread() { - } // Test constructor + protected MqttConnectionThread() {} // Test constructor public MqttConnectionThread(Socket s) throws IOException { socket = s; @@ -69,7 +68,7 @@ public class MqttBroker extends ThreadedTCPNetworkServer { // Incorrect protocol version? if (conn.protocolLevel != MQTT_PROTOCOL_VERSION) { connectAck.returnCode = MqttPacketConnectAck.RETCODE_PROT_VER_ERROR; - MqttPacket.write(out, connectAck); + sendPacket(connectAck); return; } else { connectAck.returnCode = MqttPacketConnectAck.RETCODE_OK; @@ -77,7 +76,7 @@ public class MqttBroker extends ThreadedTCPNetworkServer { // TODO: authenticate // TODO: clean session - MqttPacket.write(out, connectAck); + sendPacket(connectAck); // Connected @@ -86,10 +85,10 @@ public class MqttBroker extends ThreadedTCPNetworkServer { if (packet == null) return; - MqttPacketHeader packetRsp = handleMqttPacket(packet); + MqttPacketHeader packetRsp = handlePacket(packet); if (packetRsp != null) - MqttPacket.write(out, packetRsp); + sendPacket(packetRsp); } socket.close(); @@ -104,7 +103,7 @@ public class MqttBroker extends ThreadedTCPNetworkServer { } } - public MqttPacketHeader handleMqttPacket(MqttPacketHeader packet) throws IOException { + public MqttPacketHeader handlePacket(MqttPacketHeader packet) throws IOException { switch (packet.type) { // TODO: Publish case MqttPacketHeader.PACKET_TYPE_PUBLISH: @@ -128,6 +127,10 @@ public class MqttBroker extends ThreadedTCPNetworkServer { return null; } + public void sendPacket(MqttPacketHeader packet) throws IOException { + MqttPacket.write(out, packet); + } + public boolean isShutdown() { return shutdown; } diff --git a/test/zutil/net/mqtt/MqttBrokerTest.java b/test/zutil/net/mqtt/MqttBrokerTest.java index 1e4a5d3..c6f08bd 100644 --- a/test/zutil/net/mqtt/MqttBrokerTest.java +++ b/test/zutil/net/mqtt/MqttBrokerTest.java @@ -3,30 +3,41 @@ package zutil.net.mqtt; import org.junit.Test; import zutil.net.mqtt.MqttBroker.MqttConnectionThread; import zutil.net.mqtt.packet.MqttPacketDisconnect; +import zutil.net.mqtt.packet.MqttPacketHeader; import zutil.net.mqtt.packet.MqttPacketPingReq; import zutil.net.mqtt.packet.MqttPacketPingResp; import java.io.IOException; +import java.util.LinkedList; +import java.util.Queue; import static org.junit.Assert.*; public class MqttBrokerTest { + public static class MqttConnectionMockThread extends MqttConnectionThread { + public LinkedList sentPackets = new LinkedList<>(); + + @Override + public void sendPacket(MqttPacketHeader packet){ + sentPackets.add(packet); + } + } @Test public void ping() throws IOException { - MqttConnectionThread thread = new MqttConnectionThread(); + MqttConnectionMockThread thread = new MqttConnectionMockThread(); MqttPacketPingReq pingPacket = new MqttPacketPingReq(); - assertEquals(MqttPacketPingResp.class, thread.handleMqttPacket(pingPacket).getClass()); + assertEquals(MqttPacketPingResp.class, thread.handlePacket(pingPacket).getClass()); } @Test public void disconnect() throws IOException { - MqttConnectionThread thread = new MqttConnectionThread(); + MqttConnectionMockThread thread = new MqttConnectionMockThread(); MqttPacketDisconnect disconnectPacket = new MqttPacketDisconnect(); - assertEquals(null, thread.handleMqttPacket(disconnectPacket)); + assertEquals(null, thread.handlePacket(disconnectPacket)); assertTrue(thread.isShutdown()); } } \ No newline at end of file