首先要知道创建流的时机是在创建完网络连接之后,那么什么时候创建完网络连接呢?答案是在函数HandleInvoke中。因此可以推断,创建流的具体操作也是在这个函数里面!
建立流连接的函数是RTMP_ConnectStream,它的功能是在播放开始之前不断的读取数据包,然后分析数据包的内容,如果有必要的的话就进行解析和处理。
intRTMP_ConnectStream(RTMP *r, int seekTime){ RTMPPacket packet = { 0 }; /* seekTime was already set by SetupStream / SetupURL. * This is only needed by ReconnectStream. */ if (seekTime > 0) r->Link.seekTime = seekTime; r->m_mediaChannel = 0; // 还没开始播放 而且 套接字连接已经建立 而且 读取数据成功 while (!r->m_bPlaying && RTMP_IsConnected(r) && RTMP_ReadPacket(r, &packet)) { if (RTMPPacket_IsReady(&packet)) { if (!packet.m_nBodySize) continue; if ((packet.m_packetType == RTMP_PACKET_TYPE_AUDIO) || (packet.m_packetType == RTMP_PACKET_TYPE_VIDEO) || (packet.m_packetType == RTMP_PACKET_TYPE_INFO)) { RTMP_Log(RTMP_LOGWARNING, "Received FLV packet before play()! Ignoring."); RTMPPacket_Free(&packet); continue; } // 处理数据包 RTMP_ClientPacket(r, &packet); RTMPPacket_Free(&packet); } } return r->m_bPlaying;}剩下的就不多说了,RTMP_ConnectStream调用RTMP_ClientPacket来处理服务器发送来的数据包,根据类型的不同调用不同的函数。
/*** 解析服务器发送来的packet*/intRTMP_ClientPacket(RTMP *r, RTMPPacket *packet){ int bHasMediaPacket = 0; switch (packet->m_packetType) { // 设置块大小 case RTMP_PACKET_TYPE_CHUNK_SIZE: /* chunk size */ HandleChangeChunkSize(r, packet); break; // 报告 case RTMP_PACKET_TYPE_BYTES_READ_REPORT: /* bytes read report */ RTMP_Log(RTMP_LOGDEBUG, "%s, received: bytes read report", __FUNCTION__); break; // 用户的控制命令 case RTMP_PACKET_TYPE_CONTROL: /* ctrl */ HandleCtrl(r, packet); break; // 确认窗口大小 Window Acknowledgement Size case RTMP_PACKET_TYPE_SERVER_BW: /* server bw */ HandleServerBW(r, packet); break; // 设置带宽 case RTMP_PACKET_TYPE_CLIENT_BW: /* client bw */ HandleClientBW(r, packet); break; // 音频数据 case RTMP_PACKET_TYPE_AUDIO: /* audio data */ /*RTMP_Log(RTMP_LOGDEBUG, "%s, received: audio %lu bytes", __FUNCTION__, packet.m_nBodySize); */ HandleAudio(r, packet); bHasMediaPacket = 1; if (!r->m_mediaChannel) r->m_mediaChannel = packet->m_nChannel; if (!r->m_pausing) r->m_mediaStamp = packet->m_nTimeStamp; break; // 视频数据 case RTMP_PACKET_TYPE_VIDEO: /* video data */ /*RTMP_Log(RTMP_LOGDEBUG, "%s, received: video %lu bytes", __FUNCTION__, packet.m_nBodySize); */ HandleVideo(r, packet); bHasMediaPacket = 1; if (!r->m_mediaChannel) r->m_mediaChannel = packet->m_nChannel; if (!r->m_pausing) r->m_mediaStamp = packet->m_nTimeStamp; break; // flex 流发送(AMF3编码) case RTMP_PACKET_TYPE_FLEX_STREAM_SEND: /* flex stream send */ RTMP_Log(RTMP_LOGDEBUG, "%s, flex stream send, size %u bytes, not supported, ignoring", __FUNCTION__, packet->m_nBodySize); break; // flex共享对象(AMF3编码) case RTMP_PACKET_TYPE_FLEX_SHARED_OBJECT: /* flex shared object */ RTMP_Log(RTMP_LOGDEBUG, "%s, flex shared object, size %u bytes, not supported, ignoring", __FUNCTION__, packet->m_nBodySize); break; // flex消息(AMF3编码) case RTMP_PACKET_TYPE_FLEX_MESSAGE: /* flex message */ { RTMP_Log(RTMP_LOGDEBUG, "%s, flex message, size %u bytes, not fully supported", __FUNCTION__, packet->m_nBodySize); /*RTMP_LogHex(packet.m_body, packet.m_nBodySize); */ /* some DEBUG code */#if 0 RTMP_LIB_AMFObject obj; int nRes = obj.Decode(packet.m_body+1, packet.m_nBodySize-1); if(nRes < 0) { RTMP_Log(RTMP_LOGERROR, "%s, error decoding AMF3 packet", __FUNCTION__); /*return; */ } obj.Dump();#endif if (HandleInvoke(r, packet->m_body + 1, packet->m_nBodySize - 1) == 1) bHasMediaPacket = 2; break; } // info数据(AMF0编码) case RTMP_PACKET_TYPE_INFO: /* metadata (notify) */ RTMP_Log(RTMP_LOGDEBUG, "%s, received: notify %u bytes", __FUNCTION__, packet->m_nBodySize); // 处理元数据 if (HandleMetadata(r, packet->m_body, packet->m_nBodySize)) bHasMediaPacket = 1; break; // 共享对象 case RTMP_PACKET_TYPE_SHARED_OBJECT: RTMP_Log(RTMP_LOGDEBUG, "%s, shared object, not supported, ignoring", __FUNCTION__); break; // 命令消息 case RTMP_PACKET_TYPE_INVOKE: /* invoke */ RTMP_Log(RTMP_LOGDEBUG, "%s, received: invoke %u bytes", __FUNCTION__, packet->m_nBodySize); /*RTMP_LogHex(packet.m_body, packet.m_nBodySize); */ // 处理命令 if (HandleInvoke(r, packet->m_body, packet->m_nBodySize) == 1) bHasMediaPacket = 2; break; // Flash视频 case RTMP_PACKET_TYPE_FLASH_VIDEO: { /* go through FLV packets and handle metadata packets */ unsigned int pos = 0; uint32_t nTimeStamp = packet->m_nTimeStamp; while (pos + 11 < packet->m_nBodySize) { uint32_t dataSize = AMF_DecodeInt24(packet->m_body + pos + 1); /* size without header (11) and PRevTagSize (4) */ if (pos + 11 + dataSize + 4 > packet->m_nBodySize) { RTMP_Log(RTMP_LOGWARNING, "Stream corrupt?!"); break; } if (packet->m_body[pos] == 0x12) { HandleMetadata(r, packet->m_body + pos + 11, dataSize); } else if (packet->m_body[pos] == 8 || packet->m_body[pos] == 9) { nTimeStamp = AMF_DecodeInt24(packet->m_body + pos + 4); nTimeStamp |= (packet->m_body[pos + 7] << 24); } pos += (11 + dataSize + 4); } if (!r->m_pausing) r->m_mediaStamp = nTimeStamp; /* FLV tag(s) */ /*RTMP_Log(RTMP_LOGDEBUG, "%s, received: FLV tag(s) %lu bytes", __FUNCTION__, packet.m_nBodySize); */ bHasMediaPacket = 1; break; } default: RTMP_Log(RTMP_LOGDEBUG, "%s, unknown packet type received: 0x%02x", __FUNCTION__, packet->m_packetType);#ifdef _DEBUG RTMP_LogHex(RTMP_LOGDEBUG, packet->m_body, packet->m_nBodySize);#endif } return bHasMediaPacket;}特别说一下:HandleInvoke和HandleCtrl的区别:
1、HandleInvoke处理命令消息,这些命令消息是使用AMF0格式进行编码的
2、HandleCtrl处理用户控制消息,主要用于控制流的各种状态
在这些函数中我们需要关注的是HandleInvoke。
创建流
处理命令消息
终于快说道重点了,但是在说重点之前还是要看一下,创建流的时机,在函数HandleInvoke中,我们可以看到,处理完成“connect”命令的反馈之后,调用了RTMP_SendCreateStream函数,这个函数的作用就是创建流!
/*** 处理命令消息** 这些命令消息是使用AMF0格式进行编码的** 下面的代码是精简过的*/static intHandleInvoke(RTMP *r, const char *body, unsigned int nBodySize){ AMFObject obj; AVal method; double txn; int ret = 0, nRes; char pbuf[256], *pend = pbuf + sizeof(pbuf), *enc, **params = NULL; char *host = r->Link.hostname.av_len ? r->Link.hostname.av_val : ""; char *pageUrl = r->Link.pageUrl.av_len ? r->Link.pageUrl.av_val : ""; int param_count; AVal av_Command, av_Response; if (body[0] != 0x02) /* make sure it is a string method name we start with */ { RTMP_Log(RTMP_LOGWARNING, "%s, Sanity failed. no string method in invoke packet", __FUNCTION__); return 0; } nRes = AMF_Decode(&obj, body, nBodySize, FALSE); if (nRes < 0) { RTMP_Log(RTMP_LOGERROR, "%s, error decoding invoke packet", __FUNCTION__); return 0; } AMF_Dump(&obj); AMFProp_GetString(AMF_GetProp(&obj, NULL, 0), &method); txn = AMFProp_GetNumber(AMF_GetProp(&obj, NULL, 1)); RTMP_Log(RTMP_LOGDEBUG, "%s, server invoking <%s>", __FUNCTION__, method.av_val); // 处理”result“ if (AVMATCH(&method, &av__result)) { AVal methodInvoked = { 0 }; int i; for (i = 0; i < r->m_numCalls; i++) { if (r->m_methodCalls[i].num == (int)txn) { methodInvoked = r->m_methodCalls[i].name; AV_erase(r->m_methodCalls, &r->m_numCalls, i, FALSE); break; } } if (!methodInvoked.av_val) { RTMP_Log(RTMP_LOGDEBUG, "%s, received result id %f without matching request", __FUNCTION__, txn); goto leave; } RTMP_Log(RTMP_LOGDEBUG, "%s, received result for method call <%s>", __FUNCTION__, methodInvoked.av_val); // ”connect“命令完成 if (AVMATCH(&methodInvoked, &av_connect)) { if (r->Link.token.av_len) { AMFObjectProperty p; if (RTMP_FindFirstMatchingProperty(&obj, &av_secureToken, &p)) { DecodeTEA(&r->Link.token, &p.p_vu.p_aval); SendSecureTokenResponse(r, &p.p_vu.p_aval); } } if (r->Link.protocol & RTMP_FEATURE_WRITE) { SendReleaseStream(r); SendFCPublish(r); } else { // 向服务器发送”确认窗口大小“ RTMP_SendServerBW(r); RTMP_SendCtrl(r, 3, 0, 300); } // 删除无关代码*** // 创建流 RTMP_SendCreateStream(r); } // 删除无关代码*** // ”创建流“命令成功 else if (AVMATCH(&methodInvoked, &av_createStream)) { r->m_stream_id = (int)AMFProp_GetNumber(AMF_GetProp(&obj, NULL, 3)); // 删除无关代码*** if (r->Link.protocol & RTMP_FEATURE_WRITE) { SendPublish(r); } else { if (r->Link.lFlags & RTMP_LF_PLST) SendPlaylist(r); // 发送播放命令 SendPlay(r); RTMP_SendCtrl(r, 3, r->m_stream_id, r->m_nBufferMS); } } else if (AVMATCH(&methodInvoked, &av_play) || AVMATCH(&methodInvoked, &av_publish)) { r->m_bPlaying = TRUE; } free(methodInvoked.av_val); } else if (AVMATCH(&method, &av_onBWDone)) { if (!r->m_nBWCheckCounter) SendCheckBW(r); } // 删除其他代码*** // *** // 删除其他代码***leave: AMF_Reset(&obj); return ret;}创建流的函数
// 发送”创建流“命令intRTMP_SendCreateStream(RTMP *r){ RTMPPacket packet; char pbuf[256], *pend = pbuf + sizeof(pbuf); char *enc; packet.m_nChannel = 0x03; /* control channel (invoke) */ packet.m_headerType = RTMP_PACKET_SIZE_MEDIUM; packet.m_packetType = RTMP_PACKET_TYPE_INVOKE; packet.m_nTimeStamp = 0; packet.m_nInfoField2 = 0; packet.m_hasAbsTimestamp = 0; packet.m_body = pbuf + RTMP_MAX_HEADER_SIZE; enc = packet.m_body; // 创建流的命令 enc = AMF_EncodeString(enc, pend, &av_createStream); enc = AMF_EncodeNumber(enc, pend, ++r->m_numInvokes); *enc++ = AMF_NULL; /* NULL */ packet.m_nBodySize = enc - packet.m_body; // 发送数据包 return RTMP_SendPacket(r, &packet, TRUE);}
新闻热点
疑难解答