Mocked MQTT thread
This commit is contained in:
parent
6215c8a01a
commit
a1caa8b0e1
2 changed files with 25 additions and 11 deletions
|
|
@ -43,8 +43,7 @@ public class MqttBroker extends ThreadedTCPNetworkServer {
|
||||||
|
|
||||||
private boolean shutdown = false;
|
private boolean shutdown = false;
|
||||||
|
|
||||||
protected MqttConnectionThread() {
|
protected MqttConnectionThread() {} // Test constructor
|
||||||
} // Test constructor
|
|
||||||
|
|
||||||
public MqttConnectionThread(Socket s) throws IOException {
|
public MqttConnectionThread(Socket s) throws IOException {
|
||||||
socket = s;
|
socket = s;
|
||||||
|
|
@ -69,7 +68,7 @@ public class MqttBroker extends ThreadedTCPNetworkServer {
|
||||||
// Incorrect protocol version?
|
// Incorrect protocol version?
|
||||||
if (conn.protocolLevel != MQTT_PROTOCOL_VERSION) {
|
if (conn.protocolLevel != MQTT_PROTOCOL_VERSION) {
|
||||||
connectAck.returnCode = MqttPacketConnectAck.RETCODE_PROT_VER_ERROR;
|
connectAck.returnCode = MqttPacketConnectAck.RETCODE_PROT_VER_ERROR;
|
||||||
MqttPacket.write(out, connectAck);
|
sendPacket(connectAck);
|
||||||
return;
|
return;
|
||||||
} else {
|
} else {
|
||||||
connectAck.returnCode = MqttPacketConnectAck.RETCODE_OK;
|
connectAck.returnCode = MqttPacketConnectAck.RETCODE_OK;
|
||||||
|
|
@ -77,7 +76,7 @@ public class MqttBroker extends ThreadedTCPNetworkServer {
|
||||||
|
|
||||||
// TODO: authenticate
|
// TODO: authenticate
|
||||||
// TODO: clean session
|
// TODO: clean session
|
||||||
MqttPacket.write(out, connectAck);
|
sendPacket(connectAck);
|
||||||
|
|
||||||
// Connected
|
// Connected
|
||||||
|
|
||||||
|
|
@ -86,10 +85,10 @@ public class MqttBroker extends ThreadedTCPNetworkServer {
|
||||||
if (packet == null)
|
if (packet == null)
|
||||||
return;
|
return;
|
||||||
|
|
||||||
MqttPacketHeader packetRsp = handleMqttPacket(packet);
|
MqttPacketHeader packetRsp = handlePacket(packet);
|
||||||
|
|
||||||
if (packetRsp != null)
|
if (packetRsp != null)
|
||||||
MqttPacket.write(out, packetRsp);
|
sendPacket(packetRsp);
|
||||||
}
|
}
|
||||||
|
|
||||||
socket.close();
|
socket.close();
|
||||||
|
|
@ -104,7 +103,7 @@ public class MqttBroker extends ThreadedTCPNetworkServer {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public MqttPacketHeader handleMqttPacket(MqttPacketHeader packet) throws IOException {
|
public MqttPacketHeader handlePacket(MqttPacketHeader packet) throws IOException {
|
||||||
switch (packet.type) {
|
switch (packet.type) {
|
||||||
// TODO: Publish
|
// TODO: Publish
|
||||||
case MqttPacketHeader.PACKET_TYPE_PUBLISH:
|
case MqttPacketHeader.PACKET_TYPE_PUBLISH:
|
||||||
|
|
@ -128,6 +127,10 @@ public class MqttBroker extends ThreadedTCPNetworkServer {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void sendPacket(MqttPacketHeader packet) throws IOException {
|
||||||
|
MqttPacket.write(out, packet);
|
||||||
|
}
|
||||||
|
|
||||||
public boolean isShutdown() {
|
public boolean isShutdown() {
|
||||||
return shutdown;
|
return shutdown;
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -3,30 +3,41 @@ package zutil.net.mqtt;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import zutil.net.mqtt.MqttBroker.MqttConnectionThread;
|
import zutil.net.mqtt.MqttBroker.MqttConnectionThread;
|
||||||
import zutil.net.mqtt.packet.MqttPacketDisconnect;
|
import zutil.net.mqtt.packet.MqttPacketDisconnect;
|
||||||
|
import zutil.net.mqtt.packet.MqttPacketHeader;
|
||||||
import zutil.net.mqtt.packet.MqttPacketPingReq;
|
import zutil.net.mqtt.packet.MqttPacketPingReq;
|
||||||
import zutil.net.mqtt.packet.MqttPacketPingResp;
|
import zutil.net.mqtt.packet.MqttPacketPingResp;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.util.LinkedList;
|
||||||
|
import java.util.Queue;
|
||||||
|
|
||||||
import static org.junit.Assert.*;
|
import static org.junit.Assert.*;
|
||||||
|
|
||||||
public class MqttBrokerTest {
|
public class MqttBrokerTest {
|
||||||
|
|
||||||
|
public static class MqttConnectionMockThread extends MqttConnectionThread {
|
||||||
|
public LinkedList<MqttPacketHeader> sentPackets = new LinkedList<>();
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void sendPacket(MqttPacketHeader packet){
|
||||||
|
sentPackets.add(packet);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void ping() throws IOException {
|
public void ping() throws IOException {
|
||||||
MqttConnectionThread thread = new MqttConnectionThread();
|
MqttConnectionMockThread thread = new MqttConnectionMockThread();
|
||||||
MqttPacketPingReq pingPacket = new MqttPacketPingReq();
|
MqttPacketPingReq pingPacket = new MqttPacketPingReq();
|
||||||
|
|
||||||
assertEquals(MqttPacketPingResp.class, thread.handleMqttPacket(pingPacket).getClass());
|
assertEquals(MqttPacketPingResp.class, thread.handlePacket(pingPacket).getClass());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void disconnect() throws IOException {
|
public void disconnect() throws IOException {
|
||||||
MqttConnectionThread thread = new MqttConnectionThread();
|
MqttConnectionMockThread thread = new MqttConnectionMockThread();
|
||||||
MqttPacketDisconnect disconnectPacket = new MqttPacketDisconnect();
|
MqttPacketDisconnect disconnectPacket = new MqttPacketDisconnect();
|
||||||
|
|
||||||
assertEquals(null, thread.handleMqttPacket(disconnectPacket));
|
assertEquals(null, thread.handlePacket(disconnectPacket));
|
||||||
assertTrue(thread.isShutdown());
|
assertTrue(thread.isShutdown());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Loading…
Add table
Add a link
Reference in a new issue