Class: Spark::Context

Inherits:
Object
Includes:
Helper::Logger, Helper::Parser, Helper::System
Defined in:
lib/spark/context.rb

Overview

Main entry point for Spark functionality. A SparkContext represents the connection to a Spark cluster, and can be used to create RDDs, accumulators and broadcast variables on that cluster.

Instance Attribute Summary

Instance Method Summary

Methods included from Helper::Logger

included

Methods included from Helper::Parser

included

Methods included from Helper::System

included

Constructor Details

#initialize ⇒ Context

Constructor for the Ruby context. Configuration is automatically taken from Spark. Config is automatically set to defaults if the user starts the context first.



# File 'lib/spark/context.rb', line 20

def initialize
  Spark.config.valid!
  @jcontext = JavaSparkContext.new(Spark.config.spark_conf)
  @jcontext.addJar(Spark.ruby_spark_jar)

  # Does not work on 1.2
  # ui.attachTab(RubyTab.new(ui, to_java_hash(RbConfig::CONFIG)))

  spark_local_dir = JUtils.getLocalDir(sc.conf)
  @temp_dir = JUtils.createTempDir(spark_local_dir, 'ruby').getAbsolutePath

  accum_server = Spark::Accumulator::Server
  accum_server.start
  @jaccumulator = @jcontext.accumulator(ArrayList.new, RubyAccumulatorParam.new(accum_server.host, accum_server.port))

  log_info("Ruby accumulator server is running on port #{accum_server.port}")

  set_call_site('Ruby') # description of stage
end
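
The constructor is normally not called directly. A minimal sketch of the usual entry point (Spark.start and Spark.sc are assumed here from the gem's top-level API):

require 'ruby-spark'

Spark.start      # builds the context; config falls back to defaults if not set beforehand
$sc = Spark.sc   # the Spark::Context instance used in the examples below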

Instance Attribute Details

#jaccumulator ⇒ Object (readonly)

Returns the value of attribute jaccumulator.



# File 'lib/spark/context.rb', line 14

def jaccumulator
  @jaccumulator
end

#jcontext ⇒ Object (readonly)

Returns the value of attribute jcontext.



# File 'lib/spark/context.rb', line 14

def jcontext
  @jcontext
end

#temp_dir ⇒ Object (readonly)

Returns the value of attribute temp_dir.



# File 'lib/spark/context.rb', line 14

def temp_dir
  @temp_dir
end

Instance Method Details

#accumulator(value, accum_param = :+, zero_value = 0) ⇒ Object

Create an Accumulator with the given initial value, using a given accum_param helper object to define how to add values of the data type if provided.

Example:

accum = $sc.accumulator(7)

rdd = $sc.parallelize(0..5, 4)
rdd = rdd.bind(accum: accum)
rdd = rdd.map_partitions(lambda{|_| accum.add(1) })
rdd = rdd.collect

accum.value
# => 11


# File 'lib/spark/context.rb', line 157

def accumulator(value, accum_param=:+, zero_value=0)
  Spark::Accumulator.new(value, accum_param, zero_value)
end

#add_file(*files) ⇒ Object Also known as: addFile

Add a file to be downloaded with this Spark job on every node. The path passed can be either a local file, a file in HDFS (or other Hadoop-supported filesystems), or an HTTP, HTTPS or FTP URI.

To access the file in Spark jobs, use `SparkFiles.get(file_name)` with the filename to find its download location.

Example:

`echo 10 > test.txt`

$sc.add_file('test.txt')
$sc.parallelize(0..5).map(lambda{|x| x * SparkFiles.get_content('test.txt').to_i}).collect
# => [0, 10, 20, 30, 40, 50]


# File 'lib/spark/context.rb', line 118

def add_file(*files)
  files.each do |file|
    sc.addFile(file)
  end
end

#broadcast(value) ⇒ Object

Broadcast a read-only variable to the cluster, returning a Spark::Broadcast object for reading it in distributed functions. The variable will be sent to each node only once.

Example:

broadcast1 = $sc.broadcast('a')
broadcast2 = $sc.broadcast('b')

rdd = $sc.parallelize(0..5, 4)
rdd = rdd.bind(broadcast1: broadcast1, broadcast2: broadcast2)
rdd = rdd.map_partitions_with_index(lambda{|part, index| [broadcast1.value * index, broadcast2.value * index] })
rdd.collect
# => ["", "", "a", "b", "aa", "bb", "aaa", "bbb"]


# File 'lib/spark/context.rb', line 138

def broadcast(value)
  Spark::Broadcast.new(self, value)
end

#config(key = nil) ⇒ Object

Return a copy of this SparkContext’s configuration. The configuration cannot be changed at runtime.



# File 'lib/spark/context.rb', line 95

def config(key=nil)
  # if key
  #   Spark.config[key]
  # else
  #   Spark.config.get_all
  # end
  Spark.config
end
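
Example (a sketch; the [] reader on the returned config is assumed from the commented-out lookup above and from its use in #parallelize):

$sc.config['spark.ruby.serializer']
# => name of the configured serializer, e.g. "marshal"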

#default_parallelismObject Also known as: defaultParallelism

Default level of parallelism to use when not given by the user (e.g. for parallelize and makeRDD).



# File 'lib/spark/context.rb', line 56

def default_parallelism
  sc.defaultParallelism
end
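
Example (a sketch; the value shown is illustrative and depends on the master and configuration, e.g. the number of local cores):

$sc.default_parallelism
# => 2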

#get_call_siteObject Also known as: getCallSite

Capture the current user callsite and return a formatted version for printing. If the user has overridden the call site, this will return the user’s version.



# File 'lib/spark/context.rb', line 88

def get_call_site
  jcontext.getCallSite
end
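
Example (a sketch; the constructor above sets the call site to 'Ruby', so an unmodified context is expected to report that):

$sc.get_call_site
# => "Ruby"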

#get_local_property(key) ⇒ Object Also known as: getLocalProperty

Get a local property set in this thread, or nil if it is missing.



# File 'lib/spark/context.rb', line 75

def get_local_property(key)
  jcontext.getLocalProperty(key)
end
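
Example (a sketch; 'externalCallSite' is the property set by #set_call_site in this class, and an unset key returns nil):

$sc.get_local_property('externalCallSite')
# => "Ruby"

$sc.get_local_property('some.unset.key')
# => nil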

#get_serializer(serializer, *args) ⇒ Object



# File 'lib/spark/context.rb', line 60

def get_serializer(serializer, *args)
  serializer   = Spark::Serializer.get(serializer)
  serializer ||= Spark::Serializer.get(config['spark.ruby.serializer'])
  serializer.new(config['spark.ruby.batch_size']).set(*args)
end
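
Example (a sketch; 'marshal' is assumed to name a registered serializer, and the batch size is passed through just as #parallelize does with its options):

serializer = $sc.get_serializer('marshal', 256)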

#parallelize(data, num_slices = nil, options = {}) ⇒ Object

Distribute a local Ruby collection to form an RDD. The direct method can be slow, so be careful: this method updates data in place.

Parameters:

data

Range or Array

num_slices

number of slices

options
  • use

  • serializer

  • batch_size

Examples:

$sc.parallelize(["1", "2", "3"]).map(lambda{|x| x.to_i}).collect
#=> [1, 2, 3]

$sc.parallelize(1..3).map(:to_s).collect
#=> ["1", "2", "3"]


# File 'lib/spark/context.rb', line 179

def parallelize(data, num_slices=nil, options={})
  num_slices ||= default_parallelism

  # use = jruby? ? (options[:use] || :direct) : :file
  use = :file
  serializer = get_serializer(options[:serializer], options[:batch_size])

  if data.is_a?(Array) && config['spark.ruby.parallelize_strategy'] == 'deep_copy'
    data = data.deep_copy
  else
    # For enumerator or range
    data = data.to_a
  end

  case use
  when :direct
    serializer.dump_to_java(data)
    jrdd = jcontext.parallelize(data, num_slices)
  when :file
    file = Tempfile.new('to_parallelize', temp_dir)
    serializer.dump(data, file)
    file.close # not unlink
    jrdd = RubyRDD.readRDDFromFile(jcontext, file.path, num_slices)
    file.unlink
  end

  Spark::RDD.new(jrdd, self, serializer)
end

#run_job(rdd, f, partitions = nil, allow_local = false) ⇒ Object Also known as: runJob

Executes the given partition function f on the specified set of partitions, returning the result as an array of elements.

If partitions is not specified, this will run over all partitions.

Example:

rdd = $sc.parallelize(0..10, 5, batch_size: 1)
$sc.run_job(rdd, lambda{|x| x.to_s}, [0,2])
# => ["[0, 1]", "[4, 5]"]


# File 'lib/spark/context.rb', line 261

def run_job(rdd, f, partitions=nil, allow_local=false)
  run_job_with_command(rdd, partitions, allow_local, Spark::Command::MapPartitions, f)
end

#run_job_with_command(rdd, partitions, allow_local, command, *args) ⇒ Object Also known as: runJobWithCommand

Execute the given command on a specific set of partitions.



# File 'lib/spark/context.rb', line 267

def run_job_with_command(rdd, partitions, allow_local, command, *args)
  if !partitions.nil? && !partitions.is_a?(Array)
    raise Spark::ContextError, 'Partitions must be nil or Array'
  end

  partitions_size = rdd.partitions_size

  # Execute all parts
  if partitions.nil?
    partitions = (0...partitions_size).to_a
  end

  # Can happen when you use coalesce
  partitions.delete_if {|part| part >= partitions_size}

  # Rjb represents Fixnum as Integer but JRuby as Long
  partitions = to_java_array_list(convert_to_java_int(partitions))

  mapped = rdd.new_rdd_from_command(command, *args)
  iterator = PythonRDD.runJob(rdd.context.sc, mapped.jrdd, partitions, allow_local)
  mapped.collect_from_iterator(iterator)
end
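
Example (a sketch; this mirrors how #run_job delegates to this method):

rdd = $sc.parallelize(0..10, 5, batch_size: 1)
$sc.run_job_with_command(rdd, [0, 2], false, Spark::Command::MapPartitions, lambda{|x| x.to_s})
# => ["[0, 1]", "[4, 5]"]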

#sc ⇒ Object



# File 'lib/spark/context.rb', line 46

def sc
  @jcontext.sc
end

#set_call_site(site) ⇒ Object Also known as: setCallSite

Support function for API backtraces.



# File 'lib/spark/context.rb', line 81

def set_call_site(site)
  set_local_property('externalCallSite', site)
end
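
Example (a sketch; as noted in the constructor above, the site string is used as the stage description):

$sc.set_call_site('Ruby ETL step')
$sc.get_call_site
# => "Ruby ETL step"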

#set_local_property(key, value) ⇒ Object Also known as: setLocalProperty

Set a local property that affects jobs submitted from this thread, such as the Spark fair scheduler pool.



# File 'lib/spark/context.rb', line 69

def set_local_property(key, value)
  jcontext.setLocalProperty(key, value)
end
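
Example (a sketch; 'spark.scheduler.pool' is the property read by Spark's fair scheduler):

$sc.set_local_property('spark.scheduler.pool', 'production')
$sc.get_local_property('spark.scheduler.pool')
# => "production"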

#stop ⇒ Object



# File 'lib/spark/context.rb', line 40

def stop
  Spark::Accumulator::Server.stop
  log_info('Ruby accumulator server was stopped')
  @jcontext.stop
end
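
Shuts down the Ruby accumulator server and the underlying JavaSparkContext. Example (a sketch; typically called once, when the application is finished with Spark):

$sc.stop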

#text_file(path, min_partitions = nil, options = {}) ⇒ Object Also known as: textFile

Read a text file from HDFS, a local file system (available on all nodes), or any Hadoop-supported file system URI, and return it as an RDD of Strings.

Example:

f = Tempfile.new("test")
f.puts("1")
f.puts("2")
f.close

$sc.text_file(f.path).map(lambda{|x| x.to_i}).collect
# => [1, 2]


# File 'lib/spark/context.rb', line 220

def text_file(path, min_partitions=nil, options={})
  min_partitions ||= default_parallelism
  serializer = get_serializer(options[:serializer], options[:batch_size])

  Spark::RDD.new(@jcontext.textFile(path, min_partitions), self, serializer, get_serializer('UTF8'))
end

#ui ⇒ Object



# File 'lib/spark/context.rb', line 50

def ui
  sc.ui
end

#whole_text_files(path, min_partitions = nil, options = {}) ⇒ Object Also known as: wholeTextFiles

Read a directory of text files from HDFS, a local file system (available on all nodes), or any Hadoop-supported file system URI. Each file is read as a single record and returned as a key-value pair, where the key is the path of the file and the value is its content.

Example:

dir = Dir.mktmpdir
f1 = Tempfile.new("test1", dir)
f2 = Tempfile.new("test2", dir)
f1.puts("1"); f1.puts("2");
f2.puts("3"); f2.puts("4");
f1.close
f2.close

$sc.whole_text_files(dir).flat_map(lambda{|key, value| value.split}).collect
# => ["1", "2", "3", "4"]


# File 'lib/spark/context.rb', line 243

def whole_text_files(path, min_partitions=nil, options={})
  min_partitions ||= default_parallelism
  serializer = get_serializer(options[:serializer], options[:batch_size])
  deserializer = get_serializer('Pair', get_serializer('UTF8'), get_serializer('UTF8'))

  Spark::RDD.new(@jcontext.wholeTextFiles(path, min_partitions), self, serializer, deserializer)
end