Merge branch 'main' into glitch-soc/merge-upstream

Conflicts:
- `streaming/index.js`:
  Filtering code for streaming notifications has been refactored upstream, but
  glitch-soc had similar code for local-only toots in the same places.
  Ported upstream changes, but did not refactor local-only filtering.
This commit is contained in:
Claire
2021-09-26 18:28:59 +02:00
47 changed files with 549 additions and 266 deletions

View File

@@ -282,6 +282,14 @@ const startWorker = (workerId) => {
next();
};
/**
* @param {any} req
* @param {string[]} necessaryScopes
* @return {boolean}
*/
const isInScope = (req, necessaryScopes) =>
req.scopes.some(scope => necessaryScopes.includes(scope));
/**
* @param {string} token
* @param {any} req
@@ -314,7 +322,6 @@ const startWorker = (workerId) => {
req.scopes = result.rows[0].scopes.split(' ');
req.accountId = result.rows[0].account_id;
req.chosenLanguages = result.rows[0].chosen_languages;
req.allowNotifications = req.scopes.some(scope => ['read', 'read:notifications'].includes(scope));
req.deviceId = result.rows[0].device_id;
resolve();
@@ -581,15 +588,13 @@ const startWorker = (workerId) => {
* @param {function(string, string): void} output
* @param {function(string[], function(string): void): void} attachCloseHandler
* @param {boolean=} needsFiltering
* @param {boolean=} notificationOnly
* @param {boolean=} allowLocalOnly
* @return {function(string): void}
*/
const streamFrom = (ids, req, output, attachCloseHandler, needsFiltering = false, notificationOnly = false, allowLocalOnly = false) => {
const streamFrom = (ids, req, output, attachCloseHandler, needsFiltering = false, allowLocalOnly = false) => {
const accountId = req.accountId || req.remoteAddress;
const streamType = notificationOnly ? ' (notification)' : '';
log.verbose(req.requestId, `Starting stream from ${ids.join(', ')} for ${accountId}${streamType}`);
log.verbose(req.requestId, `Starting stream from ${ids.join(', ')} for ${accountId}`);
const listener = message => {
const json = parseJSON(message);
@@ -607,14 +612,6 @@ const startWorker = (workerId) => {
output(event, encodedPayload);
};
if (notificationOnly && event !== 'notification') {
return;
}
if (event === 'notification' && !req.allowNotifications) {
return;
}
// Only send local-only statuses to logged-in users
if (event === 'update' && payload.local_only && !(req.accountId && allowLocalOnly)) {
log.silly(req.requestId, `Message ${payload.id} filtered because it was local-only`);
@@ -767,7 +764,7 @@ const startWorker = (workerId) => {
const onSend = streamToHttp(req, res);
const onEnd = streamHttpEnd(req, subscriptionHeartbeat(channelIds));
streamFrom(channelIds, req, onSend, onEnd, options.needsFiltering, options.notificationOnly, options.allowLocalOnly);
streamFrom(channelIds, req, onSend, onEnd, options.needsFiltering, options.allowLocalOnly);
}).catch(err => {
log.verbose(req.requestId, 'Subscription error:', err.toString());
httpNotFound(res);
@@ -783,88 +780,106 @@ const startWorker = (workerId) => {
* @property {string} [only_media]
*/
/**
* @param {any} req
* @return {string[]}
*/
const channelsForUserStream = req => {
const arr = [`timeline:${req.accountId}`];
if (isInScope(req, ['crypto']) && req.deviceId) {
arr.push(`timeline:${req.accountId}:${req.deviceId}`);
}
if (isInScope(req, ['read', 'read:notifications'])) {
arr.push(`timeline:${req.accountId}:notifications`);
}
return arr;
};
/**
* @param {any} req
* @param {string} name
* @param {StreamParams} params
* @return {Promise.<{ channelIds: string[], options: { needsFiltering: boolean, notificationOnly: boolean } }>}
* @return {Promise.<{ channelIds: string[], options: { needsFiltering: boolean } }>}
*/
const channelNameToIds = (req, name, params) => new Promise((resolve, reject) => {
switch(name) {
case 'user':
resolve({
channelIds: req.deviceId ? [`timeline:${req.accountId}`, `timeline:${req.accountId}:${req.deviceId}`] : [`timeline:${req.accountId}`],
options: { needsFiltering: false, notificationOnly: false, allowLocalOnly: true },
channelIds: channelsForUserStream(req),
options: { needsFiltering: false, allowLocalOnly: true },
});
break;
case 'user:notification':
resolve({
channelIds: [`timeline:${req.accountId}`],
options: { needsFiltering: false, notificationOnly: true, allowLocalOnly: true },
channelIds: [`timeline:${req.accountId}:notifications`],
options: { needsFiltering: false, allowLocalOnly: true },
});
break;
case 'public':
resolve({
channelIds: ['timeline:public'],
options: { needsFiltering: true, notificationOnly: false, allowLocalOnly: isTruthy(params.allow_local_only) },
options: { needsFiltering: true, allowLocalOnly: isTruthy(params.allow_local_only) },
});
break;
case 'public:allow_local_only':
resolve({
channelIds: ['timeline:public'],
options: { needsFiltering: true, notificationOnly: false, allowLocalOnly: true },
options: { needsFiltering: true, allowLocalOnly: true },
});
break;
case 'public:local':
resolve({
channelIds: ['timeline:public:local'],
options: { needsFiltering: true, notificationOnly: false, allowLocalOnly: true },
options: { needsFiltering: true, allowLocalOnly: true },
});
break;
case 'public:remote':
resolve({
channelIds: ['timeline:public:remote'],
options: { needsFiltering: true, notificationOnly: false, allowLocalOnly: false },
options: { needsFiltering: true, allowLocalOnly: false },
});
break;
case 'public:media':
resolve({
channelIds: ['timeline:public:media'],
options: { needsFiltering: true, notificationOnly: false, allowLocalOnly: isTruthy(query.allow_local_only) },
options: { needsFiltering: true, allowLocalOnly: isTruthy(query.allow_local_only) },
});
break;
case 'public:allow_local_only:media':
resolve({
channelIds: ['timeline:public:media'],
options: { needsFiltering: true, notificationsOnly: false, allowLocalOnly: true },
options: { needsFiltering: true, allowLocalOnly: true },
});
break;
case 'public:local:media':
resolve({
channelIds: ['timeline:public:local:media'],
options: { needsFiltering: true, notificationOnly: false, allowLocalOnly: true },
options: { needsFiltering: true, allowLocalOnly: true },
});
break;
case 'public:remote:media':
resolve({
channelIds: ['timeline:public:remote:media'],
options: { needsFiltering: true, notificationOnly: false, allowLocalOnly: false },
options: { needsFiltering: true, allowLocalOnly: false },
});
break;
case 'direct':
resolve({
channelIds: [`timeline:direct:${req.accountId}`],
options: { needsFiltering: false, notificationOnly: false, allowLocalOnly: true },
options: { needsFiltering: false, allowLocalOnly: true },
});
break;
@@ -874,7 +889,7 @@ const startWorker = (workerId) => {
} else {
resolve({
channelIds: [`timeline:hashtag:${params.tag.toLowerCase()}`],
options: { needsFiltering: true, notificationOnly: false, allowLocalOnly: true },
options: { needsFiltering: true, allowLocalOnly: true },
});
}
@@ -885,7 +900,7 @@ const startWorker = (workerId) => {
} else {
resolve({
channelIds: [`timeline:hashtag:${params.tag.toLowerCase()}:local`],
options: { needsFiltering: true, notificationOnly: false, allowLocalOnly: true },
options: { needsFiltering: true, allowLocalOnly: true },
});
}
@@ -894,7 +909,7 @@ const startWorker = (workerId) => {
authorizeListAccess(params.list, req).then(() => {
resolve({
channelIds: [`timeline:list:${params.list}`],
options: { needsFiltering: false, notificationOnly: false, allowLocalOnly: true },
options: { needsFiltering: false, allowLocalOnly: true },
});
}).catch(() => {
reject('Not authorized to stream this list');
@@ -941,7 +956,8 @@ const startWorker = (workerId) => {
const onSend = streamToWs(request, socket, streamNameFromChannelName(channelName, params));
const stopHeartbeat = subscriptionHeartbeat(channelIds);
const listener = streamFrom(channelIds, request, onSend, undefined, options.needsFiltering, options.notificationOnly, options.allowLocalOnly);
const listener = streamFrom(channelIds, request, onSend, undefined, options.needsFiltering, options.allowLocalOnly);
subscriptions[channelIds.join(';')] = {
listener,