那些年不懂的spark

感悟

spark是一个好东西。流式处理数据,扩容方便,较大程度上的利用机器,学习还是很有必要的。这几个星期的使用,也踩了不少坑。世界上的很多东西,不是难,而是不知道。

_jsc

我是python的使用者,近来发现需要在启动spark的时候设置参数,但是在文档查找的过程中,只发现只有java的api文档里面有相关的参数设置,而python没有。囧~

最后发现python的SparkContext里,有一个参数是_jsc,其代表的恰恰就是java里面的 SparkContext。运行时可以通过这个对象,来实现参数的设置。

parquet.enable.summary-metadata

上面提到的参数设置,其实就是这个参数了。由于我所使用的spark的版本是1.6.1,因此这个参数默认的值是true,2.0以后默认为false。详细可见 issue

大概意思如下,spark在生成parquet文件的过程中,最后会扫描文件夹下的所有文件,并将文件的大致的详细汇总到文件夹目录的metadata文件,完成这个过程之后,在下次直接匹配这个文件夹的时候,spark会读取metadata,加快扫描的速度。

问题是生成这个metadata的文件是需要扫描所有文件的!而我生成paruqet的时候,是以追加的形式加入,频率还是蛮高的情况下,这就意味着我的spark程序会花费大量的时间在汇总这部分数据。并且汇总数据的过程是不会分布到多台机器上运行的。这就大大的降低了集群的利用率,其实也毫无必要。

因此spark生成parquet的过程中,可以选择关闭这个特性~

1
sparkContext._jsc.hadoopConfiguration.set('parquet.enable.summary-metadata', 'false')

partitionBy

一开始我一直纠结了很多问题,output的分类就是其中一个。而partitonBy完美地解决了我的需求。

1
df.write.partitionBy('index1', 'index2', 'index3').parquet('some/path/')

以上的代码中,df为一个dataFrame的数据集。

这段代码可以根据index1, index2, index3这几个columns来分类,将数据自动分为如下的目录结构保存.

1
some/path/index1={index1}/index2={index2}/index3={index3}/part-****.parquet

之前还傻乎乎地groupBy数据,然后用了collect这个算子,因而需要读取所有文件,就有内存问题等存在。

binaryFiles

这个不算坑,只是自己没有注意到而已。

之前都是使用textFile的格式读取文本文件,这个过程中,没有指定minPartition,而minPartition的值也基本默认为文本文件的个数。但是binaryFiles不是。程序在默认读取大量二进制文件之后,并没有选择分开,而是minPartition为1,因此这里需要手动指定。

parquet && sparkSQL

sparkSQL太杀手级了~

parquet是列存储数据模式,可以大大的减少存储的文件大小,减少扫描过程中的大小。效率惊人~

sparkSQL在读取文件夹目录下所有文件的过程中,存在一个特定partition auto-discovery的特性。在spark的文档中有详细的记载。大概如下:

1
df = sqlContext.read.parquet(/some/path)

如果/some/path下有目录如下

1
2
3
4
5
/some/path
-- /index1=h1/index2=f1/
-- /index1=h1/index2=f2/
-- /index1=h2/index2=f1/
-- /index1=h2/index2=f2/

则最后生成的sparkSQL的table里面,将会自动的加入index1, index2的column,当你使用index1来查询时,即

1
2
df.registerTempTable("log")
sqlContext.sql("SELECT * FROM log WHERE index1 = h1")

这个过程中,会自动的只扫描index1=h1的目录下的文件。
缺点是我喜欢使用通配符的形式来读取,结果通配的列就不存在了~
sad~

结局

未完待续~

Done