WebSocket,并非HTML 5独有,WebSocket是一种协议。只是在handshake的时候,发送的链接信息头和HTTP相似。HTML 5只是实现了WebSocket的客户端。其实,难点在于服务端,服务端相对还是比较复杂的。
网上demo很多,但是能讲到点上的不多,而且也不知道作者有没有深入试验过。 ,handshake这part其实还是比较简单的,比较复杂的还是数据传输(第二部分)比较难。
上图是帧格式,对于解释可以看文档也可以查看文章:
也就说,你需要解读opcode, Payload len 这些比较敏感的位值之后,你才开始取后面的Payload Data, 比如opcode如果是1,那么就是读取字符串,如果是2,那么就是读取流,如果是8,那么就是关闭socket。
如果自己用java做服务端,握手协议的响应,换行符不能使“\r\n”或“\n”,因为这不是标准的换行符,其实是个字符串,只是屏幕显示的时候是换行,可以用PrintWrite.println(),或者
String newLine = (String) java.security.AccessController.doPrivileged(
new sun.security.action.GetPropertyAction("line.separator"));
握手之后,根据opcode做相应的操作,JAVA的IO和NIO,在实现方面都缺憾,难以实现。使用IO的话,在获得socket之后,InputStream会处于阻塞,因为阻塞,所以后续做outputStream的操作时就会不方便。NIO的话,是SocketChannel写入读出,基本都是ByteBuffer,用这个的话,有时opcode值为1(读取字符串),ByteBuffer解码得到的字符串经常性是乱码(这个本人没有深究)。感觉
Tomcat在7.0.27之后就开始支持WebSocket,在它之上建立WebSocket是很简单的,基本就是继承WebSocketServlet,实现createWebSocketInbound方法和重写StreamInbound的方法。Tomcat的example里面有相关的例子。
######如果已经知道tomcat的运行流程那么就略过这一段#####
tomcat的源代码研究,网上挺多的。个人就看过王程斯的。
下面引用一下他的插图:
大体来讲就是通过监听几个端口,运用线程池处理Socket,之后adapter打包数据给容器。
下面罗嗦一下:
tomcat启动时,会启用几个AcceptorThread 监控端口,JIoEndpointAcceptor - Acceptor(线程),将接收到的socket放入 JIoEndpointAcceptor - processSocket 处理。在processSocket 里面 socket被打包之后SocketWrapper放入JIoEndpointAcceptor - SocketProcessor(线程)处理 ( SSL handshake在这里做处理)。在SocketProcessor线程里面,socket被交给 AbstractProtocol process方法进行处理(处理的过程,创建Http11Processor,Http11Protocol - Http11ConnectionHandler - createProcessor)。processor之后会被注册,Http11Processor register(processor);被注册缓存起来,以便其他socket过来可以沿用(对了,Http11Processor 继承于 AbstractHttp11Processor)。继续,socket被Http11Processor - process(SocketWrapper<S> socketWrapper) 处理生成Request对象,要知道tomcat为我们做了封装,我们的输入输出都只需要调用Request和Response。Http11Processor 的process都做了什么呢??getInputBuffer().init(socketWrapper, endpoint);InternalInputBuffer 获得socket的inputStream,getOutputBuffer().init(socketWrapper, endpoint);InternalOutputBuffer 获得socket的outputStream。读取inputStream的值到buf里面,prepareRequest 将值包装到Request。
######################end
好吧,很乱,就着源代码画着图就比较容易理解。而且这个只是Http11的普通IO处理流程,tomcat还能处理ajp协议。而且对于HTTP11还有另一套的NIO处理流程。
将上面的流程,主要是因为:WebSocket启用的是JIOEndpoint,而不是JNIOEndpoint等。
每次交互都会由Http11Processor process处理,如果处理的时候返回的状态是upgrade,也就是http升级协议(websocket协议)。那么tomcat就会封装出一个UpgradeInbound 。
UpgradeInbound inbound = processor.getUpgradeInbound();// Release the Http11 processor to be re-usedrelease(socket, processor, false, false);// Create the light-weight upgrade processorprocessor = createUpgradeProcessor(socket, inbound);inbound.onUpgradeComplete();
之后就会开始握手:
state = processor.upgradeDispatch();
有70页左右,没有深入的看下去,太煎熬了(希望有牛人,可以翻译一下,造福人类)。看了些协议,又看了tomcat的WebSocket实现。现在就是贴自己做的demo(demo很简单,请别喷):
客户端:
Web Socket Demo -- EchoClient WebSocket客户端实例
请输入一些文字
略过,不说,哪哪都有这部分的代码。
服务端:
上面说了,服务端是挺难的。
1、处理握手,本人使用InputStream.read(byte[], off, len)方法读取了字节之后再处理。
2、使用InputStream.read()方法逐一读取字节,解析FIN,opcode,PayloadLen,mask(掩码,根据websocket协议,客户端传过来的数据必须通过掩码计算再传输)等信息,当然rsv在我的demo没有,没用上。
3、知道数据长度了(PS:JAVA的UTF8里面中文可能占用3-4个字节),读取数据,然后通过解码获取真正的byte值。
4、输出结果,结果无需掩码,但是也有格式要求,看协议,懒得看,就看tomcat源码。
package socket;import java.io.ByteArrayInputStream;import java.io.IOException;import java.io.InputStream;import java.io.InputStreamReader;import java.io.OutputStream;import java.io.PrintWriter;import java.net.ServerSocket;import java.net.Socket;import java.nio.ByteBuffer;import java.nio.charset.Charset;import java.security.MessageDigest;/* * 垃圾程序,只求速成,没有效率,复用这个概念,望谅解 * */public class EchoServer { private int port = 8000; private ServerSocket serverSocket; public EchoServer() throws IOException { serverSocket = new ServerSocket(port); System.out.println("服务器启动"); } private void service() { Socket socket = null; while (true) { try { socket = serverSocket.accept(); Thread workThread = new Thread(new Handler(socket)); workThread.start(); } catch (IOException e) { e.printStackTrace(); } } } class Handler implements Runnable { private Socket socket; private boolean hasHandshake = false; Charset charset = Charset.forName("UTF-8"); public Handler(Socket socket) { this.socket = socket; } private PrintWriter getWriter(Socket socket) throws IOException { OutputStream socketOut = socket.getOutputStream(); return new PrintWriter(socketOut, true); } public String echo(String msg) { return "echo:" + msg; } public void run() { try { System.out.println("New connection accepted" + socket.getInetAddress() + ":" + socket.getPort()); InputStream in = socket.getInputStream(); PrintWriter pw = getWriter(socket); //读入缓存 byte[] buf = new byte[1024]; //读到字节 int len = in.read(buf, 0, 1024); //读到字节数组 byte[] res = new byte[len]; System.arraycopy(buf, 0, res, 0, len); String key = new String(res); if(!hasHandshake && key.indexOf("Key") > 0){ //握手 key = key.substring(0, key.indexOf("==") + 2); key = key.substring(key.indexOf("Key") + 4, key.length()).trim(); key+= "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; MessageDigest md = MessageDigest.getInstance("SHA-1"); md.update(key.getBytes("utf-8"), 0, key.length()); byte[] sha1Hash = md.digest(); sun.misc.BASE64Encoder encoder = new sun.misc.BASE64Encoder(); key = encoder.encode(sha1Hash); pw.println("HTTP/1.1 101 Switching Protocols"); pw.println("Upgrade: websocket"); pw.println("Connection: Upgrade"); pw.println("Sec-WebSocket-Accept: " + key); pw.println(); pw.flush(); hasHandshake = true; //接收数据 byte[] first = new byte[1]; //这里会阻塞 int read = in.read(first, 0, 1); while(read > 0){ int b = first[0] & 0xFF; //1为字符数据,8为关闭socket byte opCode = (byte) (b & 0x0F); if(opCode == 8){ socket.getOutputStream().close(); break; } b = in.read(); int payloadLength = b & 0x7F; if (payloadLength == 126) { byte[] extended = new byte[2]; in.read(extended, 0, 2); int shift = 0; payloadLength = 0; for (int i = extended.length - 1; i >= 0; i--) { payloadLength = payloadLength + ((extended[i] & 0xFF) << shift); shift += 8; } } else if (payloadLength == 127) { byte[] extended = new byte[8]; in.read(extended, 0, 8); int shift = 0; payloadLength = 0; for (int i = extended.length - 1; i >= 0; i--) { payloadLength = payloadLength + ((extended[i] & 0xFF) << shift); shift += 8; } } //掩码 byte[] mask = new byte[4]; in.read(mask, 0, 4); int readThisFragment = 1; ByteBuffer byteBuf = ByteBuffer.allocate(payloadLength + 10); byteBuf.put("echo: ".getBytes("UTF-8")); while(payloadLength > 0){ int masked = in.read(); masked = masked ^ (mask[(int) ((readThisFragment - 1) % 4)] & 0xFF); byteBuf.put((byte) masked); payloadLength--; readThisFragment++; } byteBuf.flip(); responseClient(byteBuf, true); printRes(byteBuf.array()); in.read(first, 0, 1); } } in.close(); } catch (Exception e) { e.printStackTrace(); } finally { try { if (socket != null) socket.close(); } catch (IOException e) { e.printStackTrace(); } } } private void responseClient(ByteBuffer byteBuf, boolean finalFragment) throws IOException { OutputStream out = socket.getOutputStream(); int first = 0x00; //是否是输出最后的WebSocket响应片段 if (finalFragment) { first = first + 0x80; first = first + 0x1; } out.write(first); if (byteBuf.limit() < 126) { out.write(byteBuf.limit()); } else if (byteBuf.limit() < 65536) { out.write(126); out.write(byteBuf.limit() >>> 8); out.write(byteBuf.limit() & 0xFF); } else { // Will never be more than 2^31-1 out.write(127); out.write(0); out.write(0); out.write(0); out.write(0); out.write(byteBuf.limit() >>> 24); out.write(byteBuf.limit() >>> 16); out.write(byteBuf.limit() >>> 8); out.write(byteBuf.limit() & 0xFF); } // Write the content out.write(byteBuf.array(), 0, byteBuf.limit()); out.flush(); } private void printRes(byte[] array) { ByteArrayInputStream byteIn = new ByteArrayInputStream(array); InputStreamReader reader = new InputStreamReader(byteIn, charset.newDecoder()); int b = 0; String res = ""; try { while((b = reader.read()) > 0){ res += (char)b; } } catch (IOException e) { e.printStackTrace(); } System.out.println(res); } } public static void main(String[] args) throws IOException { new EchoServer().service(); }}
本文出自:
by lin_bobo