

8.1.0 2024-01-26 13:22 UTC



A PHP 7.4+ library to consume the Confluent Schema Registry REST API. It provides low-level functions that create PSR-7 compliant requests, as well as high-level abstractions that ease the developer experience.



Hard dependencies

Optional dependencies


This library is installed via Composer.

composer require "flix-tech/confluent-schema-registry-api=^7.4"


If you are still running a version older than 5.0.3, we recommend updating immediately, since earlier versions contain a critical bug in the exception handling.


This library follows strict semantic versioning, so you can expect any minor and patch release to be backwards compatible. Major version upgrades may introduce incompatibilities, which are documented in the UPGRADE.md file.


Asynchronous API

Interface declaration

Example PromisingRegistry


use GuzzleHttp\Client;
use FlixTech\SchemaRegistryApi\Registry\PromisingRegistry;
use FlixTech\SchemaRegistryApi\Exception\SchemaRegistryException;

$registry = new PromisingRegistry(
    new Client(['base_uri' => 'registry.example.com'])
);

// Register a schema with a subject
$schema = AvroSchema::parse('{"type": "string"}');

// The promise will either contain a schema id as int when fulfilled,
// or a SchemaRegistryException instance when rejected.
// If the subject does not exist, it will be created implicitly
$promise = $registry->register('test-subject', $schema);

// If you want to resolve the promise, you might either get the value or an instance of a SchemaRegistryException
// It is more like an Either Monad, since returning Exceptions from rejection callbacks will throw them.
// We want to leave that decision to the user of the lib.
// TODO: Maybe return an Either Monad instead
$promise = $promise->then(
    static function ($schemaIdOrSchemaRegistryException) {
        if ($schemaIdOrSchemaRegistryException instanceof SchemaRegistryException) {
            throw $schemaIdOrSchemaRegistryException;
        }

        return $schemaIdOrSchemaRegistryException;
    }
);

// Resolve the promise
$schemaId = $promise->wait();

// Get a schema by schema id
$promise = $registry->schemaForId($schemaId);
// As above you could add additional callbacks to the promise
$schema = $promise->wait();

// Get the version of a schema for a given subject.
$version = $registry->schemaVersion(
    'test-subject',
    $schema
)->wait();

// You can also get a schema by subject and version
$schema = $registry->schemaForSubjectAndVersion('test-subject', $version)->wait();

// You can additionally just query for the currently latest schema within a subject.
// *NOTE*: Once you requested this it might not be the latest version anymore.
$latestSchema = $registry->latestVersion('test-subject')->wait();

// Sometimes you want to find out the global schema id for a given schema
$schemaId = $registry->schemaId('test-subject', $schema)->wait();
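
The comments in the example above describe results that are either a value or a SchemaRegistryException instance, leaving the decision to throw to the caller. A minimal, dependency-free sketch of that decision is shown below. The helper name unwrapOrThrow is hypothetical and not part of this library, and a plain RuntimeException stands in for SchemaRegistryException so the snippet runs without any packages installed.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper (not part of this library): returns the value as-is,
 * or throws it when it is an exception instance.
 *
 * @param mixed $valueOrException
 * @return mixed
 */
function unwrapOrThrow($valueOrException)
{
    if ($valueOrException instanceof Throwable) {
        throw $valueOrException;
    }

    return $valueOrException;
}

// A fulfilled result is passed through unchanged.
$schemaId = unwrapOrThrow(42);

// A rejected result surfaces as a regular exception.
// RuntimeException stands in for SchemaRegistryException here.
$caught = null;
try {
    unwrapOrThrow(new RuntimeException('Subject not found'));
} catch (RuntimeException $e) {
    $caught = $e->getMessage();
}
```

A function of this shape could serve as the callback passed to then(), in place of the inline closure used in the example above.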

Synchronous API

Interface declaration

Example BlockingRegistry


use FlixTech\SchemaRegistryApi\Registry\BlockingRegistry;
use FlixTech\SchemaRegistryApi\Registry\PromisingRegistry;
use GuzzleHttp\Client;

$registry = new BlockingRegistry(
    new PromisingRegistry(
        new Client(['base_uri' => 'registry.example.com'])
    )
);

// What the blocking registry does is actually resolving the promises
// with `wait` and adding a throwing rejection callback.
$schema = AvroSchema::parse('{"type": "string"}');

// This will be an int, and not a promise
$schemaId = $registry->register('test-subject', $schema);


There is a CachedRegistry that accepts a CacheAdapter together with a Registry. It supports both async and sync APIs.


From version 4.x of this library the API for the CacheAdapterInterface has been changed in order to allow caching of schema ids by hash of a given schema.



use FlixTech\SchemaRegistryApi\Registry\BlockingRegistry;
use FlixTech\SchemaRegistryApi\Registry\PromisingRegistry;
use FlixTech\SchemaRegistryApi\Registry\CachedRegistry;
use FlixTech\SchemaRegistryApi\Registry\Cache\AvroObjectCacheAdapter;
use FlixTech\SchemaRegistryApi\Registry\Cache\DoctrineCacheAdapter;
use Doctrine\Common\Cache\ArrayCache;
use GuzzleHttp\Client;

$asyncApi = new PromisingRegistry(
    new Client(['base_uri' => 'registry.example.com'])
);

$syncApi = new BlockingRegistry($asyncApi);

$doctrineCachedSyncApi = new CachedRegistry(
    $syncApi,
    new DoctrineCacheAdapter(
        new ArrayCache()
    )
);

// All adapters support both APIs, for async APIs additional fulfillment callbacks will be registered.
$avroObjectCachedAsyncApi = new CachedRegistry(
    $asyncApi,
    new AvroObjectCacheAdapter()
);

// NEW in version 4.x, passing in custom hash functions to cache schema ids via the schema hash
// By default the following function is used internally
$defaultHashFunction = static function (AvroSchema $schema) {
    return md5((string) $schema);
};

// You can also define your own hash callable
$sha1HashFunction = static function (AvroSchema $schema) {
    return sha1((string) $schema);
};

// Pass the hash function as optional 3rd parameter to the CachedRegistry constructor
$avroObjectCachedAsyncApi = new CachedRegistry(
    $asyncApi,
    new AvroObjectCacheAdapter(),
    $sha1HashFunction
);

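To illustrate what caching schema ids by schema hash means, here is a minimal sketch under stated assumptions. The class HashKeyedSchemaIdCache is hypothetical and is not the library's CacheAdapter implementation; schemas are represented as plain JSON strings instead of AvroSchema objects so that the snippet needs no dependencies.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical in-memory cache (not part of this library) that stores
 * schema ids under the hash of the schema's string representation.
 */
final class HashKeyedSchemaIdCache
{
    /** @var callable */
    private $hashFunction;

    /** @var array<string, int> */
    private $idsByHash = [];

    public function __construct(?callable $hashFunction = null)
    {
        // Fall back to md5, mirroring the default described above.
        $this->hashFunction = $hashFunction ?? 'md5';
    }

    public function store(string $schemaJson, int $schemaId): void
    {
        $this->idsByHash[($this->hashFunction)($schemaJson)] = $schemaId;
    }

    public function lookup(string $schemaJson): ?int
    {
        return $this->idsByHash[($this->hashFunction)($schemaJson)] ?? null;
    }
}

// Use sha1 instead of the md5 default.
$cache = new HashKeyedSchemaIdCache('sha1');
$cache->store('{"type":"string"}', 7);

$hit = $cache->lookup('{"type":"string"}');  // 7
$miss = $cache->lookup('{"type":"int"}');    // null
```

Because the key is derived from the schema text, a lookup succeeds only for a schema whose string form is identical to the stored one, which is why the choice of hash callable is left configurable.
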
Low Level API

There is a low-level API that provides simple functions that return PSR-7 request objects for the different endpoints of the registry. See Requests/Functions for more information.

There are also requests to use the new DELETE API of the schema registry.
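
As a rough illustration of what such requests carry, the sketch below describes two Schema Registry endpoints as plain arrays: registering a schema under a subject and deleting a single subject version. The function names describeRegisterRequest and describeDeleteVersionRequest are hypothetical and are not the functions found in Requests/Functions; the library itself returns PSR-7 request objects, which are replaced by arrays here to keep the snippet dependency-free.

```php
<?php

declare(strict_types=1);

const SCHEMA_REGISTRY_CONTENT_TYPE = 'application/vnd.schemaregistry.v1+json';

/**
 * Hypothetical helper: describes the request that registers a schema
 * under a subject (POST /subjects/{subject}/versions).
 */
function describeRegisterRequest(string $subject, string $schemaJson): array
{
    return [
        'method' => 'POST',
        'path' => sprintf('/subjects/%s/versions', rawurlencode($subject)),
        'headers' => ['Content-Type' => SCHEMA_REGISTRY_CONTENT_TYPE],
        // The registry expects the schema as a string field inside a JSON object.
        'body' => json_encode(['schema' => $schemaJson], JSON_THROW_ON_ERROR),
    ];
}

/**
 * Hypothetical helper: describes the request that deletes one version
 * of a subject (DELETE /subjects/{subject}/versions/{version}).
 */
function describeDeleteVersionRequest(string $subject, int $version): array
{
    return [
        'method' => 'DELETE',
        'path' => sprintf('/subjects/%s/versions/%d', rawurlencode($subject), $version),
        'headers' => ['Accept' => SCHEMA_REGISTRY_CONTENT_TYPE],
        'body' => '',
    ];
}

$register = describeRegisterRequest('test-subject', '{"type": "string"}');
$delete = describeDeleteVersionRequest('test-subject', 1);
```

Keeping request construction separate from sending is what allows the same request to be dispatched by any PSR-7 capable HTTP client.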


This library uses a Makefile to run the test suite and requires Docker.

You can set the default variables by copying variables.mk.dist to variables.mk and changing them to your liking.

Build the local docker image

PHP_VERSION=7.3 XDEBUG_VERSION=2.9.8 make docker

Unit tests, Coding standards and static analysis

PHP_VERSION=7.3 make ci-local

Integration tests

This library uses a docker-compose configuration to start a schema registry for integration testing, so docker-compose version 1.18.x or later is required to run those tests.

The platform can be controlled with environment variables such as CONFLUENT_VERSION.

Build the Confluent platform with a specific version and run the integration tests

CONFLUENT_VERSION=5.2.3 make platform
make phpunit-integration
make clean


In order to contribute to this library, follow this workflow:

  • Fork the repository
  • Create a feature branch
  • Work on the feature
  • Run the test suite to verify that everything passes
  • Open a PR to the upstream
  • Be happy about contributing to open source!