Sounds epic, doesn’t it? Actually, it’s not that epic!
It could be interesting (or very geeky) to talk about how to ingest data in Middle-earth (and what for). However, I guess that would be out of scope for this blog, so I’m afraid this post has nothing to do with that. This post is about how to ingest data from different kinds of file systems by means of Kafka-Connect, using a connector I’ve forged recently.
A lot has happened on the way from legacy systems -in terms of both hardware and software- for managing huge amounts of data to today’s Big Data systems; the way we design an architecture for processing data is quite different from how we did it in the “old days”. I can think of several approaches we are using or hearing a lot about, such as Microservices or Lambda and Kappa architectures, in which we find a wide spectrum of amazing technologies, thanks especially to the open source community. These approaches focus on how to process data with different sorts of objectives, mostly addressing business needs for data-intensive applications.
How to deal with legacy systems
Even though new Big Data systems have emerged with a completely different paradigm to the “legacies” -solving some of the issues that organizations were complaining about- and their popularity keeps growing, most businesses see the prospect of replacing them as too risky and feel locked into “legacy systems” that are often critical to their day-to-day operations. Dealing with these systems can sometimes be a bit awkward: they impose strong requirements on how we integrate our apps with their ecosystem, breaking rules we normally wouldn’t want to break, but, at least for the time being, we have to manage this and do our best. For this reason, Big Data technologies cannot look away and must face these limitations on a daily basis.
One interesting thing users and organizations are realizing is the importance of streaming data and the “goldmine” it represents. I’m not going to talk about how important (near) real-time processing is becoming or what a Kappa architecture is. I will talk about how data stored in a ‘mainframe’ or something similar could be fed into a streaming platform.
It’s not rare that, when trying to integrate your application directly with one of these ‘legacy systems’, you come across someone who tells you: “No, you can’t access it that way”. What now, then? One alternative is to export all the data you need to files in a shared file system and process them later. If we’re talking about ingesting this data into a streaming platform, one of the core pieces we use intensively in our Big Data platform is Apache Kafka.
We can consider Kafka the de facto platform for streaming architectures. It’s very active, continuously improving, and its ecosystem is rich and growing. One of the components closely related to Kafka is Kafka-Connect: a framework for moving data between Kafka and external systems. Currently, there are dozens of connectors for Kafka-Connect available which allow us to ingest or bulk data from/to several kinds of systems, but in this post I’m focusing on a connector I’m actually working on: kafka-connect-fs. So let’s get into the nitty-gritty!
What is this connector all about?
Put simply: it’s a Kafka-Connect source connector (for the moment) that ingests data from files in different sorts of formats persisted in a file system and loads them into Kafka.
Why this connector?
Well, as I’ve told you, there are a lot of connectors out there and some of them even ingest data from files, but the idea behind this one was to provide an abstraction layer over the native file system itself, thus allowing you to connect to “any” file system you like.
Thanks to the hadoop-common library, we get this abstraction layer through the abstract class ‘org.apache.hadoop.fs.FileSystem’, which defines a generic file system API and allows custom implementations (see the sketch after the list below). Out of the box, there are several file system types you can use, but in case your file system is not included, you can develop your own! Some of the ones you can use are:
- HDFS.
- WebHDFS.
- S3.
- FTP.
- Local File System.
- Hadoop Archive File System.
For sure, there are more custom implementations of these file systems out there, but these ones should cover most of our use cases.
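Just to illustrate the abstraction (this is not part of the connector, only a minimal sketch of the hadoop-common API it builds on; the ‘file:///data’ URI is an arbitrary example), the same few lines of code can list files on a local disk, HDFS, S3, etc., as long as the corresponding implementation is on the classpath:

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ListFiles {
  public static void main(String[] args) throws Exception {
    // The URI scheme (file://, hdfs://, s3a://, ftp://...) picks the implementation.
    FileSystem fs = FileSystem.get(URI.create("file:///data"), new Configuration());
    for (FileStatus status : fs.listStatus(new Path("file:///data"))) {
      System.out.println(status.getPath() + " (" + status.getLen() + " bytes)");
    }
    fs.close();
  }
}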
By the way, you can find the list of connectors here.
Features
There are two main concepts within the connector that you have to take into account:
- Policies: they define how the connector polls data from the file system. For instance, continuously, from time to time, via a file watcher, etc.
- File readers: the reader you need depends on the data format of the source files in the file system. For now, the connector supports plain text (delimited or not), Avro, Parquet and Sequence Files.
Policies and readers have their own custom configurations, and there are even more config options on the documentation site, where you can also find some tips, an FAQ section and lots more.
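Just to give a flavour of what those custom configurations look like, a policy that re-polls the file system periodically combined with a delimited text reader might be configured with properties along these lines. Note that the class and property names below are written from memory and may differ between versions, so treat them as an assumption and check the documentation for the real ones:

policy.class=com.github.mmolimar.kafka.connect.fs.policy.SleepyPolicy
# hypothetical property names: time to sleep between polls (ms) and the field delimiter
policy.sleepy.sleep=10000
file_reader.class=com.github.mmolimar.kafka.connect.fs.file.reader.DelimitedTextFileReader
file_reader.delimited.token=,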
How this works
When the connector starts, the URIs included in the connector config are grouped based on the number of tasks defined in that config. After that, the tasks are initialized and start polling data by means of the defined policy.
This policy handles the connection to the FS, retrieves files based on its own configuration and provides the reader for processing each file and its records. The policy also carries out the offset management: if the same file is processed again, the reader seeks into the file from the last committed offset (which does not necessarily mean the current record matches the last one processed), to avoid processing the same records over and over again (see the sketch below).
Once this is done, the reader delivers the corresponding records to the running tasks, which copy them into Kafka and… that’s it!
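To make the offset part more concrete, here is a generic illustration (not the connector’s actual code) of the Kafka-Connect mechanism it relies on: each record produced by a source task carries a source partition (here, the file path) and a source offset (here, the position within the file), which Kafka-Connect persists and hands back to the task after a restart:

import java.util.Collections;
import java.util.Map;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.source.SourceRecord;

public class OffsetSketch {
  // Builds a record that tells Kafka-Connect where it came from, so the task can
  // later resume from the last committed position if the same file shows up again.
  public static SourceRecord recordFor(String fileUri, long positionInFile, String line) {
    Map<String, String> sourcePartition = Collections.singletonMap("path", fileUri);
    Map<String, Long> sourceOffset = Collections.singletonMap("offset", positionInFile);
    // On restart, a task can read the committed offset back via
    // context.offsetStorageReader().offset(sourcePartition).
    return new SourceRecord(sourcePartition, sourceOffset, "mytopic",
        Schema.STRING_SCHEMA, line);
  }
}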
Notice that you won’t get exactly-once semantics with source connectors, as Kafka doesn’t support it yet, but you will get at-least-once or at-most-once semantics. However, this will presumably be supported after this KIP. In fact, exactly-once semantics is already supported in Kafka-Streams after merging this PR (this feature is already included in the Kafka 0.11.0.0 release).
Running the connector
If you want to try the connector, you just have to download it from the repo and then compile, package and put it into the Kafka-Connect classpath.
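For example, assuming a standard Maven build (the exact artifact name and path under target/ may differ), that boils down to something like:

git clone https://github.com/mmolimar/kafka-connect-fs.git
cd kafka-connect-fs
mvn clean package
# make the resulting jar(s) visible to Kafka-Connect, e.g. via the CLASSPATH env variable
export CLASSPATH=/path/to/kafka-connect-fs/target/*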
To try the connector you can just deploy Kafka-Connect in standalone mode. To do this, a properties file with the connector config is required; this file would look like this:
name=FsSourceConnector
connector.class=com.github.mmolimar.kafka.connect.fs.FsSourceConnector
tasks.max=1
fs.uris=file:///data,hdfs://localhost:9000/
topic=mytopic
policy.class=com.github.mmolimar.kafka.connect.fs.policy.SimplePolicy
policy.recursive=true
policy.regexp=^[0-9]*\.txt$
file_reader.class=com.github.mmolimar.kafka.connect.fs.file.reader.TextFileReader
And execute this command:
bin/connect-standalone etc/kafka/connect-standalone.properties etc/kafka-connect-fs/kafka-connect-fs.properties
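Once the connector is up and files matching the regexp appear, you can check that records are arriving by consuming the target topic (assuming a broker listening on localhost:9092):

bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic mytopic --from-beginning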
Future work
In this post we’ve seen how to ingest data from a wide variety of file systems and copy it into Kafka using the kafka-connect-fs connector. There are more configuration options you can use to adapt the connector to your particular use case, and there are a bunch of features I’d like to include in the next versions (new file readers, policies, a sink connector and more). Coming soon…
In case the current version doesn’t fit your needs, you’re free to implement new features and are very welcome to contribute to the project!