-
Notifications
You must be signed in to change notification settings - Fork 6
/
tutorial_tensordb_cv_mnist_master.py
executable file
·112 lines (93 loc) · 3.8 KB
/
tutorial_tensordb_cv_mnist_master.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
#! /usr/bin/python
# -*- coding: utf8 -*-
import tensorlayer as tl
from tensorlayer.db import TensorDB
import shutil
from eAE.eAE import eAE
def create_mnist_dataset(db):
    """Ensure the MNIST dataset is stored in TensorDB, uploading it if absent.

    Parameters
    ----------
    db : TensorDB
        Database handle used to look up and persist the dataset.
    """
    data, f_id = db.find_one_params(args={'type': 'mnist_dataset'})
    # If cannot find MNIST dataset in TensorDB
    if not data:
        # Download MNIST locally, then upload the six arrays to TensorDB
        # under a single 'mnist_dataset' record.
        X_train, y_train, X_val, y_val, X_test, y_test = \
            tl.files.load_mnist_dataset(shape=(-1, 28, 28, 1))
        db.save_params(
            [X_train, y_train, X_val, y_val, X_test, y_test],
            args={'type': 'mnist_dataset'}
        )
        # Clean up the local download; ignore_errors prevents a crash when
        # the directory is missing or was already removed.
        shutil.rmtree('./data/mnist', ignore_errors=True)
def create_jobs(db, job_name, models_dict):
# job = db.find_one_job(args={'job_name': job_name})
# if not job:
# job_idx = 1
# for model, params_dict in models_dict.iteritems():
# n_jobs = len(params_dict.itervalues().next())
# for j in range(n_jobs):
# job_dict = {'model': model, 'job_name': job_name, 'job_id': job_idx}
# for k, v in params_dict.iteritems():
# job_dict.update({k: v[j]})
# db.save_job(args=job_dict)
# job_idx += 1
# else:
# print("You have already submitted this job.")
for model, params_dict in models_dict.iteritems():
n_jobs = len(params_dict.itervalues().next())
for j in range(n_jobs):
job_dict = {'model': model}
for k, v in params_dict.iteritems():
job_dict.update({k: v[j]})
db.save_job(args=job_dict)
def start_workers(db):
    # Dispatch the stored jobs to worker nodes.
    #
    # NOTE(review): `worker` (called in submit_job below) is neither defined
    # nor imported in this file — presumably supplied by a companion worker
    # script; confirm before running, otherwise this raises NameError.
    job_ids = []
    # Collect the string form of every stored job's MongoDB ObjectId.
    for job in db.get_all_jobs():
        job_ids.append(str(job['_id']))
    # Check how many available workers
    # NOTE(review): the worker list is hard-coded, and only jobs 0, 2 and 4
    # are submitted below — this raises IndexError when fewer than 5 jobs
    # exist and silently skips jobs 1 and 3.
    workers = ['node01', 'node02', 'node03', 'node04', 'node05']
    def submit_job(node_name, job_id):
        # Log the assignment, then run the job in-process (synchronously).
        print('Assign job: {} to {}'.format(job_id, node_name))
        worker(job_id)
    # Submit jobs to all workers
    submit_job(workers[0], job_ids[0])
    submit_job(workers[2], job_ids[2])
    submit_job(workers[4], job_ids[4])
def main():
    """Master workflow: stage the dataset, register jobs, submit to eAE."""
    # Open the connection to the MongoDB server backing TensorDB.
    # Note: make sure your MongoDB is reachable before changing this line
    db = TensorDB(ip='IP_ADDRESS_OR_YOUR_MONGODB', port=27017, db_name='DATABASE_NAME', user_name=None, password=None, studyID='ANY_ID (e.g., mnist)')

    # Make sure the MNIST dataset exists in the database.
    create_mnist_dataset(db=db)

    # Cross-validation grid: the j-th job of each model takes the j-th
    # entry of every hyper-parameter list.
    search_space = {
        "cnn": {
            "lr": [0.01, 0.001, 0.001],
            "n_cnn_layers": [1, 2, 2],
            "n_filters": [64, 128, 256],
            "n_epochs": [10, 10, 10],
        },
        "mlp": {
            "lr": [0.05, 0.0001],
            "n_layers": [1, 2],
            "n_epochs": [10, 10],
        },
    }
    create_jobs(db=db, job_name="cv_mnist", models_dict=search_space)

    # Set up the connection to the eAE interface.
    eae_ip = "IP_ADDRESS_OF_EAE (e.g., interfaceeae.doc.ic.ac.uk)"
    eae_port = 443
    eae = eAE(eae_ip, eae_port)

    # Abort unless the interface answers the liveness probe with HTTP 200.
    is_alive = eae.is_eae_alive()
    if is_alive != 200:
        raise Exception("!!!")

    # One job id per line forms the parameter set for the submission.
    all_jobs = db.get_all_jobs()
    parameters_set = "\n".join(str(j['_id']) for j in all_jobs)

    # We submit a dummy job
    cluster = "NAME_OF_CLUSTER (e.g., gpu_dev)"
    computation_type = "COMPUTATION_TYPE (e.g., GPU)"
    main_file = "ABSOLUTE_PATH_TO_MAIN_FILE"
    data_files = ['ABSOLUTE_PATH_TO_DIRECTORY_OR_FILES_TO_BE_COPIED_TO_RUN_THE_MAIN_FILE']
    host_ip = "IP_ADDRESS_OF_HOST_MACHINE_RUNNING_THIS_SCRIPT"
    ssh_port = "SSH_PORT_OF_HOST_MACHINE"
    job = eae.submit_jobs(parameters_set, cluster, computation_type, main_file, data_files, host_ip, ssh_port)
    print(job)
# Run the master workflow only when executed as a script, not on import.
if __name__ == '__main__':
    main()