Skip to content

Event sinks

JSON Lines

JSONLSink appends one compact JSON object per line and creates missing parent directories:

from agentlogsafe import AgentLogger, JSONLSink

log = AgentLogger(sink=JSONLSink("var/audit/agent-events.jsonl"))

Passing the path directly is equivalent. Writes are synchronous and protected from threads within one process. Coordinate writes externally or use a centralized sink when multiple processes target the same file.

Durability

from agentlogsafe import Durability, JSONLSink

sink = JSONLSink("events.jsonl", durability=Durability.FSYNC)
Mode Successful write means
OS_BUFFERED File handle closed; data may remain in the operating-system cache
FLUSH Python buffers explicitly flushed before close
FSYNC The operating system was asked to commit data to durable storage

Hardware and remote filesystems can provide weaker guarantees than fsync implies.

Memory sink

from agentlogsafe import AgentLogger, MemorySink

sink = MemorySink()
log = AgentLogger(sink=sink)
log.user_message("hello")
assert sink.events[0].event_type == "user_message"

Events are retained until the sink is released or its events list is cleared.

Null sink

NullSink discards events. It is the default when no sink is supplied and can be useful when a feature flag disables persistence while callers still depend on the returned event object.

Custom sinks

Implement one method matching the EventSink protocol:

from agentlogsafe import AgentEvent

class QueueSink:
    def __init__(self, queue):
        self.queue = queue

    def write(self, event: AgentEvent) -> None:
        self.queue.put(event.to_dict())

Sink implementations should define their own retry, buffering, durability, and failure semantics. Raise an exception when an event was not accepted; silently dropping audit events makes completeness impossible to assess.

Buffered sink

BufferedSink decouples producers from a synchronous destination using a bounded queue and one background thread:

from agentlogsafe import BufferedSink, JSONLSink, OverflowPolicy

with BufferedSink(
    JSONLSink("events.jsonl"),
    capacity=2048,
    overflow=OverflowPolicy.BLOCK,
) as sink:
    log = AgentLogger(sink=sink)
    log.user_message("hello")

A successful write only guarantees queue admission. flush() waits for destination acceptance and surfaces worker errors; close() drains, flushes, and stops the worker. DROP_NEWEST increments dropped_events, while RAISE rejects writes immediately when capacity is exhausted. Audit-sensitive workloads should normally use BLOCK.

Async sinks

AsyncJSONLSink.write() delegates file I/O to a worker thread so it does not block the event loop. AsyncBufferedSink provides async write, flush, close, and context-manager methods around a bounded buffered sink:

async with AsyncBufferedSink(JSONLSink("events.jsonl")) as sink:
    await sink.write(event)