SQL to Scalding
Motivation
SQL is a popular language for data analytics. Scalding is a relative newcomer that is more powerful but also more complex. This document translates commonly used SQL idioms to the Scalding type-safe API, which is preferred over the fields-based API. The SQL examples use the Vertica variant, which is based on PostgreSQL and supports analytic functions. The example datasets are deliberately trivial so that it is easy to experiment in the REPL and inspect intermediate results to understand what each method does. For more on the REPL, see [[Scalding REPL]] and Learning Scalding with Alice.
Prerequisites:
- Elementary knowledge of Scala
- Basic ability to decipher types in Scalding methods
You should not expect Scalding to be as intuitive as SQL, but at the same time it is not as hard as it may seem when you see the plethora of classes and methods in the Scalding docs.
For a deeper understanding of monoids such as QTree, see Learning Algebird Monoids with REPL.
    import com.twitter.scalding._
    import com.twitter.scalding.ReplImplicits._
    import com.twitter.scalding.ReplImplicitContext._
Create datasets
SQL
    CREATE TABLE test.allsales(
      state VARCHAR(20),
      name VARCHAR(20),
      sales INT
    );
    INSERT INTO test.allsales VALUES('CA', 'A', 60);
    INSERT INTO test.allsales VALUES('CA', 'A', 20);
    INSERT INTO test.allsales VALUES('VA', 'B', 15);
    COMMIT;

    pwagle=> select * from test.allsales;
     state | name | sales
    -------+------+-------
     CA    | A    |    60
     VA    | B    |    15
     CA    | A    |    20
    (3 rows)
Scalding
    scala> case class Sale(state: String, name: String, sale: Int)
    defined class Sale

    scala> val salesList = List(Sale("CA", "A", 60), Sale("CA", "A", 20), Sale("VA", "B", 15))
    salesList: List[Sale] = List(Sale(CA,A,60), Sale(CA,A,20), Sale(VA,B,15))

    scala> val salesPipe = TypedPipe.from(salesList)
    salesPipe: com.twitter.scalding.typed.TypedPipe[Sale] = IterablePipe(List(Sale(CA,A,60), Sale(CA,A,20), Sale(VA,B,15)))
Simple Count
SQL
    pwagle=> select count(1) from test.allsales;
     count
    -------
         3
Scalding
    scala> salesPipe.groupAll.size.values.dump
    3
Count distinct
SQL
    pwagle=> select count(distinct state) from test.allsales;
     count
    -------
         2
Scalding
    scala> salesPipe.map{ _.state }.distinct.groupAll.size.values.dump
    2
Count, Count distinct, Sum in one query
SQL
    pwagle=> select count(1), count(distinct state), sum(sales) from test.allsales;
     count | count | sum
    -------+-------+-----
         3 |     2 |  95
Scalding
    scala> {
         | salesPipe.map{x => (1, Set(x.state), x.sale) }
         | .groupAll
         | .sum
         | .values
         | .map{ case(count, set, sum) => (count, set.size, sum) }
         | .dump
         | }
    (3,2,95)
The above query will have performance issues if the number of distinct states is large, because every distinct value is collected into a single in-memory Set. This can be solved in two ways:
- Group by state first (TODO)
- Using an approximate data structure like HyperLogLog (TODO)
Also see [[Aggregation using Algebird Aggregators]].
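The first option can be sketched as a two-stage aggregation. This is only a sketch meant to be pasted into the Scalding REPL: the names perState and totals are illustrative, and the Long counters are an assumption made to avoid overflow on larger inputs. Stage one sums per state, so no Set is ever built; stage two counts each surviving state once. The braces are there so the REPL accepts lines that start with a dot.

```scala
import com.twitter.scalding._
import com.twitter.scalding.ReplImplicits._
import com.twitter.scalding.ReplImplicitContext._

case class Sale(state: String, name: String, sale: Int)
val salesPipe = TypedPipe.from(
  List(Sale("CA", "A", 60), Sale("CA", "A", 20), Sale("VA", "B", 15)))

// Stage 1: (rowCount, salesSum) per state; each state appears once in the result.
val perState = {
  salesPipe.map { x => (x.state, (1L, x.sale.toLong)) }
    .sumByKey
    .toTypedPipe
}

// Stage 2: every state contributes exactly 1 to the distinct count.
val totals = {
  perState.map { case (_, (count, sum)) => (count, 1L, sum) }
    .groupAll
    .sum
    .values
}

totals.dump
```

For the three sample rows this should print (3,2,95), matching the SQL result.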
Where
SQL
    select state, name, sales from test.allsales where state = 'CA';
Scalding
    scala> salesPipe.filter(sale => (sale.state == "CA")).dump
    Sale(CA,A,60)
    Sale(CA,A,20)
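A WHERE clause with several predicates, combined with a projection, can also be written with collect, which filters and maps in one pass. The following is a small sketch for the REPL; the threshold of 30 and the name bigCaSales are made up for illustration.

```scala
import com.twitter.scalding._
import com.twitter.scalding.ReplImplicits._
import com.twitter.scalding.ReplImplicitContext._

case class Sale(state: String, name: String, sale: Int)
val salesPipe = TypedPipe.from(
  List(Sale("CA", "A", 60), Sale("CA", "A", 20), Sale("VA", "B", 15)))

// select name, sales from test.allsales where state = 'CA' and sales > 30;
// Rows that do not match the pattern or the guard are dropped.
val bigCaSales = salesPipe.collect {
  case Sale("CA", name, sale) if sale > 30 => (name, sale)
}

bigCaSales.dump
```

With the sample data only the row (A,60) should remain.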
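Order by X, Y limit N
SQL
    select state, name, sales from test.allsales order by state, name limit 1;
Scalding
    // Compare by state, then by name, to match "order by state, name".
    object SaleOrderingWithState extends Ordering[Sale] {
      def compare(a: Sale, b: Sale) = {
        val byState = a.state.compare(b.state)
        if (byState != 0) byState else a.name.compare(b.name)
      }
    }
    implicit val saleOrderingWithState = SaleOrderingWithState

    scala> salesPipe.groupAll.sorted.values.dump
    Sale(CA,A,60)
    Sale(CA,A,20)
    Sale(VA,B,15)

    scala> salesPipe.groupAll.sorted.take(1).values.dump
    Sale(CA,A,20)

    scala> salesPipe.groupAll.sortedTake(1).values.dump
    List(Sale(CA,A,20))
The two CA rows compare as equal under this ordering, so either of them may be returned when only one row is taken.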
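A more compact way to build a multi-column ordering is Ordering.by over a tuple, and a descending "order by ... desc limit N" can be expressed with sortedReverseTake. The sketch below is meant for the REPL; the names byStateAndName, first and last are illustrative and not part of the original example.

```scala
import com.twitter.scalding._
import com.twitter.scalding.ReplImplicits._
import com.twitter.scalding.ReplImplicitContext._

case class Sale(state: String, name: String, sale: Int)
val salesPipe = TypedPipe.from(
  List(Sale("CA", "A", 60), Sale("CA", "A", 20), Sale("VA", "B", 15)))

// Compare by (state, name), mirroring "order by state, name".
implicit val byStateAndName: Ordering[Sale] =
  Ordering.by((s: Sale) => (s.state, s.name))

// select ... order by state, name limit 1;
val first = salesPipe.groupAll.sortedTake(1).values

// select ... order by state desc, name desc limit 1;
val last = salesPipe.groupAll.sortedReverseTake(1).values

last.dump
```

With the sample data, last should contain the single row Sale(VA,B,15); first contains one of the two CA rows, which tie under this ordering.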
Union
SQL
    select state, name, sales from test.allsales
    UNION ALL
    select state, name, sales from test.allsales2
Scalding
    scala> val salesPipe1 = TypedPipe.from(salesList)
    salesPipe1: com.twitter.scalding.typed.TypedPipe[Sale] = IterablePipe(List(Sale(CA,A,60), Sale(CA,A,20), Sale(VA,B,15)))

    scala> val salesPipe2 = TypedPipe.from(salesList)
    salesPipe2: com.twitter.scalding.typed.TypedPipe[Sale] = IterablePipe(List(Sale(CA,A,60), Sale(CA,A,20), Sale(VA,B,15)))

    scala> (salesPipe1 ++ salesPipe2).dump
    Sale(CA,A,60)
    Sale(CA,A,20)
    Sale(VA,B,15)
    Sale(CA,A,60)
    Sale(CA,A,20)
    Sale(VA,B,15)
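The example above keeps duplicates, as UNION ALL does. A plain SQL UNION, which removes duplicate rows, can be approximated by following ++ with distinct. This is a sketch for the REPL; distinct needs an Ordering on the row type, and the full-row ordering saleOrdering defined here is an assumption introduced for the example.

```scala
import com.twitter.scalding._
import com.twitter.scalding.ReplImplicits._
import com.twitter.scalding.ReplImplicitContext._

case class Sale(state: String, name: String, sale: Int)
val salesList = List(Sale("CA", "A", 60), Sale("CA", "A", 20), Sale("VA", "B", 15))
val salesPipe1 = TypedPipe.from(salesList)
val salesPipe2 = TypedPipe.from(salesList)

// distinct requires an Ordering; compare on all three columns so that
// only fully identical rows are treated as duplicates.
implicit val saleOrdering: Ordering[Sale] =
  Ordering.by((s: Sale) => (s.state, s.name, s.sale))

// select ... from test.allsales UNION select ... from test.allsales2
val union = (salesPipe1 ++ salesPipe2).distinct

union.dump
```

With the sample data this should print the three original rows once each; the output order is not guaranteed.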
Group and Aggregate
SQL
    pwagle=> select state, count(1), count(distinct name), sum(sales)
    pwagle-> from test.allsales
    pwagle-> group by state;
     state | count | count | sum
    -------+-------+-------+-----
     CA    |     2 |     1 |  80
     VA    |     1 |     1 |  15
Scalding
    scala> {
         | salesPipe.map{ x => (x.state, (1, Set(x.name), x.sale)) }
         | .group
         | .sum
         | .dump
         | }
    (CA,(2,Set(A),80))
    (VA,(1,Set(B),15))

    scala> {
         | salesPipe.map{ x => (x.state, (1, Set(x.name), x.sale)) }
         | .group
         | .sum
         | .map{ case (state, (count, set, sum)) => (state, (count, set.size, sum))}
         | .dump
         | }
    (CA,(2,1,80))
    (VA,(1,1,15))
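The same pattern extends to averages and HAVING clauses: carry a (count, sum) pair through the aggregation, filter on the aggregate, then divide. The sketch below is meant for the REPL; the threshold of 50 and the name avgOfBigStates are illustrative assumptions.

```scala
import com.twitter.scalding._
import com.twitter.scalding.ReplImplicits._
import com.twitter.scalding.ReplImplicitContext._

case class Sale(state: String, name: String, sale: Int)
val salesPipe = TypedPipe.from(
  List(Sale("CA", "A", 60), Sale("CA", "A", 20), Sale("VA", "B", 15)))

// select state, avg(sales) from test.allsales
// group by state having sum(sales) > 50;
val avgOfBigStates = {
  salesPipe.map { x => (x.state, (1L, x.sale.toLong)) }
    .sumByKey
    .toTypedPipe
    .filter { case (_, (_, sum)) => sum > 50 }                           // HAVING
    .map { case (state, (count, sum)) => (state, sum.toDouble / count) } // AVG
}

avgOfBigStates.dump
```

With the sample data only CA passes the filter, with an average of 40.0.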
Join
Scalding
    case class Table1Row(field1: String, val1: Int)
    case class Table2Row(field2: String, val2: Int)

    val table1 = TypedPipe.from(List(
      Table1Row("a", 1),
      Table1Row("b", 2)))
    val table2 = TypedPipe.from(List(
      Table2Row("b", 3),
      Table2Row("c", 4)))

    val table1Group = table1.groupBy { _.field1 }
    val table2Group = table2.groupBy { _.field2 }
    val join = table1Group.join(table2Group)

    scala> join.dump
    (b,(Table1Row(b,2),Table2Row(b,3)))

    val leftJoin = table1Group.leftJoin(table2Group)
    val outerJoin = table1Group.outerJoin(table2Group)

    scala> leftJoin.dump
    (a,(Table1Row(a,1),None))
    (b,(Table1Row(b,2),Some(Table2Row(b,3))))

    scala> outerJoin.dump
    (a,(Some(Table1Row(a,1)),None))
    (b,(Some(Table1Row(b,2)),Some(Table2Row(b,3))))
    (c,(None,Some(Table2Row(c,4))))
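Joined rows arrive as nested tuples, with Option wrapping the side that may be missing. To get the flat shape a SQL select list would produce, map over the result and unwrap the Option with a default, much like COALESCE. This is a sketch for the REPL; the SQL in the comment, the default of 0 and the name flat are assumptions added for illustration.

```scala
import com.twitter.scalding._
import com.twitter.scalding.ReplImplicits._
import com.twitter.scalding.ReplImplicitContext._

case class Table1Row(field1: String, val1: Int)
case class Table2Row(field2: String, val2: Int)

val table1Group = TypedPipe.from(List(Table1Row("a", 1), Table1Row("b", 2))).groupBy { _.field1 }
val table2Group = TypedPipe.from(List(Table2Row("b", 3), Table2Row("c", 4))).groupBy { _.field2 }

// select t1.field1, t1.val1, coalesce(t2.val2, 0)
// from table1 t1 left join table2 t2 on t1.field1 = t2.field2;
val flat = {
  table1Group.leftJoin(table2Group)
    .toTypedPipe
    .map { case (key, (left, right)) =>
      (key, left.val1, right.map(_.val2).getOrElse(0))
    }
}

flat.dump
```

With the sample tables this should yield (a,1,0) and (b,2,3).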
Histogram, Ntile
SQL
TODO
Scalding Histogram (fields-based API only)
    import com.twitter.scalding.mathematics.Histogram

    val inputTp: TypedPipe[Int] = TypedPipe.from(List(5, 2, 3, 3, 4, 4, 4, 1, 15, 30))
    val p = inputTp.toPipe(('value))
    val p1 = p.groupAll { group =>
      group.histogram('value -> 'histogram)
    }.map('histogram -> ('min, 'q1, 'median, 'q3, 'max, 'mean)) { x: Histogram =>
      (x.min, x.q1, x.median, x.q3, x.max, x.mean)
    }
    val outputTp = p1.toTypedPipe[(Double, Double, Double, Double, Double, Double)](('min, 'q1, 'median, 'q3, 'max, 'mean))

    outputTp.dump
    (1.0,3.0,4.0,5.0,30.0,7.1)
Scalding QTree
    import com.twitter.algebird.{QTree, QTreeSemigroup}

    val inputTp: TypedPipe[Int] = TypedPipe.from(List(5, 2, 3, 3, 4, 4, 4, 1, 15, 30))
    implicit val qtSemigroup = new QTreeSemigroup[Long](6)
    val v = inputTp.map {x => QTree(x)}.groupAll.sum.values
The implicit QTreeSemigroup must be in scope before the sum is defined; without it the sum does not compile, as the following REPL session shows:
    scala> val inputTp: TypedPipe[Int] = TypedPipe.from(List(5, 2, 3, 3, 4, 4, 4, 1, 15, 30))
    inputTp: com.twitter.scalding.package.TypedPipe[Int] = IterablePipe(List(5, 2, 3, 3, 4, 4, 4, 1, 15, 30))

    scala> val v = inputTp.map {x => QTree(x)}.groupAll.sum.values
    <console>:41: error: Cannot find Semigroup type class for com.twitter.algebird.QTree[Long]
           val v = inputTp.map {x => QTree(x)}.groupAll.sum.values
                                                        ^

    scala> implicit val qtSemigroup = new QTreeSemigroup[Long](6)
    qtSemigroup: com.twitter.algebird.QTreeSemigroup[Long] = com.twitter.algebird.QTreeSemigroup@4e92c2ed

    scala> val v = inputTp.map {x => QTree(x)}.groupAll.sum.values
    v: com.twitter.scalding.typed.TypedPipe[com.twitter.algebird.QTree[Long]] = com.twitter.scalding.typed.TypedPipeFactory@7925e180

    scala> v.map { q => (q.count, q.upperBound, q.lowerBound, q.quantileBounds(.5), q.quantileBounds(.95)) }.dump
    (10,32.0,0.0,(4.0,5.0),(15.0,16.0))