A Look at Flink's StateDescriptor
Published: 2019-06-28



This article takes a look at Flink's StateDescriptor.

RuntimeContext.getState

flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/functions/RuntimeContext.java

```java
/**
 * A RuntimeContext contains information about the context in which functions are executed. Each parallel instance
 * of the function will have a context through which it can access static contextual information (such as
 * the current parallelism) and other constructs like accumulators and broadcast variables.
 *
 * <p>A function can, during runtime, obtain the RuntimeContext via a call to
 * {@link AbstractRichFunction#getRuntimeContext()}.
 */
@Public
public interface RuntimeContext {

    //......

    @PublicEvolving
    <T> ValueState<T> getState(ValueStateDescriptor<T> stateProperties);

    @PublicEvolving
    <T> ListState<T> getListState(ListStateDescriptor<T> stateProperties);

    @PublicEvolving
    <T> ReducingState<T> getReducingState(ReducingStateDescriptor<T> stateProperties);

    @PublicEvolving
    <IN, ACC, OUT> AggregatingState<IN, OUT> getAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT> stateProperties);

    @PublicEvolving
    @Deprecated
    <T, ACC> FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC> stateProperties);

    @PublicEvolving
    <UK, UV> MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV> stateProperties);
}
```

  • RuntimeContext provides one getter per kind of state, each taking the matching StateDescriptor: getState takes a ValueStateDescriptor and returns a ValueState; getListState takes a ListStateDescriptor and returns a ListState; getReducingState takes a ReducingStateDescriptor and returns a ReducingState; getAggregatingState takes an AggregatingStateDescriptor and returns an AggregatingState; getFoldingState (deprecated) takes a FoldingStateDescriptor and returns a FoldingState; getMapState takes a MapStateDescriptor and returns a MapState
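To make the "typed descriptor in, typed state out" contract concrete, here is a toy model in plain Java. It is not Flink code: MiniContext, MiniValueDescriptor, MiniListDescriptor, MiniValueState and MiniListState are hypothetical classes, keyed state is reduced to one registry (no per-key scoping, no state backend), and reusing a name for another kind of state simply fails in this model. It only illustrates two points from the bullet above: the descriptor's class decides which state type comes back, and the descriptor's name identifies the state.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of "typed descriptor in, typed state out". Hypothetical classes, NOT Flink APIs:
// no key scoping, no state backend, no serialization.
class DescriptorLookupDemo {
    public static void main(String[] args) {
        MiniContext ctx = new MiniContext();

        MiniValueState<Long> count = ctx.getState(new MiniValueDescriptor<>("count", 0L));
        count.update(count.value() + 1);

        // A second descriptor with the same name resolves to the same underlying state.
        MiniValueState<Long> sameCount = ctx.getState(new MiniValueDescriptor<>("count", 0L));
        System.out.println("count = " + sameCount.value());

        // The descriptor type decides the state type: a list descriptor yields a list state.
        MiniListState<String> seen = ctx.getListState(new MiniListDescriptor<String>("seen"));
        seen.add("a");
        seen.add("b");
        System.out.println("seen = " + seen.get());
    }
}

class MiniValueDescriptor<T> {
    final String name;
    final T defaultValue;
    MiniValueDescriptor(String name, T defaultValue) {
        this.name = name;
        this.defaultValue = defaultValue;
    }
}

class MiniListDescriptor<T> {
    final String name;
    MiniListDescriptor(String name) { this.name = name; }
}

class MiniValueState<T> {
    private final T defaultValue;
    private T value; // null means "nothing stored yet" in this model
    MiniValueState(T defaultValue) { this.defaultValue = defaultValue; }
    T value() { return value != null ? value : defaultValue; }
    void update(T newValue) { value = newValue; }
}

class MiniListState<T> {
    private final List<T> items = new ArrayList<>();
    void add(T item) { items.add(item); }
    List<T> get() { return new ArrayList<>(items); }
}

class MiniContext {
    // A single registry keyed by state name, shared by all kinds of state.
    private final Map<String, Object> states = new HashMap<>();

    @SuppressWarnings("unchecked")
    <T> MiniValueState<T> getState(MiniValueDescriptor<T> desc) {
        Object state = states.computeIfAbsent(desc.name, n -> new MiniValueState<T>(desc.defaultValue));
        if (!(state instanceof MiniValueState)) {
            throw new IllegalStateException("'" + desc.name + "' is already registered as another kind of state");
        }
        return (MiniValueState<T>) state;
    }

    @SuppressWarnings("unchecked")
    <T> MiniListState<T> getListState(MiniListDescriptor<T> desc) {
        Object state = states.computeIfAbsent(desc.name, n -> new MiniListState<T>());
        if (!(state instanceof MiniListState)) {
            throw new IllegalStateException("'" + desc.name + "' is already registered as another kind of state");
        }
        return (MiniListState<T>) state;
    }
}
```

In real Flink code the equivalent call would go through getRuntimeContext() inside a rich function on a keyed stream, and the returned state would be scoped to the current key.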

StateDescriptor

flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/state/StateDescriptor.java

```java
/**
 * Base class for state descriptors. A {@code StateDescriptor} is used for creating partitioned
 * {@link State} in stateful operations.
 *
 * <p>Subclasses must correctly implement {@link #equals(Object)} and {@link #hashCode()}.
 *
 * @param <S> The type of the State objects created from this {@code StateDescriptor}.
 * @param <T> The type of the value of the state object described by this state descriptor.
 */
@PublicEvolving
public abstract class StateDescriptor<S extends State, T> implements Serializable {

    /**
     * An enumeration of the types of supported states. Used to identify the state type
     * when writing and restoring checkpoints and savepoints.
     */
    // IMPORTANT: Do not change the order of the elements in this enum, ordinal is used in serialization
    public enum Type {
        /**
         * @deprecated Enum for migrating from old checkpoints/savepoint versions.
         */
        @Deprecated
        UNKNOWN,
        VALUE,
        LIST,
        REDUCING,
        FOLDING,
        AGGREGATING,
        MAP
    }

    private static final long serialVersionUID = 1L;

    // ------------------------------------------------------------------------

    /** Name that uniquely identifies state created from this StateDescriptor. */
    protected final String name;

    /** The serializer for the type. May be eagerly initialized in the constructor,
     * or lazily once the {@link #initializeSerializerUnlessSet(ExecutionConfig)} method
     * is called. */
    @Nullable
    protected TypeSerializer<T> serializer;

    /** The type information describing the value type. Only used to if the serializer
     * is created lazily. */
    @Nullable
    private TypeInformation<T> typeInfo;

    /** Name for queries against state created from this StateDescriptor. */
    @Nullable
    private String queryableStateName;

    /** Name for queries against state created from this StateDescriptor. */
    @Nonnull
    private StateTtlConfig ttlConfig = StateTtlConfig.DISABLED;

    /** The default value returned by the state when no other value is bound to a key. */
    @Nullable
    protected transient T defaultValue;

    // ------------------------------------------------------------------------

    /**
     * Create a new {@code StateDescriptor} with the given name and the given type serializer.
     *
     * @param name The name of the {@code StateDescriptor}.
     * @param serializer The type serializer for the values in the state.
     * @param defaultValue The default value that will be set when requesting state without setting
     *                     a value before.
     */
    protected StateDescriptor(String name, TypeSerializer<T> serializer, @Nullable T defaultValue) {
        this.name = checkNotNull(name, "name must not be null");
        this.serializer = checkNotNull(serializer, "serializer must not be null");
        this.defaultValue = defaultValue;
    }

    /**
     * Create a new {@code StateDescriptor} with the given name and the given type information.
     *
     * @param name The name of the {@code StateDescriptor}.
     * @param typeInfo The type information for the values in the state.
     * @param defaultValue The default value that will be set when requesting state without setting
     *                     a value before.
     */
    protected StateDescriptor(String name, TypeInformation<T> typeInfo, @Nullable T defaultValue) {
        this.name = checkNotNull(name, "name must not be null");
        this.typeInfo = checkNotNull(typeInfo, "type information must not be null");
        this.defaultValue = defaultValue;
    }

    /**
     * Create a new {@code StateDescriptor} with the given name and the given type information.
     *
     * <p>If this constructor fails (because it is not possible to describe the type via a class),
     * consider using the {@link #StateDescriptor(String, TypeInformation, Object)} constructor.
     *
     * @param name The name of the {@code StateDescriptor}.
     * @param type The class of the type of values in the state.
     * @param defaultValue The default value that will be set when requesting state without setting
     *                     a value before.
     */
    protected StateDescriptor(String name, Class<T> type, @Nullable T defaultValue) {
        this.name = checkNotNull(name, "name must not be null");
        checkNotNull(type, "type class must not be null");

        try {
            this.typeInfo = TypeExtractor.createTypeInfo(type);
        } catch (Exception e) {
            throw new RuntimeException(
                    "Could not create the type information for '" + type.getName() + "'. " +
                    "The most common reason is failure to infer the generic type information, due to Java's type erasure. " +
                    "In that case, please pass a 'TypeHint' instead of a class to describe the type. " +
                    "For example, to describe 'Tuple2<String, String>' as a generic type, use " +
                    "'new PravegaDeserializationSchema<>(new TypeHint<Tuple2<String, String>>(){}, serializer);'", e);
        }

        this.defaultValue = defaultValue;
    }

    // ------------------------------------------------------------------------

    /**
     * Returns the name of this {@code StateDescriptor}.
     */
    public String getName() {
        return name;
    }

    /**
     * Returns the default value.
     */
    public T getDefaultValue() {
        if (defaultValue != null) {
            if (serializer != null) {
                return serializer.copy(defaultValue);
            } else {
                throw new IllegalStateException("Serializer not yet initialized.");
            }
        } else {
            return null;
        }
    }

    /**
     * Returns the {@link TypeSerializer} that can be used to serialize the value in the state.
     * Note that the serializer may initialized lazily and is only guaranteed to exist after
     * calling {@link #initializeSerializerUnlessSet(ExecutionConfig)}.
     */
    public TypeSerializer<T> getSerializer() {
        if (serializer != null) {
            return serializer.duplicate();
        } else {
            throw new IllegalStateException("Serializer not yet initialized.");
        }
    }

    /**
     * Sets the name for queries of state created from this descriptor.
     *
     * <p>If a name is set, the created state will be published for queries
     * during runtime. The name needs to be unique per job. If there is another
     * state instance published under the same name, the job will fail during runtime.
     *
     * @param queryableStateName State name for queries (unique name per job)
     * @throws IllegalStateException If queryable state name already set
     */
    public void setQueryable(String queryableStateName) {
        Preconditions.checkArgument(
                ttlConfig.getUpdateType() == StateTtlConfig.UpdateType.Disabled,
                "Queryable state is currently not supported with TTL");
        if (this.queryableStateName == null) {
            this.queryableStateName = Preconditions.checkNotNull(queryableStateName, "Registration name");
        } else {
            throw new IllegalStateException("Queryable state name already set");
        }
    }

    /**
     * Returns the queryable state name.
     *
     * @return Queryable state name or null if not set.
     */
    @Nullable
    public String getQueryableStateName() {
        return queryableStateName;
    }

    /**
     * Returns whether the state created from this descriptor is queryable.
     *
     * @return true if state is queryable, false otherwise.
     */
    public boolean isQueryable() {
        return queryableStateName != null;
    }

    /**
     * Configures optional activation of state time-to-live (TTL).
     *
     * <p>State user value will expire, become unavailable and be cleaned up in storage
     * depending on configured {@link StateTtlConfig}.
     *
     * @param ttlConfig configuration of state TTL
     */
    public void enableTimeToLive(StateTtlConfig ttlConfig) {
        Preconditions.checkNotNull(ttlConfig);
        Preconditions.checkArgument(
                ttlConfig.getUpdateType() != StateTtlConfig.UpdateType.Disabled &&
                        queryableStateName == null,
                "Queryable state is currently not supported with TTL");
        this.ttlConfig = ttlConfig;
    }

    @Nonnull
    @Internal
    public StateTtlConfig getTtlConfig() {
        return ttlConfig;
    }

    // ------------------------------------------------------------------------

    /**
     * Checks whether the serializer has been initialized. Serializer initialization is lazy,
     * to allow parametrization of serializers with an {@link ExecutionConfig} via
     * {@link #initializeSerializerUnlessSet(ExecutionConfig)}.
     *
     * @return True if the serializers have been initialized, false otherwise.
     */
    public boolean isSerializerInitialized() {
        return serializer != null;
    }

    /**
     * Initializes the serializer, unless it has been initialized before.
     *
     * @param executionConfig The execution config to use when creating the serializer.
     */
    public void initializeSerializerUnlessSet(ExecutionConfig executionConfig) {
        if (serializer == null) {
            checkState(typeInfo != null, "no serializer and no type info");

            // instantiate the serializer
            serializer = typeInfo.createSerializer(executionConfig);

            // we can drop the type info now, no longer needed
            typeInfo = null;
        }
    }

    // ------------------------------------------------------------------------
    //  Standard Utils
    // ------------------------------------------------------------------------

    @Override
    public final int hashCode() {
        return name.hashCode() + 31 * getClass().hashCode();
    }

    @Override
    public final boolean equals(Object o) {
        if (o == this) {
            return true;
        } else if (o != null && o.getClass() == this.getClass()) {
            final StateDescriptor<?, ?> that = (StateDescriptor<?, ?>) o;
            return this.name.equals(that.name);
        } else {
            return false;
        }
    }

    @Override
    public String toString() {
        return getClass().getSimpleName() +
                "{name=" + name +
                ", defaultValue=" + defaultValue +
                ", serializer=" + serializer +
                (isQueryable() ? ", queryableStateName=" + queryableStateName + "" : "") +
                '}';
    }

    public abstract Type getType();

    // ------------------------------------------------------------------------
    //  Serialization
    // ------------------------------------------------------------------------

    private void writeObject(final ObjectOutputStream out) throws IOException {
        // write all the non-transient fields
        out.defaultWriteObject();

        // write the non-serializable default value field
        if (defaultValue == null) {
            // we don't have a default value
            out.writeBoolean(false);
        } else {
            // we have a default value
            out.writeBoolean(true);

            byte[] serializedDefaultValue;
            try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
                    DataOutputViewStreamWrapper outView = new DataOutputViewStreamWrapper(baos)) {

                TypeSerializer<T> duplicateSerializer = serializer.duplicate();
                duplicateSerializer.serialize(defaultValue, outView);

                outView.flush();
                serializedDefaultValue = baos.toByteArray();
            } catch (Exception e) {
                throw new IOException("Unable to serialize default value of type " +
                        defaultValue.getClass().getSimpleName() + ".", e);
            }

            out.writeInt(serializedDefaultValue.length);
            out.write(serializedDefaultValue);
        }
    }

    private void readObject(final ObjectInputStream in) throws IOException, ClassNotFoundException {
        // read the non-transient fields
        in.defaultReadObject();

        // read the default value field
        boolean hasDefaultValue = in.readBoolean();
        if (hasDefaultValue) {
            int size = in.readInt();

            byte[] buffer = new byte[size];
            in.readFully(buffer);

            try (ByteArrayInputStream bais = new ByteArrayInputStream(buffer);
                    DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper(bais)) {
                defaultValue = serializer.deserialize(inView);
            } catch (Exception e) {
                throw new IOException("Unable to deserialize default value.", e);
            }
        } else {
            defaultValue = null;
        }
    }
}
```
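The serializer handling in StateDescriptor (eager TypeSerializer vs. lazy TypeInformation, initializeSerializerUnlessSet, and the defensive copy in getDefaultValue) can be modelled without Flink. In this hypothetical sketch, MiniSerializer stands in for TypeSerializer, a Function<String, MiniSerializer<T>> stands in for TypeInformation's createSerializer(ExecutionConfig), and a plain String stands in for ExecutionConfig. None of these are Flink APIs; only the control flow follows the source.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Hypothetical model of StateDescriptor's serializer life cycle; NOT Flink code.
// MiniSerializer ~ TypeSerializer, the Function ~ TypeInformation#createSerializer,
// and a plain String ~ ExecutionConfig.
class LazySerializerDemo {
    public static void main(String[] args) {
        LazyDescriptor<List<String>> desc = new LazyDescriptor<List<String>>(
                "names", config -> new ListCopier(config), new ArrayList<String>(Arrays.asList("x")));

        System.out.println("initialized? " + desc.isSerializerInitialized());
        try {
            desc.getDefaultValue(); // a non-null default can only be copied once a serializer exists
        } catch (IllegalStateException e) {
            System.out.println("before init: " + e.getMessage());
        }

        desc.initializeSerializerUnlessSet("config-A");
        desc.initializeSerializerUnlessSet("config-B"); // no-op, the serializer already exists

        List<String> callerCopy = desc.getDefaultValue();
        callerCopy.add("mutated"); // only changes the caller's copy
        System.out.println("default = " + desc.getDefaultValue());
    }
}

interface MiniSerializer<T> {
    T copy(T value);
    MiniSerializer<T> duplicate();
}

class ListCopier implements MiniSerializer<List<String>> {
    final String createdWith; // which "config" produced this serializer
    ListCopier(String createdWith) { this.createdWith = createdWith; }
    @Override public List<String> copy(List<String> value) { return new ArrayList<>(value); }
    @Override public MiniSerializer<List<String>> duplicate() { return new ListCopier(createdWith); }
}

class LazyDescriptor<T> {
    private final String name;
    private Function<String, MiniSerializer<T>> typeInfo; // dropped once the serializer is created
    private MiniSerializer<T> serializer;
    private final T defaultValue;

    LazyDescriptor(String name, Function<String, MiniSerializer<T>> typeInfo, T defaultValue) {
        this.name = name;
        this.typeInfo = typeInfo;
        this.defaultValue = defaultValue;
    }

    String getName() { return name; }

    boolean isSerializerInitialized() { return serializer != null; }

    void initializeSerializerUnlessSet(String executionConfig) {
        if (serializer == null) {
            if (typeInfo == null) {
                throw new IllegalStateException("no serializer and no type info");
            }
            serializer = typeInfo.apply(executionConfig);
            typeInfo = null; // mirrors "we can drop the type info now"
        }
    }

    MiniSerializer<T> getSerializer() {
        if (serializer == null) {
            throw new IllegalStateException("Serializer not yet initialized.");
        }
        return serializer.duplicate(); // callers get their own instance
    }

    T getDefaultValue() {
        if (defaultValue == null) {
            return null;
        }
        if (serializer == null) {
            throw new IllegalStateException("Serializer not yet initialized.");
        }
        return serializer.copy(defaultValue); // never hand out the shared default itself
    }
}
```

Copying the default value matters for mutable types: without the copy, one caller mutating the returned object would silently change the default seen by every later caller.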

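  • StateDescriptor is the base class of ValueStateDescriptor, ListStateDescriptor, ReducingStateDescriptor, FoldingStateDescriptor, AggregatingStateDescriptor and MapStateDescriptor. It declares the abstract method getType(), through which each subclass reports its own Type (VALUE, LIST, REDUCING, FOLDING, AGGREGATING, MAP; the deprecated UNKNOWN only exists for migrating old checkpoints/savepoints). Because the enum ordinal is used in serialization, the order of its constants must not change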
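  • StateDescriptor offers several constructors that take the name, a defaultValue, and the value type given as either a TypeSerializer, a TypeInformation, or a Class. With a TypeInformation or a Class, the serializer is created lazily in initializeSerializerUnlessSet(ExecutionConfig); the Class variant can fail because of type erasure, in which case a TypeHint should be passed instead
  • StateDescriptor overrides equals and hashCode (both final and based only on the concrete class and the name). It also implements Serializable and customizes serialization with writeObject and readObject: defaultValue is transient, so it is written separately as a presence flag, a length, and the bytes produced by the state's TypeSerializer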
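The Class-based constructor can fail because of type erasure, which is why its error message recommends a TypeHint. The sketch below shows the general "super type token" trick: an anonymous subclass records its full generic superclass, which reflection can read back even though a Class literal cannot. To my understanding Flink's TypeHint is built on the same idea, but TypeToken here is a hypothetical class written for illustration, not Flink's TypeHint.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of the "super type token" trick; TypeToken is NOT a Flink class.
class TypeTokenDemo {
    public static void main(String[] args) {
        // A Class literal is erased: List.class only knows its declared type variable.
        System.out.println("List.class parameters: " + Arrays.toString(List.class.getTypeParameters()));

        // An anonymous subclass keeps the full type argument in its generic superclass.
        Type captured = new TypeToken<Map<String, List<Long>>>() {}.getType();
        System.out.println("captured: " + captured.getTypeName());
    }
}

abstract class TypeToken<T> {
    private final Type type;

    protected TypeToken() {
        // getClass() is the anonymous subclass; its superclass is TypeToken<...> with the real argument.
        Type superType = getClass().getGenericSuperclass();
        if (!(superType instanceof ParameterizedType)) {
            throw new IllegalStateException("TypeToken must be created with a concrete type argument");
        }
        this.type = ((ParameterizedType) superType).getActualTypeArguments()[0];
    }

    Type getType() {
        return type;
    }
}
```

The trailing `{}` is what makes this work: it declares a subclass, and subclass declarations keep their type arguments in the class file, while `Map.class` can never express `Map<String, List<Long>>`.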
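The equality and serialization rules can also be reproduced in plain Java. In the hypothetical sketch below, MiniDescriptor mimics StateDescriptor's final equals/hashCode (concrete class plus name) and its writeObject/readObject pair, which writes a presence flag, a length prefix and the raw bytes of the transient default value. A Long default and DataOutputStream/DataInputStream stand in for the generic T and the state's TypeSerializer; these substitutions are assumptions of the sketch, not Flink behavior.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical model of StateDescriptor's identity and serialization rules; NOT Flink code.
class DescriptorIdentityDemo {
    public static void main(String[] args) {
        MiniDescriptor a = new MiniValueDesc("count", 0L);
        MiniDescriptor b = new MiniValueDesc("count", 42L);
        MiniDescriptor c = new MiniListDesc("count");
        System.out.println("a.equals(b): " + a.equals(b)); // same class and name; default ignored
        System.out.println("a.equals(c): " + a.equals(c)); // same name, different class
        System.out.println("restored default: " + roundTrip(b).defaultValue);
    }

    // Java-serializes an object and reads it back.
    static <T extends Serializable> T roundTrip(T obj) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj);
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                @SuppressWarnings("unchecked")
                T copy = (T) in.readObject();
                return copy;
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("round trip failed", e);
        }
    }
}

abstract class MiniDescriptor implements Serializable {
    private static final long serialVersionUID = 1L;
    protected final String name;
    protected transient Long defaultValue; // skipped by defaultWriteObject, written by hand below

    MiniDescriptor(String name, Long defaultValue) {
        this.name = name;
        this.defaultValue = defaultValue;
    }

    // Same formula as StateDescriptor: only the name and the concrete class count.
    @Override
    public final int hashCode() {
        return name.hashCode() + 31 * getClass().hashCode();
    }

    @Override
    public final boolean equals(Object o) {
        if (o == this) {
            return true;
        }
        if (o == null || o.getClass() != getClass()) {
            return false;
        }
        return name.equals(((MiniDescriptor) o).name);
    }

    private void writeObject(ObjectOutputStream out) throws IOException {
        out.defaultWriteObject();       // non-transient fields (here: name)
        out.writeBoolean(defaultValue != null);
        if (defaultValue != null) {
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            try (DataOutputStream view = new DataOutputStream(baos)) {
                view.writeLong(defaultValue); // stand-in for TypeSerializer#serialize
            }
            byte[] serialized = baos.toByteArray();
            out.writeInt(serialized.length); // length prefix, then the raw bytes
            out.write(serialized);
        }
    }

    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        in.defaultReadObject();
        if (in.readBoolean()) {
            byte[] buffer = new byte[in.readInt()];
            in.readFully(buffer);
            try (DataInputStream view = new DataInputStream(new ByteArrayInputStream(buffer))) {
                defaultValue = view.readLong(); // stand-in for TypeSerializer#deserialize
            }
        } else {
            defaultValue = null;
        }
    }
}

class MiniValueDesc extends MiniDescriptor {
    MiniValueDesc(String name, Long defaultValue) { super(name, defaultValue); }
}

class MiniListDesc extends MiniDescriptor {
    MiniListDesc(String name) { super(name, null); }
}
```

The length prefix lets the reader pull out exactly the bytes the value serializer wrote, independent of how that serializer encodes the value.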

Summary

  • RuntimeContext provides one getter per kind of state, each taking the matching StateDescriptor: getState, getListState, getReducingState, getAggregatingState, getFoldingState and getMapState
  • StateDescriptor is the base class of ValueStateDescriptor, ListStateDescriptor, ReducingStateDescriptor, FoldingStateDescriptor, AggregatingStateDescriptor and MapStateDescriptor; its abstract getType() method lets each subclass report its own Type (VALUE, LIST, REDUCING, FOLDING, AGGREGATING, MAP)
  • StateDescriptor overrides equals and hashCode, implements Serializable, and customizes its serialization through writeObject and readObject


Reposted from: https://my.oschina.net/go4it/blog/2992176
