diff --git a/average.py b/average.py
index 6da298a4778c5920aa80076b8adad569eec3051d..fcbd1f09d3feec51737c914516a69c942f588d62 100755
--- a/average.py
+++ b/average.py
@@ -9,17 +9,18 @@ Example: Calculates running averages of raw numerical data.
 """
 
 def main():
     sc = SparkContext(MASTER, "spark-dsp-wordcount")
-    # Use "batches" of 2 seconds each
+    # Use batches of 2 seconds each
     ssc = StreamingContext(sc, 2)
     # This directory will be used to store checkpoints
     # Allows recovery from errors without data loss
     ssc.checkpoint("checkpoint/average")
+    # Create a stream based on raw data
     lines = ssc.textFileStream("numeric-data")
     results = (lines.map(lambda line: float(line))
                # Filtering operation: Only keep non-negative values
                .filter(lambda x: x >= 0)
-               # Enhancement operation: Pair each value with "1" (necessary by aggregation below)
+               # Enhancement operation: Pair each value with "1" (necessary for aggregation below)
                .map(lambda x: (x, 1))
                # Aggregation operation: Calculate sums and counts per window
                # Uses a sliding window, i.e., values can be part of multiple windows
@@ -29,6 +30,7 @@ def main():
                # Calculate the average per sliding window
                .map(lambda aggr: aggr[0] / aggr[1])
                )
+    # Continuously print the resulting stream
     results.pprint()
 
     ssc.start()
diff --git a/wordcount.py b/wordcount.py
index aaa899a7baf62ca3575c316a1aa414bd338e9345..ce105bbc014d627a368125bd86f743d6a94f476b 100755
--- a/wordcount.py
+++ b/wordcount.py
@@ -5,7 +5,7 @@ from pyspark.streaming import StreamingContext
 MASTER = "local[*]"
 """
-Example: Counts the number of word occurrences of a text input stream.
+Example: Counts the number of occurrences of each word in a text input stream.
 """
 
 def main():
     sc = SparkContext(MASTER, "spark-dsp-wordcount")
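
The windowed aggregation itself falls between the two hunks of average.py, so it is not shown above. For reference, a minimal runnable sketch of the whole pipeline as it would read after this patch, assuming the elided step is a reduceByWindow over (sum, count) pairs; the 30-second window, 10-second slide, and the trailing awaitTermination() call are illustrative assumptions, not taken from the diff:

    #!/usr/bin/env python
    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext

    MASTER = "local[*]"

    def main():
        # App name kept as in the source, which reuses "spark-dsp-wordcount"
        sc = SparkContext(MASTER, "spark-dsp-wordcount")
        # Use batches of 2 seconds each
        ssc = StreamingContext(sc, 2)
        # Checkpoint directory, as in the source
        ssc.checkpoint("checkpoint/average")
        # Create a stream based on raw data
        lines = ssc.textFileStream("numeric-data")
        results = (lines.map(lambda line: float(line))
                   # Only keep non-negative values
                   .filter(lambda x: x >= 0)
                   # Pair each value with 1 so sum and count reduce together
                   .map(lambda x: (x, 1))
                   # ASSUMED aggregation step: sum values and counts over a
                   # sliding window (30 s window / 10 s slide are guesses)
                   .reduceByWindow(lambda a, b: (a[0] + b[0], a[1] + b[1]),
                                   None, 30, 10)
                   # Average per sliding window
                   .map(lambda aggr: aggr[0] / aggr[1]))
        # Continuously print the resulting stream
        results.pprint()

        ssc.start()
        ssc.awaitTermination()

    if __name__ == "__main__":
        main()

With the inverse reduce function set to None, reduceByWindow recomputes each window from scratch; checkpointing is only strictly required when an inverse function is supplied, but keeping it matches the source.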
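The counting logic of wordcount.py is likewise outside the hunk shown. The reworded docstring describes a per-word count, which PySpark Streaming canonically expresses with flatMap/map/reduceByKey; a sketch under that assumption, where the input directory name and the 2-second batch interval are guesses rather than facts from the diff:

    #!/usr/bin/env python
    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext

    MASTER = "local[*]"

    def main():
        sc = SparkContext(MASTER, "spark-dsp-wordcount")
        ssc = StreamingContext(sc, 2)  # batch interval assumed from average.py
        lines = ssc.textFileStream("text-data")  # input directory is hypothetical
        counts = (lines.flatMap(lambda line: line.split())
                  # Pair each word with 1, then sum the 1s per word
                  .map(lambda word: (word, 1))
                  .reduceByKey(lambda a, b: a + b))
        counts.pprint()

        ssc.start()
        ssc.awaitTermination()

    if __name__ == "__main__":
        main()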