Осмысление больших данных

Преодоление узких мест при предварительной обработке данных с помощью TensorFlow Data Service, NVIDIA DALI и других методов

Максимальное использование ресурсов обучения, ускорение обучения, экономия денег

В предыдущем посте я говорил о важности профилирования производительности ваших тренировочных сессий DNN как средства максимально эффективного использования ваших обучающих ресурсов, ускорения обучения и экономии денег. Я описал типичный конвейер обучения (см. Диаграмму ниже), рассмотрел некоторые из потенциальных узких мест производительности и рассмотрел некоторые инструменты, доступные для выявления таких узких мест. В этом посте я хотел бы подробнее остановиться на одном из наиболее распространенных узких мест производительности, узком месте ЦП и некоторых способах его преодоления. В частности, мы обсудим узкие места, возникающие в конвейере предварительной обработки данных, и способы их преодоления.

В контексте этого сообщения мы будем предполагать, что мы используем TensorFlow, в частности TensorFlow 2.4, для обучения модели обработки изображений на GPU, но контент, как правило, не менее актуален для других платформ обучения, других типов моделей и других ускорителей обучения.

Узкое место предварительной обработки данных

Узкое место ЦП возникает, когда ресурс графического процессора не используется в результате того, что один или несколько ЦП достигли максимального использования. В этой ситуации графический процессор будет частично бездействовать, пока он ожидает, пока процессор передаст данные обучения. Это нежелательное состояние. Поскольку графический процессор, как правило, является самым дорогим ресурсом в системе, вашей целью всегда должно быть максимальное его использование. Не вдаваясь в слишком много технических деталей, узкое место ЦП обычно возникает, когда соотношение между «объемом» предварительной обработки данных, которое выполняется на ЦП, и «объемом» вычислений, выполняемых моделью на графическом процессоре, составляет больше, чем соотношение между общей вычислительной мощностью ЦП и общей вычислительной мощностью графического процессора. Например, если и ядра вашего ЦП, и графический процессор используются максимально, а затем вы переходите на более мощный графический процессор или переходите на систему с меньшим количеством ядер ЦП, производительность вашего обучения во время выполнения будет ограничена ЦП.

Естественно, первым делом вам нужно будет просто переключиться на машину с более подходящим соотношением вычислительных ресурсов ЦП и ГП. Но, к сожалению, у большинства из нас нет такой свободы. И хотя облачные сервисы, такие как Amazon SageMaker, предлагают множество типов обучающих инстансов с разными соотношениями вычислений CPU и GPU, вы можете обнаружить, что ни один из них не полностью соответствует вашим конкретным потребностям.

Предполагая, что вы застряли в имеющейся у вас системе, какие шаги вы можете предпринять, чтобы устранить узкое место в производительности и ускорить обучение?

В следующих разделах мы предложим четыре шага для устранения узкого места предварительной обработки данных.

  1. Определите любые операции, которые можно перенести на этап подготовки данных
  2. Оптимизация конвейера предварительной обработки данных
  3. Выполните некоторые этапы предварительной обработки на графическом процессоре
  4. Используйте службу данных TensorFlow, чтобы передать часть вычислений ЦП другим машинам

Чтобы облегчить наше обсуждение, мы построим игрушечный пример на основе Resnet50.

Пример использования

В приведенном ниже блоке кода я построил модель, используя встроенное в приложение Resnet50 приложение TensorFlow. Я добавил относительно тяжелый конвейер предварительной обработки данных, который включает расширение, фильтрацию размытия и ряд уровней предварительной обработки TensorFlow. (См. Документацию о преимуществах использования таких слоев.)

import tensorflow as tf
import tensorflow_addons as tfa
from tensorflow.keras.applications.resnet50 import ResNet50
from tensorflow.keras.layers.experimental import preprocessing
def get_dataset(batch_size):
    # parse TFRecord
    def parse_image_function(example_proto):
        image_feature_description = 
           {'image': tf.io.FixedLenFeature([], tf.string),
            'label': tf.io.FixedLenFeature([], tf.int64)}
        features = tf.io.parse_single_example(
                      example_proto, image_feature_description)
        image = tf.io.decode_raw(features['image'], tf.uint8)
        image.set_shape([3 * 32 * 32])
        image = tf.reshape(image, [32, 32, 3])
        label = tf.cast(features['label'], tf.int32)
        return image, label
    # dilation filter
    def dilate(image, label):
        dilateFilter = tf.zeros([3, 3, 3], tf.uint8)
        image = tf.expand_dims(image, 0)
        image = tf.nn.dilation2d(
                    image, dilateFilter, strides=[1, 1, 1, 1],
                    dilations=[1, 1, 1, 1],
                    padding='SAME', 
                    data_format='NHWC')
        image = tf.squeeze(image)
        return image, label
    # blur filter
    def blur(image, label):
        image = tfa.image.gaussian_filter2d(image=image,
                            filter_shape=(11, 11), sigma=0.8)
        return image, label
    # rescale filter
    def rescale(image, label):
        image = preprocessing.Rescaling(1.0 / 255)(image)
        return image, label
    # augmentation filters
    def augment(image, label):
        data_augmentation = tf.keras.Sequential(
           [preprocessing.RandomFlip("horizontal"),
            preprocessing.RandomRotation(0.1),
            preprocessing.RandomZoom(0.1)])
        image = data_augmentation(image)
        return image, label
    autotune = tf.data.experimental.AUTOTUNE
    options = tf.data.Options()
    options.experimental_deterministic = False
    records = tf.data.Dataset.list_files('data/*', 
                            shuffle=True).with_options(options)
    # load from TFRecord files
    ds = tf.data.TFRecordDataset(records, 
                 num_parallel_reads=autotune).repeat()
    ds = ds.map(parse_image_function, num_parallel_calls=autotune)
    ds = ds.map(dilate, num_parallel_calls=autotune)
    ds = ds.map(blur, num_parallel_calls=autotune)
    ds = ds.batch(batch_size)
    ds = ds.map(rescale,num_parallel_calls=autotune)
    ds = ds.map(augment, num_parallel_calls=autotune)
    ds = ds.prefetch(autotune)
    return ds
if __name__ == "__main__":    
    model = ResNet50(weights=None,
                     input_shape=(32, 32, 3),
                     classes=10)
    model.compile(loss=tf.losses.SparseCategoricalCrossentropy(),
                  optimizer=tf.optimizers.Adam())
    dataset = get_dataset(batch_size = 1024)
    model.fit(dataset, steps_per_epoch=100, epochs=10))

Исходные данные хранятся в файлах TFRecord, которые я создал из набора данных CIFAR-10 (с помощью сценария this).

Я создал этот пример, чтобы искусственно создать узкое место в производительности. Я бы ни при каких обстоятельствах не рекомендовал использовать его для реальных тренировок.

Все тесты проводились на инстансе типа Amazon ec2 p2.xlarge с использованием Amazon Deep Learning AMI.

Выявление узкого места

Существует ряд различных инструментов и методов для оценки производительности учебного сеанса во время выполнения, а также для выявления и изучения узких мест входного конвейера. Рассмотрим лишь некоторые из них:

Системные метрики

Первое, что нужно проверить, - это использование ресурсов системы. Есть несколько способов сделать это. Команда Linux top показывает загрузку процессора. Чтобы увидеть, как загружается ядро ​​ЦП, введите 1 во время работы top. Для измерения загрузки графического процессора вы можете использовать nvidia-smi. При обучении в Amazon EC2 вы можете использовать Amazon CloudWatch для мониторинга системных метрик. Хотя метрики графического процессора не включены по умолчанию, вы можете добавить их с помощью утилиты gpumon. Ниже приведен пример графика использования ЦП и графического процессора, снятого в нескольких различных экспериментах.

В представленном выше варианте использования средняя загруженность графического процессора составляет менее 50% при длительных периодах простоя. В то же время ЦП загружен очень сильно, причем некоторые ядра достигают максимальной загрузки.

Профилировщики производительности

Чтобы перейти на следующий уровень детализации того, как проходит обучение, вы можете использовать профилировщик производительности.

Профилировщик TensorFlow: встроенный Профилировщик TensorFlow включает в себя множество средств аналитики производительности и, в частности, инструменты для анализа производительности конвейера ввода. Вы можете просматривать с помощью TensorBoard, установив плагин профиля TensorBoard. Один из способов включить профилировщик - это запрограммировать цикл обучения с помощью обратного вызова TensorBoad.

# program the callback to capture steps 20-24
cb = tf.keras.callbacks.TensorBoard(
        log_dir='/tmp/profile', profile_batch='20,24',
        histogram_freq=0, write_images=False)
model.fit(dataset, steps_per_epoch=100, epochs=10, callbacks=[cb])

Ниже приведена страница обзора профилирования для нашего примера использования, на котором явно видно узкое место при вводе данных.

Средство просмотра трассировки позволяет углубиться в детали выполнения конвейера и изучить поток данных между процессором и графическим процессором. В нашем примере вы можете четко видеть длительные периоды простоя графического процессора из-за узких мест при вводе данных.

Amazon SageMaker Debugger: если вы тренируетесь в среде Amazon SageMaker, вы можете воспользоваться функциями профилирования, встроенными в Amazon SageMaker Debugger. Вот пример того, как в Amazon SageMaker Studio появится серьезное узкое место во входном конвейере.

Профилировщики Linux. Профилировщики производительности Linux общего назначения также часто помогают при анализе узких мест при обучении. Например, используя Linux perf utility, мы можем увидеть, что наш ЦП тратит большую часть своего времени на внутреннюю функцию линейной алгебры:

Измерение пропускной способности

Поскольку целью нашего анализа является ускорение времени выполнения обучения, вполне естественно, что мы будем использовать эту метрику в качестве меры нашей производительности.

В нашем примере мы будем использовать среднее время выполнения одной (100 шагов) эпохи в качестве нашей основной метрики производительности и измерять, как различные изменения в модели влияют на это значение. Среднее время выполнения одной эпохи модели, приведенной выше, составляет 122 секунды.

Полезный метод (описанный здесь) для измерения того, какой была бы среда выполнения, если бы не узкое место ввода данных, - это кэширование первого обработанного входного пакета и использование того же кешированного пакета для всех последующих шагов. Это, по сути, отключает конвейер предварительной обработки и позволяет нам вычислить идеальное время выполнения эпохи.

Чтобы реализовать эту технику, мы просто добавляем следующую строку кода в конце создания нашего набора данных:

ds = ds.take(1).cache().repeat()

Применяя эту технику к нашему примеру, мы можем сократить время выполнения до 58 секунд. Другими словами, если бы не узкое место при вводе данных, мы смогли бы ускорить обучение более чем в 2 раза.

В следующих разделах мы рассмотрим ряд предлагаемых шагов для устранения узкого места во входном конвейере. Мы продемонстрируем некоторые шаги на нашем игрушечном примере, имея в виду целевое время выполнения, которое мы только что рассчитали, 58 секунд на эпоху.

Подготовка операций к этапу подготовки данных

Первое, что нужно сделать, чтобы устранить узкое место предварительной обработки данных, - это определить любые операции, которые могут быть предприняты на этапе создания необработанных записей данных. Чем больше операций мы можем переместить в фазу создания данных, тем больше мы сможем освободить циклы ЦП во время обучения. Любые операции, которые выполняются в начале конвейера детерминированным образом (не имеют случайного компонента), которые не зависят от гиперпараметра и не увеличивают чрезмерно размер данных, являются хорошими кандидатами на предварительную обработку. В нашем примере с игрушкой операция расширения (при условии, что она не зависит от гиперпараметра) соответствует этим критериям. Итак, первое, что мы сделаем, это отключим операцию расширения и предположим, что TFRecords содержат данные изображения после того, как они уже подверглись соответствующему расширению.

В нашей конкретной реализации фильтр размытия также мог бы быть хорошим кандидатом для предварительной обработки, но поскольку в большинстве случаев размытие применяется случайным образом, мы оставим его.

Если удалить только операцию расширения, время выполнения сокращается до 115 секунд за эпоху. Это меньше, чем наше начальное значение 122 секунды для эпохи, но нам еще предстоит пройти долгий путь, чтобы достичь нашей цели, равной 58 секунд для эпохи.

Следует обратить внимание на то, что определенные операции могут изменить размер ваших записей данных и, таким образом, могут повлиять на общий размер вашего набора данных, а также на объем сетевого трафика во время обучения (если обучающий набор хранится удаленно. ). Если вы решите заранее подготовить операции, которые чрезмерно увеличивают размер данных, вы можете столкнуться с риском замены одного узкого места другим, то есть узким местом сетевого ввода-вывода или загрузки данных.

Оптимизация конвейера предварительной обработки данных

После того, как мы переместили как можно больше операций на этап создания данных, вторым шагом будет определение способов оптимизации производительности оставшегося конвейера.

Настройка трубопроводов

Часто небольшие изменения в настройке входного конвейера могут снизить накладные расходы на производительность. Вот лишь несколько вещей, которые вы можете попробовать:

  1. Если у вас есть несколько функций карты набора данных, которые относительно малы, рассмотрите возможность сгруппировать их в одну функцию карты.
  2. И наоборот, если у вас есть очень большая функция карты набора данных, подумайте о том, чтобы разбить ее на две или несколько более мелких функций, чтобы лучше использовать встроенную поддержку параллельных вызовов.
  3. Ищите операции, которые можно было бы применять после пакетной обработки, а не для каждой записи. (В нашем примере функцию размытия теоретически можно применить к пакетам обучения, но, поскольку она обычно применяется случайным образом, мы оставим ее для каждой записи.)
  4. По возможности используйте типы с низкой точностью. Отложите литье с более высокой точностью до конца конвейера.
  5. Если ваш конвейер включает tf.numpy_function или tf.py_function, рассмотрите возможность использования вместо них примитивов TensorFlow.

Оптимизация и расширения ЦП

Убедитесь, что ваши двоичные файлы TensorFlow были настроены (и скомпилированы) для использования всех преимуществ вашего процессора и его расширений. Например, если вы используете современный процессор ISA x86 (например, Intel или AMD), обязательно используйте двоичные файлы TensorFlow, оптимизированные для использования расширенных векторных расширений процессора (например, AVX2). Intel, как правило, предлагает широкий спектр двоичных файлов, специально оптимизированных для работы на процессорах Intel, включая intelpython и TensorFlow-mkl.

Обратите внимание, что одним из преимуществ использования облачного решения для обучения является то, что облачная среда обучения (предположительно) настроена для использования всех преимуществ ресурсов облачной системы.

Балансировка нагрузки ЦП

Когда у вас есть узкое место ЦП в многопроцессорной системе, вы можете обнаружить, что, хотя одно или несколько ядер ЦП полностью загружены, другие нет. На самом деле это довольно распространенное явление. Одна вещь, которую вы можете попробовать, - это улучшить балансировку нагрузки между ЦП, чтобы общая загрузка ЦП увеличилась. Вы можете попробовать это, используя API tf.config.set_logical_device_configuration для разделения вычислений ЦП на несколько логических устройств и API tf.device, чтобы указать, где должна выполняться каждая операция. Вы также можете попытаться улучшить балансировку нагрузки, поигравшись с различными параметрами аргумента num_parallel_calls функции tf.data.Dataset.map (вместо того, чтобы полагаться на функцию автонастройки TensorFlow). В любом случае имейте в виду, что это, вероятно, будет утомительным, мучительным усилием, и что даже малейшее изменение вашей модели, вероятно, потребует пересчета балансировки нагрузки.

Операции разгрузки на GPU

Как и в нашем примере, вы можете обнаружить, что даже после того, как вы исчерпали все возможности для переноса операций на этап создания данных и оптимизации кода ЦП, вы по-прежнему сталкиваетесь с узким местом предварительной обработки данных. Следующий вариант, который следует рассмотреть, - это изменить балансировку нагрузки между ЦП и ГП, перенеся некоторые операции предварительной обработки на ГП. Обратной стороной этого подхода является то, что мы почти наверняка увеличим время выполнения шага графического процессора. Кроме того, поскольку мы увеличиваем размер графа вычислений, который выполняется на графическом процессоре, нам может потребоваться освободить часть памяти графического процессора, запустив пакет меньшего размера. В результате маловероятно, что мы сможем достичь целевой производительности, рассчитанной нами выше. Но если это сокращает общее время поезда, оно того стоит. Давайте рассмотрим несколько способов переноса операций предварительной обработки на графический процессор.

Отложить операции предварительной обработки на GPU

В большинстве случаев лучший способ разгрузить ЦП - это переместить операции, которые выполняются в конце конвейера предварительной обработки, на ГП. Ориентируясь на эти «хвостовые» операции, а не на операции в середине конвейера, мы избегаем накладных расходов на передачу данных между GPU и CPU. Если на входе модели выполняются «хвостовые» операции, мы можем разместить их во главе модели. Если они выполняются с данными этикетки, мы можем изменить нашу функцию потерь для выполнения этих операций перед применением фактических потерь.

В нашем примере мы удалили дополнения из нашего входного конвейера и вместо этого применили их к началу нашего графа вычислений GPU:

data_augmentation = tf.keras.Sequential(
        [preprocessing.RandomFlip("horizontal"),
         preprocessing.RandomRotation(0.1),
         preprocessing.RandomZoom(0.1)])
inputs = tf.keras.Input(shape=(32, 32, 3))
# Augment images
x = data_augmentation(inputs)
# Add the rest of the model
outputs = tf.keras.applications.ResNet50(weights=None,
                      input_shape=(32, 32, 3), classes=10)(x)
model = tf.keras.Model(inputs, outputs)

Применяя это изменение, мы можем сократить время выполнения до 108 секунд за эпоху.

С tf.device

Обернув операции с помощью оператора tf.device (‘/ device: GPU: 0’), мы можем принудительно запустить определенные операции на графическом процессоре. Обратной стороной этого метода является то, что он требует передачи данных на устройство с графическим процессором и обратно.

В нашем примере мы решили применить эту технику к функции размытия, изменив ее следующим образом:

def blur(image, label):
    import tensorflow_addons as tfa
    with tf.device('/device:GPU:0'):
        image = tfa.image.gaussian_filter2d(image=image,
                           filter_shape=(11, 11), sigma=0.8)
        return image, label

При запуске функции размытия на графическом процессоре таким образом, оставляя аугментации на центральном процессоре, мы получаем время выполнения эпохи 97 секунд. При сочетании обоих методов время выполнения эпохи составляет 98 секунд.

Используя средство просмотра трассировки профилировщика TensorFlow, мы можем увидеть, как метод tf.device увеличивает трафик данных между ЦП и ГП:

Сравнивая выделенные потоки в этом эксперименте с теми же потоками в захвате средства просмотра трассировки выше, мы видим, что имеется значительно больше копий памяти в и из GPU. Мы также видим, что графический процессор намного активнее.

Другой способ проверить, действительно ли функция размытия выполняется на ЦП, - это установить tf.debugging.set_log_device_placement (True). Вы можете запустить пример один раз с функцией размытия на ЦП и один раз с функцией размытия на графическом процессоре, и посмотреть, как это влияет на вывод процедуры размещения устройства журнала.

NVIDAI DALI

NVIDAI DALI - это фреймворк для построения высокооптимизированных конвейеров предварительной обработки. В частности, с помощью NVIDIA DALI вы можете запрограммировать части вашего конвейера или весь конвейер для работы на графическом процессоре. Трубопровод DALI построен на основе операций DALI. DALI поставляется со списком поддерживаемых операций, а также API для создания пользовательских операций. Используя плагин TensorFlow DALI, конвейеры DALI могут быть обернуты tf.data.Dataset API-совместимым, DALIDataset, как показано здесь. Кроме того, DALI поддерживает загрузку из файлов TFRecord, как показано здесь. К сожалению, на момент написания этой статьи документированная поддержка DALI ограничена TensorFlow, совместимым с версией 1. (Те из вас, кто читал мои предыдущие блоги, уже должны знать, как я отношусь к использованию устаревшего кода.) Кроме того, NVIDIA DALI была разработана для графических процессоров NVIDIA. Он не будет работать на других ускорителях машинного обучения. Еще одно соображение - распределенное обучение. Хотя DALI поддерживает обучение с несколькими графическими процессорами, в зависимости от того, как вы реализуете распределенное обучение (например, с Horovod или стратегией распространения TensorFlow, с model.fit () или настраиваемым циклом обучения), интегрируя Конвейер DALI будет варьироваться от немного более сложного до гораздо более сложного. Если вы сильно хотите использовать новейшие функции TensorFlow или хотите, чтобы ваш код был совместим с другими ускорителями (AWS Tranium, Habana Gaudi, TPU и т. Д.), Или если вы конвертируете свой конвейер в DALI операции потребуют много работы, или, если вы полагаетесь на высокоуровневые API-интерфейсы распределенного обучения TensorFlow, NVIDIA DALI может оказаться для вас неподходящим решением.

Использование DALI требует использования пакета Python плагина TensorFlow DALI. См. Документацию для инструкций по установке. В блоке кода ниже я показываю, как преобразовать конвейер из нашего варианта использования в NVIDIA DALI. Я исключил некоторые случайные дополнения, так как не было встроенных соответствующих операций DALI.

from nvidia.dali.pipeline import Pipeline
import nvidia.dali.ops as ops
import nvidia.dali.types as types
import nvidia.dali.tfrecord as tfrec
import nvidia.dali.plugin.tf as dali_tf
import tensorflow.compat.v1 as tf
tf.disable_eager_execution()

class TFRecordPipeline(Pipeline):
    def __init__(self, batch_size, num_threads,
                 device = 'cpu', device_id = 0):
        super(TFRecordPipeline, self).__init__(batch_size,
                                         num_threads,
                                         device_id)
        self.input = ops.TFRecordReader(
             path = ['data/train0.tfrecords'],
             index_path = ['index/train0'],
             features = {
              "image": tfrec.FixedLenFeature((), tfrec.string, ""),
              "label": tfrec.FixedLenFeature([], tfrec.int64,  -1)})
        self.decode = ops.Cast(device=device,dtype=types.UINT8)
        self.reshape = ops.Reshape(device=device,
                                   shape=[32, 32, 3])
        self.cast = ops.Cast(device=device,
                             dtype=types.DALIDataType.INT32)
        self.blur = ops.GaussianBlur(device=device,
                                     window_size=11,sigma=0.8)
        self.iter = 0
    def define_graph(self):
        inputs = self.input()
        images = self.decode(inputs["image"].gpu())
        images = self.reshape(images)
        images = self.blur(images)/255.
        labels = self.cast(inputs["label"].gpu())
        return (images, labels)
    def iter_setup(self):
        pass

if __name__ == "__main__":
    batch_size = 1024
    shapes = ((batch_size, 32, 32, 3),
              (batch_size))
    pipe = TFRecordPipeline(batch_size=batch_size, 
                            num_threads=4, 
                            device='gpu', 
                            device_id=0)
    with tf.device('/gpu:0'):
        # Create dataset
        ds = dali_tf.DALIDataset(
            pipeline=pipe,
            batch_size=batch_size,
            output_shapes=shapes,
            output_dtypes=(tf.float32, tf.int32),
            device_id=0)
        model = tf.keras.applications.resnet.ResNet50(
                   weights=None, 
                   input_shape=(32, 32, 3), 
                   classes=10)
        model.compile(
             loss=tf.keras.losses.SparseCategoricalCrossentropy(),
             optimizer=tf.keras.optimizers.Adam())
        model.fit(ds, steps_per_epoch=100, epochs=10)

Я запускал скрипт в TensorFlow 2.3 (казалось, что на момент написания этой статьи DALI не был обновлен для поддержки TensorFlow 2.4). Итоговое время выполнения 100-шаговой эпохи составило 77 секунд. Хотя это испытание не включало дополнений, очевидно, что DALI предлагает потенциал для значительного улучшения времени выполнения.

Как я упоминал выше, для операций выгрузки на графический процессор может потребоваться освобождение некоторой памяти для уменьшения размера пакета обучения. Оказывается, в нашем игрушечном примере этого не требовалось. (Это, вероятно, означает, что мы могли бы начать с большего размера пакета.) Этот результат не обязательно будет перенесен на другие модели, особенно если вы стараетесь максимально увеличить память графического процессора и размер пакета.

Выгрузка предварительной обработки данных на другие машины

Последний вариант, который мы исследуем, - переложить часть операций предварительной обработки на другие машины. Вместо того, чтобы переносить предварительные вычисления на GPU, мы перенесем их на ядра CPU на вспомогательных машинах. Мы рассмотрим этот подход с помощью относительно новой функции службы данных TensorFlow. Представленный в TensorFlow версии 2.3, tf.data.experimental.service предоставляет API-интерфейсы для определения выделенных рабочих машин для выполнения предварительной обработки данных. Сервер диспетчеризации отвечает за распределение задач предварительной обработки на один или несколько рабочих серверов, каждый из которых загружает необработанные данные непосредственно из хранилища и отправляет обработанные данные на устройство GPU. Применяя tf.data.experimental.service.distribute к набору данных, вы можете запрограммировать набор данных для выполнения всех операций предварительной обработки до точки приложения на выделенных рабочих процессах. Количество и типы используемых рабочих сервисов, а также то, где они будут применяться, должны определяться такими соображениями, как серьезность вашего узкого места, доступность и стоимость вспомогательных машин, способ воздействия операций предварительной обработки. размер данных и то, как это влияет на сетевой трафик. Например, если вы выберете удаленную рабочую машину с низкой пропускной способностью сети и запрограммируете операцию предварительной обработки, которая увеличивает размер данных для запуска на рабочем компьютере, вы можете не увидеть никакого улучшения производительности.

Давайте продемонстрируем использование этого API на нашем игрушечном примере. Для этой демонстрации я выбрал один вспомогательный инстанс Amazon EC2 c5.4xlarge с 16 ядрами ЦП и тем же AMI Amazon Deep Learning. Связь между p2.xlarge и c5.4xlarge будет использовать сетевой протокол grpc, поэтому вам необходимо убедиться, что оба экземпляра находятся в группе безопасности, которая разрешает входящий трафик соответствующего протокола, один от другого.

На рабочем устройстве мы запускаем следующий скрипт, где «10.0.1.171» - это IP-адрес вспомогательного устройства:

import tensorflow as tf
d_config = tf.data.experimental.service.DispatcherConfig(port=5000)
dispatcher = tf.data.experimental.service.DispatchServer(d_config)
w_config = tf.data.experimental.service.WorkerConfig(port=5001,
    dispatcher_address=dispatcher.target.split("://")[1],
    worker_address='10.0.1.171:5001')
worker = tf.data.experimental.service.WorkerServer(w_config)
dispatcher.join()

Обратите внимание, что мы запускаем диспетчерский сервер и рабочий сервер на одном компьютере. Мы также следим за тем, чтобы файлы TFRecord были скопированы на этот компьютер, поскольку рабочие будут загружать необработанные данные из этих файлов. На машине с графическим процессором мы изменили сценарий поезда следующим образом:

autotune = tf.data.experimental.AUTOTUNE
options = tf.data.Options()
options.experimental_deterministic = False
records = tf.data.Dataset.list_files('data/*', shuffle=True).with_options(options)
ds = tf.data.TFRecordDataset(records, num_parallel_reads=autotune).repeat()
ds = ds.map(parse_image_function, num_parallel_calls=autotune)
ds = ds.map(blur, num_parallel_calls=autotune)
# use the TensorFlow Data Service
ds = ds.apply(tf.data.experimental.service.distribute(
      processing_mode="parallel_epochs",
      service='grpc://10.0.1.171:5000'))
ds = ds.batch(batch_size)
ds = ds.map(rescale,num_parallel_calls=autotune)
ds = ds.map(augment, num_parallel_calls=autotune)
ds = ds.prefetch(autotune)

Обратите внимание, что мы запрограммировали только синтаксический анализ записи и функцию сильного размытия для работы с воркером. Дозирование и дополнения остаются на основном устройстве.

Результаты запуска этой установки не могут быть лучше! Время выполнения на эпоху составляет 58 секунд, что соответствует цели, которую мы для себя установили выше. Используя вспомогательное устройство ЦП и службу данных TensorFlow для разгрузки предварительных вычислений, мы полностью решили узкое место ЦП! И действительно, мы обнаружили, что средняя загрузка графического процессора в этом случае выросла примерно на 97%.

Резюме

В таблице ниже мы суммируем наши выводы по нашей игрушке, модели resnet50:

В этом посте мы рассмотрели несколько способов устранения узких мест в производительности конвейера ввода данных. В частности, мы показали, как можно использовать TensorFlow Data Service для полного устранения этого узкого места. Этот обзор не претендует на полноту. Вероятно, появятся дополнительные инструменты и методы. Хотя мы продемонстрировали, как применять эти методы к игрушечной модели Resnet50, их влияние наверняка будет различаться в зависимости от модели и набора данных. Не стесняйтесь делиться своими собственными инструментами, методами и опытом.