checkpoint/
*.swp
# Code samples for Data Stream Processing
## Install dependencies
```
pip3 install -r requirements.txt
```
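PySpark also needs a Java runtime on the PATH (e.g. Java 8 or 11 for Spark 3.2). To verify the installation:
```
python3 -c "import pyspark; print(pyspark.__version__)"
```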
## Run the Spark application
```
python3 average.py
```
## Feed the example data into the application
While `average.py` is running, execute this in a second terminal:
```
./send-data.sh
```
#!/usr/bin/env python3
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType

MASTER = "local[*]"

"""
Example: Calculates the average of a numeric input stream.
"""

def main():
    spark = (SparkSession.builder
             .master(MASTER)
             .appName("dsp-average-sql")
             .getOrCreate())

    # Load data from the "numeric-data" directory, one string column per line
    lines = (spark.readStream
             .schema(StructType().add("line", "string"))
             .text("numeric-data"))
    # Cast the string data to doubles
    values = lines.selectExpr("CAST(line AS DOUBLE) AS value")
    # Register the resulting stream as an SQL view
    # (named "stream_values" to avoid clashing with the SQL keyword VALUES)
    values.createOrReplaceTempView("stream_values")
    # Run a continuous query on top of the stream (calculating the running average)
    results = spark.sql("SELECT AVG(value) AS average FROM stream_values")
    # Print the results to the console
    query = results.writeStream.outputMode("complete").format("console").start()
    query.awaitTermination()

if __name__ == "__main__":
    main()
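The query above keeps a single running average over the whole stream. A per-window average in Structured Streaming would instead group by a time window; the following sketch (not part of the original file) stamps each record at processing time via `current_timestamp()`, since the input lines carry no event time of their own:
```
# Hypothetical windowed variant: average per tumbling 10-second window.
from pyspark.sql import functions as F

windowed = (values
            .withColumn("ts", F.current_timestamp())   # processing-time stamp
            .groupBy(F.window("ts", "10 seconds"))     # tumbling window
            .agg(F.avg("value").alias("average")))
```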
#!/usr/bin/env python3
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
MASTER = "local[*]"
"""
Example: Calculates running averages of raw numerical data.
"""
def main():
    sc = SparkContext(MASTER, "spark-dsp-average")
    # Use "batches" of 2 seconds each
    ssc = StreamingContext(sc, 2)
    # This directory will be used to store checkpoints,
    # allowing recovery from errors without data loss
    ssc.checkpoint("checkpoint/average")

    lines = ssc.textFileStream("numeric-data")
    results = (lines.map(lambda line: float(line))
               # Filtering operation: Only keep non-negative values
               .filter(lambda x: x >= 0)
               # Enhancement operation: Pair each value with "1" (needed by the aggregation below)
               .map(lambda x: (x, 1))
               # Aggregation operation: Calculate sums and counts per window.
               # Uses a sliding window, i.e., values can be part of multiple windows
               .reduceByWindow(lambda aggr, x: (aggr[0] + x[0], aggr[1] + x[1]), None, windowDuration=10, slideDuration=4)
               # Alternative: Tumbling window (with windowDuration == slideDuration)
               #.reduceByWindow(lambda aggr, x: (aggr[0] + x[0], aggr[1] + x[1]), None, windowDuration=10, slideDuration=10)
               # Calculate the average per sliding window
               .map(lambda aggr: aggr[0] / aggr[1])
               )
    results.pprint()

    ssc.start()
    ssc.awaitTermination()

if __name__ == "__main__":
    main()
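The (sum, count) folding that `reduceByWindow` performs can be traced in plain Python; this sketch (not part of the repository) applies it to a few of the sample values:
```
from functools import reduce

window_values = [1.0, 3.0, 4.0, 59.2]    # non-negative values in one window
pairs = [(x, 1) for x in window_values]  # the "enhancement" step
total, count = reduce(lambda a, b: (a[0] + b[0], a[1] + b[1]), pairs)
print(total / count)                     # 16.8, the window average
```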
1
3
4
-5.1
59.2
234
43
40
30
-200
pyspark==3.2.0
#!/bin/sh
# Touching the data files updates their modification time, so Spark's
# textFileStream (which monitors the directory for newly modified files)
# picks them up as fresh input.
touch numeric-data/1.txt
sleep 5
touch numeric-data/2.txt
Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua.
Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat.
Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur.
Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum.
#!/usr/bin/env python3
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
MASTER = "local[*]"
"""
Example: Counts the number of word occurrences of a text input stream.
"""
def main():
    sc = SparkContext(MASTER, "spark-dsp-wordcount")
    # Use "batches" of 1 second each
    ssc = StreamingContext(sc, 1)
    ssc.checkpoint("checkpoint/wordcount")

    lines = ssc.textFileStream("text-data")
    wordCounts = (
        # Each data point of the input is turned into (possibly) multiple data points in the stream
        lines.flatMap(lambda line: line.split(" "))
        # Enhance the stream with a constant "1" (necessary for counting)
        .map(lambda word: (word, 1))
        # Aggregate the counts per word within each batch (effectively a 1-second tumbling window)
        .reduceByKey(lambda x, y: x + y)
    )
    wordCounts.pprint()

    ssc.start()
    ssc.awaitTermination()

if __name__ == "__main__":
    main()
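For counts over longer, sliding windows instead of single batches, DStreams offer `reduceByKeyAndWindow`; a hypothetical variant (not part of the original file) could look like this:
```
# Word counts over a sliding 10-second window advancing every 2 seconds.
windowedCounts = (lines.flatMap(lambda line: line.split(" "))
                  .map(lambda word: (word, 1))
                  .reduceByKeyAndWindow(lambda x, y: x + y,  # counts entering the window
                                        lambda x, y: x - y,  # counts leaving it (requires checkpointing)
                                        windowDuration=10, slideDuration=2))
```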