Skip to content
Snippets Groups Projects
Select Git revision
  • 95205be4270d4cbc599b13fd790e2d67fc995dfc
  • main default protected
  • renovate/lock-file-maintenance
  • demo protected
  • person-select-custom
  • dbp-translation-component
  • icon-set-mapping
  • port-i18next-parser
  • remove-sentry
  • favorites-and-recent-files
  • revert-6c632dc6
  • lit2
  • advertisement
  • wc-part
  • automagic
  • publish
  • wip-cleanup
  • demo-file-handling
18 results

demo.js

Blame
  • average.py 1.42 KiB
    #!/usr/bin/env python3
    """Example: Calculates running averages of raw numerical data.

    Spark Streaming job that reads numbers (one per line) from new text files
    in the ``numeric-data`` directory, keeps only non-negative values, and
    prints the average over a 10-second window that slides every 4 seconds.
    """
    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext

    # Spark master URL: "local[*]" runs Spark in local mode with one worker
    # thread per logical core.
    MASTER = "local[*]"
    def main():
        sc = SparkContext(MASTER, "spark-dsp-wordcount")
        # Use "batches" of 2 seconds each
        ssc = StreamingContext(sc, 2)
        # This directory will be used to store checkpoints
        # Allows recovery from errors without data loss
        ssc.checkpoint("checkpoint/average")
    
        lines = ssc.textFileStream("numeric-data")
        results = (lines.map(lambda line: float(line))
            # Filtering operation: Only keep non-negative values
            .filter(lambda x: x >= 0)
            # Enhancement operation: Pair each value with "1" (necessary by aggregation below)
            .map(lambda x: (x, 1))
            # Aggregation operation: Calculate sums and counts per window
            # Uses a sliding window, i.e., values can be part of multiple windows
            .reduceByWindow(lambda aggr, x: (aggr[0] + x[0], aggr[1] + x[1]), None, windowDuration=10, slideDuration=4)
            # Alternative: Tumbling window (with windowDuration==slideDuration)
            #.reduceByWindow(lambda aggr, x: (aggr[0] + x[0], aggr[1] + x[1]), None, windowDuration=10, slideDuration=10)
            # Calculate the average per sliding window
            .map(lambda aggr: aggr[0] / aggr[1])
        )
        results.pprint()
    
        ssc.start()
        ssc.awaitTermination()
    
    if __name__ == "__main__":
        main()