Core Managers

Modules that power DELM beyond the high-level pipeline: data preprocessing, experiment storage, schema coordination, and batched extraction.

Data Processor

delm.core.data_processor.DataProcessor

Handles data loading, preprocessing, chunking, and scoring.

load_data

load_data(
    data_source: Union[str, Path, DataFrame],
) -> pd.DataFrame

Load data from various sources.

Parameters:
  • data_source (Union[str, Path, DataFrame]) –

    The data source to load. Can be a path to a file or directory (str or Path), or a DataFrame.

Returns:
  • DataFrame

    A DataFrame containing the loaded data.

Raises:
  • FileNotFoundError

    If the data source path does not exist.

  • ValueError

    If the target column is not found in the data source or if the data source is a directory and contains multiple file types.
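The multiple-file-type rule can be illustrated with a small standalone sketch (this is a hypothetical check mirroring the documented behavior, not DELM's actual implementation):

```python
from pathlib import Path

def check_single_file_type(file_paths):
    """Raise ValueError when a directory source mixes file types,
    mirroring the rule load_data documents above."""
    suffixes = {Path(p).suffix for p in file_paths}
    if len(suffixes) > 1:
        raise ValueError(
            f"Directory contains multiple file types: {sorted(suffixes)}"
        )
    return suffixes.pop() if suffixes else None

print(check_single_file_type(["a.txt", "b.txt"]))  # .txt
```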

process_dataframe

process_dataframe(df: DataFrame) -> pd.DataFrame

Apply chunking and scoring to a DataFrame.

Parameters:
  • df (DataFrame) –

    The DataFrame to process.

Returns:
  • DataFrame

    A DataFrame containing the processed data.

Raises:
  • ValueError

    If drop_target_column is True and no splitting strategy is specified.
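A rough sketch of what chunking a text column can look like, using a simple paragraph splitter and a hypothetical `target_column` name (DELM's actual splitting strategies may differ):

```python
import pandas as pd

def chunk_dataframe(df, target_column="text"):
    """Split each row's text into paragraph chunks, one chunk per row."""
    out = df.copy()
    out["chunk"] = out[target_column].str.split("\n\n")
    out = out.explode("chunk", ignore_index=True)
    out["chunk_id"] = out.index
    return out

df = pd.DataFrame({"text": ["para one\n\npara two", "single para"]})
chunked = chunk_dataframe(df)
print(len(chunked))  # 3 chunks from 2 rows
```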

Experiment Managers

delm.core.experiment_manager.BaseExperimentManager

Bases: ABC

Abstract base class for DELM experiment managers.

cleanup_batch_checkpoints abstractmethod

cleanup_batch_checkpoints()

Remove all batch checkpoint files after consolidation.

consolidate_batches abstractmethod

consolidate_batches() -> pd.DataFrame

Consolidate all batch files into a single DataFrame and save as final result.

Returns:
  • DataFrame

    A DataFrame containing the consolidated data.

delete_batch_checkpoint abstractmethod

delete_batch_checkpoint(batch_id: int) -> bool

Delete the batch checkpoint file for a given batch ID.

Parameters:
  • batch_id (int) –

    The ID of the batch.

Returns:
  • bool

    True if the batch checkpoint file was deleted, False otherwise.

get_all_existing_batch_ids abstractmethod

get_all_existing_batch_ids() -> set

Get all existing batch IDs.

Returns:
  • set

    A set of all existing batch IDs.

get_batch_checkpoint_path abstractmethod

get_batch_checkpoint_path(batch_id: int) -> Path

Get the path to the batch checkpoint file for a given batch ID.

Parameters:
  • batch_id (int) –

    The ID of the batch.

Returns:
  • Path

    The path to the batch checkpoint file.

get_results abstractmethod

get_results() -> pd.DataFrame

Get the results from the experiment directory.

Returns:
  • DataFrame

    A DataFrame containing the results.

initialize_experiment abstractmethod

initialize_experiment(delm_config: DELMConfig)

Initialize the experiment.

Parameters:
  • delm_config (DELMConfig) –

    The DELM configuration.

list_batch_checkpoints abstractmethod

list_batch_checkpoints() -> List[Path]

List all batch checkpoint files in the processing cache directory.

Returns:
  • List[Path]

    A list of paths to the batch checkpoint files.

load_batch_checkpoint abstractmethod

load_batch_checkpoint(batch_path: Path) -> pd.DataFrame

Load a batch checkpoint from a feather file.

Parameters:
  • batch_path (Path) –

    The path to the batch checkpoint file.

Returns:
  • DataFrame

    A DataFrame containing the batch checkpoint data.

load_batch_checkpoint_by_id abstractmethod

load_batch_checkpoint_by_id(batch_id: int) -> pd.DataFrame

Load a batch checkpoint by batch ID.

Parameters:
  • batch_id (int) –

    The ID of the batch.

Returns:
  • DataFrame

    A DataFrame containing the batch checkpoint data.

load_preprocessed_data abstractmethod

load_preprocessed_data(
    file_path: Optional[Path] = None,
) -> pd.DataFrame

Load the preprocessed data from the experiment directory.

Parameters:
  • file_path (Optional[Path], default: None ) –

    Optional explicit path to a feather file; when omitted, use the manager's default preprocessed data path.

Returns:
  • DataFrame

    A DataFrame containing the preprocessed data.

load_state abstractmethod

load_state() -> Optional[CostTracker]

Load the experiment state from the experiment directory.

Returns:
  • Optional[CostTracker]

    The restored cost tracker, or None if not found.

save_batch_checkpoint abstractmethod

save_batch_checkpoint(
    batch_df: DataFrame, batch_id: int
) -> Path

Save a batch checkpoint to the experiment directory.

Parameters:
  • batch_df (DataFrame) –

    The DataFrame to save.

  • batch_id (int) –

    The ID of the batch.

Returns:
  • Path

    The path to the saved data.

save_extracted_data abstractmethod

save_extracted_data(df: DataFrame) -> Path

Save the extracted data to the experiment directory.

Parameters:
  • df (DataFrame) –

    The DataFrame to save.

Returns:
  • Path

    The path to the saved data.

save_preprocessed_data abstractmethod

save_preprocessed_data(df: DataFrame) -> Path

Save the preprocessed data to the experiment directory.

Parameters:
  • df (DataFrame) –

    The DataFrame to save.

Returns:
  • Path

    The path to the saved data.

save_state abstractmethod

save_state(cost_tracker: CostTracker)

Save the experiment state to the experiment directory.

Parameters:
  • cost_tracker (CostTracker) –

    The cost tracker to save.
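Together, the checkpointing methods above enable a simple resume pattern: compute which batch IDs still need processing from the IDs already checkpointed. A minimal sketch (function name hypothetical):

```python
def remaining_batch_ids(total_batches, existing_ids):
    """Batches that still need processing, given the IDs already
    checkpointed (as returned by get_all_existing_batch_ids)."""
    return sorted(set(range(total_batches)) - set(existing_ids))

print(remaining_batch_ids(5, {0, 2, 3}))  # [1, 4]
```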

delm.core.experiment_manager.DiskExperimentManager

Bases: BaseExperimentManager

Handles experiment directories, config/schema validation, batch checkpointing, and state management (disk-based).

cleanup_batch_checkpoints

cleanup_batch_checkpoints()

Remove all batch checkpoint files after consolidation.

consolidate_batches

consolidate_batches() -> pd.DataFrame

Consolidate all batch files into a single DataFrame and save as final result.

Returns:
  • DataFrame

    The concatenated DataFrame across all batch files.

Raises:
  • FileNotFoundError

    If no batch files are present.
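Consolidation amounts to concatenating the per-batch frames in batch-ID order. A sketch of the idea in plain pandas, with a dict standing in for the on-disk batch files:

```python
import pandas as pd

def consolidate(batches):
    """Concatenate batch DataFrames keyed by batch ID into one frame."""
    if not batches:
        raise FileNotFoundError("No batch files to consolidate")
    frames = [batches[bid] for bid in sorted(batches)]
    return pd.concat(frames, ignore_index=True)

batches = {
    1: pd.DataFrame({"x": [3]}),
    0: pd.DataFrame({"x": [1, 2]}),
}
print(list(consolidate(batches)["x"]))  # [1, 2, 3]
```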

delete_batch_checkpoint

delete_batch_checkpoint(batch_id: int) -> bool

Delete the batch checkpoint file for a given batch ID.

Returns:
  • bool

    True if the file was deleted; False if it did not exist.

get_all_existing_batch_ids

get_all_existing_batch_ids() -> set

Return a set of all batch IDs for which a checkpoint file exists.

get_batch_checkpoint_path

get_batch_checkpoint_path(batch_id: int) -> Path

Return the full path to the batch checkpoint file for a given batch ID.

get_results

get_results() -> pd.DataFrame

Get the consolidated results from the experiment directory.

Returns:
  • DataFrame

    A DataFrame containing the results.

Raises:
  • FileNotFoundError

    If the consolidated result file does not exist.

initialize_experiment

initialize_experiment(delm_config: DELMConfig)

Validate and create experiment directory structure; write config and schema files.

Raises:
  • ExperimentManagementError

    If the experiment directory exists and neither overwrite nor checkpoint/resume is allowed.

  • FileNotFoundError

    If attempting to resume without config files present.

  • ValueError

    If resume config or schema mismatches current configuration.

is_experiment_completed

is_experiment_completed() -> bool

Check whether the experiment is completed, i.e. whether the consolidated result file exists.

list_batch_checkpoints

list_batch_checkpoints() -> List[Path]

List all batch checkpoint files in the processing cache directory.

load_batch_checkpoint

load_batch_checkpoint(batch_path: Path) -> pd.DataFrame

Load a batch checkpoint from a feather file.

Parameters:
  • batch_path (Path) –

    Path to the batch feather file.

Returns:
  • DataFrame

    The loaded DataFrame.

Raises:
  • FileNotFoundError

    If the file does not exist.

  • ValueError

    If the file extension is not .feather.

load_batch_checkpoint_by_id

load_batch_checkpoint_by_id(batch_id: int) -> pd.DataFrame

Load a batch checkpoint by batch ID.

Parameters:
  • batch_id (int) –

    Batch ID to load.

Returns:
  • DataFrame

    The loaded DataFrame.

load_preprocessed_data

load_preprocessed_data(
    file_path: Optional[Path] = None,
) -> pd.DataFrame

Load preprocessed data from feather file.

load_state

load_state() -> Optional[CostTracker]

Load the experiment state from the state file (JSON). Returns the restored CostTracker, or None if the file is not found.

save_batch_checkpoint

save_batch_checkpoint(
    batch_df: DataFrame, batch_id: int
) -> Path

Save a batch checkpoint as a feather file.

save_extracted_data

save_extracted_data(df: DataFrame) -> Path

Save extracted data as feather file.

save_preprocessed_data

save_preprocessed_data(df: DataFrame) -> Path

Save preprocessed data as feather file.

save_state

save_state(cost_tracker: CostTracker)

Save experiment state (cost tracker only) to state file as JSON.

verify_resume_config

verify_resume_config(delm_config: DELMConfig)

Compare the config and schema stored in the config/ folder against the user-supplied DELMConfig, raising ValueError if they differ.

delm.core.experiment_manager.InMemoryExperimentManager

Bases: BaseExperimentManager

Stores all experiment data in memory. Disk-specific features are not supported.

cleanup_batch_checkpoints

cleanup_batch_checkpoints()

Remove all batch checkpoints from memory.

consolidate_batches

consolidate_batches() -> pd.DataFrame

Concatenate all batch DataFrames in memory.

Returns:
  • DataFrame

    Concatenated DataFrame across all in-memory batches.

Raises:
  • ValueError

    If no batches have been saved.

delete_batch_checkpoint

delete_batch_checkpoint(batch_id: int) -> bool

Delete a batch checkpoint by ID.

Returns:
  • bool

    True if the checkpoint existed and was removed; False otherwise.

get_all_existing_batch_ids

get_all_existing_batch_ids() -> set

Return all batch IDs stored in memory.

get_batch_checkpoint_path

get_batch_checkpoint_path(batch_id: int) -> str

Return the synthetic path string for a batch ID.

get_results

get_results() -> pd.DataFrame

Return extracted results held in memory.

Returns:
  • DataFrame

    The extracted results DataFrame.

Raises:
  • ValueError

    If results have not been saved.

initialize_experiment

initialize_experiment(delm_config: DELMConfig)

Initialize in-memory experiment by storing config and schema dicts.

list_batch_checkpoints

list_batch_checkpoints() -> list

List all batch checkpoint IDs in memory.

load_batch_checkpoint

load_batch_checkpoint(batch_path: str) -> pd.DataFrame

Load a batch checkpoint by a synthetic path string.

Parameters:
  • batch_path (str) –

    Path string in the form "in-memory-batch-{id}".

Returns:
  • DataFrame

    The stored batch DataFrame.

Raises:
  • ValueError

    If the path is malformed.
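The documented "in-memory-batch-{id}" convention can be parsed back to a batch ID like so (a sketch of the format check, not DELM's actual implementation):

```python
def parse_batch_path(batch_path):
    """Extract the batch ID from an 'in-memory-batch-{id}' string,
    raising ValueError on malformed input."""
    prefix = "in-memory-batch-"
    suffix = batch_path[len(prefix):]
    if not batch_path.startswith(prefix) or not suffix.isdigit():
        raise ValueError(f"Malformed in-memory batch path: {batch_path!r}")
    return int(suffix)

print(parse_batch_path("in-memory-batch-3"))  # 3
```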

load_batch_checkpoint_by_id

load_batch_checkpoint_by_id(batch_id: int) -> pd.DataFrame

Load a batch checkpoint by batch ID.

Parameters:
  • batch_id (int) –

    The batch identifier previously saved.

Returns:
  • DataFrame

    The stored batch DataFrame.

Raises:
  • ValueError

    If the batch is not present in memory.

save_batch_checkpoint

save_batch_checkpoint(
    batch_df: DataFrame, batch_id: int
) -> str

Save a batch checkpoint in memory.

Returns:
  • str

    A synthetic identifier string for the in-memory batch (e.g., "in-memory-batch-3").

save_extracted_data

save_extracted_data(df: DataFrame) -> str

Save extracted data in memory.

Returns:
  • str

    The literal string "in-memory".

save_preprocessed_data

save_preprocessed_data(df: DataFrame) -> str

Save preprocessed data in memory.

Returns:
  • str

    The literal string "in-memory".

save_state

save_state(cost_tracker: CostTracker)

Save the cost tracker in memory.

Extraction Manager

delm.core.extraction_manager.ExtractionManager

Handles LLM extraction and result parsing.

__init__

__init__(
    model_config: LLMExtractionConfig,
    schema_manager: SchemaManager,
    cost_tracker: CostTracker,
    semantic_cache: SemanticCache,
)

Initialize the ExtractionManager.

Parameters:
  • model_config (LLMExtractionConfig) –

    The LLM extraction configuration.

  • schema_manager (SchemaManager) –

    The schema manager used for extraction.

  • cost_tracker (CostTracker) –

    The tracker that records LLM usage costs.

  • semantic_cache (SemanticCache) –

    The cache used to avoid repeated LLM calls.

parse_results_dataframe

parse_results_dataframe(
    results: List[Dict[str, Any]],
    text_chunks: List[str],
    text_chunk_ids: List[int],
    batch_id: int = 0,
) -> pd.DataFrame

Parse extraction results into a DataFrame, dropping any items that are invalid under the schema.

Parameters:
  • results (List[Dict[str, Any]]) –

    The results to parse.

  • text_chunks (List[str]) –

    The text chunks that were used to generate the results.

  • text_chunk_ids (List[int]) –

    The IDs of the text chunks that were used to generate the results.

  • batch_id (int, default: 0 ) –

    The ID of the batch that the results belong to.

Returns:
  • DataFrame

    A DataFrame containing the parsed results.

process_with_batching

process_with_batching(
    text_chunks: List[str],
    text_chunk_ids: List[int],
    batch_size: int,
    experiment_manager: BaseExperimentManager,
    auto_checkpoint: bool = True,
) -> pd.DataFrame

Process text chunks with persistent batching and checkpointing.

This method handles the complete extraction pipeline:
  • Splitting text chunks into batches
  • Processing batches with concurrent execution
  • Saving batch checkpoints for resuming
  • Consolidating results into a final DataFrame

Parameters:
  • text_chunks (List[str]) –

    The text chunks to process.

  • text_chunk_ids (List[int]) –

    The IDs of the text chunks.

  • batch_size (int) –

    The size of each batch.

  • experiment_manager (BaseExperimentManager) –

    The experiment manager.

  • auto_checkpoint (bool, default: True ) –

    Whether to auto-checkpoint.

Returns:
  • DataFrame

    A DataFrame containing the extracted data.
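The batching-with-checkpoints flow can be sketched with a dict standing in for the experiment manager's checkpoint store (all names here are illustrative, not DELM's API):

```python
import pandas as pd

def process_with_batching(chunks, chunk_ids, batch_size, store, extract):
    """Process chunks in batches, skipping batches already in the store."""
    for start in range(0, len(chunks), batch_size):
        batch_id = start // batch_size
        if batch_id in store:  # resume: skip completed batches
            continue
        batch = chunks[start:start + batch_size]
        ids = chunk_ids[start:start + batch_size]
        store[batch_id] = pd.DataFrame(
            {"chunk_id": ids, "result": [extract(c) for c in batch]}
        )
    return pd.concat([store[b] for b in sorted(store)], ignore_index=True)

store = {}
out = process_with_batching(["a", "b", "c"], [0, 1, 2], 2, store, str.upper)
print(list(out["result"]))  # ['A', 'B', 'C']
```

On a rerun with the same `store`, completed batches are skipped, which is the essence of the checkpoint-resume behavior described above.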

Schema Manager

delm.schemas.SchemaManager

Manages schema loading and validation.

get_extraction_schema

get_extraction_schema() -> BaseSchema

Get the loaded extraction schema.