CNTK - 内存数据集和大型数据集



在本章中,我们将学习如何在 CNTK 中处理内存数据集和大型数据集。

使用小型内存数据集进行训练

当我们谈论将数据馈送到 CNTK 训练器时,可能有许多方法,但这将取决于数据集的大小和数据的格式。数据集可以是小型内存数据集或大型数据集。

在本节中,我们将使用内存数据集。为此,我们将使用以下两个框架:

  • Numpy
  • Pandas

使用 Numpy 数组

在这里,我们将使用基于 numpy 的随机生成数据集在 CNTK 中进行操作。在这个例子中,我们将模拟二元分类问题的数据。假设我们有一组具有 4 个特征的观察值,并希望使用我们的深度学习模型预测两个可能的标签。

实现示例

为此,首先我们必须生成一组包含标签的独热向量表示的标签,我们希望预测。它可以通过以下步骤完成:

步骤 1 - 导入 numpy 包,如下所示:

import numpy as np
num_samples = 20000

步骤 2 - 接下来,使用 np.eye 函数生成标签映射,如下所示:

label_mapping = np.eye(2)

步骤 3 - 现在,使用 np.random.choice 函数,收集 20000 个随机样本,如下所示:

y = label_mapping[np.random.choice(2,num_samples)].astype(np.float32)

步骤 4 - 现在最后,使用 np.random.random 函数,生成一个随机浮点值的数组,如下所示:

x = np.random.random(size=(num_samples, 4)).astype(np.float32)

一旦我们生成一个随机浮点值的数组,我们需要将它们转换为 32 位浮点数,以便它可以与 CNTK 预期的格式匹配。让我们按照以下步骤执行此操作:

步骤 5 - 从 cntk.layers 模块导入 Dense 和 Sequential 层函数,如下所示:

from cntk.layers import Dense, Sequential

步骤 6 - 现在,我们需要为网络中的层导入激活函数。让我们导入 sigmoid 作为激活函数

from cntk import input_variable, default_options
from cntk.ops import sigmoid

步骤 7 - 现在,我们需要导入损失函数来训练网络。让我们导入 binary_cross_entropy 作为损失函数

from cntk.losses import binary_cross_entropy

步骤 8 - 接下来,我们需要为网络定义默认选项。在这里,我们将提供 sigmoid 激活函数作为默认设置。此外,使用 Sequential 层函数创建模型,如下所示:

with default_options(activation=sigmoid):
model = Sequential([Dense(6),Dense(2)])

步骤 9 - 接下来,使用 4 个输入特征初始化一个 input_variable,作为网络的输入。

features = input_variable(4)

步骤 10 - 现在,为了完成它,我们需要将特征变量连接到 NN。

z = model(features)

因此,现在我们有一个 NN,借助以下步骤,让我们使用内存数据集对其进行训练:

步骤 11 - 要训练此 NN,首先我们需要从 cntk.learners 模块导入学习器。我们将导入 sgd 学习器,如下所示:

from cntk.learners import sgd

步骤 12 - 同时,从 cntk.logging 模块导入 ProgressPrinter

from cntk.logging import ProgressPrinter
progress_writer = ProgressPrinter(0)

步骤 13 - 接下来,为标签定义一个新的输入变量,如下所示:

labels = input_variable(2)

步骤 14 - 为了训练 NN 模型,接下来,我们需要使用 binary_cross_entropy 函数定义一个损失。同时提供模型 z 和标签变量。

loss = binary_cross_entropy(z, labels)

步骤 15 - 接下来,初始化 sgd 学习器,如下所示:

learner = sgd(z.parameters, lr=0.1)

步骤 16 - 最后,在损失函数上调用 train 方法。同时提供输入数据、sgd 学习器和 progress_printer

training_summary=loss.train((x,y),parameter_learners=[learner],callbacks=[progress_writer])

完整的实现示例

import numpy as np
num_samples = 20000
label_mapping = np.eye(2)
y = label_mapping[np.random.choice(2,num_samples)].astype(np.float32)
x = np.random.random(size=(num_samples, 4)).astype(np.float32)
from cntk.layers import Dense, Sequential
from cntk import input_variable, default_options
from cntk.ops import sigmoid
from cntk.losses import binary_cross_entropy
with default_options(activation=sigmoid):
   model = Sequential([Dense(6),Dense(2)])
features = input_variable(4)
z = model(features)
from cntk.learners import sgd
from cntk.logging import ProgressPrinter
progress_writer = ProgressPrinter(0)
labels = input_variable(2)
loss = binary_cross_entropy(z, labels)
learner = sgd(z.parameters, lr=0.1)
training_summary=loss.train((x,y),parameter_learners=[learner],callbacks=[progress_writer])

输出

Build info:
     Built time: *** ** **** 21:40:10
     Last modified date: *** *** ** 21:08:46 2019
     Build type: Release
     Build target: CPU-only
     With ASGD: yes
     Math lib: mkl
     Build Branch: HEAD
     Build SHA1:ae9c9c7c5f9e6072cc9c94c254f816dbdc1c5be6 (modified)
     MPI distribution: Microsoft MPI
     MPI version: 7.0.12437.6
-------------------------------------------------------------------
average   since   average   since examples
loss      last    metric    last
------------------------------------------------------
Learning rate per minibatch: 0.1
1.52      1.52      0         0     32
1.51      1.51      0         0     96
1.48      1.46      0         0    224
1.45      1.42      0         0    480
1.42       1.4      0         0    992
1.41      1.39      0         0   2016
1.4       1.39      0         0   4064
1.39      1.39      0         0   8160
1.39      1.39      0         0  16352

使用 Pandas DataFrames

Numpy 数组在其可以包含的内容方面非常有限,并且是存储数据的最基本方法之一。例如,单个 n 维数组可以包含单个数据类型的数据。但另一方面,对于许多现实世界的案例,我们需要一个可以处理单个数据集中多个数据类型的库。

名为 Pandas 的 Python 库之一使使用此类数据集变得更加容易。它引入了 DataFrame (DF) 的概念,并允许我们从以各种格式存储在磁盘上的数据集加载 DF。例如,我们可以读取存储为 CSV、JSON、Excel 等的 DF。

您可以在 https://tutorialspoint.com/python_pandas/index.htm. 中更详细地了解 Python Pandas 库。

实现示例

在这个例子中,我们将使用根据四个属性对三种可能的鸢尾花物种进行分类的例子。我们之前也在各节中创建了这个深度学习模型。模型如下:

from cntk.layers import Dense, Sequential
from cntk import input_variable, default_options
from cntk.ops import sigmoid, log_softmax
from cntk.losses import binary_cross_entropy
model = Sequential([
Dense(4, activation=sigmoid),
Dense(3, activation=log_softmax)
])
features = input_variable(4)
z = model(features)

上述模型包含一个隐藏层和一个具有三个神经元的输出层,以匹配我们可以预测的类别数。

接下来,我们将使用 train 方法和 loss 函数来训练网络。为此,首先我们必须加载并预处理鸢尾花数据集,以便它与 NN 的预期布局和数据格式匹配。它可以通过以下步骤完成:

步骤 1 - 导入 numpyPandas 包,如下所示:

import numpy as np
import pandas as pd

步骤 2 - 接下来,使用 read_csv 函数将数据集加载到内存中

df_source = pd.read_csv(‘iris.csv’, names = [‘sepal_length’, ‘sepal_width’,
 ‘petal_length’, ‘petal_width’, ‘species’], index_col=False)

步骤 3 - 现在,我们需要创建一个字典,它将数据集中的标签与其对应的数字表示进行映射。

label_mapping = {‘Iris-Setosa’ : 0, ‘Iris-Versicolor’ : 1, ‘Iris-Virginica’ : 2}

步骤 4 - 现在,通过在 DataFrame 上使用 iloc 索引器,选择前四列,如下所示:

x = df_source.iloc[:, :4].values

步骤 5 -接下来,我们需要选择 species 列作为数据集的标签。它可以如下完成:

y = df_source[‘species’].values

步骤 6 - 现在,我们需要映射数据集中存在的标签,这可以通过使用 label_mapping 来完成。此外,使用 one_hot 编码将它们转换为独热编码数组。

y = np.array([one_hot(label_mapping[v], 3) for v in y])

步骤 7 - 接下来,要将特征和映射的标签与 CNTK 一起使用,我们需要将它们都转换为浮点数

x= x.astype(np.float32)
y= y.astype(np.float32)

众所周知,标签以字符串形式存储在数据集中,而 CNTK 无法处理这些字符串。这就是为什么它需要表示标签的独热编码向量。为此,我们可以定义一个函数,例如 one_hot,如下所示:

def one_hot(index, length):
result = np.zeros(length)
result[index] = index
return result

现在,我们有了正确格式的 numpy 数组,借助以下步骤,我们可以使用它们来训练我们的模型:

步骤 8 - 首先,我们需要导入损失函数来训练网络。让我们导入 binary_cross_entropy_with_softmax 作为损失函数

from cntk.losses import binary_cross_entropy_with_softmax

步骤 9 - 要训练此 NN,我们还需要从 cntk.learners 模块导入学习器。我们将导入 sgd 学习器,如下所示:

from cntk.learners import sgd

步骤 10 - 同时,从 cntk.logging 模块导入 ProgressPrinter

from cntk.logging import ProgressPrinter
progress_writer = ProgressPrinter(0)

步骤 11 - 接下来,为标签定义一个新的输入变量,如下所示:

labels = input_variable(3)

步骤 12 - 为了训练 NN 模型,接下来,我们需要使用 binary_cross_entropy_with_softmax 函数定义一个损失。同时提供模型 z 和标签变量。

loss = binary_cross_entropy_with_softmax (z, labels)

步骤 13 - 接下来,初始化 sgd 学习器,如下所示:

learner = sgd(z.parameters, 0.1)

步骤 14 - 最后,在损失函数上调用 train 方法。同时提供输入数据、sgd 学习器和 progress_printer

training_summary=loss.train((x,y),parameter_learners=[learner],callbacks=
[progress_writer],minibatch_size=16,max_epochs=5)

完整的实现示例

from cntk.layers import Dense, Sequential
from cntk import input_variable, default_options
from cntk.ops import sigmoid, log_softmax
from cntk.losses import binary_cross_entropy
model = Sequential([
Dense(4, activation=sigmoid),
Dense(3, activation=log_softmax)
])
features = input_variable(4)
z = model(features)
import numpy as np
import pandas as pd
df_source = pd.read_csv(‘iris.csv’, names = [‘sepal_length’, ‘sepal_width’, ‘petal_length’, ‘petal_width’, ‘species’], index_col=False)
label_mapping = {‘Iris-Setosa’ : 0, ‘Iris-Versicolor’ : 1, ‘Iris-Virginica’ : 2}
x = df_source.iloc[:, :4].values
y = df_source[‘species’].values
y = np.array([one_hot(label_mapping[v], 3) for v in y])
x= x.astype(np.float32)
y= y.astype(np.float32)
def one_hot(index, length):
result = np.zeros(length)
result[index] = index
return result
from cntk.losses import binary_cross_entropy_with_softmax
from cntk.learners import sgd
from cntk.logging import ProgressPrinter
progress_writer = ProgressPrinter(0)
labels = input_variable(3)
loss = binary_cross_entropy_with_softmax (z, labels)
learner = sgd(z.parameters, 0.1)
training_summary=loss.train((x,y),parameter_learners=[learner],callbacks=[progress_writer],minibatch_size=16,max_epochs=5)

输出

Build info:
     Built time: *** ** **** 21:40:10
     Last modified date: *** *** ** 21:08:46 2019
     Build type: Release
     Build target: CPU-only
     With ASGD: yes
     Math lib: mkl
     Build Branch: HEAD
     Build SHA1:ae9c9c7c5f9e6072cc9c94c254f816dbdc1c5be6 (modified)
     MPI distribution: Microsoft MPI
     MPI version: 7.0.12437.6
-------------------------------------------------------------------
average    since    average   since   examples
loss        last     metric   last
------------------------------------------------------
Learning rate per minibatch: 0.1
1.1         1.1        0       0      16
0.835     0.704        0       0      32
1.993      1.11        0       0      48
1.14       1.14        0       0     112
[………]

使用大型数据集进行训练

在上一节中,我们使用 Numpy 和 pandas 处理了小型内存数据集,但并非所有数据集都如此小。特别是包含图像、视频、声音样本的数据集很大。MinibatchSource 是一个组件,它可以分块加载数据,由 CNTK 提供,用于处理此类大型数据集。MinibatchSource 组件的一些特性如下:

  • MinibatchSource 可以通过自动随机化从数据源读取的样本,防止 NN 过拟合。

  • 它具有内置的转换管道,可用于增强数据。

  • 它在与训练过程分开的后台线程上加载数据。

在以下各节中,我们将探讨如何将小批量源与内存不足的数据一起使用来处理大型数据集。我们还将探讨如何使用它来馈送训练 NN。

创建 MinibatchSource 实例

在上一节中,我们使用了鸢尾花示例,并使用 Pandas DataFrames 处理了小型内存数据集。在这里,我们将用 MinibatchSource 替换使用来自 pandas DF 的数据的代码。首先,我们需要借助以下步骤创建一个 MinibatchSource 实例:

实现示例

步骤 1 - 首先,从 cntk.io 模块导入小批量源的组件,如下所示:

from cntk.io import StreamDef, StreamDefs, MinibatchSource, CTFDeserializer,
 INFINITY_REPEAT

步骤 2 - 现在,使用 StreamDef 类,为标签创建一个流定义。

labels_stream = StreamDef(field=’labels’, shape=3, is_sparse=False)

步骤 3 - 接下来,创建读取输入文件中的特征字段,创建另一个 StreamDef 实例,如下所示。

feature_stream = StreamDef(field=’features’, shape=4, is_sparse=False)

步骤 4 - 现在,我们需要提供 iris.ctf 文件作为输入并初始化 deserializer,如下所示:

deserializer = CTFDeserializer(‘iris.ctf’, StreamDefs(labels=
label_stream, features=features_stream)

步骤 5 - 最后,我们需要使用 deserializer 创建 minisourceBatch 实例,如下所示:

Minibatch_source = MinibatchSource(deserializer, randomize=True)

创建 MinibatchSource 实例 - 完整的实现示例

from cntk.io import StreamDef, StreamDefs, MinibatchSource, CTFDeserializer, INFINITY_REPEAT
labels_stream = StreamDef(field=’labels’, shape=3, is_sparse=False)
feature_stream = StreamDef(field=’features’, shape=4, is_sparse=False)
deserializer = CTFDeserializer(‘iris.ctf’, StreamDefs(labels=label_stream, features=features_stream)
Minibatch_source = MinibatchSource(deserializer, randomize=True)

创建 MCTF 文件

如您所见,我们正在从“iris.ctf”文件获取数据。它具有名为 CNTK 文本格式 (CTF) 的文件格式。必须创建一个 CTF 文件才能获取我们上面创建的 MinibatchSource 实例的数据。让我们看看如何创建 CTF 文件。

实现示例

步骤 1 - 首先,我们需要导入 pandas 和 numpy 包,如下所示:

import pandas as pd
import numpy as np

步骤 2 - 接下来,我们需要将我们的数据文件,即 iris.csv 加载到内存中。然后,将其存储在 df_source 变量中。

df_source = pd.read_csv(‘iris.csv’, names = [‘sepal_length’, ‘sepal_width’, ‘petal_length’, ‘petal_width’, ‘species’], index_col=False)

步骤 3 - 现在,使用 iloc 索引器作为特征,获取前四列的内容。此外,使用 species 列中的数据,如下所示:

features = df_source.iloc[: , :4].values
labels = df_source[‘species’].values

步骤 4 - 接下来,我们需要在标签名称与其数字表示之间创建映射。它可以通过创建 label_mapping 来完成,如下所示:

label_mapping = {‘Iris-Setosa’ : 0, ‘Iris-Versicolor’ : 1, ‘Iris-Virginica’ : 2}

步骤 5 - 现在,将标签转换为一组独热编码向量,如下所示:

labels = [one_hot(label_mapping[v], 3) for v in labels]

现在,就像我们之前做的那样,创建一个名为 one_hot 的实用函数来编码标签。它可以如下完成:

def one_hot(index, length):
result = np.zeros(length)
result[index] = 1
return result

由于我们已经加载并预处理了数据,现在是时候将其存储到磁盘上的CTF文件格式中了。我们可以借助以下Python代码来实现:

With open(‘iris.ctf’, ‘w’) as output_file:
for index in range(0, feature.shape[0]):
feature_values = ‘ ‘.join([str(x) for x in np.nditer(features[index])])
label_values = ‘ ‘.join([str(x) for x in np.nditer(labels[index])])
output_file.write(‘features {} | labels {} \n’.format(feature_values, label_values))

创建MCTF文件 - 完整实现示例

import pandas as pd
import numpy as np
df_source = pd.read_csv(‘iris.csv’, names = [‘sepal_length’, ‘sepal_width’, ‘petal_length’, ‘petal_width’, ‘species’], index_col=False)
features = df_source.iloc[: , :4].values
labels = df_source[‘species’].values
label_mapping = {‘Iris-Setosa’ : 0, ‘Iris-Versicolor’ : 1, ‘Iris-Virginica’ : 2}
labels = [one_hot(label_mapping[v], 3) for v in labels]
def one_hot(index, length):
result = np.zeros(length)
result[index] = 1
return result
With open(‘iris.ctf’, ‘w’) as output_file:
for index in range(0, feature.shape[0]):
feature_values = ‘ ‘.join([str(x) for x in np.nditer(features[index])])
label_values = ‘ ‘.join([str(x) for x in np.nditer(labels[index])])
output_file.write(‘features {} | labels {} \n’.format(feature_values, label_values))

数据馈送

一旦你创建了MinibatchSource实例,我们就需要对其进行训练。我们可以使用与处理小型内存数据集时相同的训练逻辑。在这里,我们将使用MinibatchSource实例作为损失函数训练方法的输入,如下所示:

实现示例

步骤1 - 为了记录训练会话的输出,首先从cntk.logging模块导入ProgressPrinter,如下所示:

from cntk.logging import ProgressPrinter

步骤2 - 接下来,要设置训练会话,从cntk.train模块导入trainertraining_session,如下所示:

from cntk.train import Trainer, 

步骤3 - 现在,我们需要定义一些常量集,例如minibatch_sizesamples_per_epochnum_epochs,如下所示:

minbatch_size = 16
samples_per_epoch = 150
num_epochs = 30

步骤4 - 接下来,为了让CNTK知道如何在训练期间读取数据,我们需要定义网络输入变量和minibatch源中数据流之间的映射关系。

input_map = {
     features: minibatch.source.streams.features,
     labels: minibatch.source.streams.features
}

步骤5 - 接下来,为了记录训练过程的输出,使用新的ProgressPrinter实例初始化progress_printer变量,如下所示:

progress_writer = ProgressPrinter(0)

步骤6 - 最后,我们需要在损失函数上调用训练方法,如下所示:

train_history = loss.train(minibatch_source,
parameter_learners=[learner],
  model_inputs_to_streams=input_map,
callbacks=[progress_writer],
epoch_size=samples_per_epoch,
max_epochs=num_epochs)

数据馈送 - 完整实现示例

from cntk.logging import ProgressPrinter
from cntk.train import Trainer, training_session
minbatch_size = 16
samples_per_epoch = 150
num_epochs = 30
input_map = {
   features: minibatch.source.streams.features,
   labels: minibatch.source.streams.features
}
progress_writer = ProgressPrinter(0)
train_history = loss.train(minibatch_source,
parameter_learners=[learner],
model_inputs_to_streams=input_map,
callbacks=[progress_writer],
epoch_size=samples_per_epoch,
max_epochs=num_epochs)

输出

-------------------------------------------------------------------
average   since   average   since  examples
loss      last     metric   last
------------------------------------------------------
Learning rate per minibatch: 0.1
1.21      1.21      0        0       32
1.15      0.12      0        0       96
[………]
广告