Explorez les meilleures pratiques pour l’optimisation des performances Spark – Développeur IBM

count() sur un jeu de données est une action Spark. C’est un problème courant que j’ai vu où il y a plusieurs appels count() dans les applications Spark qui sont ajoutés pendant le débogage et ils ne sont pas supprimés. C’est une bonne idée de rechercher des actions Spark et de supprimer celles qui ne sont pas nécessaires car nous ne voulons pas utiliser de cycles CPU et d’autres ressources lorsqu’elles ne sont pas nécessaires.

Formats de fichiers

Lorsque vous concevez vos jeux de données pour votre application, assurez-vous d’utiliser au mieux les formats de fichiers disponibles avec Spark. Certaines choses à considérer:

  • Spark est optimisé pour Apache Parquet et ORC pour le débit de lecture. Spark prend en charge la vectorisation qui réduit les E / S de disque. Les formats en colonnes fonctionnent bien.
  • Utilisez le format de fichier Parquet et utilisez la compression.
  • Il existe différents formats de fichiers et sources de données intégrées qui peuvent être utilisés dans Apache Spark.Utilisez des formats de fichiers séparables.
  • Assurez-vous qu’il n’y a pas trop de petits fichiers. Si vous avez beaucoup de petits fichiers, il peut être judicieux de les compacter pour de meilleures performances.

Parallélisme

  • Augmentez le nombre de partitions Spark pour augmenter le parallélisme en fonction de la taille des données. Assurez-vous que les ressources du cluster sont utilisées de manière optimale. Trop peu de partitions peuvent entraîner l’inactivité de certains exécuteurs, tandis que trop de partitions peuvent entraîner une surcharge de la planification des tâches.
  • Réglez les partitions et les tâches. Spark peut gérer des tâches de plus de 100 ms et recommande au moins 2 à 3 tâches par cœur pour un exécuteur.
  • Spark décide du nombre de partitions en fonction de l’entrée de taille de fichier. Parfois, il est logique de spécifier explicitement le nombre de partitions.
    • L’API de lecture prend un nombre facultatif de partitions.
    • étincelle.SQL.fichier.maxPartitionBytes , disponible dans Spark v2.0.0, pour Parquet, ORC et JSON.
  • Les partitions de lecture aléatoire peuvent être réglées en réglant spark.SQL.shuffle.partitions , dont la valeur par défaut est 200. C’est vraiment petit si vous avez de grandes tailles d’ensembles de données.

Réduire la lecture aléatoire

La lecture aléatoire est une opération coûteuse car elle implique le déplacement de données sur les nœuds de votre cluster, ce qui implique des E / S réseau et disque. Il est toujours judicieux de réduire la quantité de données à mélanger. Voici quelques conseils pour réduire le shuffle:

  • Réglez l’étincelle .SQL.shuffle.partitions .
  • Partitionnez l’ensemble de données d’entrée de manière appropriée afin que chaque taille de tâche ne soit pas trop grande.
  • Utilisez l’interface utilisateur Spark pour étudier le plan afin de rechercher une opportunité de réduire le mélange autant que possible.
  • Recommandation de formule pour spark.SQL.shuffle.cloisons:
    • Pour les grands ensembles de données, visez une taille cible de tâche allant de 100 Mo à moins de 200 Mo pour une partition (utilisez une taille cible de 100 Mo, par exemple).
    • étincelle.SQL.shuffle.partitions = quotient (taille d’entrée d’étape aléatoire / taille cible) / cœurs totaux) * cœurs totaux.

Filtrer/Réduire la taille de l’ensemble de données

Recherchez des opportunités de filtrer les données le plus tôt possible dans votre pipeline d’applications. S’il y a une opération de filtrage et que vous souhaitez uniquement analyser un sous-ensemble de données, appliquez ce filtre tôt. Si vous pouvez réduire la taille de l’ensemble de données tôt, faites-le. Utilisez des prédicats de filtre appropriés dans votre requête SQL afin que Spark puisse les pousser vers la source de données sous-jacente ; les prédicats sélectifs sont bons. Utilisez-les comme il convient. Utilisez des filtres de partition s’ils sont applicables.

Cache de manière appropriée

Spark prend en charge la mise en cache des ensembles de données en mémoire. Il existe différentes options disponibles:

  • Utilisez la mise en cache lorsque la même opération est calculée plusieurs fois dans le flux de pipeline.
  • Utilisez la mise en cache à l’aide de l’API persist pour activer le paramètre de cache requis (persister sur le disque ou non ; sérialisé ou non).
  • Soyez conscient du chargement paresseux et du cache principal si nécessaire à l’avance. Certaines API sont impatientes et d’autres non.
  • Consultez l’onglet Stockage de l’interface utilisateur Spark pour afficher des informations sur les ensembles de données que vous avez mis en cache.
  • Il est recommandé de désinscrire votre ensemble de données mis en cache lorsque vous avez terminé de les utiliser afin de libérer des ressources, en particulier lorsque d’autres personnes utilisent également le cluster.

Join

Join est, en général, une opération coûteuse, alors faites attention aux jointures dans votre application pour les optimiser. BroadcastHashJoin est le plus performant dans les cas où l’une des relations est suffisamment petite pour pouvoir être diffusée. Voici quelques conseils:

  • L’ordre de jointure compte; commencez par la jointure la plus sélective. Pour les relations inférieures à spark.SQL.autoBroadcastJoinThreshold , vous pouvez vérifier si la diffusion HashJoin est captée.
  • Utilisez des astuces SQL si nécessaire pour forcer un type spécifique de jointure.
    • Exemple : Lorsque vous joignez un petit ensemble de données à un grand ensemble de données, une jointure de diffusion peut être forcée de diffuser le petit ensemble de données.
    • Confirmez que Spark récupère la jointure de hachage de diffusion; sinon, on peut la forcer à l’aide de l’indice SQL.
  • Évitez les jointures croisées.
  • Broadcast HashJoin est le plus performant, mais peut ne pas être applicable si les deux relations dans join sont importantes.
  • Collectez des statistiques sur des tables pour que Spark calcule un plan optimal.

Syntonisez les ressources du cluster

  • Syntonisez les ressources du cluster en fonction du gestionnaire de ressources et de la version de Spark.
  • Réglez la mémoire disponible sur le pilote : spark.pilote.mémoire.
  • Réglez le nombre d’exécuteurs et l’utilisation de la mémoire et du cœur en fonction des ressources du cluster : mémoire d’exécuteur, num-exécuteurs et cœurs d’exécuteurs.

Consultez la documentation de configuration de la version Spark avec laquelle vous travaillez et utilisez les paramètres appropriés.

Évitez les opérations coûteuses

  • Évitez commandez par si ce n’est pas nécessaire.
  • Lorsque vous écrivez vos requêtes, au lieu d’utiliser select * pour obtenir toutes les colonnes, récupérez uniquement les colonnes pertinentes pour votre requête.
  • N’appelez pas count inutilement.

Inclinaison des données

  • Assurez-vous que les partitions sont de taille égale pour éviter les problèmes d’inclinaison des données et de faible utilisation du processeur.
    • À titre d’exemple: Si vous avez des données provenant d’une source de données JDBC en parallèle et que chacune de ces partitions ne récupère pas un nombre similaire d’enregistrements, cela entraînera des tâches de taille inégale (une forme de biais de données). Peut-être qu’une partition ne fait que quelques Ko, alors qu’une autre fait quelques centaines de Mo. Certaines tâches seront plus grandes que d’autres, et pendant que les exécuteurs des tâches plus importantes seront occupés, les autres exécuteurs, qui gèrent la tâche plus petite, finiront et seront inactifs.
    • Si les données à la source ne sont pas partitionnées de manière optimale, vous pouvez également évaluer les compromis de l’utilisation de la repartition pour obtenir une partition équilibrée, puis utiliser la mise en cache pour la conserver en mémoire le cas échéant.
  • La repartition provoquera un shuffle, et le shuffle est une opération coûteuse, donc cela devrait être évalué sur une base d’application.
  • Utilisez l’interface utilisateur Spark pour rechercher les tailles de partition et la durée de la tâche.

UDF

Spark dispose d’un certain nombre de fonctions intégrées définies par l’utilisateur (UDF) disponibles. Pour les performances, vérifiez si vous pouvez utiliser l’une des fonctions intégrées car elles sont bonnes pour les performances. Les UDF personnalisés dans l’API Scala sont plus performants que les UDF Python. Si vous devez utiliser l’API Python, utilisez l’UDF pandas nouvellement introduit en Python qui a été publié dans Spark 2.3. La prise en charge de pandas UDF (UDF vectorisés) dans Spark a des améliorations significatives des performances par opposition à l’écriture d’un UDF Python personnalisé. Obtenez plus d’informations sur la rédaction d’un UDF pandas.

J’espère que cela vous a été utile lors de l’écriture de vos applications Spark. Bon développement! Dans un prochain blog, je vais vous montrer comment obtenir le plan d’exécution de votre travail Spark.

Laisser un commentaire

Votre adresse e-mail ne sera pas publiée.