Skip to content

Commit

Permalink
Merge pull request #100 from joestarzxh/master
Browse files Browse the repository at this point in the history
[fix]gb主动关闭客户端
  • Loading branch information
joestarzxh authored Jul 18, 2024
2 parents d5b10b8 + 12d4f75 commit 1466895
Show file tree
Hide file tree
Showing 8 changed files with 171 additions and 112 deletions.
1 change: 0 additions & 1 deletion conf/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ type HlsConfig struct {
type GB28181Config struct {
Enable bool `json:"enable"` // gb28181使能标志
ListenAddr string `json:"listen_addr"` // gb28181监听地址
SipNetwork string `json:"sip_network"` // 传输协议,默认UDP,可选TCP
SipIP string `json:"sip_ip"` // sip 服务器公网IP
SipPort uint16 `json:"sip_port"` // sip 服务器端口,默认 5060
Serial string `json:"serial"` // sip 服务器 id, 默认 34020000002000000001
Expand Down
1 change: 0 additions & 1 deletion conf/lalmax.conf.json
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
"realm": "3402000000",
"sip_ip": "192.168.254.165",
"sip_port": 5060,
"sip_network": "udp",
"username": "",
"media_config": {
"media_ip": "192.168.254.165"
Expand Down
35 changes: 14 additions & 21 deletions gb28181/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,11 +207,9 @@ func (channel *Channel) Invite(opt *InviteOptions, streamName string, playInfo *
nazalog.Error("invite failed, err:", err, " invite msg:", invite.String())

//jay 在media端口监听成功后,但是sip发送失败时
if !playInfo.SinglePort {
if channel.observer != nil {
if err = channel.observer.OnStopMediaServer(playInfo.NetWork, playInfo.SinglePort, channel.device.ID, channel.ChannelId); err != nil {
nazalog.Errorf("gb28181 MediaServer stop err:%s", err.Error())
}
if channel.observer != nil {
if err = channel.observer.OnStopMediaServer(playInfo.NetWork, playInfo.SinglePort, channel.device.ID, channel.ChannelId, ""); err != nil {
nazalog.Errorf("gb28181 MediaServer stop err:%s", err.Error())
}
}

Expand Down Expand Up @@ -249,15 +247,15 @@ func (channel *Channel) Invite(opt *InviteOptions, streamName string, playInfo *
channel.ackReq = ackReq
channel.playInfo = playInfo

err = sipsvr.Send(ackReq)
err = channel.device.sipSvr.Send(ackReq)
} else {
if !playInfo.SinglePort {
if channel.observer != nil {
if err = channel.observer.OnStopMediaServer(playInfo.NetWork, playInfo.SinglePort, channel.device.ID, channel.ChannelId); err != nil {
nazalog.Errorf("gb28181 MediaServer stop err:%s", err.Error())
}

if channel.observer != nil {
if err = channel.observer.OnStopMediaServer(playInfo.NetWork, playInfo.SinglePort, channel.device.ID, channel.ChannelId, ""); err != nil {
nazalog.Errorf("gb28181 MediaServer stop err:%s", err.Error())
}
}

}
return
}
Expand All @@ -271,11 +269,9 @@ func (channel *Channel) GetCallId() string {
}
func (channel *Channel) stopMediaServer() (err error) {
if channel.playInfo != nil {
if !channel.playInfo.SinglePort {
if channel.observer != nil {
if err = channel.observer.OnStopMediaServer(channel.playInfo.NetWork, channel.playInfo.SinglePort, channel.device.ID, channel.ChannelId); err != nil {
nazalog.Errorf("gb28181 MediaServer stop err:%s", err.Error())
}
if channel.observer != nil {
if err = channel.observer.OnStopMediaServer(channel.playInfo.NetWork, channel.playInfo.SinglePort, channel.device.ID, channel.ChannelId, channel.playInfo.StreamName); err != nil {
nazalog.Errorf("gb28181 MediaServer stop err:%s", err.Error())
}
}
}
Expand All @@ -288,21 +284,18 @@ func (channel *Channel) byeClear() (err error) {
return
}
func (channel *Channel) Bye(streamName string) (err error) {

if channel.ackReq != nil {
byeReq := channel.ackReq
channel.ackReq = nil
byeReq.SetMethod(sip.BYE)
seq, _ := byeReq.CSeq()
seq.SeqNo += 1
sipsvr.Send(byeReq)
channel.device.sipSvr.Send(byeReq)
} else {
err = errors.New("channel has been closed")
}

channel.stopMediaServer()
return err

}
func (channel *Channel) CreateRequst(Method sip.RequestMethod, conf config.GB28181Config) (req sip.Request) {
d := channel.device
Expand Down Expand Up @@ -357,7 +350,7 @@ func (channel *Channel) CreateRequst(Method sip.RequestMethod, conf config.GB281
nil,
)

req.SetTransport(conf.SipNetwork)
req.SetTransport(channel.device.network)
req.SetDestination(d.NetAddr)
return req
}
Expand Down
13 changes: 11 additions & 2 deletions gb28181/device.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package gb28181

import (
"context"
"github.com/ghettovoice/gosip"
"net/http"
"strings"
"sync"
Expand Down Expand Up @@ -58,12 +59,20 @@ type Device struct {

observer IMediaOpObserver
conf config.GB28181Config

network string
sipSvr gosip.Server
}

func (d *Device) WithMediaServer(observer IMediaOpObserver) {
d.observer = observer
}

func (d *Device) WithSipSvr(sipSvr gosip.Server) *Device {
d.sipSvr = sipSvr
return d
}

func (d *Device) syncChannels() {
if time.Since(d.lastSyncTime) > 2*time.Second {
d.lastSyncTime = time.Now()
Expand Down Expand Up @@ -171,7 +180,7 @@ func (d *Device) CreateRequest(Method sip.RequestMethod, conf config.GB28181Conf
nil,
)

req.SetTransport(conf.SipNetwork)
req.SetTransport(d.network)
req.SetDestination(d.NetAddr)
return
}
Expand Down Expand Up @@ -330,5 +339,5 @@ func (d *Device) UpdateChannelPosition(channelId string, gpsTime string, lng str
}

func (d *Device) SipRequestForResponse(request sip.Request) (sip.Response, error) {
return sipsvr.RequestWithContext(context.Background(), request)
return d.sipSvr.RequestWithContext(context.Background(), request)
}
23 changes: 20 additions & 3 deletions gb28181/mediaserver/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"io"
"net"
"sync"
"time"

"github.com/q191201771/lalmax/gb28181/mpegps"
Expand Down Expand Up @@ -49,6 +50,10 @@ type Conn struct {

buffer *bytes.Buffer
key string

mediaServer *GB28181MediaServer
one sync.Once
oneSaveConn sync.Once
}

func NewConn(conn net.Conn, observer IGbObserver, lal logic.ILalServer) *Conn {
Expand All @@ -65,13 +70,16 @@ func NewConn(conn net.Conn, observer IGbObserver, lal logic.ILalServer) *Conn {

return c
}
func (c *Conn) SetMediaServer(mediaServer *GB28181MediaServer) {
c.mediaServer = mediaServer
}
func (c *Conn) SetKey(key string) {
c.key = key
}
func (c *Conn) Serve() (err error) {
defer func() {
nazalog.Info("conn close, err:", err)
c.conn.Close()
c.Close()

if c.observer != nil {
c.observer.NotifyClose(c.streamName)
Expand All @@ -85,7 +93,7 @@ func (c *Conn) Serve() (err error) {
nazalog.Info("gb28181 conn, remoteaddr:", c.conn.RemoteAddr().String(), " localaddr:", c.conn.LocalAddr().String())

for {
c.conn.SetReadDeadline(time.Now().Add(30 * time.Second))
c.conn.SetReadDeadline(time.Now().Add(10 * time.Second))
pkt := &rtp.Packet{}
if c.conn.RemoteAddr().Network() == "udp" {
buf := make([]byte, 1472*4)
Expand Down Expand Up @@ -137,6 +145,11 @@ func (c *Conn) Serve() (err error) {
}
c.check = true
c.streamName = mediaInfo.StreamName
c.oneSaveConn.Do(func() {
if c.mediaServer != nil {
c.mediaServer.conns.Store(c.streamName, c)
}
})
if len(mediaInfo.DumpFileName) > 0 {
c.psDumpFile = base.NewDumpFile()
if err = c.psDumpFile.OpenToWrite(mediaInfo.DumpFileName); err != nil {
Expand Down Expand Up @@ -253,7 +266,11 @@ func (c *Conn) OnFrame(frame []byte, cid mpegps.PsStreamType, pts uint64, dts ui
c.lalSession.FeedAvPacket(pkt)
}
}

func (c *Conn) Close() {
c.one.Do(func() {
c.conn.Close()
})
}
func getPayloadType(cid mpegps.PsStreamType) base.AvPacketPt {
switch cid {
case mpegps.PsStreamAac:
Expand Down
26 changes: 17 additions & 9 deletions gb28181/mediaserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type GB28181MediaServer struct {
observer IGbObserver
mediaKey string

conn *Conn //增加链接对象,目前只适用于多端口
conns sync.Map //增加链接对象,目前只适用于多端口
}

func NewGB28181MediaServer(listenPort int, mediaKey string, observer IGbObserver, lal logic.ILalServer) *GB28181MediaServer {
Expand Down Expand Up @@ -61,21 +61,29 @@ func (s *GB28181MediaServer) Start(listener net.Listener) (err error) {

c := NewConn(conn, s.observer, s.lalServer)
c.SetKey(s.mediaKey)

s.conn = c
go c.Serve()
c.SetMediaServer(s)
go func() {
c.Serve()
s.conns.Delete(c.streamName)
}()
}
}()
}
return
}
func (s *GB28181MediaServer) CloseConn(streamName string) {
if v, ok := s.conns.Load(streamName); ok {
conn := v.(*Conn)
conn.Close()
}
}
func (s *GB28181MediaServer) Dispose() {
s.disposeOnce.Do(func() {

if s.conn != nil {
s.conn.conn.Close()
}

s.conns.Range(func(_, value any) bool {
conn := value.(*Conn)
conn.Close()
return true
})
if s.listener != nil {
s.listener.Close()
s.listener = nil
Expand Down
Loading

0 comments on commit 1466895

Please sign in to comment.