感悟
spark是一个好东西。流式处理数据,扩容方便,较大程度上的利用机器,学习还是很有必要的。这几个星期的使用,也踩了不少坑。世界上的很多东西,不是难,而是不知道。
_jsc
我是python的使用者,近来发现需要在启动spark的时候设置参数,但是在文档查找的过程中,只发现只有java的api文档里面有相关的参数设置,而python没有。囧~
最后发现python的SparkContext
里,有一个参数是_jsc
,其代表的恰恰就是java里面的 SparkContext
。运行时可以通过这个对象,来实现参数的设置。
parquet.enable.summary-metadata
上面提到的参数设置,其实就是这个参数了。由于我所使用的spark的版本是1.6.1,因此这个参数默认的值是true,2.0以后默认为false。详细可见 issue
大概意思如下,spark在生成parquet文件的过程中,最后会扫描文件夹下的所有文件,并将文件的大致的详细汇总到文件夹目录的metadata文件,完成这个过程之后,在下次直接匹配这个文件夹的时候,spark会读取metadata,加快扫描的速度。
问题是生成这个metadata的文件是需要扫描所有文件的!而我生成paruqet的时候,是以追加的形式加入,频率还是蛮高的情况下,这就意味着我的spark程序会花费大量的时间在汇总这部分数据。并且汇总数据的过程是不会分布到多台机器上运行的。这就大大的降低了集群的利用率,其实也毫无必要。
因此spark生成parquet的过程中,可以选择关闭这个特性~
1 | sparkContext._jsc.hadoopConfiguration.set('parquet.enable.summary-metadata', 'false') |
partitionBy
一开始我一直纠结了很多问题,output的分类就是其中一个。而partitonBy
完美地解决了我的需求。
1 | df.write.partitionBy('index1', 'index2', 'index3').parquet('some/path/') |
以上的代码中,df
为一个dataFrame的数据集。
这段代码可以根据index1, index2, index3这几个columns来分类,将数据自动分为如下的目录结构保存.
1 | some/path/index1={index1}/index2={index2}/index3={index3}/part-****.parquet |
之前还傻乎乎地groupBy数据,然后用了collect这个算子,因而需要读取所有文件,就有内存问题等存在。
binaryFiles
这个不算坑,只是自己没有注意到而已。
之前都是使用textFile
的格式读取文本文件,这个过程中,没有指定minPartition
,而minPartition
的值也基本默认为文本文件的个数。但是binaryFiles
不是。程序在默认读取大量二进制文件之后,并没有选择分开,而是minPartition
为1,因此这里需要手动指定。
parquet && sparkSQL
sparkSQL太杀手级了~
parquet是列存储数据模式,可以大大的减少存储的文件大小,减少扫描过程中的大小。效率惊人~
sparkSQL在读取文件夹目录下所有文件的过程中,存在一个特定partition auto-discovery
的特性。在spark的文档中有详细的记载。大概如下:
1 | df = sqlContext.read.parquet(/some/path) |
如果/some/path
下有目录如下1
2
3
4
5/some/path
-- /index1=h1/index2=f1/
-- /index1=h1/index2=f2/
-- /index1=h2/index2=f1/
-- /index1=h2/index2=f2/
则最后生成的sparkSQL的table里面,将会自动的加入index1, index2的column,当你使用index1来查询时,即
1 | df.registerTempTable("log") |
这个过程中,会自动的只扫描index1=h1的目录下的文件。
缺点是我喜欢使用通配符的形式来读取,结果通配的列就不存在了~
sad~
结局
未完待续~