Admin

civitas.admin

civitas.admin.COF

Bases: Model

Consequence of Failure

civitas.admin.CapitalProject

Bases: Model

Capital project object

civitas.admin.CapitalProjectFeatureCombination

Bases: Model

Capital project feature combination object

civitas.admin.CivicAddress

Bases: Model

Civic Address object

civitas.admin.Community

Bases: _Administrative

Administrative community

civitas.admin.CommunityAdmin

Bases: ModelAdmin

province

province(obj)

Return the province of the community's region.

Parameters:

Name  Type       Description  Default
obj   Community               required
Source code in django_project/civitas/admin/community.py
def province(self, obj):
    """ Return province
    :param obj:
    :type obj: Community
    :return:
    """
    return obj.region.province

civitas.admin.Condition

Bases: _Term

Condition of a feature, rated from 1 to 5: 1 = very good, 2 = good, 3 = fair, 4 = poor, 5 = very poor.

civitas.admin.Deterioration

Bases: _Term

The process of becoming progressively worse. Contains the name and the equation.

civitas.admin.FeatureBase

Bases: Model

Base model for features. Every feature model needs to extend this model.

calculation

calculation()

Return the first calculation record related to the feature, or None if there is none.

Source code in django_project/civitas/models/feature/feature_base.py
def calculation(self):
    """ Return the first related calculation record, or None if there is none"""
    if self.featurecalculation_set.all().count():
        return self.featurecalculation_set.all()[0]
    return None

lifespan

lifespan()

Return lifespan of feature

Source code in django_project/civitas/models/feature/feature_base.py
def lifespan(self):
    """ Return lifespan of feature"""
    if not self.type:
        return None
    return self.type.lifespan

civitas.admin.FeatureBaseAdmin

Bases: ModelAdmin

age

age(obj)

Return the age from the feature's calculation, or '-' if the feature has no calculation.

Source code in django_project/civitas/admin/feature/feature.py
def age(self, obj):
    """ return age based on calculation"""
    if obj.calculation():
        return obj.calculation().age
    return '-'

annual_reserve_cost

annual_reserve_cost(obj)

Return the annual reserve cost from the feature's calculation, or '-' if the feature has no calculation.

Source code in django_project/civitas/admin/feature/feature.py
def annual_reserve_cost(self, obj):
    """ return annual reserve cost based on calculation"""
    if obj.calculation():
        return obj.calculation().annual_reserve_cost()
    return '-'

estimated_maintenance_cost

estimated_maintenance_cost(obj)

Return the maintenance cost from the feature's calculation, or '-' if the feature has no calculation.

Source code in django_project/civitas/admin/feature/feature.py
def estimated_maintenance_cost(self, obj):
    """ return maintenance cost based on calculation"""
    if obj.calculation():
        return obj.calculation().maintenance_cost()
    return '-'

estimated_renewal_cost

estimated_renewal_cost(obj)

Return the renewal cost from the feature's calculation, or '-' if the feature has no calculation.

Source code in django_project/civitas/admin/feature/feature.py
def estimated_renewal_cost(self, obj):
    """ return renewal cost based on calculation"""
    if obj.calculation():
        return obj.calculation().renewal_cost()
    return '-'

remaining_life

remaining_life(obj)

Return the remaining life from the feature's calculation, or '-' if the feature has no calculation.

Source code in django_project/civitas/admin/feature/feature.py
def remaining_life(self, obj):
    """ return remaining life based on calculation"""
    if obj.calculation():
        return obj.calculation().remaining_life()
    return '-'

remaining_life_percent

remaining_life_percent(obj)

Return the remaining life percentage from the feature's calculation, or '-' if the feature has no calculation.

Source code in django_project/civitas/admin/feature/feature.py
def remaining_life_percent(self, obj):
    """ return remaining life percent based on calculation"""
    if obj.calculation():
        return obj.calculation().remaining_life_percent()
    return '-'
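
The FeatureBaseAdmin columns above share one pattern: fetch the feature's calculation and fall back to '-' when there is none. A minimal sketch of that pattern in plain Python follows. The stand-in classes `FakeCalculation` and `FakeFeature` and the helper `calculated_or_dash` are hypothetical names for illustration and are not part of civitas.

```python
class FakeCalculation:
    """Hypothetical stand-in for a feature calculation record."""
    age = 12

    def remaining_life(self):
        return 38


class FakeFeature:
    """Hypothetical stand-in for FeatureBase."""

    def __init__(self, calc=None):
        self._calc = calc

    def calculation(self):
        return self._calc


def calculated_or_dash(obj, name):
    """Return `name` from obj.calculation(), or '-' when there is no calculation.

    Methods are called; plain attributes are returned as they are.
    """
    calc = obj.calculation()
    if calc is None:
        return '-'
    value = getattr(calc, name)
    return value() if callable(value) else value
```

With such a helper, each admin column could in principle reduce to a single call, for example `calculated_or_dash(obj, 'remaining_life')`.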

civitas.admin.FeatureClass

Bases: _Term

The first level of the asset hierarchy as defined in the "Background" sheet, e.g. TRN = Transportation.

civitas.admin.FeatureGeometry

Bases: Model

Geometry of feature base

geometry

geometry()

Return the first non-empty geometry of the feature, checked in the order point, line, polygon.

Source code in django_project/civitas/models/feature/feature_geometry.py
def geometry(self):
    """ return geometry of feature """
    if self.geom_point:
        return self.geom_point
    if self.geom_line:
        return self.geom_line
    if self.geom_polygon:
        return self.geom_polygon
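
The lookup order used by geometry() can be sketched without Django. In the example below, `first_geometry` is a hypothetical helper and the `SimpleNamespace` objects are stand-ins for FeatureGeometry rows; the geometry values are placeholder strings rather than real GIS objects.

```python
from types import SimpleNamespace


def first_geometry(feature_geometry):
    """Return the first non-empty geometry, checked in point, line, polygon order."""
    for field in ('geom_point', 'geom_line', 'geom_polygon'):
        value = getattr(feature_geometry, field, None)
        if value:
            return value
    # Made explicit here; the original method falls through and returns None implicitly.
    return None


# Stand-in rows with made-up values, for illustration only.
line_only = SimpleNamespace(
    geom_point=None, geom_line='LINESTRING(0 0, 1 1)', geom_polygon=None)
empty = SimpleNamespace(geom_point=None, geom_line=None, geom_polygon=None)
```

Callers such as get_geometry in the serializers need to handle the None case, which occurs when none of the three fields is set.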

civitas.admin.FeatureProperty

Bases: Model

An additional property of the feature.

civitas.admin.FeatureSubClass

Bases: _Term

The second level of the asset hierarchy as defined in the "Background" sheet, e.g. RD = Roads. It is linked with AssetClass.

civitas.admin.FeatureType

Bases: _Term

The third level of the asset hierarchy as defined in the "Background" sheet.

civitas.admin.FeatureTypeCombination

Bases: _Term

Combination of class, sub-class and type.

civitas.admin.POF

Bases: Model

Probability of Failure

civitas.admin.Province

Bases: _Administrative

Administrative province

civitas.admin.Region

Bases: _Administrative

Administrative region

civitas.admin.ReporterData

Bases: Model

Materialized View for reported data

by_community staticmethod

by_community(community: Community) -> dict

Return at most the first two ReporterData rows for the community.

Source code in django_project/civitas/models/view/reporter_data.py
@staticmethod
def by_community(community: Community) -> dict:
    """
    Return reporter_data by community
    """
    return ReporterData.objects.filter(
        community_id=community.id
    )[0:2]

by_community_all staticmethod

by_community_all(community: Community) -> dict

Return all ReporterData rows for the community.

Source code in django_project/civitas/models/view/reporter_data.py
@staticmethod
def by_community_all(community: Community) -> dict:
    """
    Return reporter_data by community
    """
    return ReporterData._showall(community)

dashboard staticmethod

dashboard(community: Community) -> dict

Build the dashboard data for a community.

Args: community (Community): community object

Returns: list: one dictionary per cost breakdown

Source code in django_project/civitas/models/view/reporter_data.py
@staticmethod
def dashboard(community: Community) -> dict:
    """Build the dashboard data for a community.

    Args:
        community (Community): community object

    Returns:
        array: dictionary of each cost breakdown
    """
    data = ReporterData.objects.filter(
        community_id=community.id
        ).exclude(
            system_name__in=[
                'Non-Municipal Infrastructure',
                'Abandoned Infrastructure'
                ]
        )

    _dashboard = DashboardData(data=data)  

    system_names = data.exclude(system_name=None) \
        .order_by('system_name') \
        .values('system_name') \
        .distinct() \
        .annotate(label=Concat('system_name', models.Value('')))

    ##### Renewal costs of assets #####

    _renewal_cost_of_assets_a = data.exclude(system_name=None) \
        .order_by('system_name') \
        .values('system_name', 'renewal_cost')

    _data_renewal= [{'system_name': val["system_name"], 'renewal_cost': val['renewal_cost']} for val in _renewal_cost_of_assets_a]

    renewal_sum = 0

    qs_renewal = []

    for name in system_names:
        qs_renewal.append({"system_name": name["system_name"], "label": name["label"],"values": 0.0})

    for x in _data_renewal:

        system_name = x["system_name"]

        if str(x["renewal_cost"]) == 'NaN' or not x["renewal_cost"]:
            x["renewal_cost"] = 0.00
        elif x["renewal_cost"]:
            renewal_sum = renewal_sum + float(x["renewal_cost"])
            index = next((index for (index, d) in enumerate(qs_renewal) if d["system_name"] == system_name), None)
            qs_renewal[index]["values"] = float(qs_renewal[index]["values"]) + float(x["renewal_cost"])

    #### End Renewal costs of assets ####

    ##### Maintenance #####

    _maintenance_cost_of_assets = data.exclude(system_name=None) \
        .order_by('system_name') \
        .values('system_name', 'maintenance_cost')

    _data_maintenace = [{'system_name': val["system_name"], 'maintenance_cost': val['maintenance_cost']} for val in _maintenance_cost_of_assets]

    maintenance_sum = 0

    qs_maintenance = []

    for name in system_names:
        qs_maintenance.append({"system_name": name["system_name"], "label": name["label"],"values": 0.0})

    for x in _data_maintenace:

        system_name = x["system_name"]

        if str(x["maintenance_cost"]) == 'NaN' or not x["maintenance_cost"]:
            x["maintenance_cost"] = 0.00
        elif x["maintenance_cost"]:
            maintenance_sum = maintenance_sum + float(x["maintenance_cost"])
            index = next((index for (index, d) in enumerate(qs_maintenance) if d["system_name"] == system_name), None)
            qs_maintenance[index]["values"] = float(qs_maintenance[index]["values"]) + float(x["maintenance_cost"])

    #### End Maintenance ####

    #### Risk Renewal####

    _risk_renewal_of_assets = data.exclude(system_name=None) \
        .order_by('system_name') \
        .values('system_name', 'risk_level', 'renewal_cost')

    _data_renewal_cost_of_assets = [{'system_name': val["system_name"], 'renewal_cost': val['renewal_cost'], 'risk_level': val['risk_level']} \
                                     for val in _risk_renewal_of_assets]

    qs_renewal_cost_of_assets = []

    for risk in _dashboard.risk_level_list:
        for name in system_names:
            qs_renewal_cost_of_assets.append({"system_name": name["system_name"], 
                                              "risk_level": risk["risk_level"], 
                                              "label": name["label"],
                                              "values": 0.0})

    risk_renewal_of_assets_sum = 0

    for x in _data_renewal_cost_of_assets:

        system_name = x["system_name"]

        # Normalise the risk level; rows without one are grouped under 'None'
        if x['risk_level'] is not None:
            risk_level = x["risk_level"].strip()
        else:
            risk_level = 'None'

        if str(x["renewal_cost"]) == 'NaN' or not x["renewal_cost"]:
            x["renewal_cost"] = 0.00
        else:
            risk_renewal_of_assets_sum = risk_renewal_of_assets_sum + Decimal(x["renewal_cost"])
        index = next((index for (index, d) in enumerate(qs_renewal_cost_of_assets) \
                       if d["system_name"] == system_name and d["risk_level"] == risk_level), None)
        if index is not None:  # index 0 is a valid match and must not be skipped
            qs_renewal_cost_of_assets[index]["values"] = float(qs_renewal_cost_of_assets[index]["values"]) + float(x["renewal_cost"])

    #### END Risk Renewal####

    #### remaining years renewal cost ####

    _remaining_years_renewal_cost = data.exclude(system_name=None) \
        .order_by('system_name') \
        .values('system_name', 'renewal_cost') 

    #### End remaining years renewal cost ####
    remaining_years_renewal_risk_cost = data.exclude(system_name=None) \
        .order_by('system_name') \
        .values('system_name', 'risk_level') \
        .distinct() \
        .annotate(label=Concat('system_name', models.Value('')), values=Sum('renewal_cost'))

    ###### Annual Reserve  #########
    _annual_reserve = data.exclude(system_name=None) \
        .order_by('system_name') \
        .values('system_name', 'annual_reserve') 

    _data_annual_reserve = [{'system_name': val["system_name"], 'annual_reserve': val['annual_reserve']} for val in _annual_reserve]

    annual_sum = 0

    qs_annual_data = []

    for name in system_names:
        qs_annual_data.append({"system_name": name["system_name"], "label": name["label"],"values": 0.0})

    for x in _data_annual_reserve:

        system_name = x["system_name"]

        annual_reserve = 0
        if str(x["annual_reserve"]) == 'NaN' or not x["annual_reserve"]:
            x["annual_reserve"] = 0.00
            annual_reserve = 0.00
        else:
            annual_reserve = x["annual_reserve"]

        annual_sum = annual_sum + Decimal(annual_reserve)

        index = next((index for (index, d) in enumerate(qs_annual_data) if d["system_name"] == system_name), None)
        qs_annual_data[index]["values"] = float(qs_annual_data[index]["values"]) + float(annual_reserve)

    ###### End Annual Reserve  #########

    qs = [
        {
            'id': 'renewal_cost', 
            "title": "Renewal costs of assets",
            "qs": qs_renewal, 
            'type': 'non_stacked', 
            'sum': {"sum": renewal_sum},
            'description': 'Replacement costs are based on current available costs and include the following components: Capital Costs - 65%, Contingency - 15%, Design - 10%, Inspections and Removal - 10%. As a starting point, a default replacement cost is applied for each asset type.  However, in some cases, where the above general formula is not applicable, or requires significantly less or more effort in one of the above areas, a custom cost might have been applied. This value will override the default value.'
        },              
        {
            'id': 'maintenenance_cost', 
            "title": "Maintenance costs of assets",
            "qs": qs_maintenance, 
            'type': 'non_stacked',
            'sum': {"sum": maintenance_sum},
            'description': 'Maintenance costs are the estimated annual cost to maintain assets. As a starting point, a default value of 10% of the renewal cost is used'
        },   
        {
            'id': 'annual_reserve', 
            "title": "Annual Average Infrastructure Demand",
            "qs": qs_annual_data, 
            'type': 'non_stacked',
            'sum': {'sum': annual_sum},
            'description':  "This graph uses lifespan projections and renewal costs for a long-term outlook of infrastructure. This projection is theoretical and is not a realistic indication of spending timelines. A valuable output of this projection is an annualized infrastructure demand, indicated as a dotted line on the graph. This annualized value is obtained by dividing the renewal cost by the lifespan for each asset in the database and then summing the total. As lifespan and renewal cost data are updated, the annual infrastructure demand will update. The annual infrastructure demand could be lowered by committing to operations and maintenance programs to extend lifespans, deciding to rehabilitate versus replace, and more. The values shown in the graph are based on current $ values, and the actual value of this average annual investment will increase over time with inflation."
        },   
        {
            'id': 'system_risk_renewal', 
            "title": "Risk By System",
            "qs": qs_renewal_cost_of_assets, 
            "formatted": _dashboard.stacked_a(qs_renewal_cost_of_assets),
            "total_bottom": _dashboard.stacked_a_total(_dashboard.stacked_a(qs_renewal_cost_of_assets)),
            'type': 'stacked_a',
            'sum': {"sum": risk_renewal_of_assets_sum},
            'description': 'A risk value is obtained by combining Probability of Failure  (PoF) and Consequence of Failure (CoF) values as per the following matrix. It is common asset management practice to shift the matrix in favour of the consequence of failure,'
        },        
        {
            'id': 'remaining_years_renewal_system', 
            "title": "Remaining Years by Renewal Cost by System",
            "qs": _remaining_years_renewal_cost, 
            "formatted": _dashboard.stacked_b_list(system_names),
            "total": _dashboard.stacked_b_total(_dashboard.stacked_b_list(system_names), system_names=system_names),
            "graph": _dashboard.stacked_b_graph(_dashboard.stacked_b_list(system_names), system_names=system_names),
            "pdf_table_1": _dashboard.pdf_table_1(_dashboard.stacked_b_list(system_names)),
            "pdf_table_2": _dashboard.pdf_table_2(_dashboard.stacked_b_list(system_names)),
            'type': 'stacked_b',
            'system_names': system_names,
            'sum': {"sum": risk_renewal_of_assets_sum},
            'description': ''
        },   
        {
            'id': 'remaining_years_renewal_risk', 
            "title": "Remaining Years by Renewal Cost by Risk",
            "qs": remaining_years_renewal_risk_cost, 
            "formatted":  _dashboard.stacked_c_list(),
            "graph":  _dashboard.stacked_c_graph( _dashboard.stacked_c_list()),
            "total": _dashboard.stacked_c_total( _dashboard.stacked_c_list()),
            "pdf_table_1": _dashboard.pdf_table_1(_dashboard.stacked_c_list()),
            "pdf_table_2": _dashboard.pdf_table_2(_dashboard.stacked_c_list()),
            'type': 'stacked_c',
            'risk_levels': _dashboard.risk_level_list,
            'sum': {"sum": risk_renewal_of_assets_sum},
            'description': ''
        },  
    ]

    return qs
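
The renewal, maintenance and annual reserve sections of dashboard() each sum one cost field per system and treat missing or NaN values as zero. A compact sketch of that aggregation in plain Python is given below. The helpers `to_cost` and `sum_by_system` are hypothetical, and the rows are made-up sample data rather than real ReporterData.

```python
import math


def to_cost(value):
    """Convert a raw cost to float, treating None, '' and NaN as 0.0."""
    if value is None or value == '':
        return 0.0
    number = float(value)
    return 0.0 if math.isnan(number) else number


def sum_by_system(rows, field):
    """Return (per-system totals, grand total) of `field` over `rows`."""
    totals = {}
    for row in rows:
        name = row['system_name']
        if name is None:
            continue  # mirrors data.exclude(system_name=None)
        totals[name] = totals.get(name, 0.0) + to_cost(row.get(field))
    return totals, sum(totals.values())


# Made-up sample rows, for illustration only.
rows = [
    {'system_name': 'Water', 'renewal_cost': 100.0},
    {'system_name': 'Water', 'renewal_cost': float('nan')},
    {'system_name': 'Roads', 'renewal_cost': None},
    {'system_name': 'Roads', 'renewal_cost': '250.5'},
    {'system_name': None, 'renewal_cost': 999.0},
]
totals, grand_total = sum_by_system(rows, 'renewal_cost')
```

Keying the totals by system name avoids the repeated linear search for a list index, and a system whose costs are all missing still appears with a total of 0.0, as in the original.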

summary_cof staticmethod

summary_cof(community: Community) -> dict

Return a summary of CoF for a specific community. The result is grouped by class.

Source code in django_project/civitas/models/view/reporter_data.py
@staticmethod
def summary_cof(community: Community) -> dict:
    """
    Return summary of cof for specific community
    The return will be grouped by class
    """

    cof_dict = ReporterData._summary(community, 'cof_name')

    for val in cof_dict:
        for x in val:
            if str(val[x]) == 'nan' or val[x] == 'NaN':
                val[x] = 0
    return cof_dict

summary_pof staticmethod

summary_pof(community: Community) -> dict

Return a summary of PoF for a specific community. The result is grouped by class.

Source code in django_project/civitas/models/view/reporter_data.py
@staticmethod
def summary_pof(community: Community) -> dict:
    """
    Return summary of pof for specific community
    The return will be grouped by class
    """
    pof_dict = ReporterData._summary(community, 'pof_name')

    for val in pof_dict:
        for x in val:
            if str(val[x]) == 'nan' or val[x] == 'NaN':
                val[x] = 0
    return pof_dict

summary_risk staticmethod

summary_risk(community: Community) -> dict

Return a summary of risk for a specific community. The result is grouped by class.

Source code in django_project/civitas/models/view/reporter_data.py
@staticmethod
def summary_risk(community: Community) -> dict:
    """
    Return summary of risk for specific community
    The return will be grouped by class
    """

    risk_dict = ReporterData._summary(community, 'risk_level')

    for val in risk_dict:
        for x in val:
            if str(val[x]) == 'nan' or val[x] == 'NaN':
                val[x] = 0
    return risk_dict
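
summary_cof, summary_pof and summary_risk repeat the same loop that replaces NaN values with 0. A sketch of a shared helper follows. The names `is_nan` and `zero_out_nan` are hypothetical, the sample row uses made-up keys, and the Decimal check is an addition that the original string comparison does not perform.

```python
import math
from decimal import Decimal


def is_nan(value):
    """True for float NaN, Decimal NaN and the string 'nan' in any letter case."""
    if isinstance(value, float):
        return math.isnan(value)
    if isinstance(value, Decimal):
        return value.is_nan()
    return isinstance(value, str) and value.strip().lower() == 'nan'


def zero_out_nan(summary_rows):
    """Replace NaN-like values with 0 in a list of dicts, in place, and return it."""
    for row in summary_rows:
        for key, value in row.items():
            if is_nan(value):
                row[key] = 0  # assigning to an existing key is safe while iterating
    return summary_rows


# Made-up sample row, for illustration only.
cleaned = zero_out_nan([
    {'class': 'TRN', 'High': float('nan'), 'Medium': Decimal('NaN'), 'Low': 3},
])
```

Each summary method could then end with `return zero_out_nan(ReporterData._summary(community, field))`, assuming `_summary` returns a list of dictionaries as the loops above suggest.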

civitas.admin.Risk

Bases: Model

Risk

civitas.admin.System

Bases: _Term

A system is a collection of features.

civitas.admin.SystemAdmin

Bases: ModelAdmin

province

province(obj)

Return province

Parameters:

Name  Type    Description  Default
obj   System               required
Source code in django_project/civitas/admin/community.py
def province(self, obj):
    """ Return province
    :param obj:
    :type obj: System
    :return:
    """
    return obj.community.region.province

region

region(obj)

Return region

Parameters:

Name  Type    Description  Default
obj   System               required
Source code in django_project/civitas/admin/community.py
def region(self, obj):
    """ Return region
    :param obj:
    :type obj: System
    :return:
    """
    return obj.community.region

civitas.admin.Unit

Bases: _Term

Unit

API

civitas.api

civitas.api.AssetClassAPI

Bases: APIView

Return the asset classes of a community, with their sub-classes and types.

get

get(request, pk)

Return data of features

Source code in django_project/civitas/api/community.py
def get(self, request, pk):
    """ Return data of features """
    community = get_object_or_404(
        Community, pk=pk
    )

    data = []

    # pk is passed as a query parameter rather than formatted into the SQL string
    sql = """SELECT DISTINCT
            mv_features.asset_sub_class,
            mv_features.asset_identifier,
            mv_features.def_stylename,
            mv_features.type,
            asset_class.description
            FROM asset_class
            LEFT JOIN mv_features ON mv_features.asset_identifier = asset_class.name
            WHERE mv_features.community_id = %s;"""
    with connections[settings.CIVITAS_DATABASE].cursor() as cursor:
        cursor.execute(sql, [community.pk])
        rows = cursor.fetchall()
        for row in rows:

            asset_sub_class = row[0]
            asset_identifier = row[1]
            def_stylename = row[2]
            asset_type = row[3]
            asset_class = row[4]

            if not any(d['asset_class'] == asset_class for d in data):
                data.append({
                    "asset_class": asset_class, 
                    "asset_identifier": asset_identifier, 
                    "asset_sub_class": [{'asset': asset_sub_class, 'type': [asset_type]}], 
                    "def_stylename": [def_stylename]
                })
            else:
                index = [i for i,_ in enumerate(data) if _['asset_class'] == asset_class][0]    
                if not any(d['asset'] == asset_sub_class for d in data[index]["asset_sub_class"]):
                    data[index]["asset_sub_class"].append({'asset': asset_sub_class, 'type': [asset_type]})
                else:
                    _index = [i for i,_ in enumerate(data[index]["asset_sub_class"]) if _['asset'] == asset_sub_class][0]  
                    data[index]["asset_sub_class"][_index]["type"].append(asset_type)

    return Response(
        data
    )
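
The nested grouping performed in AssetClassAPI.get can be sketched with dictionary indexes instead of repeated list scans. The function `group_asset_classes` is a hypothetical helper, and the rows are made-up tuples in the same column order as the SQL result (sub-class, identifier, style name, type, class description).

```python
def group_asset_classes(rows):
    """Group result rows by asset class, then by sub-class, collecting the types."""
    classes = {}  # asset_class -> class entry
    subs = {}     # (asset_class, sub_class) -> sub-class entry
    for sub_class, identifier, stylename, asset_type, asset_class in rows:
        entry = classes.get(asset_class)
        if entry is None:
            entry = {
                'asset_class': asset_class,
                'asset_identifier': identifier,
                'asset_sub_class': [],
                'def_stylename': [stylename],  # only the first style name is kept
            }
            classes[asset_class] = entry
        sub = subs.get((asset_class, sub_class))
        if sub is None:
            sub = {'asset': sub_class, 'type': []}
            subs[(asset_class, sub_class)] = sub
            entry['asset_sub_class'].append(sub)
        sub['type'].append(asset_type)
    return list(classes.values())


# Made-up sample rows, for illustration only.
rows = [
    ('RD', 'TRN', 'road_style', 'Paved', 'Transportation'),
    ('RD', 'TRN', 'road_style', 'Gravel', 'Transportation'),
    ('BR', 'TRN', 'bridge_style', 'Steel', 'Transportation'),
    ('WM', 'WAT', 'main_style', 'PVC', 'Water'),
]
result = group_asset_classes(rows)
```

The output shape matches the dictionaries built in the method above, including the detail that def_stylename holds only the style name of the first row seen for each class.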

civitas.api.AssetDataCustomAPI

Bases: APIView

Return the systems and asset identifiers of a community (GET), or ReporterData for a custom selection (POST).

get

get(request, pk)

Return data of features

Source code in django_project/civitas/api/community.py
def get(self, request, pk):
    """ Return data of features """
    community = get_object_or_404(
        Community, pk=pk
    )

    data = []

    # pk is passed as a query parameter rather than formatted into the SQL string
    sql = "SELECT DISTINCT system_name, asset_identifier FROM mv_features WHERE community_id = %s;"
    with connections[settings.CIVITAS_DATABASE].cursor() as cursor:
        cursor.execute(sql, [community.pk])
        rows = cursor.fetchall()
        for row in rows:
            system_name = row[0]
            asset_identifier = row[1]
            if not any(d['system_name'] == system_name for d in data):
                data.append({"system_name": system_name, "asset_identifier": [asset_identifier]})
            else:
                index = [i for i,_ in enumerate(data) if _['system_name'] == system_name][0]
                data[index]["asset_identifier"].append(asset_identifier)

    return Response(
        data
    )
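
The get method above groups (system_name, asset_identifier) rows by system. A minimal sketch of the same grouping with a bound query parameter follows, using the standard-library sqlite3 module as a stand-in for the project database. Note the assumptions: sqlite3 uses `?` placeholders where a Django cursor uses `%s`, the in-memory table and its rows are made up, `identifiers_by_system` is a hypothetical helper, and the ORDER BY clause is added only to make the output deterministic.

```python
import sqlite3
from collections import defaultdict


def identifiers_by_system(connection, community_id):
    """Return one {'system_name', 'asset_identifier'} dict per system of a community."""
    sql = (
        "SELECT DISTINCT system_name, asset_identifier "
        "FROM mv_features WHERE community_id = ? "
        "ORDER BY system_name, asset_identifier"
    )
    grouped = defaultdict(list)
    # The value is bound by the driver, not formatted into the SQL string.
    for system_name, asset_identifier in connection.execute(sql, (community_id,)):
        grouped[system_name].append(asset_identifier)
    return [
        {'system_name': name, 'asset_identifier': identifiers}
        for name, identifiers in grouped.items()
    ]


# Throwaway in-memory table with made-up rows, only for the demonstration.
conn = sqlite3.connect(':memory:')
conn.execute(
    "CREATE TABLE mv_features "
    "(community_id INTEGER, system_name TEXT, asset_identifier TEXT)"
)
conn.executemany(
    "INSERT INTO mv_features VALUES (?, ?, ?)",
    [(1, 'Water', 'WM'), (1, 'Water', 'HY'), (1, 'Roads', 'RD'),
     (2, 'Sewer', 'SM'), (1, 'Water', 'WM')],
)
result = identifiers_by_system(conn, 1)
```

Binding the community id as a parameter keeps the query safe even if the value ever comes from an unvalidated source.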

post

post(request)

Return data of features

Source code in django_project/civitas/api/community.py
@method_decorator(csrf_exempt)
def post(self, request):
    """ Return data of features """
    pk = request.POST["pk"]
    selected  = request.POST.getlist("selected[]")

    community = get_object_or_404(
        Community, pk=pk
    )

    final_list = []
    for item in selected:
        final_list.append(ReporterData._showcustom(community, item))

    return Response(
        final_list
    )

civitas.api.AssetDataDefaultAPI

Bases: APIView

Return the default asset data view of ReporterData for a community.

get

get(request, pk)

Return data of features

Source code in django_project/civitas/api/community.py
def get(self, request, pk):
    """ Return data of features """
    community = get_object_or_404(
        Community, pk=pk
    )

    return Response(
        ReporterData._showdefault(community)
    )

civitas.api.AssetDataDetailedAPI

Bases: APIView

Return the detailed asset data view of ReporterData for a community.

get

get(request, pk)

Return data of features

Source code in django_project/civitas/api/community.py
def get(self, request, pk):
    """ Return data of features """
    community = get_object_or_404(
        Community, pk=pk
    )

    return Response(
        ReporterData._showdetailed(community)
    )

civitas.api.AssetDataDownloadAPI

Bases: APIView

Return all ReporterData rows of a community, for download.

get

get(request, pk)

Return data of features

Source code in django_project/civitas/api/community.py
def get(self, request, pk):
    """ Return data of features """
    community = get_object_or_404(
        Community, pk=pk
    )

    return Response(
        ReporterData._showall(community)
    )

civitas.api.AssetDataTable

AssetDataTable()

Bases: APIView

Return the ReporterData rows of a community, ten per page.

Source code in django_project/civitas/api/community.py
def __init__(self):
    self.features = None
    self.obj_paginator = None
    self.page_range = None
    self.community_id = None

get

get(request, pk, page_num)

Return data of features

Source code in django_project/civitas/api/community.py
def get(self, request, pk, page_num):
    """ Return data of features """
    self.community_id  = get_object_or_404(
        Community, pk=pk
    )
    #get data
    self.features = ReporterData._showall(self.community_id )
    per_page = 10
    self.obj_paginator = Paginator(self.features, per_page)
    page = self.obj_paginator.page(page_num).object_list
    self.page_range = self.obj_paginator.page_range

    context = {
        'page': list(page),
        'page_range': list(self.page_range)
    }

    return Response(context)
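
The page and page_range values returned by AssetDataTable.get come from Django's Paginator. The arithmetic behind them can be sketched with the standard library alone. The function `paginate` is a hypothetical helper, and it raises ValueError for an out-of-range page, whereas Django's Paginator raises its own exception types.

```python
import math


def paginate(items, page_num, per_page=10):
    """Return the items on 1-based page `page_num` and the list of page numbers."""
    # An empty list still yields one (empty) page.
    num_pages = max(1, math.ceil(len(items) / per_page))
    if not 1 <= page_num <= num_pages:
        raise ValueError(f'page {page_num} is out of range 1..{num_pages}')
    start = (page_num - 1) * per_page
    return {
        'page': items[start:start + per_page],
        'page_range': list(range(1, num_pages + 1)),
    }


# 25 made-up items give three pages; the last page holds the remaining five.
context = paginate(list(range(25)), page_num=3)
```

Because the view calls page() without handling errors, a request for a page number beyond the last page fails rather than returning an empty page.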

civitas.api.AssetSubClassAPI

Bases: APIView

Return the asset sub-classes of a community, with their asset identifiers.

get

get(request, pk)

Return data of features

Source code in django_project/civitas/api/community.py
def get(self, request, pk):
    """ Return data of features """
    community = get_object_or_404(
        Community, pk=pk
    )

    data = []

    # pk is passed as a query parameter rather than formatted into the SQL string
    sql = "SELECT DISTINCT asset_sub_class, asset_identifier FROM mv_features WHERE community_id = %s;"
    with connections[settings.CIVITAS_DATABASE].cursor() as cursor:
        cursor.execute(sql, [community.pk])
        rows = cursor.fetchall()
        for row in rows:
            asset_sub_class = row[0]
            asset_identifier = row[1]
            if not any(d['asset_sub_class'] == asset_sub_class for d in data):
                data.append({"asset_sub_class": asset_sub_class, "asset_identifier": [asset_identifier]})
            else:
                index = [i for i,_ in enumerate(data) if _['asset_sub_class'] == asset_sub_class][0]
                data[index]["asset_identifier"].append(asset_identifier)

    return Response(
        data
    )

civitas.api.Community

Bases: _Administrative

Administrative community

civitas.api.CommunityAPI

Bases: APIView

Return community list

get

get(request)

Return the communities of the requesting user, ordered by name.

Source code in django_project/civitas/api/community.py
def get(self, request):
    """ Return data of features """
    return Response(
        CommunitySerializer(
            request.user.communities.order_by('name'), many=True
        ).data
    )

civitas.api.CommunityAccessPermission

Bases: BasePermission

Check if community is accessible by user

civitas.api.CommunityCOFAPI

Bases: APIView

Return summary_cof of ReporterData

get

get(request, pk)

Return data of features

Source code in django_project/civitas/api/community.py
def get(self, request, pk):
    """ Return data of features """
    community = get_object_or_404(
        Community, pk=pk
    )
    return Response(
        ReporterData.summary_cof(community)
    )

civitas.api.CommunityDetailAPI

Bases: APIView

Return the detail of a community, including its organisation.

get

get(request, pk)

Return data of features

Source code in django_project/civitas/api/community.py
def get(self, request, pk):
    """ Return data of features """
    community = get_object_or_404(
        Community, pk=pk
    )
    detail = CommunityDetailSerializer(community).data
    organisation = community.organisation
    detail['organisation'] = OrganisationSerializer(
        organisation, user=request.user).data
    return Response(detail)

civitas.api.CommunityDetailSerializer

Bases: CommunitySerializer

get_systems

get_systems(obj)

Return system of community

Parameters:

Name  Type       Description  Default
obj   Community               required
Source code in django_project/civitas/serializer/community.py
def get_systems(self, obj):
    """ Return system of community
    :param obj:
    :type obj: Community
    """
    return SystemSerializer(obj.system_set.order_by('name'), many=True).data

civitas.api.CommunityPOFAPI

Bases: APIView

Return summary_pof of ReporterData

get

get(request, pk)

Return data of features

Source code in django_project/civitas/api/community.py
def get(self, request, pk):
    """ Return data of features """
    community = get_object_or_404(
        Community, pk=pk
    )
    return Response(
        ReporterData.summary_pof(community)
    )

civitas.api.CommunityRiskAPI

Bases: APIView

Return summary_risk of ReporterData

get

get(request, pk)

Return data of features

Source code in django_project/civitas/api/community.py
def get(self, request, pk):
    """ Return data of features """
    community = get_object_or_404(
        Community, pk=pk
    )
    return Response(
        ReporterData.summary_risk(community)
    )

civitas.api.CommunitySerializer

Bases: ModelSerializer

get_province

get_province(obj)

Return province of community

Parameters:

Name  Type       Description  Default
obj   Community               required
Source code in django_project/civitas/serializer/community.py
def get_province(self, obj):
    """ Return province of community
    :param obj:
    :type obj: Community
    """
    return obj.region.province.name

get_region

get_region(obj)

Return region of community

Parameters:

Name  Type       Description  Default
obj   Community               required
Source code in django_project/civitas/serializer/community.py
def get_region(self, obj):
    """ Return region of community
    :param obj:
    :type obj: Community
    """
    return obj.region.name

civitas.api.FeatureBase

Bases: Model

Base model for features. Every feature model needs to extend this model.

calculation

calculation()

Return the first calculation record related to the feature, or None if there is none.

Source code in django_project/civitas/models/feature/feature_base.py
def calculation(self):
    """ Return the first related calculation record, or None if there is none"""
    if self.featurecalculation_set.all().count():
        return self.featurecalculation_set.all()[0]
    return None

lifespan

lifespan()

Return lifespan of feature

Source code in django_project/civitas/models/feature/feature_base.py
def lifespan(self):
    """ Return lifespan of feature"""
    if not self.type:
        return None
    return self.type.lifespan

civitas.api.FeatureDataGeoSerializer

Bases: GeoFeatureModelSerializer

get_geometry

get_geometry(obj)

Get geometry

Source code in django_project/civitas/serializer/features.py
def get_geometry(self, obj):
    """ Get geometry
    :type obj: FeatureBase
    """
    try:
        return obj.featuregeometry.geometry()
    except FeatureGeometry.DoesNotExist:
        return None

get_identifier

get_identifier(obj: FeatureBase)

Return the identifier of the object, formatted as <class description>.<sub-class name>.<id>.

Source code in django_project/civitas/serializer/features.py
def get_identifier(self, obj: FeatureBase):
    """ identifier of object
    :type obj: FeatureBase
    """
    return (
        f'{obj.the_class.description}.'
        f'{obj.sub_class.name}.{obj.id}'
    )

to_representation

to_representation(obj: FeatureBase)

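Add extra properties to the serialized feature: location, classification, dates, custom feature properties and ReporterData values.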

Source code in django_project/civitas/serializer/features.py
def to_representation(self, obj: FeatureBase):
    """ Additional properties
    :type obj: FeatureBase
    """
    data = super(FeatureDataGeoSerializer, self).to_representation(obj)
    try:
        reporter_data = ReporterData.objects.get(feature_id=obj.id)
    except ReporterData.DoesNotExist:
        reporter_data = None

    system = obj.system
    community = system.community if system else None
    region = community.region if community else None
    province = region.province if region else None
    data['properties']['feature_id'] = obj.id
    data['properties']['Label'] = obj.display_label
    data['properties']['Province'] = province.name if province else None
    data['properties']['Region'] = region.name if region else None
    data['properties']['Community'] = community.name if community else None
    data['properties']['System'] = system.name if system else None
    data['properties']['Asset Class'] = obj.the_class.description if obj.the_class else None
    data['properties']['Asset Sub Class'] = obj.sub_class.name if obj.sub_class else None
    data['properties']['Condition'] = obj.condition.name if obj.condition else None
    data['properties']['Type'] = obj.type.name if obj.type else None

    data['properties']['Description'] = obj.description
    for feature_property in obj.featureproperty_set.all():
        data['properties'][feature_property.property.name.title()] = feature_property.value_text
    data['properties']['Install Date'] = obj.install_date
    data['properties']['Inspection Date'] = obj.inspection_date

    data['properties']['Lifespan'] = reporter_data.lifespan if reporter_data else None
    data['properties']['Age'] = reporter_data.age if reporter_data else None
    data['properties']['Remaining Years'] = reporter_data.remaining_years if reporter_data else None
    data['properties']['Risk Level'] = reporter_data.risk_level if reporter_data else None
    data['properties']['Probability of Failure'] = reporter_data.pof_name if reporter_data else None
    data['properties']['Consequence of Failure'] = reporter_data.cof_name if reporter_data else None
    data['properties']['Renewal Cost'] = reporter_data.renewal_cost if reporter_data else None
    data['properties']['Annual Reserve'] = reporter_data.annual_reserve if reporter_data else None
    data['properties']['Maintenance Cost'] = reporter_data.maintenance_cost if reporter_data else None
    return data
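
The administrative properties are resolved by walking system, community, region and province, stopping at the first missing link. A rough sketch of that chain in isolation, with `SimpleNamespace` objects standing in for the models; `administrative_names` and the sample names are hypothetical:

```python
from types import SimpleNamespace


def administrative_names(feature):
    """Walk system -> community -> region -> province, tolerating gaps."""
    system = feature.system
    community = system.community if system else None
    region = community.region if community else None
    province = region.province if region else None
    return {
        'Province': province.name if province else None,
        'Region': region.name if region else None,
        'Community': community.name if community else None,
        'System': system.name if system else None,
    }


# Hypothetical stand-ins: one fully linked feature, one without a system.
province = SimpleNamespace(name='P')
region = SimpleNamespace(name='R', province=province)
community = SimpleNamespace(name='C', region=region)
linked = SimpleNamespace(system=SimpleNamespace(name='S', community=community))
orphan = SimpleNamespace(system=None)

print(administrative_names(linked))
print(administrative_names(orphan))
```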

civitas.api.FeatureGeojsonDetailAPI

Bases: APIView

get: Return geojson of a feature

get

get(request, pk)

Return data of features

Source code in django_project/civitas/api/features.py
def get(self, request, pk):
    """ Return data of features """
    feature = get_object_or_404(FeatureBase, pk=pk)
    return Response(
        FeatureGeometryGeoSerializer(feature).data
    )

civitas.api.FeatureGeometryGeoSerializer

Bases: GeoFeatureModelSerializer

get_cof

get_cof(obj)
Source code in django_project/civitas/serializer/features.py
def get_cof(self, obj):
    """
    :type obj: FeatureBase
    """
    return obj.cof.name if obj.cof else '-'

get_condition

get_condition(obj)
Source code in django_project/civitas/serializer/features.py
def get_condition(self, obj):
    """
    :type obj: FeatureBase
    """
    return obj.condition.name if obj.condition else '-'

get_geometry

get_geometry(obj)

Get geometry

Source code in django_project/civitas/serializer/features.py
def get_geometry(self, obj):
    """ Get geometry
    :type obj: FeatureBase
    """
    try:
        return obj.featuregeometry.geometry()
    except FeatureGeometry.DoesNotExist:
        return None

get_pof

get_pof(obj)
Source code in django_project/civitas/serializer/features.py
def get_pof(self, obj):
    """
    :type obj: FeatureBase
    """
    return obj.pof.name if obj.pof else '-'

get_quantity

get_quantity(obj)
Source code in django_project/civitas/serializer/features.py
def get_quantity(self, obj):
    """
    :type obj: FeatureBase
    """
    return '{} {}'.format(obj.quantity, obj.sub_class.unit.name if obj.sub_class else '')

get_sub_class

get_sub_class(obj)
Source code in django_project/civitas/serializer/features.py
def get_sub_class(self, obj):
    """
    :type obj: FeatureBase
    """
    return obj.sub_class.name if obj.sub_class else '-'

get_system

get_system(obj)
Source code in django_project/civitas/serializer/features.py
def get_system(self, obj):
    """
    :type obj: FeatureBase
    """
    return obj.system.name if obj.system else '-'

get_type

get_type(obj)
Source code in django_project/civitas/serializer/features.py
def get_type(self, obj):
    """
    :type obj: FeatureBase
    """
    return obj.type.name if obj.type else '-'

to_representation

to_representation(instance)

Additional properties

Source code in django_project/civitas/serializer/features.py
def to_representation(self, instance):
    """ Additional properties
    :type instance: FeatureBase
    """
    data = super(FeatureGeometryGeoSerializer, self).to_representation(instance)
    data['properties']['class'] = instance.the_class.name if instance.the_class else '-'
    return data

civitas.api.ReporterData

Bases: Model

Materialized View for reported data

by_community staticmethod

by_community(community: Community) -> dict

Return reporter_data by community, limited to the first two rows

Source code in django_project/civitas/models/view/reporter_data.py
@staticmethod
def by_community(community: Community) -> dict:
    """
    Return reporter_data by community, limited to the first two rows
    """
    return ReporterData.objects.filter(
        community_id=community.id
    )[0:2]

by_community_all staticmethod

by_community_all(community: Community) -> dict

Return reporter_data by community

Source code in django_project/civitas/models/view/reporter_data.py
@staticmethod
def by_community_all(community: Community) -> dict:
    """
    Return reporter_data by community
    """
    return ReporterData._showall(community)

dashboard staticmethod

dashboard(community: Community) -> dict

Build the dashboard data for a community

Args: community (Community): community object

Returns: list: one dictionary per cost breakdown

Source code in django_project/civitas/models/view/reporter_data.py
@staticmethod
def dashboard(community: Community) -> dict:
    """Build the dashboard data for a community

    Args:
        community (Community): community object

    Returns:
        list: one dictionary per cost breakdown
    """
    data = ReporterData.objects.filter(
        community_id=community.id
        ).exclude(
            system_name__in=[
                'Non-Municipal Infrastructure',
                'Abandoned Infrastructure'
                ]
        )

    _dashboard = DashboardData(data=data)  

    system_names = data.exclude(system_name=None) \
        .order_by('system_name') \
        .values('system_name') \
        .distinct() \
        .annotate(label=Concat('system_name', models.Value('')))

    ##### Renewal costs of assets #####

    _renewal_cost_of_assets_a = data.exclude(system_name=None) \
        .order_by('system_name') \
        .values('system_name', 'renewal_cost')

    _data_renewal= [{'system_name': val["system_name"], 'renewal_cost': val['renewal_cost']} for val in _renewal_cost_of_assets_a]

    renewal_sum = 0

    qs_renewal = []

    for name in system_names:
        qs_renewal.append({"system_name": name["system_name"], "label": name["label"],"values": 0.0})

    for x in _data_renewal:

        system_name = x["system_name"]

        if str(x["renewal_cost"]) == 'NaN' or not x["renewal_cost"]:
            x["renewal_cost"] = 0.00
        elif x["renewal_cost"]:
            renewal_sum = renewal_sum + float(x["renewal_cost"])
            index = next((index for (index, d) in enumerate(qs_renewal) if d["system_name"] == system_name), None)
            qs_renewal[index]["values"] = float(qs_renewal[index]["values"]) + float(x["renewal_cost"])

    #### End Renewal costs of assets ####

    ##### Maintenance #####

    _maintenance_cost_of_assets = data.exclude(system_name=None) \
        .order_by('system_name') \
        .values('system_name', 'maintenance_cost')

    _data_maintenace = [{'system_name': val["system_name"], 'maintenance_cost': val['maintenance_cost']} for val in _maintenance_cost_of_assets]

    maintenance_sum = 0

    qs_maintenance = []

    for name in system_names:
        qs_maintenance.append({"system_name": name["system_name"], "label": name["label"],"values": 0.0})

    for x in _data_maintenace:

        system_name = x["system_name"]

        if str(x["maintenance_cost"]) == 'NaN' or not x["maintenance_cost"]:
            x["maintenance_cost"] = 0.00
        elif x["maintenance_cost"]:
            maintenance_sum = maintenance_sum + float(x["maintenance_cost"])
            index = next((index for (index, d) in enumerate(qs_maintenance) if d["system_name"] == system_name), None)
            qs_maintenance[index]["values"] = float(qs_maintenance[index]["values"]) + float(x["maintenance_cost"])

    #### End Maintenance ####

    #### Risk Renewal####

    _risk_renewal_of_assets = data.exclude(system_name=None) \
        .order_by('system_name') \
        .values('system_name', 'risk_level', 'renewal_cost')

    _data_renewal_cost_of_assets = [{'system_name': val["system_name"], 'renewal_cost': val['renewal_cost'], 'risk_level': val['risk_level']} \
                                     for val in _risk_renewal_of_assets]

    qs_renewal_cost_of_assets = []

    for risk in _dashboard.risk_level_list:
        for name in system_names:
            qs_renewal_cost_of_assets.append({"system_name": name["system_name"], 
                                              "risk_level": risk["risk_level"], 
                                              "label": name["label"],
                                              "values": 0.0})

    risk_renewal_of_assets_sum = 0

    for x in _data_renewal_cost_of_assets:

        system_name = x["system_name"]

        risk_level = ''
        if not x['risk_level'] is None:
            risk_level = x["risk_level"].strip()
        else:
            risk_level = 'None'

        if str(x["renewal_cost"]) == 'NaN' or not x["renewal_cost"]:
            x["renewal_cost"] = 0.00
        else:
            risk_renewal_of_assets_sum = risk_renewal_of_assets_sum + Decimal(x["renewal_cost"])
        index = next((index for (index, d) in enumerate(qs_renewal_cost_of_assets) \
                       if d["system_name"] == system_name and d["risk_level"] == risk_level), None)
        # index 0 is a valid match, so test against None instead of truthiness
        if index is not None:
            qs_renewal_cost_of_assets[index]["values"] = float(qs_renewal_cost_of_assets[index]["values"]) + float(x["renewal_cost"])

    #### END Risk Renewal####

    #### remaining years renewal cost ####

    _remaining_years_renewal_cost = data.exclude(system_name=None) \
        .order_by('system_name') \
        .values('system_name', 'renewal_cost') 

    #### End remaining years renewal cost ####
    remaining_years_renewal_risk_cost = data.exclude(system_name=None) \
        .order_by('system_name') \
        .values('system_name', 'risk_level') \
        .distinct() \
        .annotate(label=Concat('system_name', models.Value('')), values=Sum('renewal_cost'))

    ###### Annual Reserve  #########
    _annual_reserve = data.exclude(system_name=None) \
        .order_by('system_name') \
        .values('system_name', 'annual_reserve') 

    _data_annual_reserve = [{'system_name': val["system_name"], 'annual_reserve': val['annual_reserve']} for val in _annual_reserve]

    annual_sum = 0

    qs_annual_data = []

    for name in system_names:
        qs_annual_data.append({"system_name": name["system_name"], "label": name["label"],"values": 0.0})

    for x in _data_annual_reserve:

        system_name = x["system_name"]

        annual_reserve = 0
        if str(x["annual_reserve"]) == 'NaN' or not x["annual_reserve"]:
            x["annual_reserve"] = 0.00
            annual_reserve = 0.00
        else:
            annual_reserve = x["annual_reserve"]

        annual_sum = annual_sum + Decimal(annual_reserve)

        index = next((index for (index, d) in enumerate(qs_annual_data) if d["system_name"] == system_name), None)
        qs_annual_data[index]["values"] = float(qs_annual_data[index]["values"]) + float(annual_reserve)

    ###### End Annual Reserve  #########

    qs = [
        {
            'id': 'renewal_cost', 
            "title": "Renewal costs of assets",
            "qs": qs_renewal, 
            'type': 'non_stacked', 
            'sum': {"sum": renewal_sum},
            'description': 'Replacement costs are based on current available costs and include the following components: Capital Costs - 65%, Contingency - 15%, Design - 10%, Inspections and Removal - 10%. As a starting point, a default replacement cost is applied for each asset type.  However, in some cases, where the above general formula is not applicable, or requires significantly less or more effort in one of the above areas, a custom cost might have been applied. This value will override the default value.'
        },              
        {
            'id': 'maintenenance_cost', 
            "title": "Maintenance costs of assets",
            "qs": qs_maintenance, 
            'type': 'non_stacked',
            'sum': {"sum": maintenance_sum},
            'description': 'Maintenance costs are the estimated annual cost to maintain assets. As a starting point, a default value of 10% of the renewal cost is used'
        },   
        {
            'id': 'annual_reserve', 
            "title": "Annual Average Infrastructure Demand",
            "qs": qs_annual_data, 
            'type': 'non_stacked',
            'sum': {'sum': annual_sum},
            'description':  "This graph uses lifespan projections and renewal costs for a long-term outlook of infrastructure. This projection is theoretical and is not a realistic indication of spending timelines. A valuable output of this projection is an annualized infrastructure demand, indicated as a dotted line on the graph. This annualized value is obtained by dividing the renewal cost by the lifespan for each asset in the database and then summing the total. As lifespan and renewal cost data are updated, the annual infrastructure demand will update. The annual infrastructure demand could be lowered by committing to operations and maintenance programs to extend lifespans, deciding to rehabilitate versus replace, and more. The values shown in the graph are based on current $ values, and the actual value of this average annual investment will increase over time with inflation."
        },   
        {
            'id': 'system_risk_renewal', 
            "title": "Risk By System",
            "qs": qs_renewal_cost_of_assets, 
            "formatted": _dashboard.stacked_a(qs_renewal_cost_of_assets),
            "total_bottom": _dashboard.stacked_a_total(_dashboard.stacked_a(qs_renewal_cost_of_assets)),
            'type': 'stacked_a',
            'sum': {"sum": risk_renewal_of_assets_sum},
            'description': 'A risk value is obtained by combining Probability of Failure  (PoF) and Consequence of Failure (CoF) values as per the following matrix. It is common asset management practice to shift the matrix in favour of the consequence of failure,'
        },        
        {
            'id': 'remaining_years_renewal_system', 
            "title": "Remaining Years by Renewal Cost by System",
            "qs": _remaining_years_renewal_cost, 
            "formatted": _dashboard.stacked_b_list(system_names),
            "total": _dashboard.stacked_b_total(_dashboard.stacked_b_list(system_names), system_names=system_names),
            "graph": _dashboard.stacked_b_graph(_dashboard.stacked_b_list(system_names), system_names=system_names),
            "pdf_table_1": _dashboard.pdf_table_1(_dashboard.stacked_b_list(system_names)),
            "pdf_table_2": _dashboard.pdf_table_2(_dashboard.stacked_b_list(system_names)),
            'type': 'stacked_b',
            'system_names': system_names,
            'sum': {"sum": risk_renewal_of_assets_sum},
            'description': ''
        },   
        {
            'id': 'remaining_years_renewal_risk', 
            "title": "Remaining Years by Renewal Cost by Risk",
            "qs": remaining_years_renewal_risk_cost, 
            "formatted":  _dashboard.stacked_c_list(),
            "graph":  _dashboard.stacked_c_graph( _dashboard.stacked_c_list()),
            "total": _dashboard.stacked_c_total( _dashboard.stacked_c_list()),
            "pdf_table_1": _dashboard.pdf_table_1(_dashboard.stacked_c_list()),
            "pdf_table_2": _dashboard.pdf_table_2(_dashboard.stacked_c_list()),
            'type': 'stacked_c',
            'risk_levels': _dashboard.risk_level_list,
            'sum': {"sum": risk_renewal_of_assets_sum},
            'description': ''
        },  
    ]

    return qs
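
The renewal, maintenance and annual-reserve blocks above all follow one pattern: treat missing or NaN costs as zero, add each remaining cost to its system's bucket, and keep a grand total. The sketch below shows that pattern on plain dictionaries, plus the annualized demand formula quoted in the description (renewal cost divided by lifespan, summed over assets). The helper names and the `lifespan` key are assumptions made for the example, not part of the project:

```python
from decimal import Decimal


def is_missing(value):
    """True for None, zero, or a NaN float/Decimal."""
    return not value or str(value).lower() == 'nan'


def sum_by_system(rows, field):
    """Return (per-system totals, grand total) for one cost field."""
    totals = {}
    grand_total = 0.0
    for row in rows:
        name = row['system_name']
        totals.setdefault(name, 0.0)  # every system appears, even with no cost
        if is_missing(row[field]):
            continue
        totals[name] += float(row[field])
        grand_total += float(row[field])
    return totals, grand_total


def annual_demand(rows):
    """Sum renewal_cost / lifespan over all assets with usable data."""
    return sum(
        float(r['renewal_cost']) / float(r['lifespan'])
        for r in rows
        if not is_missing(r['renewal_cost']) and not is_missing(r['lifespan'])
    )


# Made-up rows for illustration only.
rows = [
    {'system_name': 'Water', 'renewal_cost': Decimal('1000'), 'lifespan': 50},
    {'system_name': 'Water', 'renewal_cost': float('nan'), 'lifespan': 50},
    {'system_name': 'Roads', 'renewal_cost': None, 'lifespan': 25},
    {'system_name': 'Roads', 'renewal_cost': 500.0, 'lifespan': 25},
]
print(sum_by_system(rows, 'renewal_cost'))  # ({'Water': 1000.0, 'Roads': 500.0}, 1500.0)
print(annual_demand(rows))                  # 40.0
```

Keying the totals by system name avoids the repeated linear search for the matching list entry, at the cost of a final conversion if the list-of-dicts shape is needed.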

summary_cof staticmethod

summary_cof(community: Community) -> dict

Return summary of cof for specific community. The return will be grouped by class.

Source code in django_project/civitas/models/view/reporter_data.py
@staticmethod
def summary_cof(community: Community) -> dict:
    """
    Return summary of cof for specific community
    The return will be grouped by class
    """

    cof_dict = ReporterData._summary(community, 'cof_name')

    for val in cof_dict:
        for x in val:
            if str(val[x]) == 'nan' or val[x] == 'NaN':
                val[x] = 0
    return cof_dict

summary_pof staticmethod

summary_pof(community: Community) -> dict

Return summary of pof for specific community. The return will be grouped by class.

Source code in django_project/civitas/models/view/reporter_data.py
@staticmethod
def summary_pof(community: Community) -> dict:
    """
    Return summary of pof for specific community
    The return will be grouped by class
    """
    pof_dict = ReporterData._summary(community, 'pof_name')

    for val in pof_dict:
        for x in val:
            if str(val[x]) == 'nan' or val[x] == 'NaN':
                val[x] = 0
    return pof_dict

summary_risk staticmethod

summary_risk(community: Community) -> dict

Return summary of risk for specific community. The return will be grouped by class.

Source code in django_project/civitas/models/view/reporter_data.py
@staticmethod
def summary_risk(community: Community) -> dict:
    """
    Return summary of risk for specific community
    The return will be grouped by class
    """

    risk_dict = ReporterData._summary(community, 'risk_level')

    for val in risk_dict:
        for x in val:
            if str(val[x]) == 'nan' or val[x] == 'NaN':
                val[x] = 0
    return risk_dict
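
summary_cof, summary_pof and summary_risk each repeat the same loop that replaces NaN values with 0. One way to share it is sketched below; `zero_out_nan` is a hypothetical helper and the row keys are invented, since the shape returned by `_summary` is not shown here. Note that this version lowercases the string form, so it also catches `Decimal('NaN')`, which the original comparison does not match:

```python
from decimal import Decimal


def zero_out_nan(rows):
    """Replace NaN values with 0 in a list of dicts, in place, and return it."""
    for row in rows:
        for key, value in row.items():
            # float('nan') stringifies to 'nan', Decimal('NaN') to 'NaN'
            if str(value).lower() == 'nan':
                row[key] = 0
    return rows


# Made-up summary row for illustration only.
summary = [{'class': 'Roads', 'High': float('nan'), 'Low': Decimal('NaN'), 'Medium': 3}]
print(zero_out_nan(summary))
```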

civitas.api.ReporterDataDownloadAPI

Bases: APIView

Download csv file for the reporter data

get

get(request, pk)

Return data of features

Source code in django_project/civitas/api/community.py
def get(self, request, pk):
    """ Return data of features """
    community = get_object_or_404(
        Community, pk=pk
    )
    response = HttpResponse(content_type='text/csv')
    response['Content-Disposition'] = f'attachment; filename="{community.name} Reporter Data.csv"'

    data = ReporterDataSerializer(ReporterData.by_community(community), many=True).data
    header = ReporterDataSerializer.Meta.fields

    writer = csv.DictWriter(response, fieldnames=header)
    writer.writeheader()
    for row in data:
        writer.writerow(row)

    return response
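
The view writes serialized rows straight into the HTTP response with `csv.DictWriter`. The same writing step can be tried without Django by targeting an in-memory buffer; `rows_to_csv` and the field names below are assumptions for the example:

```python
import csv
import io


def rows_to_csv(rows, fieldnames):
    """Write a header plus one line per row and return the CSV text."""
    buffer = io.StringIO()
    writer = csv.DictWriter(buffer, fieldnames=fieldnames)
    writer.writeheader()
    writer.writerows(rows)
    return buffer.getvalue()


# Made-up row for illustration only.
text = rows_to_csv(
    [{'feature_id': 1, 'system_name': 'Water'}],
    fieldnames=['feature_id', 'system_name'],
)
print(text)
```

With the default settings, `DictWriter` raises ValueError when a row contains a key that is not listed in `fieldnames`, so the header taken from the serializer's Meta.fields has to cover every serialized key.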

civitas.api.ReporterDataGeojsonDetailAPI

Bases: APIView

get: Return geojson of a feature

get

get(request, pk)

Return data of features

Source code in django_project/civitas/api/features.py
def get(self, request, pk):
    """ Return data of features """
    feature = get_object_or_404(FeatureBase, pk=pk)
    return Response(
        FeatureDataGeoSerializer(feature).data
    )

civitas.api.SystemNameAPI

Bases: APIView

Return the system names of a community, each with its asset identifiers

get

get(request, pk)

Return data of features

Source code in django_project/civitas/api/community.py
def get(self, request, pk):
    """ Return data of features """
    community = get_object_or_404(
        Community, pk=pk
    )

    data = []

    # Pass the community id as a query parameter instead of formatting it into the SQL
    sql = "SELECT DISTINCT system_name, asset_identifier FROM mv_features WHERE community_id = %s;"
    with connections[settings.CIVITAS_DATABASE].cursor() as cursor:
        cursor.execute(sql, [community.pk])
        rows = cursor.fetchall()
        for row in rows:
            system_name = row[0]
            asset_identifier = row[1]
            if not any(d['system_name'] == system_name for d in data):
                data.append({"system_name": system_name, "asset_identifier": [asset_identifier]})
            else:
                index = [i for i,_ in enumerate(data) if _['system_name'] == system_name][0]
                data[index]["asset_identifier"].append(asset_identifier)

    return Response(
        data
    )
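
The loop over the fetched rows groups asset identifiers under their system name. A dictionary keyed by system name gives the same output shape without searching the list on every row. This is only a sketch on plain tuples; `group_asset_identifiers` and the sample identifiers are made up:

```python
def group_asset_identifiers(rows):
    """Group (system_name, asset_identifier) tuples, keeping first-seen order."""
    grouped = {}
    for system_name, asset_identifier in rows:
        grouped.setdefault(system_name, []).append(asset_identifier)
    return [
        {'system_name': name, 'asset_identifier': identifiers}
        for name, identifiers in grouped.items()
    ]


# Made-up rows standing in for cursor.fetchall().
rows = [('Water', 'WTR-1'), ('Roads', 'RD-1'), ('Water', 'WTR-2')]
print(group_asset_identifiers(rows))
```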

Migrations

civitas.migrations

Models

civitas.models

civitas.models.AimsoirFeatureCode

Bases: Model

AimsoirFeatureCode

civitas.models.COF

Bases: Model

Consequence of Failure

civitas.models.CapitalProject

Bases: Model

Capital project object

civitas.models.CapitalProjectFeatureCombination

Bases: Model

Capital project feature combination object

civitas.models.CivicAddress

Bases: Model

Civic Address object

civitas.models.Community

Bases: _Administrative

Administrative community

civitas.models.Condition

Bases: _Term

Condition of feature: 1 very good, 2 good, 3 fair, 4 poor, 5 very poor

civitas.models.DashboardData

DashboardData(data)
Source code in django_project/civitas/models/view/dashboard_data.py
def __init__(self, data):
    self.data = data

    self.risk_level_list = [
        {"risk_level":"Extreme", "index": 5}, 
        {"risk_level":"High" , "index": 4}, 
        {"risk_level":"Medium" , "index": 3}, 
        {"risk_level":"Low", "index": 2}, 
        {"risk_level":"Minimal", "index": 1}, 
        {"risk_level":"None", "index": 0}
    ]

stacked_a

stacked_a(risk_renewal_of_assets)

risk_renewal_assets_list

Returns: formatted list to be used correctly in table

Source code in django_project/civitas/models/view/dashboard_data.py
def stacked_a(self, risk_renewal_of_assets):
    """risk_renewal_assets_list

    Returns:
        formatted list to be used correctly in table
    """
    risk_renewal_formatted_list = []
    total_sum = 0
    for k in list(risk_renewal_of_assets):
        if k["values"] is not None:
            values = k["values"]
        else:
            values = 0

        if k not in risk_renewal_formatted_list:
            risk_renewal_formatted_list\
                .append({
                    'system_name': k["system_name"], 
                    "risk": [], 
                    "total_value": 0, 
                    "total_per": 0
                })
            total_sum = float(total_sum) + float(values)

    final_risk_renewal = [i for j, i in enumerate(risk_renewal_formatted_list) if i not in risk_renewal_formatted_list[:j]]

    for val in list(risk_renewal_of_assets):
        for key in final_risk_renewal:
            if key['system_name'] == val["system_name"]:
                value = val['risk_level']

                if str(val["values"] ) == "NaN" or str(val["values"] ) == "nan":
                    _values = 0

                elif val["values"] is not None:
                    _values = val["values"]

                else:
                    _values = 0

                key["risk"].append({"risk_level": value, "values": _values})
                key["total_value"] = float(key["total_value"]) + float(_values)
                # avoid ZeroDivisionError when every value is zero
                key["total_per"] = (float(key["total_value"]) / total_sum) * 100 if total_sum else 0

    return final_risk_renewal
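
stacked_a turns a flat list of per-system, per-risk values into one entry per system with its risk breakdown, total and share of the overall sum. The sketch below reproduces that output shape on plain dictionaries. `stack_by_system` is a hypothetical name, and the sketch assumes NaN values were already replaced by 0 upstream:

```python
def stack_by_system(entries):
    """Group risk values per system and compute each system's share in percent."""
    total_sum = sum(float(e['values'] or 0) for e in entries)
    systems = {}
    for e in entries:
        value = float(e['values'] or 0)
        system = systems.setdefault(e['system_name'], {
            'system_name': e['system_name'],
            'risk': [],
            'total_value': 0.0,
            'total_per': 0.0,
        })
        system['risk'].append({'risk_level': e['risk_level'], 'values': value})
        system['total_value'] += value
    for system in systems.values():
        # Guard against an all-zero input
        system['total_per'] = system['total_value'] / total_sum * 100 if total_sum else 0.0
    return list(systems.values())


# Made-up input for illustration only.
entries = [
    {'system_name': 'Roads', 'risk_level': 'High', 'values': 200.0},
    {'system_name': 'Roads', 'risk_level': 'Low', 'values': 50.0},
    {'system_name': 'Water', 'risk_level': 'High', 'values': 750.0},
]
for row in stack_by_system(entries):
    print(row['system_name'], row['total_value'], row['total_per'])
```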

stacked_b_graph

stacked_b_graph(input_list, system_names)

remaining_years_renewal_system

Returns: formatted list to be used correctly in graph

Source code in django_project/civitas/models/view/dashboard_data.py
def stacked_b_graph(self, input_list, system_names):
    """remaining_years_renewal_system

    Returns:
        formatted list to be used correctly in graph
    """
    formatted_list = []   

    # start with basic list
    for system in system_names:
        formatted_list\
            .append({'name': system["system_name"], 
                               'type': 'bar', 
                               'x': [], 
                               'y': []}
                            )

    # entry/series avoid shadowing the built-ins input and list
    for entry in input_list:
        for series in formatted_list:
            for asset in entry['asset']:
                if series['name'] == asset['system_name']:
                    series["x"].append(entry["remaining_years"])
                    if isinstance(asset["values"], (float, Decimal)):
                        series["y"].append(float(asset["values"]))

    return formatted_list

stacked_b_list

stacked_b_list(system_names)

remaining_years_renewal_system

Returns: formatted list to be used correctly in table and graph

Source code in django_project/civitas/models/view/dashboard_data.py
def stacked_b_list(self, system_names):
    """remaining_years_renewal_system

    Returns:
        formatted list to be used correctly in table and graph
    """

    # find lowest value
    # lowest = (min(data, key=lambda x:x['remaining_years']))

    # create interval of 5 years
    remaining_years_formatted_list = []
    for x in range(95, -6, -5):
        remaining_years_formatted_list.append({
            'remaining_years': f"{x} - {x+4}", 
            'asset': [], 
            'total': 0
        })

    # append data to list for interval
    for k in self.data:
        for val in remaining_years_formatted_list:
            str_split = val["remaining_years"].split(" - ")
            min = int(str_split[0])
            max = int(str_split[1])
            if k.remaining_years is not None:
                remaining_years = k.remaining_years 
            else:
                remaining_years = 0

            if min <= remaining_years <= max:

                if str(k.renewal_cost) == "NaN":
                    renewal_cost = 0

                elif k.renewal_cost is not None :
                    renewal_cost = k.renewal_cost
                else:
                    renewal_cost = 0

                if not any(d['system_name'] == k.system_name for d in val['asset']):
                    val["asset"]\
                    .append({
                        'system_name': k.system_name, 
                        'values': renewal_cost
                    })
                    val['total'] = val['total'] + renewal_cost
                else:
                    index = [i for i,_ in enumerate(val['asset']) if _['system_name'] == k.system_name][0]
                    val['asset'][index]['values'] = val['asset'][index]['values'] + renewal_cost
                    val['total'] = val['total'] + renewal_cost

    # add system names where value is 0 for graph to work correctly
    for k in remaining_years_formatted_list:
        for system in system_names:
            if not any(d['system_name'] == system["system_name"] for d in k["asset"]):
                k["asset"]\
                    .append({
                        'system_name': system["system_name"], 
                        'values': 0.00
                    })

    # sort list alphabetically
    for k in remaining_years_formatted_list:
        new_list = sorted(k['asset'], key=operator.itemgetter('system_name'))     
        k['asset'] = new_list

    return reversed(remaining_years_formatted_list)
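
The five-year intervals come from `range(95, -6, -5)`, which yields labels from "95 - 99" down to "-5 - -1"; each record is then matched against the integer bounds parsed back out of the label. A small sketch of the binning on its own, with hypothetical helper names:

```python
def five_year_bins():
    """Labels in the order the loop creates them: '95 - 99' first, '-5 - -1' last."""
    return [f"{start} - {start + 4}" for start in range(95, -6, -5)]


def bin_for(remaining_years, bins):
    """Return the label whose inclusive bounds contain remaining_years, else None."""
    for label in bins:
        low, high = (int(part) for part in label.split(" - "))
        if low <= remaining_years <= high:
            return label
    return None


bins = five_year_bins()
print(len(bins), bins[0], bins[-1])
print(bin_for(12, bins), bin_for(-3, bins), bin_for(100, bins), bin_for(4.5, bins))
```

Because the bounds are inclusive integers, values above 99, below -5, or fractional values that fall between two intervals (such as 4.5) match no interval and are left out of the result.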

stacked_c_graph

stacked_c_graph(input_list)

remaining_years_renewal_risk

Returns: formatted list to be used correctly in graph

Source code in django_project/civitas/models/view/dashboard_data.py
def stacked_c_graph(self, input_list):
    """remaining_years_renewal_risk

    Returns:
        formatted list to be used correctly in graph
    """
    formatted_list = []   

    # start with basic list
    for risk in self.risk_level_list:
        formatted_list\
            .append({
                'name': risk["risk_level"], 
                'type': 'bar', 'x': [], 'y': []
            })

    # entry/series avoid shadowing the built-ins input and list
    for entry in input_list:
        for asset in entry['risk_level']:
            for series in formatted_list:

                if series['name'] == asset['level']:
                    series["x"].append(entry["remaining_years"])
                    series["y"].append(asset["values"])

    return formatted_list

stacked_c_list

stacked_c_list()

remaining_years_renewal_risk

Returns: formatted list to be used correctly in table and graph

Source code in django_project/civitas/models/view/dashboard_data.py
def stacked_c_list(self):
        """remaining_years_renewal_risk

        Returns:
            formatted list to be used correctly in table and graph
        """
        # create interval of 5 years
        remaining_years_formatted_list = []
        for x in range(95, -6, -5):
            remaining_years_formatted_list.append({
                'remaining_years': f"{x} - {x+4}", 
                'risk_level': [], 
                'total': 0
            })

        # append data to list for interval
        for k in self.data:
            for val in remaining_years_formatted_list:
                str_split = val["remaining_years"].split(" - ")
                min = int(str_split[0])
                max = int(str_split[1])

                if k.remaining_years is not None:
                    remaining_years = k.remaining_years 
                else:
                    remaining_years = 0

                if min <= remaining_years <= max:
                    if k.risk_level is None:
                            risk_lev = "None"
                    else:
                        risk_lev = k.risk_level.strip()

                    risk_level_index = [i for i,_ in enumerate(self.risk_level_list) if _['risk_level'] == risk_lev][0]

                    if not any(d['level'] == risk_lev for d in val['risk_level']):
                        if str(k.renewal_cost) == 'NaN' or k.renewal_cost is None:
                            # NaN or missing cost: add a zero entry and keep the
                            # running total of the interval unchanged
                            val["risk_level"]\
                            .append({'level': risk_lev, 
                                    'values': 0.00, 
                                    'index': 0
                                    })
                        else:
                            val["risk_level"]\
                                .append({
                                    'level': risk_lev, 
                                    'values': k.renewal_cost, 
                                    'index': self.risk_level_list[risk_level_index]['index']
                                })
                            val['total'] = float(val['total']) + float(k.renewal_cost)
                    else:
                        index = [i for i,_ in enumerate(val['risk_level']) if _['level'] == risk_lev][0]
                        if str(k.renewal_cost) == 'NaN':
                            val['risk_level'][index]['values'] = float(val['risk_level'][index]['values']) + float(0)
                            val['total'] = float(val['total']) + float(0)
                        elif k.renewal_cost is not None:
                            val['risk_level'][index]['values'] = float(val['risk_level'][index]['values']) + float(k.renewal_cost)
                            val['total'] = float(val['total']) + float(k.renewal_cost)

        # add risk levels where value is 0 for graph to work correctly
        for k in remaining_years_formatted_list:
            for lev in self.risk_level_list:
                if not any(d['level'] == lev["risk_level"] for d in k["risk_level"]):
                    risk_level_index = [i for i,_ in enumerate(self.risk_level_list) if _['risk_level'] == lev["risk_level"]][0]
                    k["risk_level"]\
                        .append({
                            'level': lev["risk_level"], 
                            'values': 0.00, 
                            'index': self.risk_level_list[risk_level_index]['index']
                        })

        # sort list from highest to lowest
        for k in remaining_years_formatted_list:
            new_list = sorted(k['risk_level'], key=operator.itemgetter('index'))     
            k['risk_level'] = reversed(new_list)

        return reversed(remaining_years_formatted_list)
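
The zero-fill and sort steps at the end of the listing above can be sketched in isolation as follows. This is a minimal illustration, not the project's code: `RISK_LEVELS`, its labels and indexes, and the `fill_and_sort` helper are hypothetical stand-ins for `self.risk_level_list` and one formatted row.

```python
import operator

# Hypothetical stand-in for self.risk_level_list (labels and indexes are assumed).
RISK_LEVELS = [
    {"risk_level": "Low", "index": 1},
    {"risk_level": "Medium", "index": 2},
    {"risk_level": "High", "index": 3},
]


def fill_and_sort(row, risk_levels):
    """Add zero-valued entries for missing risk levels, then sort high to low."""
    present = {entry["level"] for entry in row["risk_level"]}
    for lev in risk_levels:
        if lev["risk_level"] not in present:
            row["risk_level"].append(
                {"level": lev["risk_level"], "values": 0.0, "index": lev["index"]}
            )
    # sorted(..., reverse=True) returns a list, so it can be iterated repeatedly,
    # unlike the one-shot iterator that reversed() returns.
    row["risk_level"] = sorted(
        row["risk_level"], key=operator.itemgetter("index"), reverse=True
    )
    return row


row = {"total": 100.0, "risk_level": [{"level": "Medium", "values": 100.0, "index": 2}]}
result = fill_and_sort(row, RISK_LEVELS)
print([entry["level"] for entry in result["risk_level"]])  # ['High', 'Medium', 'Low']
```

Returning a list rather than a `reversed` iterator is a design choice of this sketch; it matters only if the caller walks the result more than once.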

civitas.models.Deterioration

Bases: _Term

The process of becoming progressively worse. Contains name and the equation

civitas.models.FeatureBase

Bases: Model

Model for base feature. Every feature needs to override this model

calculation

calculation()

Return the feature's first calculation record, which holds all calculated fields, or None if the feature has none

Source code in django_project/civitas/models/feature/feature_base.py
def calculation(self):
    """ Return the first calculation record of the feature, or None"""
    if self.featurecalculation_set.all().count():
        return self.featurecalculation_set.all()[0]
    return None

lifespan

lifespan()

Return lifespan of feature

Source code in django_project/civitas/models/feature/feature_base.py
def lifespan(self):
    """ Return lifespan of feature"""
    if not self.type:
        return None
    return self.type.lifespan

civitas.models.FeatureClass

Bases: _Term

The first level of the asset hierarchy as defined in the "Background" sheet, e.g. TRN = Transportation

civitas.models.FeatureGeometry

Bases: Model

Geometry of feature base

geometry

geometry()

Return the geometry of the feature: the point if set, otherwise the line, otherwise the polygon

Source code in django_project/civitas/models/feature/feature_geometry.py
def geometry(self):
    """ return geometry of feature """
    if self.geom_point:
        return self.geom_point
    if self.geom_line:
        return self.geom_line
    if self.geom_polygon:
        return self.geom_polygon
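
The point, line, polygon precedence used by `geometry()` can be sketched without Django as below. `GeometryHolder` is a hypothetical stand-in for `FeatureGeometry`, and plain strings stand in for geometry objects; only the fallback order is taken from the listing above.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class GeometryHolder:
    """Hypothetical stand-in for FeatureGeometry; strings replace geometry objects."""

    geom_point: Optional[str] = None
    geom_line: Optional[str] = None
    geom_polygon: Optional[str] = None

    def geometry(self) -> Optional[str]:
        # Same precedence as the method above: point, then line, then polygon.
        # The trailing "or None" makes the empty case explicit.
        return self.geom_point or self.geom_line or self.geom_polygon or None


line_only = GeometryHolder(geom_line="LINESTRING(0 0, 1 1)")
both = GeometryHolder(geom_point="POINT(0 0)", geom_polygon="POLYGON((0 0, 1 0, 1 1, 0 0))")
print(line_only.geometry())
print(both.geometry())
print(GeometryHolder().geometry())
```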

civitas.models.FeatureProperty

Bases: Model

An additional property of the feature

civitas.models.FeatureSubClass

Bases: _Term

The second level of the asset hierarchy as defined in the "Background" sheet, e.g. RD = Roads. It is linked with AssetClass

civitas.models.FeatureType

Bases: _Term

The third Level of the Asset Hierarchy as defined in "Background" Sheet

civitas.models.FeatureTypeCombination

Bases: _Term

Combination between class, subclass and type

civitas.models.POF

Bases: Model

Probability of Failure

civitas.models.Property

Bases: Model

A property of a feature

civitas.models.Province

Bases: _Administrative

Administrative province

civitas.models.Region

Bases: _Administrative

Administrative region

civitas.models.ReporterData

Bases: Model

Materialized View for reported data

by_community staticmethod

by_community(community: Community) -> dict

Return reporter_data by community, limited to the first two records

Source code in django_project/civitas/models/view/reporter_data.py
@staticmethod
def by_community(community: Community) -> dict:
    """
    Return reporter_data by community
    """
    return ReporterData.objects.filter(
        community_id=community.id
    )[0:2]

by_community_all staticmethod

by_community_all(community: Community) -> dict

Return reporter_data by community

Source code in django_project/civitas/models/view/reporter_data.py
@staticmethod
def by_community_all(community: Community) -> dict:
    """
    Return reporter_data by community
    """
    return ReporterData._showall(community)

dashboard staticmethod

dashboard(community: Community) -> dict

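Build the dashboard data for a community.

Args: community (Community): community object

Returns: list: one dictionary per cost breakdown (renewal cost, maintenance cost, annual reserve, risk by system, and the two remaining-years breakdowns)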

Source code in django_project/civitas/models/view/reporter_data.py
@staticmethod
def dashboard(community: Community) -> dict:
    """Build the dashboard data for a community.

    Args:
        community (Community): community object

    Returns:
        array: dictionary of each cost breakdown
    """
    data = ReporterData.objects.filter(
        community_id=community.id
        ).exclude(
            system_name__in=[
                'Non-Municipal Infrastructure',
                'Abandoned Infrastructure'
                ]
        )

    _dashboard = DashboardData(data=data)  

    system_names = data.exclude(system_name=None) \
        .order_by('system_name') \
        .values('system_name') \
        .distinct() \
        .annotate(label=Concat('system_name', models.Value('')))

    ##### Renewal costs of assets #####

    _renewal_cost_of_assets_a = data.exclude(system_name=None) \
        .order_by('system_name') \
        .values('system_name', 'renewal_cost')

    _data_renewal = [{'system_name': val["system_name"], 'renewal_cost': val['renewal_cost']} for val in _renewal_cost_of_assets_a]

    renewal_sum = 0

    qs_renewal = []

    for name in system_names:
        qs_renewal.append({"system_name": name["system_name"], "label": name["label"],"values": 0.0})

    for x in _data_renewal:

        system_name = x["system_name"]

        if str(x["renewal_cost"]) == 'NaN' or not x["renewal_cost"]:
            x["renewal_cost"] = 0.00
        elif x["renewal_cost"]:
            renewal_sum = renewal_sum + float(x["renewal_cost"])
            index = next((index for (index, d) in enumerate(qs_renewal) if d["system_name"] == system_name), None)
            qs_renewal[index]["values"] = float(qs_renewal[index]["values"]) + float(x["renewal_cost"])

    #### End Renewal costs of assets ####

    ##### Maintenance #####

    _maintenance_cost_of_assets = data.exclude(system_name=None) \
        .order_by('system_name') \
        .values('system_name', 'maintenance_cost')

    _data_maintenance = [{'system_name': val["system_name"], 'maintenance_cost': val['maintenance_cost']} for val in _maintenance_cost_of_assets]

    maintenance_sum = 0

    qs_maintenance = []

    for name in system_names:
        qs_maintenance.append({"system_name": name["system_name"], "label": name["label"],"values": 0.0})

    for x in _data_maintenance:

        system_name = x["system_name"]

        if str(x["maintenance_cost"]) == 'NaN' or not x["maintenance_cost"]:
            x["maintenance_cost"] = 0.00
        elif x["maintenance_cost"]:
            maintenance_sum = maintenance_sum + float(x["maintenance_cost"])
            index = next((index for (index, d) in enumerate(qs_maintenance) if d["system_name"] == system_name), None)
            qs_maintenance[index]["values"] = float(qs_maintenance[index]["values"]) + float(x["maintenance_cost"])

    #### End Maintenance ####

    #### Risk Renewal####

    _risk_renewal_of_assets = data.exclude(system_name=None) \
        .order_by('system_name') \
        .values('system_name', 'risk_level', 'renewal_cost')

    _data_renewal_cost_of_assets = [{'system_name': val["system_name"], 'renewal_cost': val['renewal_cost'], 'risk_level': val['risk_level']} \
                                     for val in _risk_renewal_of_assets]

    qs_renewal_cost_of_assets = []

    for risk in _dashboard.risk_level_list:
        for name in system_names:
            qs_renewal_cost_of_assets.append({"system_name": name["system_name"], 
                                              "risk_level": risk["risk_level"], 
                                              "label": name["label"],
                                              "values": 0.0})

    risk_renewal_of_assets_sum = 0

    for x in _data_renewal_cost_of_assets:

        system_name = x["system_name"]

        risk_level = ''
        if x['risk_level'] is not None:
            risk_level = x["risk_level"].strip()
        else:
            risk_level = 'None'

        if str(x["renewal_cost"]) == 'NaN' or not x["renewal_cost"]:
            x["renewal_cost"] = 0.00
        else:
            risk_renewal_of_assets_sum = risk_renewal_of_assets_sum + Decimal(x["renewal_cost"])
        index = next((index for (index, d) in enumerate(qs_renewal_cost_of_assets) \
                       if d["system_name"] == system_name and d["risk_level"] == risk_level), None)
        # compare against None: index 0 is a valid position but is falsy
        if index is not None:
            qs_renewal_cost_of_assets[index]["values"] = float(qs_renewal_cost_of_assets[index]["values"]) + float(x["renewal_cost"])

    #### END Risk Renewal####

    #### remaining years renewal cost ####

    _remaining_years_renewal_cost = data.exclude(system_name=None) \
        .order_by('system_name') \
        .values('system_name', 'renewal_cost') 

    #### End remaining years renewal cost ####
    remaining_years_renewal_risk_cost = data.exclude(system_name=None) \
        .order_by('system_name') \
        .values('system_name', 'risk_level') \
        .distinct() \
        .annotate(label=Concat('system_name', models.Value('')), values=Sum('renewal_cost'))

    ###### Annual Reserve  #########
    _annual_reserve = data.exclude(system_name=None) \
        .order_by('system_name') \
        .values('system_name', 'annual_reserve') 

    _data_annual_reserve = [{'system_name': val["system_name"], 'annual_reserve': val['annual_reserve']} for val in _annual_reserve]

    annual_sum = 0

    qs_annual_data = []

    for name in system_names:
        qs_annual_data.append({"system_name": name["system_name"], "label": name["label"],"values": 0.0})

    for x in _data_annual_reserve:

        system_name = x["system_name"]

        annual_reserve = 0
        if str(x["annual_reserve"]) == 'NaN' or not x["annual_reserve"]:
            x["annual_reserve"] = 0.00
            annual_reserve = 0.00
        else:
            annual_reserve = x["annual_reserve"]

        annual_sum = annual_sum + Decimal(annual_reserve)

        index = next((index for (index, d) in enumerate(qs_annual_data) if d["system_name"] == system_name), None)
        qs_annual_data[index]["values"] = float(qs_annual_data[index]["values"]) + float(annual_reserve)

    ###### End Annual Reserve  #########

    qs = [
        {
            'id': 'renewal_cost', 
            "title": "Renewal costs of assets",
            "qs": qs_renewal, 
            'type': 'non_stacked', 
            'sum': {"sum": renewal_sum},
            'description': 'Replacement costs are based on current available costs and include the following components: Capital Costs - 65%, Contingency - 15%, Design - 10%, Inspections and Removal - 10%. As a starting point, a default replacement cost is applied for each asset type.  However, in some cases, where the above general formula is not applicable, or requires significantly less or more effort in one of the above areas, a custom cost might have been applied. This value will override the default value.'
        },              
        {
            'id': 'maintenenance_cost', 
            "title": "Maintenance costs of assets",
            "qs": qs_maintenance, 
            'type': 'non_stacked',
            'sum': {"sum": maintenance_sum},
            'description': 'Maintenance costs are the estimated annual cost to maintain assets. As a starting point, a default value of 10% of the renewal cost is used'
        },   
        {
            'id': 'annual_reserve', 
            "title": "Annual Average Infrastructure Demand",
            "qs": qs_annual_data, 
            'type': 'non_stacked',
            'sum': {'sum': annual_sum},
            'description': "This graph uses lifespan projections and renewal costs for a long-term outlook of infrastructure. This projection is theoretical and is not a realistic indication of spending timelines. A valuable output of this projection is an annualized infrastructure demand, indicated as a dotted line on the graph. This annualized value is obtained by dividing the renewal cost by the lifespan for each asset in the database and then summing the total. As lifespan and renewal cost data are updated, the annual infrastructure demand will update. The annual infrastructure demand could be lowered by committing to operations and maintenance programs to extend lifespans, deciding to rehabilitate versus replace, and more. The values shown in the graph are based on current $ values, and the actual value of this average annual investment will increase over time with inflation."
        },   
        {
            'id': 'system_risk_renewal', 
            "title": "Risk By System",
            "qs": qs_renewal_cost_of_assets, 
            "formatted": _dashboard.stacked_a(qs_renewal_cost_of_assets),
            "total_bottom": _dashboard.stacked_a_total(_dashboard.stacked_a(qs_renewal_cost_of_assets)),
            'type': 'stacked_a',
            'sum': {"sum": risk_renewal_of_assets_sum},
            'description': 'A risk value is obtained by combining Probability of Failure  (PoF) and Consequence of Failure (CoF) values as per the following matrix. It is common asset management practice to shift the matrix in favour of the consequence of failure,'
        },        
        {
            'id': 'remaining_years_renewal_system', 
            "title": "Remaining Years by Renewal Cost by System",
            "qs": _remaining_years_renewal_cost, 
            "formatted": _dashboard.stacked_b_list(system_names),
            "total": _dashboard.stacked_b_total(_dashboard.stacked_b_list(system_names), system_names=system_names),
            "graph": _dashboard.stacked_b_graph(_dashboard.stacked_b_list(system_names), system_names=system_names),
            "pdf_table_1": _dashboard.pdf_table_1(_dashboard.stacked_b_list(system_names)),
            "pdf_table_2": _dashboard.pdf_table_2(_dashboard.stacked_b_list(system_names)),
            'type': 'stacked_b',
            'system_names': system_names,
            'sum': {"sum": risk_renewal_of_assets_sum},
            'description': ''
        },   
        {
            'id': 'remaining_years_renewal_risk', 
            "title": "Remaining Years by Renewal Cost by Risk",
            "qs": remaining_years_renewal_risk_cost, 
            "formatted":  _dashboard.stacked_c_list(),
            "graph":  _dashboard.stacked_c_graph( _dashboard.stacked_c_list()),
            "total": _dashboard.stacked_c_total( _dashboard.stacked_c_list()),
            "pdf_table_1": _dashboard.pdf_table_1(_dashboard.stacked_c_list()),
            "pdf_table_2": _dashboard.pdf_table_2(_dashboard.stacked_c_list()),
            'type': 'stacked_c',
            'risk_levels': _dashboard.risk_level_list,
            'sum': {"sum": risk_renewal_of_assets_sum},
            'description': ''
        },  
    ]

    return qs
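
The renewal, maintenance and annual-reserve sections of `dashboard` all follow the same pattern: treat missing or NaN costs as zero and add the rest to a per-system total. A minimal sketch of that pattern, keyed by a dictionary instead of a list-index lookup, might look like this. The helper names `to_number` and `sum_by_system` and the sample rows are assumptions for illustration, not part of the project.

```python
import math
from collections import defaultdict


def to_number(value):
    """Return value as a float, treating None, '' and NaN as 0.0."""
    if value is None or value == "":
        return 0.0
    number = float(value)
    return 0.0 if math.isnan(number) else number


def sum_by_system(rows, field):
    """Sum `field` per system_name; returns (per-system totals, grand total)."""
    totals = defaultdict(float)
    for row in rows:
        totals[row["system_name"]] += to_number(row[field])
    return dict(totals), sum(totals.values())


# Hypothetical rows shaped like data.values('system_name', 'renewal_cost').
rows = [
    {"system_name": "Roads", "renewal_cost": 1000},
    {"system_name": "Roads", "renewal_cost": float("nan")},
    {"system_name": "Water", "renewal_cost": None},
    {"system_name": "Water", "renewal_cost": "250.5"},
]
per_system, grand_total = sum_by_system(rows, "renewal_cost")
print(per_system)   # {'Roads': 1000.0, 'Water': 250.5}
print(grand_total)  # 1250.5
```

A dictionary lookup avoids scanning the result list for every row and sidesteps the falsy index 0 case that a list-index search has to guard against.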

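The "Annual Average Infrastructure Demand" description in `dashboard` defines the annualized value as the renewal cost divided by the lifespan for each asset, summed over all assets. A small worked example of that formula follows. The function name, the sample figures and the choice to skip assets without a usable lifespan are assumptions of this sketch.

```python
def annual_infrastructure_demand(assets):
    """Sum renewal_cost / lifespan over assets that have a positive lifespan."""
    total = 0.0
    for asset in assets:
        lifespan = asset.get("lifespan")
        if not lifespan or lifespan <= 0:
            continue  # Assumption: assets without a usable lifespan are skipped.
        total += asset["renewal_cost"] / lifespan
    return total


# Hypothetical assets; costs are in current dollars, lifespans in years.
assets = [
    {"renewal_cost": 500_000.0, "lifespan": 50},   # 10,000 per year
    {"renewal_cost": 120_000.0, "lifespan": 20},   # 6,000 per year
    {"renewal_cost": 30_000.0, "lifespan": None},  # skipped
]
print(annual_infrastructure_demand(assets))  # 16000.0
```
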
summary_cof staticmethod

summary_cof(community: Community) -> dict

Return summary of cof for specific community. The return will be grouped by class.

Source code in django_project/civitas/models/view/reporter_data.py
@staticmethod
def summary_cof(community: Community) -> dict:
    """
    Return summary of cof for specific community
    The return will be grouped by class
    """

    cof_dict = ReporterData._summary(community, 'cof_name')

    for val in cof_dict:
        for x in val:
            if str(val[x]) == 'nan' or val[x] == 'NaN':
                val[x] = 0
    return cof_dict

summary_pof staticmethod

summary_pof(community: Community) -> dict

Return summary of pof for specific community. The return will be grouped by class.

Source code in django_project/civitas/models/view/reporter_data.py
@staticmethod
def summary_pof(community: Community) -> dict:
    """
    Return summary of pof for specific community
    The return will be grouped by class
    """
    pof_dict = ReporterData._summary(community, 'pof_name')

    for val in pof_dict:
        for x in val:
            if str(val[x]) == 'nan' or val[x] == 'NaN':
                val[x] = 0
    return pof_dict

summary_risk staticmethod

summary_risk(community: Community) -> dict

Return summary of risk for specific community. The return will be grouped by class.

Source code in django_project/civitas/models/view/reporter_data.py
@staticmethod
def summary_risk(community: Community) -> dict:
    """
    Return summary of risk for specific community
    The return will be grouped by class
    """

    risk_dict = ReporterData._summary(community, 'risk_level')

    for val in risk_dict:
        for x in val:
            if str(val[x]) == 'nan' or val[x] == 'NaN':
                val[x] = 0
    return risk_dict
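
The three `summary_*` methods above repeat the same NaN cleanup loop. One way to factor it out is sketched below. `zero_out_nan` is a hypothetical helper, and the sample rows are invented. The sketch also handles `Decimal('NaN')`, whose string form is `'NaN'` rather than `'nan'`; whether the real summary values are ever `Decimal` is an assumption here.

```python
import math
from decimal import Decimal


def zero_out_nan(rows):
    """Replace NaN values (float, Decimal, or the text 'nan'/'NaN') with 0, in place."""
    for row in rows:
        for key, value in row.items():
            is_float_nan = isinstance(value, float) and math.isnan(value)
            is_decimal_nan = isinstance(value, Decimal) and value.is_nan()
            is_text_nan = isinstance(value, str) and value.lower() == "nan"
            if is_float_nan or is_decimal_nan or is_text_nan:
                # Reassigning an existing key does not change the dict's size,
                # so it is safe while iterating over items().
                row[key] = 0
    return rows


# Hypothetical summary rows.
rows = [
    {"class_name": "TRN", "renewal_cost": float("nan")},
    {"class_name": "WTR", "renewal_cost": "NaN"},
    {"class_name": "SWR", "renewal_cost": Decimal("NaN")},
    {"class_name": "BLD", "renewal_cost": 5.0},
]
cleaned = zero_out_nan(rows)
print([row["renewal_cost"] for row in cleaned])  # [0, 0, 0, 5.0]
```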

civitas.models.System

Bases: _Term

A system is a collection of features

civitas.models.Unit

Bases: _Term

Unit

Serializers

civitas.serializer

Utilities

civitas.views

Views

civitas.views