Skip to main content

Kafka API

Base URL: https://api.sparbz.cloud/api/v1

Overview

The Kafka API allows you to manage Apache Kafka clusters, topics, partitions, consumer groups, and related resources. Create and scale Kafka clusters, manage topics and producers/consumers, and monitor cluster health.

All endpoints require authentication via Bearer token or API key.


Cluster Management

GET /kafka

List all Kafka clusters in your organization.

Headers:

Authorization: Bearer <token>

Query Parameters:

ParameterTypeDescription
statusstringFilter by status: provisioning, running, stopped, error
tierstringFilter by tier: starter, pro, scale
limitintegerNumber of results (default: 20, max: 100)
offsetintegerPagination offset

Response (200 OK):

[
{
"id": "uuid",
"name": "events-cluster",
"tier": "pro",
"status": "running",
"brokers": 3,
"topic_count": 12,
"consumer_group_count": 4,
"storage_used_gb": 250,
"storage_limit_gb": 500,
"bootstrap_servers": "kafka-xxx.sparbz.cloud:9092",
"created_at": "2024-10-15T10:30:00Z"
}
]

POST /kafka

Create a new Kafka cluster.

Headers:

Authorization: Bearer <token>

Request Body:

{
"name": "string",
"tier": "starter | pro | scale",
"broker_count": 1,
"retention_days": 7
}

Response (201 Created):

{
"id": "uuid",
"name": "events-cluster",
"tier": "pro",
"status": "provisioning",
"brokers": 3,
"created_at": "2024-11-29T10:30:00Z"
}

GET /kafka/:id

Get details of a specific Kafka cluster.

Headers:

Authorization: Bearer <token>

Response (200 OK):

{
"id": "uuid",
"name": "events-cluster",
"tier": "pro",
"status": "running",
"brokers": 3,
"bootstrap_servers": "kafka-xxx.sparbz.cloud:9092",
"schema_registry": "https://schema-registry-xxx.sparbz.cloud",
"topic_count": 12,
"consumer_group_count": 4,
"storage_used_gb": 250,
"storage_limit_gb": 500,
"cpu_used_millicores": 1200,
"cpu_limit_millicores": 4000,
"memory_used_mb": 2048,
"memory_limit_mb": 8192,
"created_at": "2024-10-15T10:30:00Z",
"updated_at": "2024-11-29T15:45:00Z"
}

DELETE /kafka/:id

Delete a Kafka cluster and all its data.

Headers:

Authorization: Bearer <token>

Response (204 No Content)


Topic Management

GET /kafka/:id/topics

List all topics in a Kafka cluster.

Headers:

Authorization: Bearer <token>

Response (200 OK):

[
{
"name": "user-events",
"partitions": 6,
"replication_factor": 3,
"leader_epochs": 5,
"retention_ms": 604800000,
"segment_ms": 86400000,
"compression_type": "snappy",
"min_in_sync_replicas": 2
}
]

POST /kafka/:id/topics

Create a new topic in a Kafka cluster.

Headers:

Authorization: Bearer <token>

Request Body:

{
"name": "string",
"partitions": 6,
"replication_factor": 3,
"retention_ms": 604800000,
"compression_type": "snappy"
}

Response (201 Created):

{
"name": "user-events",
"partitions": 6,
"replication_factor": 3,
"created_at": "2024-11-29T10:30:00Z"
}

GET /kafka/:id/topics/:name

Get details of a specific topic.

Response (200 OK):

{
"name": "user-events",
"partitions": 6,
"replication_factor": 3,
"leader_epochs": 5,
"retention_ms": 604800000,
"segment_ms": 86400000,
"compression_type": "snappy",
"min_in_sync_replicas": 2,
"cleanup_policy": "delete",
"size_bytes": 1073741824,
"message_count": 15000000
}

DELETE /kafka/:id/topics/:name

Delete a topic from a Kafka cluster.

Response (204 No Content)


User and ACL Management

GET /kafka/:id/users

List all Kafka users (ACL accounts).

Response (200 OK):

[
{
"id": "uuid",
"username": "producer-app",
"type": "client",
"acls": [
{
"resource_type": "topic",
"resource_name": "user-events",
"operation": "Write",
"principal": "User:producer-app"
}
]
}
]

POST /kafka/:id/users

Create a new Kafka user with ACL.

Request Body:

{
"username": "string",
"type": "client | broker | admin",
"password": "string"
}

Response (201 Created):

{
"id": "uuid",
"username": "producer-app",
"password": "generated-secure-password",
"sasl_mechanism": "SCRAM-SHA-512",
"created_at": "2024-11-29T10:30:00Z"
}

Consumer Groups

GET /kafka/:id/consumer-groups

List all consumer groups in a cluster.

Response (200 OK):

[
{
"id": "group-1",
"state": "Stable",
"members": 3,
"topics": ["user-events", "system-events"],
"lag_sum": 2500,
"created_at": "2024-10-15T10:30:00Z"
}
]

GET /kafka/:id/consumer-groups/:groupId

Get details of a specific consumer group.

Response (200 OK):

{
"id": "group-1",
"state": "Stable",
"protocol": "range",
"protocol_type": "consumer",
"members": [
{
"member_id": "consumer-1",
"host": "consumer-host",
"topics": ["user-events"]
}
],
"lag": {
"topic": "user-events",
"partition": 0,
"current_offset": 15000000,
"log_end_offset": 15002500,
"lag": 2500
}
}

Schema Management

GET /kafka/:id/schemas

List all schemas in the Schema Registry.

Response (200 OK):

[
{
"id": 1,
"subject": "user-events-value",
"version": 5,
"schema": "{...avro schema...}",
"references": []
}
]

POST /kafka/:id/schemas

Register a new schema in the Schema Registry.

Request Body:

{
"subject": "string",
"schema": "JSON schema as string",
"schema_type": "AVRO | PROTOBUF | JSON"
}

Response (201 Created):

{
"id": 1,
"subject": "user-events-value",
"version": 1,
"created_at": "2024-11-29T10:30:00Z"
}

Connectors

GET /kafka/:id/connectors

List all Kafka Connect connectors.

Response (200 OK):

[
{
"name": "postgres-source",
"type": "source",
"state": "RUNNING",
"worker_id": "worker-1",
"tasks": [
{
"id": 0,
"state": "RUNNING"
}
]
}
]

POST /kafka/:id/connectors

Create a new Kafka Connect connector.

Request Body:

{
"name": "string",
"connector_class": "string",
"tasks_max": 1,
"config": {}
}

Response (201 Created):

{
"name": "postgres-source",
"config": {...},
"tasks": [{"connector": "postgres-source", "task": 0}]
}

Monitoring and Metrics

GET /kafka/:id/metrics

Get cluster metrics.

Response (200 OK):

{
"cluster_id": "uuid",
"timestamp": "2024-11-29T15:45:00Z",
"brokers": [
{
"id": 0,
"cpu_percent": 45.2,
"memory_percent": 62.1,
"network_in_bytes_per_sec": 1024000,
"network_out_bytes_per_sec": 2048000,
"disk_read_bytes_per_sec": 512000,
"disk_write_bytes_per_sec": 768000
}
],
"topics": [
{
"name": "user-events",
"bytes_in_per_sec": 102400,
"bytes_out_per_sec": 204800,
"messages_in_per_sec": 1024
}
]
}

Error Responses

CodeDescription
400Bad Request - Invalid topic name, partition count, or replication factor
401Unauthorized - Invalid or missing token
403Forbidden - Insufficient permissions
404Not Found - Cluster, topic, or consumer group doesn't exist
409Conflict - Topic already exists
429Too Many Requests - Cluster limit reached
500Internal Server Error

Rate Limits

  • Cluster operations: 10 requests/minute
  • Topic operations: 20 requests/minute
  • Consumer group queries: 50 requests/minute

Common Use Cases

Creating a Topic with High Throughput

POST /kafka/{cluster_id}/topics
{
"name": "high-throughput-events",
"partitions": 12,
"replication_factor": 3,
"min_in_sync_replicas": 2,
"compression_type": "snappy",
"retention_ms": 86400000
}

Setting Up ACLs for Producer

POST /kafka/{cluster_id}/users
{
"username": "producer-app",
"type": "client"
}

Monitoring Consumer Lag

GET /kafka/{cluster_id}/consumer-groups/{group_id}

See the response above for lag details per partition.