lib/lib.d.ts (7 additions, 0 deletions)

@@ -116,6 +116,13 @@ export interface BaseWriteOptions {
 	 */
 	firehose?: boolean;
 
+	/**
+	 * If true, data sent to firehose will be gzip compressed.
+	 *
+	 * @default false
+	 */
+	gzipFirehose?: boolean
+
 	/**
 	 * The number of records, where each record is an event, to micro-batch locally in the SDK before writing
 	 * them to either kinesis, firehose or S3. See the other options in this object to understand how this
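A minimal usage sketch of the new option (assuming the SDK's usual load(botId, outQueue, opts) writer entry point; the bot and queue names here are illustrative, not from this PR):

const leo = require("leo-sdk");

// Write events through firehose, with the batched payload gzip-compressed
// by the SDK's chunker (see the leo-stream.js changes below).
const stream = leo.load("my-bot-id", "my-out-queue", {
	firehose: true,
	gzipFirehose: true
});

stream.write({ hello: "world" });
stream.end();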
lib/stream/leo-stream.js (12 additions, 7 deletions)

@@ -311,7 +311,7 @@ module.exports = function(configure) {
 				}
 				done(null, e);
 			}));
-		if (opts.useS3 && !opts.firehose) {
+		if (opts.useS3) {
 			args.push(leoS3(ls, outQueue, configure, { prefix: opts.prefix || id, ...opts.s3Opts }));
 		} else if (!opts.firehose) {
 			// TODO: This should be part of auto switch

@@ -696,7 +696,7 @@ module.exports = function(configure) {
 			}
 		};
 		var type = "kinesis";
-		if (opts.useS3 && !opts.firehose) { //why would both of these be set?
+		if (opts.useS3) {
 			type = "s3";
 		} else if (opts.firehose) {
 			type = "firehose";

@@ -766,7 +766,7 @@ module.exports = function(configure) {
 			logger.time("firehose request");
 			firehose.putRecordBatch({
 				Records: [{
-					Data: records.join('')
+					Data: opts.gzipFirehose ? (records.map(r => r.toString("base64")).join('\n') + '\n') : records.join('')
Contributor: @czirker Are we expecting the person using this to be gzipping the content themselves?

 				}],
 				DeliveryStreamName: configure.bus.firehose
 			}, function(err, data) {

@@ -850,13 +850,13 @@ module.exports = function(configure) {
 			enableLogging: true,
 			snapshot: opts.snapshot
 		}, opts.chunk || {});
-		chunkOpts.gzip = !opts.firehose;
+		chunkOpts.gzip = opts.gzipFirehose || !opts.firehose;
Contributor: @czirker Or is that what this does? The chunker does the gzipping?

Contributor Author: Correct. The chunker will do the zipping. The user just needs to specify that they want to enable the feature using gzipFirehose: true.
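
Given that exchange, here is a sketch of how a downstream consumer might unpack a firehose delivery written with gzipFirehose enabled. The payload format is inferred from this diff (newline-delimited base64 lines, each line one gzipped chunk of newline-delimited JSON events); the helper name is illustrative, not part of the SDK.

const zlib = require("zlib");

// Reverses the producer-side encoding from this PR:
//   records.map(r => r.toString("base64")).join('\n') + '\n'
// where each record is a gzipped buffer of newline-delimited JSON events.
function decodeGzipFirehosePayload(payload) {
	return payload
		.toString()
		.split("\n")
		.filter(line => line.length > 0)
		.flatMap(line => zlib
			.gunzipSync(Buffer.from(line, "base64"))
			.toString()
			.split("\n")
			.filter(l => l.length > 0)
			.map(l => JSON.parse(l)));
}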


 		let currentQueues = new Set();
 		var p = ls.buffer(opts, function(obj, callback) {
 			if (obj.stats) {
 				Object.values(obj.stats).map(v => {
-					let queues = v.queues;
+					let queues = v.queues || {};
 					// We don't want queues to continue past here, so remove it
 					delete v.queues;
 					return Object.keys(queues);

@@ -867,7 +867,12 @@
 			if (obj.gzip) {
 				records.push(obj.gzip);
 			} else if (obj.s3) {
-				records.push(zlib.gzipSync(JSON.stringify(obj) + "\n"));
+				let data = JSON.stringify(obj) + "\n";
+				if (chunkOpts.gzip) {
+					records.push(zlib.gzipSync(data));
+				} else {
+					records.push(data);
+				}
 			}
 		}
 		if (obj.correlations) {

@@ -888,7 +893,7 @@
 		});
 
 		let streams = [];
-		if (!opts.useS3 && !opts.firehose) {
+		if (!opts.useS3) {
 			streams.push(ls.throughAsync(async (obj, push) => {
 				let size = Buffer.byteLength(JSON.stringify(obj));
 				if (size > twoHundredK * 3) {