@@ -16,8 +16,8 @@ import {
1616 CreateCodeCommentMutationVariables ,
1717 CreateCommentMutationVariables ,
1818} from 'app/graphql/types' ;
19- import { Blocker , blocker } from 'app/utils/blocker' ;
2019import { camelizeKeys } from 'humps' ;
20+ import { debounce } from 'lodash-es' ;
2121import { SerializedTextOperation , TextOperation } from 'ot' ;
2222import { Channel , Presence , Socket } from 'phoenix' ;
2323import uuid from 'uuid' ;
@@ -52,22 +52,13 @@ declare global {
5252 }
5353}
5454
55- const TIME_TO_THROTTLE_SOLO_MODE_SENDS = 2000 ;
56-
5755class Live {
5856 private identifier = uuid . v4 ( ) ;
5957 private pendingMessages = new Map ( ) ;
6058 private debug = _debug ( 'cs:socket' ) ;
6159 private channel : Channel | null ;
6260 private messageIndex = 0 ;
63- private awaitSendTimer : number ;
6461 private socket : Socket ;
65- /*
66- Since in "Solo mode" we want to batch up operations and other events later,
67- we use a blocker to just hold the sending of the messages until an additional
68- connection enters
69- */
70- private awaitSend : Blocker < void > | null = blocker < void > ( ) ;
7162 private presence : Presence ;
7263 private provideJwtToken : ( ) => string ;
7364 private onApplyOperation : ( moduleShortid : string , operation : any ) => void ;
@@ -93,48 +84,12 @@ class Live {
9384 }
9485
9586 private connectionsCount = 0 ;
96- private setAwaitSend ( ) {
97- this . awaitSend = blocker ( ) ;
98- clearTimeout ( this . awaitSendTimer ) ;
99- this . awaitSendTimer = window . setTimeout ( async ( ) => {
100- if ( this . connectionsCount === 1 ) {
101- // We await the currently resolved blocker before setting it back,
102- // so that messages gets through
103- await this . resolveAwaitSend ( ) ;
104- this . setAwaitSend ( ) ;
105- }
106- } , TIME_TO_THROTTLE_SOLO_MODE_SENDS ) ;
107- }
108-
109- private resolveAwaitSend ( ) {
110- if ( ! this . awaitSend ) {
111- return Promise . resolve ( ) ;
112- }
113- clearTimeout ( this . awaitSendTimer ) ;
114- const awaitSend = this . awaitSend ;
115- this . awaitSend = null ;
116- awaitSend . resolve ( ) ;
117- return awaitSend . promise ;
118- }
119-
120- private async awaitSynchronizedModule ( moduleShortid : string ) {
121- const client = clients . get ( moduleShortid ) ;
122- if ( client . awaitSynchronized ) {
123- await client . awaitSynchronized . promise ;
124- }
125- }
12687
12788 private onSendOperation = async (
12889 moduleShortid : string ,
12990 revision : number ,
13091 operation : TextOperation
13192 ) => {
132- // If we are to await a send, we do it. It will be resolved
133- // related to number of connections changing
134- if ( this . awaitSend ) {
135- await this . awaitSend . promise ;
136- }
137-
13893 logBreadcrumb ( {
13994 category : 'ot' ,
14095 message : `Sending ${ JSON . stringify ( {
@@ -260,16 +215,9 @@ class Live {
260215 reconnect_token : result . reconnectToken ,
261216 } ) ;
262217
263- this . setBlocker ( result . roomInfo . users . length ) ;
264- /*
265- When active we activate or deactivate the sending blocker depending
266- on the number of connections we have. When "solo" we hold operation messages
267- until we get a new connection. If we go back to "solo" we bring in the blocker
268- again
269- */
270218 this . presence = new Presence ( this . channel ) ;
271219 this . presence . onSync ( ( ) => {
272- this . setBlocker ( this . presence . list ( ) . length ) ;
220+ this . connectionsCount = this . presence . list ( ) . length ;
273221 } ) ;
274222
275223 resolve ( result ) ;
@@ -315,7 +263,7 @@ class Live {
315263 } ;
316264 }
317265
318- private sendImmediately < T > ( event , payload : any = { } ) : Promise < T > {
266+ private send < T > ( event , payload : any = { } ) : Promise < T > {
319267 const _messageId = this . identifier + this . messageIndex ++ ;
320268 // eslint-disable-next-line
321269 payload . _messageId = _messageId ;
@@ -335,36 +283,20 @@ class Live {
335283 } ) ;
336284 }
337285
338- send ( event : string , payload ?: { _messageId ?: string ; [ key : string ] : any } ) {
339- if ( this . awaitSend ) {
340- return Promise . resolve ( ) ;
341- }
342-
343- return this . sendImmediately ( event , payload ) ;
286+ saveModule ( moduleShortid : string ) {
287+ return this . send ( 'save' , {
288+ moduleShortid,
289+ } ) ;
344290 }
345291
346- async saveModule ( moduleShortid : string ) {
347- this . sendAfterSynchronized ( moduleShortid , 'save' ) ;
348- }
349-
350- async saveCodeComment (
351- moduleShortid : string ,
352- commentPayload : CreateCodeCommentMutationVariables
353- ) {
354- return this . sendAfterSynchronized < CommentFragment > (
355- moduleShortid ,
356- 'save:comment' ,
357- commentPayload
358- ) ;
292+ async saveCodeComment ( commentPayload : CreateCodeCommentMutationVariables ) {
293+ return this . send < CommentFragment > ( 'save:comment' , commentPayload ) ;
359294 }
360295
361296 async saveComment (
362297 commentPayload : CreateCommentMutationVariables
363298 ) : Promise < CommentFragment > {
364- return this . sendImmediately < CommentFragment > (
365- 'save:comment' ,
366- commentPayload
367- ) ;
299+ return this . send < CommentFragment > ( 'save:comment' , commentPayload ) ;
368300 }
369301
370302 sendModuleUpdate ( module : Module ) {
@@ -485,14 +417,22 @@ class Live {
485417 }
486418
487419 sendModuleStateSyncRequest ( ) {
488- return this . sendImmediately ( 'live:module_state' , { } ) ;
420+ return this . send ( 'live:module_state' , { } ) ;
489421 }
490422
491423 sendUserViewRange (
492424 moduleShortid : string | null ,
493425 liveUserId : string ,
494426 viewRange : UserViewRange
495427 ) {
428+ if ( this . connectionsCount === 1 ) {
429+ return this . sendDebounced ( 'user:view-range' , {
430+ liveUserId,
431+ moduleShortid,
432+ viewRange,
433+ } ) ;
434+ }
435+
496436 return this . send ( 'user:view-range' , {
497437 liveUserId,
498438 moduleShortid,
@@ -505,6 +445,13 @@ class Live {
505445 liveUserId : string ,
506446 selection : any
507447 ) {
448+ if ( this . connectionsCount === 1 ) {
449+ return this . sendDebounced ( 'user:selection' , {
450+ liveUserId,
451+ moduleShortid,
452+ selection,
453+ } ) ;
454+ }
508455 return this . send ( 'user:selection' , {
509456 liveUserId,
510457 moduleShortid,
@@ -514,10 +461,6 @@ class Live {
514461
515462 reset ( ) {
516463 clients . clear ( ) ;
517-
518- clearTimeout ( this . awaitSendTimer ) ;
519-
520- this . awaitSend = null ;
521464 }
522465
523466 resetClient ( moduleShortid : string , revision : number ) {
@@ -553,35 +496,12 @@ class Live {
553496 ) ;
554497 }
555498
556- private async sendAfterSynchronized < T > (
557- moduleShortid : string ,
558- event : string ,
559- payload ?
560- ) : Promise < T > {
561- /*
562- If we save a module we will temporarily lift the message blocker,
563- passing any operations through. As soon as the client of the module
564- is back in synchronized state we can move on with the save
565- */
566- if ( this . awaitSend ) {
567- this . resolveAwaitSend ( ) ;
568- await this . awaitSynchronizedModule ( moduleShortid ) ;
569- this . setAwaitSend ( ) ;
570- }
571-
572- return this . sendImmediately ( event , payload ) ;
573- }
574-
575- private setBlocker ( newCount : number ) {
576- const currentCount = this . connectionsCount ;
577-
578- this . connectionsCount = newCount ;
579- if ( currentCount !== 1 && this . connectionsCount === 1 ) {
580- this . setAwaitSend ( ) ;
581- } else if ( currentCount === 1 && this . connectionsCount > 1 ) {
582- this . resolveAwaitSend ( ) ;
583- }
584- }
499+ private sendDebounced = debounce < ( event : string , payload : any ) => void > (
500+ ( ...args ) => {
501+ this . send ( ...args ) ;
502+ } ,
503+ 500
504+ ) ;
585505}
586506
587507export default new Live ( ) ;
0 commit comments