Refactoring streaming connections (#4645)
This commit is contained in:
parent
10e9a9a3f9
commit
ea958cae7f
94
app/javascript/mastodon/actions/streaming.js
Normal file
94
app/javascript/mastodon/actions/streaming.js
Normal file
|
@ -0,0 +1,94 @@
|
||||||
|
import createStream from '../stream';
|
||||||
|
import {
|
||||||
|
updateTimeline,
|
||||||
|
deleteFromTimelines,
|
||||||
|
refreshHomeTimeline,
|
||||||
|
connectTimeline,
|
||||||
|
disconnectTimeline,
|
||||||
|
} from './timelines';
|
||||||
|
import { updateNotifications, refreshNotifications } from './notifications';
|
||||||
|
import { getLocale } from '../locales';
|
||||||
|
|
||||||
|
const { messages } = getLocale();
|
||||||
|
|
||||||
|
export function connectTimelineStream (timelineId, path, pollingRefresh = null) {
|
||||||
|
return (dispatch, getState) => {
|
||||||
|
const streamingAPIBaseURL = getState().getIn(['meta', 'streaming_api_base_url']);
|
||||||
|
const accessToken = getState().getIn(['meta', 'access_token']);
|
||||||
|
const locale = getState().getIn(['meta', 'locale']);
|
||||||
|
let polling = null;
|
||||||
|
|
||||||
|
const setupPolling = () => {
|
||||||
|
polling = setInterval(() => {
|
||||||
|
pollingRefresh(dispatch);
|
||||||
|
}, 20000);
|
||||||
|
};
|
||||||
|
|
||||||
|
const clearPolling = () => {
|
||||||
|
if (polling) {
|
||||||
|
clearInterval(polling);
|
||||||
|
polling = null;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
const subscription = createStream(streamingAPIBaseURL, accessToken, path, {
|
||||||
|
|
||||||
|
connected () {
|
||||||
|
if (pollingRefresh) {
|
||||||
|
clearPolling();
|
||||||
|
}
|
||||||
|
dispatch(connectTimeline(timelineId));
|
||||||
|
},
|
||||||
|
|
||||||
|
disconnected () {
|
||||||
|
if (pollingRefresh) {
|
||||||
|
setupPolling();
|
||||||
|
}
|
||||||
|
dispatch(disconnectTimeline(timelineId));
|
||||||
|
},
|
||||||
|
|
||||||
|
received (data) {
|
||||||
|
switch(data.event) {
|
||||||
|
case 'update':
|
||||||
|
dispatch(updateTimeline(timelineId, JSON.parse(data.payload)));
|
||||||
|
break;
|
||||||
|
case 'delete':
|
||||||
|
dispatch(deleteFromTimelines(data.payload));
|
||||||
|
break;
|
||||||
|
case 'notification':
|
||||||
|
dispatch(updateNotifications(JSON.parse(data.payload), messages, locale));
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
},
|
||||||
|
|
||||||
|
reconnected () {
|
||||||
|
if (pollingRefresh) {
|
||||||
|
clearPolling();
|
||||||
|
pollingRefresh(dispatch);
|
||||||
|
}
|
||||||
|
dispatch(connectTimeline(timelineId));
|
||||||
|
},
|
||||||
|
|
||||||
|
});
|
||||||
|
|
||||||
|
const disconnect = () => {
|
||||||
|
if (subscription) {
|
||||||
|
subscription.close();
|
||||||
|
}
|
||||||
|
clearPolling();
|
||||||
|
};
|
||||||
|
|
||||||
|
return disconnect;
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
function refreshHomeTimelineAndNotification (dispatch) {
|
||||||
|
dispatch(refreshHomeTimeline());
|
||||||
|
dispatch(refreshNotifications());
|
||||||
|
}
|
||||||
|
|
||||||
|
export const connectUserStream = () => connectTimelineStream('home', 'user', refreshHomeTimelineAndNotification);
|
||||||
|
export const connectCommunityStream = () => connectTimelineStream('community', 'public:local');
|
||||||
|
export const connectMediaStream = () => connectTimelineStream('community', 'public:local');
|
||||||
|
export const connectPublicStream = () => connectTimelineStream('public', 'public');
|
||||||
|
export const connectHashtagStream = (tag) => connectTimelineStream(`hashtag:${tag}`, `hashtag&tag=${tag}`);
|
|
@ -2,21 +2,13 @@ import React from 'react';
|
||||||
import { Provider } from 'react-redux';
|
import { Provider } from 'react-redux';
|
||||||
import PropTypes from 'prop-types';
|
import PropTypes from 'prop-types';
|
||||||
import configureStore from '../store/configureStore';
|
import configureStore from '../store/configureStore';
|
||||||
import {
|
|
||||||
updateTimeline,
|
|
||||||
deleteFromTimelines,
|
|
||||||
refreshHomeTimeline,
|
|
||||||
connectTimeline,
|
|
||||||
disconnectTimeline,
|
|
||||||
} from '../actions/timelines';
|
|
||||||
import { showOnboardingOnce } from '../actions/onboarding';
|
import { showOnboardingOnce } from '../actions/onboarding';
|
||||||
import { updateNotifications, refreshNotifications } from '../actions/notifications';
|
|
||||||
import BrowserRouter from 'react-router-dom/BrowserRouter';
|
import BrowserRouter from 'react-router-dom/BrowserRouter';
|
||||||
import Route from 'react-router-dom/Route';
|
import Route from 'react-router-dom/Route';
|
||||||
import ScrollContext from 'react-router-scroll/lib/ScrollBehaviorContext';
|
import ScrollContext from 'react-router-scroll/lib/ScrollBehaviorContext';
|
||||||
import UI from '../features/ui';
|
import UI from '../features/ui';
|
||||||
import { hydrateStore } from '../actions/store';
|
import { hydrateStore } from '../actions/store';
|
||||||
import createStream from '../stream';
|
import { connectUserStream } from '../actions/streaming';
|
||||||
import { IntlProvider, addLocaleData } from 'react-intl';
|
import { IntlProvider, addLocaleData } from 'react-intl';
|
||||||
import { getLocale } from '../locales';
|
import { getLocale } from '../locales';
|
||||||
const { localeData, messages } = getLocale();
|
const { localeData, messages } = getLocale();
|
||||||
|
@ -33,56 +25,7 @@ export default class Mastodon extends React.PureComponent {
|
||||||
};
|
};
|
||||||
|
|
||||||
componentDidMount() {
|
componentDidMount() {
|
||||||
const { locale } = this.props;
|
this.disconnect = store.dispatch(connectUserStream());
|
||||||
const streamingAPIBaseURL = store.getState().getIn(['meta', 'streaming_api_base_url']);
|
|
||||||
const accessToken = store.getState().getIn(['meta', 'access_token']);
|
|
||||||
|
|
||||||
const setupPolling = () => {
|
|
||||||
this.polling = setInterval(() => {
|
|
||||||
store.dispatch(refreshHomeTimeline());
|
|
||||||
store.dispatch(refreshNotifications());
|
|
||||||
}, 20000);
|
|
||||||
};
|
|
||||||
|
|
||||||
const clearPolling = () => {
|
|
||||||
clearInterval(this.polling);
|
|
||||||
this.polling = undefined;
|
|
||||||
};
|
|
||||||
|
|
||||||
this.subscription = createStream(streamingAPIBaseURL, accessToken, 'user', {
|
|
||||||
|
|
||||||
connected () {
|
|
||||||
clearPolling();
|
|
||||||
store.dispatch(connectTimeline('home'));
|
|
||||||
},
|
|
||||||
|
|
||||||
disconnected () {
|
|
||||||
setupPolling();
|
|
||||||
store.dispatch(disconnectTimeline('home'));
|
|
||||||
},
|
|
||||||
|
|
||||||
received (data) {
|
|
||||||
switch(data.event) {
|
|
||||||
case 'update':
|
|
||||||
store.dispatch(updateTimeline('home', JSON.parse(data.payload)));
|
|
||||||
break;
|
|
||||||
case 'delete':
|
|
||||||
store.dispatch(deleteFromTimelines(data.payload));
|
|
||||||
break;
|
|
||||||
case 'notification':
|
|
||||||
store.dispatch(updateNotifications(JSON.parse(data.payload), messages, locale));
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
},
|
|
||||||
|
|
||||||
reconnected () {
|
|
||||||
clearPolling();
|
|
||||||
store.dispatch(connectTimeline('home'));
|
|
||||||
store.dispatch(refreshHomeTimeline());
|
|
||||||
store.dispatch(refreshNotifications());
|
|
||||||
},
|
|
||||||
|
|
||||||
});
|
|
||||||
|
|
||||||
// Desktop notifications
|
// Desktop notifications
|
||||||
if (typeof window.Notification !== 'undefined' && Notification.permission === 'default') {
|
if (typeof window.Notification !== 'undefined' && Notification.permission === 'default') {
|
||||||
|
@ -98,14 +41,9 @@ export default class Mastodon extends React.PureComponent {
|
||||||
}
|
}
|
||||||
|
|
||||||
componentWillUnmount () {
|
componentWillUnmount () {
|
||||||
if (typeof this.subscription !== 'undefined') {
|
if (this.disconnect) {
|
||||||
this.subscription.close();
|
this.disconnect();
|
||||||
this.subscription = null;
|
this.disconnect = null;
|
||||||
}
|
|
||||||
|
|
||||||
if (typeof this.polling !== 'undefined') {
|
|
||||||
clearInterval(this.polling);
|
|
||||||
this.polling = null;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -7,15 +7,11 @@ import ColumnHeader from '../../components/column_header';
|
||||||
import {
|
import {
|
||||||
refreshCommunityTimeline,
|
refreshCommunityTimeline,
|
||||||
expandCommunityTimeline,
|
expandCommunityTimeline,
|
||||||
updateTimeline,
|
|
||||||
deleteFromTimelines,
|
|
||||||
connectTimeline,
|
|
||||||
disconnectTimeline,
|
|
||||||
} from '../../actions/timelines';
|
} from '../../actions/timelines';
|
||||||
import { addColumn, removeColumn, moveColumn } from '../../actions/columns';
|
import { addColumn, removeColumn, moveColumn } from '../../actions/columns';
|
||||||
import { defineMessages, injectIntl, FormattedMessage } from 'react-intl';
|
import { defineMessages, injectIntl, FormattedMessage } from 'react-intl';
|
||||||
import ColumnSettingsContainer from './containers/column_settings_container';
|
import ColumnSettingsContainer from './containers/column_settings_container';
|
||||||
import createStream from '../../stream';
|
import { connectCommunityStream } from '../../actions/streaming';
|
||||||
|
|
||||||
const messages = defineMessages({
|
const messages = defineMessages({
|
||||||
title: { id: 'column.community', defaultMessage: 'Local timeline' },
|
title: { id: 'column.community', defaultMessage: 'Local timeline' },
|
||||||
|
@ -23,8 +19,6 @@ const messages = defineMessages({
|
||||||
|
|
||||||
const mapStateToProps = state => ({
|
const mapStateToProps = state => ({
|
||||||
hasUnread: state.getIn(['timelines', 'community', 'unread']) > 0,
|
hasUnread: state.getIn(['timelines', 'community', 'unread']) > 0,
|
||||||
streamingAPIBaseURL: state.getIn(['meta', 'streaming_api_base_url']),
|
|
||||||
accessToken: state.getIn(['meta', 'access_token']),
|
|
||||||
});
|
});
|
||||||
|
|
||||||
@connect(mapStateToProps)
|
@connect(mapStateToProps)
|
||||||
|
@ -35,8 +29,6 @@ export default class CommunityTimeline extends React.PureComponent {
|
||||||
dispatch: PropTypes.func.isRequired,
|
dispatch: PropTypes.func.isRequired,
|
||||||
columnId: PropTypes.string,
|
columnId: PropTypes.string,
|
||||||
intl: PropTypes.object.isRequired,
|
intl: PropTypes.object.isRequired,
|
||||||
streamingAPIBaseURL: PropTypes.string.isRequired,
|
|
||||||
accessToken: PropTypes.string.isRequired,
|
|
||||||
hasUnread: PropTypes.bool,
|
hasUnread: PropTypes.bool,
|
||||||
multiColumn: PropTypes.bool,
|
multiColumn: PropTypes.bool,
|
||||||
};
|
};
|
||||||
|
@ -61,46 +53,16 @@ export default class CommunityTimeline extends React.PureComponent {
|
||||||
}
|
}
|
||||||
|
|
||||||
componentDidMount () {
|
componentDidMount () {
|
||||||
const { dispatch, streamingAPIBaseURL, accessToken } = this.props;
|
const { dispatch } = this.props;
|
||||||
|
|
||||||
dispatch(refreshCommunityTimeline());
|
dispatch(refreshCommunityTimeline());
|
||||||
|
this.disconnect = dispatch(connectCommunityStream());
|
||||||
if (typeof this._subscription !== 'undefined') {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
this._subscription = createStream(streamingAPIBaseURL, accessToken, 'public:local', {
|
|
||||||
|
|
||||||
connected () {
|
|
||||||
dispatch(connectTimeline('community'));
|
|
||||||
},
|
|
||||||
|
|
||||||
reconnected () {
|
|
||||||
dispatch(connectTimeline('community'));
|
|
||||||
},
|
|
||||||
|
|
||||||
disconnected () {
|
|
||||||
dispatch(disconnectTimeline('community'));
|
|
||||||
},
|
|
||||||
|
|
||||||
received (data) {
|
|
||||||
switch(data.event) {
|
|
||||||
case 'update':
|
|
||||||
dispatch(updateTimeline('community', JSON.parse(data.payload)));
|
|
||||||
break;
|
|
||||||
case 'delete':
|
|
||||||
dispatch(deleteFromTimelines(data.payload));
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
},
|
|
||||||
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
|
|
||||||
componentWillUnmount () {
|
componentWillUnmount () {
|
||||||
if (typeof this._subscription !== 'undefined') {
|
if (this.disconnect) {
|
||||||
this._subscription.close();
|
this.disconnect();
|
||||||
this._subscription = null;
|
this.disconnect = null;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -7,17 +7,13 @@ import ColumnHeader from '../../components/column_header';
|
||||||
import {
|
import {
|
||||||
refreshHashtagTimeline,
|
refreshHashtagTimeline,
|
||||||
expandHashtagTimeline,
|
expandHashtagTimeline,
|
||||||
updateTimeline,
|
|
||||||
deleteFromTimelines,
|
|
||||||
} from '../../actions/timelines';
|
} from '../../actions/timelines';
|
||||||
import { addColumn, removeColumn, moveColumn } from '../../actions/columns';
|
import { addColumn, removeColumn, moveColumn } from '../../actions/columns';
|
||||||
import { FormattedMessage } from 'react-intl';
|
import { FormattedMessage } from 'react-intl';
|
||||||
import createStream from '../../stream';
|
import { connectHashtagStream } from '../../actions/streaming';
|
||||||
|
|
||||||
const mapStateToProps = (state, props) => ({
|
const mapStateToProps = (state, props) => ({
|
||||||
hasUnread: state.getIn(['timelines', `hashtag:${props.params.id}`, 'unread']) > 0,
|
hasUnread: state.getIn(['timelines', `hashtag:${props.params.id}`, 'unread']) > 0,
|
||||||
streamingAPIBaseURL: state.getIn(['meta', 'streaming_api_base_url']),
|
|
||||||
accessToken: state.getIn(['meta', 'access_token']),
|
|
||||||
});
|
});
|
||||||
|
|
||||||
@connect(mapStateToProps)
|
@connect(mapStateToProps)
|
||||||
|
@ -27,8 +23,6 @@ export default class HashtagTimeline extends React.PureComponent {
|
||||||
params: PropTypes.object.isRequired,
|
params: PropTypes.object.isRequired,
|
||||||
columnId: PropTypes.string,
|
columnId: PropTypes.string,
|
||||||
dispatch: PropTypes.func.isRequired,
|
dispatch: PropTypes.func.isRequired,
|
||||||
streamingAPIBaseURL: PropTypes.string.isRequired,
|
|
||||||
accessToken: PropTypes.string.isRequired,
|
|
||||||
hasUnread: PropTypes.bool,
|
hasUnread: PropTypes.bool,
|
||||||
multiColumn: PropTypes.bool,
|
multiColumn: PropTypes.bool,
|
||||||
};
|
};
|
||||||
|
@ -53,28 +47,13 @@ export default class HashtagTimeline extends React.PureComponent {
|
||||||
}
|
}
|
||||||
|
|
||||||
_subscribe (dispatch, id) {
|
_subscribe (dispatch, id) {
|
||||||
const { streamingAPIBaseURL, accessToken } = this.props;
|
this.disconnect = dispatch(connectHashtagStream(id));
|
||||||
|
|
||||||
this.subscription = createStream(streamingAPIBaseURL, accessToken, `hashtag&tag=${id}`, {
|
|
||||||
|
|
||||||
received (data) {
|
|
||||||
switch(data.event) {
|
|
||||||
case 'update':
|
|
||||||
dispatch(updateTimeline(`hashtag:${id}`, JSON.parse(data.payload)));
|
|
||||||
break;
|
|
||||||
case 'delete':
|
|
||||||
dispatch(deleteFromTimelines(data.payload));
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
},
|
|
||||||
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
|
|
||||||
_unsubscribe () {
|
_unsubscribe () {
|
||||||
if (typeof this.subscription !== 'undefined') {
|
if (this.disconnect) {
|
||||||
this.subscription.close();
|
this.disconnect();
|
||||||
this.subscription = null;
|
this.disconnect = null;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -7,15 +7,11 @@ import ColumnHeader from '../../components/column_header';
|
||||||
import {
|
import {
|
||||||
refreshPublicTimeline,
|
refreshPublicTimeline,
|
||||||
expandPublicTimeline,
|
expandPublicTimeline,
|
||||||
updateTimeline,
|
|
||||||
deleteFromTimelines,
|
|
||||||
connectTimeline,
|
|
||||||
disconnectTimeline,
|
|
||||||
} from '../../actions/timelines';
|
} from '../../actions/timelines';
|
||||||
import { addColumn, removeColumn, moveColumn } from '../../actions/columns';
|
import { addColumn, removeColumn, moveColumn } from '../../actions/columns';
|
||||||
import { defineMessages, injectIntl, FormattedMessage } from 'react-intl';
|
import { defineMessages, injectIntl, FormattedMessage } from 'react-intl';
|
||||||
import ColumnSettingsContainer from './containers/column_settings_container';
|
import ColumnSettingsContainer from './containers/column_settings_container';
|
||||||
import createStream from '../../stream';
|
import { connectPublicStream } from '../../actions/streaming';
|
||||||
|
|
||||||
const messages = defineMessages({
|
const messages = defineMessages({
|
||||||
title: { id: 'column.public', defaultMessage: 'Federated timeline' },
|
title: { id: 'column.public', defaultMessage: 'Federated timeline' },
|
||||||
|
@ -23,8 +19,6 @@ const messages = defineMessages({
|
||||||
|
|
||||||
const mapStateToProps = state => ({
|
const mapStateToProps = state => ({
|
||||||
hasUnread: state.getIn(['timelines', 'public', 'unread']) > 0,
|
hasUnread: state.getIn(['timelines', 'public', 'unread']) > 0,
|
||||||
streamingAPIBaseURL: state.getIn(['meta', 'streaming_api_base_url']),
|
|
||||||
accessToken: state.getIn(['meta', 'access_token']),
|
|
||||||
});
|
});
|
||||||
|
|
||||||
@connect(mapStateToProps)
|
@connect(mapStateToProps)
|
||||||
|
@ -36,8 +30,6 @@ export default class PublicTimeline extends React.PureComponent {
|
||||||
intl: PropTypes.object.isRequired,
|
intl: PropTypes.object.isRequired,
|
||||||
columnId: PropTypes.string,
|
columnId: PropTypes.string,
|
||||||
multiColumn: PropTypes.bool,
|
multiColumn: PropTypes.bool,
|
||||||
streamingAPIBaseURL: PropTypes.string.isRequired,
|
|
||||||
accessToken: PropTypes.string.isRequired,
|
|
||||||
hasUnread: PropTypes.bool,
|
hasUnread: PropTypes.bool,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -61,46 +53,16 @@ export default class PublicTimeline extends React.PureComponent {
|
||||||
}
|
}
|
||||||
|
|
||||||
componentDidMount () {
|
componentDidMount () {
|
||||||
const { dispatch, streamingAPIBaseURL, accessToken } = this.props;
|
const { dispatch } = this.props;
|
||||||
|
|
||||||
dispatch(refreshPublicTimeline());
|
dispatch(refreshPublicTimeline());
|
||||||
|
this.disconnect = dispatch(connectPublicStream());
|
||||||
if (typeof this._subscription !== 'undefined') {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
this._subscription = createStream(streamingAPIBaseURL, accessToken, 'public', {
|
|
||||||
|
|
||||||
connected () {
|
|
||||||
dispatch(connectTimeline('public'));
|
|
||||||
},
|
|
||||||
|
|
||||||
reconnected () {
|
|
||||||
dispatch(connectTimeline('public'));
|
|
||||||
},
|
|
||||||
|
|
||||||
disconnected () {
|
|
||||||
dispatch(disconnectTimeline('public'));
|
|
||||||
},
|
|
||||||
|
|
||||||
received (data) {
|
|
||||||
switch(data.event) {
|
|
||||||
case 'update':
|
|
||||||
dispatch(updateTimeline('public', JSON.parse(data.payload)));
|
|
||||||
break;
|
|
||||||
case 'delete':
|
|
||||||
dispatch(deleteFromTimelines(data.payload));
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
},
|
|
||||||
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
|
|
||||||
componentWillUnmount () {
|
componentWillUnmount () {
|
||||||
if (typeof this._subscription !== 'undefined') {
|
if (this.disconnect) {
|
||||||
this._subscription.close();
|
this.disconnect();
|
||||||
this._subscription = null;
|
this.disconnect = null;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Reference in a new issue