// sh.script: count catalog sources per redshift shell inside a HEALPix disc (spark-shell script).
import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.functions._
import com.sparkfits.fits._
import healpix.essentials.HealpixBase
import healpix.essentials.Pointing
import healpix.essentials.Scheme.RING
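// healpix.essentials.Pointing is not Serializable; this subclass makes it safe to capture in Spark closures.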
class ExtPointing extends healpix.essentials.Pointing with java.io.Serializable
case class HealpixGrid(nside: Int, hp: HealpixBase, ptg: ExtPointing) {
  // Map spherical angles (co-latitude theta, longitude phi, in radians) to a HEALPix pixel index.
  def index(theta: Double, phi: Double): Long = {
    ptg.theta = theta
    ptg.phi = phi
    hp.ang2pix(ptg)
  }
}
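// Point2D (and the convertToPoint2D helper below) are not used in this pipeline.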
case class Point2D(ra: Double, dec: Double)
case class Point3D(ra: Double, dec: Double, z: Double)
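// Convert equatorial coordinates in degrees to HEALPix angles in radians:
// theta is the co-latitude (pi/2 - dec), phi the longitude (ra).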
def dec2theta(dec: Double): Double = Math.PI / 2.0 - Math.PI / 180.0 * dec
def ra2phi(ra: Double): Double = Math.PI / 180.0 * ra
def convertToPoint2D(x: Array[String]): Point2D = Point2D(x(0).toDouble, x(1).toDouble)
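// Input catalog (FITS on HDFS) and HEALPix resolution parameter.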
val catalogFilename = "hdfs://134.158.75.222:8020//lsst/LSST1Y"
val nside = 512
val session = SparkSession.builder().getOrCreate()
import session.implicits._
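// HEALPix setup: a reusable pointing object and a grid at the chosen nside, RING ordering.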
val ptg = new ExtPointing
val hp = new HealpixBase(nside, RING)
val grid = HealpixGrid(nside, hp, ptg)
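// Read HDU 1 of the FITS catalog as a DataFrame via the spark-fits datasource.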
val df = session.readfits.option("datatype", "table").option("HDU", 1).load(catalogFilename)
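// Keep only position (RA, Dec) and cosmological redshift, as a typed Dataset[Point3D].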
val df_index = df.select($"RA", $"Dec", ($"Z_COSMO").as("z")).as[Point3D]
df_index.cache()
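// Radial binning: split the redshift range [firstShell, lastShell] into equal-width shells.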
val firstShell = 0.1
val lastShell = 0.5
val shells: Int = 10
val d: Double = (lastShell - firstShell) / shells // shell width (not used below)
ptg.phi = 0.0
ptg.theta = 0.0
val radius = 0.02 // disc radius, in radians
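// All pixels overlapping the disc centred on (theta, phi) = (0, 0);
// the last argument (4) is the oversampling factor controlling query accuracy.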
val selectedPixels = grid.hp.queryDiscInclusive(ptg, radius, 4).toArray
println(s"Number of selected pixels: ${selectedPixels.length}")
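// Broadcast the pixel list so executors can filter locally without shuffling it.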
val sp = session.sparkContext.broadcast(selectedPixels)
val df1 = df_index.map(x =>
  ((shells * (x.z - firstShell) / (lastShell - firstShell)).toInt, // shell index from redshift
    grid.index(dec2theta(x.dec), ra2phi(x.ra)),                    // HEALPix pixel index
    x))
val df2 = df1.filter(x => sp.value.contains(x._2)) // select pixels touching the selected disk
val df3 = df2.map(x => (x._1, x._2, x._3)) // identity map: keep all points in the touched pixels (no per-point distance cut)
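// Name the tuple columns, then count points per shell.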
val df4 = df3.select($"_1".alias("shell_index"), $"_2".alias("pixel"), $"_3".alias("point"))
val df5 = df4.groupBy("shell_index")
val df6 = df5.agg(count($"pixel"))
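// Example action, assumed for illustration (not part of the original pipeline):
// materialize the query and display the per-shell counts.
df6.show()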