开源库信息

为什么需要WebSocket

对于一个应用来说Http协议几乎是必备的协议,当客户端需要数据的时候通过Http协议发起请求,服务端响应请求返回对应的数据给客户端,但是如果客户端需要实时知道服务的某个状态变化怎么办?在不借助WebSocket的情况下,只能通过轮询来不断向服务端查询这个状态,这种方式对服务端和客户端都会带来资源上的损耗,客户端要不断地发送请求给服务端来请求当前的状态,这种方式一来耗费了大量CPU时间,二来带来了不必要的流量损耗。对于服务端来说多出了一系列的无用的请求,给服务器带来了不必要的负担。最后的结果还是没办法做到真正的实时,当然这里可以做些优化,在某次拉取数据发现状态没有改变的时候,增加下一次请求的时间,直到达到某个阈值后再减小两次请求的间隔时间。还可以使用长轮询:客户端发送一个超时时间很长的请求,服务端长时间持有这个请求,在服务端状态改变的时候通过这个请求返回,长轮询这种方式虽然省去了大量无效请求,减少了服务器压力和一定的网络带宽的占用,但是还是需要保持大量的连接。

但是这里还需要明确一个问题:Http长连接和WebSocket长连接的区别

要弄清楚这个问题必须要明确三个概念HTTP keep-alive,Comet,WebSocket

在之前的HTTP 1.0 之前默认使用的是短连接,短连接的特点是客户端和服务端每进行一次通信就会建立一次连接,这里的连接指的是传输层的TCP连接,也就是说每次请求都会建立一次TCP连接,然后发送请求,等待收到服务端返回的请求后,断开这个连接。整个过程是由客户端驱动的,服务端只能响应某个具体的请求,不能主动和客户端建立连接发送数据。

HTTP 1.1之后就默认使用长连接,它会在响应头加入如下属性:

Connection:keep-alive;

那么这种长连接是不是就可以实现客户端和服务端全双工通信呢?其实不能的,因为这里的长连接还是传输层的长连接并不是应用层面的长连接,它相对于HTTP 1.0 来说,每次请求客户端收到来自服务端的返回体之后,底层的TCP连接不会立马断掉,如果后续有 HTTP 请求还是会复用这个底层的TCP连接。但是应用层面上还是遵循了HTTP协议规定的一个Request,一个Response的过程。一个请求获得一个响应后应用层必定会断掉。而且只有客户端发起的请求才会有响应,也就是说整个过程还是客户端来驱动的,客户端索要数据,服务端响应请求返回数据。即使采用keep-alive技术来保证TCP连接不会断,如果服务端也无法主动给客户端发起一个HTTP请求。

Comet是指客户端发送一个HTTP请求,但是服务端不会立刻返回,而是一直持有着这个请求直到有客户端需要的内容后再返回,在这个期间这个 HTTP请求可以连接维持比较长的时间。类似一种服务端推送机制:客户端发起请求相当于先把连接建立好,等服务端有消息需要返回时再返回给客户端,但是本质还是不变的,服务器的每次数据下发都是针对客户端先前发起的某次请求,服务端不能无缘无故地向客户端下发数据。

WebSocketHTTP两个协议在协议堆栈里面其实是两个对等的协议,都是位于应用层,是两个完全不同的概念,WebSocket可以不用像HTTP协议那样,要先有请求后服务端才能向客户端返回数据,它的长链接是应用层上的长连接,而不再单单是传输层上TCP上的长连接,但是它和HTTP协议都是基于TCP基础之上的。在WebSocket连接建立后,服务端可以主动地向客户端发送数据,从而实现全双工通讯。

所以什么是WebSocket,为什么需要有WebSocket? WebSocket 实际上是一个与Http协议一样都是基于TCP协议之上的应用层协议,它和Http协议的区别在于它不再遵循客户端主动发起请求,服务端响应的Request-Response 机制,而是可以在客户端没有发送请求的情况下,服务端主动下发数据给客户端。实现客户端和服务端的全双工通讯,也就是说WebSocket是为了弥补Http不能全双工数据通信的不足而推出的,连接一旦建立,双方可以随时向对方发送数据。

WebSocket 协议简要介绍

WebSocket的默认端口是80和443,分别对应的协议为ws://,wss://,整个协议包含三大部分:
建立连接握手数据通信

建立连接

在客户端与服务端进行握手之前,客户端和服务端需要建立好连接:

  • 一个客户端对于相同的目标地址(这里简单理解成一个服务器IP),同一时刻只能有一个处于CONNECTING状态的连接。如果当前已有指向目标地址的连接处于CONNECTING状态,就要等待这个连接成功,或者建立失败之后才能建立新的连接。

  • 如果客户端处于一个代理环境中,它首先要请求它的代理来建立一个到达目标地址的TCP连接。例如,如果客户端处于代理环境中,它想要连接某目标地址的80端口,它可能要首先发送以下消息:

CONNECT example.com:80 HTTP/1.1
Host: example.com
  • 如果客户端没有处于代理环境中,那么就需要建立一个到达目标地址的直接的TCP连接

握手

建立好客户端和服务端的连接后,客户端就可以向服务端发起握手请求了。握手请求消息的方法必须是GET,HTTP版本必须大于1.1 。

GET /chat HTTP/1.1
Host: server.example.com
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==
Origin: http://example.com
Sec-WebSocket-Protocol: chat, superchat
Sec-WebSocket-Version: 13

出于兼容性的考虑,WS的建立连接还是依赖HTTP来实现,这种方式的好处是:握手时不容易被屏蔽,能通过各种 HTTP 代理服务器。

下面针对上述给出的包一一进行介绍:

  • ****Upgrade ****
Upgrade是HTTP 1.1中用于定义转换协议的header域,它表示如果服务器支持的话,客户端希望从已建立好的连接协议,切换到另外一个应用层协议,这里是从HTTP协议切换到WebSocket协议。
  • **** Connection ****
HTTP 1.1中规定Upgrade只能应用在直接连接中,也就是说WS的连接不能通过中间人来转发,它必须是一个直接连接。如果客户端和服务器之间是通过代理连接的,那么在发送这个握手消息之前首先要发送CONNECT消息来建立直接连接。

  • **** Sec-WebSocket-Key ****
请求消息中的"Sec-WebSocket-Key"是一个Base64编码的16位随机字符,服务器端会用这些数据来构造出一个SHA-1的信息摘要,把"Sec-WebSocket-Key"加上一个魔幻字符串"258EAFA5-E914-47DA-95CA-C5AB0DC85B11"。使用SHA-1加密,然后进行BASE-64编码,将结果做为"Sec-WebSocket-Accept"头的值,返回给客户端。
  • **** Origin ****
用于防止跨站攻击,客户端会使用这个来标识原始域。
  • Sec-WebSocket-Protocol
客户端支持的子协议的列表,这个字段用于协商应用子协议,在创建 WebSocket 对象的时候,可以传递一个可选的子协议数组,告诉服务器,客户端可以理解哪些协议。服务器必须从数据里面选择几个支持的协议进行返回,如果一个都不支持,那么会直接导致握手失败。客户端可以不发送子协议,但是一旦发送,服务器无法支持其中任意一个都会导致握手失败。

  • Sec-WebSocket-Version
客户端支持的WS协议的版本列表,客户端可以初始请求它选择的 WebSocket 协议版本,如果服务器支持请求的版本服务器将接受该版本。如果服务器不支持请求的版本,它会返回一个包含所有它所能支持的Sec-WebSocket-Version头字段。 在这时候客户端可以根据服务端返回的服务端支持的版本,重新构建请求发起握手。
下面是一个服务端返回的响应头信息:

HTTP/1.1 400 Bad Request
Sec-WebSocket-Version: 13, 8

在收到请求后服务端作为回应会返回如下的报文:

HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo=
Sec-WebSocket-Protocol: chat

101表示服务器收到了客户端切换协议的请求,并且同意切换到此协议,Sec-WebSocket-Accept的生成方式在上面已经介绍过了,通过这个步骤,客户端和服务端会建立起一个长连接。

客户端收到服务端发送过来的应答数据后,如果返回的状态码为101,则可以开始解析header域:

* 判断是否含有Upgrade头,且内容包含websocket。
* 判断是否含有Connection头,且内容包含Upgrade
* 判断是否含有Sec-WebSocket-Accept头,并对这个字段进行校验。
* 如果含有Sec-WebSocket-Extensions头,要判断是否之前的Request握手带有此内容,如果没有,则连接失败。
* 如果含有Sec-WebSocket-Protocol头,要判断是否之前的Request握手带有此协议,如果没有,则连接失败。

上面最关键的就是Sec-WebSocket-Accept字段的校验。

WebSocket数据通信

WebSocket 会把应用的消息分割成一个或多个帧,接收方接到到多个帧会进行组装,等到接收到完整消息之后再通知接收端。

0                   1                   2                   3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-------+-+-------------+-------------------------------+
|F|R|R|R| opcode|M| Payload len | Extended payload length |
|I|S|S|S| (4) |A| (7) | (16/64) |
|N|V|V|V| |S| | (if payload len==126/127) |
| |1|2|3| |K| | |
+-+-+-+-+-------+-+-------------+ - - - - - - - - - - - - - - - +
| Extended payload length continued, if payload len == 127 |
+ - - - - - - - - - - - - - - - +-------------------------------+
| |Masking-key, if MASK set to 1 |
+-------------------------------+-------------------------------+
| Masking-key (continued) | Payload Data |
+-------------------------------- - - - - - - - - - - - - - - - +
: Payload Data continued ... :
+ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +
| Payload Data continued ... |
+---------------------------------------------------------------+
FIN      1bit 表示信息的最后一帧,flag,也就是标记符
RSV 1-3 1bit each 以后备用的 默认都为 0
Opcode 4bit 帧类型,稍后细说
Mask 1bit 掩码,是否加密数据,只适用于客户端发给服务器的消息,客户端给服务器发送消息,这里一定为 1
Payload 7bit 数据的长度
Masking-key 1 or 4 bit 掩码Key
Payload data (x + y) bytes 数据
Extension data x bytes 扩展数据
Application data y bytes 程序数据

Opcode 字段代表的意思如下所示:

%x0:表示一个延续帧。当Opcode为0时,表示本次数据传输采用了数据分片,当前收到的数据帧为其中一个数据分片。
%x1:表示这是一个文本帧(frame)
%x2:表示这是一个二进制帧(frame)
%x3-7:保留的操作代码,用于后续定义的非控制帧。
%x8:表示连接断开。
%x9:表示这是一个ping操作。
%xA:表示这是一个pong操作。
%xB-F:保留的操作代码,用于后续定义的控制帧。

WebSocket分帧规则

分帧是通过将消息分割为更小的一个个分段以更好地共享输出通道。

RFC 6455 规定的分帧规则如下:

1. 一个没有分片的消息由一个带有FIN值为1以及一个非0操作码的帧组成。
2. 一个分片的消息由单个带有FIN为0 和一个非0操作码的帧组成,跟随零个或多个带有FIN位为0和操作码设置为0的帧,且终止于一个带有FIN等于1且操作码为0的帧。
3. 消息分片必须按发送者发送顺序交付给接收者,片段中的消息不能与片段中的另一个消息交替。
4. 控制帧本身必须不被分割,中间件必须不尝试改变控制帧的分片。
5. 一个端点必须能处理一个分片消息中间的控制帧。

也就是说一个分帧后的数据可能会以如下形式呈现:

开始帧 :单个帧,FIN 设为 0,opcode 非 0;
中间帧 :0 个或多个帧,FIN 设为 0,opcode 设为 0;
结束帧:单个帧,FIN 设为 1,opcode 设为 0的帧。

其中开始帧和结束帧可以带数据也可以不带数据

WebSocket采用排队的机制,希望发送出去的数据会先丢到数据缓存区中,然后按照排队的顺序进行发送。

SRWebSocket 源码解析

上层使用方法

SRWebSocket 整个代码量其实不多,就两个文件,在使用上也十分方便,几行代码就可以完成SRWebSocket的接入:

- (void)connectWebSocketServer:(NSString *)server port:(NSString *)port {
NSURLRequest *request = [NSURLRequest requestWithURL:[NSURL URLWithString:[NSString stringWithFormat:@"ws://%@:%@",server,port]]];
_socket = [[SRWebSocket alloc] initWithURLRequest:request];
_socket.delegate = self;
[_socket open];
}

- (void)webSocket:(SRWebSocket *)webSocket didReceiveMessage:(id)message {

}

- (void)webSocketDidOpen:(SRWebSocket *)webSocket{

}

- (void)webSocket:(SRWebSocket *)webSocket didFailWithError:(NSError *)error {

}

- (void)webSocket:(SRWebSocket *)webSocket didCloseWithCode:(NSInteger)code reason:(NSString *)reason wasClean:(BOOL)wasClean {

}

有了上面WebSocket原理的介绍对整个代码的理解会轻松很多,接下来我们就逐步对SRWebSocket进行解析:

1. SRWebSocket初始化

- (id)initWithURLRequest:(NSURLRequest *)request
protocols:(NSArray *)protocols
allowsUntrustedSSLCertificates:(BOOL)allowsUntrustedSSLCertificates {

if (self = [super init]) {
assert(request.URL);
_url = request.URL;
_urlRequest = request;
_allowsUntrustedSSLCertificates = allowsUntrustedSSLCertificates;
_requestedProtocols = [protocols copy];
[self _SR_commonInit];
}
return self;
}

initWithURLRequest最关键的部分在于对****_SR_commonInit****方法的调用,其中_allowsUntrustedSSLCertificates表示是否允许未经信任的SSL证书,_requestedProtocols就是上面提到的WebSocket子协议。

- (void)_SR_commonInit {

//校验URL的scheme,必须是ws,http,wss,https
NSString *scheme = _url.scheme.lowercaseString;
assert([scheme isEqualToString:@"ws"] || [scheme isEqualToString:@"http"] || [scheme isEqualToString:@"wss"] || [scheme isEqualToString:@"https"]);

//是否是安全的请求
if ([scheme isEqualToString:@"wss"] || [scheme isEqualToString:@"https"]) {
_secure = YES;
}

// 初始化状态为SR_CONNECTING
_readyState = SR_CONNECTING;
_consumerStopped = YES;
//webSocket 版本
_webSocketVersion = 13;

//初始化串行工作队列
_workQueue = dispatch_queue_create(NULL, DISPATCH_QUEUE_SERIAL);
dispatch_queue_set_specific(_workQueue, (__bridge void *)self, maybe_bridge(_workQueue), NULL);

//设置代理queue为主队列
_delegateDispatchQueue = dispatch_get_main_queue();
sr_dispatch_retain(_delegateDispatchQueue);

//读写缓存
_readBuffer = [[NSMutableData alloc] init];
_outputBuffer = [[NSMutableData alloc] init];
//当前数据帧
_currentFrameData = [[NSMutableData alloc] init];

//消费者队列
_consumers = [[NSMutableArray alloc] init];

_consumerPool = [[SRIOConsumerPool alloc] init];

//RunLoopes
_scheduledRunloops = [[NSMutableSet alloc] init];
[self _initializeStreams];
}

_SR_commonInit 在最开始的时候对当前协议进行了校验,如果不是ws,http,wss,https,就会报错。而后就是WebSocket状态的设置,以及WebSocket版本的设置。紧接着初始化工作队列_workQueue,WebSocket工作在串行队列中,后续在介绍到数据收发的时候会介绍到这个队列。至于为什么是串行队列,通过上面的理论介绍估计大家心里已经有答案了,但是不急后面会给大家详细介绍_workQueue在数据收发过程中的使用。再接着就是_delegateDispatchQueue的初始化,它是一个运行在主线程上的一个队列。再接着就是读写缓存的初始化,以及消费者数组,对应Runloop 的初始化。

- (void)_initializeStreams {
//断言 port值小于UINT32_MAX
assert(_url.port.unsignedIntValue <= UINT32_MAX);
uint32_t port = _url.port.unsignedIntValue;
//如果没有指定port值则使用webSocket默认的端口值
if (port == 0) {
if (!_secure) {
port = 80;
} else {
port = 443;
}
}
NSString *host = _url.host;

CFReadStreamRef readStream = NULL;
CFWriteStreamRef writeStream = NULL;

//CFStreamCreatePairWithSocketToHost接口用于创建一对Socket stream,一个用于读取,一个用于写入
CFStreamCreatePairWithSocketToHost(NULL, (__bridge CFStringRef)host, port, &readStream, &writeStream);

_outputStream = CFBridgingRelease(writeStream);
_inputStream = CFBridgingRelease(readStream);

//代理设为自己
_inputStream.delegate = self;
_outputStream.delegate = self;
}

_initializeStreams 最主要的是通过CFStreamCreatePairWithSocketToHost接口创建一对Socket stream,一个用于读取(readStream),一个用于写入(writeStream)并为读写stream。设置对应的代理。

到此为止整个初始化过程结束,最主要的工作如下:

  1. 串行工作队列_workQueue的创建
  2. 代理分发队列_delegateDispatchQueue的创建
  3. 输入输出缓存 _readBuffer,_outputBuffer的创建
  4. 消费者数组_consumers的创建
  5. WebSocket 相关的runloop的创建
  6. 输入输出流的创建,以及对应代理NSStreamDelegate的设置

NSStreamDelegate定义如下所示:

````
@protocol NSStreamDelegate
@optional

  • (void)stream:(NSStream *)aStream handleEvent:(NSStreamEvent)eventCode;
    @end

typedef NS_OPTIONS(NSUInteger, NSStreamEvent) {
NSStreamEventNone = 0,
NSStreamEventOpenCompleted = 1UL << 0,
NSStreamEventHasBytesAvailable = 1UL << 1,
NSStreamEventHasSpaceAvailable = 1UL << 2,
NSStreamEventErrorOccurred = 1UL << 3,
NSStreamEventEndEncountered = 1UL << 4
};


****2. 开启连接****

  • (void)open {

    assert(_url);
    NSAssert(_readyState == SR_CONNECTING, @”Cannot call -(void)open on SRWebSocket more than once”);

    //如果有指定超时时间则延迟指定超时时间后查看状态是否还是SR_CONNECTING 如果是则返回超时消息
    _selfRetain = self;
    if (_urlRequest.timeoutInterval > 0) {
    dispatch_time_t popTime = dispatch_time(DISPATCH_TIME_NOW, _urlRequest.timeoutInterval * NSEC_PER_SEC);
    dispatch_after(popTime, dispatch_get_main_queue(), ^(void){
    if (self.readyState == SR_CONNECTING)
    [self _failWithError:[NSError errorWithDomain:@”com.squareup.SocketRocket” code:504 userInfo:@{NSLocalizedDescriptionKey: @”Timeout Connecting to Server”}]];
    });
    }

    //执行连接任务
    [self openConnection];

}


  • (void)openConnection {

    //更新安全、流配置
    [self _updateSecureStreamOptions];

    //判断有没有runloop,如果没有的话就新建对应的runloop
    if (!_scheduledRunloops.count) {
    [self scheduleInRunLoop:[NSRunLoop SR_networkRunLoop] forMode:NSDefaultRunLoopMode];
    }

    //开启输入输出流
    [_outputStream open];
    [_inputStream open];

}


这里最关键有三个部分:

1. ****_updateSecureStreamOptions****

如果是****wss://****类型的WebSocket
  • (void)_updateSecureStreamOptions {
    if (_secure) {
    NSMutableDictionary *SSLOptions = [[NSMutableDictionary alloc] init];
    [_outputStream setProperty:(__bridge id)kCFStreamSocketSecurityLevelNegotiatedSSL
    forKey:(__bridge id)kCFStreamPropertySocketSecurityLevel];
    // 如果我们正在使用pinned certs 我们将不对证书链进行验证
    if ([_urlRequest SR_SSLPinnedCertificates].count) {
    [SSLOptions setValue:@NO forKey:(__bridge id)kCFStreamSSLValidatesCertificateChain];
    }
    #if DEBUG
    //如果是debug模式则允许非信任的证书
    self.allowsUntrustedSSLCertificates = YES;
    #endif
    //如果允许非信任的证书则将配置设置到SSLOptions
    if (self.allowsUntrustedSSLCertificates) {
    [SSLOptions setValue:@NO forKey:(__bridge id)kCFStreamSSLValidatesCertificateChain];
    SRFastLog(@”Allowing connection to any root cert”);
    }
    //将SSLOptions设置为输出流设置属性
    [_outputStream setProperty:SSLOptions
    forKey:(__bridge id)kCFStreamPropertySSLSettings];
    }
    //重新设置输入流和输出流的delegate
    _inputStream.delegate = self;
    _outputStream.delegate = self;
    //为输入流和输出流设置networkServiceType
    [self setupNetworkServiceType:_urlRequest.networkServiceType];
    }
_updateSecureStreamOptions 只针对安全流才需要设置的,在这里会根据SR_SSLPinnedCertificates是否存在,以及allowsUntrustedSSLCertificates值设置_outputStream的SSL设置。然后重新设置_inputStream以及_outputStream的delegate。

  • (void)setupNetworkServiceType:(NSURLRequestNetworkServiceType)requestNetworkServiceType {
    NSString *networkServiceType;
    switch (requestNetworkServiceType) {
    case NSURLNetworkServiceTypeDefault:
    break;
    case NSURLNetworkServiceTypeVoIP: {
    networkServiceType = NSStreamNetworkServiceTypeVoIP;
    #if TARGET_OS_IPHONE && __IPHONE_9_0
    if (floor(NSFoundationVersionNumber) > NSFoundationVersionNumber_iOS_8_3) {
    static dispatch_once_t predicate;
    dispatch_once(&predicate, ^{
    NSLog(@”SocketRocket: %@ - this service type is deprecated in favor of using PushKit for VoIP control”, networkServiceType);
    });
    }
    #endif
    break;
    }
    case NSURLNetworkServiceTypeVideo:
    networkServiceType = NSStreamNetworkServiceTypeVideo;
    break;
    case NSURLNetworkServiceTypeBackground:
    networkServiceType = NSStreamNetworkServiceTypeBackground;
    break;
    case NSURLNetworkServiceTypeVoice:
    networkServiceType = NSStreamNetworkServiceTypeVoice;
    break;
    }
    if (networkServiceType != nil) {
    [_inputStream setProperty:networkServiceType forKey:NSStreamNetworkServiceType];
    [_outputStream setProperty:networkServiceType forKey:NSStreamNetworkServiceType];
    }
    }

    然后再设置_inputStream 以及 _outputStream 的 NSStreamNetworkServiceType属性。

    2. ****_scheduledRunloops****

  • (NSRunLoop *)SR_networkRunLoop {
    static dispatch_once_t onceToken;
    dispatch_once(&onceToken, ^{
    networkThread = [[_SRRunLoopThread alloc] init];
    networkThread.name = @”com.squareup.SocketRocket.NetworkThread”;
    [networkThread start];
    networkRunLoop = networkThread.runLoop;
    });
    return networkRunLoop;
    }

    在SR_networkRunLoop方法中会新创建一个NSThread的线程,然后持有这个线程的runLoop作为networkRunLoop,由于这个线程是通过单例创建的所以,在整个应用生命周期都会存在。提到networkThread不得不提_workQueue,networkThread主要用于支持读写的工作线程,_workQueue是用于处理控制任务的队列,两者单独分开,各司其职。这样做的目的主要是为了避免数据的读写任务阻塞了控制任务,从而影响了事件的实时性。

    3. ****_outputStream/_inputStream open****

    调用_outputStream/_inputStream open之后端口就会被打开,这时候输入输出流就会有数据通过代理传递进来。


    ****3. 通过代理处理数据****

  • (void)stream:(NSStream *)aStream handleEvent:(NSStreamEvent)eventCode {
    __weak typeof(self) weakSelf = self;

    //如果协议是wss://类型而且_pinnedCertFound 为NO,并且事件类型是有可读数据未读,或者事件类型是还有空余空间可写
    if (_secure && !_pinnedCertFound && (eventCode == NSStreamEventHasBytesAvailable || eventCode == NSStreamEventHasSpaceAvailable)) {
    //获取到SSLPinnedCertificates
    NSArray *sslCerts = [_urlRequest SR_SSLPinnedCertificates];
    if (sslCerts) {
    将NSStream中的证书与服务端请求中的SSLPinnedCertificates一个一个进行比较,查看是否有找到匹配的。
    SecTrustRef secTrust = (__bridge SecTrustRef)[aStream propertyForKey:(__bridge id)kCFStreamPropertySSLPeerTrust];
    if (secTrust) {
    NSInteger numCerts = SecTrustGetCertificateCount(secTrust);
    for (NSInteger i = 0; i < numCerts && !_pinnedCertFound; i++) {
    SecCertificateRef cert = SecTrustGetCertificateAtIndex(secTrust, i);
    NSData *certData = CFBridgingRelease(SecCertificateCopyData(cert));

    for (id ref in sslCerts) {
    SecCertificateRef trustedCert = (__bridge SecCertificateRef)ref;
    NSData *trustedCertData = CFBridgingRelease(SecCertificateCopyData(trustedCert));

    if ([trustedCertData isEqualToData:certData]) {
    _pinnedCertFound = YES;
    break;
    }
    }
    }
    }
    if (!_pinnedCertFound) {
    //服务端证书无效
    dispatch_async(_workQueue, ^{
    NSDictionary *userInfo = @{ NSLocalizedDescriptionKey : @”Invalid server cert” };
    [weakSelf _failWithError:[NSError errorWithDomain:@”org.lolrus.SocketRocket” code:23556 userInfo:userInfo]];
    });
    return;
    } else if (aStream == _outputStream) {
    //如果流是输出流,则打开流成功
    dispatch_async(_workQueue, ^{
    [self didConnect];
    });
    }
    }
    }

    dispatch_async(_workQueue, ^{
    //将当前数据交给_workQueue处理
    [weakSelf safeHandleEvent:eventCode stream:aStream];
    });

}


  • (void)stream:(NSStream *)aStream handleEvent:(NSStreamEvent)eventCode

    是数据的主要集中地,对于未认证的请求,会先对服务端的证书进行认证,如果认证成功会调用didConnect进行握手,然后不论是什么类型的请求,都会进过safeHandleEvent将数据分发出去。接下来我们重点看下didConnect以及safeHandleEvent:

  • (void)didConnect {

    // 设置GET请求,并指定HTTP 协议版本为1.1
    CFHTTPMessageRef request = CFHTTPMessageCreateRequest(NULL, CFSTR(“GET”), (__bridge CFURLRef)_url, kCFHTTPVersion1_1);

    // 设置Host字段
    CFHTTPMessageSetHeaderFieldValue(request, CFSTR(“Host”), (__bridge CFStringRef)(_url.port ? [NSString stringWithFormat:@”%@:%@”, _url.host, _url.port] : _url.host));

    //生成随机数
    NSMutableData *keyBytes = [[NSMutableData alloc] initWithLength:16];
    SecRandomCopyBytes(kSecRandomDefault, keyBytes.length, keyBytes.mutableBytes);

    //Base64加密
    if ([keyBytes respondsToSelector:@selector(base64EncodedStringWithOptions:)]) {
    _secKey = [keyBytes base64EncodedStringWithOptions:0];
    } else {

#pragma clang diagnostic push
#pragma clang diagnostic ignored “-Wdeprecated-declarations”
_secKey = [keyBytes base64Encoding];
#pragma clang diagnostic pop
}

assert([_secKey length] == 24);

// Apply cookies if any have been provided
NSDictionary * cookies = [NSHTTPCookie requestHeaderFieldsWithCookies:[self requestCookies]];
for (NSString * cookieKey in cookies) {
    NSString * cookieValue = [cookies objectForKey:cookieKey];
    if ([cookieKey length] && [cookieValue length]) {
        CFHTTPMessageSetHeaderFieldValue(request, (__bridge CFStringRef)cookieKey, (__bridge CFStringRef)cookieValue);
    }
}

// 添加认证字段
if (_url.user.length && _url.password.length) {
    NSData *userAndPassword = [[NSString stringWithFormat:@"%@:%@", _url.user, _url.password] dataUsingEncoding:NSUTF8StringEncoding];
    NSString *userAndPasswordBase64Encoded;
    if ([keyBytes respondsToSelector:@selector(base64EncodedStringWithOptions:)]) {
        userAndPasswordBase64Encoded = [userAndPassword base64EncodedStringWithOptions:0];
    } else {

#pragma clang diagnostic push
#pragma clang diagnostic ignored “-Wdeprecated-declarations”
userAndPasswordBase64Encoded = [userAndPassword base64Encoding];
#pragma clang diagnostic pop
}
_basicAuthorizationString = [NSString stringWithFormat:@”Basic %@”, userAndPasswordBase64Encoded];
CFHTTPMessageSetHeaderFieldValue(request, CFSTR(“Authorization”), (__bridge CFStringRef)_basicAuthorizationString);
}

CFHTTPMessageSetHeaderFieldValue(request, CFSTR("Upgrade"), CFSTR("websocket"));
CFHTTPMessageSetHeaderFieldValue(request, CFSTR("Connection"), CFSTR("Upgrade"));
CFHTTPMessageSetHeaderFieldValue(request, CFSTR("Sec-WebSocket-Key"), (__bridge CFStringRef)_secKey);
CFHTTPMessageSetHeaderFieldValue(request, CFSTR("Sec-WebSocket-Version"), (__bridge CFStringRef)[NSString stringWithFormat:@"%ld", (long)_webSocketVersion]);
CFHTTPMessageSetHeaderFieldValue(request, CFSTR("Origin"), (__bridge CFStringRef)_url.SR_origin);

//追加子协议
if (_requestedProtocols) {
    CFHTTPMessageSetHeaderFieldValue(request, CFSTR("Sec-WebSocket-Protocol"), (__bridge CFStringRef)[_requestedProtocols componentsJoinedByString:@", "]);
}

//添加http头
[_urlRequest.allHTTPHeaderFields enumerateKeysAndObjectsUsingBlock:^(id key, id obj, BOOL *stop) {
    CFHTTPMessageSetHeaderFieldValue(request, (__bridge CFStringRef)key, (__bridge CFStringRef)obj);
}];

NSData *message = CFBridgingRelease(CFHTTPMessageCopySerializedMessage(request));

CFRelease(request);

//将数据送出
[self _writeData:message];
//指派数据消费者等待响应头部准备就绪后读取HttpHeader
[self _readHTTPHeader];

}


上述的****didConnect****其实就是之前介绍的WebSocket握手阶段。它会构建出一个Http 1.1 的请求头,然后发送出去,并分配数据消费者,等待消费服务端返回的应答头部。

  • (void)_readHTTPHeader {
    if (_receivedHTTPHeaders == NULL) {
    _receivedHTTPHeaders = CFHTTPMessageCreateEmpty(NULL, NO);
    }
    //等待头数据读取完毕后执行下面的block
    [self _readUntilHeaderCompleteWithCallback:^(SRWebSocket *self, NSData *data) {
    //将获取到的数据追加到_receivedHTTPHeaders
    CFHTTPMessageAppendBytes(_receivedHTTPHeaders, (const UInt8 *)data.bytes, data.length);
    //判断请求头响应部分是否已经读取完毕
    if (CFHTTPMessageIsHeaderComplete(_receivedHTTPHeaders)) {
    // 如果读取完毕则调用_HTTPHeadersDidFinish
    [self _HTTPHeadersDidFinish];
    } else {
    // 否则继续读返回的头部数据
    [self _readHTTPHeader];
    }
    }];
    }


  • (void)_HTTPHeadersDidFinish {

    //获取返回的状态码
    NSInteger responseCode = CFHTTPMessageGetResponseStatusCode(_receivedHTTPHeaders);

    //如果大于400 表示握手失败
    if (responseCode >= 400) {
    SRFastLog(@”Request failed with response code %d”, responseCode);
    [self _failWithError:[NSError errorWithDomain:SRWebSocketErrorDomain code:2132 userInfo:@{NSLocalizedDescriptionKey:[NSString stringWithFormat:@”received bad response code from server %ld”, (long)responseCode], SRHTTPResponseErrorKey:@(responseCode)}]];
    return;
    }

    //检查服务器返回的Sec-WebSocket-Accept字段是否正确
    if(![self _checkHandshake:_receivedHTTPHeaders]) {
    [self _failWithError:[NSError errorWithDomain:SRWebSocketErrorDomain code:2133 userInfo:[NSDictionary dictionaryWithObject:[NSString stringWithFormat:@”Invalid Sec-WebSocket-Accept response”] forKey:NSLocalizedDescriptionKey]]];
    return;
    }

    //对子协议进行校验
    NSString *negotiatedProtocol = CFBridgingRelease(CFHTTPMessageCopyHeaderFieldValue(_receivedHTTPHeaders, CFSTR(“Sec-WebSocket-Protocol”)));
    if (negotiatedProtocol) {
    // Make sure we requested the protocol
    if ([_requestedProtocols indexOfObject:negotiatedProtocol] == NSNotFound) {
    [self _failWithError:[NSError errorWithDomain:SRWebSocketErrorDomain code:2133 userInfo:[NSDictionary dictionaryWithObject:[NSString stringWithFormat:@”Server specified Sec-WebSocket-Protocol that wasn’t requested”] forKey:NSLocalizedDescriptionKey]]];
    return;
    }

    _protocol = negotiatedProtocol;
    }

    //上述校验结束后代表整个WebSocket已经打开了
    self.readyState = SR_OPEN;

    //如果没有错误发生则读取新的帧
    if (!_didFail) {
    [self _readFrameNew];
    }

    //通过代理通知业务层WebSocket已经打开了
    [self _performDelegateBlock:^{
    if ([self.delegate respondsToSelector:@selector(webSocketDidOpen:)]) {
    [self.delegate webSocketDidOpen:self];
    };
    }];

}


我们接下来看下每次数据进来是怎么进行交付的。我们再回到:

  • (void)stream:(NSStream *)aStream handleEvent:(NSStreamEvent)eventCode
    除了上述的认证过程外,还有个十分重要的方法safeHandleEvent:stream,每个数据进来都会通过这个方法进行处理:

  • (void)safeHandleEvent:(NSStreamEvent)eventCode stream:(NSStream *)aStream {
    switch (eventCode) {
    case NSStreamEventOpenCompleted: {
    SRFastLog(@”NSStreamEventOpenCompleted %@”, aStream);
    if (self.readyState >= SR_CLOSING) {
    return;
    }
    assert(_readBuffer);

    // didConnect fires after certificate verification if we’re using pinned certificates.
    BOOL usingPinnedCerts = [[_urlRequest SR_SSLPinnedCertificates] count] > 0;
    if ((!_secure || !usingPinnedCerts) && self.readyState == SR_CONNECTING && aStream == _inputStream) {
    [self didConnect];
    }
    [self _pumpWriting];
    [self _pumpScanner];
    break;
    }

    case NSStreamEventErrorOccurred: {
    SRFastLog(@”NSStreamEventErrorOccurred %@ %@”, aStream, [[aStream streamError] copy]);
    /// TODO specify error better!
    [self _failWithError:aStream.streamError];
    _readBufferOffset = 0;
    [_readBuffer setLength:0];
    break;

    }

    case NSStreamEventEndEncountered: {
    [self _pumpScanner];
    SRFastLog(@”NSStreamEventEndEncountered %@”, aStream);
    if (aStream.streamError) {
    [self _failWithError:aStream.streamError];
    } else {
    dispatch_async(_workQueue, ^{
    if (self.readyState != SR_CLOSED) {
    self.readyState = SR_CLOSED;
    [self _scheduleCleanup];
    }

    if (!_sentClose && !_failed) {
    _sentClose = YES;
    // If we get closed in this state it’s probably not clean because we should be sending this when we send messages
    [self _performDelegateBlock:^{
    if ([self.delegate respondsToSelector:@selector(webSocket:didCloseWithCode:reason:wasClean:)]) {
    [self.delegate webSocket:self didCloseWithCode:SRStatusCodeGoingAway reason:@”Stream end encountered” wasClean:NO];
    }
    }];
    }
    });
    }

    break;
    }

    case NSStreamEventHasBytesAvailable: {
    SRFastLog(@”NSStreamEventHasBytesAvailable %@”, aStream);
    const int bufferSize = 2048;
    uint8_t buffer[bufferSize];

    while (_inputStream.hasBytesAvailable) {
    NSInteger bytes_read = [_inputStream read:buffer maxLength:bufferSize];

    if (bytes_read > 0) {
    [_readBuffer appendBytes:buffer length:bytes_read];
    } else if (bytes_read < 0) {
    [self _failWithError:_inputStream.streamError];
    }

    if (bytes_read != bufferSize) {
    break;
    }
    };
    [self _pumpScanner];
    break;
    }

    case NSStreamEventHasSpaceAvailable: {
    SRFastLog(@”NSStreamEventHasSpaceAvailable %@”, aStream);
    [self _pumpWriting];
    break;
    }

    default:
    SRFastLog(@”(default) %@”, aStream);
    break;
    }

}


这里有个十分关键的类型:****NSStreamEvent****用于表示当前消息的类型:

typedef NS_OPTIONS(NSUInteger, NSStreamEvent) {
NSStreamEventNone = 0,
NSStreamEventOpenCompleted = 1UL << 0, //打开完成
NSStreamEventHasBytesAvailable = 1UL << 1, //流中有数据可读
NSStreamEventHasSpaceAvailable = 1UL << 2, //缓存中有空间可写
NSStreamEventErrorOccurred = 1UL << 3, //遇到错误
NSStreamEventEndEncountered = 1UL << 4 //遇到结束符
};


我们这里先看下 ****NSStreamEventOpenCompleted****,后面介绍读写的时候还会接着介绍****NSStreamEventHasBytesAvailable********NSStreamEventHasSpaceAvailable****这两个事件。

//连接完成
case NSStreamEventOpenCompleted: {
SRFastLog(@”NSStreamEventOpenCompleted %@”, aStream);
//如果就绪状态为关闭或者正在关闭,直接返回
if (self.readyState >= SR_CLOSING) {
return;
}
assert(_readBuffer);

//如果是ws,或者无自签证书,而且是正准备连接,而且aStream是输入流
// didConnect fires after certificate verification if we're using pinned certificates.
BOOL usingPinnedCerts = [[_urlRequest SR_SSLPinnedCertificates] count] > 0;
if ((!_secure || !usingPinnedCerts) && self.readyState == SR_CONNECTING && aStream == _inputStream) {
    //进行握手连接
    [self didConnect];
}
//开始写数据
[self _pumpWriting];
//读取应答数据
[self _pumpScanner];
break;

}

之前介绍的是wss协议类型的情况,这里则是ws或者无自签名证书的情形,主要流程还是类似的。


上面介绍了从初始化,到建立连接,再到握手的整个流程,在介绍WebSocket读写数据之前,我们再从头梳理下这个阶段的关键过程:

  1. 最开始我们会对协议类型进行校验,判断是否是ws,http,wss,https这些类型的一种
  2. 设置当前状态为SR_CONNECTING
  3. 创建_workQueue,networkThread以及_delegateDispatchQueue
  4. 初始化读写缓存_readBuffer,_outputBuffer,消费者数组_consumers以及消费对象池_consumerPool。
  5. 通过CFStreamCreatePairWithSocketToHost创建读写数据流readStream,writeStream。并设置读写流代理,指定对应的Runloop,如果是wss协议,还要更新流的安全属性, 上面设置完毕后调用输入输出流的open方法打开流。
  6. 流打开后就会有数据从代理进来,对于wss类型的协议,如果证书还没找到,那么会进行证书的匹配,如果匹配成功则调用didConnect,在didConnect中发送握手消息头到服务端,进行握手通信。
  7. 通过数据消费者从缓存中读取应答头数据,这时候分别对服务端返回的头部的状态码,Sec-WebSocket-Accept,子协议进行校验,如果校验通过则将状态设置为SR_OPEN并通过代理webSocketDidOpen通知业务层websocket已经成功连接。

****3. 数据读写****

最后剩下一个问题,数据是怎样从输入流读入,怎样从输出流发出,数据消费者在整个过程中扮演的角色,workQueue,networkThread在整个过程扮演的角色。

我们来看下****NSStreamEventHasBytesAvailable****用于表示可以从缓存中读取数据了:

case NSStreamEventHasBytesAvailable: {
SRFastLog(@”NSStreamEventHasBytesAvailable %@”, aStream);
const int bufferSize = 2048;
uint8_t buffer[bufferSize];

while (_inputStream.hasBytesAvailable) {
    //从输入流中读取数据到缓存中
    NSInteger bytes_read = [_inputStream read:buffer maxLength:bufferSize];
    if (bytes_read > 0) {
        //将数据追加到读缓存中
        [_readBuffer appendBytes:buffer length:bytes_read];
    } else if (bytes_read < 0) {
        [self _failWithError:_inputStream.streamError];
    }
    //如果读取的不等于最大的,说明读完了,跳出循环
    if (bytes_read != bufferSize) {
        break;
    }
};
//开始扫描,看消费者什么时候消费数据
[self _pumpScanner];
break;

}


-(void)_pumpScanner {
[self assertOnWorkQueue];
//判断是否在扫描
if (!_isPumping) {
_isPumping = YES;
} else {
return;
}
while ([self _innerPumpScanner]) {}
_isPumping = NO;
}


  • (BOOL)_innerPumpScanner {

    BOOL didWork = NO;
    //如果当前状态为已关闭 返回NO
    if (self.readyState >= SR_CLOSED) {
    return didWork;
    }
    //如果数据消费者列表为空,返回NO
    if (!_consumers.count) {
    return didWork;
    }

    //读取的buffer长度 - 偏移量 = 可以被读取的数据帧的长度
    size_t curSize = _readBuffer.length - _readBufferOffset;
    //如果未读为空,返回NO
    if (!curSize) {
    return didWork;
    }

    //取出数据消费者队列中的第一个消费者
    SRIOConsumer *consumer = [_consumers objectAtIndex:0];
    //得到需要的字节数
    size_t bytesNeeded = consumer.bytesNeeded;
    //消费者本次找到的要读取的数据大小
    size_t foundSize = 0;

    // 确定foundSize 大小
    //consumer.consumer是指定了用于查找要读取数据的规则。
    if (consumer.consumer) {
    //把未读数据从readBuffer中赋值到tempView里,待consumer.consumer按照它的规则进行查找要读取的数据
    NSData *tempView = [NSData dataWithBytesNoCopy:(char *)_readBuffer.bytes + _readBufferOffset length:_readBuffer.length - _readBufferOffset freeWhenDone:NO];
    //得到要读取的数据大小
    foundSize = consumer.consumer(tempView);
    } else {
    //如果没有指定查找规则则按bytesNeeded来确定foundSize
    assert(consumer.bytesNeeded);
    //如果未读字节大于需要字节,直接等于需要字节
    if (curSize >= bytesNeeded) {
    foundSize = bytesNeeded;
    }
    //如果为读取当前帧
    else if (consumer.readToCurrentFrame) {
    //消费大小等于当前未读字节
    foundSize = curSize;
    }
    }

    //通过上面的foundSize得到需要读取的数据,并且释放已读的空间
    NSData *slice = nil;
    //如果读取当前帧或者foundSize大于0
    if (consumer.readToCurrentFrame || foundSize) {
    //从已读偏移到要读的字节处
    NSRange sliceRange = NSMakeRange(_readBufferOffset, foundSize);
    //得到data
    slice = [_readBuffer subdataWithRange:sliceRange];
    //增加已读偏移
    _readBufferOffset += foundSize;
    //如果读取偏移的大小大于4096,或者读取偏移大于 1/2的buffer大小
    if (_readBufferOffset > 4096 && _readBufferOffset > (_readBuffer.length >> 1)) {
    //重新创建,释放已读那部分的data空间
    _readBuffer = [[NSMutableData alloc] initWithBytes:(char *)_readBuffer.bytes + _readBufferOffset length:_readBuffer.length - _readBufferOffset]; _readBufferOffset = 0;
    }

    //如果用户未掩码的数据
    if (consumer.unmaskBytes) {
    //copy切片
    NSMutableData *mutableSlice = [slice mutableCopy];
    //得到长度字节数
    NSUInteger len = mutableSlice.length;
    uint8_t *bytes = mutableSlice.mutableBytes;

    for (NSUInteger i = 0; i < len; i++) {
    //得到一个读取掩码key,为偏移量_currentReadMaskOffset取余_currentReadMaskKey,当前掩码key,
    //再和字节异或运算(相同为0,不同为1)
    bytes[i] = bytes[i] ^ _currentReadMaskKey[_currentReadMaskOffset % sizeof(_currentReadMaskKey)];
    //偏移量+1
    _currentReadMaskOffset += 1;
    }
    //把数据改成掩码后的
    slice = mutableSlice;
    }

    //如果读取当前帧
    if (consumer.readToCurrentFrame) {
    //拼接数据
    [_currentFrameData appendData:slice];
    //+1
    _readOpCount += 1;
    //判断Opcode,如果是文本帧
    if (_currentFrameOpcode == SROpCodeTextFrame) {
    // Validate UTF8 stuff.
    //得到当前帧数据的长度
    size_t currentDataSize = _currentFrameData.length;
    //如果currentDataSize 大于0
    if (_currentFrameOpcode == SROpCodeTextFrame && currentDataSize > 0) {
    // TODO: Optimize the crap out of this. Don’t really have to copy all the data each time
    //判断需要scan的大小
    size_t scanSize = currentDataSize - _currentStringScanPosition;
    //得到要sacn的data
    NSData *scan_data = [_currentFrameData subdataWithRange:NSMakeRange(_currentStringScanPosition, scanSize)];
    //验证数据
    int32_t valid_utf8_size = validate_dispatch_data_partial_string(scan_data);

    //-1为错误,关闭连接
    if (valid_utf8_size == -1) {
    [self closeWithCode:SRStatusCodeInvalidUTF8 reason:@”Text frames must be valid UTF-8”];
    dispatch_async(_workQueue, ^{
    [self closeConnection];
    });
    return didWork;
    } else {
    //扫描的位置+上合法数据的size
    _currentStringScanPosition += valid_utf8_size;
    }
    }

    }
    //需要的size - 已操作的size
    consumer.bytesNeeded -= foundSize;
    //如果还需要的字节数 = 0,移除消费者
    if (consumer.bytesNeeded == 0) {
    [_consumers removeObjectAtIndex:0];
    //回调读取完成
    consumer.handler(self, nil);//由于读取当前帧的时候数据存放在_currentFrameData,所以这里返回为nil
    //把要返回的consumer,先放在_consumerPool缓冲池中
    [_consumerPool returnConsumer:consumer];
    //已经工作为YES
    didWork = YES;
    }
    } else if (foundSize) {
    //移除消费者
    [_consumers removeObjectAtIndex:0];
    //回调回去当前接受到的数据
    consumer.handler(self, slice);

    [_consumerPool returnConsumer:consumer];
    didWork = YES;
    }
    }
    return didWork;

}


上面是比较详细的代码注释,接下来我们抠出这段代码中关键的部分,让大家对这部分流程更加清晰:

  • (BOOL)_innerPumpScanner {

    //第一部分: 前置条件判断
    //…………….

    //取出数据消费者队列中的第一个消费者
    SRIOConsumer *consumer = [_consumers objectAtIndex:0];

    //第二部分:得到需要的字节数foundSize
    size_t bytesNeeded = consumer.bytesNeeded;
    //消费者本次找到的要读取的数据大小
    size_t foundSize = 0;
    // 确定foundSize 大小
    //consumer.consumer是指定了用于查找要读取数据的规则。
    if (consumer.consumer) {
    //指定了用于查找要读取数据的规则的情况
    } else {
    //读取当前帧或者读取部分数据的情况
    }

    //第三部分:通过上面的foundSize得到需要读取的数据,并且释放已读的空间
    //如果读取当前帧或者foundSize大于0
    if (consumer.readToCurrentFrame || foundSize) {
    //从已读偏移到要读的字节处
    NSRange sliceRange = NSMakeRange(_readBufferOffset, foundSize);
    //得到data
    slice = [_readBuffer subdataWithRange:sliceRange];
    //…………….

    //如果读取当前帧
    if (consumer.readToCurrentFrame) {
    //拼接数据
    [_currentFrameData appendData:slice];
    //…………..
    //当前帧已经读取完毕
    if (consumer.bytesNeeded == 0) {
    [_consumers removeObjectAtIndex:0];
    //回调读取完成
    consumer.handler(self, nil);//由于读取当前帧的时候数据存放在_currentFrameData,所以这里返回为nil
    //把要返回的consumer,先放在_consumerPool缓冲池中
    [_consumerPool returnConsumer:consumer];
    //已经工作为YES
    didWork = YES;
    }
    }
    //非当前帧的情况
    else if (foundSize) {
    //第四部分:将得到所需要数据的消费者从消费者列表中移除,并通过对应的block交付数据,将consumer放到对象池中。
    [_consumers removeObjectAtIndex:0];
    //回调回去当前接受到的数据
    consumer.handler(self, slice);
    [_consumerPool returnConsumer:consumer];
    didWork = YES;
    }
    }
    return didWork;

}


整个流程分成四个步骤:

第一部分:前置条件判断
第二部分:确定消费者需要的字节数foundSize
第三部分:通过上面的foundSize得到需要读取的数据,并且释放已读的空间
第四部分:将得到所需要数据的消费者从消费者列表中移除,并通过对应的block交付数据,将consumer放到对象池中。


再回到NSStreamEventHasBytesAvailable事件的处理中做个总结:

这个事件中,首先将数据从数据流中取出数据然后添加到读缓存中,然后从数据消费者队列中取出一个消费者,让它再读缓存中按需读取出它所需要的数据,并通过block交付给应用层。至于消费者是何时添加的我们放在后面集中介绍,我们接下来看NSStreamEventHasSpaceAvailable

case NSStreamEventHasSpaceAvailable: {
SRFastLog(@”NSStreamEventHasSpaceAvailable %@”, aStream);
[self _pumpWriting];
break;
}


写相对读来说相对简单,这里就不做过多介绍,大家可以直接看下面注释:

  • (void)_writeData:(NSData *)data {
    [self assertOnWorkQueue];

    if (_closeWhenFinishedWriting) {
    return;
    }
    [_outputBuffer appendData:data];
    [self _pumpWriting];

}


  • (void)_pumpWriting {
    [self assertOnWorkQueue];
    //输出缓存大小
    NSUInteger dataLength = _outputBuffer.length;
    //输出缓存和输出流都还有空间剩余
    if (dataLength - _outputBufferOffset > 0 && _outputStream.hasSpaceAvailable) {
    //通过输出缓存输出数据
    NSInteger bytesWritten = [_outputStream write:_outputBuffer.bytes + _outputBufferOffset maxLength:dataLength - _outputBufferOffset];
    //………..
    //修改缓存偏移
    _outputBufferOffset += bytesWritten;
    //……….
    }
    //如果_closeWhenFinishedWriting表示写完数据后关闭连接则走下面流程
    if (_closeWhenFinishedWriting &&
    _outputBuffer.length - _outputBufferOffset == 0 &&
    (_inputStream.streamStatus != NSStreamStatusNotOpen &&
    _inputStream.streamStatus != NSStreamStatusClosed) &&
    !_sentClose) {
    _sentClose = YES;
    @synchronized(self) {
    [_outputStream close];
    [_inputStream close];
    for (NSArray *runLoop in [_scheduledRunloops copy]) {
    [self unscheduleFromRunLoop:[runLoop objectAtIndex:0] forMode:[runLoop objectAtIndex:1]];
    }
    }
    if (!_failed) {
    //通过代理告诉应用层关闭的原因
    [self _performDelegateBlock:^{
    if ([self.delegate respondsToSelector:@selector(webSocket:didCloseWithCode:reason:wasClean:)]) {
    [self.delegate webSocket:self didCloseWithCode:_closeCode reason:_closeReason wasClean:YES];
    }
    }];
    }
    [self _scheduleCleanup];
    }
    }


    我们接下来看下数据消费者是如何管理的:

  • (void)_addConsumerWithDataLength:(size_t)dataLength
    callback:(data_callback)callback
    readToCurrentFrame:(BOOL)readToCurrentFrame
    unmaskBytes:(BOOL)unmaskBytes {
    [self assertOnWorkQueue];
    assert(dataLength);

    [_consumers addObject:[_consumerPool consumerWithScanner:nil
    handler:callback
    bytesNeeded:dataLength
    readToCurrentFrame:readToCurrentFrame
    unmaskBytes:unmaskBytes]];
    [self _pumpScanner];

}


  • (SRIOConsumer *)consumerWithScanner:(stream_scanner)scanner
    handler:(data_callback)handler
    bytesNeeded:(size_t)bytesNeeded
    readToCurrentFrame:(BOOL)readToCurrentFrame
    unmaskBytes:(BOOL)unmaskBytes {
    SRIOConsumer *consumer = nil;
    if (_bufferedConsumers.count) {
    consumer = [_bufferedConsumers lastObject];
    [_bufferedConsumers removeLastObject];
    } else {
    consumer = [[SRIOConsumer alloc] init];
    }

    [consumer setupWithScanner:scanner handler:handler bytesNeeded:bytesNeeded readToCurrentFrame:readToCurrentFrame unmaskBytes:unmaskBytes];

    return consumer;

}


  • (void)returnConsumer:(SRIOConsumer *)consumer {
    if (_bufferedConsumers.count < _poolSize) {
    [_bufferedConsumers addObject:consumer];
    }
    }


    数据消费者是通过****_addConsumerWithDataLength****添加到消费者数组_consumers中的,同时为了避免频繁创建消费者对象这里使用了对象池,默认消费者对象池中的对象为8个,每次用完都会归还到对象池,在需要的时候会从对象池中拿取最后一个返回。

    所以添加消费者对象的时候会从消费者对象池中取出一个闲置的消费者对象,然后添加到消费者队列中,然后调用_pumpScanner,从消费者列表中取出第一个消费者,让它从读缓存中读取需要的数据,并通过handler交付读到的数据。

    SRWebSocket会在如下情况下添加消费者:

  • (void)_readUntilHeaderCompleteWithCallback:(data_callback)dataHandler; // 握手阶段,发送完握手头信息后会去读取服务端返回的响应头

  • (void)_readFrameContinue;持续读帧信息的时候

    也就是每次需要数据的时候都会往消费者队列中添加一个消费者,等到有数据之后会去除这些消费者读取数据,并通过block交付。

    ****读取接收到的连续帧****

    介绍了缓存的读写,读消费者的管理,以及帧格式的介绍后下面我们来看下怎样读取帧和发送帧,首先看下读取连续帧:

  • (void)_readFrameContinue {

    assert((_currentFrameCount == 0 && _currentFrameOpcode == 0) || (_currentFrameCount > 0 && _currentFrameOpcode > 0));

    [self _addConsumerWithDataLength:2 callback:^(SRWebSocket *self, NSData *data) {

      __block frame_header header = {0};
      
      const uint8_t *headerBuffer = data.bytes;
      assert(data.length >= 2);
      
      //读取RSV,默认必须是0
      if (headerBuffer[0] & SRRsvMask) {
          [self _closeWithProtocolError:@"Server used RSV bits"];
          return;
      }
      //获取操作码
      uint8_t receivedOpcode = (SROpCodeMask & headerBuffer[0]);
      //当前帧是否是控制帧
      BOOL isControlFrame = (receivedOpcode == SROpCodePing || receivedOpcode == SROpCodePong || receivedOpcode == SROpCodeConnectionClose);
      
      //中间帧的操作码必须为0
      if (!isControlFrame && receivedOpcode != 0 && self->_currentFrameCount > 0) {
          [self _closeWithProtocolError:@"all data frames after the initial data frame must have opcode 0"];
          return;
      }
      if (receivedOpcode == 0 && self->_currentFrameCount == 0) {
          [self _closeWithProtocolError:@"cannot continue a message"];
          return;
      }
      //操作码赋给header
      header.opcode = receivedOpcode == 0 ? self->_currentFrameOpcode : receivedOpcode;
      //fin结束码赋给header
      header.fin = !!(SRFinMask & headerBuffer[0]);
      //是否mask字段
      header.masked = !!(SRMaskMask & headerBuffer[1]);
      //内容长度
      header.payload_length = SRPayloadLenMask & headerBuffer[1];
      headerBuffer = NULL;
      //客户端收到的数据必须是unmasked数据
      if (header.masked) {
          [self _closeWithProtocolError:@"Client must receive unmasked data"];
      }
      //额外内容
      size_t extra_bytes_needed = header.masked ? sizeof(_currentReadMaskKey) : 0;
      //.........
      if (extra_bytes_needed == 0) {
          [self _handleFrameHeader:header curData:self->_currentFrameData];
      } else {
          //..........
      }
    

    } readToCurrentFrame:NO unmaskBytes:NO];

}


  • (void)_handleFrameHeader:(frame_header)frame_header curData:(NSData *)curData {

    //当前状态检查
    if (self.readyState == SR_CLOSED) {
    return;
    }

    //是否是控制帧
    BOOL isControlFrame = (frame_header.opcode == SROpCodePing || frame_header.opcode == SROpCodePong || frame_header.opcode == SROpCodeConnectionClose);

    //控制帧不能分片
    if (isControlFrame && !frame_header.fin) {
    [self _closeWithProtocolError:@”Fragmented control frames not allowed”];
    return;
    }

    //控制帧大小不能大于126
    if (isControlFrame && frame_header.payload_length >= 126) {
    [self _closeWithProtocolError:@”Control frames cannot have payloads larger than 126 bytes”];
    return;
    }

    if (!isControlFrame) {
    _currentFrameOpcode = frame_header.opcode;
    _currentFrameCount += 1;
    }

    if (frame_header.payload_length == 0) {
    //是控制帧的情况
    if (isControlFrame) {
    [self _handleFrameWithData:curData opCode:frame_header.opcode];
    } else {
    if (frame_header.fin) {
    //数据帧读取完毕
    [self _handleFrameWithData:_currentFrameData opCode:frame_header.opcode];
    } else {
    //数据帧还未读取完毕
    [self _readFrameContinue];
    }
    }
    } else {
    //有负载数据
    assert(frame_header.payload_length <= SIZE_T_MAX);
    [self _addConsumerWithDataLength:(size_t)frame_header.payload_length callback:^(SRWebSocket *self, NSData *newData) {
    //是控制帧的情况
    if (isControlFrame) {
    [self _handleFrameWithData:newData opCode:frame_header.opcode];
    } else {
    if (frame_header.fin) {
    //数据帧读取完毕
    [self _handleFrameWithData:self->_currentFrameData opCode:frame_header.opcode];
    } else {
    //数据帧还未读取完毕
    [self _readFrameContinue];
    }

    }
    } readToCurrentFrame:!isControlFrame unmaskBytes:frame_header.masked];
    }

}


  • (void)_handleFrameWithData:(NSData *)frameData opCode:(NSInteger)opcode {
    // Check that the current data is valid UTF8

    BOOL isControlFrame = (opcode == SROpCodePing || opcode == SROpCodePong || opcode == SROpCodeConnectionClose);
    //………….
    switch (opcode) {
    //文本流
    case SROpCodeTextFrame: {
    //如果当前类型为文本类型,是否转为string
    if ([self.delegate respondsToSelector:@selector(webSocketShouldConvertTextFrameToString:)] && ![self.delegate webSocketShouldConvertTextFrameToString:self]) {
    //不转为string
    [self _handleMessage:[frameData copy]];
    } else {
    //转为string
    NSString *str = [[NSString alloc] initWithData:frameData encoding:NSUTF8StringEncoding];
    //有数据但是转换后string为空,表明转换失败
    if (str == nil && frameData) {
    [self closeWithCode:SRStatusCodeInvalidUTF8 reason:@”Text frames must be valid UTF-8”];
    dispatch_async(_workQueue, ^{
    [self closeConnection];
    });
    return;
    }
    [self _handleMessage:str];
    }
    break;
    }
    case SROpCodeBinaryFrame:
    //二进制流
    [self _handleMessage:[frameData copy]];
    break;
    //关闭消息
    case SROpCodeConnectionClose:
    [self handleCloseWithData:[frameData copy]];
    break;
    // ping
    case SROpCodePing:
    [self handlePing:[frameData copy]];
    break;
    // pong
    case SROpCodePong:
    [self handlePong:[frameData copy]];
    break;
    default:
    [self _closeWithProtocolError:[NSString stringWithFormat:@”Unknown opcode %ld”, (long)opcode]];
    // TODO: Handle invalid opcode
    break;
    }

}



  • (void)_handleMessage:(id)message {
    SRFastLog(@”Received message”);
    [self _performDelegateBlock:^{
    [self.delegate webSocket:self didReceiveMessage:message];
    }];
    }

    上面将所有读连续帧的代码都贴出来了,并做了对应的注释,下面我们来做个总结,****_readFrameContinue****会先读取帧的前两个字节取出fin,rsv,opcode,payload len,并对这些字段进行校验。****_handleFrameHeader****继续读取payload数据,再将数据以及opcode送到****_handleFrameWithData**** 通过 ****_handleMessage**** 将消息送到业务层。从这里可以看出虽然WebSocket会将数据进行分包,但是SRWebSocket会再底层将这些分包进行拼接后将一个完整的数据交付给业务层。


    ****发送帧数据****

  • (void)send:(id)data {
    data = [data copy];
    dispatch_async(_workQueue, ^{
    if ([data isKindOfClass:[NSString class]]) {
    [self _sendFrameWithOpcode:SROpCodeTextFrame data:[(NSString *)data dataUsingEncoding:NSUTF8StringEncoding]];
    } else if ([data isKindOfClass:[NSData class]]) {
    [self _sendFrameWithOpcode:SROpCodeBinaryFrame data:data];
    } else if (data == nil) {
    [self _sendFrameWithOpcode:SROpCodeTextFrame data:data];
    } else {
    assert(NO);
    }
    });
    }

  • (void)_sendFrameWithOpcode:(SROpCode)opcode data:(id)data {

    //….

    size_t payloadLength = [data isKindOfClass:[NSString class]] ? [(NSString *)data lengthOfBytesUsingEncoding:NSUTF8StringEncoding] : [data length];

    NSMutableData *frame = [[NSMutableData alloc] initWithLength:payloadLength + SRFrameHeaderOverhead];

    //…..

    uint8_t *frame_buffer = (uint8_t *)[frame mutableBytes];

    // 设置fin
    frame_buffer[0] = SRFinMask | opcode;

    BOOL useMask = YES;

#ifdef NOMASK
useMask = NO;
#endif

// 客户端发送给服务端的必须使用mask
if (useMask) {
// set the mask and header
    frame_buffer[1] |= SRMaskMask;
}

size_t frame_buffer_size = 2;

//填充数据
const uint8_t *unmasked_payload = NULL;
if ([data isKindOfClass:[NSData class]]) {
    unmasked_payload = (uint8_t *)[data bytes];
} else if ([data isKindOfClass:[NSString class]]) {
    unmasked_payload =  (const uint8_t *)[data UTF8String];
} else {
    return;
}

//设置payload字段
if (payloadLength < 126) {
    frame_buffer[1] |= payloadLength;
} else if (payloadLength <= UINT16_MAX) {
    frame_buffer[1] |= 126;
    *((uint16_t *)(frame_buffer + frame_buffer_size)) = EndianU16_BtoN((uint16_t)payloadLength);
    frame_buffer_size += sizeof(uint16_t);
} else {
    frame_buffer[1] |= 127;
    *((uint64_t *)(frame_buffer + frame_buffer_size)) = EndianU64_BtoN((uint64_t)payloadLength);
    frame_buffer_size += sizeof(uint64_t);
}
    
if (!useMask) {
    for (size_t i = 0; i < payloadLength; i++) {
        frame_buffer[frame_buffer_size] = unmasked_payload[i];
        frame_buffer_size += 1;
    }
} else {
    //对数据进行mark 处理
    uint8_t *mask_key = frame_buffer + frame_buffer_size;
    SecRandomCopyBytes(kSecRandomDefault, sizeof(uint32_t), (uint8_t *)mask_key);
    frame_buffer_size += sizeof(uint32_t);
    
    // TODO: could probably optimize this with SIMD
    for (size_t i = 0; i < payloadLength; i++) {
        frame_buffer[frame_buffer_size] = unmasked_payload[i] ^ mask_key[i % sizeof(uint32_t)];
        frame_buffer_size += 1;
    }
}

assert(frame_buffer_size <= [frame length]);
frame.length = frame_buffer_size;

//调用_writeData 写到流当中
[self _writeData:frame];

}

```

发送帧流程比较简单,就是构造帧数据,然后将数据塞入帧中,往输出流写。

之前再规划这篇博客的时候本来只想简单介绍下SRWebSocket的但是写着写着,发现它还是满有意思的,特别是数据消费者部分的设计提起了兴趣,所以还是硬着头皮看将整个代码过了一遍,下面是一些比较好的文章,可以供大家深入阅读,个人觉得只要弄清楚了WebSocket理论基础后看这部分代码会显得十分轻松,希望这篇博客能够帮助到大家。好了今天就到这里。

较好的文章推荐

Contents
  1. 1. 开源库信息
  2. 2. 为什么需要WebSocket
  3. 3. WebSocket 协议简要介绍
  4. 4. SRWebSocket 源码解析
  5. 5. 较好的文章推荐