Skip to content
Snippets Groups Projects
Select Git revision
  • e79e9da0a9a65def3e73f546a89e38d0ac3cb1a8
  • master default protected
2 results

average.py

Blame
  • Sebastian Fellner's avatar
    Fellner, Sebastian authored
    e79e9da0
    History
    average.py 1.51 KiB
    #!/usr/bin/env python3
    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext
    
    # Spark master URL passed to SparkContext in main(); "local[*]" runs Spark
    # locally with as many worker threads as there are logical cores.
    MASTER = "local[*]"
    
    def main():
        """
        Example: Calculates running averages of raw numerical data.

        Watches the "numeric-data" directory for new text files, parses every
        line as a number, drops negative values and prints the average of the
        remaining values for each sliding window. Lines that cannot be parsed
        as a number (e.g. blank lines) are skipped. Blocks until the streaming
        context terminates.
        """
        def parse_number(line):
            # flatMap helper: emit the parsed value, or nothing at all when the
            # line is not a number, so a single malformed record does not fail
            # the whole batch with a ValueError.
            try:
                return [float(line)]
            except ValueError:
                return []

        def add_pairs(aggr, x):
            # Element-wise sum of two (sum, count) pairs
            return (aggr[0] + x[0], aggr[1] + x[1])

        # NOTE(review): the application name says "wordcount" although this job
        # computes averages -- confirm whether it should be renamed.
        sc = SparkContext(MASTER, "spark-dsp-wordcount")
        # Use batches of 2 seconds each
        ssc = StreamingContext(sc, 2)
        # This directory will be used to store checkpoints
        # Allows recovery from errors without data loss
        ssc.checkpoint("checkpoint/average")

        # Create a stream based on raw data
        lines = ssc.textFileStream("numeric-data")
        results = (lines.flatMap(parse_number)
            # Filtering operation: Only keep non-negative values
            .filter(lambda x: x >= 0)
            # Enhancement operation: Pair each value with "1" (necessary for aggregation below)
            .map(lambda x: (x, 1))
            # Aggregation operation: Calculate sums and counts per window
            # Uses a sliding window, i.e., values can be part of multiple windows
            # No inverse reduce function is supplied (None)
            .reduceByWindow(add_pairs, None, windowDuration=10, slideDuration=4)
            # Alternative: Tumbling window (with windowDuration==slideDuration)
            #.reduceByWindow(add_pairs, None, windowDuration=10, slideDuration=10)
            # Calculate the average per sliding window
            .map(lambda aggr: aggr[0] / aggr[1])
        )
        # Continuously print the resulting stream
        results.pprint()

        ssc.start()
        ssc.awaitTermination()
    
    # Run the streaming example only when this file is executed as a script,
    # not when it is imported as a module.
    if __name__ == "__main__":
        main()