Merge branch 'master' into glitch-soc/merge-upstream

Conflicts:
- `app/controllers/activitypub/collections_controller.rb`:
  Conflict due to glitch-soc having to take care of local-only
  pinned toots in that controller.
  Took upstream's changes and restored the local-only special
  handling.
- `app/controllers/auth/sessions_controller.rb`:
  Minor conflicts due to the theming system, applied upstream
  changes, adapted the following two files for glitch-soc's
  theming system:
  - `app/controllers/concerns/sign_in_token_authentication_concern.rb`
  - `app/controllers/concerns/two_factor_authentication_concern.rb`
- `app/services/backup_service.rb`:
  Minor conflict due to glitch-soc having to handle local-only
  toots specially. Applied upstream changes and restored
  the local-only special handling.
- `app/views/admin/custom_emojis/index.html.haml`:
  Minor conflict due to the theming system.
- `package.json`:
  Upstream dependency updated, too close to a glitch-soc-only
  dependency in the file.
- `yarn.lock`:
  Upstream dependency updated, too close to a glitch-soc-only
  dependency in the file.
This commit is contained in:
Thibaut Girka
2020-06-09 10:39:20 +02:00
246 changed files with 5027 additions and 1354 deletions

View File

@ -144,13 +144,21 @@ const startWorker = (workerId) => {
callbacks.forEach(callback => callback(message));
});
const subscriptionHeartbeat = (channel) => {
const interval = 6*60;
const subscriptionHeartbeat = channels => {
if (!Array.isArray(channels)) {
channels = [channels];
}
const interval = 6 * 60;
const tellSubscribed = () => {
redisClient.set(`${redisPrefix}subscribed:${channel}`, '1', 'EX', interval*3);
channels.forEach(channel => redisClient.set(`${redisPrefix}subscribed:${channel}`, '1', 'EX', interval * 3));
};
tellSubscribed();
const heartbeat = setInterval(tellSubscribed, interval*1000);
const heartbeat = setInterval(tellSubscribed, interval * 1000);
return () => {
clearInterval(heartbeat);
};
@ -203,7 +211,7 @@ const startWorker = (workerId) => {
return;
}
client.query('SELECT oauth_access_tokens.resource_owner_id, users.account_id, users.chosen_languages, oauth_access_tokens.scopes FROM oauth_access_tokens INNER JOIN users ON oauth_access_tokens.resource_owner_id = users.id WHERE oauth_access_tokens.token = $1 AND oauth_access_tokens.revoked_at IS NULL LIMIT 1', [token], (err, result) => {
client.query('SELECT oauth_access_tokens.resource_owner_id, users.account_id, users.chosen_languages, oauth_access_tokens.scopes, devices.device_id FROM oauth_access_tokens INNER JOIN users ON oauth_access_tokens.resource_owner_id = users.id LEFT OUTER JOIN devices ON oauth_access_tokens.id = devices.access_token_id WHERE oauth_access_tokens.token = $1 AND oauth_access_tokens.revoked_at IS NULL LIMIT 1', [token], (err, result) => {
done();
if (err) {
@ -232,6 +240,7 @@ const startWorker = (workerId) => {
req.accountId = result.rows[0].account_id;
req.chosenLanguages = result.rows[0].chosen_languages;
req.allowNotifications = scopes.some(scope => ['read', 'read:notifications'].includes(scope));
req.deviceId = result.rows[0].device_id;
next();
});
@ -353,11 +362,15 @@ const startWorker = (workerId) => {
});
};
const streamFrom = (id, req, output, attachCloseHandler, needsFiltering = false, notificationOnly = false) => {
const accountId = req.accountId || req.remoteAddress;
const streamFrom = (ids, req, output, attachCloseHandler, needsFiltering = false, notificationOnly = false) => {
const accountId = req.accountId || req.remoteAddress;
const streamType = notificationOnly ? ' (notification)' : '';
log.verbose(req.requestId, `Starting stream from ${id} for ${accountId}${streamType}`);
if (!Array.isArray(ids)) {
ids = [ids];
}
log.verbose(req.requestId, `Starting stream from ${ids.join(', ')} for ${accountId}${streamType}`);
const listener = message => {
const { event, payload, queued_at } = JSON.parse(message);
@ -436,8 +449,11 @@ const startWorker = (workerId) => {
});
};
subscribe(`${redisPrefix}${id}`, listener);
attachCloseHandler(`${redisPrefix}${id}`, listener);
ids.forEach(id => {
subscribe(`${redisPrefix}${id}`, listener);
});
attachCloseHandler(ids.map(id => `${redisPrefix}${id}`), listener);
};
// Setup stream output to HTTP
@ -464,9 +480,16 @@ const startWorker = (workerId) => {
};
// Setup stream end for HTTP
const streamHttpEnd = (req, closeHandler = false) => (id, listener) => {
const streamHttpEnd = (req, closeHandler = false) => (ids, listener) => {
if (!Array.isArray(ids)) {
ids = [ids];
}
req.on('close', () => {
unsubscribe(id, listener);
ids.forEach(id => {
unsubscribe(id, listener);
});
if (closeHandler) {
closeHandler();
}
@ -522,8 +545,13 @@ const startWorker = (workerId) => {
app.use(errorMiddleware);
app.get('/api/v1/streaming/user', (req, res) => {
const channel = `timeline:${req.accountId}`;
streamFrom(channel, req, streamToHttp(req, res), streamHttpEnd(req, subscriptionHeartbeat(channel)));
const channels = [`timeline:${req.accountId}`];
if (req.deviceId) {
channels.push(`timeline:${req.accountId}:${req.deviceId}`);
}
streamFrom(channels, req, streamToHttp(req, res), streamHttpEnd(req, subscriptionHeartbeat(channels)));
});
app.get('/api/v1/streaming/user/notification', (req, res) => {
@ -603,7 +631,12 @@ const startWorker = (workerId) => {
switch(location.query.stream) {
case 'user':
channel = `timeline:${req.accountId}`;
channel = [`timeline:${req.accountId}`];
if (req.deviceId) {
channel.push(`timeline:${req.accountId}:${req.deviceId}`);
}
streamFrom(channel, req, streamToWs(req, ws), streamWsEnd(req, ws, subscriptionHeartbeat(channel)));
break;
case 'user:notification':