How to Deploy, Configure and Use a TAXII 2 Server to Exchange Cyber Threat Intelligence
Table of Contents
This post uses OpenTAXII, an open-source implementation of a TAXII server compatible with versions 1 and 2. Specifically, it uses the code in this pull request, which addresses some existing issues with the version 2 of the TAXII implementation.
What is TAXII and STIX? #
TAXII is a set of specifications that define a protocol for the exchange of cyber threat information. It enables organizations to share and disseminate threat intelligence in a standardized and automated manner. STIX is a standardized language for expressing and representing structured threat information. It provides a common framework for describing Cyber Threat Intelligence (CTI), including Indicators of Compromise (IoCs), Tactics, Techniques, and Procedures (TTPs), and other contextual information related to cybersecurity threats. In this post, I will not extensively explain these standards but rather demonstrate how to put them into practice by deploying a TAXII server version 2 and using it to exchange data in STIX 2.1. For more information on TAXII and STIX, you can go to:
Configuration for TAXII 2 #
The default configuration file is not using TAXII version 2 but 1, we need to change the configuration to use TAXII 2. Add the following to OpenTAXII/opentaxii/defaults.yml
.
taxii2:
public_discovery: true
allow_custom_properties: true
description: "TAXII2 Server"
title: "Taxii2 Service"
max_content_length: 2048
persistence_api:
class: opentaxii.persistence.sqldb.Taxii2SQLDatabaseAPI
parameters:
db_connection: sqlite:////tmp/data2.db
create_tables: yes
How to deploy a TAXII 2 Server #
To deploy a TAXII server we are going to use eclecticiq/OpenTAXII. An implementation of TAXII server compatible with TAXII versions 1 and 2.
Deploying it is really simple:
-
Clone the repository and enter in it:
git clone https://github.com/eclecticiq/OpenTAXII.git cd OpenTAXII
-
Create a virtual environment, you can do it with
python3 -m venv .venv
. Then activate the environment withsource .venv/bin/activate
. -
Now we are going to install some console commands with:
python3 setup.py install
-
Finally, we are going to deploy the server executing the following command:
opentaxii-run-dev
This will deploy the server in development mode, to deploy in production mode you can refer to the OpenTAXII documentation: running/#production-mode.
Docker Deployment #
We can deploy OpenTAXII in a docker container with two instances of postgresql
, one for authentication data and the other for the rest of data. Hence, we need to change the TAXII configuration to use those instances:
|
|
Go into OpenTAXII/examples/
and execute sudo docker compose up -f docker-compose-local.yaml -d --build
. Where docker-compose-local.yml
is:
version: '3'
services:
db:
image: postgres:9.4
environment:
POSTGRES_USER: user
POSTGRES_PASSWORD: password
POSTGRES_DB: opentaxii
authdb:
image: postgres:9.4
environment:
POSTGRES_USER: user1
POSTGRES_PASSWORD: password1
POSTGRES_DB: opentaxii1
opentaxii:
build:
context: ../
dockerfile: Dockerfile
environment:
OPENTAXII_AUTH_SECRET: secret
OPENTAXII_DOMAIN: 127.0.0.1:9000
OPENTAXII_USER: user
OPENTAXII_PASS: pass
DATABASE_HOST: db
DATABASE_NAME: opentaxii
DATABASE_USER: user
DATABASE_PASS: password
AUTH_DATABASE_HOST: authdb
AUTH_DATABASE_NAME: opentaxii1
AUTH_DATABASE_USER: user1
AUTH_DATABASE_PASS: password1
volumes:
- ./:/input:ro
ports:
- 9000:9000
links:
- db:db
- authdb:authdb
Now we can exec into the OpenTAXII container and execute the same commands mentioned in section Creating and Accessing API Roots and Collections.
Deployment for Debugging #
I normally use
debugpy and a DAP client nvim-dap-ui
, to debug python applications. In order to deploy the server for debugging you should:
-
Make sure that the Flask application is being deployed without debugger, it could cause conflicts with debugpy. Modify
opentaxii/cli/__init__.py
fromapp.debug = True
toapp.debug = False
. -
Instead of installing you should execute
python3 setup.py develop
. This will create links to the package instead of copying the files. Now, changes made to the source are immediately reflected in the installed packages. -
Deploy the application with debugpy:
python3 -m debugpy --listen 127.0.0.1:5678 --wait-for-client .venv/bin/opentaxii-run-dev
-
Then, you can attach your DAP client to
127.0.0.1:5678
. You can also use the followinglaunch.json
to launch the OpenTAXII server and attach the debugger to it.{ "version": "0.2.0", "configurations": [ { "name": "opentaxii-run-dev", "type": "python", "request": "launch", "console": "integratedTerminal", "program": "${workspaceFolder}/.venv/bin/opentaxii-run-dev", "pythonPath": "${workspaceFolder}/.venv/bin/python3" } ] }
Creating and Accessing API Roots and Collections #
You can create a public API Root with:
opentaxii-add-api-root -t "baseapiroot" -d "This is a base api root" --default --public
Then you can create a collection inside the API root using the rootid
field:
opentaxii-add-collection --rootid 3289b4eb-5f15-475d-a336-c3a03eb0975e -t "MyCollection" -d "This is a collection" -a "MyCollectionAlias" --public --public-write
The API root and collection are public, so you can access them without authentication with curl
.
- You can check the TAXII2 API roots with
curl http://localhost:9000/taxii2/
. - The collections with:
curl http://localhost:9000/taxii2/<api_root_id>/collections/
. - You can also access the objects of the collection with:
curl http://localhost:9000/taxii2/<api_root_id>/collections/<collection_id>/objects/
.
Once that you have created an API root and a collection you can push and pull data with
taxii2-client. You can create accounts in order to poll, push and subscribe to private collections. Create OpenTAXII/examples/data-configuration-accounts.yml
:
---
accounts:
- username: user_read
password: user_read
permissions:
taxii1:
firstcollection: read
taxii2:
ea9cdf30-root-idc3-b308-bf658d865cae:
privCollectionAlias: read
- username: user_write
password: user_write
permissions:
taxii2:
ea9cdf30-root-idc3-b308-bf658d865cae:
privCollectionAlias: modify
- username: admin
password: admin
is_admin: yes
Now, create those accounts with opentaxii-sync-data examples/data-configuration-accounts.yml
.
To try poll, push, and subscription functionalities you can use the following python script.
import json
import sys
import requests
from taxii2client.v21 import Server
from taxii2client.exceptions import AccessError
from uuid import uuid4
from time import sleep
# Define your TAXII server and collection details
OPENTAXII_URL = "http://localhost:9000/"
TAXII2_SERVER = OPENTAXII_URL + "taxii2/"
USERNAME = "user_write"
PASSWORD = "user_write"
def pull_data(api_root_url, collection):
# Pull data from the TAXII collection
try:
# Pull data from the collection
data = collection.get_objects()
print(f"Num objects pulled: {len(data.get('objects', []))}")
except AccessError:
print("[Pull Error] The user does not have write access")
return None
return data
def push_data(api_root_url, collection):
# load stix data and push it
with open("stix/nettool.stix.json", "r") as f:
stix_loaded = json.load(f)
stix_type = stix_loaded["type"]
stix_id = stix_type + "--" + str(uuid4())
stix_loaded["id"] = stix_id
envelope_data = {
"more": False,
"objects": [stix_loaded],
}
try:
# Push data to the collection
collection.add_objects(envelope_data)
print("Data pushed successfully.")
except AccessError:
print("[Push Error] The user does not have write access")
def subscribe(api_root_url, collection):
total_objects_pulled = 0
added_after = None
# Get Authentication Token
response = requests.post(
OPENTAXII_URL + "management/auth",
headers={
"Content-Type": "application/json",
},
json={
"username": USERNAME,
"password": PASSWORD,
},
)
auth_token = response.json().get("token", None)
while True:
if added_after is None:
url = api_root_url + "collections/" + collection.id + "/objects/"
else:
url = (
api_root_url
+ "collections/"
+ collection.id
+ f"/objects/?added_after={added_after}"
)
# Get all objects from added_after
response = requests.get(
url=url,
headers={
"Authorization": f"Bearer {auth_token}",
},
)
taxii_env = response.json()
objects = taxii_env.get("objects", [])
print(f"Read {len(objects)} objects from the TAXII2 server")
if len(objects) > 0:
added_after = response.headers.get("X-TAXII-Date-Added-Last", "")
sleep(3)
def not_an_action(collection):
print("That is not an option!")
def main():
server = Server(
TAXII2_SERVER,
user=USERNAME,
password=PASSWORD,
)
print(server.title)
print("=" * len(server.title))
print("Select an API Root:")
print(server.api_roots)
print()
for index, aroot in enumerate(server.api_roots, start=1):
print(f"{index}.")
try:
print(f"Title: {aroot.title}")
print(f"Description: {aroot.description}")
print(f"Versions: {aroot.versions}")
except Exception:
print(
"This API Root is not public.\nYou need to identify to see this API Root"
)
print()
aroot_choice = input("Enter the number of your choice: ")
try:
aroot_choice = int(aroot_choice)
selected_api_root = server.api_roots[aroot_choice - 1]
collections_l = selected_api_root.collections
except (ValueError, IndexError):
print("Invalid choice. Please enter a valid number.")
sys.exit()
except Exception as e:
print(e)
print("You cannot access this API Root. You need to authenticate.")
sys.exit()
for index, coll in enumerate(collections_l, start=1):
print(f"{index}.")
print(f"\tId: {coll.id}")
print(f"\tTitle: {coll.title}")
print(f"\tAlias: {coll.alias}")
print(f"\tDescription: {coll.description}")
print(f"\tMedia Types: {coll.media_types}")
print(f"\tCan Read: {coll.can_read}")
print(f"\tCan Write: {coll.can_write}")
print(f"\tObjects URL: {coll.objects_url}")
print(f"\tCustom Properties: {coll.custom_properties}")
print()
coll_choice = input("Enter the number of your choice: ")
try:
coll_choice = int(coll_choice)
selected_collection = selected_api_root.collections[coll_choice - 1]
except (ValueError, IndexError):
print("Invalid choice. Please enter a valid number.")
sys.exit()
actions_d = {
1: pull_data,
2: push_data,
3: subscribe,
}
while True:
print()
print("1: Pull")
print("2: Push")
print("3: Subscribe")
action_choice = int(input("Enter the number of your choice: "))
action_func = actions_d.get(action_choice, not_an_action)
action_func(selected_api_root.url, selected_collection)
print()
if __name__ == "__main__":
main()
Where stix/nettool.stix.json
is a STIX object:
{
"modified": "2023-07-25T19:25:59.767Z",
"name": "Net",
"description": "The [Net](https://attack.mitre.org/software/S0039) utility is a component of the Windows operating system. It is used in command-line operations for control of users, groups, services, and network connections. (Citation: Microsoft Net Utility)\n\n[Net](https://attack.mitre.org/software/S0039) has a great deal of functionality, (Citation: Savill 1999) much of which is useful for an adversary, such as gathering system and network information for Discovery, moving laterally through [SMB/Windows Admin Shares](https://attack.mitre.org/techniques/T1021/002) using <code>net use</code> commands, and interacting with services. The net1.exe utility is executed for certain functionality when net.exe is run and can be used directly in commands such as <code>net1 user</code>.",
"type": "tool",
"id": "tool--03342581-f790-4f03-ba41-e82e67392e25",
"created": "2017-05-31T21:32:31.601Z",
"revoked": false,
"external_references": [],
"spec_version": "2.1"
}