Source information

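Mars is WeChat's official infrastructure component for client apps. It is written in C++ and is independent of both business logic and platform. It has been open-sourced; the official source code and documentation are hosted in the Tencent/mars repository on GitHub.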

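Mars overview

The project consists of the following modules:

COMM: the base library, providing sockets, threads, message queues, coroutines, and other basic utilities.
XLOG: a general-purpose logging module designed around the constraints of mobile devices; it aims for high performance, high availability, security, and fault tolerance.
SDT: the network diagnostics module.
STN: WeChat's signaling transport network module. It is a socket-level networking solution (it does not support the full HTTP protocol) that provides the small-payload signaling channel between the client and the server, and it is the network channel WeChat uses most often. STN also embodies many practical designs: custom DNS, disaster recovery, load considerations, foreground/background handling, sleep handling, power saving, and so on. It currently offers two channel types, long connections and short connections, to meet different needs. With STN in place, application developers only need to focus on business logic.
CDN: the data distribution network, responsible for large data transfers. It is tied to specific business logic and has therefore not been open-sourced.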

The following compares several open-source networking libraries:

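Mars Sample code analysis

1. Getting the Sample project running

WeChat provides an official integration guide:
Mars iOS/OS X integration guide. There are two ways to integrate. The first is to import Mars as a framework, which is what real projects usually do. The second is debug mode, which is what you need when first studying the Mars Sample code. Note that after a successful build you must copy mars.framework into the project folder and then add it to the project; otherwise the build reports that some header files cannot be found.

Also note that to integrate in debug mode you must choose option 3 when running the build script; otherwise mars.xcodeproj cannot be found.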

Enter menu:
1. Clean && build mars.
2. Clean && build xlog.
3. Gen iOS mars Project.
4. Exit

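The rest of this article analyzes the Mars source code in debug mode:

2. Sample code analysis

The core of the Mars Sample business layer consists of three parts: NetworkStatus, NetworkService, and NetworkEvent. Before walking through the business-layer code, let's look at what each of them does.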

NetworkStatus

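This class monitors the network status. Its approach is the same as AFNetworkReachabilityManager in AFNetworking, so the code is not reproduced here; see the earlier source-analysis post on AFNetworking for details.

Once Start has been called, the ChangeReach method of the object passed to it is invoked whenever the network status changes.

In the AppDelegate class we pass NetworkService as the argument to Start, so NetworkService's ChangeReach method is called whenever the network status changes.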

NetworkService

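This is one of the more important classes in the business layer and the one most closely tied to the underlying Mars library. Its main responsibilities are:

1. Acting as the upper-layer entry point for calling Mars
2. Listening to NetworkStatus and passing network status changes down to mars::baseevent in the Mars core
3. Forwarding callbacks from the Mars core to NetworkEvent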

NetworkEvent

NetworkEvent manages tasks, controllers, and pushrecvers:

@interface NetworkEvent : NSObject<NetworkDelegate> {
NSMutableDictionary *tasks;
NSMutableDictionary* controllers;
NSMutableDictionary* pushrecvers;
}

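When data needs to be distributed, it goes through NetworkEvent. Events generally originate from callbacks in the Mars core, pass through NetworkService to NetworkEvent, and NetworkEvent dispatches them to the corresponding objects. The relationship among the three is shown in the figure below:

NetworkService is responsible for setting up Mars. Because the callbacks are registered in NetworkService's setCallBack, any Mars callback reaches NetworkService first. NetworkService passes the event to NetworkEvent, which in turn delivers it to the matching controllers and pushrecvers.

The business-layer code is analyzed in the following parts: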

2.1 Initializing the Mars network layer

- (BOOL)application:(UIApplication *)application didFinishLaunchingWithOptions:(NSDictionary *)launchOptions {

// Set NetworkEvent as NetworkService's delegate so that events seen by NetworkService are handed to NetworkEvent for dispatch
[NetworkService sharedInstance].delegate = [[NetworkEvent alloc] init];
// Register for Mars core callbacks so that NetworkService is notified of key Mars events
[[NetworkService sharedInstance] setCallBack];
// Create Mars
[[NetworkService sharedInstance] createMars];
// Set the client version
[[NetworkService sharedInstance] setClientVersion:200];
// Set the long-connection address and port
[[NetworkService sharedInstance] setLongLinkAddress:@"localhost" port:8081];
// Set the short-connection port
[[NetworkService sharedInstance] setShortLinkPort:8080];
// Report that the app is in the foreground
[[NetworkService sharedInstance] reportEvent_OnForeground:YES];
// Make sure the long connection is established
[[NetworkService sharedInstance] makesureLongLinkConnect];
// Start network status monitoring
[[NetworkStatus sharedInstance] Start:[NetworkService sharedInstance]];

return YES;
}

- (void)applicationDidEnterBackground:(UIApplication *)application {
// Report that the app is in the background
[[NetworkService sharedInstance] reportEvent_OnForeground:NO];
}

- (void)applicationWillEnterForeground:(UIApplication *)application {
// Report that the app is in the foreground
[[NetworkService sharedInstance] reportEvent_OnForeground:YES];
}

- (void)applicationWillTerminate:(UIApplication *)application {
// Destroy Mars
[[NetworkService sharedInstance] destroyMars];
appender_close();
}

We leave this part open for now and come back to it when we cover the Mars core code.

2.2 Fetching data through Mars

- (void)loadView {
[super loadView];

//......
converSations = [[NSArray alloc] init];
CGITask *convlstCGI = [[CGITask alloc] initAll:ChannelType_ShortConn AndCmdId:kConvLst AndCGIUri:@"/mars/getconvlist" AndHost:@"localhost"];
[[NetworkService sharedInstance] startTask:convlstCGI ForUI:self];
}

- (NSData*)requestSendData {
ConversationListRequest *convlstRequest = [ConversationListRequest new];
convlstRequest.type = 0;
convlstRequest.accessToken = @"123456";
NSData *data = [convlstRequest data];
return data;
}

- (int)onPostDecode:(NSData*)responseData {
convlstResponse = [ConversationListResponse parseFromData:responseData error:nil];
self->converSations = convlstResponse.listArray;
LOG_INFO(kModuleViewController, @"recv conversation list, size: %lu", (unsigned long)[self->converSations count]);
return [self->converSations count] > 0 ? 0 : -1;
}

- (int)onTaskEnd:(uint32_t)tid errType:(uint32_t)errtype errCode:(uint32_t)errcode {
dispatch_async(dispatch_get_main_queue(), ^{
[self.tableView reloadData];
});
return 0;
}

2.3 Listening for pushed data through Mars

- (void)viewDidLoad {
[super viewDidLoad];
self.title = _conversation.notice;
//.......
_messages = [NSMutableArray new];
[[NetworkService sharedInstance] addPushObserver:self withCmdId:kPushMessageCmdId];
}

-(NSData*)requestSendData {
SendMessageRequest *sendMsgRequest = [SendMessageRequest new];
sendMsgRequest.from = [self username];
sendMsgRequest.to = @"all";
sendMsgRequest.text = _textField.text;
sendMsgRequest.accessToken = @"123456";
sendMsgRequest.topic = _conversation.topic;
LOG_INFO(kModuleViewController, @"send msg to topic:%@", _conversation.notice);
NSData* data = [sendMsgRequest data];
dispatch_async(dispatch_get_main_queue(), ^{
_textField.text = @"";
});
return data;
}

-(int)onPostDecode:(NSData*)responseData {
SendMessageResponse *sendMsgResponse = [SendMessageResponse parseFromData:responseData error:nil];
dispatch_async(dispatch_get_main_queue(), ^{
NSString *recvtext = [NSString stringWithFormat:@"%@ : %@", sendMsgResponse.from, sendMsgResponse.text];
[self.messages addObject:recvtext];
[self.tableView reloadData];
NSIndexPath *indexPath = [NSIndexPath indexPathForRow:self.messages.count-1 inSection:0];
[self.tableView scrollToRowAtIndexPath:indexPath atScrollPosition:UITableViewScrollPositionBottom animated:YES];
});
return sendMsgResponse.errCode == 0 ? 0 : -1;
}

- (void)sendMessage {
CGITask *sendMsgCGI = [[CGITask alloc] initAll:ChannelType_LongConn AndCmdId:kSendMsg AndCGIUri:@"/mars/sendmessage" AndHost:@"localhost"];
[[NetworkService sharedInstance] startTask:sendMsgCGI ForUI:self];
}

- (void)notifyPushMessage:(NSData*)pushData withCmdId:(int)cmdId {
MessagePush* messagePush = [MessagePush parseFromData:pushData error:nil];
if (messagePush != nil) {
dispatch_async(dispatch_get_main_queue(), ^{
NSString *recvtext = [NSString stringWithFormat:@"%@ : %@", messagePush.from, messagePush.content];
[self.messages addObject:recvtext];
[self.tableView reloadData];
NSIndexPath *indexPath = [NSIndexPath indexPathForRow:self.messages.count-1 inSection:0];
[self.tableView scrollToRowAtIndexPath:indexPath atScrollPosition:UITableViewScrollPositionBottom animated:YES];
});
}
}

- (int)onTaskEnd:(uint32_t)tid errType:(uint32_t)errtype errCode:(uint32_t)errcode {
return 0;
}

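In the Sample, every request is defined as a CGITask, which holds the task id, the channel selection (long connection or short connection), the command id that identifies the request, the CGI URI, and the host address.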

@interface CGITask : NSObject

@property(nonatomic) uint32_t taskid;
@property(nonatomic) ChannelType channel_select;
@property(nonatomic) uint32_t cmdid;
@property(nonatomic, copy) NSString *cgi;
@property(nonatomic, copy) NSString *host;

@end

Next, startTask is called to start the task:

[[NetworkService sharedInstance] startTask:convlstCGI ForUI:self];
- (int)startTask:(CGITask *)task ForUI:(id<UINotifyDelegate>)delegateUI {
Task ctask;
ctask.cmdid = task.cmdid;
ctask.channel_select = task.channel_select;
ctask.cgi = std::string(task.cgi.UTF8String);
ctask.shortlink_host_list.push_back(std::string(task.host.UTF8String));
ctask.user_context = (__bridge void*)task;

mars::stn::StartTask(ctask);

NSString *taskIdKey = [NSString stringWithFormat:@"%d", ctask.taskid];
[_delegate addObserver:delegateUI forKey:taskIdKey];
[_delegate addCGITasks:task forKey:taskIdKey];
return ctask.taskid;
}

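startTask converts the CGITask into a Task and calls mars::stn::StartTask. It then registers the calling object and the task with NetworkEvent, so that both are notified when an event occurs. Let's look at the NetworkEvent methods related to controllers; all of them are triggered by task requests.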

- (NSData*)Request2BufferWithTaskID:(uint32_t)tid task:(CGITask *)task {
NSData* data = NULL;
NSString *taskIdKey = [NSString stringWithFormat:@"%d", tid];
id<UINotifyDelegate> uiObserver = [controllers objectForKey:taskIdKey];
if (uiObserver != nil) {
data = [uiObserver requestSendData];
}
return data;
}

- (NSInteger)Buffer2ResponseWithTaskID:(uint32_t)tid responseData:(NSData *)data task:(CGITask *)task {
int returnType = 0;
NSString *taskIdKey = [NSString stringWithFormat:@"%d", tid];
id<UINotifyDelegate> uiObserver = [controllers objectForKey:taskIdKey];
if (uiObserver != nil) {
returnType = [uiObserver onPostDecode:data];
}
else {
returnType = -1;
}
return returnType;
}

- (NSInteger)OnTaskEndWithTaskID:(uint32_t)tid task:(CGITask *)task errType:(uint32_t)errtype errCode:(uint32_t)errcode {
NSString *taskIdKey = [NSString stringWithFormat:@"%d", tid];
[tasks removeObjectForKey:taskIdKey];
id<UINotifyDelegate> uiObserver = [controllers objectForKey:taskIdKey];
[uiObserver onTaskEnd:tid errType:errtype errCode:errcode];
[controllers removeObjectForKey:taskIdKey];
return 0;
}

In other words, Request2BufferWithTaskID, Buffer2ResponseWithTaskID, and OnTaskEndWithTaskID invoke requestSendData, onPostDecode, and onTaskEnd, respectively, on the object that started the task.

These three methods are in turn reached from StnCallBack::Req2Buf, StnCallBack::Buf2Resp, and StnCallBack::OnTaskEnd in stn_callback.mm, which call NetworkService's Request2BufferWithTaskID, Buffer2ResponseWithTaskID, and OnTaskEndWithTaskID methods.

bool StnCallBack::Req2Buf(uint32_t _taskid, void* const _user_context, AutoBuffer& _outbuffer, AutoBuffer& _extend, int& _error_code, const int _channel_select) {
NSData* requestData = [[NetworkService sharedInstance] Request2BufferWithTaskID:_taskid userContext:_user_context];
if (requestData == nil) {
requestData = [[NSData alloc] init];
}
_outbuffer.AllocWrite(requestData.length);
_outbuffer.Write(requestData.bytes,requestData.length);
return requestData.length > 0;
}

int StnCallBack::Buf2Resp(uint32_t _taskid, void* const _user_context, const AutoBuffer& _inbuffer, const AutoBuffer& _extend, int& _error_code, const int _channel_select) {
int handle_type = mars::stn::kTaskFailHandleNormal;
NSData* responseData = [NSData dataWithBytes:(const void *) _inbuffer.Ptr() length:_inbuffer.Length()];
NSInteger errorCode = [[NetworkService sharedInstance] Buffer2ResponseWithTaskID:_taskid ResponseData:responseData userContext:_user_context];
if (errorCode != 0) {
handle_type = mars::stn::kTaskFailHandleDefault;
}
return handle_type;
}

int StnCallBack::OnTaskEnd(uint32_t _taskid, void* const _user_context, int _error_type, int _error_code) {
return (int)[[NetworkService sharedInstance] OnTaskEndWithTaskID:_taskid userContext:_user_context errType:_error_type errCode:_error_code];
}

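When stn_callback itself gets called is not covered here; the answer comes later, in the discussion of the core.

Now let's see what fetching from an endpoint looks like from the upper layer:

1. Build a CGITask, specifying the channel (long connection or short connection), the CGI URI, the host, and the command id.
2. Call mars::stn::StartTask.
3. Add the current object to NetworkEvent's controllers list.
4. After mars::stn::StartTask is called, Mars events travel up from the core to NetworkService and are dispatched by NetworkEvent to the corresponding controller, where requestSendData, onPostDecode, and onTaskEnd handle them.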
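The routing described in these steps can be sketched in plain C++. This is only an illustration, not the Sample's code: EventDispatcher and UiObserver are hypothetical stand-ins for NetworkEvent and the UINotifyDelegate protocol, and request and response bodies are simplified to std::string.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <map>
#include <string>
#include <utility>

// Hypothetical stand-in for the UINotifyDelegate protocol of the Sample.
struct UiObserver {
    std::function<std::string()> request_send_data;         // builds the request body
    std::function<int(const std::string&)> on_post_decode;  // parses the response, 0 = success
    std::function<void(uint32_t, int, int)> on_task_end;    // final notification
};

// Hypothetical stand-in for NetworkEvent: routes core callbacks by task id.
class EventDispatcher {
public:
    void AddObserver(uint32_t taskid, UiObserver observer) {
        controllers_[taskid] = std::move(observer);
    }

    // Mirrors Request2BufferWithTaskID: an unknown task id yields an empty body.
    std::string Req2Buf(uint32_t taskid) const {
        auto it = controllers_.find(taskid);
        return it == controllers_.end() ? std::string() : it->second.request_send_data();
    }

    // Mirrors Buffer2ResponseWithTaskID: an unknown task id is reported as -1.
    int Buf2Resp(uint32_t taskid, const std::string& body) const {
        auto it = controllers_.find(taskid);
        return it == controllers_.end() ? -1 : it->second.on_post_decode(body);
    }

    // Mirrors OnTaskEndWithTaskID: notify the observer once, then forget it.
    void OnTaskEnd(uint32_t taskid, int err_type, int err_code) {
        auto it = controllers_.find(taskid);
        if (it == controllers_.end()) return;
        it->second.on_task_end(taskid, err_type, err_code);
        controllers_.erase(it);
    }

    bool HasObserver(uint32_t taskid) const { return controllers_.count(taskid) != 0; }

private:
    std::map<uint32_t, UiObserver> controllers_;
};
```

The point of the design is that the core only ever knows a task id; the mapping from task id to the object that started the task lives entirely in the dispatcher and is dropped as soon as the task ends.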

Receiving pushes works in much the same way as fetching:

stn_callback.mm

void StnCallBack::OnPush(uint64_t _channel_id, uint32_t _cmdid, uint32_t _taskid, const AutoBuffer& _body, const AutoBuffer& _extend) {
if (_body.Length() > 0) {
NSData* recvData = [NSData dataWithBytes:(const void *) _body.Ptr() length:_body.Length()];
[[NetworkService sharedInstance] OnPushWithCmd:_cmdid data:recvData];
}

}

NetworkService.m

- (void)OnPushWithCmd:(NSInteger)cid data:(NSData *)data {
return [_delegate OnPushWithCmd:cid data:data];
}

NetworkEvent.m

- (void)OnPushWithCmd:(NSInteger)cid data:(NSData *)data {
id<PushNotifyDelegate> pushObserver = [pushrecvers objectForKey:[NSString stringWithFormat:@"%d", cid]];
if (pushObserver != nil) {
[pushObserver notifyPushMessage:data withCmdId:cid];
}
}

Registering an observer for pushed data:

[[NetworkService sharedInstance] addPushObserver:self withCmdId:kPushMessageCmdId];

3. Mars core source code analysis

3.1 Setting callbacks

- (void)setCallBack {
mars::stn::SetCallback(mars::stn::StnCallBack::Instance());
mars::app::SetCallback(mars::app::AppCallBack::Instance());
}

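The PublicComponentV2 folder contains two pairs of files, app_callback and stn_callback:

stn_callback holds the callback interface for the STN module, and app_callback holds the application-related callback interface. Mars uses these callbacks to obtain customized services from the upper layer, so that customizable, business-specific behavior lives in the application layer rather than in the core. Let's look at the interfaces. The details are left for when we reach the relevant logic; for now the goal is just to know which callbacks exist and how the core uses them:

StnCallBack is declared as follows: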

class StnCallBack : public Callback {

private:
//.....
public:
//......
// traffic statistics
virtual void TrafficData(ssize_t _send, ssize_t _recv);
// the core asks the upper layer for the IP list of a host
virtual std::vector<std::string> OnNewDns(const std::string& _host);
// called when the network layer receives a push message
virtual void OnPush(uint64_t _channel_id, uint32_t _cmdid, uint32_t _taskid, const AutoBuffer& _body, const AutoBuffer& _extend);
// the core fetches the data a task needs to send
virtual bool Req2Buf(uint32_t _taskid, void* const _user_context, AutoBuffer& _outbuffer, AutoBuffer& _extend, int& _error_code, const int _channel_select);
// the core hands the response back to the upper layer for parsing
virtual int Buf2Resp(uint32_t _taskid, void* const _user_context, const AutoBuffer& _inbuffer, const AutoBuffer& _extend, int& _error_code, const int _channel_select);
// the task has finished
virtual int OnTaskEnd(uint32_t _taskid, void* const _user_context, int _error_type, int _error_code);
// report the network connection status
virtual void ReportConnectStatus(int _status, int longlink_status);
// long-connection identity check: ECHECK_NOW, ECHECK_NEVER = 1, ECHECK_NEXT = 2
virtual int GetLonglinkIdentifyCheckBuffer(AutoBuffer& _identify_buffer, AutoBuffer& _buffer_hash, int32_t& _cmdid);
// response to the long-connection identity check
virtual bool OnLonglinkIdentifyResponse(const AutoBuffer& _response_buffer, const AutoBuffer& _identify_buffer_hash);
//.....
private:
static StnCallBack* instance_;

};

AppCallBack is declared as follows:

class AppCallBack : public Callback {

private:
//.....
public:
//.....
// get the application file path
virtual std::string GetAppFilePath();
// get the current user's account info
virtual AccountInfo GetAccountInfo();
// get the client version
virtual unsigned int GetClientVersion();
// get the device info
virtual DeviceInfo GetDeviceInfo();
private:
static AppCallBack* instance_;
};

Calling SetCallback stores the given callback in sg_callback:

void SetCallback(Callback* const callback) {
sg_callback = callback;
}

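StnCallBack is registered with the STN module of the Mars core in stn_logic.cc, and AppCallBack is injected into the app module in app_logic.cc; what these modules do is covered later. Once the callbacks are set, business-specific logic can be delegated to the business layer through them. For example, the STN module invokes the upper layer's traffic statistics like this: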

void (*TrafficData)(ssize_t _send, ssize_t _recv)
= [](ssize_t _send, ssize_t _recv) {
xassert2(sg_callback != NULL);
return sg_callback->TrafficData(_send, _recv);
};
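To make the pattern concrete, here is a minimal, self-contained sketch of the same idea: the core keeps a pointer to an abstract interface and forwards through it. The trimmed-down Callback class, ReportTraffic, and CountingCallback are hypothetical names for illustration only; the real interface has many more methods.

```cpp
#include <cassert>

// Hypothetical, trimmed-down callback interface.
class Callback {
public:
    virtual ~Callback() = default;
    virtual void TrafficData(long send, long recv) = 0;
};

static Callback* sg_callback = nullptr;

// The application layer installs its implementation once at startup.
void SetCallback(Callback* callback) { sg_callback = callback; }

// The core forwards through the stored pointer and never needs to know
// which concrete class is behind it.
void ReportTraffic(long send, long recv) {
    assert(sg_callback != nullptr);
    sg_callback->TrafficData(send, recv);
}

// Example application-side implementation that accumulates byte counts.
class CountingCallback : public Callback {
public:
    void TrafficData(long send, long recv) override {
        total_send += send;
        total_recv += recv;
    }
    long total_send = 0;
    long total_recv = 0;
};
```

An application would create one CountingCallback, pass its address to SetCallback, and from then on every ReportTraffic call in the core lands in application code.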

3.2 Creating Mars

Mars is created through the createMars method:

[[NetworkService sharedInstance] createMars];

This calls OnCreate in mars::baseevent:

- (void) createMars {
mars::baseevent::OnCreate();
}

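Before continuing with the creation of Mars, let's look at how mars::baseevent works:

Besides OnCreate, mars::baseevent has the following functions. Their purpose is evident from their names, so they are not described here.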

void OnCreate()
void OnDestroy()
void OnSingalCrash(int _sig)
void OnExceptionCrash()
void OnForeground(bool _isforeground)
void OnNetworkChange()

mars::baseevent::OnCreate() is implemented as follows:

void OnCreate() {
GetSignalOnCreate()();
}

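baseprjevent.cc is implemented as follows. It uses the signals2 signal/slot mechanism from the Boost C++ library; anyone who has programmed with Qt will be familiar with the concept. A signal is bound to one or more functions (slots), and whenever the signal is emitted, the bound functions are invoked.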

boost::signals2::signal<void ()>& GetSignalOnCreate() {
static boost::signals2::signal<void ()> SignalOnCreate;
return SignalOnCreate;
}
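For readers who have not used signals and slots, the mechanism can be approximated with the standard library alone. SimpleSignal below is a hypothetical, heavily reduced stand-in for boost::signals2::signal<void ()>: it has no groups, no disconnection, and no thread safety, and is meant only to show the connect-then-emit flow.

```cpp
#include <cassert>
#include <functional>
#include <utility>
#include <vector>

// Minimal stand-in for a void() signal: a list of slots invoked in
// connection order when the signal is emitted.
class SimpleSignal {
public:
    void connect(std::function<void()> slot) { slots_.push_back(std::move(slot)); }

    void operator()() const {
        for (const auto& slot : slots_) slot();
    }

private:
    std::vector<std::function<void()>> slots_;
};

// Function-local static, as in GetSignalOnCreate(): the signal is
// constructed on first use, so connecting to it from another static
// initializer is safe.
SimpleSignal& GetSignalOnCreate() {
    static SimpleSignal signal;
    return signal;
}

// Emitting the signal runs every connected slot.
void OnCreate() { GetSignalOnCreate()(); }
```

Each module connects its own handler, and a single OnCreate() call then reaches all of them without the caller knowing which modules exist.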

So where is this binding established?

Look at the definition of the BOOT_RUN_STARTUP macro; functions registered with it run at startup:

#define BOOT_RUN_STARTUP(func) VARIABLE_IS_NOT_USED static int __anonymous_run_variable_startup_##func = __boot_run_atstartup(func)
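The trick behind such a macro is that the initializer of a namespace-scope static variable runs during static initialization, before main(). The sketch below shows the idea with hypothetical names (RunAtStartup, RUN_AT_STARTUP, InitBind); it assumes the helper simply calls the function and returns a dummy value, which is how we read the macro above but is not shown in the excerpt.

```cpp
#include <cassert>

// Assumed helper: runs func immediately and returns a dummy value so the
// call can serve as the initializer of a static variable.
static int RunAtStartup(void (*func)()) {
    func();
    return 0;
}

// Simplified macro: one static int per registered function. Its
// initializer runs during static initialization, before main() starts.
#define RUN_AT_STARTUP(func) \
    [[maybe_unused]] static int startup_var_##func = RunAtStartup(func)

// g_bound is zero-initialized before any dynamic initializer runs.
static int g_bound = 0;

// Hypothetical registration function; a real one would connect slots.
static void InitBind() { ++g_bound; }

RUN_AT_STARTUP(InitBind);
```

By the time main() starts, InitBind has already run exactly once. Note that the relative order of such initializers across different source files is unspecified, which is one reason the signals are exposed through function-local statics.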

The files that use BOOT_RUN_STARTUP are listed below:

message_queue.cc

BOOT_RUN_STARTUP(__RgisterANRCheckCallback);

app_logic.cc

BOOT_RUN_STARTUP(__InitbindBaseprjevent);

active_logic.cc

BOOT_RUN_STARTUP(__initbind_baseprjevent);

sdt_logic.cc

BOOT_RUN_STARTUP(__initbind_baseprjevent);

stn_logic.cc

BOOT_RUN_STARTUP(__initbind_baseprjevent);

As we can see, in sdt_logic.cc:

static void __initbind_baseprjevent() {
//.....
GetSignalOnCreate().connect(&onCreate);
GetSignalOnDestroy().connect(5, &onDestroy);
}

and in stn_logic.cc:

static void __initbind_baseprjevent() {

//.....
GetSignalOnCreate().connect(&onCreate);
GetSignalOnDestroy().connect(&onDestroy); //low priority signal func
GetSignalOnSingalCrash().connect(&onSingalCrash);
GetSignalOnExceptionCrash().connect(&onExceptionCrash);
GetSignalOnNetworkChange().connect(5, &onNetworkChange); //define group 5
//...
GetSignalOnNetworkDataChange().connect(&OnNetworkDataChange);
}

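Both bind the GetSignalOnCreate signal to their own handler through the signal/slot mechanism. The onCreate functions in sdt_logic.cc and stn_logic.cc are therefore what the core actually runs when we call createMars: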

sdt_logic.cc

static void onCreate() {
//....
SdtCore::Singleton::Instance();
}

Initialization of the SdtCore module:

SdtCore::SdtCore()
: thread_(boost::bind(&SdtCore::__RunOn, this))
, check_list_(std::list<BaseChecker*>())
, cancel_(false)
, checking_(false) {
xinfo_function();
}

stn_logic.cc

static void onCreate() {
//....
ActiveLogic::Singleton::Instance();
NetCore::Singleton::Instance();
}
ActiveLogic::ActiveLogic()
: isforeground_(false), isactive_(true)
, alarm_(boost::bind(&ActiveLogic::__OnInActive, this), false)
, lastforegroundchangetime_(::gettickcount())
{
//....
}
NetCore::NetCore()
: messagequeue_creater_(true, XLOGGER_TAG)
// ... a very long initializer list (elided)
{
//....
}

Between them, these two files initialize the SdtCore, ActiveLogic, and NetCore modules.

3.3 Setting the long-connection and short-connection addresses

Setting the long-connection address:

[[NetworkService sharedInstance] setLongLinkAddress:@"localhost" port:8081];
void (*SetLonglinkSvrAddr)(const std::string& host, const std::vector<uint16_t> ports, const std::string& debugip)
= [](const std::string& host, const std::vector<uint16_t> ports, const std::string& debugip) {
std::vector<std::string> hosts;
if (!host.empty()) {
hosts.push_back(host);
}
NetSource::SetLongLink(hosts, ports, debugip);
};

Setting the short-connection address:

[[NetworkService sharedInstance] setShortLinkPort:8080];
void (*SetShortlinkSvrAddr)(const uint16_t port, const std::string& debugip)
= [](const uint16_t port, const std::string& debugip) {
NetSource::SetShortlink(port, debugip);
};
void NetSource::SetLongLink(const std::vector<std::string>& _hosts, const std::vector<uint16_t>& _ports, const std::string& _debugip) {
ScopedLock lock(sg_ip_mutex);
//......
if (!_hosts.empty()) {
sg_longlink_hosts = _hosts;
}
else {
//.......
}
sg_longlink_ports = _ports;
}

void NetSource::SetShortlink(const uint16_t _port, const std::string& _debugip) {
ScopedLock lock(sg_ip_mutex);
//......
sg_shortlink_port = _port;
sg_shortlink_debugip = _debugip;
}

In the end, the hosts and ports of the long and short connections are stored in NetSource's static variables sg_longlink_hosts, sg_longlink_ports, and sg_shortlink_port.

3.4 Reporting foreground/background state

[[NetworkService sharedInstance] reportEvent_OnForeground:YES/NO];

This is also driven by a signal; the final binding is in active_logic.cc:

static void __initbind_baseprjevent() {
GetSignalOnForeground().connect(&onForeground);
}

static void onForeground(bool _isforeground) {
ActiveLogic::Singleton::Instance()->OnForeground(_isforeground);
}

void ActiveLogic::OnForeground(bool _isforeground) {

//....
if (_isforeground == isforeground_) return;

bool oldisactive = isactive_;
isactive_ = true;
isforeground_ = _isforeground;
lastforegroundchangetime_ = ::gettickcount();
alarm_.Cancel();

//.....
bool isnotify = oldisactive!=isactive_;
SignalForeground(isforeground_);

if (isnotify) {
SignalActive(isactive_);
}
}

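The most important part of ActiveLogic::OnForeground is the emission of SignalForeground and SignalActive. These are signals as well, so we skip the intermediate steps and go straight to the final handlers in the core: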

LongLinkConnectMonitor::LongLinkConnectMonitor(ActiveLogic& _activelogic, LongLink& _longlink, MessageQueue::MessageQueue_t _id)
: asyncreg_(MessageQueue::InstallAsyncHandler(_id))
, activelogic_(_activelogic), longlink_(_longlink), alarm_(boost::bind(&LongLinkConnectMonitor::__OnAlarm, this), _id)
, status_(LongLink::kDisConnected)
, last_connect_time_(0)
, last_connect_net_type_(kNoNet)
, thread_(boost::bind(&LongLinkConnectMonitor::__Run, this), XLOGGER_TAG"::con_mon")
, conti_suc_count_(0)
, isstart_(false) {
/* bind the SignalActive signal */
activelogic_.SignalActive.connect(boost::bind(&LongLinkConnectMonitor::__OnSignalActive, this, _1));
/* bind the SignalForeground signal */
activelogic_.SignalForeground.connect(boost::bind(&LongLinkConnectMonitor::__OnSignalForeground, this, _1));
longlink_.SignalConnection.connect(boost::bind(&LongLinkConnectMonitor::__OnLongLinkStatuChanged, this, _1));
}
void LongLinkConnectMonitor::__OnSignalForeground(bool _isForeground) {
//.......
if (_isForeground) {
//..... (logging elided)
if ((longlink_.ConnectStatus() == LongLink::kConnected) &&
(tickcount_t().gettickcount() - longlink_.GetLastRecvTime() > tickcountdiff_t(4.5 * 60 * 1000))) {
// reconnect if nothing has been received for more than 4.5 minutes
__ReConnect();
}
}
//.......
}

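When SignalForeground fires, a reconnect happens only if the app is in the foreground, the long connection is currently connected, and more than 4.5 minutes have passed since data was last received on it.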
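The rule can be written as a small pure function. This is a sketch for illustration: ShouldReconnectOnForeground is a hypothetical helper, and all times are assumed to be millisecond tick counts.

```cpp
#include <cassert>
#include <cstdint>

// Threshold taken from the handler above: 4.5 minutes in milliseconds.
constexpr uint64_t kForegroundReconnectMs = static_cast<uint64_t>(4.5 * 60 * 1000);

// Hypothetical helper mirroring the condition in __OnSignalForeground.
// Assumes now_ms >= last_recv_ms; both are millisecond tick counts.
bool ShouldReconnectOnForeground(bool is_foreground, bool connected,
                                 uint64_t now_ms, uint64_t last_recv_ms) {
    if (!is_foreground || !connected) return false;
    return now_ms - last_recv_ms > kForegroundReconnectMs;
}
```

The comparison is strict, so a connection that last received data exactly 4.5 minutes ago is still considered fresh.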

void LongLinkConnectMonitor::__OnSignalActive(bool _isactive) {
__AutoIntervalConnect();
}
uint64_t LongLinkConnectMonitor::__AutoIntervalConnect() {
alarm_.Cancel();
uint64_t remain = __IntervalConnect(kLongLinkConnect);
alarm_.Start((int)remain);
return remain;
}

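On SignalActive, the monitor schedules an automatic connect after the interval returned by __IntervalConnect. __IntervalConnect looks up the table below by connect type and the app's current activity state to determine that interval.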

//                          kForgroundOneMinute | kForgroundTenMinute | kForgroundActive | kBackgroundActive | kInactive
static unsigned long const sg_interval[][5] = {
/* kTaskConnect */          { 5, 10, 20, 30, 300},
/* kLongLinkConnect */      { 15, 30, 240, 300, 600},
/* kNetworkChangeConnect */ { 0, 0, 0, 0, 0},
};
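A lookup against this table might look like the sketch below. The numeric values are copied from the table; the two enums and their ordering are assumptions inferred from the row and column labels, and LookupInterval is a hypothetical helper. The unit of the values is not stated in the excerpt.

```cpp
#include <cassert>

// Assumed ordering: rows follow the row labels of the table.
enum ConnectType { kTaskConnect = 0, kLongLinkConnect, kNetworkChangeConnect };

// Assumed ordering: columns follow the column labels of the table.
enum ActiveState {
    kForgroundOneMinute = 0,
    kForgroundTenMinute,
    kForgroundActive,
    kBackgroundActive,
    kInactive
};

static const unsigned long sg_interval[][5] = {
    {5, 10, 20, 30, 300},     // kTaskConnect
    {15, 30, 240, 300, 600},  // kLongLinkConnect
    {0, 0, 0, 0, 0},          // kNetworkChangeConnect
};

// Hypothetical helper: pick the interval for a connect type and activity state.
unsigned long LookupInterval(ConnectType type, ActiveState state) {
    return sg_interval[type][state];
}
```

Reading the table this way shows the intent: the less active the app, the longer the wait before reconnecting, while a network change always reconnects immediately.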

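The full reconnect mechanism is covered in more detail in the discussion of long connections.

3.5 Checking the long-connection state and keeping it connected

Back in AppDelegate, the last step of Mars initialization is makesureLongLinkConnect: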

[[NetworkService sharedInstance] makesureLongLinkConnect];

The call chain is as follows:

- (void)makesureLongLinkConnect {
mars::stn::MakesureLonglinkConnected();
}
void (*MakesureLonglinkConnected)()
= []() {
STN_WEAK_CALL(MakeSureLongLinkConnect());
};
void NetCore::MakeSureLongLinkConnect() {
#ifdef USE_LONG_LINK
ASYNC_BLOCK_START
longlink_task_manager_->LongLinkChannel().MakeSureConnected();
ASYNC_BLOCK_END
#endif
}
bool LongLink::MakeSureConnected(bool* _newone) {
if (_newone) *_newone = false;
ScopedLock lock(mutex_);
if (kConnected == ConnectStatus()) return true;
bool newone = false;
thread_.start(&newone);
if (newone) {
//.....
}
if (_newone) *_newone = newone;
return false;
}

The key step in LongLink::MakeSureConnected is the call to thread_.start(&newone):

LongLink::LongLink(const mq::MessageQueue_t& _messagequeueid, NetSource& _netsource)
: asyncreg_(MessageQueue::InstallAsyncHandler(_messagequeueid))
, netsource_(_netsource)
/* bind the __Run method */
, thread_(boost::bind(&LongLink::__Run, this), XLOGGER_TAG "::lonklink")
, connectstatus_(kConnectIdle)
, disconnectinternalcode_(kNone)
, smartheartbeat_(NULL)
, wakelock_(NULL)
{}

When LongLink is constructed, thread_ is bound to __Run via boost::bind, so calling start runs the __Run method:

void LongLink::__Run() {

//.......

SOCKET sock = __RunConnect(conn_profile);
//......
ErrCmdType errtype = kEctOK;
int errcode = 0;
__RunReadWrite(sock, errtype, errcode, conn_profile);
//....
}

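The key parts of __Run are __RunConnect and __RunReadWrite: the former establishes the socket connection, and the latter waits in a loop to read and write data. Data reading and writing is covered separately later.

3.6 Starting a request

As the code above shows, a request is started as follows: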

- (int)startTask:(CGITask *)task ForUI:(id<UINotifyDelegate>)delegateUI {
Task ctask;
ctask.cmdid = task.cmdid;
ctask.channel_select = task.channel_select;
ctask.cgi = std::string(task.cgi.UTF8String);
ctask.shortlink_host_list.push_back(std::string(task.host.UTF8String));
ctask.user_context = (__bridge void*)task;

mars::stn::StartTask(ctask);

NSString *taskIdKey = [NSString stringWithFormat:@"%d", ctask.taskid];
[_delegate addObserver:delegateUI forKey:taskIdKey];
[_delegate addCGITasks:task forKey:taskIdKey];

return ctask.taskid;
}

This creates a Task object that holds all the data the request needs, and then calls mars::stn::StartTask(ctask) to have Mars issue the request:

stn_logic.cc

bool (*StartTask)(const Task& _task)
= [](const Task& _task) {
STN_RETURN_WEAK_CALL(StartTask(_task));
};

net_core.cc

void NetCore::StartTask(const Task& _task) {

//......
Task task = _task;
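// Validate the task; it is rejected in any of the following cases:
// 1. the server processing time exceeds 2 minutes
// 2. the retry count exceeds 30
// 3. the timeout exceeds 10 minutes
// 4. cmdid is 0 for a long-connection task
// 5. cgi is empty for a short-connection task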
if (!__ValidAndInitDefault(task, group)) {
OnTaskEnd(task.taskid, task.user_context, kEctLocal, kEctLocalTaskParam);
return;
}
//.....
if (0 == task.channel_select) {
// invalid channel type
OnTaskEnd(task.taskid, task.user_context, kEctLocal, kEctLocalChannelSelect);
return;
}

// no network and, when long connections are enabled, the long connection is not connected
if (task.network_status_sensitive && kNoNet ==::getNetInfo()
#ifdef USE_LONG_LINK
&& LongLink::kConnected != longlink_task_manager_->LongLinkChannel().ConnectStatus()
#endif
) {
OnTaskEnd(task.taskid, task.user_context, kEctLocal, kEctLocalNoNet);
return;
}

bool start_ok = false;

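// If the long connection is not connected, the task may use it, the app is in the foreground,
// and the last foreground/background change was no more than 15 minutes ago, make sure the long connection gets connected.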
if (LongLink::kConnected != longlink_task_manager_->LongLinkChannel().ConnectStatus()
&& (Task::kChannelLong & task.channel_select) && ActiveLogic::Singleton::Instance()->IsForeground()
&& (15 * 60 * 1000 >= gettickcount() - ActiveLogic::Singleton::Instance()->LastForegroundChangeTime()))
longlink_task_manager_->getLongLinkConnectMonitor().MakeSureConnected();

switch (task.channel_select) {
case Task::kChannelBoth: {

#ifdef USE_LONG_LINK
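// The long connection is used whenever it is connected. Under kChannelFastStrategy it must additionally
// have no more than kFastSendUseLonglinkTaskCntLimit tasks; otherwise the short connection is used.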
bool bUseLongLink = LongLink::kConnected == longlink_task_manager_->LongLinkChannel().ConnectStatus();
if (bUseLongLink && task.channel_strategy == Task::kChannelFastStrategy) {
bUseLongLink = bUseLongLink && (longlink_task_manager_->GetTaskCount() <= kFastSendUseLonglinkTaskCntLimit);
}
if (bUseLongLink)
start_ok = longlink_task_manager_->StartTask(task);
else
#endif
start_ok = shortlink_task_manager_->StartTask(task);
}
break;
#ifdef USE_LONG_LINK

case Task::kChannelLong:
start_ok = longlink_task_manager_->StartTask(task);
break;
#endif

case Task::kChannelShort:
start_ok = shortlink_task_manager_->StartTask(task);
break;

default:
xassert2(false);
break;
}
//.......
}

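The main job of NetCore::StartTask is to decide, based on channel_select, whether to call StartTask on the long-connection or the short-connection task manager. When channel_select is kChannelBoth, the long connection is preferred whenever it is usable.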
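The decision can be isolated into a pure function, which makes the branches easier to see. This is a sketch under assumptions: the enum values, the Route type, SelectRoute, and the task_count_limit parameter (standing in for kFastSendUseLonglinkTaskCntLimit, whose value is not shown in the excerpt) are all hypothetical.

```cpp
#include <cassert>

// Assumed bit-flag values; the excerpt only shows that channel_select is
// tested with a bitwise AND against kChannelLong.
enum Channel { kChannelShort = 0x1, kChannelLong = 0x2, kChannelBoth = 0x3 };
enum Strategy { kChannelNormalStrategy = 0, kChannelFastStrategy = 1 };
enum Route { kRouteInvalid, kRouteShort, kRouteLong };

// Hypothetical helper mirroring the switch in NetCore::StartTask.
Route SelectRoute(int channel_select, Strategy strategy, bool longlink_connected,
                  unsigned longlink_task_count, unsigned task_count_limit) {
    switch (channel_select) {
        case kChannelBoth: {
            // Prefer the long connection when it is connected; under the fast
            // strategy it must also not be too busy.
            bool use_long = longlink_connected;
            if (use_long && strategy == kChannelFastStrategy) {
                use_long = longlink_task_count <= task_count_limit;
            }
            return use_long ? kRouteLong : kRouteShort;
        }
        case kChannelLong:
            return kRouteLong;
        case kChannelShort:
            return kRouteShort;
        default:
            return kRouteInvalid;
    }
}
```

Note that a task pinned to kChannelLong is handed to the long-connection manager even when the connection is currently down; only kChannelBoth falls back to the short connection.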

3.6.1 StartTask on a short connection

Let's first look at StartTask for the short-connection case:

bool ShortLinkTaskManager::StartTask(const Task& _task) {

//.....
TaskProfile task(_task);
task.link_type = Task::kChannelShort;
// append to the list and sort by priority
lst_cmd_.push_back(task);
lst_cmd_.sort(__CompareTask);
// process the requests in __RunLoop
__RunLoop();
return true;
}

In the short-connection case, StartTask appends the Task to lst_cmd_ and then calls __RunLoop to process it. Here is __RunLoop:

void ShortLinkTaskManager::__RunLoop() {
//....
__RunOnTimeout();
__RunOnStartTask();

if (!lst_cmd_.empty()) {
MessageQueue::FasterMessage(asyncreg_.Get(),
MessageQueue::Message((MessageQueue::MessageTitle_t)this, boost::bind(&ShortLinkTaskManager::__RunLoop, this), "ShortLinkTaskManager::__RunLoop"),
MessageQueue::MessageTiming(1000));
} else {}
}

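__RunLoop mainly calls __RunOnTimeout and __RunOnStartTask. If lst_cmd_ is not empty afterwards, it schedules itself to run again after 1 second, which forms a loop.
__RunOnTimeout handles tasks whose requests have timed out, while __RunOnStartTask is where requests are actually issued: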
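Before reading the real listing, it may help to see the queueing idea in isolation. The sketch below uses a hypothetical TaskQueue with a reduced TaskProfile; it assumes that a smaller priority value is served first, which the excerpt does not confirm, and it replaces the message-queue rescheduling with a simple NeedsAnotherPass() check.

```cpp
#include <cassert>
#include <list>
#include <vector>

// Reduced, hypothetical task record.
struct TaskProfile {
    int taskid;
    int priority;
    bool running;
};

// Assumption: a smaller priority value means the task is served earlier.
static bool CompareTask(const TaskProfile& a, const TaskProfile& b) {
    return a.priority < b.priority;
}

class TaskQueue {
public:
    // Like StartTask: append, then re-sort. std::list::sort is stable, so
    // tasks with equal priority keep their submission order.
    void StartTask(int taskid, int priority) {
        tasks_.push_back(TaskProfile{taskid, priority, false});
        tasks_.sort(CompareTask);
    }

    // One pass of the loop: start every task that is not running yet and
    // return the ids in the order they were started.
    std::vector<int> RunOnStartTask() {
        std::vector<int> started;
        for (auto& t : tasks_) {
            if (t.running) continue;  // already in flight, skip
            t.running = true;
            started.push_back(t.taskid);
        }
        return started;
    }

    // Called when a task completes or times out.
    void Finish(int taskid) {
        tasks_.remove_if([taskid](const TaskProfile& t) { return t.taskid == taskid; });
    }

    // The loop only needs to reschedule itself while tasks remain.
    bool NeedsAnotherPass() const { return !tasks_.empty(); }

private:
    std::list<TaskProfile> tasks_;
};
```

The real implementation does considerably more per pass (retry intervals, authentication, anti-avalanche checks), but the skeleton is the same: a sorted list, a pass that skips running entries, and a reschedule while the list is non-empty.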

void ShortLinkTaskManager::__RunOnStartTask() {
//......
while (first != last) {
std::list<TaskProfile>::iterator next = first;
++next;

// the task is already running
if (first->running_id) {
++sent_count;
first = next;
continue;
}

// the retry interval has not elapsed yet
if (first->retry_time_interval > curtime - first->retry_start_time) {
xdebug2(TSF"retry interval, taskid:%0, task retry late task, wait:%1", first->task.taskid, (curtime - first->transfer_profile.loop_start_task_time) / 1000);
first = next;
continue;
}

// if the task requires authentication, make sure login has succeeded
if (first->task.need_authed) {

if (!ismakesureauthruned) {
ismakesureauthruned = true;
ismakesureauthsuccess = MakesureAuthed();
}

if (!ismakesureauthsuccess) {
xinfo2_if(curtime % 3 == 1, TSF"makeSureAuth retsult=%0", ismakesureauthsuccess);
first = next;
continue;
}
}

AutoBuffer bufreq;
AutoBuffer buffer_extension;
int error_code = 0;

// call into the upper layer to build the request
if (!Req2Buf(first->task.taskid, first->task.user_context, bufreq, buffer_extension, error_code, Task::kChannelShort)) {
__SingleRespHandle(first, kEctEnDecode, error_code, kTaskFailHandleTaskEnd, 0, first->running_id ? ((ShortLinkInterface*)first->running_id)->Profile() : ConnectProfile());
first = next;
continue;
}

// anti-avalanche check
xassert2(fun_anti_avalanche_check_);
if (!fun_anti_avalanche_check_(first->task, bufreq.Ptr(), (int)bufreq.Length())) {
__SingleRespHandle(first, kEctLocal, kEctLocalAntiAvalanche, kTaskFailHandleTaskEnd, 0, first->running_id ? ((ShortLinkInterface*)first->running_id)->Profile() : ConnectProfile());
first = next;
continue;
}

first->transfer_profile.loop_start_task_time = ::gettickcount();
first->transfer_profile.first_pkg_timeout = __FirstPkgTimeout(first->task.server_process_cost, bufreq.Length(), sent_count, dynamic_timeout_.GetStatus());
first->current_dyntime_status = (first->task.server_process_cost <= 0) ? dynamic_timeout_.GetStatus() : kEValuating;
first->transfer_profile.read_write_timeout = __ReadWriteTimeout(first->transfer_profile.first_pkg_timeout);
first->transfer_profile.send_data_size = bufreq.Length();
first->use_proxy = (first->remain_retry_count == 0 && first->task.retry_count > 0) ? !default_use_proxy_ : default_use_proxy_;

// create the short-connection interface
ShortLinkInterface* worker = ShortLinkChannelFactory::Create(MessageQueue::Handler2Queue(asyncreg_.Get()), net_source_, first->task, first->use_proxy);
// bind ShortLinkTaskManager's __OnSend, __OnRecv, and __OnResponse to the ShortLinkInterface
worker->OnSend.set(boost::bind(&ShortLinkTaskManager::__OnSend, this, _1), AYNC_HANDLER);
worker->OnRecv.set(boost::bind(&ShortLinkTaskManager::__OnRecv, this, _1, _2, _3), AYNC_HANDLER);
worker->OnResponse.set(boost::bind(&ShortLinkTaskManager::__OnResponse, this, _1, _2, _3, _4, _5, _6, _7), AYNC_HANDLER);
// set running_id; once a request has been issued, running_id is non-zero
first->running_id = (intptr_t)worker;

//...

worker->func_network_report.set(fun_notify_network_err_);
// call SendRequest to send the request
worker->SendRequest(bufreq, buffer_extension);
//......
// increment the sent count
++sent_count;
first = next;
}
}

The listing above is annotated in detail; the key parts are:


// call into the upper layer to build the request
if (!Req2Buf(first->task.taskid, first->task.user_context, bufreq, buffer_extension, error_code, Task::kChannelShort)) {
__SingleRespHandle(first, kEctEnDecode, error_code, kTaskFailHandleTaskEnd, 0, first->running_id ? ((ShortLinkInterface*)first->running_id)->Profile() : ConnectProfile());
first = next;
continue;
}

// create the short-connection interface
ShortLinkInterface* worker = ShortLinkChannelFactory::Create(MessageQueue::Handler2Queue(asyncreg_.Get()), net_source_, first->task, first->use_proxy);
// bind ShortLinkTaskManager's __OnSend, __OnRecv, and __OnResponse to the ShortLinkInterface
worker->OnSend.set(boost::bind(&ShortLinkTaskManager::__OnSend, this, _1), AYNC_HANDLER);
worker->OnRecv.set(boost::bind(&ShortLinkTaskManager::__OnRecv, this, _1, _2, _3), AYNC_HANDLER);
worker->OnResponse.set(boost::bind(&ShortLinkTaskManager::__OnResponse, this, _1, _2, _3, _4, _5, _6, _7), AYNC_HANDLER);
// set running_id; once a request has been issued, running_id is non-zero
first->running_id = (intptr_t)worker;
//...
// call SendRequest to send the request
worker->SendRequest(bufreq, buffer_extension);

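As shown above, after a series of checks the manager calls the upper layer's Req2Buf, creates a ShortLinkInterface (a ShortLink object) through ShortLinkChannelFactory, binds the ShortLinkInterface's OnSend, OnRecv, and OnResponse to ShortLinkTaskManager, assigns a running_id to the task, and finally calls SendRequest to send the request data.

Next, ShortLink::SendRequest: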

void ShortLink::SendRequest(AutoBuffer& _buf_req, AutoBuffer& _buffer_extend) {
send_body_.Attach(_buf_req);
send_extend_.Attach(_buffer_extend);
thread_.start();
}

The key step in ShortLink::SendRequest is the call to thread_.start(), which runs the __Run method:

void ShortLink::__Run() {
//.....

SOCKET fd_socket = __RunConnect(conn_profile);

if (INVALID_SOCKET == fd_socket) return;
if (OnSend) {
OnSend(this);
} else {
//....
}
//....
__RunReadWrite(fd_socket, errtype, errcode, conn_profile);
//....
socket_close(fd_socket);
}

ShortLink::__Run then calls __RunConnect and __RunReadWrite. The main job of __RunConnect is to establish the short connection used to send the request:

SOCKET ShortLink::__RunConnect(ConnectProfile& _conn_profile) {

//.....
ShortLinkConnectObserver connect_observer(*this);
ComplexConnect conn(kShortlinkConnTimeout, kShortlinkConnInterval);

SOCKET sock = conn.ConnectImpatient(vecaddr, breaker_, &connect_observer, _conn_profile.proxy_info.type, proxy_addr, _conn_profile.proxy_info.username, _conn_profile.proxy_info.password);
//.....
return sock;
}

__RunConnect is also long, so we only look at the key part: it calls ComplexConnect::ConnectImpatient to create and return a SOCKET:

SOCKET ComplexConnect::ConnectImpatient(const std::vector<socket_address>& _vecaddr, SocketBreaker& _breaker, MComplexConnect* _observer,
mars::comm::ProxyType _proxy_type, const socket_address* _proxy_addr,
const std::string& _proxy_username, const std::string& _proxy_pwd) {
//......
for (unsigned int i = 0; i < _vecaddr.size(); ++i) {
ConnectCheckFSM* ic = NULL;
if (mars::comm::kProxyHttpTunel == _proxy_type && _proxy_addr) {
ic = new ConnectHttpTunelCheckFSM(_vecaddr[i], *_proxy_addr, _proxy_username, _proxy_pwd, timeout_, i, _observer);
} else if (mars::comm::kProxySocks5 == _proxy_type && _proxy_addr) {
ic = new ConnectSocks5CheckFSM(_vecaddr[i], *_proxy_addr, _proxy_username, _proxy_pwd, timeout_, i, _observer);
} else {
ic = new ConnectCheckFSM(_vecaddr[i], timeout_, i, _observer);
}
//Create a ConnectCheckFSM for each address and add it to vecsocketfsm
vecsocketfsm.push_back(ic);
}

//......
SOCKET retsocket = INVALID_SOCKET;

do {
//......

// socket
for (unsigned int i = 0; i < index; ++i) {
//......

if (TcpClientFSM::EReadWrite == vecsocketfsm[i]->Status() && ConnectCheckFSM::ECheckOK == vecsocketfsm[i]->CheckStatus()) {
if (_observer) _observer->OnFinished(i, socket_address(&vecsocketfsm[i]->Address()), vecsocketfsm[i]->Socket(), vecsocketfsm[i]->Error(),
vecsocketfsm[i]->Rtt(), vecsocketfsm[i]->TotalRtt(), (int)(gettickcount() - starttime));
//....
retsocket = vecsocketfsm[i]->Socket();
//....
break;
}
}
//......
} while (true);
//........
return retsocket;
}
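The listing elides how `index` grows, but the constructor arguments (a timeout and an interval) suggest that attempts to the candidate addresses are started one after another, spaced by the interval, and that the first one to connect and pass its check wins. The following is a simplified simulation under that assumption, not the Mars implementation: `PickWinner` and its parameters are hypothetical, connection costs are given up front instead of coming from real sockets, and a failed attempt does not pull the next one forward.

```cpp
#include <cassert>
#include <cstddef>
#include <limits>
#include <vector>

// Simulates staggered connection attempts.
// cost_ms[i] is the time attempt i needs to connect; a negative value means
// the attempt fails. Attempt i is assumed to start at i * interval_ms.
// Returns the index of the attempt that finishes first, or -1 if none succeeds.
int PickWinner(const std::vector<int>& cost_ms, int interval_ms, int timeout_ms) {
    assert(interval_ms >= 0 && timeout_ms >= 0);
    int winner = -1;
    long best_finish = std::numeric_limits<long>::max();
    for (std::size_t i = 0; i < cost_ms.size(); ++i) {
        // Skip attempts that fail outright or exceed the per-attempt timeout.
        if (cost_ms[i] < 0 || cost_ms[i] > timeout_ms) continue;
        long finish = static_cast<long>(i) * interval_ms + cost_ms[i];
        // Strict comparison: on a tie the earlier address keeps the win.
        if (finish < best_finish) {
            best_finish = finish;
            winner = static_cast<int>(i);
        }
    }
    return winner;
}
```

With costs of 5000, 300 and 100 ms and a 1000 ms interval, the second address finishes at 1300 ms and wins, even though the third is faster in isolation, because it was started earlier.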

Next comes the most important stage, sending and receiving data. Let's look at ShortLink::__RunReadWrite:

void ShortLink::__RunReadWrite(SOCKET _socket, int& _err_type, int& _err_code, ConnectProfile& _conn_profile) {
xmessage2_define(message)(TSF"taskid:%_, cgi:%_, @%_", task_.taskid, task_.cgi, this);

std::string url;
std::map<std::string, std::string> headers;

//.....
if (kIPSourceProxy == _conn_profile.ip_type) {
url += "http://";
url += _conn_profile.host;
}
url += task_.cgi;
//http:// + _conn_profile.host + task_.cgi
headers[http::HeaderFields::KStringHost] = _conn_profile.host;

//.......

//After the request is built, pack it into out_buff
AutoBuffer out_buff;
shortlink_pack(url, headers, send_body_, send_extend_, out_buff, tracker_.get());

//Send the request data
int send_ret = block_socket_send(_socket, (const unsigned char*)out_buff.Ptr(), (unsigned int)out_buff.Length(), breaker_, _err_code);

//.....
//recv response
AutoBuffer body;
AutoBuffer recv_buf;
AutoBuffer extension;
int status_code = -1;
off_t recv_pos = 0;
MemoryBodyReceiver* receiver = new MemoryBodyReceiver(body);
http::Parser parser(receiver, true);
while (true) {
int recv_ret = block_socket_recv(_socket, recv_buf, KBufferSize, breaker_, _err_code, 5000);
//Error checks
Parser::TRecvStatus parse_status = parser.Recv(recv_buf.Ptr(recv_buf.Length() - recv_ret), recv_ret);
if (parser.FirstLineReady()) {
//Get the status code
status_code = parser.Status().StatusCode();
}
if (parse_status == http::Parser::kFirstLineError) {
//....
} else if (parse_status == http::Parser::kHeaderFieldsError) {
//....
} else if (parse_status == http::Parser::kBodyError) {
//.....
} else if (parse_status == http::Parser::kEnd) {
if (status_code != 200) {
//......
} else {
__OnResponse(kEctOK, status_code, body, extension, _conn_profile, true);
}
break;
} else {
//........
}
}
//......
}
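The two HTTP-level steps in this function, packing a request from a URL, headers and body, and reading the status code once the first response line is complete, can be sketched in isolation. This is an illustration only: `BuildRequest` and `ParseStatusCode` are hypothetical helpers, the use of POST and of a Content-Length header is an assumption about what a packer like shortlink_pack produces, and the parser here handles nothing beyond the status line.

```cpp
#include <cassert>
#include <map>
#include <sstream>
#include <string>

// Packs a minimal HTTP/1.1 request. The POST method is an assumption.
std::string BuildRequest(const std::string& url,
                         std::map<std::string, std::string> headers,
                         const std::string& body) {
    assert(!url.empty());
    headers["Content-Length"] = std::to_string(body.size());
    std::string out = "POST " + url + " HTTP/1.1\r\n";
    for (const auto& kv : headers) {
        out += kv.first + ": " + kv.second + "\r\n";
    }
    out += "\r\n";  // blank line separates headers from the body
    out += body;
    return out;
}

// Returns the status code once the first line is complete, otherwise -1.
int ParseStatusCode(const std::string& response) {
    std::size_t line_end = response.find("\r\n");
    if (line_end == std::string::npos) return -1;  // first line not ready yet
    std::istringstream first_line(response.substr(0, line_end));
    std::string version;
    int code = -1;
    if (!(first_line >> version >> code)) return -1;
    if (version.compare(0, 5, "HTTP/") != 0) return -1;
    return code;
}
```

A receive loop in this style would keep appending bytes to a buffer and call `ParseStatusCode` until it stops returning -1, which corresponds to the `FirstLineReady()` check in the listing above.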

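ShortLink::__RunReadWrite first calls shortlink_pack to pack the request, sends it with block_socket_send, and then calls block_socket_recv in a loop to read the response. Once a complete response with status 200 has been parsed, it calls __OnResponse. When the ShortLink was created, its OnResponse callback was bound to ShortLinkTaskManager::__OnResponse, so this call ends up in ShortLinkTaskManager::__OnResponse: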

void ShortLinkTaskManager::__OnResponse(ShortLinkInterface* _worker, ErrCmdType _err_type, int _status, AutoBuffer& _body, AutoBuffer& _extension, bool _cancel_retry, ConnectProfile& _conn_profile) {

//.......
int handle_type = Buf2Resp(it->task.taskid, it->task.user_context, _body, _extension, err_code, Task::kChannelShort);

switch(handle_type){
case kTaskFailHandleNoError: {
//.....
} break;
case kTaskFailHandleSessionTimeout:{
//....
} break;
case kTaskFailHandleRetryAllTasks: {
//.....
} break;
case kTaskFailHandleTaskEnd: {
//.....
} break;
case kTaskFailHandleDefault: {
//...
} break;
default: {
//...
}
}
}

In the method above, the Buf2Resp callback passes the data returned by the underlying socket up to the business-logic layer for processing.

The overall flow of sending and receiving data over a short link is shown in the figure below.

3.5.2 Long-link StartTask

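With the short-link data flow covered, the long link is comparatively easy to follow. What deserves the most attention in the long-link part is heartbeat handling. For completeness we again start from NetworkService's StartTask: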

bool LongLinkTaskManager::StartTask(const Task& _task) {
//Put the task into the lst_cmd_ list
lst_cmd_.push_back(task);
lst_cmd_.sort(__CompareTask);
//Run the run loop
__RunLoop();
return true;
}

LongLinkTaskManager::StartTask works like ShortLinkTaskManager::StartTask: both add the task to lst_cmd_ and then run __RunLoop.


void LongLinkTaskManager::__RunLoop() {

//.....
__RunOnTimeout();
__RunOnStartTask();

if (!lst_cmd_.empty()) {
//........
MessageQueue::FasterMessage(asyncreg_.Get(),
MessageQueue::Message((MessageQueue::MessageTitle_t)this, boost::bind(&LongLinkTaskManager::__RunLoop, this), "LongLinkTaskManager::__RunLoop"),
MessageQueue::MessageTiming(1000));
} else {
//......
}
}

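__RunLoop is much the same as in the short-link case: most of the work happens in __RunOnStartTask, and while lst_cmd_ is not empty it posts a FasterMessage with a 1000 ms delay so that __RunLoop runs again. Now for __RunOnStartTask: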


void LongLinkTaskManager::__RunOnStartTask() {

//.......
bool canretry = curtime - lastbatcherrortime_ >= retry_interval_;//Whether the retry interval has elapsed
bool canprint = true;
int sent_count = 0;

while (first != last) {
std::list<TaskProfile>::iterator next = first;
++next;

//The task is already running
//.......

//Retry interval; does not affect tasks being sent for the first time
//.......

// Login handling
//.....

AutoBuffer bufreq;
AutoBuffer buffer_extension;
int error_code = 0;

//Run the anti-avalanche check once if it has not been done yet
if (!first->antiavalanche_checked) {
//...
first->antiavalanche_checked = true;
}

//Make sure the long link is connected
if (!longlinkconnectmon_->MakeSureConnected()) {
//........
}

//If the request buffer is empty, call Req2Buf to get the request built by the upper layer
if (0 == bufreq.Length()) {
if (!Req2Buf(first->task.taskid, first->task.user_context, bufreq, buffer_extension, error_code, Task::kChannelLong)) {
__SingleRespHandle(first, kEctEnDecode, error_code, kTaskFailHandleTaskEnd, longlink_->Profile());
first = next;
continue;
}
// Anti-avalanche check
//......
}

first->transfer_profile.loop_start_task_time = ::gettickcount();
first->transfer_profile.first_pkg_timeout = __FirstPkgTimeout(first->task.server_process_cost, bufreq.Length(), sent_count, dynamic_timeout_.GetStatus());
first->current_dyntime_status = (first->task.server_process_cost <= 0) ? dynamic_timeout_.GetStatus() : kEValuating;
first->transfer_profile.read_write_timeout = __ReadWriteTimeout(first->transfer_profile.first_pkg_timeout);
first->transfer_profile.send_data_size = bufreq.Length();
//Call longlink_->Send to send the data
first->running_id = longlink_->Send(bufreq, buffer_extension, first->task);

//.......
++sent_count;
first = next;
}
}
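The overall shape of StartTask, __RunLoop and __RunOnStartTask (queue the task, sort the list, start whatever is not yet running, and run again while the list is non-empty) can be reduced to a few lines. This is a sketch under assumptions, not the Mars code: `TaskLoop` and `Task` are hypothetical, a smaller priority value is assumed to sort first, and instead of posting a delayed message the loop just returns whether another pass should be scheduled.

```cpp
#include <cassert>
#include <list>
#include <vector>

// Hypothetical, much reduced task record.
struct Task {
    int id;
    int priority;  // assumption: smaller value means more urgent
    bool running;
};

class TaskLoop {
public:
    // Queue a task, keep the list ordered, and run one pass immediately.
    void StartTask(int id, int priority) {
        assert(priority >= 0);
        tasks_.push_back(Task{id, priority, false});
        tasks_.sort([](const Task& a, const Task& b) { return a.priority < b.priority; });
        RunLoop();
    }

    // One pass: start every task that is not running yet.
    // Returns true if another pass should be scheduled (1 s later in Mars).
    bool RunLoop() {
        for (Task& task : tasks_) {
            if (task.running) continue;  // already in flight, skip it
            task.running = true;
            started_.push_back(task.id);
        }
        return !tasks_.empty();
    }

    // Remove a task once its response has been handled.
    void Finish(int id) {
        tasks_.remove_if([id](const Task& task) { return task.id == id; });
    }

    const std::vector<int>& started() const { return started_; }

private:
    std::list<Task> tasks_;
    std::vector<int> started_;
};
```

A caller would keep re-invoking `RunLoop` on a timer for as long as it returns true, which is the role the delayed FasterMessage plays in the listing above.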

The key parts of __RunOnStartTask are MakeSureConnected and LongLink::Send.

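MakeSureConnected was already introduced briefly when we covered Mars initialization; here we expand on it, focusing on how the interval between connection attempts is controlled: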

bool LongLinkConnectMonitor::MakeSureConnected() {
__IntervalConnect(kTaskConnect);
return LongLink::kConnected == longlink_.ConnectStatus();
}

uint64_t LongLinkConnectMonitor::__IntervalConnect(int _type) {

//If the link is connecting or already connected, return 0; no reconnect is attempted
if (LongLink::kConnecting == longlink_.ConnectStatus() || LongLink::kConnected == longlink_.ConnectStatus()) return 0;
//Get the interval before the next connection attempt
uint64_t interval = __Interval(_type, activelogic_) * 1000ULL;
//....
if (posttime >= interval) {
bool ret = longlink_.MakeSureConnected(&newone);
//....
} else {
//....
}
}

Let's look closely at __Interval:

static unsigned long __Interval(int _type, const ActiveLogic& _activelogic) {

unsigned long interval = sg_interval[_type][__CurActiveState(_activelogic)];/*Look up the base interval*/
if (kLongLinkConnect != _type) return interval;
if (__CurActiveState(_activelogic) == kInactive || __CurActiveState(_activelogic) == kForgroundActive) { // now - LastForegroundChangeTime>10min
//Inactive and the username is empty
if (!_activelogic.IsActive() && GetAccountInfo().username.empty()) {
interval = kNoAccountInfoInactiveInterval;
xwarn2(TSF"no account info and inactive, interval:%_", interval);
//No network
} else if (kNoNet == getNetInfo()) {
interval = interval * kNoNetSaltRate + kNoNetSaltRise;
xinfo2(TSF"no net, interval:%0", interval);
//No username
} else if (GetAccountInfo().username.empty()) {
interval = interval * kNoAccountInfoSaltRate + kNoAccountInfoSaltRise;
xinfo2(TSF"no account info, interval:%0", interval);

} else {
// default value
interval += rand() % (20);
}
}
return interval;
}

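It picks the interval before the next connection attempt from the connection type and the device's current activity state; the base values live in the two-dimensional array sg_interval: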

// Columns: kForgroundOneMinute, kForgroundTenMinute, kForgroundActive, kBackgroundActive, kInactive
static unsigned long const sg_interval[][5] = {
{ 5, 10, 20, 30, 300},      // kTaskConnect
{ 15, 30, 240, 300, 600},   // kLongLinkConnect
{ 0, 0, 0, 0, 0},           // kNetworkChangeConnect
};
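To make the lookup concrete, here is a minimal sketch that reproduces the table and the "default" branch of __Interval, which adds a random offset below 20 to the base value. The enum declarations are assumptions: their order is inferred from the row and column labels of the table, and `BaseInterval` and `WithJitter` are hypothetical helper names. `std::mt19937` is used in place of `rand()` so the jitter source can be seeded.

```cpp
#include <cassert>
#include <random>

// Assumed enum order, inferred from the table's row and column labels.
enum ConnectType { kTaskConnect = 0, kLongLinkConnect, kNetworkChangeConnect };
enum ActiveState {
    kForgroundOneMinute = 0,
    kForgroundTenMinute,
    kForgroundActive,
    kBackgroundActive,
    kInactive
};

// Same values as sg_interval in the article.
static const unsigned long kIntervalTable[3][5] = {
    {5, 10, 20, 30, 300},     // kTaskConnect
    {15, 30, 240, 300, 600},  // kLongLinkConnect
    {0, 0, 0, 0, 0},          // kNetworkChangeConnect
};

// Base interval for a connection type and activity state.
unsigned long BaseInterval(ConnectType type, ActiveState state) {
    assert(type >= kTaskConnect && type <= kNetworkChangeConnect);
    assert(state >= kForgroundOneMinute && state <= kInactive);
    return kIntervalTable[type][state];
}

// Adds a uniform random offset in [0, range) so that many clients
// do not all retry at exactly the same moment.
unsigned long WithJitter(unsigned long base, unsigned long range, std::mt19937& rng) {
    assert(range > 0);
    std::uniform_int_distribution<unsigned long> offset(0, range - 1);
    return base + offset(rng);
}
```

For example, a long-link reconnect in the `kForgroundActive` state starts from a base of 240 and, with a range of 20, ends up somewhere in the half-open range 240 to 260.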

Back to LongLink::MakeSureConnected:

bool LongLink::MakeSureConnected(bool* _newone) {
//.......
thread_.start(&newone);
//....
return false;
}

LongLink::MakeSureConnected is very simple: it just calls thread_.start, which ends up running LongLink::__Run:

void LongLink::__Run() {
//.....
SOCKET sock = __RunConnect(conn_profile);
//......
__RunReadWrite(sock, errtype, errcode, conn_profile);

socket_close(sock);
//...
}

Like the short link, LongLink::__Run essentially performs two steps: __RunConnect creates the SOCKET, and __RunReadWrite runs the read/write loop.

SOCKET LongLink::__RunConnect(ConnectProfile& _conn_profile) {
//........
LongLinkConnectObserver connect_observer(*this, ip_items);
ComplexConnect com_connect(kLonglinkConnTimeout, kLonglinkConnInteral, kLonglinkConnInteral, kLonglinkConnMax);
SOCKET sock = com_connect.ConnectImpatient(vecaddr, connectbreak_, &connect_observer, proxy_info.type, proxy_addr, proxy_info.username, proxy_info.password);
//......
return sock;
}

The biggest difference between the long and short links lies in __RunReadWrite:

void LongLink::__RunReadWrite(SOCKET _sock, ErrCmdType& _errtype, int& _errcode, ConnectProfile& _profile) {

//When the heartbeat interval elapses, __OnAlarm runs
Alarm alarmnoopinterval(boost::bind(&LongLink::__OnAlarm, this), false);
//Timeout timer
Alarm alarmnooptimeout(boost::bind(&LongLink::__OnAlarm, this), false);

//.......

while (true) {
if (!alarmnoopinterval.IsWaiting()) {
if (first_noop_sent && alarmnoopinterval.Status() != Alarm::kOnAlarm) {
xassert2(false, "noop interval alarm not running");
}

if(first_noop_sent && alarmnoopinterval.Status() == Alarm::kOnAlarm) {
__NotifySmartHeartbeatJudgeDozeStyle();
}
xgroup2_define(noop_xlog);
uint64_t last_noop_interval = alarmnoopinterval.After();
uint64_t last_noop_actual_interval = (alarmnoopinterval.Status() == Alarm::kOnAlarm) ? alarmnoopinterval.ElapseTime() : 0;
bool has_late_toomuch = (last_noop_actual_interval >= (15*60*1000));

//Send a heartbeat (noop) request
if (__NoopReq(noop_xlog, alarmnooptimeout, has_late_toomuch)) {
nooping = true;
__NotifySmartHeartbeatHeartReq(_profile, last_noop_interval, last_noop_actual_interval);
}
first_noop_sent = true;
//Get the next heartbeat interval
uint64_t noop_interval = __GetNextHeartbeatInterval();
alarmnoopinterval.Cancel();
//Start the heartbeat timer
alarmnoopinterval.Start((int)noop_interval);
}
//.........
if (sel.Write_FD_ISSET(_sock) && !lstsenddata_.empty()) {
xgroup2_define(xlog_group);
xinfo2(TSF"task socket send sock:%0, ", _sock) >> xlog_group;

iovec* vecwrite = (iovec*)calloc(lstsenddata_.size(), sizeof(iovec));
unsigned int offset = 0;
//Gather the pending data into the iovec array; lstsenddata_ is filled by the Send method
for (auto it = lstsenddata_.begin(); it != lstsenddata_.end(); ++it) {
vecwrite[offset].iov_base = it->second->PosPtr();
vecwrite[offset].iov_len = it->second->PosLength();
++offset;
}
//Write to the socket to send the data out
ssize_t writelen = writev(_sock, vecwrite, (int)lstsenddata_.size());
free(vecwrite);
//Get the next heartbeat interval
unsigned long long noop_interval = __GetNextHeartbeatInterval();
alarmnoopinterval.Cancel();
alarmnoopinterval.Start((int)noop_interval);

auto it = lstsenddata_.begin();

while (it != lstsenddata_.end() && 0 < writelen) {
if (0 == it->second->Pos() && OnSend) OnSend(it->first.taskid);
if ((size_t)writelen >= it->second->PosLength()) {
//......
LongLinkNWriteData nwrite(it->second->Length(), it->first);
nsent_datas.push_back(nwrite);
it = lstsenddata_.erase(it);
} else {
//......
}
}
}

if (sel.Read_FD_ISSET(_sock)) {
bufrecv.AllocWrite(64 * 1024, false);
ssize_t recvlen = recv(_sock, bufrecv.PosPtr(), 64 * 1024, 0);
//.......
while (0 < bufrecv.Length()) {
uint32_t cmdid = 0;
uint32_t taskid = Task::kInvalidTaskID;
size_t packlen = 0;
AutoBuffer body;
AutoBuffer extension;
//Read and unpack the data
int unpackret = longlink_unpack(bufrecv, cmdid, taskid, packlen, body, extension, tracker_.get());

//.......
if (LONGLINK_UNPACK_STREAM_PACKAGE == unpackret) {
if (OnRecv)
OnRecv(taskid, packlen, packlen);
} else if (!__NoopResp(cmdid, taskid, stream_resp.stream, stream_resp.extension, alarmnooptimeout, nooping, _profile)) {
//Hand over to the upper layer
if (OnResponse)
OnResponse(kEctOK, 0, cmdid, taskid, stream_resp.stream, stream_resp.extension, _profile);
sent_taskids.erase(taskid);
}
}
}
}
//............
}

bool LongLink::Send(const AutoBuffer& _body, const AutoBuffer& _extension, const Task& _task) {
//......
lstsenddata_.push_back(std::make_pair(_task, move_wrapper<AutoBuffer>(AutoBuffer())));
longlink_pack(_task.cmdid, _task.taskid, _body, _extension, lstsenddata_.back().second, tracker_.get());
lstsenddata_.back().second->Seek(0, AutoBuffer::ESeekStart);
//...
return true;
}
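One detail of the read/write loop above is easy to miss: the heartbeat alarm is restarted not only after a heartbeat is sent but also after ordinary data is written, so regular traffic pushes the next heartbeat back. A minimal sketch of that timer logic follows. `HeartbeatTimer` is a hypothetical class with a fixed interval and explicit timestamps; the real code uses Alarm objects and asks __GetNextHeartbeatInterval for a new interval each time.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical heartbeat scheduler driven by caller-supplied timestamps.
class HeartbeatTimer {
public:
    explicit HeartbeatTimer(std::uint64_t interval_ms)
        : interval_ms_(interval_ms), deadline_ms_(0), armed_(false) {
        assert(interval_ms > 0);
    }

    // Called after ordinary data was written: push the next heartbeat back.
    void OnDataSent(std::uint64_t now_ms) { Restart(now_ms); }

    // Returns true when a heartbeat should be sent now, and re-arms the timer.
    // The first call always returns true because the timer is not armed yet.
    bool Poll(std::uint64_t now_ms) {
        if (armed_ && now_ms < deadline_ms_) return false;
        Restart(now_ms);
        return true;
    }

private:
    void Restart(std::uint64_t now_ms) {
        deadline_ms_ = now_ms + interval_ms_;
        armed_ = true;
    }

    std::uint64_t interval_ms_;
    std::uint64_t deadline_ms_;
    bool armed_;
};
```

With a 1000 ms interval, a heartbeat goes out at time 0; data written at 900 ms moves the next heartbeat from 1000 ms to 1900 ms.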

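For long-link data transfer, sending goes through LongLink::Send, which packs the data and appends it to lstsenddata_. When the socket becomes writable, the read/write loop writes the contents of lstsenddata_ to the socket with writev. Reading is similar to the short link: data is read from the socket and unpacked, then LongLink hands it through OnResponse to LongLinkTaskManager, which passes it to the application layer through the Buf2Resp callback it holds.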
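Because writev may write fewer bytes than were queued, the send queue has to be advanced by exactly the number of bytes written: fully sent buffers are removed, and a partially sent one keeps its position for the next round. The sketch below shows that bookkeeping on its own. `PendingBuffer` and `ConsumeWritten` are hypothetical names, and a `std::string` with an offset stands in for the AutoBuffer and its position.

```cpp
#include <cassert>
#include <cstddef>
#include <list>
#include <string>
#include <vector>

// Hypothetical queue entry: packed data plus how much of it was already sent.
struct PendingBuffer {
    int taskid;
    std::string data;
    std::size_t pos;
};

// Advances the queue by `written` bytes (the value a gathering write returned).
// Returns the task ids whose buffers were sent completely and removes them.
std::vector<int> ConsumeWritten(std::list<PendingBuffer>& queue, std::size_t written) {
    std::vector<int> done;
    auto it = queue.begin();
    while (it != queue.end() && written > 0) {
        assert(it->pos <= it->data.size());
        std::size_t remaining = it->data.size() - it->pos;
        if (written >= remaining) {
            // The whole rest of this buffer went out: drop it from the queue.
            written -= remaining;
            done.push_back(it->taskid);
            it = queue.erase(it);
        } else {
            // Only part of it went out: remember where to resume next time.
            it->pos += written;
            written = 0;
        }
    }
    return done;
}
```

If a 4-byte and a 6-byte buffer are queued and 7 bytes are written, the first task is reported as sent and the second stays in the queue with its position at 3.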

The overall Mars flow is shown in the figure below:
