Building Connectors¶
Create custom connectors to integrate any data source with Moltler.
Connector Architecture¶
A connector consists of:
- Configuration - Connection settings and credentials
- Entities - Data types exposed (issues, users, metrics)
- Query Handler - Translates queries to API calls
- Actions - Write operations (create, update, delete)
- Sync Engine - Incremental data synchronization
Creating a Custom Connector¶
1. Define Connector Type¶
-- Declares a connector type named 'myservice'.
-- CONFIG_SCHEMA lists the settings an instance supplies:
--   api_url: required string
--   api_key: required string, flagged "secret"
CREATE CONNECTOR TYPE 'myservice'
DESCRIPTION 'Custom connector for MyService API'
CONFIG_SCHEMA {
"api_url": {"type": "string", "required": true},
"api_key": {"type": "string", "required": true, "secret": true}
};
2. Define Entities¶
CREATE CONNECTOR ENTITY 'myservice.items'
FIELDS {
"id": {"type": "string", "primary_key": true},
"name": {"type": "string"},
"status": {"type": "string"},
"created_at": {"type": "timestamp"},
"metadata": {"type": "document"}
}
QUERY_HANDLER '''
// Minimal query handler: requests `<api_url>/items` with a Bearer token.
// - config: connector configuration (api_url, api_key)
// - filters, options: accepted but not applied by this minimal version
// Returns the parsed JSON response body.
// Throws Error when the API answers with a non-OK status.
// NOTE(review): the fuller handler later in this guide unwraps `data.items`;
// confirm whether returning the raw body is acceptable here.
async function query(config, filters, options) {
  const response = await fetch(config.api_url + '/items', {
    headers: {'Authorization': 'Bearer ' + config.api_key}
  });
  // Without this check an error payload would be returned as if it were data.
  if (!response.ok) {
    throw new Error('API error: ' + response.status);
  }
  return await response.json();
}
''';
3. Implement Query Handler¶
/**
 * Query handler for the `items` entity.
 *
 * @param {object} config  - connector configuration
 * @param {object} filters - WHERE clause conditions
 * @param {object} options - LIMIT, ORDER BY, etc.
 * @returns {Promise<object[]>} rows with id, name, status, created_at, metadata
 * @throws {Error} when the API responds with a non-OK status
 */
async function query(config, filters, options) {
  const search = new URLSearchParams();

  // Forward the status filter and the row limit when they are set.
  if (filters.status) search.append('status', filters.status);
  if (options.limit) search.append('limit', options.limit);

  const requestHeaders = {
    'Authorization': `Bearer ${config.api_key}`,
    'Content-Type': 'application/json'
  };
  const response = await fetch(`${config.api_url}/items?${search}`, {
    headers: requestHeaders
  });

  if (!response.ok) {
    throw new Error(`API error: ${response.status}`);
  }

  // Reshape one API record into the entity's field layout.
  const toRow = ({id, name, status, created_at, extra}) => ({
    id,
    name,
    status,
    created_at: new Date(created_at),
    metadata: extra || {}
  });

  const payload = await response.json();
  return payload.items.map(toRow);
}
4. Implement Actions¶
CREATE CONNECTOR ACTION 'myservice.items.create'
PARAMETERS {
"name": {"type": "string", "required": true},
"status": {"type": "string", "default": "active"}
}
HANDLER '''
// Action handler: creates an item by POSTing the action parameters as JSON.
// - config: connector configuration (api_url, api_key)
// - params: action parameters, serialized as the request body
// Returns the parsed JSON response body.
// Throws Error when the API answers with a non-OK status.
async function execute(config, params) {
  const response = await fetch(config.api_url + '/items', {
    method: 'POST',
    headers: {
      'Authorization': 'Bearer ' + config.api_key,
      'Content-Type': 'application/json'
    },
    body: JSON.stringify(params)
  });
  // A rejected request must surface as an error, not as a created item.
  if (!response.ok) {
    throw new Error('API error: ' + response.status);
  }
  return await response.json();
}
''';
5. Register Connector¶
Using Your Connector¶
Create Instance¶
-- Creates an instance named my_service_prod of connector type 'myservice'.
-- api_key is given as a {{secrets.myservice_key}} placeholder rather than
-- a literal key.
CREATE CONNECTOR my_service_prod
TYPE 'myservice'
CONFIG {
"api_url": "https://api.myservice.com",
"api_key": "{{secrets.myservice_key}}"
};
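Query¶
Query the instance's entities like tables, for example: SELECT * FROM my_service_prod.items WHERE status = 'active';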
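Execute Actions¶
Run an action with CONNECTOR_EXEC, for example: CONNECTOR_EXEC my_service_prod.items.create({"name": "New item"});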
Advanced Features¶
Pagination¶
/**
 * Query handler that follows cursor-based pagination.
 *
 * Requests pages of 100 items, sending the previous response's
 * `next_cursor` as the `cursor` parameter, until no cursor is returned or
 * `options.limit` items have been collected.
 *
 * @param {object} config  - connector configuration (api_url, api_key)
 * @param {object} filters - WHERE clause conditions (not applied here)
 * @param {object} options - query options; `limit` caps the result count
 * @returns {Promise<object[]>} at most `options.limit` items, or every item
 *   when no limit is given
 * @throws {Error} when any page request returns a non-OK status
 */
async function query(config, filters, options) {
  const PAGE_SIZE = 100;
  // null and undefined both mean "no limit"; slice(0, null) would yield [].
  const limit = options.limit == null ? Infinity : options.limit;
  const allResults = [];
  let cursor = null;

  do {
    const params = new URLSearchParams();
    if (cursor) params.append('cursor', cursor);
    params.append('limit', PAGE_SIZE);

    const response = await fetch(
      `${config.api_url}/items?${params}`,
      {headers: {'Authorization': `Bearer ${config.api_key}`}}
    );
    // Fail fast: an error body has no `items`, which would otherwise end up
    // in the results as `undefined`.
    if (!response.ok) {
      throw new Error(`API error: ${response.status}`);
    }

    const data = await response.json();
    // Append in place instead of re-copying the accumulator on every page.
    for (const item of data.items) {
      allResults.push(item);
    }
    cursor = data.next_cursor;
  } while (cursor && allResults.length < limit);

  return allResults.slice(0, limit);
}
Rate Limiting¶
// Client-side rate limiter that spaces calls at least `minInterval`
// milliseconds apart.
const rateLimiter = {
  // Timestamp (ms) of the most recently granted slot; 0 before the first call.
  lastCall: 0,
  minInterval: 100, // 10 requests per second

  // Resolves once the caller may issue its request.
  async wait() {
    const now = Date.now();
    // Claim the next free slot synchronously, before awaiting, so callers
    // that arrive concurrently queue behind one another instead of all
    // sleeping for the same interval and firing together.
    const slot = Math.max(now, this.lastCall + this.minInterval);
    this.lastCall = slot;
    if (slot > now) {
      await new Promise(r => setTimeout(r, slot - now));
    }
  }
};
// Query handler skeleton: awaits rateLimiter.wait() before the API call,
// which is elided in this example.
async function query(config, filters, options) {
await rateLimiter.wait();
// ... make API call
}
Caching¶
// In-memory result cache keyed by the serialized {config, filters, options}.
const cache = new Map();
const CACHE_TTL = 60000; // 1 minute

// Removes every entry that has reached CACHE_TTL, so the map does not keep
// growing as distinct queries accumulate.
function evictExpired(now) {
  for (const [key, entry] of cache) {
    if (now - entry.time >= CACHE_TTL) {
      cache.delete(key);
    }
  }
}

/**
 * Query handler that serves repeated identical queries from the cache.
 *
 * @param {object} config  - connector configuration
 * @param {object} filters - WHERE clause conditions
 * @param {object} options - LIMIT, ORDER BY, etc.
 * @returns {Promise<*>} the cached data when an entry younger than CACHE_TTL
 *   exists, otherwise the result of fetchFromAPI (defined outside this
 *   snippet), which is then stored in the cache
 */
async function query(config, filters, options) {
  const cacheKey = JSON.stringify({config, filters, options});
  const cached = cache.get(cacheKey);
  if (cached && Date.now() - cached.time < CACHE_TTL) {
    return cached.data;
  }
  // Sweep on a miss only: an API call follows anyway, so the scan is cheap
  // by comparison and cache hits stay fast.
  evictExpired(Date.now());
  const data = await fetchFromAPI(config, filters, options);
  cache.set(cacheKey, {data, time: Date.now()});
  return data;
}
Error Handling¶
/**
 * Query handler that maps HTTP and network failures to connector errors.
 *
 * `url`, `headers` and the Connector*Error classes are not defined in this
 * snippet; they are expected to be in scope.
 *
 * @param {object} config  - connector configuration (api_url is used in the
 *   connection-error message)
 * @param {object} filters - WHERE clause conditions (unused here)
 * @param {object} options - LIMIT, ORDER BY, etc. (unused here)
 * @returns {Promise<*>} the parsed JSON body of a successful response
 * @throws {ConnectorAuthError} on HTTP 401
 * @throws {ConnectorRateLimitError} on HTTP 429
 * @throws {ConnectorError} on any other non-OK status
 * @throws {ConnectorConnectionError} when the caught error is named 'FetchError'
 */
async function query(config, filters, options) {
  // Picks the error for a failed response, or null when the response is OK.
  const failureFor = (res) => {
    switch (res.status) {
      case 401:
        return new ConnectorAuthError('Invalid API key');
      case 429: {
        const retryAfter = res.headers.get('Retry-After') || 60;
        return new ConnectorRateLimitError(
          `Rate limited. Retry after ${retryAfter}s`
        );
      }
      default:
        return res.ok
          ? null
          : new ConnectorError(`API error: ${res.status} ${res.statusText}`);
    }
  };

  try {
    const response = await fetch(url, {headers});
    const failure = failureFor(response);
    if (failure !== null) {
      throw failure;
    }
    return await response.json();
  } catch (error) {
    // Anything that is not a FetchError propagates unchanged.
    if (error.name !== 'FetchError') {
      throw error;
    }
    throw new ConnectorConnectionError(
      `Cannot connect to ${config.api_url}`
    );
  }
}
Testing Connectors¶
Unit Tests¶
-- Unit test for connector type 'myservice' run against a mocked API.
-- MOCK_API maps "GET /items" to a 200 response whose body holds one item.
TEST CONNECTOR TYPE 'myservice'
NAME 'query returns items'
MOCK_API {
"GET /items": {
"status": 200,
"body": {"items": [{"id": "1", "name": "Test"}]}
}
}
BEGIN
-- Instance configured with the mock:// URL and a dummy key
CREATE CONNECTOR test_conn TYPE 'myservice'
CONFIG {"api_url": "mock://", "api_key": "test"};
-- Query the entity and expect exactly the one mocked item
DECLARE results = SELECT * FROM test_conn.items;
ASSERT ARRAY_LENGTH(results) = 1;
ASSERT results[0].name = 'Test';
END TEST;
Integration Tests¶
-- Integration test for connector type 'myservice', tagged 'integration' and
-- 'slow'. The API URL and key are read through ENV() from
-- MYSERVICE_TEST_URL and MYSERVICE_TEST_KEY.
TEST CONNECTOR TYPE 'myservice'
NAME 'integration with real API'
TAGS ['integration', 'slow']
BEGIN
CREATE CONNECTOR test_conn TYPE 'myservice'
CONFIG {
"api_url": ENV('MYSERVICE_TEST_URL'),
"api_key": ENV('MYSERVICE_TEST_KEY')
};
-- Create an item whose name embeds the current timestamp
DECLARE created = CONNECTOR_EXEC test_conn.items.create({
"name": "Integration Test " || CURRENT_TIMESTAMP()
});
ASSERT created.id IS NOT NULL;
-- Query the item back by the id returned from create
DECLARE items = SELECT * FROM test_conn.items
WHERE id = created.id;
ASSERT ARRAY_LENGTH(items) = 1;
-- Cleanup: delete the item created above
-- NOTE(review): this guide defines only the 'myservice.items.create' action;
-- a 'delete' action is assumed to exist. Confirm.
-- NOTE(review): presumably skipped if an ASSERT above fails, which would
-- leave the test item behind. Confirm.
CONNECTOR_EXEC test_conn.items.delete(id => created.id);
END TEST;
Publishing Connectors¶
Package Connector¶
-- Packages connector type 'myservice' with its version, description,
-- author and license metadata.
PACKAGE CONNECTOR TYPE 'myservice'
VERSION '1.0.0'
DESCRIPTION 'Connector for MyService API'
AUTHOR 'Your Name'
LICENSE 'MIT';
Publish¶
Best Practices¶
- Use secrets for credentials - Never hardcode API keys
- Implement rate limiting - Respect API limits
- Cache when appropriate - Reduce API calls
- Handle errors gracefully - Provide useful error messages
- Document thoroughly - Include examples and field descriptions
- Test extensively - Cover edge cases and error conditions
- Version your connector - Use semantic versioning