Building Dynamic Fail-Over Java Servers by Chang Sau Sheong Listing One public class Server { ... public static void main(String[] args) throws IOException { // use the current cluster status Status status = Status.getInstance(); Config config = Config.getInstance(); ... status.setRunningServer(server_ip); status.setRunningPort(server_port); if (_isPrimary) { status.promote(); } status.addServer(server_ip, server_port); ... // create server-side TCP socket to listen to port ServerSocket serverSocket = null; boolean listening = true; // create server Socket try { serverSocket = new ServerSocket(status.getPort(status.getRunningServer()), _SOCKET_BUFFER, InetAddress.getByName(status.getRunningServer())); } catch (Exception e) { ... System.exit(-1); } Listing Two // Create the synchronizer to synchronize between servers Synchronizer synchronizer = new Synchronizer(); String appClassName = config.getAppClassName(); Application app; try { app = (Application)Class.forName(appClassName).newInstance(); } ... // listen on sockets from incoming connections from clients while (listening) { synchronizer.sync(); (new MultiServerThread(serverSocket.accept())).start(); if (status.isPrimary()) { System.out.println("*** PRIMARY ***"); app.activate(); } } // close server socket serverSocket.close(); ... Listing Three public void run() { try { PrintWriter out = new PrintWriter(_socket.getOutputStream(), true); BufferedReader in = new BufferedReader(new InputStreamReader(_socket.getInputStream())); String inputLine = new String(); while ((inputLine = in.readLine()) != null) { StringTokenizer st = new StringTokenizer(inputLine, " "); String command = st.nextToken(); Vector parameters = new Vector(); while (st.hasMoreTokens()) { parameters.add(st.nextElement()); } ... // request for information else if (command.equalsIgnoreCase("ask")) { if (!parameters.isEmpty()) { out.println(doAsk(parameters)); } else { out.println("##Error: Try ask "); } } else { out.println("##Error: No such command."); } // end request for information } // end while loop Listing Four ... /** Asking for information */ private String doAsk(Vector parameters) { // get the ask command String command = (String)parameters.get(0); if (command.equalsIgnoreCase("primary")) { return status.getPrimaryServer(); } else if (command.equalsIgnoreCase("isAlive")) { return "*"; } else if (command.equalsIgnoreCase("info")) { return "elipva Xander Server version 1.0."; } ... Listing Five public class Synchronizer { ... public void sync() { try { double randNo = (new Random((Calendar.getInstance()) .getTime().getTime())).nextDouble() * config.getWaitTimer(); double s = (new Double((Double.toString(randNo)) .substring(0,5))).doubleValue(); // declare own existence (new DeclareSelfThread()).start(); wait(s); // check for other servers in the cluster (new CheckServersThread()).start(); wait(s); // if this is a primary, declare itself as primary if (status.isPrimary()) { (new DeclarePrimaryThread()).start(); wait(s); } // check if a primary server exists (new CheckPrimaryThread()).start(); wait(s); // check servers in the cluster, if down, remove it (new PingThread()).start(); wait(s); } ... Listing Six ... public void run() { Hashtable servers = status.getServers(); if (servers != null) { Writer out = null; Enumeration hosts = servers.keys(); while (hosts.hasMoreElements()) { String host = (String)hosts.nextElement(); try { _socket = new Socket(host,((Integer)servers.get(host)).intValue()); Synchronizer.serverCount.put(host, new Integer(0)); } catch (IOException e) { int count = 0; try { count = ((Integer)Synchronizer.serverCount.get(host)).intValue(); } catch (NullPointerException ne) {} count++; int maxRetries = config.getMaxServerRetry(); if (count > config.getMaxServerRetry()) { status.removeServer(host); Synchronizer.serverCount.remove(host); } else { Synchronizer.serverCount.put(host, new Integer(count)); } } ... Listing Seven ... int portOut = config.getDiscoveryPort(); byte ttl = (byte) 1; // 1 byte ttl for subnet only InetAddress iaOut = null; try { iaOut = InetAddress.getByName(config.getDiscoveryServer()); } catch (Exception e) { System.err.println("### Error : error contacting multicast port. Shutting down server."); e.printStackTrace(); System.exit(-1); } String clusterStatus = status.getServersAsString(); System.out.println("status >> " + clusterStatus); byte [] dataOut = (status.getRunningServer() + ":" + status.getRunningPort()).getBytes(); DatagramPacket dpOut = new DatagramPacket(dataOut, dataOut.length, iaOut, portOut); try { MulticastSocket msOut = new MulticastSocket(portOut); msOut.joinGroup(iaOut); msOut.send(dpOut, ttl); msOut.leaveGroup(iaOut); msOut.close(); } catch (SocketException e) { System.err.println(e); e.printStackTrace(); } ... Listing Eight ... MulticastSocket socket = null; try { socket = new MulticastSocket(config.getDiscoveryPort()); InetAddress address=InetAddress.getByName(config.getDiscoveryServer()); DatagramPacket packet; byte[] buf = new byte[256]; packet = new DatagramPacket(buf, buf.length); socket.setSoTimeout(config.getTimeout()); socket.joinGroup(address); socket.receive(packet); String data = new String(packet.getData()); StringTokenizer st = new StringTokenizer(data, ":"); String host = st.nextToken(); int port = Integer.parseInt((st.nextToken()).trim()); status.addServer(host, port); socket.leaveGroup(address); socket.close(); } catch (InterruptedIOException e) { System.out.println("### timed out."); socket.close(); } ... Listing Nine ... try { msIn.receive(dpIn); String primary = (new String(dpIn.getData())).trim(); if (status.isPrimary() && !primary.equals(status.getRunningServer())) { System.out.println("### More than 1 primary found.Demoting this server."); status.demote(); } Synchronizer.primaryCount = 0; } catch (InterruptedIOException e) { Synchronizer.primaryCount++; System.out.println("### Timing out the primary server : " + Synchronizer.primaryCount); if (Synchronizer.primaryCount > config.getMaxPrimaryRetry()) { status.promote(); } } ... Listing Ten package xander; /** Application is an interface that is used to * allow Xander to call the application that it runs. The default * implementation of an Application is the Scheduler. * last modified - 6 April 2001
version 1.0
* author Chang Sau Sheong */ public interface Application { public void activate(); } Listing Eleven public class Scheduler implements Application { int _PERIOD = 10; // period in seconds Date _start = new Date(); public void activate() { Date now = new Date(); long start_millis = _start.getTime(); long now_millis = now.getTime(); if (now_millis > (start_millis + (_PERIOD * 1000))) { _start = now; System.out.println("DING DONG!!!"); } } } Listing Twelve 230.0.0.1 4446 225.0.0.2 4000 15 15 1.5 5 Default Scheduler Application xander.Scheduler 6