This post was originally published by Chaim Rand at Towards Data Science
What to look out for when scaling your training to multiple workers
These days data distributed training is all the rage. In data distributed training learning is performed on multiple workers in parallel. The multiple workers can reside on one or more training machines. Each worker starts off with its own identical copy of the full model and performs each training step on a different subset (local batch) of the training data. After each training step it publishes its resultant gradients and updates its own model taking into account the combined knowledge learned by all of the models. Denoting the number of workers by k and the local batch size by b, the result of performing distributed training on k workers is that at each training step, the model trains on a global batch size of k*b samples. It is easy to see the allure of data distributed training. More samples per train step means faster training, faster training means faster convergence, and faster convergence means faster deployment. Why train ImageNet for 29 hours if we can train it in one? Why train BERT for 3 days if we can train it in just 76 minutes? For especially large or complex networks, distributed training is all but essential for the model to train in a time period that is low enough for the model to be usable.
The problem is that many tutorials on the topic might give you the impression that converting your training script to perform data distributed training is almost as easy as flicking a switch, and that linear scaling of training convergence is all but guaranteed. However, for many models, certainly complex models, this could not be further from the truth. There are many details that need to be addressed to succeed at distributed training including how to adjust the optimizer settings, how to minimize the overhead of sharing gradients, and how the training data is processed. The intention of this blog post is to give you a sense of the “real deal” with data distributed training. We will cover some of the implementation details and challenges, as well as emphasize the importance of setting clear performance goals and having appropriate tools for measuring those goals.
While my focus will be on TensorFlow 2 Keras models (specifically TensorFlow 2.3) the main points we will cover are just as relevant to other training frameworks as well.
A cloud service environment such as Amazon Web Services (AWS), Google Cloud Platform (GCP), or Microsoft Azure is a natural environment for multi worker training. The most obvious reason for this is the offering of virtually infinite scale. In the cloud I can scale to as many workers as I want depending on my development scheduling commitments and cost constraints. Another advantage of training in the cloud is that it spares you the time and headache of setting up and maintaining your own machines for multi worker training. In addition, cloud environments often offer specific services for facilitating multi worker training. I will make some references to services offered by AWS but, again, the general comments are just as relevant to other environments and it is likely that equivalent or similar support exists in other cloud environments as well.
Important Disclaimer: I have been sitting on a draft of this post for quite a while. My feeling was that the post was never quite complete. There were always more frameworks, other data gradient sharing methods, and other optimizers to consider. After a while I came to the realization that with all the new papers, algorithms, and techniques constantly being introduced the post would never be complete. So here it is, for what it’s worth… my incomplete blog post. If there are additions that you believe should be included please don’t hesitate to reach out to me.
Note: In the context of this post a worker refers to an individual processing core (e.g. GPU). In other contexts the term worker might refer to a training instance that may contain multiple cores. In the TensorFlow documentation, for example, a worker refers to a training instance consisting of one or more compute devices.
There are two main challenges when running data distributed training:
Model convergence: Suppose that when running on a single worker our model converges after training on N samples, or after n traversals of the training data. When scaling to multiple workers of the same type, our goal will be to converge after the same overall number of samples/traversals of the training data. For example, if we scale to K workers, we will aim for convergence by the time each worker has seen n/k samples or traversed the training data N/k times. As we will see below, reaching this goal often requires tuning some of the elements of the training algorithm.
The training time performance: Training time performance refers to the speed at which we are able to train. We will measure this by the number of samples per second on which the model trains. If we are able to train at a rate of R samples per second on a single worker, our goal will be to train at a rate of R*k samples per second when running on k workers of the same type. A priori, it is not immediately obvious how this goal should be achievable given the fact that data-distributed training includes the overhead of an additional step that does not exist when training on a single worker, i.e. the gradient sharing. We will discuss below different techniques for minimizing this overhead as well as other implementation details that might affect the training speed
If we can achieve both goals, converging on the same overall number of samples and in 1/k the amount of time, then we have succeeded to scale our training linearly as a function of the number of workers. Linear scaling means that if training on a single worker takes T time, then training on k workers will take T/k. In many cases you might find that you are able to train successfully with multiple workers but you are unable to achieve linear scaling. This might still be good enough. If you are able to train in significantly less than T time, the cost of running on k workers might be worth the expedited development. In such cases, you will need to make clear decisions on what the acceptable tradeoffs are. Clearly, if training on k workers takes T time, i.e. just as long as a single worker, then you are better off sticking to one worker. But what if you are able to cut the development time in half using 4 workers? Under certain circumstances this might be an acceptable, albeit less than ideal, tradeoff.
Note that while measuring the impact of different implementation details on the rate of training (in samples per second) requires just a few training iterations, measuring the impact on the time to convergence can require considerably more time.
We will keep these two fundamental challenges in mind throughout our discussion and examine how they are impacted by different implementation choices.
Performance analysis refers to the measurement of the speed at which the training is running (in samples per second) and the manner in which the training resources are being utilized.
In previous posts (e.g. here and here) I have advocated for having strong tools and methodologies for analyzing performance and for integrating performance analysis into your DNN development processes. The importance of performance profiling is even greater when scaling to multiple workers. In fact, in my view, it is a prerequisite. As discussed above, one of the main challenges of distributed training is maintaining acceptable training time performance. Obviously, in order to do this, we require the appropriate tools in order to develop a full understanding of: our single worker performance, how the performance changes as we scale to multiple workers, and how this is impacted by different implementation decisions.
Before even considering scaling to multiple workers you should first make sure you have exhausted all potential for getting the most performance out of your single worker. Not only is this a much cheaper way of increasing training speed, but it will also magnify the potential performance gains when you do ultimately scale to multiple workers. For example, if your single worker utilization is at 25%, you’d be much better off working on identifying and solving your performance bottlenecks before even considering data distribution. You could potentially boost your performance by a factor of four without the cost of additional workers and without the additional overhead of gradient sharing.
There are many different tools and techniques for measuring performance. The TensorFlow Profiler offers many performance statistics for evaluating the models training in TensorFlow including step time, trace viewing, memory utilization, device to device data transfer (gradient sharing), and more. When using the Amazon SageMaker service you can take advantage of the profiling functionalities of Amazon SageMaker Debugger to measure system resource utilization and find correlations between patterns in resource utilization and the training phase.
Be sure to check out some of my previous posts on performance profiling for more details on this important topic.
One of the first decisions you will need to make when scaling to multi-worker training is what framework to use. In this post we will focus on two of the available options:
- Horovod — a popular library that supports TensorFlow, Keras, PyTorch, and Apache MXNet, and
- the distributed training support that is built into TensorFlow.
What both options have in common is that they both enable you to convert your training script to run on multiple workers with just a few lines of code. The documentation of both is comprehensive and easy to use, and tutorials exist in abundance. The gradient sharing algorithms are also quite similar (more on this below). The TensorFlow distributed training option is integrated into TensorFlow and thus has more intimate knowledge of its inner workings. A single instance of the training script is invoked on each instance, and TensorFlow handles the duplication of the model on the multiple workers, the distribution of input across workers, and the gradient sharing. Horovod, on the other hand, is more of a black box solution in that it has just a few points of interaction with the training cycle, primarily the gradient accumulation. In the Horovod option the training script is run for every worker. Each invocation of the script creates a copy of the model on its own corresponding worker and manages the input data stream independently.
It would be difficult to point to any one of the two options as being better than the other. In many cases the frameworks performance is dependent on the model architecture, model hyperparameters, and other details of the distributed training implementation. My strong recommendation would be to familiarize yourself with both. First, it’s always good to have options. More importantly, you might find that some of your models scale better with one framework over the other. Also, you might come across situations where one of the options is not available. For example, up until TensorFlow version 2.4, there was a bug in the built-in distributed training code that prohibited one from using batch normalization layer with the renorm flag set to False. On the other hand, if you choose to run your training on TPU cores you have no choice other than to use the built-in TensorFlow TPU distribution strategy. Lastly, you might find that some desirable features are implemented only on one of the frameworks. For example, in a recent post I expanded on Elastic Horovod, a compelling feature of Horovod that enables continuous training even in the event of interruptions to some of the workers.
One additional framework worth mentioning is Amazon SageMaker’s Distributed Data Parallel Library. As we mentioned above and will discuss in more detail in the next section, one of the main challenges of distributed training is how to reduce the overhead of the gradient sharing. The SageMaker library introduces a new algorithm that takes advantage of the specific Amazon cloud infrastructure internals to optimize the gradient data communication. Take a look at the recent announcement of this feature for more details.
The core ingredient of a successful data-distributed training run is the gradient sharing strategy. A strong strategy needs to both 1. ensure that all of the workers are synchronized in their training and 2. do so in a manner that minimizes the overhead. If your gradient sharing algorithm does not succeed in maintaining synchrony between the worker models then you may as well just run many independent (wasteful) training sessions on each worker. If your algorithm succeeds in maintaining synchrony, but in a way that triples the step time, then the value of using multiple workers will be low.
We will separate the discussion into two parts. First we will talk about some of the main ways in which gradient sharing algorithms can differ. Then, assuming a fixed gradient sharing algorithm, we will cover some of the techniques for reducing the overhead of the data traffic.
Gradient Sharing Algorithms
Gradient sharing algorithms can be broadly classified according to the following two parameters:
Parameter Server vs Peer to Peer Communication: In a parameter server based algorithm one or more dedicated parameter servers collects gradient updates for all of the workers and then broadcasts the results back to the workers. In a solution that uses multiple parameter servers the variables are distributed between servers. Depending on the implementation, the parameter servers may reside either on the CPU or on the GPU. They can reside on dedicated servers or on the existing training resources. See, for example, TensorFlow’s solution for gradient sharing based on parameter servers.
In a peer to peer based solution, each worker communicates its resultant gradients and collects the gradient updates from the other workers. There are a variety of AllReduce algorithms for collecting and accumulating the gradients. An example of a naïve AllReduce algorithm is one in which each worker sends its gradients to each one of the other workers. This results in an overall data payload of k*(k-1)*G where k is the number of workers and G is the size of all of the gradients. However, many far more optimal AllReduce algorithms exist. The most common one is the Ring-AllReduce in which multiple messages pass between workers in a one-directional ring. Using this technique, the overall data payload can be reduced to 2*(k-1)*G dispersed over 2*(k-1) communication hops. Take a look at this post for a detailed description on how Ring-AllReduce works:
Often, AllReduce algorithms will be designed so as to maximize utilization of the underlying hardware. See, for example, TensorFlow’s HierarchicalCopyAllReduce.
The specific implementation of the either option, parameter server or peer to peer communication, dictates what the map of data flow will be — the channels of communication and the amount of data that will pass on each channel.
Synchronous vs Asynchronous Algorithms: In a synchronous algorithm the training steps on all workers are performed in synchrony. After each step each worker waits for all of the other workers to share their gradients and proceeds to the next step after applying the resultant parameter update to its own copy of the model. Synchronous algorithms ensure that the model replicas are aligned at all stages of the training. In an asynchronous algorithm (usually associated with a parameter server based strategy) workers do not wait. They collect whatever updates are available, immediately apply them, and proceed to the next step. As a result the state of the models on each replica are likely to differ at each step. Naturally, the asynchronous approach will have lower latency. On the other hand, the added noise resulting from the differences in model replicas might have a negative impact on the rate of convergence.
Obviously, the optimal choice of algorithm is very much dependent on the specific model and the training environment. The most common algorithm offered today, and a good place to start, is the synchronous Ring-AllReduce algorithm or some variation of it. This is the underlying algorithm used by Horovod and is also the default algorithm used by TensorFlow’s mirrored strategy. Take a look at TensorFlow’s distributed training guide for an overview of the additional distributed strategies that are supported.
Optimizing Data Transfer
Clearly, the choice of gradient sharing algorithm will impact the amount of data traffic and thus the overall overhead to the training time. In this section we assume a fixed algorithm and discuss different techniques for reducing the overhead of the data flow.
- Utilize dedicated hardware to accelerate data transfer rates: For example, if your workers are connected via high speed interconnects they should be the preferred channel of communication. In most cases you should prefer to distribute your training on a single instance with 8 interconnected workers over 8 single worker instances that will communicate over the network. Another example is Amazon SageMaker’s Distributed Data Parallel Library mentioned above that implements a parameter server based solution that is able to provide competitive results by utilizing specific components of the underlying AWS cloud infrastructure. See the white paper for details.
As a final example, one of the main reasons that dedicated training accelerators such as Google TPU and Habana Gaudi are able to demonstrate impressive scale rates is that they are specifficaly designed with the intention of maximizing the rate of the data flow between the individual workers.
- Tuning the gradient sharing controls: Some of the gradient sharing algorithms might include controls for fine tuning their performance. For example, performance of some algorithms might be impacted by the amount of system resources allocated to them.
- Reducing precision of gradients: You can reduce the payload of the gradient sharing by reducing the precision of the gradients (such as using float16 instead of float32) or using some other compression method. Make sure that your training does not suffer from this kind of manipulation.
- Overlap gradient sharing with the back propagation: A trivial algorithm will wait until all gradients are calculated before broadcasting the results. However, one could communicate the individual gradients as soon as they are calculated. By breaking the gradient update message into multiple packets containing subsets of the gradients ordered according to when they are calculated, we can reduce the overhead of the gradient sharing by overlapping it with the back propagation.
To see how to implement some of these techniques in TensorFlow, I recommend the following video from last year’s TFDevSummit.
Note: Our focus in this section has been on the potential overhead introduced to the training step time. However, there are other possible side affects of gradient sharing that you should be mindful of. For example, training instances have an upper bound on their network IO bandwidth. Whereas in a training environment with a single instance this bandwidth is used almost exclusively for data input, in a multi instance environment in which gradient sharing is performed over the network, this network bandwidth is now shared between data input and the gradient messages. If your single instance has a network IO bottleneck this will have negative implications on your ability to scale linearly. Similarly, if you are CPU bottlenecked and you choose a gradient sharing algorithm that requires CPU cycles, you will also face increasing challenges scaling linearly.
There are two main strategies for managing the input data in a data distributed training scenario. In the first, we ensure that each worker processes a disjoint subset of the data by sharding (splitting) the input. In the second, we simply feed randomly shuffled copies of the complete dataset into all the workers. While this might sound simple enough, there are a number of implementation details that could have a meaningful impact on your ability to train successfully.
There are a number of ways to shard your data. You can shard your data during the dataset creation. If your data is grouped into files, you can shard the list of files during the creation of the Dataset object. The worst thing to do would be to process all of the input data and shard it at the entry to the workers as this: introduces wasteful overhead, could introduce a performance bottleneck on the CPU and/or on the network input, and could slow down your training considerably. Unfortunately, this is sometimes the default behavior of TensorFlow’s dataset distribution. As documented here, TesnorFlow will attempt to shard by files, but if unsuccessful, it will shard at the dataset record level. Note that in the context of the TensorFlow documentation a worker refers to a training instance.
Example — TensorFlow with PipeModeDataset: As I have expanded on in the past, one compelling way to train on large datasets in Amazon’s SageMaker environment is to use Amazon SageMaker Pipe Mode. Unfortunately, TensorFlow’s default behavior on the PipeModeDataset will be to shard at the record level. Recall that on each training instance Tensorflow will manage the data input for all workers. Suppose we are running with 4 instances with 8 GPU workers each, for a total of 32 workers. Each training instance will end up pulling and processing 4 times the amount of data that it actually needs. Given the bounds on both Network input bandwidth and on the CPU cycles, this might introduce a bottleneck and slow you down. One solution is to configure the pipe input creation with the distribution setting set to ‘ShardedByS3Key‘ combined with customizing the dataset sharding using the distribute_datasets_from_function function. This way, we can make sure that each instance processes only the records that will be used.
Random Shuffles of the Full Dataset
An alternative to sharding is to simply enter random shuffles of the full dataset to each of the workers. Note that the resulting data traversal can be different than in the case of sharding. In particular, the random shuffle may result in certain records being processed multiple times in the beginning of training and not at all towards the end of training rather than being dispersed evenly over the course of training. In most cases this alternative does not impact the rate of convergence, but there could be models that are sensitive to this.
Example — Horovod with PipeModeDataset: Contrary to TensorFlow, in Horovod the input to each worker is handled independently. In particular, in the case of k workers the sharding option would require defining k unique datasets. However, if you use Amazon SageMaker Pipe Mode, you will find that you are limited to 20 pipes per training job. Even if you require just a single pipe per worker you will be limited to 20 workers. If you require more pipes per worker you will have a lower limit. One possible solution combines sharding with shuffling. Suppose each worker requires a single pipe and that you have 4 instances with 8 GPU workers each. Create 8 pipes, with indexes 1 through 8, all pointing to the same data source, each pipe with the distribution setting set to ‘ShardedByS3Key’ and each with a unique ShuffleConfig instance i.e. a unique shuffle seed. On each instance connect worker with local id (local_rank) i to the pipe with the same id. The result is that each of the 8 pipes will contain a unique shuffle and that, for each epoch, all workers of local id i will traverse the complete dataset associated with pipe i. With this kind of solution you can scale to an unlimited number of training instances without requiring additional pipes. You will still be limited to at most 2 pipes per worker (e.g. train and test) since 3 would require 3*8=24 total pipes which is larger than the 20 pipe limit, but this restriction is far more manageable.
As we stated in the very first section, one of the two main challenges to successful distributed training is to minimize the rise in the number of overall samples (or the overall number of traversals of the training dataset) required to reach model convergence. When running training in a distributed setting with k workers the global batch size increases by a factor of k. In many cases you will find that the model hyperparameters that were used to learn for the original batch size no longer work for the large batch size. This is especially true if the batch size is much larger, i.e. in the case where we have a large number of workers. Many tutorials might have you believe that all that is required is a (linear) correction to the learning rate of the chosen model optimizer. Sadly, for non-trivial models, this is often not the case. You may discover that finding the appropriate hyperparameters for convergence in the case of k workers can be difficult. You might even come to the conclusion that it can’t be done. To make matters worse, you might succeed in finding a recipe that works for k workers, only to discover that the same recipe fails on a different number of workers. Addressing this problem is an active area of research with dedicated training optimizers such as LAMB and LARS being proposed for this task. Keep in mind that changing either the optimizer or one of the other hyperparameters might increase the training step time, making the goal of reaching linear scaling that much harder. For example, the TensorFlow Addons Library includes an implementation of the LAMB optimizer. However, you might find that it increases the step time compared to your default optimizer in which case you may want to consider creating a custom kernel to ensure its optimality.
The bottom line is that while implementing the mechanics of (highly) data parallelized training might be easy, ensuring model convergence, and in a timely manner, can sometimes be quite difficult.
Training resources are expensive and the cost typically scales linearly with the number of workers. While the high cost is a non-negotiable symptom of distributed training, we should always be conscious of this and seek opportunities to adjust our development habits to reduce cost. Here are a few suggestions:
Use low cost “spot” instances: Many cloud service providers offer significant discounts for surplus compute instances. In AWS these are called Amazon EC2 Spot Instances, in Google Cloud they are called Preemptible VM Instances, and in Microsoft Azure they are called Low-Priority VMs. The tradeoff is that such instances can be terminated mid-use if demand for them increases, and most modern day frameworks will fail the distributed training if one of the workers becomes unresponsive. However, using spot instances might be an attractive option for you, assuming that your training session is not in the critical path and that you capture checkpoints regularly that can be resumed from in the event of interruption. You can also consider using a fault tolerant solution such as Elastic Horovod, a compelling feature of Horovod that allows for uninterrupted training even in the event of spot terminations while allowing for on the fly updates to the training hyper-parameters based on the changing number of workers. Check out my recent post on this topic.
Automated monitoring and management of training sessions: It is imperative that you introduce automated methods for monitoring training sessions, identifying failing runs, and terminating them. If you rely only on manual discovery you might find yourself paying excessively for meaningless training cycles.
Use advanced hyper-parameter tuning methods: There are well known advantages to advanced (often Bayesian based) hyper-parameter tuning over simplistic methods such as manual search or grid search. Such techniques are all the more important in a distributed setting given the high cost and given the possibility of increased complexity in identifying appropriate hyper-parameters, as discussed above.
Offload evaluations to single worker instances: Using multiple instances will require some modifications to your training flow. Checkpoint management should be handled by just one of the workers (the chief worker). If you are using Horovod and your training session requires installing dependent packages, you need to ensure that the installation is performed by only one worker per instance (while the other workers sleep) so as to avoid race conditions. One element, often overlooked, are evaluations. Training flows often interleave periodic evaluations in order to monitor how the evaluation metrics change over time. While you certainly can perform distributed evaluation on multiple workers (there are easy methods for accumulating the metrics across workers, and the batch size can be tuned to maximize resource utilization) let me go out on a limb and surmise that accelerating evaluations is not the reason you are exploring using multiple instances. One thing you might consider is to spawn the evaluations onto other single worker instances rather than run them in the multi-instance environment. Not only will this spare you the overhead of distributed evaluation, but you will also enjoy an additional boost to your training speed resulting from running uninterrupted training. Of course, if you require fast evaluations and/or if your training flow depends on the results of the evaluations, this might not be a good option for you.
Perform distributed training wisely: Being smart about when you choose to perform can lead to significant cost savings. This is best demonstrated by example: Suppose you have succeeded in scaling to a k-worker setting such that your training time has been reduced by a factor of 2/k (as opposed to the desired factor of 1/k) and you have made the decision that, in the general case, the extra cost is worth the accelerated development time. Now suppose that you are kicking off a new training session in a specific setting in which you are indifferent to whether the training is completed in time T or in time 2T/k, e.g. just before heading home for the weekend. In such a case, you might want to reconsider running distributed training so as to halve the training cost.
In a recent post I spoke about some additional development habits for increasing training productivity. Such habits become all the more important in a distributed training setting.
Running data-distributed training is a compelling way to accelerate your training, reduce your development cycle time, and increase your market competitiveness. However, succeeding at data-distributed training might also require a significant development effort. Before embarking on this adventure make sure you have the appropriate tools for evaluating your progress. Set clear goals, clear success criteria, and clear bounds for acceptable scaling factors. Try to get an early sense of the complexity of tuning the model to ensure convergence on a high global batch size. Calculate your decisions wisely by implementing data distributed training only when and where it justifies the return.
This post was originally published by Chaim Rand at Towards Data Science