Top Level Namespace
Defined Under Namespace
Classes: TaDataDO
Constant Summary collapse
- EVENT_TRACK_TYPE_SET =
- TA_PRESET_COLUMN_MAP =
Instance Method Summary collapse
Instance Method Details
#filter(event) ⇒ Object
32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 |
# File 'lib/logstash/filter/sensors/sensors_data.rb', line 32 def filter(event) #在这里处理业务数据,如果没有进行grok等一系列处理的情况下,直接在message中获取元数据进行处理 begin = event.get('message') #message 是你的上传的每条日志 sensors_data = JSON.parse() distinct_id = sensors_data['distinct_id'] original_id = sensors_data['original_id'] type = sensors_data['type'] time = sensors_data['time'] event_name = sensors_data['event'] properties = sensors_data['properties'] if properties.nil? properties = {} end data = TaDataDO.new if time.nil? time = Time.now.strftime('%Y-%m-%d %H:%M:%S') else time = Time.at(time / 1000).strftime('%Y-%m-%d %H:%M:%S') end data.time = time if (type == 'track' && event_name == '$SignUp') || type == 'track_signup' data.account_id = distinct_id data.distinct_id = properties['$track_signup_original_id'] if distinct_id == original_id puts 'original_id error:' + + "\n" end elsif type == 'track' || type.index('profile_') == 0 is_login_id = properties['$is_login_id'] if is_login_id data.account_id = distinct_id if distinct_id != original_id data.distinct_id = original_id end else data.distinct_id = distinct_id end else puts 'not recognized type: ' + + "\n" end if type == 'track' || type == 'track_signup' event_name = event_name.gsub('$', '') ip = properties['$ip'] if ip data.ip = ip end data.type = 'track' data.event_name = event_name elsif type == 'profile_set' data.type = 'user_set' elsif type == 'profile_increment' data.type = 'user_add' elsif type == 'profile_delete' data.type = 'user_del' elsif type == 'profile_unset' data.type = 'user_unset' elsif type == 'profile_set_once' data.type = 'user_setOnce' else puts '暂不支持的type: ' + type + "\n" end properties_json = {} properties.each do |key, value| if TA_PRESET_COLUMN_MAP.has_key?(key) && EVENT_TRACK_TYPE_SET.include?(type) properties_json[TA_PRESET_COLUMN_MAP[key]] = value else properties_json[key.gsub('$', '')] = value end end data.properties = properties_json event.set('message', data.to_json) return [event] rescue Exception => e return [event] end end |