flink实时转存kafka数据到hdfs注意事项

二维码
| Apr 26, 2020 | 原创

在实际项目实战中, flink 实时转存 kafka 数据到 hdfs 遇到一些具体的问题,这里整理总结一下。转存 hdfs 会用到两个内置的 sink 类:

  1. BucketingSink
  2. StreamingFileSink

BucketingSink

BucketingSink 算是老大哥,它是 flink 最早的同步 hdfs 的提供的方法,功能也相对完善,但是它有一个比较致命的缺点:

  1. 没有基于 savepoint 自动实现数据恢复 truncate 操作。

当基于 savepoint 重启任务或者想恢复异常到之前某个 savepoint 时,你会发觉他会为你创建一个 valid-length 结尾的文件,为你说明当时同步文件有效数据的长度,即之后的数据是重复的,需要你手动自行处理。

但除了这个问题,其它方面是有起来都还不错,也没有 StreamingFileSink 演衍生的问题,对 hdfs 版本也没有特殊要求。并且可以定义生成文件的前缀、后缀,文件大小等等,但是比较遗憾的是, flink 1.9.0 之后废弃了它。

示例

public static BucketingSink<String> getInstance(String idc, String hdfsPath) {
    BucketingSink<String> bucketSink = new BucketingSink<>(hdfsPath);
    bucketSink.setPartPrefix(idc);
    bucketSink.setBatchSize(128 * 1024 * 1024); // 单文件大小不超过128MB
    bucketSink.setBucketer(new EventTimeBucket());

    return bucketSink;
}

// 自定义基于log日志时间,分桶存储
public class EventTimeBucket implements Bucketer<String> {
    @Override
    public org.apache.hadoop.fs.Path getBucketPath(Clock clock, org.apache.hadoop.fs.Path basePath, String element) {
        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd-HH", Locale.US);
        String newDateTimeString;
        try {
            JSONObject jsonObject = JSON.parseObject(element);
            String timeStr = jsonObject.getString("log_time");
            Date parse = new SimpleDateFormat("yyyyMMddHHmmss").parse(timeStr);
            newDateTimeString = simpleDateFormat.format(parse);
        } catch (Exception e) {
            System.out.println("parse time error");
            newDateTimeString = simpleDateFormat.format(new Date());
        }

        return new Path(basePath + "/" + newDateTimeString);
    }
}

StreamingFileSink

两个类都可以实现转存数据到 hdfs,但使用时也会遇到不少的坑。BucketingSink 算是老大哥,配置相对完善,StreamingFileSink 算是小弟,flink 1.6.0+ 才能使用,旨在替代 BucketingSink ,但公司目前使用的 flink 集群是 1.7.0 版本,这个版本下的 StreamingFileSink 小弟还不够成熟,总结下俩还有如下一些问题:

  1. 无法自定义同步文件的名称前缀(prefix)和后缀(suffix),1.10.0+ 提供了方法。
  2. 同步的数据到 hdfs 会变成多个小文件,碎片化严重。
  3. 使用有条件要求,hdfs 版本必须大于 2.7.0

示例

public static main(String[] args){
       // 输出, 并按小时级分桶存储
        StreamingFileSink<String> bucketSink = HdfsSink.StreamSink("hdfs://abd/path");
	// ...
}

public static StreamingFileSink<String> StreamSink(String hdfsPath) {
        DefaultRollingPolicy<String, String> rollingPolicy = DefaultRollingPolicy
                .create()
                .withRolloverInterval(Long.MAX_VALUE) // 滚动写入新文件的时间,默认60s。这里设置为无限大
                .withInactivityInterval(180 * 1000) // 60s空闲,就滚动写入新的文件
                .build();

        return StreamingFileSink.forRowFormat(new Path(hdfsPath), new SimpleStringEncoder<String>())
                .withBucketAssigner(new HdfsLogHourAssigner())
                .withRollingPolicy(rollingPolicy)
                .withBucketCheckInterval(1000L)
                .build();
}

// 自定义按日志时间,作为分桶时间
public class HdfsLogHourAssigner implements BucketAssigner<String, String> {

    private SimpleDateFormat simpleDateFormat, logDateFormat;

    @Override
    public String getBucketId(String element, Context context) {
        if (simpleDateFormat == null) {
            simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd-HH", Locale.US);
        }

        if (logDateFormat == null) {
            logDateFormat = new SimpleDateFormat("yyyyMMddHHmmss");
        }

        try {
            JSONObject jsonObject = JSON.parseObject(element);
            String timeStr = jsonObject.getString("log_time");
            Date parse = logDateFormat.parse(timeStr);

            return simpleDateFormat.format(parse);
        } catch (Exception e) {
            System.out.println("parse time error");

            return simpleDateFormat.format(new Date());
        }
    }

    @Override
    public SimpleVersionedSerializer<String> getSerializer() {
        return SimpleVersionedStringSerializer.INSTANCE;
    }
}

StreamFileSink 的好处在于,能够基于 savepoint 自动实现数据的 exactly-once 级精确数据容灾,例如 flink 任务重启、或者脚本异常挂掉重启时,可以基于 savepoint 机制自动恢复处理数据,不会出现数据重复的问题。