Class: Spark::RDD
- Inherits:
-
Object
- Object
- Spark::RDD
- Extended by:
- Forwardable
- Includes:
- Helper::Logger, Helper::Parser, Helper::Statistic
- Defined in:
- lib/spark/rdd.rb
Overview
A Resilient Distributed Dataset (RDD), the basic abstraction in Spark. Represents an immutable, partitioned collection of elements that can be operated on in parallel. This class contains the basic operations available on all RDDs, such as `map`, `filter`, and `persist`.
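A minimal usage sketch (added here for orientation, not part of the generated docs; it assumes $sc is an initialized Spark::Context, as in the examples below):
rdd = $sc.parallelize(0..10)
rdd.map(lambda{|x| x * 2}).filter(lambda{|x| x > 10}).collect
# => [12, 14, 16, 18, 20]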
Direct Known Subclasses
PipelinedRDD
Instance Attribute Summary collapse
-
#command ⇒ Object
readonly
Returns the value of attribute command.
-
#context ⇒ Object
readonly
Returns the value of attribute context.
-
#jrdd ⇒ Object
readonly
Returns the value of attribute jrdd.
Instance Method Summary collapse
-
#+(other) ⇒ Object
Union operator; alias for #union.
-
#add_command(klass, *args) ⇒ Object
Command and serializer.
-
#add_library(*libraries) ⇒ Object
(also: #addLibrary, #require)
Add Ruby libraries; they will be required before computing.
-
#aggregate(zero_value, seq_op, comb_op) ⇒ Object
Aggregate the elements of each partition, and then the results for all the partitions, using given combine functions and a neutral “zero value”.
-
#aggregate_by_key(zero_value, seq_func, comb_func, num_partitions = nil) ⇒ Object
(also: #aggregateByKey)
Aggregate the values of each key, using given combine functions and a neutral zero value.
-
#bind(objects) ⇒ Object
Bind object to RDD.
-
#cache ⇒ Object
Persist this RDD with the default storage level MEMORY_ONLY_SER because of serialization.
- #cached? ⇒ Boolean
-
#cartesian(other) ⇒ Object
Return the Cartesian product of this RDD and another one, that is, the RDD of all pairs of elements `(a, b)` where `a` is in `self` and `b` is in `other`.
- #checkpointed? ⇒ Boolean
-
#coalesce(num_partitions) ⇒ Object
Return a new RDD that is reduced into num_partitions partitions.
-
#cogroup(*others) ⇒ Object
For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the list of values for that key in `this` as well as `other`.
-
#collect(as_enum = false) ⇒ Object
Return an array that contains all of the elements in this RDD.
-
#collect_as_hash ⇒ Object
Collect this RDD of key-value pairs and convert the result to a Hash.
- #collect_from_file(file, as_enum = false) ⇒ Object
-
#combine_by_key(create_combiner, merge_value, merge_combiners, num_partitions = nil) ⇒ Object
(also: #combineByKey)
Generic function to combine the elements for each key using a custom set of aggregation functions.
-
#compact ⇒ Object
Return a new RDD containing non-nil elements.
-
#config ⇒ Object
Variables and non-computing functions.
-
#count ⇒ Object
Return the number of values in this RDD.
- #default_reduce_partitions ⇒ Object (also: #defaultReducePartitions)
-
#distinct ⇒ Object
Return a new RDD containing the distinct elements in this RDD.
-
#filter(f) ⇒ Object
Return a new RDD containing only the elements that satisfy a predicate.
-
#first ⇒ Object
Return the first element in this RDD.
-
#flat_map(f) ⇒ Object
(also: #flatMap)
Return a new RDD by first applying a function to all elements of this RDD, and then flattening the results.
-
#flat_map_values(f) ⇒ Object
Pass each value in the key-value pair RDD through a flat_map function without changing the keys; this also retains the original RDD’s partitioning.
-
#fold(zero_value, f) ⇒ Object
Aggregate the elements of each partition, and then the results for all the partitions, using a given associative function and a neutral “zero value”.
-
#fold_by_key(zero_value, f, num_partitions = nil) ⇒ Object
(also: #foldByKey)
Merge the values for each key using an associative function f and a neutral `zero_value` which may be added to the result an arbitrary number of times, and must not change the result (e.g., 0 for addition, or 1 for multiplication).
-
#foreach(f, options = {}) ⇒ Object
Applies a function f to all elements of this RDD.
-
#foreach_partition(f, options = {}) ⇒ Object
(also: #foreachPartition)
Applies a function f to each partition of this RDD.
-
#glom ⇒ Object
Return an RDD created by coalescing all elements within each partition into an array.
-
#group_by(f, num_partitions = nil) ⇒ Object
(also: #groupBy)
Return an RDD of grouped items.
-
#group_by_key(num_partitions = nil) ⇒ Object
(also: #groupByKey)
Group the values for each key in the RDD into a single sequence.
-
#group_with(other, num_partitions = nil) ⇒ Object
(also: #groupWith)
The same functionality as cogroup, but it can group only two RDDs and you can change num_partitions.
-
#histogram(buckets) ⇒ Object
Compute a histogram using the provided buckets.
-
#id ⇒ Object
A unique ID for this RDD (within its SparkContext).
-
#initialize(jrdd, context, serializer, deserializer = nil) ⇒ RDD
constructor
Initializes the RDD. This method is the root of all PipelinedRDDs. If you call some operations directly on this class, they will be computed in Java.
- #inspect ⇒ Object
-
#intersection(other) ⇒ Object
Return the intersection of this RDD and another one.
-
#key_by(f) ⇒ Object
(also: #keyBy)
Creates tuples of the elements in this RDD by applying the function f; each element x becomes [f(x), x].
-
#keys ⇒ Object
Return an RDD with the first element (key) of each pair in a PairRDD.
-
#map(f) ⇒ Object
Return a new RDD by applying a function to all elements of this RDD.
-
#map_partitions(f) ⇒ Object
(also: #mapPartitions)
Return a new RDD by applying a function to each partition of this RDD.
-
#map_partitions_with_index(f, options = {}) ⇒ Object
(also: #mapPartitionsWithIndex)
Return a new RDD by applying a function to each partition of this RDD, while tracking the index of the original partition.
-
#map_values(f) ⇒ Object
(also: #mapValues)
Pass each value in the key-value pair RDD through a map function without changing the keys.
-
#max ⇒ Object
Return the max of this RDD.
-
#mean ⇒ Object
Compute the mean of this RDD’s elements.
-
#min ⇒ Object
Return the min of this RDD.
-
#name ⇒ Object
Return the name of this RDD.
- #new_rdd_from_command(klass, *args) ⇒ Object
-
#partition_by(num_partitions, partition_func = nil) ⇒ Object
(also: #partitionBy)
Return a copy of the RDD partitioned using the specified partitioner.
-
#partitions_size ⇒ Object
(also: #partitionsSize)
Number of partitions (count of ParallelCollectionPartition).
-
#persist(new_level) ⇒ Object
Set this RDD’s storage level to persist its values across operations after the first time it is computed.
-
#pipe(*cmds) ⇒ Object
Return an RDD created by piping elements to a forked external process.
-
#reduce(f) ⇒ Object
Reduces the elements of this RDD using the specified lambda or method.
-
#reduce_by_key(f, num_partitions = nil) ⇒ Object
(also: #reduceByKey)
Merge the values for each key using an associative reduce function.
-
#reserialize(new_serializer) ⇒ Object
Return a new RDD with different serializer.
-
#sample(with_replacement, fraction, seed = nil) ⇒ Object
Return a sampled subset of this RDD.
-
#sample_stdev ⇒ Object
(also: #sampleStdev)
Compute the sample standard deviation of this RDD’s elements (which corrects for bias in estimating the standard deviation by dividing by N-1 instead of N).
-
#sample_variance ⇒ Object
(also: #sampleVariance)
Compute the sample variance of this RDD’s elements (which corrects for bias in estimating the variance by dividing by N-1 instead of N).
-
#set_name(name) ⇒ Object
(also: #setName)
Assign a name to this RDD.
-
#shuffle(seed = nil) ⇒ Object
Return a shuffled RDD.
-
#sort_by(key_function = nil, ascending = true, num_partitions = nil) ⇒ Object
(also: #sortBy)
Sorts this RDD by the given key_function.
-
#sort_by_key(ascending = true, num_partitions = nil) ⇒ Object
(also: #sortByKey)
Sort the RDD by key.
-
#sort_by_value(ascending = true, num_partitions = nil) ⇒ Object
Sort the RDD by value.
-
#stats ⇒ Object
Return a StatCounter object that captures the mean, variance and count of the RDD’s elements in one operation.
-
#stdev ⇒ Object
Compute the standard deviation of this RDD’s elements.
-
#subtract(other, num_partitions = nil) ⇒ Object
Return an RDD with the elements from self that are not in other.
-
#subtract_by_key(other, num_partitions = nil) ⇒ Object
(also: #subtractByKey)
Return each (key, value) pair in this RDD that has no pair with a matching key in the other RDD.
-
#sum ⇒ Object
Return the sum of this RDD.
-
#take(count) ⇒ Object
Take the first count elements of the RDD.
-
#take_sample(with_replacement, num, seed = nil) ⇒ Object
(also: #takeSample)
Return a fixed-size sampled subset of this RDD in an array.
- #to_java ⇒ Object
-
#union(other) ⇒ Object
Return the union of this RDD and another one.
-
#unpersist(blocking = true) ⇒ Object
Mark the RDD as non-persistent, and remove all blocks for it from memory and disk.
-
#values ⇒ Object
Return an RDD with the second element (value) of each pair in a PairRDD.
-
#variance ⇒ Object
Compute the variance of this RDD’s elements.
Methods included from Helper::Statistic
#bisect_right, #compute_fraction, #determine_bounds, #upper_binomial_bound, #upper_poisson_bound
Methods included from Helper::Parser
Methods included from Helper::Logger
Constructor Details
#initialize(jrdd, context, serializer, deserializer = nil) ⇒ RDD
Initializes the RDD. This method is the root of all PipelinedRDDs. If you call some operations directly on this class, they will be computed in Java.
Parameters:
- jrdd
-
org.apache.spark.api.java.JavaRDD
- context
- serializer
# File 'lib/spark/rdd.rb', line 27

def initialize(jrdd, context, serializer, deserializer=nil)
  @jrdd = jrdd
  @context = context

  @cached = false
  @checkpointed = false

  @command = Spark::CommandBuilder.new(serializer, deserializer)
end
Instance Attribute Details
#command ⇒ Object (readonly)
Returns the value of attribute command.
# File 'lib/spark/rdd.rb', line 11

def command
  @command
end
#context ⇒ Object (readonly)
Returns the value of attribute context.
# File 'lib/spark/rdd.rb', line 11

def context
  @context
end
#jrdd ⇒ Object (readonly)
Returns the value of attribute jrdd.
# File 'lib/spark/rdd.rb', line 11

def jrdd
  @jrdd
end
Instance Method Details
#+(other) ⇒ Object
Union operator; alias for #union.
# File 'lib/spark/rdd.rb', line 53

def +(other)
  self.union(other)
end
#add_command(klass, *args) ⇒ Object
Command and serializer
# File 'lib/spark/rdd.rb', line 61

def add_command(klass, *args)
  @command.deep_copy.add_command(klass, *args)
end
#add_library(*libraries) ⇒ Object Also known as: addLibrary, require
Add Ruby libraries; they will be required before computing.
Example:
rdd.add_library('pry').add_library('nio4r', 'distribution')
# File 'lib/spark/rdd.rb', line 71

def add_library(*libraries)
  @command.add_library(*libraries)
  self
end
#aggregate(zero_value, seq_op, comb_op) ⇒ Object
Aggregate the elements of each partition, and then the results for all the partitions, using given combine functions and a neutral “zero value”.
This function can return a different result type. We need one operation for merging.
The result must be an Array; otherwise the serialized Array zero value would be sent as multiple values rather than as a single one.
Example:
# 1 2 3 4 5 => 15 + 1 = 16
# 6 7 8 9 10 => 40 + 1 = 41
# 16 * 41 = 656
seq = lambda{|x,y| x+y}
com = lambda{|x,y| x*y}
rdd = $sc.parallelize(1..10, 2)
rdd.aggregate(1, seq, com)
# => 656
# File 'lib/spark/rdd.rb', line 333

def aggregate(zero_value, seq_op, comb_op)
  _reduce(Spark::Command::Aggregate, seq_op, comb_op, zero_value)
end
#aggregate_by_key(zero_value, seq_func, comb_func, num_partitions = nil) ⇒ Object Also known as: aggregateByKey
Aggregate the values of each key, using given combine functions and a neutral zero value.
Example:
def combine(x,y)
x+y
end
def merge(x,y)
x*y
end
rdd = $sc.parallelize([["a", 1], ["b", 2], ["a", 3], ["a", 4], ["c", 5]], 2)
rdd.aggregate_by_key(1, method(:combine), method(:merge))
# => [["b", 3], ["a", 16], ["c", 6]]
# File 'lib/spark/rdd.rb', line 1017

def aggregate_by_key(zero_value, seq_func, comb_func, num_partitions=nil)
  _combine_by_key(
    [Spark::Command::CombineByKey::CombineWithZero, zero_value, seq_func],
    [Spark::Command::CombineByKey::Merge, comb_func],
    num_partitions
  )
end
#bind(objects) ⇒ Object
Bind object to RDD
Example:
text = "test"
rdd = $sc.parallelize(0..5)
rdd = rdd.map(lambda{|x| x.to_s + " " + text})
rdd = rdd.bind(text: text)
rdd.collect
# => ["0 test", "1 test", "2 test", "3 test", "4 test", "5 test"]
# File 'lib/spark/rdd.rb', line 88

def bind(objects)
  unless objects.is_a?(Hash)
    raise ArgumentError, 'Argument must be a Hash.'
  end

  @command.bind(objects)
  self
end
#cache ⇒ Object
Persist this RDD with the default storage level MEMORY_ONLY_SER because of serialization.
# File 'lib/spark/rdd.rb', line 125

def cache
  persist('memory_only_ser')
end
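Example (a sketch added for illustration; cache simply delegates to persist('memory_only_ser') and #cached? then reports true):
rdd = $sc.parallelize(0..10)
rdd.cache
rdd.cached?
# => true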
#cached? ⇒ Boolean
# File 'lib/spark/rdd.rb', line 152

def cached?
  @cached
end
#cartesian(other) ⇒ Object
Return the Cartesian product of this RDD and another one, that is, the RDD of all pairs of elements `(a, b)` where `a` is in `self` and `b` is in `other`.
Example:
rdd1 = $sc.parallelize([1,2,3])
rdd2 = $sc.parallelize([4,5,6])
rdd1.cartesian(rdd2).collect
# => [[1, 4], [1, 5], [1, 6], [2, 4], [2, 5], [2, 6], [3, 4], [3, 5], [3, 6]]
# File 'lib/spark/rdd.rb', line 696

def cartesian(other)
  _deserializer = Spark::Serializer::Cartesian.new(self.deserializer, other.deserializer)

  new_jrdd = jrdd.cartesian(other.jrdd)
  RDD.new(new_jrdd, context, serializer, _deserializer)
end
#checkpointed? ⇒ Boolean
# File 'lib/spark/rdd.rb', line 156

def checkpointed?
  @checkpointed
end
#coalesce(num_partitions) ⇒ Object
Return a new RDD that is reduced into num_partitions partitions.
Example:
rdd = $sc.parallelize(0..10, 3)
rdd.coalesce(2).glom.collect
# => [[0, 1, 2], [3, 4, 5, 6, 7, 8, 9, 10]]
# File 'lib/spark/rdd.rb', line 674

def coalesce(num_partitions)
  if self.is_a?(PipelinedRDD)
    deser = @command.serializer
  else
    deser = @command.deserializer
  end

  new_jrdd = jrdd.coalesce(num_partitions)
  RDD.new(new_jrdd, context, @command.serializer, deser)
end
#cogroup(*others) ⇒ Object
For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the list of values for that key in `this` as well as `other`.
Example:
rdd1 = $sc.parallelize([["a", 1], ["a", 2], ["b", 3]])
rdd2 = $sc.parallelize([["a", 4], ["a", 5], ["b", 6]])
rdd3 = $sc.parallelize([["a", 7], ["a", 8], ["b", 9]])
rdd1.cogroup(rdd2, rdd3).collect
# => [["a", [1, 2, 4, 5, 7, 8]], ["b", [3, 6, 9]]]
# File 'lib/spark/rdd.rb', line 1048

def cogroup(*others)
  unioned = self
  others.each do |other|
    unioned = unioned.union(other)
  end

  unioned.group_by_key
end
#collect(as_enum = false) ⇒ Object
Return an array that contains all of the elements in this RDD. RJB raises an error if the stage is killed.
# File 'lib/spark/rdd.rb', line 193

def collect(as_enum=false)
  file = Tempfile.new('collect', context.temp_dir)

  RubyRDD.writeRDDToFile(jrdd.rdd, file.path)

  collect_from_file(file, as_enum)
rescue => e
  raise Spark::RDDError, e.message
end
#collect_as_hash ⇒ Object
Collect this RDD of key-value pairs and convert the result to a Hash.
# File 'lib/spark/rdd.rb', line 223

def collect_as_hash
  Hash[collect]
end
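Example (a sketch added for illustration; collect_as_hash is just Hash[collect]):
rdd = $sc.parallelize([["a", 1], ["b", 2]])
rdd.collect_as_hash
# => {"a"=>1, "b"=>2}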
#collect_from_file(file, as_enum = false) ⇒ Object
# File 'lib/spark/rdd.rb', line 203

def collect_from_file(file, as_enum=false)
  if self.is_a?(PipelinedRDD)
    klass = @command.serializer
  else
    klass = @command.deserializer
  end

  if as_enum
    result = klass.load_from_file(file)
  else
    result = klass.load_from_io(file).to_a
    file.close
    file.unlink
  end

  result
end
#combine_by_key(create_combiner, merge_value, merge_combiners, num_partitions = nil) ⇒ Object Also known as: combineByKey
Generic function to combine the elements for each key using a custom set of aggregation functions. Turns a JavaPairRDD[(K, V)] into a result of type JavaPairRDD[(K, C)], for a “combined type” C. Note that V and C can be different – for example, one might group an RDD of type (Int, Int) into an RDD of type (Int, List). Users provide three functions:
Parameters:
- create_combiner
-
which turns a V into a C (e.g., creates a one-element list)
- merge_value
-
to merge a V into a C (e.g., adds it to the end of a list)
- merge_combiners
-
to combine two C’s into a single one.
Example:
def combiner(x)
x
end
def merge(x,y)
x+y
end
rdd = $sc.parallelize(["a","b","c","a","b","c","a","c"], 2).map(lambda{|x| [x, 1]})
rdd.combine_by_key(method(:combiner), method(:merge), method(:merge)).collect_as_hash
# => {"a"=>3, "b"=>2, "c"=>3}
# File 'lib/spark/rdd.rb', line 950

def combine_by_key(create_combiner, merge_value, merge_combiners, num_partitions=nil)
  _combine_by_key(
    [Spark::Command::CombineByKey::Combine, create_combiner, merge_value],
    [Spark::Command::CombineByKey::Merge, merge_combiners],
    num_partitions
  )
end
#compact ⇒ Object
Return a new RDD containing non-nil elements.
Example:
rdd = $sc.parallelize([1, nil, 2, nil, 3])
rdd.compact.collect
# => [1, 2, 3]
# File 'lib/spark/rdd.rb', line 652

def compact
  new_rdd_from_command(Spark::Command::Compact)
end
#config ⇒ Object
Variables and non-computing functions
# File 'lib/spark/rdd.rb', line 106

def config
  @context.config
end
#count ⇒ Object
Return the number of values in this RDD
Example:
rdd = $sc.parallelize(0..10)
rdd.count
# => 11
# File 'lib/spark/rdd.rb', line 377

def count
  # nil for seq_op means all results go directly to one worker for the final combine
  @count ||= self.map_partitions('lambda{|iterator| iterator.to_a.size }')
                 .aggregate(0, nil, 'lambda{|sum, item| sum + item }')
end
#default_reduce_partitions ⇒ Object Also known as: defaultReducePartitions
# File 'lib/spark/rdd.rb', line 110

def default_reduce_partitions
  config['spark.default.parallelism'] || partitions_size
end
#distinct ⇒ Object
Return a new RDD containing the distinct elements in this RDD. Ordering is not preserved because of the reduce step.
Example:
rdd = $sc.parallelize([1,1,1,2,3])
rdd.distinct.collect
# => [1, 2, 3]
# File 'lib/spark/rdd.rb', line 711

def distinct
  self.map('lambda{|x| [x, nil]}')
      .reduce_by_key('lambda{|x,_| x}')
      .map('lambda{|x| x[0]}')
end
#filter(f) ⇒ Object
Return a new RDD containing only the elements that satisfy a predicate.
Example:
rdd = $sc.parallelize(0..10)
rdd.filter(lambda{|x| x.even?}).collect
# => [0, 2, 4, 6, 8, 10]
# File 'lib/spark/rdd.rb', line 641

def filter(f)
  new_rdd_from_command(Spark::Command::Filter, f)
end
#first ⇒ Object
Return the first element in this RDD.
Example:
rdd = $sc.parallelize(0..100)
rdd.first
# => 0
# File 'lib/spark/rdd.rb', line 281

def first
  self.take(1)[0]
end
#flat_map(f) ⇒ Object Also known as: flatMap
Return a new RDD by first applying a function to all elements of this RDD, and then flattening the results.
Example:
rdd = $sc.parallelize(0..5)
rdd.flat_map(lambda {|x| [x, 1]}).collect
# => [0, 1, 1, 1, 2, 1, 3, 1, 4, 1, 5, 1]
# File 'lib/spark/rdd.rb', line 607

def flat_map(f)
  new_rdd_from_command(Spark::Command::FlatMap, f)
end
#flat_map_values(f) ⇒ Object
Pass each value in the key-value pair RDD through a flat_map function without changing the keys; this also retains the original RDD’s partitioning.
Example:
rdd = $sc.parallelize([["a", [1,2]], ["b", [3]]])
rdd = rdd.flat_map_values(lambda{|x| x*2})
rdd.collect
# => [["a", 1], ["a", 2], ["a", 1], ["a", 2], ["b", 3], ["b", 3]]
# File 'lib/spark/rdd.rb', line 1209

def flat_map_values(f)
  new_rdd_from_command(Spark::Command::FlatMapValues, f)
end
#fold(zero_value, f) ⇒ Object
Aggregate the elements of each partition, and then the results for all the partitions, using a given associative function and a neutral “zero value”.
The function f(x, y) is allowed to modify x and return it as its result value to avoid object allocation; however, it should not modify y.
Be careful: zero_value is applied at every stage, once per partition and once for the final combine, which is why the example below gives 58 rather than 55.
Example:
rdd = $sc.parallelize(0..10, 2)
rdd.fold(1, lambda{|sum, x| sum+x})
# => 58
# File 'lib/spark/rdd.rb', line 309

def fold(zero_value, f)
  self.aggregate(zero_value, f, f)
end
#fold_by_key(zero_value, f, num_partitions = nil) ⇒ Object Also known as: foldByKey
Merge the values for each key using an associative function f and a neutral `zero_value` which may be added to the result an arbitrary number of times, and must not change the result (e.g., 0 for addition, or 1 for multiplication).
Example:
rdd = $sc.parallelize([["a", 1], ["b", 2], ["a", 3], ["a", 4], ["c", 5]])
rdd.fold_by_key(1, lambda{|x,y| x+y})
# => [["a", 9], ["c", 6], ["b", 3]]
# File 'lib/spark/rdd.rb', line 998

def fold_by_key(zero_value, f, num_partitions=nil)
  self.aggregate_by_key(zero_value, f, f, num_partitions)
end
#foreach(f, options = {}) ⇒ Object
Applies a function f to all elements of this RDD.
Example:
rdd = $sc.parallelize(0..5)
rdd.foreach(lambda{|x| puts x})
# => nil
# File 'lib/spark/rdd.rb', line 567

def foreach(f, options={})
  new_rdd_from_command(Spark::Command::Foreach, f).collect
  nil
end
#foreach_partition(f, options = {}) ⇒ Object Also known as: foreachPartition
Applies a function f to each partition of this RDD.
Example:
rdd = $sc.parallelize(0..5)
rdd.foreachPartition(lambda{|x| puts x.to_s})
# => nil
# File 'lib/spark/rdd.rb', line 579

def foreach_partition(f, options={})
  new_rdd_from_command(Spark::Command::ForeachPartition, f).collect
  nil
end
#glom ⇒ Object
Return an RDD created by coalescing all elements within each partition into an array.
Example:
rdd = $sc.parallelize(0..10, 3)
rdd.glom.collect
# => [[0, 1, 2], [3, 4, 5, 6], [7, 8, 9, 10]]
# File 'lib/spark/rdd.rb', line 663

def glom
  new_rdd_from_command(Spark::Command::Glom)
end
#group_by(f, num_partitions = nil) ⇒ Object Also known as: groupBy
Return an RDD of grouped items.
Example:
rdd = $sc.parallelize(0..5)
rdd.group_by(lambda{|x| x%2}).collect
# => [[0, [0, 2, 4]], [1, [1, 3, 5]]]
# File 'lib/spark/rdd.rb', line 965

def group_by(f, num_partitions=nil)
  self.key_by(f).group_by_key(num_partitions)
end
#group_by_key(num_partitions = nil) ⇒ Object Also known as: groupByKey
Group the values for each key in the RDD into a single sequence. Allows controlling the partitioning of the resulting key-value pair RDD by passing a Partitioner.
Note: If you are grouping in order to perform an aggregation (such as a sum or average) over each key, using reduce_by_key or combine_by_key will provide much better performance.
Example:
rdd = $sc.parallelize([["a", 1], ["a", 2], ["b", 3]])
rdd.group_by_key.collect
# => [["a", [1, 2]], ["b", [3]]]
# File 'lib/spark/rdd.rb', line 980

def group_by_key(num_partitions=nil)
  create_combiner = 'lambda{|item| [item]}'
  merge_value     = 'lambda{|combiner, item| combiner << item; combiner}'
  merge_combiners = 'lambda{|combiner_1, combiner_2| combiner_1 += combiner_2; combiner_1}'

  combine_by_key(create_combiner, merge_value, merge_combiners, num_partitions)
end
#group_with(other, num_partitions = nil) ⇒ Object Also known as: groupWith
The same functionality as cogroup, but it can group only two RDDs and you can change num_partitions.
Example:
rdd1 = $sc.parallelize([["a", 1], ["a", 2], ["b", 3]])
rdd2 = $sc.parallelize([["a", 4], ["a", 5], ["b", 6]])
rdd1.group_with(rdd2).collect
# => [["a", [1, 2, 4, 5]], ["b", [3, 6]]]
# File 'lib/spark/rdd.rb', line 1034

def group_with(other, num_partitions=nil)
  self.union(other).group_by_key(num_partitions)
end
#histogram(buckets) ⇒ Object
Compute a histogram using the provided buckets. The buckets are all open to the right except for the last which is closed. e.g. [1,10,20,50] means the buckets are [1,10) [10,20) [20,50], which means 1<=x<10, 10<=x<20, 20<=x<=50. And on the input of 1 and 50 we would have a histogram of 1,0,1.
If your histogram is evenly spaced (e.g. [0, 10, 20, 30]), this can be switched from an O(log n) insertion to O(1) per element (where n = number of buckets).
Buckets must be sorted, must not contain any duplicates, and must have at least two elements.
Examples:
rdd = $sc.parallelize(0..50)
rdd.histogram(2)
# => [[0.0, 25.0, 50], [25, 26]]
rdd.histogram([0, 5, 25, 50])
# => [[0, 5, 25, 50], [5, 20, 26]]
rdd.histogram([0, 15, 30, 45, 60])
# => [[0, 15, 30, 45, 60], [15, 15, 15, 6]]
# File 'lib/spark/rdd.rb', line 467

def histogram(buckets)

  # -----------------------------------------------------------------------
  # Integer
  #
  if buckets.is_a?(Integer)

    # Validation
    if buckets < 1
      raise ArgumentError, "Bucket count must be >= 1, #{buckets} inserted."
    end

    # Filter invalid values
    # Nil and NaN
    func = 'lambda{|x|
      if x.nil? || (x.is_a?(Float) && x.nan?)
        false
      else
        true
      end
    }'
    filtered = self.filter(func)

    # Compute the minimum and the maximum
    func = 'lambda{|memo, item|
      [memo[0] < item[0] ? memo[0] : item[0],
       memo[1] > item[1] ? memo[1] : item[1]]
    }'
    min, max = filtered.map('lambda{|x| [x, x]}').reduce(func)

    # Min, max must be valid numbers
    if (min.is_a?(Float) && !min.finite?) || (max.is_a?(Float) && !max.finite?)
      raise Spark::RDDError, 'Histogram on either an empty RDD or RDD containing +/-infinity or NaN'
    end

    # Already finished
    if min == max || buckets == 1
      return [min, max], [filtered.count]
    end

    # Custom range
    begin
      span = max - min
      # increment
      buckets = (0...buckets).map do |x|
        min + (x * span) / buckets.to_f
      end
      buckets << max
    rescue NoMethodError
      raise Spark::RDDError, 'Can not generate buckets with non-number in RDD'
    end

    even = true

  # -----------------------------------------------------------------------
  # Array
  #
  elsif buckets.is_a?(Array)

    if buckets.size < 2
      raise ArgumentError, 'Buckets should have more than one value.'
    end

    if buckets.detect{|x| x.nil? || (x.is_a?(Float) && x.nan?)}
      raise ArgumentError, 'Can not have nil or nan numbers in buckets.'
    end

    if buckets.detect{|x| buckets.count(x) > 1}
      raise ArgumentError, 'Buckets should not contain duplicated values.'
    end

    if buckets.sort != buckets
      raise ArgumentError, 'Buckets must be sorted.'
    end

    even = false

  # -----------------------------------------------------------------------
  # Other
  #
  else
    raise Spark::RDDError, 'Buckets should be number or array.'
  end

  reduce_func = 'lambda{|memo, item|
    memo.size.times do |i|
      memo[i] += item[i]
    end
    memo
  }'

  return buckets, new_rdd_from_command(Spark::Command::Histogram, even, buckets).reduce(reduce_func)
end
#id ⇒ Object
A unique ID for this RDD (within its SparkContext).
# File 'lib/spark/rdd.rb', line 120

def id
  jrdd.id
end
#inspect ⇒ Object
# File 'lib/spark/rdd.rb', line 37

def inspect
  comms = @command.commands.join(' -> ')

  result  = %{#<#{self.class.name}:0x#{object_id}}
  result << %{ (#{comms})} unless comms.empty?
  result << %{\n}
  result << %{ Serializer: "#{serializer}"\n}
  result << %{Deserializer: "#{deserializer}"}
  result << %{>}
  result
end
#intersection(other) ⇒ Object
Return the intersection of this RDD and another one. The output will not contain any duplicate elements, even if the input RDDs did.
Example:
rdd1 = $sc.parallelize([1,2,3,4,5])
rdd2 = $sc.parallelize([1,4,5,6,7])
rdd1.intersection(rdd2).collect
# => [1, 4, 5]
# File 'lib/spark/rdd.rb', line 776

def intersection(other)
  mapping_function = 'lambda{|item| [item, nil]}'
  filter_function  = 'lambda{|(key, values)| values.size > 1}'

  self.map(mapping_function)
      .cogroup(other.map(mapping_function))
      .filter(filter_function)
      .keys
end
#key_by(f) ⇒ Object Also known as: keyBy
Creates tuples of the elements in this RDD by applying the function f; each element x becomes [f(x), x].
Example:
rdd = $sc.parallelize(0..5)
rdd.key_by(lambda{|x| x%2}).collect
# => [[0, 0], [1, 1], [0, 2], [1, 3], [0, 4], [1, 5]]
# File 'lib/spark/rdd.rb', line 1181

def key_by(f)
  new_rdd_from_command(Spark::Command::KeyBy, f)
end
#keys ⇒ Object
Return an RDD with the first element (key) of each pair in a PairRDD.
Example:
rdd = $sc.parallelize([[1,2], [3,4], [5,6]])
rdd.keys.collect
# => [1, 3, 5]
# File 'lib/spark/rdd.rb', line 1220

def keys
  self.map('lambda{|(key, _)| key}')
end
#map(f) ⇒ Object
Return a new RDD by applying a function to all elements of this RDD.
Example:
rdd = $sc.parallelize(0..5)
rdd.map(lambda {|x| x*2}).collect
# => [0, 2, 4, 6, 8, 10]
# File 'lib/spark/rdd.rb', line 595

def map(f)
  new_rdd_from_command(Spark::Command::Map, f)
end
#map_partitions(f) ⇒ Object Also known as: mapPartitions
Return a new RDD by applying a function to each partition of this RDD.
Example:
rdd = $sc.parallelize(0..10, 2)
rdd.map_partitions(lambda{|part| part.reduce(:+)}).collect
# => [15, 40]
# File 'lib/spark/rdd.rb', line 618

def map_partitions(f)
  new_rdd_from_command(Spark::Command::MapPartitions, f)
end
#map_partitions_with_index(f, options = {}) ⇒ Object Also known as: mapPartitionsWithIndex
Return a new RDD by applying a function to each partition of this RDD, while tracking the index of the original partition.
Example:
rdd = $sc.parallelize(0...4, 4)
rdd.map_partitions_with_index(lambda{|part, index| part.first * index}).collect
# => [0, 1, 4, 9]
# File 'lib/spark/rdd.rb', line 630

def map_partitions_with_index(f, options={})
  new_rdd_from_command(Spark::Command::MapPartitionsWithIndex, f)
end
#map_values(f) ⇒ Object Also known as: mapValues
Pass each value in the key-value pair RDD through a map function without changing the keys. This also retains the original RDD’s partitioning.
Example:
rdd = $sc.parallelize(["ruby", "scala", "java"])
rdd = rdd.map(lambda{|x| [x, x]})
rdd = rdd.map_values(lambda{|x| x.upcase})
rdd.collect
# => [["ruby", "RUBY"], ["scala", "SCALA"], ["java", "JAVA"]]
# File 'lib/spark/rdd.rb', line 1195

def map_values(f)
  new_rdd_from_command(Spark::Command::MapValues, f)
end
#max ⇒ Object
Return the max of this RDD
Example:
rdd = $sc.parallelize(0..10)
rdd.max
# => 10
# File 'lib/spark/rdd.rb', line 344

def max
  self.reduce('lambda{|memo, item| memo > item ? memo : item }')
end
#mean ⇒ Object
Compute the mean of this RDD’s elements.
Example:
$sc.parallelize([1, 2, 3]).mean
# => 2.0
# File 'lib/spark/rdd.rb', line 395

def mean
  stats.mean
end
#min ⇒ Object
Return the min of this RDD
Example:
rdd = $sc.parallelize(0..10)
rdd.min
# => 0
# File 'lib/spark/rdd.rb', line 355

def min
  self.reduce('lambda{|memo, item| memo < item ? memo : item }')
end
#name ⇒ Object
Return the name of this RDD.
# File 'lib/spark/rdd.rb', line 162

def name
  _name = jrdd.name
  _name && _name.encode(Encoding::UTF_8)
end
#new_rdd_from_command(klass, *args) ⇒ Object
# File 'lib/spark/rdd.rb', line 97

def new_rdd_from_command(klass, *args)
  comm = add_command(klass, *args)
  PipelinedRDD.new(self, comm)
end
#partition_by(num_partitions, partition_func = nil) ⇒ Object Also known as: partitionBy
Return a copy of the RDD partitioned using the specified partitioner.
Example:
rdd = $sc.parallelize(["1","2","3","4","5"]).map(lambda {|x| [x, 1]})
rdd.partitionBy(2).glom.collect
# => [[["3", 1], ["4", 1]], [["1", 1], ["2", 1], ["5", 1]]]
# File 'lib/spark/rdd.rb', line 793

def partition_by(num_partitions, partition_func=nil)
  num_partitions ||= default_reduce_partitions
  partition_func ||= 'lambda{|x| Spark::Digest.portable_hash(x.to_s)}'

  _partition_by(num_partitions, Spark::Command::PartitionBy::Basic, partition_func)
end
#partitions_size ⇒ Object Also known as: partitionsSize
Number of partitions (count of ParallelCollectionPartition)
# File 'lib/spark/rdd.rb', line 115

def partitions_size
  jrdd.rdd.partitions.size
end
#persist(new_level) ⇒ Object
Set this RDD’s storage level to persist its values across operations after the first time it is computed. This can only be used to assign a new storage level if the RDD does not have a storage level set yet.
See StorageLevel for the possible values of new_level.
# File 'lib/spark/rdd.rb', line 135

def persist(new_level)
  @cached = true
  jrdd.persist(Spark::StorageLevel.java_get(new_level))
  self
end
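Example (a sketch; it uses the 'memory_only_ser' level relied on by #cache, other level names follow Spark's StorageLevel constants):
rdd = $sc.parallelize(0..10)
rdd.persist('memory_only_ser')   # equivalent to rdd.cache
rdd.cached?
# => true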
#pipe(*cmds) ⇒ Object
Return an RDD created by piping elements to a forked external process.
Cmds:
cmd = [env,] command... [,options]
env: hash
name => val : set the environment variable
name => nil : unset the environment variable
command...:
commandline : command line string which is passed to the standard shell
cmdname, arg1, ... : command name and one or more arguments (This form does
not use the shell. See below for caveats.)
[cmdname, argv0], arg1, ... : command name, argv[0] and zero or more arguments (no shell)
options: hash
See http://ruby-doc.org/core-2.2.0/Process.html#method-c-spawn
Examples:
$sc.parallelize(0..5).pipe('cat').collect
# => ["0", "1", "2", "3", "4", "5"]
rdd = $sc.parallelize(0..5)
rdd = rdd.pipe('cat', "awk '{print $1*10}'")
rdd = rdd.map(lambda{|x| x.to_i + 1})
rdd.collect
# => [1, 11, 21, 31, 41, 51]
# File 'lib/spark/rdd.rb', line 904

def pipe(*cmds)
  new_rdd_from_command(Spark::Command::Pipe, cmds)
end
#reduce(f) ⇒ Object
Reduces the elements of this RDD using the specified lambda or method.
Example:
rdd = $sc.parallelize(0..10)
rdd.reduce(lambda{|sum, x| sum+x})
# => 55
# File 'lib/spark/rdd.rb', line 292

def reduce(f)
  _reduce(Spark::Command::Reduce, f, f)
end
#reduce_by_key(f, num_partitions = nil) ⇒ Object Also known as: reduceByKey
Merge the values for each key using an associative reduce function. This will also perform the merging locally on each mapper before sending results to a reducer, similarly to a “combiner” in MapReduce. Output will be hash-partitioned with the existing partitioner/parallelism level.
Example:
rdd = $sc.parallelize(["a","b","c","a","b","c","a","c"]).map(lambda{|x| [x, 1]})
rdd.reduce_by_key(lambda{|x,y| x+y}).collect_as_hash
# => {"a"=>3, "b"=>2, "c"=>3}
# File 'lib/spark/rdd.rb', line 922

def reduce_by_key(f, num_partitions=nil)
  combine_by_key('lambda {|x| x}', f, f, num_partitions)
end
#reserialize(new_serializer) ⇒ Object
Return a new RDD with different serializer. This method is useful during union and join operations.
Example:
rdd = $sc.parallelize([1, 2, 3], nil, serializer: "marshal")
rdd = rdd.map(lambda{|x| x.to_s})
rdd.reserialize("oj").collect
# => ["1", "2", "3"]
# File 'lib/spark/rdd.rb', line 756

def reserialize(new_serializer)
  if serializer == new_serializer
    return self
  end

  new_command = @command.deep_copy
  new_command.serializer = new_serializer

  PipelinedRDD.new(self, new_command)
end
#sample(with_replacement, fraction, seed = nil) ⇒ Object
Return a sampled subset of this RDD. Operations are based on Poisson and uniform distributions. TODO: Replace uniform with Bernoulli.
Examples:
rdd = $sc.parallelize(0..100)
rdd.sample(true, 10).collect
# => [17, 17, 22, 23, 51, 52, 62, 64, 69, 70, 96]
rdd.sample(false, 0.1).collect
# => [3, 5, 9, 32, 44, 55, 66, 68, 75, 80, 86, 91, 98]
# File 'lib/spark/rdd.rb', line 813

def sample(with_replacement, fraction, seed=nil)
  new_rdd_from_command(Spark::Command::Sample, with_replacement, fraction, seed)
end
#sample_stdev ⇒ Object Also known as: sampleStdev
Compute the sample standard deviation of this RDD’s elements (which corrects for bias in estimating the standard deviation by dividing by N-1 instead of N).
Example:
$sc.parallelize([1, 2, 3]).sample_stdev
# => 1.0
# File 'lib/spark/rdd.rb', line 427

def sample_stdev
  stats.sample_stdev
end
#sample_variance ⇒ Object Also known as: sampleVariance
Compute the sample variance of this RDD’s elements (which corrects for bias in estimating the variance by dividing by N-1 instead of N).
Example:
$sc.parallelize([1, 2, 3]).sample_variance
# => 1.0
# File 'lib/spark/rdd.rb', line 438

def sample_variance
  stats.sample_variance
end
#set_name(name) ⇒ Object Also known as: setName
Assign a name to this RDD.
# File 'lib/spark/rdd.rb', line 169

def set_name(name)
  jrdd.setName(name)
end
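Example (a small sketch combining #set_name and #name):
rdd = $sc.parallelize(0..5)
rdd.set_name('numbers')
rdd.name
# => "numbers"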
#shuffle(seed = nil) ⇒ Object
Return a shuffled RDD.
Example:
rdd = $sc.parallelize(0..10)
rdd.shuffle.collect
# => [3, 10, 6, 7, 8, 0, 4, 2, 9, 1, 5]
# File 'lib/spark/rdd.rb', line 724

def shuffle(seed=nil)
  seed ||= Random.new_seed

  new_rdd_from_command(Spark::Command::Shuffle, seed)
end
#sort_by(key_function = nil, ascending = true, num_partitions = nil) ⇒ Object Also known as: sortBy
Sorts this RDD by the given key_function
This is a different implementation than Spark's: sort_by does not use the key_by method first. It can be slower but takes less memory, and you can always use map.sort_by_key instead.
Example:
rdd = $sc.parallelize(["aaaaaaa", "cc", "b", "eeee", "ddd"])
rdd.sort_by.collect
# => ["aaaaaaa", "b", "cc", "ddd", "eeee"]
rdd.sort_by(lambda{|x| x.size}).collect
# => ["b", "cc", "ddd", "eeee", "aaaaaaa"]
# File 'lib/spark/rdd.rb', line 1130

def sort_by(key_function=nil, ascending=true, num_partitions=nil)
  key_function   ||= 'lambda{|x| x}'
  num_partitions ||= default_reduce_partitions

  command_klass = Spark::Command::SortByKey

  # Allow spill data to disk due to memory limit
  # spilling = config['spark.shuffle.spill'] || false
  spilling = false
  memory = ''

  # Set spilling to false if worker has unlimited memory
  if memory.empty?
    spilling = false
    memory = nil
  else
    memory = to_memory_size(memory)
  end

  # Sorting should do one worker
  if num_partitions == 1
    rdd = self
    rdd = rdd.coalesce(1) if partitions_size > 1
    return rdd.new_rdd_from_command(command_klass, key_function, ascending, spilling, memory, serializer)
  end

  # Compute boundary of collection
  # Collection should be evenly distributed
  # 20.0 is from scala RangePartitioner (for roughly balanced output partitions)
  count = self.count
  sample_size = num_partitions * 20.0
  fraction = [sample_size / [count, 1].max, 1.0].min
  samples = self.sample(false, fraction, 1).map(key_function).collect
  samples.sort!
  # Reverse is much faster than reverse sort_by
  samples.reverse! if !ascending

  # Determine part bounds
  bounds = determine_bounds(samples, num_partitions)

  shuffled = _partition_by(num_partitions, Spark::Command::PartitionBy::Sorting, key_function, bounds, ascending, num_partitions)
  shuffled.new_rdd_from_command(command_klass, key_function, ascending, spilling, memory, serializer)
end
#sort_by_key(ascending = true, num_partitions = nil) ⇒ Object Also known as: sortByKey
Sort the RDD by key
Example:
rdd = $sc.parallelize([["c", 1], ["b", 2], ["a", 3]])
rdd.sort_by_key.collect
# => [["a", 3], ["b", 2], ["c", 1]]
# File 'lib/spark/rdd.rb', line 1100

def sort_by_key(ascending=true, num_partitions=nil)
  self.sort_by('lambda{|(key, _)| key}')
end
#sort_by_value(ascending = true, num_partitions = nil) ⇒ Object
Sort the RDD by value
Example:
rdd = $sc.parallelize([["a", 3], ["b", 1], ["c", 2]])
rdd.sort_by_value.collect
# => [["b", 1], ["c", 2], ["a", 3]]
# File 'lib/spark/rdd.rb', line 1111

def sort_by_value(ascending=true, num_partitions=nil)
  self.sort_by('lambda{|(_, value)| value}')
end
#stats ⇒ Object
Return a StatCounter object that captures the mean, variance and count of the RDD’s elements in one operation.
# File 'lib/spark/rdd.rb', line 385

def stats
  @stats ||= new_rdd_from_command(Spark::Command::Stats).reduce('lambda{|memo, item| memo.merge(item)}')
end
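Example (a sketch; it reuses the #mean and #variance values documented below):
stats = $sc.parallelize([1, 2, 3]).stats
stats.mean
# => 2.0
stats.variance
# => 0.666...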
#stdev ⇒ Object
Compute the standard deviation of this RDD’s elements.
Example:
$sc.parallelize([1, 2, 3]).stdev
# => 0.816...
# File 'lib/spark/rdd.rb', line 415

def stdev
  stats.stdev
end
#subtract(other, num_partitions = nil) ⇒ Object
Return an RDD with the elements from self that are not in other.
Example:
rdd1 = $sc.parallelize([["a", 1], ["a", 2], ["b", 3], ["c", 4]])
rdd2 = $sc.parallelize([["a", 2], ["c", 6]])
rdd1.subtract(rdd2).collect
# => [["a", 1], ["b", 3], ["c", 4]]
# File 'lib/spark/rdd.rb', line 1085

def subtract(other, num_partitions=nil)
  mapping_function = 'lambda{|x| [x,nil]}'

  self.map(mapping_function)
      .subtract_by_key(other.map(mapping_function), num_partitions)
      .keys
end
#subtract_by_key(other, num_partitions = nil) ⇒ Object Also known as: subtractByKey
Return each (key, value) pair in this RDD that has no pair with a matching key in the other RDD.
Example:
rdd1 = $sc.parallelize([["a", 1], ["a", 2], ["b", 3], ["c", 4]])
rdd2 = $sc.parallelize([["b", 5], ["c", 6]])
rdd1.subtract_by_key(rdd2).collect
# => [["a", 1], ["a", 2]]
# File 'lib/spark/rdd.rb', line 1066

def subtract_by_key(other, num_partitions=nil)
  create_combiner = 'lambda{|item| [[item]]}'
  merge_value     = 'lambda{|combiner, item| combiner.first << item; combiner}'
  merge_combiners = 'lambda{|combiner_1, combiner_2| combiner_1 += combiner_2; combiner_1}'

  self.union(other)
      .combine_by_key(create_combiner, merge_value, merge_combiners, num_partitions)
      .filter('lambda{|(key,values)| values.size == 1}')
      .flat_map_values('lambda{|item| item.first}')
end
#sum ⇒ Object
Return the sum of this RDD
Example:
rdd = $sc.parallelize(0..10)
rdd.sum
# => 55
# File 'lib/spark/rdd.rb', line 366

def sum
  self.reduce('lambda{|sum, item| sum + item}')
end
#take(count) ⇒ Object
Take the first count elements of the RDD.
It works by first scanning one partition, and uses the results from that partition to estimate the number of additional partitions needed to satisfy the limit.
Example:
rdd = $sc.parallelize(0..100, 20)
rdd.take(5)
# => [0, 1, 2, 3, 4]
# File 'lib/spark/rdd.rb', line 238

def take(count)
  buffer = []

  parts_count = self.partitions_size
  # No parts scanned yet
  last_scanned = -1

  while buffer.empty?
    last_scanned += 1
    buffer += context.run_job_with_command(self, [last_scanned], true, Spark::Command::Take, 0, -1)
  end

  # Assumption. Depends on batch_size and how Spark divided the data.
  items_per_part = buffer.size
  left = count - buffer.size

  while left > 0 && last_scanned < parts_count
    parts_to_take = (left.to_f/items_per_part).ceil
    parts_for_scanned = Array.new(parts_to_take) do
      last_scanned += 1
    end

    # We cannot take the exact number of items because workers are isolated from each other.
    # => once you take e.g. 50% from the last part and left is still > 0, it is very
    #    difficult to merge the new items
    items = context.run_job_with_command(self, parts_for_scanned, true, Spark::Command::Take, left, last_scanned)
    buffer += items

    left = count - buffer.size
    # Average size of all parts
    items_per_part = [items_per_part, items.size].reduce(0){|sum, x| sum + x.to_f/2}
  end

  buffer.slice!(0, count)
end
#take_sample(with_replacement, num, seed = nil) ⇒ Object Also known as: takeSample
Return a fixed-size sampled subset of this RDD in an array
Examples:
rdd = $sc.parallelize(0..100)
rdd.take_sample(true, 10)
# => [90, 84, 74, 44, 27, 22, 72, 96, 80, 54]
rdd.take_sample(false, 10)
# => [5, 35, 30, 48, 22, 33, 40, 75, 42, 32]
# File 'lib/spark/rdd.rb', line 828

def take_sample(with_replacement, num, seed=nil)

  if num < 0
    raise Spark::RDDError, 'Size has to be greater than 0'
  elsif num == 0
    return []
  end

  # Taken from scala
  num_st_dev = 10.0

  # Number of items
  initial_count = self.count
  return [] if initial_count == 0

  # Create a new generator
  seed ||= Random.new_seed
  rng = Random.new(seed)

  # Shuffle elements if the requested num is greater than the array size
  if !with_replacement && num >= initial_count
    return self.shuffle(seed).collect
  end

  # Max num
  max_sample_size = Integer::MAX - (num_st_dev * Math.sqrt(Integer::MAX)).to_i
  if num > max_sample_size
    raise Spark::RDDError, "Size can not be greater than #{max_sample_size}"
  end

  # Approximate fraction with tolerance
  fraction = compute_fraction(num, initial_count, with_replacement)

  # Compute the first sampled subset
  samples = self.sample(with_replacement, fraction, seed).collect

  # If the first sample didn't turn out large enough, keep trying to take samples;
  # this shouldn't happen often because we use a big multiplier for the initial size.
  index = 0
  while samples.size < num
    log_warning("Needed to re-sample due to insufficient sample size. Repeat #{index}")
    samples = self.sample(with_replacement, fraction, rng.rand(0..Integer::MAX)).collect
    index += 1
  end

  samples.shuffle!(random: rng)
  samples[0, num]
end
#to_java ⇒ Object
# File 'lib/spark/rdd.rb', line 173

def to_java
  marshal = Spark::Serializer.marshal

  if deserializer.batched?
    ser = deserializer.deep_copy
    ser.serializer = marshal
  else
    ser = Spark::Serializer.batched(marshal)
  end

  rdd = self.reserialize(ser)
  RubyRDD.toJava(rdd.jrdd, rdd.serializer.batched?)
end
#union(other) ⇒ Object
Return the union of this RDD and another one. Any identical elements will appear multiple times (use .distinct to eliminate them).
Example:
rdd = $sc.parallelize([1, 2, 3])
rdd.union(rdd).collect
# => [1, 2, 3, 1, 2, 3]
738 739 740 741 742 743 744 745 |
# File 'lib/spark/rdd.rb', line 738 def union(other) if self.serializer != other.serializer other = other.reserialize(serializer) end new_jrdd = jrdd.union(other.jrdd) RDD.new(new_jrdd, context, serializer, deserializer) end |
#unpersist(blocking = true) ⇒ Object
Mark the RDD as non-persistent, and remove all blocks for it from memory and disk.
Parameters:
- blocking
-
whether to block until all blocks are deleted.
# File 'lib/spark/rdd.rb', line 146

def unpersist(blocking=true)
  @cached = false
  jrdd.unpersist(blocking)
  self
end
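Example (a sketch combining #cache, #unpersist and #cached?):
rdd = $sc.parallelize(0..10)
rdd.cache
rdd.unpersist
rdd.cached?
# => false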
#values ⇒ Object
Return an RDD with the second element (value) of each pair in a PairRDD
Example:
rdd = $sc.parallelize([[1,2], [3,4], [5,6]])
rdd.values.collect
# => [2, 4, 6]
# File 'lib/spark/rdd.rb', line 1231

def values
  self.map('lambda{|(_, value)| value}')
end
#variance ⇒ Object
Compute the variance of this RDD’s elements.
Example:
$sc.parallelize([1, 2, 3]).variance
# => 0.666...
# File 'lib/spark/rdd.rb', line 405

def variance
  stats.variance
end