Skip to content
Snippets Groups Projects
Commit e79e9da0 authored by Fellner, Sebastian's avatar Fellner, Sebastian
Browse files

Small tweaks

parent 175c4eab
Branches
No related tags found
No related merge requests found
......@@ -9,17 +9,18 @@ Example: Calculates running averages of raw numerical data.
"""
def main():
sc = SparkContext(MASTER, "spark-dsp-wordcount")
# Use "batches" of 2 seconds each
# Use batches of 2 seconds each
ssc = StreamingContext(sc, 2)
# This directory will be used to store checkpoints
# Allows recovery from errors without data loss
ssc.checkpoint("checkpoint/average")
# Create a stream based on raw data
lines = ssc.textFileStream("numeric-data")
results = (lines.map(lambda line: float(line))
# Filtering operation: Only keep non-negative values
.filter(lambda x: x >= 0)
# Enhancement operation: Pair each value with "1" (necessary by aggregation below)
# Enhancement operation: Pair each value with "1" (necessary for aggregation below)
.map(lambda x: (x, 1))
# Aggregation operation: Calculate sums and counts per window
# Uses a sliding window, i.e., values can be part of multiple windows
......@@ -29,6 +30,7 @@ def main():
# Calculate the average per sliding window
.map(lambda aggr: aggr[0] / aggr[1])
)
# Continuously print the resulting stream
results.pprint()
ssc.start()
......
......@@ -5,7 +5,7 @@ from pyspark.streaming import StreamingContext
MASTER = "local[*]"
"""
Example: Counts the number of word occurrences of a text input stream.
Example: Counts the number of occurrences for each word of a text input stream.
"""
def main():
sc = SparkContext(MASTER, "spark-dsp-wordcount")
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or sign in to comment