Master data collection, quality, governance, and pipeline management for AI projects
Data quality directly impacts model accuracy. Poor data leads to poor predictions, regardless of algorithm sophistication.
Efficient data management reduces storage costs, processing time, and computational resources needed for training.
Proper data governance ensures regulatory compliance (GDPR, CCPA) and ethical AI development.
| Company | Data Challenge | Solution | Result |
|---|---|---|---|
| Netflix | Personalization at scale | Real-time data pipelines | 75% of views from recommendations |
| Uber | Dynamic pricing data | Stream processing architecture | Sub-second pricing updates |
| Airbnb | Image quality variations | Automated data validation | 40% reduction in listing errors |
Typical pipeline flow: APIs, DBs, Files → ETL/ELT → Data Lake/Warehouse → Transform & Clean → Model Training
```json
{
  "customer_id": "12345",
  "age": 28,
  "purchase_history": [
    {"date": "2024-01-15", "amount": 129.99},
    {"date": "2024-02-20", "amount": 79.50}
  ],
  "segment": "premium"
}
```
```
2024-03-15 10:23:45 [INFO] User login: user_123
2024-03-15 10:23:47 [DEBUG] Session started
2024-03-15 10:24:12 [ERROR] Payment failed: timeout
```
| Strategy | Use Case | Pros | Cons |
|---|---|---|---|
| Batch Collection | Historical analysis | Efficient, cost-effective | Not real-time |
| Stream Processing | Real-time decisions | Low latency, fresh data | Complex, expensive |
| Web Scraping | Public data gathering | Large scale, automated | Legal considerations |
| API Integration | Third-party data | Structured, reliable | Rate limits, costs |
| Manual Entry | Expert annotations | High quality | Slow, expensive |
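To make the API-integration trade-offs concrete (rate limits, pagination), here is a minimal batch-collection sketch using requests. The endpoint, the `Retry-After` handling, and the `results`/`next_page` response fields are assumptions for illustration, not a specific provider's API.

```python
import time
import requests

def collect_from_api(base_url, api_key, max_pages=100):
    """Pull paginated records from a hypothetical REST endpoint,
    backing off when the provider signals rate limiting (HTTP 429)."""
    records, page = [], 1
    while page <= max_pages:
        resp = requests.get(
            base_url,
            params={"page": page},
            headers={"Authorization": f"Bearer {api_key}"},
            timeout=30,
        )
        if resp.status_code == 429:  # rate limited: wait, then retry the same page
            time.sleep(int(resp.headers.get("Retry-After", "5")))
            continue
        resp.raise_for_status()
        payload = resp.json()
        records.extend(payload.get("results", []))
        if not payload.get("next_page"):  # hypothetical pagination flag
            break
        page += 1
    return records
```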
ETL: the traditional approach for structured data
```python
# Extract
data = extract_from_source()

# Transform
cleaned_data = clean_missing_values(data)
normalized_data = normalize_features(cleaned_data)
engineered_data = create_features(normalized_data)

# Load
load_to_warehouse(engineered_data)
```
ELT: the modern approach for big data, where raw data is loaded first and transformed inside the warehouse (sketch below)
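A minimal ELT sketch to contrast with the ETL example above. The helpers (`extract_from_source`, `load_to_warehouse`, `run_in_warehouse`) and the column names are placeholders in the same spirit as the ETL snippet, not a real API.

```python
# Extract and load the raw data as-is
raw_data = extract_from_source()
load_to_warehouse(raw_data, table="raw_events")

# Transform where the data lives, typically as SQL executed by the warehouse
run_in_warehouse("""
    CREATE TABLE clean_events AS
    SELECT user_id, amount, created_at
    FROM raw_events
    WHERE amount IS NOT NULL
""")
```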
```python
class DataValidator:
    def __init__(self, rules):
        self.rules = rules
        self.validation_results = []

    def log_error(self, message):
        self.validation_results.append({'level': 'error', 'message': message})

    def log_warning(self, message):
        self.validation_results.append({'level': 'warning', 'message': message})

    def validate_schema(self, data):
        """Check data structure and types"""
        expected_columns = self.rules['schema']
        actual_columns = data.columns
        missing = set(expected_columns) - set(actual_columns)
        if missing:
            self.log_error(f"Missing columns: {missing}")
            return False
        return True

    def validate_ranges(self, data):
        """Check value ranges and boundaries"""
        for column, constraints in self.rules['ranges'].items():
            if column in data.columns:
                min_val = data[column].min()
                max_val = data[column].max()
                if min_val < constraints['min']:
                    self.log_warning(f"{column} has values below {constraints['min']}")
                if max_val > constraints['max']:
                    self.log_warning(f"{column} has values above {constraints['max']}")

    def validate_nulls(self, data):
        """Check for missing values"""
        null_counts = data.isnull().sum()
        for column, count in null_counts.items():
            if count > 0:
                percentage = (count / len(data)) * 100
                if percentage > self.rules['null_threshold']:
                    self.log_error(f"{column} has {percentage:.2f}% null values")

    def run_validation(self, data):
        """Execute all validation checks"""
        self.validation_results = []
        self.validate_schema(data)
        self.validate_ranges(data)
        self.validate_nulls(data)
        return self.validation_results
```
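A usage sketch for the validator above. The shape of the rules dictionary (schema, ranges, null_threshold) mirrors what the methods read, and the sample data is invented.

```python
import pandas as pd

rules = {
    'schema': ['user_id', 'age', 'amount'],
    'ranges': {'age': {'min': 0, 'max': 120}, 'amount': {'min': 0, 'max': 10000}},
    'null_threshold': 5.0,   # max % of nulls tolerated per column
}

df = pd.DataFrame({
    'user_id': [1, 2, 3],
    'age': [28, 34, None],                # null rate above threshold on purpose
    'amount': [129.99, 79.50, 15000.0],   # out-of-range value on purpose
})

validator = DataValidator(rules)
for result in validator.run_validation(df):
    print(result)
```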
| Pattern | Description | Tools | Best For |
|---|---|---|---|
| Git-style | Track changes with commits | DVC, Git LFS | Small-medium datasets |
| Time-based | Snapshots at intervals | Delta Lake, Iceberg | Time-series data |
| Schema Evolution | Track structure changes | Avro, Parquet | Evolving datasets |
| Immutable Logs | Append-only storage | Kafka, Event Store | Event sourcing |
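The Git-style row boils down to content addressing: hash each dataset artifact and record it in an append-only manifest. A minimal, tool-free sketch of that idea follows; DVC and Git LFS do this far more robustly, and the file and manifest names here are arbitrary.

```python
import hashlib
import json
from datetime import datetime, timezone
from pathlib import Path

def version_dataset(path, manifest="data_manifest.jsonl", note=""):
    """Record an immutable, content-addressed version entry for a data file."""
    digest = hashlib.sha256(Path(path).read_bytes()).hexdigest()
    entry = {
        "file": str(path),
        "sha256": digest,
        "bytes": Path(path).stat().st_size,
        "created_at": datetime.now(timezone.utc).isoformat(),
        "note": note,
    }
    with open(manifest, "a") as f:   # append-only: earlier versions stay intact
        f.write(json.dumps(entry) + "\n")
    return digest

# version_dataset("train.parquet", note="drop duplicate users")
```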
- Active learning: the model identifies uncertain samples for human labeling, reported to need up to 75% fewer labels; a code sketch follows after this list.
- Weak supervision: use heuristics and labeling rules to generate labels programmatically.
- Transfer learning: leverage pre-trained models to produce initial labels for human review.
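A minimal uncertainty-sampling loop to illustrate the active-learning idea, using scikit-learn; the model choice, margin criterion, and batch size are arbitrary choices for this sketch.

```python
import numpy as np
from sklearn.linear_model import LogisticRegression

def select_uncertain_samples(model, X_unlabeled, batch_size=10):
    """Pick the unlabeled rows the model is least sure about (margin sampling)."""
    proba = model.predict_proba(X_unlabeled)
    top_two = np.sort(proba, axis=1)[:, -2:]
    margin = top_two[:, 1] - top_two[:, 0]   # small margin = high uncertainty
    return np.argsort(margin)[:batch_size]

def active_learning_round(X_labeled, y_labeled, X_unlabeled, batch_size=10):
    """Train on what is labeled, then pick the next batch to send to annotators."""
    model = LogisticRegression(max_iter=1000).fit(X_labeled, y_labeled)
    query_idx = select_uncertain_samples(model, X_unlabeled, batch_size)
    return model, query_idx   # label X_unlabeled[query_idx] next
```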
```python
import re
import hashlib

class PIIDetector:
    def __init__(self):
        self.patterns = {
            'email': r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b',
            'phone': r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b',
            'ssn': r'\b\d{3}-\d{2}-\d{4}\b',
            'credit_card': r'\b\d{4}[\s-]?\d{4}[\s-]?\d{4}[\s-]?\d{4}\b'
        }

    def detect_pii(self, text):
        """Detect PII in text data"""
        findings = []
        for pii_type, pattern in self.patterns.items():
            matches = re.finditer(pattern, text)
            for match in matches:
                findings.append({
                    'type': pii_type,
                    'position': match.span(),
                    'value': match.group()  # In production, don't log actual PII
                })
        return findings

    def anonymize_pii(self, text, method='hash'):
        """Replace PII with anonymized values"""
        anonymized = text
        for pii_type, pattern in self.patterns.items():
            if method == 'hash':
                anonymized = re.sub(pattern, lambda m: self._hash_value(m.group()), anonymized)
            elif method == 'mask':
                anonymized = re.sub(pattern, lambda m: self._mask_value(m.group(), pii_type), anonymized)
        return anonymized

    def _hash_value(self, value):
        """Hash PII value"""
        return hashlib.sha256(value.encode()).hexdigest()[:8]

    def _mask_value(self, value, pii_type):
        """Mask PII value"""
        if pii_type == 'email':
            parts = value.split('@')
            return f"{parts[0][:2]}***@{parts[1]}"
        elif pii_type == 'phone':
            return 'XXX-XXX-' + value[-4:]
        else:
            return 'X' * len(value)

# Usage
detector = PIIDetector()
sample_text = "Contact John at john@email.com or 555-123-4567"
pii_found = detector.detect_pii(sample_text)
anonymized = detector.anonymize_pii(sample_text, method='mask')
print(f"PII Found: {pii_found}")
print(f"Anonymized: {anonymized}")
```
| Version | Changes | Migration Script | Impact |
|---|---|---|---|
| v1.0 | Initial schema | CREATE TABLE users (id, name, email) | - |
| v1.1 | Add timestamp | ALTER TABLE users ADD created_at TIMESTAMP | Backward compatible |
| v2.0 | Normalize address | CREATE TABLE addresses; ALTER TABLE users... | Breaking change |
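One lightweight way to apply such versioned migrations in order is sketched below, using sqlite3 purely for illustration; the version-tracking table and the SQL statements are hypothetical, not this project's actual schema.

```python
import sqlite3

MIGRATIONS = {
    "v1.0": "CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT, email TEXT)",
    "v1.1": "ALTER TABLE users ADD COLUMN created_at TIMESTAMP",
}

def migrate(db_path="app.db"):
    """Apply pending migrations in version order, recording what has run."""
    conn = sqlite3.connect(db_path)
    conn.execute("CREATE TABLE IF NOT EXISTS schema_version (version TEXT PRIMARY KEY)")
    applied = {row[0] for row in conn.execute("SELECT version FROM schema_version")}
    for version, statement in sorted(MIGRATIONS.items()):  # lexical order is enough for this sketch
        if version not in applied:
            conn.execute(statement)
            conn.execute("INSERT INTO schema_version VALUES (?)", (version,))
    conn.commit()
    conn.close()
```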
```python
from kafka import KafkaConsumer, KafkaProducer
from kafka.errors import KafkaError
import json
import logging
from datetime import datetime

class StreamProcessor:
    """Kafka consumer/producer pair. Domain-specific helpers such as
    categorize_amount, get_user_segment, check_velocity, update_aggregates,
    and alert_anomaly are assumed to be implemented elsewhere."""

    def __init__(self, input_topic, output_topic, bootstrap_servers):
        self.consumer = KafkaConsumer(
            input_topic,
            bootstrap_servers=bootstrap_servers,
            auto_offset_reset='latest',
            value_deserializer=lambda m: json.loads(m.decode('utf-8'))
        )
        self.producer = KafkaProducer(
            bootstrap_servers=bootstrap_servers,
            value_serializer=lambda v: json.dumps(v).encode('utf-8')
        )
        self.output_topic = output_topic
        self.stats = {
            'processed': 0,
            'errors': 0,
            'start_time': datetime.now()
        }

    def process_message(self, message):
        """Process individual message with business logic"""
        try:
            # Data validation
            if not self.validate_message(message):
                raise ValueError("Message validation failed")
            # Feature engineering
            enriched = self.enrich_message(message)
            # Anomaly detection
            if self.detect_anomaly(enriched):
                enriched['anomaly_flag'] = True
                self.alert_anomaly(enriched)
            # Aggregation window
            enriched = self.update_aggregates(enriched)
            return enriched
        except Exception as e:
            logging.error(f"Processing error: {e}")
            self.stats['errors'] += 1
            return None

    def validate_message(self, message):
        """Validate message schema and content"""
        required_fields = ['timestamp', 'user_id', 'event_type', 'data']
        return all(field in message for field in required_fields)

    def enrich_message(self, message):
        """Add derived features and metadata"""
        enriched = message.copy()
        # Add processing timestamp
        enriched['processed_at'] = datetime.now().isoformat()
        # Add derived features
        if 'amount' in message.get('data', {}):
            amount = message['data']['amount']
            enriched['data']['amount_category'] = self.categorize_amount(amount)
        # Add user segment
        enriched['user_segment'] = self.get_user_segment(message['user_id'])
        return enriched

    def detect_anomaly(self, message):
        """Simple anomaly detection logic"""
        if 'amount' in message.get('data', {}):
            amount = message['data']['amount']
            # Flag unusually high amounts
            if amount > 10000:
                return True
        # Check for unusual patterns
        if message.get('event_type') == 'login':
            # Multiple logins in short time
            if self.check_velocity(message['user_id'], 'login', window_seconds=60) > 5:
                return True
        return False

    def run(self):
        """Main processing loop"""
        logging.info("Starting stream processor...")
        for message in self.consumer:
            try:
                # Process message
                processed = self.process_message(message.value)
                if processed:
                    # Send to output topic
                    future = self.producer.send(self.output_topic, processed)
                    future.get(timeout=10)  # Wait for confirmation
                    self.stats['processed'] += 1
                    # Log progress
                    if self.stats['processed'] % 1000 == 0:
                        self.log_stats()
            except KafkaError as e:
                logging.error(f"Kafka error: {e}")

    def log_stats(self):
        """Log processing statistics"""
        runtime = (datetime.now() - self.stats['start_time']).total_seconds()
        throughput = self.stats['processed'] / runtime if runtime > 0 else 0
        logging.info(f"Processed: {self.stats['processed']}, "
                     f"Errors: {self.stats['errors']}, "
                     f"Throughput: {throughput:.2f} msg/sec")

# Usage
processor = StreamProcessor(
    input_topic='raw-events',
    output_topic='processed-events',
    bootstrap_servers='localhost:9092'
)
processor.run()
```
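Data-quality rules can also be declared outside the pipeline code. The expectation suite below (Great Expectations-style YAML) encodes checks similar to the validator shown earlier: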
```yaml
expectations:
  - expectation_type: expect_column_mean_to_be_between
    kwargs:
      column: "price"
      min_value: 50
      max_value: 500
  - expectation_type: expect_column_values_to_not_be_null
    kwargs:
      column: "user_id"
      mostly: 0.99
  - expectation_type: expect_column_unique_value_count_to_be_between
    kwargs:
      column: "product_id"
      min_value: 100
      max_value: 10000
```
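For teams not using a validation framework, the same three checks reduce to a few pandas expressions; a rough sketch, not the framework's exact execution semantics.

```python
import pandas as pd

def check_expectations(df: pd.DataFrame) -> dict:
    """Rough pandas equivalents of the three expectations above."""
    return {
        "price_mean_in_range": 50 <= df["price"].mean() <= 500,
        "user_id_mostly_not_null": df["user_id"].notnull().mean() >= 0.99,
        "product_id_unique_count_in_range": 100 <= df["product_id"].nunique() <= 10000,
    }
```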
Example data domains: Customer Data, Product Data, Transaction Data
| Technique | Impact | Implementation | Trade-off |
|---|---|---|---|
| Partitioning | 10-100x query speed | Date/Hash partitions | Storage overhead |
| Indexing | 5-50x lookup speed | B-tree, bitmap indexes | Write performance |
| Caching | 100-1000x access speed | Redis, Memcached | Consistency challenges |
| Compression | 50-90% storage reduction | Parquet, ORC formats | CPU overhead |
| Columnar Storage | 10-100x analytics speed | Parquet, Arrow | Row operations slower |
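Partitioning, columnar storage, and compression from the table above combine naturally in Parquet. A small pandas/pyarrow sketch; the column names, partition key, and output path are illustrative.

```python
import pandas as pd

df = pd.DataFrame({
    "event_date": ["2024-03-01", "2024-03-01", "2024-03-02"],
    "user_id": [1, 2, 3],
    "amount": [129.99, 79.50, 42.00],
})

# Columnar format + snappy compression + one directory per event_date partition
df.to_parquet(
    "events/",
    engine="pyarrow",
    compression="snappy",
    partition_cols=["event_date"],
)

# Readers that filter on the partition column only touch the matching directories
recent = pd.read_parquet("events/", filters=[("event_date", "=", "2024-03-02")])
```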
```python
import pandas as pd

class FeatureStore:
    def __init__(self, online_store, offline_store):
        self.online_store = online_store    # Redis/DynamoDB
        self.offline_store = offline_store  # S3/BigQuery
        self.feature_registry = {}

    def register_feature(self, feature_def):
        """Register new feature definition"""
        self.feature_registry[feature_def.name] = {
            'description': feature_def.description,
            'dtype': feature_def.dtype,
            'source': feature_def.source,
            'transformation': feature_def.transformation,
            'ttl': feature_def.ttl,
            'version': feature_def.version
        }

    def compute_features(self, entity_id, feature_list):
        """Compute features for given entity"""
        features = {}
        for feature_name in feature_list:
            # Check online store first
            cached = self.get_online_feature(entity_id, feature_name)
            if cached and not self.is_stale(cached):
                features[feature_name] = cached
                continue
            # Compute from offline store
            feature_def = self.feature_registry[feature_name]
            raw_data = self.offline_store.get(feature_def['source'], entity_id)
            computed = feature_def['transformation'](raw_data)
            # Update online store
            self.set_online_feature(entity_id, feature_name, computed)
            features[feature_name] = computed
        return features

    def get_training_data(self, entity_ids, feature_list, point_in_time):
        """Get historical features for training"""
        training_data = []
        for entity_id in entity_ids:
            # Time travel query
            historical_features = self.offline_store.time_travel_query(
                entity_id=entity_id,
                features=feature_list,
                timestamp=point_in_time
            )
            training_data.append(historical_features)
        return pd.DataFrame(training_data)

    def monitor_feature_drift(self, feature_name, window='7d'):
        """Monitor feature distribution changes"""
        current_stats = self.compute_feature_stats(feature_name, 'now')
        baseline_stats = self.compute_feature_stats(feature_name, f'now-{window}')
        # Calculate drift metrics
        kl_divergence = self.calculate_kl_divergence(baseline_stats, current_stats)
        psi = self.calculate_psi(baseline_stats, current_stats)
        if kl_divergence > 0.1 or psi > 0.2:
            self.alert_feature_drift(feature_name, kl_divergence, psi)
        return {'kl_divergence': kl_divergence, 'psi': psi}
```
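The drift helpers referenced above (calculate_psi, calculate_kl_divergence) are not shown; here is one possible PSI implementation over binned numeric samples, as a sketch rather than the feature store's own method.

```python
import numpy as np

def calculate_psi(baseline, current, bins=10, eps=1e-6):
    """Population Stability Index between two samples of a numeric feature."""
    edges = np.histogram_bin_edges(baseline, bins=bins)
    expected, _ = np.histogram(baseline, bins=edges)
    actual, _ = np.histogram(current, bins=edges)
    expected_pct = expected / max(expected.sum(), 1) + eps
    actual_pct = actual / max(actual.sum(), 1) + eps
    return float(np.sum((actual_pct - expected_pct) * np.log(actual_pct / expected_pct)))

# Values above 0.2 are flagged as drift in the monitor above; 0.1-0.2 is a moderate shift
```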
| Category | Tool | Best For | Scale | Cost |
|---|---|---|---|---|
| Orchestration | Apache Airflow | Complex workflows | Large | Open Source |
| Orchestration | Prefect | Python workflows | Medium | Freemium |
| Orchestration | Dagster | Data-aware pipelines | Large | Open Source |
| Storage | AWS S3 | Object storage | Unlimited | Pay-per-use |
| Storage | Snowflake | Cloud warehouse | Large | Pay-per-use |
| Storage | Delta Lake | ACID transactions | Large | Open Source |
| Processing | Apache Spark | Batch processing | Very Large | Open Source |
| Processing | Apache Flink | Stream processing | Large | Open Source |
| Processing | dbt | SQL transformations | Medium | Open Source |
```python
# Data profiling with pandas
df.describe()          # Basic statistics
df.info()              # Data types and nulls
df.nunique()           # Unique values per column

# Data validation
df.isnull().sum()      # Count missing values
df.duplicated().sum()  # Count duplicates
df.dtypes              # Check data types

# Data cleaning
df.dropna()            # Remove missing values
df.ffill()             # Forward fill missing values
df.drop_duplicates()   # Remove duplicates

# Data transformation
df['category'].str.lower()                     # Normalize a text column
df.groupby('category').agg({'value': 'mean'})  # Aggregate
pd.merge(df1, df2, on='key')                   # Join datasets

# Data export
df.to_parquet('data.parquet')               # Efficient storage
df.to_csv('data.csv', index=False)          # CSV export
df.to_json('data.json', orient='records')   # JSON export
```
Problem: Training data contains information from the future (data leakage).
Solution: Proper train/test splits and time-based validation; a minimal sketch follows below.
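A minimal guard against leakage is to split on time rather than at random; the column name and cutoff date below are placeholders.

```python
import pandas as pd

def time_based_split(df, time_col="event_time", cutoff="2024-01-01"):
    """Train on everything before the cutoff, evaluate on everything after."""
    df = df.sort_values(time_col)
    train = df[df[time_col] < cutoff]
    test = df[df[time_col] >= cutoff]
    return train, test
```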
Problem: The pipeline keeps running despite data quality issues (silent failures).
Solution: Implement circuit breakers and alerts so bad data halts the pipeline instead of reaching the model.
Problem: Unexpected changes in data structure (schema drift).
Solution: Use a schema registry and validation to catch structural changes before they break downstream jobs.
| Metric Category | Key Metrics |
|---|---|
| Data quality | Completeness: % non-null; Accuracy: % correct values; Consistency: % matching rules; Timeliness: age of data |
| Pipeline health | Throughput: records/sec; Latency: end-to-end time; Error rate: failed/total; Cost: $/GB processed |
| Data assets | Coverage: % entities tracked; Freshness: update frequency; Usage: queries/day; Value: ROI of data |
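A sketch of computing some of the data-quality metrics above for a pandas DataFrame; the timestamp column name is a placeholder.

```python
import pandas as pd

def quality_metrics(df, timestamp_col="updated_at"):
    """Completeness, duplication, and timeliness for a single DataFrame."""
    now = pd.Timestamp.now(tz="UTC")
    age = now - pd.to_datetime(df[timestamp_col], utc=True).max()
    return {
        "completeness_pct": float(df.notnull().mean().mean() * 100),  # % non-null cells
        "duplicate_rows": int(df.duplicated().sum()),
        "timeliness_hours": round(age / pd.Timedelta(hours=1), 2),    # age of newest record
    }
```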