path: root/libs/criteria.py
author    Jordan Gong <jordan.gong@protonmail.com> 2022-08-13 23:38:43 +0800
committer Jordan Gong <jordan.gong@protonmail.com> 2022-08-13 23:38:43 +0800
commit    957a2a46e7725184776c3c72860e8215164cc4ef (patch)
tree      43e098595db4ee332bca5f6caecfbd02369debbe /libs/criteria.py
parent    1b8f01ce9706905c36c6f11ed9deac8548ad7341 (diff)
Implement distributed data parallel via torch elastic launcher
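A note on the launcher: the torch elastic launcher referenced in the commit message is typically invoked as torchrun (or python -m torch.distributed.run), which exports RANK and WORLD_SIZE into each process's environment. The sketch below shows how a training entry point might bring up both the default process group and the RPC workers that this diff's rpc.rpc_sync(f"worker{i}", ...) calls rely on; the script name, backend choice, and process counts are assumptions, not taken from this repository.

    # Hypothetical launch command (script name and counts are placeholders):
    #   torchrun --nnodes=1 --nproc_per_node=4 train.py
    import os
    import torch.distributed as dist
    import torch.distributed.rpc as rpc

    def init_distributed() -> None:
        # torchrun/torch elastic sets RANK and WORLD_SIZE for each process.
        rank = int(os.environ["RANK"])
        world_size = int(os.environ["WORLD_SIZE"])
        dist.init_process_group("nccl", rank=rank, world_size=world_size)
        # The diff below addresses peers as f"worker{i}", so each process
        # must register itself under the matching name here.
        rpc.init_rpc(f"worker{rank}", rank=rank, world_size=world_size)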
Diffstat (limited to 'libs/criteria.py')
-rw-r--r--  libs/criteria.py  19
1 file changed, 16 insertions(+), 3 deletions(-)
diff --git a/libs/criteria.py b/libs/criteria.py
index baa36ce..7d367c1 100644
--- a/libs/criteria.py
+++ b/libs/criteria.py
@@ -1,4 +1,6 @@
 import torch
+import torch.distributed as dist
+import torch.distributed.rpc as rpc
 from torch import nn, Tensor
 from torch.nn import functional as F
 
@@ -8,10 +10,21 @@ class InfoNCELoss(nn.Module):
         super().__init__()
         self.temp = temp
 
+    @staticmethod
+    def _norm_and_stack(feat: Tensor) -> Tensor:
+        local_feat_norm = F.normalize(feat)
+        local_feat_norm_stack = torch.stack(local_feat_norm.chunk(2))
+
+        return local_feat_norm_stack
+
     def forward(self, feature: Tensor) -> tuple[Tensor, Tensor]:
-        bz = feature.size(0) // 2
-        feat_norm = F.normalize(feature)
-        feat1_norm, feat2_norm = feat_norm.split(bz)
+        feat_norm = torch.cat([
+            rpc.rpc_sync(f"worker{i}", self._norm_and_stack, (feature,))
+            for i in range(dist.get_world_size())
+        ], dim=1)
+        bz = feat_norm.size(1)
+
+        feat1_norm, feat2_norm = feat_norm[0], feat_norm[1]
         logits = feat1_norm @ feat2_norm.T
         pos_logits_mask = torch.eye(bz, dtype=torch.bool)
         pos_logits = logits[pos_logits_mask].unsqueeze(-1)
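For context, the reworked forward expects feature to hold two augmented views concatenated along the batch dimension, and it assumes an RPC group whose members are named worker0 through worker{N-1} is already up. Below is a hypothetical single-process sanity check (world size 1) illustrating the shapes involved; the temperature value, ports, and tensor sizes are placeholders, not values from this repository.

    import os
    import torch
    import torch.distributed as dist
    import torch.distributed.rpc as rpc
    from libs.criteria import InfoNCELoss

    os.environ.setdefault("MASTER_ADDR", "localhost")
    os.environ.setdefault("MASTER_PORT", "29500")
    # forward calls dist.get_world_size(), so a default process group is needed.
    dist.init_process_group("gloo", rank=0, world_size=1)
    # Use a separate rendezvous port for RPC to avoid clashing with the
    # process group's store; the name must match the f"worker{i}" targets.
    rpc.init_rpc(
        "worker0", rank=0, world_size=1,
        rpc_backend_options=rpc.TensorPipeRpcBackendOptions(
            init_method="tcp://localhost:29501",
        ),
    )

    criterion = InfoNCELoss(temp=0.1)   # temperature value is illustrative
    feature = torch.randn(2 * 8, 128)   # two views of a batch of 8, dim 128
    # Each worker's _norm_and_stack returns shape (2, bz, dim); after the
    # rpc_sync gather and torch.cat along dim=1, feat_norm has shape
    # (2, world_size * bz, dim), so the two views stay row-aligned.
    outputs = criterion(feature)

    rpc.shutdown()
    dist.destroy_process_group()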