Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

allow to write response in multiple threads in order instead of fixed thread #12

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 26 additions & 16 deletions httpserver/src/main/java/esa/httpserver/core/Response.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,32 @@
/**
* An interface defines a http server response.
* <p>
* !Note: It would be uncompleted state which means current {@link Response} haven't been completely written(eg. only
* part of the body is written, and waiting to write the left.) and it could be indicated by the return value of {@link
* #isCommitted()} {@link #isEnded()}.
* !Note: It would be uncompleted state which means current {@link Response} hasn't been completely written(eg. only
* part of the body was written, and waiting to write the left. and it could be indicated by the return values of {@link
* #isCommitted()} and {@link #isEnded()}.
* <p>
* Also, you should know that this response is not designed thread-safe, but it does a special committing control that
* only one thread is allowed to commit this response, which means you can not commit this response by calling {@link
* #write} or {@link #end} method if another thread had committed this response(such as calling the {@link #write}
* method).
* Also, you should know that this response is not designed as thread-safe, but it does a special committing control:
* <p>
* <strong>Every call of writes should be kept in order.</strong>
* </p>
* which means it is not allowed to call {@link #write(byte[])} or {@link #end(byte[])} concurrently.
* Simply put, you must keep your writes in order, whatever these writes are in one thread or different threads.
* <p>
* Typically, write response in one thread
* <pre>{@code
* response.write("foo".getBytes());
* response.write("bar".getBytes());
* res.end("baz".getBytes());
* }</pre>
* <p>
* Write response asynchronously
* <pre>{@code
* response.write("foo".getBytes()).addListener(f -> {
* response.write("bar".getBytes()).addListener(f1 -> {
* response.end("baz".getBytes());
* });
* });
* }</pre>
*/
public interface Response {

Expand All @@ -53,7 +71,6 @@ public interface Response {
* Set the response code
*
* @param code code
*
* @return this
*/
Response setStatus(int code);
Expand All @@ -76,7 +93,6 @@ public interface Response {
* Adds the specified cookie to the response. This method can be called multiple times to set more than one cookie.
*
* @param cookie the Cookie to return to the client
*
* @return this
*/
Response addCookie(Cookie cookie);
Expand All @@ -86,7 +102,6 @@ public interface Response {
*
* @param name cookie name
* @param value value
*
* @return this
*/
Response addCookie(String name, String value);
Expand Down Expand Up @@ -121,7 +136,6 @@ default Future<Void> write(byte[] data, int offset) {
* @param data data to write
* @param offset offset
* @param length length
*
* @return future
*/
Future<Void> write(byte[] data, int offset, int length);
Expand All @@ -135,8 +149,7 @@ default Future<Void> write(byte[] data, int offset) {
* {@link ByteBuf} allocation/deallocation
*
* @param data data to write
*
* @return future
* 3 * @return future
*/
Future<Void> write(ByteBuf data);

Expand All @@ -163,7 +176,6 @@ default Future<Void> end(byte[] data, int offset) {
* @param data data to write
* @param offset offset
* @param length length
*
* @return future
*/
Future<Void> end(byte[] data, int offset, int length);
Expand All @@ -188,7 +200,6 @@ default Future<Void> end() {
* {@link ByteBuf} allocation/deallocation
*
* @param data data to write
*
* @return future
*/
Future<Void> end(ByteBuf data);
Expand All @@ -200,7 +211,6 @@ default Future<Void> end() {
* URI. otherwise it will be regarded as a relative path to root path.
*
* @param newUri target uri
*
* @return future
*/
default Future<Void> sendRedirect(String newUri) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ private void checkEnded() {
@Override
public String toString() {
return StringUtils.concat("Request",
response().isCommitted() ? "![" : "-[",
isEnded() ? "![" : "-[",
version().name(),
" ", rawMethod(),
" ", path(),
Expand Down
145 changes: 95 additions & 50 deletions httpserver/src/main/java/esa/httpserver/impl/BaseResponse.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,23 @@ abstract class BaseResponse<REQ extends BaseRequestHandle> implements Response {
final ChannelPromise onEndPromise;
final ChannelPromise endPromise;
int status = 200;
private volatile Thread committed;
private volatile boolean ended;

private static final AtomicReferenceFieldUpdater<BaseResponse, Thread> COMMITTED_UPDATER =
AtomicReferenceFieldUpdater.newUpdater(BaseResponse.class, Thread.class, "committed");
/**
* Indicates the committing status of current response.
*
* <ul>
* <li>{@code null}: INIT, response hasn't been committed or ended yet.</li>
* <li>An instance {@link Thread}: committing, response is being written by this thread and response has been
* committed but hasn't been ended.</li>
* <li>{@link #IDLE}: IDLE, response has been committed but hasn't been ended. and can be written now.</li>
* <li>{@link #IDLE}: END, response has been ended. and can not be written.</li>
* </ul>
*/
private volatile Object committed;

private static final AtomicReferenceFieldUpdater<BaseResponse, Object> COMMITTED_UPDATER =
AtomicReferenceFieldUpdater.newUpdater(BaseResponse.class, Object.class, "committed");
private static final Object END = new Object();
private static final Object IDLE = new Object();

BaseResponse(REQ req) {
this.request = req;
Expand All @@ -68,9 +80,6 @@ public int status() {

@Override
public Response setStatus(int code) {
if (isCommitted()) {
return this;
}
this.status = code;
return this;
}
Expand Down Expand Up @@ -119,23 +128,31 @@ public Future<Void> write(byte[] data, int offset, int length) {
}

private Future<Void> write0(byte[] data, int offset, int length) {
final boolean casWon = ensureCommittedExclusively();
if (!casWon && isEnded()) {
throw new IllegalStateException("Already ended");
boolean writeHead = ensureCommitExclusively();
try {
return doWrite(data, offset, length, writeHead);
} finally {
// set to IDlE state to indicates that the current write has been over and the next write is allowed.
// we can use lazySet() because the IDLE value set here can be observed by other thread if the call of
// write is kept in order which needs some means to ensure the memory visibility.
COMMITTED_UPDATER.lazySet(this, IDLE);
}
return doWrite(data, offset, length, casWon);
}

@Override
public Future<Void> write(ByteBuf data) {
if (data == null) {
data = Unpooled.EMPTY_BUFFER;
}
final boolean casWon = ensureCommittedExclusively();
if (!casWon && isEnded()) {
throw new IllegalStateException("Already ended");
boolean writeHead = ensureCommitExclusively();
try {
return doWrite(data, writeHead);
} finally {
// set to IDlE to indicates that the current write has been over and the next write is allowed.
// we can use lazySet() because the IDLE value set here can be observed by other thread if the call of
// write is kept in order which needs some means to ensure the memory visibility.
COMMITTED_UPDATER.lazySet(this, IDLE);
}
return doWrite(data, casWon);
}

@Override
Expand Down Expand Up @@ -166,9 +183,9 @@ public Future<Void> end(byte[] data, int offset, int length) {
}

private Future<Void> end0(byte[] data, int offset, int length) {
final boolean casWon = ensureEndedExclusively();
final boolean writeHead = ensureEndExclusively(true);
try {
doEnd(data, offset, length, casWon);
doEnd(data, offset, length, writeHead);
} finally {
if (!isKeepAlive()) {
endPromise.addListener(closure());
Expand All @@ -183,9 +200,9 @@ public Future<Void> end(ByteBuf data) {
if (data == null) {
data = Unpooled.EMPTY_BUFFER;
}
final boolean casWon = ensureEndedExclusively();
final boolean writeHead = ensureEndExclusively(true);
try {
doEnd(data, casWon, false);
doEnd(data, writeHead, false);
} finally {
if (!isKeepAlive()) {
endPromise.addListener(closure());
Expand All @@ -205,9 +222,8 @@ public Future<Void> sendFile(File file, long offset, long length) {
ExceptionUtils.throwException(new FileNotFoundException(file.getName()));
}

if (!ensureEndedExclusively()) {
throw new IllegalStateException("Already committed");
}
ensureEndExclusively(false);

if (!headers().contains(CONTENT_TYPE)) {
String contentType = MimeMappings.getMimeTypeOrDefault(file.getPath());
headers().set(CONTENT_TYPE, contentType);
Expand All @@ -232,12 +248,12 @@ public boolean isWritable() {

@Override
public boolean isCommitted() {
return committed != null;
return COMMITTED_UPDATER.get(this) != null;
}

@Override
public boolean isEnded() {
return ended;
return COMMITTED_UPDATER.get(this) == END;
}

@Override
Expand All @@ -257,35 +273,70 @@ public ByteBufAllocator alloc() {

@Override
public String toString() {
Object committed = COMMITTED_UPDATER.get(this);
String committedStatus;
if (committed == null) {
committedStatus = "-";
} else if (committed == END) {
committedStatus = "!";
} else {
committedStatus = "~";
}
return StringUtils.concat("Response",
isCommitted() ? "![" : "-[",
committedStatus, "[",
request.rawMethod(),
" ", request.path(),
" ", Integer.toString(status),
"]");
}

boolean ensureEndedExclusively() {
boolean committed = ensureCommittedExclusively();
if (ended) {
boolean ensureCommitExclusively() {
final Object current = COMMITTED_UPDATER.get(this);
if (current == null) {
// INIT
final Thread t = Thread.currentThread();
if (COMMITTED_UPDATER.compareAndSet(this, null, t)) {
return true;
}
throw new IllegalStateException("Concurrent committing['INIT' -> '" + t.getName() + "']");
} else if (current == END) {
// END
throw new IllegalStateException("Already ended");
} else if (current == IDLE) {
// IDLE
final Thread t = Thread.currentThread();
if (COMMITTED_UPDATER.compareAndSet(this, IDLE, t)) {
return false;
}
throw new IllegalStateException("Concurrent committing['IDLE' -> '" + t.getName() + "']");
} else {
// current response is being committing by another thread
throw new IllegalStateException("Concurrent committing ['" + ((Thread) current).getName() +
"' -> '" + Thread.currentThread().getName() + "']");
}
ended = true;
return committed;
}

boolean ensureCommittedExclusively() {
final Thread current = Thread.currentThread();
if (COMMITTED_UPDATER.compareAndSet(this, null, current)) {
return true;
}

if (current == committed) {
return false;
boolean ensureEndExclusively(boolean allowCommitted) {
final Object current = COMMITTED_UPDATER.get(this);
if (current == null) {
// INIT
if (COMMITTED_UPDATER.compareAndSet(this, null, END)) {
return true;
}
throw new IllegalStateException("Concurrent ending['INIT' -> 'END']");
} else if (current == IDLE) {
// IDLE
if (allowCommitted && COMMITTED_UPDATER.compareAndSet(this, IDLE, END)) {
return false;
}
throw new IllegalStateException("Concurrent ending['IDLE' -> 'END']");
} else if (current == END) {
// END
throw new IllegalStateException("Already ended");
} else {
// current response is being committing by another thread
throw new IllegalStateException("Concurrent ending['" + ((Thread) current).getName() + "' -> 'END']");
}

throw new IllegalStateException("Should be committed by same thread. expected '"
+ committed.getName() + "' but '" + current.getName() + "'");
}

ChannelHandlerContext ctx() {
Expand Down Expand Up @@ -321,14 +372,8 @@ boolean tryEnd(HttpResponseStatus status,
}

private boolean tryEnd() {
if (committed == null) {
final Thread current = Thread.currentThread();
if (COMMITTED_UPDATER.compareAndSet(this, null, current)) {
ended = true;
return true;
}
}
return false;
final Object current = COMMITTED_UPDATER.get(this);
return current == null && COMMITTED_UPDATER.compareAndSet(this, null, END);
}

abstract Future<Void> doWrite(ByteBuf data, boolean writeHead);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,15 @@ void testAggregatedWithDuplicatedData() {
assertSame(AggregationHandle.EMPTY, req.aggregated());
}

@Test
void testToString() {
final Req req = plainReq();
assertEquals("Request-[HTTP_1_1 GET /foo]", req.toString());

req.isEnded = true;
assertEquals("Request![HTTP_1_1 GET /foo]", req.toString());
}

static Req plainReq() {
return new Req(new EmbeddedChannel(new ChannelInboundHandlerAdapter()).pipeline().firstContext(), "/foo");
}
Expand Down
Loading