Contactez-nous
Kitpages
17 rue de la Frise
38000 Grenoble
tel : 04 58 00 33 81
Introduction à spark et scala
Introduction
Ce tutoriel s'adresse à des gens qui ont une idée de ce que sont hadoop, spark et scala. Il se présente plus comme un aide mémoire des opérations usuelles.
Il s'appuie sur une distribution hadoop qu'on déploie avec Docker et que l'on trouve sur github :
https://github.com/kibatic/docker-single-node-hadoop
On utilise également Zeppelin. C'est un notebook qui permet d'exécuter des codes scala et de voir de façon conviviale les résultats de certaines commandes. Ca ressemble un peu au iPython Notebook.
Installation de l'environnement
# récupérer le docker git clone https://github.com/kibatic/docker-single-node-hadoop.git cd docker-single-node-hadoop docker-compose build docker-compose up -d # entrer dans le docker docker exec -ti dockersinglenodehadoop_hsn_1 bash # aller voir zeppelin dans son navigateur # aller sur http://localhost:8002 # volumes partagés par docker # le répertoire docker_data du repository est monté sur /data à l'intérieur du docker
Quelques rappel des principes, les RDD dans spark
Spark manipule des RDD (Resilient Distributed Dataset). Des RDD sont des listes immutables.
Dans spark on enchaine des traitements sur des RDD pour obtenir de nouveaux RDD pour l'étape d'après. C'est par ces enchaînements (workflow) qu'on fait des opérations avec spark.
On a 2 types d'opérations sur le RDD :
- Les transformations ne font que définir le workflow des RDD mais n'exécutent pas les calculs (sc.textFile, filter(), ...)
- Les actions lancent les calculs proprement dit et renvoient un résultat (count(), saveAsTextFile,...)
Opérations usuelles
Créer un RDD à la main
// créer un RDD à la main val lines = sc.parallelize(List("chien", "chat")) // compter les lignes du RDD lines.count()
lire un fichier présent dans HDFS
Dans le docker :
Créer un fichier texte chemin_faisant.txt contenant le texte suivant :
Marcheur, ce sont tes traces
ce chemin, et rien de plus ;
Marcheur, il n'y a pas de chemin,
Le chemin se construit en marchant.
En marchant se construit le chemin,
Et en regardant en arrière
On voit la sente que jamais
On ne foulera à nouveau.
Marcheur, il n'y a pas de chemin,
Seulement des sillages sur la mer.
Antonio Machado
# copier le fichier dans hdfs hdfs dfs -put chemin_faisant.txt /tmp/chemin_faisant.txt # vérifier qu'il a bien été transféré hdfs dfs -put chemin_faisant.txt /tmp/chemin_faisant.txt
aller dans zeppelin à l'adresse http://localhost:8002 et créer un nouveau notebook.
Dans ce notebook, exécutez les commandes suivantes :
// créer un RDD avec le contenu du fichier. // note : dans notre docker, localhost:9000 est le host et le port de hdfs. var lines = sc.textFile("hdfs://localhost:9000/tmp/chemin_faisant.txt") // compte le nombre d'élément du RDD lines.count() // renvoie la 1ère ligne du RDD lines.first() // prend les 5 premières lignes du RDD et les affiches les unes sous les autres lines.take(5).foreach(println) /* retour : lines: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[15] at textFile at <console>:25 res22: Long = 12 res25: String = Marcheur, ce sont tes traces Marcheur, ce sont tes traces ce chemin, et rien de plus ; Marcheur, il n'y a pas de chemin, Le chemin se construit en marchant. En marchant se construit le chemin, */
Filtrer des lignes
On va enchaîner 2 RDD
var lines = sc.textFile("hdfs://localhost:9000/tmp/chemin_faisant.txt") var cheminLines = lines.filter(line => line.contains("chemin")) lines.count() cheminLines.count() /** @returns lines: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[19] at textFile at <console>:23 cheminLines: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[20] at filter at <console>:25 res28: Long = 12 res29: Long = 5 */
Compter les mots
val input = sc.textFile("hdfs://localhost:9000/tmp/chemin_faisant.txt") val words = input.flatMap(line => line.split(" ")) val counts = words.map(word => (word, 1)).reduceByKey{case (x, y) => x + y} counts.saveAsTextFile("/data/transfert/tuto_spark/word_count_result") // lire les fichiers part-00000 et part-00001 pour voir le résultat du calcul
Mettre un RDD en cache
var lines = sc.textFile("hdfs://localhost:9000/tmp/chemin_faisant.txt") var cheminLines = lines.filter(line => line.contains("chemin")) // ici, on indique que cheminLines doit être enregistré en cache // on a donc pas à le recalculer 2 fois pour le count() et pour le first() cheminLines.persist cheminLines.count() cheminLines.first()
Unir 2 RDD : union
Warning, le processus ne dédoublonne pas. Certaines lignes se retrouvent 2 fois dans le RDD final.
var lines = sc.textFile("hdfs://localhost:9000/tmp/chemin_faisant.txt") var cheminLines = lines.filter(line => line.contains("chemin")) var marcheurLines = lines.filter(line => line.contains("Marcheur")) var cheminOuMarcheurLines = cheminLines.union(marcheurLines) cheminOuMarcheurLines.persist cheminOuMarcheurLines.count() cheminOuMarcheurLines.first() cheminLines.count() marcheurLines.count() /* returns res83: Long = 8 res84: String = ce chemin, et rien de plus ; res85: Long = 5 res86: Long = 3 */
map : mapping simple, 1 élément vers 1 élément
val input = sc.parallelize(List(1, 2, 3, 4)) val result = input.map(x => x + x) result.collect() println(result.collect().mkString(",")) /* résultat res128: Array[Int] = Array(2, 4, 6, 8) 2,4,6,8 */
flatMap
val lines = sc.parallelize(List("hello world", "bonjour le monde")) val words = lines.flatMap(line => line.split(" ")) words.collect() /* renvoie : res131: Array[String] = Array(hello, world, bonjour, le, monde) */
Quelques fonctions utiles
// des fonctions simples var rdd = rdd1.distinct() var rdd = rdd1.union(rdd2) var rdd = rdd1.intersection(rdd2) var rdd = rdd1.subtract(rdd2) // cartesian product val letters = sc.parallelize(List("A", "B", "C")) var digits = sc.parallelize(List(1, 2)) var rdd = letters.cartesian(digits) rdd.collect() // returns Array[(String, Int)] = Array((A,1), (A,2), (B,1), (B,2), (C,1), (C,2))
Extraction de sous-ensembles
sample permet de récupérer un sous ensemble
val sample = rdd.extract(withReplacement, fraction, [seed])
val lines = sc.textFile("hdfs://localhost:9000/tmp/chemin_faisant.txt") val extract = lines.sample(false, 0.5) extract.collect().foreach(println) extract.count() lines.count() /* résultat non prédictible, mais la moitié de l'échantillon Marcheur, ce sont tes traces Le chemin se construit en marchant. Et en regardant en arrière On voit la sente que jamais Marcheur, il n'y a pas de chemin, Seulement des sillages sur la mer. Antonio Machado 6 12 */
Les actions possibles
// réduction simple val input = sc.parallelize(List(1, 2, 3, 4)) input.reduce((x,y) => x+y) // renvoie 10 // fold : idem réduction mais avec une "zero value" qui est l'identité de votre opération val input = sc.parallelize(List(1, 2, 3, 4)) input.fold(0)((x,y) => x+y) // renvoie 10 aussi // renvoie toute la collection input.collect() // on devine ce que ça fait input.count() input.countByValue() input.take(3) // 3 elements input.top(4) // 4 top elements input.takeOrdered(5)(ordering) // 5 élément ordonnées suivant la fonction fournie input.takeSample(false, 12) // 12 éléments au pif input.foreach(func) // applique la func aux élements du RDD // aggrégation input.aggregate(zeroValue)(seqOp, combOp) input.mean() // moyenne
Retour sur la persistance
La persistance peut se faire en mémoire ou sur disque ou un peu des deux.
import org.apache.spark.storage.StorageLevel val input = sc.parallelize(List(1, 2, 3, 4)) val result = input.map(x => x + x) result.persist(StorageLevel.DISK_ONLY) println(result.count()) println(result.collect().mkString(",")) /* 5 types de persistance. Avec SER, c'est avec sérialisation. Ca prend plus de CPU et moins de place. MEMORY_ONLY MEMORY_ONLY_SER MEMORY_AND_DISK MEMORY_AND_DISK_SER DISK_ONLY */
Commentaires
Note : on ne peut plus ajouter de commentaire sur ce site