diff --git a/app/models/address.rb b/app/models/address.rb index c6334f8..f76932d 100644 --- a/app/models/address.rb +++ b/app/models/address.rb @@ -7,6 +7,7 @@ # country :integer # email :string # first_name :string +# import_token :uuid # last_name :string # line_1 :string # line_2 :string @@ -19,7 +20,8 @@ # # Indexes # -# index_addresses_on_batch_id (batch_id) +# index_addresses_on_batch_id (batch_id) +# index_addresses_on_import_token (import_token) WHERE (import_token IS NOT NULL) # # Foreign Keys # diff --git a/app/models/batch.rb b/app/models/batch.rb index 81c3cad..cf11409 100644 --- a/app/models/batch.rb +++ b/app/models/batch.rb @@ -13,7 +13,6 @@ # letter_weight :decimal(, ) # letter_width :decimal(, ) # tags :citext default([]), is an Array -# template_cycle :string default([]), is an Array # type :string not null # warehouse_user_facing_title :string # created_at :datetime not null @@ -109,59 +108,55 @@ class Batch < ApplicationRecord def run_map! rows = CSV.parse(csv_data, headers: true, converters: [->(s) { s&.strip&.delete(GREMLINS).presence }]) - # Phase 1: Collect all address data - address_attributes = [] - row_map = {} # Keep rows in a hash - Parallel.each(rows.each_with_index, in_threads: 8) do |row, i| + # Phase 1: Build address attributes in parallel with correlation tokens + # Parallel.map returns results in input order and avoids shared mutable state + items = Parallel.map(rows.each_with_index, in_threads: 8) do |row, i| begin - # Skip rows where first_name is blank next if row[field_mapping["first_name"]].blank? address_attrs = build_address_attributes(row) - if address_attrs - address_attributes << address_attrs - row_map[i] = row # Store row in hash - end + next unless address_attrs + + # UUID token correlates this row with its address after bulk insert + { token: SecureRandom.uuid, row: row, attrs: address_attrs } rescue => e Rails.logger.error("Error processing row #{i} in batch #{id}: #{e.message}") raise end + end.compact + + return mark_fields_mapped && save! if items.empty? + + # Bulk insert addresses with correlation tokens + now = Time.current + address_attributes = items.map do |item| + item[:attrs].merge(batch_id: id, import_token: item[:token], created_at: now, updated_at: now) end - # Bulk insert all addresses - if address_attributes.any? - now = Time.current - address_attributes.each do |attrs| - attrs[:created_at] = now - attrs[:updated_at] = now - attrs[:batch_id] = id - end + Address.insert_all!(address_attributes) + # Phase 2: Fetch addresses by token and create associated records + tokens = items.map { |item| item[:token] } + addresses_by_token = Address.where(import_token: tokens).index_by(&:import_token) + + Parallel.each(items, in_threads: 8) do |item| begin - Address.insert_all!(address_attributes) - rescue ActiveRecord::RecordInvalid => e - Rails.logger.error("Failed to insert addresses: #{e.message}") + address = addresses_by_token.fetch(item[:token]) + + ActiveRecord::Base.connection_pool.with_connection do + ActiveRecord::Base.transaction do + build_mapping(item[:row], address) + end + end + rescue => e + Rails.logger.error("Error creating associated records for address in batch #{id}: #{e.message}") raise end - - # Phase 2: Create associated records (letters) for each address - # Fetch all addresses we just created - addresses = Address.where(batch_id: id).where(created_at: now).to_a - - Parallel.each(addresses.each_with_index, in_threads: 8) do |address, i| - begin - ActiveRecord::Base.connection_pool.with_connection do - ActiveRecord::Base.transaction do - build_mapping(row_map[i], address) - end - end - rescue => e - Rails.logger.error("Error creating associated records for address #{address.id} in batch #{id}: #{e.message}") - raise - end - end end + # Clear tokens after successful mapping + Address.where(import_token: tokens).update_all(import_token: nil) + mark_fields_mapped save! end diff --git a/db/migrate/20260127200000_add_import_token_to_addresses.rb b/db/migrate/20260127200000_add_import_token_to_addresses.rb new file mode 100644 index 0000000..5ea1957 --- /dev/null +++ b/db/migrate/20260127200000_add_import_token_to_addresses.rb @@ -0,0 +1,6 @@ +class AddImportTokenToAddresses < ActiveRecord::Migration[8.0] + def change + add_column :addresses, :import_token, :uuid + add_index :addresses, :import_token, where: "import_token IS NOT NULL" + end +end diff --git a/db/schema.rb b/db/schema.rb index 1bcdd5b..558b806 100644 --- a/db/schema.rb +++ b/db/schema.rb @@ -10,22 +10,12 @@ # # It's strongly recommended that you check this file into your version control system. -ActiveRecord::Schema[8.0].define(version: 2026_01_15_180434) do +ActiveRecord::Schema[8.0].define(version: 2026_01_27_200000) do # These are extensions that must be enabled in order to support this database enable_extension "citext" enable_extension "pg_catalog.plpgsql" enable_extension "pgcrypto" - create_table "action_text_rich_texts", force: :cascade do |t| - t.string "name", null: false - t.text "body" - t.string "record_type", null: false - t.bigint "record_id", null: false - t.datetime "created_at", null: false - t.datetime "updated_at", null: false - t.index ["record_type", "record_id", "name"], name: "index_action_text_rich_texts_uniqueness", unique: true - end - create_table "active_storage_attachments", force: :cascade do |t| t.string "name", null: false t.string "record_type", null: false @@ -68,7 +58,9 @@ ActiveRecord::Schema[8.0].define(version: 2026_01_15_180434) do t.string "phone_number" t.bigint "batch_id" t.string "email" + t.uuid "import_token" t.index ["batch_id"], name: "index_addresses_on_batch_id" + t.index ["import_token"], name: "index_addresses_on_import_token", where: "(import_token IS NOT NULL)" end create_table "api_keys", force: :cascade do |t| @@ -100,10 +92,9 @@ ActiveRecord::Schema[8.0].define(version: 2026_01_15_180434) do t.decimal "letter_weight" t.bigint "letter_mailer_id_id" t.bigint "letter_return_address_id" - t.citext "tags", default: [], array: true t.integer "letter_processing_category" t.date "letter_mailing_date" - t.string "template_cycle", default: [], array: true + t.citext "tags", default: [], array: true t.string "letter_return_address_name" t.bigint "letter_queue_id" t.bigint "hcb_payment_account_id" @@ -296,23 +287,23 @@ ActiveRecord::Schema[8.0].define(version: 2026_01_15_180434) do t.string "name" t.string "slug" t.bigint "user_id", null: false - t.datetime "created_at", null: false - t.datetime "updated_at", null: false t.decimal "letter_height" t.decimal "letter_width" t.decimal "letter_weight" t.integer "letter_processing_category" + t.date "letter_mailing_date" t.bigint "letter_mailer_id_id" t.bigint "letter_return_address_id" t.string "letter_return_address_name" t.string "user_facing_title" t.citext "tags", default: [], array: true + t.datetime "created_at", null: false + t.datetime "updated_at", null: false t.string "type" t.string "template" t.string "postage_type" t.bigint "usps_payment_account_id" t.boolean "include_qr_code", default: true - t.date "letter_mailing_date" t.bigint "hcb_payment_account_id" t.index ["hcb_payment_account_id"], name: "index_letter_queues_on_hcb_payment_account_id" t.index ["letter_mailer_id_id"], name: "index_letter_queues_on_letter_mailer_id_id" @@ -341,17 +332,18 @@ ActiveRecord::Schema[8.0].define(version: 2026_01_15_180434) do t.bigint "batch_id" t.bigint "return_address_id", null: false t.jsonb "metadata" - t.citext "tags", default: [], array: true t.integer "postage_type" t.date "mailing_date" + t.citext "tags", default: [], array: true + t.string "user_facing_title" t.datetime "printed_at" t.datetime "mailed_at" t.datetime "received_at" - t.string "user_facing_title" t.bigint "user_id", null: false t.string "return_address_name" t.bigint "letter_queue_id" t.string "idempotency_key" + t.index ["aasm_state"], name: "index_letters_on_aasm_state" t.index ["address_id"], name: "index_letters_on_address_id" t.index ["batch_id"], name: "index_letters_on_batch_id" t.index ["idempotency_key"], name: "index_letters_on_idempotency_key", unique: true @@ -399,8 +391,6 @@ ActiveRecord::Schema[8.0].define(version: 2026_01_15_180434) do t.datetime "created_at", null: false t.datetime "updated_at", null: false t.boolean "opted_out_of_map", default: false - t.string "hca_id" - t.index ["hca_id"], name: "index_public_users_on_hca_id", unique: true end create_table "return_addresses", force: :cascade do |t| @@ -435,7 +425,6 @@ ActiveRecord::Schema[8.0].define(version: 2026_01_15_180434) do t.string "icon_url" t.string "username" t.boolean "can_warehouse" - t.boolean "back_office", default: false t.boolean "can_impersonate_public" t.bigint "home_mid_id", default: 1, null: false t.bigint "home_return_address_id", default: 1, null: false @@ -472,13 +461,15 @@ ActiveRecord::Schema[8.0].define(version: 2026_01_15_180434) do t.bigint "letter_id" t.bigint "batch_id", null: false t.jsonb "payload" - t.datetime "created_at", null: false - t.datetime "updated_at", null: false t.string "opcode" t.string "zip_code" t.bigint "mailer_id_id", null: false + t.datetime "created_at", null: false + t.datetime "updated_at", null: false t.index ["batch_id"], name: "index_usps_iv_mtr_events_on_batch_id" t.index ["letter_id"], name: "index_usps_iv_mtr_events_on_letter_id" + t.index ["mailer_id_id", "happened_at"], name: "index_usps_iv_mtr_events_on_mailer_id_id_and_happened_at" + t.index ["mailer_id_id", "opcode"], name: "index_usps_iv_mtr_events_on_mailer_id_id_and_opcode" t.index ["mailer_id_id"], name: "index_usps_iv_mtr_events_on_mailer_id_id" end @@ -556,10 +547,14 @@ ActiveRecord::Schema[8.0].define(version: 2026_01_15_180434) do t.citext "tags", default: [], array: true t.decimal "labor_cost", precision: 10, scale: 2 t.decimal "contents_cost", precision: 10, scale: 2 + t.integer "created_via", default: 0, null: false + t.bigint "origin_batch_id" t.index ["address_id"], name: "index_warehouse_orders_on_address_id" t.index ["batch_id"], name: "index_warehouse_orders_on_batch_id" + t.index ["created_via"], name: "index_warehouse_orders_on_created_via" t.index ["hc_id"], name: "index_warehouse_orders_on_hc_id" t.index ["idempotency_key"], name: "index_warehouse_orders_on_idempotency_key", unique: true + t.index ["origin_batch_id"], name: "index_warehouse_orders_on_origin_batch_id" t.index ["source_tag_id"], name: "index_warehouse_orders_on_source_tag_id" t.index ["tags"], name: "index_warehouse_orders_on_tags", using: :gin t.index ["template_id"], name: "index_warehouse_orders_on_template_id" @@ -593,6 +588,15 @@ ActiveRecord::Schema[8.0].define(version: 2026_01_15_180434) do t.index ["zenventory_id"], name: "index_warehouse_purchase_orders_on_zenventory_id", unique: true end + create_table "warehouse_purpose_codes", force: :cascade do |t| + t.string "code" + t.string "description" + t.datetime "created_at", null: false + t.datetime "updated_at", null: false + t.integer "sequence_number" + t.index ["code"], name: "index_warehouse_purpose_codes_on_code" + end + create_table "warehouse_skus", force: :cascade do |t| t.string "sku" t.text "description" @@ -667,6 +671,7 @@ ActiveRecord::Schema[8.0].define(version: 2026_01_15_180434) do add_foreign_key "warehouse_line_items", "warehouse_templates", column: "template_id" add_foreign_key "warehouse_orders", "addresses" add_foreign_key "warehouse_orders", "batches" + add_foreign_key "warehouse_orders", "batches", column: "origin_batch_id" add_foreign_key "warehouse_orders", "source_tags" add_foreign_key "warehouse_orders", "users" add_foreign_key "warehouse_orders", "warehouse_templates", column: "template_id"