Primeros pasos en Spark/Scala montado un contenedor Docker y un API REST
Los detalles se encuentran en: Repositorio del proyecto
El dataset u.data contiene estadísticas de películas vistas antes de la fecha 4/1998. Encontrar las 10 películas más vistas del dataset.
El dataset heart.csv contiene registros médicos de pruebas cardiovasculares. Encontrar el promedio de colesterol (columna chol) de las personas entre las edades de 40 y 50 años.
Resultados esperados:
* Especificar entorno de desarrollo utilizado.
* Especificar version de Spark y Scala.
* Archivo .scala con la solución de cada uno de los problemas (1 archivo por actividad).
* Muestra de ejecución (Mostrar resultados).
* Crear un archivo .jar para ejecución en un cluster de Spark.
Previo a contextualizar sobre Hadoop, Spark y Scala se realizó lo siguiente
- Probar Scala y SBT en Windows 10 junto con Scalafiddle
- Crear un entorno de desarrollo basado en Docker para Spark y Scala
- Aprender sobre SBT y SCALA realizando ejercicios sencillos sobre el lenguaje
- Crear un contenedor con SPARK listo para funcionar en clúster con un maestro y un esclavo
- Compilar un ejemplo de SPARK-SCALA para entender el ambiente y entorno
- Realizar los ejercicios
El procedimiento es el siguiente:
- SPARK_VERSION 2.4.1
- SCALA 2.11.12
- Java 1.8.0_222
Para obtener las últimas versiones, es necesario reconstruir con el DockerFile de este repositorio. Sin embargo, al ser un contenedor se puede desplegar en cualquier entorno con ambas configuraciones
El Dockerfile fue basado en gettyimages/spark y adaptado al ejercicio (https://hub.docker.com/r/gettyimages/spark/)
- Docker (https://www.docker.com/)
- Git (https://git-scm.com/)
Opcional para instalación en windows, ya que se puede trabajar con el contenedor unicamente
- Java 8 (https://adoptopenjdk.net/)
- Scala (https://www.scala-lang.org/download/)
- SBT (https://www.scala-sbt.org/download.html)
Visualizador de código usado
- VSCode (https://code.visualstudio.com/)
- IntelliJIDEA (https://www.jetbrains.com/es-es/idea/)
- El código se encuentra mapeado a la carpeta
/scala
Para probar scripts sencillos en Scala via web
1.- Descarga del repositorio y posicionarse en la carpeta
git clone https://github.com/eocode/spark-scala.git
cd spark-scala
Ahora, se pueden seguir dos caminos, si no importa contar con las últimas versiones ejecutar la versión rápida
Usando la imagen gettyimages/spark
2.- Descargar la imagen construida y lista para usar
docker pull gettyimages/spark
Levantar el entorno con un maestro y un esclavo
docker-compose up -d
docker ps
Después de esto el entorno se encontrará habilitado:
- Master: http://localhost:8080/
- Worker: http://localhost:8081/
3.- Ingresamos a la consola de comandos para ejecutar las tareas en Scala
docker exec -it docker-spark_master_1 bash
4.- Instalar Scala y el manejador de paquetes SBT en la maquina
apt-get update && curl -OL https://www.scala-lang.org/files/archive/scala-2.11.12.deb && dpkg -i scala-2.11.12.deb && rm -f scala-2.11.12.deb
apt-get update && curl -OL https://bintray.com/artifact/download/sbt/debian/sbt-1.3.8.deb && dpkg -i sbt-1.3.8.deb && rm -f sbt-1.3.8.deb
Listo el entorno está configurado y listo
Dependiendo de los recursos puede tardar un tiempo, por la descarga, instalación y construcción de la imagen Imagen preconfigurada con versiones actualizadas, se modifico el dockerfile
2.- Construir la imagen con el ambiente Spark
docker build -t spark-scala .
El comando anterior creara una imagen spark-scala con el S.O. debian con las configuraciones necesarias.
Para verificar
docker images
Levantar el entorno con un maestro y un esclavo. Editar antes el docker-compose con el nombre de la imagen spark-scala que se acaba de instalar
docker-compose up -d
docker ps
3.- Ingresamos a la consola de comandos para ejecutar las tareas en Scala
docker exec -it docker-spark_master_1 bash
Modificar el docker-compose con el nombre de la imagen spark-scala
antes de ejecutar los siguiente
Levantar el entorno con un maestro y un esclavo
docker-compose up -d
docker ps
Después de esto el entorno se encontrará habilitado:
- Master: http://localhost:8080/
- Worker: http://localhost:8081/
3.- Ingresamos a la consola de comandos para ejecutar las tareas en Scala
docker exec -it docker-spark_master_1 bash
Los archivos fuente se encuentran dentro de la carpeta /scala
- /code (contiene el código escrito en scala)
- /data (contiene las fuentes de datos)
Dentro del contenedor los archivos se encuentran en /home
Para ejecutar el ejemplo de spark
cd /home/code/example
sbt package
/usr/spark-2.4.1/bin/spark-submit --class "SimpleApp" --master local[4] target/scala-2.11/simple-project_2.11-1.0.jar
/usr/spark-2.4.1/bin/spark-shell --jars target/scala-2.11/simple-project_2.11-1.0.jar
El dataset u.data contiene estadísticas de películas vistas antes de la fecha 4/1998. Encontrar las 10 películas más vistas del dataset.
Asumiendo que cada fila es una reproducción de la pelicula, se tomaron como visitas los ids de usuario, obteniendo lo siguiente
// /home/code/movies/src/main/scala/Movies.scala
import scala.io.Source
import org.apache.spark.sql.SparkSession
import scala.reflect.io.Directory
import java.io.File
import scala.util.Try
object Movies {
def main(args: Array[String]) {
// Session Spark
val spark = SparkSession
.builder()
.appName("Movies")
.config("spark.some.config.option", "some-value")
.getOrCreate()
// Run Task
runHeart(spark)
spark.stop()
}
private def runHeart(spark: SparkSession): Unit = {
import spark.implicits._
// Read CSV file
val readMoviesCSV = spark.read
.format("csv")
.option("sep", " ")
.option("inferSchema", "true")
.option("header", "true")
.load("/home/data/u.data")
// Remove parquet
val directory = new Directory(new File("/home/data/movies.parquet"))
if(directory.exists){
directory.deleteRecursively()
readMoviesCSV.write.parquet("/home/data/movies.parquet")
}else{
// Transform to Parket for columnar data
readMoviesCSV.write.parquet("/home/data/movies.parquet")
}
val parquetMoviesFile = spark.read.parquet("/home/data/movies.parquet")
parquetMoviesFile.createOrReplaceTempView("parquetMoviesFile")
val top = spark.sql(
"SELECT movie_id, count(user_id) views, count(distinct user_id) unique_views FROM parquetMoviesFile group by movie_id order by count(user_id) desc limit 10"
)
// Show query
top.select($"movie_id", $"views", $"unique_views").show()
}
}
Para ejecutar el código anterior:
cd /home/code/movies
sbt package
Compila el código y genera el .jar en target/scala-2.11/movies_2.11-1.0.jar
Para enviarlo a SPARK
/usr/spark-2.4.1/bin/spark-submit --class "Movies" --master local[4] target/scala-2.11/movies_2.11-1.0.jar
El dataset heart.csv contiene registros médicos de pruebas cardiovasculares. Encontrar el promedio de colesterol (columna chol) de las personas entre las edades de 40 y 50 años.
// /home/code/heart/src/main/scala/Heart.scala
import scala.io.Source
import org.apache.spark.sql.SparkSession
import scala.reflect.io.Directory
import java.io.File
import scala.util.Try
object HeartSpark {
def main(args: Array[String]) {
// Session Spark
val spark = SparkSession
.builder()
.appName("Heart")
.config("spark.some.config.option", "some-value")
.getOrCreate()
// Run Task
runHeart(spark)
spark.stop()
}
private def runHeart(spark: SparkSession): Unit = {
import spark.implicits._
// Read CSV file
val readHeartCSV = spark.read
.format("csv")
.option("sep", ",")
.option("inferSchema", "true")
.option("header", "true")
.load("/home/data/heart.csv")
// Remove parquet
val directory = new Directory(new File("/home/data/heart.parquet"))
if(directory.exists){
directory.deleteRecursively()
readHeartCSV.write.parquet("/home/data/heart.parquet")
}else{
// Transform to Parket for columnar data
readHeartCSV.write.parquet("/home/data/heart.parquet")
}
val parquetHeartFile = spark.read.parquet("/home/data/heart.parquet")
parquetHeartFile.createOrReplaceTempView("parquetHeartFile")
val age = spark.sql(
"SELECT AVG(chol) as avg FROM parquetHeartFile where age BETWEEN 40 AND 50"
)
// Show query
age.select($"avg").show()
}
}
Para ejecutar el código anterior:
cd /home/code/heart
sbt package
Compila el código y genera el .jar en target/scala-2.11/heart_2.11-1.0.jar
Para enviarlo a SPARK
/usr/spark-2.4.1/bin/spark-submit --class "HeartSpark" --master local[4] target/scala-2.11/heart_2.11-1.0.jar
Para ejecutar usando spark-shell
/usr/spark-2.4.1/bin/spark-shell --jars target/scala-2.11/heart_2.11-1.0.jar
HeartSpark.main(Array())
La tarea se puede ver en http://localhost:4040
Al final se muestran algunas tareas completadas en Spark
Conocimiento adquirido para realizar el ejercicio