Commit

add Prob sampler (#129)
* add version info

* add weighted sampler: support 4 prob styles

* add ut for sampler

* remove unused print

* remove unused import

* import reused functions from tf version

* add dependencies from tf version

* modify requirements

* run testmodel

* update url

* recover ut

* depend on release version

* fix uts in test_fitting_net

* merge prob_sys_size with prob_sys_size;0:nsys:1.0

---------

Co-authored-by: Duo <50307526+iProzd@users.noreply.github.com>
CaRoLZhangxy and iProzd authored Oct 31, 2023
1 parent 0db0fdd commit 6c37765
Showing 8 changed files with 147 additions and 27 deletions.
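The "4 prob styles" in the commit message correspond to the four probability settings exercised in tests/test_sampler.py below. A hedged sketch of how each would appear in the training config (fragments only; values are placeholders):

    # the four supported probability styles, as config fragments
    {"auto_prob": "prob_uniform"}                    # equal probability per system
    {"auto_prob": "prob_sys_size"}                   # probability proportional to system size (batches per system);
                                                     # alias for "prob_sys_size;0:nsys:1.0"
    {"auto_prob": "prob_sys_size;0:1:0.2;1:3:0.8"}   # size-proportional within system ranges A:B, each weighted by p
    {"sys_probs": [0.1, 0.4, 0.5]}                   # explicit per-system probabilities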
7 changes: 6 additions & 1 deletion deepmd_pt/entrypoints/main.py
@@ -160,9 +160,14 @@ def freeze(FLAGS):
        # TODO: _extra_files
    })


+# avoid logger conflicts with the TF version
+def clean_loggers():
+    logger = logging.getLogger()
+    while logger.hasHandlers():
+        logger.removeHandler(logger.handlers[0])
@record
def main(args=None):
+    clean_loggers()
    logging.basicConfig(
        level=logging.WARNING if env.LOCAL_RANK else logging.INFO,
        format=f"%(asctime)-15s {os.environ.get('RANK') or ''} [%(filename)s:%(lineno)d] %(levelname)s %(message)s"
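For context on the change above: logging.basicConfig is a no-op once the root logger already has handlers, so a library that configures logging at import time would otherwise swallow this call. A minimal self-contained sketch of the pattern (not repo code):

    import logging

    def clean_loggers():
        # strip every handler from the root logger so the next
        # logging.basicConfig call takes effect
        logger = logging.getLogger()
        while logger.hasHandlers():
            logger.removeHandler(logger.handlers[0])

    logging.basicConfig(level=logging.INFO)  # e.g. done by an imported library
    clean_loggers()
    logging.basicConfig(level=logging.INFO, format="%(levelname)s %(message)s")
    logging.info("formatted by the second basicConfig, not the first")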
5 changes: 2 additions & 3 deletions deepmd_pt/model/task/ener.py
@@ -36,7 +36,7 @@ def __init__(self, ntypes, embedding_width, neuron, bias_atom_e, resnet_dt=True,

        filter_layers = []
        for type_i in range(self.ntypes):
-            bias_type = 0.0 if self.use_tebd else bias_atom_e[type_i]
+            bias_type = 0.0
            one = ResidualDeep(type_i, embedding_width, neuron, bias_type, resnet_dt=resnet_dt)
            filter_layers.append(one)
        self.filter_layers = torch.nn.ModuleList(filter_layers)
@@ -69,8 +69,7 @@ def forward(self,
        for type_i, filter_layer in enumerate(self.filter_layers):
            mask = atype == type_i
            atom_energy = filter_layer(inputs)
-            if not env.ENERGY_BIAS_TRAINABLE:
-                atom_energy = atom_energy + self.bias_atom_e[type_i]
+            atom_energy = atom_energy + self.bias_atom_e[type_i]
            atom_energy = atom_energy * mask.unsqueeze(-1)
            outs = outs + atom_energy  # Shape is [nframes, natoms[0], 1]
        return outs.to(env.GLOBAL_PT_FLOAT_PRECISION), None
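The per-type bias is now added unconditionally before the type mask zeroes out the other atoms. A standalone sketch of that masked accumulation pattern (shapes and names are illustrative, not from the repo):

    import torch

    nframes, nloc, ntypes = 2, 5, 2
    atype = torch.tensor([[0, 0, 1, 1, 1]]).expand(nframes, nloc)
    per_atom = torch.randn(nframes, nloc, 1)           # stand-in for filter_layer(inputs)
    bias_atom_e = torch.tensor([1.0, -1.0])            # per-type energy bias

    outs = torch.zeros(nframes, nloc, 1)
    for type_i in range(ntypes):
        mask = atype == type_i                         # [nframes, nloc]
        atom_energy = per_atom + bias_atom_e[type_i]   # bias added unconditionally
        outs = outs + atom_energy * mask.unsqueeze(-1) # keep only atoms of this type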
20 changes: 17 additions & 3 deletions deepmd_pt/train/training.py
@@ -14,7 +14,7 @@
from deepmd_pt.loss import EnergyStdLoss, DenoiseLoss
from deepmd_pt.model.model import get_model
from deepmd_pt.train.wrapper import ModelWrapper
-from deepmd_pt.utils.dataloader import BufferedIterator
+from deepmd_pt.utils.dataloader import BufferedIterator, get_weighted_sampler
from pathlib import Path
from tqdm import tqdm
from tqdm.contrib.logging import logging_redirect_tqdm
@@ -99,9 +99,23 @@ def get_opt_param(params):
    return opt_type, opt_param

def get_data_loader(_training_data, _validation_data, _training_params):
+    if 'auto_prob' in _training_params['training_data']:
+        train_sampler = get_weighted_sampler(_training_data, _training_params['training_data']['auto_prob'])
+    elif 'sys_probs' in _training_params['training_data']:
+        train_sampler = get_weighted_sampler(_training_data, _training_params['training_data']['sys_probs'], sys_prob=True)
+    else:
+        train_sampler = get_weighted_sampler(_training_data, 'prob_sys_size')
+
+    if 'auto_prob' in _training_params['validation_data']:
+        valid_sampler = get_weighted_sampler(_validation_data, _training_params['validation_data']['auto_prob'])
+    elif 'sys_probs' in _training_params['validation_data']:
+        valid_sampler = get_weighted_sampler(_validation_data, _training_params['validation_data']['sys_probs'], sys_prob=True)
+    else:
+        valid_sampler = get_weighted_sampler(_validation_data, 'prob_sys_size')
    training_dataloader = DataLoader(
        _training_data,
-        sampler=torch.utils.data.RandomSampler(_training_data),
+        sampler=train_sampler,
        batch_size=None,
        num_workers=8,  # setting this to 0 changes the iterator's behavior; keep it >= 1
        drop_last=False,
@@ -110,7 +124,7 @@ def get_data_loader(_training_data, _validation_data, _training_params):
    training_data_buffered = BufferedIterator(iter(training_dataloader))
    validation_dataloader = DataLoader(
        _validation_data,
-        sampler=torch.utils.data.RandomSampler(_validation_data),
+        sampler=valid_sampler,
        batch_size=None,
        num_workers=1,
        drop_last=False,
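Which sampler is built depends only on whether auto_prob or sys_probs appears under training_data / validation_data in the input config. A hedged sketch of the relevant fragment, written as a Python dict (systems and values are placeholders):

    training_params = {
        'training_data': {
            'systems': ['data/sys_A', 'data/sys_B'],
            'batch_size': 'auto',
            'auto_prob': 'prob_sys_size;0:1:0.2;1:2:0.8',  # style string -> get_weighted_sampler(..., auto_prob)
        },
        'validation_data': {
            'systems': ['data/sys_C'],
            'batch_size': 1,
            'sys_probs': [1.0],  # explicit list -> get_weighted_sampler(..., sys_probs, sys_prob=True)
        },
    }
    # With neither key present, both loaders fall back to the 'prob_sys_size' style.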
37 changes: 30 additions & 7 deletions deepmd_pt/utils/dataloader.py
@@ -2,6 +2,7 @@
import os
import queue
import time
+import numpy as np
from threading import Thread
from typing import Callable, Dict, List, Tuple, Type, Union
from multiprocessing.dummy import Pool
@@ -11,11 +12,15 @@
import torch.distributed as dist
from deepmd_pt.utils import env
from deepmd_pt.utils.dataset import DeepmdDataSetForLoader
-from torch.utils.data import DataLoader, Dataset
+from torch.utils.data import DataLoader, Dataset, WeightedRandomSampler
from torch.utils.data.distributed import DistributedSampler
from tqdm import tqdm
import torch.multiprocessing

+from deepmd.utils.data_system import (
+    prob_sys_size_ext,
+    process_sys_probs
+)
torch.multiprocessing.set_sharing_strategy("file_system")


@@ -75,6 +80,7 @@ def construct_dataset(system):

        self.sampler_list: List[DistributedSampler] = []
        self.index = []
+        self.total_batch = 0

        self.dataloaders = []
        for system in self.systems:
@@ -105,9 +111,8 @@ def construct_dataset(system):
                shuffle=(not dist.is_initialized()) and shuffle,
            )
            self.dataloaders.append(system_dataloader)
-            for _ in range(len(system_dataloader)):
-                self.index.append(len(self.dataloaders) - 1)
+            self.index.append(len(system_dataloader))
+            self.total_batch += len(system_dataloader)
        # Initialize iterator instances for DataLoader
        self.iters = []
        for item in self.dataloaders:
@@ -124,11 +129,11 @@ def set_noise(self, noise_settings):
            system.set_noise(noise_settings)

    def __len__(self):
-        return len(self.index)
+        return len(self.dataloaders)

    def __getitem__(self, idx):
        # logging.warning(str(torch.distributed.get_rank())+" idx: "+str(idx)+" index: "+str(self.index[idx]))
-        return next(self.iters[self.index[idx]])
+        return next(self.iters[idx])


_sentinel = object()
@@ -257,3 +262,21 @@ def collate_batch(batch):
        else:
            result[key] = collate_tensor_fn([d[key] for d in batch])
    return result

+def get_weighted_sampler(training_data, prob_style, sys_prob=False):
+    if not sys_prob:
+        if prob_style == "prob_uniform":
+            prob_v = 1.0 / float(len(training_data))
+            probs = [prob_v for ii in range(len(training_data))]
+        else:  # prob_sys_size;A:B:p1;C:D:p2, or plain prob_sys_size (an alias for prob_sys_size;0:nsys:1.0)
+            if prob_style == "prob_sys_size":
+                style = "prob_sys_size;0:{}:1.0".format(len(training_data))
+            else:
+                style = prob_style
+            probs = prob_sys_size_ext(style, len(training_data), training_data.index)
+    else:
+        probs = process_sys_probs(prob_style, training_data.index)
+    logging.info("Generated weighted sampler with prob array: " + str(probs))
+    # training_data.total_batch is the size of one epoch; increase it to rebuild the iterators less often
+    sampler = WeightedRandomSampler(probs, training_data.total_batch, replacement=True)
+    return sampler
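A minimal sketch of how the new sampler plugs into a DataLoader (assuming dataset is a DpLoaderSet as built in tests/test_sampler.py below; len(dataset) is the number of systems, dataset.index the batches per system, dataset.total_batch the epoch size):

    from torch.utils.data import DataLoader

    sampler = get_weighted_sampler(dataset, 'prob_sys_size;0:1:0.2;1:3:0.8')
    loader = DataLoader(dataset, sampler=sampler, batch_size=None)
    for step, batch in enumerate(loader):
        pass  # each draw picks a system index with the configured probability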
1 change: 1 addition & 0 deletions pyproject.toml
@@ -19,6 +19,7 @@ dependencies = [
    'tqdm',
    'h5py',
    'wandb',
+    'deepmd-kit >= 2.2.7',
]
requires-python = ">=3.8"
readme = "README.md"
4 changes: 2 additions & 2 deletions tests/requirements.txt
@@ -1,4 +1,4 @@
-tensorflow==2.10.0
-deepmd-kit==2.1.5
+tensorflow>=2.14.0
+deepmd-kit>=2.2.7
coverage
pytest
23 changes: 12 additions & 11 deletions tests/test_fitting_net.py
@@ -31,12 +31,13 @@ def gen_key(type_id, layer_id, w_or_b):
    return (type_id, layer_id, w_or_b)


-def base_fitting_net(dp_fn, embedding, natoms):
+def base_fitting_net(dp_fn, embedding, natoms, atype):
    g = tf.Graph()
    with g.as_default():
        t_embedding = tf.placeholder(GLOBAL_NP_FLOAT_PRECISION, [None, None])
        t_natoms = tf.placeholder(tf.int32, [None])
-        t_energy = dp_fn.build(t_embedding, t_natoms, {})
+        t_atype = tf.placeholder(tf.int32, [None, None])
+        t_energy = dp_fn.build(t_embedding, t_natoms, {'atype': t_atype})
        init_op = tf.global_variables_initializer()
        t_vars = {}
        for var in tf.global_variables():
@@ -55,7 +56,8 @@ def base_fitting_net(dp_fn, embedding, natoms):
        sess.run(init_op)
        energy, values = sess.run([t_energy, t_vars], feed_dict={
            t_embedding: embedding,
-            t_natoms: natoms
+            t_natoms: natoms,
+            t_atype: atype,
        })
    return energy, values

@@ -69,14 +71,18 @@ def setUp(self):
        self.embedding = np.random.uniform(size=[4, nloc * self.embedding_width])
        self.ntypes = self.natoms.size - 2
        self.n_neuron = [32, 32, 32]
+        self.atype = np.zeros([4, nloc], dtype=np.int32)
+        cnt = 0
+        for i in range(self.ntypes):
+            self.atype[:, cnt:cnt + self.natoms[i + 2]] = i
+            cnt += self.natoms[i + 2]

        fake_d = FakeDescriptor(2, 30)
        self.dp_fn = EnerFitting(fake_d, self.n_neuron)
-        self.dp_fn.bias_atom_e = np.random.uniform(size=[self.ntypes])
+        self.dp_fn.bias_atom_e = [1e8, 0]

    def test_consistency(self):
-        dp_energy, values = base_fitting_net(self.dp_fn, self.embedding, self.natoms)
+        dp_energy, values = base_fitting_net(self.dp_fn, self.embedding, self.natoms, self.atype)
        my_fn = EnergyFittingNet(self.ntypes, self.embedding_width, self.n_neuron, self.dp_fn.bias_atom_e, use_tebd=False)
        for name, param in my_fn.named_parameters():
            matched = re.match('filter_layers\.(\d).deep_layers\.(\d)\.([a-z]+)', name)
@@ -94,12 +100,7 @@ def test_consistency(self):
            param.data.copy_(torch.from_numpy(var))
        embedding = torch.from_numpy(self.embedding)
        embedding = embedding.view(4, -1, self.embedding_width)
-        natoms = torch.from_numpy(self.natoms)
-        atype = torch.zeros(1, natoms[0], dtype=torch.long)
-        cnt = 0
-        for i in range(natoms.shape[0] - 2):
-            atype[:, cnt:cnt + natoms[i + 2]] = i
-            cnt += natoms[i + 2]
+        atype = torch.from_numpy(self.atype)
        my_energy, _ = my_fn(embedding, atype)
        my_energy = my_energy.detach()
        self.assertTrue(np.allclose(dp_energy, my_energy.numpy().reshape([-1])))
77 changes: 77 additions & 0 deletions tests/test_sampler.py
@@ -0,0 +1,77 @@
import numpy as np
import os
import unittest
import json

from deepmd.utils.data_system import DeepmdDataSystem
from deepmd.utils import random as tf_random
from deepmd.common import expand_sys_str

from deepmd_pt.utils.dataloader import DpLoaderSet, get_weighted_sampler
from deepmd_pt.utils import env

CUR_DIR = os.path.dirname(__file__)


class TestSampler(unittest.TestCase):

    def setUp(self):
        with open(env.TEST_CONFIG, 'r') as fin:
            content = fin.read()
        config = json.loads(content)
        model_config = config['model']
        self.rcut = model_config['descriptor']['rcut']
        self.rcut_smth = model_config['descriptor']['rcut_smth']
        self.sel = model_config['descriptor']['sel']
        self.batch_size = config['training']['training_data']['batch_size']
        self.systems = config['training']['validation_data']['systems']
        if isinstance(self.systems, str):
            self.systems = expand_sys_str(self.systems)
        self.my_dataset = DpLoaderSet(
            self.systems, self.batch_size,
            model_params={
                'descriptor': {
                    'sel': self.sel,
                    'rcut': self.rcut,
                },
                'type_map': model_config['type_map']
            }, seed=10)

        tf_random.seed(10)
        self.dp_dataset = DeepmdDataSystem(self.systems, self.batch_size, 1, self.rcut)

    def test_auto_prob_uniform(self):
        auto_prob_style = 'prob_uniform'
        sampler = get_weighted_sampler(self.my_dataset, prob_style=auto_prob_style)
        my_probs = np.array(sampler.weights)
        self.dp_dataset.set_sys_probs(auto_prob_style=auto_prob_style)
        dp_probs = np.array(self.dp_dataset.sys_probs)
        self.assertTrue(np.allclose(my_probs, dp_probs))

    def test_auto_prob_sys_size(self):
        auto_prob_style = 'prob_sys_size'
        sampler = get_weighted_sampler(self.my_dataset, prob_style=auto_prob_style)
        my_probs = np.array(sampler.weights)
        self.dp_dataset.set_sys_probs(auto_prob_style=auto_prob_style)
        dp_probs = np.array(self.dp_dataset.sys_probs)
        self.assertTrue(np.allclose(my_probs, dp_probs))

    def test_auto_prob_sys_size_ext(self):
        auto_prob_style = 'prob_sys_size;0:1:0.2;1:3:0.8'
        sampler = get_weighted_sampler(self.my_dataset, prob_style=auto_prob_style)
        my_probs = np.array(sampler.weights)
        self.dp_dataset.set_sys_probs(auto_prob_style=auto_prob_style)
        dp_probs = np.array(self.dp_dataset.sys_probs)
        self.assertTrue(np.allclose(my_probs, dp_probs))

    def test_sys_probs(self):
        sys_probs = [0.1, 0.4, 0.5]
        sampler = get_weighted_sampler(self.my_dataset, prob_style=sys_probs, sys_prob=True)
        my_probs = np.array(sampler.weights)
        self.dp_dataset.set_sys_probs(sys_probs=sys_probs)
        dp_probs = np.array(self.dp_dataset.sys_probs)
        self.assertTrue(np.allclose(my_probs, dp_probs))


if __name__ == '__main__':
    unittest.main()