Ruby-Spark

Apache Spark™ is a fast and general engine for large-scale data processing.

This gem allows you to use Spark functionality from Ruby.

Word count in Spark's Ruby API

file = spark.text_file("hdfs://...")

file.flat_map(:split)
    .map(lambda{|word| [word, 1]})
    .reduce_by_key(lambda{|a, b| a+b})

Installation

Requirements

  • Java 7+
  • Ruby 2+
  • MRI or JRuby

Add this line to your application's Gemfile:

gem 'ruby-spark'

And then execute:

$ bundle

Or install it yourself as:

$ gem install ruby-spark

Run rake compile if you are using the gem from a local filesystem.
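
For example, from a local checkout:

$ rake compile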

Build Apache Spark

This command downloads Spark and builds extensions for this gem (SBT is used for compiling). For more information, check the wiki. Everything is stored by default at [GEM_ROOT]/target.

$ ruby-spark build

Usage

You can use Ruby-Spark via an interactive shell (Pry is used):

$ ruby-spark shell

Or in an existing project:

require 'ruby-spark'

# Create a SparkContext
Spark.start

# Context reference
Spark.sc
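
As a quick sanity check, you can run a small job through the context reference (a minimal sketch using only calls shown in this README):

# Distribute a small range across workers and collect it back
Spark.sc.parallelize(1..5).collect
# => [1, 2, 3, 4, 5]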

If you want to configure Spark first, see configurations for more details:

require 'ruby-spark'

# Use this if you have a custom SPARK_HOME
Spark.load_lib(spark_home)

# Configuration
Spark.config do
   set_app_name "RubySpark"
   set 'spark.ruby.serializer', 'oj'
   set 'spark.ruby.serializer.batch_size', 100
end

# Start Apache Spark
Spark.start

Finally, stop the cluster. In the shell, Spark is stopped automatically when you exit.

Spark.stop
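
Putting it together, a minimal standalone script might look like this (a sketch built only from the calls above):

require 'ruby-spark'

Spark.start

# Sum the numbers 1..100 on the cluster
puts Spark.sc.parallelize(1..100).sum
# => 5050

Spark.stop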

Creating RDD (upload data)

Single text file:

rdd = sc.text_file(FILE, workers_num, serializer=nil)

All files in a directory:

rdd = sc.whole_text_files(DIRECTORY, workers_num, serializer=nil)
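
For example, assuming whole_text_files yields [file_name, content] pairs (as Spark's wholeTextFiles does; 'logs/' is a hypothetical directory):

rdd = sc.whole_text_files('logs/')

# Map each assumed [file_name, content] pair to [file_name, line_count]
rdd = rdd.map(lambda{|pair| [pair[0], pair[1].lines.count]})
rdd.collect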

Uploading structures directly from Ruby (the chosen serializer must be able to serialize them):

rdd = sc.parallelize([1,2,3,4,5], workers_num, serializer=nil)
rdd = sc.parallelize(1..5, workers_num, serializer=nil)

Options

workers_num
Minimum number of workers computing this task.
(This value can be overwritten by Spark.)
serializer
Custom serializer.
(Default: determined by the spark.ruby.serializer option.)
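
For example, setting both options explicitly (the keyword form mirrors the PI example later in this README; the oj gem must be installed):

# 4 workers, Oj serializer for this RDD only
rdd = sc.parallelize(1..100, 4, serializer: 'oj')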

Operations

All operations can be divided into 2 groups:

  • Transformations: append a new operation to the current RDD and return a new RDD
  • Actions: add an operation and start the computation

See the wiki page or Ruby-doc for more details.
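
A short sketch of the difference: transformations only build up the operation chain, and nothing is computed until an action runs.

rdd = sc.parallelize(1..10)

# Transformations: each call just returns a new RDD, nothing runs yet
doubled = rdd.map(lambda{|x| x * 2})
large   = doubled.filter(lambda{|x| x > 10})

# Action: triggers the actual computation
large.collect
# => [12, 14, 16, 18, 20]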

Transformations

rdd.map(lambda{|item| ...})
rdd.flat_map(lambda{|item| ...})
rdd.filter(lambda{|item| ...})
rdd.union(rdd)
rdd.map_partitions(lambda{|iterator| ...})
# ...

Actions

rdd.count
rdd.take(n)
rdd.collect
# ...

Examples

Sum of numbers

sc.parallelize(0..10).sum
# => 55

Word count using methods

# Content:
# "first line"
# "second line"
rdd = sc.text_file(PATH)

# ["first", "line", "second", "line"]
rdd = rdd.flat_map(lambda{|line| line.split})

# [["first", 1], ["line", 1], ["second", 1], ["line", 1]]
rdd = rdd.map(lambda{|word| [word, 1]})

# [["first", 1], ["line", 2], ["second", 1]]
rdd = rdd.reduce_by_key(lambda{|a, b| a+b})

# {"first"=>1, "line"=>2, "second"=>1}
rdd.collect_as_hash

Estimating PI with a custom serializer

slices = 3
n = 100000 * slices

# Sample a random point in the 2x2 square centered at the origin;
# return 1 if it falls inside the unit circle, 0 otherwise
def map(_)
  x = rand * 2 - 1
  y = rand * 2 - 1

  if x**2 + y**2 < 1
    return 1
  else
    return 0
  end
end

rdd = Spark.context.parallelize(1..n, slices, serializer: 'oj')
rdd = rdd.map(method(:map))

puts 'Pi is roughly %f' % (4.0 * rdd.sum / n)

Estimating PI

rdd = sc.parallelize([10_000], 1)
rdd = rdd.add_library('bigdecimal/math')
rdd = rdd.map(lambda{|x| BigMath.PI(x)})
rdd.collect # => [#<BigDecimal, '0.31415926...'>]

Linear regression

# Import Mllib classes into Object
# Otherwise they are accessible via Spark::Mllib::LinearRegressionWithSGD
Spark::Mllib.import(Object)

# Training data
data = [
  LabeledPoint.new(0.0, [0.0]),
  LabeledPoint.new(1.0, [1.0]),
  LabeledPoint.new(3.0, [2.0]),
  LabeledPoint.new(2.0, [3.0])
]

# Train a model
lrm = LinearRegressionWithSGD.train(sc.parallelize(data), initial_weights: [1.0])

lrm.predict([0.0])
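
To sanity-check the fit, you can predict further points (exact values depend on SGD convergence, so treat them as approximate):

lrm.predict([2.5])  # a Float close to the fitted regression line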