SQL to Scalding

Motivation

SQL is a popular language for data analytics. Scalding is a relative newcomer that is more powerful and complex. The goal of this document is to translate commonly used SQL idioms to the Scalding type-safe API (which is preferred over the fields-based API). We are using Vertica SQL variant that is based on PSQL and has support for analytic functions. We have purposely picked trivial example datasets so that it is easy to experiment using the REPL and view intermediate results to get a better understanding of what each method does. More information on how to use the REPL is in [[Scalding REPL]] and Learning Scalding with Alice.

Prerequisites:

  • Elementary knowledge of Scala
  • Basic ability to decipher types in Scalding methods

You should not expect Scalding to be as intuitive as SQL, but at the same time it is not as hard as it may seem when you see the plethora of classes and methods in the Scalding docs.

To get a deeper understanding of monoids like QTree, please see Learning Algebird Monoids with REPL

import com.twitter.scalding._
import com.twitter.scalding.ReplImplicits._
import com.twitter.scalding.ReplImplicitContext._

Create datasets

SQL

CREATE TABLE test.allsales(
  state VARCHAR(20),
  name VARCHAR(20),
  sales INT
);

INSERT INTO test.allsales VALUES('CA', 'A', 60);
INSERT INTO test.allsales VALUES('CA', 'A', 20);
INSERT INTO test.allsales VALUES('VA', 'B', 15);
COMMIT;

pwagle=> select * from test.allsales;
 state | name | sales
-------+------+-------
 CA    | A    |    60
 VA    | B    |    15
 CA    | A    |    20
(3 rows)

Scalding

scala> case class Sale(state: String, name: String, sale: Int)
defined class Sale

scala> val salesList = List(Sale("CA", "A", 60), Sale("CA", "A", 20), Sale("VA", "B", 15))
salesList: List[Sale] = List(Sale(CA,A,60), Sale(CA,A,20), Sale(VA,B,15))

scala> val salesPipe = TypedPipe.from(salesList)
salesPipe: com.twitter.scalding.typed.TypedPipe[Sale] = IterablePipe(List(Sale(CA,A,60), Sale(CA,A,20), Sale(VA,B,15)))

Simple Count

SQL

pwagle=> select count(1) from test.allsales;
 count
-------
     3

Scalding

scala> salesPipe.groupAll.size.values.dump
3

Count distinct

SQL

pwagle=> select count(distinct state) from test.allsales;
 count
-------
     2

Scalding

scala> salesPipe.map{ _.state }.distinct.groupAll.size.values.dump
2

Count, Count distinct, Sum in one query

SQL

pwagle=> select count(1), count(distinct state), sum(sales) from test.allsales;
 count | count | sum
-------+-------+-----
     3 |     2 |  95

Scalding

scala> {
     |   salesPipe.map{x => (1, Set(x.state), x.sale) }
     |            .groupAll
     |            .sum
     |            .values
     |            .map{ case(count, set, sum) => (count, set.size, sum) }
     |            .dump
     | }
(3,2,95)

The above query will have performance issues if count(distinct state) is large. This can be solved in two ways:

  • Group by state first (TODO)
  • Using an approximate data structure like HyperLogLog (TODO)

Also see [[Aggregation using Algebird Aggregators]].

Where

SQL

select state, name, sales
from test.allsales
where
state = 'CA';

Scalding

scala> salesPipe.filter(sale => (sale.state == "CA")).dump
Sale(CA,A,60)
Sale(CA,A,20)

Order by X, Y limit N

SQL

select state, name, sale
from test.allsales
order by state, name
limit 1;

Scalding

object SaleOrderingWithState extends Ordering[Sale] {
  def compare(a: Sale, b: Sale) = a.state compare b.state
}

implicit val saleOrderingWithState = SaleOrderingWithState
scala> salesPipe.groupAll.sorted.values.dump
Sale(CA,A,60)
Sale(CA,A,20)
Sale(VA,B,15)

scala> salesPipe.groupAll.sorted.take(1).values.dump
Sale(CA,A,20)

scala> salesPipe.groupAll.sortedTake(1).values.dump
List(Sale(CA,A,20))

Union

SQL

select state, name, sales from test.allsales
UNION ALL
select state, name, sales from test.allsales2

Scalding

scala> val salesPipe1 = TypedPipe.from(salesList)
salesPipe1: com.twitter.scalding.typed.TypedPipe[Sale] = IterablePipe(List(Sale(CA,A,60), Sale(CA,A,20), Sale(VA,B,15)))

scala> val salesPipe2 = TypedPipe.from(salesList)
salesPipe2: com.twitter.scalding.typed.TypedPipe[Sale] = IterablePipe(List(Sale(CA,A,60), Sale(CA,A,20), Sale(VA,B,15)))

scala> (salesPipe1 ++ salesPipe2).dump
Sale(CA,A,60)
Sale(CA,A,20)
Sale(VA,B,15)
Sale(CA,A,60)
Sale(CA,A,20)
Sale(VA,B,15)

Group and Aggregate

SQL

pwagle=> select state, count(1), count(distinct name), sum(sales)
pwagle-> from test.allsales
pwagle-> group by state;
 state | count | count | sum
-------+-------+-------+-----
 CA    |     2 |     1 |  80
 VA    |     1 |     1 |  15

Scalding

scala> {
     |   salesPipe.map{ x => (x.state, (1, Set(x.name), x.sale)) }
     |            .group
     |            .sum
     |            .dump
     | }
(CA,(2,Set(A),80))
(VA,(1,Set(B),15))

scala> {
     |   salesPipe.map{ x => (x.state, (1, Set(x.name), x.sale)) }
     |      .group
     |      .sum
     |      .map{ case (state, (count, set, sum)) => (state, (count, set.size, sum))}
     |      .dump
     | }
(CA,(2,1,80))
(VA,(1,1,15))

Join

Scalding

case class Table1Row(field1: String, val1: Int)
case class Table2Row(field2: String, val2: Int)

val table1 = TypedPipe.from(List(
    Table1Row("a", 1),
    Table1Row("b", 2)))
val table2 = TypedPipe.from(List(
    Table2Row("b", 3),
    Table2Row("c", 4)))

val table1Group = table1.groupBy { _.field1 }
val table2Group = table2.groupBy { _.field2 }

val join = table1Group.join(table2Group)
scala> join.dump
(b,(Table1Row(b,2),Table2Row(b,3)))
val leftJoin = table1Group.leftJoin(table2Group)
val outerJoin = table1Group.outerJoin(table2Group)
scala> leftJoin.dump
(a,(Table1Row(a,1),None))
(b,(Table1Row(b,2),Some(Table2Row(b,3))))

scala> outerJoin.dump
(a,(Some(Table1Row(a,1)),None))
(b,(Some(Table1Row(b,2)),Some(Table2Row(b,3))))
(c,(None,Some(Table2Row(c,4))))

Histogram, Ntile

SQL

TODO

Scalding Histogram Fields-based Only

val inputTp: TypedPipe[Int] = TypedPipe.from(List(5, 2, 3, 3, 4, 4, 4, 1, 15, 30))
val p = inputTp.toPipe(('value))
val p1 = p.groupAll { group => group.histogram('value -> 'histogram) }
  .map('histogram -> ('min, 'q1, 'median, 'q3, 'max, 'mean)) {
     x: Histogram => (x.min, x.q1, x.median, x.q3, x.max, x.mean)
   }
val outputTp = p1.toTypedPipe[(Double, Double, Double, Double, Double, Double)](('min, 'q1, 'median, 'q3, 'max, 'mean))
outputTp.dump
(1.0,3.0,4.0,5.0,30.0,7.1)

Scalding QTree

val inputTp: TypedPipe[Int] = TypedPipe.from(List(5, 2, 3, 3, 4, 4, 4, 1, 15, 30))
implicit val qtSemigroup = new QTreeSemigroup[Long](6)
val v = inputTp.map {x => QTree(x)}.groupAll.sum.values

scala> val inputTp: TypedPipe[Int] = TypedPipe.from(List(5, 2, 3, 3, 4, 4, 4, 1, 15, 30))
inputTp: com.twitter.scalding.package.TypedPipe[Int] = IterablePipe(List(5, 2, 3, 3, 4, 4, 4, 1, 15, 30))

scala> val v = inputTp.map {x => QTree(x)}.groupAll.sum.values
<console>:41: error: Cannot find Semigroup type class for com.twitter.algebird.QTree[Long]
       val v = inputTp.map {x => QTree(x)}.groupAll.sum.values
                                                    ^

scala> implicit val qtSemigroup = new QTreeSemigroup[Long](6)
qtSemigroup: com.twitter.algebird.QTreeSemigroup[Long] = com.twitter.algebird.QTreeSemigroup@4e92c2ed

scala> val v = inputTp.map {x => QTree(x)}.groupAll.sum.values
v: com.twitter.scalding.typed.TypedPipe[com.twitter.algebird.QTree[Long]] = com.twitter.scalding.typed.TypedPipeFactory@7925e180

scala> v.map { q => (q.count, q.upperBound, q.lowerBound, q.quantileBounds(.5), q.quantileBounds(.95)) }.dump
(10,32.0,0.0,(4.0,5.0),(15.0,16.0))

TODO

Analytic / Window Functions (Rank, Ntile, Lag/Lead)

Running Total, Moving Average, Sessionization