Skip to content

Commit

Permalink
Make interface implementation consistent among ExtractorOutputs
Browse files Browse the repository at this point in the history
The method track(int id) currently has different behaviours across
implementations. This CL maps ids to track outputs, which means
that successive calls with the same id will return the same
TrackOutput instance. Also fixes TsExtractor inconsistent behavior
after a seek.

-------------
Created by MOE: https://github.com/google/moe
MOE_MIGRATED_REVID=136026721
  • Loading branch information
AquilesCanta authored and ojw28 committed Oct 14, 2016
1 parent ff712ae commit e685edc
Show file tree
Hide file tree
Showing 8 changed files with 82 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public class AdtsReaderTest extends TestCase {

@Override
protected void setUp() throws Exception {
FakeExtractorOutput fakeExtractorOutput = new FakeExtractorOutput(true);
FakeExtractorOutput fakeExtractorOutput = new FakeExtractorOutput();
adtsOutput = fakeExtractorOutput.track(0);
id3Output = fakeExtractorOutput.track(1);
adtsReader = new AdtsReader(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public interface Extractor {
boolean sniff(ExtractorInput input) throws IOException, InterruptedException;

/**
* Initializes the extractor with an {@link ExtractorOutput}.
* Initializes the extractor with an {@link ExtractorOutput}. Called at most once.
*
* @param output An {@link ExtractorOutput} to receive extracted data.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,19 @@
public interface ExtractorOutput {

/**
* Called when the {@link Extractor} identifies the existence of a track in the stream.
* Called by the {@link Extractor} to get the {@link TrackOutput} for a specific track.
* <p>
* Returns a {@link TrackOutput} that will receive track level data belonging to the track.
* The same {@link TrackOutput} is returned if multiple calls are made with the same
* {@code trackId}.
*
* @param trackId A unique track identifier.
* @return The {@link TrackOutput} that should receive track level data belonging to the track.
* @param trackId A track identifier.
* @return The {@link TrackOutput} for the given track identifier.
*/
TrackOutput track(int trackId);

/**
* Called when all tracks have been identified, meaning that {@link #track(int)} will not be
* called again.
* Called when all tracks have been identified, meaning no new {@code trackId} values will be
* passed to {@link #track(int)}.
*/
void endTracks();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ public Extractor[] createExtractors() {

};


public static final int TS_STREAM_TYPE_MPA = 0x03;
public static final int TS_STREAM_TYPE_MPA_LSF = 0x04;
public static final int TS_STREAM_TYPE_AAC = 0x0F;
Expand Down Expand Up @@ -92,6 +91,7 @@ public Extractor[] createExtractors() {

// Accessed only by the loading thread.
private ExtractorOutput output;
private boolean tracksEnded;
private ElementaryStreamReader id3Reader;

public TsExtractor() {
Expand Down Expand Up @@ -120,8 +120,8 @@ public TsExtractor(TimestampAdjuster timestampAdjuster,
tsScratch = new ParsableBitArray(new byte[3]);
trackIds = new SparseBooleanArray();
tsPayloadReaders = new SparseArray<>();
tsPayloadReaders.put(TS_PAT_PID, new PatReader());
continuityCounters = new SparseIntArray();
resetPayloadReaders();
}

// Extractor implementation.
Expand Down Expand Up @@ -153,11 +153,10 @@ public void init(ExtractorOutput output) {
@Override
public void seek(long position) {
timestampAdjuster.reset();
for (int i = 0; i < tsPayloadReaders.size(); i++) {
tsPayloadReaders.valueAt(i).seek();
}
tsPacketBuffer.reset();
continuityCounters.clear();
// Elementary stream readers' state should be cleared to get consistent behaviours when seeking.
resetPayloadReaders();
}

@Override
Expand Down Expand Up @@ -252,6 +251,13 @@ public int read(ExtractorInput input, PositionHolder seekPosition)

// Internals.

private void resetPayloadReaders() {
trackIds.clear();
tsPayloadReaders.clear();
tsPayloadReaders.put(TS_PAT_PID, new PatReader());
id3Reader = null;
}

/**
* Parses TS packet payload data.
*/
Expand Down Expand Up @@ -345,7 +351,7 @@ public void consume(ParsableByteArray data, boolean payloadUnitStartIndicator,
patScratch.skipBits(13); // network_PID (13)
} else {
int pid = patScratch.readBits(13);
tsPayloadReaders.put(pid, new PmtReader());
tsPayloadReaders.put(pid, new PmtReader(pid));
}
}
}
Expand All @@ -365,14 +371,16 @@ private class PmtReader extends TsPayloadReader {

private final ParsableBitArray pmtScratch;
private final ParsableByteArray sectionData;
private final int pid;

private int sectionLength;
private int sectionBytesRead;
private int crc;

public PmtReader() {
public PmtReader(int pid) {
pmtScratch = new ParsableBitArray(new byte[5]);
sectionData = new ParsableByteArray();
this.pid = pid;
}

@Override
Expand Down Expand Up @@ -466,8 +474,16 @@ public void consume(ParsableByteArray data, boolean payloadUnitStartIndicator,
tsPayloadReaders.put(elementaryPid, new PesReader(pesPayloadReader, timestampAdjuster));
}
}

output.endTracks();
if (mapByType) {
if (!tracksEnded) {
output.endTracks();
}
} else {
tsPayloadReaders.remove(TS_PAT_PID);
tsPayloadReaders.remove(pid);
output.endTracks();
}
tracksEnded = true;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import android.net.Uri;
import android.os.Handler;
import android.util.SparseArray;
import com.google.android.exoplayer2.C;
import com.google.android.exoplayer2.Format;
import com.google.android.exoplayer2.FormatHolder;
Expand All @@ -41,7 +42,6 @@
import com.google.android.exoplayer2.util.Util;
import java.io.EOFException;
import java.io.IOException;
import java.util.Arrays;

/**
* A {@link MediaPeriod} that extracts data using an {@link Extractor}.
Expand All @@ -68,6 +68,7 @@
private final Runnable maybeFinishPrepareRunnable;
private final Runnable onContinueLoadingRequestedRunnable;
private final Handler handler;
private final SparseArray<DefaultTrackOutput> sampleQueues;

private Callback callback;
private SeekMap seekMap;
Expand All @@ -77,7 +78,6 @@
private boolean seenFirstTrackSelection;
private boolean notifyReset;
private int enabledTrackCount;
private DefaultTrackOutput[] sampleQueues;
private TrackGroupArray tracks;
private long durationUs;
private boolean[] trackEnabledStates;
Expand Down Expand Up @@ -131,7 +131,7 @@ public void run() {
handler = new Handler();

pendingResetPositionUs = C.TIME_UNSET;
sampleQueues = new DefaultTrackOutput[0];
sampleQueues = new SparseArray<>();
length = C.LENGTH_UNSET;
}

Expand All @@ -141,8 +141,9 @@ public void release() {
@Override
public void run() {
extractorHolder.release();
for (DefaultTrackOutput sampleQueue : sampleQueues) {
sampleQueue.disable();
int trackCount = sampleQueues.size();
for (int i = 0; i < trackCount; i++) {
sampleQueues.valueAt(i).disable();
}
}
});
Expand Down Expand Up @@ -178,7 +179,7 @@ public long selectTracks(TrackSelection[] selections, boolean[] mayRetainStreamF
Assertions.checkState(trackEnabledStates[track]);
enabledTrackCount--;
trackEnabledStates[track] = false;
sampleQueues[track].disable();
sampleQueues.valueAt(track).disable();
streams[i] = null;
}
}
Expand All @@ -201,9 +202,10 @@ public long selectTracks(TrackSelection[] selections, boolean[] mayRetainStreamF
if (!seenFirstTrackSelection) {
// At the time of the first track selection all queues will be enabled, so we need to disable
// any that are no longer required.
for (int i = 0; i < sampleQueues.length; i++) {
int trackCount = sampleQueues.size();
for (int i = 0; i < trackCount; i++) {
if (!trackEnabledStates[i]) {
sampleQueues[i].disable();
sampleQueues.valueAt(i).disable();
}
}
}
Expand Down Expand Up @@ -270,11 +272,12 @@ public long seekToUs(long positionUs) {
// Treat all seeks into non-seekable media as being to t=0.
positionUs = seekMap.isSeekable() ? positionUs : 0;
lastSeekPositionUs = positionUs;
int trackCount = sampleQueues.size();
// If we're not pending a reset, see if we can seek within the sample queues.
boolean seekInsideBuffer = !isPendingReset();
for (int i = 0; seekInsideBuffer && i < sampleQueues.length; i++) {
for (int i = 0; seekInsideBuffer && i < trackCount; i++) {
if (trackEnabledStates[i]) {
seekInsideBuffer = sampleQueues[i].skipToKeyframeBefore(positionUs);
seekInsideBuffer = sampleQueues.valueAt(i).skipToKeyframeBefore(positionUs);
}
}
// If we failed to seek within the sample queues, we need to restart.
Expand All @@ -284,8 +287,8 @@ public long seekToUs(long positionUs) {
if (loader.isLoading()) {
loader.cancelLoading();
} else {
for (int i = 0; i < sampleQueues.length; i++) {
sampleQueues[i].reset(trackEnabledStates[i]);
for (int i = 0; i < trackCount; i++) {
sampleQueues.valueAt(i).reset(trackEnabledStates[i]);
}
}
}
Expand All @@ -296,7 +299,7 @@ public long seekToUs(long positionUs) {
// SampleStream methods.

/* package */ boolean isReady(int track) {
return loadingFinished || (!isPendingReset() && !sampleQueues[track].isEmpty());
return loadingFinished || (!isPendingReset() && !sampleQueues.valueAt(track).isEmpty());
}

/* package */ void maybeThrowError() throws IOException {
Expand All @@ -308,7 +311,8 @@ public long seekToUs(long positionUs) {
return C.RESULT_NOTHING_READ;
}

return sampleQueues[track].readData(formatHolder, buffer, loadingFinished, lastSeekPositionUs);
return sampleQueues.valueAt(track).readData(formatHolder, buffer, loadingFinished,
lastSeekPositionUs);
}

// Loader.Callback implementation.
Expand All @@ -332,8 +336,9 @@ public void onLoadCanceled(ExtractingLoadable loadable, long elapsedRealtimeMs,
long loadDurationMs, boolean released) {
copyLengthFromLoader(loadable);
if (!released && enabledTrackCount > 0) {
for (int i = 0; i < sampleQueues.length; i++) {
sampleQueues[i].reset(trackEnabledStates[i]);
int trackCount = sampleQueues.size();
for (int i = 0; i < trackCount; i++) {
sampleQueues.valueAt(i).reset(trackEnabledStates[i]);
}
callback.onContinueLoadingRequested(this);
}
Expand All @@ -358,11 +363,13 @@ public int onLoadError(ExtractingLoadable loadable, long elapsedRealtimeMs,

@Override
public TrackOutput track(int id) {
sampleQueues = Arrays.copyOf(sampleQueues, sampleQueues.length + 1);
DefaultTrackOutput sampleQueue = new DefaultTrackOutput(allocator);
sampleQueue.setUpstreamFormatChangeListener(this);
sampleQueues[sampleQueues.length - 1] = sampleQueue;
return sampleQueue;
DefaultTrackOutput trackOutput = sampleQueues.get(id);
if (trackOutput == null) {
trackOutput = new DefaultTrackOutput(allocator);
trackOutput.setUpstreamFormatChangeListener(this);
sampleQueues.put(id, trackOutput);
}
return trackOutput;
}

@Override
Expand Down Expand Up @@ -390,18 +397,18 @@ private void maybeFinishPrepare() {
if (released || prepared || seekMap == null || !tracksBuilt) {
return;
}
for (DefaultTrackOutput sampleQueue : sampleQueues) {
if (sampleQueue.getUpstreamFormat() == null) {
int trackCount = sampleQueues.size();
for (int i = 0; i < trackCount; i++) {
if (sampleQueues.valueAt(i).getUpstreamFormat() == null) {
return;
}
}
loadCondition.close();
int trackCount = sampleQueues.length;
TrackGroup[] trackArray = new TrackGroup[trackCount];
trackEnabledStates = new boolean[trackCount];
durationUs = seekMap.getDurationUs();
for (int i = 0; i < trackCount; i++) {
trackArray[i] = new TrackGroup(sampleQueues[i].getUpstreamFormat());
trackArray[i] = new TrackGroup(sampleQueues.valueAt(i).getUpstreamFormat());
}
tracks = new TrackGroupArray(trackArray);
prepared = true;
Expand Down Expand Up @@ -455,26 +462,29 @@ private void configureRetry(ExtractingLoadable loadable) {
// a new load.
lastSeekPositionUs = 0;
notifyReset = prepared;
for (int i = 0; i < sampleQueues.length; i++) {
sampleQueues[i].reset(!prepared || trackEnabledStates[i]);
int trackCount = sampleQueues.size();
for (int i = 0; i < trackCount; i++) {
sampleQueues.valueAt(i).reset(!prepared || trackEnabledStates[i]);
}
loadable.setLoadPosition(0);
}
}

private int getExtractedSamplesCount() {
int extractedSamplesCount = 0;
for (DefaultTrackOutput sampleQueue : sampleQueues) {
extractedSamplesCount += sampleQueue.getWriteIndex();
int trackCount = sampleQueues.size();
for (int i = 0; i < trackCount; i++) {
extractedSamplesCount += sampleQueues.valueAt(i).getWriteIndex();
}
return extractedSamplesCount;
}

private long getLargestQueuedTimestampUs() {
long largestQueuedTimestampUs = Long.MIN_VALUE;
for (DefaultTrackOutput sampleQueue : sampleQueues) {
int trackCount = sampleQueues.size();
for (int i = 0; i < trackCount; i++) {
largestQueuedTimestampUs = Math.max(largestQueuedTimestampUs,
sampleQueue.getLargestQueuedTimestampUs());
sampleQueues.valueAt(i).getLargestQueuedTimestampUs());
}
return largestQueuedTimestampUs;
}
Expand Down Expand Up @@ -523,7 +533,7 @@ public int readData(FormatHolder formatHolder, DecoderInputBuffer buffer) {

@Override
public void skipToKeyframeBefore(long timeUs) {
sampleQueues[track].skipToKeyframeBefore(timeUs);
sampleQueues.valueAt(track).skipToKeyframeBefore(timeUs);
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ public interface SingleTrackMetadataOutput {

// Accessed only on the loader thread.
private boolean seenTrack;
private int seenTrackId;

/**
* @param extractor The extractor to wrap.
Expand Down Expand Up @@ -116,8 +117,9 @@ public int read(ExtractorInput input) throws IOException, InterruptedException {

@Override
public TrackOutput track(int id) {
Assertions.checkState(!seenTrack);
Assertions.checkState(!seenTrack || seenTrackId == id);
seenTrack = true;
seenTrackId = id;
return this;
}

Expand Down
Loading

0 comments on commit e685edc

Please sign in to comment.