mirror of
https://github.com/System-End/theseus.git
synced 2026-04-19 15:28:19 +00:00
Fix CSV batch import row-address mismatch bug
This commit is contained in:
parent
888b119d51
commit
5f38f885c6
4 changed files with 70 additions and 62 deletions
|
|
@ -7,6 +7,7 @@
|
|||
# country :integer
|
||||
# email :string
|
||||
# first_name :string
|
||||
# import_token :uuid
|
||||
# last_name :string
|
||||
# line_1 :string
|
||||
# line_2 :string
|
||||
|
|
@ -19,7 +20,8 @@
|
|||
#
|
||||
# Indexes
|
||||
#
|
||||
# index_addresses_on_batch_id (batch_id)
|
||||
# index_addresses_on_batch_id (batch_id)
|
||||
# index_addresses_on_import_token (import_token) WHERE (import_token IS NOT NULL)
|
||||
#
|
||||
# Foreign Keys
|
||||
#
|
||||
|
|
|
|||
|
|
@ -13,7 +13,6 @@
|
|||
# letter_weight :decimal(, )
|
||||
# letter_width :decimal(, )
|
||||
# tags :citext default([]), is an Array
|
||||
# template_cycle :string default([]), is an Array
|
||||
# type :string not null
|
||||
# warehouse_user_facing_title :string
|
||||
# created_at :datetime not null
|
||||
|
|
@ -109,59 +108,55 @@ class Batch < ApplicationRecord
|
|||
def run_map!
|
||||
rows = CSV.parse(csv_data, headers: true, converters: [->(s) { s&.strip&.delete(GREMLINS).presence }])
|
||||
|
||||
# Phase 1: Collect all address data
|
||||
address_attributes = []
|
||||
row_map = {} # Keep rows in a hash
|
||||
Parallel.each(rows.each_with_index, in_threads: 8) do |row, i|
|
||||
# Phase 1: Build address attributes in parallel with correlation tokens
|
||||
# Parallel.map returns results in input order and avoids shared mutable state
|
||||
items = Parallel.map(rows.each_with_index, in_threads: 8) do |row, i|
|
||||
begin
|
||||
# Skip rows where first_name is blank
|
||||
next if row[field_mapping["first_name"]].blank?
|
||||
|
||||
address_attrs = build_address_attributes(row)
|
||||
if address_attrs
|
||||
address_attributes << address_attrs
|
||||
row_map[i] = row # Store row in hash
|
||||
end
|
||||
next unless address_attrs
|
||||
|
||||
# UUID token correlates this row with its address after bulk insert
|
||||
{ token: SecureRandom.uuid, row: row, attrs: address_attrs }
|
||||
rescue => e
|
||||
Rails.logger.error("Error processing row #{i} in batch #{id}: #{e.message}")
|
||||
raise
|
||||
end
|
||||
end.compact
|
||||
|
||||
return mark_fields_mapped && save! if items.empty?
|
||||
|
||||
# Bulk insert addresses with correlation tokens
|
||||
now = Time.current
|
||||
address_attributes = items.map do |item|
|
||||
item[:attrs].merge(batch_id: id, import_token: item[:token], created_at: now, updated_at: now)
|
||||
end
|
||||
|
||||
# Bulk insert all addresses
|
||||
if address_attributes.any?
|
||||
now = Time.current
|
||||
address_attributes.each do |attrs|
|
||||
attrs[:created_at] = now
|
||||
attrs[:updated_at] = now
|
||||
attrs[:batch_id] = id
|
||||
end
|
||||
Address.insert_all!(address_attributes)
|
||||
|
||||
# Phase 2: Fetch addresses by token and create associated records
|
||||
tokens = items.map { |item| item[:token] }
|
||||
addresses_by_token = Address.where(import_token: tokens).index_by(&:import_token)
|
||||
|
||||
Parallel.each(items, in_threads: 8) do |item|
|
||||
begin
|
||||
Address.insert_all!(address_attributes)
|
||||
rescue ActiveRecord::RecordInvalid => e
|
||||
Rails.logger.error("Failed to insert addresses: #{e.message}")
|
||||
address = addresses_by_token.fetch(item[:token])
|
||||
|
||||
ActiveRecord::Base.connection_pool.with_connection do
|
||||
ActiveRecord::Base.transaction do
|
||||
build_mapping(item[:row], address)
|
||||
end
|
||||
end
|
||||
rescue => e
|
||||
Rails.logger.error("Error creating associated records for address in batch #{id}: #{e.message}")
|
||||
raise
|
||||
end
|
||||
|
||||
# Phase 2: Create associated records (letters) for each address
|
||||
# Fetch all addresses we just created
|
||||
addresses = Address.where(batch_id: id).where(created_at: now).to_a
|
||||
|
||||
Parallel.each(addresses.each_with_index, in_threads: 8) do |address, i|
|
||||
begin
|
||||
ActiveRecord::Base.connection_pool.with_connection do
|
||||
ActiveRecord::Base.transaction do
|
||||
build_mapping(row_map[i], address)
|
||||
end
|
||||
end
|
||||
rescue => e
|
||||
Rails.logger.error("Error creating associated records for address #{address.id} in batch #{id}: #{e.message}")
|
||||
raise
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
# Clear tokens after successful mapping
|
||||
Address.where(import_token: tokens).update_all(import_token: nil)
|
||||
|
||||
mark_fields_mapped
|
||||
save!
|
||||
end
|
||||
|
|
|
|||
|
|
@ -0,0 +1,6 @@
|
|||
class AddImportTokenToAddresses < ActiveRecord::Migration[8.0]
|
||||
def change
|
||||
add_column :addresses, :import_token, :uuid
|
||||
add_index :addresses, :import_token, where: "import_token IS NOT NULL"
|
||||
end
|
||||
end
|
||||
51
db/schema.rb
generated
51
db/schema.rb
generated
|
|
@ -10,22 +10,12 @@
|
|||
#
|
||||
# It's strongly recommended that you check this file into your version control system.
|
||||
|
||||
ActiveRecord::Schema[8.0].define(version: 2026_01_15_180434) do
|
||||
ActiveRecord::Schema[8.0].define(version: 2026_01_27_200000) do
|
||||
# These are extensions that must be enabled in order to support this database
|
||||
enable_extension "citext"
|
||||
enable_extension "pg_catalog.plpgsql"
|
||||
enable_extension "pgcrypto"
|
||||
|
||||
create_table "action_text_rich_texts", force: :cascade do |t|
|
||||
t.string "name", null: false
|
||||
t.text "body"
|
||||
t.string "record_type", null: false
|
||||
t.bigint "record_id", null: false
|
||||
t.datetime "created_at", null: false
|
||||
t.datetime "updated_at", null: false
|
||||
t.index ["record_type", "record_id", "name"], name: "index_action_text_rich_texts_uniqueness", unique: true
|
||||
end
|
||||
|
||||
create_table "active_storage_attachments", force: :cascade do |t|
|
||||
t.string "name", null: false
|
||||
t.string "record_type", null: false
|
||||
|
|
@ -68,7 +58,9 @@ ActiveRecord::Schema[8.0].define(version: 2026_01_15_180434) do
|
|||
t.string "phone_number"
|
||||
t.bigint "batch_id"
|
||||
t.string "email"
|
||||
t.uuid "import_token"
|
||||
t.index ["batch_id"], name: "index_addresses_on_batch_id"
|
||||
t.index ["import_token"], name: "index_addresses_on_import_token", where: "(import_token IS NOT NULL)"
|
||||
end
|
||||
|
||||
create_table "api_keys", force: :cascade do |t|
|
||||
|
|
@ -100,10 +92,9 @@ ActiveRecord::Schema[8.0].define(version: 2026_01_15_180434) do
|
|||
t.decimal "letter_weight"
|
||||
t.bigint "letter_mailer_id_id"
|
||||
t.bigint "letter_return_address_id"
|
||||
t.citext "tags", default: [], array: true
|
||||
t.integer "letter_processing_category"
|
||||
t.date "letter_mailing_date"
|
||||
t.string "template_cycle", default: [], array: true
|
||||
t.citext "tags", default: [], array: true
|
||||
t.string "letter_return_address_name"
|
||||
t.bigint "letter_queue_id"
|
||||
t.bigint "hcb_payment_account_id"
|
||||
|
|
@ -296,23 +287,23 @@ ActiveRecord::Schema[8.0].define(version: 2026_01_15_180434) do
|
|||
t.string "name"
|
||||
t.string "slug"
|
||||
t.bigint "user_id", null: false
|
||||
t.datetime "created_at", null: false
|
||||
t.datetime "updated_at", null: false
|
||||
t.decimal "letter_height"
|
||||
t.decimal "letter_width"
|
||||
t.decimal "letter_weight"
|
||||
t.integer "letter_processing_category"
|
||||
t.date "letter_mailing_date"
|
||||
t.bigint "letter_mailer_id_id"
|
||||
t.bigint "letter_return_address_id"
|
||||
t.string "letter_return_address_name"
|
||||
t.string "user_facing_title"
|
||||
t.citext "tags", default: [], array: true
|
||||
t.datetime "created_at", null: false
|
||||
t.datetime "updated_at", null: false
|
||||
t.string "type"
|
||||
t.string "template"
|
||||
t.string "postage_type"
|
||||
t.bigint "usps_payment_account_id"
|
||||
t.boolean "include_qr_code", default: true
|
||||
t.date "letter_mailing_date"
|
||||
t.bigint "hcb_payment_account_id"
|
||||
t.index ["hcb_payment_account_id"], name: "index_letter_queues_on_hcb_payment_account_id"
|
||||
t.index ["letter_mailer_id_id"], name: "index_letter_queues_on_letter_mailer_id_id"
|
||||
|
|
@ -341,17 +332,18 @@ ActiveRecord::Schema[8.0].define(version: 2026_01_15_180434) do
|
|||
t.bigint "batch_id"
|
||||
t.bigint "return_address_id", null: false
|
||||
t.jsonb "metadata"
|
||||
t.citext "tags", default: [], array: true
|
||||
t.integer "postage_type"
|
||||
t.date "mailing_date"
|
||||
t.citext "tags", default: [], array: true
|
||||
t.string "user_facing_title"
|
||||
t.datetime "printed_at"
|
||||
t.datetime "mailed_at"
|
||||
t.datetime "received_at"
|
||||
t.string "user_facing_title"
|
||||
t.bigint "user_id", null: false
|
||||
t.string "return_address_name"
|
||||
t.bigint "letter_queue_id"
|
||||
t.string "idempotency_key"
|
||||
t.index ["aasm_state"], name: "index_letters_on_aasm_state"
|
||||
t.index ["address_id"], name: "index_letters_on_address_id"
|
||||
t.index ["batch_id"], name: "index_letters_on_batch_id"
|
||||
t.index ["idempotency_key"], name: "index_letters_on_idempotency_key", unique: true
|
||||
|
|
@ -399,8 +391,6 @@ ActiveRecord::Schema[8.0].define(version: 2026_01_15_180434) do
|
|||
t.datetime "created_at", null: false
|
||||
t.datetime "updated_at", null: false
|
||||
t.boolean "opted_out_of_map", default: false
|
||||
t.string "hca_id"
|
||||
t.index ["hca_id"], name: "index_public_users_on_hca_id", unique: true
|
||||
end
|
||||
|
||||
create_table "return_addresses", force: :cascade do |t|
|
||||
|
|
@ -435,7 +425,6 @@ ActiveRecord::Schema[8.0].define(version: 2026_01_15_180434) do
|
|||
t.string "icon_url"
|
||||
t.string "username"
|
||||
t.boolean "can_warehouse"
|
||||
t.boolean "back_office", default: false
|
||||
t.boolean "can_impersonate_public"
|
||||
t.bigint "home_mid_id", default: 1, null: false
|
||||
t.bigint "home_return_address_id", default: 1, null: false
|
||||
|
|
@ -472,13 +461,15 @@ ActiveRecord::Schema[8.0].define(version: 2026_01_15_180434) do
|
|||
t.bigint "letter_id"
|
||||
t.bigint "batch_id", null: false
|
||||
t.jsonb "payload"
|
||||
t.datetime "created_at", null: false
|
||||
t.datetime "updated_at", null: false
|
||||
t.string "opcode"
|
||||
t.string "zip_code"
|
||||
t.bigint "mailer_id_id", null: false
|
||||
t.datetime "created_at", null: false
|
||||
t.datetime "updated_at", null: false
|
||||
t.index ["batch_id"], name: "index_usps_iv_mtr_events_on_batch_id"
|
||||
t.index ["letter_id"], name: "index_usps_iv_mtr_events_on_letter_id"
|
||||
t.index ["mailer_id_id", "happened_at"], name: "index_usps_iv_mtr_events_on_mailer_id_id_and_happened_at"
|
||||
t.index ["mailer_id_id", "opcode"], name: "index_usps_iv_mtr_events_on_mailer_id_id_and_opcode"
|
||||
t.index ["mailer_id_id"], name: "index_usps_iv_mtr_events_on_mailer_id_id"
|
||||
end
|
||||
|
||||
|
|
@ -556,10 +547,14 @@ ActiveRecord::Schema[8.0].define(version: 2026_01_15_180434) do
|
|||
t.citext "tags", default: [], array: true
|
||||
t.decimal "labor_cost", precision: 10, scale: 2
|
||||
t.decimal "contents_cost", precision: 10, scale: 2
|
||||
t.integer "created_via", default: 0, null: false
|
||||
t.bigint "origin_batch_id"
|
||||
t.index ["address_id"], name: "index_warehouse_orders_on_address_id"
|
||||
t.index ["batch_id"], name: "index_warehouse_orders_on_batch_id"
|
||||
t.index ["created_via"], name: "index_warehouse_orders_on_created_via"
|
||||
t.index ["hc_id"], name: "index_warehouse_orders_on_hc_id"
|
||||
t.index ["idempotency_key"], name: "index_warehouse_orders_on_idempotency_key", unique: true
|
||||
t.index ["origin_batch_id"], name: "index_warehouse_orders_on_origin_batch_id"
|
||||
t.index ["source_tag_id"], name: "index_warehouse_orders_on_source_tag_id"
|
||||
t.index ["tags"], name: "index_warehouse_orders_on_tags", using: :gin
|
||||
t.index ["template_id"], name: "index_warehouse_orders_on_template_id"
|
||||
|
|
@ -593,6 +588,15 @@ ActiveRecord::Schema[8.0].define(version: 2026_01_15_180434) do
|
|||
t.index ["zenventory_id"], name: "index_warehouse_purchase_orders_on_zenventory_id", unique: true
|
||||
end
|
||||
|
||||
create_table "warehouse_purpose_codes", force: :cascade do |t|
|
||||
t.string "code"
|
||||
t.string "description"
|
||||
t.datetime "created_at", null: false
|
||||
t.datetime "updated_at", null: false
|
||||
t.integer "sequence_number"
|
||||
t.index ["code"], name: "index_warehouse_purpose_codes_on_code"
|
||||
end
|
||||
|
||||
create_table "warehouse_skus", force: :cascade do |t|
|
||||
t.string "sku"
|
||||
t.text "description"
|
||||
|
|
@ -667,6 +671,7 @@ ActiveRecord::Schema[8.0].define(version: 2026_01_15_180434) do
|
|||
add_foreign_key "warehouse_line_items", "warehouse_templates", column: "template_id"
|
||||
add_foreign_key "warehouse_orders", "addresses"
|
||||
add_foreign_key "warehouse_orders", "batches"
|
||||
add_foreign_key "warehouse_orders", "batches", column: "origin_batch_id"
|
||||
add_foreign_key "warehouse_orders", "source_tags"
|
||||
add_foreign_key "warehouse_orders", "users"
|
||||
add_foreign_key "warehouse_orders", "warehouse_templates", column: "template_id"
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue