Type: Permanent (Long-Lived)
CoordinatorAgent aggregates and manages results from multiple worker agents. It maintains long-term durable state and provides query APIs for retrieving and analyzing results.
CoordinatorAgent is a permanent agent that:
`submitResult(workerId: string, result: TaskResult<TResult>): Promise<void>`
Submit a result from a worker agent.
Parameters:
- workerId - Unique identifier of the worker submitting the result
- result - TaskResult object containing:
  - taskId - The task identifier
  - success - Whether the task succeeded
  - result - The actual result data (if successful)
  - error - Error message (if failed)
  - duration - Execution time in milliseconds
  - timestamp - Completion timestamp

Example:
// Describe the outcome of a finished task, then hand it to the coordinator
// together with the id of the worker that produced it.
const processed: ProcessedData = { recordsProcessed: 1000, outputUrl: 'https://...' };
const result: TaskResult<ProcessedData> = {
  taskId: 'task-123',
  success: true,
  result: processed,
  duration: 2500, // execution time in milliseconds
  timestamp: Date.now(),
};
await coordinator.submitResult('worker-456', result);
`getResults(filter?: ResultFilter): Promise<ResultEntry<TResult>[]>`
Retrieve results with optional filtering.
Parameters:
filter (optional):
- workerId - Filter by specific worker
- success - Filter by success/failure
- since - Only results after this timestamp
- limit - Maximum number of results to return

Returns: Array of ResultEntry objects, each containing:
- workerId - ID of the worker that submitted it
- result - The TaskResult
- receivedAt - When the coordinator received it

Example:
// Fetch the last 10 results.
const recent = await coordinator.getResults({ limit: 10 });

// Fetch every successful result submitted by worker-123.
const workerResults = await coordinator.getResults({ workerId: 'worker-123', success: true });

// Fetch results from the last hour; `since` takes an absolute timestamp in ms.
const ONE_HOUR_MS = 60 * 60 * 1000;
const recentResults = await coordinator.getResults({ since: Date.now() - ONE_HOUR_MS });
`getSuccessfulResults(limit?: number): Promise<TResult[]>`
Get only successful results (unwrapped data, not metadata).
Example:
// Successful results arrive unwrapped, so the payload fields are read directly.
const topResults = await coordinator.getSuccessfulResults(5);
for (const { recordsProcessed, outputUrl } of topResults) {
  console.log(recordsProcessed, outputUrl);
}
`getFailedResults(limit?: number): Promise<ResultEntry<TResult>[]>`
Get only failed results with full metadata.
Example:
// Failed results keep their metadata: the worker id sits on the entry,
// the error message on the wrapped TaskResult.
const failures = await coordinator.getFailedResults();
for (const { workerId, result } of failures) {
  console.log(`Worker ${workerId} failed: ${result.error}`);
}
`getSummary(): Promise<Summary>`
Get aggregate statistics about all results.
Returns:
- totalResults - Total number of results received
- successCount - Number of successful results
- failureCount - Number of failed results
- averageDuration - Average task execution time (ms)
- oldestResult - Timestamp of oldest result
- newestResult - Timestamp of newest result

Example:
const summary = await coordinator.getSummary();
// With no results yet, successCount / totalResults is 0 / 0 = NaN and the log
// would read "NaN%"; report 0% for an empty coordinator instead.
const successRatePct =
  summary.totalResults > 0 ? (summary.successCount / summary.totalResults) * 100 : 0;
console.log(`
Total: ${summary.totalResults}
Success Rate: ${successRatePct.toFixed(2)}%
Avg Duration: ${summary.averageDuration}ms
`);
`getResultsByWorker(workerId: string): Promise<ResultEntry<TResult>[]>`
Get all results from a specific worker.
Example:
const workerResults = await coordinator.getResultsByWorker('worker-789');
console.log(`Worker 789 completed ${workerResults.length} tasks`);
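`getRecentResults(sinceMs: number): Promise<ResultEntry<TResult>[]>`
Get results from the last `sinceMs` milliseconds. Note that `sinceMs` is a duration counted back from now, not an absolute timestamp like the `since` filter of `getResults()`.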
Example:
// Results from last 5 minutes
const recentResults = await coordinator.getRecentResults(5 * 60 * 1000);
`getCount(): Promise<number>`
Get total number of results stored.
Example:
const count = await coordinator.getCount();
console.log(`Coordinator has ${count} results`);
`getSuccessRate(): Promise<number>`
Get success rate as a percentage (0-100).
Example:
const successRate = await coordinator.getSuccessRate();
console.log(`Success rate: ${successRate.toFixed(2)}%`);
`clear(): Promise<void>`
Clear all results and reset statistics.
Example:
await coordinator.clear();
console.log('All results cleared');
import { CoordinatorAgent, type TaskResult } from 'common-agents';
/**
 * Result payload reported for one data-processing task.
 * Used as the result type argument of `DataCoordinator` and of the
 * `TaskResult` values passed to `submitResult`.
 */
interface DataProcessingResult {
// NOTE(review): presumably the location of the input data — inferred from the
// name only; nothing in this file reads it.
sourceUrl: string;
// Number of records the task handled; summed by `getTotalRecordsProcessed()`.
recordsProcessed: number;
// NOTE(review): presumably the location of the produced output — confirm.
outputUrl: string;
// Names of the transformations applied; `getResultsByTransformation()` matches
// against this list with `includes`.
transformationsApplied: string[];
}
/**
 * Coordinator specialised for `DataProcessingResult` payloads.
 *
 * On top of the base behaviour it posts an alert for every failed task,
 * keeps a running `totalRecordsProcessed` counter in the agent metadata,
 * and exposes two extra query methods over the successful results.
 */
class DataCoordinator extends CoordinatorAgent<
  Env,
  CoordinatorAgentState,
  DataProcessingResult
> {
  /**
   * Submits a result through the base implementation, then runs the custom
   * hooks: a failure alert for unsuccessful tasks, a record-count update for
   * successful tasks that carry a payload.
   *
   * @param workerId - Identifier of the worker that produced the result.
   * @param result - The task result being submitted.
   */
  override async submitResult(
    workerId: string,
    result: TaskResult<DataProcessingResult>
  ): Promise<void> {
    // Call parent implementation first so the hooks only run for accepted results.
    await super.submitResult(workerId, result);

    if (!result.success) {
      await this.notifyFailure(workerId, result);
      return;
    }

    if (result.result) {
      await this.updateRecordStats(result.result.recordsProcessed);
    }
  }

  /**
   * Custom method: total number of records across all successful results.
   *
   * @returns Sum of `recordsProcessed`; 0 when there are no successful results.
   */
  @callable()
  async getTotalRecordsProcessed(): Promise<number> {
    const results = await this.getSuccessfulResults();
    return results.reduce((sum, r) => sum + r.recordsProcessed, 0);
  }

  /**
   * Custom method: successful results that applied a given transformation.
   *
   * @param transformation - Exact transformation name to look for.
   * @returns Payloads whose `transformationsApplied` list contains the name.
   */
  @callable()
  async getResultsByTransformation(transformation: string): Promise<DataProcessingResult[]> {
    const allResults = await this.getSuccessfulResults();
    return allResults.filter(r => r.transformationsApplied.includes(transformation));
  }

  /**
   * Posts a failure alert to the alerting endpoint.
   *
   * Delivery problems are logged rather than thrown: by the time this runs the
   * result has already gone through `super.submitResult`, so rejecting here
   * would make an accepted result look unsubmitted to the calling worker.
   */
  private async notifyFailure(
    workerId: string,
    result: TaskResult<DataProcessingResult>
  ): Promise<void> {
    try {
      const response = await fetch('https://api.example.com/alerts', {
        method: 'POST',
        // Without this header a string body is sent as text/plain.
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({
          workerId,
          taskId: result.taskId,
          error: result.error,
          timestamp: result.timestamp,
        }),
      });
      // fetch only rejects on network errors; HTTP error statuses must be checked.
      if (!response.ok) {
        console.error(`Failure alert for task ${result.taskId} was rejected: HTTP ${response.status}`);
      }
    } catch (error: unknown) {
      console.error(`Failure alert for task ${result.taskId} could not be sent`, error);
    }
  }

  /**
   * Adds `count` to the `totalRecordsProcessed` counter kept in the metadata.
   *
   * @param count - Number of records processed by the task just submitted.
   */
  private async updateRecordStats(count: number): Promise<void> {
    const previous = this.state.metadata?.totalRecordsProcessed;
    // Narrow at runtime instead of asserting: a missing, non-numeric or NaN
    // value restarts the counter at 0 rather than concatenating or yielding NaN.
    const runningTotal = typeof previous === 'number' && !Number.isNaN(previous) ? previous : 0;
    this.updateMetadata({ totalRecordsProcessed: runningTotal + count });
  }
}
// 1. Create coordinator
const coordinator = await getAgentByName(env.COORDINATOR, 'default');
// 2. Workers submit results as they complete
/** Worker that reports its results to the coordinator instance named `default`. */
class DataWorker extends WorkerAgent {
  /**
   * Looks up the coordinator and submits `result` under this worker's id.
   *
   * @param result - The task result to report.
   */
  protected async reportResult(result: TaskResult<DataProcessingResult>): Promise<void> {
    const target = await getAgentByName(this.env.COORDINATOR, 'default');
    const workerId = this.getWorkerId();
    await target.submitResult(workerId, result);
  }
}
// 3. Query coordinator for aggregated data
const summary = await coordinator.getSummary();
console.log(`Processed ${summary.totalResults} tasks`);
// Round to two decimals so the log reads "66.67%" rather than "66.66666666666667%".
console.log(`Success rate: ${(await coordinator.getSuccessRate()).toFixed(2)}%`);

// Get recent failures
const failures = await coordinator.getFailedResults(10);
for (const { result } of failures) {
  console.log(`Task ${result.taskId} failed: ${result.error}`);
}

// Get specific worker's results
const workerResults = await coordinator.getResultsByWorker('worker-123');
console.log(`Worker 123 completed ${workerResults.length} tasks`);
FleetManager orchestrates workers that report to the coordinator:
// Fleet manager configuration
/**
 * Fleet manager whose workers are agents bound as `DATA_WORKER`.
 * Workers will automatically report to coordinator.
 */
class DataFleetManager extends FleetManagerAgent {
  // No explicit constructor on purpose: one that only calls `super()` adds
  // nothing, and leaving it out lets the class inherit whatever arguments the
  // base constructor expects instead of dropping them.

  /**
   * Resolves the worker agent instance for a worker id.
   *
   * @param workerId - Name under which the worker agent is looked up.
   */
  protected async getWorkerInstance(workerId: string) {
    return await getAgentByName(this.env.DATA_WORKER, workerId);
  }
}
// Workflow
const fleetManager = await getAgentByName(env.FLEET_MANAGER, 'default');
const coordinator = await getAgentByName(env.COORDINATOR, 'default');

// Submit batch
const batchId = await fleetManager.submitBatch(tasks);

// Poll for completion, but give up after a deadline so a batch that never
// reaches 'completed' or 'failed' cannot keep this loop running forever.
// Tune both values to the expected batch duration.
const POLL_INTERVAL_MS = 1000;
const MAX_WAIT_MS = 5 * 60 * 1000;
const deadline = Date.now() + MAX_WAIT_MS;
while (true) {
  const status = await fleetManager.getBatchStatus(batchId);
  if (status.status === 'completed' || status.status === 'failed') {
    break;
  }
  if (Date.now() >= deadline) {
    throw new Error(`Timed out after ${MAX_WAIT_MS}ms waiting for the batch to finish`);
  }
  await new Promise(resolve => setTimeout(resolve, POLL_INTERVAL_MS));
}

// Get aggregated results from coordinator
const summary = await coordinator.getSummary();
const results = await coordinator.getSuccessfulResults();
/**
 * Coordinator with reporting helpers computed from `getResults()`.
 */
class AnalyticsCoordinator extends CoordinatorAgent<Env, CoordinatorAgentState, AnalyticsResult> {
  /**
   * Calculate hourly statistics.
   *
   * Results are grouped by the hour of their `receivedAt` time; the key is the
   * first 13 characters of the ISO-8601 string (`YYYY-MM-DDTHH`), which is
   * always expressed in UTC.
   *
   * @returns Per-hour result count and mean `duration`, keyed by hour.
   */
  @callable()
  async getHourlyStats(): Promise<Record<string, { count: number; avgDuration: number }>> {
    const entries = await this.getResults();

    // A Map keeps the lookup explicit (no unchecked index access) and
    // preserves first-seen order of the hours.
    const buckets = new Map<string, { count: number; totalDuration: number }>();
    for (const entry of entries) {
      const hour = new Date(entry.receivedAt).toISOString().slice(0, 13); // YYYY-MM-DDTHH
      const bucket = buckets.get(hour) ?? { count: 0, totalDuration: 0 };
      bucket.count += 1;
      bucket.totalDuration += entry.result.duration;
      buckets.set(hour, bucket);
    }

    const stats: Record<string, { count: number; avgDuration: number }> = {};
    for (const [hour, { count, totalDuration }] of buckets) {
      // count is at least 1 for every bucket, so the division is safe.
      stats[hour] = { count, avgDuration: totalDuration / count };
    }
    return stats;
  }

  /**
   * Get top performers: workers ranked by number of successful results.
   *
   * @param limit - Maximum number of workers to return (default 10). Values
   *   below zero are treated as zero instead of trimming entries from the end
   *   of the ranking.
   * @returns Workers with their success counts, highest count first.
   */
  @callable()
  async getTopWorkers(limit = 10): Promise<Array<{ workerId: string; count: number }>> {
    const successes = await this.getResults({ success: true });

    const workerCounts = new Map<string, number>();
    for (const { workerId } of successes) {
      workerCounts.set(workerId, (workerCounts.get(workerId) ?? 0) + 1);
    }

    const ranked = Array.from(workerCounts, ([workerId, count]) => ({ workerId, count }));
    ranked.sort((a, b) => b.count - a.count);
    return ranked.slice(0, Math.max(0, limit));
  }
}
- Use getSuccessRate() for health checks
- Override submitResult() to trigger alerts
- Use getRecentResults() for time-based queries
- Use the limit parameter to reduce data transfer