Skip to content

re2: New release concurrency system #1804

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 38 commits into from
Mar 19, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
8b3551c
wip reserve concurrency system
ericallam Mar 8, 2025
16546f1
dequeue message
ericallam Mar 11, 2025
a5b2b39
ack
ericallam Mar 11, 2025
880538f
improve the dead letter queue stuff
ericallam Mar 11, 2025
555d991
fixed some key producer tests
ericallam Mar 11, 2025
1a233ef
the run engine now works with the new reserve concurrency system
ericallam Mar 12, 2025
e9fa4ce
Update delays to use a redis worker and work with the new reserve con…
ericallam Mar 12, 2025
2b6ce16
remove unused method
ericallam Mar 12, 2025
1362d82
Add batchId to the delayed enqueueRun call
ericallam Mar 12, 2025
d11d491
WIP release concurrency queue
ericallam Mar 13, 2025
c8b9972
Get all the release concurrency queue tests passing
ericallam Mar 13, 2025
bf41703
improve the consumer of the concurrency queue
ericallam Mar 13, 2025
b7ddf20
Correctly use the new release concurrency queue in the run engine
ericallam Mar 13, 2025
cce402d
If max tokens is 0, then don't do releasings
ericallam Mar 14, 2025
de9e296
Add configuration for the release concurrency queue
ericallam Mar 14, 2025
cf3b238
remove project and task current concurrency tracking
ericallam Mar 14, 2025
fd9b0bf
WIP new reacquire concurrency system
ericallam Mar 14, 2025
61c0834
Implement reserve concurrency clearing when the child run is acked
ericallam Mar 14, 2025
2048b72
go to QUEUE_EXECUTING state if reacquiring concurrency doesn't work
ericallam Mar 15, 2025
063651c
Upgrade vitest in the run-engine package
ericallam Mar 17, 2025
8c66ec3
Remove reserve concurrency system from run queue
ericallam Mar 17, 2025
7d11e82
Remove reserve concurrency system from run engine
ericallam Mar 17, 2025
1b61b95
WIP run engine systems
ericallam Mar 17, 2025
717cec8
move startRunAttempt to RunAttemptSystem
ericallam Mar 17, 2025
63b43ff
more system work
ericallam Mar 17, 2025
363f066
Delayed run system
ericallam Mar 17, 2025
a4581f1
ttl system
ericallam Mar 17, 2025
7866e95
waiting for worker system
ericallam Mar 17, 2025
67d74f0
More tests passing, fixed the heartbeat issue
ericallam Mar 17, 2025
42fb5d0
Fix more tests
ericallam Mar 17, 2025
29371e9
Implement checkpoint tests, handle dequeuing QUEUED_EXECUTING runs
ericallam Mar 18, 2025
e7c8f94
implement the QUEUED_EXECUTING dequeuing, and creating a checkpoint w…
ericallam Mar 18, 2025
b170a62
fixed the create checkpoint valid snapshot logic
ericallam Mar 18, 2025
f9c7e95
Added updated execution states chart and updated readme
ericallam Mar 18, 2025
7eaf81a
Use releaserId in case we don't end up using run IDs
ericallam Mar 18, 2025
28b3ed0
Implement release concurrency system
ericallam Mar 19, 2025
e5ea9cb
move the release concurrency queue into the release concurrency syste…
ericallam Mar 19, 2025
38e1887
Fixed failing redis worker tests
ericallam Mar 19, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .cursor/mcp.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@
"url": "http://localhost:3333/sse"
}
}
}
}
6 changes: 3 additions & 3 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -138,15 +138,15 @@
"type": "node-terminal",
"request": "launch",
"name": "Debug RunEngine tests",
"command": "pnpm run test --filter @internal/run-engine",
"cwd": "${workspaceFolder}",
"command": "pnpm run test ./src/engine/tests/releaseConcurrencyQueue.test.ts -t 'Should manage token bucket and queue correctly'",
"cwd": "${workspaceFolder}/internal-packages/run-engine",
"sourceMaps": true
},
{
"type": "node-terminal",
"request": "launch",
"name": "Debug RunQueue tests",
"command": "pnpm run test ./src/engine/tests/waitpoints.test.ts",
"command": "pnpm run test ./src/run-queue/index.test.ts",
"cwd": "${workspaceFolder}/internal-packages/run-engine",
"sourceMaps": true
}
Expand Down
7 changes: 7 additions & 0 deletions apps/webapp/app/env.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -565,6 +565,13 @@ const EnvironmentSchema = z.object({
RUN_ENGINE_RATE_LIMIT_REJECTION_LOGS_ENABLED: z.string().default("1"),
RUN_ENGINE_RATE_LIMIT_LIMITER_LOGS_ENABLED: z.string().default("0"),

RUN_ENGINE_RELEASE_CONCURRENCY_ENABLED: z.string().default("0"),
RUN_ENGINE_RELEASE_CONCURRENCY_MAX_TOKENS_RATIO: z.coerce.number().default(1),
RUN_ENGINE_RELEASE_CONCURRENCY_MAX_RETRIES: z.coerce.number().int().default(3),
RUN_ENGINE_RELEASE_CONCURRENCY_CONSUMERS_COUNT: z.coerce.number().int().default(1),
RUN_ENGINE_RELEASE_CONCURRENCY_POLL_INTERVAL: z.coerce.number().int().default(500),
RUN_ENGINE_RELEASE_CONCURRENCY_BATCH_SIZE: z.coerce.number().int().default(10),

/** How long should the presence ttl last */
DEV_PRESENCE_TTL_MS: z.coerce.number().int().default(30_000),
DEV_PRESENCE_POLL_INTERVAL_MS: z.coerce.number().int().default(5_000),
Expand Down
19 changes: 6 additions & 13 deletions apps/webapp/app/routes/admin.api.v1.environments.$environmentId.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { ActionFunctionArgs, json, LoaderFunctionArgs } from "@remix-run/server-
import { z } from "zod";
import { prisma } from "~/db.server";
import { authenticateApiRequestWithPersonalAccessToken } from "~/services/personalAccessToken.server";
import { marqs } from "~/v3/marqs/index.server";
import { engine } from "~/v3/runEngine.server";
import { updateEnvConcurrencyLimits } from "~/v3/runQueue.server";

const ParamsSchema = z.object({
Expand Down Expand Up @@ -113,20 +113,15 @@ export async function loader({ request, params }: LoaderFunctionArgs) {
Object.fromEntries(requestUrl.searchParams.entries())
);

const concurrencyLimit = await marqs.getEnvConcurrencyLimit(environment);
const currentConcurrency = await marqs.currentConcurrencyOfEnvironment(environment);
const reserveConcurrency = await marqs.reserveConcurrencyOfEnvironment(environment);
const concurrencyLimit = await engine.runQueue.getEnvConcurrencyLimit(environment);
const currentConcurrency = await engine.runQueue.currentConcurrencyOfEnvironment(environment);

if (searchParams.queue) {
const queueConcurrencyLimit = await marqs.getQueueConcurrencyLimit(
const queueConcurrencyLimit = await engine.runQueue.getQueueConcurrencyLimit(
environment,
searchParams.queue
);
const queueCurrentConcurrency = await marqs.currentConcurrencyOfQueue(
environment,
searchParams.queue
);
const queueReserveConcurrency = await marqs.reserveConcurrencyOfQueue(
const queueCurrentConcurrency = await engine.runQueue.currentConcurrencyOfQueue(
environment,
searchParams.queue
);
Expand All @@ -135,12 +130,10 @@ export async function loader({ request, params }: LoaderFunctionArgs) {
id: environment.id,
concurrencyLimit,
currentConcurrency,
reserveConcurrency,
queueConcurrencyLimit,
queueCurrentConcurrency,
queueReserveConcurrency,
});
}

return json({ id: environment.id, concurrencyLimit, currentConcurrency, reserveConcurrency });
return json({ id: environment.id, concurrencyLimit, currentConcurrency });
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,9 @@ const { action } = createActionApiRoute(
const waitResult = await engine.blockRunWithWaitpoint({
runId: run.id,
waitpoints: waitpoint.id,
environmentId: authentication.environment.id,
projectId: authentication.environment.project.id,
organizationId: authentication.environment.organization.id,
releaseConcurrency: {
releaseQueue: true,
},
releaseConcurrency: body.releaseConcurrency,
});

return json({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,10 @@ const { action } = createActionApiRoute(
throw json({ error: "Waitpoint not found" }, { status: 404 });
}

// TODO: Add releaseConcurrency from the body
const result = await engine.blockRunWithWaitpoint({
runId,
waitpoints: [waitpointId],
environmentId: authentication.environment.id,
projectId: authentication.environment.project.id,
organizationId: authentication.environment.organization.id,
});
Expand Down
21 changes: 19 additions & 2 deletions apps/webapp/app/v3/runEngine.server.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import { RunEngine } from "@internal/run-engine";
import { defaultMachine } from "@trigger.dev/platform/v3";
import { prisma } from "~/db.server";
import { env } from "~/env.server";
import { tracer } from "./tracer.server";
import { singleton } from "~/utils/singleton";
import { defaultMachine, machines } from "@trigger.dev/platform/v3";
import { allMachines } from "./machinePresets.server";
import { tracer } from "./tracer.server";

export const engine = singleton("RunEngine", createRunEngine);

Expand Down Expand Up @@ -73,6 +73,23 @@ function createRunEngine() {
EXECUTING: env.RUN_ENGINE_TIMEOUT_EXECUTING,
EXECUTING_WITH_WAITPOINTS: env.RUN_ENGINE_TIMEOUT_EXECUTING_WITH_WAITPOINTS,
},
releaseConcurrency: {
disabled: env.RUN_ENGINE_RELEASE_CONCURRENCY_ENABLED === "0",
maxTokensRatio: env.RUN_ENGINE_RELEASE_CONCURRENCY_MAX_TOKENS_RATIO,
maxRetries: env.RUN_ENGINE_RELEASE_CONCURRENCY_MAX_RETRIES,
consumersCount: env.RUN_ENGINE_RELEASE_CONCURRENCY_CONSUMERS_COUNT,
pollInterval: env.RUN_ENGINE_RELEASE_CONCURRENCY_POLL_INTERVAL,
batchSize: env.RUN_ENGINE_RELEASE_CONCURRENCY_BATCH_SIZE,
redis: {
keyPrefix: "engine:",
port: env.RUN_ENGINE_RUN_QUEUE_REDIS_PORT ?? undefined,
host: env.RUN_ENGINE_RUN_QUEUE_REDIS_HOST ?? undefined,
username: env.RUN_ENGINE_RUN_QUEUE_REDIS_USERNAME ?? undefined,
password: env.RUN_ENGINE_RUN_QUEUE_REDIS_PASSWORD ?? undefined,
enableAutoPipelining: true,
...(env.RUN_ENGINE_RUN_QUEUE_REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }),
},
},
});

return engine;
Expand Down
24 changes: 20 additions & 4 deletions apps/webapp/app/v3/services/triggerTaskV2.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ import {
packetRequiresOffloading,
QueueOptions,
SemanticInternalAttributes,
TaskRunError,
taskRunErrorEnhancer,
taskRunErrorToString,
TriggerTaskRequestBody,
} from "@trigger.dev/core/v3";
import {
Expand Down Expand Up @@ -164,10 +167,10 @@ export class TriggerTaskServiceV2 extends WithRunEngine {
index: options.batchIndex ?? 0,
}
: undefined,
environmentId: environment.id,
projectId: environment.projectId,
organizationId: environment.organizationId,
tx: this._prisma,
releaseConcurrency: body.options?.releaseConcurrency,
});
}
);
Expand Down Expand Up @@ -271,7 +274,7 @@ export class TriggerTaskServiceV2 extends WithRunEngine {
immediate: true,
},
async (event, traceContext, traceparent) => {
const run = await autoIncrementCounter.incrementInTransaction(
const result = await autoIncrementCounter.incrementInTransaction(
`v3-run:${environment.id}:${taskId}`,
async (num, tx) => {
const lockedToBackgroundWorker = body.options?.lockToVersion
Expand Down Expand Up @@ -370,11 +373,18 @@ export class TriggerTaskServiceV2 extends WithRunEngine {
: undefined,
machine: body.options?.machine,
priorityMs: body.options?.priority ? body.options.priority * 1_000 : undefined,
releaseConcurrency: body.options?.releaseConcurrency,
},
this._prisma
);

return { run: taskRun, isCached: false };
const error = taskRun.error ? TaskRunError.parse(taskRun.error) : undefined;

if (error) {
event.failWithError(error);
}

return { run: taskRun, error, isCached: false };
},
async (_, tx) => {
const counter = await tx.taskRunNumberCounter.findFirst({
Expand All @@ -390,7 +400,13 @@ export class TriggerTaskServiceV2 extends WithRunEngine {
this._prisma
);

return run;
if (result?.error) {
throw new ServiceValidationError(
taskRunErrorToString(taskRunErrorEnhancer(result.error))
);
}

return result;
}
);
} catch (error) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
-- AlterEnum
-- Adds the QUEUED_EXECUTING value to the "TaskRunExecutionStatus" enum type.
-- NOTE(review): on PostgreSQL a value added via ALTER TYPE ... ADD VALUE inside a
-- transaction cannot be used until that transaction commits, so this statement
-- presumably needs to stay alone in its own migration -- confirm.
ALTER TYPE "TaskRunExecutionStatus"
ADD
VALUE 'QUEUED_EXECUTING';
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
-- Adds a nullable "previousSnapshotId" column to "TaskRunExecutionSnapshot" and
-- recreates "SecretStore_key_idx" using the text_pattern_ops operator class.
-- NOTE(review): the SecretStore index drop/recreate looks unrelated to the new
-- snapshot column -- confirm it is intentional and not schema drift picked up by
-- the migration generator.

-- DropIndex
DROP INDEX "SecretStore_key_idx";

-- AlterTable
ALTER TABLE "TaskRunExecutionSnapshot" ADD COLUMN "previousSnapshotId" TEXT;

-- CreateIndex
CREATE INDEX "SecretStore_key_idx" ON "SecretStore"("key" text_pattern_ops);
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
-- AlterTable
-- Adds the "releaseConcurrencyOnWaitpoint" flag to "TaskQueue". The column is
-- NOT NULL with DEFAULT false, so existing rows are populated with false.
ALTER TABLE
"TaskQueue"
ADD
COLUMN "releaseConcurrencyOnWaitpoint" BOOLEAN NOT NULL DEFAULT false;
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
Warnings:

- Added the required column `organizationId` to the `TaskRunExecutionSnapshot` table without a default value. This is not possible if the table is not empty.
- Added the required column `projectId` to the `TaskRunExecutionSnapshot` table without a default value. This is not possible if the table is not empty.

*/
-- AlterTable
-- Adds required "organizationId" and "projectId" columns to the snapshot table.
-- NOTE(review): both columns are NOT NULL with no DEFAULT, so this statement will
-- fail on any database where "TaskRunExecutionSnapshot" already has rows (see the
-- warning above). Confirm the table is empty in every target environment, or
-- backfill via a nullable column first.
ALTER TABLE
"TaskRunExecutionSnapshot"
ADD
COLUMN "organizationId" TEXT NOT NULL,
ADD
COLUMN "projectId" TEXT NOT NULL;

-- AddForeignKey
-- Links each snapshot to its "Project"; ON DELETE RESTRICT blocks deleting a
-- project that still has snapshots referencing it.
ALTER TABLE
"TaskRunExecutionSnapshot"
ADD
CONSTRAINT "TaskRunExecutionSnapshot_projectId_fkey" FOREIGN KEY ("projectId") REFERENCES "Project"("id") ON DELETE RESTRICT ON UPDATE CASCADE;

-- AddForeignKey
-- Links each snapshot to its "Organization" with the same RESTRICT/CASCADE rules.
ALTER TABLE
"TaskRunExecutionSnapshot"
ADD
CONSTRAINT "TaskRunExecutionSnapshot_organizationId_fkey" FOREIGN KEY ("organizationId") REFERENCES "Organization"("id") ON DELETE RESTRICT ON UPDATE CASCADE;
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
-- AlterTable
-- Adds a nullable JSONB "metadata" column to "TaskRunExecutionSnapshot";
-- existing rows are left with NULL.
ALTER TABLE
"TaskRunExecutionSnapshot"
ADD
COLUMN "metadata" JSONB;
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
-- AlterTable
-- Adds a nullable "lockedQueueId" text column to "TaskRun"; existing rows are
-- left with NULL. No foreign key is created for it in this migration.
ALTER TABLE
"TaskRun"
ADD
COLUMN "lockedQueueId" TEXT;
23 changes: 22 additions & 1 deletion internal-packages/database/prisma/schema.prisma
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ model Organization {
organizationIntegrations OrganizationIntegration[]
workerGroups WorkerInstanceGroup[]
workerInstances WorkerInstance[]
executionSnapshots TaskRunExecutionSnapshot[]
}

model ExternalAccount {
Expand Down Expand Up @@ -504,6 +505,7 @@ model Project {
waitpoints Waitpoint[]
taskRunWaitpoints TaskRunWaitpoint[]
taskRunCheckpoints TaskRunCheckpoint[]
executionSnapshots TaskRunExecutionSnapshot[]
}

enum ProjectVersion {
Expand Down Expand Up @@ -1724,7 +1726,9 @@ model TaskRun {
projectId String

// The specific queue this run is in
queue String
queue String
// The queueId is set when the run is locked to a specific queue
lockedQueueId String?

/// The main queue that this run is part of
masterQueue String @default("main")
Expand Down Expand Up @@ -1965,6 +1969,9 @@ model TaskRunExecutionSnapshot {
isValid Boolean @default(true)
error String?

/// The previous snapshot ID
previousSnapshotId String?

/// Run
runId String
run TaskRun @relation(fields: [runId], references: [id])
Expand All @@ -1982,6 +1989,12 @@ model TaskRunExecutionSnapshot {
environment RuntimeEnvironment @relation(fields: [environmentId], references: [id])
environmentType RuntimeEnvironmentType

projectId String
project Project @relation(fields: [projectId], references: [id])

organizationId String
organization Organization @relation(fields: [organizationId], references: [id])

/// Waitpoints that have been completed for this execution
completedWaitpoints Waitpoint[] @relation("completedWaitpoints")

Expand All @@ -2003,6 +2016,9 @@ model TaskRunExecutionSnapshot {

lastHeartbeatAt DateTime?

/// Metadata used by various systems in the run engine
metadata Json?

/// Used to get the latest valid snapshot quickly
@@index([runId, isValid, createdAt(sort: Desc)])
}
Expand All @@ -2012,6 +2028,8 @@ enum TaskRunExecutionStatus {
RUN_CREATED
/// Run is in the RunQueue
QUEUED
/// Run is in the RunQueue, and is also executing. This happens when a run is continued but cannot reacquire concurrency
QUEUED_EXECUTING
/// Run has been pulled from the queue, but isn't executing yet
PENDING_EXECUTING
/// Run is executing on a worker
Expand Down Expand Up @@ -2526,6 +2544,9 @@ model TaskQueue {

paused Boolean @default(false)

/// If true, when a run is paused and waiting for waitpoints to be completed, the run will release the concurrency capacity.
releaseConcurrencyOnWaitpoint Boolean @default(false)

createdAt DateTime @default(now())
updatedAt DateTime @updatedAt

Expand Down
Loading
Loading