Skip to content

Commit

Permalink
Rabbit mq- RPC message pattern working #40
Browse files Browse the repository at this point in the history
  • Loading branch information
ramya committed Mar 4, 2022
1 parent fdb83a7 commit 4a1b859
Showing 1 changed file with 49 additions and 31 deletions.
80 changes: 49 additions & 31 deletions weather-data-ingestor-microservice/server.js
Original file line number Diff line number Diff line change
@@ -1,61 +1,79 @@

const express = require('express')
// const express = require('express')
const controller = require('./api/controller');
console.log(controller)
const AWS = require('aws-sdk');
var S3 = new AWS.S3({ region: 'us-east-1', maxRetries: 15});

//init express
const app = express()
const port = 3001
app.use(express.json());

// routes(app);

app.listen(port, () => {
console.log(`Server is listening on port ${port}`)
})

var BucketConfig = {
Bucket: 'noaa-nexrad-level2',
Delimiter: '/',
};

// amqplib is a protocol for messaging . So use that
var amqp = require('amqplib/callback_api');
//connect to the rabitmq server

amqp.connect('amqp://localhost', function(error0, connection) {
amqp.connect('amqp://orionRabbit', function(error0, connection) {
if (error0) {
throw error0;
}
console.log("Connection established with rabbitMQ");
// create channel ,establiish connection and declare the queue
connection.createChannel(function(error1, channel) {
if (error1) {
throw error1;
}
var queue = 'rpc_queue_1';

console.log("Channel connected");
var queue = 'ingestor_rx';
channel.assertQueue(queue, {
durable: false
});
// to spread the load equally over multiple servers we need to set the prefetch setting on the channel
channel.prefetch(1);
console.log(' [x] Awaiting RPC requests');
channel.consume(queue, function reply(msg) {
var userInput = JSON.parse(msg.content.toString());
var ingestorRes;
const [hour, time] = userInput.time.split(":");
const datacenter = userInput.datacenter;
const [year, month, day] = userInput.date.split("-")
BucketConfig.Prefix = `${year}/${month}/${day}/${datacenter}/`;

// var n = parseInt(msg.content.toString());
var n = parseInt(msg.content);

// console.log(" [.] fib(%d)", n);
console.log(n);
S3.makeUnauthenticatedRequest('listObjects', BucketConfig, (err, data) => {
if (err) {
console.log(err);
ingestorRes = {error: "500"};
}
else {
const result = [];

var r = controller.fib(n);
console.log(r)
// var r = fibonacci(n); // function call. need to check with anita
console.log("msg"+msg.content.toJSON());
var r= msg.content;
console.log(r.toJSON());
channel.sendToQueue(msg.properties.replyTo,
Buffer.from(r.toString()), {
correlationId: msg.properties.correlationId
});
if (data) {
const { Contents } = data;
if (Contents.length) {
var hour_ = hour.substring(0,2);
Contents
.forEach(({ Key }) => {
const [, hourString] = Key.split("_");
const hr = hourString.substring(0, 2);
if (hr === hour_) {
result.push(Key)
}
})
}
}
ingestorRes = result.slice(0, 4);
console.log(ingestorRes);

channel.ack(msg);
let stringData = JSON.stringify(ingestorRes);
channel.sendToQueue(msg.properties.replyTo,
Buffer.from(stringData), {
correlationId: msg.properties.correlationId
});
console.log("Sent this back:", stringData);
channel.ack(msg);
}
})
});
});
});
Expand Down

0 comments on commit 4a1b859

Please sign in to comment.