首页 > 学院 > 开发设计 > 正文

RTMP学习(十三)rtmpdump源码阅读(7)发送数据

2019-11-06 08:35:25
字体:
来源:转载
供稿:网友

发送数据

    下面分析一个chunk是怎么样被发送给服务器的

chunk的定义

	// rtmp消息块	typedef struct RTMPPacket	{		// chunk basic header(大部分情况是一个字节)		uint8_t m_headerType;		// Message type ID(1-7协议控制;8,9音视频;10以后为AMF编码消息)		uint8_t m_packetType;		// 是否含有Extend timeStamp字段		uint8_t m_hasAbsTimestamp;	/* timestamp absolute or relative? */		// channel 即 stream id字段		int m_nChannel;		// 时间戳		uint32_t m_nTimeStamp;	/* timestamp */		// message stream id		int32_t m_nInfoField2;	/* last 4 bytes in a long header */		// chunk体的长度		uint32_t m_nBodySize;		uint32_t m_nBytesRead;		RTMPChunk *m_chunk; // 原始rtmp消息块		char *m_body;	} RTMPPacket;

发送一个chunk

    发送数据的函数是RTMP_SendPacket,它的流程如下:

    1、判断chunk头部是不是“大”的头部,如果不是,表示它需要使用前一个chunk的头部信息,那么重新设置headerType,并获取上一个chunk的时间戳,后面需要用它来计算相对时间戳    2、根据headerType来计算basic header + chunk Msg header的长度heade_size    3、计算相对时间戳    4、根据stream id(m_nChannel)重新调整heade_size    5、如果chunk的时间戳字段大于等于0xffffff,那么表示相对时间戳字段存在,因此重新调整heade_size    6、使用headerType来构造basic header(因为basic header的长度也是1),并把它填入缓冲区中    7、开始处理chunk Msg header中的时间戳、message length、message type id、message stream id字段,先把它们编码,然后填入缓冲区中

    8、处理Extend timestamp字段,如果有必要,就把对它编码,然后填入缓冲区中    9、把缓冲区的数据发送给服务器    10、如果发送的chunk是一个远程函数调用,那么需要把这个chunk填入输出的channel数组中,因为等后面服务器发送数据来的时候,需要根据这些信息来判断函数调用是否成功(一般在HandleInvoke函数中)

intRTMP_SendPacket(RTMP *r, RTMPPacket *packet, int queue){	// 同一个流中的前一个packet	const RTMPPacket *PRevPacket = r->m_vecChannelsOut[packet->m_nChannel];	uint32_t last = 0;	int nSize;	int hSize, cSize;	char *header, *hptr, *hend, hbuf[RTMP_MAX_HEADER_SIZE], c;	uint32_t t;	char *buffer, *tbuf = NULL, *toff = NULL;	int nChunkSize;	int tlen;	// 如果要发送的chunk的头部不是“大”的头部,那么需要使用前一个chunk的头部信息	if (prevPacket && packet->m_headerType != RTMP_PACKET_SIZE_LARGE)	{		// 重新设置headerType		/* compress a bit by using the prev packet's attributes */		if (prevPacket->m_nBodySize == packet->m_nBodySize			&& prevPacket->m_packetType == packet->m_packetType			&& packet->m_headerType == RTMP_PACKET_SIZE_MEDIUM)			packet->m_headerType = RTMP_PACKET_SIZE_SMALL;		if (prevPacket->m_nTimeStamp == packet->m_nTimeStamp			&& packet->m_headerType == RTMP_PACKET_SIZE_SMALL)			packet->m_headerType = RTMP_PACKET_SIZE_MINIMUM;					// last表示时间戳		last = prevPacket->m_nTimeStamp;	}	if (packet->m_headerType > 3)	/* sanity */	{		RTMP_Log(RTMP_LOGERROR, "sanity failed!! trying to send header of type: 0x%02x.",			(unsigned char)packet->m_headerType);		return FALSE;	}	// basic header + chunk Msg header的长度	nSize = packetSize[packet->m_headerType];	hSize = nSize; cSize = 0;		// 相对时间戳	t = packet->m_nTimeStamp - last;	if (packet->m_body)	{		// 头部位于body的前面,header指向头部的其实地址		header = packet->m_body - nSize;		// hend表示头部的末尾,也是body的其实地址		hend = packet->m_body;	}	else	{		header = hbuf + 6;		hend = hbuf + sizeof(hbuf);	}	// 根据stream id的不同进行调整	if (packet->m_nChannel > 319)		cSize = 2;	else if (packet->m_nChannel > 63)		cSize = 1;		// 调整头部的长度	if (cSize)	{		header -= cSize;		hSize += cSize;	}	// 如果有Extend TimeStamp字段,把它放在头部的最后	if (nSize > 1 && t >= 0xffffff)	{		header -= 4;		hSize += 4;	}	hptr = header;		// 计算basic header	c = packet->m_headerType << 6;	switch (cSize)	{	case 0:		c |= packet->m_nChannel;		break;	case 1:		break;	case 2:		c |= 1;		break;	}		// 把basic header放入缓冲区中	*hptr++ = c;	if (cSize)	{		int tmp = packet->m_nChannel - 64;		*hptr++ = tmp & 0xff;		if (cSize == 2)			*hptr++ = tmp >> 8;	}	// 开始处理chunk Msg header字段		// 编码时间戳字段	if (nSize > 1)	{		hptr = AMF_EncodeInt24(hptr, hend, t > 0xffffff ? 0xffffff : t);	}	// 处理message length字段	if (nSize > 4)	{		hptr = AMF_EncodeInt24(hptr, hend, packet->m_nBodySize);				// 处理message type id		*hptr++ = packet->m_packetType;	}	// 处理message stream id	if (nSize > 8)		hptr += EncodeInt32LE(hptr, packet->m_nInfoField2);	// 处理extern timestamp字段	if (nSize > 1 && t >= 0xffffff)		hptr = AMF_EncodeInt32(hptr, hend, t);	nSize = packet->m_nBodySize;	buffer = packet->m_body;	nChunkSize = r->m_outChunkSize;	RTMP_Log(RTMP_LOGDEBUG2, "%s: fd=%d, size=%d", __FUNCTION__, r->m_sb.sb_socket,		nSize);	/* send all chunks in one HTTP request */	// 如果使用HTTP的方式发送	if (r->Link.protocol & RTMP_FEATURE_HTTP)	{		int chunks = (nSize + nChunkSize - 1) / nChunkSize;		if (chunks > 1)		{			tlen = chunks * (cSize + 1) + nSize + hSize;			tbuf = malloc(tlen);			if (!tbuf)				return FALSE;			toff = tbuf;		}	}	while (nSize + hSize)	{		int wrote;		if (nSize < nChunkSize)			nChunkSize = nSize;		RTMP_LogHexString(RTMP_LOGDEBUG2, (uint8_t *)header, hSize);		RTMP_LogHexString(RTMP_LOGDEBUG2, (uint8_t *)buffer, nChunkSize);		if (tbuf)		{			memcpy(toff, header, nChunkSize + hSize);			toff += nChunkSize + hSize;		}		else		{			// 发送数据			wrote = WriteN(r, header, nChunkSize + hSize);			if (!wrote)				return FALSE;		}		nSize -= nChunkSize;		buffer += nChunkSize;		hSize = 0;		if (nSize > 0)		{			header = buffer - 1;			hSize = 1;			if (cSize)			{				header -= cSize;				hSize += cSize;			}			*header = (0xc0 | c);			if (cSize)			{				int tmp = packet->m_nChannel - 64;				header[1] = tmp & 0xff;				if (cSize == 2)					header[2] = tmp >> 8;			}		}	}	if (tbuf)	{		int wrote = WriteN(r, tbuf, toff - tbuf);		free(tbuf);		tbuf = NULL;		if (!wrote)			return FALSE;	}	/* we invoked a remote method */	// 如果是调用一个远程的函数	if (packet->m_packetType == RTMP_PACKET_TYPE_INVOKE)	{		AVal method;		char *ptr;		ptr = packet->m_body + 1;		// 对函数名编码		AMF_DecodeString(ptr, &method);		RTMP_Log(RTMP_LOGDEBUG, "Invoking %s", method.av_val);		/* keep it in call queue till result arrives */		// 把它放到调用队列中		if (queue) {			int txn;			ptr += 3 + method.av_len;			txn = (int)AMF_DecodeNumber(ptr);			AV_queue(&r->m_methodCalls, &r->m_numCalls, &method, txn);		}	}	if (!r->m_vecChannelsOut[packet->m_nChannel])		r->m_vecChannelsOut[packet->m_nChannel] = malloc(sizeof(RTMPPacket));		// 把packet放进输出channel队列中	memcpy(r->m_vecChannelsOut[packet->m_nChannel], packet, sizeof(RTMPPacket));	return TRUE;}

底层的写函数

WriteN

    WriteN函数的功能是把指定数量的字节发送给服务器,直到所有的数据都发送完成之后才返回,否则一直阻塞    1、如果使用了HTTP,那么调用HTTP_Post函数,实际内部调用RTMPSockBuf_Send来发送数据    2、如果使用普通的套接字,那么调用RTMPSockBuf_Send函数来发送数据

static intWriteN(RTMP *r, const char *buffer, int n){	const char *ptr = buffer;#ifdef CRYPTO	char *encrypted = 0;	char buf[RTMP_BUFFER_CACHE_SIZE];	if (r->Link.rc4keyOut)	{		if (n > sizeof(buf))			encrypted = (char *)malloc(n);		else			encrypted = (char *)buf;		ptr = encrypted;		RC4_encrypt2(r->Link.rc4keyOut, n, buffer, ptr);	}#endif	if (r->Link.ConnectPacket)	{		char *ConnectPacket = malloc(r->Link.HandshakeResponse.av_len + n);		memcpy(ConnectPacket, r->Link.HandshakeResponse.av_val, r->Link.HandshakeResponse.av_len);		memcpy(ConnectPacket + r->Link.HandshakeResponse.av_len, ptr, n);		ptr = ConnectPacket;		n += r->Link.HandshakeResponse.av_len;		r->Link.ConnectPacket = FALSE;	}	while (n > 0)	{		int nBytes;		// 发送数据		if (r->Link.protocol & RTMP_FEATURE_HTTP)			nBytes = HTTP_Post(r, RTMPT_SEND, ptr, n);		else			nBytes = RTMPSockBuf_Send(&r->m_sb, ptr, n);		/*RTMP_Log(RTMP_LOGDEBUG, "%s: %d/n", __FUNCTION__, nBytes); */		if (nBytes < 0)		{			int sockerr = GetSockError();			RTMP_Log(RTMP_LOGERROR, "%s, RTMP send error %d (%d bytes)", __FUNCTION__,				sockerr, n);			if (sockerr == EINTR && !RTMP_ctrlC)				continue;			RTMP_Close(r);			n = 1;			break;		}		if (nBytes == 0)			break;		n -= nBytes;		ptr += nBytes;	}#ifdef CRYPTO	if (encrypted && encrypted != buf)		free(encrypted);#endif	return n == 0;}

RTMPSockBuf_Send

    RTMPSockBuf_Send函数的作用是把数据发送给服务器,调用系统接口send来实现

intRTMPSockBuf_Send(RTMPSockBuf *sb, const char *buf, int len){	int rc;#ifdef _DEBUG	fwrite(buf, 1, len, netstackdump);#endif#if defined(CRYPTO) && !defined(NO_SSL)	if (sb->sb_ssl)	{		rc = TLS_write(sb->sb_ssl, buf, len);	}	else#endif	{		rc = send(sb->sb_socket, buf, len, 0);	}	return rc;}


发表评论 共有条评论
用户名: 密码:
验证码: 匿名发表