跳转至

FunASR 批量转写脚本

使用方法

基础用法

Bash
1
2
3
4
5
6
7
source ~/funasr-env/bin/activate

# 转写整个目录
python batch_asr.py -d /path/audio/ -o /path/text/

# 转写单个文件
python batch_asr.py -i /path/audio/test.wav -o /path/text/

进阶用法

Bash
# 指定热词
python batch_asr.py -d ./audio/ -o ./output/ -w "公司名 产品名"

# 调整批处理大小(加快速度)
python batch_asr.py -d ./audio/ -o ./output/ -b 600

# 只处理 wav 文件
python batch_asr.py -d ./audio/ -o ./output/ --ext .wav

# 输出为 srt 字幕格式
python batch_asr.py -d ./audio/ -o ./output/ --suffix .srt

后台运行

Bash
nohup python batch_asr.py -d /path/audio/ -o /path/text/ > /var/log/batch_asr.log 2>&1 &
tail -f /var/log/batch_asr.log

脚本参数说明

参数 说明 必填 默认值
-d 输入音频目录 二选一
-i 输入单个音频文件 二选一
-o 输出目录 与输入同目录
-w 热词(空格分隔)
-b 批处理大小(秒) 300
--suffix 输出文件后缀名 .txt
--ext 处理的音频格式 wav/mp3/ogg/flac/m4a

脚本代码

创建文件 /root/batch_asr.py

Python
#!/usr/bin/env python3
"""
FunASR 批量语音转文字工具
用法: python batch_asr.py -d /path/audio/ -o /path/text/
"""

import argparse
import glob
import os
import sys
from pathlib import Path
from funasr import AutoModel

SUPPORTED_EXTENSIONS = (".wav", ".mp3", ".ogg", ".flac", ".m4a")


def parse_args():
    parser = argparse.ArgumentParser(
        description="FunASR 批量语音转文字工具",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
示例:
  python batch_asr.py -d ./audio/ -o ./output/
  python batch_asr.py -d ./audio/ -o ./output/ -w "热词" -b 300
  python batch_asr.py -i ./audio/test.wav -o ./output/
        """,
    )
    parser.add_argument("-d", "--dir", type=str, help="输入音频目录")
    parser.add_argument("-i", "--input", type=str, help="输入单个音频文件")
    parser.add_argument("-o", "--output", type=str, help="输出目录(默认与输入同目录)", required=True)
    parser.add_argument("-w", "--word", type=str, help="热词(可选)")
    parser.add_argument("-b", "--batch", type=int, default=300, help="批处理大小(秒)", dest="batch")
    parser.add_argument("--suffix", type=str, default=".txt", help="输出文件后缀名(默认 .txt)")
    parser.add_argument("--ext", nargs="+", default=SUPPORTED_EXTENSIONS, help="处理的音频格式")
    return parser.parse_args()


def get_audio_files(input_path, extensions):
    """收集所有音频文件"""
    audio_files = []

    if os.path.isfile(input_path):
        ext = Path(input_path).suffix.lower()
        if ext in extensions:
            audio_files.append(input_path)
        else:
            print(f"\u26a0 不支持的文件格式: {input_path}")
    elif os.path.isdir(input_path):
        for root, _, files in os.walk(input_path):
            for file in files:
                file_path = os.path.join(root, file)
                ext = Path(file_path).suffix.lower()
                if ext in extensions:
                    audio_files.append(file_path)
    else:
        print(f"\u274c 路径不存在: {input_path}")
        sys.exit(1)

    return sorted(audio_files)


def main():
    args = parse_args()

    # 1. 创建输出目录
    output_dir = Path(args.output)
    output_dir.mkdir(parents=True, exist_ok=True)
    print(f"\U0001f4c1 输出目录: {output_dir}")

    # 2. 收集文件
    input_path = args.dir or args.input
    audio_files = get_audio_files(input_path, args.ext)

    if not audio_files:
        print("\u274c 未找到音频文件")
        sys.exit(1)

    print(f"\U0001f4c2 找到 {len(audio_files)} 个音频文件")
    print(f"\U0001f524 热词: {args.word or '\u65e0'}")
    print("-" * 50)

    # 3. 加载模型
    print("\u23f3 加载模型...")
    model = AutoModel(
        model="paraformer-zh",
        vad_model="fsmn-vad",
        punc_model="ct-punc",
    )
    print("\u2705 模型加载完成")
    print("-" * 50)

    # 4. 批量转写
    success_count = 0
    fail_count = 0
    total_duration = 0

    for i, audio_file in enumerate(audio_files, 1):
        stem = Path(audio_file).stem
        output_file = output_dir / f"{stem}{args.suffix}"

        print(f"[{i}/{len(audio_files)}] 处理: {os.path.basename(audio_file)}", end=" ... ")

        try:
            result = model.generate(input=audio_file, batch_size_s=args.batch, hotword=args.word)

            text = result[0]["text"]
            with open(output_file, "w", encoding="utf-8") as f:
                f.write(text)

            if "time_speech" in result[0]:
                total_duration += result[0]["time_speech"]

            print(f"\u2705 已保存 -> {output_file}")
            success_count += 1

        except Exception as e:
            print(f"\u274c 失败: {str(e)[:50]}")
            fail_count += 1

    # 5. 统计
    print("-" * 50)
    print(f"\U0001f4ca 统计: 成功 {success_count} / 失败 {fail_count} / 总计 {len(audio_files)}")
    if total_duration > 0:
        print(f"\u23f1 音频总时长: {total_duration:.1f} 秒")
    print(f"\U0001f4c1 输出目录: {output_dir}")


if __name__ == "__main__":
    main()

脚本特点

特性 说明
自动扫描目录 递归处理子目录中的音频文件
多格式支持 wav / mp3 / ogg / flac / m4a
单次加载模型 只初始化一次模型,提升效率
进度显示 [1/100] 实时显示处理进度
错误隔离 单个文件失败不影响其他文件
统计汇总 处理完成后显示成功率、音频时长
支持热词 -w 参数传入热词提升识别率
可指定后缀 --suffix 控制输出文件格式

运行环境

项目 配置
系统 Rocky Linux 9.7
CPU 4 核
内存 16 GB
模型缓存 /root/.cache/modelscope/hub/models/

注意事项

  1. 内存占用:模型加载后占用约 4~6 GB,确保系统剩余内存充足。
  2. 文件大小:单次音频建议不超过 10 分钟,过大会导致内存溢出或超时。
  3. 批量性能:4 核 CPU RTF 约 0.146,4 秒音频约需 0.7 秒。大批量转写建议使用 nohup 后台运行。
  4. 模型复用:脚本启动时加载模型,多次执行需重启脚本。可考虑用 API 服务代替。