Event sinks
JSON Lines
JSONLSink appends one compact JSON object per line and creates missing parent
directories:
```python
from agentlogsafe import AgentLogger, JSONLSink

log = AgentLogger(sink=JSONLSink("var/audit/agent-events.jsonl"))
```
Passing the path directly is equivalent. Writes are synchronous and serialized across threads within one process. When multiple processes target the same file, coordinate writes externally or use a centralized sink.
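One way to coordinate writers across processes on POSIX systems is to hold an advisory lock for the duration of each append. The sketch below uses only the standard library and is not part of agentlogsafe: `append_jsonl_locked` is a hypothetical helper, and `fcntl.flock` is unavailable on Windows.

```python
import fcntl
import json
import os


def append_jsonl_locked(path: str, record: dict) -> None:
    """Append one compact JSON line while holding an exclusive advisory lock."""
    parent = os.path.dirname(path)
    if parent:
        os.makedirs(parent, exist_ok=True)  # create missing parent directories
    line = json.dumps(record, separators=(",", ":")) + "\n"
    with open(path, "a", encoding="utf-8") as fh:
        fcntl.flock(fh.fileno(), fcntl.LOCK_EX)  # blocks until other holders release
        try:
            fh.write(line)
            fh.flush()  # hand the data to the OS before the lock is released
        finally:
            fcntl.flock(fh.fileno(), fcntl.LOCK_UN)
```

Advisory locks only help when every writer takes the same lock; a process that appends without locking is not held back by it.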
Durability
```python
from agentlogsafe import Durability, JSONLSink

sink = JSONLSink("events.jsonl", durability=Durability.FSYNC)
```
| Mode | Successful write means |
|---|---|
| OS_BUFFERED | File handle closed; data may remain in the operating-system cache |
| FLUSH | Python buffers explicitly flushed before close |
| FSYNC | The operating system was asked to commit data to durable storage |
Hardware and remote filesystems can provide weaker guarantees than fsync implies.
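The three modes correspond roughly to the standard-library steps below. This is an illustration of the table, not the library's implementation: `DurabilityLevel` and `write_line` are hypothetical stand-ins defined here.

```python
import enum
import os


class DurabilityLevel(enum.Enum):  # hypothetical stand-in for the modes in the table
    OS_BUFFERED = "os_buffered"
    FLUSH = "flush"
    FSYNC = "fsync"


def write_line(path: str, line: str, level: DurabilityLevel) -> None:
    """Append one line, then apply the requested durability step before closing."""
    with open(path, "a", encoding="utf-8") as fh:
        fh.write(line + "\n")
        if level in (DurabilityLevel.FLUSH, DurabilityLevel.FSYNC):
            fh.flush()  # move Python's userspace buffer into the OS
        if level is DurabilityLevel.FSYNC:
            os.fsync(fh.fileno())  # ask the OS to commit the file to storage
```

On some filesystems a newly created file also needs an fsync of its parent directory before the directory entry is guaranteed to survive a crash; the sketch does not cover that case.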
Memory sink
```python
from agentlogsafe import AgentLogger, MemorySink

sink = MemorySink()
log = AgentLogger(sink=sink)
log.user_message("hello")
assert sink.events[0].event_type == "user_message"
```
Events are retained until the sink is released or its events list is cleared.
Null sink
NullSink discards events. It is the default when no sink is supplied and can be
useful when a feature flag disables persistence while callers still depend on the
returned event object.
Custom sinks
Implement one method matching the EventSink protocol:
```python
from agentlogsafe import AgentEvent


class QueueSink:
    def __init__(self, queue):
        self.queue = queue

    def write(self, event: AgentEvent) -> None:
        # Hand the serialized event to the queue supplied by the caller.
        self.queue.put(event.to_dict())
```
Sink implementations should define their own retry, buffering, durability, and failure semantics. Raise an exception when an event was not accepted; silently dropping audit events makes completeness impossible to assess.
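To illustrate raising when an event is not accepted, the sketch below uses a bounded standard-library queue and plain dictionaries in place of AgentEvent so that it runs on its own. `SinkRejected` and `BoundedQueueSink` are hypothetical names, not part of agentlogsafe.

```python
import queue


class SinkRejected(RuntimeError):
    """Raised when the sink could not accept an event (hypothetical name)."""


class BoundedQueueSink:
    def __init__(self, maxsize: int) -> None:
        self.queue = queue.Queue(maxsize=maxsize)

    def write(self, event: dict) -> None:
        try:
            self.queue.put_nowait(event)
        except queue.Full as exc:
            # Surface the failure instead of dropping the event silently.
            raise SinkRejected("queue is full; event not accepted") from exc
```

The caller then decides whether to retry, block, or abort, and the gap in the audit trail is visible rather than hidden.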
Buffered sink
BufferedSink decouples producers from a synchronous destination using a bounded
queue and one background thread:
```python
from agentlogsafe import AgentLogger, BufferedSink, JSONLSink, OverflowPolicy

with BufferedSink(
    JSONLSink("events.jsonl"),
    capacity=2048,
    overflow=OverflowPolicy.BLOCK,
) as sink:
    log = AgentLogger(sink=sink)
    log.user_message("hello")
```
A successful write only guarantees queue admission. flush() waits for destination
acceptance and surfaces worker errors; close() drains, flushes, and stops the worker.
DROP_NEWEST increments dropped_events, while RAISE rejects writes immediately
when capacity is exhausted. Audit-sensitive workloads should normally use BLOCK.
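The three overflow policies can be pictured with a bounded standard-library queue. This is a simplified sketch rather than BufferedSink's implementation: `Overflow`, `BufferFull`, and `BoundedAdmission` are hypothetical names, and the background worker that drains the queue is omitted.

```python
import enum
import queue


class Overflow(enum.Enum):  # hypothetical stand-in for OverflowPolicy
    BLOCK = "block"
    DROP_NEWEST = "drop_newest"
    RAISE = "raise"


class BufferFull(RuntimeError):
    """Raised under RAISE when the queue has no free slot (hypothetical name)."""


class BoundedAdmission:
    def __init__(self, capacity: int, overflow: Overflow) -> None:
        self.queue = queue.Queue(maxsize=capacity)
        self.overflow = overflow
        self.dropped_events = 0

    def write(self, event: dict) -> None:
        if self.overflow is Overflow.BLOCK:
            self.queue.put(event)  # waits until a consumer frees a slot
            return
        try:
            self.queue.put_nowait(event)
        except queue.Full:
            if self.overflow is Overflow.DROP_NEWEST:
                self.dropped_events += 1  # the incoming event is discarded
            else:
                raise BufferFull("capacity exhausted") from None
```

With BLOCK the producer slows down instead of losing events, which is why it suits audit-sensitive workloads. A production version would also guard the drop counter with a lock when several threads write concurrently.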
Async sinks
AsyncJSONLSink.write() delegates file I/O to a worker thread so it does not block
the event loop. AsyncBufferedSink provides async write, flush, close, and
context-manager methods around a bounded buffered sink: