mirror of
https://github.com/m-bain/whisperX.git
synced 2025-07-01 18:17:27 -04:00
Merge branch 'main' into main
This commit is contained in:
@ -278,7 +278,7 @@ Bug finding and pull requests are also highly appreciated to keep this project g
|
|||||||
|
|
||||||
* [ ] Add benchmarking code (TEDLIUM for spd/WER & word segmentation)
|
* [ ] Add benchmarking code (TEDLIUM for spd/WER & word segmentation)
|
||||||
|
|
||||||
* [ ] Allow silero-vad as alternative VAD option
|
* [x] Allow silero-vad as alternative VAD option
|
||||||
|
|
||||||
* [ ] Improve diarization (word level). *Harder than first thought...*
|
* [ ] Improve diarization (word level). *Harder than first thought...*
|
||||||
|
|
||||||
@ -300,7 +300,9 @@ Borrows important alignment code from [PyTorch tutorial on forced alignment](htt
|
|||||||
And uses the wonderful pyannote VAD / Diarization https://github.com/pyannote/pyannote-audio
|
And uses the wonderful pyannote VAD / Diarization https://github.com/pyannote/pyannote-audio
|
||||||
|
|
||||||
|
|
||||||
Valuable VAD & Diarization Models from [pyannote audio](https://github.com/pyannote/pyannote-audio)
|
Valuable VAD & Diarization Models from:
|
||||||
|
- [pyannote audio][https://github.com/pyannote/pyannote-audio]
|
||||||
|
- [silero vad][https://github.com/snakers4/silero-vad]
|
||||||
|
|
||||||
Great backend from [faster-whisper](https://github.com/guillaumekln/faster-whisper) and [CTranslate2](https://github.com/OpenNMT/CTranslate2)
|
Great backend from [faster-whisper](https://github.com/guillaumekln/faster-whisper) and [CTranslate2](https://github.com/OpenNMT/CTranslate2)
|
||||||
|
|
||||||
|
@ -1,4 +1,4 @@
|
|||||||
from .transcribe import load_model
|
|
||||||
from .alignment import load_align_model, align
|
from .alignment import load_align_model, align
|
||||||
from .audio import load_audio
|
from .audio import load_audio
|
||||||
from .diarize import assign_word_speakers, DiarizationPipeline
|
from .diarize import assign_word_speakers, DiarizationPipeline
|
||||||
|
from .asr import load_model
|
||||||
|
@ -1,4 +1,4 @@
|
|||||||
""""
|
"""
|
||||||
Forced Alignment with Whisper
|
Forced Alignment with Whisper
|
||||||
C. Max Bain
|
C. Max Bain
|
||||||
"""
|
"""
|
||||||
@ -15,8 +15,13 @@ from transformers import Wav2Vec2ForCTC, Wav2Vec2Processor
|
|||||||
|
|
||||||
from .audio import SAMPLE_RATE, load_audio
|
from .audio import SAMPLE_RATE, load_audio
|
||||||
from .utils import interpolate_nans
|
from .utils import interpolate_nans
|
||||||
from .types import AlignedTranscriptionResult, SingleSegment, SingleAlignedSegment, SingleWordSegment
|
from .types import (
|
||||||
import nltk
|
AlignedTranscriptionResult,
|
||||||
|
SingleSegment,
|
||||||
|
SingleAlignedSegment,
|
||||||
|
SingleWordSegment,
|
||||||
|
SegmentData,
|
||||||
|
)
|
||||||
from nltk.tokenize.punkt import PunktSentenceTokenizer, PunktParameters
|
from nltk.tokenize.punkt import PunktSentenceTokenizer, PunktParameters
|
||||||
|
|
||||||
PUNKT_ABBREVIATIONS = ['dr', 'vs', 'mr', 'mrs', 'prof']
|
PUNKT_ABBREVIATIONS = ['dr', 'vs', 'mr', 'mrs', 'prof']
|
||||||
@ -133,6 +138,8 @@ def align(
|
|||||||
|
|
||||||
# 1. Preprocess to keep only characters in dictionary
|
# 1. Preprocess to keep only characters in dictionary
|
||||||
total_segments = len(transcript)
|
total_segments = len(transcript)
|
||||||
|
# Store temporary processing values
|
||||||
|
segment_data: dict[int, SegmentData] = {}
|
||||||
for sdx, segment in enumerate(transcript):
|
for sdx, segment in enumerate(transcript):
|
||||||
# strip spaces at beginning / end, but keep track of the amount.
|
# strip spaces at beginning / end, but keep track of the amount.
|
||||||
if print_progress:
|
if print_progress:
|
||||||
@ -184,10 +191,12 @@ def align(
|
|||||||
sentence_splitter = PunktSentenceTokenizer(punkt_param)
|
sentence_splitter = PunktSentenceTokenizer(punkt_param)
|
||||||
sentence_spans = list(sentence_splitter.span_tokenize(text))
|
sentence_spans = list(sentence_splitter.span_tokenize(text))
|
||||||
|
|
||||||
segment["clean_char"] = clean_char
|
segment_data[sdx] = {
|
||||||
segment["clean_cdx"] = clean_cdx
|
"clean_char": clean_char,
|
||||||
segment["clean_wdx"] = clean_wdx
|
"clean_cdx": clean_cdx,
|
||||||
segment["sentence_spans"] = sentence_spans
|
"clean_wdx": clean_wdx,
|
||||||
|
"sentence_spans": sentence_spans
|
||||||
|
}
|
||||||
|
|
||||||
aligned_segments: List[SingleAlignedSegment] = []
|
aligned_segments: List[SingleAlignedSegment] = []
|
||||||
|
|
||||||
@ -203,13 +212,14 @@ def align(
|
|||||||
"end": t2,
|
"end": t2,
|
||||||
"text": text,
|
"text": text,
|
||||||
"words": [],
|
"words": [],
|
||||||
|
"chars": None,
|
||||||
}
|
}
|
||||||
|
|
||||||
if return_char_alignments:
|
if return_char_alignments:
|
||||||
aligned_seg["chars"] = []
|
aligned_seg["chars"] = []
|
||||||
|
|
||||||
# check we can align
|
# check we can align
|
||||||
if len(segment["clean_char"]) == 0:
|
if len(segment_data[sdx]["clean_char"]) == 0:
|
||||||
print(f'Failed to align segment ("{segment["text"]}"): no characters in this segment found in model dictionary, resorting to original...')
|
print(f'Failed to align segment ("{segment["text"]}"): no characters in this segment found in model dictionary, resorting to original...')
|
||||||
aligned_segments.append(aligned_seg)
|
aligned_segments.append(aligned_seg)
|
||||||
continue
|
continue
|
||||||
@ -219,8 +229,8 @@ def align(
|
|||||||
aligned_segments.append(aligned_seg)
|
aligned_segments.append(aligned_seg)
|
||||||
continue
|
continue
|
||||||
|
|
||||||
text_clean = "".join(segment["clean_char"])
|
text_clean = "".join(segment_data[sdx]["clean_char"])
|
||||||
tokens = [model_dictionary.get(c, -1) for c in text_clean]
|
tokens = [model_dictionary[c] for c in text_clean]
|
||||||
|
|
||||||
f1 = int(t1 * SAMPLE_RATE)
|
f1 = int(t1 * SAMPLE_RATE)
|
||||||
f2 = int(t2 * SAMPLE_RATE)
|
f2 = int(t2 * SAMPLE_RATE)
|
||||||
@ -271,8 +281,8 @@ def align(
|
|||||||
word_idx = 0
|
word_idx = 0
|
||||||
for cdx, char in enumerate(text):
|
for cdx, char in enumerate(text):
|
||||||
start, end, score = None, None, None
|
start, end, score = None, None, None
|
||||||
if cdx in segment["clean_cdx"]:
|
if cdx in segment_data[sdx]["clean_cdx"]:
|
||||||
char_seg = char_segments[segment["clean_cdx"].index(cdx)]
|
char_seg = char_segments[segment_data[sdx]["clean_cdx"].index(cdx)]
|
||||||
start = round(char_seg.start * ratio + t1, 3)
|
start = round(char_seg.start * ratio + t1, 3)
|
||||||
end = round(char_seg.end * ratio + t1, 3)
|
end = round(char_seg.end * ratio + t1, 3)
|
||||||
score = round(char_seg.score, 3)
|
score = round(char_seg.score, 3)
|
||||||
@ -298,9 +308,9 @@ def align(
|
|||||||
aligned_subsegments = []
|
aligned_subsegments = []
|
||||||
# assign sentence_idx to each character index
|
# assign sentence_idx to each character index
|
||||||
char_segments_arr["sentence-idx"] = None
|
char_segments_arr["sentence-idx"] = None
|
||||||
for sdx, (sstart, send) in enumerate(segment["sentence_spans"]):
|
for sdx2, (sstart, send) in enumerate(segment_data[sdx]["sentence_spans"]):
|
||||||
curr_chars = char_segments_arr.loc[(char_segments_arr.index >= sstart) & (char_segments_arr.index <= send)]
|
curr_chars = char_segments_arr.loc[(char_segments_arr.index >= sstart) & (char_segments_arr.index <= send)]
|
||||||
char_segments_arr.loc[(char_segments_arr.index >= sstart) & (char_segments_arr.index <= send), "sentence-idx"] = sdx
|
char_segments_arr.loc[(char_segments_arr.index >= sstart) & (char_segments_arr.index <= send), "sentence-idx"] = sdx2
|
||||||
|
|
||||||
sentence_text = text[sstart:send]
|
sentence_text = text[sstart:send]
|
||||||
sentence_start = curr_chars["start"].min()
|
sentence_start = curr_chars["start"].min()
|
||||||
|
@ -1,6 +1,5 @@
|
|||||||
import os
|
import os
|
||||||
import warnings
|
from typing import List, Optional, Union
|
||||||
from typing import List, NamedTuple, Optional, Union
|
|
||||||
from dataclasses import replace
|
from dataclasses import replace
|
||||||
|
|
||||||
import ctranslate2
|
import ctranslate2
|
||||||
@ -14,8 +13,7 @@ from transformers.pipelines.pt_utils import PipelineIterator
|
|||||||
|
|
||||||
from .audio import N_SAMPLES, SAMPLE_RATE, load_audio, log_mel_spectrogram
|
from .audio import N_SAMPLES, SAMPLE_RATE, load_audio, log_mel_spectrogram
|
||||||
from .types import SingleSegment, TranscriptionResult
|
from .types import SingleSegment, TranscriptionResult
|
||||||
from .vad import VoiceActivitySegmentation, load_vad_model, merge_chunks
|
from .vads import Vad, Silero, Pyannote
|
||||||
|
|
||||||
|
|
||||||
def find_numeral_symbol_tokens(tokenizer):
|
def find_numeral_symbol_tokens(tokenizer):
|
||||||
numeral_symbol_tokens = []
|
numeral_symbol_tokens = []
|
||||||
@ -106,7 +104,7 @@ class FasterWhisperPipeline(Pipeline):
|
|||||||
def __init__(
|
def __init__(
|
||||||
self,
|
self,
|
||||||
model: WhisperModel,
|
model: WhisperModel,
|
||||||
vad: VoiceActivitySegmentation,
|
vad,
|
||||||
vad_params: dict,
|
vad_params: dict,
|
||||||
options: TranscriptionOptions,
|
options: TranscriptionOptions,
|
||||||
tokenizer: Optional[Tokenizer] = None,
|
tokenizer: Optional[Tokenizer] = None,
|
||||||
@ -208,7 +206,16 @@ class FasterWhisperPipeline(Pipeline):
|
|||||||
# print(f2-f1)
|
# print(f2-f1)
|
||||||
yield {'inputs': audio[f1:f2]}
|
yield {'inputs': audio[f1:f2]}
|
||||||
|
|
||||||
vad_segments = self.vad_model({"waveform": torch.from_numpy(audio).unsqueeze(0), "sample_rate": SAMPLE_RATE})
|
# Pre-process audio and merge chunks as defined by the respective VAD child class
|
||||||
|
# In case vad_model is manually assigned (see 'load_model') follow the functionality of pyannote toolkit
|
||||||
|
if issubclass(type(self.vad_model), Vad):
|
||||||
|
waveform = self.vad_model.preprocess_audio(audio)
|
||||||
|
merge_chunks = self.vad_model.merge_chunks
|
||||||
|
else:
|
||||||
|
waveform = Pyannote.preprocess_audio(audio)
|
||||||
|
merge_chunks = Pyannote.merge_chunks
|
||||||
|
|
||||||
|
vad_segments = self.vad_model({"waveform": waveform, "sample_rate": SAMPLE_RATE})
|
||||||
vad_segments = merge_chunks(
|
vad_segments = merge_chunks(
|
||||||
vad_segments,
|
vad_segments,
|
||||||
chunk_size,
|
chunk_size,
|
||||||
@ -296,7 +303,8 @@ def load_model(
|
|||||||
compute_type="float16",
|
compute_type="float16",
|
||||||
asr_options: Optional[dict] = None,
|
asr_options: Optional[dict] = None,
|
||||||
language: Optional[str] = None,
|
language: Optional[str] = None,
|
||||||
vad_model: Optional[VoiceActivitySegmentation] = None,
|
vad_model: Optional[Vad]= None,
|
||||||
|
vad_method: Optional[str] = "pyannote",
|
||||||
vad_options: Optional[dict] = None,
|
vad_options: Optional[dict] = None,
|
||||||
model: Optional[WhisperModel] = None,
|
model: Optional[WhisperModel] = None,
|
||||||
task="transcribe",
|
task="transcribe",
|
||||||
@ -309,6 +317,7 @@ def load_model(
|
|||||||
whisper_arch - The name of the Whisper model to load.
|
whisper_arch - The name of the Whisper model to load.
|
||||||
device - The device to load the model on.
|
device - The device to load the model on.
|
||||||
compute_type - The compute type to use for the model.
|
compute_type - The compute type to use for the model.
|
||||||
|
vad_method - The vad method to use. vad_model has higher priority if is not None.
|
||||||
options - A dictionary of options to use for the model.
|
options - A dictionary of options to use for the model.
|
||||||
language - The language of the model. (use English for now)
|
language - The language of the model. (use English for now)
|
||||||
model - The WhisperModel instance to use.
|
model - The WhisperModel instance to use.
|
||||||
@ -374,6 +383,7 @@ def load_model(
|
|||||||
default_asr_options = TranscriptionOptions(**default_asr_options)
|
default_asr_options = TranscriptionOptions(**default_asr_options)
|
||||||
|
|
||||||
default_vad_options = {
|
default_vad_options = {
|
||||||
|
"chunk_size": 30, # needed by silero since binarization happens before merge_chunks
|
||||||
"vad_onset": 0.500,
|
"vad_onset": 0.500,
|
||||||
"vad_offset": 0.363
|
"vad_offset": 0.363
|
||||||
}
|
}
|
||||||
@ -381,10 +391,17 @@ def load_model(
|
|||||||
if vad_options is not None:
|
if vad_options is not None:
|
||||||
default_vad_options.update(vad_options)
|
default_vad_options.update(vad_options)
|
||||||
|
|
||||||
|
# Note: manually assigned vad_model has higher priority than vad_method!
|
||||||
if vad_model is not None:
|
if vad_model is not None:
|
||||||
|
print("Use manually assigned vad_model. vad_method is ignored.")
|
||||||
vad_model = vad_model
|
vad_model = vad_model
|
||||||
else:
|
else:
|
||||||
vad_model = load_vad_model(torch.device(device), use_auth_token=None, **default_vad_options)
|
if vad_method == "silero":
|
||||||
|
vad_model = Silero(**default_vad_options)
|
||||||
|
elif vad_method == "pyannote":
|
||||||
|
vad_model = Pyannote(torch.device(device), use_auth_token=None, **default_vad_options)
|
||||||
|
else:
|
||||||
|
raise ValueError(f"Invalid vad_method: {vad_method}")
|
||||||
|
|
||||||
return FasterWhisperPipeline(
|
return FasterWhisperPipeline(
|
||||||
model=model,
|
model=model,
|
||||||
|
@ -79,7 +79,7 @@ def assign_word_speakers(
|
|||||||
|
|
||||||
|
|
||||||
class Segment:
|
class Segment:
|
||||||
def __init__(self, start, end, speaker=None):
|
def __init__(self, start:int, end:int, speaker:Optional[str]=None):
|
||||||
self.start = start
|
self.start = start
|
||||||
self.end = end
|
self.end = end
|
||||||
self.speaker = speaker
|
self.speaker = speaker
|
||||||
|
@ -46,6 +46,7 @@ def cli():
|
|||||||
parser.add_argument("--return_char_alignments", action='store_true', help="Return character-level alignments in the output json file")
|
parser.add_argument("--return_char_alignments", action='store_true', help="Return character-level alignments in the output json file")
|
||||||
|
|
||||||
# vad params
|
# vad params
|
||||||
|
parser.add_argument("--vad_method", type=str, default="pyannote", choices=["pyannote", "silero"], help="VAD method to be used")
|
||||||
parser.add_argument("--vad_onset", type=float, default=0.500, help="Onset threshold for VAD (see pyannote.audio), reduce this if speech is not being detected")
|
parser.add_argument("--vad_onset", type=float, default=0.500, help="Onset threshold for VAD (see pyannote.audio), reduce this if speech is not being detected")
|
||||||
parser.add_argument("--vad_offset", type=float, default=0.363, help="Offset threshold for VAD (see pyannote.audio), reduce this if speech is not being detected.")
|
parser.add_argument("--vad_offset", type=float, default=0.363, help="Offset threshold for VAD (see pyannote.audio), reduce this if speech is not being detected.")
|
||||||
parser.add_argument("--chunk_size", type=int, default=30, help="Chunk size for merging VAD segments. Default is 30, reduce this if the chunk is too long.")
|
parser.add_argument("--chunk_size", type=int, default=30, help="Chunk size for merging VAD segments. Default is 30, reduce this if the chunk is too long.")
|
||||||
@ -110,6 +111,7 @@ def cli():
|
|||||||
return_char_alignments: bool = args.pop("return_char_alignments")
|
return_char_alignments: bool = args.pop("return_char_alignments")
|
||||||
|
|
||||||
hf_token: str = args.pop("hf_token")
|
hf_token: str = args.pop("hf_token")
|
||||||
|
vad_method: str = args.pop("vad_method")
|
||||||
vad_onset: float = args.pop("vad_onset")
|
vad_onset: float = args.pop("vad_onset")
|
||||||
vad_offset: float = args.pop("vad_offset")
|
vad_offset: float = args.pop("vad_offset")
|
||||||
|
|
||||||
@ -175,7 +177,7 @@ def cli():
|
|||||||
results = []
|
results = []
|
||||||
tmp_results = []
|
tmp_results = []
|
||||||
# model = load_model(model_name, device=device, download_root=model_dir)
|
# model = load_model(model_name, device=device, download_root=model_dir)
|
||||||
model = load_model(model_name, device=device, device_index=device_index, download_root=model_dir, compute_type=compute_type, language=args['language'], asr_options=asr_options, vad_options={"vad_onset": vad_onset, "vad_offset": vad_offset}, task=task, threads=faster_whisper_threads)
|
model = load_model(model_name, device=device, device_index=device_index, download_root=model_dir, compute_type=compute_type, language=args['language'], asr_options=asr_options, vad_method=vad_method, vad_options={"chunk_size":chunk_size, "vad_onset": vad_onset, "vad_offset": vad_offset}, task=task, threads=faster_whisper_threads)
|
||||||
|
|
||||||
for audio_path in args.pop("audio"):
|
for audio_path in args.pop("audio"):
|
||||||
audio = load_audio(audio_path)
|
audio = load_audio(audio_path)
|
||||||
|
@ -1,4 +1,4 @@
|
|||||||
from typing import TypedDict, Optional, List
|
from typing import TypedDict, Optional, List, Tuple
|
||||||
|
|
||||||
|
|
||||||
class SingleWordSegment(TypedDict):
|
class SingleWordSegment(TypedDict):
|
||||||
@ -30,6 +30,17 @@ class SingleSegment(TypedDict):
|
|||||||
text: str
|
text: str
|
||||||
|
|
||||||
|
|
||||||
|
class SegmentData(TypedDict):
|
||||||
|
"""
|
||||||
|
Temporary processing data used during alignment.
|
||||||
|
Contains cleaned and preprocessed data for each segment.
|
||||||
|
"""
|
||||||
|
clean_char: List[str] # Cleaned characters that exist in model dictionary
|
||||||
|
clean_cdx: List[int] # Original indices of cleaned characters
|
||||||
|
clean_wdx: List[int] # Indices of words containing valid characters
|
||||||
|
sentence_spans: List[Tuple[int, int]] # Start and end indices of sentences
|
||||||
|
|
||||||
|
|
||||||
class SingleAlignedSegment(TypedDict):
|
class SingleAlignedSegment(TypedDict):
|
||||||
"""
|
"""
|
||||||
A single segment (up to multiple sentences) of a speech with word alignment.
|
A single segment (up to multiple sentences) of a speech with word alignment.
|
||||||
|
@ -241,7 +241,7 @@ class SubtitlesWriter(ResultWriter):
|
|||||||
line_count = 1
|
line_count = 1
|
||||||
# the next subtitle to yield (a list of word timings with whitespace)
|
# the next subtitle to yield (a list of word timings with whitespace)
|
||||||
subtitle: list[dict] = []
|
subtitle: list[dict] = []
|
||||||
times = []
|
times: list[tuple] = []
|
||||||
last = result["segments"][0]["start"]
|
last = result["segments"][0]["start"]
|
||||||
for segment in result["segments"]:
|
for segment in result["segments"]:
|
||||||
for i, original_timing in enumerate(segment["words"]):
|
for i, original_timing in enumerate(segment["words"]):
|
||||||
|
3
whisperx/vads/__init__.py
Normal file
3
whisperx/vads/__init__.py
Normal file
@ -0,0 +1,3 @@
|
|||||||
|
from whisperx.vads.pyannote import Pyannote
|
||||||
|
from whisperx.vads.silero import Silero
|
||||||
|
from whisperx.vads.vad import Vad
|
@ -1,19 +1,21 @@
|
|||||||
import hashlib
|
import hashlib
|
||||||
import os
|
import os
|
||||||
import urllib
|
import urllib
|
||||||
from typing import Callable, Optional, Text, Union
|
from typing import Callable, Text, Union
|
||||||
|
from typing import Optional
|
||||||
|
|
||||||
import numpy as np
|
import numpy as np
|
||||||
import pandas as pd
|
|
||||||
import torch
|
import torch
|
||||||
from pyannote.audio import Model
|
from pyannote.audio import Model
|
||||||
from pyannote.audio.core.io import AudioFile
|
from pyannote.audio.core.io import AudioFile
|
||||||
from pyannote.audio.pipelines import VoiceActivityDetection
|
from pyannote.audio.pipelines import VoiceActivityDetection
|
||||||
from pyannote.audio.pipelines.utils import PipelineModel
|
from pyannote.audio.pipelines.utils import PipelineModel
|
||||||
from pyannote.core import Annotation, Segment, SlidingWindowFeature
|
from pyannote.core import Annotation, SlidingWindowFeature
|
||||||
|
from pyannote.core import Segment
|
||||||
from tqdm import tqdm
|
from tqdm import tqdm
|
||||||
|
|
||||||
from .diarize import Segment as SegmentX
|
from whisperx.diarize import Segment as SegmentX
|
||||||
|
from whisperx.vads.vad import Vad
|
||||||
|
|
||||||
# deprecated
|
# deprecated
|
||||||
VAD_SEGMENTATION_URL = "https://whisperx.s3.eu-west-2.amazonaws.com/model_weights/segmentation/0b5b3216d60a2d32fc086b47ea8c67589aaeb26b7e07fcbe620d6d0b83e209ea/pytorch_model.bin"
|
VAD_SEGMENTATION_URL = "https://whisperx.s3.eu-west-2.amazonaws.com/model_weights/segmentation/0b5b3216d60a2d32fc086b47ea8c67589aaeb26b7e07fcbe620d6d0b83e209ea/pytorch_model.bin"
|
||||||
@ -21,12 +23,12 @@ VAD_SEGMENTATION_URL = "https://whisperx.s3.eu-west-2.amazonaws.com/model_weight
|
|||||||
def load_vad_model(device, vad_onset=0.500, vad_offset=0.363, use_auth_token=None, model_fp=None):
|
def load_vad_model(device, vad_onset=0.500, vad_offset=0.363, use_auth_token=None, model_fp=None):
|
||||||
model_dir = torch.hub._get_torch_home()
|
model_dir = torch.hub._get_torch_home()
|
||||||
|
|
||||||
vad_dir = os.path.dirname(os.path.abspath(__file__))
|
main_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
|
||||||
|
|
||||||
os.makedirs(model_dir, exist_ok = True)
|
os.makedirs(model_dir, exist_ok = True)
|
||||||
if model_fp is None:
|
if model_fp is None:
|
||||||
# Dynamically resolve the path to the model file
|
# Dynamically resolve the path to the model file
|
||||||
model_fp = os.path.join(vad_dir, "assets", "pytorch_model.bin")
|
model_fp = os.path.join(main_dir, "assets", "pytorch_model.bin")
|
||||||
model_fp = os.path.abspath(model_fp) # Ensure the path is absolute
|
model_fp = os.path.abspath(model_fp) # Ensure the path is absolute
|
||||||
else:
|
else:
|
||||||
model_fp = os.path.abspath(model_fp) # Ensure any provided path is absolute
|
model_fp = os.path.abspath(model_fp) # Ensure any provided path is absolute
|
||||||
@ -236,41 +238,26 @@ class VoiceActivitySegmentation(VoiceActivityDetection):
|
|||||||
return segmentations
|
return segmentations
|
||||||
|
|
||||||
|
|
||||||
def merge_vad(vad_arr, pad_onset=0.0, pad_offset=0.0, min_duration_off=0.0, min_duration_on=0.0):
|
class Pyannote(Vad):
|
||||||
|
|
||||||
active = Annotation()
|
def __init__(self, device, use_auth_token=None, model_fp=None, **kwargs):
|
||||||
for k, vad_t in enumerate(vad_arr):
|
print(">>Performing voice activity detection using Pyannote...")
|
||||||
region = Segment(vad_t[0] - pad_onset, vad_t[1] + pad_offset)
|
super().__init__(kwargs['vad_onset'])
|
||||||
active[region, k] = 1
|
self.vad_pipeline = load_vad_model(device, use_auth_token=use_auth_token, model_fp=model_fp)
|
||||||
|
|
||||||
|
def __call__(self, audio: AudioFile, **kwargs):
|
||||||
|
return self.vad_pipeline(audio)
|
||||||
|
|
||||||
if pad_offset > 0.0 or pad_onset > 0.0 or min_duration_off > 0.0:
|
@staticmethod
|
||||||
active = active.support(collar=min_duration_off)
|
def preprocess_audio(audio):
|
||||||
|
return torch.from_numpy(audio).unsqueeze(0)
|
||||||
|
|
||||||
# remove tracks shorter than min_duration_on
|
@staticmethod
|
||||||
if min_duration_on > 0:
|
def merge_chunks(segments,
|
||||||
for segment, track in list(active.itertracks()):
|
|
||||||
if segment.duration < min_duration_on:
|
|
||||||
del active[segment, track]
|
|
||||||
|
|
||||||
active = active.for_json()
|
|
||||||
active_segs = pd.DataFrame([x['segment'] for x in active['content']])
|
|
||||||
return active_segs
|
|
||||||
|
|
||||||
def merge_chunks(
|
|
||||||
segments,
|
|
||||||
chunk_size,
|
chunk_size,
|
||||||
onset: float = 0.5,
|
onset: float = 0.5,
|
||||||
offset: Optional[float] = None,
|
offset: Optional[float] = None,
|
||||||
):
|
):
|
||||||
"""
|
|
||||||
Merge operation described in paper
|
|
||||||
"""
|
|
||||||
curr_end = 0
|
|
||||||
merged_segments = []
|
|
||||||
seg_idxs = []
|
|
||||||
speaker_idxs = []
|
|
||||||
|
|
||||||
assert chunk_size > 0
|
assert chunk_size > 0
|
||||||
binarize = Binarize(max_duration=chunk_size, onset=onset, offset=offset)
|
binarize = Binarize(max_duration=chunk_size, onset=onset, offset=offset)
|
||||||
segments = binarize(segments)
|
segments = binarize(segments)
|
||||||
@ -281,27 +268,5 @@ def merge_chunks(
|
|||||||
if len(segments_list) == 0:
|
if len(segments_list) == 0:
|
||||||
print("No active speech found in audio")
|
print("No active speech found in audio")
|
||||||
return []
|
return []
|
||||||
# assert segments_list, "segments_list is empty."
|
assert segments_list, "segments_list is empty."
|
||||||
# Make sur the starting point is the start of the segment.
|
return Vad.merge_chunks(segments_list, chunk_size, onset, offset)
|
||||||
curr_start = segments_list[0].start
|
|
||||||
|
|
||||||
for seg in segments_list:
|
|
||||||
if seg.end - curr_start > chunk_size and curr_end-curr_start > 0:
|
|
||||||
merged_segments.append({
|
|
||||||
"start": curr_start,
|
|
||||||
"end": curr_end,
|
|
||||||
"segments": seg_idxs,
|
|
||||||
})
|
|
||||||
curr_start = seg.start
|
|
||||||
seg_idxs = []
|
|
||||||
speaker_idxs = []
|
|
||||||
curr_end = seg.end
|
|
||||||
seg_idxs.append((seg.start, seg.end))
|
|
||||||
speaker_idxs.append(seg.speaker)
|
|
||||||
# add final
|
|
||||||
merged_segments.append({
|
|
||||||
"start": curr_start,
|
|
||||||
"end": curr_end,
|
|
||||||
"segments": seg_idxs,
|
|
||||||
})
|
|
||||||
return merged_segments
|
|
62
whisperx/vads/silero.py
Normal file
62
whisperx/vads/silero.py
Normal file
@ -0,0 +1,62 @@
|
|||||||
|
from io import IOBase
|
||||||
|
from pathlib import Path
|
||||||
|
from typing import Mapping, Text
|
||||||
|
from typing import Optional
|
||||||
|
from typing import Union
|
||||||
|
|
||||||
|
import torch
|
||||||
|
|
||||||
|
from whisperx.diarize import Segment as SegmentX
|
||||||
|
from whisperx.vads.vad import Vad
|
||||||
|
|
||||||
|
AudioFile = Union[Text, Path, IOBase, Mapping]
|
||||||
|
|
||||||
|
|
||||||
|
class Silero(Vad):
|
||||||
|
# check again default values
|
||||||
|
def __init__(self, **kwargs):
|
||||||
|
print(">>Performing voice activity detection using Silero...")
|
||||||
|
super().__init__(kwargs['vad_onset'])
|
||||||
|
|
||||||
|
self.vad_onset = kwargs['vad_onset']
|
||||||
|
self.chunk_size = kwargs['chunk_size']
|
||||||
|
self.vad_pipeline, vad_utils = torch.hub.load(repo_or_dir='snakers4/silero-vad',
|
||||||
|
model='silero_vad',
|
||||||
|
force_reload=False,
|
||||||
|
onnx=False,
|
||||||
|
trust_repo=True)
|
||||||
|
(self.get_speech_timestamps, _, self.read_audio, _, _) = vad_utils
|
||||||
|
|
||||||
|
def __call__(self, audio: AudioFile, **kwargs):
|
||||||
|
"""use silero to get segments of speech"""
|
||||||
|
# Only accept 16000 Hz for now.
|
||||||
|
# Note: Silero models support both 8000 and 16000 Hz. Although other values are not directly supported,
|
||||||
|
# multiples of 16000 (e.g. 32000 or 48000) are cast to 16000 inside of the JIT model!
|
||||||
|
sample_rate = audio["sample_rate"]
|
||||||
|
if sample_rate != 16000:
|
||||||
|
raise ValueError("Only 16000Hz sample rate is allowed")
|
||||||
|
|
||||||
|
timestamps = self.get_speech_timestamps(audio["waveform"],
|
||||||
|
model=self.vad_pipeline,
|
||||||
|
sampling_rate=sample_rate,
|
||||||
|
max_speech_duration_s=self.chunk_size,
|
||||||
|
threshold=self.vad_onset
|
||||||
|
# min_silence_duration_ms = self.min_duration_off/1000
|
||||||
|
# min_speech_duration_ms = self.min_duration_on/1000
|
||||||
|
# ...
|
||||||
|
# See silero documentation for full option list
|
||||||
|
)
|
||||||
|
return [SegmentX(i['start'] / sample_rate, i['end'] / sample_rate, "UNKNOWN") for i in timestamps]
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def preprocess_audio(audio):
|
||||||
|
return audio
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def merge_chunks(segments,
|
||||||
|
chunk_size,
|
||||||
|
onset: float = 0.5,
|
||||||
|
offset: Optional[float] = None,
|
||||||
|
):
|
||||||
|
assert chunk_size > 0
|
||||||
|
return Vad.merge_chunks(segments, chunk_size, onset, offset)
|
74
whisperx/vads/vad.py
Normal file
74
whisperx/vads/vad.py
Normal file
@ -0,0 +1,74 @@
|
|||||||
|
from typing import Optional
|
||||||
|
|
||||||
|
import pandas as pd
|
||||||
|
from pyannote.core import Annotation, Segment
|
||||||
|
|
||||||
|
|
||||||
|
class Vad:
|
||||||
|
def __init__(self, vad_onset):
|
||||||
|
if not (0 < vad_onset < 1):
|
||||||
|
raise ValueError(
|
||||||
|
"vad_onset is a decimal value between 0 and 1."
|
||||||
|
)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def preprocess_audio(audio):
|
||||||
|
pass
|
||||||
|
|
||||||
|
# keep merge_chunks as static so it can be also used by manually assigned vad_model (see 'load_model')
|
||||||
|
@staticmethod
|
||||||
|
def merge_chunks(segments,
|
||||||
|
chunk_size,
|
||||||
|
onset: float,
|
||||||
|
offset: Optional[float]):
|
||||||
|
"""
|
||||||
|
Merge operation described in paper
|
||||||
|
"""
|
||||||
|
curr_end = 0
|
||||||
|
merged_segments = []
|
||||||
|
seg_idxs: list[tuple]= []
|
||||||
|
speaker_idxs: list[Optional[str]] = []
|
||||||
|
|
||||||
|
curr_start = segments[0].start
|
||||||
|
for seg in segments:
|
||||||
|
if seg.end - curr_start > chunk_size and curr_end - curr_start > 0:
|
||||||
|
merged_segments.append({
|
||||||
|
"start": curr_start,
|
||||||
|
"end": curr_end,
|
||||||
|
"segments": seg_idxs,
|
||||||
|
})
|
||||||
|
curr_start = seg.start
|
||||||
|
seg_idxs = []
|
||||||
|
speaker_idxs = []
|
||||||
|
curr_end = seg.end
|
||||||
|
seg_idxs.append((seg.start, seg.end))
|
||||||
|
speaker_idxs.append(seg.speaker)
|
||||||
|
# add final
|
||||||
|
merged_segments.append({
|
||||||
|
"start": curr_start,
|
||||||
|
"end": curr_end,
|
||||||
|
"segments": seg_idxs,
|
||||||
|
})
|
||||||
|
|
||||||
|
return merged_segments
|
||||||
|
|
||||||
|
# Unused function
|
||||||
|
@staticmethod
|
||||||
|
def merge_vad(vad_arr, pad_onset=0.0, pad_offset=0.0, min_duration_off=0.0, min_duration_on=0.0):
|
||||||
|
active = Annotation()
|
||||||
|
for k, vad_t in enumerate(vad_arr):
|
||||||
|
region = Segment(vad_t[0] - pad_onset, vad_t[1] + pad_offset)
|
||||||
|
active[region, k] = 1
|
||||||
|
|
||||||
|
if pad_offset > 0.0 or pad_onset > 0.0 or min_duration_off > 0.0:
|
||||||
|
active = active.support(collar=min_duration_off)
|
||||||
|
|
||||||
|
# remove tracks shorter than min_duration_on
|
||||||
|
if min_duration_on > 0:
|
||||||
|
for segment, track in list(active.itertracks()):
|
||||||
|
if segment.duration < min_duration_on:
|
||||||
|
del active[segment, track]
|
||||||
|
|
||||||
|
active = active.for_json()
|
||||||
|
active_segs = pd.DataFrame([x['segment'] for x in active['content']])
|
||||||
|
return active_segs
|
Reference in New Issue
Block a user