import { Injectable, NgZone } from '@angular/core';
import { Observable, of } from 'rxjs';
import { finalize } from 'rxjs/operators';

@Injectable()
export class EventSourceUtil {
  static create<T>(url: string, zone: NgZone, type?: new (it: object) => T): Observable<T> {
    if (!EventSource) {
      // No support for Server-Sent Events.
      // Should consider either polling for latest event or providing a polyfill.
      return of(null);
    }

    const eventSource = new EventSource(url);
    return new Observable<T>((observer) => {
      eventSource.onmessage = (event) => {
        zone.run(() => observer.next(type ? new type(JSON.parse(event.data)) : event.data));
      };
      window.addEventListener('beforeunload', () => {
        eventSource.close();
        observer.complete();
      });
    }).pipe(finalize(() => eventSource.close()));
  }
}
