LogoTurboGo
Features

Pubsub

Lightweight publish/subscribe system for in-memory event messaging in TurboGo.

Configure Pubsub

TurboGo includes an in-memory publish/subscribe (pubsub) system for event-driven communication between components.


Structure Integration

Pubsub is initialized in main.go via:

app := TurboGo.New().WithPubsub()
go controller.PubsubHandler(app.EngineCtx)

✅ You can also chain it with other modules:

app := TurboGo.New().WithCache().WithQueue().WithPubsub()

How It Works

  • Publish messages using Publish(topic, data)
  • Listeners can subscribe to specific topics using Subscribe(topic) or SubscribeAll()
  • Messages are delivered asynchronously via channels
  • Supports multiple consumers per topic
  • Can be extended to include persistence, retries, or dead-letter logic

Example Usage

Publish an event:

// Memory only
c.MustPubsub().Memory.Publish("user.created", payload)

// Storage only
c.MustPubsub().Storage.Publish("user.created", payload)

// Both (default)
c.MustPubsub().PublishAll("user.created", payload)

Register a subscriber:

Subscribe to events using:

// Memory only
ch := c.Pubsub.Memory.Subscribe("user.created")

// Storage only
ch := c.Pubsub.Storage.Subscribe("user.created")

// Both (default)
ch := c.Pubsub.SubscribeAll("user.created")

Handling events example:

func OnUserCreated(data []byte) error {
	var input CreateUserInput
	if err := json.Unmarshal(data, &input); err != nil {
		return fmt.Errorf("invalid pubsub data: %w", err)
	}
	fmt.Printf("📣 PubSub Event: %s <%s>\n", input.Name, input.Email)
	return nil
}
func PubsubHandler(ps *core.EngineContext) {
	ch := ps.Pubsub.SubscribeAll("user.created")
	for msg := range ch {
		if err := OnUserCreated(msg); err != nil {
			fmt.Println("❌ pubsub handler error:", err)
		}
	}
}

Behind the Scenes

TurboGo Pubsub supports two delivery layers:

  • Memory: Real-time broadcast via channels
  • Storage: Persistent logs for reliable delivery

Messages can be published to either or both.
Old messages are auto-deleted every 10 minutes if older than 24 hours.


Configuration Modes

  • Memory Only – Real-time and fast
  • Storage Only – Durable and replayable
  • Memory + Storage (default) – Combines both

Auto Cleanup

Stored messages are cleaned up automatically:

  • Interval: every 10 * time.Minute
  • Older Than: 24 * time.Hour

Ensures efficient storage usage.


Use Cases

  • Real-time internal event broadcasting
  • Triggering follow-up processes (e.g. welcome email after registration)
  • Decoupled architecture for in-app modules