注意事项
科大讯飞语音转写 API 文档链接: https://www.xfyun.cn/doc/asr/lfasr/API.html
科大讯飞语音转写Python3的demo下载链接:http://xfyun-doc.ufile.ucloud.com.cn/1564736425808301/weblfasr_python3_demo.zip
上一篇写了用百度智能云进行音频文件转写的博客,但是那个效果啊,有点惨不忍睹,至少我的识别结果是这样。然后转而使用了一下科大讯飞的,想着科大讯飞专门做语音相关的这一块,应该会好些。语音转写的Python3的demo代码确实很不错,函数接口很简洁,本文代码都是这个demo里面的。识别准确率还是可以的,而且不需要像百度那样整点才开始识别,很快就返回了识别结果。
如果你的录音里不止一个人说话(例如电话录音),想把转写结果中不同人说的话分离出来,请按照下面这样添加预处理参数(demo 中默认没有添加这里的最后两个参数;不添加的话,默认不进行角色分离):
这样的话,转写结果的speaker的值就不全是0了,而是根据不同的人对转写结果进行分离:
操作系统:Windows
Python:3.6
可用时长: 免费用户时长5小时,且用且珍惜。
音频属性: 采样率16k或8k、位长8bits或16bits、单声道&多声道
音频格式: wav/flac/opus/m4a/mp3
音频大小: 不超过500M
音频时长: 不超过5小时,建议5分钟以上
语言种类: 中文普通话、英文
转写结果保存时长: 30天。(同一通录音不需要重新上传识别:如果你已经上传识别过了,之后只需要调用 api.get_result_request(taskid) 即可再次获取识别结果。taskid 是你第一次上传录音时分配到的任务ID,这样可以避免重复上传浪费可用时长)
APP_ID, SECRET_KEY的获取
讯飞的好像不需要API_KEY,开放授权的方式和其他大厂的类似:
1、页面右上方“控制台”点击进入,登录讯飞账号(没有就注册一个),进入讯飞开放平台。
2、左侧导航栏上方,依次选择 语音识别->语音转写->离线语音转写识别。
3、服务申请。点击“创建应用”,“接口选择”已默认勾选完成,如无其他需求,无需勾选,完成其他资料后,点击最下方“立即创建”按钮。自己可以手动领取5小时免费试用体验包。
4、应用创建成功后,页面会显示“创建完毕”,点击“返回应用列表”,查看新创建应用的详情,在服务接口认证信息窗口就可以看到返回的AppID、SecretKey。
话不多说,直接上代码了
# -*- coding: utf-8 -*-
#
# author: yanmeng2
#
# 非实时转写调用demo
import
base64
import
hashlib
import
hmac
import
json
import
os
import
time
import
requests
# Base URL of the iFlytek long-form transcription (lfasr) REST service.
lfasr_host = 'http://raasr.xfyun.cn/api'

# Endpoint names; each one is appended to lfasr_host when a request is sent.
api_prepare, api_upload, api_merge = '/prepare', '/upload', '/merge'
api_get_progress, api_get_result = '/getProgress', '/getResult'

# Size of one upload slice: 10 MB.
file_piece_sice = 10 * 1024 * 1024

# ---------------- Configurable transcription parameters ----------------
# The available parameters are listed on the official site
# (https://doc.xfyun.cn/rest_api/%E8%AF%AD%E9%9F%B3%E8%BD%AC%E5%86%99.html);
# add or change them in gene_params as needed.

# Transcription type.
lfasr_type = 0

# Whether to enable word segmentation.
has_participle = 'false'

# NOTE(review): undocumented in the original; presumably the switch for
# separating speakers in the result -- confirm against the API docs.
has_seperate = 'true'

# Number of alternative candidate words.
max_alternatives = 0

# Sub-user identifier.
suid = ''
class SliceIdGenerator:
    """Generator of consecutive slice ids.

    Ids are 10-character strings that count upwards like an odometer over the
    letters 'a'..'z': 'aaaaaaaaaa', 'aaaaaaaaab', ..., 'aaaaaaaaaz',
    'aaaaaaaaba', ...
    """

    def __init__(self):
        # '`' is the character just before 'a', so the first increment
        # produces 'aaaaaaaaaa'.
        self.__ch = 'aaaaaaaaa`'

    def getNextSliceId(self):
        """Advance the internal counter by one and return the new id."""
        letters = list(self.__ch)
        # Walk from the last character towards the first, carrying while a
        # position overflows past 'z'.
        for pos in reversed(range(len(letters))):
            if letters[pos] == 'z':
                letters[pos] = 'a'
                continue
            letters[pos] = chr(ord(letters[pos]) + 1)
            break
        self.__ch = ''.join(letters)
        return self.__ch
class RequestApi(object):
    """Client for the iFlytek long-form speech transcription REST API.

    Wraps the five endpoints used by the demo flow (prepare, upload, merge,
    getProgress, getResult) and offers all_api_request() to run them in order.
    """

    def __init__(self, appid, secret_key, upload_file_path):
        """Store the credentials and the path of the audio file to transcribe.

        Args:
            appid: application id issued by the iFlytek open platform.
            secret_key: secret key belonging to that application.
            upload_file_path: path of the local audio file.
        """
        self.appid = appid
        self.secret_key = secret_key
        self.upload_file_path = upload_file_path
        # Task id returned by the prepare call. Kept as the string "None"
        # until a task exists so callers can concatenate it into messages.
        self.taskid = "None"

    def _signature(self, ts):
        """Return the request signature for timestamp string *ts*.

        The signature is base64(HMAC-SHA1(key=secret_key, msg=md5hex(appid + ts))).
        """
        md5_hex = hashlib.md5((self.appid + ts).encode('utf-8')).hexdigest()
        mac = hmac.new(self.secret_key.encode('utf-8'),
                       md5_hex.encode('utf-8'),
                       hashlib.sha1)
        return base64.b64encode(mac.digest()).decode('utf-8')

    # Builds the parameters for each API name. Not every parameter the service
    # supports is used here; see
    # https://doc.xfyun.cn/rest_api/%E8%AF%AD%E9%9F%B3%E8%BD%AC%E5%86%99.html
    # and adapt to your use case.
    def gene_params(self, apiname, taskid=None, slice_id=None):
        """Build the form parameters for one API call.

        Args:
            apiname: one of the module-level api_* endpoint names.
            taskid: task id (needed by every endpoint except prepare).
            slice_id: id of the slice being sent (upload only).

        Returns:
            dict of request parameters; empty when *apiname* is not recognised.

        Raises:
            OSError: for the prepare call, if the audio file cannot be stat'ed.
        """
        if apiname not in (api_prepare, api_upload, api_merge,
                           api_get_progress, api_get_result):
            return {}

        ts = str(int(time.time()))
        params = {'app_id': self.appid, 'signa': self._signature(ts), 'ts': ts}

        if apiname == api_prepare:
            # Only prepare needs the file size, so only prepare touches the
            # file system. The other calls (notably getResult for an already
            # uploaded task) work even if the local file is gone.
            file_len = os.path.getsize(self.upload_file_path)
            # Number of slices = ceil(file_len / slice size). For short audio
            # a single slice is enough.
            slice_num = (file_len + file_piece_sice - 1) // file_piece_sice
            params['file_len'] = str(file_len)
            params['file_name'] = os.path.basename(self.upload_file_path)
            params['slice_num'] = str(slice_num)
        elif apiname == api_upload:
            params['task_id'] = taskid
            params['slice_id'] = slice_id
        elif apiname == api_merge:
            params['task_id'] = taskid
            params['file_name'] = os.path.basename(self.upload_file_path)
        else:  # api_get_progress / api_get_result
            params['task_id'] = taskid
        return params

    # Sends the request and parses the reply. The meaning of the result fields
    # is described at
    # https://doc.xfyun.cn/rest_api/%E8%AF%AD%E9%9F%B3%E8%BD%AC%E5%86%99.html
    def gene_request(self, apiname, data, files=None, headers=None):
        """POST to *apiname* and return the decoded JSON reply.

        Args:
            apiname: endpoint name appended to lfasr_host.
            data: form parameters, normally produced by gene_params().
            files: optional multipart file fields.
            headers: optional extra HTTP headers.

        Returns:
            The decoded reply dict when its "ok" field is 0.

        Raises:
            SystemExit: when the service reports a failure ("ok" != 0). The
                exit status is 1 so that a failed run is not reported to the
                shell as a success.
        """
        response = requests.post(lfasr_host + apiname, data=data, files=files, headers=headers)
        result = json.loads(response.text)
        if result["ok"] == 0:
            print("{} success:".format(apiname) + str(result))
            return result
        print("{} error:".format(apiname) + str(result))
        raise SystemExit(1)

    # Prepare
    def prepare_request(self):
        """Register the file with the service; the reply's "data" is the task id."""
        return self.gene_request(apiname=api_prepare,
                                 data=self.gene_params(api_prepare))

    # Upload
    def upload_request(self, taskid, upload_file_path):
        """Upload the file slice by slice.

        Args:
            taskid: task id returned by the prepare call.
            upload_file_path: path of the audio file to send.

        Returns:
            True when every slice was accepted, False as soon as one fails.
        """
        slice_ids = SliceIdGenerator()
        # `with` closes the file on every exit path, including exceptions.
        with open(upload_file_path, 'rb') as audio:
            slices = iter(lambda: audio.read(file_piece_sice), b'')
            for index, content in enumerate(slices, 1):
                # The original demo also passed a "filename" field whose value
                # was always None. requests >= 2.20 silently skips None file
                # fields and older versions crash on them, so it is omitted.
                files = {"content": content}
                response = self.gene_request(api_upload,
                                             data=self.gene_params(api_upload, taskid=taskid,
                                                                   slice_id=slice_ids.getNextSliceId()),
                                             files=files)
                if response.get('ok') != 0:
                    # Uploading this slice failed.
                    print('upload slice fail, response: ' + str(response))
                    return False
                print('upload slice ' + str(index) + ' success')
        return True

    # Merge
    def merge_request(self, taskid):
        """Ask the service to merge the uploaded slices of *taskid*."""
        return self.gene_request(api_merge, data=self.gene_params(api_merge, taskid=taskid))

    # Get progress
    def get_progress_request(self, taskid):
        """Query the processing progress of *taskid*."""
        return self.gene_request(api_get_progress, data=self.gene_params(api_get_progress, taskid=taskid))

    # Get result
    def get_result_request(self, taskid):
        """Fetch the transcription result of *taskid*."""
        return self.gene_request(api_get_result, data=self.gene_params(api_get_result, taskid=taskid))

    def all_api_request(self):
        """Run the full flow: prepare, upload, merge, poll progress, get result.

        The task id is stored in self.taskid as soon as it is known so the
        result can be fetched again later without uploading the file again.

        Returns:
            The reply of the getResult call, or None when the upload failed or
            the task reported an error.
        """
        # 1. Prepare
        pre_result = self.prepare_request()
        taskid = pre_result["data"]
        self.taskid = taskid
        # 2. Upload the slices; merging is pointless if a slice is missing.
        if not self.upload_request(taskid=taskid, upload_file_path=self.upload_file_path):
            return None
        # 3. Merge the slices
        self.merge_request(taskid=taskid)
        # 4. Poll the task progress
        while True:
            progress_dic = self.get_progress_request(taskid)
            if progress_dic['err_no'] != 0 and progress_dic['err_no'] != 26605:
                # str(): the 'failed' field is not guaranteed to be a string.
                print('task error: ' + str(progress_dic['failed']))
                return None
            data = progress_dic['data']
            task_status = json.loads(data)
            if task_status['status'] == 9:
                print('task ' + taskid + ' finished')
                break
            print('The task ' + taskid + ' is in processing, task status: ' + str(data))
            # Wait 20 seconds between progress checks.
            time.sleep(20)
        # 5. Get the result
        return self.get_result_request(taskid=taskid)
# Note: if the requests module fails with
# "'NoneType' object has no attribute 'read'", try upgrading requests to
# 2.20.0 or later (this demo was tested with 2.20.0).
# Fill in your iFlytek open-platform appid and secret_key, plus the path of
# the file to transcribe.
if __name__ == '__main__':
    APP_ID, SECRET_KEY = "***", "****"
    file_path = r"***.wav"
    api = RequestApi(
        appid=APP_ID,
        secret_key=SECRET_KEY,
        upload_file_path=file_path,
    )
    api.all_api_request()
当然,你可以根据自己的需求对demo进行改进。比如你想并发识别录音,可以添加多线程执行的函数。为了方便获取taskid,我在class的初始化里边添加了 self.taskid = "None",并在预处理结果返回之后重新对 taskid 赋值。
def thread_func(wav_file_path, txt_file_path):
    """Transcribe one recording and save the result to a text file.

    Intended as a thread target so several recordings can be processed
    concurrently. The task id is written first, then one line per element of
    the decoded result; each element is also echoed to stdout.

    Relies on the module-level APP_ID / SECRET_KEY and on RequestApi exposing
    the task id as ``api.taskid`` after all_api_request().

    Args:
        wav_file_path: path of the audio file to upload.
        txt_file_path: path of the text file to (over)write with the result.
    """
    # `with` closes the output file even when one of the requests fails.
    with open(txt_file_path, 'w', encoding='utf-8') as doc:
        api = RequestApi(appid=APP_ID, secret_key=SECRET_KEY, upload_file_path=wav_file_path)
        # In the demo this call runs the whole flow; fetching the result is
        # done separately below.
        api.all_api_request()
        print('taskid is: ' + api.taskid, file=doc)
        result = api.get_result_request(api.taskid)
        # SECURITY: the payload comes from the network, so it is parsed as
        # JSON rather than eval()-ed. eval would execute arbitrary expressions
        # and also fails on JSON literals such as true/false/null.
        segments = json.loads(result['data'])
        for x in segments:
            print(x)
            print(x, file=doc)
# Write the main block along these lines.
if __name__ == '__main__':
    # Local import: the snippet uses threading but the file never imports it.
    import threading

    APP_ID = "***"
    SECRET_KEY = "***"
    # Directories are joined with os.path.join below, so no trailing
    # separator is needed here.
    file_read_path = r"D:\MyProject\Python\Voice_SDK\20190820"
    file_save_path = r"D:\MyProject\Python\Voice_SDK\20190820_xunfei"

    # NOTE(review): the original snippet never defined file_list. It is
    # assumed here to be the base names of the .wav files found in
    # file_read_path -- confirm this matches the intended input set.
    file_list = sorted(
        os.path.splitext(entry)[0]
        for entry in os.listdir(file_read_path)
        if entry.endswith(".wav")
    )

    # Run the transcriptions concurrently, one thread per recording.
    workers = []
    for file in file_list:
        wav_file_path = os.path.join(file_read_path, file + ".wav")
        txt_file_path = os.path.join(file_save_path, file + ".txt")
        t = threading.Thread(target=thread_func, args=(wav_file_path, txt_file_path))
        t.start()
        workers.append(t)

    # Wait for every transcription to finish before the script ends.
    for t in workers:
        t.join()