"""
Script to generate polygons from contours and then use
polygons to get source positions and flux densities
"""
import math
import sys
import timeit
from multiprocessing import Process, Queue
from operator import attrgetter, itemgetter
from optparse import OptionParser
import numpy as np
from astropy import units as u
from astropy import wcs
from astropy.coordinates import Angle
from astropy.io import fits
from astropy.wcs import WCS
from shapely.geometry import LineString, Polygon
from shapely.ops import polylabel
from skimage import measure
from skimage.draw import polygon as skimage_polygon
from tw_source_finder.beam_to_pixels import calculate_area
from tw_source_finder.breizorro_extract import make_noise_map
from tw_source_finder.check_array import check_array
from tw_source_finder.coords import rad_to_dms, rad_to_hms
from tw_source_finder.process_polygon_data import maxDist, simple_distance
# 'boiler plate' function for handling Queue-based
# parallel processing
[docs]def contour_worker(input, output):
for func, args in iter(input.get, "STOP"):
result = func(*args)
output.put(result)
# function for processing individual polygons / contours to
# get contained flux densities as well as determine if a source is
# resolved or extended
[docs]def Process_contour(x, y):
source_pos = (
-10000.0,
0,
) # return something even if our result is garbage
rr, cc = skimage_polygon(x, y)
data_result = orig_image[rr, cc]
# print('raw sum', data_result.sum())
sum = data_result.sum() / pixels_beam
# print('normalized sum', sum)
contained_points = len(rr) # needed if we want a flux density error
point_source = False
try:
max_signal = data_result.max()
except:
print("failure to get maximum within contour ")
print("probably a contour at edge of image - skipping")
max_signal = 0.0
if max_signal >= limiting_flux:
beam_error = contained_points / pixels_beam * noise_out
ten_pc_error = 0.1 * sum
flux_density_error = math.sqrt(ten_pc_error * ten_pc_error + beam_error * beam_error)
contour = []
for i in range(len(x)):
contour.append([x[i], y[i]])
p = Polygon(contour)
centroid = p.centroid # gives position of object
use_max = 0
if p.contains(centroid):
# as usual, stupid x and y cordinate switching problem
lon, lat = w.all_pix2world(centroid.y, centroid.x, 0)
else:
print(" ")
polylabel_location = polylabel(p, tolerance=1)
# print('polylabel_location', polylabel_location.x, polylabel_location.y)
x_pos = int(polylabel_location.x)
y_pos = int(polylabel_location.y)
data_max = orig_image[x_pos, y_pos]
# print('polylabel modified x_pos,y_pos, data_max', x_pos, y_pos, data_max)
use_max = 1
# print('centroid is ', centroid)
# print('centroid lies outside polygon - looking for maximum')
location = np.unravel_index(np.argmax(data_result, axis=None), data_result.shape)
# print('location', location)
x_pos = rr[location]
y_pos = cc[location]
data_max = orig_image[x_pos, y_pos]
# print('modified x_pos,y_pos, data_max', x_pos, y_pos, data_max)
lon, lat = w.all_pix2world(y_pos, x_pos, 0)
# do some formatting
# lon = Angle(lon, unit = u.deg)
# lat = Angle(lat, unit = u.deg)
# print('lat ', lat)
h, m, s = rad_to_hms(math.radians(lon))
if h < 10:
h = "0" + str(h)
else:
h = str(h)
if m < 10:
m = "0" + str(m)
else:
m = str(m)
s = round(s, 2)
if s < 10:
s = "0" + str(s)
else:
s = str(s)
if len(s) < 5:
s = s + "0"
h_m_s = h + ":" + m + ":" + s
d, m, s = rad_to_dms(math.radians(lat))
if d >= 0 and d < 10:
d = "0" + str(d)
elif d < 0 and abs(d) < 10:
d = "-0" + str(d)
else:
d = str(d)
if m < 10:
m = "0" + str(m)
else:
m = str(m)
s = round(s, 2)
if s < 10:
s = "0" + str(s)
else:
s = str(s)
if len(s) < 5:
s = s + "0"
d_m_s = d + ":" + m + ":" + s
source = h_m_s + ", " + d_m_s + ", " + str(lon) + ", " + str(lat)
# source = str(lon) + ', ' + str(lat)
# get source size
result = maxDist(contour, pixel_size)
ang_size = result[0]
pos_angle = result[1]
# print('raw angular size is ', ang_size)
# print('raw position angle is ', pos_angle)
# print('mean beam size is ', mean_beam)
sigma = mean_beam / 2.355 # 2.355 = 2 * sqrt(2 * ln 2)
ang_size = ang_size - 4 * sigma # 2 signa level ~ 10 % level of beam
if ang_size < 0.0:
ang_size = 0.0
# diff = ang_size*ang_size - mean_beam*mean_beam
# if diff > 0.0:
# ang_size = math.sqrt(ang_size*ang_size - mean_beam*mean_beam)
# else:
# ang_size = 0.0
# print('sum max_signal', sum, max_signal)
ratio = np.abs((sum - max_signal) / sum)
# print('ratio is ', ratio)
# print('corrected angular size is ', ang_size)
# test for point source
# if (ratio <= 0.2) or (ang_size < 0.0):
if (ratio <= 0.2) or (ang_size < 0.5 * mean_beam):
# if ang_size <= 0.5 * mean_beam:
point_source = True
# also test for point source
if contained_points < pixels_beam:
point_source = True
sum = max_signal
if point_source:
# output = source + ', ' + str(round(sum*1000,3)) + ', ' + str(round(flux_density_error*1000,4)) + ', ' +str(0.0) + ', ' +str(0.0)
output = (
source
+ ","
+ str(sum)
+ ","
+ str(flux_density_error)
+ ","
+ str(0.0)
+ ","
+ str(0.0)
)
else:
ang = round(ang_size, 2)
pa = round(pos_angle, 2)
# output = source + ', ' + str(round(sum*1000,3)) + ', ' + str(round(flux_density_error*1000,4)) + ', ' + str(ang) + ', ' + str(pa)
output = (
source
+ ","
+ str(sum)
+ ","
+ str(flux_density_error)
+ ","
+ str(ang)
+ ","
+ str(pa)
)
source_pos = (lon, output, use_max) # we will sort on the lon (ra)
return source_pos
# function that uses input parameters to set up things for distributed
# and parallel data processing
[docs]def generate_source_list(filename, threshold_value, noise):
global orig_image, pixels_beam, w, pixel_size, mean_beam, noise_out, limiting_flux
lowercase_fits = filename.lower()
loc = lowercase_fits.find(".fits")
outfile = filename[:loc] + "_source_list.txt"
f = open(outfile, "w")
output = "# processing fits image " + filename + " \n"
print(output)
f.write(output)
print("threshold value", threshold_value)
print("default noise (Jy)", noise)
hdu_list = fits.open(filename)
hdu = hdu_list[0]
# print('original fits header', hdu.header)
w = WCS(hdu.header)
w = w.celestial
orig_image = check_array(hdu.data)
nans = np.isnan(orig_image)
orig_image[nans] = 0
print("original image max signal", orig_image.max())
# the 'orig_image' can be defined as a global variable as it is only
# used in read-only mode. So it can be shared over separate cores / threads
# without worrying that something can get over-written during processing
incoming_dimensions = hdu.header["NAXIS"]
pixel_size = hdu.header["CDELT2"] * 3600.0
print("pixel size arcsec = ", pixel_size)
bmaj = hdu.header["BMAJ"] * 3600.0
bmin = hdu.header["BMIN"] * 3600.0
mean_beam = 0.5 * (bmaj + bmin)
output = "# mean beam size (arcsec) " + str(round(mean_beam, 2)) + "\n"
f.write(output)
pixels_beam = calculate_area(bmaj, bmin, pixel_size)
output = "# calculated pixels per beam " + str(round(pixels_beam, 2)) + "\n"
f.write(output)
if noise == 0.0:
print("determining noise in image - this may take some time ...")
noise_out = make_noise_map(orig_image)
print("make_mask: breizorro median noise ", noise_out)
else:
noise_out = noise
output = "# noise out " + str(noise_out) + " Jy\n"
f.write(output)
limiting_flux = noise_out * threshold_value
print("limiting flux", limiting_flux)
output = "# limiting_flux " + str(limiting_flux * 1000) + " mJy\n"
f.write(output)
mask = np.where(orig_image > limiting_flux, 1.0, 0.0)
mask = mask.astype("float32")
# print('image mask hdu_header', hdu.header)
hdu.data = mask
hdu.header["DATAMIN"] = 0.0
hdu.header["DATAMAX"] = 1.0
outfile = "orig_image_mask.fits"
hdu.writeto(outfile, overwrite=True)
print("generated image mask")
contours = measure.find_contours(mask, 0.5)
print("number of potential sources ", len(contours))
# Start worker processes
num_processors = 1
if num_processors <= 2:
try:
import multiprocessing
processors = multiprocessing.cpu_count()
if processors > num_processors:
num_processors = processors
print("*** setting number of processors to", num_processors)
except:
pass
# set up tasks and queues
TASKS = []
for i in range(len(contours)):
contour = contours[i]
if len(contour) > 2:
x = []
y = []
for j in range(len(contour)):
x.append(contour[j][0])
y.append(contour[j][1])
TASKS.append((Process_contour, (x, y)))
task_queue = Queue()
done_queue = Queue()
# Submit tasks
print("submitting parallel tasks")
for task in TASKS:
task_queue.put(task)
for i in range(num_processors):
Process(target=contour_worker, args=(task_queue, done_queue)).start()
source_list = []
num_max = 0
# Get the results from parallel processing
for i in range(len(TASKS)):
source_pos = done_queue.get(timeout=300)
if source_pos[0] > -10000.0:
source_list.append(source_pos)
num_max = num_max + source_pos[2]
# Tell child processes to stop
for i in range(num_processors):
task_queue.put("STOP")
print("sorting ")
ra_sorted_list = sorted(source_list, key=itemgetter(0))
num_detected_sources = len(ra_sorted_list)
output = "# number of sources detected " + str(num_detected_sources) + "\n"
f.write(output)
output = "# number of souces using maximum for source position: " + str(num_max) + "\n"
f.write(output)
f.write("#\n")
output = "# source ra_hms dec_dms ra(deg) dec(deg) flux(mJy) error ang_size_(arcsec) pos_angle_(deg)\n"
output = (
"source,ra_hms,dec_dms,ra(deg),dec(deg),flux(Jy),error,ang_size_(arcsec),pos_angle_(deg)\n"
)
f.write(output)
for i in range(len(ra_sorted_list)):
output = str(i) + ", " + ra_sorted_list[i][1] + "\n"
f.write(output)
f.close()
# we are done
return
[docs]def main():
parser = OptionParser(usage="%prog [options] ")
parser.add_option(
"-f",
"--file",
dest="filename",
help="FITS file with radio image (default = None)",
default=None,
)
parser.add_option(
"-t",
"--threshold",
dest="threshold",
help="Threshold value for source detection in units of noise (default = 6)",
default=6,
)
parser.add_option(
"-n",
"--noise",
dest="noise",
help="noise specification in mJy, where noise cannot be found from image (default = 0)",
default=0.0,
)
(options, args) = parser.parse_args()
print("options", options)
filename = options.filename
noise = float(options.noise)
if noise > 0.0:
noise = noise / 1000.0 # convert to Jy
signal_flux = float(options.threshold)
start_time = timeit.default_timer()
generate_source_list(filename, signal_flux, noise)
print("get_source_list finished \n")
elapsed = timeit.default_timer() - start_time
print("Run Time:", elapsed, "seconds")
# =============================
#
# example: run as 'get_simple_source_list.py -f xyz.fits -t 6.5'
#