add .ass output

Author: Max Bain
Date: 2022-12-17 17:24:48 +00:00
Parent: 938341c05a
Commit: 645d55903a
8 changed files with 462 additions and 42 deletions

whisperx/transcribe.py

@@ -12,7 +12,7 @@ from .audio import SAMPLE_RATE, N_FRAMES, HOP_LENGTH, pad_or_trim, log_mel_spect
 from .alignment import get_trellis, backtrack, merge_repeats, merge_words
 from .decoding import DecodingOptions, DecodingResult
 from .tokenizer import LANGUAGES, TO_LANGUAGE_CODE, get_tokenizer
-from .utils import exact_div, format_timestamp, optional_int, optional_float, str2bool, write_txt, write_vtt, write_srt
+from .utils import exact_div, format_timestamp, optional_int, optional_float, str2bool, write_txt, write_vtt, write_srt, write_ass
 if TYPE_CHECKING:
     from .model import Whisper
@@ -269,7 +269,6 @@ def align(
     MAX_DURATION = audio.shape[1] / SAMPLE_RATE
     prev_t2 = 0
-    word_level = []
     for idx, segment in enumerate(transcript):
         t1 = max(segment['start'] - extend_duration, 0)
         t2 = min(segment['end'] + extend_duration, MAX_DURATION)
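The window arithmetic above widens each Whisper segment by `extend_duration` seconds on both sides, clipped to the bounds of the file. A worked toy example (hypothetical segment times, 60-second file):

```python
# Toy numbers only: a 60 s file at Whisper's fixed 16 kHz sample rate.
SAMPLE_RATE = 16000
MAX_DURATION = (60 * SAMPLE_RATE) / SAMPLE_RATE  # 60.0 seconds
extend_duration = 2.0

segment = {"start": 0.5, "end": 59.0}
t1 = max(segment["start"] - extend_duration, 0)           # 0.0, clipped at file start
t2 = min(segment["end"] + extend_duration, MAX_DURATION)  # 60.0, clipped at file end
print(t1, t2)  # 0.0 60.0
```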
@@ -287,10 +286,11 @@ def align(
         transcription = segment['text'].strip()
         t_words = transcription.split(' ')
-        t_words_clean = [re.sub(r"[^a-zA-Z' ]", "", x) for x in t_words]
+        t_words_clean = [''.join([w for w in word if w.upper() in model_dictionary.keys()]) for word in t_words]
         t_words_nonempty = [x for x in t_words_clean if x != ""]
         t_words_nonempty_idx = [x for x in range(len(t_words_clean)) if t_words_clean[x] != ""]
+        segment['word-level'] = []
         if len(t_words_nonempty) > 0:
             transcription_cleaned = "|".join(t_words_nonempty).upper()
             tokens = [model_dictionary[c] for c in transcription_cleaned]
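The replaced cleaning step changes strategy: rather than a regex whitelist, each word is now filtered character-by-character against the alignment model's dictionary, so only characters the wav2vec2 model can actually emit survive, and anything unalignable (digits, stray punctuation) collapses to an empty string and is skipped. A toy illustration with a hypothetical, much-reduced dictionary:

```python
# Hypothetical toy dictionary; the real model_dictionary maps the
# wav2vec2 model's characters (A-Z, ', |, ...) to token ids.
model_dictionary = {"A": 7, "B": 24, "C": 19, "S": 11, "'": 22, "|": 4}

t_words = ["cab!", "2022", "a's"]
t_words_clean = [''.join([w for w in word if w.upper() in model_dictionary.keys()])
                 for word in t_words]
t_words_nonempty = [x for x in t_words_clean if x != ""]
print(t_words_clean)     # ['cab', '', "a's"]  -> '2022' has no alignable characters
print(t_words_nonempty)  # ['cab', "a's"]
```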
@@ -315,27 +315,25 @@ def align(
             segment['end'] = t2_actual
             prev_t2 = segment['end']
             # merge missing words to previous, or merge with next word ahead if x == 0
             for x in range(len(t_local)):
                 curr_word = t_words[x]
                 curr_timestamp = t_local[x]
                 if curr_timestamp is not None:
-                    word_level.append({"text": curr_word, "start": curr_timestamp[0], "end": curr_timestamp[1]})
+                    segment['word-level'].append({"text": curr_word, "start": curr_timestamp[0], "end": curr_timestamp[1]})
                 else:
                     if x == 0:
                         t_words[x+1] = " ".join([curr_word, t_words[x+1]])
                     else:
-                        word_level[-1]['text'] += ' ' + curr_word
+                        segment['word-level'].append({"text": curr_word, "start": None, "end": None})
         else:
             # then we resort back to original whisper timestamps
             # segment['start'] and segment['end'] are unchanged
             prev_t2 = 0
-            word_level.append({"text": segment['text'], "start": segment['start'], "end":segment['end']})
+            segment['word-level'].append({"text": segment['text'], "start": segment['start'], "end":segment['end']})
         print(f"[{format_timestamp(segment['start'])} --> {format_timestamp(segment['end'])}] {segment['text']}")
-    return {"segments": transcript}, {"segments": word_level}
+    return {"segments": transcript}

 def cli():
     from . import available_models
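The merge loop now nests word timings under each segment instead of a flat `word_level` list: aligned words carry real timestamps, a missing first word is prepended to its successor, and any other missing word is kept with `None` timestamps. A self-contained toy walk-through of the new rule:

```python
# Toy data (hypothetical timestamps): '2022' gets no alignment because
# the cleaning step above reduced it to an empty string.
t_words = ["2022", "was", "great"]
t_local = [None, (0.2, 0.5), (0.6, 1.1)]
segment = {"word-level": []}

for x in range(len(t_local)):
    curr_word = t_words[x]
    curr_timestamp = t_local[x]
    if curr_timestamp is not None:
        segment["word-level"].append({"text": curr_word, "start": curr_timestamp[0], "end": curr_timestamp[1]})
    else:
        if x == 0:
            # missing first word: fold it into the next word before emitting
            t_words[x + 1] = " ".join([curr_word, t_words[x + 1]])
        else:
            segment["word-level"].append({"text": curr_word, "start": None, "end": None})

print(segment["word-level"])
# [{'text': '2022 was', 'start': 0.2, 'end': 0.5},
#  {'text': 'great', 'start': 0.6, 'end': 1.1}]
```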
@@ -347,11 +345,9 @@ def cli():
     parser.add_argument("--device", default="cuda" if torch.cuda.is_available() else "cpu", help="device to use for PyTorch inference")
     # alignment params
     parser.add_argument("--align_model", default="WAV2VEC2_ASR_LARGE_LV60K_960H", help="Name of phoneme-level ASR model to do alignment")
-    parser.add_argument("--align_extend", default=1, type=float, help="Seconds before and after to extend the whisper segments for alignment")
+    parser.add_argument("--align_extend", default=2, type=float, help="Seconds before and after to extend the whisper segments for alignment")
     parser.add_argument("--align_from_prev", default=True, type=bool, help="Whether to clip the alignment start time of current segment to the end time of the last aligned word of the previous segment")
-    # parser.add_argument("--align_interpolate_missing", default=True, type=bool, help="Whether to interpolate the timestamp of words not tokenized by the align model, e.g. integers")
     parser.add_argument("--output_dir", "-o", type=str, default=".", help="directory to save the outputs")
     parser.add_argument("--output_type", default="srt", choices=['all', 'srt', 'vtt', 'txt'], help="type of output file to save")
@@ -417,7 +413,7 @@ def cli():
     for audio_path in args.pop("audio"):
         result = transcribe(model, audio_path, temperature=temperature, **args)
-        result_aligned, result_aligned_word = align(result["segments"], align_model, align_dictionary, audio_path, device,
+        result_aligned = align(result["segments"], align_model, align_dictionary, audio_path, device,
                                extend_duration=align_extend, start_from_previous=align_from_prev)
         audio_basename = os.path.basename(audio_path)
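Callers see a changed contract here: `align` returns a single dict rather than a tuple, with per-word timings nested under each segment's `word-level` key. A toy illustration (hypothetical data) of consuming the new shape:

```python
# Hypothetical aligned result, shaped like the new align() output:
# one top-level dict, word timings nested per segment.
result_aligned = {
    "segments": [
        {"text": "hello world", "start": 0.0, "end": 1.2,
         "word-level": [
             {"text": "hello", "start": 0.0, "end": 0.5},
             {"text": "world", "start": 0.6, "end": 1.2},
         ]},
    ]
}
for seg in result_aligned["segments"]:
    for word in seg["word-level"]:
        print(word["text"], word["start"], word["end"])  # start/end may be None
```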
@@ -425,22 +421,20 @@ def cli():
         if output_type in ["txt", "all"]:
             with open(os.path.join(output_dir, audio_basename + ".txt"), "w", encoding="utf-8") as txt:
                 write_txt(result_aligned["segments"], file=txt)
-            with open(os.path.join(output_dir, audio_basename + ".word.txt"), "w", encoding="utf-8") as txt:
-                write_txt(result_aligned_word["segments"], file=txt)
         # save VTT
         if output_type in ["vtt", "all"]:
             with open(os.path.join(output_dir, audio_basename + ".vtt"), "w", encoding="utf-8") as vtt:
                 write_vtt(result_aligned["segments"], file=vtt)
-            with open(os.path.join(output_dir, audio_basename + ".word.vtt"), "w", encoding="utf-8") as vtt:
-                write_vtt(result_aligned_word["segments"], file=vtt)
         # save SRT
         if output_type in ["srt", "all"]:
             with open(os.path.join(output_dir, audio_basename + ".srt"), "w", encoding="utf-8") as srt:
                 write_srt(result_aligned["segments"], file=srt)
-            with open(os.path.join(output_dir, audio_basename + ".word.srt"), "w", encoding="utf-8") as srt:
-                write_srt(result_aligned_word["segments"], file=srt)
+        # save ASS
+        with open(os.path.join(output_dir, audio_basename + ".ass"), "w", encoding="utf-8") as ass:
+            write_ass(result_aligned["segments"], file=ass)

 if __name__ == '__main__':
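The `write_ass` helper itself lands in `whisperx/utils.py`, one of the changed files not shown on this page. For orientation only, here is a minimal sketch of what an ASS (Advanced SubStation Alpha) writer over these segment dicts could look like; the header fields and style values are illustrative assumptions, not the commit's implementation, and the commit's real writer presumably also exploits the per-word timings added above (e.g. for karaoke-style highlighting), which this sketch omits:

```python
# Minimal, hypothetical sketch of an ASS writer (NOT the commit's code):
# consumes the same {"text", "start", "end"} segment dicts as write_srt.
ASS_HEADER = """[Script Info]
ScriptType: v4.00+
PlayResX: 384
PlayResY: 288

[V4+ Styles]
Format: Name, Fontname, Fontsize, PrimaryColour, Alignment
Style: Default,Arial,16,&Hffffff,2

[Events]
Format: Layer, Start, End, Style, Text
"""

def ass_timestamp(seconds: float) -> str:
    # ASS timestamps are H:MM:SS.CC (centisecond precision).
    cs = round(seconds * 100)
    return f"{cs // 360000}:{cs // 6000 % 60:02d}:{cs // 100 % 60:02d}.{cs % 100:02d}"

def write_ass(segments, file):
    print(ASS_HEADER, file=file)
    for seg in segments:
        text = seg["text"].strip().replace("\n", "\\N")  # ASS line-break escape
        print(f"Dialogue: 0,{ass_timestamp(seg['start'])},{ass_timestamp(seg['end'])},Default,{text}",
              file=file)
```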