Data Pipeline Examples
Example ETL, data-transformation, and processing workflows written in elastic-script.
Basic Data Transformation
Transform and enrich documents:
-- enrich_logs: reads up to 1000 documents from source_index, adds computed
-- fields to each one, and indexes the result into target_index.
--
-- Parameters:
--   source_index  Index (or pattern) to read. It is concatenated into the
--                 ES|QL text as-is, so only pass trusted values.
--   target_index  Index that receives the enriched documents.
-- Returns: the number of documents that were indexed successfully.
-- A failure on one document is counted and printed; it does not stop the run.
CREATE PROCEDURE enrich_logs(source_index STRING, target_index STRING)
BEGIN
    PRINT '=== Enriching logs from ' || source_index || ' ===';

    DECLARE logs CURSOR FOR ESQL_QUERY('
FROM ' || source_index || '
| LIMIT 1000
');
    DECLARE processed NUMBER = 0;
    DECLARE errors NUMBER = 0;

    FOR log IN logs LOOP
        TRY
            -- Get original fields
            DECLARE message STRING = DOCUMENT_GET(log, 'message');
            DECLARE level STRING = DOCUMENT_GET(log, 'level');
            DECLARE timestamp DATE = DOCUMENT_GET(log, '@timestamp');

            -- Normalize once so the stored field and the severity
            -- classification below agree on mixed-case input ('error', 'Warn').
            DECLARE level_normalized STRING = UPPER(level);

            -- Enrich with computed fields.
            -- NOTE(review): EXTRACT_DAY is taken to return the day component of
            -- the date (day of month), hence the field name; confirm against
            -- the date function reference.
            DECLARE enriched DOCUMENT = DOCUMENT_MERGE(log, {
                "message_length": LENGTH(message),
                "level_normalized": level_normalized,
                "day_of_month": EXTRACT_DAY(timestamp),
                "processed_at": CURRENT_TIMESTAMP(),
                "processed_by": "elastic-script-pipeline"
            });

            -- Classify severity: 1 = ERROR, 2 = WARN, 3 = INFO, 4 = anything else.
            DECLARE severity NUMBER;
            IF level_normalized = 'ERROR' THEN
                SET severity = 1;
            ELSEIF level_normalized = 'WARN' THEN
                SET severity = 2;
            ELSEIF level_normalized = 'INFO' THEN
                SET severity = 3;
            ELSE
                SET severity = 4;
            END IF;
            SET enriched = DOCUMENT_MERGE(enriched, {"severity_score": severity});

            -- Index to target
            ES_INDEX(target_index, NULL, enriched);
            SET processed = processed + 1;
        CATCH error
            SET errors = errors + 1;
            PRINT 'Error processing document: ' || error;
        END TRY;
    END LOOP;

    PRINT 'Processed: ' || processed || ', Errors: ' || errors;
    RETURN processed;
END PROCEDURE;
Aggregation Pipeline
Compute and store aggregated metrics:
-- aggregate_metrics: computes per-service request count, error count, and
-- average response time over logs-* for a recent time window, then stores one
-- metric document per service in the 'service-metrics' index.
--
-- Parameters:
--   time_window  ES|QL time span subtracted from NOW(), e.g. '1 hour'.
--                It is concatenated into the query text as-is, so only pass
--                trusted values.
-- Returns: nothing; results are indexed and printed.
CREATE PROCEDURE aggregate_metrics(time_window STRING)
BEGIN
    PRINT '=== Computing Metrics (' || time_window || ') ===';

    -- Aggregate by service.
    -- ES|QL compares with '=='; a single '=' is assignment inside STATS/EVAL,
    -- so the per-aggregation filter must read: level == "ERROR".
    DECLARE service_metrics CURSOR FOR ESQL_QUERY('
FROM logs-*
| WHERE @timestamp > NOW() - ' || time_window || '
| STATS
request_count = COUNT(*),
error_count = COUNT(*) WHERE level == "ERROR",
avg_response_time = AVG(response_time_ms)
BY service.name
');

    -- One shared timestamp so every metric document of this run lines up.
    DECLARE timestamp DATE = CURRENT_TIMESTAMP();

    FOR svc IN service_metrics LOOP
        DECLARE service STRING = DOCUMENT_GET(svc, 'service.name');
        DECLARE requests NUMBER = DOCUMENT_GET(svc, 'request_count');
        DECLARE errors NUMBER = DOCUMENT_GET(svc, 'error_count');
        DECLARE avg_time NUMBER = DOCUMENT_GET(svc, 'avg_response_time');

        -- Error rate as a percentage, rounded to 2 decimals; stays 0 when
        -- there are no requests, which avoids dividing by zero.
        DECLARE error_rate NUMBER = 0;
        IF requests > 0 THEN
            SET error_rate = ROUND((errors * 100.0) / requests, 2);
        END IF;

        -- Create metric document
        DECLARE metric DOCUMENT = {
            "@timestamp": timestamp,
            "service": service,
            "time_window": time_window,
            "metrics": {
                "request_count": requests,
                "error_count": errors,
                "error_rate_percent": error_rate,
                "avg_response_time_ms": ROUND(avg_time, 2)
            }
        };

        -- Store metric
        ES_INDEX('service-metrics', NULL, metric);
        PRINT service || ': ' || requests || ' requests, ' || error_rate || '% errors';
    END LOOP;
END PROCEDURE;
Data Migration
Migrate data between indices with transformation:
-- migrate_index: copies documents from source_index to target_index in
-- batches, stamping each copy with "migrated_at" and "source_index".
--
-- Parameters:
--   source_index  Index to read. It is concatenated into the ES|QL text
--                 as-is, so only pass trusted values.
--   target_index  Index that receives the transformed documents.
--   batch_size    Documents fetched per query; must be at least 1.
-- Returns: the total number of documents indexed into target_index
--          (0 when batch_size is invalid).
--
-- Pagination: ES|QL has no OFFSET, so each batch resumes after the last
-- (@timestamp, _id) pair seen in the previous batch (keyset pagination).
-- Re-issuing the same SORT/LIMIT query would return the same first batch
-- forever.
-- NOTE(review): this relies on the cluster allowing SORT on the _id metadata
-- field, and on @timestamp round-tripping through string concatenation;
-- confirm both.
-- NOTE(review): documents without @timestamp sort last and fail the resume
-- filter, so they are only picked up if they fall inside the first batch.
CREATE PROCEDURE migrate_index(
    source_index STRING,
    target_index STRING,
    batch_size NUMBER
)
BEGIN
    PRINT '=== Migrating ' || source_index || ' to ' || target_index || ' ===';

    -- The loop ends on the first short batch; with batch_size < 1 no batch
    -- can ever be short, so reject it up front.
    IF batch_size < 1 THEN
        PRINT 'batch_size must be at least 1';
        RETURN 0;
    END IF;

    DECLARE total_migrated NUMBER = 0;
    DECLARE batch_number NUMBER = 0;
    DECLARE continue_migration BOOLEAN = TRUE;

    -- Keyset state: empty filter for the first batch, afterwards a WHERE
    -- clause that skips everything up to the last migrated document.
    DECLARE resume_filter STRING = '';
    DECLARE last_timestamp STRING = '';
    DECLARE last_id STRING = '';

    WHILE continue_migration LOOP
        SET batch_number = batch_number + 1;

        -- Every earlier batch was full, so the running total is the offset.
        PRINT 'Processing batch ' || batch_number || ' (offset: ' || total_migrated || ')';

        DECLARE docs CURSOR FOR ESQL_QUERY('
FROM ' || source_index || ' METADATA _id
' || resume_filter || '
| SORT @timestamp, _id
| LIMIT ' || batch_size || '
');
        DECLARE batch_count NUMBER = 0;

        FOR doc IN docs LOOP
            -- Remember the position of the last document seen in this batch.
            SET last_timestamp = DOCUMENT_GET(doc, '@timestamp');
            SET last_id = DOCUMENT_GET(doc, '_id');

            -- Apply any transformations here
            DECLARE transformed DOCUMENT = DOCUMENT_MERGE(doc, {
                "migrated_at": CURRENT_TIMESTAMP(),
                "source_index": source_index
            });

            -- Remove any fields you don't want. _id is only fetched for
            -- pagination and must not be written back as a document field.
            SET transformed = DOCUMENT_REMOVE(transformed, '_score');
            SET transformed = DOCUMENT_REMOVE(transformed, '_id');

            ES_INDEX(target_index, NULL, transformed);
            SET batch_count = batch_count + 1;
        END LOOP;

        SET total_migrated = total_migrated + batch_count;

        -- A short batch means the source is exhausted; otherwise continue
        -- strictly after the last (@timestamp, _id) pair.
        IF batch_count < batch_size THEN
            SET continue_migration = FALSE;
        ELSE
            SET resume_filter = '| WHERE @timestamp > "' || last_timestamp
                || '" OR (@timestamp == "' || last_timestamp
                || '" AND _id > "' || last_id || '")';
        END IF;

        PRINT ' Migrated ' || batch_count || ' documents (total: ' || total_migrated || ')';
    END LOOP;

    PRINT '';
    PRINT 'Migration complete: ' || total_migrated || ' documents';
    RETURN total_migrated;
END PROCEDURE;
Data Quality Checks
Validate data quality and generate reports:
-- data_quality_check: prints a data quality report for one index and returns
-- the list of problems it found.
--
-- Parameters:
--   index_name  Index to inspect; it is concatenated into each ES|QL query.
-- Returns: an ARRAY of issue descriptions (empty when nothing was flagged).
--
-- Checks, in order: total document count, documents missing @timestamp,
-- oldest/newest @timestamp, and the distribution of the "level" field.
CREATE PROCEDURE data_quality_check(index_name STRING)
BEGIN
    -- Report banner
    PRINT '╔══════════════════════════════════════════╗';
    PRINT '║ DATA QUALITY REPORT ║';
    PRINT '║ Index: ' || RPAD(index_name, 26, ' ') || '║';
    PRINT '╚══════════════════════════════════════════╝';
    PRINT '';

    DECLARE findings ARRAY = [];

    -- 1. How many documents does the index hold?
    DECLARE total_cursor CURSOR FOR ESQL_QUERY('
FROM ' || index_name || ' | STATS count = COUNT(*)
');
    DECLARE doc_total NUMBER = 0;
    FOR total_row IN total_cursor LOOP
        SET doc_total = DOCUMENT_GET(total_row, 'count');
    END LOOP;
    PRINT '📊 Total documents: ' || doc_total;

    -- 2. Required field: every document should carry @timestamp.
    DECLARE no_timestamp_cursor CURSOR FOR ESQL_QUERY('
FROM ' || index_name || '
| WHERE @timestamp IS NULL
| STATS count = COUNT(*)
');
    FOR gap_row IN no_timestamp_cursor LOOP
        DECLARE missing_count NUMBER = DOCUMENT_GET(gap_row, 'count');
        IF missing_count > 0 THEN
            PRINT '⚠️ Missing @timestamp: ' || missing_count || ' documents';
            SET findings = ARRAY_APPEND(findings, 'Missing @timestamp: ' || missing_count);
        ELSE
            PRINT '✅ All documents have @timestamp';
        END IF;
    END LOOP;

    -- 3. Time span covered by the index.
    DECLARE span_cursor CURSOR FOR ESQL_QUERY('
FROM ' || index_name || '
| STATS
oldest = MIN(@timestamp),
newest = MAX(@timestamp)
');
    FOR span_row IN span_cursor LOOP
        PRINT '📅 Date range: ' || DOCUMENT_GET(span_row, 'oldest') || ' to ' || DOCUMENT_GET(span_row, 'newest');
    END LOOP;

    -- 4. Document count per log level, with its share of the total.
    DECLARE level_cursor CURSOR FOR ESQL_QUERY('
FROM ' || index_name || '
| STATS count = COUNT(*) BY level
');
    PRINT '';
    PRINT '📊 Level distribution:';
    FOR level_row IN level_cursor LOOP
        DECLARE level_name STRING = DOCUMENT_GET(level_row, 'level');
        DECLARE level_count NUMBER = DOCUMENT_GET(level_row, 'count');
        DECLARE share_pct NUMBER = ROUND((level_count * 100.0) / doc_total, 1);
        PRINT ' ' || RPAD(level_name, 10, ' ') || level_count || ' (' || share_pct || '%)';
    END LOOP;

    -- Summary: list every finding, or confirm that none was recorded.
    PRINT '';
    DECLARE finding_total NUMBER = ARRAY_LENGTH(findings);
    IF finding_total > 0 THEN
        PRINT '⚠️ Found ' || finding_total || ' issues:';
        FOR idx IN 0..(finding_total-1) LOOP
            PRINT ' • ' || findings[idx];
        END LOOP;
    ELSE
        PRINT '✅ No data quality issues found';
    END IF;

    RETURN findings;
END PROCEDURE;
ETL with External APIs
Fetch external data and load into Elasticsearch:
-- load_external_data: fetches a JSON payload over HTTP and indexes each
-- element into target_index, tagged with load metadata.
--
-- Parameters:
--   api_url       URL requested with HTTP_GET.
--   target_index  Index that receives the loaded documents.
-- Returns: the number of documents indexed, or 0 if any step raised an error
--          (the error is printed, not re-thrown).
CREATE PROCEDURE load_external_data(api_url STRING, target_index STRING)
BEGIN
    PRINT '=== Loading data from external API ===';
    PRINT 'Source: ' || api_url;

    TRY
        -- Download the raw body.
        DECLARE payload STRING = HTTP_GET(api_url);

        -- The body is assumed to be a JSON array of objects.
        -- Note: Actual JSON parsing implementation may vary
        DECLARE records ARRAY = JSON_PARSE(payload);
        DECLARE record_total NUMBER = ARRAY_LENGTH(records);
        PRINT 'Received ' || record_total || ' items';

        DECLARE loaded_count NUMBER = 0;
        FOR idx IN 0..(record_total-1) LOOP
            DECLARE record DOCUMENT = records[idx];

            -- Stamp each record with when and where it was loaded from.
            SET record = DOCUMENT_MERGE(record, {
                "loaded_at": CURRENT_TIMESTAMP(),
                "source_url": api_url
            });

            ES_INDEX(target_index, NULL, record);
            SET loaded_count = loaded_count + 1;
        END LOOP;

        PRINT 'Loaded ' || loaded_count || ' documents to ' || target_index;
        RETURN loaded_count;
    CATCH error
        -- Any failure (fetch, parse, or index) ends the load and reports 0.
        PRINT 'Failed to load data: ' || error;
        RETURN 0;
    END TRY;
END PROCEDURE;
Scheduled Cleanup
Report the data that a retention policy would remove (dry run; the delete call is left commented out):
-- cleanup_old_data: reports how many documents are older than the retention
-- cutoff and which indices hold them. This is a dry run: nothing is deleted.
--
-- Parameters:
--   index_pattern   Index or pattern to inspect. It is concatenated into the
--                   ES|QL text as-is, so only pass trusted values.
--   retention_days  Age passed to DATE_SUB to compute the cutoff from
--                   CURRENT_DATE() (printed as a number of days).
-- Returns: nothing; the findings are printed.
CREATE PROCEDURE cleanup_old_data(index_pattern STRING, retention_days NUMBER)
BEGIN
    DECLARE cutoff DATE = DATE_SUB(CURRENT_DATE(), retention_days);

    PRINT '=== Data Cleanup ===';
    PRINT 'Index pattern: ' || index_pattern;
    PRINT 'Retention: ' || retention_days || ' days';
    PRINT 'Cutoff date: ' || cutoff;

    -- Find old documents.
    -- _index is a metadata field: it is only available to STATS when it is
    -- requested with METADATA on the FROM clause.
    DECLARE old_docs CURSOR FOR ESQL_QUERY('
FROM ' || index_pattern || ' METADATA _index
| WHERE @timestamp < "' || cutoff || '"
| STATS count = COUNT(*), indices = VALUES(_index)
');

    FOR row IN old_docs LOOP
        DECLARE count NUMBER = DOCUMENT_GET(row, 'count');
        DECLARE indices ARRAY = DOCUMENT_GET(row, 'indices');

        IF count > 0 THEN
            PRINT 'Found ' || count || ' documents to clean up';
            PRINT 'Affected indices: ' || ARRAY_JOIN(indices, ', ');
            -- In production, you would delete these documents
            -- ES_DELETE_BY_QUERY(index_pattern, '@timestamp < "' || cutoff || '"');
            PRINT '⚠️ Cleanup would delete ' || count || ' documents';
            PRINT ' (Dry run - no documents deleted)';
        ELSE
            PRINT '✅ No documents older than retention period';
        END IF;
    END LOOP;
END PROCEDURE;
Usage
-- Enrich documents from raw-logs (the procedure reads at most 1000) and index them into enriched-logs
CALL enrich_logs('raw-logs', 'enriched-logs');
-- Compute per-service metrics over the last hour ('1 hour' is subtracted from NOW() in the query)
CALL aggregate_metrics('1 hour');
-- Migrate logs-v1 into logs-v2 with transformation, 500 documents per batch
CALL migrate_index('logs-v1', 'logs-v2', 500);
-- Print a data quality report for logs-sample; the call returns the list of issues found
CALL data_quality_check('logs-sample');
-- Report documents in logs-* older than 90 days (dry run - nothing is deleted)
CALL cleanup_old_data('logs-*', 90);