import warnings from typing import TYPE_CHECKING, Optional, Tuple, Union import numpy as np import torch import tqdm import ffmpeg from whisper.audio import ( FRAMES_PER_SECOND, HOP_LENGTH, N_FRAMES, N_SAMPLES, SAMPLE_RATE, CHUNK_LENGTH, log_mel_spectrogram, pad_or_trim, load_audio ) from whisper.decoding import DecodingOptions, DecodingResult from whisper.timing import add_word_timestamps from whisper.tokenizer import LANGUAGES, TO_LANGUAGE_CODE, get_tokenizer from whisper.utils import ( exact_div, format_timestamp, make_safe, ) if TYPE_CHECKING: from whisper.model import Whisper from .vad import merge_chunks def transcribe( model: "Whisper", audio: Union[str, np.ndarray, torch.Tensor] = None, mel: np.ndarray = None, verbose: Optional[bool] = None, temperature: Union[float, Tuple[float, ...]] = (0.0, 0.2, 0.4, 0.6, 0.8, 1.0), compression_ratio_threshold: Optional[float] = 2.4, logprob_threshold: Optional[float] = -1.0, no_speech_threshold: Optional[float] = 0.6, condition_on_previous_text: bool = True, initial_prompt: Optional[str] = None, word_timestamps: bool = False, prepend_punctuations: str = "\"'“¿([{-", append_punctuations: str = "\"'.。,,!!??::”)]}、", **decode_options, ): """ Transcribe an audio file using Whisper. We redefine the Whisper transcribe function to allow mel input (for sequential slicing of audio) Parameters ---------- model: Whisper The Whisper model instance audio: Union[str, np.ndarray, torch.Tensor] The path to the audio file to open, or the audio waveform mel: np.ndarray Mel spectrogram of audio segment. verbose: bool Whether to display the text being decoded to the console. If True, displays all the details, If False, displays minimal details. If None, does not display anything temperature: Union[float, Tuple[float, ...]] Temperature for sampling. It can be a tuple of temperatures, which will be successively used upon failures according to either `compression_ratio_threshold` or `logprob_threshold`. compression_ratio_threshold: float If the gzip compression ratio is above this value, treat as failed logprob_threshold: float If the average log probability over sampled tokens is below this value, treat as failed no_speech_threshold: float If the no_speech probability is higher than this value AND the average log probability over sampled tokens is below `logprob_threshold`, consider the segment as silent condition_on_previous_text: bool if True, the previous output of the model is provided as a prompt for the next window; disabling may make the text inconsistent across windows, but the model becomes less prone to getting stuck in a failure loop, such as repetition looping or timestamps going out of sync. word_timestamps: bool Extract word-level timestamps using the cross-attention pattern and dynamic time warping, and include the timestamps for each word in each segment. prepend_punctuations: str If word_timestamps is True, merge these punctuation symbols with the next word append_punctuations: str If word_timestamps is True, merge these punctuation symbols with the previous word initial_prompt: Optional[str] Optional text to provide as a prompt for the first window. This can be used to provide, or "prompt-engineer" a context for transcription, e.g. custom vocabularies or proper nouns to make it more likely to predict those word correctly. decode_options: dict Keyword arguments to construct `DecodingOptions` instances Returns ------- A dictionary containing the resulting text ("text") and segment-level details ("segments"), and the spoken language ("language"), which is detected when `decode_options["language"]` is None. """ dtype = torch.float16 if decode_options.get("fp16", True) else torch.float32 if model.device == torch.device("cpu"): if torch.cuda.is_available(): warnings.warn("Performing inference on CPU when CUDA is available") if dtype == torch.float16: warnings.warn("FP16 is not supported on CPU; using FP32 instead") dtype = torch.float32 if dtype == torch.float32: decode_options["fp16"] = False # Pad 30-seconds of silence to the input audio, for slicing if mel is None: if audio is None: raise ValueError("Transcribe needs either audio or mel as input, currently both are none.") mel = log_mel_spectrogram(audio, padding=N_SAMPLES) content_frames = mel.shape[-1] - N_FRAMES if decode_options.get("language", None) is None: if not model.is_multilingual: decode_options["language"] = "en" else: if verbose: print( "Detecting language using up to the first 30 seconds. Use `--language` to specify the language" ) mel_segment = pad_or_trim(mel, N_FRAMES).to(model.device).to(dtype) _, probs = model.detect_language(mel_segment) decode_options["language"] = max(probs, key=probs.get) if verbose is not None: print( f"Detected language: {LANGUAGES[decode_options['language']].title()}" ) language: str = decode_options["language"] task: str = decode_options.get("task", "transcribe") tokenizer = get_tokenizer(model.is_multilingual, language=language, task=task) if word_timestamps and task == "translate": warnings.warn("Word-level timestamps on translations may not be reliable.") def decode_with_fallback(segment: torch.Tensor) -> DecodingResult: temperatures = ( [temperature] if isinstance(temperature, (int, float)) else temperature ) decode_result = None for t in temperatures: kwargs = {**decode_options} if t > 0: # disable beam_size and patience when t > 0 kwargs.pop("beam_size", None) kwargs.pop("patience", None) else: # disable best_of when t == 0 kwargs.pop("best_of", None) options = DecodingOptions(**kwargs, temperature=t) decode_result = model.decode(segment, options) needs_fallback = False if ( compression_ratio_threshold is not None and decode_result.compression_ratio > compression_ratio_threshold ): needs_fallback = True # too repetitive if ( logprob_threshold is not None and decode_result.avg_logprob < logprob_threshold ): needs_fallback = True # average log probability is too low if not needs_fallback: break return decode_result seek = 0 input_stride = exact_div( N_FRAMES, model.dims.n_audio_ctx ) # mel frames per output token: 2 time_precision = ( input_stride * HOP_LENGTH / SAMPLE_RATE ) # time per output token: 0.02 (seconds) all_tokens = [] all_segments = [] prompt_reset_since = 0 if initial_prompt is not None: initial_prompt_tokens = tokenizer.encode(" " + initial_prompt.strip()) all_tokens.extend(initial_prompt_tokens) else: initial_prompt_tokens = [] def new_segment( *, start: float, end: float, tokens: torch.Tensor, result: DecodingResult ): tokens = tokens.tolist() text_tokens = [token for token in tokens if token < tokenizer.eot] return { "seek": seek, "start": start, "end": end, "text": tokenizer.decode(text_tokens), "tokens": tokens, "temperature": result.temperature, "avg_logprob": result.avg_logprob, "compression_ratio": result.compression_ratio, "no_speech_prob": result.no_speech_prob, } # show the progress bar when verbose is False (if True, transcribed text will be printed) with tqdm.tqdm( total=content_frames, unit="frames", disable=verbose is not False ) as pbar: while seek < content_frames: time_offset = float(seek * HOP_LENGTH / SAMPLE_RATE) mel_segment = mel[:, seek : seek + N_FRAMES] segment_size = min(N_FRAMES, content_frames - seek) segment_duration = segment_size * HOP_LENGTH / SAMPLE_RATE mel_segment = pad_or_trim(mel_segment, N_FRAMES).to(model.device).to(dtype) decode_options["prompt"] = all_tokens[prompt_reset_since:] result: DecodingResult = decode_with_fallback(mel_segment) tokens = torch.tensor(result.tokens) if no_speech_threshold is not None: # no voice activity check should_skip = result.no_speech_prob > no_speech_threshold if ( logprob_threshold is not None and result.avg_logprob > logprob_threshold ): # don't skip if the logprob is high enough, despite the no_speech_prob should_skip = False if should_skip: seek += segment_size # fast-forward to the next segment boundary continue previous_seek = seek current_segments = [] timestamp_tokens: torch.Tensor = tokens.ge(tokenizer.timestamp_begin) single_timestamp_ending = timestamp_tokens[-2:].tolist() == [False, True] consecutive = torch.where(timestamp_tokens[:-1] & timestamp_tokens[1:])[0] consecutive.add_(1) if len(consecutive) > 0: # if the output contains two consecutive timestamp tokens slices = consecutive.tolist() if single_timestamp_ending: slices.append(len(tokens)) last_slice = 0 for current_slice in slices: sliced_tokens = tokens[last_slice:current_slice] start_timestamp_pos = ( sliced_tokens[0].item() - tokenizer.timestamp_begin ) end_timestamp_pos = ( sliced_tokens[-1].item() - tokenizer.timestamp_begin ) # clamp end-time to at least be 1 frame after start-time end_timestamp_pos = max(end_timestamp_pos, start_timestamp_pos + time_precision) current_segments.append( new_segment( start=time_offset + start_timestamp_pos * time_precision, end=time_offset + end_timestamp_pos * time_precision, tokens=sliced_tokens, result=result, ) ) last_slice = current_slice if single_timestamp_ending: # single timestamp at the end means no speech after the last timestamp. seek += segment_size else: # otherwise, ignore the unfinished segment and seek to the last timestamp last_timestamp_pos = ( tokens[last_slice - 1].item() - tokenizer.timestamp_begin ) seek += last_timestamp_pos * input_stride else: duration = segment_duration timestamps = tokens[timestamp_tokens.nonzero().flatten()] if ( len(timestamps) > 0 and timestamps[-1].item() != tokenizer.timestamp_begin ): # no consecutive timestamps but it has a timestamp; use the last one. last_timestamp_pos = ( timestamps[-1].item() - tokenizer.timestamp_begin ) duration = last_timestamp_pos * time_precision current_segments.append( new_segment( start=time_offset, end=time_offset + duration, tokens=tokens, result=result, ) ) seek += segment_size if not condition_on_previous_text or result.temperature > 0.5: # do not feed the prompt tokens if a high temperature was used prompt_reset_since = len(all_tokens) if word_timestamps: add_word_timestamps( segments=current_segments, model=model, tokenizer=tokenizer, mel=mel_segment, num_frames=segment_size, prepend_punctuations=prepend_punctuations, append_punctuations=append_punctuations, ) word_end_timestamps = [ w["end"] for s in current_segments for w in s["words"] ] if not single_timestamp_ending and len(word_end_timestamps) > 0: seek_shift = round( (word_end_timestamps[-1] - time_offset) * FRAMES_PER_SECOND ) if seek_shift > 0: seek = previous_seek + seek_shift if verbose: for segment in current_segments: start, end, text = segment["start"], segment["end"], segment["text"] line = f"[{format_timestamp(start)} --> {format_timestamp(end)}] {text}" print(make_safe(line)) # if a segment is instantaneous or does not contain text, clear it for i, segment in enumerate(current_segments): if segment["start"] == segment["end"] or segment["text"].strip() == "": segment["text"] = "" segment["tokens"] = [] segment["words"] = [] all_segments.extend( [ {"id": i, **segment} for i, segment in enumerate( current_segments, start=len(all_segments) ) ] ) all_tokens.extend( [token for segment in current_segments for token in segment["tokens"]] ) # update progress bar pbar.update(min(content_frames, seek) - previous_seek) return dict( text=tokenizer.decode(all_tokens[len(initial_prompt_tokens) :]), segments=all_segments, language=language, ) def transcribe_with_vad( model: "Whisper", audio: str, vad_pipeline, mel = None, verbose: Optional[bool] = None, **kwargs ): """ Transcribe per VAD segment """ vad_segments = vad_pipeline(audio) # if not torch.is_tensor(audio): # if isinstance(audio, str): audio = load_audio(audio) audio = torch.from_numpy(audio) prev = 0 output = {"segments": []} # merge segments to approx 30s inputs to make whisper most appropraite vad_segments = merge_chunks(vad_segments, chunk_size=CHUNK_LENGTH) if len(vad_segments) == 0: return output print(">>Performing transcription...") for sdx, seg_t in enumerate(vad_segments): if verbose: print(f"~~ Transcribing VAD chunk: ({format_timestamp(seg_t['start'])} --> {format_timestamp(seg_t['end'])}) ~~") seg_f_start, seg_f_end = int(seg_t["start"] * SAMPLE_RATE), int(seg_t["end"] * SAMPLE_RATE) local_f_start, local_f_end = seg_f_start - prev, seg_f_end - prev audio = audio[local_f_start:] # seek forward seg_audio = audio[:local_f_end-local_f_start] # seek forward prev = seg_f_start local_mel = log_mel_spectrogram(seg_audio, padding=N_SAMPLES) # need to pad result = transcribe(model, audio, mel=local_mel, verbose=verbose, **kwargs) seg_t["text"] = result["text"] output["segments"].append( { "start": seg_t["start"], "end": seg_t["end"], "language": result["language"], "text": result["text"], "seg-text": [x["text"] for x in result["segments"]], "seg-start": [x["start"] for x in result["segments"]], "seg-end": [x["end"] for x in result["segments"]], } ) output["language"] = output["segments"][0]["language"] return output