//package PA2; import java.io.IOException; import java.net.DatagramPacket; import java.net.DatagramSocket; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.SocketException; import java.util.Arrays; import java.util.Random; import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.ConcurrentHashMap; import java.util.logging.ConsoleHandler; import java.util.logging.Level; import java.util.logging.Logger; /** @author V. Arun */ /* This ChannelEmulator class implements a UDP "chat server" that can * (1) echo messages, (2) accept control commands to register a user name * or change channel properties like loss, corruption probability, or delay, * and (3) relay messages to other users. It internally invokes a Timer * thread to delay messages and to periodically garbage collect state * left by users after a period of inactivity. * * Supports the following optional command-line arguments: * -PM privileged_user_name -P port_number - L loss_rate * -D delay_secs -R delay_dev_ratio -C corruption_rate */ public class ChannelEmulator { private static int PORT = 8888; private static final int MAX_MSG_SIZE = 2048; private static final long MAX_INACTIVE_TIME = 1800000; // milliseconds after which user state is garbage collected private static final int MAX_Q_SIZE = 8; // max number of outstanding segments per client private static final int MAX_TQ_SIZE = 10000; // max number of total outstanding segments private static final int MAX_MAP_SIZE = 1000000; // max number of client state entries // Default channel parameters private static double LOSS = 0.1; // loss rate private static double DELAY = 0.1; // delay in seconds private static double DELAY_DEV_RATIO = 1.0; // ratio of deviation to average delay private static double CORRUPTION = 0.01; // probability of corruption of exactly one byte in each 100 byte block private static boolean PRIVILEGED_MODE=false; // If true, only privileged_user can set CHNL parameters private static String privileged_user=null; private DatagramSocket udpsock = null; private int TQSize = 0; // total number of outstanding segments /* The two hashmaps below need to be ConcurrentHashMaps as they are modified by * the main thread as well as the garbage collector thread. */ private ConcurrentHashMap sockToCinfo=null; // [IP,port] -> channel info private ConcurrentHashMap nameToSock=null; // name -> [IP,port] private GarbageCollector GC=null; // Garbage collects state left by ungraceful client exits private Timer timer=null; // Used by GarbageCollector and Delayer to schedule future events private Random random=null; // Used for implementing channel characteristics private int MAX_LOGMSG_SIZE = 64; private static Logger log = Logger.getLogger(ChannelEmulator.class.getName()); /* Local structure to store info about user name or customized * channel info (e.g., loss, delay, peer relaying). */ private class ChannelInfo { public double loss=LOSS; public double delay=DELAY; public double delay_dev_ratio=DELAY_DEV_RATIO; // ratio of deviation to average delay public double corruption=CORRUPTION; public String name="DEFAULT"; // Name of user public String peer=null; // Name of peer to which relaying public long lastActive; // time in milliseconds, used for garbage collection upon inactivity private int qSize=0; // number of buffered packets at server public synchronized int incrQSize() {return ++qSize;} // called by main and Delayer threads public synchronized int decrQSize() {return --qSize;} // called by main and Delayer threads public synchronized int getQSize() {return qSize;} } /* This timertask periodically cleans up state left by users who did * not cleanly send a QUIT command before exiting. */ private class GarbageCollector extends TimerTask { public void run() { log.fine(printStats()); for(InetSocketAddress isaddr: sockToCinfo.keySet()) { ChannelInfo cinfo = sockToCinfo.get(isaddr); if(cinfo!=null && cinfo.lastActive(); nameToSock = new ConcurrentHashMap(); timer = new Timer(); random = new Random(); GC = new GarbageCollector(); } /* Called by both the main thread and the GC thread. Needs to be * synchronized as, otherwise, interleaving between the GC calling * this method and the main thread calling processAsControlMessage's * setName method below to assign a name can leave inconsistent * entries in sockToCinfo and nameToSock. */ private synchronized void cleanup(InetSocketAddress isaddr) { ChannelInfo cinfo = isaddr!=null ? sockToCinfo.get(isaddr) : null; if(cinfo!=null) { sockToCinfo.remove(isaddr); if(cinfo.name!=null) nameToSock.remove(cinfo.name); } } /* Synchronization here as well as in the cleanup method ensures the invariant * that for any sockaddr, if sockToCinfo.get(sockaddr).name = name, then * nameToSock.get(name) = sockaddr, and vice versa. */ private synchronized void setName(String name, InetSocketAddress isaddr) { assert(isaddr!=null && name!=null); ChannelInfo cinfo = sockToCinfo.get(isaddr); if(cinfo==null) cinfo = new ChannelInfo(); else nameToSock.remove(cinfo.name); nameToSock.put(name, isaddr); cinfo.name = name; sockToCinfo.put(isaddr, cinfo); } private synchronized int incrTQSize() {return ++TQSize;} private synchronized int decrTQSize() {return --TQSize;} private synchronized int getTQSize() {return TQSize;} private synchronized void clearMaps() {sockToCinfo.clear();nameToSock.clear();} /* Start of methods to mangle packets being relayed by the channel. */ private byte[] lose(byte[] msg, ChannelInfo cinfo) { if(msg==null) return null; double loss = (cinfo!=null ? cinfo.loss : LOSS); if(random.nextDouble() < loss) msg = null; return msg; } // Randomly corrupts a byte in each 100 byte block with corruption probability private byte[] corrupt(byte[] msg, ChannelInfo cinfo) { if(msg==null) return null; double corruption = (cinfo!=null ? cinfo.corruption : CORRUPTION); for(int i=0; i1 && (room=haveRoom(cinfo))) { if(cinfo==null) cinfo = new ChannelInfo(); cinfo.peer = (msg.split("\\s",2))[1]; InetSocketAddress peerSock = getPeerSock(cinfo.peer); /* If no sockaddr stored for peer but can parse sockaddr * in peer, use the parsed sockaddr as peer instead. That * is, peer is either a name that is mapped to a sockaddr * or it is a sockaddr itself. */ if(nameToSock.get(cinfo.peer)==null && peerSock!=null) cinfo.peer = peerSock.toString(); sockToCinfo.put(isaddr, cinfo); response = "OK Relaying to " + cinfo.peer + ((peerSock==null || sockToCinfo.get(peerSock)==null) ? " who is probably offline" : (!cinfo.peer.equals(peerSock.toString()) ? " at " + peerSock :"")); } /* The privileged_mode allows only the privileged_user to set channel * parameters and sets it for all users. */ else if(cmd.equals("CHNL") && parts.length>=3 && allowCHNL(dgram) && (room=haveRoom(cinfo))) { if(cinfo==null) cinfo = new ChannelInfo(); boolean parsed=true; for(int i=1; i1) { try { InetAddress IP = InetAddress.getByName(parts[parts.length-2]); int port = Integer.valueOf(parts[parts.length-1]); isaddr = new InetSocketAddress(IP, port); } catch(Exception e) { // do nothing as null will (and should) be returned anyway log.finer("Incorrectly formatter peer name: " + name); } } } return isaddr; } // Affix peer IP:port to received datagram to make relayable datagram private DatagramPacket getRelayMessage(DatagramPacket dgram) { InetSocketAddress sndsock = (InetSocketAddress) (dgram.getSocketAddress()); ChannelInfo cinfo = sndsock!=null ? sockToCinfo.get(sndsock) : null; if(cinfo!=null && cinfo.peer!=null) { InetSocketAddress rcvsock = getPeerSock(cinfo.peer); if(rcvsock!=null) { dgram.setAddress(rcvsock.getAddress()); dgram.setPort(rcvsock.getPort()); } else dgram=null; } return dgram; } private String truncate(DatagramPacket dgram) { return truncate(new String(dgram.getData(), 0, dgram.getLength())); } private String truncate(String msg) { int length = Math.min(MAX_LOGMSG_SIZE, msg.length()); return msg.substring(0, length) + (length "+truncate(response)); dgram.setData(response.getBytes()); send(dgram); } // else check for relaying else if (isRelaying(dgram)){ log.fine("Relay message from " + sender + " to " + getPeerString(dgram) + " : " + truncate(dgram)); InetSocketAddress isaddr = (InetSocketAddress)dgram.getSocketAddress(); mangle(getRelayMessage(dgram), isaddr); } // else simply echo back to sender else { send(dgram); // simply echo datagram by default log.fine("Echo message from/to " + sender + ": " + truncate(dgram)); } } catch(Exception e) { log.warning("Exception #"+num_excepts+": "+e); try {if(++num_excepts>3) tryRecover();} // A hail mary pass before giving up catch(Exception efatal) { log.severe("Unable to recover from Exception, giving up: " + efatal); } } } } /* Sets logging level and invokes run() */ public static void main(String[] args) throws SocketException, IOException { ConsoleHandler ch = new ConsoleHandler(); ch.setLevel(Level.FINEST); log.addHandler(ch); log.setLevel(Level.FINEST); log.setUseParentHandlers(false); ChannelEmulator chem = new ChannelEmulator(args); log.info("Starting chat server at " + InetAddress.getLocalHost() + PORT + chem.printParams(null)+"\n"); chem.run(); } }