Master data collection, quality, governance, and pipeline management for AI projects
Data quality directly impacts model accuracy. Poor data leads to poor predictions, regardless of algorithm sophistication.
Efficient data management reduces storage costs, processing time, and computational resources needed for training.
Proper data governance ensures regulatory compliance (GDPR, CCPA) and ethical AI development.
Company | Data Challenge | Solution | Result |
---|---|---|---|
Netflix | Personalization at scale | Real-time data pipelines | 75% of views from recommendations |
Uber | Dynamic pricing data | Stream processing architecture | Sub-second pricing updates |
Airbnb | Image quality variations | Automated data validation | 40% reduction in listing errors |
Pipeline flow: APIs, DBs, Files → ETL/ELT → Data Lake/Warehouse → Transform & Clean → Model Training
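A compressed sketch of that flow in Python, assuming pandas with pyarrow is installed; the data, paths, and stage functions are invented for illustration:

```python
import math
import os
import pandas as pd

def extract_from_sources():
    """Stand-in for pulling records from APIs, databases, and flat files."""
    return pd.DataFrame({
        "user_id": [1, 2, 2, 3],
        "amount": [10.0, 25.0, 25.0, None],
    })

def clean_and_transform(raw):
    """Deduplicate, drop missing values, and add a simple engineered feature."""
    df = raw.drop_duplicates().dropna()
    return df.assign(log_amount=df["amount"].map(math.log))

def run_pipeline(lake_dir="lake"):
    raw = extract_from_sources()                                   # source systems
    os.makedirs(lake_dir, exist_ok=True)
    raw.to_parquet(os.path.join(lake_dir, "raw_events.parquet"))   # land raw data
    return clean_and_transform(raw)                                # ready for training
```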
{ "customer_id": "12345", "age": 28, "purchase_history": [ {"date": "2024-01-15", "amount": 129.99}, {"date": "2024-02-20", "amount": 79.50} ], "segment": "premium" }
```text
2024-03-15 10:23:45 [INFO] User login: user_123
2024-03-15 10:23:47 [DEBUG] Session started
2024-03-15 10:24:12 [ERROR] Payment failed: timeout
```
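Semi-structured data like these logs usually needs to be parsed into structured records before it can feed a pipeline. A minimal sketch using Python's standard `re` module, assuming the log format matches the lines above:

```python
import re

LOG_PATTERN = re.compile(
    r"^(?P<timestamp>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) "
    r"\[(?P<level>[A-Z]+)\] (?P<message>.*)$"
)

def parse_log_line(line):
    """Turn one raw log line into a structured record (or None if it doesn't match)."""
    match = LOG_PATTERN.match(line)
    return match.groupdict() if match else None

lines = [
    "2024-03-15 10:23:45 [INFO] User login: user_123",
    "2024-03-15 10:24:12 [ERROR] Payment failed: timeout",
]
records = [r for r in (parse_log_line(l) for l in lines) if r]
print(records)
```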
Strategy | Use Case | Pros | Cons |
---|---|---|---|
Batch Collection | Historical analysis | Efficient, cost-effective | Not real-time |
Stream Processing | Real-time decisions | Low latency, fresh data | Complex, expensive |
Web Scraping | Public data gathering | Large scale, automated | Legal considerations |
API Integration | Third-party data | Structured, reliable | Rate limits, costs |
Manual Entry | Expert annotations | High quality | Slow, expensive |
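For the API integration strategy in particular, rate limits usually need explicit handling. A minimal sketch using the `requests` library; the endpoint URL and retry policy are placeholders:

```python
import time
import requests

def fetch_with_backoff(url, params=None, max_retries=5):
    """GET a JSON API endpoint, backing off when the server signals rate limiting."""
    delay = 1.0
    for attempt in range(max_retries):
        response = requests.get(url, params=params, timeout=10)
        if response.status_code == 429:            # rate limited
            retry_after = float(response.headers.get("Retry-After", delay))
            time.sleep(retry_after)
            delay = min(delay * 2, 60)             # exponential backoff, capped
            continue
        response.raise_for_status()
        return response.json()
    raise RuntimeError(f"Gave up after {max_retries} attempts: {url}")

# records = fetch_with_backoff("https://api.example.com/v1/products", params={"page": 1})
```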
**ETL** — the traditional approach for structured data: extract, transform, then load into the warehouse.

```python
# Extract
data = extract_from_source()

# Transform
cleaned_data = clean_missing_values(data)
normalized_data = normalize_features(cleaned_data)
engineered_data = create_features(normalized_data)

# Load
load_to_warehouse(engineered_data)
```
**ELT** — the modern approach for big data: load raw data into the lake or warehouse first, then transform it in place (see the sketch below).
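A minimal ELT sketch, using SQLite as a stand-in for the warehouse; table names and the cleaning query are illustrative:

```python
import sqlite3
import pandas as pd

# Extract + Load: land the raw records in the warehouse untouched
raw = pd.DataFrame({
    "user_id": [1, 2, 2, 3],
    "amount": [10.0, 25.0, 25.0, None],
})
conn = sqlite3.connect("warehouse.db")
raw.to_sql("raw_orders", conn, if_exists="replace", index=False)

# Transform: do the cleaning inside the warehouse with SQL
conn.executescript("""
    DROP TABLE IF EXISTS orders_clean;
    CREATE TABLE orders_clean AS
    SELECT DISTINCT user_id, amount
    FROM raw_orders
    WHERE amount IS NOT NULL;
""")
conn.commit()
```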
```python
class DataValidator:
    def __init__(self, rules):
        self.rules = rules
        self.validation_results = []

    def log_error(self, message):
        self.validation_results.append({'level': 'error', 'message': message})

    def log_warning(self, message):
        self.validation_results.append({'level': 'warning', 'message': message})

    def validate_schema(self, data):
        """Check data structure and types"""
        expected_columns = self.rules['schema']
        actual_columns = data.columns
        missing = set(expected_columns) - set(actual_columns)
        if missing:
            self.log_error(f"Missing columns: {missing}")
            return False
        return True

    def validate_ranges(self, data):
        """Check value ranges and boundaries"""
        for column, constraints in self.rules['ranges'].items():
            if column in data.columns:
                min_val = data[column].min()
                max_val = data[column].max()
                if min_val < constraints['min']:
                    self.log_warning(f"{column} has values below {constraints['min']}")
                if max_val > constraints['max']:
                    self.log_warning(f"{column} has values above {constraints['max']}")

    def validate_nulls(self, data):
        """Check for missing values"""
        null_counts = data.isnull().sum()
        for column, count in null_counts.items():
            if count > 0:
                percentage = (count / len(data)) * 100
                if percentage > self.rules['null_threshold']:
                    self.log_error(f"{column} has {percentage:.2f}% null values")

    def run_validation(self, data):
        """Execute all validation checks"""
        self.validate_schema(data)
        self.validate_ranges(data)
        self.validate_nulls(data)
        return self.validation_results
```
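A quick usage sketch for the validator above; the rules and the DataFrame are invented for illustration:

```python
import pandas as pd

rules = {
    "schema": ["user_id", "age", "amount"],
    "ranges": {"age": {"min": 0, "max": 120}, "amount": {"min": 0, "max": 10000}},
    "null_threshold": 5.0,   # max % of nulls tolerated per column
}

df = pd.DataFrame({
    "user_id": [1, 2, 3],
    "age": [25, 37, 150],           # 150 triggers a range warning
    "amount": [19.99, None, 42.0],  # one null -> ~33% null rate triggers an error
})

validator = DataValidator(rules)
print(validator.run_validation(df))
```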
Pattern | Description | Tools | Best For |
---|---|---|---|
Git-style | Track changes with commits | DVC, Git LFS | Small-medium datasets |
Time-based | Snapshots at intervals | Delta Lake, Iceberg | Time-series data |
Schema Evolution | Track structure changes | Avro, Parquet | Evolving datasets |
Immutable Logs | Append-only storage | Kafka, Event Store | Event sourcing |
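As a concrete illustration of the Git-style pattern, here is a minimal, dependency-free sketch of content-addressed dataset snapshots; real tools like DVC or Git LFS handle this far more robustly, and the manifest format here is invented:

```python
import hashlib
import json
import os
from datetime import datetime, timezone

def hash_file(path, chunk_size=1 << 20):
    """Content hash of one data file."""
    digest = hashlib.sha256()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()

def snapshot(data_dir, manifest_path="data_manifest.json"):
    """Record a 'commit' of the dataset: file -> hash, plus a timestamp."""
    entry = {
        "created_at": datetime.now(timezone.utc).isoformat(),
        "files": {
            name: hash_file(os.path.join(data_dir, name))
            for name in sorted(os.listdir(data_dir))
        },
    }
    history = []
    if os.path.exists(manifest_path):
        with open(manifest_path) as f:
            history = json.load(f)
    history.append(entry)
    with open(manifest_path, "w") as f:
        json.dump(history, f, indent=2)
    return entry
```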
- **Active learning** — the model identifies uncertain samples for human labeling (reported to need up to 75% fewer labels); see the sketch below.
- **Weak supervision** — use heuristics and labeling rules to generate labels programmatically.
- **Transfer learning** — leverage pre-trained models to produce initial labels for review.
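A minimal uncertainty-sampling sketch with scikit-learn and synthetic data; in practice the selected indices would be sent to human annotators:

```python
import numpy as np
from sklearn.linear_model import LogisticRegression

rng = np.random.default_rng(0)

# Small labeled seed set plus a large unlabeled pool (synthetic)
X_labeled = rng.normal(size=(50, 5))
y_labeled = (X_labeled[:, 0] + X_labeled[:, 1] > 0).astype(int)
X_pool = rng.normal(size=(1000, 5))

# Train on what we have, then score the pool by prediction uncertainty
model = LogisticRegression().fit(X_labeled, y_labeled)
probs = model.predict_proba(X_pool)
uncertainty = 1.0 - probs.max(axis=1)      # least-confident sampling

# The most uncertain examples go to human annotators next
query_indices = np.argsort(uncertainty)[-20:]
print("Next batch to label:", query_indices)
```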
```python
import re
import hashlib

class PIIDetector:
    def __init__(self):
        self.patterns = {
            'email': r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b',
            'phone': r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b',
            'ssn': r'\b\d{3}-\d{2}-\d{4}\b',
            'credit_card': r'\b\d{4}[\s-]?\d{4}[\s-]?\d{4}[\s-]?\d{4}\b'
        }

    def detect_pii(self, text):
        """Detect PII in text data"""
        findings = []
        for pii_type, pattern in self.patterns.items():
            matches = re.finditer(pattern, text)
            for match in matches:
                findings.append({
                    'type': pii_type,
                    'position': match.span(),
                    'value': match.group()  # In production, don't log actual PII
                })
        return findings

    def anonymize_pii(self, text, method='hash'):
        """Replace PII with anonymized values"""
        anonymized = text
        for pii_type, pattern in self.patterns.items():
            if method == 'hash':
                anonymized = re.sub(pattern, lambda m: self._hash_value(m.group()), anonymized)
            elif method == 'mask':
                anonymized = re.sub(pattern, lambda m: self._mask_value(m.group(), pii_type), anonymized)
        return anonymized

    def _hash_value(self, value):
        """Hash PII value"""
        return hashlib.sha256(value.encode()).hexdigest()[:8]

    def _mask_value(self, value, pii_type):
        """Mask PII value"""
        if pii_type == 'email':
            parts = value.split('@')
            return f"{parts[0][:2]}***@{parts[1]}"
        elif pii_type == 'phone':
            return 'XXX-XXX-' + value[-4:]
        else:
            return 'X' * len(value)

# Usage
detector = PIIDetector()
sample_text = "Contact John at john@email.com or 555-123-4567"
pii_found = detector.detect_pii(sample_text)
anonymized = detector.anonymize_pii(sample_text, method='mask')
print(f"PII Found: {pii_found}")
print(f"Anonymized: {anonymized}")
```
| Version | Changes | Migration Script | Impact |
|---|---|---|---|
| v1.0 | Initial schema | `CREATE TABLE users (id, name, email)` | - |
| v1.1 | Add timestamp | `ALTER TABLE users ADD created_at TIMESTAMP` | Backward compatible |
| v2.0 | Normalize address | `CREATE TABLE addresses; ALTER TABLE users...` | Breaking change |
```python
from kafka import KafkaConsumer, KafkaProducer
from kafka.errors import KafkaError
import json
import logging
from datetime import datetime

class StreamProcessor:
    def __init__(self, input_topic, output_topic, bootstrap_servers):
        self.consumer = KafkaConsumer(
            input_topic,
            bootstrap_servers=bootstrap_servers,
            auto_offset_reset='latest',
            value_deserializer=lambda m: json.loads(m.decode('utf-8'))
        )
        self.producer = KafkaProducer(
            bootstrap_servers=bootstrap_servers,
            value_serializer=lambda v: json.dumps(v).encode('utf-8')
        )
        self.output_topic = output_topic
        self.stats = {
            'processed': 0,
            'errors': 0,
            'start_time': datetime.now()
        }

    def process_message(self, message):
        """Process individual message with business logic"""
        try:
            # Data validation
            if not self.validate_message(message):
                raise ValueError("Message validation failed")

            # Feature engineering
            enriched = self.enrich_message(message)

            # Anomaly detection
            if self.detect_anomaly(enriched):
                enriched['anomaly_flag'] = True
                self.alert_anomaly(enriched)

            # Aggregation window
            enriched = self.update_aggregates(enriched)

            return enriched
        except Exception as e:
            logging.error(f"Processing error: {e}")
            self.stats['errors'] += 1
            return None

    def validate_message(self, message):
        """Validate message schema and content"""
        required_fields = ['timestamp', 'user_id', 'event_type', 'data']
        return all(field in message for field in required_fields)

    def enrich_message(self, message):
        """Add derived features and metadata"""
        enriched = message.copy()

        # Add processing timestamp
        enriched['processed_at'] = datetime.now().isoformat()

        # Add derived features
        if 'amount' in message.get('data', {}):
            amount = message['data']['amount']
            enriched['data']['amount_category'] = self.categorize_amount(amount)

        # Add user segment
        enriched['user_segment'] = self.get_user_segment(message['user_id'])

        return enriched

    def detect_anomaly(self, message):
        """Simple anomaly detection logic"""
        if 'amount' in message.get('data', {}):
            amount = message['data']['amount']
            # Flag unusually high amounts
            if amount > 10000:
                return True

        # Check for unusual patterns
        if message.get('event_type') == 'login':
            # Multiple logins in short time
            if self.check_velocity(message['user_id'], 'login', window_seconds=60) > 5:
                return True

        return False

    def run(self):
        """Main processing loop"""
        logging.info("Starting stream processor...")

        for message in self.consumer:
            try:
                # Process message
                processed = self.process_message(message.value)

                if processed:
                    # Send to output topic
                    future = self.producer.send(self.output_topic, processed)
                    future.get(timeout=10)  # Wait for confirmation

                    self.stats['processed'] += 1

                    # Log progress
                    if self.stats['processed'] % 1000 == 0:
                        self.log_stats()

            except KafkaError as e:
                logging.error(f"Kafka error: {e}")

    def log_stats(self):
        """Log processing statistics"""
        runtime = (datetime.now() - self.stats['start_time']).total_seconds()
        throughput = self.stats['processed'] / runtime if runtime > 0 else 0
        logging.info(f"Processed: {self.stats['processed']}, "
                     f"Errors: {self.stats['errors']}, "
                     f"Throughput: {throughput:.2f} msg/sec")

# Usage
processor = StreamProcessor(
    input_topic='raw-events',
    output_topic='processed-events',
    bootstrap_servers='localhost:9092'
)
processor.run()
```
```yaml
expectations:
  - expectation_type: expect_column_mean_to_be_between
    kwargs:
      column: "price"
      min_value: 50
      max_value: 500
  - expectation_type: expect_column_values_to_not_be_null
    kwargs:
      column: "user_id"
      mostly: 0.99
  - expectation_type: expect_column_unique_value_count_to_be_between
    kwargs:
      column: "product_id"
      min_value: 100
      max_value: 10000
```
Example data domains: Customer Data · Product Data · Transaction Data
Technique | Impact | Implementation | Trade-off |
---|---|---|---|
Partitioning | 10-100x query speed | Date/Hash partitions | Storage overhead |
Indexing | 5-50x lookup speed | B-tree, bitmap indexes | Write performance |
Caching | 100-1000x access speed | Redis, Memcached | Consistency challenges |
Compression | 50-90% storage reduction | Parquet, ORC formats | CPU overhead |
Columnar Storage | 10-100x analytics speed | Parquet, Arrow | Row operations slower |
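A small illustration of partitioning plus columnar storage with pandas and pyarrow; the dataset, paths, and partition column are invented:

```python
import pandas as pd

df = pd.DataFrame({
    "event_date": ["2024-03-14", "2024-03-14", "2024-03-15"],
    "user_id": [1, 2, 3],
    "amount": [19.99, 5.00, 42.50],
})

# Write compressed, columnar files partitioned by date
df.to_parquet("events/", partition_cols=["event_date"], compression="snappy")

# Later reads can prune partitions and load only the columns they need
recent = pd.read_parquet(
    "events/",
    columns=["user_id", "amount"],
    filters=[("event_date", "=", "2024-03-15")],
)
print(recent)
```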
```python
import pandas as pd

class FeatureStore:
    def __init__(self, online_store, offline_store):
        self.online_store = online_store    # Redis/DynamoDB
        self.offline_store = offline_store  # S3/BigQuery
        self.feature_registry = {}

    def register_feature(self, feature_def):
        """Register new feature definition"""
        self.feature_registry[feature_def.name] = {
            'description': feature_def.description,
            'dtype': feature_def.dtype,
            'source': feature_def.source,
            'transformation': feature_def.transformation,
            'ttl': feature_def.ttl,
            'version': feature_def.version
        }

    def compute_features(self, entity_id, feature_list):
        """Compute features for given entity"""
        features = {}
        for feature_name in feature_list:
            # Check online store first
            cached = self.get_online_feature(entity_id, feature_name)
            if cached and not self.is_stale(cached):
                features[feature_name] = cached
                continue

            # Compute from offline store
            feature_def = self.feature_registry[feature_name]
            raw_data = self.offline_store.get(feature_def['source'], entity_id)
            computed = feature_def['transformation'](raw_data)

            # Update online store
            self.set_online_feature(entity_id, feature_name, computed)
            features[feature_name] = computed

        return features

    def get_training_data(self, entity_ids, feature_list, point_in_time):
        """Get historical features for training"""
        training_data = []
        for entity_id in entity_ids:
            # Time travel query
            historical_features = self.offline_store.time_travel_query(
                entity_id=entity_id,
                features=feature_list,
                timestamp=point_in_time
            )
            training_data.append(historical_features)

        return pd.DataFrame(training_data)

    def monitor_feature_drift(self, feature_name, window='7d'):
        """Monitor feature distribution changes"""
        current_stats = self.compute_feature_stats(feature_name, 'now')
        baseline_stats = self.compute_feature_stats(feature_name, f'now-{window}')

        # Calculate drift metrics
        kl_divergence = self.calculate_kl_divergence(baseline_stats, current_stats)
        psi = self.calculate_psi(baseline_stats, current_stats)

        if kl_divergence > 0.1 or psi > 0.2:
            self.alert_feature_drift(feature_name, kl_divergence, psi)

        return {'kl_divergence': kl_divergence, 'psi': psi}
```
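The drift monitor above leaves `calculate_psi` undefined; a common way to compute the Population Stability Index looks roughly like this (binning scheme and thresholds vary in practice):

```python
import numpy as np

def population_stability_index(baseline, current, bins=10):
    """PSI between a baseline and a current sample of one feature."""
    # Bin edges come from the baseline distribution
    edges = np.histogram_bin_edges(baseline, bins=bins)
    expected, _ = np.histogram(baseline, bins=edges)
    actual, _ = np.histogram(current, bins=edges)

    # Convert to proportions, with a small floor to avoid division by zero
    expected = np.clip(expected / expected.sum(), 1e-6, None)
    actual = np.clip(actual / actual.sum(), 1e-6, None)

    return float(np.sum((actual - expected) * np.log(actual / expected)))

rng = np.random.default_rng(0)
baseline = rng.normal(0, 1, 10_000)
shifted = rng.normal(0.5, 1, 10_000)
print(population_stability_index(baseline, shifted))  # > 0.2 suggests drift
```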
| Category | Tool | Best For | Scale | Cost |
|---|---|---|---|---|
| Orchestration | Apache Airflow | Complex workflows | Large | Open source |
| Orchestration | Prefect | Python workflows | Medium | Freemium |
| Orchestration | Dagster | Data-aware pipelines | Large | Open source |
| Storage | AWS S3 | Object storage | Unlimited | Pay-per-use |
| Storage | Snowflake | Cloud warehouse | Large | Pay-per-use |
| Storage | Delta Lake | ACID transactions | Large | Open source |
| Processing | Apache Spark | Batch processing | Very large | Open source |
| Processing | Apache Flink | Stream processing | Large | Open source |
| Processing | dbt | SQL transformations | Medium | Open source |
```python
# Data profiling with pandas
df.describe()                  # Basic statistics
df.info()                      # Data types and nulls
df.nunique()                   # Unique values per column

# Data validation
df.isnull().sum()              # Count missing values
df.duplicated().sum()          # Count duplicates
df.dtypes                      # Check data types

# Data cleaning
df.dropna()                    # Remove missing values
df.ffill()                     # Forward fill missing values
df.drop_duplicates()           # Remove duplicates

# Data transformation
df['text'].str.lower()         # Lowercase a string column
df.groupby('category').agg({'value': 'mean'})  # Aggregate
pd.merge(df1, df2, on='key')   # Join datasets

# Data export
df.to_parquet('data.parquet')              # Efficient columnar storage
df.to_csv('data.csv', index=False)         # CSV export
df.to_json('data.json', orient='records')  # JSON export
```
- **Data leakage** — Problem: training data contains information from the future. Solution: proper train/test splits and time-based validation (see the sketch below).
- **Silent pipeline failures** — Problem: the pipeline keeps running despite data quality issues. Solution: circuit breakers and alerting on quality checks.
- **Schema drift** — Problem: unexpected changes in data structure. Solution: a schema registry and schema validation.
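A minimal time-based split with pandas that avoids training on future information; the column names and cutoff are illustrative:

```python
import pandas as pd

events = pd.DataFrame({
    "timestamp": pd.to_datetime(["2024-01-05", "2024-02-10", "2024-03-01", "2024-03-20"]),
    "feature": [1.2, 0.7, 3.4, 2.2],
    "label": [0, 1, 0, 1],
})

# Everything before the cutoff is training data; everything after is held out.
cutoff = pd.Timestamp("2024-03-01")
train = events[events["timestamp"] < cutoff]
test = events[events["timestamp"] >= cutoff]

# Any statistic used for features (means, encodings, scalers) must be fit on
# `train` only and then applied to `test`, never recomputed on the full dataset.
print(len(train), "train rows,", len(test), "test rows")
```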
**Data quality metrics**
- Completeness: % of non-null values
- Accuracy: % of correct values
- Consistency: % of records matching rules
- Timeliness: age of the data

**Pipeline metrics**
- Throughput: records/sec
- Latency: end-to-end processing time
- Error rate: failed / total records
- Cost: $ per GB processed

**Governance metrics**
- Coverage: % of entities tracked
- Freshness: update frequency
- Usage: queries/day
- Value: ROI of the data
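A small sketch of computing some of these data quality metrics from a pandas DataFrame; the consistency rule and timestamp column are invented examples:

```python
import pandas as pd

df = pd.DataFrame({
    "user_id": [1, 2, None, 4],
    "age": [25, -3, 40, 31],
    "updated_at": pd.to_datetime(["2024-03-14", "2024-03-15", "2024-03-15", "2024-03-10"]),
})

# Completeness: share of non-null cells
completeness = 1 - df.isnull().sum().sum() / df.size

# Consistency: share of rows passing a business rule (here: age must be 0-120)
consistency = df["age"].between(0, 120).mean()

# Timeliness: age of the freshest record relative to a reference time
data_age = pd.Timestamp("2024-03-16") - df["updated_at"].max()

print(f"completeness={completeness:.2%}, consistency={consistency:.2%}, age={data_age}")
```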