Kafka API
Base URL: https://api.sparbz.cloud/api/v1
Overview
The Kafka API allows you to manage Apache Kafka clusters, topics, partitions, consumer groups, and related resources. Create and scale Kafka clusters, manage topics and producers/consumers, and monitor cluster health.
All endpoints require authentication via Bearer token or API key.
Cluster Management
GET /kafka
List all Kafka clusters in your organization.
Headers:
Authorization: Bearer <token>
Query Parameters:
| Parameter | Type | Description |
|---|---|---|
| status | string | Filter by status: provisioning, running, stopped, error |
| tier | string | Filter by tier: starter, pro, scale |
| limit | integer | Number of results (default: 20, max: 100) |
| offset | integer | Pagination offset |
Response (200 OK):
[
{
"id": "uuid",
"name": "events-cluster",
"tier": "pro",
"status": "running",
"brokers": 3,
"topic_count": 12,
"consumer_group_count": 4,
"storage_used_gb": 250,
"storage_limit_gb": 500,
"bootstrap_servers": "kafka-xxx.sparbz.cloud:9092",
"created_at": "2024-10-15T10:30:00Z"
}
]
POST /kafka
Create a new Kafka cluster.
Headers:
Authorization: Bearer <token>
Request Body:
{
"name": "string",
"tier": "starter | pro | scale",
"broker_count": 1,
"retention_days": 7
}
Response (201 Created):
{
"id": "uuid",
"name": "events-cluster",
"tier": "pro",
"status": "provisioning",
"brokers": 3,
"created_at": "2024-11-29T10:30:00Z"
}
GET /kafka/:id
Get details of a specific Kafka cluster.
Headers:
Authorization: Bearer <token>
Response (200 OK):
{
"id": "uuid",
"name": "events-cluster",
"tier": "pro",
"status": "running",
"brokers": 3,
"bootstrap_servers": "kafka-xxx.sparbz.cloud:9092",
"schema_registry": "https://schema-registry-xxx.sparbz.cloud",
"topic_count": 12,
"consumer_group_count": 4,
"storage_used_gb": 250,
"storage_limit_gb": 500,
"cpu_used_millicores": 1200,
"cpu_limit_millicores": 4000,
"memory_used_mb": 2048,
"memory_limit_mb": 8192,
"created_at": "2024-10-15T10:30:00Z",
"updated_at": "2024-11-29T15:45:00Z"
}
DELETE /kafka/:id
Delete a Kafka cluster and all its data.
Headers:
Authorization: Bearer <token>
Response (204 No Content)
Topic Management
GET /kafka/:id/topics
List all topics in a Kafka cluster.
Headers:
Authorization: Bearer <token>
Response (200 OK):
[
{
"name": "user-events",
"partitions": 6,
"replication_factor": 3,
"leader_epochs": 5,
"retention_ms": 604800000,
"segment_ms": 86400000,
"compression_type": "snappy",
"min_in_sync_replicas": 2
}
]
POST /kafka/:id/topics
Create a new topic in a Kafka cluster.
Headers:
Authorization: Bearer <token>
Request Body:
{
"name": "string",
"partitions": 6,
"replication_factor": 3,
"retention_ms": 604800000,
"compression_type": "snappy"
}
Response (201 Created):
{
"name": "user-events",
"partitions": 6,
"replication_factor": 3,
"created_at": "2024-11-29T10:30:00Z"
}
GET /kafka/:id/topics/:name
Get details of a specific topic.
Response (200 OK):
{
"name": "user-events",
"partitions": 6,
"replication_factor": 3,
"leader_epochs": 5,
"retention_ms": 604800000,
"segment_ms": 86400000,
"compression_type": "snappy",
"min_in_sync_replicas": 2,
"cleanup_policy": "delete",
"size_bytes": 1073741824,
"message_count": 15000000
}
DELETE /kafka/:id/topics/:name
Delete a topic from a Kafka cluster.
Response (204 No Content)
User and ACL Management
GET /kafka/:id/users
List all Kafka users (ACL accounts).
Response (200 OK):
[
{
"id": "uuid",
"username": "producer-app",
"type": "client",
"acls": [
{
"resource_type": "topic",
"resource_name": "user-events",
"operation": "Write",
"principal": "User:producer-app"
}
]
}
]
POST /kafka/:id/users
Create a new Kafka user with ACL.
Request Body:
{
"username": "string",
"type": "client | broker | admin",
"password": "string"
}
Response (201 Created):
{
"id": "uuid",
"username": "producer-app",
"password": "generated-secure-password",
"sasl_mechanism": "SCRAM-SHA-512",
"created_at": "2024-11-29T10:30:00Z"
}
Consumer Groups
GET /kafka/:id/consumer-groups
List all consumer groups in a cluster.
Response (200 OK):
[
{
"id": "group-1",
"state": "Stable",
"members": 3,
"topics": ["user-events", "system-events"],
"lag_sum": 2500,
"created_at": "2024-10-15T10:30:00Z"
}
]
GET /kafka/:id/consumer-groups/:groupId
Get details of a specific consumer group.
Response (200 OK):
{
"id": "group-1",
"state": "Stable",
"protocol": "range",
"protocol_type": "consumer",
"members": [
{
"member_id": "consumer-1",
"host": "consumer-host",
"topics": ["user-events"]
}
],
"lag": {
"topic": "user-events",
"partition": 0,
"current_offset": 15000000,
"log_end_offset": 15002500,
"lag": 2500
}
}
Schema Management
GET /kafka/:id/schemas
List all schemas in the Schema Registry.
Response (200 OK):
[
{
"id": 1,
"subject": "user-events-value",
"version": 5,
"schema": "{...avro schema...}",
"references": []
}
]
POST /kafka/:id/schemas
Register a new schema in the Schema Registry.
Request Body:
{
"subject": "string",
"schema": "JSON schema as string",
"schema_type": "AVRO | PROTOBUF | JSON"
}
Response (201 Created):
{
"id": 1,
"subject": "user-events-value",
"version": 1,
"created_at": "2024-11-29T10:30:00Z"
}
Connectors
GET /kafka/:id/connectors
List all Kafka Connect connectors.
Response (200 OK):
[
{
"name": "postgres-source",
"type": "source",
"state": "RUNNING",
"worker_id": "worker-1",
"tasks": [
{
"id": 0,
"state": "RUNNING"
}
]
}
]
POST /kafka/:id/connectors
Create a new Kafka Connect connector.
Request Body:
{
"name": "string",
"connector_class": "string",
"tasks_max": 1,
"config": {}
}
Response (201 Created):
{
"name": "postgres-source",
"config": {...},
"tasks": [{"connector": "postgres-source", "task": 0}]
}
Monitoring and Metrics
GET /kafka/:id/metrics
Get cluster metrics.
Response (200 OK):
{
"cluster_id": "uuid",
"timestamp": "2024-11-29T15:45:00Z",
"brokers": [
{
"id": 0,
"cpu_percent": 45.2,
"memory_percent": 62.1,
"network_in_bytes_per_sec": 1024000,
"network_out_bytes_per_sec": 2048000,
"disk_read_bytes_per_sec": 512000,
"disk_write_bytes_per_sec": 768000
}
],
"topics": [
{
"name": "user-events",
"bytes_in_per_sec": 102400,
"bytes_out_per_sec": 204800,
"messages_in_per_sec": 1024
}
]
}
Error Responses
| Code | Description |
|---|---|
| 400 | Bad Request - Invalid topic name, partition count, or replication factor |
| 401 | Unauthorized - Invalid or missing token |
| 403 | Forbidden - Insufficient permissions |
| 404 | Not Found - Cluster, topic, or consumer group doesn't exist |
| 409 | Conflict - Topic already exists |
| 429 | Too Many Requests - Cluster limit reached |
| 500 | Internal Server Error |
Rate Limits
- Cluster operations: 10 requests/minute
- Topic operations: 20 requests/minute
- Consumer group queries: 50 requests/minute
Common Use Cases
Creating a Topic with High Throughput
POST /kafka/{cluster_id}/topics
{
"name": "high-throughput-events",
"partitions": 12,
"replication_factor": 3,
"min_in_sync_replicas": 2,
"compression_type": "snappy",
"retention_ms": 86400000
}
Setting Up ACLs for Producer
POST /kafka/{cluster_id}/users
{
"username": "producer-app",
"type": "client"
}
Monitoring Consumer Lag
GET /kafka/{cluster_id}/consumer-groups/{group_id}
See the response above for lag details per partition.