diff --git a/lib/lib.d.ts b/lib/lib.d.ts index d14096a..a986e6a 100644 --- a/lib/lib.d.ts +++ b/lib/lib.d.ts @@ -116,6 +116,13 @@ export interface BaseWriteOptions { */ firehose?: boolean; + /** + * If true, data sent to firehose will be gzip compressed. + * + * @default false + */ + gzipFirehose?: boolean + /** * The number of records, where each record is an event, to micro-batch locally in the SDK before writing * them to either kinesis, firehose or S3. See the other options in this object to understand how this diff --git a/lib/stream/leo-stream.js b/lib/stream/leo-stream.js index ef97921..dc9914a 100644 --- a/lib/stream/leo-stream.js +++ b/lib/stream/leo-stream.js @@ -311,7 +311,7 @@ module.exports = function(configure) { } done(null, e); })); - if (opts.useS3 && !opts.firehose) { + if (opts.useS3) { args.push(leoS3(ls, outQueue, configure, { prefix: opts.prefix || id, ...opts.s3Opts })); } else if (!opts.firehose) { // TODO: This should be part of auto switch @@ -696,7 +696,7 @@ module.exports = function(configure) { } }; var type = "kinesis"; - if (opts.useS3 && !opts.firehose) { //why would both of these be set? + if (opts.useS3) { type = "s3"; } else if (opts.firehose) { type = "firehose"; @@ -766,7 +766,7 @@ module.exports = function(configure) { logger.time("firehose request"); firehose.putRecordBatch({ Records: [{ - Data: records.join('') + Data: opts.gzipFirehose ? (records.map(r => r.toString("base64")).join('\n') + '\n') : records.join('') }], DeliveryStreamName: configure.bus.firehose }, function(err, data) { @@ -850,13 +850,13 @@ module.exports = function(configure) { enableLogging: true, snapshot: opts.snapshot }, opts.chunk || {}); - chunkOpts.gzip = !opts.firehose; + chunkOpts.gzip = opts.gzipFirehose || !opts.firehose; let currentQueues = new Set(); var p = ls.buffer(opts, function(obj, callback) { if (obj.stats) { Object.values(obj.stats).map(v => { - let queues = v.queues; + let queues = v.queues || {}; // We don't want queues to continue past here, so remove it delete v.queues; return Object.keys(queues); @@ -867,7 +867,12 @@ module.exports = function(configure) { if (obj.gzip) { records.push(obj.gzip); } else if (obj.s3) { - records.push(zlib.gzipSync(JSON.stringify(obj) + "\n")); + let data = JSON.stringify(obj) + "\n"; + if (chunkOpts.gzip) { + records.push(zlib.gzipSync(data)); + } else { + records.push(data); + } } } if (obj.correlations) { @@ -888,7 +893,7 @@ module.exports = function(configure) { }); let streams = []; - if (!opts.useS3 && !opts.firehose) { + if (!opts.useS3) { streams.push(ls.throughAsync(async (obj, push) => { let size = Buffer.byteLength(JSON.stringify(obj)); if (size > twoHundredK * 3) {