The readme at https://github.com/big-data-europe/docker-spark/tree/master/template/python claims an example application is coming soon, but that line was written four years ago, so there's no time like the present to make my own!
I will assume you have:
- docker installed
- docker-machine installed and configured (I used version 0.10.0)
- docker-compose installed (I used version 1.12.0)
- Basic working knowledge of docker
- Basic working knowledge of Spark
All of the files described can be found at https://github.com/willardmr/PySparkDockerExample/, so feel free to just pull them if you don’t want to follow along.
Let’s begin!
- Create a directory to hold your project. All the files we create will go in that directory.
- Create a file named entrypoint.py to hold your PySpark job. Mine counts the lines that contain occurrences of the word “the” in a file. I just picked a random file to run it on that was available in the docker container. Your file could look like:
```python
import pyspark

sc = pyspark.SparkContext('local[*]')
txt = sc.textFile('file:////usr//share//X11//Xcms.txt')
python_lines = txt.filter(lambda line: 'the' in line.lower())
print("The number of lines containing 'the' in your file is: ", python_lines.count())
```

I took this example from https://realpython.com/pyspark-intro/ so go there if you want more explanation.
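Before wiring this into Docker, you can sanity-check the filter logic in plain Python with no Spark involved. This is just a sketch: `contains_the` is a hypothetical helper mirroring the lambda in the job, and the sample lines are made up.

```python
# Plain-Python check of the predicate used in the Spark job above.
# contains_the is a hypothetical helper; the job itself inlines the lambda.
def contains_the(line):
    return 'the' in line.lower()

sample_lines = [
    "The quick brown fox",   # matches: "The" lowercases to "the"
    "jumps over",            # no match
    "a lazy dog, together",  # matches: "together" contains "the"
]

matching = [line for line in sample_lines if contains_the(line)]
print(len(matching))  # 2
```

Note the predicate matches "the" anywhere in the lowercased line, so words like "together" count too, which is exactly what the Spark job will do.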
- Create a Python requirements.txt file that contains the line:

```
pyspark==2.4.5
```
- Now we can extend the template from https://github.com/big-data-europe/docker-spark/tree/master/template/python by creating a Dockerfile that contains:
```dockerfile
FROM bde2020/spark-python-template:2.4.0-hadoop2.7
ENV SPARK_APPLICATION_PYTHON_LOCATION=/app/entrypoint.py
```
- So far we can run a PySpark job in Docker, but we don’t have a Spark cluster to run the job on. We can follow the docker-compose instructions from https://github.com/big-data-europe/docker-spark and add the PySpark job we just created. Our docker-compose.yml template ends up looking like:
```yaml
version: "2"
services:
  spark-master:
    image: bde2020/spark-master:2.4.5-hadoop2.7
    container_name: spark-master
    ports:
      - "8080:8080"
      - "7077:7077"
    environment:
      - ENABLE_INIT_DAEMON=false
  spark-worker-1:
    image: bde2020/spark-worker:2.4.5-hadoop2.7
    container_name: spark-worker-1
    depends_on:
      - spark-master
    ports:
      - "8082:8081"
    environment:
      - "SPARK_MASTER=spark://spark-master:7077"
  spark-worker-2:
    image: bde2020/spark-worker:2.4.5-hadoop2.7
    container_name: spark-worker-2
    depends_on:
      - spark-master
    ports:
      - "8083:8081"
    environment:
      - "SPARK_MASTER=spark://spark-master:7077"
  py-spark:
    build: .
    depends_on:
      - spark-master
    environment:
      - ENABLE_INIT_DAEMON=false
      - "SPARK_MASTER=spark://spark-master:7077"
```

Our py-spark service is built using the Dockerfile we wrote and will only start after spark-master is initialized. I don't know anything about ENABLE_INIT_DAEMON=false so don't even ask.
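The template image takes care of submitting entrypoint.py against the cluster, but if you want the script itself to honor the SPARK_MASTER variable from docker-compose.yml instead of hard-coding `local[*]`, one option is to resolve the master URL from the environment. This is a sketch, not part of the template; `resolve_master` is a hypothetical helper.

```python
import os

def resolve_master(env=None):
    # Hypothetical helper: prefer the SPARK_MASTER env var set in
    # docker-compose.yml, and fall back to local mode when it is unset
    # (e.g. when running the script outside the cluster).
    env = os.environ if env is None else env
    return env.get("SPARK_MASTER", "local[*]")

# In entrypoint.py one could then write:
#   sc = pyspark.SparkContext(resolve_master())
print(resolve_master({"SPARK_MASTER": "spark://spark-master:7077"}))
# spark://spark-master:7077
print(resolve_master({}))
# local[*]
```

This keeps the same script usable both standalone and inside the compose cluster.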
- Now let's run it! Execute:
```shell
docker-compose build && docker-compose run py-spark
```
The output should look like this:

[screenshot of the job output]
As you can see, the PySpark job was distributed across the two workers and successfully printed the output!
- …Profit?
Hi Max, great article. Did you maybe try to use the pyspark.ml library? It throws an error 'ModuleNotFoundError: No module named numpy'. Do you know how to deal with this problem?
Hi Nikola, great question. The solution should be as easy as adding numpy to the requirements.txt file, but it is not. It looks like numpy relies on gcc, and gcc isn't installed in the Spark image I am using, so the installation of numpy fails. I figured out a workaround in a new repo: https://github.com/willardmr/PySparkMLDockerExample. The specific changes I had to make are here: https://github.com/willardmr/PySparkMLDockerExample/commit/09b946df2092e8aa79cc530223a6672b20b91a9c. I hope this helps!
Thank you very much, it works great and helped me a lot. I have come across another problem: adding jar files, when I tried to create an application with Spark Streaming integration with Kafka. This is my docker-compose file:
```yaml
version: "3"
services:
  # BigData2 - Python spark streaming
  streaming-consumer:
    build: ./streaming-consumer
    image: big-data_streaming-consumer:latest
    container_name: streaming-consumer
    depends_on:
      - spark-master
      - kafka
      - mongo
      - streaming-producer
    environment:
      SPARK_MASTER: spark://spark-master:7077
      MONGO_URL: mongo
      KAFKA_URL: kafka:9092
      SPARK_APPLICATION_ARGS: ''
      ENABLE_INIT_DAEMON: 'false'
  streaming-producer:
    build: ./streaming-producer
    image: big-data_streaming-producer:latest
    container_name: streaming-producer
    depends_on:
      - spark-master
      - namenode
      - kafka
    volumes:
      - ./big-data/taxi-data.csv:/data/data.csv
    environment:
      KAFKA_URL: kafka:9092
      SPARK_APPLICATION_ARGS: ''
      ENABLE_INIT_DAEMON: 'false'
  mongo:
    image: mongo
    restart: always
    environment:
      MONGO_INITDB_ROOT_USERNAME: root
      MONGO_INITDB_ROOT_PASSWORD: example
    volumes:
      - /my/own/datadir:/data/db
    ports:
      - 27017:27017
  mongo-express:
    image: mongo-express
    restart: always
    ports:
      - 8081:8081
    environment:
      ME_CONFIG_MONGODB_ADMINUSERNAME: root
      ME_CONFIG_MONGODB_ADMINPASSWORD: example
    depends_on:
      - mongo
  # KAFKA
  zookeeper:
    image: wurstmeister/zookeeper:3.4.6
    container_name: zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka:2.12-2.4.0
    container_name: kafka
    expose:
      - "9092"
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_HOST_NAME: kafka
  # SPARK
  spark-master:
    image: bde2020/spark-master:2.4.3-hadoop2.7
    container_name: spark-master
    ports:
      - "8080:8080"
      - "7077:7077"
    environment:
      - INIT_DAEMON_STEP=false
  spark-worker-1:
    image: bde2020/spark-worker:2.4.3-hadoop2.7
    container_name: spark-worker-1
    depends_on:
      - spark-master
    ports:
      - "8082:8081"
    environment:
      - "SPARK_MASTER=spark://spark-master:7077"
  spark-worker-2:
    image: bde2020/spark-worker:2.4.3-hadoop2.7
    container_name: spark-worker-2
    depends_on:
      - spark-master
    ports:
      - "8083:8081"
    environment:
      - "SPARK_MASTER=spark://spark-master:7077"
  # HADOOP
  namenode:
    image: bde2020/hadoop-namenode:2.0.0-hadoop3.2.1-java8
    container_name: namenode
    ports:
      - 9870:9870
      - 9000:9000
    volumes:
      - hadoop_namenode:/hadoop/dfs/name
      - ./big-data/taxi-data.csv:/big-data/data.csv
    environment:
      - CLUSTER_NAME=test
    env_file:
      - ./hadoop.env
  datanode:
    image: bde2020/hadoop-datanode:2.0.0-hadoop3.2.1-java8
    container_name: datanode
    ports:
      - 9864:9864
    volumes:
      - hadoop_datanode:/hadoop/dfs/data
    environment:
      SERVICE_PRECONDITION: "namenode:9870"
    env_file:
      - ./hadoop.env
  resourcemanager:
    image: bde2020/hadoop-resourcemanager:2.0.0-hadoop3.2.1-java8
    container_name: resourcemanager
    ports:
      - 8088:8088
    depends_on:
      - namenode
      - datanode
    environment:
      SERVICE_PRECONDITION: "namenode:9000 namenode:9870 datanode:9864"
    env_file:
      - ./hadoop.env
  nodemanager:
    image: bde2020/hadoop-nodemanager:2.0.0-hadoop3.2.1-java8
    container_name: nodemanager
    depends_on:
      - namenode
      - datanode
      - resourcemanager
    environment:
      SERVICE_PRECONDITION: "namenode:9000 namenode:9870 datanode:9864 resourcemanager:8088"
    env_file:
      - ./hadoop.env
  historyserver:
    image: bde2020/hadoop-historyserver:2.0.0-hadoop3.2.1-java8
    container_name: historyserver
    depends_on:
      - namenode
      - datanode
      - resourcemanager
    environment:
      SERVICE_PRECONDITION: "namenode:9000 namenode:9870 datanode:9864 resourcemanager:8088"
    volumes:
      - hadoop_historyserver:/hadoop/yarn/timeline
    env_file:
      - ./hadoop.env
volumes:
  hadoop_namenode:
  hadoop_datanode:
  hadoop_historyserver:
```
I tried adding jar files manually, but I constantly get a java.lang.ClassNotFoundException: org.apache.spark.streaming.kafka.KafkaRDDPartition exception. This is the Dockerfile for the streaming-consumer container:
```dockerfile
FROM bde2020/spark-python-template:2.4.3-hadoop2.7
ENV SPARK_APPLICATION_PYTHON_LOCATION=/app/kafka_consumer.py
ENV PYSPARK_SUBMIT_ARGS='--jars spark-streaming-kafka-0-8-assembly_2.11-2.4.3.jar pyspark-shell'
#ENV SPARK_APPLICATION_ARGS ""
# Copy in /spark/jars - spark 2.4.3, kafka 0.8, scala 2.11
COPY spark-streaming-kafka-0-8-assembly_2.11-2.4.3.jar /spark/jars/spark-streaming-kafka-0-8-assembly_2.11-2.4.3.jar
COPY spark-streaming-kafka-0-8-assembly_2.11-2.4.3.jar usr/lib/python3.7/site-packages/pyspark/jars/spark-streaming-kafka-0-8-assembly_2.11-2.4.3.jar
```
I wanted to make a simple application in which the producer reads data from the file system and the consumer processes it using a direct stream and a foreachRDD operation, then places it in the MongoDB database. I know Kafka 0.8 is deprecated, but I don't think there is support for Kafka 0.10 in Python yet. I just can't find a way to integrate Spark Streaming with Kafka in PySpark. Do you have any idea about this problem? Thanks in advance for your help; you've already helped a lot.
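For readers following this thread: with the spark-streaming-kafka-0-8 assembly jar on the classpath, the consumer wiring in Spark 2.4 would look roughly like the commented outline below. Since the stream itself only runs against a live cluster and broker, the only part demonstrated executably here is the per-record transform; `parse_record`, the topic name, and the field layout are all hypothetical, and `save_to_mongo` is left unimplemented.

```python
# Outline of the direct-stream wiring (requires a running cluster + Kafka,
# and the spark-streaming-kafka-0-8 assembly jar on the classpath):
#
#   from pyspark import SparkContext
#   from pyspark.streaming import StreamingContext
#   from pyspark.streaming.kafka import KafkaUtils
#
#   sc = SparkContext(appName="taxi-consumer")
#   ssc = StreamingContext(sc, 10)  # 10-second micro-batches
#   stream = KafkaUtils.createDirectStream(
#       ssc, ["TAXI_TOPIC"], {"metadata.broker.list": "kafka:9092"})
#   stream.map(lambda kv: parse_record(kv[1])) \
#         .filter(lambda rec: rec is not None) \
#         .foreachRDD(save_to_mongo)  # save_to_mongo: write each RDD to MongoDB
#   ssc.start()
#   ssc.awaitTermination()

def parse_record(line):
    """Hypothetical per-record transform: split a CSV line read from the
    topic and drop malformed rows. A real taxi record has more columns."""
    parts = line.split(",")
    if len(parts) < 2:
        return None
    return {"id": parts[0], "value": parts[1]}

print(parse_record("42,7.5"))     # {'id': '42', 'value': '7.5'}
print(parse_record("malformed"))  # None
```

Keeping the transform as a plain named function like this also makes it unit-testable without spinning up the whole compose stack.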
Hi Nikola, what you are trying to do is complicated, and based on what you included here I can't tell what is going wrong. If you would like me to take a look, I need a complete reproduction case that I can pull and run from GitHub or some other public git repo to get the error you are seeing.
Hi Max, thank you for the response. Here is the link to the repo, https://github.com/bozin96/big-data, and the data I'm using can be downloaded from https://www.kaggle.com/kentonnlp/2014-new-york-city-taxi-trips. The problem is in running the second project. Thanks again for the reply, and I hope you will find what the problem is.
After I add the environment variable KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true", I get a different error: "Couldn't find leaders for Set([TAXI_TOPIC,0])". I am having a hard time following what you are doing because it is a bit messy, so I can't really make more progress. If you can describe what you want, I will consider writing a blog post with a simple implementation.
I wanted to create a simple application that mimics the producer/consumer pattern using Spark Streaming integration with Kafka, or Structured Streaming integration with Kafka. The producer process reads the file line by line and sends each line to a Kafka topic, while the consumer reads the data from the topic in time windows, does some simple filtering, and then stores the data in some database. Either way, thanks for the time you spent.