Samplex:扩大你的火花工作

发布的

使用Samplex来扩大你的Spark Jobs

几年前,当我接触到Taboola的数据时,我突然意识到,我所听到和了解的关于大数据的一切都不是真正的大数据。

理解范围内

Taboola的大数据环境具有大规模的生产规模,数据管道中流动着pb级的数据。随着数千种不同的工作负载不断地在这些数据上运行,Taboola每天生成大量的原始数据,大约100TB,这还不包括为了报告和分析而创建的所有聚合。这些数据经常被用于公司的各种应用,如深度学习、商业智能(BI)分析和报告。

未定义的

在处理大的原始数据时,我们经常寻求优化下游工作,防止不必要的工作处理整个数据集的。这是通过创建不同作业或常见查询通常需要的原始数据子集来实现的。一个经典的例子是对不同下游管道的数据应用相同的过滤器。

多读浪费资源

随着时间的推移,我们在数据平台组鉴定的数量越来越多火花生成完整数据集的不同子集以供下游查询使用的作业。每一个查询都需要大量的资源来查询完整的数据集、过滤它并将它保存回HDFS。这似乎是一种资源的浪费……如果我们能一次读取完整的数据集,并在一个Spark作业中生成所有这些子集就好了。

简单而直接的方法是通过读取源数据帧来生成不同的数据子集,然后使用多个预定义的过滤器多次保存数据,如下例所示:

Spark是惰性的,每个写操作都会触发整个DAG(一个在Spark调度层的工作流),所以它需要Spark用相应数量的写操作多次扫描整个数据。我们可以尝试缓存输入数据。不幸的是,我们正在处理大量的数据,这些数据太大以至于无法装入内存,即使有几十个执行器节点。

此外,我们生成的每个子集都可能受益于谓词下推和其他存储优化,从而避免读取整个数据。但是对于许多不同的过滤器,以及非常复杂的模式,我们无法真正存储为每个读路径优化的完整数据,而不保留完整数据集的不同投影。

读一次写多次

我们决定接受这个挑战,并找到一个解决方案,该方案将涉及单个Spark动作,不需要缓存(也不需要多次读取数据)。我们的解决方案,即下文所描述的,为我们节省了大量的资源,我们甚至开源它。

我们意识到我们需要回到最基本的东西,把我们自己带出我们的Spark SQL舒适区。我们使用Spark核心功能来检查每个Parquet行,应用一系列不同的谓词来决定应该写入哪个数据子集,并将满足谓词条件的行写入多个输出位置。

未定义的

Samplex用法

Samplex应用程序编程接口(API)为每个输出数据集实现了两个接口。第一个是' SamplexJob ',它定义要写入的输出的目标路径,还有一个' SamplexFilter ',它定义使用的谓词。

SamplexJob-您应该提供目标路径、记录和模式过滤器。

SamplexFilter-它将告诉samplex哪个记录应该写入输出

SchemaFilter(可选的)-在其中用户应该定义的字段保留或删除基于块/允许列表。

准备好SamplexJobs后,只需创建一个SamplexExecuter并提供作业。

请参阅完整的例子GitHub

在测试Samplex时,我们遇到了一个错误(如拼花- 1441),为了解决这个问题,我们使用了最新版本的parquet-avro库,并使用maven-shade-plugin在Samplex开源下重新定位它。

注意事项

使用Samplex时,需要注意以下几点:

  • 并行性—使用Samplex,每个拼花部件都在一个特定的任务中完全处理,数据不能被分割和重新分区,以实现更高的并行性。基本上,Samplex的并行性受输入文件数量的限制。这意味着,如果拼花部件包含多个行组,则不能像使用SparkSQL那样在不同的spark分区中处理它们。
  • 输出文件数量—Samplex的输出也取决于并行度。如果某个输出子集非常小,则不能将其合并到更少的拼木地板部件中,也就是说,所有输出都以相同的并行性写入。
  • 谓词下推- Samplex不能从任何谓词下推机制中获益,因为这与Samplex的想法相反,Samplex的每个输出都使用不同的过滤器。应用谓词下推将需要对所有数据子集的谓词进行交叉。
  • 调优-可以帮助提高性能的额外配置是为每个任务(spark.task.cpus)添加更多的内核,以写入在同一个任务中并行执行的输出。

随着Samplex的继续开发,上述一些问题将得到解决。请注意,所有开发人员都可以为未来的Samplex版本做出贡献,无论是通过识别和解决bug,建议新特性或改进,还是仅仅通过头脑风暴。

结果

为了生成采样数据,我们对Samplex和Spark SQL的原始使用进行了基准测试,包括了超过1.1TB的拼花数据的4个采样过滤器。在这两种情况下,我们都为Spark分配了800个cpu。此外,为了避免数据局部性的偏差,Spark executor不运行在HDFS集群机器上。

在我们的概念证明(POC)过程中,我们发现在使用与以前相同的资源时,执行时间有了显著的改进,总持续时间减少了50%以上,总读取字节数的减少甚至更大。与添加另一个独立的Spark作业相比,为Samplex添加另一个过滤器的开销要小得多。此外,通过使用线程编写输出,我们能够更有效地利用资源。

我们的基准如下:

  • 产生四个不同的子集
  • 输入文件数量:4500个
  • 总输入大小1192.0GB
  • 记录数量:337,693,147
  • 列数:超过1600(使用深度嵌套的模式)
  • 文件格式:拼花
  • 文件压缩:时髦的

未定义的

未定义的

未定义的

结论

我们喜欢Spark,它是我们数据平台组的核心技术之一,也是我们栈中的关键技术,因为它的早期版本。它从未让我们失望。但在我们的规模下,我们经常需要跳出固有的思维模式,找到创造性的解决方案来利用现有的技术,使工作更有效率。我们的Spark集群中有超过700个计算节点(约30000个核),我们努力使可用资源的效率最大化。由于研发中的新举措需要更多的计算能力,我们总是倾向于扩大规模,而不是在寻求其他替代方案之前盲目扩大规模。

今天就开始你的Taboola事业吧!