本文参考andrew Ng, Sequence Model, notebook-Trigger word detection.

概念

当你对苹果手机叫出”hey siri”,或对小米手机叫出”小爱同学”时,手机助手会立刻出现,这就是触发词识别系统,是一种识别音频并且在接收到某个触发词的时候激活的程序。不同于一般的语音识别需要大量的数据(超过10W小时)来训练,触发词系统的训练相对简单很多。

当你完成这个程序的时候,你可以将它扩展并布置到自己的电脑上,每当你说出某个激活词的时候,你的电脑可以随即自动打开某个app,或者是播放某一首音乐,听起来是不是很棒(吧?)。

因为声音数据是序列数据,我们用RNN&GRU来做这个用识别触发词的程序,当程序听见语音”激活”的时候就给出一个某种反应。

创建语音数据集

语音数据是很难获取的一类数据,这里我们需要的样本是10秒钟的数据,其中会随时出现我们的激活词。如果我们自己录制这些样本,因为数量比较大,这将会相当困难,于是我们考虑使用音频合成来制作我们的数据集。
首先,我们需要一些用于合成的元数据。因此我们需要去到各种环境中用不同人,不同口音录制”激活”以及任何其它语音。
将它们分为 positive\negative\background三类,positive为不同人不同口音念我们的激活词, negative 则是随机的其它词,background则是各种背景音,持续10秒。
PS:因为口音和词语长短不同,positive和negative的语音文件长度不定。

收集完语音后,我们需要对其进行处理。
一般来说,录制的声音文件根据设备的不同,拥有不同的HZ数。这里假设我们采集的样本为44100HZ,即录制的音频文件每秒有44100个数字。

要直接处理这种文件相对来说是较为困难的,所以更流行的做法是将数据用傅里叶变换转化为频谱(spectrogram),这是信号处理相关的知识。这里我们直接用软件包实现。

x = graph_spectrogram("audios/train_example.wav")
{% asset_img spectrogram-img.png 频谱片 %}

频谱图直观上体现的是频率(y轴)和时间(x轴)的图像关系,偏绿的颜色表示频率较高,而蓝色则相反。

频谱输出的数据维度是由程序的超参数和输入数据的长度决定的,这里我们使用的转换程序,10秒的数据将拥有5511个timestep,即Tx=5511.

print("spectrogram shape:", x.shape)

输出为(101,5511)。
因此我们可以定义:

Tx = 5511 
n_freq = 101 # 每个timestep中输入到模型中的频率数
Ty = 1375 # 我们程序的输出将会把10秒切分为1375份

合成语音

三个步骤

  1. 首先我们随机选取一个background音频
  2. 随机将0-4个正样本音频片段插入
  3. 随机将0-2个负样本音频片段插入
    因为是我们控制的插入,我们同时可以得到Ylabel数据,即第几个timestep为激活词刚刚说完的时刻,记为y<t>=1,但一般来说为了避免label数据过于稀疏,我们会把激活词结束后的一段timestep的y都置为1,这里我们选择的一段为50个timestep。

使用pydub包来处理音频,这个包将会用ms作为时间轴最小单位,10秒的数据将会有10000个timestep.

注意,合成后的音频应该依然为10秒,即正负样本都应该被background完全容纳,且二者互相不能重叠。

activates, negatives, backgrounds = load_raw_audio()

先来实现几个工具方法:

# 获取插入位置
def get_random_time_segment(segment_ms):
    """
    从10000个timestep中随机取一个timestep,作为之后的插入位置
    
    Arguments:
    segment_ms -- 要插入音频的ms长度
    
    Returns:
    segment_time -- a tuple(segment_start, segment_end) in ms,插入的开始和结束timestemp
    """
    
    segment_start = np.random.randint(low=0, high=10000-segment_ms)   # 确保插入的音频不会超出background的结尾
    segment_end = segment_start + segment_ms - 1
    
    return (segment_start, segment_end)

# 检查是否发生重叠
def is_overlapping(segment_time, previous_segments):
    """
    Arguments:
    segment_time -- a tuple(segment_start, segment_end) 新插入片段的起止时间
    previous_segments -- a list of tuples(segment_start, segment_end) 已经插入的片段的起止时间列表
    
    Returns:
    True or False 代表是否发生重叠
    """
    
    segment_start, segment_end = segment_time

    overlap = False
    
    for previous_start, previous_end in previous_segments:
        # 作业提示可以用 if ... <= ... and ... >= ... 这种形式实现,我没想出来呢
        if (previous_start > segment_end or segment_start > previous_end) == False :
            overlap = True

    return overlap

# 向background中随机位置插入音频片段,确保不会发生重叠和超出
def insert_audio_clip(background, audio_clip, previous_segments):
    """ 
    Arguments:
    background -- 10秒钟的背景音频.  
    audio_clip -- 要插入的音频. 
    previous_segments -- 已经插入在background中的音频的起止时间
    
    Returns:
    new_background -- 插入音频后的新的background
    """
    
    # Get the duration of the audio clip in ms
    segment_ms = len(audio_clip)
    
    segment_time = get_random_time_segment(segment_ms)
    
    while is_overlapping(segment_time, previous_segments):
        segment_time = get_random_time_segment(segment_ms)

    previous_segments.append(segment_time)
    
    # 将音频在background的对应位置叠加进去
    new_background = background.overlay(audio_clip, position = segment_time[0])
    
    return new_background, segment_time

插入了激活词语音后,我们就需要对Ylabel做出更新,如我们上面所说,将结束位置的后面50个y都更新为1。
因为我们的y的维度为1375而音频数据维度为10000,所以要记得做一下缩放。还要确保如果音频插入在background的尾部,导致后面没剩下50个值,注意不要越界。

def insert_ones(y, segment_end_ms):
    """
    Arguments:
    y -- numpy array of shape (1, Ty),训练样本的标签值
    segment_end_ms -- 插入的激活音频的结束时间
    
    Returns:
    y -- 更新后的 labels
    """
    
    # 缩放
    segment_end_y = int(segment_end_ms * Ty / 10000.0)
    
    for i in range(segment_end_y, segment_end_y + 50):
        if i < Ty:
            y[0, i] = 1
    
    return y

接下来就可以创建我们的训练样本了:

def create_training_example(background, activates, negatives):
    """
    Creates a training example with a given background, activates, and negatives.
    
    Arguments:
    background -- a 10 second background audio recording
    activates -- a list of audio segments of the word "activate"
    negatives -- a list of audio segments of random words that are not "activate"
    
    Returns:
    x -- the spectrogram of the training example
    y -- the label at each time step of the spectrogram
    """
    
    # 减小噪音
    background = background - 20

    y = np.zeros((1,Ty))

    previous_segments = []
    
    # 随机插入0-4段激活音频
    number_of_activates = np.random.randint(0, 5)
    random_indices = np.random.randint(len(activates), size=number_of_activates)
    random_activates = [activates[i] for i in random_indices]
    
    for random_activate in random_activates:
        background, segment_time = insert_audio_clip(background, random_activate, previous_segments)
        segment_start, segment_end = segment_time
        y = insert_ones(y, segment_end)

    # 随机插入0-2段negative语音
    number_of_negatives = np.random.randint(0, 3)
    random_indices = np.random.randint(len(negatives), size=number_of_negatives)
    random_negatives = [negatives[i] for i in random_indices]

    for random_negative in random_negatives:
        background, _ = insert_audio_clip(background, random_negative, previous_segments)
    
    # 标准化一波 
    background = match_target_amplitude(background, -20.0)

    # 导出训练样本
    file_handle = background.export("train" + ".wav", format="wav")
    print("File (train.wav) was saved in your directory.")
    
    # 绘制频谱图
    x = graph_spectrogram("train.wav")
    
    return x, y

然后我们就用这个方法去创建一大堆的训练样本。
然后将创建好的训练样本赋值给 XY

模型

我们用keres来实现这个模型。
模型共四层

  • 第一层是一个卷积层,因为我们的频谱数据为1维,所以是一维卷积(196个滤波器,滤波器长度15,步幅为4),卷积后再做一次BN,并用ReLu激活,再用Dropout进行一次正则化。
  • 第二层是一个GRU层,保证网络的记忆能力,并且也对输出的数据做Dropout和BN处理。
  • 第三层继续用一个GRU层。
  • 最后一层为全连接,且接sofxmax作为输出。

如下:

代码:

def model(input_shape):
    """
    用keras构造训练模型
    
    Argument:
    input_shape -- 模型输入数据的形状

    Returns:
    model -- keras model 实例
    """
    
    X_input = Input(shape = input_shape)
    
    # Layer 1: CONV layer 
    X = Conv1D(filters=196,kernel_size=15,strides=4)(X_input)   # CONV1D
    X = BatchNormalization()(X)                                 # Batch normalization
    X = Activation("relu")(X)                                   # ReLu activation
    X = Dropout(rate=0.8)(X)                                    # dropout (use 0.8)

    # Layer 2: First GRU Layer
    X = GRU(units=128, return_sequences = True)(X)           # GRU (use 128 units and return the sequences)
    X = Dropout(rate=0.8)(X)                                 # dropout (use 0.8)
    X = BatchNormalization()(X)                              # Batch normalization
    
    # Layer 3: Second GRU Layer
    X = GRU(units=128, return_sequences = True)(X)           # GRU (use 128 units and return the sequences)
    X = Dropout(rate=0.8)(X)                                 # dropout (use 0.8)
    X = BatchNormalization()(X)                              # Batch normalization
    X = Dropout(rate=0.8)(X)                                 # dropout (use 0.8)
    
    # Layer 4: Time-distributed dense layer
    X = TimeDistributed(Dense(1, activation = "sigmoid"))(X) # time distributed  (sigmoid)

    model = Model(inputs = X_input, outputs = X)
    
    return model

注意上面最后一层的 TimeDistributed 层,这里有关于它的使用的完全解释。
简单的说,它是使用timestep来进行操作,且对每个timestep的数据都共享权重。
比如这里我们用它包装了Dense,即是说对每个timestep的结果,都使用同样的参数计算Dense输出结果。

接下来就可以进行训练了:

opt = Adam(lr=0.0001, beta_1=0.9, beta_2=0.999, decay=0.01)
model.compile(loss='binary_crossentropy', optimizer=opt, metrics=["accuracy"])
model.fit(X, Y, batch_size = 5, epochs=100)

得到:
Epoch 100/100
26/26 [==============================] - 34s - loss: 0.0610 - acc: 0.9796

接下来如果有开发集,可以对模型进行测试,开发集样本最好来自真实分布,即去真实的环境说话进行录音。

预测和更多操作

接下来就可以用这个训练好的模型进行预测了:

x = graph_spectrogram(filename)
x  = x.swapaxes(0,1)
x = np.expand_dims(x, axis=0)
predictions = model.predict(x)

还可以在触发的时候做更多的操作:

consecutive_timesteps = 0
for i in range(Ty):
        consecutive_timesteps += 1
        if predictions[0,i,0] > threshold:
            # 任何操作
            whatever_you_wanna_do()