Fix performance of streaming by parsing message JSON once (#25278)
This commit is contained in:
		| @@ -52,18 +52,31 @@ const redisUrlToClient = async (defaultConfig, redisUrl) => { | ||||
| }; | ||||
|  | ||||
| /** | ||||
|  * Attempts to safely parse a string as JSON, used when both receiving a message | ||||
|  * from redis and when receiving a message from a client over a websocket | ||||
|  * connection, this is why it accepts a `req` argument. | ||||
|  * @param {string} json | ||||
|  * @param {any} req | ||||
|  * @param {any?} req | ||||
|  * @returns {Object.<string, any>|null} | ||||
|  */ | ||||
| const parseJSON = (json, req) => { | ||||
|   try { | ||||
|     return JSON.parse(json); | ||||
|   } catch (err) { | ||||
|     if (req.accountId) { | ||||
|       log.warn(req.requestId, `Error parsing message from user ${req.accountId}: ${err}`); | ||||
|     /* FIXME: This logging isn't great, and should probably be done at the | ||||
|      * call-site of parseJSON, not in the method, but this would require changing | ||||
|      * the signature of parseJSON to return something akin to a Result type: | ||||
|      * [Error|null, null|Object<string,any}], and then handling the error | ||||
|      * scenarios. | ||||
|      */ | ||||
|     if (req) { | ||||
|       if (req.accountId) { | ||||
|         log.warn(req.requestId, `Error parsing message from user ${req.accountId}: ${err}`); | ||||
|       } else { | ||||
|         log.silly(req.requestId, `Error parsing message from ${req.remoteAddress}: ${err}`); | ||||
|       } | ||||
|     } else { | ||||
|       log.silly(req.requestId, `Error parsing message from ${req.remoteAddress}: ${err}`); | ||||
|       log.warn(`Error parsing message from redis: ${err}`); | ||||
|     } | ||||
|     return null; | ||||
|   } | ||||
| @@ -163,7 +176,7 @@ const startServer = async () => { | ||||
|   const { redisParams, redisUrl, redisPrefix } = redisConfigFromEnv(process.env); | ||||
|  | ||||
|   /** | ||||
|    * @type {Object.<string, Array.<function(string): void>>} | ||||
|    * @type {Object.<string, Array.<function(Object<string, any>): void>>} | ||||
|    */ | ||||
|   const subs = {}; | ||||
|  | ||||
| @@ -203,7 +216,10 @@ const startServer = async () => { | ||||
|       return; | ||||
|     } | ||||
|  | ||||
|     callbacks.forEach(callback => callback(message)); | ||||
|     const json = parseJSON(message, null); | ||||
|     if (!json) return; | ||||
|  | ||||
|     callbacks.forEach(callback => callback(json)); | ||||
|   }; | ||||
|  | ||||
|   /** | ||||
| @@ -225,7 +241,7 @@ const startServer = async () => { | ||||
|  | ||||
|   /** | ||||
|    * @param {string} channel | ||||
|    * @param {function(string): void} callback | ||||
|    * @param {function(Object<string, any>): void} callback | ||||
|    */ | ||||
|   const unsubscribe = (channel, callback) => { | ||||
|     log.silly(`Removing listener for ${channel}`); | ||||
| @@ -369,7 +385,7 @@ const startServer = async () => { | ||||
|  | ||||
|   /** | ||||
|    * @param {any} req | ||||
|    * @returns {string} | ||||
|    * @returns {string|undefined} | ||||
|    */ | ||||
|   const channelNameFromPath = req => { | ||||
|     const { path, query } = req; | ||||
| @@ -478,15 +494,11 @@ const startServer = async () => { | ||||
|   /** | ||||
|    * @param {any} req | ||||
|    * @param {SystemMessageHandlers} eventHandlers | ||||
|    * @returns {function(string): void} | ||||
|    * @returns {function(object): void} | ||||
|    */ | ||||
|   const createSystemMessageListener = (req, eventHandlers) => { | ||||
|     return message => { | ||||
|       const json = parseJSON(message, req); | ||||
|  | ||||
|       if (!json) return; | ||||
|  | ||||
|       const { event } = json; | ||||
|       const { event } = message; | ||||
|  | ||||
|       log.silly(req.requestId, `System message for ${req.accountId}: ${event}`); | ||||
|  | ||||
| @@ -603,19 +615,16 @@ const startServer = async () => { | ||||
|    * @param {function(string, string): void} output | ||||
|    * @param {function(string[], function(string): void): void} attachCloseHandler | ||||
|    * @param {boolean=} needsFiltering | ||||
|    * @returns {function(string): void} | ||||
|    * @returns {function(object): void} | ||||
|    */ | ||||
|   const streamFrom = (ids, req, output, attachCloseHandler, needsFiltering = false) => { | ||||
|     const accountId = req.accountId || req.remoteAddress; | ||||
|  | ||||
|     log.verbose(req.requestId, `Starting stream from ${ids.join(', ')} for ${accountId}`); | ||||
|  | ||||
|     // Currently message is of type string, soon it'll be Record<string, any> | ||||
|     const listener = message => { | ||||
|       const json = parseJSON(message, req); | ||||
|  | ||||
|       if (!json) return; | ||||
|  | ||||
|       const { event, payload, queued_at } = json; | ||||
|       const { event, payload, queued_at } = message; | ||||
|  | ||||
|       const transmit = () => { | ||||
|         const now = new Date().getTime(); | ||||
| @@ -1198,8 +1207,15 @@ const startServer = async () => { | ||||
|     ws.on('close', onEnd); | ||||
|     ws.on('error', onEnd); | ||||
|  | ||||
|     ws.on('message', data => { | ||||
|       const json = parseJSON(data, session.request); | ||||
|     ws.on('message', (data, isBinary) => { | ||||
|       if (isBinary) { | ||||
|         log.debug('Received binary data, closing connection'); | ||||
|         ws.close(1003, 'The mastodon streaming server does not support binary messages'); | ||||
|         return; | ||||
|       } | ||||
|       const message = data.toString('utf8'); | ||||
|  | ||||
|       const json = parseJSON(message, session.request); | ||||
|  | ||||
|       if (!json) return; | ||||
|  | ||||
|   | ||||
		Reference in New Issue
	
	Block a user