Skip to content

Commit d9e2a69

Browse files
save operations
2 parents a42398e + dd93dc8 commit d9e2a69

File tree

8 files changed

+177
-294
lines changed

8 files changed

+177
-294
lines changed

packages/app/src/app/overmind/effects/live/clients.ts

Lines changed: 30 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { OTClient } from './ot/client';
1+
import { Client, TextOperation } from 'ot';
22

33
export type SendOperation = (
44
moduleShortid: string,
@@ -8,24 +8,11 @@ export type SendOperation = (
88

99
export type ApplyOperation = (moduleShortid: string, operation: any) => void;
1010

11-
function operationToElixir(ot) {
12-
return ot.map(op => {
13-
if (typeof op === 'number') {
14-
if (op < 0) {
15-
return { d: -op };
16-
}
17-
18-
return op;
19-
}
20-
21-
return { i: op };
22-
});
23-
}
24-
25-
class CodeSandboxOTClient extends OTClient {
11+
class CodeSandboxOTClient extends Client {
2612
moduleShortid: string;
2713
onSendOperation: (revision: number, operation: any) => Promise<unknown>;
2814
onApplyOperation: (operation: any) => void;
15+
saveOperation: TextOperation | null;
2916

3017
constructor(
3118
revision: number,
@@ -40,15 +27,20 @@ class CodeSandboxOTClient extends OTClient {
4027
}
4128

4229
sendOperation(revision, operation) {
43-
this.onSendOperation(revision, operationToElixir(operation.toJSON())).then(
44-
() => {
45-
this.serverAck();
46-
}
47-
);
30+
this.onSendOperation(revision, operation).then(() => {
31+
this.serverAck();
32+
});
4833
}
4934

50-
applyOperation(operation) {
51-
this.onApplyOperation(operation);
35+
flush() {
36+
const saveOperation = this.saveOperation;
37+
38+
this.saveOperation = null;
39+
40+
return {
41+
revision: this.revision,
42+
operation: saveOperation,
43+
};
5244
}
5345

5446
serverAck() {
@@ -61,16 +53,25 @@ class CodeSandboxOTClient extends OTClient {
6153
}
6254
}
6355

64-
applyClient(operation: any) {
65-
super.applyClient(operation);
56+
revertFlush(operation) {
57+
if (this.saveOperation) {
58+
this.saveOperation = operation.compose(this.saveOperation);
59+
} else {
60+
this.saveOperation = operation;
61+
}
6662
}
6763

68-
applyServer(operation: any) {
69-
super.applyServer(operation);
64+
applyOperation(operation) {
65+
this.onApplyOperation(operation);
7066
}
7167

72-
serverReconnect() {
73-
super.serverReconnect();
68+
applyClient(operation: any) {
69+
if (this.saveOperation) {
70+
this.saveOperation = this.saveOperation.compose(operation);
71+
} else {
72+
this.saveOperation = operation;
73+
}
74+
super.applyClient(operation);
7475
}
7576
}
7677

packages/app/src/app/overmind/effects/live/index.ts

Lines changed: 85 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -4,19 +4,20 @@ import {
44
Module,
55
RoomInfo,
66
} from '@codesandbox/common/lib/types';
7+
import { logBreadcrumb } from '@codesandbox/common/lib/utils/analytics/sentry';
78
import _debug from '@codesandbox/common/lib/utils/debug';
89
import { camelizeKeys } from 'humps';
910
import { TextOperation } from 'ot';
10-
import { Socket, Channel } from 'phoenix';
11+
import { Channel, Socket } from 'phoenix';
1112
import uuid from 'uuid';
1213

13-
import { logBreadcrumb } from '@codesandbox/common/lib/utils/analytics/sentry';
14-
import clientsFactory from './clients';
1514
import { OPTIMISTIC_ID_PREFIX } from '../utils';
15+
import clientsFactory from './clients';
1616

1717
type Options = {
1818
onApplyOperation(args: { moduleShortid: string; operation: any }): void;
1919
provideJwtToken(): string;
20+
getConnectionsCount(): number;
2021
};
2122

2223
type JoinChannelResponse = {
@@ -30,21 +31,20 @@ declare global {
3031
}
3132
}
3233

33-
const identifier = uuid.v4();
34-
const sentMessages = new Map();
35-
const debug = _debug('cs:socket');
34+
class Live {
35+
private identifier = uuid.v4();
36+
private sentMessages = new Map();
37+
private debug = _debug('cs:socket');
38+
private channel: Channel | null;
39+
private messageIndex = 0;
40+
private clients: ReturnType<typeof clientsFactory>;
41+
private _socket: Socket;
3642

37-
let channel: Channel | null;
38-
let messageIndex = 0;
39-
let clients: ReturnType<typeof clientsFactory>;
40-
let _socket: Socket;
41-
let provideJwtToken: () => string;
43+
private provideJwtToken: () => string;
44+
private getConnectionsCount: () => number;
4245

43-
export default new (class Live {
4446
initialize(options: Options) {
45-
const live = this;
46-
47-
clients = clientsFactory(
47+
this.clients = clientsFactory(
4848
(moduleShortid, revision, operation) => {
4949
logBreadcrumb({
5050
type: 'ot',
@@ -55,11 +55,7 @@ export default new (class Live {
5555
})}`,
5656
});
5757

58-
return live.send('operation', {
59-
moduleShortid,
60-
operation,
61-
revision,
62-
});
58+
return this.sendOperation(moduleShortid, revision, operation);
6359
},
6460
(moduleShortid, operation) => {
6561
options.onApplyOperation({
@@ -68,62 +64,63 @@ export default new (class Live {
6864
});
6965
}
7066
);
71-
provideJwtToken = options.provideJwtToken;
67+
this.provideJwtToken = options.provideJwtToken;
68+
this.getConnectionsCount = options.getConnectionsCount;
7269
}
7370

7471
getSocket() {
75-
return _socket || this.connect();
72+
return this._socket || this.connect();
7673
}
7774

7875
connect(): Socket {
79-
if (!_socket) {
76+
if (!this._socket) {
8077
const protocol = process.env.LOCAL_SERVER ? 'ws' : 'wss';
81-
_socket = new Socket(`${protocol}://${location.host}/socket`, {
78+
this._socket = new Socket(`${protocol}://${location.host}/socket`, {
8279
params: {
83-
guardian_token: provideJwtToken(),
80+
guardian_token: this.provideJwtToken(),
8481
},
8582
});
8683

87-
_socket.onClose(e => {
84+
this._socket.onClose(e => {
8885
if (e.code === 1006) {
8986
// This is an abrupt close, the server probably restarted or carshed. We don't want to overload
9087
// the server, so we manually wait and try to connect;
91-
_socket.disconnect();
88+
this._socket.disconnect();
9289

9390
const waitTime = 500 + 5000 * Math.random();
9491

9592
setTimeout(() => {
96-
_socket.connect();
93+
this._socket.connect();
9794
}, waitTime);
9895
}
9996
});
10097

101-
_socket.connect();
102-
window.socket = _socket;
103-
debug('Connecting to socket', _socket);
98+
this._socket.connect();
99+
window.socket = this._socket;
100+
this.debug('Connecting to socket', this._socket);
104101
}
105102

106-
return _socket;
103+
return this._socket;
107104
}
108105

109106
disconnect() {
110107
return new Promise((resolve, reject) => {
111-
if (!channel) {
108+
if (!this.channel) {
112109
resolve({});
113110
return;
114111
}
115112

116-
channel
113+
this.channel
117114
.leave()
118115
.receive('ok', resp => {
119-
if (!channel) {
116+
if (!this.channel) {
120117
return resolve({});
121118
}
122119

123-
channel.onMessage = d => d;
124-
channel = null;
125-
sentMessages.clear();
126-
messageIndex = 0;
120+
this.channel.onMessage = d => d;
121+
this.channel = null;
122+
this.sentMessages.clear();
123+
this.messageIndex = 0;
127124

128125
return resolve(resp);
129126
})
@@ -134,9 +131,9 @@ export default new (class Live {
134131

135132
joinChannel(roomId: string): Promise<JoinChannelResponse> {
136133
return new Promise((resolve, reject) => {
137-
channel = this.getSocket().channel(`live:${roomId}`, { version: 2 });
134+
this.channel = this.getSocket().channel(`live:${roomId}`, { version: 2 });
138135

139-
channel
136+
this.channel
140137
.join()
141138
.receive('ok', resp => {
142139
const result = camelizeKeys(resp) as JoinChannelResponse;
@@ -154,18 +151,18 @@ export default new (class Live {
154151
data: object;
155152
}) => {}
156153
) {
157-
if (!channel) {
154+
if (!this.channel) {
158155
return;
159156
}
160157

161-
channel.onMessage = (event: any, data: any) => {
158+
this.channel.onMessage = (event: any, data: any) => {
162159
const disconnected =
163160
(data == null || Object.keys(data).length === 0) &&
164161
event === 'phx_error';
165162
const alteredEvent = disconnected ? 'connection-loss' : event;
166163

167164
const _isOwnMessage = Boolean(
168-
data && data._messageId && sentMessages.delete(data._messageId)
165+
data && data._messageId && this.sentMessages.delete(data._messageId)
169166
);
170167

171168
if (event === 'phx_reply' || event.startsWith('chan_reply_')) {
@@ -184,14 +181,18 @@ export default new (class Live {
184181
}
185182

186183
send(event: string, payload: { _messageId?: string; [key: string]: any }) {
187-
const _messageId = identifier + messageIndex++;
184+
if (this.getConnectionsCount() < 2) {
185+
return Promise.resolve();
186+
}
187+
188+
const _messageId = this.identifier + this.messageIndex++;
188189
// eslint-disable-next-line
189190
payload._messageId = _messageId;
190-
sentMessages.set(_messageId, payload);
191+
this.sentMessages.set(_messageId, payload);
191192

192193
return new Promise((resolve, reject) => {
193-
if (channel) {
194-
channel
194+
if (this.channel) {
195+
this.channel
195196
.push(event, payload)
196197
.receive('ok', resolve)
197198
.receive('error', reject);
@@ -231,7 +232,7 @@ export default new (class Live {
231232
}
232233

233234
try {
234-
clients.get(moduleShortid).applyClient(operation);
235+
this.clients.get(moduleShortid).applyClient(operation);
235236
} catch (e) {
236237
// Something went wrong, probably a sync mismatch. Request new version
237238
this.send('live:module_state', {});
@@ -342,31 +343,59 @@ export default new (class Live {
342343
});
343344
}
344345

346+
getClient(moduleShortid: string) {
347+
return this.clients.get(moduleShortid);
348+
}
349+
345350
getAllClients() {
346-
return clients.getAll();
351+
return this.clients.getAll();
347352
}
348353

349354
applyClient(moduleShortid: string, operation: any) {
350-
return clients
355+
return this.clients
351356
.get(moduleShortid)
352357
.applyClient(TextOperation.fromJSON(operation));
353358
}
354359

355360
applyServer(moduleShortid: string, operation: any) {
356-
return clients
361+
return this.clients
357362
.get(moduleShortid)
358363
.applyServer(TextOperation.fromJSON(operation));
359364
}
360365

361366
serverAck(moduleShortid: string) {
362-
return clients.get(moduleShortid).serverAck();
367+
return this.clients.get(moduleShortid).serverAck();
363368
}
364369

365370
createClient(moduleShortid: string, revision: number) {
366-
return clients.create(moduleShortid, revision);
371+
return this.clients.create(moduleShortid, revision);
367372
}
368373

369374
resetClients() {
370-
clients.clear();
375+
this.clients.clear();
371376
}
372-
})();
377+
378+
private operationToElixir(ot) {
379+
return ot.map(op => {
380+
if (typeof op === 'number') {
381+
if (op < 0) {
382+
return { d: -op };
383+
}
384+
385+
return op;
386+
}
387+
388+
return { i: op };
389+
});
390+
}
391+
392+
private sendOperation(moduleShortid, revision, operation) {
393+
return this.send('operation', {
394+
moduleShortid,
395+
operation: this.operationToElixir(operation.toJSON()),
396+
revision: Number(revision),
397+
});
398+
}
399+
}
400+
401+
export default new Live();

0 commit comments

Comments
 (0)