首页 > 学院 > 开发设计 > 正文

RTMP学习(十)rtmpdump源码阅读(5)创建流

2019-11-06 08:42:35
字体:
来源:转载
供稿:网友

创建流连接

    首先要知道创建流的时机是在创建完网络连接之后,那么什么时候创建完网络连接呢?答案是在函数HandleInvoke中。因此可以推断,创建流的具体操作也是在这个函数里面!

入口函数

    建立流连接的函数是RTMP_ConnectStream,它的功能是在播放开始之前不断地读取数据包,然后分析数据包的内容,如果有必要的话就进行解析和处理。

/**
 * Read and process packets until playback has started.
 *
 * Keeps reading while the stream is not yet playing, the connection is still
 * up, and reading a packet succeeds. Every complete, non-empty packet is
 * handed to RTMP_ClientPacket(), except audio/video/info packets, which are
 * logged and dropped because play has not started yet.
 *
 * @param r         session state
 * @param seekTime  when > 0, stored into r->Link.seekTime before reading
 * @return          r->m_bPlaying as it stands when the loop ends
 */
int
RTMP_ConnectStream(RTMP *r, int seekTime)
{
	RTMPPacket pkt = { 0 };

	/* seekTime was already set by SetupStream / SetupURL.
	 * This is only needed by ReconnectStream.
	 */
	if (seekTime > 0)
		r->Link.seekTime = seekTime;

	r->m_mediaChannel = 0;

	/* not playing yet, still connected, and the read succeeded */
	while (!r->m_bPlaying && RTMP_IsConnected(r) && RTMP_ReadPacket(r, &pkt))
	{
		/* Packet still incomplete: go back and read more. */
		if (!RTMPPacket_IsReady(&pkt))
			continue;

		/* Nothing to process in an empty packet. */
		if (!pkt.m_nBodySize)
			continue;

		switch (pkt.m_packetType)
		{
		case RTMP_PACKET_TYPE_AUDIO:
		case RTMP_PACKET_TYPE_VIDEO:
		case RTMP_PACKET_TYPE_INFO:
			RTMP_Log(RTMP_LOGWARNING, "Received FLV packet before play()! Ignoring.");
			break;
		default:
			/* everything else goes through the regular dispatcher */
			RTMP_ClientPacket(r, &pkt);
			break;
		}

		RTMPPacket_Free(&pkt);
	}

	return r->m_bPlaying;
}

    剩下的就不多说了,RTMP_ConnectStream调用RTMP_ClientPacket来处理服务器发送来的数据包,根据类型的不同调用不同的函数。

/**
 * Dispatch one packet received from the server to the matching handler.
 *
 * @param r       session state; m_mediaChannel and m_mediaStamp are updated
 *                for media packets
 * @param packet  the packet to process; it is not freed here
 * @return        0 when the packet carried no media,
 *                1 for audio, video and FLV-tag packets, and for info packets
 *                  when HandleMetadata() returns non-zero,
 *                2 when HandleInvoke() returned 1
 */
int
RTMP_ClientPacket(RTMP *r, RTMPPacket *packet)
{
	int bHasMediaPacket = 0;

	switch (packet->m_packetType)
	{
	case RTMP_PACKET_TYPE_CHUNK_SIZE:
		/* set chunk size */
		HandleChangeChunkSize(r, packet);
		break;

	case RTMP_PACKET_TYPE_BYTES_READ_REPORT:
		/* bytes read report */
		RTMP_Log(RTMP_LOGDEBUG, "%s, received: bytes read report", __FUNCTION__);
		break;

	case RTMP_PACKET_TYPE_CONTROL:
		/* user control message */
		HandleCtrl(r, packet);
		break;

	case RTMP_PACKET_TYPE_SERVER_BW:
		/* server bw (window acknowledgement size) */
		HandleServerBW(r, packet);
		break;

	case RTMP_PACKET_TYPE_CLIENT_BW:
		/* client bw (set bandwidth) */
		HandleClientBW(r, packet);
		break;

	case RTMP_PACKET_TYPE_AUDIO:
		/* audio data */
		HandleAudio(r, packet);
		bHasMediaPacket = 1;
		/* remember the channel of the first media packet seen */
		if (!r->m_mediaChannel)
			r->m_mediaChannel = packet->m_nChannel;
		if (!r->m_pausing)
			r->m_mediaStamp = packet->m_nTimeStamp;
		break;

	case RTMP_PACKET_TYPE_VIDEO:
		/* video data */
		HandleVideo(r, packet);
		bHasMediaPacket = 1;
		if (!r->m_mediaChannel)
			r->m_mediaChannel = packet->m_nChannel;
		if (!r->m_pausing)
			r->m_mediaStamp = packet->m_nTimeStamp;
		break;

	case RTMP_PACKET_TYPE_FLEX_STREAM_SEND:
		/* flex stream send (AMF3 encoded) */
		RTMP_Log(RTMP_LOGDEBUG,
			"%s, flex stream send, size %u bytes, not supported, ignoring",
			__FUNCTION__, packet->m_nBodySize);
		break;

	case RTMP_PACKET_TYPE_FLEX_SHARED_OBJECT:
		/* flex shared object (AMF3 encoded) */
		RTMP_Log(RTMP_LOGDEBUG,
			"%s, flex shared object, size %u bytes, not supported, ignoring",
			__FUNCTION__, packet->m_nBodySize);
		break;

	case RTMP_PACKET_TYPE_FLEX_MESSAGE:
		/* flex message (AMF3 encoded) */
		RTMP_Log(RTMP_LOGDEBUG,
			"%s, flex message, size %u bytes, not fully supported",
			__FUNCTION__, packet->m_nBodySize);

		/* The handler receives the body minus its first byte. With an
		 * empty body the length below would wrap around to a huge
		 * unsigned value, so there is nothing valid to hand over.
		 */
		if (packet->m_nBodySize == 0)
			break;

		if (HandleInvoke(r, packet->m_body + 1, packet->m_nBodySize - 1) == 1)
			bHasMediaPacket = 2;
		break;

	case RTMP_PACKET_TYPE_INFO:
		/* metadata (notify), AMF0 encoded */
		RTMP_Log(RTMP_LOGDEBUG, "%s, received: notify %u bytes", __FUNCTION__,
			packet->m_nBodySize);
		if (HandleMetadata(r, packet->m_body, packet->m_nBodySize))
			bHasMediaPacket = 1;
		break;

	case RTMP_PACKET_TYPE_SHARED_OBJECT:
		/* shared object */
		RTMP_Log(RTMP_LOGDEBUG, "%s, shared object, not supported, ignoring",
			__FUNCTION__);
		break;

	case RTMP_PACKET_TYPE_INVOKE:
		/* invoke (command message) */
		RTMP_Log(RTMP_LOGDEBUG, "%s, received: invoke %u bytes", __FUNCTION__,
			packet->m_nBodySize);
		if (HandleInvoke(r, packet->m_body, packet->m_nBodySize) == 1)
			bHasMediaPacket = 2;
		break;

	case RTMP_PACKET_TYPE_FLASH_VIDEO:
	{
		/* go through FLV packets and handle metadata packets */
		unsigned int pos = 0;
		uint32_t nTimeStamp = packet->m_nTimeStamp;

		while (pos + 11 < packet->m_nBodySize)
		{
			/* size without header (11) and PrevTagSize (4) */
			uint32_t dataSize = AMF_DecodeInt24(packet->m_body + pos + 1);

			if (pos + 11 + dataSize + 4 > packet->m_nBodySize)
			{
				RTMP_Log(RTMP_LOGWARNING, "Stream corrupt?!");
				break;
			}
			if (packet->m_body[pos] == 0x12)
			{
				HandleMetadata(r, packet->m_body + pos + 11, dataSize);
			}
			else if (packet->m_body[pos] == 8 || packet->m_body[pos] == 9)
			{
				nTimeStamp = AMF_DecodeInt24(packet->m_body + pos + 4);
				/* Byte 7 supplies the upper 8 bits of the timestamp.
				 * m_body is a char buffer, so widen through unsigned
				 * char first: shifting a negative int left is undefined.
				 */
				nTimeStamp |= (uint32_t)(unsigned char)packet->m_body[pos + 7] << 24;
			}
			pos += (11 + dataSize + 4);
		}
		if (!r->m_pausing)
			r->m_mediaStamp = nTimeStamp;

		/* FLV tag(s) */
		bHasMediaPacket = 1;
		break;
	}

	default:
		RTMP_Log(RTMP_LOGDEBUG, "%s, unknown packet type received: 0x%02x", __FUNCTION__,
			packet->m_packetType);
#ifdef _DEBUG
		RTMP_LogHex(RTMP_LOGDEBUG, packet->m_body, packet->m_nBodySize);
#endif
		break;
	}

	return bHasMediaPacket;
}

特别说一下:HandleInvoke和HandleCtrl的区别:

1、HandleInvoke处理命令消息,这些命令消息是使用AMF0格式进行编码的

2、HandleCtrl处理用户控制消息,主要用于控制流的各种状态

在这些函数中我们需要关注的是HandleInvoke。

创建流

处理命令消息

    终于快说到重点了,但是在说重点之前还是要看一下创建流的时机:在函数HandleInvoke中,我们可以看到,处理完成“connect”命令的反馈之后,调用了RTMP_SendCreateStream函数,这个函数的作用就是创建流!

/**
 * Handle a command (invoke) message received from the server.
 *
 * The command messages are AMF0 encoded. The code below is an abridged
 * excerpt: the places marked "omitted" were cut from the full function.
 *
 * @param r          session state
 * @param body       message body; must start with a string method name
 *                   (first byte 0x02)
 * @param nBodySize  number of bytes in body
 * @return           ret, which is never changed from 0 in the code shown here
 */
static int
HandleInvoke(RTMP *r, const char *body, unsigned int nBodySize)
{
	AMFObject obj;
	AVal method;
	double txn;
	int ret = 0, nRes;
	/* NOTE(review): the locals from pbuf down to av_Response are not
	 * referenced in this excerpt; presumably the omitted sections use them.
	 */
	char pbuf[256], *pend = pbuf + sizeof(pbuf), *enc, **params = NULL;
	char *host = r->Link.hostname.av_len ? r->Link.hostname.av_val : "";
	char *pageUrl = r->Link.pageUrl.av_len ? r->Link.pageUrl.av_val : "";
	int param_count;
	AVal av_Command, av_Response;

	/* make sure it is a string method name we start with; an empty body
	 * has no first byte to inspect, so reject it before reading body[0]
	 */
	if (nBodySize == 0 || body[0] != 0x02)
	{
		RTMP_Log(RTMP_LOGWARNING, "%s, Sanity failed. no string method in invoke packet",
			__FUNCTION__);
		return 0;
	}

	nRes = AMF_Decode(&obj, body, nBodySize, FALSE);
	if (nRes < 0)
	{
		RTMP_Log(RTMP_LOGERROR, "%s, error decoding invoke packet", __FUNCTION__);
		return 0;
	}

	AMF_Dump(&obj);
	/* property 0 is read as the method name, property 1 as the transaction number */
	AMFProp_GetString(AMF_GetProp(&obj, NULL, 0), &method);
	txn = AMFProp_GetNumber(AMF_GetProp(&obj, NULL, 1));
	RTMP_Log(RTMP_LOGDEBUG, "%s, server invoking <%s>", __FUNCTION__, method.av_val);

	/* handle "_result" */
	if (AVMATCH(&method, &av__result))
	{
		AVal methodInvoked = { 0 };
		int i;

		/* find the pending call this result answers and take it off the list */
		for (i = 0; i < r->m_numCalls; i++) {
			if (r->m_methodCalls[i].num == (int)txn) {
				methodInvoked = r->m_methodCalls[i].name;
				AV_erase(r->m_methodCalls, &r->m_numCalls, i, FALSE);
				break;
			}
		}
		if (!methodInvoked.av_val) {
			RTMP_Log(RTMP_LOGDEBUG, "%s, received result id %f without matching request",
				__FUNCTION__, txn);
			goto leave;
		}

		RTMP_Log(RTMP_LOGDEBUG, "%s, received result for method call <%s>", __FUNCTION__,
			methodInvoked.av_val);

		/* the "connect" command completed */
		if (AVMATCH(&methodInvoked, &av_connect))
		{
			if (r->Link.token.av_len)
			{
				AMFObjectProperty p;
				if (RTMP_FindFirstMatchingProperty(&obj, &av_secureToken, &p))
				{
					DecodeTEA(&r->Link.token, &p.p_vu.p_aval);
					SendSecureTokenResponse(r, &p.p_vu.p_aval);
				}
			}
			if (r->Link.protocol & RTMP_FEATURE_WRITE)
			{
				SendReleaseStream(r);
				SendFCPublish(r);
			}
			else
			{
				/* send the window acknowledgement size to the server */
				RTMP_SendServerBW(r);
				RTMP_SendCtrl(r, 3, 0, 300);
			}

			/* ... unrelated code omitted ... */

			/* create the stream */
			RTMP_SendCreateStream(r);
		}
		/* ... unrelated code omitted ... */

		/* the "createStream" command succeeded */
		else if (AVMATCH(&methodInvoked, &av_createStream))
		{
			/* property 3 of the result is stored as the stream id */
			r->m_stream_id = (int)AMFProp_GetNumber(AMF_GetProp(&obj, NULL, 3));

			/* ... unrelated code omitted ... */

			if (r->Link.protocol & RTMP_FEATURE_WRITE)
			{
				SendPublish(r);
			}
			else
			{
				if (r->Link.lFlags & RTMP_LF_PLST)
					SendPlaylist(r);

				/* send the play command */
				SendPlay(r);
				RTMP_SendCtrl(r, 3, r->m_stream_id, r->m_nBufferMS);
			}
		}
		else if (AVMATCH(&methodInvoked, &av_play) ||
			AVMATCH(&methodInvoked, &av_publish))
		{
			/* this flag is what ends the loop in RTMP_ConnectStream */
			r->m_bPlaying = TRUE;
		}

		/* the name was detached from the call list above, so release it here */
		free(methodInvoked.av_val);
	}
	else if (AVMATCH(&method, &av_onBWDone))
	{
		if (!r->m_nBWCheckCounter)
			SendCheckBW(r);
	}
	/* ... other code omitted ... */

leave:
	AMF_Reset(&obj);
	return ret;
}

创建流的函数

// 发送”创建流“命令intRTMP_SendCreateStream(RTMP *r){	RTMPPacket packet;	char pbuf[256], *pend = pbuf + sizeof(pbuf);	char *enc;	packet.m_nChannel = 0x03;	/* control channel (invoke) */	packet.m_headerType = RTMP_PACKET_SIZE_MEDIUM;	packet.m_packetType = RTMP_PACKET_TYPE_INVOKE;	packet.m_nTimeStamp = 0;	packet.m_nInfoField2 = 0;	packet.m_hasAbsTimestamp = 0;	packet.m_body = pbuf + RTMP_MAX_HEADER_SIZE;	enc = packet.m_body;	// 创建流的命令	enc = AMF_EncodeString(enc, pend, &av_createStream);	enc = AMF_EncodeNumber(enc, pend, ++r->m_numInvokes);	*enc++ = AMF_NULL;		/* NULL */	packet.m_nBodySize = enc - packet.m_body;	// 发送数据包	return RTMP_SendPacket(r, &packet, TRUE);}


发表评论 共有条评论
用户名: 密码:
验证码: 匿名发表