首页 > 学院 > 开发设计 > 正文

RTMP学习(十二)rtmpdump源码阅读(6)数据接收

2019-11-06 08:38:22
字体:
来源:转载
供稿:网友

数据接收

    这一篇很重要,下面将结合chunk的结构来分析,数据是怎么样从网络上读取的,以及如何解析它们。

chunk的结构

	// rtmp消息块	typedef struct RTMPPacket	{		// chunk basic header(大部分情况是一个字节)		uint8_t m_headerType;		// Message type ID(1-7协议控制;8,9音视频;10以后为AMF编码消息)		uint8_t m_packetType;		// 是否含有Extend timeStamp字段		uint8_t m_hasAbsTimestamp;	/* timestamp absolute or relative? */		// channel 即 stream id字段		int m_nChannel;		// 时间戳		uint32_t m_nTimeStamp;	/* timestamp */		// message stream id		int32_t m_nInfoField2;	/* last 4 bytes in a long header */		// chunk体的长度		uint32_t m_nBodySize;		uint32_t m_nBytesRead;		RTMPChunk *m_chunk; // 原始rtmp消息块		char *m_body;	} RTMPPacket;

其他更多信息 http://blog.csdn.net/nb_vol_1/article/details/58603868

读取一个chunk

    入口函数是RTMP_ReadPacket,它的运行流程如下:

    1、读取一个字节,这个字节代表chunk basic header(大部分情况下,basic header只占用一个字节)    2、解析basic header的fmt字段(m_headerType)和stream id字段(m_nChannel),其中fmt字段占2bit,stream id字段占6bit    3、计算调整stream id字段    4、根据fmt字段(m_headerType)计算chunk basic header+ chunk Msg header的长度(通过查表得到),下面是计算规则:                fmt=0,那么chunk Msg header长度是11                fmt=1,那么chunk Msg header长度是7                fmt=2,那么chunk Msg header长度是3                fmt=3,那么chunk Msg header长度是0    5、如果chunk basic header+ chunk Msg header的长度等于12字节,那么表示这是一个”大“的头部,所有的chunk信息都可以从网络上读取得,到并且存在Extend timestamp字段    6、如果chunk basic header+ chunk Msg header的长度小于12字节,那么表示它不是一个”大“的头部,有一些chunk信息不能从网络上读取到,这些不能读取到的信息从上一个chunk中得到。    7、读取chunk Msg header    8、根据chunk Msg header的长度,分别解析出时间戳、message length、message type id、message stream id等字段    9、如果时间戳字段等于0xffffff,那么表示存在Extend timestamp字段,从网上读取它    10、读取chunk body    11、其他的一些操作!

intRTMP_ReadPacket(RTMP *r, RTMPPacket *packet){	uint8_t hbuf[RTMP_MAX_HEADER_SIZE] = { 0 };	char *header = (char *)hbuf;	int nSize, hSize, nToRead, nChunk;	int didAlloc = FALSE;	RTMP_Log(RTMP_LOGDEBUG2, "%s: fd=%d", __FUNCTION__, r->m_sb.sb_socket);	// 先读取一个字节,这一个字节是chunk basic header	if (ReadN(r, (char *)hbuf, 1) == 0)	{		RTMP_Log(RTMP_LOGERROR, "%s, failed to read RTMP packet header", __FUNCTION__);		return FALSE;	}	// chunk basic header中的fmt字段,2bit	packet->m_headerType = (hbuf[0] & 0xc0) >> 6;		// chunk basic header中的stream id字段,6bit	packet->m_nChannel = (hbuf[0] & 0x3f);	header++;	if (packet->m_nChannel == 0)	{		if (ReadN(r, (char *)&hbuf[1], 1) != 1)		{			RTMP_Log(RTMP_LOGERROR, "%s, failed to read RTMP packet header 2nd byte",				__FUNCTION__);			return FALSE;		}		packet->m_nChannel = hbuf[1];		packet->m_nChannel += 64;		header++;	}	else if (packet->m_nChannel == 1)	{		int tmp;		if (ReadN(r, (char *)&hbuf[1], 2) != 2)		{			RTMP_Log(RTMP_LOGERROR, "%s, failed to read RTMP packet header 3nd byte",				__FUNCTION__);			return FALSE;		}		tmp = (hbuf[2] << 8) + hbuf[1];		packet->m_nChannel = tmp + 64;		RTMP_Log(RTMP_LOGDEBUG, "%s, m_nChannel: %0x", __FUNCTION__, packet->m_nChannel);		header += 2;	}	// 根据basic header的fmt字段计算basic header + chunk msg header的长度	nSize = packetSize[packet->m_headerType];	// RTMP_LARGE_HEADER_SIZE 是完整的basic header 加上 chunk Msg header的长度	// 如果是”大“的头部	if (nSize == RTMP_LARGE_HEADER_SIZE)	/* if we get a full header the timestamp is absolute */		packet->m_hasAbsTimestamp = TRUE;	// 如果不是”大“的头部,那么复用上一个chunk的头部信息	else if (nSize < RTMP_LARGE_HEADER_SIZE)	{				/* using values from the last message of this channel */		if (r->m_vecChannelsIn[packet->m_nChannel])			memcpy(packet, r->m_vecChannelsIn[packet->m_nChannel],			sizeof(RTMPPacket));	}	// 减去1,表示chunk msg header的长度	nSize--;	// 读取chunk msg header	if (nSize > 0 && ReadN(r, header, nSize) != nSize)	{		RTMP_Log(RTMP_LOGERROR, "%s, failed to read RTMP packet header. type: %x",			__FUNCTION__, (unsigned int)hbuf[0]);		return FALSE;	}	hSize = nSize + (header - (char *)hbuf);	// 如果chunk Msg header的长度大于3字节,表示存在Timestamp字段	if (nSize >= 3)	{		// 得到时间戳字段,占用3字节		packet->m_nTimeStamp = AMF_DecodeInt24(header);		/*RTMP_Log(RTMP_LOGDEBUG, "%s, reading RTMP packet chunk on channel %x, headersz %i, timestamp %i, abs timestamp %i", __FUNCTION__, packet.m_nChannel, nSize, packet.m_nTimeStamp, packet.m_hasAbsTimestamp); */		if (nSize >= 6)		{			// 得到message的长度			packet->m_nBodySize = AMF_DecodeInt24(header + 3);			packet->m_nBytesRead = 0;						// 清空旧的数据			RTMPPacket_Free(packet);			if (nSize > 6)			{				// 这里表示message type id				packet->m_packetType = header[6];				// message stream id字段				if (nSize == 11)					packet->m_nInfoField2 = DecodeInt32LE(header + 7);			}		}		// 如果时间戳字段等于0xffffff,那么表示存在扩展时间戳字段		if (packet->m_nTimeStamp == 0xffffff)		{			// 读取扩展时间戳字段			if (ReadN(r, header + nSize, 4) != 4)			{				RTMP_Log(RTMP_LOGERROR, "%s, failed to read extended timestamp",					__FUNCTION__);				return FALSE;			}			packet->m_nTimeStamp = AMF_DecodeInt32(header + nSize);			hSize += 4;		}	}	RTMP_LogHexString(RTMP_LOGDEBUG2, (uint8_t *)hbuf, hSize);	if (packet->m_nBodySize > 0 && packet->m_body == NULL)	{		if (!RTMPPacket_Alloc(packet, packet->m_nBodySize))		{			RTMP_Log(RTMP_LOGDEBUG, "%s, failed to allocate packet", __FUNCTION__);			return FALSE;		}		didAlloc = TRUE;		packet->m_headerType = (hbuf[0] & 0xc0) >> 6;	}	nToRead = packet->m_nBodySize - packet->m_nBytesRead;	nChunk = r->m_inChunkSize;	if (nToRead < nChunk)		nChunk = nToRead;	/* Does the caller want the raw chunk? */	if (packet->m_chunk)	{		packet->m_chunk->c_headerSize = hSize;		memcpy(packet->m_chunk->c_header, hbuf, hSize);		packet->m_chunk->c_chunk = packet->m_body + packet->m_nBytesRead;		packet->m_chunk->c_chunkSize = nChunk;	}	if (ReadN(r, packet->m_body + packet->m_nBytesRead, nChunk) != nChunk)	{		RTMP_Log(RTMP_LOGERROR, "%s, failed to read RTMP packet body. len: %u",			__FUNCTION__, packet->m_nBodySize);		return FALSE;	}	RTMP_LogHexString(RTMP_LOGDEBUG2, (uint8_t *)packet->m_body + packet->m_nBytesRead, nChunk);	packet->m_nBytesRead += nChunk;	/* keep the packet as ref for other packets on this channel */	if (!r->m_vecChannelsIn[packet->m_nChannel])		r->m_vecChannelsIn[packet->m_nChannel] = malloc(sizeof(RTMPPacket));	memcpy(r->m_vecChannelsIn[packet->m_nChannel], packet, sizeof(RTMPPacket));	if (RTMPPacket_IsReady(packet))	{		/* make packet's timestamp absolute */		if (!packet->m_hasAbsTimestamp)			packet->m_nTimeStamp += r->m_channelTimestamp[packet->m_nChannel];	/* timestamps seem to be always relative!! */		r->m_channelTimestamp[packet->m_nChannel] = packet->m_nTimeStamp;		/* reset the data from the stored packet. we keep the header since we may use it later if a new packet for this channel */		/* arrives and requests to re-use some info (small packet header) */		r->m_vecChannelsIn[packet->m_nChannel]->m_body = NULL;		r->m_vecChannelsIn[packet->m_nChannel]->m_nBytesRead = 0;		r->m_vecChannelsIn[packet->m_nChannel]->m_hasAbsTimestamp = FALSE;	/* can only be false if we reuse header */	}	else	{		packet->m_body = NULL;	/* so it won't be erased on free */	}	return TRUE;}

底层的读函数

ReadN

    ReadN的作用是读取指定数量的字节,全部读取完毕才返回,否则就一直阻塞    它的操作如下:        1、如果套接字使用了HTTP进行封装,那么先发送HTTP请求,然后调用RTMPSockBuf_Fill函数读取数据        2、如果套接字没有经过包装,那么直接调用RTMPSockBuf_Fill函数读取数据

static intReadN(RTMP *r, char *buffer, int n){	int nOriginalSize = n;	int avail;	char *ptr;	r->m_sb.sb_timedout = FALSE;#ifdef _DEBUG	memset(buffer, 0, n);#endif	ptr = buffer;	while (n > 0)	{		int nBytes = 0, nRead;				// 如果底层套接字使用了HTTP进行包装		if (r->Link.PRotocol & RTMP_FEATURE_HTTP)		{			while (!r->m_resplen)			{				if (r->m_sb.sb_size < 144)				{					// 先用HTTP向服务器进行请求:post					if (!r->m_unackd)						HTTP_Post(r, RTMPT_IDLE, "", 1);										// 读取数据					if (RTMPSockBuf_Fill(&r->m_sb) < 1)					{						if (!r->m_sb.sb_timedout)							RTMP_Close(r);						return 0;					}				}								// 读取HTTP的反馈				if (HTTP_read(r, 0) == -1)				{					RTMP_Log(RTMP_LOGDEBUG, "%s, No valid HTTP response found", __FUNCTION__);					RTMP_Close(r);					return 0;				}			}			if (r->m_resplen && (r->m_sb.sb_size < r->m_resplen))				if (RTMPSockBuf_Fill(&r->m_sb) < 0)					if (!r->m_sb.sb_timedout)						RTMP_Close(r);			avail = r->m_sb.sb_size;			if (avail > r->m_resplen)				avail = r->m_resplen;		}		// 如果底层使用原始的套接字(tcp)		else		{			avail = r->m_sb.sb_size;			if (avail == 0)			{				// 直接使用套接字读取数据				if (RTMPSockBuf_Fill(&r->m_sb) < 1)				{					if (!r->m_sb.sb_timedout)						RTMP_Close(r);					return 0;				}				avail = r->m_sb.sb_size;			}		}		nRead = ((n < avail) ? n : avail);		if (nRead > 0)		{			memcpy(ptr, r->m_sb.sb_start, nRead);			r->m_sb.sb_start += nRead;			r->m_sb.sb_size -= nRead;			nBytes = nRead;			r->m_nBytesIn += nRead;			if (r->m_bSendCounter && r->m_nBytesIn > (r->m_nBytesInSent + r->m_nClientBW / 10))				if (!SendBytesReceived(r))					return FALSE;		}		/*RTMP_Log(RTMP_LOGDEBUG, "%s: %d bytes/n", __FUNCTION__, nBytes); */#ifdef _DEBUG		fwrite(ptr, 1, nBytes, netstackdump_read);#endif		if (nBytes == 0)		{			RTMP_Log(RTMP_LOGDEBUG, "%s, RTMP socket closed by peer", __FUNCTION__);			/*goto again; */			RTMP_Close(r);			break;		}		if (r->Link.protocol & RTMP_FEATURE_HTTP)			r->m_resplen -= nBytes;#ifdef CRYPTO		if (r->Link.rc4keyIn)		{			RC4_encrypt(r->Link.rc4keyIn, nBytes, ptr);		}#endif		n -= nBytes;		ptr += nBytes;	}	return nOriginalSize - n;}

RTMPSockBuf_Fill

    RTMPSockBuf_Fill的作用是从网上读取数据,填充到套接字缓冲区中,只要读取到数据就可以返回,不必等到所有数据都读取完成,返回值是读取到的字节数

intRTMPSockBuf_Fill(RTMPSockBuf *sb){	int nBytes;	if (!sb->sb_size)		sb->sb_start = sb->sb_buf;	while (1)	{		// 计算还需要读取的字节数		nBytes = sizeof(sb->sb_buf) - sb->sb_size - (sb->sb_start - sb->sb_buf);#if defined(CRYPTO) && !defined(NO_SSL)		if (sb->sb_ssl)		{			nBytes = TLS_read(sb->sb_ssl, sb->sb_start + sb->sb_size, nBytes);		}		else#endif		{			// 读取数据			nBytes = recv(sb->sb_socket, sb->sb_start + sb->sb_size, nBytes, 0);		}				// 读取成功		if (nBytes != -1)		{			// 计算已经读取到的字节数			sb->sb_size += nBytes;		}		// 读取失败,出错!		else		{			int sockerr = GetSockError();			RTMP_Log(RTMP_LOGDEBUG, "%s, recv returned %d. GetSockError(): %d (%s)",				__FUNCTION__, nBytes, sockerr, strerror(sockerr));			if (sockerr == EINTR && !RTMP_ctrlC)				continue;			if (sockerr == EWOULDBLOCK || sockerr == EAGAIN)			{				sb->sb_timedout = TRUE;				nBytes = 0;			}		}		break;	}	return nBytes;}


发表评论 共有条评论
用户名: 密码:
验证码: 匿名发表