Imports + mirrors :DD (#993)

* Imports + mirrors :DD

* Stuff and things

* Fixes

* Fixes x2

* Tests!

* Hmm
This commit is contained in:
Mahad Kalam 2026-02-23 15:00:43 +00:00 committed by GitHub
parent 73223f1ec7
commit 7317cc45e7
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
32 changed files with 2261 additions and 237 deletions

View file

@ -219,6 +219,7 @@ class Api::Hackatime::V1::HackatimeController < ApplicationController
def handle_heartbeat(heartbeat_array)
results = []
should_enqueue_mirror_sync = false
heartbeat_array.each do |heartbeat|
source_type = :direct_entry
@ -251,12 +252,14 @@ class Api::Hackatime::V1::HackatimeController < ApplicationController
end
queue_project_mapping(heartbeat[:project])
results << [ new_heartbeat.attributes, 201 ]
should_enqueue_mirror_sync ||= source_type == :direct_entry
rescue => e
Rails.logger.error("Error creating heartbeat: #{e.class.name} #{e.message}")
results << [ { error: e.message, type: e.class.name }, 422 ]
end
PosthogService.capture_once_per_day(@user, "heartbeat_sent", { heartbeat_count: heartbeat_array.size })
enqueue_mirror_sync if should_enqueue_mirror_sync
results
end
@ -270,6 +273,14 @@ class Api::Hackatime::V1::HackatimeController < ApplicationController
Rails.logger.error("Error queuing project mapping: #{e.class.name} #{e.message}")
end
def enqueue_mirror_sync
return unless Flipper.enabled?(:wakatime_imports_mirrors)
MirrorFanoutEnqueueJob.perform_later(@user.id)
rescue => e
Rails.logger.error("Error enqueuing mirror sync fanout: #{e.class.name} #{e.message}")
end
def check_lockout
return unless @user&.pending_deletion?
render json: { error: "Account pending deletion" }, status: :forbidden

View file

@ -0,0 +1,131 @@
class My::HeartbeatImportSourcesController < ApplicationController
before_action :ensure_current_user
before_action :ensure_imports_and_mirrors_enabled
def create
if current_user.heartbeat_import_source.present?
redirect_to my_settings_data_path, alert: "Import source already configured. Update it instead."
return
end
source = current_user.build_heartbeat_import_source(create_params)
source.provider = :wakatime_compatible
source.status = :idle
if source.save
HeartbeatImportSourceSyncJob.perform_later(source.id) if source.sync_enabled?
redirect_to my_settings_data_path, notice: "Import source configured successfully."
else
redirect_to my_settings_data_path, alert: source.errors.full_messages.to_sentence
end
end
def update
source = current_user.heartbeat_import_source
unless source
redirect_to my_settings_data_path, alert: "No import source is configured."
return
end
rerun_backfill = ActiveModel::Type::Boolean.new.cast(params.dig(:heartbeat_import_source, :rerun_backfill))
attrs = update_params
attrs = attrs.except(:encrypted_api_key) if attrs[:encrypted_api_key].blank?
if source.update(attrs)
source.reset_backfill! if rerun_backfill
HeartbeatImportSourceSyncJob.perform_later(source.id) if source.sync_enabled?
redirect_to my_settings_data_path, notice: "Import source updated successfully."
else
redirect_to my_settings_data_path, alert: source.errors.full_messages.to_sentence
end
end
def show
source = current_user.heartbeat_import_source
render json: { import_source: source_payload(source) }
end
def destroy
source = current_user.heartbeat_import_source
unless source
redirect_to my_settings_data_path, alert: "No import source is configured."
return
end
source.destroy
redirect_to my_settings_data_path, notice: "Import source removed."
end
def sync_now
source = current_user.heartbeat_import_source
unless source
redirect_to my_settings_data_path, alert: "No import source is configured."
return
end
unless source.sync_enabled?
redirect_to my_settings_data_path, alert: "Enable sync before running sync now."
return
end
HeartbeatImportSourceSyncJob.perform_later(source.id)
redirect_to my_settings_data_path, notice: "Sync queued."
end
private
def ensure_current_user
redirect_to root_path, alert: "You must be logged in to view this page." unless current_user
end
def ensure_imports_and_mirrors_enabled
return if Flipper.enabled?(:wakatime_imports_mirrors)
if request.format.json?
render json: { error: "Imports and mirrors are currently disabled." }, status: :not_found
else
redirect_to my_settings_data_path, alert: "Imports and mirrors are currently disabled."
end
end
def create_params
base_params.merge(provider: :wakatime_compatible)
end
def update_params
base_params
end
def base_params
params.require(:heartbeat_import_source).permit(
:endpoint_url,
:encrypted_api_key,
:sync_enabled,
:initial_backfill_start_date,
:initial_backfill_end_date
)
end
def source_payload(source)
return nil unless source
{
id: source.id,
provider: source.provider,
endpoint_url: source.endpoint_url,
sync_enabled: source.sync_enabled,
status: source.status,
initial_backfill_start_date: source.initial_backfill_start_date&.iso8601,
initial_backfill_end_date: source.initial_backfill_end_date&.iso8601,
backfill_cursor_date: source.backfill_cursor_date&.iso8601,
last_synced_at: source.last_synced_at&.iso8601,
last_synced_ago: source.last_synced_at ? view_context.time_ago_in_words(source.last_synced_at) : nil,
last_error_message: source.last_error_message,
last_error_at: source.last_error_at&.iso8601,
consecutive_failures: source.consecutive_failures,
imported_count: Rails.cache.fetch("user:#{current_user.id}:wakapi_import_count", expires_in: 5.minutes) do
current_user.heartbeats.where(source_type: :wakapi_import).count
end
}
end
end

View file

@ -36,7 +36,7 @@ class SessionsController < ApplicationController
PosthogService.identify(@user)
PosthogService.capture(@user, "user_signed_in", { method: "hca" })
if @user.created_at > 5.seconds.ago
if @user.previously_new_record?
redirect_to my_wakatime_setup_path, notice: "Successfully signed in with Hack Club Auth! Welcome!"
elsif session[:return_data]&.dig("url").present?
return_url = session[:return_data].delete("url")
@ -101,7 +101,7 @@ class SessionsController < ApplicationController
if slack_state&.dig("close_window")
redirect_to close_window_path
elsif @user.created_at > 5.seconds.ago
elsif @user.previously_new_record?
session[:return_data] = { "url" => safe_return_url(slack_state&.dig("continue").presence) }
redirect_to my_wakatime_setup_path, notice: "Successfully signed in with Slack! Welcome!"
elsif slack_state&.dig("continue").present? && safe_return_url(slack_state["continue"]).present?

View file

@ -32,6 +32,7 @@ class Settings::BaseController < InertiaController
def prepare_settings_page
@is_own_settings = is_own_settings?
@can_enable_slack_status = @user.slack_access_token.present? && @user.slack_scopes.include?("users.profile:write")
@imports_and_mirrors_enabled = Flipper.enabled?(:wakatime_imports_mirrors)
@enabled_sailors_logs = SailorsLogNotificationPreference.where(
slack_uid: @user.slack_uid,
@ -60,7 +61,8 @@ class Settings::BaseController < InertiaController
@general_badge_url = GithubReadmeStats.new(@user.id, "darcula").generate_badge_url
@latest_api_key_token = @user.api_keys.last&.token
@mirrors = current_user.wakatime_mirrors.order(created_at: :desc)
@mirrors = @imports_and_mirrors_enabled ? current_user.wakatime_mirrors.order(created_at: :desc) : []
@import_source = @imports_and_mirrors_enabled ? current_user.heartbeat_import_source : nil
end
def settings_page_props(active_section:, settings_update_path:)
@ -127,7 +129,9 @@ class Settings::BaseController < InertiaController
export_range_heartbeats_path: export_my_heartbeats_path,
create_heartbeat_import_path: my_heartbeat_imports_path,
create_deletion_path: create_deletion_path,
user_wakatime_mirrors_path: user_wakatime_mirrors_path(current_user)
user_wakatime_mirrors_path: user_wakatime_mirrors_path(current_user),
heartbeat_import_source_path: my_heartbeat_import_source_path,
heartbeat_import_source_sync_path: sync_my_heartbeat_import_source_path
},
options: {
countries: ISO3166::Country.all.map { |country|
@ -217,19 +221,15 @@ class Settings::BaseController < InertiaController
heartbeats_last_7_days: number_with_delimiter(heartbeats_last_7_days),
is_restricted: (@user.trust_level == "red")
},
import_source: serialized_import_source(@import_source),
mirrors: serialized_mirrors(@mirrors),
admin_tools: {
visible: current_user.admin_level.in?(%w[admin superadmin]),
mirrors: @mirrors.map { |mirror|
{
id: mirror.id,
endpoint_url: mirror.endpoint_url,
last_synced_ago: (mirror.last_synced_at ? "#{time_ago_in_words(mirror.last_synced_at)} ago" : "Never"),
destroy_path: user_wakatime_mirror_path(current_user, mirror)
}
}
mirrors: serialized_mirrors(@mirrors)
},
ui: {
show_dev_import: Rails.env.development?
show_dev_import: Rails.env.development?,
show_imports_and_mirrors: @imports_and_mirrors_enabled
},
heartbeat_import: {
import_id: heartbeat_import_id,
@ -276,4 +276,43 @@ class Settings::BaseController < InertiaController
def is_own_settings?
params["id"] == "my" || params["id"]&.blank?
end
def serialized_import_source(source)
return nil unless source
{
id: source.id,
provider: source.provider,
endpoint_url: source.endpoint_url,
sync_enabled: source.sync_enabled,
status: source.status,
initial_backfill_start_date: source.initial_backfill_start_date&.iso8601,
initial_backfill_end_date: source.initial_backfill_end_date&.iso8601,
backfill_cursor_date: source.backfill_cursor_date&.iso8601,
last_synced_at: source.last_synced_at&.iso8601,
last_synced_ago: (source.last_synced_at ? "#{time_ago_in_words(source.last_synced_at)} ago" : "Never"),
last_error_message: source.last_error_message,
last_error_at: source.last_error_at&.iso8601,
consecutive_failures: source.consecutive_failures,
imported_count: Rails.cache.fetch("user:#{source.user_id}:wakapi_import_count", expires_in: 5.minutes) do
source.user.heartbeats.where(source_type: :wakapi_import).count
end
}
end
def serialized_mirrors(mirrors)
mirrors.map { |mirror|
{
id: mirror.id,
endpoint_url: mirror.endpoint_url,
enabled: mirror.enabled,
last_synced_at: mirror.last_synced_at&.iso8601,
last_synced_ago: (mirror.last_synced_at ? "#{time_ago_in_words(mirror.last_synced_at)} ago" : "Never"),
consecutive_failures: mirror.consecutive_failures,
last_error_message: mirror.last_error_message,
last_error_at: mirror.last_error_at&.iso8601,
destroy_path: user_wakatime_mirror_path(current_user, mirror)
}
}
end
end

View file

@ -1,20 +1,22 @@
class WakatimeMirrorsController < ApplicationController
before_action :set_user
before_action :require_current_user
before_action :ensure_imports_and_mirrors_enabled
before_action :set_mirror, only: [ :destroy ]
def create
@mirror = @user.wakatime_mirrors.build(mirror_params)
@mirror.request_host = request.host
if @mirror.save
redirect_to my_settings_path, notice: "WakaTime mirror added successfully"
redirect_to my_settings_data_path, notice: "WakaTime mirror added successfully"
else
redirect_to my_settings_path, alert: "Failed to add WakaTime mirror: #{@mirror.errors.full_messages.join(', ')}"
redirect_to my_settings_data_path, alert: "Failed to add WakaTime mirror: #{@mirror.errors.full_messages.join(', ')}"
end
end
def destroy
@mirror.destroy
redirect_to my_settings_path, notice: "WakaTime mirror removed successfully"
redirect_to my_settings_data_path, notice: "WakaTime mirror removed successfully"
end
private
@ -36,4 +38,10 @@ class WakatimeMirrorsController < ApplicationController
redirect_to root_path, alert: "You are not authorized to access this page"
end
end
def ensure_imports_and_mirrors_enabled
return if Flipper.enabled?(:wakatime_imports_mirrors)
redirect_to my_settings_data_path, alert: "Imports and mirrors are currently disabled."
end
end

View file

@ -1,7 +1,4 @@
<script lang="ts">
import { onMount } from "svelte";
import Button from "../../../components/Button.svelte";
import Modal from "../../../components/Modal.svelte";
import SettingsShell from "./Shell.svelte";
import type { AdminPageProps } from "./types";
@ -12,36 +9,8 @@
heading,
subheading,
admin_tools,
paths,
errors,
}: AdminPageProps = $props();
let csrfToken = $state("");
let deleteMirrorModalOpen = $state(false);
let selectedMirror = $state<{
endpoint_url: string;
destroy_path: string;
} | null>(null);
const openDeleteMirrorModal = (mirror: {
endpoint_url: string;
destroy_path: string;
}) => {
selectedMirror = mirror;
deleteMirrorModalOpen = true;
};
const closeDeleteMirrorModal = () => {
deleteMirrorModalOpen = false;
selectedMirror = null;
};
onMount(() => {
csrfToken =
document
.querySelector("meta[name='csrf-token']")
?.getAttribute("content") || "";
});
</script>
<SettingsShell
@ -54,80 +23,12 @@
{admin_tools}
>
{#if admin_tools.visible}
<div class="space-y-8">
<section id="wakatime_mirror">
<h2 class="text-xl font-semibold text-surface-content">
WakaTime Mirrors
</h2>
<p class="mt-1 text-sm text-muted">
Mirror heartbeats to external WakaTime-compatible endpoints.
</p>
{#if admin_tools.mirrors.length > 0}
<div class="mt-4 space-y-2">
{#each admin_tools.mirrors as mirror}
<div class="rounded-md border border-surface-200 bg-darker p-3">
<p class="text-sm font-semibold text-surface-content">
{mirror.endpoint_url}
</p>
<p class="mt-1 text-xs text-muted">
Last synced: {mirror.last_synced_ago}
</p>
<div class="mt-3">
<Button
type="button"
variant="surface"
size="xs"
onclick={() => openDeleteMirrorModal(mirror)}
>
Delete
</Button>
</div>
</div>
{/each}
</div>
{/if}
<form
method="post"
action={paths.user_wakatime_mirrors_path}
class="mt-5 space-y-3"
>
<input type="hidden" name="authenticity_token" value={csrfToken} />
<div>
<label
for="endpoint_url"
class="mb-2 block text-sm text-surface-content"
>
Endpoint URL
</label>
<input
id="endpoint_url"
type="url"
name="wakatime_mirror[endpoint_url]"
value="https://wakatime.com/api/v1"
required
class="w-full rounded-md border border-surface-200 bg-darker px-3 py-2 text-sm text-surface-content focus:border-primary focus:outline-none"
/>
</div>
<div>
<label
for="mirror_key"
class="mb-2 block text-sm text-surface-content"
>
WakaTime API Key
</label>
<input
id="mirror_key"
type="password"
name="wakatime_mirror[encrypted_api_key]"
required
class="w-full rounded-md border border-surface-200 bg-darker px-3 py-2 text-sm text-surface-content focus:border-primary focus:outline-none"
/>
</div>
<Button type="submit" variant="primary">Add mirror</Button>
</form>
</section>
<div class="space-y-3">
<h2 class="text-xl font-semibold text-surface-content">Admin</h2>
<p class="text-sm text-muted">
Mirror and import controls are available under Data settings for all
users.
</p>
</div>
{:else}
<p
@ -137,39 +38,3 @@
</p>
{/if}
</SettingsShell>
<Modal
bind:open={deleteMirrorModalOpen}
title="Delete mirror endpoint?"
description={selectedMirror
? `${selectedMirror.endpoint_url} will stop receiving mirrored heartbeats.`
: "This mirror endpoint will be removed."}
maxWidth="max-w-lg"
hasActions
>
{#snippet actions()}
{#if selectedMirror}
<div class="grid grid-cols-1 gap-3 sm:grid-cols-2">
<Button
type="button"
variant="dark"
class="h-10 w-full border border-surface-300 text-muted"
onclick={closeDeleteMirrorModal}
>
Cancel
</Button>
<form method="post" action={selectedMirror.destroy_path} class="m-0">
<input type="hidden" name="_method" value="delete" />
<input type="hidden" name="authenticity_token" value={csrfToken} />
<Button
type="submit"
variant="primary"
class="h-10 w-full text-on-primary"
>
Delete mirror
</Button>
</form>
</div>
{/if}
{/snippet}
</Modal>

View file

@ -26,6 +26,8 @@
paths,
migration,
data_export,
import_source,
mirrors,
ui,
heartbeat_import,
errors,
@ -45,6 +47,10 @@
let errorsCount = $state(0);
let isStartingImport = $state(false);
let isPolling = $state(false);
let importSource = $state<DataPageProps["import_source"] | null>(null);
let backfillMode = $state<"all_time" | "date_range">("all_time");
let importStartDate = $state("");
let importEndDate = $state("");
const importPollParams: { heartbeat_import_id?: string } = {};
const tweenedProgress = tweened(0, { duration: 320, easing: cubicOut });
@ -76,6 +82,16 @@
{ autoStart: false },
);
const { start: startImportSourcePolling, stop: stopImportSourcePolling } =
usePoll(
10000,
{
only: ["import_source"],
preserveUrl: true,
},
{ autoStart: false },
);
onMount(() => {
csrfToken =
document
@ -83,12 +99,41 @@
?.getAttribute("content") || "";
syncImportFromProps(heartbeat_import);
if (ui.show_imports_and_mirrors) {
importSource = import_source || null;
importStartDate = importSource?.initial_backfill_start_date || "";
importEndDate = importSource?.initial_backfill_end_date || "";
backfillMode =
importStartDate || importEndDate ? "date_range" : "all_time";
startImportSourcePolling();
}
return () => {
if (ui.show_imports_and_mirrors) {
stopImportSourcePolling();
}
};
});
$effect(() => {
syncImportFromProps(heartbeat_import);
});
$effect(() => {
if (!ui.show_imports_and_mirrors) {
importSource = null;
importStartDate = "";
importEndDate = "";
backfillMode = "all_time";
return;
}
importSource = import_source || null;
importStartDate = importSource?.initial_backfill_start_date || "";
importEndDate = importSource?.initial_backfill_end_date || "";
backfillMode = importStartDate || importEndDate ? "date_range" : "all_time";
});
function isTerminalImportState(state: string) {
return state === "completed" || state === "failed";
}
@ -279,6 +324,373 @@
{/if}
</section>
{#if ui.show_imports_and_mirrors}
<section
id="imports_and_mirrors"
class="rounded-md border border-surface-200 bg-darker p-4 sm:p-5"
>
<h2 class="text-xl font-semibold text-surface-content">
Imports & Mirrors
</h2>
<p class="mt-1 text-sm text-muted">
Connect WakaTime-compatible sources and destinations.
</p>
<div class="mt-4 space-y-7">
<section id="wakatime_import_source">
<h3 class="text-lg font-semibold text-surface-content">
Import from WakaTime
</h3>
{#if importSource}
<div
class="mt-3 rounded-md border border-surface-200 bg-surface p-3"
>
<p class="text-sm text-surface-content">
Status: <span class="font-semibold"
>{importSource.status}</span
>
</p>
<p class="mt-1 text-xs text-muted">
Last synced: {importSource.last_synced_ago || "Never"}
</p>
<p class="mt-1 text-xs text-muted">
Imported: {importSource.imported_count.toLocaleString()}
</p>
{#if importSource.last_error_message}
<p class="mt-1 text-xs text-red-300">
Last error: {importSource.last_error_message}
</p>
{/if}
</div>
{/if}
<form
method="post"
action={paths.heartbeat_import_source_path}
class="mt-3 space-y-3 rounded-md border border-surface-200 bg-surface p-3"
>
<input
type="hidden"
name="authenticity_token"
value={csrfToken}
/>
{#if importSource}
<input type="hidden" name="_method" value="patch" />
{/if}
<div>
<label
for="import_endpoint_url"
class="mb-2 block text-sm text-surface-content"
>
Endpoint URL
</label>
<input
id="import_endpoint_url"
type="url"
name="heartbeat_import_source[endpoint_url]"
required
value={importSource?.endpoint_url ||
"https://wakatime.com/api/v1"}
class="w-full rounded-md border border-surface-200 bg-darker px-3 py-2 text-sm text-surface-content focus:border-primary focus:outline-none"
/>
</div>
<div>
<label
for="import_api_key"
class="mb-2 block text-sm text-surface-content"
>
API Key
</label>
<input
id="import_api_key"
type="password"
name="heartbeat_import_source[encrypted_api_key]"
required={!importSource}
class="w-full rounded-md border border-surface-200 bg-darker px-3 py-2 text-sm text-surface-content focus:border-primary focus:outline-none"
/>
{#if importSource}
<p class="mt-1 text-xs text-muted">
Leave blank to keep the existing key.
</p>
{/if}
</div>
<div class="rounded-md border border-surface-200 bg-darker p-3">
<p class="text-sm font-semibold text-surface-content">
Backfill scope
</p>
<div class="mt-2 flex flex-wrap items-center gap-4">
<label
class="inline-flex items-center gap-2 text-sm text-surface-content"
>
<input
type="radio"
name="backfill_mode"
value="all_time"
checked={backfillMode === "all_time"}
onchange={() => {
backfillMode = "all_time";
importStartDate = "";
importEndDate = "";
}}
class="peer sr-only"
/>
<span
class="h-4 w-4 rounded-full border border-surface-200 bg-surface ring-offset-2 ring-offset-surface transition peer-checked:border-primary peer-checked:bg-primary peer-focus-visible:ring-2 peer-focus-visible:ring-primary/40"
></span>
All time
</label>
<label
class="inline-flex items-center gap-2 text-sm text-surface-content"
>
<input
type="radio"
name="backfill_mode"
value="date_range"
checked={backfillMode === "date_range"}
onchange={() => {
backfillMode = "date_range";
}}
class="peer sr-only"
/>
<span
class="h-4 w-4 rounded-full border border-surface-200 bg-surface ring-offset-2 ring-offset-surface transition peer-checked:border-primary peer-checked:bg-primary peer-focus-visible:ring-2 peer-focus-visible:ring-primary/40"
></span>
Specific date range
</label>
</div>
{#if backfillMode === "all_time"}
<input
type="hidden"
name="heartbeat_import_source[initial_backfill_start_date]"
value=""
/>
<input
type="hidden"
name="heartbeat_import_source[initial_backfill_end_date]"
value=""
/>
{:else}
<div class="mt-3 grid grid-cols-1 gap-3 sm:grid-cols-2">
<div>
<label
for="import_start_date"
class="mb-2 block text-sm text-surface-content"
>
Start date
</label>
<input
id="import_start_date"
type="date"
name="heartbeat_import_source[initial_backfill_start_date]"
bind:value={importStartDate}
required
class="w-full rounded-md border border-surface-200 bg-surface px-3 py-2 text-sm text-surface-content focus:border-primary focus:outline-none"
/>
</div>
<div>
<label
for="import_end_date"
class="mb-2 block text-sm text-surface-content"
>
End date
</label>
<input
id="import_end_date"
type="date"
name="heartbeat_import_source[initial_backfill_end_date]"
bind:value={importEndDate}
required
class="w-full rounded-md border border-surface-200 bg-surface px-3 py-2 text-sm text-surface-content focus:border-primary focus:outline-none"
/>
</div>
</div>
{/if}
</div>
<div class="flex flex-wrap items-center gap-3">
<input
type="hidden"
name="heartbeat_import_source[sync_enabled]"
value="0"
/>
<label
class="inline-flex items-center gap-2 text-sm text-surface-content"
>
<input
type="checkbox"
name="heartbeat_import_source[sync_enabled]"
value="1"
checked={importSource ? importSource.sync_enabled : true}
class="h-4 w-4 rounded border-surface-200 bg-surface text-primary"
/>
Continuous sync enabled
</label>
{#if importSource}
<label
class="inline-flex items-center gap-2 text-sm text-surface-content"
>
<input
type="checkbox"
name="heartbeat_import_source[rerun_backfill]"
value="1"
class="h-4 w-4 rounded border-surface-200 bg-surface text-primary"
/>
Re-run backfill
</label>
{/if}
</div>
<div class="flex flex-wrap gap-2">
<Button type="submit" variant="primary">
{importSource ? "Update source" : "Create source"}
</Button>
</div>
</form>
{#if importSource}
<div class="mt-3 flex flex-wrap gap-2">
<form
method="post"
action={paths.heartbeat_import_source_sync_path}
>
<input
type="hidden"
name="authenticity_token"
value={csrfToken}
/>
<Button type="submit" variant="surface">Sync now</Button>
</form>
<form
method="post"
action={paths.heartbeat_import_source_path}
onsubmit={(event) => {
if (
!window.confirm("Remove import source configuration?")
) {
event.preventDefault();
}
}}
>
<input type="hidden" name="_method" value="delete" />
<input
type="hidden"
name="authenticity_token"
value={csrfToken}
/>
<Button type="submit" variant="surface">Remove source</Button>
</form>
</div>
{/if}
</section>
<section id="wakatime_mirror">
<h3 class="text-lg font-semibold text-surface-content">
Mirror to WakaTime
</h3>
{#if mirrors.length > 0}
<div class="mt-3 space-y-2">
{#each mirrors as mirror}
<div
class="rounded-md border border-surface-200 bg-surface p-3"
>
<p class="text-sm font-semibold text-surface-content">
{mirror.endpoint_url}
</p>
<p class="mt-1 text-xs text-muted">
Status: {mirror.enabled ? "enabled" : "paused"}
</p>
<p class="mt-1 text-xs text-muted">
Last synced: {mirror.last_synced_ago}
</p>
{#if mirror.last_error_message}
<p class="mt-1 text-xs text-red-300">
Last error: {mirror.last_error_message}
</p>
{/if}
{#if mirror.consecutive_failures && mirror.consecutive_failures > 0}
<p class="mt-1 text-xs text-muted">
Consecutive failures: {mirror.consecutive_failures}
</p>
{/if}
<form
method="post"
action={mirror.destroy_path}
class="mt-3"
onsubmit={(event) => {
if (!window.confirm("Delete this mirror endpoint?")) {
event.preventDefault();
}
}}
>
<input type="hidden" name="_method" value="delete" />
<input
type="hidden"
name="authenticity_token"
value={csrfToken}
/>
<Button type="submit" variant="surface" size="xs">
Delete mirror
</Button>
</form>
</div>
{/each}
</div>
{/if}
<form
method="post"
action={paths.user_wakatime_mirrors_path}
class="mt-3 space-y-3 rounded-md border border-surface-200 bg-surface p-3"
>
<input
type="hidden"
name="authenticity_token"
value={csrfToken}
/>
<div>
<label
for="mirror_endpoint_url"
class="mb-2 block text-sm text-surface-content"
>
Endpoint URL
</label>
<input
id="mirror_endpoint_url"
type="url"
name="wakatime_mirror[endpoint_url]"
value="https://wakatime.com/api/v1"
required
class="w-full rounded-md border border-surface-200 bg-darker px-3 py-2 text-sm text-surface-content focus:border-primary focus:outline-none"
/>
</div>
<div>
<label
for="mirror_key"
class="mb-2 block text-sm text-surface-content"
>
WakaTime API Key
</label>
<input
id="mirror_key"
type="password"
name="wakatime_mirror[encrypted_api_key]"
required
class="w-full rounded-md border border-surface-200 bg-darker px-3 py-2 text-sm text-surface-content focus:border-primary focus:outline-none"
/>
</div>
<Button type="submit" variant="primary">Add mirror</Button>
</form>
</section>
</div>
</section>
{/if}
<section id="download_user_data">
<h2 class="text-xl font-semibold text-surface-content">Download Data</h2>

View file

@ -86,6 +86,8 @@ export type PathsProps = {
create_heartbeat_import_path: string;
create_deletion_path: string;
user_wakatime_mirrors_path: string;
heartbeat_import_source_path: string;
heartbeat_import_source_sync_path: string;
};
export type OptionsProps = {
@ -161,13 +163,19 @@ export type AdminToolsProps = {
mirrors: {
id: number;
endpoint_url: string;
enabled?: boolean;
last_synced_at?: string | null;
last_synced_ago: string;
consecutive_failures?: number;
last_error_message?: string | null;
last_error_at?: string | null;
destroy_path: string;
}[];
};
export type UiProps = {
show_dev_import: boolean;
show_imports_and_mirrors: boolean;
};
export type HeartbeatImportStatusProps = {
@ -190,6 +198,23 @@ export type HeartbeatImportProps = {
status?: HeartbeatImportStatusProps | null;
};
export type HeartbeatImportSourceProps = {
id: number;
provider: string;
endpoint_url: string;
sync_enabled: boolean;
status: string;
initial_backfill_start_date?: string | null;
initial_backfill_end_date?: string | null;
backfill_cursor_date?: string | null;
last_synced_at?: string | null;
last_synced_ago?: string | null;
last_error_message?: string | null;
last_error_at?: string | null;
consecutive_failures: number;
imported_count: number;
};
export type ErrorsProps = {
full_messages: string[];
username: string[];
@ -248,6 +273,8 @@ export type DataPageProps = SettingsCommonProps & {
paths: PathsProps;
migration: MigrationProps;
data_export: DataExportProps;
import_source?: HeartbeatImportSourceProps | null;
mirrors: AdminToolsProps["mirrors"];
ui: UiProps;
heartbeat_import: HeartbeatImportProps;
};
@ -292,7 +319,7 @@ export const buildSections = (sectionPaths: SectionPaths, adminVisible: boolean)
{
id: "data" as SectionId,
label: "Data",
blurb: "Exports, migration jobs, and account deletion controls.",
blurb: "Exports, imports, mirrors, migration jobs, and deletion controls.",
path: sectionPaths.data,
},
];
@ -301,7 +328,7 @@ export const buildSections = (sectionPaths: SectionPaths, adminVisible: boolean)
sections.push({
id: "admin",
label: "Admin",
blurb: "WakaTime mirror endpoints.",
blurb: "Administrative controls.",
path: sectionPaths.admin,
});
}
@ -327,9 +354,10 @@ const hashSectionMap: Record<string, SectionId> = {
user_markscribe: "badges",
user_heatmap: "badges",
user_migration_assistant: "data",
wakatime_import_source: "data",
download_user_data: "data",
delete_account: "data",
wakatime_mirror: "admin",
wakatime_mirror: "data",
};
export const sectionFromHash = (hash: string): SectionId | null => {

View file

@ -0,0 +1,11 @@
class HeartbeatImportSourceSchedulerJob < ApplicationJob
queue_as :latency_5m
def perform
return unless Flipper.enabled?(:wakatime_imports_mirrors)
HeartbeatImportSource.where(sync_enabled: true).where.not(status: :paused).pluck(:id).each do |source_id|
HeartbeatImportSourceSyncJob.perform_later(source_id)
end
end
end

View file

@ -0,0 +1,149 @@
class HeartbeatImportSourceSyncDayJob < ApplicationJob
queue_as :latency_5m
include GoodJob::ActiveJobExtensions::Concurrency
retry_on WakatimeCompatibleClient::TransientError,
wait: ->(executions) { (executions**2).seconds + rand(1..4).seconds },
attempts: 8
good_job_control_concurrency_with(
key: -> { "heartbeat_import_source_sync_day_job_#{arguments.first}_#{arguments.second}" },
total_limit: 1
)
def perform(source_id, date_string)
return unless Flipper.enabled?(:wakatime_imports_mirrors)
source = HeartbeatImportSource.find_by(id: source_id)
return unless source&.sync_enabled?
date = Date.iso8601(date_string)
rows = source.client.fetch_heartbeats(date:)
upsert_heartbeats(source.user_id, rows)
source.update!(
last_synced_at: Time.current,
last_error_message: nil,
last_error_at: nil,
consecutive_failures: 0
)
rescue WakatimeCompatibleClient::AuthenticationError => e
source&.update!(
sync_enabled: false,
status: :paused,
last_error_message: e.message.to_s.truncate(500),
last_error_at: Time.current,
consecutive_failures: source.consecutive_failures.to_i + 1
)
rescue WakatimeCompatibleClient::TransientError => e
source&.update!(
status: source&.backfilling? ? :backfilling : :failed,
last_error_message: e.message.to_s.truncate(500),
last_error_at: Time.current,
consecutive_failures: source.consecutive_failures.to_i + 1
)
raise
rescue WakatimeCompatibleClient::RequestError => e
source&.update!(
status: :failed,
last_error_message: e.message.to_s.truncate(500),
last_error_at: Time.current,
consecutive_failures: source.consecutive_failures.to_i + 1
)
rescue ArgumentError => e
source&.update!(
status: :failed,
last_error_message: e.message.to_s.truncate(500),
last_error_at: Time.current,
consecutive_failures: source.consecutive_failures.to_i + 1
)
end
private
def upsert_heartbeats(user_id, rows)
normalized = rows.filter_map { |row| normalize_row(user_id, row) }
return if normalized.empty?
deduped_records = normalized.group_by { |record| record[:fields_hash] }.map do |_, records|
records.max_by { |record| record[:time].to_f }
end
Heartbeat.upsert_all(deduped_records, unique_by: [ :fields_hash ])
end
def normalize_row(user_id, row)
data = row.respond_to?(:with_indifferent_access) ? row.with_indifferent_access : row.to_h.with_indifferent_access
timestamp = extract_timestamp(data)
return nil if timestamp.blank?
attrs = {
user_id: user_id,
branch: value_or_nil(data[:branch]),
category: value_or_nil(data[:category]) || "coding",
dependencies: extract_dependencies(data[:dependencies]),
editor: value_or_nil(data[:editor]),
entity: value_or_nil(data[:entity]),
language: value_or_nil(data[:language]),
machine: value_or_nil(data[:machine]),
operating_system: value_or_nil(data[:operating_system]),
project: value_or_nil(data[:project]),
type: value_or_nil(data[:type]),
user_agent: value_or_nil(data[:user_agent]),
line_additions: data[:line_additions],
line_deletions: data[:line_deletions],
lineno: data[:lineno],
lines: data[:lines],
cursorpos: data[:cursorpos],
project_root_count: data[:project_root_count],
time: timestamp,
is_write: ActiveModel::Type::Boolean.new.cast(data[:is_write]),
source_type: :wakapi_import
}
now = Time.current
attrs[:created_at] = now
attrs[:updated_at] = now
attrs[:fields_hash] = Heartbeat.generate_fields_hash(attrs)
attrs
rescue TypeError, JSON::ParserError
nil
end
def extract_dependencies(value)
return value if value.is_a?(Array)
return [] if value.blank?
JSON.parse(value.to_s)
rescue JSON::ParserError
value.to_s.split(",").map(&:strip).reject(&:blank?)
end
def extract_timestamp(data)
value = data[:time]
value = data[:created_at] if value.blank?
return nil if value.blank?
if value.is_a?(Numeric)
normalized = value.to_f
return (normalized / 1000.0) if normalized > 1_000_000_000_000
return normalized
end
parsed = Time.parse(value.to_s).to_f
return parsed if parsed.positive?
nil
rescue ArgumentError
nil
end
def value_or_nil(value)
return nil if value.nil?
return value.strip.presence if value.is_a?(String)
value
end
end

View file

@ -0,0 +1,131 @@
class HeartbeatImportSourceSyncJob < ApplicationJob
queue_as :latency_5m
include GoodJob::ActiveJobExtensions::Concurrency
BACKFILL_WINDOW_DAYS = 5
retry_on WakatimeCompatibleClient::TransientError,
wait: ->(executions) { (executions**2).seconds + rand(1..4).seconds },
attempts: 8
good_job_control_concurrency_with(
key: -> { "heartbeat_import_source_sync_job_#{arguments.first}" },
total_limit: 1
)
def perform(source_id)
return unless Flipper.enabled?(:wakatime_imports_mirrors)
source = HeartbeatImportSource.find_by(id: source_id)
return unless source&.sync_enabled?
return if source.paused?
initialize_backfill_if_needed(source)
if source.backfilling?
schedule_backfill_window(source)
return
end
source.update!(status: :syncing)
enqueue_day_sync(source, Date.yesterday)
enqueue_day_sync(source, Date.current)
rescue WakatimeCompatibleClient::AuthenticationError => e
source&.update!(
sync_enabled: false,
status: :paused,
last_error_message: e.message.to_s.truncate(500),
last_error_at: Time.current,
consecutive_failures: source.consecutive_failures.to_i + 1
)
rescue WakatimeCompatibleClient::TransientError => e
source&.update!(
status: source&.backfilling? ? :backfilling : :failed,
last_error_message: e.message.to_s.truncate(500),
last_error_at: Time.current,
consecutive_failures: source.consecutive_failures.to_i + 1
)
raise
rescue WakatimeCompatibleClient::RequestError => e
source&.update!(
status: :failed,
last_error_message: e.message.to_s.truncate(500),
last_error_at: Time.current,
consecutive_failures: source.consecutive_failures.to_i + 1
)
end
private
def initialize_backfill_if_needed(source)
should_initialize = source.idle? ||
(source.failed? && source.backfill_cursor_date.blank? && source.last_synced_at.blank?)
return unless should_initialize
return unless source.backfill_cursor_date.blank?
start_date = source.initial_backfill_start_date
end_date = source.initial_backfill_end_date || Date.current
if start_date.blank?
begin
start_date = source.client.fetch_all_time_since_today_start_date
rescue => e
Rails.logger.error("Failed to fetch all_time_since_today for source #{source.id}: #{e.message}")
source.update!(status: :failed, last_error_message: e.message, last_error_at: Time.current)
return
end
end
if start_date > end_date
source.update!(
status: :syncing,
backfill_cursor_date: nil,
initial_backfill_start_date: start_date,
initial_backfill_end_date: end_date
)
return
end
source.update!(
status: :backfilling,
initial_backfill_start_date: start_date,
initial_backfill_end_date: end_date,
backfill_cursor_date: start_date,
last_error_message: nil,
last_error_at: nil,
consecutive_failures: 0
)
end
def schedule_backfill_window(source)
cursor = source.backfill_cursor_date
end_date = source.initial_backfill_end_date || Date.current
return if cursor.blank?
if cursor > end_date
source.update!(status: :syncing, backfill_cursor_date: nil)
self.class.perform_later(source.id)
return
end
window_end = [ cursor + (BACKFILL_WINDOW_DAYS - 1).days, end_date ].min
(cursor..window_end).each do |date|
enqueue_day_sync(source, date)
end
next_cursor = window_end + 1.day
if next_cursor > end_date
source.update!(status: :syncing, backfill_cursor_date: nil)
self.class.perform_later(source.id)
else
source.update!(status: :backfilling, backfill_cursor_date: next_cursor)
self.class.perform_later(source.id)
end
end
def enqueue_day_sync(source, date)
HeartbeatImportSourceSyncDayJob.perform_later(source.id, date.iso8601)
end
end

View file

@ -0,0 +1,24 @@
class MirrorFanoutEnqueueJob < ApplicationJob
queue_as :latency_10s
DEBOUNCE_TTL = 10.seconds
def perform(user_id)
return unless Flipper.enabled?(:wakatime_imports_mirrors)
return if debounced?(user_id)
User.find_by(id: user_id)&.wakatime_mirrors&.active&.pluck(:id)&.each do |mirror_id|
WakatimeMirrorSyncJob.perform_later(mirror_id)
end
end
private
def debounced?(user_id)
key = "mirror_fanout_enqueue_job:user:#{user_id}"
return true if Rails.cache.read(key)
Rails.cache.write(key, true, expires_in: DEBOUNCE_TTL)
false
end
end

View file

@ -1,7 +1,86 @@
class WakatimeMirrorSyncJob < ApplicationJob
queue_as :default
queue_as :latency_10s
def perform(mirror)
mirror.sync_heartbeats
include GoodJob::ActiveJobExtensions::Concurrency
BATCH_SIZE = 25
MAX_BATCHES_PER_RUN = 20
class MirrorTransientError < StandardError; end
retry_on MirrorTransientError,
wait: ->(executions) { (executions**2).seconds + rand(1..4).seconds },
attempts: 8
retry_on HTTP::TimeoutError, HTTP::ConnectionError,
wait: ->(executions) { (executions**2).seconds + rand(1..4).seconds },
attempts: 8
good_job_control_concurrency_with(
key: -> { "wakatime_mirror_sync_job_#{arguments.first}" },
total_limit: 1
)
def perform(mirror_id)
return unless Flipper.enabled?(:wakatime_imports_mirrors)
mirror = WakatimeMirror.find_by(id: mirror_id)
return unless mirror&.enabled?
batches_processed = 0
cursor = mirror.last_synced_heartbeat_id.to_i
loop do
batch = mirror.direct_heartbeats_after(cursor).limit(BATCH_SIZE).to_a
break if batch.empty?
response = mirror.post_heartbeats(batch.map { |heartbeat| mirror_payload(heartbeat) })
status_code = response.status.to_i
if response.status.success?
cursor = batch.last.id
mirror.update!(
last_synced_heartbeat_id: cursor,
last_synced_at: Time.current,
consecutive_failures: 0,
last_error_message: nil,
last_error_at: nil
)
elsif [ 401, 403 ].include?(status_code)
mirror.mark_auth_failed!("Authentication failed (#{status_code}). Check your API key.")
return
elsif transient_status?(status_code)
mirror.record_transient_failure!("Mirror request failed with status #{status_code}.")
raise MirrorTransientError, "Mirror request failed with status #{status_code}"
else
mirror.mark_failed!("Mirror request failed with status #{status_code}.")
return
end
batches_processed += 1
break if batches_processed >= MAX_BATCHES_PER_RUN
end
if batches_processed >= MAX_BATCHES_PER_RUN &&
mirror.direct_heartbeats_after(cursor).exists?
self.class.perform_later(mirror.id)
end
rescue HTTP::TimeoutError, HTTP::ConnectionError => e
mirror&.record_transient_failure!("Mirror request failed: #{e.class.name}")
raise
end
private
def mirror_payload(heartbeat)
heartbeat.attributes.slice(*payload_attributes)
end
def payload_attributes
@payload_attributes ||= Heartbeat.indexed_attributes - [ "user_id" ]
end
def transient_status?(status_code)
status_code == 408 || status_code == 429 || status_code >= 500
end
end

View file

@ -0,0 +1,77 @@
class HeartbeatImportSource < ApplicationRecord
require "uri"
belongs_to :user
encrypts :encrypted_api_key, deterministic: false
enum :provider, {
wakatime_compatible: 0
}
enum :status, {
idle: 0,
backfilling: 1,
syncing: 2,
paused: 3,
failed: 4
}
validates :provider, presence: true
validates :endpoint_url, presence: true
validates :encrypted_api_key, presence: true
validates :user_id, uniqueness: true
validate :validate_endpoint_url
validate :validate_backfill_range
before_validation :normalize_endpoint_url
def client
WakatimeCompatibleClient.new(endpoint_url:, api_key: encrypted_api_key)
end
def reset_backfill!
update!(
status: :idle,
backfill_cursor_date: nil,
last_synced_at: nil,
last_error_message: nil,
last_error_at: nil,
consecutive_failures: 0
)
end
private
def normalize_endpoint_url
self.endpoint_url = endpoint_url.to_s.strip.sub(%r{/*\z}, "")
end
def validate_backfill_range
return unless initial_backfill_start_date.present? && initial_backfill_end_date.present?
return unless initial_backfill_start_date > initial_backfill_end_date
errors.add(:initial_backfill_end_date, "must be on or after the start date")
end
def validate_endpoint_url
return if endpoint_url.blank?
uri = URI.parse(endpoint_url)
unless uri.is_a?(URI::HTTP) || uri.is_a?(URI::HTTPS)
errors.add(:endpoint_url, "must be an HTTP or HTTPS URL")
return
end
if uri.host.blank?
errors.add(:endpoint_url, "must include a host")
return
end
if !Rails.env.development? && uri.scheme != "https"
errors.add(:endpoint_url, "must use https")
end
rescue URI::InvalidURIError
errors.add(:endpoint_url, "is invalid")
end
end

View file

@ -133,6 +133,7 @@ class User < ApplicationRecord
class_name: "SailorsLog"
has_many :wakatime_mirrors, dependent: :destroy
has_one :heartbeat_import_source, dependent: :destroy
scope :search_identity, ->(term) {
term = term.to_s.strip.downcase

View file

@ -2,65 +2,110 @@ class WakatimeMirror < ApplicationRecord
require "uri"
belongs_to :user
has_many :heartbeats, through: :user
encrypts :encrypted_api_key, deterministic: false
attr_accessor :request_host
validates :endpoint_url, presence: true
validates :encrypted_api_key, presence: true
validates :endpoint_url, uniqueness: { scope: :user_id }
validate :endpoint_url_not_hackatime
validate :validate_endpoint_url
after_create :schedule_initial_sync
before_validation :normalize_endpoint_url
before_create :initialize_last_synced_heartbeat_id
def unsynced_heartbeats
# Get heartbeats since last sync, or all heartbeats if never synced
user.heartbeats.where("created_at > ?", last_synced_at || Time.at(0))
scope :active, -> { where(enabled: true) }
def direct_heartbeats_after(heartbeat_id)
user.heartbeats.where(source_type: :direct_entry).where("id > ?", heartbeat_id.to_i).order(id: :asc)
end
def sync_heartbeats
return unless encrypted_api_key.present?
# Get the next batch of heartbeats to sync (max 25 per WakaTime API limit)
batch = unsynced_heartbeats.limit(25).to_a
return if batch.empty?
# Send them all in a single request using the bulk endpoint
begin
body = batch.map { |h| h.attributes.slice(*Heartbeat.indexed_attributes) }
response = HTTP.headers(
"Authorization" => "Basic #{Base64.strict_encode64(encrypted_api_key + ':')}",
def post_heartbeats(payload)
HTTP.timeout(connect: 5, read: 30, write: 10)
.headers(
"Authorization" => "Basic #{Base64.strict_encode64("#{encrypted_api_key}:")}",
"Content-Type" => "application/json"
).post(
"#{endpoint_url}/users/current/heartbeats.bulk",
json: body
)
.post("#{endpoint_url}/users/current/heartbeats.bulk", json: payload)
end
if response.status.success?
update_column(:last_synced_at, Time.current)
puts "Successfully synced #{batch.size} heartbeats: #{response.body}"
# queue another sync job
WakatimeMirrorSyncJob.perform_later(self)
else
Rails.logger.error("Failed to sync heartbeats to #{endpoint_url}: #{response.body}")
end
rescue => e
Rails.logger.error("Error syncing heartbeats to #{endpoint_url}: #{e.message}")
end
def clear_error_state!
update!(
last_error_message: nil,
last_error_at: nil,
consecutive_failures: 0
)
end
def record_transient_failure!(message)
update!(
status_payload_for_failure(message, keep_enabled: true)
)
end
def mark_auth_failed!(message)
update!(
status_payload_for_failure(message, keep_enabled: false)
)
end
def mark_failed!(message)
update!(
status_payload_for_failure(message, keep_enabled: enabled)
)
end
private
def endpoint_url_not_hackatime
def initialize_last_synced_heartbeat_id
self.last_synced_heartbeat_id ||= user.heartbeats.maximum(:id)
end
def normalize_endpoint_url
self.endpoint_url = endpoint_url.to_s.strip.sub(%r{/*\z}, "")
end
def validate_endpoint_url
return unless endpoint_url.present?
uri = URI.parse(endpoint_url)
errors.add(:endpoint_url, "cannot be hackatime.hackclub.com") if uri.host == "hackatime.hackclub.com"
unless uri.is_a?(URI::HTTP) || uri.is_a?(URI::HTTPS)
errors.add(:endpoint_url, "must be an HTTP or HTTPS URL")
return
end
if uri.host.blank?
errors.add(:endpoint_url, "must include a host")
return
end
if !Rails.env.development? && uri.scheme != "https"
errors.add(:endpoint_url, "must use https")
return
end
if disallowed_hosts.include?(uri.host.downcase)
errors.add(:endpoint_url, "cannot target this Hackatime host")
end
rescue URI::InvalidURIError
# other validations will handle invalid URLs
errors.add(:endpoint_url, "is invalid")
end
def schedule_initial_sync
WakatimeMirrorSyncJob.perform_later(self)
def disallowed_hosts
hosts = %w[hackatime.hackclub.com www.hackatime.hackclub.com localhost 127.0.0.1]
hosts << request_host.to_s.downcase if request_host.present?
default_host = Rails.application.config.action_mailer.default_url_options&.dig(:host)
hosts << default_host.to_s.downcase if default_host.present?
hosts.uniq
end
def status_payload_for_failure(message, keep_enabled:)
{
enabled: keep_enabled,
last_error_message: message.to_s.truncate(500),
last_error_at: Time.current,
consecutive_failures: consecutive_failures.to_i + 1
}
end
end

View file

@ -0,0 +1,63 @@
class WakatimeCompatibleClient
class AuthenticationError < StandardError; end
class TransientError < StandardError; end
class RequestError < StandardError; end
def initialize(endpoint_url:, api_key:)
@endpoint_url = endpoint_url.to_s.sub(%r{/*\z}, "")
@api_key = api_key.to_s
end
def fetch_all_time_since_today_start_date
body = get_json("/users/current/all_time_since_today")
start_date = body.dig("data", "range", "start_date") ||
body.dig("data", "start_date") ||
body.dig("range", "start_date")
raise RequestError, "Missing start_date in all_time_since_today response" if start_date.blank?
Date.iso8601(start_date.to_s)
rescue ArgumentError
raise RequestError, "Invalid start_date in all_time_since_today response"
end
def fetch_heartbeats(date:)
body = get_json("/users/current/heartbeats", params: { date: date.iso8601 })
if body.is_a?(Array)
body
elsif body["data"].is_a?(Array)
body["data"]
elsif body["heartbeats"].is_a?(Array)
body["heartbeats"]
else
[]
end
end
private
def get_json(path, params: nil)
response = HTTP.timeout(connect: 5, read: 30, write: 10)
.headers(headers)
.get("#{@endpoint_url}#{path}", params:)
status = response.status.to_i
raise AuthenticationError, "Authentication failed (#{status})" if [ 401, 403 ].include?(status)
raise TransientError, "Request failed with status #{status}" if status == 408 || status == 429 || status >= 500
raise RequestError, "Request failed with status #{status}" unless response.status.success?
JSON.parse(response.to_s)
rescue HTTP::TimeoutError, HTTP::ConnectionError => e
raise TransientError, e.message
rescue JSON::ParserError
raise RequestError, "Invalid JSON response"
end
def headers
{
"Authorization" => "Basic #{Base64.strict_encode64("#{@api_key}:")}",
"Content-Type" => "application/json",
"Accept" => "application/json"
}
end
end

View file

@ -110,6 +110,10 @@ Rails.application.configure do
class: "Cache::HeartbeatCountsJob",
kwargs: { force_reload: true }
},
heartbeat_import_source_scheduler: {
cron: "*/5 * * * *",
class: "HeartbeatImportSourceSchedulerJob"
},
geocode_users_without_country: {
cron: "7 * * * *",

View file

@ -158,6 +158,12 @@ Rails.application.routes.draw do
post "my/settings/rotate_api_key", to: "settings/access#rotate_api_key", as: :my_settings_rotate_api_key
namespace :my do
resource :heartbeat_import_source,
only: [ :create, :update, :show, :destroy ],
controller: "heartbeat_import_sources" do
post :sync, on: :collection, action: :sync_now
end
resources :heartbeat_imports, only: [ :create, :show ]
resources :project_repo_mappings, param: :project_name, only: [ :edit, :update ], constraints: { project_name: /.+/ } do

View file

@ -0,0 +1,21 @@
class CreateHeartbeatImportSources < ActiveRecord::Migration[8.1]
def change
create_table :heartbeat_import_sources do |t|
t.references :user, null: false, foreign_key: true, index: { unique: true }
t.integer :provider, null: false, default: 0
t.string :endpoint_url, null: false
t.string :encrypted_api_key, null: false
t.boolean :sync_enabled, null: false, default: true
t.integer :status, null: false, default: 0
t.date :initial_backfill_start_date
t.date :initial_backfill_end_date
t.date :backfill_cursor_date
t.datetime :last_synced_at
t.text :last_error_message
t.datetime :last_error_at
t.integer :consecutive_failures, null: false, default: 0
t.timestamps
end
end
end

View file

@ -0,0 +1,9 @@
class AddSyncStateToWakatimeMirrors < ActiveRecord::Migration[8.1]
def change
add_column :wakatime_mirrors, :enabled, :boolean, null: false, default: true
add_column :wakatime_mirrors, :last_synced_heartbeat_id, :bigint
add_column :wakatime_mirrors, :last_error_message, :text
add_column :wakatime_mirrors, :last_error_at, :datetime
add_column :wakatime_mirrors, :consecutive_failures, :integer, null: false, default: 0
end
end

View file

@ -0,0 +1,17 @@
class AddIndexOnHeartbeatsForMirrorReads < ActiveRecord::Migration[8.1]
disable_ddl_transaction!
INDEX_NAME = "index_heartbeats_on_user_source_id_direct".freeze
def up
add_index :heartbeats,
[ :user_id, :source_type, :id ],
name: INDEX_NAME,
where: "(source_type = 0 AND deleted_at IS NULL)",
algorithm: :concurrently
end
def down
remove_index :heartbeats, name: INDEX_NAME, algorithm: :concurrently
end
end

28
db/schema.rb generated
View file

@ -10,7 +10,7 @@
#
# It's strongly recommended that you check this file into your version control system.
ActiveRecord::Schema[8.1].define(version: 2026_02_21_113553) do
ActiveRecord::Schema[8.1].define(version: 2026_02_23_134705) do
# These are extensions that must be enabled in order to support this database
enable_extension "pg_catalog.plpgsql"
enable_extension "pg_stat_statements"
@ -257,6 +257,25 @@ ActiveRecord::Schema[8.1].define(version: 2026_02_21_113553) do
t.index ["name"], name: "index_heartbeat_editors_on_name", unique: true
end
create_table "heartbeat_import_sources", force: :cascade do |t|
t.date "backfill_cursor_date"
t.integer "consecutive_failures", default: 0, null: false
t.datetime "created_at", null: false
t.string "encrypted_api_key", null: false
t.string "endpoint_url", null: false
t.date "initial_backfill_end_date"
t.date "initial_backfill_start_date"
t.datetime "last_error_at"
t.text "last_error_message"
t.datetime "last_synced_at"
t.integer "provider", default: 0, null: false
t.integer "status", default: 0, null: false
t.boolean "sync_enabled", default: true, null: false
t.datetime "updated_at", null: false
t.bigint "user_id", null: false
t.index ["user_id"], name: "index_heartbeat_import_sources_on_user_id", unique: true
end
create_table "heartbeat_languages", force: :cascade do |t|
t.datetime "created_at", null: false
t.string "name", null: false
@ -356,6 +375,7 @@ ActiveRecord::Schema[8.1].define(version: 2026_02_21_113553) do
t.index ["user_id", "operating_system", "time"], name: "idx_heartbeats_user_operating_system_time", where: "(deleted_at IS NULL)"
t.index ["user_id", "project", "time"], name: "idx_heartbeats_user_project_time_stats", where: "((deleted_at IS NULL) AND (project IS NOT NULL))"
t.index ["user_id", "project"], name: "index_heartbeats_on_user_id_and_project", where: "(deleted_at IS NULL)"
t.index ["user_id", "source_type", "id"], name: "index_heartbeats_on_user_source_id_direct", where: "((source_type = 0) AND (deleted_at IS NULL))"
t.index ["user_id", "time", "category"], name: "index_heartbeats_on_user_time_category"
t.index ["user_id", "time", "language"], name: "idx_heartbeats_user_time_language_stats", where: "(deleted_at IS NULL)"
t.index ["user_id", "time", "language_id"], name: "idx_heartbeats_user_time_language_id", where: "(deleted_at IS NULL)"
@ -647,10 +667,15 @@ ActiveRecord::Schema[8.1].define(version: 2026_02_21_113553) do
end
create_table "wakatime_mirrors", force: :cascade do |t|
t.integer "consecutive_failures", default: 0, null: false
t.datetime "created_at", null: false
t.boolean "enabled", default: true, null: false
t.string "encrypted_api_key", null: false
t.string "endpoint_url", default: "https://wakatime.com/api/v1", null: false
t.datetime "last_error_at"
t.text "last_error_message"
t.datetime "last_synced_at"
t.bigint "last_synced_heartbeat_id"
t.datetime "updated_at", null: false
t.bigint "user_id", null: false
t.index ["user_id", "endpoint_url"], name: "index_wakatime_mirrors_on_user_id_and_endpoint_url", unique: true
@ -669,6 +694,7 @@ ActiveRecord::Schema[8.1].define(version: 2026_02_21_113553) do
add_foreign_key "email_verification_requests", "users"
add_foreign_key "goals", "users"
add_foreign_key "heartbeat_branches", "users"
add_foreign_key "heartbeat_import_sources", "users"
add_foreign_key "heartbeat_machines", "users"
add_foreign_key "heartbeat_projects", "users"
add_foreign_key "heartbeats", "heartbeat_branches", column: "branch_id"

View file

@ -0,0 +1,110 @@
require "test_helper"
class My::HeartbeatImportSourcesControllerTest < ActionDispatch::IntegrationTest
setup do
Flipper.enable(:wakatime_imports_mirrors)
end
teardown do
Flipper.disable(:wakatime_imports_mirrors)
end
test "requires auth for create" do
post my_heartbeat_import_source_path, params: {
heartbeat_import_source: {
endpoint_url: "https://wakatime.com/api/v1",
encrypted_api_key: "api-key"
}
}
assert_response :redirect
assert_redirected_to root_path
end
test "authenticated user can create source and queue sync" do
user = User.create!(timezone: "UTC")
sign_in_as(user)
GoodJob::Job.where(job_class: "HeartbeatImportSourceSyncJob").delete_all
assert_difference -> { HeartbeatImportSource.count }, 1 do
assert_difference -> { GoodJob::Job.where(job_class: "HeartbeatImportSourceSyncJob").count }, 1 do
post my_heartbeat_import_source_path, params: {
heartbeat_import_source: {
endpoint_url: "https://wakatime.com/api/v1",
encrypted_api_key: "api-key",
sync_enabled: "1"
}
}
end
end
assert_response :redirect
assert_redirected_to my_settings_data_path
end
test "show returns configured source payload" do
user = User.create!(timezone: "UTC")
source = user.create_heartbeat_import_source!(
provider: :wakatime_compatible,
endpoint_url: "https://wakatime.com/api/v1",
encrypted_api_key: "api-key"
)
sign_in_as(user)
get my_heartbeat_import_source_path
assert_response :success
payload = JSON.parse(response.body)
assert_equal source.id, payload.dig("import_source", "id")
assert_equal "wakatime_compatible", payload.dig("import_source", "provider")
end
test "sync now queues source sync" do
user = User.create!(timezone: "UTC")
source = user.create_heartbeat_import_source!(
provider: :wakatime_compatible,
endpoint_url: "https://wakatime.com/api/v1",
encrypted_api_key: "api-key",
sync_enabled: true
)
sign_in_as(user)
GoodJob::Job.where(job_class: "HeartbeatImportSourceSyncJob").delete_all
assert_difference -> { GoodJob::Job.where(job_class: "HeartbeatImportSourceSyncJob").count }, 1 do
post sync_my_heartbeat_import_source_path
end
assert_response :redirect
assert_redirected_to my_settings_data_path
assert_equal source.id, GoodJob::Job.where(job_class: "HeartbeatImportSourceSyncJob").last.serialized_params.dig("arguments", 0)
end
test "destroy removes source" do
user = User.create!(timezone: "UTC")
user.create_heartbeat_import_source!(
provider: :wakatime_compatible,
endpoint_url: "https://wakatime.com/api/v1",
encrypted_api_key: "api-key"
)
sign_in_as(user)
assert_difference -> { HeartbeatImportSource.count }, -1 do
delete my_heartbeat_import_source_path
end
assert_response :redirect
assert_redirected_to my_settings_data_path
end
test "returns not found json when imports and mirrors are disabled" do
user = User.create!(timezone: "UTC")
sign_in_as(user)
Flipper.disable(:wakatime_imports_mirrors)
get my_heartbeat_import_source_path, as: :json
assert_response :not_found
payload = JSON.parse(response.body)
assert_equal "Imports and mirrors are currently disabled.", payload["error"]
end
end

View file

@ -0,0 +1,46 @@
require "test_helper"
class WakatimeMirrorsControllerTest < ActionDispatch::IntegrationTest
setup do
Flipper.enable(:wakatime_imports_mirrors)
end
teardown do
Flipper.disable(:wakatime_imports_mirrors)
end
test "creates mirror when imports and mirrors are enabled" do
user = User.create!(timezone: "UTC")
sign_in_as(user)
assert_difference -> { user.reload.wakatime_mirrors.count }, 1 do
post user_wakatime_mirrors_path(user), params: {
wakatime_mirror: {
endpoint_url: "https://wakapi.dev/api/compat/wakatime/v1",
encrypted_api_key: "mirror-key"
}
}
end
assert_response :redirect
assert_redirected_to my_settings_data_path
end
test "blocks mirror create when imports and mirrors are disabled" do
user = User.create!(timezone: "UTC")
sign_in_as(user)
Flipper.disable(:wakatime_imports_mirrors)
assert_no_difference -> { user.reload.wakatime_mirrors.count } do
post user_wakatime_mirrors_path(user), params: {
wakatime_mirror: {
endpoint_url: "https://wakapi.dev/api/compat/wakatime/v1",
encrypted_api_key: "mirror-key"
}
}
end
assert_response :redirect
assert_redirected_to my_settings_data_path
end
end

View file

@ -0,0 +1,187 @@
require "test_helper"
require "webmock/minitest"
class HeartbeatImportSourceSyncJobTest < ActiveJob::TestCase
setup do
Flipper.enable(:wakatime_imports_mirrors)
end
teardown do
Flipper.disable(:wakatime_imports_mirrors)
end
def create_source(user:, **attrs)
user.create_heartbeat_import_source!(
{
provider: :wakatime_compatible,
endpoint_url: "https://wakatime.com/api/v1",
encrypted_api_key: "import-key",
sync_enabled: true,
status: :idle
}.merge(attrs)
)
end
def queued_jobs_for(job_class)
GoodJob::Job.where(job_class: job_class)
end
test "full-history default schedules backfill windows and re-enqueues coordinator" do
GoodJob::Job.delete_all
user = User.create!(timezone: "UTC")
source = create_source(user: user)
stub_request(:get, "https://wakatime.com/api/v1/users/current/all_time_since_today")
.to_return(
status: 200,
body: {
data: {
range: {
start_date: (Date.current - 10.days).iso8601
}
}
}.to_json,
headers: { "Content-Type" => "application/json" }
)
HeartbeatImportSourceSyncJob.perform_now(source.id)
day_jobs = queued_jobs_for("HeartbeatImportSourceSyncDayJob")
sync_jobs = queued_jobs_for("HeartbeatImportSourceSyncJob")
assert_equal 5, day_jobs.count
assert_equal 1, sync_jobs.count
source.reload
assert source.backfilling?
assert_equal(Date.current - 5.days, source.backfill_cursor_date)
end
test "range override limits scheduled days" do
GoodJob::Job.delete_all
user = User.create!(timezone: "UTC")
start_date = Date.current - 2.days
end_date = Date.current - 1.day
source = create_source(
user: user,
initial_backfill_start_date: start_date,
initial_backfill_end_date: end_date
)
HeartbeatImportSourceSyncJob.perform_now(source.id)
day_jobs = queued_jobs_for("HeartbeatImportSourceSyncDayJob")
day_args = day_jobs.map { |job| job.serialized_params.fetch("arguments").last }
assert_equal 2, day_jobs.count
assert_includes day_args, start_date.iso8601
assert_includes day_args, end_date.iso8601
end
test "ongoing sync enqueues today and yesterday" do
GoodJob::Job.delete_all
user = User.create!(timezone: "UTC")
source = create_source(
user: user,
status: :syncing,
initial_backfill_start_date: Date.current - 7.days,
initial_backfill_end_date: Date.current,
backfill_cursor_date: nil,
last_synced_at: Time.current
)
HeartbeatImportSourceSyncJob.perform_now(source.id)
day_jobs = queued_jobs_for("HeartbeatImportSourceSyncDayJob")
scheduled_dates = day_jobs.map { |job| Date.iso8601(job.serialized_params.fetch("arguments").last) }
assert_equal 2, day_jobs.count
assert_includes scheduled_dates, Date.current
assert_includes scheduled_dates, Date.yesterday
end
test "day job imports and dedupes by fields_hash" do
user = User.create!(timezone: "UTC")
source = create_source(user: user, status: :syncing)
timestamp = Time.current.to_f
payload = [
{
entity: "src/a.rb",
type: "file",
category: "coding",
project: "alpha",
language: "Ruby",
editor: "VS Code",
time: timestamp
},
{
entity: "src/a.rb",
type: "file",
category: "coding",
project: "alpha",
language: "Ruby",
editor: "VS Code",
time: timestamp
}
]
stub_request(:get, "https://wakatime.com/api/v1/users/current/heartbeats")
.with(query: { "date" => Date.current.iso8601 })
.to_return(
status: 200,
body: { data: payload }.to_json,
headers: { "Content-Type" => "application/json" }
)
HeartbeatImportSourceSyncDayJob.perform_now(source.id, Date.current.iso8601)
assert_equal 1, user.heartbeats.where(source_type: :wakapi_import).count
assert source.reload.last_synced_at.present?
end
test "day job pauses source on auth errors" do
user = User.create!(timezone: "UTC")
source = create_source(user: user, status: :syncing)
stub_request(:get, "https://wakatime.com/api/v1/users/current/heartbeats")
.with(query: { "date" => Date.current.iso8601 })
.to_return(status: 401, body: "{}")
HeartbeatImportSourceSyncDayJob.perform_now(source.id, Date.current.iso8601)
source.reload
assert source.paused?
assert_not source.sync_enabled
assert_includes source.last_error_message, "Authentication failed"
end
test "day job marks transient errors for retry" do
user = User.create!(timezone: "UTC")
source = create_source(user: user, status: :syncing)
stub_request(:get, "https://wakatime.com/api/v1/users/current/heartbeats")
.with(query: { "date" => Date.current.iso8601 })
.to_return(status: 500, body: "{}")
assert_raises(WakatimeCompatibleClient::TransientError) do
HeartbeatImportSourceSyncDayJob.new.perform(source.id, Date.current.iso8601)
end
source.reload
assert source.failed?
assert_equal 1, source.consecutive_failures
end
test "coordinator does nothing when imports and mirrors are disabled" do
GoodJob::Job.delete_all
user = User.create!(timezone: "UTC")
source = create_source(user: user)
Flipper.disable(:wakatime_imports_mirrors)
HeartbeatImportSourceSyncJob.perform_now(source.id)
assert_equal 0, queued_jobs_for("HeartbeatImportSourceSyncDayJob").count
assert_equal "idle", source.reload.status
end
end

View file

@ -0,0 +1,48 @@
require "test_helper"
class MirrorFanoutEnqueueJobTest < ActiveJob::TestCase
setup do
Flipper.enable(:wakatime_imports_mirrors)
end
teardown do
Flipper.disable(:wakatime_imports_mirrors)
end
test "debounce prevents enqueue storms per user" do
original_cache = Rails.cache
Rails.cache = ActiveSupport::Cache::MemoryStore.new
GoodJob::Job.delete_all
user = User.create!(timezone: "UTC")
user.wakatime_mirrors.create!(
endpoint_url: "https://wakatime.com/api/v1",
encrypted_api_key: "mirror-key-1"
)
user.wakatime_mirrors.create!(
endpoint_url: "https://wakapi.dev/api/compat/wakatime/v1",
encrypted_api_key: "mirror-key-2"
)
assert_difference -> { GoodJob::Job.where(job_class: "WakatimeMirrorSyncJob").count }, 2 do
MirrorFanoutEnqueueJob.perform_now(user.id)
MirrorFanoutEnqueueJob.perform_now(user.id)
end
ensure
Rails.cache = original_cache
end
test "does not enqueue mirror sync when imports and mirrors are disabled" do
GoodJob::Job.delete_all
user = User.create!(timezone: "UTC")
user.wakatime_mirrors.create!(
endpoint_url: "https://wakatime.com/api/v1",
encrypted_api_key: "mirror-key"
)
Flipper.disable(:wakatime_imports_mirrors)
assert_no_difference -> { GoodJob::Job.where(job_class: "WakatimeMirrorSyncJob").count } do
MirrorFanoutEnqueueJob.perform_now(user.id)
end
end
end

View file

@ -0,0 +1,344 @@
require "test_helper"
require "webmock/minitest"
require "socket"
require "net/http"
require "timeout"
class WakatimeMirrorSyncJobTest < ActiveJob::TestCase
setup do
Flipper.enable(:wakatime_imports_mirrors)
end
teardown do
Flipper.disable(:wakatime_imports_mirrors)
end
class MockWakatimeServer
attr_reader :base_url, :port
def initialize
@requests = Queue.new
@server = TCPServer.new("0.0.0.0", 0)
@stopped = false
@clients = []
@mutex = Mutex.new
@port = @server.addr[1]
@base_url = "http://127.0.0.2:#{@port}/api/v1"
end
def start
@thread = Thread.new do
loop do
break if @stopped
socket = @server.accept
@mutex.synchronize { @clients << socket }
handle_client(socket)
rescue IOError, Errno::EBADF
break
end
end
wait_until_ready!
end
def stop
@stopped = true
@server.close unless @server.closed?
@mutex.synchronize do
@clients.each { |client| client.close unless client.closed? }
@clients.clear
end
@thread&.join(2)
end
def pop_requests
requests = []
loop do
requests << @requests.pop(true)
end
rescue ThreadError
requests
end
private
def handle_client(socket)
request_line = socket.gets
return if request_line.nil?
_method, path, = request_line.split(" ")
headers = {}
while (line = socket.gets)
break if line == "\r\n"
key, value = line.split(":", 2)
headers[key.to_s.strip.downcase] = value.to_s.strip
end
content_length = headers.fetch("content-length", "0").to_i
body = content_length.positive? ? socket.read(content_length).to_s : ""
if path == "/api/v1/users/current/heartbeats.bulk"
@requests << {
path: path,
body: body,
authorization: headers["authorization"]
}
respond(socket, 201, "{}")
elsif path == "/__health"
respond(socket, 200, "{}")
else
respond(socket, 404, "{}")
end
ensure
@mutex.synchronize { @clients.delete(socket) }
socket.close unless socket.closed?
end
def respond(socket, status, body)
phrase = status == 200 ? "OK" : status == 201 ? "Created" : "Not Found"
socket.write("HTTP/1.1 #{status} #{phrase}\r\n")
socket.write("Content-Type: application/json\r\n")
socket.write("Content-Length: #{body.bytesize}\r\n")
socket.write("Connection: close\r\n")
socket.write("\r\n")
socket.write(body)
end
def wait_until_ready!
Timeout.timeout(5) do
loop do
begin
response = Net::HTTP.get_response(URI("http://127.0.0.2:#{@port}/__health"))
return if response.is_a?(Net::HTTPSuccess)
rescue Errno::ECONNREFUSED, Errno::EHOSTUNREACH
end
sleep 0.05
end
end
end
end
def create_heartbeat(user:, source_type:, entity:, project: "mirror-project", at_time: Time.current)
user.heartbeats.create!(
entity: entity,
type: "file",
category: "coding",
time: at_time.to_f,
project: project,
source_type: source_type
)
end
test "sync sends only direct heartbeats in chunks of 25 and advances cursor" do
user = User.create!(timezone: "UTC")
mirror = user.wakatime_mirrors.create!(
endpoint_url: "https://wakatime.com/api/v1",
encrypted_api_key: "mirror-key"
)
direct_heartbeats = 30.times.map do |index|
create_heartbeat(
user: user,
source_type: :direct_entry,
entity: "src/direct_#{index}.rb",
project: "direct-project",
at_time: Time.current + index.seconds
)
end
5.times do |index|
create_heartbeat(
user: user,
source_type: :wakapi_import,
entity: "src/imported_#{index}.rb",
project: "import-project",
at_time: Time.current + (100 + index).seconds
)
end
payload_batches = []
stub_request(:post, "https://wakatime.com/api/v1/users/current/heartbeats.bulk")
.to_return do |request|
payload_batches << JSON.parse(request.body)
{ status: 201, body: "{}", headers: { "Content-Type" => "application/json" } }
end
with_development_env do
WakatimeMirrorSyncJob.perform_now(mirror.id)
end
assert_equal [ 25, 5 ], payload_batches.map(&:size)
assert_equal 30, payload_batches.flatten.size
assert payload_batches.flatten.all? { |row| row["project"] == "direct-project" }
assert_equal direct_heartbeats.last.id, mirror.reload.last_synced_heartbeat_id
end
test "sync respects last_synced_heartbeat_id cursor" do
user = User.create!(timezone: "UTC")
mirror = user.wakatime_mirrors.create!(
endpoint_url: "https://wakatime.com/api/v1",
encrypted_api_key: "mirror-key"
)
first = create_heartbeat(
user: user,
source_type: :direct_entry,
entity: "src/old.rb",
at_time: Time.current - 1.minute
)
create_heartbeat(
user: user,
source_type: :direct_entry,
entity: "src/new.rb",
at_time: Time.current
)
mirror.update!(last_synced_heartbeat_id: first.id)
payload_batches = []
stub_request(:post, "https://wakatime.com/api/v1/users/current/heartbeats.bulk")
.to_return do |request|
payload_batches << JSON.parse(request.body)
{ status: 201, body: "{}", headers: { "Content-Type" => "application/json" } }
end
with_development_env do
WakatimeMirrorSyncJob.perform_now(mirror.id)
end
assert_equal 1, payload_batches.flatten.size
assert_equal "src/new.rb", payload_batches.flatten.first["entity"]
end
test "auth failures disable mirror and stop syncing" do
user = User.create!(timezone: "UTC")
mirror = user.wakatime_mirrors.create!(
endpoint_url: "https://wakatime.com/api/v1",
encrypted_api_key: "mirror-key"
)
create_heartbeat(
user: user,
source_type: :direct_entry,
entity: "src/direct.rb"
)
stub_request(:post, "https://wakatime.com/api/v1/users/current/heartbeats.bulk")
.to_return(status: 401, body: "{}")
with_development_env do
WakatimeMirrorSyncJob.perform_now(mirror.id)
end
mirror.reload
assert_not mirror.enabled
assert_includes mirror.last_error_message, "Authentication failed"
assert mirror.last_error_at.present?
assert_equal 1, mirror.consecutive_failures
end
test "transient failures keep mirror enabled and raise for retry" do
user = User.create!(timezone: "UTC")
mirror = user.wakatime_mirrors.create!(
endpoint_url: "https://wakatime.com/api/v1",
encrypted_api_key: "mirror-key"
)
create_heartbeat(
user: user,
source_type: :direct_entry,
entity: "src/direct.rb"
)
stub_request(:post, "https://wakatime.com/api/v1/users/current/heartbeats.bulk")
.to_return(status: 500, body: "{}")
assert_raises(WakatimeMirrorSyncJob::MirrorTransientError) do
WakatimeMirrorSyncJob.new.perform(mirror.id)
end
mirror.reload
assert mirror.enabled
assert_equal 1, mirror.consecutive_failures
end
test "sync posts to a real wakatime-compatible mock server on a random port" do
WebMock.allow_net_connect!
server = MockWakatimeServer.new
server.start
user = User.create!(timezone: "UTC")
mirror = user.wakatime_mirrors.create!(
endpoint_url: "https://wakatime.com/api/v1",
encrypted_api_key: "mirror-key"
)
mirror.update_column(:endpoint_url, server.base_url)
create_heartbeat(
user: user,
source_type: :direct_entry,
entity: "src/direct_1.rb",
project: "direct-project"
)
create_heartbeat(
user: user,
source_type: :direct_entry,
entity: "src/direct_2.rb",
project: "direct-project"
)
create_heartbeat(
user: user,
source_type: :wakapi_import,
entity: "src/imported.rb",
project: "import-project"
)
with_development_env do
WakatimeMirrorSyncJob.perform_now(mirror.id)
end
requests = server.pop_requests
assert_equal 1, requests.length
assert_operator server.port, :>, 0
payload = JSON.parse(requests.first.fetch(:body))
assert_equal 2, payload.length
assert_equal [ "src/direct_1.rb", "src/direct_2.rb" ], payload.map { |row| row["entity"] }
assert_match(/\ABasic /, requests.first.fetch(:authorization).to_s)
ensure
server&.stop
WebMock.disable_net_connect!(allow_localhost: false)
end
test "does nothing when imports and mirrors are disabled" do
user = User.create!(timezone: "UTC")
mirror = user.wakatime_mirrors.create!(
endpoint_url: "https://wakatime.com/api/v1",
encrypted_api_key: "mirror-key"
)
create_heartbeat(
user: user,
source_type: :direct_entry,
entity: "src/direct.rb"
)
Flipper.disable(:wakatime_imports_mirrors)
stub_request(:post, "https://wakatime.com/api/v1/users/current/heartbeats.bulk")
.to_return(status: 201, body: "{}")
WakatimeMirrorSyncJob.perform_now(mirror.id)
assert_not_requested :post, "https://wakatime.com/api/v1/users/current/heartbeats.bulk"
end
private
def with_development_env
rails_singleton = class << Rails; self; end
rails_singleton.alias_method :__original_env_for_test, :env
rails_singleton.define_method(:env) { ActiveSupport::StringInquirer.new("development") }
yield
ensure
rails_singleton.remove_method :env
rails_singleton.alias_method :env, :__original_env_for_test
rails_singleton.remove_method :__original_env_for_test
end
end

View file

@ -0,0 +1,50 @@
require "test_helper"
class HeartbeatImportSourceTest < ActiveSupport::TestCase
test "validates one source per user" do
user = User.create!(timezone: "UTC")
HeartbeatImportSource.create!(
user: user,
provider: :wakatime_compatible,
endpoint_url: "https://wakatime.com/api/v1",
encrypted_api_key: "abc123"
)
duplicate = HeartbeatImportSource.new(
user: user,
provider: :wakatime_compatible,
endpoint_url: "https://wakapi.dev/api/compat/wakatime/v1",
encrypted_api_key: "xyz789"
)
assert_not duplicate.valid?
assert_includes duplicate.errors[:user_id], "has already been taken"
end
test "requires https endpoint outside development" do
source = HeartbeatImportSource.new(
user: User.create!(timezone: "UTC"),
provider: :wakatime_compatible,
endpoint_url: "http://example.com/api/v1",
encrypted_api_key: "abc123"
)
assert_not source.valid?
assert_includes source.errors[:endpoint_url], "must use https"
end
test "validates backfill date range order" do
source = HeartbeatImportSource.new(
user: User.create!(timezone: "UTC"),
provider: :wakatime_compatible,
endpoint_url: "https://example.com/api/v1",
encrypted_api_key: "abc123",
initial_backfill_start_date: Date.new(2026, 2, 10),
initial_backfill_end_date: Date.new(2026, 2, 1)
)
assert_not source.valid?
assert_includes source.errors[:initial_backfill_end_date], "must be on or after the start date"
end
end

View file

@ -0,0 +1,52 @@
require "test_helper"
class WakatimeMirrorTest < ActiveSupport::TestCase
def create_direct_heartbeat(user, at_time)
user.heartbeats.create!(
entity: "src/file.rb",
type: "file",
category: "coding",
time: at_time.to_f,
project: "mirror-test",
source_type: :direct_entry
)
end
test "initializes cursor at current heartbeat tip on create" do
user = User.create!(timezone: "UTC")
first = create_direct_heartbeat(user, Time.current - 5.minutes)
second = create_direct_heartbeat(user, Time.current - 1.minute)
mirror = user.wakatime_mirrors.create!(
endpoint_url: "https://wakatime.com/api/v1",
encrypted_api_key: "mirror-key"
)
assert_equal second.id, mirror.last_synced_heartbeat_id
assert_operator second.id, :>, first.id
end
test "rejects endpoints that point to hackatime host" do
user = User.create!(timezone: "UTC")
mirror = user.wakatime_mirrors.build(
endpoint_url: "https://hackatime.hackclub.com/api/v1",
encrypted_api_key: "mirror-key"
)
assert_not mirror.valid?
assert_includes mirror.errors[:endpoint_url], "cannot target this Hackatime host"
end
test "rejects app host equivalent endpoints" do
user = User.create!(timezone: "UTC")
mirror = user.wakatime_mirrors.build(
endpoint_url: "https://example.com/api/v1",
encrypted_api_key: "mirror-key"
)
mirror.request_host = "example.com"
assert_not mirror.valid?
assert_includes mirror.errors[:endpoint_url], "cannot target this Hackatime host"
end
end

View file

@ -16,44 +16,10 @@ class AdminSettingsTest < ApplicationSystemTestCase
assert_text "You are not authorized to access this page"
end
test "admin settings can add and delete mirror endpoint" do
test "admin settings page is available to admin users" do
@user.update!(admin_level: :admin)
visit my_settings_admin_path
assert_text "WakaTime Mirrors"
endpoint_url = "https://example-wakatime.invalid/api/v1"
fill_in "Endpoint URL", with: endpoint_url
fill_in "WakaTime API Key", with: "mirror-key-#{SecureRandom.hex(8)}"
assert_difference -> { @user.reload.wakatime_mirrors.count }, +1 do
click_on "Add mirror"
assert_text "WakaTime mirror added successfully"
end
visit my_settings_admin_path
assert_text endpoint_url
click_on "Delete"
within_modal do
click_on "Delete mirror"
end
assert_text "WakaTime mirror removed successfully"
assert_equal 0, @user.reload.wakatime_mirrors.count
end
test "admin settings rejects hackatime mirror endpoint" do
@user.update!(admin_level: :admin)
visit my_settings_admin_path
fill_in "Endpoint URL", with: "https://hackatime.hackclub.com/api/v1"
fill_in "WakaTime API Key", with: "mirror-key-#{SecureRandom.hex(8)}"
click_on "Add mirror"
assert_text "cannot be hackatime.hackclub.com"
assert_equal 0, @user.reload.wakatime_mirrors.count
assert_text "Mirror and import controls are available under Data settings for all users."
end
end

View file

@ -5,10 +5,15 @@ class DataSettingsTest < ApplicationSystemTestCase
include SettingsSystemTestHelpers
setup do
Flipper.enable(:wakatime_imports_mirrors)
@user = User.create!(timezone: "UTC")
sign_in_as(@user)
end
teardown do
Flipper.disable(:wakatime_imports_mirrors)
end
test "data settings page renders key sections" do
assert_settings_page(
path: my_settings_data_path,
@ -41,4 +46,63 @@ class DataSettingsTest < ApplicationSystemTestCase
assert_text "Account Scheduled for Deletion"
assert_text "I changed my mind"
end
test "regular user can add and delete mirror endpoint from data settings" do
visit my_settings_data_path
endpoint_url = "https://example-wakatime.invalid/api/v1"
fill_in "mirror_endpoint_url", with: endpoint_url
fill_in "mirror_key", with: "mirror-key-#{SecureRandom.hex(8)}"
assert_difference -> { @user.reload.wakatime_mirrors.count }, +1 do
click_on "Add mirror"
assert_text "WakaTime mirror added successfully"
end
assert_text endpoint_url
assert_difference -> { @user.reload.wakatime_mirrors.count }, -1 do
accept_confirm do
click_on "Delete mirror"
end
assert_text "WakaTime mirror removed successfully"
end
end
test "data settings rejects hackatime mirror endpoint" do
visit my_settings_data_path
fill_in "mirror_endpoint_url", with: "https://hackatime.hackclub.com/api/v1"
fill_in "mirror_key", with: "mirror-key-#{SecureRandom.hex(8)}"
click_on "Add mirror"
assert_text "cannot target this Hackatime host"
assert_equal 0, @user.reload.wakatime_mirrors.count
end
test "data settings can configure import source and show status panel" do
visit my_settings_data_path
fill_in "import_endpoint_url", with: "https://wakatime.com/api/v1"
fill_in "import_api_key", with: "import-key-#{SecureRandom.hex(8)}"
assert_difference -> { HeartbeatImportSource.count }, +1 do
click_on "Create source"
assert_text "Import source configured successfully."
end
assert_text "Status:"
assert_text "Imported:"
assert_button "Sync now"
end
test "imports and mirrors section is hidden when feature is disabled" do
Flipper.disable(:wakatime_imports_mirrors)
visit my_settings_data_path
assert_no_text "Imports & Mirrors"
assert_no_field "mirror_endpoint_url"
assert_no_field "import_endpoint_url"
end
end