Fixed some things
This commit is contained in:
parent
c21b229882
commit
41c474d2a5
8 changed files with 239 additions and 138 deletions
|
|
@ -32,11 +32,11 @@ public abstract class NioNetwork implements Runnable {
|
|||
/**
|
||||
* Debug level
|
||||
* 0 = nothing
|
||||
* 1 = connection debug
|
||||
* 1 = connection info
|
||||
* 2 = message debug
|
||||
* 3 = selector debug
|
||||
*/
|
||||
public static final int DEBUG = 2;
|
||||
public static int DEBUG = 2;
|
||||
public static enum NetworkType {SERVER, CLIENT};
|
||||
|
||||
private NetworkType type;
|
||||
|
|
@ -139,7 +139,7 @@ public abstract class NioNetwork implements Runnable {
|
|||
* @param data The data to send
|
||||
*/
|
||||
protected void queueSend(SocketChannel socket, byte[] data){
|
||||
if(DEBUG>=3)MultiPrintStream.out.println("Sending Queue...");
|
||||
if(DEBUG>=3) MultiPrintStream.out.println("Sending Queue...");
|
||||
// And queue the data we want written
|
||||
synchronized (pendingWriteData) {
|
||||
List<ByteBuffer> queue = pendingWriteData.get(socket);
|
||||
|
|
@ -157,7 +157,7 @@ public abstract class NioNetwork implements Runnable {
|
|||
// Indicate we want the interest ops set changed
|
||||
pendingChanges.add(new ChangeRequest(socket, ChangeRequest.CHANGEOPS, SelectionKey.OP_WRITE));
|
||||
}
|
||||
if(DEBUG>=3)MultiPrintStream.out.println("selector.wakeup();");
|
||||
if(DEBUG>=3) MultiPrintStream.out.println("selector.wakeup();");
|
||||
// Finally, wake up our selecting thread so it can make the required changes
|
||||
selector.wakeup();
|
||||
}
|
||||
|
|
@ -175,11 +175,11 @@ public abstract class NioNetwork implements Runnable {
|
|||
case ChangeRequest.CHANGEOPS:
|
||||
SelectionKey key = change.socket.keyFor(selector);
|
||||
key.interestOps(change.ops);
|
||||
if(DEBUG>=3)MultiPrintStream.out.println("change.ops "+change.ops);
|
||||
if(DEBUG>=3) MultiPrintStream.out.println("change.ops "+change.ops);
|
||||
break;
|
||||
case ChangeRequest.REGISTER:
|
||||
change.socket.register(selector, change.ops);
|
||||
if(DEBUG>=3)MultiPrintStream.out.println("register socket ");
|
||||
if(DEBUG>=3) MultiPrintStream.out.println("register socket ");
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
|
@ -188,31 +188,31 @@ public abstract class NioNetwork implements Runnable {
|
|||
|
||||
// Wait for an event one of the registered channels
|
||||
selector.select();
|
||||
if(DEBUG>=3)MultiPrintStream.out.println("selector is awake");
|
||||
if(DEBUG>=3) MultiPrintStream.out.println("selector is awake");
|
||||
|
||||
// Iterate over the set of keys for which events are available
|
||||
Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
|
||||
while (selectedKeys.hasNext()) {
|
||||
SelectionKey key = (SelectionKey) selectedKeys.next();
|
||||
selectedKeys.remove();
|
||||
if(DEBUG>=3)MultiPrintStream.out.println("KeyOP: "+key.interestOps()+" isAcceptable: "+SelectionKey.OP_ACCEPT+" isConnectable: "+SelectionKey.OP_CONNECT+" isWritable: "+SelectionKey.OP_WRITE+" isReadable: "+SelectionKey.OP_READ);
|
||||
if(DEBUG>=3) MultiPrintStream.out.println("KeyOP: "+key.interestOps()+" isAcceptable: "+SelectionKey.OP_ACCEPT+" isConnectable: "+SelectionKey.OP_CONNECT+" isWritable: "+SelectionKey.OP_WRITE+" isReadable: "+SelectionKey.OP_READ);
|
||||
|
||||
if (key.isValid()) {
|
||||
// Check what event is available and deal with it
|
||||
if (key.isAcceptable()) {
|
||||
if(DEBUG>=3)MultiPrintStream.out.println("Accepting Connection!!");
|
||||
if(DEBUG>=3) MultiPrintStream.out.println("Accepting Connection!!");
|
||||
accept(key);
|
||||
}
|
||||
else if (key.isConnectable()) {
|
||||
if(DEBUG>=3)MultiPrintStream.out.println("Finnishing Connection!!");
|
||||
if(DEBUG>=3) MultiPrintStream.out.println("Finnishing Connection!!");
|
||||
finishConnection(key);
|
||||
}
|
||||
else if (key.isWritable()) {
|
||||
if(DEBUG>=3)MultiPrintStream.out.println("Writing");
|
||||
if(DEBUG>=3) MultiPrintStream.out.println("Writing");
|
||||
write(key);
|
||||
}
|
||||
else if (key.isReadable()) {
|
||||
if(DEBUG>=3)MultiPrintStream.out.println("Reading");
|
||||
if(DEBUG>=3) MultiPrintStream.out.println("Reading");
|
||||
read(key);
|
||||
}
|
||||
}
|
||||
|
|
@ -269,7 +269,7 @@ public abstract class NioNetwork implements Runnable {
|
|||
clients.remove(remoteAdr);
|
||||
pendingReadData.remove(socketChannel);
|
||||
pendingWriteData.remove(socketChannel);
|
||||
if(DEBUG>=1)MultiPrintStream.out.println("Connection Forced Close("+remoteAdr+")!!! Connection Count: "+clients.size());
|
||||
if(DEBUG>=1) MultiPrintStream.out.println("Connection Forced Close("+remoteAdr+")!!! Connection Count: "+clients.size());
|
||||
if(type == NetworkType.CLIENT) throw new IOException("Server Closed The Connection!!!");
|
||||
return;
|
||||
}
|
||||
|
|
@ -282,7 +282,7 @@ public abstract class NioNetwork implements Runnable {
|
|||
clients.remove(remoteAdr);
|
||||
pendingReadData.remove(socketChannel);
|
||||
pendingWriteData.remove(socketChannel);
|
||||
if(DEBUG>=1)MultiPrintStream.out.println("Connection Close("+remoteAdr+")!!! Connection Count: "+clients.size());
|
||||
if(DEBUG>=1) MultiPrintStream.out.println("Connection Close("+remoteAdr+")!!! Connection Count: "+clients.size());
|
||||
if(type == NetworkType.CLIENT) throw new IOException("Server Closed The Connection!!!");
|
||||
return;
|
||||
}
|
||||
|
|
@ -293,26 +293,25 @@ public abstract class NioNetwork implements Runnable {
|
|||
System.arraycopy(readBuffer.array(), 0, rspByteData, 0, numRead);
|
||||
if(encrypter != null)// Encryption
|
||||
rspByteData = encrypter.decrypt(rspByteData);
|
||||
|
||||
/*
|
||||
|
||||
// Message Count 1m: 36750
|
||||
// Message Count 1s: 612
|
||||
if(!pendingReadData.containsKey(socketChannel)){
|
||||
pendingReadData.put(socketChannel, new DynamicByteArrayStream());
|
||||
}
|
||||
if(encrypter != null)// Encryption
|
||||
rspByteData = encrypter.decrypt(rspByteData);
|
||||
|
||||
pendingReadData.get(socketChannel).add(rspByteData);
|
||||
*/
|
||||
DynamicByteArrayStream dynBuf = pendingReadData.get(socketChannel);
|
||||
dynBuf.add(rspByteData);
|
||||
|
||||
|
||||
Object rspData = null;
|
||||
try{
|
||||
rspData = Converter.toObject(rspByteData);
|
||||
//rspData = Converter.toObject(pendingReadData.get(socketChannel));
|
||||
//rspData = Converter.toObject(rspByteData);
|
||||
rspData = Converter.toObject(dynBuf);
|
||||
handleRecivedMessage(socketChannel, rspData);
|
||||
//pendingReadData.get(socketChannel).clear();
|
||||
dynBuf.clear();
|
||||
}catch(Exception e){
|
||||
e.printStackTrace();
|
||||
//pendingReadData.get(socketChannel).reset();
|
||||
dynBuf.reset();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -326,18 +325,16 @@ public abstract class NioNetwork implements Runnable {
|
|||
List<ByteBuffer> queue = pendingWriteData.get(socketChannel);
|
||||
if(queue == null){
|
||||
queue = new ArrayList<ByteBuffer>();
|
||||
pendingWriteData.put(socketChannel, queue);
|
||||
}
|
||||
|
||||
int i = 0;
|
||||
// Write until there's not more data ...
|
||||
while (!queue.isEmpty()) {
|
||||
ByteBuffer buf = queue.get(0);
|
||||
i += buf.remaining();
|
||||
socketChannel.write(buf);
|
||||
i -= buf.remaining();
|
||||
if (buf.remaining() > 0) {
|
||||
// ... or the socket's buffer fills up
|
||||
if(DEBUG>=3)MultiPrintStream.out.println("Write Buffer Full!!");
|
||||
if(DEBUG>=3) MultiPrintStream.out.println("Write Buffer Full!!");
|
||||
break;
|
||||
}
|
||||
queue.remove(0);
|
||||
|
|
@ -347,31 +344,32 @@ public abstract class NioNetwork implements Runnable {
|
|||
// We wrote away all data, so we're no longer interested
|
||||
// in writing on this socket. Switch back to waiting for
|
||||
// data.
|
||||
if(DEBUG>=3)MultiPrintStream.out.println("No more Data to write!!");
|
||||
if(DEBUG>=3) MultiPrintStream.out.println("No more Data to write!!");
|
||||
key.interestOps(SelectionKey.OP_READ);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void handleRecivedMessage(SocketChannel socketChannel, Object rspData){
|
||||
if(DEBUG>=2)MultiPrintStream.out.println("Handling incomming message...");
|
||||
if(DEBUG>=2) MultiPrintStream.out.println("Handling incomming message...");
|
||||
|
||||
if(rspData instanceof SystemMessage){
|
||||
if(systemWorker != null){
|
||||
if(DEBUG>=3)MultiPrintStream.out.println("System Message!!!");
|
||||
if(DEBUG>=3) MultiPrintStream.out.println("System Message!!!");
|
||||
systemWorker.processData(this, socketChannel, rspData);
|
||||
}
|
||||
else{
|
||||
if(DEBUG>=2)MultiPrintStream.out.println("Unhandled System Message!!!");
|
||||
if(DEBUG>=2) MultiPrintStream.out.println("Unhandled System Message!!!");
|
||||
}
|
||||
}
|
||||
else{
|
||||
// Hand the data off to our worker thread
|
||||
if(worker != null){
|
||||
if(DEBUG>=3)MultiPrintStream.out.println("Worker Message!!!");
|
||||
if(DEBUG>=3) MultiPrintStream.out.println("Worker Message!!!");
|
||||
worker.processData(this, socketChannel, rspData);
|
||||
}
|
||||
else{
|
||||
if(DEBUG>=1)MultiPrintStream.out.println("Unhandled Worker Message!!!");
|
||||
if(DEBUG>=1) MultiPrintStream.out.println("Unhandled Worker Message!!!");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue