Class: Spark::Serializer::Base Abstract

Inherits:
Object
Includes:
Constant, Helper::Serialize
Defined in:
lib/spark/serializer/base.rb

Overview

This class is abstract.

Parent for all types of serializers

Direct Known Subclasses

Cartesian, Marshal, Pair, UTF8

Constant Summary

Constants included from Constant

Constant::ACCUMULATOR_ACK, Constant::CREATE_WORKER, Constant::DATA_EOF, Constant::KILL_WORKER, Constant::KILL_WORKER_AND_WAIT, Constant::SUCCESSFULLY_KILLED, Constant::UNSUCCESSFUL_KILLING, Constant::WORKER_DONE, Constant::WORKER_ERROR

Constants included from Helper::Serialize

Helper::Serialize::DIRECTIVE_CHARS, Helper::Serialize::DIRECTIVE_DOUBLES_BIG_ENDIAN, Helper::Serialize::DIRECTIVE_DOUBLE_BIG_ENDIAN, Helper::Serialize::DIRECTIVE_INTEGERS_BIG_ENDIAN, Helper::Serialize::DIRECTIVE_INTEGER_BIG_ENDIAN, Helper::Serialize::DIRECTIVE_LONGS_BIG_ENDIAN, Helper::Serialize::DIRECTIVE_LONG_BIG_ENDIAN, Helper::Serialize::DIRECTIVE_UNSIGNED_CHARS

Instance Attribute Summary

Instance Method Summary

Methods included from Helper::Serialize

#pack_double, #pack_doubles, #pack_int, #pack_ints, #pack_long, #pack_longs, #pack_unsigned_chars, #unpack_chars, #unpack_int, #unpack_long

Constructor Details

#initialize(batch_size = nil) ⇒ Base

Set default values



# File 'lib/spark/serializer/base.rb', line 12

def initialize(batch_size=nil)
  self.batch_size = batch_size
end
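
A minimal usage sketch; Base itself is abstract, so this assumes the concrete Marshal subclass listed above:

serializer = Spark::Serializer::Marshal.new(1024)  # hypothetical batch size
serializer.batch_size # => 1024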

Instance Attribute Details

#batch_size ⇒ Object

Returns the value of attribute batch_size.



# File 'lib/spark/serializer/base.rb', line 9

def batch_size
  @batch_size
end

Instance Method Details

#==(other) ⇒ Object



# File 'lib/spark/serializer/base.rb', line 16

def ==(other)
  self.class == other.class && self.batch_size == other.batch_size
end

#batched? ⇒ Boolean

nil, 0 and 1 are considered non-batched

Returns:

  • (Boolean)


# File 'lib/spark/serializer/base.rb', line 35

def batched?
  batch_size > 1
end
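
A short illustration with explicit integer sizes, again assuming the Marshal subclass:

serializer = Spark::Serializer::Marshal.new(512)
serializer.batched?        # => true, items are serialized in slices of 512
serializer.set(1).batched? # => false, items are serialized one by one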

#dump(data, io) ⇒ Object

Serialize data and write it to the IO. See 'load_from_io' for the data format.



# File 'lib/spark/serializer/base.rb', line 128

def dump(data, io)
  if !data.is_a?(Array) && !data.is_a?(Enumerator)
    data = [data]
  end
  data = data.each_slice(batch_size) if batched?

  data.each do |item|
    serialized = serialize(item)

    # Size and data can have different encoding
    # Marshal: both ASCII
    # Oj: ASCII and UTF-8
    io.write(pack_int(serialized.bytesize))
    io.write(serialized)
  end

  io.flush
end
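
A hedged round-trip sketch of the frame layout, writing to an in-memory StringIO; it assumes the Marshal subclass and that pack_int emits a 4-byte big-endian signed int (suggested by DIRECTIVE_INTEGER_BIG_ENDIAN above):

require 'stringio'

serializer = Spark::Serializer::Marshal.new(1)  # batch_size 1 => non-batched
io = StringIO.new
serializer.dump([:a, :b], io)
io.rewind

# Each frame: 4-byte big-endian signed length, then that many payload bytes
length  = io.read(4).unpack('l>').first
payload = io.read(length)                       # one serialized item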

#dump_to_java(data) ⇒ Object

For direct serialization to Java byte arrays



# File 'lib/spark/serializer/base.rb', line 148

def dump_to_java(data)
  data.map! do |item|
    serialize(item).to_java_bytes
  end
end
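
A JRuby-only sketch; note that map! mutates the passed Array in place, so callers should not reuse it afterwards:

# JRuby only: each item becomes a Java byte[]
serializer = Spark::Serializer::Marshal.new(1)
java_bytes = serializer.dump_to_java([:a, :b])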

#load(source) ⇒ Object

Load and deserialize an Array from an IO, an Array, or a Java iterator

mri:   respond_to?(:iterator) => false
jruby: respond_to?(:iterator) => true


# File 'lib/spark/serializer/base.rb', line 46

def load(source)
  # Tempfile is a Delegator for File, so it is not an IO
  # an alternative check would be __getobj__.is_a?(IO)
  if source.is_a?(IO) || source.is_a?(Tempfile)
    load_from_io(source)
  # elsif source.is_a?(Array)
  #   load_from_array(source)
  elsif try(source, :iterator)
    load_from_iterator(source.iterator)
  end
end
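
A hedged example of the Tempfile branch (Marshal subclass assumed); load returns the lazy Enumerator produced by load_from_io:

require 'tempfile'

serializer = Spark::Serializer::Marshal.new(1)
file = Tempfile.new('rdd')
serializer.dump([:x, :y], file)
file.rewind

serializer.load(file).to_a # => [:x, :y]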

#load_from_io(io) ⇒ Object

Load data from an IO. The data must have the format:

+------------+--------+
| signed int |  data  |
|     4B     |        |
+------------+--------+


# File 'lib/spark/serializer/base.rb', line 65

def load_from_io(io)
  return to_enum(__callee__, io) unless block_given?

  loop do
    length = read_int(io)
    break if length == DATA_EOF

    result = load_next_from_io(io, length)
    if batched? && result.respond_to?(:each)
      result.each {|item| yield item }
    else
      yield result
    end
  end # loop
end
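
Putting dump and load_from_io together, a hedged round trip through StringIO; with batch_size 2 the items travel as two frames ([1, 2] and [3]) but are yielded one by one:

require 'stringio'

serializer = Spark::Serializer::Marshal.new(2)
io = StringIO.new
serializer.dump([1, 2, 3], io)
io.rewind

serializer.load_from_io(io).to_a # => [1, 2, 3]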

#load_from_iterator(iterator) ⇒ Object

Load from a Java iterator by calling hasNext and next



# File 'lib/spark/serializer/base.rb', line 87

def load_from_iterator(iterator)
  result = []
  while iterator.hasNext
    item = iterator.next

    # mri: data are String
    # jruby: data are bytes Array

    if item.is_a?(String)
      # Serialized data
      result << deserialize(item)
    else
      # Java object
      if try(item, :getClass)
        case item.getClass.name
        when '[B'
          # Array of bytes
          result << deserialize(pack_unsigned_chars(item.to_a))
        when 'scala.Tuple2'
          # Tuple2
          result << deserialize(item._1, item._2)
        end
      end
    end

  end

  result.flatten!(1) if batched?
  result
end
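
On MRI there is no real Java iterator, but the String branch can be exercised with a duck-typed stand-in; this sketch assumes the Marshal subclass deserializes with Marshal.load:

# Hypothetical stand-in responding to hasNext and next
FakeIterator = Struct.new(:items) do
  def hasNext; !items.empty?; end
  def next; items.shift; end
end

serializer = Spark::Serializer::Marshal.new(1)
iterator = FakeIterator.new([Marshal.dump(:a), Marshal.dump(:b)])
serializer.load_from_iterator(iterator) # => [:a, :b]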

#load_next_from_io(io, length) ⇒ Object

Helper for load_from_io: read and deserialize a single frame



# File 'lib/spark/serializer/base.rb', line 81

def load_next_from_io(io, length)
  deserialize(io.read(length))
end

#read_int(io) ⇒ Object



# File 'lib/spark/serializer/base.rb', line 118

def read_int(io)
  bytes = io.read(4)
  return DATA_EOF if bytes.nil?
  unpack_int(bytes)
end
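
A small sketch of the framing read_int consumes, assuming unpack_int decodes a 4-byte big-endian signed int:

require 'stringio'

serializer = Spark::Serializer::Marshal.new(1)
io = StringIO.new([42].pack('l>'))
serializer.read_int(io) # => 42
serializer.read_int(io) # => DATA_EOF, the IO is exhausted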

#set(batch_size) ⇒ Object

Set values given by the user



# File 'lib/spark/serializer/base.rb', line 21

def set(batch_size)
  self.batch_size = batch_size unless batch_size.nil?
  self
end

#try(object, method) ⇒ Object

The rescued exception class cannot be pinned down, because the interpreters raise different errors:

mri   => RuntimeError
jruby => NoMethodError


# File 'lib/spark/serializer/base.rb', line 159

def try(object, method)
  begin
    object.__send__(method)
    return true
  rescue
    return false
  end
end
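
Usage sketch; the method swallows the call's return value and only reports whether it raised:

serializer = Spark::Serializer::Marshal.new(1)
serializer.try('abc', :upcase)   # => true,  String#upcase exists
serializer.try('abc', :iterator) # => false, the call raised and was rescued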

#unbatch! ⇒ Object



# File 'lib/spark/serializer/base.rb', line 30

def unbatch!
  self.batch_size = 1
end