Conduit 0.10 comes with Multiple collections support

By Haris Osmanagić

29 Apr 2024

We’re happy to announce another release of our open-source data integration tool Conduit: 0.10. This one comes only a month after our last release. We thought the new native support for multiple collections was so important that we wanted to release it to our users as quickly as possible.

Multiple collections support

We take our users' feedback very seriously, and something we kept hearing was the need to connect and integrate multiple data collections simultaneously. While this could be accomplished in some cases by creating multiple pipelines, it was far from ideal and not very scalable.

What do we mean by “collections”? It depends on the resource that Conduit is interacting with. In a database, a collection is a table; in Kafka, it’s a topic; in Elasticsearch, it’s an index. We use “collection” as the catch-all term for structures that contain a group of related records.

With this latest release, we believe it will be easier for connector developers to expand their functionality and add support for multiple collections.

To facilitate connectivity between connectors, we included a new metadata field named opencdc.collection to indicate the collection from which a record originated. For example, if a record was read from a topic named users, the OpenCDC record would look like this:

 
{
  "operation": "create",
  "metadata": {
    "opencdc.collection": "users",
    "opencdc.readAt": "1663858188836816000",
    "opencdc.version": "v1",
    "conduit.source.plugin.name": "builtin:kafka",
    "conduit.source.plugin.version": "v0.8.0"
  },
  ...
}
 

The goal of this feature is to make it easy to route records in a pipeline. What previously would have required several pipelines can now be done in a single one. However, that’s not the only option: you can read more about other ways to route records in Conduit.
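
To make this concrete, here’s a minimal sketch of what the connectors section of such a single pipeline could look like, combining the Kafka source and Postgres destination described below (the broker address and connection string are placeholders):

connectors:
  - id: kafka-source
    type: source
    plugin: builtin:kafka
    settings:
      # Placeholder broker address
      servers: "localhost:9092"
      # Read from both topics in a single pipeline
      topics: users,orders
  - id: pg-destination
    type: destination
    plugin: builtin:postgres
    settings:
      # Placeholder connection string
      url: "postgresql://user:password@localhost:5432/exampledb"
      # Records from the "users" topic are written to the "users" table,
      # records from the "orders" topic to the "orders" table.
      table: '{{ index .Metadata "opencdc.collection" }}'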

Connectors with support for multiple collections

To demonstrate the capability of having multiple collections in Conduit, we decided to start with some of our built-in connectors, which are included as part of the Conduit binary.

Kafka connector

This connector now supports reading from and writing to multiple topics. When configuring Kafka as a source, you can use the topics configuration option to provide a list of Kafka topics from which records will be read:

 
connectors:
  - id: kafka-source
    type: source
    plugin: builtin:kafka
    settings:
      # Read records from topic1 and topic2
      topics: topic1,topic2
      ...
 

When configuring Kafka as a destination, you can specify a target topic based on data taken from the record being processed. The default value of the topic parameter is the Go template {{ index .Metadata "opencdc.collection" }}, which means that records will be routed to the topic based on the collection they come from. You can change the parameter to take data from a different field or use a static topic.

 
connectors:
  - id: kafka-destination
    type: destination
    plugin: builtin:kafka
    settings:
      # Route record to topic based on record metadata field "opencdc.collection"
      topic: '{{ index .Metadata "opencdc.collection" }}'
      ...
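
If you’d rather send every record to one fixed topic instead, you can replace the template with a static value; for example (the topic name all-events is just an illustration):

connectors:
  - id: kafka-destination
    type: destination
    plugin: builtin:kafka
    settings:
      # Write all records to a single, fixed topic
      topic: all-events
      ...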
 

Postgres connector

When configuring a Postgres connector as a source, we expanded support to reading from multiple tables in the two CDC modes (logical replication and long polling), using the tables configuration option to provide a comma-separated list of the tables you would like to read from.
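
For instance, a source restricted to two specific tables could be configured like this (the table names and connection string are placeholders):

connectors:
  - id: pg-source
    type: source
    plugin: builtin:postgres
    settings:
      # Read only from the "users" and "orders" tables
      tables: users,orders
      url: "postgresql://user:password@localhost:5432/exampledb"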

Additionally, we have added the ability to read all tables from the public schema using a wildcard (*). We believe this option will come in handy in the following situations:

  • Initial data ingestion: this ensures the connector captures all available tables, reducing setup time and guaranteeing no tables are missed.
  • Schema changes: if new tables are added, the connector will automatically pick them up, eliminating the need for manual updates.
  • Data discovery: detecting changes from all tables can be useful when exploring a new data source.
  • Reducing maintenance: there is no need to maintain a list of specific tables, making the connector easier to maintain.

Here’s an example of a pipeline configuration file using Postgres as a source:

 
connectors:
  - id: pg-source
    type: source
    plugin: builtin:postgres
    settings:
      tables: "*" # All tables in schema 'public'
      url: "postgresql://user:password@localhost:5432/exampledb"
 

As with our Kafka connector, the Postgres destination defaults to using the value of the opencdc.collection metadata field as the destination table. This can also be customized if needed. Here’s an example:

 
connectors:
  - id: pg-destination
    type: destination
    plugin: builtin:postgres
    settings:
      # Route record to table based on record metadata field "opencdc.collection"
      table: '{{ index .Metadata "opencdc.collection" }}'
      url: "postgresql://user:password@localhost:5432/exampledb"
 

Generator connector

Multiple collections support enables the generator to emit records with different formats. For example, let’s assume we want to simulate reading from two collections: one contains data about users, the other data about orders. With the generator, that can be accomplished using the following configuration:

 
connectors:
  - id: example
    type: source
    plugin: builtin:generator
    settings:
      # Global settings
      rate: 1000
      # Collection "users" produces structured records with fields "id" and "name".
      # All user records have the operation 'create'.
      collections.users.format.type: structured
      collections.users.format.options.id: int
      collections.users.format.options.name: string
      collections.users.operations: create
      # Collection "orders" produces raw records with fields "id" and "product".
      # Order records have one of the specified operations chosen randomly.
      collections.orders.format.type: raw
      collections.orders.format.options.id: int
      collections.orders.format.options.product: string
      collections.orders.operations: create,update,delete
 

📝 One of the new features is the ability to generate a different operation for each record!

Bonus: Dynamic configuration parameters in connectors

With the latest release of the connector SDK, we introduced dynamic configuration parameters. A configuration parameter can now contain a wildcard (*) in its name, which is filled in by the user in the pipeline configuration.

We already use this feature in the generator connector to specify multiple collections with separate formats. For instance, the configuration parameter collections.*.format.type can be provided multiple times, where * is replaced with the collection name. We also use it to configure a list of fields generated by the connector using the parameter collections.*.format.options.*.

 
connectors:
  - id: example
    type: source
    plugin: builtin:generator
    settings:
      # Global settings
      rate: 1000
      # Collection "users" produces structured records with fields "id" and "name".
      collections.users.format.type: structured
      collections.users.format.options.id: int
      collections.users.format.options.name: string
      # Collection "orders" produces raw records with fields "id" and "product".
      collections.orders.format.type: raw
      collections.orders.format.options.id: int
      collections.orders.format.options.product: string
 

You can start using this feature in your own connectors right away!

We’d love your feedback!

Check out the full release notes on the Conduit Changelog. What do you think about multiple collections and dynamic configuration parameters? Is there something you think would be great to have in Conduit? Start a GitHub Discussion, join us on Discord, or reach out via Twitter!


Haris Osmanagić

Senior Software Engineer working on Conduit