-
Notifications
You must be signed in to change notification settings - Fork 0
/
conferencing.go
156 lines (125 loc) · 3.77 KB
/
conferencing.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
package sipnexus
import (
"fmt"
"sync"
muxgo "github.com/itzmanish/audio-multiplexer-go"
"github.com/itzmanish/sipnexus/pkg/logger"
"github.com/pion/rtp"
)
// ConferenceRoom holds the state of a single conference: the participants
// currently in it and the mixer that combines their audio.
type ConferenceRoom struct {
// ID is the identifier the room was created with.
ID string
// Participants maps a participant ID to that participant's entry.
Participants map[string]*Participant
// Mixer is the room's audio mixer. Participants are added to it as sources
// when they join and removed from it when they leave.
Mixer *muxgo.Mixer
// mu is held by ProcessRTPPacket while it mixes audio and sends the result
// to the room's participants.
mu sync.RWMutex
}
// Participant is one member of a conference room.
type Participant struct {
// ID is the participant identifier; it is also the key under which the
// participant is stored in ConferenceRoom.Participants and the name of its
// mixer source.
ID string
// Stream carries mixed audio packets to this participant. JoinRoom creates
// it with a buffer of 100 packets and LeaveRoom closes it.
Stream chan *rtp.Packet
}
// ConferencingService keeps track of conference rooms and routes mixed audio
// between the participants of each room.
type ConferencingService struct {
// logger receives the service's informational and warning messages.
logger logger.Logger
// rooms maps a room ID to its ConferenceRoom.
rooms map[string]*ConferenceRoom
// mu guards rooms.
mu sync.RWMutex
}
// NewConferencingService builds a ConferencingService that reports through
// the supplied logger and starts out with an empty room registry.
func NewConferencingService(logger logger.Logger) *ConferencingService {
	svc := new(ConferencingService)
	svc.logger = logger
	svc.rooms = map[string]*ConferenceRoom{}
	return svc
}
// CreateRoom registers a new, empty conference room under roomID and gives it
// its own mixer. It returns an error if a room with that ID is already
// registered; otherwise it logs the creation and returns nil.
func (c *ConferencingService) CreateRoom(roomID string) error {
	c.mu.Lock()
	defer c.mu.Unlock()

	if _, taken := c.rooms[roomID]; taken {
		return fmt.Errorf("room %s already exists", roomID)
	}

	// The mixer arguments are assumed to mean an 8 kHz sample rate and a
	// 20 ms frame size.
	room := new(ConferenceRoom)
	room.Mixer = muxgo.NewMixer(8000, 20)
	room.ID = roomID
	room.Participants = make(map[string]*Participant)
	c.rooms[roomID] = room

	c.logger.Info(fmt.Sprintf("Created conference room: %s", roomID))
	return nil
}
// JoinRoom adds participantID to the room identified by roomID and returns the
// channel on which that participant will receive mixed audio.
//
// The returned channel is buffered (100 packets). ProcessRTPPacket drops a
// packet instead of blocking when the buffer is full, and LeaveRoom closes the
// channel. An error is returned if the room does not exist or the participant
// is already in it.
func (c *ConferencingService) JoinRoom(roomID, participantID string) (chan *rtp.Packet, error) {
	c.mu.Lock()
	defer c.mu.Unlock()

	room, exists := c.rooms[roomID]
	if !exists {
		return nil, fmt.Errorf("room %s does not exist", roomID)
	}

	// ProcessRTPPacket ranges over room.Participants while holding only
	// room.mu, so the membership change must hold that lock too; without it
	// the map would be read and written concurrently.
	room.mu.Lock()
	defer room.mu.Unlock()

	if _, exists := room.Participants[participantID]; exists {
		return nil, fmt.Errorf("participant %s already in room %s", participantID, roomID)
	}

	stream := make(chan *rtp.Packet, 100)
	room.Participants[participantID] = &Participant{
		ID:     participantID,
		Stream: stream,
	}
	room.Mixer.AddSource(participantID)

	c.logger.Info(fmt.Sprintf("Participant %s joined room %s", participantID, roomID))
	return stream, nil
}
// LeaveRoom removes participantID from the room identified by roomID, closes
// the participant's stream channel and removes its mixer source.
//
// An error is returned if the room does not exist or the participant is not
// in it.
func (c *ConferencingService) LeaveRoom(roomID, participantID string) error {
	c.mu.Lock()
	defer c.mu.Unlock()

	room, exists := c.rooms[roomID]
	if !exists {
		return fmt.Errorf("room %s does not exist", roomID)
	}

	// ProcessRTPPacket sends on participant streams while holding only
	// room.mu. Closing the stream and editing the map under the same lock
	// prevents a send on a closed channel and a concurrent map access.
	room.mu.Lock()
	defer room.mu.Unlock()

	participant, exists := room.Participants[participantID]
	if !exists {
		return fmt.Errorf("participant %s not in room %s", participantID, roomID)
	}

	close(participant.Stream)
	delete(room.Participants, participantID)
	room.Mixer.RemoveSource(participantID)

	c.logger.Info(fmt.Sprintf("Participant %s left room %s", participantID, roomID))
	return nil
}
// ProcessRTPPacket feeds one incoming RTP packet from participantID into the
// mixer of room roomID, mixes the room's audio, and forwards the mixed packet
// to every other participant in the room.
//
// The mixed packet reuses the header of the incoming packet. Delivery is
// non-blocking: if a recipient's stream buffer is full the packet is dropped
// for that recipient and a warning is logged. A nil packet or an unknown room
// is logged and ignored.
func (c *ConferencingService) ProcessRTPPacket(roomID string, participantID string, packet *rtp.Packet) {
	// Guard against a nil packet; the conversion below dereferences it.
	if packet == nil {
		c.logger.Warn("Attempt to process nil RTP packet for room: " + roomID)
		return
	}

	c.mu.RLock()
	room, exists := c.rooms[roomID]
	c.mu.RUnlock()
	if !exists {
		c.logger.Warn("Attempt to process RTP packet for non-existent room: " + roomID)
		return
	}

	room.mu.Lock()
	defer room.mu.Unlock()

	// Convert RTP packet payload to audio samples and hand them to the mixer.
	// NOTE(review): the sender is not checked against room.Participants, so
	// packets from an ID that never joined are still passed to the mixer —
	// confirm that is intended.
	samples := c.rtpToSamples(packet)
	room.Mixer.AddSamples(participantID, samples)

	// Mix the audio and wrap the result in a packet carrying the sender's
	// header.
	mixedSamples := room.Mixer.Mix()
	mixedPacket := c.samplesToRTP(mixedSamples, packet.Header)

	// Send the mixed audio to all participants except the sender.
	// NOTE(review): every recipient receives the same *rtp.Packet pointer;
	// receivers must treat it as read-only.
	for id, participant := range room.Participants {
		if id == participantID {
			continue
		}
		select {
		case participant.Stream <- mixedPacket:
		default:
			c.logger.Warn("Participant buffer full, dropping packet: " + id)
		}
	}
}
// rtpToSamples converts the payload of packet into float32 audio samples, one
// sample per payload byte.
//
// Each byte is treated as the high byte of a signed 16-bit sample and scaled
// by 1/32768, so the results lie in [-1, 1). This is the inverse of
// samplesToRTP.
// NOTE(review): the payload is handled as linear 8-bit PCM; no codec decoding
// (for example G.711) is performed — confirm that matches the negotiated
// payload type.
func (c *ConferencingService) rtpToSamples(packet *rtp.Packet) []float32 {
	samples := make([]float32, len(packet.Payload))
	for i, b := range packet.Payload {
		// Widen to int16 before shifting: shifting a uint8 left by 8 always
		// yields 0, which would turn every sample into silence.
		samples[i] = float32(int16(b)<<8) / 32768.0
	}
	return samples
}
// samplesToRTP packs float32 audio samples into the payload of a new RTP
// packet that carries the given header.
//
// Each sample is scaled by 32768, saturated to the signed 16-bit range, and
// reduced to its high byte, producing one payload byte per sample. This is
// the inverse of rtpToSamples.
func (c *ConferencingService) samplesToRTP(samples []float32, header rtp.Header) *rtp.Packet {
	payload := make([]byte, len(samples))
	for i, s := range samples {
		scaled := s * 32768.0

		// Saturate instead of converting blindly: a float outside the int16
		// range (s >= 1.0, or a mix that exceeds full scale) has an
		// implementation-defined conversion result and can flip sign.
		// NaN matches none of the cases and is left as 0 (silence).
		var pcm int16
		switch {
		case scaled >= 32767:
			pcm = 32767
		case scaled <= -32768:
			pcm = -32768
		case scaled > -32768 && scaled < 32767:
			pcm = int16(scaled)
		}

		payload[i] = byte(pcm >> 8)
	}
	return &rtp.Packet{
		Header:  header,
		Payload: payload,
	}
}