下面分析一个chunk是怎么样被发送给服务器的
// rtmp消息块 typedef struct RTMPPacket { // chunk basic header(大部分情况是一个字节) uint8_t m_headerType; // Message type ID(1-7协议控制;8,9音视频;10以后为AMF编码消息) uint8_t m_packetType; // 是否含有Extend timeStamp字段 uint8_t m_hasAbsTimestamp; /* timestamp absolute or relative? */ // channel 即 stream id字段 int m_nChannel; // 时间戳 uint32_t m_nTimeStamp; /* timestamp */ // message stream id int32_t m_nInfoField2; /* last 4 bytes in a long header */ // chunk体的长度 uint32_t m_nBodySize; uint32_t m_nBytesRead; RTMPChunk *m_chunk; // 原始rtmp消息块 char *m_body; } RTMPPacket;发送一个chunk
发送数据的函数是RTMP_SendPacket,它的流程如下:
1、判断chunk头部是不是“大”的头部,如果不是,表示它需要使用前一个chunk的头部信息,那么重新设置headerType,并获取上一个chunk的时间戳,后面需要用它来计算相对时间戳 2、根据headerType来计算basic header + chunk Msg header的长度heade_size 3、计算相对时间戳 4、根据stream id(m_nChannel)重新调整heade_size 5、如果chunk的时间戳字段大于等于0xffffff,那么表示相对时间戳字段存在,因此重新调整heade_size 6、使用headerType来构造basic header(因为basic header的长度也是1),并把它填入缓冲区中 7、开始处理chunk Msg header中的时间戳、message length、message type id、message stream id字段,先把它们编码,然后填入缓冲区中
8、处理Extend timestamp字段,如果有必要,就把对它编码,然后填入缓冲区中 9、把缓冲区的数据发送给服务器 10、如果发送的chunk是一个远程函数调用,那么需要把这个chunk填入输出的channel数组中,因为等后面服务器发送数据来的时候,需要根据这些信息来判断函数调用是否成功(一般在HandleInvoke函数中)
intRTMP_SendPacket(RTMP *r, RTMPPacket *packet, int queue){ // 同一个流中的前一个packet const RTMPPacket *PRevPacket = r->m_vecChannelsOut[packet->m_nChannel]; uint32_t last = 0; int nSize; int hSize, cSize; char *header, *hptr, *hend, hbuf[RTMP_MAX_HEADER_SIZE], c; uint32_t t; char *buffer, *tbuf = NULL, *toff = NULL; int nChunkSize; int tlen; // 如果要发送的chunk的头部不是“大”的头部,那么需要使用前一个chunk的头部信息 if (prevPacket && packet->m_headerType != RTMP_PACKET_SIZE_LARGE) { // 重新设置headerType /* compress a bit by using the prev packet's attributes */ if (prevPacket->m_nBodySize == packet->m_nBodySize && prevPacket->m_packetType == packet->m_packetType && packet->m_headerType == RTMP_PACKET_SIZE_MEDIUM) packet->m_headerType = RTMP_PACKET_SIZE_SMALL; if (prevPacket->m_nTimeStamp == packet->m_nTimeStamp && packet->m_headerType == RTMP_PACKET_SIZE_SMALL) packet->m_headerType = RTMP_PACKET_SIZE_MINIMUM; // last表示时间戳 last = prevPacket->m_nTimeStamp; } if (packet->m_headerType > 3) /* sanity */ { RTMP_Log(RTMP_LOGERROR, "sanity failed!! trying to send header of type: 0x%02x.", (unsigned char)packet->m_headerType); return FALSE; } // basic header + chunk Msg header的长度 nSize = packetSize[packet->m_headerType]; hSize = nSize; cSize = 0; // 相对时间戳 t = packet->m_nTimeStamp - last; if (packet->m_body) { // 头部位于body的前面,header指向头部的其实地址 header = packet->m_body - nSize; // hend表示头部的末尾,也是body的其实地址 hend = packet->m_body; } else { header = hbuf + 6; hend = hbuf + sizeof(hbuf); } // 根据stream id的不同进行调整 if (packet->m_nChannel > 319) cSize = 2; else if (packet->m_nChannel > 63) cSize = 1; // 调整头部的长度 if (cSize) { header -= cSize; hSize += cSize; } // 如果有Extend TimeStamp字段,把它放在头部的最后 if (nSize > 1 && t >= 0xffffff) { header -= 4; hSize += 4; } hptr = header; // 计算basic header c = packet->m_headerType << 6; switch (cSize) { case 0: c |= packet->m_nChannel; break; case 1: break; case 2: c |= 1; break; } // 把basic header放入缓冲区中 *hptr++ = c; if (cSize) { int tmp = packet->m_nChannel - 64; *hptr++ = tmp & 0xff; if (cSize == 2) *hptr++ = tmp >> 8; } // 开始处理chunk Msg header字段 // 编码时间戳字段 if (nSize > 1) { hptr = AMF_EncodeInt24(hptr, hend, t > 0xffffff ? 0xffffff : t); } // 处理message length字段 if (nSize > 4) { hptr = AMF_EncodeInt24(hptr, hend, packet->m_nBodySize); // 处理message type id *hptr++ = packet->m_packetType; } // 处理message stream id if (nSize > 8) hptr += EncodeInt32LE(hptr, packet->m_nInfoField2); // 处理extern timestamp字段 if (nSize > 1 && t >= 0xffffff) hptr = AMF_EncodeInt32(hptr, hend, t); nSize = packet->m_nBodySize; buffer = packet->m_body; nChunkSize = r->m_outChunkSize; RTMP_Log(RTMP_LOGDEBUG2, "%s: fd=%d, size=%d", __FUNCTION__, r->m_sb.sb_socket, nSize); /* send all chunks in one HTTP request */ // 如果使用HTTP的方式发送 if (r->Link.protocol & RTMP_FEATURE_HTTP) { int chunks = (nSize + nChunkSize - 1) / nChunkSize; if (chunks > 1) { tlen = chunks * (cSize + 1) + nSize + hSize; tbuf = malloc(tlen); if (!tbuf) return FALSE; toff = tbuf; } } while (nSize + hSize) { int wrote; if (nSize < nChunkSize) nChunkSize = nSize; RTMP_LogHexString(RTMP_LOGDEBUG2, (uint8_t *)header, hSize); RTMP_LogHexString(RTMP_LOGDEBUG2, (uint8_t *)buffer, nChunkSize); if (tbuf) { memcpy(toff, header, nChunkSize + hSize); toff += nChunkSize + hSize; } else { // 发送数据 wrote = WriteN(r, header, nChunkSize + hSize); if (!wrote) return FALSE; } nSize -= nChunkSize; buffer += nChunkSize; hSize = 0; if (nSize > 0) { header = buffer - 1; hSize = 1; if (cSize) { header -= cSize; hSize += cSize; } *header = (0xc0 | c); if (cSize) { int tmp = packet->m_nChannel - 64; header[1] = tmp & 0xff; if (cSize == 2) header[2] = tmp >> 8; } } } if (tbuf) { int wrote = WriteN(r, tbuf, toff - tbuf); free(tbuf); tbuf = NULL; if (!wrote) return FALSE; } /* we invoked a remote method */ // 如果是调用一个远程的函数 if (packet->m_packetType == RTMP_PACKET_TYPE_INVOKE) { AVal method; char *ptr; ptr = packet->m_body + 1; // 对函数名编码 AMF_DecodeString(ptr, &method); RTMP_Log(RTMP_LOGDEBUG, "Invoking %s", method.av_val); /* keep it in call queue till result arrives */ // 把它放到调用队列中 if (queue) { int txn; ptr += 3 + method.av_len; txn = (int)AMF_DecodeNumber(ptr); AV_queue(&r->m_methodCalls, &r->m_numCalls, &method, txn); } } if (!r->m_vecChannelsOut[packet->m_nChannel]) r->m_vecChannelsOut[packet->m_nChannel] = malloc(sizeof(RTMPPacket)); // 把packet放进输出channel队列中 memcpy(r->m_vecChannelsOut[packet->m_nChannel], packet, sizeof(RTMPPacket)); return TRUE;}底层的写函数
WriteN
WriteN函数的功能是把指定数量的字节发送给服务器,直到所有的数据都发送完成之后才返回,否则一直阻塞 1、如果使用了HTTP,那么调用HTTP_Post函数,实际内部调用RTMPSockBuf_Send来发送数据 2、如果使用普通的套接字,那么调用RTMPSockBuf_Send函数来发送数据
static intWriteN(RTMP *r, const char *buffer, int n){ const char *ptr = buffer;#ifdef CRYPTO char *encrypted = 0; char buf[RTMP_BUFFER_CACHE_SIZE]; if (r->Link.rc4keyOut) { if (n > sizeof(buf)) encrypted = (char *)malloc(n); else encrypted = (char *)buf; ptr = encrypted; RC4_encrypt2(r->Link.rc4keyOut, n, buffer, ptr); }#endif if (r->Link.ConnectPacket) { char *ConnectPacket = malloc(r->Link.HandshakeResponse.av_len + n); memcpy(ConnectPacket, r->Link.HandshakeResponse.av_val, r->Link.HandshakeResponse.av_len); memcpy(ConnectPacket + r->Link.HandshakeResponse.av_len, ptr, n); ptr = ConnectPacket; n += r->Link.HandshakeResponse.av_len; r->Link.ConnectPacket = FALSE; } while (n > 0) { int nBytes; // 发送数据 if (r->Link.protocol & RTMP_FEATURE_HTTP) nBytes = HTTP_Post(r, RTMPT_SEND, ptr, n); else nBytes = RTMPSockBuf_Send(&r->m_sb, ptr, n); /*RTMP_Log(RTMP_LOGDEBUG, "%s: %d/n", __FUNCTION__, nBytes); */ if (nBytes < 0) { int sockerr = GetSockError(); RTMP_Log(RTMP_LOGERROR, "%s, RTMP send error %d (%d bytes)", __FUNCTION__, sockerr, n); if (sockerr == EINTR && !RTMP_ctrlC) continue; RTMP_Close(r); n = 1; break; } if (nBytes == 0) break; n -= nBytes; ptr += nBytes; }#ifdef CRYPTO if (encrypted && encrypted != buf) free(encrypted);#endif return n == 0;}RTMPSockBuf_Send
RTMPSockBuf_Send函数的作用是把数据发送给服务器,调用系统接口send来实现
intRTMPSockBuf_Send(RTMPSockBuf *sb, const char *buf, int len){ int rc;#ifdef _DEBUG fwrite(buf, 1, len, netstackdump);#endif#if defined(CRYPTO) && !defined(NO_SSL) if (sb->sb_ssl) { rc = TLS_write(sb->sb_ssl, buf, len); } else#endif { rc = send(sb->sb_socket, buf, len, 0); } return rc;}
新闻热点
疑难解答