#!/usr/bin/env python3
"""
FunASR speech-to-text API service.
Usage: python asr_server.py
Dependencies: pip install funasr fastapi uvicorn python-multipart
"""
import os
import time
import tempfile
from pathlib import Path
from contextlib import asynccontextmanager
from fastapi import FastAPI, File, UploadFile, Form, HTTPException
from fastapi.responses import JSONResponse
from funasr import AutoModel
# Audio file extensions (lower-case, with leading dot) accepted for upload.
ALLOWED_EXTENSIONS = {
    ".wav",
    ".mp3",
    ".ogg",
    ".flac",
    ".m4a",
}

# Process-wide model cache; stays None until load_asr_model() fills it.
model_instance = None
def load_asr_model():
    """Return the shared FunASR model, constructing it on first use.

    The model is stored in the module-level ``model_instance`` so that later
    calls return the already-built object instead of constructing a new one.

    Returns:
        The cached ``AutoModel`` instance.
    """
    global model_instance

    # Fast path: the model has already been built by an earlier call.
    if model_instance is not None:
        return model_instance

    print("\u23f3 正在加载 FunASR 模型 (首次运行约需 10-20s)...")
    model_options = {
        "model": "paraformer-zh",
        "vad_model": "fsmn-vad",
        "punc_model": "ct-punc",
    }
    model_instance = AutoModel(**model_options)
    print("\u2705 模型加载完成")
    return model_instance
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Lifespan hook that initializes the ASR model at service startup.

    Everything before ``yield`` runs before the context is entered; nothing
    runs after it, so there is no shutdown-side cleanup.
    """
    # Eagerly fill the module-level model cache.
    load_asr_model()
    yield
# ASGI application object; `lifespan` is passed in so the model is loaded
# when the service starts.
app = FastAPI(
    title="FunASR Service",
    description="基于 FunASR 的语音转文字 API 服务",
    lifespan=lifespan,
)
# Monotonic reference point captured when this module is imported; used to
# report how long the service has been up.
_SERVICE_STARTED_AT = time.monotonic()


@app.get("/health")
async def health_check():
    """Health-check endpoint.

    Returns:
        A dict with:
        - ``status``: always ``"healthy"``.
        - ``model_loaded``: whether the module-level model has been built.
        - ``uptime``: seconds elapsed since this module was imported.
    """
    return {
        "status": "healthy",
        "model_loaded": model_instance is not None,
        # Previously this reported time.time(), i.e. the current epoch
        # timestamp rather than an uptime. A monotonic clock is used so the
        # value is unaffected by wall-clock adjustments.
        "uptime": time.monotonic() - _SERVICE_STARTED_AT,
    }
@app.post("/asr")
async def transcribe_audio(
    file: UploadFile = File(...),
    hotword: str = Form(None),
):
    """Speech-to-text endpoint.

    Args:
        file: Uploaded audio file; its extension must be one of
            ``ALLOWED_EXTENSIONS`` (wav/mp3/ogg/flac/m4a).
        hotword: Optional hotwords; separate multiple words with spaces.

    Returns:
        A ``JSONResponse`` with ``status``, ``filename``, ``text``,
        ``duration_s`` and ``cost_time_s``.

    Raises:
        HTTPException: 400 when the file extension is not supported; 500 when
            the transcription result is empty or any step fails.
    """
    asr_model = load_asr_model()

    # 1. Validate the file extension.
    file_ext = Path(file.filename).suffix.lower() if file.filename else ""
    if file_ext not in ALLOWED_EXTENSIONS:
        raise HTTPException(
            status_code=400,
            # Sorted so the message is deterministic (sets are unordered).
            detail=f"不支持的文件格式: {file_ext}。支持: {', '.join(sorted(ALLOWED_EXTENSIONS))}",
        )

    temp_file_path = None
    try:
        # 2. Save the upload to a temporary file. The name is generated by
        # mkstemp rather than built from the client-supplied filename, so
        # path separators or ".." in the upload name cannot affect where the
        # file is written, and concurrent uploads cannot collide.
        content = await file.read()
        fd, temp_name = tempfile.mkstemp(prefix="funasr_", suffix=file_ext)
        temp_file_path = Path(temp_name)
        with os.fdopen(fd, "wb") as f:
            f.write(content)
        print(f"\U0001f3a4 开始转写: {file.filename} ({len(content)/1024/1024:.1f}MB)")

        # 3. Run the transcription.
        # NOTE(review): generate() is called directly inside this coroutine,
        # so the event loop is blocked for the duration of inference.
        # Offloading it to a worker thread would need confirmation that the
        # model is safe to call concurrently.
        start_time = time.time()
        result = asr_model.generate(
            input=str(temp_file_path),
            batch_size_s=300,
            hotword=hotword,
        )
        end_time = time.time()
        elapsed = end_time - start_time

        # 4. Parse the result.
        if not result or "text" not in result[0]:
            raise HTTPException(status_code=500, detail="转写结果为空,音频可能无法识别")

        first = result[0]
        # Falls back to the processing time when "time_speech" is absent.
        duration = first.get("time_speech", elapsed)
        return JSONResponse(
            content={
                "status": "success",
                "filename": file.filename,
                "text": first["text"],
                "duration_s": round(duration, 2),
                "cost_time_s": round(elapsed, 2),
            }
        )
    except HTTPException:
        # Let deliberate HTTP errors through unchanged instead of re-wrapping
        # them in the generic failure message below.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"转写失败: {str(e)}") from e
    finally:
        # 5. Remove the temporary file; tolerate it already being gone.
        if temp_file_path is not None:
            try:
                os.remove(temp_file_path)
            except FileNotFoundError:
                pass
if __name__ == "__main__":
    import uvicorn

    # host="0.0.0.0" allows access from the local network; port 8000 is the
    # listening port; a single worker process is recommended on CPU.
    server_options = {"host": "0.0.0.0", "port": 8000, "workers": 1}
    uvicorn.run(app, **server_options)