Source code for macapype.nodes.segment

import os

import nibabel as nb
import numpy as np

from nipype.interfaces.base import (CommandLine, CommandLineInputSpec,
                                    TraitedSpec)
from nipype.interfaces.base import traits, File

from nipype.interfaces.base import (BaseInterface, BaseInterfaceInputSpec)
from nipype.utils.filemanip import split_filename as split_f

from scipy.ndimage import binary_fill_holes


class BinaryFillHolesInputSpec(BaseInterfaceInputSpec):

    in_file = File(
        exists=True,
        desc='3D Nifti image to fill',
        mandatory=True)

    size = traits.Int(
        3, usedefault=True,
        desc='Size of the filling square',
        mandatory=True)


class BinaryFillHolesOutputSpec(TraitedSpec):

    out_file = File(
        exists=True,
        desc="Filled 3D Nifti image"
    )


class BinaryFillHoles(BaseInterface):
    """ Fill holes of the given 3D Nifti image slice by slice

    Inputs
    --------
    image: File (3D Nifti)


    Outputs
    ---------
    out_file: File (3D Nifti)
        Result filled 3D Nifti image.

    """
    input_spec = BinaryFillHolesInputSpec
    output_spec = BinaryFillHolesOutputSpec

    def _run_interface(self, runtime):

        in_file = self.inputs.in_file
        size = self.inputs.size

        path, fname, ext = split_f(in_file)
        self.out_nii = os.path.abspath(fname + '_filled' + ext)

        nii_image = nb.load(in_file)

        struc = np.ones((size, size, size))

        nii_data = nii_image.get_data()

        nii_data = binary_fill_holes(nii_data, struc)
        out_image = nb.Nifti1Image(nii_data.astype(int),
                                   affine=nii_image._affine)
        nb.save(out_image, self.out_nii)

        return runtime

    def _list_outputs(self):
        outputs = self._outputs().get()
        outputs["out_file"] = self.out_nii

        return outputs


###############################################################################
# AtroposN4
class AtroposN4InputSpec(CommandLineInputSpec):

    dimension = traits.Int(
        3, default=True, usedefault=True,
        exists=True,
        desc='Dimension',
        mandatory=True, position=0, argstr="-d %d")

    brain_file = File(
        exists=True,
        desc='brain_file',
        mandatory=True, position=1, argstr="-a %s")

    brainmask_file = File(
        exists=True,
        desc='brainmask_file',
        mandatory=True, position=2, argstr="-x %s")

    numberOfClasses = traits.Int(
        3, usedefault=True,
        exists=True,
        desc='numberOfClasses',
        mandatory=True, position=3, argstr="-c %d")

    out_pref = traits.String(
        "segment_", usedefault=True, desc="output prefix", mandatory=True,
        position=-1,  argstr="-o %s")

    priors = traits.Either(
        traits.List(File(exists=True)), traits.String(),
        desc='template_file',
        mandatory=False, position=4, argstr="-p %s")

    prior_weight = traits.Float(
        0,
        desc='Atropos prior segmentation weight',
        mandatory=False, position=5, argstr="-w %f")


class AtroposN4OutputSpec(TraitedSpec):
    segmented_file = File(
        exists=True, desc="segmented file with all tissues")

    segmented_files = traits.List(
        File(exists=True), desc="segmented indivual files")


[docs]class AtroposN4(CommandLine): """Description: Wrap of antsAtroposN4.sh Inputs: Mandatory: dimension Int, default=3 , 'Dimension' brain_file File, 'brain_file' brainmask_file File, 'brainmask_file' numberOfClasses Int, default=3, 'numberOfClasses' out_pref: String, default = "segment_", "output prefix" Optional: priors List of Files, 'priors' (not used, will be replaced by template file) prior_weight: Float, default = 0, "Atropos prior segmentation weight" Outputs: brain_file: File, "extracted brain from AtroposN4.sh" """ input_spec = AtroposN4InputSpec output_spec = AtroposN4OutputSpec _cmd = 'bash antsAtroposN4.sh' def _format_arg(self, name, spec, value): import os import shutil cur_path = os.path.abspath("") print("***** In formating args for ", name) # all the files have to be in local and if name == 'brain_file' or name == 'brainmask_file': # requires local copy shutil.copy(value, cur_path) value = os.path.split(value)[1] elif name == 'priors': reg = "tmp_{}.nii.gz" print("***** Copying prior files in cur dir:", value) for i, prior_file in enumerate(value): new_prior = os.path.join( cur_path, reg.format("{:02d}".format(i+1))) print("prior {} copied as {}".format(prior_file, new_prior)) shutil.copy(prior_file, new_prior) value = reg.format("%02d") print('value = {}'.format(value)) elif name == 'out_pref': pth, fname, ext = split_f(self.inputs.brain_file) value = fname+'_' self.inputs.out_pref = value return super(AtroposN4, self)._format_arg(name, spec, value) def _list_outputs(self): import os import glob outputs = self._outputs().get() outputs['segmented_file'] = os.path.abspath( self.inputs.out_pref + "Segmentation.nii.gz") seg_files = glob.glob( self.inputs.out_pref + "SegmentationPosteriors*.nii.gz") print(seg_files) outputs['segmented_files'] = [os.path.abspath( seg_file) for seg_file in seg_files] return outputs
############################################################################### def merge_imgs(list_img_files): import nibabel as nib import numpy as np import os.path as op from nipype.utils.filemanip import split_filename as split_f if not isinstance(list_img_files, list): return list_img_files for i, img_file in enumerate(list_img_files): if i == 0: path, fname, ext = split_f(img_file) img = nib.load(img_file) img_data = img.get_fdata() new_img_data = np.zeros(shape=img_data.shape) new_img_data[img_data != 0] += img_data[img_data != 0] else: img_data = nib.load(img_file).get_fdata() assert img_data.shape == new_img_data.shape, \ "Error, shapes {} != {}".format(img_data.shape, new_img_data.shape) new_img_data[img_data != 0] += img_data[img_data != 0] # creating indexed_mask merged_img = nib.Nifti1Image(dataobj=new_img_data, affine=img.affine, header=img.header) # saving indexed_mask_file merged_img_file = op.abspath(fname + "_merged" + ext) nib.save(merged_img, merged_img_file) return merged_img_file def merge_masks(mask_csf_file, mask_wm_file, mask_gm_file, index_csf=1, index_gm=2, index_wm=3): import nibabel as nib import numpy as np import os.path as op from nipype.utils.filemanip import split_filename as split_f # csf (and init indexed_mask) path, fname, ext = split_f(mask_csf_file) mask_csf = nib.load(mask_csf_file) mask_csf_data = mask_csf.get_data() indexed_mask_data = np.zeros(shape=mask_csf_data.shape) indexed_mask_data[mask_csf_data == 1] = index_csf # gm mask_gm = nib.load(mask_gm_file) mask_gm_data = mask_gm.get_data() assert np.all(indexed_mask_data.shape == mask_gm_data.shape) indexed_mask_data[mask_gm_data == 1] = index_gm # wm mask_wm = nib.load(mask_wm_file) mask_wm_data = mask_wm.get_data() assert np.all(indexed_mask_data.shape == mask_wm_data.shape) indexed_mask_data[mask_wm_data == 1] = index_wm # creating indexed_mask indexed_mask = nib.Nifti1Image(dataobj=indexed_mask_data, affine=mask_csf.affine, header=mask_csf.header) # saving indexed_mask_file indexed_mask_file = op.abspath(fname+"_indexed_mask"+ext) nib.save(indexed_mask, indexed_mask_file) return indexed_mask_file def split_indexed_mask(nii_file, background_val=0): import os import nibabel as nib import numpy as np from nipype.utils.filemanip import split_filename as split_f path, fname, ext = split_f(nii_file) nii = nib.load(nii_file) nii_data = nii.get_data() list_split_files = [] for index in np.unique(nii_data): if index == background_val: continue split_file = os.path.abspath("{}_{}{}".format(fname, index, ext)) split_data = np.zeros(shape=nii_data.shape) split_data[nii_data == index] = 1 nib.save(nib.Nifti1Image(split_data, header=nii.header, affine=nii.affine), split_file) list_split_files.append(split_file) return list_split_files def copy_header(ref_img, img_to_modify): import os from nipype.utils.filemanip import split_filename as split_f path, fname, ext = split_f(img_to_modify) modified_img = os.path.abspath(fname + "_hd" + ext) os.system("CopyImageHeaderInformation {} {} {} 1 1 1".format( ref_img, img_to_modify, modified_img)) return modified_img if __name__ == '__main__': # path_to = "/hpc/crise/meunier.d" path_to = "/hpc/crise/meunier.d/Data/Data_Fred/test_Fred_dev" seg_path = os.path.join( path_to, "test_pipeline_single_indiv_params_ants_t1_template", "full_T1_ants_subpipes", "brain_segment_from_mask_T1_pipe") brain_file = os.path.join( seg_path, "register_NMT_pipe", "_session_01_subject_jazz", "norm_intensity", "sub-jazz_ses-01_T1w_roi_noise_corrected_masked_corrected.nii.gz") brainmask_file = os.path.join( seg_path, "segment_atropos_pipe", "_session_01_subject_jazz", "bin_norm_intensity", "sub-jazz_ses-01_T1w_roi_noise_corrected_masked_corrected_bin.nii.gz") priors = [os.path.join( seg_path, "register_NMT_pipe", "_session_01_subject_jazz", "align_seg_csf", "tmp_01_allineate.nii.gz"), os.path.join( seg_path, "register_NMT_pipe", "_session_01_subject_jazz", "align_seg_gm", "tmp_02_allineate.nii.gz"), os.path.join( seg_path, "register_NMT_pipe", "_session_01_subject_jazz", "align_seg_wm", "tmp_03_allineate.nii.gz")] seg_at = AtroposN4() seg_at.inputs.brain_file = brain_file seg_at.inputs.brainmask_file = brainmask_file seg_at.inputs.priors = priors val = seg_at.run().outputs print(val) def compute_5tt(gm_file, wm_file, csf_file): import os import nibabel as nib import numpy as np from nipype.utils.filemanip import split_filename as split_f path, fname, ext = split_f(gm_file) gm_img = nib.load(gm_file) gm_data = gm_img.get_fdata() empty_vol = np.zeros(shape=gm_data.shape) wm_data = nib.load(wm_file).get_fdata() csf_data = nib.load(csf_file).get_fdata() gen_5tt_data = np.stack((gm_data, empty_vol, wm_data, csf_data, empty_vol), axis=-1) gen_5tt_img = nib.Nifti1Image(gen_5tt_data, affine=gm_img.affine, header=gm_img.header) gen_5tt_file = os.path.abspath(fname + "_5tt" + ext) nib.save(gen_5tt_img, gen_5tt_file) return gen_5tt_file def correct_datatype(nii_file): import os import subprocess from nipype.utils.filemanip import split_filename as split_f path, fname, ext = split_f(nii_file) prefix = fname + "_correct" cmd_line = "nifti_tool -mod_hdr -mod_field datatype 8 \ -infiles {} -prefix {}".format(nii_file, prefix) subprocess.check_output(cmd_line, shell=True) correct_nii_file = os.path.abspath(prefix + ".nii") assert os.path.exists(correct_nii_file), \ "Error, {} should exists".format(correct_nii_file) return correct_nii_file def fill_list_vol(list_vol, nb_classes): import os import nibabel as nib import numpy as np assert isinstance(list_vol, list), \ "Error, {} should be a list...".format(list_vol) if len(list_vol) < nb_classes: first_img = nib.load(list_vol[0]) data = first_img.get_fdata() for new_img_index in range(len(list_vol), nb_classes): new_data = np.zeros(shape=data.shape) new_img_file = os.path.abspath( "empty_extra_img_{}.nii.gz".format(new_img_index)) nib.save(nib.Nifti1Image(dataobj=new_data, affine=first_img.affine, header=first_img.header), new_img_file) list_vol.append(new_img_file) return list_vol