欢迎访问我的GitHub
这里分类和汇总了欣宸的全部原创(含配套源码):github.com/zq2599/blog…
准备工作
- 本文要学习的是lalserver处理rtmp推流的功能代码,因此必须要对rtmp协议有所了解,至少要知道握手、chunk、message、messageType、amf0命令的基本概念,有关rtmp协议的资料在网上已经很丰富了,这里就不展开了,仅提供一个wiki作为参考:en.wikipedia.org/wiki/Real-T…
- lalserver源码仓库:github.com/q191201771/…
- 在源码的逻辑和分支较多时,可以结合lal日志来确定真实的执行顺序,让日志输出更丰富的内容,因此这里也会先推流,拿到lal的日志,推流操作曾在《体验开源项目lal》一文中执行过,命令如下
./ffmpeg
-re
-stream_loop -1
-i ../videos/sample.mp4
-c copy
-f flv
'rtmp://127.0.0.1:1935/live/test110'
本篇概览
- 推拉流,这是流媒体技术中的基本功能,本篇通过阅读lal源码,了解推流功能的具体实现
- 本次学习的是rtmp推流服务端源码,总的来说,处理推流的流程如下
- 接下来就一起来学习lal的推流源码,并借此对推流功能做深入了解
处理rtmp推流请求的入口
- 在lalserver侧,起点是rtmp server收到一个远程TCP连接(默认1935端口),来看看lal是如何处理的,也就是rtmp server的处理逻辑,代码在lal/pkg/rtmp/server.go
func (server *Server) RunLoop() error {
for {
conn, err := server.ln.Accept()
if err != nil {
return err
}
go server.handleTcpConnect(conn)
}
}
- 从上述代码可见,每当收到一个TCP连接,就在一个协程中用handleTcpConnect方法处理这个连接,handleTcpConnect方法不涉及细节,内容很简单:为TCP连接创建ServerSession对象,将具体处理交给ServerSession对象执行
func (server *Server) handleTcpConnect(conn net.Conn) {
Log.Infof("accept a rtmp connection. remoteAddr=%s", conn.RemoteAddr().String())
session := NewServerSession(server, conn)
_ = session.RunLoop()
if session.DisposeByObserverFlag {
return
}
switch session.sessionStat.BaseType() {
case base.SessionBaseTypePubStr:
server.observer.OnDelRtmpPubSession(session)
case base.SessionBaseTypeSubStr:
server.observer.OnDelRtmpSubSession(session)
}
}
- 从上面的代码可以看出,主要业务逻辑都在ServerSession对象的RunLoop方法中,展开后如下所示,先握手(handshake),再执行具体的逻辑(runReadLoop)
func (s *ServerSession) RunLoop() (err error) {
if err = s.handshake(); err != nil {
_ = s.dispose(err)
return err
}
err = s.runReadLoop()
_ = s.dispose(err)
return err
}
握手
- 看握手(handshake)代码之前,先温习一下rtmp的握手流程
- 再看握手的源码,一目了然,不过代码和图略有点不同:lal的代码中,S0S1S2是连发的
func (s *ServerSession) handshake() error {
if err := s.hs.ReadC0C1(s.conn); err != nil {
return err
}
Log.Infof("[%s] W Handshake S0+S1+S2.", s.UniqueKey())
if err := s.hs.WriteS0S1S2(s.conn); err != nil {
return err
}
if err := s.hs.ReadC2(s.conn); err != nil {
return err
}
Log.Infof("[%s] < R Handshake C2.", s.UniqueKey())
return nil
}
- 想看下具体读取C0、C1、C2这些数据的具体逻辑?如下图,最普通的io读取而已
读取和处理chunk
- 握手成功后,接着就是lal/pkg/rtmp/server_session.go#runReadLoop方法,里面直接调用lal/pkg/rtmp/chunk_composer.go#RunLoop,记住RunLoop的第二个入参,那是从chunk中获取到完整消息后的回调方法(即lal/pkg/rtmp/server_session.go#doMsg),也是个重点
func (s *ServerSession) runReadLoop() error {
return s.chunkComposer.RunLoop(s.conn, s.doMsg)
}
- 真正的核心代码到了,推流客户端和lalserver握手成功后,推流操作的所有逻辑都在这里:lal/pkg/rtmp/chunk_composer.go#RunLoop,这里面的代码分为几部分,代码有点长就不贴出来了,来看几个关键逻辑
- 首先看到的是个无限循环,每当处理完一个chunk包后,就继续处理下一个chunk包
- 根据chunk steam id(csid)确定当前包对应的消息,该消息由一个或多个chunk包组成
- 下面这段比较重要,每个消息都有自己的csid,进而对应一个stream对象,该消息对应的所有包都保存在stream的成员变量中,并记录已经保存的长度,当保存的内容长度达到消息长度时,意味着该消息对应的所有数据已经全部获取完成了,可以执行处理该消息的代码了
- 上图红色箭头2所指的Flush方法,其实并没有内存或者硬盘的读写操作,而是仅修改了位置变量,表示buffer中那些是正式的消息内容,这个设计值得学习
- 从chunk中拿到了完整的消息,接下来就要执行处理消息的逻辑了,如下图,之所以有两处回调代码,是因为如果消息类型是聚集消息(Aggregate Message,22),就意味着从chunk中取得的一条消息,实际上是多条消息,需要拆分后逐一回调处理
- 以上就是chunk的处理逻辑了,现在已从chunk中得到完整消息,该看看消息的处理逻辑了
处理消息
- 前面的图中可以看出,处理消息的代码是红色箭头所指的cb(stream),实际对应的代码是server_session.go#doMsg
- doMsg的代码简单明了,根据不同消息类型做不同的操作
func (s *ServerSession) doMsg(stream *Stream) error {
if err := s.writeAcknowledgementIfNeeded(stream); err != nil {
return err
}
//log.Debugf("%d %d %v", stream.header.msgTypeId, stream.msgLen, stream.header)
switch stream.header.MsgTypeId {
case base.RtmpTypeIdWinAckSize:
return s.doWinAckSize(stream)
case base.RtmpTypeIdSetChunkSize:
// noop
// 因为底层的 chunk composer 已经处理过了,这里就不用处理
case base.RtmpTypeIdCommandMessageAmf0:
return s.doCommandMessage(stream)
case base.RtmpTypeIdCommandMessageAmf3:
return s.doCommandAmf3Message(stream)
case base.RtmpTypeIdMetadata:
return s.doDataMessageAmf0(stream)
case base.RtmpTypeIdAck:
return s.doAck(stream)
case base.RtmpTypeIdUserControl:
s.doUserControl(stream)
case base.RtmpTypeIdAudio:
fallthrough
case base.RtmpTypeIdVideo:
if s.sessionStat.BaseType() != base.SessionBaseTypePubStr {
return nazaerrors.Wrap(base.ErrRtmpUnexpectedMsg)
}
s.avObserver.OnReadRtmpAvMsg(stream.toAvMsg())
default:
Log.Warnf("[%s] read unknown message. typeid=%d, %s", s.UniqueKey(), stream.header.MsgTypeId, stream.toDebugString())
}
return nil
}
- 那么问题来了,为了学习推流的知识,每种消息的处理逻辑代码都要去读一遍吗?貌似有点多,而且这样顺序读也不清楚各消息之间的顺序或者依赖关系,这时候需要偷懒了...
- 为了搞清楚推流时协议交互的具体情况,对doMsg方法略作修改,增加下图黄色箭头这行代码,然后编译运行
- 再用FFmpeg做一次推流,拿到新的日志
- 推流后的日志如下(通过grep命令,只看上面那行日志)
msg header {Csid:3 MsgLen:139 MsgTypeId:20 MsgStreamId:0 TimestampAbs:0} - server_session.go:215
msg header {Csid:2 MsgLen:4 MsgTypeId:1 MsgStreamId:0 TimestampAbs:0} - server_session.go:215
msg header {Csid:3 MsgLen:36 MsgTypeId:20 MsgStreamId:0 TimestampAbs:0} - server_session.go:215
msg header {Csid:3 MsgLen:32 MsgTypeId:20 MsgStreamId:0 TimestampAbs:0} - server_session.go:215
msg header {Csid:3 MsgLen:25 MsgTypeId:20 MsgStreamId:0 TimestampAbs:0} - server_session.go:215
msg header {Csid:8 MsgLen:37 MsgTypeId:20 MsgStreamId:1 TimestampAbs:0} - server_session.go:215
msg header {Csid:4 MsgLen:388 MsgTypeId:18 MsgStreamId:1 TimestampAbs:0} - server_session.go:215
msg header {Csid:6 MsgLen:43 MsgTypeId:9 MsgStreamId:1 TimestampAbs:0} - server_session.go:215
msg header {Csid:4 MsgLen:4 MsgTypeId:8 MsgStreamId:1 TimestampAbs:0} - server_session.go:215
msg header {Csid:6 MsgLen:105227 MsgTypeId:9 MsgStreamId:1 TimestampAbs:0} - server_session.go:215
msg header {Csid:4 MsgLen:969 MsgTypeId:8 MsgStreamId:1 TimestampAbs:0} - server_session.go:215
msg header {Csid:4 MsgLen:1013 MsgTypeId:8 MsgStreamId:1 TimestampAbs:21} - server_session.go:215
msg header {Csid:6 MsgLen:1559 MsgTypeId:9 MsgStreamId:1 TimestampAbs:40} - server_session.go:215
msg header {Csid:4 MsgLen:1028 MsgTypeId:8 MsgStreamId:1 TimestampAbs:43} - server_session.go:215
msg header {Csid:4 MsgLen:1032 MsgTypeId:8 MsgStreamId:1 TimestampAbs:64} - server_session.go:215
msg header {Csid:6 MsgLen:2158 MsgTypeId:9 MsgStreamId:1 TimestampAbs:80} - server_session.go:215
msg header {Csid:4 MsgLen:992 MsgTypeId:8 MsgStreamId:1 TimestampAbs:85} - server_session.go:215
msg header {Csid:4 MsgLen:960 MsgTypeId:8 MsgStreamId:1 TimestampAbs:107} - server_session.go:215
msg header {Csid:6 MsgLen:2213 MsgTypeId:9 MsgStreamId:1 TimestampAbs:120} - server_session.go:215
msg header {Csid:4 MsgLen:975 MsgTypeId:8 MsgStreamId:1 TimestampAbs:128} - server_session.go:215
msg header {Csid:4 MsgLen:991 MsgTypeId:8 MsgStreamId:1 TimestampAbs:149} - server_session.go:215
msg header {Csid:6 MsgLen:2528 MsgTypeId:9 MsgStreamId:1 TimestampAbs:160} - server_session.go:215
msg header {Csid:4 MsgLen:1011 MsgTypeId:8 MsgStreamId:1 TimestampAbs:171} - server_session.go:215
msg header {Csid:4 MsgLen:1002 MsgTypeId:8 MsgStreamId:1 TimestampAbs:192} - server_session.go:215
- 再来看wiki上对消息类型的解释
- 结合协议和日志可以看出,推流开始后,除了第二个设置chunk,其他主要是AMF0编码的RTMP命令消息,以及音频视频数据包
- 日志中有大量MsgTypeId等于20的消息,对应16进制是0x14,也就是AMF0命令,所以这类消息的处理逻辑需要重点关注,doMsg代码中可见,处理此类消息的方法是doCommandMessage
func (s *ServerSession) doCommandMessage(stream *Stream) error {
cmd, err := stream.msg.readStringWithType()
if err != nil {
return err
}
tid, err := stream.msg.readNumberWithType()
if err != nil {
return err
}
switch cmd {
case "connect":
return s.doConnect(tid, stream)
case "createStream":
return s.doCreateStream(tid, stream)
case "publish":
return s.doPublish(tid, stream)
case "play":
return s.doPlay(tid, stream)
case "releaseStream":
fallthrough
case "FCPublish":
fallthrough
case "FCUnpublish":
fallthrough
case "getStreamLength":
fallthrough
case "deleteStream":
Log.Debugf("[%s] read command message, ignore it. cmd=%s, %s", s.UniqueKey(), cmd, stream.toDebugString())
default:
Log.Errorf("[%s] read unknown command message. cmd=%s, %s", s.UniqueKey(), cmd, stream.toDebugString())
}
return nil
}
- 各个命令的处理方法,doConnect、doCreateStream、doPublish、doPlay它们的内部都有代表自己特征的日志,因此,根据日志内容很容易梳理出推流时收到命令的顺序,如下:
connect
->
releaseStream
->
FCPublish
->
createStream
->
publish
->
MetaDzta
->
然后就是音频视频的chunk包
- 打开doConnect方法,看lalserver收到FFmpeg的connect命令后做了什么,如下,可见从connect命令的参数中拿到了appName(本例中是live),tcUrl(本例中是rtmp://127.0.0.1:1935/live),然后以命令的形式向FFmpeg连续回复消息
func (s *ServerSession) doConnect(tid int, stream *Stream) error {
val, err := stream.msg.readObjectWithType()
if err != nil {
return err
}
s.appName, err = val.FindString("app")
if err != nil {
return err
}
s.tcUrl, err = val.FindString("tcUrl")
if err != nil {
Log.Warnf("[%s] tcUrl not exist.", s.UniqueKey())
}
Log.Infof("[%s] W Window Acknowledgement Size %d.", s.UniqueKey(), windowAcknowledgementSize)
if err := s.packer.writeWinAckSize(s.conn, windowAcknowledgementSize); err != nil {
return err
}
Log.Infof("[%s] > W Set Peer Bandwidth.", s.UniqueKey())
if err := s.packer.writePeerBandwidth(s.conn, peerBandwidth, peerBandwidthLimitTypeDynamic); err != nil {
return err
}
Log.Infof("[%s] > W SetChunkSize %d.", s.UniqueKey(), LocalChunkSize)
if err := s.packer.writeChunkSize(s.conn, LocalChunkSize); err != nil {
return err
}
Log.Infof("[%s] > W _result('NetConnection.Connect.Success').", s.UniqueKey())
oe, err := val.FindNumber("objectEncoding")
if oe != 0 && oe != 3 {
oe = 0
}
if err := s.packer.writeConnectResult(s.conn, tid, oe); err != nil {
return err
}
return nil
}
- connect完成后是createStream命令,对应的doCreateStream方法如下,非常简单,就是立即回复,消息类型还是createStream,想想也是,流相关的信息,lal这边都已经准备好了,收到创建流的命令也无需什么操作
func (s *ServerSession) doCreateStream(tid int, stream *Stream) error {
Log.Infof("[%s] W _result().", s.UniqueKey())
if err := s.packer.writeCreateStreamResult(s.conn, tid); err != nil {
return err
}
return nil
}
- 接下来就是publish命令,代码就不贴出了,主要是获取流名、回复onStatus、设置连接的超时时间等,另外如果有观察者,还会向其发送publish相关的事件,而消费publish事件的代码也值得一看,在lal/pkg/logic/server_manager__.go#OnNewRtmpPubSession,如下,可见主要是鉴权、该流相关的Group处理,以及外部监听的通知,另外,如果配置中开启了录制功能,在group.AddRtmpPubSession方法中就会做相关的初始化操作
func (sm *ServerManager) OnNewRtmpPubSession(session *rtmp.ServerSession) error {
sm.mutex.Lock()
defer sm.mutex.Unlock()
info := base.Session2PubStartInfo(session)
// 先做simple auth鉴权
if err := sm.option.Authentication.OnPubStart(info); err != nil {
return err
}
group := sm.getOrCreateGroup(session.AppName(), session.StreamName())
if err := group.AddRtmpPubSession(session); err != nil {
return err
}
info.HasInSession = group.HasInSession()
info.HasOutSession = group.HasOutSession()
sm.option.NotifyHandler.OnPubStart(info)
return nil
}
- 接下来是Metadata类型的消息处理,该消息中包含了要推来的流的详细参数,例如宽高、码率、帧率等,对应方法是doDataMessageAmf0,该方法中最重要的是对观察者执行了通知,对应的方法在lal/pkg/logic/group__core_streaming.go#OnReadRtmpAvMsg,
- OnReadRtmpAvMsg方法中会调用broadcastByRtmpMsg,这是个重点,里面会根据group中保存的订阅者的情况,开始发送广播,让所有订阅者都能获得媒体流的详细参数(这里面代码非常复杂,涉及到不同协议的录制、转推、拉流等),在推流的场景,其中这段代码可以重点看下
- 等到上述命令全部响应完毕,意味着准备工作已经完成,可以静候媒体流数据的到来了
音频和视频消息的处理
- 回到doMsg方法看看音视频消息的处理,如下图,先做个最基本的检查,然后交给lal/pkg/logic/group__core_streaming.go#OnReadRtmpAvMsg处理
- 又是这个OnReadRtmpAvMsg方法,前面响应Metadata的时候就是它,现在处理音视频消息还是它,那阅读代码时就轻车熟路了,还是交给了broadcastByRtmpMsg方法去处理的
- broadcastByRtmpMsg中,重点关注lal/pkg/remux/gop_cache.go#Feed,这里面会对关键帧的seqheader和内容进行缓存
- broadcastByRtmpMsg方法中,还有本篇最重要的两处代码(个人是这么认为的)
- 第一处,如下图,如果是一个新来的拉流请求,会将媒体流属性写到拉流端,另外就是刚才缓存的关键帧及其seqheader信息,这样的结果是拉流端刚建立连接就能拿到关键帧(快速出首帧的效果),而不用等待推流端推来的最新关键帧,毕竟一个gop时间可能很长
- 第二段关键代码如下图,调用write2RtmpSubSessions,将本次收到的音视频消息转发
- 展开write2RtmpSubSessions,恍然大悟了,个人认为,这就是推拉流的核心代码,将每一个收到的音视频数据,直接从拉流端的TCP连接中写进去(展开session.Write方法即可见到)
func (group *Group) write2RtmpSubSessions(b []byte) {
for session := range group.rtmpSubSessionSet {
if session.IsFresh || session.ShouldWaitVideoKeyFrame {
continue
}
_ = session.Write(b)
}
}
推流结束的处理
- 如果用ctrl+c结束FFmpeg推流,lal这边会做什么处理呢?还是偷个懒,先看日志吧
2023/04/02 09:47:24.346491 ^[[22;36m INFO ^[[0mmsg header {Csid:4 MsgLen:986 MsgTypeId:8 MsgStreamId:1 TimestampAbs:9323} - server_session.go:215
2023/04/02 09:47:24.346558 ^[[22;36m INFO ^[[0mmsg header {Csid:4 MsgLen:950 MsgTypeId:8 MsgStreamId:1 TimestampAbs:9344} - server_session.go:215
2023/04/02 09:47:24.346605 ^[[22;36m INFO ^[[0mmsg header {Csid:6 MsgLen:5 MsgTypeId:9 MsgStreamId:1 TimestampAbs:9320} - server_session.go:215
2023/04/02 09:47:24.346654 ^[[22;33m WARN ^[[0m[RTMP2MPEGTS1] rtmp msg too short, ignore. header={Csid:6 MsgLen:5 MsgTypeId:9 MsgStreamId:1 TimestampAbs:9320}, payload=00000000 17 02 00 00 00 |.....|
- rtmp2mpegts.go:196
2023/04/02 09:47:24.346681 ^[[22;33m WARN ^[[0mrtmp msg too short, ignore. header={Csid:6 MsgLen:5 MsgTypeId:9 MsgStreamId:1 TimestampAbs:9320}, payload=00000000 17 02 00 00 00 |.....|
- rtmp2rtsp.go:102
2023/04/02 09:47:24.346958 ^[[22;36m INFO ^[[0mmsg header {Csid:3 MsgLen:34 MsgTypeId:20 MsgStreamId:0 TimestampAbs:0} - server_session.go:215
2023/04/02 09:47:24.346987 ^[[22;34mDEBUG ^[[0m[RTMPPUBSUB1] read command message, ignore it. cmd=FCUnpublish, header={Csid:3 MsgLen:34 MsgTypeId:20 MsgStreamId:0 TimestampAbs:0}, b=len(core)=128, rpos=23, wpos=34, hex=00000000 05 02 00 07 74 65 73 74 31 31 30 |....test110|
- server_session.go:357
2023/04/02 09:47:24.347012 ^[[22;36m INFO ^[[0mmsg header {Csid:3 MsgLen:34 MsgTypeId:20 MsgStreamId:0 TimestampAbs:0} - server_session.go:215
2023/04/02 09:47:24.347028 ^[[22;34mDEBUG ^[[0m[RTMPPUBSUB1] read command message, ignore it. cmd=deleteStream, header={Csid:3 MsgLen:34 MsgTypeId:20 MsgStreamId:0 TimestampAbs:0}, b=len(core)=128, rpos=24, wpos=34, hex=00000000 05 00 3f f0 00 00 00 00 00 00 |..?.......|
- server_session.go:357
2023/04/02 09:47:24.347050 ^[[22;34mDEBUG ^[[0m[NAZACONN1] close once. err=EOF - connection.go:504
2023/04/02 09:47:24.347168 ^[[22;36m INFO ^[[0m[RTMPPUBSUB1] lifecycle dispose rtmp ServerSession. err=EOF - server_session.go:538
2023/04/02 09:47:24.347183 ^[[22;34mDEBUG ^[[0m[NAZACONN1] Close. - connection.go:376
2023/04/02 09:47:24.347199 ^[[22;34mDEBUG ^[[0m[GROUP1] [RTMPPUBSUB1] del rtmp PubSession from group. - group__in.go:318
2023/04/02 09:47:24.347303 ^[[22;36m INFO ^[[0m[HLSMUXER1] lifecycle dispose hls muxer. - muxer.go:126
2023/04/02 09:47:24.570509 ^[[22;36m INFO ^[[0merase inactive group. [GROUP1] - server_manager__.go:299
2023/04/02 09:47:24.570639 ^[[22;36m INFO ^[[0m[GROUP1] lifecycle dispose group. - group__.go:207
- 从上述日志可见,lal会收到FFmpeg发来的FCUnpublish、deleteStream等命令,但lal并不理会这些命令,如下图,而是等到TCP连接出现EOF错误的时候,由该错误出发
- 具体的,处理TCP错误,结束推流的代码如下图所示
- 至此,推流服务的相关源码的学习就完成了,借助lal第一次了解到rtmp推流服务的细节,基本功扎实了,接下来的学习就会事半功倍,接下来咱们去挑战另一个基础功能:rtmp拉流
欢迎关注掘金:程序员欣宸
学习路上,你不孤单,欣宸原创一路相伴...