This commit is contained in:
Ziver Koc 2007-06-03 21:02:32 +00:00
parent 16b9f1f265
commit c0bf13e493

View file

@ -118,12 +118,8 @@ public abstract class NioNetwork implements Runnable {
* @param data The data to send
*/
public void send(SocketChannel socket, byte[] data) {
synchronized (pendingChanges) {
// Indicate we want the interest ops set changed
pendingChanges.add(new ChangeRequest(socket, ChangeRequest.CHANGEOPS, SelectionKey.OP_WRITE));
queueSend(socket,data);
}
}
/**
* Queues the message to be sent and wakeups the selector
@ -133,6 +129,11 @@ public abstract class NioNetwork implements Runnable {
*/
@SuppressWarnings("unchecked")
protected void queueSend(SocketChannel socket, byte[] data){
synchronized (pendingChanges) {
// Indicate we want the interest ops set changed
pendingChanges.add(new ChangeRequest(socket, ChangeRequest.CHANGEOPS, SelectionKey.OP_WRITE));
}
System.out.println("pendingData");
// And queue the data we want written
synchronized (pendingData) {
List<ByteBuffer> queue = pendingData.get(socket);
@ -143,6 +144,7 @@ public abstract class NioNetwork implements Runnable {
queue.add(ByteBuffer.wrap(data));
}
System.out.println("selector.wakeup();");
// Finally, wake up our selecting thread so it can make the required changes
selector.wakeup();
}
@ -159,6 +161,7 @@ public abstract class NioNetwork implements Runnable {
case ChangeRequest.CHANGEOPS:
SelectionKey key = change.socket.keyFor(selector);
key.interestOps(change.ops);
System.out.println("change.ops "+change.ops);
break;
case ChangeRequest.REGISTER:
change.socket.register(selector, change.ops);
@ -181,19 +184,21 @@ public abstract class NioNetwork implements Runnable {
// Check what event is available and deal with it
if (key.isAcceptable()) {
accept(key);
//System.out.println("Accepting new Connection!! connection count:"+selector.keys().size());
System.out.println("Accepting new Connection!! connection count:"+selector.keys().size());
}
else if (key.isConnectable()) {
//key.interestOps(SelectionKey.OP_READ);
closeConnection(key);
//System.out.println("Disconnecting Connection!! connection count:"+selector.keys().size());
System.out.println("Disconnecting Connection!! connection count:"+selector.keys().size());
}
else if (key.isReadable()) {
read(key);
//System.out.println("Reading");
System.out.println("Reading");
}
else if (key.isWritable()) {
System.out.println("Writing");
write(key);
//System.out.println("Writing");
System.out.println("Writing");
}
}
}
@ -223,6 +228,7 @@ public abstract class NioNetwork implements Runnable {
InetSocketAddress remoteAdr = (InetSocketAddress) socketChannel.socket().getRemoteSocketAddress();
if(!clients.containsValue(remoteAdr)){
clients.put(remoteAdr, new ClientData(socketChannel));
System.out.println("New Connection("+remoteAdr+")!!! Count: "+clients.size());
}
}
@ -231,6 +237,7 @@ public abstract class NioNetwork implements Runnable {
*/
private void read(SelectionKey key) throws IOException {
SocketChannel socketChannel = (SocketChannel) key.channel();
InetSocketAddress remoteAdr = (InetSocketAddress) socketChannel.socket().getRemoteSocketAddress();
// Clear out our read buffer so it's ready for new data
readBuffer.clear();
@ -244,6 +251,8 @@ public abstract class NioNetwork implements Runnable {
// the selection key and close the channel.
key.cancel();
socketChannel.close();
clients.remove(remoteAdr);
System.out.println("Connection Forced Closed("+remoteAdr+")!!! Count: "+clients.size());
return;
}
@ -252,6 +261,8 @@ public abstract class NioNetwork implements Runnable {
// same from our end and cancel the channel.
key.channel().close();
key.cancel();
clients.remove(remoteAdr);
System.out.println("Connection Closed("+remoteAdr+")!!! Count: "+clients.size());
return;
}
@ -285,50 +296,6 @@ public abstract class NioNetwork implements Runnable {
}
}
/**
* Initieates a socket to the server
*/
protected SocketChannel initiateConnection(InetSocketAddress address) throws IOException {
// Create a non-blocking socket channel
SocketChannel socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);
// Kick off connection establishment
socketChannel.connect(address);
// Queue a channel registration since the caller is not the
// selecting thread. As part of the registration we'll register
// an interest in connection events. These are raised when a channel
// is ready to complete connection establishment.
synchronized(this.pendingChanges) {
pendingChanges.add(new ChangeRequest(socketChannel, ChangeRequest.REGISTER, SelectionKey.OP_CONNECT));
}
return socketChannel;
}
protected SocketChannel getSocketChannel(InetSocketAddress address){
return clients.get(address).getSocketChannel();
}
/**
* Client And Server ResponseEvent
*/
private void handleResponse(SocketChannel socketChannel, Object rspData) throws IOException {
// Look up the handler for this channel
ResponseEvent handler = (ResponseEvent) rspEvents.get(socketChannel);
// And pass the response to it
if (handler.handleResponse(rspData)) {
// The handler has seen enough, close the connection
socketChannel.close();
socketChannel.keyFor(selector).cancel();
}
rspEvents.remove(socketChannel);
}
/**
* Client and Server
*/
@ -359,10 +326,55 @@ public abstract class NioNetwork implements Runnable {
}
}
/**
* Client And Server ResponseEvent
*/
private void handleResponse(SocketChannel socketChannel, Object rspData) throws IOException {
// Look up the handler for this channel
ResponseEvent handler = (ResponseEvent) rspEvents.get(socketChannel);
// And pass the response to it
if (handler.handleResponse(rspData)) {
// The handler has seen enough, close the connection
//socketChannel.close();
//socketChannel.keyFor(selector).cancel();
}
rspEvents.remove(socketChannel);
}
/**
* Initieates a socket to the server
*/
protected SocketChannel initiateConnection(InetSocketAddress address) throws IOException {
// Create a non-blocking socket channel
SocketChannel socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);
System.out.println(address);
// Kick off connection establishment
socketChannel.connect(address);
// Queue a channel registration since the caller is not the
// selecting thread. As part of the registration we'll register
// an interest in connection events. These are raised when a channel
// is ready to complete connection establishment.
synchronized(this.pendingChanges) {
pendingChanges.add(new ChangeRequest(socketChannel, ChangeRequest.REGISTER, SelectionKey.OP_CONNECT));
}
return socketChannel;
}
protected SocketChannel getSocketChannel(InetSocketAddress address){
return clients.get(address).getSocketChannel();
}
/**
* Client
*/
private void closeConnection(SelectionKey key) throws IOException {
protected void closeConnection(SelectionKey key) throws IOException {
SocketChannel socketChannel = (SocketChannel) key.channel();
// Finish the connection. If the connection operation failed