# Ruby-Spark

Apache Spark™ is a fast and general engine for large-scale data processing.

This gem allows you to use Spark functionality from Ruby.
Word count in Spark's Ruby API:

```ruby
file = spark.text_file("hdfs://...")

file.flat_map(:split)
    .map(lambda{|word| [word, 1]})
    .reduce_by_key(lambda{|a, b| a+b})
```
## Installation

Requirements:
- Java 7+
- Ruby 2+
- MRI or JRuby
Add this line to your application's Gemfile:

```ruby
gem 'ruby-spark'
```
And then execute:

```
$ bundle
```

Or install it yourself as:

```
$ gem install ruby-spark
```
Run `rake compile` if you are using the gem from the local filesystem.
## Build Apache Spark

This command will download Spark and build extensions for this gem (SBT is used for compiling). For more information, check the wiki. Everything is stored by default at `[GEM_ROOT]/target`.

```
$ ruby-spark build
```
## Usage

You can use Ruby-Spark via an interactive shell (Pry is used):

```
$ ruby-spark shell
```
Or in an existing project:

```ruby
require 'ruby-spark'

# Create a SparkContext
Spark.start

# Context reference
Spark.sc
```
If you want to configure Spark first (see configurations for more details):

```ruby
require 'ruby-spark'

# Use if you have a custom SPARK_HOME
Spark.load_lib(spark_home)

# Configuration
Spark.config do
  set_app_name "RubySpark"
  set 'spark.ruby.serializer', 'oj'
  set 'spark.ruby.serializer.batch_size', 100
end

# Start Apache Spark
Spark.start
```
Finally, to stop the cluster (in the shell, Spark is stopped automatically when you exit):

```ruby
Spark.stop
```
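Putting the pieces together, a minimal end-to-end sketch (assuming Spark was already built via `ruby-spark build`):

```ruby
require 'ruby-spark'

Spark.start
sc = Spark.sc

# Run a trivial job: square each number and collect the results.
sc.parallelize(1..3).map(lambda{|x| x * x}).collect
# => [1, 4, 9]

Spark.stop
```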
## Creating RDDs (uploading data)
Single text file:

```ruby
rdd = sc.text_file(FILE, workers_num, serializer=nil)
```

All files in a directory:

```ruby
rdd = sc.whole_text_files(DIRECTORY, workers_num, serializer=nil)
```

Directly uploading structures from Ruby (the chosen serializer must be able to serialize them):

```ruby
rdd = sc.parallelize([1,2,3,4,5], workers_num, serializer=nil)
rdd = sc.parallelize(1..5, workers_num, serializer=nil)
```
Options:

- `workers_num`: the minimum number of workers computing this task (this value can be overwritten by Spark)
- `serializer`: a custom serializer (default: taken from the `spark.ruby.serializer` option)
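As an illustrative sketch of passing these options explicitly (the `'oj'` serializer follows the configuration shown earlier; adjust to your setup):

```ruby
# Distribute a range across at least 2 workers,
# serialized with Oj instead of the configured default.
rdd = sc.parallelize(1..100, 2, serializer: 'oj')
rdd.sum # => 5050
```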
## Operations

All operations can be divided into two groups:

- Transformations: append a new operation to the current RDD and return a new RDD
- Actions: add an operation and start the calculation

See the wiki page or RubyDoc for more details.
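As a sketch of the difference between the two groups:

```ruby
numbers = sc.parallelize(1..4)

# Transformation: returns a new RDD, nothing is computed yet.
doubled = numbers.map(lambda{|x| x * 2})

# Action: starts the calculation and returns results to the driver.
doubled.collect
# => [2, 4, 6, 8]
```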
### Transformations

```ruby
rdd.map(lambda{|item| ...})
rdd.flat_map(lambda{|item| ...})
rdd.filter(lambda{|item| ...})
rdd.union(rdd)
rdd.map_partitions(lambda{|iterator| ...})
# ...
```
### Actions

```ruby
rdd.count
rdd.take(n)
rdd.collect
# ...
```
## Examples

### Sum of numbers

```ruby
sc.parallelize(0..10).sum
# => 55
```
### Word count

```ruby
# Content:
#   "first line"
#   "second line"
rdd = sc.text_file(PATH)

# ["first", "line", "second", "line"]
rdd = rdd.flat_map(lambda{|line| line.split})

# [["first", 1], ["line", 1], ["second", 1], ["line", 1]]
rdd = rdd.map(lambda{|word| [word, 1]})

# [["first", 1], ["line", 2], ["second", 1]]
rdd = rdd.reduce_by_key(lambda{|a, b| a+b})

# {"first"=>1, "line"=>2, "second"=>1}
rdd.collect_as_hash
```
### Estimating Pi with a custom serializer

```ruby
slices = 3
n = 100_000 * slices

# Take a random point in the unit square and return 1
# if it falls inside the unit circle, 0 otherwise.
def map(_)
  x = rand * 2 - 1
  y = rand * 2 - 1
  if x**2 + y**2 < 1
    return 1
  else
    return 0
  end
end

rdd = Spark.context.parallelize(1..n, slices, serializer: 'oj')
rdd = rdd.map(method(:map))

puts 'Pi is roughly %f' % (4.0 * rdd.sum / n)
```
### Estimating Pi with BigDecimal

```ruby
rdd = sc.parallelize([10_000], 1)
rdd = rdd.add_library('bigdecimal/math')
rdd = rdd.map(lambda{|x| BigMath.PI(x)})
rdd.collect # => [#<BigDecimal, '0.31415926...'>]
```
### Linear regression

```ruby
# Import MLlib classes into Object.
# Otherwise they are accessible via Spark::Mllib::LinearRegressionWithSGD.
Spark::Mllib.import(Object)

# Training data
data = [
  LabeledPoint.new(0.0, [0.0]),
  LabeledPoint.new(1.0, [1.0]),
  LabeledPoint.new(3.0, [2.0]),
  LabeledPoint.new(2.0, [3.0])
]

# Train a model
lrm = LinearRegressionWithSGD.train(sc.parallelize(data), initial_weights: [1.0])

lrm.predict([0.0])
```