W RDD nie musimy przechowywać pojedynczych obiektów, ale możemy tam przekazywać pary obiektów, czyli tak zwane krotki lub z angielskiego tuple. Na takim RDD mamy dostęp do nowych metod usprawniających nam pracę z krotkami.
TWorzenie RDD par w programie sterownika
Scala:
val lines = Map((1,"Ala ma kota"), (2,"Witaj świecie"), (3,"Dwadzieścia tysięcy mil podmorskiej żeglugi"))
val rddPair = sc.parallelize(lines.toSeq)
rddPair.first
Python:
lines = [(1,"Ala ma kota"), (2,"Witaj świecie"), (3,"Dwadzieścia tysięcy mil podmorskiej żeglugi")]
rddPair = sc.parallelize(lines)
rddPair.first()
Transformacje na RDD par
Mapowanie wartości
Podobnie jak dla RDD z pojedynczym oboektem, RDD par udostępnia metodę map pozwalającą zmapować jedno RDD w drugie. Możemy także tej metody z powodzeniem użyć do stworzenia RDD par z zwykłego RDD jak w poniższym przykładzie gdzie mapujemy linie tekstu na krotki zawierające linię i jej długość:
Scala:
val lines = sc.textFile("README.md")
val pairs = lines.map(line => (line, line.split(" ").size))
pairs.take(10).foreach(println)
Python:
lines = sc.textFile("README.md")
pairs = lines.map(lambda line : (line, len(line.split(" "))))
pairs.take(10)
W przypadku RDD par możemy także skorzystać z metody map mapując całą krotkę na inny obiekt lub inną krotkę, ale możemy także skorzystać z funkcji mapValues oraz flatMapValues które to mapują nam same wartości z naszej krotki bez zmiany klucza, oraz w drugim przypadku dokonują także “spłaszczenia” listy tablic na jedną listę wszystkich obiektów, w tym przypadku RDD.
Scala:
val values = List(("Ala",3), ("Tomek", 4), ("Kasia", 5), ("Ala",5), ("Tomek", 3), ("Kasia", 4))
val valueRdd = sc.parallelize(values)
val countRdd = valueRdd.mapValues( v => (v, 1))
countRdd.take(10).foreach(println)
Python:
values = [("Ala",3), ("Tomek", 4), ("Kasia", 5), ("Ala",5), ("Tomek", 3), ("Kasia", 4)]
valueRdd = sc.parallelize(values)
countRdd = valueRdd.mapValues(lambda v : (v, 1))
countRdd.collect()
Redukcja wartości
Oprócz mapowania drugą bardzo popularną operacją jest agregacja której możemy dokonać za pomocą funkcji reduceByKey, combineByKey oraz groupByKey.
Funkcja groupByKey umożliwia nam zgrupowaniu wszystkich wartości występujących pod wspólnym kluczem i zwraca nowe RDD.
Python:
values = [("Ala",3), ("Tomek", 4), ("Kasia", 5), ("Ala",5), ("Tomek", 3), ("Kasia", 4)]
valueRdd = sc.parallelize(values)
group = valueRdd.groupByKey()
group.mapValues(list).collect()
Funkcja reduceByKey pozwala dodatkowo wskazać funkcję która zredukuje zgrupowane wartości do pojedynczego elementu:
Scala:
val values = List(("Ala",3), ("Tomek", 4), ("Kasia", 5), ("Ala",5), ("Tomek", 3), ("Kasia", 4))
val valueRdd = sc.parallelize(values)
val avgRdd = valueRdd.mapValues( v => (v, 1)).
reduceByKey( (x,y) => (x._1 + y._1, x._2 + y._2) ).
mapValues( x => (x._1.toDouble / x._2.toDouble) )
avgRdd.take(10).foreach(println)
Python:
values = [("Ala",3), ("Tomek", 4), ("Kasia", 5), ("Ala",5), ("Tomek", 3), ("Kasia", 4)]
valueRdd = sc.parallelize(values)
avgRdd = valueRdd.mapValues(lambda v : (v, 1)).reduceByKey(lambda x,y: (x[0] + y[0], x[1] + y[1]) ).mapValues(lambda x: ( float(x[0]) / float(x[1]) ) )
avgRdd.collect()
MapReduce w RDD
Scala:
val lines = sc.textFile("README.md")
val wordsRdd = lines.flatMap(line => line.split(" "))
val wordCount = wordsRdd.map(x => (x,1)).reduceByKey( (x,y) => x + y)
wordCount.sortByKey().take(10).foreach(println)
Python:
lines = sc.textFile("README.md")
wordsRdd = lines.flatMap(lambda line : line.split(" "))
wordCount = wordsRdd.map(lambda w: (w,1) ).reduceByKey(lambda x, y: x + y )
wordCount.sortByKey().take(10)
lub w znacznie prostszej wersji :)
Scala:
val lines = sc.textFile("README.md")
val wordsRdd = lines.flatMap(line => line.split(" "))
val wordCount = wordsRdd.countByValue()
Python:
lines = sc.textFile("README.md")
wordsRdd = lines.flatMap(lambda line : line.split(" "))
wordCount = wordsRdd.countByValue()
print(wordCount)
Sortowanie po kluczu
Python:
values = [("Ala",3), ("Tomek", 4), ("Kasia", 5), ("Ala",5), ("Tomek", 3), ("Kasia", 4)]
valueRdd = sc.parallelize(values)
sort = valueRdd.sortByKey()
sort.collect()
Zliczanie po kluczu
Python:
values = [("Ala",3), ("Tomek", 4), ("Kasia", 5), ("Ala",5), ("Tomek", 3), ("Kasia", 4)]
valueRdd = sc.parallelize(values)
count = valueRdd.countByKey()
print(count)