Home Reference Source

src/demux/transmuxer-interface.ts

  1. import work from './webworkify-webpack';
  2. import { Events } from '../events';
  3. import Transmuxer, {
  4. TransmuxConfig,
  5. TransmuxState,
  6. isPromise,
  7. } from '../demux/transmuxer';
  8. import { logger } from '../utils/logger';
  9. import { ErrorTypes, ErrorDetails } from '../errors';
  10. import { getMediaSource } from '../utils/mediasource-helper';
  11. import { EventEmitter } from 'eventemitter3';
  12. import { Fragment, Part } from '../loader/fragment';
  13. import type { ChunkMetadata, TransmuxerResult } from '../types/transmuxer';
  14. import type Hls from '../hls';
  15. import type { HlsEventEmitter } from '../events';
  16. import type { PlaylistLevelType } from '../types/loader';
  17. import type { TypeSupported } from './tsdemuxer';
  18.  
  19. const MediaSource = getMediaSource() || { isTypeSupported: () => false };
  20.  
  21. export default class TransmuxerInterface {
  22. private hls: Hls;
  23. private id: PlaylistLevelType;
  24. private observer: HlsEventEmitter;
  25. private frag: Fragment | null = null;
  26. private part: Part | null = null;
  27. private useWorker: boolean;
  28. private worker: any;
  29. private onwmsg?: Function;
  30. private transmuxer: Transmuxer | null = null;
  31. private onTransmuxComplete: (transmuxResult: TransmuxerResult) => void;
  32. private onFlush: (chunkMeta: ChunkMetadata) => void;
  33.  
  34. constructor(
  35. hls: Hls,
  36. id: PlaylistLevelType,
  37. onTransmuxComplete: (transmuxResult: TransmuxerResult) => void,
  38. onFlush: (chunkMeta: ChunkMetadata) => void
  39. ) {
  40. const config = hls.config;
  41. this.hls = hls;
  42. this.id = id;
  43. this.useWorker = !!config.enableWorker;
  44. this.onTransmuxComplete = onTransmuxComplete;
  45. this.onFlush = onFlush;
  46.  
  47. const forwardMessage = (ev, data) => {
  48. data = data || {};
  49. data.frag = this.frag;
  50. data.id = this.id;
  51. this.hls.trigger(ev, data);
  52. };
  53.  
  54. // forward events to main thread
  55. this.observer = new EventEmitter() as HlsEventEmitter;
  56. this.observer.on(Events.FRAG_DECRYPTED, forwardMessage);
  57. this.observer.on(Events.ERROR, forwardMessage);
  58.  
  59. const typeSupported: TypeSupported = {
  60. mp4: MediaSource.isTypeSupported('video/mp4'),
  61. mpeg: MediaSource.isTypeSupported('audio/mpeg'),
  62. mp3: MediaSource.isTypeSupported('audio/mp4; codecs="mp3"'),
  63. };
  64. // navigator.vendor is not always available in Web Worker
  65. // refer to https://developer.mozilla.org/en-US/docs/Web/API/WorkerGlobalScope/navigator
  66. const vendor = navigator.vendor;
  67. if (this.useWorker && typeof Worker !== 'undefined') {
  68. logger.log('demuxing in webworker');
  69. let worker;
  70. try {
  71. worker = this.worker = work(
  72. require.resolve('../demux/transmuxer-worker.ts')
  73. );
  74. this.onwmsg = this.onWorkerMessage.bind(this);
  75. worker.addEventListener('message', this.onwmsg);
  76. worker.onerror = (event) => {
  77. this.useWorker = false;
  78. logger.warn('Exception in webworker, fallback to inline');
  79. this.hls.trigger(Events.ERROR, {
  80. type: ErrorTypes.OTHER_ERROR,
  81. details: ErrorDetails.INTERNAL_EXCEPTION,
  82. fatal: false,
  83. event: 'demuxerWorker',
  84. error: new Error(
  85. `${event.message} (${event.filename}:${event.lineno})`
  86. ),
  87. });
  88. };
  89. worker.postMessage({
  90. cmd: 'init',
  91. typeSupported: typeSupported,
  92. vendor: vendor,
  93. id: id,
  94. config: JSON.stringify(config),
  95. });
  96. } catch (err) {
  97. logger.warn('Error in worker:', err);
  98. logger.error(
  99. 'Error while initializing DemuxerWorker, fallback to inline'
  100. );
  101. if (worker) {
  102. // revoke the Object URL that was used to create transmuxer worker, so as not to leak it
  103. self.URL.revokeObjectURL(worker.objectURL);
  104. }
  105. this.transmuxer = new Transmuxer(
  106. this.observer,
  107. typeSupported,
  108. config,
  109. vendor,
  110. id
  111. );
  112. this.worker = null;
  113. }
  114. } else {
  115. this.transmuxer = new Transmuxer(
  116. this.observer,
  117. typeSupported,
  118. config,
  119. vendor,
  120. id
  121. );
  122. }
  123. }
  124.  
  125. destroy(): void {
  126. const w = this.worker;
  127. if (w) {
  128. w.removeEventListener('message', this.onwmsg);
  129. w.terminate();
  130. this.worker = null;
  131. this.onwmsg = undefined;
  132. } else {
  133. const transmuxer = this.transmuxer;
  134. if (transmuxer) {
  135. transmuxer.destroy();
  136. this.transmuxer = null;
  137. }
  138. }
  139. const observer = this.observer;
  140. if (observer) {
  141. observer.removeAllListeners();
  142. }
  143. this.frag = null;
  144. // @ts-ignore
  145. this.observer = null;
  146. // @ts-ignore
  147. this.hls = null;
  148. }
  149.  
  150. push(
  151. data: ArrayBuffer,
  152. initSegmentData: Uint8Array | undefined,
  153. audioCodec: string | undefined,
  154. videoCodec: string | undefined,
  155. frag: Fragment,
  156. part: Part | null,
  157. duration: number,
  158. accurateTimeOffset: boolean,
  159. chunkMeta: ChunkMetadata,
  160. defaultInitPTS?: number
  161. ): void {
  162. chunkMeta.transmuxing.start = self.performance.now();
  163. const { transmuxer, worker } = this;
  164. const timeOffset = part ? part.start : frag.start;
  165. // TODO: push "clear-lead" decrypt data for unencrypted fragments in streams with encrypted ones
  166. const decryptdata = frag.decryptdata;
  167. const lastFrag = this.frag;
  168.  
  169. const discontinuity = !(lastFrag && frag.cc === lastFrag.cc);
  170. const trackSwitch = !(lastFrag && chunkMeta.level === lastFrag.level);
  171. const snDiff = lastFrag ? chunkMeta.sn - (lastFrag.sn as number) : -1;
  172. const partDiff = this.part ? chunkMeta.part - this.part.index : -1;
  173. const progressive =
  174. snDiff === 0 &&
  175. chunkMeta.id > 1 &&
  176. chunkMeta.id === lastFrag?.stats.chunkCount;
  177. const contiguous =
  178. !trackSwitch &&
  179. (snDiff === 1 ||
  180. (snDiff === 0 && (partDiff === 1 || (progressive && partDiff <= 0))));
  181. const now = self.performance.now();
  182.  
  183. if (trackSwitch || snDiff || frag.stats.parsing.start === 0) {
  184. frag.stats.parsing.start = now;
  185. }
  186. if (part && (partDiff || !contiguous)) {
  187. part.stats.parsing.start = now;
  188. }
  189. const initSegmentChange = !(
  190. lastFrag && frag.initSegment?.url === lastFrag.initSegment?.url
  191. );
  192. const state = new TransmuxState(
  193. discontinuity,
  194. contiguous,
  195. accurateTimeOffset,
  196. trackSwitch,
  197. timeOffset,
  198. initSegmentChange
  199. );
  200. if (!contiguous || discontinuity || initSegmentChange) {
  201. logger.log(`[transmuxer-interface, ${frag.type}]: Starting new transmux session for sn: ${chunkMeta.sn} p: ${chunkMeta.part} level: ${chunkMeta.level} id: ${chunkMeta.id}
  202. discontinuity: ${discontinuity}
  203. trackSwitch: ${trackSwitch}
  204. contiguous: ${contiguous}
  205. accurateTimeOffset: ${accurateTimeOffset}
  206. timeOffset: ${timeOffset}
  207. initSegmentChange: ${initSegmentChange}`);
  208. const config = new TransmuxConfig(
  209. audioCodec,
  210. videoCodec,
  211. initSegmentData,
  212. duration,
  213. defaultInitPTS
  214. );
  215. this.configureTransmuxer(config);
  216. }
  217.  
  218. this.frag = frag;
  219. this.part = part;
  220.  
  221. // Frags with sn of 'initSegment' are not transmuxed
  222. if (worker) {
  223. // post fragment payload as transferable objects for ArrayBuffer (no copy)
  224. worker.postMessage(
  225. {
  226. cmd: 'demux',
  227. data,
  228. decryptdata,
  229. chunkMeta,
  230. state,
  231. },
  232. data instanceof ArrayBuffer ? [data] : []
  233. );
  234. } else if (transmuxer) {
  235. const transmuxResult = transmuxer.push(
  236. data,
  237. decryptdata,
  238. chunkMeta,
  239. state
  240. );
  241. if (isPromise(transmuxResult)) {
  242. transmuxResult.then((data) => {
  243. this.handleTransmuxComplete(data);
  244. });
  245. } else {
  246. this.handleTransmuxComplete(transmuxResult as TransmuxerResult);
  247. }
  248. }
  249. }
  250.  
  251. flush(chunkMeta: ChunkMetadata) {
  252. chunkMeta.transmuxing.start = self.performance.now();
  253. const { transmuxer, worker } = this;
  254. if (worker) {
  255. worker.postMessage({
  256. cmd: 'flush',
  257. chunkMeta,
  258. });
  259. } else if (transmuxer) {
  260. const transmuxResult = transmuxer.flush(chunkMeta);
  261. if (isPromise(transmuxResult)) {
  262. transmuxResult.then((data) => {
  263. this.handleFlushResult(data, chunkMeta);
  264. });
  265. } else {
  266. this.handleFlushResult(
  267. transmuxResult as Array<TransmuxerResult>,
  268. chunkMeta
  269. );
  270. }
  271. }
  272. }
  273.  
  274. private handleFlushResult(
  275. results: Array<TransmuxerResult>,
  276. chunkMeta: ChunkMetadata
  277. ) {
  278. results.forEach((result) => {
  279. this.handleTransmuxComplete(result);
  280. });
  281. this.onFlush(chunkMeta);
  282. }
  283.  
  284. private onWorkerMessage(ev: any): void {
  285. const data = ev.data;
  286. const hls = this.hls;
  287. switch (data.event) {
  288. case 'init': {
  289. // revoke the Object URL that was used to create transmuxer worker, so as not to leak it
  290. self.URL.revokeObjectURL(this.worker.objectURL);
  291. break;
  292. }
  293.  
  294. case 'transmuxComplete': {
  295. this.handleTransmuxComplete(data.data);
  296. break;
  297. }
  298.  
  299. case 'flush': {
  300. this.onFlush(data.data);
  301. break;
  302. }
  303.  
  304. // pass logs from the worker thread to the main logger
  305. case 'workerLog':
  306. if (logger[data.data.logType]) {
  307. logger[data.data.logType](data.data.message);
  308. }
  309. break;
  310.  
  311. default: {
  312. data.data = data.data || {};
  313. data.data.frag = this.frag;
  314. data.data.id = this.id;
  315. hls.trigger(data.event, data.data);
  316. break;
  317. }
  318. }
  319. }
  320.  
  321. private configureTransmuxer(config: TransmuxConfig) {
  322. const { worker, transmuxer } = this;
  323. if (worker) {
  324. worker.postMessage({
  325. cmd: 'configure',
  326. config,
  327. });
  328. } else if (transmuxer) {
  329. transmuxer.configure(config);
  330. }
  331. }
  332.  
  333. private handleTransmuxComplete(result: TransmuxerResult) {
  334. result.chunkMeta.transmuxing.end = self.performance.now();
  335. this.onTransmuxComplete(result);
  336. }
  337. }