Class: Spark::Context
- Inherits: Object
- Includes: Helper::Logger, Helper::Parser, Helper::System
- Defined in: lib/spark/context.rb
Overview
Main entry point for Spark functionality. A SparkContext represents the connection to a Spark cluster, and can be used to create RDDs, accumulators and broadcast variables on that cluster.
Instance Attribute Summary
- #jaccumulator ⇒ Object (readonly)
  Returns the value of attribute jaccumulator.
- #jcontext ⇒ Object (readonly)
  Returns the value of attribute jcontext.
- #temp_dir ⇒ Object (readonly)
  Returns the value of attribute temp_dir.
Instance Method Summary
- #accumulator(value, accum_param = :+, zero_value = 0) ⇒ Object
  Create an Accumulator with the given initial value, using a given accum_param helper object to define how to add values of the data type if provided.
- #add_file(*files) ⇒ Object (also: #addFile)
  Add a file to be downloaded with this Spark job on every node.
- #broadcast(value) ⇒ Object
  Broadcast a read-only variable to the cluster, returning a Spark::Broadcast object for reading it in distributed functions.
- #config(key = nil) ⇒ Object
  Return a copy of this SparkContext’s configuration.
- #default_parallelism ⇒ Object (also: #defaultParallelism)
  Default level of parallelism to use when not given by user (e.g. parallelize and makeRDD).
- #get_call_site ⇒ Object (also: #getCallSite)
  Capture the current user callsite and return a formatted version for printing.
- #get_local_property(key) ⇒ Object (also: #getLocalProperty)
  Get a local property set in this thread, or nil if it is missing.
- #get_serializer(serializer, *args) ⇒ Object
- #initialize ⇒ Context (constructor)
  Constructor for the Ruby context.
- #parallelize(data, num_slices = nil, options = {}) ⇒ Object
  Distribute a local Ruby collection to form an RDD. The direct method can be slow, so be careful: this method updates the data in place.
- #run_job(rdd, f, partitions = nil, allow_local = false) ⇒ Object (also: #runJob)
  Executes the given partition function f on the specified set of partitions, returning the result as an array of elements.
- #run_job_with_command(rdd, partitions, allow_local, command, *args) ⇒ Object (also: #runJobWithCommand)
  Execute the given command on a specific set of partitions.
- #sc ⇒ Object
- #set_call_site(site) ⇒ Object (also: #setCallSite)
  Support function for API backtraces.
- #set_local_property(key, value) ⇒ Object (also: #setLocalProperty)
  Set a local property that affects jobs submitted from this thread, such as the Spark fair scheduler pool.
- #stop ⇒ Object
- #text_file(path, min_partitions = nil, options = {}) ⇒ Object (also: #textFile)
  Read a text file from HDFS, a local file system (available on all nodes), or any Hadoop-supported file system URI, and return it as an RDD of Strings.
- #ui ⇒ Object
- #whole_text_files(path, min_partitions = nil, options = {}) ⇒ Object (also: #wholeTextFiles)
  Read a directory of text files from HDFS, a local file system (available on all nodes), or any Hadoop-supported file system URI.
Methods included from Helper::Logger
Methods included from Helper::Parser
Methods included from Helper::System
Constructor Details
#initialize ⇒ Context
Constructor for the Ruby context. Configuration is taken automatically from Spark. The config is set to defaults if the user starts the context before configuring it.
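Example (a hedged sketch of typical usage; require 'ruby-spark', Spark.start and Spark.sc are assumed top-level entry points of the gem that build and expose this context, rather than calling the constructor directly):
require 'ruby-spark'
Spark.start
$sc = Spark.sc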
# File 'lib/spark/context.rb', line 20

def initialize
  Spark.config.valid!

  @jcontext = JavaSparkContext.new(Spark.config.spark_conf)
  @jcontext.addJar(Spark.ruby_spark_jar)

  # Does not work on 1.2
  # ui.attachTab(RubyTab.new(ui, to_java_hash(RbConfig::CONFIG)))

  spark_local_dir = JUtils.getLocalDir(sc.conf)
  @temp_dir = JUtils.createTempDir(spark_local_dir, 'ruby').getAbsolutePath

  accum_server = Spark::Accumulator::Server
  accum_server.start
  @jaccumulator = @jcontext.accumulator(ArrayList.new, RubyAccumulatorParam.new(accum_server.host, accum_server.port))
  log_info("Ruby accumulator server is running on port #{accum_server.port}")

  set_call_site('Ruby') # description of stage
end
Instance Attribute Details
#jaccumulator ⇒ Object (readonly)
Returns the value of attribute jaccumulator.
# File 'lib/spark/context.rb', line 14

def jaccumulator
  @jaccumulator
end
#jcontext ⇒ Object (readonly)
Returns the value of attribute jcontext.
# File 'lib/spark/context.rb', line 14

def jcontext
  @jcontext
end
#temp_dir ⇒ Object (readonly)
Returns the value of attribute temp_dir.
# File 'lib/spark/context.rb', line 14

def temp_dir
  @temp_dir
end
Instance Method Details
#accumulator(value, accum_param = :+, zero_value = 0) ⇒ Object
Create an Accumulator with the given initial value, using a given accum_param helper object to define how to add values of the data type if provided.
Example:
accum = $sc.accumulator(7)
rdd = $sc.parallelize(0..5, 4)
rdd = rdd.bind(accum: accum)
rdd = rdd.map_partitions(lambda{|_| accum.add(1) })
rdd = rdd.collect
accum.value
# => 11
# File 'lib/spark/context.rb', line 157

def accumulator(value, accum_param=:+, zero_value=0)
  Spark::Accumulator.new(value, accum_param, zero_value)
end
#add_file(*files) ⇒ Object Also known as: addFile
Add a file to be downloaded with this Spark job on every node. The path passed can be either a local file, a file in HDFS (or other Hadoop-supported filesystems), or an HTTP, HTTPS or FTP URI.
To access the file in Spark jobs, use SparkFiles.get(file_name) with the filename to find its download location.
Example:
`echo 10 > test.txt`
$sc.add_file('test.txt')
$sc.parallelize(0..5).map(lambda{|x| x * SparkFiles.get_content('test.txt').to_i}).collect
# => [0, 10, 20, 30, 40, 50]
# File 'lib/spark/context.rb', line 118

def add_file(*files)
  files.each do |file|
    sc.addFile(file)
  end
end
#broadcast(value) ⇒ Object
Broadcast a read-only variable to the cluster, returning a Spark::Broadcast object for reading it in distributed functions. The variable will be sent to each cluster only once.
Example:
broadcast1 = $sc.broadcast('a')
broadcast2 = $sc.broadcast('b')
rdd = $sc.parallelize(0..5, 4)
rdd = rdd.bind(broadcast1: broadcast1, broadcast2: broadcast2)
rdd = rdd.map_partitions_with_index(lambda{|part, index| [broadcast1.value * index, broadcast2.value * index] })
rdd.collect
# => ["", "", "a", "b", "aa", "bb", "aaa", "bbb"]
# File 'lib/spark/context.rb', line 138

def broadcast(value)
  Spark::Broadcast.new(self, value)
end
#config(key = nil) ⇒ Object
Return a copy of this SparkContext’s configuration. The configuration cannot be changed at runtime.
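Example (illustrative; the key and the returned value depend on your configuration, "marshal" is only an assumed default):
$sc.config['spark.ruby.serializer']
# => "marshal"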
# File 'lib/spark/context.rb', line 95

def config(key=nil)
  # if key
  #   Spark.config[key]
  # else
  #   Spark.config.get_all
  # end
  Spark.config
end
#default_parallelism ⇒ Object Also known as: defaultParallelism
Default level of parallelism to use when not given by user (e.g. parallelize and makeRDD)
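Example (the value depends on your master and configuration; 2 is only illustrative):
$sc.default_parallelism
# => 2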
# File 'lib/spark/context.rb', line 56

def default_parallelism
  sc.defaultParallelism
end
#get_call_site ⇒ Object Also known as: getCallSite
Capture the current user callsite and return a formatted version for printing. If the user has overridden the call site, this will return the user’s version.
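Example (a sketch; the exact formatting of the returned call site string is up to Spark):
$sc.set_call_site('MyJob')
$sc.get_call_site
# => "MyJob"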
# File 'lib/spark/context.rb', line 88

def get_call_site
  jcontext.getCallSite
end
#get_local_property(key) ⇒ Object Also known as: getLocalProperty
Get a local property set in this thread, or nil if it is missing.
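Example (round trip within a single thread; the property names are arbitrary):
$sc.set_local_property('spark.scheduler.pool', 'pool1')
$sc.get_local_property('spark.scheduler.pool')
# => "pool1"
$sc.get_local_property('unknown.key')
# => nil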
# File 'lib/spark/context.rb', line 75

def get_local_property(key)
  jcontext.getLocalProperty(key)
end
#get_serializer(serializer, *args) ⇒ Object
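Example (a hedged sketch; 'marshal' is an assumed serializer name and 2 an assumed batch size, both depend on your installation and config):
$sc.get_serializer('marshal', 2)   # explicit serializer
$sc.get_serializer(nil)            # falls back to spark.ruby.serializer from the configuration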
# File 'lib/spark/context.rb', line 60

def get_serializer(serializer, *args)
  serializer   = Spark::Serializer.get(serializer)
  serializer ||= Spark::Serializer.get(config['spark.ruby.serializer'])
  serializer.new(config['spark.ruby.batch_size']).set(*args)
end
#parallelize(data, num_slices = nil, options = {}) ⇒ Object
Distribute a local Ruby collection to form an RDD. The direct method can be slow, so be careful: this method updates the data in place.
Parameters:
- data: Range or Array
- num_slices: number of slices
- options: use, serializer, batch_size
Examples:
$sc.parallelize(["1", "2", "3"]).map(lambda{|x| x.to_i}).collect
#=> [1, 2, 3]
$sc.parallelize(1..3).map(:to_s).collect
#=> ["1", "2", "3"]
# File 'lib/spark/context.rb', line 179

def parallelize(data, num_slices=nil, options={})
  num_slices ||= default_parallelism

  # use = jruby? ? (options[:use] || :direct) : :file
  use = :file
  serializer = get_serializer(options[:serializer], options[:batch_size])

  if data.is_a?(Array) && config['spark.ruby.parallelize_strategy'] == 'deep_copy'
    data = data.deep_copy
  else
    # For enumerator or range
    data = data.to_a
  end

  case use
  when :direct
    serializer.dump_to_java(data)
    jrdd = jcontext.parallelize(data, num_slices)
  when :file
    file = Tempfile.new('to_parallelize', temp_dir)
    serializer.dump(data, file)
    file.close # not unlink
    jrdd = RubyRDD.readRDDFromFile(jcontext, file.path, num_slices)
    file.unlink
  end

  Spark::RDD.new(jrdd, self, serializer)
end
#run_job(rdd, f, partitions = nil, allow_local = false) ⇒ Object Also known as: runJob
Executes the given partition function f on the specified set of partitions, returning the result as an array of elements.
If partitions is not specified, this will run over all partitions.
Example:
rdd = $sc.parallelize(0..10, 5, batch_size: 1)
$sc.run_job(rdd, lambda{|x| x.to_s}, [0,2])
# => ["[0, 1]", "[4, 5]"]
# File 'lib/spark/context.rb', line 261

def run_job(rdd, f, partitions=nil, allow_local=false)
  run_job_with_command(rdd, partitions, allow_local, Spark::Command::MapPartitions, f)
end
#run_job_with_command(rdd, partitions, allow_local, command, *args) ⇒ Object Also known as: runJobWithCommand
Execute the given command on a specific set of partitions.
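Example (a hedged sketch mirroring what #run_job does internally, not additional API):
rdd = $sc.parallelize(0..10, 5, batch_size: 1)
$sc.run_job_with_command(rdd, [0, 2], false, Spark::Command::MapPartitions, lambda{|x| x.to_s})
# => ["[0, 1]", "[4, 5]"]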
# File 'lib/spark/context.rb', line 267

def run_job_with_command(rdd, partitions, allow_local, command, *args)
  if !partitions.nil? && !partitions.is_a?(Array)
    raise Spark::ContextError, 'Partitions must be nil or Array'
  end

  partitions_size = rdd.partitions_size

  # Execute all parts
  if partitions.nil?
    partitions = (0...partitions_size).to_a
  end

  # Can happen when you use coalesce
  partitions.delete_if {|part| part >= partitions_size}

  # Rjb represents Fixnum as Integer but JRuby as Long
  partitions = to_java_array_list(convert_to_java_int(partitions))

  mapped = rdd.new_rdd_from_command(command, *args)
  iterator = PythonRDD.runJob(rdd.context.sc, mapped.jrdd, partitions, allow_local)
  mapped.collect_from_iterator(iterator)
end
#sc ⇒ Object
# File 'lib/spark/context.rb', line 46

def sc
  @jcontext.sc
end
#set_call_site(site) ⇒ Object Also known as: setCallSite
Support function for API backtraces.
# File 'lib/spark/context.rb', line 81

def set_call_site(site)
  set_local_property('externalCallSite', site)
end
#set_local_property(key, value) ⇒ Object Also known as: setLocalProperty
Set a local property that affects jobs submitted from this thread, such as the Spark fair scheduler pool.
# File 'lib/spark/context.rb', line 69

def set_local_property(key, value)
  jcontext.setLocalProperty(key, value)
end
#stop ⇒ Object
# File 'lib/spark/context.rb', line 40

def stop
  Spark::Accumulator::Server.stop
  log_info('Ruby accumulator server was stopped')
  @jcontext.stop
end
#text_file(path, min_partitions = nil, options = {}) ⇒ Object Also known as: textFile
Read a text file from HDFS, a local file system (available on all nodes), or any Hadoop-supported file system URI, and return it as an RDD of Strings.
Example:
f = Tempfile.new("test")
f.puts("1")
f.puts("2")
f.close
$sc.text_file(f.path).map(lambda{|x| x.to_i}).collect
# => [1, 2]
# File 'lib/spark/context.rb', line 220

def text_file(path, min_partitions=nil, options={})
  min_partitions ||= default_parallelism
  serializer = get_serializer(options[:serializer], options[:batch_size])

  Spark::RDD.new(@jcontext.textFile(path, min_partitions), self, serializer, get_serializer('UTF8'))
end
#ui ⇒ Object
# File 'lib/spark/context.rb', line 50

def ui
  sc.ui
end
#whole_text_files(path, min_partitions = nil, options = {}) ⇒ Object Also known as: wholeTextFiles
Read a directory of text files from HDFS, a local file system (available on all nodes), or any Hadoop-supported file system URI. Each file is read as a single record and returned in a key-value pair, where the key is the path of each file, the value is the content of each file.
Example:
dir = Dir.mktmpdir
f1 = Tempfile.new("test1", dir)
f2 = Tempfile.new("test2", dir)
f1.puts("1"); f1.puts("2");
f2.puts("3"); f2.puts("4");
f1.close
f2.close
$sc.whole_text_files(dir).flat_map(lambda{|key, value| value.split}).collect
# => ["1", "2", "3", "4"]
# File 'lib/spark/context.rb', line 243

def whole_text_files(path, min_partitions=nil, options={})
  min_partitions ||= default_parallelism
  serializer = get_serializer(options[:serializer], options[:batch_size])
  deserializer = get_serializer('Pair', get_serializer('UTF8'), get_serializer('UTF8'))

  Spark::RDD.new(@jcontext.wholeTextFiles(path, min_partitions), self, serializer, deserializer)
end