Utilities

Supporting modules that keep DELM reliable and observable.

Concurrency and Retry

delm.utils.concurrent_processing.ConcurrentProcessor

Thin wrapper over ThreadPoolExecutor.

Parameters:
  • max_workers (Optional[int], optional) –

    Number of threads. None (or <= 0) picks a heuristic default min(32, os.cpu_count() + 4). A value of 1 forces sequential execution.

process_concurrently

process_concurrently(
    items: Sequence[T], fn: Callable[[T], R]
) -> List[R]

Apply fn to each element of items, in parallel unless max_workers is 1 (sequential mode).

Results are returned in the same order as items.

Parameters:
  • items (Sequence[T]) –

    The items to process.

  • fn (Callable[[T], R]) –

    The function to apply to each item.

Returns:
  • List[R]

    A list of results corresponding to each input item.

Raises:
  • Exception

    If a worker raises, the first exception is re‑raised after all futures complete.
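
The behaviour described above (default worker heuristic, ordered results, re-raising after all futures complete) can be sketched with the standard library alone. This is a minimal illustration, not DELM's implementation: resolve_workers and process_concurrently_sketch are hypothetical names, and "first exception" is interpreted here as the first failure in input order.

```python
import os
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, List, Optional, Sequence, TypeVar

T = TypeVar("T")
R = TypeVar("R")


def resolve_workers(max_workers: Optional[int]) -> int:
    """Hypothetical helper: apply the documented default heuristic."""
    if max_workers is None or max_workers <= 0:
        # os.cpu_count() may return None, so fall back to 1 in that case.
        return min(32, (os.cpu_count() or 1) + 4)
    return max_workers


def process_concurrently_sketch(
    items: Sequence[T],
    fn: Callable[[T], R],
    max_workers: Optional[int] = None,
) -> List[R]:
    workers = resolve_workers(max_workers)
    if workers == 1:
        # Sequential mode: no thread pool; an error propagates immediately.
        return [fn(item) for item in items]

    with ThreadPoolExecutor(max_workers=workers) as pool:
        futures = [pool.submit(fn, item) for item in items]
    # Leaving the with-block waits until every future has finished.

    results: List[R] = []
    first_error: Optional[BaseException] = None
    for future in futures:  # submission order equals input order
        error = future.exception()
        if error is not None:
            if first_error is None:
                first_error = error
            continue
        results.append(future.result())
    if first_error is not None:
        raise first_error
    return results
```

Collecting futures in submission order, rather than with as_completed, is what keeps the output aligned with items.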

delm.utils.retry_handler.RetryHandler

Handle retries with exponential backoff.

execute_with_retry

execute_with_retry(
    func: Callable, *args: Any, **kwargs: Any
) -> Any

Execute function with retry logic.

Parameters:
  • func (Callable) –

    The function to execute.

  • *args (Any, default: () ) –

    Arguments to pass to the function.

  • **kwargs (Any, default: {} ) –

    Keyword arguments to pass to the function.

Returns:
  • Any

    The result of the function execution.

Raises:
  • Exception

    The last exception from the function execution if all attempts fail.
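
As an illustration of retry with exponential backoff, the sketch below doubles the delay after each failed attempt and re-raises the last exception once attempts are exhausted. The names max_retries, base_delay, and sleep are assumptions made for this example; the actual RetryHandler constructor arguments are not documented here.

```python
import time
from typing import Any, Callable, Optional


def execute_with_retry_sketch(
    func: Callable[..., Any],
    *args: Any,
    max_retries: int = 3,        # assumed name: retries after the first attempt
    base_delay: float = 1.0,     # assumed name: delay before the first retry
    sleep: Callable[[float], None] = time.sleep,  # injectable for testing
    **kwargs: Any,
) -> Any:
    last_error: Optional[Exception] = None
    for attempt in range(max_retries + 1):
        try:
            return func(*args, **kwargs)
        except Exception as exc:
            last_error = exc
            if attempt < max_retries:
                # Delay grows as base_delay * 1, 2, 4, ...
                sleep(base_delay * (2 ** attempt))
    assert last_error is not None
    raise last_error
```

Passing a custom sleep callable makes the backoff schedule observable without waiting in real time.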

Cost Tracking

delm.utils.cost_tracker.CostTracker

Track tokens and estimate cost for an extraction run.

count_tokens

count_tokens(text: str) -> int

Return token count for a given string using the model tokenizer.

count_tokens_batch

count_tokens_batch(texts: List[str]) -> int

Return the total token count across a list of strings.

estimate_cost

estimate_cost(
    input_tokens: int, output_tokens: int
) -> float

Estimate dollar cost for given input and output token counts.
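
A common way to turn token counts into dollars is to price input and output tokens separately per million tokens. The sketch below assumes that pricing model; the two price parameters are hypothetical and are not part of the documented estimate_cost signature, which presumably looks up rates for the configured model.

```python
def estimate_cost_sketch(
    input_tokens: int,
    output_tokens: int,
    input_price_per_1m: float,   # assumed: dollars per 1M input tokens
    output_price_per_1m: float,  # assumed: dollars per 1M output tokens
) -> float:
    input_cost = (input_tokens / 1_000_000) * input_price_per_1m
    output_cost = (output_tokens / 1_000_000) * output_price_per_1m
    return input_cost + output_cost
```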

from_dict classmethod

from_dict(d: dict) -> CostTracker

Create a tracker from a previously serialized dictionary.

get_cost_summary_dict

get_cost_summary_dict() -> dict[str, Any]

Return a dictionary summary of the current cost state.

get_current_cost

get_current_cost() -> float

Return the current estimated total cost.

is_over_budget

is_over_budget() -> bool

Return True if current estimated cost exceeds max_budget.
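
The budget check can be pictured as a strict comparison between accumulated cost and a ceiling. BudgetTrackerSketch below is a hypothetical stand-in, not CostTracker itself; treating a missing max_budget as "no limit" is an assumption.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class BudgetTrackerSketch:
    max_budget: Optional[float] = None  # assumed: None means no limit
    current_cost: float = 0.0

    def add_cost(self, dollars: float) -> None:
        self.current_cost += dollars

    def is_over_budget(self) -> bool:
        if self.max_budget is None:
            return False
        # "Exceeds" is read as strictly greater than the budget.
        return self.current_cost > self.max_budget
```

A caller would typically test is_over_budget() between batches and stop submitting work once it returns True.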

print_cost_summary

print_cost_summary() -> None

Print a human‑readable cost summary to stdout.

to_dict

to_dict() -> dict

Serialize the tracker state to a dictionary.

track_input_text

track_input_text(text: str)

Accumulate input tokens for a single text string.

track_output_pydantic

track_output_pydantic(response: Any) -> None

Accumulate output tokens from a Pydantic model response.

track_output_text

track_output_text(text: str)

Accumulate output tokens for a single text string.

delm.utils.cost_estimation

Cost estimation helpers for DELM.

Provides utilities to estimate approximate input token costs without API calls and total extraction costs using a sampled run.

estimate_input_token_cost

estimate_input_token_cost(
    config: Union[str, Dict[str, Any], DELMConfig],
    data_source: Union[str, Path] | DataFrame,
    save_file_log: bool = True,
    log_dir: Optional[Union[str, Path]] = Path(DEFAULT_LOG_DIR)
    / "cost_estimation",
    console_log_level: str = DEFAULT_CONSOLE_LOG_LEVEL,
    file_log_level: str = DEFAULT_FILE_LOG_LEVEL,
) -> float

Estimate input token cost over the entire dataset without API calls.

Parameters:
  • config (Union[str, Dict[str, Any], DELMConfig]) –

    Configuration for the DELM pipeline (config path | dict | DELMConfig).

  • data_source (Union[str, Path] | DataFrame) –

    Source data for extraction (path or DataFrame).

  • save_file_log (bool, default: True ) –

    Whether to write a rotating log file.

  • log_dir (Optional[Union[str, Path]], default: Path(DEFAULT_LOG_DIR) / 'cost_estimation' ) –

    Directory for log files when save_file_log is True.

  • console_log_level (str, default: DEFAULT_CONSOLE_LOG_LEVEL ) –

    Log level for console output.

  • file_log_level (str, default: DEFAULT_FILE_LOG_LEVEL ) –

    Log level for file output.

Returns:
  • float

    Estimated dollar cost of input tokens for processing all chunks.
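
Estimating input cost without API calls amounts to tokenizing every chunk locally and pricing the total. The following sketch assumes a caller-supplied count_tokens callable and a per-million-token input price; both are illustrative, and it ignores any prompt-template tokens that the real pipeline may add to each chunk.

```python
from typing import Callable, Iterable


def estimate_input_cost_sketch(
    chunks: Iterable[str],
    count_tokens: Callable[[str], int],  # assumed: any local tokenizer
    input_price_per_1m: float,           # assumed: dollars per 1M input tokens
) -> float:
    # Sum token counts locally; no request is sent to a model provider.
    total_tokens = sum(count_tokens(chunk) for chunk in chunks)
    return (total_tokens / 1_000_000) * input_price_per_1m
```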

estimate_total_cost

estimate_total_cost(
    config: Union[str, Dict[str, Any], DELMConfig],
    data_source: Union[str, Path] | DataFrame,
    sample_size: int = 10,
    save_file_log: bool = True,
    log_dir: Optional[Union[str, Path]] = Path(DEFAULT_LOG_DIR)
    / "cost_estimation",
    console_log_level: str = DEFAULT_CONSOLE_LOG_LEVEL,
    file_log_level: str = DEFAULT_FILE_LOG_LEVEL,
) -> float

Estimate total cost using API calls on a sample of the data.

Parameters:
  • config (Union[str, Dict[str, Any], DELMConfig]) –

    Configuration for the DELM pipeline (config path | dict | DELMConfig).

  • data_source (Union[str, Path] | DataFrame) –

    Source data for extraction (path or DataFrame).

  • sample_size (int, default: 10 ) –

    Number of records to sample for cost estimation.

  • save_file_log (bool, default: True ) –

    Whether to write a rotating log file.

  • log_dir (Optional[Union[str, Path]], default: Path(DEFAULT_LOG_DIR) / 'cost_estimation' ) –

    Directory for log files when save_file_log is True.

  • console_log_level (str, default: DEFAULT_CONSOLE_LOG_LEVEL ) –

    Log level for console output.

  • file_log_level (str, default: DEFAULT_FILE_LOG_LEVEL ) –

    Log level for file output.

Returns:
  • float

    Estimated dollar cost for processing the entire dataset, scaled from the sample.
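
Scaling from a sample can be sketched as simple linear extrapolation: measure the cost of the sampled records, then multiply by the ratio of total records to sampled records. The function name and the linear-scaling assumption are illustrative; the exact scaling rule used by estimate_total_cost is not specified here.

```python
def scale_sample_cost(
    sample_cost: float, sample_size: int, total_records: int
) -> float:
    """Hypothetical helper: extrapolate a sampled cost to the full dataset."""
    if total_records <= 0:
        return 0.0
    # The sample cannot be larger than the dataset itself.
    effective_sample = min(sample_size, total_records)
    if effective_sample <= 0:
        raise ValueError("sample_size must be positive")
    return sample_cost * (total_records / effective_sample)
```

Linear scaling assumes the sampled records are representative in length; a dataset with a few very long records may cost noticeably more than the estimate.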

Semantic Cache

delm.utils.semantic_cache.SemanticCache

Bases: ABC

Minimal interface all cache back‑ends must implement.

get abstractmethod

get(key: str) -> Optional[bytes]

Return raw (compressed) bytes or None if missing.

prune abstractmethod

prune(*, max_size_bytes: int) -> None

Delete oldest entries until on‑disk size ≤ max_size_bytes.
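
For a file-backed cache, pruning by age can be sketched as: list the entries, sort by modification time, and delete from the oldest end until the total size fits. The helper below is a hypothetical illustration over a plain directory and uses file mtime as the age signal, which is an assumption about how "oldest" is determined.

```python
from pathlib import Path


def prune_oldest(root: Path, *, max_size_bytes: int) -> None:
    files = [p for p in root.rglob("*") if p.is_file()]
    files.sort(key=lambda p: p.stat().st_mtime)  # oldest first
    total = sum(p.stat().st_size for p in files)
    for path in files:
        if total <= max_size_bytes:
            break
        total -= path.stat().st_size
        path.unlink()
```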

set abstractmethod

set(
    key: str,
    value: bytes,
    meta: Mapping[str, Any] | None = None,
) -> None

Insert value for key (no return). Must be durable when the method returns.

stats abstractmethod

stats() -> Mapping[str, Any]

Return diagnostic info (rows, size_bytes, hit_rate, etc.).
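
To make the four-method contract concrete, here is a toy back-end that satisfies it. SemanticCacheSketch mirrors the documented signatures locally so the example runs without DELM installed, and InMemoryCache is a hypothetical class: it lives in process memory, so it does not meet the durability requirement of set and is only meant to show the shape of the interface.

```python
from abc import ABC, abstractmethod
from collections import OrderedDict
from typing import Any, Mapping, Optional


class SemanticCacheSketch(ABC):
    """Local mirror of the documented interface (illustration only)."""

    @abstractmethod
    def get(self, key: str) -> Optional[bytes]: ...

    @abstractmethod
    def set(
        self, key: str, value: bytes, meta: Optional[Mapping[str, Any]] = None
    ) -> None: ...

    @abstractmethod
    def prune(self, *, max_size_bytes: int) -> None: ...

    @abstractmethod
    def stats(self) -> Mapping[str, Any]: ...


class InMemoryCache(SemanticCacheSketch):
    def __init__(self) -> None:
        self._data: "OrderedDict[str, bytes]" = OrderedDict()
        self._hits = 0
        self._lookups = 0

    def _size(self) -> int:
        return sum(len(v) for v in self._data.values())

    def get(self, key: str) -> Optional[bytes]:
        self._lookups += 1
        value = self._data.get(key)
        if value is not None:
            self._hits += 1
        return value

    def set(
        self, key: str, value: bytes, meta: Optional[Mapping[str, Any]] = None
    ) -> None:
        # Re-inserting moves the key to the "newest" end; meta is ignored here.
        self._data.pop(key, None)
        self._data[key] = value

    def prune(self, *, max_size_bytes: int) -> None:
        while self._data and self._size() > max_size_bytes:
            self._data.popitem(last=False)  # drop the oldest entry

    def stats(self) -> Mapping[str, Any]:
        hit_rate = self._hits / self._lookups if self._lookups else 0.0
        return {
            "rows": len(self._data),
            "size_bytes": self._size(),
            "hit_rate": hit_rate,
        }
```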

delm.utils.semantic_cache.FilesystemJSONCache

Bases: SemanticCache

Stores each entry in <root>/<first4>/<key>.json.zst.

Pros: zero deps, inspectable. Cons: many inodes, slower for 50k+ rows.
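
The directory layout above shards entries by the first four characters of the key, which keeps any single directory from growing too large. A minimal sketch of that path computation follows; entry_path is a hypothetical helper, and how keys themselves are derived is not covered here.

```python
from pathlib import Path


def entry_path(root: Path, key: str) -> Path:
    # <root>/<first4>/<key>.json.zst
    return root / key[:4] / f"{key}.json.zst"
```

For example, entry_path(Path("cache"), "abcdef1234") resolves to cache/abcd/abcdef1234.json.zst.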

delm.utils.semantic_cache.SQLiteWALCache

Bases: SemanticCache

close

close()

Close all thread-local database connections and clean up zstd objects.
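
One way to arrange thread-local SQLite connections that can all be closed from a single close() call is to keep each connection in threading.local storage while also recording it in a shared list. The class below is a sketch under that assumption; ThreadLocalConnections is a hypothetical name and the zstd clean-up mentioned above is omitted.

```python
import sqlite3
import threading
from typing import List


class ThreadLocalConnections:
    def __init__(self, path: str) -> None:
        self._path = path
        self._local = threading.local()
        self._lock = threading.Lock()
        self._all: List[sqlite3.Connection] = []

    def get(self) -> sqlite3.Connection:
        conn = getattr(self._local, "conn", None)
        if conn is None:
            # check_same_thread=False lets close() run from another thread.
            conn = sqlite3.connect(self._path, check_same_thread=False)
            conn.execute("PRAGMA journal_mode=WAL")
            self._local.conn = conn
            with self._lock:
                self._all.append(conn)
        return conn

    def close(self) -> None:
        with self._lock:
            for conn in self._all:
                conn.close()
            self._all.clear()
        # Drop per-thread references so a later get() opens a fresh connection.
        self._local = threading.local()
```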

delm.utils.semantic_cache.LMDBCache

Bases: SemanticCache

delm.utils.semantic_cache.SemanticCacheFactory

Create a cache instance from a config mapping (dict or attr‑access).
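
Accepting "dict or attr-access" configs usually comes down to one small accessor that tries mapping lookup first and attribute lookup second. The sketch below shows that pattern with a pluggable registry; read_option, create_cache_sketch, the "backend" field name, and the registry argument are all assumptions for illustration, not the factory's documented API.

```python
from typing import Any, Callable, Mapping


def read_option(cfg: Any, name: str, default: Any = None) -> Any:
    """Read a setting from either a mapping or an attribute-style object."""
    if isinstance(cfg, Mapping):
        return cfg.get(name, default)
    return getattr(cfg, name, default)


def create_cache_sketch(
    cfg: Any, registry: Mapping[str, Callable[[Any], Any]]
) -> Any:
    backend = read_option(cfg, "backend")  # hypothetical field name
    if backend not in registry:
        raise ValueError(f"Unknown cache backend: {backend!r}")
    return registry[backend](cfg)
```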