From cbd673472f9fb012efd5c8df15e388176ad4238b Mon Sep 17 00:00:00 2001 From: Ziver Koc Date: Sat, 9 Jun 2007 14:26:48 +0000 Subject: [PATCH] The network engin is working now. Added some new Message types --- src/ei/engine/network/NioClient.java | 12 +- src/ei/engine/network/NioNetwork.java | 180 +++++++++++------- src/ei/engine/network/NioServer.java | 9 +- .../engine/network/message/StringMessage.java | 21 +- .../network/message/type/EchoMessage.java | 21 ++ .../message/type/ReplyRequestMessage.java | 12 -- .../message/type/ResponseRequestMessage.java | 18 ++ src/ei/engine/network/util/Converter.java | 11 +- src/ei/engine/network/worker/EchoWorker.java | 1 + src/ei/engine/test/NetworkClient.java | 1 + src/ei/engine/test/NetworkServer.java | 4 - 11 files changed, 184 insertions(+), 106 deletions(-) create mode 100644 src/ei/engine/network/message/type/EchoMessage.java delete mode 100644 src/ei/engine/network/message/type/ReplyRequestMessage.java create mode 100644 src/ei/engine/network/message/type/ResponseRequestMessage.java diff --git a/src/ei/engine/network/NioClient.java b/src/ei/engine/network/NioClient.java index b17b9b8..63ba13a 100644 --- a/src/ei/engine/network/NioClient.java +++ b/src/ei/engine/network/NioClient.java @@ -7,13 +7,15 @@ import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.nio.channels.spi.SelectorProvider; +import ei.engine.network.message.type.ResponseRequestMessage; import ei.engine.network.response.ResponseEvent; -import ei.engine.network.util.Converter; public class NioClient extends NioNetwork{ + private SocketChannel socket; public NioClient(InetAddress hostAddress, int port) throws IOException { super(hostAddress, port, CLIENT); + socket = initiateConnection(new InetSocketAddress(hostAddress, port)); } protected Selector initSelector() throws IOException { @@ -28,13 +30,7 @@ public class NioClient extends NioNetwork{ * @param data The data to send * @throws IOException */ - public void send(ResponseEvent handler, byte[] data) throws IOException { - // Start a new connection - SocketChannel socket = initiateConnection(new InetSocketAddress(hostAddress, port)); + public void send(ResponseEvent handler, ResponseRequestMessage data) throws IOException { send(socket, handler, data); } - - public void send(ResponseEvent handler, Object data) throws IOException { - send(handler, Converter.toBytes(data)); - } } diff --git a/src/ei/engine/network/NioNetwork.java b/src/ei/engine/network/NioNetwork.java index 847b4cf..3a73170 100644 --- a/src/ei/engine/network/NioNetwork.java +++ b/src/ei/engine/network/NioNetwork.java @@ -1,6 +1,7 @@ package ei.engine.network; import java.io.IOException; +import java.net.ConnectException; import java.net.InetAddress; import java.net.InetSocketAddress; import java.nio.ByteBuffer; @@ -16,10 +17,9 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; -import ei.engine.network.message.Message; -import ei.engine.network.message.StringMessage; +import ei.engine.network.message.type.EchoMessage; +import ei.engine.network.message.type.ResponseRequestMessage; import ei.engine.network.message.type.SystemMessage; -import ei.engine.network.response.PrintRsp; import ei.engine.network.response.ResponseEvent; import ei.engine.network.server.ClientData; import ei.engine.network.util.Converter; @@ -40,9 +40,9 @@ public abstract class NioNetwork implements Runnable { // The channel on which we'll accept connections protected ServerSocketChannel serverChannel; // The selector we'll be monitoring - protected Selector selector; + private Selector selector; // The buffer into which we'll read data when it's available - protected ByteBuffer readBuffer = ByteBuffer.allocate(8192); + private ByteBuffer readBuffer = ByteBuffer.allocate(8192); protected Worker worker; protected Worker systemWorker; @@ -50,12 +50,12 @@ public abstract class NioNetwork implements Runnable { protected Map clients = new HashMap(); // A list of PendingChange instances - protected List pendingChanges = new LinkedList(); + private List pendingChanges = new LinkedList(); // Maps a SocketChannel to a list of ByteBuffer instances - protected Map pendingData = new HashMap(); + private Map pendingData = new HashMap(); // Maps a SocketChannel to a RspHandler - protected Map rspEvents = Collections.synchronizedMap(new HashMap()); + private Map rspEvents = Collections.synchronizedMap(new HashMap()); /** * Create a nio network class @@ -100,15 +100,11 @@ public abstract class NioNetwork implements Runnable { send(getSocketChannel(address), data); } - public void send(SocketChannel socket, ResponseEvent handler, Object data) throws IOException { - send(socket, handler, Converter.toBytes(data)); - } - - public void send(SocketChannel socket, ResponseEvent handler, byte[] data) throws IOException { + public void send(SocketChannel socket, ResponseEvent handler, ResponseRequestMessage data) throws IOException { // Register the response handler - rspEvents.put(socket, handler); + rspEvents.put(data.getResponseId(), handler); - queueSend(socket,data); + queueSend(socket,Converter.toBytes(data)); } /** @@ -129,10 +125,6 @@ public abstract class NioNetwork implements Runnable { */ @SuppressWarnings("unchecked") protected void queueSend(SocketChannel socket, byte[] data){ - synchronized (pendingChanges) { - // Indicate we want the interest ops set changed - pendingChanges.add(new ChangeRequest(socket, ChangeRequest.CHANGEOPS, SelectionKey.OP_WRITE)); - } System.out.println("pendingData"); // And queue the data we want written synchronized (pendingData) { @@ -141,8 +133,12 @@ public abstract class NioNetwork implements Runnable { queue = new ArrayList(); pendingData.put(socket, queue); } - queue.add(ByteBuffer.wrap(data)); - + queue.add(ByteBuffer.wrap(data)); + } + // Changing the key state to write + synchronized (pendingChanges) { + // Indicate we want the interest ops set changed + pendingChanges.add(new ChangeRequest(socket, ChangeRequest.CHANGEOPS, SelectionKey.OP_WRITE)); } System.out.println("selector.wakeup();"); // Finally, wake up our selecting thread so it can make the required changes @@ -164,42 +160,51 @@ public abstract class NioNetwork implements Runnable { System.out.println("change.ops "+change.ops); break; case ChangeRequest.REGISTER: - change.socket.register(selector, change.ops); + change.socket.register(selector, change.ops); + System.out.println("register socket "); break; } } pendingChanges.clear(); } + /* + Iterator it = pendingData.keySet().iterator(); + while(it.hasNext()){ + SocketChannel sc = (SocketChannel)it.next(); + if(pendingData.get(sc) != null && !pendingData.get(sc).isEmpty()){ + sc.keyFor(selector).interestOps(SelectionKey.OP_WRITE); + } + }*/ // Wait for an event one of the registered channels selector.select(); + System.out.println("selector is awake"); // Iterate over the set of keys for which events are available Iterator selectedKeys = selector.selectedKeys().iterator(); while (selectedKeys.hasNext()) { SelectionKey key = (SelectionKey) selectedKeys.next(); selectedKeys.remove(); - + System.out.println("KeyOP: "+key.interestOps()+" isAcceptable: "+SelectionKey.OP_ACCEPT+" isConnectable: "+SelectionKey.OP_CONNECT+" isWritable: "+SelectionKey.OP_WRITE+" isReadable: "+SelectionKey.OP_READ); + if (key.isValid()) { // Check what event is available and deal with it if (key.isAcceptable()) { + System.out.println("Accepting Connection!!"); accept(key); - System.out.println("Accepting new Connection!! connection count:"+selector.keys().size()); } else if (key.isConnectable()) { - //key.interestOps(SelectionKey.OP_READ); - closeConnection(key); - System.out.println("Disconnecting Connection!! connection count:"+selector.keys().size()); + System.out.println("Finnishing Connection!!"); + finishConnection(key); } - else if (key.isReadable()) { - read(key); - System.out.println("Reading"); - } else if (key.isWritable()) { System.out.println("Writing"); write(key); - System.out.println("Writing"); } + else if (key.isReadable()) { + System.out.println("Reading"); + read(key); + } } } } catch (Exception e) { @@ -218,8 +223,9 @@ public abstract class NioNetwork implements Runnable { // Accept the connection and make it non-blocking SocketChannel socketChannel = serverSocketChannel.accept(); + socketChannel.socket().setReuseAddress(true); socketChannel.configureBlocking(false); - + // Register the new SocketChannel with our Selector, indicating // we'd like to be notified when there's data waiting to be read socketChannel.register(selector, SelectionKey.OP_READ); @@ -252,7 +258,8 @@ public abstract class NioNetwork implements Runnable { key.cancel(); socketChannel.close(); clients.remove(remoteAdr); - System.out.println("Connection Forced Closed("+remoteAdr+")!!! Count: "+clients.size()); + System.out.println("Connection Forced Close("+remoteAdr+")!!! Count: "+clients.size()); + if(type == CLIENT) throw new ConnectException("Server Closed The Connection!!!"); return; } @@ -262,7 +269,8 @@ public abstract class NioNetwork implements Runnable { key.channel().close(); key.cancel(); clients.remove(remoteAdr); - System.out.println("Connection Closed("+remoteAdr+")!!! Count: "+clients.size()); + System.out.println("Connection Close("+remoteAdr+")!!! Count: "+clients.size()); + if(type == CLIENT) throw new ConnectException("Server Closed The Connection!!!"); return; } @@ -270,30 +278,8 @@ public abstract class NioNetwork implements Runnable { // to the client byte[] rspByteData = new byte[numRead]; System.arraycopy(readBuffer.array(), 0, rspByteData, 0, numRead); - - Object rspData = Converter.toObject(rspByteData); - - if(rspData instanceof SystemMessage){ - if(systemWorker != null){ - systemWorker.processData(this, socketChannel, rspData); - } - else{ - System.out.println("Unhandled System Message!!!"); - } - } - else if(rspEvents.get(socketChannel) != null){ - // Handle the response - handleResponse(socketChannel, rspData); - } - else{ - // Hand the data off to our worker thread - if(worker != null){ - worker.processData(this, socketChannel, rspData); - } - else{ - System.out.println("Unhandled Message!!!"); - } - } + + handleRecivedMessage(socketChannel, rspByteData); } /** @@ -304,6 +290,9 @@ public abstract class NioNetwork implements Runnable { synchronized (pendingData) { List queue = (List) pendingData.get(socketChannel); + if(queue == null){ + queue = new ArrayList(); + } // Write until there's not more data ... while (!queue.isEmpty()) { @@ -320,29 +309,55 @@ public abstract class NioNetwork implements Runnable { // We wrote away all data, so we're no longer interested // in writing on this socket. Switch back to waiting for // data. + System.out.println("No more Data to write!!"); key.interestOps(SelectionKey.OP_READ); - pendingData.remove(socketChannel); } } } + private void handleRecivedMessage(SocketChannel socketChannel, byte[] rspByteData){ + Object rspData = Converter.toObject(rspByteData); + + if(rspData instanceof SystemMessage){ + if(systemWorker != null){ + systemWorker.processData(this, socketChannel, rspData); + } + else{ + System.out.println("Unhandled System Message!!!"); + } + } + else if(rspData instanceof EchoMessage && ((EchoMessage)rspData).echo()){ + // Echoes back the recived message + ((EchoMessage)rspData).recived(); + System.out.println("Echoing Message: "+rspData); + send(socketChannel,rspData); + } + else if(rspData instanceof ResponseRequestMessage && + rspEvents.get(((ResponseRequestMessage)rspData).getResponseId()) != null){ + // Handle the response + handleResponse(((ResponseRequestMessage)rspData).getResponseId(), rspData); + } + else{ + // Hand the data off to our worker thread + if(worker != null){ + worker.processData(this, socketChannel, rspData); + } + else{ + System.out.println("Unhandled Message!!!"); + } + } + } /** * Client And Server ResponseEvent */ - private void handleResponse(SocketChannel socketChannel, Object rspData) throws IOException { - + private void handleResponse(double responseId, Object rspData){ // Look up the handler for this channel - ResponseEvent handler = (ResponseEvent) rspEvents.get(socketChannel); - + ResponseEvent handler = (ResponseEvent) rspEvents.get(responseId); // And pass the response to it - if (handler.handleResponse(rspData)) { - // The handler has seen enough, close the connection - //socketChannel.close(); - //socketChannel.keyFor(selector).cancel(); - } + handler.handleResponse(rspData); - rspEvents.remove(socketChannel); + rspEvents.remove(responseId); } /** @@ -351,8 +366,10 @@ public abstract class NioNetwork implements Runnable { protected SocketChannel initiateConnection(InetSocketAddress address) throws IOException { // Create a non-blocking socket channel SocketChannel socketChannel = SocketChannel.open(); + socketChannel.socket().setReuseAddress(true); socketChannel.configureBlocking(false); - System.out.println(address); + System.out.println("Connecting to: "+address); + // Kick off connection establishment socketChannel.connect(address); @@ -374,7 +391,7 @@ public abstract class NioNetwork implements Runnable { /** * Client */ - protected void closeConnection(SelectionKey key) throws IOException { + private void finishConnection(SelectionKey key){ SocketChannel socketChannel = (SocketChannel) key.channel(); // Finish the connection. If the connection operation failed @@ -387,8 +404,25 @@ public abstract class NioNetwork implements Runnable { key.cancel(); return; } - + // Register an interest in writing on this channel key.interestOps(SelectionKey.OP_WRITE); } + + /** + * Client + * @throws IOException + */ + protected void closeConnection(SocketChannel socketChannel) throws IOException{ + socketChannel.close(); + socketChannel.keyFor(selector).cancel(); + } + + /** + * Client + * @throws IOException + */ + protected void closeConnection(InetSocketAddress address) throws IOException{ + closeConnection(getSocketChannel(address)); + } } diff --git a/src/ei/engine/network/NioServer.java b/src/ei/engine/network/NioServer.java index 2fb59bb..5fb1f28 100644 --- a/src/ei/engine/network/NioServer.java +++ b/src/ei/engine/network/NioServer.java @@ -21,6 +21,7 @@ public class NioServer extends NioNetwork{ // Create a new non-blocking server socket channel serverChannel = ServerSocketChannel.open(); + serverChannel.socket().setReuseAddress(true); serverChannel.configureBlocking(false); // Bind the server socket to the specified address and port @@ -35,9 +36,11 @@ public class NioServer extends NioNetwork{ } public void broadcast(byte[] data){ - Iterator it = clients.keySet().iterator(); - while(it.hasNext()){ - send(it.next(), data); + synchronized(clients){ + Iterator it = clients.keySet().iterator(); + while(it.hasNext()){ + send(it.next(), data); + } } } diff --git a/src/ei/engine/network/message/StringMessage.java b/src/ei/engine/network/message/StringMessage.java index a7acf1d..10a6697 100644 --- a/src/ei/engine/network/message/StringMessage.java +++ b/src/ei/engine/network/message/StringMessage.java @@ -1,12 +1,19 @@ package ei.engine.network.message; -public class StringMessage extends Message { +import ei.engine.network.message.type.EchoMessage; +import ei.engine.network.message.type.ResponseRequestMessage; + +public class StringMessage extends Message implements ResponseRequestMessage, EchoMessage{ private static final long serialVersionUID = 1L; + private boolean echo; + private double responseId; private String msg; public StringMessage(String msg){ this.msg = msg; + echo = true; + responseId = Math.random(); } public String getString(){ @@ -20,4 +27,16 @@ public class StringMessage extends Message { public String toString(){ return getString(); } + + public boolean echo() { + return echo; + } + + public void recived() { + echo = false; + } + + public double getResponseId() { + return responseId; + } } diff --git a/src/ei/engine/network/message/type/EchoMessage.java b/src/ei/engine/network/message/type/EchoMessage.java new file mode 100644 index 0000000..29faf67 --- /dev/null +++ b/src/ei/engine/network/message/type/EchoMessage.java @@ -0,0 +1,21 @@ +package ei.engine.network.message.type; + +/** + * The reciver will echo out this message to the sender + * + * @author Ziver + */ +public interface EchoMessage { + + /** + * This method returns if the message should be echoed + * @return If the message should be echoed + */ + public boolean echo(); + + /** + * Called by the reciver to disable looping of the message + * + */ + public void recived(); +} diff --git a/src/ei/engine/network/message/type/ReplyRequestMessage.java b/src/ei/engine/network/message/type/ReplyRequestMessage.java deleted file mode 100644 index 9fdaf2f..0000000 --- a/src/ei/engine/network/message/type/ReplyRequestMessage.java +++ /dev/null @@ -1,12 +0,0 @@ -package ei.engine.network.message.type; - -/** - * This interface means that the sender - * wants a reply from the destination - * - * @author Ziver - * - */ -public interface ReplyRequestMessage { - -} diff --git a/src/ei/engine/network/message/type/ResponseRequestMessage.java b/src/ei/engine/network/message/type/ResponseRequestMessage.java new file mode 100644 index 0000000..2006430 --- /dev/null +++ b/src/ei/engine/network/message/type/ResponseRequestMessage.java @@ -0,0 +1,18 @@ +package ei.engine.network.message.type; + +/** + * This interface means that the sender + * wants a reply from the destination + * + * @author Ziver + * + */ +public interface ResponseRequestMessage { + + /** + * The id of the response to identify the response event + * @return Response id + */ + public double getResponseId(); + +} diff --git a/src/ei/engine/network/util/Converter.java b/src/ei/engine/network/util/Converter.java index 640921b..4488fdf 100644 --- a/src/ei/engine/network/util/Converter.java +++ b/src/ei/engine/network/util/Converter.java @@ -6,8 +6,6 @@ import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; -import ei.engine.util.MultiPrintStream; - public class Converter { /** @@ -22,8 +20,9 @@ public class Converter { ObjectOutputStream oos = new ObjectOutputStream(baos); oos.writeObject(object); oos.flush(); + oos.close(); }catch(IOException ioe){ - MultiPrintStream.out.println(ioe.getMessage()); + System.out.println(ioe.getMessage()); } return baos.toByteArray(); } @@ -41,10 +40,12 @@ public class Converter { ByteArrayInputStream bais = new ByteArrayInputStream(bytes); ObjectInputStream ois= new ObjectInputStream(bais); object = ois.readObject(); + ois.close(); + bais.close(); }catch(IOException ioe){ - MultiPrintStream.out.println(ioe.getMessage()); + System.out.println(ioe.getMessage()); }catch(ClassNotFoundException cnfe){ - MultiPrintStream.out.println(cnfe.getMessage()); + System.out.println(cnfe.getMessage()); } return object; } diff --git a/src/ei/engine/network/worker/EchoWorker.java b/src/ei/engine/network/worker/EchoWorker.java index dd4ce27..f45eec2 100644 --- a/src/ei/engine/network/worker/EchoWorker.java +++ b/src/ei/engine/network/worker/EchoWorker.java @@ -18,6 +18,7 @@ public class EchoWorker extends Worker { } // Return to sender + System.out.println("Recived Msg: "+dataEvent.data); dataEvent.network.send(dataEvent.socket, dataEvent.data); } } diff --git a/src/ei/engine/test/NetworkClient.java b/src/ei/engine/test/NetworkClient.java index 387e04a..6a51021 100644 --- a/src/ei/engine/test/NetworkClient.java +++ b/src/ei/engine/test/NetworkClient.java @@ -18,6 +18,7 @@ public class NetworkClient { PrintRsp handler = new PrintRsp(); client.send(handler, new StringMessage("StringMessage: "+i)); handler.waitForResponse(); + //try {Thread.sleep(1);} catch (InterruptedException e) {} //System.out.println("sending"); } } catch (IOException e) { diff --git a/src/ei/engine/test/NetworkServer.java b/src/ei/engine/test/NetworkServer.java index f987f3f..d84ab31 100644 --- a/src/ei/engine/test/NetworkServer.java +++ b/src/ei/engine/test/NetworkServer.java @@ -4,15 +4,11 @@ import java.io.IOException; import ei.engine.network.NioNetwork; import ei.engine.network.NioServer; -import ei.engine.network.worker.EchoWorker; public class NetworkServer { public static void main(String[] args) { try { - EchoWorker worker = new EchoWorker(); - new Thread(worker).start(); NioNetwork server = new NioServer(null, 6056); - server.setWorker(worker); new Thread(server).start(); } catch (IOException e) { e.printStackTrace();