diff --git a/lang/cs/Org.Apache.REEF.IO.Tests/TestFilePartitionInputDataSet.cs b/lang/cs/Org.Apache.REEF.IO.Tests/TestFilePartitionInputDataSet.cs index 001c3225ef..6c339cf389 100644 --- a/lang/cs/Org.Apache.REEF.IO.Tests/TestFilePartitionInputDataSet.cs +++ b/lang/cs/Org.Apache.REEF.IO.Tests/TestFilePartitionInputDataSet.cs @@ -19,6 +19,7 @@ using System.Collections.Generic; using System.Globalization; using System.IO; +using System.Linq; using Org.Apache.REEF.IO.PartitionedData; using Org.Apache.REEF.IO.PartitionedData.FileSystem; using Org.Apache.REEF.IO.PartitionedData.FileSystem.Parameters; @@ -95,7 +96,7 @@ public void TestEvaluatorSideWithMultipleFilesOnePartition() Assert.NotNull(partition); Assert.NotNull(partition.Id); int count = 0; - var e = partition.GetPartitionHandle(); + var e = partition.GetPartitionHandle().First(); foreach (var v in e) { Logger.Log(Level.Info, string.Format(CultureInfo.CurrentCulture, "Data read {0}: ", v)); @@ -173,7 +174,7 @@ public void TestWithByteDeserializer() .GetInstance>>(); using (partition as IDisposable) { - var e = partition.GetPartitionHandle(); + var e = partition.GetPartitionHandle().First(); foreach (var v in e) { Logger.Log(Level.Info, string.Format(CultureInfo.CurrentCulture, "Data read {0}: ", v)); @@ -211,7 +212,7 @@ public void TestWithRowDeserializer() .GetInstance>>(); using (partition as IDisposable) { - IEnumerable e = partition.GetPartitionHandle(); + IEnumerable e = partition.GetPartitionHandle().First(); foreach (var row in e) { @@ -256,7 +257,7 @@ public void TestTempFileFolderWithRowDeserializer() .GetInstance>>(); using (partition as IDisposable) { - IEnumerable e = partition.GetPartitionHandle(); + IEnumerable e = partition.GetPartitionHandle().First(); foreach (var row in e) { diff --git a/lang/cs/Org.Apache.REEF.IO.Tests/TestRandomInputDataSet.cs b/lang/cs/Org.Apache.REEF.IO.Tests/TestRandomInputDataSet.cs index 452c75e147..1ed0da6d06 100644 --- a/lang/cs/Org.Apache.REEF.IO.Tests/TestRandomInputDataSet.cs +++ b/lang/cs/Org.Apache.REEF.IO.Tests/TestRandomInputDataSet.cs @@ -17,6 +17,7 @@ using System.Collections.Generic; using System.IO; +using System.Linq; using Org.Apache.REEF.IO.PartitionedData; using Org.Apache.REEF.IO.PartitionedData.Random; using Org.Apache.REEF.Tang.Implementations.Tang; @@ -89,9 +90,10 @@ public void TestEvaluatorSide() Assert.NotNull(partition); Assert.NotNull(partition.Id); - using (var partitionStream = partition.GetPartitionHandle()) + var partitionStreams = partition.GetPartitionHandle(); + Assert.NotNull(partitionStreams); + using (var partitionStream = partitionStreams.Single()) { - Assert.NotNull(partitionStream); Assert.True(partitionStream.CanRead); Assert.False(partitionStream.CanWrite); Assert.Equal(ExpectedNumberOfBytesPerPartition, partitionStream.Length); diff --git a/lang/cs/Org.Apache.REEF.IO/CacheLevel.cs b/lang/cs/Org.Apache.REEF.IO/CacheLevel.cs new file mode 100644 index 0000000000..8c1eeb4387 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.IO/CacheLevel.cs @@ -0,0 +1,45 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +namespace Org.Apache.REEF.IO +{ + /// + /// The level at which data is cached. + /// + public enum CacheLevel + { + /// + /// The data is deserialized. + /// + InMemoryMaterialized = 0, + + /// + /// The data is in memory, but as MemoryStream. + /// + InMemoryAsStream = 1, + + /// + /// The data is in disk. + /// + Disk = 2, + + /// + /// The data is not local. + /// + NotLocal = 3, + } +} \ No newline at end of file diff --git a/lang/cs/Org.Apache.REEF.IO/DataCache.cs b/lang/cs/Org.Apache.REEF.IO/DataCache.cs new file mode 100644 index 0000000000..92b56f7526 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.IO/DataCache.cs @@ -0,0 +1,276 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using System; +using System.Collections.Generic; +using System.IO; +using Org.Apache.REEF.Tang.Annotations; +using Org.Apache.REEF.Tang.Exceptions; +using Org.Apache.REEF.Utilities; +using Org.Apache.REEF.Utilities.Attributes; + +namespace Org.Apache.REEF.IO +{ + /// + /// A class that manages the caching of data at different layers of storage medium. + /// + [Unstable("0.15", "API contract may change.")] + public sealed class DataCache + { + private readonly IDataMover _dataMover; + + private Optional _diskDirectory = Optional.Empty(); + private Optional> _memStreams = Optional>.Empty(); + private Optional> _materialized = Optional>.Empty(); + + private CacheLevel _cacheLevel = CacheLevel.NotLocal; + + [Inject] + private DataCache(IDataMover dataMover) + { + _dataMover = dataMover; + } + + /// + /// The current level at which the data is cached. + /// + public CacheLevel CacheLevel + { + get { return _cacheLevel; } + } + + /// + /// The directory where cached data resides, if the data is cached on disk. + /// + public Optional DiskDirectory + { + get { return _diskDirectory; } + } + + /// + /// The collection of memory streams where cached data resides, if the data is + /// cached in memory. + /// + public Optional> MemoryStreams + { + get { return _memStreams; } + } + + /// + /// The cached collection of "materialized" objects, if the data is cached + /// in memory and deserialized. + /// + public Optional> Materialized + { + get { return _materialized; } + } + + /// + /// Cache the data to a . + /// If the data is already cached at a lower level, no action will be taken. + /// + /// Level to cache to. + /// The cached level. + public CacheLevel Cache(CacheLevel cacheToLevel) + { + if (_cacheLevel <= cacheToLevel) + { + return _cacheLevel; + } + + switch (cacheToLevel) + { + case CacheLevel.NotLocal: + return _cacheLevel; + case CacheLevel.Disk: + return CacheToDisk(); + case CacheLevel.InMemoryAsStream: + return CacheToMemory(); + case CacheLevel.InMemoryMaterialized: + return CacheToMaterialized(); + default: + throw new SystemException("Unexpected cache level " + cacheToLevel); + } + } + + /// + /// Cache to disk with the injected . + /// + /// The cached level. + private CacheLevel CacheToDisk() + { + switch (_cacheLevel) + { + case CacheLevel.NotLocal: + _diskDirectory = Optional.Of(_dataMover.RemoteToDisk()); + break; + default: + throw new SystemException( + "Unexpected cache level transition from " + _cacheLevel + " to " + CacheLevel.Disk); + } + + _cacheLevel = CacheLevel.Disk; + return _cacheLevel; + } + + /// + /// Cache to memory with the injected . + /// + /// The cached level. + public CacheLevel CacheToMemory() + { + switch (_cacheLevel) + { + case CacheLevel.Disk: + CheckDisk(); + _memStreams = Optional>.Of( + new List(_dataMover.DiskToMemory(_diskDirectory.Value))); + CleanUpDisk(); + break; + case CacheLevel.NotLocal: + _memStreams = Optional>.Of( + new List(_dataMover.RemoteToMemory())); + break; + default: + throw new SystemException( + "Unexpected cache level transition from " + _cacheLevel + " to " + CacheLevel.InMemoryAsStream); + } + + _cacheLevel = CacheLevel.InMemoryAsStream; + return _cacheLevel; + } + + /// + /// "Materializes" and deserializes the data, with an option to cache. + /// + /// The deserialized data. + public IEnumerable Materialize(bool shouldCache) + { + if (shouldCache) + { + CacheToMaterialized(); + } + + switch (_cacheLevel) + { + case CacheLevel.InMemoryAsStream: + CheckMemory(); + return _dataMover.MemoryToMaterialized(_memStreams.Value); + case CacheLevel.Disk: + CheckDisk(); + return _dataMover.DiskToMaterialized(_diskDirectory.Value); + case CacheLevel.NotLocal: + return _dataMover.RemoteToMaterialized(); + case CacheLevel.InMemoryMaterialized: + CheckMaterialized(); + return _materialized.Value; + default: + throw new IllegalStateException("Illegal cache layer."); + } + } + + /// + /// Deserialize and cache the data in memory from the level the data is currently at. + /// + private CacheLevel CacheToMaterialized() + { + switch (_cacheLevel) + { + case CacheLevel.InMemoryAsStream: + CheckMemory(); + _materialized = Optional>.Of( + new List(_dataMover.MemoryToMaterialized(_memStreams.Value))); + CleanUpMemory(); + break; + case CacheLevel.Disk: + CheckDisk(); + _materialized = Optional>.Of( + new List(_dataMover.DiskToMaterialized(_diskDirectory.Value))); + CleanUpDisk(); + break; + case CacheLevel.NotLocal: + _materialized = Optional>.Of( + new List(_dataMover.RemoteToMaterialized())); + break; + default: + throw new SystemException( + "Unexpected cache level transition from " + _cacheLevel + " to " + CacheLevel.InMemoryMaterialized); + } + + _cacheLevel = CacheLevel.InMemoryMaterialized; + return _cacheLevel; + } + + /// + /// Check that the data is "materialized." + /// + private void CheckMaterialized() + { + if (!_materialized.IsPresent()) + { + throw new IllegalStateException("Collection is expected to be materialized in memory."); + } + } + + /// + /// Check that the data is in memory as a . + /// + private void CheckMemory() + { + if (!_memStreams.IsPresent()) + { + throw new IllegalStateException("Data is expected to be in memory."); + } + } + + /// + /// Clean up the data stored as s. + /// + private void CleanUpMemory() + { + CheckMemory(); + + foreach (var memStream in _memStreams.Value) + { + memStream.Dispose(); + } + + _memStreams = Optional>.Empty(); + } + + /// + /// Check that the data is stored on disk. + /// + private void CheckDisk() + { + if (!_diskDirectory.IsPresent()) + { + throw new IllegalStateException("Disk directory is expected to be present."); + } + } + + /// + /// Clean up the data stored on disk. + /// + private void CleanUpDisk() + { + CheckDisk(); + _diskDirectory.Value.Delete(true); + _diskDirectory = Optional.Empty(); + } + } +} \ No newline at end of file diff --git a/lang/cs/Org.Apache.REEF.IO/FileStreamDataMover.cs b/lang/cs/Org.Apache.REEF.IO/FileStreamDataMover.cs new file mode 100644 index 0000000000..a56e644bc8 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.IO/FileStreamDataMover.cs @@ -0,0 +1,62 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using System.Collections.Generic; +using System.IO; +using System.Linq; +using Org.Apache.REEF.Utilities.Attributes; + +namespace Org.Apache.REEF.IO +{ + /// + /// An abstract class inherits and models + /// each file as a (and consequently each file an object + /// of type T. + /// + [Unstable("0.15", "API contract may change.")] + public abstract class FileStreamDataMover : SerializerBasedDataMover + { + protected FileStreamDataMover(ISerializer serializer) + : base(serializer) + { + } + + public override IEnumerable DiskToMemory(DirectoryInfo info) + { + foreach (var file in info.EnumerateFiles("*", SearchOption.AllDirectories)) + { + var memStream = new MemoryStream(); + using (var fileStream = new FileStream(file.FullName, FileMode.Open)) + { + fileStream.CopyTo(memStream); + } + + yield return memStream; + } + } + + public override IEnumerable DiskToMaterialized(DirectoryInfo info) + { + var fileStreams = + info + .EnumerateFiles("*", SearchOption.AllDirectories) + .Select(file => new FileStream(file.FullName, FileMode.Open)); + + return StreamToMaterialized(fileStreams); + } + } +} \ No newline at end of file diff --git a/lang/cs/Org.Apache.REEF.IO/IDataMover.cs b/lang/cs/Org.Apache.REEF.IO/IDataMover.cs new file mode 100644 index 0000000000..cdd1264522 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.IO/IDataMover.cs @@ -0,0 +1,66 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using System.Collections.Generic; +using System.IO; +using Org.Apache.REEF.Utilities.Attributes; + +namespace Org.Apache.REEF.IO +{ + /// + /// An interface that describes a contract for data movement between + /// different types of storage medium. + /// + [Unstable("0.15", "API contract may change.")] + public interface IDataMover + { + /// + /// Fetches the data from remote and stores to disk. + /// + DirectoryInfo RemoteToDisk(); + + /// + /// Fetches the data from remote and puts in memory without deserialization. + /// + IEnumerable RemoteToMemory(); + + /// + /// Fetches the data from remote and deserializes it. + /// + IEnumerable RemoteToMaterialized(); + + /// + /// Fetches the data from a directory on disk and puts in memory + /// without deserialization. + /// + /// The directory. + IEnumerable DiskToMemory(DirectoryInfo info); + + /// + /// Fetches the data from a directory on disk and deserializes it. + /// + /// + IEnumerable DiskToMaterialized(DirectoryInfo info); + + /// + /// Deserializes a memory stream. + /// + /// Memory streams to deserialize. + /// + IEnumerable MemoryToMaterialized(IEnumerable streams); + } +} \ No newline at end of file diff --git a/lang/cs/Org.Apache.REEF.IO/ISerializer.cs b/lang/cs/Org.Apache.REEF.IO/ISerializer.cs new file mode 100644 index 0000000000..8f6c8e6f5f --- /dev/null +++ b/lang/cs/Org.Apache.REEF.IO/ISerializer.cs @@ -0,0 +1,29 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using System.Collections.Generic; +using System.IO; + +namespace Org.Apache.REEF.IO +{ + public interface ISerializer + { + void Serialize(T value, out Stream stream); + + T Deserialize(Stream stream); + } +} \ No newline at end of file diff --git a/lang/cs/Org.Apache.REEF.IO/Org.Apache.REEF.IO.csproj b/lang/cs/Org.Apache.REEF.IO/Org.Apache.REEF.IO.csproj index 2fda1205e0..5dac181cda 100644 --- a/lang/cs/Org.Apache.REEF.IO/Org.Apache.REEF.IO.csproj +++ b/lang/cs/Org.Apache.REEF.IO/Org.Apache.REEF.IO.csproj @@ -70,6 +70,8 @@ under the License. + + @@ -98,6 +100,10 @@ under the License. + + + + @@ -118,6 +124,7 @@ under the License. + diff --git a/lang/cs/Org.Apache.REEF.IO/PartitionedData/DefaultInputPartition.cs b/lang/cs/Org.Apache.REEF.IO/PartitionedData/DefaultInputPartition.cs new file mode 100644 index 0000000000..41652496a9 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.IO/PartitionedData/DefaultInputPartition.cs @@ -0,0 +1,53 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using System.Collections.Generic; +using Org.Apache.REEF.IO.PartitionedData.Random.Parameters; +using Org.Apache.REEF.Tang.Annotations; + +namespace Org.Apache.REEF.IO.PartitionedData +{ + public sealed class DefaultInputPartition : IInputPartition + { + private readonly DataCache _dataCache; + private readonly string _partitionId; + + [Inject] + private DefaultInputPartition( + [Parameter(typeof(PartitionId))] string partitionId, + DataCache dataCache) + { + _partitionId = partitionId; + _dataCache = dataCache; + } + + public string Id + { + get { return _partitionId; } + } + + public CacheLevel Cache(CacheLevel level) + { + return _dataCache.Cache(level); + } + + public IEnumerable GetPartitionHandle() + { + return _dataCache.Materialize(false); + } + } +} \ No newline at end of file diff --git a/lang/cs/Org.Apache.REEF.IO/PartitionedData/FileSystem/FileSystemInputPartition.cs b/lang/cs/Org.Apache.REEF.IO/PartitionedData/FileSystem/FileSystemInputPartition.cs index 0d17467a7e..a8e6a0beb5 100644 --- a/lang/cs/Org.Apache.REEF.IO/PartitionedData/FileSystem/FileSystemInputPartition.cs +++ b/lang/cs/Org.Apache.REEF.IO/PartitionedData/FileSystem/FileSystemInputPartition.cs @@ -26,7 +26,6 @@ using Org.Apache.REEF.Tang.Annotations; using Org.Apache.REEF.Utilities; using Org.Apache.REEF.Utilities.Attributes; -using Org.Apache.REEF.Utilities.Diagnostics; using Org.Apache.REEF.Utilities.Logging; namespace Org.Apache.REEF.IO.PartitionedData.FileSystem @@ -71,13 +70,22 @@ public string Id /// /// Caches from the remote File System to a local disk. /// - public void Cache() + public CacheLevel Cache(CacheLevel level) { lock (_lock) { - if (!_localFiles.IsPresent()) + if (level <= CacheLevel.Disk) { - _localFiles = Optional>.Of(Download()); + if (!_localFiles.IsPresent()) + { + _localFiles = Optional>.Of(Download()); + } + + return CacheLevel.Disk; + } + else + { + return _localFiles.IsPresent() ? CacheLevel.Disk : CacheLevel.NotLocal; } } } @@ -123,7 +131,7 @@ private ISet Download() /// provided by the Serializer /// /// - public T GetPartitionHandle() + public IEnumerable GetPartitionHandle() { lock (_lock) { @@ -131,15 +139,15 @@ public T GetPartitionHandle() { if (!_localFiles.IsPresent()) { - Cache(); + Cache(CacheLevel.Disk); } // For now, assume IFileDeSerializer is local. - return _fileSerializer.Deserialize(_localFiles.Value); + return new List { _fileSerializer.Deserialize(_localFiles.Value) }; } // For now, assume IFileDeSerializer is remote. - return _fileSerializer.Deserialize(_remoteFilePaths); + return new List { _fileSerializer.Deserialize(_remoteFilePaths) }; } } diff --git a/lang/cs/Org.Apache.REEF.IO/PartitionedData/IInputPartition.cs b/lang/cs/Org.Apache.REEF.IO/PartitionedData/IInputPartition.cs index 33116cd617..ecd07e19e1 100644 --- a/lang/cs/Org.Apache.REEF.IO/PartitionedData/IInputPartition.cs +++ b/lang/cs/Org.Apache.REEF.IO/PartitionedData/IInputPartition.cs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +using System.Collections.Generic; using Org.Apache.REEF.Utilities.Attributes; namespace Org.Apache.REEF.IO.PartitionedData @@ -24,7 +25,8 @@ namespace Org.Apache.REEF.IO.PartitionedData /// /// Generic Type representing data pointer. /// For example, for data in local file it can be file pointer - public interface IInputPartition + [Unstable("API contract may change.")] + public interface IInputPartition { /// /// The id of the partition. @@ -32,15 +34,15 @@ public interface IInputPartition string Id { get; } /// - /// Caches the data locally, cached location is based on the implementation. + /// Caches the data based on the method parameter. Returns the actual cached level. /// - [Unstable("0.14", "Contract may change.")] - void Cache(); + [Unstable("0.15", "Contract may change.")] + CacheLevel Cache(CacheLevel level); /// /// Gives a pointer to the underlying partition. /// /// The pointer to the underlying partition - T GetPartitionHandle(); + IEnumerable GetPartitionHandle(); } } \ No newline at end of file diff --git a/lang/cs/Org.Apache.REEF.IO/PartitionedData/Random/RandomInputPartition.cs b/lang/cs/Org.Apache.REEF.IO/PartitionedData/Random/RandomInputPartition.cs index df55555293..f76de6cf1a 100644 --- a/lang/cs/Org.Apache.REEF.IO/PartitionedData/Random/RandomInputPartition.cs +++ b/lang/cs/Org.Apache.REEF.IO/PartitionedData/Random/RandomInputPartition.cs @@ -16,6 +16,7 @@ // under the License. using System; +using System.Collections.Generic; using System.Diagnostics; using System.IO; using Org.Apache.REEF.IO.PartitionedData.Random.Parameters; @@ -52,13 +53,18 @@ public string Id get { return _id; } } - public void Cache() + public CacheLevel Cache(CacheLevel level) { lock (_lock) { + if (level > CacheLevel.InMemoryMaterialized) + { + return CacheLevel.NotLocal; + } + if (_randomData.IsPresent()) { - return; + return CacheLevel.InMemoryMaterialized; } var random = new System.Random(); @@ -75,20 +81,22 @@ public void Cache() generatedData[index] = randomDoubleAsBytes[j]; } } + _randomData = Optional.Of(generatedData); + return CacheLevel.InMemoryMaterialized; } } - public Stream GetPartitionHandle() + public IEnumerable GetPartitionHandle() { lock (_lock) { if (!_randomData.IsPresent()) { - Cache(); + Cache(CacheLevel.InMemoryMaterialized); } - return new MemoryStream(_randomData.Value, false); + return new List { new MemoryStream(_randomData.Value, false) }; } } } diff --git a/lang/cs/Org.Apache.REEF.IO/SerializerBasedDataMover.cs b/lang/cs/Org.Apache.REEF.IO/SerializerBasedDataMover.cs new file mode 100644 index 0000000000..b5a2854c01 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.IO/SerializerBasedDataMover.cs @@ -0,0 +1,69 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using System.Collections.Generic; +using System.IO; +using System.Linq; +using Org.Apache.REEF.Utilities.Attributes; + +namespace Org.Apache.REEF.IO +{ + /// + /// A that uses to deserialize + /// data from . + /// + [Unstable("0.15", "API contract may change.")] + public abstract class SerializerBasedDataMover : IDataMover + { + private readonly ISerializer _serializer; + + protected SerializerBasedDataMover(ISerializer serializer) + { + _serializer = serializer; + } + + protected ISerializer Serializer + { + get { return _serializer; } + } + + public abstract DirectoryInfo RemoteToDisk(); + + public abstract IEnumerable RemoteToMemory(); + + public abstract IEnumerable RemoteToMaterialized(); + + public abstract IEnumerable DiskToMemory(DirectoryInfo info); + + public abstract IEnumerable DiskToMaterialized(DirectoryInfo info); + + public IEnumerable MemoryToMaterialized(IEnumerable streams) + { + return StreamToMaterialized(streams); + } + + /// + /// Uses an to deserialize data from , + /// where each stream is an object of type T. + /// + protected IEnumerable StreamToMaterialized(IEnumerable streams) + { + var deserialized = streams.Select(stream => _serializer.Deserialize(stream)).ToList(); + return deserialized.AsReadOnly(); + } + } +} \ No newline at end of file