trainer.py
import os
from time import time
from tqdm import tqdm
import torch
from torch.nn.utils.clip_grad import clip_grad_norm_
from recbole.trainer import Trainer
from recbole.utils import set_color, get_gpu_usage


class DDPPretrainTrainer(Trainer):
    def __init__(self, config, model):
        super(DDPPretrainTrainer, self).__init__(config, model)
        self.pretrain_epochs = self.config['pretrain_epochs']
        self.save_step = self.config['save_step']
        self.rank = config['rank']
        self.world_size = config['world_size']
        self.lrank = self._build_distribute(rank=self.rank, world_size=self.world_size)
        self.logger.info(f'Let\'s use {torch.cuda.device_count()} GPUs to train {self.config["model"]} ...')

    def _build_distribute(self, rank, world_size):
        from torch.nn.parallel import DistributedDataParallel
        # credit to @Juyong Jiang
        # 1. initialise the process group with the NCCL backend
        torch.distributed.init_process_group(backend='nccl', rank=rank, world_size=world_size)
        # 2. query this process's distributed rank and bind it to a GPU
        local_rank = torch.distributed.get_rank()
        torch.cuda.set_device(local_rank)
        device_dis = torch.device("cuda", local_rank)
        # 3. move the model to that GPU and wrap it with DistributedDataParallel
        self.model.to(device_dis)
        self.model = DistributedDataParallel(self.model,
                                             device_ids=[local_rank],
                                             output_device=local_rank).module
        return local_rank
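
    # Note (an addition, not in the original file): init_process_group with
    # the 'nccl' backend requires MASTER_ADDR and MASTER_PORT to be present
    # in the environment of every process before this method runs, e.g.
    #
    #   os.environ.setdefault('MASTER_ADDR', 'localhost')
    #   os.environ.setdefault('MASTER_PORT', '12355')
    #
    # with one trainer process per GPU (see the usage sketch at the bottom
    # of this file).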

    def save_pretrained_model(self, epoch, saved_model_file):
        r"""Store the model parameters and the training state.

        Args:
            epoch (int): the current epoch id
            saved_model_file (str): file name for the saved pretrained model
        """
        state = {
            'config': self.config,
            'epoch': epoch,
            'state_dict': self.model.state_dict(),
            'optimizer': self.optimizer.state_dict(),
        }
        torch.save(state, saved_model_file)
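
    # Restoring such a checkpoint is the mirror image of the dict saved above
    # (a sketch, not part of the original trainer):
    #
    #   checkpoint = torch.load(saved_model_file, map_location=self.device)
    #   self.model.load_state_dict(checkpoint['state_dict'])
    #   self.optimizer.load_state_dict(checkpoint['optimizer'])
    #   start_epoch = checkpoint['epoch'] + 1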

    def _trans_dataload(self, interaction):
        from torch.utils.data import DataLoader
        from torch.utils.data.distributed import DistributedSampler

        # re-wrap a single tensor with a PyTorch DataLoader so that a
        # DistributedSampler can shard it across the processes
        def sub_trans(dataset):
            dis_loader = DataLoader(dataset=dataset,
                                    batch_size=dataset.shape[0],
                                    sampler=DistributedSampler(dataset, shuffle=False))
            for data in dis_loader:
                batch_data = data
            return batch_data

        # convert the `interaction` object into a plain python `dict`;
        # for some methods you may need to transfer additional fields in the
        # same way
        data_dict = {}
        for k, v in interaction.interaction.items():
            data_dict[k] = sub_trans(v)
        return data_dict
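
    # Illustration (hypothetical field names): an interaction holding
    # {'item_id': Tensor[B], 'item_seq': Tensor[B, L]} comes back as a plain
    # dict of tensors in which each rank keeps only its ~B/world_size shard,
    # fetched in one pass because batch_size above equals the tensor's length.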

    def _train_epoch(self, train_data, epoch_idx, loss_func=None, show_progress=False):
        self.model.train()
        loss_func = loss_func or self.model.calculate_loss
        total_loss = None
        iter_data = (
            tqdm(
                train_data,
                total=len(train_data),
                ncols=100,
                desc=set_color(f"Train {epoch_idx:>5}", 'pink'),
            ) if show_progress else train_data
        )
        for batch_idx, interaction in enumerate(iter_data):
            interaction = interaction.to(self.device)
            interaction = self._trans_dataload(interaction)
            self.optimizer.zero_grad()
            losses = loss_func(interaction)
            if isinstance(losses, tuple):
                # the model returned several loss terms: optimise their sum,
                # but accumulate each term separately for logging
                loss = sum(losses)
                loss_tuple = tuple(per_loss.item() for per_loss in losses)
                total_loss = loss_tuple if total_loss is None else tuple(map(sum, zip(total_loss, loss_tuple)))
            else:
                loss = losses
                total_loss = losses.item() if total_loss is None else total_loss + losses.item()
            self._check_nan(loss)
            loss.backward()
            if self.clip_grad_norm:
                clip_grad_norm_(self.model.parameters(), **self.clip_grad_norm)
            self.optimizer.step()
            if self.gpu_available and show_progress:
                iter_data.set_postfix_str(set_color('GPU RAM: ' + get_gpu_usage(self.device), 'yellow'))
        return total_loss

    def pretrain(self, train_data, verbose=True, show_progress=False):
        for epoch_idx in range(self.start_epoch, self.pretrain_epochs):
            # train
            training_start_time = time()
            train_loss = self._train_epoch(train_data, epoch_idx, show_progress=show_progress)
            self.train_loss_dict[epoch_idx] = sum(train_loss) if isinstance(train_loss, tuple) else train_loss
            training_end_time = time()
            train_loss_output = \
                self._generate_train_loss_output(epoch_idx, training_start_time, training_end_time, train_loss)
            if verbose:
                self.logger.info(train_loss_output)
            self._add_train_loss_to_tensorboard(epoch_idx, train_loss)

            # periodically save a checkpoint, on the rank-0 process only
            if (epoch_idx + 1) % self.save_step == 0 and self.lrank == 0:
                saved_model_file = os.path.join(
                    self.checkpoint_dir,
                    '{}-{}-{}.pth'.format(self.config['model'], self.config['dataset'], str(epoch_idx + 1))
                )
                self.save_pretrained_model(epoch_idx, saved_model_file)
                update_output = set_color('Saving current', 'blue') + ': %s' % saved_model_file
                if verbose:
                    self.logger.info(update_output)

        return self.best_valid_score, self.best_valid_result
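

# Usage sketch (an addition, not this repo's actual entry point): a minimal
# way to drive this trainer, assuming a hypothetical build_model_and_data()
# helper that returns a RecBole model plus the pretraining dataloader.
#
#   import torch.multiprocessing as mp
#
#   def worker(rank, world_size, config):
#       config['rank'], config['world_size'] = rank, world_size
#       model, train_data = build_model_and_data(config)  # hypothetical helper
#       trainer = DDPPretrainTrainer(config, model)
#       trainer.pretrain(train_data, show_progress=(rank == 0))
#
#   if __name__ == '__main__':
#       config = ...  # build a RecBole Config / dict with the usual settings
#       world_size = torch.cuda.device_count()
#       mp.spawn(worker, args=(world_size, config), nprocs=world_size)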