The tasking system enables our app to complete actions, i.e. tasks, asynchronously from the backend API. For example, when creating a repository, a “snapshot” task is triggered, and begins to run in parallel to the API server.
For a detailed overview of the tasking system see this video and the accompanying slide deck.
Package | Description |
---|---|
pkg/tasks | each file contains code for handling a particular task type |
pkg/tasks/queue | queue used by client and worker to schedule tasks |
pkg/tasks/client | an interface to enqueue a task |
pkg/tasks/worker | an interface to dequeue and handle tasks |
pkg/tasks/payloads | workaround to import certain payloads to dao layer, but payloads are not generally defined here |
Queue
is an interface used by the client and worker packages for scheduling tasks. It is meant to be used through client or worker, not imported independently.
https://github.com/content-services/content-sources-backend/blob/173f764d031da46665136a317caa8213e3677ad7/pkg/tasks/queue/queue.go#L28-L53
TaskClient
is an interface for enqueuing or canceling tasks.
https://github.com/content-services/content-sources-backend/blob/173f764d031da46665136a317caa8213e3677ad7/pkg/tasks/client/client.go#L12-L15
TaskWorkerPool
is an interface used by the main application to configure and start the workers and the heartbeat listener.
https://github.com/content-services/content-sources-backend/blob/173f764d031da46665136a317caa8213e3677ad7/pkg/tasks/worker/worker_pool.go#L17-L28
A worker pool will manage the individual workers. Workers are meant to be used through the TaskWorkerPool
interface, not directly.
Each worker is a goroutine that follows the logic loop below:
Tasks can be cancelled. Every worker listens on a postgres channel, named using the task ID. If that channel receives a notification from the client, the worker’s current task is cancelled.
The tasking system runs in two different processes, the API and the consumer.
The API is the main API server, where tasks are enqueued from endpoint handlers.
The consumer runs two sets of goroutines: the workers and the heartbeat listener.
To add a new task you must define a handler method. Each handler method should end with a Run()
method that performs the task. Tasks should be written to be idempotent i.e. they can be re-run without causing errors.
Here is the snapshot handler as an example:
https://github.com/content-services/content-sources-backend/blob/173f764d031da46665136a317caa8213e3677ad7/pkg/tasks/repository_snapshot.go#L27-L53
To make a task cancellable, it must be added to the list of cancellable tasks. If a cleanup action is required to support cancellation, this should be implemented as a defer call in the Run()
method. See how the snapshot task does cancellation cleanup here:
https://github.com/content-services/content-sources-backend/blob/173f764d031da46665136a317caa8213e3677ad7/pkg/tasks/repository_snapshot.go#L71-L78
Once a handler is created, it needs to be registered to the worker pool. We register our tasks here:
https://github.com/content-services/content-sources-backend/blob/173f764d031da46665136a317caa8213e3677ad7/cmd/content-sources/main.go#L109-L115
See here for a list of all current task types:
https://github.com/content-services/content-sources-backend/blob/173f764d031da46665136a317caa8213e3677ad7/pkg/config/tasks.go#L3-L10