fix: refill release concurrency token bucket queue when runs resume before checkpoints are created #1933

Merged
merged 3 commits into from
Apr 16, 2025

Conversation

ericallam
Member

@ericallam ericallam commented Apr 16, 2025

This PR fixes an issue where the release concurrency queue could get permanently stuck because runs in the EXECUTING_WITH_WAITPOINT snapshot status could be resumed before the checkpoint was created. A token from the release concurrency bucket was consumed when the run was first blocked, but it was never returned to the bucket because the checkpoint was never created. This PR now returns the token when a run goes from EXECUTING_WITH_WAITPOINT -> EXECUTING, or from EXECUTING_WITH_WAITPOINT -> QUEUED_EXECUTING -> EXECUTING, but only if the snapshot is not in the release concurrency queue.
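
For readers who want the rule above in code form, here is a minimal sketch of the two resume paths and the "not in the queue" guard. It is an in-memory illustration only, not the run engine's implementation (which stores this state in Redis); `BucketState`, `isResumeTransition`, and `refillOnResume` are hypothetical names introduced for this example.

```typescript
// Hypothetical in-memory model; all names here are illustrative assumptions.
type ExecutionStatus = "EXECUTING" | "EXECUTING_WITH_WAITPOINT" | "QUEUED_EXECUTING";

interface BucketState {
  tokens: number;
  maxTokens: number;
  // Snapshot IDs still waiting in the release concurrency queue.
  queuedSnapshotIds: Set<string>;
}

// True for the final step of either resume path named in the description:
// EXECUTING_WITH_WAITPOINT -> EXECUTING, or QUEUED_EXECUTING -> EXECUTING.
function isResumeTransition(from: ExecutionStatus, to: ExecutionStatus): boolean {
  return to === "EXECUTING" && (from === "EXECUTING_WITH_WAITPOINT" || from === "QUEUED_EXECUTING");
}

// Returns the token taken when the run was first blocked, unless the
// waitpoint snapshot is still queued (the queue will handle it in that case).
function refillOnResume(
  bucket: BucketState,
  waitpointSnapshotId: string,
  from: ExecutionStatus,
  to: ExecutionStatus
): boolean {
  if (!isResumeTransition(from, to)) return false;
  if (bucket.queuedSnapshotIds.has(waitpointSnapshotId)) return false;
  bucket.tokens = Math.min(bucket.tokens + 1, bucket.maxTokens);
  return true;
}
```

In this model, a run that consumed a token and then resumed before its checkpoint existed gets the token back on the transition into EXECUTING, so the bucket cannot drain permanently.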

Summary by CodeRabbit

  • New Features
    • Added the ability to refill concurrency tokens only if not already queued, improving concurrency management.
    • Introduced new methods for retrieving concurrency queue metrics and conditional token refilling.
    • Integrated concurrency token refilling into run continuation and dequeue processes to ensure accurate concurrency handling.
  • Bug Fixes
    • Enhanced handling of concurrency tokens during run state transitions, especially after waitpoints.
  • Tests
    • Added comprehensive tests for concurrency token refilling and queue behavior in complex scenarios.


changeset-bot bot commented Apr 16, 2025

⚠️ No Changeset found

Latest commit: 2bbdaee

Merging this PR will not cause a version bump for any packages. If these changes should not result in a new version, you're good to go. If these changes should result in a version bump, you need to add a changeset.

Click here to learn what changesets are, and how to add one.

Click here if you're a maintainer who wants to add a changeset to this PR

Contributor

coderabbitai bot commented Apr 16, 2025

Walkthrough

This change introduces enhancements to the concurrency control mechanisms in the run engine. It adds new methods to the ReleaseConcurrencyTokenBucketQueue for refilling tokens conditionally and retrieving queue metrics, and integrates these capabilities into the DequeueSystem, ReleaseConcurrencySystem, and WaitpointSystem. The systems now ensure that concurrency tokens are properly refilled when runs transition out of waitpoints or are dequeued. Additional tests validate the new logic.

Changes

| File(s) | Change Summary |
| --- | --- |
| internal-packages/run-engine/package.json | Removed trailing newline character; no functional or script modifications. |
| internal-packages/run-engine/src/engine/index.ts | Passed releaseConcurrencySystem as an option to the DequeueSystem during initialization. |
| internal-packages/run-engine/src/engine/releaseConcurrencyTokenBucketQueue.ts | Added getReleaseQueueMetrics and refillTokenIfNotInQueue methods to the class. Implemented a new Redis Lua command for conditional token refilling. Extended Redis command interface accordingly. |
| internal-packages/run-engine/src/engine/systems/dequeueSystem.ts | Integrated ReleaseConcurrencySystem into the DequeueSystem. Updated the dequeue logic to call refillTokensForSnapshot when continuing queued executions. |
| internal-packages/run-engine/src/engine/systems/releaseConcurrencySystem.ts | Added overloaded refillTokensForSnapshot method to handle both snapshot IDs and objects. Enhanced debug logging for concurrency release actions. |
| internal-packages/run-engine/src/engine/systems/waitpointSystem.ts | Added call to releaseConcurrencySystem.refillTokensForSnapshot(snapshot) inside continueRunIfUnblocked to refill concurrency tokens after reacquiring concurrency. |
| internal-packages/run-engine/src/engine/tests/releaseConcurrency.test.ts | Added two new tests: one verifying token bucket refilling after waitpoints and another covering concurrency reacquisition failure scenarios with correct token bucket behavior. |
| internal-packages/run-engine/src/engine/tests/releaseConcurrencyTokenBucketQueue.test.ts | Added tests for the new refillTokenIfNotInQueue method, covering successful refills, prevention of duplicate refills, handling of multiple queues, and enforcement of maximum token limits. |

Sequence Diagram(s)

sequenceDiagram
    participant WaitpointSystem
    participant ReleaseConcurrencySystem
    participant ReleaseConcurrencyTokenBucketQueue
    participant DequeueSystem
    participant Worker

    WaitpointSystem->>ReleaseConcurrencySystem: refillTokensForSnapshot(snapshot)
    ReleaseConcurrencySystem->>ReleaseConcurrencyTokenBucketQueue: refillTokenIfNotInQueue(...)
    ReleaseConcurrencyTokenBucketQueue-->>ReleaseConcurrencySystem: success/failure

    DequeueSystem->>ReleaseConcurrencySystem: refillTokensForSnapshot(previousSnapshotId)
    ReleaseConcurrencySystem->>ReleaseConcurrencyTokenBucketQueue: refillTokenIfNotInQueue(...)
    ReleaseConcurrencyTokenBucketQueue-->>ReleaseConcurrencySystem: success/failure

    DequeueSystem->>Worker: notify (after token refill)

Possibly related PRs

  • triggerdotdev/trigger.dev#1883: Modifies the executor's return type and behavior in ReleaseConcurrencyTokenBucketQueue and updates ReleaseConcurrencySystem to return booleans indicating success, directly affecting the same concurrency management logic.
  • triggerdotdev/trigger.dev#1804: Implements the new release concurrency system and introduces the QUEUED_EXECUTING execution status, which is extended and refined by the current changes.

Suggested reviewers

  • matt-aitken
  • nicktrn

Poem

In the warren where tokens hop and play,
We refill the bucket, keep queues at bay.
Waitpoints unblocked, concurrency anew,
The engine runs smoother, as rabbits pursue.
With every test passing, the system feels bright—
Hop, refill, and onward, through the code's gentle night!
🐇✨


📜 Recent review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 97838df and 2bbdaee.

📒 Files selected for processing (1)
  • internal-packages/run-engine/src/engine/systems/waitpointSystem.ts (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • internal-packages/run-engine/src/engine/systems/waitpointSystem.ts
⏰ Context from checks skipped due to timeout of 90000ms (2)
  • GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - pnpm)
  • GitHub Check: units / 🧪 Unit Tests


Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 0

🧹 Nitpick comments (6)
internal-packages/run-engine/src/engine/tests/releaseConcurrencyTokenBucketQueue.test.ts (4)

716-741: Consider more robust waiting to improve test reliability.
The setTimeout(100) call may cause flakiness on slower environments. Using event-based determinism or longer timeouts might reduce intermittent failures.


745-770: Revisit short timeouts to avoid flakiness.
Similar to the previous test, relying on setTimeout(100) could introduce sporadic test failures in resource-constrained CI environments.


773-805: Validate queued logic without fixed delays if feasible.
Using a fixed delay to confirm queue processing can be brittle. An alternative approach is polling for state changes or mocking the queue’s executor.
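
As one possible shape for the polling approach suggested in these three comments, the sketch below replaces a fixed `setTimeout(100)` with a helper that re-checks a condition until it holds or a deadline passes. `waitFor` and its option names are hypothetical and not part of the repository's test utilities.

```typescript
// Hypothetical test helper: polls `condition` until it returns true,
// or throws once `timeoutMs` has elapsed.
async function waitFor(
  condition: () => boolean | Promise<boolean>,
  options: { timeoutMs?: number; intervalMs?: number } = {}
): Promise<void> {
  const timeoutMs = options.timeoutMs ?? 5000;
  const intervalMs = options.intervalMs ?? 20;
  const deadline = Date.now() + timeoutMs;

  while (true) {
    if (await condition()) return;
    if (Date.now() >= deadline) {
      throw new Error(`waitFor timed out after ${timeoutMs}ms`);
    }
    // Sleep briefly before checking again.
    await new Promise<void>((resolve) => setTimeout(resolve, intervalMs));
  }
}
```

A test could then await something like `waitFor(async () => (await readTokenCount()) === expected)`, where `readTokenCount` stands in for whatever accessor the test already has. On fast machines this returns as soon as the state changes, while slow CI runners can use the whole timeout.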


807-824: Clarify the return value when max tokens are reached.
Currently, refillTokenIfNotInQueue returns true even when tokens don’t actually increase because the max threshold was reached. Consider returning false or a separate status to signal no net refill occurred.
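
A separate status could look roughly like the sketch below, which distinguishes "refilled" from "already at max" and "still in queue". This is an in-memory illustration under assumed names (`RefillResult`, `refillWithStatus`, `QueueSnapshot`); the actual method runs a Redis Lua command and returns a boolean.

```typescript
// Hypothetical result type; the real method currently returns a boolean.
type RefillResult = "refilled" | "in_queue" | "at_max";

interface QueueSnapshot {
  tokens: number;
  maxTokens: number;
  queue: string[]; // releaser IDs waiting for a token
}

function refillWithStatus(state: QueueSnapshot, releaserId: string): RefillResult {
  if (state.queue.includes(releaserId)) return "in_queue";
  if (state.tokens >= state.maxTokens) return "at_max"; // no net refill occurred
  state.tokens += 1;
  return "refilled";
}
```

Callers that only care about success can still treat `"refilled"` and `"at_max"` alike, while tests and logs can tell them apart.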

internal-packages/run-engine/src/engine/systems/releaseConcurrencySystem.ts (1)

72-116: Refill tokens logic is well-organized.
The overload approach and snapshot checks look good. You might consider returning a status (e.g., success/failure) instead of silently returning when the snapshot is missing or in an invalid state, so callers can handle it intentionally.
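
To make the suggestion concrete, here is a sketch of an overloaded method that accepts either a snapshot ID or a snapshot object and reports why it did nothing. The class, the in-memory lookup, and the choice of EXECUTING_WITH_WAITPOINT as the only valid status are assumptions for illustration; the real method is presumably asynchronous and backed by the database.

```typescript
// Hypothetical types and names; not the run engine's actual signatures.
interface SnapshotLike {
  id: string;
  executionStatus: string;
}

type RefillOutcome = "refilled" | "snapshot_not_found" | "invalid_status";

class RefillSketch {
  private readonly snapshots: Map<string, SnapshotLike>;
  readonly refilledIds: string[] = [];

  constructor(snapshots: SnapshotLike[]) {
    this.snapshots = new Map(snapshots.map((s): [string, SnapshotLike] => [s.id, s]));
  }

  // Overloads: callers may pass the ID or an already-loaded snapshot.
  refillTokensForSnapshot(snapshotId: string): RefillOutcome;
  refillTokensForSnapshot(snapshot: SnapshotLike): RefillOutcome;
  refillTokensForSnapshot(snapshotOrId: string | SnapshotLike): RefillOutcome {
    const snapshot =
      typeof snapshotOrId === "string" ? this.snapshots.get(snapshotOrId) : snapshotOrId;

    if (!snapshot) return "snapshot_not_found";
    // Assumption: only waitpoint snapshots ever consumed a token.
    if (snapshot.executionStatus !== "EXECUTING_WITH_WAITPOINT") return "invalid_status";

    this.refilledIds.push(snapshot.id); // stand-in for the real token refill
    return "refilled";
  }
}
```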

internal-packages/run-engine/src/engine/tests/releaseConcurrency.test.ts (1)

1091-1235: Comprehensive waitpoint refill test.
This test thoroughly checks that the token bucket is refilled after waitpoints complete and the run resumes. Consider verifying concurrency changes using a more event-based or shorter polling approach to reduce reliance on a fixed 1s timeout.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 1f6a283 and 0ccffed.

📒 Files selected for processing (8)
  • internal-packages/run-engine/package.json (1 hunks)
  • internal-packages/run-engine/src/engine/index.ts (1 hunks)
  • internal-packages/run-engine/src/engine/releaseConcurrencyTokenBucketQueue.ts (3 hunks)
  • internal-packages/run-engine/src/engine/systems/dequeueSystem.ts (2 hunks)
  • internal-packages/run-engine/src/engine/systems/releaseConcurrencySystem.ts (2 hunks)
  • internal-packages/run-engine/src/engine/systems/waitpointSystem.ts (2 hunks)
  • internal-packages/run-engine/src/engine/tests/releaseConcurrency.test.ts (2 hunks)
  • internal-packages/run-engine/src/engine/tests/releaseConcurrencyTokenBucketQueue.test.ts (1 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (2)
internal-packages/run-engine/src/engine/tests/releaseConcurrencyTokenBucketQueue.test.ts (1)
internal-packages/testcontainers/src/index.ts (1)
  • redisTest (128-128)
internal-packages/run-engine/src/engine/systems/dequeueSystem.ts (1)
internal-packages/run-engine/src/engine/systems/releaseConcurrencySystem.ts (1)
  • ReleaseConcurrencySystem (26-280)
⏰ Context from checks skipped due to timeout of 90000ms (3)
  • GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - pnpm)
  • GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - npm)
  • GitHub Check: typecheck / typecheck
🔇 Additional comments (15)
internal-packages/run-engine/src/engine/index.ts (1)

308-308: Add releaseConcurrencySystem to DequeueSystem

This change correctly integrates the ReleaseConcurrencySystem with the DequeueSystem, enabling concurrency token refill logic when dequeuing runs. This is crucial for the fix as it allows the DequeueSystem to refill concurrency tokens when runs transition states.

internal-packages/run-engine/src/engine/systems/waitpointSystem.ts (2)

513-516: Improved observability with debug logging

Adding this debug log statement enhances observability by tracking run state during the critical continueRunIfUnblocked operation, making it easier to diagnose concurrency-related issues.


551-551: Ensure concurrency tokens are refilled when resuming runs

This is the key fix for the issue. When a run continues after being blocked by a waitpoint, this line ensures that concurrency tokens are properly refilled before sending the notification to the worker. This prevents concurrency token leakage when runs resume before checkpoints are created.

internal-packages/run-engine/package.json (1)

39-40: Enhanced test scripts for improved development workflow

The updated test script now includes the --run flag for immediate execution, and the new test:dev script facilitates continuous testing during development. These changes support the testing of the concurrency token refill functionality introduced in this PR.

internal-packages/run-engine/src/engine/systems/dequeueSystem.ts (3)

14-14: Add import for ReleaseConcurrencySystem

Adding this import enables the DequeueSystem to utilize the ReleaseConcurrencySystem for token refill operations.


21-21: Add ReleaseConcurrencySystem dependency to DequeueSystem

These changes properly integrate the ReleaseConcurrencySystem as a dependency of the DequeueSystem by:

  1. Adding it to the DequeueSystemOptions interface
  2. Adding a private member to store the reference
  3. Assigning it in the constructor

This enables the DequeueSystem to interact with concurrency token management.

Also applies to: 28-28, 34-34


165-169: Refill concurrency tokens during execution state transitions

This is a critical part of the fix. When a run transitions from QUEUED_EXECUTING to EXECUTING, this code ensures that concurrency tokens are refilled for the previous snapshot if it exists. This prevents concurrency token leakage when runs resume before checkpoints are created.

The code correctly checks for the existence of previousSnapshotId before attempting to refill tokens, ensuring robustness.
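
The dependency wiring and the previousSnapshotId guard described in the two comments above can be sketched together as follows. This is a simplified stand-in, not the actual DequeueSystem: `ReleaseConcurrencyLike`, `DequeueSketch`, `continueQueuedExecution`, and the snapshot shape are hypothetical.

```typescript
// Hypothetical, trimmed-down interfaces for illustration only.
interface ReleaseConcurrencyLike {
  refillTokensForSnapshot(snapshotId: string): Promise<void>;
}

interface DequeueOptionsSketch {
  releaseConcurrencySystem: ReleaseConcurrencyLike;
}

interface QueuedSnapshot {
  id: string;
  executionStatus: string;
  previousSnapshotId?: string;
}

class DequeueSketch {
  private readonly releaseConcurrencySystem: ReleaseConcurrencyLike;

  constructor(options: DequeueOptionsSketch) {
    // The dependency is passed in through options and stored on the instance.
    this.releaseConcurrencySystem = options.releaseConcurrencySystem;
  }

  // Moves a QUEUED_EXECUTING run to EXECUTING, refilling the token that the
  // earlier waitpoint snapshot consumed, if such a snapshot exists.
  async continueQueuedExecution(snapshot: QueuedSnapshot): Promise<QueuedSnapshot> {
    if (snapshot.executionStatus !== "QUEUED_EXECUTING") return snapshot;

    if (snapshot.previousSnapshotId) {
      await this.releaseConcurrencySystem.refillTokensForSnapshot(snapshot.previousSnapshotId);
    }

    return { id: `${snapshot.id}_next`, executionStatus: "EXECUTING", previousSnapshotId: snapshot.id };
  }
}
```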

internal-packages/run-engine/src/engine/systems/releaseConcurrencySystem.ts (2)

135-138: Helpful debug statement.
This new log clarifies when concurrency is skipped.


144-147: Good logging for development environments.
Logging the snapshot ID offers valuable insight during debugging. No concerns here.

internal-packages/run-engine/src/engine/tests/releaseConcurrency.test.ts (2)

6-6: Import addition looks fine.
Adding EventBusEventArgs helps type the notified event properly.


1237-1505: Excellent multi-run concurrency scenario.
This adds meaningful coverage for partial concurrency reacquisition and queued re-execution. Implementation appears robust.

internal-packages/run-engine/src/engine/releaseConcurrencyTokenBucketQueue.ts (4)

262-270: Clean implementation of queue metrics retrieval.

This method provides a useful way to retrieve the current token count and queue length for a release queue, which is valuable for monitoring and diagnostics. The implementation correctly handles potential null values from Redis.
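
As an illustration of the null handling mentioned here, the sketch below normalizes raw replies into a metrics object. The function name, the nullable input types, and the caller-supplied fallback for a missing token count are assumptions; the PR's actual defaults are not shown in this conversation.

```typescript
// Hypothetical shape of the returned metrics.
interface ReleaseQueueMetrics {
  currentTokens: number;
  queueLength: number;
}

// `rawTokens` models a string-or-null reply for the bucket key;
// `rawQueueLength` models a possibly missing queue length.
function toReleaseQueueMetrics(
  rawTokens: string | null,
  rawQueueLength: number | null,
  fallbackTokens: number // assumption: the caller decides what a missing key means
): ReleaseQueueMetrics {
  const parsed = rawTokens === null ? Number.NaN : Number(rawTokens);
  return {
    currentTokens: Number.isFinite(parsed) ? parsed : fallbackTokens,
    queueLength: rawQueueLength ?? 0,
  };
}
```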


272-313: Well-designed conditional token refill mechanism.

This method implements the core functionality needed to fix the issue described in the PR title. It properly checks if the releaser is not in the queue before refilling a token, handling edge cases like zero max tokens, and providing appropriate logging. The boolean return value makes it easy for callers to determine if the refill was successful.


840-883: Solid implementation of the Lua script for conditional token refilling.

The Lua script correctly:

  1. Checks if the releaser ID is in the queue
  2. Only refills a token if the ID is not found
  3. Enforces the maximum token limit
  4. Cleans up metadata
  5. Updates the master queue based on queue length

The implementation is thorough and handles all necessary edge cases.
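
The five steps listed above can be mirrored in a small in-memory model, which may help when reasoning about the script's behavior. This is a sketch in TypeScript rather than the Lua itself; `InMemoryReleaseQueues`, its fields, and the assumption that a new bucket starts full are all hypothetical. The real command runs atomically inside Redis.

```typescript
// Hypothetical in-memory stand-in for the Redis keys the script touches.
interface ReleaseQueueState {
  tokens: number;
  queue: string[]; // releaser IDs waiting for a token
  metadata: Map<string, string>; // per-releaser metadata
}

class InMemoryReleaseQueues {
  private readonly queues = new Map<string, ReleaseQueueState>();
  // Names of release queues that still have waiting releasers.
  readonly masterQueue = new Set<string>();

  // Assumption: a bucket that has never been touched starts full.
  state(queueName: string, maxTokens: number): ReleaseQueueState {
    let s = this.queues.get(queueName);
    if (!s) {
      s = { tokens: maxTokens, queue: [], metadata: new Map<string, string>() };
      this.queues.set(queueName, s);
    }
    return s;
  }

  refillTokenIfNotInQueue(queueName: string, releaserId: string, maxTokens: number): boolean {
    const s = this.state(queueName, maxTokens);

    // 1. Check whether the releaser ID is in the queue.
    if (s.queue.includes(releaserId)) return false;

    // 2 and 3. Refill one token, never exceeding the maximum.
    s.tokens = Math.min(s.tokens + 1, maxTokens);

    // 4. Clean up metadata for this releaser.
    s.metadata.delete(releaserId);

    // 5. Keep the master queue in sync with the queue length.
    if (s.queue.length > 0) this.masterQueue.add(queueName);
    else this.masterQueue.delete(queueName);

    return true;
  }
}
```

Note that this model returns true even when the bucket is already at its maximum, matching the behavior flagged in the nitpick on the max-token test.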


941-950: Appropriate extension of the Redis command interface.

The interface extension correctly defines the parameters and return type for the new Redis command, following the same pattern as the other commands in the interface.

@ericallam ericallam merged commit e501113 into main Apr 16, 2025
12 checks passed
@ericallam ericallam deleted the ea-branch-46 branch April 16, 2025 19:19

3 participants