cancel TileLoader job when thread shouldPause

- handle Exceptions in PbfTileDataSource, cleanup in finally{}
This commit is contained in:
Hannes Janetzek 2014-01-19 17:10:27 +01:00
parent 0710e5776e
commit ec8821e557
3 changed files with 65 additions and 61 deletions

View File

@ -16,6 +16,8 @@
*/ */
package org.oscim.layers.tile.vector; package org.oscim.layers.tile.vector;
import java.util.concurrent.CancellationException;
import org.oscim.core.GeometryBuffer.GeometryType; import org.oscim.core.GeometryBuffer.GeometryType;
import org.oscim.core.MapElement; import org.oscim.core.MapElement;
import org.oscim.core.MercatorProjection; import org.oscim.core.MercatorProjection;
@ -131,14 +133,18 @@ public class VectorTileLoader extends TileLoader implements IRenderTheme.Callbac
mTile = tile; mTile = tile;
mTile.layers = new ElementLayers(); mTile.layers = new ElementLayers();
QueryResult result = null;
// query database, which calls 'process' callback try {
QueryResult result = mTileDataSource.executeQuery(mTile, this); // query database, which calls 'process' callback
result = mTileDataSource.executeQuery(mTile, this);
mTile = null; } catch (CancellationException e) {
log.debug("canceled {}", mTile);
clearState(); } catch (Exception e) {
log.debug("{}", e);
} finally {
mTile = null;
clearState();
}
return (result == QueryResult.SUCCESS); return (result == QueryResult.SUCCESS);
} }
@ -205,8 +211,12 @@ public class VectorTileLoader extends TileLoader implements IRenderTheme.Callbac
@Override @Override
public void process(MapElement element) { public void process(MapElement element) {
clearState(); clearState();
if (isCanceled())
throw new CancellationException();
mElement = element; mElement = element;
if (element.type == GeometryType.POINT) { if (element.type == GeometryType.POINT) {

View File

@ -56,6 +56,7 @@ public class LwHttp {
private Socket mSocket; private Socket mSocket;
private OutputStream mCommandStream; private OutputStream mCommandStream;
private InputStream mResponseStream; private InputStream mResponseStream;
private OutputStream mCacheOutputStream;
private long mLastRequest = 0; private long mLastRequest = 0;
private SocketAddress mSockAddr; private SocketAddress mSockAddr;
@ -106,40 +107,34 @@ public class LwHttp {
} }
static class Buffer extends BufferedInputStream { static class Buffer extends BufferedInputStream {
final OutputStream mCache; final OutputStream mCacheOutputstream;
public Buffer(InputStream is, OutputStream cache) { public Buffer(InputStream is, OutputStream cache) {
super(is, 4096); super(is, 4096);
mCache = cache; mCacheOutputstream = cache;
} }
@Override @Override
public synchronized int read() throws IOException { public int read() throws IOException {
int data = super.read(); int data = super.read();
if (data >= 0) if (data >= 0)
mCache.write(data); mCacheOutputstream.write(data);
return data; return data;
} }
@Override @Override
public synchronized int read(byte[] buffer, int offset, int byteCount) public int read(byte[] buffer, int offset, int byteCount)
throws IOException { throws IOException {
int len = super.read(buffer, offset, byteCount); int len = super.read(buffer, offset, byteCount);
if (len >= 0) if (len >= 0)
mCache.write(buffer, offset, len); mCacheOutputstream.write(buffer, offset, len);
return len; return len;
} }
} }
OutputStream mCacheOutputStream;
public void setOutputStream(OutputStream outputStream) {
mCacheOutputStream = outputStream;
}
public void close() { public void close() {
if (mSocket != null) { if (mSocket != null) {
try { try {
@ -177,6 +172,10 @@ public class LwHttp {
while (end < read && (buf[end] != '\n')) while (end < read && (buf[end] != '\n'))
end++; end++;
if (end == BUFFER_SIZE) {
return null;
}
if (buf[end] != '\n') if (buf[end] != '\n')
continue; continue;
@ -209,7 +208,7 @@ public class LwHttp {
if (!ok) { if (!ok) {
String line = new String(buf, pos, end - pos - 1); String line = new String(buf, pos, end - pos - 1);
log.debug(">" + line + "< "); log.debug("> {} <", line);
} }
pos += (end - pos) + 1; pos += (end - pos) + 1;
@ -255,12 +254,14 @@ public class LwHttp {
mMaxReq = RESPONSE_EXPECTED_LIVES; mMaxReq = RESPONSE_EXPECTED_LIVES;
// log.debug("create connection"); // log.debug("create connection");
} else { } else {
// FIXME not sure if this is correct way to drain socket
int avail = mResponseStream.available(); int avail = mResponseStream.available();
if (avail > 0) { if (avail > 0) {
log.debug("Consume left-over bytes: " + avail); log.debug("left-over bytes: " + avail);
while ((avail = mResponseStream.available()) > 0) close();
mResponseStream.read(buffer); lwHttpConnect();
// FIXME not sure if this is correct way to drain socket
//while ((avail = mResponseStream.available()) > 0)
// mResponseStream.read(buffer);
} }
} }
@ -353,8 +354,16 @@ public class LwHttp {
return true; return true;
} }
public void requestCompleted() { public void setOutputStream(OutputStream outputStream) {
mCacheOutputStream = outputStream;
}
public void requestCompleted(boolean keepConnection) {
mLastRequest = System.nanoTime(); mLastRequest = System.nanoTime();
mCacheOutputStream = null;
if (!keepConnection)
close();
} }
public int getContentLength() { public int getContentLength() {

View File

@ -30,10 +30,6 @@ import org.oscim.utils.IOUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
/**
*
*
*/
public abstract class PbfTileDataSource implements ITileDataSource { public abstract class PbfTileDataSource implements ITileDataSource {
static final Logger log = LoggerFactory.getLogger(PbfTileDataSource.class); static final Logger log = LoggerFactory.getLogger(PbfTileDataSource.class);
@ -48,66 +44,55 @@ public abstract class PbfTileDataSource implements ITileDataSource {
@Override @Override
public QueryResult executeQuery(MapTile tile, ITileDataSink sink) { public QueryResult executeQuery(MapTile tile, ITileDataSink sink) {
boolean success = true;
ITileCache.TileWriter cacheWriter = null; ITileCache.TileWriter cacheWriter = null;
if (mTileCache != null) { if (mTileCache != null) {
ITileCache.TileReader c = mTileCache.getTile(tile); ITileCache.TileReader c = mTileCache.getTile(tile);
if (c == null) { if (c == null) {
// create new cache entry
cacheWriter = mTileCache.writeTile(tile); cacheWriter = mTileCache.writeTile(tile);
mConn.setOutputStream(cacheWriter.getOutputStream());
} else { } else {
InputStream is = c.getInputStream();
try { try {
InputStream is = c.getInputStream();
if (mTileDecoder.decode(tile, sink, is, c.getBytes())) { if (mTileDecoder.decode(tile, sink, is, c.getBytes())) {
IOUtils.closeQuietly(is);
return QueryResult.SUCCESS; return QueryResult.SUCCESS;
} }
} catch (IOException e) { } catch (IOException e) {
e.printStackTrace(); e.printStackTrace();
} finally {
IOUtils.closeQuietly(is);
} }
log.debug(tile + " Cache read failed");
} }
} }
boolean success = false;
try { try {
if (cacheWriter != null)
mConn.setOutputStream(cacheWriter.getOutputStream());
InputStream is; InputStream is;
if (!mConn.sendRequest(tile)) { if (!mConn.sendRequest(tile)) {
log.debug(tile + " Request failed"); log.debug("{} Request failed", tile);
success = false; } else if ((is = mConn.readHeader()) == null) {
} else if ((is = mConn.readHeader()) != null) { log.debug("{} Network Error", tile);
} else {
int bytes = mConn.getContentLength(); int bytes = mConn.getContentLength();
success = mTileDecoder.decode(tile, sink, is, bytes); success = mTileDecoder.decode(tile, sink, is, bytes);
if (!success)
log.debug(tile + " Decoding failed");
} else {
log.debug(tile + " Network Error");
success = false;
} }
} catch (SocketException e) { } catch (SocketException e) {
log.debug(tile + " Socket exception: " + e.getMessage()); log.debug("{} Socket exception: {}", tile, e.getMessage());
success = false;
} catch (SocketTimeoutException e) { } catch (SocketTimeoutException e) {
log.debug(tile + " Socket Timeout"); log.debug("{} Socket Timeout", tile);
success = false;
} catch (UnknownHostException e) { } catch (UnknownHostException e) {
log.debug(tile + " No Network"); log.debug("{} No Network", tile);
success = false; } catch (IOException e) {
} catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
success = false; } finally {
mConn.requestCompleted(success);
if (cacheWriter != null)
cacheWriter.complete(success);
} }
mConn.requestCompleted();
if (cacheWriter != null)
cacheWriter.complete(success);
if (success)
mConn.close();
return success ? QueryResult.SUCCESS : QueryResult.FAILED; return success ? QueryResult.SUCCESS : QueryResult.FAILED;
} }