Core Managers
Modules that power DELM beyond the high-level pipeline: data preprocessing, experiment storage, schema coordination, and batched extraction.
Data Processor
delm.core.data_processor.DataProcessor
Handles data loading, preprocessing, chunking, and scoring.
load_data
load_data(
data_source: Union[str, Path, DataFrame],
) -> pd.DataFrame
Load data from a file path (str or Path) or an existing DataFrame.
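As a rough illustration of how a loader with a `Union[str, Path, DataFrame]` signature can dispatch on its input, here is a minimal sketch. The function name `load_data_sketch`, the supported suffixes (`.csv`, `.txt`), and the `text` column name are assumptions for illustration only, not DELM's actual behavior.

```python
from pathlib import Path
from typing import Union

import pandas as pd


def load_data_sketch(data_source: Union[str, Path, pd.DataFrame]) -> pd.DataFrame:
    """Hypothetical loader: dispatch on the type of ``data_source``."""
    # An in-memory DataFrame is returned as a copy so callers can't mutate the original.
    if isinstance(data_source, pd.DataFrame):
        return data_source.copy()

    path = Path(data_source)
    if not path.exists():
        raise FileNotFoundError(f"Data source not found: {path}")

    suffix = path.suffix.lower()
    if suffix == ".csv":
        return pd.read_csv(path)
    if suffix == ".txt":
        # Assumption: a plain-text file becomes a one-row DataFrame with a "text" column.
        return pd.DataFrame({"text": [path.read_text(encoding="utf-8")]})
    raise ValueError(f"Unsupported file type: {suffix!r}")
```

Accepting both paths and DataFrames lets callers skip file I/O entirely when the data is already loaded.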
process_dataframe
process_dataframe(df: DataFrame) -> pd.DataFrame
Apply chunking and scoring to a DataFrame.
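The kind of transformation `process_dataframe` performs can be sketched with fixed-size word chunking followed by keyword-count scoring. Both strategies, the `text`/`text_chunk`/`score` column names, and the `chunk_size` and `keywords` parameters are assumptions for illustration; DELM's configured splitting and scoring strategies may differ.

```python
import pandas as pd


def chunk_words(text: str, chunk_size: int) -> list[str]:
    """Split text into consecutive chunks of at most ``chunk_size`` words."""
    words = text.split()
    return [" ".join(words[i:i + chunk_size]) for i in range(0, len(words), chunk_size)]


def process_dataframe_sketch(df: pd.DataFrame, chunk_size: int, keywords: set[str]) -> pd.DataFrame:
    """Hypothetical: explode each row's text into chunks, then score each chunk."""
    out = df.copy()
    out["text_chunk"] = out["text"].apply(lambda t: chunk_words(t, chunk_size))
    # One row per chunk; rows whose text produced no chunks (empty lists -> NaN) are dropped.
    out = out.explode("text_chunk").dropna(subset=["text_chunk"]).reset_index(drop=True)
    # Score = number of words in the chunk that appear in the keyword set.
    out["score"] = out["text_chunk"].apply(
        lambda c: sum(w.lower() in keywords for w in c.split())
    )
    return out
```

Scoring after chunking means each chunk can later be filtered or prioritized independently before it is sent for extraction.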
Experiment Managers
delm.core.experiment_manager.BaseExperimentManager
Bases: ABC
Abstract base class for DELM experiment managers.
cleanup_batch_checkpoints (abstractmethod)
cleanup_batch_checkpoints()
Remove all batch checkpoint files after consolidation.
consolidate_batches (abstractmethod)
consolidate_batches() -> pd.DataFrame
Consolidate all batch files into a single DataFrame and save as final result.
delete_batch_checkpoint (abstractmethod)
delete_batch_checkpoint(batch_id: int) -> bool
Delete the batch checkpoint file for a given batch ID.
get_all_existing_batch_ids (abstractmethod)
get_all_existing_batch_ids() -> set
Get all existing batch IDs.
get_batch_checkpoint_path (abstractmethod)
get_batch_checkpoint_path(batch_id: int) -> Path
Get the path to the batch checkpoint file for a given batch ID.
get_results (abstractmethod)
get_results() -> pd.DataFrame
Get the results from the experiment directory.
initialize_experiment (abstractmethod)
initialize_experiment(delm_config: DELMConfig)
Initialize the experiment.
list_batch_checkpoints (abstractmethod)
list_batch_checkpoints() -> List[Path]
List all batch checkpoint files in the processing cache directory.
load_batch_checkpoint (abstractmethod)
load_batch_checkpoint(batch_path: Path) -> pd.DataFrame
Load a batch checkpoint from a feather file.
load_batch_checkpoint_by_id (abstractmethod)
load_batch_checkpoint_by_id(batch_id: int) -> pd.DataFrame
Load a batch checkpoint by batch ID.
load_preprocessed_data (abstractmethod)
load_preprocessed_data(
file_path: Optional[Path] = None,
) -> pd.DataFrame
Load the preprocessed data from the experiment directory.
load_state (abstractmethod)
load_state() -> Optional[CostTracker]
Load the experiment state from the experiment directory.
save_batch_checkpoint (abstractmethod)
save_batch_checkpoint(
batch_df: DataFrame, batch_id: int
) -> Path
Save a batch checkpoint to the experiment directory.
save_extracted_data (abstractmethod)
save_extracted_data(df: DataFrame) -> Path
Save the extracted data to the experiment directory.
save_preprocessed_data (abstractmethod)
save_preprocessed_data(df: DataFrame) -> Path
Save the preprocessed data to the experiment directory.
save_state (abstractmethod)
save_state(cost_tracker: CostTracker)
Save the experiment state to the experiment directory.
delm.core.experiment_manager.DiskExperimentManager
Bases: BaseExperimentManager
Handles experiment directories, config/schema validation, batch checkpointing, and state management (disk-based).
cleanup_batch_checkpoints
cleanup_batch_checkpoints()
Remove all batch checkpoint files after consolidation.
consolidate_batches
consolidate_batches() -> pd.DataFrame
Consolidate all batch files into a single DataFrame and save as final result.
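A minimal sketch of disk-based consolidation: load every checkpoint in batch-ID order, concatenate, write the final result, and raise if there is nothing to consolidate. The `batch_<id>.pkl` naming, the result file name, the choice of `FileNotFoundError`, and the use of pandas pickle files in place of feather (so the sketch runs without `pyarrow`) are all assumptions.

```python
import re
from pathlib import Path

import pandas as pd

BATCH_FILE = re.compile(r"^batch_(\d+)\.pkl$")  # hypothetical naming scheme


def consolidate_batches_sketch(cache_dir: Path, result_path: Path) -> pd.DataFrame:
    """Hypothetical: concatenate all batch checkpoints in ID order and save the result."""
    batches = []
    for path in cache_dir.iterdir():
        match = BATCH_FILE.match(path.name)
        if match:
            batches.append((int(match.group(1)), path))
    if not batches:
        raise FileNotFoundError(f"No batch checkpoints found in {cache_dir}")
    # Sort numerically so batch_10 comes after batch_9, not after batch_1.
    batches.sort(key=lambda item: item[0])
    result = pd.concat([pd.read_pickle(path) for _, path in batches], ignore_index=True)
    result.to_pickle(result_path)
    return result
```

Sorting on the parsed integer rather than the file name keeps row order stable even when IDs are not zero-padded.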
delete_batch_checkpoint
delete_batch_checkpoint(batch_id: int) -> bool
Delete the batch checkpoint file for a given batch ID.
get_all_existing_batch_ids
get_all_existing_batch_ids() -> set
Return a set of all batch IDs for which a checkpoint file exists.
get_batch_checkpoint_path
get_batch_checkpoint_path(batch_id: int) -> Path
Return the full path to the batch checkpoint file for a given batch ID.
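The ID-to-path mapping, ID discovery, and single-checkpoint deletion can be sketched as follows. The `batch_{id:06d}.feather` file-name pattern, the helper names, and the reading of the `bool` return of deletion as "a file was actually removed" are hypothetical; the sketch only creates and inspects file names, so no feather data is read or written.

```python
import re
from pathlib import Path

# Hypothetical naming scheme: zero-padded IDs keep lexicographic order equal to numeric order.
CHECKPOINT_PATTERN = re.compile(r"^batch_(\d{6,})\.feather$")


def get_batch_checkpoint_path_sketch(cache_dir: Path, batch_id: int) -> Path:
    """Return the full path to the checkpoint file for ``batch_id``."""
    return cache_dir / f"batch_{batch_id:06d}.feather"


def get_all_existing_batch_ids_sketch(cache_dir: Path) -> set[int]:
    """Parse batch IDs back out of the checkpoint file names on disk."""
    ids = set()
    for path in cache_dir.glob("batch_*.feather"):
        match = CHECKPOINT_PATTERN.match(path.name)
        if match:
            ids.add(int(match.group(1)))
    return ids


def delete_batch_checkpoint_sketch(cache_dir: Path, batch_id: int) -> bool:
    """Delete one checkpoint; return True if a file was removed, False if none existed."""
    path = get_batch_checkpoint_path_sketch(cache_dir, batch_id)
    if path.exists():
        path.unlink()
        return True
    return False
```

Deriving IDs from file names keeps the directory itself as the single source of truth for which batches are done.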
get_results
get_results() -> pd.DataFrame
Get the consolidated results from the experiment directory.
initialize_experiment
initialize_experiment(delm_config: DELMConfig)
Validate and create the experiment directory structure, then write the config and schema files.
is_experiment_completed
is_experiment_completed() -> bool
Return True if the consolidated result file exists, i.e., the experiment has completed.
list_batch_checkpoints
list_batch_checkpoints() -> List[Path]
List all batch checkpoint files in the processing cache directory.
load_batch_checkpoint
load_batch_checkpoint(batch_path: Path) -> pd.DataFrame
Load a batch checkpoint from a feather file.
load_batch_checkpoint_by_id
load_batch_checkpoint_by_id(batch_id: int) -> pd.DataFrame
Load a batch checkpoint by batch ID.
load_preprocessed_data
load_preprocessed_data(
file_path: Optional[Path] = None,
) -> pd.DataFrame
Load preprocessed data from a feather file.
load_state
load_state() -> Optional[CostTracker]
Load the experiment state from the JSON state file. Returns a CostTracker, or None if no state file is found.
save_batch_checkpoint
save_batch_checkpoint(
batch_df: DataFrame, batch_id: int
) -> Path
Save a batch checkpoint as a feather file.
save_extracted_data
save_extracted_data(df: DataFrame) -> Path
Save extracted data as a feather file.
save_preprocessed_data
save_preprocessed_data(df: DataFrame) -> Path
Save preprocessed data as a feather file.
save_state
save_state(cost_tracker: CostTracker)
Save the experiment state (cost tracker only) to the state file as JSON.
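A minimal sketch of the JSON round trip behind `save_state` and `load_state`. The `CostTrackerSketch` dataclass, its fields, its `to_dict`/`from_dict` methods, the `cost_tracker` key, and the `state.json` file name used in the example are hypothetical stand-ins for DELM's `CostTracker` and state file.

```python
import json
from dataclasses import asdict, dataclass
from pathlib import Path
from typing import Optional


@dataclass
class CostTrackerSketch:
    """Hypothetical stand-in for CostTracker: just a few running totals."""
    input_tokens: int = 0
    output_tokens: int = 0
    total_cost_usd: float = 0.0

    def to_dict(self) -> dict:
        return asdict(self)

    @classmethod
    def from_dict(cls, data: dict) -> "CostTrackerSketch":
        return cls(**data)


def save_state_sketch(state_path: Path, tracker: CostTrackerSketch) -> None:
    # The state file holds only the cost tracker, serialized as JSON.
    state_path.write_text(json.dumps({"cost_tracker": tracker.to_dict()}), encoding="utf-8")


def load_state_sketch(state_path: Path) -> Optional[CostTrackerSketch]:
    # A missing state file means there is nothing to resume from.
    if not state_path.exists():
        return None
    data = json.loads(state_path.read_text(encoding="utf-8"))
    return CostTrackerSketch.from_dict(data["cost_tracker"])
```

Wrapping the tracker under a named key leaves room for other state fields without changing the file layout.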
verify_resume_config
verify_resume_config(delm_config: DELMConfig)
Compare the config and schema saved in the experiment's config/ folder with the user-supplied DELMConfig; abort if they differ.
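The resume check amounts to comparing two configurations and refusing to continue on any mismatch. A minimal sketch, assuming both the stored config/schema and the user-supplied `DELMConfig` have already been converted to plain nested dictionaries; the function names, the dotted-key diff, and the choice of `ValueError` are hypothetical.

```python
from typing import Any


def diff_keys(stored: dict[str, Any], supplied: dict[str, Any], prefix: str = "") -> list[str]:
    """Return dotted keys whose values (or presence) differ between two nested dicts."""
    differences = []
    for key in sorted(set(stored) | set(supplied)):
        path = f"{prefix}{key}"
        a, b = stored.get(key), supplied.get(key)
        if isinstance(a, dict) and isinstance(b, dict):
            differences.extend(diff_keys(a, b, prefix=f"{path}."))
        elif a != b or (key in stored) != (key in supplied):
            differences.append(path)
    return differences


def verify_resume_config_sketch(stored: dict[str, Any], supplied: dict[str, Any]) -> None:
    """Abort (raise) if the saved config differs from the one supplied on resume."""
    differences = diff_keys(stored, supplied)
    if differences:
        raise ValueError(
            "Config differs from the saved experiment; refusing to resume. "
            f"Differing keys: {', '.join(differences)}"
        )
```

Reporting the differing keys, rather than a bare "configs differ", tells the user exactly which setting to revert before resuming.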
delm.core.experiment_manager.InMemoryExperimentManager
Bases: BaseExperimentManager
Stores all experiment data in memory. Disk-specific features are not supported.
consolidate_batches
consolidate_batches() -> pd.DataFrame
Concatenate all batch DataFrames in memory.
delete_batch_checkpoint
delete_batch_checkpoint(batch_id: int) -> bool
Delete a batch checkpoint by ID.
get_all_existing_batch_ids
get_all_existing_batch_ids() -> set
Return all batch IDs stored in memory.
get_batch_checkpoint_path
get_batch_checkpoint_path(batch_id: int) -> str
Return the synthetic path string for a batch ID.
get_results
get_results() -> pd.DataFrame
Return extracted results held in memory.
initialize_experiment
initialize_experiment(delm_config: DELMConfig)
Initialize in-memory experiment by storing config and schema dicts.
load_batch_checkpoint
load_batch_checkpoint(batch_path: str) -> pd.DataFrame
Load a batch checkpoint by a synthetic path string.
load_batch_checkpoint_by_id
load_batch_checkpoint_by_id(batch_id: int) -> pd.DataFrame
Load a batch checkpoint by batch ID.
save_batch_checkpoint
save_batch_checkpoint(
batch_df: DataFrame, batch_id: int
) -> str
Save a batch checkpoint in memory.
save_extracted_data
save_extracted_data(df: DataFrame) -> str
Save extracted data in memory.
save_preprocessed_data
save_preprocessed_data(df: DataFrame) -> str
Save preprocessed data in memory.
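The in-memory variant can be sketched as a dictionary keyed by batch ID, with synthetic path strings standing in for file paths. The class name, the `memory://batch/<id>` path format, the copy-on-save behavior, and the choice of `KeyError`/`ValueError` are hypothetical; only the method names mirror those documented above.

```python
import pandas as pd


class InMemoryBatchStoreSketch:
    """Hypothetical dict-backed batch store with synthetic path strings."""

    PREFIX = "memory://batch/"

    def __init__(self) -> None:
        self._batches: dict[int, pd.DataFrame] = {}

    def get_batch_checkpoint_path(self, batch_id: int) -> str:
        return f"{self.PREFIX}{batch_id}"

    def save_batch_checkpoint(self, batch_df: pd.DataFrame, batch_id: int) -> str:
        # Store a copy so later mutations by the caller don't alter the checkpoint.
        self._batches[batch_id] = batch_df.copy()
        return self.get_batch_checkpoint_path(batch_id)

    def load_batch_checkpoint(self, batch_path: str) -> pd.DataFrame:
        if not batch_path.startswith(self.PREFIX):
            raise ValueError(f"Not an in-memory batch path: {batch_path!r}")
        return self.load_batch_checkpoint_by_id(int(batch_path[len(self.PREFIX):]))

    def load_batch_checkpoint_by_id(self, batch_id: int) -> pd.DataFrame:
        if batch_id not in self._batches:
            raise KeyError(f"No checkpoint for batch {batch_id}")
        return self._batches[batch_id]

    def delete_batch_checkpoint(self, batch_id: int) -> bool:
        # True if something was removed, False if the ID was unknown.
        return self._batches.pop(batch_id, None) is not None

    def get_all_existing_batch_ids(self) -> set[int]:
        return set(self._batches)

    def consolidate_batches(self) -> pd.DataFrame:
        if not self._batches:
            raise ValueError("No batches to consolidate")
        # Concatenate in ascending batch-ID order, regardless of insertion order.
        ordered = [self._batches[i] for i in sorted(self._batches)]
        return pd.concat(ordered, ignore_index=True)
```

Returning synthetic path strings keeps the method signatures interchangeable with the disk-based manager, at the cost of losing all data when the process exits.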
Extraction Manager
delm.core.extraction_manager.ExtractionManager
Handles LLM extraction and result parsing.
__init__
__init__(
model_config: LLMExtractionConfig,
schema_manager: SchemaManager,
cost_tracker: CostTracker,
semantic_cache: SemanticCache,
)
Initialize the ExtractionManager.
parse_results_dataframe
parse_results_dataframe(
results: List[Dict[str, Any]],
text_chunks: List[str],
text_chunk_ids: List[int],
batch_id: int = 0,
) -> pd.DataFrame
Parse extraction results into a DataFrame, removing any items that are invalid according to the schema.
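A minimal sketch of turning per-chunk extraction results into a DataFrame while dropping items that fail schema validation. It assumes, hypothetically, that each result is a dict with an `items` list, that the schema is a plain mapping from field name to Python type, and that the output has one row per valid item with `batch_id`, `text_chunk_id`, and `text_chunk` columns; DELM's actual result format and its validation through `SchemaManager` may differ.

```python
from typing import Any

import pandas as pd


def is_valid_item(item: dict[str, Any], schema: dict[str, type]) -> bool:
    """An item is valid if every schema field is present with the expected type."""
    return all(isinstance(item.get(field), expected) for field, expected in schema.items())


def parse_results_sketch(
    results: list[dict[str, Any]],
    text_chunks: list[str],
    text_chunk_ids: list[int],
    schema: dict[str, type],
    batch_id: int = 0,
) -> pd.DataFrame:
    """Hypothetical: one row per valid extracted item, tagged with its source chunk."""
    if not (len(results) == len(text_chunks) == len(text_chunk_ids)):
        raise ValueError("results, text_chunks and text_chunk_ids must have equal length")
    rows = []
    for result, chunk, chunk_id in zip(results, text_chunks, text_chunk_ids):
        for item in result.get("items", []):
            if not is_valid_item(item, schema):
                continue  # drop items that don't match the schema
            # Keep only schema fields so stray keys from the model are discarded.
            cleaned = {field: item[field] for field in schema}
            rows.append({"batch_id": batch_id, "text_chunk_id": chunk_id, "text_chunk": chunk, **cleaned})
    columns = ["batch_id", "text_chunk_id", "text_chunk", *schema]
    return pd.DataFrame(rows, columns=columns)
```

Passing explicit `columns` gives the DataFrame a stable layout even when no item survives validation.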
process_with_batching
process_with_batching(
text_chunks: List[str],
text_chunk_ids: List[int],
batch_size: int,
experiment_manager: BaseExperimentManager,
auto_checkpoint: bool = True,
) -> pd.DataFrame
Process text chunks with persistent batching and checkpointing.
This method handles the complete extraction pipeline:
- Splitting text chunks into batches
- Processing batches with concurrent execution
- Saving batch checkpoints for resuming
- Consolidating results into a final DataFrame
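The pipeline steps described above can be sketched with a thread pool. The sketch assumes a hypothetical `extract_batch(chunks, ids, batch_id)` callable that returns a DataFrame, and uses a plain dict as the checkpoint store where DELM would go through an experiment manager. Batches already present in the store are skipped, which is how checkpoints let a rerun resume instead of starting over.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Callable

import pandas as pd


def process_with_batching_sketch(
    text_chunks: list[str],
    text_chunk_ids: list[int],
    batch_size: int,
    extract_batch: Callable[[list[str], list[int], int], pd.DataFrame],
    checkpoints: dict[int, pd.DataFrame],
    max_workers: int = 4,
) -> pd.DataFrame:
    """Hypothetical: split into batches, run missing ones concurrently, checkpoint, consolidate."""
    if batch_size < 1:
        raise ValueError("batch_size must be >= 1")
    # Batch i covers chunks [i * batch_size, (i + 1) * batch_size); the last one may be shorter.
    starts = range(0, len(text_chunks), batch_size)
    batches = {
        i: (text_chunks[s:s + batch_size], text_chunk_ids[s:s + batch_size])
        for i, s in enumerate(starts)
    }
    # Resume: only batches without a checkpoint are processed.
    pending = [i for i in batches if i not in checkpoints]

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {pool.submit(extract_batch, *batches[i], i): i for i in pending}
        for future in as_completed(futures):
            # Checkpoint each batch as soon as it completes (writes happen on this thread only).
            checkpoints[futures[future]] = future.result()

    ordered = [checkpoints[i] for i in sorted(batches)]
    return pd.concat(ordered, ignore_index=True) if ordered else pd.DataFrame()
```

Checkpointing in completion order, but consolidating in batch-ID order, keeps the final DataFrame deterministic even though batches finish in an unpredictable order.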