Skip to content

Commit

Permalink
fix: database crash issue (#449)
Browse files Browse the repository at this point in the history
* chore: clean up settings.gradle file

* fix: move creation of DBInsertionHandlerThread instance in saveEvent()

As the crash is related to the queue, we attempted to remove the queue-related logic to prevent any hidden thread-related issues. Additionally, since we are now calling saveEvent from the executor in the repository class, we no longer need a separate executor for DBInsertionHandlerThread instance creation.

* test: fix failing test cases
  • Loading branch information
1abhishekpandey authored Jun 25, 2024
1 parent 9e5f4f0 commit 9251b68
Show file tree
Hide file tree
Showing 4 changed files with 10 additions and 137 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import androidx.annotation.Nullable;
import androidx.annotation.VisibleForTesting;

import com.rudderstack.android.ruddermetricsreporterandroid.RudderReporter;
import com.rudderstack.android.sdk.core.persistence.DefaultPersistenceProviderFactory;
import com.rudderstack.android.sdk.core.persistence.Persistence;
import com.rudderstack.android.sdk.core.persistence.PersistenceProvider;
Expand All @@ -26,11 +25,9 @@
import java.util.Collections;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
Expand All @@ -43,7 +40,6 @@
class DBPersistentManager/* extends SQLiteOpenHelper*/ {

public static final String DBPERSISTENT_MANAGER_CHECK_FOR_MIGRATIONS_TAG = "DBPersistentManager: checkForMigrations: ";
public static final Object QUEUE_LOCK = new Object();
public static final ExecutorService executor = Executors.newSingleThreadExecutor();
static final String EVENT = "EVENT";

Expand Down Expand Up @@ -100,8 +96,7 @@ class DBPersistentManager/* extends SQLiteOpenHelper*/ {
//synchronizing database access
private static final Object DB_LOCK = new Object();
private static DBPersistentManager instance;
final Queue<Message> queue = new LinkedList<>();
DBInsertionHandlerThread dbInsertionHandlerThread;
private DBInsertionHandlerThread dbInsertionHandlerThread;
private Persistence persistence;

private DBPersistentManager(Application application,
Expand Down Expand Up @@ -190,18 +185,18 @@ private void createSchema(String eventSchemaSQL) {


/*
* Receives message from Repository, and passes it to the Handler thread if it exists, else adds it to a queue for replay
* once Handler thread is initialized.
* Receives message from Repository, and passes it to the Handler thread if it exists else creates a new Handler thread.
* This method should be called in a synchronized way.
* */
void saveEvent(String messageJson, EventInsertionCallback callback) {
Message msg = createOsMessageFromJson(messageJson, callback);
synchronized (DBPersistentManager.QUEUE_LOCK) {
if (dbInsertionHandlerThread == null) {
queue.add(msg);
return;
}
addMessageToHandlerThread(msg);
if (dbInsertionHandlerThread == null) {
// Need to perform db operations on a separate thread to support strict mode.
// saveEvent method is already called on an executor thread, so we can directly call DBInsertionHandlerThread
dbInsertionHandlerThread = new DBInsertionHandlerThread("db_insertion_thread", persistence);
dbInsertionHandlerThread.start();
}
dbInsertionHandlerThread.addMessage(msg);
}

private Message createOsMessageFromJson(String messageJson, EventInsertionCallback callback) {
Expand All @@ -213,13 +208,6 @@ private Message createOsMessageFromJson(String messageJson, EventInsertionCallba
return msg;
}

/*
Passes the input message to the Handler thread.
*/
void addMessageToHandlerThread(Message msg) {
dbInsertionHandlerThread.addMessage(msg);
}

@VisibleForTesting
void saveEventSync(String messageJson) {
ContentValues insertValues = new ContentValues();
Expand Down Expand Up @@ -456,32 +444,6 @@ private int getCountForCommand(String sql) {
return count;
}

/*
Starts the Handler thread, which is responsible for storing the messages in its internal queue, and
save them to the sqlite db sequentially.
*/
void startHandlerThread() {
Runnable runnable = () -> {
try {
synchronized (DBPersistentManager.QUEUE_LOCK) {
dbInsertionHandlerThread = new DBInsertionHandlerThread("db_insertion_thread", persistence);
dbInsertionHandlerThread.start();
for (Message msg : queue) {
addMessageToHandlerThread(msg);
}
}
} catch (SQLiteDatabaseCorruptException | ConcurrentModificationException |
NullPointerException ex) {
RudderLogger.logError(ex);
ReportManager.reportError(ex);

}
};
// Need to perform db operations on a separate thread to support strict mode.
executor.execute(runnable);
}


private boolean checkIfColumnExists(String newColumn) {
String checkIfStatusExistsSqlString = "PRAGMA table_info(events)";
if (!persistence.isAccessible()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,6 @@ private void initializeDbManager(Application application) {
dbEncryption.getPersistenceProviderFactoryClassName(), dbEncryption.key);
this.dbManager = DBPersistentManager.getInstance(application, dbManagerParams);
dbManager.checkForMigrations();
dbManager.startHandlerThread();
}

private void initiatePreferenceManager(Application application) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,46 +6,29 @@
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
import org.mockito.stubbing.Answer;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.reflect.Whitebox;
import org.robolectric.RobolectricTestRunner;
import org.robolectric.annotation.Config;

import static org.awaitility.Awaitility.await;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;

import static java.util.concurrent.TimeUnit.SECONDS;

import android.os.Build;
import android.os.Message;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasEntry;
import static org.hamcrest.Matchers.hasKey;
import static org.hamcrest.Matchers.hasProperty;
import static java.lang.Thread.sleep;

import android.app.Application;

import androidx.test.core.app.ApplicationProvider;

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;

import com.google.common.collect.ImmutableList;
import com.rudderstack.android.sdk.core.gson.RudderGson;

import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;

@RunWith(RobolectricTestRunner.class)
@Config(sdk = Build.VERSION_CODES.O_MR1)
Expand Down Expand Up @@ -84,8 +67,6 @@ public void setUp() throws Exception {
dbPersistentManager = PowerMockito.mock(DBPersistentManager.class);
PowerMockito.when(dbPersistentManager, "saveEventSync", anyString()).thenCallRealMethod();
PowerMockito.when(dbPersistentManager, "saveEvent", anyString(), any()).thenCallRealMethod();
PowerMockito.when(dbPersistentManager, "startHandlerThread").thenCallRealMethod();
Whitebox.setInternalState(dbPersistentManager, "queue", new LinkedList<Message>());
deviceModeManager = Mockito.mock(RudderDeviceModeManager.class);
}

Expand All @@ -98,57 +79,6 @@ public void tearDown() {

private int addMessageCalled = 0;

@Test
public void testSynchronicity() throws Exception {
final AtomicInteger messagesSaved = new AtomicInteger(0);
// Mocking the addMessageToQueue, which is used by both the save-event-thread and Handler thread, to verify synchronization
PowerMockito.when(dbPersistentManager, "addMessageToHandlerThread", any(Message.class))
.thenAnswer((Answer<Void>) invocation -> {
++addMessageCalled;
System.out.println("addMessageToQueue called by: " + Thread.currentThread().getName());
//assert if called by multiple thread
assertThat(addMessageCalled, Matchers.lessThan(2));
sleep(500);
--addMessageCalled;
assertThat(addMessageCalled, Matchers.lessThan(1));
System.out.println("return from addMessageToQueue by: " + Thread.currentThread().getName());
messagesSaved.incrementAndGet();
return null;
}
);

// Triggering the saveEvent method of DBPersistentManager from save-event-thread, as this method adds messages to the queue.
new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i < messages.size(); i++) {
dbPersistentManager.saveEvent(messages.get(i),
new EventInsertionCallback(new RudderMessageBuilder().build(),
deviceModeManager));
// Starting the Handler thread, only when some events are added to the queue, so that the replay happens, and handler
// thread starts reading from the queue.
if (i == messages.size() / 2) {
dbPersistentManager.startHandlerThread();
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}, "save-event-thread") {
}.start();


//await until finished
await().atMost(15, SECONDS).until(new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
return messagesSaved.get() == messages.size();
}
});
}
@Test
public void doneEventsTest() {
final DBPersistentManager dbPersistentManager = DBPersistentManager.getInstance(ApplicationProvider
Expand Down Expand Up @@ -216,4 +146,4 @@ private List<RudderMessage> parse(List<String> messageJsons) {
}
return messages;
}
}
}
18 changes: 0 additions & 18 deletions settings.gradle
Original file line number Diff line number Diff line change
@@ -1,19 +1 @@
include ':sample-cdn', ':sample-kotlin', ':core', ':sample-segment-java', ':sample-kotlin-integration', ':dummy-impl', ':android-tv'
//include(':web')
//project(':web').projectDir = new File(rootDir, "../RudderAndroidLibs/web")
////
//include(':rudderjsonadapter')
//project(':rudderjsonadapter').projectDir = new File(rootDir, "../RudderAndroidLibs/rudderjsonadapter")
//include(':gsonrudderadapter')
//project(':gsonrudderadapter').projectDir = new File(rootDir, "../RudderAndroidLibs/gsonrudderadapter")
//include(':moshirudderadapter')
//project(':moshirudderadapter').projectDir = new File(rootDir, "../RudderAndroidLibs/moshirudderadapter")
//include(':jacksonrudderadapter')
//project(':jacksonrudderadapter').projectDir = new File(rootDir, "../RudderAndroidLibs/jacksonrudderadapter")
////
//include(':repository')
//project(':repository').projectDir = new File(rootDir, "../RudderAndroidLibs/repository")
////
//include(':rudderreporter')
//project(':rudderreporter').projectDir = new File(rootDir, "../RudderAndroidLibs/rudderreporter")

0 comments on commit 9251b68

Please sign in to comment.