My DevOps Adventure: Building with Camel, CouchDB, and Elasticsearch (Part 3)

When I started this journey, my goal was clear: I needed a local and fully functional Elastic Stack environment where I could experiment freely and build robust data pipelines.
That was the motivation behind Part 1: setting up Elasticsearch, Kibana, Logstash, and CouchDB using Docker.

In Part 2, I focused on extracting and indexing data from CouchDB using Logstash. I introduced a simulated dataset of headquarters and branches and built an ETL that mapped, transformed, and loaded these documents into Elasticsearch.

Now, in Part 3, it was time to refine what Logstash had built (building on a very deliberate design choice I made earlier) by adding Apache Camel into the game.

From ETL to Enrichment

During ingestion, I intentionally wrote placeholders into certain branch records to mark missing fields and make downstream enrichment predictable:

"company": "NO_COMPANY",
"trade"  : "NO_TRADE"

These markers weren’t accidents or data quality issues: they were signals. The plan all along was to let Apache Camel sweep the index after the initial load, find those markers, fetch the authoritative values from CouchDB, and then apply partial updates in Elasticsearch.

Concretely, I designed a set of Camel routes to:

  1. Query Elasticsearch for branch documents where company == "NO_COMPANY" or trade == "NO_TRADE".
  2. Use each document’s identifier (duns) to fetch the corresponding headquarters document from CouchDB.
  3. Extract company_name from the HQ record.
  4. Update the branch in Elasticsearch so that:
  • "company" becomes "company_name"
  • if "trade" was "NO_TRADE", it also becomes "company_name"

This makes enrichment deterministic, idempotent, and easy to reason about.
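
To make that concrete, here’s a branch document before and after a sweep (values borrowed from the log sample shown later in this article; all other fields omitted):

{ "duns": "636-544-512-0002", "company": "NO_COMPANY", "trade": "NO_TRADE" }

becomes, once the HQ record 636-544-512 has been consulted:

{ "duns": "636-544-512-0002", "company": "National Networks Corp.", "trade": "National Networks Corp." }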

Scale Meets Reality

The numbers were big: I needed to read ~140 million documents from CouchDB, and about 70 million of them would land in Elasticsearch still needing their company and trade fields enriched.

Initially, Logstash processed ~1,500 records/second, suggesting the whole load would finish in about 1.1 days. But in practice, throughput degraded (one week later, the job still hadn’t completed). Meanwhile, my Camel app began to consume significant memory and CPU during enrichment sweeps.

Those observations told me something important: even with a sound design, the operational behavior of ETL + enrichment at this scale needed a deeper look.
So Part 3 isn’t only about how I built the Camel integration: it’s also about what I learned by watching Logstash under load and Camel in the wild, and which bottlenecks surfaced along the way.

What’s Next

In the next sections, I’ll cover:

  • How the Camel routes orchestrate selective enrichment across Elasticsearch and CouchDB.
  • What I observed when Logstash and Camel ran side by side over millions of documents.
  • The performance and memory challenges I faced – and how they shape the decisions for Part 4.

Building the Camel Integration: Architecture and Setup

After defining the enrichment goal, I moved straight into building the Camel integration (a Spring Boot 3.5.5 application using Apache Camel 4.16.0 and Java 17).
Rather than running inside Docker, I decided to keep it local, so I could iterate faster, debug more easily, and have full control over performance profiling while it interacted with the Dockerized Elastic Stack.

From Concept to Implementation

The idea was simple but powerful: let Logstash handle the heavy ETL work, and let Camel perform smart, incremental enrichment after the initial index was created.

Here’s the hands-on workflow I implemented:

  1. Logstash loads all CouchDB documents into Elasticsearch, marking missing data intentionally with "company": "NO_COMPANY" and "trade": "NO_TRADE".
  2. Camel queries Elasticsearch for documents still containing those markers.
  3. For each hit, it calls CouchDB to retrieve the missing company information.
  4. Then, it updates the original record in Elasticsearch with the correct values via a partial _update request.

This setup lets the system refine itself gradually (no reindexing, no full reloads, just targeted corrections).
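
In Elasticsearch API terms, each cycle boils down to two calls per document. A sketch (index name, page size, and identifiers mirror the configuration and log sample shown later):

# 1. Find branches still carrying a marker
GET /company_idx/_search?q=company:NO_COMPANY&size=2

# 2. Patch a branch in place; every field outside "doc" stays untouched
POST /company_idx/_update/636-544-512-0002
{ "doc": { "company": "National Networks Corp.", "trade": "National Networks Corp." } }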

Project Setup

I created a new Maven project with Spring Boot and Camel starters, using this structure:

docker/
├── src/
│   └── main/
│       ├── java/br/com/acmattos/article/docker
│       │   ├── DockerApplication.java
│       │   └── camel/enricher/
│       │       ├── ElasticsearchRouteBuilder.java
│       │       ├── CouchDBRouteBuilder.java
│       │       └── EnricherRouteBuilder.java
│       └── resources/
│           └── application.properties
└── pom.xml

Maven Configuration

Here’s the relevant part of the pom.xml:

<dependencyManagement>
    <dependencies>
        <!-- Camel BOM -->
        <dependency>
            <groupId>org.apache.camel.springboot</groupId>
            <artifactId>camel-spring-boot-bom</artifactId>
            <version>4.16.0</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
        <!-- Spring Boot BOM -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-dependencies</artifactId>
            <version>3.5.5</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>
<dependencies>
    <!-- Spring Boot essentials -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
        <!-- Exclude the default Logback binding so the Log4j2 starter below
             takes effect (the same exclusion applies to any other starter
             that drags in spring-boot-starter-logging) -->
        <exclusions>
            <exclusion>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-logging</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>
    <!-- Spring Boot and Camel integration -->
    <dependency>
        <groupId>org.apache.camel.springboot</groupId>
        <artifactId>camel-spring-boot-starter</artifactId>
        <version>4.16.0</version>
    </dependency>
    <!-- HTTP and JSON Camel support -->
    <dependency>
        <groupId>org.apache.camel</groupId>
        <artifactId>camel-core</artifactId>
        <version>4.16.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.camel</groupId>
        <artifactId>camel-jsonpath</artifactId>
        <version>4.16.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.camel</groupId>
        <artifactId>camel-http</artifactId>
        <version>4.16.0</version>
    </dependency>
    <!-- Logging -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-log4j2</artifactId>
    </dependency>
    <!-- Optional: Testing -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

This minimal setup gives Camel access to HTTP, JSON, scheduling, and logging: everything I needed to connect to Elasticsearch and CouchDB securely.

Application Configuration

In application.properties, I configured connection details to reach the services already running in Docker:

# Enricher Timer
enricher.timer.period=10000

# CouchDB Configuration
couchdb.host=https://localhost:6984
couchdb.resource=company
couchdb.secure=true
couchdb.ssl-insecure=true
couchdb.username=changeme
couchdb.password=changeme

# Elasticsearch Configuration
elasticsearch.host=https://localhost:9200
elasticsearch.resource=company_idx
elasticsearch.size=2
elasticsearch.secure=true
elasticsearch.ssl-insecure=true
elasticsearch.username=elastic
elasticsearch.password=changeme

The Camel app communicates directly with Docker-exposed ports (9200, 6984), using the same CA certificate generated during the setup in Part 1 to establish secure HTTPS connections.

The Big Picture: Route Design

Once the setup was ready, I designed three main routes:

  1. Elasticsearch Query Route
    Periodically searches for documents with NO_COMPANY.
    Extracts IDs and sends them to a processing queue.
  2. CouchDB Lookup Route
    Receives an ID, fetches the corresponding headquarters document from CouchDB, and extracts company_name.
  3. Elasticsearch Update Route
    Builds a partial update JSON payload and applies it using the _update API.

The routes communicate using direct endpoints to keep everything in-memory and easy to orchestrate.
Each route logs progress and reports how many records were updated in each cycle.

Why This Design?

This approach gave me three key benefits:

  • Isolation: The enrichment process runs independently from Logstash, avoiding interference with the ETL pipeline.
  • Efficiency: Only documents marked with placeholders are fetched and updated, minimizing unnecessary reads and writes.
  • Observability: Running locally with full Spring Boot logging made it easier to spot performance issues and memory leaks while iterating.

With the groundwork done (dependencies, configuration, and route structure), I was ready to dive into the Camel routes themselves and start building the enrichment logic step by step.

Implementing the Camel Routes

Once the configuration was ready, I moved on to what I enjoy the most: building routes.
This is where Apache Camel truly shines: it lets you define integration logic declaratively while you still write clean, maintainable Java code.

For this project, I split the integration into three main route builders:

  1. EnricherRouteBuilder: the orchestrator that periodically triggers the enrichment process.
  2. ElasticsearchRouteBuilder: responsible for finding branch documents that need enrichment and applying updates.
  3. CouchDBRouteBuilder: fetches the missing company data from CouchDB.

Let’s go through them step by step.

The Trigger: EnricherRouteBuilder

@Component
public class EnricherRouteBuilder extends RouteBuilder {
    @Override
    public void configure() throws Exception {
        from("timer://enricher?period={{enricher.timer.period}}")
            .routeId("enricher")
            .log("Starting: ${routeId}")
            .to("direct:findBranchesWithMissingData");
    }
}

This is the entry point of the enrichment pipeline. Every {{enricher.timer.period}} milliseconds, Camel starts a new cycle by sending a message to the direct:findBranchesWithMissingData route.

I like this design because it’s lightweight and event-driven. There’s no external scheduler, no cron jobs, no message broker. Just Camel’s internal timer triggering the entire enrichment loop.
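
As a side note, Camel’s timer component offers a few extra options that are handy while testing. The variation below is a sketch, not part of the project: it delays the first cycle and caps the number of runs so the loop can be exercised in isolation.

@Component
public class EnricherTestRouteBuilder extends RouteBuilder {
    @Override
    public void configure() throws Exception {
        // Hypothetical test trigger: wait 5 s before the first cycle,
        // then run only 10 cycles instead of looping forever.
        from("timer://enricher?period={{enricher.timer.period}}&delay=5000&repeatCount=10")
            .routeId("enricher-test")
            .log("Cycle ${header.CamelTimerCounter}: ${routeId}")
            .to("direct:findBranchesWithMissingData");
    }
}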

Finding and Updating Branches: ElasticsearchRouteBuilder

@Component
public class ElasticsearchRouteBuilder extends RouteBuilder {
    @Autowired
    private EsServerConfig esServerConfig;

    @Override
    public void configure() throws Exception {
        from("direct:findBranchesWithMissingData")
            .routeId("findBranchesWithMissingData")
            .log("Starting: ${routeId}")
            .toD(esServerConfig.toUrl(
                "_search?q=company:NO_COMPANY&size=" + esServerConfig.size() + "&"))
            .split()
            .jsonpath("$.hits.hits[*]._source.duns")
            .setProperty("duns", simple("${body}"))
            .process(exchange -> {
                String duns = exchange.getProperty("duns", String.class);
                if (duns != null && duns.length() >= 11) {
                    exchange.setProperty("hqDuns", duns.substring(0, 11));
                }
            })
            .log("HQ: ${exchangeProperty.hqDuns} - BR: ${body}")
            .to("direct:findHeadquarterData");

        from("direct:updateBranchMissingData")
            .routeId("updateBranchMissingData")
            .log("Starting: ${routeId}")
            .process(exchange -> {
                String duns = exchange.getProperty("duns", String.class);
                String company = exchange.getProperty("company", String.class);
                String updateQuery = "{ \"doc\": { \"company\": \"" + company +
                    "\", \"trade\": \"" + company + "\" } }";
                exchange.getIn().setBody(updateQuery);
                exchange.getIn().setHeader(
                    "CamelElasticsearchIndex", esServerConfig.resource());
                exchange.getIn().setHeader("CamelElasticsearchId", duns);
            })
            .setHeader(Exchange.HTTP_METHOD, constant("POST"))
            .toD(esServerConfig.toUrl("_update/${header.CamelElasticsearchId}?"))
            .log("Branch (${exchangeProperty.duns}) updated: ${body}");
    }
}

This route does most of the heavy lifting.

  1. It first searches Elasticsearch for documents whose company equals NO_COMPANY.
  2. It then splits the search results and processes each record individually.
    Using jsonpath("$.hits.hits[*]._source.duns"), it extracts the duns field (the document identifier shared between branch and headquarters).
  3. From each duns, it infers the headquarters DUNS (the first 11 characters) and stores it as a property.
  4. Finally, it calls the next route: direct:findHeadquarterData to fetch the missing info from CouchDB.

After CouchDB returns the company name, this route’s second half (direct:updateBranchMissingData) takes care of building a partial update payload and posting it to Elasticsearch’s _update/{id} endpoint.

I deliberately used toD() (dynamic endpoint) here, because it allows the URL to be constructed at runtime (a great fit for flexible, data-driven integrations like this).
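
To show the contrast in isolation, here’s a minimal sketch (log endpoints stand in for the real servers; this class is illustrative and not part of the project):

@Component
public class DynamicEndpointDemoRouteBuilder extends RouteBuilder {
    @Override
    public void configure() throws Exception {
        from("direct:demo")
            // to(): the URI is fixed when the route is created
            .to("log:static-endpoint")
            // toD(): the URI is a Simple-language template, resolved per exchange,
            // so each message can land on a different endpoint
            .toD("log:dynamic-endpoint-${header.CamelElasticsearchId}");
    }
}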

Fetching the Source Data: CouchDBRouteBuilder

@Component
public class CouchDBRouteBuilder extends RouteBuilder {
    @Autowired
    private DbServerConfig dbServerConfig;

    @Override
    public void configure() throws Exception {
        from("direct:findHeadquarterData")
            .routeId("findHeadquarterData")
            .log("Starting: ${routeId}")
            .setHeader(Exchange.HTTP_METHOD, constant("GET"))
            .toD(dbServerConfig.toUrl("${exchangeProperty.hqDuns}?"))
            .split()
            .jsonpath("$.company_name")
            .log("company_name: ${body}")
            .setProperty("company", simple("${body}"))
            .to("direct:updateBranchMissingData");
    }
}

This route completes the enrichment cycle: for each branch’s HQ identifier, it queries CouchDB over HTTPS and extracts the authoritative company_name.
It then stores this value as a Camel property (exchangeProperty.company) and hands control back to direct:updateBranchMissingData, which performs the Elasticsearch update.
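
For reference, the headquarters document CouchDB returns looks roughly like this (schematic and abbreviated; the company_name value comes from the log sample below):

GET /company/636-544-512

{
  "_id": "636-544-512",
  "_rev": "1-…",
  "company_name": "National Networks Corp."
}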

Even though the route looks simple, it encapsulates a lot of power: the ability to call secure REST endpoints, parse JSON dynamically, and feed the results into the next route without writing any boilerplate code.

SSL and Configuration Management

The connection details and credentials for both servers are managed through Spring Boot configuration properties.
Each server type (Elasticsearch and CouchDB) has its own record implementing a shared ServerConfig interface:

public interface ServerConfig {
    char SLASH = '/';
    String AUTHENTICATION_BASIC =
        "authenticationPreemptive=true&authMethodPriority=Basic&authMethod=Basic";
    String AUTH_USERNAME = "&authUsername=";
    String AUTH_PASSWORD = "&authPassword=";
    String SSL_INSECURE_PARAMETERS =
        "&sslContextParameters=#insecureSSLContextParameters";

    String host();
    String resource();
    Integer size();
    boolean sslInsecure();
    boolean secure();
    String username();
    String password();

    default String toUrl(String uri) {
        StringBuilder builder = new StringBuilder();
        builder.append(host()).append(SLASH).append(resource())
            .append(SLASH).append(uri);
        if (secure()) {
            builder.append(AUTHENTICATION_BASIC)
                   .append(AUTH_USERNAME).append(username())
                   .append(AUTH_PASSWORD).append(password());
        }
        if (secure() && sslInsecure()) {
            builder.append(SSL_INSECURE_PARAMETERS);
        }
        return builder.toString();
    }
}

This interface builds dynamic URLs that include credentials and SSL flags when required.
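
A quick illustration of what toUrl() produces for the search route, using my property values (EsServerConfig appears in the next section; the snippet itself is just for illustration):

ServerConfig es = new EsServerConfig(
    "https://localhost:9200", "company_idx", 2, true, true,
    "elastic", "changeme");
// es.toUrl("_search?q=company:NO_COMPANY&size=2&") returns:
// https://localhost:9200/company_idx/_search?q=company:NO_COMPANY&size=2
//   &authenticationPreemptive=true&authMethodPriority=Basic&authMethod=Basic
//   &authUsername=elastic&authPassword=changeme
//   &sslContextParameters=#insecureSSLContextParameters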
For development, I enabled an insecure SSL context to trust self-signed certificates from the Docker containers:

@Configuration
public class InsecureSslConfig {
    @Bean("insecureSSLContextParameters")
    public SSLContextParameters insecureSSLContextParameters() {
        var params = new SSLContextParameters();
        var trust = new TrustManagersParameters();
        // Trust-all manager: accepts any server certificate chain (development only)
        trust.setTrustManager(new X509TrustManager() {
            public void checkClientTrusted(X509Certificate[] certs, String authType) {}
            public void checkServerTrusted(X509Certificate[] certs, String authType) {}
            public X509Certificate[] getAcceptedIssuers() { return new X509Certificate[0]; }
        });
        params.setTrustManagers(trust);
        return params;
    }
}

This setup allowed me to keep all calls between Camel and the Dockerized CouchDB/Elasticsearch instances on HTTPS, while confining the relaxed certificate checks to those two endpoints instead of disabling SSL verification globally.

Configuration Binding

Both EsServerConfig and DbServerConfig are bound to properties in application.properties using Spring Boot’s @ConfigurationProperties feature:

@ConfigurationProperties(prefix = "elasticsearch")
public record EsServerConfig(
    String host,
    String resource,
    Integer size,
    boolean secure,
    boolean sslInsecure,
    String username,
    String password
) implements ServerConfig {}

and similarly for CouchDB:

@ConfigurationProperties(prefix = "couchdb")
public record DbServerConfig(
    String host,
    String resource,
    Integer size,
    boolean secure,
    boolean sslInsecure,
    String username,
    String password
) implements ServerConfig {}
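
One binding detail worth mentioning: because these are records (constructor binding, no setters), Spring Boot only picks them up if they are registered. A sketch of what the application class needs, assuming nothing beyond the defaults:

@SpringBootApplication
@ConfigurationPropertiesScan // registers the EsServerConfig and DbServerConfig records
public class DockerApplication {
    public static void main(String[] args) {
        SpringApplication.run(DockerApplication.class, args);
    }
}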

Bringing It All Together

With all three routes in place, the enrichment flow runs automatically and continuously:

  1. Every n milliseconds, the timer route triggers a new enrichment cycle.
  2. Elasticsearch is queried for incomplete records.
  3. For each record, CouchDB is contacted to retrieve the authoritative company name.
  4. Elasticsearch is updated in place – no reindexing required.

Each cycle leaves the index a little more accurate than before.
When I first saw the logs showing branch records being updated live with their correct company_name, I realized how powerful and elegant Apache Camel’s integration model really is (simple routes, declarative patterns, and a clean feedback loop across the entire data pipeline).

Let’s see a small log sample of a complete run of our Apache Camel integration:

enricher                                 : Starting: enricher
findBranchesWithMissingData              : Starting: findBranchesWithMissingData
findBranchesWithMissingData              : HQ: 636-544-512 - BR: 636-544-512-0002
findHeadquarterData                      : Starting: findHeadquarterData
findHeadquarterData                      : company_name: National Networks Corp.
updateBranchMissingData                  : Starting: updateBranchMissingData
updateBranchMissingData                  : Branch (636-544-512-0002) updated: 
{"_index":"company_idx","_id":"636-544-512-0002","_version":4,"result":"updated",
"_shards":{"total":2,"successful":1,"failed":0},"_seq_no":6008,"_primary_term":54}

Next, I’ll discuss what happened when this integration ran at scale alongside Logstash: how throughput evolved, where performance started to degrade, and what lessons I learned that will guide the next phase of this journey.

Analyzing Performance and Bottlenecks

After a few days of running the full pipeline, something wasn’t right. Despite Logstash working continuously, the number of indexed documents didn’t behave as expected. At the initial throughput, the ETL should have finished in little more than a day, but it didn’t.

Instead, I noticed an irregular pattern: periods of rapid ingestion were followed by long stretches of inactivity.
The chart below illustrates exactly what I saw:

At first glance, the data flow seemed to accelerate, then pause, then resume again (as if Logstash were catching its breath).
That was the first problem: the ETL process was not progressing linearly, and even after days of execution, it still hadn’t completed the ingestion phase.

Then came the second problem (and this one was even more confusing). When I looked at the Camel updater, the number of documents marked for enrichment ("company": "NO_COMPANY" or "trade": "NO_TRADE") would decrease for a while, then rise again.
That shouldn’t happen. Once enriched, those records were supposed to remain fixed.

At that point, the two anomalies started to align in my mind. If Logstash was reprocessing data, and Camel’s workload was fluctuating in sync, then the problem wasn’t in the enrichment logic itself (it was in the way data was being continuously reintroduced into the pipeline).

That realization led me to the real cause of the issue.


The CouchDB _changes Stream (and Its Hidden Cost)

The Logstash pipeline was built around the couchdb_changes input plugin, which listens to CouchDB’s _changes feed.
This feature is brilliant in theory: every insert, update, or delete event triggers a change notification that Logstash consumes and pushes to Elasticsearch.

In a stable dataset, it’s perfect: the index stays synchronized automatically, always reflecting the latest data. But in my environment, things were very different.

CouchDB was being updated in large bulk operations: tens of millions of records inserted, followed by massive updates, deletions, and later reinserts.
That constant churning meant the _changes stream never stabilized. Logstash kept reprocessing the same documents over and over again (and every time it did, it overwrote updates that Camel had just applied).
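
The shape of the feed itself makes the problem easy to see (schematic; sequence tokens and revisions abbreviated). Every update re-emits the document id with a new revision, so bulk updates replay the same ids over and over:

GET /company/_changes?since=0

{
  "results": [
    { "seq": "1-…", "id": "636-544-512", "changes": [ { "rev": "1-…" } ] },
    { "seq": "2-…", "id": "636-544-512", "changes": [ { "rev": "2-…" } ] }
  ],
  "last_seq": "2-…"
}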

A Self-Defeating Feedback Loop

This created a kind of “integration tug-of-war”:

  • Logstash continuously reindexed documents from bulk inserts.
  • Camel tried to enrich those documents afterward.
  • Then CouchDB triggered another _changes wave, causing Logstash to reimport the same data, erasing the enrichment.

At the same time, Camel’s local process consumed massive amounts of memory (each enrichment cycle fetching, parsing, and updating thousands of documents in parallel). Eventually, I started seeing familiar red flags in the logs:

java.lang.OutOfMemoryError: Java heap space
[WARN ] enricher - Elasticsearch HTTP request failed (429 Too Many Requests)
[INFO ] findBranchesWithMissingData - Retrying query...

It was a perfect storm: Logstash was overloaded, CouchDB was streaming incessantly, Elasticsearch was juggling updates, and Camel was running out of heap space.

Conclusion

By this point, it was clear: the problem wasn’t in Camel’s logic or in Logstash’s configuration (it was in the model itself). The couchdb_changes plugin works beautifully for continuous synchronization, but it doesn’t scale when CouchDB generates millions of rapid-fire changes.

In this environment, the enrichment process simply couldn’t keep up with the constant reindexing and data churn. It became clear that I needed a new ETL strategy: one capable of handling bulk loads efficiently, processing static snapshots instead of endless change streams, and giving Camel stable data to enrich.

That realization marked the end of this phase of the journey and the beginning of the next one.

You can download a version of the code showcased in this article from my GitHub repository.

If you want to review the Logstash ETL process, take a look at Part 2.

Thank you for reading this article!
