coobjc 是由手淘架构团队推出的能在 iOS 上使用的协程开发框架,所以大家在了解coobjc之前必须先了解什么是协程,说到协程必定让人想起和它相关的两个概念进程 和线程 , 这里不会对这两个概念进行长篇大论,概括地讲对操作系统来说,线程是最小的执行单元,进程是最小的资源管理单元。进程负责开拓各个资源,线程共享这些资源,每个线程有单独的堆栈空间。并且无论进程还是线程,都是由操作系统所管理的。 而协程是用户态的微线程,协程不是被操作系统内核所管理,而完全是由程序所控制。
首先它可以以同步的方式写异步逻辑,可以避免“回调地域”现象,在性能方面调度性能更快,协程本身不需要进行内核级线程的切换,调度性能快,即使创建上万个协程也毫无压力。协程的使用以帮助开发减少锁、信号量的滥用。 对于经常进行高并发处理的服务端协程是很实用的。但是这不意味着移动端就不需要协程了。就iOS开发而言Objective C中基于Block 的异步编程回调也会照成比较常见的“回调地域”。如果具备同步方式编写异步代码可以极大地改善我们代码的逻辑结构,并且由于协程的引入也可以减少因为锁的使用带来的性能损耗。
整体架构 下面是coobjc的整体架构图:
底层是协程的核心,包括堆栈切换管理,协程调度,协程之间channel通信实现。 中间层是协程的封装,提供了 async / await , generator 和 Actor的支持 。 顶层是对系统库的一个扩展,是对Foundation 和 UIKit IO及其他耗时操作的封装
coobjec中每个协程都是可以暂停和恢复的,并且每个协程都分配一个单独的内存区域用于存储它的调用栈,它有四个状态分别是:READY ,RUNNING ,SUSPEND ,DEAD ,并且在运行过程中可以多次在RUNNING ,SUSPEND 状态之间进行切换。 下面是协程进行yield和resume操作的示意图:
coobjc内部使用一个调度器来负责用户所有协程的调度,它实际上是通过一个协程队列来进行管理,调度器会不断从队列中取出协程去执行。一旦队列中没有协程可以执行的时候,会切换到线程执行。所以当我们需要执行某个协程的时候,我们只需要将协程添加到某个线程的调度器队列中就可以了。 调度器负责执行它。
coobjc 的使用 1 创建协程
co_launch(^{ // 默认在当前线程进行调度 }); co_launch_onqueue(q, ^{ // 在指定的队列中运行的协程 });
2 取消协程
CCOCoroutine *co = co_launch(^{ val ++; co_delay(1.0 ); if (co_isCancelled()){ return ; } val ++; }); [co cancel];
3. COPromise用法
定义一个COPromise,在COPromise中通过fulfill 返回成功的结果,通过reject 返回失败的结果。返回的错误信息可以通过co_getError来获取。
- (COPromise *)testPromiseFullfill { COPromise *promise = [COPromise promise] dispatch_async(dispatch_get_main_queue (), ^{ [promise fulfill:@"Hello coobjc!" ] }) return promise } co_launch(^{ id result = await([self testPromiseFullfill]) NSLog(@"result = %@" ,result) })
- (COPromise *)testPromiseReject { COPromise *promise = [COPromise promise]; dispatch_async (dispatch_get_main_queue(), ^{ [promise reject:[NSError errorWithDomain:@"COPromise" code:-100 userInfo:nil ]]; }); return promise; } co_launch(^{ id result = await([self testPromiseReject]); NSError *error = co_getError(); if (error) { NSLog (@"Error = %@" ,error); } else { NSLog (@"result = %@" ,result); } });
fullfill && reject
- (COPromise *) testPromiseRejectFullfile { return [COPromise promise:^(COPromiseFulfill _Nonnull fullfill, COPromiseReject _Nonnull reject) { dispatch_after(dispatch_time(DISPATCH_TIME_NOW, (int64_t)(2 * NSEC_PER_SEC) ), dispatch_get_main_queue() , ^{ fullfill(@"Say Hello To Coobjc" ) ; }); }]; } co_launch(^{ id res = await([self testPromiseRejectFullfile]); NSLog(@"result = %@" ,res); }) ;
- (COPromise *)testPromiseCancel { COPromise *promise = [COPromise promise] dispatch_async(dispatch_get_main_queue (), ^{ [promise cancel] }) return promise } co_launch(^{ COPromise *promise = [self testPromiseCancel]; [promise onCancel:^(COPromise * _Nonnull promise) { NSLog(@"isCancel" ) }] [promise catch:^(NSError * _Nonnull error) { NSLog(@"NSError"); }]; id res = await(promise); if(!res) { NSError *error = co_getError() BOOL isPromiseCancel = [COPromise isPromiseCancelled:error ] NSLog(@"isPromiseCancel %ld" ,isPromiseCancel) return } NSLog(@"result = %@" ,res) })
co_launch(^{ COPromise *promise = [self testPromiseFullfill]; COPromise *thenPromise = [promise then:^id _Nullable(id _Nullable value) { return [self testPromiseThen]; }]; id res = await (thenPromise); if (!res) { NSError *error = co_getError(); BOOL isPromiseCancel = [COPromise isPromiseCancelled:error]; return ; } NSLog(@"result = %@" ,res); });
co_launch(^{ NSArray *batchFullfile = batch_await(@[ [self testPromiseFullfill01], [self testPromiseFullfill02], [self testPromiseFullfill03], ]); if(batchFullfile && batchFullfile.count) { NSLog(@"%@" ,batchFullfile); }});
@property (nonatomic , readonly ) BOOL isPending;@property (nonatomic , readonly ) BOOL isFulfilled;@property (nonatomic , readonly ) BOOL isRejected;@property (nonatomic , readonly , nullable ) Value value;@property (nonatomic , readonly , nullable ) NSError *error;
/** Create a promise without constructor . Which means, you should control when the job begins. @return The `COPromise` instance */+ (instancetype)promise; /** Create a promise with constructor . the job begans when someone observing on it. @param constructor the constructor block. @return The `COPromise` instance */+ (instancetype)promise:(COPromiseConstructor)constructor; /** Create a promise with constructor . the job begans when someone observing on it. @param constructor the constructor block. @param queue the dispatch_queue_t that the job run. @return The `COPromise` instance */+ (instancetype)promise:(COPromiseConstructor)constructor onQueue:(dispatch_queue_t _Nullable )queue;
- (void )fulfill:(nullable Value)value; - (void )reject:(NSError * _Nullable)error; - (void )cancel;
- (void )onCancel:(COPromiseOnCancelBlock _Nullable )onCancelBlock; - (COPromise *)then:(COPromiseThenWorkBlock)work; - (COPromise *)catch :(COPromiseCatchWorkBlock)reject; + (BOOL)isPromiseCancelled:(NSError *)error;
static COProgressPromise* progressDownloadFileFromUrl(NSString *url){ COProgressPromise *promise = [COProgressPromise promise]; [NSURLSession sharedSession].configuration.requestCachePolicy = NSURLRequestReloadIgnoringCacheData ; NSURLSessionDataTask *task = [[NSURLSession sharedSession] dataTaskWithURL:[NSURL URLWithString:url] completionHandler:^(NSData * _Nullable data, NSURLResponse * _Nullable response, NSError * _Nullable error) { if (error) { [promise reject:error]; } else { [promise fulfill:data]; } }]; [task resume]; if (@available(iOS 11.0 , *)) { [promise setupWithProgress:task.progress]; } else { NSProgress *progress = [NSProgress progressWithTotalUnitCount:10 ]; [promise setupWithProgress:progress]; dispatch_source_t timer = nil ; dispatch_queue_t queue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0 ); timer = dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER, 0 , 0 , queue); dispatch_source_set_timer(timer, dispatch_walltime(NULL , 0 ), 0.1 * NSEC_PER_SEC , 0 * NSEC_PER_SEC ); dispatch_source_set_event_handler(timer, ^{ if (progress.completedUnitCount < progress.totalUnitCount) { progress.completedUnitCount += 1 ; } else { dispatch_source_cancel(timer); } }); dispatch_resume(timer); [promise onCancel:^(COPromise * _Nonnull promise) { dispatch_source_cancel(timer); }]; } return promise; } co_launch(^{ int progressCount = 0 ; COProgressPromise *promise = progressDownloadFileFromUrl(@"http://img17.3lian.com/d/file/201701/17/9a0d018ba683b9cbdcc5a7267b90891c.jpg" ); for (id p in promise){ double v = [p doubleValue]; NSLog (@"current progress: %f" , (float )v); progressCount++; } NSData *data = await(promise); });
5. COChan用法
COChan 主要用于协程之间进行数据传输,它有阻塞和非阻塞两种方式收发数据,针对channel,可以有无缓存类型,有缓存类型,以及无限缓存类型,下面会进行详细介绍。
- (COChan<id> *)co_fetchSomething { COChan *chan = [COChan chan ]; dispatch_async(_someQueue, ^{ ... [chan send_nonblock:result]; }); return chan ; } co_launch(^{ id ret = await([self co_fetchSomething]); });
COChan *chan = [COChan chanWithBuffCount:10 ]; co_launch(^{ for (int i = 0 ; i < 10 ; i++) { [chan send_nonblock:@(i)]; } }); co_launch(^{ NSArray *dataWithCount = [chan receiveWithCount:3 ]; NSLog(@"%@" ,dataWithCount); });
COChan *chan = [COChan chanWithBuffCount:0 ]; __block NSInteger step = 0 ; co_launch(^{ step = 1 ;// @1 [chan send:@111 ]; // 代码会停在这里直到receive_nonblock消费数据后才继续往下执行 // @2 step = 2 ;// @6 }); co_launch(^{ step = 3 ;// @3 id value = [chan receive_nonblock];// @4 step = 4 ;// @5 });
COChan *chan = [COChan chanWithBuffCount:2 ]; __block NSInteger step = 0 ; co_launch(^{ step = 1 ;// @1 [chan send:@111 ]; // 代码不会阻塞 // @2 step = 2 ;// @3 }); co_launch(^{ step = 3 ;// @4 id value = [chan receive_nonblock];// @5 step = 4 ;// @6 });
__block NSInteger step = 0 ; COChan *chan = [COChan chan]; co_launch(^{ step = 1 ;// @1 id value = [chan receive];// @2 step = 2 ;// @6 }); co_launch(^{ step = 3 ;// @3 [chan send :@111];// @4 step = 4 ;// @5 });
也就是send方法会往COChan中通道缓存中添加消息,如果满的话则会阻塞,直到数据被receive之后,才会继续执行。 receive会不断从通道缓存中获取数据,如果数据被取完后就阻塞,直到有数据send到缓存的时候,才会继续执行。这是很典型的生产者消费者模型。 send_nonblock会完缓存通道送数据后立刻返回,如果缓存满了就会丢弃数据,不会阻塞。看下coobjc给出的两个测试用例:
it(@"send non blocking will not block the coroutine." , ^{ __block NSInteger step = 0 ; COChan *chan = [COChan chanWithBuffCount:1 ]; co_launch(^{ step = 1 ; [chan send_nonblock:@111 ]; expect(step ).to .equal(1 ); step = 2 ; }); co_launch(^{ expect(step ).to .equal(2 ); step = 3 ; id value = [chan receive_nonblock]; expect(step ).to .equal(3 ); expect(value).to .equal(@111 ); }); waitUntil (^(DoneCallback done) { dispatch_async(dispatch_get_main_queue(), ^{ expect(step ).to .equal(3 ); done(); }); }); }); it(@"Channel buff is full, send non blocking will abandon the value." , ^{ __block NSInteger step = 0 ; COChan *chan = [COChan chan]; co_launch(^{ step = 1 ; [chan send_nonblock:@111 ]; expect(step ).to .equal(1 ); step = 2 ; }); co_launch(^{ expect(step ).to .equal(2 ); step = 3 ; id value = [chan receive_nonblock]; expect(step ).to .equal(3 ); expect(value).to .equal(nil ); }); waitUntil (^(DoneCallback done) { dispatch_async(dispatch_get_main_queue(), ^{ expect(step ).to .equal(3 ); done(); }); }); });
it(@"receive can block muti coroutine." , ^{ __block NSInteger step = 0 COChan *chan = [COChan chanWithBuffCount:1 ] co_launch(^{ step = 1 id value = [chan receive] expect(step ).to .equal(7 ) expect(value).to .equal(@111 ) step = 2 }) co_launch(^{ expect(step ).to .equal(1 ) step = 3 id value = [chan receive] expect(step ).to .equal(2 ) expect(value).to .equal(@222 ) step = 4 }) co_launch(^{ expect(step ).to .equal(3 ) step = 5 [chan send :@111] expect(step ).to .equal(5 ) step = 6 [chan send :@222] expect(step ).to .equal(6 ) step = 7 }) waitUntil(^(DoneCallback done) { dispatch_async(dispatch_get_main_queue(), ^{ expect(step ).to .equal(4 ) done() }) }) })
it (@"expandableChan will not abandon values.", ^{ __block NSInteger receiveCount = 0 ; __block NSInteger receiveValue = 0 ; __block NSInteger sendCount = 0 ; COChan *chan = [COChan expandableChan]; for (int i = 0 ; i < 2000 ; i++) { co_launch (^{ [chan send:@(i)]; sendCount++; }); } for (int i = 0 ; i < 2000 ; i++) { co_launch (^{ id value = [chan receive]; receiveCount++; receiveValue+=[value integerValue]; }); } waitUntilTimeout (100 , ^(DoneCallback done) { dispatch_after (dispatch_time(DISPATCH_TIME_NOW, (int64_t)(10 * NSEC_PER_SEC)), dispatch_get_main_queue (), ^{ expect (receiveCount).to .equal (2000 ); expect (receiveValue).to .equal (1999000 ); expect (sendCount).to .equal (2000 ); done (); }); }); });
/** 容量为0 的消息通道 */ + (instancetype )chan; /** 指定容量的消息通道 */ + (instancetype )chanWithBuffCount:(int32_t)buffCount; /** 创建一个可扩展的通道,通道的缓存大小是可以扩展的,这样send就不会阻塞当前进程,并且发送的数据将不会丢失 */ + (instancetype )expandableChan;
- (void)send:(Value _Nullable )val; - (void)send:(Value _Nullable )val onCancel:(COChanOnCancelBlock _Nullable)cancelBlock; - (void)send_nonblock:(Value _Nullable )val;
- (Value _Nullable )receive; - (Value _Nullable )receiveWithOnCancel:(COChanOnCancelBlock _Nullable)cancelBlock; - (Value _Nullable)receive_nonblock; - (NSArray<Value> * _Nonnull)receiveAll; - (NSArray<Value> * _Nonnull)receiveWithCount:(NSUInteger)count;
6. COActor用法
Actor 的概念来自于 Erlang ,在 AKKA 中,可以认为一个 Actor 就是一个容器,用以存储状态、行为、Mailbox 以及子 Actor 与 Supervisor 策略。Actor 之间并不直接通信,而是通过 Mail 来互通有无
* Mailbox: 用于存储消息的队列* Isolated State: actor的状态以及内部变量等。* message: 消息,类似于方法调用Actor模型有两个特点: 1. 在单个线程中的每个Actor顺序处理发送给它的消息2. 不同的Actors同时并行运行
co_actor(^(COActorChan * chan ) { val = 1 ; XCTAssert(chan != nil ) ; }); co_actor_onqueue(get_test_queue () , ^(COActorChan *chan) { val1 = 1 ; XCTAssert(chan != nil ) ; });
it(@"next in chan" , ^ { __block int val = 0 ; COActor * actor = co_actor(^ (COActorChan * chan) { int tmpVal = 0 ; COActorMessage * message = nil ; while ((message = [chan next])){ if ([message intType] == 1 ){ tmpVal++ ; } else if ([message intType] == - 1 ){ tmpVal-- ; } else if ([message intType] == 2 ){ message.complete(@(tmpVal)); } } }); co_launch(^ { [actor sendMessage:@(1 )]; [actor sendMessage:@(1 )]; [actor sendMessage:@(1 )]; [actor sendMessage:@(1 )]; [actor sendMessage:@(1 )]; [actor sendMessage:@(1 )]; [actor sendMessage:@(- 1 )]; COActorCompletable * completable = [actor sendMessage:@(2 )]; id result = await (completable); val = [result intValue]; [actor cancel]; }); waitUntil(^ (DoneCallback done) { dispatch_after(dispatch_time(DISPATCH_TIME_NOW , (int64_t)(0.5 * NSEC_PER_SEC )), dispatch_get_main_queue(), ^ { XCTAssert (val == 5 ); done(); }); }); }); it(@"error example" , ^ { COActor * actor = co_actor_onqueue(get_test_queue(), ^ (COActorChan * channel) { for (COActorMessage * message in channel){ message.complete(await (test_promise())); } }); co_launch(^ { id value = await ([actor sendMessage:@"test" ]); NSError * error = co_getError(); XCTAssert (error.code == 100 ); }); waitUntilTimeout(3 , ^ (DoneCallback done) { dispatch_after(dispatch_time(DISPATCH_TIME_NOW , (int64_t)(3 * NSEC_PER_SEC )), dispatch_get_main_queue(), ^ { done(); }); }); });
7. COTuple用法
COPromise <COTuple *>* cotest_loadContentFromFile(NSString * filePath){ return [COPromise promise:^ (COPromiseFullfill _Nonnull resolve, COPromiseReject _Nonnull reject) { if ([[NSFileManager defaultManager] fileExistsAtPath:filePath]) { NSData * data = [[NSData alloc] initWithContentsOfFile:filePath]; resolve(co_tuple(filePath, data, nil )); } else { NSError * error = [NSError errorWithDomain:@"fileNotFound" code:- 1 userInfo:nil ]; resolve(co_tuple(filePath, nil , error)); } }]; } co_launch(^ { NSString * tmpFilePath = nil ; NSData * data = nil ; NSError * error = nil ; co_unpack(& tmpFilePath, & data, & error) = await (cotest_loadContentFromFile(filePath)); XCTAssert ([tmpFilePath isEqualToString:filePath], @"file path is wrong" ); XCTAssert (data.length > 0 , @"data is wrong" ); XCTAssert (error == nil , @"error is wrong" ); });
8. 协程内延迟
co_delay 可以暂停所在的协程但是协程所在的线程不会停止运行,线程内部的其他协程也不会暂停,co_delay只能在协程内部运行,如果在外部执行的话将会抛出异常。
co_launch(^{ NSTimeInterval begin = ; co_delay(3); realDuration = - begin; });
9. cokit简介
cokit 为常用的一些原生方法提供了一些常用的分类供我们使用,主要涉及到文件网络的IO
NSDictionary+Coroutine.h :主要用于将NSDictionary从文件中读取和写入文件。
NSArray+Coroutine.h :主要用于读取NSPropertyList中的数组。
NSString+Coroutine.h : 用于字符串从文件中读取和写入文件。
NSData+Coroutine.h :主要用于将NSData数据从文件中读取和写入文件。
NSFileManager+Coroutine.h w: 用于文件的操作
NSJSONSerialization+Coroutine.h w: 用于JSON的序列化操作
NSKeyedArchiver+Coroutine.h :用于归档类操作
NSURLConnection+Coroutine.h,NSURLSession+Coroutine.h :用于网络请求操作
NSUserDefaults+Coroutine.h :用于NSUserDefaults存储操作
UIImage+Coroutine.h :用于获取图片操作