开源信息

coobjc 是由手淘架构团队推出的能在 iOS 上使用的协程开发框架,所以大家在了解coobjc之前必须先了解什么是协程,说到协程必定让人想起和它相关的两个概念进程线程,
这里不会对这两个概念进行长篇大论,概括地讲对操作系统来说,线程是最小的执行单元,进程是最小的资源管理单元。进程负责开拓各个资源,线程共享这些资源,每个线程有单独的堆栈空间。并且无论进程还是线程,都是由操作系统所管理的。
而协程是用户态的微线程,协程不是被操作系统内核所管理,而完全是由程序所控制。

它的好处有哪些呢?

首先它可以以同步的方式写异步逻辑,可以避免“回调地域”现象,在性能方面调度性能更快,协程本身不需要进行内核级线程的切换,调度性能快,即使创建上万个协程也毫无压力。协程的使用以帮助开发减少锁、信号量的滥用。
对于经常进行高并发处理的服务端协程是很实用的。但是这不意味着移动端就不需要协程了。就iOS开发而言Objective C中基于Block 的异步编程回调也会照成比较常见的“回调地域”。如果具备同步方式编写异步代码可以极大地改善我们代码的逻辑结构,并且由于协程的引入也可以减少因为锁的使用带来的性能损耗。

整体架构

下面是coobjc的整体架构图:

底层是协程的核心,包括堆栈切换管理,协程调度,协程之间channel通信实现
中间层是协程的封装,提供了 async/await, generator 和 Actor的支持
顶层是对系统库的一个扩展,是对FoundationUIKit IO及其他耗时操作的封装

coobjec中每个协程都是可以暂停和恢复的,并且每个协程都分配一个单独的内存区域用于存储它的调用栈,它有四个状态分别是:READYRUNNINGSUSPENDDEAD,并且在运行过程中可以多次在RUNNINGSUSPEND状态之间进行切换。
下面是协程进行yield和resume操作的示意图:

coobjc内部使用一个调度器来负责用户所有协程的调度,它实际上是通过一个协程队列来进行管理,调度器会不断从队列中取出协程去执行。一旦队列中没有协程可以执行的时候,会切换到线程执行。所以当我们需要执行某个协程的时候,我们只需要将协程添加到某个线程的调度器队列中就可以了。
调度器负责执行它。

话不多说我们来看下coobjc的具体用法:

coobjc 的使用

1 创建协程

coobjc可以在当前线程或者指定的queue创建协程,在哪里创建就会在哪个线程进行调度,当然还可以在指定的队列创建和运行协程。

co_launch(^{
//默认在当前线程进行调度
});

co_launch_onqueue(q, ^{
//在指定的队列中运行的协程
});

2 取消协程

在coobjc中协程是可取消的,ObjC不建议使用异常,所以coobjc中不使用异常来取消协程。要取消协程可以调用CCOCoroutine的cancel方法。通过co_isCancelled宏来查询当前协程是否被取消了。

CCOCoroutine *co = co_launch(^{
val++;
co_delay(1.0);
if(co_isCancelled()){
//Do some cleaning operations
return;
}
val++;
});

[co cancel];

在调用cancel后,协程的内部代码将不再继续执行,协程的内存将会被释放。

3. COPromise用法

await主要用于避免回调地狱现象,有时候我们会遇到请求两个不相关的数据接口后,将最终的数据组合后作为最终数据返回。如果不借助await的话,只能分别串行请求后最后将数据组合,这样会显得很低效。借助协程可以同时并行执行然后将数据合并。

  1. 定义一个COPromise,在COPromise中通过fulfill 返回成功的结果,通过reject 返回失败的结果。返回的错误信息可以通过co_getError来获取。

fulfill

- (COPromise *)testPromiseFullfill {
COPromise *promise = [COPromise promise];
dispatch_async(dispatch_get_main_queue(), ^{
[promise fulfill:@"Hello coobjc!"];
});
return promise;
}

co_launch(^{
id result = await([self testPromiseFullfill]);
NSLog(@"result = %@",result);
});

reject

- (COPromise *)testPromiseReject {
COPromise *promise = [COPromise promise];
dispatch_async(dispatch_get_main_queue(), ^{
[promise reject:[NSError errorWithDomain:@"COPromise" code:-100 userInfo:nil]];
});
return promise;
}

co_launch(^{
id result = await([self testPromiseReject]);
NSError *error = co_getError();
if(error) {
NSLog(@"Error = %@",error);
} else {
NSLog(@"result = %@",result);
}
});

fullfill && reject

- (COPromise *)testPromiseRejectFullfile {
return [COPromise promise:^(COPromiseFulfill _Nonnull fullfill, COPromiseReject _Nonnull reject) {
dispatch_after(dispatch_time(DISPATCH_TIME_NOW, (int64_t)(2 * NSEC_PER_SEC)), dispatch_get_main_queue(), ^{
fullfill(@"Say Hello To Coobjc");
});
}];
}

co_launch(^{
id res = await([self testPromiseRejectFullfile]);
NSLog(@"result = %@",res);
});

cancel

- (COPromise *)testPromiseCancel {
COPromise *promise = [COPromise promise];
dispatch_async(dispatch_get_main_queue(), ^{
[promise cancel];
});
return promise;
}

co_launch(^{
COPromise *promise = [self testPromiseCancel];
[promise onCancel:^(COPromise * _Nonnull promise) {
NSLog(@"isCancel");
}];

[promise catch:^(NSError * _Nonnull error) {
NSLog(@"NSError");
}];
id res = await(promise);
if(!res) {
NSError *error = co_getError();
BOOL isPromiseCancel = [COPromise isPromiseCancelled:error];
NSLog(@"isPromiseCancel %ld",isPromiseCancel);
return;
}
NSLog(@"result = %@",res);
});

then

co_launch(^{
COPromise *promise = [self testPromiseFullfill];
COPromise *thenPromise = [promise then:^id _Nullable(id _Nullable value) {
return [self testPromiseThen];
}];

id res = await(thenPromise);
if(!res) {
NSError *error = co_getError();
BOOL isPromiseCancel = [COPromise isPromiseCancelled:error];
return;
}
NSLog(@"result = %@",res);
});

批量等待

co_launch(^{
NSArray *batchFullfile =
batch_await(@[
[self testPromiseFullfill01],
[self testPromiseFullfill02],
[self testPromiseFullfill03],
]);

if(batchFullfile && batchFullfile.count) {
NSLog(@"%@",batchFullfile);
}
});

我们来看下COPromise的一些关键属性和方法:

关键属性:

/**
Tell the promise is pending or not.
*/
@property(nonatomic, readonly) BOOL isPending;

/**
Tell the promise is fulfilled or not.
*/
@property(nonatomic, readonly) BOOL isFulfilled;

/**
Tell the promise is rejected or not.
*/
@property(nonatomic, readonly) BOOL isRejected;

/**
If fulfilled, value store into this property.
*/
@property(nonatomic, readonly, nullable) Value value;

/**
If reject, error store into this property
*/
@property(nonatomic, readonly, nullable) NSError *error;

关键构造方法:

/**
Create a promise without constructor. Which means, you should control when the job begins.

@return The `COPromise` instance
*/
+ (instancetype)promise;
/**
Create a promise with constructor. the job begans when someone observing on it.

@param constructor the constructor block.
@return The `COPromise` instance
*/
+ (instancetype)promise:(COPromiseConstructor)constructor;
/**
Create a promise with constructor. the job begans when someone observing on it.

@param constructor the constructor block.
@param queue the dispatch_queue_t that the job run.
@return The `COPromise` instance
*/
+ (instancetype)promise:(COPromiseConstructor)constructor onQueue:(dispatch_queue_t _Nullable )queue;

关键方法:

/**
Fulfill the promise with a return value.

@param value the value fulfilled.
*/
- (void)fulfill:(nullable Value)value;

/**
Reject the promise with a error

@param error the error.
*/
- (void)reject:(NSError * _Nullable)error;

/**
Cancel the job.

@discussion If you want a `COPromise` be cancellable, you must make the job cancel in `onCancel:`.
*/
- (void)cancel;
/**
Set the onCancelBlock.

@param onCancelBlock will execute on the promise cancelled.
*/
- (void)onCancel:(COPromiseOnCancelBlock _Nullable )onCancelBlock;

/**
Chained observe the promise fulfilled.

@param work the observer worker.
@return The chained promise instance.
*/
- (COPromise *)then:(COPromiseThenWorkBlock)work;

/**
Observe the promises rejected.

@param reject the reject dealing worker.
@return The chained promise instance.
*/
- (COPromise *)catch:(COPromiseCatchWorkBlock)reject;

/**
Tell if the error is promise cancelled error

@param error the error object
@return is cancellled error.
*/
+ (BOOL)isPromiseCancelled:(NSError *)error;

4.COProgressPromise用法

static COProgressPromise* progressDownloadFileFromUrl(NSString *url){
COProgressPromise *promise = [COProgressPromise promise];
[NSURLSession sharedSession].configuration.requestCachePolicy = NSURLRequestReloadIgnoringCacheData;
NSURLSessionDataTask *task = [[NSURLSession sharedSession] dataTaskWithURL:[NSURL URLWithString:url] completionHandler:^(NSData * _Nullable data, NSURLResponse * _Nullable response, NSError * _Nullable error) {
if (error) {
[promise reject:error];
}
else{
[promise fulfill:data];
}
}];
[task resume];
if (@available(iOS 11.0, *)) {
[promise setupWithProgress:task.progress];
} else {
// Fallback on earlier versions
NSProgress *progress = [NSProgress progressWithTotalUnitCount:10];
[promise setupWithProgress:progress];
dispatch_source_t timer = nil;
dispatch_queue_t queue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);
timer = dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER, 0, 0, queue);
dispatch_source_set_timer(timer, dispatch_walltime(NULL, 0), 0.1 * NSEC_PER_SEC, 0 * NSEC_PER_SEC);
dispatch_source_set_event_handler(timer, ^{
if (progress.completedUnitCount < progress.totalUnitCount) {
progress.completedUnitCount += 1;
}
else{
dispatch_source_cancel(timer);
}
});
dispatch_resume(timer);
[promise onCancel:^(COPromise * _Nonnull promise) {
dispatch_source_cancel(timer);
}];
}
return promise;
}

co_launch(^{
int progressCount = 0;
COProgressPromise *promise = progressDownloadFileFromUrl(@"http://img17.3lian.com/d/file/201701/17/9a0d018ba683b9cbdcc5a7267b90891c.jpg");
for(id p in promise){
double v = [p doubleValue];
NSLog(@"current progress: %f", (float)v);
progressCount++;
}
NSData *data = await(promise);
});

上述的这些co_launch都可以在任何子线程中运行。

5. COChan用法

COChan 主要用于协程之间进行数据传输,它有阻塞和非阻塞两种方式收发数据,针对channel,可以有无缓存类型,有缓存类型,以及无限缓存类型,下面会进行详细介绍。

和Promise类似的用法:

- (COChan<id> *)co_fetchSomething {
COChan *chan = [COChan chan];
dispatch_async(_someQueue, ^{
// fetch result operations
...
[chan send_nonblock:result];
});
return chan;
}

// calling in a coroutine.
co_launch(^{
id ret = await([self co_fetchSomething]);
});

作为生产者消费者的用法:

COChan *chan = [COChan chanWithBuffCount:10];
co_launch(^{
for (int i = 0; i < 10; i++) {
[chan send_nonblock:@(i)];
}
});

co_launch(^{
//NSArray *data = [chan receiveAll];
//NSLog(@"%@",data);

NSArray *dataWithCount = [chan receiveWithCount:3];
NSLog(@"%@",dataWithCount);
});

当COChan容量为0的时候,发送数据是阻塞的,只有在有人接收数据后才会执行send之后的代码

COChan *chan = [COChan chanWithBuffCount:0];
__block NSInteger step = 0;
co_launch(^{
step = 1;//@1
[chan send:@111]; //代码会停在这里直到receive_nonblock消费数据后才继续往下执行 //@2
step = 2;//@6
});

co_launch(^{
step = 3;//@3
id value = [chan receive_nonblock];//@4
step = 4;//@5
});

上面数字代表的是代码的执行顺序。

当将COChan容量改为非0的时候执行顺序如下:

COChan *chan = [COChan chanWithBuffCount:2];
__block NSInteger step = 0;
co_launch(^{
step = 1;//@1
[chan send:@111]; //代码不会阻塞 //@2
step = 2;//@3
});

co_launch(^{
step = 3;//@4
id value = [chan receive_nonblock];//@5
step = 4;//@6
});

如果COChan中没有数据的时候调用receive将会阻塞:

__block NSInteger step = 0;
COChan *chan = [COChan chan];
co_launch(^{
step = 1;//@1
id value = [chan receive];//@2
step = 2;//@6
});

co_launch(^{
step = 3;//@3
[chan send:@111];//@4
step = 4;//@5
});

也就是send方法会往COChan中通道缓存中添加消息,如果满的话则会阻塞,直到数据被receive之后,才会继续执行。

receive会不断从通道缓存中获取数据,如果数据被取完后就阻塞,直到有数据send到缓存的时候,才会继续执行。这是很典型的生产者消费者模型。
send_nonblock会完缓存通道送数据后立刻返回,如果缓存满了就会丢弃数据,不会阻塞。看下coobjc给出的两个测试用例:

it(@"send non blocking will not block the coroutine.", ^{
__block NSInteger step = 0;
COChan *chan = [COChan chanWithBuffCount:1];
co_launch(^{
step = 1;
[chan send_nonblock:@111];
expect(step).to.equal(1);
step = 2;
});

co_launch(^{
expect(step).to.equal(2);
step = 3;
id value = [chan receive_nonblock];
expect(step).to.equal(3);
expect(value).to.equal(@111);
});
waitUntil(^(DoneCallback done) {
dispatch_async(dispatch_get_main_queue(), ^{
expect(step).to.equal(3);
done();
});
});
});

it(@"Channel buff is full, send non blocking will abandon the value.", ^{
__block NSInteger step = 0;

COChan *chan = [COChan chan];
co_launch(^{
step = 1;
[chan send_nonblock:@111];
expect(step).to.equal(1);
step = 2;
});
co_launch(^{
expect(step).to.equal(2);
step = 3;
id value = [chan receive_nonblock];
expect(step).to.equal(3);
expect(value).to.equal(nil);
});
waitUntil(^(DoneCallback done) {
dispatch_async(dispatch_get_main_queue(), ^{
expect(step).to.equal(3);
done();
});
});
});

我们看下接收被阻塞的例子:

it(@"receive can block muti coroutine.", ^{
__block NSInteger step = 0;

COChan *chan = [COChan chanWithBuffCount:1];

co_launch(^{
step = 1;
id value = [chan receive];
expect(step).to.equal(7);
expect(value).to.equal(@111);
step = 2;
});

co_launch(^{
expect(step).to.equal(1);
step = 3;
id value = [chan receive];
expect(step).to.equal(2);
expect(value).to.equal(@222);
step = 4;
});

co_launch(^{
expect(step).to.equal(3);
step = 5;
[chan send:@111];
expect(step).to.equal(5);

step = 6;
[chan send:@222];
expect(step).to.equal(6);
step = 7;
});
waitUntil(^(DoneCallback done) {
dispatch_async(dispatch_get_main_queue(), ^{
expect(step).to.equal(4);
done();
});
});
});

上面的例子中缓存通道容量都是指定的,还可以使用可扩展的通道expandableChan:

it(@"expandableChan will not abandon values.", ^{
__block NSInteger receiveCount = 0;
__block NSInteger receiveValue = 0;
__block NSInteger sendCount = 0;
COChan *chan = [COChan expandableChan];
for (int i = 0; i < 2000; i++) {
co_launch(^{
[chan send:@(i)];
sendCount++;
});
}

for (int i = 0; i < 2000; i++) {
co_launch(^{
id value = [chan receive];
receiveCount++;
receiveValue+=[value integerValue];
});
}
waitUntilTimeout(100, ^(DoneCallback done) {
dispatch_after(dispatch_time(DISPATCH_TIME_NOW, (int64_t)(10 * NSEC_PER_SEC)), dispatch_get_main_queue(), ^{
expect(receiveCount).to.equal(2000);
expect(receiveValue).to.equal(1999000);
expect(sendCount).to.equal(2000);
done();
});
});
});

在进入下一个主题之前我们先回顾下OCChannel接口:

构造方法:

/**
容量为0的消息通道
*/
+ (instancetype)chan;

/**
指定容量的消息通道
*/
+ (instancetype)chanWithBuffCount:(int32_t)buffCount;

/**
创建一个可扩展的通道,通道的缓存大小是可以扩展的,这样send就不会阻塞当前进程,并且发送的数据将不会丢失
*/
+ (instancetype)expandableChan;

往通道发送数据:

/**
发送一个数据到通道上,这个方法会在缓存已满,并且没有接收者的情况下阻塞当前协程
*/
- (void)send:(Value _Nullable )val;

/**
和上面一致,只不过会在协程取消的时候调用cancelBlock
*/
- (void)send:(Value _Nullable )val onCancel:(COChanOnCancelBlock _Nullable)cancelBlock;

/**
非阻塞式得向通道发送数据:
如果有接收者则发送
如果没有接收者,但是缓存还没满则存在缓存中
如果没有接收者,并且缓存已经满了,则丢弃数据
*/
- (void)send_nonblock:(Value _Nullable )val;

接收数据

/**
阻塞式从通道中获取数据,这个方法会一直阻塞,直到有人往通道上发送数据为止
*/
- (Value _Nullable )receive;

/**
和上面一致,只不过会在协程取消的时候调用cancelBlock
*/
- (Value _Nullable )receiveWithOnCancel:(COChanOnCancelBlock _Nullable)cancelBlock;

/**
非阻塞式接收数据
如果缓存中有数据则从缓存中直接取数据后返回
如果缓存是空的,但是刚好有人发送了数据,则直接接收
如果缓存是空的,但是没人发送数据,这时候返回nil
*/
- (Value _Nullable)receive_nonblock;

/**
阻塞接收通道的所有数据
1. 如果通道没有数据,那么会阻塞等待直到有一个数据为止
2. 如果通道内有数据则返回所有的值
3. 如果发送了nil,那么接送到数据将会是 [NSNull null],因此需要注意检测返回数组中的类型
*/
- (NSArray<Value> * _Nonnull)receiveAll;

/**
阻塞式接收指定数量的数据,如果通道缓存中没有足够的数据那么就会一直阻塞。
如果发送了nil,那么接送到数据将会是 [NSNull null],因此需要注意检测返回数组中的类型
*/
- (NSArray<Value> * _Nonnull)receiveWithCount:(NSUInteger)count;

6. COActor用法

Actor 的概念来自于 Erlang ,在 AKKA 中,可以认为一个 Actor 就是一个容器,用以存储状态、行为、Mailbox 以及子 Actor 与 Supervisor 策略。Actor 之间并不直接通信,而是通过 Mail 来互通有无

* Mailbox: 用于存储消息的队列
* Isolated State: actor的状态以及内部变量等。
* message: 消息,类似于方法调用
Actor模型有两个特点:
1. 在单个线程中的每个Actor顺序处理发送给它的消息
2. 不同的Actors同时并行运行

Actor的创建:

co_actor(^(COActorChan *chan) {
val = 1;
XCTAssert(chan != nil);
});

co_actor_onqueue(get_test_queue(), ^(COActorChan *chan) {
val1 = 1;
XCTAssert(chan != nil);
});
it(@"next in chan", ^{
__block int val = 0;
COActor* actor = co_actor(^(COActorChan *chan) {
int tmpVal = 0;
COActorMessage *message = nil;
while((message = [chan next])){
if([message intType] == 1){
tmpVal++;
}
else if([message intType] == -1){
tmpVal--;
}
else if([message intType] == 2){
//返回数据
message.complete(@(tmpVal));
}
}
});
co_launch(^{
[actor sendMessage:@(1)];
[actor sendMessage:@(1)];
[actor sendMessage:@(1)];
[actor sendMessage:@(1)];
[actor sendMessage:@(1)];
[actor sendMessage:@(1)];

[actor sendMessage:@(-1)];

COActorCompletable *completable = [actor sendMessage:@(2)];
id result = await(completable);
val = [result intValue];
[actor cancel];
});

waitUntil(^(DoneCallback done) {
dispatch_after(dispatch_time(DISPATCH_TIME_NOW, (int64_t)(0.5 * NSEC_PER_SEC)), dispatch_get_main_queue(), ^{
XCTAssert(val == 5);
done();
});
});
});


it(@"error example", ^{
COActor *actor = co_actor_onqueue(get_test_queue(), ^(COActorChan *channel) {
for(COActorMessage *message in channel){
message.complete(await(test_promise()));
}
});
co_launch(^{
id value = await([actor sendMessage:@"test"]);
NSError *error = co_getError();
XCTAssert(error.code == 100);
});

waitUntilTimeout(3, ^(DoneCallback done) {
dispatch_after(dispatch_time(DISPATCH_TIME_NOW, (int64_t)(3 * NSEC_PER_SEC)), dispatch_get_main_queue(), ^{
done();
});
});

});

7. COTuple用法

COPromise<COTuple*>*
cotest_loadContentFromFile(NSString *filePath){
return [COPromise promise:^(COPromiseFullfill _Nonnull resolve, COPromiseReject _Nonnull reject) {
if ([[NSFileManager defaultManager] fileExistsAtPath:filePath]) {
NSData *data = [[NSData alloc] initWithContentsOfFile:filePath];
resolve(co_tuple(filePath, data, nil));
}
else{
NSError *error = [NSError errorWithDomain:@"fileNotFound" code:-1 userInfo:nil];
resolve(co_tuple(filePath, nil, error));
}
}];
}

co_launch(^{
NSString *tmpFilePath = nil;
NSData *data = nil;
NSError *error = nil;
co_unpack(&tmpFilePath, &data, &error) = await(cotest_loadContentFromFile(filePath));
XCTAssert([tmpFilePath isEqualToString:filePath], @"file path is wrong");
XCTAssert(data.length > 0, @"data is wrong");
XCTAssert(error == nil, @"error is wrong");
});

8. 协程内延迟

co_delay 可以暂停所在的协程但是协程所在的线程不会停止运行,线程内部的其他协程也不会暂停,co_delay只能在协程内部运行,如果在外部执行的话将会抛出异常。

co_launch(^{
NSTimeInterval begin = [[NSDate date] timeIntervalSince1970];
co_delay(3);
realDuration = [[NSDate date] timeIntervalSince1970] - begin;
});

9. cokit简介

cokit 为常用的一些原生方法提供了一些常用的分类供我们使用,主要涉及到文件网络的IO

  • NSDictionary+Coroutine.h:主要用于将NSDictionary从文件中读取和写入文件。
  • NSArray+Coroutine.h:主要用于读取NSPropertyList中的数组。
  • NSString+Coroutine.h: 用于字符串从文件中读取和写入文件。
  • NSData+Coroutine.h:主要用于将NSData数据从文件中读取和写入文件。
  • NSFileManager+Coroutine.hw: 用于文件的操作
  • NSJSONSerialization+Coroutine.hw: 用于JSON的序列化操作
  • NSKeyedArchiver+Coroutine.h:用于归档类操作
  • NSURLConnection+Coroutine.h,NSURLSession+Coroutine.h:用于网络请求操作
  • NSUserDefaults+Coroutine.h:用于NSUserDefaults存储操作
  • UIImage+Coroutine.h:用于获取图片操作
Contents
  1. 1. 开源信息
  2. 2. 整体架构
  3. 3. coobjc 的使用