基于RNN的汉语拼音预测模型实现

本文将介绍如何使用循环神经网络(RNN)构建一个汉语拼音预测模型,并使用PyTorch框架进行实现。模型的训练和预测过程将详细讲解,同时还会分析时间步数对模型性能的影响。

1. 数据准备

首先,我们需要准备汉语拼音数据。这里使用一个包含所有汉语拼音的文本文件作为数据源。

import numpy as np
import matplotlib.pyplot as plt
import torch
import collections # 统计出现次数用
with open('D:\汉语音节表 (1).txt', encoding='UTF-8') as f:
    line = f.readline()  # 只有一行

# 提取词元,本次的词元为字符
tokens = list(line)
print('音节表中共有%d个字符' % len(tokens))

# 对词元进行统计,得到语料
counter = collections.Counter(tokens)
token_freq = sorted(counter.items(), key=lambda x: x[1], reverse=True)
print('共有%d个不重复字符, 按出现次数前3个为:' % len(token_freq))
print(token_freq[:3])

# 构建字符与数字之间的关系
idx2token = [x[0] for x in token_freq]
token2idx = {token : idx for idx, token in enumerate(idx2token)}

# 将拼音表转化成数字
corpus = np.asarray([token2idx[t] for t in tokens])
print('拼音表转化前:')
print(tokens[:30])
print('拼音表转化后:')
print(corpus[:30])

2. 数据处理

为了方便模型训练,我们将数据处理成序列的形式。这里定义了两个函数:get_step_ind 用于生成单个序列,get_batch_ind 用于生成包含多个序列的Batch。

def get_step_ind(n_begin, n_steps, n_tokens):
    ''' 生成一个样本序列,以索引表示
    '''
    assert n_begin + n_steps < n_tokens, 'error: n_begin'
    x_ind = np.arange(n_begin,n_begin+n_steps)
    y_ind = x_ind + 1
    return (x_ind, y_ind)

def get_batch_ind(n_begins, n_steps, n_tokens):
    ''' 生成样本序列构成的Batch,以索引表示
    '''
    x_inds_list, y_inds_list = [],[]
    for n_begin in n_begins:
        x_ind, y_ind = get_step_ind(n_begin, n_steps, n_tokens)
        x_inds_list.append(x_ind)
        y_inds_list.append(y_ind)
    x_inds = np.vstack(x_inds_list)
    y_inds = np.vstack(y_inds_list)
    return (x_inds.T, y_inds.T)

3. 随机采样和顺序划分

我们使用两种方法对数据进行采样:随机采样和顺序划分。

  • 随机采样: get_random_batch 函数从数据中随机选择一定数量的序列,构成一个Batch。
  • 顺序划分: get_seq_batches 函数将数据按照顺序分成多个Batch,每个Batch包含一定数量的连续序列。
def get_random_batch(batch_size,n_steps, n_tokens):
    '''随机采样
       返回一个Batch
    '''
    valid_range = np.arange(n_tokens-n_steps-1)
    n_begins = np.random.choice(valid_range, size=(batch_size,), replace=False)
    return get_batch_ind(n_begins, n_steps, n_tokens)

def get_seq_batches(n_batch,batch_size, n_steps, n_tokens):
    ''' 顺序划分
        返回多个Batch,Batch之间按时间相关
    '''
    valid_range = np.arange(n_tokens-n_steps*n_batch-1)
    n_begins_0 = np.random.choice(valid_range, size=(batch_size,), replace=False)
    
    batch_seq_list = []
    for n_batch_seq in np.arange(n_batch):
        batch_seq_list.append( get_batch_ind(n_begins_0, n_steps, n_tokens) )
        n_begins_0 += n_steps
    
    return batch_seq_list
            
batch_size = 2
n_steps = 8
n_tokens = len(tokens)
n_batch = 3
n_vocab = len(idx2token)

X_rand_batch, Y_rand_batch = get_random_batch(batch_size,n_steps, n_tokens)
print('随机采样,以位置表示')
print(X_rand_batch)
print('随机采样,转化成数字')
print(corpus[X_rand_batch])
print('随机采样,将其中一个序列转化成字符')
print([idx2token[t] for t in corpus[X_rand_batch][:,0]] )
XY_seq_list = get_seq_batches(n_batch,batch_size,n_steps, n_tokens)
print('顺序划分,以位置表示')
print(XY_seq_list[0][0])
print(XY_seq_list[1][0])
print(XY_seq_list[-1][0])

4. One-hot 编码

我们将每个字符使用 one-hot 编码进行表示,方便模型的输入。

# one-hot 编码
one_hot_matrix = np.eye(len(idx2token))
X_enc = one_hot_matrix[corpus[X_rand_batch]] # 以随机采样一个Batch中的X为例
print('X的形状:')
print(X_enc.shape)

5. 模型构建

我们使用 PyTorch 框架构建 RNN 模型,循环单元可以选择 nn.RNN 或者 nn.GRU。模型结构如下:

  • 输入层: 将 one-hot 编码后的字符作为输入。
  • 循环层: 使用 nn.RNNnn.GRU 作为循环层,并设置隐藏层的大小和层数。
  • 输出层: 使用全连接层将循环层的输出映射到词表大小,并使用 softmax 函数进行概率预测。
class MyRNN(torch.nn.Module):
    def __init__(self, n_vocab, n_hid, n_layers=1):
        super(MyRNN, self).__init__()
        self.n_vocab = n_vocab
        self.n_hid = n_hid
        self.n_layers = n_layers
        
        self.rnn = torch.nn.RNN(self.n_vocab, self.n_hid, self.n_layers)
#         self.rnn = torch.nn.GRU(self.n_vocab, self.n_hid, self.n_layers)
        self.linear = torch.nn.Linear(self.n_hid, self.n_vocab)
        
    def forward(self, X, state):
        X = X.to(torch.float32)
        Y, state = self.rnn(X, state)
        
        # 全连接层⾸先将Y的形状改为(时间步数*批量⼤⼩,隐藏单元数)
        # 它的输出形状是(时间步数*批量⼤⼩,词表⼤⼩)。
        output = self.linear(Y.reshape((-1, Y.shape[-1])))
        return output, state
    
    def begin_state(self, batch_size):
        return torch.zeros((self.n_layers, batch_size, self.n_hid))

6. 模型训练

模型训练使用交叉熵损失函数和 Adam 优化器。

def train_data_iter(n_batch):
    for ii in range(n_batch):
        X_rand_batch, Y_rand_batch = get_random_batch(batch_size,n_steps, n_tokens)
        one_hot_matrix = np.eye(len(idx2token))
        X_enc = one_hot_matrix[corpus[X_rand_batch]]
        Y_enc = one_hot_matrix[corpus[Y_rand_batch]]
        yield torch.from_numpy(X_enc), torch.from_numpy(Y_enc)
        
batch_size = 10
n_steps = 100
        
model = MyRNN(n_vocab, 500)

criteria = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

train_loss_list = []
train_acc_list = []
model.train()
for batch_id, (X,Y) in enumerate(train_data_iter(1000)):
    state = model.begin_state(batch_size) 
    Y_hat, state = model(X, state)
    Y_target = Y.reshape((-1, Y.shape[-1]))
    loss = criteria(Y_hat, Y_target)
    
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
    
    correct_batch = (Y_hat.argmax(1) == Y_target.argmax(1)).type(torch.float).sum().item()
    
    train_acc_batch = 100 * correct_batch / (n_steps * batch_size)
    
    train_loss_list.append(loss.item())
    train_acc_list.append(train_acc_batch)
    
    if batch_id % 100 == 0:
        print('batch_id:%d, loss:%f, acc:%f' % (batch_id, loss.item(), train_acc_batch))
    
train_loss = np.asarray(train_loss_list)
train_acc = np.asarray(train_acc_list)

print('训练后的困惑度:%f' % np.exp(train_loss[-1]))

plt.figure()
plt.plot(np.arange(len(train_loss)), np.exp(train_loss))
plt.grid(True)
plt.xlabel('n_updates')
plt.ylabel('perplexity')

plt.figure()
plt.plot(np.arange(len(train_acc)), train_acc)
plt.grid(True)
plt.xlabel('n_updates')
plt.ylabel('ACC')

7. 模型预测

模型训练完成后,我们可以进行预测。这里实现了两种预测方式:单步预测和 K 步预测。

  • 单步预测: 输入一个字符,模型预测下一个字符。
  • K 步预测: 输入一个字符序列,模型预测接下来的 K 个字符。
# 单步预测
def predict_one_step(model, prefix, state, idx2token):
    prefix = prefix[-1:] # 只取最后一个字符
    X = torch.zeros((1,1,n_vocab))
    X[0,0,token2idx[prefix]] = 1
    Y_hat, state = model(X, state)
    return Y_hat, state

# K 步预测
def predict_k_step(model, prefix, k, state, idx2token):
    Y_hat, state = predict_one_step(model, prefix, state, idx2token)
    for ii in range(k):
        predict_idx = Y_hat.argmax(1).item()
        prefix += idx2token[predict_idx]
        Y_hat, state = predict_one_step(model, prefix, state, idx2token)
    return prefix

# 测试单步预测和K步预测
prefix = 'zh'
state = model.begin_state(1)
print('单步预测结果:', predict_one_step(model, prefix, state, idx2token))
print('K步预测结果:', predict_k_step(model, prefix, 10, state, idx2token))

8. 时间步数的影响

时间步数是 RNN 模型的一个重要参数,它决定了模型可以学习到的序列长度。时间步数的选择会影响模型的性能。

我们可以通过改变时间步数,测试模型的性能,并分析其原因。

# 测试不同时间步数的模型性能
time_steps = [10, 20, 50, 100]
for n_steps in time_steps:
    model = MyRNN(n_vocab, 500)
    # 重新训练模型
    # ...
    # 测试模型性能
    # ...
    print('时间步数:%d, 性能:...' % n_steps)

总结

本文介绍了基于 RNN 的汉语拼音预测模型实现,并对时间步数的影响进行了测试和分析。

  • 模型构建时可以选择 nn.RNNnn.GRU 作为循环单元。
  • 训练时需要使用交叉熵损失函数和 Adam 优化器。
  • 时间步数的选择会影响模型的性能,需要根据具体情况进行调整。

希望本文能够帮助你更好地理解 RNN 模型,并应用它进行汉语拼音预测。

基于RNN的汉语拼音预测模型实现

原文地址: https://www.cveoy.top/t/topic/ovVJ 著作权归作者所有。请勿转载和采集!

免费AI点我,无需注册和登录