take() ignored when resuming after Exceptions$ErrorCallbackNotImplemented #858

@yamass

Description

I am using the resume() functionality and have run into behavior that I believe is one or more bugs. It may be related to #857, since the message mentioned there is logged here as well.

When calling requestStream() and subscribing to the returned flux without an error handler, the connection gets closed with the following error log:

02-06-2020 19:20:31.306 [reactor-tcp-nio-2] ERROR r.n.c.ChannelOperationsHandler - [id: 0x8793d78e, L:/127.0.0.1:55222 - R:localhost/127.0.0.1:7890] Error was received while reading the incoming data. The connection will be closed.
reactor.core.Exceptions$ErrorCallbackNotImplemented: ApplicationErrorException (0x201): Some Exception for this test
Caused by: io.rsocket.exceptions.ApplicationErrorException: Some Exception for this test
	at io.rsocket.exceptions.Exceptions.from(Exceptions.java:76)
	at io.rsocket.core.RSocketRequester.handleFrame(RSocketRequester.java:646)
	at io.rsocket.core.RSocketRequester.handleIncomingFrames(RSocketRequester.java:585)
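The log above suggests that the missing error callback surfaces as an exception on the thread that reads incoming frames, which the transport then treats as fatal. The following is a minimal, stdlib-only sketch of that mechanism. It is a simplified model, not Reactor's or RSocket's actual code; `ErrorSource`, `deliver`, and the exception message are hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Simplified model of a subscriber without an error callback; NOT Reactor's implementation. */
class MissingErrorCallbackSketch {

    /** Hypothetical source that delivers its items and then signals a failure. */
    static final class ErrorSource {
        private final List<String> items;
        private final RuntimeException failure;

        ErrorSource(List<String> items, RuntimeException failure) {
            this.items = items;
            this.failure = failure;
        }

        /** Item callback only: the failure is rethrown on the delivering thread. */
        void subscribe(Consumer<String> onNext) {
            subscribe(onNext, error -> {
                throw new IllegalStateException("error callback not implemented", error);
            });
        }

        /** Item and error callbacks: the failure is handed to the caller's handler. */
        void subscribe(Consumer<String> onNext, Consumer<Throwable> onError) {
            items.forEach(onNext);
            onError.accept(failure);
        }
    }

    /** Stands in for the transport's read loop; returns true if the "connection" survives. */
    static boolean deliver(Runnable frameHandler) {
        try {
            frameHandler.run();
            return true;
        } catch (RuntimeException e) {
            // An unexpected exception escaped the subscriber, so the transport gives up.
            return false;
        }
    }

    public static void main(String[] args) {
        ErrorSource source =
                new ErrorSource(List.of(), new RuntimeException("Some Exception for this test"));
        List<String> handled = new ArrayList<>();

        boolean withoutHandler = deliver(() -> source.subscribe(System.out::println));
        boolean withHandler =
                deliver(() -> source.subscribe(System.out::println, e -> handled.add(e.getMessage())));

        System.out.println("survives without error handler: " + withoutHandler);
        System.out.println("survives with error handler: " + withHandler);
        System.out.println("handled errors: " + handled);
    }
}
```

In the reproduction below, the analogous change would be to pass a second lambda to `subscribe(...)`, for example `subscribe(payload -> ..., error -> ...)`. That should avoid the `ErrorCallbackNotImplemented` exception, but whether it also avoids the resume problem described in this issue has not been established here.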

When I reuse the client connection for another request, the server receives the request and starts processing it (as seen in the logs). However, the server does not react to cancel signals from the client, and the client never receives any results (next signals).

See the code below.

Expected Behavior

After reconnect, cancel and next signals should still be honored.

Actual Behavior

They are not. See the test code under Steps to Reproduce.

Steps to Reproduce

@Test
public void strangeResumeBehavior() throws Exception {
	CloseableChannel server = RSocketServer.create((setup, sendingRSocket) -> Mono.just(new RSocket() {
		@Override
		public Flux<Payload> requestStream(Payload payload) {
			if (payload.getDataUtf8().equals("flux")) {
				return Flux.interval(Duration.ofMillis(1))
						.doOnNext(aLong -> System.out.println("Server-side doOnNext: " + aLong))
						.doOnCancel(() -> System.out.println("Server-side doOnCancel"))
						.map(aLong -> DefaultPayload.create("" + aLong));
			} else {
				return Flux.error(new RuntimeException("Some Exception for this test"));
			}
		}
	}))
			.resume(new Resume()
					.sessionDuration(Duration.ofSeconds(30))
					.retry(Retry.fixedDelay(Long.MAX_VALUE, Duration.ofSeconds(2))
							.doBeforeRetry(retrySignal -> System.out.println("RETRYING to connect.")))) // never called. why not?
			.bind(TcpServerTransport.create(TcpServer.create().host("localhost").port(7890)))
			.block();

	RSocket client = RSocketConnector.create()
			.resume(new Resume()
					.sessionDuration(Duration.ofSeconds(30))
					.retry(Retry.fixedDelay(Long.MAX_VALUE, Duration.ofSeconds(5))))
			.connect(TcpClientTransport.create(TcpClient.create().host("localhost").port(7890)))
			.retryWhen(Retry.backoff(Long.MAX_VALUE, Duration.ofSeconds(10)))
			.cache()
			.block();


	client.requestStream(DefaultPayload.create("flux"))
			.take(10) // works
			.subscribe(payload -> System.out.println(payload.getDataUtf8()));

	Thread.sleep(1000);

	client.requestStream(DefaultPayload.create("exception"))
			.subscribe(payload -> System.out.println(payload.getDataUtf8()));  // No error handler, so "The connection will be closed."

	Thread.sleep(1000);

	client.requestStream(DefaultPayload.create("flux"))
			.take(10) // <<< Ignored. Why?
			.subscribe(payload -> System.out.println(payload.getDataUtf8())); // <<< Never arrives...

	Thread.sleep(6_000);
}

Possible Solution

Your Environment

  • RSocket version(s) used: 1.0.0
  • Other relevant libraries versions (eg. netty, ...): default rsocket dependencies
  • Platform (eg. JVM version (java -version) or Node version (node --version)):
    openjdk 14.0.1 2020-04-14
    OpenJDK Runtime Environment AdoptOpenJDK (build 14.0.1+7)
    OpenJDK 64-Bit Server VM AdoptOpenJDK (build 14.0.1+7, mixed mode, sharing)
  • OS and version (eg uname -a): MacOS

Labels: bug, superseded (Issue is superseded by another)
