Class: Spark::Config

Inherits:
Object
  • Object
show all
Includes:
Helper::System
Defined in:
lib/spark/config.rb

Overview

Common configuration for RubySpark and Spark

Constant Summary collapse

# Maps configuration keys to the Ruby type their raw string value is
# coerced to by #get (:boolean / :integer); keys absent from this map
# are returned as plain strings.
TYPES =
{
  'spark.shuffle.spill' => :boolean,
  'spark.ruby.serializer.compress' => :boolean
}

Instance Method Summary collapse

Methods included from Helper::System

included

Constructor Details

#initializeConfig

Initialize the Java SparkConf and load the default configuration.



16
17
18
19
# File 'lib/spark/config.rb', line 16

# Build the underlying Java SparkConf and seed it with the defaults
# from #set_default.
#
# NOTE(review): the meaning of the +true+ flag passed to SparkConf.new
# is not visible from this file — presumably "load system defaults";
# confirm against the SparkConf API.
def initialize
  @spark_conf = SparkConf.new(true)
  set_default
end

Instance Method Details

#[](key) ⇒ Object



30
31
32
# File 'lib/spark/config.rb', line 30

# Read a configuration value; sugar for #get.
#
# @param key configuration key (coerced to String by #get)
# @return [Object] the parsed value, or nil when the key is missing
def [](key)
  get(key)
end

#[]=(key, value) ⇒ Object



34
35
36
# File 'lib/spark/config.rb', line 34

# Write a configuration value; sugar for #set.
#
# @param key configuration key (coerced to String by #set)
# @param value value to store (coerced to String by #set)
def []=(key, value)
  set(key, value)
end

#contains?(key) ⇒ Boolean

Returns:

  • (Boolean)


103
104
105
# File 'lib/spark/config.rb', line 103

# Whether the underlying SparkConf defines +key+.
#
# @param key configuration key; converted with #to_s before lookup
# @return [Boolean]
def contains?(key)
  spark_conf.contains(key.to_s)
end

#default_executor_commandObject

Command template which is applied when the Scala side wants to create a Ruby process (e.g. master, home request). The command is represented by ‘%s’.

Example:

bash --norc -i -c "export HOME=/home/user; cd; source .bashrc; %s"


183
184
185
# File 'lib/spark/config.rb', line 183

# Command template used when the Scala side spawns a Ruby process;
# the real command is substituted for the '%s' placeholder.
# Overridable through the SPARK_RUBY_EXECUTOR_COMMAND env variable.
def default_executor_command
  ENV.fetch('SPARK_RUBY_EXECUTOR_COMMAND', '%s')
end

#default_executor_optionsObject

Options for every worker.

Examples:

-J-Xmx512m


192
193
194
# File 'lib/spark/config.rb', line 192

# Extra options passed to every worker (e.g. -J-Xmx512m).
# Empty unless SPARK_RUBY_EXECUTOR_OPTIONS is set in the environment.
def default_executor_options
  ENV.fetch('SPARK_RUBY_EXECUTOR_OPTIONS', '')
end

#default_executor_uriObject

Ruby executor.

Options:

nil

System’s gem is loaded (ruby-spark).

other

Path of the library which will be used. By default the current ruby-spark gem is used.



173
174
175
# File 'lib/spark/config.rb', line 173

# URI of the Ruby executor library; empty string means "use the
# current ruby-spark gem". Overridable via SPARK_RUBY_EXECUTOR_URI.
def default_executor_uri
  ENV.fetch('SPARK_RUBY_EXECUTOR_URI', '')
end

#default_serializerObject



150
151
152
# File 'lib/spark/config.rb', line 150

# Serializer name: SPARK_RUBY_SERIALIZER from the environment, or the
# library's default. The block form keeps the constant lookup lazy so
# it is only resolved when the env variable is absent.
def default_serializer
  ENV.fetch('SPARK_RUBY_SERIALIZER') { Spark::Serializer::DEFAULT_SERIALIZER_NAME }
end

#default_serializer_batch_sizeObject



158
159
160
# File 'lib/spark/config.rb', line 158

# Serializer batch size: SPARK_RUBY_SERIALIZER_BATCH_SIZE from the
# environment (a String), or the library default. Lazy block keeps the
# constant from being resolved when the env variable is present.
def default_serializer_batch_size
  ENV.fetch('SPARK_RUBY_SERIALIZER_BATCH_SIZE') { Spark::Serializer::DEFAULT_BATCH_SIZE }
end

#default_serializer_compressObject



154
155
156
# File 'lib/spark/config.rb', line 154

# Serializer compression flag: SPARK_RUBY_SERIALIZER_COMPRESS from the
# environment (a String), or the library default. Lazy block avoids
# resolving the constant when the env variable is present.
def default_serializer_compress
  ENV.fetch('SPARK_RUBY_SERIALIZER_COMPRESS') { Spark::Serializer::DEFAULT_COMPRESS }
end

#default_worker_typeObject

Type of worker.

Options:

process

(default)

thread

(experimental)



202
203
204
# File 'lib/spark/config.rb', line 202

# Worker type: 'process' (default) or 'thread' (experimental).
# Overridable through the SPARK_RUBY_WORKER_TYPE env variable.
def default_worker_type
  ENV.fetch('SPARK_RUBY_WORKER_TYPE', 'process')
end

#from_file(file) ⇒ Object



21
22
23
24
25
26
27
28
# File 'lib/spark/config.rb', line 21

# Load configuration properties from +file+ into the SparkConf.
# Silently does nothing when +file+ is nil or does not exist.
#
# @param file [String, nil] path to a Java-style properties file
def from_file(file)
  check_read_only

  return unless file && File.exist?(file)

  RubyUtils.loadPropertiesFile(spark_conf, File.expand_path(file))
end

#get(key) ⇒ Object

Rescue from NoSuchElementException



84
85
86
87
88
89
90
91
92
93
94
95
96
97
# File 'lib/spark/config.rb', line 84

# Fetch +key+ from the SparkConf and coerce its string value according
# to TYPES (:boolean / :integer); keys not listed there come back as
# raw strings. Any lookup error (per the original intent, Java's
# NoSuchElementException for a missing key) results in nil.
#
# @return [Object, nil]
def get(key)
  raw = spark_conf.get(key.to_s)

  case TYPES[key]
  when :boolean then parse_boolean(raw)
  when :integer then parse_integer(raw)
  else raw
  end
rescue
  nil
end

#get_allObject Also known as: getAll



99
100
101
# File 'lib/spark/config.rb', line 99

# All configuration entries as a plain Ruby Hash. Each SparkConf entry
# is a Scala tuple; _1 is the key and _2 the value.
#
# @return [Hash]
def get_all
  spark_conf.getAll.each_with_object({}) do |tuple, result|
    result[tuple._1] = tuple._2
  end
end

#load_executor_envsObject

Load environment variables for executor from ENV.

Examples:

SPARK_RUBY_EXECUTOR_ENV_KEY1="1"
SPARK_RUBY_EXECUTOR_ENV_KEY2="2"


212
213
214
215
216
217
218
219
220
221
222
# File 'lib/spark/config.rb', line 212

# Copy SPARK_RUBY_EXECUTOR_ENV_* variables from ENV into the
# configuration as spark.ruby.executor.env.* entries, e.g.
# SPARK_RUBY_EXECUTOR_ENV_KEY1="1" -> spark.ruby.executor.env.KEY1.
def load_executor_envs
  prefix = 'SPARK_RUBY_EXECUTOR_ENV_'

  ENV.each do |name, value|
    next unless name.start_with?(prefix)

    suffix = name[prefix.size..-1]
    set("spark.ruby.executor.env.#{suffix}", value)
  end
end

#parse_boolean(value) ⇒ Object



120
121
122
123
124
125
126
127
# File 'lib/spark/config.rb', line 120

# Map the strings 'true'/'false' to real booleans; any other input
# (including nil) yields nil.
#
# @param value [String, nil]
# @return [true, false, nil]
def parse_boolean(value)
  return true  if value == 'true'
  return false if value == 'false'

  nil
end

#parse_integer(value) ⇒ Object



129
130
131
# File 'lib/spark/config.rb', line 129

# Coerce a configuration value to Integer via String#to_i; note that
# non-numeric input silently becomes 0 and nil becomes 0.
#
# @param value [String, nil]
# @return [Integer]
def parse_integer(value)
  value.to_i
end

#read_only?Boolean

Returns:

  • (Boolean)


79
80
81
# File 'lib/spark/config.rb', line 79

# The configuration is considered read-only once the Spark context
# has started. #set calls check_read_only, which presumably consults
# this flag — confirm in lib/spark/config.rb outside this listing.
#
# @return [Boolean]
def read_only?
  Spark.started?
end

#set(key, value) ⇒ Object



107
108
109
110
# File 'lib/spark/config.rb', line 107

# Store a configuration value. Key and value are both coerced to
# String before being handed to the Java SparkConf.
#
# NOTE(review): check_read_only is defined outside this listing —
# presumably it raises once Spark has started; confirm.
def set(key, value)
  check_read_only
  spark_conf.set(key.to_s, value.to_s)
end

#set_app_name(name) ⇒ Object Also known as: setAppName



112
113
114
# File 'lib/spark/config.rb', line 112

# Convenience setter for the 'spark.app.name' key.
#
# @param name [String] application name shown by Spark
def set_app_name(name)
  set('spark.app.name', name)
end

#set_defaultObject

Defaults



136
137
138
139
140
141
142
143
144
145
146
147
148
# File 'lib/spark/config.rb', line 136

# Seed the configuration with the application defaults. Runs from the
# constructor, before user code has a chance to override anything.
# Hash insertion order is preserved, so keys are set in the same
# sequence as before.
def set_default
  set_app_name('RubySpark')
  set_master('local[*]')

  {
    'spark.ruby.driver_home'           => Spark.home,
    'spark.ruby.serializer'            => default_serializer,
    'spark.ruby.serializer.compress'   => default_serializer_compress,
    'spark.ruby.serializer.batch_size' => default_serializer_batch_size,
    'spark.ruby.executor.uri'          => default_executor_uri,
    'spark.ruby.executor.command'      => default_executor_command,
    'spark.ruby.executor.options'      => default_executor_options,
    'spark.ruby.worker.type'           => default_worker_type
  }.each { |key, value| set(key, value) }

  load_executor_envs
end

#set_master(master) ⇒ Object Also known as: setMaster



116
117
118
# File 'lib/spark/config.rb', line 116

# Convenience setter for the 'spark.master' key.
#
# @param master [String] master URL, e.g. 'local[*]'
def set_master(master)
  set('spark.master', master)
end

#spark_confObject



38
39
40
41
42
43
44
45
# File 'lib/spark/config.rb', line 38

# The active SparkConf: before the context starts this is the object
# built in the constructor; afterwards the running context's conf is
# used so that any later changes are reflected.
def spark_conf
  return @spark_conf unless Spark.started?

  # Get latest configuration
  Spark.context.jcontext.conf
end

#valid!Object



47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
# File 'lib/spark/config.rb', line 47

# Validate the configuration, collecting every problem before raising.
#
# Checks: an app name and a master URL are set, the configured
# serializer is known, and the executor command template contains
# exactly one '%s' placeholder.
#
# @raise [Spark::ConfigurationError] listing all detected problems
def valid!
  errors = []

  if !contains?('spark.app.name')
    errors << 'An application name must be set in your configuration.'
  end

  if !contains?('spark.master')
    errors << 'A master URL must be set in your configuration.'
  end

  if Spark::Serializer.find(get('spark.ruby.serializer')).nil?
    # Fixed typo in user-facing message ("Unknow" -> "Unknown").
    errors << 'Unknown serializer.'
  end

  # The executor command is a template; exactly one '%s' must be
  # present so the real command can be substituted in.
  scanned = get('spark.ruby.executor.command').scan('%s')

  if scanned.size == 0
    errors << "Executor command must contain '%s'."
  end

  if scanned.size > 1
    errors << "Executor command can contain only one '%s'."
  end

  if errors.any?
    errors.map!{|error| "- #{error}"}

    raise Spark::ConfigurationError, "Configuration is not valid:\r\n#{errors.join("\r\n")}"
  end
end