本文参考andrew Ng, Sequence Model, notebook-Trigger word detection.
概念
当你对苹果手机叫出”hey siri”,或对小米手机叫出”小爱同学”时,手机助手会立刻出现,这就是触发词识别系统,是一种识别音频并且在接收到某个触发词的时候激活的程序。不同于一般的语音识别需要大量的数据(超过10W小时)来训练,触发词系统的训练相对简单很多。
当你完成这个程序的时候,你可以将它扩展并布置到自己的电脑上,每当你说出某个激活词的时候,你的电脑可以随即自动打开某个app,或者是播放某一首音乐,听起来是不是很棒(吧?)。
因为声音数据是序列数据,我们用RNN&GRU来做这个用识别触发词的程序,当程序听见语音”激活”的时候就给出一个某种反应。
创建语音数据集
语音数据是很难获取的一类数据,这里我们需要的样本是10秒钟的数据,其中会随时出现我们的激活词。如果我们自己录制这些样本,因为数量比较大,这将会相当困难,于是我们考虑使用音频合成来制作我们的数据集。
首先,我们需要一些用于合成的元数据。因此我们需要去到各种环境中用不同人,不同口音录制”激活”以及任何其它语音。
将它们分为 positive\negative\background三类,positive为不同人不同口音念我们的激活词, negative 则是随机的其它词,background则是各种背景音,持续10秒。
PS:因为口音和词语长短不同,positive和negative的语音文件长度不定。
收集完语音后,我们需要对其进行处理。
一般来说,录制的声音文件根据设备的不同,拥有不同的HZ数。这里假设我们采集的样本为44100HZ,即录制的音频文件每秒有44100个数字。
要直接处理这种文件相对来说是较为困难的,所以更流行的做法是将数据用傅里叶变换转化为频谱(spectrogram),这是信号处理相关的知识。这里我们直接用软件包实现。
x = graph_spectrogram("audios/train_example.wav")
{% asset_img spectrogram-img.png 频谱片 %}
频谱图直观上体现的是频率(y轴)和时间(x轴)的图像关系,偏绿的颜色表示频率较高,而蓝色则相反。
频谱输出的数据维度是由程序的超参数和输入数据的长度决定的,这里我们使用的转换程序,10秒的数据将拥有5511个timestep,即Tx=5511.
print("spectrogram shape:", x.shape)
输出为(101,5511)。
因此我们可以定义:
Tx = 5511
n_freq = 101 # 每个timestep中输入到模型中的频率数
Ty = 1375 # 我们程序的输出将会把10秒切分为1375份
合成语音
三个步骤
- 首先我们随机选取一个background音频
- 随机将0-4个正样本音频片段插入
- 随机将0-2个负样本音频片段插入
因为是我们控制的插入,我们同时可以得到Ylabel数据,即第几个timestep为激活词刚刚说完的时刻,记为y<t>=1,但一般来说为了避免label数据过于稀疏,我们会把激活词结束后的一段timestep的y都置为1,这里我们选择的一段为50个timestep。
使用pydub包来处理音频,这个包将会用ms作为时间轴最小单位,10秒的数据将会有10000个timestep.
注意,合成后的音频应该依然为10秒,即正负样本都应该被background完全容纳,且二者互相不能重叠。
activates, negatives, backgrounds = load_raw_audio()
先来实现几个工具方法:
# 获取插入位置
def get_random_time_segment(segment_ms):
"""
从10000个timestep中随机取一个timestep,作为之后的插入位置
Arguments:
segment_ms -- 要插入音频的ms长度
Returns:
segment_time -- a tuple(segment_start, segment_end) in ms,插入的开始和结束timestemp
"""
segment_start = np.random.randint(low=0, high=10000-segment_ms) # 确保插入的音频不会超出background的结尾
segment_end = segment_start + segment_ms - 1
return (segment_start, segment_end)
# 检查是否发生重叠
def is_overlapping(segment_time, previous_segments):
"""
Arguments:
segment_time -- a tuple(segment_start, segment_end) 新插入片段的起止时间
previous_segments -- a list of tuples(segment_start, segment_end) 已经插入的片段的起止时间列表
Returns:
True or False 代表是否发生重叠
"""
segment_start, segment_end = segment_time
overlap = False
for previous_start, previous_end in previous_segments:
# 作业提示可以用 if ... <= ... and ... >= ... 这种形式实现,我没想出来呢
if (previous_start > segment_end or segment_start > previous_end) == False :
overlap = True
return overlap
# 向background中随机位置插入音频片段,确保不会发生重叠和超出
def insert_audio_clip(background, audio_clip, previous_segments):
"""
Arguments:
background -- 10秒钟的背景音频.
audio_clip -- 要插入的音频.
previous_segments -- 已经插入在background中的音频的起止时间
Returns:
new_background -- 插入音频后的新的background
"""
# Get the duration of the audio clip in ms
segment_ms = len(audio_clip)
segment_time = get_random_time_segment(segment_ms)
while is_overlapping(segment_time, previous_segments):
segment_time = get_random_time_segment(segment_ms)
previous_segments.append(segment_time)
# 将音频在background的对应位置叠加进去
new_background = background.overlay(audio_clip, position = segment_time[0])
return new_background, segment_time
插入了激活词语音后,我们就需要对Ylabel做出更新,如我们上面所说,将结束位置的后面50个y都更新为1。
因为我们的y的维度为1375而音频数据维度为10000,所以要记得做一下缩放。还要确保如果音频插入在background的尾部,导致后面没剩下50个值,注意不要越界。
def insert_ones(y, segment_end_ms):
"""
Arguments:
y -- numpy array of shape (1, Ty),训练样本的标签值
segment_end_ms -- 插入的激活音频的结束时间
Returns:
y -- 更新后的 labels
"""
# 缩放
segment_end_y = int(segment_end_ms * Ty / 10000.0)
for i in range(segment_end_y, segment_end_y + 50):
if i < Ty:
y[0, i] = 1
return y
接下来就可以创建我们的训练样本了:
def create_training_example(background, activates, negatives):
"""
Creates a training example with a given background, activates, and negatives.
Arguments:
background -- a 10 second background audio recording
activates -- a list of audio segments of the word "activate"
negatives -- a list of audio segments of random words that are not "activate"
Returns:
x -- the spectrogram of the training example
y -- the label at each time step of the spectrogram
"""
# 减小噪音
background = background - 20
y = np.zeros((1,Ty))
previous_segments = []
# 随机插入0-4段激活音频
number_of_activates = np.random.randint(0, 5)
random_indices = np.random.randint(len(activates), size=number_of_activates)
random_activates = [activates[i] for i in random_indices]
for random_activate in random_activates:
background, segment_time = insert_audio_clip(background, random_activate, previous_segments)
segment_start, segment_end = segment_time
y = insert_ones(y, segment_end)
# 随机插入0-2段negative语音
number_of_negatives = np.random.randint(0, 3)
random_indices = np.random.randint(len(negatives), size=number_of_negatives)
random_negatives = [negatives[i] for i in random_indices]
for random_negative in random_negatives:
background, _ = insert_audio_clip(background, random_negative, previous_segments)
# 标准化一波
background = match_target_amplitude(background, -20.0)
# 导出训练样本
file_handle = background.export("train" + ".wav", format="wav")
print("File (train.wav) was saved in your directory.")
# 绘制频谱图
x = graph_spectrogram("train.wav")
return x, y
然后我们就用这个方法去创建一大堆的训练样本。
然后将创建好的训练样本赋值给 X 和 Y
模型
我们用keres来实现这个模型。
模型共四层
- 第一层是一个卷积层,因为我们的频谱数据为1维,所以是一维卷积(196个滤波器,滤波器长度15,步幅为4),卷积后再做一次BN,并用ReLu激活,再用Dropout进行一次正则化。
- 第二层是一个GRU层,保证网络的记忆能力,并且也对输出的数据做Dropout和BN处理。
- 第三层继续用一个GRU层。
- 最后一层为全连接,且接sofxmax作为输出。
如下:
代码:
def model(input_shape):
"""
用keras构造训练模型
Argument:
input_shape -- 模型输入数据的形状
Returns:
model -- keras model 实例
"""
X_input = Input(shape = input_shape)
# Layer 1: CONV layer
X = Conv1D(filters=196,kernel_size=15,strides=4)(X_input) # CONV1D
X = BatchNormalization()(X) # Batch normalization
X = Activation("relu")(X) # ReLu activation
X = Dropout(rate=0.8)(X) # dropout (use 0.8)
# Layer 2: First GRU Layer
X = GRU(units=128, return_sequences = True)(X) # GRU (use 128 units and return the sequences)
X = Dropout(rate=0.8)(X) # dropout (use 0.8)
X = BatchNormalization()(X) # Batch normalization
# Layer 3: Second GRU Layer
X = GRU(units=128, return_sequences = True)(X) # GRU (use 128 units and return the sequences)
X = Dropout(rate=0.8)(X) # dropout (use 0.8)
X = BatchNormalization()(X) # Batch normalization
X = Dropout(rate=0.8)(X) # dropout (use 0.8)
# Layer 4: Time-distributed dense layer
X = TimeDistributed(Dense(1, activation = "sigmoid"))(X) # time distributed (sigmoid)
model = Model(inputs = X_input, outputs = X)
return model
注意上面最后一层的 TimeDistributed 层,这里有关于它的使用的完全解释。
简单的说,它是使用timestep来进行操作,且对每个timestep的数据都共享权重。
比如这里我们用它包装了Dense,即是说对每个timestep的结果,都使用同样的参数计算Dense输出结果。
接下来就可以进行训练了:
opt = Adam(lr=0.0001, beta_1=0.9, beta_2=0.999, decay=0.01)
model.compile(loss='binary_crossentropy', optimizer=opt, metrics=["accuracy"])
model.fit(X, Y, batch_size = 5, epochs=100)
得到:
Epoch 100/100
26/26 [==============================] - 34s - loss: 0.0610 - acc: 0.9796
接下来如果有开发集,可以对模型进行测试,开发集样本最好来自真实分布,即去真实的环境说话进行录音。
预测和更多操作
接下来就可以用这个训练好的模型进行预测了:
x = graph_spectrogram(filename)
x = x.swapaxes(0,1)
x = np.expand_dims(x, axis=0)
predictions = model.predict(x)
还可以在触发的时候做更多的操作:
consecutive_timesteps = 0
for i in range(Ty):
consecutive_timesteps += 1
if predictions[0,i,0] > threshold:
# 任何操作
whatever_you_wanna_do()
本博客所有文章除特别声明外,均采用 CC BY-SA 3.0协议 。转载请注明出处!