(源码篇)雨露均沾的OkHttp—WebSocket长连接

2020-10-17 15:31:33 蜻蜓队长

前言

雨露均沾的OkHttp—WebSocket长连接(使用篇)

雨露均沾的OkHttp—WebSocket长连接(源码篇)

上期我们熟悉了OkHttp中实现WebSocket长连接的接入,并且可以通过OkHttp官方的MockWebSocket服务来模拟服务端,实现整个流程。

今天我们就来说下具体OkHttp中是怎么实现这些功能的呢?相信看过这篇文章你也能深刻了解WebSocket这个协议。

使用回顾

简单贴下WebSocket使用方法,方便下面解析:

       //初始化
        mClient = new OkHttpClient.Builder()
                .pingInterval(10, TimeUnit.SECONDS)
                .build();
        Request request = new Request.Builder()
                .url(mWbSocketUrl)
                .build();
        mWebSocket = mClient.newWebSocket(request, new WsListener());
        
        //收到消息回调
        @Override
        public void onMessage(@NotNull WebSocket webSocket, @NotNull String text) {
            super.onMessage(webSocket, text);
            Log.e(TAG,"收到消息!");
            onWSDataChanged(DATE_NORMAL, text);
        }        
        
        //发送消息
        mWebSocket.send(message);
        
        //主动关闭连接
        mWebSocket.close(code, reason);
复制代码

源码解析

WebSocket整个流程无非三个功能:连接,接收消息,发送消息。下面我们就从这三个方面分析下具体是怎么实现的。

连接

通过上面的代码我们得知,WebSocket连接是通过newWebSocket方法。直接点进去看这个方法:

  override fun newWebSocket(request: Request, listener: WebSocketListener): WebSocket {
    val webSocket = RealWebSocket(
        taskRunner = TaskRunner.INSTANCE,
        originalRequest = request,
        listener = listener,
        random = Random(),
        pingIntervalMillis = pingIntervalMillis.toLong(),
        extensions = null, // Always null for clients.
        minimumDeflateSize = minWebSocketMessageToCompress
    )
    webSocket.connect(this)
    return webSocket
  }
复制代码

这里做了两件事:

  • 初始化RealWebSocket,主要是设置了一些参数(比如pingIntervalMillis心跳包时间间隔,还有监听事件之类的)
  • connect方法进行WebSocket连接

继续查看connect方法:

connect(WebSocket连接握手)
  fun connect(client: OkHttpClient) {
    //***
    val webSocketClient = client.newBuilder()
        .eventListener(EventListener.NONE)
        .protocols(ONLY_HTTP1)
        .build()
    val request = originalRequest.newBuilder()
        .header("Upgrade", "websocket")
        .header("Connection", "Upgrade")
        .header("Sec-WebSocket-Key", key)
        .header("Sec-WebSocket-Version", "13")
        .header("Sec-WebSocket-Extensions", "permessage-deflate")
        .build()
    call = RealCall(webSocketClient, request, forWebSocket = true)
    call!!.enqueue(object : Callback {
      override fun onResponse(call: Call, response: Response) {
        
        //得到数据流
        val streams: Streams
        try {
          checkUpgradeSuccess(response, exchange)
          streams = exchange!!.newWebSocketStreams()
        } 
        
        //***
        // Process all web socket messages.
        try {
          val name = "$okHttpName WebSocket ${request.url.redact()}"
          initReaderAndWriter(name, streams)
          listener.onOpen(this@RealWebSocket, response)
          loopReader()
        } catch (e: Exception) {
          failWebSocket(e, null)
        }
      }
    })
  }
复制代码

上一篇使用篇文章中说过,Websocket连接需要一次Http协议的握手,然后才能把协议升级成WebSocket。所以这段代码就体现出这个功能了。

首先就new了一个用来进行Http连接的request,其中Header的参数就表示我要进行WebSocket连接了,参数解析如下:

  • Connection:Upgrade,表示客户端要连接升级
  • Upgrade:websocket, 表示客户端要升级建立Websocket连接
  • Sec-Websocket-Key:key, 这个key是随机生成的,服务器会通过这个参数验证该请求是否有效
  • Sec-WebSocket-Version:13, websocket使用的版本,一般就是13
  • Sec-webSocket-Extension:permessage-deflate,客户端指定的一些扩展协议,比如这里permessage-deflate就是WebSocket的一种压缩协议。

Header设置好之后,就调用了callenqueue方法,这个方法大家应该都很熟悉吧,OkHttp里面对于Http请求的异步请求就是这个方法。 至此,握手结束,服务器返回响应码101,表示协议升级。

然后我们继续看看获取服务器响应之后又做了什么? 在发送Http请求成功之后,onResponse响应方法里面主要表现为四个处理逻辑:

  • Http流转换成WebSocket流,得到Streams对象,这个流后面会转化成输入流和输出流,也就是进行发送和读取的操作流
  • listener.onOpen(this@RealWebSocket, response),回调了接口WebSocketListeneronOpen方法,告诉用户WebSocket已经连接
  • initReaderAndWriter(name, streams)
  • loopReader()

前两个逻辑还是比较好理解,主要是后两个方法,我们分别解析下。 首先看initReaderAndWriter方法。

initReaderAndWriter(初始化输入流输出流)
  //RealWebSocket.kt

  @Throws(IOException::class)
  fun initReaderAndWriter(name: String, streams: Streams) {
    val extensions = this.extensions!!
    synchronized(this) {
      //***
      
      //写数据,发送数据的工具类
      this.writer = WebSocketWriter()
      
      //设置心跳包事件
      if (pingIntervalMillis != 0L) {
        val pingIntervalNanos = MILLISECONDS.toNanos(pingIntervalMillis)
        taskQueue.schedule("$name ping", pingIntervalNanos) {
          writePingFrame()
          return@schedule pingIntervalNanos
        }
      }
      //***
    }

  //***
  
  //读取数据的工具类
    reader = WebSocketReader(     
      ***
      frameCallback = this,
      ***
    )
  }
  
  internal fun writePingFrame() {
   //***
    try {
      writer.writePing(ByteString.EMPTY)
    } catch (e: IOException) {
      failWebSocket(e, null)
    }
  }  
  
复制代码

这个方法主要干了两件事:

  • 实例化输出流输入流工具类,也就是WebSocketWriterWebSocketReader,用来处理数据的收发。
  • 设置心跳包事件。如果pingIntervalMillis参数不为0,就通过计时器,每隔pingIntervalNanos发送一个ping消息。其中writePingFrame方法就是发送了ping帧数据。

接收消息处理消息

loopReader

接着看看这个loopReader方法是干什么的,看这个名字我们大胆猜测下,难道这个方法就是用来循环读取数据的?去代码里找找答案:

  fun loopReader() {
    while (receivedCloseCode == -1) {
      // This method call results in one or more onRead* methods being called on this thread.
      reader!!.processNextFrame()
    }
  }
复制代码

代码很简单,一个while循环,循环条件是receivedCloseCode == -1的时候,做的事情是reader!!.processNextFrame()方法。继续:

  //WebSocketWriter.kt
  fun processNextFrame() {
    //读取头部信息
    readHeader()
    if (isControlFrame) {
      //如果是控制帧,读取控制帧内容
      readControlFrame()
    } else {
      //读取普通消息内容
      readMessageFrame()
    }
  }
  
  //读取头部信息
  @Throws(IOException::class, ProtocolException::class)
  private fun readHeader() {
    if (closed) throw IOException("closed")
    
    try {
     //读取数据,获取数据帧的前8位
      b0 = source.readByte() and 0xff
    } finally {
      source.timeout().timeout(timeoutBefore, TimeUnit.NANOSECONDS)
    }    
    //***
    //获取数据帧的opcode(数据格式)
    opcode = b0 and B0_MASK_OPCODE
    //是否为最终帧
    isFinalFrame = b0 and B0_FLAG_FIN != 0
    //是否为控制帧(指令)
    isControlFrame = b0 and OPCODE_FLAG_CONTROL != 0

    //判断最终帧,获取帧长度等等
  }  
  
  
  //读取控制帧(指令)
    @Throws(IOException::class)
  private fun readControlFrame() {
    if (frameLength > 0L) {
      source.readFully(controlFrameBuffer, frameLength)
    }

    when (opcode) {
      OPCODE_CONTROL_PING -> {
      //ping 帧
        frameCallback.onReadPing(controlFrameBuffer.readByteString())
      }
      OPCODE_CONTROL_PONG -> {
        //pong 帧
        frameCallback.onReadPong(controlFrameBuffer.readByteString())
      }
      OPCODE_CONTROL_CLOSE -> {
        //关闭 帧
        var code = CLOSE_NO_STATUS_CODE
        var reason = ""
        val bufferSize = controlFrameBuffer.size
        if (bufferSize == 1L) {
          throw ProtocolException("Malformed close payload length of 1.")
        } else if (bufferSize != 0L) {
          code = controlFrameBuffer.readShort().toInt()
          reason = controlFrameBuffer.readUtf8()
          val codeExceptionMessage = WebSocketProtocol.closeCodeExceptionMessage(code)
          if (codeExceptionMessage != null) throw ProtocolException(codeExceptionMessage)
        }
        //回调onReadClose方法
        frameCallback.onReadClose(code, reason)
        closed = true
      }
    }
  }
  
  //读取普通消息
  @Throws(IOException::class)
  private fun readMessageFrame() {
    
    readMessage()

    if (readingCompressedMessage) {
      val messageInflater = this.messageInflater
          ?: MessageInflater(noContextTakeover).also { this.messageInflater = it }
      messageInflater.inflate(messageFrameBuffer)
    }

    if (opcode == OPCODE_TEXT) {
      frameCallback.onReadMessage(messageFrameBuffer.readUtf8())
    } else {
      frameCallback.onReadMessage(messageFrameBuffer.readByteString())
    }
  }  
  
复制代码

代码还是比较直观,这个processNextFrame其实就是读取数据用的,首先读取头部信息,获取数据帧的类型,判断是否为控制帧,再分别去读取控制帧数据或者普通消息帧数据。

数据帧格式

问题来了,什么是数据头部信息,什么是控制帧? 这里就要说下WebSocket的数据帧了,先附上一个数据帧格式:


   0 1 2 3 4 5 6 7    0 1 2 3 4 5 6 7  0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7
  +-+-+-+-+-------+  +-+-------------+ +-----------------------------+
  |F|R|R|R| OP    |  |M| LENGTH      |   Extended payload length
  |I|S|S|S| CODE  |  |A|             |  (if LENGTH=126  |N|V|V|V|       |  |S|             |
  | |1|2|3|       |  |K|             |
  +-+-+-+-+-------+  +-+-------------+
  |                      Extended payload length(if LENGTH=127  +                                  +-------------------------------
  |      Extended payload length     | Masking-key,if Mask set to 1
  +----------------------------------+-------------------------------
  |   Masking-key                    |       Data
  +----------------------------------+-------------------------------
  |                                Data
  +----------------------------------+-------------------------------


复制代码

我承认,我懵逼了。 冷静冷静,一步一步分析下吧。

首先每一行代表4个字节,一共也就是32位数,哦,那也就是几个字节而已嘛,每个字节有他自己的代表意义呗,这样想是不是就很简单了,下面来具体看看每个字节。

第1个字节:

  • 第一位是FIN码,其实就是一个标示位,因为数据可能多帧操作嘛,所以多帧情况下,只有最后一帧的FIN设置成1,标示结束帧,前面所有帧设置为0。
  • 第二位到第四位是RSV码,一般通信两端没有设置自定义协议,就默认为0。
  • 后四位是opcode,我们叫它操作码。这个就是判断这个数据帧的类型了,一般有以下几个被定义好的类型:

1) 0x0 表示附加数据帧
2) 0x1 表示文本数据帧
3) 0x2 表示二进制数据帧
4) 0x3-7 保留用于未来的非控制帧
5) 0x8 表示连接关闭
6) 0x9 表示ping
7) 0xA 表示pong
8) 0xB-F 保留用于未来的非控制帧

是不是发现了些什么,这不就对应了我们应用中的几种格式吗?2和3对应的是普通消息帧,包括了文本和二进制数据。567对应的就是控制帧格式,包括了close,ping,pong

第2个字节:

  • 第一位是Mask掩码,其实就是标识数据是否加密混淆,1代表数据经过掩码的,0是没有经过掩码的,如果是1的话,后续就会有4个字节代表掩码key,也就是数据帧中Masking-key所处的位置。
  • 后7位是LENGTH,用来标示数据长度。因为只有7位,所以最大只能储存1111111对应的十进制数127长度的数据,如果需要更大的数据,这个储存长度肯定就不够了。 所以规定来了,1) 小于126长度则数据用这七位表示实际长度。2) 如果长度设置为126,也就是二进制1111110,就代表取额外2个字节表示数据长度,共是16位表示数据长度。3) 如果长度设置为127,也就是二进制1111111,就代表取额外8个字节,共是64位表示数据长度。

需要注意的是LENGHT的三种情况在一个数据帧里面只会出现一种情况,不共存,所以在图中是用if表示。同样的,Masking-key也是当Mask为1的时候才存在。

所以也就有了数据帧里面的Extended payload length(LENGTH=126)所处的2个字节,以及Extended payload length(LENGTH=127)所处的8个字节。

最后的字节部分自然就是掩码key(Mask为1的时候才存在)和具体的传输数据了。
还是有点晕吧

以上内容来自于网络,如有侵权联系即删除
相关文章

上一篇: Flutter之初识贝塞尔曲线 - 实现炫酷的路由动画

下一篇: Lifecycle深度实践与解析

在线咨询
客户经理