# Source code for flame_utils.core.element

#!/usr/bin/env python
# -*- coding: utf-8 -*-

"""Operations about FLAME machine elements, lattice is a special element.
"""

import logging
import os
import re
import sys
import flame

import numpy as np

from collections import Counter

from flame_utils.misc import machine_setter
from flame_utils.misc import flatten
from flame_utils.misc import get_intersection
from flame_utils.misc import conf_update

__authors__ = "Tong Zhang"
__copyright__ = "(c) 2016-2017, Facility for Rare Isotope beams, " \
                "Michigan State University"
__contact__ = "Tong Zhang <zhangt@frib.msu.edu>"

_LOGGER = logging.getLogger(__name__)

# Extra property keys that get_element() always reports for elements whose
# type is 'stripper'.
STRIPPER_PROP_KEYS = ['IonChargeStates', 'NCharge']
# Extra property keys that get_element() always reports for elements whose
# type is 'source' (the per-charge-state vector/matrix keys are added
# separately in get_element()).
SOURCE_PROP_KEYS = ['IonEk', 'IonEs', 'NCharge', 'IonChargeStates'] # field names, not including S[i], P[i]


def get_all_types(latfile=None, _machine=None):
    """Get all unique element types from a FLAME machine or lattice file.

    Parameters
    ----------
    latfile : str
        FLAME lattice file.
    _machine :
        FLAME machine object.

    Returns
    -------
    list or None
        None if the machine could not be set up, otherwise the list of
        unique element type names, ordered by first appearance in the
        lattice.
    """
    m = machine_setter(latfile, _machine, 'get_all_types')
    if m is None:
        return None

    # Fetch the machine configuration only once.
    elements = m.conf()['elements']
    a = np.asarray([e.get('type') for e in elements])
    # np.unique returns sorted unique values; sorting the first-occurrence
    # indices restores the order in which the types appear in the lattice.
    _, idx = np.unique(a, return_index=True)
    return a[np.sort(idx)].tolist()
def get_all_names(latfile=None, _machine=None):
    """Get all unique element names from a FLAME machine or lattice file.

    Parameters
    ----------
    latfile : str
        FLAME lattice file.
    _machine :
        FLAME machine object.

    Returns
    -------
    list or None
        None if the machine could not be set up, otherwise the list of
        unique element names, ordered by first appearance in the lattice.
    """
    m = machine_setter(latfile, _machine, 'get_all_names')
    if m is None:
        return None

    # Fetch the machine configuration only once.
    elements = m.conf()['elements']
    a = np.asarray([e.get('name') for e in elements])
    # np.unique returns sorted unique values; sorting the first-occurrence
    # indices restores the order in which the names appear in the lattice.
    _, idx = np.unique(a, return_index=True)
    return a[np.sort(idx)].tolist()
def inspect_lattice(latfile=None, out=None, _machine=None):
    """Print a per-type element count report for a FLAME lattice.

    Nothing is printed if the machine cannot be set up.

    Parameters
    ----------
    latfile : str
        FLAME lattice file.
    out :
        Output stream, ``sys.stdout`` by default.
    _machine :
        FLAME machine object.

    Returns
    -------
    None
        The report is written to *out*; there is no return value.

    Examples
    --------
    >>> from flame import Machine
    >>> from phantasy import flameutils
    >>> latfile = 'lattice/test.lat'
    >>> m = Machine(open(latfile, 'r'))
    >>> flameutils.inspect_lattice(_machine=m)
    Inspecting lattice: <machine>
    ==============================
    TYPE        COUNT   PERCENTAGE
    ------------------------------
    SOURCE       1       0.08
    STRIPPER     1       0.08
    QUADRUPOLE   40      3.22
    BPM          75      6.04
    SOLENOID     78      6.28
    SBEND        80      6.44
    RFCAVITY     117     9.42
    ORBTRIM      120     9.66
    DRIFT        730    58.78
    >>> # pass the latfile parameter instead of a machine
    >>> flameutils.inspect_lattice(latfile=latfile)
    Inspecting lattice: test.lat
    ==============================
    TYPE        COUNT   PERCENTAGE
    ------------------------------
    SOURCE       1       0.08
    STRIPPER     1       0.08
    QUADRUPOLE   40      3.22
    BPM          75      6.04
    SOLENOID     78      6.28
    SBEND        80      6.44
    RFCAVITY     117     9.42
    ORBTRIM      120     9.66
    DRIFT        730    58.78
    >>> # write the report to a file
    >>> fout = open('test.out', 'w')
    >>> flameutils.inspect_lattice(latfile=latfile, out=fout)
    >>> fout.close()
    >>> # write the report to a string
    >>> from io import StringIO
    >>> sio = StringIO()
    >>> flameutils.inspect_lattice(latfile=latfile, out=sio)
    >>> retstr = sio.getvalue()
    """
    if latfile is None:
        # Data comes from a machine object rather than a lattice file;
        # this placeholder is what gets shown in the report title.
        latfile = "<machine>"

    m = machine_setter(latfile, _machine, 'inspect_lattice')
    if m is None:
        return None

    elements = m.conf()['elements']
    total = len(elements)
    counts = Counter(e.get('type') for e in elements)
    # Least frequent types first.
    rows = sorted(counts.items(), key=lambda item: item[1])

    stream = out if out is not None else sys.stdout
    print("Inspecting lattice: %s" % os.path.basename(latfile), file=stream)
    print("=" * 30, file=stream)
    print("{0:<11s} {1:<7s} {2:<10s}".format("TYPE", "COUNT", "PERCENTAGE"),
          file=stream)
    print("-" * 30, file=stream)
    for etype, cnt in rows:
        line = "{t:<12s} {n:<5d} {p:^8.2f}".format(
            t=etype.upper(), n=cnt, p=cnt / total * 100)
        print(line, file=stream)
def get_element(latfile=None, index=None, name=None, type=None, **kws):
    """Inspect FLAME lattice element(s), return their properties.

    Parameters
    ----------
    latfile : str
        FLAME lattice file.
    index : int or iterable of int
        Element index(es). Any iterable (list, tuple, ``range``, ...) is
        accepted.
    name : str or list/tuple of str
        Element name(s).
    type : str or list/tuple of str
        Element type(s).

    Keyword Arguments
    -----------------
    _machine :
        FLAME machine object.
    _pattern : str
        Regex to search element names; matching names are combined with
        *name*.

    Returns
    -------
    res : list of dict, [] or None
        None if the machine could not be set up; empty list if the filters
        select nothing; otherwise a list of
        ``{'index': int, 'properties': dict}`` sorted by index.

    Examples
    --------
    >>> from flame import Machine
    >>> from phantasy import flameutils
    >>> latfile = 'lattice/test.lat'
    >>> ename = 'LS1_CA01:CAV4_D1150'
    >>> e = flameutils.get_element(name=ename, latfile=latfile)
    >>> print(e)
    [{'index': 27, 'properties': {'aper': 0.017,
     'name': 'LS1_CA01:CAV4_D1150', 'f': 80500000.0, 'cavtype': '0.041QWR',
     'L': 0.24, 'phi': 325.2, 'scl_fac': 0.819578, 'type': 'rfcavity'}}]
    >>> # use multiple filters, e.g. get all BPMs in the first 20 elements
    >>> e = flameutils.get_element(_machine=m, index=range(20), type='bpm')
    >>> print(e)
    [{'index': 5, 'properties': {'name': 'LS1_CA01:BPM_D1129',
     'type': 'bpm'}}, {'index': 18, 'properties': {'name':
     'LS1_CA01:BPM_D1144', 'type': 'bpm'}}]
    >>> # all these filters could be used together, return [] if found nothing
    >>>
    >>> # get names by regex
    >>> e = flameutils.get_element(_machine=m, _pattern='FS1_BBS:DH_D2394_1.?')
    >>> print(e)
    [{'index': 1092, 'properties': {'L': 0.104065, 'aper': 0.07,
     'bg': 0.191062, 'name': 'FS1_BBS:DH_D2394_1', 'phi': 4.5, 'phi1': 7.0,
     'phi2': 0.0, 'type': 'sbend'}}, {'index': 1101, 'properties':
     {'L': 0.104065, 'aper': 0.07, 'bg': 0.191062,
     'name': 'FS1_BBS:DH_D2394_10', 'phi': 4.5, 'phi1': 0.0, 'phi2': 7.0,
     'type': 'sbend'}}]

    Notes
    -----
    1. If more than one optional parameter (index, name, type, _pattern) is
       provided, only elements that meet all these definitions are returned.
    2. The returned list is sorted by element index.
    3. Invalid element names or type names will be ignored.

    See Also
    --------
    get_index_by_name, get_index_by_type
    :func:`.get_intersection` : Get the intersection of input valid list/tuple.
    """
    _machine = kws.get('_machine', None)
    m = machine_setter(latfile, _machine, 'get_element')
    if m is None:
        return None

    # --- filter by explicit index ---
    if index is None:
        idx_from_index = []
    elif isinstance(index, (list, tuple)):
        idx_from_index = index
    elif isinstance(index, str):
        # A string is iterable, but must be kept as one value, not split
        # into characters.
        idx_from_index = (index,)
    else:
        try:
            # Expand other iterables (e.g. range) into their members
            # instead of wrapping the iterable as a single "index".
            idx_from_index = list(index)
        except TypeError:
            # Plain scalar index.
            idx_from_index = (index,)

    # --- filter by name (explicit names plus regex matches) ---
    names = []
    _name_pattern = kws.get('_pattern')
    if _name_pattern is not None:
        _name_list = get_names_by_pattern(pattern=_name_pattern, _machine=m)
        if _name_list is not None:
            names = list(_name_list)
    if name is not None:
        if isinstance(name, str):
            names.append(name)
        elif isinstance(name, (list, tuple)):
            # Tuples are accepted too, consistent with get_index_by_name().
            names.extend(name)
    if names:
        idx_from_name = list(
            flatten(get_index_by_name(names, _machine=m, rtype='list')))
    else:
        idx_from_name = []

    # --- filter by type ---
    if type is not None:
        idx_from_type = list(
            flatten(get_index_by_type(type, _machine=m, rtype='list')))
    else:
        idx_from_type = []

    ele_idx = get_intersection(index=idx_from_index,
                               name=idx_from_name,
                               type=idx_from_type)

    if not ele_idx:
        _LOGGER.warning("get_element: Nothing to get, invalid filtering.")
        return []

    # Keys of the machine-level configuration (other than 'elements' and
    # 'name') are excluded from each element's property set.
    mconf = m.conf()
    share_keys = [k for k in mconf.keys() if k not in ("elements", "name")]

    retval = []
    for i in ele_idx:
        elem = m.conf(i)
        elem_k = set(elem.keys()).difference(share_keys)
        etype = elem.get('type')
        if etype == 'stripper':
            elem_k.update(STRIPPER_PROP_KEYS)
        elif etype == 'source':
            ndim_charge = len(elem.get('NCharge'))
            p = elem.get('vector_variable', None)
            s = elem.get('matrix_variable', None)
            # One key per charge state, named '<variable><n>'.
            for v in (s, p):
                if v is not None:
                    elem_k.update('{0}{1}'.format(v, j)
                                  for j in range(ndim_charge))
            elem_k.update(SOURCE_PROP_KEYS)
        elem_p = {k: elem.get(k) for k in elem_k}
        retval.append({'index': i, 'properties': elem_p})

    retval.sort(key=lambda r: r['index'])
    return retval
def get_index_by_type(type='', latfile=None, rtype='dict', _machine=None):
    """Look up element indices by element type(s).

    Parameters
    ----------
    type : str or list of str
        Single element type name, or list/tuple of element type names.
    latfile : str
        FLAME lattice file, preferred.
    rtype : str
        Return type, ``'dict'`` (default); any other value yields a list.
    _machine :
        FLAME machine object.

    Returns
    -------
    ind : dict, list or None
        None if the machine could not be set up. For ``rtype='dict'``, a
        dict mapping each type name to its list of indices; otherwise a
        list of index lists, in the order of the given type names.

    Notes
    -----
    If *rtype* is ``list``, a list of lists is returned instead of a dict;
    ``flatten()`` can be used to flatten it.

    Examples
    --------
    >>> from flame import Machine
    >>> from phantasy import flameutils
    >>> latfile = 'lattice/test.lat'
    >>> m = Machine(open(latfile, 'r'))
    >>> types = 'stripper'
    >>> print(flameutils.get_index_by_type(type=types, latfile=latfile))
    {'stripper': [891]}
    >>> print(flameutils.get_index_by_type(type=types, _machine=m))
    {'stripper': [891]}
    >>> types = ['stripper', 'source']
    >>> print(flameutils.get_index_by_type(type=types, latfile=latfile))
    {'source': [0], 'stripper': [891]}
    >>> # return a list instead of dict
    >>> print(flameutils.get_index_by_type(type=types, latfile=latfile,
    ...                                    rtype='list'))
    [[891], [0]]

    See Also
    --------
    :func:`.flatten` : flatten recursive list.
    """
    m = machine_setter(latfile, _machine, 'get_index_by_type')
    if m is None:
        return None

    # Normalise a single type name into a one-element tuple.
    wanted = type if isinstance(type, (list, tuple)) else (type,)

    if rtype != 'dict':
        return [m.find(type=t) for t in wanted]
    return {t: m.find(type=t) for t in wanted}
def get_index_by_name(name='', latfile=None, rtype='dict', _machine=None):
    """Look up element indices by element name(s).

    Parameters
    ----------
    name : str or list of str
        Single element name, or list/tuple of element names.
    latfile : str
        FLAME lattice file, preferred.
    rtype : str
        Return type, ``'dict'`` (default); any other value yields a list.
    _machine :
        FLAME machine object.

    Returns
    -------
    ind : dict, list or None
        None if the machine could not be set up. For ``rtype='dict'``, a
        dict mapping each name to its list of indices; otherwise a list of
        index lists, in the order of the given names.

    Notes
    -----
    If *rtype* is ``list``, a list of lists is returned instead of a dict;
    ``flatten()`` can be used to flatten it.

    Examples
    --------
    >>> from flame import Machine
    >>> from phantasy import flameutils
    >>> latfile = 'lattice/test.lat'
    >>> m = Machine(open(latfile, 'r'))
    >>> names = 'LS1_CA01:SOL1_D1131_1'
    >>> print(flameutils.get_index_by_name(name=names, latfile=latfile))
    {'LS1_CA01:SOL1_D1131_1': [8]}
    >>> print(flameutils.get_index_by_name(name=names, _machine=m))
    {'LS1_CA01:SOL1_D1131_1': [8]}
    >>> names = ['LS1_CA01:SOL1_D1131_1', 'LS1_CA01:CAV4_D1150',
    ...          'LS1_WB01:BPM_D1286', 'LS1_CA01:BPM_D1144']
    >>> print(flameutils.get_index_by_name(name=names, latfile=latfile))
    {'LS1_CA01:SOL1_D1131_1': [8], 'LS1_WB01:BPM_D1286': [154],
     'LS1_CA01:BPM_D1144': [18], 'LS1_CA01:CAV4_D1150': [27]}
    >>> # return a list instead of dict
    >>> print(flameutils.get_index_by_name(name=names, latfile=latfile,
    ...                                    rtype='list'))
    [[8], [27], [154], [18]]

    See Also
    --------
    :func:`.flatten` : flatten recursive list.
    """
    m = machine_setter(latfile, _machine, 'get_index_by_name')
    if m is None:
        return None

    # Normalise a single name into a one-element tuple.
    wanted = name if isinstance(name, (list, tuple)) else (name,)

    if rtype != 'dict':
        return [m.find(name=n) for n in wanted]
    return {n: m.find(name=n) for n in wanted}
def get_names_by_pattern(pattern='.*', latfile=None, _machine=None):
    """Collect the names of all elements whose name matches a regex.

    Parameters
    ----------
    pattern : str
        Regex searched for (``re.search``) in each element name.
    latfile : str
        FLAME lattice file, preferred.
    _machine :
        FLAME machine object.

    Returns
    -------
    names : list or None
        List of matching element names in lattice order; None if nothing
        matches or the machine could not be set up.
    """
    m = machine_setter(latfile, _machine, 'get_names_by_pattern')
    if m is None:
        return None

    elements = m.conf().get('elements')
    regex = re.compile(pattern)

    matched = []
    for elem in elements:
        ename = elem.get('name')
        if regex.search(ename):
            matched.append(ename)

    # An empty result is reported as None rather than [].
    return matched or None
def insert_element(machine=None, index=None, element=None):
    """Insert a new element into a copy of the machine.

    Parameters
    ----------
    machine :
        FLAME machine object.
    index : int or str
        Insert the element before this index; a string is treated as an
        element name and resolved to the first index found for it.
    element : dict
        Lattice element dictionary, e.g. ``{'name': xxx, 'type': yyy,
        'L': zzz}``.

    Returns
    -------
    machine : FLAME machine object or None
        New machine built from the updated configuration; None if
        *machine* is None or its configuration could not be loaded. If
        *index* or *element* is None, nothing is inserted and the new
        machine is built from the unchanged configuration.
    """
    if machine is None:
        return None

    try:
        m = conf_update(machine)
        mconf = m.conf()
    except Exception:
        # Only ordinary errors are treated as "failed to load"; a bare
        # except would also swallow KeyboardInterrupt / SystemExit.
        _LOGGER.error("Failed to load FLAME machine object.")
        return None

    if index is not None and element is not None:
        if isinstance(index, str):
            # NOTE(review): raises IndexError if find() returns an empty
            # result for an unknown name -- confirm this is intended.
            index = m.find(name=index)[0]
        mconf['elements'].insert(index, element)

    return flame.Machine(mconf)