流连接建立之后,就可以请求播放了。
先看一下请求播放的流程是怎么样的,具体操作如下:
1、客户端发送命令“播放”给服务器 2、服务器接收到命令之后,发送消息“设置块大小”给客户端 3、服务器发送“stream begin”给客户端,告诉客户端 流的id 4、播放命令成功的话,服务器发送“响应状态”给客户端,告诉客户端播放成功 5、服务器发送音视频数据给客户端
关键的代码同样在HandleInvoke函数中
/*** 处理命令消息** 这些命令消息是使用AMF0格式进行编码的** 下面的代码是精简过的*/static intHandleInvoke(RTMP *r, const char *body, unsigned int nBodySize){ AMFObject obj; AVal method; double txn; int ret = 0, nRes; char pbuf[256], *pend = pbuf + sizeof(pbuf), *enc, **params = NULL; char *host = r->Link.hostname.av_len ? r->Link.hostname.av_val : ""; char *pageUrl = r->Link.pageUrl.av_len ? r->Link.pageUrl.av_val : ""; int param_count; AVal av_Command, av_Response; if (body[0] != 0x02) /* make sure it is a string method name we start with */ { RTMP_Log(RTMP_LOGWARNING, "%s, Sanity failed. no string method in invoke packet", __FUNCTION__); return 0; } nRes = AMF_Decode(&obj, body, nBodySize, FALSE); if (nRes < 0) { RTMP_Log(RTMP_LOGERROR, "%s, error decoding invoke packet", __FUNCTION__); return 0; } AMF_Dump(&obj); AMFPRop_GetString(AMF_GetProp(&obj, NULL, 0), &method); txn = AMFProp_GetNumber(AMF_GetProp(&obj, NULL, 1)); RTMP_Log(RTMP_LOGDEBUG, "%s, server invoking <%s>", __FUNCTION__, method.av_val); // 处理”result“ if (AVMATCH(&method, &av__result)) { AVal methodInvoked = { 0 }; int i; for (i = 0; i < r->m_numCalls; i++) { if (r->m_methodCalls[i].num == (int)txn) { methodInvoked = r->m_methodCalls[i].name; AV_erase(r->m_methodCalls, &r->m_numCalls, i, FALSE); break; } } if (!methodInvoked.av_val) { RTMP_Log(RTMP_LOGDEBUG, "%s, received result id %f without matching request", __FUNCTION__, txn); goto leave; } RTMP_Log(RTMP_LOGDEBUG, "%s, received result for method call <%s>", __FUNCTION__, methodInvoked.av_val); // ”connect“命令完成 if (AVMATCH(&methodInvoked, &av_connect)) { if (r->Link.token.av_len) { AMFObjectProperty p; if (RTMP_FindFirstMatchingProperty(&obj, &av_secureToken, &p)) { DecodeTEA(&r->Link.token, &p.p_vu.p_aval); SendSecureTokenResponse(r, &p.p_vu.p_aval); } } if (r->Link.protocol & RTMP_FEATURE_WRITE) { SendReleaseStream(r); SendFCPublish(r); } else { // 向服务器发送”确认窗口大小“ RTMP_SendServerBW(r); RTMP_SendCtrl(r, 3, 0, 300); } // 删除无关代码*** // 创建流 RTMP_SendCreateStream(r); } // 删除无关代码*** // ”创建流“命令成功 else if (AVMATCH(&methodInvoked, &av_createStream)) { r->m_stream_id = (int)AMFProp_GetNumber(AMF_GetProp(&obj, NULL, 3)); // 删除无关代码*** if (r->Link.protocol & RTMP_FEATURE_WRITE) { SendPublish(r); } else { if (r->Link.lFlags & RTMP_LF_PLST) SendPlaylist(r); // 发送播放命令 SendPlay(r); RTMP_SendCtrl(r, 3, r->m_stream_id, r->m_nBufferMS); } } else if (AVMATCH(&methodInvoked, &av_play) || AVMATCH(&methodInvoked, &av_publish)) { r->m_bPlaying = TRUE; } free(methodInvoked.av_val); } else if (AVMATCH(&method, &av_onBWDone)) { if (!r->m_nBWCheckCounter) SendCheckBW(r); } // 删除其他代码*** // *** // 删除其他代码***leave: AMF_Reset(&obj); return ret;}可以看到,创建流完成之后,即else if (AVMATCH(&methodInvoked, &av_createStream))条件判断里面,调用了SendPlay函数,用来发送播放命令。客户端发送播放命令
static intSendPlay(RTMP *r){ RTMPPacket packet; char pbuf[1024], *pend = pbuf + sizeof(pbuf); char *enc; packet.m_nChannel = 0x08; /* we make 8 our stream channel */ packet.m_headerType = RTMP_PACKET_SIZE_LARGE; packet.m_packetType = RTMP_PACKET_TYPE_INVOKE; packet.m_nTimeStamp = 0; packet.m_nInfoField2 = r->m_stream_id; /*0x01000000; */ packet.m_hasAbsTimestamp = 0; packet.m_body = pbuf + RTMP_MAX_HEADER_SIZE; enc = packet.m_body; // 播放命令 enc = AMF_EncodeString(enc, pend, &av_play); enc = AMF_EncodeNumber(enc, pend, ++r->m_numInvokes); *enc++ = AMF_NULL; RTMP_Log(RTMP_LOGDEBUG, "%s, seekTime=%d, stopTime=%d, sending play: %s", __FUNCTION__, r->Link.seekTime, r->Link.stopTime, r->Link.playpath.av_val); enc = AMF_EncodeString(enc, pend, &r->Link.playpath); if (!enc) return FALSE; /* Optional parameters start and len. * * start: -2, -1, 0, positive number * -2: looks for a live stream, then a recorded stream, * if not found any open a live stream * -1: plays a live stream * >=0: plays a recorded streams from 'start' milliseconds */ if (r->Link.lFlags & RTMP_LF_LIVE) enc = AMF_EncodeNumber(enc, pend, -1000.0); else { if (r->Link.seekTime > 0.0 || r->Link.stopTime) enc = AMF_EncodeNumber(enc, pend, r->Link.seekTime); /* resume from here */ } if (!enc) return FALSE; /* len: -1, 0, positive number * -1: plays live or recorded stream to the end (default) * 0: plays a frame 'start' ms away from the beginning * >0: plays a live or recoded stream for 'len' milliseconds */ /*enc += EncodeNumber(enc, -1.0); */ /* len */ if (r->Link.stopTime) { enc = AMF_EncodeNumber(enc, pend, r->Link.stopTime - r->Link.seekTime); if (!enc) return FALSE; } packet.m_nBodySize = enc - packet.m_body; // 发送数据包 return RTMP_SendPacket(r, &packet, TRUE);}服务器发送“设置块大小”命令
服务器接收到“播放”命令之后,就向客户端回应“设置块大小”命令,处理这个命令的函数是HandleChangeChunkSize
static voidHandleChangeChunkSize(RTMP *r, const RTMPPacket *packet){ if (packet->m_nBodySize >= 4) { r->m_inChunkSize = AMF_DecodeInt32(packet->m_body); RTMP_Log(RTMP_LOGDEBUG, "%s, received: chunk size change to %d", __FUNCTION__, r->m_inChunkSize); }}服务器发送“stream begin”命令
服务器发送完“设置块大小”命令之后,还会继续发送“stream begin”命令,这个命令由HandleCtrl函数处理
/*** 处理用户控制消息,主要用于控制流的各种状态*/static voidHandleCtrl(RTMP *r, const RTMPPacket *packet){ short nType = -1; unsigned int tmp; if (packet->m_body && packet->m_nBodySize >= 2) nType = AMF_DecodeInt16(packet->m_body); RTMP_Log(RTMP_LOGDEBUG, "%s, received ctrl, type: %d, len: %d", __FUNCTION__, nType, packet->m_nBodySize); /*RTMP_LogHex(packet.m_body, packet.m_nBodySize); */ if (packet->m_nBodySize >= 6) { switch (nType) { // 流开始 case 0: tmp = AMF_DecodeInt32(packet->m_body + 2); RTMP_Log(RTMP_LOGDEBUG, "%s, Stream Begin %d", __FUNCTION__, tmp); break; // 删除无关代码*** } } // 删除无关代码***}服务器发送“响应状态”信息
服务器发送完“stream begin”之后,根据播放是否成功发送“响应状态”信息给客户端,处理这个消息的函数是HandleInvoke
/*** 处理命令消息** 这些命令消息是使用AMF0格式进行编码的*/static intHandleInvoke(RTMP *r, const char *body, unsigned int nBodySize){ AMFObject obj; AVal method; double txn; int ret = 0, nRes; char pbuf[256], *pend = pbuf + sizeof(pbuf), *enc, **params = NULL; char *host = r->Link.hostname.av_len ? r->Link.hostname.av_val : ""; char *pageUrl = r->Link.pageUrl.av_len ? r->Link.pageUrl.av_val : ""; int param_count; AVal av_Command, av_Response; if (body[0] != 0x02) /* make sure it is a string method name we start with */ { RTMP_Log(RTMP_LOGWARNING, "%s, Sanity failed. no string method in invoke packet", __FUNCTION__); return 0; } nRes = AMF_Decode(&obj, body, nBodySize, FALSE); if (nRes < 0) { RTMP_Log(RTMP_LOGERROR, "%s, error decoding invoke packet", __FUNCTION__); return 0; } AMF_Dump(&obj); AMFProp_GetString(AMF_GetProp(&obj, NULL, 0), &method); txn = AMFProp_GetNumber(AMF_GetProp(&obj, NULL, 1)); RTMP_Log(RTMP_LOGDEBUG, "%s, server invoking <%s>", __FUNCTION__, method.av_val); if (AVMATCH(&method, &av__result)) { AVal methodInvoked = { 0 }; int i; for (i = 0; i < r->m_numCalls; i++) { if (r->m_methodCalls[i].num == (int)txn) { methodInvoked = r->m_methodCalls[i].name; AV_erase(r->m_methodCalls, &r->m_numCalls, i, FALSE); break; } } if (!methodInvoked.av_val) { RTMP_Log(RTMP_LOGDEBUG, "%s, received result id %f without matching request", __FUNCTION__, txn); goto leave; } RTMP_Log(RTMP_LOGDEBUG, "%s, received result for method call <%s>", __FUNCTION__, methodInvoked.av_val); // 设置播放成功 if (AVMATCH(&methodInvoked, &av_play) || AVMATCH(&methodInvoked, &av_publish)) { r->m_bPlaying = TRUE; } free(methodInvoked.av_val); } // 删除无关代码*** // 处理响应状态 else if (AVMATCH(&method, &av_onStatus)) { AMFObject obj2; AVal code, level; AMFProp_GetObject(AMF_GetProp(&obj, NULL, 3), &obj2); AMFProp_GetString(AMF_GetProp(&obj2, &av_code, -1), &code); AMFProp_GetString(AMF_GetProp(&obj2, &av_level, -1), &level); RTMP_Log(RTMP_LOGDEBUG, "%s, onStatus: %s", __FUNCTION__, code.av_val); if (AVMATCH(&code, &av_NetStream_Failed) || AVMATCH(&code, &av_NetStream_Play_Failed) || AVMATCH(&code, &av_NetStream_Play_StreamNotFound) || AVMATCH(&code, &av_NetConnection_Connect_InvalidApp)) { r->m_stream_id = -1; RTMP_Close(r); RTMP_Log(RTMP_LOGERROR, "Closing connection: %s", code.av_val); } // 判断播放是否成功 else if (AVMATCH(&code, &av_NetStream_Play_Start) || AVMATCH(&code, &av_NetStream_Play_PublishNotify)) { int i; r->m_bPlaying = TRUE; for (i = 0; i < r->m_numCalls; i++) { if (AVMATCH(&r->m_methodCalls[i].name, &av_play)) { AV_erase(r->m_methodCalls, &r->m_numCalls, i, TRUE); break; } } } else if (AVMATCH(&code, &av_NetStream_Publish_Start)) { int i; r->m_bPlaying = TRUE; for (i = 0; i < r->m_numCalls; i++) { if (AVMATCH(&r->m_methodCalls[i].name, &av_publish)) { AV_erase(r->m_methodCalls, &r->m_numCalls, i, TRUE); break; } } } /* Return 1 if this is a Play.Complete or Play.Stop */ else if (AVMATCH(&code, &av_NetStream_Play_Complete) || AVMATCH(&code, &av_NetStream_Play_Stop) || AVMATCH(&code, &av_NetStream_Play_UnpublishNotify)) { RTMP_Close(r); ret = 1; } else if (AVMATCH(&code, &av_NetStream_Seek_Notify)) { r->m_read.flags &= ~RTMP_READ_SEEKING; } else if (AVMATCH(&code, &av_NetStream_Pause_Notify)) { if (r->m_pausing == 1 || r->m_pausing == 2) { RTMP_SendPause(r, FALSE, r->m_pauseStamp); r->m_pausing = 3; } } } // 删除无关代码***leave: AMF_Reset(&obj); return ret;}处理完上面的步骤之后,播放就可以开始了!
新闻热点
疑难解答