
Flink Internals | Data Abstractions and the Data Exchange Process in Flink




By 大数据技术与架构


Scenario: As a high-performance streaming framework, Flink inevitably has to manage its own memory in order to work around inherent JVM drawbacks (low storage density of Java objects, full GC pauses hurting throughput and latency, etc.).


Keywords: data abstraction, memory management

Flink's Data Abstractions

MemorySegment

As a high-performance streaming framework, Flink inevitably has to manage its own memory in order to work around inherent JVM drawbacks (low storage density of Java objects, full GC pauses hurting throughput and latency, etc.).

MemorySegment is Flink's memory abstraction. By default, a MemorySegment represents a 32 KB block of memory. That block can either be a byte[] on the JVM heap or off-heap memory (a DirectByteBuffer).

If byte[] arrays and direct memory are the lowest-level storage, then MemorySegment is the unified abstraction layered on top of them. It defines a series of abstract methods for interacting with the underlying memory, for example:

public abstract class MemorySegment {

    public abstract byte get(int index);

    public abstract void put(int index, byte b);

    public int size() {
        return size;
    }

    public abstract ByteBuffer wrap(int offset, int length);

    ......
}

As you can see, besides the many methods for manipulating memory directly, it also provides a wrap() method that packages the segment as a ByteBuffer; we will come back to that ByteBuffer shortly.

Flink provides two implementations of MemorySegment: HeapMemorySegment and HybridMemorySegment. The difference is that the former can only be backed by heap memory, while the latter can be backed by either heap or off-heap memory. In practice, the Flink runtime only uses the latter. Why is that?

At first glance it would seem more natural for HybridMemorySegment to handle only off-heap memory. But in the JVM, every call to a virtual method requires the JVM to figure out which subclass implementation it should invoke (method overriding; see the invokevirtual instruction), which means a vtable lookup on each call. If, however, the method is virtual but only one implementation is actually loaded (i.e. only a single subclass exists at runtime), the JVM can de-virtualize the call and skip the vtable lookup entirely, which is a significant performance win. Since a segment type that could allocate only heap or only off-heap memory would not cover all of Flink's needs on its own, HybridMemorySegment was designed to handle both kinds of memory, so that it can remain the sole loaded implementation.

Let's look at the constructors of HybridMemorySegment:

HybridMemorySegment(ByteBuffer buffer, Object owner) {
    super(checkBufferAndGetAddress(buffer), buffer.capacity(), owner);
    this.offHeapBuffer = buffer;
}

HybridMemorySegment(byte[] buffer, Object owner) {
    super(buffer, owner);
    this.offHeapBuffer = null;
}

In the first constructor, checkBufferAndGetAddress() obtains the memory address of the direct buffer, which is what allows the segment to operate on off-heap memory.
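To see the two backings behind the same API, here is a minimal sketch (not taken from the article; it only uses MemorySegmentFactory methods that appear elsewhere in this post, and assumes a Flink version in which they are available):

import java.nio.ByteBuffer;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;

public class SegmentDemo {
    public static void main(String[] args) {
        // heap-backed segment: wraps an ordinary byte[]
        MemorySegment heap = MemorySegmentFactory.wrap(new byte[32 * 1024]);

        // off-heap segment: wraps a DirectByteBuffer (the same factory call the
        // NetworkBufferPool constructor shown below uses)
        ByteBuffer direct = ByteBuffer.allocateDirect(32 * 1024);
        MemorySegment offHeap = MemorySegmentFactory.wrapPooledOffHeapMemory(direct, null);

        // identical API regardless of where the bytes actually live
        heap.put(0, (byte) 42);
        offHeap.put(0, heap.get(0));
        System.out.println(heap.size() + " / " + offHeap.size()); // 32768 / 32768
    }
}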

ByteBuffer and NetworkBufferPool
On top of the MemorySegment abstraction, the memory object Flink uses while data moves from an operator's in-memory objects onto the TaskManager, ready to be sent to the next node, is the Buffer. Note that this Buffer is a Flink interface, not the java.nio Buffer abstract class. Flink uses both of these identically named concepts at this layer to hold data, and the various xxxBuffer classes are easy to confuse when reading the code:
  • The java.nio Buffer abstract class is mainly used at this layer to build HeapByteBuffers, which come into play when an object inside the JVM is serialized into a byte array;

  • Flink's Buffer interface is a Flink-level unified abstraction for transporting data and events; its implementation class is NetworkBuffer, which wraps a MemorySegment. This is the abstraction Flink uses when shipping data between TaskManagers.
    Because a Buffer is backed by a MemorySegment that may not be managed by the JVM, Flink needs to know when a Buffer is no longer in use so that its memory can be reclaimed. For this it uses reference counting: once nothing references the buffer any more, its MemorySegment can be recycled and used elsewhere (why doesn't the JVM's garbage collector use reference counting? Left as an exercise for the reader):

public abstract class AbstractReferenceCountedByteBuf extends AbstractByteBuf {

    private volatile int refCnt = 1;

    ......
}
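To make the reference-counting idea concrete, here is a small, self-contained illustration (not Flink source code) of how a count that starts at 1 governs when the underlying MemorySegment may be recycled:

import java.util.concurrent.atomic.AtomicInteger;

class CountedBuffer {
    private final AtomicInteger refCnt = new AtomicInteger(1); // starts at 1, like refCnt above

    CountedBuffer retain() {               // another reader keeps the buffer alive
        refCnt.incrementAndGet();
        return this;
    }

    void release() {                       // every user releases the buffer once it is done
        if (refCnt.decrementAndGet() == 0) {
            recycleMemorySegment();        // only now can the MemorySegment go back to its pool
        }
    }

    private void recycleMemorySegment() {
        // hand the underlying MemorySegment back to the buffer pool
    }
}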
To manage NetworkBuffers, Flink provides the BufferPoolFactory interface, whose single implementation is NetworkBufferPool, an application of the factory pattern. There is exactly one NetworkBufferPool per TaskManager, and it is responsible for the memory of all subtasks running on it. When it is instantiated it immediately tries to grab all of the memory it will manage (for heap memory, allocating everything up front so that it lands in the old generation, while user objects live and die in the young generation, greatly reduces full GCs). Here is its constructor:

public NetworkBufferPool(int numberOfSegmentsToAllocate, int segmentSize) {

    ......

    try {
        this.availableMemorySegments = new ArrayBlockingQueue<>(numberOfSegmentsToAllocate);
    }
    catch (OutOfMemoryError err) {
        throw new OutOfMemoryError("Could not allocate buffer queue of length "
            + numberOfSegmentsToAllocate + " - " + err.getMessage());
    }

    try {
        for (int i = 0; i < numberOfSegmentsToAllocate; i++) {
            ByteBuffer memory = ByteBuffer.allocateDirect(segmentSize);
            availableMemorySegments.add(MemorySegmentFactory.wrapPooledOffHeapMemory(memory, null));
        }
    }

    ......

    long sizeInLong = (long) segmentSize;
    long allocatedMb = (sizeInLong * availableMemorySegments.size()) >> 20;

    LOG.info("Allocated {} MB for network buffer pool (number of memory segments: {}, bytes per segment: {}).",
        allocatedMb, availableMemorySegments.size(), segmentSize);
}
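As a worked example (numbers hypothetical): with 2048 segments of 32 KiB each, the >> 20 shift converts bytes to MiB, so the log line would report (32768 * 2048) >> 20 = 64 MB.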
NetworkBufferPool is only a factory; the actual memory pools are LocalBufferPools. Each TaskManager has exactly one NetworkBufferPool factory, but every task running on it gets its own LocalBufferPool, isolated from the other tasks, which makes sense logically. In addition, NetworkBufferPool keeps track of the total number of memory segments it owns, and whenever a new pool is created it redistributes the number of segments each pool should hold. The steps are:
  • First, take from all the segments managed by the factory the sum of the minimum number of buffers that every pool requires.
    If that uses everything up, we are done.

  • Next, hand out all remaining, unassigned segments to the LocalBufferPools in proportion to each pool's remaining desired capacity.

  • The remaining desired capacity works like this: if a pool needs at least 3 buffers and can use at most 10, its remaining desired capacity is 7.
    The implementation looks like this:

private void redistributeBuffers() throws IOException {
    assert Thread.holdsLock(factoryLock);

    // All buffers, which are not among the required ones
    final int numAvailableMemorySegment = totalNumberOfMemorySegments - numTotalRequiredBuffers;

    if (numAvailableMemorySegment == 0) {
        // in this case, we need to redistribute buffers so that every pool gets its minimum
        for (LocalBufferPool bufferPool : allBufferPools) {
            bufferPool.setNumBuffers(bufferPool.getNumberOfRequiredMemorySegments());
        }
        return;
    }

    long totalCapacity = 0; // long to avoid int overflow

    for (LocalBufferPool bufferPool : allBufferPools) {
        int excessMax = bufferPool.getMaxNumberOfMemorySegments() -
            bufferPool.getNumberOfRequiredMemorySegments();
        totalCapacity += Math.min(numAvailableMemorySegment, excessMax);
    }

    // no capacity to receive additional buffers?
    if (totalCapacity == 0) {
        return; // necessary to avoid div by zero when nothing to re-distribute
    }

    final int memorySegmentsToDistribute = MathUtils.checkedDownCast(
        Math.min(numAvailableMemorySegment, totalCapacity));

    long totalPartsUsed = 0; // of totalCapacity
    int numDistributedMemorySegment = 0;
    for (LocalBufferPool bufferPool : allBufferPools) {
        int excessMax = bufferPool.getMaxNumberOfMemorySegments() -
            bufferPool.getNumberOfRequiredMemorySegments();

        // shortcut
        if (excessMax == 0) {
            continue;
        }

        totalPartsUsed += Math.min(numAvailableMemorySegment, excessMax);

        final int mySize = MathUtils.checkedDownCast(
            memorySegmentsToDistribute * totalPartsUsed / totalCapacity - numDistributedMemorySegment);

        numDistributedMemorySegment += mySize;
        bufferPool.setNumBuffers(bufferPool.getNumberOfRequiredMemorySegments() + mySize);
    }

    assert (totalPartsUsed == totalCapacity);
    assert (numDistributedMemorySegment == memorySegmentsToDistribute);
}
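To make the proportional arithmetic concrete, here is a small, self-contained sketch (the two pools and their limits are made up, not taken from Flink) that mirrors the loop above:

public class RedistributeDemo {
    public static void main(String[] args) {
        // two hypothetical LocalBufferPools: {min required, max} segments
        int[][] pools = { {2, 10}, {3, 5} };
        int totalSegments = 20;
        int totalRequired = 2 + 3;                       // 5 segments reserved as the minimums
        int available = totalSegments - totalRequired;   // 15 segments left to hand out

        long totalCapacity = 0;
        for (int[] p : pools) {
            totalCapacity += Math.min(available, p[1] - p[0]); // 8 + 2 = 10
        }
        long toDistribute = Math.min(available, totalCapacity); // 10

        long totalPartsUsed = 0;
        long distributed = 0;
        for (int[] p : pools) {
            totalPartsUsed += Math.min(available, p[1] - p[0]);
            long mySize = toDistribute * totalPartsUsed / totalCapacity - distributed;
            distributed += mySize;
            System.out.println("pool(min=" + p[0] + ",max=" + p[1] + ") -> " + (p[0] + mySize) + " segments");
        }
        // prints: pool(min=2,max=10) -> 10 segments, then pool(min=3,max=5) -> 5 segments
    }
}

With 15 spare segments and excess capacities of 8 and 2, both pools end up topped up all the way to their maximum (10 and 5 segments respectively).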
Next, let's look at the LocalBufferPool itself.
The logic of LocalBufferPool boils down to the usual add/remove/resize/lookup operations; what deserves attention are its fields:

/** Minimum number of memory segments this pool requires */
private final int numberOfRequiredMemorySegments;

/**
 * Segments already obtained by this pool that no data has been written into yet (blank segments)
 */
private final ArrayDeque<MemorySegment> availableMemorySegments = new ArrayDeque<MemorySegment>();

/**
 * All registered listeners watching for buffer availability
 */
private final ArrayDeque<BufferListener> registeredListeners = new ArrayDeque<>();

/** Maximum number of segments that may be assigned to this pool */
private final int maxNumberOfMemorySegments;

/** Current size of this pool */
private int currentPoolSize;

/**
 * Number of segments allocated via the NetworkBufferPool and currently referenced by this pool (i.e. not obtained directly)
 */
private int numberOfRequestedMemorySegments;
Picking up from NetworkBufferPool's redistribution method, let's look at LocalBufferPool's setNumBuffers(). The code is short and the logic straightforward, so we won't dwell on it:

public void setNumBuffers(int numBuffers) throws IOException {
    synchronized (availableMemorySegments) {
        checkArgument(numBuffers >= numberOfRequiredMemorySegments,
            "Buffer pool needs at least %s buffers, but tried to set to %s",
            numberOfRequiredMemorySegments, numBuffers);

        if (numBuffers > maxNumberOfMemorySegments) {
            currentPoolSize = maxNumberOfMemorySegments;
        } else {
            currentPoolSize = numBuffers;
        }

        returnExcessMemorySegments();

        // If there is a registered owner and we have still requested more buffers than our
        // size, trigger a recycle via the owner.
        if (owner != null && numberOfRequestedMemorySegments > currentPoolSize) {
            owner.releaseMemory(numberOfRequestedMemorySegments - numBuffers);
        }
    }
}
RecordWriter and Record

Moving up the stack: we have just seen that the lowest-level memory abstraction is the MemorySegment and that data transfer uses the Buffer. So what is the intermediate object that bridges the gap from a Java object to a Buffer? It is the StreamRecord.

As the name StreamRecord<T> suggests, this class is simply a wrapper that holds the original Java object, plus a timestamp.
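For orientation, this is roughly how a StreamRecord is used (a minimal sketch, not code from this article):

import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

public class StreamRecordDemo {
    public static void main(String[] args) {
        // wrap a user object, optionally together with an event timestamp
        StreamRecord<String> withTs = new StreamRecord<>("hello", 42L);
        StreamRecord<String> withoutTs = new StreamRecord<>("world");

        System.out.println(withTs.getValue() + " @ " + withTs.getTimestamp()); // hello @ 42
        System.out.println(withoutTs.hasTimestamp());                          // false
    }
}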

So how does this object turn into a big byte array inside the LocalBufferPool? With the help of the RecordWriter class.

Let's look directly at the method that serializes the record and hands it off:

private void sendToTarget(T record, int targetChannel) throws IOException, InterruptedException {
    RecordSerializer<T> serializer = serializers[targetChannel];

    SerializationResult result = serializer.addRecord(record);

    while (result.isFullBuffer()) {
        if (tryFinishCurrentBufferBuilder(targetChannel, serializer)) {
            // If this was a full record, we are done. Not breaking
            // out of the loop at this point will lead to another
            // buffer request before breaking out (that would not be
            // a problem per se, but it can lead to stalls in the
            // pipeline).
            if (result.isFullRecord()) {
                break;
            }
        }
        BufferBuilder bufferBuilder = requestNewBufferBuilder(targetChannel);

        result = serializer.continueWritingWithNextBufferBuilder(bufferBuilder);
    }
    checkState(!serializer.hasSerializedData(), "All data should be written at once");

    if (flushAlways) {
        targetPartition.flush(targetChannel);
    }
}

About the last line first: if flushAlways is configured, each element is sent out immediately, at the cost of throughput. Flink's default behaviour is in fact not element-by-element sending either; a dedicated thread flushes all channels at a fixed interval, which, strictly speaking, makes it a kind of mini-batch.
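That flush interval is what the DataStream API exposes as the buffer timeout; a short sketch (the values are illustrative):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class BufferTimeoutDemo {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setBufferTimeout(100); // flush partially filled buffers every 100 ms (the default)
        // env.setBufferTimeout(0);  // flush after every record: lowest latency, lowest throughput
        // env.setBufferTimeout(-1); // flush only when a buffer is full: highest throughput
    }
}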
Now the serialization line: SerializationResult result = serializer.addRecord(record);. In this line, Flink serializes the object into a byte array using the serializer that belongs to that object's type.

The Data Exchange Process

The previous section covered the data abstractions at each layer; this section looks at how data is exchanged between tasks.

Overall Process

The overall flow is as follows (the original post illustrates it with a diagram):

1. The first step is, of course, to prepare a ResultPartition;
2. Notify the JobMaster;
3. The JobMaster notifies the downstream nodes; if a downstream node has not been deployed yet, it deploys it;
4. The downstream node requests data from the upstream node;
5. Data transfer begins.

Data Transfer Across Tasks

This section walks through the concrete data transfer between operators (again illustrated by a diagram in the original post).

Data travels between tasks through the following steps:

1. After this operator has finished processing the data, it hands it to the RecordWriter. Every record has to be routed to a downstream node, so it passes through a ChannelSelector (a minimal selector sketch follows this list).
2. Each channel has its own serializer (presumably to avoid the headaches of multi-threaded writes), which serializes the Record into a ByteBuffer.
3. The data is then written into one of the subpartitions under the ResultPartition; at this point it already sits in a DirectBuffer (MemorySegment).
4. A separate thread controls the flush rate; once a flush is triggered, the data is written to the remote side through Netty's NIO channel.
5. The Netty client on the remote side receives the data, decodes it, copies it into a buffer, and then notifies the InputChannel.
6. When data becomes available, the downstream operator wakes up from blocking, takes the buffer from the InputChannel, deserializes it back into a record, and hands it to the operator to run the user code.
These are the steps by which data moves between operators on different machines.
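To make step 1 concrete, here is a minimal sketch of what such a selector looks like (written free-standing rather than against Flink's actual ChannelSelector interface, whose exact signature varies across versions):

// Illustrative only: routes each record to exactly one channel by key hash,
// mirroring the selectChannels(record, numChannels) call in the RecordWriter code below.
public class HashChannelSelector<T> {

    public int[] selectChannels(T record, int numChannels) {
        // non-negative hash, then modulo the number of downstream channels
        int channel = (record.hashCode() & Integer.MAX_VALUE) % numChannels;
        return new int[] { channel };
    }
}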

With the steps in mind, let's look at some of the key code.
First, handing the data to the RecordWriter:

//RecordWriterOutput.java
@Override
public void collect(StreamRecord<OUT> record) {
    if (this.outputTag != null) {
        // we are only responsible for emitting to the main input
        return;
    }
    // here the record is handed over to the RecordWriter
    pushToRecordWriter(record);
}
Then the RecordWriter sends the data to the appropriate channel.

//RecordWriter.java
public void emit(T record) throws IOException, InterruptedException {
    // this is where the ChannelSelector comes in
    for (int targetChannel : channelSelector.selectChannels(record, numChannels)) {
        sendToTarget(record, targetChannel);
    }
}

private void sendToTarget(T record, int targetChannel) throws IOException, InterruptedException {

    // pick the serializer for this channel and serialize the record
    RecordSerializer<T> serializer = serializers[targetChannel];

    SerializationResult result = serializer.addRecord(record);

    while (result.isFullBuffer()) {
        if (tryFinishCurrentBufferBuilder(targetChannel, serializer)) {
            // If this was a full record, we are done. Not breaking
            // out of the loop at this point will lead to another
            // buffer request before breaking out (that would not be
            // a problem per se, but it can lead to stalls in the
            // pipeline).
            if (result.isFullRecord()) {
                break;
            }
        }
        BufferBuilder bufferBuilder = requestNewBufferBuilder(targetChannel);

        // continue writing into the newly requested buffer
        result = serializer.continueWritingWithNextBufferBuilder(bufferBuilder);
    }
    checkState(!serializer.hasSerializedData(), "All data should be written at once");

    if (flushAlways) {
        targetPartition.flush(targetChannel);
    }
}
Next, the data is pushed down to the underlying transport (Netty):

//ResultPartition.java
@Override
public void flushAll() {
    for (ResultSubpartition subpartition : subpartitions) {
        subpartition.flush();
    }
}

//PartitionRequestQueue.java
void notifyReaderNonEmpty(final NetworkSequenceViewReader reader) {
    // hand the push off to the netty server thread
    ctx.executor().execute(new Runnable() {
        @Override
        public void run() {
            ctx.pipeline().fireUserEventTriggered(reader);
        }
    });
}
The Netty side of things:

//AbstractChannelHandlerContext.java
public ChannelHandlerContext fireUserEventTriggered(final Object event) {
    if (event == null) {
        throw new NullPointerException("event");
    } else {
        final AbstractChannelHandlerContext next = this.findContextInbound();
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            next.invokeUserEventTriggered(event);
        } else {
            executor.execute(new OneTimeTask() {
                public void run() {
                    next.invokeUserEventTriggered(event);
                }
            });
        }

        return this;
    }
}
Finally, the actual write:

//PartitionRequestQueue.java
private void enqueueAvailableReader(final NetworkSequenceViewReader reader) throws Exception {
    if (reader.isRegisteredAsAvailable() || !reader.isAvailable()) {
        return;
    }
    // Queue an available reader for consumption. If the queue is empty,
    // we try trigger the actual write. Otherwise this will be handled by
    // the writeAndFlushNextMessageIfPossible calls.
    boolean triggerWrite = availableReaders.isEmpty();
    registerAvailableReader(reader);

    if (triggerWrite) {
        writeAndFlushNextMessageIfPossible(ctx.channel());
    }
}

private void writeAndFlushNextMessageIfPossible(final Channel channel) throws IOException {

    ......

    next = reader.getNextBuffer();
    if (next == null) {
        if (!reader.isReleased()) {
            continue;
        }
        markAsReleased(reader.getReceiverId());

        Throwable cause = reader.getFailureCause();
        if (cause != null) {
            ErrorResponse msg = new ErrorResponse(
                new ProducerFailedException(cause),
                reader.getReceiverId());

            ctx.writeAndFlush(msg);
        }
    } else {
        // This channel was now removed from the available reader queue.
        // We re-add it into the queue if it is still available
        if (next.moreAvailable()) {
            registerAvailableReader(reader);
        }

        BufferResponse msg = new BufferResponse(
            next.buffer(),
            reader.getSequenceNumber(),
            reader.getReceiverId(),
            next.buffersInBacklog());

        if (isEndOfPartitionEvent(next.buffer())) {
            reader.notifySubpartitionConsumed();
            reader.releaseAllResources();

            markAsReleased(reader.getReceiverId());
        }

        // Write and flush and wait until this is done before
        // trying to continue with the next buffer.
        channel.writeAndFlush(msg).addListener(writeListener);

        return;
    }

    ......

}

The writeAndFlush(msg) call in the second method above is where the data is actually written into Netty's NIO channel. What is written here targets a RemoteInputChannel, which corresponds to one of the channels of the downstream node's InputGate.

Where there is a write there is a read: the other end of the NIO channel has to read the buffer in, as follows:

//CreditBasedPartitionRequestClientHandler.java
private void decodeMsg(Object msg) throws Throwable {
    final Class<?> msgClazz = msg.getClass();

    // ---- Buffer --------------------------------------------------------
    if (msgClazz == NettyMessage.BufferResponse.class) {
        NettyMessage.BufferResponse bufferOrEvent = (NettyMessage.BufferResponse) msg;

        RemoteInputChannel inputChannel = inputChannels.get(bufferOrEvent.receiverId);
        if (inputChannel == null) {
            bufferOrEvent.releaseBuffer();

            cancelRequestFor(bufferOrEvent.receiverId);

            return;
        }

        decodeBufferOrEvent(inputChannel, bufferOrEvent);

    }

    ......

}

A side note: Flink's way of blocking and fetching data is very natural, relying on the producer-consumer model. When there is no data to fetch, the consumer simply blocks; when data is added to the queue, the consumer is notified. Flink's backpressure mechanism is also built on this.
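The blocking described here is the classic bounded-queue producer/consumer pattern; an illustrative sketch (not Flink code) of how a slow consumer naturally throttles the producer:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BackpressureDemo {
    public static void main(String[] args) throws InterruptedException {
        // a bounded queue plays the role of the buffer pool / input channel
        BlockingQueue<String> buffers = new ArrayBlockingQueue<>(2);

        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < 10; i++) {
                    buffers.put("record-" + i); // blocks when the queue is full -> backpressure
                }
            } catch (InterruptedException ignored) { }
        });

        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < 10; i++) {
                    String record = buffers.take(); // blocks when there is nothing to read
                    Thread.sleep(10);               // a slow consumer throttles the producer
                    System.out.println("consumed " + record);
                }
            } catch (InterruptedException ignored) { }
        });

        producer.start();
        consumer.start();
        producer.join();
        consumer.join();
    }
}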

The bytes are then deserialized back into a StreamRecord here:

//StreamElementSerializer.java
public StreamElement deserialize(DataInputView source) throws IOException {
    int tag = source.readByte();
    if (tag == TAG_REC_WITH_TIMESTAMP) {
        long timestamp = source.readLong();
        return new StreamRecord<T>(typeSerializer.deserialize(source), timestamp);
    }
    else if (tag == TAG_REC_WITHOUT_TIMESTAMP) {
        return new StreamRecord<T>(typeSerializer.deserialize(source));
    }
    else if (tag == TAG_WATERMARK) {
        return new Watermark(source.readLong());
    }
    else if (tag == TAG_STREAM_STATUS) {
        return new StreamStatus(source.readInt());
    }
    else if (tag == TAG_LATENCY_MARKER) {
        return new LatencyMarker(source.readLong(), new OperatorID(source.readLong(), source.readLong()), source.readInt());
    }
    else {
        throw new IOException("Corrupt stream, found tag: " + tag);
    }
}

The record is then processed again in the StreamInputProcessor.processInput() loop.

And with that, we have covered the whole journey of data flowing between nodes across JVM boundaries.

Source: jianshu.com/p/eaafc9db3f74
