This commit is contained in:
Ziver Koc 2018-11-08 20:39:53 +01:00
commit 6215c8a01a
17 changed files with 173 additions and 52 deletions

View file

@ -17,7 +17,7 @@ import java.util.logging.Logger;
*
* @see <a href="http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/mqtt-v3.1.1.html">MQTT v3.1.1 Spec</a>
*/
public class MqttBroker extends ThreadedTCPNetworkServer{
public class MqttBroker extends ThreadedTCPNetworkServer {
private static final Logger logger = LogUtil.getLogger();
public static final int MQTT_PORT = 1883;
@ -25,24 +25,28 @@ public class MqttBroker extends ThreadedTCPNetworkServer{
public static final int MQTT_PROTOCOL_VERSION = 0x04; // MQTT 3.1.1
public MqttBroker(){
public MqttBroker() {
super(MQTT_PORT);
}
@Override
protected ThreadedTCPNetworkServerThread getThreadInstance(Socket s) throws IOException {
return new MQTTConnectionThread(s);
return new MqttConnectionThread(s);
}
private static class MQTTConnectionThread implements ThreadedTCPNetworkServerThread {
protected static class MqttConnectionThread implements ThreadedTCPNetworkServerThread {
private Socket socket;
private BinaryStructInputStream in;
private BinaryStructOutputStream out;
private boolean shutdown = false;
private MQTTConnectionThread(Socket s) throws IOException {
protected MqttConnectionThread() {
} // Test constructor
public MqttConnectionThread(Socket s) throws IOException {
socket = s;
in = new BinaryStructInputStream(socket.getInputStream());
out = new BinaryStructOutputStream(socket.getOutputStream());
@ -53,53 +57,42 @@ public class MqttBroker extends ThreadedTCPNetworkServer{
public void run() {
try {
// Setup connection
MqttPacketHeader packet = MqttPacket.read(in);
MqttPacketHeader connectPacket = MqttPacket.read(in);
// Unexpected packet?
if ( ! (packet instanceof MqttPacketConnect))
throw new IOException("Expected MqttPacketConnect but received "+packet.getClass());
MqttPacketConnect conn = (MqttPacketConnect) packet;
if (!(connectPacket instanceof MqttPacketConnect))
throw new IOException("Expected MqttPacketConnect but received " + connectPacket.getClass());
MqttPacketConnect conn = (MqttPacketConnect) connectPacket;
// Reply
MqttPacketConnectAck connack = new MqttPacketConnectAck();
connack.returnCode = MqttPacketConnectAck.RETCODE_OK;
MqttPacketConnectAck connectAck = new MqttPacketConnectAck();
// Incorrect protocol version?
if (conn.protocolLevel != MQTT_PROTOCOL_VERSION){
connack.returnCode = MqttPacketConnectAck.RETCODE_PROT_VER_ERROR;
MqttPacket.write(out, connack);
if (conn.protocolLevel != MQTT_PROTOCOL_VERSION) {
connectAck.returnCode = MqttPacketConnectAck.RETCODE_PROT_VER_ERROR;
MqttPacket.write(out, connectAck);
return;
} else {
connectAck.returnCode = MqttPacketConnectAck.RETCODE_OK;
}
// TODO: authenticate
// TODO: clean session
MqttPacket.write(out, connack);
MqttPacket.write(out, connectAck);
// Connected
while (true) {
packet = MqttPacket.read(in);
while (!shutdown) {
MqttPacketHeader packet = MqttPacket.read(in);
if (packet == null)
return;
switch (packet.type){
// Publish
case MqttPacketHeader.PACKET_TYPE_PUBLISH:
break;
// Subscribe
case MqttPacketHeader.PACKET_TYPE_SUBSCRIBE:
break;
// Unsubscribe
case MqttPacketHeader.PACKET_TYPE_UNSUBSCRIBE:
break;
// Ping
case MqttPacketHeader.PACKET_TYPE_PINGREQ:
MqttPacket.write(out, new MqttPacketPingResp());
break;
// Close connection
default:
logger.warning("Received unknown packet type: "+packet.type);
case MqttPacketHeader.PACKET_TYPE_DISCONNECT:
return;
}
MqttPacketHeader packetRsp = handleMqttPacket(packet);
if (packetRsp != null)
MqttPacket.write(out, packetRsp);
}
socket.close();
} catch (IOException e) {
logger.log(Level.SEVERE, null, e);
} finally {
@ -110,5 +103,33 @@ public class MqttBroker extends ThreadedTCPNetworkServer{
}
}
}
public MqttPacketHeader handleMqttPacket(MqttPacketHeader packet) throws IOException {
switch (packet.type) {
// TODO: Publish
case MqttPacketHeader.PACKET_TYPE_PUBLISH:
break;
// TODO: Subscribe
case MqttPacketHeader.PACKET_TYPE_SUBSCRIBE:
break;
// TODO: Unsubscribe
case MqttPacketHeader.PACKET_TYPE_UNSUBSCRIBE:
break;
// Ping
case MqttPacketHeader.PACKET_TYPE_PINGREQ:
return new MqttPacketPingResp();
// Close connection
default:
logger.warning("Received unknown packet type: " + packet.type);
case MqttPacketHeader.PACKET_TYPE_DISCONNECT:
shutdown = true;
}
return null;
}
public boolean isShutdown() {
return shutdown;
}
}
}

View file

@ -48,20 +48,6 @@ public class MqttPacket {
}
public static void write(BinaryStructOutputStream out, MqttPacketHeader header) throws IOException{
if (header instanceof MqttPacketConnect) header.type = PACKET_TYPE_CONN;
else if (header instanceof MqttPacketConnectAck) header.type = PACKET_TYPE_CONNACK;
else if (header instanceof MqttPacketPublishAck) header.type = PACKET_TYPE_PUBLISH;
else if (header instanceof MqttPacketPublishRec) header.type = PACKET_TYPE_PUBACK;
else if (header instanceof MqttPacketPublishComp) header.type = PACKET_TYPE_PUBREL;
else if (header instanceof MqttPacketSubscribe) header.type = PACKET_TYPE_PUBCOMP;
else if (header instanceof MqttPacketSubscribeAck) header.type = PACKET_TYPE_SUBSCRIBE;
else if (header instanceof MqttPacketUnsubscribe) header.type = PACKET_TYPE_UNSUBSCRIBE;
else if (header instanceof MqttPacketUnsubscribeAck) header.type = PACKET_TYPE_UNSUBACK;
else if (header instanceof MqttPacketPingReq) header.type = PACKET_TYPE_PINGREQ;
else if (header instanceof MqttPacketPingResp) header.type = PACKET_TYPE_PINGRESP;
else if (header instanceof MqttPacketDisconnect) header.type = PACKET_TYPE_DISCONNECT;
else
throw new IOException("Unknown header class: "+ header.getClass());
out.write(header);
// TODO: payload

View file

@ -9,6 +9,12 @@ package zutil.net.mqtt.packet;
*/
public class MqttPacketConnect extends MqttPacketHeader {
// Header
{
type = MqttPacketHeader.PACKET_TYPE_CONN;
}
// Variable header
@BinaryField(index = 2001, length = 16)

View file

@ -14,6 +14,11 @@ public class MqttPacketConnectAck extends MqttPacketHeader{
public static final int RETCODE_BADD_USER_OR_PASS = 4;
public static final int RETCODE_NOT_AUTHORIZED = 5;
// Header
{
type = MqttPacketHeader.PACKET_TYPE_CONNACK;
}
// Variable header

View file

@ -9,6 +9,12 @@ package zutil.net.mqtt.packet;
*/
public class MqttPacketDisconnect extends MqttPacketHeader{
// Header
{
type = MqttPacketHeader.PACKET_TYPE_DISCONNECT;
}
// No variable header
// No payload

View file

@ -7,6 +7,12 @@ package zutil.net.mqtt.packet;
*/
public class MqttPacketPingReq extends MqttPacketHeader{
// Header
{
type = MqttPacketHeader.PACKET_TYPE_PINGREQ;
}
// No variable header
// No payload

View file

@ -9,6 +9,12 @@ package zutil.net.mqtt.packet;
*/
public class MqttPacketPingResp extends MqttPacketHeader{
// Header
{
type = MqttPacketHeader.PACKET_TYPE_PINGRESP;
}
// No variable header
// No payload

View file

@ -8,6 +8,12 @@ package zutil.net.mqtt.packet;
*/
public class MqttPacketPublish extends MqttPacketHeader {
// Header
{
type = MqttPacketHeader.PACKET_TYPE_PUBLISH;
}
// Static Header
/*
@BinaryField(index = 2000, length = 1)
@ -20,7 +26,6 @@ public class MqttPacketPublish extends MqttPacketHeader {
@CustomBinaryField(index = 3, serializer = MqttVariableIntSerializer.class)
private int length;
*/
// Variable Header
@BinaryField(index = 2101, length = 16)

View file

@ -8,6 +8,12 @@ package zutil.net.mqtt.packet;
*/
public class MqttPacketPublishAck extends MqttPacketHeader{
// Header
{
type = MqttPacketHeader.PACKET_TYPE_PUBACK;
}
// Variable Header
@BinaryField(index = 2000, length = 16)

View file

@ -10,6 +10,12 @@ package zutil.net.mqtt.packet;
*/
public class MqttPacketPublishComp extends MqttPacketHeader{
// Header
{
type = MqttPacketHeader.PACKET_TYPE_PUBCOMP;
}
// Variable Header
@BinaryField(index = 2000, length = 16)

View file

@ -10,6 +10,12 @@ package zutil.net.mqtt.packet;
*/
public class MqttPacketPublishRec extends MqttPacketHeader{
// Header
{
type = MqttPacketHeader.PACKET_TYPE_PUBREC;
}
// Variable Header
@BinaryField(index = 2000, length = 16)

View file

@ -10,6 +10,12 @@ package zutil.net.mqtt.packet;
*/
public class MqttPacketPublishRel extends MqttPacketHeader{
// Header
{
type = MqttPacketHeader.PACKET_TYPE_PUBREL;
}
// Variable Header
@BinaryField(index = 2000, length = 16)

View file

@ -11,6 +11,12 @@ import java.util.List;
*/
public class MqttPacketSubscribe extends MqttPacketHeader{
// Header
{
type = MqttPacketHeader.PACKET_TYPE_SUBSCRIBE;
}
// Variable Header
@BinaryField(index = 2000, length = 16)

View file

@ -12,6 +12,12 @@ import java.util.List;
*/
public class MqttPacketSubscribeAck extends MqttPacketHeader{
// Header
{
type = MqttPacketHeader.PACKET_TYPE_SUBACK;
}
// Variable Header
@BinaryField(index = 2000, length = 16)

View file

@ -11,6 +11,12 @@ import java.util.List;
*/
public class MqttPacketUnsubscribe extends MqttPacketHeader{
// Header
{
type = MqttPacketHeader.PACKET_TYPE_UNSUBACK;
}
// Variable Header
@BinaryField(index = 2000, length = 16)

View file

@ -9,6 +9,12 @@ package zutil.net.mqtt.packet;
*/
public class MqttPacketUnsubscribeAck extends MqttPacketHeader{
// Header
{
type = MqttPacketHeader.PACKET_TYPE_UNSUBACK;
}
// Variable Header
@BinaryField(index = 2000, length = 16)