RxJava-based MJPEG decoder does not work as expected

0

Since I am having trouble with OpenCV's MJPEG stream handling (e.g. for Logitech cams), I'd like to create an MJPEG stream handler using RxJava. The goal is to create an image Observer that is fed from an MJPEG URL. Please find the current state of the code below.

There are several issues for which I'd like to get solutions/answers:

  1. Why does it seem that the StringObservable always returns the same first frame instead of moving on in the original input stream on each onNext call?
  2. How can the Subscriber be mapped to be an Emitter for JPEG images?

With those two answers I assume this could be a nice solution. Any other thoughts?

Unit Test

  /**
   * Smoke test: opens an MJPEG stream, lets the decoder run for one second
   * with debug output enabled and then closes it again.
   *
   * Alternative stream urls noted for manual testing:
   *   http://localhost:8081?type=simulator&mode=stream
   *   http://picarford:8080/?action=stream
   *
   * @throws Exception
   *           if the stream cannot be opened or the wait is interrupted
   */
  @Test
  public void testMJpegStream() throws Exception {
    // camera labelled "Dorf Appenzell"
    final String streamUrl = "http://213.193.89.202/axis-cgi/mjpg/video.cgi";
    // 64 KByte read buffer
    final int readBufferBytes = 64 * 1024;
    // how long the decoder is allowed to consume the stream
    final long runMillis = 1000;

    MJpegHandler handler = new MJpegHandler(streamUrl);
    // must be switched on before the decoder is created by open()
    MJpegDecoder.debug = true;
    MJpegDecoder decoder = handler.open(readBufferBytes);
    Thread.sleep(runMillis);
    decoder.close();
  }

MJpegHandler

package org.rcdukes.imageview;

import static org.asynchttpclient.Dsl.asyncHttpClient;

import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.util.concurrent.Future;

import org.asynchttpclient.AsyncHttpClient;
import org.asynchttpclient.Response;
import org.asynchttpclient.handler.BodyDeferringAsyncHandler;
import org.asynchttpclient.handler.BodyDeferringAsyncHandler.BodyDeferringInputStream;

/**
 * Opens an HTTP GET request to an MJPEG url and exposes the response body as
 * an input stream that an {@link MJpegDecoder} can read from.
 */
public class MJpegHandler {

  AsyncHttpClient asyncHttpClient;
  private PipedInputStream pipedInputStream;
  private PipedOutputStream pipedOutputStream;
  private BodyDeferringAsyncHandler outputHandler;
  BodyDeferringInputStream inputStream;

  /**
   * get the input stream of the response body
   * 
   * @return the stream, or null if no request was made or the response status
   *         was not 200
   */
  public BodyDeferringInputStream getInputStream() {
    return inputStream;
  }

  /**
   * create a handler for the mjpeg stream at the given url
   * 
   * The request is started immediately; the constructor waits for the
   * response headers and only provides an input stream if the status code is
   * 200.
   * 
   * @param url
   *          - the url of the MJPEG stream
   * @throws Exception
   *           if the request fails; the http client is closed in that case
   */
  public MJpegHandler(String url) throws Exception {
    // https://stackoverflow.com/a/50402629/1497139
    asyncHttpClient = asyncHttpClient();
    try {
      pipedInputStream = new PipedInputStream();
      pipedOutputStream = new PipedOutputStream(pipedInputStream);
      outputHandler = new BodyDeferringAsyncHandler(pipedOutputStream);
      // a single request only - the body is piped to pipedInputStream
      Future<Response> futureResponse = asyncHttpClient.prepareGet(url)
          .execute(outputHandler);
      // returns as soon as the headers are available
      Response response = outputHandler.getResponse();
      if (response.getStatusCode() == 200) {
        inputStream = new BodyDeferringAsyncHandler.BodyDeferringInputStream(
            futureResponse, outputHandler, pipedInputStream);
      }
    } catch (Exception e) {
      // the caller never gets a reference to this instance when the
      // constructor throws, so the client has to be released here
      try {
        asyncHttpClient.close();
      } catch (IOException closeFailure) {
        e.addSuppressed(closeFailure);
      }
      throw e;
    }
  }

  /**
   * create a handler that is not connected to any url
   */
  public MJpegHandler() {
  }

  /**
   * open me with the given bufferSize
   * 
   * @param bufferSize
   *          - the size of the read buffer in bytes
   * @return the decoder that has been subscribed to my input stream
   * @throws Exception
   */
  public MJpegDecoder open(int bufferSize) throws Exception {
    MJpegDecoder mjpegDecoder = new MJpegDecoder(this);
    mjpegDecoder.open(bufferSize);
    return mjpegDecoder;
  }

  /**
   * close this handler
   * 
   * Does nothing if no http client was created (no-argument constructor).
   * 
   * @throws IOException
   */
  public void close() throws IOException {
    if (this.asyncHttpClient != null) {
      this.asyncHttpClient.close();
    }
  }

}

MJpegDecoder

package org.rcdukes.imageview;

import java.io.ByteArrayOutputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;

import org.asynchttpclient.handler.BodyDeferringAsyncHandler.BodyDeferringInputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import rx.Observable;
import rx.Subscriber;
import rx.observables.StringObservable;
import rx.schedulers.Schedulers;

/**
 * reactive MJPegDecoder
 * 
 * @author wf
 *
 */
/**
 * Reactive MJPEG decoder.
 * 
 * Subscribes to the byte buffers read from the input stream of an
 * {@link MJpegHandler} and cuts them into JPEG frames by looking for the JPEG
 * start (FF D8) and end (FF D9) byte pairs. The most recently completed frame
 * is kept in {@code curFrame}; frames are not emitted to anybody yet.
 * 
 * NOTE(review): a start/end byte pair inside a frame (e.g. an embedded
 * thumbnail) would be treated as a frame boundary by this simple scan.
 */
public class MJpegDecoder extends Subscriber<byte[]> {
  protected static final Logger LOG = LoggerFactory
      .getLogger(MJpegDecoder.class);

  /** first byte of the JPEG start and end byte pairs */
  private static final int MARKER_PREFIX = 0xFF;
  /** second byte of the JPEG start pair */
  private static final int START_OF_IMAGE = 0xD8;
  /** second byte of the JPEG end pair */
  private static final int END_OF_IMAGE = 0xD9;
  /** '-' - two of them in a row are logged as a boundary in debug mode */
  private static final int DASH = 0x2D;

  /** previously seen byte as an unsigned value (0..255) */
  int prev = 0;
  /** not updated by the decoder - kept for compatibility */
  int cur = 0;
  private ByteArrayOutputStream jpgOut;
  private byte[] curFrame;
  public static boolean debug = false;
  private int bufferIndex = 0;
  private int frameIndex = 0;
  /** number of bytes received so far - long to avoid int overflow */
  private long bytesRead = 0;
  FileOutputStream fos;

  private int bufferSize;

  private Observable<byte[]> mjpegSubscription;

  private MJpegHandler mjpegHandler;

  /**
   * create a decoder for the given handler
   * 
   * In debug mode the decoded frames are dumped to /tmp/decoder.mjpg.
   * 
   * @param mJpegHandler
   *          - the handler that supplies the input stream
   */
  public MJpegDecoder(MJpegHandler mJpegHandler) {
    this.mjpegHandler = mJpegHandler;
    this.curFrame = new byte[0];
    if (debug) {
      try {
        fos = new FileOutputStream("/tmp/decoder.mjpg");
      } catch (FileNotFoundException e) {
        handle(e);
      }
    }
  }

  /**
   * close the handler and the debug dump file
   */
  @Override
  public void onCompleted() {
    try {
      this.mjpegHandler.close();
    } catch (IOException e) {
      onError(e);
    } finally {
      // release the dump file even if closing the handler failed
      closeDumpFile();
    }
  }

  @Override
  public void onError(Throwable e) {
    handle(e);
  }

  /**
   * scan the given buffer for JPEG frames
   * 
   * A frame may span several buffers - the state (previous byte and the
   * partially collected frame) is kept between calls.
   * 
   * @param buffer
   *          - the bytes read from the stream
   */
  @Override
  public void onNext(byte[] buffer) {
    bytesRead += buffer.length;
    bufferIndex++;
    if (debug) {
      String msg = String.format("buffer %6d available %9d kB read",
          bufferIndex, bytesRead / 1024);
      LOG.info(msg);
    }
    // loop over all bytes in the buffer
    for (byte raw : buffer) {
      // work on unsigned values so that markers can be compared directly
      int value = raw & 0xFF;
      // Content-Type: multipart/x-mixed-replace; boundary=
      // will have -- we could detect it here
      if (debug && prev == DASH && value == DASH) {
        LOG.info("boundary detected");
      }
      // check for JPEG start bytes
      if (prev == MARKER_PREFIX && value == START_OF_IMAGE) {
        if (debug) {
          String msg = String.format("frame %6d started", frameIndex + 1);
          LOG.info(msg);
        }
        jpgOut = new ByteArrayOutputStream(bufferSize);
        // first byte needs to be written to output
        jpgOut.write(prev);
      }
      // if within the frame write all bytes
      if (jpgOut != null) {
        jpgOut.write(value);
        // check for JPEG end bytes
        // if found the frame is finished
        if (prev == MARKER_PREFIX && value == END_OF_IMAGE) {
          // create the byte array of the current jpeg frame
          curFrame = jpgOut.toByteArray();
          // closing a ByteArrayOutputStream has no effect - just drop it
          jpgOut = null;
          frameIndex++;
          if (debug) {
            String msg = String.format("frame %6d available", frameIndex);
            LOG.info(msg);
            // dump every frame exactly once when it is complete
            dumpFrame(curFrame);
          }

          // emit the current frame
        }
      }
      prev = value;
    }
  }

  /**
   * append the given frame to the debug dump file (if there is one)
   * 
   * @param frame
   *          - the bytes of a complete JPEG frame
   */
  private void dumpFrame(byte[] frame) {
    // local copy - fos may be reset to null by onCompleted
    FileOutputStream out = fos;
    if (out == null) {
      return;
    }
    try {
      out.write(frame);
      out.flush();
    } catch (IOException e) {
      handle(e);
    }
  }

  /**
   * close the debug dump file (if there is one)
   */
  private void closeDumpFile() {
    FileOutputStream out = fos;
    fos = null;
    if (out != null) {
      try {
        out.close();
      } catch (IOException e) {
        handle(e);
      }
    }
  }

  /**
   * log the given problem - in debug mode including the stack trace
   * 
   * @param th
   *          - the problem to report
   */
  private void handle(Throwable th) {
    if (debug) {
      LOG.error(th.getMessage(), th);
    } else {
      LOG.error(th.getMessage());
    }
  }

  /**
   * open me with the given bufferSize
   * 
   * Does nothing if the handler has no input stream.
   * 
   * @param bufferSize
   *          e.g. 64 KByte Buffer - 10.5 msecs/100 FPS at 1920x1080
   *          1000/(1920*1080*3/1024/64)
   */
  public void open(int bufferSize) {
    this.bufferSize = bufferSize;
    BodyDeferringInputStream inputStream = this.mjpegHandler.getInputStream();
    if (inputStream != null) {
      mjpegSubscription = StringObservable.from(inputStream, bufferSize)
          .subscribeOn(Schedulers.io()).observeOn(Schedulers.newThread());
      mjpegSubscription.subscribe(this);
    }
  }

  /**
   * close me
   */
  public void close() {
    this.unsubscribe();
    this.onCompleted();
  }

}
java
rx-java
rx-java2
asked on Stack Overflow Jan 24, 2020 by Wolfgang Fahl

0 Answers

Nobody has answered this question yet.


User contributions licensed under CC BY-SA 3.0