这一篇很重要,下面将结合chunk的结构来分析,数据是怎么样从网络上读取的,以及如何解析它们。
// rtmp消息块 typedef struct RTMPPacket { // chunk basic header(大部分情况是一个字节) uint8_t m_headerType; // Message type ID(1-7协议控制;8,9音视频;10以后为AMF编码消息) uint8_t m_packetType; // 是否含有Extend timeStamp字段 uint8_t m_hasAbsTimestamp; /* timestamp absolute or relative? */ // channel 即 stream id字段 int m_nChannel; // 时间戳 uint32_t m_nTimeStamp; /* timestamp */ // message stream id int32_t m_nInfoField2; /* last 4 bytes in a long header */ // chunk体的长度 uint32_t m_nBodySize; uint32_t m_nBytesRead; RTMPChunk *m_chunk; // 原始rtmp消息块 char *m_body; } RTMPPacket;其他更多信息 http://blog.csdn.net/nb_vol_1/article/details/58603868
读取一个chunk
入口函数是RTMP_ReadPacket,它的运行流程如下:
1、读取一个字节,这个字节代表chunk basic header(大部分情况下,basic header只占用一个字节) 2、解析basic header的fmt字段(m_headerType)和stream id字段(m_nChannel),其中fmt字段占2bit,stream id字段占6bit 3、计算调整stream id字段 4、根据fmt字段(m_headerType)计算chunk basic header+ chunk Msg header的长度(通过查表得到),下面是计算规则: fmt=0,那么chunk Msg header长度是11 fmt=1,那么chunk Msg header长度是7 fmt=2,那么chunk Msg header长度是3 fmt=3,那么chunk Msg header长度是0 5、如果chunk basic header+ chunk Msg header的长度等于12字节,那么表示这是一个”大“的头部,所有的chunk信息都可以从网络上读取得,到并且存在Extend timestamp字段 6、如果chunk basic header+ chunk Msg header的长度小于12字节,那么表示它不是一个”大“的头部,有一些chunk信息不能从网络上读取到,这些不能读取到的信息从上一个chunk中得到。 7、读取chunk Msg header 8、根据chunk Msg header的长度,分别解析出时间戳、message length、message type id、message stream id等字段 9、如果时间戳字段等于0xffffff,那么表示存在Extend timestamp字段,从网上读取它 10、读取chunk body 11、其他的一些操作!
intRTMP_ReadPacket(RTMP *r, RTMPPacket *packet){ uint8_t hbuf[RTMP_MAX_HEADER_SIZE] = { 0 }; char *header = (char *)hbuf; int nSize, hSize, nToRead, nChunk; int didAlloc = FALSE; RTMP_Log(RTMP_LOGDEBUG2, "%s: fd=%d", __FUNCTION__, r->m_sb.sb_socket); // 先读取一个字节,这一个字节是chunk basic header if (ReadN(r, (char *)hbuf, 1) == 0) { RTMP_Log(RTMP_LOGERROR, "%s, failed to read RTMP packet header", __FUNCTION__); return FALSE; } // chunk basic header中的fmt字段,2bit packet->m_headerType = (hbuf[0] & 0xc0) >> 6; // chunk basic header中的stream id字段,6bit packet->m_nChannel = (hbuf[0] & 0x3f); header++; if (packet->m_nChannel == 0) { if (ReadN(r, (char *)&hbuf[1], 1) != 1) { RTMP_Log(RTMP_LOGERROR, "%s, failed to read RTMP packet header 2nd byte", __FUNCTION__); return FALSE; } packet->m_nChannel = hbuf[1]; packet->m_nChannel += 64; header++; } else if (packet->m_nChannel == 1) { int tmp; if (ReadN(r, (char *)&hbuf[1], 2) != 2) { RTMP_Log(RTMP_LOGERROR, "%s, failed to read RTMP packet header 3nd byte", __FUNCTION__); return FALSE; } tmp = (hbuf[2] << 8) + hbuf[1]; packet->m_nChannel = tmp + 64; RTMP_Log(RTMP_LOGDEBUG, "%s, m_nChannel: %0x", __FUNCTION__, packet->m_nChannel); header += 2; } // 根据basic header的fmt字段计算basic header + chunk msg header的长度 nSize = packetSize[packet->m_headerType]; // RTMP_LARGE_HEADER_SIZE 是完整的basic header 加上 chunk Msg header的长度 // 如果是”大“的头部 if (nSize == RTMP_LARGE_HEADER_SIZE) /* if we get a full header the timestamp is absolute */ packet->m_hasAbsTimestamp = TRUE; // 如果不是”大“的头部,那么复用上一个chunk的头部信息 else if (nSize < RTMP_LARGE_HEADER_SIZE) { /* using values from the last message of this channel */ if (r->m_vecChannelsIn[packet->m_nChannel]) memcpy(packet, r->m_vecChannelsIn[packet->m_nChannel], sizeof(RTMPPacket)); } // 减去1,表示chunk msg header的长度 nSize--; // 读取chunk msg header if (nSize > 0 && ReadN(r, header, nSize) != nSize) { RTMP_Log(RTMP_LOGERROR, "%s, failed to read RTMP packet header. type: %x", __FUNCTION__, (unsigned int)hbuf[0]); return FALSE; } hSize = nSize + (header - (char *)hbuf); // 如果chunk Msg header的长度大于3字节,表示存在Timestamp字段 if (nSize >= 3) { // 得到时间戳字段,占用3字节 packet->m_nTimeStamp = AMF_DecodeInt24(header); /*RTMP_Log(RTMP_LOGDEBUG, "%s, reading RTMP packet chunk on channel %x, headersz %i, timestamp %i, abs timestamp %i", __FUNCTION__, packet.m_nChannel, nSize, packet.m_nTimeStamp, packet.m_hasAbsTimestamp); */ if (nSize >= 6) { // 得到message的长度 packet->m_nBodySize = AMF_DecodeInt24(header + 3); packet->m_nBytesRead = 0; // 清空旧的数据 RTMPPacket_Free(packet); if (nSize > 6) { // 这里表示message type id packet->m_packetType = header[6]; // message stream id字段 if (nSize == 11) packet->m_nInfoField2 = DecodeInt32LE(header + 7); } } // 如果时间戳字段等于0xffffff,那么表示存在扩展时间戳字段 if (packet->m_nTimeStamp == 0xffffff) { // 读取扩展时间戳字段 if (ReadN(r, header + nSize, 4) != 4) { RTMP_Log(RTMP_LOGERROR, "%s, failed to read extended timestamp", __FUNCTION__); return FALSE; } packet->m_nTimeStamp = AMF_DecodeInt32(header + nSize); hSize += 4; } } RTMP_LogHexString(RTMP_LOGDEBUG2, (uint8_t *)hbuf, hSize); if (packet->m_nBodySize > 0 && packet->m_body == NULL) { if (!RTMPPacket_Alloc(packet, packet->m_nBodySize)) { RTMP_Log(RTMP_LOGDEBUG, "%s, failed to allocate packet", __FUNCTION__); return FALSE; } didAlloc = TRUE; packet->m_headerType = (hbuf[0] & 0xc0) >> 6; } nToRead = packet->m_nBodySize - packet->m_nBytesRead; nChunk = r->m_inChunkSize; if (nToRead < nChunk) nChunk = nToRead; /* Does the caller want the raw chunk? */ if (packet->m_chunk) { packet->m_chunk->c_headerSize = hSize; memcpy(packet->m_chunk->c_header, hbuf, hSize); packet->m_chunk->c_chunk = packet->m_body + packet->m_nBytesRead; packet->m_chunk->c_chunkSize = nChunk; } if (ReadN(r, packet->m_body + packet->m_nBytesRead, nChunk) != nChunk) { RTMP_Log(RTMP_LOGERROR, "%s, failed to read RTMP packet body. len: %u", __FUNCTION__, packet->m_nBodySize); return FALSE; } RTMP_LogHexString(RTMP_LOGDEBUG2, (uint8_t *)packet->m_body + packet->m_nBytesRead, nChunk); packet->m_nBytesRead += nChunk; /* keep the packet as ref for other packets on this channel */ if (!r->m_vecChannelsIn[packet->m_nChannel]) r->m_vecChannelsIn[packet->m_nChannel] = malloc(sizeof(RTMPPacket)); memcpy(r->m_vecChannelsIn[packet->m_nChannel], packet, sizeof(RTMPPacket)); if (RTMPPacket_IsReady(packet)) { /* make packet's timestamp absolute */ if (!packet->m_hasAbsTimestamp) packet->m_nTimeStamp += r->m_channelTimestamp[packet->m_nChannel]; /* timestamps seem to be always relative!! */ r->m_channelTimestamp[packet->m_nChannel] = packet->m_nTimeStamp; /* reset the data from the stored packet. we keep the header since we may use it later if a new packet for this channel */ /* arrives and requests to re-use some info (small packet header) */ r->m_vecChannelsIn[packet->m_nChannel]->m_body = NULL; r->m_vecChannelsIn[packet->m_nChannel]->m_nBytesRead = 0; r->m_vecChannelsIn[packet->m_nChannel]->m_hasAbsTimestamp = FALSE; /* can only be false if we reuse header */ } else { packet->m_body = NULL; /* so it won't be erased on free */ } return TRUE;}底层的读函数
ReadN
ReadN的作用是读取指定数量的字节,全部读取完毕才返回,否则就一直阻塞 它的操作如下: 1、如果套接字使用了HTTP进行封装,那么先发送HTTP请求,然后调用RTMPSockBuf_Fill函数读取数据 2、如果套接字没有经过包装,那么直接调用RTMPSockBuf_Fill函数读取数据
static intReadN(RTMP *r, char *buffer, int n){ int nOriginalSize = n; int avail; char *ptr; r->m_sb.sb_timedout = FALSE;#ifdef _DEBUG memset(buffer, 0, n);#endif ptr = buffer; while (n > 0) { int nBytes = 0, nRead; // 如果底层套接字使用了HTTP进行包装 if (r->Link.PRotocol & RTMP_FEATURE_HTTP) { while (!r->m_resplen) { if (r->m_sb.sb_size < 144) { // 先用HTTP向服务器进行请求:post if (!r->m_unackd) HTTP_Post(r, RTMPT_IDLE, "", 1); // 读取数据 if (RTMPSockBuf_Fill(&r->m_sb) < 1) { if (!r->m_sb.sb_timedout) RTMP_Close(r); return 0; } } // 读取HTTP的反馈 if (HTTP_read(r, 0) == -1) { RTMP_Log(RTMP_LOGDEBUG, "%s, No valid HTTP response found", __FUNCTION__); RTMP_Close(r); return 0; } } if (r->m_resplen && (r->m_sb.sb_size < r->m_resplen)) if (RTMPSockBuf_Fill(&r->m_sb) < 0) if (!r->m_sb.sb_timedout) RTMP_Close(r); avail = r->m_sb.sb_size; if (avail > r->m_resplen) avail = r->m_resplen; } // 如果底层使用原始的套接字(tcp) else { avail = r->m_sb.sb_size; if (avail == 0) { // 直接使用套接字读取数据 if (RTMPSockBuf_Fill(&r->m_sb) < 1) { if (!r->m_sb.sb_timedout) RTMP_Close(r); return 0; } avail = r->m_sb.sb_size; } } nRead = ((n < avail) ? n : avail); if (nRead > 0) { memcpy(ptr, r->m_sb.sb_start, nRead); r->m_sb.sb_start += nRead; r->m_sb.sb_size -= nRead; nBytes = nRead; r->m_nBytesIn += nRead; if (r->m_bSendCounter && r->m_nBytesIn > (r->m_nBytesInSent + r->m_nClientBW / 10)) if (!SendBytesReceived(r)) return FALSE; } /*RTMP_Log(RTMP_LOGDEBUG, "%s: %d bytes/n", __FUNCTION__, nBytes); */#ifdef _DEBUG fwrite(ptr, 1, nBytes, netstackdump_read);#endif if (nBytes == 0) { RTMP_Log(RTMP_LOGDEBUG, "%s, RTMP socket closed by peer", __FUNCTION__); /*goto again; */ RTMP_Close(r); break; } if (r->Link.protocol & RTMP_FEATURE_HTTP) r->m_resplen -= nBytes;#ifdef CRYPTO if (r->Link.rc4keyIn) { RC4_encrypt(r->Link.rc4keyIn, nBytes, ptr); }#endif n -= nBytes; ptr += nBytes; } return nOriginalSize - n;}RTMPSockBuf_Fill
RTMPSockBuf_Fill的作用是从网上读取数据,填充到套接字缓冲区中,只要读取到数据就可以返回,不必等到所有数据都读取完成,返回值是读取到的字节数
intRTMPSockBuf_Fill(RTMPSockBuf *sb){ int nBytes; if (!sb->sb_size) sb->sb_start = sb->sb_buf; while (1) { // 计算还需要读取的字节数 nBytes = sizeof(sb->sb_buf) - sb->sb_size - (sb->sb_start - sb->sb_buf);#if defined(CRYPTO) && !defined(NO_SSL) if (sb->sb_ssl) { nBytes = TLS_read(sb->sb_ssl, sb->sb_start + sb->sb_size, nBytes); } else#endif { // 读取数据 nBytes = recv(sb->sb_socket, sb->sb_start + sb->sb_size, nBytes, 0); } // 读取成功 if (nBytes != -1) { // 计算已经读取到的字节数 sb->sb_size += nBytes; } // 读取失败,出错! else { int sockerr = GetSockError(); RTMP_Log(RTMP_LOGDEBUG, "%s, recv returned %d. GetSockError(): %d (%s)", __FUNCTION__, nBytes, sockerr, strerror(sockerr)); if (sockerr == EINTR && !RTMP_ctrlC) continue; if (sockerr == EWOULDBLOCK || sockerr == EAGAIN) { sb->sb_timedout = TRUE; nBytes = 0; } } break; } return nBytes;}
新闻热点
疑难解答