fix(coding-agent): simplify activity sync processing

This commit is contained in:
Vegard Stikbakke
2026-06-09 10:03:50 +02:00
Unverified
parent ff6757d9e8
commit 8f787bb228
4 changed files with 132 additions and 183 deletions
@@ -28,6 +28,8 @@ export interface ActivitySyncPayload {
compressedBytes: number;
}
type EncodedActivitySyncPayload = Omit<ActivitySyncPayload, "watermark" | "contentEncoding">;
function parseIsoTime(value: string): number {
const time = new Date(value).getTime();
if (Number.isNaN(time)) throw new Error(`Invalid session analytics timestamp: ${value}`);
@@ -96,60 +98,44 @@ function getRecordGroups(records: SessionAnalyticsRecord[]): SessionAnalyticsRec
}
function getPayloadWatermark(
records: SessionAnalyticsRecord[],
isOnlyPayload: boolean,
payload: EncodedActivitySyncPayload,
isFinalPayload: boolean,
scanCutoff: string,
serverWatermark: string | null,
): string {
if (isOnlyPayload || isFinalPayload) return scanCutoff;
const maxTimestamp = getSessionAnalyticsRecordTimestamp(records[records.length - 1]);
if (serverWatermark && parseIsoTime(maxTimestamp) <= parseIsoTime(serverWatermark)) return serverWatermark;
return maxTimestamp;
if (isFinalPayload) return scanCutoff;
if (serverWatermark && parseIsoTime(payload.lastRecordTimestamp) <= parseIsoTime(serverWatermark))
return serverWatermark;
return payload.lastRecordTimestamp;
}
async function createPayload(
async function encodeActivitySyncPayload(
records: SessionAnalyticsRecord[],
watermark: string,
maxCompressedBytes: number,
maxDecompressedBytes: number,
compress: ((input: Buffer) => Promise<Buffer>) | undefined,
): Promise<ActivitySyncPayload> {
): Promise<EncodedActivitySyncPayload | undefined> {
const ndjson = serializeSessionAnalyticsNdjson(records);
if (ndjson.byteLength > maxDecompressedBytes) {
throw new Error(
`Session analytics payload exceeds decompressed size limit (${ndjson.byteLength} > ${maxDecompressedBytes} bytes)`,
);
}
if (ndjson.byteLength > maxDecompressedBytes) return undefined;
const body = await compressActivitySyncNdjson(ndjson, compress);
if (body.byteLength > maxCompressedBytes) {
throw new Error(
`Session analytics payload exceeds compressed size limit (${body.byteLength} > ${maxCompressedBytes} bytes)`,
);
}
if (body.byteLength > maxCompressedBytes) return undefined;
return {
records,
recordCount: records.length,
firstRecordTimestamp: getSessionAnalyticsRecordTimestamp(records[0]),
lastRecordTimestamp: getSessionAnalyticsRecordTimestamp(records[records.length - 1]),
watermark,
contentEncoding: ACTIVITY_SYNC_CONTENT_ENCODING,
body,
decompressedBytes: ndjson.byteLength,
compressedBytes: body.byteLength,
};
}
async function payloadFits(
records: SessionAnalyticsRecord[],
maxCompressedBytes: number,
maxDecompressedBytes: number,
compress: ((input: Buffer) => Promise<Buffer>) | undefined,
): Promise<boolean> {
const ndjson = serializeSessionAnalyticsNdjson(records);
if (ndjson.byteLength > maxDecompressedBytes) return false;
const body = await compressActivitySyncNdjson(ndjson, compress);
return body.byteLength <= maxCompressedBytes;
function withPayloadWatermark(payload: EncodedActivitySyncPayload, watermark: string): ActivitySyncPayload {
return {
...payload,
watermark,
contentEncoding: ACTIVITY_SYNC_CONTENT_ENCODING,
};
}
export async function buildActivitySyncPayloads(
@@ -159,53 +145,43 @@ export async function buildActivitySyncPayloads(
const maxCompressedBytes = options.maxCompressedBytes ?? ACTIVITY_SYNC_MAX_COMPRESSED_BYTES;
const maxDecompressedBytes = options.maxDecompressedBytes ?? ACTIVITY_SYNC_MAX_DECOMPRESSED_BYTES;
const sortedRecords = sortSessionAnalyticsRecords(options.records);
const compress = options.compress;
if (await payloadFits(sortedRecords, maxCompressedBytes, maxDecompressedBytes, options.compress)) {
return [
await createPayload(
sortedRecords,
options.scanCutoff,
maxCompressedBytes,
maxDecompressedBytes,
options.compress,
),
];
}
const singlePayload = await encodeActivitySyncPayload(
sortedRecords,
maxCompressedBytes,
maxDecompressedBytes,
compress,
);
if (singlePayload) return [withPayloadWatermark(singlePayload, options.scanCutoff)];
const batches: SessionAnalyticsRecord[][] = [];
let current: SessionAnalyticsRecord[] = [];
const batches: EncodedActivitySyncPayload[] = [];
let current: EncodedActivitySyncPayload | undefined;
for (const group of getRecordGroups(sortedRecords)) {
const candidate = [...current, ...group];
if (
current.length > 0 &&
!(await payloadFits(candidate, maxCompressedBytes, maxDecompressedBytes, options.compress))
) {
batches.push(current);
current = [];
}
const next = [...current, ...group];
if (!(await payloadFits(next, maxCompressedBytes, maxDecompressedBytes, options.compress))) {
throw new Error("Session analytics records with the same timestamp exceed the upload size limit");
const candidateRecords = current ? [...current.records, ...group] : group;
const candidate = await encodeActivitySyncPayload(
candidateRecords,
maxCompressedBytes,
maxDecompressedBytes,
compress,
);
if (candidate) {
current = candidate;
continue;
}
if (!current) throw new Error("Session analytics records with the same timestamp exceed the upload size limit");
batches.push(current);
const next = await encodeActivitySyncPayload(group, maxCompressedBytes, maxDecompressedBytes, compress);
if (!next) throw new Error("Session analytics records with the same timestamp exceed the upload size limit");
current = next;
}
if (current.length > 0) batches.push(current);
if (current) batches.push(current);
return Promise.all(
batches.map((batch, index) =>
createPayload(
batch,
getPayloadWatermark(
batch,
batches.length === 1,
index === batches.length - 1,
options.scanCutoff,
options.serverWatermark,
),
maxCompressedBytes,
maxDecompressedBytes,
options.compress,
),
return batches.map((batch, index) =>
withPayloadWatermark(
batch,
getPayloadWatermark(batch, index === batches.length - 1, options.scanCutoff, options.serverWatermark),
),
);
}
@@ -1,8 +1,6 @@
import { createReadStream } from "node:fs";
import { createInterface } from "node:readline";
import type { FileEntry, SessionEntry, SessionHeader } from "../session-manager.ts";
import { projectSessionForAnalytics, type SessionAnalyticsRecord } from "./session-analytics.ts";
import { discoverSessions, type SessionDiscoveryProgressCallback } from "./session-discovery.ts";
import { readSessionFile } from "./session-file.ts";
export interface BuildSessionAnalyticsUploadOptions {
/** Server watermark from GET /analytics/activity/:deviceId. */
@@ -21,37 +19,6 @@ export interface BuildSessionAnalyticsUploadResult {
malformedFiles: number;
}
function isRecord(value: unknown): value is Record<string, unknown> {
return typeof value === "object" && value !== null;
}
function isSessionHeader(value: unknown): value is SessionHeader {
if (!isRecord(value)) return false;
return (
value.type === "session" &&
typeof value.id === "string" &&
value.id.length > 0 &&
typeof value.timestamp === "string" &&
value.timestamp.length > 0 &&
typeof value.cwd === "string" &&
(value.version === undefined || typeof value.version === "number") &&
(value.parentSession === undefined || typeof value.parentSession === "string")
);
}
function isSessionEntry(value: unknown): value is SessionEntry {
if (!isRecord(value)) return false;
return (
typeof value.type === "string" &&
value.type !== "session" &&
typeof value.id === "string" &&
value.id.length > 0 &&
(value.parentId === null || typeof value.parentId === "string") &&
typeof value.timestamp === "string" &&
value.timestamp.length > 0
);
}
function parseIsoTime(value: string): number | undefined {
const time = new Date(value).getTime();
return Number.isNaN(time) ? undefined : time;
@@ -69,39 +36,6 @@ function recordIsBeforeScanCutoff(record: SessionAnalyticsRecord, scanCutoffTime
return recordTime !== undefined && recordTime < scanCutoffTime;
}
async function readSessionFile(path: string): Promise<{ header: SessionHeader; entries: SessionEntry[] } | undefined> {
const stream = createReadStream(path, { encoding: "utf8" });
const lines = createInterface({ input: stream, crlfDelay: Infinity });
let header: SessionHeader | undefined;
const entries: SessionEntry[] = [];
try {
for await (const line of lines) {
if (!line.trim()) continue;
let parsed: unknown;
try {
parsed = JSON.parse(line) as FileEntry;
} catch {
return undefined;
}
if (!header) {
if (!isSessionHeader(parsed)) return undefined;
header = parsed;
continue;
}
if (!isSessionEntry(parsed)) return undefined;
entries.push(parsed);
}
} finally {
lines.close();
stream.destroy();
}
return header ? { header, entries } : undefined;
}
export async function buildSessionAnalyticsUpload(
options: BuildSessionAnalyticsUploadOptions,
): Promise<BuildSessionAnalyticsUploadResult> {
@@ -1,10 +1,9 @@
import type { Dirent } from "fs";
import { createReadStream } from "fs";
import { readdir, stat } from "fs/promises";
import { basename, dirname, join, relative, resolve } from "path";
import { createInterface } from "readline";
import { getSessionsDir } from "../../config.ts";
import type { SessionHeader } from "../session-manager.ts";
import { readSessionHeader } from "./session-file.ts";
export type SessionDiscoveryPhase = "scan" | "read";
@@ -40,28 +39,6 @@ export interface DiscoveredSession {
sizeBytes: number;
}
function isRecord(value: unknown): value is Record<string, unknown> {
return typeof value === "object" && value !== null;
}
function parseSessionHeader(value: unknown): SessionHeader | undefined {
if (!isRecord(value)) return undefined;
if (value.type !== "session") return undefined;
if (typeof value.id !== "string" || !value.id) return undefined;
if (typeof value.timestamp !== "string" || !value.timestamp) return undefined;
if (typeof value.cwd !== "string") return undefined;
if (value.version !== undefined && typeof value.version !== "number") return undefined;
if (value.parentSession !== undefined && typeof value.parentSession !== "string") return undefined;
return {
type: "session",
version: value.version,
id: value.id,
timestamp: value.timestamp,
cwd: value.cwd,
parentSession: value.parentSession,
};
}
function parseDate(value: string): Date | undefined {
const date = new Date(value);
return Number.isNaN(date.getTime()) ? undefined : date;
@@ -118,27 +95,6 @@ export async function discoverSessionFiles(options: DiscoverSessionFilesOptions
return files;
}
async function readSessionHeader(filePath: string): Promise<SessionHeader | undefined> {
const stream = createReadStream(filePath, { encoding: "utf8" });
const lines = createInterface({ input: stream, crlfDelay: Infinity });
try {
for await (const line of lines) {
if (!line.trim()) return undefined;
let parsed: unknown;
try {
parsed = JSON.parse(line);
} catch {
return undefined;
}
return parseSessionHeader(parsed);
}
return undefined;
} finally {
lines.close();
stream.destroy();
}
}
async function discoverSessionFromFile(sessionsRoot: string, filePath: string): Promise<DiscoveredSession | undefined> {
const [fileStats, header] = await Promise.all([stat(filePath), readSessionHeader(filePath)]);
if (!header) return undefined;
@@ -0,0 +1,83 @@
import { createReadStream } from "node:fs";
import { createInterface } from "node:readline";
import type { SessionEntry, SessionHeader } from "../session-manager.ts";
export interface ParsedSessionFile {
header: SessionHeader;
entries: SessionEntry[];
}
function isRecord(value: unknown): value is Record<string, unknown> {
return typeof value === "object" && value !== null && !Array.isArray(value);
}
function parseSessionHeader(value: unknown): SessionHeader | undefined {
if (!isRecord(value)) return undefined;
if (value.type !== "session") return undefined;
if (typeof value.id !== "string" || !value.id) return undefined;
if (typeof value.timestamp !== "string" || !value.timestamp) return undefined;
if (typeof value.cwd !== "string") return undefined;
if (value.version !== undefined && typeof value.version !== "number") return undefined;
if (value.parentSession !== undefined && typeof value.parentSession !== "string") return undefined;
return {
type: "session",
version: value.version,
id: value.id,
timestamp: value.timestamp,
cwd: value.cwd,
parentSession: value.parentSession,
};
}
function parseSessionEntry(value: unknown): SessionEntry | undefined {
if (!isRecord(value)) return undefined;
if (typeof value.type !== "string" || value.type === "session") return undefined;
if (typeof value.id !== "string" || !value.id) return undefined;
if (value.parentId !== null && typeof value.parentId !== "string") return undefined;
if (typeof value.timestamp !== "string" || !value.timestamp) return undefined;
// Projection handles entry-type-specific fields defensively; the file boundary only requires the common session entry shape.
return value as unknown as SessionEntry;
}
async function readSessionJsonl(path: string, includeEntries: boolean): Promise<ParsedSessionFile | undefined> {
const stream = createReadStream(path, { encoding: "utf8" });
const lines = createInterface({ input: stream, crlfDelay: Infinity });
let header: SessionHeader | undefined;
const entries: SessionEntry[] = [];
try {
for await (const line of lines) {
if (!line.trim()) continue;
let parsed: unknown;
try {
parsed = JSON.parse(line) as unknown;
} catch {
return undefined;
}
if (!header) {
header = parseSessionHeader(parsed);
if (!header) return undefined;
if (!includeEntries) return { header, entries };
continue;
}
const entry = parseSessionEntry(parsed);
if (!entry) return undefined;
entries.push(entry);
}
} finally {
lines.close();
stream.destroy();
}
return header ? { header, entries } : undefined;
}
export async function readSessionHeader(path: string): Promise<SessionHeader | undefined> {
return (await readSessionJsonl(path, false))?.header;
}
export async function readSessionFile(path: string): Promise<ParsedSessionFile | undefined> {
return readSessionJsonl(path, true);
}