# Mirror of https://github.com/m-bain/whisperX.git
# Synced 2025-07-01 18:17:27 -04:00
# 429 lines, 16 KiB, Python
import warnings
|
||
from typing import TYPE_CHECKING, Optional, Tuple, Union
|
||
import numpy as np
|
||
import torch
|
||
import tqdm
|
||
import ffmpeg
|
||
from whisper.audio import (
|
||
FRAMES_PER_SECOND,
|
||
HOP_LENGTH,
|
||
N_FRAMES,
|
||
N_SAMPLES,
|
||
SAMPLE_RATE,
|
||
CHUNK_LENGTH,
|
||
log_mel_spectrogram,
|
||
pad_or_trim,
|
||
load_audio
|
||
)
|
||
from whisper.decoding import DecodingOptions, DecodingResult
|
||
from whisper.timing import add_word_timestamps
|
||
from whisper.tokenizer import LANGUAGES, TO_LANGUAGE_CODE, get_tokenizer
|
||
from whisper.utils import (
|
||
exact_div,
|
||
format_timestamp,
|
||
make_safe,
|
||
)
|
||
|
||
if TYPE_CHECKING:
|
||
from whisper.model import Whisper
|
||
|
||
from .vad import merge_chunks
|
||
|
||
def transcribe(
    model: "Whisper",
    audio: Optional[Union[str, np.ndarray, torch.Tensor]] = None,
    mel: Optional[torch.Tensor] = None,
    verbose: Optional[bool] = None,
    temperature: Union[float, Tuple[float, ...]] = (0.0, 0.2, 0.4, 0.6, 0.8, 1.0),
    compression_ratio_threshold: Optional[float] = 2.4,
    logprob_threshold: Optional[float] = -1.0,
    no_speech_threshold: Optional[float] = 0.6,
    condition_on_previous_text: bool = True,
    initial_prompt: Optional[str] = None,
    word_timestamps: bool = False,
    prepend_punctuations: str = "\"'“¿([{-",
    append_punctuations: str = "\"'.。,,!!??::”)]}、",
    **decode_options,
):
    """
    Transcribe an audio file using Whisper.
    We redefine the Whisper transcribe function to allow mel input (for sequential slicing of audio)

    Parameters
    ----------
    model: Whisper
        The Whisper model instance

    audio: Union[str, np.ndarray, torch.Tensor]
        The path to the audio file to open, or the audio waveform.
        Only used when `mel` is None.

    mel: torch.Tensor
        Mel spectrogram of audio segment. When given, `audio` is ignored.
        The number of content frames is computed as `mel.shape[-1] - N_FRAMES`,
        so a caller-supplied spectrogram should carry the same trailing padding
        as `log_mel_spectrogram(audio, padding=N_SAMPLES)`. It must support
        `.to(device)`, i.e. be a tensor rather than a plain numpy array.

    verbose: bool
        Whether to display the text being decoded to the console. If True, displays all the details,
        If False, displays minimal details. If None, does not display anything

    temperature: Union[float, Tuple[float, ...]]
        Temperature for sampling. It can be a tuple of temperatures, which will be successively used
        upon failures according to either `compression_ratio_threshold` or `logprob_threshold`.

    compression_ratio_threshold: float
        If the gzip compression ratio is above this value, treat as failed

    logprob_threshold: float
        If the average log probability over sampled tokens is below this value, treat as failed

    no_speech_threshold: float
        If the no_speech probability is higher than this value AND the average log probability
        over sampled tokens is below `logprob_threshold`, consider the segment as silent

    condition_on_previous_text: bool
        if True, the previous output of the model is provided as a prompt for the next window;
        disabling may make the text inconsistent across windows, but the model becomes less prone to
        getting stuck in a failure loop, such as repetition looping or timestamps going out of sync.

    word_timestamps: bool
        Extract word-level timestamps using the cross-attention pattern and dynamic time warping,
        and include the timestamps for each word in each segment.

    prepend_punctuations: str
        If word_timestamps is True, merge these punctuation symbols with the next word

    append_punctuations: str
        If word_timestamps is True, merge these punctuation symbols with the previous word

    initial_prompt: Optional[str]
        Optional text to provide as a prompt for the first window. This can be used to provide, or
        "prompt-engineer" a context for transcription, e.g. custom vocabularies or proper nouns
        to make it more likely to predict those words correctly.

    decode_options: dict
        Keyword arguments to construct `DecodingOptions` instances

    Returns
    -------
    A dictionary containing the resulting text ("text") and segment-level details ("segments"), and
    the spoken language ("language"), which is detected when `decode_options["language"]` is None.

    Raises
    ------
    ValueError
        If both `audio` and `mel` are None.
    """
    dtype = torch.float16 if decode_options.get("fp16", True) else torch.float32
    if model.device == torch.device("cpu"):
        if torch.cuda.is_available():
            warnings.warn("Performing inference on CPU when CUDA is available")
        if dtype == torch.float16:
            warnings.warn("FP16 is not supported on CPU; using FP32 instead")
            dtype = torch.float32

    if dtype == torch.float32:
        decode_options["fp16"] = False

    # Pad 30-seconds of silence to the input audio, for slicing
    if mel is None:
        if audio is None:
            raise ValueError("Transcribe needs either audio or mel as input, currently both are none.")
        mel = log_mel_spectrogram(audio, padding=N_SAMPLES)
    # the trailing N_FRAMES are padding, not content
    content_frames = mel.shape[-1] - N_FRAMES

    if decode_options.get("language", None) is None:
        if not model.is_multilingual:
            decode_options["language"] = "en"
        else:
            if verbose:
                print(
                    "Detecting language using up to the first 30 seconds. Use `--language` to specify the language"
                )
            mel_segment = pad_or_trim(mel, N_FRAMES).to(model.device).to(dtype)
            _, probs = model.detect_language(mel_segment)
            decode_options["language"] = max(probs, key=probs.get)
            if verbose is not None:
                print(
                    f"Detected language: {LANGUAGES[decode_options['language']].title()}"
                )

    language: str = decode_options["language"]
    task: str = decode_options.get("task", "transcribe")
    tokenizer = get_tokenizer(model.is_multilingual, language=language, task=task)

    if word_timestamps and task == "translate":
        warnings.warn("Word-level timestamps on translations may not be reliable.")

    def decode_with_fallback(segment: torch.Tensor) -> DecodingResult:
        """Decode one window, retrying at the next temperature while quality checks fail."""
        temperatures = (
            [temperature] if isinstance(temperature, (int, float)) else temperature
        )
        decode_result = None

        for t in temperatures:
            kwargs = {**decode_options}
            if t > 0:
                # disable beam_size and patience when t > 0
                kwargs.pop("beam_size", None)
                kwargs.pop("patience", None)
            else:
                # disable best_of when t == 0
                kwargs.pop("best_of", None)

            options = DecodingOptions(**kwargs, temperature=t)
            decode_result = model.decode(segment, options)

            needs_fallback = False
            if (
                compression_ratio_threshold is not None
                and decode_result.compression_ratio > compression_ratio_threshold
            ):
                needs_fallback = True  # too repetitive
            if (
                logprob_threshold is not None
                and decode_result.avg_logprob < logprob_threshold
            ):
                needs_fallback = True  # average log probability is too low

            if not needs_fallback:
                break

        return decode_result

    seek = 0
    input_stride = exact_div(
        N_FRAMES, model.dims.n_audio_ctx
    )  # mel frames per output token: 2
    time_precision = (
        input_stride * HOP_LENGTH / SAMPLE_RATE
    )  # time per output token: 0.02 (seconds)
    all_tokens = []
    all_segments = []
    prompt_reset_since = 0

    if initial_prompt is not None:
        initial_prompt_tokens = tokenizer.encode(" " + initial_prompt.strip())
        all_tokens.extend(initial_prompt_tokens)
    else:
        initial_prompt_tokens = []

    def new_segment(
        *, start: float, end: float, tokens: torch.Tensor, result: DecodingResult
    ):
        """Build the segment dict for a token slice; `seek` is read at call time."""
        tokens = tokens.tolist()
        # tokens at or above `eot` are special/timestamp tokens, not text
        text_tokens = [token for token in tokens if token < tokenizer.eot]
        return {
            "seek": seek,
            "start": start,
            "end": end,
            "text": tokenizer.decode(text_tokens),
            "tokens": tokens,
            "temperature": result.temperature,
            "avg_logprob": result.avg_logprob,
            "compression_ratio": result.compression_ratio,
            "no_speech_prob": result.no_speech_prob,
        }

    # show the progress bar when verbose is False (if True, transcribed text will be printed)
    with tqdm.tqdm(
        total=content_frames, unit="frames", disable=verbose is not False
    ) as pbar:
        while seek < content_frames:
            time_offset = float(seek * HOP_LENGTH / SAMPLE_RATE)
            mel_segment = mel[:, seek : seek + N_FRAMES]
            segment_size = min(N_FRAMES, content_frames - seek)
            segment_duration = segment_size * HOP_LENGTH / SAMPLE_RATE
            mel_segment = pad_or_trim(mel_segment, N_FRAMES).to(model.device).to(dtype)

            decode_options["prompt"] = all_tokens[prompt_reset_since:]
            result: DecodingResult = decode_with_fallback(mel_segment)
            tokens = torch.tensor(result.tokens)

            if no_speech_threshold is not None:
                # no voice activity check
                should_skip = result.no_speech_prob > no_speech_threshold
                if (
                    logprob_threshold is not None
                    and result.avg_logprob > logprob_threshold
                ):
                    # don't skip if the logprob is high enough, despite the no_speech_prob
                    should_skip = False

                if should_skip:
                    seek += segment_size  # fast-forward to the next segment boundary
                    # keep the progress bar in step with the skipped frames;
                    # otherwise silent windows are never counted
                    pbar.update(segment_size)
                    continue

            previous_seek = seek
            current_segments = []

            timestamp_tokens: torch.Tensor = tokens.ge(tokenizer.timestamp_begin)
            single_timestamp_ending = timestamp_tokens[-2:].tolist() == [False, True]

            # indices of the second token in every pair of adjacent timestamp tokens
            consecutive = torch.where(timestamp_tokens[:-1] & timestamp_tokens[1:])[0]
            consecutive.add_(1)
            if len(consecutive) > 0:
                # if the output contains two consecutive timestamp tokens
                slices = consecutive.tolist()
                if single_timestamp_ending:
                    slices.append(len(tokens))

                last_slice = 0
                for current_slice in slices:
                    sliced_tokens = tokens[last_slice:current_slice]
                    start_timestamp_pos = (
                        sliced_tokens[0].item() - tokenizer.timestamp_begin
                    )
                    end_timestamp_pos = (
                        sliced_tokens[-1].item() - tokenizer.timestamp_begin
                    )
                    current_segments.append(
                        new_segment(
                            start=time_offset + start_timestamp_pos * time_precision,
                            end=time_offset + end_timestamp_pos * time_precision,
                            tokens=sliced_tokens,
                            result=result,
                        )
                    )
                    last_slice = current_slice

                if single_timestamp_ending:
                    # single timestamp at the end means no speech after the last timestamp.
                    seek += segment_size
                else:
                    # otherwise, ignore the unfinished segment and seek to the last timestamp
                    last_timestamp_pos = (
                        tokens[last_slice - 1].item() - tokenizer.timestamp_begin
                    )
                    seek += last_timestamp_pos * input_stride
            else:
                duration = segment_duration
                timestamps = tokens[timestamp_tokens.nonzero().flatten()]
                if (
                    len(timestamps) > 0
                    and timestamps[-1].item() != tokenizer.timestamp_begin
                ):
                    # no consecutive timestamps but it has a timestamp; use the last one.
                    last_timestamp_pos = (
                        timestamps[-1].item() - tokenizer.timestamp_begin
                    )
                    duration = last_timestamp_pos * time_precision

                current_segments.append(
                    new_segment(
                        start=time_offset,
                        end=time_offset + duration,
                        tokens=tokens,
                        result=result,
                    )
                )
                seek += segment_size

            if not condition_on_previous_text or result.temperature > 0.5:
                # do not feed the prompt tokens if a high temperature was used
                prompt_reset_since = len(all_tokens)

            if word_timestamps:
                add_word_timestamps(
                    segments=current_segments,
                    model=model,
                    tokenizer=tokenizer,
                    mel=mel_segment,
                    num_frames=segment_size,
                    prepend_punctuations=prepend_punctuations,
                    append_punctuations=append_punctuations,
                )
                word_end_timestamps = [
                    w["end"] for s in current_segments for w in s["words"]
                ]
                if not single_timestamp_ending and len(word_end_timestamps) > 0:
                    # re-anchor the next window on the end of the last aligned word
                    seek_shift = round(
                        (word_end_timestamps[-1] - time_offset) * FRAMES_PER_SECOND
                    )
                    if seek_shift > 0:
                        seek = previous_seek + seek_shift

            if verbose:
                for segment in current_segments:
                    start, end, text = segment["start"], segment["end"], segment["text"]
                    line = f"[{format_timestamp(start)} --> {format_timestamp(end)}] {text}"
                    print(make_safe(line))

            # if a segment is instantaneous or does not contain text, clear it
            for segment in current_segments:
                if segment["start"] == segment["end"] or segment["text"].strip() == "":
                    segment["text"] = ""
                    segment["tokens"] = []
                    segment["words"] = []

            all_segments.extend(
                [
                    {"id": i, **segment}
                    for i, segment in enumerate(
                        current_segments, start=len(all_segments)
                    )
                ]
            )
            all_tokens.extend(
                [token for segment in current_segments for token in segment["tokens"]]
            )

            # update progress bar
            pbar.update(min(content_frames, seek) - previous_seek)

    return dict(
        # the initial prompt is context only and is excluded from the returned text
        text=tokenizer.decode(all_tokens[len(initial_prompt_tokens) :]),
        segments=all_segments,
        language=language,
    )
|
||
|
||
|
||
def transcribe_with_vad(
    model: "Whisper",
    audio: str,
    vad_pipeline,
    mel=None,
    verbose: Optional[bool] = None,
    **kwargs
):
    """
    Transcribe per VAD segment

    Parameters
    ----------
    model: Whisper
        The Whisper model instance

    audio: str
        Path to the audio file; it is passed to `vad_pipeline` and to `load_audio`.

    vad_pipeline
        Callable invoked as `vad_pipeline(audio)`; its result is handed to
        `merge_chunks` to build the chunks to transcribe.

    mel
        Unused; kept so existing callers passing it keep working.

    verbose: bool
        Forwarded to `transcribe`; when truthy, each chunk's time range is also printed.

    kwargs: dict
        Extra keyword arguments forwarded unchanged to `transcribe`.

    Returns
    -------
    A dictionary with a "segments" list (one entry per merged VAD chunk, holding the
    chunk "start"/"end", its "language" and "text", and the per-segment "seg-text",
    "seg-start" and "seg-end" lists reported by `transcribe` for that chunk) and a
    top-level "language" taken from the first chunk. When no chunks are produced,
    only `{"segments": []}` is returned (no "language" key).
    """
    vad_segments = vad_pipeline(audio)

    waveform = torch.from_numpy(load_audio(audio))

    output = {"segments": []}

    # merge segments to approx 30s inputs to make whisper most appropriate
    vad_segments = merge_chunks(vad_segments, chunk_size=CHUNK_LENGTH)
    if len(vad_segments) == 0:
        return output

    print(">>Performing transcription...")
    for seg_t in vad_segments:
        if verbose:
            print(f"~~ Transcribing VAD chunk: ({format_timestamp(seg_t['start'])} --> {format_timestamp(seg_t['end'])}) ~~")

        # chunk boundaries in samples (start/end are multiplied by SAMPLE_RATE,
        # so they are presumably expressed in seconds)
        seg_f_start = int(seg_t["start"] * SAMPLE_RATE)
        seg_f_end = int(seg_t["end"] * SAMPLE_RATE)

        # index the full waveform directly so the slice does not depend on the
        # chunks being visited in increasing start order
        seg_audio = waveform[seg_f_start:seg_f_end]

        # pad so `transcribe` can slice full-length windows out of the spectrogram
        local_mel = log_mel_spectrogram(seg_audio, padding=N_SAMPLES)

        # `transcribe` works from `mel` when it is given; the waveform is passed
        # only to satisfy its signature
        result = transcribe(model, seg_audio, mel=local_mel, verbose=verbose, **kwargs)
        seg_t["text"] = result["text"]
        output["segments"].append(
            {
                "start": seg_t["start"],
                "end": seg_t["end"],
                "language": result["language"],
                "text": result["text"],
                "seg-text": [x["text"] for x in result["segments"]],
                "seg-start": [x["start"] for x in result["segments"]],
                "seg-end": [x["end"] for x in result["segments"]],
            }
        )

    output["language"] = output["segments"][0]["language"]

    return output