Spark使用多文件输出

需求描述

最近遇到了这样一个场景,用spark去处理多个数据源的数据,但是这些数据都掺杂在一起,需要根据不同数据源将数据存储在不同的目录中。

解决方案

我们spark处理晚的数据,在存储的时候可以使用saveAsHadoopFile去自定义存储路径和格式。

具体实例如下:
1、首先需要创建一个类,去继承MultipleTextOutputFormat类,并重写它的generateFileNameForKeyValue方法。

而MultipleTextOutputFormat是何方神圣呢?
其实MultipleTextOutputFormat是hadoop的多文件输出类。在写每条记录之前,MultipleOutputFormat将调用generateFileNameForKeyValue方法来确定需要写入的文件名。
其中generateFileNameForKeyValue方法的默认实现如下:

1
2
3
protected String generateFileNameForKeyValue(K key, V value, String name) {
return name;
}

我们可以重写generateFileNameForKeyValue方法,改成根据key存储,或者解析value,根据value中的某一字段进行分目录存储都可以。下例是根据key分目录存储。

1
2
3
4
5
6
7
8
/**
* 设置根据spark的key进行存储
*/
class RDDMultipleTextOutputFormat[K, V]() extends MultipleTextOutputFormat[K, V]() {
override def generateFileNameForKeyValue(key: K, value: V, name: String) : String = {
(key + "/" + name)
}
}

2、然后存储的时候可以使用saveAsHadoopFile存储,并指定存储方式。

1
2
3
4
5
6
... //处理完的rdd调用saveAsHadoopFile,使用自己定义的存储方式存储
resultRdd.saveAsHadoopFile(
outputDataPath,
classOf[String],
classOf[String],
classOf[RDDMultipleTextOutputFormat[_,_]])