- Microsoft Cognitive Toolkit (CNTK) 教程
- 首页
- 介绍
- 入门
- CPU 和 GPU
- CNTK - 序列分类
- CNTK - 逻辑回归模型
- CNTK - 神经网络 (NN) 概念
- CNTK - 创建第一个神经网络
- CNTK - 训练神经网络
- CNTK - 内存中和大型数据集
- CNTK - 性能测量
- 神经网络分类
- 神经网络二元分类
- CNTK - 神经网络回归
- CNTK - 分类模型
- CNTK - 回归模型
- CNTK - 内存不足的数据集
- CNTK - 监控模型
- CNTK - 卷积神经网络
- CNTK - 循环神经网络
- Microsoft Cognitive Toolkit 资源
- Microsoft Cognitive Toolkit - 快速指南
- Microsoft Cognitive Toolkit - 资源
- Microsoft Cognitive Toolkit - 讨论
CNTK - 内存不足的数据集
本章将解释如何测量内存不足数据集的性能。
在之前的章节中,我们讨论了验证 NN 性能的各种方法,但是我们讨论的方法都是处理适合内存的数据集。
这里出现了一个问题,即内存不足的数据集怎么办?因为在生产环境中,我们需要大量数据来训练NN。在本节中,我们将讨论如何在使用小批量数据源和手动小批量循环时测量性能。
小批量数据源
在处理内存不足的数据集(即小批量数据源)时,我们需要为损失和度量设置与处理小型数据集(即内存中数据集)时使用的设置略微不同的设置。首先,我们将了解如何设置将数据馈送到 NN 模型训练器的方法。
以下是实现步骤:
步骤 1 - 首先,从cntk.io模块导入用于创建小批量数据源的组件,如下所示:
from cntk.io import StreamDef, StreamDefs, MinibatchSource, CTFDeserializer, INFINITY_REPEAT
步骤 2 - 接下来,创建一个名为create_datasource的新函数。此函数将有两个参数,即文件名和限制,默认值为INFINITELY_REPEAT。
def create_datasource(filename, limit =INFINITELY_REPEAT)
步骤 3 - 现在,在函数中,使用StreamDef类创建一个用于读取具有三个特征的标签字段的标签流定义。我们还需要将is_sparse设置为False,如下所示:
labels_stream = StreamDef(field=’labels’, shape=3, is_sparse=False)
步骤 4 - 接下来,创建一个从输入文件读取特征字段的StreamDef的另一个实例,如下所示。
feature_stream = StreamDef(field=’features’, shape=4, is_sparse=False)
步骤 5 - 现在,初始化CTFDeserializer实例类。指定我们需要反序列化的文件名和流,如下所示:
deserializer = CTFDeserializer(filename, StreamDefs(labels= label_stream, features=features_stream)
步骤 6 - 接下来,我们需要使用反序列化器创建minisourceBatch的实例,如下所示:
Minibatch_source = MinibatchSource(deserializer, randomize=True, max_sweeps=limit) return minibatch_source
步骤 7 - 最后,我们需要提供训练和测试源,我们在之前的章节中也创建了这些源。我们使用的是鸢尾花数据集。
training_source = create_datasource(‘Iris_train.ctf’) test_source = create_datasource(‘Iris_test.ctf’, limit=1)
创建MinibatchSource实例后,我们需要对其进行训练。我们可以使用与处理小型内存中数据集时相同的训练逻辑。在这里,我们将使用MinibatchSource实例作为损失函数的train方法的输入,如下所示:
以下是实现步骤:
步骤 1 - 为了记录训练会话的输出,首先从cntk.logging模块导入ProgressPrinter,如下所示:
from cntk.logging import ProgressPrinter
步骤 2 - 接下来,要设置训练会话,请从cntk.train模块导入trainer和training_session,如下所示:
from cntk.train import Trainer, training_session
步骤 3 - 现在,我们需要定义一些常量,例如minibatch_size、samples_per_epoch和num_epochs,如下所示:
minbatch_size = 16 samples_per_epoch = 150 num_epochs = 30 max_samples = samples_per_epoch * num_epochs
步骤 4 - 接下来,为了了解如何在 CNTK 中在训练期间读取数据,我们需要定义网络输入变量和小批量数据源中的流之间的映射。
input_map = { features: training_source.streams.features, labels: training_source.streams.labels }
步骤 5 - 接下来,要记录训练过程的输出,请使用新的ProgressPrinter实例初始化progress_printer变量。此外,初始化trainer并为其提供模型,如下所示:
progress_writer = ProgressPrinter(0) trainer: training_source.streams.labels
步骤 6 - 最后,要启动训练过程,我们需要调用training_session函数,如下所示:
session = training_session(trainer, mb_source=training_source, mb_size=minibatch_size, model_inputs_to_streams=input_map, max_samples=max_samples, test_config=test_config) session.train()
训练模型后,我们可以通过使用TestConfig对象并将其分配给train_session函数的test_config关键字参数来为此设置添加验证。
以下是实现步骤:
步骤 1 - 首先,我们需要从cntk.train模块导入TestConfig类,如下所示:
from cntk.train import TestConfig
步骤 2 - 现在,我们需要使用test_source作为输入创建一个新的TestConfig实例:
Test_config = TestConfig(test_source)
完整示例
from cntk.io import StreamDef, StreamDefs, MinibatchSource, CTFDeserializer, INFINITY_REPEAT def create_datasource(filename, limit =INFINITELY_REPEAT) labels_stream = StreamDef(field=’labels’, shape=3, is_sparse=False) feature_stream = StreamDef(field=’features’, shape=4, is_sparse=False) deserializer = CTFDeserializer(filename, StreamDefs(labels=label_stream, features=features_stream) Minibatch_source = MinibatchSource(deserializer, randomize=True, max_sweeps=limit) return minibatch_source training_source = create_datasource(‘Iris_train.ctf’) test_source = create_datasource(‘Iris_test.ctf’, limit=1) from cntk.logging import ProgressPrinter from cntk.train import Trainer, training_session minbatch_size = 16 samples_per_epoch = 150 num_epochs = 30 max_samples = samples_per_epoch * num_epochs input_map = { features: training_source.streams.features, labels: training_source.streams.labels } progress_writer = ProgressPrinter(0) trainer: training_source.streams.labels session = training_session(trainer, mb_source=training_source, mb_size=minibatch_size, model_inputs_to_streams=input_map, max_samples=max_samples, test_config=test_config) session.train() from cntk.train import TestConfig Test_config = TestConfig(test_source)
输出
------------------------------------------------------------------- average since average since examples loss last metric last ------------------------------------------------------ Learning rate per minibatch: 0.1 1.57 1.57 0.214 0.214 16 1.38 1.28 0.264 0.289 48 [………] Finished Evaluation [1]: Minibatch[1-1]:metric = 69.65*30;
手动小批量循环
正如我们上面看到的,通过在使用 CNTK 中的常规 API 进行训练时使用度量,可以轻松测量 NN 模型在训练期间和训练后的性能。但是,另一方面,在使用手动小批量循环时,情况并非如此简单。
在这里,我们使用下面给出的模型,该模型具有 4 个输入和 3 个输出,来自我们在前面章节中创建的鸢尾花数据集:
from cntk import default_options, input_variable from cntk.layers import Dense, Sequential from cntk.ops import log_softmax, relu, sigmoid from cntk.learners import sgd model = Sequential([ Dense(4, activation=sigmoid), Dense(3, activation=log_softmax) ]) features = input_variable(4) labels = input_variable(3) z = model(features)
接下来,模型的损失被定义为交叉熵损失函数和 F 度量的组合,如前面章节中所用。我们将使用criterion_factory实用程序将其创建为 CNTK 函数对象,如下所示:
import cntk from cntk.losses import cross_entropy_with_softmax, fmeasure @cntk.Function def criterion_factory(outputs, targets): loss = cross_entropy_with_softmax(outputs, targets) metric = fmeasure(outputs, targets, beta=1) return loss, metric loss = criterion_factory(z, labels) learner = sgd(z.parameters, 0.1) label_mapping = { 'Iris-setosa': 0, 'Iris-versicolor': 1, 'Iris-virginica': 2 }
现在,我们已经定义了损失函数,我们将了解如何在训练器中使用它来设置手动训练会话。
以下是实现步骤:
步骤 1 - 首先,我们需要导入必要的包,例如numpy和pandas来加载和预处理数据。
import pandas as pd import numpy as np
步骤 2 - 接下来,为了在训练期间记录信息,请导入ProgressPrinter类,如下所示:
from cntk.logging import ProgressPrinter
步骤 3 - 然后,我们需要从cntk.train模块导入trainer模块,如下所示:
from cntk.train import Trainer
步骤 4 - 接下来,创建一个新的ProgressPrinter实例,如下所示:
progress_writer = ProgressPrinter(0)
步骤 5 - 现在,我们需要使用参数损失、学习器和progress_writer初始化训练器,如下所示:
trainer = Trainer(z, loss, learner, progress_writer)
步骤 6 - 接下来,为了训练模型,我们将创建一个循环,该循环将迭代数据集三十次。这将是外部训练循环。
for _ in range(0,30):
步骤 7 - 现在,我们需要使用 pandas 从磁盘加载数据。然后,为了以小批量加载数据集,将chunksize关键字参数设置为 16。
input_data = pd.read_csv('iris.csv', names=['sepal_length', 'sepal_width','petal_length','petal_width', 'species'], index_col=False, chunksize=16)
步骤 8 - 现在,创建一个内部训练 for 循环来迭代每个小批量。
for df_batch in input_data:
步骤 9 - 现在在这个循环内部,使用iloc索引器读取前四列作为要从中进行训练的特征,并将它们转换为 float32:
feature_values = df_batch.iloc[:,:4].values feature_values = feature_values.astype(np.float32)
步骤 10 - 现在,读取最后一列作为要从中进行训练的标签,如下所示:
label_values = df_batch.iloc[:,-1]
步骤 11 - 接下来,我们将使用独热向量将标签字符串转换为它们的数字表示,如下所示:
label_values = label_values.map(lambda x: label_mapping[x])
步骤 12 - 此后,获取标签的数字表示。接下来,将它们转换为 numpy 数组,这样更容易使用它们,如下所示:
label_values = label_values.values
步骤 13 - 现在,我们需要创建一个新的 numpy 数组,该数组的行数与我们转换的标签值相同。
encoded_labels = np.zeros((label_values.shape[0], 3))
步骤 14 - 现在,为了创建独热编码的标签,请根据数字标签值选择列。
encoded_labels[np.arange(label_values.shape[0]), label_values] = 1.
步骤 15 - 最后,我们需要在训练器上调用train_minibatch方法,并为小批量提供已处理的特征和标签。
trainer.train_minibatch({features: feature_values, labels: encoded_labels})
完整示例
from cntk import default_options, input_variable from cntk.layers import Dense, Sequential from cntk.ops import log_softmax, relu, sigmoid from cntk.learners import sgd model = Sequential([ Dense(4, activation=sigmoid), Dense(3, activation=log_softmax) ]) features = input_variable(4) labels = input_variable(3) z = model(features) import cntk from cntk.losses import cross_entropy_with_softmax, fmeasure @cntk.Function def criterion_factory(outputs, targets): loss = cross_entropy_with_softmax(outputs, targets) metric = fmeasure(outputs, targets, beta=1) return loss, metric loss = criterion_factory(z, labels) learner = sgd(z.parameters, 0.1) label_mapping = { 'Iris-setosa': 0, 'Iris-versicolor': 1, 'Iris-virginica': 2 } import pandas as pd import numpy as np from cntk.logging import ProgressPrinter from cntk.train import Trainer progress_writer = ProgressPrinter(0) trainer = Trainer(z, loss, learner, progress_writer) for _ in range(0,30): input_data = pd.read_csv('iris.csv', names=['sepal_length', 'sepal_width','petal_length','petal_width', 'species'], index_col=False, chunksize=16) for df_batch in input_data: feature_values = df_batch.iloc[:,:4].values feature_values = feature_values.astype(np.float32) label_values = df_batch.iloc[:,-1] label_values = label_values.map(lambda x: label_mapping[x]) label_values = label_values.values encoded_labels = np.zeros((label_values.shape[0], 3)) encoded_labels[np.arange(label_values.shape[0]), label_values] = 1. trainer.train_minibatch({features: feature_values, labels: encoded_labels})
输出
------------------------------------------------------------------- average since average since examples loss last metric last ------------------------------------------------------ Learning rate per minibatch: 0.1 1.45 1.45 -0.189 -0.189 16 1.24 1.13 -0.0382 0.0371 48 [………]
在上面的输出中,我们在训练期间获得了损失和度量的输出。这是因为我们将度量和损失组合在一个函数对象中,并在训练器配置中使用了进度打印器。
现在,为了评估模型性能,我们需要执行与训练模型相同的任务,但是这次我们需要使用Evaluator实例来测试模型。这在下面的 Python 代码中显示:
from cntk import Evaluator evaluator = Evaluator(loss.outputs[1], [progress_writer]) input_data = pd.read_csv('iris.csv', names=['sepal_length', 'sepal_width','petal_length','petal_width', 'species'], index_col=False, chunksize=16) for df_batch in input_data: feature_values = df_batch.iloc[:,:4].values feature_values = feature_values.astype(np.float32) label_values = df_batch.iloc[:,-1] label_values = label_values.map(lambda x: label_mapping[x]) label_values = label_values.values encoded_labels = np.zeros((label_values.shape[0], 3)) encoded_labels[np.arange(label_values.shape[0]), label_values] = 1. evaluator.test_minibatch({ features: feature_values, labels: encoded_labels}) evaluator.summarize_test_progress()
现在,我们将得到类似于以下内容的输出:
输出
Finished Evaluation [1]: Minibatch[1-11]:metric = 74.62*143;