From d96e567f20d5890ee47646179f58d037054304fe Mon Sep 17 00:00:00 2001 From: Oleh Dokuka Date: Tue, 28 Apr 2020 11:32:45 +0300 Subject: [PATCH] removes TupleByteBufs Signed-off-by: Oleh Dokuka --- .../rsocket/buffer/AbstractTupleByteBuf.java | 607 ------------------ .../java/io/rsocket/buffer/BufferUtil.java | 78 --- .../java/io/rsocket/buffer/Tuple2ByteBuf.java | 394 ------------ .../java/io/rsocket/buffer/Tuple3ByteBuf.java | 571 ---------------- .../java/io/rsocket/buffer/TupleByteBuf.java | 35 - .../io/rsocket/buffer/Tuple3ByteBufTest.java | 98 --- 6 files changed, 1783 deletions(-) delete mode 100644 rsocket-core/src/main/java/io/rsocket/buffer/AbstractTupleByteBuf.java delete mode 100644 rsocket-core/src/main/java/io/rsocket/buffer/BufferUtil.java delete mode 100644 rsocket-core/src/main/java/io/rsocket/buffer/Tuple2ByteBuf.java delete mode 100644 rsocket-core/src/main/java/io/rsocket/buffer/Tuple3ByteBuf.java delete mode 100644 rsocket-core/src/main/java/io/rsocket/buffer/TupleByteBuf.java delete mode 100644 rsocket-core/src/test/java/io/rsocket/buffer/Tuple3ByteBufTest.java diff --git a/rsocket-core/src/main/java/io/rsocket/buffer/AbstractTupleByteBuf.java b/rsocket-core/src/main/java/io/rsocket/buffer/AbstractTupleByteBuf.java deleted file mode 100644 index a80605877..000000000 --- a/rsocket-core/src/main/java/io/rsocket/buffer/AbstractTupleByteBuf.java +++ /dev/null @@ -1,607 +0,0 @@ -package io.rsocket.buffer; - -import io.netty.buffer.AbstractReferenceCountedByteBuf; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufAllocator; -import io.netty.buffer.Unpooled; -import io.netty.util.internal.SystemPropertyUtil; -import java.io.InputStream; -import java.nio.ByteBuffer; -import java.nio.ByteOrder; -import java.nio.channels.FileChannel; -import java.nio.channels.ScatteringByteChannel; -import java.nio.charset.Charset; - -abstract class AbstractTupleByteBuf extends AbstractReferenceCountedByteBuf { - static final int DEFAULT_DIRECT_MEMORY_CACHE_ALIGNMENT = - SystemPropertyUtil.getInt("io.netty.allocator.directMemoryCacheAlignment", 0); - static final ByteBuffer EMPTY_NIO_BUFFER = Unpooled.EMPTY_BUFFER.nioBuffer(); - static final int NOT_ENOUGH_BYTES_AT_MAX_CAPACITY_CODE = 3; - - final ByteBufAllocator allocator; - final int capacity; - - AbstractTupleByteBuf(ByteBufAllocator allocator, int capacity) { - super(Integer.MAX_VALUE); - - this.capacity = capacity; - this.allocator = allocator; - super.writerIndex(capacity); - } - - abstract long calculateRelativeIndex(int index); - - abstract ByteBuf getPart(int index); - - @Override - public ByteBuffer nioBuffer(int index, int length) { - checkIndex(index, length); - - ByteBuffer[] buffers = nioBuffers(index, length); - - if (buffers.length == 1) { - return buffers[0].duplicate(); - } - - ByteBuffer merged = - BufferUtil.allocateDirectAligned(length, DEFAULT_DIRECT_MEMORY_CACHE_ALIGNMENT) - .order(order()); - for (ByteBuffer buf : buffers) { - merged.put(buf); - } - - merged.flip(); - return merged; - } - - @Override - public ByteBuffer[] nioBuffers(int index, int length) { - checkIndex(index, length); - if (length == 0) { - return new ByteBuffer[] {EMPTY_NIO_BUFFER}; - } - return _nioBuffers(index, length); - } - - protected abstract ByteBuffer[] _nioBuffers(int index, int length); - - @Override - protected byte _getByte(final int index) { - long ri = calculateRelativeIndex(index); - ByteBuf byteBuf = getPart(index); - - int calculatedIndex = (int) (ri & Integer.MAX_VALUE); - - return byteBuf.getByte(calculatedIndex); - } - - @Override - protected short _getShort(final int index) { - long ri = calculateRelativeIndex(index); - ByteBuf byteBuf = getPart(index); - - final int calculatedIndex = (int) (ri & Integer.MAX_VALUE); - - if (calculatedIndex + Short.BYTES <= byteBuf.writerIndex()) { - return byteBuf.getShort(calculatedIndex); - } else if (order() == ByteOrder.BIG_ENDIAN) { - return (short) ((_getByte(index) & 0xff) << 8 | _getByte(index + 1) & 0xff); - } else { - return (short) (_getByte(index) & 0xff | (_getByte(index + 1) & 0xff) << 8); - } - } - - @Override - protected short _getShortLE(int index) { - long ri = calculateRelativeIndex(index); - ByteBuf byteBuf = getPart(index); - - final int calculatedIndex = (int) (ri & Integer.MAX_VALUE); - - if (calculatedIndex + Short.BYTES <= byteBuf.writerIndex()) { - return byteBuf.getShortLE(calculatedIndex); - } else if (order() == ByteOrder.BIG_ENDIAN) { - return (short) (_getByte(index) & 0xff | (_getByte(index + 1) & 0xff) << 8); - } else { - return (short) ((_getByte(index) & 0xff) << 8 | _getByte(index + 1) & 0xff); - } - } - - @Override - protected int _getUnsignedMedium(final int index) { - long ri = calculateRelativeIndex(index); - ByteBuf byteBuf = getPart(index); - - int calculatedIndex = (int) (ri & Integer.MAX_VALUE); - - if (calculatedIndex + 3 <= byteBuf.writerIndex()) { - return byteBuf.getUnsignedMedium(calculatedIndex); - } else if (order() == ByteOrder.BIG_ENDIAN) { - return (_getShort(index) & 0xffff) << 8 | _getByte(index + 2) & 0xff; - } else { - return _getShort(index) & 0xFFFF | (_getByte(index + 2) & 0xFF) << 16; - } - } - - @Override - protected int _getUnsignedMediumLE(int index) { - long ri = calculateRelativeIndex(index); - ByteBuf byteBuf = getPart(index); - - int calculatedIndex = (int) (ri & Integer.MAX_VALUE); - - if (calculatedIndex + 3 <= byteBuf.writerIndex()) { - return byteBuf.getUnsignedMediumLE(calculatedIndex); - } else if (order() == ByteOrder.BIG_ENDIAN) { - return _getShortLE(index) & 0xffff | (_getByte(index + 2) & 0xff) << 16; - } else { - return (_getShortLE(index) & 0xffff) << 8 | _getByte(index + 2) & 0xff; - } - } - - @Override - protected int _getInt(final int index) { - long ri = calculateRelativeIndex(index); - ByteBuf byteBuf = getPart(index); - - int calculatedIndex = (int) (ri & Integer.MAX_VALUE); - - if (calculatedIndex + Integer.BYTES <= byteBuf.writerIndex()) { - return byteBuf.getInt(calculatedIndex); - } else if (order() == ByteOrder.BIG_ENDIAN) { - return (_getShort(index) & 0xffff) << 16 | _getShort(index + 2) & 0xffff; - } else { - return _getShort(index) & 0xFFFF | (_getShort(index + 2) & 0xFFFF) << 16; - } - } - - @Override - protected int _getIntLE(final int index) { - long ri = calculateRelativeIndex(index); - ByteBuf byteBuf = getPart(index); - - int calculatedIndex = (int) (ri & Integer.MAX_VALUE); - - if (calculatedIndex + Integer.BYTES <= byteBuf.writerIndex()) { - return byteBuf.getIntLE(calculatedIndex); - } else if (order() == ByteOrder.BIG_ENDIAN) { - return _getShortLE(index) & 0xffff | (_getShortLE(index + 2) & 0xffff) << 16; - } else { - return (_getShortLE(index) & 0xffff) << 16 | _getShortLE(index + 2) & 0xffff; - } - } - - @Override - protected long _getLong(final int index) { - long ri = calculateRelativeIndex(index); - ByteBuf byteBuf = getPart(index); - - int calculatedIndex = (int) (ri & Integer.MAX_VALUE); - - if (calculatedIndex + Long.BYTES <= byteBuf.writerIndex()) { - return byteBuf.getLong(calculatedIndex); - } else if (order() == ByteOrder.BIG_ENDIAN) { - return (_getInt(index) & 0xffffffffL) << 32 | _getInt(index + 4) & 0xffffffffL; - } else { - return _getInt(index) & 0xFFFFFFFFL | (_getInt(index + 4) & 0xFFFFFFFFL) << 32; - } - } - - @Override - protected long _getLongLE(final int index) { - long ri = calculateRelativeIndex(index); - ByteBuf byteBuf = getPart(index); - - int calculatedIndex = (int) (ri & Integer.MAX_VALUE); - - if (calculatedIndex + Long.BYTES <= byteBuf.writerIndex()) { - return byteBuf.getLongLE(calculatedIndex); - } else if (order() == ByteOrder.BIG_ENDIAN) { - return (_getInt(index) & 0xffffffffL) << 32 | _getInt(index + 4) & 0xffffffffL; - } else { - return _getInt(index) & 0xFFFFFFFFL | (_getInt(index + 4) & 0xFFFFFFFFL) << 32; - } - } - - @Override - public ByteBufAllocator alloc() { - return allocator; - } - - @Override - public int capacity() { - return capacity; - } - - @Override - public ByteBuf capacity(int newCapacity) { - throw new UnsupportedOperationException(); - } - - @Override - public int maxCapacity() { - return capacity; - } - - @Override - public ByteOrder order() { - return ByteOrder.LITTLE_ENDIAN; - } - - @Override - public ByteBuf order(ByteOrder endianness) { - return this; - } - - @Override - public ByteBuf unwrap() { - throw new UnsupportedOperationException(); - } - - @Override - public boolean isReadOnly() { - return true; - } - - @Override - public ByteBuf asReadOnly() { - return this; - } - - @Override - public boolean isWritable() { - return false; - } - - @Override - public boolean isWritable(int size) { - return false; - } - - @Override - public ByteBuf writerIndex(int writerIndex) { - return this; - } - - @Override - public final int writerIndex() { - return capacity; - } - - @Override - public ByteBuf setIndex(int readerIndex, int writerIndex) { - return this; - } - - @Override - public ByteBuf clear() { - throw new UnsupportedOperationException(); - } - - @Override - public ByteBuf discardReadBytes() { - return this; - } - - @Override - public ByteBuf discardSomeReadBytes() { - return this; - } - - @Override - public ByteBuf ensureWritable(int minWritableBytes) { - return this; - } - - @Override - public int ensureWritable(int minWritableBytes, boolean force) { - return NOT_ENOUGH_BYTES_AT_MAX_CAPACITY_CODE; - } - - @Override - public ByteBuf setFloatLE(int index, float value) { - throw new UnsupportedOperationException(); - } - - @Override - public ByteBuf setDoubleLE(int index, double value) { - throw new UnsupportedOperationException(); - } - - @Override - public ByteBuf setBoolean(int index, boolean value) { - throw new UnsupportedOperationException(); - } - - @Override - public ByteBuf setByte(int index, int value) { - throw new UnsupportedOperationException(); - } - - @Override - public ByteBuf setShort(int index, int value) { - throw new UnsupportedOperationException(); - } - - @Override - public ByteBuf setShortLE(int index, int value) { - throw new UnsupportedOperationException(); - } - - @Override - public ByteBuf setMedium(int index, int value) { - throw new UnsupportedOperationException(); - } - - @Override - public ByteBuf setMediumLE(int index, int value) { - throw new UnsupportedOperationException(); - } - - @Override - public ByteBuf setInt(int index, int value) { - throw new UnsupportedOperationException(); - } - - @Override - public ByteBuf setIntLE(int index, int value) { - throw new UnsupportedOperationException(); - } - - @Override - public ByteBuf setLong(int index, long value) { - throw new UnsupportedOperationException(); - } - - @Override - public ByteBuf setLongLE(int index, long value) { - throw new UnsupportedOperationException(); - } - - @Override - public ByteBuf setChar(int index, int value) { - throw new UnsupportedOperationException(); - } - - @Override - public ByteBuf setFloat(int index, float value) { - throw new UnsupportedOperationException(); - } - - @Override - public ByteBuf setDouble(int index, double value) { - throw new UnsupportedOperationException(); - } - - @Override - public ByteBuf setBytes(int index, ByteBuf src) { - throw new UnsupportedOperationException(); - } - - @Override - public ByteBuf setBytes(int index, ByteBuf src, int length) { - throw new UnsupportedOperationException(); - } - - @Override - public ByteBuf setBytes(int index, ByteBuf src, int srcIndex, int length) { - throw new UnsupportedOperationException(); - } - - @Override - public ByteBuf setBytes(int index, byte[] src) { - throw new UnsupportedOperationException(); - } - - @Override - public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) { - throw new UnsupportedOperationException(); - } - - @Override - public ByteBuf setBytes(int index, ByteBuffer src) { - throw new UnsupportedOperationException(); - } - - @Override - public int setBytes(int index, InputStream in, int length) { - throw new UnsupportedOperationException(); - } - - @Override - public int setBytes(int index, ScatteringByteChannel in, int length) { - throw new UnsupportedOperationException(); - } - - @Override - public int setBytes(int index, FileChannel in, long position, int length) { - throw new UnsupportedOperationException(); - } - - @Override - public int setCharSequence(int index, CharSequence sequence, Charset charset) { - throw new UnsupportedOperationException(); - } - - @Override - public ByteBuf setZero(int index, int length) { - throw new UnsupportedOperationException(); - } - - @Override - public ByteBuf writeBoolean(boolean value) { - throw new UnsupportedOperationException(); - } - - @Override - public ByteBuf writeByte(int value) { - throw new UnsupportedOperationException(); - } - - @Override - public ByteBuf writeShort(int value) { - throw new UnsupportedOperationException(); - } - - @Override - public ByteBuf writeShortLE(int value) { - throw new UnsupportedOperationException(); - } - - @Override - public ByteBuf writeMedium(int value) { - throw new UnsupportedOperationException(); - } - - @Override - public ByteBuf writeMediumLE(int value) { - throw new UnsupportedOperationException(); - } - - @Override - public ByteBuf writeInt(int value) { - throw new UnsupportedOperationException(); - } - - @Override - public ByteBuf writeIntLE(int value) { - throw new UnsupportedOperationException(); - } - - @Override - public ByteBuf writeLong(long value) { - throw new UnsupportedOperationException(); - } - - @Override - public ByteBuf writeLongLE(long value) { - throw new UnsupportedOperationException(); - } - - @Override - public ByteBuf writeChar(int value) { - throw new UnsupportedOperationException(); - } - - @Override - public ByteBuf writeFloat(float value) { - throw new UnsupportedOperationException(); - } - - @Override - public ByteBuf writeDouble(double value) { - throw new UnsupportedOperationException(); - } - - @Override - public ByteBuf writeBytes(ByteBuf src) { - throw new UnsupportedOperationException(); - } - - @Override - public ByteBuf writeBytes(ByteBuf src, int length) { - throw new UnsupportedOperationException(); - } - - @Override - public ByteBuf writeBytes(ByteBuf src, int srcIndex, int length) { - throw new UnsupportedOperationException(); - } - - @Override - public ByteBuf writeBytes(byte[] src) { - throw new UnsupportedOperationException(); - } - - @Override - public ByteBuf writeBytes(byte[] src, int srcIndex, int length) { - throw new UnsupportedOperationException(); - } - - @Override - public ByteBuf writeBytes(ByteBuffer src) { - throw new UnsupportedOperationException(); - } - - @Override - public int writeBytes(InputStream in, int length) { - throw new UnsupportedOperationException(); - } - - @Override - public int writeBytes(ScatteringByteChannel in, int length) { - throw new UnsupportedOperationException(); - } - - @Override - public int writeBytes(FileChannel in, long position, int length) { - throw new UnsupportedOperationException(); - } - - @Override - public ByteBuf writeZero(int length) { - throw new UnsupportedOperationException(); - } - - @Override - public int writeCharSequence(CharSequence sequence, Charset charset) { - throw new UnsupportedOperationException(); - } - - @Override - public ByteBuffer internalNioBuffer(int index, int length) { - return nioBuffer(index, length); - } - - @Override - public boolean hasArray() { - return false; - } - - @Override - public byte[] array() { - throw new UnsupportedOperationException(); - } - - @Override - public int arrayOffset() { - throw new UnsupportedOperationException(); - } - - @Override - public boolean hasMemoryAddress() { - return false; - } - - @Override - public long memoryAddress() { - throw new UnsupportedOperationException(); - } - - @Override - protected void _setByte(int index, int value) {} - - @Override - protected void _setShort(int index, int value) {} - - @Override - protected void _setShortLE(int index, int value) {} - - @Override - protected void _setMedium(int index, int value) {} - - @Override - protected void _setMediumLE(int index, int value) {} - - @Override - protected void _setInt(int index, int value) {} - - @Override - protected void _setIntLE(int index, int value) {} - - @Override - protected void _setLong(int index, long value) {} - - @Override - protected void _setLongLE(int index, long value) {} -} diff --git a/rsocket-core/src/main/java/io/rsocket/buffer/BufferUtil.java b/rsocket-core/src/main/java/io/rsocket/buffer/BufferUtil.java deleted file mode 100644 index 476583ab3..000000000 --- a/rsocket-core/src/main/java/io/rsocket/buffer/BufferUtil.java +++ /dev/null @@ -1,78 +0,0 @@ -package io.rsocket.buffer; - -import java.lang.reflect.Field; -import java.nio.Buffer; -import java.nio.ByteBuffer; -import java.security.AccessController; -import java.security.PrivilegedExceptionAction; -import sun.misc.Unsafe; - -abstract class BufferUtil { - - private static final Unsafe UNSAFE; - - static { - Unsafe unsafe; - try { - final PrivilegedExceptionAction action = - () -> { - final Field f = Unsafe.class.getDeclaredField("theUnsafe"); - f.setAccessible(true); - - return (Unsafe) f.get(null); - }; - - unsafe = AccessController.doPrivileged(action); - } catch (final Exception ex) { - throw new RuntimeException(ex); - } - - UNSAFE = unsafe; - } - - private static final long BYTE_BUFFER_ADDRESS_FIELD_OFFSET; - - static { - try { - BYTE_BUFFER_ADDRESS_FIELD_OFFSET = - UNSAFE.objectFieldOffset(Buffer.class.getDeclaredField("address")); - } catch (final Exception ex) { - throw new RuntimeException(ex); - } - } - - /** - * Allocate a new direct {@link ByteBuffer} that is aligned on a given alignment boundary. - * - * @param capacity required for the buffer. - * @param alignment boundary at which the buffer should begin. - * @return a new {@link ByteBuffer} with the required alignment. - * @throws IllegalArgumentException if the alignment is not a power of 2. - */ - static ByteBuffer allocateDirectAligned(final int capacity, final int alignment) { - if (alignment == 0) { - return ByteBuffer.allocateDirect(capacity); - } - - if (!isPowerOfTwo(alignment)) { - throw new IllegalArgumentException("Must be a power of 2: alignment=" + alignment); - } - - final ByteBuffer buffer = ByteBuffer.allocateDirect(capacity + alignment); - - final long address = UNSAFE.getLong(buffer, BYTE_BUFFER_ADDRESS_FIELD_OFFSET); - final int remainder = (int) (address & (alignment - 1)); - final int offset = alignment - remainder; - - buffer.limit(capacity + offset); - buffer.position(offset); - - return buffer.slice(); - } - - private static boolean isPowerOfTwo(final int value) { - return value > 0 && ((value & (~value + 1)) == value); - } - - private BufferUtil() {} -} diff --git a/rsocket-core/src/main/java/io/rsocket/buffer/Tuple2ByteBuf.java b/rsocket-core/src/main/java/io/rsocket/buffer/Tuple2ByteBuf.java deleted file mode 100644 index ba6620cb0..000000000 --- a/rsocket-core/src/main/java/io/rsocket/buffer/Tuple2ByteBuf.java +++ /dev/null @@ -1,394 +0,0 @@ -package io.rsocket.buffer; - -import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufAllocator; -import io.netty.buffer.Unpooled; -import io.netty.util.ReferenceCountUtil; -import java.io.IOException; -import java.io.OutputStream; -import java.nio.ByteBuffer; -import java.nio.channels.FileChannel; -import java.nio.channels.GatheringByteChannel; -import java.nio.charset.Charset; - -class Tuple2ByteBuf extends AbstractTupleByteBuf { - - private static final long ONE_MASK = 0x100000000L; - private static final long TWO_MASK = 0x200000000L; - private static final long MASK = 0x700000000L; - - private final ByteBuf one; - private final ByteBuf two; - private final int oneReadIndex; - private final int twoReadIndex; - private final int oneReadableBytes; - private final int twoReadableBytes; - private final int twoRelativeIndex; - - private boolean freed; - - Tuple2ByteBuf(ByteBufAllocator allocator, ByteBuf one, ByteBuf two) { - super(allocator, one.readableBytes() + two.readableBytes()); - - this.one = one; - this.two = two; - - this.oneReadIndex = one.readerIndex(); - this.twoReadIndex = two.readerIndex(); - - this.oneReadableBytes = one.readableBytes(); - this.twoReadableBytes = two.readableBytes(); - - this.twoRelativeIndex = oneReadableBytes; - - this.freed = false; - } - - @Override - long calculateRelativeIndex(int index) { - checkIndex(index, 0); - - long relativeIndex; - long mask; - if (index >= twoRelativeIndex) { - relativeIndex = twoReadIndex + (index - oneReadableBytes); - mask = TWO_MASK; - } else { - relativeIndex = oneReadIndex + index; - mask = ONE_MASK; - } - - return relativeIndex | mask; - } - - @Override - ByteBuf getPart(int index) { - long ri = calculateRelativeIndex(index); - switch ((int) ((ri & MASK) >>> 32L)) { - case 0x1: - return one; - case 0x2: - return two; - default: - throw new IllegalStateException(); - } - } - - @Override - public boolean isDirect() { - return one.isDirect() && two.isDirect(); - } - - @Override - public int nioBufferCount() { - return one.nioBufferCount() + two.nioBufferCount(); - } - - @Override - public ByteBuffer nioBuffer() { - ByteBuffer[] oneBuffers = one.nioBuffers(); - ByteBuffer[] twoBuffers = two.nioBuffers(); - - ByteBuffer merged = - BufferUtil.allocateDirectAligned(capacity, DEFAULT_DIRECT_MEMORY_CACHE_ALIGNMENT) - .order(order()); - - for (ByteBuffer b : oneBuffers) { - merged.put(b); - } - - for (ByteBuffer b : twoBuffers) { - merged.put(b); - } - - merged.flip(); - return merged; - } - - @Override - public ByteBuffer[] _nioBuffers(int index, int length) { - long ri = calculateRelativeIndex(index); - index = (int) (ri & Integer.MAX_VALUE); - switch ((int) ((ri & MASK) >>> 32L)) { - case 0x1: - ByteBuffer[] oneBuffer; - ByteBuffer[] twoBuffer; - int l = Math.min(oneReadableBytes - index, length); - oneBuffer = one.nioBuffers(index, l); - length -= l; - if (length != 0) { - twoBuffer = two.nioBuffers(twoReadIndex, length); - ByteBuffer[] results = new ByteBuffer[oneBuffer.length + twoBuffer.length]; - System.arraycopy(oneBuffer, 0, results, 0, oneBuffer.length); - System.arraycopy(twoBuffer, 0, results, oneBuffer.length, twoBuffer.length); - return results; - } else { - return oneBuffer; - } - case 0x2: - return two.nioBuffers(index, length); - default: - throw new IllegalStateException(); - } - } - - @Override - public ByteBuf getBytes(int index, ByteBuf dst, int dstIndex, int length) { - checkDstIndex(index, length, dstIndex, dst.capacity()); - if (length == 0) { - return this; - } - - // FIXME: check twice here - long ri = calculateRelativeIndex(index); - index = (int) (ri & Integer.MAX_VALUE); - switch ((int) ((ri & MASK) >>> 32L)) { - case 0x1: - { - int l = Math.min(oneReadableBytes - index, length); - one.getBytes(index, dst, dstIndex, l); - length -= l; - dstIndex += l; - - if (length != 0) { - two.getBytes(twoReadIndex, dst, dstIndex, length); - } - - break; - } - case 0x2: - { - two.getBytes(index, dst, dstIndex, length); - break; - } - default: - throw new IllegalStateException(); - } - - return this; - } - - @Override - public ByteBuf getBytes(int index, byte[] dst, int dstIndex, int length) { - ByteBuf dstBuf = Unpooled.wrappedBuffer(dst); - return getBytes(index, dstBuf, dstIndex, length); - } - - @Override - public ByteBuf getBytes(int index, ByteBuffer dst) { - ByteBuf dstBuf = Unpooled.wrappedBuffer(dst); - return getBytes(index, dstBuf); - } - - @Override - public ByteBuf getBytes(int index, final OutputStream out, int length) throws IOException { - checkIndex(index, length); - if (length == 0) { - return this; - } - - long ri = calculateRelativeIndex(index); - index = (int) (ri & Integer.MAX_VALUE); - switch ((int) ((ri & MASK) >>> 32L)) { - case 0x1: - { - int l = Math.min(oneReadableBytes - index, length); - one.getBytes(index, out, l); - length -= l; - if (length != 0) { - two.getBytes(twoReadIndex, out, length); - } - break; - } - case 0x2: - { - two.getBytes(index, out, length); - break; - } - default: - throw new IllegalStateException(); - } - - return this; - } - - @Override - public int getBytes(int index, GatheringByteChannel out, int length) throws IOException { - checkIndex(index, length); - int read = 0; - long ri = calculateRelativeIndex(index); - index = (int) (ri & Integer.MAX_VALUE); - switch ((int) ((ri & MASK) >>> 32L)) { - case 0x1: - { - int l = Math.min(oneReadableBytes - index, length); - read += one.getBytes(index, out, l); - length -= l; - if (length != 0) { - read += two.getBytes(twoReadIndex, out, length); - } - break; - } - case 0x2: - { - read += two.getBytes(index, out, length); - break; - } - default: - throw new IllegalStateException(); - } - - return read; - } - - @Override - public int getBytes(int index, FileChannel out, long position, int length) throws IOException { - checkIndex(index, length); - int read = 0; - long ri = calculateRelativeIndex(index); - index = (int) (ri & Integer.MAX_VALUE); - switch ((int) ((ri & MASK) >>> 32L)) { - case 0x1: - { - int l = Math.min(oneReadableBytes - index, length); - read += one.getBytes(index, out, position, l); - length -= l; - position += l; - if (length != 0) { - read += two.getBytes(twoReadIndex, out, position, length); - } - break; - } - case 0x2: - { - read += two.getBytes(index, out, position, length); - break; - } - default: - throw new IllegalStateException(); - } - - return read; - } - - @Override - public ByteBuf copy(int index, int length) { - checkIndex(index, length); - - ByteBuf buffer = allocator.buffer(length); - - if (index == 0 && length == capacity) { - buffer.writeBytes(one, oneReadIndex, oneReadableBytes); - buffer.writeBytes(two, twoReadIndex, twoReadableBytes); - - return buffer; - } - - long ri = calculateRelativeIndex(index); - index = (int) (ri & Integer.MAX_VALUE); - - switch ((int) ((ri & MASK) >>> 32L)) { - case 0x1: - { - int l = Math.min(oneReadableBytes - index, length); - buffer.writeBytes(one, index, l); - - length -= l; - - if (length != 0) { - buffer.writeBytes(two, twoReadIndex, length); - } - - return buffer; - } - case 0x2: - { - return buffer.writeBytes(two, index, length); - } - default: - throw new IllegalStateException(); - } - } - - @Override - public ByteBuf slice(final int readIndex, int length) { - checkIndex(readIndex, length); - - if (readIndex == 0 && length == capacity) { - return new Tuple2ByteBuf( - allocator, - one.slice(oneReadIndex, oneReadableBytes), - two.slice(twoReadIndex, twoReadableBytes)); - } - - long ri = calculateRelativeIndex(readIndex); - int index = (int) (ri & Integer.MAX_VALUE); - - switch ((int) ((ri & MASK) >>> 32L)) { - case 0x1: - { - ByteBuf oneSlice; - ByteBuf twoSlice; - - int l = Math.min(oneReadableBytes - index, length); - oneSlice = one.slice(index, l); - length -= l; - if (length != 0) { - twoSlice = two.slice(twoReadIndex, length); - return new Tuple2ByteBuf(allocator, oneSlice, twoSlice); - } else { - return oneSlice; - } - } - case 0x2: - { - return two.slice(index, length); - } - default: - throw new IllegalStateException(); - } - } - - @Override - protected void deallocate() { - if (freed) { - return; - } - - freed = true; - ReferenceCountUtil.safeRelease(one); - ReferenceCountUtil.safeRelease(two); - } - - @Override - public String toString(Charset charset) { - StringBuilder builder = new StringBuilder(capacity); - builder.append(one.toString(charset)); - builder.append(two.toString(charset)); - return builder.toString(); - } - - @Override - public String toString() { - return "Tuple2ByteBuf{" - + "capacity=" - + capacity - + ", one=" - + one - + ", two=" - + two - + ", allocator=" - + allocator - + ", oneReadIndex=" - + oneReadIndex - + ", twoReadIndex=" - + twoReadIndex - + ", oneReadableBytes=" - + oneReadableBytes - + ", twoReadableBytes=" - + twoReadableBytes - + ", twoRelativeIndex=" - + twoRelativeIndex - + '}'; - } -} diff --git a/rsocket-core/src/main/java/io/rsocket/buffer/Tuple3ByteBuf.java b/rsocket-core/src/main/java/io/rsocket/buffer/Tuple3ByteBuf.java deleted file mode 100644 index be593019f..000000000 --- a/rsocket-core/src/main/java/io/rsocket/buffer/Tuple3ByteBuf.java +++ /dev/null @@ -1,571 +0,0 @@ -package io.rsocket.buffer; - -import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufAllocator; -import io.netty.buffer.Unpooled; -import io.netty.util.ReferenceCountUtil; -import java.io.IOException; -import java.io.OutputStream; -import java.nio.ByteBuffer; -import java.nio.channels.FileChannel; -import java.nio.channels.GatheringByteChannel; -import java.nio.charset.Charset; - -class Tuple3ByteBuf extends AbstractTupleByteBuf { - private static final long ONE_MASK = 0x100000000L; - private static final long TWO_MASK = 0x200000000L; - private static final long THREE_MASK = 0x400000000L; - private static final long MASK = 0x700000000L; - - private final ByteBuf one; - private final ByteBuf two; - private final ByteBuf three; - private final int oneReadIndex; - private final int twoReadIndex; - private final int threeReadIndex; - private final int oneReadableBytes; - private final int twoReadableBytes; - private final int threeReadableBytes; - private final int twoRelativeIndex; - private final int threeRelativeIndex; - - private boolean freed; - - Tuple3ByteBuf(ByteBufAllocator allocator, ByteBuf one, ByteBuf two, ByteBuf three) { - super(allocator, one.readableBytes() + two.readableBytes() + three.readableBytes()); - - this.one = one; - this.two = two; - this.three = three; - - this.oneReadIndex = one.readerIndex(); - this.twoReadIndex = two.readerIndex(); - this.threeReadIndex = three.readerIndex(); - - this.oneReadableBytes = one.readableBytes(); - this.twoReadableBytes = two.readableBytes(); - this.threeReadableBytes = three.readableBytes(); - - this.twoRelativeIndex = oneReadableBytes; - this.threeRelativeIndex = twoRelativeIndex + twoReadableBytes; - - this.freed = false; - } - - @Override - public boolean isDirect() { - return one.isDirect() && two.isDirect() && three.isDirect(); - } - - @Override - public long calculateRelativeIndex(int index) { - checkIndex(index, 0); - long relativeIndex; - long mask; - if (index >= threeRelativeIndex) { - relativeIndex = threeReadIndex + (index - twoReadableBytes - oneReadableBytes); - mask = THREE_MASK; - } else if (index >= twoRelativeIndex) { - relativeIndex = twoReadIndex + (index - oneReadableBytes); - mask = TWO_MASK; - } else { - relativeIndex = oneReadIndex + index; - mask = ONE_MASK; - } - - return relativeIndex | mask; - } - - @Override - public ByteBuf getPart(int index) { - long ri = calculateRelativeIndex(index); - switch ((int) ((ri & MASK) >>> 32L)) { - case 0x1: - return one; - case 0x2: - return two; - case 0x4: - return three; - default: - throw new IllegalStateException(); - } - } - - @Override - public int nioBufferCount() { - return one.nioBufferCount() + two.nioBufferCount() + three.nioBufferCount(); - } - - @Override - public ByteBuffer nioBuffer() { - - ByteBuffer[] oneBuffers = one.nioBuffers(); - ByteBuffer[] twoBuffers = two.nioBuffers(); - ByteBuffer[] threeBuffers = three.nioBuffers(); - - ByteBuffer merged = - BufferUtil.allocateDirectAligned(capacity, DEFAULT_DIRECT_MEMORY_CACHE_ALIGNMENT) - .order(order()); - - for (ByteBuffer b : oneBuffers) { - merged.put(b); - } - - for (ByteBuffer b : twoBuffers) { - merged.put(b); - } - - for (ByteBuffer b : threeBuffers) { - merged.put(b); - } - - merged.flip(); - return merged; - } - - @Override - public ByteBuffer[] _nioBuffers(int index, int length) { - long ri = calculateRelativeIndex(index); - index = (int) (ri & Integer.MAX_VALUE); - switch ((int) ((ri & MASK) >>> 32L)) { - case 0x1: - { - ByteBuffer[] oneBuffer; - ByteBuffer[] twoBuffer; - ByteBuffer[] threeBuffer; - int l = Math.min(oneReadableBytes - index, length); - oneBuffer = one.nioBuffers(index, l); - length -= l; - if (length != 0) { - l = Math.min(twoReadableBytes, length); - twoBuffer = two.nioBuffers(twoReadIndex, l); - length -= l; - if (length != 0) { - threeBuffer = three.nioBuffers(threeReadIndex, length); - ByteBuffer[] results = - new ByteBuffer[oneBuffer.length + twoBuffer.length + threeBuffer.length]; - System.arraycopy(oneBuffer, 0, results, 0, oneBuffer.length); - System.arraycopy(twoBuffer, 0, results, oneBuffer.length, twoBuffer.length); - System.arraycopy( - threeBuffer, 0, results, oneBuffer.length + twoBuffer.length, threeBuffer.length); - return results; - } else { - ByteBuffer[] results = new ByteBuffer[oneBuffer.length + twoBuffer.length]; - System.arraycopy(oneBuffer, 0, results, 0, oneBuffer.length); - System.arraycopy(twoBuffer, 0, results, oneBuffer.length, twoBuffer.length); - return results; - } - } else { - return oneBuffer; - } - } - case 0x2: - { - ByteBuffer[] twoBuffer; - ByteBuffer[] threeBuffer; - int l = Math.min(twoReadableBytes - index, length); - twoBuffer = two.nioBuffers(index, l); - length -= l; - if (length != 0) { - threeBuffer = three.nioBuffers(threeReadIndex, length); - ByteBuffer[] results = new ByteBuffer[twoBuffer.length + threeBuffer.length]; - System.arraycopy(twoBuffer, 0, results, 0, twoBuffer.length); - System.arraycopy(threeBuffer, 0, results, twoBuffer.length, threeBuffer.length); - return results; - } else { - return twoBuffer; - } - } - case 0x4: - return three.nioBuffers(index, length); - default: - throw new IllegalStateException(); - } - } - - @Override - public ByteBuf getBytes(int index, ByteBuf dst, int dstIndex, int length) { - checkDstIndex(index, length, dstIndex, dst.capacity()); - long ri = calculateRelativeIndex(index); - index = (int) (ri & Integer.MAX_VALUE); - switch ((int) ((ri & MASK) >>> 32L)) { - case 0x1: - { - int l = Math.min(oneReadableBytes - index, length); - one.getBytes(index, dst, dstIndex, l); - length -= l; - dstIndex += l; - - if (length != 0) { - l = Math.min(twoReadableBytes, length); - two.getBytes(twoReadIndex, dst, dstIndex, l); - length -= l; - dstIndex += l; - - if (length != 0) { - three.getBytes(threeReadIndex, dst, dstIndex, length); - } - } - break; - } - case 0x2: - { - int l = Math.min(twoReadableBytes - index, length); - two.getBytes(index, dst, dstIndex, l); - length -= l; - dstIndex += l; - - if (length != 0) { - three.getBytes(threeReadIndex, dst, dstIndex, length); - } - break; - } - case 0x4: - { - three.getBytes(index, dst, dstIndex, length); - break; - } - default: - throw new IllegalStateException(); - } - - return this; - } - - @Override - public ByteBuf getBytes(int index, byte[] dst, int dstIndex, int length) { - ByteBuf dstBuf = Unpooled.wrappedBuffer(dst); - return getBytes(index, dstBuf, dstIndex, length); - } - - @Override - public ByteBuf getBytes(int index, ByteBuffer dst) { - ByteBuf dstBuf = Unpooled.wrappedBuffer(dst); - return getBytes(index, dstBuf); - } - - @Override - public ByteBuf getBytes(int index, final OutputStream out, int length) throws IOException { - checkIndex(index, length); - long ri = calculateRelativeIndex(index); - index = (int) (ri & Integer.MAX_VALUE); - switch ((int) ((ri & MASK) >>> 32L)) { - case 0x1: - { - int l = Math.min(oneReadableBytes - index, length); - one.getBytes(index, out, l); - length -= l; - if (length != 0) { - l = Math.min(twoReadableBytes, length); - two.getBytes(twoReadIndex, out, l); - length -= l; - if (length != 0) { - three.getBytes(threeReadIndex, out, length); - } - } - break; - } - case 0x2: - { - int l = Math.min(twoReadableBytes - index, length); - two.getBytes(index, out, l); - length -= l; - - if (length != 0) { - three.getBytes(threeReadIndex, out, length); - } - break; - } - case 0x4: - { - three.getBytes(index, out, length); - - break; - } - default: - throw new IllegalStateException(); - } - - return this; - } - - @Override - public int getBytes(int index, GatheringByteChannel out, int length) throws IOException { - checkIndex(index, length); - int read = 0; - long ri = calculateRelativeIndex(index); - index = (int) (ri & Integer.MAX_VALUE); - switch ((int) ((ri & MASK) >>> 32L)) { - case 0x1: - { - int l = Math.min(oneReadableBytes - index, length); - read += one.getBytes(index, out, l); - length -= l; - if (length != 0) { - l = Math.min(twoReadableBytes, length); - read += two.getBytes(twoReadIndex, out, l); - length -= l; - if (length != 0) { - read += three.getBytes(threeReadIndex, out, length); - } - } - break; - } - case 0x2: - { - int l = Math.min(twoReadableBytes - index, length); - read += two.getBytes(index, out, l); - length -= l; - - if (length != 0) { - read += three.getBytes(threeReadIndex, out, length); - } - break; - } - case 0x4: - { - read += three.getBytes(index, out, length); - - break; - } - default: - throw new IllegalStateException(); - } - - return read; - } - - @Override - public int getBytes(int index, FileChannel out, long position, int length) throws IOException { - checkIndex(index, length); - int read = 0; - long ri = calculateRelativeIndex(index); - index = (int) (ri & Integer.MAX_VALUE); - switch ((int) ((ri & MASK) >>> 32L)) { - case 0x1: - { - int l = Math.min(oneReadableBytes - index, length); - read += one.getBytes(index, out, position, l); - length -= l; - position += l; - - if (length != 0) { - l = Math.min(twoReadableBytes, length); - read += two.getBytes(twoReadIndex, out, position, l); - length -= l; - position += l; - - if (length != 0) { - read += three.getBytes(threeReadIndex, out, position, length); - } - } - break; - } - case 0x2: - { - int l = Math.min(twoReadableBytes - index, length); - read += two.getBytes(index, out, position, l); - length -= l; - position += l; - - if (length != 0) { - read += three.getBytes(threeReadIndex, out, position, length); - } - break; - } - case 0x4: - { - read += three.getBytes(index, out, position, length); - - break; - } - default: - throw new IllegalStateException(); - } - - return read; - } - - @Override - public ByteBuf copy(int index, int length) { - checkIndex(index, length); - - ByteBuf buffer = allocator.buffer(length); - - if (index == 0 && length == capacity) { - buffer.writeBytes(one, oneReadIndex, oneReadableBytes); - buffer.writeBytes(two, twoReadIndex, twoReadableBytes); - buffer.writeBytes(three, threeReadIndex, threeReadableBytes); - - return buffer; - } - - long ri = calculateRelativeIndex(index); - index = (int) (ri & Integer.MAX_VALUE); - - switch ((int) ((ri & MASK) >>> 32L)) { - case 0x1: - { - int l = Math.min(oneReadableBytes - index, length); - buffer.writeBytes(one, index, l); - length -= l; - - if (length != 0) { - l = Math.min(twoReadableBytes, length); - buffer.writeBytes(two, twoReadIndex, l); - length -= l; - if (length != 0) { - buffer.writeBytes(three, threeReadIndex, length); - } - } - - return buffer; - } - case 0x2: - { - int l = Math.min(twoReadableBytes - index, length); - buffer.writeBytes(two, index, l); - length -= l; - - if (length != 0) { - buffer.writeBytes(three, threeReadIndex, length); - } - - return buffer; - } - case 0x4: - { - buffer.writeBytes(three, index, length); - - return buffer; - } - default: - throw new IllegalStateException(); - } - } - - @Override - public ByteBuf retainedSlice() { - return new Tuple3ByteBuf( - allocator, - one.retainedSlice(oneReadIndex, oneReadableBytes), - two.retainedSlice(twoReadIndex, twoReadableBytes), - three.retainedSlice(threeReadIndex, threeReadableBytes)); - } - - @Override - public ByteBuf slice(final int readIndex, int length) { - checkIndex(readIndex, length); - - if (readIndex == 0 && length == capacity) { - return new Tuple3ByteBuf( - allocator, - one.slice(oneReadIndex, oneReadableBytes), - two.slice(twoReadIndex, twoReadableBytes), - three.slice(threeReadIndex, threeReadableBytes)); - } - - long ri = calculateRelativeIndex(readIndex); - int index = (int) (ri & Integer.MAX_VALUE); - switch ((int) ((ri & MASK) >>> 32L)) { - case 0x1: - { - ByteBuf oneSlice; - ByteBuf twoSlice; - ByteBuf threeSlice; - - int l = Math.min(oneReadableBytes - index, length); - oneSlice = one.slice(index, l); - length -= l; - if (length != 0) { - l = Math.min(twoReadableBytes, length); - twoSlice = two.slice(twoReadIndex, l); - length -= l; - if (length != 0) { - threeSlice = three.slice(threeReadIndex, length); - return new Tuple3ByteBuf(allocator, oneSlice, twoSlice, threeSlice); - } else { - return new Tuple2ByteBuf(allocator, oneSlice, twoSlice); - } - - } else { - return oneSlice; - } - } - case 0x2: - { - ByteBuf twoSlice; - ByteBuf threeSlice; - - int l = Math.min(twoReadableBytes - index, length); - twoSlice = two.slice(index, l); - length -= l; - if (length != 0) { - threeSlice = three.slice(threeReadIndex, length); - return new Tuple2ByteBuf(allocator, twoSlice, threeSlice); - } else { - return twoSlice; - } - } - case 0x4: - { - return three.slice(index, length); - } - default: - throw new IllegalStateException(); - } - } - - @Override - protected void deallocate() { - if (freed) { - return; - } - - freed = true; - ReferenceCountUtil.safeRelease(one); - ReferenceCountUtil.safeRelease(two); - ReferenceCountUtil.safeRelease(three); - } - - @Override - public String toString(Charset charset) { - StringBuilder builder = new StringBuilder(3); - builder.append(one.toString(charset)); - builder.append(two.toString(charset)); - builder.append(three.toString(charset)); - return builder.toString(); - } - - @Override - public String toString() { - return "Tuple3ByteBuf{" - + "capacity=" - + capacity - + ", one=" - + one - + ", two=" - + two - + ", three=" - + three - + ", allocator=" - + allocator - + ", oneReadIndex=" - + oneReadIndex - + ", twoReadIndex=" - + twoReadIndex - + ", threeReadIndex=" - + threeReadIndex - + ", oneReadableBytes=" - + oneReadableBytes - + ", twoReadableBytes=" - + twoReadableBytes - + ", threeReadableBytes=" - + threeReadableBytes - + ", twoRelativeIndex=" - + twoRelativeIndex - + ", threeRelativeIndex=" - + threeRelativeIndex - + '}'; - } -} diff --git a/rsocket-core/src/main/java/io/rsocket/buffer/TupleByteBuf.java b/rsocket-core/src/main/java/io/rsocket/buffer/TupleByteBuf.java deleted file mode 100644 index 8c8e2e7e4..000000000 --- a/rsocket-core/src/main/java/io/rsocket/buffer/TupleByteBuf.java +++ /dev/null @@ -1,35 +0,0 @@ -package io.rsocket.buffer; - -import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufAllocator; -import java.util.Objects; - -public abstract class TupleByteBuf { - - private TupleByteBuf() {} - - public static ByteBuf of(ByteBuf one, ByteBuf two) { - return of(ByteBufAllocator.DEFAULT, one, two); - } - - public static ByteBuf of(ByteBufAllocator allocator, ByteBuf one, ByteBuf two) { - Objects.requireNonNull(allocator); - Objects.requireNonNull(one); - Objects.requireNonNull(two); - - return new Tuple2ByteBuf(allocator, one, two); - } - - public static ByteBuf of(ByteBuf one, ByteBuf two, ByteBuf three) { - return of(ByteBufAllocator.DEFAULT, one, two, three); - } - - public static ByteBuf of(ByteBufAllocator allocator, ByteBuf one, ByteBuf two, ByteBuf three) { - Objects.requireNonNull(allocator); - Objects.requireNonNull(one); - Objects.requireNonNull(two); - Objects.requireNonNull(three); - - return new Tuple3ByteBuf(allocator, one, two, three); - } -} diff --git a/rsocket-core/src/test/java/io/rsocket/buffer/Tuple3ByteBufTest.java b/rsocket-core/src/test/java/io/rsocket/buffer/Tuple3ByteBufTest.java deleted file mode 100644 index 4515fb29b..000000000 --- a/rsocket-core/src/test/java/io/rsocket/buffer/Tuple3ByteBufTest.java +++ /dev/null @@ -1,98 +0,0 @@ -package io.rsocket.buffer; - -import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufAllocator; -import io.netty.buffer.ByteBufUtil; -import io.netty.buffer.Unpooled; -import java.nio.ByteBuffer; -import java.nio.charset.Charset; -import java.util.concurrent.ThreadLocalRandom; -import org.junit.Assert; -import org.junit.jupiter.api.Test; - -class Tuple3ByteBufTest { - @Test - void testTupleBufferGet() { - ByteBufAllocator allocator = ByteBufAllocator.DEFAULT; - ByteBuf one = allocator.directBuffer(9); - - byte[] bytes = new byte[9]; - ThreadLocalRandom.current().nextBytes(bytes); - one.writeBytes(bytes); - - bytes = new byte[8]; - ThreadLocalRandom.current().nextBytes(bytes); - ByteBuf two = Unpooled.wrappedBuffer(bytes); - - bytes = new byte[9]; - ThreadLocalRandom.current().nextBytes(bytes); - ByteBuf three = Unpooled.wrappedBuffer(bytes); - - ByteBuf tuple = TupleByteBuf.of(one, two, three); - - int anInt = tuple.getInt(16); - - long aLong = tuple.getLong(15); - - short aShort = tuple.getShort(8); - - int medium = tuple.getMedium(8); - } - - @Test - void testTuple3BufferSlicing() { - ByteBufAllocator allocator = ByteBufAllocator.DEFAULT; - ByteBuf one = allocator.directBuffer(); - ByteBufUtil.writeUtf8(one, "foo"); - - ByteBuf two = allocator.directBuffer(); - ByteBufUtil.writeUtf8(two, "bar"); - - ByteBuf three = allocator.directBuffer(); - ByteBufUtil.writeUtf8(three, "bar"); - - ByteBuf buf = TupleByteBuf.of(one, two, three); - - String s = buf.slice(0, 6).toString(Charset.defaultCharset()); - Assert.assertEquals("foobar", s); - - String s1 = buf.slice(3, 6).toString(Charset.defaultCharset()); - Assert.assertEquals("barbar", s1); - - String s2 = buf.slice(4, 4).toString(Charset.defaultCharset()); - Assert.assertEquals("arba", s2); - } - - @Test - void testTuple3ToNioBuffers() throws Exception { - ByteBufAllocator allocator = ByteBufAllocator.DEFAULT; - ByteBuf one = allocator.directBuffer(); - ByteBufUtil.writeUtf8(one, "one"); - - ByteBuf two = allocator.directBuffer(); - ByteBufUtil.writeUtf8(two, "two"); - - ByteBuf three = allocator.directBuffer(); - ByteBufUtil.writeUtf8(three, "three"); - - ByteBuf buf = TupleByteBuf.of(one, two, three); - ByteBuffer[] byteBuffers = buf.nioBuffers(); - - Assert.assertEquals(3, byteBuffers.length); - - ByteBuffer bb = byteBuffers[0]; - byte[] dst = new byte[bb.remaining()]; - bb.get(dst); - Assert.assertEquals("one", new String(dst, "UTF-8")); - - bb = byteBuffers[1]; - dst = new byte[bb.remaining()]; - bb.get(dst); - Assert.assertEquals("two", new String(dst, "UTF-8")); - - bb = byteBuffers[2]; - dst = new byte[bb.remaining()]; - bb.get(dst); - Assert.assertEquals("three", new String(dst, "UTF-8")); - } -}