Class: Spark::Config

Inherits: Object
Includes: Helper::System
Defined in: lib/spark/config.rb

Overview

Common configuration for RubySpark and Spark

Constant Summary

TYPES =
{
  'spark.shuffle.spill' => :boolean,
  'spark.ruby.batch_size' => :integer
}

Instance Method Summary

Methods included from Helper::System

included

Constructor Details

#initialize ⇒ Config

Initialize the Java SparkConf and load the default configuration.



# File 'lib/spark/config.rb', line 16

def initialize
  @spark_conf = SparkConf.new(true)
  set_default
end
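
For illustration, constructing a config and reading back one of the defaults might look like this (a minimal sketch; assumes the ruby-spark gem and its Java bridge are loaded):

config = Spark::Config.new
config['spark.app.name']   # => 'RubySpark', set by set_default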

Instance Method Details

#[](key) ⇒ Object



# File 'lib/spark/config.rb', line 30

def [](key)
  get(key)
end

#[]=(key, value) ⇒ Object



# File 'lib/spark/config.rb', line 34

def []=(key, value)
  set(key, value)
end
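
Both bracket methods delegate to #get and #set, so writes go through the read-only check and reads go through the TYPES parsing. A sketch (config is a Spark::Config instance and Spark has not been started yet):

config['spark.ruby.batch_size'] = 2048
config['spark.ruby.batch_size']   # => 2048, parsed as :integer via TYPES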

#contains?(key) ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/spark/config.rb', line 103

def contains?(key)
  spark_conf.contains(key.to_s)
end

#default_batch_size ⇒ Object



# File 'lib/spark/config.rb', line 164

def default_batch_size
  ENV['SPARK_RUBY_BATCH_SIZE'] || Spark::Serializer::DEFAULT_BATCH_SIZE.to_s
end

#default_executor_command ⇒ Object

Command template that is applied when Scala needs to create a Ruby process (e.g. for a master or home request). The command itself is represented by '%s'.

Example:

bash --norc -i -c "export HOME=/home/user; cd; source .bashrc; %s"


# File 'lib/spark/config.rb', line 189

def default_executor_command
  ENV['SPARK_RUBY_EXECUTOR_COMMAND'] || '%s'
end

#default_executor_options ⇒ Object

Options for every worker.

Examples:

-J-Xmx512m


# File 'lib/spark/config.rb', line 198

def default_executor_options
  ENV['SPARK_RUBY_EXECUTOR_OPTIONS'] || ''
end

#default_executor_uri ⇒ Object

Ruby executor.

Options:

nil

The system's ruby-spark gem is loaded.

other

Path of the library that will be used. By default, the current ruby-spark gem is used.



# File 'lib/spark/config.rb', line 179

def default_executor_uri
  ENV['SPARK_RUBY_EXECUTOR_URI'] || ''
end

#default_parallelize_strategy ⇒ Object

How data are handled in the parallelize method.

Possible options:

inplace

data are changed directly to save memory

deep_copy

data are cloned first



# File 'lib/spark/config.rb', line 156

def default_parallelize_strategy
  ENV['SPARK_RUBY_PARALLELIZE_STRATEGY'] || 'inplace'
end
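
Since set_default copies this value into spark.ruby.parallelize_strategy, the strategy can be chosen through the environment before the config is built:

ENV['SPARK_RUBY_PARALLELIZE_STRATEGY'] = 'deep_copy'
Spark::Config.new.get('spark.ruby.parallelize_strategy')   # => 'deep_copy'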

#default_serializer ⇒ Object



# File 'lib/spark/config.rb', line 160

def default_serializer
  ENV['SPARK_RUBY_SERIALIZER'] || Spark::Serializer::DEFAULT_SERIALIZER_NAME
end

#default_worker_type ⇒ Object

Type of worker.

Options:

process

(default)

thread

(experimental)



# File 'lib/spark/config.rb', line 208

def default_worker_type
  ENV['SPARK_RUBY_WORKER_TYPE'] || 'process'
end

#from_file(file) ⇒ Object



# File 'lib/spark/config.rb', line 21

def from_file(file)
  check_read_only

  if file && File.exist?(file)
    file = File.expand_path(file)
    RubyUtils.loadPropertiesFile(spark_conf, file)
  end
end
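
A sketch of loading external settings; the file path below is hypothetical, and the file is assumed to be in the usual Spark properties format (whitespace-separated key/value pairs):

# my-spark.properties contains, for example:
#   spark.app.name    MyApp
#   spark.master      local[2]
config.from_file('my-spark.properties')
config.get('spark.app.name')   # => 'MyApp'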

#get(key) ⇒ Object

Rescues from NoSuchElementException; returns nil when the key is not set.



# File 'lib/spark/config.rb', line 84

def get(key)
  value = spark_conf.get(key.to_s)

  case TYPES[key]
  when :boolean
    parse_boolean(value)
  when :integer
    parse_integer(value)
  else
    value
  end
rescue
  nil
end
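
Because of the TYPES table and the rescue clause, reads are typed and a missing key returns nil instead of raising. A sketch:

config.set('spark.shuffle.spill', true)
config.get('spark.shuffle.spill')    # => true, the string 'true' parsed as :boolean
config.get('key.that.is.not.set')    # => nil, the exception is rescued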

#get_all ⇒ Object Also known as: getAll



# File 'lib/spark/config.rb', line 99

def get_all
  Hash[spark_conf.getAll.map{|tuple| [tuple._1, tuple._2]}]
end
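
The Scala tuples returned by getAll are unpacked into a plain Ruby Hash, for example:

config.get_all   # => {'spark.app.name' => 'RubySpark', 'spark.master' => 'local[*]', ...}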

#load_executor_envs ⇒ Object

Load environment variables for executor from ENV.

Examples:

SPARK_RUBY_EXECUTOR_ENV_KEY1="1"
SPARK_RUBY_EXECUTOR_ENV_KEY2="2"


# File 'lib/spark/config.rb', line 218

def load_executor_envs
  prefix = 'SPARK_RUBY_EXECUTOR_ENV_'

  envs = ENV.select{|key, _| key.start_with?(prefix)}
  envs.each do |key, value|
    key = key.dup # ENV keys are frozen
    key.slice!(0, prefix.size)

    set("spark.ruby.executor.env.#{key}", value)
  end
end
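
Each matching variable has the prefix stripped and is re-set under spark.ruby.executor.env.*, so the first example variable above ends up as:

ENV['SPARK_RUBY_EXECUTOR_ENV_KEY1'] = '1'
config.load_executor_envs
config.get('spark.ruby.executor.env.KEY1')   # => '1'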

#parse_boolean(value) ⇒ Object



# File 'lib/spark/config.rb', line 120

def parse_boolean(value)
  case value
  when 'true'
    true
  when 'false'
    false
  end
end
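
Only the exact strings 'true' and 'false' are recognized; any other value falls through the case statement and yields nil:

config.parse_boolean('true')   # => true
config.parse_boolean('1')      # => nil, not a recognized boolean string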

#parse_integer(value) ⇒ Object



# File 'lib/spark/config.rb', line 129

def parse_integer(value)
  value.to_i
end

#read_only? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/spark/config.rb', line 79

def read_only?
  Spark.started?
end

#set(key, value) ⇒ Object



# File 'lib/spark/config.rb', line 107

def set(key, value)
  check_read_only
  spark_conf.set(key.to_s, value.to_s)
end

#set_app_name(name) ⇒ Object Also known as: setAppName



# File 'lib/spark/config.rb', line 112

def set_app_name(name)
  set('spark.app.name', name)
end

#set_default ⇒ Object

Defaults



# File 'lib/spark/config.rb', line 136

def set_default
  set_app_name('RubySpark')
  set_master('local[*]')
  set('spark.ruby.driver_home', Spark.home)
  set('spark.ruby.parallelize_strategy', default_parallelize_strategy)
  set('spark.ruby.serializer', default_serializer)
  set('spark.ruby.batch_size', default_batch_size)
  set('spark.ruby.executor.uri', default_executor_uri)
  set('spark.ruby.executor.command', default_executor_command)
  set('spark.ruby.executor.options', default_executor_options)
  set('spark.ruby.worker.type', default_worker_type)
  load_executor_envs
end

#set_master(master) ⇒ Object Also known as: setMaster



# File 'lib/spark/config.rb', line 116

def set_master(master)
  set('spark.master', master)
end

#spark_conf ⇒ Object



# File 'lib/spark/config.rb', line 38

def spark_conf
  if Spark.started?
    # Get latest configuration
    Spark.context.jcontext.conf
  else
    @spark_conf
  end
end

#valid! ⇒ Object



# File 'lib/spark/config.rb', line 47

def valid!
  errors = []

  if !contains?('spark.app.name')
    errors << 'An application name must be set in your configuration.'
  end

  if !contains?('spark.master')
    errors << 'A master URL must be set in your configuration.'
  end

  if Spark::Serializer.get(get('spark.ruby.serializer')).nil?
    errors << 'Default serializer must be set in your configuration.'
  end

  scanned = get('spark.ruby.executor.command').scan('%s')

  if scanned.size == 0
    errors << "Executor command must contain '%s'."
  end

  if scanned.size > 1
    errors << "Executor command can contain only one '%s'."
  end

  if errors.any?
    errors.map!{|error| "- #{error}"}

    raise Spark::ConfigurationError, "Configuration is not valid:\r\n#{errors.join("\r\n")}"
  end
end
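
A hedged usage sketch: validating the configuration up front and inspecting the aggregated error message:

begin
  config.valid!
rescue Spark::ConfigurationError => e
  puts e.message   # lists every problem, one '- ...' line per error
end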