原创

Flink消费Kafka指定offset的五种方式

方式一:从topic中指定的group上次消费的位置开始消费。

必须配置group.id参数从消费者组提交的偏移量开始读取分区(kafka或zookeeper中)。如果找不到分区的偏移量,auto.offset.reset将使用属性中的设置(Kafka Properties中配置比auto.offset.reset优先级高)。

​ 默认是setStartFromGroupOffsets。任务从checkpoint重启,按照重启前的offset进行消费,如果直接重启不从检查点重启并且group.id不变,程序会按照上次提交的offset的位置继续消费。如果group.id改变了,则程序按照auto.offset.reset设置的属性进行消费。但是如果程序带有状态的算子,还是建议使用检查点重启

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.enableCheckpointing(5000);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setParallelism(1);

Properties props = new Properties();
props.setProperty("bootstrap.servers",KAFKA_BROKER);
props.setProperty("zookeeper.connect", ZK_HOST);
props.setProperty("group.id",GROUP_ID);
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
FlinkKafkaConsumer011<String> consumer = new FlinkKafkaConsumer011<>(TOPIC, new SimpleStringSchema(), props);

consumer.setStartFromGroupOffsets();

方式二 : 指定topic, 指定partition的offset位置

Map<KafkaTopicPartition, Long> offsets = new HashedMap();
offsets.put(new KafkaTopicPartition("topic_name", 0), 11L);
offsets.put(new KafkaTopicPartition("topic_name", 1), 22L);
offsets.put(new KafkaTopicPartition("topic_name", 2), 33L);
consumer.setStartFromSpecificOffsets(offsets);

Map Long参数指定的offset位置

KafkaTopicPartition构造函数有两个参数,第一个为topic名字,第二个为分区数.

  • 如果使用者需要读取在提供的偏移量映射中没有指定偏移量的分区,则它将回退到setStartFromGroupOffsets()该特定分区的默认组偏移行为。
  • 当作业从故障中自动恢复或使用保存点手动恢复时,这些起始位置配置方法不会影响起始位置。在恢复时,每个Kafka分区的起始位置由存储在保存点或检查点中的偏移量确定。

consumer.setStartFromSpecificOffsets(offsets);

方式三: 从topic中最初的数据开始消费

consumer.setStartFromEarliest();

方式四: 从指定的时间戳开始

consumer.setStartFromTimestamp(1559801580000l);

对于每个分区,时间戳大于或等于指定时间戳的记录将用作起始位置。如果分区的最新记录早于时间戳,则只会从最新记录中读取分区。在此模式下,Kafka中的已提交偏移将被忽略,不会用作起始位置。时间戳指的是kafka中消息自带的时间戳。

方式五: 从最新的数据开始消费

consumer.setStartFromLatest();
正文到此结束
本文目录