
🏗️ Core Architecture

This document explains how the loglife.core framework handles messages, threading, and concurrency efficiently.


🔄 System Overview

The framework is built on a producer-consumer architecture using Python's queue.Queue. This ensures that the main thread (the web server) is never blocked by slow operations such as running your processing logic or making network calls.

graph LR
    A[User] -->|WhatsApp Webhook| B(Main Thread)
    B -->|Push| C{Receive Queue}
    C -->|Pop| D[Your Logic Loop]
    D -->|Push| E{Send Queue}
    E -->|Pop| F[Sender Thread]
    F -->|API Call| G[WhatsApp API]

🧵 Threading Model

The framework automatically manages the critical background threads for you.

| Thread | Role | Efficiency |
| --- | --- | --- |
| MainThread | Web Server. Receives Webhook & pushes to Receive Queue. Returns 200 OK instantly. | ⚡ High. Non-blocking. |
| Your Loop | Logic. You call core.recv_msg(). This is where your business logic lives. | 🐢 Variable. Depends on your code speed. |
| SenderThread | I/O Worker. Pops from Send Queue and calls WhatsApp API. | 🐢 Medium. Handles network latency. |

Why this matters

Because the SenderThread runs separately, your logic loop can queue 10 messages almost instantly and go back to listening for new input, while the SenderThread handles the slow task of delivering them one by one.
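
The effect can be illustrated with a minimal sketch built only on the standard library. It is not the framework's own code: `send_queue`, `fake_deliver`, and the 50 ms delay are assumptions made for the demonstration, and the `None` sentinel exists only so the demo can shut down cleanly.

```python
import queue
import threading
import time

send_queue: queue.Queue = queue.Queue()  # hypothetical stand-in for the Send Queue
delivered: list[str] = []


def fake_deliver(text: str) -> None:
    """Stand-in for the slow external API call (assumed ~50 ms each)."""
    time.sleep(0.05)
    delivered.append(text)


def sender_worker() -> None:
    """Pop messages one by one and deliver them; None is a stop sentinel."""
    while True:
        item = send_queue.get()
        if item is None:
            break
        fake_deliver(item)


worker = threading.Thread(target=sender_worker, name="SenderThread", daemon=True)
worker.start()

# The logic loop only pays the cost of a queue push per message.
start = time.perf_counter()
for i in range(10):
    send_queue.put(f"message {i}")
enqueue_seconds = time.perf_counter() - start

# Delivery continues in the background; we wait for it only in this demo.
send_queue.put(None)
worker.join()
total_seconds = time.perf_counter() - start

print(f"enqueued in {enqueue_seconds:.4f}s, delivered in {total_seconds:.2f}s")
```

Enqueueing should take a tiny fraction of the total delivery time, which is the gap the logic loop gets back for handling new input.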


📨 Unified Messaging

The core unifies all inputs (WhatsApp, Emulator, Tests) into a single Message object.

Workflow

  1. Ingestion: Webhook receives JSON $\rightarrow$ Wraps in Message $\rightarrow$ Pushes to Receive Queue.
  2. Consumption: You call recv_msg(), which blocks until a message is available.
  3. Production: You call send_msg(), which wraps your text in a Message $\rightarrow$ Pushes to Send Queue.
  4. Delivery: SenderThread wakes up $\rightarrow$ Pops message $\rightarrow$ Calls external API.
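
The four steps above can be sketched end to end with two standard-library queues. This is an illustration under assumptions, not the framework's implementation: `Msg`, `webhook`, `receive_queue`, `send_queue`, and `api_calls` are hypothetical names, and the "API call" just records what it was given.

```python
import queue
import threading
from dataclasses import dataclass


@dataclass
class Msg:
    """Simplified, hypothetical stand-in for the framework's Message object."""

    number: str
    text: str


receive_queue: queue.Queue = queue.Queue()
send_queue: queue.Queue = queue.Queue()
api_calls: list[tuple[str, str]] = []  # records what the fake API received


def webhook(payload: dict) -> int:
    """Step 1, ingestion: wrap the JSON payload, push it, return at once."""
    receive_queue.put(Msg(number=payload["sender"], text=payload["raw_msg"]))
    return 200


def sender_worker() -> None:
    """Step 4, delivery: pop outbound messages and call the (fake) API."""
    while True:
        msg = send_queue.get()
        if msg is None:  # stop sentinel, used only by this demo
            break
        api_calls.append((msg.number, msg.text))


worker = threading.Thread(target=sender_worker, name="SenderThread")
worker.start()

status = webhook({"sender": "+15550100", "raw_msg": "hello"})

# Step 2, consumption: block until a message is available.
incoming = receive_queue.get()

# Step 3, production: wrap the reply text and push it to the send queue.
send_queue.put(Msg(number=incoming.number, text=f"echo: {incoming.text}"))

send_queue.put(None)
worker.join()
print(status, api_calls)
```

In a real application, steps 2 and 3 would be the calls to recv_msg() and send_msg() inside your own loop; steps 1 and 4 are handled by the framework.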

🔌 API Reference

Chat interface core package (transports, clients, shared protocols).

Message dataclass

Normalized representation of transport messages.

Source code in src/loglife/core/messaging.py
@dataclass(slots=True)
class Message:
    """Normalized representation of transport messages."""

    sender: str
    msg_type: str
    raw_payload: str
    client_type: str
    metadata: dict[str, Any] = field(default_factory=dict)
    attachments: dict[str, Any] = field(default_factory=dict)

    @classmethod
    def from_payload(cls, payload: Mapping[str, Any]) -> "Message":
        """Construct a message from a raw transport payload."""
        return cls(
            sender=payload["sender"],
            msg_type=payload["msg_type"],
            raw_payload=payload.get("raw_msg", ""),
            client_type=payload.get("client_type", "unknown"),
            metadata=dict(payload.get("metadata") or {}),
        )

from_payload(payload) classmethod

Construct a message from a raw transport payload.

Source code in src/loglife/core/messaging.py
@classmethod
def from_payload(cls, payload: Mapping[str, Any]) -> "Message":
    """Construct a message from a raw transport payload."""
    return cls(
        sender=payload["sender"],
        msg_type=payload["msg_type"],
        raw_payload=payload.get("raw_msg", ""),
        client_type=payload.get("client_type", "unknown"),
        metadata=dict(payload.get("metadata") or {}),
    )

init()

Initialize the core system (DB, Logging, Workers).

Call this at the start of your application.

Source code in src/loglife/core/interface.py
def init() -> None:
    """Initialize the core system (DB, Logging, Workers).

    Call this at the start of your application.
    """
    setup_logging()
    init_db()

    # Start the sender worker to handle outbound traffic
    start_sender_worker()

recv_msg(block=True, timeout=None)

Receive the next message from the inbound queue.

Blocks until a message is available unless block=False.

Source code in src/loglife/core/interface.py
def recv_msg(block: bool = True, timeout: float | None = None) -> Message:  # noqa: FBT001, FBT002
    """Receive the next message from the inbound queue.

    Blocks until a message is available unless block=False.
    """
    return _inbound_queue.get(block=block, timeout=timeout)

send_msg(message, to=None)

Send a message to the outbound queue.

Parameters:

  - message (Message | str, required): A Message object or a text string.
  - to (str | None, default: None): The phone number to send to (required if message is a string).

Source code in src/loglife/core/interface.py
def send_msg(message: Message | str, to: str | None = None) -> None:
    """Send a message to the outbound queue.

    Args:
        message: A Message object OR a string text.
        to: The phone number to send to (required if message is a string).

    """
    if isinstance(message, str):
        if not to:
            msg = "Target 'to' number is required when sending a string."
            raise ValueError(msg)
        queue_async_message(to, message)
    else:
        enqueue_outbound_message(message)