average.py
    #!/usr/bin/env python3
    """
    Example: Calculates running averages of raw numerical data.
    """
    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext

    MASTER = "local[*]"

    def main():
        sc = SparkContext(MASTER, "spark-dsp-average")
        # Use "batches" of 2 seconds each
        ssc = StreamingContext(sc, 2)
        # Store checkpoints in this directory; they allow the job to
        # recover from failures without data loss
        ssc.checkpoint("checkpoint/average")
    
        # Monitor the directory "numeric-data": every file created there
        # after the job starts is read line by line into the stream
        lines = ssc.textFileStream("numeric-data")
        results = (lines
            # Parsing operation: Interpret each line as a floating-point number
            .map(lambda line: float(line))
            # Filtering operation: Only keep non-negative values
            .filter(lambda x: x >= 0)
            # Enhancement operation: Pair each value with a count of 1 (needed by the aggregation below)
            .map(lambda x: (x, 1))
            # Aggregation operation: Calculate sums and counts per window
            # Uses a sliding window, i.e., values can be part of multiple windows
            .reduceByWindow(lambda aggr, x: (aggr[0] + x[0], aggr[1] + x[1]), None, windowDuration=10, slideDuration=4)
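            # With the 2-second batch interval, windowDuration=10 covers the last
            # 5 batches and slideDuration=4 emits a result every 2 batches, so a
            # value can contribute to up to 3 consecutive windows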
            # Alternative: Tumbling window (with windowDuration==slideDuration)
            #.reduceByWindow(lambda aggr, x: (aggr[0] + x[0], aggr[1] + x[1]), None, windowDuration=10, slideDuration=10)
            # Calculate the average per sliding window
            .map(lambda aggr: aggr[0] / aggr[1])
        )
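        # Print the computed average for each window to stdout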
        results.pprint()
    
        # Start the streaming computation and block until it is stopped
        # externally (e.g., with Ctrl+C)
        ssc.start()
        ssc.awaitTermination()
    
    if __name__ == "__main__":
        main()
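
To try the job locally, files have to appear in the monitored "numeric-data" directory while the stream is running, since textFileStream only picks up files created after the job has started. The feeder below is a minimal sketch under two assumptions: the directory name matches the code above, and each file is first written under a hidden temporary name and then renamed, because the rename is atomic and Spark's file stream ignores names starting with a dot, so half-written files are never read.

    #!/usr/bin/env python3
    # Hypothetical feeder script: drops a small file of random numbers into
    # the monitored directory every 2 seconds
    import os
    import random
    import time

    DATA_DIR = "numeric-data"  # must match the directory watched by average.py

    def main():
        os.makedirs(DATA_DIR, exist_ok=True)
        for i in range(30):
            tmp_path = os.path.join(DATA_DIR, ".batch-%03d.tmp" % i)
            final_path = os.path.join(DATA_DIR, "batch-%03d.txt" % i)
            with open(tmp_path, "w") as f:
                for _ in range(10):
                    # Mix in a few negative values to exercise the filter step
                    f.write("%.3f\n" % random.uniform(-1.0, 10.0))
            os.rename(tmp_path, final_path)  # atomic on POSIX file systems
            time.sleep(2)

    if __name__ == "__main__":
        main()

Each sliding window should then print a timestamped block containing a single value: the average of all non-negative numbers seen in that window.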