Skip to content

Commit

Permalink
feat: add initial codes written by claude sonnet
Browse files Browse the repository at this point in the history
  • Loading branch information
itzmanish committed Sep 15, 2024
1 parent 277c58b commit 4d3237b
Show file tree
Hide file tree
Showing 18 changed files with 1,207 additions and 0 deletions.
13 changes: 13 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
/astscale
/dialog
/goproxy
/go.work*
/gophone
/soundplayer
/.vscode
/g711
/pbx
/rtp_session*
/rtp_playout_buffer*
/MEDIA_QUALITY.md
/asterisk-sounds-wav
62 changes: 62 additions & 0 deletions cmd/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package main

import (
"context"
"os"
"os/signal"
"syscall"
"time"

"net/http"

"github.com/itzmanish/sipnexus"
"github.com/itzmanish/sipnexus/pkg/logger"
"github.com/prometheus/client_golang/prometheus/promhttp"
)

func main() {
// Initialize logger
log := logger.NewLogger()

// Create a new SIP server
sipServer, err := sipnexus.NewServer(log)
if err != nil {
log.Fatal("Failed to create SIP server: " + err.Error())
}

// Start Prometheus metrics server
http.Handle("/metrics", promhttp.Handler())
go http.ListenAndServe(":8080", nil)

// Start the SIP server
go func() {
if err := sipServer.Start(); err != nil {
log.Fatal("Failed to start SIP server: " + err.Error())
}
}()

// Initialize and connect components
sipServer.mediaGateway.SetTranscodingService(sipServer.transcodingService)

// Start DTMF handler
sipServer.dtmfHandler.Start()

// Wait for interrupt signal to gracefully shutdown the server
quit := make(chan os.Signal, 1)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
<-quit

log.Info("Shutting down server...")

// Create a deadline to wait for
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

// Doesn't block if no connections, but will otherwise wait
// until the timeout deadline.
if err := sipServer.Shutdown(ctx); err != nil {
log.Error("Server forced to shutdown: " + err.Error())
}

log.Info("Server exiting")
}
156 changes: 156 additions & 0 deletions conferencing.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
package sipnexus

import (
"fmt"
"sync"

muxgo "github.com/itzmanish/audio-multiplexer-go"
"github.com/itzmanish/sipnexus/pkg/logger"
"github.com/pion/rtp"
)

type ConferenceRoom struct {
ID string
Participants map[string]*Participant
Mixer *muxgo.Mixer
mu sync.RWMutex
}

type Participant struct {
ID string
Stream chan *rtp.Packet
}

type ConferencingService struct {
logger logger.Logger
rooms map[string]*ConferenceRoom
mu sync.RWMutex
}

func NewConferencingService(logger logger.Logger) *ConferencingService {
return &ConferencingService{
logger: logger,
rooms: make(map[string]*ConferenceRoom),
}
}

func (c *ConferencingService) CreateRoom(roomID string) error {
c.mu.Lock()
defer c.mu.Unlock()

if _, exists := c.rooms[roomID]; exists {
return fmt.Errorf("room %s already exists", roomID)
}

mixer := muxgo.NewMixer(8000, 20) // Assuming 8kHz sample rate and 20ms frame size
c.rooms[roomID] = &ConferenceRoom{
ID: roomID,
Participants: make(map[string]*Participant),
Mixer: mixer,
}

c.logger.Info(fmt.Sprintf("Created conference room: %s", roomID))
return nil
}

func (c *ConferencingService) JoinRoom(roomID, participantID string) (chan *rtp.Packet, error) {
c.mu.Lock()
defer c.mu.Unlock()

room, exists := c.rooms[roomID]
if !exists {
return nil, fmt.Errorf("room %s does not exist", roomID)
}

if _, exists := room.Participants[participantID]; exists {
return nil, fmt.Errorf("participant %s already in room %s", participantID, roomID)
}

stream := make(chan *rtp.Packet, 100)
room.Participants[participantID] = &Participant{
ID: participantID,
Stream: stream,
}

room.Mixer.AddSource(participantID)

c.logger.Info(fmt.Sprintf("Participant %s joined room %s", participantID, roomID))
return stream, nil
}

func (c *ConferencingService) LeaveRoom(roomID, participantID string) error {
c.mu.Lock()
defer c.mu.Unlock()

room, exists := c.rooms[roomID]
if !exists {
return fmt.Errorf("room %s does not exist", roomID)
}

if _, exists := room.Participants[participantID]; !exists {
return fmt.Errorf("participant %s not in room %s", participantID, roomID)
}

close(room.Participants[participantID].Stream)
delete(room.Participants, participantID)
room.Mixer.RemoveSource(participantID)

c.logger.Info(fmt.Sprintf("Participant %s left room %s", participantID, roomID))
return nil
}

func (c *ConferencingService) ProcessRTPPacket(roomID string, participantID string, packet *rtp.Packet) {
c.mu.RLock()
room, exists := c.rooms[roomID]
c.mu.RUnlock()

if !exists {
c.logger.Warn("Attempt to process RTP packet for non-existent room: " + roomID)
return
}

room.mu.Lock()
defer room.mu.Unlock()

// Convert RTP packet payload to audio samples
samples := c.rtpToSamples(packet)

// Add samples to the mixer
room.Mixer.AddSamples(participantID, samples)

// Mix the audio
mixedSamples := room.Mixer.Mix()

// Convert mixed samples back to RTP packet
mixedPacket := c.samplesToRTP(mixedSamples, packet.Header)

// Send the mixed audio to all participants except the sender
for id, participant := range room.Participants {
if id != participantID {
select {
case participant.Stream <- mixedPacket:
default:
c.logger.Warn("Participant buffer full, dropping packet: " + id)
}
}
}
}

func (c *ConferencingService) rtpToSamples(packet *rtp.Packet) []float32 {
samples := make([]float32, len(packet.Payload))
for i, b := range packet.Payload {
samples[i] = float32(int16(b<<8)) / 32768.0
}
return samples
}

func (c *ConferencingService) samplesToRTP(samples []float32, header rtp.Header) *rtp.Packet {
payload := make([]byte, len(samples))
for i, s := range samples {
payload[i] = byte(int16(s*32768.0) >> 8)
}
return &rtp.Packet{
Header: header,
Payload: payload,
}
}
62 changes: 62 additions & 0 deletions consistent_hash.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package sipnexus

import (
"fmt"
"hash/fnv"
"sort"
"sync"
)

type ConsistentHash struct {
circle map[uint32]string
sortedHashes []uint32
virtualNodes int
mu sync.RWMutex
}

func NewConsistentHash(virtualNodes int) *ConsistentHash {
return &ConsistentHash{
circle: make(map[uint32]string),
virtualNodes: virtualNodes,
}
}

func (c *ConsistentHash) Add(node string) {
c.mu.Lock()
defer c.mu.Unlock()

for i := 0; i < c.virtualNodes; i++ {
hash := c.hash(fmt.Sprintf("%s:%d", node, i))
c.circle[hash] = node
c.sortedHashes = append(c.sortedHashes, hash)
}
sort.Slice(c.sortedHashes, func(i, j int) bool {
return c.sortedHashes[i] < c.sortedHashes[j]
})
}

func (c *ConsistentHash) Get(key string) string {
c.mu.RLock()
defer c.mu.RUnlock()

if len(c.circle) == 0 {
return ""
}

hash := c.hash(key)
idx := sort.Search(len(c.sortedHashes), func(i int) bool {
return c.sortedHashes[i] >= hash
})

if idx == len(c.sortedHashes) {
idx = 0
}

return c.circle[c.sortedHashes[idx]]
}

func (c *ConsistentHash) hash(key string) uint32 {
h := fnv.New32a()
h.Write([]byte(key))
return h.Sum32()
}
Loading

0 comments on commit 4d3237b

Please sign in to comment.