Explore las mejores prácticas para la optimización del rendimiento de Spark-IBM Developer

count () en un conjunto de datos es una acción Spark. Es un problema común que he visto cuando hay varias llamadas count() en aplicaciones Spark que se agregan durante la depuración y no se eliminan. Es una buena idea buscar acciones de Spark y eliminar las que no sean necesarias porque no queremos usar ciclos de CPU y otros recursos cuando no sean necesarios.

Formatos de archivo

Cuando diseñe sus conjuntos de datos para su aplicación, asegúrese de aprovechar al máximo los formatos de archivo disponibles con Spark. Algunas cosas a considerar:

  • Spark está optimizado para Apache Parquet y ORC para el rendimiento de lectura. Spark tiene soporte de vectorización que reduce las E/S de disco.Los formatos en columnas funcionan bien.
  • Utilice el formato de archivo Parquet y haga uso de la compresión.
  • Hay diferentes formatos de archivo y fuentes de datos integradas que se pueden usar en Apache Spark.Utilice formatos de archivo dividibles.
  • Asegúrese de que no haya demasiados archivos pequeños. Si tiene muchos archivos pequeños, podría tener sentido compactarlos para un mejor rendimiento.

Paralelismo

  • Aumente el número de particiones Spark para aumentar el paralelismo en función del tamaño de los datos. Asegúrese de que los recursos del clúster se utilizan de forma óptima. Muy pocas particiones podrían resultar en que algunos ejecutores estén inactivos, mientras que demasiadas particiones podrían resultar en sobrecarga de la programación de tareas.
  • Ajuste las particiones y las tareas. Spark puede manejar tareas de más de 100 ms y recomienda al menos 2-3 tareas por núcleo para un ejecutor.
  • Spark decide el número de particiones en función del tamaño de archivo de entrada. A veces, tiene sentido especificar el número de particiones explícitamente.
    • La API de lectura toma un número opcional de particiones.
    • spark.SQL.file.maxPartitionBytes, disponible en Spark v2.0.0, para Parquet, ORCO y JSON.
  • Las particiones barajadas se pueden ajustar configurando spark.SQL.barajar.particiones, cuyo valor predeterminado es 200. Esto es realmente pequeño si tiene grandes tamaños de conjuntos de datos.

Reducir aleatorio

Shuffle es una operación costosa, ya que implica el movimiento de datos a través de los nodos en el clúster, que implica la red y el disco I/O. es siempre una buena idea para reducir la cantidad de datos que necesita ser mezclado. Estos son algunos consejos para reducir la mezcla aleatoria:

  • Sintonice la chispa .SQL.barajar.particiones.
  • Particione el conjunto de datos de entrada de forma adecuada para que el tamaño de cada tarea no sea demasiado grande.
  • Use la interfaz de usuario de Spark para estudiar el plan y buscar la oportunidad de reducir la mezcla tanto como sea posible.
  • Recomendación de fórmula para spark.SQL.barajar.particiones:
    • Para conjuntos de datos grandes, apunte a un tamaño de destino de tarea de entre 100 Mb y menos de 200 MB para una partición (use un tamaño de destino de 100 MB, por ejemplo).
    • spark.SQL.barajar.particiones = cociente (tamaño de entrada de etapa aleatoria/tamaño de destino)/núcleos totales) * núcleos totales.

Filtrar / Reducir el tamaño del conjunto de datos

Busque oportunidades para filtrar los datos lo antes posible en su canalización de aplicaciones. Si hay una operación de filtro y solo está interesado en realizar análisis para un subconjunto de datos, aplique este filtro antes de tiempo. Si puede reducir el tamaño del conjunto de datos antes de tiempo, hágalo. Use predicados de filtro adecuados en su consulta SQL para que Spark pueda enviarlos al origen de datos subyacente; los predicados selectivos son buenos. Utilícelos según corresponda. Utilice filtros de partición si son aplicables.

Cache apropiadamente

Spark admite el almacenamiento en caché de conjuntos de datos en memoria. Hay diferentes opciones disponibles:

  • Utilice el almacenamiento en caché cuando la misma operación se calcula varias veces en el flujo de canalización.
  • Utilice el almacenamiento en caché con la API persist para habilitar la configuración de caché necesaria (persistir en el disco o no; serializado o no).
  • Tenga en cuenta la carga lenta y la caché principal si es necesario por adelantado. Algunas API están ansiosas y otras no.
  • Consulte la pestaña Almacenamiento de la interfaz de usuario de Spark para ver información sobre los conjuntos de datos que ha almacenado en caché.
  • Es una buena práctica dejar intacto el conjunto de datos en caché cuando haya terminado de usarlos para liberar recursos, especialmente cuando otras personas también usen el clúster.

Unirse

Unirse es, en general, una operación costosa, así que preste atención a las uniones en su aplicación para optimizarlas. BroadcastHashJoin es más eficaz para los casos en los que una de las relaciones es lo suficientemente pequeña como para poder transmitirse. A continuación se presentan algunos consejos:

  • Asuntos de orden de unión; comience con la unión más selectiva. Para relaciones inferiores a spark.SQL.autoBroadcastJoinThreshold, puede verificar si broadcast HashJoin se recoge.
  • Use sugerencias SQL si es necesario para forzar un tipo específico de unión.
    • Ejemplo: Al unir un conjunto de datos pequeño con un conjunto de datos grande, una unión de difusión puede verse obligada a transmitir el conjunto de datos pequeño.
    • Confirme que Spark está recogiendo la unión de hash de difusión; si no, se puede forzar usando la sugerencia SQL.
  • Evite las uniones cruzadas.
  • Broadcast HashJoin es el más performante, pero puede no ser aplicable si ambas relaciones en join son grandes.
  • Recopile estadísticas en tablas para que Spark calcule un plan óptimo.

Ajustar los recursos del clúster

  • Ajustar los recursos del clúster según el administrador de recursos y la versión de Spark.
  • Ajuste la memoria disponible al controlador: spark.controlador.memoria.
  • Ajuste el número de ejecutores y el uso de memoria y núcleo en función de los recursos del clúster: memoria ejecutora, ejecutores numéricos y núcleos ejecutores.

Consulte la documentación de configuración de la versión de Spark con la que está trabajando y utilice los parámetros adecuados.

Evite operaciones costosas

  • Evite ordene por si no es necesario.
  • Cuando escriba sus consultas, en lugar de usar select * para obtener todas las columnas, solo recupere las columnas relevantes para su consulta.
  • No llame a count innecesariamente.

Sesgo de datos

  • Asegúrese de que las particiones tengan el mismo tamaño para evitar sesgos de datos y problemas de baja utilización de la CPU.
    • Por ejemplo: Si tiene datos procedentes de una fuente de datos JDBC en paralelo, y cada una de esas particiones no está recuperando un número similar de registros, esto dará lugar a tareas de tamaño desigual (una forma de sesgo de datos). Tal vez una partición es de solo unos pocos KB, mientras que otra es de unos pocos cientos de MB. Algunas tareas serán más grandes que otras, y mientras los ejecutores en tareas más grandes estarán ocupados, los otros ejecutores, que están manejando la tarea más pequeña, terminarán y estarán inactivos.
    • Si los datos en el origen no se particionan de manera óptima, también puede evaluar las ventajas y desventajas de usar la repartición para obtener una partición equilibrada y, a continuación, utilizar el almacenamiento en caché para conservarla en la memoria, si procede.
  • La repartición causará un barajamiento, y el barajamiento es una operación costosa, por lo que debe evaluarse en función de la aplicación.
  • Use la interfaz de usuario de Spark para buscar los tamaños de partición y la duración de la tarea.

UDFs

Spark tiene una serie de funciones integradas definidas por el usuario (UDFs) disponibles. Para el rendimiento, compruebe si puede usar una de las funciones integradas, ya que son buenas para el rendimiento. Los UDF personalizados en la API de Scala son más eficientes que los UDF de Python. Si tiene que usar la API de Python, use el UDF pandas recién introducido en Python que se lanzó en Spark 2.3. El soporte UDF pandas (UDFs vectorizado) en Spark tiene mejoras de rendimiento significativas en lugar de escribir un UDF Python personalizado. Obtenga más información sobre cómo escribir un UDF de pandas.

Espero que esto le haya sido útil a medida que escribe sus aplicaciones Spark. Feliz desarrollo! En un próximo blog, te mostraré cómo obtener el plan de ejecución para tu trabajo de Spark.

Deja una respuesta

Tu dirección de correo electrónico no será publicada.