From 957a2a46e7725184776c3c72860e8215164cc4ef Mon Sep 17 00:00:00 2001 From: Jordan Gong Date: Sat, 13 Aug 2022 23:38:43 +0800 Subject: Implement distributed data parallel via torch elastic launcher --- simclr/evaluate.py | 63 +++++++++++++++++++++++++++++++------------------- simclr/main.py | 67 +++++++++++++++++++++++++++++++++--------------------- 2 files changed, 80 insertions(+), 50 deletions(-) (limited to 'simclr') diff --git a/simclr/evaluate.py b/simclr/evaluate.py index 5c41b84..23bd299 100644 --- a/simclr/evaluate.py +++ b/simclr/evaluate.py @@ -6,6 +6,7 @@ from typing import Iterable, Callable import sys import torch +import torch.distributed as dist import yaml from torch.utils.data import Dataset from torchvision.datasets import CIFAR10, CIFAR100, ImageNet @@ -16,7 +17,7 @@ sys.path.insert(0, path) from libs.optimizers import LARS from libs.logging import Loggers, BaseBatchLogRecord, BaseEpochLogRecord -from libs.utils import BaseConfig +from libs.utils import BaseConfig, elastic_launch from simclr.main import SimCLRTrainer, SimCLRConfig from simclr.models import CIFARSimCLRResNet50, ImageNetSimCLRResNet50 @@ -190,7 +191,7 @@ class SimCLREvalTrainer(SimCLRTrainer): if k in self.models['backbone'].state_dict()} self.models['backbone'].load_state_dict(backbone_state_dict) - def train(self, num_iters: int, loss_fn: Callable, logger: Loggers, device: torch.device): + def train(self, num_iters: int, loss_fn: Callable, logger: Loggers, device: int): backbone, classifier = self.models.values() optim_b, optim_c = self.optims.values() sched_b, sched_c = self.scheds.values() @@ -218,24 +219,33 @@ class SimCLREvalTrainer(SimCLRTrainer): if self.finetune: optim_b.step() optim_c.step() - self.log(logger, self.BatchLogRecord( - batch, num_batches, global_batch, iter_, num_iters, - optim_c.param_groups[0]['lr'], train_loss.item() - )) + + if logger is not None: + self.log(logger, self.BatchLogRecord( + batch, num_batches, global_batch, iter_, num_iters, + optim_c.param_groups[0]['lr'], train_loss.item() + )) + dist.barrier() + if (iter_ + 1) % (num_iters // 10) == 0: - metrics = torch.Tensor(list(self.eval(loss_fn, device))).mean(0) - eval_loss = metrics[0].item() - eval_accuracy = metrics[1].item() - epoch_log = self.EpochLogRecord(iter_, num_iters, - eval_loss, eval_accuracy) - self.log(logger, epoch_log) - self.save_checkpoint(epoch_log) - if sched_b is not None and self.finetune: - sched_b.step() - if sched_c is not None: - sched_c.step() - - def eval(self, loss_fn: Callable, device: torch.device): + # TODO Gather results from other workers + metrics = torch.Tensor(list(self.eval(loss_fn, device))) + if logger is not None: + metrics_mean = metrics.mean(0) + eval_loss = metrics_mean[0].item() + eval_accuracy = metrics_mean[1].item() + epoch_log = self.EpochLogRecord(iter_, num_iters, + eval_loss, eval_accuracy) + self.log(logger, epoch_log) + self.save_checkpoint(epoch_log) + dist.barrier() + + if sched_b is not None and self.finetune: + sched_b.step() + if sched_c is not None: + sched_c.step() + + def eval(self, loss_fn: Callable, device: int): backbone, classifier = self.models.values() backbone.eval() classifier.eval() @@ -250,14 +260,13 @@ class SimCLREvalTrainer(SimCLRTrainer): yield loss.item(), accuracy.item() -if __name__ == '__main__': +def main(local_rank, global_rank): args = parse_args_and_config() config = SimCLREvalConfig.from_args(args) - device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') trainer = SimCLREvalTrainer( seed=args.seed, checkpoint_dir=args.checkpoint_dir, - device=device, + device=local_rank, inf_mode=False, num_iters=args.num_iters, config=config, @@ -267,5 +276,11 @@ if __name__ == '__main__': finetune=args.finetune, ) - loggers = trainer.init_logger(args.log_dir) - trainer.train(args.num_iters, torch.nn.CrossEntropyLoss(), loggers, device) + loggers = None + if global_rank == 0: + loggers = trainer.init_logger(args.log_dir) + trainer.train(args.num_iters, torch.nn.CrossEntropyLoss(), loggers, local_rank) + + +if __name__ == '__main__': + elastic_launch(main) diff --git a/simclr/main.py b/simclr/main.py index 69e2ab2..3456d41 100644 --- a/simclr/main.py +++ b/simclr/main.py @@ -1,11 +1,12 @@ import argparse import os -import sys from dataclasses import dataclass from pathlib import Path from typing import Iterable, Callable +import sys import torch +import torch.distributed as dist import yaml from torch.utils.data import Dataset from torchvision.datasets import CIFAR10, CIFAR100, ImageNet @@ -17,8 +18,7 @@ sys.path.insert(0, path) from libs.criteria import InfoNCELoss from libs.datautils import color_distortion, Clip, RandomGaussianBlur, TwinTransform from libs.optimizers import LARS -from libs.schedulers import LinearWarmupAndCosineAnneal, LinearLR -from libs.utils import Trainer, BaseConfig +from libs.utils import Trainer, BaseConfig, elastic_launch from libs.logging import BaseBatchLogRecord, Loggers from simclr.models import CIFARSimCLRResNet50, ImageNetSimCLRResNet50 @@ -254,11 +254,12 @@ class SimCLRTrainer(Trainer): self.optims = {n: LARS(o) if config.optim_config.optim == 'lars' else o for n, o in self.optims.items()} - def train(self, num_iters: int, loss_fn: Callable, logger: Loggers, device: torch.device): + def train(self, num_iters: int, loss_fn: Callable, logger: Loggers, device: int): model = self.models['model'] optim = self.optims['model_optim'] sched = self.scheds['model_optim_sched'] train_loader = iter(self.train_loader) + model.train() for iter_ in range(self.restore_iter, num_iters): input_, _ = next(train_loader) @@ -268,28 +269,37 @@ class SimCLRTrainer(Trainer): train_loss, train_accuracy = loss_fn(output) train_loss.backward() optim.step() - self.log(logger, self.BatchLogRecord( - iter_, num_iters, iter_, iter_, num_iters, - optim.param_groups[0]['lr'], - train_loss.item(), train_accuracy.item(), - eval_loss=None, eval_accuracy=None, - )) - if (iter_ + 1) % (num_iters // 100) == 0: - metrics = torch.Tensor(list(self.eval(loss_fn, device))).mean(0) - eval_loss = metrics[0].item() - eval_accuracy = metrics[1].item() - eval_log = self.BatchLogRecord( + + if logger is not None: + self.log(logger, self.BatchLogRecord( iter_, num_iters, iter_, iter_, num_iters, - lr=None, train_loss=None, train_accuracy=None, - eval_loss=eval_loss, eval_accuracy=eval_accuracy, - ) - self.log(logger, eval_log) - self.save_checkpoint(eval_log) + optim.param_groups[0]['lr'], + train_loss.item(), train_accuracy.item(), + eval_loss=None, eval_accuracy=None, + )) + dist.barrier() + + if (iter_ + 1) % (num_iters // 100) == 0: + # TODO Gather results from other workers + metrics = torch.Tensor(list(self.eval(loss_fn, device))) + if logger is not None: + metrics_mean = metrics.mean(0) + eval_loss = metrics_mean[0].item() + eval_accuracy = metrics_mean[1].item() + eval_log = self.BatchLogRecord( + iter_, num_iters, iter_, iter_, num_iters, + lr=None, train_loss=None, train_accuracy=None, + eval_loss=eval_loss, eval_accuracy=eval_accuracy, + ) + self.log(logger, eval_log) + self.save_checkpoint(eval_log) model.train() + dist.barrier() + if sched is not None: sched.step() - def eval(self, loss_fn: Callable, device: torch.device): + def eval(self, loss_fn: Callable, device: int): model = self.models['model'] model.eval() with torch.no_grad(): @@ -300,14 +310,13 @@ class SimCLRTrainer(Trainer): yield loss.item(), accuracy.item() -if __name__ == '__main__': +def main(local_rank, global_rank): args = parse_args_and_config() config = SimCLRConfig.from_args(args) - device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') trainer = SimCLRTrainer( seed=args.seed, checkpoint_dir=args.checkpoint_dir, - device=device, + device=local_rank, inf_mode=True, num_iters=args.num_iters, config=config, @@ -315,5 +324,11 @@ if __name__ == '__main__': out_dim=args.out_dim, ) - loggers = trainer.init_logger(args.log_dir) - trainer.train(args.num_iters, InfoNCELoss(args.temp), loggers, device) + loggers = None + if global_rank == 0: + loggers = trainer.init_logger(args.log_dir) + trainer.train(args.num_iters, InfoNCELoss(args.temp), loggers, local_rank) + + +if __name__ == '__main__': + elastic_launch(main) -- cgit v1.2.3