Sparkパフォーマンス最適化のベスト-プラクティスを探る-IBM Developer

count()はSparkアクションです。 デバッグ中に追加され、削除されないSparkアプリケーションに複数のcount()呼び出しがある場合、私が見た共通の問題です。 Sparkアクションを探して、必要でないときにCPUサイクルやその他のリソースを使用したくないため、必要でないものを削除することをお勧めします。

ファイル形式

アプリケーション用にデータセットを設計するときは、Sparkで使用可能なファイル形式を最大限に活用していることを確認してくださ 考慮すべきいくつかのこと:

  • Sparkはapache Parquetとorc用に最適化されており、読み取りスループットが向上しています。 Sparkには、ディスクI/Oを削減するベクトル化サポートがあります。
  • Parquetファイル形式を使用し、圧縮を使用します。
  • Apache Sparkで使用できるさまざまなファイル形式と組み込みのデータソースがあります。分割可能なファイル形式を使用します。
  • 小さなファイルが多すぎないようにしてください。 多くの小さなファイルがある場合は、パフォーマンスを向上させるためにそれらの圧縮を行うのが理にかなっています。

並列処理

  • Sparkパーティションの数を増やして、データのサイズに基づいて並列処理を増やします。 クラスターリソースが最適に利用されていることを確認します。 パーティションが少なすぎると、一部のエグゼキュータがアイドル状態になる可能性がありますが、パーティションが多すぎるとタスクスケジュー
  • パーティションとタスクを調整します。 Sparkは100ms+のタスクを処理でき、executorのコアあたり少なくとも2-3のタスクを推奨します。
  • Sparkは、ファイルサイズの入力に基づいてパーティションの数を決定します。 場合によっては、パーティションの数を明示的に指定することが理にかなっています。
    • read APIは、任意の数のパーティションを取ります。
    • sqlです。ファイル。spark v2.0で使用可能なmaxPartitionBytes。Parquet、ORC、およびJSONの場合は0です。
  • シャッフルパーティションは、sparkを設定することで調整できます。sqlです。シャッフルパーティション、デフォルトは200です。 データセットのサイズが大きい場合、これは非常に小さいです。

Reduce shuffle

Shuffleは、ネットワークとディスクI/Oを含むクラスター内のノード間でデータを移動するため、高価な操作です。 シャッフルを減らすためのヒントをいくつか紹介します:

  • sqlです。シャッフル
  • 各タスクのサイズが大きすぎないように、入力データセットを適切に分割します。
  • Spark UIを使用して、シャッフルをできるだけ減らす機会を探すための計画を研究します。
  • sqlです。シャッフル間仕切り:
    • 大規模なデータセットの場合は、パーティションの100MBから200MB未満のタスクターゲットサイズを目指します(たとえば、100MBのターゲットサイズを使用します)。
    • sqlです。シャッフルパーティション=商(シャッフルステージ入力サイズ/ターゲットサイズ)/合計コア)*合計コア。

データセットサイズのフィルター/縮小

アプリケーションパイプラインで可能な限り早期にデータをフィルター処理する機会を探します。 フィルター操作があり、データのサブセットの分析のみに関心がある場合は、このフィルターを早期に適用してください。 データセットのサイズを早期に縮小できる場合は、それを行います。 Sqlクエリで適切なフィルター述語を使用して、Sparkがそれらを基になるデータソースにプッシュできるようにします。 必要に応じて使用してください。 該当する場合は、パーティションフィルタを使用します。

キャッシュ

Sparkはメモリ内のデータセットのキャッシュをサポートしています。 利用可能なさまざまなオプションがあります:

  • パイプラインフローで同じ操作が複数回計算される場合は、キャッシュを使用します。
  • persist APIを使用してキャッシュを使用して、必要なキャッシュ設定を有効にします(ディスクに永続化するかどうか、シリアル化されているかどうか)。
  • 必要に応じて遅延読み込みとプライムキャッシュに注意してください。 一部のApiは熱心であり、一部のApiはそうではありません。
  • キャッシュしたデータセットに関する情報を確認するには、Spark UIのストレージタブをチェックしてください。
  • リソースを解放するために、キャッシュされたデータセットを使用し終わったときに、特にクラスタを使用している他の人がいるときに、キャッシュされたデータセットをアンパーシストすることをお勧めします。

Join

Joinは一般的に高価な操作なので、アプリケーションの結合に注意して最適化してください。 BroadcastHashJoinは、関係の1つが放送できるほど十分に小さい場合に最も効果的です。 以下はいくつかのヒントです:

  • 最も選択的な結合から始めます。 未満の関係の場合は、次のようになります。sqlです。a u t o broadcastjointhresholdでは、broadcastHashJoinがピックアップされているかどうかを確認できます。
  • 必要に応じてSQLヒントを使用して、特定のタイプの結合を強制します。
    • 例:大きなデータセットで小さなデータセットを結合する場合、ブロードキャスト結合は小さなデータセットを強制的にブロードキャストすることがあります。
    • Sparkがbroadcast hash joinを取得していることを確認します。
  • 交差結合は避けてください。
  • BroadcastHashJoinが最も性能的ですが、joinの両方の関係が大きい場合は適用できない可能性があります。
  • Sparkのテーブルの統計を収集して最適な計画を計算します。

クラスターリソースの調整

  • リソースマネージャーとSparkのバージョンに応じて、クラスター上のリソースを調整します。
  • 使用可能なメモリをドライバsparkにチューニングします。ドライバー。メモリ。
  • クラスター内のリソースexecutor-memory、num-executors、executor-coresに基づいて、executorの数とメモリとコアの使用量を調整します。

使用しているSparkリリースの構成ドキュメントを確認し、適切なパラメータを使用します。

高価な操作は避けてください

  • が必要でない場合はによる順序を避けてください。クエリを記述するときは、select*を使用してすべての列を取得するのではなく、クエリに関連する列のみを取得します。
  • countを不必要に呼び出さないでください。

データスキュー

  • データスキューとCPU使用率の低下の問題を回避するために、パーティションのサイズが等しいことを確認します。
    • 例として、JDBCデータソースから並行してデータが入ってきて、それらの各パーティションが同様の数のレコードを取得していない場合、サイズの異なるタス 別のものは数百MBであるのに対し、多分一つのパーティションは、わずか数KBです。 いくつかのタスクは他のタスクよりも大きくなり、大きなタスクのエグゼキュータはビジー状態になりますが、小さなタスクを処理している他のエグゼキュータは終了してアイドル状態になります。
    • ソースのデータが最適に分割されていない場合は、再分割を使用してバランスの取れたパーティションを取得し、必要に応じてキャッシュを使用してメモリに永続化することのトレードオフを評価することもできます。
  • 再分割はシャッフルの原因となり、シャッフルは高価な操作なので、これはアプリケーションベースで評価する必要があります。
  • Spark UIを使用して、パーティションサイズとタスク期間を検索します。Udf

    Sparkには、多数の組み込みユーザー定義関数(Udf)が利用可能です。 パフォーマンスについては、組み込み関数がパフォーマンスに適しているため、いずれかを使用できるかどうかを確認してくださ Scala APIのカスタムUdfは、Python Udfよりもパフォーマンスが高いです。 Python APIを使用する必要がある場合は、Spark2.3でリリースされたPythonで新しく導入されたpandas UDFを使用してください。 Sparkのpandas UDF(ベクトル化されたUdf)サポートは、カスタムPython UDFを作成するのとは対照的に、パフォーマンスが大幅に向上しています。 Pandas UDFを書くことについてのより多くの情報を得なさい。

    あなたがSparkアプリケーションを書くときに、これがあなたに役立つことを願っています。 幸せな開発! 今後のブログでは、Sparkジョブの実行計画を取得する方法を紹介します。

コメントを残す

メールアドレスが公開されることはありません。