使用Spark动态分配

发布的

故事从参数开始。每个成熟的软件公司都需要一个度量系统来监控资源的利用。在某些时候,我们注意到spark执行器及其cpu的利用率不足。通常,为了通过共享提高CPU利用率,使用动态分配而不是静态资源分配。在这篇博文中,我们将定义问题,分享我们的工作目标,并强调许多与动态分配使用相关的技术特性。

在Taboola,我们使用Grafana、Prometheus以及基于kafka的管道来收集来自世界各地多个数据中心的参数。大规模度量是一个非常有趣的主题,它本身涉及多个问题,我们之前已经在我们的文章中讨论过这些问题博客meetup演讲

我们的数据平台包括几个计算数据投影的服务,重要的是,这些服务都是具有长寿命spark上下文的长时间运行的流程。当这些服务被触发时,它们会周期性地处理新的数据块,但是,它们会阻塞,直到下列情况出现,导致资源未被使用,而由于内核的静态资源分配,其他框架无法使用它们。

这里是一个Grafana仪表盘显示的问题:

之前

从Mesos集群中获取的内核总数总是500个,而实际使用峰值为400个,偶尔会导致内核在很长时间内处于空闲状态。

我们可以这样定义我们的目标:

  1. 更好地利用现有资源
  2. 改善端到端处理时间

释放静态集群中未使用资源的一种方法是(我们在内部运行,使用静态数量的Mesos-worker节点)开始使用动态分配特性。

什么是动态分配?

  • Spark提供了一种机制,可以根据工作负载动态调整应用程序占用的资源
  • 如果资源不再被使用,您的应用程序可能会将它们返回给集群,并在稍后有需求时再次请求它们
  • 如果多个应用程序共享您的Spark集群资源,那么它尤其有用

那么,到底发生了什么呢?

Spark驱动程序监控等待执行的任务的数量。当没有这样的任务或有足够的执行器时,就会安装一个超时计时器。如果它过期,驱动程序关闭Mesos-worker节点上应用程序的executor。其他执行器可能仍然希望访问已完成所有任务的执行器上的一些数据,因此我们需要一个外部的洗牌服务来为它们提供继续访问洗牌数据的方法。

如何开始

  1. 开启External shuffle服务:Spark.shuffle.service.enabled = true,可以选择配置spark.shuffle.service.port
  2. 启用动态分配特性标志:spark.dynamicAllocation.enabled = true
  3. 在集群中的每个节点上提供外部洗牌服务,这些服务将监听spark.shuffle.service.port

如何确保外部洗牌服务在每个mesos-worker节点上运行

自然的方法是使用马拉松.你可以把它想象成init。d”用于Mesos集群框架。默认情况下,Marathon可以决定跨集群分发服务实例,这样一些机器就可以承载多个shuffle服务实例,或者根本没有。这是不可取的,因为我们希望每台主机上有一个严格的实例。

为了确保最多实例化一个服务,我们将使用service约束它允许我们显式地指定每台机器的最大实例。例如,我们可以在配置中添加一个条目:

“约束”:[[“主机名”、“独特”]]

同样,为了确保至少一个服务可以被放置在一台机器上,我们将简单地reserv指定给该服务的E资源。

“shuffle”角色的静态资源预留

  1. 外部洗牌服务的需要将由马拉松术语中的“洗牌”角色来满足。
  2. 我们在每个节点上配置mesos-agent,以便将资源报告给集群,同时将shuffle需求考虑在内。下面的启动参数用于代理:
——资源= cpu: 10; mem: 16000;港口:[31000 - 32000];cpu (shuffle): 2; mem (shuffle): 2048;港口(shuffle): (7337 - 7339)

让我们将这些参数逐一分解:

  • mesos代理的默认端口范围是31000-32000。
  • 我们正在为外部洗牌服务分配2Gb的RAM。
  • 我们正在为外部洗牌服务分配3个端口(7337到7339)(用于蓝绿色部署,不同的spark版本等)
  • 资源可能被过度分配(分配给shuffle角色的2个cpu不包括在10个cpu的总数中,尽管实际上机器上总共只有10个cpu)。

为了设置马拉松大师正确使用资源,我们需要在启动时向-mesos_role参数添加相同的角色(shuffle)。

现在,我们确保外部洗牌服务在每个节点上只运行一次它自己的资源,而不管资源利用情况如何。

在登台环境的测试过程中,我们发现在20分钟后,由于缺少shuffle文件,任务开始失败。似乎shuffle文件的spark管理有它的墙角案例。

外部Shuffle服务和Shuffle文件管理

如前所述,外部shuffle服务将执行器产生的所有shuffle文件注册在同一个节点上,并负责作为已经死亡的执行器的代理。它负责在某些时候清理这些文件。但是,spark作业可能会失败或尝试重新计算提前清理的文件。

  1. 这个问题还有一些蛛丝马迹。火星- 12583-解决移除洗牌文件过早的问题,通过发送心跳每个外部洗牌服务从应用程序。
    • 驱动程序必须注册到所有运行在它有执行器所在的mesos-worker节点上的外部shuffle服务
    • 尽管对该机制进行了完整的重构,但它仍然不能总是工作。我们开了火星- 23286
  2. 最后(即使修复了),这对我们长时间运行spark服务的用例来说并不好,因为我们的应用程序“永远不会”结束,所以不清楚什么时候删除shuffle文件
  3. 我们通过-Dspark.shuffle.cleaner.interval=31557600禁用了外部洗牌服务的清理
  4. 我们在每个spark worker上安装了一个简单的cron作业,以清理未超过X小时的洗牌文件。这需要相当大的磁盘才能有一个缓冲区。

因此,我们调整了我们的外部洗牌服务参数。下面是如何在马拉松上安装该服务的详细信息。

定义External shuffle服务作为马拉松服务运行

  1. 马拉松式的支持REST API,因此您可以通过如下所示的张贴服务描述符来部署服务
curl -v localhost:8080/v2/apps -XPOST -H "Content-Type: application/json" -d'{…}'

  1. 我们将json描述符提交到源代码控制存储库以维护历史记录
  2. 仲裁中的马拉松领导者运行定期任务,必要时通过REST-API更新服务描述符
  3. 下面是运行在7337端口上的shuffle服务的马拉松服务json描述符:
    • 实例动态配置
    • 使用便rest api找到活跃的员工
    • 使用Marathon REST-API找出给定服务的运行任务(实例)数量
{"id": "/shuffle-service-7337", "cmd": "spark-2.2.0-bin-hadoop2.7/sbin/start-mesos-shuffle-service.sh", "cpu ": 0.5, "mem": 1024, "instances": 20, "constraints": [["hostname", "UNIQUE"]], "acceptedResourceRoles": ["shuffle"], "uris": ["http://my-repo-endpoint/spark-2.2.0-bin-hadoop2.7. sh" .sh": "http://my-repo-endpoint/spark-2.2.0-bin-hadoop2.7. sh" . "tgz"], "env": {"SPARK_NO_DAEMONIZE":"true", "SPARK_SHUFFLE_OPTS":" -Dspark.shuffle.cleaner.interval=31557600 -Dspark.shuffle.service. "= 7337 -Dspark.shuffle.service港。= true -Dspark.shuffle.io启用。connectionTimeout=300s", "SPARK_DAEMON_MEMORY": "1g", "SPARK_IDENT_STRING": "7337", "SPARK_PID_DIR": "/var/run", "SPARK_LOG_DIR": "/var/log/taboola", "PATH": "/usr/bin:/bin"}, "portDefinitions": [{"protocol": "tcp", "port": 7337}], "requirePorts": true}

如前所述,我们需要适当地配置spark应用程序:

火花应用程序设置:

  1. spark.shuffle.service.enabled = true
  2. spark.dynamicAllocation.enabled = true
  3. spark.dynamicAllocation.executorIdleTimeout = 120年代
  4. spark.dynamicAllocation.cachedExecutorIdleTimeout = 120年代
    • 默认为无限,可能会阻止缩小
    • 广播数据似乎属于“缓存”类别,所以如果你有广播,它可能也会阻止你释放资源
  5. spark.shuffle.service.port = 7337
  6. spark. dynamicallocation . minexecutor = 1 -默认为0
  7. Spark.scheduler.listenerbus.eventqueue.size = 500000 -详细信息参见火星- 21460

到目前为止,我们正在运行在生产环境中启用动态分配的服务。前半天一切都很好。然而,过了一段时间,我们开始注意到这些服务的退化。尽管Mesos master报告了可用资源,但框架从Mesos master获得的cpu越来越少。

我们启用了spark调试日志,调查并发现使用动态分配的框架拒绝来自Mesos master的资源“提供”。这有两个原因:

  1. 我们运行的是绑定到jmx端口的spark执行器,所以在使用动态分配时,同一个框架在某些情况下从同一个mesos-worker获得了额外的提供,并试图启动它的执行器,但失败了(由于端口冲突)。
  2. 司机开始把临时工列入黑名单后才2这样的失败没有任何超时的黑名单。由于在动态分配模式下,执行器不断地启动和关闭,这些故障更加频繁,在服务运行6小时后,大约有1/3的mesos-worker被列入服务黑名单。

黑名单mesos-workers节点

  1. Spark有一个在默认情况下关闭的黑名单机制。
  2. Spark-Mesos集成有一个自定义的黑名单机制总是在最大失败次数== 2。
  3. 我们已经实现了一个自定义补丁,以便该黑名单将在配置的超时后过期,因此Mesos-worker节点将返回到有效节点池。
  4. 我们已经从执行器的配置中删除了jmx配置和所有其他端口绑定,以减少失败的数量。

我们仍然需要发现外部洗牌服务的优化

一些参数仅适用于spark 2.3或以上版本:火星- 20640

  1. spark.shuffle.io.serverThreads
  2. spark.shuffle.io.backLog
  3. spark.shuffle.service.index.cache.entries

我们取得了

  1. 我们在有意义的生产中使用动态分配(例如,有一些空闲时间的服务)
  2. 我们有更好的资源利用:我们可以在同一个集群上运行五个服务,而不是四个服务
  3. 我们能够为每个服务提供更多的内核(800个vs 500个),这减少了端到端运行时间。

请注意total_cpus_sum(来自集群的分配)是如何跟随real_cpus_sum(框架中所有worker的实际使用量)的。

总的来说,我们可以说:

  1. 动态分配有助于更好地利用资源。
  2. 但仍有一些极端情况,特别是在中观星系团中。

今天就开始你的Taboola事业吧!