GNU/Linux >> Linux の 問題 >  >> Ubuntu

初心者のためのSparkストリーミングガイド

はじめに

Spark Streamingは、ライブストリーミングと大規模データの処理のためのSparkAPIへの追加です。大量の非構造化生データを処理して後からクリーンアップする代わりに、SparkStreamingはほぼリアルタイムのデータ処理と収集を実行します。

この記事では、Spark Streamingとは何か、その仕組みを説明し、ストリーミングデータのユースケースの例を示します。

前提条件

  • Apache Sparkがインストールおよび構成されている(ガイドに従ってください:UbuntuにSparkをインストールする方法、Windows 10にSparkをインストールする方法)
  • Spark用に設定された環境(JupyterノートブックでPysparkを使用します)。
  • データストリーム(Twitter APIを使用します)。
  • Pythonライブラリtweepy json 、およびソケット Twitterからデータをストリーミングする場合(pipを使用してインストールします)。

Spark Streamingとは何ですか?

Spark Streamingは、ほぼ連続したデータストリームを処理するためのSparkライブラリです。コアの抽象化は離散化されたストリームです データをバッチに分割するためにSparkDStreamAPIによって作成されます。 DStreamAPIはSparkRDD(Resilient Distributed Datasets)を利用しており、SparkSQLやMLlibなどの他のApacheSparkモジュールとのシームレスな統合を可能にします。

企業は、さまざまなユースケースでSparkStreamingのパワーを活用しています。

  • ライブストリームETL –保存前のデータのクリーニングと結合。
  • 継続的な学習 –機械学習モデルを常に新しい情報で更新します。
  • イベントのトリガー –リアルタイムで異常を検出します。
  • データの充実 –保存前にデータに統計情報を追加します。
  • ライブの複雑なセッション –分析のためにユーザーアクティビティをグループ化します。

ストリーミングアプローチにより、より迅速な顧客行動分析、より迅速な推奨システム、およびリアルタイムの不正検出が可能になります。エンジニアの場合、データが収集されるときに、IoTデバイスからのあらゆる種類のセンサーの異常が表示されます。

SparkStreamingの側面

Spark Streamingは、バッチワークロードとストリーミングワークロードの両方をネイティブにサポートし、データフィードにエキサイティングな改善を提供します。この独自の側面は、最新のデータストリーミングシステムの次の要件を満たしています。

  • 動的負荷バランス。 データはマイクロバッチに分割されるため、ボトルネックはもはや問題ではありません。従来のアーキテクチャは、一度に1つのレコードを処理し、計算量の多いパーティションが発生すると、そのノード上の他のすべてのデータをブロックします。 Spark Streamingを使用すると、タスクはワーカー間で分割され、使用可能なリソースに応じて、処理が長くなるタスクと短いタスクが処理されます。
  • 障害の回復。 1つのノードで失敗したタスクは、他のワーカー間で離散化および分散されます。ワーカーノードが計算を実行している間、ストラグラーには回復する時間があります。
  • インタラクティブな分析。 DStreamsは一連のRDDです。ワーカーメモリに保存されたストリーミングデータのバッチは、インタラクティブにクエリを実行します。
  • 高度な分析。 DStreamsによって生成されたRDDは、SQLでクエリを実行するDataFrameに変換され、MLlibなどのライブラリに拡張されて、機械学習モデルを作成し、ストリーミングデータに適用します。
  • ストリームパフォーマンスの向上。 バッチでストリーミングすると、スループットパフォーマンスが向上し、数百ミリ秒という短いレイテンシが活用されます。

SparkStreamingの利点と欠点

Spark Streamingを含むすべてのテクノロジーには、長所と短所があります。

長所 短所
複雑なタスクの卓越した速度パフォーマンス 大量のメモリ消費
フォールトトレランス 使いにくく、デバッグし、学ぶのが難しい
クラウドクラスターに簡単に実装 十分に文書化されておらず、学習リソースが不足しています
多言語サポート データの視覚化が不十分
CassandraやMongoDBなどのビッグデータフレームワークの統合 少量のデータで遅い
複数の種類のデータベースに参加する機能 機械学習アルゴリズムはほとんどありません

Spark Streamingはどのように機能しますか?

Spark Streamingは、大規模で複雑なほぼリアルタイムの分析を扱います。分散ストリーム処理パイプラインは、次の3つのステップを経ます。

1.受信 ライブストリーミングソースからのストリーミングデータ。

2.プロセス 並列のクラスター上のデータ。

3.出力 処理されたデータをシステムに変換します。

SparkStreamingアーキテクチャ

Spark Streamingのコアアーキテクチャは、バッチの離散化されたストリーミングにあります。ストリーム処理パイプラインを一度に1レコード通過する代わりに、マイクロバッチは動的に割り当てられて処理されます。したがって、データは利用可能なリソースと地域に基づいてワーカーに転送されます。

データが到着すると、受信者はそれをRDDのパーティションに分割します。 RDDはSparkデータセットの基本的な抽象化であるため、RDDに変換すると、Sparkコードとライブラリを使用してバッチを処理できます。

Spark Streaming Sources

データストリームには、ソースから受信したデータが必要です。 Sparkストリーミングは、これらのソースを2つのカテゴリに分類します。

  • 基本的な情報源。 ソケット接続やHDFSと互換性のあるファイルシステムなど、ストリーミングコアAPIで直接利用できるソース
  • 高度なソース。 ソースにはリンクの依存関係が必要であり、KafkaやKinesisなどのストリーミングコアAPIでは利用できません。

各入力DStreamはレシーバーに接続します。データの並列ストリームの場合、複数のDStreamを作成すると、複数のレシーバーも生成されます。

Spark Streaming Operations

Spark Streamingには、さまざまな種類の操作の実行が含まれます。

1.変換操作 RDDに適用されるものと同様に、入力DStreamから受信したデータを変更します。変換操作は遅延評価され、データが出力に到達するまで実行されません。

2.出力操作 DStreamsをデータベースやファイルシステムなどの外部システムにプッシュします。外部システムに移動すると、変換操作がトリガーされます。

3.DataFrameおよびSQL操作 RDDをDataFrameに変換し、クエリを実行するための一時テーブルとして登録するときに発生します。

4.MLlib操作 次のような機械学習アルゴリズムを実行するために使用されます:

  • ストリーミングアルゴリズム ストリーミング線形回帰やストリーミングk-meansなどのライブデータに適用します。
  • オフラインアルゴリズム 履歴データを使用してオフラインでモデルを学習し、アルゴリズムをオンラインでストリーミングデータに適用するため。

SparkStreamingの例

ストリーミングの例の構造は次のとおりです。

アーキテクチャは2つの部分に分割され、2つのファイルから実行されます。

  • 最初のファイルを実行する Twitter APIとの接続を確立し、TwitterAPIとSparkの間にソケットを作成します。ファイルを実行し続けます。
  • 2番目のファイルを実行する データのストリーミングを要求して開始し、処理されたツイートをコンソールに印刷します。未処理の送信済みデータが最初のファイルに出力されます。

Twitterリスナーオブジェクトを作成する

TweetListener オブジェクトは、 StreamListenerを使用してTwitterストリームからのツイートをリッスンします tweepyから 。サーバー(ローカル)へのソケットでリクエストが行われると、 TweetListener データをリッスンし、ツイート情報(ツイートテキスト)を抽出します。拡張Tweetオブジェクトが使用可能な場合、TweetListenerは拡張をフェッチします。 フィールド、それ以外の場合は text フィールドがフェッチされます。最後に、リスナーは __ endを追加します 各ツイートの最後に。このステップは後で、Sparkでデータストリームをフィルタリングするのに役立ちます。

import tweepy
import json
from tweepy.streaming import StreamListener
class TweetListener(StreamListener):
  # tweet object listens for the tweets
    def __init__(self, csocket):
        self.client_socket = csocket
    def on_data(self, data):
        try:  
            # Load data
            msg = json.loads(data)
            # Read extended Tweet if available
            if "extended_tweet" in msg:
                # Add "__end" at the end of each Tweet 
                self.client_socket\
                    .send(str(msg['extended_tweet']['full_text']+" __end")\
                    .encode('utf-8'))         
                print(msg['extended_tweet']['full_text'])
            # Else read Tweet text
            else:
                # Add "__end" at the end of each Tweet
                self.client_socket\
                    .send(str(msg['text']+"__end")\
                    .encode('utf-8'))
                print(msg['text'])
            return True
        except BaseException as e:
            print("error on_data: %s" % str(e))
        return True
    def on_error(self, status):
        print(status)
        return True

接続でエラーが発生した場合、コンソールは情報を出力します。

Twitter開発者の資格情報を収集する

Twitterの開発者ポータル TwitterとのAPI接続を確立するためのOAuthクレデンシャルが含まれています。情報はアプリケーションにありますキーとトークン タブ。

データを収集するには:

1.APIキーとシークレットを生成します コンシューマーキーにあります プロジェクトのセクションと情報を保存します:

コンシューマーキー ユーザー名など、自分の身元をTwitterに確認します。

2.アクセストークンとシークレットを生成します 認証トークンから セクションを作成して情報を保存します:

認証トークン Twitterから特定のデータを取得できるようにします。

TwitterAPIからソケットにデータを送信する

開発者の資格情報を使用して、 API_KEYに入力します 、 API_SECRET ACCESS_TOKEN 、および ACCESS_SECRET TwitterAPIにアクセスするため。

関数sendData クライアントがリクエストを行うときにTwitterストリームを実行します。ストリームリクエストが最初に検証され、次にリスナーオブジェクトが作成され、キーワードと言語に基づいてストリームデータがフィルタリングされます。

例:

from tweepy import Stream
from tweepy import OAuthHandler
API_KEY = "api_key"
API_SECRET = "api_secret"
ACCESS_TOKEN = "access_token"
ACCESS_SECRET = "access_secret"
def sendData(c_socket, keyword):
    print("Start sending data from Twitter to socket")
    # Authentication based on the developer credentials from twitter
    auth = OAuthHandler(API_KEY, API_SECRET)
    auth.set_access_token(ACCESS_TOKEN, ACCESS_SECRET)
    # Send data from the Stream API
    twitter_stream = Stream(auth, TweetListener(c_socket))
    # Filter by keyword and language
    twitter_stream.filter(track = keyword, languages=["en"])

サーバー上にリスニングTCPソケットを作成する

最初のファイルの最後の部分には、ローカルサーバーでのリスニングソケットの作成が含まれます。アドレスとポートはバインドされており、Sparkクライアントからの接続をリッスンします。

例:

import socket
if __name__ == "__main__":
    # Create listening socket on server (local)
    s = socket.socket()
    # Host address and port
    host = "127.0.0.1"
    port = 5555
    s.bind((host, port))
    print("Socket is established")
    # Server listens for connections
    s.listen(4)
    print("Socket is listening")
    # Return the socket and the address of the client
    c_socket, addr = s.accept()
    print("Received request from: " + str(addr))
    # Send data to client via socket for selected keyword
    sendData(c_socket, keyword = ['covid'])

Sparkクライアントがリクエストを行うと、クライアントのソケットとアドレスがコンソールに出力されます。次に、選択したキーワードフィルタに基づいて、データストリームがクライアントに送信されます。
このステップで、最初のファイルのコードを終了します。実行すると、次の情報が出力されます。

ファイルを実行し続け、Sparkクライアントの作成に進みます。

SparkDStreamレシーバーを作成する

別のファイルで、1秒のバッチ間隔でSparkコンテキストとローカルストリーミングコンテキストを作成します。クライアントはホスト名とポートソケットから読み取ります。

import findspark
findspark.init()
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
sc = SparkContext(appName="tweetStream")
# Create a local StreamingContext with batch interval of 1 second
ssc = StreamingContext(sc, 1)
# Create a DStream that conencts to hostname:port
lines = ssc.socketTextStream("127.0.0.1", 5555)

データの前処理

RDDの前処理には、受信したデータ行を __ endで分割することが含まれます。 が表示され、テキストが小文字に変わります。最初の10個の要素がコンソールに出力されます。

# Split Tweets
words = lines.flatMap(lambda s: s.lower().split("__end"))
# Print the first ten elements of each DStream RDD to the console
words.pprint()

コードを実行した後、評価が怠惰であるため、何も起こりません。ストリーミングコンテキストが開始すると、計算が開始されます。

ストリーミングコンテキストと計算を開始する

ストリーミングコンテキストを開始すると、ホストにリクエストが送信されます。ホストは収集したデータをTwitterからSparkクライアントに送り返し、クライアントはデータを前処理します。その後、コンソールは結果を出力します。

# Start computing
ssc.start()        
# Wait for termination
ssc.awaitTermination()

ストリーミングコンテキストを開始すると、受信したリクエストが最初のファイルに出力され、生データテキストがストリーミングされます。

2番目のファイルはソケットから毎秒データを読み取り、前処理がデータに適用されます。接続が確立されるまで、最初の数行は空です。

ストリーミングコンテキストはいつでも終了する準備ができています。


Ubuntu
  1. 初心者向けのYAML

  2. 仮想化の概要:初心者向けの包括的なガイド

  3. Device Mapper (DM) マルチパスの初心者向けガイド

  1. 初心者向けのLinuxターミナルガイド

  2. Ubuntu 18.04にMongoDBをインストールする方法–初心者向けガイド

  3. SELinux 初心者向けガイド

  1. Cron Job:初心者向けの包括的なガイド2022

  2. Dockerコンテナとは:初心者向け入門ガイド

  3. 初心者向けのDNFコマンドの例