Chronicle Parser - JSON

Hi All,

Can we parse a log in the format shown below without making any changes to it during ingestion?

[
  {
    "header": {
      "name": "EcoScope Data",
      "well": "35/12-6S",
      "field": "Fram",
      "date": "2022-06-14",
      "operator": "GeoSoft",
      "startIndex": 2907.79,
      "endIndex": 2907.84,
      "step": 0.01
    },
    "curves": [
      {
        "name": "MD",
        "description": "Measured depth",
        "quantity": "length",
        "unit": "m",
        "valueType": "float",
        "dimensions": 1
      },
      {
        "name": "A40H",
        "description": "Attenuation resistivity 40 inch",
        "quantity": "electrical resistivity",
        "unit": "ohm.m",
        "valueType": "float",
        "dimensions": 1
      }
    ],
    "data": [
      [2907.79, 29.955],
      [2907.80, 28.892],
      [2907.81, 27.868],
      [2907.82, 31.451],
      [2907.83, 28.080],
      [2907.84, 27.733]
    ]
  }
]

 

ACCEPTED SOLUTION

👋 The log would first need to be flattened, extracting each JSON object from the array in the event, e.g.,

 

cat json.log | jq -c '.[]'

Otherwise you will get an error when you try to use the JSON plugin on the raw log. Alternatively, you could use a Grok regex to extract the inner JSON object from the JSON array, but flattening the log so that each record is a single newline-delimited line is the recommended approach.
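If jq is not available where the log is produced, the same flattening step can be sketched in Python. This is only a minimal sketch, assuming the whole log is a single JSON array that fits in memory; the function name flatten_json_array is made up for illustration and is not part of Chronicle or jq.

```python
import json


def flatten_json_array(text: str) -> list[str]:
    """Return one compact JSON string per element of a top-level JSON array.

    Hypothetical helper: roughly what `jq -c '.[]'` does for this log.
    """
    records = json.loads(text)
    if not isinstance(records, list):
        raise ValueError("expected a top-level JSON array")
    # Compact separators mimic jq's -c output: one record per line.
    return [json.dumps(record, separators=(",", ":")) for record in records]


# Shortened version of the log from the question.
sample = '[{"header": {"name": "EcoScope Data"}, "data": [[2907.79, 29.955]]}]'
lines = flatten_json_array(sample)
print("\n".join(lines))
```

Each printed line is then a standalone JSON object that can be ingested as its own log record.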

An example parser for the flattened record would then look as follows:

filter {

  json {
    source => "message"
    array_function => "split_columns"
  }
  #TODO(add error handling in case of JSON extraction failure)

  mutate {
    replace => {
      "event1.idm.read_only_udm.metadata.event_type" => "GENERIC_EVENT"
    }
  }
  #TODO(evaluate a more specific UDM event type)

  mutate {
    replace => {
      "header_name.key" => "header_name"
      "header_name.value.string_value" => "%{header.name}"
    }
  }
  #TODO(add your remaining headers)

  mutate {
    merge => {
      "event1.idm.read_only_udm.additional.fields" => "header_name"
    }
  }

  for idx, curve in curves {
    mutate {
      convert => {
        "idx" => "string"
      }
    }
    mutate {
      replace => {
        "_curve_name.key" => "curve_name[%{idx}]"
        "_curve_name.value.string_value" => "%{curve.name}"
      }
    }

    mutate {
      merge => {
        "event1.idm.read_only_udm.additional.fields" => "_curve_name"
      }
    }

    mutate {
      replace => {
        "_curve_name" => ""
      }
    }
  }
  #TODO(extract the remaining curve fields)

  for idx,each_record in data {

    mutate {
      convert => {
        "idx" => "string"
      }
    }
    for subidx,each_value in each_record  {

      mutate {
        convert => {
          "subidx" => "string"
          "each_value" => "string"
        }
      }
      mutate {
        replace => {
          "_data.key" => "data[%{idx},%{subidx}]"
          "_data.value.string_value" => "%{each_value}"
        }
      }

      mutate {
        merge => {
          "event1.idm.read_only_udm.additional.fields" => "_data"
        }
      }

      mutate {
        replace => {
          "_data" => ""
        }
      }


    }
  }

  mutate {
    merge => {
      "@output" => "event1"
    }
  }

}

And the example output:

 

events: <
  timestamp: <
    seconds: 1700741914
    nanos: 719837698
  >
  idm: <
    read_only_udm: <
      metadata: <
        event_timestamp: <
          seconds: 1700741914
          nanos: 719837698
        >
        event_type: GENERIC_EVENT
      >
      additional: <
        fields: <
          key: "curve_name[0]"
          value: <
            string_value: "MD"
          >
        >
        fields: <
          key: "curve_name[1]"
          value: <
            string_value: "A40H"
          >
        >
        fields: <
          key: "data[0,0]"
          value: <
            string_value: "2907.79"
          >
        >
        fields: <
          key: "data[0,1]"
          value: <
            string_value: "29.955"
          >
        >
        fields: <
          key: "data[1,0]"
          value: <
            string_value: "2907.8"
          >
        >
        fields: <
          key: "data[1,1]"
          value: <
            string_value: "28.892"
          >
        >
        fields: <
          key: "data[2,0]"
          value: <
            string_value: "2907.81"
          >
        >
        fields: <
          key: "data[2,1]"
          value: <
            string_value: "27.868"
          >
        >
        fields: <
          key: "data[3,0]"
          value: <
            string_value: "2907.82"
          >
        >
        fields: <
          key: "data[3,1]"
          value: <
            string_value: "31.451"
          >
        >
        fields: <
          key: "data[4,0]"
          value: <
            string_value: "2907.83"
          >
        >
        fields: <
          key: "data[4,1]"
          value: <
            string_value: "28.08"
          >
        >
        fields: <
          key: "data[5,0]"
          value: <
            string_value: "2907.84"
          >
        >
        fields: <
          key: "data[5,1]"
          value: <
            string_value: "27.733"
          >
        >
        fields: <
          key: "header_name"
          value: <
            string_value: "EcoScope Data"
          >
        >
      >
    >
  >
>
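The key names in this output come from the loops in the parser: one key per curve index, and one key per (row, column) position in data. As a rough illustration only, the naming scheme can be emulated in Python. This is not Chronicle code, the function name additional_fields is hypothetical, and the order of the pairs may differ from what the parser emits.

```python
def additional_fields(record: dict) -> list[tuple[str, str]]:
    """Emulate the key/value pairs the parser merges into additional.fields."""
    # Only header.name is mapped, as in the parser's first mutate block.
    fields = [("header_name", str(record["header"]["name"]))]
    # Mirrors: for idx, curve in curves
    for idx, curve in enumerate(record["curves"]):
        fields.append((f"curve_name[{idx}]", str(curve["name"])))
    # Mirrors the nested loop over data: idx is the row, subidx the column.
    for idx, row in enumerate(record["data"]):
        for subidx, value in enumerate(row):
            fields.append((f"data[{idx},{subidx}]", str(value)))
    return fields


# Shortened version of the flattened record from the question.
record = {
    "header": {"name": "EcoScope Data"},
    "curves": [{"name": "MD"}, {"name": "A40H"}],
    "data": [[2907.79, 29.955], [2907.80, 28.892]],
}
for key, value in additional_fields(record):
    print(f"{key} = {value}")
```

Note that every value is converted to a string, which is why a depth written as 2907.80 in the log appears as "2907.8" in the output.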

 

2 REPLIES

Is there a way to get the parser output in JSON format instead of the format above? It would be better if the output looked like this:

metadata.event_type = "GENERIC_EVENT"

security_result.action = "ALLOW"