From 9297bea93da4100c95337a2650caed742890ad7b Mon Sep 17 00:00:00 2001 From: Ziver Koc Date: Thu, 26 Feb 2009 17:10:57 +0000 Subject: [PATCH] Added some algorithms and moved some files and fixed some comments --- src/zutil/Converter.java | 161 ++++++++++++++---- src/zutil/Hasher.java | 10 +- src/zutil/MultiPrintStream.java | 1 + src/zutil/algo/EuclideansAlgo.java | 117 +++++++++++++ src/zutil/algo/WienersAlgo.java | 68 ++++++++ src/zutil/algo/path/DijkstraPathFinder.java | 39 ----- src/zutil/algo/{ => search}/QuickSelect.java | 2 +- src/zutil/db/MySQLConnection.java | 38 ++--- src/zutil/math/ZMath.java | 63 ++++++- src/zutil/network/http/HttpPage.java | 10 +- src/zutil/network/http/HttpPrintStream.java | 10 +- src/zutil/network/nio/NioNetwork.java | 67 +++++--- .../network/nio/message/GridMessage.java | 84 +++++++++ .../network/nio/service/chat/ChatService.java | 56 +++--- src/zutil/network/nio/worker/EchoWorker.java | 12 +- .../network/nio/worker/SystemWorker.java | 42 ++--- .../network/nio/worker/grid/GridClient.java | 114 +++++++++++++ .../network/nio/worker/grid/GridJob.java | 22 +++ .../nio/worker/grid/GridJobGenerator.java | 18 ++ .../nio/worker/grid/GridResultHandler.java | 10 ++ .../nio/worker/grid/GridServerWorker.java | 111 ++++++++++++ .../network/nio/worker/grid/GridThread.java | 38 +++++ src/zutil/struct/BloomFilter.java | 34 ++-- src/zutil/struct/DynamicByteArrayStream.java | 106 ++++++++++++ src/zutil/test/QuickSelectTest.java | 2 +- 25 files changed, 1043 insertions(+), 192 deletions(-) create mode 100644 src/zutil/algo/EuclideansAlgo.java create mode 100644 src/zutil/algo/WienersAlgo.java delete mode 100644 src/zutil/algo/path/DijkstraPathFinder.java rename src/zutil/algo/{ => search}/QuickSelect.java (95%) create mode 100644 src/zutil/network/nio/message/GridMessage.java create mode 100644 src/zutil/network/nio/worker/grid/GridClient.java create mode 100644 src/zutil/network/nio/worker/grid/GridJob.java create mode 100644 src/zutil/network/nio/worker/grid/GridJobGenerator.java create mode 100644 src/zutil/network/nio/worker/grid/GridResultHandler.java create mode 100644 src/zutil/network/nio/worker/grid/GridServerWorker.java create mode 100644 src/zutil/network/nio/worker/grid/GridThread.java create mode 100644 src/zutil/struct/DynamicByteArrayStream.java diff --git a/src/zutil/Converter.java b/src/zutil/Converter.java index 8757e39..5199535 100644 --- a/src/zutil/Converter.java +++ b/src/zutil/Converter.java @@ -7,6 +7,8 @@ import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.util.BitSet; +import zutil.struct.DynamicByteArrayStream; + public class Converter { /** @@ -14,19 +16,42 @@ public class Converter { * * @param object the object to convert. * @return the associated byte array. + * @throws IOException */ - public static byte[] toBytes(Object object){ + public static byte[] toBytes(Object object) throws IOException{ ByteArrayOutputStream baos = new ByteArrayOutputStream(); - try{ - ObjectOutputStream oos = new ObjectOutputStream(baos); - oos.writeObject(object); - oos.flush(); - oos.close(); - }catch(IOException ioe){ - System.out.println(ioe.getMessage()); - } + + ObjectOutputStream oos = new ObjectOutputStream(baos); + oos.writeObject(object); + oos.flush(); + oos.close(); + return baos.toByteArray(); } + + /** + * Converts a Integer to an byte array + * + * @param num is the number to convert + * @return an byte array of four bytes + */ + public static byte[] toBytes(int num){ + return new byte[]{ + (byte)(num & 0xff), + (byte)((num >> 8)& 0xff), + (byte)((num >> 16)& 0xff), + (byte)((num >> 24)& 0xff)}; + } + + /** + * Converts a Integer to an byte + * + * @param num is the number to convert + * @return an byte + */ + public static byte toByte(int num){ + return (byte)(num & 0xff); + } /** * Converts an array of bytes back to its constituent object. The @@ -34,20 +59,33 @@ public class Converter { * * @param bytes the byte array to convert. * @return the associated object. + * @throws Exception */ - public static Object toObject(byte[] bytes) { + public static Object toObject(byte[] bytes) throws Exception{ Object object = null; - try{ - ByteArrayInputStream bais = new ByteArrayInputStream(bytes); - ObjectInputStream ois= new ObjectInputStream(bais); - object = ois.readObject(); - ois.close(); - bais.close(); - }catch(IOException ioe){ - System.out.println(ioe.getMessage()); - }catch(ClassNotFoundException cnfe){ - System.out.println(cnfe.getMessage()); - } + + ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes)); + object = ois.readObject(); + ois.close(); + + return object; + } + + /** + * Converts an array of bytes back to its constituent object. The + * input array is assumed to have been created from the original object. + * + * @param bytes the byte array to convert. + * @return the associated object. + * @throws Exception + */ + public static Object toObject(DynamicByteArrayStream bytes) throws Exception{ + Object object = null; + + ObjectInputStream ois = new ObjectInputStream(bytes); + object = ois.readObject(); + ois.close(); + return object; } @@ -69,25 +107,70 @@ public class Converter { return false; } - // array neaded for byteToHex + /** array needed for byteToHex */ private static char[] HEX_CHARS = {'0','1','2','3','4','5','6','7','8','9','A','B','C','D','E','F'}; /** * Converts a byte Array to a Hex String * - * @param raw the byte arrat to convert + * @param raw the byte array to convert + * @return a Hex String + */ + public static String toHexString(byte[][] raw){ + StringBuffer ret = new StringBuffer(); + + for(byte[] a : raw){ + for(byte b : a){ + ret.append(HEX_CHARS[(int) (b >>> 0x04)& 0x0F ]); + ret.append(HEX_CHARS[(int) b & 0x0F ]); + } + } + + return ret.toString(); + } + + public static String toHexStringByColumn(byte[][] raw){ + StringBuffer ret = new StringBuffer(); + + for(int col=0; col>> 0x04)& 0x0F ]); + ret.append(HEX_CHARS[(int) raw[row][col] & 0x0F ]); + } + } + + return ret.toString(); + } + + /** + * Converts a byte Array to a Hex String + * + * @param raw the byte array to convert * @return a Hex String */ public static String toHexString(byte[] raw){ StringBuffer ret = new StringBuffer(); for(byte b : raw){ - ret.append(HEX_CHARS[(int) b & 0x0F ]); ret.append(HEX_CHARS[(int) (b >>> 0x04)& 0x0F ]); + ret.append(HEX_CHARS[(int) b & 0x0F ]); } return ret.toString(); } + /** + * Converts a byte to a Hex String + * + * @param raw the byte to convert + * @return a Hex String + */ + public static String toHexString(byte raw){ + String ret = ""+HEX_CHARS[(int) (raw >>> 0x04)& 0x0F ]; + ret += ""+HEX_CHARS[(int) raw & 0x0F ]; + + return ret; + } + /** * Converts the given byte to a String with 1's and 0's * @@ -101,7 +184,7 @@ public class Converter { } return ret.toString(); } - + /** * Converts the given byte array to a String with 1's and 0's * @@ -117,7 +200,7 @@ public class Converter { } return ret.toString(); } - + /** * Converts a BitSet to a Integer * @@ -126,14 +209,30 @@ public class Converter { */ public static int toInt(BitSet bits){ int ret = 0; - - for (int i = bits.nextSetBit(0); i >= 0; i = bits.nextSetBit(i+1)) { - ret += Math.pow(2, i); - } - + + for (int i = bits.nextSetBit(0); i >= 0; i = bits.nextSetBit(i+1)) { + ret += Math.pow(2, i); + } + return ret; } + /** + * Converts a boolean array(bit sequence whit most significant bit at index 0) to a Integer + * + * @param bits the boolean array to convert + * @return a Integer + */ + public static int toInt(boolean[] bits){ + int ret = 0; + + for (int i = bits.length-1; i >= 0; i--) { + if(bits[i])ret += Math.pow(2, bits.length-i-1); + } + + return ret; + } + /** * Converts a Integer to a BitSet * diff --git a/src/zutil/Hasher.java b/src/zutil/Hasher.java index c8aeff4..7e806b1 100644 --- a/src/zutil/Hasher.java +++ b/src/zutil/Hasher.java @@ -52,7 +52,7 @@ public class Hasher { public static String MD5(Serializable object){ try { return hash(object, "MD5"); - } catch (NoSuchAlgorithmException e) { + } catch (Exception e) { e.printStackTrace(); } return null; @@ -67,7 +67,7 @@ public class Hasher { public static String SHA1(Serializable object){ try { return hash(object, "SHA-1"); - } catch (NoSuchAlgorithmException e) { + } catch (Exception e) { e.printStackTrace(); } return null; @@ -80,8 +80,9 @@ public class Hasher { * @param hashType The hash method (MD2, MD5, SHA-1, SHA-256, SHA-384, SHA-512 ) * @return String containing the hash * @throws NoSuchAlgorithmException + * @throws IOException */ - public static String hash(Serializable object, String hashType) throws NoSuchAlgorithmException { + public static String hash(Serializable object, String hashType) throws Exception { MessageDigest md = null; md = MessageDigest.getInstance(hashType); //MD5 || SHA md.update(Converter.toBytes(object)); @@ -97,8 +98,9 @@ public class Hasher { * @param object The Key * @param seed Seed * @return A MurmurHash of the key + * @throws Exception */ - public static int MurmurHash(Serializable object, int seed){ + public static int MurmurHash(Serializable object, int seed) throws Exception{ byte[] data = Converter.toBytes(object); int length = data.length; diff --git a/src/zutil/MultiPrintStream.java b/src/zutil/MultiPrintStream.java index 28f0c30..c91bf74 100644 --- a/src/zutil/MultiPrintStream.java +++ b/src/zutil/MultiPrintStream.java @@ -202,6 +202,7 @@ public class MultiPrintStream extends PrintStream { */ @SuppressWarnings("unchecked") private String dump( Object o , boolean print) { + if(o == null) return "NULL"; StringBuffer buffer = new StringBuffer(); Class oClass = o.getClass(); buffer.append( oClass.getName() ); diff --git a/src/zutil/algo/EuclideansAlgo.java b/src/zutil/algo/EuclideansAlgo.java new file mode 100644 index 0000000..0a7643b --- /dev/null +++ b/src/zutil/algo/EuclideansAlgo.java @@ -0,0 +1,117 @@ +package zutil.algo; + +import java.math.BigInteger; +import java.util.LinkedList; + +import zutil.MultiPrintStream; + +/** + * Euclidean algorithm is an algorithm to determine + * the greatest common divisor (GCD) + * + * @author Ziver + * + */ +public class EuclideansAlgo { + + /** + * Simple Test + * @param args + */ + public static void main(String[] args){ + MultiPrintStream.out.println("*** Correct Answer: "); + MultiPrintStream.out.println("java.util.LinkedList{0, 2, 1, 1, 1, 4, 12, 102, 1, 1, 2, 3, 2, 2, 36}"); + MultiPrintStream.out.println("GCD: 1"); + + MultiPrintStream.out.println("*** Integer:"); + MultiPrintStream.out.dump(calcGenerators(60728973, 160523347)); + MultiPrintStream.out.println("GCD: "+calc(60728973, 160523347)); + + MultiPrintStream.out.println("*** BigInteger: "); + MultiPrintStream.out.dump(calcGenerators(new BigInteger("60728973"), new BigInteger("160523347"))); + MultiPrintStream.out.println("GCD: "+calc(new BigInteger("60728973"), new BigInteger("160523347"))); + } + + /** + * Runs the Euclidean algorithm on the two input + * values. + * + * @param a is the first integer + * @param b is the second integer + * @return a integer containing the GCD of the integers + */ + public static int calc(int a, int b){ + int t; + while( b != 0 ){ + t = b; + b = a % b; + a = t; + } + + return a; + } + + /** + * Runs the Euclidean algorithm on the two input + * values. + * + * @param a is the first BigInteger + * @param b is the second BigInteger + * @return a BigInteger containing the GCD of the BigIntegers + */ + public static BigInteger calc(BigInteger a, BigInteger b){ + BigInteger t; + + while( !b.equals(BigInteger.ZERO) ){ + t = b; + b = a.mod( b ); + a = t; + } + + return a; + } + + /** + * Runs the Euclidean algorithm on the two input + * values to find the generators for the values. + * + * @param a is the first integer + * @param b is the second integer + * @return a list of integers that is generators for a and b + */ + public static LinkedList calcGenerators(int a, int b){ + LinkedList list = new LinkedList(); + int t; + + while( b != 0 ){ + list.add( a/b ); + t = b; + b = a % b; + a = t; + } + + return list; + } + + /** + * Runs the Euclidean algorithm on the two input + * values to find the generators for the values. + * + * @param a is the first BigInteger + * @param b is the second BigInteger + * @return a list of BigIntegers that is generators of a and b + */ + public static LinkedList calcGenerators(BigInteger a, BigInteger b){ + LinkedList list = new LinkedList(); + BigInteger t; + + while( !b.equals(BigInteger.ZERO) ){ + list.add( new BigInteger("0").add( a.divide( b ) ) ); + t = b; + b = a.mod( b ); + a = t; + } + + return list; + } +} diff --git a/src/zutil/algo/WienersAlgo.java b/src/zutil/algo/WienersAlgo.java new file mode 100644 index 0000000..3593df7 --- /dev/null +++ b/src/zutil/algo/WienersAlgo.java @@ -0,0 +1,68 @@ +package zutil.algo; + +import java.math.BigInteger; +import java.util.LinkedList; + +import zutil.math.ZMath; + +/** + * The Wieners algorithm factorizes two big numbers a and b. + * It uses the Euclidien algorithm to calculate the generator of the + * numbers and then uses them to calculate the factorization. + * + * @author Ziver + * + */ +public class WienersAlgo { + + /** + * Runs the Wieners algorithm for the given values. + * + * @param n is the first value + * @param e is the second value + * @return a BigInteger array of length two. + * First index is p and second is q. + * If no value was found then it returns null. + */ + public static BigInteger[] calc(BigInteger n, BigInteger e){ + BigInteger[] ret = null; + + LinkedList gen = EuclideansAlgo.calcGenerators(e, n); + + BigInteger c0 = BigInteger.ONE; + BigInteger c1 = gen.poll(); + BigInteger d0 = BigInteger.ZERO; + BigInteger d1 = BigInteger.ONE; + + BigInteger t, n1, g; + while(!gen.isEmpty()){ + g = gen.poll(); + + t = c1; + c1 = g.multiply( c1 ).add( c0 ); + c0 = t; + + t = d1; + d1 = g.multiply( d1 ).add( d0 ); + d0 = t; + + n1 = d1.multiply( e ).subtract( BigInteger.ONE ); + if( n1.mod( c1 ).equals( BigInteger.ZERO ) ){ + n1 = n1.divide( c1 ); + + // x^2 - ( n - n1 +1 )x + n = 0 + ret = ZMath.pqFormula( + n.subtract( n1 ).add( BigInteger.ONE ).negate(), + n); + + if(ret[0].compareTo( BigInteger.ZERO ) >= 0 && + ret[1].compareTo( BigInteger.ZERO ) >= 0 && + ret[0].multiply( ret[1] ).equals( n )){ + return ret; + } + } + } + + return null; + } +} diff --git a/src/zutil/algo/path/DijkstraPathFinder.java b/src/zutil/algo/path/DijkstraPathFinder.java deleted file mode 100644 index 558ae91..0000000 --- a/src/zutil/algo/path/DijkstraPathFinder.java +++ /dev/null @@ -1,39 +0,0 @@ -package zutil.algo.path; - -import java.util.LinkedList; - -public class DijkstraPathFinder { - - public static LinkedList find(PathNode start, PathNode stop){ - // TODO - /* - 1 - - 5 dist[source] := 0 // Distance from source to source - 6 Q := copy(Graph) // All nodes in the graph are unoptimized - thus are in Q - 7 while Q is not empty: // The main loop - 8 u := extract_min(Q) // Remove and return best vertex from nodes in two given nodes - // we would use a path finding algorithm on the new graph, such as depth-first search. - 9 for each neighbor v of u: // where v has not yet been considered - 10 alt = dist[u] + length(u, v) - 11 if alt < dist[v] // Relax (u,v) - 12 dist[v] := alt - 13 previous[v] := u - 14 return previous[] -*/ - - - - LinkedList path = new LinkedList(); - PathNode current = stop; - while(true){ - path.addFirst(current); - current = current.getSourceNeighbor(); - if(current.equals(start)){ - path.addFirst(start); - break; - } - } - return path; - } -} diff --git a/src/zutil/algo/QuickSelect.java b/src/zutil/algo/search/QuickSelect.java similarity index 95% rename from src/zutil/algo/QuickSelect.java rename to src/zutil/algo/search/QuickSelect.java index 54edf57..07c1d27 100644 --- a/src/zutil/algo/QuickSelect.java +++ b/src/zutil/algo/search/QuickSelect.java @@ -1,4 +1,4 @@ -package zutil.algo; +package zutil.algo.search; import zutil.algo.sort.sortable.SortableDataList; diff --git a/src/zutil/db/MySQLConnection.java b/src/zutil/db/MySQLConnection.java index de5ecfc..4cb82c6 100644 --- a/src/zutil/db/MySQLConnection.java +++ b/src/zutil/db/MySQLConnection.java @@ -12,14 +12,11 @@ public class MySQLConnection { /** * Connects to a MySQL server - * @param url The URL of the MySQL server - * @param db The database to connect to - * @param user The user name - * @param password The password - * @throws SQLException - * @throws ClassNotFoundException - * @throws IllegalAccessException - * @throws InstantiationException + * + * @param url is the URL of the MySQL server + * @param db is the database to connect to + * @param user is the user name + * @param password is the password */ public MySQLConnection(String url,String db,String user, String password) throws SQLException, InstantiationException, IllegalAccessException, ClassNotFoundException{ Class.forName ("com.mysql.jdbc.Driver").newInstance(); @@ -27,11 +24,11 @@ public class MySQLConnection { } /** - * Runs a query and returns the result - * NOTE: Don't forget to close the ResultSet and the Statement or it can lead to memory leak tex: rows.getStatement().close(); - * @param sql The query to execute - * @return The data that the DB returned - * @throws SQLException + * Runs a query and returns the result.
+ * NOTE: Don't forget to close the ResultSet and the Statement or it can lead to memory leak tex: rows.getStatement().close(); + * + * @param sql is the query to execute + * @return the data that the DB returned */ public synchronized ResultSet returnQuery(String sql) throws SQLException{ Statement s = conn.createStatement (); @@ -41,9 +38,9 @@ public class MySQLConnection { /** * Runs a query in the MySQL server and returns effected rows - * @param sql The query to execute - * @return Number of rows effected - * @throws SQLException + * + * @param sql is the query to execute + * @return the number of rows effected */ public synchronized int updateQuery(String sql) throws SQLException{ Statement s = conn.createStatement (); @@ -53,11 +50,10 @@ public class MySQLConnection { } /** - * Runs a Prepared Statement - * NOTE: Don't forget to close the PreparedStatement or it can lead to memory leak - * @param sql The SQL to run + * Runs a Prepared Statement.
+ * NOTE: Don't forget to close the PreparedStatement or it can lead to memory leak + * @param sql is the SQL query to run * @return The PreparedStatement - * @throws SQLException */ public synchronized PreparedStatement prepareStatement(String sql) throws SQLException{ return conn.prepareStatement(sql); @@ -65,8 +61,6 @@ public class MySQLConnection { /** * Disconnects from the database - * @throws SQLException - * */ public synchronized void close() throws SQLException{ if (conn != null){ diff --git a/src/zutil/math/ZMath.java b/src/zutil/math/ZMath.java index d62aa31..406fbe9 100644 --- a/src/zutil/math/ZMath.java +++ b/src/zutil/math/ZMath.java @@ -1,5 +1,7 @@ package zutil.math; +import java.math.BigInteger; + /** * Very Simple math functions * @@ -8,9 +10,68 @@ package zutil.math; public class ZMath { /** - * Calculates the percentige the value has + * Calculates the percentage of the values */ public static double percent(int min, int max, int value){ return ((double)(value-min)/(max-min))*100; } + + /** + * Solves the equation: x^2 + px + q = 0 + * + * @return the two values of x as an array + */ + public static double[] pqFormula(double p, double q){ + double[] ret = new double[2]; + double t = (p/2); + ret[0] = Math.sqrt( t*t - q ); + ret[1] = -ret[0]; + t *= -1; + ret[0] += t; + ret[1] += t; + return ret; + } + + /** + * Solves the equation: x^2 + px + q = 0. + * WARNING: This uses only BigInteger, thereby removing the decimals in the calculation + * + * @return the two values of x as an array + */ + public static BigInteger[] pqFormula(BigInteger p, BigInteger q){ + BigInteger[] ret = new BigInteger[2]; + BigInteger t = p.divide( BigInteger.valueOf(2) ); + ret[0] = ZMath.sqrt( t.multiply( t ).subtract( q ) ); + ret[1] = ret[0].negate(); + t = t.negate(); + ret[0] = ret[0].add( t ); + ret[1] = ret[1].add( t ); + return ret; + } + + /** + * Calculates the square root of a big number + * + */ + public static BigInteger sqrt(BigInteger value){ + BigInteger op = value; + BigInteger res = BigInteger.ZERO; + BigInteger one = BigInteger.ONE; + + while( one.compareTo( op ) < 0 ){ + one = one.shiftLeft( 2 ); + } + one = one.shiftRight(2); + + while( !one.equals( BigInteger.ZERO ) ){ + if( op.compareTo( res.add( one ) ) >= 0 ){ + op = op.subtract( res.add( one ) ); + res = res.add( one.shiftLeft( 1 ) ); + } + res = res.shiftRight( 1 ); + one = one.shiftRight( 2 ); + } + + return res; + } } diff --git a/src/zutil/network/http/HttpPage.java b/src/zutil/network/http/HttpPage.java index b109266..4cf6854 100644 --- a/src/zutil/network/http/HttpPage.java +++ b/src/zutil/network/http/HttpPage.java @@ -14,11 +14,11 @@ public interface HttpPage{ * This method is called when a client wants a response * from this specific page. * - * @param out The PrintStream to the client - * @param client_info Information about the client - * @param session Session values for the client - * @param cookie Cookie information from the client - * @param request POST and GET requests from the client + * @param out is the PrintStream to the client + * @param client_info is information about the client + * @param session is session values for the client + * @param cookie is cookie information from the client + * @param request is POST and GET requests from the client */ public abstract void respond(HttpPrintStream out, HashMap client_info, diff --git a/src/zutil/network/http/HttpPrintStream.java b/src/zutil/network/http/HttpPrintStream.java index e7d0888..ebbb0c7 100644 --- a/src/zutil/network/http/HttpPrintStream.java +++ b/src/zutil/network/http/HttpPrintStream.java @@ -38,8 +38,8 @@ public class HttpPrintStream extends PrintStream{ /** * Adds a cookie that will be sent to the client * - * @param key The name of the cookie - * @param value The value of the cookie + * @param key is the name of the cookie + * @param value is the value of the cookie * @throws Exception Throws exception if the header has already been sent */ public void setCookie(String key, String value) throws Exception{ @@ -52,7 +52,7 @@ public class HttpPrintStream extends PrintStream{ * Sends the given header directly to the client. * No buffering involved. * - * @param header The header to send + * @param header is the header to send * @throws Exception Throws exception if the header has already been sent */ public void sendHeader(String header) throws Exception{ @@ -62,7 +62,7 @@ public class HttpPrintStream extends PrintStream{ } /** - * prints whit a new line + * Prints with a new line */ public void println(String s){ printOrBuffer(s+"\n"); @@ -97,7 +97,7 @@ public class HttpPrintStream extends PrintStream{ } /** - * Sends out all the buffer and clears it + * Sends out the buffer and clears it */ public void flush(){ if(buffer_enabled){ diff --git a/src/zutil/network/nio/NioNetwork.java b/src/zutil/network/nio/NioNetwork.java index 9e3140e..284a5a1 100644 --- a/src/zutil/network/nio/NioNetwork.java +++ b/src/zutil/network/nio/NioNetwork.java @@ -1,7 +1,6 @@ package zutil.network.nio; import java.io.IOException; -import java.net.ConnectException; import java.net.InetAddress; import java.net.InetSocketAddress; import java.nio.ByteBuffer; @@ -26,6 +25,7 @@ import zutil.network.nio.server.ChangeRequest; import zutil.network.nio.server.ClientData; import zutil.network.nio.worker.SystemWorker; import zutil.network.nio.worker.Worker; +import zutil.struct.DynamicByteArrayStream; public abstract class NioNetwork implements Runnable { @@ -36,7 +36,7 @@ public abstract class NioNetwork implements Runnable { * 2 = message debug * 3 = selector debug */ - public static final int DEBUG = 1; + public static final int DEBUG = 2; public static enum NetworkType {SERVER, CLIENT}; private NetworkType type; @@ -60,7 +60,8 @@ public abstract class NioNetwork implements Runnable { // A list of PendingChange instances private List pendingChanges = new LinkedList(); // Maps a SocketChannel to a list of ByteBuffer instances - private Map> pendingData = new HashMap>(); + private Map> pendingWriteData = new HashMap>(); + private Map pendingReadData = new HashMap(); // The encrypter private Encrypter encrypter; @@ -102,11 +103,11 @@ public abstract class NioNetwork implements Runnable { (encrypter != null ? "Enabled("+encrypter.getAlgorithm()+")" : "Disabled")+"!!"); } - public void send(SocketChannel socket, Object data) { + public void send(SocketChannel socket, Object data) throws IOException{ send(socket, Converter.toBytes(data)); } - public void send(InetSocketAddress address, Object data){ + public void send(InetSocketAddress address, Object data) throws IOException{ send(address, Converter.toBytes(data)); } @@ -140,11 +141,11 @@ public abstract class NioNetwork implements Runnable { protected void queueSend(SocketChannel socket, byte[] data){ if(DEBUG>=3)MultiPrintStream.out.println("Sending Queue..."); // And queue the data we want written - synchronized (pendingData) { - List queue = pendingData.get(socket); + synchronized (pendingWriteData) { + List queue = pendingWriteData.get(socket); if (queue == null) { queue = new ArrayList(); - pendingData.put(socket, queue); + pendingWriteData.put(socket, queue); } //encrypts if(encrypter != null) @@ -266,8 +267,10 @@ public abstract class NioNetwork implements Runnable { key.cancel(); socketChannel.close(); clients.remove(remoteAdr); + pendingReadData.remove(socketChannel); + pendingWriteData.remove(socketChannel); if(DEBUG>=1)MultiPrintStream.out.println("Connection Forced Close("+remoteAdr+")!!! Connection Count: "+clients.size()); - if(type == NetworkType.CLIENT) throw new ConnectException("Server Closed The Connection!!!"); + if(type == NetworkType.CLIENT) throw new IOException("Server Closed The Connection!!!"); return; } @@ -277,8 +280,10 @@ public abstract class NioNetwork implements Runnable { key.channel().close(); key.cancel(); clients.remove(remoteAdr); + pendingReadData.remove(socketChannel); + pendingWriteData.remove(socketChannel); if(DEBUG>=1)MultiPrintStream.out.println("Connection Close("+remoteAdr+")!!! Connection Count: "+clients.size()); - if(type == NetworkType.CLIENT) throw new ConnectException("Server Closed The Connection!!!"); + if(type == NetworkType.CLIENT) throw new IOException("Server Closed The Connection!!!"); return; } @@ -286,8 +291,28 @@ public abstract class NioNetwork implements Runnable { // to the client byte[] rspByteData = new byte[numRead]; System.arraycopy(readBuffer.array(), 0, rspByteData, 0, numRead); + if(encrypter != null)// Encryption + rspByteData = encrypter.decrypt(rspByteData); - handleRecivedMessage(socketChannel, rspByteData); + /* + if(!pendingReadData.containsKey(socketChannel)){ + pendingReadData.put(socketChannel, new DynamicByteArrayStream()); + } + if(encrypter != null)// Encryption + rspByteData = encrypter.decrypt(rspByteData); + + pendingReadData.get(socketChannel).add(rspByteData); + */ + + Object rspData = null; + try{ + rspData = Converter.toObject(rspByteData); + //rspData = Converter.toObject(pendingReadData.get(socketChannel)); + handleRecivedMessage(socketChannel, rspData); + //pendingReadData.get(socketChannel).clear(); + }catch(Exception e){ + e.printStackTrace(); + } } /** @@ -296,18 +321,22 @@ public abstract class NioNetwork implements Runnable { private void write(SelectionKey key) throws IOException { SocketChannel socketChannel = (SocketChannel) key.channel(); - synchronized (pendingData) { - List queue = pendingData.get(socketChannel); + synchronized (pendingWriteData) { + List queue = pendingWriteData.get(socketChannel); if(queue == null){ queue = new ArrayList(); } + int i = 0; // Write until there's not more data ... while (!queue.isEmpty()) { ByteBuffer buf = queue.get(0); + i += buf.remaining(); socketChannel.write(buf); + i -= buf.remaining(); if (buf.remaining() > 0) { // ... or the socket's buffer fills up + if(DEBUG>=3)MultiPrintStream.out.println("Write Buffer Full!!"); break; } queue.remove(0); @@ -323,14 +352,8 @@ public abstract class NioNetwork implements Runnable { } } - private void handleRecivedMessage(SocketChannel socketChannel, byte[] rspByteData){ - //Encryption - Object rspData; - if(encrypter != null) - rspData = Converter.toObject(encrypter.decrypt(rspByteData)); - else rspData = Converter.toObject(rspByteData); + private void handleRecivedMessage(SocketChannel socketChannel, Object rspData){ if(DEBUG>=2)MultiPrintStream.out.println("Handling incomming message..."); - if(rspData instanceof SystemMessage){ if(systemWorker != null){ if(DEBUG>=3)MultiPrintStream.out.println("System Message!!!"); @@ -347,13 +370,13 @@ public abstract class NioNetwork implements Runnable { worker.processData(this, socketChannel, rspData); } else{ - if(DEBUG>=1)MultiPrintStream.out.println("Unhandled Message!!!"); + if(DEBUG>=1)MultiPrintStream.out.println("Unhandled Worker Message!!!"); } } } /** - * Initializes a socket to the server + * Initializes a socket to a server */ protected SocketChannel initiateConnection(InetSocketAddress address) throws IOException { // Create a non-blocking socket channel diff --git a/src/zutil/network/nio/message/GridMessage.java b/src/zutil/network/nio/message/GridMessage.java new file mode 100644 index 0000000..b21365d --- /dev/null +++ b/src/zutil/network/nio/message/GridMessage.java @@ -0,0 +1,84 @@ +package zutil.network.nio.message; + +public class GridMessage extends Message{ + private static final long serialVersionUID = 1L; + + // Client type messages + /** Computation job return right answer **/ + public static final int COMP_SUCCESSFUL = 1; // + /** Initial static data **/ + public static final int COMP_INCORRECT = 2; // + /** Computation job return wrong answer **/ + public static final int COMP_ERROR = 3; // + /** There was an error computing **/ + public static final int REGISTER = 4; // + /** Register at the server **/ + public static final int UNREGISTER = 5; // + /** Request new computation data **/ + public static final int NEW_DATA = 6; // + + // Server type messages + /** Sending initial static data **/ + public static final int INIT_DATA = 100; + /** Sending new dynamic data **/ + public static final int COMP_DATA = 101; + + + private int type; + private int jobID; + private T data; + + /** + * Creates a new GridMessage + * + * @param type is the type of message + * @param jobID is the id of the job + */ + public GridMessage(int type){ + this(type, 0, null); + } + + /** + * Creates a new GridMessage + * + * @param type is the type of message + * @param jobID is the id of the job + */ + public GridMessage(int type, int jobID){ + this(type, jobID, null); + } + + /** + * Creates a new GridMessage + * + * @param type is the type of message + * @param jobID is the id of the job + * @param data is the data to send with this message + */ + public GridMessage(int type, int jobID, T data){ + this.type = type; + this.jobID = jobID; + this.data = data; + } + + /** + * @return the type of message + */ + public int messageType(){ + return type; + } + + /** + * @return the job id for this message + */ + public int getJobQueueID(){ + return jobID; + } + + /** + * @return the data in this message, may not always carry any data. + */ + public T getData(){ + return data; + } +} diff --git a/src/zutil/network/nio/service/chat/ChatService.java b/src/zutil/network/nio/service/chat/ChatService.java index 2c77f62..bd15027 100644 --- a/src/zutil/network/nio/service/chat/ChatService.java +++ b/src/zutil/network/nio/service/chat/ChatService.java @@ -21,38 +21,42 @@ public class ChatService extends NetworkService{ @Override public void handleMessage(Message message, SocketChannel socket) { - // New message - if(message instanceof ChatMessage){ - ChatMessage chatmessage = (ChatMessage)message; - //is this a new message - if(chatmessage.type == ChatMessage.ChatMessageType.MESSAGE){ - // Is this the server - if(nio.getType() == NioNetwork.NetworkType.SERVER){ - if(rooms.containsKey(chatmessage.room)){ - LinkedList tmpList = rooms.get(chatmessage.room); + try { + // New message + if(message instanceof ChatMessage){ + ChatMessage chatmessage = (ChatMessage)message; + //is this a new message + if(chatmessage.type == ChatMessage.ChatMessageType.MESSAGE){ + // Is this the server + if(nio.getType() == NioNetwork.NetworkType.SERVER){ + if(rooms.containsKey(chatmessage.room)){ + LinkedList tmpList = rooms.get(chatmessage.room); - // Broadcast the message - for(SocketChannel s : tmpList){ - if(s.isConnected()){ - nio.send(s, chatmessage); - } - else{ - unRegisterUser(chatmessage.room, s); + // Broadcast the message + for(SocketChannel s : tmpList){ + if(s.isConnected()){ + nio.send(s, chatmessage); + } + else{ + unRegisterUser(chatmessage.room, s); + } } } } + if(NioNetwork.DEBUG>=2)MultiPrintStream.out.println("New Chat Message: "+chatmessage.msg); + listener.messageAction(chatmessage.msg, chatmessage.room); + } + // register to a room + else if(chatmessage.type == ChatMessage.ChatMessageType.REGISTER){ + registerUser(chatmessage.room, socket); + } + // unregister to a room + else if(chatmessage.type == ChatMessage.ChatMessageType.UNREGISTER){ + unRegisterUser(chatmessage.room, socket); } - if(NioNetwork.DEBUG>=2)MultiPrintStream.out.println("New Chat Message: "+chatmessage.msg); - listener.messageAction(chatmessage.msg, chatmessage.room); - } - // register to a room - else if(chatmessage.type == ChatMessage.ChatMessageType.REGISTER){ - registerUser(chatmessage.room, socket); - } - // unregister to a room - else if(chatmessage.type == ChatMessage.ChatMessageType.UNREGISTER){ - unRegisterUser(chatmessage.room, socket); } + } catch (Exception e) { + e.printStackTrace(); } } diff --git a/src/zutil/network/nio/worker/EchoWorker.java b/src/zutil/network/nio/worker/EchoWorker.java index 2156eb3..606497c 100644 --- a/src/zutil/network/nio/worker/EchoWorker.java +++ b/src/zutil/network/nio/worker/EchoWorker.java @@ -1,14 +1,20 @@ package zutil.network.nio.worker; +import java.io.IOException; + import zutil.MultiPrintStream; public class EchoWorker extends ThreadedEventWorker { @Override public void messageEvent(WorkerDataEvent dataEvent) { - // Return to sender - MultiPrintStream.out.println("Recived Msg: "+dataEvent.data); - dataEvent.network.send(dataEvent.socket, dataEvent.data); + try { + // Return to sender + MultiPrintStream.out.println("Recived Msg: "+dataEvent.data); + dataEvent.network.send(dataEvent.socket, dataEvent.data); + } catch (IOException e) { + e.printStackTrace(); + } } diff --git a/src/zutil/network/nio/worker/SystemWorker.java b/src/zutil/network/nio/worker/SystemWorker.java index f1b81b7..ca8239e 100644 --- a/src/zutil/network/nio/worker/SystemWorker.java +++ b/src/zutil/network/nio/worker/SystemWorker.java @@ -33,27 +33,31 @@ public class SystemWorker extends ThreadedEventWorker { @Override public void messageEvent(WorkerDataEvent event) { - if(NioNetwork.DEBUG>=2)MultiPrintStream.out.println("System Message: "+event.data.getClass().getName()); - if(event.data instanceof Message){ - if(event.data instanceof EchoMessage && ((EchoMessage)event.data).echo()){ - // Echos back the recived message - ((EchoMessage)event.data).recived(); - if(NioNetwork.DEBUG>=3)MultiPrintStream.out.println("Echoing Message: "+event.data); - nio.send(event.socket, event.data); - } - else if(event.data instanceof ResponseRequestMessage && - rspEvents.get(((ResponseRequestMessage)event.data).getResponseId()) != null){ - // Handle the response - handleResponse(((ResponseRequestMessage)event.data).getResponseId(), event.data); - if(NioNetwork.DEBUG>=3)MultiPrintStream.out.println("Response Request Message: "+event.data); - } - else{ - //Services - if(services.containsKey(event.data.getClass()) || - !services.containsKey(event.data.getClass()) && defaultServices(event.data)){ - services.get(event.data.getClass()).handleMessage((Message)event.data, event.socket); + try { + if(NioNetwork.DEBUG>=2)MultiPrintStream.out.println("System Message: "+event.data.getClass().getName()); + if(event.data instanceof Message){ + if(event.data instanceof EchoMessage && ((EchoMessage)event.data).echo()){ + // Echos back the recived message + ((EchoMessage)event.data).recived(); + if(NioNetwork.DEBUG>=3)MultiPrintStream.out.println("Echoing Message: "+event.data); + nio.send(event.socket, event.data); + } + else if(event.data instanceof ResponseRequestMessage && + rspEvents.get(((ResponseRequestMessage)event.data).getResponseId()) != null){ + // Handle the response + handleResponse(((ResponseRequestMessage)event.data).getResponseId(), event.data); + if(NioNetwork.DEBUG>=3)MultiPrintStream.out.println("Response Request Message: "+event.data); + } + else{ + //Services + if(services.containsKey(event.data.getClass()) || + !services.containsKey(event.data.getClass()) && defaultServices(event.data)){ + services.get(event.data.getClass()).handleMessage((Message)event.data, event.socket); + } } } + } catch (Exception e) { + e.printStackTrace(); } } diff --git a/src/zutil/network/nio/worker/grid/GridClient.java b/src/zutil/network/nio/worker/grid/GridClient.java new file mode 100644 index 0000000..f8f0c95 --- /dev/null +++ b/src/zutil/network/nio/worker/grid/GridClient.java @@ -0,0 +1,114 @@ +package zutil.network.nio.worker.grid; + +import java.io.IOException; +import java.util.LinkedList; +import java.util.Queue; + +import zutil.MultiPrintStream; +import zutil.network.nio.NioClient; +import zutil.network.nio.message.GridMessage; +import zutil.network.nio.worker.ThreadedEventWorker; +import zutil.network.nio.worker.WorkerDataEvent; + +/** + * This class is the client part of the grid. + * It connects to a grid server and requests new job. + * And then sends back the result to the server. + * + * @author Ziver + */ +@SuppressWarnings("unchecked") +public class GridClient extends ThreadedEventWorker { + private static LinkedList jobQueue; + private static GridThread thread; + private static NioClient network; + + /** + * Creates a new GridClient object and registers itself at the server + * and sets itself as a worker in NioClient + * + * @param thread the Thread interface to run for the jobs + * @param network the NioClient to use to communicate to the server + */ + public GridClient(GridThread thread, NioClient network){ + jobQueue = new LinkedList(); + GridClient.thread = thread; + GridClient.network = network; + + } + + /** + * Starts up the client and a couple of GridThreads. + * And registers itself as a worker in NioClient + * @throws IOException + */ + public void initiate() throws IOException{ + network.setDefaultWorker(this); + network.send(new GridMessage(GridMessage.REGISTER)); + + for(int i=0; i { + /** + * @return static and final values that do not change for every job + */ + public Object initValues(); + /** + * @return a new generated job + */ + public T generateJob(); +} diff --git a/src/zutil/network/nio/worker/grid/GridResultHandler.java b/src/zutil/network/nio/worker/grid/GridResultHandler.java new file mode 100644 index 0000000..c0e1a08 --- /dev/null +++ b/src/zutil/network/nio/worker/grid/GridResultHandler.java @@ -0,0 +1,10 @@ +package zutil.network.nio.worker.grid; + +/** + * Handles the incoming results from the grid + * + * @author Ziver + */ +public interface GridResultHandler { + public void resultEvent(int jobID, boolean correct, T result); +} diff --git a/src/zutil/network/nio/worker/grid/GridServerWorker.java b/src/zutil/network/nio/worker/grid/GridServerWorker.java new file mode 100644 index 0000000..6819686 --- /dev/null +++ b/src/zutil/network/nio/worker/grid/GridServerWorker.java @@ -0,0 +1,111 @@ +package zutil.network.nio.worker.grid; + +import java.io.IOException; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.Queue; + +import zutil.network.nio.message.GridMessage; +import zutil.network.nio.worker.ThreadedEventWorker; +import zutil.network.nio.worker.WorkerDataEvent; + +/** + * Implements a simple network computing server + * + * @author Ziver + */ +@SuppressWarnings("unchecked") +public class GridServerWorker extends ThreadedEventWorker{ + // Job timeout after 30 min + public static int JOB_TIMEOUT = 1000*60*30; + + private HashMap jobs; // contains all the ongoing jobs + private Queue reSendjobQueue; // Contains all the jobs that will be recalculated + private GridJobGenerator jobGenerator; // The job generator + private GridResultHandler resHandler; + private int nextJobID; + + public GridServerWorker(GridResultHandler resHandler, GridJobGenerator jobGenerator){ + this.resHandler = resHandler; + this.jobGenerator = jobGenerator; + nextJobID = 0; + + jobs = new HashMap(); + reSendjobQueue = new LinkedList(); + GridMaintainer maintainer = new GridMaintainer(); + maintainer.start(); + } + + @Override + public void messageEvent(WorkerDataEvent e) { + try { + // ignores other messages than GridMessage + if(e.data instanceof GridMessage){ + GridMessage msg = (GridMessage)e.data; + GridJob job = null; + + switch(msg.messageType()){ + case GridMessage.REGISTER: + e.network.send(e.socket, new GridMessage(GridMessage.INIT_DATA, 0, jobGenerator.initValues())); + break; + // Sending new data to compute to the client + case GridMessage.NEW_DATA: + if(!reSendjobQueue.isEmpty()){ // checks first if there is a job for recalculation + job = reSendjobQueue.poll(); + job.renewTimeStamp(); + } + else{ // generates new job + job = new GridJob(nextJobID, + jobGenerator.generateJob()); + jobs.put(job.jobID, job); + nextJobID++; + } + GridMessage newMsg = new GridMessage(GridMessage.COMP_DATA, job.jobID, job.job); + e.network.send(e.socket, newMsg); + break; + + // Received computation results + case GridMessage.COMP_SUCCESSFUL: + resHandler.resultEvent(msg.getJobQueueID(), true, msg.getData()); + break; + case GridMessage.COMP_INCORRECT: + resHandler.resultEvent(msg.getJobQueueID(), false, msg.getData()); + break; + case GridMessage.COMP_ERROR: // marks the job for recalculation + job = jobs.get(msg.getJobQueueID()); + reSendjobQueue.add(job); + break; + } + } + } catch (IOException e1) { + e1.printStackTrace(); + } + } + + /** + * Changes the job timeout value + * @param min is the timeout in minutes + */ + public static void setJobTimeOut(int min){ + JOB_TIMEOUT = 1000*60*min; + } + + class GridMaintainer extends Thread{ + /** + * Runs some behind the scenes stuff + * like job timeout. + */ + public void run(){ + while(true){ + long time = System.currentTimeMillis(); + for(int jobID : jobs.keySet()){ + if(time-jobs.get(jobID).timestamp > JOB_TIMEOUT){ + reSendjobQueue.add(jobs.get(jobID)); + } + } + try{Thread.sleep(1000*60*1);}catch(Exception e){}; + } + } + } +} + diff --git a/src/zutil/network/nio/worker/grid/GridThread.java b/src/zutil/network/nio/worker/grid/GridThread.java new file mode 100644 index 0000000..25aa321 --- /dev/null +++ b/src/zutil/network/nio/worker/grid/GridThread.java @@ -0,0 +1,38 @@ +package zutil.network.nio.worker.grid; + +/** + * This interface is the thread that will do + * all the computation in the grid + * + * @author Ziver + */ +public abstract class GridThread implements Runnable{ + /** + * The initial static and final data will be sent to this + * method. + * + * @param data is the static and or final data + */ + public abstract void setInitData(Object data); + + public void run(){ + while(true){ + GridJob tmp = null; + try { + tmp = GridClient.getNextJob(); + compute(tmp); + } catch (Exception e) { + e.printStackTrace(); + if(tmp != null){ + GridClient.jobError(tmp.jobID); + } + } + } + } + + /** + * Compute the given data and return + * @param data + */ + public abstract void compute(GridJob data) throws Exception; +} diff --git a/src/zutil/struct/BloomFilter.java b/src/zutil/struct/BloomFilter.java index 2978d3e..0a669d5 100644 --- a/src/zutil/struct/BloomFilter.java +++ b/src/zutil/struct/BloomFilter.java @@ -41,12 +41,16 @@ public class BloomFilter implements Set, Serializable * @return If the optimal size has been reached */ public boolean add(T e) { - content_size++; - int hash = 0; - for(int i=0; i implements Set, Serializable * if the Object is not Serializable */ public boolean contains(Object o) { - if(!(o instanceof Serializable))return false; - int hash = 0; - for(int i=0; i bytes; + /** The current size of the stream */ + private int size; + /** points to the current index in the Arraylist */ + private int globalPointer; + /** points localy in the current index in the ArrayList */ + private int localPointer; + /** The current position */ + private int currentPos; + + /** + * Create a new instance of DynamicByteArrayStream + */ + public DynamicByteArrayStream(){ + bytes = new ArrayList(); + size = 0; + globalPointer = 0; + localPointer = 0; + currentPos = 0; + } + + /** + * Append an byte array to the stream + * @param b The byte array to add + */ + public synchronized void add(byte[] b){ + bytes.add(b); + size += b.length; + } + + /** + * Clears this stream from the byte arrays + */ + public synchronized void clear(){ + size = 0; + globalPointer = 0; + localPointer = 0; + currentPos = 0; + + bytes.clear(); + } + + @Override + public synchronized int read() throws IOException { + if(currentPos >= size){ + return -1; + } + int ret = bytes.get(globalPointer)[localPointer] & 0xff; + currentPos++; + localPointer++; + if(localPointer >= bytes.get(globalPointer).length){ + globalPointer++; + localPointer = 0; + } + return ret; + } +/* + public synchronized int read(byte b[], int off, int len) { + System.out.println("read off:"+off+" len: "+len); + if(currentPos+off >= size){ + return -1; + } + off += localPointer; + while(off>0){ + if(bytes.get(globalPointer).length < off){ + globalPointer++; + off -= bytes.get(globalPointer).length; + } + else break; + } + + int length; + int oldLen = len; + while(len > 0){ + length = bytes.get(globalPointer).length; + System.arraycopy(b, 0, bytes.get(globalPointer), 0, (length 0) globalPointer++; + if(bytes.size() <= globalPointer) break; + } + localPointer = 0; + currentPos += ( len<0 ? oldLen : oldLen-len); + return ( len<0 ? oldLen : oldLen-len); + }*/ + + public synchronized int available() { + return size - currentPos; + } + + public synchronized void reset() { + globalPointer = 0; + localPointer = 0; + currentPos = 0; + } + + public void close() throws IOException { + } +} diff --git a/src/zutil/test/QuickSelectTest.java b/src/zutil/test/QuickSelectTest.java index db41927..43b3da8 100644 --- a/src/zutil/test/QuickSelectTest.java +++ b/src/zutil/test/QuickSelectTest.java @@ -2,7 +2,7 @@ package zutil.test; import java.util.Arrays; -import zutil.algo.QuickSelect; +import zutil.algo.search.QuickSelect; import zutil.algo.sort.sortable.SortableIntArray; public class QuickSelectTest {