diff --git a/weather-data-ingestor-microservice/server.js b/weather-data-ingestor-microservice/server.js index 3869a10..da63fd3 100644 --- a/weather-data-ingestor-microservice/server.js +++ b/weather-data-ingestor-microservice/server.js @@ -1,35 +1,31 @@ -const express = require('express') +// const express = require('express') const controller = require('./api/controller'); console.log(controller) +const AWS = require('aws-sdk'); +var S3 = new AWS.S3({ region: 'us-east-1', maxRetries: 15}); - //init express - const app = express() - const port = 3001 - app.use(express.json()); - -// routes(app); - - app.listen(port, () => { - console.log(`Server is listening on port ${port}`) -}) - +var BucketConfig = { + Bucket: 'noaa-nexrad-level2', + Delimiter: '/', +}; // amqplib is a protocol for messaging . So use that var amqp = require('amqplib/callback_api'); //connect to the rabitmq server -amqp.connect('amqp://localhost', function(error0, connection) { +amqp.connect('amqp://orionRabbit', function(error0, connection) { if (error0) { throw error0; } + console.log("Connection established with rabbitMQ"); // create channel ,establiish connection and declare the queue connection.createChannel(function(error1, channel) { if (error1) { throw error1; } - var queue = 'rpc_queue_1'; - + console.log("Channel connected"); + var queue = 'ingestor_rx'; channel.assertQueue(queue, { durable: false }); @@ -37,25 +33,47 @@ amqp.connect('amqp://localhost', function(error0, connection) { channel.prefetch(1); console.log(' [x] Awaiting RPC requests'); channel.consume(queue, function reply(msg) { + var userInput = JSON.parse(msg.content.toString()); + var ingestorRes; + const [hour, time] = userInput.time.split(":"); + const datacenter = userInput.datacenter; + const [year, month, day] = userInput.date.split("-") + BucketConfig.Prefix = `${year}/${month}/${day}/${datacenter}/`; - // var n = parseInt(msg.content.toString()); - var n = parseInt(msg.content); - - // console.log(" [.] fib(%d)", n); - console.log(n); + S3.makeUnauthenticatedRequest('listObjects', BucketConfig, (err, data) => { + if (err) { + console.log(err); + ingestorRes = {error: "500"}; + } + else { + const result = []; - var r = controller.fib(n); - console.log(r) - // var r = fibonacci(n); // function call. need to check with anita - console.log("msg"+msg.content.toJSON()); - var r= msg.content; - console.log(r.toJSON()); - channel.sendToQueue(msg.properties.replyTo, - Buffer.from(r.toString()), { - correlationId: msg.properties.correlationId - }); + if (data) { + const { Contents } = data; + if (Contents.length) { + var hour_ = hour.substring(0,2); + Contents + .forEach(({ Key }) => { + const [, hourString] = Key.split("_"); + const hr = hourString.substring(0, 2); + if (hr === hour_) { + result.push(Key) + } + }) + } + } + ingestorRes = result.slice(0, 4); + console.log(ingestorRes); - channel.ack(msg); + let stringData = JSON.stringify(ingestorRes); + channel.sendToQueue(msg.properties.replyTo, + Buffer.from(stringData), { + correlationId: msg.properties.correlationId + }); + console.log("Sent this back:", stringData); + channel.ack(msg); + } + }) }); }); });