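RTSP overview:

RTSP stands for Real Time Streaming Protocol. It provides an extensible framework for controlled, on-demand delivery of real-time data such as audio and video. RTSP offers playback control over a media stream (pause, fast-forward, and so on) but does not carry the media data itself; it acts as a remote control for the streaming server. The media data can travel over TCP or UDP at the transport layer, and RTSP also provides ways to select an RTP-based delivery mechanism.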

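RTSP model:

Before requesting video service from the video server, the client first fetches the presentation description file for that service from a web server over HTTP. In RTSP, each presentation and each of its media streams is identified by an RTSP URL. The whole presentation and the properties of its media are defined in the presentation description file, which may include the media encodings, language, RTSP URLs, destination address, port, and other parameters. Before requesting a continuous media stream from the server, the user must first obtain that stream's presentation description file to get the required parameters. The information in this file is used to locate the video service (server address and port) and to learn its encoding and related details.
The client then uses this information to request video service from the video server. Once the service is initialized, the video server creates a new video stream for the client, and the client and server run RTSP to exchange VCR-style control signals for that stream, such as play, pause, fast-forward, and rewind. When the service is finished, the client issues a TEARDOWN request. The server uses RTP to deliver the media data to the client, and the client application can play the data as soon as it arrives. In streaming, two different protocol stacks link the client and the server: RTP/RTCP and RTSP/TCP, as shown in the figure below: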

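RTSP message format:
  • Request message format:
Method   URI  RTSP-Version   CR  LF
Headers CR LF CR LF
Body CR LF

The method can be any of the commands listed in the OPTIONS response, and the URI is the address of the receiver, for example
rtsp://192.168.20.136

The RTSP version is normally RTSP/1.0. The CR LF at the end of each line is a carriage return followed by a line feed, which the receiver must parse accordingly; the last header must be followed by two CR LF sequences.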
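The request layout above can be sketched as a small serializer. This is only an illustration: buildRtspRequest is a hypothetical helper name, not part of any RTSP library, and the Content-Length handling is an assumption about how a body would normally be announced.

```cpp
#include <string>
#include <utility>
#include <vector>

// Hypothetical helper: serializes "Method URI RTSP/1.0", the headers,
// the blank line that ends the header section, and an optional body.
std::string buildRtspRequest(
        const std::string &method,
        const std::string &uri,
        const std::vector<std::pair<std::string, std::string> > &headers,
        const std::string &body = "") {
    std::string out = method + " " + uri + " RTSP/1.0\r\n";
    for (const auto &h : headers) {
        out += h.first + ": " + h.second + "\r\n";
    }
    if (!body.empty()) {
        // Assumption: a body is announced with Content-Length.
        out += "Content-Length: " + std::to_string(body.size()) + "\r\n";
    }
    out += "\r\n";  // second CR LF: end of headers
    out += body;
    return out;
}
```

Called with the method OPTIONS, the URI above, and a single CSeq header, it yields the request line, one header line, and the terminating blank line.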

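  • Response message format:
RTSP-Version  Status-Code  Reason-Phrase CR  LF
Headers CR LF CR LF
Body CR LF

The RTSP version is normally RTSP/1.0, the status code is a number (200 means success), and the reason phrase is the text explanation that corresponds to the status code.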
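As a rough sketch of how a receiver might split the status line into its three fields (the names StatusLine and parseStatusLine are made up for this example, and the 100..999 range check mirrors the one in the ARTSPConnection code quoted later in this article):

```cpp
#include <cstdlib>
#include <string>

struct StatusLine {
    std::string version;  // e.g. "RTSP/1.0"
    int code;             // e.g. 200
    std::string reason;   // e.g. "OK"
};

// Hypothetical helper: returns false if the line is not "RTSP/x.y <code> <reason>".
bool parseStatusLine(const std::string &line, StatusLine *out) {
    std::string::size_type sp1 = line.find(' ');
    if (sp1 == std::string::npos) return false;
    std::string::size_type sp2 = line.find(' ', sp1 + 1);
    if (sp2 == std::string::npos) return false;

    out->version = line.substr(0, sp1);
    if (out->version.compare(0, 5, "RTSP/") != 0) return false;

    std::string codeStr = line.substr(sp1 + 1, sp2 - sp1 - 1);
    char *end = nullptr;
    long code = std::strtol(codeStr.c_str(), &end, 10);
    if (codeStr.empty() || *end != '\0' || code < 100 || code > 999) {
        return false;
    }
    out->code = static_cast<int>(code);
    out->reason = line.substr(sp2 + 1);
    return true;
}
```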

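A simple RTSP exchange:

The following walks through the RTSP state transitions of one streaming playback session,
where C is the RTSP client and S is the RTSP server:

C->S:OPTIONS request        // ask the server which methods are available
S->C:OPTIONS response // the server's reply lists all available methods

C->S:DESCRIBE request // ask for the media initialization description provided by the server
S->C:DESCRIBE response // the server replies with the media initialization description, mainly SDP

C->S:SETUP request // set the session attributes and transport mode, asking the server to create a session
S->C:SETUP response // the server creates the session and returns the session identifier and session information

C->S:PLAY request // the client asks to start playback
S->C:PLAY response // the server acknowledges the request

S->C: // media data is sent

C->S:TEARDOWN request // the client asks to close the session
S->C:TEARDOWN response // the server acknowledges the request

Of these, the SETUP and PLAY steps are required.
The OPTIONS step can be skipped if the server and client have already agreed on which methods are available.
If the media initialization description can be obtained some other way, the RTSP DESCRIBE request is not needed either.
Whether TEARDOWN is needed depends on the design requirements of the system.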
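The exchange above implies a small client-side state machine. The sketch below is a simplified illustration with three states (recording is ignored); ClientState and applyMethod are hypothetical names, and a real client would also track per-stream state.

```cpp
#include <string>

enum class ClientState { Init, Ready, Playing };

// Hypothetical helper: returns true and updates *state if `method` is
// acceptable in the current state, false otherwise.
bool applyMethod(ClientState *state, const std::string &method) {
    if (method == "OPTIONS" || method == "DESCRIBE") {
        return true;  // allowed at any time, no state change
    }
    if (method == "SETUP") {
        if (*state == ClientState::Init) *state = ClientState::Ready;
        return true;
    }
    if (method == "TEARDOWN") {
        *state = ClientState::Init;
        return true;
    }
    if (method == "PLAY" && *state != ClientState::Init) {
        *state = ClientState::Playing;
        return true;
    }
    if (method == "PAUSE" && *state == ClientState::Playing) {
        *state = ClientState::Ready;
        return true;
    }
    return false;  // e.g. PLAY before SETUP
}
```

This makes the "SETUP and PLAY are required" rule concrete: PLAY is rejected until a SETUP has moved the session out of the initial state.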

Main RTSP commands:





RTSP status codes:
Status-Code =
| "100" ; Continue
| "200" ; OK
| "201" ; Created
| "250" ; Low on Storage Space
| "300" ; Multiple Choices
| "301" ; Moved Permanently
| "302" ; Moved Temporarily
| "303" ; See Other
| "304" ; Not Modified
| "305" ; Use Proxy
| "400" ; Bad Request
| "401" ; Unauthorized
| "402" ; Payment Required
| "403" ; Forbidden
| "404" ; Not Found
| "405" ; Method Not Allowed
| "406" ; Not Acceptable
| "407" ; Proxy Authentication Required
| "408" ; Request Time-out
| "410" ; Gone
| "411" ; Length Required
| "412" ; Precondition Failed
| "413" ; Request Entity Too Large
| "414" ; Request-URI Too Large
| "415" ; Unsupported Media Type
| "451" ; Parameter Not Understood
| "452" ; Conference Not Found
| "453" ; Not Enough Bandwidth
| "454" ; Session Not Found
| "455" ; Method Not Valid in This State
| "456" ; Header Field Not Valid for Resource
| "457" ; Invalid Range
| "458" ; Parameter Is Read-Only
| "459" ; Aggregate operation not allowed
| "460" ; Only aggregate operation allowed
| "461" ; Unsupported transport
| "462" ; Destination unreachable
| "500" ; Internal Server Error
| "501" ; Not Implemented
| "502" ; Bad Gateway
| "503" ; Service Unavailable
| "504" ; Gateway Time-out
| "505" ; RTSP Version not supported
| "551" ; Option not supported
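The first digit of a status code gives its class, which is often all a client needs in order to decide what to do next. A minimal sketch (StatusClass and classifyStatus are illustrative names only):

```cpp
enum class StatusClass {
    Informational,  // 1xx
    Success,        // 2xx
    Redirection,    // 3xx
    ClientError,    // 4xx
    ServerError,    // 5xx
    Unknown
};

// Hypothetical helper: maps a numeric status code to its class.
StatusClass classifyStatus(int code) {
    switch (code / 100) {
        case 1: return StatusClass::Informational;
        case 2: return StatusClass::Success;
        case 3: return StatusClass::Redirection;
        case 4: return StatusClass::ClientError;
        case 5: return StatusClass::ServerError;
        default: return StatusClass::Unknown;
    }
}
```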

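SDP format:
v=<version>                            (protocol version)
o=<username> <session id> <version> <network type> <address type> <address> (owner/creator and session identifier)
s=<session name> (session name)
i=<session description> (session information)
u=<URI> (URI of the description)
e=<email address> (email address)
p=<phone number> (phone number)
c=<network type> <address type> <connection address> (connection information)
b=<modifier>:<bandwidth-value> (bandwidth information)
t=<start time> <stop time> (time the session is active)
r=<repeat interval> <active duration> <list of offsets from start-time> (zero or more repeat times)
z=<adjustment time> <offset> <adjustment time> <offset> (time zone adjustments)
k=<method>:<encryption key> (encryption key)
a=<attribute>:<value> (zero or more session attribute lines)
m=<media> <port> <transport> <fmt list> (media name and transport address)

Time description:
t = (time the session is active)
r = * (zero or more repeat times)
Media description:
m = (media name and transport address)
i = * (media title)
c = * (connection information; optional if included at the session level)
b = * (bandwidth information)
k = * (encryption key)
a = * (zero or more media attribute lines)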
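Because every SDP line has the form <type>=<value>, a first-pass parser can be very small. The sketch below only splits lines and counts media sections; SdpLine, parseSdp and countMediaSections are names invented for this example, and no field-level validation is attempted.

```cpp
#include <cstddef>
#include <sstream>
#include <string>
#include <vector>

struct SdpLine {
    char type;          // 'v', 'o', 's', 'm', 'a', ...
    std::string value;  // everything after "x="
};

// Hypothetical helper: splits an SDP document into typed lines,
// skipping anything that does not look like "x=...".
std::vector<SdpLine> parseSdp(const std::string &sdp) {
    std::vector<SdpLine> lines;
    std::istringstream in(sdp);
    std::string line;
    while (std::getline(in, line)) {
        if (!line.empty() && line.back() == '\r') line.pop_back();
        if (line.size() < 2 || line[1] != '=') continue;
        lines.push_back(SdpLine{line[0], line.substr(2)});
    }
    return lines;
}

// Each "m=" line starts a new media description.
std::size_t countMediaSections(const std::vector<SdpLine> &lines) {
    std::size_t n = 0;
    for (const SdpLine &l : lines) {
        if (l.type == 'm') ++n;
    }
    return n;
}
```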
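RTP protocol:

The Real-time Transport Protocol (RTP) is a data transport protocol for carrying streaming media in unicast or multicast scenarios. It usually runs over UDP, although TCP, ATM, or other protocols can also serve as its carrier. RTP as a whole consists of two closely related parts: the RTP data protocol and the RTP control protocol (RTCP).
RTP provides timing information and stream synchronization for end-to-end real-time transport over the Internet, but it does not guarantee quality of service; quality-of-service monitoring and feedback are handled by RTCP.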

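  • A brief outline of an RTP session used for data transfer:

When an application sets up an RTP session, it determines a pair of destination transport addresses. A destination transport address consists of one network address and a pair of ports: one port for RTP packets and one for RTCP packets. RTP and RTCP packets are therefore sent separately, which lets each be delivered correctly. RTP data goes to an even-numbered UDP port, and the corresponding RTCP control data goes to the adjacent odd-numbered UDP port, forming a UDP port pair.
When sending, RTP takes the media bitstream from the upper layer and wraps it in RTP data packets; RTCP takes control information from the upper layer and wraps it in RTCP control packets. RTP sends its data packets to the even port of the UDP port pair, and RTCP sends its control packets to the odd port of the pair.
If a conference uses both audio and video, the two media are carried in separate RTP sessions, each with its own transport address (IP address plus port). If a user takes part in both sessions, the RTCP packets of each session carry the same canonical name, CNAME. Participants can use the CNAME in the RTCP packets to associate the audio with the video, and then use the timing information in the RTCP packets (NTP, Network Time Protocol, timestamps) to synchronize them.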
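The even/odd port convention can be illustrated with a few lines of arithmetic. This sketch only computes the numbers; it does not bind any sockets, and PortPair and makePortPair are hypothetical names for this example.

```cpp
#include <cstdint>

struct PortPair {
    uint16_t rtp;   // even port
    uint16_t rtcp;  // adjacent odd port
};

// Hypothetical helper: rounds `start` up to the next even port and pairs
// it with the following odd port. Returns false if the pair would not fit
// in the 16-bit port range.
bool makePortPair(unsigned start, PortPair *out) {
    unsigned rtp = (start + 1u) & ~1u;  // round up to an even number
    if (rtp + 1u > 65535u) {
        return false;
    }
    out->rtp = static_cast<uint16_t>(rtp);
    out->rtcp = static_cast<uint16_t>(rtp + 1u);
    return true;
}
```

For a starting candidate of 5001 this gives RTP port 5002 and RTCP port 5003.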

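  • Translators and mixers
    RTP also introduces translators and mixers. Both are RTP-level relay systems.
    When a mixer is used:
    In a video conference over the Internet, a few participants may be connected over low-speed links while most use a high-speed network. To avoid forcing every participant onto low-bandwidth, low-quality encoding, RTP allows a mixer to be placed near the low-bandwidth area as an RTP-level relay. The mixer receives RTP packets from one or more sources, resynchronizes and recombines the incoming packets, mixes the reconstructed streams into a single stream, converts the encoding to one suited to the low bandwidth, and forwards the result over the low-speed link to the low-bandwidth area. To synchronize multiple input sources uniformly, the mixer adjusts the timing among the media streams and generates its own timing, so every packet leaving the mixer names the mixer as its synchronization source. So that receivers can still identify the original senders, the mixer fills in the CSRC identifier list in the RTP header to name the original synchronization sources that contributed to the mixed packet.
    When a translator is used:
    On the Internet, some conference participants may be isolated outside an application-level firewall and be barred from using IP multicast addresses directly, even though they may be connected over high-speed links. In that case RTP allows translators to be used as RTP-level relays. One translator is installed on each side of the firewall. The translator outside the firewall filters all the multicast packets it receives and sends them over a secure connection to the translator inside the firewall, which then forwards them to the multicast group members on the internal network.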
RTP header format
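The fixed part of the RTP header is 12 bytes long: version, padding, extension and CSRC count in the first byte, marker and payload type in the second, followed by a 16-bit sequence number, a 32-bit timestamp and a 32-bit SSRC, all in network byte order. A minimal decoding sketch follows; RtpHeader and parseRtpHeader are illustrative names, and the CSRC list and header extension are deliberately left out.

```cpp
#include <cstddef>
#include <cstdint>

struct RtpHeader {
    unsigned version;      // must be 2
    bool padding;
    bool extension;
    unsigned csrcCount;    // number of 32-bit CSRC entries that follow
    bool marker;
    unsigned payloadType;
    uint16_t sequenceNumber;
    uint32_t timestamp;
    uint32_t ssrc;
};

static uint32_t readBE32(const uint8_t *p) {
    return (uint32_t(p[0]) << 24) | (uint32_t(p[1]) << 16) |
           (uint32_t(p[2]) << 8) | uint32_t(p[3]);
}

// Hypothetical helper: decodes the fixed 12-byte header only.
bool parseRtpHeader(const uint8_t *data, std::size_t size, RtpHeader *out) {
    if (size < 12) return false;
    out->version = data[0] >> 6;
    out->padding = (data[0] & 0x20) != 0;
    out->extension = (data[0] & 0x10) != 0;
    out->csrcCount = data[0] & 0x0f;
    out->marker = (data[1] & 0x80) != 0;
    out->payloadType = data[1] & 0x7f;
    out->sequenceNumber = static_cast<uint16_t>((data[2] << 8) | data[3]);
    out->timestamp = readBE32(data + 4);
    out->ssrc = readBE32(data + 8);
    return out->version == 2;
}
```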


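RTCP header format

As noted above, the main functions of RTCP are quality-of-service monitoring and feedback, synchronization between media, and identification of the members of a multicast group. During an RTP session, each participant sends RTCP packets periodically. These packets carry statistics such as the number of data packets sent and the number lost, so participants can use them to adjust the transmission rate dynamically, or even change the payload type. Used together, RTP and RTCP optimize transmission efficiency with effective feedback and minimal overhead, which makes them well suited to carrying real-time data over a network. RTCP is also carried over UDP, but since RTCP packets contain only control information they are short, so several RTCP packets can be bundled into a single UDP datagram.
RTCP has the following five packet types: SR (sender report), RR (receiver report), SDES (source description), BYE (goodbye), and APP (application-defined).

The SR packet format is shown below: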
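A sender report starts with the common RTCP header (version, padding, report count, packet type 200, length), followed by the sender's SSRC, a 64-bit NTP timestamp, the matching RTP timestamp, and the sender's packet and octet counts. The sketch below decodes only that fixed 28-byte part and ignores any report blocks; SenderReport and parseSenderReport are names chosen for this example.

```cpp
#include <cstddef>
#include <cstdint>

struct SenderReport {
    uint32_t ssrc;
    uint64_t ntpTime;      // 32.32 fixed-point NTP timestamp
    uint32_t rtpTime;      // RTP timestamp for the same instant
    uint32_t packetCount;  // sender's packet count
    uint32_t octetCount;   // sender's octet count
};

static uint32_t readU32(const uint8_t *p) {
    return (uint32_t(p[0]) << 24) | (uint32_t(p[1]) << 16) |
           (uint32_t(p[2]) << 8) | uint32_t(p[3]);
}

// Hypothetical helper: decodes the sender-info part of an SR packet.
bool parseSenderReport(const uint8_t *data, std::size_t size,
                       SenderReport *out) {
    if (size < 28) return false;
    if ((data[0] >> 6) != 2 || data[1] != 200) return false;  // V=2, PT=SR
    out->ssrc = readU32(data + 4);
    out->ntpTime = (uint64_t(readU32(data + 8)) << 32) | readU32(data + 12);
    out->rtpTime = readU32(data + 16);
    out->packetCount = readU32(data + 20);
    out->octetCount = readU32(data + 24);
    return true;
}
```

The NTP/RTP timestamp pair is what lets a receiver map RTP time onto wall-clock time and so line up audio with video.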

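RTSP code flow in NuPlayer

The setDataSource stage is not repeated here. Its main job is to build the playback engine and create the Source that matches the URL scheme, such as the RTSPSource discussed below, and assign it to mSource.

We go straight to the prepare stage:

The diagram comes first; the code is easier to follow alongside it.



The prepare stage first checks whether the source is an SDP file. The mIsSDP flag is passed in when RTSPSource is constructed; we start with the case mIsSDP = false. In that case a MyHandler is created and its connect method is called to establish the connection to the server.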

void NuPlayer::RTSPSource::prepareAsync() {

//..........
sp<AMessage> notify = new AMessage(kWhatNotify, this);

// Check that the current state is DISCONNECTED
CHECK_EQ(mState, (int)DISCONNECTED);
// Move to the CONNECTING state
mState = CONNECTING;

if (mIsSDP) {
// For an SDP source, create an SDPLoader to fetch the description file from the server
mSDPLoader = new SDPLoader(notify, (mFlags & kFlagIncognito) ? SDPLoader::kFlagIncognito : 0, mHTTPService);
mSDPLoader->load(mURL.c_str(), mExtraHeaders.isEmpty() ? NULL : &mExtraHeaders);
} else {
// Otherwise use MyHandler to connect
mHandler = new MyHandler(mURL.c_str(), notify, mUIDValid, mUID);
mLooper->registerHandler(mHandler);
mHandler->connect();
}
// Start buffering
startBufferingIfNecessary();
}

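Before looking at connect, two member variables need introducing: mConn and mRTPConn. mConn is an ARTSPConnection; it holds the connection to the server and sends and receives RTSP requests and responses. mRTPConn is an ARTPConnection, used to send and receive media data.
The connect method uses mConn to issue the connection request to the server.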

void connect() {
//mConn(new ARTSPConnection(mUIDValid, mUID)),
looper()->registerHandler(mConn);
//mRTPConn(new ARTPConnection),
(1 ? mNetLooper : looper())->registerHandler(mRTPConn);
sp<AMessage> notify = new AMessage('biny', this);
mConn->observeBinaryData(notify);
// Connect to the server
sp<AMessage> reply = new AMessage('conn', this);
mConn->connect(mOriginalSessionURL.c_str(), reply);
}
void ARTSPConnection::connect(const char *url, const sp<AMessage> &reply) {
// Post a kWhatConnect message; note that it carries both the url and the reply
sp<AMessage> msg = new AMessage(kWhatConnect, this);
msg->setString("url", url);
msg->setMessage("reply", reply);
msg->post();
}
case kWhatConnect:
onConnect(msg);
break;

ARTSPConnection::onConnect parses host, port, path, mUser and mPass from the URL that was passed in, calls ::connect to reach the server, and finally calls postReceiveReponseEvent to start polling for the server's responses.

void ARTSPConnection::onConnect(const sp<AMessage> &msg) {
++mConnectionID;

if (mState != DISCONNECTED) {
if (mUIDValid) {
HTTPBase::UnRegisterSocketUserTag(mSocket);
HTTPBase::UnRegisterSocketUserMark(mSocket);
}
close(mSocket);
mSocket = -1;
flushPendingRequests();
}

mState = CONNECTING;
AString url;
// Extract the URL from the message
CHECK(msg->findString("url", &url));
sp<AMessage> reply;
// Extract the reply message
CHECK(msg->findMessage("reply", &reply));

AString host, path;
unsigned port;
// Parse host, port, path, mUser and mPass from the URL
if (!ParseURL(url.c_str(), &host, &port, &path, &mUser, &mPass)
|| (mUser.size() > 0 && mPass.size() == 0)) {

// A user name without a password: report an error
// If we have a user name but no password we have to give up
// right here, since we currently have no way of asking the user
// for this information.
ALOGE("Malformed rtsp url %s", uriDebugString(url).c_str());
reply->setInt32("result", ERROR_MALFORMED);
reply->post();
mState = DISCONNECTED;
return;
}

if (mUser.size() > 0) {
ALOGV("user = '%s', pass = '%s'", mUser.c_str(), mPass.c_str());
}

struct hostent *ent = gethostbyname(host.c_str());
if (ent == NULL) {
ALOGE("Unknown host %s", host.c_str());
reply->setInt32("result", -ENOENT);
reply->post();
mState = DISCONNECTED;
return;
}

mSocket = socket(AF_INET, SOCK_STREAM, 0);

if (mUIDValid) {
HTTPBase::RegisterSocketUserTag(mSocket, mUID,(uint32_t)*(uint32_t*) "RTSP");
HTTPBase::RegisterSocketUserMark(mSocket, mUID);
}

MakeSocketBlocking(mSocket, false);

struct sockaddr_in remote;
memset(remote.sin_zero, 0, sizeof(remote.sin_zero));
remote.sin_family = AF_INET;
remote.sin_addr.s_addr = *(in_addr_t *)ent->h_addr;
remote.sin_port = htons(port);
// Connect to the server
int err = ::connect(mSocket, (const struct sockaddr *)&remote, sizeof(remote));

// Report the server IP in the reply
reply->setInt32("server-ip", ntohl(remote.sin_addr.s_addr));

if (err < 0) {
if (errno == EINPROGRESS) {
sp<AMessage> msg = new AMessage(kWhatCompleteConnection, this);
msg->setMessage("reply", reply);
msg->setInt32("connection-id", mConnectionID);
msg->post();
return;
}

reply->setInt32("result", -errno);
mState = DISCONNECTED;

if (mUIDValid) {
HTTPBase::UnRegisterSocketUserTag(mSocket);
HTTPBase::UnRegisterSocketUserMark(mSocket);
}
close(mSocket);
mSocket = -1;
} else {
// On success, set result to OK
reply->setInt32("result", OK);
// Move to the CONNECTED state
mState = CONNECTED;
mNextCSeq = 1;
// Start waiting for responses
postReceiveReponseEvent();
}
// Post the 'conn' reply
reply->post();
}
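The URL decomposition that onConnect relies on can be sketched in isolation. This is not the AOSP ParseURL implementation: RtspUrl and parseRtspUrl are hypothetical names, the default port 554 and default path "/" are assumptions stated here, and IPv6 literals and percent-encoding are not handled.

```cpp
#include <cstdlib>
#include <string>

struct RtspUrl {
    std::string user, pass, host, path;
    unsigned port;
};

// Hypothetical helper: splits rtsp://[user[:pass]@]host[:port][/path].
bool parseRtspUrl(const std::string &url, RtspUrl *out) {
    const std::string scheme = "rtsp://";
    if (url.compare(0, scheme.size(), scheme) != 0) return false;

    std::string::size_type slash = url.find('/', scheme.size());
    std::string authority =
        (slash == std::string::npos)
            ? url.substr(scheme.size())
            : url.substr(scheme.size(), slash - scheme.size());
    out->path = (slash == std::string::npos) ? "/" : url.substr(slash);

    out->user.clear();
    out->pass.clear();
    std::string::size_type at = authority.find('@');
    if (at != std::string::npos) {
        std::string userInfo = authority.substr(0, at);
        authority.erase(0, at + 1);
        std::string::size_type colon = userInfo.find(':');
        out->user = userInfo.substr(0, colon);
        if (colon != std::string::npos) out->pass = userInfo.substr(colon + 1);
    }

    out->port = 554;  // assumed default RTSP port
    std::string::size_type colon = authority.find(':');
    if (colon != std::string::npos) {
        const char *digits = authority.c_str() + colon + 1;
        char *end = nullptr;
        unsigned long p = std::strtoul(digits, &end, 10);
        if (end == digits || *end != '\0' || p == 0 || p > 65535) return false;
        out->port = static_cast<unsigned>(p);
        authority.erase(colon);
    }
    out->host = authority;
    return !out->host.empty();
}
```

As in onConnect, a caller could additionally reject the URL when user is non-empty but pass is empty.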

Next we look at postReceiveReponseEvent:

void ARTSPConnection::postReceiveReponseEvent() {
if (mReceiveResponseEventPending) {
return;
}
sp<AMessage> msg = new AMessage(kWhatReceiveResponse, this);
msg->post();
mReceiveResponseEventPending = true;
}

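The resulting kWhatReceiveResponse message is handled by onReceiveResponse, which calls receiveRTSPReponse to read the server's reply: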

void ARTSPConnection::onReceiveResponse() {
mReceiveResponseEventPending = false;
if (mState != CONNECTED) {
return;
}
struct timeval tv;
tv.tv_sec = 0;
tv.tv_usec = kSelectTimeoutUs;
fd_set rs;
FD_ZERO(&rs);
FD_SET(mSocket, &rs);

// Wait (with a timeout) for the socket to become readable
int res = select(mSocket + 1, &rs, NULL, NULL, &tv);

if (res == 1) {
MakeSocketBlocking(mSocket, true);
bool success = receiveRTSPReponse();
MakeSocketBlocking(mSocket, false);
if (!success) {
// Something horrible, irreparable has happened.
flushPendingRequests();
return;
}
}
postReceiveReponseEvent();
}

Note that receiveRTSPReponse has a dual role: it receives requests sent by the server, and it also handles the server's responses to our own requests.

bool ARTSPConnection::receiveRTSPReponse() {

AString statusLine;
if (!receiveLine(&statusLine)) {
return false;
}
if (statusLine == "$") {
sp<ABuffer> buffer = receiveBinaryData();
if (buffer == NULL) {
return false;
}
if (mObserveBinaryMessage != NULL) {
sp<AMessage> notify = mObserveBinaryMessage->dup();
notify->setBuffer("buffer", buffer);
notify->post();
} else {
ALOGW("received binary data, but no one cares.");
}
return true;
}

// RTSP response object
sp<ARTSPResponse> response = new ARTSPResponse;
response->mStatusLine = statusLine;
ALOGI("status: %s", response->mStatusLine.c_str());
ssize_t space1 = response->mStatusLine.find(" ");
if (space1 < 0) {
return false;
}
ssize_t space2 = response->mStatusLine.find(" ", space1 + 1);
if (space2 < 0) {
return false;
}

bool isRequest = false;
// If the first token is not an RTSP version, this is a request from the server rather than a response
if (!IsRTSPVersion(AString(response->mStatusLine, 0, space1))) {
CHECK(IsRTSPVersion(AString(response->mStatusLine,space2 + 1,response->mStatusLine.size() - space2 - 1)));
isRequest = true;
response->mStatusCode = 0;
} else {
// Parse and validate the status code
AString statusCodeStr(response->mStatusLine, space1 + 1, space2 - space1 - 1);
if (!ParseSingleUnsignedLong(statusCodeStr.c_str(), &response->mStatusCode) || response->mStatusCode < 100 || response->mStatusCode > 999) {
return false;
}
}

AString line;
ssize_t lastDictIndex = -1;
for (;;) {
if (!receiveLine(&line)) {
break;
}
if (line.empty()) {
break;
}
ALOGV("line: '%s'", line.c_str());
if (line.c_str()[0] == ' ' || line.c_str()[0] == '\t') {
// Support for folded header values.
if (lastDictIndex < 0) {
// First line cannot be a continuation of the previous one.
return false;
}
AString &value = response->mHeaders.editValueAt(lastDictIndex);
value.append(line);

continue;
}
ssize_t colonPos = line.find(":");
if (colonPos < 0) {
// Malformed header line.
return false;
}
AString key(line, 0, colonPos);
key.trim();
key.tolower();
line.erase(0, colonPos + 1);
lastDictIndex = response->mHeaders.add(key, line);
}

for (size_t i = 0; i < response->mHeaders.size(); ++i) {
response->mHeaders.editValueAt(i).trim();
}

unsigned long contentLength = 0;

ssize_t i = response->mHeaders.indexOfKey("content-length");

if (i >= 0) {
AString value = response->mHeaders.valueAt(i);
if (!ParseSingleUnsignedLong(value.c_str(), &contentLength)) {
return false;
}
}
// Receive the message body into mContent
if (contentLength > 0) {
response->mContent = new ABuffer(contentLength);
if (receive(response->mContent->data(), contentLength) != OK) {
return false;
}
}
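// isRequest means the server sent a request on its own initiative, so handleServerRequest is called; otherwise this is the server's response to a client request, and notifyResponseListener delivers it to the waiting listener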
return isRequest
? handleServerRequest(response)
: notifyResponseListener(response);
}
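The statusLine == "$" branch at the top of receiveRTSPReponse deals with binary data interleaved on the RTSP TCP connection. In RTSP 1.0 such a frame is a '$' byte, a one-byte channel identifier, a two-byte big-endian length, and then that many bytes of payload. The following sketch parses one frame from a memory buffer; InterleavedFrame and parseInterleavedFrame are hypothetical names, and this is not how receiveBinaryData itself is written.

```cpp
#include <cstddef>
#include <cstdint>
#include <vector>

struct InterleavedFrame {
    uint8_t channel;               // e.g. 0 for RTP, 1 for RTCP of track 0
    std::vector<uint8_t> payload;  // the embedded RTP or RTCP packet
};

// Hypothetical helper: returns the number of bytes consumed, or 0 if the
// buffer does not start with a complete '$' frame.
std::size_t parseInterleavedFrame(const uint8_t *data, std::size_t size,
                                  InterleavedFrame *out) {
    if (size < 4 || data[0] != '$') return 0;
    std::size_t length = (std::size_t(data[2]) << 8) | data[3];
    if (size < 4 + length) return 0;  // need more bytes
    out->channel = data[1];
    out->payload.assign(data + 4, data + 4 + length);
    return 4 + length;
}
```

The channel numbers correspond to the interleaved=N-M values negotiated in the SETUP Transport header.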

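If isRequest is set, the server has sent a request on its own initiative and handleServerRequest is called; otherwise the message is the server's response to a client request, and notifyResponseListener delivers it to the listener that is waiting for it. Let's look at both methods:

handleServerRequest may be a little disappointing: server-to-client requests are not implemented, so it simply sends an "RTSP/1.0 501 Not Implemented" message back to the server.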

bool ARTSPConnection::handleServerRequest(const sp<ARTSPResponse> &request) {
// Implementation of server->client requests is optional for all methods
// but we do need to respond, even if it's just to say that we don't
// support the method.

// No request is actually handled here; we only report that the method is not implemented
ssize_t space1 = request->mStatusLine.find(" ");
CHECK_GE(space1, 0);
AString response;
response.append("RTSP/1.0 501 Not Implemented\r\n");
ssize_t i = request->mHeaders.indexOfKey("cseq");
if (i >= 0) {
AString value = request->mHeaders.valueAt(i);
unsigned long cseq;
if (!ParseSingleUnsignedLong(value.c_str(), &cseq)) {
return false;
}
response.append("CSeq: ");
response.append(cseq);
response.append("\r\n");
}
response.append("\r\n");
size_t numBytesSent = 0;
while (numBytesSent < response.size()) {
ssize_t n =
send(mSocket, response.c_str() + numBytesSent,
response.size() - numBytesSent, 0);
if (n < 0 && errno == EINTR) {
continue;
}
if (n <= 0) {
if (n == 0) {
// Server closed the connection.
ALOGE("Server unexpectedly closed the connection.");
} else {
ALOGE("Error sending rtsp response (%s).", strerror(errno));
}
performDisconnect();
return false;
}
numBytesSent += (size_t)n;
}
return true;
}

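notifyResponseListener is straightforward: using the server's response, it finds the reply message registered for the matching request and posts the response back to MyHandler for processing.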

bool ARTSPConnection::notifyResponseListener(
const sp<ARTSPResponse> &response) {
ssize_t i;
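// Look up the pending request this response belongs to
status_t err = findPendingRequest(response, &i);
if (err != OK) {
return false;
}
if (i < 0) {
// No CSeq header: an unsolicited server message, nothing to deliver
return true;
}
// Hand the server's response to the requester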
sp<AMessage> reply = mPendingRequests.valueAt(i);
mPendingRequests.removeItemsAt(i);
reply->setInt32("result", OK);
reply->setObject("response", response);
reply->post();
return true;
}

Back to the main flow: here is how MyHandler handles the 'conn' reply:

case 'conn':
{
int32_t result;
// Read the result
CHECK(msg->findInt32("result", &result));
if (result == OK) {
// Send the DESCRIBE request
AString request;
request = "DESCRIBE ";
request.append(mSessionURL);
request.append(" RTSP/1.0\r\n");
request.append("Accept: application/sdp\r\n");
request.append("\r\n");
sp<AMessage> reply = new AMessage('desc', this);
mConn->sendRequest(request.c_str(), reply);
} else {
(new AMessage('disc', this))->post();
}
break;
}

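This part is simple: once the reply arrives it checks whether the result is OK, and if so sends a DESCRIBE request. The important piece is onSendRequest, which is worth understanding well:
onSendRequest post-processes the request, for example by adding the CSeq header, and then calls send to transmit it to the server. It also adds the request to the pending-request queue, keyed by CSeq with the reply message as the value.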

void ARTSPConnection::onSendRequest(const sp<AMessage> &msg) {
sp<AMessage> reply;
CHECK(msg->findMessage("reply", &reply));
// Post-process the request
AString request;
CHECK(msg->findString("request", &request));
// Just in case we need to re-issue the request with proper authentication
// later, stash it away.
reply->setString("original-request", request.c_str(), request.size());
addAuthentication(&request);
addUserAgent(&request);
// Find the boundary between headers and the body.
ssize_t i = request.find("\r\n\r\n");
CHECK_GE(i, 0);
int32_t cseq = mNextCSeq++;
AString cseqHeader = "CSeq: ";
cseqHeader.append(cseq);
cseqHeader.append("\r\n");
request.insert(cseqHeader, i + 2);
ALOGV("request: '%s'", request.c_str());

size_t numBytesSent = 0;
while (numBytesSent < request.size()) {
// Keep sending until the whole request has been written
ssize_t n = send(mSocket, request.c_str() + numBytesSent,request.size() - numBytesSent, 0);
// Error handling omitted
numBytesSent += (size_t)n;
}
// Add the request to mPendingRequests and wait for the server's reply
mPendingRequests.add(cseq, reply);
}

Going back to notifyResponseListener and reading it together with onSendRequest and findPendingRequest, the overall event flow becomes clear:

status_t ARTSPConnection::findPendingRequest(
const sp<ARTSPResponse> &response, ssize_t *index) const {
*index = 0;

ssize_t i = response->mHeaders.indexOfKey("cseq");

if (i < 0) {
// This is an unsolicited server->client message.
*index = -1;
return OK;
}
AString value = response->mHeaders.valueAt(i);

unsigned long cseq;
if (!ParseSingleUnsignedLong(value.c_str(), &cseq)) {
return ERROR_MALFORMED;
}
i = mPendingRequests.indexOfKey(cseq);
if (i < 0) {
return -ENOENT;
}
*index = i;
return OK;
}

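onSendRequest keeps adding requests to mPendingRequests. Each time the server responds, notifyResponseListener is called; it removes the matching reply message from mPendingRequests and posts it to MyHandler for processing. onReceiveResponse then re-posts its receive event to wait for the server's next response.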
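The request/response matching described above boils down to a table keyed by CSeq. The sketch below reproduces that idea with standard containers instead of AMessage and KeyedVector; PendingRequests and its methods are names invented for this example, and no socket I/O is performed.

```cpp
#include <functional>
#include <map>
#include <string>

class PendingRequests {
public:
    // Stamps the request with the next CSeq (inserted just before the
    // blank line that ends the headers), remembers the callback, and
    // returns the bytes that would be written to the socket.
    std::string send(std::string request, std::function<void(int)> onReply) {
        std::string::size_type headerEnd = request.find("\r\n\r\n");
        if (headerEnd == std::string::npos) return std::string();
        int cseq = mNextCSeq++;
        request.insert(headerEnd + 2,
                       "CSeq: " + std::to_string(cseq) + "\r\n");
        mPending[cseq] = onReply;
        return request;
    }

    // Delivers a response to whoever sent the request with this CSeq.
    // Returns false if no such request is pending.
    bool onResponse(int cseq, int statusCode) {
        std::map<int, std::function<void(int)> >::iterator it =
            mPending.find(cseq);
        if (it == mPending.end()) return false;
        std::function<void(int)> callback = it->second;
        mPending.erase(it);
        callback(statusCode);
        return true;
    }

private:
    int mNextCSeq = 1;
    std::map<int, std::function<void(int)> > mPending;
};
```

Each entry is removed as soon as its response arrives, so a duplicate or stray response with the same CSeq is simply reported as unmatched.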

Next, the handling after the 'desc' message arrives:

case 'desc':
{
int32_t result;
CHECK(msg->findInt32("result", &result));

if (result == OK) {
sp<RefBase> obj;
CHECK(msg->findObject("response", &obj));
sp<ARTSPResponse> response = static_cast<ARTSPResponse *>(obj.get());

if (response->mStatusCode == 301 || response->mStatusCode == 302) {
// Handle redirection
//............
}

if (response->mStatusCode != 200) {
result = UNKNOWN_ERROR;
} else if (response->mContent == NULL) {
result = ERROR_MALFORMED;
ALOGE("The response has no content.");
} else {
// Build the ASessionDescription
mSessionDesc = new ASessionDescription;
mSessionDesc->setTo(response->mContent->data(),response->mContent->size());

if (!mSessionDesc->isValid()) {
//............
} else {
//............
if (mSessionDesc->countTracks() < 2) {
// There's no actual tracks in this session.
// The first "track" is merely session meta
// data.
ALOGW("Session doesn't contain any playable "
"tracks. Aborting.");
result = ERROR_UNSUPPORTED;
} else {
// This is the part that matters
setupTrack(1);
}
}
}
}
break;
}

The code above is long; skipping the less important parts, we go straight to setupTrack.

void setupTrack(size_t index) {

sp<APacketSource> source = new APacketSource(mSessionDesc, index);
AString url;
CHECK(mSessionDesc->findAttribute(index, "a=control", &url));
AString trackURL;
// Build the URL of the media track
CHECK(MakeURL(mBaseURL.c_str(), url.c_str(), &trackURL));

mTracks.push(TrackInfo());
TrackInfo *info = &mTracks.editItemAt(mTracks.size() - 1);
// Set the URL
info->mURL = trackURL;
// Set the APacketSource
info->mPacketSource = source;
info->mUsingInterleavedTCP = false;
info->mFirstSeqNumInSegment = 0;
info->mNewSegment = true;
info->mRTPSocket = -1;
info->mRTCPSocket = -1;
info->mRTPAnchor = 0;
info->mNTPAnchorUs = -1;
info->mNormalPlayTimeRTP = 0;
info->mNormalPlayTimeUs = 0ll;

unsigned long PT;
AString formatDesc;
AString formatParams;
mSessionDesc->getFormatType(index, &PT, &formatDesc, &formatParams);

int32_t timescale;
int32_t numChannels;
ASessionDescription::ParseFormatDesc(formatDesc.c_str(), &timescale, &numChannels);

info->mTimeScale = timescale;
info->mEOSReceived = false;

ALOGV("track #%zu URL=%s", mTracks.size(), trackURL.c_str());

// Build the SETUP request
AString request = "SETUP ";
request.append(trackURL);
request.append(" RTSP/1.0\r\n");
if (mTryTCPInterleaving) {
size_t interleaveIndex = 2 * (mTracks.size() - 1);
info->mUsingInterleavedTCP = true;
info->mRTPSocket = interleaveIndex;
info->mRTCPSocket = interleaveIndex + 1;
request.append("Transport: RTP/AVP/TCP;interleaved=");
request.append(interleaveIndex);
request.append("-");
request.append(interleaveIndex + 1);
} else {
unsigned rtpPort;
ARTPConnection::MakePortPair(
&info->mRTPSocket, &info->mRTCPSocket, &rtpPort);
if (mUIDValid) {
HTTPBase::RegisterSocketUserTag(info->mRTPSocket, mUID,
(uint32_t)*(uint32_t*) "RTP_");
HTTPBase::RegisterSocketUserTag(info->mRTCPSocket, mUID,
(uint32_t)*(uint32_t*) "RTP_");
HTTPBase::RegisterSocketUserMark(info->mRTPSocket, mUID);
HTTPBase::RegisterSocketUserMark(info->mRTCPSocket, mUID);
}
request.append("Transport: RTP/AVP/UDP;unicast;client_port=");
request.append(rtpPort);
request.append("-");
request.append(rtpPort + 1);
}
request.append("\r\n");
if (index > 1) {
request.append("Session: ");
request.append(mSessionID);
request.append("\r\n");
}
request.append("\r\n");
sp<AMessage> reply = new AMessage('setu', this);
reply->setSize("index", index);
reply->setSize("track-index", mTracks.size() - 1);
mConn->sendRequest(request.c_str(), reply);
}
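The two Transport variants that setupTrack produces can be isolated into a small function. This sketch mirrors the string layout in the code above but is otherwise independent of it; makeTransportHeader is a hypothetical name, and trackIndex here is the zero-based position of the track in the track list.

```cpp
#include <string>

// Hypothetical helper: builds the Transport header line for one track.
// With TCP interleaving, track N uses channels 2N (RTP) and 2N+1 (RTCP);
// with UDP, the client advertises its RTP port and the next port for RTCP.
std::string makeTransportHeader(bool interleavedTcp, unsigned trackIndex,
                                unsigned rtpPort) {
    std::string header = "Transport: ";
    if (interleavedTcp) {
        unsigned channel = 2 * trackIndex;
        header += "RTP/AVP/TCP;interleaved=" + std::to_string(channel) +
                  "-" + std::to_string(channel + 1);
    } else {
        header += "RTP/AVP/UDP;unicast;client_port=" +
                  std::to_string(rtpPort) + "-" + std::to_string(rtpPort + 1);
    }
    header += "\r\n";
    return header;
}
```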

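The logic here is also simple: the information for the track to be fetched is stored in mTracks, and sendRequest issues the SETUP request. sendRequest is not covered again; we go straight to the handling after 'setu' returns: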

case 'setu':
{
size_t index;
CHECK(msg->findSize("index", &index));
TrackInfo *track = NULL;
size_t trackIndex;
if (msg->findSize("track-index", &trackIndex)) {
track = &mTracks.editItemAt(trackIndex);
}

int32_t result;
CHECK(msg->findInt32("result", &result));
if (result == OK) {
CHECK(track != NULL);
sp<RefBase> obj;
CHECK(msg->findObject("response", &obj));
sp<ARTSPResponse> response =
static_cast<ARTSPResponse *>(obj.get());

if (response->mStatusCode != 200) {

} else {
ssize_t i = response->mHeaders.indexOfKey("session");
CHECK_GE(i, 0);
// Get the session ID
mSessionID = response->mHeaders.valueAt(i);
mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs;
AString timeoutStr;
//........................
sp<AMessage> notify = new AMessage('accu', this);
notify->setSize("track-index", trackIndex);
i = response->mHeaders.indexOfKey("transport");
CHECK_GE(i, 0);
if (track->mRTPSocket != -1 && track->mRTCPSocket != -1) {
if (!track->mUsingInterleavedTCP) {
AString transport = response->mHeaders.valueAt(i);
// We are going to continue even if we were
// unable to poke a hole into the firewall...
pokeAHole(
track->mRTPSocket,
track->mRTCPSocket,
transport);
}
mRTPConn->addStream(
track->mRTPSocket, track->mRTCPSocket,
mSessionDesc, index,
notify, track->mUsingInterleavedTCP);
mSetupTracksSuccessful = true;
} else {
result = BAD_VALUE;
}
}
}

The key steps above are obtaining the session ID and calling mRTPConn->addStream to add a stream to the ARTPConnection. Here is addStream:

void ARTPConnection::addStream(
int rtpSocket, int rtcpSocket,
const sp<ASessionDescription> &sessionDesc,
size_t index,
const sp<AMessage> &notify,
bool injected) {
sp<AMessage> msg = new AMessage(kWhatAddStream, this);
msg->setInt32("rtp-socket", rtpSocket);
msg->setInt32("rtcp-socket", rtcpSocket);
msg->setObject("session-desc", sessionDesc);
msg->setSize("index", index);
msg->setMessage("notify", notify);
msg->setInt32("injected", injected);
msg->post();
}
case kWhatAddStream:
{
onAddStream(msg);
break;
}
void ARTPConnection::onAddStream(const sp<AMessage> &msg) {
// Add the stream information to mStreams
mStreams.push_back(StreamInfo());
StreamInfo *info = &*--mStreams.end();
int32_t s;
// Get the rtp-socket
CHECK(msg->findInt32("rtp-socket", &s));
info->mRTPSocket = s;
// Get the rtcp-socket
CHECK(msg->findInt32("rtcp-socket", &s));
info->mRTCPSocket = s;

int32_t injected;
CHECK(msg->findInt32("injected", &injected));

info->mIsInjected = injected;
// Get the session-desc
sp<RefBase> obj;
CHECK(msg->findObject("session-desc", &obj));
info->mSessionDesc = static_cast<ASessionDescription *>(obj.get());

CHECK(msg->findSize("index", &info->mIndex));
CHECK(msg->findMessage("notify", &info->mNotifyMsg));

info->mNumRTCPPacketsReceived = 0;
info->mNumRTPPacketsReceived = 0;
memset(&info->mRemoteRTCPAddr, 0, sizeof(info->mRemoteRTCPAddr));

// Post the polling event
if (!injected) {
postPollEvent();
}
}

The part to focus on in the code above is postPollEvent:

void ARTPConnection::postPollEvent() {
if (mPollEventPending) {
return;
}
sp<AMessage> msg = new AMessage(kWhatPollStreams, this);
msg->post();
mPollEventPending = true;
}
case kWhatPollStreams:
{
onPollStreams();
break;
}
void ARTPConnection::onPollStreams() {
mPollEventPending = false;

if (mStreams.empty()) {
return;
}

struct timeval tv;
tv.tv_sec = 0;
tv.tv_usec = kSelectTimeoutUs;

fd_set rs;
FD_ZERO(&rs);

int maxSocket = -1;
for (List<StreamInfo>::iterator it = mStreams.begin();
it != mStreams.end(); ++it) {
if ((*it).mIsInjected) {
continue;
}
FD_SET(it->mRTPSocket, &rs);
FD_SET(it->mRTCPSocket, &rs);
if (it->mRTPSocket > maxSocket) {
maxSocket = it->mRTPSocket;
}
if (it->mRTCPSocket > maxSocket) {
maxSocket = it->mRTCPSocket;
}
}

if (maxSocket == -1) {
return;
}

// Wait (with a timeout) for any of the sockets to become readable
int res = select(maxSocket + 1, &rs, NULL, NULL, &tv);

if (res > 0) {
// Receive the data sent by the server
List<StreamInfo>::iterator it = mStreams.begin();
while (it != mStreams.end()) {
if ((*it).mIsInjected) {
++it;
continue;
}
status_t err = OK;
// Receive RTP data from the server
if (FD_ISSET(it->mRTPSocket, &rs)) {
// Calls status_t ARTPConnection::receive(StreamInfo *s, bool receiveRTP)
err = receive(&*it, true);
}
// Receive RTCP data from the server
if (err == OK && FD_ISSET(it->mRTCPSocket, &rs)) {
// Calls status_t ARTPConnection::receive(StreamInfo *s, bool receiveRTP)
err = receive(&*it, false);
}
++it;
}
}

int64_t nowUs = ALooper::GetNowUs();
if (mLastReceiverReportTimeUs <= 0|| mLastReceiverReportTimeUs + 5000000ll <= nowUs) {
// Allocate a buffer
sp<ABuffer> buffer = new ABuffer(kMaxUDPSize);
List<StreamInfo>::iterator it = mStreams.begin();
while (it != mStreams.end()) {
StreamInfo *s = &*it;
if (s->mIsInjected) {
++it;
continue;
}
if (s->mNumRTCPPacketsReceived == 0) {
// We have never received any RTCP packets on this stream,
// we don't even know where to send a report.
++it;
continue;
}

buffer->setRange(0, 0);
for (size_t i = 0; i < s->mSources.size(); ++i) {
sp<ARTPSource> source = s->mSources.valueAt(i);
// Fill the buffer
source->addReceiverReport(buffer);
if (mFlags & kRegularlyRequestFIR) {
source->addFIR(buffer);
}
}
if (buffer->size() > 0) {
ALOGV("Sending RR...");
ssize_t n;
do {
// Send through the RTCP socket
n = sendto(s->mRTCPSocket, buffer->data(), buffer->size(), 0,(const struct sockaddr *)&s->mRemoteRTCPAddr, sizeof(s->mRemoteRTCPAddr));
} while (n < 0 && errno == EINTR);
CHECK_EQ(n, (ssize_t)buffer->size());
mLastReceiverReportTimeUs = nowUs;
}
++it;
}
}

if (!mStreams.empty()) {
postPollEvent();
}
}

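onPollStreams uses select with a timeout to watch for media data from the server and calls receive to process it. It also sends RTCP receiver reports to the server periodically, at most once every 5 seconds.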

status_t ARTPConnection::receive(StreamInfo *s, bool receiveRTP) {

ALOGV("receiving %s", receiveRTP ? "RTP" : "RTCP");

CHECK(!s->mIsInjected);

sp<ABuffer> buffer = new ABuffer(65536);

socklen_t remoteAddrLen =
(!receiveRTP && s->mNumRTCPPacketsReceived == 0)
? sizeof(s->mRemoteRTCPAddr) : 0;

ssize_t nbytes;
do {
// Receive data from the server
nbytes = recvfrom(
receiveRTP ? s->mRTPSocket : s->mRTCPSocket,
buffer->data(),
buffer->capacity(),
0,
remoteAddrLen > 0 ? (struct sockaddr *)&s->mRemoteRTCPAddr : NULL,
remoteAddrLen > 0 ? &remoteAddrLen : NULL);
} while (nbytes < 0 && errno == EINTR);

if (nbytes <= 0) {
return -ECONNRESET;
}

buffer->setRange(0, nbytes);

// ALOGI("received %d bytes.", buffer->size());

status_t err;
// Parse as RTP or RTCP
if (receiveRTP) {
err = parseRTP(s, buffer);
} else {
err = parseRTCP(s, buffer);
}

return err;
}

receive calls recvfrom to read the data from the server into a buffer, then calls parseRTP or parseRTCP to process the buffered data.

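status_t ARTPConnection::parseRTP(StreamInfo *s, const sp<ABuffer> &buffer) {

// size is used throughout the checks below
size_t size = buffer->size();

if (size < 12) {
// Too short to be a valid RTP header.
return -1;
}

const uint8_t *data = buffer->data();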

if ((data[0] >> 6) != 2) {
// Unsupported version.
return -1;
}

if (data[0] & 0x20) {
// Padding present.

size_t paddingLength = data[size - 1];

if (paddingLength + 12 > size) {
// If we removed this much padding we'd end up with something
// that's too short to be a valid RTP header.
return -1;
}

size -= paddingLength;
}

int numCSRCs = data[0] & 0x0f;

size_t payloadOffset = 12 + 4 * numCSRCs;

if (size < payloadOffset) {
// Not enough data to fit the basic header and all the CSRC entries.
return -1;
}

if (data[0] & 0x10) {
// Header eXtension present.

if (size < payloadOffset + 4) {
// Not enough data to fit the basic header, all CSRC entries
// and the first 4 bytes of the extension header.

return -1;
}

const uint8_t *extensionData = &data[payloadOffset];

size_t extensionLength =
4 * (extensionData[2] << 8 | extensionData[3]);

if (size < payloadOffset + 4 + extensionLength) {
return -1;
}

payloadOffset += 4 + extensionLength;
}

uint32_t srcId = u32at(&data[8]);

sp<ARTPSource> source = findSource(s, srcId);

uint32_t rtpTime = u32at(&data[4]);

sp<AMessage> meta = buffer->meta();
meta->setInt32("ssrc", srcId);
meta->setInt32("rtp-time", rtpTime);
meta->setInt32("PT", data[1] & 0x7f);
meta->setInt32("M", data[1] >> 7);

buffer->setInt32Data(u16at(&data[2]));
buffer->setRange(payloadOffset, size - payloadOffset);
// This call is important: void ARTPSource::processRTPPacket(const sp<ABuffer> &buffer)
source->processRTPPacket(buffer);
return OK;
}

parseRTP parses the buffered data according to the RTP format and finally calls ARTPSource::processRTPPacket for further processing.

void ARTPSource::processRTPPacket(const sp<ABuffer> &buffer) {
if (queuePacket(buffer) && mAssembler != NULL) {
mAssembler->onPacketReceived(this);
}
}

processRTPPacket uses the assembler to reassemble the data; the most important method here is assembleMore:

void ARTPAssembler::onPacketReceived(const sp<ARTPSource> &source) {
AssemblyStatus status;
for (;;) {
//assembleMore
status = assembleMore(source);
if (status == WRONG_SEQUENCE_NUMBER) {
if (mFirstFailureTimeUs >= 0) {
if (ALooper::GetNowUs() - mFirstFailureTimeUs > 10000ll) {
mFirstFailureTimeUs = -1;
// LOG(VERBOSE) << "waited too long for packet.";
packetLost();
continue;
}
} else {
mFirstFailureTimeUs = ALooper::GetNowUs();
}
break;
} else {
mFirstFailureTimeUs = -1;
if (status == NOT_ENOUGH_DATA) {
break;
}
}
}
}
ARTPAssembler::AssemblyStatus AMPEG4AudioAssembler::assembleMore(
const sp<ARTPSource> &source) {
// Call addPacket
AssemblyStatus status = addPacket(source);
if (status == MALFORMED_PACKET) {
mAccessUnitDamaged = true;
}
return status;
}

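addPacket takes packets from the queue in sequence-number order, discarding stale ones, and calls submitAccessUnit to submit the access unit (AU) data.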

ARTPAssembler::AssemblyStatus AMPEG4AudioAssembler::addPacket(
const sp<ARTPSource> &source) {

List<sp<ABuffer> > *queue = source->queue();
if (queue->empty()) {
return NOT_ENOUGH_DATA;
}
if (mNextExpectedSeqNoValid) {
List<sp<ABuffer> >::iterator it = queue->begin();
while (it != queue->end()) {
if ((uint32_t)(*it)->int32Data() >= mNextExpectedSeqNo) {
break;
}
it = queue->erase(it);
}
if (queue->empty()) {
return NOT_ENOUGH_DATA;
}
}

sp<ABuffer> buffer = *queue->begin();

if (!mNextExpectedSeqNoValid) {
mNextExpectedSeqNoValid = true;
mNextExpectedSeqNo = (uint32_t)buffer->int32Data();
} else if ((uint32_t)buffer->int32Data() != mNextExpectedSeqNo) {
#if VERBOSE
LOG(VERBOSE) << "Not the sequence number I expected";
#endif
return WRONG_SEQUENCE_NUMBER;
}

uint32_t rtpTime;
CHECK(buffer->meta()->findInt32("rtp-time", (int32_t *)&rtpTime));

// Submit the access unit when the RTP timestamp changes
if (mPackets.size() > 0 && rtpTime != mAccessUnitRTPTime) {
submitAccessUnit();
}
mAccessUnitRTPTime = rtpTime;
// Append the buffer to mPackets
mPackets.push_back(buffer);
queue->erase(queue->begin());
++mNextExpectedSeqNo;
return OK;
}
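The core of addPacket is two rules: accept a packet only if it carries the next expected sequence number, and close the current access unit whenever the RTP timestamp changes. The sketch below shows just those two rules on plain structs; Packet and AccessUnitGrouper are illustrative names, and stale-packet dropping, 16-bit sequence wrap-around and the LATM handling of the real assembler are left out.

```cpp
#include <cstdint>
#include <vector>

struct Packet {
    uint32_t seq;      // RTP sequence number
    uint32_t rtpTime;  // RTP timestamp
};

class AccessUnitGrouper {
public:
    enum Status { kOk, kWrongSequenceNumber };

    Status addPacket(const Packet &p) {
        if (!mSeqValid) {
            mSeqValid = true;
            mNextSeq = p.seq;  // first packet defines the starting point
        } else if (p.seq != mNextSeq) {
            return kWrongSequenceNumber;  // caller may wait or declare loss
        }
        if (!mCurrent.empty() && p.rtpTime != mCurrentTime) {
            flush();  // timestamp changed: previous access unit is complete
        }
        mCurrentTime = p.rtpTime;
        mCurrent.push_back(p);
        ++mNextSeq;
        return kOk;
    }

    // Moves the packets collected so far into the list of finished units.
    void flush() {
        if (!mCurrent.empty()) {
            mUnits.push_back(mCurrent);
            mCurrent.clear();
        }
    }

    const std::vector<std::vector<Packet> > &units() const { return mUnits; }

private:
    bool mSeqValid = false;
    uint32_t mNextSeq = 0;
    uint32_t mCurrentTime = 0;
    std::vector<Packet> mCurrent;
    std::vector<std::vector<Packet> > mUnits;
};
```

Note that an access unit is only emitted once a packet with a different timestamp arrives, which is why the last unit of a stream needs an explicit flush.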

submitAccessUnit posts the 'accu' notification, which MyHandler handles:

void AMPEG4AudioAssembler::submitAccessUnit() {
CHECK(!mPackets.empty());

#if VERBOSE
LOG(VERBOSE) << "Access unit complete (" << mPackets.size() << " packets)";
#endif

sp<ABuffer> accessUnit = MakeCompoundFromPackets(mPackets);
accessUnit = removeLATMFraming(accessUnit);
CopyTimes(accessUnit, *mPackets.begin());

if (mAccessUnitDamaged) {
accessUnit->meta()->setInt32("damaged", true);
}

mPackets.clear();
mAccessUnitDamaged = false;
// Post the 'accu' notification
sp<AMessage> msg = mNotifyMsg->dup();
msg->setBuffer("access-unit", accessUnit);
msg->post();
}
case 'accu':
{
int32_t timeUpdate;
if (msg->findInt32("time-update", &timeUpdate) && timeUpdate) {
size_t trackIndex;
CHECK(msg->findSize("track-index", &trackIndex));

uint32_t rtpTime;
uint64_t ntpTime;
CHECK(msg->findInt32("rtp-time", (int32_t *)&rtpTime));
CHECK(msg->findInt64("ntp-time", (int64_t *)&ntpTime));

onTimeUpdate(trackIndex, rtpTime, ntpTime);
break;
}

int32_t first;
if (msg->findInt32("first-rtcp", &first)) {
mReceivedFirstRTCPPacket = true;
break;
}

if (msg->findInt32("first-rtp", &first)) {
mReceivedFirstRTPPacket = true;
break;
}

++mNumAccessUnitsReceived;
postAccessUnitTimeoutCheck();

size_t trackIndex;
CHECK(msg->findSize("track-index", &trackIndex));

if (trackIndex >= mTracks.size()) {
ALOGV("late packets ignored.");
break;
}

TrackInfo *track = &mTracks.editItemAt(trackIndex);

int32_t eos;
if (msg->findInt32("eos", &eos)) {
ALOGI("received BYE on track index %zu", trackIndex);
if (!mAllTracksHaveTime && dataReceivedOnAllChannels()) {
ALOGI("No time established => fake existing data");

track->mEOSReceived = true;
mTryFakeRTCP = true;
mReceivedFirstRTCPPacket = true;
fakeTimestamps();
} else {
postQueueEOS(trackIndex, ERROR_END_OF_STREAM);
}
return;
}

sp<ABuffer> accessUnit;
// Extract the access unit
CHECK(msg->findBuffer("access-unit", &accessUnit));

uint32_t seqNum = (uint32_t)accessUnit->int32Data();

if (mSeekPending) {
ALOGV("we're seeking, dropping stale packet.");
break;
}

if (seqNum < track->mFirstSeqNumInSegment) {
ALOGV("dropping stale access-unit (%d < %d)",
seqNum, track->mFirstSeqNumInSegment);
break;
}

if (track->mNewSegment) {
track->mNewSegment = false;
}
// Hand the access unit to onAccessUnitComplete()
onAccessUnitComplete(trackIndex, accessUnit);
break;
}

After extracting the AU, the 'accu' handler calls onAccessUnitComplete() to process it. That logic looks like this:

void onAccessUnitComplete(
int32_t trackIndex, const sp<ABuffer> &accessUnit) {
ALOGV("onAccessUnitComplete track %d", trackIndex);

if(!mPlayResponseParsed){
ALOGI("play response is not parsed, storing accessunit");
TrackInfo *track = &mTracks.editItemAt(trackIndex);
track->mPackets.push_back(accessUnit);
return;
}

handleFirstAccessUnit();

TrackInfo *track = &mTracks.editItemAt(trackIndex);

if (!mAllTracksHaveTime) {
ALOGV("storing accessUnit, no time established yet");
track->mPackets.push_back(accessUnit);
return;
}

while (!track->mPackets.empty()) {
sp<ABuffer> accessUnit = *track->mPackets.begin();
track->mPackets.erase(track->mPackets.begin());

if (addMediaTimestamp(trackIndex, track, accessUnit)) {
// Forward the access units that were buffered in track->mPackets
postQueueAccessUnit(trackIndex, accessUnit);
}
}

if (addMediaTimestamp(trackIndex, track, accessUnit)) {
postQueueAccessUnit(trackIndex, accessUnit);
}

if (track->mEOSReceived) {
postQueueEOS(trackIndex, ERROR_END_OF_STREAM);
track->mEOSReceived = false;
}
}
void postQueueAccessUnit(
size_t trackIndex, const sp<ABuffer> &accessUnit) {
sp<AMessage> msg = mNotify->dup();
msg->setInt32("what", kWhatAccessUnit);
msg->setSize("trackIndex", trackIndex);
msg->setBuffer("accessUnit", accessUnit);
msg->post();
}

RTSPSource handles kWhatAccessUnit and calls AnotherPacketSource::queueAccessUnit(accessUnit):

case MyHandler::kWhatAccessUnit:
{
size_t trackIndex;
CHECK(msg->findSize("trackIndex", &trackIndex));

if (mTSParser == NULL) {
CHECK_LT(trackIndex, mTracks.size());
} else {
CHECK_EQ(trackIndex, 0u);
}

sp<ABuffer> accessUnit;
CHECK(msg->findBuffer("accessUnit", &accessUnit));

int32_t damaged;
if (accessUnit->meta()->findInt32("damaged", &damaged)
&& damaged) {
ALOGI("dropping damaged access unit.");
break;
}

if (mTSParser != NULL) {
size_t offset = 0;
status_t err = OK;
while (offset + 188 <= accessUnit->size()) {
err = mTSParser->feedTSPacket(
accessUnit->data() + offset, 188);
if (err != OK) {
break;
}

offset += 188;
}

if (offset < accessUnit->size()) {
err = ERROR_MALFORMED;
}

if (err != OK) {
sp<AnotherPacketSource> source = getSource(false /* audio */);
if (source != NULL) {
source->signalEOS(err);
}

source = getSource(true /* audio */);
if (source != NULL) {
source->signalEOS(err);
}
}
break;
}

TrackInfo *info = &mTracks.editItemAt(trackIndex);

sp<AnotherPacketSource> source = info->mSource;
if (source != NULL) {
uint32_t rtpTime;
CHECK(accessUnit->meta()->findInt32("rtp-time", (int32_t *)&rtpTime));

if (!info->mNPTMappingValid) {
// This is a live stream, we didn't receive any normal
// playtime mapping. We won't map to npt time.
source->queueAccessUnit(accessUnit);
break;
}

int64_t nptUs =
((double)rtpTime - (double)info->mRTPTime)
/ info->mTimeScale
* 1000000ll
+ info->mNormalPlaytimeUs;

accessUnit->meta()->setInt64("timeUs", nptUs);
// ...
source->queueAccessUnit(accessUnit);
}
break;
}
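
The RTP-to-NPT conversion at the end of this handler is plain arithmetic and can be checked on its own. The sketch below mirrors the expression in the excerpt; `TrackTiming` and `rtpToNptUs` are hypothetical names introduced for illustration, with fields corresponding to mRTPTime, mTimeScale and mNormalPlaytimeUs.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical subset of the per-track state used by the mapping.
struct TrackTiming {
    uint32_t rtpTimeAnchor;    // corresponds to info->mRTPTime
    int32_t timeScale;         // corresponds to info->mTimeScale (ticks per second)
    int64_t normalPlaytimeUs;  // corresponds to info->mNormalPlaytimeUs
};

// Converts an RTP timestamp into normal play time in microseconds:
// (rtpTime - anchor) / timeScale gives seconds since the anchor, which is
// scaled to microseconds and offset by the NPT at the anchor.
int64_t rtpToNptUs(uint32_t rtpTime, const TrackTiming &t) {
    return (int64_t)(((double)rtpTime - (double)t.rtpTimeAnchor)
            / t.timeScale
            * 1000000ll
            + t.normalPlaytimeUs);
}
```

For example, with a 90000 Hz clock, an anchor of 90000 ticks at NPT 5 s, an RTP timestamp of 180000 is one second past the anchor and maps to 6000000 us. Because the subtraction is done in double, a timestamp that is before the anchor yields a smaller NPT rather than wrapping around.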

queueAccessUnit(accessUnit) stores the AU in mBuffers of AnotherPacketSource, where the decoder picks it up for decoding and playback:

void AnotherPacketSource::queueAccessUnit(const sp<ABuffer> &buffer) {
int32_t damaged;
if (buffer->meta()->findInt32("damaged", &damaged) && damaged) {
// LOG(VERBOSE) << "discarding damaged AU";
return;
}

Mutex::Autolock autoLock(mLock);
mBuffers.push_back(buffer);
mCondition.signal();

int32_t discontinuity;
if (buffer->meta()->findInt32("discontinuity", &discontinuity)){
ALOGV("queueing a discontinuity with queueAccessUnit");

mLastQueuedTimeUs = 0ll;
mEOSResult = OK;
mLatestEnqueuedMeta = NULL;

mDiscontinuitySegments.push_back(DiscontinuitySegment());
return;
}

int64_t lastQueuedTimeUs;
CHECK(buffer->meta()->findInt64("timeUs", &lastQueuedTimeUs));
mLastQueuedTimeUs = lastQueuedTimeUs;
ALOGV("queueAccessUnit timeUs=%" PRIi64 " us (%.2f secs)",
mLastQueuedTimeUs, mLastQueuedTimeUs / 1E6);

// CHECK(!mDiscontinuitySegments.empty());
DiscontinuitySegment &tailSeg = *(--mDiscontinuitySegments.end());
if (lastQueuedTimeUs > tailSeg.mMaxEnqueTimeUs) {
tailSeg.mMaxEnqueTimeUs = lastQueuedTimeUs;
}
if (tailSeg.mMaxDequeTimeUs == -1) {
tailSeg.mMaxDequeTimeUs = lastQueuedTimeUs;
}

if (mLatestEnqueuedMeta == NULL) {
mLatestEnqueuedMeta = buffer->meta()->dup();
} else {
int64_t latestTimeUs = 0;
int64_t frameDeltaUs = 0;
CHECK(mLatestEnqueuedMeta->findInt64("timeUs", &latestTimeUs));
if (lastQueuedTimeUs > latestTimeUs) {
mLatestEnqueuedMeta = buffer->meta()->dup();
frameDeltaUs = lastQueuedTimeUs - latestTimeUs;
mLatestEnqueuedMeta->setInt64("durationUs", frameDeltaUs);
} else if (!mLatestEnqueuedMeta->findInt64("durationUs", &frameDeltaUs)) {
// For B frames
frameDeltaUs = latestTimeUs - lastQueuedTimeUs;
mLatestEnqueuedMeta->setInt64("durationUs", frameDeltaUs);
}
}
}
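
The core of queueAccessUnit() is a producer/consumer hand-off: push under a lock, then signal a condition so that a waiting reader wakes up. A minimal sketch of that pattern using the C++ standard library follows. It is an illustration only: `AccessUnit` and `PacketQueue` are hypothetical types, and the real class uses Android's Mutex and Condition and also tracks discontinuities and durations, which are omitted here.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstdint>
#include <deque>
#include <mutex>

// Hypothetical stand-in for a timestamped access unit.
struct AccessUnit {
    int64_t timeUs;
};

class PacketQueue {
public:
    // Producer side: append the unit and wake up one waiting consumer.
    void queueAccessUnit(const AccessUnit &au) {
        std::lock_guard<std::mutex> lock(mLock);
        mBuffers.push_back(au);
        mLastQueuedTimeUs = au.timeUs;
        mCondition.notify_one();
    }

    // Consumer side: block until a unit is available, then remove it.
    AccessUnit dequeueAccessUnit() {
        std::unique_lock<std::mutex> lock(mLock);
        mCondition.wait(lock, [this] { return !mBuffers.empty(); });
        AccessUnit au = mBuffers.front();
        mBuffers.pop_front();
        return au;
    }

    int64_t lastQueuedTimeUs() {
        std::lock_guard<std::mutex> lock(mLock);
        return mLastQueuedTimeUs;
    }

private:
    std::mutex mLock;
    std::condition_variable mCondition;
    std::deque<AccessUnit> mBuffers;
    int64_t mLastQueuedTimeUs = 0;
};
```

In this sketch the receiving thread would call queueAccessUnit() while the decoder thread blocks in dequeueAccessUnit(); the predicate passed to wait() guards against spurious wakeups.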

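Now for the playback start flow. This part repeats what was covered in the HLS walkthrough and is pasted here for easy reference. Broadly, it initializes the decoders and then starts feeding data from the input buffers into them.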

void NuPlayer::start() {
(new AMessage(kWhatStart, this))->post();
}
case kWhatStart:
{
ALOGV("kWhatStart");
if (mStarted) {
//...............
} else {
onStart();
}
mPausedByClient = false;
break;
}
void NuPlayer::onStart(int64_t startPositionUs) {
if (!mSourceStarted) {
mSourceStarted = true;
mSource->start();
}

mOffloadAudio = false;
mAudioEOS = false;
mVideoEOS = false;
mStarted = true;

uint32_t flags = 0;

sp<MetaData> audioMeta = mSource->getFormatMeta(true /* audio */);
audio_stream_type_t streamType = AUDIO_STREAM_MUSIC;
if (mAudioSink != NULL) {
streamType = mAudioSink->getAudioStreamType();
}

sp<AMessage> videoFormat = mSource->getFormat(false /* audio */);

sp<AMessage> notify = new AMessage(kWhatRendererNotify, this);
++mRendererGeneration;
notify->setInt32("generation", mRendererGeneration);
mRenderer = new Renderer(mAudioSink, notify, flags);
mRendererLooper = new ALooper;
mRendererLooper->setName("NuPlayerRenderer");
mRendererLooper->start(false, false, ANDROID_PRIORITY_AUDIO);
mRendererLooper->registerHandler(mRenderer);

status_t err = mRenderer->setPlaybackSettings(mPlaybackSettings);

float rate = getFrameRate();
if (rate > 0) {
mRenderer->setVideoFrameRate(rate);
}

if (mVideoDecoder != NULL) {
mVideoDecoder->setRenderer(mRenderer);
}
if (mAudioDecoder != NULL) {
mAudioDecoder->setRenderer(mRenderer);
}

postScanSources();
}

Next, the decoder initialization:

void NuPlayer::postScanSources() {
if (mScanSourcesPending) {
return;
}
sp<AMessage> msg = new AMessage(kWhatScanSources, this);
msg->setInt32("generation", mScanSourcesGeneration);
msg->post();
mScanSourcesPending = true;
}
case kWhatScanSources:
{
int32_t generation;

mScanSourcesPending = false;

bool mHadAnySourcesBefore =
(mAudioDecoder != NULL) || (mVideoDecoder != NULL);

// initialize video before audio because successful initialization of
// video may change deep buffer mode of audio.
if (mSurface != NULL) {
instantiateDecoder(false, &mVideoDecoder);
}

// Don't try to re-open audio sink if there's an existing decoder.
if (mAudioSink != NULL && mAudioDecoder == NULL) {
instantiateDecoder(true, &mAudioDecoder);
}
}
status_t NuPlayer::instantiateDecoder(bool audio, sp<DecoderBase> *decoder) {

// Get the track format from the source
sp<AMessage> format = mSource->getFormat(audio);
format->setInt32("priority", 0 /* realtime */);

if (audio) {
sp<AMessage> notify = new AMessage(kWhatAudioNotify, this);
++mAudioDecoderGeneration;
notify->setInt32("generation", mAudioDecoderGeneration);
determineAudioModeChange();
if (mOffloadAudio) {
//....................
} else {
*decoder = new Decoder(notify, mSource, mPID, mRenderer);
}
} else {
sp<AMessage> notify = new AMessage(kWhatVideoNotify, this);
++mVideoDecoderGeneration;
notify->setInt32("generation", mVideoDecoderGeneration);
*decoder = new Decoder(notify, mSource, mPID, mRenderer, mSurface, mCCDecoder);
//...........................
}
// Initialize the decoder
(*decoder)->init();
// Configure the decoder
(*decoder)->configure(format);
//.........
return OK;
}

This is where the decoder is created and initialized.

void NuPlayer::DecoderBase::configure(const sp<AMessage> &format) {
sp<AMessage> msg = new AMessage(kWhatConfigure, this);
msg->setMessage("format", format);
msg->post();
}

void NuPlayer::DecoderBase::init() {
mDecoderLooper->registerHandler(this);
}

void NuPlayer::Decoder::onConfigure(const sp<AMessage> &format) {

// Create the MediaCodec
mCodec = MediaCodec::CreateByType(mCodecLooper, mime.c_str(), false /* encoder */, NULL /* err */, mPid);
// Configure the MediaCodec
err = mCodec->configure(format, mSurface, NULL /* crypto */, 0 /* flags */);
// For video, record the width and height in the stats
if (!mIsAudio) {
int32_t width, height;
if (mOutputFormat->findInt32("width", &width)&& mOutputFormat->findInt32("height", &height)) {
mStats->setInt32("width", width);
mStats->setInt32("height", height);
}
}
// Start the MediaCodec
err = mCodec->start();
}
sp<MediaCodec> MediaCodec::CreateByType(const sp<ALooper> &looper, const char *mime, bool encoder, status_t *err, pid_t pid) {
sp<MediaCodec> codec = new MediaCodec(looper, pid);
const status_t ret = codec->init(mime, true /* nameIsType */, encoder);
return ret == OK ? codec : NULL; // NULL deallocates codec.
}

MediaCodec::init() shows that mCodec is an ACodec object:

status_t MediaCodec::init(const AString &name, bool nameIsType, bool encoder) {
mResourceManagerService->init();
if (nameIsType || !strncasecmp(name.c_str(), "omx.", 4)) {
// Create the codec based on the name
mCodec = new ACodec;
} else if (!nameIsType&& !strncasecmp(name.c_str(), "android.filter.", 15)) {
} else {
}
sp<AMessage> msg = new AMessage(kWhatInit, this);
msg->setString("name", name);
msg->setInt32("nameIsType", nameIsType);
if (nameIsType) {
msg->setInt32("encoder", encoder);
}
return err;
}
case kWhatInit:
{
//....................
mCodec->initiateAllocateComponent(format);
break;
}
void ACodec::initiateAllocateComponent(const sp<AMessage> &msg) {
msg->setWhat(kWhatAllocateComponent);
msg->setTarget(this);
msg->post();
}
case ACodec::kWhatAllocateComponent:
{
onAllocateComponent(msg);
handled = true;
break;
}

Here the codec component is instantiated and the state changes to Loaded:

bool ACodec::UninitializedState::onAllocateComponent(const sp<AMessage> &msg) {

Vector<OMXCodec::CodecNameAndQuirks> matchingCodecs;
AString mime;
AString componentName;
uint32_t quirks = 0;
int32_t encoder = false;
if (msg->findString("componentName", &componentName)) {
ssize_t index = matchingCodecs.add();
OMXCodec::CodecNameAndQuirks *entry = &matchingCodecs.editItemAt(index);
entry->mName = String8(componentName.c_str());

if (!OMXCodec::findCodecQuirks(componentName.c_str(), &entry->mQuirks)) {
entry->mQuirks = 0;
}
} else {
CHECK(msg->findString("mime", &mime));
if (!msg->findInt32("encoder", &encoder)) {
encoder = false;
}
OMXCodec::findMatchingCodecs(
mime.c_str(),
encoder, // createEncoder
NULL, // matchComponentName
0, // flags
&matchingCodecs);
}

sp<CodecObserver> observer = new CodecObserver;
IOMX::node_id node = 0;

status_t err = NAME_NOT_FOUND;
for (size_t matchIndex = 0; matchIndex < matchingCodecs.size();++matchIndex) {
componentName = matchingCodecs.itemAt(matchIndex).mName.string();
quirks = matchingCodecs.itemAt(matchIndex).mQuirks;

pid_t tid = gettid();
int prevPriority = androidGetThreadPriority(tid);
androidSetThreadPriority(tid, ANDROID_PRIORITY_FOREGROUND);
err = omx->allocateNode(componentName.c_str(), observer, &node);
androidSetThreadPriority(tid, prevPriority);
if (err == OK) {
// Stop at the first component that allocates and keep its node id
break;
}
node = 0;
}

notify = new AMessage(kWhatOMXMessageList, mCodec);
observer->setNotificationMessage(notify);

mCodec->mComponentName = componentName;
mCodec->mRenderTracker.setComponentName(componentName);
mCodec->mFlags = 0;
mCodec->mQuirks = quirks;
mCodec->mOMX = omx;
mCodec->mNode = node;

{
sp<AMessage> notify = mCodec->mNotify->dup();
notify->setInt32("what", CodecBase::kWhatComponentAllocated);
notify->setString("componentName", mCodec->mComponentName.c_str());
notify->post();
}

mCodec->changeState(mCodec->mLoadedState);
return true;
}
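
The selection loop in onAllocateComponent() follows a simple "first candidate that works" pattern. Here is a minimal sketch of that pattern under stated assumptions: `allocateFirstWorking` and the `allocate` callback are hypothetical and stand in for the loop over matchingCodecs and the call to allocateNode(); they are not part of the OMX API.

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <vector>

// Tries each candidate in order and returns the name of the first one for
// which `allocate` succeeds, or an empty string if none can be allocated.
std::string allocateFirstWorking(
        const std::vector<std::string> &candidates,
        const std::function<bool(const std::string &)> &allocate) {
    for (const std::string &name : candidates) {
        if (allocate(name)) {
            return name;  // stop at the first success
        }
    }
    return std::string();
}
```

The order of the candidate list therefore decides which codec is used when several of them support the same MIME type.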

Decoder configuration:

status_t MediaCodec::configure(
const sp<AMessage> &format,
const sp<Surface> &surface,
const sp<ICrypto> &crypto,
uint32_t flags) {
sp<AMessage> msg = new AMessage(kWhatConfigure, this);

if (mIsVideo) {
format->findInt32("width", &mVideoWidth);
format->findInt32("height", &mVideoHeight);
if (!format->findInt32("rotation-degrees", &mRotationDegrees)) {
mRotationDegrees = 0;
}
}

msg->setMessage("format", format);
msg->setInt32("flags", flags);
msg->setObject("surface", surface);

//.....................
// save msg for reset
mConfigureMsg = msg;
//.....................
for (int i = 0; i <= kMaxRetry; ++i) {
if (i > 0) {
// Don't try to reclaim resource for the first time.
if (!mResourceManagerService->reclaimResource(resources)) {
break;
}
}
sp<AMessage> response;
err = PostAndAwaitResponse(msg, &response);
//.....................
}
return err;
}
case kWhatConfigure:
{
sp<AReplyToken> replyID;
CHECK(msg->senderAwaitsResponse(&replyID));


sp<RefBase> obj;
CHECK(msg->findObject("surface", &obj));

sp<AMessage> format;
CHECK(msg->findMessage("format", &format));

int32_t push;
if (msg->findInt32("push-blank-buffers-on-shutdown", &push) && push != 0) {
mFlags |= kFlagPushBlankBuffersOnShutdown;
}

if (obj != NULL) {
format->setObject("native-window", obj);
status_t err = handleSetSurface(static_cast<Surface *>(obj.get()));
if (err != OK) {
PostReplyWithError(replyID, err);
break;
}
} else {
handleSetSurface(NULL);
}

mReplyID = replyID;
setState(CONFIGURING);

void *crypto;

uint32_t flags;
CHECK(msg->findInt32("flags", (int32_t *)&flags));

if (flags & CONFIGURE_FLAG_ENCODE) {
format->setInt32("encoder", true);
mFlags |= kFlagIsEncoder;
}
// This is the key step
mCodec->initiateConfigureComponent(format);
break;
}
void ACodec::initiateConfigureComponent(const sp<AMessage> &msg) {
msg->setWhat(kWhatConfigureComponent);
msg->setTarget(this);
msg->post();
}
case ACodec::kWhatConfigureComponent:
{
onConfigureComponent(msg);
handled = true;
break;
}
bool ACodec::LoadedState::onConfigureComponent(
const sp<AMessage> &msg) {
ALOGV("onConfigureComponent");

CHECK(mCodec->mNode != 0);

status_t err = OK;
AString mime;
if (!msg->findString("mime", &mime)) {
err = BAD_VALUE;
} else {
err = mCodec->configureCodec(mime.c_str(), msg);
}
{
sp<AMessage> notify = mCodec->mNotify->dup();
notify->setInt32("what", CodecBase::kWhatComponentConfigured);
notify->setMessage("input-format", mCodec->mInputFormat);
notify->setMessage("output-format", mCodec->mOutputFormat);
notify->post();
}

return true;
}
case CodecBase::kWhatComponentConfigured:
{
if (mState == UNINITIALIZED || mState == INITIALIZED) {
// In case a kWhatError message came in and replied with error,
// we log a warning and ignore.
ALOGW("configure interrupted by error, current state %d", mState);
break;
}
CHECK_EQ(mState, CONFIGURING);

// reset input surface flag
mHaveInputSurface = false;

CHECK(msg->findMessage("input-format", &mInputFormat));
CHECK(msg->findMessage("output-format", &mOutputFormat));

int32_t usingSwRenderer;
if (mOutputFormat->findInt32("using-sw-renderer", &usingSwRenderer)
&& usingSwRenderer) {
mFlags |= kFlagUsesSoftwareRenderer;
}
setState(CONFIGURED);
(new AMessage)->postReply(mReplyID);
break;
}

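configureCodec() is where the detailed codec configuration happens. It deserves a closer study of its own; this post sticks to the overall flow: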

status_t ACodec::configureCodec(
const char *mime, const sp<AMessage> &msg) {
int32_t encoder;
if (!msg->findInt32("encoder", &encoder)) {
encoder = false;
}

sp<AMessage> inputFormat = new AMessage();
sp<AMessage> outputFormat = mNotify->dup(); // will use this for kWhatOutputFormatChanged

mIsEncoder = encoder;

mInputMetadataType = kMetadataBufferTypeInvalid;
mOutputMetadataType = kMetadataBufferTypeInvalid;

status_t err = setComponentRole(encoder /* isEncoder */, mime);

if (err != OK) {
return err;
}

int32_t bitRate = 0;
// FLAC encoder doesn't need a bitrate, other encoders do
if (encoder && strcasecmp(mime, MEDIA_MIMETYPE_AUDIO_FLAC)
&& !msg->findInt32("bitrate", &bitRate)) {
return INVALID_OPERATION;
}

int32_t storeMeta;
if (encoder
&& msg->findInt32("store-metadata-in-buffers", &storeMeta)
&& storeMeta != 0) {
err = mOMX->storeMetaDataInBuffers(mNode, kPortIndexInput, OMX_TRUE, &mInputMetadataType);
if (err != OK) {
ALOGE("[%s] storeMetaDataInBuffers (input) failed w/ err %d",
mComponentName.c_str(), err);

return err;
}
// For this specific case we could be using camera source even if storeMetaDataInBuffers
// returns Gralloc source. Pretend that we are; this will force us to use nBufferSize.
if (mInputMetadataType == kMetadataBufferTypeGrallocSource) {
mInputMetadataType = kMetadataBufferTypeCameraSource;
}

uint32_t usageBits;
if (mOMX->getParameter(
mNode, (OMX_INDEXTYPE)OMX_IndexParamConsumerUsageBits,
&usageBits, sizeof(usageBits)) == OK) {
inputFormat->setInt32(
"using-sw-read-often", !!(usageBits & GRALLOC_USAGE_SW_READ_OFTEN));
}
}

int32_t prependSPSPPS = 0;
if (encoder
&& msg->findInt32("prepend-sps-pps-to-idr-frames", &prependSPSPPS)
&& prependSPSPPS != 0) {
OMX_INDEXTYPE index;
err = mOMX->getExtensionIndex(
mNode,
"OMX.google.android.index.prependSPSPPSToIDRFrames",
&index);

if (err == OK) {
PrependSPSPPSToIDRFramesParams params;
InitOMXParams(&params);
params.bEnable = OMX_TRUE;

err = mOMX->setParameter(
mNode, index, &params, sizeof(params));
}

if (err != OK) {
ALOGE("Encoder could not be configured to emit SPS/PPS before "
"IDR frames. (err %d)", err);

return err;
}
}

// Only enable metadata mode on encoder output if encoder can prepend
// sps/pps to idr frames, since in metadata mode the bitstream is in an
// opaque handle, to which we don't have access.
int32_t video = !strncasecmp(mime, "video/", 6);
mIsVideo = video;
if (encoder && video) {
OMX_BOOL enable = (OMX_BOOL) (prependSPSPPS
&& msg->findInt32("store-metadata-in-buffers-output", &storeMeta)
&& storeMeta != 0);

err = mOMX->storeMetaDataInBuffers(mNode, kPortIndexOutput, enable, &mOutputMetadataType);
if (err != OK) {
ALOGE("[%s] storeMetaDataInBuffers (output) failed w/ err %d",
mComponentName.c_str(), err);
}

if (!msg->findInt64(
"repeat-previous-frame-after",
&mRepeatFrameDelayUs)) {
mRepeatFrameDelayUs = -1ll;
}

if (!msg->findInt64("max-pts-gap-to-encoder", &mMaxPtsGapUs)) {
mMaxPtsGapUs = -1ll;
}

if (!msg->findFloat("max-fps-to-encoder", &mMaxFps)) {
mMaxFps = -1;
}

if (!msg->findInt64("time-lapse", &mTimePerCaptureUs)) {
mTimePerCaptureUs = -1ll;
}

if (!msg->findInt32(
"create-input-buffers-suspended",
(int32_t*)&mCreateInputBuffersSuspended)) {
mCreateInputBuffersSuspended = false;
}
}

// NOTE: we only use native window for video decoders
sp<RefBase> obj;
bool haveNativeWindow = msg->findObject("native-window", &obj)
&& obj != NULL && video && !encoder;
mLegacyAdaptiveExperiment = false;
if (video && !encoder) {
inputFormat->setInt32("adaptive-playback", false);

int32_t usageProtected;
if (msg->findInt32("protected", &usageProtected) && usageProtected) {
if (!haveNativeWindow) {
ALOGE("protected output buffers must be sent to an ANativeWindow");
return PERMISSION_DENIED;
}
mFlags |= kFlagIsGrallocUsageProtected;
mFlags |= kFlagPushBlankBuffersToNativeWindowOnShutdown;
}
}
if (haveNativeWindow) {
sp<ANativeWindow> nativeWindow =
static_cast<ANativeWindow *>(static_cast<Surface *>(obj.get()));

// START of temporary support for automatic FRC - THIS WILL BE REMOVED
int32_t autoFrc;
if (msg->findInt32("auto-frc", &autoFrc)) {
bool enabled = autoFrc;
OMX_CONFIG_BOOLEANTYPE config;
InitOMXParams(&config);
config.bEnabled = (OMX_BOOL)enabled;
status_t temp = mOMX->setConfig(
mNode, (OMX_INDEXTYPE)OMX_IndexConfigAutoFramerateConversion,
&config, sizeof(config));
if (temp == OK) {
outputFormat->setInt32("auto-frc", enabled);
} else if (enabled) {
ALOGI("codec does not support requested auto-frc (err %d)", temp);
}
}
// END of temporary support for automatic FRC

int32_t tunneled;
if (msg->findInt32("feature-tunneled-playback", &tunneled) &&
tunneled != 0) {
ALOGI("Configuring TUNNELED video playback.");
mTunneled = true;

int32_t audioHwSync = 0;
if (!msg->findInt32("audio-hw-sync", &audioHwSync)) {
ALOGW("No Audio HW Sync provided for video tunnel");
}
err = configureTunneledVideoPlayback(audioHwSync, nativeWindow);
if (err != OK) {
ALOGE("configureTunneledVideoPlayback(%d,%p) failed!",
audioHwSync, nativeWindow.get());
return err;
}

int32_t maxWidth = 0, maxHeight = 0;
if (msg->findInt32("max-width", &maxWidth) &&
msg->findInt32("max-height", &maxHeight)) {

err = mOMX->prepareForAdaptivePlayback(
mNode, kPortIndexOutput, OMX_TRUE, maxWidth, maxHeight);
if (err != OK) {
ALOGW("[%s] prepareForAdaptivePlayback failed w/ err %d",
mComponentName.c_str(), err);
// allow failure
err = OK;
} else {
inputFormat->setInt32("max-width", maxWidth);
inputFormat->setInt32("max-height", maxHeight);
inputFormat->setInt32("adaptive-playback", true);
}
}
} else {
ALOGV("Configuring CPU controlled video playback.");
mTunneled = false;

// Explicity reset the sideband handle of the window for
// non-tunneled video in case the window was previously used
// for a tunneled video playback.
err = native_window_set_sideband_stream(nativeWindow.get(), NULL);
if (err != OK) {
ALOGE("set_sideband_stream(NULL) failed! (err %d).", err);
return err;
}

// Always try to enable dynamic output buffers on native surface
err = mOMX->storeMetaDataInBuffers(
mNode, kPortIndexOutput, OMX_TRUE, &mOutputMetadataType);
if (err != OK) {
ALOGE("[%s] storeMetaDataInBuffers failed w/ err %d",
mComponentName.c_str(), err);

// if adaptive playback has been requested, try JB fallback
// NOTE: THIS FALLBACK MECHANISM WILL BE REMOVED DUE TO ITS
// LARGE MEMORY REQUIREMENT

// we will not do adaptive playback on software accessed
// surfaces as they never had to respond to changes in the
// crop window, and we don't trust that they will be able to.
int usageBits = 0;
bool canDoAdaptivePlayback;

if (nativeWindow->query(
nativeWindow.get(),
NATIVE_WINDOW_CONSUMER_USAGE_BITS,
&usageBits) != OK) {
canDoAdaptivePlayback = false;
} else {
canDoAdaptivePlayback =
(usageBits &
(GRALLOC_USAGE_SW_READ_MASK |
GRALLOC_USAGE_SW_WRITE_MASK)) == 0;
}

int32_t maxWidth = 0, maxHeight = 0;
if (canDoAdaptivePlayback &&
msg->findInt32("max-width", &maxWidth) &&
msg->findInt32("max-height", &maxHeight)) {
ALOGV("[%s] prepareForAdaptivePlayback(%dx%d)",
mComponentName.c_str(), maxWidth, maxHeight);

err = mOMX->prepareForAdaptivePlayback(
mNode, kPortIndexOutput, OMX_TRUE, maxWidth,
maxHeight);
ALOGW_IF(err != OK,
"[%s] prepareForAdaptivePlayback failed w/ err %d",
mComponentName.c_str(), err);

if (err == OK) {
inputFormat->setInt32("max-width", maxWidth);
inputFormat->setInt32("max-height", maxHeight);
inputFormat->setInt32("adaptive-playback", true);
}
}
// allow failure
err = OK;
} else {
ALOGV("[%s] storeMetaDataInBuffers succeeded",
mComponentName.c_str());
CHECK(storingMetadataInDecodedBuffers());
mLegacyAdaptiveExperiment = ADebug::isExperimentEnabled(
"legacy-adaptive", !msg->contains("no-experiments"));

inputFormat->setInt32("adaptive-playback", true);
}

int32_t push;
if (msg->findInt32("push-blank-buffers-on-shutdown", &push)
&& push != 0) {
mFlags |= kFlagPushBlankBuffersToNativeWindowOnShutdown;
}
}

int32_t rotationDegrees;
if (msg->findInt32("rotation-degrees", &rotationDegrees)) {
mRotationDegrees = rotationDegrees;
} else {
mRotationDegrees = 0;
}
}

if (video) {
// determine need for software renderer
bool usingSwRenderer = false;
if (haveNativeWindow && mComponentName.startsWith("OMX.google.")) {
usingSwRenderer = true;
haveNativeWindow = false;
}

if (encoder) {
err = setupVideoEncoder(mime, msg);
} else {
err = setupVideoDecoder(mime, msg, haveNativeWindow);
}

if (err != OK) {
return err;
}

if (haveNativeWindow) {
mNativeWindow = static_cast<Surface *>(obj.get());
}

// initialize native window now to get actual output format
// TODO: this is needed for some encoders even though they don't use native window
err = initNativeWindow();
if (err != OK) {
return err;
}

// fallback for devices that do not handle flex-YUV for native buffers
if (haveNativeWindow) {
int32_t requestedColorFormat = OMX_COLOR_FormatUnused;
if (msg->findInt32("color-format", &requestedColorFormat) &&
requestedColorFormat == OMX_COLOR_FormatYUV420Flexible) {
status_t err = getPortFormat(kPortIndexOutput, outputFormat);
if (err != OK) {
return err;
}
int32_t colorFormat = OMX_COLOR_FormatUnused;
OMX_U32 flexibleEquivalent = OMX_COLOR_FormatUnused;
if (!outputFormat->findInt32("color-format", &colorFormat)) {
ALOGE("ouptut port did not have a color format (wrong domain?)");
return BAD_VALUE;
}
ALOGD("[%s] Requested output format %#x and got %#x.",
mComponentName.c_str(), requestedColorFormat, colorFormat);
if (!isFlexibleColorFormat(
mOMX, mNode, colorFormat, haveNativeWindow, &flexibleEquivalent)
|| flexibleEquivalent != (OMX_U32)requestedColorFormat) {
// device did not handle flex-YUV request for native window, fall back
// to SW renderer
ALOGI("[%s] Falling back to software renderer", mComponentName.c_str());
mNativeWindow.clear();
mNativeWindowUsageBits = 0;
haveNativeWindow = false;
usingSwRenderer = true;
if (storingMetadataInDecodedBuffers()) {
err = mOMX->storeMetaDataInBuffers(
mNode, kPortIndexOutput, OMX_FALSE, &mOutputMetadataType);
mOutputMetadataType = kMetadataBufferTypeInvalid; // just in case
// TODO: implement adaptive-playback support for bytebuffer mode.
// This is done by SW codecs, but most HW codecs don't support it.
inputFormat->setInt32("adaptive-playback", false);
}
if (err == OK) {
err = mOMX->enableGraphicBuffers(mNode, kPortIndexOutput, OMX_FALSE);
}
if (mFlags & kFlagIsGrallocUsageProtected) {
// fallback is not supported for protected playback
err = PERMISSION_DENIED;
} else if (err == OK) {
err = setupVideoDecoder(mime, msg, false);
}
}
}
}

if (usingSwRenderer) {
outputFormat->setInt32("using-sw-renderer", 1);
}
} else if (!strcasecmp(mime, MEDIA_MIMETYPE_AUDIO_MPEG)) {
int32_t numChannels, sampleRate;
if (!msg->findInt32("channel-count", &numChannels)
|| !msg->findInt32("sample-rate", &sampleRate)) {
// Since we did not always check for these, leave them optional
// and have the decoder figure it all out.
err = OK;
} else {
err = setupRawAudioFormat(
encoder ? kPortIndexInput : kPortIndexOutput,
sampleRate,
numChannels);
}
} else if (!strcasecmp(mime, MEDIA_MIMETYPE_AUDIO_AAC)) {
int32_t numChannels, sampleRate;
if (!msg->findInt32("channel-count", &numChannels)
|| !msg->findInt32("sample-rate", &sampleRate)) {
err = INVALID_OPERATION;
} else {
int32_t isADTS, aacProfile;
int32_t sbrMode;
int32_t maxOutputChannelCount;
int32_t pcmLimiterEnable;
drcParams_t drc;
if (!msg->findInt32("is-adts", &isADTS)) {
isADTS = 0;
}
if (!msg->findInt32("aac-profile", &aacProfile)) {
aacProfile = OMX_AUDIO_AACObjectNull;
}
if (!msg->findInt32("aac-sbr-mode", &sbrMode)) {
sbrMode = -1;
}

if (!msg->findInt32("aac-max-output-channel_count", &maxOutputChannelCount)) {
maxOutputChannelCount = -1;
}
if (!msg->findInt32("aac-pcm-limiter-enable", &pcmLimiterEnable)) {
// value is unknown
pcmLimiterEnable = -1;
}
if (!msg->findInt32("aac-encoded-target-level", &drc.encodedTargetLevel)) {
// value is unknown
drc.encodedTargetLevel = -1;
}
if (!msg->findInt32("aac-drc-cut-level", &drc.drcCut)) {
// value is unknown
drc.drcCut = -1;
}
if (!msg->findInt32("aac-drc-boost-level", &drc.drcBoost)) {
// value is unknown
drc.drcBoost = -1;
}
if (!msg->findInt32("aac-drc-heavy-compression", &drc.heavyCompression)) {
// value is unknown
drc.heavyCompression = -1;
}
if (!msg->findInt32("aac-target-ref-level", &drc.targetRefLevel)) {
// value is unknown
drc.targetRefLevel = -1;
}

err = setupAACCodec(
encoder, numChannels, sampleRate, bitRate, aacProfile,
isADTS != 0, sbrMode, maxOutputChannelCount, drc,
pcmLimiterEnable);
}
} else if (!strcasecmp(mime, MEDIA_MIMETYPE_AUDIO_AMR_NB)) {
err = setupAMRCodec(encoder, false /* isWAMR */, bitRate);
} else if (!strcasecmp(mime, MEDIA_MIMETYPE_AUDIO_AMR_WB)) {
err = setupAMRCodec(encoder, true /* isWAMR */, bitRate);
} else if (!strcasecmp(mime, MEDIA_MIMETYPE_AUDIO_G711_ALAW)
|| !strcasecmp(mime, MEDIA_MIMETYPE_AUDIO_G711_MLAW)) {
// These are PCM-like formats with a fixed sample rate but
// a variable number of channels.

int32_t numChannels;
if (!msg->findInt32("channel-count", &numChannels)) {
err = INVALID_OPERATION;
} else {
int32_t sampleRate;
if (!msg->findInt32("sample-rate", &sampleRate)) {
sampleRate = 8000;
}
err = setupG711Codec(encoder, sampleRate, numChannels);
}
} else if (!strcasecmp(mime, MEDIA_MIMETYPE_AUDIO_FLAC)) {
int32_t numChannels = 0, sampleRate = 0, compressionLevel = -1;
if (encoder &&
(!msg->findInt32("channel-count", &numChannels)
|| !msg->findInt32("sample-rate", &sampleRate))) {
ALOGE("missing channel count or sample rate for FLAC encoder");
err = INVALID_OPERATION;
} else {
if (encoder) {
if (!msg->findInt32(
"complexity", &compressionLevel) &&
!msg->findInt32(
"flac-compression-level", &compressionLevel)) {
compressionLevel = 5; // default FLAC compression level
} else if (compressionLevel < 0) {
ALOGW("compression level %d outside [0..8] range, "
"using 0",
compressionLevel);
compressionLevel = 0;
} else if (compressionLevel > 8) {
ALOGW("compression level %d outside [0..8] range, "
"using 8",
compressionLevel);
compressionLevel = 8;
}
}
err = setupFlacCodec(
encoder, numChannels, sampleRate, compressionLevel);
}
} else if (!strcasecmp(mime, MEDIA_MIMETYPE_AUDIO_RAW)) {
int32_t numChannels, sampleRate;
if (encoder
|| !msg->findInt32("channel-count", &numChannels)
|| !msg->findInt32("sample-rate", &sampleRate)) {
err = INVALID_OPERATION;
} else {
err = setupRawAudioFormat(kPortIndexInput, sampleRate, numChannels);
}
} else if (!strcasecmp(mime, MEDIA_MIMETYPE_AUDIO_AC3)) {
int32_t numChannels;
int32_t sampleRate;
if (!msg->findInt32("channel-count", &numChannels)
|| !msg->findInt32("sample-rate", &sampleRate)) {
err = INVALID_OPERATION;
} else {
err = setupAC3Codec(encoder, numChannels, sampleRate);
}
} else if (!strcasecmp(mime, MEDIA_MIMETYPE_AUDIO_EAC3)) {
int32_t numChannels;
int32_t sampleRate;
if (!msg->findInt32("channel-count", &numChannels)
|| !msg->findInt32("sample-rate", &sampleRate)) {
err = INVALID_OPERATION;
} else {
err = setupEAC3Codec(encoder, numChannels, sampleRate);
}
}

if (err != OK) {
return err;
}

if (!msg->findInt32("encoder-delay", &mEncoderDelay)) {
mEncoderDelay = 0;
}

if (!msg->findInt32("encoder-padding", &mEncoderPadding)) {
mEncoderPadding = 0;
}

if (msg->findInt32("channel-mask", &mChannelMask)) {
mChannelMaskPresent = true;
} else {
mChannelMaskPresent = false;
}

int32_t maxInputSize;
if (msg->findInt32("max-input-size", &maxInputSize)) {
err = setMinBufferSize(kPortIndexInput, (size_t)maxInputSize);
} else if (!strcmp("OMX.Nvidia.aac.decoder", mComponentName.c_str())) {
err = setMinBufferSize(kPortIndexInput, 8192); // XXX
}

int32_t priority;
if (msg->findInt32("priority", &priority)) {
err = setPriority(priority);
}

int32_t rateInt = -1;
float rateFloat = -1;
if (!msg->findFloat("operating-rate", &rateFloat)) {
msg->findInt32("operating-rate", &rateInt);
rateFloat = (float)rateInt; // 16MHz (FLINTMAX) is OK for upper bound.
}
if (rateFloat > 0) {
err = setOperatingRate(rateFloat, video);
}

mBaseOutputFormat = outputFormat;

err = getPortFormat(kPortIndexInput, inputFormat);
if (err == OK) {
err = getPortFormat(kPortIndexOutput, outputFormat);
if (err == OK) {
mInputFormat = inputFormat;
mOutputFormat = outputFormat;
}
}
return err;
}
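
The FLAC branch of the configuration code above picks a compression level by trying two keys, falling back to a default, and clamping out-of-range values. The following is a minimal sketch of that lookup only, not the ACodec implementation: `Format`, the free function `findInt32`, and `resolveFlacCompressionLevel` are hypothetical stand-ins for `AMessage` and the surrounding code.

```cpp
#include <cassert>
#include <map>
#include <string>

// Hypothetical stand-in for AMessage: a plain key -> int32 map.
using Format = std::map<std::string, int>;

// Writes *out and returns true when the key is present, false otherwise.
static bool findInt32(const Format &fmt, const std::string &key, int *out) {
    assert(out != nullptr);
    auto it = fmt.find(key);
    if (it == fmt.end()) {
        return false;
    }
    *out = it->second;
    return true;
}

// Same decision order as the listing: "complexity" first, then
// "flac-compression-level", default 5, then clamp to [0..8].
int resolveFlacCompressionLevel(const Format &fmt) {
    int level = 0;
    if (!findInt32(fmt, "complexity", &level) &&
        !findInt32(fmt, "flac-compression-level", &level)) {
        return 5;  // default FLAC compression level
    }
    if (level < 0) {
        return 0;
    }
    if (level > 8) {
        return 8;
    }
    return level;
}
```

Because the first lookup short-circuits the second, "complexity" wins when both keys are present.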

At this point the decoder's initialization and configuration are complete. Next, let's look at the decoder's start phase:

status_t MediaCodec::start() {
sp<AMessage> msg = new AMessage(kWhatStart, this);

status_t err;
Vector<MediaResource> resources;
const char *type = (mFlags & kFlagIsSecure) ?
kResourceSecureCodec : kResourceNonSecureCodec;
const char *subtype = mIsVideo ? kResourceVideoCodec : kResourceAudioCodec;
resources.push_back(MediaResource(String8(type), String8(subtype), 1));
// Don't know the buffer size at this point, but it's fine to use 1 because
// the reclaimResource call doesn't consider the requester's buffer size for now.
resources.push_back(MediaResource(String8(kResourceGraphicMemory), 1));
for (int i = 0; i <= kMaxRetry; ++i) {
if (i > 0) {
// Don't try to reclaim resource for the first time.
if (!mResourceManagerService->reclaimResource(resources)) {
break;
}
// Recover codec from previous error before retry start.
err = reset();
if (err != OK) {
ALOGE("retrying start: failed to reset codec");
break;
}
sp<AMessage> response;
err = PostAndAwaitResponse(mConfigureMsg, &response);
if (err != OK) {
ALOGE("retrying start: failed to configure codec");
break;
}
}
sp<AMessage> response;
err = PostAndAwaitResponse(msg, &response);
if (!isResourceError(err)) {
break;
}
}
return err;
}
// Handled in MediaCodec::onMessageReceived():
case kWhatStart:
{
sp<AReplyToken> replyID;
CHECK(msg->senderAwaitsResponse(&replyID));

if (mState == FLUSHED) {
setState(STARTED);
if (mHavePendingInputBuffers) {
onInputBufferAvailable();
mHavePendingInputBuffers = false;
}
// This is the call to focus on
mCodec->signalResume();
//..................
PostReplyWithError(replyID, OK);
break;
} else if (mState != CONFIGURED) {
PostReplyWithError(replyID, INVALID_OPERATION);
break;
}

mReplyID = replyID;
setState(STARTING);

mCodec->initiateStart();
break;
}
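
The retry loop in MediaCodec::start() can be easier to follow in isolation. The sketch below keeps only the control flow: try to start, and on a resource error reclaim resources, reset, reconfigure, and try again. `StartHooks`, `Status`, and `startWithRetry` are hypothetical names, and treating only an out-of-memory status as a resource error is an assumption made for this example.

```cpp
#include <cassert>
#include <functional>

enum class Status { Ok, NoMemory, InvalidOperation };

// Hypothetical hooks standing in for the real MediaCodec steps.
struct StartHooks {
    std::function<Status()> start;      // post the start message and wait
    std::function<bool()> reclaim;      // ask the resource manager to free a codec
    std::function<Status()> reset;      // recover the codec from the previous error
    std::function<Status()> configure;  // replay the saved configure message
};

// Assumption: only an out-of-memory status counts as a resource error.
static bool isResourceError(Status s) { return s == Status::NoMemory; }

Status startWithRetry(const StartHooks &h, int maxRetry) {
    assert(maxRetry >= 0);
    Status err = Status::Ok;
    for (int i = 0; i <= maxRetry; ++i) {
        if (i > 0) {
            // Never reclaim on the first attempt.
            if (!h.reclaim()) {
                break;
            }
            err = h.reset();
            if (err != Status::Ok) {
                break;
            }
            err = h.configure();
            if (err != Status::Ok) {
                break;
            }
        }
        err = h.start();
        if (!isResourceError(err)) {
            break;  // success, or an error that retrying cannot fix
        }
    }
    return err;
}
```

Note that when reclaiming fails, the loop exits with the previous start error still in `err`, which matches the listing.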

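First, initiateStart is called to begin initializing the decoder state: it posts kWhatStart to ACodec, and LoadedState::onStart asks the OMX component to move to OMX_StateIdle.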

void ACodec::initiateStart() {
(new AMessage(kWhatStart, this))->post();
}
// Handled in ACodec::LoadedState::onMessageReceived():
case ACodec::kWhatStart:
{
onStart();
handled = true;
break;
}
void ACodec::LoadedState::onStart() {
ALOGV("onStart");

status_t err = mCodec->mOMX->sendCommand(mCodec->mNode, OMX_CommandStateSet, OMX_StateIdle);
if (err != OK) {
mCodec->signalError(OMX_ErrorUndefined, makeNoSideEffectStatus(err));
} else {
mCodec->changeState(mCodec->mLoadedToIdleState);
}
}
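
The start message is only acted on while the codec is in its Loaded state, and the outcome of the idle command decides the next state. A much simplified sketch of that pattern follows; `MiniCodec`, `OmxPhase`, and the `Failed` phase are hypothetical, and the real ACodec reports the error to its client instead of entering a dedicated state.

```cpp
#include <cassert>
#include <functional>

enum class OmxPhase { Loaded, LoadedToIdle, Failed };

struct MiniCodec {
    OmxPhase phase = OmxPhase::Loaded;
    // Hypothetical hook for sendCommand(..., OMX_StateIdle); true on success.
    std::function<bool()> sendIdleCommand;

    // Returns true if the start message was handled in the current phase.
    bool onStartMessage() {
        if (phase != OmxPhase::Loaded) {
            return false;  // other phases do not handle this message here
        }
        assert(sendIdleCommand && "a command sender must be installed");
        phase = sendIdleCommand() ? OmxPhase::LoadedToIdle : OmxPhase::Failed;
        return true;
    }
};
```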

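Next, the codec starts requesting input data for decoding. signalResume posts kWhatResume, and ExecutingState::resume offers every input buffer that ACodec currently owns to the upstream client: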

void ACodec::signalResume() {
(new AMessage(kWhatResume, this))->post();
}
// Handled in ACodec::ExecutingState::onMessageReceived():
case kWhatResume:
{
resume();
handled = true;
break;
}
void ACodec::ExecutingState::resume() {

submitOutputBuffers();
// Post all available input buffers
if (mCodec->mBuffers[kPortIndexInput].size() == 0u) {
ALOGW("[%s] we don't have any input buffers to resume", mCodec->mComponentName.c_str());
}

for (size_t i = 0; i < mCodec->mBuffers[kPortIndexInput].size(); i++) {
BufferInfo *info = &mCodec->mBuffers[kPortIndexInput].editItemAt(i);
if (info->mStatus == BufferInfo::OWNED_BY_US) {
postFillThisBuffer(info);
}
}
mActive = true;
}
void ACodec::BaseState::postFillThisBuffer(BufferInfo *info) {
if (mCodec->mPortEOS[kPortIndexInput]) {
return;
}

CHECK_EQ((int)info->mStatus, (int)BufferInfo::OWNED_BY_US);
sp<AMessage> notify = mCodec->mNotify->dup();
notify->setInt32("what", CodecBase::kWhatFillThisBuffer);
notify->setInt32("buffer-id", info->mBufferID);
info->mData->meta()->clear();
notify->setBuffer("buffer", info->mData);
sp<AMessage> reply = new AMessage(kWhatInputBufferFilled, mCodec);
reply->setInt32("buffer-id", info->mBufferID);
notify->setMessage("reply", reply);
notify->post();
info->mStatus = BufferInfo::OWNED_BY_UPSTREAM;
}
// Handled in MediaCodec::onMessageReceived(), under kWhatCodecNotify:
case CodecBase::kWhatFillThisBuffer:
{

//..........
if (mFlags & kFlagIsAsync) {
if (!mHaveInputSurface) {
if (mState == FLUSHED) {
mHavePendingInputBuffers = true;
} else {
onInputBufferAvailable();
}
}
} else if (mFlags & kFlagDequeueInputPending) {
CHECK(handleDequeueInputBuffer(mDequeueInputReplyID));
++mDequeueInputTimeoutGeneration;
mFlags &= ~kFlagDequeueInputPending;
mDequeueInputReplyID = 0;
} else {
postActivityNotificationIfPossible();
}
break;
}
void MediaCodec::onInputBufferAvailable() {
int32_t index;
while ((index = dequeuePortBuffer(kPortIndexInput)) >= 0) {
sp<AMessage> msg = mCallback->dup();
msg->setInt32("callbackID", CB_INPUT_AVAILABLE);
msg->setInt32("index", index);
msg->post();
}
}
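
The listing above hands input buffers from ACodec to the upstream client by flipping an ownership flag on each one. Here is a minimal sketch of that handoff, assuming a two-valued `Owner` enum and a plain vector of notified buffer ids in place of the real AMessage notification; all names are hypothetical.

```cpp
#include <cassert>
#include <vector>

enum class Owner { Us, Upstream };

struct BufferInfo {
    int id;
    Owner status;
};

// Hands one buffer upstream. Does nothing once input EOS has been seen.
bool postFillThisBuffer(BufferInfo &info, bool inputEos, std::vector<int> &notified) {
    if (inputEos) {
        return false;
    }
    assert(info.status == Owner::Us);  // only buffers we own may be offered
    notified.push_back(info.id);       // stands in for posting the notify message
    info.status = Owner::Upstream;
    return true;
}

// Offers every buffer we currently own and returns the ids that were posted.
std::vector<int> resumeInput(std::vector<BufferInfo> &buffers, bool inputEos) {
    std::vector<int> notified;
    for (BufferInfo &info : buffers) {
        if (info.status == Owner::Us) {
            postFillThisBuffer(info, inputEos, notified);
        }
    }
    return notified;
}
```

Calling `resumeInput` twice in a row posts nothing the second time, because every eligible buffer already belongs to the upstream side.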

Remember where this mCallback came from?

void NuPlayer::Decoder::onConfigure(const sp<AMessage> &format) {

//.................
sp<AMessage> reply = new AMessage(kWhatCodecNotify, this);
mCodec->setCallback(reply);
//..................
}
status_t MediaCodec::setCallback(const sp<AMessage> &callback) {
sp<AMessage> msg = new AMessage(kWhatSetCallback, this);
msg->setMessage("callback", callback);

sp<AMessage> response;
return PostAndAwaitResponse(msg, &response);
}
// Handled in MediaCodec::onMessageReceived():
case kWhatSetCallback:
{
sp<AReplyToken> replyID;
CHECK(msg->senderAwaitsResponse(&replyID));
sp<AMessage> callback;
CHECK(msg->findMessage("callback", &callback));

mCallback = callback;

if (mCallback != NULL) {
mFlags |= kFlagIsAsync;
} else {
mFlags &= ~kFlagIsAsync;
}

sp<AMessage> response = new AMessage;
response->postReply(replyID);
break;
}
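
Installing a callback is what switches the codec into asynchronous mode, and the async flag in turn decides whether an available input buffer is reported through that callback. A small sketch of this relationship, assuming a `std::function` in place of the AMessage callback; `MiniMediaCodec` and the constant values are illustrative only.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <utility>

struct MiniMediaCodec {
    enum : uint32_t { kFlagIsAsync = 1u << 0 };  // illustrative value
    enum { kCbInputAvailable = 1 };              // illustrative value

    using Callback = std::function<void(int callbackId, int index)>;

    uint32_t flags = 0;
    Callback callback;

    // A non-empty callback turns async mode on, an empty one turns it off.
    void setCallback(Callback cb) {
        callback = std::move(cb);
        if (callback) {
            flags |= kFlagIsAsync;
        } else {
            flags &= ~static_cast<uint32_t>(kFlagIsAsync);
        }
    }

    // Reports an input buffer through the callback. Returns false in sync mode.
    bool notifyInputAvailable(int index) {
        if ((flags & kFlagIsAsync) == 0) {
            return false;
        }
        assert(static_cast<bool>(callback));
        callback(kCbInputAvailable, index);
        return true;
    }
};
```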

So from the above, we know that what runs next is the CB_INPUT_AVAILABLE case under kWhatCodecNotify:

// Handled in NuPlayer::Decoder::onMessageReceived(), under kWhatCodecNotify:
case MediaCodec::CB_INPUT_AVAILABLE:
{
int32_t index;
CHECK(msg->findInt32("index", &index));

handleAnInputBuffer(index);
break;
}
bool NuPlayer::Decoder::handleAnInputBuffer(size_t index) {
if (isDiscontinuityPending()) {
return false;
}

sp<ABuffer> buffer;
mCodec->getInputBuffer(index, &buffer);

if (buffer == NULL) {
handleError(UNKNOWN_ERROR);
return false;
}

if (index >= mInputBuffers.size()) {
for (size_t i = mInputBuffers.size(); i <= index; ++i) {
mInputBuffers.add();
mMediaBuffers.add();
mInputBufferIsDequeued.add();
mMediaBuffers.editItemAt(i) = NULL;
mInputBufferIsDequeued.editItemAt(i) = false;
}
}
mInputBuffers.editItemAt(index) = buffer;

//CHECK_LT(bufferIx, mInputBuffers.size());

if (mMediaBuffers[index] != NULL) {
mMediaBuffers[index]->release();
mMediaBuffers.editItemAt(index) = NULL;
}
mInputBufferIsDequeued.editItemAt(index) = true;

if (!mCSDsToSubmit.isEmpty()) {
sp<AMessage> msg = new AMessage();
msg->setSize("buffer-ix", index);

sp<ABuffer> buffer = mCSDsToSubmit.itemAt(0);
ALOGI("[%s] resubmitting CSD", mComponentName.c_str());
msg->setBuffer("buffer", buffer);
mCSDsToSubmit.removeAt(0);
CHECK(onInputBufferFetched(msg));
return true;
}

while (!mPendingInputMessages.empty()) {
sp<AMessage> msg = *mPendingInputMessages.begin();
if (!onInputBufferFetched(msg)) {
break;
}
mPendingInputMessages.erase(mPendingInputMessages.begin());
}

if (!mInputBufferIsDequeued.editItemAt(index)) {
return true;
}

mDequeuedInputBuffers.push_back(index);

onRequestInputBuffers();
return true;
}
void NuPlayer::DecoderBase::onRequestInputBuffers() {
if (mRequestInputBuffersPending) {
return;
}

// doRequestBuffers() return true if we should request more data
if (doRequestBuffers()) {
mRequestInputBuffersPending = true;

sp<AMessage> msg = new AMessage(kWhatRequestInputBuffers, this);
msg->post(10 * 1000ll);
}
}
bool NuPlayer::Decoder::doRequestBuffers() {
// mRenderer is only NULL if we have a legacy widevine source that
// is not yet ready. In this case we must not fetch input.
if (isDiscontinuityPending() || mRenderer == NULL) {
return false;
}
status_t err = OK;
while (err == OK && !mDequeuedInputBuffers.empty()) {
size_t bufferIx = *mDequeuedInputBuffers.begin();
sp<AMessage> msg = new AMessage();
msg->setSize("buffer-ix", bufferIx);
err = fetchInputData(msg);
if (err != OK && err != ERROR_END_OF_STREAM) {
// if EOS, need to queue EOS buffer
break;
}
mDequeuedInputBuffers.erase(mDequeuedInputBuffers.begin());

if (!mPendingInputMessages.empty()
|| !onInputBufferFetched(msg)) {
mPendingInputMessages.push_back(msg);
}
}

return err == -EWOULDBLOCK
&& mSource->feedMoreTSData() == OK;
}
status_t NuPlayer::Decoder::fetchInputData(sp<AMessage> &reply) {
sp<ABuffer> accessUnit;
bool dropAccessUnit;
do {
status_t err = mSource->dequeueAccessUnit(mIsAudio, &accessUnit);

if (err == -EWOULDBLOCK) {
return err;
} else if (err != OK) {
if (err == INFO_DISCONTINUITY) {
int32_t type;
CHECK(accessUnit->meta()->findInt32("discontinuity", &type));

bool formatChange =
(mIsAudio &&
(type & ATSParser::DISCONTINUITY_AUDIO_FORMAT))
|| (!mIsAudio &&
(type & ATSParser::DISCONTINUITY_VIDEO_FORMAT));

bool timeChange = (type & ATSParser::DISCONTINUITY_TIME) != 0;

ALOGI("%s discontinuity (format=%d, time=%d)",
mIsAudio ? "audio" : "video", formatChange, timeChange);

bool seamlessFormatChange = false;
sp<AMessage> newFormat = mSource->getFormat(mIsAudio);
if (formatChange) {
seamlessFormatChange =
supportsSeamlessFormatChange(newFormat);
// treat seamless format change separately
formatChange = !seamlessFormatChange;
}

// For format or time change, return EOS to queue EOS input,
// then wait for EOS on output.
if (formatChange /* not seamless */) {
mFormatChangePending = true;
err = ERROR_END_OF_STREAM;
} else if (timeChange) {
rememberCodecSpecificData(newFormat);
mTimeChangePending = true;
err = ERROR_END_OF_STREAM;
} else if (seamlessFormatChange) {
// reuse existing decoder and don't flush
rememberCodecSpecificData(newFormat);
continue;
} else {
// This stream is unaffected by the discontinuity
return -EWOULDBLOCK;
}
}

// reply should only be returned without a buffer set
// when there is an error (including EOS)
CHECK(err != OK);

reply->setInt32("err", err);
return ERROR_END_OF_STREAM;
}

dropAccessUnit = false;
if (!mIsAudio
&& !mIsSecure
&& mRenderer->getVideoLateByUs() > 100000ll
&& mIsVideoAVC
&& !IsAVCReferenceFrame(accessUnit)) {
dropAccessUnit = true;
++mNumInputFramesDropped;
}
} while (dropAccessUnit);

// ALOGV("returned a valid buffer of %s data", mIsAudio ? "mIsAudio" : "video");
#if 0
int64_t mediaTimeUs;
CHECK(accessUnit->meta()->findInt64("timeUs", &mediaTimeUs));
ALOGV("[%s] feeding input buffer at media time %.3f",
mIsAudio ? "audio" : "video",
mediaTimeUs / 1E6);
#endif

if (mCCDecoder != NULL) {
mCCDecoder->decode(accessUnit);
}

reply->setBuffer("buffer", accessUnit);

return OK;
}
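
fetchInputData makes two decisions that are easy to lose in the nesting: how to react to a discontinuity, and whether to drop a late video frame. The sketch below restates both as pure functions. The bit values, the `Action` enum, and both function names are assumptions for illustration; only the ordering of the checks and the 100 ms threshold are taken from the listing.

```cpp
#include <cstdint>

// Illustrative discontinuity bits; the real constants live in ATSParser.
enum : int32_t {
    kDiscontinuityTime = 1,
    kDiscontinuityAudioFormat = 2,
    kDiscontinuityVideoFormat = 4,
};

enum class Action {
    QueueEosFormatChange,  // non-seamless format change: queue EOS, wait for output EOS
    QueueEosTimeChange,    // time change: remember CSD, queue EOS
    ContinueSeamless,      // reuse the decoder without flushing
    Unaffected,            // this stream does not care about the discontinuity
};

Action classifyDiscontinuity(bool isAudio, int32_t type, bool seamlessSupported) {
    bool formatChange = isAudio ? (type & kDiscontinuityAudioFormat) != 0
                                : (type & kDiscontinuityVideoFormat) != 0;
    bool timeChange = (type & kDiscontinuityTime) != 0;

    bool seamless = false;
    if (formatChange) {
        seamless = seamlessSupported;
        formatChange = !seamless;  // a seamless change is handled separately
    }

    if (formatChange) return Action::QueueEosFormatChange;
    if (timeChange) return Action::QueueEosTimeChange;
    if (seamless) return Action::ContinueSeamless;
    return Action::Unaffected;
}

// Drop only non-secure AVC video frames that are not reference frames,
// and only when rendering is more than 100 ms late.
bool shouldDropAccessUnit(bool isAudio, bool isSecure, int64_t videoLateByUs,
                          bool isVideoAVC, bool isReferenceFrame) {
    return !isAudio && !isSecure && videoLateByUs > 100000 &&
           isVideoAVC && !isReferenceFrame;
}
```

One consequence of the check order: a seamless format change that arrives together with a time change is still treated as a time change, so EOS is queued.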