ADR - NorFab Future-Based Interactive Client¤
Status¤
Proposed.
Date¤
2026-06-06.
Decision¤
Implement a future-based client job API with blocking synchronous methods only.
The canonical non-blocking submission API will be:
future = client.submit_job(...)
Callers can then block for job events or final results:
for event in future.events():
...
result = future.result()
NFPClient.run_job() remains the stable synchronous API. Existing Python API
callers, worker built-in clients, service internals, tests, and NFCLI commands
that do not consume events can continue using run_job() for blocking job
execution.
Do not introduce new NFP protocol commands for this refactor. Use the existing command set:
NFP.POSTto submit jobs.NFP.GETto poll job status and retrieve results.NFP.RESPONSEto report accepted, pending, waiting, failed, and completed job states.NFP.EVENTto push progress, lifecycle messages, and input requests from worker to client.NFP.PUTto send job-scoped client input back to the worker.NFP.STREAMto continue serving file-transfer bytes.
Add a required event_type field to all event payloads. Keep the event type
set minimal:
progressinput_requestinput_response
Use progress for normal progress messages and job lifecycle updates such as
started, completed, failed, or stale. Use status for the exact lifecycle
state.
Context¤
The current implementation already has most pieces required for this design.
Current Client Behavior¤
norfab/core/client.py has:
- Client-side SQLite job storage.
- A receiver thread that is the only socket reader.
- A dispatcher thread that submits
NEWjobs withPOSTand polls active jobs withGET. run_job()that creates a job record and blocks by polling the client job database until the result is ready.event_queue, currently used by NFCLI shell progress rendering.
This refactor removes event_queue from the client-side event path. Job events
should be delivered to futures instead.
Current Worker Behavior¤
norfab/core/worker.py has:
- Worker-side SQLite job storage.
- A running-job registry.
- A per-job
client_input_queue. Job.wait_client_input()._put(...), which receivesNFP.PUTpayloads and places decoded JSON into the running job input queue._event(...), which sendsNFP.EVENTpayloads to the original client._get(...), which can return status102forWAITING_CLIENT_INPUT, although current code does not appear to set that status yet.
Current Broker and Protocol Behavior¤
norfab/core/broker.py is a thin ROUTER mediator:
- It routes client commands to selected workers.
- It routes worker
RESPONSE,EVENT, andSTREAMmessages back to clients. - It does not persist job state, event state, or prompt ownership.
norfab/core/NFP.py already defines PUT, EVENT, STREAM, POST, GET,
and RESPONSE. The file streaming design in
docs/development/file_streaming_fetch_file.md already treats PUT as a
job-scoped client-to-worker control channel. The changelog for 0.15.0 also
notes that PUT can be extended for user input during job execution.
Goals¤
- Add a first-class job future object.
- Keep the public API blocking and synchronous.
- Keep
run_job()working exactly as the stable synchronous API. - Remove client-side
event_queue. - Update NFCLI shells so event/progress rendering uses future event streams.
- Add a required, minimal
event_typefield to event payloads. - Use existing NFP commands only.
- Keep file streaming behavior unchanged.
- Keep broker changes minimal.
- Avoid adding a new
ClientJobCoordinatorclass unless implementation proves the existing client internals become too hard to maintain.
Non-Goals¤
- No asyncio API in this refactor.
- No full asyncio rewrite of client, broker, or worker loops.
- No new NFP command constants.
- No broker-managed prompt persistence.
- No
Job.confirm()convenience method. - No
input_request.kindfield. - No worker suspension/resume engine. A worker job waiting for user input can occupy its job thread until timeout or reply.
Proposed API¤
Future Object¤
Add an NFPJobFuture object created by NFPClient.submit_job().
Recommended shape:
class NFPJobFuture:
uuid: str
service: str
task: str
workers_requested: str | list[str]
def result(self, timeout: int | float | None = None, markdown: bool = False) -> Any: ...
def events(self, timeout: int | float | None = None): ...
def send_response(
self,
input_id: str,
value: Any = None,
*,
worker: str | None = None,
cancel: bool = False,
metadata: dict | None = None,
) -> None: ...
The future is not a local executor job. It is a handle to a remote NorFab job tracked by UUID, client job database state, and in-memory event queues.
future.events() is a blocking iterator. It should yield events for the job
until the job is terminal and all queued events have been consumed. Callers can
then call future.result() to retrieve final job results.
Client Methods¤
Add:
future = client.submit_job(
service="nornir",
task="cli",
kwargs={"commands": ["show version"]},
workers="all",
timeout=600,
)
Keep:
result = client.run_job(
service="nornir",
task="cli",
kwargs={"commands": ["show version"]},
workers="all",
timeout=600,
)
run_job() can be internally implemented through submit_job() as part of
this refactor, but its external behavior should not change.
Protocol Usage¤
No new NFP command should be added for this refactor.
Job Submission¤
Client submits jobs exactly as today:
Client -> Broker: NFP.POST
Broker -> Worker: NFP.POST
The client-side future is created before or during job DB insertion. It is then updated by the same receiver and dispatcher paths that update the SQLite job database.
Job Result Polling¤
Client polling remains:
Client -> Broker: NFP.GET
Broker -> Worker: NFP.GET
Worker -> Broker: NFP.RESPONSE
Broker -> Client: NFP.RESPONSE
Worker status values remain status-code based:
201: worker accepted/created job.202: broker dispatched job.300: worker still processing.102: worker is waiting for client input.200: worker returned result.4xxor5xx: failure.
The client should handle 102 as a non-terminal state.
When a worker returns 102, the response payload should include the current
pending input request if one exists. This is required for client restart
recovery because NFP.EVENT messages emitted while a client is offline may not
be delivered to the restarted client.
Example 102 response payload:
{
"worker": "netbox-worker-1",
"uuid": "job-uuid",
"service": "netbox",
"status": "WAITING_CLIENT_INPUT",
"input_request": {
"id": "input-uuid",
"question": "Apply 12 interface updates?",
"default": false,
"required": true,
"choices": null,
"metadata": {
"summary": {
"create": 4,
"update": 8,
"delete": 0
}
}
}
}
When the client receives a 102 payload with input_request, it should expose
that request to the matching future as an event_type="input_request" event
and store it in the client events table.
Events¤
Workers continue sending progress and messages with NFP.EVENT.
Every event must include event_type. Worker-side event creation should set
event_type="progress" automatically when the caller does not provide an event
type.
Event types:
progress: normal progress, lifecycle update, status update, or user-facing message.input_request: worker is asking the client for input.input_response: worker is reporting that input was received, cancelled, or timed out.
Existing event fields remain part of the contract:
event_typemessageseveritystatusresourcetimestamptaskextrastimeout
Clients should treat an event without event_type as malformed. Worker-side
event creation is responsible for setting it automatically.
Input Request Event¤
An input request is an ordinary NFP.EVENT with event_type="input_request"
and structured data under extras.input_request.
Example event payload:
{
"event_type": "input_request",
"uuid": "job-uuid",
"worker": "netbox-worker-1",
"service": "netbox",
"task": "sync_device_interfaces",
"message": "Apply 12 interface updates?",
"severity": "INFO",
"status": "waiting_client_input",
"resource": [],
"timeout": 120,
"extras": {
"input_request": {
"id": "input-uuid",
"question": "Apply 12 interface updates?",
"default": false,
"required": true,
"choices": null,
"metadata": {
"summary": {
"create": 4,
"update": 8,
"delete": 0
}
}
}
}
}
The client detects an input request by checking:
event.get("event_type") == "input_request"
and then reading:
input_request = event.get("extras", {}).get("input_request")
Input Reply¤
The client replies using NFP.PUT, as file streaming already does for chunk
offset requests.
Input reply payload:
{
"input_id": "input-uuid",
"value": true,
"cancel": false,
"metadata": {}
}
File streaming remains unchanged and continues using payloads such as:
{
"offset": 256000
}
Worker _put(...) can continue placing decoded PUT payloads into the running
job's client_input_queue. Job.request_input() validates the input_id and
ignores unrelated messages.
Worker API¤
Add Job.request_input(...).
answer = job.request_input(
question="Apply changes?",
default=False,
timeout=120,
metadata={"summary": summary},
)
Expected flow:
- Generate
input_id. - Set worker DB job status to
WAITING_CLIENT_INPUT. - Persist the pending input request in worker-side job state or events.
- Emit
NFP.EVENTwithevent_type="input_request". - Wait on
Job.wait_client_input(timeout=...). - Validate matching
input_id. - Clear the pending input request and restore worker DB job status to
STARTED. - Emit
NFP.EVENTwithevent_type="input_response"andstatusset toreceived,cancelled, ortimeout. - Return the reply value, return default, or raise timeout/cancel according to helper policy.
Recommended timeout behavior:
- Default behavior should raise a clear timeout exception.
- Default-on-timeout behavior should only be added if it is explicitly needed by the first interactive tasks.
Client Internals¤
Avoid adding a separate ClientJobCoordinator in this refactor. The existing
client internals can handle the new behavior with focused additions.
Required client changes:
- Add
self.job_futures: dict[str, NFPJobFuture]toNFPClient. - Add
submit_job(...)to create the job DB record and register a future. - Remove
self.event_queueand stop exposing raw event queue consumption as the NFCLI progress mechanism. - Extend
handle_event(...): - Store the event in SQLite as today.
- Notify the matching future.
- Extend
handle_response(...): - Update the job DB as today.
- Notify the matching future of status/result changes.
- Treat status
102as a non-terminal waiting-for-input state. - If a
102response containsinput_request, synthesize and store anevent_type="input_request"event for the matching future. - Extend stale-job handling in
poll_active_jobs(...)to notify the matching future. - Implement
NFPJobFuture.send_response(...)by calling the owning clientsend_to_broker(NFP.PUT, ...).
This keeps the refactor close to the current code shape and avoids introducing an abstraction before it is clearly needed.
Client Restart Recovery¤
Client restart recovery should use the existing client job database and worker job persistence.
Assumptions:
- Client job records survive process restart in
ClientJobDatabase. - Worker job records survive while the job is still pending, running, waiting for input, or completed within the worker retention window.
- Broker remains stateless. It does not persist offline client events.
Recovery behavior:
- On client startup, scan the client job database for non-terminal jobs
(
NEW,SUBMITTING,DISPATCHED,STARTED) that have not exceeded their deadline. - Recreate in-memory futures for those jobs in
self.job_futures. - Let the existing dispatcher continue polling those jobs with
NFP.GET. - If a worker is waiting for input, its
GETresponse returns102with the pendinginput_request. - Client
handle_response(...)turns that102.input_requestpayload into anevent_type="input_request"event for the recovered future. - NFCLI event rendering, now based on
future.events(), prompts the user and sends the answer withfuture.send_response(...).
Worker-side routing note:
- A restarted client reconnects with the same ZeroMQ identity, so the original
Job.client_addressremains valid for routing follow-upinput_response,progress, and completion events. - No worker-side client address refresh is required for the normal restart recovery path.
Lost event behavior:
- Progress events emitted while the client was offline may be unavailable to the restarted client unless they are retrieved from worker/client databases.
- Pending input requests must not rely only on event delivery. They must be
recoverable through
GETstatus102.
Broker Changes¤
Broker changes should be minimal.
Required:
- None for the happy path if
NFP.PUTalready routes correctly.
Recommended hardening:
- Improve logging for
PUTdispatch failures. - Return a clear client error when a
PUTtarget cannot be routed, if this can be done without changing the protocol. - Document that
PUTis a job-scoped control command used by both file streaming and interactive input.
Do not add broker prompt persistence in this refactor.
NFCLI Usage¤
Simple NFCLI commands that do not consume job events can keep using run_job():
result = NFCLIENT.run_job(
service="nornir",
task="cli",
kwargs={"commands": commands, "progress": progress},
workers=workers,
timeout=timeout,
)
All NFCLI command paths that display progress/events should move to
submit_job() and consume the future event stream. The shared client
event_queue and requeue behavior should be removed.
Synchronous shell style:
future = NFCLIENT.submit_job(
service="netbox",
task="sync_device_interfaces",
kwargs={"dry_run": False, "progress": progress},
workers=workers,
timeout=timeout,
)
for event in future.events():
if event.get("event_type") == "input_request":
input_request = event.get("extras", {}).get("input_request", {})
answer = ask_user_in_shell(input_request)
future.send_response(
input_request["id"],
answer,
worker=event.get("worker"),
)
else:
print_progress_event(event)
result = future.result()
The existing listen_events_thread() should be removed or rewritten as a
future-based helper. It should accept an NFPJobFuture and consume
future.events() instead of reading NFCLIENT.event_queue.
The @listen_events decorator and all NFCLI shell modules that rely on it
should be updated to use submit_job() and future event consumption for job
event rendering.
Worker Built-In Client Usage¤
Workers that internally call other services should keep using the synchronous client API.
Example:
result = self.client.run_job(
service="filesharing",
task="file_details",
kwargs={"url": url},
workers="all",
timeout=timeout,
)
This keeps current task implementations stable and avoids changing worker built-in client behavior.
Python API Usage¤
Existing Blocking API¤
result = client.run_job(
service="nornir",
task="cli",
kwargs={"commands": ["show version"]},
workers="all",
timeout=600,
)
Future API¤
future = client.submit_job(
service="nornir",
task="cli",
kwargs={"commands": ["show version"]},
workers="all",
timeout=600,
)
for event in future.events():
print(event["message"])
result = future.result()
Interactive Future API¤
future = client.submit_job(
service="netbox",
task="sync_device_interfaces",
kwargs={"dry_run": False},
workers="all",
timeout=600,
)
for event in future.events():
if event.get("event_type") == "input_request":
input_request = event.get("extras", {}).get("input_request", {})
answer = ask_user(input_request)
future.send_response(
input_request["id"],
answer,
worker=event.get("worker"),
)
result = future.result()
Implementation Refactoring Plan¤
Client Future Core¤
- Add
NFPJobFuture. - Add
NFPClient.submit_job(). - Add future registry by UUID.
- Remove client
event_queue. - Fan out
EVENTandRESPONSEupdates to futures from existing client handler functions. - Keep
run_job()behavior unchanged.
Interactive Input¤
- Add
Job.request_input(). - Emit input requests as normal
NFP.EVENTpayloads withevent_type="input_request"andextras.input_request. - Send replies as
NFP.PUTpayloads withinput_id. - Handle worker
102 WAITING_CLIENT_INPUTresponses as non-terminal.
NFCLI Integration¤
- Keep simple commands without event/progress rendering on
run_job(). - Move all NFCLI commands that display job events/progress to
submit_job(). - Remove shared-queue requeue behavior in
listen_events_thread()and replace it with future event consumption. - Update all NFCLI shell modules using
@listen_eventsor rawNFCLIENT.event_queueaccess.
Tests and Documentation¤
- Test existing
run_job()callers. - Test future result waiting.
- Test sync event consumption.
- Test required
event_typehandling. - Test input request, input reply, cancel, timeout, and multi-worker prompts.
- Test file streaming to ensure existing
PUToffset behavior is unchanged.
Compatibility¤
- Existing
run_job()callers continue working. - Existing worker tasks continue working.
- Existing worker built-in clients continue using
run_job(). - Existing event payload fields remain valid, with
event_typeadded as a required field. - Existing file streaming remains unchanged.
- No new NFP commands are introduced.
Risks¤
Prompt Reply Routing¤
For workers="all", more than one worker can ask for input. The client should
reply to the worker that emitted the request. future.send_response(...) should
require worker when more than one active input request exists.
Worker Thread Occupancy¤
Waiting for client input occupies a worker job thread. Use timeouts and keep interactive prompts short.
NFCLI Event Migration¤
Removing event_queue means all NFCLI progress/event rendering must be routed
through futures. The migration should update every NFCLI shell path using
listen_events_thread(), @listen_events, or raw NFCLIENT.event_queue
access.
Future Registry Cleanup¤
Completed futures should be removed from client.job_futures after final
event consumption is complete. A small retention window can help debugging,
but unbounded retention should be avoided.
Open Questions¤
- Should
submit_job()be the only new public method, or shouldrun_job(future=True)also be added as convenience? - Should
Job.request_input()return the default on timeout only when explicitly configured, or always raise on timeout? - Should broker return an explicit error response for failed
PUTdispatch, or is logging enough for this refactor?