Class: Spark::Context

Inherits:
Object
Includes:
Helper::Logger, Helper::Parser, Helper::System
Defined in:
lib/spark/context.rb

Overview

Main entry point for Spark functionality. A SparkContext represents the connection to a Spark cluster, and can be used to create RDDs, accumulators and broadcast variables on that cluster.

Instance Attribute Summary

Instance Method Summary

Methods included from Helper::Logger

included

Methods included from Helper::Parser

included

Methods included from Helper::System

included

Constructor Details

#initialize ⇒ Context

Constructor for the Ruby context. Configuration is taken automatically from Spark; if the user starts the context before setting any options, the config falls back to its defaults.
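Example (a minimal sketch, assuming the gem's top-level Spark.start and Spark.sc helpers; the constructor is normally not called directly):

require 'ruby-spark'

Spark.start        # builds the context; unset options fall back to defaults
$sc = Spark.sc     # the Spark::Context instance created by this constructor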



# File 'lib/spark/context.rb', line 21

def initialize
  Spark.config.valid!
  @jcontext = JavaSparkContext.new(Spark.config.spark_conf)
  @jcontext.addJar(Spark.ruby_spark_jar)

  # Does not work on 1.2
  # ui.attachTab(RubyTab.new(ui, to_java_hash(RbConfig::CONFIG)))

  spark_local_dir = JUtils.getLocalDir(sc.conf)
  @temp_dir = JUtils.createTempDir(spark_local_dir, 'ruby').getAbsolutePath

  accum_server = Spark::Accumulator::Server
  accum_server.start
  @jaccumulator = @jcontext.accumulator(ArrayList.new, RubyAccumulatorParam.new(accum_server.host, accum_server.port))

  log_info("Ruby accumulator server is running on port #{accum_server.port}")

  set_call_site('Ruby') # description of stage
end

Instance Attribute Details

#jaccumulator ⇒ Object (readonly)

Returns the value of attribute jaccumulator.



# File 'lib/spark/context.rb', line 15

def jaccumulator
  @jaccumulator
end

#jcontext ⇒ Object (readonly)

Returns the value of attribute jcontext.



# File 'lib/spark/context.rb', line 15

def jcontext
  @jcontext
end

#temp_dir ⇒ Object (readonly)

Returns the value of attribute temp_dir.



# File 'lib/spark/context.rb', line 15

def temp_dir
  @temp_dir
end

Instance Method Details

#accumulator(value, accum_param = :+, zero_value = 0) ⇒ Object

Create an Accumulator with the given initial value, using the given accum_param helper to define how values of the data type are added (defaults to :+ with a zero value of 0).

Example:

accum = $sc.accumulator(7)

rdd = $sc.parallelize(0..5, 4)
rdd = rdd.bind(accum: accum)
rdd = rdd.map_partitions(lambda{|_| accum.add(1) })
rdd = rdd.collect

accum.value
# => 11


# File 'lib/spark/context.rb', line 185

def accumulator(value, accum_param=:+, zero_value=0)
  Spark::Accumulator.new(value, accum_param, zero_value)
end

#add_file(*files) ⇒ Object Also known as: addFile

Add a file to be downloaded with this Spark job on every node. The path of file passed can be either a local file, a file in HDFS (or other Hadoop-supported filesystems), or an HTTP, HTTPS or FTP URI.

To access the file in Spark jobs, use `SparkFiles.get(file_name)` with the filename to find its download location.

Example:

`echo 10 > test.txt`

$sc.add_file('test.txt')
$sc.parallelize(0..5).map(lambda{|x| x * SparkFiles.get_content('test.txt').to_i}).collect
# => [0, 10, 20, 30, 40, 50]


# File 'lib/spark/context.rb', line 146

def add_file(*files)
  files.each do |file|
    sc.addFile(file)
  end
end

#broadcast(value) ⇒ Object

Broadcast a read-only variable to the cluster, returning a Spark::Broadcast object for reading it in distributed functions. The variable will be sent to each machine only once.

Example:

broadcast1 = $sc.broadcast('a')
broadcast2 = $sc.broadcast('b')

rdd = $sc.parallelize(0..5, 4)
rdd = rdd.bind(broadcast1: broadcast1, broadcast2: broadcast2)
rdd = rdd.map_partitions_with_index(lambda{|part, index| [broadcast1.value * index, broadcast2.value * index] })
rdd.collect
# => ["", "", "a", "b", "aa", "bb", "aaa", "bbb"]


# File 'lib/spark/context.rb', line 166

def broadcast(value)
  Spark::Broadcast.new(self, value)
end

#config(key = nil) ⇒ Object

Return this SparkContext’s configuration, or the value for a single key when one is given. The configuration cannot be changed at runtime.
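Example (the key is one of the serializer settings used on this page; the returned value is an assumption):

$sc.config                             # => the Spark.config object
$sc.config('spark.ruby.serializer')    # => "marshal", for example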



# File 'lib/spark/context.rb', line 124

def config(key=nil)
  if key
    Spark.config.get(key)
  else
    Spark.config
  end
end

#default_batch_size ⇒ Object
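Returns the configured serializer batch size, or 'auto' when spark.ruby.serializer.batch_size is missing or below 1 (see the source below). A usage sketch; the configured value is an assumption:

$sc.config('spark.ruby.serializer.batch_size')   # => "2048", for example
$sc.default_batch_size                           # => 2048, or 'auto' when the setting is missing or below 1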



# File 'lib/spark/context.rb', line 86

def default_batch_size
  size = config('spark.ruby.serializer.batch_size').to_i
  if size >= 1
    size
  else
    'auto'
  end
end

#default_parallelism ⇒ Object Also known as: defaultParallelism

Default level of parallelism to use when not given by the user (e.g. for parallelize and makeRDD).
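Example (the result depends on your master and configuration):

$sc.default_parallelism   # => 2 when running on local[2], for instance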



# File 'lib/spark/context.rb', line 57

def default_parallelism
  sc.defaultParallelism
end

#default_serializer ⇒ Object

Default serializer

Batch -> Compress -> Basic
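Example (a sketch of the equivalent chain built by hand, mirroring the source below; 'marshal' and the batch size 1024 are assumptions):

basic      = Spark::Serializer.find!('marshal').new
compressed = Spark::Serializer.compressed(basic)           # applied only when spark.ruby.serializer.compress is set
serializer = Spark::Serializer.batched(compressed, 1024)   # or Spark::Serializer.auto_batched(compressed)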



# File 'lib/spark/context.rb', line 65

def default_serializer
  # Basic
  serializer = Spark::Serializer.find!(config('spark.ruby.serializer')).new

  # Compress
  if config('spark.ruby.serializer.compress')
    serializer = Spark::Serializer.compressed(serializer)
  end

  # Batching
  batch_size = default_batch_size
  if batch_size == 'auto'
    serializer = Spark::Serializer.auto_batched(serializer)
  else
    serializer = Spark::Serializer.batched(serializer, batch_size)
  end

  # Finally, return the outermost serializer; it wraps the inner ones
  serializer
end

#get_call_site ⇒ Object Also known as: getCallSite

Capture the current user callsite and return a formatted version for printing. If the user has overridden the call site, this will return the user’s version.



# File 'lib/spark/context.rb', line 117

def get_call_site
  jcontext.getCallSite
end

#get_local_property(key) ⇒ Object Also known as: getLocalProperty

Get a local property set in this thread, or nil if it is missing.
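Example:

$sc.set_local_property('spark.scheduler.pool', 'production')
$sc.get_local_property('spark.scheduler.pool')   # => "production"
$sc.get_local_property('unknown.key')            # => nil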



# File 'lib/spark/context.rb', line 104

def get_local_property(key)
  jcontext.getLocalProperty(key)
end

#parallelize(data, num_slices = nil, serializer = nil) ⇒ Object

Distribute a local Ruby collection to form an RDD. The direct method can be slow, so be careful: this method updates the data in place.

Parameters:

data

Range or Array

num_slices

number of slices

serializer

custom serializer (default: serializer based on configuration)

Examples:

$sc.parallelize(["1", "2", "3"]).map(lambda{|x| x.to_i}).collect
#=> [1, 2, 3]

$sc.parallelize(1..3).map(:to_s).collect
#=> ["1", "2", "3"]


# File 'lib/spark/context.rb', line 204

def parallelize(data, num_slices=nil, serializer=nil)
  num_slices ||= default_parallelism
  serializer ||= default_serializer

  serializer.check_each(data)

  # Through file
  file = Tempfile.new('to_parallelize', temp_dir)
  serializer.dump_to_io(data, file)
  file.close # not unlink
  jrdd = RubyRDD.readRDDFromFile(jcontext, file.path, num_slices)

  Spark::RDD.new(jrdd, self, serializer)
ensure
  file && file.unlink
end

#run_job(rdd, f, partitions = nil, allow_local = false) ⇒ Object Also known as: runJob

Executes the given partition function f on the specified set of partitions, returning the result as an array of elements.

If partitions is not specified, this will run over all partitions.

Example:

rdd = $sc.parallelize(0..10, 5)
$sc.run_job(rdd, lambda{|x| x.to_s}, [0,2])
# => ["[0, 1]", "[4, 5]"]


# File 'lib/spark/context.rb', line 275

def run_job(rdd, f, partitions=nil, allow_local=false)
  run_job_with_command(rdd, partitions, allow_local, Spark::Command::MapPartitions, f)
end

#run_job_with_command(rdd, partitions, allow_local, command, *args) ⇒ Object Also known as: runJobWithCommand

Execute the given command on a specific set of partitions.
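Example (should be equivalent to the run_job example above, since run_job delegates here with Spark::Command::MapPartitions):

rdd = $sc.parallelize(0..10, 5)
$sc.run_job_with_command(rdd, [0, 2], false, Spark::Command::MapPartitions, lambda{|x| x.to_s})
# => ["[0, 1]", "[4, 5]"]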



# File 'lib/spark/context.rb', line 281

def run_job_with_command(rdd, partitions, allow_local, command, *args)
  if !partitions.nil? && !partitions.is_a?(Array)
    raise Spark::ContextError, 'Partitions must be nil or Array'
  end

  partitions_size = rdd.partitions_size

  # Execute all parts
  if partitions.nil?
    partitions = (0...partitions_size).to_a
  end

  # Can happen when you use coalesce
  partitions.delete_if {|part| part >= partitions_size}

  # Rjb represents Fixnum as Integer but JRuby as Long
  partitions = to_java_array_list(convert_to_java_int(partitions))

  # File for result
  file = Tempfile.new('collect', temp_dir)

  mapped = rdd.new_rdd_from_command(command, *args)
  RubyRDD.runJob(rdd.context.sc, mapped.jrdd, partitions, allow_local, file.path)

  mapped.collect_from_file(file)
end

#sc ⇒ Object



# File 'lib/spark/context.rb', line 47

def sc
  @jcontext.sc
end

#set_call_site(site) ⇒ Object Also known as: setCallSite

Support function for API backtraces.
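Example (the label is used as the stage description, as the constructor does with 'Ruby'):

$sc.set_call_site('Ruby: daily aggregation')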



# File 'lib/spark/context.rb', line 110

def set_call_site(site)
  set_local_property('externalCallSite', site)
end

#set_local_property(key, value) ⇒ Object Also known as: setLocalProperty

Set a local property that affects jobs submitted from this thread, such as the Spark fair scheduler pool.



# File 'lib/spark/context.rb', line 98

def set_local_property(key, value)
  jcontext.setLocalProperty(key, value)
end

#stop ⇒ Object



# File 'lib/spark/context.rb', line 41

def stop
  Spark::Accumulator::Server.stop
  log_info('Ruby accumulator server was stopped')
  @jcontext.stop
end

#text_file(path, min_partitions = nil, encoding = Encoding::UTF_8, serializer = nil) ⇒ Object Also known as: textFile

Read a text file from HDFS, a local file system (available on all nodes), or any Hadoop-supported file system URI, and return it as an RDD of Strings.

Example:

f = Tempfile.new("test")
f.puts("1")
f.puts("2")
f.close

$sc.text_file(f.path).map(lambda{|x| x.to_i}).collect
# => [1, 2]


# File 'lib/spark/context.rb', line 233

def text_file(path, min_partitions=nil, encoding=Encoding::UTF_8, serializer=nil)
  min_partitions ||= default_parallelism
  serializer     ||= default_serializer
  deserializer     = Spark::Serializer.build { __text__(encoding) }

  Spark::RDD.new(@jcontext.textFile(path, min_partitions), self, serializer, deserializer)
end

#ui ⇒ Object



# File 'lib/spark/context.rb', line 51

def ui
  sc.ui
end

#whole_text_files(path, min_partitions = nil, serializer = nil) ⇒ Object Also known as: wholeTextFiles

Read a directory of text files from HDFS, a local file system (available on all nodes), or any Hadoop-supported file system URI. Each file is read as a single record and returned as a key-value pair, where the key is the path of the file and the value is its content.

Example:

dir = Dir.mktmpdir
f1 = Tempfile.new("test1", dir)
f2 = Tempfile.new("test2", dir)
f1.puts("1"); f1.puts("2");
f2.puts("3"); f2.puts("4");
f1.close
f2.close

$sc.whole_text_files(dir).flat_map(lambda{|key, value| value.split}).collect
# => ["1", "2", "3", "4"]


# File 'lib/spark/context.rb', line 257

def whole_text_files(path, min_partitions=nil, serializer=nil)
  min_partitions ||= default_parallelism
  serializer     ||= default_serializer
  deserializer     = Spark::Serializer.build{ __pair__(__text__, __text__) }

  Spark::RDD.new(@jcontext.wholeTextFiles(path, min_partitions), self, serializer, deserializer)
end