about云開發

 找回密碼
 立即注冊

QQ登錄

只需一步,快速開始

掃一掃,訪問微社區

查看: 1397|回復: 0
打印 上一主題 下一主題

[連載型] Flink難點:徹底明白CEP8,CEP庫中的時間、例子、版本說明

[復制鏈接]
跳轉到指定樓層
樓主
發表于 2019-6-17 16:04:44 | 只看該作者 回帖獎勵 |倒序瀏覽 |閱讀模式
本帖最后由 pig2 于 2019-6-17 16:09 編輯

問題導讀

1.對于事件延遲,cep是如何處理的?
2.對于cep編程,本文舉了一個什么例子?
3.從較舊的Flink版本遷移有哪些需要注意?

上一篇:
Flink難點:徹底明白CEP8,模式檢測
http://www.lcuoip.tw/forum.php?mod=viewthread&tid=27335


CEP庫中的時間

處理事件時間的延遲
在CEP中,元素處理的順序很重要。 為了保證在事件時間工作時元素以正確的順序處理,傳入元素最初放在緩沖區中,元素按照時間戳按升序排序,當水印到達時,此緩沖區中的所有元素都包含在 處理小于水印的時間戳。 這意味著水印之間的元素按事件時間順序處理。

注意:在事件時間工作時,庫假定水印的正確性。

為了保證水印中的元素按事件時間順序處理,Flink的CEP庫假定水印的正確性,并將其視為時間戳小于上次看到的水印的后期元素。 后期元素不會被進一步處理。 此外,可以指定sideOutput標記來收集最后看到的水印之后的后期元素,可以像這樣使用它。


[Scala] 純文本查看 復制代碼
val patternStream: PatternStream[Event] = CEP.pattern(input, pattern)

val lateDataOutputTag = OutputTag[String]("late-data")

val result: SingleOutputStreamOperator[ComplexEvent] = patternStream
      .sideOutputLateData(lateDataOutputTag)
      .select{
          pattern: Map[String, Iterable[ComplexEvent]] => ComplexEvent()
      }

val lateData: DataStream[String] = result.getSideOutput(lateDataOutputTag)

[Java] 純文本查看 復制代碼
PatternStream<Event> patternStream = CEP.pattern(input, pattern);

OutputTag<String> lateDataOutputTag = new OutputTag<String>("late-data"){};

SingleOutputStreamOperator<ComplexEvent> result = patternStream
    .sideOutputLateData(lateDataOutputTag)
    .select(
        new PatternSelectFunction<Event, ComplexEvent>() {...}
    );

DataStream<String> lateData = result.getSideOutput(lateDataOutputTag);


時間context
在PatternProcessFunction以及IterativeCondition中,用戶可以訪問實現TimeContext的上下文,如下所示:
[Bash shell] 純文本查看 復制代碼
/**
 * Enables access to time related characteristics such as current processing time or timestamp of
 * currently processed element. Used in {@link PatternProcessFunction} and
 * {@link org.apache.flink.cep.pattern.conditions.IterativeCondition}
 */
@PublicEvolving
public interface TimeContext {

        /**
         * Timestamp of the element currently being processed.
         *
         * <p>In case of {@link org.apache.flink.streaming.api.TimeCharacteristic#ProcessingTime} this
         * will be set to the time when event entered the cep operator.
         */
        long timestamp();

        /** Returns the current processing time. */
        long currentProcessingTime();
}

此上下文使用戶可以訪問已處理事件的時間特征(IterativeCondition中的傳入記錄和PatternProcessFunction情況下的匹配)。 調用TimeContext#currentProcessingTime總是為提供當前處理時間的值,此調用應優先于例如 調用System.currentTimeMillis()。

在TimeContext #timestamp()的情況下,返回的值等于EventTime的指定時間戳。 在ProcessingTime中,這將等于所述事件進入cep運算符的時間點(或者在PatternProcessFunction的情況下生成匹配時)。 這意味著該值在多次調用該方法時將保持一致。


例子
以下示例檢測事件的key數據流上的模式start,middle(name =“error”) - > end(name =“critical”)。 事件由其ID鍵入,并且有效模式必須在10秒內發生。 整個處理是在事件時間完成的。
[Scala] 純文本查看 復制代碼
val env : StreamExecutionEnvironment = ...
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

val input : DataStream[Event] = ...

val partitionedInput = input.keyBy(event => event.getId)

val pattern = Pattern.begin[Event]("start")
  .next("middle").where(_.getName == "error")
  .followedBy("end").where(_.getName == "critical")
  .within(Time.seconds(10))

val patternStream = CEP.pattern(partitionedInput, pattern)

val alerts = patternStream.select(createAlert(_))

[Java] 純文本查看 復制代碼
StreamExecutionEnvironment env = ...
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

DataStream<Event> input = ...

DataStream<Event> partitionedInput = input.keyBy(new KeySelector<Event, Integer>() {
        @Override
        public Integer getKey(Event value) throws Exception {
                return value.getId();
        }
});

Pattern<Event, ?> pattern = Pattern.<Event>begin("start")
        .next("middle").where(new SimpleCondition<Event>() {
                @Override
                public boolean filter(Event value) throws Exception {
                        return value.getName().equals("error");
                }
        }).followedBy("end").where(new SimpleCondition<Event>() {
                @Override
                public boolean filter(Event value) throws Exception {
                        return value.getName().equals("critical");
                }
        }).within(Time.seconds(10));

PatternStream<Event> patternStream = CEP.pattern(partitionedInput, pattern);

DataStream<Alert> alerts = patternStream.select(new PatternSelectFunction<Event, Alert>() {
        @Override
        public Alert select(Map<String, List<Event>> pattern) throws Exception {
                return createAlert(pattern);
        }
});


從較舊的Flink版本遷移(1.3之前版本)

遷移到1.4+
在Flink-1.4中,CEP庫與<= Flink 1.2的向后兼容性被刪除。 遺憾的是,無法恢復曾經使用1.2.x運行的CEP作業


遷移到1.3.x.
Flink-1.3中的CEP庫附帶了許多新功能,這些功能導致了API的一些變化。在這里,我們描述了您需要對舊CEP作業進行的更改,以便能夠使用Flink-1.3運行它們。進行這些更改并重新編譯作業后,將能夠從使用舊版本作業的保存點恢復執行,即無需重新處理過去的數據。

所需的更改是:

  • 更改條件(where(...)子句中的條件)以擴展SimpleCondition類,而不是實現FilterFunction接口。

  • 更改select(...)和flatSelect(...)方法的參數,以期望與每個模式關聯事件列表(Java中的List,Scala中的Iterable)。這是因為通過添加循環模式,多個輸入事件可以匹配單個(循環)模式。

  • Flink 1.1和1.2中的followBy()暗示了非確定性松弛連續性(見此處)。在Flink 1.3中,這已經改變并且followBy()意味著放松的連續性,而如果需要非確定性的松弛連續性,則應該使用followAyAny()。



最新經典文章,歡迎關注公眾號




加入About云知識星球,獲取更多實用資料


分享到:  QQ好友和群QQ好友和群 QQ空間QQ空間 騰訊微博騰訊微博 騰訊朋友騰訊朋友 微信微信
收藏收藏 轉播轉播 分享分享 分享淘帖 贊 踩 分享到微信分享到微信
您需要登錄后才可以回帖 登錄 | 立即注冊

本版積分規則

關閉

推薦上一條 /3 下一條

QQ|小黑屋|about云開發-學問論壇|社區 ( 京ICP備12023829號

GMT+8, 2019-8-6 20:27 , Processed in 0.447302 second(s), 32 queries , Gzip On.

Powered by Discuz! X3.2 Licensed

快速回復 返回頂部 返回列表
排球比赛场地