flink自定义Bucketer实现基于日志EventTime的精确小时级归档hdfs操作

二维码
| Jun 14, 2020 | 原创

使用 flinkkafka 队列数据按小时或天级别分文件目录归档存储到 hdfs 中是我们常见的数据处理方式,但在 flink 默认的数据输出 hdfs配置中,时间的分桶是基于当前程序运行时间而不是日志真正触发的时间,有时候我们希望是基于日志本身触发的时间(EventTime)来定义数据切分规则,这样能够保证就算队列出现堆积或网络延迟,也不会导致日志日志混乱。

如果我们想要实现这样的功能,那么我们就需要自定义 Bucketer 来告诉 flink 每一条日志应该放在哪一个路径下,代码也非常简单:

public class EventTimeBucket implements Bucketer<Log> {
    @Override
    public org.apache.hadoop.fs.Path getBucketPath(Clock clock, org.apache.hadoop.fs.Path basePath, Log element) {
        return new Path(basePath + "/" + element.getTime());
    }
}

我们只需要自定义实现 getBucketPath 方法即可,该方法会返回每一条数据的需要写入 hdfs的路径,由于我们再日志处理阶段已经把日志封装到了 Log 类中,因为我们直接返回该该数据的时间即可。

getBucketPath 函数的第二个参数代表的是 hdfs 的路径,所以文件会基于此目录作为 base目录存放,第三个参数即为操作的每条日志,如果你操作的日志是并未有做任何处理原始日志,那么你可以解析原始日志,然后提取日志中的时间作为分桶标记即可。

最后调用代码如下:

// 继承 Buckter 类型
String output = "hdfs://namenode.dfs.shbt.qihoo.net:9000/home/test/log/";

// 输出, 并按小时级分桶存储
BucketingSink<CubeLog> bucketSink = new BucketingSink<>(output);
bucketSink.setBucketer(new EventTimeBucket());

env.addSource(consumer).flatMap(new LogFlatMapData()).addSink(bucketSink);