Parallel execution of Benchmark #124

Merged: 6 commits, Mar 10, 2023
62 changes: 43 additions & 19 deletions README.md
@@ -118,7 +118,7 @@ from diart.sinks import RTTMWriter
 
 pipeline = OnlineSpeakerDiarization()
 mic = MicrophoneAudioSource(pipeline.config.sample_rate)
-inference = RealTimeInference(pipeline, mic, do_plot=True)
+inference = RealTimeInference(pipeline, mic)
 inference.attach_observers(RTTMWriter(mic.uri, "/output/file.rttm"))
 prediction = inference()
 ```
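Note the API change here: plotting is no longer a `do_plot` flag on `RealTimeInference`; results are instead routed to observers such as `RTTMWriter` via `attach_observers`. Assuming diart's sinks follow the usual RxPY `on_next`/`on_error`/`on_completed` protocol (an assumption based on its reactive design, not something this diff states), a custom sink might look like:

```python
# Hypothetical custom sink; assumes the RxPY-style observer protocol
# that sinks like RTTMWriter appear to implement.
class PrintSink:
    def on_next(self, prediction):
        print(prediction)  # handle each incremental result

    def on_error(self, error):
        print(f"error: {error}")

    def on_completed(self):
        print("stream finished")

# inference.attach_observers(PrintSink())
```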
@@ -127,28 +127,45 @@ For inference and evaluation on a dataset we recommend using `Benchmark` (see n
 
 ## Custom models
 
-Third-party models can be integrated seamlessly by subclassing `SegmentationModel` and `EmbeddingModel`:
+Third-party models can be integrated seamlessly by subclassing `SegmentationModel` and `EmbeddingModel` (which are PyTorch `Module` subclasses):
 
 ```python
-import torch
-from typing import Optional
 from diart import OnlineSpeakerDiarization, PipelineConfig
-from diart.models import EmbeddingModel
+from diart.models import EmbeddingModel, SegmentationModel
 from diart.sources import MicrophoneAudioSource
 from diart.inference import RealTimeInference
 
+
+def model_loader():
+    return load_pretrained_model("my_model.ckpt")
+
+
+class MySegmentationModel(SegmentationModel):
+    def __init__(self):
+        super().__init__(model_loader)
+
+    @property
+    def sample_rate(self) -> int:
+        return 16000
+
+    @property
+    def duration(self) -> float:
+        return 2  # seconds
+
+    def forward(self, waveform):
+        # self.model is created lazily
+        return self.model(waveform)
+
+
 class MyEmbeddingModel(EmbeddingModel):
     def __init__(self):
-        super().__init__()
-        self.my_pretrained_model = load("my_model.ckpt")
+        super().__init__(model_loader)
 
-    def __call__(
-        self,
-        waveform: torch.Tensor,
-        weights: Optional[torch.Tensor] = None
-    ) -> torch.Tensor:
-        return self.my_pretrained_model(waveform, weights)
+    def forward(self, waveform, weights):
+        # self.model is created lazily
+        return self.model(waveform, weights)
 
 
 config = PipelineConfig(embedding=MyEmbeddingModel())
 pipeline = OnlineSpeakerDiarization(config)
 mic = MicrophoneAudioSource(config.sample_rate)
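The `model_loader` indirection above is what enables lazy instantiation: the subclass hands the base class a zero-argument callable rather than a loaded model, and `self.model` is only materialized on first use, which is convenient when models must be created inside worker processes rather than pickled into them. A minimal sketch of the pattern (the names `LazyModel` and `_loader` are illustrative, not diart's actual internals):

```python
from typing import Callable, Optional

import torch

# Illustrative sketch of lazy loading; diart's real base classes
# may differ in detail.
class LazyModel:
    def __init__(self, loader: Callable[[], torch.nn.Module]):
        self._loader = loader  # zero-argument callable that builds the model
        self._model: Optional[torch.nn.Module] = None

    @property
    def model(self) -> torch.nn.Module:
        if self._model is None:  # load on first access, e.g. inside a worker
            self._model = self._loader()
        return self._model
```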
@@ -225,7 +242,7 @@ from diart.blocks import SpeakerSegmentation, OverlapAwareSpeakerEmbedding
 
 segmentation = SpeakerSegmentation.from_pyannote("pyannote/segmentation")
 embedding = OverlapAwareSpeakerEmbedding.from_pyannote("pyannote/embedding")
-sample_rate = segmentation.model.get_sample_rate()
+sample_rate = segmentation.model.sample_rate
 mic = MicrophoneAudioSource(sample_rate)
 
 stream = mic.stream.pipe(
@@ -318,22 +335,29 @@ diart.benchmark /wav/dir --reference /rttm/dir --tau=0.555 --rho=0.422 --delta=1
 or using the inference API:
 
 ```python
-from diart.inference import Benchmark
+from diart.inference import Benchmark, Parallelize
 from diart import OnlineSpeakerDiarization, PipelineConfig
 from diart.models import SegmentationModel
 
+benchmark = Benchmark("/wav/dir", "/rttm/dir")
+
+name = "pyannote/segmentation@Interspeech2021"
+segmentation = SegmentationModel.from_pyannote(name)
 config = PipelineConfig(
     # Set the model used in the paper
-    segmentation=SegmentationModel.from_pyannote("pyannote/segmentation@Interspeech2021"),
+    segmentation=segmentation,
     step=0.5,
     latency=0.5,
     tau_active=0.555,
     rho_update=0.422,
     delta_new=1.517
 )
-pipeline = OnlineSpeakerDiarization(config)
-benchmark = Benchmark("/wav/dir", "/rttm/dir")
-benchmark(pipeline)
+benchmark(OnlineSpeakerDiarization, config)
+
+# Run the same benchmark in parallel
+p_benchmark = Parallelize(benchmark, num_workers=4)
+if __name__ == "__main__":  # Needed for multiprocessing
+    p_benchmark(OnlineSpeakerDiarization, config)
 ```
 
 This pre-calculates model outputs in batches, so it runs a lot faster.
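Two details in the snippet above deserve a note. The `if __name__ == "__main__"` guard is required because Python's multiprocessing may re-import the main module in each worker; without it, the parallel benchmark would launch itself recursively. As for pre-calculating model outputs in batches, the idea can be pictured with a short sketch (illustrative only, not diart's actual implementation):

```python
import torch

# Illustrative: one batched forward pass over many chunks instead of
# one pass per streamed chunk; outputs are then replayed in order.
def precompute_outputs(model, chunks, batch_size=32):
    outputs = []
    with torch.no_grad():
        for i in range(0, len(chunks), batch_size):
            batch = torch.stack(chunks[i:i + batch_size])
            outputs.extend(model(batch))
    return outputs
```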
1 change: 1 addition & 0 deletions requirements.txt
@@ -14,3 +14,4 @@ pyannote.database>=4.1.1
 pyannote.metrics>=3.2
 optuna>=2.10
 websockets>=10.3
+rich>=12.5.1
1 change: 1 addition & 0 deletions setup.cfg
@@ -36,6 +36,7 @@ install_requires=
     pyannote.metrics>=3.2
     optuna>=2.10
     websockets>=10.3
+    rich>=12.5.1
 
 [options.packages.find]
 where=src
1 change: 1 addition & 0 deletions src/diart/argdoc.py
@@ -10,5 +10,6 @@
 MAX_SPEAKERS = "Maximum number of speakers"
 CPU = "Force models to run on CPU"
 BATCH_SIZE = "For segmentation and embedding pre-calculation. If BATCH_SIZE < 2, run fully online and estimate real-time latency"
+NUM_WORKERS = "Number of parallel workers"
 OUTPUT = "Directory to store the system's output in RTTM format"
 HF_TOKEN = "Huggingface authentication token for hosted models ('true' | 'false' | <token>). If 'true', it will use the token from huggingface-cli login"
21 changes: 9 additions & 12 deletions src/diart/benchmark.py
@@ -3,11 +3,8 @@
 
 import diart.argdoc as argdoc
 import pandas as pd
-import torch
 from diart import utils
 from diart.blocks import OnlineSpeakerDiarization, PipelineConfig
-from diart.inference import Benchmark
-from diart.models import SegmentationModel, EmbeddingModel
+from diart.inference import Benchmark, Parallelize
 
 
 def run():
@@ -28,18 +25,14 @@ def run():
     parser.add_argument("--beta", default=10, type=float, help=f"{argdoc.BETA}. Defaults to 10")
     parser.add_argument("--max-speakers", default=20, type=int, help=f"{argdoc.MAX_SPEAKERS}. Defaults to 20")
     parser.add_argument("--batch-size", default=32, type=int, help=f"{argdoc.BATCH_SIZE}. Defaults to 32")
+    parser.add_argument("--num-workers", default=0, type=int,
+                        help=f"{argdoc.NUM_WORKERS}. Defaults to 0 (no parallelism)")
     parser.add_argument("--cpu", dest="cpu", action="store_true",
                         help=f"{argdoc.CPU}. Defaults to GPU if available, CPU otherwise")
     parser.add_argument("--output", type=Path, help=f"{argdoc.OUTPUT}. Defaults to no writing")
     parser.add_argument("--hf-token", default="true", type=str,
                         help=f"{argdoc.HF_TOKEN}. Defaults to 'true' (required by pyannote)")
     args = parser.parse_args()
-    args.device = torch.device("cpu") if args.cpu else None
-    args.hf_token = utils.parse_hf_token_arg(args.hf_token)
-
-    # Download pyannote models (or get from cache)
-    args.segmentation = SegmentationModel.from_pyannote(args.segmentation, args.hf_token)
-    args.embedding = EmbeddingModel.from_pyannote(args.embedding, args.hf_token)
 
     benchmark = Benchmark(
         args.root,
@@ -50,8 +43,12 @@ def run():
         batch_size=args.batch_size,
     )
 
-    pipeline = OnlineSpeakerDiarization(PipelineConfig.from_namespace(args))
-    report = benchmark(pipeline)
+    config = PipelineConfig.from_dict(vars(args))
+    if args.num_workers > 0:
+        benchmark = Parallelize(benchmark, args.num_workers)
+
+    report = benchmark(OnlineSpeakerDiarization, config)
+
     if args.output is not None and isinstance(report, pd.DataFrame):
         report.to_csv(args.output / "benchmark_report.csv")
 
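With the `--num-workers` flag added above, parallel benchmarking is also reachable from the `diart.benchmark` CLI shown earlier, e.g. `diart.benchmark /wav/dir --reference /rttm/dir --num-workers 4` (paths are placeholders).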
8 changes: 2 additions & 6 deletions src/diart/blocks/__init__.py
@@ -13,10 +13,6 @@
     OverlapAwareSpeakerEmbedding,
 )
 from .segmentation import SpeakerSegmentation
-from .diarization import (
-    OnlineSpeakerDiarization,
-    BasePipeline,
-    PipelineConfig,
-    BasePipelineConfig,
-)
+from .diarization import OnlineSpeakerDiarization, BasePipeline
+from .config import BasePipelineConfig, PipelineConfig
 from .utils import Binarize, Resample, AdjustVolume
142 changes: 142 additions & 0 deletions src/diart/blocks/config.py
@@ -0,0 +1,142 @@
from typing import Any, Optional, Union

import torch
from typing_extensions import Literal

from .. import models as m
from .. import utils


class BasePipelineConfig:
    @property
    def duration(self) -> float:
        raise NotImplementedError

    @property
    def step(self) -> float:
        raise NotImplementedError

    @property
    def latency(self) -> float:
        raise NotImplementedError

    @property
    def sample_rate(self) -> int:
        raise NotImplementedError

    @staticmethod
    def from_dict(data: Any) -> 'BasePipelineConfig':
        raise NotImplementedError


class PipelineConfig(BasePipelineConfig):
    def __init__(
        self,
        segmentation: Optional[m.SegmentationModel] = None,
        embedding: Optional[m.EmbeddingModel] = None,
        duration: Optional[float] = None,
        step: float = 0.5,
        latency: Optional[Union[float, Literal["max", "min"]]] = None,
        tau_active: float = 0.6,
        rho_update: float = 0.3,
        delta_new: float = 1,
        gamma: float = 3,
        beta: float = 10,
        max_speakers: int = 20,
        device: Optional[torch.device] = None,
        **kwargs,
    ):
        # Default segmentation model is pyannote/segmentation
        self.segmentation = segmentation
        if self.segmentation is None:
            self.segmentation = m.SegmentationModel.from_pyannote("pyannote/segmentation")

        # Default duration is the one given by the segmentation model
        self._duration = duration

        # Expected sample rate is given by the segmentation model
        self._sample_rate: Optional[int] = None

        # Default embedding model is pyannote/embedding
        self.embedding = embedding
        if self.embedding is None:
            self.embedding = m.EmbeddingModel.from_pyannote("pyannote/embedding")

        # Latency defaults to the step duration
        self._step = step
        self._latency = latency
        if self._latency is None or self._latency == "min":
            self._latency = self._step
        elif self._latency == "max":
            self._latency = self._duration

        self.tau_active = tau_active
        self.rho_update = rho_update
        self.delta_new = delta_new
        self.gamma = gamma
        self.beta = beta
        self.max_speakers = max_speakers

        self.device = device
        if self.device is None:
            self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    @staticmethod
    def from_dict(data: Any) -> 'PipelineConfig':
        # Check for explicit device, otherwise check for 'cpu' bool, otherwise pass None
        device = utils.get(data, "device", None)
        if device is None:
            device = torch.device("cpu") if utils.get(data, "cpu", False) else None

        # Instantiate models
        hf_token = utils.parse_hf_token_arg(utils.get(data, "hf_token", True))
        segmentation = utils.get(data, "segmentation", "pyannote/segmentation")
        segmentation = m.SegmentationModel.from_pyannote(segmentation, hf_token)
        embedding = utils.get(data, "embedding", "pyannote/embedding")
        embedding = m.EmbeddingModel.from_pyannote(embedding, hf_token)

        # Hyper-parameters and their aliases
        tau = utils.get(data, "tau_active", None)
        if tau is None:
            tau = utils.get(data, "tau", 0.6)
        rho = utils.get(data, "rho_update", None)
        if rho is None:
            rho = utils.get(data, "rho", 0.3)
        delta = utils.get(data, "delta_new", None)
        if delta is None:
            delta = utils.get(data, "delta", 1)

        return PipelineConfig(
            segmentation=segmentation,
            embedding=embedding,
            duration=utils.get(data, "duration", None),
            step=utils.get(data, "step", 0.5),
            latency=utils.get(data, "latency", None),
            tau_active=tau,
            rho_update=rho,
            delta_new=delta,
            gamma=utils.get(data, "gamma", 3),
            beta=utils.get(data, "beta", 10),
            max_speakers=utils.get(data, "max_speakers", 20),
            device=device,
        )

    @property
    def duration(self) -> float:
        if self._duration is None:
            self._duration = self.segmentation.duration
        return self._duration

    @property
    def step(self) -> float:
        return self._step

    @property
    def latency(self) -> float:
        return self._latency

    @property
    def sample_rate(self) -> int:
        if self._sample_rate is None:
            self._sample_rate = self.segmentation.sample_rate
        return self._sample_rate
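Since `benchmark.py` above builds its configuration with `PipelineConfig.from_dict(vars(args))`, the same entry point accepts a plain dictionary, which makes programmatic configuration straightforward. A small usage sketch, reusing the hyper-parameter values from the README's benchmark example (the alias keys `tau`, `rho`, `delta` and the `cpu` flag are resolved by `from_dict` as shown above):

```python
from diart import OnlineSpeakerDiarization, PipelineConfig

config = PipelineConfig.from_dict({
    "step": 0.5,
    "latency": 0.5,
    "tau": 0.555,    # alias for tau_active
    "rho": 0.422,    # alias for rho_update
    "delta": 1.517,  # alias for delta_new
    "cpu": True,     # force torch.device("cpu")
})
pipeline = OnlineSpeakerDiarization(config)
```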