Introduction to PySpark on Docker

The readme at https://github.com/big-data-europe/docker-spark/tree/master/template/python claims an example application is coming soon, but that line was written 4 years ago, so no time like the present to make my own!

I will assume you have:

  • docker installed
  • docker-machine installed and configured (I used version 0.10.0)
  • docker-compose installed (I used version 1.12.0)
  • Basic working knowledge of docker
  • Basic working knowledge of Spark

All of the files described can be found at https://github.com/willardmr/PySparkDockerExample/, so feel free to just pull them if you don’t want to follow along.

Let’s begin!

  1. Create a directory to hold your project. All the files we create will go in that directory.
  2. Create a file named entrypoint.py to hold your PySpark job. Mine counts the lines containing the word “the” in a file. I just picked a random file to run it on that was available in the Docker container. Your file could look like:
    import pyspark
    
    sc = pyspark.SparkContext('local[*]')
    
    txt = sc.textFile('file:///usr/share/X11/Xcms.txt')
    
    python_lines = txt.filter(lambda line: 'the' in line.lower())
    
    print("The number of lines containing 'the' in your file is: ", python_lines.count())

    I took this example from https://realpython.com/pyspark-intro/ so go there if you want more explanation.
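    The filtering step is ordinary Python, so you can sanity-check the lambda locally before submitting anything to a cluster. A quick sketch (the sample lines are my own, not from the tutorial):

```python
# Same predicate as the Spark job, applied to plain Python strings.
lines = [
    "The quick brown fox",
    "jumped over",
    "the lazy dog",
]

# Case-insensitive substring match, exactly like txt.filter(...) above.
matches = [line for line in lines if 'the' in line.lower()]

print(len(matches))  # 2
```

    This is only the local logic; Spark's value is running the same filter in parallel across partitions of a large file.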

  3. Create a Python requirements.txt file that contains the line:
    pyspark==2.4.5
  4. Now we can extend the template from https://github.com/big-data-europe/docker-spark/tree/master/template/python by creating a Dockerfile that contains:
    FROM bde2020/spark-python-template:2.4.0-hadoop2.7
    
    ENV SPARK_APPLICATION_PYTHON_LOCATION=/app/entrypoint.py
  5. So far we can run a PySpark job in Docker, but we don’t have a Spark cluster to run the job on. We can follow the docker-compose instructions from https://github.com/big-data-europe/docker-spark and add the PySpark job we just created. Our docker-compose.yml template ends up looking like:
    version: "2"
    services:
      spark-master:
        image: bde2020/spark-master:2.4.5-hadoop2.7
        container_name: spark-master
        ports:
          - "8080:8080"
          - "7077:7077"
        environment:
          - ENABLE_INIT_DAEMON=false
      spark-worker-1:
        image: bde2020/spark-worker:2.4.5-hadoop2.7
        container_name: spark-worker-1
        depends_on:
          - spark-master
        ports:
          - "8082:8081"
        environment:
          - "SPARK_MASTER=spark://spark-master:7077"
      spark-worker-2:
        image: bde2020/spark-worker:2.4.5-hadoop2.7
        container_name: spark-worker-2
        depends_on:
          - spark-master
        ports:
          - "8083:8081"
        environment:
          - "SPARK_MASTER=spark://spark-master:7077"
      py-spark:
        build: .
        depends_on:
          - spark-master
        environment:
          - ENABLE_INIT_DAEMON=false
          - "SPARK_MASTER=spark://spark-master:7077"

    Our py-spark service is built using the Dockerfile we wrote and will only start after spark-master is initialized. I don’t know anything about ENABLE_INIT_DAEMON=false so don’t even ask.

  6. Now let’s run it! Execute
    docker-compose build && docker-compose run py-spark

    The output should look like this (screenshot: pyspark_docker):

    As you can see, the PySpark job was distributed across the two workers and successfully printed the output!

  7. …Profit?

7 thoughts on “Introduction to PySpark on Docker”

  1. Hi Max, great article. Did you maybe try to use the pyspark.ml library? It throws the error ModuleNotFoundError: No module named 'numpy'. Do you know how to deal with this problem?

    1. Hi Nikola, great question. The solution to this problem should be as easy as adding numpy to the requirements.txt file but it is not. It looks like numpy relies on gcc and gcc isn’t installed on the spark image I am using so the installation of numpy fails. I figured out a workaround in a new repo: https://github.com/willardmr/PySparkMLDockerExample. The specific changes I had to make are here: https://github.com/willardmr/PySparkMLDockerExample/commit/09b946df2092e8aa79cc530223a6672b20b91a9c. I hope this helps!
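    For readers hitting the same wall, the general shape of such a workaround is to make a C toolchain available before pip builds numpy. A hypothetical sketch, assuming an Alpine-based image (the linked repo and commit show what was actually done; note that if the template installs requirements via ONBUILD triggers, the toolchain may need to go into a custom base image instead):

```
FROM bde2020/spark-python-template:2.4.0-hadoop2.7

# numpy compiles C extensions, so install a compiler and headers first.
# apk is the Alpine package manager; adjust if the base image differs.
RUN apk add --no-cache gcc musl-dev python3-dev

ENV SPARK_APPLICATION_PYTHON_LOCATION=/app/entrypoint.py
```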

      1. Thank you very much, it works great, it helped me a lot. I have another problem I came across, and that is adding jar files, when I tried to create an application with spark-streaming integration with Kafka. This is my docker-compose file:
        version: "3"
        services:
          # BigData2 - Python spark streaming
          streaming-consumer:
            build: ./streaming-consumer
            image: big-data_streaming-consumer:latest
            container_name: streaming-consumer
            depends_on:
              - spark-master
              - kafka
              - mongo
              - streaming-producer
            environment:
              SPARK_MASTER: spark://spark-master:7077
              MONGO_URL: mongo
              KAFKA_URL: kafka:9092
              SPARK_APPLICATION_ARGS: ''
              ENABLE_INIT_DAEMON: 'false'
          streaming-producer:
            build: ./streaming-producer
            image: big-data_streaming-producer:latest
            container_name: streaming-producer
            depends_on:
              - spark-master
              - namenode
              - kafka
            volumes:
              - ./big-data/taxi-data.csv:/data/data.csv
            environment:
              KAFKA_URL: kafka:9092
              SPARK_APPLICATION_ARGS: ''
              ENABLE_INIT_DAEMON: 'false'
          mongo:
            image: mongo
            restart: always
            environment:
              MONGO_INITDB_ROOT_USERNAME: root
              MONGO_INITDB_ROOT_PASSWORD: example
            volumes:
              - /my/own/datadir:/data/db
            ports:
              - 27017:27017
          mongo-express:
            image: mongo-express
            restart: always
            ports:
              - 8081:8081
            environment:
              ME_CONFIG_MONGODB_ADMINUSERNAME: root
              ME_CONFIG_MONGODB_ADMINPASSWORD: example
            depends_on:
              - mongo
          # KAFKA
          zookeeper:
            image: wurstmeister/zookeeper:3.4.6
            container_name: zookeeper
            ports:
              - "2181:2181"
          kafka:
            image: wurstmeister/kafka:2.12-2.4.0
            container_name: kafka
            expose:
              - "9092"
            environment:
              KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
              KAFKA_ADVERTISED_HOST_NAME: kafka
          # SPARK
          spark-master:
            image: bde2020/spark-master:2.4.3-hadoop2.7
            container_name: spark-master
            ports:
              - "8080:8080"
              - "7077:7077"
            environment:
              - INIT_DAEMON_STEP=false
          spark-worker-1:
            image: bde2020/spark-worker:2.4.3-hadoop2.7
            container_name: spark-worker-1
            depends_on:
              - spark-master
            ports:
              - "8082:8081"
            environment:
              - "SPARK_MASTER=spark://spark-master:7077"
          spark-worker-2:
            image: bde2020/spark-worker:2.4.3-hadoop2.7
            container_name: spark-worker-2
            depends_on:
              - spark-master
            ports:
              - "8083:8081"
            environment:
              - "SPARK_MASTER=spark://spark-master:7077"
          # HADOOP
          namenode:
            image: bde2020/hadoop-namenode:2.0.0-hadoop3.2.1-java8
            container_name: namenode
            ports:
              - 9870:9870
              - 9000:9000
            volumes:
              - hadoop_namenode:/hadoop/dfs/name
              - ./big-data/taxi-data.csv:/big-data/data.csv
            environment:
              - CLUSTER_NAME=test
            env_file:
              - ./hadoop.env
          datanode:
            image: bde2020/hadoop-datanode:2.0.0-hadoop3.2.1-java8
            container_name: datanode
            ports:
              - 9864:9864
            volumes:
              - hadoop_datanode:/hadoop/dfs/data
            environment:
              SERVICE_PRECONDITION: "namenode:9870"
            env_file:
              - ./hadoop.env
          resourcemanager:
            image: bde2020/hadoop-resourcemanager:2.0.0-hadoop3.2.1-java8
            container_name: resourcemanager
            ports:
              - 8088:8088
            depends_on:
              - namenode
              - datanode
            environment:
              SERVICE_PRECONDITION: "namenode:9000 namenode:9870 datanode:9864"
            env_file:
              - ./hadoop.env
          nodemanager:
            image: bde2020/hadoop-nodemanager:2.0.0-hadoop3.2.1-java8
            container_name: nodemanager
            depends_on:
              - namenode
              - datanode
              - resourcemanager
            environment:
              SERVICE_PRECONDITION: "namenode:9000 namenode:9870 datanode:9864 resourcemanager:8088"
            env_file:
              - ./hadoop.env
          historyserver:
            image: bde2020/hadoop-historyserver:2.0.0-hadoop3.2.1-java8
            container_name: historyserver
            depends_on:
              - namenode
              - datanode
              - resourcemanager
            environment:
              SERVICE_PRECONDITION: "namenode:9000 namenode:9870 datanode:9864 resourcemanager:8088"
            volumes:
              - hadoop_historyserver:/hadoop/yarn/timeline
            env_file:
              - ./hadoop.env
        volumes:
          hadoop_namenode:
          hadoop_datanode:
          hadoop_historyserver:
        I tried adding the jar files manually, but I constantly get a java.lang.ClassNotFoundException: org.apache.spark.streaming.kafka.KafkaRDDPartition exception. This is the Dockerfile for the streaming-consumer container:
        FROM bde2020/spark-python-template:2.4.3-hadoop2.7
        ENV SPARK_APPLICATION_PYTHON_LOCATION=/app/kafka_consumer.py
        ENV PYSPARK_SUBMIT_ARGS='--jars spark-streaming-kafka-0-8-assembly_2.11-2.4.3.jar pyspark-shell'
        #ENV SPARK_APPLICATION_ARGS ""
        # Copy into /spark/jars - spark 2.4.3, kafka 0.8, scala 2.11
        COPY spark-streaming-kafka-0-8-assembly_2.11-2.4.3.jar /spark/jars/spark-streaming-kafka-0-8-assembly_2.11-2.4.3.jar
        COPY spark-streaming-kafka-0-8-assembly_2.11-2.4.3.jar /usr/lib/python3.7/site-packages/pyspark/jars/spark-streaming-kafka-0-8-assembly_2.11-2.4.3.jar
        I wanted to make a simple application in which the producer reads data from the file system and the consumer processes it using a direct stream and a foreachRDD operation, then places the results in a MongoDB database. I know Kafka 0.8 is deprecated, but I don’t think there is any support for Kafka 0.10 in Python yet. I just can’t find a way to integrate Spark Streaming with Kafka in PySpark. Do you have any idea about this problem? Thanks in advance for your help; you’ve already helped a lot.

      2. Hi Nikola, what you are trying to do is complicated, and based on what you included here I can’t tell what is going wrong. If you would like me to take a look, I need a complete reproduction case that I can pull from GitHub or some other public git repo and run to get the error you are seeing.

    1. After I add the environment variable KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true" I get a different error: “Couldn’t find leaders for Set([TAXI_TOPIC,0])”. I am having a hard time following what you are doing because it is a bit messy, so I can’t really make more progress. If you can describe what you want, I will consider writing a blog post with a simple implementation.

      1. I wanted to create a simple application that mimics the producer/consumer pattern using Spark Streaming integration with Kafka or Structured Streaming integration with Kafka. The producer process reads the file line by line and sends each line to a Kafka topic, while the consumer reads the data from the topic in time windows, does some simple filtering, and then stores the data in some database. Either way, thanks for the time you spent.
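Stripped of Kafka, Spark, and MongoDB, the pattern described above can be sketched in plain Python with a thread-safe queue standing in for the topic and a list standing in for the database; the sample records and the "taxi" filter are illustrative only:

```python
import queue
import threading

def producer(lines, q):
    """Read 'records' line by line and push them onto the queue (the 'topic')."""
    for line in lines:
        q.put(line)
    q.put(None)  # sentinel: no more data

def consumer(q, store):
    """Pull records, apply a simple filter, and 'persist' the survivors."""
    while True:
        line = q.get()
        if line is None:
            break
        if "taxi" in line.lower():  # stand-in for the real filtering step
            store.append(line)

q = queue.Queue()
store = []
data = ["Taxi 12 pickup", "bus 3 departure", "taxi 7 dropoff"]

t1 = threading.Thread(target=producer, args=(data, q))
t2 = threading.Thread(target=consumer, args=(q, store))
t1.start(); t2.start()
t1.join(); t2.join()

print(store)  # the two taxi records survive the filter
```

The real system replaces the queue with a Kafka topic and the consumer loop with windowed stream processing, but the data flow is the same.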
