Class: Spark::RDD

Inherits:
Object
Extended by:
Forwardable
Includes:
Helper::Logger, Helper::Parser, Helper::Statistic
Defined in:
lib/spark/rdd.rb

Overview

A Resilient Distributed Dataset (RDD), the basic abstraction in Spark. Represents an immutable, partitioned collection of elements that can be operated on in parallel. This class contains the basic operations available on all RDDs, such as `map`, `filter`, and `persist`.

Direct Known Subclasses

PipelinedRDD

Instance Attribute Summary

Instance Method Summary

Methods included from Helper::Statistic

#bisect_right, #compute_fraction, #determine_bounds, #upper_binomial_bound, #upper_poisson_bound

Methods included from Helper::Parser

included

Methods included from Helper::Logger

included

Constructor Details

#initialize(jrdd, context, serializer, deserializer = nil) ⇒ RDD

Initializes the RDD. This method is the root of every PipelinedRDD chain, which is what makes it unique. If you call some operations directly on this class, they are computed in Java. (A short sketch follows the parameter list below.)

Parameters:

jrdd

org.apache.spark.api.java.JavaRDD

context

Context

serializer

Serializer
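
RDDs are normally obtained from the context rather than by calling this constructor directly. A minimal sketch (assuming $sc holds a Spark::Context, as in the other examples):

rdd = $sc.parallelize(0..10)
rdd.is_a?(Spark::RDD)
# => true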



# File 'lib/spark/rdd.rb', line 27

def initialize(jrdd, context, serializer, deserializer=nil)
  @jrdd = jrdd
  @context = context

  @cached = false
  @checkpointed = false

  @command = Spark::CommandBuilder.new(serializer, deserializer)
end

Instance Attribute Details

#command ⇒ Object (readonly)

Returns the value of attribute command.



# File 'lib/spark/rdd.rb', line 11

def command
  @command
end

#context ⇒ Object (readonly)

Returns the value of attribute context.



# File 'lib/spark/rdd.rb', line 11

def context
  @context
end

#jrdd ⇒ Object (readonly)

Returns the value of attribute jrdd.



# File 'lib/spark/rdd.rb', line 11

def jrdd
  @jrdd
end

Instance Method Details

#+(other) ⇒ Object

Operators



# File 'lib/spark/rdd.rb', line 41

def +(other)
  self.union(other)
end

#add_command(klass, *args) ⇒ Object

Command and serializer



# File 'lib/spark/rdd.rb', line 49

def add_command(klass, *args)
  @command.deep_copy.add_command(klass, *args)
end

#add_library(*libraries) ⇒ Object Also known as: addLibrary

Add a Ruby library. Libraries will be included before computing.

Example:

rdd.add_library('pry').add_library('nio4r', 'distribution')


# File 'lib/spark/rdd.rb', line 59

def add_library(*libraries)
  @command.add_library(*libraries)
  self
end

#aggregate(zero_value, seq_op, comb_op) ⇒ Object

Aggregate the elements of each partition, and then the results for all the partitions, using given combine functions and a neutral “zero value”.

This function can return a result of a different type than this RDD's elements, which is why a separate operation for merging the partition results is needed.

The result must be an Array; otherwise an Array zero value would be serialized and sent as multiple values instead of a single one.

Example:

# 1 2 3 4 5  => 15 + 1 = 16
# 6 7 8 9 10 => 40 + 1 = 41
# 16 * 41 = 656

seq = lambda{|x,y| x+y}
com = lambda{|x,y| x*y}

rdd = $sc.parallelize(1..10, 2, batch_size: 1)
rdd.aggregate(1, seq, com)
# => 656


# File 'lib/spark/rdd.rb', line 300

def aggregate(zero_value, seq_op, comb_op)
  _reduce(Spark::Command::Aggregate, seq_op, comb_op, zero_value)
end

#aggregate_by_key(zero_value, seq_func, comb_func, num_partitions = nil) ⇒ Object Also known as: aggregateByKey

Aggregate the values of each key, using given combine functions and a neutral zero value.

Example:

def combine(x,y)
  x+y
end

def merge(x,y)
  x*y
end

rdd = $sc.parallelize([["a", 1], ["b", 2], ["a", 3], ["a", 4], ["c", 5]], 2, batch_size: 1)
rdd.aggregate_by_key(1, method(:combine), method(:merge))
# => [["b", 3], ["a", 16], ["c", 6]]


# File 'lib/spark/rdd.rb', line 980

def aggregate_by_key(zero_value, seq_func, comb_func, num_partitions=nil)
  _combine_by_key(
    [Spark::Command::CombineByKey::CombineWithZero, zero_value, seq_func],
    [Spark::Command::CombineByKey::Merge, comb_func],
    num_partitions
  )
end

#bind(objects) ⇒ Object

Bind objects to the RDD so they can be referenced inside its lambdas (see the example).

Example:

text = "test"

rdd = $sc.parallelize(0..5)
rdd = rdd.map(lambda{|x| x.to_s + " " + text})
rdd = rdd.bind(text: text)

rdd.collect
# => ["0 test", "1 test", "2 test", "3 test", "4 test", "5 test"]


# File 'lib/spark/rdd.rb', line 76

def bind(objects)
  unless objects.is_a?(Hash)
    raise ArgumentError, 'Argument must be a Hash.'
  end

  @command.bind(objects)
  self
end

#cache ⇒ Object

Persist this RDD with the default storage level MEMORY_ONLY_SER because of serialization.
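
A minimal usage sketch (assuming $sc holds a Spark::Context, as in the other examples):

rdd = $sc.parallelize(0..10)
rdd.cache
rdd.cached?
# => true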



# File 'lib/spark/rdd.rb', line 113

def cache
  persist('memory_only_ser')
end

#cached? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/spark/rdd.rb', line 140

def cached?
  @cached
end

#cartesian(other) ⇒ Object

Return the Cartesian product of this RDD and another one, that is, the RDD of all pairs of elements `(a, b)` where `a` is in `self` and `b` is in `other`.

Example:

rdd1 = $sc.parallelize([1,2,3])
rdd2 = $sc.parallelize([4,5,6])

rdd1.cartesian(rdd2).collect
# => [[1, 4], [1, 5], [1, 6], [2, 4], [2, 5], [2, 6], [3, 4], [3, 5], [3, 6]]


# File 'lib/spark/rdd.rb', line 657

def cartesian(other)
  _deserializer = Spark::Serializer::Cartesian.new.set(self.deserializer, other.deserializer)
  new_jrdd = jrdd.cartesian(other.jrdd)
  RDD.new(new_jrdd, context, serializer, _deserializer)
end

#checkpointed? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/spark/rdd.rb', line 144

def checkpointed?
  @checkpointed
end

#coalesce(num_partitions) ⇒ Object

Return a new RDD that is reduced into num_partitions partitions.

Example:

rdd = $sc.parallelize(0..10, 3)
rdd.coalesce(2).glom.collect
# => [[0, 1, 2], [3, 4, 5, 6, 7, 8, 9, 10]]


# File 'lib/spark/rdd.rb', line 641

def coalesce(num_partitions)
  new_jrdd = jrdd.coalesce(num_partitions)
  RDD.new(new_jrdd, context, @command.serializer, @command.deserializer)
end

#cogroup(*others) ⇒ Object

For each key k in `self` or `other`, return a resulting RDD that contains a tuple with the list of values for that key in `self` as well as in `other`.

Example:

rdd1 = $sc.parallelize([["a", 1], ["a", 2], ["b", 3]])
rdd2 = $sc.parallelize([["a", 4], ["a", 5], ["b", 6]])
rdd3 = $sc.parallelize([["a", 7], ["a", 8], ["b", 9]])
rdd1.cogroup(rdd2, rdd3).collect
# => [["a", [1, 2, 4, 5, 7, 8]], ["b", [3, 6, 9]]]


# File 'lib/spark/rdd.rb', line 1011

def cogroup(*others)
  unioned = self
  others.each do |other|
    unioned = unioned.union(other)
  end

  unioned.group_by_key
end

#collect ⇒ Object

Return an array that contains all of the elements in this RDD. RJB raises an error if the stage is killed.
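
For instance:

rdd = $sc.parallelize(0..5)
rdd.collect
# => [0, 1, 2, 3, 4, 5]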



# File 'lib/spark/rdd.rb', line 172

def collect
  collect_from_iterator(jrdd.collect.iterator)
rescue => e
  raise Spark::RDDError, e.message
end

#collect_as_hash ⇒ Object

Collect the elements and convert the resulting array of pairs to a Hash.
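
A short sketch with a pair RDD:

rdd = $sc.parallelize([["a", 1], ["b", 2]])
rdd.collect_as_hash
# => {"a"=>1, "b"=>2}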



# File 'lib/spark/rdd.rb', line 190

def collect_as_hash
  Hash[collect]
end

#collect_from_iterator(iterator) ⇒ Object



# File 'lib/spark/rdd.rb', line 178

def collect_from_iterator(iterator)
  if self.is_a?(PipelinedRDD)
    klass = @command.serializer
  else
    klass = @command.deserializer
  end

  klass.load_from_iterator(iterator)
end

#combine_by_key(create_combiner, merge_value, merge_combiners, num_partitions = nil) ⇒ Object Also known as: combineByKey

Generic function to combine the elements for each key using a custom set of aggregation functions. Turns a JavaPairRDD[(K, V)] into a result of type JavaPairRDD[(K, C)], for a “combined type” C. Note that V and C can be different – for example, one might group an RDD of type (Int, Int) into an RDD of type (Int, List). Users provide three functions:

Parameters:

create_combiner

which turns a V into a C (e.g., creates a one-element list)

merge_value

to merge a V into a C (e.g., adds it to the end of a list)

merge_combiners

to combine two C’s into a single one.

Example:

def combiner(x)
  x
end

def merge(x,y)
  x+y
end

rdd = $sc.parallelize(["a","b","c","a","b","c","a","c"], 2, batch_size: 1).map(lambda{|x| [x, 1]})
rdd.combine_by_key(method(:combiner), method(:merge), method(:merge)).collect_as_hash
# => {"a"=>3, "b"=>2, "c"=>3}


# File 'lib/spark/rdd.rb', line 913

def combine_by_key(create_combiner, merge_value, merge_combiners, num_partitions=nil)
  _combine_by_key(
    [Spark::Command::CombineByKey::Combine, create_combiner, merge_value],
    [Spark::Command::CombineByKey::Merge, merge_combiners],
    num_partitions
  )
end

#compact ⇒ Object

Return a new RDD containing non-nil elements.

Example:

rdd = $sc.parallelize([1, nil, 2, nil, 3])
rdd.compact.collect
# => [1, 2, 3]


# File 'lib/spark/rdd.rb', line 619

def compact
  new_rdd_from_command(Spark::Command::Compact)
end

#config ⇒ Object

Variables and non-computing functions



# File 'lib/spark/rdd.rb', line 94

def config
  @context.config
end

#count ⇒ Object

Return the number of values in this RDD

Example:

rdd = $sc.parallelize(0..10)
rdd.count
# => 11


# File 'lib/spark/rdd.rb', line 344

def count
  # nil for seq_op means all partition results go directly to one worker for combining
  @count ||= self.map_partitions('lambda{|iterator| iterator.to_a.size }')
                 .aggregate(0, nil, 'lambda{|sum, item| sum + item }')
end

#default_reduce_partitions ⇒ Object Also known as: defaultReducePartitions
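
Returns spark.default.parallelism from the configuration if it is set, otherwise the number of partitions. A sketch (assuming spark.default.parallelism is not set):

rdd = $sc.parallelize(0..10, 4)
rdd.default_reduce_partitions
# => 4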



# File 'lib/spark/rdd.rb', line 98

def default_reduce_partitions
  config['spark.default.parallelism'] || partitions_size
end

#distinct ⇒ Object

Return a new RDD containing the distinct elements in this RDD. Ordering is not preserved because of the reduce step.

Example:

rdd = $sc.parallelize([1,1,1,2,3])
rdd.distinct.collect
# => [1, 2, 3]


# File 'lib/spark/rdd.rb', line 671

def distinct
  self.map('lambda{|x| [x, nil]}')
      .reduce_by_key('lambda{|x,_| x}')
      .map('lambda{|x| x[0]}')
end

#filter(f) ⇒ Object

Return a new RDD containing only the elements that satisfy a predicate.

Example:

rdd = $sc.parallelize(0..10)
rdd.filter(lambda{|x| x.even?}).collect
# => [0, 2, 4, 6, 8, 10]


# File 'lib/spark/rdd.rb', line 608

def filter(f)
  new_rdd_from_command(Spark::Command::Filter, f)
end

#first ⇒ Object

Return the first element in this RDD.

Example:

rdd = $sc.parallelize(0..100)
rdd.first
# => 0


# File 'lib/spark/rdd.rb', line 248

def first
  self.take(1)[0]
end

#flat_map(f) ⇒ Object Also known as: flatMap

Return a new RDD by first applying a function to all elements of this RDD, and then flattening the results.

Example:

rdd = $sc.parallelize(0..5)
rdd.flat_map(lambda {|x| [x, 1]}).collect
# => [0, 1, 1, 1, 2, 1, 3, 1, 4, 1, 5, 1]


# File 'lib/spark/rdd.rb', line 574

def flat_map(f)
  new_rdd_from_command(Spark::Command::FlatMap, f)
end

#flat_map_values(f) ⇒ Object

Pass each value in the key-value pair RDD through a flat_map function without changing the keys; this also retains the original RDD’s partitioning.

Example:

rdd = $sc.parallelize([["a", [1,2]], ["b", [3]]])
rdd = rdd.flat_map_values(lambda{|x| x*2})
rdd.collect
# => [["a", 1], ["a", 2], ["a", 1], ["a", 2], ["b", 3], ["b", 3]]


# File 'lib/spark/rdd.rb', line 1161

def flat_map_values(f)
  new_rdd_from_command(Spark::Command::FlatMapValues, f)
end

#fold(zero_value, f) ⇒ Object

Aggregate the elements of each partition, and then the results for all the partitions, using a given associative function and a neutral “zero value”.

The function f(x, y) is allowed to modify x and return it as its result value to avoid object allocation; however, it should not modify y.

Be careful: zero_value is applied at every stage. See the example.

Example:

rdd = $sc.parallelize(0..10, 2)
rdd.fold(1, lambda{|sum, x| sum+x})
# => 58


# File 'lib/spark/rdd.rb', line 276

def fold(zero_value, f)
  self.aggregate(zero_value, f, f)
end

#fold_by_key(zero_value, f, num_partitions = nil) ⇒ Object Also known as: foldByKey

Merge the values for each key using an associative function f and a neutral `zero_value` which may be added to the result an arbitrary number of times, and must not change the result (e.g., 0 for addition, or 1 for multiplication).

Example:

rdd = $sc.parallelize([["a", 1], ["b", 2], ["a", 3], ["a", 4], ["c", 5]])
rdd.fold_by_key(1, lambda{|x,y| x+y})
# => [["a", 9], ["c", 6], ["b", 3]]


# File 'lib/spark/rdd.rb', line 961

def fold_by_key(zero_value, f, num_partitions=nil)
  self.aggregate_by_key(zero_value, f, f, num_partitions)
end

#foreach(f, options = {}) ⇒ Object

Applies a function f to all elements of this RDD.

Example:

rdd = $sc.parallelize(0..5)
rdd.foreach(lambda{|x| puts x})
# => nil


# File 'lib/spark/rdd.rb', line 534

def foreach(f, options={})
  new_rdd_from_command(Spark::Command::Foreach, f).collect
  nil
end

#foreach_partition(f, options = {}) ⇒ Object Also known as: foreachPartition

Applies a function f to each partition of this RDD.

Example:

rdd = $sc.parallelize(0..5)
rdd.foreachPartition(lambda{|x| puts x.to_s})
# => nil


# File 'lib/spark/rdd.rb', line 546

def foreach_partition(f, options={})
  new_rdd_from_command(Spark::Command::ForeachPartition, f).collect
  nil
end

#glom ⇒ Object

Return an RDD created by coalescing all elements within each partition into an array.

Example:

rdd = $sc.parallelize(0..10, 3, batch_size: 1)
rdd.glom.collect
# => [[0, 1, 2], [3, 4, 5, 6], [7, 8, 9, 10]]


# File 'lib/spark/rdd.rb', line 630

def glom
  new_rdd_from_command(Spark::Command::Glom)
end

#group_by(f, num_partitions = nil) ⇒ Object Also known as: groupBy

Return an RDD of grouped items.

Example:

rdd = $sc.parallelize(0..5)
rdd.group_by(lambda{|x| x%2}).collect
# => [[0, [0, 2, 4]], [1, [1, 3, 5]]]


# File 'lib/spark/rdd.rb', line 928

def group_by(f, num_partitions=nil)
  self.key_by(f).group_by_key(num_partitions)
end

#group_by_key(num_partitions = nil) ⇒ Object Also known as: groupByKey

Group the values for each key in the RDD into a single sequence. Allows controlling the partitioning of the resulting key-value pair RDD by passing a Partitioner.

Note: If you are grouping in order to perform an aggregation (such as a sum or average) over each key, using reduce_by_key or combine_by_key will provide much better performance.

Example:

rdd = $sc.parallelize([["a", 1], ["a", 2], ["b", 3]])
rdd.group_by_key.collect
# => [["a", [1, 2]], ["b", [3]]]


# File 'lib/spark/rdd.rb', line 943

def group_by_key(num_partitions=nil)
  create_combiner = 'lambda{|item| [item]}'
  merge_value     = 'lambda{|combiner, item| combiner << item; combiner}'
  merge_combiners = 'lambda{|combiner_1, combiner_2| combiner_1 += combiner_2; combiner_1}'

  combine_by_key(create_combiner, merge_value, merge_combiners, num_partitions)
end

#group_with(other, num_partitions = nil) ⇒ Object Also known as: groupWith

The same functionality as cogroup, but it can group only two RDDs and you can change num_partitions.

Example:

rdd1 = $sc.parallelize([["a", 1], ["a", 2], ["b", 3]])
rdd2 = $sc.parallelize([["a", 4], ["a", 5], ["b", 6]])
rdd1.group_with(rdd2).collect
# => [["a", [1, 2, 4, 5]], ["b", [3, 6]]]


# File 'lib/spark/rdd.rb', line 997

def group_with(other, num_partitions=nil)
  self.union(other).group_by_key(num_partitions)
end

#histogram(buckets) ⇒ Object

Compute a histogram using the provided buckets. The buckets are all open to the right except for the last, which is closed. E.g. [1,10,20,50] means the buckets are [1,10) [10,20) [20,50], i.e. 1<=x<10, 10<=x<20, 20<=x<=50. On an input of 1 and 50 we would have a histogram of 1,0,1.

If your buckets are evenly spaced (e.g. [0, 10, 20, 30]), bucketing can be switched from an O(log n) insertion to O(1) per element (where n = # buckets).

Buckets must be sorted, must not contain any duplicates, and must have at least two elements.

Examples:

rdd = $sc.parallelize(0..50)

rdd.histogram(2)
# => [[0.0, 25.0, 50], [25, 26]]

rdd.histogram([0, 5, 25, 50])
# => [[0, 5, 25, 50], [5, 20, 26]]

rdd.histogram([0, 15, 30, 45, 60])
# => [[0, 15, 30, 45, 60], [15, 15, 15, 6]]


# File 'lib/spark/rdd.rb', line 434

def histogram(buckets)

  # -----------------------------------------------------------------------
  # Integer
  #
  if buckets.is_a?(Integer)

    # Validation
    if buckets < 1
      raise ArgumentError, "Bucket count must be >= 1, #{buckets} inserted."
    end

    # Filter invalid values
    # Nil and NaN
    func = 'lambda{|x|
      if x.nil? || (x.is_a?(Float) && x.nan?)
        false
      else
        true
      end
    }'
    filtered = self.filter(func)

    # Compute the minimum and the maximum
    func = 'lambda{|memo, item|
      [memo[0] < item[0] ? memo[0] : item[0],
       memo[1] > item[1] ? memo[1] : item[1]]
    }'
    min, max = filtered.map('lambda{|x| [x, x]}').reduce(func)

    # Min, max must be valid numbers
    if (min.is_a?(Float) && !min.finite?) || (max.is_a?(Float) && !max.finite?)
      raise Spark::RDDError, 'Histogram on either an empty RDD or RDD containing +/-infinity or NaN'
    end

    # Already finished
    if min == max || buckets == 1
      return [min, max], [filtered.count]
    end

    # Custom range
    begin
      span = max - min # increment
      buckets = (0...buckets).map do |x|
        min + (x * span) / buckets.to_f
      end
      buckets << max
    rescue NoMethodError
      raise Spark::RDDError, 'Can not generate buckets with non-number in RDD'
    end

    even = true

  # -----------------------------------------------------------------------
  # Array
  #
  elsif buckets.is_a?(Array)

    if buckets.size < 2
      raise ArgumentError, 'Buckets should have more than one value.'
    end

    if buckets.detect{|x| x.nil? || (x.is_a?(Float) && x.nan?)}
      raise ArgumentError, 'Can not have nil or nan numbers in buckets.'
    end

    if buckets.detect{|x| buckets.count(x) > 1}
      raise ArgumentError, 'Buckets should not contain duplicated values.'
    end

    if buckets.sort != buckets
      raise ArgumentError, 'Buckets must be sorted.'
    end

    even = false

  # -----------------------------------------------------------------------
  # Other
  #
  else
    raise Spark::RDDError, 'Buckets should be number or array.'
  end

  reduce_func = 'lambda{|memo, item|
    memo.size.times do |i|
      memo[i] += item[i]
    end
    memo
  }'

  return buckets, new_rdd_from_command(Spark::Command::Histogram, even, buckets).reduce(reduce_func)
end

#id ⇒ Object

A unique ID for this RDD (within its SparkContext).
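
For example (the actual value depends on how many RDDs the context has already created):

rdd = $sc.parallelize(0..5)
rdd.id
# => 1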



# File 'lib/spark/rdd.rb', line 108

def id
  jrdd.id
end

#intersection(other) ⇒ Object

Return the intersection of this RDD and another one. The output will not contain any duplicate elements, even if the input RDDs did.

Example:

rdd1 = $sc.parallelize([1,2,3,4,5])
rdd2 = $sc.parallelize([1,4,5,6,7])
rdd1.intersection(rdd2).collect
# => [1, 4, 5]


# File 'lib/spark/rdd.rb', line 739

def intersection(other)
  mapping_function = 'lambda{|item| [item, nil]}'
  filter_function  = 'lambda{|(key, values)| values.size > 1}'

  self.map(mapping_function)
      .cogroup(other.map(mapping_function))
      .filter(filter_function)
      .keys
end

#key_by(f) ⇒ Object Also known as: keyBy

Creates pairs [f(x), x] from the elements in this RDD by applying function f to compute the key.

Example:

rdd = $sc.parallelize(0..5)
rdd.key_by(lambda{|x| x%2}).collect
# => [[0, 0], [1, 1], [0, 2], [1, 3], [0, 4], [1, 5]]


# File 'lib/spark/rdd.rb', line 1133

def key_by(f)
  new_rdd_from_command(Spark::Command::KeyBy, f)
end

#keys ⇒ Object

Return an RDD with the first element (the key) of each pair in this PairRDD.

Example:

rdd = $sc.parallelize([[1,2], [3,4], [5,6]])
rdd.keys.collect
# => [1, 3, 5]


# File 'lib/spark/rdd.rb', line 1172

def keys
  self.map('lambda{|(key, _)| key}')
end

#map(f) ⇒ Object

Return a new RDD by applying a function to all elements of this RDD.

Example:

rdd = $sc.parallelize(0..5)
rdd.map(lambda {|x| x*2}).collect
# => [0, 2, 4, 6, 8, 10]


# File 'lib/spark/rdd.rb', line 562

def map(f)
  new_rdd_from_command(Spark::Command::Map, f)
end

#map_partitions(f) ⇒ Object Also known as: mapPartitions

Return a new RDD by applying a function to each partition of this RDD.

Example:

rdd = $sc.parallelize(0..10, 2)
rdd.map_partitions(lambda{|part| part.reduce(:+)}).collect
# => [15, 40]


# File 'lib/spark/rdd.rb', line 585

def map_partitions(f)
  new_rdd_from_command(Spark::Command::MapPartitions, f)
end

#map_partitions_with_index(f, options = {}) ⇒ Object Also known as: mapPartitionsWithIndex

Return a new RDD by applying a function to each partition of this RDD, while tracking the index of the original partition.

Example:

rdd = $sc.parallelize(0...4, 4, batch_size: 1)
rdd.map_partitions_with_index(lambda{|part, index| part.first * index}).collect
# => [0, 1, 4, 9]


# File 'lib/spark/rdd.rb', line 597

def map_partitions_with_index(f, options={})
  new_rdd_from_command(Spark::Command::MapPartitionsWithIndex, f)
end

#map_values(f) ⇒ Object Also known as: mapValues

Pass each value in the key-value pair RDD through a map function without changing the keys. This also retains the original RDD’s partitioning.

Example:

rdd = $sc.parallelize(["ruby", "scala", "java"])
rdd = rdd.map(lambda{|x| [x, x]})
rdd = rdd.map_values(lambda{|x| x.upcase})
rdd.collect
# => [["ruby", "RUBY"], ["scala", "SCALA"], ["java", "JAVA"]]


# File 'lib/spark/rdd.rb', line 1147

def map_values(f)
  new_rdd_from_command(Spark::Command::MapValues, f)
end

#max ⇒ Object

Return the max of this RDD

Example:

rdd = $sc.parallelize(0..10)
rdd.max
# => 10


# File 'lib/spark/rdd.rb', line 311

def max
  self.reduce('lambda{|memo, item| memo > item ? memo : item }')
end

#mean ⇒ Object

Compute the mean of this RDD’s elements.

Example:

$sc.parallelize([1, 2, 3]).mean
# => 2.0


# File 'lib/spark/rdd.rb', line 362

def mean
  stats.mean
end

#min ⇒ Object

Return the min of this RDD

Example:

rdd = $sc.parallelize(0..10)
rdd.min
# => 0


# File 'lib/spark/rdd.rb', line 322

def min
  self.reduce('lambda{|memo, item| memo < item ? memo : item }')
end

#name ⇒ Object

Return the name of this RDD.



# File 'lib/spark/rdd.rb', line 150

def name
  _name = jrdd.name
  _name && _name.encode(Encoding::UTF_8)
end

#new_rdd_from_command(klass, *args) ⇒ Object



# File 'lib/spark/rdd.rb', line 85

def new_rdd_from_command(klass, *args)
  comm = add_command(klass, *args)
  PipelinedRDD.new(self, comm)
end

#partition_by(num_partitions, partition_func = nil) ⇒ Object Also known as: partitionBy

Return a copy of the RDD partitioned using the specified partitioner.

Example:

rdd = $sc.parallelize(["1","2","3","4","5"]).map(lambda {|x| [x, 1]})
rdd.partitionBy(2).glom.collect
# => [[["3", 1], ["4", 1]], [["1", 1], ["2", 1], ["5", 1]]]


# File 'lib/spark/rdd.rb', line 756

def partition_by(num_partitions, partition_func=nil)
  num_partitions ||= default_reduce_partitions
  partition_func ||= 'lambda{|x| Spark::Digest.portable_hash(x.to_s)}'

  _partition_by(num_partitions, Spark::Command::PartitionBy::Basic, partition_func)
end

#partitions_size ⇒ Object Also known as: partitionsSize

Count of partitions (ParallelCollectionPartition objects) of this RDD.
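
A short sketch:

rdd = $sc.parallelize(0..10, 2)
rdd.partitions_size
# => 2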



# File 'lib/spark/rdd.rb', line 103

def partitions_size
  jrdd.rdd.partitions.size
end

#persist(new_level) ⇒ Object

Set this RDD’s storage level to persist its values across operations after the first time it is computed. This can only be used to assign a new storage level if the RDD does not have a storage level set yet.

See StorageLevel for the possible values of new_level.
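
A usage sketch (the storage level name 'memory_and_disk' is assumed here to follow the same naming convention as cache's 'memory_only_ser'):

rdd = $sc.parallelize(0..10)
rdd.persist('memory_and_disk')
rdd.cached?
# => true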



# File 'lib/spark/rdd.rb', line 123

def persist(new_level)
  @cached = true
  jrdd.persist(Spark::StorageLevel.java_get(new_level))
  self
end

#pipe(*cmds) ⇒ Object

Return an RDD created by piping elements to a forked external process.

Cmds:

cmd = [env,] command... [,options]

env: hash
  name => val : set the environment variable
  name => nil : unset the environment variable
command...:
  commandline                 : command line string which is passed to the standard shell
  cmdname, arg1, ...          : command name and one or more arguments (This form does
                                not use the shell. See below for caveats.)
  [cmdname, argv0], arg1, ... : command name, argv[0] and zero or more arguments (no shell)
options: hash

See http://ruby-doc.org/core-2.2.0/Process.html#method-c-spawn

Examples:

$sc.parallelize(0..5).pipe('cat').collect
# => ["0", "1", "2", "3", "4", "5"]

rdd = $sc.parallelize(0..5)
rdd = rdd.pipe('cat', "awk '{print $1*10}'")
rdd = rdd.map(lambda{|x| x.to_i + 1})
rdd.collect
# => [1, 11, 21, 31, 41, 51]


# File 'lib/spark/rdd.rb', line 867

def pipe(*cmds)
  new_rdd_from_command(Spark::Command::Pipe, cmds)
end

#reduce(f) ⇒ Object

Reduces the elements of this RDD using the specified lambda or method.

Example:

rdd = $sc.parallelize(0..10)
rdd.reduce(lambda{|sum, x| sum+x})
# => 55


# File 'lib/spark/rdd.rb', line 259

def reduce(f)
  _reduce(Spark::Command::Reduce, f, f)
end

#reduce_by_key(f, num_partitions = nil) ⇒ Object Also known as: reduceByKey

Merge the values for each key using an associative reduce function. This will also perform the merging locally on each mapper before sending results to a reducer, similarly to a “combiner” in MapReduce. Output will be hash-partitioned with the existing partitioner/ parallelism level.

Example:

rdd = $sc.parallelize(["a","b","c","a","b","c","a","c"]).map(lambda{|x| [x, 1]})
rdd.reduce_by_key(lambda{|x,y| x+y}).collect_as_hash
# => {"a"=>3, "b"=>2, "c"=>3}


# File 'lib/spark/rdd.rb', line 885

def reduce_by_key(f, num_partitions=nil)
  combine_by_key('lambda {|x| x}', f, f, num_partitions)
end

#reserialize(new_serializer, new_batch_size = nil) ⇒ Object

Return a new RDD with different serializer. This method is useful during union and join operations.

Example:

rdd = $sc.parallelize([1, 2, 3], nil, serializer: "marshal")
rdd = rdd.map(lambda{|x| x.to_s})
rdd.reserialize("oj").collect
# => ["1", "2", "3"]


# File 'lib/spark/rdd.rb', line 716

def reserialize(new_serializer, new_batch_size=nil)
  new_batch_size ||= deserializer.batch_size
  new_serializer = Spark::Serializer.get!(new_serializer).new(new_batch_size)

  if serializer == new_serializer
    return self
  end

  new_command = @command.deep_copy
  new_command.serializer = new_serializer

  PipelinedRDD.new(self, new_command)
end

#sample(with_replacement, fraction, seed = nil) ⇒ Object

Return a sampled subset of this RDD. Operations are based on Poisson and Uniform distributions. TODO: replace Uniform with Bernoulli.

Examples:

rdd = $sc.parallelize(0..100)

rdd.sample(true, 10).collect
# => [17, 17, 22, 23, 51, 52, 62, 64, 69, 70, 96]

rdd.sample(false, 0.1).collect
# => [3, 5, 9, 32, 44, 55, 66, 68, 75, 80, 86, 91, 98]


# File 'lib/spark/rdd.rb', line 776

def sample(with_replacement, fraction, seed=nil)
  new_rdd_from_command(Spark::Command::Sample, with_replacement, fraction, seed)
end

#sample_stdev ⇒ Object Also known as: sampleStdev

Compute the sample standard deviation of this RDD’s elements (which corrects for bias in estimating the standard deviation by dividing by N-1 instead of N).

Example:

$sc.parallelize([1, 2, 3]).sample_stdev
# => 1.0


# File 'lib/spark/rdd.rb', line 394

def sample_stdev
  stats.sample_stdev
end

#sample_variance ⇒ Object Also known as: sampleVariance

Compute the sample variance of this RDD’s elements (which corrects for bias in estimating the variance by dividing by N-1 instead of N).

Example:

$sc.parallelize([1, 2, 3]).sample_variance
# => 1.0


# File 'lib/spark/rdd.rb', line 405

def sample_variance
  stats.sample_variance
end

#set_name(name) ⇒ Object Also known as: setName

Assign a name to this RDD.
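
A short sketch:

rdd = $sc.parallelize(0..5)
rdd.set_name('numbers')
rdd.name
# => "numbers"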



# File 'lib/spark/rdd.rb', line 157

def set_name(name)
  jrdd.setName(name)
end

#shuffle(seed = nil) ⇒ Object

Return a shuffled RDD.

Example:

rdd = $sc.parallelize(0..10)
rdd.shuffle.collect
# => [3, 10, 6, 7, 8, 0, 4, 2, 9, 1, 5]


# File 'lib/spark/rdd.rb', line 684

def shuffle(seed=nil)
  seed ||= Random.new_seed

  new_rdd_from_command(Spark::Command::Shuffle, seed)
end

#sort_by(key_function = nil, ascending = true, num_partitions = nil) ⇒ Object Also known as: sortBy

Sorts this RDD by the given key_function

This is a different implementation than Spark's: sort_by does not use the key_by method first. It can be slower but takes less memory, and you can always use map.sort_by_key instead.

Example:

rdd = $sc.parallelize(["aaaaaaa", "cc", "b", "eeee", "ddd"])

rdd.sort_by.collect
# => ["aaaaaaa", "b", "cc", "ddd", "eeee"]

rdd.sort_by(lambda{|x| x.size}).collect
# => ["b", "cc", "ddd", "eeee", "aaaaaaa"]


# File 'lib/spark/rdd.rb', line 1082

def sort_by(key_function=nil, ascending=true, num_partitions=nil)
  key_function   ||= 'lambda{|x| x}'
  num_partitions ||= default_reduce_partitions

  command_klass = Spark::Command::SortByKey

  # Allow spill data to disk due to memory limit
  # spilling = config['spark.shuffle.spill'] || false
  spilling = false
  memory = ''

  # Set spilling to false if worker has unlimited memory
  if memory.empty?
    spilling = false
    memory   = nil
  else
    memory = to_memory_size(memory)
  end

  # Sorting should be done by one worker
  if num_partitions == 1
    rdd = self
    rdd = rdd.coalesce(1) if partitions_size > 1
    return rdd.new_rdd_from_command(command_klass, key_function, ascending, spilling, memory, serializer)
  end

  # Compute boundary of collection
  # Collection should be evenly distributed
  # 20.0 is from scala RangePartitioner (for roughly balanced output partitions)
  count = self.count
  sample_size = num_partitions * 20.0
  fraction = [sample_size / [count, 1].max, 1.0].min
  samples = self.sample(false, fraction, 1).map(key_function).collect
  samples.sort!
  # Reverse is much faster than reverse sort_by
  samples.reverse! if !ascending

  # Determine part bounds
  bounds = determine_bounds(samples, num_partitions)

  shuffled = _partition_by(num_partitions, Spark::Command::PartitionBy::Sorting, key_function, bounds, ascending, num_partitions)
  shuffled.new_rdd_from_command(command_klass, key_function, ascending, spilling, memory, serializer)
end

#sort_by_key(ascending = true, num_partitions = nil) ⇒ Object Also known as: sortByKey

Sort the RDD by key

Example:

rdd = $sc.parallelize([["c", 1], ["b", 2], ["a", 3]])
rdd.sort_by_key.collect
# => [["a", 3], ["b", 2], ["c", 1]]


# File 'lib/spark/rdd.rb', line 1063

def sort_by_key(ascending=true, num_partitions=nil)
  self.sort_by('lambda{|(key, _)| key}')
end

#stats ⇒ Object

Return a StatCounter object that captures the mean, variance and count of the RDD’s elements in one operation.
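
For example (mean is one of the StatCounter accessors used by the methods below):

stats = $sc.parallelize([1, 2, 3]).stats
stats.mean
# => 2.0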



# File 'lib/spark/rdd.rb', line 352

def stats
  @stats ||= new_rdd_from_command(Spark::Command::Stats).reduce('lambda{|memo, item| memo.merge(item)}')
end

#stdev ⇒ Object

Compute the standard deviation of this RDD’s elements.

Example:

$sc.parallelize([1, 2, 3]).stdev
# => 0.816...


# File 'lib/spark/rdd.rb', line 382

def stdev
  stats.stdev
end

#subtract(other, num_partitions = nil) ⇒ Object

Return an RDD with the elements from self that are not in other.

Example:

rdd1 = $sc.parallelize([["a", 1], ["a", 2], ["b", 3], ["c", 4]])
rdd2 = $sc.parallelize([["a", 2], ["c", 6]])
rdd1.subtract(rdd2).collect
# => [["a", 1], ["b", 3], ["c", 4]]


# File 'lib/spark/rdd.rb', line 1048

def subtract(other, num_partitions=nil)
  mapping_function = 'lambda{|x| [x,nil]}'

  self.map(mapping_function)
      .subtract_by_key(other.map(mapping_function), num_partitions)
      .keys
end

#subtract_by_key(other, num_partitions = nil) ⇒ Object Also known as: subtractByKey

Return each (key, value) pair in self RDD that has no pair with matching key in other RDD.

Example:

rdd1 = $sc.parallelize([["a", 1], ["a", 2], ["b", 3], ["c", 4]])
rdd2 = $sc.parallelize([["b", 5], ["c", 6]])
rdd1.subtract_by_key(rdd2).collect
# => [["a", 1], ["a", 2]]


# File 'lib/spark/rdd.rb', line 1029

def subtract_by_key(other, num_partitions=nil)
  create_combiner = 'lambda{|item| [[item]]}'
  merge_value     = 'lambda{|combiner, item| combiner.first << item; combiner}'
  merge_combiners = 'lambda{|combiner_1, combiner_2| combiner_1 += combiner_2; combiner_1}'

  self.union(other)
      .combine_by_key(create_combiner, merge_value, merge_combiners, num_partitions)
      .filter('lambda{|(key,values)| values.size == 1}')
      .flat_map_values('lambda{|item| item.first}')
end

#sum ⇒ Object

Return the sum of this RDD

Example:

rdd = $sc.parallelize(0..10)
rdd.sum
# => 55


# File 'lib/spark/rdd.rb', line 333

def sum
  self.reduce('lambda{|sum, item| sum + item}')
end

#take(count) ⇒ Object

Take the first num elements of the RDD.

It works by first scanning one partition, and uses the results from that partition to estimate the number of additional partitions needed to satisfy the limit.

Example:

rdd = $sc.parallelize(0..100, 20, batch_size: 1)
rdd.take(5)
# => [0, 1, 2, 3, 4]


# File 'lib/spark/rdd.rb', line 205

def take(count)
  buffer = []

  parts_count = self.partitions_size
  # No parts were scanned yet
  last_scanned = -1

  while buffer.empty?
    last_scanned += 1
    buffer += context.run_job_with_command(self, [last_scanned], true, Spark::Command::Take, 0, -1)
  end

  # Assumption: depends on batch_size and how Spark divided the data.
  items_per_part = buffer.size
  left = count - buffer.size

  while left > 0 && last_scanned < parts_count
    parts_to_take = (left.to_f/items_per_part).ceil
    parts_for_scanned = Array.new(parts_to_take) do
      last_scanned += 1
    end

    # We cannot take exact number of items because workers are isolated from each other.
    # => once you take e.g. 50% from the last part and left is still > 0, it is very
    #    difficult to merge new items
    items = context.run_job_with_command(self, parts_for_scanned, true, Spark::Command::Take, left, last_scanned)
    buffer += items

    left = count - buffer.size
    # Average size of all parts
    items_per_part = [items_per_part, items.size].reduce(0){|sum, x| sum + x.to_f/2}
  end

  buffer.slice!(0, count)
end

#take_sample(with_replacement, num, seed = nil) ⇒ Object Also known as: takeSample

Return a fixed-size sampled subset of this RDD in an array

Examples:

rdd = $sc.parallelize(0..100)

rdd.take_sample(true, 10)
# => [90, 84, 74, 44, 27, 22, 72, 96, 80, 54]

rdd.take_sample(false, 10)
# => [5, 35, 30, 48, 22, 33, 40, 75, 42, 32]


# File 'lib/spark/rdd.rb', line 791

def take_sample(with_replacement, num, seed=nil)

  if num < 0
    raise Spark::RDDError, 'Size must be greater than 0'
  elsif num == 0
    return []
  end

  # Taken from scala
  num_st_dev = 10.0

  # Number of items
  initial_count = self.count
  return [] if initial_count == 0

  # Create new generator
  seed ||= Random.new_seed
  rng = Random.new(seed)

  # Shuffle elements if the requested num is greater than the array size
  if !with_replacement && num >= initial_count
    return self.shuffle(seed).collect
  end

  # Max num
  max_sample_size = Integer::MAX - (num_st_dev * Math.sqrt(Integer::MAX)).to_i
  if num > max_sample_size
    raise Spark::RDDError, "Size can not be greate than #{max_sample_size}"
  end

  # Approximate fraction with tolerance
  fraction = compute_fraction(num, initial_count, with_replacement)

  # Compute the first sampled subset
  samples = self.sample(with_replacement, fraction, seed).collect

  # If the first sample didn't turn out large enough, keep trying to take samples;
  # this shouldn't happen often because we use a big multiplier for their initial size.
  index = 0
  while samples.size < num
    log_warning("Needed to re-sample due to insufficient sample size. Repeat #{index}")
    samples = self.sample(with_replacement, fraction, rng.rand(0..Integer::MAX)).collect
    index += 1
  end

  samples.shuffle!(random: rng)
  samples[0, num]
end

#to_java ⇒ Object



# File 'lib/spark/rdd.rb', line 161

def to_java
  rdd = self.reserialize('Marshal')
  RubyRDD.toJava(rdd.jrdd, rdd.serializer.batched?)
end

#union(other) ⇒ Object

Return the union of this RDD and another one. Any identical elements will appear multiple times (use .distinct to eliminate them).

Example:

rdd = $sc.parallelize([1, 2, 3])
rdd.union(rdd).collect
# => [1, 2, 3, 1, 2, 3]


# File 'lib/spark/rdd.rb', line 698

def union(other)
  if self.serializer != other.serializer
    other = other.reserialize(serializer.name, serializer.batch_size)
  end

  new_jrdd = jrdd.union(other.jrdd)
  RDD.new(new_jrdd, context, serializer, deserializer)
end

#unpersist(blocking = true) ⇒ Object

Mark the RDD as non-persistent, and remove all blocks for it from memory and disk.

Parameters:

blocking

whether to block until all blocks are deleted.
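
A usage sketch:

rdd = $sc.parallelize(0..10)
rdd.cache
rdd.unpersist
rdd.cached?
# => false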



# File 'lib/spark/rdd.rb', line 134

def unpersist(blocking=true)
  @cached = false
  jrdd.unpersist(blocking)
  self
end

#values ⇒ Object

Return an RDD with the second element (the value) of each pair in this PairRDD.

Example:

rdd = $sc.parallelize([[1,2], [3,4], [5,6]])
rdd.values.collect
# => [2, 4, 6]


# File 'lib/spark/rdd.rb', line 1183

def values
  self.map('lambda{|(_, value)| value}')
end

#variance ⇒ Object

Compute the variance of this RDD’s elements.

Example:

$sc.parallelize([1, 2, 3]).variance
# => 0.666...


# File 'lib/spark/rdd.rb', line 372

def variance
  stats.variance
end