Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java
Original file line number Diff line number Diff line change
Expand Up @@ -691,7 +691,22 @@ private void handleFrame(int streamId, FrameType type, ByteBuf frame) {
handleMissingResponseProcessor(streamId, type, frame);
return;
}
receiver.onError(Exceptions.from(streamId, frame));

// FIXME: when https://github.com/reactor/reactor-core/issues/2176 is resolved
// This is workaround to handle specific Reactor related case when
// onError call may not return normally
try {
receiver.onError(Exceptions.from(streamId, frame));
} catch (RuntimeException e) {
if (reactor.core.Exceptions.isBubbling(e)
|| reactor.core.Exceptions.isErrorCallbackNotImplemented(e)) {
if (LOGGER.isDebugEnabled()) {
Throwable unwrapped = reactor.core.Exceptions.unwrap(e);
LOGGER.debug("Unhandled dropped exception", unwrapped);
}
}
}

receivers.remove(streamId);
break;
case CANCEL:
Expand Down
67 changes: 37 additions & 30 deletions rsocket-core/src/main/java/io/rsocket/core/RSocketResponder.java
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,20 @@ private void handleFrame(ByteBuf frame) {
case ERROR:
receiver = channelProcessors.get(streamId);
if (receiver != null) {
receiver.onError(io.rsocket.exceptions.Exceptions.from(streamId, frame));
// FIXME: when https://github.com/reactor/reactor-core/issues/2176 is resolved
// This is workaround to handle specific Reactor related case when
// onError call may not return normally
try {
receiver.onError(io.rsocket.exceptions.Exceptions.from(streamId, frame));
} catch (RuntimeException e) {
if (reactor.core.Exceptions.isBubbling(e)
|| reactor.core.Exceptions.isErrorCallbackNotImplemented(e)) {
if (LOGGER.isDebugEnabled()) {
Throwable unwrapped = reactor.core.Exceptions.unwrap(e);
LOGGER.debug("Unhandled dropped exception", unwrapped);
}
}
}
}
break;
case NEXT_COMPLETE:
Expand Down Expand Up @@ -448,43 +461,38 @@ protected void hookOnNext(Payload payload) {
try {
if (!PayloadValidationUtils.isValid(mtu, payload)) {
payload.release();
// specifically for requestChannel case so when Payload is invalid we will not be
// sending CancelFrame and ErrorFrame
// Note: CancelFrame is redundant and due to spec
// (https://github.com/rsocket/rsocket/blob/master/Protocol.md#request-channel)
// Upon receiving an ERROR[APPLICATION_ERROR|REJECTED|CANCELED|INVALID], the stream
// is
// terminated on both Requester and Responder.
// Upon sending an ERROR[APPLICATION_ERROR|REJECTED|CANCELED|INVALID], the stream is
// terminated on both the Requester and Responder.
if (requestChannel != null) {
channelProcessors.remove(streamId, requestChannel);
}
cancel();
final IllegalArgumentException t =
new IllegalArgumentException(INVALID_PAYLOAD_ERROR_MESSAGE);
handleError(streamId, t);

cancelStream(t);
return;
}

ByteBuf byteBuf =
PayloadFrameCodec.encodeNextReleasingPayload(allocator, streamId, payload);
sendProcessor.onNext(byteBuf);
} catch (Throwable e) {
// specifically for requestChannel case so when Payload is invalid we will not be
// sending CancelFrame and ErrorFrame
// Note: CancelFrame is redundant and due to spec
// (https://github.com/rsocket/rsocket/blob/master/Protocol.md#request-channel)
// Upon receiving an ERROR[APPLICATION_ERROR|REJECTED|CANCELED|INVALID], the stream is
// terminated on both Requester and Responder.
// Upon sending an ERROR[APPLICATION_ERROR|REJECTED|CANCELED|INVALID], the stream is
// terminated on both the Requester and Responder.
if (requestChannel != null) {
channelProcessors.remove(streamId, requestChannel);
}
cancel();
handleError(streamId, e);
cancelStream(e);
}
}

private void cancelStream(Throwable t) {
// Cancel the output stream and send an ERROR frame but do not dispose the
// requestChannel (i.e. close the connection) since the spec allows to leave
// the channel in half-closed state.
// specifically for requestChannel case so when Payload is invalid we will not be
// sending CancelFrame and ErrorFrame
// Note: CancelFrame is redundant and due to spec
// (https://github.com/rsocket/rsocket/blob/master/Protocol.md#request-channel)
// Upon receiving an ERROR[APPLICATION_ERROR|REJECTED|CANCELED|INVALID], the stream
// is terminated on both Requester and Responder.
// Upon sending an ERROR[APPLICATION_ERROR|REJECTED|CANCELED|INVALID], the stream is
// terminated on both the Requester and Responder.
if (requestChannel != null) {
channelProcessors.remove(streamId, requestChannel);
}
cancel();
handleError(streamId, t);
}

@Override
Expand All @@ -502,8 +510,7 @@ protected void hookOnError(Throwable throwable) {
// Note: CancelFrame is redundant and due to spec
// (https://github.com/rsocket/rsocket/blob/master/Protocol.md#request-channel)
// Upon receiving an ERROR[APPLICATION_ERROR|REJECTED|CANCELED|INVALID], the stream
// is
// terminated on both Requester and Responder.
// is terminated on both Requester and Responder.
// Upon sending an ERROR[APPLICATION_ERROR|REJECTED|CANCELED|INVALID], the stream is
// terminated on both the Requester and Responder.
if (requestChannel != null && !requestChannel.isDisposed()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1081,6 +1081,28 @@ public void shouldTerminateAllStreamsIfThereRacingBetweenDisposeAndRequests(
}
}

@Test
// see https://github.com/rsocket/rsocket-java/issues/858
public void testWorkaround858() {
ByteBuf buffer = rule.alloc().buffer();
buffer.writeCharSequence("test", CharsetUtil.UTF_8);

rule.socket.requestResponse(ByteBufPayload.create(buffer)).subscribe();

rule.connection.addToReceivedBuffer(
ErrorFrameCodec.encode(rule.alloc(), 1, new RuntimeException("test")));

Assertions.assertThat(rule.connection.getSent())
.hasSize(1)
.first()
.matches(bb -> FrameHeaderCodec.frameType(bb) == REQUEST_RESPONSE)
.matches(ByteBuf::release);

Assertions.assertThat(rule.socket.isDisposed()).isFalse();

rule.assertHasNoLeaks();
}

public static class ClientSocketRule extends AbstractSocketRule<RSocketRequester> {
@Override
protected RSocketRequester newRSocket() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -429,23 +429,23 @@ public Flux<Payload> requestChannel(Publisher<Payload> payloads) {

ByteBuf requestNFrame = RequestNFrameCodec.encode(allocator, 1, Integer.MAX_VALUE);

ByteBuf m1 = allocator.buffer();
m1.writeCharSequence("m1", CharsetUtil.UTF_8);
ByteBuf d1 = allocator.buffer();
d1.writeCharSequence("d1", CharsetUtil.UTF_8);
Payload np1 = ByteBufPayload.create(d1, m1);

ByteBuf m2 = allocator.buffer();
m2.writeCharSequence("m2", CharsetUtil.UTF_8);
ByteBuf d2 = allocator.buffer();
d2.writeCharSequence("d2", CharsetUtil.UTF_8);
Payload np2 = ByteBufPayload.create(d2, m2);

ByteBuf m3 = allocator.buffer();
m3.writeCharSequence("m3", CharsetUtil.UTF_8);
ByteBuf d3 = allocator.buffer();
d3.writeCharSequence("d3", CharsetUtil.UTF_8);
Payload np3 = ByteBufPayload.create(d3, m3);
ByteBuf m1 = allocator.buffer();
m1.writeCharSequence("m1", CharsetUtil.UTF_8);
ByteBuf d1 = allocator.buffer();
d1.writeCharSequence("d1", CharsetUtil.UTF_8);
Payload np1 = ByteBufPayload.create(d1, m1);

ByteBuf m2 = allocator.buffer();
m2.writeCharSequence("m2", CharsetUtil.UTF_8);
ByteBuf d2 = allocator.buffer();
d2.writeCharSequence("d2", CharsetUtil.UTF_8);
Payload np2 = ByteBufPayload.create(d2, m2);

ByteBuf m3 = allocator.buffer();
m3.writeCharSequence("m3", CharsetUtil.UTF_8);
ByteBuf d3 = allocator.buffer();
d3.writeCharSequence("d3", CharsetUtil.UTF_8);
Payload np3 = ByteBufPayload.create(d3, m3);

FluxSink<Payload> sink = sinks[0];
RaceTestUtils.race(
Expand All @@ -464,9 +464,10 @@ public Flux<Payload> requestChannel(Publisher<Payload> payloads) {

Assertions.assertThat(rule.connection.getSent()).allMatch(ReferenceCounted::release);

assertSubscriber.assertTerminated()
.assertError(CancellationException.class)
.assertErrorMessage("Disposed");
assertSubscriber
.assertTerminated()
.assertError(CancellationException.class)
.assertErrorMessage("Disposed");
Assertions.assertThat(assertSubscriber.values()).allMatch(ReferenceCounted::release);
rule.assertHasNoLeaks();
}
Expand Down Expand Up @@ -772,6 +773,42 @@ private static Stream<FrameType> refCntCases() {
return Stream.of(REQUEST_RESPONSE, REQUEST_STREAM, REQUEST_CHANNEL);
}

@Test
// see https://github.com/rsocket/rsocket-java/issues/858
public void testWorkaround858() {
ByteBuf buffer = rule.alloc().buffer();
buffer.writeCharSequence("test", CharsetUtil.UTF_8);

TestPublisher<Payload> testPublisher = TestPublisher.create();

rule.setAcceptingSocket(
new RSocket() {
@Override
public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
Flux.from(payloads).doOnNext(ReferenceCounted::release).subscribe();

return testPublisher.flux();
}
});

rule.connection.addToReceivedBuffer(
RequestChannelFrameCodec.encodeReleasingPayload(
rule.alloc(), 1, false, 1, ByteBufPayload.create(buffer)));
rule.connection.addToReceivedBuffer(
ErrorFrameCodec.encode(rule.alloc(), 1, new RuntimeException("test")));

Assertions.assertThat(rule.connection.getSent())
.hasSize(1)
.first()
.matches(bb -> FrameHeaderCodec.frameType(bb) == REQUEST_N)
.matches(ReferenceCounted::release);

Assertions.assertThat(rule.socket.isDisposed()).isFalse();
testPublisher.assertWasCancelled();

rule.assertHasNoLeaks();
}

public static class ServerSocketRule extends AbstractSocketRule<RSocketResponder> {

private RSocket acceptingSocket;
Expand Down