Skip to content

Commit

Permalink
Patch to provide custom executor for long/blocking tasks
Browse files Browse the repository at this point in the history
Patch to remove blocking on S3AccessTool
This close #2339
This close #2324
  • Loading branch information
gvagenas committed Jul 11, 2017
1 parent 8742e3d commit 2bb19ef
Show file tree
Hide file tree
Showing 13 changed files with 263 additions and 83 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.restcomm.connect.mscontrol.api.MediaServerInfo;
import org.restcomm.connect.mscontrol.jsr309.Jsr309ControllerFactory;
import org.restcomm.connect.mscontrol.mms.MmsControllerFactory;
import scala.concurrent.ExecutionContext;

import javax.media.mscontrol.MsControlException;
import javax.media.mscontrol.MsControlFactory;
Expand Down Expand Up @@ -56,6 +57,7 @@ public final class Bootstrapper extends SipServlet implements SipServletListener
private static final Logger logger = Logger.getLogger(Bootstrapper.class);

private ActorSystem system;
private ExecutionContext ec;

public Bootstrapper() {
super();
Expand Down Expand Up @@ -212,10 +214,10 @@ private String home(final ServletContext context) {
}
}

private DaoManager storage(final Configuration configuration, Configuration daoManagerConfiguration, final ClassLoader loader) throws ObjectInstantiationException {
private DaoManager storage(final Configuration configuration, Configuration daoManagerConfiguration, final ClassLoader loader, final ExecutionContext ec) throws ObjectInstantiationException {
final String classpath = daoManagerConfiguration.getString("dao-manager[@class]");
final DaoManager daoManager = (DaoManager) new ObjectFactory(loader).getObjectInstance(classpath);
daoManager.configure(configuration, daoManagerConfiguration );
daoManager.configure(configuration, daoManagerConfiguration, ec );
daoManager.start();
return daoManager;
}
Expand Down Expand Up @@ -286,10 +288,11 @@ public void servletInitialized(SipServletContextEvent event) {
system = ActorSystem.create("RestComm", settings, loader);
// Share the actor system with other servlets.
context.setAttribute(ActorSystem.class.getName(), system);
ec = system.dispatchers().lookup("restcomm-blocking-dispatcher");
// Create the storage system.
DaoManager storage = null;
try {
storage = storage(xml, daoManagerConf, loader);
storage = storage(xml, daoManagerConf, loader, ec);
} catch (final ObjectInstantiationException exception) {
logger.error("ObjectInstantiationException during initialization: ", exception);
}
Expand Down
41 changes: 24 additions & 17 deletions restcomm/restcomm.application/src/main/resources/application.conf
Original file line number Diff line number Diff line change
@@ -1,21 +1,21 @@
akka {
# Event handlers to register at boot time (Logging$DefaultLogger logs to STDOUT)
event-handlers = ["akka.event.slf4j.Slf4jEventHandler"]
# Event handlers to register at boot time (Logging$DefaultLogger logs to STDOUT)
event-handlers = ["akka.event.slf4j.Slf4jEventHandler"]

# Log level used by the configured loggers (see "event-handlers") as soon
# as they have been started; before that, see "stdout-loglevel"
# Options: OFF, ERROR, WARNING, INFO, DEBUG
loglevel = "INFO"
# Log level used by the configured loggers (see "event-handlers") as soon
# as they have been started; before that, see "stdout-loglevel"
# Options: OFF, ERROR, WARNING, INFO, DEBUG
loglevel = "INFO"

# Log level for the very basic logger activated during AkkaApplication startup
# Options: OFF, ERROR, WARNING, INFO, DEBUG
stdout-loglevel = "INFO"
# Log level for the very basic logger activated during AkkaApplication startup
# Options: OFF, ERROR, WARNING, INFO, DEBUG
stdout-loglevel = "INFO"

# Log the complete configuration at INFO level when the actor system is started.
# This is useful when you are uncertain of what configuration is used.
log-config-on-start = off
# Log the complete configuration at INFO level when the actor system is started.
# This is useful when you are uncertain of what configuration is used.
log-config-on-start = off

# Configuration reference http://doc.akka.io/docs/akka/2.1.4/general/configuration.html
# Configuration reference http://doc.akka.io/docs/akka/2.1.4/general/configuration.html

actor {

Expand All @@ -28,7 +28,14 @@ log-config-on-start = off
# Default value 20s is too high
# creation-timeout = 20s
creation-timeout = 10s

}
}

}
}

restcomm-blocking-dispatcher {
type = Dispatcher
executor = "thread-pool-executor"
thread-pool-executor {
fixed-pool-size = 32
}
throughput = 1
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,11 @@
package org.restcomm.connect.commons;

import org.apache.commons.configuration.Configuration;
import scala.concurrent.ExecutionContext;

/**
* @author quintana.thomas@gmail.com (Thomas Quintana)
*/
public interface Configurable {
void configure(Configuration configuration, Configuration daoManagerConfiguration);
void configure(Configuration configuration, Configuration daoManagerConfiguration, ExecutionContext ec);
}
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public AmazonS3 getS3client() {
return s3client;
}

public URI uploadFile(final String fileToUpload) {
public boolean uploadFile(final String fileToUpload) {
if (s3client == null) {
s3client = getS3client();
}
Expand All @@ -117,7 +117,7 @@ public URI uploadFile(final String fileToUpload) {
}
File file = new File(fileUri);

while (!FileUtils.waitFor(file, 30)){}
// while (!FileUtils.waitFor(file, 30)){}
if (file.exists()) {
PutObjectRequest putRequest = new PutObjectRequest(bucket.toString(), file.getName(), file);
ObjectMetadata metadata = new ObjectMetadata();
Expand All @@ -130,12 +130,15 @@ public URI uploadFile(final String fileToUpload) {
if (removeOriginalFile) {
removeLocalFile(file);
}
URI recordingS3Uri = s3client.getUrl(bucket.toString(), file.getName()).toURI();
return recordingS3Uri;
// return downloadUrl.toURI();

if (logger.isInfoEnabled()) {
String msg = String.format("File %s uploaded to S3 successfully");
logger.info(msg);
}
return true;
} else {
logger.error("Timeout waiting for the recording file: "+file.getAbsolutePath());
return null;
return false;
}
} catch (AmazonServiceException ase) {
logger.error("Caught an AmazonServiceException");
Expand All @@ -144,18 +147,37 @@ public URI uploadFile(final String fileToUpload) {
logger.error("AWS Error Code: " + ase.getErrorCode());
logger.error("Error Type: " + ase.getErrorType());
logger.error("Request ID: " + ase.getRequestId());
return null;
return false;
} catch (AmazonClientException ace) {
logger.error("Caught an AmazonClientException ");
logger.error("Error Message: " + ace.getMessage());
return null;
} catch (URISyntaxException e) {
logger.error("URISyntaxException: "+e.getMessage());
return null;
return false;
} catch (IOException e) {
logger.error("Problem while trying to touch recording file for testing", e);
return null;
return false;
}
}

public URI getS3Uri(final String fileToUpload) {
if (s3client == null) {
s3client = getS3client();
}
StringBuffer bucket = new StringBuffer();
bucket.append(bucketName);
if (folder != null && !folder.isEmpty())
bucket.append("/").append(folder);
URI fileUri = URI.create(fileToUpload);
File file = new File(fileUri);
URI recordingS3Uri = null;
try {
recordingS3Uri = s3client.getUrl(bucketName, file.getName()).toURI();
} catch (URISyntaxException e) {
logger.error("Problem during creation of S3 URI");
}
if (logger.isInfoEnabled()) {
logger.info("Created S3Uri for file: " + fileUri.toString());
}
return recordingS3Uri;
}

public URI getPublicUrl (String fileName) throws URISyntaxException {
Expand Down Expand Up @@ -190,4 +212,4 @@ private void removeLocalFile(final File file) {
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import java.util.List;

import org.restcomm.connect.commons.amazonS3.S3AccessTool;
import org.restcomm.connect.dao.entities.Recording;
import org.restcomm.connect.commons.dao.Sid;
import org.restcomm.connect.dao.entities.RecordingFilter;
Expand Down Expand Up @@ -49,4 +50,6 @@ public interface RecordingsDao {
List<Recording> getRecordings(RecordingFilter filter);

void updateRecording(Recording recording);

S3AccessTool getS3AccessTool();
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.restcomm.connect.dao.TranscriptionsDao;
import org.restcomm.connect.dao.UsageDao;
import org.restcomm.connect.dao.GeolocationDao;
import scala.concurrent.ExecutionContext;

/**
* @author quintana.thomas@gmail.com (Thomas Quintana)
Expand Down Expand Up @@ -87,15 +88,18 @@ public final class MybatisDaoManager implements DaoManager {
private ExtensionsConfigurationDao extensionsConfigurationDao;
private GeolocationDao geolocationDao;

private ExecutionContext ec;

public MybatisDaoManager() {
super();
}

@Override
public void configure(final Configuration configuration, Configuration daoManagerConfiguration) {
public void configure(final Configuration configuration, Configuration daoManagerConfiguration, final ExecutionContext ec) {
this.configuration = daoManagerConfiguration.subset("dao-manager");
this.amazonS3Configuration = configuration.subset("amazon-s3");
this.runtimeConfiguration = configuration.subset("runtime-settings");
this.ec = ec;
}

@Override
Expand Down Expand Up @@ -272,7 +276,7 @@ public void start(final SqlSessionFactory sessions) {
presenceRecordsDao = new MybatisRegistrationsDao(sessions);
if (s3AccessTool != null) {
final String recordingPath = runtimeConfiguration.getString("recordings-path");
recordingsDao = new MybatisRecordingsDao(sessions, s3AccessTool, recordingPath);
recordingsDao = new MybatisRecordingsDao(sessions, s3AccessTool, recordingPath, ec);
} else {
recordingsDao = new MybatisRecordingsDao(sessions);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
*/
package org.restcomm.connect.dao.mybatis;

import akka.dispatch.Futures;
import org.apache.ibatis.session.SqlSession;
import org.apache.ibatis.session.SqlSessionFactory;
import org.joda.time.DateTime;
Expand All @@ -30,13 +31,16 @@
import org.restcomm.connect.dao.RecordingsDao;
import org.restcomm.connect.dao.entities.Recording;
import org.restcomm.connect.dao.entities.RecordingFilter;
import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;

import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;

/**
* @author quintana.thomas@gmail.com (Thomas Quintana)
Expand All @@ -47,26 +51,41 @@ public final class MybatisRecordingsDao implements RecordingsDao {
private final SqlSessionFactory sessions;
private S3AccessTool s3AccessTool;
private String recordingPath;
private ExecutionContext ec;

public MybatisRecordingsDao(final SqlSessionFactory sessions) {
super();
this.sessions = sessions;
}

public MybatisRecordingsDao(final SqlSessionFactory sessions, final S3AccessTool s3AccessTool, final String recordingPath) {
public MybatisRecordingsDao(final SqlSessionFactory sessions, final S3AccessTool s3AccessTool, final String recordingPath, final ExecutionContext ec) {
super();
this.sessions = sessions;
this.s3AccessTool = s3AccessTool;
this.recordingPath = recordingPath;
this.ec = ec;
}

@Override
public S3AccessTool getS3AccessTool () {
return s3AccessTool;
}

@Override
public void addRecording(Recording recording) {
if (s3AccessTool != null) {
URI s3Uri = s3AccessTool.uploadFile(recordingPath+"/"+recording.getSid().toString()+".wav");
if (s3AccessTool != null && ec != null) {
final String recordingSid = recording.getSid().toString();
URI s3Uri = s3AccessTool.getS3Uri(recordingPath+"/"+recordingSid+".wav");
//s3AccessTool.uploadFile(recordingPath+"/"+recording.getSid().toString()+".wav");
if (s3Uri != null) {
recording = recording.setS3Uri(s3Uri);
}
Future<Boolean> f = Futures.future(new Callable<Boolean>() {
@Override
public Boolean call () throws Exception {
return s3AccessTool.uploadFile(recordingPath+"/"+recordingSid+".wav");
}
}, ec);
}
String fileUrl = String.format("/restcomm/%s/Accounts/%s/Recordings/%s",recording.getApiVersion(),recording.getAccountSid(),recording.getSid());
recording = recording.updateFileUri(generateLocalFileUri(fileUrl));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
package org.restcomm.connect.dao;

import org.apache.commons.configuration.Configuration;
import scala.concurrent.ExecutionContext;

/**
* DaoManager mock class to be used for unit-testing endpoints. Add further Daos if needed.
Expand Down Expand Up @@ -151,7 +152,7 @@ public GeolocationDao getGeolocationDao() {
}

@Override
public void configure(Configuration configuration, Configuration daoManagerConfiguration) {
public void configure(Configuration configuration, Configuration daoManagerConfiguration, ExecutionContext ec) {

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1758,6 +1758,8 @@ public FinishRecording(final ActorRef source) {
@SuppressWarnings("unchecked")
@Override
public void execute(final Object message) throws Exception {
boolean amazonS3Enabled = configuration.subset("amazon-s3").getBoolean("enabled");

final Class<?> klass = message.getClass();
if (CallStateChanged.class.equals(klass)) {
final CallStateChanged event = (CallStateChanged) message;
Expand Down Expand Up @@ -1788,6 +1790,10 @@ public void execute(final Object message) throws Exception {
logger.debug("File already exists, length: "+ (new File(recordingUri).length()));
}

if (logger.isDebugEnabled()) {
logger.debug("Recording duration: "+duration);
}

final Recording.Builder builder = Recording.builder();
builder.setSid(recordingSid);
builder.setAccountSid(accountId);
Expand Down Expand Up @@ -1907,7 +1913,6 @@ public void execute(final Object message) throws Exception {
}
}
final List<NameValuePair> parameters = parameters();
boolean amazonS3Enabled = configuration.subset("amazon-s3").getBoolean("enabled");
if (amazonS3Enabled) {
//If Amazon S3 is enabled the Recordings DAO uploaded the wav file to S3 and changed the URI
parameters.add(new BasicNameValuePair("RecordingUrl", recording.getFileUri().toURL().toString()));
Expand All @@ -1932,20 +1937,13 @@ public void execute(final Object message) throws Exception {
logger.info("About to execute Record action to: "+uri);
}
downloader.tell(request, self());
// A little clean up.
recordingSid = null;
recordingUri = null;
return;
} else if (CallStateChanged.class.equals(klass)) {
parameters.add(new BasicNameValuePair("Digits", "hangup"));
request = new HttpRequestDescriptor(uri, method, parameters);
if (logger.isInfoEnabled()) {
logger.info("About to execute Record action to: "+uri);
}
downloader.tell(request, self());
// A little clean up.
recordingSid = null;
recordingUri = null;
}
// final StopInterpreter stop = new StopInterpreter();
// source.tell(stop, source);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public UntypedActor create() throws Exception {
protected void startDaoManager() throws ConfigurationException, MalformedURLException{
daoManagerConf = (XMLConfiguration)createDaoManagerCfg(CONFIG_PATH_DAO_MANAGER);
daoManager = new MybatisDaoManager();
daoManager.configure(configurationNode1, daoManagerConf );
daoManager.configure(configurationNode1, daoManagerConf, null);
daoManager.start();
}

Expand Down
Loading

0 comments on commit 2bb19ef

Please sign in to comment.