Skip to content

Commit 025b48f

Browse files
committed
QPID-8203: [Broker-J][AMQP 0-9] Fix maximum message size check
1 parent 560d4a3 commit 025b48f

File tree

2 files changed

+87
-1
lines changed
  • broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8
  • systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/extension/maxsize

2 files changed

+87
-1
lines changed

broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -2217,7 +2217,10 @@ public void receiveMessageHeader(final BasicContentHeaderProperties properties,
22172217
closeChannel(ErrorCodes.MESSAGE_TOO_LARGE,
22182218
"Message size of " + bodySize + " greater than allowed maximum of " + _connection.getMaxMessageSize());
22192219
}
2220-
publishContentHeader(new ContentHeaderBody(properties, bodySize));
2220+
else
2221+
{
2222+
publishContentHeader(new ContentHeaderBody(properties, bodySize));
2223+
}
22212224
}
22222225
else
22232226
{
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
/*
2+
*
3+
* Licensed to the Apache Software Foundation (ASF) under one
4+
* or more contributor license agreements. See the NOTICE file
5+
* distributed with this work for additional information
6+
* regarding copyright ownership. The ASF licenses this file
7+
* to you under the Apache License, Version 2.0 (the
8+
* "License"); you may not use this file except in compliance
9+
* with the License. You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing,
14+
* software distributed under the License is distributed on an
15+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16+
* KIND, either express or implied. See the License for the
17+
* specific language governing permissions and limitations
18+
* under the License.
19+
*
20+
*/
21+
package org.apache.qpid.tests.protocol.v0_8.extension.maxsize;
22+
23+
import static org.apache.qpid.tests.utils.BrokerAdmin.KIND_BROKER_J;
24+
import static org.hamcrest.CoreMatchers.equalTo;
25+
import static org.hamcrest.CoreMatchers.is;
26+
import static org.hamcrest.MatcherAssert.assertThat;
27+
28+
import java.net.InetSocketAddress;
29+
import java.util.stream.Collectors;
30+
import java.util.stream.Stream;
31+
32+
import org.junit.Before;
33+
import org.junit.Test;
34+
35+
import org.apache.qpid.server.protocol.v0_8.transport.ChannelCloseBody;
36+
import org.apache.qpid.server.protocol.v0_8.transport.ChannelOpenOkBody;
37+
import org.apache.qpid.server.protocol.v0_8.transport.ConnectionCloseOkBody;
38+
import org.apache.qpid.tests.protocol.ChannelClosedResponse;
39+
import org.apache.qpid.tests.protocol.v0_8.FrameTransport;
40+
import org.apache.qpid.tests.protocol.v0_8.Interaction;
41+
import org.apache.qpid.tests.utils.BrokerAdmin;
42+
import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
43+
import org.apache.qpid.tests.utils.BrokerSpecific;
44+
import org.apache.qpid.tests.utils.ConfigItem;
45+
46+
@BrokerSpecific(kind = KIND_BROKER_J)
47+
@ConfigItem(name = "qpid.max_message_size", value = "1000")
48+
public class MaximumMessageSize extends BrokerAdminUsingTestBase
49+
{
50+
private InetSocketAddress _brokerAddress;
51+
52+
@Before
53+
public void setUp()
54+
{
55+
_brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
56+
getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
57+
}
58+
59+
@Test
60+
public void limitExceeded() throws Exception
61+
{
62+
String content = Stream.generate(() -> String.valueOf('.')).limit(1001).collect(Collectors.joining());
63+
try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
64+
{
65+
final Interaction interaction = transport.newInteraction();
66+
interaction.openAnonymousConnection()
67+
.channel().open().consumeResponse(ChannelOpenOkBody.class)
68+
.basic().contentHeaderPropertiesContentType("text/plain")
69+
.contentHeaderPropertiesDeliveryMode((byte)1)
70+
.contentHeaderPropertiesPriority((byte)1)
71+
.publishExchange("")
72+
.publishRoutingKey(BrokerAdmin.TEST_QUEUE_NAME)
73+
.content(content)
74+
.publishMessage()
75+
.consumeResponse(ChannelCloseBody.class)
76+
.channel().closeOk()
77+
.connection().close()
78+
.consumeResponse(ConnectionCloseOkBody.class, ChannelClosedResponse.class);
79+
80+
assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(0)));
81+
}
82+
}
83+
}

0 commit comments

Comments
 (0)