# Copyright (C) 2012, Christof Buchbender
# BSD License (License.txt)
# standart library
import math
import os
import string
import sys
import matplotlib.pyplot as plt
import numpy as np
# from generaltools
from generaltools import log_tools
# from astrolyze
import main
import fits
import gildas
import miriad
import astrolyze.functions.constants as const
USER = os.getenv("USER")
[docs]class Stack(object):
r""" Class to work with several maps at once.
Allows to treat all images inside a folder simultaneously. This allows to
to perform the same transformations on all maps.
The images in the input folder can have arbitrary formats, units,
resolutions and other properties. The Stack class can help to unify the
different parameters of the maps to help comparing them.
Parameters
----------
folder : str
The folder containing the images that are to be treated as a stack
data_format : str
If not None filter images by format.
Examples
--------
The stack class can be used like builtin objects that are iterable e.g.:
>>> st = Stack("folder_name")
>>> list_map_names = [i.map_name for i in st]
.. note::
`Stack` is also the basis for the :py:mod:`astrolyze.sed.Sed` package.
"""
def __init__(self, folder, data_format=None):
r"""
Initialization of the stack. Reading in the maps and creating a list
of ``Map`` class objects, depending on the input format.
"""
self.log = log_tools.init_logger(
directory="/home/{}/.astrolyze/".format(USER),
name="astrolyze"
)
self.gildas_formats = ['gdf', 'mean', 'velo', 'width', 'lmv',
'lmv-clean']
self.fits_formats = ['fits']
self.miriad_formats = ['']
self.folder = folder
self.get_list(data_format=data_format)
print self.list
self.stack = []
for i in self.list:
self.stack += [self.get_map_format(i)]
self.resolutions = []
self.units = []
self.dataFormats = []
for i in self.stack:
print i
self.resolutions += [i.resolution]
self.units += [i.fluxUnit]
self.dataFormats = [i.dataFormat]
self.log = log_tools.init_logger(
directory="/home/{}/.astrolyze/".format(USER),
name="astrolyze"
)
def __repr__(self):
string = "Stack(\"{}\")\nContains:\n".format(self.folder)
for map_ in self.stack:
string += "{}\n".format(map_)
return string
def __len__(self):
return len(self.stack)
def __getitem__(self, position):
return self.stack[position]
def __iter__(self):
return (i for i in self.stack)
[docs] def get_list(self, data_format=None, depth=False):
r"""
Loading a list of files in all sub-folders.
Parameters
----------
folder : string
The path to the folder that has to be parsed.
data_format : string
Search for specific files containing the string, e.g.
'.fits'
depth : integer
The steps into the sub-folder system. Defaults to maximum depth.
Returns
-------
final_list : array
Array with the string to the files in the folder and sub folders.
folder_list :
Array with the strings to the folders. Only if depth is set.
"""
def get_sub_folder(folder):
r"""
Returns a list with the contents of a folder.
"""
miriad_subfolder = ['header', 'mask', 'image', 'history']
os.system('ls ' + str(folder) + ' > temp')
filein = open('temp').readlines()
list = []
for i in filein:
# We have to exclude sub_folders that are miriad file
# sub-folders.
if (i.strip() in miriad_subfolder
and len(folder.split('_')) >= 5):
pass
else:
list += [str(folder) + '/' + str(i.strip())]
os.system('rm -r temp')
return list
self.folder_list = [self.folder]
self.list = []
# Variable to steer the depth of the search.
x = 1
execute = True
while execute:
# Variables to check if the algorithm found folder
# and or files in the last iterations. If only files are
# found the loop is stopped.
folder_check = False
file_check = False
new_folder_list = []
for folder in self.folder_list:
list = get_sub_folder(folder)
for sub_folder in list:
if os.path.isdir(sub_folder):
new_folder_list += [sub_folder]
folder_check = True
if len(sub_folder.split('_')) >= 5:
self.list += [sub_folder]
if os.path.isfile(sub_folder):
if (data_format is not None and data_format
in sub_folder):
if sub_folder not in self.list:
self.list += [sub_folder]
if (data_format is None):
self.list += [sub_folder]
file_check = True
if not folder_check and file_check:
execute = False
x = x + 1
if x > depth and depth:
execute = False
if x > 1000:
print 'Not all folders contain files.'
execute = False
# Replace the list of folders from last iteration with the
# list of folders of the next.
self.folder_list = new_folder_list
[docs] def copy_structure(self, old_prefix, new_prefix):
r"""
Copies a folder structure from old_prefix to new_prefix. To assure all
folders exists before working with or copying data.
Parameters
----------
list : list
A list containing the relative or absolute paths to files.
old_prefix : string
The old path to the folder structure that has to be copied. Has to
actually appear in all the strings in list.
new_prefix : string
The path to where the folder structure is to be copied.
Notes
-----
This is useful if one is working on many files stored in several
sub-folders retrieved using :py:func:`get_list`.
Examples
--------
Say the folder structure is like this
>>> ls ../modified/
co10/
co21/
>>> ls ../modfied/co10/
map1/
map2/
>>> ls ../modified/co21/
map1/
map2/
This can be copied to say ../even_more_modified by doing as follows:
>>> from astrolyze.maptools import *
>>> list = maptools.get_list(../modified)
>>> maptools.copy_structure(list, old_prefix='../modified',
>>> new_prefix='../even_more_modified')
"""
for item in list:
item = item.replace(old_prefix, new_prefix)
folder_parts = item.split('/')[:-1]
string = folder_parts[0]
folders = [string]
for i in folder_parts[1:]:
print i
string = string + '/' + i
folders += [string]
for i in folders:
print i
if not os.path.isdir(i):
print 'mkdir ' + i
os.system('mkdir ' + i)
else:
pass
[docs] def unify_resolutions(self, folder=None, resolution=False):
'''
Smoothing all maps to the same resolution; either the maximum
resolution found in the stack or a given resolution.
Parameters
----------
resolution : float or list
This may be either:
1. A list with three entries, i.e.
[[minor_fwhm], [major_fwhm], [position_angle]]
2. A list with two entries, position_angle defaults to 0, i.e.
[[minor_fwhm], [major_fwhm]]
3. A float. Same minor, major fwhm pa=0
folder : string
The path tot the folder in which the files are to be stored.
Notes
-----
The position angle of the output images is fixed to ``Zero`` and can
currently not be modified.
Depending on the map units different scaling normalizations have to be
used after smoothing such that the output units are correct. This
function tries to deduce the scaling by itself based on the unit that
is given in the map name. Be sure that this is correct otherwise the
flux values may be wrong in the output images.
'''
folder = check_folder_state(folder)
# Deduce the maximum resolution of the maps in the Stack.
max_major_fwhm = max(self.resolutions, key=lambda x: x[0])[0]
max_minor_fwhm = max(self.resolutions, key=lambda x: x[1])[1]
# Not well implemented the position angle is fixed.!!
pa = 0
new_stack = []
# This is executed if no specific output resolution is given.
if not resolution:
for map_ in self.stack:
# Check the necessary scaling.
if map_.fluxUnit.upper() in ['JYB']:
scaling = ''
if map_.fluxUnit.upper() in ['TMB', 'T', 'KKMS']:
scaling = '0.0'
old_resolution = map_.resolution
if map_.resolution != 'uk':
if folder is not None:
map_.prefix = folder
if map_.dataFormat not in self.miriad_formats:
if folder is not None:
map_.prefix = folder
map_ = map_.toMiriad()
elif map_.dataFormat in self.miriad_formats:
if folder is not None:
os.system('cp {0} {1}'.format(map_.map_name,
folder))
map_.prefix = folder
map_.map_name = map_.returnName()
old_map = map_.map_name
if (float(max_major_fwhm) != float(map_.resolution[0]) and
float(max_minor_fwhm) != float(map_.resolution[1])):
map_ = map_.smooth([max_major_fwhm, max_minor_fwhm,
pa], scale=scaling)
os.system('rm -rf ' +
str(map_.returnName(dataFormat='fits')))
map_ = map_.toFits()
new_stack += [map_]
os.system('rm -rf ' + map_.returnName(dataFormat=None))
os.system('rm -rf ' +
map_.returnName(dataFormat=None,
resolution=old_resolution))
elif (float(max_major_fwhm) == float(map_.resolution[0])
and
float(max_minor_fwhm) == float(map_.resolution[1])):
os.system('rm -rf ' +
str(map_.returnName(dataFormat='fits')))
map_ = map_.toFits()
new_stack += [map_]
os.system('rm -rf {}'.format(
map_.returnName(dataFormat=None))
)
if resolution:
for map_ in self.stack:
try:
if (resolution[0] > map_.resolution[0] or resolution[1] >
map_.resolution[1]):
print (map_.mapName + ' : resolution higher than new '
'one. --> Skip! ')
continue
except:
try:
if (resolution > map_.resolution[0] or resolution >
map_.resolution[1]):
print (map_.mapName + ' : resolution higher than '
'new one. --> Skip!')
continue
except:
pass
if folder is not None:
map_.prefix = folder
# change to miriad and save in new folder
map_ = map_.toMiriad()
map_ = map_.smooth(resolution, scale=scaling)
os.system('rm -rf ' + map_.returnName(dataFormat='fits'))
map_ = map_.toFits()
new_stack += [map_]
os.system('rm -rf ' + map_.returnName(dataFormat=None))
return new_stack
[docs] def unify_units(self, unit='JyB', folder=None, debug=True):
r"""
Change all maps in the stack to the same unit.
Parameters
----------
unit : string
See :py:func:`astrolyze.maps.fits.FitsMap.change_units`
for details.
folder : string
The target folder. By default the maps
are put into their current folder.
"""
# Assure that the folder-string has a '\' at the end.
folder = check_folder_state(folder)
# The stack has to be updated thus create a new container.
new_stack = []
for map_ in self.stack:
# The map format has to be fits for this to work.
if map_.dataFormat not in self.fits_formats:
if folder is not None:
map_.prefix = folder
map_ = map_.toFits()
elif map_.dataFormat in self.fits_formats:
map_.prefix = folder
map_ = map_.update_file()
old_map = map_.map_name
print 'old', old_map
map_ = map_.change_unit(unit, debug=debug)
print map_.map_name, old_map
if old_map.strip() != map_.map_name.strip():
os.system('rm -rf {}'.format(old_map))
new_stack += [map_]
return new_stack
[docs] def unify_dimensions(self, template=None, folder=None):
r""" Reproject all maps to the same central coordinates and map
dimensions. All properties of one template map are copied to all the
other maps using the ``"reproject"`` task of GILDAS.
Parameters
----------
template : string
Path to the map that will serve as a template, it may be one of
the input maps of the stack.
folder : string
Path to the folder where the output maps are stored.
"""
# Assure that the folder-string has a '\' at the end.
folder = check_folder_state(folder)
# Load the template map and convert it to Gildas if necessary.
template = self.get_map_format(template)
if template.dataFormat not in self.gildas_formats:
template = template.toGildas()
# The stack has to be updated thus create a new container.
new_stack = []
# Iteratively convert all maps.
for map_ in self.stack:
if map_.dataFormat not in self.gildas_formats:
if folder is not None:
map_.prefix = folder
map_ = map_.toGildas()
elif map_.dataFormat in self.gildas_formats:
os.system('cp {0} {1}'.format(map_.map_name, folder))
if folder is not None:
map_.prefix = folder
map_.map_name = map_.returnName()
old_map = map_.map_name
map_ = map_.reproject(template=template.map_name)
map_ = map_.toFits()
while len(map_.data) == 1:
map_.data = map_.data[0]
max_value = max(
map_.data[np.where(np.invert(np.isnan(map_.data)))]
)
min_value = min(
map_.data[np.where(np.invert(np.isnan(map_.data)))]
)
map_.header["NAXIS"] = 2
for i in [3, 4]:
del map_.header['NAXIS{}'.format(i)]
del map_.header['CDELT{}'.format(i)]
del map_.header['CRPIX{}'.format(i)]
del map_.header['CRVAL{}'.format(i)]
del map_.header['CTYPE{}'.format(i)]
del map_.header['CROTA{}'.format(i)]
map_.header['DATAMAX'] = max_value
map_.header['DATAMIN'] = min_value
map_ = map_.update_file()
new_stack += [map_]
os.system('rm -rf {}'.format(old_map))
os.system('rm -rf {}'.format(map_.returnName(dataFormat='gdf')))
return new_stack
[docs] def unify_projections(self, coordinate=None, angle=None, folder=None):
r"""
Changing the central coordinate and the rotation angle.
"""
# Assure that the folder-string has a '\' at the end.
folder = check_folder_state(folder)
# The stack has to be updated thus create a new container.
new_stack = []
# Iteratively convert all maps.
for map_ in self.stack:
if map_.dataFormat not in self.gildas_formats:
if folder is not None:
map_.prefix = folder
map_ = map_.toGildas()
elif map_.dataFormat in self.gildas_formats:
os.system('cp {0} {1}'.format(map_.map_name, folder))
if folder is not None:
map_.prefix = folder
map_.map_name = map_.returnName()
old_map = map_.map_name
if coordinate:
map_ = map_.reproject(coord=coordinate)
if angle:
map_ = map_.goRot(angle)
map_ = map_.toFits()
new_stack += [map_]
os.system('rm -rf {}'.format(old_map))
os.system('rm -rf {}'.format(map_.returnName(dataFormat='gdf')))
return new_stack
[docs] def pixel_pixel_compare(self, folder=None, plot=False, tol=1e6):
r""" Producing a pixel-to-pixel comparison for all combinations
possible between the maps of the stack.
Parameters
----------
folder : string
Path to the folder where the text files with the pixel
to pixel comparisons are stored
plot : [True | False]
Decides whether to produce pixel-to-pixel plots directly.
tol : float
The tolerance for the maximum difference between the
values of the pixel of two compared maps. Default to 1e6.
Returns
-------
Created text files in the specified folders that contain two columns
with the pixel-to-pixel comparisons. These can be used
"""
def _scatter_plot(folder):
r""" This plots the pixel_to_pixel comparisons.
Parameters
----------
folder : string
Path to the folder where the images should be stored.
"""
folder = check_folder_state(folder)
colors = ['green', 'red', 'black', 'yellow', 'blue', 'navy']
os.system('ls > temp')
filein = open('temp').readlines()
if 'scatterPlots\n' in filein:
os.system('rm -rf scatterPlots/*.*')
pass
else:
os.system('mkdir scatterPlots')
os.system('rm temp')
for j in list:
plt.clf()
os.system('ls ' + str(j) + ' > temp')
filein = open('temp').readlines()
a = 0
for i in filein:
name = (i.split('.')[0].split('_')[0] + '_' +
i.split('.')[0].split('_')[1])
region = i.split('.')[0].split('_')[2]
filein = open(j + '/' + i.strip()).readlines()
x = []
y = []
for k in filein:
x += [k.split()[0]]
y += [k.split()[1]]
plt.plot(x, y, 'x', color=colors[a], label=region)
a = a + 1
plt.legend(numpoints=1, loc='lower right')
plt.xlabel(str(name.split('_')[0]))
plt.ylabel(str(name.split('_')[1]))
plt.savefig('scatterPlots/' + name + '.eps', dpi=None,
facecolor='w', edgecolor='w',
orientation='portrait', papertype='a4',
format='eps', transparent=False,
bbox_inches='tight')
folder = check_folder_state(folder)
self.unify_formats()
for i in range(len(list)-1):
item = list.pop()
i1 = item.strip().split('.')
dataFormat = i1.pop()
print dataFormat
for j in list:
i1 = j.strip().split('.')
print j
dataFormat = i1.pop()
print dataFormat
if dataFormat == 'fits':
map = mapClassFits.fitsMap(j)
map = map.toMiriad()
j = map.mapName
if dataFormat == 'gdf':
map = mapClassGildas.gildasMap(j)
map = map.toMiriad()
j = map.mapName
first = mapClassMiriad.miriadMap(str(item))
second = mapClassMiriad.miriadMap(str(j))
os.system('mkdir ' + str(folder) + '/' + str(one.species) +
'_' + str(two.species))
os.system('imcmp in1=' + str(item) + ' in2=' + str(j) +
'log='+str(folder) + '/' + str(one.species) +
'_'+str(two.species) + '/' + str(one.species) + '_' +
str(two.species) + '_' + str(one.comments[0]) +
'.log tol='+str(tol))
# Some help functions
def check_folder_state(folder):
if folder is not None and '/' not in folder[-1]:
folder = folder + '/'
# If it does not exist we create it.
if folder is not None and not os.path.isdir(folder):
os.system('mkdir {}'.format(folder))
return folder