Part 3 – Making The First Test Pass

In this part, we are going to make the test pass, proving that the store can persist events.

Remember that our test is a high-level integration test with no mocking. If I understood more about the Kafka interface I am going to be using, I would also write a lower-level test to prove that our code talks to it correctly. This is always debatable as the interface is not ours, but we need to prove it somewhere. That said, if the high-level test works, who cares whether we talk to library ‘a’, ‘b’ or ‘c’ to achieve it?

For now I am not going to write a unit test for it. I am going to use the high-level test to learn about the Kafka interface, then decide whether I should also write unit tests for it.

Decide on a library to use

I have done some research using libhunt and similar sites into the most popular Kafka libraries, and I have decided to use Shopify’s sarama (https://github.com/Shopify/sarama)

To install it, we use the following

dep ensure -add gopkg.in/Shopify/sarama.v1

Next, let’s import it into our store (kafka_store.go) – adding this just under the package definition :-

import "gopkg.in/Shopify/sarama.v1"

OK, that’s the dependency added, but our tests will fail because the import is not yet used, so Go will not build. That’s a job for later now – off to do some shopping.

Setting Up Kafka

I don’t want to learn all about installing Kafka on my nice shiny new MacBook Pro, so I am going to use docker-compose to provide all of the services I need for this project, with the required ports forwarded.

So, I added docker-compose.yml into the project root and it looks like this :-

version: '2'
services:
  kafka:
    image: spotify/kafka
    ports:
      - "9092:9092"
    environment:
      ADVERTISED_HOST: ${HOSTNAME}
      ADVERTISED_PORT: 9092

Note that using ${HOSTNAME} for the advertised host means that HOSTNAME must be set – sometimes in OSX it is not. Also, sometimes HOSTNAME is set but not exported, so look out for a message saying it is blank when you start docker-compose. Right now I am also unsure whether the hostname is correct here or whether it should be the IP address of the virtual machine used by docker, but I don’t know how to get that yet.
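A quick way to guard against the unset/unexported HOSTNAME problem is to export it yourself before starting the services – this is a plain bash sketch, nothing project-specific:

```shell
# Fall back to the output of `hostname` if HOSTNAME is unset, then export
# it so docker-compose can interpolate ${HOSTNAME} in the compose file.
HOSTNAME="${HOSTNAME:-$(hostname)}"
export HOSTNAME
echo "Advertised host will be: $HOSTNAME"
```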

To run all services, run

docker-compose up

Publishing To Kafka

Now that we have Kafka running, let’s update our kafka_store to talk to it.

For now, I am just using some example code that I found to write any old rubbish to Kafka – once we have proved that messages are getting through, we will focus on getting it right.

So, kafka_store.go should now look like this

package store

import (
   "fmt"
   "log"

   "gopkg.in/Shopify/sarama.v1"
)

type KafkaStore struct{}

func NewKafkaStore() *KafkaStore {
   return &KafkaStore{}
}

func (s *KafkaStore) Publish(streamUUID string, event EventInterface) error {
   producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, nil)
   if err != nil {
      return fmt.Errorf("could not create producer - reason '%s'", err.Error())
   }
   defer func() {
      if err := producer.Close(); err != nil {
         log.Printf("failed to close producer: %s\n", err)
      }
   }()

   msg := &sarama.ProducerMessage{Topic: streamUUID, Value: sarama.StringEncoder("Hello Kafka From Go")}

   partition, offset, err := producer.SendMessage(msg)
   if err != nil {
      return fmt.Errorf("FAILED to send message - reason '%s'", err.Error())
   }
   log.Printf("> message sent to partition %d at offset %d\n", partition, offset)

   return nil
}

func (s *KafkaStore) AllEvents(streamUUID string) (EventLazyIterator, error) {
   return NewKafkaEvents(), nil
}

All I have done here is add code to the Publish function which creates a producer and a message saying “Hello Kafka From Go”. The message is then sent via the producer, and the producer is closed. Nothing like production code yet, but let’s just see what happens.

But how do we see what is happening in Kafka? We use a tool called kafkacat (https://github.com/edenhill/kafkacat) – see the URL for how to install it – it’s very easy.

Now that kafkacat is installed, let’s grab some info from it using this command :-

kafkacat -L -b localhost

and you should see something like this :-

Metadata for all topics (from broker -1: localhost:9092/bootstrap):
 1 brokers:
  broker 0 at Garys-MacBook-Pro.local:9092
 1 topics:
  topic "0accf7f9-a8c8-45ca-8826-1279948dd1f3" with 1 partitions:
    partition 0, leader 0, replicas: 0, isrs: 0

Don’t worry too much about what it all means, but let’s run our test and look at it again – so run :-

go test ./...

then

kafkacat -L -b localhost

and you should see :-

Metadata for all topics (from broker -1: localhost:9092/bootstrap):
 1 brokers:
  broker 0 at Garys-MacBook-Pro.local:9092
 2 topics:
  topic "0accf7f9-a8c8-45ca-8826-1279948dd1f3" with 1 partitions:
    partition 0, leader 0, replicas: 0, isrs: 0
  topic "a2fb53f1-5fd5-48b1-9570-341b72695ff7" with 1 partitions:
    partition 0, leader 0, replicas: 0, isrs: 0

So we can see that an extra topic has been added – a2fb53f1-5fd5-48b1-9570-341b72695ff7

So, let’s see if we can find out some information about that topic with the following command (replacing the long UUID with the one from your own output, of course) :-

kafkacat -b localhost -t a2fb53f1-5fd5-48b1-9570-341b72695ff7

and you should see something like this :-

% Auto-selecting Consumer mode (use -P or -C to override)
% ERROR: Local: Broker transport failure: Garys-MacBook-Pro.local:9092/0: Connect to ipv6#[fe80::84:c8ad:222c:1e15%en0]:9092 failed: Connection refused
% ERROR: Local: Broker transport failure: Garys-MacBook-Pro.local:9092/0: Connect to ipv6#[fe80::84:c8ad:222c:1e15%en0]:9092 failed: Connection refused
Hello Kafka From Go
% Reached end of topic a2fb53f1-5fd5-48b1-9570-341b72695ff7 [0] at offset 1

So we can see our “Hello Kafka From Go”, which is good. Don’t worry if you get those IPV6 errors above – docker is exposing the service over IPV4, but kafkacat is also trying the IPV6 address, which isn’t there. We can come back to that later – it might slow things down a bit as we have to wait for the connection attempt to time out, but I want to stay focused on the code right now.

So, now that we can see our test message coming through, let’s change the code so that the actual event is persisted.

First, lets add an ‘EventEncoder’ that takes anything that implements the EventInterface and produces a string from it.  Eventually, we will have an EventDecoder that does the reverse as well.

So, let’s add pkg/store/serialize_event.go and it should look like this :-

package store

import "fmt"

func EventEncoder(event EventInterface) ([]byte, error) {
   data, err := event.Encode()
   if err != nil {
      return nil, fmt.Errorf("The event could not encode itself - reason '%s'", err.Error())
   }
   return data, nil
}

This might look odd, as we are simply asking the event to encode itself. I am thinking ahead a little in that I might want the encoder to be able to do everything for itself – but as I am not sure what I will end up exposing in the EventInterface, this will do for now and I will reconsider later.

So, now we need to add Encode to EventInterface and its only implementation – pkg/store/event.go should now look something like this (note that I changed the struct so that it is not exported, and exported the fields of this internal struct, as otherwise they don’t appear in the JSON) :-

package store

import (
   "encoding/json"
   "fmt"
)

type EventInterface interface {
   Name() string
   UUID() string
   Encode() ([]byte, error)
}

type event struct {
   EventName string
   EventUuid string
}

func (e *event) Name() string {
   return e.EventName
}

func (e *event) UUID() string {
   return e.EventUuid
}

func (e *event) Encode() ([]byte, error) {
   serialized, err := json.Marshal(e)
   if err != nil {
      return nil, fmt.Errorf("The encoding of the event '%s' failed - reason '%s'", e.UUID(), err.Error())
   }
   return serialized, nil
}

func NewEvent(name string, uuid string) EventInterface {
   return &event{EventName: name, EventUuid: uuid}
}

I changed the return type of Encode to a byte slice, simply because that is what the json library outputs. I’m not sure yet whether that was the right thing to do – I’m not familiar with how byte slices compare with strings in Go, or how to convert between them.

So, now let’s run our tests and have a look at what is stored in kafka (I stopped docker-compose, ran docker-compose down and then docker-compose up first, to ensure a clean slate) :-

Garys-MacBook-Pro:eventstore garytaylor$ kafkacat -b localhost -t 35f9155a-9173-4741-b045-88cd75c0f23b
% Auto-selecting Consumer mode (use -P or -C to override)
% ERROR: Local: Broker transport failure: Garys-MacBook-Pro.local:9092/0: Connect to ipv6#[fe80::84:c8ad:222c:1e15%en0]:9092 failed: Connection refused
{"EventName":"My Test Event","EventUuid":"1234566745645645645654645"}
% Reached end of topic 35f9155a-9173-4741-b045-88cd75c0f23b [0] at offset 1

Result :-). Here we see our event serialised just as we expect it (for now at least).

Using The Test Suite To Validate Event Has Persisted

OK, we cannot go on using command line tools to prove that half of our test works, so let’s now focus on implementing the ‘Count’ and ‘Next’ functions for KafkaEvents.

Let’s do Count first, as this is the first thing complaining in the test output.

When I thought about this further, I decided against using Count. I’m sure it would be useful in the future, but for the sake of this test I can get away with calling Next twice, so I removed Count from the EventLazyIterator interface and its only implementation in KafkaEvents. My main reason is that a count is fairly useless, as it is only correct at that point in time – we may as well just read the events in this case.

So, our test now looks like this

package store

import (
   "testing"

   "github.com/satori/go.uuid"
)

func TestKafkaStorePublish(t *testing.T) {
   // Arrange
   // Ensure that the kafka store implements the Store interface
   var store Store = NewKafkaStore()

   // Send an event to it
   event := NewEvent("My Test Event", "1234566745645645645654645")
   streamUUID := uuid.NewV4().String()

   // Act
   err := store.Publish(streamUUID, event)
   if err != nil {
      t.Error("The store's Publish function had an error")
      return
   }

   // Assert
   collection, err := store.AllEvents(streamUUID)
   if err != nil {
      t.Error("The AllEvents method returned an error: " + err.Error())
      return
   }

   ch := collection.Each()

   foundEvent := <-ch
   if len(ch) != 0 {
      t.Errorf("expected zero events after read, got %d", len(ch))
   }

   if foundEvent.Name() != "My Test Event" {
      t.Error("This is not the event stored")
   }
}

So, off I go to make this pass – and it eventually does. I won’t go into the details of the code as it is getting bigger and bigger, but this exercise has taught me how I want to design the system. I now know how to publish and read events, and subscribing is also covered. As I have been hacking away at this, I now plan to scrap the implementation and start again using TDD to test individual parts of the system rather than just one high-level test.

The high-level test will always remain our end goal, but the lower-level tests let us focus on nice, clean, consistent interfaces and encourage testable code. I would never ‘write tests after the implementation’, which I know some do – it is as important to see your tests failing, and failing correctly, as it is to see them passing.

So, I will now remove all of my code, re-implement it with unit tests as well as the high-level test, and then post again with the results. This should give us the basics required for an event store for a CQRS system.

 

