Commit c61bdcb
Merge branch 'main' into NP-47308-fix-role-name
# Conflicts:
#	template.yaml
monaullah committed Oct 8, 2024
2 parents 0164efe + d363d54 commit c61bdcb
Showing 108 changed files with 1,187 additions and 2,653 deletions.
82 changes: 13 additions & 69 deletions README.md
@@ -1,78 +1,22 @@
 # NVA Data Report API
 
-This repository contains the NVA data report API.
+This repository contains functions for generating CSV reports of data from NVA.
+See [reportTypes](documentation/reportTypes.md) for a list of reports and data types.
 
-## How to run a bulk upload
+## Architectural overview
 
-The steps below can be outlined briefly as:
+![Architecture](documentation/images/data_export_overview.png)
 
-- Pre-run
-    - Stop incoming live-update events
-    - Delete data from previous runs
-    - Delete all data in database
-- Bulk upload
-    - Generate batches of document keys for upload
-    - Transform the data to a format compatible with the bulk-upload action
-    - Initiate bulk upload
-    - Verify data integrity
-- Post-run
-    - Start incoming live-update events
+## Integration overview
 
-### Pre-run steps
+The S3 bucket `data-report-csv-export-{accountName}` (defined in the template) is
+set up as a data source in Databricks (in another AWS account) following the
+Databricks guide [_Create a storage credential for connecting to AWS S3_](https://docs.databricks.com/en/connect/unity-catalog/storage-credentials.html#create-a-storage-credential-for-connecting-to-aws-s3).
+This is how the data platform accesses files from
+`data-report-csv-export-{accountName}`:
 
-1. Remove all objects from S3 bucket `loader-input-files-{accountName}`
-2. Turn off S3 event notifications for bucket `persisted-resources-{accountName}`.
-   In the AWS console, go to
-   _S3_ -> _persisted-resources-{accountName}_ -> _Properties_ -> _Amazon EventBridge_ -> _Edit_ -> _Off_
-3. Press `ResetDatabaseButton` (triggers `DatabaseResetHandler`). This might take around a minute
-   to complete.
-4. Verify that the database is empty. You can use a SageMaker notebook to query the database*.
-   Example SPARQL queries:
-   ```
-   SELECT (COUNT(DISTINCT ?g) as ?gCount) WHERE {GRAPH ?g {?s ?p ?o}}
-   ```
-   or
-   ```
-   SELECT ?g ?s ?p ?o WHERE {GRAPH ?g {?s ?p ?o}} LIMIT 100
-   ```
+![Databricks integration](documentation/images/data_report_aws_databricks_storage_credential.png)
 
-### Bulk upload steps
+## How-to guides
 
-1. Generate key batches for both locations: `resources` and `nvi-candidates`. Manually trigger
-   `GenerateKeyBatchesHandler` with the following input:
-   ```json
-   {
-     "detail": {
-       "location": "resources|nvi-candidates"
-     }
-   }
-   ```
-2. Verify that `GenerateKeyBatchesHandler` is done processing (i.e. check logs) and that key batches
-   have been generated in S3 bucket `data-report-key-batches-{accountName}`
-3. Trigger `BulkTransformerHandler`
-4. Verify that `BulkTransformerHandler` is done processing (i.e. check logs) and that N-Quads
-   have been generated in S3 bucket `loader-input-files-{accountName}`
-5. Trigger `BulkDataLoader`
-6. To check progress of the bulk upload to Neptune, trigger `BulkDataLoader` with the following input:
-   ```json
-   {
-     "loadId": "{copy loadId UUID from test log}"
-   }
-   ```
-7. Verify that the expected count is in the database. Query for counting distinct named graphs:
-   ```
-   SELECT (COUNT(DISTINCT ?g) as ?gCount) WHERE {GRAPH ?g {?s ?p ?o}}
-   ```
-
-### Post-run steps
-
-1. Turn on S3 event notifications for bucket `persisted-resources-{accountName}`.
-   In the AWS console, go to
-   _S3_ -> _persisted-resources-{accountName}_ -> _Properties_ -> _Amazon EventBridge_ -> _Edit_ -> _On_
-
-*Note: You can use a SageMaker notebook to query the database. The notebook can be opened from the
-AWS console through _SageMaker_ -> _Notebooks_ -> _Notebook instances_ -> _Open JupyterLab_
+- [Run bulk export](documentation/bulkExport.md)
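
The bulk-upload steps removed above trigger Lambda functions manually with small JSON payloads. As a minimal sketch (not part of this commit) of invoking `GenerateKeyBatchesHandler` with the documented payload via the AWS SDK for Java v2; the deployed function name and the client's default region/credentials resolution are assumptions:

```java
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.services.lambda.LambdaClient;
import software.amazon.awssdk.services.lambda.model.InvokeRequest;

public class TriggerKeyBatchGeneration {

    public static void main(String[] args) {
        // Payload format taken from the README steps above; "resources" may be
        // swapped for "nvi-candidates".
        var payload = """
            {"detail": {"location": "resources"}}""";
        try (var lambda = LambdaClient.create()) { // assumes default region and credentials
            var response = lambda.invoke(InvokeRequest.builder()
                                             .functionName("GenerateKeyBatchesHandler") // assumed deployed name
                                             .payload(SdkBytes.fromUtf8String(payload))
                                             .build());
            System.out.println("Status: " + response.statusCode());
        }
    }
}
```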
@@ -11,8 +11,11 @@ dependencies {
     testImplementation libs.bundles.testing
 }
 
-sourceCompatibility = JavaVersion.VERSION_21 // source-code version and must be <= targetCompatibility
-targetCompatibility = JavaVersion.VERSION_21 // bytecode target version
+java {
+    sourceCompatibility = JavaVersion.VERSION_21
+    // source-code version and must be <= targetCompatibility
+    targetCompatibility = JavaVersion.VERSION_21 // bytecode target version
+}
 
 tasks.named('test') {
     useJUnitPlatform()
@@ -23,7 +26,7 @@ tasks.named('test') {
 }
 
 pmd {
-    toolVersion = '6.20.0'
+    toolVersion = '7.5.0'
     ruleSetConfig = rootProject.resources.text.fromFile('config/pmd/ruleset.xml')
     ruleSets = []
     ignoreFailures = false
@@ -63,6 +63,11 @@ protected void persist(List<ContentWithLocation> transformedData) {
         transformedData.forEach(this::persist);
     }
 
+    private void persist(ContentWithLocation transformation) {
+        var request = buildRequest(transformation.location());
+        s3OutputClient.putObject(request, RequestBody.fromString(transformation.content()));
+    }
+
     private static Model createModelAndLoadInput(Stream<JsonNode> jsonNodeStream) {
         var model = ModelFactory.createDefaultModel();
         jsonNodeStream.forEach(jsonNode -> RDFDataMgr.read(model, stringToStream(jsonNode.toString()), Lang.JSONLD));
@@ -86,11 +91,6 @@ private List<ContentWithLocation> transformResources(Model model) {
             .toList();
     }
 
-    private void persist(ContentWithLocation transformation) {
-        var request = buildRequest(transformation.location());
-        s3OutputClient.putObject(request, RequestBody.fromString(transformation.content()));
-    }
-
     private ContentWithLocation transform(Model model, ReportType reportType) {
         var resultSet = new ModelQueryService().query(model, reportType);
         var formatted = new CsvFormatter().format(resultSet);
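
For context on `buildRequest`, which this diff calls but does not show: the tests later in this commit assert that exported files carry a content type and UTF-8 encoding. A minimal sketch of how such a request could be built with the AWS SDK for Java v2; the bucket, key, and content values are hypothetical, not the repository's actual implementation:

```java
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;

public class CsvPutExample {

    public static void main(String[] args) {
        // Hypothetical bucket and key; the real values come from the handler's location logic.
        var request = PutObjectRequest.builder()
                          .bucket("data-report-csv-export-example")
                          .key("publication/part-0001.csv")
                          .contentType("text/csv; charset=UTF-8")
                          .build();
        try (var s3 = S3Client.create()) {
            // RequestBody.fromString encodes the string as UTF-8 by default.
            s3.putObject(request, RequestBody.fromString("id,title\n1,Example"));
        }
    }
}
```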
@@ -1,4 +1,4 @@
-package no.sikt.nva.data.report.api.etl.transformer;
+package no.sikt.nva.data.report.api.export;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static java.util.Objects.nonNull;
@@ -1,4 +1,4 @@
-package no.sikt.nva.data.report.api.etl.transformer;
+package no.sikt.nva.data.report.api.export;
 
 import static java.util.Objects.nonNull;
 import com.fasterxml.jackson.annotation.JsonCreator;
@@ -1,12 +1,7 @@
 package no.sikt.nva.data.report.api.export;
 
-import static no.sikt.nva.data.report.testing.utils.ResultSorter.sortResponse;
-import static no.sikt.nva.data.report.testing.utils.generator.PublicationHeaders.CONTRIBUTOR_IDENTIFIER;
-import static no.sikt.nva.data.report.testing.utils.generator.PublicationHeaders.PUBLICATION_ID;
-import static no.sikt.nva.data.report.testing.utils.model.ResultType.CSV;
 import static no.unit.nva.commons.json.JsonUtils.dtoObjectMapper;
 import static no.unit.nva.testutils.RandomDataGenerator.randomString;
-import static nva.commons.core.attempt.Try.attempt;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -32,11 +27,11 @@
 import java.util.List;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
-import no.sikt.nva.data.report.testing.utils.generator.TestData;
-import no.sikt.nva.data.report.testing.utils.generator.TestData.DatePair;
-import no.sikt.nva.data.report.testing.utils.generator.nvi.TestNviCandidate;
+import no.sikt.nva.data.report.testing.utils.generator.SampleData;
+import no.sikt.nva.data.report.testing.utils.generator.SampleData.DatePair;
+import no.sikt.nva.data.report.testing.utils.generator.nvi.SampleNviCandidate;
 import no.sikt.nva.data.report.testing.utils.generator.publication.PublicationDate;
-import no.sikt.nva.data.report.testing.utils.generator.publication.TestPublication;
+import no.sikt.nva.data.report.testing.utils.generator.publication.SamplePublication;
 import no.sikt.nva.data.report.testing.utils.model.EventConsumptionAttributes;
 import no.sikt.nva.data.report.testing.utils.model.IndexDocument;
 import no.sikt.nva.data.report.testing.utils.model.NviIndexDocument;
@@ -93,22 +88,21 @@ void setUp() {
     @ParameterizedTest
     @EnumSource(names = {"AFFILIATION", "CONTRIBUTOR", "FUNDING", "IDENTIFIER", "PUBLICATION"})
     void shouldWriteCsvFileToS3ForAllReportTypes(ReportType reportType) throws IOException {
-        var testData = new TestData(generateDatePairs(1));
+        var testData = new SampleData(generateDatePairs(1));
         var batch = setupExistingBatch(testData, reportType);
         var location = PERSISTED_RESOURCES_PUBLICATIONS;
         var batchKey = UnixPath.of(location).addChild(randomString());
         s3KeyBatches3Driver.insertFile(batchKey, batch);
         handler.handleRequest(eventStream(null, location), outputStream, mock(Context.class));
-        var actualContent = attempt(() -> sortResponse(CSV, getActualPersistedFile(reportType), PUBLICATION_ID,
-                                                       CONTRIBUTOR_IDENTIFIER)).orElseThrow();
-        var expectedContent = getExpectedResponseData(reportType, testData);
-        assertEquals(expectedContent, actualContent);
+        var expected = getExpectedResponseData(reportType, testData);
+        var actual = getActualPersistedFile(reportType);
+        assertEqualsInAnyOrder(expected, actual);
     }
 
     @ParameterizedTest
     @EnumSource(names = {"AFFILIATION", "CONTRIBUTOR", "FUNDING", "IDENTIFIER", "PUBLICATION"})
     void shouldWriteCsvFilesForAllReportTypesToSpecificFolderInExportBucket(ReportType reportType) throws IOException {
-        var testData = new TestData(generateDatePairs(1));
+        var testData = new SampleData(generateDatePairs(1));
         var batch = setupExistingBatch(testData, reportType);
         var location = PERSISTED_RESOURCES_PUBLICATIONS;
         var batchKey = UnixPath.of(location).addChild(randomString());
@@ -122,21 +116,20 @@ void shouldWriteCsvFilesForAllReportTypesToSpecificFolderInExportBucket(ReportType reportType) throws IOException {
     @Test
     void shouldWriteCsvFileToS3ForReportTypeNvi() throws IOException {
         var reportType = ReportType.NVI;
-        var testData = new TestData(generateDatePairs(2));
+        var testData = new SampleData(generateDatePairs(2));
         var batch = setupExistingBatch(testData, reportType);
         var location = PERSISTED_RESOURCES_NVI_CANDIDATES;
         var batchKey = UnixPath.of(location).addChild(randomString());
         s3KeyBatches3Driver.insertFile(batchKey, batch);
         handler.handleRequest(eventStream(null, location), outputStream, mock(Context.class));
-        var actualContent = attempt(() -> sortResponse(CSV, getActualPersistedFile(reportType), PUBLICATION_ID,
-                                                       CONTRIBUTOR_IDENTIFIER)).orElseThrow();
-        var expectedContent = getExpectedResponseData(reportType, testData);
-        assertEquals(expectedContent, actualContent);
+        var expected = getExpectedResponseData(reportType, testData);
+        var actual = getActualPersistedFile(reportType);
+        assertEqualsInAnyOrder(expected, actual);
     }
 
     @Test
     void shouldWriteFilesWithCsvFileExtension() throws IOException {
-        var batch = setupExistingBatch(new TestData(generateDatePairs(1)), ReportType.PUBLICATION);
+        var batch = setupExistingBatch(new SampleData(generateDatePairs(1)), ReportType.PUBLICATION);
         var location = PERSISTED_RESOURCES_PUBLICATIONS;
         s3KeyBatches3Driver.insertFile(UnixPath.of(location).addChild(randomString()), batch);
         handler.handleRequest(eventStream(null, location), outputStream, mock(Context.class));
@@ -146,7 +139,7 @@ void shouldWriteFilesWithCsvFileExtension() throws IOException {
 
     @Test
     void shouldWriteFilesWithContentTypeAndEncoding() throws IOException {
-        var batch = setupExistingBatch(new TestData(generateDatePairs(1)), ReportType.PUBLICATION);
+        var batch = setupExistingBatch(new SampleData(generateDatePairs(1)), ReportType.PUBLICATION);
         var location = PERSISTED_RESOURCES_PUBLICATIONS;
         s3KeyBatches3Driver.insertFile(UnixPath.of(location).addChild(randomString()), batch);
         var mockedS3OutputClient = mock(S3Client.class);
@@ -162,7 +155,7 @@ void shouldWriteFilesWithContentTypeAndEncoding() throws IOException {
 
     @Test
     void shouldEncodeCsvFileInUtf8() throws IOException {
-        var testData = new TestData(generateDatePairs(1));
+        var testData = new SampleData(generateDatePairs(1));
         var batch = setupExistingBatch(testData, ReportType.PUBLICATION);
         var reportType = ReportType.PUBLICATION;
         var location = PERSISTED_RESOURCES_PUBLICATIONS;
@@ -176,7 +169,7 @@ void shouldEncodeCsvFileInUtf8() throws IOException {
 
     @Test
     void shouldNotEmitNewEventWhenNoMoreBatchesToRetrieve() throws IOException {
-        var testData = new TestData(generateDatePairs(2));
+        var testData = new SampleData(generateDatePairs(2));
         var batch = setupExistingBatch(testData, ReportType.PUBLICATION);
         var location = PERSISTED_RESOURCES_PUBLICATIONS;
         var batchKey = UnixPath.of(location).addChild(randomString());
@@ -188,7 +181,7 @@ void shouldNotEmitNewEventWhenNoMoreBatchesToRetrieve() throws IOException {
 
     @Test
     void shouldEmitNewEventWhenThereAreMoreBatchesToProcess() throws IOException {
-        var testData = new TestData(generateDatePairs(2));
+        var testData = new SampleData(generateDatePairs(2));
         var batch = setupExistingBatch(testData, ReportType.PUBLICATION);
         var location = PERSISTED_RESOURCES_PUBLICATIONS;
         var batchKey = UnixPath.of(location).addChild(randomString());
@@ -210,7 +203,7 @@ void shouldEmitNewEventWhenThereAreMoreBatchesToProcess() throws IOException {
 
     @Test
     void shouldNotFailWhenBlobNotFound() throws IOException {
-        var testData = new TestData(generateDatePairs(2));
+        var testData = new SampleData(generateDatePairs(2));
         var indexDocuments = createAndPersistIndexDocuments(testData, ReportType.PUBLICATION);
         removeOneResourceFromPersistedResourcesBucket(indexDocuments);
         var batch = indexDocuments.stream()
@@ -250,7 +243,7 @@ List<DatePair> generateDatePairs(int numberOfDatePairs) {
             .toList();
     }
 
-    private static String getExpectedResponseData(ReportType reportType, TestData test) {
+    private static String getExpectedResponseData(ReportType reportType, SampleData test) {
         return switch (reportType) {
             case AFFILIATION -> test.getAffiliationResponseData();
             case CONTRIBUTOR -> test.getContributorResponseData();
@@ -261,27 +254,37 @@ private static String getExpectedResponseData(ReportType reportType, TestData test) {
         };
     }
 
-    private static IndexDocument toIndexDocument(TestPublication publication) {
+    private static IndexDocument toIndexDocument(SamplePublication publication) {
         return new IndexDocument(randomConsumptionAttribute(), PublicationIndexDocument.from(publication).asJsonNode());
     }
 
-    private static IndexDocument toIndexDocument(TestNviCandidate nviCandidate) {
+    private static IndexDocument toIndexDocument(SampleNviCandidate nviCandidate) {
         return new IndexDocument(randomConsumptionAttribute(), NviIndexDocument.from(nviCandidate).asJsonNode());
     }
 
+    private void assertEqualsInAnyOrder(String expected, String actual) {
+        var expectedLines = expected.split(System.lineSeparator());
+        var actualLines = actual.split(System.lineSeparator());
+        assertEquals(expectedLines.length, actualLines.length);
+        var expectedList = List.of(expectedLines);
+        var actualList = List.of(actualLines);
+        assertTrue(expectedList.containsAll(actualList));
+        assertTrue(actualList.containsAll(expectedList));
+    }
+
     private UnixPath getFirstFilePath(ReportType reportType) {
         return s3OutputDriver.listAllFiles(UnixPath.of(reportType.getType())).getFirst();
     }
 
     private void setUpValidTestData(String location) throws IOException {
-        var testData = new TestData(generateDatePairs(2));
+        var testData = new SampleData(generateDatePairs(2));
         var batch = setupExistingBatch(testData, ReportType.PUBLICATION);
         var batchKey = UnixPath.of(location).addChild(randomString());
         s3KeyBatches3Driver.insertFile(batchKey, batch);
     }
 
-    private String setupExistingBatch(TestData testData, ReportType type) {
-        var indexDocuments = createAndPersistIndexDocuments(testData, type);
+    private String setupExistingBatch(SampleData sampleData, ReportType type) {
+        var indexDocuments = createAndPersistIndexDocuments(sampleData, type);
         return indexDocuments.stream()
                    .map(IndexDocument::getIdentifier)
                    .collect(Collectors.joining(System.lineSeparator()));
@@ -292,22 +295,22 @@ private void removeOneResourceFromPersistedResourcesBucket(List<IndexDocument> e
         s3ResourcesDriver.deleteFile(UnixPath.of(document.getIdentifier()));
     }
 
-    private List<IndexDocument> createAndPersistIndexDocuments(TestData testData, ReportType type) {
+    private List<IndexDocument> createAndPersistIndexDocuments(SampleData sampleData, ReportType type) {
         var indexDocuments = ReportType.NVI.equals(type)
-                                 ? createAndPersistNviData(testData)
-                                 : createAndPersistPublications(testData);
+                                 ? createAndPersistNviData(sampleData)
+                                 : createAndPersistPublications(sampleData);
         indexDocuments.forEach(document -> document.persistInS3(s3ResourcesDriver));
         return indexDocuments;
     }
 
-    private List<IndexDocument> createAndPersistPublications(TestData testData) {
-        return testData.getPublicationTestData().stream()
+    private List<IndexDocument> createAndPersistPublications(SampleData sampleData) {
+        return sampleData.getPublicationTestData().stream()
                    .map(CsvTransformerTest::toIndexDocument)
                    .toList();
     }
 
-    private List<IndexDocument> createAndPersistNviData(TestData testData) {
-        return testData.getNviTestData().stream()
+    private List<IndexDocument> createAndPersistNviData(SampleData sampleData) {
+        return sampleData.getNviTestData().stream()
                    .map(CsvTransformerTest::toIndexDocument)
                    .toList();
     }
@@ -1,4 +1,4 @@
-package no.sikt.nva.data.report.api.etl.transformer;
+package no.sikt.nva.data.report.api.export;
 
 import static no.unit.nva.testutils.RandomDataGenerator.randomString;
 import static nva.commons.core.attempt.Try.attempt;
@@ -21,6 +21,7 @@
 import no.unit.nva.s3.S3Driver;
 import no.unit.nva.stubs.FakeS3Client;
 import nva.commons.core.SingletonCollector;
+import nva.commons.core.attempt.Try;
 import nva.commons.core.ioutils.IoUtils;
 import nva.commons.core.paths.UnixPath;
 import nva.commons.core.paths.UriWrapper;
@@ -174,7 +175,7 @@ private KeyBatchRequestEvent saveContainedEvent(PutEventsRequest putEventsRequest) {
         PutEventsRequestEntry eventEntry = putEventsRequest.entries()
                                                .stream()
                                                .collect(SingletonCollector.collect());
-        return attempt(eventEntry::detail).map(
+        return Try.attempt(eventEntry::detail).map(
             jsonString -> objectMapperWithEmpty.readValue(jsonString, KeyBatchRequestEvent.class)).orElseThrow();
     }
 }
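
The last hunk qualifies the call as `Try.attempt` using the newly imported `Try` class. A small self-contained sketch of the same attempt/map/orElseThrow pattern; the `ObjectMapper` and the event record below are stand-ins for the test's actual fields, not this repository's types:

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import nva.commons.core.attempt.Try;

public class TryAttemptExample {

    private static final ObjectMapper MAPPER = new ObjectMapper();

    // Stand-in for the real KeyBatchRequestEvent type.
    record KeyBatchRequestEvent(String startMarker, String location) {}

    static KeyBatchRequestEvent parse(String json) {
        // Wrap the throwing call; orElseThrow rethrows any failure unchecked.
        return Try.attempt(() -> json)
                   .map(value -> MAPPER.readValue(value, KeyBatchRequestEvent.class))
                   .orElseThrow();
    }

    public static void main(String[] args) {
        var event = parse("{\"startMarker\":null,\"location\":\"resources\"}");
        System.out.println(event.location());
    }
}
```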