Plugin Cookbook
Common real-world patterns and recipes for Atlas plugins — caching, error handling, hooks, credentials, and advanced techniques.
The Authoring Guide covers how to build a plugin; this page covers how to handle the messy parts. For an overview of all official plugins, see the Plugin Directory.
Caching Strategies
In-Memory Cache with TTL
Context plugins call load() on every agent invocation. Hitting an external API or database each time is wasteful. Cache the result with a TTL:
import { definePlugin } from "@useatlas/plugin-sdk";
export default definePlugin({
id: "cached-context",
types: ["context"],
version: "1.0.0",
contextProvider: {
_cache: null as string | null,
_cacheExpiry: 0,
_ttlMs: 5 * 60 * 1000, // 5 minutes
async load() {
if (this._cache && Date.now() < this._cacheExpiry) {
return this._cache;
}
// Fetch from your source — API, database, filesystem, etc.
const response = await fetch("https://internal.example.com/glossary");
const terms = (await response.json()) as { term: string; definition: string }[];
const markdown = terms.map((t) => `- **${t.term}**: ${t.definition}`).join("\n");
this._cache = `## Company Glossary\n\n${markdown}`;
this._cacheExpiry = Date.now() + this._ttlMs;
return this._cache;
},
async refresh() {
this._cache = null;
this._cacheExpiry = 0;
},
},
});
Key points:
- refresh() is called when the semantic layer reloads or via the admin UI. It clears the cache so the next load() fetches fresh data.
- Keep the TTL short enough to pick up changes, long enough to avoid hammering your source. 5 minutes is a good default.
- For size-bounded caches (e.g. many keys), use a Map with LRU eviction instead of a single string.
LRU Cache for Multiple Keys
When a context plugin serves different content based on runtime conditions (e.g. per-datasource context), use a bounded map:
import { definePlugin } from "@useatlas/plugin-sdk";
const MAX_CACHE_ENTRIES = 50;
function lruSet(cache: Map<string, string>, key: string, value: string) {
if (cache.size >= MAX_CACHE_ENTRIES) {
// Map iteration order is insertion order, so the first key is the oldest inserted.
// Note: this is FIFO eviction; a true LRU would also delete-and-reset a key on each read.
const oldest = cache.keys().next().value!;
cache.delete(oldest);
}
cache.set(key, value);
}
export default definePlugin({
id: "multi-source-context",
types: ["context"],
version: "1.0.0",
contextProvider: {
_cache: new Map<string, string>(),
async load() {
// In practice, the key might come from a config value or environment
const key = "default";
const cached = this._cache.get(key);
if (cached) return cached;
const result = await fetchContextForSource(key);
lruSet(this._cache, key, result);
return result;
},
async refresh() {
this._cache.clear();
},
},
});
async function fetchContextForSource(source: string): Promise<string> {
// Your fetching logic here
return `Context for ${source}`;
}
Error Handling
Fatal vs Degraded: initialize() vs healthCheck()
The plugin lifecycle has two distinct error surfaces. Getting them right is the difference between a plugin that blocks startup when misconfigured and one that gracefully degrades when an external service is down.
Throw from initialize() when the plugin cannot possibly work — missing credentials, invalid config, unreachable required service. This blocks server startup, which is what you want: fail fast before serving traffic.
import { definePlugin } from "@useatlas/plugin-sdk";
export default definePlugin({
id: "strict-context",
types: ["context"],
version: "1.0.0",
contextProvider: {
async load() { return ""; },
},
async initialize(ctx) {
// Fatal: the API key is structurally required
const apiKey = ctx.config["MY_API_KEY"] as string | undefined;
if (!apiKey) {
throw new Error("MY_API_KEY is required — set it in atlas.config.ts");
}
// Fatal: verify the service is reachable at boot
const response = await fetch("https://api.example.com/health", {
headers: { Authorization: `Bearer ${apiKey}` },
signal: AbortSignal.timeout(10_000),
});
if (!response.ok) {
throw new Error(`API health check failed: ${response.status}`);
}
ctx.logger.info("Plugin initialized, API reachable");
},
});
Return { healthy: false } from healthCheck() for runtime degradation — the plugin initialized fine but the external service went down, latency spiked, or a transient error occurred. Atlas keeps running; the health endpoint reports the degradation.
import type { PluginHealthResult } from "@useatlas/plugin-sdk";
async healthCheck(): Promise<PluginHealthResult> {
const start = performance.now();
try {
const response = await fetch("https://api.example.com/health", {
signal: AbortSignal.timeout(5_000),
});
const latencyMs = Math.round(performance.now() - start);
if (!response.ok) {
return { healthy: false, message: `API returned ${response.status}`, latencyMs };
}
return { healthy: true, latencyMs };
} catch (err) {
return {
healthy: false,
message: err instanceof Error ? err.message : String(err),
latencyMs: Math.round(performance.now() - start),
};
}
}
Never throw from healthCheck() or teardown(). These methods must always return a result. Throwing from health checks crashes the health probe; throwing from teardown prevents other plugins from cleaning up.
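If several plugins share probe logic, the never-throw rule can be enforced once with a wrapper. A sketch, with PluginHealthResult re-declared locally so the snippet stands alone (the real type comes from @useatlas/plugin-sdk):

```typescript
interface PluginHealthResult {
  healthy: boolean;
  message?: string;
  latencyMs?: number;
}

// Convert any thrown error into a { healthy: false } result so the
// health probe itself can never crash.
function safeHealthCheck(
  probe: () => Promise<PluginHealthResult>,
): () => Promise<PluginHealthResult> {
  return async () => {
    const start = performance.now();
    try {
      return await probe();
    } catch (err) {
      return {
        healthy: false,
        message: err instanceof Error ? err.message : String(err),
        latencyMs: Math.round(performance.now() - start),
      };
    }
  };
}
```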
Retry with Backoff
For datasource plugins that connect to flaky services, add retry logic to the connection factory:
import type { PluginDBConnection, PluginQueryResult } from "@useatlas/plugin-sdk";
function withRetry(
conn: PluginDBConnection,
maxRetries = 3,
baseDelayMs = 500,
): PluginDBConnection {
return {
async query(sql: string, timeoutMs?: number): Promise<PluginQueryResult> {
let lastError: unknown;
for (let attempt = 0; attempt <= maxRetries; attempt++) {
try {
return await conn.query(sql, timeoutMs);
} catch (err) {
lastError = err;
if (attempt < maxRetries) {
const delay = baseDelayMs * 2 ** attempt;
await new Promise((resolve) => setTimeout(resolve, delay));
}
}
}
throw lastError;
},
close: () => conn.close(),
};
}
Use it in your connection factory:
connection: {
create: () => withRetry(createMyConnection(config), 3, 500),
dbType: "postgres",
},
Credential Management
Config-Driven Credentials
Always pass credentials through plugin config, not by reading process.env inside the plugin. This makes dependencies explicit and testable:
// atlas.config.ts — credentials are explicit, visible in one place
import { defineConfig } from "@atlas/api/lib/config";
import { myPlugin } from "./plugins/my-plugin";
export default defineConfig({
plugins: [
myPlugin({
apiKey: process.env.MY_API_KEY!,
apiSecret: process.env.MY_API_SECRET!,
}),
],
});
Never commit credentials to version control. Use environment variables in atlas.config.ts and add .env to .gitignore.
Multi-Credential Plugins (OAuth Refresh)
Some plugins need to manage rotating credentials — e.g. OAuth tokens with refresh flows. Keep the token state internal and refresh transparently:
import { z } from "zod";
import { createPlugin } from "@useatlas/plugin-sdk";
import type { AtlasContextPlugin, PluginHealthResult } from "@useatlas/plugin-sdk";
const configSchema = z.object({
clientId: z.string().min(1),
clientSecret: z.string().min(1),
refreshToken: z.string().min(1),
tokenUrl: z.string().url(),
});
type OAuthConfig = z.infer<typeof configSchema>;
interface TokenState {
accessToken: string;
expiresAt: number;
}
async function refreshAccessToken(config: OAuthConfig): Promise<TokenState> {
const response = await fetch(config.tokenUrl, {
method: "POST",
headers: { "Content-Type": "application/x-www-form-urlencoded" },
body: new URLSearchParams({
grant_type: "refresh_token",
client_id: config.clientId,
client_secret: config.clientSecret,
refresh_token: config.refreshToken,
}),
});
if (!response.ok) {
throw new Error(`Token refresh failed: ${response.status}`);
}
const data = (await response.json()) as { access_token: string; expires_in: number };
return {
accessToken: data.access_token,
expiresAt: Date.now() + data.expires_in * 1000 - 60_000, // 1 min buffer
};
}
export const oauthContextPlugin = createPlugin<OAuthConfig, AtlasContextPlugin<OAuthConfig>>({
configSchema,
create(config) {
let tokenState: TokenState | null = null;
async function getToken(): Promise<string> {
if (!tokenState || Date.now() >= tokenState.expiresAt) {
tokenState = await refreshAccessToken(config);
}
return tokenState.accessToken;
}
return {
id: "oauth-context",
types: ["context"] as const,
version: "1.0.0",
config,
contextProvider: {
async load() {
const token = await getToken();
const response = await fetch("https://api.example.com/context", {
headers: { Authorization: `Bearer ${token}` },
});
return response.text();
},
async refresh() {
// Force token refresh on next load
tokenState = null;
},
},
async initialize(ctx) {
// Verify credentials work at boot
try {
tokenState = await refreshAccessToken(config);
ctx.logger.info("OAuth credentials verified");
} catch (err) {
throw new Error(
`OAuth initialization failed: ${err instanceof Error ? err.message : err}`,
);
}
},
async healthCheck(): Promise<PluginHealthResult> {
try {
await getToken();
return { healthy: true };
} catch (err) {
return {
healthy: false,
message: err instanceof Error ? err.message : String(err),
};
}
},
};
},
});
Credential Rotation Without Restart
For credentials that rotate externally (e.g. vault-managed secrets), read the current value on each use rather than capturing it at startup:
import { definePlugin } from "@useatlas/plugin-sdk";
export default definePlugin({
id: "vault-context",
types: ["context"],
version: "1.0.0",
contextProvider: {
async load() {
// Read the current secret value each time — supports external rotation
const apiKey = process.env.VAULT_MANAGED_API_KEY;
if (!apiKey) return "";
const response = await fetch("https://api.example.com/data", {
headers: { Authorization: `Bearer ${apiKey}` },
});
return response.text();
},
},
async initialize(ctx) {
// Verify the env var exists at boot, but don't capture the value
if (!process.env.VAULT_MANAGED_API_KEY) {
throw new Error("VAULT_MANAGED_API_KEY must be set");
}
ctx.logger.info("Vault-managed credential detected");
},
});
This is the one exception to the "config-driven credentials" rule. When an external secret manager rotates the value, reading process.env at call time ensures you always use the latest credential. Document this pattern clearly so operators know the env var must be set.
Hook Recipes
Hooks intercept agent lifecycle events. Define them on any plugin type via the hooks property. Each hook entry has an optional matcher (return true to run) and a handler.
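The matcher/handler contract can be sketched as a standalone dispatcher. This illustrates the calling semantics only; it is not Atlas's actual implementation:

```typescript
interface HookEntry<C> {
  matcher?: (ctx: C) => boolean;
  handler: (ctx: C) => void | Promise<void>;
}

// Run each entry whose matcher returns true (or which has no matcher).
async function runHooks<C>(entries: HookEntry<C>[], ctx: C): Promise<void> {
  for (const entry of entries) {
    if (entry.matcher && !entry.matcher(ctx)) continue;
    await entry.handler(ctx);
  }
}
```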
Audit Logging via afterQuery
Log every query with its results and duration:
import { definePlugin } from "@useatlas/plugin-sdk";
import type { AtlasPluginContext } from "@useatlas/plugin-sdk";
export default definePlugin({
id: "audit-logger",
types: ["context"],
version: "1.0.0",
contextProvider: {
async load() { return ""; },
},
hooks: {
afterQuery: [{
handler: (ctx) => {
// ctx: { sql, connectionId?, result, durationMs }
console.log(JSON.stringify({
event: "query_executed",
sql: ctx.sql,
connectionId: ctx.connectionId,
rowCount: ctx.result.rows.length,
durationMs: ctx.durationMs,
timestamp: new Date().toISOString(),
}));
},
}],
},
});
For persistent audit logging, write to the internal database:
import { definePlugin } from "@useatlas/plugin-sdk";
import type { AtlasPluginContext } from "@useatlas/plugin-sdk";
let db: AtlasPluginContext["db"] = null;
export default definePlugin({
id: "db-audit-logger",
types: ["context"],
version: "1.0.0",
contextProvider: {
async load() { return ""; },
},
schema: {
plugin_query_audit: {
fields: {
sql: { type: "string", required: true },
connection_id: { type: "string" },
row_count: { type: "number", required: true },
duration_ms: { type: "number", required: true },
executed_at: { type: "date", required: true },
},
},
},
async initialize(ctx) {
db = ctx.db;
if (!db) {
ctx.logger.warn("No internal DB — audit logs will be skipped");
}
},
hooks: {
afterQuery: [{
handler: async (ctx) => {
if (!db) return;
await db.execute(
`INSERT INTO plugin_query_audit (sql, connection_id, row_count, duration_ms, executed_at)
VALUES ($1, $2, $3, $4, NOW())`,
[ctx.sql, ctx.connectionId ?? null, ctx.result.rows.length, ctx.durationMs],
);
},
}],
},
});
Query Rewriting via beforeQuery (Tenant Isolation)
Inject a WHERE clause to scope queries to the current tenant. beforeQuery handlers can return { sql } to rewrite the query or throw to reject it:
import { definePlugin } from "@useatlas/plugin-sdk";
const TENANT_ID = process.env.ATLAS_TENANT_ID;
export default definePlugin({
id: "tenant-filter",
types: ["context"],
version: "1.0.0",
contextProvider: {
async load() { return ""; },
},
async initialize(ctx) {
if (!TENANT_ID) {
throw new Error("ATLAS_TENANT_ID is required for tenant isolation");
}
ctx.logger.info(`Tenant isolation active for tenant: ${TENANT_ID}`);
},
hooks: {
beforeQuery: [{
handler: (ctx) => {
// Simple approach: wrap the original query in a CTE with a filter.
// WARNING: This uses string interpolation for brevity. In production,
// use a SQL AST rewriter or parameterized approach to avoid injection risks.
const wrapped = `WITH _original AS (${ctx.sql}) SELECT * FROM _original WHERE tenant_id = '${TENANT_ID}'`;
return { sql: wrapped };
},
}],
},
});
The CTE-wrapping approach shown above is a simplified example. For production tenant isolation, use Atlas's built-in RLS support (ATLAS_RLS_ENABLED, ATLAS_RLS_COLUMN, ATLAS_RLS_CLAIM) which injects WHERE clauses at the validation layer — after all plugin hooks, so plugins cannot strip the filter.
Rate Limiting
Track query counts per time window and reject queries that exceed the limit:
import { definePlugin } from "@useatlas/plugin-sdk";
const WINDOW_MS = 60_000; // 1 minute
const MAX_QUERIES = 100;
const queryLog: number[] = [];
export default definePlugin({
id: "rate-limiter",
types: ["context"],
version: "1.0.0",
contextProvider: {
async load() { return ""; },
},
hooks: {
beforeQuery: [{
handler: () => {
const now = Date.now();
// Remove entries outside the window
while (queryLog.length > 0 && queryLog[0]! < now - WINDOW_MS) {
queryLog.shift();
}
if (queryLog.length >= MAX_QUERIES) {
throw new Error(`Rate limit exceeded: ${MAX_QUERIES} queries per minute`);
}
queryLog.push(now);
},
}],
},
});
Data Masking via beforeQuery
Mask sensitive columns by rewriting the SQL to replace them with redacted values at the database level:
import { definePlugin } from "@useatlas/plugin-sdk";
const MASKED_COLUMNS = new Set(["email", "phone", "ssn", "credit_card"]);
export default definePlugin({
id: "data-masker",
types: ["context"],
version: "1.0.0",
contextProvider: {
async load() {
return [
"## Data Masking",
"The following columns are redacted in query results: email, phone, ssn, credit_card.",
"Do not SELECT these columns directly — they will appear as '***REDACTED***'.",
].join("\n");
},
},
hooks: {
beforeQuery: [{
handler: (ctx) => {
// Replace masked column references with redacted literals.
// This is a simplified regex approach — for production use,
// consider a SQL AST rewriter for reliable column detection.
let rewritten = ctx.sql;
for (const col of MASKED_COLUMNS) {
const pattern = new RegExp(`\\b${col}\\b`, "gi");
rewritten = rewritten.replace(pattern, `'***REDACTED***' AS ${col}`);
}
if (rewritten !== ctx.sql) {
return { sql: rewritten };
}
},
}],
},
});
This uses beforeQuery (a mutable hook) to rewrite SQL before execution, rather than mutating results after the fact. The regex approach is simplified — for production, use a SQL AST parser to reliably detect column references vs string literals.
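To see what the rewrite actually produces, the masking step can be extracted as a pure function (the same logic as the handler above, with the column set inlined):

```typescript
const MASKED = new Set(["email", "phone", "ssn", "credit_card"]);

// Replace bare references to masked columns with a redacted literal.
// Simplified: it will also match the column name inside string literals.
function maskColumns(sql: string): string {
  let rewritten = sql;
  for (const col of MASKED) {
    const pattern = new RegExp(`\\b${col}\\b`, "gi");
    rewritten = rewritten.replace(pattern, `'***REDACTED***' AS ${col}`);
  }
  return rewritten;
}
```

For example, `maskColumns("SELECT email FROM users")` returns `SELECT '***REDACTED***' AS email FROM users`, while a query touching no masked columns comes back unchanged.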
Request Observation via onRequest
Log or monitor incoming HTTP requests. Note that onRequest hooks are observation-only — they cannot block or modify requests:
import { definePlugin } from "@useatlas/plugin-sdk";
export default definePlugin({
id: "custom-auth-check",
types: ["context"],
version: "1.0.0",
contextProvider: {
async load() { return ""; },
},
hooks: {
onRequest: [{
matcher: (ctx) => ctx.path.startsWith("/api/"),
handler: (ctx) => {
const apiKey = ctx.headers["x-custom-api-key"];
if (!apiKey) {
// onRequest hooks are observation-only — log the event
console.warn(`Missing X-Custom-API-Key on ${ctx.method} ${ctx.path}`);
}
},
}],
},
});
Compliance Gate via beforeToolCall
Block or modify tool calls based on business rules. beforeToolCall fires before every tool execution in the agent loop — it receives the tool name, args, and request context. Return { args } to rewrite, throw to reject, or return void to pass through:
import { definePlugin } from "@useatlas/plugin-sdk";
const RESTRICTED_TABLES = new Set(["salary", "ssn_records", "credit_cards"]);
export default definePlugin({
id: "compliance-gate",
types: ["context"],
version: "1.0.0",
contextProvider: {
async load() { return ""; },
},
hooks: {
beforeToolCall: [{
// Only intercept SQL execution, not explore commands
matcher: (ctx) => ctx.toolName === "executeSQL",
handler: (ctx) => {
const sql = (ctx.args.sql as string) ?? "";
for (const table of RESTRICTED_TABLES) {
// Substring matching is deliberately broad (it also catches e.g. "salary_bands");
// use a word-boundary regex or a SQL parser if false positives matter.
if (sql.toLowerCase().includes(table)) {
throw new Error(
`Access to ${table} is restricted by compliance policy`,
);
}
}
},
}],
},
});
Cost Tracking via afterToolCall
Observe every tool call to log usage metrics. afterToolCall fires after execution with the result. Return { result } to modify, or return void to observe:
import { definePlugin } from "@useatlas/plugin-sdk";
export default definePlugin({
id: "tool-usage-tracker",
types: ["context"],
version: "1.0.0",
contextProvider: {
async load() { return ""; },
},
hooks: {
afterToolCall: [{
handler: (ctx) => {
console.log(JSON.stringify({
event: "tool_call",
tool: ctx.toolName,
userId: ctx.context.userId,
conversationId: ctx.context.conversationId,
stepCount: ctx.context.toolCallCount,
timestamp: new Date().toISOString(),
}));
},
}],
},
});
Tool Call Rate Limiting via beforeToolCall
Limit the number of tool calls per agent run to control costs:
import { definePlugin } from "@useatlas/plugin-sdk";
const MAX_TOOL_CALLS = 15;
export default definePlugin({
id: "tool-rate-limiter",
types: ["context"],
version: "1.0.0",
contextProvider: {
async load() { return ""; },
},
hooks: {
beforeToolCall: [{
handler: (ctx) => {
if (ctx.context.toolCallCount > MAX_TOOL_CALLS) {
throw new Error(
`Tool call limit exceeded (${MAX_TOOL_CALLS}). ` +
"Please refine your question to require fewer queries.",
);
}
},
}],
},
});
Custom Query Validation
Replacing the SQL Validation Pipeline
Non-SQL datasources (Salesforce SOQL, GraphQL, MongoDB MQL) need their own validation instead of the standard SQL validation pipeline. Use connection.validate to replace it entirely:
import { createPlugin } from "@useatlas/plugin-sdk";
import type { QueryValidationResult } from "@useatlas/plugin-sdk";
import { z } from "zod";
const SOQL_FORBIDDEN = [/\b(DELETE|INSERT|UPDATE|UPSERT|UNDELETE|MERGE)\b/i];
function validateSOQL(query: string): QueryValidationResult {
const trimmed = query.trim();
if (!trimmed) return { valid: false, reason: "Empty query" };
// Must start with SELECT
if (!/^\s*SELECT\b/i.test(trimmed)) {
return { valid: false, reason: "Only SELECT queries are allowed in SOQL" };
}
// Check for forbidden DML keywords
for (const pattern of SOQL_FORBIDDEN) {
if (pattern.test(trimmed)) {
return { valid: false, reason: `Forbidden operation: ${pattern.source}` };
}
}
return { valid: true };
}
export const soqlPlugin = createPlugin({
configSchema: z.object({
instanceUrl: z.string().url(),
accessToken: z.string().min(1),
}),
create: (config) => ({
id: "soql-datasource",
types: ["datasource"] as const,
version: "1.0.0",
config,
connection: {
create: () => createSOQLConnection(config),
dbType: "salesforce",
validate: validateSOQL,
},
dialect: [
"This datasource uses SOQL (Salesforce Object Query Language).",
"- Use relationship queries instead of JOINs.",
"- No SELECT * — always list specific fields.",
].join("\n"),
}),
});
function createSOQLConnection(config: { instanceUrl: string; accessToken: string }) {
// Your connection factory here
throw new Error("Not implemented — replace with your driver");
}
Key points:
- validate completely replaces the standard 4-layer SQL validation (empty check, regex guard, AST parse, table whitelist). It is your responsibility to enforce safety.
- reason is user-facing — it appears in error responses shown to the agent and in audit logs.
- Auto-LIMIT is skipped for custom-validated connections since non-SQL languages may not support LIMIT.
- RLS injection is skipped for custom-validated connections since the SQL rewriter can't parse non-SQL queries.
- Hooks still fire — beforeQuery can rewrite the query and the rewritten query is re-validated through your custom validator.
- When a custom validate function is provided, parserDialect and forbiddenPatterns are ignored.
Async Validation
Validators can be asynchronous — useful when validation requires an external call (e.g. checking a schema service or permission system):
connection: {
create: () => myConn,
dbType: "custom-api",
validate: async (query) => {
const response = await fetch("https://schema.internal/validate", {
method: "POST",
body: JSON.stringify({ query }),
headers: { "Content-Type": "application/json" },
signal: AbortSignal.timeout(5000),
});
if (!response.ok) {
return { valid: false, reason: "Schema service unavailable" };
}
const result = await response.json() as { allowed: boolean; message?: string };
return result.allowed
? { valid: true }
: { valid: false, reason: result.message ?? "Query rejected by schema service" };
},
},
Async validators add latency to every query. Prefer synchronous validation when possible. If you must call an external service, add a timeout and consider caching the schema locally.
Complete Plugin with SOQL Length-Limit Validator
A full datasource plugin that enforces Salesforce SOQL limits: query length cap, SELECT-only, and forbidden DML keywords:
import { z } from "zod";
import { createPlugin } from "@useatlas/plugin-sdk";
import type {
AtlasDatasourcePlugin,
PluginDBConnection,
PluginQueryResult,
QueryValidationResult,
} from "@useatlas/plugin-sdk";
const SOQL_MAX_LENGTH = 20_000; // Salesforce SOQL character limit
const SOQL_FORBIDDEN = [/\b(DELETE|INSERT|UPDATE|UPSERT|UNDELETE|MERGE)\b/i];
function validateSOQL(query: string): QueryValidationResult {
const trimmed = query.trim();
if (!trimmed) return { valid: false, reason: "Empty query" };
if (trimmed.length > SOQL_MAX_LENGTH) {
return {
valid: false,
reason: `SOQL query exceeds ${SOQL_MAX_LENGTH} character limit (${trimmed.length} chars)`,
};
}
if (!/^\s*SELECT\b/i.test(trimmed)) {
return { valid: false, reason: "Only SELECT queries are allowed in SOQL" };
}
for (const pattern of SOQL_FORBIDDEN) {
if (pattern.test(trimmed)) {
return { valid: false, reason: `Forbidden SOQL operation: ${pattern.source}` };
}
}
return { valid: true };
}
const configSchema = z.object({
instanceUrl: z.string().url(),
accessToken: z.string().min(1),
});
type SalesforceConfig = z.infer<typeof configSchema>;
function createSOQLConnection(config: SalesforceConfig): PluginDBConnection {
return {
async query(soql: string, timeoutMs?: number): Promise<PluginQueryResult> {
const response = await fetch(
`${config.instanceUrl}/services/data/v59.0/query?q=${encodeURIComponent(soql)}`,
{
headers: { Authorization: `Bearer ${config.accessToken}` },
signal: timeoutMs ? AbortSignal.timeout(timeoutMs) : undefined,
},
);
if (!response.ok) throw new Error(`SOQL query failed: ${response.status}`);
const data = (await response.json()) as { records: Record<string, unknown>[] };
const rows = data.records;
const columns = rows.length > 0 ? Object.keys(rows[0]!).filter((k) => k !== "attributes") : [];
return { columns, rows };
},
async close() {},
};
}
export const salesforcePlugin = createPlugin<
SalesforceConfig,
AtlasDatasourcePlugin<SalesforceConfig>
>({
configSchema,
create: (config) => ({
id: "salesforce-soql",
types: ["datasource"] as const,
version: "1.0.0",
config,
connection: {
create: () => createSOQLConnection(config),
dbType: "salesforce",
validate: validateSOQL,
},
dialect: [
"This datasource uses SOQL (Salesforce Object Query Language).",
"- Use relationship queries instead of JOINs (e.g. Account.Name).",
"- No SELECT * — always list specific fields.",
"- Maximum query length: 20,000 characters.",
].join("\n"),
}),
});
Register in atlas.config.ts:
plugins: [
salesforcePlugin({
instanceUrl: process.env.SF_INSTANCE_URL!,
accessToken: process.env.SF_ACCESS_TOKEN!,
}),
],
Key points:
- validate completely replaces the standard 4-layer SQL validation — it is your responsibility to enforce safety
- reason is user-facing — it appears in error responses shown to the agent and in audit logs
- Auto-LIMIT and RLS are skipped for custom-validated connections since non-SQL languages may not support them
- Hooks still fire — queries rewritten by beforeQuery hooks are re-validated through validateSOQL
Advanced Patterns
Registering Custom Tools
Plugins can add tools to the agent via ctx.tools.register() in initialize(). The tool becomes available to the agent alongside the built-in explore and executeSQL tools:
import { z } from "zod";
import { definePlugin } from "@useatlas/plugin-sdk";
import { tool } from "@useatlas/plugin-sdk/ai";
export default definePlugin({
id: "inventory-lookup",
types: ["context"],
version: "1.0.0",
contextProvider: {
async load() {
return "## Inventory Tool\nUse `lookupInventory` to check stock levels by SKU.";
},
},
async initialize(ctx) {
ctx.tools.register({
name: "lookupInventory",
description: "Check current inventory levels for a product SKU",
tool: tool({
description: "Look up current inventory by SKU",
inputSchema: z.object({
sku: z.string().describe("Product SKU code"),
}),
execute: async ({ sku }) => {
const response = await fetch(
`https://inventory.internal/api/stock/${encodeURIComponent(sku)}`,
);
if (!response.ok) return { error: `SKU not found: ${sku}` };
return response.json();
},
}),
});
ctx.logger.info("Inventory lookup tool registered");
},
});
The context plugin's load() returns prompt guidance so the agent knows when and how to use the tool. The tool itself is registered via the tool registry during initialization.
Using the Internal Database
Plugins can read and write to the Atlas internal database (DATABASE_URL) via ctx.db. This is useful for persistent state — settings, caches, plugin-specific data:
import { definePlugin } from "@useatlas/plugin-sdk";
import type { AtlasPluginContext, PluginHealthResult } from "@useatlas/plugin-sdk";
let db: AtlasPluginContext["db"] = null;
export default definePlugin({
id: "query-cache",
types: ["context"],
version: "1.0.0",
schema: {
plugin_query_cache: {
fields: {
query_hash: { type: "string", required: true, unique: true },
result_json: { type: "string", required: true },
cached_at: { type: "date", required: true },
},
},
},
contextProvider: {
async load() { return ""; },
},
async initialize(ctx) {
db = ctx.db;
if (!db) {
ctx.logger.warn("No internal DB — query cache disabled");
return;
}
ctx.logger.info("Query cache plugin initialized");
},
hooks: {
beforeQuery: [{
handler: async (ctx) => {
if (!db) return;
// Simple hash — Bun-specific. For portability, use:
// crypto.createHash("sha256").update(ctx.sql).digest("hex")
const hash = Bun.hash(ctx.sql).toString(36);
const result = await db.query(
"SELECT result_json FROM plugin_query_cache WHERE query_hash = $1",
[hash],
);
if (result.rows.length > 0) {
// Cache hit — could be used for analytics, but can't short-circuit the query
// (beforeQuery can only rewrite or reject, not return cached results)
}
},
}],
afterQuery: [{
handler: async (ctx) => {
if (!db) return;
const hash = Bun.hash(ctx.sql).toString(36); // Bun-specific (see note above)
await db.execute(
`INSERT INTO plugin_query_cache (query_hash, result_json, cached_at)
VALUES ($1, $2, NOW())
ON CONFLICT (query_hash) DO UPDATE SET result_json = $2, cached_at = NOW()`,
[hash, JSON.stringify(ctx.result)],
);
},
}],
},
});
ctx.db is null when DATABASE_URL is not set. Always check for null before using it — Atlas works without an internal database.
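The Bun-specific Bun.hash call in the example can be replaced with Node's built-in crypto module, which runs on any runtime that supports the node: prefix. A minimal sketch:

```typescript
import { createHash } from "node:crypto";

// Portable replacement for Bun.hash(sql): a stable sha256 hex digest,
// suitable as the query_hash key in the cache table above.
function hashQuery(sql: string): string {
  return createHash("sha256").update(sql).digest("hex");
}
```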
Dynamic Entity Factories
Datasource plugins can discover entities at boot time instead of hardcoding them. Use an async factory function for entities:
import { z } from "zod";
import { createPlugin } from "@useatlas/plugin-sdk";
import type { AtlasDatasourcePlugin, PluginEntity } from "@useatlas/plugin-sdk";
const configSchema = z.object({
url: z.string().url(),
schema: z.string().default("public"),
});
type DynamicConfig = z.infer<typeof configSchema>;
export const dynamicPlugin = createPlugin<DynamicConfig, AtlasDatasourcePlugin<DynamicConfig>>({
configSchema,
create(config) {
return {
id: "dynamic-datasource",
types: ["datasource"] as const,
version: "1.0.0",
config,
connection: {
create: () => createConnectionFromUrl(config.url),
dbType: "postgres",
},
// Async factory — called once at boot, entities merged into the whitelist
entities: async (): Promise<PluginEntity[]> => {
const conn = createConnectionFromUrl(config.url);
try {
// Escape the schema name to prevent injection.
// PluginDBConnection.query() doesn't support parameterized queries,
// so validate the input or use your driver's parameterized API directly.
const safeSchema = config.schema.replace(/'/g, "''");
const result = await conn.query(
`SELECT table_name, obj_description((quote_ident(table_schema) || '.' || quote_ident(table_name))::regclass)
AS description
FROM information_schema.tables
WHERE table_schema = '${safeSchema}' AND table_type = 'BASE TABLE'`,
);
return result.rows.map((row) => ({
name: row.table_name as string,
yaml: [
`table: ${row.table_name}`,
`description: ${(row.description as string) || "Auto-discovered table"}`,
"dimensions: {}",
].join("\n"),
}));
} finally {
await conn.close();
}
},
};
},
});
function createConnectionFromUrl(url: string) {
// Your connection factory here
throw new Error("Not implemented — replace with your driver");
}
Multi-Type Plugins
A single plugin can implement multiple types. For example, a plugin that provides both a datasource connection and context guidance:
import { z } from "zod";
import { createPlugin } from "@useatlas/plugin-sdk";
import type {
AtlasDatasourcePlugin,
AtlasContextPlugin,
PluginHealthResult,
} from "@useatlas/plugin-sdk";
const configSchema = z.object({
url: z.string().url(),
dialect: z.string().default("Use APPROX_COUNT_DISTINCT for cardinality estimates."),
});
type Config = z.infer<typeof configSchema>;
// Intersection type for multi-type plugins
type DatasourceAndContext = AtlasDatasourcePlugin<Config> & AtlasContextPlugin<Config>;
export const multiPlugin = createPlugin<Config, DatasourceAndContext>({
configSchema,
create(config) {
return {
id: "multi-type-example",
types: ["datasource", "context"] as const,
version: "1.0.0",
config,
// Datasource facet
connection: {
create: () => createMyConnection(config.url),
dbType: "postgres",
},
dialect: config.dialect,
// Context facet
contextProvider: {
async load() {
return `## Dialect Notes\n\n${config.dialect}`;
},
},
async healthCheck(): Promise<PluginHealthResult> {
try {
const conn = createMyConnection(config.url);
await conn.query("SELECT 1", 5000);
await conn.close();
return { healthy: true };
} catch (err) {
return {
healthy: false,
message: err instanceof Error ? err.message : String(err),
};
}
},
};
},
});
function createMyConnection(url: string) {
throw new Error("Not implemented — replace with your driver");
}
Multi-Tenant Plugins
Serve different configurations per tenant by keying on a runtime identifier (e.g. an environment variable or request header):
import { z } from "zod";
import { createPlugin } from "@useatlas/plugin-sdk";
import type { AtlasDatasourcePlugin, PluginDBConnection } from "@useatlas/plugin-sdk";
const configSchema = z.object({
tenants: z.record(
z.string(), // tenant ID
z.object({
url: z.string().url(),
schema: z.string().default("public"),
}),
),
defaultTenant: z.string(),
});
type MultiTenantConfig = z.infer<typeof configSchema>;
export const multiTenantPlugin = createPlugin<
MultiTenantConfig,
AtlasDatasourcePlugin<MultiTenantConfig>
>({
configSchema,
create(config) {
const connectionPool = new Map<string, PluginDBConnection>();
function getConnectionForTenant(tenantId: string): PluginDBConnection {
const tenantConfig = config.tenants[tenantId];
if (!tenantConfig) {
throw new Error(`Unknown tenant: ${tenantId}`);
}
let conn = connectionPool.get(tenantId);
if (!conn) {
conn = createTenantConnection(tenantConfig.url, tenantConfig.schema);
connectionPool.set(tenantId, conn);
}
return conn;
}
return {
id: "multi-tenant-datasource",
types: ["datasource"] as const,
version: "1.0.0",
config,
connection: {
create: () => getConnectionForTenant(config.defaultTenant),
dbType: "postgres",
},
async initialize(ctx) {
const tenantIds = Object.keys(config.tenants);
ctx.logger.info(
`Multi-tenant plugin initialized with ${tenantIds.length} tenant(s): ${tenantIds.join(", ")}`,
);
},
async teardown() {
const closes = [...connectionPool.values()].map((c) => c.close());
await Promise.all(closes);
connectionPool.clear();
},
};
},
});
function createTenantConnection(url: string, schema: string): PluginDBConnection {
throw new Error("Not implemented — replace with your driver");
}
Register with per-tenant connection strings:
// atlas.config.ts
plugins: [
multiTenantPlugin({
defaultTenant: "acme",
tenants: {
acme: { url: process.env.ACME_DB_URL!, schema: "acme" },
globex: { url: process.env.GLOBEX_DB_URL!, schema: "globex" },
},
}),
],