Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import java.util.function.Supplier;
import reactor.core.Disposable;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.util.retry.Retry;

public class RSocketConnector {
Expand Down Expand Up @@ -293,7 +294,8 @@ public Mono<RSocket> connect(Supplier<ClientTransport> transportSupplier) {
(int) keepAliveInterval.toMillis(),
(int) keepAliveMaxLifeTime.toMillis(),
keepAliveHandler,
requesterLeaseHandler);
requesterLeaseHandler,
Schedulers.single(Schedulers.parallel()));

RSocket wrappedRSocketRequester = interceptors.initRequester(rSocketRequester);

Expand Down
248 changes: 130 additions & 118 deletions rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
import reactor.core.publisher.Operators;
import reactor.core.publisher.SignalType;
import reactor.core.publisher.UnicastProcessor;
import reactor.core.scheduler.Scheduler;
import reactor.util.concurrent.Queues;

/**
Expand Down Expand Up @@ -105,6 +106,7 @@ class RSocketRequester implements RSocket {
private final KeepAliveFramesAcceptor keepAliveFramesAcceptor;
private volatile Throwable terminationError;
private final MonoProcessor<Void> onClose;
private final Scheduler serialScheduler;

RSocketRequester(
DuplexConnection connection,
Expand All @@ -115,7 +117,8 @@ class RSocketRequester implements RSocket {
int keepAliveTickPeriod,
int keepAliveAckTimeout,
@Nullable KeepAliveHandler keepAliveHandler,
RequesterLeaseHandler leaseHandler) {
RequesterLeaseHandler leaseHandler,
Scheduler serialScheduler) {
this.connection = connection;
this.allocator = connection.alloc();
this.payloadDecoder = payloadDecoder;
Expand All @@ -126,6 +129,7 @@ class RSocketRequester implements RSocket {
this.senders = new SynchronizedIntObjectHashMap<>();
this.receivers = new SynchronizedIntObjectHashMap<>();
this.onClose = MonoProcessor.create();
this.serialScheduler = serialScheduler;

// DO NOT Change the order here. The Send processor must be subscribed to before receiving
this.sendProcessor = new UnboundedProcessor<>();
Expand Down Expand Up @@ -208,22 +212,23 @@ private Mono<Void> handleFireAndForget(Payload payload) {

final AtomicBoolean once = new AtomicBoolean();

return Mono.defer(
() -> {
if (once.getAndSet(true)) {
return Mono.error(
new IllegalStateException("FireAndForgetMono allows only a single subscriber"));
}
return Mono.<Void>defer(
() -> {
if (once.getAndSet(true)) {
return Mono.error(
new IllegalStateException("FireAndForgetMono allows only a single subscriber"));
}

final int streamId = streamIdSupplier.nextStreamId(receivers);
final ByteBuf requestFrame =
RequestFireAndForgetFrameFlyweight.encodeReleasingPayload(
allocator, streamId, payload);
final int streamId = streamIdSupplier.nextStreamId(receivers);
final ByteBuf requestFrame =
RequestFireAndForgetFrameFlyweight.encodeReleasingPayload(
allocator, streamId, payload);

sendProcessor.onNext(requestFrame);
sendProcessor.onNext(requestFrame);

return Mono.empty();
});
return Mono.empty();
})
.subscribeOn(serialScheduler);
}

private Mono<Payload> handleRequestResponse(final Payload payload) {
Expand Down Expand Up @@ -284,6 +289,7 @@ public void hookOnTerminal(SignalType signalType) {
receivers.remove(streamId, receiver);
}
}))
.subscribeOn(serialScheduler)
.doOnDiscard(ReferenceCounted.class, DROPPED_ELEMENTS_CONSUMER);
});
}
Expand Down Expand Up @@ -356,6 +362,7 @@ void hookOnTerminal(SignalType signalType) {
receivers.remove(streamId);
}
}))
.subscribeOn(serialScheduler, false)
.doOnDiscard(ReferenceCounted.class, DROPPED_ELEMENTS_CONSUMER);
});
}
Expand Down Expand Up @@ -392,120 +399,125 @@ private Flux<? extends Payload> handleChannel(Payload initialPayload, Flux<Paylo

final UnicastProcessor<Payload> receiver = UnicastProcessor.create();

return receiver.transform(
Operators.<Payload, Payload>lift(
(s, actual) ->
new RequestOperator(actual) {
return receiver
.transform(
Operators.<Payload, Payload>lift(
(s, actual) ->
new RequestOperator(actual) {

final BaseSubscriber<Payload> upstreamSubscriber =
new BaseSubscriber<Payload>() {
final BaseSubscriber<Payload> upstreamSubscriber =
new BaseSubscriber<Payload>() {

boolean first = true;
boolean first = true;

@Override
protected void hookOnSubscribe(Subscription subscription) {
// noops
}
@Override
protected void hookOnSubscribe(Subscription subscription) {
// noops
}

@Override
protected void hookOnNext(Payload payload) {
if (first) {
// need to skip first since we have already sent it
// no need to release it since it was released earlier on the request
// establishment
// phase
first = false;
request(1);
return;
}
if (!PayloadValidationUtils.isValid(mtu, payload)) {
payload.release();
cancel();
final IllegalArgumentException t =
new IllegalArgumentException(INVALID_PAYLOAD_ERROR_MESSAGE);
errorConsumer.accept(t);
// no need to send any errors.
sendProcessor.onNext(CancelFrameFlyweight.encode(allocator, streamId));
receiver.onError(t);
return;
}
final ByteBuf frame =
PayloadFrameFlyweight.encodeNextReleasingPayload(
allocator, streamId, payload);

sendProcessor.onNext(frame);
}
@Override
protected void hookOnNext(Payload payload) {
if (first) {
// need to skip first since we have already sent it
// no need to release it since it was released earlier on the
// request
// establishment
// phase
first = false;
request(1);
return;
}
if (!PayloadValidationUtils.isValid(mtu, payload)) {
payload.release();
cancel();
final IllegalArgumentException t =
new IllegalArgumentException(INVALID_PAYLOAD_ERROR_MESSAGE);
errorConsumer.accept(t);
// no need to send any errors.
sendProcessor.onNext(
CancelFrameFlyweight.encode(allocator, streamId));
receiver.onError(t);
return;
}
final ByteBuf frame =
PayloadFrameFlyweight.encodeNextReleasingPayload(
allocator, streamId, payload);

sendProcessor.onNext(frame);
}

@Override
protected void hookOnComplete() {
ByteBuf frame =
PayloadFrameFlyweight.encodeComplete(allocator, streamId);
sendProcessor.onNext(frame);
}

@Override
protected void hookOnError(Throwable t) {
ByteBuf frame = ErrorFrameFlyweight.encode(allocator, streamId, t);
sendProcessor.onNext(frame);
receiver.onError(t);
}

@Override
protected void hookOnComplete() {
ByteBuf frame = PayloadFrameFlyweight.encodeComplete(allocator, streamId);
sendProcessor.onNext(frame);
@Override
protected void hookFinally(SignalType type) {
senders.remove(streamId, this);
}
};

@Override
void hookOnFirstRequest(long n) {
final int streamId = streamIdSupplier.nextStreamId(receivers);
this.streamId = streamId;

final ByteBuf frame =
RequestChannelFrameFlyweight.encodeReleasingPayload(
allocator, streamId, false, n, initialPayload);

senders.put(streamId, upstreamSubscriber);
receivers.put(streamId, receiver);

inboundFlux
.limitRate(Queues.SMALL_BUFFER_SIZE)
.doOnDiscard(ReferenceCounted.class, DROPPED_ELEMENTS_CONSUMER)
.subscribe(upstreamSubscriber);

sendProcessor.onNext(frame);
}

@Override
void hookOnRemainingRequests(long n) {
if (receiver.isDisposed()) {
return;
}

@Override
protected void hookOnError(Throwable t) {
ByteBuf frame = ErrorFrameFlyweight.encode(allocator, streamId, t);
sendProcessor.onNext(frame);
receiver.onError(t);
sendProcessor.onNext(RequestNFrameFlyweight.encode(allocator, streamId, n));
}

@Override
void hookOnCancel() {
senders.remove(streamId, upstreamSubscriber);
if (receivers.remove(streamId, receiver)) {
sendProcessor.onNext(CancelFrameFlyweight.encode(allocator, streamId));
}
}

@Override
protected void hookFinally(SignalType type) {
senders.remove(streamId, this);
@Override
void hookOnTerminal(SignalType signalType) {
if (signalType == SignalType.ON_ERROR) {
upstreamSubscriber.cancel();
}
};

@Override
void hookOnFirstRequest(long n) {
final int streamId = streamIdSupplier.nextStreamId(receivers);
this.streamId = streamId;

final ByteBuf frame =
RequestChannelFrameFlyweight.encodeReleasingPayload(
allocator, streamId, false, n, initialPayload);

senders.put(streamId, upstreamSubscriber);
receivers.put(streamId, receiver);

inboundFlux
.limitRate(Queues.SMALL_BUFFER_SIZE)
.doOnDiscard(ReferenceCounted.class, DROPPED_ELEMENTS_CONSUMER)
.subscribe(upstreamSubscriber);

sendProcessor.onNext(frame);
}

@Override
void hookOnRemainingRequests(long n) {
if (receiver.isDisposed()) {
return;
}

sendProcessor.onNext(RequestNFrameFlyweight.encode(allocator, streamId, n));
}

@Override
void hookOnCancel() {
senders.remove(streamId, upstreamSubscriber);
if (receivers.remove(streamId, receiver)) {
sendProcessor.onNext(CancelFrameFlyweight.encode(allocator, streamId));
}
}

@Override
void hookOnTerminal(SignalType signalType) {
if (signalType == SignalType.ON_ERROR) {
upstreamSubscriber.cancel();
}
receivers.remove(streamId, receiver);
}

@Override
public void cancel() {
upstreamSubscriber.cancel();
super.cancel();
}
}));
receivers.remove(streamId, receiver);
}

@Override
public void cancel() {
upstreamSubscriber.cancel();
super.cancel();
}
}))
.subscribeOn(serialScheduler, false);
}

private Mono<Void> handleMetadataPush(Payload payload) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import java.util.function.Consumer;
import java.util.function.Supplier;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public final class RSocketServer {
private static final String SERVER_TAG = "server";
Expand Down Expand Up @@ -222,7 +223,8 @@ private Mono<Void> acceptSetup(
setupPayload.keepAliveInterval(),
setupPayload.keepAliveMaxLifetime(),
keepAliveHandler,
requesterLeaseHandler);
requesterLeaseHandler,
Schedulers.single(Schedulers.parallel()));

RSocket wrappedRSocketRequester = interceptors.initRequester(rSocketRequester);

Expand Down
Loading