Skip to content

Commit

Permalink
CXF-9057: Chunked Stream is closed regularly when Exception is thrown (
Browse files Browse the repository at this point in the history
  • Loading branch information
reta authored Dec 1, 2024
1 parent 8684601 commit ff6cf7b
Show file tree
Hide file tree
Showing 8 changed files with 585 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -119,11 +119,13 @@ public void handleMessage(Message message) {
AttachmentSerializer ser = message.getContent(AttachmentSerializer.class);
if (ser != null) {
try {
message.put(Message.PARTIAL_ATTACHMENTS_MESSAGE, true);
String cte = (String)message.getContextualProperty(Message.CONTENT_TRANSFER_ENCODING);
if (cte != null) {
ser.setContentTransferEncoding(cte);
}
ser.writeAttachments();
message.put(Message.PARTIAL_ATTACHMENTS_MESSAGE, false);
} catch (IOException e) {
throw new Fault(new org.apache.cxf.common.i18n.Message("WRITE_ATTACHMENTS", BUNDLE), e);
}
Expand Down
7 changes: 7 additions & 0 deletions core/src/main/java/org/apache/cxf/message/Message.java
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,13 @@ public interface Message extends StringMap {
String EMPTY_PARTIAL_RESPONSE_MESSAGE = "org.apache.cxf.partial.response.empty";
String ONE_WAY_REQUEST = "OnewayRequest";

/**
* Boolean property specifying if the attachments have been partially written
* (due to I/O error, fe).
*/
String PARTIAL_ATTACHMENTS_MESSAGE = "org.apache.cxf.partial.attachments";


/**
* Boolean property specifying if oneWay response must be processed.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
import org.apache.cxf.common.logging.LogUtils;
import org.apache.cxf.common.util.StringUtils;
import org.apache.cxf.interceptor.Fault;
import org.apache.cxf.message.Message;
import org.apache.cxf.message.MessageUtils;
import org.apache.cxf.phase.Phase;
import org.apache.cxf.staxutils.StaxUtils;

Expand All @@ -61,9 +63,16 @@ static class Soap11FaultOutInterceptorInternal extends AbstractSoapInterceptor {
super(Phase.MARSHAL);
}
public void handleMessage(SoapMessage message) throws Fault {
XMLStreamWriter writer = message.getContent(XMLStreamWriter.class);
Fault f = (Fault) message.getContent(Exception.class);

// If only some attachments have been written (usually, using chunked transfer), we could
// have been streaming some data already and may not be able to inject a fault in the middle
// of the data transfer.
if (MessageUtils.getContextualBoolean(message, Message.PARTIAL_ATTACHMENTS_MESSAGE, false)) {
throw f;
}

XMLStreamWriter writer = message.getContent(XMLStreamWriter.class);
SoapFault fault = SoapFault.createFault(f, message.getVersion());

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.cxf.common.util.StringUtils;
import org.apache.cxf.interceptor.Fault;
import org.apache.cxf.message.Message;
import org.apache.cxf.message.MessageUtils;
import org.apache.cxf.phase.Phase;
import org.apache.cxf.staxutils.StaxUtils;

Expand Down Expand Up @@ -64,9 +65,16 @@ static class Soap12FaultOutInterceptorInternal extends AbstractSoapInterceptor {
}
public void handleMessage(SoapMessage message) throws Fault {
LOG.info(getClass() + (String) message.get(Message.CONTENT_TYPE));
Fault f = (Fault)message.getContent(Exception.class);

// If only some attachments have been written (usually, using chunked transfer), we could
// have been streaming some data already and may not be able to inject a fault in the middle
// of the data transfer.
if (MessageUtils.getContextualBoolean(message, Message.PARTIAL_ATTACHMENTS_MESSAGE, false)) {
throw f;
}

XMLStreamWriter writer = message.getContent(XMLStreamWriter.class);
Fault f = (Fault)message.getContent(Exception.class);
message.put(org.apache.cxf.message.Message.RESPONSE_CODE, f.getStatusCode());

SoapFault fault = SoapFault.createFault(f, message.getVersion());
Expand Down
12 changes: 12 additions & 0 deletions systests/jaxws/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,18 @@
<goal>wsdl2java</goal>
</goals>
</execution>
<execution>
<id>generate-attachments-test-sources</id>
<phase>generate-test-sources</phase>
<configuration>
<fork>${cxf.codegenplugin.forkmode}</fork>
<testSourceRoot>${basedir}/target/generated/src/test/java</testSourceRoot>
<testWsdlRoot>${basedir}/src/test/resources/attachments</testWsdlRoot>
</configuration>
<goals>
<goal>wsdl2java</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.cxf.systest.jaxws;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Arrays;
import java.util.concurrent.ThreadLocalRandom;
import java.util.logging.Logger;

import jakarta.activation.DataHandler;
import jakarta.activation.DataSource;
import jakarta.xml.ws.Binding;
import jakarta.xml.ws.BindingProvider;
import jakarta.xml.ws.Endpoint;
import jakarta.xml.ws.soap.SOAPBinding;
import jakarta.xml.ws.soap.SOAPFaultException;
import org.apache.cxf.Download;
import org.apache.cxf.DownloadFault_Exception;
import org.apache.cxf.DownloadNextResponseType;
import org.apache.cxf.common.logging.LogUtils;
import org.apache.cxf.jaxws.JaxWsProxyFactoryBean;
import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase;
import org.apache.cxf.testutil.common.AbstractBusTestServerBase;

import org.junit.BeforeClass;
import org.junit.Test;

import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;

public class AttachmentChunkingTest extends AbstractBusClientServerTestBase {
private static final String PORT = allocatePort(DownloadServer.class);
private static final Logger LOG = LogUtils.getLogger(AttachmentChunkingTest.class);

private static final class DownloadImpl implements Download {
@Override
public DownloadNextResponseType downloadNext(Boolean simulate) {
final DownloadNextResponseType responseType = new DownloadNextResponseType();
responseType.setDataContent(new DataHandler(new DataSource() {
@Override
public InputStream getInputStream() {
if (simulate) {
return simulate();
} else {
return generate(100000);
}
}

@Override
public OutputStream getOutputStream() {
return null;
}

@Override
public String getContentType() {
return "";
}

@Override
public String getName() {
return "";
}
}));

return responseType;
}
}

public static class DownloadServer extends AbstractBusTestServerBase {
protected void run() {
Object implementor = new DownloadImpl();
String address = "http://localhost:" + PORT + "/SoapContext/SoapPort";
Endpoint.publish(address, implementor);
}

public static void main(String[] args) {
try {
DownloadServer s = new DownloadServer();
s.start();
} catch (Exception ex) {
ex.printStackTrace();
System.exit(-1);
} finally {
LOG.info("done!");
}
}
}

@BeforeClass
public static void startServers() throws Exception {
assertTrue("server did not launch correctly", launchServer(DownloadServer.class, true));
}

@Test
public void testChunkingPartialFailure() {
final JaxWsProxyFactoryBean factory = new JaxWsProxyFactoryBean();
factory.setServiceClass(Download.class);

final Download client = (Download) factory.create();
final BindingProvider bindingProvider = (BindingProvider) client;
final Binding binding = bindingProvider.getBinding();

final String address = String.format("http://localhost:%s/SoapContext/SoapPort/DownloadPort", PORT);
bindingProvider.getRequestContext().put("jakarta.xml.ws.service.endpoint.address", address);
((SOAPBinding) binding).setMTOMEnabled(true);

// See please https://issues.apache.org/jira/browse/CXF-9057
SOAPFaultException ex = assertThrows(SOAPFaultException.class, () -> client.downloadNext(true));
assertThat(ex.getMessage(), containsString("simulated error during stream processing"));
}

@Test
public void testChunking() throws IOException, DownloadFault_Exception {
final JaxWsProxyFactoryBean factory = new JaxWsProxyFactoryBean();
factory.setServiceClass(Download.class);

final Download client = (Download) factory.create();
final BindingProvider bindingProvider = (BindingProvider) client;
final Binding binding = bindingProvider.getBinding();

final String address = String.format("http://localhost:%s/SoapContext/SoapPort/DownloadPort", PORT);
bindingProvider.getRequestContext().put("jakarta.xml.ws.service.endpoint.address", address);
((SOAPBinding) binding).setMTOMEnabled(true);

final DownloadNextResponseType response = client.downloadNext(false);
assertThat(response.getDataContent().getInputStream().readAllBytes().length, equalTo(100000));
}

private static InputStream generate(int size) {
final byte[] buf = new byte[size];
Arrays.fill(buf, (byte) 'x');
return new ByteArrayInputStream(buf);
}

private static InputStream simulate() {
return new InputStream() {
@Override
public int read() {
return (byte) 'x';
}

@Override
public int read(byte[] b, int off, int len) {
if (ThreadLocalRandom.current().nextBoolean()) {
throw new IllegalArgumentException("simulated error during stream processing");
}

for (int i = off; i < off + len; i++) {
b[i] = (byte) 'x';
}

return len;
}
};
}
}
84 changes: 84 additions & 0 deletions systests/jaxws/src/test/resources/attachments/cxf9057.wsdl
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
<?xml version="1.0" encoding="UTF-8"?>
<wsdl:definitions name="Download" targetNamespace="http://cxf.apache.org/"
xmlns:wsdl="http://schemas.xmlsoap.org/wsdl/"
xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:tns="http://cxf.apache.org/"
xmlns:soap="http://schemas.xmlsoap.org/wsdl/soap/"
xmlns:wsam="http://www.w3.org/2007/05/addressing/metadata">
<wsdl:types>
<xs:schema xmlns:tns="http://cxf.apache.org/"
xmlns:xmime="http://www.w3.org/2005/05/xmlmime"
xmlns:xs="http://www.w3.org/2001/XMLSchema" attributeFormDefault="unqualified"
elementFormDefault="unqualified" targetNamespace="http://cxf.apache.org/">

<xs:import namespace="http://www.w3.org/2005/05/xmlmime"/>
<xs:element name="downloadNext" type="tns:downloadNext"/>
<xs:element name="downloadNextResponse" type="tns:downloadNextResponse"/>
<xs:complexType name="downloadNextResponseType">
<xs:sequence>
<xs:element name="dataContent" type="xs:base64Binary"
xmime:expectedContentTypes="application/octet-stream"/>
</xs:sequence>
</xs:complexType>
<xs:complexType name="downloadNext">
<xs:sequence>
<xs:element minOccurs="0" name="simulate" type="xs:boolean"/>
</xs:sequence>
</xs:complexType>
<xs:complexType name="downloadNextResponse">
<xs:sequence>
<xs:element form="qualified" minOccurs="0" name="downloadNextResponse"
type="tns:downloadNextResponseType"/>
</xs:sequence>
</xs:complexType>
<xs:element name="DownloadFault" type="tns:DownloadFault"/>
<xs:complexType name="DownloadFault">
<xs:sequence/>
</xs:complexType>
</xs:schema>
</wsdl:types>
<wsdl:message name="DownloadFault">
<wsdl:part name="DownloadFault" element="tns:DownloadFault">
</wsdl:part>
</wsdl:message>
<wsdl:message name="downloadNextResponse">
<wsdl:part name="parameters" element="tns:downloadNextResponse">
</wsdl:part>
</wsdl:message>
<wsdl:message name="downloadNext">
<wsdl:part name="parameters" element="tns:downloadNext">
</wsdl:part>
</wsdl:message>
<wsdl:portType name="Download">
<wsdl:operation name="downloadNext">
<wsdl:input name="downloadNext" message="tns:downloadNext"
wsam:Action="http://cxf.apache.org/downloadNext">
</wsdl:input>
<wsdl:output name="downloadNextResponse" message="tns:downloadNextResponse"
wsam:Action="http://cxf.apache.org/Download/downloadNextResponse">
</wsdl:output>
<wsdl:fault name="DownloadFault" message="tns:DownloadFault"
wsam:Action="http://cxf.apache.org/Download/downloadNext/Fault/DownloadFault">
</wsdl:fault>
</wsdl:operation>
</wsdl:portType>
<wsdl:binding name="DownloadServiceInterfaceServiceSoapBinding" type="tns:Download">
<soap:binding style="document" transport="http://schemas.xmlsoap.org/soap/http"/>
<wsdl:operation name="downloadNext">
<soap:operation soapAction="http://cxf.apache.org/downloadNext" style="document"/>
<wsdl:input name="downloadNext">
<soap:body use="literal"/>
</wsdl:input>
<wsdl:output name="downloadNextResponse">
<soap:body use="literal"/>
</wsdl:output>
<wsdl:fault name="DownloadFault">
<soap:fault name="DownloadFault" use="literal"/>
</wsdl:fault>
</wsdl:operation>
</wsdl:binding>
<wsdl:service name="DownloadServiceInterfaceService">
<wsdl:port name="DownloadPort" binding="tns:DownloadServiceInterfaceServiceSoapBinding">
<soap:address location="http://localhost:9090/DownloadPort"/>
</wsdl:port>
</wsdl:service>
</wsdl:definitions>
Loading

0 comments on commit ff6cf7b

Please sign in to comment.