class: center, middle, inverse

# Dask

## extending Python data tools for parallel and distributed computing

Joris Van den Bossche - FOSDEM 2017

???
https://github.com/jorisvandenbossche/SWSC2016-pandas-dask

---
class: center, middle, inverse

# Python's scientific/data tools ecosystem

####

## Thanks to Jake VanderPlas for the figure

---
class: center, middle, bgheader
background-image: url(img/JakeVdP-ecosystem3.svg)
background-size: cover

---
count: false
class: center, middle, bgheader
background-image: url(img/JakeVdP-ecosystem5.svg)
background-size: cover

---

![:scale 100%](img/pandas_logo.svg)

* Provides high-performance, easy-to-use data structures and tools
* Widely used for doing practical data analysis in Python
* Suited for tabular data (e.g. column data, spreadsheets, databases)

```python
import pandas as pd

df = pd.read_csv("myfile.csv")
subset = df[df['value'] > 0]
subset.groupby('key').mean()
```
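A self-contained variant of the same snippet, with a small made-up table standing in for `myfile.csv`:

```python
import pandas as pd

# toy data instead of reading "myfile.csv"
df = pd.DataFrame({'key': ['a', 'b', 'a', 'b'],
                   'value': [1.0, -2.0, 3.0, 4.0]})

subset = df[df['value'] > 0]    # keep positive values
subset.groupby('key').mean()    # mean of 'value' per key
```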
---
class: center, inverse

## Python has a fast and pragmatic data science ecosystem

--

## ... restricted to in-memory and a single core

---
class: center, middle, inverse

![:scale 70%](img/dask_horizontal_white.svg)

# a flexible library for parallelism

---
# Dask is

### A parallel computing framework

### That lets you work on larger-than-memory datasets

### Written in pure Python

### That leverages the excellent Python ecosystem

### Using blocked algorithms and task scheduling

---
# Dask.array

* Parallel and out-of-core array library
* Mirrors the NumPy interface
* Coordinates many NumPy arrays into a single logical Dask array

.center[
![:scale 60%](img/dask-array.svg)
]

--

.pull-left[
```python
import numpy as np

x = np.random.random(...)

u, s, v = np.linalg.svd(x.dot(x.T))
```
]

.pull-right[
```python
import dask.array as da

x = da.random.random(..., chunks=(1000, 1000))

u, s, v = da.linalg.svd(x.dot(x.T))
```
]

---
# Dask.dataframe

.pull-left[
![:scale 100%](img/dask-dataframe.svg)
]

.pull-right[
* Parallel and out-of-core dataframe library
* Mirrors the Pandas interface
* Coordinates many Pandas DataFrames into a single logical Dask DataFrame
* Index is (optionally) sorted, allowing for optimizations
]

---
# Dask.dataframe

.pull-left[
![:scale 100%](img/dask-dataframe.svg)
]

.pull-right[
```python
import pandas as pd

df = pd.read_csv('2015-01-01.csv')
res = df.groupby('user_id').mean()
```

```python
import dask.dataframe as dd

df = dd.read_csv('2015-*-*.csv')
res = df.groupby('user_id').mean()
res.compute()
```
]

---
class: center, middle

# Complex graphs

---
# ND-Array - sum

![:scale 100%](img/graph-array-sum.svg)

```python
x = da.ones((15, 15), chunks=(5, 5))
x.sum(axis=0)
```

---
# ND-Array - matrix multiply

![:scale 100%](img/graph-array-xdotxT.svg)

```python
x = da.ones((15, 15), chunks=(5, 5))
x.dot(x.T + 1)
```

---
# Efficient timeseries - resample

![:scale 100%](img/graph-resample.svg)

```python
df.value.resample('1W').mean()
```

---
# Efficient rolling

![:scale 100%](img/graph-rolling.svg)

```python
df.value.rolling(100).mean()
```

---
class: center, middle

# Some problems don't fit well into collections

---
# Dask Delayed

* Tool for creating arbitrary task graphs
* Dead simple interface (one function)

```python
_

results = {}
for a in A:
    for b in B:
        results[a, b] = fit(a, b)

best = score(results)
_
```

---
count: false
# Dask Delayed

* Tool for creating arbitrary task graphs
* Dead simple interface (one function)

```python
from dask import delayed

results = {}
for a in A:
    for b in B:
        results[a, b] = delayed(fit)(a, b)

best = delayed(score)(results)
result = best.compute()
```

---
class: center, inverse, middle

# Collections author task graphs

![:scale 100%](img/grid_search_schedule.gif)

## Now we need to run them efficiently

---

####

![:scale 90%](img/collections-schedulers.png)

### Collections build task graphs

### Schedulers execute task graphs

--

### Dask schedulers target different architectures

### Easy swapping enables scaling up and down (sketched in code a few slides on)

---
# Single Machine Scheduler

Optimized for larger-than-memory use

* **Parallel CPU**: Uses multiple threads or processes
* **Minimizes RAM**: Chooses tasks to remove intermediates
* **Low overhead**: ~100 µs per task
* **Concise**: ~600 LOC, stable

---
# Distributed Scheduler

![:scale 90%](img/network.svg)

---
# Distributed Scheduler

* **Distributed**: One scheduler coordinates many workers
* **Data local**: Tries to move computation to the "best" worker
* **Asynchronous**: Continuous non-blocking conversation
* **Multi-user**: Several users can share the same system
* **HDFS Aware**: Works well with HDFS, S3, etc.
* **Less Concise**: ~3000 LOC Tornado TCP application
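---
# Swapping schedulers

A sketch of what the "easy swapping" looks like in code, using the current dask API (the file pattern and scheduler address are hypothetical):

```python
import dask.dataframe as dd

df = dd.read_csv('2015-*-*.csv')
res = df.groupby('user_id').mean()

# single machine: pick threads or processes at compute time
res.compute(scheduler='threads')
res.compute(scheduler='processes')

# cluster: connect a client to a running scheduler;
# it becomes the default, so compute() now uses the workers
from dask.distributed import Client
client = Client('scheduler-host:8786')   # hypothetical address
res.compute()
```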
---
# Visual dashboards

![:scale 100%](img/daskboard.gif)

---
# Visual dashboards

Filtering + creating new column + groupby operation
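A pipeline of that shape might look like the sketch below (the file pattern and column names are made up):

```python
import dask.dataframe as dd
from dask.distributed import Client

client = Client()                 # local workers + the dashboard

df = dd.read_csv('2015-*-*.csv')  # hypothetical files

subset = df[df['value'] > 0]                          # filtering
subset['ratio'] = subset['value'] / subset['total']   # new column
result = subset.groupby('key')['ratio'].mean()        # groupby

result.compute()   # tasks stream through the dashboard while this runs
```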
---
# To summarise: Dask is

A dynamic task scheduler for arbitrary computations

* **Familiar**: Implements the NumPy/Pandas interfaces
* **Flexible**: Handles arbitrary task graphs efficiently (custom workloads, integration with other projects)
* **Fast**: Optimized for demanding applications
* **Scales up**: Runs resiliently on clusters with 1000s of cores
* **Scales down**: Pragmatic on a laptop
* **Responsive**: Designed for interactive computing

### Dask ***builds on*** the existing Python ecosystem.

---
class: center

### Acknowledgements: slides partly based on material from Dask developers Matthew Rocklin and Jim Crist (Continuum Analytics)

# http://dask.pydata.org

![:scale 90%](img/grid_search_schedule.gif)

---
# About me

- Researcher at Vrije Universiteit Brussel (VUB), and contractor for Continuum Analytics
- PhD in bio-science engineering, air quality research
- pandas core dev

https://github.com/jorisvandenbossche

[@jorisvdbossche](https://twitter.com/jorisvdbossche)