本文共 1552 字,大约阅读时间需要 5 分钟。
CheckPoint是Flink实现故障容错的一种机制,系统会根据配置的检查点定期自动对程序计算状态进行备份。一旦程序在计算过程中出现故障,系统会选择一个最近的检查点进行故障恢复。
SavePoint是一种有效的运维手段,需要用户手动触发程序进行状态备份,本质也是在做CheckPoint。
./bin/flink cancel -m centos:8081 -s hdfs:///savepoints f21795e74312eb06fbf0d48cb8d90489
实现故障恢复的先决条件:
代码配置如下,关于不同的job任务,计算复杂程度和资源使用情况是不一样的,所以关于该参数配置,没有最优方案,要根据服务器的性能、任务的实际情况进行调试。
原则:因为checkpoint会牺牲计算的实时性(barrier实现),所以在保证程序checkpoint成功的情况下,不建议时间间隔太短。
var env=StreamExecutionEnvironment.getExecutionEnvironment//启动检查点机制env.enableCheckpointing(5000,CheckpointingMode.EXACTLY_ONCE)//配置checkpoint必须在2s内完成一次checkpoint,否则检查点终止env.getCheckpointConfig.setCheckpointTimeout(2000)//设置checkpoint之间时间间隔 <= Checkpoint interval。忽略检查点的时间间隔env.getCheckpointConfig.setMinPauseBetweenCheckpoints(5)//配置checkpoint并行度,不配置默认1env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)//一旦检查点不能正常运行,Task也将终止env.getCheckpointConfig.setFailOnCheckpointingErrors(true)//将检查点存储外围系统 filesystem、rocksdb,可以配置在cancel任务时候,系统是否保留checkpointenv.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
首先,这在Spark(后期有时间博主会分享关于spark的知识)中是不允许的,因为Spark会持久化代码片段,一旦修改代码,必须删除Checkpoint,但是Flink仅仅存储各个算子的计算状态,如果用户修改代码,需要用户在有状态的操作算子上指定uid属性。
env.addSource(new FlinkKafkaConsumer[String]("topic01",new SimpleStringSchema(),props)).uid("kakfa-consumer").flatMap(line => line.split("\\s+")).map((_,1)).keyBy(0) //只可以写一个参数.sum(1).uid("word-count") //唯一即可.map(t=>t._1+"->"+t._2).print()
转载地址:http://limzi.baihongyu.com/