Fluency
High throughput data ingestion logger to Fluentd, AWS S3 and Treasure Data
High throughput data ingestion logger to Fluentd and Fluent Bit (and AWS S3 and Treasure Data).
This document is for version 2. If you're looking for a document for version 1, see this.
Ingestion to Fluentd
Features
- Better performance (4 times faster than fluent-logger-java)
- Asynchronous flush
- TCP / UDP heartbeat with Fluentd
- Failover with multiple Fluentds
- Enable / disable ack response mode
- TLS / SSL support (see https://docs.fluentd.org/v1.0/articles/in_forward#how-to-enable-tls-encryption)
- PackedForward format
- Backup of buffered data on local disk
Install
Gradle
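dependencies {
    // `implementation` replaces the `compile` configuration, which was removed in Gradle 7
    implementation "org.komamitsu:fluency-core:${fluency.version}"
    implementation "org.komamitsu:fluency-fluentd:${fluency.version}"
}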
Maven
<dependency>
    <groupId>org.komamitsu</groupId>
    <artifactId>fluency-core</artifactId>
    <version>${fluency.version}</version>
</dependency>
<dependency>
    <groupId>org.komamitsu</groupId>
    <artifactId>fluency-fluentd</artifactId>
    <version>${fluency.version}</version>
</dependency>
Usage
Create Fluency instance
For single Fluentd
// Single Fluentd(localhost:24224 by default)
// - TCP heartbeat (by default)
// - Asynchronous flush (by default)
// - Without ack response (by default)
// - Flush attempt interval is 600ms (by default)
// - Initial chunk buffer size is 1MB (by default)
// - Threshold chunk buffer size to flush is 4MB (by default)
// - Threshold chunk buffer retention time to flush is 1000 ms (by default)
// - Max total buffer size is 512MB (by default)
// - Use off heap memory for buffer pool (by default)
// - Max retries of sending events is 8 (by default)
// - Max wait until all buffers are flushed is 10 seconds (by default)
// - Max wait until the flusher is terminated is 10 seconds (by default)
// - Socket connection timeout is 5000 ms (by default)
// - Socket read timeout is 5000 ms (by default)
Fluency fluency = new FluencyBuilderForFluentd().build();
For multiple Fluentd with failover
// Multiple Fluentd(localhost:24224, localhost:24225)
Fluency fluency = new FluencyBuilderForFluentd().build(
Arrays.asList(
new InetSocketAddress(24224),
new InetSocketAddress(24225)));
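If the Fluentd endpoints come from configuration, a small helper can turn a string such as `host1:24224,host2:24225` into the `List<InetSocketAddress>` passed to `build(...)` above. This is a minimal sketch: the `FluentdEndpoints` class and the comma-separated `host:port` format are hypothetical, not part of Fluency, and IPv6 literals are not handled.

```java
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Fluency: parses "host1:24224,host2:24225"
// into a list of socket addresses for failover.
class FluentdEndpoints {
    static List<InetSocketAddress> parse(String spec) {
        List<InetSocketAddress> result = new ArrayList<>();
        for (String entry : spec.split(",")) {
            String trimmed = entry.trim();
            if (trimmed.isEmpty()) {
                continue; // tolerate stray commas
            }
            int colon = trimmed.lastIndexOf(':');
            if (colon <= 0 || colon == trimmed.length() - 1) {
                throw new IllegalArgumentException("Expected host:port but got: " + trimmed);
            }
            String host = trimmed.substring(0, colon);
            // NumberFormatException (a subclass of IllegalArgumentException) on a bad port
            int port = Integer.parseInt(trimmed.substring(colon + 1));
            // Throws IllegalArgumentException if the port is outside 0..65535
            result.add(new InetSocketAddress(host, port));
        }
        return result;
    }
}
```

The result could then be handed to the builder, e.g. `new FluencyBuilderForFluentd().build(FluentdEndpoints.parse(System.getenv("FLUENTD_ENDPOINTS")))`, where the environment variable name is likewise only an illustration.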
Enable ACK response mode
// Single Fluentd(localhost:24224)
// - With ack response
FluencyBuilderForFluentd builder = new FluencyBuilderForFluentd();
builder.setAckResponseMode(true);
Fluency fluency = builder.build();
Enable file backup mode
In this mode, Fluency backs up unsent in-memory buffers to files when closing, and resends them on restart.
// Single Fluentd(localhost:24224)
// - Backup directory is the temporary directory
FluencyBuilderForFluentd builder = new FluencyBuilderForFluentd();
builder.setFileBackupDir(System.getProperty("java.io.tmpdir"));
Fluency fluency = builder.build();
Buffer configuration
Fluency has several parameters that control when buffered data is flushed; the buffer-flush diagram (https://raw.githubusercontent.com/wiki/komamitsu/fluency/images/buffer-flush.png) illustrates how they relate.
For high throughput data ingestion that can tolerate higher latency
// Single Fluentd(xxx.xxx.xxx.xxx:24224)
// - Initial chunk buffer size is 16MB
// - Threshold chunk buffer size to flush is 64MB
//   Keep this value (BufferChunkRetentionSize) between `Initial chunk buffer size` and `Max total buffer size`
// - Max total buffer size is 1024MB
FluencyBuilderForFluentd builder = new FluencyBuilderForFluentd();
builder.setBufferChunkInitialSize(16 * 1024 * 1024);
builder.setBufferChunkRetentionSize(64 * 1024 * 1024);
builder.setMaxBufferSize(1024 * 1024 * 1024L);
Fluency fluency = builder.build("xxx.xxx.xxx.xxx", 24224);
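The comment above asks for the chunk retention size to stay between the initial chunk size and the max total buffer size. A standalone check like the following can catch a misconfiguration at startup. `BufferSizeCheck` is a hypothetical helper, not part of Fluency, and treating both bounds as inclusive is an assumption.

```java
// Hypothetical startup check, not part of Fluency:
// initialChunkSize <= chunkRetentionSize <= maxBufferSize (all in bytes).
class BufferSizeCheck {
    static void validate(long initialChunkSize, long chunkRetentionSize, long maxBufferSize) {
        if (initialChunkSize <= 0) {
            throw new IllegalArgumentException("initialChunkSize must be positive");
        }
        if (chunkRetentionSize < initialChunkSize) {
            throw new IllegalArgumentException("chunkRetentionSize (" + chunkRetentionSize
                    + ") is smaller than initialChunkSize (" + initialChunkSize + ")");
        }
        if (chunkRetentionSize > maxBufferSize) {
            throw new IllegalArgumentException("chunkRetentionSize (" + chunkRetentionSize
                    + ") is larger than maxBufferSize (" + maxBufferSize + ")");
        }
    }
}
```

With the values above this would be `BufferSizeCheck.validate(16 * 1024 * 1024L, 64 * 1024 * 1024L, 1024 * 1024 * 1024L)`, called before the corresponding setters.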
Socket configuration
// Single Fluentd(localhost:24224)
// - Socket connection timeout is 15000 ms
// - Socket read timeout is 10000 ms
FluencyBuilderForFluentd builder = new FluencyBuilderForFluentd();
builder.setConnectionTimeoutMilli(15000);
builder.setReadTimeoutMilli(10000);
Fluency fluency = builder.build();
Waits on close sequence
// Single Fluentd(localhost:24224)
// - Max wait until all buffers are flushed is 30 seconds
// - Max wait until the flusher is terminated is 40 seconds
FluencyBuilderForFluentd builder = new FluencyBuilderForFluentd();
builder.setWaitUntilBufferFlushed(30);
builder.setWaitUntilFlusherTerminated(40);
Fluency fluency = builder.build();
Register Jackson modules
// Single Fluentd(localhost:24224)
// - SimpleModule that has FooSerializer is enabled
SimpleModule simpleModule = new SimpleModule();
simpleModule.addSerializer(Foo.class, new FooSerializer());
FluentdRecordFormatter.Config recordFormatterConfig =
new FluentdRecordFormatter.Config();
recordFormatterConfig.setJacksonModules(
Collections.singletonList(simpleModule));
FluencyBuilderForFluentd builder = new FluencyBuilderForFluentd();
builder.setRecordFormatter(new FluentdRecordFormatter(recordFormatterConfig));
Fluency fluency = builder.build();
Set a custom error handler
FluencyBuilderForFluentd builder = new FluencyBuilderForFluentd();
builder.setErrorHandler(ex -> {
// Send a notification
});
Fluency fluency = builder.build();
:
// If flushing events to Fluentd fails and all retries are exhausted, the error handler is called.
fluency.emit("foo.bar", event);
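The error handler above is only a placeholder. One option is to record failures so that a health check or metrics exporter can read them later. This sketch assumes the handler receives the exception as a `Throwable`, as the `ex -> { ... }` lambda suggests; `RecordingErrorHandler` is hypothetical and implements `java.util.function.Consumer<Throwable>` so that it compiles without Fluency on the classpath.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

// Hypothetical handler, not part of Fluency: counts flush failures and keeps
// the most recent one. Atomics make it safe to read from another thread.
class RecordingErrorHandler implements Consumer<Throwable> {
    private final AtomicLong failureCount = new AtomicLong();
    private final AtomicReference<Throwable> lastFailure = new AtomicReference<>();

    @Override
    public void accept(Throwable ex) {
        failureCount.incrementAndGet();
        lastFailure.set(ex);
    }

    long failureCount() {
        return failureCount.get();
    }

    Throwable lastFailure() {
        return lastFailure.get();
    }
}
```

It could be registered with a method reference such as `builder.setErrorHandler(recorder::accept);`, assuming `setErrorHandler` accepts a single-method interface that takes the exception.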
Send requests over SSL/TLS
// Single Fluentd(localhost:24224)
// - Enable SSL/TLS
FluencyBuilderForFluentd builder = new FluencyBuilderForFluentd();
builder.setSslEnabled(true);
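// Optionally, provide your own SSLSocketFactory (the JVM default is shown here as a placeholder).
// SSLSocketFactory.getDefault() returns a javax.net.SocketFactory, so a cast is needed.
builder.setSslSocketFactory((SSLSocketFactory) SSLSocketFactory.getDefault());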
Fluency fluency = builder.build();
If you want to use a custom truststore, specify the JKS file path using -Djavax.net.ssl.trustStore (and -Djavax.net.ssl.trustStorePassword if needed). You can create a custom truststore like this:
$ keytool -import -file server.crt -alias mytruststore -keystore truststore.jks
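Instead of the `-Djavax.net.ssl.trustStore` system properties, the truststore can also be loaded programmatically and passed to `setSslSocketFactory`. The sketch below uses only standard JSSE classes (`KeyStore`, `TrustManagerFactory`, `SSLContext`); the `TrustStoreSocketFactory` name, the `"TLS"` protocol choice and the password handling are assumptions.

```java
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.KeyStore;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSocketFactory;
import javax.net.ssl.TrustManagerFactory;

// Hypothetical helper: builds an SSLSocketFactory that trusts the certificates
// in a given truststore instead of the JVM's default one.
class TrustStoreSocketFactory {
    static SSLSocketFactory fromKeyStore(KeyStore trustStore) throws Exception {
        TrustManagerFactory tmf =
                TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
        tmf.init(trustStore);
        SSLContext context = SSLContext.getInstance("TLS");
        // No client key managers (no mutual TLS), default SecureRandom
        context.init(null, tmf.getTrustManagers(), null);
        return context.getSocketFactory();
    }

    // Loads a JKS file such as the truststore.jks created with keytool
    static SSLSocketFactory fromJksFile(Path path, char[] password) throws Exception {
        KeyStore trustStore = KeyStore.getInstance("JKS");
        try (InputStream in = Files.newInputStream(path)) {
            trustStore.load(in, password);
        }
        return fromKeyStore(trustStore);
    }
}
```

For example: `builder.setSslSocketFactory(TrustStoreSocketFactory.fromJksFile(Paths.get("truststore.jks"), password));`, where `password` holds the password chosen when running keytool.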
For server side configuration, see https://docs.fluentd.org/v1.0/articles/in_forward#how-to-enable-tls/ssl-encryption .
Mutual TLS
See this project.
Other configurations
// Multiple Fluentd(localhost:24224, localhost:24225)
// - Flush attempt interval is 200ms
// - Max retries of sending events is 12
// - Use JVM heap memory for buffer pool
FluencyBuilderForFluentd builder = new FluencyBuilderForFluentd();
builder.setFlushAttemptIntervalMillis(200);
builder.setSenderMaxRetryCount(12);
builder.setJvmHeapBufferMode(true);
Fluency fluency = builder.build(
Arrays.asList(
new InetSocketAddress(24224),
new InetSocketAddress(24225)));
Emit event
String tag = "foo_db.bar_tbl";
Map<String, Object> event = new HashMap<String, Object>();
event.put("name", "komamitsu");
event.put("age", 42);
event.put("rate", 3.14);
fluency.emit(tag, event);
If you want to use EventTime as the timestamp, pass an EventTime object to Fluency#emit as follows:
int epochSeconds;
int nanoseconds;
:
EventTime eventTime = EventTime.fromEpoch(epochSeconds, nanoseconds);
// You can also create an EventTime object like this
// EventTime eventTime = EventTime.fromEpochMilli(System.currentTimeMillis());
fluency.emit(tag, eventTime, event);
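`EventTime.fromEpoch` takes epoch seconds plus a nanosecond part. The sketch below shows one way to derive that pair, either from epoch milliseconds or from a `java.time.Instant`. `EventTimeParts` is a hypothetical helper; it is not claimed to mirror how `EventTime.fromEpochMilli` works internally.

```java
import java.time.Instant;

// Hypothetical helper: produces {epochSeconds, nanoseconds} pairs.
class EventTimeParts {
    // floorDiv/floorMod keep the nanosecond part in [0, 999_999_999]
    // even for millisecond values before 1970.
    static long[] fromEpochMilli(long epochMilli) {
        long seconds = Math.floorDiv(epochMilli, 1000L);
        long nanos = Math.floorMod(epochMilli, 1000L) * 1_000_000L;
        return new long[] {seconds, nanos};
    }

    // Instant already exposes both parts, with whatever precision the clock provides.
    static long[] fromInstant(Instant instant) {
        return new long[] {instant.getEpochSecond(), instant.getNano()};
    }
}
```

For sub-millisecond timestamps, something like `Instant now = Instant.now(); EventTime.fromEpoch(now.getEpochSecond(), now.getNano())` avoids the millisecond round trip, assuming `fromEpoch` accepts `long` arguments.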
Error handling
Fluency#emit keeps buffered data in memory even if a retriable exception occurs. However, when the buffer is full, the method throws org.komamitsu.fluency.BufferFullException. There are two ways to handle this exception.
a) Ignore the exception so that the main application isn't blocked
try {
fluency.emit(tag, event);
}
catch (BufferFullException e) {
// Just log the error and move forward
logger.warn("Fluency's buffer is full", e);
}
b) Retry until the data is successfully buffered
// Consider also capping the number of retries
while (true) {
try {
fluency.emit(tag, event);
break;
}
catch (BufferFullException e) {
// Log the error, sleep and retry
logger.warn("Fluency's buffer is full. Retrying", e);
TimeUnit.SECONDS.sleep(5);
}
}
Which to choose depends on how important the data is and how long the application can be blocked.
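Option b) keeps the caller blocked indefinitely if the buffer never drains. A bounded variant, as the comment in that example suggests, might look like this. It is a sketch: `BoundedEmitRetry`, `EmitCall`, the attempt limit and the linear backoff are hypothetical choices, and the nested `BufferFullException` is a stand-in so the example compiles on its own (with Fluency, catch `org.komamitsu.fluency.BufferFullException` instead).

```java
import java.io.IOException;
import java.util.concurrent.TimeUnit;

class BoundedEmitRetry {
    // Stand-in for org.komamitsu.fluency.BufferFullException so this sketch compiles
    // without Fluency; extending IOException is an assumption of the sketch.
    static class BufferFullException extends IOException {
        BufferFullException(String message) {
            super(message);
        }
    }

    // Hypothetical wrapper around a call such as () -> fluency.emit(tag, event)
    @FunctionalInterface
    interface EmitCall {
        void run() throws IOException;
    }

    // Tries up to maxAttempts times. Returns true once the call succeeds, or false if
    // the buffer was still full on the last attempt. Other IOExceptions propagate.
    static boolean emitWithRetry(EmitCall call, int maxAttempts, long backoffMillis)
            throws IOException, InterruptedException {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                call.run();
                return true;
            } catch (BufferFullException e) {
                if (attempt < maxAttempts) {
                    // Linear backoff: wait a little longer after each failed attempt
                    TimeUnit.MILLISECONDS.sleep(backoffMillis * attempt);
                }
            }
        }
        return false;
    }
}
```

A caller could use `emitWithRetry(() -> fluency.emit(tag, event), 5, 1000)` and fall back to option a) (log and drop) when it returns `false`.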
Wait until buffered data is flushed and release resources
fluency.close();
Check how much memory Fluency has allocated
LOG.debug("Memory size allocated by Fluency is {}", fluency.getAllocatedBufferSize());
Check how much unsent data Fluency is buffering in memory
LOG.debug("Unsent data size buffered by Fluency in memory is {}", fluency.getBufferedDataSize());
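These sizes can feed a simple early-warning check before BufferFullException starts being thrown. The helper below is hypothetical: it assumes the getters return byte counts as `long` and compares them against the max total buffer size configured with `setMaxBufferSize` (512MB by default). Which of the two getters tracks the buffer limit more closely is not something this sketch claims.

```java
// Hypothetical helper, not part of Fluency: expresses a buffer size as a fraction
// of the configured max total buffer size.
class BufferUsage {
    static double utilization(long bufferSize, long maxBufferSize) {
        if (maxBufferSize <= 0) {
            throw new IllegalArgumentException("maxBufferSize must be positive");
        }
        return (double) bufferSize / maxBufferSize;
    }

    // True when usage reaches the given fraction (e.g. 0.9 for 90%)
    static boolean nearlyFull(long bufferSize, long maxBufferSize, double threshold) {
        return utilization(bufferSize, maxBufferSize) >= threshold;
    }
}
```

For example, `if (BufferUsage.nearlyFull(fluency.getAllocatedBufferSize(), 512 * 1024 * 1024L, 0.9)) LOG.warn("Fluency buffer is over 90% of its limit");`.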
Ingestion to Fluentd with extra features (Java 16 or later)
Features
- UNIX domain socket support
Install
Gradle
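dependencies {
    // `implementation` replaces the `compile` configuration, which was removed in Gradle 7
    implementation "org.komamitsu:fluency-core:${fluency.version}"
    implementation "org.komamitsu:fluency-fluentd:${fluency.version}"
    implementation "org.komamitsu:fluency-fluentd-ext:${fluency.version}"
}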
Maven
<dependency>
<groupId>org.komamitsu</groupId>
