不知道前主程是处于什么目的,总之我接手这套程序的时候,出现了超级多的问题,也发现了超级多的问题。
比如说吧,接受网络消息逻辑是线程独立的,而发送消息给客户端缺阻塞在了逻辑线程里面;原本可以放在一个进程里面处理的逻辑,却分散在了四个进程里面去处理,导致我完成一个功能,大部分时间要话费了进程之间的玩家信息的同步上面,在我无法忍受的情况下,我终于是用NIO将网络底层从新写了,而且将四个进程合并,但是在很多逻辑上还是尽量保持了和原逻辑处理的兼容!
先说说这个重构的底层吧!
我们看下最重要的ClientHandle类,主要处理每个连接的收发数据的!
public class ClientHandle implements ISession{ public final static int RW_BUFFER_SIZE = 1024; private SocketChannel socket = null; private java.nio.ByteBuffer reader = java.nio.ByteBuffer.allocate(RW_BUFFER_SIZE); private java.nio.ByteBuffer writer = java.nio.ByteBuffer.allocate(4*RW_BUFFER_SIZE); BlockingQueuewriteQueue = new LinkedBlockingQueue (); private IPlayer player = null; private boolean active = false;
包含SocketChannel对象不用说了,reader和writer是用来做消息收发的缓冲的,因为服务器广播的压力会大一些,所以将writer的大小设置为reader的4倍,当然这个可以调整。
writeQueue是用来存储需要发送给客户端的ByteBuffer,每次在这个链接可以写数据的时候,就将writeQueue里面存储的数据转移到writer中,并且一次发送,减少了writer的系统调用次数。ByteBuffer的结构简单说下,不同于java.nio.ByteBuffer,而是自己封装的一个消息解析器,给出源代码
package NetBase;/** * 类说明:字节缓存类,字节操作高位在前,低位在后 * * @version 1.0 * @author fxxxysh*/public class ByteBuffer{ /* static fields */ /** 默认的初始容量大小 */ public static final int CAPACITY = 32; /** 默认的动态数据或文字的最大长度,400k */ public static final int MAX_DATA_LENGTH = 400 * 1024; /* fields */ /** 字节数组 */ byte[] bytes; /** 字节缓存的长度 */ int top; /** 字节缓存的偏移量 */ int offset; /* constructors */ /** 按默认的大小构造一个字节缓存对象 */ public ByteBuffer() { this(CAPACITY); } /** 按指定的大小构造一个字节缓存对象 */ public ByteBuffer(int capacity) { if (capacity < 1) throw new IllegalArgumentException(getClass().getName() + " , invalid capatity:" + capacity); bytes = new byte[capacity]; top = 0; offset = 0; } /** 按指定的字节数组构造一个字节缓存对象 */ public ByteBuffer(byte[] data) { if (data == null) throw new IllegalArgumentException(getClass().getName() + " , null data"); bytes = data; top = data.length; offset = 0; } /** 按指定的字节数组构造一个字节缓存对象 */ public ByteBuffer(byte[] data, int index, int length) { if (data == null) throw new IllegalArgumentException(getClass().getName() + " , null data"); if (index < 0 || index > data.length) throw new IllegalArgumentException(getClass().getName() + " , invalid index:" + index); if (length < 0 || data.length < index + length) throw new IllegalArgumentException(getClass().getName() + " , invalid length:" + length); bytes = data; top = index + length; offset = index; } /* properties */ /** 得到字节缓存的容积 */ public int capacity() { return bytes.length; } /** 设置字节缓存的容积,只能扩大容积 */ public void setCapacity(int len) { int c = bytes.length; if (len <= c) return; for (; c < len; c = (c << 1) + 1) ; byte[] temp = new byte[c]; System.arraycopy(bytes, 0, temp, 0, top); bytes = temp; } /** 得到字节缓存的长度 */ public int top() { return top; } /** 设置字节缓存的长度 */ public void setTop(int top) { if (top < offset) throw new IllegalArgumentException(this + " setTop, invalid top:" + top); if (top > bytes.length) setCapacity(top); this.top = top; } /** 得到字节缓存的偏移量 */ public int offset() { return offset; } /** 设置字节缓存的偏移量 */ public void setOffset(int offset) { if (offset < 0 || offset > top) throw new IllegalArgumentException(this + " setOffset, invalid offset:" + offset); this.offset = offset; } /** 得到字节缓存的使用长度 */ public int length() { return top - offset; } /** 得到字节缓存的字节数组,一般使用toArray()方法 */ public byte[] getByteArray() { return bytes; } /* methods */ /* byte methods */ /** 得到指定偏移位置的字节 */ public byte read(int pos) { return bytes[pos]; } /** 设置指定偏移位置的字节 */ public void write(int b, int pos) { bytes[pos] = (byte) b; } /* read methods */ /** * 按当前偏移位置读入指定的字节数组 * * @param data * 指定的字节数组 * @param pos * 指定的字节数组的起始位置 * @param len * 读入的长度 */ public void read(byte[] data, int pos, int len) { System.arraycopy(bytes, offset, data, pos, len); offset += len; } /** 读出一个布尔值 */ public boolean readBoolean() { return (bytes[offset++] != 0); } /** 读出一个字节 */ public byte readByte() { return bytes[offset++]; } /** 读出一个无符号字节 */ public int readUnsignedByte() { return bytes[offset++] & 0xff; } /** 读出一个字符 */ public char readChar() { return (char) readUnsignedShort(); } /** 读出一个短整型数值 */ public short readShort() { return (short) readUnsignedShort(); } /** 读出一个无符号的短整型数值 */ public int readUnsignedShort() { int pos = offset; offset += 2; return (bytes[pos + 1] & 0xff) + ((bytes[pos] & 0xff) << 8); } /** 读出一个整型数值 */ public int readInt() { int pos = offset; offset += 4; return (bytes[pos + 3] & 0xff) + ((bytes[pos + 2] & 0xff) << 8) + ((bytes[pos + 1] & 0xff) << 16) + ((bytes[pos] & 0xff) << 24); } /** 读出一个浮点数值 */ public float readFloat() { return Float.intBitsToFloat(readInt()); } /** 读出一个长整型数值 */ public long readLong() { int pos = offset; offset += 8; return (bytes[pos + 7] & 0xffL) + ((bytes[pos + 6] & 0xffL) << 8) + ((bytes[pos + 5] & 0xffL) << 16) + ((bytes[pos + 4] & 0xffL) << 24) + ((bytes[pos + 3] & 0xffL) << 32) + ((bytes[pos + 2] & 0xffL) << 40) + ((bytes[pos + 1] & 0xffL) << 48) + ((bytes[pos] & 0xffL) << 56); } /** 读出一个双浮点数值 */ public double readDouble() { return Double.longBitsToDouble(readLong()); } /** * 读出动态长度, 数据大小采用动态长度,整数类型下,最大为512M 1xxx,xxxx表示(0~0x80) 0~128B * 01xx,xxxx,xxxx,xxxx表示(0~0x4000) 0~16K * 001x,xxxx,xxxx,xxxx,xxxx,xxxx,xxxx,xxxx表示(0~0x20000000) 0~512M */ public int readLength() { int n = bytes[offset] & 0xff; if (n >= 0x80) { offset++; return n - 0x80; } else if (n >= 0x40) return readUnsignedShort() - 0x4000; else if (n >= 0x20) return readInt() - 0x20000000; else throw new IllegalArgumentException(this + " readLength, invalid number:" + n); } /** 读出一个指定长度的字节数组,可以为null */ public byte[] readData() { int len = readLength() - 1; if (len < 0) return null; if (len > MAX_DATA_LENGTH) throw new IllegalArgumentException(this + " readData, data overflow:" + len); byte[] data = new byte[len]; read(data, 0, len); return data; } /** 读出一个短字节数组,长度不超过254 */ public byte[] readShortData() { int len = readUnsignedByte(); if (len == 255) return null; byte[] data = new byte[len]; if (len != 0) read(data, 0, len); return data; } /** 读出一个指定长度的字符串 */ public String readString(int len) { byte[] data = new byte[len]; if (len == 0) return ""; read(data, 0, len); return new String(data); } /** 读出一个短字符串,长度不超过254 */ public String readShortString() { int len = readUnsignedByte(); if (len == 255) return null; return readString(len); } /** 读出一个字符串,长度不超过65534 */ public String readString() { int len = readUnsignedShort(); if (len == 65535) return null; return readString(len); } /** 读出一个指定长度和编码类型的字符串 */ public String readUTF(String charsetName) { int len = readLength() - 1; if (len < 0) return null; if (len > MAX_DATA_LENGTH) throw new IllegalArgumentException(this + " readUTF, data overflow:" + len); byte[] data = new byte[len]; read(data, 0, len); if (charsetName == null) return new String(data); try { return new String(data, charsetName); } catch (Exception e) { throw new IllegalArgumentException(this + " readUTF, invalid charsetName:" + charsetName); } } /** 读出一个指定长度的utf字符串 */ public String readUTF() { int len = readLength() - 1; if (len < 0) return null; if (len == 0) return ""; if (len > MAX_DATA_LENGTH) throw new IllegalArgumentException(this + " readUTF, data overflow:" + len); StringBuffer sb = new StringBuffer(len); int pos = ByteKit.readUTF(bytes, offset, len, sb); if (pos > 0) throw new IllegalArgumentException(this + " readUTF, format err, len=" + len + ", pos:" + pos); offset += len; return sb.toString(); } /* write methods */ /** * 写入指定字节数组 * * @param data * 指定的字节数组 * @param pos * 指定的字节数组的起始位置 * @param len * 写入的长度 */ public void write(byte[] data, int pos, int len) { if (bytes.length < top + len) setCapacity(top + len); System.arraycopy(data, pos, bytes, top, len); top += len; } /** 写入一个布尔值 */ public void writeBoolean(boolean b) { if (bytes.length < top + 1) setCapacity(top + CAPACITY); bytes[top++] = (byte) (b ? 1 : 0); } /** 写入一个字节 */ public void writeByte(int b) { if (bytes.length < top + 1) setCapacity(top + CAPACITY); bytes[top++] = (byte) b; } /** 写入一个字符 */ public void writeChar(int c) { writeShort(c); } /** 写入一个短整型数值 */ public void writeShort(int s) { int pos = top; if (bytes.length < pos + 2) setCapacity(pos + CAPACITY); bytes[pos] = (byte) (s >>> 8); bytes[pos + 1] = (byte) s; top += 2; } /** 在指定位置写入一个短整型数值,length不变 */ public void writeShort(int s, int pos) { if (bytes.length < pos + 2) setCapacity(pos + CAPACITY); bytes[pos] = (byte) (s >>> 8); bytes[pos + 1] = (byte) s; } /** 写入一个整型数值 */ public void writeInt(int i) { int pos = top; if (bytes.length < pos + 4) setCapacity(pos + CAPACITY); bytes[pos] = (byte) (i >>> 24); bytes[pos + 1] = (byte) (i >>> 16); bytes[pos + 2] = (byte) (i >>> 8); bytes[pos + 3] = (byte) i; top += 4; } /** 在指定位置写入一个整型数值,length不变 */ public void writeInt(int i, int pos) { if (bytes.length < pos + 4) setCapacity(pos + CAPACITY); bytes[pos] = (byte) (i >>> 24); bytes[pos + 1] = (byte) (i >>> 16); bytes[pos + 2] = (byte) (i >>> 8); bytes[pos + 3] = (byte) i; } /** 写入一个浮点数值 */ public void writeFloat(float f) { writeInt(Float.floatToIntBits(f)); } /** 写入一个长整型数值 */ public void writeLong(long l) { int pos = top; if (bytes.length < pos + 8) setCapacity(pos + CAPACITY); bytes[pos] = (byte) (l >>> 56); bytes[pos + 1] = (byte) (l >>> 48); bytes[pos + 2] = (byte) (l >>> 40); bytes[pos + 3] = (byte) (l >>> 32); bytes[pos + 4] = (byte) (l >>> 24); bytes[pos + 5] = (byte) (l >>> 16); bytes[pos + 6] = (byte) (l >>> 8); bytes[pos + 7] = (byte) l; top += 8; } /** 写入一个双浮点数值 */ public void writeDouble(double d) { writeLong(Double.doubleToLongBits(d)); } /** 写入动态长度 */ public void writeLength(int len) { if (len >= 0x20000000 || len < 0) throw new IllegalArgumentException(this + " writeLength, invalid len:" + len); if (len >= 0x4000) writeInt(len + 0x20000000); else if (len >= 0x80) writeShort(len + 0x4000); else writeByte(len + 0x80); } /** 写入一个字节数组,可以为null */ public void writeData(byte[] data) { writeData(data, 0, (data != null) ? data.length : 0); } /** 写入一个字节数组,可以为null */ public void writeData(byte[] data, int pos, int len) { if (data == null) { writeLength(0); return; } writeLength(len + 1); write(data, pos, len); } /** 写入一个字符串,可以为null */ public void writeString(String s) { if (s != null) { byte[] temp = s.getBytes(); if (temp.length > 65534) throw new IllegalArgumentException(getClass().getName() + " writeString, invalid s:" + s); writeShort(temp.length); if (temp.length != 0) write(temp, 0, temp.length); } else writeShort(65535); } /** 写入一个字符串,以指定的字符进行编码 */ public void writeUTF(String str, String charsetName) { if (str == null) { writeLength(0); return; } byte[] data; if (charsetName != null) { try { data = str.getBytes(charsetName); } catch (Exception e) { throw new IllegalArgumentException(this + " writeUTF, invalid charsetName:" + charsetName); } } else data = str.getBytes(); writeLength(data.length + 1); write(data, 0, data.length); } /** 写入一个utf字符串,可以为null */ public void writeUTF(String str) { writeUTF(str, 0, (str != null) ? str.length() : 0); } /** 写入一个utf字符串中指定的部分,可以为null */ public void writeUTF(String str, int index, int length) { if (str == null) { writeLength(0); return; } int len = ByteKit.getUTFLength(str, index, length); writeLength(len + 1); int pos = top; if (bytes.length < pos + len) setCapacity(pos + len); ByteKit.writeUTF(str, index, length, bytes, pos); top += len; } /** 检查是否为相同类型的实例 */ public boolean checkClass(Object obj) { return (obj instanceof ByteBuffer); } /** 在指定位置写入一个字节,length不变 */ public void writeByte(int b, int pos) { if (bytes.length < pos + 1) setCapacity(pos + CAPACITY); bytes[pos] = (byte) b; } /** 得到字节缓存当前长度的字节数组 */ public byte[] toByteArray() { byte[] data = new byte[top - offset]; System.arraycopy(bytes, offset, data, 0, data.length); return data; } /** 清除字节缓存对象 */ public void clear() { top = 0; offset = 0; } /* common methods */ public int hashCode() { int hash = 17; for (int i = top - 1; i >= 0; i--) hash = 65537 * hash + bytes[i]; return hash; } public boolean equals(Object obj) { if (this == obj) return true; if (!checkClass(obj)) return false; ByteBuffer bb = (ByteBuffer) obj; if (bb.top != top) return false; if (bb.offset != offset) return false; for (int i = top - 1; i >= 0; i--) { if (bb.bytes[i] != bytes[i]) return false; } return true; } public String toString() { return super.toString() + "[" + top + "," + offset + "," + bytes.length + "] "; }}
下面看下 ClientHandle的可读逻辑:
public int handleRead() throws IOException { int r = this.socket.read(this.reader); if(r <= 0) { return -1; } this.reader.flip(); ByteBuffer data = this.createBuffer(); while(data != null) { this.reader.get(data.getByteArray(), data.top(), data.capacity()); this.processData(data); data = this.createBuffer(); } this.reader.clear(); return 0; }
依次将数据读入到reader中,并且按照LC(L表示长度,C表示内容)结构将reader中的数据解析成一个个ByteBuffer对象处理。下面是createBuffer函数和processData函数:
private ByteBuffer createBuffer() { if(reader.remaining() < 4) { return null; } int len = reader.getInt(); if(len > reader.remaining()) { reader.clear(); return null; } if (len > 0 && len <= 10 * 1024) { return new ByteBuffer(len); } return null; }public void processData(ByteBuffer data){ player.insertData(data);}
这里要注意,1:解析reader中的消息一定要做容错处理;2:将解析的待处理包放到玩家身上,让玩家自己处理!
发送函数的处理:
public int handleWrite() throws IOException { ByteBuffer data = writeQueue.poll(); while(data != null) { this.writer.putInt(data.length()); this.writer.put(data.toByteArray(), 0, data.length()); data = writeQueue.poll(); } this.writer.flip(); if(!this.writer.hasRemaining()) { this.writer.limit(this.writer.capacity()); return 0; } this.socket.write(writer); if(this.writer.hasRemaining()) { this.writer.compact(); this.writer.position(this.writer.limit()); this.writer.limit(this.writer.capacity()); } else { this.writer.compact(); this.writer.limit(this.writer.capacity()); } return 0; }
发送函数的处理相对复杂些,首先要做的就是每个连接的发送函数每100ms(可以调整)触发一次,每次触发时候,要将待发送的数据包bytebuffer填充到writer缓冲区,然后一次发送!
管理协调这些链接的新建和处理都是使用了java nio的selector结构,具体的代码就不贴出来了,想要的可以联系我,需要注意的有两点,1:对于空闲连接的处理,2:对于发送数据的处理
大致讲完了网络线程,那么讲一讲主逻辑线程,逻辑线程采用线程绑定地图的设计;在服务器启动之时,启动n(可以调整)个地图线程,每个地图线程绑定N(可以调整)个地图,这N个地图上的所有玩家的逻辑处理,都有地图所在线程来处理,具体处理方式:
地图线程的主逻辑:
public class SceneThread implements Runnable{ Listscenes = new ArrayList (); private int index = 0; @Override public void run() { while(true) { try { synchronized (scenes) { for(IScene scene : this.scenes) { scene.beatHeart(); } } Thread.sleep(100); } catch (Exception e) { e.printStackTrace(); } } } public void addScene(IScene scene) { synchronized (scenes) { this.scenes.add(scene); } } public void removeScene(IScene scene) { synchronized (scenes) { this.scenes.remove(scene); } } public void setIndex(int index) { this.index = index; } public String toString() { return "SceneThread : " + index; } }
场景Scene的心跳函数:
public class Scene implements IScene{ public void beatHeart() { long now = System.currentTimeMillis(); Listplayers = null; synchronized (idPlayerMap) { players = new ArrayList (idPlayerMap.values()); } for(IPlayer player : players) { player.beatHeart(now); } } }
玩家的心跳函数:
BlockingQueuedataToProcess = new LinkedBlockingDeque (); public void insertData(ByteBuffer data) { this.dataToProcess.offer(data); }public void beatHeart(long now){ ByteBuffer data = this.dataToProcess.poll(); while(data != null) { this.processData(data); data = this.dataToProcess.poll(); } //.....处理心跳定时器,上一篇有讲到}
好了,大概的服务器的主逻辑就这些了,是不是精简小巧。晚上的时候还做了一下广播压力测试,效果还不错!
欢迎大家提出宝贵意见!