Callbacks are the cornerstone of asynchronous programming.
If you want to calculate 2 + 2 and show the results, the
synchronous programming approach would be:
In asynchronous programming, this task is broken apart into two discrete steps: computation and result handling. Using jobqueue, this looks like:
The asynchronous format allows parts of your code to run independently, in this case on separate R processes. When a part finishes, the callback will be executed to allow you to work with the result.
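A minimal sketch of the two styles in plain R, with no background process involved. The helper `run_then()` is hypothetical and is not part of jobqueue: it evaluates the work immediately and then hands the result to a callback, so it only illustrates how computation and result handling are separated.

```r
# Synchronous style: compute, then use the result on the next line.
result <- 2 + 2
print(result)

# Callback style: the caller supplies the result handler up front.
# run_then() is a hypothetical stand-in that runs the work right away;
# a real job queue would run it on a background process instead.
run_then <- function (work, callback) {
  callback(work())
}

seen <- NULL
run_then(
  work     = function () 2 + 2,
  callback = function (value) seen <<- value )
print(seen)
```

In both cases the value 4 is produced; the difference is that the callback version never needs the calling code to wait for, or even know, when the work finishes.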
Callbacks are not limited to just the job finishing. See
the “State Triggers” section for a list of all events that can trigger a
callback.
Important
Hooks are evaluated on the main process, not the background processes. Therefore, ensure that callback functions execute quickly so that they do not delay
job handling.
The callback function should accept one argument: the object that triggered the callback. Functions accepting zero or multiple arguments are also allowed.
This is a great place for R’s new shorthand function definitions (R
>= 4.1.0). Lambda syntax can also be used (see
rlang::as_function()).
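To see how the three notations relate, the sketch below converts a formula into an ordinary function with rlang::as_function() and calls all three forms on a stand-in object. The `fake_job` list is an assumption for illustration only; a real hook would receive a jobqueue, worker, or job object. The example needs the rlang package, and R >= 4.1.0 for the `\(x)` shorthand.

```r
library(rlang)

# A stand-in for the object passed to a hook (hypothetical).
fake_job <- list(state = 'done')

# Three equivalent ways to write the same one-argument function.
hook_full    <- function (job) { paste('job is', job$state) }
hook_short   <- \(j) paste('job is', j$state)
hook_formula <- as_function(~paste('job is', .$state))

hook_full(fake_job)
hook_short(fake_job)
hook_formula(fake_job)
```

Inside the formula, `.` refers to the first argument, which is why `.$state` reads the state of whichever object triggered the hook.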
# jobqueue hooks
hook <- function (jq) { message('jobqueue is ', jq$state) }
hook <- \(jq) message('jobqueue is ', jq$state)
hook <- ~message('jobqueue is ', .$state)
# worker hooks
hook <- function (worker) { message('worker is ', worker$state) }
hook <- \(w) message('worker is ', w$state)
hook <- ~message('worker is ', .$state)
# job hooks
hook <- function (job) { message('job is ', job$state) }
hook <- \(j) message('job is ', j$state)
hook <- ~message('job is ', .$state)

jobqueue, worker, and job
objects update their $state as described in the tables
below. Each time the state changes, any callbacks registered to that
state are executed. In addition, you can register
state = '*' or state = '.next', which trigger
regardless of the present state name.
| State | Triggers |
|---|---|
| '*' | Every time the state changes. |
| '.next' | Only one time, the next time the state changes. |
jobqueue States

| State | Triggers |
|---|---|
| 'starting' | After initialization, before workers are started. |
| 'idle' | When all workers are idle. Also, after initial startup. |
| 'busy' | At least one worker is busy. |
| 'stopped' | After <jobqueue>$stop() is called. |
worker States

| State | Triggers |
|---|---|
| 'starting' | Background process is being configured. |
| 'idle' | Waiting on jobs to be submitted. |
| 'busy' | After a job starts running. |
| 'stopped' | After <worker>$stop() is called. |
job States

| State | Triggers |
|---|---|
| 'created' | After job object initialization. |
| 'submitted' | After <job>$queue is assigned. |
| 'queued' | After stop_id and copy_id are resolved. |
| 'starting' | Before evaluation begins. |
| 'running' | After <job>$worker is set and evaluation begins. |
| 'done' | After <job>$output is assigned. |
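The difference between a named state, '*', and '.next' can be tried out with a toy object in plain R. This is a sketch of the trigger rules as described above, not jobqueue's implementation; `new_stateful()`, `set_state()`, and `record()` are hypothetical names.

```r
# Toy object that mimics the trigger rules (not jobqueue's implementation).
new_stateful <- function () {
  self  <- new.env()
  hooks <- list()   # each entry: list(state = <chr>, fn = <function>)

  self$state <- 'created'
  self$on <- function (state, fn) {
    hooks[[length(hooks) + 1]] <<- list(state = state, fn = fn)
    invisible(self)
  }
  self$set_state <- function (new_state) {
    self$state <- new_state
    pending <- hooks
    # '.next' hooks are one-shot: drop them from the registry first.
    hooks <<- Filter(function (h) h$state != '.next', hooks)
    for (h in pending)
      if (h$state %in% c('*', '.next', new_state)) h$fn(self)
    invisible(self)
  }
  self
}

# Collect a line of text each time a hook fires.
events <- character()
record <- function (label) function (obj) events <<- c(events, paste(label, obj$state))

x <- new_stateful()
x$on('*',     record('any:'))
x$on('.next', record('next:'))
x$on('done',  record('done:'))

x$set_state('running')
x$set_state('done')
events
```

The '*' hook fires on both transitions, the '.next' hook fires only on the first one, and the 'done' hook fires only when the state becomes 'done'.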
Callbacks can be attached to jobqueue,
worker, or job objects.
You can add callbacks either in the constructor, or later with
$on().
hook <- ~message(.$uid, ' is ', .$state)
jq <- jobqueue(hooks = list(q_idle = hook))
w <- worker_class$new(hooks = list(idle = hook))
j <- job_class$new(hooks = list(done = hook))
jq$on('busy', hook)
w$on('busy', hook)
j$on('starting', hook)

In jobqueue(), the hooks argument can set hooks for the
jobqueue, worker, and job. The
rules are:

- A name prefixed with q_, w_, or j_ attaches the hook to the jobqueue, worker, or job, respectively.
- A name without a prefix attaches the hook to jobs.
- Alternatively, hooks can be a nested list keyed by object type, for example list(job = list(created = hook)).

Callbacks attached in the constructor cannot be removed.
When you attach a callback with $on(), the return value
is a function, which, when called, will remove that callback from the
object.
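The "register returns a remover" pattern can be sketched with closures in plain R. `new_registry()` and `fire()` are hypothetical names used only to mirror the behaviour described above; jobqueue's own internals may differ.

```r
# Toy registry (hypothetical): on() returns a function that removes
# the callback it just registered.
new_registry <- function () {
  hooks   <- list()
  next_id <- 0
  on <- function (fn) {
    next_id <<- next_id + 1
    key <- as.character(next_id)
    hooks[[key]] <<- fn
    # The remover remembers `key` and deletes only that entry.
    function () { hooks[[key]] <<- NULL; invisible(NULL) }
  }
  fire <- function (...) {
    for (fn in hooks) fn(...)
    invisible(length(hooks))
  }
  list(on = on, fire = fire)
}

calls <- 0
reg   <- new_registry()
off   <- reg$on(function () calls <<- calls + 1)

reg$fire()   # the callback runs
off()        # remove it
reg$fire()   # nothing left to run
calls
```

Keeping the function returned by $on() is therefore the only handle needed to detach a callback later.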
job Hooks

When you create a jobqueue, you can define a set of
callbacks to automatically apply to all jobs created with
the <jobqueue>$run() command.
n <- 0
jq <- jobqueue(hooks = list(created = ~{ n <<- n + 1 } ))
for (i in 1:5) jq$run({ 'Hi' })
n
#> [1] 5

How does jobqueue() know to apply the hooks
to job objects instead of the jobqueue or
worker objects? Unless otherwise indicated,
jobqueue() hooks are assumed to be for
jobs. You can also explicitly specify that
hooks are for jobs by using the formats
described in the “Attaching Callbacks” section above.
jq <- jobqueue(hooks = list(j_created = ~{ n <<- n + 1 } ))
# or
jq <- jobqueue(hooks = list(job = list(created = ~{ n <<- n + 1 } )))

If you set hooks in <jobqueue>$run(),
those hooks will REPLACE the job hooks from
jobqueue().
Below, we’ll set up a callback function that triggers when each
job enters the 'queued' state. It will modify
the job, adding a custom <job>$priority
field to the job object. Then it will modify the
jobqueue’s internal list of jobs
(<jobqueue>$jobs), sorting them according to each
job’s <job>$priority. Last, it will
attach additional callbacks to the job to output timing
information upon exit from the 'queued' state and upon
entry into the 'done' state.
library(glue)
library(jobqueue)
# Our callback/hook function.
prioritize <- function (job) {
  queue      <- job$jobqueue
  queue_jobs <- job$jobqueue$jobs
  # Apply a random priority to this job.
  job$priority <- round(runif(1) * 10) - 5
  # Sort all this jobqueue's jobs by priority (including this job).
  priorities <- sapply(queue_jobs, `[[`, 'priority')
  job$jobqueue$jobs <- queue_jobs[order(priorities)]
  # Add hooks to this job to report queued/total times.
  t1    <- Sys.time()
  tdiff <- function () format(round(Sys.time() - t1, 1))
  job$on('.next', ~message(glue(
    'job {.$uid} (priority {.$priority}) was {.$state} after {tdiff()}' )))
  job$on('done', ~message(glue(
    'job {.$uid} (priority {.$priority}) finished in {tdiff()}' )))
}
# A single worker best illustrates processing order.
jq <- jobqueue(
  'workers' = 1,
  'hooks'   = list(queued = prioritize) )
for (i in 1:5) {
  job <- jq$run({ 3.14 })
  message(glue_data(job, 'Created job {uid} with priority {priority}'))
}
#> job J11 (priority -3) was dispatched after 0.1 secs
#> Created job J11 with priority -3
#> Created job J12 with priority -2
#> Created job J13 with priority 1
#> Created job J14 with priority 0
#> Created job J15 with priority -1
#> job J11 (priority -3) finished in 0.7 secs
#> job J12 (priority -2) was dispatched after 0.6 secs
#> job J12 (priority -2) finished in 1.1 secs
#> job J15 (priority -1) was dispatched after 0.7 secs
#> job J15 (priority -1) finished in 1.3 secs
#> job J14 (priority 0) was dispatched after 1.4 secs
#> job J14 (priority 0) finished in 2 secs
#> job J13 (priority 1) was dispatched after 2.1 secs
#> job J13 (priority 1) finished in 2.6 secs

In an actual application, you could set
<job>$priority based on
<job>$vars.
prioritize <- function (job) {
  # Give priority to lower number of replications
  job$priority <- job$vars$replications
  queue_jobs <- job$jobqueue$jobs
  priorities <- sapply(queue_jobs, `[[`, 'priority')
  job$jobqueue$jobs <- queue_jobs[order(priorities)]
}
jq <- jobqueue(hooks = list(queued = prioritize))
#                                         vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv
for (reps in 5:1) job <- jq$run({ 3.14 }, vars = list(replications = reps))

Or, define the custom <job>$priority field in
<jobqueue>$run().
prioritize <- function (job) {
  queue_jobs <- job$jobqueue$jobs
  priorities <- sapply(queue_jobs, `[[`, 'priority')
  job$jobqueue$jobs <- queue_jobs[order(priorities)]
}
jq <- jobqueue(hooks = list(queued = prioritize))
#                                         vvvvvvvvvvvvvvv
for (reps in 5:1) job <- jq$run({ 3.14 }, priority = reps)

Or, set <job>$priority in a hook that triggers
before prioritize() is triggered.
set_priority <- function (job) {
  job$priority <- job$vars$replications
}
prioritize <- function (job) {
  queue_jobs <- job$jobqueue$jobs
  priorities <- sapply(queue_jobs, `[[`, 'priority')
  job$jobqueue$jobs <- queue_jobs[order(priorities)]
}
#                           vvvvvvvvvvvvvvvvvvvvvv
jq <- jobqueue(hooks = list(created = set_priority, queued = prioritize))
for (reps in 5:1) job <- jq$run({ 3.14 }, vars = list(replications = reps))

Say you’re hosting a web service, where users are allowed to submit
one job every 30 seconds. jobs can take more
than 30 seconds, so the solution is more complex than setting
stop_id = user_id. And what if you want to stop the new
job instead of the old one?
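rate_limit <- function (job) {
  job$t_start <- Sys.time()
  for (j in job$jobqueue$jobs) {
    if (j$user_id == job$user_id) {
      # Compare in seconds; plain subtraction picks its units automatically.
      if (difftime(job$t_start, j$t_start, units = 'secs') < 30) {
        job$stop('Rate Limit Exceeded')
      }
    }
  }
}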
jq <- jobqueue(hooks = list(submitted = rate_limit))
j_A1 <- jq$run({ 42 }, user_id = 'A')
j_B1 <- jq$run({ 42 }, user_id = 'B')
j_B2 <- jq$run({ 42 }, user_id = 'B')
j_B1$result
#> [1] 42
j_B2$result
#> <interrupt: Rate Limit Exceeded>

Note that the above code won’t completely solve the rate limiting
task. If a user’s job only takes five seconds to complete,
then they could submit a job every six seconds and the
jobqueue would be none the wiser.
To give the jobqueue awareness of previously completed
jobs, you’ll need to persistently store per-user
job start times, as in the solution below.
t_user <- list()
rate_limit <- function (job) {
  t_start <- Sys.time()
  t_prev  <- t_user[[job$user_id]]   # NULL on this user's first job
  # Seconds elapsed since this user's last accepted job.
  if (!is.null(t_prev) && difftime(t_start, t_prev, units = 'secs') < 30) {
    job$stop('Rate Limit Exceeded')
  } else {
    t_user[[job$user_id]] <<- t_start
  }
}
jq <- jobqueue(hooks = list(created = rate_limit))

In the first example, we attached a hook to 'submitted'
because that’s when <job>$jobqueue becomes available
in callbacks. In the latter example, we attached a hook to
'created' instead because we didn’t need
<job>$jobqueue for that solution. Check the trigger
order listed in the “Callback Triggers” section above, and attach
callbacks as early as possible to expedite job
handling.
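The per-user bookkeeping behind the second rate limiter can be exercised without a jobqueue. The sketch below is an illustration under stated assumptions: `new_rate_limiter()` and `allow()` are hypothetical names, and the `now` argument exists only so the clock can be controlled in the example.

```r
# Hypothetical helper: returns a function that decides whether a user
# may submit now, remembering the last accepted time per user.
new_rate_limiter <- function (min_gap_secs = 30) {
  last_ok <- list()
  function (user_id, now = Sys.time()) {
    prev <- last_ok[[user_id]]   # NULL if this user has no accepted job yet
    if (!is.null(prev) &&
        as.numeric(difftime(now, prev, units = 'secs')) < min_gap_secs)
      return(FALSE)
    last_ok[[user_id]] <<- now
    TRUE
  }
}

allow <- new_rate_limiter(30)
t0    <- as.POSIXct('2024-01-01 12:00:00', tz = 'UTC')

allow('A', t0)         # first job from A: accepted
allow('B', t0 + 1)     # first job from B: accepted
allow('B', t0 + 6)     # five seconds after B's last job: rejected
allow('B', t0 + 120)   # about two minutes later: accepted
```

The elapsed time is computed with difftime(units = 'secs') because subtracting two date-times chooses its units automatically; a two-minute gap would otherwise be reported as 2 (minutes) and compare as less than 30.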