
add nonzero in ops_infer_shape_in_runtime #69027

Merged (8 commits) on Oct 31, 2024
1 change: 1 addition & 0 deletions paddle/phi/api/generator/dist_api_gen.py
@@ -544,6 +544,7 @@
"linear_interp",
"nearest_interp",
"trilinear_interp",
"nonzero",
]
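The list above in dist_api_gen.py appears to collect ops whose output shape can only be determined at runtime; nonzero belongs there because its output shape depends on the values of its input, not just on the input's shape. A minimal sketch of that data dependence (illustrative only, not part of this PR):

import paddle

x = paddle.to_tensor([0.0, 3.0, 0.0, 5.0])
idx = paddle.nonzero(x)          # indices of the nonzero entries
print(idx.shape)                 # [2, 1]: two nonzero values in a rank-1 input

y = paddle.to_tensor([1.0, 3.0, 0.0, 5.0])
print(paddle.nonzero(y).shape)   # [3, 1]: same input shape, different output shape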


146 changes: 144 additions & 2 deletions test/auto_parallel/pir/semi_auto_parallel_simple_net_ep.py
@@ -37,6 +37,26 @@ def __init__(self):
self.expert_mesh_list.append(dist.ProcessMesh([1]))


class Config_shared:
def __init__(self):
self.batch_num = 5
self.batch_size = 4
self.input_size = 32
self.hidden_size = 16
self.shared_hidden_size = 32
self.class_num = 10
self.run_ep = False
self.mesh = dist.ProcessMesh([0, 1], dim_names=["x"])
self.num_devices = 2
self.num_experts = 4
self.expert_mesh_list = []
for i in range(self.num_devices):
for j in range(self.num_experts // self.num_devices):
self.expert_mesh_list.append(
dist.ProcessMesh([i], dim_names=["x"])
)

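With the defaults above (num_devices=2, num_experts=4), expert_mesh_list ends up holding one single-rank mesh per expert: experts 0 and 1 on rank 0, experts 2 and 3 on rank 1. A plain-Python sketch of that mapping (illustrative only, not part of the test):

num_devices, num_experts = 2, 4
expert_devices = [
    i for i in range(num_devices) for _ in range(num_experts // num_devices)
]
print(expert_devices)  # [0, 0, 1, 1]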

class RandomDataset(paddle.io.Dataset):
def __init__(self, images, labels, num_samples, return_dict=False):
self.images = images
@@ -58,10 +78,13 @@ def __len__(self):


class MLP(nn.Layer):
-    def __init__(self, config):
+    def __init__(self, config, is_shared=False):
super().__init__()
self.config = config
-        self.input_size = config.input_size
+        if is_shared:
+            self.input_size = config.input_size * config.num_experts
+        else:
+            self.input_size = config.input_size
self.hidden_size = config.hidden_size
self.class_num = config.class_num
self.down_proj = nn.Linear(
@@ -120,6 +143,86 @@ def forward(self, x):
return out


class DemoSharedLayer(nn.Layer):
def __init__(self, config):
super().__init__()
self.config = config
self.gate = nn.Linear(
config.input_size, config.hidden_size, bias_attr=False
)
self.gate.weight = dist.shard_tensor(
self.gate.weight, config.mesh, [dist.Replicate()]
)
self.shared_gate = nn.Linear(
config.input_size, config.hidden_size, bias_attr=False
)
self.shared_gate.weight = dist.shard_tensor(
self.shared_gate.weight, config.mesh, [dist.Replicate()]
)
self.shared_expert = MLP(config, is_shared=True)
self.experts = nn.LayerList()
for i in range(self.config.num_experts):
self.experts.append(MLP(config))
if config.run_ep:
self.shared_expert.redistribute_expert(
self.config.mesh, [dist.Replicate()]
)
for i, expert in enumerate(self.experts):
expert.redistribute_expert(
config.expert_mesh_list[i], [dist.Replicate()]
)

def forward(self, x):
h = self.gate(x)
y = self.shared_gate(x)

if self.config.run_ep:
local_val_list = dist.auto_parallel.api.moe_sub_mesh_tensors(
h, self.config.mesh, 0, [dist.Shard(0)]
)
else:
local_val_list = paddle.split(
h, num_or_sections=self.config.num_experts, axis=0
)
expert_out_list = []
if self.config.run_ep:
for i in range(self.config.num_devices):
device_input = paddle.split(
local_val_list[i],
num_or_sections=self.config.num_experts
// self.config.num_devices,
axis=0,
)
device_out = []
for j in range(
self.config.num_experts // self.config.num_devices
):
local_val = device_input[j]
device_out.append(
self.experts[
i
* self.config.num_experts
// self.config.num_devices
+ j
](local_val)
)
expert_out_list.append(paddle.stack(device_out, axis=0))
else:
for i, expert in enumerate(self.experts):
local_val = local_val_list[i]
expert_out_list.append(expert(local_val))
z = self.shared_expert(y)
if self.config.run_ep:
out = dist.auto_parallel.api.moe_global_mesh_tensor(
expert_out_list, self.config.mesh, [dist.Shard(0)], 0
)
else:
out = paddle.stack(expert_out_list, axis=0)
out = out.reshape((-1, self.config.class_num))
out = paddle.squeeze(out)
return out + z

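The forward pass above routes the gate output h to the experts in one of two ways: with run_ep it splits h across the mesh via moe_sub_mesh_tensors and gathers the per-device results back with moe_global_mesh_tensor; otherwise it simply splits along the batch axis. A rough single-process sketch of the non-EP branch, assuming batch_size=4, hidden_size=16, class_num=10, and a stand-in Linear for each expert:

import paddle

h = paddle.randn([4, 16])                            # gate output for the whole batch
chunks = paddle.split(h, num_or_sections=4, axis=0)  # one [1, 16] slice per expert
fake_expert = paddle.nn.Linear(16, 10)               # stand-in for self.experts[i]
outs = [fake_expert(c) for c in chunks]              # each [1, 10]
out = paddle.stack(outs, axis=0)                     # [4, 1, 10]
out = paddle.squeeze(out.reshape((-1, 10)))          # [4, 10], same post-processing as above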

class Criterion(nn.Layer):
def __init__(self):
super().__init__()
@@ -176,6 +279,13 @@ def build(self, config):
criterion = Criterion()
return model, dataloader, criterion, optimizer

def build_shared(self, config):
model = DemoSharedLayer(config)
dataloader = self.create_data_loader(config)
optimizer = self.create_optimizer(model)
criterion = Criterion()
return model, dataloader, criterion, optimizer

def train(self, config, model, train_dataloader, criterion, optimizer):
tr_loss = float(0)
global_step = 0
@@ -244,6 +354,37 @@ def run_dy2st(self):

return np.array(loss_list)

def run_shared_ep(self):
self.set_seed(self._seed)
config = Config_shared()
config.run_ep = True
model, train_dataloader, criterion, optimizer = self.build_shared(
config
)

dist_dataloader = dist.shard_dataloader(
train_dataloader, config.mesh, shard_dims="x"
)
loss = self.train(config, model, dist_dataloader, criterion, optimizer)

return loss

def run_shared_replicate(self):
self.set_seed(self._seed)
config = Config_shared()
config.run_ep = False
model, train_dataloader, criterion, optimizer = self.build_shared(
config
)

loss = self.train(config, model, train_dataloader, criterion, optimizer)
return loss

def test_ep_shared_demo_net(self):
ep_loss = self.run_shared_ep()
replicate_loss = self.run_shared_replicate()
np.testing.assert_allclose(ep_loss, replicate_loss, rtol=1e-6)

def test_ep_demo_net(self):
replicate_loss = self.run_replicate()
ep_loss = self.run_ep()
@@ -266,6 +407,7 @@ def test_ep_demo_net(self):

def run_test_case(self):
self.test_ep_demo_net()
self.test_ep_shared_demo_net()


if __name__ == "__main__":