Elasticsearch Functions
Native integration with Elasticsearch for querying and data manipulation.
ESQL Integration
ESQL_QUERY
Executes an ES|QL query and returns results.
DECLARE results CURSOR FOR ESQL_QUERY('
FROM logs-*
| WHERE level = "ERROR"
| STATS count = COUNT(*) BY service
| SORT count DESC
| LIMIT 10
');
FOR row IN results LOOP
    PRINT DOCUMENT_GET(row, 'service') || ': ' || DOCUMENT_GET(row, 'count');
END LOOP;
Syntax: ESQL_QUERY(query_string)
Returns: CURSOR - Iterable result set
ES|QL Reference
See the Elasticsearch ES|QL documentation for query syntax.
Using Cursors
ESQL_QUERY returns its results as a cursor, which you iterate with a FOR loop:
-- Declare cursor
DECLARE logs CURSOR FOR ESQL_QUERY('FROM logs-* | LIMIT 100');
-- Iterate results
FOR log IN logs LOOP
    DECLARE message STRING = DOCUMENT_GET(log, 'message');
    PRINT message;
END LOOP;
Document Operations
ES_GET
Retrieves a document by ID.
Syntax: ES_GET(index, document_id)
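As a minimal sketch of a lookup, the following assumes that ES_GET returns the stored document as a DOCUMENT whose fields can be read with DOCUMENT_GET. The return type is not stated above, and the index name, document ID, and `title` field are illustrative.

```elastic-script
-- Assumption: ES_GET returns a DOCUMENT holding the stored fields
DECLARE doc DOCUMENT = ES_GET('my-index', 'doc-id');
PRINT 'Title: ' || DOCUMENT_GET(doc, 'title');
```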
ES_INDEX
Indexes (creates or updates) a document.
DECLARE doc DOCUMENT = {
    "title": "My Document",
    "content": "Hello World",
    "timestamp": CURRENT_TIMESTAMP()
};
DECLARE result DOCUMENT = ES_INDEX('my-index', 'doc-id', doc);
PRINT 'Indexed: ' || DOCUMENT_GET(result, '_id');
Syntax: ES_INDEX(index, document_id, document)
ES_DELETE
Deletes a document by ID.
Syntax: ES_DELETE(index, document_id)
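A deletion might look like the sketch below, which calls ES_DELETE as a standalone statement in the same way the ES_UPDATE example in this section does. Whether ES_DELETE returns a value is not documented here, so the sketch ignores any result. The index name and document ID are illustrative.

```elastic-script
-- Remove the document with ID 'doc-id' from 'my-index' (illustrative names)
ES_DELETE('my-index', 'doc-id');
```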
ES_UPDATE
Updates specific fields in a document.
DECLARE updates DOCUMENT = {
    "status": "processed",
    "processed_at": CURRENT_TIMESTAMP()
};
ES_UPDATE('my-index', 'doc-id', updates);
Syntax: ES_UPDATE(index, document_id, partial_document)
Aggregations with ES|QL
The ES|QL STATS command computes aggregate values, optionally grouped with BY:
-- Total revenue, average order value, and order count by category
DECLARE stats CURSOR FOR ESQL_QUERY('
FROM sales
| STATS
total_revenue = SUM(amount),
avg_order = AVG(amount),
order_count = COUNT(*)
BY category
| SORT total_revenue DESC
');
FOR row IN stats LOOP
    PRINT DOCUMENT_GET(row, 'category') || ': $' || DOCUMENT_GET(row, 'total_revenue');
END LOOP;
Common Aggregation Functions
| Function | Description |
|---|---|
| `COUNT(*)` | Count documents |
| `SUM(field)` | Sum numeric values |
| `AVG(field)` | Calculate average |
| `MIN(field)` | Find minimum value |
| `MAX(field)` | Find maximum value |
| `PERCENTILE(field, n)` | Calculate percentile |
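Several of these functions can be combined in a single STATS command. The sketch below assumes a hypothetical `requests` index with a numeric `duration_ms` field and a `service` field. None of these names come from this page.

```elastic-script
-- Hypothetical index and fields: requests, duration_ms, service
DECLARE latency CURSOR FOR ESQL_QUERY('
    FROM requests
    | STATS
        fastest = MIN(duration_ms),
        slowest = MAX(duration_ms),
        p95 = PERCENTILE(duration_ms, 95)
      BY service
    | SORT p95 DESC
');
FOR row IN latency LOOP
    PRINT DOCUMENT_GET(row, 'service') || ': p95 = ' || DOCUMENT_GET(row, 'p95') || ' ms';
END LOOP;
```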
Example: Log Analysis Pipeline
CREATE PROCEDURE analyze_errors(time_range STRING)
BEGIN
    -- Get error counts by service.
    -- time_range is concatenated into the query text, so pass only a trusted
    -- ES|QL time span such as '1 hour'.
    DECLARE errors CURSOR FOR ESQL_QUERY('
        FROM logs-*
        | WHERE level = "ERROR" AND @timestamp > NOW() - ' || time_range || '
        | STATS error_count = COUNT(*) BY service.name
        | SORT error_count DESC
        | LIMIT 20
    ');
    PRINT '=== Error Summary ===';
    -- Print one line per service, highest error count first
    FOR row IN errors LOOP
        DECLARE service STRING = DOCUMENT_GET(row, 'service.name');
        DECLARE count NUMBER = DOCUMENT_GET(row, 'error_count');
        PRINT service || ': ' || count || ' errors';
    END LOOP;
END PROCEDURE;
-- Usage
CALL analyze_errors('1 hour');
Example: Data Enrichment
CREATE PROCEDURE enrich_and_index(source_index STRING, target_index STRING)
BEGIN
    -- Read up to 1000 documents from the source index
    DECLARE docs CURSOR FOR ESQL_QUERY('FROM ' || source_index || ' | LIMIT 1000');
    DECLARE processed NUMBER = 0;
    FOR doc IN docs LOOP
        -- Enrich document with processing metadata
        DECLARE enriched DOCUMENT = DOCUMENT_MERGE(doc, {
            "processed_at": CURRENT_TIMESTAMP(),
            "processed_by": "elastic-script",
            "message_length": LENGTH(DOCUMENT_GET(doc, 'message'))
        });
        -- Index to new location (no document ID is supplied)
        ES_INDEX(target_index, NULL, enriched);
        SET processed = processed + 1;
    END LOOP;
    PRINT 'Processed ' || processed || ' documents';
    RETURN processed;
END PROCEDURE;
Best Practices
Query Performance
- Use specific index patterns instead of wildcards when possible
- Add `LIMIT` to prevent processing excessive data
- Use `WHERE` clauses to filter early in the query
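Applied together, these guidelines might look like the sketch below. The index name `logs-app` is hypothetical, and the `level` and `message` fields are assumed to match the log examples earlier on this page.

```elastic-script
-- Specific index instead of logs-*, filter first, then cap the row count
DECLARE recent_errors CURSOR FOR ESQL_QUERY('
    FROM logs-app
    | WHERE level = "ERROR"
    | LIMIT 100
');
FOR row IN recent_errors LOOP
    PRINT DOCUMENT_GET(row, 'message');
END LOOP;
```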
Rate Limiting
When processing large datasets, consider adding delays or batch processing to avoid overwhelming the cluster.