Merge commit '16dd3f08c1e5396d5f9ff3f13417901bc4e4b8b9' into glitch-soc/merge-upstream

Conflicts:
- `app/views/settings/preferences/appearance/show.html.haml`:
  Upstream fixed a translation bug in the theme selector that is absent from
  glitch-soc due to our different theming system.
  Discarded upstream changes.
- `streaming/index.js`:
  Upstream changed the signature of a function to change its return type.
  This is not a real conflict, the conflict being caused by an extra
  argument in glitch-soc's code.
  Applied upstream's change while keeping our extra argument.
This commit is contained in:
Claire
2023-06-10 17:18:36 +02:00
4 changed files with 109 additions and 150 deletions

View File

@@ -52,18 +52,31 @@ const redisUrlToClient = async (defaultConfig, redisUrl) => {
};
/**
* Attempts to safely parse a string as JSON, used when both receiving a message
* from redis and when receiving a message from a client over a websocket
* connection, this is why it accepts a `req` argument.
* @param {string} json
* @param {any} req
* @param {any?} req
* @returns {Object.<string, any>|null}
*/
const parseJSON = (json, req) => {
try {
return JSON.parse(json);
} catch (err) {
if (req.accountId) {
log.warn(req.requestId, `Error parsing message from user ${req.accountId}: ${err}`);
/* FIXME: This logging isn't great, and should probably be done at the
* call-site of parseJSON, not in the method, but this would require changing
* the signature of parseJSON to return something akin to a Result type:
* [Error|null, null|Object<string,any}], and then handling the error
* scenarios.
*/
if (req) {
if (req.accountId) {
log.warn(req.requestId, `Error parsing message from user ${req.accountId}: ${err}`);
} else {
log.silly(req.requestId, `Error parsing message from ${req.remoteAddress}: ${err}`);
}
} else {
log.silly(req.requestId, `Error parsing message from ${req.remoteAddress}: ${err}`);
log.warn(`Error parsing message from redis: ${err}`);
}
return null;
}
@@ -163,7 +176,7 @@ const startServer = async () => {
const { redisParams, redisUrl, redisPrefix } = redisConfigFromEnv(process.env);
/**
* @type {Object.<string, Array.<function(string): void>>}
* @type {Object.<string, Array.<function(Object<string, any>): void>>}
*/
const subs = {};
@@ -203,7 +216,10 @@ const startServer = async () => {
return;
}
callbacks.forEach(callback => callback(message));
const json = parseJSON(message, null);
if (!json) return;
callbacks.forEach(callback => callback(json));
};
/**
@@ -225,7 +241,7 @@ const startServer = async () => {
/**
* @param {string} channel
* @param {function(string): void} callback
* @param {function(Object<string, any>): void} callback
*/
const unsubscribe = (channel, callback) => {
log.silly(`Removing listener for ${channel}`);
@@ -369,7 +385,7 @@ const startServer = async () => {
/**
* @param {any} req
* @returns {string}
* @returns {string|undefined}
*/
const channelNameFromPath = req => {
const { path, query } = req;
@@ -478,15 +494,11 @@ const startServer = async () => {
/**
* @param {any} req
* @param {SystemMessageHandlers} eventHandlers
* @returns {function(string): void}
* @returns {function(object): void}
*/
const createSystemMessageListener = (req, eventHandlers) => {
return message => {
const json = parseJSON(message, req);
if (!json) return;
const { event } = json;
const { event } = message;
log.silly(req.requestId, `System message for ${req.accountId}: ${event}`);
@@ -604,19 +616,16 @@ const startServer = async () => {
* @param {function(string[], function(string): void): void} attachCloseHandler
* @param {boolean=} needsFiltering
* @param {boolean=} allowLocalOnly
* @returns {function(string): void}
* @returns {function(object): void}
*/
const streamFrom = (ids, req, output, attachCloseHandler, needsFiltering = false, allowLocalOnly = false) => {
const accountId = req.accountId || req.remoteAddress;
log.verbose(req.requestId, `Starting stream from ${ids.join(', ')} for ${accountId}`);
// Currently message is of type string, soon it'll be Record<string, any>
const listener = message => {
const json = parseJSON(message, req);
if (!json) return;
const { event, payload, queued_at } = json;
const { event, payload, queued_at } = message;
const transmit = () => {
const now = new Date().getTime();
@@ -1219,8 +1228,15 @@ const startServer = async () => {
ws.on('close', onEnd);
ws.on('error', onEnd);
ws.on('message', data => {
const json = parseJSON(data, session.request);
ws.on('message', (data, isBinary) => {
if (isBinary) {
log.debug('Received binary data, closing connection');
ws.close(1003, 'The mastodon streaming server does not support binary messages');
return;
}
const message = data.toString('utf8');
const json = parseJSON(message, session.request);
if (!json) return;