Source code for FairLangProc.datasets.fairness_datasets

"""
Fair-LLM-Benchmark Data Loader

A comprehensive data loading system for bias evaluation datasets in LLM fairness research.
Supports multiple output formats (raw, HuggingFace, PyTorch) and various file types.
"""

import subprocess, re, logging
from pathlib import Path
import tarfile
from typing import Union, List, Dict, Any, Optional, Tuple

# Data handling libraries
import numpy as np
import pandas as pd
from torch.utils.data import Dataset as PtDataset
from datasets import Dataset as HfDataset

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class FairLLMBenchmarkLoader: # pragma: no cover
    """Main class for loading Fair-LLM-Benchmark datasets"""
    
    def __init__(self, benchmark_path: Optional[str] = None):
        """Initialize the loader with benchmark path"""
        self.file_path = Path(__file__).parent.absolute()
        self.benchmark_path = Path(benchmark_path) if benchmark_path else self.file_path / 'Fair-LLM-Benchmark'
        
        # Supported file extensions
        self.extensions = {
            'tsv': self._read_tsv,
            'csv': self._read_csv,
            'json': self._read_json,
            'jsonl': self._read_jsonl,
            'txt': self._read_txt,
            'tgz': self._read_tgz,
            'zip': self._read_zip,
            'py': self._run_python,
            'default': self._read_default
        }
        
        # Dataset categories
        self.unavailable_datasets = {'Equity-Evaluation-Corpus', 'RealToxicityPrompts'}
        self.python_datasets = {'HolisticBias', 'Bias-NLI', 'TrustGPT'}
        self.config_required = {
            'BBQ', 'BEC-Pro', 'BOLD', 'BUG', 'HolisticBias', 'StereoSet', 'WinoBias'
        }
        
        # Available configurations
        self.configs = {
            'BBQ': ['Age', 'Disability_Status', 'Gender_identity', 'Nationality', 
                   'Physical_appearance', 'Race_ethnicity', 'Race_x_gender', 
                   'Race_x_SES', 'Religion', 'SES', 'Sexual_orientation', 'all'],
            'BEC-Pro': ['english', 'german', 'all'],
            'BOLD': ['prompts', 'wikipedia', 'all'],
            'BUG': ['balanced', 'full', 'gold', 'all'],
            'HolisticBias': ['noun_phrases', 'sentences', 'all'],
            'StereoSet': ['word', 'sentence', 'all'],
            'WinoBias': ['pairs', 'gender_words', 'WinoBias']
        }
        
        # Initialize handlers
        self._init_handlers()
    
    def _init_handlers(self):
        """Initialize dataset-specific handlers"""
        self.handlers = {
            'BBQ': self._handle_bbq,
            'BEC-Pro': self._handle_bec_pro,
            'Bias-NLI': self._handle_bias_nli,
            'BOLD': self._handle_bold,
            'BUG': self._handle_bug,
            'CrowS-Pairs': self._handle_crows_pairs,
            'GAP': self._handle_gap,
            'Grep-BiasIR': self._handle_grep_biasir,
            'HolisticBias': self._handle_holistic_bias,
            'HONEST': self._handle_honest,
            'PANDA': self._handle_panda,
            'RealToxicityPrompts': self._handle_real_toxicity_prompts,
            'RedditBias': self._handle_reddit_bias,
            'StereoSet': self._handle_stereoset,
            'TrustGPT': self._handle_trustgpt,
            'UnQover': self._handle_unqover,
            'WinoBias': self._handle_winobias,
            'WinoBias+': self._handle_winobias_plus,
            'Winogender': self._handle_winogender,
            'WinoQueer': self._handle_winoqueer
        }
    
    # File readers
    def _read_csv(self, path: Path) -> pd.DataFrame:
        """Read CSV file"""
        return pd.read_csv(path)
    
    def _read_tsv(self, path: Path) -> pd.DataFrame:
        """Read TSV file"""
        return pd.read_csv(path, sep='\t')
    
    def _read_json(self, path: Path) -> pd.DataFrame:
        """Read JSON file"""
        return pd.read_json(path)
    
    def _read_jsonl(self, path: Path) -> pd.DataFrame:
        """Read JSONL file"""
        return pd.read_json(path, lines=True)
    
    def _read_txt(self, path: Path) -> str:
        """Read text file"""
        with open(path, 'r', encoding='utf-8') as file:
            return file.read()
    
    def _read_zip(self, path: Path) -> pd.DataFrame:
        """Read ZIP compressed CSV"""
        return pd.read_csv(path, compression='zip')
    
    def _read_tgz(self, path: Path) -> Optional[Dict[str, Any]]:
        """Read TGZ compressed file - Complete implementation"""
        try:
            data_dict = {}
            
            with tarfile.open(path, 'r:gz') as tar:
                # Extract to temporary directory
                temp_dir = path.parent / f'temp_{path.stem}'
                temp_dir.mkdir(exist_ok=True)
                
                try:
                    tar.extractall(temp_dir)
                    
                    # Read extracted files
                    for item in temp_dir.rglob('*'):
                        if item.is_file():
                            # Get relative path for key
                            rel_path = item.relative_to(temp_dir)
                            key = str(rel_path).replace('/', '_').replace('\\', '_')
                            
                            # Read the file
                            data_dict[key] = self._read_file(item)
                    
                    # Clean up temp directory
                    import shutil
                    shutil.rmtree(temp_dir)
                    
                except Exception as e:
                    # Clean up temp directory on error
                    import shutil
                    if temp_dir.exists():
                        shutil.rmtree(temp_dir)
                    raise e
            
            return data_dict
            
        except Exception as e:
            logger.error(f"Error reading TGZ file {path}: {e}")
            return None
    
    def _run_python(self, path: Path, *args) -> None:
        """Execute Python file with arguments"""
        program = path.name
        program_folder = path.parent
        
        # Create files folder if it doesn't exist
        files_dir = program_folder / 'files'
        files_dir.mkdir(exist_ok=True)
        
        # Run the command
        process = ['python', program] + list(args)
        try:
            subprocess.run(process, cwd=program_folder, check=True)
        except subprocess.CalledProcessError as e:
            logger.error(f"Error running {program}: {e}")
    
    def _read_default(self, path: Path) -> str:
        """Generic data reader"""
        with open(path, 'r', encoding='utf-8') as file:
            return file.read()
    
    def _read_file(self, path: Path, *args) -> Union[pd.DataFrame, dict, str, None]:
        """Use appropriate reader based on file extension"""
        extension = path.suffix.lstrip('.')
        
        if extension not in self.extensions:
            extension = 'default'
        
        try:
            return self.extensions[extension](path, *args)
        except Exception as e:
            logger.error(f"Error reading {path}: {e}")
            return None
    
    def _read_folder(self, folder_path: Path) -> Dict[str, Any]:
        """Recursively read folder contents"""
        files = {}
        
        if not folder_path.exists():
            logger.warning(f"Folder not found: {folder_path}")
            return files
        
        for item in folder_path.iterdir():
            if item.name == '__init__.py' or item.name == '__pycache__' or item.suffix == '.pyc':
                continue
            if item.is_dir():
                files[item.name] = self._read_folder(item)
            else:
                files[item.name] = self._read_file(item)
        
        return files
    
    def get_datasets(self) -> List[str]:
        """Get list of available datasets"""
        if not self.benchmark_path.exists():
            logger.warning(f"Benchmark path not found: {self.benchmark_path}")
            return []
        
        datasets = [
            item.name for item in self.benchmark_path.iterdir()
            if item.is_dir() and 'git' not in item.name.lower()
        ]
        return sorted(datasets)
    
    def _get_dataset_path(self, name: str) -> Path:
        """Get dataset data path"""
        return self.benchmark_path / name / 'data'
    
    # Dataset handlers
    def _handle_bbq(self, config: str = '') -> Union[pd.DataFrame, Dict[str, Any], None]:
        """Handle BBQ dataset"""
        path = self._get_dataset_path('BBQ')
        
        if config.lower() in ('', 'h', 'help'):
            print('Available BBQ datasets:')
            if path.exists():
                for item in path.iterdir():
                    if item.suffix == '.jsonl':
                        print(item.stem)
                    else:
                        print(item.name)
            print('all')
            return None
        
        if config.lower() == 'all':
            return self._read_folder(path)
        
        if 'template' in config:
            file_path = path / 'templates' / f'{config}.csv'
            return self._read_csv(file_path) if file_path.exists() else None
        
        file_path = path / f'{config}.jsonl'
        return self._read_jsonl(file_path) if file_path.exists() else None
    
    def _handle_bec_pro(self, config: str = '') -> Union[pd.DataFrame, Dict[str, pd.DataFrame], None]:
        """Handle BEC-Pro dataset"""
        path = self._get_dataset_path('BEC-Pro')
        
        files = {
            'english': path / 'BEC-Pro_EN.tsv',
            'german': path / 'BEC-Pro_DE.tsv'
        }
        
        if config == 'all':
            return {lang: self._read_tsv(file_path) 
                   for lang, file_path in files.items() 
                   if file_path.exists()}
        
        if config in files and files[config].exists():
            return self._read_tsv(files[config])
        
        print('Available options: english, german, all')
        return None
    
    def _handle_bias_nli(self, config: str = '') -> Optional[Dict[str, Any]]:
        """Handle Bias-NLI dataset - requires Python execution"""
        path = self._get_dataset_path('Bias-NLI')
        
        if config.lower() in ('', 'h', 'help'):
            print('Bias-NLI dataset configurations:')
            print('  process - Run the processing script')
            print('  load - Load processed data')
            print('  all - Process and load data')
            return None
        
        # Check if we need to run the Python processing script
        if config in ('process', 'all'):
            python_files = [f for f in path.glob('*.py') if f.name != '__init__.py']
            if python_files:
                try:
                    logger.info("Running Bias-NLI processing script...")
                    self._run_python(python_files[0])
                    logger.info("Bias-NLI processing completed")
                except Exception as e:
                    logger.error(f"Error running Bias-NLI processing: {e}")
                    return None
            else:
                logger.warning("No Python processing script found for Bias-NLI")
        
        # Load processed data
        if config in ('load', 'all', ''):
            processed_path = path / 'processed'
            if processed_path.exists():
                return self._read_folder(processed_path)
            else:
                # Try to load from main data folder
                return self._read_folder(path)
        
        return None
    
    def _handle_bold(self, config: str = '') -> Union[pd.DataFrame, Dict[str, Any], None]:
        """Handle BOLD dataset"""
        path = self._get_dataset_path('BOLD')
        
        if config.lower() in ('', 'h', 'help'):
            print('Available BOLD datasets:')
            prompts_path = path / 'prompts'
            if prompts_path.exists():
                for item in prompts_path.iterdir():
                    if item.name.startswith('.') or item.name == '__pycache__' or item.suffix in ['.py', '.pyc']:
                        continue
                    if item.suffix == '.json':
                        print(item.stem)
            print('all, prompts, wikipedia')
            return None
        
        if config.lower() == 'all':
            return self._read_folder(path)
        
        if config == 'prompts':
            return self._read_folder(path / 'prompts')
        
        if config == 'wikipedia':
            return self._read_folder(path / 'wikipedia')
        
        # Try prompts folder first
        for folder in ['prompts', 'wikipedia']:
            file_path = path / folder / f'{config}.json'
            if file_path.exists():
                return self._read_json(file_path)
        
        # Try root folder
        file_path = path / f'{config}.csv'
        return self._read_csv(file_path) if file_path.exists() else None
    
    def _handle_bug(self, config: str = '') -> Union[pd.DataFrame, Dict[str, Any], None]:
        """Handle BUG dataset"""
        path = self._get_dataset_path('BUG')
        
        if config.lower() in ('', 'h', 'help'):
            print('Available BUG datasets:')
            if path.exists():
                for item in path.iterdir():
                    if item.name.startswith('.') or item.name == '__pycache__' or item.suffix in ['.py', '.pyc']:
                        continue
                    print(item.name)
            print('all')
            return None
        
        if config.lower() == 'all':
            return self._read_folder(path)
        
        # Try different file patterns
        patterns = [f'{config}_BUG.csv', f'{config}.csv', f'BUG_{config}.csv']
        
        for pattern in patterns:
            file_path = path / pattern
            if file_path.exists():
                return self._read_csv(file_path)
        
        return None
    
    def _handle_crows_pairs(self, config: str = '') -> Optional[pd.DataFrame]:
        """Handle CrowS-Pairs dataset"""
        file_path = self._get_dataset_path('CrowS-Pairs') / 'crows_pairs_anonymized.csv'
        return self._read_csv(file_path) if file_path.exists() else None
    
    def _handle_gap(self, config: str = '') -> Optional[Dict[str, Any]]:
        """Handle GAP dataset"""
        return self._read_folder(self._get_dataset_path('GAP'))
    
    def _handle_holistic_bias(self, config: str = '') -> Union[pd.DataFrame, Dict[str, Any], None]:
        """Handle HolisticBias dataset"""
        path = self._get_dataset_path('HolisticBias') / 'files'
        
        if config == 'all':
            return self._read_folder(path)
        
        file_mapping = {
            'sentences': 'sentences.csv',
            'phrases': 'noun_phrases.csv',
            'noun_phrases': 'noun_phrases.csv'
        }
        
        if config in file_mapping:
            file_path = path / file_mapping[config]
            return self._read_csv(file_path) if file_path.exists() else None
        
        print('Available datasets: noun_phrases, sentences, all')
        return None
    
    def _handle_stereoset(self, config: str = '') -> Union[Dict[str, pd.DataFrame], None]:
        """Handle StereoSet dataset"""
        path = self._get_dataset_path('StereoSet')
        
        config_mapping = {
            'word': [1],
            'sentence': [0],
            'all': [0, 1]
        }
        
        if config not in config_mapping:
            print('Available options: word, sentence, all')
            return None
        
        rows = config_mapping[config]
        dataframes = {}
        row_dict = {0: 'sentence', 1: 'word'}
        
        for dataset in ['test', 'dev']:
            file_path = path / f'{dataset}.json'
            if not file_path.exists():
                continue
            
            raw_data = self._read_json(file_path)
            
            for row in rows:
                if 'data' not in raw_data.columns or len(raw_data) <= row:
                    continue
                
                target, bias_type, context, labels, options = [], [], [], [], []
                
                item = raw_data.iloc[row]['data']
                for item2 in item:
                    sentences = []
                    label = []
                    
                    for item3 in item2['sentences']:
                        label.append(item3['gold_label'])
                        sentences.append(item3['sentence'])
                    
                    labels.append('[' + '/'.join(label) + ']')
                    options.append('[' + '/'.join(sentences) + ']')
                    target.append(item2['target'])
                    bias_type.append(item2['bias_type'])
                    context.append(item2['context'])
                
                dataframes[f'{dataset}_{row_dict[row]}'] = pd.DataFrame({
                    'options': options,
                    'context': context,
                    'target': target,
                    'bias_type': bias_type,
                    'labels': labels
                })
        
        return dataframes
    
    def _handle_winobias(self, config: str = '') -> Union[List, Dict, None]:
        """Handle WinoBias dataset"""
        files = self._read_folder(self._get_dataset_path('WinoBias'))
        
        if not files:
            return None
        
        if config in ('h', 'help'):
            print('Available datasets: pairs, gender_words, WinoBias')
            return None
        
        if config == 'pairs':
            pairs = []
            for file_name in ['generalized_swaps.txt', 'extra_gendered_words.txt']:
                if file_name in files and files[file_name]:
                    pairs.extend([
                        tuple(word.strip() for word in pair.split('\t'))
                        for pair in files[file_name].split('\n')
                        if pair.strip() and '\t' in pair
                    ])
            return pairs
        
        if 'gender' in config:
            lists = {'male': [], 'female': []}
            
            # Load occupation lists
            if 'male_occupations.txt' in files:
                lists['male'] = [line.strip() for line in files['male_occupations.txt'].split('\n') if line.strip()]
            if 'female_occupations.txt' in files:
                lists['female'] = [line.strip() for line in files['female_occupations.txt'].split('\n') if line.strip()]
            
            # Load gendered words
            gendered_words_path = self.file_path / 'GenderSwaps' / 'gendered_words_unidirectional.txt'
            if gendered_words_path.exists():
                gendered_words = self._read_txt(gendered_words_path)
                for pair in gendered_words.split('\n'):
                    if '\t' in pair:
                        male_word, female_word = pair.split('\t')
                        lists['male'].append(male_word.strip())
                        lists['female'].append(female_word.strip())
            
            return lists
        
        # Process WinoBias files
        prefixes = ['anti', 'pro']
        numbers = ['1', '2']
        set_types = ['dev', 'test']
        
        dataframes = {}
        
        for prefix in prefixes:
            for number in numbers:
                for set_type in set_types:
                    file_name = f'{prefix}_stereotyped_type{number}.txt.{set_type}'
                    
                    if file_name not in files or not files[file_name]:
                        continue
                    
                    sentences = []
                    entities = []
                    pronouns = []
                    
                    for line in files[file_name].split('\n'):
                        if not line.strip():
                            continue
                        
                        # Remove line numbers
                        sentence = ' '.join(line.split()[1:])
                        
                        # Extract entities and pronouns in brackets
                        matches = re.findall(r'\[(.*?)\]', sentence)
                        
                        if len(matches) >= 2:
                            entities.append(matches[0])
                            pronouns.append(matches[1])
                        else:
                            entities.append('')
                            pronouns.append('')
                        
                        # Clean sentence
                        clean_sentence = sentence.replace('[', '').replace(']', '')
                        sentences.append(clean_sentence)
                    
                    dataframes[file_name] = pd.DataFrame({
                        'sentence': sentences,
                        'entity': entities,
                        'pronoun': pronouns
                    })
        
        return dataframes
    
    def _handle_winobias_plus(self, config: str = '') -> Optional[pd.DataFrame]:
        """Handle WinoBias+ dataset"""
        files = self._read_folder(self._get_dataset_path('WinoBias+'))
        
        if not files:
            return None
        
        gendered_data = files.get('WinoBias+.preprocessed', '')
        neutral_data = files.get('WinoBias+.references', '')
        
        gendered_lines = [line.strip() for line in gendered_data.split('\n') if line.strip()] if gendered_data else []
        neutral_lines = [line.strip() for line in neutral_data.split('\n') if line.strip()] if neutral_data else []
        
        # Ensure both lists have the same length
        max_len = max(len(gendered_lines), len(neutral_lines))
        gendered_lines.extend([''] * (max_len - len(gendered_lines)))
        neutral_lines.extend([''] * (max_len - len(neutral_lines)))
        
        return pd.DataFrame({
            'gendered': gendered_lines,
            'neutral': neutral_lines
        })
    
    def _handle_winogender(self, config: str = '') -> Optional[pd.DataFrame]:
        """Handle Winogender dataset"""
        file_path = self._get_dataset_path('Winogender') / 'all_sentences.tsv'
        return self._read_tsv(file_path) if file_path.exists() else None
    
    def _handle_grep_biasir(self, config: str = '') -> Optional[Dict[str, Any]]:
        """Handle Grep-BiasIR dataset"""
        path = self._get_dataset_path('Grep-BiasIR')
        
        if config.lower() in ('', 'h', 'help'):
            print('Grep-BiasIR dataset configurations:')
            print('  queries - Load search queries')
            print('  documents - Load document collection')
            print('  relevance - Load relevance judgments')
            print('  all - Load all data')
            return None
        
        if not path.exists():
            logger.error(f"Grep-BiasIR dataset path not found: {path}")
            return None
        
        data_dict = {}
        
        if config == 'queries' or config == 'all':
            queries_file = path / 'queries.tsv'
            if queries_file.exists():
                data_dict['queries'] = self._read_tsv(queries_file)
            else:
                # Try alternative file names
                for alt_name in ['queries.csv', 'query.tsv', 'query.csv']:
                    alt_file = path / alt_name
                    if alt_file.exists():
                        data_dict['queries'] = self._read_file(alt_file)
                        break
        
        if config == 'documents' or config == 'all':
            docs_file = path / 'documents.tsv'
            if docs_file.exists():
                data_dict['documents'] = self._read_tsv(docs_file)
            else:
                # Try alternative file names
                for alt_name in ['documents.csv', 'docs.tsv', 'docs.csv', 'collection.tsv']:
                    alt_file = path / alt_name
                    if alt_file.exists():
                        data_dict['documents'] = self._read_file(alt_file)
                        break
        
        if config == 'relevance' or config == 'all':
            rel_file = path / 'relevance.tsv'
            if rel_file.exists():
                data_dict['relevance'] = self._read_tsv(rel_file)
            else:
                # Try alternative file names
                for alt_name in ['relevance.csv', 'qrels.tsv', 'qrels.csv', 'judgments.tsv']:
                    alt_file = path / alt_name
                    if alt_file.exists():
                        data_dict['relevance'] = self._read_file(alt_file)
                        break
        
        if config == 'all':
            # Also load any other files in the directory
            for item in path.iterdir():
                if item.name.startswith('.') or item.name == '__pycache__' or item.suffix in ['.py', '.pyc']:
                    continue
                if item.is_file() and item.name not in [f['name'] for f in data_dict.values() if isinstance(f, dict) and 'name' in f]:
                    data_dict[item.stem] = self._read_file(item)
        
        return data_dict if data_dict else None

    def _handle_honest(self, config: str = '') -> Optional[Dict[str, Any]]:
        """Handle HONEST dataset"""
        path = self._get_dataset_path('HONEST')
        
        if config.lower() in ('', 'h', 'help'):
            print('HONEST dataset configurations:')
            print('  templates - Load template sentences')
            print('  completions - Load model completions')
            print('  annotations - Load human annotations')
            print('  all - Load all data')
            return None
        
        if not path.exists():
            logger.error(f"HONEST dataset path not found: {path}")
            return None
        
        data_dict = {}
        
        # Load templates
        if config == 'templates' or config == 'all':
            templates_file = path / 'templates.csv'
            if templates_file.exists():
                data_dict['templates'] = self._read_csv(templates_file)
            else:
                # Try finding template files
                template_files = list(path.glob('*template*'))
                if template_files:
                    data_dict['templates'] = self._read_file(template_files[0])
        
        # Load completions
        if config == 'completions' or config == 'all':
            completions_file = path / 'completions.jsonl'
            if completions_file.exists():
                data_dict['completions'] = self._read_jsonl(completions_file)
            else:
                # Try alternative names
                for alt_name in ['completions.json', 'responses.jsonl', 'responses.json']:
                    alt_file = path / alt_name
                    if alt_file.exists():
                        data_dict['completions'] = self._read_file(alt_file)
                        break
        
        # Load annotations
        if config == 'annotations' or config == 'all':
            annotations_file = path / 'annotations.csv'
            if annotations_file.exists():
                data_dict['annotations'] = self._read_csv(annotations_file)
            else:
                # Try alternative names
                for alt_name in ['annotations.tsv', 'labels.csv', 'labels.tsv']:
                    alt_file = path / alt_name
                    if alt_file.exists():
                        data_dict['annotations'] = self._read_file(alt_file)
                        break
        
        # Load all other files if 'all' is specified
        if config == 'all':
            for item in path.iterdir():
                if item.name.startswith('.') or item.name == '__pycache__' or item.suffix in ['.py', '.pyc']:
                    continue
                if item.is_file() and item.stem not in data_dict:
                    data_dict[item.stem] = self._read_file(item)
        
        return data_dict if data_dict else None

    def _handle_panda(self, config: str = '') -> Optional[Dict[str, Any]]:
        """Handle PANDA dataset"""
        path = self._get_dataset_path('PANDA')
        
        if config.lower() in ('', 'h', 'help'):
            print('PANDA dataset configurations:')
            print('  train - Load training data')
            print('  test - Load test data')
            print('  dev - Load development/validation data')
            print('  all - Load all splits')
            return None
        
        if not path.exists():
            logger.error(f"PANDA dataset path not found: {path}")
            return None
        
        data_dict = {}
        splits = ['train', 'test', 'dev', 'validation']
        
        if config == 'all':
            target_splits = splits
        elif config in splits:
            target_splits = [config]
        else:
            target_splits = splits
        
        for split in target_splits:
            # Try different file formats and naming conventions
            possible_files = [
                f'{split}.jsonl',
                f'{split}.json',
                f'{split}.csv',
                f'{split}.tsv',
                f'panda_{split}.jsonl',
                f'panda_{split}.json',
                f'panda_{split}.csv',
                f'panda_{split}.tsv'
            ]
            
            for filename in possible_files:
                file_path = path / filename
                if file_path.exists():
                    data_dict[split] = self._read_file(file_path)
                    break
        
        # If no specific splits found, try to load all files
        if not data_dict:
            for item in path.iterdir():
                if item.name.startswith('.') or item.name == '__pycache__' or item.suffix in ['.py', '.pyc']:
                    continue
                if item.is_file():
                    data_dict[item.stem] = self._read_file(item)
        
        return data_dict if data_dict else None

    def _handle_reddit_bias(self, config: str = '') -> Optional[Dict[str, Any]]:
        """Handle RedditBias dataset"""
        path = self._get_dataset_path('RedditBias')
        
        if config.lower() in ('', 'h', 'help'):
            print('RedditBias dataset configurations:')
            print('  posts - Load Reddit posts')
            print('  comments - Load Reddit comments')
            print('  annotations - Load bias annotations')
            print('  all - Load all data')
            return None
        
        if not path.exists():
            logger.error(f"RedditBias dataset path not found: {path}")
            return None
        
        data_dict = {}
        
        # Load posts
        if config == 'posts' or config == 'all':
            posts_files = list(path.glob('*post*')) + list(path.glob('*submission*'))
            if posts_files:
                data_dict['posts'] = self._read_file(posts_files[0])
        
        # Load comments
        if config == 'comments' or config == 'all':
            comments_files = list(path.glob('*comment*'))
            if comments_files:
                data_dict['comments'] = self._read_file(comments_files[0])
        
        # Load annotations
        if config == 'annotations' or config == 'all':
            annotation_files = list(path.glob('*annotation*')) + list(path.glob('*label*'))
            if annotation_files:
                data_dict['annotations'] = self._read_file(annotation_files[0])
        
        # Load all files if 'all' specified or no specific files found
        if config == 'all' or not data_dict:
            for item in path.iterdir():
                if item.name.startswith('.') or item.name == '__pycache__' or item.suffix in ['.py', '.pyc']:
                    continue
                if item.is_file() and item.stem not in data_dict:
                    data_dict[item.stem] = self._read_file(item)
        
        return data_dict if data_dict else None

    def _handle_trustgpt(self, config: str = '') -> Optional[Dict[str, Any]]:
        """Handle TrustGPT dataset - requires Python execution"""
        path = self._get_dataset_path('TrustGPT')
        
        if config.lower() in ('', 'h', 'help'):
            print('TrustGPT dataset configurations:')
            print('  process - Run the processing script')
            print('  load - Load processed data')
            print('  all - Process and load data')
            print('  benchmarks - Load specific benchmark data')
            return None
        
        # Check if we need to run the Python processing script
        if config in ('process', 'all'):
            python_files = [f for f in path.glob('*.py') if f.name != '__init__.py']
            if python_files:
                try:
                    logger.info("Running TrustGPT processing script...")
                    self._run_python(python_files[0])
                    logger.info("TrustGPT processing completed")
                except Exception as e:
                    logger.error(f"Error running TrustGPT processing: {e}")
                    return None
            else:
                logger.warning("No Python processing script found for TrustGPT")
        
        # Load processed data
        if config in ('load', 'all', 'benchmarks', ''):
            data_dict = {}
            
            # Look for processed data folder
            processed_path = path / 'processed'
            if processed_path.exists():
                data_dict.update(self._read_folder(processed_path))
            
            # Look for benchmark data
            benchmarks_path = path / 'benchmarks'
            if benchmarks_path.exists():
                data_dict['benchmarks'] = self._read_folder(benchmarks_path)
            
            # Load any other data files
            for item in path.iterdir():
                if item.name.startswith('.') or item.name == '__pycache__' or item.suffix in ['.py', '.pyc']:
                    continue
                if item.is_file() and item.suffix in ['.json', '.jsonl', '.csv', '.tsv']:
                    data_dict[item.stem] = self._read_file(item)
            
            return data_dict if data_dict else None
        
        return None

    def _handle_unqover(self, config: str = '') -> Optional[Dict[str, Any]]:
        """Handle UnQover dataset"""
        path = self._get_dataset_path('UnQover')
        
        if config.lower() in ('', 'h', 'help'):
            print('UnQover dataset configurations:')
            print('  questions - Load questions')
            print('  answers - Load answers')
            print('  annotations - Load coverage annotations')
            print('  all - Load all data')
            return None
        
        if not path.exists():
            logger.error(f"UnQover dataset path not found: {path}")
            return None
        
        data_dict = {}
        
        # Load questions
        if config == 'questions' or config == 'all':
            question_files = list(path.glob('*question*')) + list(path.glob('*queries*'))
            if question_files:
                data_dict['questions'] = self._read_file(question_files[0])
        
        # Load answers
        if config == 'answers' or config == 'all':
            answer_files = list(path.glob('*answer*')) + list(path.glob('*response*'))
            if answer_files:
                data_dict['answers'] = self._read_file(answer_files[0])
        
        # Load annotations
        if config == 'annotations' or config == 'all':
            annotation_files = (list(path.glob('*annotation*')) + 
                            list(path.glob('*coverage*')) + 
                            list(path.glob('*label*')))
            if annotation_files:
                data_dict['annotations'] = self._read_file(annotation_files[0])
        
        # Load all files if 'all' specified or no specific files found
        if config == 'all' or not data_dict:
            for item in path.iterdir():
                if item.name.startswith('.') or item.name == '__pycache__' or item.suffix in ['.py', '.pyc']:
                    continue
                if item.is_file() and item.stem not in data_dict:
                    data_dict[item.stem] = self._read_file(item)
        
        return data_dict if data_dict else None

    def _handle_winoqueer(self, config: str = '') -> Optional[Dict[str, Any]]:
        """Handle WinoQueer dataset"""
        path = self._get_dataset_path('WinoQueer')
        
        if config.lower() in ('', 'h', 'help'):
            print('WinoQueer dataset configurations:')
            print('  sentences - Load WinoQueer sentences')
            print('  templates - Load sentence templates')
            print('  annotations - Load annotations')
            print('  all - Load all data')
            return None
        
        if not path.exists():
            logger.error(f"WinoQueer dataset path not found: {path}")
            return None
        
        data_dict = {}
        
        # Load sentences
        if config == 'sentences' or config == 'all':
            sentence_files = (list(path.glob('*sentence*')) + 
                            list(path.glob('*winoqueer*')) +
                            list(path.glob('*wq*')))
            if sentence_files:
                data_dict['sentences'] = self._read_file(sentence_files[0])
        
        # Load templates
        if config == 'templates' or config == 'all':
            template_files = list(path.glob('*template*'))
            if template_files:
                data_dict['templates'] = self._read_file(template_files[0])
        
        # Load annotations
        if config == 'annotations' or config == 'all':
            annotation_files = list(path.glob('*annotation*')) + list(path.glob('*label*'))
            if annotation_files:
                data_dict['annotations'] = self._read_file(annotation_files[0])
        
        # Load all files if 'all' specified or no specific files found
        if config == 'all' or not data_dict:
            for item in path.iterdir():
                if item.name.startswith('.') or item.name == '__pycache__' or item.suffix in ['.py', '.pyc']:
                    continue
                if item.is_file() and item.stem not in data_dict:
                    data_dict[item.stem] = self._read_file(item)
        
        return data_dict if data_dict else None
    
    def _handle_real_toxicity_prompts(self, config: str = '') -> Optional[Dict[str, Any]]:
        """Handle RealToxicityPrompts dataset"""
        path = self._get_dataset_path('RealToxicityPrompts')
        
        if config.lower() in ('', 'h', 'help'):
            print('RealToxicityPrompts dataset configurations:')
            print('  prompts - Load toxic prompts')
            print('  generations - Load model generations')
            print('  annotations - Load toxicity annotations')
            print('  train - Load training split')
            print('  test - Load test split')
            print('  dev - Load development split')
            print('  all - Load all available data')
            print('\nNote: This dataset is not included by default due to size.')
            print('Download from: https://allenai.org/data/real-toxicity-prompts')
            return None
        
        if not path.exists():
            print('RealToxicityPrompts dataset not found locally.')
            print('This dataset is not included due to size constraints.')
            print('Please download from: https://allenai.org/data/real-toxicity-prompts')
            print(f'Expected location: {path}')
            return None
        
        data_dict = {}
        
        try:
            # Load prompts
            if config == 'prompts' or config == 'all':
                prompts_files = (list(path.glob('*prompt*')) + 
                            list(path.glob('*input*')) +
                            list(path.glob('prompts.*')))
                
                for prompts_file in prompts_files:
                    if prompts_file.exists():
                        key = f'prompts_{prompts_file.stem}' if len(prompts_files) > 1 else 'prompts'
                        data_dict[key] = self._read_file(prompts_file)
                        logger.info(f"Loaded prompts from {prompts_file.name}")
                        break
            
            # Load generations
            if config == 'generations' or config == 'all':
                generation_files = (list(path.glob('*generation*')) + 
                                list(path.glob('*output*')) +
                                list(path.glob('*completion*')) +
                                list(path.glob('generations.*')))
                
                for gen_file in generation_files:
                    if gen_file.exists():
                        key = f'generations_{gen_file.stem}' if len(generation_files) > 1 else 'generations'
                        data_dict[key] = self._read_file(gen_file)
                        logger.info(f"Loaded generations from {gen_file.name}")
                        break
            
            # Load annotations/toxicity scores
            if config == 'annotations' or config == 'all':
                annotation_files = (list(path.glob('*annotation*')) + 
                                list(path.glob('*toxicity*')) +
                                list(path.glob('*score*')) +
                                list(path.glob('*label*')) +
                                list(path.glob('annotations.*')))
                
                for ann_file in annotation_files:
                    if ann_file.exists():
                        key = f'annotations_{ann_file.stem}' if len(annotation_files) > 1 else 'annotations'
                        data_dict[key] = self._read_file(ann_file)
                        logger.info(f"Loaded annotations from {ann_file.name}")
                        break
            
            # Load specific splits
            splits = ['train', 'test', 'dev', 'validation']
            for split in splits:
                if config == split or config == 'all':
                    split_files = []
                    
                    # Look for various naming patterns
                    patterns = [
                        f'{split}.*',
                        f'*{split}*',
                        f'realtoxicityprompts_{split}.*',
                        f'rtp_{split}.*'
                    ]
                    
                    for pattern in patterns:
                        split_files.extend(list(path.glob(pattern)))
                    
                    # Remove duplicates and filter for data files
                    split_files = list(set([f for f in split_files 
                                        if f.suffix in ['.jsonl', '.json', '.csv', '.tsv', '.txt']]))
                    
                    if split_files:
                        # Use the first file found for this split
                        split_file = split_files[0]
                        data_dict[split] = self._read_file(split_file)
                        logger.info(f"Loaded {split} split from {split_file.name}")
            
            # Load perspective API scores if available
            if config == 'perspective' or config == 'all':
                perspective_files = (list(path.glob('*perspective*')) + 
                                list(path.glob('*api*')))
                
                for persp_file in perspective_files:
                    if persp_file.exists():
                        key = f'perspective_{persp_file.stem}' if len(perspective_files) > 1 else 'perspective'
                        data_dict[key] = self._read_file(persp_file)
                        logger.info(f"Loaded Perspective API scores from {persp_file.name}")
                        break
            
            # Load model-specific generations if available
            if config == 'models' or config == 'all':
                models_dir = path / 'models'
                if models_dir.exists():
                    data_dict['models'] = self._read_folder(models_dir)
                    logger.info("Loaded model-specific generations")
            
            # Load all files if 'all' specified or no specific files found
            if config == 'all' or not data_dict:
                logger.info("Loading all available files...")
                
                for item in path.iterdir():
                    if item.name.startswith('.') or item.name == '__pycache__' or item.suffix in ['.py', '.pyc']:
                        continue
                    if item.is_file() and item.stem not in data_dict:
                        # Skip very large files unless specifically requested
                        if item.stat().st_size > 100 * 1024 * 1024:  # 100MB threshold
                            logger.warning(f"Skipping large file {item.name} (size: {item.stat().st_size / (1024*1024):.1f}MB)")
                            continue
                        
                        try:
                            data_dict[item.stem] = self._read_file(item)
                            logger.info(f"Loaded {item.name}")
                        except Exception as e:
                            logger.warning(f"Could not load {item.name}: {e}")
                    
                    elif item.is_dir() and item.name not in data_dict:
                        try:
                            data_dict[item.name] = self._read_folder(item)
                            logger.info(f"Loaded directory {item.name}")
                        except Exception as e:
                            logger.warning(f"Could not load directory {item.name}: {e}")
            
            # Post-process data if needed
            if data_dict:
                # Combine prompts and generations if both are available
                if 'prompts' in data_dict and 'generations' in data_dict:
                    try:
                        prompts_df = data_dict['prompts']
                        generations_df = data_dict['generations']
                        
                        if isinstance(prompts_df, pd.DataFrame) and isinstance(generations_df, pd.DataFrame):
                            # Try to merge on common columns
                            common_cols = set(prompts_df.columns) & set(generations_df.columns)
                            if common_cols:
                                merged = prompts_df.merge(generations_df, on=list(common_cols), how='outer')
                                data_dict['combined'] = merged
                                logger.info("Created combined prompts+generations dataset")
                    except Exception as e:
                        logger.warning(f"Could not combine prompts and generations: {e}")
                
                # Add metadata
                data_dict['_metadata'] = {
                    'dataset': 'RealToxicityPrompts',
                    'description': 'Dataset for evaluating neural toxic degeneration in language models',
                    'source': 'https://allenai.org/data/real-toxicity-prompts',
                    'paper': 'RealToxicityPrompts: Evaluating Neural Toxic Degeneration in Language Models',
                    'loaded_components': list(data_dict.keys()),
                    'load_time': pd.Timestamp.now().isoformat()
                }
                
                logger.info(f"Successfully loaded RealToxicityPrompts with components: {list(data_dict.keys())}")
                return data_dict
            
            else:
                logger.warning("No data files found in RealToxicityPrompts directory")
                return None
        
        except Exception as e:
            logger.error(f"Error loading RealToxicityPrompts dataset: {e}")
            return None
    
    def run_process_and_download(self, name: str, **kwargs) -> Optional[Dict[str, Any]]:
        """Run process for Python datasets and download data"""
        if name in self.python_datasets:
            args = [str(v) for v in kwargs.values()]
            self._run_python_dataset(name, *args)
        
        return self.load_dataset(name)
    
    def _run_python_dataset(self, name: str, *args) -> None:
        """Run Python script for dataset processing"""
        path = self._get_dataset_path(name)
        
        if not path.exists():
            logger.error(f"Dataset path not found: {path}")
            return
        
        python_files = [f for f in path.glob('*.py') if f.name != '__init__.py']
        
        if not python_files:
            logger.error(f"No Python files found in {path}")
            return
        
        # Use the first Python file found
        self._run_python(python_files[0], *args)
    
    def load_dataset(self, dataset: str, config: Optional[str] = None) -> Optional[Dict[str, Any]]:
        """Load dataset with optional configuration"""
        if dataset.lower() in ('', 'h', 'help'):
            print('Available datasets:')
            print('=' * 50)
            for ds in self.get_datasets():
                if ds not in {'Equity-Evaluation-Corpus', 'RealToxicityPrompts'}:
                    print(f"  {ds}")
            return None
        
        if dataset in self.unavailable_datasets:
            logger.error(f'Dataset {dataset} is not available')
            return None
        
        if dataset not in self.handlers:
            logger.error(f'Dataset {dataset} not found')
            return None
        
        if dataset in self.config_required and config is None:
            print(f'Available configurations for {dataset}:')
            print('=' * 50)
            for conf in self.configs.get(dataset, []):
                print(f"  {conf}")
            return None
        
        try:
            return self.handlers[dataset](config or '')
        except Exception as e:
            logger.error(f"Error loading dataset {dataset}: {e}")
            return None


class CustomDataset(PtDataset):
    """Custom PyTorch Dataset wrapper"""
    
    def __init__(self, dataframe: pd.DataFrame):
        self.dataframe = dataframe
    
    def __getitem__(self, index: int) -> Tuple[np.ndarray, Any]:
        row = self.dataframe.iloc[index]
        features = row.iloc[1:].to_numpy()
        label = row.iloc[0]
        return features, label
    
    def __len__(self) -> int:
        return len(self.dataframe)


[docs] def BiasDataLoader( dataset: Optional[str] = None, config: Optional[str] = None, format: str = 'hf', benchmark_path: Optional[str] = None ) -> Optional[Dict[str, Union[pd.DataFrame, List[str], PtDataset, HfDataset]]]: r"""Load specified bias evaluation dataset. Requires downloading the Fair-LLM-Benchmark repository (https://github.com/i-gallegos/Fair-LLM-Benchmark , credits to Isabel O. Gallegos et al). Parameters ---------- dataset : str name of the dataset. config : str dataset configuration if applicable. format : str output format - 'raw', 'hf' (hugging face), or 'pt' (pytorch). benchmark_path : str path where the Fair-LLM-Benchmark resides. If none, it looks for it in FairLangProc/FairLangProc/datasets/Fair-LLM-Benchmark Returns ------- dataDict : dict Dictionary with datasets in the appropriate format. Example ------- >>> from FairLangProc.datasets import BiasDataLoader >>> BiasDataLoader() Available datasets: ==================== BBQ BEC-Pro BOLD BUG CrowS-Pairs GAP HolisticBias StereoSet WinoBias+ WinoBias Winogender >>> BiasDataLoader(dataset = 'BBQ') Available configurations: ==================== Age Disability_Status Gender_identity Nationality Physical_appearance Race_ethnicity Race_x_gender Race_x_SES Religion SES Sexual_orientation all >>> ageBBQ = BiasDataLoader(dataset = 'BBQ', config = 'Age') """ loader = FairLLMBenchmarkLoader(benchmark_path) if not dataset: return loader.load_dataset('') # Load raw data raw_data = loader.load_dataset(dataset, config) if raw_data is None: return None # Convert to requested format if format == 'raw': return raw_data if isinstance(raw_data, dict) else {'data': raw_data} data_dict = {} if format == 'hf': if isinstance(raw_data, dict): for key, value in raw_data.items(): if isinstance(value, pd.DataFrame): data_dict[key] = HfDataset.from_pandas(value) else: logger.warning(f"Skipping {key}: not a DataFrame") elif isinstance(raw_data, pd.DataFrame): data_dict['data'] = HfDataset.from_pandas(raw_data) else: raise TypeError("Data must be a pandas DataFrame or dict of DataFrames for HF format") elif format == 'pt': if isinstance(raw_data, dict): for key, value in raw_data.items(): if isinstance(value, pd.DataFrame): data_dict[key] = CustomDataset(value) else: logger.warning(f"Skipping {key}: not a DataFrame") elif isinstance(raw_data, pd.DataFrame): data_dict['data'] = CustomDataset(raw_data) else: raise TypeError("Data must be a pandas DataFrame or dict of DataFrames for PyTorch format") else: raise ValueError('Supported formats: "hf", "pt", "raw"') return data_dict