The network engin is working now. Added some new Message types
This commit is contained in:
parent
c0bf13e493
commit
cbd673472f
11 changed files with 184 additions and 106 deletions
|
|
@ -7,13 +7,15 @@ import java.nio.channels.Selector;
|
|||
import java.nio.channels.SocketChannel;
|
||||
import java.nio.channels.spi.SelectorProvider;
|
||||
|
||||
import ei.engine.network.message.type.ResponseRequestMessage;
|
||||
import ei.engine.network.response.ResponseEvent;
|
||||
import ei.engine.network.util.Converter;
|
||||
|
||||
public class NioClient extends NioNetwork{
|
||||
private SocketChannel socket;
|
||||
|
||||
public NioClient(InetAddress hostAddress, int port) throws IOException {
|
||||
super(hostAddress, port, CLIENT);
|
||||
socket = initiateConnection(new InetSocketAddress(hostAddress, port));
|
||||
}
|
||||
|
||||
protected Selector initSelector() throws IOException {
|
||||
|
|
@ -28,13 +30,7 @@ public class NioClient extends NioNetwork{
|
|||
* @param data The data to send
|
||||
* @throws IOException
|
||||
*/
|
||||
public void send(ResponseEvent handler, byte[] data) throws IOException {
|
||||
// Start a new connection
|
||||
SocketChannel socket = initiateConnection(new InetSocketAddress(hostAddress, port));
|
||||
public void send(ResponseEvent handler, ResponseRequestMessage data) throws IOException {
|
||||
send(socket, handler, data);
|
||||
}
|
||||
|
||||
public void send(ResponseEvent handler, Object data) throws IOException {
|
||||
send(handler, Converter.toBytes(data));
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,6 +1,7 @@
|
|||
package ei.engine.network;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.ConnectException;
|
||||
import java.net.InetAddress;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.nio.ByteBuffer;
|
||||
|
|
@ -16,10 +17,9 @@ import java.util.LinkedList;
|
|||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import ei.engine.network.message.Message;
|
||||
import ei.engine.network.message.StringMessage;
|
||||
import ei.engine.network.message.type.EchoMessage;
|
||||
import ei.engine.network.message.type.ResponseRequestMessage;
|
||||
import ei.engine.network.message.type.SystemMessage;
|
||||
import ei.engine.network.response.PrintRsp;
|
||||
import ei.engine.network.response.ResponseEvent;
|
||||
import ei.engine.network.server.ClientData;
|
||||
import ei.engine.network.util.Converter;
|
||||
|
|
@ -40,9 +40,9 @@ public abstract class NioNetwork implements Runnable {
|
|||
// The channel on which we'll accept connections
|
||||
protected ServerSocketChannel serverChannel;
|
||||
// The selector we'll be monitoring
|
||||
protected Selector selector;
|
||||
private Selector selector;
|
||||
// The buffer into which we'll read data when it's available
|
||||
protected ByteBuffer readBuffer = ByteBuffer.allocate(8192);
|
||||
private ByteBuffer readBuffer = ByteBuffer.allocate(8192);
|
||||
protected Worker worker;
|
||||
protected Worker systemWorker;
|
||||
|
||||
|
|
@ -50,12 +50,12 @@ public abstract class NioNetwork implements Runnable {
|
|||
protected Map<InetSocketAddress, ClientData> clients = new HashMap<InetSocketAddress, ClientData>();
|
||||
|
||||
// A list of PendingChange instances
|
||||
protected List<ChangeRequest> pendingChanges = new LinkedList<ChangeRequest>();
|
||||
private List<ChangeRequest> pendingChanges = new LinkedList<ChangeRequest>();
|
||||
// Maps a SocketChannel to a list of ByteBuffer instances
|
||||
protected Map<SocketChannel, List> pendingData = new HashMap<SocketChannel, List>();
|
||||
private Map<SocketChannel, List> pendingData = new HashMap<SocketChannel, List>();
|
||||
|
||||
// Maps a SocketChannel to a RspHandler
|
||||
protected Map<SocketChannel, ResponseEvent> rspEvents = Collections.synchronizedMap(new HashMap<SocketChannel, ResponseEvent>());
|
||||
private Map<Double, ResponseEvent> rspEvents = Collections.synchronizedMap(new HashMap<Double, ResponseEvent>());
|
||||
|
||||
/**
|
||||
* Create a nio network class
|
||||
|
|
@ -100,15 +100,11 @@ public abstract class NioNetwork implements Runnable {
|
|||
send(getSocketChannel(address), data);
|
||||
}
|
||||
|
||||
public void send(SocketChannel socket, ResponseEvent handler, Object data) throws IOException {
|
||||
send(socket, handler, Converter.toBytes(data));
|
||||
}
|
||||
|
||||
public void send(SocketChannel socket, ResponseEvent handler, byte[] data) throws IOException {
|
||||
public void send(SocketChannel socket, ResponseEvent handler, ResponseRequestMessage data) throws IOException {
|
||||
// Register the response handler
|
||||
rspEvents.put(socket, handler);
|
||||
rspEvents.put(data.getResponseId(), handler);
|
||||
|
||||
queueSend(socket,data);
|
||||
queueSend(socket,Converter.toBytes(data));
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -129,10 +125,6 @@ public abstract class NioNetwork implements Runnable {
|
|||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
protected void queueSend(SocketChannel socket, byte[] data){
|
||||
synchronized (pendingChanges) {
|
||||
// Indicate we want the interest ops set changed
|
||||
pendingChanges.add(new ChangeRequest(socket, ChangeRequest.CHANGEOPS, SelectionKey.OP_WRITE));
|
||||
}
|
||||
System.out.println("pendingData");
|
||||
// And queue the data we want written
|
||||
synchronized (pendingData) {
|
||||
|
|
@ -141,8 +133,12 @@ public abstract class NioNetwork implements Runnable {
|
|||
queue = new ArrayList<ByteBuffer>();
|
||||
pendingData.put(socket, queue);
|
||||
}
|
||||
queue.add(ByteBuffer.wrap(data));
|
||||
|
||||
queue.add(ByteBuffer.wrap(data));
|
||||
}
|
||||
// Changing the key state to write
|
||||
synchronized (pendingChanges) {
|
||||
// Indicate we want the interest ops set changed
|
||||
pendingChanges.add(new ChangeRequest(socket, ChangeRequest.CHANGEOPS, SelectionKey.OP_WRITE));
|
||||
}
|
||||
System.out.println("selector.wakeup();");
|
||||
// Finally, wake up our selecting thread so it can make the required changes
|
||||
|
|
@ -164,42 +160,51 @@ public abstract class NioNetwork implements Runnable {
|
|||
System.out.println("change.ops "+change.ops);
|
||||
break;
|
||||
case ChangeRequest.REGISTER:
|
||||
change.socket.register(selector, change.ops);
|
||||
change.socket.register(selector, change.ops);
|
||||
System.out.println("register socket ");
|
||||
break;
|
||||
}
|
||||
}
|
||||
pendingChanges.clear();
|
||||
}
|
||||
/*
|
||||
Iterator it = pendingData.keySet().iterator();
|
||||
while(it.hasNext()){
|
||||
SocketChannel sc = (SocketChannel)it.next();
|
||||
if(pendingData.get(sc) != null && !pendingData.get(sc).isEmpty()){
|
||||
sc.keyFor(selector).interestOps(SelectionKey.OP_WRITE);
|
||||
}
|
||||
}*/
|
||||
|
||||
// Wait for an event one of the registered channels
|
||||
selector.select();
|
||||
System.out.println("selector is awake");
|
||||
|
||||
// Iterate over the set of keys for which events are available
|
||||
Iterator selectedKeys = selector.selectedKeys().iterator();
|
||||
while (selectedKeys.hasNext()) {
|
||||
SelectionKey key = (SelectionKey) selectedKeys.next();
|
||||
selectedKeys.remove();
|
||||
|
||||
System.out.println("KeyOP: "+key.interestOps()+" isAcceptable: "+SelectionKey.OP_ACCEPT+" isConnectable: "+SelectionKey.OP_CONNECT+" isWritable: "+SelectionKey.OP_WRITE+" isReadable: "+SelectionKey.OP_READ);
|
||||
|
||||
if (key.isValid()) {
|
||||
// Check what event is available and deal with it
|
||||
if (key.isAcceptable()) {
|
||||
System.out.println("Accepting Connection!!");
|
||||
accept(key);
|
||||
System.out.println("Accepting new Connection!! connection count:"+selector.keys().size());
|
||||
}
|
||||
else if (key.isConnectable()) {
|
||||
//key.interestOps(SelectionKey.OP_READ);
|
||||
closeConnection(key);
|
||||
System.out.println("Disconnecting Connection!! connection count:"+selector.keys().size());
|
||||
System.out.println("Finnishing Connection!!");
|
||||
finishConnection(key);
|
||||
}
|
||||
else if (key.isReadable()) {
|
||||
read(key);
|
||||
System.out.println("Reading");
|
||||
}
|
||||
else if (key.isWritable()) {
|
||||
System.out.println("Writing");
|
||||
write(key);
|
||||
System.out.println("Writing");
|
||||
}
|
||||
else if (key.isReadable()) {
|
||||
System.out.println("Reading");
|
||||
read(key);
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
|
|
@ -218,8 +223,9 @@ public abstract class NioNetwork implements Runnable {
|
|||
|
||||
// Accept the connection and make it non-blocking
|
||||
SocketChannel socketChannel = serverSocketChannel.accept();
|
||||
socketChannel.socket().setReuseAddress(true);
|
||||
socketChannel.configureBlocking(false);
|
||||
|
||||
|
||||
// Register the new SocketChannel with our Selector, indicating
|
||||
// we'd like to be notified when there's data waiting to be read
|
||||
socketChannel.register(selector, SelectionKey.OP_READ);
|
||||
|
|
@ -252,7 +258,8 @@ public abstract class NioNetwork implements Runnable {
|
|||
key.cancel();
|
||||
socketChannel.close();
|
||||
clients.remove(remoteAdr);
|
||||
System.out.println("Connection Forced Closed("+remoteAdr+")!!! Count: "+clients.size());
|
||||
System.out.println("Connection Forced Close("+remoteAdr+")!!! Count: "+clients.size());
|
||||
if(type == CLIENT) throw new ConnectException("Server Closed The Connection!!!");
|
||||
return;
|
||||
}
|
||||
|
||||
|
|
@ -262,7 +269,8 @@ public abstract class NioNetwork implements Runnable {
|
|||
key.channel().close();
|
||||
key.cancel();
|
||||
clients.remove(remoteAdr);
|
||||
System.out.println("Connection Closed("+remoteAdr+")!!! Count: "+clients.size());
|
||||
System.out.println("Connection Close("+remoteAdr+")!!! Count: "+clients.size());
|
||||
if(type == CLIENT) throw new ConnectException("Server Closed The Connection!!!");
|
||||
return;
|
||||
}
|
||||
|
||||
|
|
@ -270,30 +278,8 @@ public abstract class NioNetwork implements Runnable {
|
|||
// to the client
|
||||
byte[] rspByteData = new byte[numRead];
|
||||
System.arraycopy(readBuffer.array(), 0, rspByteData, 0, numRead);
|
||||
|
||||
Object rspData = Converter.toObject(rspByteData);
|
||||
|
||||
if(rspData instanceof SystemMessage){
|
||||
if(systemWorker != null){
|
||||
systemWorker.processData(this, socketChannel, rspData);
|
||||
}
|
||||
else{
|
||||
System.out.println("Unhandled System Message!!!");
|
||||
}
|
||||
}
|
||||
else if(rspEvents.get(socketChannel) != null){
|
||||
// Handle the response
|
||||
handleResponse(socketChannel, rspData);
|
||||
}
|
||||
else{
|
||||
// Hand the data off to our worker thread
|
||||
if(worker != null){
|
||||
worker.processData(this, socketChannel, rspData);
|
||||
}
|
||||
else{
|
||||
System.out.println("Unhandled Message!!!");
|
||||
}
|
||||
}
|
||||
|
||||
handleRecivedMessage(socketChannel, rspByteData);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -304,6 +290,9 @@ public abstract class NioNetwork implements Runnable {
|
|||
|
||||
synchronized (pendingData) {
|
||||
List queue = (List) pendingData.get(socketChannel);
|
||||
if(queue == null){
|
||||
queue = new ArrayList<ByteBuffer>();
|
||||
}
|
||||
|
||||
// Write until there's not more data ...
|
||||
while (!queue.isEmpty()) {
|
||||
|
|
@ -320,29 +309,55 @@ public abstract class NioNetwork implements Runnable {
|
|||
// We wrote away all data, so we're no longer interested
|
||||
// in writing on this socket. Switch back to waiting for
|
||||
// data.
|
||||
System.out.println("No more Data to write!!");
|
||||
key.interestOps(SelectionKey.OP_READ);
|
||||
pendingData.remove(socketChannel);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void handleRecivedMessage(SocketChannel socketChannel, byte[] rspByteData){
|
||||
Object rspData = Converter.toObject(rspByteData);
|
||||
|
||||
if(rspData instanceof SystemMessage){
|
||||
if(systemWorker != null){
|
||||
systemWorker.processData(this, socketChannel, rspData);
|
||||
}
|
||||
else{
|
||||
System.out.println("Unhandled System Message!!!");
|
||||
}
|
||||
}
|
||||
else if(rspData instanceof EchoMessage && ((EchoMessage)rspData).echo()){
|
||||
// Echoes back the recived message
|
||||
((EchoMessage)rspData).recived();
|
||||
System.out.println("Echoing Message: "+rspData);
|
||||
send(socketChannel,rspData);
|
||||
}
|
||||
else if(rspData instanceof ResponseRequestMessage &&
|
||||
rspEvents.get(((ResponseRequestMessage)rspData).getResponseId()) != null){
|
||||
// Handle the response
|
||||
handleResponse(((ResponseRequestMessage)rspData).getResponseId(), rspData);
|
||||
}
|
||||
else{
|
||||
// Hand the data off to our worker thread
|
||||
if(worker != null){
|
||||
worker.processData(this, socketChannel, rspData);
|
||||
}
|
||||
else{
|
||||
System.out.println("Unhandled Message!!!");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Client And Server ResponseEvent
|
||||
*/
|
||||
private void handleResponse(SocketChannel socketChannel, Object rspData) throws IOException {
|
||||
|
||||
private void handleResponse(double responseId, Object rspData){
|
||||
// Look up the handler for this channel
|
||||
ResponseEvent handler = (ResponseEvent) rspEvents.get(socketChannel);
|
||||
|
||||
ResponseEvent handler = (ResponseEvent) rspEvents.get(responseId);
|
||||
// And pass the response to it
|
||||
if (handler.handleResponse(rspData)) {
|
||||
// The handler has seen enough, close the connection
|
||||
//socketChannel.close();
|
||||
//socketChannel.keyFor(selector).cancel();
|
||||
}
|
||||
handler.handleResponse(rspData);
|
||||
|
||||
rspEvents.remove(socketChannel);
|
||||
rspEvents.remove(responseId);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -351,8 +366,10 @@ public abstract class NioNetwork implements Runnable {
|
|||
protected SocketChannel initiateConnection(InetSocketAddress address) throws IOException {
|
||||
// Create a non-blocking socket channel
|
||||
SocketChannel socketChannel = SocketChannel.open();
|
||||
socketChannel.socket().setReuseAddress(true);
|
||||
socketChannel.configureBlocking(false);
|
||||
System.out.println(address);
|
||||
System.out.println("Connecting to: "+address);
|
||||
|
||||
// Kick off connection establishment
|
||||
socketChannel.connect(address);
|
||||
|
||||
|
|
@ -374,7 +391,7 @@ public abstract class NioNetwork implements Runnable {
|
|||
/**
|
||||
* Client
|
||||
*/
|
||||
protected void closeConnection(SelectionKey key) throws IOException {
|
||||
private void finishConnection(SelectionKey key){
|
||||
SocketChannel socketChannel = (SocketChannel) key.channel();
|
||||
|
||||
// Finish the connection. If the connection operation failed
|
||||
|
|
@ -387,8 +404,25 @@ public abstract class NioNetwork implements Runnable {
|
|||
key.cancel();
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
// Register an interest in writing on this channel
|
||||
key.interestOps(SelectionKey.OP_WRITE);
|
||||
}
|
||||
|
||||
/**
|
||||
* Client
|
||||
* @throws IOException
|
||||
*/
|
||||
protected void closeConnection(SocketChannel socketChannel) throws IOException{
|
||||
socketChannel.close();
|
||||
socketChannel.keyFor(selector).cancel();
|
||||
}
|
||||
|
||||
/**
|
||||
* Client
|
||||
* @throws IOException
|
||||
*/
|
||||
protected void closeConnection(InetSocketAddress address) throws IOException{
|
||||
closeConnection(getSocketChannel(address));
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -21,6 +21,7 @@ public class NioServer extends NioNetwork{
|
|||
|
||||
// Create a new non-blocking server socket channel
|
||||
serverChannel = ServerSocketChannel.open();
|
||||
serverChannel.socket().setReuseAddress(true);
|
||||
serverChannel.configureBlocking(false);
|
||||
|
||||
// Bind the server socket to the specified address and port
|
||||
|
|
@ -35,9 +36,11 @@ public class NioServer extends NioNetwork{
|
|||
}
|
||||
|
||||
public void broadcast(byte[] data){
|
||||
Iterator<InetSocketAddress> it = clients.keySet().iterator();
|
||||
while(it.hasNext()){
|
||||
send(it.next(), data);
|
||||
synchronized(clients){
|
||||
Iterator<InetSocketAddress> it = clients.keySet().iterator();
|
||||
while(it.hasNext()){
|
||||
send(it.next(), data);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -1,12 +1,19 @@
|
|||
package ei.engine.network.message;
|
||||
|
||||
public class StringMessage extends Message {
|
||||
import ei.engine.network.message.type.EchoMessage;
|
||||
import ei.engine.network.message.type.ResponseRequestMessage;
|
||||
|
||||
public class StringMessage extends Message implements ResponseRequestMessage, EchoMessage{
|
||||
private static final long serialVersionUID = 1L;
|
||||
private boolean echo;
|
||||
private double responseId;
|
||||
|
||||
private String msg;
|
||||
|
||||
public StringMessage(String msg){
|
||||
this.msg = msg;
|
||||
echo = true;
|
||||
responseId = Math.random();
|
||||
}
|
||||
|
||||
public String getString(){
|
||||
|
|
@ -20,4 +27,16 @@ public class StringMessage extends Message {
|
|||
public String toString(){
|
||||
return getString();
|
||||
}
|
||||
|
||||
public boolean echo() {
|
||||
return echo;
|
||||
}
|
||||
|
||||
public void recived() {
|
||||
echo = false;
|
||||
}
|
||||
|
||||
public double getResponseId() {
|
||||
return responseId;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
21
src/ei/engine/network/message/type/EchoMessage.java
Normal file
21
src/ei/engine/network/message/type/EchoMessage.java
Normal file
|
|
@ -0,0 +1,21 @@
|
|||
package ei.engine.network.message.type;
|
||||
|
||||
/**
|
||||
* The reciver will echo out this message to the sender
|
||||
*
|
||||
* @author Ziver
|
||||
*/
|
||||
public interface EchoMessage {
|
||||
|
||||
/**
|
||||
* This method returns if the message should be echoed
|
||||
* @return If the message should be echoed
|
||||
*/
|
||||
public boolean echo();
|
||||
|
||||
/**
|
||||
* Called by the reciver to disable looping of the message
|
||||
*
|
||||
*/
|
||||
public void recived();
|
||||
}
|
||||
|
|
@ -1,12 +0,0 @@
|
|||
package ei.engine.network.message.type;
|
||||
|
||||
/**
|
||||
* This interface means that the sender
|
||||
* wants a reply from the destination
|
||||
*
|
||||
* @author Ziver
|
||||
*
|
||||
*/
|
||||
public interface ReplyRequestMessage {
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,18 @@
|
|||
package ei.engine.network.message.type;
|
||||
|
||||
/**
|
||||
* This interface means that the sender
|
||||
* wants a reply from the destination
|
||||
*
|
||||
* @author Ziver
|
||||
*
|
||||
*/
|
||||
public interface ResponseRequestMessage {
|
||||
|
||||
/**
|
||||
* The id of the response to identify the response event
|
||||
* @return Response id
|
||||
*/
|
||||
public double getResponseId();
|
||||
|
||||
}
|
||||
|
|
@ -6,8 +6,6 @@ import java.io.IOException;
|
|||
import java.io.ObjectInputStream;
|
||||
import java.io.ObjectOutputStream;
|
||||
|
||||
import ei.engine.util.MultiPrintStream;
|
||||
|
||||
public class Converter {
|
||||
|
||||
/**
|
||||
|
|
@ -22,8 +20,9 @@ public class Converter {
|
|||
ObjectOutputStream oos = new ObjectOutputStream(baos);
|
||||
oos.writeObject(object);
|
||||
oos.flush();
|
||||
oos.close();
|
||||
}catch(IOException ioe){
|
||||
MultiPrintStream.out.println(ioe.getMessage());
|
||||
System.out.println(ioe.getMessage());
|
||||
}
|
||||
return baos.toByteArray();
|
||||
}
|
||||
|
|
@ -41,10 +40,12 @@ public class Converter {
|
|||
ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
|
||||
ObjectInputStream ois= new ObjectInputStream(bais);
|
||||
object = ois.readObject();
|
||||
ois.close();
|
||||
bais.close();
|
||||
}catch(IOException ioe){
|
||||
MultiPrintStream.out.println(ioe.getMessage());
|
||||
System.out.println(ioe.getMessage());
|
||||
}catch(ClassNotFoundException cnfe){
|
||||
MultiPrintStream.out.println(cnfe.getMessage());
|
||||
System.out.println(cnfe.getMessage());
|
||||
}
|
||||
return object;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -18,6 +18,7 @@ public class EchoWorker extends Worker {
|
|||
}
|
||||
|
||||
// Return to sender
|
||||
System.out.println("Recived Msg: "+dataEvent.data);
|
||||
dataEvent.network.send(dataEvent.socket, dataEvent.data);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -18,6 +18,7 @@ public class NetworkClient {
|
|||
PrintRsp handler = new PrintRsp();
|
||||
client.send(handler, new StringMessage("StringMessage: "+i));
|
||||
handler.waitForResponse();
|
||||
//try {Thread.sleep(1);} catch (InterruptedException e) {}
|
||||
//System.out.println("sending");
|
||||
}
|
||||
} catch (IOException e) {
|
||||
|
|
|
|||
|
|
@ -4,15 +4,11 @@ import java.io.IOException;
|
|||
|
||||
import ei.engine.network.NioNetwork;
|
||||
import ei.engine.network.NioServer;
|
||||
import ei.engine.network.worker.EchoWorker;
|
||||
|
||||
public class NetworkServer {
|
||||
public static void main(String[] args) {
|
||||
try {
|
||||
EchoWorker worker = new EchoWorker();
|
||||
new Thread(worker).start();
|
||||
NioNetwork server = new NioServer(null, 6056);
|
||||
server.setWorker(worker);
|
||||
new Thread(server).start();
|
||||
} catch (IOException e) {
|
||||
e.printStackTrace();
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue