Poznaj najlepsze praktyki optymalizacji wydajności Spark-IBM Developer

count() w zbiorze danych jest akcją Spark. Jest to częsty problem, który widziałem, gdy istnieje wiele wywołań count () w aplikacjach Spark, które są dodawane podczas debugowania i nie są usuwane. Dobrze jest poszukać akcji Spark i usunąć te, które nie są konieczne, ponieważ nie chcemy używać cykli procesora i innych zasobów, gdy nie są wymagane.

formaty plików

projektując zbiory danych dla swojej aplikacji, upewnij się, że korzystasz z formatów plików dostępnych w aplikacji Spark. Kilka rzeczy do rozważenia:

  • Spark jest zoptymalizowany dla Apache Parquet i ORC pod kątem przepustowości odczytu. Spark ma obsługę wektoryzacji, która zmniejsza We/Wy dysków.
  • Użyj formatu pliku parkiet i użyj kompresji.
  • istnieją różne formaty plików i wbudowane źródła danych, które mogą być używane w Apache Spark.Używaj rozdzielanych formatów plików.
  • upewnij się, że nie ma zbyt wielu małych plików. Jeśli masz wiele małych plików, może sensowne jest ich zagęszczanie dla lepszej wydajności.

równoległość

  • zwiększ liczbę partycji Spark, aby zwiększyć równoległość w oparciu o rozmiar danych. Upewnij się, że zasoby klastra są optymalnie wykorzystywane. Zbyt mało partycji może spowodować, że niektóre executory będą bezczynne, podczas gdy zbyt wiele partycji może spowodować obciążenie harmonogramu zadań.
  • dostrajanie partycji i zadań. Spark może obsługiwać zadania 100ms+ i zaleca co najmniej 2-3 zadania na rdzeń dla wykonawcy.
  • Spark decyduje o liczbie partycji na podstawie wprowadzonego rozmiaru pliku. Czasami warto wyraźnie określić liczbę partycji.
    • odczyt API pobiera opcjonalną liczbę partycji.
    • sql.pliki.maxPartitionBytes, dostępny w Spark v2. 0.0, dla parkietu, ORC i JSON.
  • partycje shuffle mogą być dostrojone przez ustawienie spark.sql.przetasuj.partycje, która domyślnie wynosi 200. Jest to naprawdę małe, jeśli masz duże rozmiary zbiorów danych.

Reduce shuffle

Shuffle jest kosztowną operacją, ponieważ polega na przenoszeniu danych między węzłami klastra, co wiąże się z We/Wy sieciowymi i dyskowymi.zawsze dobrym pomysłem jest zmniejszenie ilości danych, które należy tasować. Oto kilka wskazówek, jak zmniejszyć tasowanie:

  • nastroić iskrę .sql.przetasuj.partycje .
  • odpowiednio Partycjonuj wejściowy zestaw danych, aby każdy rozmiar zadania nie był zbyt duży.
  • Użyj interfejsu Spark, aby przestudiować plan, aby szukać okazji do jak największego zmniejszenia przetasowania.
  • wzór rekomendacji dla iskry.sql.przetasuj.przegrody:
    • w przypadku dużych zbiorów danych, celuj w dowolnym miejscu od 100MB do mniej niż 200MB docelowego rozmiaru zadania dla partycji (użyj na przykład docelowego rozmiaru 100MB).
    • sql.przetasuj.partitions = quotient (shuffle stage input size/target size)/total rdzenie) * total rdzenie.

Filtruj / zmniejszaj rozmiar zbioru danych

poszukaj możliwości jak najwcześniejszego filtrowania danych w potoku aplikacji. Jeśli istnieje operacja filtrowania i jesteś zainteresowany wykonaniem analizy tylko dla podzbioru danych, zastosuj ten filtr wcześnie. Jeśli możesz zmniejszyć rozmiar zestawu danych wcześniej, zrób to. Użyj odpowiednich predykatów filtrowania w zapytaniu SQL, aby Spark mógł je wypchnąć do podstawowego źródła danych; predykaty selektywne są dobre. Użyj ich odpowiednio. Użyj filtrów partycji, jeśli mają zastosowanie.

odpowiednio Cache

Spark obsługuje buforowanie zbiorów danych w pamięci. Dostępne są różne opcje:

  • użyj buforowania, gdy ta sama operacja jest obliczana wielokrotnie w przepływie rurociągu.
  • użyj buforowania przy użyciu interfejsu persist API, aby włączyć wymagane ustawienie pamięci podręcznej (persist to disk or not; serialized or not).
  • bądź świadomy leniwego ładowania i prime cache w razie potrzeby z góry. Niektóre API są chętne, a niektóre nie.
  • Sprawdź kartę pamięć w interfejsie Spark, aby zobaczyć informacje o przechowywanych zbiorach danych.
  • dobrą praktyką jest odłączanie buforowanego zestawu danych po zakończeniu ich używania w celu uwolnienia zasobów, szczególnie gdy masz inne osoby korzystające z klastra.

Join

Join jest ogólnie kosztowną operacją, więc zwróć uwagę na połączenia w aplikacji, aby je zoptymalizować. BroadcastHashJoin jest najbardziej wydajny w przypadkach, gdy jedna z relacji jest na tyle mała, że może być transmitowana. Poniżej kilka wskazówek:

  • Dołącz do sprawy porządku; zacznij od najbardziej selektywnego łączenia. Dla relacji mniejszych niż .sql.autoBroadcastJoinThreshold, możesz sprawdzić, czy transmisja HashJoin jest odbierana.
  • Użyj podpowiedzi SQL w razie potrzeby, aby wymusić określony typ połączenia.
    • przykład: podczas łączenia małego zbioru danych z dużym zbiorem danych, połączenie transmisji może być wymuszone do transmisji małego zbioru danych.
    • potwierdź, że Spark odbiera broadcast hash join; jeśli nie, można go wymusić za pomocą podpowiedzi SQL.
  • unikaj łączenia krzyżowego.
  • Broadcast HashJoin jest najbardziej wydajny, ale może nie mieć zastosowania, jeśli oba relacje w join są duże.
  • Zbieraj statystyki tabel dla Spark, aby obliczyć optymalny plan.

dostrajaj zasoby klastra

  • dostrajaj zasoby klastra w zależności od Menedżera zasobów i wersji Spark.
  • Dostrój dostępną pamięć do sterownika: spark.kierowca.pamięć.
  • dostrajanie liczby executorów oraz wykorzystania pamięci i rdzenia w oparciu o zasoby w klastrze: Executor-memory, num-executors i Executor-cores.

zapoznaj się z dokumentacją konfiguracji wersji Spark, z którą pracujesz, i użyj odpowiednich parametrów.

unikaj kosztownych operacji

  • unikaj zamawiania przez, jeśli nie jest to potrzebne.
  • podczas pisania zapytań, zamiast używać Wybierz*, aby pobrać wszystkie kolumny, pobieraj tylko kolumny odpowiednie dla Twojego zapytania.
  • nie dzwoń niepotrzebnie.

pochylanie danych

  • upewnij się, że partycje są równe, aby uniknąć pochylania danych i problemów z niskim wykorzystaniem procesora.
    • jako przykład: jeśli masz dane przychodzące ze źródła danych JDBC równolegle, a każda z tych partycji nie pobiera podobnej liczby rekordów, spowoduje to zadania o nierównym rozmiarze (forma pochylenia danych). Może jedna partycja to tylko kilka KB, podczas gdy druga to kilkaset MB. Niektóre zadania będą większe niż inne, a podczas gdy wykonawcy na większych zadaniach będą zajęci, inni wykonawcy, którzy obsługują mniejsze zadanie, zakończą i będą bezczynni.
    • jeśli dane w źródle nie są optymalnie podzielone na partycje, możesz również ocenić kompromisy wynikające z użycia repartition, aby uzyskać zrównoważoną partycję, a następnie użyć buforowania, aby w razie potrzeby zachować ją w pamięci.
  • Repartycja spowoduje przetasowanie, a przetasowanie jest kosztowną operacją, więc powinna być oceniana na podstawie aplikacji.
  • Użyj interfejsu Spark, aby sprawdzić rozmiary partycji i czas trwania zadania.

UDFs

Spark ma wiele wbudowanych funkcji zdefiniowanych przez użytkownika (UDFs). Aby uzyskać wydajność, sprawdź, czy możesz użyć jednej z wbudowanych funkcji, ponieważ są one dobre dla wydajności. Niestandardowe pliki UDF w interfejsie API Scala są bardziej wydajne niż pliki UDF w Pythonie. Jeśli musisz użyć API Pythona, Użyj nowo wprowadzonego pandas UDF w Pythonie, który został wydany w Spark 2.3. Obsługa pandas UDF (wektoryzowane UDFs) w Spark ma znaczną poprawę wydajności w przeciwieństwie do pisania niestandardowego Pythona UDF. Uzyskaj więcej informacji na temat pisania pandy UDF.

mam nadzieję, że to było pomocne dla ciebie, gdy idziesz na pisanie aplikacji Spark. Szczęśliwego rozwoju! W nadchodzącym blogu pokażę, jak uzyskać plan wykonania dla swojej pracy Spark.

Dodaj komentarz

Twój adres e-mail nie zostanie opublikowany.