Building Data Pipelines

Problem Statement

There are various use cases that require consuming data from multiple source streams (Kafka, SQS, RMQ) and populating stores such as Redis, DynamoDB, MySQL, S3, HTTP REST endpoints, and Kafka with minimal processing (no heavy business logic). The data copy can be real-time streaming or batch. We therefore need a generic framework to ship data from any source to any sink reliably. A source or sink can also be an HTTP endpoint, which covers the use case of configuring HTTP webhooks (dumping request data to Kafka and processing it asynchronously). We should be able to deploy a service (use case) by simply configuring the source and sink endpoints and writing some simple SerDe code.

Background

When it comes to handling data flowing to and from a variety of data sources, Flume comes to the rescue for a wide variety of use cases.

  1. Reliable data ingestion from different types of sources (RMQ, Kafka, MySQL, SQS) to different types of stores (S3, DynamoDB, ScyllaDB, Redis, FCM, Kinesis).
  2. Reliable data aggregation, collecting data from multiple sources and storing it in one centralized store (for example, Kafka to S3).
  3. Data enrichment pipelines, where we need to read data from multiple streams, join them, and store the result in one place (such as Redis or MySQL). For example, enriching the data for a specific entity whose state/data comes from multiple topics across multiple services.
  4. Data multiplexing: replicating data to multiple data stores with filtering. This is useful when we need to take multiple actions for the same event, for example 1. call an HTTP REST API, 2. call Comm-Platform for push notifications, 3. call the data platform client for instrumentation. All of these actions are configured as adaptors.
  5. ETL pipelines, for example loading Spark job output into DynamoDB.
  6. Webhooks.
  7. Service-to-service communication with more reliable retries and back-pressure support.

Why Flume

  1. Reliable: Flume offers reliable message delivery. It uses a transactional approach to guarantee delivery of events, where two transactions are maintained for each message, one on the sender side and one on the receiver side.
  2. Fault tolerant and horizontally scalable: most of Flume's source and channel implementations are distributed and scale horizontally.
  3. Extensible: Flume is built around interfaces, so we can easily implement custom plugins and configure them at runtime; the plugins and the core engine are completely decoupled.
  4. High throughput, low latency: Flume is a very lightweight agent and adds minimal overhead while ingesting data from source to sink; we have tested and scaled it up to 2M events per second from Kafka to multiple sinks such as S3, HDFS, and ScyllaDB.
  5. Data flow: Flume follows the pipeline design pattern, where sources and sinks are connected by a buffer (the channel). Flume works with streaming data sources that continuously produce data. When the rate of incoming data exceeds the rate at which it can be written to the destination, the channel buffers the events and shields the destination from transient spikes.
  6. Contextual routing: Flume supports contextual routing based on headers in incoming events, and can multiplex or replicate events across routes.
  7. Multi-hop flows: Flume can be used as a multi-hop data flow agent with simple declarative configuration; for example, the dp event flow from an application can be configured as http source => Kafka sink => S3.
  8. Fan-in, fan-out flows: fan-in and fan-out data flows can be configured using multiplexing channels.
  9. Config: Flume agents are deployed with a declarative configuration file (a minimal sketch follows this list).
  10. Metrics / monitoring: Flume emits metrics for every component through JMX, which can be easily scraped into Prometheus.
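As a minimal sketch of such a declarative config (the agent name, port, topic, and broker address below are illustrative placeholders, not values from our deployment), an HTTP-source-to-Kafka-sink agent, similar to the webhook use case above, could look like this:

    # Components of the agent (all names are placeholders)
    webhook.sources  = http-src
    webhook.channels = mem-ch
    webhook.sinks    = kafka-snk

    # HTTP source: receives events POSTed by clients
    webhook.sources.http-src.type     = http
    webhook.sources.http-src.port     = 8080
    webhook.sources.http-src.channels = mem-ch

    # Memory channel: buffers events between source and sink
    webhook.channels.mem-ch.type                = memory
    webhook.channels.mem-ch.capacity            = 10000
    webhook.channels.mem-ch.transactionCapacity = 1000

    # Kafka sink: writes events to a Kafka topic
    webhook.sinks.kafka-snk.type                    = org.apache.flume.sink.kafka.KafkaSink
    webhook.sinks.kafka-snk.kafka.bootstrap.servers = broker1:9092
    webhook.sinks.kafka-snk.kafka.topic             = webhook-events
    webhook.sinks.kafka-snk.channel                 = mem-ch

The agent is started with the flume-ng launcher (flume-ng agent --conf-file webhook.conf --name webhook), and the JMX metrics mentioned in item 10 can be exposed over HTTP with -Dflume.monitoring.type=http -Dflume.monitoring.port=<port>.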

Flume architecture

Core Concepts

The purpose of Flume is to provide a distributed, reliable, and available system for efficiently collecting, aggregating and moving large amounts of log data from many different sources to a centralized data store. The architecture of Flume NG is based on a few concepts that together help achieve this objective. Some of these concepts have existed in the past implementation, but have changed drastically. Here is a summary of concepts that Flume NG introduces, redefines, or reused from earlier implementation:

Event: A byte payload with optional string headers that represents the unit of data that Flume can transport from its point of origination to its final destination.

Flow: Movement of events from the point of origin to their final destination is considered a data flow, or simply flow. This is not a rigorous definition and is used only at a high level for description purposes.

Agent: An independent process that hosts flume components such as sources, channels and sinks, and thus has the ability to receive, store and forward events to their next-hop destination.

Source: An interface implementation that can consume events delivered to it via a specific mechanism. For example, an Http source is a source implementation that can be used to receive http events from clients or other agents in the flow. When a source receives an event, it hands it over to one or more channels. There are two types of Flume sources: PollableSource, which the framework calls repeatedly at an interval (for example, the Kafka source), and EventDrivenSource, which is triggered by an external event (for example, the Http source).
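To make the distinction concrete, here is a small configuration sketch (the agent, channel, topic names, and addresses are illustrative placeholders) showing both source types side by side: the Kafka source is polled repeatedly by the framework, while the HTTP source only produces events when a client sends a request.

    # Pollable source: the framework repeatedly invokes the Kafka source, which pulls records
    agent.sources.kafka-src.type                    = org.apache.flume.source.kafka.KafkaSource
    agent.sources.kafka-src.kafka.bootstrap.servers = broker1:9092
    agent.sources.kafka-src.kafka.topics            = orders
    agent.sources.kafka-src.channels                = ch1

    # Event-driven source: the embedded HTTP server pushes events into the channel as requests arrive
    agent.sources.http-src.type     = http
    agent.sources.http-src.port     = 8081
    agent.sources.http-src.channels = ch1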

Channel: A transient store for events, where events are delivered to the channel via sources operating within the agent. An event put in a channel stays in that channel until a sink removes it for further transport. An example of a channel is the JDBC channel that uses a file-system backed embedded database to persist the events until they are removed by a sink. Channels play an important role in ensuring durability of the flows.

Sink: An interface implementation that can remove events from a channel and transmit them to the next agent in the flow, or to the event’s final destination. Sinks that transmit the event to its final destination are also known as terminal sinks. The Flume S3 sink is an example of a terminal sink. Whereas the Flume Http sink is an example of a regular sink that can transmit messages to other agents that are running an Http source.

A Flume source consumes events delivered to it by an external source like a web server. The external source sends events to Flume in a format that is recognized by the target Flume source. For example, an Http Flume source can be used to receive http events from http clients or other Flume agents in the flow that send events from an http sink. When a Flume source receives an event, it stores it into one or more channels. The channel is a passive store that keeps the event until it’s consumed by a Flume sink. The file channel is one example — it is backed by the local filesystem. The sink removes the event from the channel and puts it into an external repository like S3 (via Flume S3 sink) or forwards it to the Flume source of the next Flume agent (next hop) in the flow. The source and sink within the given agent run asynchronously with the events staged in the channel. (official documentation)

Multi hop agent example

Flume allows a user to build multi-hop flows where events travel through multiple agents before reaching the final destination. It also allows fan-in and fan-out flows and contextual routing. In a multi-agent flow, the sink of one agent becomes the source of the next agent.
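A sketch of such a two-hop flow (hostnames, ports, bucket, and paths are placeholders; the S3 destination is shown via the HDFS sink writing to an s3a:// path, which is one common way to target S3 with stock Flume components):

    # --- agent1: http source -> file channel -> avro sink (next hop) ---
    agent1.sources  = http-src
    agent1.channels = file-ch
    agent1.sinks    = avro-snk

    agent1.sources.http-src.type     = http
    agent1.sources.http-src.port     = 8080
    agent1.sources.http-src.channels = file-ch

    agent1.channels.file-ch.type          = file
    agent1.channels.file-ch.checkpointDir = /data/flume/checkpoint
    agent1.channels.file-ch.dataDirs      = /data/flume/data

    agent1.sinks.avro-snk.type     = avro
    agent1.sinks.avro-snk.hostname = collector-host
    agent1.sinks.avro-snk.port     = 4545
    agent1.sinks.avro-snk.channel  = file-ch

    # --- agent2: avro source -> file channel -> terminal sink (S3 via s3a) ---
    agent2.sources  = avro-src
    agent2.channels = file-ch
    agent2.sinks    = s3-snk

    agent2.sources.avro-src.type     = avro
    agent2.sources.avro-src.bind     = 0.0.0.0
    agent2.sources.avro-src.port     = 4545
    agent2.sources.avro-src.channels = file-ch

    agent2.channels.file-ch.type          = file
    agent2.channels.file-ch.checkpointDir = /data/flume/checkpoint
    agent2.channels.file-ch.dataDirs      = /data/flume/data

    agent2.sinks.s3-snk.type                   = hdfs
    agent2.sinks.s3-snk.hdfs.path              = s3a://my-bucket/events/%Y-%m-%d
    agent2.sinks.s3-snk.hdfs.fileType          = DataStream
    agent2.sinks.s3-snk.hdfs.useLocalTimeStamp = true
    agent2.sinks.s3-snk.channel                = file-ch

The Avro sink of agent1 and the Avro source of agent2 form the hop: agent1 only commits its channel transaction once agent2 has accepted the batch, which is what keeps the multi-hop flow reliable.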

Fan-In agent example

A very common scenario in event collection is a large number of event-producing clients sending data to a few data-collection agents that are attached to the storage system. For example, events collected from hundreds of application servers are sent to a dozen agents that write to S3.

Fan-Out / Multiplexing / Replicating agent example

Flume supports multiplexing the event flow to one or more destinations. This is achieved by defining a flow multiplexer that can replicate or selectively route an event to one or more channels.

The above example shows a source from agent “multiplexing” fanning out the flow to three different channels. This fan out can be replicating or multiplexing. In the case of replicating flow, each event is sent to all three channels. For the multiplexing case, an event is delivered to a subset of the available channels when an event’s header matches a preconfigured value. For example, if an event header called “EventType” is set to “S3”, then it should go to sink-S3, if it’s “Kafka” then it should go to sink-Kafka, otherwise sink-RMQ. The mapping can be set in the agent’s configuration file.
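A sketch of the selector configuration for the EventType example (the agent, source, and channel names are placeholders); the selector routes events to channels, and one sink is attached to each channel:

    agent.sources.src.channels               = ch-s3 ch-kafka ch-rmq
    agent.sources.src.selector.type          = multiplexing
    agent.sources.src.selector.header        = EventType
    agent.sources.src.selector.mapping.S3    = ch-s3
    agent.sources.src.selector.mapping.Kafka = ch-kafka
    agent.sources.src.selector.default       = ch-rmq

    # For the replicating case, every event is copied to all attached channels instead:
    # agent.sources.src.selector.type = replicating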

Building High Throughput, Exactly-Once pipelines

High Throughput

There are many tuning parameters for building a high-throughput and fault-tolerant data pipeline:

Batching

We can tune the batchSize of the source and sink to increase throughput. The right value for these properties depends on the type of source/sink. By tuning the batch size we were able to reach 100K events/second with Kafka as the source and channel and S3 as the sink.
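For reference, these are the kinds of knobs involved (the values are illustrative, not tuned recommendations); note that the channel's transactionCapacity must be at least as large as the batch sizes on either side:

    # Source-side batching: events the Kafka source puts into the channel per transaction
    agent.sources.kafka-src.batchSize           = 1000
    agent.sources.kafka-src.batchDurationMillis = 1000

    # Channel transaction size must accommodate the batches above and below
    agent.channels.ch1.transactionCapacity = 1000

    # Sink-side batching: events the HDFS/S3 sink takes from the channel per transaction
    agent.sinks.s3-snk.hdfs.batchSize = 1000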

Sink Group

A sink group allows us to treat multiple sinks as one. A sink group can be configured with either a load-balancing or a failover processor to write events across multiple sinks.
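A sketch of both processor types (sink names are placeholders); only one processor.type is used per group:

    agent.sinkgroups          = g1
    agent.sinkgroups.g1.sinks = snk1 snk2

    # Load balancing: events are spread across snk1 and snk2
    agent.sinkgroups.g1.processor.type     = load_balance
    agent.sinkgroups.g1.processor.selector = round_robin
    agent.sinkgroups.g1.processor.backoff  = true

    # Failover alternative: snk1 is preferred, snk2 takes over when snk1 fails
    # agent.sinkgroups.g1.processor.type          = failover
    # agent.sinkgroups.g1.processor.priority.snk1 = 10
    # agent.sinkgroups.g1.processor.priority.snk2 = 5

Note that a sink group drains the channel through one active sink at a time, so it primarily improves availability rather than parallelism; the next section covers running multiple independent sinks for parallelism.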

Multiple Sinks

Instead of attaching a sink group to the channel, we can connect multiple sinks directly to the channel, and each sink will run in its own thread. With this configuration the channel can become the bottleneck, since multiple sinks compete for a single channel.
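A sketch of this layout (names and the bucket path are placeholders): two independent sinks, each running in its own thread, draining the same channel and writing to the same destination:

    agent.channels = ch1
    agent.sinks    = snk1 snk2

    # Both sinks read from the same channel; each event is taken by exactly one of them
    agent.sinks.snk1.type            = hdfs
    agent.sinks.snk1.hdfs.path       = s3a://my-bucket/events
    agent.sinks.snk1.hdfs.filePrefix = snk1
    agent.sinks.snk1.channel         = ch1

    agent.sinks.snk2.type            = hdfs
    agent.sinks.snk2.hdfs.path       = s3a://my-bucket/events
    agent.sinks.snk2.hdfs.filePrefix = snk2
    agent.sinks.snk2.channel         = ch1

Distinct file prefixes keep the two sinks from colliding on output file names in the shared path.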

Multiple Channels — Multiple Sinks

As already shown in the examples above (fan-in and fan-out), we can attach multiple sinks to multiple channels using either a replicating or a multiplexing channel selector.

Memory Channel vs File Channel vs Spillable Channel

Choosing the channel type is a very important decision when designing a Flume flow; it is a trade-off between a high-throughput and a durable data pipeline.

The file channel is durable: every event written to it is persisted to disk before it becomes available to the sink, so events written to the file channel survive a JVM crash.

The memory channel is a volatile channel; events are stored in memory only, and if the JVM crashes, any events stored in the memory channel are lost.

The memory channel has very low write/read latencies compared to the file channel and the number of events that can be stored is limited by available RAM. The file channel, on the other hand, has far superior buffering capability due to utilizing disk space.

The SpillableMemoryChannel is a hybrid channel: it uses main memory for buffering events until the in-memory queue reaches capacity, after which the file channel is used as an overflow.
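A sketch of the three channel configurations (capacities and paths are illustrative):

    # Memory channel: fast but volatile
    agent.channels.mem-ch.type                = memory
    agent.channels.mem-ch.capacity            = 100000
    agent.channels.mem-ch.transactionCapacity = 1000

    # File channel: durable, backed by disk
    agent.channels.file-ch.type          = file
    agent.channels.file-ch.checkpointDir = /data/flume/checkpoint
    agent.channels.file-ch.dataDirs      = /data/flume/data
    agent.channels.file-ch.capacity      = 1000000

    # Spillable memory channel: in-memory queue that overflows to disk
    agent.channels.spill-ch.type             = SPILLABLEMEMORY
    agent.channels.spill-ch.memoryCapacity   = 100000
    agent.channels.spill-ch.overflowCapacity = 1000000
    agent.channels.spill-ch.checkpointDir    = /data/flume/spill-checkpoint
    agent.channels.spill-ch.dataDirs         = /data/flume/spill-data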

Exactly-Once (Not Tested):

Depending on the source and sink, we can build a data pipeline that copies data from source to sink with exactly-once semantics. We achieved this for pipelines with Kafka/RMQ as the source and HDFS/S3 as the sink. This also handles writing data to the sink based on the source event time, which covers the late-arriving events use case.

More details here