使用循环神经网络学习汉语拼音的拼写。本次实验重点为准备数据和模型。
1. 拼音数据（无声调）：https://www.jianguoyun.com/p/DQ3els0Q-rqYBhi3pIgFIAA
2. 定义数据集：采用字符模型，因此一个字符为一个样本。每个样本采用 one-hot 编码。
3. 样本是时间相关的，分别实现序列的随机采样和序列的顺序划分。
4. 标签 Y 与 X 同形状，但时间超前 1。
5. 准备数据：一次梯度更新使用的数据形状为：（时间步，Batch，类别）。
# 1. Load the data
import os
import numpy as np

# Load the pinyin corpus (file name: 'pinyin.txt')
with open('pinyin.txt', 'r', encoding='utf-8') as f:
    data = f.read()

# Strip newlines so the corpus becomes one continuous character stream
data = data.replace('\n', '')

# Build the character vocabulary (deduplicated; sorted for a deterministic order)
chars = sorted(set(data))
# char -> index and index -> char lookup tables
char_to_idx = {char: idx for idx, char in enumerate(chars)}
idx_to_char = {idx: char for idx, char in enumerate(chars)}

# Encode the whole corpus as a sequence of vocabulary indices
data_idx = [char_to_idx[char] for char in data]

# Length of each sample (time steps per sequence)
seq_length = 50
# Number of sequences per mini-batch
batch_size = 32

# Reserve one trailing character so Y (X shifted forward by one time step)
# never indexes past the end of the corpus.
# BUG FIX: the original used len(data_idx) // seq_length, which let the
# label slice run one element past the end of data_idx.
num_seqs = (len(data_idx) - 1) // seq_length
# Number of full batches we can form
num_batches = num_seqs // batch_size

# Dataset arrays, shape (num_batches, batch_size, seq_length)
data_x = np.zeros((num_batches, batch_size, seq_length), dtype=np.int32)
data_y = np.zeros((num_batches, batch_size, seq_length), dtype=np.int32)

# Fill the dataset by sequential partitioning: sample j of batch b covers
# corpus positions [(b*batch_size + j)*seq_length, ... + seq_length).
# BUG FIX: the original loop computed batch_idx = i // seq_length and
# seq_idx = i % seq_length over range(num_batches * batch_size), mixing up
# batch / sample / time-step indices and leaving most entries zero.
for b in range(num_batches):
    for j in range(batch_size):
        start = (b * batch_size + j) * seq_length
        data_x[b, j, :] = data_idx[start:start + seq_length]
        # Labels are the inputs shifted one step into the future
        data_y[b, j, :] = data_idx[start + 1:start + seq_length + 1]

# 80/20 split into training and test sets, by whole batches
train_idx = int(num_batches * 0.8)
train_x, test_x = data_x[:train_idx], data_x[train_idx:]
train_y, test_y = data_y[:train_idx], data_y[train_idx:]

# Report dataset statistics
print('数据集大小:', len(data_idx))
print('字符数:', len(chars))
print('每个样本长度:', seq_length)
print('每个batch大小:', batch_size)
print('序列数量:', num_seqs)
print('batch数量:', num_batches)
print('训练集大小:', len(train_x))
print('测试集大小:', len(test_x))
2.定义模型
import torch import torch.nn as nn
class RNN(nn.Module):
    """Character-level model: embedding -> multi-layer vanilla RNN -> linear head.

    input_size / output_size are the vocabulary size; hidden_size is used both
    as the embedding dimension and the RNN hidden dimension.
    """

    def __init__(self, input_size, hidden_size, output_size, num_layers=1):
        # BUG FIX: the original read `def init` / `super(...).init()` — the
        # dunder underscores were lost, so the class had no constructor.
        super(RNN, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.embedding = nn.Embedding(input_size, hidden_size)
        # Time-major RNN: expects input of shape (seq_length, batch, hidden)
        self.rnn = nn.RNN(hidden_size, hidden_size, num_layers)
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x, h0):
        """Run the network over a batch of sequences.

        x:  (seq_length, batch_size) integer character indices
        h0: (num_layers, batch_size, hidden_size) initial hidden state
        Returns (logits, hn):
            logits: (seq_length * batch_size, output_size), flattened
                    time-major so row k corresponds to time k // batch_size
            hn:     (num_layers, batch_size, hidden_size) final hidden state
        """
        x = self.embedding(x)          # (seq_length, batch_size, hidden_size)
        out, hn = self.rnn(x, h0)      # out: (seq_length, batch_size, hidden_size)
        # Flatten time and batch so the linear head sees one row per position
        out = out.view(-1, self.hidden_size)
        out = self.fc(out)             # (seq_length * batch_size, output_size)
        return out, hn

    def init_hidden(self, batch_size):
        """Return an all-zeros initial hidden state for the given batch size."""
        return torch.zeros(self.num_layers, batch_size, self.hidden_size)
3.训练模型
import time import math
# Hyperparameters
input_size = len(chars)    # vocabulary size (model input)
hidden_size = 128
output_size = len(chars)   # vocabulary size (model output)
num_layers = 2
lr = 0.001
epochs = 50

# Build the model
model = RNN(input_size, hidden_size, output_size, num_layers)

# Loss and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=lr)

# Training loop
for epoch in range(epochs):
    start_time = time.time()
    # Fresh hidden state at the start of every epoch
    h0 = model.init_hidden(batch_size)
    loss = torch.tensor(0.0)
    # BUG FIX: the original iterated range(num_batches), but train_x only
    # holds the 80% training split — indexing past it raised IndexError.
    for i in range(len(train_x)):
        # (batch, seq) -> (seq, batch): the RNN is time-major
        x = torch.from_numpy(train_x[i]).long().transpose(0, 1)
        y = torch.from_numpy(train_y[i]).long().transpose(0, 1)
        optimizer.zero_grad()
        outputs, hn = model(x, h0)
        # outputs: (seq*batch, vocab); y flattened to match, time-major
        loss = criterion(outputs, y.reshape(-1))
        loss.backward()
        # Clip gradients to mitigate exploding gradients in the vanilla RNN
        nn.utils.clip_grad_norm_(model.parameters(), 5)
        optimizer.step()
        # Detach so the next batch does not backprop through this one
        h0 = hn.detach()
    end_time = time.time()

    # ---- Evaluation: no gradients needed ----
    with torch.no_grad():
        # Training-set accuracy
        train_acc = 0.0
        for i in range(len(train_x)):  # BUG FIX: was range(num_batches)
            x = torch.from_numpy(train_x[i]).long().transpose(0, 1)
            y = torch.from_numpy(train_y[i]).long().transpose(0, 1)
            h_eval = model.init_hidden(batch_size)
            outputs, _ = model(x, h_eval)
            # BUG FIX: outputs are flattened time-major, so predictions must
            # reshape to (seq_length, batch_size) to line up with y; the
            # original reshape(batch_size, seq_length) scrambled the order
            # and could not broadcast against y.
            preds = torch.argmax(outputs, dim=1).reshape(seq_length, batch_size)
            train_acc += torch.sum(preds == y).item() / (batch_size * seq_length)
        train_acc /= max(len(train_x), 1)

        # Test-set accuracy
        test_acc = 0.0
        for i in range(len(test_x)):
            x = torch.from_numpy(test_x[i]).long().transpose(0, 1)
            y = torch.from_numpy(test_y[i]).long().transpose(0, 1)
            h_eval = model.init_hidden(batch_size)
            outputs, _ = model(x, h_eval)
            preds = torch.argmax(outputs, dim=1).reshape(seq_length, batch_size)
            test_acc += torch.sum(preds == y).item() / (batch_size * seq_length)
        # Guard against an empty test split (small corpora)
        test_acc /= max(len(test_x), 1)

    # Progress report for this epoch
    print('Epoch [{}/{}], Loss: {:.4f}, Train Acc: {:.4f}, Test Acc: {:.4f}, Time: {:.2f}s'.format(
        epoch + 1, epochs, loss.item(), train_acc, test_acc, end_time - start_time))
4.预测模型
# Seed prefix for generation
prefix = 'ni hao'

# ---- Single-step prediction: feed the prefix one character at a time ----
h0 = model.init_hidden(1)
# x: (len(prefix), 1) — one-sequence batch of prefix indices
x = torch.from_numpy(np.array([char_to_idx[char] for char in prefix])).long().unsqueeze(1)
for i in range(len(prefix)):
    output, h0 = model(x[i].unsqueeze(0), h0)
    pred = torch.argmax(output, dim=1).item()
    print(idx_to_char[pred], end='')

# ---- K-step prediction: warm up on the prefix, then generate 50 chars ----
h0 = model.init_hidden(1)
x = torch.from_numpy(np.array([char_to_idx[char] for char in prefix])).long().unsqueeze(1)
# Warm up the hidden state on the prefix; only the prediction after the last
# prefix character is kept.
# BUG FIX: the original appended a prediction to x on every warm-up step,
# interleaving throwaway predictions into the generated string.
for i in range(len(prefix)):
    output, h0 = model(x[i].unsqueeze(0), h0)
pred = torch.argmax(output, dim=1).item()
x = torch.cat((x, torch.tensor([[pred]])), dim=0)
# Generate 50 more characters, feeding each prediction back as the next input
for _ in range(50):
    output, h0 = model(x[-1].unsqueeze(0), h0)
    pred = torch.argmax(output, dim=1).item()
    x = torch.cat((x, torch.tensor([[pred]])), dim=0)
# BUG FIX: the original print was missing its closing parenthesis and indexed
# idx_to_char with 0-dim tensors; .tolist() yields plain ints.
print(''.join(idx_to_char[idx] for idx in x.squeeze(1).tolist()))
原文地址: https://www.cveoy.top/t/topic/fDZF 著作权归作者所有。请勿转载和采集!