#!/usr/bin/env python3
"""
FunASR 批量语音转文字工具
用法: python batch_asr.py -d /path/audio/ -o /path/text/
"""
import argparse
import glob
import os
import sys
from pathlib import Path
from funasr import AutoModel
# Audio file suffixes (lower-case, with leading dot) accepted by default;
# parse_args() uses this tuple as the default value of the --ext option.
SUPPORTED_EXTENSIONS = (".wav", ".mp3", ".ogg", ".flac", ".m4a")
def parse_args():
    """Parse and validate the command-line options of the batch ASR tool.

    Returns:
        argparse.Namespace: with the attributes ``dir``, ``input``,
        ``output``, ``word``, ``batch``, ``suffix`` and ``ext``.

    Raises:
        SystemExit: raised by argparse (status 2) when ``-o/--output`` is
            missing, when an option value cannot be parsed, or when neither
            ``-d/--dir`` nor ``-i/--input`` is supplied.
    """
    parser = argparse.ArgumentParser(
        description="FunASR 批量语音转文字工具",
        # Raw formatter keeps the line breaks of the example list below.
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=(
            "\n"
            "示例:\n"
            "python batch_asr.py -d ./audio/ -o ./output/\n"
            'python batch_asr.py -d ./audio/ -o ./output/ -w "热词" -b 300\n'
            "python batch_asr.py -i ./audio/test.wav -o ./output/\n"
        ),
    )
    parser.add_argument("-d", "--dir", type=str, help="输入音频目录")
    parser.add_argument("-i", "--input", type=str, help="输入单个音频文件")
    # NOTE(review): the help text mentions a default location, but the option
    # is required, so no default is ever applied.
    parser.add_argument("-o", "--output", type=str, help="输出目录(默认与输入同目录)", required=True)
    parser.add_argument("-w", "--word", type=str, help="热词(可选)")
    parser.add_argument("-b", "--batch", type=int, default=300, help="批处理大小(秒)", dest="batch")
    parser.add_argument("--suffix", type=str, default=".txt", help="输出文件后缀名(默认 .txt)")
    parser.add_argument("--ext", nargs="+", default=SUPPORTED_EXTENSIONS, help="处理的音频格式")

    args = parser.parse_args()

    # Without an input source the caller would end up passing None to the
    # file collector. Fail here with a usage message instead. Supplying both
    # options stays allowed so existing invocations keep working.
    if not (args.dir or args.input):
        parser.error("one of the arguments -d/--dir -i/--input is required")

    return args
def _normalize_extension(ext):
    """Return *ext* lower-cased with a leading dot (empty string unchanged)."""
    ext = ext.lower()
    if ext and not ext.startswith("."):
        ext = f".{ext}"
    return ext


def get_audio_files(input_path, extensions):
    """Collect audio files from a single file or, recursively, a directory.

    Args:
        input_path: Path of one audio file, or of a directory that is
            walked recursively. ``None`` and ``""`` are handled like a
            path that does not exist.
        extensions: Iterable of accepted file suffixes. Matching is
            case-insensitive and a missing leading dot is tolerated, so
            ``"wav"``, ``".wav"`` and ``".WAV"`` are equivalent.

    Returns:
        list: the matching paths in sorted order. The list is empty when a
        single file with an unsupported suffix was given (a warning is
        printed) or when the directory holds no matching file.

    Raises:
        SystemExit: with status 1 when ``input_path`` is neither an
            existing file nor an existing directory.
    """
    # Normalise once: file suffixes are compared lower-cased and dotted, so
    # the allowed set must use the same form or values like "wav" never match.
    allowed = {_normalize_extension(ext) for ext in extensions}

    if not input_path or not (os.path.isfile(input_path) or os.path.isdir(input_path)):
        print(f"\u274c 路径不存在: {input_path}")
        sys.exit(1)

    if os.path.isfile(input_path):
        if Path(input_path).suffix.lower() in allowed:
            return [input_path]
        print(f"\u26a0 不支持的文件格式: {input_path}")
        return []

    # Directory input: descend into every sub-directory.
    matches = [
        os.path.join(root, name)
        for root, _, names in os.walk(input_path)
        for name in names
        if Path(name).suffix.lower() in allowed
    ]
    return sorted(matches)
def _reserve_output_path(output_dir, stem, suffix, taken):
    """Return an output path for *stem* not yet handed out during this run."""
    candidate = output_dir / f"{stem}{suffix}"
    counter = 1
    while candidate in taken:
        candidate = output_dir / f"{stem}_{counter}{suffix}"
        counter += 1
    taken.add(candidate)
    return candidate


def main():
    """Transcribe every selected audio file and write one text file per input.

    Steps: parse the command line, create the output directory, collect the
    input files, load the FunASR model, transcribe the files one by one and
    print a summary. A failure on one file is reported and counted, and the
    remaining files are still processed.

    Raises:
        SystemExit: with status 1 when no audio file was found.
    """
    args = parse_args()

    # 1. Create the output directory
    output_dir = Path(args.output)
    output_dir.mkdir(parents=True, exist_ok=True)
    print(f"\U0001f4c1 输出目录: {output_dir}")

    # 2. Collect the input files (--dir wins when both options are given)
    input_path = args.dir or args.input
    audio_files = get_audio_files(input_path, args.ext)
    if not audio_files:
        print("\u274c 未找到音频文件")
        sys.exit(1)

    total = len(audio_files)
    # Resolved outside the f-string: a backslash escape inside a replacement
    # field is a SyntaxError on Python versions before 3.12.
    hotword_label = args.word or "\u65e0"
    print(f"\U0001f4c2 找到 {total} 个音频文件")
    print(f"\U0001f524 热词: {hotword_label}")
    print("-" * 50)

    # 3. Load the model
    print("\u23f3 加载模型...")
    model = AutoModel(
        model="paraformer-zh",
        vad_model="fsmn-vad",
        punc_model="ct-punc",
    )
    print("\u2705 模型加载完成")
    print("-" * 50)

    # 4. Transcribe file by file
    success_count = 0
    fail_count = 0
    total_duration = 0
    # Output names are derived from the stem only, so "a.wav" and "a.mp3", or
    # equally named files in different sub-directories, would map to the same
    # output file. Track the names used in this run and add a numeric suffix
    # on collision instead of silently overwriting an earlier transcript.
    taken_outputs = set()
    for i, audio_file in enumerate(audio_files, 1):
        output_file = _reserve_output_path(
            output_dir, Path(audio_file).stem, args.suffix, taken_outputs
        )
        # flush so the progress line is visible while generate() is running
        print(
            f"[{i}/{total}] 处理: {os.path.basename(audio_file)}",
            end=" ... ",
            flush=True,
        )
        try:
            result = model.generate(input=audio_file, batch_size_s=args.batch, hotword=args.word)
            text = result[0]["text"]
            with open(output_file, "w", encoding="utf-8") as f:
                f.write(text)
            if "time_speech" in result[0]:
                total_duration += result[0]["time_speech"]
            print(f"\u2705 已保存 -> {output_file}")
            success_count += 1
        except Exception as e:
            # Best effort per file: report a shortened message and continue.
            print(f"\u274c 失败: {str(e)[:50]}")
            fail_count += 1

    # 5. Summary
    print("-" * 50)
    print(f"\U0001f4ca 统计: 成功 {success_count} / 失败 {fail_count} / 总计 {total}")
    if total_duration > 0:
        print(f"\u23f1 音频总时长: {total_duration:.1f} 秒")
    print(f"\U0001f4c1 输出目录: {output_dir}")
# Run the command-line tool only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()