diff --git a/rsocket-core/src/main/java/io/rsocket/DuplexConnection.java b/rsocket-core/src/main/java/io/rsocket/DuplexConnection.java index b87ed0570..6190d24e3 100644 --- a/rsocket-core/src/main/java/io/rsocket/DuplexConnection.java +++ b/rsocket-core/src/main/java/io/rsocket/DuplexConnection.java @@ -17,6 +17,7 @@ package io.rsocket; import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; import java.nio.channels.ClosedChannelException; import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; @@ -78,6 +79,13 @@ default Mono sendOne(ByteBuf frame) { */ Flux receive(); + /** + * Returns the assigned {@link ByteBufAllocator}. + * + * @return the {@link ByteBufAllocator} + */ + ByteBufAllocator alloc(); + @Override default double availability() { return isDisposed() ? 0.0 : 1.0; diff --git a/rsocket-core/src/main/java/io/rsocket/RSocketFactory.java b/rsocket-core/src/main/java/io/rsocket/RSocketFactory.java index 0e1ae12cb..43d344b9d 100644 --- a/rsocket-core/src/main/java/io/rsocket/RSocketFactory.java +++ b/rsocket-core/src/main/java/io/rsocket/RSocketFactory.java @@ -113,8 +113,29 @@ public ClientRSocketFactory(RSocketConnector connector) { this.connector = connector; } + /** + * @deprecated this method is deprecated and deliberately has no effect anymore. Right now, in + * order configure the custom {@link ByteBufAllocator} it is recommended to use the + * following setup for Reactor Netty based transport:
+ * 1. For Client:
+ *
{@code
+     * TcpClient.create()
+     *          ...
+     *          .bootstrap(bootstrap -> bootstrap.option(ChannelOption.ALLOCATOR, clientAllocator))
+     * }
+ *
+ * 2. For server:
+ *
{@code
+     * TcpServer.create()
+     *          ...
+     *          .bootstrap(serverBootstrap -> serverBootstrap.childOption(ChannelOption.ALLOCATOR, serverAllocator))
+     * }
+ * Or in case of local transport, to use corresponding factory method {@code + * LocalClientTransport.creat(String, ByteBufAllocator)} + * @param allocator instance of {@link ByteBufAllocator} + * @return this factory instance + */ public ClientRSocketFactory byteBufAllocator(ByteBufAllocator allocator) { - connector.byteBufAllocator(allocator); return this; } @@ -395,8 +416,30 @@ public ServerRSocketFactory(RSocketServer server) { this.server = server; } + /** + * @deprecated this method is deprecated and deliberately has no effect anymore. Right now, in + * order configure the custom {@link ByteBufAllocator} it is recommended to use the + * following setup for Reactor Netty based transport:
+ * 1. For Client:
+ *
{@code
+     * TcpClient.create()
+     *          ...
+     *          .bootstrap(bootstrap -> bootstrap.option(ChannelOption.ALLOCATOR, clientAllocator))
+     * }
+ *
+ * 2. For server:
+ *
{@code
+     * TcpServer.create()
+     *          ...
+     *          .bootstrap(serverBootstrap -> serverBootstrap.childOption(ChannelOption.ALLOCATOR, serverAllocator))
+     * }
+ * Or in case of local transport, to use corresponding factory method {@code + * LocalClientTransport.creat(String, ByteBufAllocator)} + * @param allocator instance of {@link ByteBufAllocator} + * @return this factory instance + */ + @Deprecated public ServerRSocketFactory byteBufAllocator(ByteBufAllocator allocator) { - server.byteBufAllocator(allocator); return this; } diff --git a/rsocket-core/src/main/java/io/rsocket/core/RSocketConnector.java b/rsocket-core/src/main/java/io/rsocket/core/RSocketConnector.java index 23ecb9ba6..dc5be2430 100644 --- a/rsocket-core/src/main/java/io/rsocket/core/RSocketConnector.java +++ b/rsocket-core/src/main/java/io/rsocket/core/RSocketConnector.java @@ -16,7 +16,6 @@ package io.rsocket.core; import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.Unpooled; import io.rsocket.AbstractRSocket; import io.rsocket.ConnectionSetupPayload; @@ -71,7 +70,6 @@ public class RSocketConnector { private PayloadDecoder payloadDecoder = PayloadDecoder.DEFAULT; private Consumer errorConsumer = Throwable::printStackTrace; - private ByteBufAllocator allocator = ByteBufAllocator.DEFAULT; private RSocketConnector() {} @@ -241,16 +239,6 @@ public RSocketConnector errorConsumer(Consumer errorConsumer) { return this; } - /** - * @deprecated this is deprecated with no replacement and will be removed after {@link - * io.rsocket.RSocketFactory} is removed. - */ - public RSocketConnector byteBufAllocator(ByteBufAllocator allocator) { - Objects.requireNonNull(allocator); - this.allocator = allocator; - return this; - } - public Mono connect(ClientTransport transport) { return connect(() -> transport); } @@ -289,7 +277,6 @@ public Mono connect(Supplier transportSupplier) { RSocket rSocketRequester = new RSocketRequester( - allocator, multiplexer.asClientConnection(), payloadDecoder, errorConsumer, @@ -304,7 +291,7 @@ public Mono connect(Supplier transportSupplier) { ByteBuf setupFrame = SetupFrameFlyweight.encode( - allocator, + wrappedConnection.alloc(), leaseEnabled, (int) keepAliveInterval.toMillis(), (int) keepAliveMaxLifeTime.toMillis(), @@ -326,7 +313,7 @@ public Mono connect(Supplier transportSupplier) { leaseEnabled ? new ResponderLeaseHandler.Impl<>( CLIENT_TAG, - allocator, + wrappedConnection.alloc(), leases.sender(), errorConsumer, leases.stats()) @@ -334,7 +321,6 @@ public Mono connect(Supplier transportSupplier) { RSocket rSocketResponder = new RSocketResponder( - allocator, multiplexer.asServerConnection(), wrappedRSocketHandler, payloadDecoder, @@ -364,7 +350,6 @@ private ClientRSocketSession createSession( ClientRSocketSession session = new ClientRSocketSession( connection, - allocator, resume.getSessionDuration(), resume.getResumeStrategySupplier(), resume.getStoreFactory(CLIENT_TAG).apply(resumeToken), diff --git a/rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java b/rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java index 42a6a524d..04c766eec 100644 --- a/rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java +++ b/rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java @@ -108,7 +108,6 @@ class RSocketRequester implements RSocket { private volatile Throwable terminationError; RSocketRequester( - ByteBufAllocator allocator, DuplexConnection connection, PayloadDecoder payloadDecoder, Consumer errorConsumer, @@ -118,8 +117,8 @@ class RSocketRequester implements RSocket { int keepAliveAckTimeout, @Nullable KeepAliveHandler keepAliveHandler, RequesterLeaseHandler leaseHandler) { - this.allocator = allocator; this.connection = connection; + this.allocator = connection.alloc(); this.payloadDecoder = payloadDecoder; this.errorConsumer = errorConsumer; this.streamIdSupplier = streamIdSupplier; @@ -141,7 +140,7 @@ class RSocketRequester implements RSocket { if (keepAliveTickPeriod != 0 && keepAliveHandler != null) { KeepAliveSupport keepAliveSupport = - new ClientKeepAliveSupport(allocator, keepAliveTickPeriod, keepAliveAckTimeout); + new ClientKeepAliveSupport(this.allocator, keepAliveTickPeriod, keepAliveAckTimeout); this.keepAliveFramesAcceptor = keepAliveHandler.start( keepAliveSupport, sendProcessor::onNextPrioritized, this::tryTerminateOnKeepAlive); diff --git a/rsocket-core/src/main/java/io/rsocket/core/RSocketResponder.java b/rsocket-core/src/main/java/io/rsocket/core/RSocketResponder.java index 5aef7eed2..f992b7577 100644 --- a/rsocket-core/src/main/java/io/rsocket/core/RSocketResponder.java +++ b/rsocket-core/src/main/java/io/rsocket/core/RSocketResponder.java @@ -75,15 +75,14 @@ class RSocketResponder implements ResponderRSocket { private final ByteBufAllocator allocator; RSocketResponder( - ByteBufAllocator allocator, DuplexConnection connection, RSocket requestHandler, PayloadDecoder payloadDecoder, Consumer errorConsumer, ResponderLeaseHandler leaseHandler, int mtu) { - this.allocator = allocator; this.connection = connection; + this.allocator = connection.alloc(); this.mtu = mtu; this.requestHandler = requestHandler; diff --git a/rsocket-core/src/main/java/io/rsocket/core/RSocketServer.java b/rsocket-core/src/main/java/io/rsocket/core/RSocketServer.java index 113b4283f..6c289d707 100644 --- a/rsocket-core/src/main/java/io/rsocket/core/RSocketServer.java +++ b/rsocket-core/src/main/java/io/rsocket/core/RSocketServer.java @@ -17,7 +17,6 @@ package io.rsocket.core; import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufAllocator; import io.rsocket.AbstractRSocket; import io.rsocket.Closeable; import io.rsocket.ConnectionSetupPayload; @@ -55,7 +54,6 @@ public final class RSocketServer { private Consumer errorConsumer = Throwable::printStackTrace; private PayloadDecoder payloadDecoder = PayloadDecoder.DEFAULT; - private ByteBufAllocator allocator = ByteBufAllocator.DEFAULT; private RSocketServer() {} @@ -114,17 +112,6 @@ public RSocketServer errorConsumer(Consumer errorConsumer) { return this; } - /** - * @deprecated this is deprecated with no replacement and will be removed after {@link - * io.rsocket.RSocketFactory} is removed. - */ - @Deprecated - public RSocketServer byteBufAllocator(ByteBufAllocator allocator) { - Objects.requireNonNull(allocator); - this.allocator = allocator; - return this; - } - public ServerTransport.ConnectionAcceptor asConnectionAcceptor() { return new ServerTransport.ConnectionAcceptor() { private final ServerSetup serverSetup = serverSetup(); @@ -228,7 +215,6 @@ private Mono acceptSetup( RSocket rSocketRequester = new RSocketRequester( - allocator, wrappedMultiplexer.asServerConnection(), payloadDecoder, errorConsumer, @@ -252,12 +238,13 @@ private Mono acceptSetup( .doOnNext( rSocketHandler -> { RSocket wrappedRSocketHandler = interceptors.initResponder(rSocketHandler); + DuplexConnection connection = wrappedMultiplexer.asClientConnection(); ResponderLeaseHandler responderLeaseHandler = leaseEnabled ? new ResponderLeaseHandler.Impl<>( SERVER_TAG, - allocator, + connection.alloc(), leases.sender(), errorConsumer, leases.stats()) @@ -265,8 +252,7 @@ private Mono acceptSetup( RSocket rSocketResponder = new RSocketResponder( - allocator, - wrappedMultiplexer.asClientConnection(), + connection, wrappedRSocketHandler, payloadDecoder, errorConsumer, @@ -279,12 +265,11 @@ private Mono acceptSetup( } private ServerSetup serverSetup() { - return resume != null ? createSetup() : new ServerSetup.DefaultServerSetup(allocator); + return resume != null ? createSetup() : new ServerSetup.DefaultServerSetup(); } ServerSetup createSetup() { return new ServerSetup.ResumableServerSetup( - allocator, new SessionManager(), resume.getSessionDuration(), resume.getStreamTimeout(), diff --git a/rsocket-core/src/main/java/io/rsocket/core/ServerSetup.java b/rsocket-core/src/main/java/io/rsocket/core/ServerSetup.java index 16f5f61b1..3e20d3c60 100644 --- a/rsocket-core/src/main/java/io/rsocket/core/ServerSetup.java +++ b/rsocket-core/src/main/java/io/rsocket/core/ServerSetup.java @@ -19,7 +19,7 @@ import static io.rsocket.keepalive.KeepAliveHandler.*; import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufAllocator; +import io.rsocket.DuplexConnection; import io.rsocket.exceptions.RejectedResumeException; import io.rsocket.exceptions.UnsupportedSetupException; import io.rsocket.frame.ErrorFrameFlyweight; @@ -35,12 +35,6 @@ abstract class ServerSetup { - final ByteBufAllocator allocator; - - public ServerSetup(ByteBufAllocator allocator) { - this.allocator = allocator; - } - abstract Mono acceptRSocketSetup( ByteBuf frame, ClientServerInputMultiplexer multiplexer, @@ -51,18 +45,14 @@ abstract Mono acceptRSocketSetup( void dispose() {} Mono sendError(ClientServerInputMultiplexer multiplexer, Exception exception) { - return multiplexer - .asSetupConnection() - .sendOne(ErrorFrameFlyweight.encode(allocator, 0, exception)) + DuplexConnection duplexConnection = multiplexer.asSetupConnection(); + return duplexConnection + .sendOne(ErrorFrameFlyweight.encode(duplexConnection.alloc(), 0, exception)) .onErrorResume(err -> Mono.empty()); } static class DefaultServerSetup extends ServerSetup { - DefaultServerSetup(ByteBufAllocator allocator) { - super(allocator); - } - @Override public Mono acceptRSocketSetup( ByteBuf frame, @@ -94,7 +84,6 @@ public Mono acceptRSocketResume(ByteBuf frame, ClientServerInputMultiplexe } static class ResumableServerSetup extends ServerSetup { - private final ByteBufAllocator allocator; private final SessionManager sessionManager; private final Duration resumeSessionDuration; private final Duration resumeStreamTimeout; @@ -102,14 +91,11 @@ static class ResumableServerSetup extends ServerSetup { private final boolean cleanupStoreOnKeepAlive; ResumableServerSetup( - ByteBufAllocator allocator, SessionManager sessionManager, Duration resumeSessionDuration, Duration resumeStreamTimeout, Function resumeStoreFactory, boolean cleanupStoreOnKeepAlive) { - super(allocator); - this.allocator = allocator; this.sessionManager = sessionManager; this.resumeSessionDuration = resumeSessionDuration; this.resumeStreamTimeout = resumeStreamTimeout; @@ -131,7 +117,6 @@ public Mono acceptRSocketSetup( .save( new ServerRSocketSession( multiplexer.asClientServerConnection(), - allocator, resumeSessionDuration, resumeStreamTimeout, resumeStoreFactory, diff --git a/rsocket-core/src/main/java/io/rsocket/fragmentation/FragmentationDuplexConnection.java b/rsocket-core/src/main/java/io/rsocket/fragmentation/FragmentationDuplexConnection.java index 333787e9f..316643e10 100644 --- a/rsocket-core/src/main/java/io/rsocket/fragmentation/FragmentationDuplexConnection.java +++ b/rsocket-core/src/main/java/io/rsocket/fragmentation/FragmentationDuplexConnection.java @@ -19,7 +19,6 @@ import static io.rsocket.fragmentation.FrameFragmenter.fragmentFrame; import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.ByteBufUtil; import io.rsocket.DuplexConnection; import io.rsocket.frame.FrameHeaderFlyweight; @@ -46,26 +45,19 @@ public final class FragmentationDuplexConnection extends ReassemblyDuplexConnect private static final Logger logger = LoggerFactory.getLogger(FragmentationDuplexConnection.class); private final DuplexConnection delegate; private final int mtu; - private final ByteBufAllocator allocator; private final FrameReassembler frameReassembler; private final boolean encodeLength; private final String type; public FragmentationDuplexConnection( - DuplexConnection delegate, - ByteBufAllocator allocator, - int mtu, - boolean encodeAndEncodeLength, - String type) { - super(delegate, allocator, encodeAndEncodeLength); + DuplexConnection delegate, int mtu, boolean encodeAndEncodeLength, String type) { + super(delegate, encodeAndEncodeLength); Objects.requireNonNull(delegate, "delegate must not be null"); - Objects.requireNonNull(allocator, "byteBufAllocator must not be null"); this.encodeLength = encodeAndEncodeLength; - this.allocator = allocator; this.delegate = delegate; this.mtu = assertMtu(mtu); - this.frameReassembler = new FrameReassembler(allocator); + this.frameReassembler = new FrameReassembler(delegate.alloc()); this.type = type; delegate.onClose().doFinally(s -> frameReassembler.dispose()).subscribe(); @@ -113,7 +105,7 @@ public Mono sendOne(ByteBuf frame) { if (shouldFragment(frameType, readableBytes)) { if (logger.isDebugEnabled()) { return delegate.send( - Flux.from(fragmentFrame(allocator, mtu, frame, frameType, encodeLength)) + Flux.from(fragmentFrame(alloc(), mtu, frame, frameType, encodeLength)) .doOnNext( byteBuf -> { ByteBuf f = encodeLength ? FrameLengthFlyweight.frame(byteBuf) : byteBuf; @@ -126,7 +118,7 @@ public Mono sendOne(ByteBuf frame) { })); } else { return delegate.send( - Flux.from(fragmentFrame(allocator, mtu, frame, frameType, encodeLength))); + Flux.from(fragmentFrame(alloc(), mtu, frame, frameType, encodeLength))); } } else { return delegate.sendOne(encode(frame)); @@ -135,7 +127,7 @@ public Mono sendOne(ByteBuf frame) { private ByteBuf encode(ByteBuf frame) { if (encodeLength) { - return FrameLengthFlyweight.encode(allocator, frame.readableBytes(), frame); + return FrameLengthFlyweight.encode(alloc(), frame.readableBytes(), frame); } else { return frame; } diff --git a/rsocket-core/src/main/java/io/rsocket/fragmentation/ReassemblyDuplexConnection.java b/rsocket-core/src/main/java/io/rsocket/fragmentation/ReassemblyDuplexConnection.java index bf0d7482c..933755bb2 100644 --- a/rsocket-core/src/main/java/io/rsocket/fragmentation/ReassemblyDuplexConnection.java +++ b/rsocket-core/src/main/java/io/rsocket/fragmentation/ReassemblyDuplexConnection.java @@ -37,13 +37,11 @@ public class ReassemblyDuplexConnection implements DuplexConnection { private final FrameReassembler frameReassembler; private final boolean decodeLength; - public ReassemblyDuplexConnection( - DuplexConnection delegate, ByteBufAllocator allocator, boolean decodeLength) { + public ReassemblyDuplexConnection(DuplexConnection delegate, boolean decodeLength) { Objects.requireNonNull(delegate, "delegate must not be null"); - Objects.requireNonNull(allocator, "byteBufAllocator must not be null"); this.decodeLength = decodeLength; this.delegate = delegate; - this.frameReassembler = new FrameReassembler(allocator); + this.frameReassembler = new FrameReassembler(delegate.alloc()); delegate.onClose().doFinally(s -> frameReassembler.dispose()).subscribe(); } @@ -77,6 +75,11 @@ public Flux receive() { }); } + @Override + public ByteBufAllocator alloc() { + return delegate.alloc(); + } + @Override public Mono onClose() { return delegate.onClose(); diff --git a/rsocket-core/src/main/java/io/rsocket/internal/ClientServerInputMultiplexer.java b/rsocket-core/src/main/java/io/rsocket/internal/ClientServerInputMultiplexer.java index 68098e279..cf3eeb120 100644 --- a/rsocket-core/src/main/java/io/rsocket/internal/ClientServerInputMultiplexer.java +++ b/rsocket-core/src/main/java/io/rsocket/internal/ClientServerInputMultiplexer.java @@ -17,6 +17,7 @@ package io.rsocket.internal; import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; import io.rsocket.Closeable; import io.rsocket.DuplexConnection; import io.rsocket.frame.FrameHeaderFlyweight; @@ -201,6 +202,11 @@ public Flux receive() { })); } + @Override + public ByteBufAllocator alloc() { + return source.alloc(); + } + @Override public void dispose() { source.dispose(); diff --git a/rsocket-core/src/main/java/io/rsocket/resume/ClientRSocketSession.java b/rsocket-core/src/main/java/io/rsocket/resume/ClientRSocketSession.java index b347642e3..509fb5168 100644 --- a/rsocket-core/src/main/java/io/rsocket/resume/ClientRSocketSession.java +++ b/rsocket-core/src/main/java/io/rsocket/resume/ClientRSocketSession.java @@ -41,13 +41,12 @@ public class ClientRSocketSession implements RSocketSession resumeStrategy, ResumableFramesStore resumableFramesStore, Duration resumeStreamTimeout, boolean cleanupStoreOnKeepAlive) { - this.allocator = allocator; + this.allocator = duplexConnection.alloc(); this.resumableConnection = new ResumableDuplexConnection( "client", diff --git a/rsocket-core/src/main/java/io/rsocket/resume/ResumableDuplexConnection.java b/rsocket-core/src/main/java/io/rsocket/resume/ResumableDuplexConnection.java index 49401d560..b9c93f4cd 100644 --- a/rsocket-core/src/main/java/io/rsocket/resume/ResumableDuplexConnection.java +++ b/rsocket-core/src/main/java/io/rsocket/resume/ResumableDuplexConnection.java @@ -17,6 +17,7 @@ package io.rsocket.resume; import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; import io.rsocket.Closeable; import io.rsocket.DuplexConnection; import io.rsocket.frame.FrameHeaderFlyweight; @@ -105,6 +106,11 @@ public ResumableDuplexConnection( reconnect(duplexConnection); } + @Override + public ByteBufAllocator alloc() { + return curConnection.alloc(); + } + public void disconnect() { DuplexConnection c = this.curConnection; if (c != null) { diff --git a/rsocket-core/src/main/java/io/rsocket/resume/ServerRSocketSession.java b/rsocket-core/src/main/java/io/rsocket/resume/ServerRSocketSession.java index 1a0605497..5d55559cc 100644 --- a/rsocket-core/src/main/java/io/rsocket/resume/ServerRSocketSession.java +++ b/rsocket-core/src/main/java/io/rsocket/resume/ServerRSocketSession.java @@ -43,13 +43,12 @@ public class ServerRSocketSession implements RSocketSession { public ServerRSocketSession( DuplexConnection duplexConnection, - ByteBufAllocator allocator, Duration resumeSessionDuration, Duration resumeStreamTimeout, Function resumeStoreFactory, ByteBuf resumeToken, boolean cleanupStoreOnKeepAlive) { - this.allocator = allocator; + this.allocator = duplexConnection.alloc(); this.resumeToken = resumeToken; this.resumableConnection = new ResumableDuplexConnection( diff --git a/rsocket-core/src/main/java/io/rsocket/util/DuplexConnectionProxy.java b/rsocket-core/src/main/java/io/rsocket/util/DuplexConnectionProxy.java index fa19553a7..2f5d1da4b 100644 --- a/rsocket-core/src/main/java/io/rsocket/util/DuplexConnectionProxy.java +++ b/rsocket-core/src/main/java/io/rsocket/util/DuplexConnectionProxy.java @@ -17,6 +17,7 @@ package io.rsocket.util; import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; import io.rsocket.DuplexConnection; import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; @@ -44,6 +45,11 @@ public double availability() { return connection.availability(); } + @Override + public ByteBufAllocator alloc() { + return connection.alloc(); + } + @Override public Mono onClose() { return connection.onClose(); diff --git a/rsocket-core/src/test/java/io/rsocket/core/AbstractSocketRule.java b/rsocket-core/src/test/java/io/rsocket/core/AbstractSocketRule.java index 5a43838c7..20972a0d3 100644 --- a/rsocket-core/src/test/java/io/rsocket/core/AbstractSocketRule.java +++ b/rsocket-core/src/test/java/io/rsocket/core/AbstractSocketRule.java @@ -41,10 +41,10 @@ public Statement apply(final Statement base, Description description) { return new Statement() { @Override public void evaluate() throws Throwable { - connection = new TestDuplexConnection(); + allocator = LeaksTrackingByteBufAllocator.instrument(ByteBufAllocator.DEFAULT); + connection = new TestDuplexConnection(allocator); connectSub = TestSubscriber.create(); errors = new ConcurrentLinkedQueue<>(); - allocator = LeaksTrackingByteBufAllocator.instrument(ByteBufAllocator.DEFAULT); init(); base.evaluate(); } @@ -52,10 +52,10 @@ public void evaluate() throws Throwable { } protected void init() { - socket = newRSocket(allocator); + socket = newRSocket(); } - protected abstract T newRSocket(LeaksTrackingByteBufAllocator allocator); + protected abstract T newRSocket(); public void assertNoConnectionErrors() { if (errors.size() > 1) { diff --git a/rsocket-core/src/test/java/io/rsocket/core/KeepAliveTest.java b/rsocket-core/src/test/java/io/rsocket/core/KeepAliveTest.java index 10725238a..e8f3f4190 100644 --- a/rsocket-core/src/test/java/io/rsocket/core/KeepAliveTest.java +++ b/rsocket-core/src/test/java/io/rsocket/core/KeepAliveTest.java @@ -23,6 +23,7 @@ import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.Unpooled; import io.rsocket.RSocket; +import io.rsocket.buffer.LeaksTrackingByteBufAllocator; import io.rsocket.exceptions.ConnectionErrorException; import io.rsocket.frame.FrameHeaderFlyweight; import io.rsocket.frame.FrameType; @@ -52,11 +53,12 @@ public class KeepAliveTest { private ResumableRSocketState resumableRequesterState; static RSocketState requester(int tickPeriod, int timeout) { - TestDuplexConnection connection = new TestDuplexConnection(); + LeaksTrackingByteBufAllocator allocator = + LeaksTrackingByteBufAllocator.instrument(ByteBufAllocator.DEFAULT); + TestDuplexConnection connection = new TestDuplexConnection(allocator); Errors errors = new Errors(); RSocketRequester rSocket = new RSocketRequester( - ByteBufAllocator.DEFAULT, connection, DefaultPayload::create, errors, @@ -66,11 +68,13 @@ static RSocketState requester(int tickPeriod, int timeout) { timeout, new DefaultKeepAliveHandler(connection), RequesterLeaseHandler.None); - return new RSocketState(rSocket, errors, connection); + return new RSocketState(rSocket, errors, allocator, connection); } static ResumableRSocketState resumableRequester(int tickPeriod, int timeout) { - TestDuplexConnection connection = new TestDuplexConnection(); + LeaksTrackingByteBufAllocator allocator = + LeaksTrackingByteBufAllocator.instrument(ByteBufAllocator.DEFAULT); + TestDuplexConnection connection = new TestDuplexConnection(allocator); ResumableDuplexConnection resumableConnection = new ResumableDuplexConnection( "test", @@ -82,7 +86,6 @@ static ResumableRSocketState resumableRequester(int tickPeriod, int timeout) { Errors errors = new Errors(); RSocketRequester rSocket = new RSocketRequester( - ByteBufAllocator.DEFAULT, resumableConnection, DefaultPayload::create, errors, @@ -92,7 +95,7 @@ static ResumableRSocketState resumableRequester(int tickPeriod, int timeout) { timeout, new ResumableKeepAliveHandler(resumableConnection), RequesterLeaseHandler.None); - return new ResumableRSocketState(rSocket, errors, connection, resumableConnection); + return new ResumableRSocketState(rSocket, errors, connection, resumableConnection, allocator); } @BeforeEach @@ -194,7 +197,7 @@ void resumableRequesterKeepAlivesAfterReconnect() { resumableRequester(KEEP_ALIVE_INTERVAL, KEEP_ALIVE_TIMEOUT); ResumableDuplexConnection resumableDuplexConnection = rSocketState.resumableDuplexConnection(); resumableDuplexConnection.disconnect(); - TestDuplexConnection newTestConnection = new TestDuplexConnection(); + TestDuplexConnection newTestConnection = new TestDuplexConnection(rSocketState.alloc()); resumableDuplexConnection.reconnect(newTestConnection); resumableDuplexConnection.resume(0, 0, ignored -> Mono.empty()); @@ -244,11 +247,17 @@ static class RSocketState { private final RSocket rSocket; private final Errors errors; private final TestDuplexConnection connection; + private final LeaksTrackingByteBufAllocator allocator; - public RSocketState(RSocket rSocket, Errors errors, TestDuplexConnection connection) { + public RSocketState( + RSocket rSocket, + Errors errors, + LeaksTrackingByteBufAllocator allocator, + TestDuplexConnection connection) { this.rSocket = rSocket; this.errors = errors; this.connection = connection; + this.allocator = allocator; } public TestDuplexConnection connection() { @@ -262,6 +271,10 @@ public RSocket rSocket() { public Errors errors() { return errors; } + + public LeaksTrackingByteBufAllocator alloc() { + return allocator; + } } static class ResumableRSocketState { @@ -269,16 +282,19 @@ static class ResumableRSocketState { private final Errors errors; private final TestDuplexConnection connection; private final ResumableDuplexConnection resumableDuplexConnection; + private final LeaksTrackingByteBufAllocator allocator; public ResumableRSocketState( RSocket rSocket, Errors errors, TestDuplexConnection connection, - ResumableDuplexConnection resumableDuplexConnection) { + ResumableDuplexConnection resumableDuplexConnection, + LeaksTrackingByteBufAllocator allocator) { this.rSocket = rSocket; this.errors = errors; this.connection = connection; this.resumableDuplexConnection = resumableDuplexConnection; + this.allocator = allocator; } public TestDuplexConnection connection() { @@ -296,6 +312,10 @@ public RSocket rSocket() { public Errors errors() { return errors; } + + public LeaksTrackingByteBufAllocator alloc() { + return allocator; + } } static class Errors implements Consumer { diff --git a/rsocket-core/src/test/java/io/rsocket/core/RSocketLeaseTest.java b/rsocket-core/src/test/java/io/rsocket/core/RSocketLeaseTest.java index 01ee1eb6d..51f5afc24 100644 --- a/rsocket-core/src/test/java/io/rsocket/core/RSocketLeaseTest.java +++ b/rsocket-core/src/test/java/io/rsocket/core/RSocketLeaseTest.java @@ -26,9 +26,9 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.Unpooled; -import io.netty.buffer.UnpooledByteBufAllocator; import io.rsocket.Payload; import io.rsocket.RSocket; +import io.rsocket.buffer.LeaksTrackingByteBufAllocator; import io.rsocket.exceptions.Exceptions; import io.rsocket.frame.FrameHeaderFlyweight; import io.rsocket.frame.FrameType; @@ -77,10 +77,10 @@ class RSocketLeaseTest { @BeforeEach void setUp() { - connection = new TestDuplexConnection(); PayloadDecoder payloadDecoder = PayloadDecoder.DEFAULT; - byteBufAllocator = UnpooledByteBufAllocator.DEFAULT; + byteBufAllocator = LeaksTrackingByteBufAllocator.instrument(ByteBufAllocator.DEFAULT); + connection = new TestDuplexConnection(byteBufAllocator); requesterLeaseHandler = new RequesterLeaseHandler.Impl(TAG, leases -> leaseReceiver = leases); responderLeaseHandler = new ResponderLeaseHandler.Impl<>( @@ -90,7 +90,6 @@ void setUp() { new ClientServerInputMultiplexer(connection, new InitializingInterceptorRegistry(), true); rSocketRequester = new RSocketRequester( - byteBufAllocator, multiplexer.asClientConnection(), payloadDecoder, err -> {}, @@ -110,7 +109,6 @@ void setUp() { rSocketResponder = new RSocketResponder( - byteBufAllocator, multiplexer.asServerConnection(), mockRSocketHandler, payloadDecoder, diff --git a/rsocket-core/src/test/java/io/rsocket/core/RSocketRequesterSubscribersTest.java b/rsocket-core/src/test/java/io/rsocket/core/RSocketRequesterSubscribersTest.java index 8380290f2..01cf99e26 100644 --- a/rsocket-core/src/test/java/io/rsocket/core/RSocketRequesterSubscribersTest.java +++ b/rsocket-core/src/test/java/io/rsocket/core/RSocketRequesterSubscribersTest.java @@ -19,6 +19,7 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.rsocket.RSocket; +import io.rsocket.buffer.LeaksTrackingByteBufAllocator; import io.rsocket.frame.FrameHeaderFlyweight; import io.rsocket.frame.FrameType; import io.rsocket.frame.decoder.PayloadDecoder; @@ -52,15 +53,16 @@ class RSocketRequesterSubscribersTest { FrameType.REQUEST_STREAM, FrameType.REQUEST_CHANNEL)); + private LeaksTrackingByteBufAllocator allocator; private RSocket rSocketRequester; private TestDuplexConnection connection; @BeforeEach void setUp() { - connection = new TestDuplexConnection(); + allocator = LeaksTrackingByteBufAllocator.instrument(ByteBufAllocator.DEFAULT); + connection = new TestDuplexConnection(allocator); rSocketRequester = new RSocketRequester( - ByteBufAllocator.DEFAULT, connection, PayloadDecoder.DEFAULT, err -> {}, diff --git a/rsocket-core/src/test/java/io/rsocket/core/RSocketRequesterTest.java b/rsocket-core/src/test/java/io/rsocket/core/RSocketRequesterTest.java index bc19d8132..e536d2db4 100644 --- a/rsocket-core/src/test/java/io/rsocket/core/RSocketRequesterTest.java +++ b/rsocket-core/src/test/java/io/rsocket/core/RSocketRequesterTest.java @@ -41,7 +41,6 @@ import io.netty.util.ReferenceCounted; import io.rsocket.Payload; import io.rsocket.RSocket; -import io.rsocket.buffer.LeaksTrackingByteBufAllocator; import io.rsocket.exceptions.ApplicationErrorException; import io.rsocket.exceptions.CustomRSocketException; import io.rsocket.exceptions.RejectedSetupException; @@ -741,9 +740,8 @@ public int sendRequestResponse(Publisher response) { public static class ClientSocketRule extends AbstractSocketRule { @Override - protected RSocketRequester newRSocket(LeaksTrackingByteBufAllocator allocator) { + protected RSocketRequester newRSocket() { return new RSocketRequester( - allocator, connection, PayloadDecoder.ZERO_COPY, throwable -> errors.add(throwable), diff --git a/rsocket-core/src/test/java/io/rsocket/core/RSocketResponderTest.java b/rsocket-core/src/test/java/io/rsocket/core/RSocketResponderTest.java index 78027aa3d..48910b3a2 100644 --- a/rsocket-core/src/test/java/io/rsocket/core/RSocketResponderTest.java +++ b/rsocket-core/src/test/java/io/rsocket/core/RSocketResponderTest.java @@ -38,7 +38,6 @@ import io.rsocket.AbstractRSocket; import io.rsocket.Payload; import io.rsocket.RSocket; -import io.rsocket.buffer.LeaksTrackingByteBufAllocator; import io.rsocket.frame.CancelFrameFlyweight; import io.rsocket.frame.ErrorFrameFlyweight; import io.rsocket.frame.FrameHeaderFlyweight; @@ -735,7 +734,7 @@ public Mono requestResponse(Payload payload) { public void setAcceptingSocket(RSocket acceptingSocket) { this.acceptingSocket = acceptingSocket; - connection = new TestDuplexConnection(); + connection = new TestDuplexConnection(alloc()); connectSub = TestSubscriber.create(); errors = new ConcurrentLinkedQueue<>(); this.prefetch = Integer.MAX_VALUE; @@ -744,7 +743,7 @@ public void setAcceptingSocket(RSocket acceptingSocket) { public void setAcceptingSocket(RSocket acceptingSocket, int prefetch) { this.acceptingSocket = acceptingSocket; - connection = new TestDuplexConnection(); + connection = new TestDuplexConnection(alloc()); connectSub = TestSubscriber.create(); errors = new ConcurrentLinkedQueue<>(); this.prefetch = prefetch; @@ -752,9 +751,8 @@ public void setAcceptingSocket(RSocket acceptingSocket, int prefetch) { } @Override - protected RSocketResponder newRSocket(LeaksTrackingByteBufAllocator allocator) { + protected RSocketResponder newRSocket() { return new RSocketResponder( - allocator, connection, acceptingSocket, PayloadDecoder.ZERO_COPY, diff --git a/rsocket-core/src/test/java/io/rsocket/core/RSocketTest.java b/rsocket-core/src/test/java/io/rsocket/core/RSocketTest.java index 568f8eed6..4a2c43ef8 100644 --- a/rsocket-core/src/test/java/io/rsocket/core/RSocketTest.java +++ b/rsocket-core/src/test/java/io/rsocket/core/RSocketTest.java @@ -26,6 +26,7 @@ import io.rsocket.AbstractRSocket; import io.rsocket.Payload; import io.rsocket.RSocket; +import io.rsocket.buffer.LeaksTrackingByteBufAllocator; import io.rsocket.exceptions.ApplicationErrorException; import io.rsocket.exceptions.CustomRSocketException; import io.rsocket.frame.decoder.PayloadDecoder; @@ -415,6 +416,8 @@ public static class SocketRule extends ExternalResource { private ArrayList clientErrors = new ArrayList<>(); private ArrayList serverErrors = new ArrayList<>(); + private LeaksTrackingByteBufAllocator allocator; + @Override public Statement apply(Statement base, Description description) { return new Statement() { @@ -426,14 +429,19 @@ public void evaluate() throws Throwable { }; } + public LeaksTrackingByteBufAllocator alloc() { + return allocator; + } + protected void init() { + allocator = LeaksTrackingByteBufAllocator.instrument(ByteBufAllocator.DEFAULT); serverProcessor = DirectProcessor.create(); clientProcessor = DirectProcessor.create(); LocalDuplexConnection serverConnection = - new LocalDuplexConnection("server", clientProcessor, serverProcessor); + new LocalDuplexConnection("server", allocator, clientProcessor, serverProcessor); LocalDuplexConnection clientConnection = - new LocalDuplexConnection("client", serverProcessor, clientProcessor); + new LocalDuplexConnection("client", allocator, serverProcessor, clientProcessor); requestAcceptor = null != requestAcceptor @@ -468,7 +476,6 @@ public Flux requestChannel(Publisher payloads) { srs = new RSocketResponder( - ByteBufAllocator.DEFAULT, serverConnection, requestAcceptor, PayloadDecoder.DEFAULT, @@ -478,7 +485,6 @@ public Flux requestChannel(Publisher payloads) { crs = new RSocketRequester( - ByteBufAllocator.DEFAULT, clientConnection, PayloadDecoder.DEFAULT, throwable -> clientErrors.add(throwable), diff --git a/rsocket-core/src/test/java/io/rsocket/core/SetupRejectionTest.java b/rsocket-core/src/test/java/io/rsocket/core/SetupRejectionTest.java index 5b489896b..db72c7775 100644 --- a/rsocket-core/src/test/java/io/rsocket/core/SetupRejectionTest.java +++ b/rsocket-core/src/test/java/io/rsocket/core/SetupRejectionTest.java @@ -6,6 +6,7 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.rsocket.*; +import io.rsocket.buffer.LeaksTrackingByteBufAllocator; import io.rsocket.exceptions.Exceptions; import io.rsocket.exceptions.RejectedSetupException; import io.rsocket.frame.ErrorFrameFlyweight; @@ -47,11 +48,12 @@ void responderRejectSetup() { @Test void requesterStreamsTerminatedOnZeroErrorFrame() { - TestDuplexConnection conn = new TestDuplexConnection(); + LeaksTrackingByteBufAllocator allocator = + LeaksTrackingByteBufAllocator.instrument(ByteBufAllocator.DEFAULT); + TestDuplexConnection conn = new TestDuplexConnection(allocator); List errors = new ArrayList<>(); RSocketRequester rSocket = new RSocketRequester( - ByteBufAllocator.DEFAULT, conn, DefaultPayload::create, errors::add, @@ -84,10 +86,11 @@ void requesterStreamsTerminatedOnZeroErrorFrame() { @Test void requesterNewStreamsTerminatedAfterZeroErrorFrame() { - TestDuplexConnection conn = new TestDuplexConnection(); + LeaksTrackingByteBufAllocator allocator = + LeaksTrackingByteBufAllocator.instrument(ByteBufAllocator.DEFAULT); + TestDuplexConnection conn = new TestDuplexConnection(allocator); RSocketRequester rSocket = new RSocketRequester( - ByteBufAllocator.DEFAULT, conn, DefaultPayload::create, err -> {}, @@ -132,7 +135,9 @@ public Mono senderRSocket() { private static class SingleConnectionTransport implements ServerTransport { - private final TestDuplexConnection conn = new TestDuplexConnection(); + private final LeaksTrackingByteBufAllocator allocator = + LeaksTrackingByteBufAllocator.instrument(ByteBufAllocator.DEFAULT); + private final TestDuplexConnection conn = new TestDuplexConnection(allocator); @Override public Mono start(ConnectionAcceptor acceptor, int mtu) { @@ -150,8 +155,7 @@ public ByteBuf awaitSent() { public void connect() { Payload payload = DefaultPayload.create(DefaultPayload.EMPTY_BUFFER); ByteBuf setup = - SetupFrameFlyweight.encode( - ByteBufAllocator.DEFAULT, false, 0, 42, "mdMime", "dMime", payload); + SetupFrameFlyweight.encode(allocator, false, 0, 42, "mdMime", "dMime", payload); conn.addToReceivedBuffer(setup); } diff --git a/rsocket-core/src/test/java/io/rsocket/fragmentation/FragmentationDuplexConnectionTest.java b/rsocket-core/src/test/java/io/rsocket/fragmentation/FragmentationDuplexConnectionTest.java index a6918c497..9050eaa90 100644 --- a/rsocket-core/src/test/java/io/rsocket/fragmentation/FragmentationDuplexConnectionTest.java +++ b/rsocket-core/src/test/java/io/rsocket/fragmentation/FragmentationDuplexConnectionTest.java @@ -24,6 +24,7 @@ import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.Unpooled; import io.rsocket.DuplexConnection; +import io.rsocket.buffer.LeaksTrackingByteBufAllocator; import io.rsocket.frame.*; import java.util.concurrent.ThreadLocalRandom; import org.junit.Assert; @@ -55,16 +56,14 @@ final class FragmentationDuplexConnectionTest { private final ArgumentCaptor> publishers = ArgumentCaptor.forClass(Publisher.class); - private ByteBufAllocator allocator = ByteBufAllocator.DEFAULT; + private LeaksTrackingByteBufAllocator allocator = + LeaksTrackingByteBufAllocator.instrument(ByteBufAllocator.DEFAULT); @DisplayName("constructor throws IllegalArgumentException with negative maxFragmentLength") @Test void constructorInvalidMaxFragmentSize() { assertThatIllegalArgumentException() - .isThrownBy( - () -> - new FragmentationDuplexConnection( - delegate, allocator, Integer.MIN_VALUE, false, "")) + .isThrownBy(() -> new FragmentationDuplexConnection(delegate, Integer.MIN_VALUE, false, "")) .withMessage("smallest allowed mtu size is 64 bytes, provided: -2147483648"); } @@ -72,23 +71,15 @@ void constructorInvalidMaxFragmentSize() { @Test void constructorMtuLessThanMin() { assertThatIllegalArgumentException() - .isThrownBy(() -> new FragmentationDuplexConnection(delegate, allocator, 2, false, "")) + .isThrownBy(() -> new FragmentationDuplexConnection(delegate, 2, false, "")) .withMessage("smallest allowed mtu size is 64 bytes, provided: 2"); } - @DisplayName("constructor throws NullPointerException with null byteBufAllocator") - @Test - void constructorNullByteBufAllocator() { - assertThatNullPointerException() - .isThrownBy(() -> new FragmentationDuplexConnection(delegate, null, 64, false, "")) - .withMessage("byteBufAllocator must not be null"); - } - @DisplayName("constructor throws NullPointerException with null delegate") @Test void constructorNullDelegate() { assertThatNullPointerException() - .isThrownBy(() -> new FragmentationDuplexConnection(null, allocator, 64, false, "")) + .isThrownBy(() -> new FragmentationDuplexConnection(null, 64, false, "")) .withMessage("delegate must not be null"); } @@ -100,8 +91,9 @@ void sendData() { allocator, 1, false, Unpooled.EMPTY_BUFFER, Unpooled.wrappedBuffer(data)); when(delegate.onClose()).thenReturn(Mono.never()); + when(delegate.alloc()).thenReturn(allocator); - new FragmentationDuplexConnection(delegate, allocator, 64, false, "").sendOne(encode.retain()); + new FragmentationDuplexConnection(delegate, 64, false, "").sendOne(encode.retain()); verify(delegate).send(publishers.capture()); diff --git a/rsocket-core/src/test/java/io/rsocket/fragmentation/ReassembleDuplexConnectionTest.java b/rsocket-core/src/test/java/io/rsocket/fragmentation/ReassembleDuplexConnectionTest.java index c5abce339..013e2ebc2 100644 --- a/rsocket-core/src/test/java/io/rsocket/fragmentation/ReassembleDuplexConnectionTest.java +++ b/rsocket-core/src/test/java/io/rsocket/fragmentation/ReassembleDuplexConnectionTest.java @@ -25,6 +25,7 @@ import io.netty.buffer.CompositeByteBuf; import io.netty.buffer.Unpooled; import io.rsocket.DuplexConnection; +import io.rsocket.buffer.LeaksTrackingByteBufAllocator; import io.rsocket.frame.CancelFrameFlyweight; import io.rsocket.frame.FrameHeaderFlyweight; import io.rsocket.frame.FrameType; @@ -51,7 +52,8 @@ final class ReassembleDuplexConnectionTest { private final DuplexConnection delegate = mock(DuplexConnection.class, RETURNS_SMART_NULLS); - private ByteBufAllocator allocator = ByteBufAllocator.DEFAULT; + private LeaksTrackingByteBufAllocator allocator = + LeaksTrackingByteBufAllocator.instrument(ByteBufAllocator.DEFAULT); @DisplayName("reassembles data") @Test @@ -82,8 +84,9 @@ void reassembleData() { when(delegate.receive()).thenReturn(Flux.fromIterable(byteBufs)); when(delegate.onClose()).thenReturn(Mono.never()); + when(delegate.alloc()).thenReturn(allocator); - new ReassemblyDuplexConnection(delegate, allocator, false) + new ReassemblyDuplexConnection(delegate, false) .receive() .as(StepVerifier::create) .assertNext( @@ -146,8 +149,9 @@ void reassembleMetadata() { when(delegate.receive()).thenReturn(Flux.fromIterable(byteBufs)); when(delegate.onClose()).thenReturn(Mono.never()); + when(delegate.alloc()).thenReturn(allocator); - new ReassemblyDuplexConnection(delegate, allocator, false) + new ReassemblyDuplexConnection(delegate, false) .receive() .as(StepVerifier::create) .assertNext( @@ -213,8 +217,9 @@ void reassembleMetadataAndData() { when(delegate.receive()).thenReturn(Flux.fromIterable(byteBufs)); when(delegate.onClose()).thenReturn(Mono.never()); + when(delegate.alloc()).thenReturn(allocator); - new ReassemblyDuplexConnection(delegate, allocator, false) + new ReassemblyDuplexConnection(delegate, false) .receive() .as(StepVerifier::create) .assertNext( @@ -234,8 +239,9 @@ void reassembleNonFragment() { when(delegate.receive()).thenReturn(Flux.just(encode)); when(delegate.onClose()).thenReturn(Mono.never()); + when(delegate.alloc()).thenReturn(allocator); - new ReassemblyDuplexConnection(delegate, allocator, false) + new ReassemblyDuplexConnection(delegate, false) .receive() .as(StepVerifier::create) .assertNext( @@ -253,8 +259,9 @@ void reassembleNonFragmentableFrame() { when(delegate.receive()).thenReturn(Flux.just(encode)); when(delegate.onClose()).thenReturn(Mono.never()); + when(delegate.alloc()).thenReturn(allocator); - new ReassemblyDuplexConnection(delegate, allocator, false) + new ReassemblyDuplexConnection(delegate, false) .receive() .as(StepVerifier::create) .assertNext( diff --git a/rsocket-core/src/test/java/io/rsocket/internal/ClientServerInputMultiplexerTest.java b/rsocket-core/src/test/java/io/rsocket/internal/ClientServerInputMultiplexerTest.java index 9a4b5d2db..efa962c48 100644 --- a/rsocket-core/src/test/java/io/rsocket/internal/ClientServerInputMultiplexerTest.java +++ b/rsocket-core/src/test/java/io/rsocket/internal/ClientServerInputMultiplexerTest.java @@ -21,6 +21,7 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.Unpooled; +import io.rsocket.buffer.LeaksTrackingByteBufAllocator; import io.rsocket.frame.*; import io.rsocket.plugins.InitializingInterceptorRegistry; import io.rsocket.test.util.TestDuplexConnection; @@ -32,12 +33,13 @@ public class ClientServerInputMultiplexerTest { private TestDuplexConnection source; private ClientServerInputMultiplexer clientMultiplexer; - private ByteBufAllocator allocator = ByteBufAllocator.DEFAULT; + private LeaksTrackingByteBufAllocator allocator = + LeaksTrackingByteBufAllocator.instrument(ByteBufAllocator.DEFAULT); private ClientServerInputMultiplexer serverMultiplexer; @Before public void setup() { - source = new TestDuplexConnection(); + source = new TestDuplexConnection(allocator); clientMultiplexer = new ClientServerInputMultiplexer(source, new InitializingInterceptorRegistry(), true); serverMultiplexer = diff --git a/rsocket-core/src/test/java/io/rsocket/test/util/LocalDuplexConnection.java b/rsocket-core/src/test/java/io/rsocket/test/util/LocalDuplexConnection.java index d945dd45d..58323c066 100644 --- a/rsocket-core/src/test/java/io/rsocket/test/util/LocalDuplexConnection.java +++ b/rsocket-core/src/test/java/io/rsocket/test/util/LocalDuplexConnection.java @@ -17,6 +17,7 @@ package io.rsocket.test.util; import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; import io.rsocket.DuplexConnection; import org.reactivestreams.Publisher; import reactor.core.publisher.DirectProcessor; @@ -25,17 +26,22 @@ import reactor.core.publisher.MonoProcessor; public class LocalDuplexConnection implements DuplexConnection { + private final ByteBufAllocator allocator; private final DirectProcessor send; private final DirectProcessor receive; private final MonoProcessor onClose; private final String name; public LocalDuplexConnection( - String name, DirectProcessor send, DirectProcessor receive) { + String name, + ByteBufAllocator allocator, + DirectProcessor send, + DirectProcessor receive) { this.name = name; + this.allocator = allocator; this.send = send; this.receive = receive; - onClose = MonoProcessor.create(); + this.onClose = MonoProcessor.create(); } @Override @@ -52,6 +58,11 @@ public Flux receive() { return receive.doOnNext(f -> System.out.println(name + " - " + f.toString())); } + @Override + public ByteBufAllocator alloc() { + return allocator; + } + @Override public void dispose() { onClose.onComplete(); diff --git a/rsocket-core/src/test/java/io/rsocket/test/util/TestClientTransport.java b/rsocket-core/src/test/java/io/rsocket/test/util/TestClientTransport.java index 37ad8ee5b..a30e75875 100644 --- a/rsocket-core/src/test/java/io/rsocket/test/util/TestClientTransport.java +++ b/rsocket-core/src/test/java/io/rsocket/test/util/TestClientTransport.java @@ -1,12 +1,15 @@ package io.rsocket.test.util; +import io.netty.buffer.ByteBufAllocator; import io.rsocket.DuplexConnection; +import io.rsocket.buffer.LeaksTrackingByteBufAllocator; import io.rsocket.transport.ClientTransport; import reactor.core.publisher.Mono; public class TestClientTransport implements ClientTransport { - - private final TestDuplexConnection testDuplexConnection = new TestDuplexConnection(); + private final LeaksTrackingByteBufAllocator allocator = + LeaksTrackingByteBufAllocator.instrument(ByteBufAllocator.DEFAULT); + private final TestDuplexConnection testDuplexConnection = new TestDuplexConnection(allocator); @Override public Mono connect(int mtu) { @@ -16,4 +19,8 @@ public Mono connect(int mtu) { public TestDuplexConnection testConnection() { return testDuplexConnection; } + + public LeaksTrackingByteBufAllocator alloc() { + return allocator; + } } diff --git a/rsocket-core/src/test/java/io/rsocket/test/util/TestDuplexConnection.java b/rsocket-core/src/test/java/io/rsocket/test/util/TestDuplexConnection.java index 6298b0c3a..17a19b8c9 100644 --- a/rsocket-core/src/test/java/io/rsocket/test/util/TestDuplexConnection.java +++ b/rsocket-core/src/test/java/io/rsocket/test/util/TestDuplexConnection.java @@ -17,6 +17,7 @@ package io.rsocket.test.util; import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; import io.rsocket.DuplexConnection; import java.util.Collection; import java.util.concurrent.ConcurrentLinkedQueue; @@ -46,17 +47,19 @@ public class TestDuplexConnection implements DuplexConnection { private final FluxSink receivedSink; private final MonoProcessor onClose; private final ConcurrentLinkedQueue> sendSubscribers; + private final ByteBufAllocator allocator; private volatile double availability = 1; private volatile int initialSendRequestN = Integer.MAX_VALUE; - public TestDuplexConnection() { - sent = new LinkedBlockingQueue<>(); - received = DirectProcessor.create(); - receivedSink = received.sink(); - sentPublisher = DirectProcessor.create(); - sendSink = sentPublisher.sink(); - sendSubscribers = new ConcurrentLinkedQueue<>(); - onClose = MonoProcessor.create(); + public TestDuplexConnection(ByteBufAllocator allocator) { + this.allocator = allocator; + this.sent = new LinkedBlockingQueue<>(); + this.received = DirectProcessor.create(); + this.receivedSink = received.sink(); + this.sentPublisher = DirectProcessor.create(); + this.sendSink = sentPublisher.sink(); + this.sendSubscribers = new ConcurrentLinkedQueue<>(); + this.onClose = MonoProcessor.create(); } @Override @@ -83,6 +86,11 @@ public Flux receive() { return received; } + @Override + public ByteBufAllocator alloc() { + return allocator; + } + @Override public double availability() { return availability; diff --git a/rsocket-core/src/test/java/io/rsocket/test/util/TestServerTransport.java b/rsocket-core/src/test/java/io/rsocket/test/util/TestServerTransport.java index 5cebf0da1..325496148 100644 --- a/rsocket-core/src/test/java/io/rsocket/test/util/TestServerTransport.java +++ b/rsocket-core/src/test/java/io/rsocket/test/util/TestServerTransport.java @@ -1,12 +1,16 @@ package io.rsocket.test.util; +import io.netty.buffer.ByteBufAllocator; import io.rsocket.Closeable; +import io.rsocket.buffer.LeaksTrackingByteBufAllocator; import io.rsocket.transport.ServerTransport; import reactor.core.publisher.Mono; import reactor.core.publisher.MonoProcessor; public class TestServerTransport implements ServerTransport { private final MonoProcessor conn = MonoProcessor.create(); + private final LeaksTrackingByteBufAllocator allocator = + LeaksTrackingByteBufAllocator.instrument(ByteBufAllocator.DEFAULT); @Override public Mono start(ConnectionAcceptor acceptor, int mtu) { @@ -39,8 +43,12 @@ private void disposeConnection() { } public TestDuplexConnection connect() { - TestDuplexConnection c = new TestDuplexConnection(); + TestDuplexConnection c = new TestDuplexConnection(allocator); conn.onNext(c); return c; } + + public LeaksTrackingByteBufAllocator alloc() { + return allocator; + } } diff --git a/rsocket-examples/src/main/java/io/rsocket/examples/transport/ws/WebSocketHeadersSample.java b/rsocket-examples/src/main/java/io/rsocket/examples/transport/ws/WebSocketHeadersSample.java index fd2dcbbd6..24f029845 100644 --- a/rsocket-examples/src/main/java/io/rsocket/examples/transport/ws/WebSocketHeadersSample.java +++ b/rsocket-examples/src/main/java/io/rsocket/examples/transport/ws/WebSocketHeadersSample.java @@ -16,7 +16,6 @@ package io.rsocket.examples.transport.ws; -import io.netty.buffer.ByteBufAllocator; import io.netty.handler.codec.http.HttpResponseStatus; import io.rsocket.AbstractRSocket; import io.rsocket.ConnectionSetupPayload; @@ -65,9 +64,7 @@ public static void main(String[] args) { if (in.headers().containsValue("Authorization", "test", true)) { DuplexConnection connection = new ReassemblyDuplexConnection( - new WebsocketDuplexConnection((Connection) in), - ByteBufAllocator.DEFAULT, - false); + new WebsocketDuplexConnection((Connection) in), false); return acceptor.apply(connection).then(out.neverComplete()); } diff --git a/rsocket-micrometer/src/main/java/io/rsocket/micrometer/MicrometerDuplexConnection.java b/rsocket-micrometer/src/main/java/io/rsocket/micrometer/MicrometerDuplexConnection.java index 20d58dcb7..9904c2b24 100644 --- a/rsocket-micrometer/src/main/java/io/rsocket/micrometer/MicrometerDuplexConnection.java +++ b/rsocket-micrometer/src/main/java/io/rsocket/micrometer/MicrometerDuplexConnection.java @@ -20,6 +20,7 @@ import io.micrometer.core.instrument.*; import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; import io.rsocket.DuplexConnection; import io.rsocket.frame.FrameHeaderFlyweight; import io.rsocket.frame.FrameType; @@ -82,6 +83,11 @@ final class MicrometerDuplexConnection implements DuplexConnection { this.frameCounters = new FrameCounters(connectionType, meterRegistry, tags); } + @Override + public ByteBufAllocator alloc() { + return delegate.alloc(); + } + @Override public void dispose() { delegate.dispose(); diff --git a/rsocket-transport-local/src/main/java/io/rsocket/transport/local/LocalClientTransport.java b/rsocket-transport-local/src/main/java/io/rsocket/transport/local/LocalClientTransport.java index 0d6a10391..d69bd65e8 100644 --- a/rsocket-transport-local/src/main/java/io/rsocket/transport/local/LocalClientTransport.java +++ b/rsocket-transport-local/src/main/java/io/rsocket/transport/local/LocalClientTransport.java @@ -37,21 +37,39 @@ public final class LocalClientTransport implements ClientTransport { private final String name; - private LocalClientTransport(String name) { + private final ByteBufAllocator allocator; + + private LocalClientTransport(String name, ByteBufAllocator allocator) { this.name = name; + this.allocator = allocator; } /** * Creates a new instance. * - * @param name the name of the {@link ServerTransport} instance to connect to + * @param name the name of the {@link ClientTransport} instance to connect to * @return a new instance * @throws NullPointerException if {@code name} is {@code null} */ public static LocalClientTransport create(String name) { Objects.requireNonNull(name, "name must not be null"); - return new LocalClientTransport(name); + return create(name, ByteBufAllocator.DEFAULT); + } + + /** + * Creates a new instance. + * + * @param name the name of the {@link ClientTransport} instance to connect to + * @param allocator the allocator used by {@link ClientTransport} instance + * @return a new instance + * @throws NullPointerException if {@code name} is {@code null} + */ + public static LocalClientTransport create(String name, ByteBufAllocator allocator) { + Objects.requireNonNull(name, "name must not be null"); + Objects.requireNonNull(allocator, "allocator must not be null"); + + return new LocalClientTransport(name, allocator); } private Mono connect() { @@ -66,9 +84,10 @@ private Mono connect() { UnboundedProcessor out = new UnboundedProcessor<>(); MonoProcessor closeNotifier = MonoProcessor.create(); - server.accept(new LocalDuplexConnection(out, in, closeNotifier)); + server.accept(new LocalDuplexConnection(allocator, out, in, closeNotifier)); - return Mono.just((DuplexConnection) new LocalDuplexConnection(in, out, closeNotifier)); + return Mono.just( + (DuplexConnection) new LocalDuplexConnection(allocator, in, out, closeNotifier)); }); } @@ -80,11 +99,9 @@ public Mono connect(int mtu) { return connect.map( duplexConnection -> { if (mtu > 0) { - return new FragmentationDuplexConnection( - duplexConnection, ByteBufAllocator.DEFAULT, mtu, false, "client"); + return new FragmentationDuplexConnection(duplexConnection, mtu, false, "client"); } else { - return new ReassemblyDuplexConnection( - duplexConnection, ByteBufAllocator.DEFAULT, false); + return new ReassemblyDuplexConnection(duplexConnection, false); } }); } diff --git a/rsocket-transport-local/src/main/java/io/rsocket/transport/local/LocalDuplexConnection.java b/rsocket-transport-local/src/main/java/io/rsocket/transport/local/LocalDuplexConnection.java index f9501717c..afaa14f95 100644 --- a/rsocket-transport-local/src/main/java/io/rsocket/transport/local/LocalDuplexConnection.java +++ b/rsocket-transport-local/src/main/java/io/rsocket/transport/local/LocalDuplexConnection.java @@ -17,6 +17,7 @@ package io.rsocket.transport.local; import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; import io.rsocket.DuplexConnection; import java.util.Objects; import org.reactivestreams.Publisher; @@ -28,6 +29,7 @@ /** An implementation of {@link DuplexConnection} that connects inside the same JVM. */ final class LocalDuplexConnection implements DuplexConnection { + private final ByteBufAllocator allocator; private final Flux in; private final MonoProcessor onClose; @@ -42,7 +44,12 @@ final class LocalDuplexConnection implements DuplexConnection { * @param onClose the closing notifier * @throws NullPointerException if {@code in}, {@code out}, or {@code onClose} are {@code null} */ - LocalDuplexConnection(Flux in, Subscriber out, MonoProcessor onClose) { + LocalDuplexConnection( + ByteBufAllocator allocator, + Flux in, + Subscriber out, + MonoProcessor onClose) { + this.allocator = Objects.requireNonNull(allocator, "allocator must not be null"); this.in = Objects.requireNonNull(in, "in must not be null"); this.out = Objects.requireNonNull(out, "out must not be null"); this.onClose = Objects.requireNonNull(onClose, "onClose must not be null"); @@ -82,4 +89,9 @@ public Mono sendOne(ByteBuf frame) { out.onNext(frame); return Mono.empty(); } + + @Override + public ByteBufAllocator alloc() { + return allocator; + } } diff --git a/rsocket-transport-local/src/main/java/io/rsocket/transport/local/LocalServerTransport.java b/rsocket-transport-local/src/main/java/io/rsocket/transport/local/LocalServerTransport.java index 329b4e38c..382b4533a 100644 --- a/rsocket-transport-local/src/main/java/io/rsocket/transport/local/LocalServerTransport.java +++ b/rsocket-transport-local/src/main/java/io/rsocket/transport/local/LocalServerTransport.java @@ -16,7 +16,6 @@ package io.rsocket.transport.local; -import io.netty.buffer.ByteBufAllocator; import io.rsocket.Closeable; import io.rsocket.DuplexConnection; import io.rsocket.fragmentation.FragmentationDuplexConnection; @@ -167,11 +166,9 @@ public void accept(DuplexConnection duplexConnection) { if (mtu > 0) { duplexConnection = - new FragmentationDuplexConnection( - duplexConnection, ByteBufAllocator.DEFAULT, mtu, false, "server"); + new FragmentationDuplexConnection(duplexConnection, mtu, false, "server"); } else { - duplexConnection = - new ReassemblyDuplexConnection(duplexConnection, ByteBufAllocator.DEFAULT, false); + duplexConnection = new ReassemblyDuplexConnection(duplexConnection, false); } acceptor.apply(duplexConnection).subscribe(); diff --git a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/TcpDuplexConnection.java b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/TcpDuplexConnection.java index c9c29f0a9..aa28ab8b9 100644 --- a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/TcpDuplexConnection.java +++ b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/TcpDuplexConnection.java @@ -62,6 +62,11 @@ public TcpDuplexConnection(Connection connection, boolean encodeLength) { }); } + @Override + public ByteBufAllocator alloc() { + return connection.channel().alloc(); + } + @Override protected void doOnClose() { if (!connection.isDisposed()) { diff --git a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/WebsocketDuplexConnection.java b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/WebsocketDuplexConnection.java index ead297928..0183ef19d 100644 --- a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/WebsocketDuplexConnection.java +++ b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/WebsocketDuplexConnection.java @@ -16,6 +16,7 @@ package io.rsocket.transport.netty; import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame; import io.rsocket.DuplexConnection; import io.rsocket.internal.BaseDuplexConnection; @@ -53,6 +54,11 @@ public WebsocketDuplexConnection(Connection connection) { }); } + @Override + public ByteBufAllocator alloc() { + return connection.channel().alloc(); + } + @Override protected void doOnClose() { if (!connection.isDisposed()) { diff --git a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/client/TcpClientTransport.java b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/client/TcpClientTransport.java index 8059b36bd..8be019f1c 100644 --- a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/client/TcpClientTransport.java +++ b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/client/TcpClientTransport.java @@ -16,7 +16,6 @@ package io.rsocket.transport.netty.client; -import io.netty.buffer.ByteBufAllocator; import io.rsocket.DuplexConnection; import io.rsocket.fragmentation.FragmentationDuplexConnection; import io.rsocket.fragmentation.ReassemblyDuplexConnection; @@ -105,14 +104,9 @@ public Mono connect(int mtu) { c -> { if (mtu > 0) { return new FragmentationDuplexConnection( - new TcpDuplexConnection(c, false), - ByteBufAllocator.DEFAULT, - mtu, - true, - "client"); + new TcpDuplexConnection(c, false), mtu, true, "client"); } else { - return new ReassemblyDuplexConnection( - new TcpDuplexConnection(c), ByteBufAllocator.DEFAULT, false); + return new ReassemblyDuplexConnection(new TcpDuplexConnection(c), false); } }); } diff --git a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/client/WebsocketClientTransport.java b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/client/WebsocketClientTransport.java index 49a2c2e92..b19621d46 100644 --- a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/client/WebsocketClientTransport.java +++ b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/client/WebsocketClientTransport.java @@ -20,7 +20,6 @@ import static io.rsocket.transport.netty.UriUtils.getPort; import static io.rsocket.transport.netty.UriUtils.isSecure; -import io.netty.buffer.ByteBufAllocator; import io.rsocket.DuplexConnection; import io.rsocket.fragmentation.FragmentationDuplexConnection; import io.rsocket.fragmentation.ReassemblyDuplexConnection; @@ -164,11 +163,9 @@ public Mono connect(int mtu) { DuplexConnection connection = new WebsocketDuplexConnection(c); if (mtu > 0) { connection = - new FragmentationDuplexConnection( - connection, ByteBufAllocator.DEFAULT, mtu, false, "client"); + new FragmentationDuplexConnection(connection, mtu, false, "client"); } else { - connection = - new ReassemblyDuplexConnection(connection, ByteBufAllocator.DEFAULT, false); + connection = new ReassemblyDuplexConnection(connection, false); } return connection; }); diff --git a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/server/TcpServerTransport.java b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/server/TcpServerTransport.java index d39cc8e67..56dd59d45 100644 --- a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/server/TcpServerTransport.java +++ b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/server/TcpServerTransport.java @@ -16,7 +16,6 @@ package io.rsocket.transport.netty.server; -import io.netty.buffer.ByteBufAllocator; import io.rsocket.DuplexConnection; import io.rsocket.fragmentation.FragmentationDuplexConnection; import io.rsocket.fragmentation.ReassemblyDuplexConnection; @@ -106,15 +105,9 @@ public Mono start(ConnectionAcceptor acceptor, int mtu) { if (mtu > 0) { connection = new FragmentationDuplexConnection( - new TcpDuplexConnection(c, false), - ByteBufAllocator.DEFAULT, - mtu, - true, - "server"); + new TcpDuplexConnection(c, false), mtu, true, "server"); } else { - connection = - new ReassemblyDuplexConnection( - new TcpDuplexConnection(c), ByteBufAllocator.DEFAULT, false); + connection = new ReassemblyDuplexConnection(new TcpDuplexConnection(c), false); } acceptor .apply(connection) diff --git a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/server/WebsocketRouteTransport.java b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/server/WebsocketRouteTransport.java index 1d8769cc6..83cb010b7 100644 --- a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/server/WebsocketRouteTransport.java +++ b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/server/WebsocketRouteTransport.java @@ -18,7 +18,6 @@ import static io.rsocket.frame.FrameLengthFlyweight.FRAME_LENGTH_MASK; -import io.netty.buffer.ByteBufAllocator; import io.rsocket.Closeable; import io.rsocket.DuplexConnection; import io.rsocket.fragmentation.FragmentationDuplexConnection; @@ -105,11 +104,9 @@ public static BiFunction> n return (in, out) -> { DuplexConnection connection = new WebsocketDuplexConnection((Connection) in); if (mtu > 0) { - connection = - new FragmentationDuplexConnection( - connection, ByteBufAllocator.DEFAULT, mtu, false, "server"); + connection = new FragmentationDuplexConnection(connection, mtu, false, "server"); } else { - connection = new ReassemblyDuplexConnection(connection, ByteBufAllocator.DEFAULT, false); + connection = new ReassemblyDuplexConnection(connection, false); } return acceptor.apply(connection).then(out.neverComplete()); }; diff --git a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/server/WebsocketServerTransport.java b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/server/WebsocketServerTransport.java index 01c519ea3..4a0331c08 100644 --- a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/server/WebsocketServerTransport.java +++ b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/server/WebsocketServerTransport.java @@ -18,7 +18,6 @@ import static io.rsocket.frame.FrameLengthFlyweight.FRAME_LENGTH_MASK; -import io.netty.buffer.ByteBufAllocator; import io.rsocket.DuplexConnection; import io.rsocket.fragmentation.FragmentationDuplexConnection; import io.rsocket.fragmentation.ReassemblyDuplexConnection; @@ -129,12 +128,9 @@ public Mono start(ConnectionAcceptor acceptor, int mtu) { new WebsocketDuplexConnection((Connection) in); if (mtu > 0) { connection = - new FragmentationDuplexConnection( - connection, ByteBufAllocator.DEFAULT, mtu, false, "server"); + new FragmentationDuplexConnection(connection, mtu, false, "server"); } else { - connection = - new ReassemblyDuplexConnection( - connection, ByteBufAllocator.DEFAULT, false); + connection = new ReassemblyDuplexConnection(connection, false); } return acceptor.apply(connection).then(out.neverComplete()); },