Since i am having trouble with OpenCV's mjpeg stream handling e.g. for logitech cams I'd like to create an mjpeg stream handler using rxjava. The goal is to create an Image Observer that is fed from an mjpeg url. Please find below the current state of the code.
There are several issues for which I'd like to get solutions/answers:
With those two answers i asume this could be an nice solution. Any other thoughts?
Unit Test
@Test
public void testMJpegStream() throws Exception {
// Dorf Appenzell
String url="http://213.193.89.202/axis-cgi/mjpg/video.cgi";
//url="http://localhost:8081?type=simulator&mode=stream";
//url="http://picarford:8080/?action=stream";
MJpegHandler mjpegHandler=new MJpegHandler(url);
MJpegDecoder.debug=true;
int bufferSize = 1024 * 64; // 64 KByte Buffer
MJpegDecoder mjpegDecoder = mjpegHandler.open(bufferSize);
Thread.sleep(1000);
mjpegDecoder.close();
}
MJpegHandler
package org.rcdukes.imageview;
import static org.asynchttpclient.Dsl.asyncHttpClient;
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.util.concurrent.Future;
import org.asynchttpclient.AsyncHttpClient;
import org.asynchttpclient.Response;
import org.asynchttpclient.handler.BodyDeferringAsyncHandler;
import org.asynchttpclient.handler.BodyDeferringAsyncHandler.BodyDeferringInputStream;
public class MJpegHandler {
AsyncHttpClient asyncHttpClient;
private PipedInputStream pipedInputStream;
private PipedOutputStream pipedOutputStream;
private BodyDeferringAsyncHandler outputHandler;
BodyDeferringInputStream inputStream;
public BodyDeferringInputStream getInputStream() {
return inputStream;
}
/**
* get an mjpeg stream from the given url
*
* @param url
* @return - the MJPeg Stream
* @throws Exception
*/
public MJpegHandler(String url) throws Exception {
// https://stackoverflow.com/a/50402629/1497139
asyncHttpClient = asyncHttpClient();
asyncHttpClient.prepareGet(url);
pipedInputStream = new PipedInputStream();
pipedOutputStream = new PipedOutputStream(
pipedInputStream);
outputHandler = new BodyDeferringAsyncHandler(
pipedOutputStream);
Future<Response> futureResponse = asyncHttpClient.prepareGet(url)
.execute(outputHandler);
Response response = outputHandler.getResponse();
if (response.getStatusCode() == 200) {
inputStream=new BodyDeferringAsyncHandler.BodyDeferringInputStream(
futureResponse, outputHandler, pipedInputStream);
}
}
public MJpegHandler() {
}
/**
* open me with the given bufferSize
*
* @param url
* @return
* @throws Exception
*/
public MJpegDecoder open(int bufferSize) throws Exception {
MJpegDecoder mjpegDecoder = new MJpegDecoder(this);
mjpegDecoder.open(bufferSize);
return mjpegDecoder;
}
/**
* close this handler
* @throws IOException
*/
public void close() throws IOException {
this.asyncHttpClient.close();
}
}
MJpegDecoder
package org.rcdukes.imageview;
import java.io.ByteArrayOutputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import org.asynchttpclient.handler.BodyDeferringAsyncHandler.BodyDeferringInputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Subscriber;
import rx.observables.StringObservable;
import rx.schedulers.Schedulers;
/**
* reactive MJPegDecoder
*
* @author wf
*
*/
public class MJpegDecoder extends Subscriber<byte[]> {
protected static final Logger LOG = LoggerFactory
.getLogger(MJpegDecoder.class);
int prev = 0;
int cur = 0;
private ByteArrayOutputStream jpgOut;
private byte[] curFrame;
public static boolean debug = false;
private int bufferIndex = 0;
private int frameIndex = 0;
FileOutputStream fos;
private int bufferSize;
private Observable<byte[]> mjpegSubscription;
private MJpegHandler mjpegHandler;
/**
* open the decoder for the given stream
*
* @param mJpegHandler
* @param bufferSize
*/
public MJpegDecoder(MJpegHandler mJpegHandler) {
this.mjpegHandler = mJpegHandler;
this.curFrame = new byte[0];
if (debug) {
try {
fos = new FileOutputStream("/tmp/decoder.mjpg");
} catch (FileNotFoundException e) {
handle(e);
}
}
}
@Override
public void onCompleted() {
try {
this.mjpegHandler.close();
if (fos != null) {
fos.close();
}
fos = null;
} catch (IOException e) {
onError(e);
}
}
@Override
public void onError(Throwable e) {
handle(e);
}
@Override
public void onNext(byte[] buffer) {
if (debug) {
String msg = String.format("buffer %6d available %9d kB read",
++bufferIndex, bufferIndex * bufferSize / 1024);
LOG.info(msg);
try {
fos.write(curFrame);
fos.flush();
} catch (IOException e) {
handle(e);
}
}
// loop over all bytes in the buffer
for (int cur : buffer) {
// Content-Type: multipart/x-mixed-replace; boundary=
// will have -- we could detect it here
if (debug) {
if (prev == 0x2D && cur == 0x2D) {
LOG.info("boundary detected");
}
}
// check for JPEG start bytes
if (prev == 0xFFFFFFFF && cur == 0xFFFFFFD8) {
if (debug) {
String msg = String.format("frame %6d started", frameIndex + 1);
LOG.info(msg);
}
jpgOut = new ByteArrayOutputStream(bufferSize);
// first byte needs to be written to output
jpgOut.write((byte) prev);
}
// if within the frame write all bytes
if (jpgOut != null) {
jpgOut.write((byte) cur);
// check for JPEG end bytes
// if found the frame is finished
if (prev == 0xFFFFFFFF && cur == 0xFFFFFFD9) {
// create the byte array of the current jpeg frame
curFrame = jpgOut.toByteArray();
try {
jpgOut.close();
jpgOut = null;
} catch (IOException e) {
onError(e);
}
if (debug) {
String msg = String.format("frame %6d available", ++frameIndex);
LOG.info(msg);
}
// emit the current frame
}
}
prev = cur;
}
}
private void handle(Throwable th) {
LOG.error(th.getMessage());
if (debug)
th.printStackTrace();
}
/**
* open me with the given bufferSize
*
* @param bufferSize
* e.g. 64 KByte Buffer - 10.5 msecs/100 FPS at 1920x1080
* 1000/(1920*1080*3/1024/64)
*/
public void open(int bufferSize) {
this.bufferSize = bufferSize;
BodyDeferringInputStream inputStream = this.mjpegHandler.getInputStream();
if (inputStream != null) {
mjpegSubscription = StringObservable.from(inputStream, bufferSize)
.subscribeOn(Schedulers.io()).observeOn(Schedulers.newThread());
mjpegSubscription.subscribe(this);
}
}
/**
* close me
*/
public void close() {
this.unsubscribe();
this.onCompleted();
}
}
User contributions licensed under CC BY-SA 3.0