forked from MetaMask/eth-block-tracker
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathPollingBlockTracker.ts
More file actions
126 lines (108 loc) · 3.52 KB
/
PollingBlockTracker.ts
File metadata and controls
126 lines (108 loc) · 3.52 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
import getCreateRandomId from 'json-rpc-random-id';
import pify from 'pify';
import { JsonRpcRequest } from 'json-rpc-engine';
import { BaseBlockTracker, Provider } from './BaseBlockTracker';
const createRandomId = getCreateRandomId();
const sec = 1000;
interface PollingBlockTrackerArgs {
provider: Provider;
pollingInterval: number;
retryTimeout: number;
keepEventLoopActive: boolean;
setSkipCacheFlag: boolean;
}
interface ExtendedJsonRpcRequest<T> extends JsonRpcRequest<T> {
skipCache?: boolean;
}
export class PollingBlockTracker extends BaseBlockTracker {
private _provider: Provider;
private _pollingInterval: number;
private _retryTimeout: number;
private _keepEventLoopActive: boolean;
private _setSkipCacheFlag: boolean;
constructor(opts: Partial<PollingBlockTrackerArgs> = {}) {
// parse + validate args
if (!opts.provider) {
throw new Error('PollingBlockTracker - no provider specified.');
}
super({
blockResetDuration: opts.pollingInterval,
});
// config
this._provider = opts.provider;
this._pollingInterval = opts.pollingInterval || 20 * sec;
this._retryTimeout = opts.retryTimeout || this._pollingInterval / 10;
this._keepEventLoopActive =
opts.keepEventLoopActive === undefined ? true : opts.keepEventLoopActive;
this._setSkipCacheFlag = opts.setSkipCacheFlag || false;
}
// trigger block polling
async checkForLatestBlock() {
await this._updateLatestBlock();
return await this.getLatestBlock();
}
protected _start(): void {
this._synchronize().catch((err) => this.emit('error', err));
}
private async _synchronize(): Promise<void> {
while (this._isRunning) {
try {
await this._updateLatestBlock();
await timeout(this._pollingInterval, !this._keepEventLoopActive);
} catch (err: any) {
const newErr = new Error(
`PollingBlockTracker - encountered an error while attempting to update latest block:\n${
err.stack ?? err
}`,
);
try {
this.emit('error', newErr);
} catch (emitErr) {
console.error(newErr);
}
await timeout(this._retryTimeout, !this._keepEventLoopActive);
}
}
}
private async _updateLatestBlock(): Promise<void> {
// fetch + set latest block
const latestBlock = await this._fetchLatestBlock();
this._newPotentialLatest(latestBlock);
}
private async _fetchLatestBlock(): Promise<string> {
const req: ExtendedJsonRpcRequest<[]> = {
jsonrpc: '2.0',
id: createRandomId(),
method: 'eth_blockNumber',
params: [],
};
if (this._setSkipCacheFlag) {
req.skipCache = true;
}
const res = await pify((cb) => this._provider.sendAsync(req, cb))();
if (res.error) {
throw new Error(
`PollingBlockTracker - encountered error fetching block:\n${res.error}`,
);
}
return res.result;
}
}
/**
* Waits for the specified amount of time.
*
* @param duration - The amount of time in milliseconds.
* @param unref - Assuming this function is run in a Node context, governs
* whether Node should wait before the `setTimeout` has completed before ending
* the process (true for no, false for yes). Defaults to false.
* @returns A promise that can be used to wait.
*/
function timeout(duration: number, unref: boolean) {
return new Promise((resolve) => {
const timeoutRef = setTimeout(resolve, duration);
// don't keep process open
if (timeoutRef.unref && unref) {
timeoutRef.unref();
}
});
}