Calling Scalding from Inside your Application¶
Starting in Scalding 0.12, there is a clear API for doing this: `Execution[T]`, which describes a set of map/reduce operations that, when executed, return a `Future[T]`. See the scaladocs for Execution. Below is an example.
```scala
val job: Execution[Unit] =
  TypedPipe.from(TextLine("input"))
    .flatMap(_.split("\\s+"))
    .map { word => (word, 1L) }
    .sumByKey
    .writeExecution(TypedTsv("output"))

// Now we run it in Local mode
val u: Unit = job.waitFor(Config.default, Local(true))

// Or for Hadoop:
val jobConf = new JobConf
val u: Unit = job.waitFor(Config.hadoopWithDefaults(jobConf), Hdfs(true, jobConf))

// If you want to be asynchronous, use run instead of waitFor and get a Future in return
```
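As a minimal sketch of that asynchronous variant (assuming an implicit Scala `ExecutionContext` is in scope, which `run` needs in order to schedule the `Future`):

```scala
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

// run returns immediately; the Future completes when the job finishes
val fut: Future[Unit] = job.run(Config.default, Local(true))
```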
For testing, or cases where you aggregate data down to a manageable level, `.toIterableExecution` on TypedPipe is very useful:
```scala
val job: Execution[Iterable[(String, Long)]] =
  TypedPipe.from(TextLine("input"))
    .flatMap(_.split("\\s+"))
    .map { word => (word, 1L) }
    .sumByKey
    .toIterableExecution

// Now we run it in Local mode
val counts: Map[String, Long] = job.waitFor(Config.default, Local(true)).toMap
```
To run an Execution as a stand-alone job, see:

- ExecutionApp: make an `object MyExJob extends ExecutionApp` for a job you can run like a normal Java application (by invoking `java` with the class name). A minimal sketch follows below.
- ExecutionJob: use this only if you have existing tooling around launching scalding.Job subclasses.
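Here is a minimal sketch of an ExecutionApp, reusing the word-count pipeline from the first example; the `"input"` and `"output"` paths are hard-coded for illustration:

```scala
import com.twitter.scalding._

object MyExJob extends ExecutionApp {
  // ExecutionApp supplies main(), which parses the usual mode/Hadoop
  // arguments and then runs this Execution.
  def job: Execution[Unit] =
    TypedPipe.from(TextLine("input"))
      .flatMap(_.split("\\s+"))
      .map { word => (word, 1L) }
      .sumByKey
      .writeExecution(TypedTsv("output"))
}
```

Put it on your classpath and launch it like any JVM application, passing the usual mode flag (e.g. `--local` or `--hdfs`).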
Some rules¶
- When using Execution, NEVER use `.write` or `.toPipe` (or call any method that takes an implicit flowDef). Instead use `.writeExecution`, `.toIterableExecution`, or `.forceToDiskExecution` (see the scaladocs).
- Avoid calling `.waitFor` or `.run` AS LONG AS POSSIBLE. Try to compose your entire job into one large Execution, using `.zip` or `.flatMap` to combine Executions (see the sketch after this list). `.waitFor` is the same as `.run` except that it blocks waiting on the future. There should be at most one call to `.waitFor` or `.run` in each Execution App/Job.
- Only mutate vars or perform side effects using `.onComplete`. If you run the result of `onComplete`, the function you pass is run when the result up to that point is available, and it receives the `Try[T]` for that result. Avoid this if possible; it exists to deal with external IO or pre-existing APIs, and is designed for experts who are comfortable using `.onComplete` on Scala Futures (which is all this method does under the covers).
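To make the composition rule concrete, here is a hedged sketch of combining two writes into a single Execution with `.zip`, confining side effects to `.onComplete`, and making exactly one `.waitFor` call at the end. The pipeline and sink names are illustrative:

```scala
import com.twitter.scalding._
import scala.util.Try

val wordsEx: Execution[Unit] =
  TypedPipe.from(TextLine("input"))
    .flatMap(_.split("\\s+"))
    .writeExecution(TypedTsv("words"))

val countsEx: Execution[Unit] =
  TypedPipe.from(TextLine("input"))
    .flatMap(_.split("\\s+"))
    .map { word => (word, 1L) }
    .sumByKey
    .writeExecution(TypedTsv("counts"))

// zip combines the two Executions into one, letting scalding plan them together
val combined: Execution[(Unit, Unit)] =
  wordsEx.zip(countsEx)
    // onComplete is for side effects only, e.g. logging the final Try
    .onComplete { (t: Try[(Unit, Unit)]) => println(s"job finished: $t") }

// the single waitFor/run call for the whole app
combined.waitFor(Config.default, Local(true))
```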
Running Existing Jobs Inside A Library¶
We recommend the approach above for building composable jobs with Executions, but if you have an existing Job, you can also run it directly:
Working example:¶
WordCountJob.scala
```scala
class WordCountJob(args: Args) extends Job(args) {
  TextLine(args("input"))
    .read
    .flatMap('line -> 'word) { line: String => line.split("\\s+") }
    .groupBy('word) { _.size }
    .write(Tsv(args("output")))
}
```
Runner.scala
```scala
object Runner extends App {
  val hadoopConfiguration: Configuration = new Configuration
  hadoopConfiguration.set("mapred.job.tracker", "hadoop-master:8021")
  hadoopConfiguration.set("fs.defaultFS", "hdfs://hadoop-master:8020")

  val hdfsMode = Hdfs(strict = true, hadoopConfiguration)
  val arguments = Mode.putMode(hdfsMode, Args("--input in.txt --output counts.tsv"))

  // Now create the job after the mode is set up properly.
  val job: WordCountJob = new WordCountJob(arguments)
  val flow = job.buildFlow
  flow.complete()
}
```
You can then run this app on any server that has access to the Hadoop cluster.