聊聊Flink:这次把Flink的触发器(Trigger)、移除器(Evictor)讲透
一、触发器(Trigger)
Trigger 决定了一个窗口(由 window assigner 定义)何时可以被 window function 处理。每个 WindowAssigner 都有一个默认的 Trigger。如果默认 trigger 无法满足你的需要,你可以在 trigger(…) 调用中指定自定义的 trigger。
1.1 Flink中预置的Trigger
窗口的计算触发依赖于窗口触发器,每种类型的窗口都有对应的窗口触发机制,都有一个默认的窗口触发器,触发器的作用就是去控制什么时候来触发计算。flink内部定义多种触发器,每种触发器对应于不同的WindowAssigner。常见的触发器如下:
- EventTimeTrigger:通过对比EventTime和窗口的Endtime确定是否触发窗口计算,如果EventTime大于Window EndTime则触发,否则不触发,窗口将继续等待。
- ProcessTimeTrigger:通过对比ProcessTime和窗口EndTme确定是否触发窗口,如果ProcessTime大于EndTime则触发计算,否则窗口继续等待。
- ProcessingTimeoutTrigger:可以将任何触发器转变为超时触发器。
- ContinuousEventTimeTrigger:根据间隔时间周期性触发窗口或者Window的结束时间小于当前EndTime触发窗口计算。
- ContinuousProcessingTimeTrigger:根据间隔时间周期性触发窗口或者Window的结束时间小于当前ProcessTime触发窗口计算。
- CountTrigger:根据接入数据量是否超过设定的阙值判断是否触发窗口计算。
- DeltaTrigger:根据接入数据计算出来的Delta指标是否超过指定的Threshold去判断是否触发窗口计算。
- PurgingTrigger:可以将任意触发器作为参数转换为Purge类型的触发器,计算完成后数据将被清理。
- NeverTrigger:任何时候都不触发窗口计算
1.2 Trigger的抽象类
Trigger 接口提供了五个方法来响应不同的事件:
- onElement() 方法在每个元素被加入窗口时调用。
- onEventTime() 方法在注册的 event-time timer 触发时调用。
- onProcessingTime() 方法在注册的 processing-time timer 触发时调用。
- canMerge() 方法判断是否可以合并。
- onMerge() 方法与有状态的 trigger 相关。该方法会在两个窗口合并时, 将窗口对应 trigger 的状态进行合并,比如使用会话窗口时。
- clear() 方法处理在对应窗口被移除时所需的逻辑。
触发器接口的源码如下:
@PublicEvolving public abstract class Trigger implements Serializable { private static final long serialVersionUID = -4104633972991191369L; /** * Called for every element that gets added to a pane. The result of this will determine whether * the pane is evaluated to emit results. * * @param element The element that arrived. * @param timestamp The timestamp of the element that arrived. * @param window The window to which the element is being added. * @param ctx A context object that can be used to register timer callbacks. */ public abstract TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception; /** * Called when a processing-time timer that was set using the trigger context fires. * * @param time The timestamp at which the timer fired. * @param window The window for which the timer fired. * @param ctx A context object that can be used to register timer callbacks. */ public abstract TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception; /** * Called when an event-time timer that was set using the trigger context fires. * * @param time The timestamp at which the timer fired. * @param window The window for which the timer fired. * @param ctx A context object that can be used to register timer callbacks. */ public abstract TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception; /** * Returns true if this trigger supports merging of trigger state and can therefore be used with * a {@link org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner}. * * If this returns {@code true} you must properly implement {@link #onMerge(Window, * OnMergeContext)} */ public boolean canMerge() { return false; } /** * Called when several windows have been merged into one window by the {@link * org.apache.flink.streaming.api.windowing.assigners.WindowAssigner}. * * @param window The new window that results from the merge. * @param ctx A context object that can be used to register timer callbacks and access state. */ public void onMerge(W window, OnMergeContext ctx) throws Exception { throw new UnsupportedOperationException("This trigger does not support merging."); } /** * Clears any state that the trigger might still hold for the given window. This is called when * a window is purged. Timers set using {@link TriggerContext#registerEventTimeTimer(long)} and * {@link TriggerContext#registerProcessingTimeTimer(long)} should be deleted here as well as * state acquired using {@link TriggerContext#getPartitionedState(StateDescriptor)}. */ public abstract void clear(W window, TriggerContext ctx) throws Exception; // ------------------------------------------------------------------------ /** * A context object that is given to {@link Trigger} methods to allow them to register timer * callbacks and deal with state. */ public interface TriggerContext { // ... } /** * Extension of {@link TriggerContext} that is given to {@link Trigger#onMerge(Window, * OnMergeContext)}. */ public interface OnMergeContext extends TriggerContext {