Add timeout to standard pilot fetch#1255
Draft
peter941221 wants to merge 3 commits into
Draft
Conversation
brandur
reviewed
May 26, 2026
brandur
reviewed
May 26, 2026
Contributor
brandur
left a comment
There was a problem hiding this comment.
Thanks!
@bgentry Any strong opinions on how you want to handle this one? Another option is to just put the timeout in producer.go in dispatchWork:
func (p *producer) dispatchWork(workCtx context.Context, count int, fetchResultCh chan<- producerFetchResult) {
// This intentionally removes any deadlines or cancellation from the parent
// context because we don't want it to get cancelled if the producer is asked
// to shut down. In that situation, we want to finish fetching any jobs we are
// in the midst of fetching, work them, and then stop. Otherwise we'd have a
// risk of shutting down when we had already fetched jobs in the database,
// leaving those jobs stranded. We'd then potentially have to release them
// back to the queue.
ctx := context.WithoutCancel(workCtx)
// Maximum size of the `attempted_by` array on each job row. This maximum is
// rarely hit, but exists to protect against degenerate cases.
const maxAttemptedBy = 100
jobs, err := p.pilot.JobGetAvailable(ctx, p.exec, p.state, &riverdriver.JobGetAvailableParams{
ClientID: p.config.ClientID,
MaxAttemptedBy: maxAttemptedBy,
MaxToLock: count,
Now: p.Time.NowOrNil(),
Queue: p.config.Queue,
ProducerID: p.id.Load(),
Schema: p.config.Schema,
})
if err != nil {
fetchResultCh <- producerFetchResult{err: err}
return
}
fetchResultCh <- producerFetchResult{jobs: jobs}
}That might be better in the way that not every pilot needs to remember to bring its own context cancellations. That said, maybe in this case we might want a longer cancellation for the pro pilot so it'd make sense to break up the two.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
Fix
StandardPilot.JobGetAvailableso a stalled fetch does not hang a producer indefinitely.Problem
producer.dispatchWorkintentionally strips cancellation from the work context before fetching jobs so an in-flight fetch is allowed to complete during shutdown:producer.go:744-766That is reasonable, but
StandardPilot.JobGetAvailableforwarded directly toexec.JobGetAvailablewith no timeout at all:rivershared/riverpilot/standard_pilot.go:18-22This meant a stalled driver call could block a standard-pilot producer forever. The pro pilot already applies per-attempt fetch timeouts, so the standard pilot was the outlier.
Change
Add a 10-second timeout inside
StandardPilot.JobGetAvailablebefore calling the driver.This keeps the existing shutdown semantics intact:
dispatchWorkThe timeout is local to the standard pilot so there is no driver SQL change and no producer state-machine change.
Testing
rivershared/riverpilot/standard_pilot_test.goMaxToLock <= 0no-op behaviorJobGetAvailablecall timing out withcontext.DeadlineExceededVerification
Locally verified with:
GOPROXY=https://goproxy.cn,direct GOSUMDB=off go test ./rivershared/riverpilot -count=1Closes #1026.