
dataloader issue when using parallel #1187

Open
linxy-1992 opened this issue Aug 20, 2024 · 3 comments

Comments

@linxy-1992

code:
ministdsta<-minist_dataset(xarray2,label=dfminist2[,1])
ministdlta<-dataloader(ministdsta,batch_size=200,shuffle=T,
                       num_workers = 2,pin_memory=T,
                       worker_globals=list(xarray2,dfminist2,ministdsta))
result:
Warning messages:
1: Datasets used with parallel dataloader (num_workers > 0) shouldn't have fields containing tensors as they can't be correctly passed to the wroker subprocesses.
  • A field named 'y' exists.
2: Datasets used with parallel dataloader (num_workers > 0) shouldn't have fields containing tensors as they can't be correctly passed to the wroker subprocesses.
  • A field named 'x' exists.
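
For context, the minist_dataset definition used above is not shown in this issue; the warning refers to a dataset that creates tensors in initialize() and stores them as fields, roughly like the following hypothetical sketch (not the exact code from this report):

library(torch)

bad_dataset <- dataset(
  initialize = function(xarray, label) {
    # Storing tensors in fields is the pattern the warning complains about:
    # tensor-valued fields cannot be passed to the worker subprocesses.
    self$x <- torch_tensor(xarray)
    self$y <- torch_tensor(label, dtype = torch_long())
  },
  .getitem = function(index) {
    list(x = self$x[index, ], y = self$y[index])
  },
  .length = function() {
    self$y$shape[1]
  }
)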

And then, when running it in a coro::loop, the following error occurs.
code:
torch_manual_seed(1)

for(epoch in 1:100) {
  coro::loop(for(b in ministdlta) {
    b1<-b[[1]]
    b2<-b[[2]]
  })
  print(epoch)
}

result:
Error in self$.pop_task():
! Error when getting dataset item.
Caused by error:
! in callr subprocess.
Caused by error:
! object '.socket_con' not found
Run rlang::last_trace() to see where the error occurred.

rlang::last_trace()
<error/runtime_error>
Error in self$.pop_task():
! Error when getting dataset item.
Caused by error:
! in callr subprocess.
Caused by error:
! object '.socket_con' not found


Backtrace:
  1. +-coro::loop(...)
  2. | \-rlang::eval_bare(loop, env)
  3. \-coro (local) <fn>()
  4. +-coro::is_exhausted(elt <<- iterator())
  5. +-elt <<- iterator()
  6. | \-rlang::env_poke(env, lhs, value, inherit = TRUE, create = FALSE)
  7. \-torch (local) iterator()
  8. \-torch::dataloader_next(iter, coro::exhausted())
  9.   \-iter$.next()
 10.     \-self$.next_data()
 11.       \-self$.pop_task()

Run rlang::last_trace(drop = FALSE) to see 1 hidden frame.

@linxy-1992
Author

linxy-1992 commented Aug 22, 2024

I'm sorry, but I can now see why the parallel dataloader I wrote reported an error.
Here's a working example, in which I explain how to write the parallel dataloader correctly.

library(torch)
dfminist2<-read.csv("mnist_train.csv",header=F)
dfminist2[,1]<-dfminist2[,1]+1
dfminist2<-as.matrix(dfminist2)

minist_dataset<-dataset(
initialize=function(xarray,label){
#It is important to note that the data passed into initialize needs to be an array, matrix, or vector.
#You must not convert the data to tensors in the initialize section;
#if you do, you will get an unknown error when using the parallel dataloader.
self$x<-xarray
self$y<-label
},.getitem=function(index){
#Extraction and transformation of data in the getitem section.
x_re<-torch_tensor(self$x[index,])$reshape(c(1,28,28))
y_re<-torch_tensor(self$y[index],dtype=torch_long())
list(x=x_re,y=y_re)
},.length=function(){length(self$y)})

ministdsta<-minist_dataset(dfminist2[,-1],dfminist2[,1])
ministdlta<-dataloader(ministdsta,batch_size=100,shuffle=T,
num_workers =2,#The number of worker processes is set here,
#but if the data conversion work is simple, as in my example,
#there is no need for multiple workers; in that case they will be much slower than a single process.

#If you use other data, functions, or packages inside your dataset section,
#you need to broadcast these objects by name to the R worker processes,
#just as you would with the parallel package.
#worker_init_fn = function(worker_id){},  #optional function run once in each worker
#worker_globals = c("mydataname1","mydataname2","myfun1"),
#worker_packages = c("torchvision","otherpackagename")
)
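#For example, suppose .getitem also called a small helper function and used a lookup table defined
#in the main session (the names below are illustrative, not from this thread). worker_globals copies
#the named objects into each worker's global environment and worker_packages loads packages there:
my_scale<-function(m){m/255}   #hypothetical helper that .getitem would call
my_lut<-1:10                   #hypothetical object that .getitem would use
example_dl<-dataloader(ministdsta,batch_size=100,shuffle=T,
                       num_workers=2,
                       worker_globals=c("my_scale","my_lut"),
                       worker_packages=c("torchvision"))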

#During the training process, note that the dataset, dataloader and optimizer should be created inside the epoch loop.
for(epoch in 1:100){

#Execute dataset , dataloader and optimizer here
ministdsta<-minist_dataset(dfminist2[,-1],dfminist2[,1])
ministdlta<-dataloader(ministdsta,batch_size=100,shuffle=T,num_workers=6)
optimizer<-optim_adamw(model$parameters,lr=0.001)

model$train()
coro::loop(for(b in ministdlta){
optimizer$zero_grad()
output<-model(b[[1]]$to(device = device))
loss<-nnf_multilabel_soft_margin_loss(output,b[[2]]$to(device = device))
loss$backward()
optimizer$step()
})

#After each epoch, you need to delete loss, output, b, optimizer, the dataset and the dataloader,
#and manually free CPU and GPU memory.
rm(list=c("loss","output","b","optimizer","ministdsta","ministdlta"))
gc()
cuda_empty_cache()
}

#In this way, you can use the parallel dataloader normally, especially when using a GPU to accelerate
#neural network training: it can greatly increase GPU usage and shorten the data loading time.

@linxy-1992
Author

#The above parallel dataloader still produces index errors with a large batch_size. In practice, although I cannot fix the underlying code, the following approach largely prevents this kind of error from occurring.

#code
minist_dataset<-dataset(
initialize=function(xarray,label){
#It is important to note that the data passed into initialize needs to be an array, matrix, or vector.
#You must not convert the data to tensors in the initialize section;
#if you do, you will get an unknown error when using the parallel dataloader.
self$x<-xarray
self$y<-label
},.getitem=function(index){
#Make torch single-threaded inside the dataset; note that this does not affect torch's multi-threading in the main process. Using a single thread reduces the probability of index errors in the dataset.
torch_set_num_threads(1)
#Extraction and transformation of data in the getitem section.
x_re<-torch_tensor(self$x[index,])$reshape(c(1,28,28))
y_re<-torch_tensor(self$y[index],dtype=torch_long())
list(x=x_re,y=y_re)
},.length=function(){length(self$y)})

ministdsta<-minist_dataset(dfminist2[,-1],dfminist2[,1])
ministdlta<-dataloader(ministdsta,batch_size=100,shuffle=T,
num_workers =2,#The number of worker processes is set here,
#but if the data conversion work is simple, as in my example,
#there is no need for multiple workers; in that case they will be much slower than a single process.

#If you use other data, functions, or packages inside your dataset section,
#you need to broadcast these objects by name to the R worker processes,
#just as you would with the parallel package.
#worker_init_fn = function(worker_id){},  #optional function run once in each worker
#worker_globals = c("mydataname1","mydataname2","myfun1"),
#worker_packages = c("torchvision","otherpackagename")
)

for(epoch in 1:100){
#Separate the dataloader's multi-processing from the network training: all batch data is pulled out of the dataloader before the formal training takes place.
mydstr<-minist_dataset(dfminist2[,-1],dfminist2[,1])
mydltr<-dataloader(mydstr,batch_size=100,shuffle=T,num_workers=6)
repeat{#The key is to use a repeat loop and tryCatch to catch possible dataloader errors.
batch_list<-tryCatch({coro::collect(mydltr)},error=function(e){})
#If there was no error, exit the loop; if the dataloader errored, run it again.
if(!is.null(batch_list)){break}}
optimizer<-optim_adamw(model$parameters,lr=0.001)
model$train()
train_losses<-c()
pre<-c()
true<-c()
for(b in batch_list){#In the formal training, loop directly over batch_list rather than over the dataloader.
optimizer$zero_grad()
output<-model(b[[1]]$to(device = device))
loss<-nnf_multilabel_soft_margin_loss(output,b[[2]]$to(device = device))
loss$backward()
optimizer$step()
train_losses<-c(train_losses,loss$item())#
pred<-torch_max(output$to(device="cpu"),dim=2)[[2]]#
pre<-c(pre,as.numeric(pred))
true<-c(true,as.numeric(torch_max(b[[2]],dim=2)[[2]]))
}
train_auc<-mean(true==pre)
rm(list=c("loss","output","b","optimizer","mydstr","mydltr","batch_list"))
gc()
cuda_empty_cache()
}

#This way, the dataloader parallel can be used normally
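
#The repeat/tryCatch pattern above can also be wrapped in a small helper with a bounded number of
#retries, so that a persistent failure eventually surfaces instead of looping forever. A minimal
#sketch (the helper name and the retry limit are my own, not from this thread):
collect_batches<-function(dl,max_tries=5){
  for(i in seq_len(max_tries)){
    #coro::collect() materializes every batch of the dataloader into a list
    batches<-tryCatch(coro::collect(dl),error=function(e){NULL})
    if(!is.null(batches)) return(batches)
  }
  stop("dataloader still failing after ",max_tries," attempts")
}
#Usage inside the epoch loop, replacing the repeat block:
#batch_list<-collect_batches(mydltr)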

@linxy-1992
Author

Final solution for the errors the parallel dataloader reports during neural network training. The previous workaround is only suitable for small datasets, where all the data can be loaded into memory in one go; it is not suitable for large datasets, especially image data.
The solution is to avoid the coro::loop() and coro::collect() functions, write a custom data loading and training loop, and use tryCatch() to skip any errors that occur, so that they do not affect the normal training of the neural network.

######################dataset##########################
my_dataset_tr<-dataset(
initialize=function(data_df){
self$x<-data_df$filepaths
self$y<-data_df$class.id
},.getitem=function(index){
torch_set_num_threads(1)
x1<-image_read(self$x[index])%>%transform_to_tensor()
x2<-transform_normalize(x1,mean = c(0.485, 0.456, 0.406), std = c(0.229, 0.224, 0.225))
x1_y<-rep(0,200)
x1_y[self$y[index]]<-1
y<-torch_tensor(x1_y,dtype=torch_float())
list(x=x2,y=y)
},.length=function(){length(self$y)})

mydstr<-my_dataset_tr(d_tr)
mydltr<-dataloader(mydstr,batch_size=400,shuffle=T,
num_workers=4,worker_packages=c("torchvision","magick"))

##################model#####################
device <- if (cuda_is_available()){torch_device("cuda:0")}else{"cpu"}
model<-torch_load("m_resnet50.pth")
model$fc<-nn_linear(2048,200)
model<-model$to(device = device)

###################train##################
mycosinelr<-function(lrmin,lrmax,epochnum){#cosine annealing schedule from lrmax down to lrmin over epochnum epochs
lrmin+0.5*(lrmax-lrmin)*(1+cos(seq(0,1,length=epochnum)*pi))}
lr_ratio<-c(mycosinelr(1e-7,1e-3,30),mycosinelr(1e-7,5e-4,30))
plot(lr_ratio,type="l")
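
#A quick sanity check of the schedule defined above: cos(0)=1 at the first epoch and cos(pi)=-1 at
#the last, so each segment starts at lrmax and decays smoothly to lrmin.
sched<-mycosinelr(1e-7,1e-3,30)
sched[1]   #~1e-3 (starts at lrmax)
sched[30]  #~1e-7 (ends at lrmin)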

n_epochs<-length(lr_ratio)#
for(epoch in 1:n_epochs){

#Train
optimizer<-optim_adamw(model$parameters,lr=lr_ratio[epoch])#
model$train()#
repeat{biter<-tryCatch({mydltr$.iter()},error=function(e){})#Generate a data iterator
if(!is.null(biter)){break}}
for(i in 1:length(mydltr)){
tryCatch({#Using the tryCatch function to skip possible errors
b<-biter$.next()#Get one batch of data from the iterator
optimizer$zero_grad()
output<-model(b[[1]]$to(device = device))
loss<-nnf_multilabel_soft_margin_loss(output,b[[2]]$to(device = device))
loss$backward()
optimizer$step()
},error=function(e){})
}
rm(list=c("loss","output","b","optimizer","biter"))#
gc()#
cuda_empty_cache()#

#Evaluate
#evaluation code goes here

}

In this way, the problems that arise during the training of the network can be solved:

  1. Training no longer aborts because of errors raised during data loading.
  2. The problem of slow data loading is greatly reduced.
  3. The slow loading of image data is mainly because image_read is time-consuming; num_workers can be increased to compensate.
  4. In addition, reading images can cause IO contention and blocking between the hard drive and the CPU; the solution is to put the images on a fast solid-state drive.
