Esplora le best practice per l’ottimizzazione delle prestazioni Spark-IBM Developer

count () su un set di dati è un’azione Spark. È un problema comune che ho visto dove ci sono più chiamate count() nelle applicazioni Spark che vengono aggiunte durante il debug e non vengono rimosse. È una buona idea cercare le azioni Spark e rimuovere quelle che non sono necessarie perché non vogliamo utilizzare i cicli della CPU e altre risorse quando non sono necessarie.

Formati di file

Quando si progettano i set di dati per l’applicazione, assicurarsi di utilizzare al meglio i formati di file disponibili con Spark. Alcune cose da considerare:

  • Spark è ottimizzato per Apache Parquet e ORC per il throughput di lettura. Spark ha il supporto per la vettorizzazione che riduce l’I / O del disco.
  • Utilizzare il formato di file Parquet e fare uso di compressione.
  • Esistono diversi formati di file e origini dati integrate che possono essere utilizzate in Apache Spark.Utilizzare formati di file divisibili.
  • Assicurarsi che non ci siano troppi file di piccole dimensioni. Se hai molti file di piccole dimensioni, potrebbe avere senso compattarli per migliorare le prestazioni.

Parallelismo

  • Aumentare il numero di partizioni Spark per aumentare il parallelismo in base alle dimensioni dei dati. Assicurarsi che le risorse del cluster siano utilizzate in modo ottimale. Troppe poche partizioni potrebbero comportare che alcuni esecutori siano inattivi, mentre troppe partizioni potrebbero comportare un sovraccarico della pianificazione delle attività.
  • Sintonizzare le partizioni e le attività. Spark può gestire attività di 100 ms+ e consiglia almeno 2-3 attività per core per un esecutore.
  • Spark decide il numero di partizioni in base all’input della dimensione del file. A volte, ha senso specificare esplicitamente il numero di partizioni.
    • L’API di lettura accetta un numero opzionale di partizioni.
    • scintilla.sql.file.maxPartitionBytes, disponibile in Spark v2.0.0, per Parquet, ORC e JSON.
  • Le partizioni shuffle possono essere sintonizzate impostando spark.sql.mescolare.partizioni, il cui valore predefinito è 200. Questo è davvero piccolo se si dispone di grandi dimensioni del set di dati.

Riduci lo shuffle

Lo shuffle è un’operazione costosa in quanto comporta lo spostamento dei dati tra i nodi del cluster, che comporta l’I/O di rete e disco.È sempre una buona idea ridurre la quantità di dati che deve essere mescolata. Ecco alcuni suggerimenti per ridurre shuffle:

  • Sintonizzare la scintilla .sql.mescolare.partizioni.
  • Partizionare il set di dati di input in modo appropriato in modo che ogni dimensione dell’attività non sia troppo grande.
  • Usa l’interfaccia utente Spark per studiare il piano per cercare l’opportunità di ridurre il più possibile lo shuffle.
  • Raccomandazione formula per spark.sql.mescolare.partizioni:
    • Per set di dati di grandi dimensioni, puntare da 100 MB a meno di 200 MB dimensione obiettivo attività per una partizione (utilizzare la dimensione di destinazione di 100 MB, per esempio).
    • scintilla.sql.mescolare.partizioni = quoziente (shuffle stage input size / target size) / core totali) * core totali.

Filtra / Riduci le dimensioni del set di dati

Cerca opportunità per filtrare i dati il prima possibile nella pipeline dell’applicazione. Se c’è un’operazione di filtro e sei interessato solo a fare analisi per un sottoinsieme dei dati, applica questo filtro in anticipo. Se è possibile ridurre la dimensione del set di dati in anticipo, farlo. Utilizzare predicati di filtro appropriati nella query SQL in modo che Spark possa spingerli verso l’origine dati sottostante; i predicati selettivi sono buoni. Usali come appropriato. Utilizzare i filtri di partizione se sono applicabili.

Cache in modo appropriato

Spark supporta la memorizzazione nella cache dei set di dati in memoria. Ci sono diverse opzioni disponibili:

  • Utilizzare il caching quando la stessa operazione viene calcolata più volte nel flusso della pipeline.
  • Utilizzare il caching utilizzando l’API persist per abilitare l’impostazione cache richiesta (persistere su disco o meno, serializzato o meno).
  • Essere consapevoli del caricamento pigro e della cache prime, se necessario, in anticipo. Alcune API sono desiderose e altre no.
  • Controlla la scheda di archiviazione dell’interfaccia utente Spark per visualizzare le informazioni sui set di dati memorizzati nella cache.
  • È buona norma deselezionare il set di dati memorizzato nella cache quando si è finito di utilizzarli per rilasciare risorse, in particolare quando si hanno altre persone che utilizzano il cluster.

Join

Join è, in generale, un’operazione costosa, quindi fai attenzione ai join nella tua applicazione per ottimizzarli. BroadcastHashJoin è più performante per i casi in cui una delle relazioni è abbastanza piccola da poter essere trasmessa. Di seguito sono riportati alcuni suggerimenti:

  • Iscriviti questioni ordine; inizia con il join più selettivo. Per le relazioni inferiori a spark.sql.autoBroadcastJoinThreshold, è possibile verificare se broadcast HashJoin viene prelevato.
  • Utilizzare i suggerimenti SQL se necessario per forzare un tipo specifico di join.
    • Esempio: Quando si unisce un piccolo set di dati con un set di dati di grandi dimensioni, un join broadcast può essere costretto a trasmettere il piccolo set di dati.
    • Conferma che Spark sta rilevando broadcast hash join; in caso contrario, è possibile forzarlo utilizzando il suggerimento SQL.
  • Evitare i cross-join.
  • Broadcast HashJoin è più performante, ma potrebbe non essere applicabile se entrambe le relazioni in join sono grandi.
  • Raccogliere statistiche sulle tabelle per Spark per calcolare un piano ottimale.

Sintonizza le risorse del cluster

  • Sintonizza le risorse del cluster in base al gestore risorse e alla versione di Spark.
  • Sintonizzare la memoria disponibile sul driver: spark.driver.memoria.
  • Sintonizzare il numero di esecutori e l’utilizzo della memoria e del core in base alle risorse del cluster: executor-memory, num-executors e executor-core.

Controlla la documentazione di configurazione per la versione Spark con cui stai lavorando e usa i parametri appropriati.

Evita operazioni costose

  • Evita ordina per se non è necessario.
  • Quando si scrivono le query, invece di utilizzare select * per ottenere tutte le colonne, recuperare solo le colonne rilevanti per la query.
  • Non chiamare count inutilmente.

Inclinazione dei dati

  • Assicurarsi che le partizioni siano di dimensioni uguali per evitare l’inclinazione dei dati e problemi di basso utilizzo della CPU.
    • Ad esempio: se si hanno dati provenienti da un’origine dati JDBC in parallelo e ciascuna di queste partizioni non recupera un numero simile di record, ciò comporterà attività di dimensioni disuguali (una forma di inclinazione dei dati). Forse una partizione è solo di pochi KB, mentre un’altra è di poche centinaia di MB. Alcune attività saranno più grandi di altre e mentre gli esecutori su attività più grandi saranno occupati, gli altri esecutori, che gestiscono l’attività più piccola, finiranno e saranno inattivi.
    • Se i dati all’origine non sono partizionati in modo ottimale, è anche possibile valutare i compromessi dell’utilizzo di ripartition per ottenere una partizione bilanciata e quindi utilizzare il caching per mantenerlo in memoria, se appropriato.
  • La ripartizione causerà uno shuffle e lo shuffle è un’operazione costosa, quindi questo dovrebbe essere valutato su base applicativa.
  • Utilizzare l’interfaccia utente Spark per cercare le dimensioni delle partizioni e la durata dell’attività.

UDFs

Spark dispone di una serie di funzioni predefinite definite dall’utente (UDFS) disponibili. Per le prestazioni, verificare se è possibile utilizzare una delle funzioni integrate poiché sono buone per le prestazioni. Le UDF personalizzate nell’API Scala sono più performanti delle UDF Python. Se devi usare l’API Python, usa l’UDF panda appena introdotto in Python che è stato rilasciato in Spark 2.3. Il supporto pandas UDF (vectorized UDFs) in Spark ha miglioramenti significativi delle prestazioni rispetto alla scrittura di un UDF Python personalizzato. Ottenere ulteriori informazioni sulla scrittura di un panda UDF.

Spero che questo ti sia stato utile mentre scrivevi le tue applicazioni Spark. Felice sviluppo! In un prossimo blog, vi mostrerò come ottenere il piano di esecuzione per il vostro lavoro Spark.

Lascia un commento

Il tuo indirizzo email non sarà pubblicato.