Class: Spark::RDD
- Inherits: Object
- Extended by: Forwardable
- Includes: Helper::Logger, Helper::Parser, Helper::Statistic
- Defined in: lib/spark/rdd.rb
Overview
A Resilient Distributed Dataset (RDD), the basic abstraction in Spark. Represents an immutable, partitioned collection of elements that can be operated on in parallel. This class contains the basic operations available on all RDDs, such as `map`, `filter`, and `persist`.
Direct Known Subclasses
PipelinedRDD
Instance Attribute Summary collapse
-
#command ⇒ Object
readonly
Returns the value of attribute command.
-
#context ⇒ Object
readonly
Returns the value of attribute context.
-
#jrdd ⇒ Object
readonly
Returns the value of attribute jrdd.
Instance Method Summary collapse
-
#+(other) ⇒ Object
Operators.
-
#add_command(klass, *args) ⇒ Object
Command and serializer.
-
#add_library(*libraries) ⇒ Object
(also: #addLibrary)
Add a Ruby library. Libraries will be included before computing.
-
#aggregate(zero_value, seq_op, comb_op) ⇒ Object
Aggregate the elements of each partition, and then the results for all the partitions, using given combine functions and a neutral “zero value”.
-
#aggregate_by_key(zero_value, seq_func, comb_func, num_partitions = nil) ⇒ Object
(also: #aggregateByKey)
Aggregate the values of each key, using given combine functions and a neutral zero value.
-
#bind(objects) ⇒ Object
Bind object to RDD.
-
#cache ⇒ Object
Persist this RDD with the default storage level MEMORY_ONLY_SER because of serialization.
- #cached? ⇒ Boolean
-
#cartesian(other) ⇒ Object
Return the Cartesian product of this RDD and another one, that is, the RDD of all pairs of elements `(a, b)` where `a` is in `self` and `b` is in `other`.
- #checkpointed? ⇒ Boolean
-
#coalesce(num_partitions) ⇒ Object
Return a new RDD that is reduced into num_partitions partitions.
-
#cogroup(*others) ⇒ Object
For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the list of values for that key in `this` as well as `other`.
-
#collect ⇒ Object
Return an array that contains all of the elements in this RDD.
-
#collect_as_hash ⇒ Object
Collect this RDD and convert the resulting array of pairs to a Hash.
- #collect_from_iterator(iterator) ⇒ Object
-
#combine_by_key(create_combiner, merge_value, merge_combiners, num_partitions = nil) ⇒ Object
(also: #combineByKey)
Generic function to combine the elements for each key using a custom set of aggregation functions.
-
#compact ⇒ Object
Return a new RDD containing non-nil elements.
-
#config ⇒ Object
Variables and non-computing functions.
-
#count ⇒ Object
Return the number of values in this RDD.
- #default_reduce_partitions ⇒ Object (also: #defaultReducePartitions)
-
#distinct ⇒ Object
Return a new RDD containing the distinct elements in this RDD.
-
#filter(f) ⇒ Object
Return a new RDD containing only the elements that satisfy a predicate.
-
#first ⇒ Object
Return the first element in this RDD.
-
#flat_map(f) ⇒ Object
(also: #flatMap)
Return a new RDD by first applying a function to all elements of this RDD, and then flattening the results.
-
#flat_map_values(f) ⇒ Object
Pass each value in the key-value pair RDD through a flat_map function without changing the keys; this also retains the original RDD’s partitioning.
-
#fold(zero_value, f) ⇒ Object
Aggregate the elements of each partition, and then the results for all the partitions, using a given associative function and a neutral “zero value”.
-
#fold_by_key(zero_value, f, num_partitions = nil) ⇒ Object
(also: #foldByKey)
Merge the values for each key using an associative function f and a neutral `zero_value` which may be added to the result an arbitrary number of times, and must not change the result (e.g., 0 for addition, or 1 for multiplication).
-
#foreach(f, options = {}) ⇒ Object
Applies a function f to all elements of this RDD.
-
#foreach_partition(f, options = {}) ⇒ Object
(also: #foreachPartition)
Applies a function f to each partition of this RDD.
-
#glom ⇒ Object
Return an RDD created by coalescing all elements within each partition into an array.
-
#group_by(f, num_partitions = nil) ⇒ Object
(also: #groupBy)
Return an RDD of grouped items.
-
#group_by_key(num_partitions = nil) ⇒ Object
(also: #groupByKey)
Group the values for each key in the RDD into a single sequence.
-
#group_with(other, num_partitions = nil) ⇒ Object
(also: #groupWith)
The same functionality as cogroup, but this can group only two RDDs and you can change num_partitions.
-
#histogram(buckets) ⇒ Object
Compute a histogram using the provided buckets.
-
#id ⇒ Object
A unique ID for this RDD (within its SparkContext).
-
#initialize(jrdd, context, serializer, deserializer = nil) ⇒ RDD
constructor
Initialize an RDD. This method is the root of all PipelinedRDDs, which makes it unique: if you call some operations directly on this class, they are computed in Java.
-
#intersection(other) ⇒ Object
Return the intersection of this RDD and another one.
-
#key_by(f) ⇒ Object
(also: #keyBy)
Creates tuples [f(item), item] of the elements in this RDD by applying the function f.
-
#keys ⇒ Object
Return an RDD with the first element of each pair in this PairRDD.
-
#map(f) ⇒ Object
Return a new RDD by applying a function to all elements of this RDD.
-
#map_partitions(f) ⇒ Object
(also: #mapPartitions)
Return a new RDD by applying a function to each partition of this RDD.
-
#map_partitions_with_index(f, options = {}) ⇒ Object
(also: #mapPartitionsWithIndex)
Return a new RDD by applying a function to each partition of this RDD, while tracking the index of the original partition.
-
#map_values(f) ⇒ Object
(also: #mapValues)
Pass each value in the key-value pair RDD through a map function without changing the keys.
-
#max ⇒ Object
Return the max of this RDD.
-
#mean ⇒ Object
Compute the mean of this RDD’s elements.
-
#min ⇒ Object
Return the min of this RDD.
-
#name ⇒ Object
Return the name of this RDD.
- #new_rdd_from_command(klass, *args) ⇒ Object
-
#partition_by(num_partitions, partition_func = nil) ⇒ Object
(also: #partitionBy)
Return a copy of the RDD partitioned using the specified partitioner.
-
#partitions_size ⇒ Object
(also: #partitionsSize)
Count of ParallelCollectionPartition.
-
#persist(new_level) ⇒ Object
Set this RDD’s storage level to persist its values across operations after the first time it is computed.
-
#pipe(*cmds) ⇒ Object
Return an RDD created by piping elements to a forked external process.
-
#reduce(f) ⇒ Object
Reduces the elements of this RDD using the specified lambda or method.
-
#reduce_by_key(f, num_partitions = nil) ⇒ Object
(also: #reduceByKey)
Merge the values for each key using an associative reduce function.
-
#reserialize(new_serializer, new_batch_size = nil) ⇒ Object
Return a new RDD with different serializer.
-
#sample(with_replacement, fraction, seed = nil) ⇒ Object
Return a sampled subset of this RDD.
-
#sample_stdev ⇒ Object
(also: #sampleStdev)
Compute the sample standard deviation of this RDD’s elements (which corrects for bias in estimating the standard deviation by dividing by N-1 instead of N).
-
#sample_variance ⇒ Object
(also: #sampleVariance)
Compute the sample variance of this RDD’s elements (which corrects for bias in estimating the variance by dividing by N-1 instead of N).
-
#set_name(name) ⇒ Object
(also: #setName)
Assign a name to this RDD.
-
#shuffle(seed = nil) ⇒ Object
Return a shuffled RDD.
-
#sort_by(key_function = nil, ascending = true, num_partitions = nil) ⇒ Object
(also: #sortBy)
Sorts this RDD by the given key_function.
-
#sort_by_key(ascending = true, num_partitions = nil) ⇒ Object
(also: #sortByKey)
Sort the RDD by key.
-
#stats ⇒ Object
Return a StatCounter object that captures the mean, variance and count of the RDD’s elements in one operation.
-
#stdev ⇒ Object
Compute the standard deviation of this RDD’s elements.
-
#subtract(other, num_partitions = nil) ⇒ Object
Return an RDD with the elements from self that are not in other.
-
#subtract_by_key(other, num_partitions = nil) ⇒ Object
(also: #subtractByKey)
Return each (key, value) pair in this RDD that has no pair with a matching key in the other RDD.
-
#sum ⇒ Object
Return the sum of this RDD.
-
#take(count) ⇒ Object
Take the first `count` elements of the RDD.
-
#take_sample(with_replacement, num, seed = nil) ⇒ Object
(also: #takeSample)
Return a fixed-size sampled subset of this RDD in an array.
- #to_java ⇒ Object
-
#union(other) ⇒ Object
Return the union of this RDD and another one.
-
#unpersist(blocking = true) ⇒ Object
Mark the RDD as non-persistent, and remove all blocks for it from memory and disk.
-
#values ⇒ Object
Return an RDD with the second element of each pair in this PairRDD.
-
#variance ⇒ Object
Compute the variance of this RDD’s elements.
Methods included from Helper::Statistic
#bisect_right, #compute_fraction, #determine_bounds, #upper_binomial_bound, #upper_poisson_bound
Methods included from Helper::Parser
Methods included from Helper::Logger
Constructor Details
#initialize(jrdd, context, serializer, deserializer = nil) ⇒ RDD
Initialize an RDD. This method is the root of all PipelinedRDDs, which makes it unique: if you call some operations directly on this class, they are computed in Java.
Parameters:
- jrdd (org.apache.spark.api.java.JavaRDD)
- context
- serializer
# File 'lib/spark/rdd.rb', line 27

def initialize(jrdd, context, serializer, deserializer=nil)
  @jrdd = jrdd
  @context = context

  @cached = false
  @checkpointed = false

  @command = Spark::CommandBuilder.new(serializer, deserializer)
end
Instance Attribute Details
#command ⇒ Object (readonly)
Returns the value of attribute command.
# File 'lib/spark/rdd.rb', line 11

def command
  @command
end
#context ⇒ Object (readonly)
Returns the value of attribute context.
# File 'lib/spark/rdd.rb', line 11

def context
  @context
end
#jrdd ⇒ Object (readonly)
Returns the value of attribute jrdd.
# File 'lib/spark/rdd.rb', line 11

def jrdd
  @jrdd
end
Instance Method Details
#+(other) ⇒ Object
Operators
# File 'lib/spark/rdd.rb', line 41

def +(other)
  self.union(other)
end
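A minimal usage sketch (not part of the original docs; mirrors the #union example further down and assumes $sc is an initialized Spark::Context as in the other examples):
rdd = $sc.parallelize([1, 2, 3])
(rdd + rdd).collect   # illustrative: same result as rdd.union(rdd)
# => [1, 2, 3, 1, 2, 3]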
#add_command(klass, *args) ⇒ Object
Command and serializer.
# File 'lib/spark/rdd.rb', line 49

def add_command(klass, *args)
  @command.deep_copy.add_command(klass, *args)
end
#add_library(*libraries) ⇒ Object Also known as: addLibrary
Add a Ruby library. Libraries will be included before computing.
Example:
rdd.add_library('pry').add_library('nio4r', 'distribution')
# File 'lib/spark/rdd.rb', line 59

def add_library(*libraries)
  @command.add_library(*libraries)
  self
end
#aggregate(zero_value, seq_op, comb_op) ⇒ Object
Aggregate the elements of each partition, and then the results for all the partitions, using given combine functions and a neutral “zero value”.
This function can return a different result type than the element type, hence the need for a separate merge operation.
The result must be an Array; otherwise the serializer would send an Array zero value as multiple values rather than as a single one.
Example:
# 1 2 3 4 5 => 15 + 1 = 16
# 6 7 8 9 10 => 40 + 1 = 41
# 16 * 41 = 656
seq = lambda{|x,y| x+y}
com = lambda{|x,y| x*y}
rdd = $sc.parallelize(1..10, 2, batch_size: 1)
rdd.aggregate(1, seq, com)
# => 656
# File 'lib/spark/rdd.rb', line 300

def aggregate(zero_value, seq_op, comb_op)
  _reduce(Spark::Command::Aggregate, seq_op, comb_op, zero_value)
end
#aggregate_by_key(zero_value, seq_func, comb_func, num_partitions = nil) ⇒ Object Also known as: aggregateByKey
Aggregate the values of each key, using given combine functions and a neutral zero value.
Example:
def combine(x,y)
x+y
end
def merge(x,y)
x*y
end
rdd = $sc.parallelize([["a", 1], ["b", 2], ["a", 3], ["a", 4], ["c", 5]], 2, batch_size: 1)
rdd.aggregate_by_key(1, method(:combine), method(:merge))
# => [["b", 3], ["a", 16], ["c", 6]]
# File 'lib/spark/rdd.rb', line 980

def aggregate_by_key(zero_value, seq_func, comb_func, num_partitions=nil)
  _combine_by_key(
    [Spark::Command::CombineByKey::CombineWithZero, zero_value, seq_func],
    [Spark::Command::CombineByKey::Merge, comb_func],
    num_partitions
  )
end
#bind(objects) ⇒ Object
Bind object to RDD
Example:
text = "test"
rdd = $sc.parallelize(0..5)
rdd = rdd.map(lambda{|x| x.to_s + " " + text})
rdd = rdd.bind(text: text)
rdd.collect
# => ["0 test", "1 test", "2 test", "3 test", "4 test", "5 test"]
# File 'lib/spark/rdd.rb', line 76

def bind(objects)
  unless objects.is_a?(Hash)
    raise ArgumentError, 'Argument must be a Hash.'
  end

  @command.bind(objects)
  self
end
#cache ⇒ Object
Persist this RDD with the default storage level MEMORY_ONLY_SER because of serialization.
# File 'lib/spark/rdd.rb', line 113

def cache
  persist('memory_only_ser')
end
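A minimal usage sketch (not part of the original docs; assumes $sc as in the other examples on this page):
rdd = $sc.parallelize(0..10)
rdd.cache        # illustrative: same as rdd.persist('memory_only_ser')
rdd.cached?
# => true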
#cached? ⇒ Boolean
# File 'lib/spark/rdd.rb', line 140

def cached?
  @cached
end
#cartesian(other) ⇒ Object
Return the Cartesian product of this RDD and another one, that is, the RDD of all pairs of elements `(a, b)` where `a` is in `self` and `b` is in `other`.
Example:
rdd1 = $sc.parallelize([1,2,3])
rdd2 = $sc.parallelize([4,5,6])
rdd1.cartesian(rdd2).collect
# => [[1, 4], [1, 5], [1, 6], [2, 4], [2, 5], [2, 6], [3, 4], [3, 5], [3, 6]]
# File 'lib/spark/rdd.rb', line 657

def cartesian(other)
  _deserializer = Spark::Serializer::Cartesian.new.set(self.deserializer, other.deserializer)

  new_jrdd = jrdd.cartesian(other.jrdd)
  RDD.new(new_jrdd, context, serializer, _deserializer)
end
#checkpointed? ⇒ Boolean
# File 'lib/spark/rdd.rb', line 144

def checkpointed?
  @checkpointed
end
#coalesce(num_partitions) ⇒ Object
Return a new RDD that is reduced into num_partitions partitions.
Example:
rdd = $sc.parallelize(0..10, 3)
rdd.coalesce(2).glom.collect
# => [[0, 1, 2], [3, 4, 5, 6, 7, 8, 9, 10]]
# File 'lib/spark/rdd.rb', line 641

def coalesce(num_partitions)
  new_jrdd = jrdd.coalesce(num_partitions)
  RDD.new(new_jrdd, context, @command.serializer, @command.deserializer)
end
#cogroup(*others) ⇒ Object
For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the list of values for that key in `this` as well as `other`.
Example:
rdd1 = $sc.parallelize([["a", 1], ["a", 2], ["b", 3]])
rdd2 = $sc.parallelize([["a", 4], ["a", 5], ["b", 6]])
rdd3 = $sc.parallelize([["a", 7], ["a", 8], ["b", 9]])
rdd1.cogroup(rdd2, rdd3).collect
# => [["a", [1, 2, 4, 5, 7, 8]], ["b", [3, 6, 9]]]
# File 'lib/spark/rdd.rb', line 1011

def cogroup(*others)
  unioned = self
  others.each do |other|
    unioned = unioned.union(other)
  end

  unioned.group_by_key
end
#collect ⇒ Object
Return an array that contains all of the elements in this RDD. RJB raises an error if the stage is killed.
# File 'lib/spark/rdd.rb', line 172

def collect
  collect_from_iterator(jrdd.collect.iterator)
rescue => e
  raise Spark::RDDError, e.message
end
#collect_as_hash ⇒ Object
Collect this RDD and convert the resulting array of pairs to a Hash.
# File 'lib/spark/rdd.rb', line 190

def collect_as_hash
  Hash[collect]
end
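A minimal usage sketch (not part of the original docs; assumes a pair RDD, since Hash[collect] expects key-value pairs):
rdd = $sc.parallelize([["a", 1], ["b", 2]])   # illustrative input
rdd.collect_as_hash
# => {"a"=>1, "b"=>2}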
#collect_from_iterator(iterator) ⇒ Object
# File 'lib/spark/rdd.rb', line 178

def collect_from_iterator(iterator)
  if self.is_a?(PipelinedRDD)
    klass = @command.serializer
  else
    klass = @command.deserializer
  end

  klass.load_from_iterator(iterator)
end
#combine_by_key(create_combiner, merge_value, merge_combiners, num_partitions = nil) ⇒ Object Also known as: combineByKey
Generic function to combine the elements for each key using a custom set of aggregation functions. Turns a JavaPairRDD[(K, V)] into a result of type JavaPairRDD[(K, C)], for a "combined type" C. Note that V and C can be different; for example, one might group an RDD of type (Int, Int) into an RDD of type (Int, List). Users provide three functions:
Parameters:
- create_combiner: turns a V into a C (e.g., creates a one-element list)
- merge_value: merges a V into a C (e.g., adds it to the end of a list)
- merge_combiners: combines two C's into a single one.
Example:
def combiner(x)
x
end
def merge(x,y)
x+y
end
rdd = $sc.parallelize(["a","b","c","a","b","c","a","c"], 2, batch_size: 1).map(lambda{|x| [x, 1]})
rdd.combine_by_key(method(:combiner), method(:merge), method(:merge)).collect_as_hash
# => {"a"=>3, "b"=>2, "c"=>3}
# File 'lib/spark/rdd.rb', line 913

def combine_by_key(create_combiner, merge_value, merge_combiners, num_partitions=nil)
  _combine_by_key(
    [Spark::Command::CombineByKey::Combine, create_combiner, merge_value],
    [Spark::Command::CombineByKey::Merge, merge_combiners],
    num_partitions
  )
end
#compact ⇒ Object
Return a new RDD containing non-nil elements.
Example:
rdd = $sc.parallelize([1, nil, 2, nil, 3])
rdd.compact.collect
# => [1, 2, 3]
# File 'lib/spark/rdd.rb', line 619

def compact
  new_rdd_from_command(Spark::Command::Compact)
end
#config ⇒ Object
Variables and non-computing functions
# File 'lib/spark/rdd.rb', line 94

def config
  @context.config
end
#count ⇒ Object
Return the number of values in this RDD
Example:
rdd = $sc.parallelize(0..10)
rdd.count
# => 11
# File 'lib/spark/rdd.rb', line 344

def count
  # nil is for seq_op => it means all results go directly to one worker for combine
  @count ||= self.map_partitions('lambda{|iterator| iterator.to_a.size }')
                 .aggregate(0, nil, 'lambda{|sum, item| sum + item }')
end
#default_reduce_partitions ⇒ Object Also known as: defaultReducePartitions
# File 'lib/spark/rdd.rb', line 98

def default_reduce_partitions
  config['spark.default.parallelism'] || partitions_size
end
#distinct ⇒ Object
Return a new RDD containing the distinct elements in this RDD. Ordering is not preserved because of the reduce step.
Example:
rdd = $sc.parallelize([1,1,1,2,3])
rdd.distinct.collect
# => [1, 2, 3]
# File 'lib/spark/rdd.rb', line 671

def distinct
  self.map('lambda{|x| [x, nil]}')
      .reduce_by_key('lambda{|x,_| x}')
      .map('lambda{|x| x[0]}')
end
#filter(f) ⇒ Object
Return a new RDD containing only the elements that satisfy a predicate.
Example:
rdd = $sc.parallelize(0..10)
rdd.filter(lambda{|x| x.even?}).collect
# => [0, 2, 4, 6, 8, 10]
# File 'lib/spark/rdd.rb', line 608

def filter(f)
  new_rdd_from_command(Spark::Command::Filter, f)
end
#first ⇒ Object
Return the first element in this RDD.
Example:
rdd = $sc.parallelize(0..100)
rdd.first
# => 0
# File 'lib/spark/rdd.rb', line 248

def first
  self.take(1)[0]
end
#flat_map(f) ⇒ Object Also known as: flatMap
Return a new RDD by first applying a function to all elements of this RDD, and then flattening the results.
Example:
rdd = $sc.parallelize(0..5)
rdd.flat_map(lambda {|x| [x, 1]}).collect
# => [0, 1, 1, 1, 2, 1, 3, 1, 4, 1, 5, 1]
# File 'lib/spark/rdd.rb', line 574

def flat_map(f)
  new_rdd_from_command(Spark::Command::FlatMap, f)
end
#flat_map_values(f) ⇒ Object
Pass each value in the key-value pair RDD through a flat_map function without changing the keys; this also retains the original RDD’s partitioning.
Example:
rdd = $sc.parallelize([["a", [1,2]], ["b", [3]]])
rdd = rdd.flat_map_values(lambda{|x| x*2})
rdd.collect
# => [["a", 1], ["a", 2], ["a", 1], ["a", 2], ["b", 3], ["b", 3]]
# File 'lib/spark/rdd.rb', line 1161

def flat_map_values(f)
  new_rdd_from_command(Spark::Command::FlatMapValues, f)
end
#fold(zero_value, f) ⇒ Object
Aggregate the elements of each partition, and then the results for all the partitions, using a given associative function and a neutral “zero value”.
The function f(x, y) is allowed to modify x and return it as its result value to avoid object allocation; however, it should not modify y.
Be careful, zero_values is applied to all stages. See example.
Example:
rdd = $sc.parallelize(0..10, 2)
rdd.fold(1, lambda{|sum, x| sum+x})
# => 58
# File 'lib/spark/rdd.rb', line 276

def fold(zero_value, f)
  self.aggregate(zero_value, f, f)
end
#fold_by_key(zero_value, f, num_partitions = nil) ⇒ Object Also known as: foldByKey
Merge the values for each key using an associative function f and a neutral `zero_value` which may be added to the result an arbitrary number of times, and must not change the result (e.g., 0 for addition, or 1 for multiplication).
Example:
rdd = $sc.parallelize([["a", 1], ["b", 2], ["a", 3], ["a", 4], ["c", 5]])
rdd.fold_by_key(1, lambda{|x,y| x+y})
# => [["a", 9], ["c", 6], ["b", 3]]
# File 'lib/spark/rdd.rb', line 961

def fold_by_key(zero_value, f, num_partitions=nil)
  self.aggregate_by_key(zero_value, f, f, num_partitions)
end
#foreach(f, options = {}) ⇒ Object
Applies a function f to all elements of this RDD.
Example:
rdd = $sc.parallelize(0..5)
rdd.foreach(lambda{|x| puts x})
# => nil
# File 'lib/spark/rdd.rb', line 534

def foreach(f, options={})
  new_rdd_from_command(Spark::Command::Foreach, f).collect
  nil
end
#foreach_partition(f, options = {}) ⇒ Object Also known as: foreachPartition
Applies a function f to each partition of this RDD.
Example:
rdd = $sc.parallelize(0..5)
rdd.foreachPartition(lambda{|x| puts x.to_s})
# => nil
# File 'lib/spark/rdd.rb', line 546

def foreach_partition(f, options={})
  new_rdd_from_command(Spark::Command::ForeachPartition, f).collect
  nil
end
#glom ⇒ Object
Return an RDD created by coalescing all elements within each partition into an array.
Example:
rdd = $sc.parallelize(0..10, 3, batch_size: 1)
rdd.glom.collect
# => [[0, 1, 2], [3, 4, 5, 6], [7, 8, 9, 10]]
# File 'lib/spark/rdd.rb', line 630

def glom
  new_rdd_from_command(Spark::Command::Glom)
end
#group_by(f, num_partitions = nil) ⇒ Object Also known as: groupBy
Return an RDD of grouped items.
Example:
rdd = $sc.parallelize(0..5)
rdd.group_by(lambda{|x| x%2}).collect
# => [[0, [0, 2, 4]], [1, [1, 3, 5]]]
# File 'lib/spark/rdd.rb', line 928

def group_by(f, num_partitions=nil)
  self.key_by(f).group_by_key(num_partitions)
end
#group_by_key(num_partitions = nil) ⇒ Object Also known as: groupByKey
Group the values for each key in the RDD into a single sequence. Allows controlling the partitioning of the resulting key-value pair RDD by passing a Partitioner.
Note: If you are grouping in order to perform an aggregation (such as a sum or average) over each key, using reduce_by_key or combine_by_key will provide much better performance.
Example:
rdd = $sc.parallelize([["a", 1], ["a", 2], ["b", 3]])
rdd.group_by_key.collect
# => [["a", [1, 2]], ["b", [3]]]
# File 'lib/spark/rdd.rb', line 943

def group_by_key(num_partitions=nil)
  create_combiner = 'lambda{|item| [item]}'
  merge_value     = 'lambda{|combiner, item| combiner << item; combiner}'
  merge_combiners = 'lambda{|combiner_1, combiner_2| combiner_1 += combiner_2; combiner_1}'

  combine_by_key(create_combiner, merge_value, merge_combiners, num_partitions)
end
#group_with(other, num_partitions = nil) ⇒ Object Also known as: groupWith
The same functionality as cogroup, but this can group only two RDDs and you can change num_partitions.
Example:
rdd1 = $sc.parallelize([["a", 1], ["a", 2], ["b", 3]])
rdd2 = $sc.parallelize([["a", 4], ["a", 5], ["b", 6]])
rdd1.group_with(rdd2).collect
# => [["a", [1, 2, 4, 5]], ["b", [3, 6]]]
# File 'lib/spark/rdd.rb', line 997

def group_with(other, num_partitions=nil)
  self.union(other).group_by_key(num_partitions)
end
#histogram(buckets) ⇒ Object
Compute a histogram using the provided buckets. The buckets are all open to the right except for the last, which is closed. For example, [1, 10, 20, 50] means the buckets are [1, 10), [10, 20), [20, 50], i.e. 1<=x<10, 10<=x<20, 20<=x<=50. On an input of 1 and 50 we would have a histogram of 1, 0, 1.
If your buckets are evenly spaced (e.g. [0, 10, 20, 30]), bucketing can be switched from an O(log n) insertion to O(1) per element (where n = # of buckets).
Buckets must be sorted, must not contain any duplicates, and must have at least two elements.
Examples:
rdd = $sc.parallelize(0..50)
rdd.histogram(2)
# => [[0.0, 25.0, 50], [25, 26]]
rdd.histogram([0, 5, 25, 50])
# => [[0, 5, 25, 50], [5, 20, 26]]
rdd.histogram([0, 15, 30, 45, 60])
# => [[0, 15, 30, 45, 60], [15, 15, 15, 6]]
# File 'lib/spark/rdd.rb', line 434

def histogram(buckets)

  # -----------------------------------------------------------------------
  # Integer
  #
  if buckets.is_a?(Integer)

    # Validation
    if buckets < 1
      raise ArgumentError, "Bucket count must be >= 1, #{buckets} inserted."
    end

    # Filter invalid values
    # Nil and NaN
    func = 'lambda{|x|
      if x.nil? || (x.is_a?(Float) && x.nan?)
        false
      else
        true
      end
    }'
    filtered = self.filter(func)

    # Compute the minimum and the maximum
    func = 'lambda{|memo, item|
      [memo[0] < item[0] ? memo[0] : item[0],
       memo[1] > item[1] ? memo[1] : item[1]]
    }'
    min, max = filtered.map('lambda{|x| [x, x]}').reduce(func)

    # Min, max must be valid numbers
    if (min.is_a?(Float) && !min.finite?) || (max.is_a?(Float) && !max.finite?)
      raise Spark::RDDError, 'Histogram on either an empty RDD or RDD containing +/-infinity or NaN'
    end

    # Already finished
    if min == max || buckets == 1
      return [min, max], [filtered.count]
    end

    # Custom range
    begin
      span = max - min # increment
      buckets = (0...buckets).map do |x|
        min + (x * span) / buckets.to_f
      end
      buckets << max
    rescue NoMethodError
      raise Spark::RDDError, 'Can not generate buckets with non-number in RDD'
    end

    even = true

  # -----------------------------------------------------------------------
  # Array
  #
  elsif buckets.is_a?(Array)

    if buckets.size < 2
      raise ArgumentError, 'Buckets should have more than one value.'
    end

    if buckets.detect{|x| x.nil? || (x.is_a?(Float) && x.nan?)}
      raise ArgumentError, 'Can not have nil or nan numbers in buckets.'
    end

    if buckets.detect{|x| buckets.count(x) > 1}
      raise ArgumentError, 'Buckets should not contain duplicated values.'
    end

    if buckets.sort != buckets
      raise ArgumentError, 'Buckets must be sorted.'
    end

    even = false

  # -----------------------------------------------------------------------
  # Other
  #
  else
    raise Spark::RDDError, 'Buckets should be number or array.'
  end

  reduce_func = 'lambda{|memo, item|
    memo.size.times do |i|
      memo[i] += item[i]
    end
    memo
  }'

  return buckets, new_rdd_from_command(Spark::Command::Histogram, even, buckets).reduce(reduce_func)
end
#id ⇒ Object
A unique ID for this RDD (within its SparkContext).
# File 'lib/spark/rdd.rb', line 108

def id
  jrdd.id
end
#intersection(other) ⇒ Object
Return the intersection of this RDD and another one. The output will not contain any duplicate elements, even if the input RDDs did.
Example:
rdd1 = $sc.parallelize([1,2,3,4,5])
rdd2 = $sc.parallelize([1,4,5,6,7])
rdd1.intersection(rdd2).collect
# => [1, 4, 5]
# File 'lib/spark/rdd.rb', line 739

def intersection(other)
  mapping_function = 'lambda{|item| [item, nil]}'
  filter_function  = 'lambda{|(key, values)| values.size > 1}'

  self.map(mapping_function)
      .cogroup(other.map(mapping_function))
      .filter(filter_function)
      .keys
end
#key_by(f) ⇒ Object Also known as: keyBy
Creates tuples [f(item), item] of the elements in this RDD by applying the function f.
Example:
rdd = $sc.parallelize(0..5)
rdd.key_by(lambda{|x| x%2}).collect
# => [[0, 0], [1, 1], [0, 2], [1, 3], [0, 4], [1, 5]]
# File 'lib/spark/rdd.rb', line 1133

def key_by(f)
  new_rdd_from_command(Spark::Command::KeyBy, f)
end
#keys ⇒ Object
Return an RDD with the first element of each pair in this PairRDD.
Example:
rdd = $sc.parallelize([[1,2], [3,4], [5,6]])
rdd.keys.collect
# => [1, 3, 5]
# File 'lib/spark/rdd.rb', line 1172

def keys
  self.map('lambda{|(key, _)| key}')
end
#map(f) ⇒ Object
Return a new RDD by applying a function to all elements of this RDD.
Example:
rdd = $sc.parallelize(0..5)
rdd.map(lambda {|x| x*2}).collect
# => [0, 2, 4, 6, 8, 10]
# File 'lib/spark/rdd.rb', line 562

def map(f)
  new_rdd_from_command(Spark::Command::Map, f)
end
#map_partitions(f) ⇒ Object Also known as: mapPartitions
Return a new RDD by applying a function to each partition of this RDD.
Example:
rdd = $sc.parallelize(0..10, 2)
rdd.map_partitions(lambda{|part| part.reduce(:+)}).collect
# => [15, 40]
# File 'lib/spark/rdd.rb', line 585

def map_partitions(f)
  new_rdd_from_command(Spark::Command::MapPartitions, f)
end
#map_partitions_with_index(f, options = {}) ⇒ Object Also known as: mapPartitionsWithIndex
Return a new RDD by applying a function to each partition of this RDD, while tracking the index of the original partition.
Example:
rdd = $sc.parallelize(0...4, 4, batch_size: 1)
rdd.map_partitions_with_index(lambda{|part, index| part.first * index}).collect
# => [0, 1, 4, 9]
# File 'lib/spark/rdd.rb', line 597

def map_partitions_with_index(f, options={})
  new_rdd_from_command(Spark::Command::MapPartitionsWithIndex, f)
end
#map_values(f) ⇒ Object Also known as: mapValues
Pass each value in the key-value pair RDD through a map function without changing the keys. This also retains the original RDD’s partitioning.
Example:
rdd = $sc.parallelize(["ruby", "scala", "java"])
rdd = rdd.map(lambda{|x| [x, x]})
rdd = rdd.map_values(lambda{|x| x.upcase})
rdd.collect
# => [["ruby", "RUBY"], ["scala", "SCALA"], ["java", "JAVA"]]
# File 'lib/spark/rdd.rb', line 1147

def map_values(f)
  new_rdd_from_command(Spark::Command::MapValues, f)
end
#max ⇒ Object
Return the max of this RDD
Example:
rdd = $sc.parallelize(0..10)
rdd.max
# => 10
# File 'lib/spark/rdd.rb', line 311

def max
  self.reduce('lambda{|memo, item| memo > item ? memo : item }')
end
#mean ⇒ Object
Compute the mean of this RDD’s elements.
Example:
$sc.parallelize([1, 2, 3]).mean
# => 2.0
# File 'lib/spark/rdd.rb', line 362

def mean
  stats.mean
end
#min ⇒ Object
Return the min of this RDD
Example:
rdd = $sc.parallelize(0..10)
rdd.min
# => 0
# File 'lib/spark/rdd.rb', line 322

def min
  self.reduce('lambda{|memo, item| memo < item ? memo : item }')
end
#name ⇒ Object
Return the name of this RDD.
# File 'lib/spark/rdd.rb', line 150

def name
  _name = jrdd.name
  _name && _name.encode(Encoding::UTF_8)
end
#new_rdd_from_command(klass, *args) ⇒ Object
# File 'lib/spark/rdd.rb', line 85

def new_rdd_from_command(klass, *args)
  comm = add_command(klass, *args)
  PipelinedRDD.new(self, comm)
end
#partition_by(num_partitions, partition_func = nil) ⇒ Object Also known as: partitionBy
Return a copy of the RDD partitioned using the specified partitioner.
Example:
rdd = $sc.parallelize(["1","2","3","4","5"]).map(lambda {|x| [x, 1]})
rdd.partitionBy(2).glom.collect
# => [[["3", 1], ["4", 1]], [["1", 1], ["2", 1], ["5", 1]]]
# File 'lib/spark/rdd.rb', line 756

def partition_by(num_partitions, partition_func=nil)
  num_partitions ||= default_reduce_partitions
  partition_func ||= 'lambda{|x| Spark::Digest.portable_hash(x.to_s)}'

  _partition_by(num_partitions, Spark::Command::PartitionBy::Basic, partition_func)
end
#partitions_size ⇒ Object Also known as: partitionsSize
Count of ParallelCollectionPartition
# File 'lib/spark/rdd.rb', line 103

def partitions_size
  jrdd.rdd.partitions.size
end
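A minimal usage sketch (not part of the original docs; the partition count comes from the second argument of parallelize, as in the #glom example above):
rdd = $sc.parallelize(0..10, 3)   # illustrative: request 3 partitions
rdd.partitions_size
# => 3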
#persist(new_level) ⇒ Object
Set this RDD’s storage level to persist its values across operations after the first time it is computed. This can only be used to assign a new storage level if the RDD does not have a storage level set yet.
See StorageLevel for the possible values of new_level.
# File 'lib/spark/rdd.rb', line 123

def persist(new_level)
  @cached = true
  jrdd.persist(Spark::StorageLevel.java_get(new_level))
  self
end
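A minimal usage sketch (not part of the original docs; 'memory_only_ser' is the same level that #cache uses above):
rdd = $sc.parallelize(0..10).persist('memory_only_ser')   # illustrative: persist returns self, so it chains
rdd.cached?
# => true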
#pipe(*cmds) ⇒ Object
Return an RDD created by piping elements to a forked external process.
Cmds:
cmd = [env,] command... [,options]
env: hash
name => val : set the environment variable
name => nil : unset the environment variable
command...:
commandline : command line string which is passed to the standard shell
cmdname, arg1, ... : command name and one or more arguments (This form does
not use the shell. See below for caveats.)
[cmdname, argv0], arg1, ... : command name, argv[0] and zero or more arguments (no shell)
options: hash
See http://ruby-doc.org/core-2.2.0/Process.html#method-c-spawn
Examples:
$sc.parallelize(0..5).pipe('cat').collect
# => ["0", "1", "2", "3", "4", "5"]
rdd = $sc.parallelize(0..5)
rdd = rdd.pipe('cat', "awk '{print $1*10}'")
rdd = rdd.map(lambda{|x| x.to_i + 1})
rdd.collect
# => [1, 11, 21, 31, 41, 51]
# File 'lib/spark/rdd.rb', line 867

def pipe(*cmds)
  new_rdd_from_command(Spark::Command::Pipe, cmds)
end
#reduce(f) ⇒ Object
Reduces the elements of this RDD using the specified lambda or method.
Example:
rdd = $sc.parallelize(0..10)
rdd.reduce(lambda{|sum, x| sum+x})
# => 55
# File 'lib/spark/rdd.rb', line 259

def reduce(f)
  _reduce(Spark::Command::Reduce, f, f)
end
#reduce_by_key(f, num_partitions = nil) ⇒ Object Also known as: reduceByKey
Merge the values for each key using an associative reduce function. This will also perform the merging locally on each mapper before sending results to a reducer, similarly to a “combiner” in MapReduce. Output will be hash-partitioned with the existing partitioner/ parallelism level.
Example:
rdd = $sc.parallelize(["a","b","c","a","b","c","a","c"]).map(lambda{|x| [x, 1]})
rdd.reduce_by_key(lambda{|x,y| x+y}).collect_as_hash
# => {"a"=>3, "b"=>2, "c"=>3}
# File 'lib/spark/rdd.rb', line 885

def reduce_by_key(f, num_partitions=nil)
  combine_by_key('lambda {|x| x}', f, f, num_partitions)
end
#reserialize(new_serializer, new_batch_size = nil) ⇒ Object
Return a new RDD with different serializer. This method is useful during union and join operations.
Example:
rdd = $sc.parallelize([1, 2, 3], nil, serializer: "marshal")
rdd = rdd.map(lambda{|x| x.to_s})
rdd.reserialize("oj").collect
# => ["1", "2", "3"]
# File 'lib/spark/rdd.rb', line 716

def reserialize(new_serializer, new_batch_size=nil)
  new_batch_size ||= deserializer.batch_size
  new_serializer = Spark::Serializer.get!(new_serializer).new(new_batch_size)

  if serializer == new_serializer
    return self
  end

  new_command = @command.deep_copy
  new_command.serializer = new_serializer

  PipelinedRDD.new(self, new_command)
end
#sample(with_replacement, fraction, seed = nil) ⇒ Object
Return a sampled subset of this RDD. Operations are based on Poisson and uniform distributions. TODO: replace uniform with Bernoulli.
Examples:
rdd = $sc.parallelize(0..100)
rdd.sample(true, 10).collect
# => [17, 17, 22, 23, 51, 52, 62, 64, 69, 70, 96]
rdd.sample(false, 0.1).collect
# => [3, 5, 9, 32, 44, 55, 66, 68, 75, 80, 86, 91, 98]
# File 'lib/spark/rdd.rb', line 776

def sample(with_replacement, fraction, seed=nil)
  new_rdd_from_command(Spark::Command::Sample, with_replacement, fraction, seed)
end
#sample_stdev ⇒ Object Also known as: sampleStdev
Compute the sample standard deviation of this RDD’s elements (which corrects for bias in estimating the standard deviation by dividing by N-1 instead of N).
Example:
$sc.parallelize([1, 2, 3]).sample_stdev
# => 1.0
# File 'lib/spark/rdd.rb', line 394

def sample_stdev
  stats.sample_stdev
end
#sample_variance ⇒ Object Also known as: sampleVariance
Compute the sample variance of this RDD’s elements (which corrects for bias in estimating the variance by dividing by N-1 instead of N).
Example:
$sc.parallelize([1, 2, 3]).sample_variance
# => 1.0
# File 'lib/spark/rdd.rb', line 405

def sample_variance
  stats.sample_variance
end
#set_name(name) ⇒ Object Also known as: setName
Assign a name to this RDD.
# File 'lib/spark/rdd.rb', line 157

def set_name(name)
  jrdd.setName(name)
end
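A minimal usage sketch (not part of the original docs; the name is stored on the underlying JavaRDD and read back via #name):
rdd = $sc.parallelize(0..5)
rdd.set_name('numbers')   # illustrative name
rdd.name
# => "numbers"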
#shuffle(seed = nil) ⇒ Object
Return a shuffled RDD.
Example:
rdd = $sc.parallelize(0..10)
rdd.shuffle.collect
# => [3, 10, 6, 7, 8, 0, 4, 2, 9, 1, 5]
# File 'lib/spark/rdd.rb', line 684

def shuffle(seed=nil)
  seed ||= Random.new_seed

  new_rdd_from_command(Spark::Command::Shuffle, seed)
end
#sort_by(key_function = nil, ascending = true, num_partitions = nil) ⇒ Object Also known as: sortBy
Sorts this RDD by the given key_function
This is a different implementation than Spark's: sort_by does not use the key_by method first. It can be slower but takes less memory, and you can always use map.sort_by_key instead.
Example:
rdd = $sc.parallelize(["aaaaaaa", "cc", "b", "eeee", "ddd"])
rdd.sort_by.collect
# => ["aaaaaaa", "b", "cc", "ddd", "eeee"]
rdd.sort_by(lambda{|x| x.size}).collect
# => ["b", "cc", "ddd", "eeee", "aaaaaaa"]
# File 'lib/spark/rdd.rb', line 1082

def sort_by(key_function=nil, ascending=true, num_partitions=nil)
  key_function   ||= 'lambda{|x| x}'
  num_partitions ||= default_reduce_partitions

  command_klass = Spark::Command::SortByKey

  # Allow spill data to disk due to memory limit
  # spilling = config['spark.shuffle.spill'] || false
  spilling = false
  memory = ''

  # Set spilling to false if worker has unlimited memory
  if memory.empty?
    spilling = false
    memory = nil
  else
    memory = to_memory_size(memory)
  end

  # Sorting should do one worker
  if num_partitions == 1
    rdd = self
    rdd = rdd.coalesce(1) if partitions_size > 1
    return rdd.new_rdd_from_command(command_klass, key_function, ascending, spilling, memory, serializer)
  end

  # Compute boundary of collection
  # Collection should be evenly distributed
  # 20.0 is from scala RangePartitioner (for roughly balanced output partitions)
  count = self.count
  sample_size = num_partitions * 20.0
  fraction = [sample_size / [count, 1].max, 1.0].min
  samples = self.sample(false, fraction, 1).map(key_function).collect
  samples.sort!
  # Reverse is much faster than reverse sort_by
  samples.reverse! if !ascending

  # Determine part bounds
  bounds = determine_bounds(samples, num_partitions)

  shuffled = _partition_by(num_partitions, Spark::Command::PartitionBy::Sorting, key_function, bounds, ascending, num_partitions)
  shuffled.new_rdd_from_command(command_klass, key_function, ascending, spilling, memory, serializer)
end
#sort_by_key(ascending = true, num_partitions = nil) ⇒ Object Also known as: sortByKey
Sort the RDD by key
Example:
rdd = $sc.parallelize([["c", 1], ["b", 2], ["a", 3]])
rdd.sort_by_key.collect
# => [["a", 3], ["b", 2], ["c", 1]]
# File 'lib/spark/rdd.rb', line 1063

def sort_by_key(ascending=true, num_partitions=nil)
  self.sort_by('lambda{|(key, _)| key}')
end
#stats ⇒ Object
Return a StatCounter object that captures the mean, variance and count of the RDD’s elements in one operation.
# File 'lib/spark/rdd.rb', line 352

def stats
  @stats ||= new_rdd_from_command(Spark::Command::Stats).reduce('lambda{|memo, item| memo.merge(item)}')
end
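A minimal usage sketch (not part of the original docs; the accessors mean, variance and stdev are the ones used by #mean, #variance and #stdev elsewhere on this page, and the outputs match those examples):
stat = $sc.parallelize([1, 2, 3]).stats   # illustrative StatCounter usage
stat.mean       # => 2.0
stat.variance   # => 0.666...
stat.stdev      # => 0.816...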
#stdev ⇒ Object
Compute the standard deviation of this RDD’s elements.
Example:
$sc.parallelize([1, 2, 3]).stdev
# => 0.816...
# File 'lib/spark/rdd.rb', line 382

def stdev
  stats.stdev
end
#subtract(other, num_partitions = nil) ⇒ Object
Return an RDD with the elements from self that are not in other.
Example:
rdd1 = $sc.parallelize([["a", 1], ["a", 2], ["b", 3], ["c", 4]])
rdd2 = $sc.parallelize([["a", 2], ["c", 6]])
rdd1.subtract(rdd2).collect
# => [["a", 1], ["b", 3], ["c", 4]]
# File 'lib/spark/rdd.rb', line 1048

def subtract(other, num_partitions=nil)
  mapping_function = 'lambda{|x| [x,nil]}'

  self.map(mapping_function)
      .subtract_by_key(other.map(mapping_function), num_partitions)
      .keys
end
#subtract_by_key(other, num_partitions = nil) ⇒ Object Also known as: subtractByKey
Return each (key, value) pair in this RDD that has no pair with a matching key in the other RDD.
Example:
rdd1 = $sc.parallelize([["a", 1], ["a", 2], ["b", 3], ["c", 4]])
rdd2 = $sc.parallelize([["b", 5], ["c", 6]])
rdd1.subtract_by_key(rdd2).collect
# => [["a", 1], ["a", 2]]
# File 'lib/spark/rdd.rb', line 1029

def subtract_by_key(other, num_partitions=nil)
  create_combiner = 'lambda{|item| [[item]]}'
  merge_value     = 'lambda{|combiner, item| combiner.first << item; combiner}'
  merge_combiners = 'lambda{|combiner_1, combiner_2| combiner_1 += combiner_2; combiner_1}'

  self.union(other)
      .combine_by_key(create_combiner, merge_value, merge_combiners, num_partitions)
      .filter('lambda{|(key,values)| values.size == 1}')
      .flat_map_values('lambda{|item| item.first}')
end
#sum ⇒ Object
Return the sum of this RDD
Example:
rdd = $sc.parallelize(0..10)
rdd.sum
# => 55
# File 'lib/spark/rdd.rb', line 333

def sum
  self.reduce('lambda{|sum, item| sum + item}')
end
#take(count) ⇒ Object
Take the first `count` elements of the RDD.
It works by first scanning one partition and using the results from that partition to estimate the number of additional partitions needed to satisfy the limit.
Example:
rdd = $sc.parallelize(0..100, 20, batch_size: 1)
rdd.take(5)
# => [0, 1, 2, 3, 4]
# File 'lib/spark/rdd.rb', line 205

def take(count)
  buffer = []

  parts_count = self.partitions_size
  # No parts were scanned, yet
  last_scanned = -1

  while buffer.empty?
    last_scanned += 1
    buffer += context.run_job_with_command(self, [last_scanned], true, Spark::Command::Take, 0, -1)
  end

  # Assumption. Depends on batch_size and how Spark divided data.
  items_per_part = buffer.size
  left = count - buffer.size

  while left > 0 && last_scanned < parts_count
    parts_to_take = (left.to_f / items_per_part).ceil
    parts_for_scanned = Array.new(parts_to_take) do
      last_scanned += 1
    end

    # We cannot take an exact number of items because workers are isolated from each other.
    # => once you take e.g. 50% from the last part and left is still > 0, it is very
    #    difficult to merge new items
    items = context.run_job_with_command(self, parts_for_scanned, true, Spark::Command::Take, left, last_scanned)
    buffer += items

    left = count - buffer.size
    # Average size of all parts
    items_per_part = [items_per_part, items.size].reduce(0){|sum, x| sum + x.to_f / 2}
  end

  buffer.slice!(0, count)
end
#take_sample(with_replacement, num, seed = nil) ⇒ Object Also known as: takeSample
Return a fixed-size sampled subset of this RDD in an array
Examples:
rdd = $sc.parallelize(0..100)
rdd.take_sample(true, 10)
# => [90, 84, 74, 44, 27, 22, 72, 96, 80, 54]
rdd.take_sample(false, 10)
# => [5, 35, 30, 48, 22, 33, 40, 75, 42, 32]
# File 'lib/spark/rdd.rb', line 791

def take_sample(with_replacement, num, seed=nil)

  if num < 0
    raise Spark::RDDError, 'Size have to be greater than 0'
  elsif num == 0
    return []
  end

  # Taken from scala
  num_st_dev = 10.0

  # Number of items
  initial_count = self.count
  return [] if initial_count == 0

  # Create new generator
  seed ||= Random.new_seed
  rng = Random.new(seed)

  # Shuffle elements if the requested num is greater than the array size
  if !with_replacement && num >= initial_count
    return self.shuffle(seed).collect
  end

  # Max num
  max_sample_size = Integer::MAX - (num_st_dev * Math.sqrt(Integer::MAX)).to_i
  if num > max_sample_size
    raise Spark::RDDError, "Size can not be greate than #{max_sample_size}"
  end

  # Approximate fraction with tolerance
  fraction = compute_fraction(num, initial_count, with_replacement)

  # Compute first sampled subset
  samples = self.sample(with_replacement, fraction, seed).collect

  # If the first sample didn't turn out large enough, keep trying to take samples;
  # this shouldn't happen often because we use a big multiplier for their initial size.
  index = 0
  while samples.size < num
    log_warning("Needed to re-sample due to insufficient sample size. Repeat #{index}")
    samples = self.sample(with_replacement, fraction, rng.rand(0..Integer::MAX)).collect
    index += 1
  end

  samples.shuffle!(random: rng)
  samples[0, num]
end
#to_java ⇒ Object
# File 'lib/spark/rdd.rb', line 161

def to_java
  rdd = self.reserialize('Marshal')
  RubyRDD.toJava(rdd.jrdd, rdd.serializer.batched?)
end
#union(other) ⇒ Object
Return the union of this RDD and another one. Any identical elements will appear multiple times (use .distinct to eliminate them).
Example:
rdd = $sc.parallelize([1, 2, 3])
rdd.union(rdd).collect
# => [1, 2, 3, 1, 2, 3]
# File 'lib/spark/rdd.rb', line 698

def union(other)
  if self.serializer != other.serializer
    other = other.reserialize(serializer.name, serializer.batch_size)
  end

  new_jrdd = jrdd.union(other.jrdd)
  RDD.new(new_jrdd, context, serializer, deserializer)
end
#unpersist(blocking = true) ⇒ Object
Mark the RDD as non-persistent, and remove all blocks for it from memory and disk.
Parameters:
- blocking: whether to block until all blocks are deleted.
# File 'lib/spark/rdd.rb', line 134

def unpersist(blocking=true)
  @cached = false
  jrdd.unpersist(blocking)
  self
end
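A minimal usage sketch (not part of the original docs; assumes $sc as in the other examples):
rdd = $sc.parallelize(0..10)
rdd.cache
rdd.unpersist      # illustrative: blocking defaults to true, so this waits until the blocks are deleted
rdd.cached?
# => false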
#values ⇒ Object
Return an RDD with the second element of each pair in this PairRDD.
Example:
rdd = $sc.parallelize([[1,2], [3,4], [5,6]])
rdd.values.collect
# => [2, 4, 6]
# File 'lib/spark/rdd.rb', line 1183

def values
  self.map('lambda{|(_, value)| value}')
end
#variance ⇒ Object
Compute the variance of this RDD’s elements.
Example:
$sc.parallelize([1, 2, 3]).variance
# => 0.666...
# File 'lib/spark/rdd.rb', line 372

def variance
  stats.variance
end