1. 拼音数据(无声调):https://www.jianguoyun.com/p/DQ3els0Q-rqYBhi3pIgFIAA
2. 定义数据集:采用字符模型,因此一个字符为一个样本。每个样本采用one-hot编码。
3. 样本是时间相关的,分别实现序列的随机采样和序列的顺序划分。
4. 标签Y与X同形状,但时间超前1。
5. 准备数据:一次梯度更新使用的数据形状为:(时间步,Batch,类别数)。
6. 实现基本循环神经网络模型。
- 拼音数据(无声调):
拼音数据(无声调)可以从给定的链接中下载,保存为pinyin.txt文件。
- 定义数据集:
我们采用字符模型,因此一个字符为一个样本。每个样本采用one-hot编码。
import numpy as np
# 定义数据集类
class PinyinDataset:
    """Character-level dataset over a pinyin text file.

    Each character is one sample, one-hot encoded.  One item is an
    (x, y) pair of numpy arrays shaped (seq_length, batch_size,
    vocab_size), where y is x advanced by one time step
    (next-character targets).
    """

    def __init__(self, filename, batch_size, seq_length, encoding='utf-8'):
        self.batch_size = batch_size
        self.seq_length = seq_length
        # Read the raw pinyin corpus as one long string.
        with open(filename, 'r', encoding=encoding) as f:
            self.data = f.read()
        self.vocab = list(set(self.data))   # character vocabulary (order not deterministic across runs)
        self.vocab_size = len(self.vocab)
        # Bidirectional char <-> index maps.
        self.char_to_idx = {char: idx for idx, char in enumerate(self.vocab)}
        self.idx_to_char = {idx: char for idx, char in enumerate(self.vocab)}
        # Encode the corpus as integer indices and trim it so it divides
        # evenly into batch_size * seq_length chunks.
        self.data = [self.char_to_idx[char] for char in self.data]
        self.num_batches = len(self.data) // (batch_size * seq_length)
        self.data = self.data[:self.num_batches * batch_size * seq_length]

    def __len__(self):
        # Fix: __getitem__(idx) reads up to index
        # idx + (seq_length - 1) + (batch_size - 1) * seq_length + 1
        # == idx + batch_size * seq_length, so the last valid idx is
        # len(data) - batch_size * seq_length - 1.  The original
        # `len(data) - seq_length` allowed out-of-range reads whenever
        # batch_size > 1.
        return max(0, len(self.data) - self.batch_size * self.seq_length)

    def __getitem__(self, idx):
        """Return one-hot (x, y); y is x shifted forward by one character."""
        x = np.zeros((self.seq_length, self.batch_size, self.vocab_size), dtype=np.float32)
        y = np.zeros((self.seq_length, self.batch_size, self.vocab_size), dtype=np.float32)
        for i in range(self.seq_length):
            for j in range(self.batch_size):
                # Batch column j reads a contiguous chunk offset by j * seq_length.
                x[i, j, self.data[idx + i + j * self.seq_length]] = 1.0
                y[i, j, self.data[idx + i + j * self.seq_length + 1]] = 1.0
        return x, y
- 样本是时间相关的,分别实现序列的随机采样和序列的顺序划分:
# Random sampling of one training window.
def random_sample(dataloader, seq_length):
    """Draw one random contiguous window of seq_length samples.

    Fix: the original used randint(len - seq_length), which excludes the
    last valid window and raises ValueError when len(dataloader) equals
    seq_length; the exclusive upper bound is now len - seq_length + 1.
    """
    start = np.random.randint(0, len(dataloader) - seq_length + 1)
    return dataloader[start:start + seq_length]
# 序列顺序划分
def sequence_partition(dataloader, seq_length):
    """Split the sequence into consecutive non-overlapping windows.

    Any trailing remainder shorter than seq_length is dropped.
    """
    batches = []
    total = len(dataloader) // seq_length
    for k in range(total):
        start = k * seq_length
        batches.append(dataloader[start:start + seq_length])
    return batches
- 标签Y与X同形状,但时间超前1:
由于是字符级别的预测,因此标签Y对应的是输入X的每个字符的下一个字符。因此标签Y与X同形状,但时间超前1。
- 准备数据:一次梯度更新使用的数据形状为:(时间步,Batch,类别数):
# Data preparation: one gradient update consumes data shaped
# (time steps, batch, num classes).
batch_size = 32
seq_length = 64
train_dataset = PinyinDataset('pinyin.txt', batch_size, seq_length)
# NOTE(review): sequence_partition slices train_dataset with slice objects,
# but PinyinDataset.__getitem__ does integer arithmetic on idx — confirm
# this does not raise TypeError at runtime.
train_batches = sequence_partition(train_dataset, seq_length)
- 实现基本循环神经网络模型:
我们采用PyTorch内置的RNN或GRU循环单元,输出层的全连接使用RNN所有时间步的输出。隐状态初始值为0。测试前向传播。如果采用顺序划分,需梯度截断。
import torch
from torch import nn
from torch.nn import functional as F
# 定义循环神经网络模型
class RNNModel(nn.Module):
    """Character-level recurrent language model (RNN or GRU backbone).

    The fully connected output layer is applied to the RNN outputs at
    every time step.  The recurrent layer is built with batch_first=True,
    so forward() expects x shaped (batch, seq, input_size).
    """

    def __init__(self, input_size, hidden_size, output_size, num_layers=1, rnn_type='rnn', dropout=0.0):
        super(RNNModel, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.dropout = nn.Dropout(dropout)
        self.rnn_type = rnn_type
        if rnn_type == 'rnn':
            self.rnn = nn.RNN(input_size, hidden_size, num_layers, dropout=dropout, batch_first=True)
        elif rnn_type == 'gru':
            self.rnn = nn.GRU(input_size, hidden_size, num_layers, dropout=dropout, batch_first=True)
        else:
            # Fix: the original silently left self.rnn undefined for an
            # unknown type, deferring the failure to forward(); fail fast.
            raise ValueError("unsupported rnn_type: {!r}".format(rnn_type))
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x, h):
        """Run one forward pass.

        x: (batch, seq, input_size); h: (num_layers, batch, hidden_size).
        Returns (out, h) with out shaped (batch, seq, output_size).
        """
        out, h = self.rnn(x, h)
        out = self.dropout(out)
        # Flatten batch+time so one Linear call handles every step.
        out = self.fc(out.reshape(-1, self.hidden_size))
        return out.reshape(x.shape[0], x.shape[1], -1), h

    def init_hidden(self, batch_size):
        """Zero initial hidden state, shaped (num_layers, batch, hidden)."""
        # Fix: the original if/else returned the identical tensor in both
        # branches; RNN and GRU share the same hidden-state shape.
        return torch.zeros(self.num_layers, batch_size, self.hidden_size)
# Smoke-test the forward pass with one item from the dataset.
input_size = train_dataset.vocab_size
hidden_size = 128
output_size = train_dataset.vocab_size
model = RNNModel(input_size, hidden_size, output_size, num_layers=1, rnn_type='rnn', dropout=0.0)
x, y = train_dataset[0]
h = model.init_hidden(batch_size)
# Fix: the dataset emits (seq, batch, vocab) but the model is batch_first,
# so transpose to (batch, seq, vocab); otherwise the RNN rejects the
# mismatch between the input batch dim (seq_length) and the hidden batch.
out, h = model(torch.from_numpy(x).transpose(0, 1), h)
print(out.shape, h.shape)
# 采用顺序划分时需要梯度截断
def detach(hidden):
    """Recursively detach hidden state(s) from the autograd graph.

    Used for truncated backpropagation through time when the data is
    partitioned sequentially: gradients must not flow across batches.
    Accepts a single tensor or a (possibly nested) tuple of tensors.
    """
    if isinstance(hidden, torch.Tensor):
        return hidden.detach()
    return tuple(map(detach, hidden))
- 训练:损失函数为平均交叉熵。
# 训练模型
def train(model, dataloader, num_epochs, learning_rate, gradient_clip):
    """Train a character-level RNN with truncated BPTT.

    Parameters:
        model: RNNModel-like module exposing init_hidden(batch_size).
        dataloader: iterable of (x, y) pairs of one-hot numpy arrays
            shaped (seq_length, batch_size, vocab_size); y is x shifted
            forward by one step.
        num_epochs, learning_rate, gradient_clip: usual training knobs.

    Returns the trained model.  The loss is the mean cross-entropy over
    all predicted characters.
    NOTE(review): relies on the global `batch_size` for the hidden state —
    confirm it matches the dataloader's batch dimension.
    """
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
    model.train()  # fix: hoisted out of the step loop (was re-set every step)
    for epoch in range(num_epochs):
        h = model.init_hidden(batch_size)
        for i, (x, y) in enumerate(dataloader):
            # Fix: the dataset yields numpy arrays shaped (seq, batch, vocab);
            # the model is batch_first, so convert to tensors and transpose
            # to (batch, seq, vocab).
            x = torch.from_numpy(x).transpose(0, 1) if isinstance(x, np.ndarray) else x
            y = torch.from_numpy(y).transpose(0, 1) if isinstance(y, np.ndarray) else y
            optimizer.zero_grad()
            # Truncate the gradient through time across batch boundaries.
            h = detach(h)
            out, h = model(x, h)
            # Fix: y is one-hot, so the integer class targets are its argmax
            # over the vocab axis; the original `y.reshape(-1).long()` handed
            # CrossEntropyLoss a target vector vocab_size times too long.
            targets = y.reshape(-1, y.shape[-1]).argmax(dim=1)
            loss = criterion(out.reshape(-1, out.shape[-1]), targets)
            loss.backward()
            # Clip the global gradient norm to stabilise BPTT.
            nn.utils.clip_grad_norm_(model.parameters(), gradient_clip)
            optimizer.step()
            if (i + 1) % 100 == 0:
                print('Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}'
                      .format(epoch + 1, num_epochs, i + 1, len(dataloader), loss.item()))
    return model
# Train the model on the sequentially partitioned batches.
# NOTE(review): a fresh model is created here, discarding the instance
# built for the forward-pass smoke test above — confirm this is intended.
model = RNNModel(input_size, hidden_size, output_size, num_layers=1, rnn_type='rnn', dropout=0.0)
learning_rate = 0.001
num_epochs = 10
gradient_clip = 1.0
model = train(model, train_batches, num_epochs, learning_rate, gradient_clip)
- 预测:给定一个前缀,进行单步预测和K步预测。
# 预测模型
def predict(model, prefix, num_preds):
    """Generate num_preds characters that continue `prefix`.

    The prefix warms up the hidden state, then characters are sampled
    one at a time from the model's softmax distribution.  Returns the
    prefix concatenated with the generated characters.
    NOTE(review): depends on the global `train_dataset` for the
    vocabulary and the char/index maps.
    """
    model.eval()
    # Fix: inference runs a single sequence, so the hidden state must be
    # initialised for batch size 1; the original used the training
    # batch_size, which the RNN rejects against a (1, 1, vocab) input.
    h = model.init_hidden(1)
    result = list(prefix)
    with torch.no_grad():  # no autograd bookkeeping during generation
        # Warm up on all but the last prefix character; the generation loop
        # below feeds result[-1], so the original fed the final prefix
        # character through the model twice.
        for char in prefix[:-1]:
            x = np.zeros((1, 1, train_dataset.vocab_size), dtype=np.float32)
            x[0, 0, train_dataset.char_to_idx[char]] = 1.0
            _, h = model(torch.from_numpy(x), h)
        for _ in range(num_preds):
            x = np.zeros((1, 1, train_dataset.vocab_size), dtype=np.float32)
            x[0, 0, train_dataset.char_to_idx[result[-1]]] = 1.0
            out, h = model(torch.from_numpy(x), h)
            # Sample the next character from the softmax distribution.
            prob = F.softmax(out[0, 0], dim=0).numpy()
            char_idx = np.random.choice(train_dataset.vocab_size, p=prob)
            result.append(train_dataset.idx_to_char[char_idx])
    return ''.join(result)
# Single-shot prediction: generate all characters after the prefix at once.
prefix = 'ni'
num_preds = 10
print(predict(model, prefix, num_preds))
# Step-by-step (K-step) prediction: generate one character at a time,
# sliding the prefix window forward with each newly generated character.
prefix = 'ni'
num_preds = 10
for i in range(num_preds):
    generated = predict(model, prefix, 1)
    print(generated)
    # Fix: the original referenced `result`, a variable local to predict()
    # and undefined here (NameError); use the returned string instead.
    prefix = prefix[1:] + generated[-1]
原文地址: https://www.cveoy.top/t/topic/fFeE 著作权归作者所有。请勿转载和采集!