Feed events to Kafka Connect

Mosaic enables you to feed events into your SIEM (security information and event management) and log collection systems. To give you better control over the granularity of data attributes sent to downstream systems, Mosaic lets you create event streams, including multiple streams per event type, and collect events separately. To learn more, see About event streaming.

To feed data, Mosaic uses the Confluent HTTP Source Connector to fetch data from the Mosaic Event streaming API endpoint and write it to a Kafka topic.

How it works

The connector polls the specified HTTP endpoint at regular intervals and writes the response to a Kafka topic. Learn more at HTTP Source Connector.

Step 1: Configure a management app

In your Mosaic tenant, configure a management app. Give the app a suitable name, for example, KafkaLogStream.

Note

After saving the management app, open it again and note the Client ID and Client Secret values. You’ll need these parameters to configure the Connector.
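
The Event streaming API calls in the next step require a client access token issued for these credentials. Below is a minimal sketch of fetching one via the OAuth2 client credentials flow; the token endpoint path (/oidc/token) and parameter names shown here are assumptions, so confirm them against the Mosaic authentication documentation.

import fetch from 'node-fetch';
async function getToken() {
  // Assumed OAuth2 client credentials request; verify the endpoint in the Mosaic docs
  const resp = await fetch('https://api.transmitsecurity.io/oidc/token', {
    method: 'POST',
    headers: { 'Content-Type': 'application/x-www-form-urlencoded' },
    body: new URLSearchParams({
      grant_type: 'client_credentials',
      client_id: '<CLIENT_ID>',        // Client ID of the management app
      client_secret: '<CLIENT_SECRET>' // Client Secret of the management app
    })
  });
  const { access_token } = await resp.json();
  return access_token; // Use as the Bearer token in the calls below
}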

Step 2: Create event streams

Before you can start feeding events to Kafka Connect, you have to enable data collection and create event streams in Mosaic.

  1. Enable event collection. Specify the type of events you want to collect.
import fetch from 'node-fetch';
async function run() {
  const query = new URLSearchParams({
    type: '<TYPE>', // Event type. One of cis, admin, risk, verify
  }).toString();
  const resp = await fetch(
    `https://api.transmitsecurity.io/activities/v1/activities/start-collect?${query}`,
    {
      method: 'PUT',
      headers: {
        Authorization: 'Bearer <YOUR_TOKEN_HERE>' // Client access token
      }
    }
  );
  const data = await resp.text();
  console.log(data);
}
run();
  2. Create an event stream. Make sure to provide an ID to identify the stream. The stream ID must be a single string without spaces and must be unique for each stream.
import fetch from 'node-fetch';
async function run() {
  const query = new URLSearchParams({
    type: '<TYPE>', // Event type. One of cis, admin, risk, verify
    stream_id: '<STREAM_ID>' // Unique stream ID, without spaces
  }).toString();
  const resp = await fetch(
    `https://api.transmitsecurity.io/activities/v1/activities/stream?${query}`,
    {
      method: 'PUT',
      headers: {
        Authorization: 'Bearer <YOUR_TOKEN_HERE>' // Client access token
      }
    }
  );
  const data = await resp.text();
  console.log(data);
}
run();
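
Optionally, before wiring up the connector, you can call the Collect events endpoint (the same endpoint the connector polls in the next step) to confirm that the stream returns data. A minimal sketch, assuming a GET request — check the Event streaming API reference for the exact method and response format:

import fetch from 'node-fetch';
async function run() {
  const query = new URLSearchParams({
    type: '<TYPE>',          // Event type. One of cis, admin, risk, verify
    stream_id: '<STREAM_ID>' // The stream ID created above
  }).toString();
  const resp = await fetch(
    `https://api.transmitsecurity.io/activities/v1/activities/collect?${query}`,
    {
      method: 'GET', // Assumed method; see the API reference
      headers: {
        Authorization: 'Bearer <YOUR_TOKEN_HERE>' // Client access token
      }
    }
  );
  const data = await resp.text();
  console.log(data);
}
run();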

Step 3: Configure the Connector

The Connector has to be configured to obtain events from the Collect events endpoint. Update the activities-to-kafka-connector.json configuration file as described below:

URL configuration
The URL points to the Mosaic Event streaming API endpoint:
"url": "https://api.transmitsecurity.io/activities/v1/activities/collect?type=<type>&stream_id=<stream_id>"
Provide the following parameters:
  • type: Filter activities by type (e.g., admin, risk)
  • stream_id: The stream ID you created in Step 2

Authentication
The connector uses OAuth2 authentication (for details, see HTTP Source Connector Authentication configuration). Provide the client ID and client secret of the Mosaic management app (see Step 1):
  • OAUTH2_CLIENT_ID=your_client_id
  • OAUTH2_CLIENT_SECRET=your_client_secret

Additional settings
  • confluent.topic.bootstrap.servers: The Kafka bootstrap servers
  • poll.interval.ms: How frequently the API is polled for new data
  • topic.name.pattern: The Kafka topic where activities data will be sent
  • http.initial.offset: The starting point for fetching activities
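
For reference, a minimal sketch of what activities-to-kafka-connector.json could look like with the settings above. The connector.class value, auth.type, oauth2.token.url, and the example topic name and poll interval are assumptions rather than values from this guide — confirm them against the HTTP Source Connector documentation (or GET /connector-plugins on your Connect worker):

{
  "name": "activities-connector",
  "config": {
    "connector.class": "io.confluent.connect.http.HttpSourceConnector",
    "tasks.max": "1",
    "url": "https://api.transmitsecurity.io/activities/v1/activities/collect?type=<TYPE>&stream_id=<STREAM_ID>",
    "auth.type": "oauth2",
    "oauth2.client.id": "<CLIENT_ID>",
    "oauth2.client.secret": "<CLIENT_SECRET>",
    "oauth2.token.url": "<TOKEN_URL>",
    "topic.name.pattern": "mosaic-activities",
    "poll.interval.ms": "60000",
    "http.initial.offset": "0",
    "confluent.topic.bootstrap.servers": "kafka:9092"
  }
}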

Step 4: Running the Connector using Docker

To connect to your existing Kafka deployment, run the Connector using Docker. Use the confluentinc/kafka-connect-http-source:latest image.

  1. Install the http-source plugin:
    confluent-hub install --no-prompt confluentinc/kafka-connect-http-source:latest &&
    confluent-hub install --no-prompt confluentinc/kafka-connect-http:latest
  2. Launch the Kafka Connect worker:
    /etc/confluent/docker/run &
  3. Wait for initialization and then install the activities-to-kafka-connector:
    curl -s -X POST \
      -H "Content-Type: application/json" \
      --data @/tmp/activities-to-kafka-connector.json http://localhost:8083/connectors
Note

Data should begin arriving in your Kafka topic at this stage.
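
To confirm that events are actually landing in Kafka, you can read from the target topic with the standard Kafka console consumer. A minimal sketch, assuming the topic name mosaic-activities from the example configuration above and a broker at localhost:9092:

kafka-console-consumer --bootstrap-server localhost:9092 \
  --topic mosaic-activities \
  --from-beginning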

Step 5: Test the Connector

The attached docker-compose.yml file provides a full example for local testing. It includes:

  • the activities-connect Docker container
  • Kafka and ZooKeeper
  • a kafka-consumer for printing the data

Before running, make sure to update the following values:

  1. "url": "\<EVENT\_STREAM\_URL\>"
  2. "oauth2.client.id": "\<CLIENT\_ID\>" ,
  3. "oauth2.client.secret": "\<CLIENT\_SECRET\>"

To check the status of the connector, run the following Docker command:

docker exec activities-connect curl http://localhost:8083/connectors/activities-connector/status
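
If the connector is healthy, the status endpoint reports a RUNNING state for the connector and its task. The response is JSON roughly along these lines (abridged; worker IDs and other values will differ):

{
  "name": "activities-connector",
  "connector": { "state": "RUNNING", "worker_id": "localhost:8083" },
  "tasks": [ { "id": 0, "state": "RUNNING", "worker_id": "localhost:8083" } ],
  "type": "source"
}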