#!/usr/bin/env python3
"""
Example: Calculates running averages of raw numerical data.
"""
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

MASTER = "local[*]"


def main():
    sc = SparkContext(MASTER, "spark-dsp-wordcount")
    # Use batches of 2 seconds each
    ssc = StreamingContext(sc, 2)
    # This directory will be used to store checkpoints
    # Allows recovery from errors without data loss
    ssc.checkpoint("checkpoint/average")
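    # Note: for real fault tolerance the checkpoint directory should be on
    # reliable storage (e.g. HDFS); a local path is sufficient for this example.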
    # Create a stream based on raw data
    lines = ssc.textFileStream("numeric-data")
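    # Note: textFileStream only picks up files that appear in the directory after
    # the streaming context has started; files already present are ignored.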
    results = (lines.map(lambda line: float(line))
               # Filtering operation: Only keep non-negative values
               .filter(lambda x: x >= 0)
               # Enhancement operation: Pair each value with "1" (necessary for aggregation below)
               .map(lambda x: (x, 1))
               # Aggregation operation: Calculate sums and counts per window
               # Uses a sliding window, i.e., values can be part of multiple windows
               .reduceByWindow(lambda aggr, x: (aggr[0] + x[0], aggr[1] + x[1]), None,
                               windowDuration=10, slideDuration=4)
               # Alternative: Tumbling window (with windowDuration == slideDuration)
               # .reduceByWindow(lambda aggr, x: (aggr[0] + x[0], aggr[1] + x[1]), None,
               #                 windowDuration=10, slideDuration=10)
               # Calculate the average per sliding window
               .map(lambda aggr: aggr[0] / aggr[1])
               )
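    # Note: the second argument to reduceByWindow is an optional inverse reduce
    # function; with None (as above), each window is recomputed from its batches
    # instead of being updated incrementally. windowDuration and slideDuration
    # must be multiples of the batch interval (2 seconds here).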
    # Continuously print the resulting stream
    results.pprint()
    # Start the computation and block until the streaming job is stopped
    ssc.start()
    ssc.awaitTermination()


if __name__ == "__main__":
    main()
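
# Example of feeding input while the job runs (illustrative assumption, not part
# of the original file): write each file outside the watched directory and move
# it in, so it appears atomically:
#   echo "4.2" > /tmp/value.txt && mv /tmp/value.txt numeric-data/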