import { AbsoluteTimeRange } from '../prototypes/DatasetMessages';
import * as StreamingServicePb from '../prototypes/streamingService';
import { MAGIC_NUMBER_FIRST_DATASET } from './constants';
import { convertInt128ToAbsoluteTime } from '../utils/time.utils';
import { createStreamingServiceClient } from './dataset.utils';
import { ServerStreamingCall } from '@protobuf-ts/runtime-rpc';
import { TokenContainer } from './streaming-token.types';

/**
 * Server-streaming call handle specialised for the `streamDatasets` RPC:
 * the request message is `StreamDatasetsParameters` and each streamed
 * response message is `StreamDatasetsResponse`.
 *
 * This is the value returned by `prepareStream`.
 */
export type SpecificStreamingCall = ServerStreamingCall<
  StreamingServicePb.StreamDatasetsParameters,
  StreamingServicePb.StreamDatasetsResponse
>;

/**
 * Absolute time window expressed as two `bigint` values.
 *
 * `prepareStream` passes both bounds to `convertInt128ToAbsoluteTime` and
 * only applies the window when `stopTime` is greater than `0n`.
 *
 * NOTE(review): the unit and epoch of these values are not visible in this
 * file — confirm against `convertInt128ToAbsoluteTime`.
 */
export type TimeRangeAbs = {
  /** Lower bound of the window. */
  startTime: bigint;
  /** Upper bound of the window; values `<= 0n` make `prepareStream` skip the window. */
  stopTime: bigint;
};

/**
 * Builds a `StreamDatasetsParameters` request containing exactly one
 * subscribed dataset and starts the `streamDatasets` server-streaming call.
 *
 * @param token - Container supplying the data-source token (`token.token`)
 *   and the server URL (`token.webgrpcServerURL`). Both are read with a
 *   non-null assertion, so no runtime check is performed here.
 * @param locatorId - Id written to the dataset locator; an empty string is
 *   used when it is `undefined` or `null`.
 * @param timeRangeAbs - Optional absolute time window. It is attached to the
 *   subscription only when it is provided and its `stopTime` is greater
 *   than `0n`; otherwise the subscription carries no time range.
 * @param abortController - Forwarded unchanged to
 *   `createStreamingServiceClient`.
 * @returns The call object returned by the client's `streamDatasets`.
 */
export const prepareStream = (
  token: TokenContainer,
  locatorId?: string,
  timeRangeAbs?: TimeRangeAbs,
  abortController?: AbortController
): SpecificStreamingCall => {
  // The data source is identified through the `token` branch of the oneof.
  const dataSource = StreamingServicePb.DataSourceIdentifier.create();
  dataSource.identifiers = { oneofKind: 'token', token: token.token! };

  // Locator = data source + dataset id.
  const datasetLocator = StreamingServicePb.DatasetLocator.create();
  datasetLocator.dataSource = dataSource;
  datasetLocator.id = locatorId ?? '';

  // The single subscription carried by this request.
  const subscription = StreamingServicePb.StreamDatasetsParameters_SubscribedDataset.create();
  subscription.subscribeId = MAGIC_NUMBER_FIRST_DATASET;
  subscription.locator = datasetLocator;

  // Restrict the subscription to a window only for a positive stop time.
  if (timeRangeAbs && timeRangeAbs.stopTime > 0n) {
    const { startTime, stopTime } = timeRangeAbs;
    const window = AbsoluteTimeRange.create();
    // NOTE(review): meaning of the `false` flag is defined in time.utils — confirm there.
    window.start = convertInt128ToAbsoluteTime(startTime, false);
    window.stop = convertInt128ToAbsoluteTime(stopTime, false);
    subscription.timeRange = window;
  }

  const request = StreamingServicePb.StreamDatasetsParameters.create();
  request.subscribedDatasets.push(subscription);

  const client = createStreamingServiceClient(token.webgrpcServerURL!, abortController);
  return client.streamDatasets(request);
};
