High-level async Python client for codex app-server.
It gives you a convenient conversation API over stdio or websocket without having to manage raw protocol events yourself.
Documentation: https://emsi.github.io/codex-app-server-sdk/
- simple one-shot turns with chat_once(...)
- step-streaming turns with chat(...) (thinking, exec, codex, etc.), non-delta
- built-in thread/turn lifecycle handling
- thread-scoped config + forking via ThreadHandle
- inactivity timeout continuation for long-running turns
- turn cancellation with unread-step/event drain via cancel(...)
- optional low-level request(...) access when needed
Install uv (if needed):
curl -LsSf https://astral.sh/uv/install.sh | sh
Install the package from PyPI:
uv add codex-app-server-sdk
Or pip-compatible install in the active environment:
uv pip install codex-app-server-sdk
- Docs site: https://emsi.github.io/codex-app-server-sdk/
- PyPI: https://pypi.org/project/codex-app-server-sdk/
import asyncio

from codex_app_server_sdk import CodexClient


async def main() -> None:
    async with CodexClient.connect_stdio() as client:
        result = await client.chat_once("Hello from Python")
        print(result.final_text)


asyncio.run(main())
By default, stdio transport runs:
- command: codex app-server
You can override via:
- connect_stdio(command=[...])
- environment variable: CODEX_APP_SERVER_CMD
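The override options above can be pictured with a small stand-alone sketch. It does not use the SDK: `resolve_stdio_command` is a hypothetical helper, and both the precedence (explicit argument, then environment variable, then default) and the shell-style splitting of `CODEX_APP_SERVER_CMD` are assumptions rather than documented behavior.

```python
import os
import shlex
from typing import Mapping, Optional, Sequence

# Default command taken from the text above.
DEFAULT_COMMAND = ["codex", "app-server"]


def resolve_stdio_command(
    command: Optional[Sequence[str]] = None,
    env: Optional[Mapping[str, str]] = None,
) -> list[str]:
    """Hypothetical helper: choose the argv used to spawn the app-server."""
    # Assumption: an explicit command wins over everything else.
    if command:
        return list(command)
    env = os.environ if env is None else env
    # Assumption: the environment variable holds a shell-style command line.
    from_env = env.get("CODEX_APP_SERVER_CMD", "").strip()
    if from_env:
        return shlex.split(from_env)
    return list(DEFAULT_COMMAND)
```

Passing `env={}` makes the helper easy to exercise without touching the real process environment.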
import asyncio

from codex_app_server_sdk import CodexClient


async def main() -> None:
    async with CodexClient.connect_websocket() as client:
        result = await client.chat_once("Hello over websocket")
        print(result.final_text)


asyncio.run(main())
Websocket defaults:
- URL: CODEX_APP_SERVER_WS_URL or ws://127.0.0.1:8765
- Bearer token: CODEX_APP_SERVER_TOKEN (optional)
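As a rough illustration of the websocket defaults above, the sketch below resolves a URL and optional bearer header from the environment. `resolve_websocket_settings` is a hypothetical helper, not part of the SDK; the precedence of explicit arguments over environment variables and the `Authorization: Bearer ...` header shape are assumptions.

```python
import os
from typing import Mapping, Optional

# Fallback URL taken from the defaults listed above.
DEFAULT_WS_URL = "ws://127.0.0.1:8765"


def resolve_websocket_settings(
    url: Optional[str] = None,
    token: Optional[str] = None,
    env: Optional[Mapping[str, str]] = None,
) -> tuple[str, dict[str, str]]:
    """Hypothetical helper: return (url, headers) for a websocket connection."""
    env = os.environ if env is None else env
    # Assumption: explicit argument, then environment variable, then default.
    resolved_url = url or env.get("CODEX_APP_SERVER_WS_URL") or DEFAULT_WS_URL
    resolved_token = token or env.get("CODEX_APP_SERVER_TOKEN")
    headers: dict[str, str] = {}
    if resolved_token:
        # The token is optional; no header is sent when it is absent.
        headers["Authorization"] = f"Bearer {resolved_token}"
    return resolved_url, headers
```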
Both high-level APIs support resuming the same running turn.
import asyncio

from codex_app_server_sdk import CodexClient, CodexTurnInactiveError


async def main() -> None:
    async with CodexClient.connect_stdio(inactivity_timeout=120.0) as client:
        continuation = None
        while True:
            try:
                if continuation is None:
                    # First attempt: start a new turn.
                    result = await client.chat_once("Do a longer task")
                else:
                    # Later attempts: resume the same running turn.
                    result = await client.chat_once(continuation=continuation)
                print(result.final_text)
                break
            except CodexTurnInactiveError as exc:
                continuation = exc.continuation
                idle = (
                    f"{exc.idle_seconds:.1f}s"
                    if exc.idle_seconds is not None
                    else "unknown"
                )
                print(
                    f"[warn] turn inactive for {idle}; resuming "
                    f"(thread_id={continuation.thread_id}, turn_id={continuation.turn_id})"
                )


asyncio.run(main())
Use explicit thread handles when you need thread-scoped configuration.
import asyncio

from codex_app_server_sdk import CodexClient, ThreadConfig, TurnOverrides


async def main() -> None:
    async with CodexClient.connect_stdio() as client:
        thread = await client.start_thread(
            ThreadConfig(
                cwd="/home/me/project",
                base_instructions="You are concise.",
                developer_instructions="Prefer rg over grep.",
                model="gpt-5",
            )
        )
        result = await thread.chat_once("Summarize the repo layout.")
        print(result.final_text)

        # Change thread defaults between messages.
        await thread.update_defaults(ThreadConfig(model="gpt-5.1-codex-mini"))

        # Fork the thread with different developer instructions.
        forked = await thread.fork(
            overrides=ThreadConfig(
                developer_instructions="Focus on tests first.",
            )
        )
        async for step in forked.chat(
            "Run a quick diagnostics pass.",
            turn_overrides=TurnOverrides(effort="low"),
        ):
            print(step.step_type, step.text)


asyncio.run(main())
- CodexClient: connection/session scope (transport, request routing, lifecycle).
- ThreadHandle + ThreadConfig: thread scope (cwd, baseInstructions, developerInstructions, model, etc.).
- TurnOverrides: per-turn scope (cwd, model, effort, summary, ...).
- UNSET (default): omit field from request payload; keep server default/current value.
- None: send JSON null explicitly (where protocol allows) to reset/clear.
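To make the omit-versus-null distinction concrete, here is a minimal stand-alone sketch of the sentinel pattern. It is not the SDK's implementation: `UNSET_SKETCH`, `ThreadConfigSketch`, and `to_payload` are hypothetical names used only for illustration, and only two fields are modeled.

```python
import json
from dataclasses import dataclass, fields
from typing import Any


class _Unset:
    """Marker type for 'field not provided' (illustrative only)."""

    def __repr__(self) -> str:
        return "UNSET"


UNSET_SKETCH = _Unset()

# Assumed mapping from Python field names to wire keys.
_WIRE_KEYS = {"model": "model", "developer_instructions": "developerInstructions"}


@dataclass
class ThreadConfigSketch:
    model: Any = UNSET_SKETCH
    developer_instructions: Any = UNSET_SKETCH

    def to_payload(self) -> dict[str, Any]:
        payload: dict[str, Any] = {}
        for field in fields(self):
            value = getattr(self, field.name)
            if value is UNSET_SKETCH:
                continue  # omit the key entirely
            payload[_WIRE_KEYS[field.name]] = value  # None becomes JSON null
        return payload

    def to_json(self) -> str:
        return json.dumps(self.to_payload())
```

With this sketch, `ThreadConfigSketch(developer_instructions=None).to_json()` yields `{"developerInstructions": null}`, while the untouched `model` key never appears in the payload.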
Example:
from codex_app_server_sdk import ThreadConfig, UNSET

cfg = ThreadConfig(
    model=UNSET,  # omit key
    developer_instructions=None,  # send explicit null
)
When resuming with continuation=..., do not pass extra turn-start arguments in
that same call. Specifically, do not pass: text, thread_id, user,
metadata, thread_config, or turn_overrides.
Apply thread changes via thread.update_defaults(...) or start a new/forked thread before continuing with a new turn.
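The continuation rule above amounts to a simple argument check. The sketch below shows one way such a guard could look; `check_continuation_args` is a hypothetical helper, and raising `ValueError` is an assumption (the SDK may report the problem differently).

```python
from typing import Any, Optional

# Turn-start arguments that the text says must not accompany continuation=...
_TURN_START_ARGS = (
    "text",
    "thread_id",
    "user",
    "metadata",
    "thread_config",
    "turn_overrides",
)


def check_continuation_args(
    continuation: Optional[object] = None, **turn_args: Any
) -> None:
    """Hypothetical guard: reject turn-start arguments when resuming a turn."""
    if continuation is None:
        return  # a fresh turn may use any of the arguments
    extra = [name for name in _TURN_START_ARGS if turn_args.get(name) is not None]
    if extra:
        raise ValueError(
            "continuation cannot be combined with: " + ", ".join(extra)
        )
```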
More complete examples are under examples/.
All thread_* examples print lifecycle progress checkpoints by default so long operations are visible.
Use --quiet on those scripts for minimal output.
Recommended example for step-oriented API and continuation behavior.
Stdio:
uv run python examples/chat_steps_rich.py
Websocket:
uv run python examples/chat_steps_rich.py --transport websocket --url ws://127.0.0.1:8765
With extra payload summaries:
uv run python examples/chat_steps_rich.py --show-data
Cancel timed-out turns instead of auto-resume:
uv run python examples/chat_steps_rich.py --cancel-on-timeout
Common options:
- --transport {stdio,websocket}
- --cmd "codex app-server" (stdio mode)
- --url ws://127.0.0.1:8765 (websocket mode)
- --token "$CODEX_APP_SERVER_TOKEN" (websocket mode)
- --prompt "..."
- --user "..."
- --inactivity-timeout 120
- --show-data
- --cancel-on-timeout
uv run python examples/thread_config_and_fork.py \
--transport stdio \
--cwd . \
--base-instructions "Be concise." \
--developer-instructions "Prioritize correctness."
Websocket:
uv run python examples/thread_config_and_fork.py \
--transport websocket \
--url ws://127.0.0.1:8765
Quiet mode:
uv run python examples/thread_config_and_fork.py --quiet
uv run python examples/thread_resume_by_id.py \
--transport stdio \
--thread-id <existing-thread-id> \
--prompt "Continue the previous conversation."
Quiet mode:
uv run python examples/thread_resume_by_id.py --thread-id <existing-thread-id> --quiet
This example starts two new threads and runs turns concurrently on those fresh
ThreadHandles over one shared client connection (it does not call
thread/resume for the newly started threads).
uv run python examples/thread_concurrent_handles.py --transport stdio
Quiet mode:
uv run python examples/thread_concurrent_handles.py --quiet
This example uses the newly exposed helper APIs:
- thread/read, thread/list, thread/name/set, thread/archive
- model/list
- config/read
- endpoint-aware summaries with explicit <not-provided>/null values
- optional thread model update reporting with --set-model
- config/read prints origin_entries: count of config keys that include provenance metadata (which layer/file provided that effective value)
uv run python examples/thread_ops_showcase.py \
--transport stdio \
--prompt "Give a 3-bullet summary." \
--thread-name "showcase-thread"
Websocket:
uv run python examples/thread_ops_showcase.py \
--transport websocket \
--url ws://127.0.0.1:8765
Show model update intent and before/after thread snapshot model visibility:
uv run python examples/thread_ops_showcase.py --set-model gpt-5.3-codex
With raw payload dumps:
uv run python examples/thread_ops_showcase.py --show-data
Quiet mode:
uv run python examples/thread_ops_showcase.py --quiet
uv run python examples/chat_session_stdio.py
Custom command and prompts:
uv run python examples/chat_session_stdio.py \
--cmd "codex app-server" \
--prompt "First prompt" \
--prompt "Second prompt"
uv run python examples/chat_session_websocket.py
With explicit endpoint/token:
uv run python examples/chat_session_websocket.py \
--url ws://127.0.0.1:8765 \
--token "$CODEX_APP_SERVER_TOKEN"
Or via environment:
export CODEX_APP_SERVER_WS_URL=ws://127.0.0.1:8765
export CODEX_APP_SERVER_TOKEN=your-token
uv run python examples/chat_session_websocket.py
CodexClient methods:
- connect_stdio(...): create a stdio-configured client (unstarted).
- connect_websocket(...): create a websocket-configured client (unstarted).
- start(): connect transport and start receive loop (idempotent).
- initialize(params=None, timeout=None): perform JSON-RPC initialize handshake with default-merged params (protocolVersion, clientInfo, capabilities) and return normalized InitializeResult.
- request(method, params=None, timeout=None): low-level JSON-RPC request helper.
- start_thread(config=None): create thread and return ThreadHandle.
- resume_thread(thread_id, overrides=None): resume thread and return ThreadHandle.
- fork_thread(thread_id, overrides=None): fork thread and return ThreadHandle.
- set_thread_defaults(thread_id, overrides): apply thread-level overrides via thread/resume.
- read_thread(thread_id, include_turns=True): read one thread.
- list_threads(...): list threads with optional filters.
- set_thread_name(thread_id, name): rename thread.
- archive_thread(thread_id) / unarchive_thread(thread_id): archive lifecycle controls.
- rollback_thread(thread_id, num_turns=...): drop recent turns from thread history.
- compact_thread(thread_id): request context compaction.
- chat(text=None, thread_id=None, user=None, metadata=None, thread_config=None, turn_overrides=None, inactivity_timeout=None, continuation=None): async iterator yielding completed non-delta step blocks.
- chat_once(text=None, thread_id=None, user=None, metadata=None, thread_config=None, turn_overrides=None, inactivity_timeout=None, continuation=None): send one user message and wait for completed turn.
- cancel(continuation, timeout=None): interrupt running turn, return unread steps/events, and clean turn state.
- steer_turn(thread_id=..., expected_turn_id=..., input_items=...): steer active turn input.
- start_review(thread_id=..., target=..., delivery=None): run review mode.
- list_models(...): discover available models.
- exec_command(command, ...): run one command via server command API.
- read_config(...), read_config_requirements(), write_config_value(...), batch_write_config(...): config APIs.
- interrupt_turn(turn_id, timeout=None): low-level turn interruption request.
- close(): cancel receive loop and close transport.
Transports:
- Transport.connect/send/recv/close: abstract interface.
- StdioTransport: line-delimited JSON over subprocess stdin/stdout.
- WebSocketTransport: JSON messages over websocket frames.
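For readers unfamiliar with line-delimited JSON, the framing used by the stdio transport can be sketched as follows. This is a generic illustration of the technique, not the SDK's `StdioTransport`; `encode_message` and `decode_stream` are hypothetical names.

```python
import json
from typing import Any, Iterable, Iterator


def encode_message(message: dict[str, Any]) -> bytes:
    """Serialize one message as a single newline-terminated JSON line."""
    # json.dumps escapes newlines inside strings, so each message stays on one line.
    return json.dumps(message, separators=(",", ":")).encode("utf-8") + b"\n"


def decode_stream(chunks: Iterable[bytes]) -> Iterator[dict[str, Any]]:
    """Reassemble complete JSON lines from arbitrarily split byte chunks."""
    buffer = b""
    for chunk in chunks:
        buffer += chunk
        while b"\n" in buffer:
            line, buffer = buffer.split(b"\n", 1)
            if line.strip():  # skip blank lines
                yield json.loads(line)
```

Because a pipe may deliver a message in several pieces, the decoder buffers bytes until a newline arrives before parsing.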
Types:
- InitializeResult: parsed initialize response (protocol_version, server_info, capabilities, raw).
- ConversationStep: completed step from chat(...) (step_type, item_type, text, item_id, thread_id, turn_id, data).
- ChatResult: buffered turn output (thread_id, turn_id, final_text, raw_events, assistant_item_id, completion_source).
- ChatContinuation: continuation token for timed-out running turns (thread_id, turn_id, cursor, mode).
- CancelResult: cancellation result with unread steps/raw_events plus terminal flags.
- ThreadConfig: thread-level config for thread/start, thread/resume, thread/fork (cwd, base_instructions, developer_instructions, model, ...).
- TurnOverrides: per-turn overrides forwarded to turn/start (cwd, model, effort, ...).
- UNSET: sentinel for “omit this field from request payload.”
- ApprovalPolicy: literal type for approval policy values (untrusted, on-failure, on-request, never).
ThreadHandle members:
- thread_id: bound thread id.
- defaults: local thread config snapshot.
- chat_once(...): convenience one-turn call bound to this thread.
- chat(...): step-streaming call bound to this thread.
- update_defaults(overrides): apply thread defaults between messages.
- fork(overrides=None): fork thread and get a new handle.
- read(include_turns=True): low-level thread/read helper.
- set_name(name), archive(), unarchive(), rollback(num_turns), compact(): thread lifecycle/history helpers.
- start_review(target, delivery=None): thread-bound review API.
Exceptions:
- CodexError: base exception.
- CodexTransportError: transport/connectivity problems.
- CodexTimeoutError: request timeout (and base for timeout-related flow).
- CodexTurnInactiveError: per-turn inactivity timeout with resumable continuation.
- CodexProtocolError: protocol/JSON-RPC error (optional code and data).
- This version does not expose token-delta streaming as a public API.
- chat(...) provides async streaming of completed step blocks (non-delta) from live item/completed notifications only.
- chat(...) intentionally does not merge thread/read snapshot items for the same turn, avoiding duplicate blocks when snapshot item IDs differ from live event item IDs.
- chat_once(...) resolves final text from completed agentMessage items (item/completed), with thread/read(includeTurns=true) fallback.
- turn_timeout is intentionally removed to avoid conflicting timeout semantics.
- Turn waits are controlled by inactivity_timeout (or unbounded when None).
- cancel(...) interrupts a continuation turn, returns unread buffered data, and cleans internal session state so the same thread can be reused safely.
- Advanced thread-level config/fork uses protocol v2 methods (thread/start, thread/resume, thread/fork) exposed via ThreadHandle and ThreadConfig.
- metadata is applied on turn/start payloads for message turns; thread-level config uses schema-aligned fields on thread methods.
- Preferred lifecycle is async with CodexClient.connect_*() as client:; manual start()/close() remains available for advanced control.
- The client uses modern thread/turn methods (thread/start, thread/resume, turn/start, turn/interrupt).
- initialize currently sends protocolVersion: "1" as handshake metadata.
- Websocket transport targets websockets (>=16,<17), uses additional_headers, and disables compression by default (compression=None) for codex app-server compatibility.
- After dependency changes, run uv sync to refresh the virtual environment.
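The difference between an inactivity timeout and a total turn timeout, mentioned in the notes above, can be illustrated with plain asyncio. This sketch does not use the SDK: the queue, the `None` end-of-turn marker, and the function names are all hypothetical. The timer restarts on every event, so a turn can run longer than the timeout as long as events keep arriving.

```python
import asyncio
from typing import Optional, Sequence


async def collect_until_done(
    queue: asyncio.Queue, inactivity_timeout: Optional[float]
) -> list:
    """Drain events until the end marker; fail only if the stream goes quiet."""
    events = []
    while True:
        # Each wait gets a fresh timeout; timeout=None waits without a bound.
        event = await asyncio.wait_for(queue.get(), timeout=inactivity_timeout)
        if event is None:  # hypothetical end-of-turn marker
            return events
        events.append(event)


async def produce(queue: asyncio.Queue, gaps: Sequence[float]) -> None:
    """Emit one event after each gap (seconds), then the end marker."""
    for index, gap in enumerate(gaps):
        await asyncio.sleep(gap)
        await queue.put(f"step-{index}")
    await queue.put(None)


async def run_turn(gaps: Sequence[float], inactivity_timeout: Optional[float]) -> list:
    queue: asyncio.Queue = asyncio.Queue()
    producer = asyncio.create_task(produce(queue, gaps))
    try:
        return await collect_until_done(queue, inactivity_timeout)
    finally:
        producer.cancel()  # stop the producer if the consumer gave up
```

For example, `asyncio.run(run_turn([0.02] * 15, inactivity_timeout=0.2))` completes even though the whole turn takes longer than 0.2 seconds, while a single gap longer than the timeout raises `asyncio.TimeoutError`.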
initialize() performs the protocol handshake and returns InitializeResult.
- chat_once(...) and chat(...) call initialize() automatically on first use.
- call initialize() explicitly when you want to fail fast before first turn, inspect server metadata, or send custom init params.
When params=None, the client sends:
{
  "protocolVersion": "1",
  "clientInfo": {
    "name": "codex-app-server-sdk",
    "version": "0.1.0"
  },
  "capabilities": {
    "optOutNotificationMethods": [
      "codex/event/agent_message_content_delta",
      "codex/event/reasoning_content_delta",
      "codex/event/item_started",
      "codex/event/item_completed",
      "codex/event/task_started",
      "codex/event/task_complete"
    ]
  }
}
Supported/customizable keys:
- protocolVersion: str
- clientInfo: dict (commonly name, version, plus optional extra fields)
- capabilities: dict
- capabilities.optOutNotificationMethods: list[str]
- any additional top-level keys are passed through unchanged
Merge rules:
- the payload starts from the default block above;
- caller params are shallow-merged at top level;
- if caller provides capabilities as a dict and omits optOutNotificationMethods, defaults are auto-injected;
- if caller provides capabilities.optOutNotificationMethods, caller value is preserved;
- if caller sets capabilities to None or a non-dict value, no injection is applied.
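The merge rules above can be written out as a small pure-Python function. This is a sketch derived from the listed rules, not the SDK's code; `merge_initialize_params` and `default_initialize_params` are hypothetical names. Note that under a top-level shallow merge, a caller-supplied `clientInfo` replaces the default one wholesale.

```python
import copy
from typing import Any, Mapping, Optional

DEFAULT_OPT_OUT = [
    "codex/event/agent_message_content_delta",
    "codex/event/reasoning_content_delta",
    "codex/event/item_started",
    "codex/event/item_completed",
    "codex/event/task_started",
    "codex/event/task_complete",
]


def default_initialize_params() -> dict[str, Any]:
    """Default payload as documented for params=None."""
    return {
        "protocolVersion": "1",
        "clientInfo": {"name": "codex-app-server-sdk", "version": "0.1.0"},
        "capabilities": {"optOutNotificationMethods": list(DEFAULT_OPT_OUT)},
    }


def merge_initialize_params(
    params: Optional[Mapping[str, Any]] = None,
) -> dict[str, Any]:
    """Apply the documented merge rules to caller-supplied params."""
    payload = default_initialize_params()
    if params is None:
        return payload
    # Shallow merge: each caller top-level key replaces the default value.
    payload.update(copy.deepcopy(dict(params)))
    capabilities = payload.get("capabilities")
    # Inject defaults only for a dict that omits the opt-out list.
    if isinstance(capabilities, dict) and "optOutNotificationMethods" not in capabilities:
        capabilities["optOutNotificationMethods"] = list(DEFAULT_OPT_OUT)
    return payload
```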
InitializeResult fields:
- protocol_version: extracted from protocolVersion or protocol_version in server result
- server_info: extracted from serverInfo or server_info
- capabilities: extracted from capabilities
- raw: full raw initialize result payload
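The field extraction described above can be sketched as a tolerant lookup over both key spellings. `InitializeResultSketch` and `normalize_initialize_result` are hypothetical names, and preferring the camelCase key when both spellings are present is an assumption.

```python
from dataclasses import dataclass
from typing import Any, Mapping, Optional


@dataclass
class InitializeResultSketch:
    protocol_version: Optional[Any]
    server_info: Optional[Any]
    capabilities: Optional[Any]
    raw: dict


def _first_present(payload: Mapping[str, Any], *keys: str) -> Optional[Any]:
    """Return the value of the first key found in payload, else None."""
    for key in keys:
        if key in payload:
            return payload[key]
    return None


def normalize_initialize_result(raw: Mapping[str, Any]) -> InitializeResultSketch:
    # Assumption: camelCase is checked before snake_case.
    return InitializeResultSketch(
        protocol_version=_first_present(raw, "protocolVersion", "protocol_version"),
        server_info=_first_present(raw, "serverInfo", "server_info"),
        capabilities=_first_present(raw, "capabilities"),
        raw=dict(raw),  # keep the full payload for callers that need it
    )
```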
import asyncio

from codex_app_server_sdk import CodexClient


async def main() -> None:
    async with CodexClient.connect_stdio() as client:
        # Explicit handshake with custom client info and opt-out list.
        init = await client.initialize(
            {
                "clientInfo": {
                    "name": "my-client",
                    "version": "0.3.0",
                },
                "capabilities": {
                    "optOutNotificationMethods": [
                        "codex/event/agent_message_content_delta"
                    ]
                },
            }
        )
        print(init.protocol_version)


asyncio.run(main())