11/* Copyright (C) 2016 NooBaa */
22'use strict' ;
33
4- const AWS = require ( 'aws-sdk' ) ;
54const system_store = require ( '../system_services/system_store' ) . get_instance ( ) ;
65const replication_store = require ( '../system_services/replication_store' ) . instance ( ) ;
76const cloud_utils = require ( '../../util/cloud_utils' ) ;
8- const http_utils = require ( '../../util/http_utils' ) ;
97const pool_server = require ( '../system_services/pool_server' ) ;
108const dbg = require ( '../../util/debug_module' ) ( __filename ) ;
119const config = require ( '../../../config' ) ;
1210const moment = require ( 'moment' ) ;
1311const { LogsQueryClient, LogsQueryResultStatus } = require ( "@azure/monitor-query" ) ;
1412const { ClientSecretCredential } = require ( "@azure/identity" ) ;
13+ const noobaa_s3_client = require ( '../../sdk/noobaa_s3_client/noobaa_s3_client' ) ;
1514
1615/**
1716 * get_log_candidates will return an object which contains the log candidates
@@ -61,7 +60,8 @@ async function get_aws_log_candidates(source_bucket_id, rule_id, replication_con
6160 }
6261
6362 const next_log_data = await _aws_get_next_log ( s3 , logs_bucket , next_log_entry . Contents [ 0 ] . Key ) ;
64- aws_parse_log_object ( logs , next_log_data , sync_deletions , obj_prefix_filter ) ;
63+ const log_string = await next_log_data . Body . transformToString ( ) ;
64+ await aws_parse_log_object ( logs , log_string , sync_deletions , obj_prefix_filter ) ;
6565
6666 dbg . log1 ( "get_aws_log_candidates: parsed logs " , logs ) ;
6767
@@ -227,6 +227,13 @@ function create_candidates(logs) {
227227 return candidates ;
228228}
229229
230+ /**
231+ * _aws_get_next_log will get the log object
232+ * @param {S3 } s3
233+ * @param {string } logs_bucket
234+ * @param {string } logs_prefix
235+ * @param {* } continuation_token
236+ */
230237async function aws_get_next_log_entry ( s3 , logs_bucket , logs_prefix , continuation_token ) {
231238 let start_after = logs_prefix ;
232239 if ( start_after && ! start_after . endsWith ( '/' ) ) {
@@ -243,7 +250,7 @@ async function aws_get_next_log_entry(s3, logs_bucket, logs_prefix, continuation
243250
244251 try {
245252 dbg . log2 ( 'log_parser aws_get_next_log_entry: params:' , params ) ;
246- const res = await s3 . listObjectsV2 ( params ) . promise ( ) ;
253+ const res = await s3 . listObjectsV2 ( params ) ;
247254 dbg . log1 ( 'log_parser aws_get_next_log_entry: finished successfully ' , res ) ;
248255 return res ;
249256
@@ -255,7 +262,7 @@ async function aws_get_next_log_entry(s3, logs_bucket, logs_prefix, continuation
255262
256263/**
257264 * _aws_get_next_log will get the log object
258- * @param {AWS. S3 } s3
265+ * @param {S3 } s3
259266 * @param {string } bucket
260267 * @param {string } key
261268 */
@@ -266,7 +273,7 @@ async function _aws_get_next_log(s3, bucket, key) {
266273 Bucket : bucket ,
267274 Key : key ,
268275 ResponseContentType : 'json'
269- } ) . promise ( ) ;
276+ } ) ;
270277
271278 dbg . log1 ( 'log_parser _aws_get_next_log: finished successfully ' , res ) ;
272279 return res ;
@@ -280,12 +287,11 @@ async function _aws_get_next_log(s3, bucket, key) {
280287/**
281288 * aws_parse_log_object will parse the log object and will return an array of candidates
282289 * @param {nb.ReplicationLogs } logs - Log array
283- * @param {* } log_object - AWS log object
290+ * @param {String } log_string - AWS log object
284291 * @param {boolean } sync_deletions - Whether deletions should be synced or not
285292 * @param {string } obj_prefix_filter - Object prefix filter
286293 */
287- function aws_parse_log_object ( logs , log_object , sync_deletions , obj_prefix_filter ) {
288- const log_string = log_object . Body . toString ( ) ;
294+ async function aws_parse_log_object ( logs , log_string , sync_deletions , obj_prefix_filter ) {
289295 const log_array = log_string . split ( "\n" ) ;
290296
291297 for ( const line of log_array ) {
@@ -362,25 +368,20 @@ function _get_source_bucket_azure_connection(source_bucket_id) {
362368
363369function _get_source_bucket_aws_connection ( source_bucket_id , aws_log_replication_info ) {
364370 const source_bucket = system_store . data . get_by_id ( source_bucket_id ) ;
365- const logs_location = aws_log_replication_info . logs_location ;
366- const { logs_bucket } = logs_location ;
367371
368372 const s3_resource_connection_info =
369373 pool_server . get_namespace_resource_extended_info ( source_bucket . namespace . write_resource . resource ) ;
370-
371- const agent = s3_resource_connection_info . endpoint_type === 'AWS' ?
372- http_utils . get_default_agent ( s3_resource_connection_info . endpoint ) :
373- http_utils . get_unsecured_agent ( s3_resource_connection_info . endpoint ) ;
374-
375- const s3 = new AWS . S3 ( {
376- params : { Bucket : logs_bucket } ,
374+ const s3 = noobaa_s3_client . get_s3_client_v3_params ( {
377375 endpoint : s3_resource_connection_info . endpoint ,
378- accessKeyId : s3_resource_connection_info . access_key . unwrap ( ) ,
379- secretAccessKey : s3_resource_connection_info . secret_key . unwrap ( ) ,
376+ credentials : {
377+ accessKeyId : s3_resource_connection_info . access_key . unwrap ( ) ,
378+ secretAccessKey : s3_resource_connection_info . secret_key . unwrap ( ) ,
379+ } ,
380+ forcePathStyle : true ,
381+ region : config . DEFAULT_REGION ,
380382 signatureVersion : cloud_utils . get_s3_endpoint_signature_ver ( s3_resource_connection_info . endpoint ,
381383 s3_resource_connection_info . auth_method ) ,
382- s3ForcePathStyle : true ,
383- httpOptions : { agent }
384+ requestHandler : noobaa_s3_client . get_requestHandler_with_suitable_agent ( s3_resource_connection_info . endpoint ) ,
384385 } ) ;
385386
386387 return s3 ;
0 commit comments