Udforsk bedste praksis for optimering af Spark – ydeevne – IBM Developer

count() på et datasæt en Gnisthandling. Det er et almindeligt problem, som jeg har set, hvor der er flere count() opkald i Spark-applikationer, der tilføjes under debugging, og de bliver ikke fjernet. Det er en god ide at kigge efter Gnisthandlinger og fjerne alt, hvad der ikke er nødvendigt, fordi vi ikke ønsker at bruge CPU-cyklusser og andre ressourcer, når det ikke kræves.

filformater

når du designer dine datasæt til din applikation, skal du sikre dig, at du udnytter de filformater, der er tilgængelige med Spark, bedst muligt. Nogle ting at overveje:

  • Spark er optimeret til Apache parket og ORC for læse gennemløb. Spark har vektoriseringsstøtte, der reducerer disk I/O. kolonneformater fungerer godt.
  • brug Parketfilformatet og gør brug af komprimering.
  • der er forskellige filformater og indbyggede datakilder, der kan bruges i Apache Spark.Brug opdelelige filformater.
  • sørg for, at der ikke er for mange små filer. Hvis du har mange små filer, kan det være fornuftigt at gøre komprimering af dem for bedre ydeevne.

parallelisme

  • Forøg antallet af Gnistpartitioner for at øge paralleliteten baseret på dataens størrelse. Sørg for, at klyngeressourcerne udnyttes optimalt. For få partitioner kan resultere i, at nogle eksekutorer er inaktive, mens for mange partitioner kan resultere i overhead af Opgaveplanlægning.
  • Indstil partitioner og opgaver. Spark kan håndtere opgaver på 100 ms+ og anbefaler mindst 2-3 opgaver pr.
  • Spark bestemmer antallet af partitioner baseret på filstørrelsen input. Til tider er det fornuftigt at angive antallet af partitioner eksplicit.
    • Læs API tager et valgfrit antal partitioner.
    • gnist.SQL.fil.maks. partitionbytes, Fås i Spark v2.0.0, til parket, ORK og JSON.
  • shuffle-partitionerne kan indstilles ved at indstille spark.SQL.blande.partitioner , som standard er 200. Dette er virkelig lille, hvis du har store datasætstørrelser.

reducer shuffle

Shuffle er en dyr operation, da det indebærer at flytte data over noderne i din klynge, hvilket involverer netværk og disk I/O. Det er altid en god ide at reducere mængden af data, der skal blandes. Her er nogle tips til at reducere shuffle:

  • Tune den gnist.SQL.blande.partitioner.
  • partitioner inputdatasættet korrekt, så hver opgavestørrelse ikke er for stor.
  • brug Spark UI til at studere planen for at se efter mulighed for at reducere blandingen så meget som muligt.
  • formel anbefaling til spark.SQL.blande.skillevægge:
    • for store datasæt skal du sigte mod hvor som helst fra 100 MB til mindre end 200 MB opgavemålstørrelse for en partition (brug f.eks. målstørrelse på 100 MB).
    • gnist.SQL.blande.partitioner = kvotient (shuffle stage input størrelse/mål størrelse)/total kerner) * Total kerner.

Filtrer / reducer datasætstørrelse

se efter muligheder for at filtrere data så tidligt som muligt i din applikationsrørledning. Hvis der er en filterhandling, og du kun er interesseret i at foretage analyse for en delmængde af dataene, skal du anvende dette filter tidligt. Hvis du kan reducere datasætstørrelsen tidligt, skal du gøre det. Brug passende filterprædikater i din forespørgsel, så Spark kan skubbe dem ned til den underliggende datakilde; selektive prædikater er gode. Brug dem efter behov. Brug partitionsfiltre, hvis de er relevante.

Cache passende

Spark understøtter caching af datasæt i hukommelsen. Der er forskellige muligheder:

  • brug caching, når den samme operation beregnes flere gange i rørledningsstrømmen.
  • brug caching ved hjælp af vedvarende API for at aktivere den krævede cache-indstilling (fortsæt til disk eller ej; serialiseret eller ej).
  • Vær opmærksom på doven indlæsning og prime cache, hvis det er nødvendigt på forhånd. Nogle API ‘ er er ivrige, og nogle er det ikke.
  • Tjek Spark UI ‘ s Lagringsfane for at se oplysninger om de datasæt, du har cachelagret.
  • det er god praksis at unpersistere dit cachelagrede datasæt, når du er færdig med at bruge dem for at frigive ressourcer, især når du også har andre mennesker, der bruger klyngen.

Join

Join er generelt en dyr operation, så vær opmærksom på joinforbindelserne i din ansøgning for at optimere dem. BroadcastHashJoin er mest performant for tilfælde, hvor et af relationerne er lille nok til at det kan udsendes. Nedenfor er nogle tips:

  • Deltag i ordre spørgsmål; start med den mest selektive deltagelse. For relationer mindre end gnist.SQL.autoBroadcastJoinThreshold, du kan kontrollere, om broadcast HashJoin er afhentet.
  • Brug tip om nødvendigt for at tvinge en bestemt type sammenføjning.
    • eksempel: når du tilslutter dig et lille datasæt med stort datasæt, kan en udsendelsesforbindelse blive tvunget til at udsende det lille datasæt.
    • Bekræft, at Spark optager broadcast hash join; hvis ikke, kan man tvinge det ved hjælp af tip.
  • undgå cross-joints.
  • Broadcast HashJoin er mest performant, men kan ikke være relevant, hvis begge relationer i join er store.
  • Saml statistik over tabeller for Spark til at beregne en optimal plan.

Indstil klyngeressourcer

  • Indstil ressourcerne på klyngen afhængigt af ressourceadministratoren og versionen af Spark.
  • Indstil den tilgængelige hukommelse til driveren: spark.driver.hukommelse.
  • Indstil antallet af eksekutorer og hukommelse og kerneforbrug baseret på ressourcer i klyngen: eksekutor-hukommelse, num-eksekutorer og eksekutor-kerner.

Tjek konfigurationsdokumentationen for den Gnistudgivelse, du arbejder med, og brug de relevante parametre.

undgå dyre operationer

  • undgåbestil med hvis det ikke er nødvendigt.
  • når du skriver dine forespørgsler, i stedet for at bruge Vælg * for at få alle kolonnerne, skal du kun hente de kolonner, der er relevante for din forespørgsel.
  • ring ikke unødigt.

data skævhed

  • sørg for, at partitionerne er ens i størrelse for at undgå data skævhed og lav CPU-udnyttelse spørgsmål.
    • som et eksempel: hvis du har data, der kommer ind fra en JDBC-datakilde parallelt, og hver af disse partitioner ikke henter et lignende antal poster, vil dette resultere i opgaver i ulige størrelse (en form for dataskævning). Måske er en partition kun et par KB, mens en anden er et par hundrede MB. Nogle opgaver vil være større end andre, og mens eksekutorerne på større opgaver vil være optaget, vil de andre eksekutorer, der håndterer den mindre opgave, afslutte og være inaktive.
    • hvis data ved kilden ikke er partitioneret optimalt, kan du også evaluere afvejningerne ved at bruge repartition for at få en afbalanceret partition og derefter bruge caching til at fortsætte den i hukommelsen, hvis det er relevant.
  • Repartition vil forårsage en shuffle, og shuffle er en dyr operation, så dette bør evalueres på ansøgningsbasis.
  • brug Spark UI til at kigge efter partitionsstørrelser og opgavevarighed.

UDFs

Spark har en række indbyggede brugerdefinerede funktioner (UDFs) til rådighed. For ydeevne skal du kontrollere, om du kan bruge en af de indbyggede funktioner, da de er gode til ydeevne. Brugerdefinerede UDFs i Scala API er mere performant end Python UDFs. Hvis du skal bruge Python API, skal du bruge den nyligt introducerede pandas UDF i Python, der blev frigivet i Spark 2.3. Pandas udf (vektoriseret UDFs) support i Spark har betydelige præstationsforbedringer i modsætning til at skrive en brugerdefineret Python udf. Få mere information om at skrive en pandas UDF.

jeg håber, at dette var nyttigt for dig, når du går i gang med at skrive dine Spark-applikationer. Glad udvikling! I en kommende blog, Jeg vil vise, hvordan du får udførelsesplanen for dit Gnistjob.

Skriv et svar

Din e-mailadresse vil ikke blive publiceret.