Skip to main content
  1. Posts/

How to Deploy, Configure and Use a TAXII 2 Server to Exchange Cyber Threat Intelligence

8 mins· 0 · 0 ·
TAXII STIX guide
aams-eam
Author
aams-eam
Cybersecurity and Development Enthusiast.
Table of Contents

This post uses OpenTAXII, an open-source implementation of a TAXII server compatible with versions 1 and 2. Specifically, it uses the code in this pull request, which addresses some existing issues with the version 2 of the TAXII implementation.

What is TAXII and STIX? #

TAXII is a set of specifications that define a protocol for the exchange of cyber threat information. It enables organizations to share and disseminate threat intelligence in a standardized and automated manner. STIX is a standardized language for expressing and representing structured threat information. It provides a common framework for describing Cyber Threat Intelligence (CTI), including Indicators of Compromise (IoCs), Tactics, Techniques, and Procedures (TTPs), and other contextual information related to cybersecurity threats. In this post, I will not extensively explain these standards but rather demonstrate how to put them into practice by deploying a TAXII server version 2 and using it to exchange data in STIX 2.1. For more information on TAXII and STIX, you can go to:

Configuration for TAXII 2 #

The default configuration file is not using TAXII version 2 but 1, we need to change the configuration to use TAXII 2. Add the following to OpenTAXII/opentaxii/defaults.yml.

taxii2:
  public_discovery: true
  allow_custom_properties: true
  description: "TAXII2 Server"
  title: "Taxii2 Service"
  max_content_length: 2048
  persistence_api:
    class: opentaxii.persistence.sqldb.Taxii2SQLDatabaseAPI
    parameters:
      db_connection: sqlite:////tmp/data2.db
      create_tables: yes

How to deploy a TAXII 2 Server #

To deploy a TAXII server we are going to use eclecticiq/OpenTAXII. An implementation of TAXII server compatible with TAXII versions 1 and 2.

Deploying it is really simple:

  1. Clone the repository and enter in it:

    git clone https://github.com/eclecticiq/OpenTAXII.git
    cd OpenTAXII
    
  2. Create a virtual environment, you can do it with python3 -m venv .venv. Then activate the environment with source .venv/bin/activate.

  3. Now we are going to install some console commands with:

    python3 setup.py install
    
  4. Finally, we are going to deploy the server executing the following command:

    opentaxii-run-dev
    

This will deploy the server in development mode, to deploy in production mode you can refer to the OpenTAXII documentation: running/#production-mode.

Docker Deployment #

We can deploy OpenTAXII in a docker container with two instances of postgresql, one for authentication data and the other for the rest of data. Hence, we need to change the TAXII configuration to use those instances:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
---

domain: "localhost:9000"

support_basic_auth: yes
return_server_error_details: no

auth_api:
  class: opentaxii.auth.sqldb.SQLDatabaseAPI
  parameters:
    db_connection: postgresql://user1:[email protected]:5432/opentaxii1
    create_tables: yes
    secret: SECRET-STRING-NEEDS-TO-BE-CHANGED
    token_ttl_secs: 3600

taxii1:
  save_raw_inbox_messages: yes
  xml_parser_supports_huge_tree: yes
  count_blocks_in_poll_responses: no
  unauthorized_status: UNAUTHORIZED
  hooks:
  persistence_api:
    class: opentaxii.persistence.sqldb.SQLDatabaseAPI
    parameters:
      db_connection: postgresql://user:[email protected]:5432/opentaxii
      create_tables: yes

taxii2:
  public_discovery: true
  allow_custom_properties: true
  description: "TAXII2 Server"
  title: "Taxii2 Service"
  max_content_length: 2048
  persistence_api:
    class: opentaxii.persistence.sqldb.Taxii2SQLDatabaseAPI
    parameters:
      db_connection: postgresql://user:[email protected]:5432/opentaxii
      create_tables: yes

logging:
  opentaxii: info
  root: info

Go into OpenTAXII/examples/ and execute sudo docker compose up -f docker-compose-local.yaml -d --build. Where docker-compose-local.yml is:

version: '3'

services:
  db:
    image: postgres:9.4
    environment:
      POSTGRES_USER: user
      POSTGRES_PASSWORD: password
      POSTGRES_DB: opentaxii
  
  authdb:
    image: postgres:9.4
    environment:
      POSTGRES_USER: user1
      POSTGRES_PASSWORD: password1
      POSTGRES_DB: opentaxii1
  
  opentaxii:
    build:
      context: ../
      dockerfile: Dockerfile
    environment:
      OPENTAXII_AUTH_SECRET: secret
      OPENTAXII_DOMAIN: 127.0.0.1:9000
      OPENTAXII_USER: user
      OPENTAXII_PASS: pass
      DATABASE_HOST: db
      DATABASE_NAME: opentaxii
      DATABASE_USER: user
      DATABASE_PASS: password
      AUTH_DATABASE_HOST: authdb
      AUTH_DATABASE_NAME: opentaxii1
      AUTH_DATABASE_USER: user1
      AUTH_DATABASE_PASS: password1
    volumes:
      - ./:/input:ro
    ports:
      - 9000:9000
    links:
      - db:db
      - authdb:authdb

Now we can exec into the OpenTAXII container and execute the same commands mentioned in section Creating and Accessing API Roots and Collections.

Deployment for Debugging #

I normally use debugpy and a DAP client nvim-dap-ui, to debug python applications. In order to deploy the server for debugging you should:

  1. Make sure that the Flask application is being deployed without debugger, it could cause conflicts with debugpy. Modify opentaxii/cli/__init__.py from app.debug = True to app.debug = False.

  2. Instead of installing you should execute python3 setup.py develop. This will create links to the package instead of copying the files. Now, changes made to the source are immediately reflected in the installed packages.

  3. Deploy the application with debugpy:

    python3 -m debugpy --listen 127.0.0.1:5678 --wait-for-client .venv/bin/opentaxii-run-dev
    
  4. Then, you can attach your DAP client to 127.0.0.1:5678. You can also use the following launch.json to launch the OpenTAXII server and attach the debugger to it.

    {
    
    "version": "0.2.0",
      "configurations": [
        {
          "name": "opentaxii-run-dev",
          "type": "python",
          "request": "launch",
          "console": "integratedTerminal",
          "program": "${workspaceFolder}/.venv/bin/opentaxii-run-dev",
          "pythonPath": "${workspaceFolder}/.venv/bin/python3"
        }
      ]
    }
    

Creating and Accessing API Roots and Collections #

You can create a public API Root with:

opentaxii-add-api-root -t "baseapiroot" -d "This is a base api root" --default --public

Then you can create a collection inside the API root using the rootid field:

opentaxii-add-collection --rootid 3289b4eb-5f15-475d-a336-c3a03eb0975e -t "MyCollection" -d "This is a collection" -a "MyCollectionAlias" --public --public-write

The API root and collection are public, so you can access them without authentication with curl.

  • You can check the TAXII2 API roots with curl http://localhost:9000/taxii2/.
  • The collections with: curl http://localhost:9000/taxii2/<api_root_id>/collections/.
  • You can also access the objects of the collection with: curl http://localhost:9000/taxii2/<api_root_id>/collections/<collection_id>/objects/.

Once that you have created an API root and a collection you can push and pull data with taxii2-client. You can create accounts in order to poll, push and subscribe to private collections. Create OpenTAXII/examples/data-configuration-accounts.yml:

---
accounts:
  - username: user_read
    password: user_read
    permissions:
      taxii1:
        firstcollection: read
      taxii2:
        ea9cdf30-root-idc3-b308-bf658d865cae:
          privCollectionAlias: read
  - username: user_write
    password: user_write
    permissions:
      taxii2:
        ea9cdf30-root-idc3-b308-bf658d865cae:
          privCollectionAlias: modify
  - username: admin
    password: admin
    is_admin: yes

Now, create those accounts with opentaxii-sync-data examples/data-configuration-accounts.yml.

To try poll, push, and subscription functionalities you can use the following python script.

import json
import sys
import requests
from taxii2client.v21 import Server
from taxii2client.exceptions import AccessError
from uuid import uuid4
from time import sleep

# Define your TAXII server and collection details
OPENTAXII_URL = "http://localhost:9000/"
TAXII2_SERVER = OPENTAXII_URL + "taxii2/"
USERNAME = "user_write"
PASSWORD = "user_write"


def pull_data(api_root_url, collection):
    # Pull data from the TAXII collection
    try:
        # Pull data from the collection
        data = collection.get_objects()
        print(f"Num objects pulled: {len(data.get('objects', []))}")
    except AccessError:
        print("[Pull Error] The user does not have write access")
        return None

    return data


def push_data(api_root_url, collection):
    # load stix data and push it
    with open("stix/nettool.stix.json", "r") as f:
        stix_loaded = json.load(f)

    stix_type = stix_loaded["type"]
    stix_id = stix_type + "--" + str(uuid4())
    stix_loaded["id"] = stix_id

    envelope_data = {
        "more": False,
        "objects": [stix_loaded],
    }
    try:
        # Push data to the collection
        collection.add_objects(envelope_data)
        print("Data pushed successfully.")
    except AccessError:
        print("[Push Error] The user does not have write access")


def subscribe(api_root_url, collection):
    total_objects_pulled = 0
    added_after = None

    # Get Authentication Token
    response = requests.post(
        OPENTAXII_URL + "management/auth",
        headers={
            "Content-Type": "application/json",
        },
        json={
            "username": USERNAME,
            "password": PASSWORD,
        },
    )
    auth_token = response.json().get("token", None)

    while True:
        if added_after is None:
            url = api_root_url + "collections/" + collection.id + "/objects/"
        else:
            url = (
                api_root_url
                + "collections/"
                + collection.id
                + f"/objects/?added_after={added_after}"
            )

        # Get all objects from added_after
        response = requests.get(
            url=url,
            headers={
                "Authorization": f"Bearer {auth_token}",
            },
        )
        taxii_env = response.json()
        objects = taxii_env.get("objects", [])

        print(f"Read {len(objects)} objects from the TAXII2 server")
        if len(objects) > 0:
            added_after = response.headers.get("X-TAXII-Date-Added-Last", "")

        sleep(3)


def not_an_action(collection):
    print("That is not an option!")


def main():
    server = Server(
        TAXII2_SERVER,
        user=USERNAME,
        password=PASSWORD,
    )
    print(server.title)
    print("=" * len(server.title))

    print("Select an API Root:")
    print(server.api_roots)
    print()
    for index, aroot in enumerate(server.api_roots, start=1):
        print(f"{index}.")
        try:
            print(f"Title: {aroot.title}")
            print(f"Description: {aroot.description}")
            print(f"Versions: {aroot.versions}")
        except Exception:
            print(
                "This API Root is not public.\nYou need to identify to see this API Root"
            )
        print()

    aroot_choice = input("Enter the number of your choice: ")
    try:
        aroot_choice = int(aroot_choice)
        selected_api_root = server.api_roots[aroot_choice - 1]
        collections_l = selected_api_root.collections
    except (ValueError, IndexError):
        print("Invalid choice. Please enter a valid number.")
        sys.exit()
    except Exception as e:
        print(e)
        print("You cannot access this API Root. You need to authenticate.")
        sys.exit()

    for index, coll in enumerate(collections_l, start=1):
        print(f"{index}.")
        print(f"\tId: {coll.id}")
        print(f"\tTitle: {coll.title}")
        print(f"\tAlias: {coll.alias}")
        print(f"\tDescription: {coll.description}")
        print(f"\tMedia Types: {coll.media_types}")
        print(f"\tCan Read: {coll.can_read}")
        print(f"\tCan Write: {coll.can_write}")
        print(f"\tObjects URL: {coll.objects_url}")
        print(f"\tCustom Properties: {coll.custom_properties}")
        print()

    coll_choice = input("Enter the number of your choice: ")
    try:
        coll_choice = int(coll_choice)
        selected_collection = selected_api_root.collections[coll_choice - 1]
    except (ValueError, IndexError):
        print("Invalid choice. Please enter a valid number.")
        sys.exit()

    actions_d = {
        1: pull_data,
        2: push_data,
        3: subscribe,
    }

    while True:
        print()
        print("1: Pull")
        print("2: Push")
        print("3: Subscribe")
        action_choice = int(input("Enter the number of your choice: "))
        action_func = actions_d.get(action_choice, not_an_action)
        action_func(selected_api_root.url, selected_collection)
        print()


if __name__ == "__main__":
    main()

Where stix/nettool.stix.json is a STIX object:

{
    "modified": "2023-07-25T19:25:59.767Z",
    "name": "Net",
    "description": "The [Net](https://attack.mitre.org/software/S0039) utility is a component of the Windows operating system. It is used in command-line operations for control of users, groups, services, and network connections. (Citation: Microsoft Net Utility)\n\n[Net](https://attack.mitre.org/software/S0039) has a great deal of functionality, (Citation: Savill 1999) much of which is useful for an adversary, such as gathering system and network information for Discovery, moving laterally through [SMB/Windows Admin Shares](https://attack.mitre.org/techniques/T1021/002) using <code>net use</code> commands, and interacting with services. The net1.exe utility is executed for certain functionality when net.exe is run and can be used directly in commands such as <code>net1 user</code>.",
    "type": "tool",
    "id": "tool--03342581-f790-4f03-ba41-e82e67392e25",
    "created": "2017-05-31T21:32:31.601Z",
    "revoked": false,
    "external_references": [],
    "spec_version": "2.1"
}

Related

Using Neovim as IDE with NvChad
11 mins· 0 · 0
developing Neovim guide
Deploying a Kubernetes cluster with containerd and an insecure private Docker registry
7 mins· 0 · 0
kubernetes docker devops containerd guide