Utilities¶
Supporting modules that keep DELM reliable and observable.
Concurrency and Retry¶
delm.utils.concurrent_processing.ConcurrentProcessor ¶
Thin wrapper over ThreadPoolExecutor.
<<<<<<< HEAD
Args:
max_workers: Number of threads. None
(or <= 0) picks a heuristic
default min(32, os.cpu_count() + 4)
. A value of 1 forces
sequential execution.
=======
Parameters
----------
max_workers : Optional[int], optional
Number of threads. None
(or <= 0) picks a heuristic default
min(32, os.cpu_count() + 4)
. A value of 1 forces sequential mode.
ad04d3dddfe7e9c168c2221c5933c22d45bd42d1
process_concurrently ¶
process_concurrently(
items: Sequence[T], fn: Callable[[T], R]
) -> List[R]
Apply fn
to each element of items
(optionally) in parallel.
Results are returned in the same order as items
.
Parameters: |
|
---|
Returns: |
|
---|
Raises: |
|
---|
delm.utils.retry_handler.RetryHandler ¶
Handle retries with exponential backoff.
execute_with_retry ¶
execute_with_retry(
func: Callable, *args: Any, **kwargs: Any
) -> Any
Execute function with retry logic.
Parameters: |
|
---|
Returns: |
|
---|
Raises: |
|
---|
Cost Tracking¶
delm.utils.cost_tracker.CostTracker ¶
Track tokens and estimate cost for an extraction run.
count_tokens ¶
count_tokens(text: str) -> int
Return token count for a given string using the model tokenizer.
count_tokens_batch ¶
count_tokens_batch(texts: List[str]) -> int
Return total token count for an iterable of strings.
estimate_cost ¶
estimate_cost(
input_tokens: int, output_tokens: int
) -> float
Estimate dollar cost for given input and output token counts.
from_dict
classmethod
¶
from_dict(d: dict) -> CostTracker
Create a tracker from a previously serialized dictionary.
get_cost_summary_dict ¶
get_cost_summary_dict() -> dict[str, Any]
Return a dictionary summary of the current cost state.
track_output_pydantic ¶
track_output_pydantic(response: Any) -> None
Accumulate output tokens from a Pydantic model response.
delm.utils.cost_estimation ¶
Cost estimation helpers for DELM.
Provides utilities to estimate approximate input token costs without API calls and total extraction costs using a sampled run.
estimate_input_token_cost ¶
estimate_input_token_cost(
config: Union[str, Dict[str, Any], DELMConfig],
data_source: Union[str, Path] | DataFrame,
save_file_log: bool = True,
log_dir: Union[str, Optional][Path] = Path(
DEFAULT_LOG_DIR
)
/ "cost_estimation",
console_log_level: str = DEFAULT_CONSOLE_LOG_LEVEL,
file_log_level: str = DEFAULT_FILE_LOG_LEVEL,
) -> float
Estimate input token cost over the entire dataset without API calls.
Parameters: |
|
---|
Returns: |
|
---|
estimate_total_cost ¶
estimate_total_cost(
config: Union[str, Dict[str, Any], DELMConfig],
data_source: Union[str, Path] | DataFrame,
sample_size: int = 10,
save_file_log: bool = True,
log_dir: Union[str, Optional][Path] = Path(
DEFAULT_LOG_DIR
)
/ "cost_estimation",
console_log_level: str = DEFAULT_CONSOLE_LOG_LEVEL,
file_log_level: str = DEFAULT_FILE_LOG_LEVEL,
) -> float
Estimate total cost using API calls on a sample of the data.
Parameters: |
|
---|
Returns: |
|
---|
Semantic Cache¶
delm.utils.semantic_cache.SemanticCache ¶
Bases: ABC
Minimal interface all cache back‑ends must implement.
get
abstractmethod
¶
get(key: str) -> Optional[bytes]
Return raw (compressed) bytes or None if missing.
prune
abstractmethod
¶
prune(*, max_size_bytes: int) -> None
Delete oldest entries until on‑disk size ≤ max_size_bytes.
set
abstractmethod
¶
set(
key: str,
value: bytes,
meta: Mapping[str, Any] | None = None,
) -> None
Insert value
for key
(no return). Must be durable when the method returns.
stats
abstractmethod
¶
stats() -> Mapping[str, Any]
Return diagnostic info (rows, size_bytes, hit_rate, etc.).
delm.utils.semantic_cache.FilesystemJSONCache ¶
Bases: SemanticCache
Stores each entry in <root>/<first4>/<key>.json.zst
.
Pros: zero deps, inspectable. Cons: many inodes, slower for 50k+ rows.
delm.utils.semantic_cache.SQLiteWALCache ¶
Bases: SemanticCache
delm.utils.semantic_cache.LMDBCache ¶
Bases: SemanticCache
delm.utils.semantic_cache.SemanticCacheFactory ¶
Create a cache instance from a config mapping (dict or attr‑access).