From 09a15093bf8749822847c28e4b70fb391a4636ed Mon Sep 17 00:00:00 2001
From: Clint Zirker
Date: Thu, 16 Feb 2023 10:24:35 -0700
Subject: [PATCH 1/2] Adding ability to compress data when delivering to firehose

Adds a gzipFirehose write option. When set, event chunks are gzipped and
the firehose putRecordBatch payload is written as one base64-encoded
gzip chunk per line instead of concatenated plain records.

---
 lib/lib.d.ts             |  7 +++++++
 lib/stream/leo-stream.js | 12 ++++++------
 2 files changed, 13 insertions(+), 6 deletions(-)

diff --git a/lib/lib.d.ts b/lib/lib.d.ts
index b68b3db3..3089a77d 100644
--- a/lib/lib.d.ts
+++ b/lib/lib.d.ts
@@ -82,6 +82,13 @@ export interface BaseWriteOptions {
 	 */
 	firehose?: boolean;
 
+	/**
+	 * If true, data sent to firehose will be gzip compressed.
+	 *
+	 * @default false
+	 */
+	gzipFirehose?: boolean;
+
 	/**
 	 * The number of records, where each record is an event, to micro-batch locally in the SDK before writing
 	 * them to either kinesis, firehose or S3. See the other options in this object to understand how this
diff --git a/lib/stream/leo-stream.js b/lib/stream/leo-stream.js
index f5d12abe..d643a671 100644
--- a/lib/stream/leo-stream.js
+++ b/lib/stream/leo-stream.js
@@ -310,7 +310,7 @@ module.exports = function(configure) {
 				}
 				done(null, e);
 			}));
-			if (opts.useS3 && !opts.firehose) {
+			if (opts.useS3) {
 				args.push(leoS3(ls, outQueue, configure, { prefix: opts.prefix || id }));
 			} else if (!opts.firehose) {
 				// TODO: This should be part of auto switch
@@ -695,7 +695,7 @@ module.exports = function(configure) {
 			}
 		};
 		var type = "kinesis";
-		if (opts.useS3 && !opts.firehose) { //why would both of these be set?
+		if (opts.useS3) {
 			type = "s3";
 		} else if (opts.firehose) {
 			type = "firehose";
@@ -765,7 +765,7 @@ module.exports = function(configure) {
 			logger.time("firehose request");
 			firehose.putRecordBatch({
 				Records: [{
-					Data: records.join('')
+					Data: opts.gzipFirehose ? (records.map(r => r.toString("base64")).join('\n') + '\n') : records.join('')
 				}],
 				DeliveryStreamName: configure.bus.firehose
 			}, function(err, data) {
@@ -849,13 +849,13 @@ module.exports = function(configure) {
 			enableLogging: true,
 			snapshot: opts.snapshot
 		}, opts.chunk || {});
-		chunkOpts.gzip = !opts.firehose;
+		chunkOpts.gzip = opts.gzipFirehose || !opts.firehose;
 
 		let currentQueues = new Set();
 		var p = ls.buffer(opts, function(obj, callback) {
 			if (obj.stats) {
 				Object.values(obj.stats).map(v => {
-					let queues = v.queues;
+					let queues = v.queues || {};
 					// We don't want queues to continue past here, so remove it
 					delete v.queues;
 					return Object.keys(queues);
@@ -887,7 +887,7 @@ module.exports = function(configure) {
 		});
 
 		let streams = [];
-		if (!opts.useS3 && !opts.firehose) {
+		if (!opts.useS3) {
 			streams.push(ls.throughAsync(async (obj, push) => {
 				let size = Buffer.byteLength(JSON.stringify(obj));
 				if (size > twoHundredK * 3) {

From 46395429a1b52f7b5c1294eea4315cc63af8c39d Mon Sep 17 00:00:00 2001
From: czirker <45575356+czirker@users.noreply.github.com>
Date: Wed, 10 Jan 2024 15:32:03 -0700
Subject: [PATCH 2/2] Fixing case where S3 is used but not compressing firehose data

The S3 marker record was always gzipped before being pushed into the
batch. When gzipFirehose is off the rest of the firehose payload is
plain newline-delimited JSON, so the marker now follows chunkOpts.gzip
and is pushed uncompressed.

---
 lib/stream/leo-stream.js | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/lib/stream/leo-stream.js b/lib/stream/leo-stream.js
index deae3d45..dc9914a5 100644
--- a/lib/stream/leo-stream.js
+++ b/lib/stream/leo-stream.js
@@ -867,7 +867,12 @@ module.exports = function(configure) {
 			if (obj.gzip) {
 				records.push(obj.gzip);
 			} else if (obj.s3) {
-				records.push(zlib.gzipSync(JSON.stringify(obj) + "\n"));
+				let data = JSON.stringify(obj) + "\n";
+				if (chunkOpts.gzip) {
+					records.push(zlib.gzipSync(data));
+				} else {
+					records.push(data);
+				}
 			}
 			if (obj.correlations) {
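
A minimal usage sketch of the new option, assuming the standard leo-sdk load() entry point; the bot ID, queue name, and payload below are placeholders, and this is an illustration of the option rather than documented API:

	// Sketch: writing to a queue through firehose with compression enabled.
	// "example-bot" and "example-queue" are placeholder names.
	const leo = require("leo-sdk");

	const stream = leo.load("example-bot", "example-queue", {
		firehose: true,      // route this write through Kinesis Firehose
		gzipFirehose: true   // new option: gzip chunks before putRecordBatch
	});

	stream.write({ hello: "world" });
	stream.end(err => err && console.error(err));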
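
The effective chunk compression after these patches is chunkOpts.gzip = opts.gzipFirehose || !opts.firehose, which works out to:

	firehose: false                      -> chunks gzipped (kinesis/S3 path, unchanged)
	firehose: true,  gzipFirehose: false -> plain newline-delimited JSON records
	firehose: true,  gzipFirehose: true  -> gzipped chunks, base64-encoded, one per line

The second patch makes the S3 marker record obey the same flag, so a firehose payload never mixes plain and gzipped records.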
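
On the consuming side, a sketch of decoding a delivery written with gzipFirehose enabled; decodeGzipFirehosePayload is a hypothetical helper, not part of the SDK, and assumes the encoding above (one base64 gzip chunk per line, newline-delimited JSON events inside each chunk):

	const zlib = require("zlib");

	// Hypothetical decoder: split the delivery into base64 lines, gunzip
	// each chunk, then parse the newline-delimited JSON events inside.
	function decodeGzipFirehosePayload(payloadBuffer) {
		return payloadBuffer.toString("utf8")
			.split("\n")
			.filter(line => line.length > 0)
			.flatMap(line => zlib.gunzipSync(Buffer.from(line, "base64"))
				.toString("utf8")
				.split("\n")
				.filter(event => event.length > 0)
				.map(event => JSON.parse(event)));
	}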