Utforska bästa praxis för Spark performance optimization-IBM Developer

count () på en dataset en Gniståtgärd. Det är ett vanligt problem som jag har sett där det finns flera count () samtal i Spark-applikationer som läggs till under felsökning och de tas inte bort. Det är bra att leta efter Spark-åtgärder och ta bort alla som inte är nödvändiga eftersom vi inte vill använda CPU-cykler och andra resurser när det inte behövs.

filformat

när du utformar dina dataset för din applikation, se till att du utnyttjar de filformat som finns tillgängliga med Spark på bästa sätt. Några saker att tänka på:

  • Spark är optimerad för Apache parkett och ORC för läsgenomströmning. Spark har vektoriseringsstöd som minskar disk I/O. kolumnformat fungerar bra.
  • använd Parkettfilformatet och använd komprimering.
  • det finns olika filformat och inbyggda datakällor som kan användas i Apache Spark.Använd delbara filformat.
  • se till att det inte finns för många små filer. Om du har många små filer kan det vara vettigt att komprimera dem för bättre prestanda.

parallellitet

  • öka antalet Gnistpartitioner för att öka parallellitet baserat på storleken på data. Se till att klusterresurserna utnyttjas optimalt. För få partitioner kan leda till att vissa exekutörer är inaktiva, medan för många partitioner kan leda till omkostnader för aktivitetsschemaläggning.
  • Ställ in partitioner och uppgifter. Spark kan hantera uppgifter på 100 ms + och rekommenderar minst 2-3 uppgifter per kärna för en exekutör.
  • Spark bestämmer antalet partitioner baserat på filstorleksinmatningen. Ibland är det vettigt att ange antalet partitioner uttryckligen.
    • läs API tar ett valfritt antal partitioner.
    • gnista.SQL.arkivera.maxPartitionBytes, finns i Spark v2.0.0, för parkett, ORC och JSON.
  • shuffle-partitionerna kan ställas in genom att ställa in spark.SQL.blanda.partitioner , som standard är 200. Detta är verkligen liten om du har stora dataset storlekar.

minska shuffle

Shuffle är en dyr operation eftersom det innebär att flytta data över noderna i ditt kluster, vilket innebär nätverk och disk I/O. det är alltid bra att minska mängden data som behöver blandas. Här är några tips för att minska shuffle:

  • Ställ in gnistan.SQL.blanda.partitioner.
  • partitionera indatauppsättningen på lämpligt sätt så att varje uppgiftsstorlek inte är för stor.
  • använd Spark UI för att studera planen för att leta efter möjlighet att minska shuffle så mycket som möjligt.
  • formel rekommendation för gnista.SQL.blanda.partitioner:
    • för stora datamängder, sikta på allt från 100MB till mindre än 200MB uppgiftsmålstorlek för en partition (använd målstorlek på 100MB, till exempel).
    • gnista.SQL.blanda.partitioner = kvot (shuffle stage ingångsstorlek / målstorlek) / totala kärnor) * totala kärnor.

filtrera / minska dataSet storlek

leta efter möjligheter att filtrera bort data så tidigt som möjligt i din ansökan pipeline. Om det finns en filteroperation och du bara är intresserad av att göra analys för en delmängd av data, använd detta filter tidigt. Om du kan minska datauppsättningsstorleken tidigt, gör det. Använd lämpliga filterpredikat i din SQL-fråga så att Spark kan trycka ner dem till den underliggande datakällan; selektiva predikat är bra. Använd dem efter behov. Använd partitionsfilter om de är tillämpliga.

Cache lämpligt

Spark stöder cachning av datamängder i minnet. Det finns olika alternativ:

  • använd cachning när samma operation beräknas flera gånger i rörledningsflödet.
  • använd cachning med hjälp av persist API för att aktivera den önskade cacheinställningen (fortsätt till disk eller inte; serialiserad eller inte).
  • var medveten om lazy loading och prime cache om det behövs up-front. Vissa API: er är ivriga och andra inte.
  • kolla in Spark UI: s Lagringsflik för att se information om de datamängder du har Cachat.
  • det är bra att unpersist din cachade dataset när du är klar med att använda dem för att frigöra resurser, särskilt när du har andra personer som använder klustret också.

gå med

gå med är i allmänhet en dyr operation, så var uppmärksam på anslutningarna i din ansökan för att optimera dem. BroadcastHashJoin är mest performant för fall där en av relationerna är tillräckligt liten för att den kan sändas. Nedan följer några tips:

  • gå med i orderfrågor; börja med den mest selektiva anslutningen. För relationer mindre än gnista.SQL.autoBroadcastJoinThreshold, du kan kontrollera om sändning HashJoin hämtas.
  • använd SQL-tips om det behövs för att tvinga en viss typ av koppling.
    • exempel: när du ansluter till en liten datauppsättning med stor datauppsättning kan en sändningsanslutning tvingas sända den lilla datauppsättningen.
    • bekräfta att Spark plockar upp broadcast hash join; om inte, kan man tvinga den med SQL-tipset.
  • Undvik korskopplingar.
  • Broadcast HashJoin är mest performant, men kanske inte är tillämpligt om båda relationerna i join är stora.
  • samla statistik på Tabeller för Spark för att beräkna en optimal plan.

Ställ in klusterresurser

  • Ställ in resurserna i klustret beroende på resurshanteraren och versionen av Spark.
  • Ställ in tillgängligt minne till drivrutinen: spark.drivrutin.minne.
  • Ställ in antalet exekutörer och minne och kärnanvändning baserat på resurser i klustret: executor-memory, num-executors och executor-cores.

kolla in konfigurationsdokumentationen för Spark-utgåvan du arbetar med och Använd lämpliga parametrar.

Undvik dyra operationer

  • UndvikBeställ med om det inte behövs.
  • när du skriver dina frågor, istället för att använda Välj * för att få alla kolumner, hämtar du bara de kolumner som är relevanta för din fråga.
  • ring inte räkna onödigt.

data skev

  • se till att partitionerna är lika stora för att undvika data skev och låg CPU-utnyttjande frågor.
    • som ett exempel: om du har data som kommer in från en JDBC-datakälla parallellt, och var och en av dessa partitioner inte hämtar ett liknande antal poster, kommer detta att resultera i olika storleksuppgifter (en form av data skev). Kanske är en partition bara några KB, medan en annan är några hundra MB. Vissa uppgifter kommer att vara större än andra, och medan exekutörerna på större uppgifter kommer att vara upptagna, kommer de andra exekutörerna, som hanterar den mindre uppgiften, att slutföra och vara lediga.
    • om data vid källan inte partitioneras optimalt kan du också utvärdera avvägningarna med att använda ompartition för att få en balanserad partition och sedan använda caching för att fortsätta i minnet om det är lämpligt.
  • Ompartition kommer att orsaka en blandning, och blandning är en dyr operation, så detta bör utvärderas på applikationsbasis.
  • använd Spark-användargränssnittet för att leta efter partitionsstorlekar och uppgiftsvaraktighet.

UDF: er

Spark har ett antal inbyggda användardefinierade funktioner (UDF: er) tillgängliga. För prestanda, kontrollera om du kan använda någon av de inbyggda funktionerna eftersom de är bra för prestanda. Anpassade UDF: er i Scala API är mer prestanda än Python UDF: er. Om du måste använda Python API, använd den nyligen introducerade pandas UDF i Python som släpptes i Spark 2.3. Pandas UDF (vektoriserad UDF) stöd i Spark har betydande prestandaförbättringar i motsats till att skriva en anpassad Python UDF. Få mer information om att skriva en pandas UDF.

jag hoppas att det här var till hjälp för dig när du skriver dina Spark-applikationer. Lycklig utveckling! I en kommande blogg kommer jag att visa hur man får exekveringsplanen för ditt Gnistjobb.

Lämna ett svar

Din e-postadress kommer inte publiceras.