Source information: Mars is WeChat's official client-side infrastructure component, a business-agnostic, platform-agnostic foundation library written in C++. It is now open source; the official source code and documentation addresses are as follows:
Mars Overview
The project consists of the following main modules:
COMM: base library, including socket, thread, message queue, coroutine and other basic utilities.
XLOG: general-purpose logging module that takes the characteristics of mobile devices fully into account, providing high-performance, highly available, secure and fault-tolerant logging.
SDT: network diagnosis module.
STN: WeChat's signaling transport network module, a socket-level networking solution (it does not implement the full HTTP protocol) responsible for the small-payload signaling channel between client and server. It is the most heavily used network channel in WeChat's daily operation. STN contains many practical designs: custom DNS, disaster recovery, load considerations, app foreground/background handling, sleep handling, power-saving mechanisms, and so on. On the transport side, STN currently provides both long-connection and short-connection channels to satisfy different needs. With STN, application developers only need to focus on business development.
CDN: data distribution network responsible for large data transfers; this part involves concrete business logic and has not been open-sourced.
Below is a comparison of several open-source networking libraries:
Mars Sample Code Analysis
1. Getting the Sample project running
WeChat provides an official integration guide: "Mars iOS/OS X Integration Guide". One approach is to import Mars as a framework, which is how real projects usually integrate it; the other is debug mode, which is the way to go when first studying the Mars Sample code. Note that after a successful build you need to copy mars.framework into the project folder before adding it to the project; otherwise some header files will not be found.
Also note that if you integrate in debug mode, you need to choose option 3 when running the build script, otherwise mars.xcodeproj will not be found:
Enter menu:
1. Clean && build mars.
2. Clean && build xlog.
3. Gen iOS mars Project.
4. Exit
Below we begin our analysis of the mars source code in debug mode.
2. Sample code analysis
The core of the Mars Sample business layer consists of three parts: NetworkStatus, NetworkService and NetworkEvent. Before walking through the Sample's business-layer code, let's go over what each part does.
NetworkStatus
This class monitors network status. The idea is the same as AFNetworkReachabilityManager in AFNetworking, although the code is a bit rough, so it is not shown here. See the earlier AFNetworking source-code analysis post for details.
Once the Start method has been called, its ChangeReach method is invoked whenever the network status changes.
Here, in the AppDelegate class, NetworkService is passed as the argument to Start, which means NetworkService's ChangeReach method will be called whenever the network status changes.
NetworkService
NetworkService is one of the most important classes in the business layer; it has the closest relationship with the underlying mars library. Its main responsibilities are:
1. Acting as the upper-layer entry point into Mars;
2. Listening to NetworkStatus and passing network-state changes down to mars::baseevent in the Mars core;
3. Forwarding callbacks from the Mars core up to NetworkEvent.
NetworkEvent
NetworkEvent manages tasks, controllers and pushrecvers:
@interface NetworkEvent : NSObject <NetworkDelegate> {
    NSMutableDictionary *tasks;
    NSMutableDictionary *controllers;
    NSMutableDictionary *pushrecvers;
}
When data needs to be dispatched, NetworkEvent does the dispatching. Events usually originate from callbacks in the Mars core, travel through NetworkService to NetworkEvent, and NetworkEvent then distributes them to the corresponding objects. The relationship between the three is shown in the figure below:
NetworkService is responsible for configuring Mars. Because the callbacks are registered in NetworkService's setCallBack, any callback from Mars reaches NetworkService first; NetworkService passes the event on to NetworkEvent, and NetworkEvent relays it to the corresponding controllers and pushrecvers.
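The dispatch pattern NetworkEvent uses — observers registered under a task id, events from the core routed by that id — can be sketched in C++ as follows. The class and method names here are illustrative, not from the Mars source:

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <map>
#include <string>

// Illustrative sketch of NetworkEvent-style dispatch: observers register
// under a task id, and events coming up from the core are routed by id.
class EventDispatcher {
public:
    using Handler = std::function<void(const std::string&)>;

    void AddObserver(uint32_t taskid, Handler h) { observers_[taskid] = std::move(h); }
    void RemoveObserver(uint32_t taskid) { observers_.erase(taskid); }

    // Returns true if an observer was found and invoked.
    bool Dispatch(uint32_t taskid, const std::string& payload) {
        auto it = observers_.find(taskid);
        if (it == observers_.end()) return false;
        it->second(payload);
        return true;
    }

private:
    std::map<uint32_t, Handler> observers_;
};
```

In the Sample, the keys are stringified task ids and the handlers are the `UINotifyDelegate` controllers; the mechanism is the same.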
Next we will analyze the business-layer code in the following parts:
2.1 Mars network-layer initialization
- (BOOL)application:(UIApplication *)application didFinishLaunchingWithOptions:(NSDictionary *)launchOptions {
    [NetworkService sharedInstance].delegate = [[NetworkEvent alloc] init];
    [[NetworkService sharedInstance] setCallBack];
    [[NetworkService sharedInstance] createMars];
    [[NetworkService sharedInstance] setClientVersion:200];
    [[NetworkService sharedInstance] setLongLinkAddress:@"localhost" port:8081];
    [[NetworkService sharedInstance] setShortLinkPort:8080];
    [[NetworkService sharedInstance] reportEvent_OnForeground:YES];
    [[NetworkService sharedInstance] makesureLongLinkConnect];
    [[NetworkStatus sharedInstance] Start:[NetworkService sharedInstance]];
    return YES;
}

- (void)applicationDidEnterBackground:(UIApplication *)application {
    [[NetworkService sharedInstance] reportEvent_OnForeground:NO];
}

- (void)applicationWillEnterForeground:(UIApplication *)application {
    [[NetworkService sharedInstance] reportEvent_OnForeground:YES];
}

- (void)applicationWillTerminate:(UIApplication *)application {
    [[NetworkService sharedInstance] destroyMars];
    appender_close();
}
We'll leave this part as a placeholder for now and expand on it when we get to the Mars core code.
2.2 Fetching data through Mars
- (void)loadView {
    [super loadView];
    converSations = [[NSArray alloc] init];
    CGITask *convlstCGI = [[CGITask alloc] initAll:ChannelType_ShortConn
                                          AndCmdId:kConvLst
                                         AndCGIUri:@"/mars/getconvlist"
                                           AndHost:@"localhost"];
    [[NetworkService sharedInstance] startTask:convlstCGI ForUI:self];
}

- (NSData *)requestSendData {
    ConversationListRequest *convlstRequest = [ConversationListRequest new];
    convlstRequest.type = 0;
    convlstRequest.accessToken = @"123456";
    NSData *data = [convlstRequest data];
    return data;
}

- (int)onPostDecode:(NSData *)responseData {
    convlstResponse = [ConversationListResponse parseFromData:responseData error:nil];
    self->converSations = convlstResponse.listArray;
    LOG_INFO(kModuleViewController, @"recv conversation list, size: %lu", (unsigned long)[self->converSations count]);
    return [self->converSations count] > 0 ? 0 : -1;
}

- (int)onTaskEnd:(uint32_t)tid errType:(uint32_t)errtype errCode:(uint32_t)errcode {
    dispatch_async(dispatch_get_main_queue(), ^{
        [self.tableView reloadData];
    });
    return 0;
}
2.3 Listening for data pushed down through Mars
- (void)viewDidLoad {
    [super viewDidLoad];
    self.title = _conversation.notice;
    _messages = [NSMutableArray new];
    [[NetworkService sharedInstance] addPushObserver:self withCmdId:kPushMessageCmdId];
}

- (NSData *)requestSendData {
    SendMessageRequest *sendMsgRequest = [SendMessageRequest new];
    sendMsgRequest.from = [self username];
    sendMsgRequest.to = @"all";
    sendMsgRequest.text = _textField.text;
    sendMsgRequest.accessToken = @"123456";
    sendMsgRequest.topic = _conversation.topic;
    LOG_INFO(kModuleViewController, @"send msg to topic:%@", _conversation.notice);
    NSData *data = [sendMsgRequest data];
    dispatch_async(dispatch_get_main_queue(), ^{
        _textField.text = @"";
    });
    return data;
}

- (int)onPostDecode:(NSData *)responseData {
    SendMessageResponse *sendMsgResponse = [SendMessageResponse parseFromData:responseData error:nil];
    dispatch_async(dispatch_get_main_queue(), ^{
        NSString *recvtext = [NSString stringWithFormat:@"%@ : %@", sendMsgResponse.from, sendMsgResponse.text];
        [self.messages addObject:recvtext];
        [self.tableView reloadData];
        NSIndexPath *indexPath = [NSIndexPath indexPathForRow:self.messages.count - 1 inSection:0];
        [self.tableView scrollToRowAtIndexPath:indexPath atScrollPosition:UITableViewScrollPositionBottom animated:YES];
    });
    return sendMsgResponse.errCode == 0 ? 0 : -1;
}

- (void)sendMessage {
    CGITask *sendMsgCGI = [[CGITask alloc] initAll:ChannelType_LongConn
                                          AndCmdId:kSendMsg
                                         AndCGIUri:@"/mars/sendmessage"
                                           AndHost:@"localhost"];
    [[NetworkService sharedInstance] startTask:sendMsgCGI ForUI:self];
}

- (void)notifyPushMessage:(NSData *)pushData withCmdId:(int)cmdId {
    MessagePush *messagePush = [MessagePush parseFromData:pushData error:nil];
    if (messagePush != nil) {
        dispatch_async(dispatch_get_main_queue(), ^{
            NSString *recvtext = [NSString stringWithFormat:@"%@ : %@", messagePush.from, messagePush.content];
            [self.messages addObject:recvtext];
            [self.tableView reloadData];
            NSIndexPath *indexPath = [NSIndexPath indexPathForRow:self.messages.count - 1 inSection:0];
            [self.tableView scrollToRowAtIndexPath:indexPath atScrollPosition:UITableViewScrollPositionBottom animated:YES];
        });
    }
}

- (int)onTaskEnd:(uint32_t)tid errType:(uint32_t)errtype errCode:(uint32_t)errcode {
    return 0;
}
In the Sample, each request is modeled as a CGITask, which contains the task id, the choice between the long-connection and short-connection channel, the command id identifying the request, the URI, and the host address.
@interface CGITask : NSObject
@property (nonatomic) uint32_t taskid;
@property (nonatomic) ChannelType channel_select;
@property (nonatomic) uint32_t cmdid;
@property (nonatomic, copy) NSString *cgi;
@property (nonatomic, copy) NSString *host;
@end
Then startTask is called to start the task:
[[NetworkService sharedInstance] startTask:convlstCGI ForUI:self];
- (int)startTask:(CGITask *)task ForUI:(id<UINotifyDelegate>)delegateUI {
    Task ctask;
    ctask.cmdid = task.cmdid;
    ctask.channel_select = task.channel_select;
    ctask.cgi = std::string(task.cgi.UTF8String);
    ctask.shortlink_host_list.push_back(std::string(task.host.UTF8String));
    ctask.user_context = (__bridge void *)task;
    mars::stn::StartTask(ctask);
    NSString *taskIdKey = [NSString stringWithFormat:@"%d", ctask.taskid];
    [_delegate addObserver:delegateUI forKey:taskIdKey];
    [_delegate addCGITasks:task forKey:taskIdKey];
    return ctask.taskid;
}
In startTask, the CGITask is converted to a Task and mars::stn::StartTask is called; then the current object and the task are registered with NetworkEvent, so that they will be notified whenever an event occurs. Let's look at the controller-related methods in NetworkEvent; these are all triggered by task requests.
- (NSData *)Request2BufferWithTaskID:(uint32_t)tid task:(CGITask *)task {
    NSData *data = NULL;
    NSString *taskIdKey = [NSString stringWithFormat:@"%d", tid];
    id<UINotifyDelegate> uiObserver = [controllers objectForKey:taskIdKey];
    if (uiObserver != nil) {
        data = [uiObserver requestSendData];
    }
    return data;
}

- (NSInteger)Buffer2ResponseWithTaskID:(uint32_t)tid responseData:(NSData *)data task:(CGITask *)task {
    int returnType = 0;
    NSString *taskIdKey = [NSString stringWithFormat:@"%d", tid];
    id<UINotifyDelegate> uiObserver = [controllers objectForKey:taskIdKey];
    if (uiObserver != nil) {
        returnType = [uiObserver onPostDecode:data];
    } else {
        returnType = -1;
    }
    return returnType;
}

- (NSInteger)OnTaskEndWithTaskID:(uint32_t)tid task:(CGITask *)task errType:(uint32_t)errtype errCode:(uint32_t)errcode {
    NSString *taskIdKey = [NSString stringWithFormat:@"%d", tid];
    [tasks removeObjectForKey:taskIdKey];
    id<UINotifyDelegate> uiObserver = [controllers objectForKey:taskIdKey];
    [uiObserver onTaskEnd:tid errType:errtype errCode:errcode];
    [controllers removeObjectForKey:taskIdKey];
    return 0;
}
In other words, Request2BufferWithTaskID, Buffer2ResponseWithTaskID and OnTaskEndWithTaskID respectively trigger the task initiator's requestSendData, onPostDecode and onTaskEnd methods.
These three, in turn, are driven from stn_callback.mm: StnCallBack::Req2Buf, StnCallBack::Buf2Resp and StnCallBack::OnTaskEnd call NetworkService's Request2BufferWithTaskID, Buffer2ResponseWithTaskID and OnTaskEndWithTaskID methods.
bool StnCallBack::Req2Buf(uint32_t _taskid, void* const _user_context, AutoBuffer& _outbuffer, AutoBuffer& _extend, int& _error_code, const int _channel_select) {
    NSData* requestData = [[NetworkService sharedInstance] Request2BufferWithTaskID:_taskid userContext:_user_context];
    if (requestData == nil) {
        requestData = [[NSData alloc] init];
    }
    _outbuffer.AllocWrite(requestData.length);
    _outbuffer.Write(requestData.bytes, requestData.length);
    return requestData.length > 0;
}

int StnCallBack::Buf2Resp(uint32_t _taskid, void* const _user_context, const AutoBuffer& _inbuffer, const AutoBuffer& _extend, int& _error_code, const int _channel_select) {
    int handle_type = mars::stn::kTaskFailHandleNormal;
    NSData* responseData = [NSData dataWithBytes:(const void *)_inbuffer.Ptr() length:_inbuffer.Length()];
    NSInteger errorCode = [[NetworkService sharedInstance] Buffer2ResponseWithTaskID:_taskid ResponseData:responseData userContext:_user_context];
    if (errorCode != 0) {
        handle_type = mars::stn::kTaskFailHandleDefault;
    }
    return handle_type;
}

int StnCallBack::OnTaskEnd(uint32_t _taskid, void* const _user_context, int _error_type, int _error_code) {
    return (int)[[NetworkService sharedInstance] OnTaskEndWithTaskID:_taskid userContext:_user_context errType:_error_type errCode:_error_code];
}
We won't go into when stn_callback is invoked just yet; that will be revealed when we cover the core layer.
Now let's look at the overall process of fetching data from an endpoint through the upper layer:
1. Build a CGITask, specifying which channel to use (long connection or short connection), the address, the host, and the command id;
2. Call mars::stn::StartTask;
3. Add the current class to NetworkEvent's controllers list;
4. After mars::stn::StartTask is called, the corresponding Mars events propagate up from the core to NetworkService and are dispatched by NetworkEvent to the matching controller, where we can handle them in requestSendData, onPostDecode and onTaskEnd.
Having seen the request flow, receiving pushes works quite similarly:
stn_callback.mm
void StnCallBack::OnPush(uint64_t _channel_id, uint32_t _cmdid, uint32_t _taskid, const AutoBuffer& _body, const AutoBuffer& _extend) {
    if (_body.Length() > 0) {
        NSData* recvData = [NSData dataWithBytes:(const void *)_body.Ptr() length:_body.Length()];
        [[NetworkService sharedInstance] OnPushWithCmd:_cmdid data:recvData];
    }
}
NetworkService.m
- (void)OnPushWithCmd:(NSInteger)cid data:(NSData *)data {
    return [_delegate OnPushWithCmd:cid data:data];
}
NetworkEvent.m
- (void)OnPushWithCmd:(NSInteger)cid data:(NSData *)data {
    id<PushNotifyDelegate> pushObserver = [pushrecvers objectForKey:[NSString stringWithFormat:@"%d", cid]];
    if (pushObserver != nil) {
        [pushObserver notifyPushMessage:data withCmdId:cid];
    }
}
Registering an observer for pushed data:
[[NetworkService sharedInstance] addPushObserver:self withCmdId:kPushMessageCmdId];
3. Mars core source code analysis
3.1 Setting up the callbacks
- (void)setCallBack {
    mars::stn::SetCallback(mars::stn::StnCallBack::Instance());
    mars::app::SetCallback(mars::app::AppCallBack::Instance());
}
Under the PublicComponentV2 folder there are two pairs of files, app_callback and stn_callback:
stn_callback defines the callback interface of the STN module, and app_callback the application-related callback interface. Through these callbacks Mars obtains customized behavior from the upper layer, so that customizable, business-specific logic is pulled out of the core and handled in the application layer. Let's look at these interfaces now; the details of when each is used will come later when we reach the corresponding logic. For the moment, the goal is to know which callbacks exist and how the core layer uses them:
The StnCallBack interface is defined as follows:
class StnCallBack : public Callback {
public:
    virtual void TrafficData(ssize_t _send, ssize_t _recv);
    virtual std::vector<std::string> OnNewDns(const std::string& _host);
    virtual void OnPush(uint64_t _channel_id, uint32_t _cmdid, uint32_t _taskid, const AutoBuffer& _body, const AutoBuffer& _extend);
    virtual bool Req2Buf(uint32_t _taskid, void* const _user_context, AutoBuffer& _outbuffer, AutoBuffer& _extend, int& _error_code, const int _channel_select);
    virtual int Buf2Resp(uint32_t _taskid, void* const _user_context, const AutoBuffer& _inbuffer, const AutoBuffer& _extend, int& _error_code, const int _channel_select);
    virtual int OnTaskEnd(uint32_t _taskid, void* const _user_context, int _error_type, int _error_code);
    virtual void ReportConnectStatus(int _status, int longlink_status);
    virtual int GetLonglinkIdentifyCheckBuffer(AutoBuffer& _identify_buffer, AutoBuffer& _buffer_hash, int32_t& _cmdid);
    virtual bool OnLonglinkIdentifyResponse(const AutoBuffer& _response_buffer, const AutoBuffer& _identify_buffer_hash);

private:
    static StnCallBack* instance_;
};
The AppCallBack interface is defined as follows:
class AppCallBack : public Callback {
public:
    virtual std::string GetAppFilePath();
    virtual AccountInfo GetAccountInfo();
    virtual unsigned int GetClientVersion();
    virtual DeviceInfo GetDeviceInfo();

private:
    static AppCallBack* instance_;
};
After SetCallback is called, the callback is stored in sg_callback:
void SetCallback(Callback* const callback) {
    sg_callback = callback;
}
StnCallBack is registered into the core stn module in stn_logic.cc, and AppCallBack is injected into the core app module in app_logic.cc; what these modules do will be covered in detail later. Once the callbacks are set, business-specific logic can be delegated to the business layer through them. For example, traffic statistics in the stn module call up to the upper layer like this:
void (*TrafficData)(ssize_t _send, ssize_t _recv) = [](ssize_t _send, ssize_t _recv) {
    xassert2(sg_callback != NULL);
    return sg_callback->TrafficData(_send, _recv);
};
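The injection pattern behind SetCallback is simple: the core holds a pointer to an abstract Callback, the application installs a concrete subclass, and every core-side hook trampolines through that pointer. A minimal sketch, with names simplified from the Mars originals:

```cpp
#include <cassert>
#include <cstddef>

// Minimal sketch of the Mars callback-injection pattern: the core defines an
// abstract Callback, the application installs a concrete implementation, and
// core code trampolines through the stored pointer.
class Callback {
public:
    virtual ~Callback() {}
    virtual void TrafficData(long sent, long recv) = 0;
};

static Callback* sg_callback = NULL;

void SetCallback(Callback* const callback) { sg_callback = callback; }

// Core-side hook, analogous to the TrafficData trampoline above.
void ReportTraffic(long sent, long recv) {
    assert(sg_callback != NULL);  // mirrors xassert2(sg_callback != NULL)
    sg_callback->TrafficData(sent, recv);
}
```

The xassert2 in the real code plays the same role as the assert here: the core refuses to run without an installed callback.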
3.2 Creating Mars
Mars is created via the createMars method:
which calls mars::baseevent's OnCreate:
- (void)createMars {
    mars::baseevent::OnCreate();
}
Before going further into the creation process, let's look at how mars::baseevent works:
Besides OnCreate, mars::baseevent has the following methods; their purpose is clear from their names, so we won't describe them individually here.
void OnCreate()
void OnDestroy()
void OnSingalCrash(int _sig)
void OnExceptionCrash()
void OnForeground(bool _isforeground)
void OnNetworkChange()
mars::baseevent::OnCreate() is implemented as follows:
void OnCreate() {
    GetSignalOnCreate()();
}
The implementation in baseprjevent.cc uses the signals2 signal/slot mechanism from the boost C++ library. Anyone who has done Qt programming will be familiar with the signal/slot concept: it binds a signal to one or more methods, and whenever that signal is emitted, every bound method is invoked.
boost::signals2::signal<void ()>& GetSignalOnCreate() {
    static boost::signals2::signal<void ()> SignalOnCreate;
    return SignalOnCreate;
}
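At its heart, boost::signals2 is a list of callables invoked in order when the signal is fired. A stripped-down re-implementation of the idea — without boost's thread safety, connection management or ordered groups — looks like this:

```cpp
#include <cassert>
#include <functional>
#include <vector>

// Bare-bones signal/slot: connect() stores callables, operator() fires them
// all in connection order. boost::signals2 adds thread safety, disconnection
// and ordered groups on top of this same idea.
template <typename... Args>
class Signal {
public:
    void connect(std::function<void(Args...)> slot) { slots_.push_back(std::move(slot)); }
    void operator()(Args... args) {
        for (auto& s : slots_) s(args...);
    }
private:
    std::vector<std::function<void(Args...)>> slots_;
};

// Meyers-singleton accessor, mirroring the shape of GetSignalOnCreate().
Signal<>& GetSignalOnCreateSketch() {
    static Signal<> sig;
    return sig;
}
```

The function-local static gives a lazily constructed, program-wide signal instance, which is exactly why GetSignalOnCreate is written as an accessor rather than a global variable.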
So where is this binding established?
Looking at the definition of the BOOT_RUN_STARTUP macro, these functions are executed at startup:
#define BOOT_RUN_STARTUP(func) VARIABLE_IS_NOT_USED static int __anonymous_run_variable_startup_##func = __boot_run_atstartup(func)
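The trick behind BOOT_RUN_STARTUP is a file-scope static variable whose initializer runs before main and registers the function. A hedched sketch of the same mechanism (the registry and runner are simplified stand-ins for what __boot_run_atstartup does in Mars):

```cpp
#include <cassert>
#include <vector>

// Registry of startup functions, in the spirit of __boot_run_atstartup.
typedef void (*BootFunc)();

static std::vector<BootFunc>& BootRegistry() {
    static std::vector<BootFunc> reg;
    return reg;
}

static int RegisterBoot(BootFunc f) {
    BootRegistry().push_back(f);
    return 0;
}

// Each use of the macro creates an unused static int whose initialization
// (which happens before main) registers the function.
#define BOOT_RUN_STARTUP_SKETCH(func) \
    static int __anonymous_run_variable_startup_##func = RegisterBoot(func)

static int g_boot_calls = 0;
static void __initbind_example() { ++g_boot_calls; }
BOOT_RUN_STARTUP_SKETCH(__initbind_example);

// Runs everything that registered itself; Mars triggers the real bindings
// through the signals set up this way.
void RunBootFuncs() {
    for (BootFunc f : BootRegistry()) f();
}
```

Because the registration happens during static initialization, each `__initbind_baseprjevent` can connect its slots without anyone having to call it explicitly.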
The files that use BOOT_RUN_STARTUP are listed below:
message_queue.cc
BOOT_RUN_STARTUP(__RgisterANRCheckCallback);
app_logic.cc
BOOT_RUN_STARTUP(__InitbindBaseprjevent);
active_logic.cc
BOOT_RUN_STARTUP(__initbind_baseprjevent);
sdt_logic.cc
BOOT_RUN_STARTUP(__initbind_baseprjevent);
stn_logic.cc
BOOT_RUN_STARTUP(__initbind_baseprjevent);
Take sdt_logic.cc for example:
static void __initbind_baseprjevent() {
    GetSignalOnCreate().connect(&onCreate);
    GetSignalOnDestroy().connect(5, &onDestroy);
}
and stn_logic.cc:
static void __initbind_baseprjevent() {
    GetSignalOnCreate().connect(&onCreate);
    GetSignalOnDestroy().connect(&onDestroy);
    GetSignalOnSingalCrash().connect(&onSingalCrash);
    GetSignalOnExceptionCrash().connect(&onExceptionCrash);
    GetSignalOnNetworkChange().connect(5, &onNetworkChange);
    GetSignalOnNetworkDataChange().connect(&OnNetworkDataChange);
}
Both bind the GetSignalOnCreate signal to the corresponding handlers through the signal/slot mechanism. Therefore the onCreate methods in sdt_logic.cc and stn_logic.cc are everything the core does when we call createMars:
sdt_logic.cc
static void onCreate() {
    SdtCore::Singleton::Instance();
}
Initialization of the SdtCore module:
SdtCore::SdtCore()
    : thread_(boost::bind(&SdtCore::__RunOn, this))
    , check_list_(std::list<BaseChecker*>())
    , cancel_(false)
    , checking_(false) {
    xinfo_function();
}
stn_logic.cc
static void onCreate() {
    ActiveLogic::Singleton::Instance();
    NetCore::Singleton::Instance();
}
ActiveLogic::ActiveLogic()
    : isforeground_(false)
    , isactive_(true)
    , alarm_(boost::bind(&ActiveLogic::__OnInActive, this), false)
    , lastforegroundchangetime_(::gettickcount()) {
}
NetCore::NetCore()
    : messagequeue_creater_(true, XLOGGER_TAG) {
    // ...
}
These two files initialize the SdtCore, NetCore and ActiveLogic modules respectively.
3.3 Setting the long-connection and short-connection addresses
Setting the long-connection address:
[[NetworkService sharedInstance] setLongLinkAddress:@"localhost" port:8081 ];
void (*SetLonglinkSvrAddr)(const std::string& host, const std::vector<uint16_t> ports, const std::string& debugip) = [](const std::string& host, const std::vector<uint16_t> ports, const std::string& debugip) {
    std::vector<std::string> hosts;
    if (!host.empty()) {
        hosts.push_back(host);
    }
    NetSource::SetLongLink(hosts, ports, debugip);
};
Setting the short-connection address:
[[NetworkService sharedInstance] setShortLinkPort:8080 ];
void (*SetShortlinkSvrAddr)(const uint16_t port, const std::string& debugip) = [](const uint16_t port, const std::string& debugip) {
    NetSource::SetShortlink(port, debugip);
};
void NetSource::SetLongLink(const std::vector<std::string>& _hosts, const std::vector<uint16_t>& _ports, const std::string& _debugip) {
    ScopedLock lock(sg_ip_mutex);
    if (!_hosts.empty()) {
        sg_longlink_hosts = _hosts;
    }
    sg_longlink_ports = _ports;
}

void NetSource::SetShortlink(const uint16_t _port, const std::string& _debugip) {
    ScopedLock lock(sg_ip_mutex);
    sg_shortlink_port = _port;
    sg_shortlink_debugip = _debugip;
}
In the end, the long- and short-connection hosts and ports are stored in NetSource's sg_longlink_hosts, sg_longlink_ports and sg_shortlink_port variables.
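Since these setters can race with the network thread reading the address list, every access goes through sg_ip_mutex. The same pattern with standard C++ primitives (ScopedLock is Mars' own RAII lock; std::lock_guard plays that role here, and the names are simplified from the file-scope sg_* statics):

```cpp
#include <cassert>
#include <cstdint>
#include <mutex>
#include <string>
#include <vector>

// Address store guarded by one mutex, mirroring NetSource's sg_* globals.
namespace netsource_sketch {

static std::mutex g_ip_mutex;
static std::vector<std::string> g_longlink_hosts;
static std::vector<uint16_t> g_longlink_ports;
static uint16_t g_shortlink_port = 0;

void SetLongLink(const std::vector<std::string>& hosts, const std::vector<uint16_t>& ports) {
    std::lock_guard<std::mutex> lock(g_ip_mutex);
    if (!hosts.empty()) g_longlink_hosts = hosts;  // keep the old hosts if none given
    g_longlink_ports = ports;
}

void SetShortlink(uint16_t port) {
    std::lock_guard<std::mutex> lock(g_ip_mutex);
    g_shortlink_port = port;
}

uint16_t ShortlinkPort() {
    std::lock_guard<std::mutex> lock(g_ip_mutex);
    return g_shortlink_port;
}

}  // namespace netsource_sketch
```

Note how SetLongLink keeps the previous host list when called with an empty one, matching the `if (!_hosts.empty())` guard in the real NetSource::SetLongLink.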
3.4 Reporting foreground/background state
[[NetworkService sharedInstance] reportEvent_OnForeground:YES/NO];
This, too, is driven by signals; the final binding is in active_logic.cc:
static void __initbind_baseprjevent() {
    GetSignalOnForeground().connect(&onForeground);
}

static void onForeground(bool _isforeground) {
    ActiveLogic::Singleton::Instance()->OnForeground(_isforeground);
}
void ActiveLogic::OnForeground(bool _isforeground) {
    if (_isforeground == isforeground_) return;

    bool oldisactive = isactive_;
    isactive_ = true;
    isforeground_ = _isforeground;
    lastforegroundchangetime_ = ::gettickcount();
    alarm_.Cancel();

    bool isnotify = oldisactive != isactive_;
    SignalForeground(isforeground_);
    if (isnotify) {
        SignalActive(isactive_);
    }
}
The most important part of ActiveLogic::OnForeground is the emission of SignalForeground and SignalActive. These are also delivered via signals, so we skip the intermediate plumbing and look at the final core-layer handlers:
LongLinkConnectMonitor::LongLinkConnectMonitor(ActiveLogic& _activelogic, LongLink& _longlink, MessageQueue::MessageQueue_t _id)
    : asyncreg_(MessageQueue::InstallAsyncHandler(_id))
    , activelogic_(_activelogic)
    , longlink_(_longlink)
    , alarm_(boost::bind(&LongLinkConnectMonitor::__OnAlarm, this), _id)
    , status_(LongLink::kDisConnected)
    , last_connect_time_(0)
    , last_connect_net_type_(kNoNet)
    , thread_(boost::bind(&LongLinkConnectMonitor::__Run, this), XLOGGER_TAG"::con_mon")
    , conti_suc_count_(0)
    , isstart_(false) {
    activelogic_.SignalActive.connect(boost::bind(&LongLinkConnectMonitor::__OnSignalActive, this, _1));
    activelogic_.SignalForeground.connect(boost::bind(&LongLinkConnectMonitor::__OnSignalForeground, this, _1));
    longlink_.SignalConnection.connect(boost::bind(&LongLinkConnectMonitor::__OnLongLinkStatuChanged, this, _1));
}
void LongLinkConnectMonitor::__OnSignalForeground(bool _isForeground) {
    if (_isForeground) {
        if ((longlink_.ConnectStatus() == LongLink::kConnected) &&
            (tickcount_t().gettickcount() - longlink_.GetLastRecvTime() > tickcountdiff_t(4.5 * 60 * 1000))) {
            __ReConnect();
        }
    }
    // ...
}
On SignalForeground, a reconnect is triggered only when the app is in the foreground and more than 4.5 minutes have passed since data was last received on the long connection.
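This decision can be captured in one pure function. The constant 4.5 * 60 * 1000 milliseconds comes straight from __OnSignalForeground above; the rest of the names are illustrative:

```cpp
#include <cassert>
#include <cstdint>

// Pure-function version of the foreground reconnect rule in
// LongLinkConnectMonitor::__OnSignalForeground: reconnect only when we are
// in the foreground, the link is nominally connected, and nothing has been
// received for more than 4.5 minutes.
const int64_t kForegroundReconnectGapMs = static_cast<int64_t>(4.5 * 60 * 1000);  // 270000

bool ShouldReconnectOnForeground(bool is_foreground,
                                 bool is_connected,
                                 int64_t now_ms,
                                 int64_t last_recv_ms) {
    if (!is_foreground || !is_connected) return false;
    return (now_ms - last_recv_ms) > kForegroundReconnectGapMs;
}
```

The rationale: a connection that has been silent across a long background stretch may be half-open (dropped by a NAT or the carrier), so it is cheaper to reconnect proactively than to discover the dead link on the next send.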
void LongLinkConnectMonitor::__OnSignalActive(bool _isactive) {
    __AutoIntervalConnect();
}
uint64_t LongLinkConnectMonitor::__AutoIntervalConnect() {
    alarm_.Cancel();
    uint64_t remain = __IntervalConnect(kLongLinkConnect);
    alarm_.Start((int)remain);
    return remain;
}
SignalActive triggers an automatic connect after the interval computed by __IntervalConnect, which looks up the table below based on the connection type and the current application state to determine the interval.
// columns: kForgroundOneMinute | kForgroundTenMinute | kForgroundActive | kBackgroundActive | kInactive
static unsigned long const sg_interval[][5] = {
    /* kTaskConnect          */ { 5, 10,  20,  30, 300},
    /* kLongLinkConnect      */ {15, 30, 240, 300, 600},
    /* kNetworkChangeConnect */ { 0,  0,   0,   0,   0},
};
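A lookup over this table could be sketched as follows; the enum values and the helper function are assumptions for illustration, while the numbers are the ones from sg_interval above:

```cpp
#include <cassert>

// Reconnect-interval lookup over the sg_interval table quoted above.
// Enum names mirror the row/column labels; the helper itself is illustrative.
enum ConnectType { kTaskConnect = 0, kLongLinkConnect = 1, kNetworkChangeConnect = 2 };
enum AppState {
    kForgroundOneMinute = 0,
    kForgroundTenMinute = 1,
    kForgroundActive = 2,
    kBackgroundActive = 3,
    kInactive = 4,
};

static unsigned long const sg_interval[][5] = {
    { 5, 10,  20,  30, 300},   // kTaskConnect
    {15, 30, 240, 300, 600},   // kLongLinkConnect
    { 0,  0,   0,   0,   0},   // kNetworkChangeConnect
};

unsigned long IntervalFor(ConnectType type, AppState state) {
    return sg_interval[type][state];
}
```

Reading the kLongLinkConnect row: the more recently the app was active in the foreground, the more aggressively Mars reconnects (15 s within the first minute, up to 10 minutes when the app is inactive), trading responsiveness against battery.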
The reconnect mechanism will be covered further when we discuss the long connection.
3.5 Checking the long-connection status and keeping it connected
Let's continue with the last step of Mars initialization in the AppDelegate, makesureLongLinkConnect:
Here is the chain of calls, layer by layer:
- (void)makesureLongLinkConnect {
    mars::stn::MakesureLonglinkConnected();
}
void (*MakesureLonglinkConnected)() = []() {
    STN_WEAK_CALL(MakeSureLongLinkConnect());
};
void NetCore::MakeSureLongLinkConnect() {
#ifdef USE_LONG_LINK
    ASYNC_BLOCK_START
    longlink_task_manager_->LongLinkChannel().MakeSureConnected();
    ASYNC_BLOCK_END
#endif
}
bool LongLink::MakeSureConnected(bool* _newone) {
    if (_newone) *_newone = false;

    ScopedLock lock(mutex_);
    if (kConnected == ConnectStatus()) return true;

    bool newone = false;
    thread_.start(&newone);
    if (newone) {
        // ...
    }
    if (_newone) *_newone = newone;
    return false;
}
The key step in LongLink::MakeSureConnected is the call to thread_.start(&newone).
LongLink::LongLink(const mq::MessageQueue_t& _messagequeueid, NetSource& _netsource)
    : asyncreg_(MessageQueue::InstallAsyncHandler(_messagequeueid))
    , netsource_(_netsource)
    , thread_(boost::bind(&LongLink::__Run, this), XLOGGER_TAG "::lonklink")
    , connectstatus_(kConnectIdle)
    , disconnectinternalcode_(kNone)
    , smartheartbeat_(NULL)
    , wakelock_(NULL) {
}
When LongLink is constructed, thread_ has already been bound to __Run via boost::bind, so calling start runs the __Run method:
void LongLink::__Run() {
    SOCKET sock = __RunConnect(conn_profile);
    ErrCmdType errtype = kEctOK;
    int errcode = 0;
    __RunReadWrite(sock, errtype, errcode, conn_profile);
}
The key parts of __Run are __RunConnect and __RunReadWrite: the former performs the socket connect, and the latter waits in a loop to read and write data. Data read/write will be covered separately later.
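This bind-at-construction idiom — store a callable built from a member function, execute it when the thread starts — can be sketched with standard C++; for brevity the sketch invokes the callable synchronously, whereas Mars hands the same callable to its comm::Thread and runs it on a dedicated thread:

```cpp
#include <cassert>
#include <functional>

// Sketch of the LongLink idiom: the "run" callable is fixed at construction
// (boost::bind(&LongLink::__Run, this) in Mars) and executed when the thread
// starts. Start() here invokes it synchronously; Mars runs it on comm::Thread.
class Worker {
public:
    Worker() : run_(std::bind(&Worker::Run, this)), ran_(false) {}

    void Start() { run_(); }            // Mars: thread_.start() -> __Run()
    bool Ran() const { return ran_; }

private:
    void Run() { ran_ = true; }         // stands in for __RunConnect/__RunReadWrite

    std::function<void()> run_;
    bool ran_;
};
```

Binding in the constructor guarantees that every start of the thread runs the same entry point, which is why MakeSureConnected only needs to call thread_.start().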
3.6 Starting a request
As seen in the code above, a request is started as follows:
- (int)startTask:(CGITask *)task ForUI:(id<UINotifyDelegate>)delegateUI {
    Task ctask;
    ctask.cmdid = task.cmdid;
    ctask.channel_select = task.channel_select;
    ctask.cgi = std::string(task.cgi.UTF8String);
    ctask.shortlink_host_list.push_back(std::string(task.host.UTF8String));
    ctask.user_context = (__bridge void *)task;
    mars::stn::StartTask(ctask);
    NSString *taskIdKey = [NSString stringWithFormat:@"%d", ctask.taskid];
    [_delegate addObserver:delegateUI forKey:taskIdKey];
    [_delegate addCGITasks:task forKey:taskIdKey];
    return ctask.taskid;
}
A Task object is created carrying everything the request needs, and mars::stn::StartTask(ctask) is then called to hand the request to Mars:
stn_logic.cc
bool (*StartTask)(const Task& _task) = [](const Task& _task) {
    STN_RETURN_WEAK_CALL(StartTask(_task));
};
net_core.cc
void NetCore::StartTask(const Task& _task) {
    Task task = _task;
    if (!__ValidAndInitDefault(task, group)) {
        OnTaskEnd(task.taskid, task.user_context, kEctLocal, kEctLocalTaskParam);
        return;
    }
    if (0 == task.channel_select) {
        OnTaskEnd(task.taskid, task.user_context, kEctLocal, kEctLocalChannelSelect);
        return;
    }
    if (task.network_status_sensitive && kNoNet == ::getNetInfo()
#ifdef USE_LONG_LINK
        && LongLink::kConnected != longlink_task_manager_->LongLinkChannel().ConnectStatus()
#endif
    ) {
        OnTaskEnd(task.taskid, task.user_context, kEctLocal, kEctLocalNoNet);
        return;
    }

    bool start_ok = false;
    if (LongLink::kConnected != longlink_task_manager_->LongLinkChannel().ConnectStatus()
        && (Task::kChannelLong & task.channel_select)
        && ActiveLogic::Singleton::Instance()->IsForeground()
        && (15 * 60 * 1000 >= gettickcount() - ActiveLogic::Singleton::Instance()->LastForegroundChangeTime()))
        longlink_task_manager_->getLongLinkConnectMonitor().MakeSureConnected();

    switch (task.channel_select) {
        case Task::kChannelBoth: {
#ifdef USE_LONG_LINK
            bool bUseLongLink = LongLink::kConnected == longlink_task_manager_->LongLinkChannel().ConnectStatus();
            if (bUseLongLink && task.channel_strategy == Task::kChannelFastStrategy) {
                bUseLongLink = bUseLongLink && (longlink_task_manager_->GetTaskCount() <= kFastSendUseLonglinkTaskCntLimit);
            }
            if (bUseLongLink)
                start_ok = longlink_task_manager_->StartTask(task);
            else
#endif
                start_ok = shortlink_task_manager_->StartTask(task);
        } break;
#ifdef USE_LONG_LINK
        case Task::kChannelLong:
            start_ok = longlink_task_manager_->StartTask(task);
            break;
#endif
        case Task::kChannelShort:
            start_ok = shortlink_task_manager_->StartTask(task);
            break;
        default:
            xassert2(false);
            break;
    }
}
The main job of NetCore::StartTask is to choose, based on channel_select, whether to hand the task to the long-connection or short-connection StartTask. When channel_select is kChannelBoth, the long connection is preferred as long as it is available.
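Stripped of the validity checks and reconnect logic, the channel choice reduces to a small decision function. This is a sketch: the enum values match Task's channel constants, but the helper and its parameters (the fast-strategy task limit in particular) are spelled out here for illustration:

```cpp
#include <cassert>

// Simplified version of the channel decision inside NetCore::StartTask.
// Returns true when the task should go over the long link.
enum ChannelSelect { kChannelShort = 1, kChannelLong = 2, kChannelBoth = 3 };

bool UseLongLink(ChannelSelect select,
                 bool longlink_connected,
                 bool fast_strategy,
                 int longlink_task_count,
                 int fast_task_limit) {
    switch (select) {
        case kChannelLong:
            return true;   // caller asked for the long link explicitly
        case kChannelShort:
            return false;
        case kChannelBoth: {
            bool use_long = longlink_connected;
            // kChannelFastStrategy: avoid queuing behind a busy long link.
            if (use_long && fast_strategy)
                use_long = longlink_task_count <= fast_task_limit;
            return use_long;
        }
    }
    return false;
}
```

The fast-strategy clause mirrors the `GetTaskCount() <= kFastSendUseLonglinkTaskCntLimit` check above: even a connected long link is skipped when too many tasks are already queued on it.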
3.6.1 Short-connection StartTask
Let's first look at StartTask in the short-connection case:
bool ShortLinkTaskManager::StartTask(const Task& _task) {
    TaskProfile task(_task);
    task.link_type = Task::kChannelShort;
    lst_cmd_.push_back(task);
    lst_cmd_.sort(__CompareTask);
    __RunLoop();
    return true;
}
In the short-connection case, StartTask appends the Task to lst_cmd_ and then kicks off __RunLoop to process the tasks. Let's look at __RunLoop:
void ShortLinkTaskManager::__RunLoop() {
    __RunOnTimeout();
    __RunOnStartTask();
    if (!lst_cmd_.empty()) {
        MessageQueue::FasterMessage(asyncreg_.Get(),
            MessageQueue::Message((MessageQueue::MessageTitle_t)this,
                boost::bind(&ShortLinkTaskManager::__RunLoop, this),
                "ShortLinkTaskManager::__RunLoop"),
            MessageQueue::MessageTiming(1000));
    }
}
__RunLoop mainly calls __RunOnTimeout and __RunOnStartTask; if lst_cmd_ is not empty, it schedules itself again after one second, forming a loop. __RunOnTimeout handles tasks that have timed out, while __RunOnStartTask is what actually starts requests:
void ShortLinkTaskManager::__RunOnStartTask() {
    while (first != last) {
        std::list<TaskProfile>::iterator next = first;
        ++next;

        // Already running: skip.
        if (first->running_id) {
            ++sent_count;
            first = next;
            continue;
        }

        // Still inside the retry back-off interval: wait.
        if (first->retry_time_interval > curtime - first->retry_start_time) {
            xdebug2(TSF"retry interval, taskid:%0, task retry late task, wait:%1", first->task.taskid, (curtime - first->transfer_profile.loop_start_task_time) / 1000);
            first = next;
            continue;
        }

        // Make sure we are authed if the task requires it.
        if (first->task.need_authed) {
            if (!ismakesureauthruned) {
                ismakesureauthruned = true;
                ismakesureauthsuccess = MakesureAuthed();
            }
            if (!ismakesureauthsuccess) {
                xinfo2_if(curtime % 3 == 1, TSF"makeSureAuth retsult=%0", ismakesureauthsuccess);
                first = next;
                continue;
            }
        }

        AutoBuffer bufreq;
        AutoBuffer buffer_extension;
        int error_code = 0;

        // Ask the upper layer to serialize the request.
        if (!Req2Buf(first->task.taskid, first->task.user_context, bufreq, buffer_extension, error_code, Task::kChannelShort)) {
            __SingleRespHandle(first, kEctEnDecode, error_code, kTaskFailHandleTaskEnd, 0,
                               first->running_id ? ((ShortLinkInterface*)first->running_id)->Profile() : ConnectProfile());
            first = next;
            continue;
        }

        // Anti-avalanche check.
        xassert2(fun_anti_avalanche_check_);
        if (!fun_anti_avalanche_check_(first->task, bufreq.Ptr(), (int)bufreq.Length())) {
            __SingleRespHandle(first, kEctLocal, kEctLocalAntiAvalanche, kTaskFailHandleTaskEnd, 0,
                               first->running_id ? ((ShortLinkInterface*)first->running_id)->Profile() : ConnectProfile());
            first = next;
            continue;
        }

        first->transfer_profile.loop_start_task_time = ::gettickcount();
        first->transfer_profile.first_pkg_timeout = __FirstPkgTimeout(first->task.server_process_cost, bufreq.Length(), sent_count, dynamic_timeout_.GetStatus());
        first->current_dyntime_status = (first->task.server_process_cost <= 0) ? dynamic_timeout_.GetStatus() : kEValuating;
        first->transfer_profile.read_write_timeout = __ReadWriteTimeout(first->transfer_profile.first_pkg_timeout);
        first->transfer_profile.send_data_size = bufreq.Length();
        first->use_proxy = (first->remain_retry_count == 0 && first->task.retry_count > 0) ? !default_use_proxy_ : default_use_proxy_;

        // Create the worker and bind its events back to the manager.
        ShortLinkInterface* worker = ShortLinkChannelFactory::Create(MessageQueue::Handler2Queue(asyncreg_.Get()), net_source_, first->task, first->use_proxy);
        worker->OnSend.set(boost::bind(&ShortLinkTaskManager::__OnSend, this, _1), AYNC_HANDLER);
        worker->OnRecv.set(boost::bind(&ShortLinkTaskManager::__OnRecv, this, _1, _2, _3), AYNC_HANDLER);
        worker->OnResponse.set(boost::bind(&ShortLinkTaskManager::__OnResponse, this, _1, _2, _3, _4, _5, _6, _7), AYNC_HANDLER);
        first->running_id = (intptr_t)worker;

        worker->func_network_report.set(fun_notify_network_err_);
        worker->SendRequest(bufreq, buffer_extension);

        ++sent_count;
        first = next;
    }
}
The annotated code above gives the full picture; now let's focus on the key lines:
if (!Req2Buf(first->task.taskid, first->task.user_context, bufreq, buffer_extension, error_code, Task::kChannelShort)) {
    __SingleRespHandle(first, kEctEnDecode, error_code, kTaskFailHandleTaskEnd, 0,
                       first->running_id ? ((ShortLinkInterface*)first->running_id)->Profile() : ConnectProfile());
    first = next;
    continue;
}

ShortLinkInterface* worker = ShortLinkChannelFactory::Create(MessageQueue::Handler2Queue(asyncreg_.Get()), net_source_, first->task, first->use_proxy);
worker->OnSend.set(boost::bind(&ShortLinkTaskManager::__OnSend, this, _1), AYNC_HANDLER);
worker->OnRecv.set(boost::bind(&ShortLinkTaskManager::__OnRecv, this, _1, _2, _3), AYNC_HANDLER);
worker->OnResponse.set(boost::bind(&ShortLinkTaskManager::__OnResponse, this, _1, _2, _3, _4, _5, _6, _7), AYNC_HANDLER);
first->running_id = (intptr_t)worker;
worker->SendRequest(bufreq, buffer_extension);
As shown above, after a series of checks the upper layer's Req2Buf is called; a ShortLinkInterface (a ShortLink object) is then created via ShortLinkChannelFactory, its OnSend, OnRecv and OnResponse events are bound to ShortLinkTaskManager, a running_id is assigned to the current task, and finally SendRequest sends the request data out.
Let's continue with ShortLink::SendRequest:
void ShortLink::SendRequest(AutoBuffer& _buf_req, AutoBuffer& _buffer_extend) {
    send_body_.Attach(_buf_req);
    send_extend_.Attach(_buffer_extend);
    thread_.start();
}
The crucial step in ShortLink::SendRequest is the call to thread_.start(), which runs the __Run method:
void ShortLink::__Run() {
    SOCKET fd_socket = __RunConnect(conn_profile);
    if (INVALID_SOCKET == fd_socket) return;
    if (OnSend) {
        OnSend(this);
    }
    __RunReadWrite(fd_socket, errtype, errcode, conn_profile);
    socket_close(fd_socket);
}
ShortLink::__Run then calls __RunConnect and __RunReadWrite. __RunConnect's main job is to establish the short connection used to send the request:
SOCKET ShortLink::__RunConnect(ConnectProfile& _conn_profile) {
    ShortLinkConnectObserver connect_observer(*this);
    ComplexConnect conn(kShortlinkConnTimeout, kShortlinkConnInterval);
    SOCKET sock = conn.ConnectImpatient(vecaddr, breaker_, &connect_observer,
                                        _conn_profile.proxy_info.type, proxy_addr,
                                        _conn_profile.proxy_info.username, _conn_profile.proxy_info.password);
    return sock;
}
__RunConnect is likewise a long method; looking only at the key part, it calls ComplexConnect::ConnectImpatient to create and return a SOCKET:
SOCKET ComplexConnect::ConnectImpatient(const std::vector<socket_address>& _vecaddr, SocketBreaker& _breaker, MComplexConnect* _observer,
                                        mars::comm::ProxyType _proxy_type, const socket_address* _proxy_addr,
                                        const std::string& _proxy_username, const std::string& _proxy_pwd) {
    for (unsigned int i = 0; i < _vecaddr.size(); ++i) {
        ConnectCheckFSM* ic = NULL;
        if (mars::comm::kProxyHttpTunel == _proxy_type && _proxy_addr) {
            ic = new ConnectHttpTunelCheckFSM(_vecaddr[i], *_proxy_addr, _proxy_username, _proxy_pwd, timeout_, i, _observer);
        } else if (mars::comm::kProxySocks5 == _proxy_type && _proxy_addr) {
            ic = new ConnectSocks5CheckFSM(_vecaddr[i], *_proxy_addr, _proxy_username, _proxy_pwd, timeout_, i, _observer);
        } else {
            ic = new ConnectCheckFSM(_vecaddr[i], timeout_, i, _observer);
        }
        vecsocketfsm.push_back(ic);
    }

    SOCKET retsocket = INVALID_SOCKET;
    do {
        for (unsigned int i = 0; i < index; ++i) {
            if (TcpClientFSM::EReadWrite == vecsocketfsm[i]->Status() && ConnectCheckFSM::ECheckOK == vecsocketfsm[i]->CheckStatus()) {
                if (_observer)
                    _observer->OnFinished(i, socket_address(&vecsocketfsm[i]->Address()), vecsocketfsm[i]->Socket(),
                                          vecsocketfsm[i]->Error(), vecsocketfsm[i]->Rtt(), vecsocketfsm[i]->TotalRtt(),
                                          (int)(gettickcount() - starttime));
                retsocket = vecsocketfsm[i]->Socket();
                break;
            }
        }
    } while (true);
    return retsocket;
}
Next comes the most important phase, sending and receiving data; let's look at ShortLink::__RunReadWrite:
void ShortLink::__RunReadWrite(SOCKET _socket, int& _err_type, int& _err_code, ConnectProfile& _conn_profile) {
    xmessage2_define(message)(TSF"taskid:%_, cgi:%_, @%_", task_.taskid, task_.cgi, this);

    std::string url;
    std::map<std::string, std::string> headers;
    if (kIPSourceProxy == _conn_profile.ip_type) {
        url += "http://";
        url += _conn_profile.host;
    }
    url += task_.cgi;
    headers[http::HeaderFields::KStringHost] = _conn_profile.host;

    AutoBuffer out_buff;
    shortlink_pack(url, headers, send_body_, send_extend_, out_buff, tracker_.get());

    int send_ret = block_socket_send(_socket, (const unsigned char*)out_buff.Ptr(), (unsigned int)out_buff.Length(), breaker_, _err_code);

    AutoBuffer body;
    AutoBuffer recv_buf;
    AutoBuffer extension;
    int status_code = -1;
    off_t recv_pos = 0;
    MemoryBodyReceiver* receiver = new MemoryBodyReceiver(body);
    http::Parser parser(receiver, true);

    while (true) {
        int recv_ret = block_socket_recv(_socket, recv_buf, KBufferSize, breaker_, _err_code, 5000);
        Parser::TRecvStatus parse_status = parser.Recv(recv_buf.Ptr(recv_buf.Length() - recv_ret), recv_ret);
        if (parser.FirstLineReady()) {
            status_code = parser.Status().StatusCode();
        }
        if (parse_status == http::Parser::kFirstLineError) {
        } else if (parse_status == http::Parser::kHeaderFieldsError) {
        } else if (parse_status == http::Parser::kBodyError) {
        } else if (parse_status == http::Parser::kEnd) {
            if (status_code != 200) {
            } else {
                __OnResponse(kEctOK, status_code, body, extension, _conn_profile, true);
            }
            break;
        }
    }
}
ShortLink::__RunReadWrite first packs the entire request with shortlink_pack and sends it out via block_socket_send, then repeatedly polls block_socket_recv for the response. On success it calls __OnResponse. As we saw earlier when the ShortLink is created, ShortLink's __OnResponse is bound to ShortLinkTaskManager::__OnResponse, so this call ends up triggering ShortLinkTaskManager::__OnResponse.
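The binding mentioned above can be sketched as follows. This is illustrative only: Mars wires the callback with boost::bind when the ShortLink worker is created, whereas this sketch uses std::function and a lambda, and the class and field names here are invented:

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <vector>

struct Response { int status; std::string body; };

// The worker only knows an opaque completion callback...
class ShortLinkWorker {
public:
    std::function<void(const Response&)> OnResponse;  // installed by the manager
    void FinishWith(const Response& r) { if (OnResponse) OnResponse(r); }
};

// ...and the manager installs its own handler as that callback.
class ShortLinkManager {
public:
    std::vector<Response> received;
    void Bind(ShortLinkWorker& w) {
        // Mars uses boost::bind(&ShortLinkTaskManager::__OnResponse, this, ...);
        // a lambda capturing `this` achieves the same wiring.
        w.OnResponse = [this](const Response& r) { HandleResponse(r); };
    }
private:
    void HandleResponse(const Response& r) { received.push_back(r); }
};
```

When the worker finishes, the manager's handler fires without the worker ever naming the manager's type, which is exactly why the flow in the source "ends up" in the task manager.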
```cpp
void ShortLinkTaskManager::__OnResponse(ShortLinkInterface* _worker, ErrCmdType _err_type, int _status,
                                        AutoBuffer& _body, AutoBuffer& _extension, bool _cancel_retry,
                                        ConnectProfile& _conn_profile) {
    int handle_type = Buf2Resp(it->task.taskid, it->task.user_context, _body, _extension,
                               err_code, Task::kChannelShort);
    switch (handle_type) {
        case kTaskFailHandleNoError: {
        } break;
        case kTaskFailHandleSessionTimeout: {
        } break;
        case kTaskFailHandleRetryAllTasks: {
        } break;
        case kTaskFailHandleTaskEnd: {
        } break;
        case kTaskFailHandleDefault: {
        } break;
        default: {
        }
    }
}
```
In the method above, the Buf2Resp callback hands the data returned by the underlying socket up to the business logic layer for processing.
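The handle types in the switch above roughly correspond to different recovery strategies. A hedged sketch of such a dispatcher (the enum names come from the code above; the action strings are simplified stand-ins, not Mars's real handling logic):

```cpp
#include <cassert>
#include <string>

// Enum names mirror the cases in ShortLinkTaskManager::__OnResponse.
enum TaskFailHandle {
    kTaskFailHandleNoError,
    kTaskFailHandleSessionTimeout,
    kTaskFailHandleRetryAllTasks,
    kTaskFailHandleTaskEnd,
    kTaskFailHandleDefault,
};

// Illustrative mapping from handle type to a recovery strategy.
const char* DispatchHandleType(TaskFailHandle t) {
    switch (t) {
        case kTaskFailHandleNoError:        return "finish task, notify caller";
        case kTaskFailHandleSessionTimeout: return "refresh session, retry all tasks";
        case kTaskFailHandleRetryAllTasks:  return "batch error, retry all tasks";
        case kTaskFailHandleTaskEnd:        return "end this task only";
        default:                            return "default failure handling";
    }
}
```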
The overall flow of short-link sending and receiving is shown in the figure below:
3.5.2 Long link StartTask
Having walked through the full short-link data flow, the long link will feel comparatively easy. The part that deserves the most attention is heartbeat handling; for completeness we again start from NetworkService's StartTask:
```cpp
bool LongLinkTaskManager::StartTask(const Task& _task) {
    lst_cmd_.push_back(task);
    lst_cmd_.sort(__CompareTask);
    __RunLoop();
    return true;
}
```
Like ShortLinkTaskManager::StartTask, LongLinkTaskManager::StartTask appends the task to lst_cmd_ and then runs __RunLoop.
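A minimal sketch of this StartTask pattern, assuming an illustrative Task with a priority field (not Mars's actual Task layout):

```cpp
#include <cassert>
#include <list>

// Illustrative Task; Mars's real Task has many more fields.
struct Task { int priority; int taskid; };

static bool CompareTask(const Task& a, const Task& b) {
    return a.priority < b.priority;   // smaller value = higher priority
}

class TaskManager {
public:
    std::list<Task> lst_cmd_;
    bool StartTask(const Task& t) {
        lst_cmd_.push_back(t);        // append, like lst_cmd_.push_back(task)
        lst_cmd_.sort(CompareTask);   // keep priority order, like sort(__CompareTask)
        RunLoop();                    // kick processing, like __RunLoop()
        return true;
    }
private:
    void RunLoop() { /* drain lst_cmd_ in priority order (elided) */ }
};
```

Sorting on every insert keeps the list small-cost here because std::list::sort is stable and the list is nearly sorted after a single push_back.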
```cpp
void LongLinkTaskManager::__RunLoop() {
    __RunOnTimeout();
    __RunOnStartTask();
    if (!lst_cmd_.empty()) {
        MessageQueue::FasterMessage(asyncreg_.Get(),
            MessageQueue::Message((MessageQueue::MessageTitle_t)this,
                                  boost::bind(&LongLinkTaskManager::__RunLoop, this),
                                  "LongLinkTaskManager::__RunLoop"),
            MessageQueue::MessageTiming(1000));
    } else {
        // ...
    }
}
```
__RunLoop is much the same as the short-link version. The main work happens in __RunOnStartTask; afterwards, as long as tasks remain, it posts a FasterMessage that re-executes __RunLoop after a 1-second delay.
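The re-posting pattern reduces to: run what is runnable, and if the list is still non-empty, schedule yourself again. A toy model of just that control flow (LoopSim and its fields are invented for illustration; there is no real message queue or 1-second delay here, the recursive call stands in for the delayed re-post):

```cpp
#include <cassert>
#include <list>

class LoopSim {
public:
    std::list<int> pending;   // stand-in for lst_cmd_
    int reposts = 0;          // counts the simulated FasterMessages
    void RunLoop() {
        if (!pending.empty()) pending.pop_front();  // __RunOnStartTask stand-in
        if (!pending.empty()) {
            ++reposts;        // real code posts FasterMessage(..., MessageTiming(1000))
            RunLoop();        // the queue would call us back 1 s later
        }
        // else: queue drained, stop re-posting
    }
};
```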
```cpp
void LongLinkTaskManager::__RunOnStartTask() {
    bool canretry = curtime - lastbatcherrortime_ >= retry_interval_;
    bool canprint = true;
    int sent_count = 0;
    while (first != last) {
        std::list<TaskProfile>::iterator next = first;
        ++next;
        AutoBuffer bufreq;
        AutoBuffer buffer_extension;
        int error_code = 0;
        if (!first->antiavalanche_checked) {
            first->antiavalanche_checked = true;
        }
        if (!longlinkconnectmon_->MakeSureConnected()) {
            // ...
        }
        if (0 == bufreq.Length()) {
            if (!Req2Buf(first->task.taskid, first->task.user_context, bufreq, buffer_extension,
                         error_code, Task::kChannelLong)) {
                __SingleRespHandle(first, kEctEnDecode, error_code, kTaskFailHandleTaskEnd, longlink_->Profile());
                first = next;
                continue;
            }
        }
        first->transfer_profile.loop_start_task_time = ::gettickcount();
        first->transfer_profile.first_pkg_timeout = __FirstPkgTimeout(first->task.server_process_cost,
                                                                     bufreq.Length(), sent_count,
                                                                     dynamic_timeout_.GetStatus());
        first->current_dyntime_status = (first->task.server_process_cost <= 0)
                                            ? dynamic_timeout_.GetStatus() : kEValuating;
        first->transfer_profile.read_write_timeout = __ReadWriteTimeout(first->transfer_profile.first_pkg_timeout);
        first->transfer_profile.send_data_size = bufreq.Length();
        first->running_id = longlink_->Send(bufreq, buffer_extension, first->task);
        ++sent_count;
        first = next;
    }
}
```
The key parts of __RunOnStartTask are MakeSureConnected and LongLink::Send.
MakeSureConnected was actually touched on briefly when we covered Mars initialization; let's expand on it here, focusing mainly on heartbeat control:
```cpp
bool LongLinkConnectMonitor::MakeSureConnected() {
    __IntervalConnect(kTaskConnect);
    return LongLink::kConnected == longlink_.ConnectStatus();
}

uint64_t LongLinkConnectMonitor::__IntervalConnect(int _type) {
    if (LongLink::kConnecting == longlink_.ConnectStatus()
        || LongLink::kConnected == longlink_.ConnectStatus())
        return 0;
    uint64_t interval = __Interval(_type, activelogic_) * 1000ULL;
    if (posttime >= interval) {
        bool ret = longlink_.MakeSureConnected(&newone);
    } else {
        // ...
    }
}
```
Let's take a closer look at __Interval:
```cpp
static unsigned long __Interval(int _type, const ActiveLogic& _activelogic) {
    unsigned long interval = sg_interval[_type][__CurActiveState(_activelogic)];
    if (kLongLinkConnect != _type) return interval;

    if (__CurActiveState(_activelogic) == kInactive || __CurActiveState(_activelogic) == kForgroundActive) {
        if (!_activelogic.IsActive() && GetAccountInfo().username.empty()) {
            interval = kNoAccountInfoInactiveInterval;
            xwarn2(TSF"no account info and inactive, interval:%_", interval);
        } else if (kNoNet == getNetInfo()) {
            interval = interval * kNoNetSaltRate + kNoNetSaltRise;
            xinfo2(TSF"no net, interval:%0", interval);
        } else if (GetAccountInfo().username.empty()) {
            interval = interval * kNoAccountInfoSaltRate + kNoAccountInfoSaltRise;
            xinfo2(TSF"no account info, interval:%0", interval);
        } else {
            interval += rand() % (20);
        }
    }
    return interval;
}
```
It chooses the time until the next connection attempt based on the connect type and the device's current activity state; the base values are stored in the two-dimensional array sg_interval:
```cpp
// columns: kForgroundOneMinute | kForgroundTenMinute | kForgroundActive | kBackgroundActive | kInactive
static unsigned long const sg_interval[][5] = {
    // kTaskConnect:
    {  5, 10,  20,  30, 300 },
    // kLongLinkConnect:
    { 15, 30, 240, 300, 600 },
    // kNetworkChangeConnect:
    {  0,  0,   0,   0,   0 },
};
```
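To make the table concrete, here is a small lookup sketch. BaseInterval is a hypothetical helper (Mars indexes sg_interval directly inside __Interval); the values and enum names mirror the table above, with values in seconds:

```cpp
#include <cassert>

enum ConnectType { kTaskConnect = 0, kLongLinkConnect = 1, kNetworkChangeConnect = 2 };
enum ActiveState { kForgroundOneMinute = 0, kForgroundTenMinute, kForgroundActive,
                   kBackgroundActive, kInactive };

// Rows are connect types, columns are activity states.
static const unsigned long sg_interval[][5] = {
    {  5, 10,  20,  30, 300 },   // kTaskConnect
    { 15, 30, 240, 300, 600 },   // kLongLinkConnect
    {  0,  0,   0,   0,   0 },   // kNetworkChangeConnect
};

// Hypothetical helper: base interval before the salt/adjustment logic in __Interval.
unsigned long BaseInterval(ConnectType type, ActiveState state) {
    return sg_interval[type][state];
}
```

So, for example, an inactive device checks its long-link heartbeat only every 600 s, while a device active within the last minute reconnects tasks after 5 s.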
Let's return to LongLink::MakeSureConnected:
```cpp
bool LongLink::MakeSureConnected(bool* _newone) {
    thread_.start(&newone);
    return false;
}
```
LongLink::MakeSureConnected does very little itself: it just calls thread_.start, which in turn executes the LongLink::__Run method:
```cpp
void LongLink::__Run() {
    SOCKET sock = __RunConnect(conn_profile);
    __RunReadWrite(sock, errtype, errcode, conn_profile);
    socket_close(sock);
}
```
Like the short link, LongLink::__Run does only two things: __RunConnect creates the SOCKET, and __RunReadWrite starts the read/write loop.
```cpp
SOCKET LongLink::__RunConnect(ConnectProfile& _conn_profile) {
    LongLinkConnectObserver connect_observer(*this, ip_items);
    ComplexConnect com_connect(kLonglinkConnTimeout, kLonglinkConnInteral,
                               kLonglinkConnInteral, kLonglinkConnMax);
    SOCKET sock = com_connect.ConnectImpatient(vecaddr, connectbreak_, &connect_observer,
                                               proxy_info.type, proxy_addr,
                                               proxy_info.username, proxy_info.password);
    return sock;
}
```
The biggest difference between long and short links lies in the __RunReadWrite method:
```cpp
void LongLink::__RunReadWrite(SOCKET _sock, ErrCmdType& _errtype, int& _errcode, ConnectProfile& _profile) {
    Alarm alarmnoopinterval(boost::bind(&LongLink::__OnAlarm, this), false);
    Alarm alarmnooptimeout(boost::bind(&LongLink::__OnAlarm, this), false);

    while (true) {
        // Heartbeat: when the interval alarm is not armed, send a noop and re-arm it.
        if (!alarmnoopinterval.IsWaiting()) {
            if (first_noop_sent && alarmnoopinterval.Status() != Alarm::kOnAlarm) {
                xassert2(false, "noop interval alarm not running");
            }
            if (first_noop_sent && alarmnoopinterval.Status() == Alarm::kOnAlarm) {
                __NotifySmartHeartbeatJudgeDozeStyle();
            }
            xgroup2_define(noop_xlog);
            uint64_t last_noop_interval = alarmnoopinterval.After();
            uint64_t last_noop_actual_interval =
                (alarmnoopinterval.Status() == Alarm::kOnAlarm) ? alarmnoopinterval.ElapseTime() : 0;
            bool has_late_toomuch = (last_noop_actual_interval >= (15 * 60 * 1000));
            if (__NoopReq(noop_xlog, alarmnooptimeout, has_late_toomuch)) {
                nooping = true;
                __NotifySmartHeartbeatHeartReq(_profile, last_noop_interval, last_noop_actual_interval);
            }
            first_noop_sent = true;
            uint64_t noop_interval = __GetNextHeartbeatInterval();
            alarmnoopinterval.Cancel();
            alarmnoopinterval.Start((int)noop_interval);
        }

        // Writable: flush queued send buffers with writev.
        if (sel.Write_FD_ISSET(_sock) && !lstsenddata_.empty()) {
            xgroup2_define(xlog_group);
            xinfo2(TSF"task socket send sock:%0, ", _sock) >> xlog_group;
            iovec* vecwrite = (iovec*)calloc(lstsenddata_.size(), sizeof(iovec));
            unsigned int offset = 0;
            for (auto it = lstsenddata_.begin(); it != lstsenddata_.end(); ++it) {
                vecwrite[offset].iov_base = it->second->PosPtr();
                vecwrite[offset].iov_len = it->second->PosLength();
                ++offset;
            }
            ssize_t writelen = writev(_sock, vecwrite, (int)lstsenddata_.size());
            free(vecwrite);

            unsigned long long noop_interval = __GetNextHeartbeatInterval();
            alarmnoopinterval.Cancel();
            alarmnoopinterval.Start((int)noop_interval);

            auto it = lstsenddata_.begin();
            while (it != lstsenddata_.end() && 0 < writelen) {
                if (0 == it->second->Pos() && OnSend) OnSend(it->first.taskid);
                if ((size_t)writelen >= it->second->PosLength()) {
                    LongLinkNWriteData nwrite(it->second->Length(), it->first);
                    nsent_datas.push_back(nwrite);
                    it = lstsenddata_.erase(it);
                } else {
                    // ...
                }
            }
        }

        // Readable: unpack incoming packets and dispatch them.
        if (sel.Read_FD_ISSET(_sock)) {
            bufrecv.AllocWrite(64 * 1024, false);
            ssize_t recvlen = recv(_sock, bufrecv.PosPtr(), 64 * 1024, 0);
            while (0 < bufrecv.Length()) {
                uint32_t cmdid = 0;
                uint32_t taskid = Task::kInvalidTaskID;
                size_t packlen = 0;
                AutoBuffer body;
                AutoBuffer extension;
                int unpackret = longlink_unpack(bufrecv, cmdid, taskid, packlen, body, extension, tracker_.get());
                if (LONGLINK_UNPACK_STREAM_PACKAGE == unpackret) {
                    if (OnRecv) OnRecv(taskid, packlen, packlen);
                } else if (!__NoopResp(cmdid, taskid, stream_resp.stream, stream_resp.extension,
                                       alarmnooptimeout, nooping, _profile)) {
                    if (OnResponse)
                        OnResponse(kEctOK, 0, cmdid, taskid, stream_resp.stream, stream_resp.extension, _profile);
                    sent_taskids.erase(taskid);
                }
            }
        }
    }
}
```
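The heartbeat scheduling in the loop above boils down to: when the noop alarm comes due, send a noop, then re-arm the alarm with the next smart-heartbeat interval. A deliberately condensed model of just that cycle (this is not Mars's Alarm class; the names and the 240 s interval are illustrative stand-ins for __GetNextHeartbeatInterval()):

```cpp
#include <cassert>
#include <cstdint>

class NoopCycle {
public:
    int noops_sent = 0;
    uint64_t next_due = 0;     // tick at which the next noop is due
    uint64_t interval = 240;   // seconds, illustrative
    void Tick(uint64_t now) {
        if (now >= next_due) {
            ++noops_sent;                // like __NoopReq: write a heartbeat packet
            next_due = now + interval;   // re-arm, like alarmnoopinterval.Start()
        }
    }
};
```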
```cpp
bool LongLink::Send(const AutoBuffer& _body, const AutoBuffer& _extension, const Task& _task) {
    lstsenddata_.push_back(std::make_pair(_task, move_wrapper<AutoBuffer>(AutoBuffer())));
    longlink_pack(_task.cmdid, _task.taskid, _body, _extension, lstsenddata_.back().second, tracker_.get());
    lstsenddata_.back().second->Seek(0, AutoBuffer::ESeekStart);
    return true;
}
```
As for long-link data transfer: sending goes through LongLink::Send, which packs the data and places it into lstsenddata_; when the socket becomes writable, the queued buffers are written out with writev. Reading mirrors the short-link flow: data is read off the socket and handed to LongLink via OnResponse, LongLink passes it to LongLinkTaskManager, and LongLinkTaskManager finally delivers it to the application layer through its Buf2Resp callback.
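The partial-write bookkeeping described above, applying writev's return value back to the send queue, can be sketched as follows. Buffer is a simplified stand-in for Mars's AutoBuffer, and ConsumeWritten is a hypothetical helper modeling the erase-or-advance loop in LongLink::__RunReadWrite:

```cpp
#include <cassert>
#include <cstddef>
#include <list>

struct Buffer {
    size_t length;
    size_t pos;   // bytes already flushed
    size_t Remaining() const { return length - pos; }
};

// Consume `writelen` bytes (as reported by writev) across the head of the queue:
// wholly written buffers are erased, a partially written one keeps its position.
void ConsumeWritten(std::list<Buffer>& queue, size_t writelen) {
    auto it = queue.begin();
    while (it != queue.end() && writelen > 0) {
        if (writelen >= it->Remaining()) {   // buffer fully flushed: drop it
            writelen -= it->Remaining();
            it = queue.erase(it);
        } else {                             // partial write: remember progress
            it->pos += writelen;
            writelen = 0;
        }
    }
}
```

This is why writev fits here: one syscall can drain several queued task buffers, and the queue simply absorbs whatever short count the kernel reports.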
The overall Mars flow is shown in the diagram below: