Dask and local Kubernetes (minikube)

Motivation

I want to use Dask, which implements the NumPy and Pandas APIs, to operate on data distributed over a Kubernetes compute cluster. This approach makes it easy to scale the available resources up on demand, for example with cloud services.

Setup

First, install KVM and libvirt and add your user to the libvirt groups:

sudo apt-get install qemu-kvm libvirt-clients libvirt-daemon-system
sudo adduser $USER libvirt
sudo adduser $USER libvirt-qemu

Log out and back in so the new group memberships take effect, then check that libvirt is reachable:

virsh --connect qemu:///system list --all

Now install minikube, a single-node variant of Kubernetes, together with the KVM2 driver:

curl -LO https://github.com/kubernetes/minikube/releases/download/v0.25.0/minikube_0.25-0.deb
sudo dpkg --force-depends -i minikube_0.25-0.deb

curl -LO https://storage.googleapis.com/minikube/releases/latest/docker-machine-driver-kvm2 && chmod +x docker-machine-driver-kvm2 && sudo mv docker-machine-driver-kvm2 /usr/local/bin/

Start minikube with the KVM driver:

minikube start --memory 8192 --cpus 2 --vm-driver kvm2

The result is:

Starting local Kubernetes v1.10.0 cluster...
Starting VM...
Getting VM IP address...
Moving files into cluster...
Setting up certs...
Connecting to cluster...
Setting up kubeconfig...
Starting cluster components...
Kubectl is now configured to use the cluster.
Loading cached images from config file.
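
At this point kubectl is configured to talk to the new cluster; a quick sanity check is to list the nodes (exact output will vary):

kubectl get nodes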

Now that minikube is ready, start the dashboard:

minikube dashboard
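
This opens the Kubernetes dashboard in a browser. If you only want the URL, to open it manually, minikube can print it instead:

minikube dashboard --url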

Currently kubectl points at the minikube cluster; this can be switched to another cluster in the future. List the available contexts with:

kubectl config get-contexts
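
Switching is a one-liner; for example, to select the minikube context explicitly:

kubectl config use-context minikube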

Execute Python on the cluster
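
The dask_kubernetes package provides the KubeCluster class used below; if it is not already installed, it is available from PyPI:

pip install dask-kubernetes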

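The worker-spec.yml file referenced below describes the pod that each Dask worker runs in. A minimal sketch, adapted from the dask-kubernetes documentation; the image, labels, worker arguments, and resource figures are illustrative values sized to fit the 8 GB minikube VM above, not requirements:

kind: Pod
metadata:
  labels:
    app: dask-worker  # illustrative label
spec:
  restartPolicy: Never
  containers:
  - name: dask
    image: daskdev/dask:latest  # illustrative image choice
    args: [dask-worker, --nthreads, '1', --memory-limit, 2GB, --death-timeout, '60']
    resources:
      limits:
        cpu: "1"
        memory: 2G
      requests:
        cpu: "0.5"
        memory: 2G
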
  from dask_kubernetes import KubeCluster
  from dask.distributed import Client

  # Launch Dask workers as pods on the Kubernetes cluster
  cluster = KubeCluster.from_yaml('worker-spec.yml')
  cluster.scale(3)  # request three worker pods

  # Connect a client so that compute() runs on the cluster
  client = Client(cluster)

  import dask.dataframe as dd
  df = dd.demo.make_timeseries('2000-01-01', '2000-12-31', freq='1s', partition_freq='1M',
                               dtypes={'name': str, 'id': int, 'x': float, 'y': float})

  df2 = df[df.y > 0]
  df3 = df2.groupby('name').x.std()
  # This runs on the cluster
  computed_df = df3.compute()
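
Instead of a fixed worker count, KubeCluster can also scale adaptively with the workload; the bounds here are examples:

  cluster.adapt(minimum=1, maximum=10)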
