-
Notifications
You must be signed in to change notification settings - Fork 215
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Run tests through kafka #2005
base: master
Are you sure you want to change the base?
Run tests through kafka #2005
Conversation
3a99143
to
bcf7528
Compare
ObjectId testingRunResultSummaryId = new ObjectId(jsonObject.getString("testingRunResultSummaryId")); | ||
ApiInfo.ApiInfoKey apiInfoKey = ApiInfo.getApiInfoKeyFromString(jsonObject.getString("apiInfoKey")); | ||
String subcategory = jsonObject.getString("subcategory"); | ||
List<TestingRunResult.TestLog> testLogs = JSON.parseArray(jsonObject.getString("testLogs"), TestingRunResult.TestLog.class); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
check how testlogs are used and stored
import com.akto.dto.ApiInfo.ApiInfoKey; | ||
import com.akto.dto.testing.TestingRunResult; | ||
|
||
public class TestMessages { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
rename
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
SingleTestPayload
@@ -23,6 +23,13 @@ private Constants() {} | |||
public static final String AKTO_NODE_ID = "x-akto-node"; | |||
public static final String AKTO_REMOVE_AUTH= "x-akto-remove-auth"; | |||
|
|||
public static final String LOCAL_KAFKA_BROKER_URL = "localhost:29092"; // run kafka process with name kafka1 in docker |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
don't hardcode kafka related variables
@@ -0,0 +1,99 @@ | |||
package com.akto.testing.testing_with_kafka; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
package name is really weird, rename
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
rename classes to producer.java, consumer.java
import com.akto.dto.testing.TestingRunConfig; | ||
import com.akto.store.TestingUtil; | ||
|
||
public class CommonSingletonForTesting { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
rename this
} | ||
} | ||
|
||
private boolean isKafkaEmpty(){ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is this intentional?
return null; | ||
} | ||
|
||
private static void deleteAllMessagesFromTopic(String bootstrapServers, String topicName) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
delete the topic itself, new message push wll create a new one
bcf7528
to
f6a8ff4
Compare
|
||
public static final Kafka producer = new Kafka(Constants.LOCAL_KAFKA_BROKER_URL, 500, 1000); | ||
|
||
public static Void pushMessagesToKafka(List<TestMessages> messages){ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
void
|
||
String testingRunId = currentTestInfo.getString("testingRunId"); | ||
String testingRunSummaryId = currentTestInfo.getString("summaryId"); | ||
TestingRun testingRun = TestingRunDao.instance.findOne(Filters.eq(Constants.ID, new ObjectId(testingRunId))); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
review this and add try catch, if something panics, whole process will exit
return null; | ||
} | ||
|
||
private Void doJobForTest(int accountId, String testSubCategory, ApiInfo.ApiInfoKey apiInfoKey, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
which job? rename
executor.awaitTermination(maxRunTimeInSeconds, TimeUnit.SECONDS); | ||
break; | ||
} | ||
Thread.sleep(100); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
100 seconds?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
100ms
try { | ||
future.get(4, TimeUnit.MINUTES); | ||
} catch (InterruptedException e) { | ||
logger.error("Task timed out: " + message); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is there a way to know which message timed out?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
resolve comments
No description provided.