diff options
author | Jordan Gong <jordan.gong@protonmail.com> | 2022-08-13 23:38:43 +0800 |
---|---|---|
committer | Jordan Gong <jordan.gong@protonmail.com> | 2022-08-13 23:38:43 +0800 |
commit | 957a2a46e7725184776c3c72860e8215164cc4ef (patch) | |
tree | 43e098595db4ee332bca5f6caecfbd02369debbe /libs/criteria.py | |
parent | 1b8f01ce9706905c36c6f11ed9deac8548ad7341 (diff) |
Implement distributed data parallel via torch elastic launcher
Diffstat (limited to 'libs/criteria.py')
-rw-r--r-- | libs/criteria.py | 19 |
1 file changed, 16 insertions, 3 deletions
diff --git a/libs/criteria.py b/libs/criteria.py index baa36ce..7d367c1 100644 --- a/libs/criteria.py +++ b/libs/criteria.py @@ -1,4 +1,6 @@ import torch +import torch.distributed as dist +import torch.distributed.rpc as rpc from torch import nn, Tensor from torch.nn import functional as F @@ -8,10 +10,21 @@ class InfoNCELoss(nn.Module): super().__init__() self.temp = temp + @staticmethod + def _norm_and_stack(feat: Tensor) -> Tensor: + local_feat_norm = F.normalize(feat) + local_feat_norm_stack = torch.stack(local_feat_norm.chunk(2)) + + return local_feat_norm_stack + def forward(self, feature: Tensor) -> tuple[Tensor, Tensor]: - bz = feature.size(0) // 2 - feat_norm = F.normalize(feature) - feat1_norm, feat2_norm = feat_norm.split(bz) + feat_norm = torch.cat([ + rpc.rpc_sync(f"worker{i}", self._norm_and_stack, (feature,)) + for i in range(dist.get_world_size()) + ], dim=1) + bz = feat_norm.size(1) + + feat1_norm, feat2_norm = feat_norm[0], feat_norm[1] logits = feat1_norm @ feat2_norm.T pos_logits_mask = torch.eye(bz, dtype=torch.bool) pos_logits = logits[pos_logits_mask].unsqueeze(-1) |