Method: Dataflow::Adapters::MongoDbAdapter#save
- Defined in:
- lib/dataflow/adapters/mongo_db_adapter.rb
#save(records:, replace_by: nil) ⇒ Object
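Save the given records. When replace_by is given, each record is upserted using those key(s) as the match filter; otherwise the records are inserted. Bulk write errors that consist solely of duplicate-key errors are ignored.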
142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 |
# File 'lib/dataflow/adapters/mongo_db_adapter.rb', line 142
#
# Save the given records into the write dataset.
#
# When +replace_by+ is present, each record is written with an upserting
# +replace_one+ operation whose filter is built from the record's values
# for the given key(s). Otherwise the records are inserted as-is. Both
# paths run unordered, so one failing record does not stop the others.
#
# @param records [Enumerable] records to persist; on the replace path each
#   record must respond to +#[]+ for every key in +replace_by+.
# @param replace_by [Object, Array, nil] key or list of keys identifying
#   the record to replace; blank means plain insert.
# @return [Object, nil] whatever the driver call returns, or +nil+ when a
#   bulk write error made up only of duplicate-key errors was ignored.
# @raise [Mongo::Error::BulkWriteError] when the bulk write failed for any
#   reason other than duplicate keys.
def save(records:, replace_by: nil)
  if replace_by.present?
    replace_keys = Array(replace_by)
    bulk_ops = records.map do |record|
      filter = replace_keys.to_h { |key| [key, record[key]] }
      { replace_one: { filter: filter, replacement: record, upsert: true } }
    end
    client[write_dataset_name].bulk_write(bulk_ops, ordered: false)
  else
    client[write_dataset_name].insert_many(records, ordered: false)
  end
rescue Mongo::Error::BulkWriteError => e
  # 'writeErrors' may be missing (e.g. only write-concern errors were
  # reported); normalise to an array so the check below cannot itself raise.
  write_errors = Array(e.result['writeErrors'])

  # Only swallow the error when there is at least one write error and every
  # one of them is a duplicate-key error (code 11000). An empty list must not
  # count as "all duplicates", otherwise unrelated failures would be hidden.
  only_duplicate_keys = write_errors.any? &&
                        write_errors.all? { |write_error| write_error['code'] == 11_000 }

  raise unless only_duplicate_keys
end