Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions rsocket-core/src/main/java/io/rsocket/DuplexConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package io.rsocket;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import java.nio.channels.ClosedChannelException;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
Expand Down Expand Up @@ -78,6 +79,13 @@ default Mono<Void> sendOne(ByteBuf frame) {
*/
Flux<ByteBuf> receive();

/**
* Returns the assigned {@link ByteBufAllocator}.
*
* @return the {@link ByteBufAllocator}
*/
ByteBufAllocator alloc();

@Override
default double availability() {
return isDisposed() ? 0.0 : 1.0;
Expand Down
47 changes: 45 additions & 2 deletions rsocket-core/src/main/java/io/rsocket/RSocketFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,29 @@ public ClientRSocketFactory(RSocketConnector connector) {
this.connector = connector;
}

/**
* @deprecated this method is deprecated and deliberately has no effect anymore. Right now, in
* order configure the custom {@link ByteBufAllocator} it is recommended to use the
* following setup for Reactor Netty based transport: <br>
* 1. For Client: <br>
* <pre>{@code
* TcpClient.create()
* ...
* .bootstrap(bootstrap -> bootstrap.option(ChannelOption.ALLOCATOR, clientAllocator))
* }</pre>
* <br>
* 2. For server: <br>
* <pre>{@code
* TcpServer.create()
* ...
* .bootstrap(serverBootstrap -> serverBootstrap.childOption(ChannelOption.ALLOCATOR, serverAllocator))
* }</pre>
* Or in case of local transport, to use corresponding factory method {@code
* LocalClientTransport.creat(String, ByteBufAllocator)}
* @param allocator instance of {@link ByteBufAllocator}
* @return this factory instance
*/
public ClientRSocketFactory byteBufAllocator(ByteBufAllocator allocator) {
connector.byteBufAllocator(allocator);
return this;
}

Expand Down Expand Up @@ -395,8 +416,30 @@ public ServerRSocketFactory(RSocketServer server) {
this.server = server;
}

/**
* @deprecated this method is deprecated and deliberately has no effect anymore. Right now, in
* order configure the custom {@link ByteBufAllocator} it is recommended to use the
* following setup for Reactor Netty based transport: <br>
* 1. For Client: <br>
* <pre>{@code
* TcpClient.create()
* ...
* .bootstrap(bootstrap -> bootstrap.option(ChannelOption.ALLOCATOR, clientAllocator))
* }</pre>
* <br>
* 2. For server: <br>
* <pre>{@code
* TcpServer.create()
* ...
* .bootstrap(serverBootstrap -> serverBootstrap.childOption(ChannelOption.ALLOCATOR, serverAllocator))
* }</pre>
* Or in case of local transport, to use corresponding factory method {@code
* LocalClientTransport.creat(String, ByteBufAllocator)}
* @param allocator instance of {@link ByteBufAllocator}
* @return this factory instance
*/
@Deprecated
public ServerRSocketFactory byteBufAllocator(ByteBufAllocator allocator) {
server.byteBufAllocator(allocator);
return this;
}

Expand Down
19 changes: 2 additions & 17 deletions rsocket-core/src/main/java/io/rsocket/core/RSocketConnector.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
package io.rsocket.core;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.Unpooled;
import io.rsocket.AbstractRSocket;
import io.rsocket.ConnectionSetupPayload;
Expand Down Expand Up @@ -71,7 +70,6 @@ public class RSocketConnector {
private PayloadDecoder payloadDecoder = PayloadDecoder.DEFAULT;

private Consumer<Throwable> errorConsumer = Throwable::printStackTrace;
private ByteBufAllocator allocator = ByteBufAllocator.DEFAULT;

private RSocketConnector() {}

Expand Down Expand Up @@ -241,16 +239,6 @@ public RSocketConnector errorConsumer(Consumer<Throwable> errorConsumer) {
return this;
}

/**
* @deprecated this is deprecated with no replacement and will be removed after {@link
* io.rsocket.RSocketFactory} is removed.
*/
public RSocketConnector byteBufAllocator(ByteBufAllocator allocator) {
Objects.requireNonNull(allocator);
this.allocator = allocator;
return this;
}

public Mono<RSocket> connect(ClientTransport transport) {
return connect(() -> transport);
}
Expand Down Expand Up @@ -289,7 +277,6 @@ public Mono<RSocket> connect(Supplier<ClientTransport> transportSupplier) {

RSocket rSocketRequester =
new RSocketRequester(
allocator,
multiplexer.asClientConnection(),
payloadDecoder,
errorConsumer,
Expand All @@ -304,7 +291,7 @@ public Mono<RSocket> connect(Supplier<ClientTransport> transportSupplier) {

ByteBuf setupFrame =
SetupFrameFlyweight.encode(
allocator,
wrappedConnection.alloc(),
leaseEnabled,
(int) keepAliveInterval.toMillis(),
(int) keepAliveMaxLifeTime.toMillis(),
Expand All @@ -326,15 +313,14 @@ public Mono<RSocket> connect(Supplier<ClientTransport> transportSupplier) {
leaseEnabled
? new ResponderLeaseHandler.Impl<>(
CLIENT_TAG,
allocator,
wrappedConnection.alloc(),
leases.sender(),
errorConsumer,
leases.stats())
: ResponderLeaseHandler.None;

RSocket rSocketResponder =
new RSocketResponder(
allocator,
multiplexer.asServerConnection(),
wrappedRSocketHandler,
payloadDecoder,
Expand Down Expand Up @@ -364,7 +350,6 @@ private ClientRSocketSession createSession(
ClientRSocketSession session =
new ClientRSocketSession(
connection,
allocator,
resume.getSessionDuration(),
resume.getResumeStrategySupplier(),
resume.getStoreFactory(CLIENT_TAG).apply(resumeToken),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,6 @@ class RSocketRequester implements RSocket {
private volatile Throwable terminationError;

RSocketRequester(
ByteBufAllocator allocator,
DuplexConnection connection,
PayloadDecoder payloadDecoder,
Consumer<Throwable> errorConsumer,
Expand All @@ -118,8 +117,8 @@ class RSocketRequester implements RSocket {
int keepAliveAckTimeout,
@Nullable KeepAliveHandler keepAliveHandler,
RequesterLeaseHandler leaseHandler) {
this.allocator = allocator;
this.connection = connection;
this.allocator = connection.alloc();
this.payloadDecoder = payloadDecoder;
this.errorConsumer = errorConsumer;
this.streamIdSupplier = streamIdSupplier;
Expand All @@ -141,7 +140,7 @@ class RSocketRequester implements RSocket {

if (keepAliveTickPeriod != 0 && keepAliveHandler != null) {
KeepAliveSupport keepAliveSupport =
new ClientKeepAliveSupport(allocator, keepAliveTickPeriod, keepAliveAckTimeout);
new ClientKeepAliveSupport(this.allocator, keepAliveTickPeriod, keepAliveAckTimeout);
this.keepAliveFramesAcceptor =
keepAliveHandler.start(
keepAliveSupport, sendProcessor::onNextPrioritized, this::tryTerminateOnKeepAlive);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,15 +75,14 @@ class RSocketResponder implements ResponderRSocket {
private final ByteBufAllocator allocator;

RSocketResponder(
ByteBufAllocator allocator,
DuplexConnection connection,
RSocket requestHandler,
PayloadDecoder payloadDecoder,
Consumer<Throwable> errorConsumer,
ResponderLeaseHandler leaseHandler,
int mtu) {
this.allocator = allocator;
this.connection = connection;
this.allocator = connection.alloc();
this.mtu = mtu;

this.requestHandler = requestHandler;
Expand Down
23 changes: 4 additions & 19 deletions rsocket-core/src/main/java/io/rsocket/core/RSocketServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package io.rsocket.core;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.rsocket.AbstractRSocket;
import io.rsocket.Closeable;
import io.rsocket.ConnectionSetupPayload;
Expand Down Expand Up @@ -55,7 +54,6 @@ public final class RSocketServer {

private Consumer<Throwable> errorConsumer = Throwable::printStackTrace;
private PayloadDecoder payloadDecoder = PayloadDecoder.DEFAULT;
private ByteBufAllocator allocator = ByteBufAllocator.DEFAULT;

private RSocketServer() {}

Expand Down Expand Up @@ -114,17 +112,6 @@ public RSocketServer errorConsumer(Consumer<Throwable> errorConsumer) {
return this;
}

/**
* @deprecated this is deprecated with no replacement and will be removed after {@link
* io.rsocket.RSocketFactory} is removed.
*/
@Deprecated
public RSocketServer byteBufAllocator(ByteBufAllocator allocator) {
Objects.requireNonNull(allocator);
this.allocator = allocator;
return this;
}

public ServerTransport.ConnectionAcceptor asConnectionAcceptor() {
return new ServerTransport.ConnectionAcceptor() {
private final ServerSetup serverSetup = serverSetup();
Expand Down Expand Up @@ -228,7 +215,6 @@ private Mono<Void> acceptSetup(

RSocket rSocketRequester =
new RSocketRequester(
allocator,
wrappedMultiplexer.asServerConnection(),
payloadDecoder,
errorConsumer,
Expand All @@ -252,21 +238,21 @@ private Mono<Void> acceptSetup(
.doOnNext(
rSocketHandler -> {
RSocket wrappedRSocketHandler = interceptors.initResponder(rSocketHandler);
DuplexConnection connection = wrappedMultiplexer.asClientConnection();

ResponderLeaseHandler responderLeaseHandler =
leaseEnabled
? new ResponderLeaseHandler.Impl<>(
SERVER_TAG,
allocator,
connection.alloc(),
leases.sender(),
errorConsumer,
leases.stats())
: ResponderLeaseHandler.None;

RSocket rSocketResponder =
new RSocketResponder(
allocator,
wrappedMultiplexer.asClientConnection(),
connection,
wrappedRSocketHandler,
payloadDecoder,
errorConsumer,
Expand All @@ -279,12 +265,11 @@ private Mono<Void> acceptSetup(
}

private ServerSetup serverSetup() {
return resume != null ? createSetup() : new ServerSetup.DefaultServerSetup(allocator);
return resume != null ? createSetup() : new ServerSetup.DefaultServerSetup();
}

ServerSetup createSetup() {
return new ServerSetup.ResumableServerSetup(
allocator,
new SessionManager(),
resume.getSessionDuration(),
resume.getStreamTimeout(),
Expand Down
23 changes: 4 additions & 19 deletions rsocket-core/src/main/java/io/rsocket/core/ServerSetup.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import static io.rsocket.keepalive.KeepAliveHandler.*;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.rsocket.DuplexConnection;
import io.rsocket.exceptions.RejectedResumeException;
import io.rsocket.exceptions.UnsupportedSetupException;
import io.rsocket.frame.ErrorFrameFlyweight;
Expand All @@ -35,12 +35,6 @@

abstract class ServerSetup {

final ByteBufAllocator allocator;

public ServerSetup(ByteBufAllocator allocator) {
this.allocator = allocator;
}

abstract Mono<Void> acceptRSocketSetup(
ByteBuf frame,
ClientServerInputMultiplexer multiplexer,
Expand All @@ -51,18 +45,14 @@ abstract Mono<Void> acceptRSocketSetup(
void dispose() {}

Mono<Void> sendError(ClientServerInputMultiplexer multiplexer, Exception exception) {
return multiplexer
.asSetupConnection()
.sendOne(ErrorFrameFlyweight.encode(allocator, 0, exception))
DuplexConnection duplexConnection = multiplexer.asSetupConnection();
return duplexConnection
.sendOne(ErrorFrameFlyweight.encode(duplexConnection.alloc(), 0, exception))
.onErrorResume(err -> Mono.empty());
}

static class DefaultServerSetup extends ServerSetup {

DefaultServerSetup(ByteBufAllocator allocator) {
super(allocator);
}

@Override
public Mono<Void> acceptRSocketSetup(
ByteBuf frame,
Expand Down Expand Up @@ -94,22 +84,18 @@ public Mono<Void> acceptRSocketResume(ByteBuf frame, ClientServerInputMultiplexe
}

static class ResumableServerSetup extends ServerSetup {
private final ByteBufAllocator allocator;
private final SessionManager sessionManager;
private final Duration resumeSessionDuration;
private final Duration resumeStreamTimeout;
private final Function<? super ByteBuf, ? extends ResumableFramesStore> resumeStoreFactory;
private final boolean cleanupStoreOnKeepAlive;

ResumableServerSetup(
ByteBufAllocator allocator,
SessionManager sessionManager,
Duration resumeSessionDuration,
Duration resumeStreamTimeout,
Function<? super ByteBuf, ? extends ResumableFramesStore> resumeStoreFactory,
boolean cleanupStoreOnKeepAlive) {
super(allocator);
this.allocator = allocator;
this.sessionManager = sessionManager;
this.resumeSessionDuration = resumeSessionDuration;
this.resumeStreamTimeout = resumeStreamTimeout;
Expand All @@ -131,7 +117,6 @@ public Mono<Void> acceptRSocketSetup(
.save(
new ServerRSocketSession(
multiplexer.asClientServerConnection(),
allocator,
resumeSessionDuration,
resumeStreamTimeout,
resumeStoreFactory,
Expand Down
Loading