My DevOps Adventure: Building with Apache Camel, CouchDB, and Elasticsearch (Part 4)

After my experience with the previous architecture (Logstash managing ingestion and Camel enriching documents), I reached a point where both processes were competing instead of cooperating.
The couchdb_changes feed kept firing endless update events, and Logstash’s incremental ingestion model no longer made sense for my workload. I needed more control. I needed predictability.

That’s when I decided to go all-in with Apache Camel: not just for enrichment, but as the core ETL engine.

Rethinking the ETL

I kept the same infrastructure as before: CouchDB and Elasticsearch running inside Docker, securely connected via TLS, both using the same volumes and certificates established in the first part of this series.

The only thing that changed was the data flow. Instead of streaming updates from CouchDB through the _changes feed, I switched to pulling documents directly from the database, in controlled batches,
using Camel routes.

The simplicity I had seen when building the enricher with Camel convinced me that the same approach could work beautifully for ETL too. With just a few lines of Java, I could read data from CouchDB, process it in-flight, and push it into Elasticsearch: all without external dependencies or heavy plugins.

Choosing the Right Strategy: startkey vs skip/limit

CouchDB offers two main ways to page through data:

  1. skip + limit: where each page skips N documents before fetching the next batch.
    It’s easy to implement, but inefficient at scale: CouchDB still has to scan all the skipped documents internally before returning results, so query cost grows linearly with the offset.
  2. startkey + limit: where each page starts from the last document key of the previous batch.
    This method leverages CouchDB’s underlying B-tree index and allows true incremental reads with minimal overhead.

I chose the second approach (startkey pagination) for one simple reason: it scales linearly.
It doesn’t matter whether I’m processing 10,000 or 10 million documents: the query cost remains predictable, and there’s no performance penalty from skipped offsets.

With this method, I could consume all documents directly from _all_docs, one range at a time, always starting from the last processed key:

GET /company/_all_docs?include_docs=true&startkey="000000123"&limit=10000
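In plain Java, building the request for one page can be sketched like this (the quoted-and-URL-encoded startkey follows the request above; the database name and the helper itself are illustrative):

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

public class CouchPaging {
    // Builds the _all_docs request path for one page, starting at the given key.
    // CouchDB expects startkey as a JSON value, hence the surrounding quotes,
    // which must then be URL-encoded (%22).
    static String pageUrl(String db, String startkey, int limit) {
        String quotedKey = URLEncoder.encode("\"" + startkey + "\"", StandardCharsets.UTF_8);
        return "/" + db + "/_all_docs?include_docs=true"
                + "&startkey=" + quotedKey
                + "&limit=" + limit;
    }

    public static void main(String[] args) {
        System.out.println(pageUrl("company", "000000123", 10000));
    }
}
```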

Each batch would feed into Camel, which handled transformation and indexing into Elasticsearch just as before (only now, it did so without relying on CouchDB’s change feed or Logstash’s streaming logic).

This shift meant that every record fetched was already stable and up to date. No replays, no overwritten enrichments, no wasted cycles.

Why This Approach Matters

This new ETL process gave me three immediate advantages:

  • Full control over ingestion pace: I could tune batch sizes and throttling based on available memory and Elasticsearch throughput.
  • Deterministic processing: no more unpredictable replays from _changes.
  • Seamless integration: Camel’s routing model made it easy to reuse processors, transformations, and error handling patterns from the previous enricher project.

It also simplified observability: since Camel and Elasticsearch both expose clear metrics via Spring Boot Actuator, I could now monitor ingestion rate, latency, and memory behavior in real time.

Memory Insights and Early Optimizations

Once I had the ETL up and running, I turned my attention back to the enricher.
After all, both pipelines now shared the same runtime and libraries – and I was still facing occasional OutOfMemoryError exceptions during long runs.

Using VisualVM, I inspected the heap and found that most of the retained objects were String and byte[] instances, created continuously during JSON parsing and HTTP payload construction.
They lingered far longer in memory than expected, surviving multiple GC cycles, and eventually filling the heap.

I decided to rework parts of the enricher to minimize temporary allocations, reduce long-lived references, and make route processing more efficient.
But before diving into those improvements, I wanted to be sure the ETL itself was as lean and stable as possible (consuming CouchDB data at a steady rate, without overwhelming memory or CPU).

In the next section, I’ll walk through the Camel ETL implementation: how it connects to CouchDB, paginates using startkey, and streams documents efficiently into Elasticsearch.

Implementing the Camel ETL (Startkey + Limit)

After moving away from _changes, I rebuilt the ETL pipeline using Apache Camel + Spring Boot, this time pulling documents directly from CouchDB in predictable batches.
Pagination was handled via the startkey + limit approach (giving me complete control over the flow and predictable throughput).

Here’s how the pipeline works:

  1. Bootstrap: A timer starts each ETL route with its own startkey (e.g., 00000000, 22222222, etc.), defining the logical “route” name for logging and metrics.
  2. CouchDB Pagination: The route dynamically builds _all_docs?include_docs=true&startkey=...&limit=... and retrieves the next page of documents.
  3. Transformation: Each CouchDB document is mapped to a lightweight Company object with normalized fields.
  4. Batch & Bulk: Documents are aggregated in configurable batches and sent to Elasticsearch’s _bulk endpoint.
  5. Memory Cleanup: Each cycle explicitly clears payloads and properties to assist the Garbage Collector in long-running executions.

1) Bootstrapping Each Flow

The first route starts the pipeline by triggering a one-time timer.
Each ETL flow defines a starting startkey and can be enabled or disabled through configuration.

from("timer://bootstrapEtlForRouteOne?repeatCount=1")
  .routeId("bootstrapRouteOne")
  .setHeader("route", constant("RouteOne"))
  .setHeader("startkey", constant(etlConf.startkeyRoute1()))
  .process(e -> e.setProperty("enable", etlConf.enable()))
  .choice()
    .when(exchangeProperty("enable").isEqualTo(true))
      .log("RouteOne: enabled!")
      .to("direct:findAllCompanies")
    .otherwise()
      .log("RouteOne: disabled!")
  .end();

Using four startkeys (00000000, 22222222, 44444444, 66666666) allows four routes to run in parallel, effectively sharding the CouchDB keyspace and roughly quadrupling throughput.
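To make the sharding concrete, here is one way to pair each route's startkey with an exclusive upper bound so the ranges never overlap (the endkey boundaries are my assumption; the article only lists the four startkeys):

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class KeyspaceShards {
    // Pairs each route's startkey with the next route's startkey as an
    // exclusive upper bound; the last shard is open-ended (null bound).
    static Map<String, String> shardBounds(List<String> startkeys) {
        Map<String, String> bounds = new LinkedHashMap<>();
        for (int i = 0; i < startkeys.size(); i++) {
            String end = (i + 1 < startkeys.size()) ? startkeys.get(i + 1) : null;
            bounds.put(startkeys.get(i), end);
        }
        return bounds;
    }

    public static void main(String[] args) {
        System.out.println(shardBounds(List.of("00000000", "22222222", "44444444", "66666666")));
    }
}
```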

2) Reading Pages from CouchDB

The next route handles page-by-page retrieval from CouchDB using the startkey + limit approach.

from("direct:findAllCompanies")
  .routeId("findAllCompanies")
  .process(this::setInitialProperties)
  .process(this::buildDbUri)
  .toD(dbServerConfig.toUrl("${header.couchdb.dynamicUrl}"))
  .unmarshal().json(JsonLibrary.Jackson)
  .process(this::extractRowsFromDb)
  .split(simple("${exchangeProperty.rows}"))
    .streaming()
    .parallelProcessing()
    .process(this::doc2Branch)
    .to("direct:saveBranches")
  .end()
  .process(this::logEndOfCouchDBRetrieval)
  .choice()
    .when(exchangeProperty("hasNextPage").isEqualTo(true))
      .to("direct:findAllCompanies")
    .otherwise()
      .process(this::logEndOfRoute);

Each iteration:

  • Retrieves a page of documents from _all_docs.
  • Determines the next page’s startkey based on the last document ID.
  • Processes documents in parallel, mapping them to the domain model.
  • Sends batches downstream to Elasticsearch for bulk indexing.

This approach avoids CouchDB’s skip/limit penalty – startkey leverages the B-tree index directly, so each page lookup costs O(log n) rather than growing with the offset, keeping performance flat across millions of records.
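The "determine the next page's startkey" step can be sketched without Camel (a minimal model of the idea; the real processor works on the parsed _all_docs JSON, and since startkey is inclusive the real code must also skip or deduplicate the boundary row):

```java
import java.util.List;

public class PageCursor {
    final boolean hasNextPage;
    final String nextStartkey;

    // A page that returned exactly `limit` rows is assumed to have more data;
    // the next request then starts from the last document id seen.
    PageCursor(List<String> pageIds, int limit) {
        this.hasNextPage = pageIds.size() == limit;
        this.nextStartkey = pageIds.isEmpty() ? null : pageIds.get(pageIds.size() - 1);
    }

    public static void main(String[] args) {
        PageCursor c = new PageCursor(List.of("000000123", "000000456"), 2);
        System.out.println(c.hasNextPage + " " + c.nextStartkey);
    }
}
```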

3) Transforming Each Document

Every CouchDB document is converted into a Company record.
Empty or missing values are normalized, and the address is composed on the fly.

record Company(String duns, String company, String trade, String address) {}

private ObjectNode getJsonNodes(JsonNode doc) {
    ObjectNode node = doc.deepCopy();
    String duns = doc.get("duns").asText();
    String companyName = textOr(doc.get("company_name"), "NO_COMPANY");
    String tradeName   = textOr(doc.get("trade_name"),   "NO_TRADE");
    node.put("duns", duns);
    node.put("company", companyName);
    node.put("trade", tradeName);
    node.put("headquarter", duns.length() < 11);
    putEndereco(doc, node); // composes the address field from the raw document
    return node;
}

// Falls back to a default when the field is missing, null, or blank.
private String textOr(JsonNode field, String fallback) {
    return (field == null || field.isNull() || field.asText().isBlank())
            ? fallback
            : field.asText();
}

The goal was to keep processing allocation-light: no unnecessary transient objects, no deep JSON parsing overhead, and careful use of StringBuilder for address construction.

4) Bulk Indexing into Elasticsearch

Once documents are processed, the route aggregates them into batches and sends a single JSON payload to Elasticsearch’s _bulk endpoint.

from("direct:saveBranches")
  .routeId("saveBranches")
  // CompanyListAggregationStrategy is a custom strategy that collects
  // Company objects into a list (AggregationStrategy itself is an interface)
  .aggregate(constant(true), new CompanyListAggregationStrategy())
    .completionSize(etlConfig.completionSize())
    .completionTimeout(etlConfig.completionTimeout())
  .process(this::prepareBulkRequest)
  .filter(e -> !Boolean.TRUE.equals(e.getProperty("skipElasticsearch")))
  .toD(esServerConfig.toUrl("_bulk?"))
  .process(this::freeResources);

Bulk payload generation is fully manual for maximum efficiency:

// "pessoas" holds the aggregated batch of Company records
StringBuilder payload = new StringBuilder(pessoas.size() * 100);
for (Company pj : pessoas) {
    // _bulk expects NDJSON: an action line followed by the document line
    payload.append("{\"index\":{\"_id\":\"").append(pj.duns()).append("\"}}\n");
    payload.append("{\"duns\":\"").append(pj.duns()).append("\",")
           .append("\"trade\":\"").append(pj.trade()).append("\",")
           .append("\"company\":\"").append(pj.company()).append("\",")
           .append("\"address\":\"").append(pj.address()).append("\",")
           .append("\"@timestamp\":\"").append(Instant.now()).append("\",")
           .append("\"@version\":\"1\"}\n");
}
exchange.getMessage().setBody(payload.toString());
exchange.getMessage().setHeader(Exchange.HTTP_METHOD, POST);

Building the JSON manually reduces object churn, avoids unnecessary JSON serialization, and keeps GC pressure under control during sustained ingestion.
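One caveat with manual concatenation: it assumes field values contain no characters that would break the JSON. If that cannot be guaranteed upstream, a small escaper (my addition, not in the original code) can be applied to each value before appending:

```java
public class JsonEscape {
    // Escapes the characters that would break a hand-built JSON string:
    // backslash, double quote, and control characters.
    static String escape(String s) {
        StringBuilder sb = new StringBuilder(s.length() + 8);
        for (int i = 0; i < s.length(); i++) {
            char c = s.charAt(i);
            switch (c) {
                case '\\' -> sb.append("\\\\");
                case '"'  -> sb.append("\\\"");
                case '\n' -> sb.append("\\n");
                case '\r' -> sb.append("\\r");
                case '\t' -> sb.append("\\t");
                default -> {
                    if (c < 0x20) sb.append(String.format("\\u%04x", (int) c));
                    else sb.append(c);
                }
            }
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(escape("ACME \"North\" Ltd."));
    }
}
```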

5) Error Handling and GC Optimization

  • Bulk error handling:
    All HttpOperationFailedException responses (e.g., 400, 429) are caught and logged with the raw Elasticsearch response body for easy troubleshooting.
  • Memory release:
    After each batch, freeResources(...) clears the exchange body, headers, and temporary properties – preventing them from holding on to large char[] arrays longer than necessary.

Quick Technical Notes

  • Configuration prefix:
    The Spring property prefix is etl:
  @ConfigurationProperties(prefix = "etl")
  public record EtlConfig(...) {}
  • Parallel routing:
    When etl.enable-extra-routes=true, four ETL timers will start in parallel (each handling a distinct range of CouchDB keys) multiplying ingestion speed without contention.

At this stage, the Camel-based ETL is deterministic, memory-efficient, and fully controlled.
It consistently consumes CouchDB data at a steady rate, bypassing the unpredictability of _changes and eliminating Logstash’s overhead.

Next, I’ll focus on memory tuning for the enricher process: sharing the optimizations that finally eliminated the OutOfMemoryError issues observed earlier.

Optimizing Memory and Throughput in the Camel Enricher

Once the new ETL was stable, I turned my attention back to the enricher.
The first implementation worked, but under heavy load, it consumed far more memory than expected. Over long runs, the heap would slowly fill up (and eventually, OutOfMemoryError would strike).

The root cause wasn’t a mystery anymore. VisualVM showed exactly what I suspected: a constant growth of String and byte[] objects surviving multiple GC cycles. Each HTTP call to Elasticsearch or CouchDB created new payloads, response buffers, and transient JSON strings that lingered too long in memory.

It was time to rethink the way the enricher handled concurrency, object lifetime, and memory cleanup.

Parallelization and Controlled Concurrency

The first improvement was to give the enricher its own managed thread pool, instead of relying on Camel’s default internal executor.

@Bean(name="threadPoolExecutorService")
public ExecutorService threadPoolExecutorService() {
    return new ThreadPoolExecutor(
        8,   // Number of threads to keep in the pool
        8,   // Maximum number of threads
        60L, // Time for idle threads before termination
        TimeUnit.SECONDS,
        new ArrayBlockingQueue<>(64),
        new ThreadPoolExecutor.CallerRunsPolicy()
    );
}

By explicitly defining a pool with bounded capacity and a fallback policy (CallerRunsPolicy), I gained full control over concurrency. No more unbounded thread creation, and no more sudden spikes in memory allocation caused by hundreds of concurrent JSON responses.

This change alone made the enricher predictable under load (each task was processed within a fixed number of threads, with backpressure applied automatically when the system was saturated).
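The backpressure behavior is easy to demonstrate in isolation: with CallerRunsPolicy, overflow tasks run on the submitting thread instead of being rejected, so every task completes (pool sizes scaled down here from the enricher's 8 threads / 64-slot queue):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class BackpressureDemo {
    // Submits `tasks` jobs to a deliberately small pool and returns how many ran.
    static int runAll(int tasks) throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 2, 60L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(4),                // small bounded queue
                new ThreadPoolExecutor.CallerRunsPolicy()); // overflow runs on the caller
        AtomicInteger done = new AtomicInteger();
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                try { Thread.sleep(2); } catch (InterruptedException ignored) { }
                done.incrementAndGet();
            });
        }
        pool.shutdown();
        pool.awaitTermination(30, TimeUnit.SECONDS);
        return done.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("completed: " + runAll(100)); // prints "completed: 100"
    }
}
```

None of the 100 tasks is dropped, even though the queue holds only 4: the caller absorbs the overflow, which is exactly the throttling effect described above.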

Smarter Parallel Splitting

In the original version, the enricher iterated sequentially over Elasticsearch results.
In the new design, I introduced parallel splits using the same ExecutorService to fully utilize CPU cores:

.split(body())
    .parallelProcessing()
    .executorService(threadPoolExecutorService)
    .setHeader("asc", simple("${body}"))
    .to("direct:findBranchesWithMissingData")
.end();

This split pattern allowed Camel to handle multiple “directions” of enrichment (ascending and descending) in parallel, cutting the total enrichment time nearly in half.
Each sub-route worked independently, reading different Elasticsearch segments and writing updates concurrently, while still using the shared executor for coordination.

The result: the enricher now scaled horizontally within the same JVM, without overwhelming memory.

Resource Cleanup and Short-Lived Objects

Another major source of memory pressure came from objects that lingered longer than they should.
Each route in Camel creates an Exchange, which can hold headers, properties, and bodies that stay alive until the route completes.

I started explicitly clearing what I no longer needed, right inside the route:

private void freeResources(Exchange exchange) {
    // Drop every property and header so nothing pins the large payloads
    exchange.removeProperties("*");
    exchange.getMessage().removeHeaders("*");
    // Null the body to release the char[]/byte[] buffers behind it
    exchange.getMessage().setBody(null);
    // Detach the message object itself; safe here because this runs
    // as the route's final step
    exchange.setMessage(null);
}

By clearing message bodies and headers after each Elasticsearch update, I drastically reduced the number of retained String objects and freed references early, allowing the garbage collector to reclaim memory more aggressively.

This small change eliminated the slow heap buildup observed in earlier versions.

Aggregating Exchanges and Reducing Object Churn

While analyzing the enricher under load, I realized that a significant portion of CPU and GC activity came from the constant creation of small objects (especially temporary String instances and short-lived Exchange messages).
Each document update triggered a new HTTP call and a fresh Exchange context, resulting in frequent allocation and cleanup cycles.

To reduce this overhead, I introduced a batching layer using Camel’s aggregate() pattern with a custom ExchangeAggregationStrategy. Instead of processing each branch update individually, I grouped up to N updates together before sending them in a single request to Elasticsearch.

.aggregate(constant(true), new ExchangeAggregationStrategy())
.completionSize(100)
.completionTimeout(2000)
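In plain terms, these two settings mean: flush the batch when it reaches completionSize items, or when completionTimeout elapses first. The size-based half can be mimicked in a few lines (an illustration of the semantics only, not Camel's implementation):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class SizeBatcher<T> {
    private final int completionSize;
    private final Consumer<List<T>> flushAction;
    private List<T> batch = new ArrayList<>();

    SizeBatcher(int completionSize, Consumer<List<T>> flushAction) {
        this.completionSize = completionSize;
        this.flushAction = flushAction;
    }

    void add(T item) {
        batch.add(item);
        if (batch.size() >= completionSize) flush();
    }

    // In Camel, a timer fires this for the completionTimeout case.
    void flush() {
        if (!batch.isEmpty()) {
            flushAction.accept(batch);
            batch = new ArrayList<>(); // hand the full list off; start fresh
        }
    }

    public static void main(String[] args) {
        SizeBatcher<Integer> b = new SizeBatcher<>(3, batch -> System.out.println("flush of " + batch.size()));
        for (int i = 0; i < 7; i++) b.add(i);
        b.flush(); // drain the remainder
    }
}
```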

The ExchangeAggregationStrategy implementation is intentionally minimal: it reuses existing Exchange objects rather than copying or transforming them:

public class ExchangeAggregationStrategy implements AggregationStrategy {
    @Override
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        if (oldExchange == null) {
            List<Exchange> list = new ArrayList<>();
            newExchange.setProperty(Exchange.GROUPED_EXCHANGE, list);
            list.add(newExchange);
            return newExchange;
        } else {
            List<Exchange> list = oldExchange.getProperty(
                Exchange.GROUPED_EXCHANGE, List.class);
            list.add(newExchange);
            return oldExchange;
        }
    }
}

When the batch is complete, the aggregated exchanges are combined into a single bulk payload, which is then sent to Elasticsearch:

.process(exchange -> {
    @SuppressWarnings("unchecked")
    List<Exchange> exchanges = exchange.getProperty(Exchange.GROUPED_EXCHANGE, List.class);
    StringBuilder bulkPayload = new StringBuilder();
    for (Exchange e : exchanges) {
        String duns = e.getProperty(DUNS, String.class);
        String company = e.getProperty(COMPANY, String.class);
        bulkPayload.append(ID.formatted(duns))
                   .append(PAYLOAD.formatted(company, company));
    }
    exchange.getIn().setBody(bulkPayload.toString());
    bulkPayload.setLength(0);
    exchanges.clear();
})
.setHeader(Exchange.HTTP_METHOD, constant(POST))
.toD(esServerConfig.toUrl(BULK));

This small structural change achieved three big wins:

  1. Fewer allocations: fewer Exchange instances, fewer JSON strings, and a single StringBuilder reused for each bulk request.
  2. Lower GC pressure: bulk payloads are short-lived, allowing faster reclamation.
  3. Higher throughput: batching reduced the number of HTTP calls to Elasticsearch by roughly N×.

Additionally, I started replacing repeated literal values with static constants (for example, DUNS, COMPANY, PAYLOAD, BULK).
This further reduced the creation of redundant String objects at runtime and simplified maintenance.
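For reference, the ID and PAYLOAD constants used in the bulk processor above would look along these lines (the exact templates are my reconstruction; only the constant names appear in the article):

```java
public class BulkTemplates {
    // Action line and document line for one _bulk entry (NDJSON: one JSON per line).
    static final String ID      = "{\"index\":{\"_id\":\"%s\"}}\n";
    static final String PAYLOAD = "{\"company\":\"%s\",\"trade\":\"%s\"}\n";

    public static void main(String[] args) {
        // The constants live once in the class; only the filled-in entries are transient.
        String entry = ID.formatted("123456789") + PAYLOAD.formatted("ACME", "ACME Retail");
        System.out.print(entry);
    }
}
```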

Together, these changes (batching with aggregation and constant reuse) significantly reduced CPU and heap usage, making the enricher both faster and more memory-efficient.

Heap Evolution and Memory Stability

To verify the impact of these optimizations, I profiled the enricher and ETL pipelines with VisualVM over several execution cycles.
The results were striking: the memory footprint and GC behavior evolved dramatically as the code matured.

Early Versions (Continuous Heap Growth)

The first runs showed a clear pattern of gradual heap growth with minimal GC recovery.
Strings, byte buffers, and Exchange instances accumulated faster than the collector could reclaim them.

In the heap profile, each “plateau” corresponds to a GC event that couldn’t fully clean the heap –
a classic symptom of long-lived references and unbounded thread concurrency.

After Parallelization and Resource Cleanup

Once the dedicated ExecutorService and explicit cleanup were added, the heap curve became far more stable. Memory usage still oscillated, but the GC cycles now succeeded in reclaiming most of the allocated space.

The difference was already noticeable: heap peaks were lower, and the system maintained a steady rhythm of allocation and reclamation.

Aggregation + Constant Reuse

Introducing the ExchangeAggregator and constant reuse was the real breakthrough. Batching 500 updates per request drastically reduced both heap churn and CPU time spent in GC.

The GC curve flattened considerably: proof that fewer transient objects were being created and retained.

Final Iteration (Stable and Efficient Heap)

After combining all improvements (parallel processing, aggregation, cleanup, and constant reuse) the enricher achieved near-ideal memory behavior.
The heap oscillated within a predictable range, and GC activity became almost imperceptible during long runs.

This stability allowed the JVM to handle massive enrichment workloads without memory exhaustion, even as the dataset scaled into the tens of millions.


Summary of Memory Optimization Impact

  • Dedicated ExecutorService: controlled concurrency → eliminated unbounded thread creation
  • Explicit cleanup (freeResources): shorter object lifetimes → reduced retained String and Exchange instances
  • ExchangeAggregationStrategy: batched processing → lower GC frequency and faster throughput
  • Constant reuse: reduced String churn → smaller permanent heap footprint

The evolution of the heap over time reflected exactly what I was aiming for: a system that not only runs fast but stays stable under continuous load.

Conclusion: From Chaos to Control

Looking back, the journey from the initial Logstash-based ingestion to the fully self-contained Camel ETL + Enricher pipeline was more than a refactor: it was a complete shift in control, visibility, and efficiency.

At the beginning, CouchDB’s _changes feed seemed like the perfect way to keep Elasticsearch synchronized in real time. But as the data volume grew and mass updates began flooding the feed, that reactive model quickly turned against me. Logstash was doing exactly what it was built for (continuously streaming) but that wasn’t what my workload needed.

By moving the entire ETL process into Apache Camel, I gained something the original stack never gave me: determinism.
Each document was fetched once, transformed once, and indexed once (under my control, at my pace).

Then, iteration by iteration, the enricher evolved too:

  • A dedicated thread pool brought predictable concurrency.
  • Explicit cleanup kept memory under control.
  • Aggregation eliminated unnecessary object churn.
  • Constant reuse squeezed the last few drops of performance out of the JVM.

At this stage, both the ETL and the enricher were finally in harmony: each working predictably, efficiently, and transparently.
No more unbounded event streams, no more OOM errors, no more uncertainty. Just clean, steady data flowing through a pipeline I could trust.

You can download a version of the code showcased in this article from my GitHub repository.

If you want to review how the ETL process was first created, take a look at Part 3.

Thank you for reading this article!
