NioNetwork working now, but something wrong with close()

This commit is contained in:
Ziver Koc 2016-12-08 17:57:56 +01:00
parent 13082f0db0
commit de4e4e54c9
30 changed files with 574 additions and 783 deletions

View file

@ -161,8 +161,7 @@ public class Converter {
* @return the associated object. * @return the associated object.
*/ */
public static Object toObject(byte[] bytes) throws Exception{ public static Object toObject(byte[] bytes) throws Exception{
Object object = null; Object object;
ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes)); ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));
object = ois.readObject(); object = ois.readObject();
ois.close(); ois.close();
@ -171,20 +170,15 @@ public class Converter {
} }
/** /**
* Converts an array of bytes back to its constituent object. The * Reads the first Java Serialized object from a stream.
* input array is assumed to have been created from the original object.
* *
* @param bytes the byte array to convert. * @param input the stream to read from
* @return the associated object. * @return an parsed object.
*/ */
public static Object toObject(DynamicByteArrayStream bytes) throws Exception{ public static Object toObject(InputStream input) throws Exception{
Object object = null; ObjectInputStream ois = new ObjectInputStream(input);
// Don't close the stream as it will close the underlying stream.
ObjectInputStream ois = new ObjectInputStream(bytes); return ois.readObject();
object = ois.readObject();
ois.close();
return object;
} }

2
src/zutil/log/net/NetLogExceptionMessage.java Normal file → Executable file
View file

@ -28,7 +28,7 @@ import zutil.net.nio.message.Message;
import java.util.logging.LogRecord; import java.util.logging.LogRecord;
public class NetLogExceptionMessage extends Message { public class NetLogExceptionMessage implements Message {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
private int count; private int count;

3
src/zutil/log/net/NetLogMessage.java Normal file → Executable file
View file

@ -30,7 +30,7 @@ import java.text.SimpleDateFormat;
import java.util.Date; import java.util.Date;
import java.util.logging.LogRecord; import java.util.logging.LogRecord;
public class NetLogMessage extends Message { public class NetLogMessage implements Message {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
private static final SimpleDateFormat dataFormat = private static final SimpleDateFormat dataFormat =
new SimpleDateFormat("yyyy--MM-dd HH:mm:ss"); new SimpleDateFormat("yyyy--MM-dd HH:mm:ss");
@ -42,6 +42,7 @@ public class NetLogMessage extends Message {
private String methodName; private String methodName;
private String log; private String log;
public NetLogMessage(String level, long timestamp, String log){ public NetLogMessage(String level, long timestamp, String log){
this.level = level; this.level = level;
this.timestamp = timestamp; this.timestamp = timestamp;

2
src/zutil/log/net/NetLogStatusMessage.java Normal file → Executable file
View file

@ -26,7 +26,7 @@ package zutil.log.net;
import zutil.net.nio.message.Message; import zutil.net.nio.message.Message;
public class NetLogStatusMessage extends Message{ public class NetLogStatusMessage implements Message{
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
public long totalMemory; public long totalMemory;

View file

@ -25,12 +25,11 @@
package zutil.net.nio; package zutil.net.nio;
import zutil.net.nio.message.Message; import zutil.net.nio.message.Message;
import zutil.net.nio.message.RequestResponseMessage;
import zutil.net.nio.response.ResponseEvent;
import java.io.IOException; import java.io.IOException;
import java.net.InetAddress; import java.net.InetAddress;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.channels.Selector; import java.nio.channels.Selector;
import java.nio.channels.spi.SelectorProvider; import java.nio.channels.spi.SelectorProvider;
@ -55,7 +54,7 @@ public class NioClient extends NioNetwork{
} }
/** /**
* Sends a Message to the default server * Sends a Message to the connected server
* *
* @param data the data to be sent * @param data the data to be sent
*/ */
@ -64,12 +63,15 @@ public class NioClient extends NioNetwork{
} }
/** /**
* This method is for the Client to send a message to the server * Sends a Message to the connected server
* *
* @param handler the response handler * @param data the data to be sent
* @param data the data to send
*/ */
public void send(ResponseEvent handler, RequestResponseMessage data) throws IOException { public void send(byte[] data) throws IOException {
send(remoteAddress, handler, data); send(remoteAddress, data);
} }
public SocketAddress getRemoteAddress(){
return remoteAddress;
}
} }

View file

@ -26,12 +26,8 @@ package zutil.net.nio;
import zutil.converter.Converter; import zutil.converter.Converter;
import zutil.log.LogUtil; import zutil.log.LogUtil;
import zutil.net.nio.message.RequestResponseMessage;
import zutil.net.nio.message.SystemMessage;
import zutil.net.nio.response.ResponseEvent;
import zutil.net.nio.server.ChangeRequest; import zutil.net.nio.server.ChangeRequest;
import zutil.net.nio.server.ClientData; import zutil.net.nio.server.ClientData;
import zutil.net.nio.worker.SystemWorker;
import zutil.net.nio.worker.Worker; import zutil.net.nio.worker.Worker;
import java.io.IOException; import java.io.IOException;
@ -57,7 +53,6 @@ public abstract class NioNetwork implements Runnable {
// The buffer into which we'll read data when it's available // The buffer into which we'll read data when it's available
private ByteBuffer readBuffer = ByteBuffer.allocate(8192); private ByteBuffer readBuffer = ByteBuffer.allocate(8192);
protected Worker worker; protected Worker worker;
protected SystemWorker systemWorker;
// This map contains all the clients that are connected // This map contains all the clients that are connected
protected Map<InetSocketAddress, ClientData> clients = new HashMap<InetSocketAddress, ClientData>(); protected Map<InetSocketAddress, ClientData> clients = new HashMap<InetSocketAddress, ClientData>();
@ -85,7 +80,6 @@ public abstract class NioNetwork implements Runnable {
this.localAddress = localAddress; this.localAddress = localAddress;
// init selector // init selector
this.selector = initSelector(); this.selector = initSelector();
this.systemWorker = new SystemWorker(this);
// init traffic thread // init traffic thread
new Thread(this).start(); new Thread(this).start();
} }
@ -128,13 +122,6 @@ public abstract class NioNetwork implements Runnable {
send(address, Converter.toBytes(data)); send(address, Converter.toBytes(data));
} }
public void send(SocketAddress address, ResponseEvent handler, RequestResponseMessage data) throws IOException {
// Register the response handler
systemWorker.addResponseHandler(handler, data);
send(address, Converter.toBytes(data));
}
/** /**
* Queues a message to be sent * Queues a message to be sent
* *
@ -168,8 +155,8 @@ public abstract class NioNetwork implements Runnable {
public void run() { public void run() {
logger.fine("NioNetwork Started."); logger.info("NioNetwork Started.");
while (true) { while (selector.isOpen()) {
try { try {
// Handle any pending changes // Handle any pending changes
synchronized (pendingChanges) { synchronized (pendingChanges) {
@ -196,36 +183,36 @@ public abstract class NioNetwork implements Runnable {
logger.finest("selector is awake"); logger.finest("selector is awake");
// Iterate over the set of keys for which events are available // Iterate over the set of keys for which events are available
Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator(); if (selector.isOpen()) {
while (selectedKeys.hasNext()) { Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
SelectionKey key = selectedKeys.next(); while (selectedKeys.hasNext()) {
selectedKeys.remove(); SelectionKey key = selectedKeys.next();
logger.finest("KeyOP: "+key.interestOps()+" isAcceptable: "+SelectionKey.OP_ACCEPT+" isConnectible: "+SelectionKey.OP_CONNECT+" isWritable: "+SelectionKey.OP_WRITE+" isReadable: "+SelectionKey.OP_READ); selectedKeys.remove();
logger.finest("KeyOP: " + key.interestOps() + " isAcceptable: " + SelectionKey.OP_ACCEPT + " isConnectible: " + SelectionKey.OP_CONNECT + " isWritable: " + SelectionKey.OP_WRITE + " isReadable: " + SelectionKey.OP_READ);
if (key.isValid()) { if (key.isValid()) {
// Check what event is available and deal with it // Check what event is available and deal with it
if (key.isAcceptable()) { if (key.isAcceptable()) {
logger.finest("Accepting Connection!!"); logger.finest("Accepting Connection!!");
accept(key); accept(key);
} } else if (key.isConnectable()) {
else if (key.isConnectable()) { logger.finest("Establishing Connection!!");
logger.finest("Establishing Connection!!"); establishConnection(key);
establishConnection(key); } else if (key.isWritable()) {
} logger.finest("Writing");
else if (key.isWritable()) { write(key);
logger.finest("Writing"); } else if (key.isReadable()) {
write(key); logger.finest("Reading");
} read(key);
else if (key.isReadable()) { }
logger.finest("Reading"); }
read(key); }
} }
}
}
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
} }
} }
logger.info("Shutting down NioNetwork");
} }
/** /**
@ -309,7 +296,7 @@ public abstract class NioNetwork implements Runnable {
*/ */
private void read(SelectionKey key) throws IOException { private void read(SelectionKey key) throws IOException {
SocketChannel socketChannel = (SocketChannel) key.channel(); SocketChannel socketChannel = (SocketChannel) key.channel();
InetSocketAddress remoteAdr = (InetSocketAddress) socketChannel.socket().getRemoteSocketAddress(); SocketAddress remoteAdr = socketChannel.socket().getRemoteSocketAddress();
// Clear out our read buffer so it's ready for new data // Clear out our read buffer so it's ready for new data
readBuffer.clear(); readBuffer.clear();
@ -341,43 +328,25 @@ public abstract class NioNetwork implements Runnable {
} }
// Make a correctly sized copy of the data before handing it to the client // Make a correctly sized copy of the data before handing it to the client
byte[] rspByteData = new byte[numRead]; //byte[] rspByteData = new byte[numRead];
System.arraycopy(readBuffer.array(), 0, rspByteData, 0, numRead); //System.arraycopy(readBuffer.array(), 0, rspByteData, 0, numRead);
try{ try{
Object rspData = Converter.toObject(rspByteData); Object rspData = Converter.toObject(readBuffer.array());
handleReceivedMessage(socketChannel, rspData);
// Hand the data off to our worker thread
if (worker != null) {
logger.finer("Handling incoming message...");
worker.processData(this, socketChannel.getRemoteAddress(), rspData);
} else {
logger.fine("No worker set, message unhandled!");
}
}catch(Exception e){ }catch(Exception e){
e.printStackTrace(); e.printStackTrace();
} }
} }
private void handleReceivedMessage(SocketChannel socketChannel, Object rspData){
logger.finer("Handling incoming message...");
try {
if (rspData instanceof SystemMessage) {
if (systemWorker != null) {
logger.finest("Handling system message");
systemWorker.processData(this, socketChannel.getRemoteAddress(), rspData);
} else {
logger.finer("Unhandled system message!");
}
} else {
// Hand the data off to our worker thread
if (worker != null) {
logger.finest("Handling generic worker message");
worker.processData(this, socketChannel.getRemoteAddress(), rspData);
} else {
logger.fine("Unhandled message!");
}
}
}catch (IOException e){
e.printStackTrace();
}
}
private ClientData registerSocketChannel(SocketChannel socket){ private ClientData registerSocketChannel(SocketChannel socket){
InetSocketAddress remoteAdr = (InetSocketAddress) socket.socket().getRemoteSocketAddress(); InetSocketAddress remoteAdr = (InetSocketAddress) socket.socket().getRemoteSocketAddress();
@ -395,7 +364,7 @@ public abstract class NioNetwork implements Runnable {
/** /**
* Close a ongoing connection * Close a specific ongoing connection
*/ */
protected void closeConnection(InetSocketAddress address) throws IOException{ protected void closeConnection(InetSocketAddress address) throws IOException{
closeConnection(getSocketChannel(address)); closeConnection(getSocketChannel(address));
@ -406,14 +375,17 @@ public abstract class NioNetwork implements Runnable {
socketChannel.keyFor(selector).cancel(); socketChannel.keyFor(selector).cancel();
} }
/**
* Close all connections
/*public void close() throws IOException{ */
public void close() throws IOException{
if(serverChannel != null){ if(serverChannel != null){
serverChannel.close(); serverChannel.close();
serverChannel.keyFor(selector).cancel(); serverChannel.keyFor(selector).cancel();
} }
selector.close(); clients.clear();
}*/ pendingChanges.clear();
pendingWriteData.clear();
selector.close();
}
} }

View file

@ -25,18 +25,18 @@
package zutil.net.nio.message; package zutil.net.nio.message;
/** /**
* The reciver will echo out this message to the sender * The receiver will echo out this message to the sender
* *
* @author Ziver * @author Ziver
*/ */
public abstract class EchoMessage extends Message implements SystemMessage{ public abstract class EchoMessage implements Message{
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
private boolean echo = false; private boolean echo = false;
/** /**
* @return true if this message is an echo of an original message * @return true if this message is an echo/copy of an original message
*/ */
public boolean echo() { public boolean echo() {
return echo; return echo;

View file

@ -1,38 +0,0 @@
/*
* The MIT License (MIT)
*
* Copyright (c) 2015 Ziver Koc
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
package zutil.net.nio.message;
/**
* Tells the destination that the
* source is still online
*
* @author Ziver
*
*/
public class KeepAliveMessage extends Message implements SystemMessage{
private static final long serialVersionUID = 1L;
}

4
src/zutil/net/nio/message/Message.java Normal file → Executable file
View file

@ -26,8 +26,6 @@ package zutil.net.nio.message;
import java.io.Serializable; import java.io.Serializable;
public class Message implements Serializable{ public interface Message extends Serializable{
private static final long serialVersionUID = 1L;
} }

View file

@ -1,36 +0,0 @@
/*
* The MIT License (MIT)
*
* Copyright (c) 2015 Ziver Koc
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
package zutil.net.nio.message;
/**
* A message that implements this will be
* handeld internaly by the network engine
*
* @author Ziver
*
*/
public interface SystemMessage {
}

View file

@ -24,13 +24,12 @@
package zutil.net.nio.response; package zutil.net.nio.response;
import zutil.io.MultiPrintStream;
public class PrintRsp extends ResponseEvent{ public class PrintResponseHandler extends ResponseHandler {
@Override @Override
protected void responseEvent(Object rsp) { protected void responseEvent(Object rsp) {
MultiPrintStream.out.println(rsp); System.out.println(rsp);
} }
} }

View file

@ -22,7 +22,7 @@
* THE SOFTWARE. * THE SOFTWARE.
*/ */
package zutil.net.nio.message; package zutil.net.nio.response;
/** /**
* This interface defines a request response flow where a request * This interface defines a request response flow where a request

View file

@ -1,67 +0,0 @@
/*
* The MIT License (MIT)
*
* Copyright (c) 2015 Ziver Koc
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
package zutil.net.nio.response;
public abstract class ResponseEvent {
private Object rsp = null;
public synchronized boolean handleResponse(Object rsp) {
this.rsp = rsp;
notify();
return true;
}
/**
* Blocks the Thread until there is a response
*/
public synchronized void waitForResponse() {
while(!gotResponse()) {
try {
this.wait();
} catch (InterruptedException e) {}
}
responseEvent(rsp);
}
/**
* Handles the response
*/
public void handleResponse(){
if(gotResponse()){
responseEvent(rsp);
}
}
/**
* @return If there is an response
*/
public boolean gotResponse(){
return (rsp != null);
}
protected abstract void responseEvent(Object rsp);
}

49
src/zutil/net/nio/response/ResponseHandler.java Normal file → Executable file
View file

@ -24,42 +24,35 @@
package zutil.net.nio.response; package zutil.net.nio.response;
import java.util.LinkedList; // TODO: this class has a strange structure, should be refactored
import java.util.List; public abstract class ResponseHandler {
private Object rsp = null;
public synchronized void handleResponse(Object rsp) {
public abstract class ResponseHandler implements Runnable{ this.rsp = rsp;
private List<ResponseEvent> queue = new LinkedList<ResponseEvent>(); responseEvent(rsp);
public ResponseHandler(){
}
public synchronized void addResponseEvent(ResponseEvent re){
queue.add(re);
notify(); notify();
} }
public synchronized void removeResponseEvent(ResponseEvent re){ /**
queue.remove(re); * Blocks the calling thread until there is a response
} */
public void waitForResponse() {
public void run() { while(!gotResponse()) {
while(true) {
try { try {
this.wait(); synchronized (this) {
this.wait();
}
} catch (InterruptedException e) {} } catch (InterruptedException e) {}
update();
} }
} }
public synchronized void update(){ /**
while(!queue.isEmpty()){ * @return true if a response has been received
queue.get(0).handleResponse(); */
if(queue.get(0).gotResponse()){ public boolean gotResponse(){
queue.remove(0); return (rsp != null);
}
}
} }
protected abstract void responseEvent(Object rsp);
} }

View file

@ -22,33 +22,33 @@
* THE SOFTWARE. * THE SOFTWARE.
*/ */
package zutil.net.nio.message; package zutil.net.nio.response;
import zutil.net.nio.message.EchoMessage;
public class StringResponseMessage extends EchoMessage implements RequestResponseMessage { public class StringResponseMessage extends EchoMessage implements RequestResponseMessage {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
private long responseId; private long responseId;
private String msg; private String msg;
public StringResponseMessage(String msg){ public StringResponseMessage(String msg){
this.msg = msg; this.msg = msg;
responseId = (long)(Math.random()*Long.MAX_VALUE); responseId = (long)(Math.random()*Long.MAX_VALUE);
} }
public String getString(){
return msg; public long getResponseId() {
} return responseId;
}
public void setString(String msg){ public void setString(String msg){
this.msg = msg; this.msg = msg;
} }
public String toString(){ public String toString(){
return getString(); return msg;
}
public long getResponseId() {
return responseId;
} }
} }

View file

@ -1,44 +0,0 @@
/*
* The MIT License (MIT)
*
* Copyright (c) 2015 Ziver Koc
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
package zutil.net.nio.worker;
import java.io.IOException;
public class EchoWorker extends ThreadedEventWorker {
@Override
public void messageEvent(WorkerEventData dataEvent) {
try {
// Return to sender
dataEvent.network.send(dataEvent.remoteAddress, dataEvent.data);
} catch (IOException e) {
e.printStackTrace();
}
}
}

View file

@ -24,33 +24,36 @@
package zutil.net.nio.worker; package zutil.net.nio.worker;
import zutil.converter.Converter;
import zutil.log.LogUtil; import zutil.log.LogUtil;
import zutil.net.nio.NioNetwork; import zutil.net.nio.NioNetwork;
import zutil.net.nio.message.Message; import zutil.net.nio.message.Message;
import zutil.net.nio.message.EchoMessage; import zutil.net.nio.message.EchoMessage;
import zutil.net.nio.message.RequestResponseMessage; import zutil.net.nio.response.RequestResponseMessage;
import zutil.net.nio.response.ResponseEvent; import zutil.net.nio.response.ResponseHandler;
import java.io.IOException;
import java.net.SocketAddress;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.logging.Logger; import java.util.logging.Logger;
public class SystemWorker extends ThreadedEventWorker { public class StandardWorker extends ThreadedEventWorker {
private static Logger logger = LogUtil.getLogger(); private static Logger logger = LogUtil.getLogger();
private NioNetwork nio; private NioNetwork nio;
// Maps a responseId to a RspHandler // Maps a responseId to a RspHandler
private Map<Long, ResponseEvent> rspEvents = new HashMap<>(); private Map<Long, ResponseHandler> rspEvents = new HashMap<>();
// Different services listening on specific messages // Different services listening on specific messages
private Map<Class<?>, ThreadedEventWorker> services = new HashMap<>(); private Map<Class<?>, ThreadedEventWorker> services = new HashMap<>();
/** /**
* Creates a new SystemWorker * Creates a new StandardWorker
*/ */
public SystemWorker(NioNetwork nio){ public StandardWorker(NioNetwork nio){
this.nio = nio; this.nio = nio;
} }
@ -59,31 +62,30 @@ public class SystemWorker extends ThreadedEventWorker {
@Override @Override
public void messageEvent(WorkerEventData event) { public void messageEvent(WorkerEventData event) {
try { try {
logger.finer("System Message: "+event.data.getClass().getName()); logger.finer("Message: "+event.data.getClass().getName());
if(event.data instanceof Message){
if(event.data instanceof EchoMessage && ((EchoMessage)event.data).echo()){ if(event.data instanceof EchoMessage && !((EchoMessage)event.data).echo()){
// Echos back the received message // Echo back the received message
((EchoMessage)event.data).received(); ((EchoMessage)event.data).received();
logger.finer("Echoing Message: "+event.data); logger.finer("Echoing Message: "+event.data);
nio.send(event.remoteAddress, event.data); nio.send(event.remoteAddress, event.data);
} }
else if(event.data instanceof RequestResponseMessage && else if(event.data instanceof RequestResponseMessage &&
rspEvents.get(((RequestResponseMessage)event.data).getResponseId()) != null){ rspEvents.get(((RequestResponseMessage)event.data).getResponseId()) != null){
long responseId = ((RequestResponseMessage)event.data).getResponseId(); long responseId = ((RequestResponseMessage)event.data).getResponseId();
// Look up the handler for this channel // Look up the handler for this channel
ResponseEvent handler = rspEvents.get(responseId); ResponseHandler handler = rspEvents.get(responseId);
// And pass the response to it // And pass the response to it
handler.handleResponse(event.data); handler.handleResponse(event.data);
rspEvents.remove(responseId); rspEvents.remove(responseId);
logger.finer("Response Request Message: "+event.data); logger.finer("Response Request Message: "+event.data);
} }
else{ else{
// Check mapped workers // Check mapped workers
if(services.containsKey(event.data.getClass())){ if(services.containsKey(event.data.getClass())){
services.get(event.data.getClass()).messageEvent(event); services.get(event.data.getClass()).messageEvent(event);
} }
} }
}
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
} }
@ -108,11 +110,17 @@ public class SystemWorker extends ThreadedEventWorker {
services.remove(messageClass); services.remove(messageClass);
} }
/** /**
* Connects a ResponseHandler to a specific message object * Send a message with a defined response handler
*/ *
public void addResponseHandler(ResponseEvent handler, RequestResponseMessage data){ * @param address the target host for the message
rspEvents.put(data.getResponseId(), handler); * @param message the message object
} * @param handler the handler that should be called when a response is received
*/
public void send(SocketAddress address, RequestResponseMessage message, ResponseHandler handler) throws IOException {
// Register the response handler
rspEvents.put(message.getResponseId(), handler);
nio.send(address, Converter.toBytes(message));
}
} }

View file

@ -32,7 +32,8 @@ import java.util.List;
public abstract class Worker { public abstract class Worker {
private LinkedList<WorkerEventData> queue = new LinkedList<WorkerEventData>(); private LinkedList<WorkerEventData> queue = new LinkedList<>();
public void processData(NioNetwork server, SocketAddress remote, Object data) { public void processData(NioNetwork server, SocketAddress remote, Object data) {
synchronized(queue) { synchronized(queue) {
@ -50,7 +51,7 @@ public abstract class Worker {
} }
/** /**
* Polls a event from the list or waits until there is a event * Polls a event from the list or blocks until there is a event available
* *
* @return the next event * @return the next event
*/ */

View file

@ -35,6 +35,7 @@ public class WorkerEventData {
public SocketAddress remoteAddress; public SocketAddress remoteAddress;
public Object data; public Object data;
public WorkerEventData(NioNetwork server, SocketAddress remoteAddress, Object data) { public WorkerEventData(NioNetwork server, SocketAddress remoteAddress, Object data) {
this.network = server; this.network = server;
this.remoteAddress = remoteAddress; this.remoteAddress = remoteAddress;

View file

@ -22,9 +22,11 @@
* THE SOFTWARE. * THE SOFTWARE.
*/ */
package zutil.net.nio.message; package zutil.net.nio.worker.chat;
public class ChatMessage extends Message{ import zutil.net.nio.message.Message;
public class ChatMessage implements Message {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
public static enum ChatMessageType {REGISTER, UNREGISTER, MESSAGE}; public static enum ChatMessageType {REGISTER, UNREGISTER, MESSAGE};

View file

@ -25,14 +25,10 @@
package zutil.net.nio.worker.chat; package zutil.net.nio.worker.chat;
import zutil.log.LogUtil; import zutil.log.LogUtil;
import zutil.net.nio.NioNetwork;
import zutil.net.nio.message.ChatMessage;
import zutil.net.nio.message.Message;
import zutil.net.nio.worker.ThreadedEventWorker; import zutil.net.nio.worker.ThreadedEventWorker;
import zutil.net.nio.worker.WorkerEventData; import zutil.net.nio.worker.WorkerEventData;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.nio.channels.SocketChannel;
import java.util.HashMap; import java.util.HashMap;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.logging.Logger; import java.util.logging.Logger;

View file

@ -26,7 +26,6 @@ package zutil.net.nio.worker.grid;
import zutil.io.MultiPrintStream; import zutil.io.MultiPrintStream;
import zutil.net.nio.NioClient; import zutil.net.nio.NioClient;
import zutil.net.nio.message.GridMessage;
import zutil.net.nio.worker.ThreadedEventWorker; import zutil.net.nio.worker.ThreadedEventWorker;
import zutil.net.nio.worker.WorkerEventData; import zutil.net.nio.worker.WorkerEventData;

View file

@ -22,9 +22,11 @@
* THE SOFTWARE. * THE SOFTWARE.
*/ */
package zutil.net.nio.message; package zutil.net.nio.worker.grid;
public class GridMessage<T> extends Message{ import zutil.net.nio.message.Message;
public class GridMessage<T> implements Message {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
// Client type messages // Client type messages
@ -49,14 +51,13 @@ public class GridMessage<T> extends Message{
private int type; private int type;
private int jobID; private int jobId;
private T data; private T data;
/** /**
* Creates a new GridMessage * Creates a new GridMessage
* *
* @param type is the type of message * @param type is the type of message
* @param jobID is the id of the job
*/ */
public GridMessage(int type){ public GridMessage(int type){
this(type, 0, null); this(type, 0, null);
@ -66,22 +67,22 @@ public class GridMessage<T> extends Message{
* Creates a new GridMessage * Creates a new GridMessage
* *
* @param type is the type of message * @param type is the type of message
* @param jobID is the id of the job * @param jobId is the id of the job
*/ */
public GridMessage(int type, int jobID){ public GridMessage(int type, int jobId){
this(type, jobID, null); this(type, jobId, null);
} }
/** /**
* Creates a new GridMessage * Creates a new GridMessage
* *
* @param type is the type of message * @param type is the type of message
* @param jobID is the id of the job * @param jobId is the id of the job
* @param data is the data to send with this message * @param data is the data to send with this message
*/ */
public GridMessage(int type, int jobID, T data){ public GridMessage(int type, int jobId, T data){
this.type = type; this.type = type;
this.jobID = jobID; this.jobId = jobId;
this.data = data; this.data = data;
} }
@ -96,7 +97,7 @@ public class GridMessage<T> extends Message{
* @return the job id for this message * @return the job id for this message
*/ */
public int getJobQueueID(){ public int getJobQueueID(){
return jobID; return jobId;
} }
/** /**

View file

@ -24,7 +24,6 @@
package zutil.net.nio.worker.grid; package zutil.net.nio.worker.grid;
import zutil.net.nio.message.GridMessage;
import zutil.net.nio.worker.ThreadedEventWorker; import zutil.net.nio.worker.ThreadedEventWorker;
import zutil.net.nio.worker.WorkerEventData; import zutil.net.nio.worker.WorkerEventData;

View file

@ -22,7 +22,7 @@
* THE SOFTWARE. * THE SOFTWARE.
*/ */
package zutil.net.nio.message; package zutil.net.nio.worker.sync;
public class GraphicsSyncMessage extends SyncMessage{ public class GraphicsSyncMessage extends SyncMessage{

View file

@ -24,8 +24,6 @@
package zutil.net.nio.worker.sync; package zutil.net.nio.worker.sync;
import zutil.net.nio.message.SyncMessage;
public abstract class ObjectSync { public abstract class ObjectSync {
public String id; public String id;

View file

@ -22,11 +22,18 @@
* THE SOFTWARE. * THE SOFTWARE.
*/ */
package zutil.net.nio.message; package zutil.net.nio.worker.sync;
public class SyncMessage extends Message implements SystemMessage{ import zutil.net.nio.message.Message;
public class SyncMessage implements Message {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
public static enum MessageType { REQUEST_ID, NEW, REMOVE, SYNC }; public enum MessageType {
REQUEST_ID,
NEW,
REMOVE,
SYNC
}
// type of message // type of message
public MessageType type; public MessageType type;

View file

@ -25,13 +25,9 @@
package zutil.net.nio.worker.sync; package zutil.net.nio.worker.sync;
import zutil.log.LogUtil; import zutil.log.LogUtil;
import zutil.net.nio.NioNetwork;
import zutil.net.nio.message.Message;
import zutil.net.nio.message.SyncMessage;
import zutil.net.nio.worker.ThreadedEventWorker; import zutil.net.nio.worker.ThreadedEventWorker;
import zutil.net.nio.worker.WorkerEventData; import zutil.net.nio.worker.WorkerEventData;
import java.nio.channels.SocketChannel;
import java.util.HashMap; import java.util.HashMap;
import java.util.logging.Logger; import java.util.logging.Logger;

View file

@ -26,8 +26,9 @@ package zutil.net.nio;
import zutil.log.CompactLogFormatter; import zutil.log.CompactLogFormatter;
import zutil.log.LogUtil; import zutil.log.LogUtil;
import zutil.net.nio.message.StringResponseMessage; import zutil.net.nio.response.StringResponseMessage;
import zutil.net.nio.response.PrintRsp; import zutil.net.nio.response.PrintResponseHandler;
import zutil.net.nio.worker.StandardWorker;
import java.io.IOException; import java.io.IOException;
import java.net.InetAddress; import java.net.InetAddress;
@ -39,21 +40,27 @@ import java.util.logging.Level;
public class NetworkClientTest { public class NetworkClientTest {
public static void main(String[] args) throws NoSuchAlgorithmException, InterruptedException { public static void main(String[] args) throws NoSuchAlgorithmException, InterruptedException {
try { try {
LogUtil.setGlobalLevel(Level.ALL); //LogUtil.setGlobalLevel(Level.ALL);
LogUtil.setGlobalFormatter(new CompactLogFormatter()); LogUtil.setGlobalFormatter(new CompactLogFormatter());
int count = 0; int count = 0;
long time = System.currentTimeMillis()+1000*60; long time = System.currentTimeMillis()+1000*60;
NioClient client = new NioClient(InetAddress.getByName("localhost"), 6056); NioClient client = new NioClient(InetAddress.getByName("localhost"), 6056);
StandardWorker worker = new StandardWorker(client);
client.setDefaultWorker(worker);
Thread.sleep(1000); Thread.sleep(1000);
while(time > System.currentTimeMillis()){ while(time > System.currentTimeMillis()){
PrintRsp handler = new PrintRsp(); PrintResponseHandler handler = new PrintResponseHandler();
client.send(handler, new StringResponseMessage("StringResponseMessage: "+count)); worker.send(client.getRemoteAddress(),
new StringResponseMessage("StringResponseMessage: "+count),
handler);
handler.waitForResponse(); handler.waitForResponse();
//Thread.sleep(100); //Thread.sleep(100);
//System.out.println("sending.."); //System.out.println("sending..");
count++; count++;
} }
client.close();
System.out.println("Message Count 1m: "+count); System.out.println("Message Count 1m: "+count);
System.out.println("Message Count 1s: "+count/60); System.out.println("Message Count 1s: "+count/60);

View file

@ -26,6 +26,7 @@ package zutil.net.nio;
import zutil.log.CompactLogFormatter; import zutil.log.CompactLogFormatter;
import zutil.log.LogUtil; import zutil.log.LogUtil;
import zutil.net.nio.worker.StandardWorker;
import java.io.IOException; import java.io.IOException;
import java.security.NoSuchAlgorithmException; import java.security.NoSuchAlgorithmException;
@ -36,10 +37,11 @@ import java.util.logging.Level;
public class NetworkServerTest { public class NetworkServerTest {
public static void main(String[] args) throws NoSuchAlgorithmException, InterruptedException { public static void main(String[] args) throws NoSuchAlgorithmException, InterruptedException {
try { try {
LogUtil.setGlobalLevel(Level.ALL); //LogUtil.setGlobalLevel(Level.ALL);
LogUtil.setGlobalFormatter(new CompactLogFormatter()); LogUtil.setGlobalFormatter(new CompactLogFormatter());
NioServer server = new NioServer(6056); NioServer server = new NioServer(6056);
server.setDefaultWorker(new StandardWorker(server));
while(true){ while(true){
Thread.sleep(1000); Thread.sleep(1000);