How to Optimize Your DL Data-Input Pipeline with a Custom PyTorch Operator | by Chaim Rand | Aug, 2023

How to Optimize Your DL Data-Input Pipeline with a Custom PyTorch Operator | by Chaim Rand | Aug, 2023

PyTorch Model Performance Analysis and Optimization — Part 5

Chaim Rand
Towards Data Science
Photo by Alexander Grey on Unsplash

This post is the fifth in a series of posts on the topic of performance analysis and optimization of GPU-based PyTorch workloads and a direct sequel to part four. In part four, we demonstrated how PyTorch Profiler and TensorBoard can be used to identify, analyze, and address performance bottlenecks in the data pre-processing pipeline of a DL training workload. In this post we discuss PyTorch’s support for creating custom operators and demonstrate how it enables us to solve performance bottlenecks on the data input pipeline, accelerate DL workloads, and reduce the cost of training. Thanks go to Yitzhak Levi and Gilad Wasserman for their contributions to this post. The code associated with this post can be found in this GitHub repository.

PyTorch offers a number of ways for creating customized operations including extending torch.nn with custom Modules and/or Functions. In this post we are interested in PyTorch’s support for integrating customized C++ code. This capability is important due to the fact that some operations can be implemented (much) more efficiently and/or easily in C++ than in Python. Using designated PyTorch utilities, such as CppExtension, these operations can be easily included as “extensions” to PyTorch without needing to pull and recompile the entire PyTorch code base. For more on the motivation behind this feature and details of how to use it, please see the official PyTorch tutorial on custom C++ and CUDA extensions. Since our interest in this post is to accelerate the CPU-based data pre-processing pipeline, we will suffice with a C++ extension and not require CUDA code. In a future post we hope to demonstrate how to use this functionality to implement a custom CUDA extension in order to accelerate training code running on the GPU.

In our previous post we defined a data input pipeline that started with decoding a 533x800 JPEG image and then extracting a random 256x256 crop which, following a few additional transformations, is fed into the training loop. We used PyTorch Profiler and TensorBoard to measure the time associated with loading the image from file and acknowledged the wastefulness of decoding. For the sake of completeness, we copy in the code below:

import numpy as np
from PIL import Image
from torchvision.datasets.vision import VisionDataset
input_img_size = [533, 800]
img_size = 256

class FakeDataset(VisionDataset):
def __init__(self, transform):
super().__init__(root=None, transform=transform)
size = 10000
self.img_files = [f'{i}.jpg' for i in range(size)]
self.targets = np.random.randint(low=0,high=num_classes,
size=(size),dtype=np.uint8).tolist()

def __getitem__(self, index):
img_file, target = self.img_files[index], self.targets[index]
img = Image.open(img_file)
if self.transform is not None:
img = self.transform(img)
return img, target

def __len__(self):
return len(self.img_files)

transform = T.Compose(
[T.PILToTensor(),
T.RandomCrop(img_size),
RandomMask(),
ConvertColor(),
Scale()])

Recall from our previous post that the optimized average step time we reached was 0.72 seconds. Presumably, were we able to decode only the crop in which we were interested, our pipeline would have run faster. Unfortunately, as of the time of this writing PyTorch does not include a function that supported this. However, using the tools for custom-op creation, we can define and implement our own function!

The libjpeg-turbo library is a JPEG image codec that includes a number of enhancements and optimizations compared to libjpeg. In particular, libjpeg-turbo includes a number of functions that enable us to decode only a predefined crop within an image such as jpeg_skip_scanlines and jpeg_crop_scanline. If you are running in a conda environment you can install with the following command:

conda install -c conda-forge libjpeg-turbo

Note that libjpeg-turbo comes pre-installed in the official AWS PyTorch 2.0 Deep Learning Docker image that we will use in our experiments below.

In the code block below we modify the decode_jpeg function of torchvision 0.15 to decode and return a requested crop from an input JPEG encoded image.

torch::Tensor decode_and_crop_jpeg(const torch::Tensor& data,
unsigned int crop_y,
unsigned int crop_x,
unsigned int crop_height,
unsigned int crop_width) {
struct jpeg_decompress_struct cinfo;
struct torch_jpeg_error_mgr jerr;

auto datap = data.data_ptr();
// Setup decompression structure
cinfo.err = jpeg_std_error(&jerr.pub);
jerr.pub.error_exit = torch_jpeg_error_exit;
/* Establish the setjmp return context for my_error_exit to use. */
setjmp(jerr.setjmp_buffer);
jpeg_create_decompress(&cinfo);
torch_jpeg_set_source_mgr(&cinfo, datap, data.numel());

// read info from header.
jpeg_read_header(&cinfo, TRUE);

int channels = cinfo.num_components;

jpeg_start_decompress(&cinfo);

int stride = crop_width * channels;
auto tensor =
torch::empty({int64_t(crop_height), int64_t(crop_width), channels},
torch::kU8);
auto ptr = tensor.data_ptr();

unsigned int update_width = crop_width;
jpeg_crop_scanline(&cinfo, &crop_x, &update_width);
jpeg_skip_scanlines(&cinfo, crop_y);

const int offset = (cinfo.output_width - crop_width) * channels;
uint8_t* temp = nullptr;
if(offset > 0) temp = new uint8_t[cinfo.output_width * channels];

while (cinfo.output_scanline < crop_y + crop_height) {
/* jpeg_read_scanlines expects an array of pointers to scanlines.
* Here the array is only one element long, but you could ask for
* more than one scanline at a time if that's more convenient.
*/
if(offset>0){
jpeg_read_scanlines(&cinfo, &temp, 1);
memcpy(ptr, temp + offset, stride);
}
else
jpeg_read_scanlines(&cinfo, &ptr, 1);
ptr += stride;
}
if(offset > 0){
delete[] temp;
temp = nullptr;
}
if (cinfo.output_scanline < cinfo.output_height) {
// Skip the rest of scanlines, required by jpeg_destroy_decompress.
jpeg_skip_scanlines(&cinfo,
cinfo.output_height - crop_y - crop_height);
}
jpeg_finish_decompress(&cinfo);
jpeg_destroy_decompress(&cinfo);
return tensor.permute({2, 0, 1});
}

PYBIND11_MODULE(TORCH_EXTENSION_NAME, m) {
m.def("decode_and_crop_jpeg",&decode_and_crop_jpeg,"decode_and_crop_jpeg");
}

The full C++ file can be found here.

In the next section, we will follow the steps in the PyTorch tutorial in order to convert this into a PyTorch operator that we can use in our pre-processing pipeline.

As described in the PyTorch tutorial, there are different ways of deploying a custom operator. There are a number of considerations that might factor into your deployment design. Here are a few examples of what we find important:

  1. Just in time compilation: In order to ensure that our C++ extension is compiled against the same version of PyTorch that we train with, we program our deployment script to compile the code right before training within the training environment.
  2. Multi-process support: The deployment script must support the possibility that our C++ extension will be loaded from multiple processes (e.g., multiple DataLoader workers).
  3. Managed-training support: Since we often train in managed training environments (such as Amazon SageMaker) we require that the deployment script support this option. (See here for more on the topic of customizing a managed training environment.)

In the code block below we define a simple setup.py script that compiles and installs our custom function, as described here.

from setuptools import setup
from torch.utils import cpp_extension

setup(name='decode_and_crop_jpeg',
ext_modules=[cpp_extension.CppExtension('decode_and_crop_jpeg',
['decode_and_crop_jpeg.cpp'],
libraries=['jpeg'])],
cmdclass={'build_ext': cpp_extension.BuildExtension})

We place our C++ file and the setup.py script in a folder named custom_op and define an __init__.py that ensures that the setup script is run a single time and by a single process:

import os
import sys
import subprocess
import shlex
import filelock

p_dir = os.path.dirname(__file__)

with filelock.FileLock(os.path.join(pkg_dir, f".lock")):
try:
from custom_op.decode_and_crop_jpeg import decode_and_crop_jpeg
except ImportError:
install_cmd = f"{sys.executable} setup.py build_ext --inplace"
subprocess.run(shlex.split(install_cmd), capture_output=True, cwd=p_dir)
from custom_op.decode_and_crop_jpeg import decode_and_crop_jpeg

Last, we revise our data input pipeline to use our newly created customized function:

from torchvision.datasets.vision import VisionDataset
input_img_size = [533, 800]
class FakeDataset(VisionDataset):
def __init__(self, transform):
super().__init__(root=None, transform=transform)
size = 10000
self.img_files = [f'{i}.jpg' for i in range(size)]
self.targets = np.random.randint(low=0,high=num_classes,
size=(size),dtype=np.uint8).tolist()

def __getitem__(self, index):
img_file, target = self.img_files[index], self.targets[index]
with torch.profiler.record_function('decode_and_crop_jpeg'):
import random
from custom_op.decode_and_crop_jpeg import decode_and_crop_jpeg
with open(img_file, 'rb') as f:
x = torch.frombuffer(f.read(), dtype=torch.uint8)
h_offset = random.randint(0, input_img_size[0] - img_size)
w_offset = random.randint(0, input_img_size[1] - img_size)
img = decode_and_crop_jpeg(x, h_offset, w_offset,
img_size, img_size)

if self.transform is not None:
img = self.transform(img)
return img, target

def __len__(self):
return len(self.img_files)

transform = T.Compose(
[RandomMask(),
ConvertColor(),
Scale()])

Following the optimization we have described, our step time drops to 0.48 seconds (from 0.72) for a 50% performance boost! Naturally, the impact of our optimization is directly related to the size of the raw JPEG images and our choice of crop size.

Bottlenecks in the data pre-processing pipeline are common occurrences that can cause GPU starvation and slow down training. Given the potential cost implications, it is imperative that you have a variety of tools and techniques for analyzing and solving them. In this post we have reviewed the option of optimizing the data input pipeline by creating a custom C++ PyTorch extension, demonstrated its ease of use, and shown its potential impact. Of course, the potential gains from this kind of optimization mechanism will vary greatly based on the project and the details of the performance bottleneck.

What Next? The optimization technique discussed here joins a wide range of input pipeline optimization methods we have discussed in many of our blog posts. We encourage you to check them out (e.g., starting here).

Source link

Leave a Reply