Skip to content

Commit

Permalink
add livego lib
Browse files Browse the repository at this point in the history
  • Loading branch information
lei006 committed Mar 9, 2021
1 parent d370d5b commit c8adb90
Show file tree
Hide file tree
Showing 80 changed files with 10,527 additions and 30 deletions.
12 changes: 12 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,22 @@ module github.com/lei006/go-assist
go 1.16

require (
github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6 // indirect
github.com/auth0/go-jwt-middleware v1.0.0 // indirect
github.com/beego/beego/v2 v2.0.1 // indirect
github.com/deepch/vdk v0.0.0-20210218105644-f6de3acad035 // indirect
github.com/dgrijalva/jwt-go v3.2.0+incompatible // indirect
github.com/form3tech-oss/jwt-go v3.2.2+incompatible // indirect
github.com/go-redis/redis/v7 v7.4.0 // indirect
github.com/kr/pretty v0.1.0 // indirect
github.com/patrickmn/go-cache v2.1.0+incompatible // indirect
github.com/pixelbender/go-sdp v1.1.0 // indirect
github.com/satori/go.uuid v1.2.0 // indirect
github.com/sirupsen/logrus v1.4.2 // indirect
github.com/sohaha/zlsgo v1.1.5 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/spf13/viper v1.7.1 // indirect
github.com/teris-io/shortid v0.0.0-20201117134242-e59966efd125 // indirect
github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77 // indirect
golang.org/x/sys v0.0.0-20210228012217-479acdf4ea46 // indirect
)
155 changes: 155 additions & 0 deletions livego/av/av.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
package av

import (
"fmt"
"io"
)

const (
TAG_AUDIO = 8
TAG_VIDEO = 9
TAG_SCRIPTDATAAMF0 = 18
TAG_SCRIPTDATAAMF3 = 0xf
)

const (
MetadatAMF0 = 0x12
MetadataAMF3 = 0xf
)

const (
SOUND_MP3 = 2
SOUND_NELLYMOSER_16KHZ_MONO = 4
SOUND_NELLYMOSER_8KHZ_MONO = 5
SOUND_NELLYMOSER = 6
SOUND_ALAW = 7
SOUND_MULAW = 8
SOUND_AAC = 10
SOUND_SPEEX = 11

SOUND_5_5Khz = 0
SOUND_11Khz = 1
SOUND_22Khz = 2
SOUND_44Khz = 3

SOUND_8BIT = 0
SOUND_16BIT = 1

SOUND_MONO = 0
SOUND_STEREO = 1

AAC_SEQHDR = 0
AAC_RAW = 1
)

const (
AVC_SEQHDR = 0
AVC_NALU = 1
AVC_EOS = 2

FRAME_KEY = 1
FRAME_INTER = 2

VIDEO_H264 = 7
)

var (
PUBLISH = "publish"
PLAY = "play"
)

// Header can be converted to AudioHeaderInfo or VideoHeaderInfo
type Packet struct {
IsAudio bool
IsVideo bool
IsMetadata bool
TimeStamp uint32 // dts
StreamID uint32
Header PacketHeader
Data []byte
}

type PacketHeader interface {
}

type AudioPacketHeader interface {
PacketHeader
SoundFormat() uint8
AACPacketType() uint8
}

type VideoPacketHeader interface {
PacketHeader
IsKeyFrame() bool
IsSeq() bool
CodecID() uint8
CompositionTime() int32
}

type Demuxer interface {
Demux(*Packet) (ret *Packet, err error)
}

type Muxer interface {
Mux(*Packet, io.Writer) error
}

type SampleRater interface {
SampleRate() (int, error)
}

type CodecParser interface {
SampleRater
Parse(*Packet, io.Writer) error
}

type GetWriter interface {
GetWriter(Info) WriteCloser
}

type Handler interface {
HandleReader(ReadCloser)
HandleWriter(WriteCloser)
}

type Alive interface {
Alive() bool
}

type Closer interface {
Info() Info
Close(error)
}

type CalcTime interface {
CalcBaseTimestamp()
}

type Info struct {
Key string
URL string
UID string
Inter bool
}

func (info Info) IsInterval() bool {
return info.Inter
}

func (info Info) String() string {
return fmt.Sprintf("<key: %s, URL: %s, UID: %s, Inter: %v>",
info.Key, info.URL, info.UID, info.Inter)
}

type ReadCloser interface {
Closer
Alive
Read(*Packet) error
}

type WriteCloser interface {
Closer
Alive
CalcTime
Write(*Packet) error
}
55 changes: 55 additions & 0 deletions livego/av/rwbase.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package av

import (
"sync"
"time"
)

type RWBaser struct {
lock sync.Mutex
timeout time.Duration
PreTime time.Time
BaseTimestamp uint32
LastVideoTimestamp uint32
LastAudioTimestamp uint32
}

func NewRWBaser(duration time.Duration) RWBaser {
return RWBaser{
timeout: duration,
PreTime: time.Now(),
}
}

func (rw *RWBaser) BaseTimeStamp() uint32 {
return rw.BaseTimestamp
}

func (rw *RWBaser) CalcBaseTimestamp() {
if rw.LastAudioTimestamp > rw.LastVideoTimestamp {
rw.BaseTimestamp = rw.LastAudioTimestamp
} else {
rw.BaseTimestamp = rw.LastVideoTimestamp
}
}

func (rw *RWBaser) RecTimeStamp(timestamp, typeID uint32) {
if typeID == TAG_VIDEO {
rw.LastVideoTimestamp = timestamp
} else if typeID == TAG_AUDIO {
rw.LastAudioTimestamp = timestamp
}
}

func (rw *RWBaser) SetPreTime() {
rw.lock.Lock()
rw.PreTime = time.Now()
rw.lock.Unlock()
}

func (rw *RWBaser) Alive() bool {
rw.lock.Lock()
b := !(time.Now().Sub(rw.PreTime) >= rw.timeout)
rw.lock.Unlock()
return b
}
134 changes: 134 additions & 0 deletions livego/configure/channel.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
package configure

import (
"fmt"

"github.com/lei006/go-assist/livego/utils/uid"

"github.com/go-redis/redis/v7"
"github.com/patrickmn/go-cache"
log "github.com/sirupsen/logrus"
)

type RoomKeysType struct {
redisCli *redis.Client
localCache *cache.Cache
}

var RoomKeys = &RoomKeysType{
localCache: cache.New(cache.NoExpiration, 0),
}

var saveInLocal = true

func Init() {
saveInLocal = len(Config.GetString("redis_addr")) == 0
if saveInLocal {
return
}

RoomKeys.redisCli = redis.NewClient(&redis.Options{
Addr: Config.GetString("redis_addr"),
Password: Config.GetString("redis_pwd"),
DB: 0,
})

_, err := RoomKeys.redisCli.Ping().Result()
if err != nil {
log.Panic("Redis: ", err)
}

log.Info("Redis connected")
}

// set/reset a random key for channel
func (r *RoomKeysType) SetKey(channel string) (key string, err error) {
if !saveInLocal {
for {
key = uid.RandStringRunes(48)
if _, err = r.redisCli.Get(key).Result(); err == redis.Nil {
err = r.redisCli.Set(channel, key, 0).Err()
if err != nil {
return
}

err = r.redisCli.Set(key, channel, 0).Err()
return
} else if err != nil {
return
}
}
}

for {
key = uid.RandStringRunes(48)
if _, found := r.localCache.Get(key); !found {
r.localCache.SetDefault(channel, key)
r.localCache.SetDefault(key, channel)
break
}
}
return
}

func (r *RoomKeysType) GetKey(channel string) (newKey string, err error) {
if !saveInLocal {
if newKey, err = r.redisCli.Get(channel).Result(); err == redis.Nil {
newKey, err = r.SetKey(channel)
log.Debugf("[KEY] new channel [%s]: %s", channel, newKey)
return
}

return
}

var key interface{}
var found bool
if key, found = r.localCache.Get(channel); found {
return key.(string), nil
}
newKey, err = r.SetKey(channel)
log.Debugf("[KEY] new channel [%s]: %s", channel, newKey)
return
}

func (r *RoomKeysType) GetChannel(key string) (channel string, err error) {
if !saveInLocal {
return r.redisCli.Get(key).Result()
}

chann, found := r.localCache.Get(key)
if found {
return chann.(string), nil
} else {
return "", fmt.Errorf("%s does not exists", key)
}
}

func (r *RoomKeysType) DeleteChannel(channel string) bool {
if !saveInLocal {
return r.redisCli.Del(channel).Err() != nil
}

key, ok := r.localCache.Get(channel)
if ok {
r.localCache.Delete(channel)
r.localCache.Delete(key.(string))
return true
}
return false
}

func (r *RoomKeysType) DeleteKey(key string) bool {
if !saveInLocal {
return r.redisCli.Del(key).Err() != nil
}

channel, ok := r.localCache.Get(key)
if ok {
r.localCache.Delete(channel.(string))
r.localCache.Delete(key)
return true
}
return false
}
Loading

0 comments on commit c8adb90

Please sign in to comment.