{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "![](img/dask_horizontal.svg) " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Карманная артиллерия датасаенса" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "У Dask хорошая документация http://dask.readthedocs.io/en/latest/\n", "\n", "Метью делает отличные слайды http://matthewrocklin.com/slides/dask-short.html\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## План сражения\n", "1. История вопроса\n", "2. Основные компоненты\n", "3. Примеры\n", "4. Экосистема\n", "5. See also - Blaze, Numba, Toolz" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Зачем\n", "\n", "\"Сообщество оказалось поймано в ловушку вычислений на одном процессоре и в памяти одного компьютера\".\n", "\n", "\"Медианный размер кластера Dask - один компьютер\".\n", "\n", "[Matthew Rocklin](https://matthewrocklin.com/)\n", " " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Проблемы Pandas:\n", "- данные должны умещаться в память\n", "- все работает в один поток. Параллельная обработка - руками" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Решение Dask\n", "* обертки для работы с данными, не умещающимися в память\n", "* автоматическая параллелизация вычислений.\n", "* просто" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## History\n", " * Parallel NumPy algorithms\n", " * Computational task scheduler (single machine)\n", " * Dataframes and Bags\n", " * Custom computations (dask.delayed)\n", " * Distributed scheduler\n", " * Asynchronous workflows (concurrent.futures)\n", " * Increased diversity of workloads\n", " * Auto-scaling\n", " * Multi-client collaboration\n", " * Other languages (Julia client exists)\n", " * Non-task-based APIs" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Батарейки прилагаются\n", " * Dask DataFrame - как pandas\n", " * Dask Array - как numpy\n", " * Dask Bag - как iterators, Toolz, PySpark\n", " * Dask-ML\n", " * distributed-joblib\n", " * dask-оптимизированные алгоритмы\n", " * dask-searchcv\n", " * dask-xgboost\n", " * dask-tensorflow\n", " * Dask-Distributed\n", " * Dask Delayed\n", " * concurrent.futures" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Dask DataFrame\n", "```\n", "import pandas as pd import dask.dataframe as dd\n", "df = pd.read_csv('2015-01-01.csv') df = dd.read_csv('2015-*-*.csv')\n", "df.groupby(df.user_id).value.mean() df.groupby(df.user_id).value.mean().compute()\n", "```" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Dask Array\n", "```\n", "import numpy as np import dask.array as da\n", "f = h5py.File('myfile.hdf5') f = h5py.File('myfile.hdf5')\n", "x = np.array(f['/small-data']) x = da.from_array(f['/big-data'], chunks=(1000, 1000))\n", "x - x.mean(axis=1) x - x.mean(axis=1).compute()\n", "```" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Dask Bag\n", "```\n", "import dask.bag as db\n", "b = db.read_text('2015-*-*.json.gz').map(json.loads)\n", "b.pluck('name').frequencies().topk(10, lambda pair: pair[1]).compute()\n", "```" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Dask Delayed\n", "```\n", "from dask import delayed\n", "L = []\n", "for fn in filenames: \n", " data = delayed(load)(fn)\n", " L.append(delayed(process)(data)) \n", "\n", "result = delayed(summarize)(L)\n", "result.compute()\n", "```" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## concurrent.futures\n", "```\n", "from 
{ "cell_type": "markdown", "metadata": {}, "source": [ "## concurrent.futures\n", "```\n", "from dask.distributed import Client\n", "client = Client('scheduler:port')\n", "\n", "# load and summarize are the same user-defined functions as above\n", "futures = []\n", "for fn in filenames:\n", "    future = client.submit(load, fn)\n", "    futures.append(future)\n", "\n", "summary = client.submit(summarize, futures)\n", "summary.result()\n", "```" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "## Examples" ] },
{ "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [], "source": [ "import dask.dataframe as dd\n", "import dask\n", "import pandas as pd\n", "from IPython.display import display\n", "import glob" ] },
{ "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [], "source": [ "column_names = ['ip', 'ts', 'request', 'code', 'fsize', 't1', 't2', 'server']\n", "params = dict(sep='\\t', header=None, names=column_names, dtype={'t2': 'object'})" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "### So I have this file here" ] },
{ "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "1860793 data2/fastimage.log.1\r\n" ] } ], "source": [ "!wc -l data2/fastimage.log.1" ] },
{ "cell_type": "code", "execution_count": 14, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "ip\n", "91.192.148.252     7950486721\n", "92.242.34.58       7960177558\n", "109.171.20.123     9410222194\n", "37.9.68.172       34845595493\n", "37.9.68.181       50381208617\n", "Name: fsize, dtype: int64\n", "CPU times: user 3.36 s, sys: 156 ms, total: 3.52 s\n", "Wall time: 3.52 s\n" ] } ], "source": [ "%%time\n", "df2 = pd.read_csv('data2/fastimage.log.1', **params)\n", "print(df2.groupby(by='ip')['fsize'].sum().sort_values().tail())" ] },
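{ "cell_type": "markdown", "metadata": {}, "source": [ "For comparison, a sketch of the same aggregation with dask.dataframe; the blocksize here is an arbitrary assumption, chosen only to split the single file into several partitions that can be parsed in parallel:\n", "```\n", "import dask.dataframe as dd\n", "\n", "# lazy: nothing is read until .compute()\n", "ddf = dd.read_csv('data2/fastimage.log.1', blocksize=64 * 1024 * 1024, **params)\n", "\n", "# partitions are parsed and aggregated in parallel\n", "print(ddf.groupby('ip')['fsize'].sum().nlargest(5).compute())\n", "```" ] },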
\n", " | ip | \n", "ts | \n", "request | \n", "code | \n", "fsize | \n", "t1 | \n", "t2 | \n", "server | \n", "
---|---|---|---|---|---|---|---|---|
0 | \n", "85.90.99.170 | \n", "[02/Apr/2018:06:25:28 +0300] | \n", "GET /download/catalog/ts161021_1.png HTTP/1.1 | \n", "200 | \n", "13881 | \n", "0.100 | \n", "0.100 | \n", "185.22.234.73:2085 | \n", "
1 | \n", "37.9.68.172 | \n", "[02/Apr/2018:06:25:28 +0300] | \n", "GET /download/catalog/tb298681_3.png?20180414 ... | \n", "200 | \n", "109503 | \n", "0.744 | \n", "0.744 | \n", "185.22.234.73:2086 | \n", "
2 | \n", "213.180.200.143 | \n", "[02/Apr/2018:06:25:28 +0300] | \n", "GET /download/catalog/tb500170_1.png?20180414 ... | \n", "200 | \n", "150901 | \n", "0.300 | \n", "0.300 | \n", "185.22.234.73:2086 | \n", "
3 | \n", "178.154.148.12 | \n", "[02/Apr/2018:06:25:28 +0300] | \n", "GET /download/catalog/tb155521_1.png?20180414 ... | \n", "200 | \n", "293674 | \n", "0.500 | \n", "0.500 | \n", "185.22.234.73:2086 | \n", "
4 | \n", "5.255.206.244 | \n", "[02/Apr/2018:06:25:28 +0300] | \n", "GET /download/catalog/tb304173_1.png?20180414 ... | \n", "200 | \n", "224071 | \n", "0.300 | \n", "0.300 | \n", "185.22.234.73:2086 | \n", "
{ "cell_type": "code", "execution_count": 12, "metadata": {}, "outputs": [], "source": [ "from dask.distributed import Client, LocalCluster" ] },
\n" ], "text/plain": [ "VBox(children=(HTML(value='Workers | 4 |
---|---|
Cores | 4 |
Memory | 33.61 GB |
Dashboard: http://127.0.0.1:8787/status
\\n')))" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "cluster = LocalCluster()\n", "cluster" ] }, { "cell_type": "code", "execution_count": 14, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[('INFO', 'distributed.scheduler - INFO - Clear task state'),\n", " ('INFO',\n", " 'distributed.scheduler - INFO - Scheduler at: tcp://127.0.0.1:40133'),\n", " ('INFO',\n", " 'distributed.scheduler - INFO - bokeh at: 127.0.0.1:8787'),\n", " ('INFO', 'distributed.scheduler - INFO - Register tcp://127.0.0.1:44610'),\n", " ('INFO', 'distributed.scheduler - INFO - Register tcp://127.0.0.1:34695'),\n", " ('INFO',\n", " 'distributed.scheduler - INFO - Starting worker compute stream, tcp://127.0.0.1:44610'),\n", " ('INFO',\n", " 'distributed.scheduler - INFO - Starting worker compute stream, tcp://127.0.0.1:34695'),\n", " ('INFO', 'distributed.scheduler - INFO - Register tcp://127.0.0.1:34104'),\n", " ('INFO',\n", " 'distributed.scheduler - INFO - Starting worker compute stream, tcp://127.0.0.1:34104'),\n", " ('INFO', 'distributed.scheduler - INFO - Register tcp://127.0.0.1:36931'),\n", " ('INFO',\n", " 'distributed.scheduler - INFO - Starting worker compute stream, tcp://127.0.0.1:36931')]" ] }, "execution_count": 14, "metadata": {}, "output_type": "execute_result" } ], "source": [ "cluster.scheduler.get_logs()" ] }, { "cell_type": "code", "execution_count": 15, "metadata": {}, "outputs": [ { "data": { "text/html": [ "\n",
"Client\n", "
| \n",
"\n",
"Cluster\n", "
| \n",
"