1. 拼音数据(无声调):

拼音数据(无声调)可以从给定的链接中下载,保存为pinyin.txt文件。

  2. 定义数据集:

我们采用字符模型,因此一个字符为一个样本。每个样本采用one-hot编码。

import numpy as np

# 定义数据集类
# Dataset class: character-level samples drawn from a pinyin text file.
class PinyinDataset:
    """Character-level dataset over a text file.

    Each sample is a pair ``(x, y)`` of one-hot float32 arrays shaped
    ``(seq_length, batch_size, vocab_size)``; ``y`` is ``x`` shifted one
    step ahead in time (next-character targets).
    """

    def __init__(self, filename, batch_size, seq_length, encoding='utf-8'):
        self.batch_size = batch_size
        self.seq_length = seq_length

        # Read the raw pinyin text.
        with open(filename, 'r', encoding=encoding) as f:
            self.data = f.read()
        self.vocab = list(set(self.data))  # character vocabulary (order is arbitrary)
        self.vocab_size = len(self.vocab)  # vocabulary size

        # Bidirectional char <-> index maps.
        self.char_to_idx = {char: idx for idx, char in enumerate(self.vocab)}
        self.idx_to_char = {idx: char for idx, char in enumerate(self.vocab)}

        # Encode the text as an integer sequence, truncated to whole batches.
        self.data = [self.char_to_idx[char] for char in self.data]
        self.num_batches = len(self.data) // (batch_size * seq_length)
        self.data = self.data[:self.num_batches * batch_size * seq_length]

    def __len__(self):
        # Largest valid start index + 1.  __getitem__(idx) reads up to
        # data[idx + batch_size * seq_length] (the +1-shifted target of the
        # last slot), so idx must be bounded by that span.  The previous
        # bound, len(data) - seq_length, overcounted and raised IndexError
        # near the end of the data whenever batch_size > 1.
        return max(0, len(self.data) - self.batch_size * self.seq_length)

    def __getitem__(self, idx):
        x = np.zeros((self.seq_length, self.batch_size, self.vocab_size), dtype=np.float32)
        y = np.zeros((self.seq_length, self.batch_size, self.vocab_size), dtype=np.float32)
        for i in range(self.seq_length):
            for j in range(self.batch_size):
                # Batch member j reads its own contiguous stripe of the stream.
                x[i, j, self.data[idx + i + j * self.seq_length]] = 1.0
                # Target is the next character in time.
                y[i, j, self.data[idx + i + j * self.seq_length + 1]] = 1.0
        return x, y
  3. 样本是时间相关的,分别实现序列的随机采样和序列的顺序划分:
# 随机采样
def random_sample(dataloader, seq_length):
    """Return one contiguous window of length seq_length taken at a random offset."""
    start = np.random.randint(len(dataloader) - seq_length)
    stop = start + seq_length
    return dataloader[start:stop]

# 序列顺序划分
def sequence_partition(dataloader, seq_length):
    """Split into consecutive non-overlapping chunks of seq_length, dropping any remainder."""
    total = len(dataloader) // seq_length
    chunks = []
    for k in range(total):
        start = k * seq_length
        chunks.append(dataloader[start:start + seq_length])
    return chunks
  4. 标签Y与X同形状,但时间超前1:

由于是字符级别的预测,因此标签Y对应的是输入X的每个字符的下一个字符。因此标签Y与X同形状,但时间超前1。

  5. 准备数据:一次梯度更新使用的数据形状为:(时间步,Batch,类别数):
# Data preparation: fixed hyper-parameters for batching.
batch_size = 32
seq_length = 64
# Reads the downloaded pinyin corpus from the working directory.
train_dataset = PinyinDataset('pinyin.txt', batch_size, seq_length)
# NOTE(review): sequence_partition slices its argument, but PinyinDataset.__getitem__
# only handles integer indices — confirm slicing support before relying on this.
train_batches = sequence_partition(train_dataset, seq_length)
  6. 实现基本循环神经网络模型:

我们采用PyTorch内置的RNN或GRU循环单元,输出层的全连接使用RNN所有时间步的输出。隐状态初始值为0。测试前向传播。如果采用顺序划分,需梯度截断。

import torch
from torch import nn
from torch.nn import functional as F

# 定义循环神经网络模型
# Recurrent network: RNN/GRU core followed by a per-timestep linear read-out.
class RNNModel(nn.Module):
    """Character-level recurrent language model.

    Args:
        input_size: size of each one-hot input vector (vocabulary size).
        hidden_size: width of the recurrent hidden state.
        output_size: number of output classes (vocabulary size).
        num_layers: number of stacked recurrent layers.
        rnn_type: 'rnn' or 'gru'.
        dropout: dropout probability applied both inside the stacked RNN
            (between layers) and on the RNN outputs before the linear layer.

    Inputs are batch-first: x is (batch, seq, input_size) and the hidden
    state h is (num_layers, batch, hidden_size).
    """

    def __init__(self, input_size, hidden_size, output_size, num_layers=1, rnn_type='rnn', dropout=0.0):
        super(RNNModel, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.dropout = nn.Dropout(dropout)
        self.rnn_type = rnn_type
        if rnn_type == 'rnn':
            self.rnn = nn.RNN(input_size, hidden_size, num_layers, dropout=dropout, batch_first=True)
        elif rnn_type == 'gru':
            self.rnn = nn.GRU(input_size, hidden_size, num_layers, dropout=dropout, batch_first=True)
        else:
            # Fail fast: previously an unknown rnn_type left self.rnn undefined
            # and surfaced later as a confusing AttributeError in forward().
            raise ValueError("rnn_type must be 'rnn' or 'gru', got {!r}".format(rnn_type))
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x, h):
        # x: (batch_size, seq_length, input_size)
        # h: (num_layers, batch_size, hidden_size)
        out, h = self.rnn(x, h)
        out = self.dropout(out)
        # Apply the read-out to every timestep by flattening (batch, seq).
        out = self.fc(out.reshape(-1, self.hidden_size))
        return out.reshape(x.shape[0], x.shape[1], -1), h

    def init_hidden(self, batch_size):
        """Return a zero initial hidden state of shape (num_layers, batch, hidden)."""
        # RNN and GRU use a single state tensor of identical shape, so no
        # per-type branching is needed (the old branches were duplicates).
        return torch.zeros(self.num_layers, batch_size, self.hidden_size)

# Smoke-test the forward pass.
input_size = train_dataset.vocab_size
hidden_size = 128
output_size = train_dataset.vocab_size
model = RNNModel(input_size, hidden_size, output_size, num_layers=1, rnn_type='rnn', dropout=0.0)
x, y = train_dataset[0]
# The dataset yields (seq_length, batch, vocab) but the model was built with
# batch_first=True, so swap the first two axes; previously the unpermuted x
# made the RNN treat seq_length as the batch dimension and the hidden state
# sized by the global batch_size no longer matched it at runtime.
x_t = torch.from_numpy(x).permute(1, 0, 2)
h = model.init_hidden(x_t.shape[0])  # size the state from the actual batch dim
out, h = model(x_t, h)
print(out.shape, h.shape)

# Sequential partitioning requires truncated backpropagation through time:
# the carried-over hidden state must be cut off from the autograd graph.
def detach(hidden):
    """Recursively detach hidden-state tensors from the computation graph."""
    if isinstance(hidden, torch.Tensor):
        return hidden.detach()
    return tuple(detach(item) for item in hidden)
  7. 训练:损失函数为平均交叉熵。
# Train the model with truncated backpropagation through time.
def train(model, dataloader, num_epochs, learning_rate, gradient_clip):
    """Train ``model`` on batches of (x, y) one-hot arrays and return it.

    Args:
        model: module with ``init_hidden(batch)`` and ``forward(x, h)``;
            x is expected batch-first (batch, seq, vocab).
        dataloader: iterable of (x, y) pairs; numpy arrays are converted.
        num_epochs: number of passes over the dataloader.
        learning_rate: Adam learning rate.
        gradient_clip: max gradient norm for clipping.
    """
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
    model.train()  # set once; per-step toggling was redundant
    for epoch in range(num_epochs):
        h = None  # lazily sized from the actual batch, not a global batch_size
        for i, (x, y) in enumerate(dataloader):
            # Accept numpy batches straight from the dataset.
            if isinstance(x, np.ndarray):
                x = torch.from_numpy(x)
            if isinstance(y, np.ndarray):
                y = torch.from_numpy(y)
            if h is None:
                h = model.init_hidden(x.shape[0])
            # Truncate BPTT: detach the carried-over state from the old graph.
            h = tuple(s.detach() for s in h) if isinstance(h, tuple) else h.detach()
            optimizer.zero_grad()
            out, h = model(x, h)
            # y is one-hot; CrossEntropyLoss needs class indices, so take the
            # argmax over the vocabulary axis.  The old y.reshape(-1).long()
            # flattened the one-hot bits and could never match out's shape.
            targets = y.argmax(dim=-1).reshape(-1)
            loss = criterion(out.reshape(-1, out.shape[-1]), targets)
            loss.backward()
            nn.utils.clip_grad_norm_(model.parameters(), gradient_clip)
            optimizer.step()
            if (i+1) % 100 == 0:
                print('Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}'
                      .format(epoch+1, num_epochs, i+1, len(dataloader), loss.item()))
    return model

# Train the model on the partitioned pinyin corpus.
model = RNNModel(input_size, hidden_size, output_size, num_layers=1, rnn_type='rnn', dropout=0.0)
learning_rate = 0.001
num_epochs = 10
gradient_clip = 1.0
# NOTE(review): train_batches comes from slicing the dataset object; verify
# it actually yields (x, y) pairs in the shape train() expects before running.
model = train(model, train_batches, num_epochs, learning_rate, gradient_clip)
  8. 预测:给定一个前缀,进行单步预测和K步预测。
# Sampling-based generation from a trained model.
def predict(model, prefix, num_preds, dataset=None):
    """Generate ``num_preds`` characters following ``prefix``.

    Args:
        model: trained model with ``init_hidden(batch)`` and ``forward(x, h)``.
        prefix: non-empty seed string; each char must be in the vocabulary.
        num_preds: number of characters to sample.
        dataset: object providing vocab_size / char_to_idx / idx_to_char;
            defaults to the module-level train_dataset (backward compatible).

    Returns:
        prefix plus the sampled continuation, as one string.
    """
    if dataset is None:
        dataset = train_dataset
    model.eval()
    # Generation runs one sequence at a time, so the hidden state has batch 1.
    # (Previously init_hidden(batch_size) used the global training batch size
    # and mismatched the batch-1 inputs fed below.)
    h = model.init_hidden(1)
    result = list(prefix)
    with torch.no_grad():
        # Warm up the state on all but the last prefix char; the generation
        # loop below consumes result[-1], so feeding the full prefix here
        # would process the last char twice.
        for char in prefix[:-1]:
            x = np.zeros((1, 1, dataset.vocab_size), dtype=np.float32)
            x[0, 0, dataset.char_to_idx[char]] = 1.0
            out, h = model(torch.from_numpy(x), h)
        for _ in range(num_preds):
            x = np.zeros((1, 1, dataset.vocab_size), dtype=np.float32)
            x[0, 0, dataset.char_to_idx[result[-1]]] = 1.0
            out, h = model(torch.from_numpy(x), h)
            prob = F.softmax(out[0, 0], dim=0).numpy().astype(np.float64)
            # Renormalize: float32 round-off can make np.random.choice reject
            # a p that does not sum to exactly 1.
            prob /= prob.sum()
            char_idx = np.random.choice(dataset.vocab_size, p=prob)
            result.append(dataset.idx_to_char[char_idx])
    return ''.join(result)

# Single-shot prediction: sample num_preds chars after the prefix at once.
prefix = 'ni'
num_preds = 10
print(predict(model, prefix, num_preds))

# K-step prediction: slide a fixed-size window one character at a time.
prefix = 'ni'
num_preds = 10
for i in range(num_preds):
    step = predict(model, prefix, 1)  # returns prefix plus one sampled char
    print(step)
    # Slide the window: drop the oldest char, keep the newly sampled one.
    # (Previously this read `result[-1]`, a local of predict() — NameError.)
    prefix = step[1:]
小结:1. 拼音数据(无声调):下载链接 https://www.jianguoyun.com/p/DQ3els0Q-rqYBhi3pIgFIAA 。 2. 定义数据集:采用字符模型,因此一个字符为一个样本,每个样本采用 one-hot 编码。 3. 样本是时间相关的,分别实现序列的随机采样和序列的顺序划分。 4. 标签Y与X同形状,但时间超前1。 5. 准备数据:一次梯度更新使用的数据形状为:(时间步,Batch,类别数)。 6. 实现基本循环神经网络模型。

原文地址: https://www.cveoy.top/t/topic/fFeE 著作权归作者所有。请勿转载和采集!

免费AI点我,无需注册和登录