-
Notifications
You must be signed in to change notification settings - Fork 34
Introduce persistence layer for the broker #106
Conversation
a32b55d
to
bb4d246
Compare
bb4d246
to
6cefbd9
Compare
0489b8c
to
b704394
Compare
…next development iteration
…next development iteration
0a5b8af
to
4cee110
Compare
} | ||
|
||
public ByteBuf getRawMetadata() { | ||
return rawMetadata; | ||
public boolean hasAttachedQueues() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shall we use a single word denote the attached queues (attached or owned)?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Used the word attached in all the places
@@ -56,15 +55,15 @@ public void basicPublishReceived(ShortString routingKey, ShortString exchangeNam | |||
|
|||
/** | |||
* Add the header frame that gives the relevant metadata for the given message. | |||
* | |||
* @param rawMetadata unprocessed raw metadata {@link ByteBuf} | |||
* @param headers protocol specific headers |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's fix the spacing issue.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed!
* | ||
* @param rawMetadata unprocessed raw metadata {@link ByteBuf} | ||
* @param headers protocol specific headers | ||
* @param properties |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's define what the properties mean
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done.
4cee110
to
84a2c1f
Compare
|
||
void bind(Queue queue, String routingKey, FieldTable arguments) throws BrokerException; | ||
public BindingDao getBindingDao() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This method is unused. Maybe we can get rid of it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Method removed.
previousTimestamp = ts; | ||
long id = (ts - REFERENCE_START) * 256L * 1024L + instanceId * 1024L + offset; | ||
if (previousId == id) { | ||
throw new RuntimeException("duplicate ids detected. This should never happen"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
better to use "Duplicate"
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed.
d0928c3
to
8527888
Compare
} | ||
// Unique queues can be empty due unmatching selectors. | ||
if (!uniqueQueues.isEmpty()) { | ||
boolean published = publishToQueues(message, uniqueQueues); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need a local variable for this? Can't directly use it on if?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated the code segment.
8527888
to
91baa09
Compare
|
||
if (passive && queueHandler == null) { | ||
throw new BrokerException("QueueHandler [ " + queueName + " ] doesn't exists. Passive parameter " + | ||
"is set, hence not creating the queue."); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+
should precede here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed.
38f5944
to
7d54046
Compare
<!--<classes>--> | ||
<!--<class name="org.wso2.messaging.integration.QueueConsumerTest"/>--> | ||
<!--</classes>--> | ||
<!--</test>--> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shall we remove this part since we have it above?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed
<fileSet> | ||
<directory>target/database/</directory> | ||
<outputDirectory>database</outputDirectory> | ||
<fileMode>755</fileMode> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need to set the file mode to 755?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changed to 644
d124b2b
to
3579d50
Compare
- Implement database backed queue with message persist and delete operations.
- Update broker metadata implementation with FieldTables to handle message properties and headers. - Implement database backed queue message restoration at broker startup.
Send an error to the client and log the exception when the client request to close a consumer with an invalid consumer tag.
3579d50
to
7d407f0
Compare
Purpose
Purpose of this feature is to introduce persistence of Messages, Exchanges, Queues and Bindings so that messages won't get lost on server failure.
Resolves #24, Resolves #105, Resolves #93, Resolves #79, Resolves #69
Goals
With this PR we are introducing RDBMS based message persistence layer to persist Messages, Exchanges, Queues and Bindings
Approach
Following key improvements are done
User stories
N/A
Release note
RDBMS based persistent layer. Supports standard SQL based DBMS
Documentation
Training
Certification
Marketing
Automation tests
Yes
Security checks
No
Samples
Related PRs
N/A
Migrations (if applicable)