flink多目录路径读取数据源

二维码
| Apr 19, 2020 | 原创

在使用 flink 读取数据源时,经常会遇到从多个目录读取数据源的问题,例如像下面的 hdfs 路径:

如上数据为小时级切割数据,文件夹命名是以:年-月-日-小时 格式,但如果想计算某一天的全部数据,如:2019-11-19 ,那怎么才能让 flink 自动读取所有以 2019-11-19 开头的目录文件呢?

如果是使用 spark 开发,其实非常容易,直接传入模糊匹配的路径即可:

spark.read.text("/data/2019-11-19*/*")

但是在 flink 中却没有提供类似的方法,因此需要我们自己来扩展:

// 外层父级目录
String dir = "hdfs://namenode.yuankan.co/data";

Path path = new Path(dir);
Configuration configuration = new Configuration();
// 设置递归获取文件
configuration.setBoolean("recursive.file.enumeration", true);

TextInputFormat textInputFormat = new TextInputFormat(path);
textInputFormat.supportsMultiPaths();
textInputFormat.configure(configuration);
textInputFormat.setFilesFilter(new FilePathFilter() {
	@Override
	public boolean filterPath(Path filePath) {
		// 过滤想要的路径
		return  !filePath.toString().contains("2019-11-19");
	}
});

env.readFile(textInputFormat,dir)

整体思路也非常简单,利用内置的 readFile 方法,读取外层目录文件夹,过滤筛选出需要使用的数据源即可。

作者:猿谋人
博客:https://yuankan.co