Some more work on MQTT

This commit is contained in:
Ziver Koc 2017-06-02 16:54:14 +02:00
parent 173a754088
commit 8b7f8a5bc4
14 changed files with 147 additions and 7 deletions

View file

@ -0,0 +1,114 @@
package zutil.net.mqtt;
import zutil.log.LogUtil;
import zutil.net.mqtt.packet.*;
import zutil.net.threaded.ThreadedTCPNetworkServer;
import zutil.net.threaded.ThreadedTCPNetworkServerThread;
import zutil.parser.binary.BinaryStructInputStream;
import zutil.parser.binary.BinaryStructOutputStream;
import java.io.IOException;
import java.net.Socket;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
* TODO:
*
* @see <a href="http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/mqtt-v3.1.1.html">MQTT v3.1.1 Spec</a>
*/
public class MqttBroker extends ThreadedTCPNetworkServer{
private static final Logger logger = LogUtil.getLogger();
public static final int MQTT_PORT = 1883;
public static final int MQTT_PORT_TLS = 8883;
public static final int MQTT_PROTOCOL_VERSION = 0x04; // MQTT 3.1.1
public MqttBroker(){
super(MQTT_PORT);
}
@Override
protected ThreadedTCPNetworkServerThread getThreadInstance(Socket s) throws IOException {
return new MQTTConnectionThread(s);
}
private static class MQTTConnectionThread implements ThreadedTCPNetworkServerThread {
private Socket socket;
private BinaryStructInputStream in;
private BinaryStructOutputStream out;
private MQTTConnectionThread(Socket s) throws IOException {
socket = s;
in = new BinaryStructInputStream(socket.getInputStream());
out = new BinaryStructOutputStream(socket.getOutputStream());
}
@Override
public void run() {
try {
// Setup connection
MqttPacket packet = MqttPacket.read(in);
// Unexpected packet?
if ( ! (packet.header instanceof MqttPacketConnect))
throw new IOException("Expected MqttPacketConnect but received "+packet.header.getClass());
MqttPacketConnect conn = (MqttPacketConnect) packet.header;
// Reply
MqttPacketConnectAck connack = new MqttPacketConnectAck();
connack.returnCode = MqttPacketConnectAck.RETCODE_OK;
// Incorrect protocol version?
if (conn.protocolLevel != MQTT_PROTOCOL_VERSION){
connack.returnCode = MqttPacketConnectAck.RETCODE_PROT_VER_ERROR;
MqttPacket.write(out, connack);
return;
}
// TODO: authenticate
// TODO: clean session
MqttPacket.write(out, connack);
// Connected
while (true) {
packet = MqttPacket.read(in);
if (packet == null)
return;
switch (packet.header.type){
// Publish
case MqttPacketHeader.PACKET_TYPE_PUBLISH:
break;
// Subscribe
case MqttPacketHeader.PACKET_TYPE_SUBSCRIBE:
break;
// Unsubscribe
case MqttPacketHeader.PACKET_TYPE_UNSUBSCRIBE:
break;
// Ping
case MqttPacketHeader.PACKET_TYPE_PINGREQ:
MqttPacket.write(out, new MqttPacketPingResp());
break;
// Close connection
default:
logger.warning("Received unknown packet type: "+packet.header.type);
case MqttPacketHeader.PACKET_TYPE_DISCONNECT:
return;
}
}
} catch (IOException e) {
logger.log(Level.SEVERE, null, e);
} finally {
try {
socket.close();
} catch (IOException e) {
logger.log(Level.SEVERE, null, e);
}
}
}
}
}

View file

@ -0,0 +1,9 @@
package zutil.net.mqtt;
/**
* TODO:
*
* @see <a href="http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/mqtt-v3.1.1.html">MQTT v3.1.1 Spec</a>
*/
public class MqttClient {
}

View file

@ -66,6 +66,11 @@ public class MqttPacketConnect extends MqttPacketHeader {
public int keepAlive; public int keepAlive;
// Payload // Payload:
// - Client identifier
// - Will Topic
// - Will message
// - User name
// - Password
} }

View file

@ -17,13 +17,16 @@ public class MqttPacketConnectAck extends MqttPacketHeader{
public static final int RETCODE_NOT_AUTHORIZED = 5; public static final int RETCODE_NOT_AUTHORIZED = 5;
// Variable header
@BinaryField(index = 2000, length = 7) @BinaryField(index = 2000, length = 7)
private int flagReserved; private int flagReserved;
/** Indicates that there is a valid Session available */ /** Indicates that there is a valid Session available */
@BinaryField(index = 2001, length = 1) @BinaryField(index = 2001, length = 1)
public int flagSessionPresent; public boolean flagSessionPresent;
@BinaryField(index = 2002, length = 8) @BinaryField(index = 2002, length = 8)
public int returnCode; public int returnCode;
// No payload
} }

View file

@ -10,5 +10,7 @@ import zutil.parser.binary.BinaryStruct;
*/ */
public class MqttPacketDisconnect extends MqttPacketHeader{ public class MqttPacketDisconnect extends MqttPacketHeader{
// No variable header
// No payload
} }

View file

@ -25,6 +25,7 @@ public class MqttPacketHeader implements BinaryStruct {
// RESERVED = 15 // RESERVED = 15
// Fixed Header
@BinaryField(index = 1, length = 4) @BinaryField(index = 1, length = 4)
public byte type; public byte type;

View file

@ -9,5 +9,7 @@ import zutil.parser.binary.BinaryStruct;
*/ */
public class MqttPacketPingReq extends MqttPacketHeader{ public class MqttPacketPingReq extends MqttPacketHeader{
// No variable header
// No payload
} }

View file

@ -10,5 +10,7 @@ import zutil.parser.binary.BinaryStruct;
*/ */
public class MqttPacketPingResp extends MqttPacketHeader{ public class MqttPacketPingResp extends MqttPacketHeader{
// No variable header
// No payload
} }

View file

@ -34,6 +34,7 @@ public class MqttPacketPublish extends MqttPacketHeader{
public int packetId; public int packetId;
// Optional Payload // Payload
// - Application data
} }

View file

@ -14,5 +14,5 @@ public class MqttPacketPublishAck extends MqttPacketHeader{
@BinaryField(index = 2000, length = 16) @BinaryField(index = 2000, length = 16)
public int packetId; public int packetId;
// No payload
} }

View file

@ -16,5 +16,5 @@ public class MqttPacketPublishComp extends MqttPacketHeader{
@BinaryField(index = 2000, length = 16) @BinaryField(index = 2000, length = 16)
public int packetId; public int packetId;
// No payload
} }

View file

@ -16,5 +16,5 @@ public class MqttPacketPublishRec extends MqttPacketHeader{
@BinaryField(index = 2000, length = 16) @BinaryField(index = 2000, length = 16)
public int packetId; public int packetId;
// No payload
} }

View file

@ -16,5 +16,5 @@ public class MqttPacketPublishRel extends MqttPacketHeader{
@BinaryField(index = 2000, length = 16) @BinaryField(index = 2000, length = 16)
public int packetId; public int packetId;
// No Payload
} }

View file

@ -17,4 +17,5 @@ public class MqttPacketUnsubscribeAck extends MqttPacketHeader{
@BinaryField(index = 2000, length = 16) @BinaryField(index = 2000, length = 16)
public int packetId; public int packetId;
// No payload
} }