はじめに
Spark Streamingは、ライブストリーミングと大規模データの処理のためのSparkAPIへの追加です。大量の非構造化生データを処理して後からクリーンアップする代わりに、SparkStreamingはほぼリアルタイムのデータ処理と収集を実行します。
この記事では、Spark Streamingとは何か、その仕組みを説明し、ストリーミングデータのユースケースの例を示します。
前提条件
- Apache Sparkがインストールおよび構成されている(ガイドに従ってください:UbuntuにSparkをインストールする方法、Windows 10にSparkをインストールする方法)
- Spark用に設定された環境(JupyterノートブックでPysparkを使用します)。
- データストリーム(Twitter APIを使用します)。
- Pythonライブラリtweepy 、 json 、およびソケット Twitterからデータをストリーミングする場合(pipを使用してインストールします)。
Spark Streamingとは何ですか?
Spark Streamingは、ほぼ連続したデータストリームを処理するためのSparkライブラリです。コアの抽象化は離散化されたストリームです データをバッチに分割するためにSparkDStreamAPIによって作成されます。 DStreamAPIはSparkRDD(Resilient Distributed Datasets)を利用しており、SparkSQLやMLlibなどの他のApacheSparkモジュールとのシームレスな統合を可能にします。
企業は、さまざまなユースケースでSparkStreamingのパワーを活用しています。
- ライブストリームETL –保存前のデータのクリーニングと結合。
- 継続的な学習 –機械学習モデルを常に新しい情報で更新します。
- イベントのトリガー –リアルタイムで異常を検出します。
- データの充実 –保存前にデータに統計情報を追加します。
- ライブの複雑なセッション –分析のためにユーザーアクティビティをグループ化します。
ストリーミングアプローチにより、より迅速な顧客行動分析、より迅速な推奨システム、およびリアルタイムの不正検出が可能になります。エンジニアの場合、データが収集されるときに、IoTデバイスからのあらゆる種類のセンサーの異常が表示されます。
SparkStreamingの側面
Spark Streamingは、バッチワークロードとストリーミングワークロードの両方をネイティブにサポートし、データフィードにエキサイティングな改善を提供します。この独自の側面は、最新のデータストリーミングシステムの次の要件を満たしています。
- 動的負荷バランス。 データはマイクロバッチに分割されるため、ボトルネックはもはや問題ではありません。従来のアーキテクチャは、一度に1つのレコードを処理し、計算量の多いパーティションが発生すると、そのノード上の他のすべてのデータをブロックします。 Spark Streamingを使用すると、タスクはワーカー間で分割され、使用可能なリソースに応じて、処理が長くなるタスクと短いタスクが処理されます。
- 障害の回復。 1つのノードで失敗したタスクは、他のワーカー間で離散化および分散されます。ワーカーノードが計算を実行している間、ストラグラーには回復する時間があります。
- インタラクティブな分析。 DStreamsは一連のRDDです。ワーカーメモリに保存されたストリーミングデータのバッチは、インタラクティブにクエリを実行します。
- 高度な分析。 DStreamsによって生成されたRDDは、SQLでクエリを実行するDataFrameに変換され、MLlibなどのライブラリに拡張されて、機械学習モデルを作成し、ストリーミングデータに適用します。
- ストリームパフォーマンスの向上。 バッチでストリーミングすると、スループットパフォーマンスが向上し、数百ミリ秒という短いレイテンシが活用されます。
SparkStreamingの利点と欠点
Spark Streamingを含むすべてのテクノロジーには、長所と短所があります。
長所 | 短所 |
複雑なタスクの卓越した速度パフォーマンス | 大量のメモリ消費 |
フォールトトレランス | 使いにくく、デバッグし、学ぶのが難しい |
クラウドクラスターに簡単に実装 | 十分に文書化されておらず、学習リソースが不足しています |
多言語サポート | データの視覚化が不十分 |
CassandraやMongoDBなどのビッグデータフレームワークの統合 | 少量のデータで遅い |
複数の種類のデータベースに参加する機能 | 機械学習アルゴリズムはほとんどありません |
Spark Streamingはどのように機能しますか?
Spark Streamingは、大規模で複雑なほぼリアルタイムの分析を扱います。分散ストリーム処理パイプラインは、次の3つのステップを経ます。
1.受信 ライブストリーミングソースからのストリーミングデータ。
2.プロセス 並列のクラスター上のデータ。
3.出力 処理されたデータをシステムに変換します。
SparkStreamingアーキテクチャ
Spark Streamingのコアアーキテクチャは、バッチの離散化されたストリーミングにあります。ストリーム処理パイプラインを一度に1レコード通過する代わりに、マイクロバッチは動的に割り当てられて処理されます。したがって、データは利用可能なリソースと地域に基づいてワーカーに転送されます。
データが到着すると、受信者はそれをRDDのパーティションに分割します。 RDDはSparkデータセットの基本的な抽象化であるため、RDDに変換すると、Sparkコードとライブラリを使用してバッチを処理できます。
Spark Streaming Sources
データストリームには、ソースから受信したデータが必要です。 Sparkストリーミングは、これらのソースを2つのカテゴリに分類します。
- 基本的な情報源。 ソケット接続やHDFSと互換性のあるファイルシステムなど、ストリーミングコアAPIで直接利用できるソース
- 高度なソース。 ソースにはリンクの依存関係が必要であり、KafkaやKinesisなどのストリーミングコアAPIでは利用できません。
各入力DStreamはレシーバーに接続します。データの並列ストリームの場合、複数のDStreamを作成すると、複数のレシーバーも生成されます。
Spark Streaming Operations
Spark Streamingには、さまざまな種類の操作の実行が含まれます。
1.変換操作 RDDに適用されるものと同様に、入力DStreamから受信したデータを変更します。変換操作は遅延評価され、データが出力に到達するまで実行されません。
2.出力操作 DStreamsをデータベースやファイルシステムなどの外部システムにプッシュします。外部システムに移動すると、変換操作がトリガーされます。
3.DataFrameおよびSQL操作 RDDをDataFrameに変換し、クエリを実行するための一時テーブルとして登録するときに発生します。
4.MLlib操作 次のような機械学習アルゴリズムを実行するために使用されます:
- ストリーミングアルゴリズム ストリーミング線形回帰やストリーミングk-meansなどのライブデータに適用します。
- オフラインアルゴリズム 履歴データを使用してオフラインでモデルを学習し、アルゴリズムをオンラインでストリーミングデータに適用するため。
SparkStreamingの例
ストリーミングの例の構造は次のとおりです。
アーキテクチャは2つの部分に分割され、2つのファイルから実行されます。
- 最初のファイルを実行する Twitter APIとの接続を確立し、TwitterAPIとSparkの間にソケットを作成します。ファイルを実行し続けます。
- 2番目のファイルを実行する データのストリーミングを要求して開始し、処理されたツイートをコンソールに印刷します。未処理の送信済みデータが最初のファイルに出力されます。
Twitterリスナーオブジェクトを作成する
TweetListener オブジェクトは、 StreamListenerを使用してTwitterストリームからのツイートをリッスンします tweepyから 。サーバー(ローカル)へのソケットでリクエストが行われると、 TweetListener データをリッスンし、ツイート情報(ツイートテキスト)を抽出します。拡張Tweetオブジェクトが使用可能な場合、TweetListenerは拡張をフェッチします。 フィールド、それ以外の場合は text フィールドがフェッチされます。最後に、リスナーは __ endを追加します 各ツイートの最後に。このステップは後で、Sparkでデータストリームをフィルタリングするのに役立ちます。
import tweepy
import json
from tweepy.streaming import StreamListener
class TweetListener(StreamListener):
# tweet object listens for the tweets
def __init__(self, csocket):
self.client_socket = csocket
def on_data(self, data):
try:
# Load data
msg = json.loads(data)
# Read extended Tweet if available
if "extended_tweet" in msg:
# Add "__end" at the end of each Tweet
self.client_socket\
.send(str(msg['extended_tweet']['full_text']+" __end")\
.encode('utf-8'))
print(msg['extended_tweet']['full_text'])
# Else read Tweet text
else:
# Add "__end" at the end of each Tweet
self.client_socket\
.send(str(msg['text']+"__end")\
.encode('utf-8'))
print(msg['text'])
return True
except BaseException as e:
print("error on_data: %s" % str(e))
return True
def on_error(self, status):
print(status)
return True
接続でエラーが発生した場合、コンソールは情報を出力します。
Twitter開発者の資格情報を収集する
Twitterの開発者ポータル TwitterとのAPI接続を確立するためのOAuthクレデンシャルが含まれています。情報はアプリケーションにありますキーとトークン タブ。
データを収集するには:
1.APIキーとシークレットを生成します コンシューマーキーにあります プロジェクトのセクションと情報を保存します:
コンシューマーキー ユーザー名など、自分の身元をTwitterに確認します。
2.アクセストークンとシークレットを生成します 認証トークンから セクションを作成して情報を保存します:
認証トークン Twitterから特定のデータを取得できるようにします。
TwitterAPIからソケットにデータを送信する
開発者の資格情報を使用して、 API_KEYに入力します 、 API_SECRET 、 ACCESS_TOKEN 、および ACCESS_SECRET TwitterAPIにアクセスするため。
関数sendData クライアントがリクエストを行うときにTwitterストリームを実行します。ストリームリクエストが最初に検証され、次にリスナーオブジェクトが作成され、キーワードと言語に基づいてストリームデータがフィルタリングされます。
例:
from tweepy import Stream
from tweepy import OAuthHandler
API_KEY = "api_key"
API_SECRET = "api_secret"
ACCESS_TOKEN = "access_token"
ACCESS_SECRET = "access_secret"
def sendData(c_socket, keyword):
print("Start sending data from Twitter to socket")
# Authentication based on the developer credentials from twitter
auth = OAuthHandler(API_KEY, API_SECRET)
auth.set_access_token(ACCESS_TOKEN, ACCESS_SECRET)
# Send data from the Stream API
twitter_stream = Stream(auth, TweetListener(c_socket))
# Filter by keyword and language
twitter_stream.filter(track = keyword, languages=["en"])
サーバー上にリスニングTCPソケットを作成する
最初のファイルの最後の部分には、ローカルサーバーでのリスニングソケットの作成が含まれます。アドレスとポートはバインドされており、Sparkクライアントからの接続をリッスンします。
例:
import socket
if __name__ == "__main__":
# Create listening socket on server (local)
s = socket.socket()
# Host address and port
host = "127.0.0.1"
port = 5555
s.bind((host, port))
print("Socket is established")
# Server listens for connections
s.listen(4)
print("Socket is listening")
# Return the socket and the address of the client
c_socket, addr = s.accept()
print("Received request from: " + str(addr))
# Send data to client via socket for selected keyword
sendData(c_socket, keyword = ['covid'])
Sparkクライアントがリクエストを行うと、クライアントのソケットとアドレスがコンソールに出力されます。次に、選択したキーワードフィルタに基づいて、データストリームがクライアントに送信されます。
このステップで、最初のファイルのコードを終了します。実行すると、次の情報が出力されます。
ファイルを実行し続け、Sparkクライアントの作成に進みます。
SparkDStreamレシーバーを作成する
別のファイルで、1秒のバッチ間隔でSparkコンテキストとローカルストリーミングコンテキストを作成します。クライアントはホスト名とポートソケットから読み取ります。
import findspark
findspark.init()
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
sc = SparkContext(appName="tweetStream")
# Create a local StreamingContext with batch interval of 1 second
ssc = StreamingContext(sc, 1)
# Create a DStream that conencts to hostname:port
lines = ssc.socketTextStream("127.0.0.1", 5555)
データの前処理
RDDの前処理には、受信したデータ行を __ endで分割することが含まれます。 が表示され、テキストが小文字に変わります。最初の10個の要素がコンソールに出力されます。
# Split Tweets
words = lines.flatMap(lambda s: s.lower().split("__end"))
# Print the first ten elements of each DStream RDD to the console
words.pprint()
コードを実行した後、評価が怠惰であるため、何も起こりません。ストリーミングコンテキストが開始すると、計算が開始されます。
ストリーミングコンテキストと計算を開始する
ストリーミングコンテキストを開始すると、ホストにリクエストが送信されます。ホストは収集したデータをTwitterからSparkクライアントに送り返し、クライアントはデータを前処理します。その後、コンソールは結果を出力します。
# Start computing
ssc.start()
# Wait for termination
ssc.awaitTermination()
ストリーミングコンテキストを開始すると、受信したリクエストが最初のファイルに出力され、生データテキストがストリーミングされます。
2番目のファイルはソケットから毎秒データを読み取り、前処理がデータに適用されます。接続が確立されるまで、最初の数行は空です。
ストリーミングコンテキストはいつでも終了する準備ができています。