Flink中的多事件Join

流式计算中的 2 个问题:

  1. 流式数据到达计算引擎的时间不一定:比如 A 流的数据先到了,A 流不知道 B 流对应同 key 的数据什么时候到,没法关联(数据质量问题)

  2. 流式数据不知何时、下发怎样的数据: A 流的数据到达后,如果 B 流的数据永远不到,那么 A 流的数据在什么时候以及是否要填充一个 null 值下发下去(数据时效问题)

一、Flink Join 解决方案:Flink Window Join
1.1 拆解分析

Flink Window Join就是将两条流的数据从无界数据变为有界数据,即划分出时间窗口,然后将同一时间窗口内的两条流的数据做 Join(这里的时间窗口支持 Tumbling、Sliding、Session)

面对文章开始提出的两个问题,Regular Join会怎么解决

1、 流式数据到达计算引擎的时间不一定:数据已经被划分为窗口,无界数据变为有界数据,就和离线批处理的方式一样了,两个窗口的数据简单的进行关联即可

2、 流式数据不知何时、下发怎样的数据:窗口结束就把数据下发下去,关联到的数据就下发 [A, B],没有关联到的数据取决于是否是 outer join 然后进行数据下发

方案目前支持 Flink DataStream API、SQL API 两种:


DataStream API:方案只支持 inner join,即窗口内能关联到的才会下发,关联不到的则直接丢掉。

val flinkEnv = StreamExecutionEnvironment.getExecutionEnvironment()
// A 流
val aStream = flinkEnv.addSource(new SourceFunction[Object] {
  override def run(ctx: SourceContext[Object]): Unit = {
    // 实现数据源逻辑
  }
  override def cancel(): Unit = {
    // 取消数据源逻辑
  }
})

// B 流
val bStream = flinkEnv.addSource(new SourceFunction[Object] {
  override def run(ctx: SourceContext[Object]): Unit = {
    // 实现数据源逻辑
  }
  override def cancel(): Unit = {
    // 取消数据源逻辑
  }
})

// A 流的 keyby 条件
val keyedAStream = aStream.keyBy((value: Object) => {
  // 实现Key选择逻辑
})

// B 流的 keyby 条件
val keyedBStream = bStream.keyBy((value: Object) => {
  // 实现Key选择逻辑
})

// 连接、窗口和应用逻辑
keyedAStream
  .join(keyedBStream)
  .where((value: Object) => {
    // A 流的 keyby 条件
  })
  .equalTo((value: Object) => {
    // B 流的 keyby 条件
  })
  .window(TumblingEventTimeWindows.of(Time.seconds(60)))
  .apply(new JoinFunction[Object, Object, Object] {
    override def join(first: Object, second: Object): Object = {
      // 窗口中关联到的数据的处理逻辑
    }
  })

如果你想实现 window 上的 outer join,可以使用 coGroup 算子,关注如下案例:

使用coGroup函数来关联两个数据流。这个示例中的两个数据流(A 流和 B 流)从不同的socket端口接收文本数据,然后基于相同的键(这里是字符串的第一个元素)进行关联。如果数据在指定的窗口期内没有匹配项,您可以自定义输出数据的格式。

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // A 流
    val input1: DataStream[(String, String)] = env.socketTextStream("", 9002)
      .map(new MapFunction[String, (String, String)] {
        override def map(s: String): (String, String) = {
          val parts = s.split(" ")
          (parts(0), parts(1))
        }
      })

    // B 流
    val input2: DataStream[(String, String)] = env.socketTextStream("", 9001)
      .map(new MapFunction[String, (String, String)] {
        override def map(s: String): (String, String) = {
          val parts = s.split(" ")
          (parts(0), parts(1))
        }
      })

    // A 流关联 B 流
    input1.coGroup(input2)
      // A 流的 keyby 条件
      .where(new KeySelector[(String, String), String] {
        override def getKey(value: (String, String)): String = value._1
      })
      // B 流的 keyby 条件
      .equalTo(new KeySelector[(String, String), String] {
        override def getKey(value: (String, String)): String = value._1
      })
      // 窗口
      .window(ProcessingTimeSessionWindows.withGap(Time.seconds(3)))
      .apply(new CoGroupFunction[(String, String), (String, String), String] {
        override def coGroup(first: Iterable[(String, String)], second: Iterable[(String, String)], out: Collector[String]): Unit = {
          val buffer = new StringBuffer
          buffer.append("DataStream first:\n")
          first.foreach(value => buffer.append(value._1 + "=>" + value._2 + "\n"))
          buffer.append("DataStream second:\n")
          second.foreach(value => buffer.append(value._1 + "=>" + value._2 + "\n"))
          out.collect(buffer.toString)
        }
      }).print()

    env.execute()
  }

还可以使用 connect 算子自定义各种关联操作(connect 算子相比 join、coGroup 算子灵活很多):

val env = StreamExecutionEnvironment.getExecutionEnvironment
// 假设 UserEvent 是一个已定义的类,包含 getUserId 方法
case class UserEvent(userId: String)
// Kafka 用户事件源
val kafkaUserEventSource = new FlinkKafkaConsumer010[UserEvent]("input-topic", new UserEventDeserializationSchema(), properties)
// 用户事件流
val customerUserEventStream: KeyedStream[UserEvent, String] = env
  .addSource(kafkaUserEventSource)
  .assignTimestampsAndWatermarks(new AscendingTimestampExtractor[UserEvent] {
    override def extractAscendingTimestamp(element: UserEvent): Long = {
      // 提取时间戳的逻辑
    }}).keyBy(_.getUserId)
// Kafka 配置事件源
val kafkaConfigEventSource = new FlinkKafkaConsumer010[Config]("config-topic", new ConfigDeserializationSchema(), properties)
// 广播流
val configBroadcastStream: BroadcastStream[Config] = env
  .addSource(kafkaConfigEventSource)
  .broadcast(configStateDescriptor)
// Kafka 生产者
val kafkaProducer = new FlinkKafkaProducer010[EvaluatedResult](
  "output-topic",
  new EvaluatedResultSerializationSchema(),
  producerProps)
// 连接流和处理逻辑
val connectedStream: DataStream[EvaluatedResult] = customerUserEventStream
  .connect(configBroadcastStream)
  .process(new ConnectedBroadcastProcessFunction())
connectedStream.addSink(kafkaProducer)
env.execute("Flink Scala Job")

SQL API:

SELECT 
    L.num as L_Num
    , L.id as L_Id
    , R.num as R_Num
    , R.id as R_Id
    , L.window_start
    , L.window_end
FROM (
    SELECT * 
    FROM TABLE(TUMBLE(TABLE LeftTable, DESCRIPTOR(row_time), INTERVAL '5' MINUTES))
) L
FULL JOIN (
    SELECT * 
    FROM TABLE(TUMBLE(TABLE RightTable, DESCRIPTOR(row_time), INTERVAL '5' MINUTES))
) R
ON L.num = R.num 
AND L.window_start = R.window_start 
AND L.window_end = R.window_end;
1.2 方案的特点

当我们的窗口大小划分的越细时,在窗口边缘关联不上的数据就会越多,数据质量就越差。窗口大小划分的越宽时,窗口内关联上的数据就会越多,数据质量越好,但是产出时效性就会越差。所以小伙伴萌在使用时要注意取舍。

举个例子:以曝光关联点击来说,如果我们划分的时间窗口为 1 分钟,那么一旦出现曝光在 0:59,点击在 1:01 的情况,就会关联不上,当我们的划分的时间窗口 1 小时时,只有在每个小时的边界处的数据才会出现关联不上的情况。

以上可以大致得到其特点:数据质量:低 数据产出时效:中

1.3 方案的适用场景

方案适用于可以评估出窗口内的关联率高的场景,如果窗口内关联率不高则不建议使用。 注意:这种方案由于上面说到的数据质量和时效性问题在实际生产环境中很少使用。

二、Flink Join 解决方案:Flink Interval Join
2.1 拆解分析

将两条流的数据从无界数据变为有界数据,但是这里的有界和上节说到的 Flink Window Join 的有界的概念是不一样的,这里的有界是指两条流之间的有界。

以 A 流 join B 流举例,interval join 可以让 A 流可以关联 B 流一段时间区间内的数据,比如 A 流关联 B 流前后 5 分钟的数据。

面对文章开始提出的两个问题,Interval Join会怎么解决

  1. 流式数据到达计算引擎的时间不一定:数据已经被划分为窗口,无界数据变为有界数据,就和离线批处理的方式一样了,两个窗口的数据简单的进行关联即可

  2. 流式数据不知何时、下发怎样的数据:窗口结束(这里的窗口结束是指 interval 区间结束,区间的结束是利用 watermark 来判断的)就把数据下发下去,关联到的数据就下发 [A, B],没有关联到的数据取决于是否是 outer join 然后进行数据下发

Interval Join目前支持 Flink DataStream API 和 SQL API 两种:

DataStream API:

clickRecordStream
  .keyBy(record -> record.getMerchandiseId())
  .intervalJoin(orderRecordStream.keyBy(record -> record.getMerchandiseId()))
  // 定义 interval 的时间区间
  .between(Time.seconds(-30), Time.seconds(30))
  .process(new ProcessJoinFunction<AnalyticsAccessLogRecord, OrderDoneLogRecord, String>() {
    @Override
    public void processElement(AnalyticsAccessLogRecord accessRecord, OrderDoneLogRecord orderRecord, Context context, Collector<String> collector) throws Exception {
      collector.collect(StringUtils.join(Arrays.asList(
        accessRecord.getMerchandiseId(),
        orderRecord.getPrice(),
        orderRecord.getCouponMoney(),
        orderRecord.getRebateAmount()
      ), '\t'));
    }}).print();

SQL API:

CREATE TABLE show_log_table (
     log_id BIGINT,
     show_params STRING,
     row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),
     WATERMARK FOR row_time AS row_time
 ) WITH (
   'connector' = 'datagen',
   'rows-per-second' = '1',
   'fields.show_params.length' = '1',
   'fields.log_id.min' = '1',
   'fields.log_id.max' = '10'
 );
 
 CREATE TABLE click_log_table (
     log_id BIGINT,
     click_params STRING,
     row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),
     WATERMARK FOR row_time AS row_time
 )
 WITH (
   'connector' = 'datagen',
   'rows-per-second' = '1',
   'fields.click_params.length' = '1',
   'fields.log_id.min' = '1',
   'fields.log_id.max' = '10'
 );
 
 CREATE TABLE sink_table (
     s_id BIGINT,
     s_params STRING,
     c_id BIGINT,
     c_params STRING
 ) WITH (
   'connector' = 'print'
 );
 
 INSERT INTO sink_table
 SELECT
     show_log_table.log_id as s_id,
     show_log_table.show_params as s_params,
     click_log_table.log_id as c_id,
     click_log_table.click_params as c_params
 FROM show_log_table FULL JOIN click_log_table ON show_log_table.log_id = click_log_table.log_id
 AND show_log_table.row_time BETWEEN click_log_table.row_time - INTERVAL '5' SECOND AND click_log_table.row_time
2.2 Interval Join特点

interval join 的方案比 window join 方案在数据质量上好很多,但是其也是存在 join 不到的情况的。并且如果为 outer join 的话,outer 一测的流数据需要要等到区间结束才能下发。

这样以来可以大概知道其产出数据质量:中 产出数据时效性:中

2.3 Interval Join适应场景

方案适用于两条流之间可以明确评估出相互延迟的时间是多久的,这里我们可以使用离线数据进行评估,使用离线数据的两条流的时间戳做差得到一个分布区间。

比如在 A 流和 B 流时间戳相差在 1min 之内的有 95%,在 1-4 min 之内的有 4.5%,则我们就可以认为两条流数据时间相差在 4 min 之内的有 99.5%,这时我们将上下界设置为 4min 就是一个能保障 0.5% 误差的合理区间。

注意:这种方案在生产环境中还是比较常用的。

三、Flink Join 解决方案:Flink Regular Join
3.1 拆解分析

window join 和 Interval Join都是基于划分窗口,将无界数据变为有界数据进行关联机制,但是本节说的 regular join 则还是基于无界数据进行关联。

以 A 流 left join B 流举例,A 流数据到来之后,直接去尝试关联 B 流数据。

  1. 如果关联到了则直接下发关联到的数据

  2. 如果没有关联到则也直接下发没有关联到的数据,后续 B 流中的数据到来之后,会把之前下发下去的没有关联到数据撤回,然后把关联到的数据数据进行下发。由此可以看出这是基于 Flink SQL 的 retract 机制,则也就说明了其目前只支持 Flink SQL。

面对文章开始提出的两个问题,Regular Join会怎么解决

  1. 流式数据到达计算引擎的时间不一定:两条流的数据会尝试关联,能关联到直接下发,关联不到先下发一个目前的结果数据

  2. 流式数据不知何时、下发怎样的数据:两条流的数据会尝试关联,能关联到直接下发,关联不到先下发一个目前的结果数据

这应该是目前最好的一种数据关联方式,Regular Join目前只支持SQL API,案例如下:

CREATE TABLE show_log_table (
    log_id BIGINT,
    show_params STRING
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '1',
  'fields.show_params.length' = '3',
  'fields.log_id.min' = '1',
  'fields.log_id.max' = '10'
);
 
CREATE TABLE click_log_table (
  log_id BIGINT,
  click_params     STRING
)
WITH (
  'connector' = 'datagen',
  'rows-per-second' = '1',
  'fields.click_params.length' = '3',
  'fields.log_id.min' = '1',
  'fields.log_id.max' = '10'
);
 
CREATE TABLE sink_table (
    s_id BIGINT,
    s_params STRING,
    c_id BIGINT,
    c_params STRING
) WITH (
  'connector' = 'print'
);
 
INSERT INTO sink_table
SELECT
    show_log_table.log_id as s_id,
    show_log_table.show_params as s_params,
    click_log_table.log_id as c_id,
    click_log_table.click_params as c_params
FROM show_log_table
LEFT JOIN click_log_table ON show_log_table.log_id = click_log_table.log_id;
3.2 Regular Join特点

数据质量和时效性高的原因都是因为 regular join 会保障目前 Flink 任务已经接收到的数据中能关联的一定是关联上的,即使关联不上,数据也会下发,完完全全保障了当前数据的客观性和时效性。

这样的方式可以做到数据质量产出和数据产出时效都是高状态

3.3 Regular Join适应场景

该种解决方案虽然是目前在产出质量、时效性上最好的一种解决方案,但是在实际场景中使用时,也存在一些问题:

  1. 基于 retract 机制,所有的数据都会存储在 state 中以判断能否关联到,所以我们要设置合理的 state ttl 来避免大 state 问题导致的任务不稳定

  2. 基于 retract 机制,所以在数据发生更新时,会下发回撤数据、最新数据 2 条消息,当我们的关联层级越多,则下发消息量的也会放大

  3. sink 组件要支持 retract,我们不要忘了最终数据是要提供数据服务给需求方进行使用的,所以我们最终写入的数据组件也需要支持 retract,比如 MySQL。如果写入的是 Kafka,则下游消费这个 Kafka 的引擎也需要支持回撤\更新机制。

四、三种方案中共性问题优化

无论是哪一种 Join 方案,Join 的前提都是将 A 流和 B 流的数据先存储在状态中,然后再进行关联。

即在实际生产中使用时常常会碰到的问题就是:大状态的问题。

关于大状态问题业界常见两种解决思路:

  1. 减少状态大小:在 Flink Join 中的可以想到的优化措施就是减少 state key 的数量。在未优化之前 A 流和 B 流的数据往往是存储在单独的两个 State 实例中的,那么我们的优化思路就是将同 Key 的数据放在一起进行存储,一个 key 的数据只需要存储一份,减少了 key 的数量

  2. 转移状态至外存:大 State 会导致 Flink 任务不稳定,那么我们就将 State 存储在外存中,让 Flink 任务轻量化,比如将数据存储在 Redis 中,A 流和 B 流中相同 key 的数据共同维护在一个 Redis 的 hashmap 中,以供相互进行关联

4.1 优化方案:同 key 共享 state
  • 优化方案说明

    将两条流的数据使用 union、connect 算子合并在一起,然后使用一个共享的 state 进行处理。

  • 种优化方案使用 DataStream API

    // 假设 FlinkEnv 和 FlinkEnvUtils 是您项目中已经定义好的类
    val flinkEnv = FlinkEnvUtils.getStreamTableEnv(args)
    
    flinkEnv.env().setParallelism(1)
    
    val source1: DataStream[Object] = flinkEnv.env()
      .addSource(new SourceFunction[Object] {
        override def run(ctx: SourceFunction.SourceContext[Object]): Unit = {
          // 实现数据源逻辑
        }
        override def cancel(): Unit = {
          // 实现取消逻辑
        }
      })
    
    val keyedStream1: KeyedStream[Object, Object] = source1.keyBy(new KeySelector[Object, Object] {
      override def getKey(value: Object): Object = {
        // 实现 key 选择逻辑
        null
      }
    })
    
    val source2: DataStream[Object] = flinkEnv.env()
      .addSource(new SourceFunction[Object] {
        override def run(ctx: SourceFunction.SourceContext[Object]): Unit = {
          // 实现数据源逻辑
        }
        override def cancel(): Unit = {
          // 实现取消逻辑
        }
      })
    
    val keyedStream2: KeyedStream[Object, Object] = source2.keyBy(new KeySelector[Object, Object] {
      override def getKey(value: Object): Object = {
        // 实现 key 选择逻辑
        null
      }
    })
    
    keyedStream1
      .connect(keyedStream2)
      .process(new KeyedCoProcessFunction[Object, Object, Object, Object] {
        var mapState: MapState[String, String] = _
    
        override def open(parameters: Configuration): Unit = {
          super.open(parameters)
          mapState = getRuntimeContext.getMapState(new MapStateDescriptor[String, String]("a", classOf[String], classOf[String]))
        }
    
        override def processElement1(value: Object, ctx: KeyedCoProcessFunction[Object, Object, Object, Object]#Context, out: Collector[Object]): Unit = {
          // 实现处理逻辑
        }
    
        override def processElement2(value: Object, ctx: KeyedCoProcessFunction[Object, Object, Object, Object]#Context, out: Collector[Object]): Unit = {
          // 实现处理逻辑
        }
      })
      .print()
    
    flinkEnv.env().execute("Flink Scala Job")
  • 特点

    1.state 的过期方式

    2.左右两条流的数据的 state 中的存储方式

    3.左右两条流数据在关联不到对方的情况下是否要输出到下游、输出什么样的数据到下游的方式

文章引用自:Flink流Join方案