Navigation

Contactez-nous

Kitpages
17 rue de la Frise
38000 Grenoble
tel : 04 58 00 33 81

Par Philippe Le Van (twitter accountplv) Dernière mise à jour : 14 January 2016

Introduction à spark et scala

Introduction

Ce tutoriel s'adresse à des gens qui ont une idée de ce que sont hadoop, spark et scala. Il se présente plus comme un aide mémoire des opérations usuelles.

Il s'appuie sur une distribution hadoop qu'on déploie avec Docker et que l'on trouve sur github :

https://github.com/kibatic/docker-single-node-hadoop

On utilise également Zeppelin. C'est un notebook qui permet d'exécuter des codes scala et de voir de façon conviviale les résultats de certaines commandes. Ca ressemble un peu au iPython Notebook.

Installation de l'environnement

# récupérer le docker
git clone https://github.com/kibatic/docker-single-node-hadoop.git
cd docker-single-node-hadoop
docker-compose build
docker-compose up -d

# entrer dans le docker
docker exec -ti dockersinglenodehadoop_hsn_1 bash

# aller voir zeppelin dans son navigateur
# aller sur http://localhost:8002

# volumes partagés par docker
# le répertoire docker_data du repository est monté sur /data à l'intérieur du docker

Quelques rappel des principes, les RDD dans spark

Spark manipule des RDD (Resilient Distributed Dataset). Des RDD sont des listes immutables.

Dans spark on enchaine des traitements sur des RDD pour obtenir de nouveaux RDD pour l'étape d'après. C'est par ces enchaînements (workflow) qu'on fait des opérations avec spark.

On a 2 types d'opérations sur le RDD :

  • Les transformations ne font que définir le workflow des RDD mais n'exécutent pas les calculs (sc.textFile, filter(), ...)
  • Les actions lancent les calculs proprement dit et renvoient un résultat (count(), saveAsTextFile,...)

Opérations usuelles

Créer un RDD à la main

// créer un RDD à la main
val lines = sc.parallelize(List("chien", "chat"))
// compter les lignes du RDD
lines.count()

lire un fichier présent dans HDFS

Dans le docker :

Créer un fichier texte chemin_faisant.txt contenant le texte suivant :

Marcheur, ce sont tes traces
ce chemin, et rien de plus ;
Marcheur, il n'y a pas de chemin,
Le chemin se construit en marchant.
En marchant se construit le chemin,
Et en regardant en arrière
On voit la sente que jamais
On ne foulera à nouveau.
Marcheur, il n'y a pas de chemin,
Seulement des sillages sur la mer.

Antonio Machado

# copier le fichier dans hdfs
hdfs dfs -put chemin_faisant.txt /tmp/chemin_faisant.txt

# vérifier qu'il a bien été transféré
hdfs dfs -put chemin_faisant.txt /tmp/chemin_faisant.txt

aller dans zeppelin à l'adresse http://localhost:8002 et créer un nouveau notebook.

Dans ce notebook, exécutez les commandes suivantes :

// créer un RDD avec le contenu du fichier.
// note : dans notre docker, localhost:9000 est le host et le port de hdfs.
var lines = sc.textFile("hdfs://localhost:9000/tmp/chemin_faisant.txt")

// compte le nombre d'élément du RDD
lines.count()

// renvoie la 1ère ligne du RDD
lines.first()

// prend les 5 premières lignes du RDD et les affiches les unes sous les autres
lines.take(5).foreach(println)

/* retour :
lines: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[15] at textFile at <console>:25
res22: Long = 12
res25: String = Marcheur, ce sont tes traces
Marcheur, ce sont tes traces 
ce chemin, et rien de plus ; 
Marcheur, il n'y a pas de chemin, 
Le chemin se construit en marchant. 
En marchant se construit le chemin,
*/

Filtrer des lignes

On va enchaîner 2 RDD

var lines = sc.textFile("hdfs://localhost:9000/tmp/chemin_faisant.txt")
var cheminLines = lines.filter(line => line.contains("chemin"))
lines.count()
cheminLines.count()

/** @returns
lines: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[19] at textFile at <console>:23
cheminLines: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[20] at filter at <console>:25
res28: Long = 12
res29: Long = 5
*/

Compter les mots

val input = sc.textFile("hdfs://localhost:9000/tmp/chemin_faisant.txt")
val words = input.flatMap(line => line.split(" "))
val counts = words.map(word => (word, 1)).reduceByKey{case (x, y) => x + y}
counts.saveAsTextFile("/data/transfert/tuto_spark/word_count_result")
// lire les fichiers part-00000 et part-00001 pour voir le résultat du calcul

Mettre un RDD en cache

var lines = sc.textFile("hdfs://localhost:9000/tmp/chemin_faisant.txt")
var cheminLines = lines.filter(line => line.contains("chemin"))

// ici, on indique que cheminLines doit être enregistré en cache
// on a donc pas à le recalculer 2 fois pour le count() et pour le first()
cheminLines.persist

cheminLines.count()
cheminLines.first()

Unir 2 RDD : union

Warning, le processus ne dédoublonne pas. Certaines lignes se retrouvent 2 fois dans le RDD final.

var lines = sc.textFile("hdfs://localhost:9000/tmp/chemin_faisant.txt")
var cheminLines = lines.filter(line => line.contains("chemin"))
var marcheurLines = lines.filter(line => line.contains("Marcheur"))
var cheminOuMarcheurLines = cheminLines.union(marcheurLines)
cheminOuMarcheurLines.persist
cheminOuMarcheurLines.count()
cheminOuMarcheurLines.first()
cheminLines.count()
marcheurLines.count()

/* returns
res83: Long = 8 
res84: String = ce chemin, et rien de plus ; 
res85: Long = 5 
res86: Long = 3
*/

map : mapping simple, 1 élément vers 1 élément

val input = sc.parallelize(List(1, 2, 3, 4))
val result = input.map(x => x + x)
result.collect()
println(result.collect().mkString(","))

/* résultat
res128: Array[Int] = Array(2, 4, 6, 8)
2,4,6,8
*/

flatMap

val lines = sc.parallelize(List("hello world", "bonjour le monde"))
val words = lines.flatMap(line => line.split(" "))
words.collect()

/* renvoie :
res131: Array[String] = Array(hello, world, bonjour, le, monde) 
*/

Quelques fonctions utiles

// des fonctions simples
var rdd = rdd1.distinct()
var rdd = rdd1.union(rdd2)
var rdd = rdd1.intersection(rdd2)
var rdd = rdd1.subtract(rdd2)

// cartesian product
val letters = sc.parallelize(List("A", "B", "C"))
var digits = sc.parallelize(List(1, 2))
var rdd = letters.cartesian(digits)
rdd.collect()
// returns Array[(String, Int)] = Array((A,1), (A,2), (B,1), (B,2), (C,1), (C,2))

Extraction de sous-ensembles

sample permet de récupérer un sous ensemble

val sample = rdd.extract(withReplacement, fraction, [seed])

val lines = sc.textFile("hdfs://localhost:9000/tmp/chemin_faisant.txt")
val extract = lines.sample(false, 0.5)
extract.collect().foreach(println)
extract.count()
lines.count()

/* résultat non prédictible, mais la moitié de l'échantillon
Marcheur, ce sont tes traces
Le chemin se construit en marchant. 
Et en regardant en arrière 
On voit la sente que jamais 
Marcheur, il n'y a pas de chemin, 
Seulement des sillages sur la mer. 
Antonio Machado

6
12
*/

Les actions possibles

// réduction simple
val input = sc.parallelize(List(1, 2, 3, 4))
input.reduce((x,y) => x+y)
// renvoie 10

// fold : idem réduction mais avec une "zero value" qui est l'identité de votre opération
val input = sc.parallelize(List(1, 2, 3, 4))
input.fold(0)((x,y) => x+y)
// renvoie 10 aussi

// renvoie toute la collection
input.collect()

// on devine ce que ça fait
input.count()
input.countByValue()
input.take(3) // 3 elements
input.top(4) // 4 top elements
input.takeOrdered(5)(ordering) // 5 élément ordonnées suivant la fonction fournie
input.takeSample(false, 12) // 12 éléments au pif
input.foreach(func) // applique la func aux élements du RDD

// aggrégation
input.aggregate(zeroValue)(seqOp, combOp) 

input.mean() // moyenne

Retour sur la persistance

La persistance peut se faire en mémoire ou sur disque ou un peu des deux.

import org.apache.spark.storage.StorageLevel
val input = sc.parallelize(List(1, 2, 3, 4))
val result = input.map(x => x + x)
result.persist(StorageLevel.DISK_ONLY)
println(result.count())
println(result.collect().mkString(","))

/*
5 types de persistance. Avec SER, c'est avec sérialisation. Ca prend plus de CPU et moins de place.
MEMORY_ONLY
MEMORY_ONLY_SER
MEMORY_AND_DISK
MEMORY_AND_DISK_SER
DISK_ONLY
*/

Commentaires

Note : on ne peut plus ajouter de commentaire sur ce site