Activities
Activities are the side-effectful unit of work. Anything with side effects or non-deterministic results (an HTTP call, a database write, file I/O, a random number, the current time) belongs in an activity, not in the workflow function.
The shape of an activity
An activity is a Go function with this signature:
```go
type ActivityFunc func(ctx sdk.ActivityContext, input any) (any, error)
```
Register one with the worker client:
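```go
client.RegisterActivity("Process", func(ctx sdk.ActivityContext, input any) (any, error) {
	// Use comma-ok assertions so malformed input returns an error instead of panicking.
	payload, _ := input.(map[string]any)
	customerID, ok := payload["customer_id"].(string)
	if !ok {
		return nil, fmt.Errorf("customer_id missing or not a string (input was %T)", input)
	}
	if err := chargeCustomer(customerID); err != nil {
		return nil, err
	}
	return map[string]any{"status": "charged"}, nil
})
```
Workflows schedule it by its registered name: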
```go
client.RegisterWorkflow("Checkout", func(ctx sdk.WorkflowContext, input any) (any, error) {
	ctx.QueueActivity("Process", input)
	return "done", nil
})
```
Inputs and outputs are JSON-encoded on the wire, so use types that round-trip cleanly through encoding/json. Inside the activity, input holds whatever encoding/json produces when decoding into any (a map[string]any for a JSON object, float64 for every number). You therefore typically re-marshal it into a typed struct:
```go
type ChargeInput struct {
	CustomerID string  `json:"customer_id"`
	Amount     float64 `json:"amount"`
}

client.RegisterActivity("Charge", func(ctx sdk.ActivityContext, raw any) (any, error) {
	// Round-trip the generic decoded value through encoding/json to fill the typed struct.
	var in ChargeInput
	buf, err := json.Marshal(raw)
	if err != nil {
		return nil, err
	}
	if err := json.Unmarshal(buf, &in); err != nil {
		return nil, err
	}
	return chargeCard(in.CustomerID, in.Amount)
})
```
ActivityContext
The ctx passed into your function exposes two methods:
```go
type ActivityContext interface {
	Heartbeat(details []byte) (cancelRequested bool, err error)
	TaskToken() string
}
```
Heartbeat(details) tells the engine the activity is still alive. It refreshes the engine-side visibility timeout for the task so the reclaim loop does not pull it back. It also returns cancelRequested=true if someone called CancelWorkflow on the parent workflow.

TaskToken() returns the opaque token the engine uses to identify this activity attempt. Useful for log correlation; not load-bearing for control flow.
Heartbeats
Short activities (under a few seconds) do not need to heartbeat. Long activities should heartbeat periodically. This lets the engine know they are still making progress and lets it propagate cancellation back to them.
```go
client.RegisterActivity("Reindex", func(ctx sdk.ActivityContext, _ any) (any, error) {
	ticker := time.NewTicker(5 * time.Second)
	defer ticker.Stop()

	rows, err := openCursor()
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	processed := 0
	for rows.Next() {
		if err := processRow(rows); err != nil {
			return nil, err
		}
		processed++

		// Non-blocking check: heartbeat at most once per tick without stalling the loop.
		select {
		case <-ticker.C:
			details := []byte(fmt.Sprintf(`{"processed":%d}`, processed))
			cancel, err := ctx.Heartbeat(details)
			if err != nil {
				return nil, err
			}
			if cancel {
				return nil, errors.New("canceled by workflow")
			}
		default:
		}
	}
	// Assuming openCursor returns a *sql.Rows-style cursor, surface errors that ended iteration early.
	if err := rows.Err(); err != nil {
		return nil, err
	}
	return processed, nil
})
```
The details payload is opaque to the engine but visible in the dashboard, which makes it useful for debugging long-running work.
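The Reindex example builds the details payload with fmt.Sprintf. That is fine for a single integer but easy to get wrong once a field holds a string that needs JSON escaping. A minimal alternative is to marshal a small progress struct. This assumes you want the dashboard to display JSON; the engine itself accepts any bytes. reindexProgress and progressDetails are hypothetical names.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// reindexProgress is a hypothetical progress record for the Reindex activity.
// The engine treats heartbeat details as opaque bytes; JSON just keeps them readable.
type reindexProgress struct {
	Processed int    `json:"processed"`
	LastKey   string `json:"last_key,omitempty"`
}

// progressDetails encodes the progress record for use as ctx.Heartbeat(details).
func progressDetails(p reindexProgress) ([]byte, error) {
	return json.Marshal(p)
}

func main() {
	details, err := progressDetails(reindexProgress{Processed: 42, LastKey: `user "7"`})
	if err != nil {
		panic(err)
	}
	fmt.Println(string(details)) // {"processed":42,"last_key":"user \"7\""}
}
```

Pass the result to ctx.Heartbeat in place of the Sprintf call. encoding/json handles the escaping of the embedded quotes.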
Cancellation
When someone calls CancelWorkflow(workflowID), the engine marks the workflow's cancel-requested flag and appends WorkflowCancelRequested to history. The flag is checked on every heartbeat and surfaced through the cancelRequested return value.
There is no out-of-band cancel signal to the activity goroutine. Activities that should stop early on cancellation must heartbeat and check the return value. Activities that do not heartbeat run to completion regardless of cancellation; the engine just discards their result.
Retries
When an activity returns an error, the engine consults the registered RetryPolicy and either schedules a retry or marks the activity permanently failed.
The default policy applies if you do not specify one. Per-activity retry policy overrides are on the roadmap (see roadmap); today the engine uses:
```go
&engine.RetryPolicy{
	InitialInterval:    1 * time.Second,
	BackoffCoefficient: 2.0,
	MaximumInterval:    60 * time.Second,
	MaximumAttempts:    5,
}
```
The retry delay is computed by RetryPolicy.BackoffFor(attempt), and a durable timer is registered for that delay. When the timer fires, the engine re-enqueues the activity task. History records ActivityRetryScheduled for each retry. If all retries are exhausted, it records ActivityFailed for the final attempt.
A successful attempt after retries appends ActivityCompleted as if it had succeeded on the first try; from the workflow's perspective the activity simply took longer.
Idempotency
Retries mean your activity function may run more than once for the same logical task. Make it idempotent. The standard patterns:
- Conditional inserts. INSERT ... ON CONFLICT DO NOTHING with a deterministic ID.
- External idempotency keys. Many payment and email APIs accept an Idempotency-Key header; derive it from workflow_id + activity_name + attempt-independent input.
- Check-then-write. Look up whether the side effect already happened (a row, a file, a remote record). Skip if so.
The engine cannot make your os/exec calls idempotent for you. That is on you.
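Below is a minimal sketch of the idempotency-key pattern. It assumes the workflow passes its own ID into the activity input. The two-method ActivityContext above does not expose the workflow ID, and TaskToken() identifies a single attempt, so it would change across retries. idempotencyKey is a hypothetical helper. It hashes the workflow ID, activity name, and JSON-encoded input with SHA-256, so every retry of the same logical call yields the same key.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"encoding/json"
	"fmt"
)

// idempotencyKey derives a stable key from attempt-independent data only.
// encoding/json sorts map keys, so equal map inputs always encode identically.
func idempotencyKey(workflowID, activityName string, input any) (string, error) {
	payload, err := json.Marshal(input)
	if err != nil {
		return "", err
	}
	h := sha256.New()
	// Length-prefix each part so ("ab","c") and ("a","bc") cannot collide.
	for _, part := range [][]byte{[]byte(workflowID), []byte(activityName), payload} {
		fmt.Fprintf(h, "%d:", len(part))
		h.Write(part)
	}
	return hex.EncodeToString(h.Sum(nil)), nil
}

func main() {
	in := map[string]any{"customer_id": "c-123", "amount": 42.5}
	k1, _ := idempotencyKey("wf-checkout-1", "Charge", in)
	k2, _ := idempotencyKey("wf-checkout-1", "Charge", in) // a retry of the same call
	fmt.Println(k1 == k2)                                  // true
	// k1 would then be sent as the Idempotency-Key header on the payment request.
}
```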
Visibility timeout and reclaim
When a worker calls PollActivityTask or receives a task on the streaming channel, the engine records a visibility_timeout. If the worker does not call CompleteActivity (and does not heartbeat) before that timeout, the engine's reclaim loop pulls the task back and re-dispatches it to another worker.
Heartbeat resets the visibility timeout. The default initial timeout is configured engine-side; long activities that heartbeat keep extending it.
This is why a worker crash mid-activity does not lose work: the un-acked task gets reclaimed and another worker picks it up. The activity then runs again from scratch (which is why idempotency matters).
Errors vs panics
A panic inside an activity function aborts the worker goroutine but does not crash the worker process; the SDK catches it and reports the failure to the engine. The engine treats it the same as a returned error: retry per policy.
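The SDK's recovery code is not shown here, but the usual Go technique for turning a panic into an ordinary error is a deferred recover. Below is a minimal sketch. It assumes a hypothetical runActivity wrapper and a simplified activity signature without the SDK context.

```go
package main

import "fmt"

// activityFn is a simplified stand-in for the SDK's ActivityFunc (no context argument).
type activityFn func(input any) (any, error)

// runActivity calls fn and converts a panic into a returned error,
// so the caller can report it like any other failure.
func runActivity(fn activityFn, input any) (result any, err error) {
	defer func() {
		if r := recover(); r != nil {
			result = nil
			err = fmt.Errorf("activity panicked: %v", r)
		}
	}()
	return fn(input)
}

func main() {
	_, err := runActivity(func(input any) (any, error) {
		m := input.(map[string]any) // panics: input is a string, not a map
		return m, nil
	}, "not a map")
	fmt.Println(err != nil) // true
}
```

recover only catches panics on the goroutine that deferred it. If an activity starts its own goroutines, a panic inside one of them is not caught by a wrapper like this. An unrecovered panic on any goroutine terminates the process.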
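An error returned from an activity crosses the wire only as its top-level message. A wrapped error (for example, one built with fmt.Errorf("...: %w", err)) arrives as a single flattened string, and its error chain is not available to errors.Is or errors.As on the workflow side. Structured error data should go in a custom typed return that the workflow can inspect, not in the error.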
What to read next
- Workflows for how activities chain together.
- Replay model for why your workflow function re-runs but your activity does not.
- Observability for the metrics activities emit.