Relationship between SparkFits dataframe and source fits file? #91
Just started looking at SparkFits for a large-scale data modelling and analysis project for radio astronomy - we'll be dealing with very large image cubes (currently testing on a 350 GB Stokes 3D FITS cube, but we will be testing further on 1 TB to multi-TB FITS files in future).

Apologies if this is a fairly simple question, but the documentation isn't very clear - how does SparkFits instantiate the data frame from a 3D cube? I.e. we have an experimental 3D image cube (~350 GB), covering a sky coordinate area of ~10 x 10 degrees (Right Ascension and Declination) over ~2,590 frequency channels. Pixel measurements are in Hz. The cube dimensions are RA 5,607, Dec 5,694 and frequency channels 2,592, so the cube shape (RA, Dec, Freq) is (5607, 5694, 2592).

The data frame has 14,544,168 records; each image record is an array of 5,607 elements, so if the data frame were expressed as a 2D matrix it would have the shape (14544168, 5607).

Each value in each image array is a pixel value in Hz for a specific RA, Dec and frequency channel, correct?

Given that the image column in each row holds 5,607 elements, is it correct to assume each row represents the pixel values for all RA positions for one specific Dec and frequency channel value?

Assuming the above is correct, an image for one specific frequency channel would be a 2D array (Dec, RA) of (5694, 5607) - within the SparkFits data frame, how would this be extracted? Rows 1-5694 for one specific frequency channel, and subsequent groups of 5694 rows for subsequent frequency channels?

And is it possible to extract the actual RA, Dec and frequency values from the data frame, or do they need to be pulled from the FITS header?

Again, apologies if these are simple questions - and thanks for your time.
Comments
Hi @GeoffDuniam, thanks for the detailed questions! Before starting, I must say that the treatment of images within spark-fits is still experimental compared to the more established case of Tables (but this is a good occasion for me to improve it!).
Correct
Correct
Yes, correct: the first 5694 dataframe rows would correspond to the first image (i.e. the first frequency channel), and so on. But in order to manipulate images (instead of rows of images), you would have to reshape the Spark dataframe. This is a huge bottleneck in spark-fits, as there are currently no easy solutions... The most naive approach would be to assign an index to each dataframe row and group by this index, assuming each image of the cube has the same size. This is something I could add in spark-fits (returning the image index for each row as an option), but the grouping operation would add (potentially large) overhead in execution time (it involves data shuffling...). Another approach would be to force spark-fits to return one image per dataframe row - but it would surely take some time to implement.
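To make the idea concrete, here is a minimal PySpark sketch of that naive approach, assuming a dataframe `df` read by spark-fits whose single `Image` column holds one row of pixels; `nrows_per_image` is a hypothetical parameter (the Dec size), not a spark-fits option:

```python
# Minimal sketch: assign a global index to each row, derive an image
# index from it, and group the rows of each image together.
nrows_per_image = 5694  # Dec size; assumes every image has the same shape

indexed = df.rdd.zipWithIndex()  # (Row, global_row_index)
by_image = (indexed
            .map(lambda ri: (ri[1] // nrows_per_image, ri[0][0]))
            .groupByKey()      # one entry per image -- triggers a shuffle
            .mapValues(list))  # each image becomes a list of pixel rows
```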
Currently no, it needs to be pulled from the FITS header - spark-fits does use some of the header data internally, but it is not exposed to the user. I usually use an external library like astropy whenever I want to further explore the header. As a future improvement for spark-fits, I could imagine returning a dataframe with the header values. Julien
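For example, a small astropy sketch for reading the header separately (the file path is a placeholder, and the keyword names assume a standard WCS header):

```python
from astropy.io import fits

# Read only the primary header (the pixel data itself is not loaded)
header = fits.getheader('/path/to/cube.fits', ext=0)

# With a standard WCS header, the physical value along axis n at
# pixel i is CRVALn + (i - CRPIXn) * CDELTn, e.g. for RA (axis 1):
print(header['NAXIS1'], header['CRVAL1'], header['CRPIX1'], header['CDELT1'])
```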
Hi Julien, Thanks for the reply, it clarifies a lot for us. We'll potentially be looking at large numbers of FITS image files over 1 TB, so we're having to investigate how best to get the data to the end users. We're interested in SparkFits in that it gives a data frame we can save as a Parquet table on HDFS, which gives us the ability to start processing with Spark on YARN without having to extract the data from a FITS or spectral cube (which would have a large overhead doing the transformations for a multi-TB file). If we can configure the data with a Hive schema, we can then use Spark SQL to pull subsets of the data into a Spark-enabled Jupyter notebook for analysis. As you say, reshaping the SparkFits data frame into an easier format for end users to manipulate is something of a challenge; we're investigating possibilities, but at the moment no clear solution is presenting itself. We'll keep you posted on progress. Also, please do let us know if we can test any features for you. Thanks again, Geoff
Hi @GeoffDuniam, Thanks for reporting this issue! Which version of spark-fits did you use? Do you see the same offset using 0.8.3 or 0.8.4 (the latest)? Would it be possible to reproduce the problem on a small subset of data and share it with me? It does not need to contain any physical information, and playing with the same data structure as you are using would help me understand where the problem is coming from. Thanks!
Hi Julien, No problem, I can take one channel of the data from one of the FITS files we're using - it'll be a 2D image, ~5500 x 5500 pixels. However, it just occurred to me (and was confirmed by checking both the Jupyter kernel and the spark2-submit routines we're using) that we're still on 0.7.2. That being the case, before I get you to spend time checking this for us, it'd probably be worthwhile upgrading to 0.8.4 and reloading the data, yes? Thanks again, Julien, appreciate your time. Let me know if you want the data now, or whether you'd rather wait until we upload the data again using 0.8.4. Cheers Geoff
Hi @GeoffDuniam, Thanks for your feedback. I would suggest trying 0.8.4 first, as there were important fixes made with respect to 0.7.2. Then, if the problem persists, I would be happy to check on a small dataset. Note I will be offline from July 5 until July 15. Best,
Hi Julien, Upgraded our package to 0.8.4, and we're still seeing the same result. It's probably good to give you some background: the FITS source we're using is one of two Stokes image cubes, with respective sizes of 307 and 835 GB. The cubes are nominally polarised 4-dimensional cubes (RA, Dec, Polarisation, Frequency), but they have only one level of polarisation, so they're effectively 3-dimensional. We're creating the RDD from the data frame with an index (return df.rdd.zipWithIndex()), and we can't use .glom() on files of this size because we run into memory limitations on the cluster (3 masters, ten workers, one edge node supporting JupyterHub). The index is there to calculate the specific frequency bands so we can physically partition the data in a Parquet table, and it also allows us to reference the declination values, which we pull out of the HDU. I've done an image extraction from both the data frame and the data stored in the Parquet table, and the image still has that offset. I'll generate a subset of the smaller FITS cube and get it to you. Thanks again for the help, much appreciated. Cheers Geoff
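For reference, a rough sketch of the indexing scheme described above (all names hypothetical; it assumes the frequency channel can be derived from the global row index):

```python
# Assign a global index, derive the frequency channel from it, and
# physically partition the Parquet output by channel.
ndec = 5694  # rows (Dec values) per frequency channel

df_idx = (df.rdd.zipWithIndex()
            .map(lambda ri: (ri[1] // ndec, ri[0][0]))
            .toDF(['channel', 'image_row']))
df_idx.write.partitionBy('channel').parquet('/path/to/cube.parquet')
```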
Hi Julien, I've got two FITS files that demonstrate the issue: one with two channels and one with five. Interestingly, the smaller two-channel FITS file displays the images correctly, whereas the five-channel file shows the offset. I won't be able to attach them here - the smaller file is ~120 MB and we have a 10 MB limit. I could put them up on Dropbox and share them from there? Let me know. Thanks again and regards, Geoff
Hi Julien, if it helps, I'll give you the links to download the original FITS files from Pawsey as well - the residual SB8170 cube is the big one, 835 GB. Both FITS files are public access. https://data.pawsey.org.au/download/SKAvis/residual.i.SB8170.cube.fits https://data.pawsey.org.au/download/SKAvis/residual.i.SB8170.cube.fits HTH. I'm getting the sub-cubes up now and I'll let you know when they're available. Enjoy your break! Cheers Geoff
Hi @GeoffDuniam, Thanks for the detailed explanations! Sharing the sub-cubes (the two-channel and five-channel files) via Dropbox is fine by me. Let me know. Best,
Hi Julien, No problem, I have them up on our cloud share here - drop me a mail with your address and I'll get the download link to you: geoff dot duniam (at) gmail. Cheers Geoff
Thanks @GeoffDuniam, thanks to the data you sent I think I understood the problem. Long story short: when writing data to disk, Spark shuffles the data blocks (which are different from the original images), and when loading the data back, the original data order is lost. The solution to this problem is to attach the image index to each row when loading data with spark-fits. I will make this feature available in the next spark-fits release (hopefully by the end of the week). Below is a detailed summary of the problem and its solution (using the data you sent):

```python
from astropy.io import fits
import matplotlib.pyplot as plt
import numpy as np
```

**Loading data with spark-fits**

Let's load images stored in FITS using the spark-fits connector:

```python
filename = '/path/to/Julien5.fits'
nfrequency = 5
df = spark.read.format('fits').option('hdu', 0).load(filename)
```

This file contains several images split in rows:

```python
nrow = df.count()
msg = """
Total number of rows: {}
Number of frequencies: {}
Number of Dec measurements per frequency: {}
Number of RA measurements per frequency: {}
""".format(nrow, nfrequency, int(nrow / nfrequency), len(df.take(1)[0][0]))
print(msg)
```

```python
# Take the first channel
channel = df.select('Image').take(int(nrow / nfrequency))
# Converting Row into list
channel = [i[0] for i in channel]
# plot the image
plt.imshow(channel)
```

**Comparison with standard FITS tools**

Let's load the same image data with standard FITS tools from astropy:

```python
datafits = fits.open(filename)
print('(Frequency, 1, Dec, RA)')
print(datafits[0].data.shape)
plt.imshow(datafits[0].data[0][0])
```

```python
maxdiff = np.max(np.abs(np.nan_to_num(channel) - np.nan_to_num(datafits[0].data[0][0])))
print('Difference between spark-fits and standard tools: {}'.format(maxdiff))
```

Good - there is no difference between the data read by spark-fits and astropy!

**Saving in Parquet and reloading data**

Let's now save the data read by spark-fits to disk in the Parquet format (actually, we could save in any format available to Spark):

```python
df.write.parquet(filename.replace('.fits', '.parquet'), compression='none')
```

Let's reload this data as-is, and plot the first image as we did previously:

```python
df_parquet = spark.read.format('parquet').load(filename.replace('.fits', '.parquet'))

# Take the first channel
channel2 = df_parquet.select('Image').take(int(nrow / nfrequency))
# Converting Row into list
channel2 = [i[0] for i in channel2]
# plot the image
plt.imshow(channel2)
```

Oooooops! This time the image is different, as if it had been shifted or the data shuffled. What happened? To understand, one needs to look at how the data is distributed at each stage.

**Original FITS data**

The original FITS data is contained in the primary HDU. Each image is a 3D array (freq, dec, ra), and the data block is just the concatenation of all images. There is no ambiguity between different images.

**spark-fits partitioning logic**

When spark-fits reads data from disk, it pulls data by blocks (aka partitions). Each block size is determined by (1) the underlying storage system if the data is already distributed, and/or (2) Spark internal constraints to optimize I/O operations. But the blocks do not have to follow the logic of the data itself: each block is a sequence of rows forming part of an image, so there is no guarantee that the blocks created by spark-fits align with the original images. However, as long as the data is contained in a single file, spark-fits still follows the order of the data (it first reads the first row of the first image, then the second row of the first image, and so on), so the user still has the illusion of manipulating data ordered as in the original FITS file.

**Saving data on disk, and reloading**

When saving the data, each Spark mapper writes its data blocks to disk. Mappers are not ordered - the write operation is a competition between them! Hence the mapper holding the data for the first image is not guaranteed to write first (that's the beauty of distributed computation). Spark writes to disk all the blocks previously defined by spark-fits, but in complete disorder. That's very much like a puzzle: when the user re-reads the data from disk, the blocks are still the same, but their order is completely randomized.

**Which solution then?**

As is, once the data is reloaded there is no way to re-assemble it, as the order is lost during block creation. This is something I did not envisage, as I was not really using images with spark-fits. The simplest solution is to add an image index to each row upon DataFrame creation with spark-fits, such that the user can repartition the data accordingly.

**Fixing the partitioning (new in spark-fits 0.9.0)**

We save the (unordered) data on disk, and reload it using Spark. The data is then repartitioned to put one image per partition. This feature is only available in spark-fits 0.9+.

```python
# repartition according to the image index
df_repartitioned = df.repartition('ImgIndex')
# Write on disk
df_repartitioned.write.parquet(filename.replace('.fits', '2.parquet'), compression='none')
# Reload
df_parquet = spark.read.format('parquet').load(filename.replace('.fits', '2.parquet'))
# Take the first channel
channel2 = df_parquet.select('Image').take(int(nrow / nfrequency))
# Converting Row into list
channel2 = [i[0] for i in channel2]
# plot the image
plt.imshow(channel2)
```

```python
maxdiff = np.max(np.abs(np.nan_to_num(channel) - np.nan_to_num(channel2)))
print('Difference between fits and parquet: {}'.format(maxdiff))
```

Note that with this repartitioning, you can even manipulate each image separately using:

```python
# 1 partition = 1 image
df_repartitioned.rdd.mapPartitions(image_processing_function)
```
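As an illustration, `image_processing_function` could be any function taking an iterator of rows; a hypothetical example (not part of spark-fits) that rebuilds each image as a NumPy array and returns a per-image statistic:

```python
import numpy as np

def image_processing_function(rows):
    # Rebuild the 2D image (Dec, RA) held by this partition
    img = np.array([row['Image'] for row in rows])
    # Yield one result per image, e.g. its shape and mean pixel value
    yield (img.shape, float(np.nanmean(img)))

# One result per image, since each partition holds exactly one image
results = df_repartitioned.rdd.mapPartitions(image_processing_function).collect()
```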